Files
kontor/kontor-api/src/core/security.py
T

163 lines
5.8 KiB
Python

import logging
from datetime import datetime, timedelta, timezone
from typing import Annotated, Dict, List, Optional
import bcrypt
from fastapi import Depends, HTTPException, Request, Security, status
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import OAuth2, OAuth2PasswordBearer, SecurityScopes
from fastapi.security.utils import get_authorization_scheme_param
from jose import JWTError, jwt
from pydantic import ValidationError
from src.core.config import settings
from src.core.log_conf import logger
from src.db.models.admin import Profile
from src.db.repository.admin import get_profile, is_database_empty
from src.db.session import SessionLocal
from src.schema.admin import ProfileModel, TokenData
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/api/login/token",
scopes={"me": "read", "admin": "read"},
)
class OAuth2PasswordBearerWithCookie(OAuth2):
def __init__(
self,
tokenUrl: str,
scheme_name: Optional[str] = None,
scopes: Optional[Dict[str, str]] = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore
super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.cookies.get("access_token") # changed to accept access token from httpOnly Cookie
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None
return param
def authenticate_user(username: str, password: str) -> Optional[Profile]:
with SessionLocal() as db:
user = get_profile(username=username, db=db)
logger.debug(user)
if not user:
if is_database_empty(db):
logger.info("database is empty, use temporary access")
user = Profile()
user.email = "init_user@thpeetz.de"
return user
return None
else:
if bcrypt.checkpw(password.encode(), user.password.encode()):
logger.info("User successful authenticated")
else:
logger.info("Authentication failed!")
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(
to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
)
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
username: str = payload.get("sub") # type: ignore
logger.info("username/email extracted is %s", username)
if username is None:
raise credentials_exception
scope: str = payload.get("scope", "")
token_scopes: List[str] = scope.split(" ")
token_data = TokenData(scopes=token_scopes, username=username)
except (JWTError, ValidationError):
raise credentials_exception
with SessionLocal() as db:
user = get_profile(username=token_data.username, db=db) # type: ignore
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])],
) -> ProfileModel:
if not current_user.enabled: # type: ignore
raise HTTPException(status_code=400, detail="Inactive user")
user_model = ProfileModel(
username=current_user.user_name,
email=current_user.email, # type: ignore
first_name=current_user.first_name,
last_name=current_user.last_name, # type: ignore
active=current_user.enabled,
) # type: ignore
return user_model
def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
username: str = payload.get("sub") # type: ignore
logger.info("username/email extracted is %s", username)
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
with SessionLocal() as db:
user = get_profile(username=username, db=db)
if user is None:
raise credentials_exception
return user