diff --git a/kontor-api/src/apis/version1/admin.py b/kontor-api/src/apis/version1/admin.py
index 6214a45..8185435 100644
--- a/kontor-api/src/apis/version1/admin.py
+++ b/kontor-api/src/apis/version1/admin.py
@@ -5,7 +5,7 @@ from fastapi import APIRouter, Body, HTTPException, status, Depends, Response
 from fastapi.security import OAuth2PasswordRequestForm
 from src.core.config import settings
-from src.core.security import create_access_token, authenticate_user, get_current_active_user
+from src.core.security import create_access_token, authenticate_user_by_username, get_current_active_user
 from src.db.models.admin import Profile
 from src.schema.admin import Token, ProfileModel
 from src.webapps.auth.forms import LoginForm
@@ -15,7 +15,7 @@ router = APIRouter()
 @router.post("/token")
 def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
- user = authenticate_user(form_data.username, form_data.password)
+ user = authenticate_user_by_username(form_data.username, form_data.password)
 if not user:
 raise HTTPException(
 status_code=status.HTTP_401_UNAUTHORIZED,
@@ -31,7 +31,7 @@ def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depen
 # @router.post("/token-cookie", response_model=Token)
 def login_for_token_cookie(response: Response, form_data: LoginForm = Depends()):
- user = authenticate_user(form_data.username, form_data.password) # type: ignore
+ user = authenticate_user_by_username(form_data.username, form_data.password) # type: ignore
 if not user:
 raise HTTPException(
 status_code=status.HTTP_401_UNAUTHORIZED,
diff --git a/kontor-api/src/apis/version1/login.py b/kontor-api/src/apis/version1/login.py
index a0b763b..9c35590 100644
--- a/kontor-api/src/apis/version1/login.py
+++ b/kontor-api/src/apis/version1/login.py
@@ -6,7 +6,7 @@ from pydantic import BaseModel
 from typing import Annotated
 from src.core.config import settings
 from src.core.log_conf import logger
-from src.core.security import authenticate_user, create_access_token
+from src.core.security import authenticate_user_by_email, authenticate_user_by_username, create_access_token
 from src.schema.admin import Token
 login_router = APIRouter()
@@ -26,7 +26,7 @@ class LoginRequest(BaseModel):
 )
 def login(request: LoginRequest) -> Token:
 logger.info(f"login with {request.email}")
- user = authenticate_user(request.email, request.password)
+ user = authenticate_user_by_email(request.email, request.password)
 scopes = ["admin", "read"]
 if not user:
 raise HTTPException(
@@ -45,7 +45,7 @@ def login(request: LoginRequest) -> Token:
 async def login_for_access_token(
 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
 ) -> Token:
- user = authenticate_user(form_data.username, form_data.password)
+ user = authenticate_user_by_username(form_data.username, form_data.password)
 if not user:
 raise HTTPException(status_code=400, detail="Incorrect username or password")
 access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
diff --git a/kontor-api/src/core/security.py b/kontor-api/src/core/security.py
index 19d30d2..41433c5 100644
--- a/kontor-api/src/core/security.py
+++ b/kontor-api/src/core/security.py
@@ -13,7 +13,7 @@ from pydantic import ValidationError
 from src.core.config import settings
 from src.core.log_conf import logger
 from src.db.models.admin import Profile
-from src.db.repository.admin import get_profile_by_username, is_database_empty
+from src.db.repository.admin import get_profile_by_username, get_profile_by_email, is_database_empty
 from src.db.session import SessionLocal
 from src.schema.admin import ProfileModel, TokenData
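Review bot: The `if not user:` check following the new `user = authenticate_user_by_email(request.email, request.password)` line is the only gate before this endpoint issues a token, yet the `authenticate_user_by_email` body added in security.py by this same commit reaches `return user` after logging "Authentication failed!" just as it does after a successful `bcrypt.checkpw` (the collapsed rendering drops indentation, but every placement of that `return user` after the inner `else:` yields a truthy profile on a wrong password), so an existing email with any password would get past this check. The fix belongs in security.py: make the failure branch return nothing, e.g. `if not bcrypt.checkpw(password.encode(), user.password.encode()):` / `logger.info("Authentication failed!")` / `return None`, and only then `return user`; `authenticate_user_by_username`'s body continues past this page and appears to share the same structure, so it needs the same treatment. The `is_database_empty` bootstrap branch likewise returns a synthesized `Profile()` without consulting `password`, and this caller then assigns it `scopes = ["admin", "read"]`; if that first-run convenience is intentional it deserves a comment stating exactly when it is reachable, and the password-free path should be impossible once any profile exists. `login` continues past the end of the hunk.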
@@ -51,10 +51,28 @@ oauth2_scheme = OAuth2PasswordBearer(
 # return None
 # return param
-
-def authenticate_user(username: str, password: str) -> Optional[Profile]:
+def authenticate_user_by_email(email: str, password: str) -> Optional[Profile]:
 with SessionLocal() as db:
- user = get_profile(username=username, db=db)
+ user = get_profile_by_email(email=email, db=db)
+ logger.debug(user)
+ if not user:
+ if is_database_empty(db):
+ logger.info("database is empty, use temporary access")
+ user = Profile()
+ user.email = "init_user@thpeetz.de"
+ return user
+ return None
+ else:
+ if bcrypt.checkpw(password.encode(), user.password.encode()):
+ logger.info("User successful authenticated")
+ else:
+ logger.info("Authentication failed!")
+ return user
+
+
+def authenticate_user_by_username(username: str, password: str) -> Optional[Profile]:
+ with SessionLocal() as db:
+ user = get_profile_by_username(username=username, db=db)
 logger.debug(user)
 if not user:
 if is_database_empty(db):