"""Authentication endpoints.

Covers session login/logout, the OAuth2 token endpoint, the OIDC login and
callback endpoints, and the password-reset endpoints.
"""

from datetime import datetime, timedelta, timezone
from typing import Annotated, Final, Optional
from urllib.parse import urlencode

from fastapi import Body, Depends, HTTPException, Request, status
from fastapi.responses import RedirectResponse
from fastapi.security.http import HTTPBasic

from config import (
    OIDC_ENABLED,
    OIDC_END_SESSION_ENDPOINT,
    OIDC_REDIRECT_URI,
    OIDC_RP_INITIATED_LOGOUT,
)
from decorators.auth import oauth
from endpoints.forms.identity import OAuth2RequestForm
from endpoints.responses.oauth import OIDCLogoutResponse, TokenResponse
from exceptions.auth_exceptions import (
    AuthCredentialsException,
    OIDCDisabledException,
    OIDCNotConfiguredException,
    UserDisabledException,
)
from handler.auth import auth_handler, oauth_handler, oidc_handler
from handler.database import db_user_handler
from logger.formatter import CYAN
from logger.formatter import highlight as hl
from logger.logger import log
from utils.router import APIRouter

ACCESS_TOKEN_EXPIRE_SECONDS: Final = 30 * 60  # 30 minutes
REFRESH_TOKEN_EXPIRE_DAYS: Final = 7

router = APIRouter(
    tags=["auth"],
)


# Session authentication endpoints
@router.post("/login")
def login(
    request: Request,
    credentials=Depends(HTTPBasic()),  # noqa
) -> None:
    """Session login endpoint

    Authenticates the HTTP Basic credentials, stores the issuer and subject
    in the session, and records the login time on the user.

    Args:
        request (Request): Fastapi Request object
        credentials: Defaults to Depends(HTTPBasic()).

    Raises:
        AuthCredentialsException: Invalid credentials
        UserDisabledException: The user account is disabled
    """

    user = auth_handler.authenticate_user(credentials.username, credentials.password)
    if not user:
        raise AuthCredentialsException

    if not user.enabled:
        raise UserDisabledException

    request.session["iss"] = "romm:auth"
    request.session["sub"] = user.username

    # Update last login and active times
    now = datetime.now(timezone.utc)
    db_user_handler.update_user(user.id, {"last_login": now, "last_active": now})


@router.post("/logout", status_code=status.HTTP_200_OK)
async def logout(request: Request) -> Optional[OIDCLogoutResponse]:
    """Session logout endpoint

    The session is always cleared, regardless of whether an OIDC logout URL
    can be produced.

    Args:
        request (Request): Fastapi Request object

    Returns:
        Optional[dict]: When OIDC RP-Initiated Logout is enabled and the session
            contains an OIDC id_token, returns a dict with the OIDC end-session URL
            so the client can redirect the browser to log out of the OIDC provider.
            Otherwise returns None.
    """

    # Read the id_token before clearing: it is needed for the id_token_hint.
    id_token = request.session.get("oidc_id_token")

    request.session.clear()

    if not (OIDC_RP_INITIATED_LOGOUT and id_token):
        return None

    # Prefer the explicitly configured endpoint; fall back to provider metadata.
    end_session_endpoint = OIDC_END_SESSION_ENDPOINT
    if not end_session_endpoint and oauth.openid:
        try:
            metadata = await oauth.openid.load_server_metadata()
            end_session_endpoint = metadata.get("end_session_endpoint", "")
        except Exception:
            # Best effort: the local session is already cleared, so a metadata
            # failure must not turn the logout into an error.
            log.warning("Failed to load OIDC server metadata for RP-Initiated Logout")

    if not end_session_endpoint:
        return None

    params = urlencode({"id_token_hint": id_token})
    # The end-session endpoint may already carry a query component; appending
    # a second "?" would produce a malformed URL.
    separator = "&" if "?" in end_session_endpoint else "?"
    return {"oidc_logout_url": f"{end_session_endpoint}{separator}{params}"}


def _issue_token_pair(username: str, scopes: Optional[str]) -> TokenResponse:
    """Create an access/refresh token pair and build the token response.

    Args:
        username (str): Value stored in the "sub" claim of both tokens
        scopes (Optional[str]): Value stored in the "scopes" claim of both tokens

    Returns:
        TokenResponse: TypedDict with the new generated token info
    """

    claims = {
        "sub": username,
        "iss": "romm:oauth",
        "scopes": scopes,
    }

    # Each call gets its own dict so a handler mutating its input cannot leak
    # claims from one token into the other.
    access_token = oauth_handler.create_access_token(
        data=dict(claims),
        expires_delta=timedelta(seconds=ACCESS_TOKEN_EXPIRE_SECONDS),
    )
    refresh_token = oauth_handler.create_refresh_token(
        data=dict(claims),
        expires_delta=timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
    )

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",  # trunk-ignore(bandit/B105)
        "expires": ACCESS_TOKEN_EXPIRE_SECONDS,
        "refresh_expires": REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60,
    }


@router.post("/token")
async def token(form_data: Annotated[OAuth2RequestForm, Depends()]) -> TokenResponse:
    """OAuth2 token endpoint

    Supports the "refresh_token" and "password" grant types. A refresh reuses
    the scopes stored in the presented refresh token's claims; a password
    grant uses the requested scopes after checking them against the user's
    scopes.

    Args:
        form_data (Annotated[OAuth2RequestForm, Depends): Form Data with OAuth2 info

    Raises:
        HTTPException: Missing refresh token
        HTTPException: Invalid refresh token
        HTTPException: User account is disabled
        HTTPException: Missing username or password
        HTTPException: Invalid username or password
        HTTPException: Client credentials are not yet supported
        HTTPException: Invalid or unsupported grant type
        HTTPException: Insufficient scope

    Returns:
        TokenResponse: TypedDict with the new generated token info
    """

    # Support refreshing access tokens
    if form_data.grant_type == "refresh_token":
        presented_token = form_data.refresh_token
        if not presented_token:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Missing refresh token"
            )

        user, claims = await oauth_handler.consume_refresh_token(presented_token)
        if not user or not claims:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token"
            )

        if not user.enabled:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled"
            )

        return _issue_token_pair(user.username, claims.get("scopes"))

    # Authentication via username/password
    elif form_data.grant_type == "password":
        if not form_data.username or not form_data.password:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Missing username or password",
            )

        user = auth_handler.authenticate_user(form_data.username, form_data.password)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid username or password",
            )

        if not user.enabled:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled"
            )

    # TODO: Authentication via client_id/client_secret
    elif form_data.grant_type == "client_credentials":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Client credentials are not yet supported",
        )

    else:
        # All other grant types are unsupported
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid or unsupported grant type",
        )

    # Only the password grant reaches this point; every other branch has
    # already returned or raised.
    # Check if user has access to requested scopes
    if not set(form_data.scopes).issubset(user.oauth_scopes):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient scope",
        )

    return _issue_token_pair(user.username, " ".join(form_data.scopes))


# OIDC login and callback endpoints
@router.get("/login/openid")
async def login_via_openid(request: Request):
    """OIDC login endpoint

    Args:
        request (Request): Fastapi Request object

    Raises:
        OIDCDisabledException: OIDC is disabled
        OIDCNotConfiguredException: OIDC is not configured

    Returns:
        RedirectResponse: Redirect to OIDC provider
    """

    if not OIDC_ENABLED:
        raise OIDCDisabledException

    if not oauth.openid:
        raise OIDCNotConfiguredException

    return await oauth.openid.authorize_redirect(request, OIDC_REDIRECT_URI)


@router.get("/oauth/openid")
async def auth_openid(request: Request):
    """OIDC callback endpoint

    Exchanges the callback for a token, resolves the matching user, stores
    the issuer and subject in the session, and records the login time.

    Args:
        request (Request): Fastapi Request object

    Raises:
        OIDCDisabledException: OIDC is disabled
        OIDCNotConfiguredException: OIDC is not configured
        AuthCredentialsException: Invalid credentials
        UserDisabledException: The user account is disabled

    Returns:
        RedirectResponse: Redirect to home page
    """

    if not OIDC_ENABLED:
        raise OIDCDisabledException

    if not oauth.openid:
        raise OIDCNotConfiguredException

    token = await oauth.openid.authorize_access_token(request)
    potential_user, _userinfo = (
        await oidc_handler.get_current_active_user_from_openid_token(token)
    )
    if not potential_user:
        raise AuthCredentialsException

    if not potential_user.enabled:
        raise UserDisabledException

    request.session["iss"] = "romm:auth"
    request.session["sub"] = potential_user.username

    # The logout endpoint reads this back to build the id_token_hint.
    if OIDC_RP_INITIATED_LOGOUT:
        request.session["oidc_id_token"] = token.get("id_token", "")

    # Update last login and active times
    now = datetime.now(timezone.utc)
    db_user_handler.update_user(
        potential_user.id, {"last_login": now, "last_active": now}
    )

    return RedirectResponse(url="/")


@router.post("/forgot-password", status_code=status.HTTP_200_OK)
def request_password_reset(username: str = Body(..., embed=True)) -> None:
    """Request a password reset link for the user.

    An unknown username is only logged; the endpoint returns normally in
    both cases.

    Args:
        username (str): Username of the user requesting the reset

    Returns:
        None: Returns 200 OK status
    """

    user = db_user_handler.get_user_by_username(username)
    if user:
        auth_handler.generate_password_reset_token(user)
    else:
        log.warning(
            f"Reset password link requested for a user {hl(username, color=CYAN)}, but that username does not exist."
        )


@router.post("/reset-password", status_code=status.HTTP_200_OK)
async def reset_password(
    token: str = Body(..., embed=True),
    new_password: str = Body(..., embed=True),
) -> None:
    """Reset password using the token.

    Args:
        token (str): Reset token from the URL
        new_password (str): New user password

    Returns:
        None: Returns 200 OK status
    """

    # NOTE(review): presumably verify_password_reset_token raises on an invalid
    # token rather than returning None - confirm against auth_handler.
    user = auth_handler.verify_password_reset_token(token)
    await auth_handler.set_user_new_password(user, new_password)
    log.info(
        f"Password was successfully reset for user {hl(user.username, color=CYAN)}."
    )