mitlist/be/app/api/auth/oauth.py
mohamad 1897d48188 feat: Update JWT refresh token endpoint to include user manager dependency
This commit enhances the JWT refresh token functionality by introducing a user manager dependency for loading user instances from the database. It also adds detailed documentation for the refresh token process, ensuring clarity on the expected request format and error handling for invalid tokens. These changes aim to improve security and maintainability of the authentication flow.
2025-06-22 12:46:01 +02:00

131 lines
5.4 KiB
Python

from fastapi import APIRouter, Depends, Request, HTTPException, status
from fastapi.responses import RedirectResponse, JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.database import get_transactional_session
from app.models import User
from app.auth import oauth, fastapi_users, auth_backend, get_jwt_strategy, get_refresh_jwt_strategy, get_user_manager
from app.config import settings
from fastapi.security import OAuth2PasswordRequestForm
router = APIRouter()
@router.get('/google/login')
async def google_login(request: Request):
return await oauth.google.authorize_redirect(request, settings.GOOGLE_REDIRECT_URI)
@router.get('/google/callback')
async def google_callback(request: Request, db: AsyncSession = Depends(get_transactional_session)):
token_data = await oauth.google.authorize_access_token(request)
user_info = await oauth.google.parse_id_token(request, token_data)
existing_user = (await db.execute(select(User).where(User.email == user_info['email']))).scalar_one_or_none()
user_to_login = existing_user
if not existing_user:
new_user = User(
email=user_info['email'],
name=user_info.get('name', user_info.get('email')),
is_verified=True,
is_active=True
)
db.add(new_user)
await db.flush()
user_to_login = new_user
access_strategy = get_jwt_strategy()
refresh_strategy = get_refresh_jwt_strategy()
access_token = await access_strategy.write_token(user_to_login)
refresh_token = await refresh_strategy.write_token(user_to_login)
redirect_url = f"{settings.FRONTEND_URL}/auth/callback?access_token={access_token}&refresh_token={refresh_token}"
return RedirectResponse(url=redirect_url)
@router.get('/apple/login')
async def apple_login(request: Request):
return await oauth.apple.authorize_redirect(request, settings.APPLE_REDIRECT_URI)
@router.get('/apple/callback')
async def apple_callback(request: Request, db: AsyncSession = Depends(get_transactional_session)):
token_data = await oauth.apple.authorize_access_token(request)
user_info = token_data.get('user', await oauth.apple.userinfo(token=token_data) if hasattr(oauth.apple, 'userinfo') else {})
if 'email' not in user_info and 'sub' in token_data:
parsed_id_token = await oauth.apple.parse_id_token(request, token_data) if hasattr(oauth.apple, 'parse_id_token') else {}
user_info = {**parsed_id_token, **user_info}
if 'email' not in user_info:
return RedirectResponse(url=f"{settings.FRONTEND_URL}/auth/callback?error=apple_email_missing")
existing_user = (await db.execute(select(User).where(User.email == user_info['email']))).scalar_one_or_none()
user_to_login = existing_user
if not existing_user:
name_info = user_info.get('name', {})
first_name = name_info.get('firstName', '')
last_name = name_info.get('lastName', '')
full_name = f"{first_name} {last_name}".strip() if first_name or last_name else user_info.get('email')
new_user = User(
email=user_info['email'],
name=full_name,
is_verified=True,
is_active=True
)
db.add(new_user)
await db.flush()
user_to_login = new_user
access_strategy = get_jwt_strategy()
refresh_strategy = get_refresh_jwt_strategy()
access_token = await access_strategy.write_token(user_to_login)
refresh_token = await refresh_strategy.write_token(user_to_login)
redirect_url = f"{settings.FRONTEND_URL}/auth/callback?access_token={access_token}&refresh_token={refresh_token}"
return RedirectResponse(url=redirect_url)
@router.post('/jwt/refresh')
async def refresh_jwt_token(
request: Request,
user_manager=Depends(get_user_manager),
):
"""Refresh the JWT access token using a valid refresh token.
The incoming request must provide a JSON body with a ``refresh_token`` field.
A new access and refresh token pair will be returned when the provided refresh
token is valid. If the token is invalid or expired, a *401* error is raised.
"""
data = await request.json()
refresh_token = data.get('refresh_token')
if not refresh_token:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Missing refresh token")
refresh_strategy = get_refresh_jwt_strategy()
try:
# ``read_token`` needs a callback capable of loading the *User* from the
# database. We therefore pass the user manager obtained via dependency
# injection so that the strategy can hydrate the full *User* instance.
user = await refresh_strategy.read_token(refresh_token, user_manager)
except Exception:
# Any error during decoding or lookup should result in an unauthorized
# response to avoid leaking information about token validity.
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token")
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token")
access_strategy = get_jwt_strategy()
access_token = await access_strategy.write_token(user)
new_refresh_token = await refresh_strategy.write_token(user)
return JSONResponse({
"access_token": access_token,
"refresh_token": new_refresh_token,
"token_type": "bearer"
})