Files
kontor/kontor-api/src/core/security.py
T
2025-11-06 16:37:50 +01:00

160 lines
5.6 KiB
Python

import logging
from datetime import datetime, timezone
from datetime import timedelta
from typing import Optional, Annotated, List
from typing import Dict
from typing import Optional
from fastapi import HTTPException, Security
from fastapi import Request
from fastapi import status
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security.utils import get_authorization_scheme_param
import bcrypt
from fastapi import Depends
from fastapi.security import SecurityScopes, OAuth2PasswordBearer, OAuth2
from pydantic import ValidationError
from src.core.config import settings
from jose import jwt, JWTError
from src.core.log_conf import logger
from src.db.models.admin import Profile
from src.db.repository.admin import get_profile
from src.db.session import SessionLocal
from src.schema.admin import TokenData, ProfileModel
# OAuth2 password-flow scheme used as the token dependency by
# get_current_user and get_current_user_from_token in this module.
# Tokens are issued at /api/login/token; the declared scopes are "me" and "admin".
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/api/login/token",
scopes={"me": "read", "admin": "read"},
)
class OAuth2PasswordBearerWithCookie(OAuth2):
    """OAuth2 password-flow scheme that takes the bearer token from a cookie.

    The token is read from the ``access_token`` cookie, whose value is
    expected to look like ``Bearer <token>``.
    """

    def __init__(
        self,
        tokenUrl: str,
        scheme_name: Optional[str] = None,
        scopes: Optional[Dict[str, str]] = None,
        auto_error: bool = True,
    ):
        """Declare a password flow for ``tokenUrl``.

        Args:
            tokenUrl: URL of the token endpoint advertised in the flow.
            scheme_name: Optional name forwarded to the ``OAuth2`` base class.
            scopes: Mapping of scope name to description; a missing or empty
                value is replaced by an empty dict.
            auto_error: When true, a missing/invalid cookie raises a 401
                instead of returning ``None``.
        """
        password_flow = {"tokenUrl": tokenUrl, "scopes": scopes or {}}
        super().__init__(
            flows=OAuthFlowsModel(password=password_flow),
            scheme_name=scheme_name,
            auto_error=auto_error,
        )

    async def __call__(self, request: Request) -> Optional[str]:
        """Extract the bearer token from the request's ``access_token`` cookie.

        Returns:
            The token part of the cookie value, or ``None`` when the cookie is
            missing or not a bearer credential and ``auto_error`` is false.

        Raises:
            HTTPException: 401 when the cookie is missing or not a bearer
                credential and ``auto_error`` is true.
        """
        # The token travels in an httpOnly cookie rather than a header.
        cookie_value: str = request.cookies.get("access_token")
        scheme, token = get_authorization_scheme_param(cookie_value)

        if cookie_value and scheme.lower() == "bearer":
            return token

        if not self.auto_error:
            return None

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
def authenticate_user(username: str, password: str) -> Optional[Profile]:
    """Look up ``username`` and verify ``password`` against its bcrypt hash.

    Args:
        username: Login name passed to ``get_profile``.
        password: Plain-text password supplied by the caller.

    Returns:
        The matching ``Profile`` when the password verifies, otherwise
        ``None`` (unknown user, wrong password, or unusable stored hash).
    """
    with SessionLocal() as db:
        user = get_profile(username=username, db=db)
        if not user:
            logger.info("Authentication failed!")
            return None

        try:
            password_ok = bcrypt.checkpw(
                password.encode(), user.password.encode()
            )
        except ValueError:
            # bcrypt raises ValueError when the stored hash is malformed;
            # treat that as a failed login rather than a server error.
            logger.warning("Stored password hash could not be verified")
            password_ok = False

        if not password_ok:
            # Previously the profile was returned regardless of the check
            # result, which let any password authenticate an existing user.
            logger.info("Authentication failed!")
            return None

        logger.info("User successful authenticated")
        return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode ``data`` plus an ``exp`` claim as a signed JWT.

    Args:
        data: Claims to embed; the dict is copied, not modified.
        expires_delta: Token lifetime. When omitted (or falsy), the lifetime
            is ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token produced by ``jwt.encode`` using
        ``settings.SECRET_KEY`` and ``settings.ALGORITHM``.
    """
    lifetime = expires_delta or timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    claims = data.copy()
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve a bearer token to a stored profile and enforce required scopes.

    Args:
        security_scopes: Scopes required by the dependant endpoint(s).
        token: Encoded JWT supplied by ``oauth2_scheme``.

    Returns:
        The profile returned by ``get_profile`` for the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or validated, has
            no ``sub`` claim, names an unknown user, or lacks one of the
            required scopes.
    """
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )

    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        # The message is pre-formatted: the previous call passed ``username``
        # as a format argument without a placeholder, so it was never logged.
        logger.info(f"username/email extracted is {username}")
        # split() without a separator yields [] for a missing/empty claim
        # instead of [""], and tolerates repeated spaces.
        token_scopes = payload.get("scope", "").split()
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError) as exc:
        raise credentials_exception from exc

    with SessionLocal() as db:
        user = get_profile(username=token_data.username, db=db)
    if user is None:
        raise credentials_exception

    for required_scope in security_scopes.scopes:
        if required_scope not in token_scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user
async def get_current_active_user(
    current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])],
) -> ProfileModel:
    """Return the authenticated user as a ``ProfileModel`` if it is enabled.

    Args:
        current_user: Profile resolved by ``get_current_user`` with the
            ``me`` scope required.

    Returns:
        A ``ProfileModel`` built from the profile's name, e-mail and
        enabled flag.

    Raises:
        HTTPException: 400 when the profile is not enabled.
    """
    if current_user.enabled:
        return ProfileModel(
            username=current_user.user_name,
            email=current_user.email,
            first_name=current_user.first_name,
            last_name=current_user.last_name,
            active=current_user.enabled,
        )
    raise HTTPException(status_code=400, detail="Inactive user")
def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
    """Resolve a bearer token to a stored profile without checking scopes.

    Args:
        token: Encoded JWT supplied by ``oauth2_scheme``.

    Returns:
        The profile returned by ``get_profile`` for the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names an unknown user.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        # Matches get_current_user: a 401 carries a WWW-Authenticate challenge.
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except JWTError as exc:
        raise credentials_exception from exc

    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    # The message is pre-formatted: the previous call passed ``username`` as a
    # format argument without a placeholder, so it was never logged.
    logger.info(f"username/email extracted is {username}")

    with SessionLocal() as db:
        user = get_profile(username=username, db=db)
    if user is None:
        raise credentials_exception
    return user