add display for MetaData

This commit is contained in:
Thomas Peetz
2025-05-04 12:27:03 +02:00
parent 4cf1941f44
commit 13dad3961c
33 changed files with 609 additions and 44 deletions
+69
View File
@@ -0,0 +1,69 @@
import logging
from datetime import timedelta
import bcrypt
from fastapi import APIRouter, HTTPException, status, Response, Depends
from fastapi.security import OAuth2PasswordRequestForm
from jose import jwt, JWTError
from src.apis.utils import SessionDep, OAuth2PasswordBearerWithCookie
from src.core.config import settings
from src.core.security import create_access_token
from src.db.models.admin import Profile
from src.db.repository.admin import get_profile
from src.schema.admin import Token
router = APIRouter()
def authenticate_user(username: str, password: str, db: SessionDep) -> Profile | None:
user = get_profile(username=username, db=db)
print(user)
if not user:
return None
if bcrypt.checkpw(password.encode(), user.password.encode()):
print("User successful authenticated")
else:
logging.info("Authentication failed!")
return user
@router.post("/token", response_model=Token)
def login_for_access_token(response: Response, db: SessionDep, form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password, db)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
response.set_cookie(
key="access_token", value=f"Bearer {access_token}", httponly=True
)
return {"access_token": access_token, "token_type": "bearer"}
oauth2_scheme = OAuth2PasswordBearerWithCookie(tokenUrl="/api/login/token")
def get_current_user_from_token(db: SessionDep, token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
username: str = payload.get("sub")
print("username/email extracted is ", username)
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_profile(username=username, db=db)
if user is None:
raise credentials_exception
return user
+1 -5
View File
@@ -8,11 +8,7 @@ from src.schema.comics.artist import ArtistCreation, ArtistDetailResponse, Artis
from src.db.models.comic import Comic, Artist, Issue
from src.schema.comics.issue import IssueDetailsResponse
router = APIRouter(
prefix="/comic",
tags=["comics"],
responses={404: {"description": "Not found"}},
)
router = APIRouter()
@router.get("/comics")
+10 -12
View File
@@ -1,22 +1,20 @@
from typing import List, AnyStr
from uuid import UUID
from fastapi import APIRouter, status, HTTPException
from fastapi import APIRouter, status, HTTPException, Depends
from sqlalchemy import select, Sequence
from src.apis.utils import SessionDep
from src.apis.version1.admin import get_current_user_from_token
from src.db.models.admin import Profile
from src.schema.media.file import MediaFileResponse, Link, get_file_details, set_file
from src.db.models.media import MediaFile
router = APIRouter(
prefix="/media",
tags=["media"]
)
router = APIRouter()
@router.get("/update-titles")
def update_titles(db: SessionDep) -> list[MediaFileResponse]:
results: list[MediaFileResponse] = []
files = db.query(MediaFile).filter(MediaFile.review == 1).all()
files = db.query(MediaFile).filter(MediaFile.review == True).all()
for mediafile in files:
mediafile.update_title()
db.add(mediafile)
@@ -27,13 +25,13 @@ def update_titles(db: SessionDep) -> list[MediaFileResponse]:
@router.get("/files", response_model=List[MediaFileResponse])
def get_all_files(db: SessionDep, review: bool = False, download: bool = False) -> List[MediaFileResponse]:
def get_all_files(db: SessionDep, review: bool = False, download: bool = False, current_user: Profile = Depends(get_current_user_from_token)) -> List[MediaFileResponse]:
results: list[MediaFileResponse] = []
files: Sequence[MediaFile]
if review:
files = db.query(MediaFile).filter(MediaFile.review == 1).all()
files = db.query(MediaFile).filter(MediaFile.review == True).all()
elif download:
files = db.query(MediaFile).filter(MediaFile.should_download == 1).all()
files = db.query(MediaFile).filter(MediaFile.should_download == True).all()
else:
files = db.scalars(select(MediaFile)).all()
for mediafile in files:
@@ -66,8 +64,8 @@ def add_file(new_link: Link, db: SessionDep) -> MediaFileResponse:
try:
mediaFile: MediaFile = MediaFile()
setattr(mediaFile, "url", new_link.url)
setattr(mediaFile, "review", 1)
setattr(mediaFile, "should_download", 1)
setattr(mediaFile, "review", True)
setattr(mediaFile, "should_download", True)
db.add(mediaFile)
db.commit()
except:
+26
View File
@@ -0,0 +1,26 @@
from typing import List
from fastapi import APIRouter
from src.apis.utils import SessionDep
from src.db.models.metadata import MetaDataTable, MetaDataColumn
from src.db.repository.metadata import get_tables, get_columns
from src.schema.admin import MetaDataTableResponse, MetaDataColumnResponse
router = APIRouter()
@router.get("/tables")
def get_meta_data_tables(db: SessionDep) -> List[MetaDataTableResponse]:
tables = db.query(MetaDataTable).all()
response: List[MetaDataTableResponse] = get_tables(tables)
return response
@router.get("/columns")
def get_meta_data_columns(db: SessionDep) -> List[MetaDataColumnResponse]:
columns = db.query(MetaDataColumn).all()
response: List[MetaDataColumnResponse] = get_columns(columns)
return response
+1 -5
View File
@@ -5,11 +5,7 @@ from src.apis.utils import SessionDep
from src.schema.tysc.sport import SportResponse
from src.db.models.tysc import Sport
router = APIRouter(
prefix="/tysc",
tags=["tysc"],
responses={404: {"description": "Not found"}},
)
router = APIRouter()
@router.get("/sports")
def get_all_sports(db: SessionDep) -> List[SportResponse]: