This commit is contained in:
@@ -0,0 +1,56 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer, SecurityScopes
|
||||
from pydantic import BaseModel
|
||||
from typing import Annotated
|
||||
from src.core.config import settings
|
||||
from src.core.log_conf import logger
|
||||
from src.core.security import authenticate_user_by_email, authenticate_user_by_username, create_access_token
|
||||
from src.schema.admin import Token
|
||||
|
||||
login_router = APIRouter()
|
||||
|
||||
|
||||
class LoginRequest(BaseModel):
|
||||
email: str | None = None
|
||||
password: str | None = None
|
||||
|
||||
|
||||
@login_router.post(
|
||||
"/login",
|
||||
tags=["login"],
|
||||
summary="Login and get token",
|
||||
response_description="Return HTTP status code 200 (OK)",
|
||||
status_code=status.HTTP_200_OK,
|
||||
)
|
||||
def login(request: LoginRequest) -> Token:
|
||||
logger.info(f"login with {request.email}")
|
||||
user = authenticate_user_by_email(str(request.email), str(request.password))
|
||||
scopes = ["admin", "read"]
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Incorrect username or password",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
access_token = create_access_token(
|
||||
data={"sub": user.email, "scope": " ".join(scopes)},
|
||||
expires_delta=access_token_expires,
|
||||
)
|
||||
return Token(access_token=access_token, token_type="bearer")
|
||||
|
||||
@login_router.post("/token", tags=["login"], summary="Login for access token")
|
||||
async def login_for_access_token(
|
||||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
|
||||
) -> Token:
|
||||
user = authenticate_user_by_username(form_data.username, form_data.password)
|
||||
if not user:
|
||||
raise HTTPException(status_code=400, detail="Incorrect username or password")
|
||||
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
access_token = create_access_token(
|
||||
data={"sub": user.user_name, "scope": " ".join(form_data.scopes)},
|
||||
expires_delta=access_token_expires,
|
||||
)
|
||||
return Token(access_token=access_token, token_type="bearer")
|
||||
Reference in New Issue
Block a user