secure media endpoints

2026-05-17 21:48:40 +02:00
parent cd033f458d
commit 6dd8e12218
4 changed files with 138 additions and 51 deletions
+20 -5
@@ -13,13 +13,23 @@ from pydantic import ValidationError
from src.core.config import settings
from src.core.log_conf import logger
from src.db.models.admin import Profile
from src.db.repository.admin import get_profile_by_username, get_profile_by_email, is_database_empty
from src.db.repository.admin import (
get_profile_by_username,
get_profile_by_email,
is_database_empty,
)
from src.db.session import SessionLocal
from src.schema.admin import ProfileModel, TokenData
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/token",
scopes={"me": "read", "admin": "read", "ROLE_ADMIN": "admin", "ROLE_MEDIA": "media", "ROLE_USER": "user"},
scopes={
"me": "read",
"admin": "read",
"ROLE_ADMIN": "admin",
"ROLE_MEDIA": "media",
"ROLE_USER": "user",
},
)
@@ -38,7 +48,7 @@ oauth2_scheme = OAuth2PasswordBearer(
# async def __call__(self, request: Request) -> Optional[str]:
# authorization: str = request.cookies.get("access_token") # changed to accept access token from httpOnly Cookie
# scheme, param = get_authorization_scheme_param(authorization)
# if not authorization or scheme.lower() != "bearer":
# if self.auto_error:
@@ -51,6 +61,7 @@ oauth2_scheme = OAuth2PasswordBearer(
# return None
# return param
def authenticate_user_by_email(email: str, password: str) -> Optional[Profile]:
with SessionLocal() as db:
user = get_profile_by_email(email=email, db=db)
@@ -161,6 +172,7 @@ async def get_current_active_user(
def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
""" """
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
@@ -173,10 +185,13 @@ def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
logger.info("username/email extracted is %s", username)
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
except JWTError as exception:
raise credentials_exception from exception
with SessionLocal() as db:
user = get_profile_by_email(email=username, db=db)
if user is None:
raise credentials_exception
return user
UserDep = Annotated[Profile, Depends(get_current_user_from_token)]
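Review bot: The new `ROLE_ADMIN`/`ROLE_MEDIA`/`ROLE_USER` entries in `OAuth2PasswordBearer(scopes=...)` only describe scopes for the OpenAPI document; FastAPI enforces nothing from them, and `get_current_user_from_token(token: str = Depends(oauth2_scheme))` still accepts any token whose subject resolves to a profile, regardless of role, so a "media" endpoint guarded only by `UserDep` is reachable with a plain user token. If role enforcement lives in another dependency (e.g. `get_current_active_user`, whose body is outside these hunks) this is fine, but if the media routes are meant to rely on this dependency it needs a `SecurityScopes` parameter (from `fastapi.security`), the endpoints must declare their requirement via `Security(get_current_user_from_token, scopes=["ROLE_MEDIA"])`, and the decoded claims must be compared against it. The decode call between the two hunks is not shown, so the payload name below is assumed. Separately, `logger.info("username/email extracted is %s", username)` writes the token subject on every authenticated request; consider `logger.debug` so identities do not land in routine logs.
```python
def get_current_user_from_token(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    # … unchanged: credentials_exception, jwt decode, subject check …
    # Enforce the scopes the endpoint asked for via Security(..., scopes=[...]);
    # the `scopes` claim must be written at token-issue time for this to hold.
    granted = payload.get("scopes", [])
    if any(scope not in granted for scope in security_scopes.scopes):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
        )
    # … unchanged: profile lookup and return …
```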