"""Login routes that exchange user credentials for a bearer access token."""

from datetime import timedelta
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from pydantic import BaseModel

from src.core.config import settings
from src.core.log_conf import logger
from src.core.security import authenticate_user, create_access_token
from src.schema.admin import Token

login_router = APIRouter()

# Scopes written into every token issued by the JSON ``/login`` endpoint.
_LOGIN_SCOPES = ["admin", "read"]


# JSON body accepted by ``POST /login``.
# Documented with comments rather than a docstring because pydantic copies a
# model docstring into the generated schema description.
class LoginRequest(BaseModel):
    # Both fields are optional at the schema level, so a body that omits them
    # still passes validation; ``login`` rejects such requests itself.
    email: str | None = None
    password: str | None = None


def _unauthorized() -> HTTPException:
    """Build the 401 response raised when ``/login`` credentials are rejected."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )


def _issue_token(subject: str, scopes: list[str]) -> Token:
    """Create a bearer ``Token`` for ``subject`` carrying ``scopes``.

    Args:
        subject: Value stored under the ``sub`` key of the token data.
        scopes: Scope names; stored space-separated under the ``scope`` key.

    Returns:
        A ``Token`` whose ``token_type`` is ``"bearer"``.
    """
    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    encoded = create_access_token(
        data={"sub": subject, "scope": " ".join(scopes)},
        expires_delta=lifetime,
    )
    return Token(access_token=encoded, token_type="bearer")


# POST /login: authenticate a JSON email/password pair and return a token that
# carries the fixed ``_LOGIN_SCOPES``. Responds 401 with a ``WWW-Authenticate``
# header when a credential is missing or ``authenticate_user`` returns a falsy
# value.
# (Kept as comments: FastAPI publishes a handler docstring as the operation
# description in the OpenAPI schema.)
@login_router.post(
    "/login",
    tags=["login"],
    summary="Login and get token",
    response_description="Return HTTP status code 200 (OK)",
    status_code=status.HTTP_200_OK,
)
def login(request: LoginRequest) -> Token:
    logger.info(f"login with {request.email}")

    # The schema allows either field to be absent; never hand ``None`` to the
    # authentication layer, answer exactly like a failed login instead.
    if request.email is None or request.password is None:
        raise _unauthorized()

    user = authenticate_user(request.email, request.password)
    if not user:
        raise _unauthorized()

    return _issue_token(user.email, _LOGIN_SCOPES)


# POST /token: OAuth2 password-form login. Responds 400 when
# ``authenticate_user`` returns a falsy value (``/login`` uses 401 for the
# same condition).
# (Kept as comments: FastAPI publishes a handler docstring as the operation
# description in the OpenAPI schema.)
@login_router.post("/token", tags=["login"], summary="Login for access token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    # NOTE(review): ``authenticate_user`` is called without ``await`` inside an
    # ``async`` handler; if it performs blocking work it runs on the event
    # loop -- confirm this is intended.
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    # NOTE(review): the scopes requested by the client are copied into the
    # token unfiltered, so any authenticated caller can ask for any scope --
    # confirm whether they should be limited to what the user may hold.
    # NOTE(review): the subject here is ``user.user_name`` while ``/login``
    # uses ``user.email`` -- confirm both are accepted by the token consumer.
    return _issue_token(user.user_name, form_data.scopes)