complete loading single items
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 3s

This commit is contained in:
Thomas Peetz
2026-05-21 13:43:53 +02:00
parent 6269b54ee8
commit 40b498ed2a
40 changed files with 467 additions and 235 deletions
+9 -2
View File
@@ -6,6 +6,7 @@ from fastapi import APIRouter, Depends
from src.apis.version1.admin import mailaccount
from src.apis.version1.comics import (
artist,
publisher,
comic,
issue,
worktype,
@@ -34,7 +35,7 @@ from src.apis.version1.tysc import (
from src.core.security import get_current_user_from_token
from src.apis.version1.user import assignment, permission, profile, token
from src.apis.version1.bookshelf import article, publisher, book, author, articleauthor, bookauthor
from src.apis.version1.bookshelf import article, bookshelf_publisher, book, author, articleauthor, bookauthor
api_router = APIRouter(prefix="/api")
api_router.include_router(
@@ -43,6 +44,12 @@ api_router.include_router(
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
publisher.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
artist.router,
prefix="/comics",
@@ -170,7 +177,7 @@ api_router.include_router(
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
publisher.router,
bookshelf_publisher.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
@@ -1,23 +1,32 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.admin import MailAccount
from src.db.session import SessionDep
from src.schema.admin.mailaccount import MailAccountResponse, to_response
router = APIRouter()
@router.get("/mailaccounts", response_model=List[MailAccountResponse])
def get_all_mailaccounts(db: SessionDep) -> List[MailAccountResponse]:
"""
return all MailAccounts as JSON.
"""
results: List[MailAccountResponse] = []
players = db.query(MailAccount).all()
for player in players:
response = to_response(player)
mailaccounts = db.query(MailAccount).all()
for mailaccount in mailaccounts:
response = to_response(mailaccount)
results.append(response)
return results
@router.get("/mailaccounts/{mailaccount_id}", response_model=MailAccountResponse)
def get_mailaccount(mailaccount_id: str, db: SessionDep) -> MailAccountResponse:
"""
return MailAccounts by id.
"""
mailaccount = db.get(MailAccount, mailaccount_id)
if mailaccount is None:
raise HTTPException(status_code=409, detail="Mailaccount could not be found")
response = to_response(mailaccount)
return response
@@ -1,6 +1,6 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.bookshelf import Author
from src.db.session import SessionDep
@@ -8,12 +8,19 @@ from src.schema.bookshelf.author import AuthorResponse, to_response
router = APIRouter()
@router.get("/authors", response_model=List[AuthorResponse])
def get_all_artists(db: SessionDep) -> List[AuthorResponse]:
def get_all_authors(db: SessionDep) -> List[AuthorResponse]:
results: List[AuthorResponse] = []
authors = db.query(Author).all()
for author in authors:
response = to_response(author)
results.append(response)
return results
@router.get("/authors/{author_id}", response_model=AuthorResponse)
def get_author(author_id: str, db: SessionDep) -> AuthorResponse:
author = db.get(Author, author_id)
if author is None:
raise HTTPException(status_code=404, detail="Author could not be found")
response = to_response(author)
return response
+15 -5
View File
@@ -5,7 +5,7 @@ from fastapi import APIRouter, HTTPException, status
from src.db.models.comic import Artist
from src.db.repository.comics.artist import get_artist_details
from src.db.session import SessionDep
from src.schema.comics.artist import ArtistCreation, ArtistResponse
from src.schema.comics.artist import ArtistCreation, ArtistResponse, artist_to_response
from src.schema.comics.artist_details import ArtistDetailResponse
@@ -17,12 +17,22 @@ def get_all_artists(db: SessionDep) -> List[ArtistResponse]:
results: List[ArtistResponse] = []
artists = db.query(Artist).all()
for artist in artists:
results.append(ArtistResponse(id=artist.id, name=str(artist.name)))
response = artist_to_response(artist)
results.append(response)
return results
@router.get("/artists/{artist_id}", response_model=ArtistDetailResponse)
def get_artist(artist_id: str, db: SessionDep) -> ArtistDetailResponse:
@router.get("/artists/{artist_id}", response_model=ArtistResponse)
def get_artist(artist_id: str, db: SessionDep) -> ArtistResponse:
artist = db.get(Artist, artist_id)
if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found")
response: ArtistResponse = artist_to_response(artist)
return response
@router.get("/artists/details/{artist_id}", response_model=ArtistDetailResponse)
def get_artist_details_by_id(artist_id: str, db: SessionDep) -> ArtistDetailResponse:
artist = db.get(Artist, artist_id)
if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found")
@@ -39,5 +49,5 @@ def add_artist(db: SessionDep, artist_creation: ArtistCreation) -> ArtistRespons
db.commit()
except:
raise HTTPException(status_code=409, detail="Artist already added")
response = ArtistResponse(id=artist.id, name=str(artist.name))
response = artist_to_response(artist)
return response
+15 -34
View File
@@ -3,20 +3,13 @@ from typing import List
from fastapi import APIRouter, HTTPException
from src.core.log_conf import logger
from src.db.models.comic import Comic, Publisher
from src.db.repository.comics.artist import get_artist_details
from src.db.models.comic import Comic
from src.db.repository.comics.comic import (
get_comic_details,
get_issue_details,
get_short_info,
list_comics,
)
from src.db.repository.comics.publisher import get_publisher_details
from src.db.session import SessionDep
from src.schema.comics.comic import ComicResponse
from src.schema.comics.comic import ComicResponse, comic_to_response
from src.schema.comics.comic_details import ComicDetailsResponse
from src.schema.comics.publisher import PublisherResponse
from src.schema.comics.publisher_details import PublisherDetailsResponse
router = APIRouter()
@@ -24,15 +17,24 @@ router = APIRouter()
@router.get("/comics")
def get_all_comics(db: SessionDep) -> List[ComicResponse]:
results: List[ComicResponse] = []
comics = list_comics(db)
comics = db.query(Comic).all()
for comic in comics:
response = get_short_info(comic)
response = comic_to_response(comic)
results.append(response)
return results
@router.get("/comics/{comic_id}", response_model=ComicDetailsResponse)
def get_comic(comic_id: str, db: SessionDep) -> ComicDetailsResponse:
@router.get("/comics/{comic_id}", response_model=ComicResponse)
def get_comic(comic_id: str, db: SessionDep) -> ComicResponse:
comic = db.get(Comic, comic_id)
if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found")
response: ComicResponse = comic_to_response(comic)
return response
@router.get("/comics/details/{comic_id}", response_model=ComicDetailsResponse)
def get_comic_details_by_id(comic_id: str, db: SessionDep) -> ComicDetailsResponse:
comic = db.get(Comic, comic_id)
if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found")
@@ -40,24 +42,3 @@ def get_comic(comic_id: str, db: SessionDep) -> ComicDetailsResponse:
response: ComicDetailsResponse = get_comic_details(comic)
logger.info(f"ComicDetailsResponse: {response}")
return response
@router.get("/publishers", response_model=List[PublisherResponse])
def get_all_publishers(db: SessionDep) -> List[PublisherResponse]:
results: List[PublisherResponse] = []
publishers = db.query(Publisher).all()
for publisher in publishers:
results.append(PublisherResponse(id=publisher.id, name=str(publisher.name)))
return results
@router.get("/publishers/{publisher_id}", response_model=PublisherDetailsResponse)
def get_publisher(publisher_id: str, db: SessionDep) -> PublisherDetailsResponse:
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherDetailsResponse = get_publisher_details(publisher)
return response
@@ -1,20 +1,27 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.comic import ComicWork
from src.db.session import SessionDep
from src.schema.comics.comicwork import ComicWorkResponse, to_response
from src.schema.comics.comicwork import ComicWorkResponse, comicwork_to_response
router = APIRouter()
@router.get("/comicworks", response_model=List[ComicWorkResponse])
def get_comicworks(db: SessionDep) -> List[ComicWorkResponse]:
results: List[ComicWorkResponse] = []
worktypes = db.query(ComicWork).all()
for worktype in worktypes:
response = to_response(worktype)
response = comicwork_to_response(worktype)
results.append(response)
return results
@router.get("/comicworks/{comicwork_id}", response_model=ComicWorkResponse)
def get_comicwork(comicwork_id: str, db: SessionDep) -> ComicWorkResponse:
worktype = db.get(ComicWork, comicwork_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Comicwork could not be found")
response = comicwork_to_response(worktype)
return response
+11 -3
View File
@@ -1,10 +1,10 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Issue
from src.db.session import SessionDep
from src.schema.comics.issue import IssueResponse, to_response
from src.schema.comics.issue import IssueResponse, issue_to_response
router = APIRouter()
@@ -14,6 +14,14 @@ def get_issues(db: SessionDep) -> List[IssueResponse]:
results: List[IssueResponse] = []
issues = db.query(Issue).all()
for issue in issues:
response = to_response(issue)
response = issue_to_response(issue)
results.append(response)
return results
@router.get("/issues/{issue_id}", response_model=IssueResponse)
def get_issue(issue_id: str, db: SessionDep) -> IssueResponse:
issue = db.get(Issue, issue_id)
if issue is None:
raise HTTPException(status_code=404, detail="Issue could not be found")
response = issue_to_response(issue)
return response
@@ -1,20 +1,27 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.comic import IssueWork
from src.db.session import SessionDep
from src.schema.comics.issuework import IssueWorkResponse, to_response
from src.schema.comics.issuework import IssueWorkResponse, issuework_to_response
router = APIRouter()
@router.get("/issueworks", response_model=List[IssueWorkResponse])
def get_issueworks(db: SessionDep) -> List[IssueWorkResponse]:
results: List[IssueWorkResponse] = []
worktypes = db.query(IssueWork).all()
for worktype in worktypes:
response = to_response(worktype)
response = issuework_to_response(worktype)
results.append(response)
return results
@router.get("/issueworks/{issuework_id}", response_model=IssueWorkResponse)
def get_issuework(issuework_id: str, db: SessionDep) -> IssueWorkResponse:
worktype = db.get(IssueWork, issuework_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Issuework could not be found")
response = issuework_to_response(worktype)
return response
@@ -0,0 +1,37 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Publisher
from src.db.repository.comics.publisher import get_publisher_details
from src.db.session import SessionDep
from src.schema.comics.publisher import PublisherResponse, publisher_to_response
from src.schema.comics.publisher_details import PublisherDetailsResponse
router = APIRouter()
@router.get("/publishers", response_model=List[PublisherResponse])
def get_all_publishers(db: SessionDep) -> List[PublisherResponse]:
results: List[PublisherResponse] = []
publishers = db.query(Publisher).all()
for publisher in publishers:
response: PublisherResponse = publisher_to_response(publisher)
results.append(response)
return results
@router.get("/publishers/{publisher_id}", response_model=PublisherResponse)
def get_publisher(publisher_id: str, db: SessionDep) -> PublisherResponse:
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherResponse = publisher_to_response(publisher)
return response
@router.get("/publishers/details/{publisher_id}", response_model=PublisherDetailsResponse)
def get_publisher_details_by_id(publisher_id: str, db: SessionDep) -> PublisherDetailsResponse:
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherDetailsResponse = get_publisher_details(publisher)
return response
@@ -1,18 +1,26 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.comic import StoryArc
from src.db.session import SessionDep
from src.schema.comics.storyarc import StoryArcResponse, to_response
from src.schema.comics.storyarc import StoryArcResponse, storyarc_to_response
router = APIRouter()
@router.get("/storyarcs", response_model=List[StoryArcResponse])
def get_issues(db: SessionDep) -> List[StoryArcResponse]:
def get_storyarcs(db: SessionDep) -> List[StoryArcResponse]:
results: List[StoryArcResponse] = []
storyarcs = db.query(StoryArc).all()
for storyarc in storyarcs:
response = to_response(storyarc)
response = storyarc_to_response(storyarc)
results.append(response)
return results
@router.get("/storyarcs/{storyarc_id}", response_model=StoryArcResponse)
def get_storyarc(story_arc_id: str, db: SessionDep) -> StoryArcResponse:
storyarc = db.get(StoryArc, story_arc_id)
if storyarc is None:
raise HTTPException(status_code=404, detail="Storyarc could not be found")
response = storyarc_to_response(storyarc)
return response
+12 -4
View File
@@ -1,18 +1,26 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Volume
from src.db.session import SessionDep
from src.schema.comics.volume import VolumeResponse, to_response
from src.schema.comics.volume import VolumeResponse, volume_to_response
router = APIRouter()
@router.get("/volumes", response_model=List[VolumeResponse])
def get_issues(db: SessionDep) -> List[VolumeResponse]:
def volumes(db: SessionDep) -> List[VolumeResponse]:
results: List[VolumeResponse] = []
worktypes = db.query(Volume).all()
for worktype in worktypes:
response = to_response(worktype)
response = volume_to_response(worktype)
results.append(response)
return results
@router.get("/volumes/{volume_id}", response_model=VolumeResponse)
def get_volume(volume_id: str, db: SessionDep) -> VolumeResponse:
worktype = db.get(Volume, volume_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Volume could not be found")
response = volume_to_response(worktype)
return response
@@ -1,10 +1,10 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.comic import WorkType
from src.db.session import SessionDep
from src.schema.comics.worktype import WorktypeResponse, to_response
from src.schema.comics.worktype import WorktypeResponse, worktype_to_response
router = APIRouter()
@@ -14,6 +14,14 @@ def get_issues(db: SessionDep) -> List[WorktypeResponse]:
results: List[WorktypeResponse] = []
worktypes = db.query(WorkType).all()
for worktype in worktypes:
response = to_response(worktype)
response = worktype_to_response(worktype)
results.append(response)
return results
@router.get("/worktypes/{worktype_id}", response_model=WorktypeResponse)
def get_issue(worktype_id: str, db: SessionDep) -> WorktypeResponse:
worktype = db.get(WorkType, worktype_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Worktype could not be found")
response = worktype_to_response(worktype)
return response
@@ -1,33 +1,33 @@
from typing import List
from fastapi import APIRouter, status, HTTPException
from sqlalchemy import select
from src.core.log_conf import logger
from src.db.repository.media import create_new_mediaactor, delete_mediaactor, get_actor_details
from src.db.repository.media import create_new_mediaactor, delete_mediaactor
from src.db.session import SessionDep
from src.schema.media.actor import MediaActorModel, MediaActorResponse
from src.schema.media.actor import MediaActorModel, MediaActorResponse, actor_to_response
from src.db.models.media import MediaActor
router = APIRouter()
@router.get("/actors", response_model=list[MediaActorResponse])
# def get_all_files(db: SessionDep, review: bool = False, download: bool = False, current_user: Profile = Depends(get_current_user_from_token)) -> List[MediaFileResponse]:
def get_all_actors(db: SessionDep, review: bool = False, download: bool = False) -> list[MediaActorResponse]: # type: ignore
@router.get("/actors", response_model=List[MediaActorResponse])
def get_all_actors(db: SessionDep) -> List[MediaActorResponse]:
results: list[MediaActorResponse] = []
actors = db.scalars(select(MediaActor)).all()
actors = db.query(MediaActor).all()
for mediaactor in actors:
response = MediaActorResponse(id=mediaactor.id, name=str(mediaactor.name), url=str(mediaactor.url))
response = actor_to_response(mediaactor)
results.append(response)
return results
@router.get("/actors/{actor_id}", response_model=MediaActorResponse)
def get_actor(actor_id: str, db: SessionDep) -> MediaActorResponse: # type: ignore
def get_actor(actor_id: str, db: SessionDep) -> MediaActorResponse:
media_actor = db.get(MediaActor, actor_id)
if not media_actor:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = get_actor_details(media_actor)
response = actor_to_response(media_actor)
return response
@router.delete("/actors/{actor_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_actor(actor_id: str, db: SessionDep): # type: ignore
def delete_actor(actor_id: str, db: SessionDep):
media_actor = db.get(MediaActor, actor_id)
if not media_actor:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
@@ -35,11 +35,11 @@ def delete_actor(actor_id: str, db: SessionDep): # type: ignore
delete_mediaactor(db, media_actor.id)
@router.post("/actors", status_code=status.HTTP_201_CREATED)
def add_actor(new_actor: MediaActorModel, db: SessionDep) -> MediaActorResponse: # type: ignore
def add_actor(new_actor: MediaActorModel, db: SessionDep) -> MediaActorResponse:
logger.info(f"add actor {new_actor.url}")
try:
mediaActor: MediaActor = create_new_mediaactor(new_actor, db)
except:
raise HTTPException(status_code=409, detail="Link duplicate")
response = get_actor_details(mediaActor)
response = actor_to_response(mediaActor)
return response
@@ -1,32 +1,31 @@
from typing import List
from fastapi import APIRouter, status, HTTPException
from sqlalchemy import select
from src.db.models.media import MediaActorFile
from src.db.repository.media import delete_mediaactorfile, get_actorfile_details
from src.db.repository.media import delete_mediaactorfile
from src.db.session import SessionDep
from src.schema.media.actorfile import MediaActorFileResponse
from src.schema.media.actorfile import MediaActorFileResponse, actorfile_to_response
router = APIRouter()
@router.get("/actorfiles", response_model=List[MediaActorFileResponse])
def get_all_actorfiles(db: SessionDep) -> List[MediaActorFileResponse]: # type: ignore
def get_all_actorfiles(db: SessionDep) -> List[MediaActorFileResponse]:
results: List[MediaActorFileResponse] = []
actorfiles = db.scalars(select(MediaActorFile)).all()
actorfiles = db.query(MediaActorFile).all()
for media_actorfile in actorfiles:
response = get_actorfile_details(media_actorfile)
response = actorfile_to_response(media_actorfile)
results.append(response)
return results
@router.get("/actorfiles/{actorfile_id}", response_model=MediaActorFileResponse)
def get_actorfile(actorfile_id: str, db: SessionDep) -> MediaActorFileResponse: # type: ignore
def get_actorfile(actorfile_id: str, db: SessionDep) -> MediaActorFileResponse:
media_actorfile = db.get(MediaActorFile, actorfile_id)
if not media_actorfile:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = get_actorfile_details(media_actorfile)
response = actorfile_to_response(media_actorfile)
return response
@router.delete("/actorfiles/{actorfile_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_actorfile(actorfile_id: str, db: SessionDep): # type: ignore
def delete_actorfile(actorfile_id: str, db: SessionDep):
media_actorfile = db.get(MediaActorFile, actorfile_id)
if not media_actorfile:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
+18 -27
View File
@@ -1,7 +1,6 @@
from typing import List
from fastapi import APIRouter, status, HTTPException
from sqlalchemy import select
from src.core.log_conf import logger
from src.db.repository.media import (
create_new_mediaactorfile,
@@ -9,16 +8,16 @@ from src.db.repository.media import (
delete_mediafile,
)
from src.db.session import SessionDep
from src.schema.media.actor import MediaActorResponse
from src.schema.media.actorfile import MediaActorFileResponse
from src.schema.media.file import MediaFileResponse, Link, get_file_details, set_file
from src.schema.media.actor import MediaActorResponse, actor_to_response
from src.schema.media.actorfile import MediaActorFileResponse, actorfile_to_response
from src.schema.media.file import MediaFileResponse, Link, file_to_response, set_file
from src.db.models.media import MediaFile
router = APIRouter()
@router.get("/update-titles")
def update_titles(db: SessionDep) -> list[MediaFileResponse]:
def update_titles(db: SessionDep) -> List[MediaFileResponse]:
"""
Update title for given MediaFile.
"""
@@ -27,13 +26,13 @@ def update_titles(db: SessionDep) -> list[MediaFileResponse]:
for mediafile in files:
mediafile.update_title()
db.add(mediafile)
response = get_file_details(mediafile)
response = file_to_response(mediafile)
results.append(response)
db.commit()
return results
@router.get("/files", response_model=list[MediaFileResponse])
@router.get("/files", response_model=List[MediaFileResponse])
def get_all_files(
db: SessionDep, review: bool = False, download: bool = False
) -> List[MediaFileResponse]:
@@ -49,7 +48,7 @@ def get_all_files(
else:
files = db.query(MediaFile).all()
for mediafile in files:
response = get_file_details(mediafile)
response = file_to_response(mediafile)
results.append(response)
return results
@@ -62,7 +61,7 @@ def get_file(file_id: str, db: SessionDep) -> MediaFileResponse:
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
response = get_file_details(mediafile)
response = file_to_response(mediafile)
return response
@@ -84,7 +83,7 @@ def delete_file(file_id: str, db: SessionDep):
@router.get("/files/{file_id}/actors", response_model=list[MediaActorResponse])
def get_file_actors(file_id: str, db: SessionDep) -> list[MediaActorResponse]:
"""
Get list of ACtors for given MediaFile.
Get list of Actors for given MediaFile.
"""
mediafile = db.get(MediaFile, file_id)
if not mediafile:
@@ -93,19 +92,15 @@ def get_file_actors(file_id: str, db: SessionDep) -> list[MediaActorResponse]:
logger.info("already known actors: %s", actor_files)
results: list[MediaActorResponse] = []
for actor_file in actor_files:
response = MediaActorResponse(
id=actor_file.media_actor.id,
name=actor_file.media_actor.name,
url=str(actor_file.media_actor.url),
)
response = actor_to_response(actor_file.media_actor)
results.append(response)
return results
@router.put("/files/{file_id}/actors", response_model=list[MediaActorFileResponse])
@router.put("/files/{file_id}/actors", response_model=List[MediaActorFileResponse])
def update_file_actors(
file_id: str, db: SessionDep, actors: list[MediaActorResponse]
) -> list[MediaActorFileResponse]: # type: ignore
file_id: str, db: SessionDep, actors: List[MediaActorResponse]
) -> List[MediaActorFileResponse]:
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
@@ -122,13 +117,9 @@ def update_file_actors(
create_new_mediaactorfile(db, actor.id, mediafile.id)
db.refresh(mediafile)
actor_files = mediafile.media_actor_files
results: list[MediaActorFileResponse] = []
results: List[MediaActorFileResponse] = []
for actor_file in actor_files:
response = MediaActorFileResponse(
id=actor_file.id,
actor_id=actor_file.media_actor_id,
file_id=actor_file.media_file_id,
)
response = actorfile_to_response(actor_file)
results.append(response)
return results
@@ -136,7 +127,7 @@ def update_file_actors(
@router.put("/files/{file_id}", response_model=MediaFileResponse)
def update_file(
file_id: str, db: SessionDep, info: MediaFileResponse
) -> MediaFileResponse: # type: ignore
) -> MediaFileResponse:
mediaFile = db.get(MediaFile, file_id)
if not mediaFile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
@@ -146,7 +137,7 @@ def update_file(
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be updated")
response = get_file_details(mediafile)
response = file_to_response(mediafile)
return response
@@ -157,5 +148,5 @@ def add_file(new_link: Link, db: SessionDep) -> MediaFileResponse: # type: igno
mediaFile: MediaFile = create_new_mediafile(new_link.url, db)
except:
raise HTTPException(status_code=409, detail="Link duplicate")
response = get_file_details(mediaFile)
response = file_to_response(mediaFile)
return response
@@ -1,17 +1,17 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.media import MediaVideo
from src.db.session import SessionDep
from src.schema.media.video import MediaVideoResponse, to_response
from src.schema.media.video import MediaVideoResponse, video_to_response
router = APIRouter()
@router.get("/videos", response_model=List[MediaVideoResponse])
def get_all_files(
def get_all_videos(
db: SessionDep, review: bool = False, download: bool = False
) -> List[MediaVideoResponse]:
"""
@@ -26,6 +26,17 @@ def get_all_files(
else:
files = db.query(MediaVideo).all()
for mediafile in files:
response = to_response(mediafile)
response = video_to_response(mediafile)
results.append(response)
return results
@router.get("/videos/{video_id}", response_model=MediaVideoResponse)
def get_video(video_id: str, db: SessionDep) -> MediaVideoResponse:
"""
Get MediaVideo by id.
"""
video = db.get(MediaVideo, video_id)
if video is None:
raise HTTPException(status_code=404, detail="MediaVideo could not be found")
response = video_to_response(video)
return response
+10 -2
View File
@@ -1,6 +1,6 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Card
from src.db.session import SessionDep
@@ -10,10 +10,18 @@ from src.schema.tysc.card import CardResponse, to_response
router = APIRouter()
@router.get("/cards")
def get_all_players(db: SessionDep) -> List[CardResponse]:
def get_all_cards(db: SessionDep) -> List[CardResponse]:
results: List[CardResponse] = []
cards = db.query(Card).all()
for card in cards:
response = to_response(card)
results.append(response)
return results
@router.get("/cards/{card_id}", response_model=CardResponse)
def get_card(card_id: str, db: SessionDep) -> CardResponse:
card = db.get(Card, card_id)
if card is None:
raise HTTPException(status_code=404, detail="Card could not be found")
response = to_response(card)
return response
+10 -2
View File
@@ -1,6 +1,6 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import CardSet
from src.db.session import SessionDep
@@ -10,10 +10,18 @@ from src.schema.tysc.cardset import CardSetResponse, to_response
router = APIRouter()
@router.get("/cardsets")
def get_all_players(db: SessionDep) -> List[CardSetResponse]:
def get_all_cardsets(db: SessionDep) -> List[CardSetResponse]:
results: List[CardSetResponse] = []
cardsets = db.query(CardSet).all()
for cardset in cardsets:
response = to_response(cardset)
results.append(response)
return results
@router.get("/cardsets/{cardset_id}", response_model=CardSetResponse)
def get_cardset(cardset_id: str, db: SessionDep) -> CardSetResponse:
cardset = db.get(CardSet, cardset_id)
if cardset is None:
raise HTTPException(status_code=404, detail="Cardset could not be found")
response = to_response(cardset)
return response
@@ -1,20 +1,26 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.admin import Assignment
from src.db.session import SessionDep
from src.schema.user.assignment import AssignmentResponse, to_response
router = APIRouter()
@router.get("/assignments", response_model=List[AssignmentResponse])
def get_all_profiles(db: SessionDep) -> List[AssignmentResponse]:
def get_all_assignments(db: SessionDep) -> List[AssignmentResponse]:
results: List[AssignmentResponse] = []
profiles = db.query(Assignment).all()
for profile in profiles:
response = to_response(profile)
assignments = db.query(Assignment).all()
for assignment in assignments:
response = to_response(assignment)
results.append(response)
return results
@router.get("/assignments/{assignment_id}", response_model=AssignmentResponse)
def get_assignment(assignment_id: str, db: SessionDep) -> AssignmentResponse:
assignment = db.get(Assignment, assignment_id)
if assignment is None:
raise HTTPException(status_code=404, detail="Assignment could not be found")
response = to_response(assignment)
return response
@@ -1,6 +1,6 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.admin import Permission
from src.db.session import SessionDep
@@ -8,12 +8,19 @@ from src.schema.user.permission import PermissionResponse, to_response
router = APIRouter()
@router.get("/permissions", response_model=List[PermissionResponse])
def get_all_profiles(db: SessionDep) -> List[PermissionResponse]:
def get_all_permissions(db: SessionDep) -> List[PermissionResponse]:
results: List[PermissionResponse] = []
permissions = db.query(Permission).all()
for permission in permissions:
response = to_response(permission)
results.append(response)
return results
@router.get("/permissions/{permission_id}", response_model=PermissionResponse)
def get_permission(permission_id: str, db: SessionDep) -> PermissionResponse:
permission = db.get(Permission, permission_id)
if permission is None:
raise HTTPException(status_code=404, detail="Permission could not be found")
response = to_response(permission)
return response
+6 -6
View File
@@ -5,9 +5,9 @@ from src.core.log_conf import logger
from src.core.security import CurrentUser
from src.db.models.admin import Profile
from src.db.repository.user import create_new_profile, get_profile_details
from src.db.repository.user import create_new_profile
from src.db.session import SessionDep
from src.schema.user.profile import ProfileResponse, ProfileModel
from src.schema.user.profile import ProfileResponse, ProfileModel, to_response
router = APIRouter()
@@ -22,7 +22,7 @@ def get_all_profiles(db: SessionDep) -> List[ProfileResponse]:
results: List[ProfileResponse] = []
profiles = db.scalars(select(Profile)).all()
for profile in profiles:
response = get_profile_details(profile)
response = to_response(profile)
results.append(response)
return results
@@ -30,8 +30,8 @@ def get_all_profiles(db: SessionDep) -> List[ProfileResponse]:
def get_profile(profile_id: str, db: SessionDep) -> ProfileResponse:
profile = db.get(Profile, profile_id)
if not profile:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = get_profile_details(profile)
raise HTTPException(status_code=404, detail="Profile could not be found")
response = to_response(profile)
return response
@router.delete("/profiles/{profile_id}", status_code=status.HTTP_204_NO_CONTENT)
@@ -49,5 +49,5 @@ def add_profile(new_profile: ProfileModel, db: SessionDep) -> ProfileResponse:
profile: Profile = create_new_profile(new_profile, db)
except:
raise HTTPException(status_code=409, detail="Profile duplicate")
response = get_profile_details(profile)
response = to_response(profile)
return response