from datetime import datetime, timedelta, timezone from typing import Annotated, List, Optional import bcrypt from fastapi import Depends, HTTPException, Security, status #from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel from fastapi.security import ( #OAuth2, OAuth2PasswordBearer, SecurityScopes ) #from fastapi.security.utils import get_authorization_scheme_param from jose import JWTError, jwt from pydantic import ValidationError from src.core.config import settings from src.core.log_conf import logger from src.db.models.admin import Profile from src.db.repository.admin import ( get_profile_by_username, get_profile_by_email, is_database_empty, ) from src.db.session import SessionLocal from src.schema.admin.token import TokenData from src.schema.user.profile import ProfileModel, to_model oauth2_scheme = OAuth2PasswordBearer( tokenUrl="/token", scopes={ "me": "read", "admin": "read", "ROLE_ADMIN": "admin", "ROLE_MEDIA": "media", "ROLE_USER": "user", }, ) # class OAuth2PasswordBearerWithCookie(OAuth2): # def __init__( # self, # tokenUrl: str, # scheme_name: Optional[str] = None, # scopes: Optional[Dict[str, str]] = None, # auto_error: bool = True, # ): # if not scopes: # scopes = {} # flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore # super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error) # async def __call__(self, request: Request) -> Optional[str]: # authorization: str = request.cookies.get("access_token") # changed to accept access token from httpOnly Cookie # scheme, param = get_authorization_scheme_param(authorization) # if not authorization or scheme.lower() != "bearer": # if self.auto_error: # raise HTTPException( # status_code=status.HTTP_401_UNAUTHORIZED, # detail="Not authenticated", # headers={"WWW-Authenticate": "Bearer"}, # ) # else: # return None # return param def authenticate_user_by_email(email: str, password: str) -> Optional[Profile]: with SessionLocal() as db: user = get_profile_by_email(email=email, db=db) logger.debug(user) if not user: if is_database_empty(db): logger.info("database is empty, use temporary access") user = Profile() user.email = "init_user@thpeetz.de" return user return None else: if bcrypt.checkpw(password.encode(), user.password.encode()): logger.info("User successful authenticated") else: logger.info("Authentication failed!") return user def authenticate_user_by_username(username: str, password: str) -> Optional[Profile]: with SessionLocal() as db: user = get_profile_by_username(username=username, db=db) logger.debug(user) if not user: if is_database_empty(db): logger.info("database is empty, use temporary access") user = Profile() user.email = "init_user@thpeetz.de" return user return None else: if bcrypt.checkpw(password.encode(), user.password.encode()): logger.info("User successful authenticated") else: logger.info("Authentication failed!") return user def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta( minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES ) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode( to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM ) return encoded_jwt async def get_current_user( security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)] ): if security_scopes.scopes: authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' else: authenticate_value = "Bearer" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": authenticate_value}, ) try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) username: str = payload.get("sub") # type: ignore logger.info("username/email extracted is %s", username) if username is None: raise credentials_exception scope: str = payload.get("scope", "") token_scopes: List[str] = scope.split(" ") token_data = TokenData(scopes=token_scopes, username=username) except (JWTError, ValidationError): logger.info("Exception raised", exc_info=True) raise credentials_exception with SessionLocal() as db: user = get_profile_by_username(username=str(token_data.username), db=db) if user is None: logger.info("user not found") raise credentials_exception for scope in security_scopes.scopes: if scope not in token_scopes: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="not enough permissions", headers={"WWW-Authenticate": authenticate_value}, ) return user async def get_current_active_user( current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])], ) -> ProfileModel: if not current_user.enabled: raise HTTPException(status_code=400, detail="Inactive user") user_model = to_model(current_user) return user_model def get_current_user_from_token(token: str = Depends(oauth2_scheme)): """ """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", ) try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) username: Optional[str] = payload.get("sub") logger.info("username/email extracted is %s", username) if username is None: raise credentials_exception except JWTError as exception: raise credentials_exception from exception with SessionLocal() as db: user = get_profile_by_email(email=username, db=db) if user is None: user = get_profile_by_username(username=username, db=db) if user is None: raise credentials_exception return user UserDep = Annotated[Profile, Depends(get_current_user_from_token)] CurrentUser = Annotated[Profile, Depends(get_current_active_user)]