update usage of api for login by email or username
This commit is contained in:
@@ -5,7 +5,7 @@ from fastapi import APIRouter, Body, HTTPException, status, Depends, Response
|
|||||||
from fastapi.security import OAuth2PasswordRequestForm
|
from fastapi.security import OAuth2PasswordRequestForm
|
||||||
|
|
||||||
from src.core.config import settings
|
from src.core.config import settings
|
||||||
from src.core.security import create_access_token, authenticate_user, get_current_active_user
|
from src.core.security import create_access_token, authenticate_user_by_username, get_current_active_user
|
||||||
from src.db.models.admin import Profile
|
from src.db.models.admin import Profile
|
||||||
from src.schema.admin import Token, ProfileModel
|
from src.schema.admin import Token, ProfileModel
|
||||||
from src.webapps.auth.forms import LoginForm
|
from src.webapps.auth.forms import LoginForm
|
||||||
@@ -15,7 +15,7 @@ router = APIRouter()
|
|||||||
|
|
||||||
@router.post("/token")
|
@router.post("/token")
|
||||||
def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
|
def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
|
||||||
user = authenticate_user(form_data.username, form_data.password)
|
user = authenticate_user_by_username(form_data.username, form_data.password)
|
||||||
if not user:
|
if not user:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
@@ -31,7 +31,7 @@ def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depen
|
|||||||
|
|
||||||
# @router.post("/token-cookie", response_model=Token)
|
# @router.post("/token-cookie", response_model=Token)
|
||||||
def login_for_token_cookie(response: Response, form_data: LoginForm = Depends()):
|
def login_for_token_cookie(response: Response, form_data: LoginForm = Depends()):
|
||||||
user = authenticate_user(form_data.username, form_data.password) # type: ignore
|
user = authenticate_user_by_username(form_data.username, form_data.password) # type: ignore
|
||||||
if not user:
|
if not user:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from pydantic import BaseModel
|
|||||||
from typing import Annotated
|
from typing import Annotated
|
||||||
from src.core.config import settings
|
from src.core.config import settings
|
||||||
from src.core.log_conf import logger
|
from src.core.log_conf import logger
|
||||||
from src.core.security import authenticate_user, create_access_token
|
from src.core.security import authenticate_user_by_email, authenticate_user_by_username, create_access_token
|
||||||
from src.schema.admin import Token
|
from src.schema.admin import Token
|
||||||
|
|
||||||
login_router = APIRouter()
|
login_router = APIRouter()
|
||||||
@@ -26,7 +26,7 @@ class LoginRequest(BaseModel):
|
|||||||
)
|
)
|
||||||
def login(request: LoginRequest) -> Token:
|
def login(request: LoginRequest) -> Token:
|
||||||
logger.info(f"login with {request.email}")
|
logger.info(f"login with {request.email}")
|
||||||
user = authenticate_user(request.email, request.password)
|
user = authenticate_user_by_email(request.email, request.password)
|
||||||
scopes = ["admin", "read"]
|
scopes = ["admin", "read"]
|
||||||
if not user:
|
if not user:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
@@ -45,7 +45,7 @@ def login(request: LoginRequest) -> Token:
|
|||||||
async def login_for_access_token(
|
async def login_for_access_token(
|
||||||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
|
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
|
||||||
) -> Token:
|
) -> Token:
|
||||||
user = authenticate_user(form_data.username, form_data.password)
|
user = authenticate_user_by_username(form_data.username, form_data.password)
|
||||||
if not user:
|
if not user:
|
||||||
raise HTTPException(status_code=400, detail="Incorrect username or password")
|
raise HTTPException(status_code=400, detail="Incorrect username or password")
|
||||||
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ from pydantic import ValidationError
|
|||||||
from src.core.config import settings
|
from src.core.config import settings
|
||||||
from src.core.log_conf import logger
|
from src.core.log_conf import logger
|
||||||
from src.db.models.admin import Profile
|
from src.db.models.admin import Profile
|
||||||
from src.db.repository.admin import get_profile_by_username, is_database_empty
|
from src.db.repository.admin import get_profile_by_username, get_profile_by_email, is_database_empty
|
||||||
from src.db.session import SessionLocal
|
from src.db.session import SessionLocal
|
||||||
from src.schema.admin import ProfileModel, TokenData
|
from src.schema.admin import ProfileModel, TokenData
|
||||||
|
|
||||||
@@ -51,10 +51,28 @@ oauth2_scheme = OAuth2PasswordBearer(
|
|||||||
# return None
|
# return None
|
||||||
# return param
|
# return param
|
||||||
|
|
||||||
|
def authenticate_user_by_email(email: str, password: str) -> Optional[Profile]:
|
||||||
def authenticate_user(username: str, password: str) -> Optional[Profile]:
|
|
||||||
with SessionLocal() as db:
|
with SessionLocal() as db:
|
||||||
user = get_profile(username=username, db=db)
|
user = get_profile_by_email(email=email, db=db)
|
||||||
|
logger.debug(user)
|
||||||
|
if not user:
|
||||||
|
if is_database_empty(db):
|
||||||
|
logger.info("database is empty, use temporary access")
|
||||||
|
user = Profile()
|
||||||
|
user.email = "init_user@thpeetz.de"
|
||||||
|
return user
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
if bcrypt.checkpw(password.encode(), user.password.encode()):
|
||||||
|
logger.info("User successful authenticated")
|
||||||
|
else:
|
||||||
|
logger.info("Authentication failed!")
|
||||||
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
def authenticate_user_by_username(username: str, password: str) -> Optional[Profile]:
|
||||||
|
with SessionLocal() as db:
|
||||||
|
user = get_profile_by_username(username=username, db=db)
|
||||||
logger.debug(user)
|
logger.debug(user)
|
||||||
if not user:
|
if not user:
|
||||||
if is_database_empty(db):
|
if is_database_empty(db):
|
||||||
|
|||||||
Reference in New Issue
Block a user