1 Commits

Author SHA1 Message Date
tpeetz 4c918f01db kontor-vue with tutorial implementation 2026-04-12 16:23:10 +02:00
388 changed files with 6102 additions and 12561 deletions
-19
View File
@@ -1,19 +0,0 @@
name: Gitea Actions Demo
run-name: ${{ gitea.actor }} ist testing out Gitea Actions
on: [push]
jobs:
Explore-Gitea-Actions:
runs-on: inky
steps:
- run: echo "The job was automatically triggered by a ${{ gitea.event_name }} event."
- run: echo "This job is now running on a ${{ runner.os }} server hosted by Gitea!"
- run: echo "The name of your branch is ${{ gitea.ref }} and your repository is ${{ gitea.repository }}."
- name: Check out repository code
uses: actions/checkout@v4
- run: echo "The ${{ gitea.repository }} repository has been cloned to the runner."
- run: echo "The workflow is now ready to test your code on the runner."
- name: List files in the directory
run: |
ls ${{ gitea.workspace }}
- run: echo "The job's status is ${{ job.status }}."
@@ -1,5 +1,5 @@
### STAGE 1: Build ###
FROM docker.io/node:22-alpine AS build
FROM node:22.15-alpine AS build
WORKDIR /app
COPY package.json package-lock.json ./
RUN npm install
@@ -7,9 +7,8 @@ COPY . .
RUN npm run build
### STAGE 2: Run ###
FROM docker.io/library/nginx:stable-alpine
FROM nginx:1.17.1-alpine
COPY nginx.conf /etc/nginx/conf.d/default.conf
COPY --from=build /app/dist/kontor-angular/browser /usr/share/nginx/html
EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]
-1
View File
@@ -1,5 +1,4 @@
server {
listen 8800;
# Root-Verzeichnis für den Server setzen (wir kopieren unsere Anwendung hierher)
root /usr/share/nginx/html;
+10 -236
View File
@@ -1,238 +1,12 @@
"""
add router for different parts (like comics, tysc, media)
"""
from fastapi import APIRouter
from fastapi import APIRouter, Depends
from src.apis.version1.admin import mailaccount
from src.apis.version1.comics import (
artist,
publisher,
comic,
issue,
worktype,
volume,
storyarc,
comicwork,
issuework,
)
from src.apis.version1.media import (
actor,
file,
mediaactorfile,
mediavideo,
mediaarticle,
)
from src.apis.version1.tysc import (
card,
cardset,
fieldposition,
player,
rooster,
sport,
team,
vendor,
)
from src.core.security import get_current_user_from_token
from src.apis.version1 import comic, mediaactor, mediafile, mediaactorfile, tysc, admin, user
from src.apis.version1.user import assignment, permission, profile, token
from src.apis.version1.bookshelf import article, bookshelf_publisher, book, author, articleauthor, bookauthor
api_router = APIRouter(prefix="/api")
api_router.include_router(
comic.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
publisher.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
artist.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
issue.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
worktype.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
volume.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
storyarc.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
comicwork.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
issuework.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
file.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mediavideo.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mediaarticle.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
actor.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mediaactorfile.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
sport.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
player.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
team.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
fieldposition.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
rooster.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
vendor.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
cardset.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
card.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
article.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
bookshelf_publisher.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
book.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
author.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
articleauthor.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
bookauthor.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
profile.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
token.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
permission.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
assignment.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mailaccount.router,
prefix="/admin",
tags=["admin"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router = APIRouter(prefix="/api")
api_router.include_router(comic.router, prefix="/comics", tags=["comics"])
api_router.include_router(mediafile.router, prefix="/media", tags=["media"])
api_router.include_router(mediaactor.router, prefix="/media", tags=["media"])
api_router.include_router(mediaactorfile.router, prefix="/media", tags=["media"])
api_router.include_router(tysc.router, prefix="/tysc", tags=["tysc"])
api_router.include_router(admin.router, prefix="/login", tags=["login"])
api_router.include_router(user.router, prefix="/user", tags=["user"])
+52
View File
@@ -0,0 +1,52 @@
from datetime import timedelta
from typing import Annotated
from fastapi import APIRouter, Body, HTTPException, status, Depends, Response
from fastapi.security import OAuth2PasswordRequestForm
from src.core.config import settings
from src.core.security import create_access_token, authenticate_user, get_current_active_user
from src.db.models.admin import Profile
from src.schema.admin import Token, ProfileModel
from src.webapps.auth.forms import LoginForm
router = APIRouter()
@router.post("/token")
def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email, "scope": " ".join(form_data.scopes)}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
# @router.post("/token-cookie", response_model=Token)
def login_for_token_cookie(response: Response, form_data: LoginForm = Depends()):
user = authenticate_user(form_data.username, form_data.password) # type: ignore
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
response.set_cookie(
key="access_token", value=f"Bearer {access_token}", httponly=True
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/profile", response_model=ProfileModel)
async def read_profile(current_user: Annotated[Profile, Depends(get_current_active_user)]):
return current_user
@@ -1,74 +0,0 @@
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, Response, status
from fastapi.security import OAuth2PasswordRequestForm
from src.core.config import settings
from src.core.log_conf import logger
from src.core.security import (
authenticate_user_by_email,
authenticate_user_by_username,
create_access_token,
)
from src.schema.admin.login import LoginRequest
from src.schema.admin.token import Token
from src.webapps.auth.forms import LoginForm
login_router = APIRouter()
@login_router.post(
"/login",
tags=["login"],
summary="Login and get token",
response_description="Return HTTP status code 200 (OK)",
status_code=status.HTTP_200_OK,
)
def login(request: LoginRequest) -> Token:
logger.info("login with %s", request.email)
user = authenticate_user_by_email(str(request.email), str(request.password))
scopes = ["admin", "read"]
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email, "scope": " ".join(scopes)},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@login_router.post("/token", tags=["login"], summary="Login for access token")
# async def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user_by_username(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.user_name, "scope": " ".join(form_data.scopes)},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
def login_for_token_cookie(response: Response, form_data: LoginForm = Depends()):
user = authenticate_user_by_email(str(form_data.username), str(form_data.password))
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
response.set_cookie(
key="access_token", value=f"Bearer {access_token}", httponly=True
)
return {"access_token": access_token, "token_type": "bearer"}
@@ -1,32 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.admin import MailAccount
from src.db.session import SessionDep
from src.schema.admin.mailaccount import MailAccountResponse, to_response
router = APIRouter()
@router.get("/mailaccounts", response_model=List[MailAccountResponse])
def get_all_mailaccounts(db: SessionDep) -> List[MailAccountResponse]:
"""
return all MailAccounts as JSON.
"""
results: List[MailAccountResponse] = []
mailaccounts = db.query(MailAccount).all()
for mailaccount in mailaccounts:
response = to_response(mailaccount)
results.append(response)
return results
@router.get("/mailaccounts/{mailaccount_id}", response_model=MailAccountResponse)
def get_mailaccount(mailaccount_id: str, db: SessionDep) -> MailAccountResponse:
"""
return MailAccounts by id.
"""
mailaccount = db.get(MailAccount, mailaccount_id)
if mailaccount is None:
raise HTTPException(status_code=409, detail="Mailaccount could not be found")
response = to_response(mailaccount)
return response
@@ -1,20 +0,0 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import Article
from src.db.session import SessionDep
from src.schema.bookshelf.article import ArticleResponse, to_response
router = APIRouter()
@router.get("/articles", response_model=List[ArticleResponse])
def get_all_artists(db: SessionDep) -> List[ArticleResponse]:
results: List[ArticleResponse] = []
articles = db.query(Article).all()
for article in articles:
response = to_response(article)
results.append(response)
return results
@@ -1,19 +0,0 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import ArticleAuthor
from src.db.session import SessionDep
from src.schema.bookshelf.articleauthor import ArticleAuthorResponse, to_response
router = APIRouter()
@router.get("/articleauthors", response_model=List[ArticleAuthorResponse])
def get_all_artists(db: SessionDep) -> List[ArticleAuthorResponse]:
results: List[ArticleAuthorResponse] = []
articleauthors = db.query(ArticleAuthor).all()
for articleauthor in articleauthors:
response = to_response(articleauthor)
results.append(response)
return results
@@ -1,26 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.bookshelf import Author
from src.db.session import SessionDep
from src.schema.bookshelf.author import AuthorResponse, to_response
router = APIRouter()
@router.get("/authors", response_model=List[AuthorResponse])
def get_all_authors(db: SessionDep) -> List[AuthorResponse]:
results: List[AuthorResponse] = []
authors = db.query(Author).all()
for author in authors:
response = to_response(author)
results.append(response)
return results
@router.get("/authors/{author_id}", response_model=AuthorResponse)
def get_author(author_id: str, db: SessionDep) -> AuthorResponse:
author = db.get(Author, author_id)
if author is None:
raise HTTPException(status_code=404, detail="Author could not be found")
response = to_response(author)
return response
@@ -1,19 +0,0 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import Book
from src.db.session import SessionDep
from src.schema.bookshelf.book import BookResponse, to_response
router = APIRouter()
@router.get("/books", response_model=List[BookResponse])
def get_all_artists(db: SessionDep) -> List[BookResponse]:
results: List[BookResponse] = []
books = db.query(Book).all()
for book in books:
response = to_response(book)
results.append(response)
return results
@@ -1,19 +0,0 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import BookAuthor
from src.db.session import SessionDep
from src.schema.bookshelf.bookauthor import BookAuthorResponse, to_response
router = APIRouter()
@router.get("/bookauthors", response_model=List[BookAuthorResponse])
def get_all_artists(db: SessionDep) -> List[BookAuthorResponse]:
results: List[BookAuthorResponse] = []
bookauthors = db.query(BookAuthor).all()
for bookauthor in bookauthors:
response = to_response(bookauthor)
results.append(response)
return results
@@ -1,19 +0,0 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import BookshelfPublisher
from src.db.session import SessionDep
from src.schema.bookshelf.publisher import PublisherResponse, to_response
router = APIRouter()
@router.get("/publishers", response_model=List[PublisherResponse])
def get_all_artists(db: SessionDep) -> List[PublisherResponse]:
results: List[PublisherResponse] = []
publishers = db.query(BookshelfPublisher).all()
for publisher in publishers:
response = to_response(publisher)
results.append(response)
return results
+103
View File
@@ -0,0 +1,103 @@
from typing import List
from fastapi import APIRouter, HTTPException, status
from src.core.log_conf import logger
from src.db.models.comic import Artist, Comic, Issue, Publisher
from src.db.repository.comics.artist import get_artist_details
from src.db.repository.comics.comic import (
get_comic_details,
get_issue_details,
get_short_info,
list_comics,
)
from src.db.repository.comics.publisher import get_publisher_details
from src.db.session import SessionDep
from src.schema.comics.artist import ArtistCreation, ArtistResponse
from src.schema.comics.artist_details import ArtistDetailResponse
from src.schema.comics.comic import ComicResponse
from src.schema.comics.comic_details import ComicDetailsResponse
from src.schema.comics.issue_details import IssueDetailsResponse
from src.schema.comics.publisher import PublisherResponse
from src.schema.comics.publisher_details import PublisherDetailsResponse
router = APIRouter()
@router.get("/comics")
def get_all_comics(db: SessionDep) -> List[ComicResponse]: # type: ignore
results: List[ComicResponse] = []
comics = list_comics(db)
for comic in comics:
response = get_short_info(comic)
results.append(response)
return results
@router.get("/comics/{comic_id}", response_model=ComicDetailsResponse)
def get_comic(comic_id: str, db: SessionDep) -> ComicDetailsResponse: # type: ignore
comic = db.get(Comic, comic_id)
if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found")
logger.info(f"create ComicDetailsResponse for {comic}")
response: ComicDetailsResponse = get_comic_details(comic)
logger.info(f"ComicDetailsResponse: {response}")
return response
@router.get("/artists", response_model=List[ArtistResponse])
def get_all_artists(db: SessionDep) -> List[ArtistResponse]: # type: ignore
results: List[ArtistResponse] = []
artists = db.query(Artist).all()
for artist in artists:
results.append(ArtistResponse(id=artist.id, name=str(artist.name))) # type: ignore
return results
@router.get("/artists/{artist_id}", response_model=ArtistDetailResponse)
def get_artist(artist_id: str, db: SessionDep) -> ArtistDetailResponse: # type: ignore
artist = db.get(Artist, artist_id)
if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found")
response: ArtistDetailResponse = get_artist_details(artist)
return response
@router.post("/artists", status_code=status.HTTP_201_CREATED)
def add_artist(db: SessionDep, artist_creation: ArtistCreation) -> ArtistResponse: # type: ignore
artist: Artist = Artist()
setattr(artist, "name", artist_creation.name)
try:
db.add(artist)
db.commit()
except:
raise HTTPException(status_code=409, detail="Artist already added")
response = ArtistResponse(id=artist.id, name=str(artist.name))
return response
@router.get("/publishers", response_model=List[PublisherResponse])
def get_all_publishers(db: SessionDep) -> List[PublisherResponse]: # type: ignore
results: List[PublisherResponse] = []
publishers = db.query(Publisher).all()
for publisher in publishers:
results.append(PublisherResponse(id=publisher.id, name=str(publisher.name)))
return results
@router.get("/publishers/{publisher_id}", response_model=PublisherDetailsResponse)
def get_publisher(publisher_id: str, db: SessionDep) -> PublisherDetailsResponse: # type: ignore
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherDetailsResponse = get_publisher_details(publisher)
return response
@router.get("/issues", response_model=List[IssueDetailsResponse])
def get_issues(db: SessionDep) -> List[IssueDetailsResponse]: # type: ignore
results: List[IssueDetailsResponse] = []
issues = db.query(Issue).all()
for issue in issues:
results.append(get_issue_details(issue))
return results
@@ -1,53 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException, status
from src.db.models.comic import Artist
from src.db.repository.comics.artist import get_artist_details
from src.db.session import SessionDep
from src.schema.comics.artist import ArtistCreation, ArtistResponse, artist_to_response
from src.schema.comics.artist_details import ArtistDetailResponse
router = APIRouter()
@router.get("/artists", response_model=List[ArtistResponse])
def get_all_artists(db: SessionDep) -> List[ArtistResponse]:
results: List[ArtistResponse] = []
artists = db.query(Artist).all()
for artist in artists:
response = artist_to_response(artist)
results.append(response)
return results
@router.get("/artists/{artist_id}", response_model=ArtistResponse)
def get_artist(artist_id: str, db: SessionDep) -> ArtistResponse:
artist = db.get(Artist, artist_id)
if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found")
response: ArtistResponse = artist_to_response(artist)
return response
@router.get("/artists/details/{artist_id}", response_model=ArtistDetailResponse)
def get_artist_details_by_id(artist_id: str, db: SessionDep) -> ArtistDetailResponse:
artist = db.get(Artist, artist_id)
if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found")
response: ArtistDetailResponse = get_artist_details(artist)
return response
@router.post("/artists", status_code=status.HTTP_201_CREATED)
def add_artist(db: SessionDep, artist_creation: ArtistCreation) -> ArtistResponse:
artist: Artist = Artist()
setattr(artist, "name", artist_creation.name)
try:
db.add(artist)
db.commit()
except:
raise HTTPException(status_code=409, detail="Artist already added")
response = artist_to_response(artist)
return response
@@ -1,44 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.core.log_conf import logger
from src.db.models.comic import Comic
from src.db.repository.comics.comic import (
get_comic_details,
)
from src.db.session import SessionDep
from src.schema.comics.comic import ComicResponse, comic_to_response
from src.schema.comics.comic_details import ComicDetailsResponse
router = APIRouter()
@router.get("/comics")
def get_all_comics(db: SessionDep) -> List[ComicResponse]:
results: List[ComicResponse] = []
comics = db.query(Comic).all()
for comic in comics:
response = comic_to_response(comic)
results.append(response)
return results
@router.get("/comics/{comic_id}", response_model=ComicResponse)
def get_comic(comic_id: str, db: SessionDep) -> ComicResponse:
comic = db.get(Comic, comic_id)
if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found")
response: ComicResponse = comic_to_response(comic)
return response
@router.get("/comics/details/{comic_id}", response_model=ComicDetailsResponse)
def get_comic_details_by_id(comic_id: str, db: SessionDep) -> ComicDetailsResponse:
comic = db.get(Comic, comic_id)
if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found")
logger.info(f"create ComicDetailsResponse for {comic}")
response: ComicDetailsResponse = get_comic_details(comic)
logger.info(f"ComicDetailsResponse: {response}")
return response
@@ -1,27 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import ComicWork
from src.db.session import SessionDep
from src.schema.comics.comicwork import ComicWorkResponse, comicwork_to_response
router = APIRouter()
@router.get("/comicworks", response_model=List[ComicWorkResponse])
def get_comicworks(db: SessionDep) -> List[ComicWorkResponse]:
results: List[ComicWorkResponse] = []
worktypes = db.query(ComicWork).all()
for worktype in worktypes:
response = comicwork_to_response(worktype)
results.append(response)
return results
@router.get("/comicworks/{comicwork_id}", response_model=ComicWorkResponse)
def get_comicwork(comicwork_id: str, db: SessionDep) -> ComicWorkResponse:
worktype = db.get(ComicWork, comicwork_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Comicwork could not be found")
response = comicwork_to_response(worktype)
return response
@@ -1,27 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Issue
from src.db.session import SessionDep
from src.schema.comics.issue import IssueResponse, issue_to_response
router = APIRouter()
@router.get("/issues", response_model=List[IssueResponse])
def get_issues(db: SessionDep) -> List[IssueResponse]:
results: List[IssueResponse] = []
issues = db.query(Issue).all()
for issue in issues:
response = issue_to_response(issue)
results.append(response)
return results
@router.get("/issues/{issue_id}", response_model=IssueResponse)
def get_issue(issue_id: str, db: SessionDep) -> IssueResponse:
issue = db.get(Issue, issue_id)
if issue is None:
raise HTTPException(status_code=404, detail="Issue could not be found")
response = issue_to_response(issue)
return response
@@ -1,27 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import IssueWork
from src.db.session import SessionDep
from src.schema.comics.issuework import IssueWorkResponse, issuework_to_response
router = APIRouter()
@router.get("/issueworks", response_model=List[IssueWorkResponse])
def get_issueworks(db: SessionDep) -> List[IssueWorkResponse]:
results: List[IssueWorkResponse] = []
worktypes = db.query(IssueWork).all()
for worktype in worktypes:
response = issuework_to_response(worktype)
results.append(response)
return results
@router.get("/issueworks/{issuework_id}", response_model=IssueWorkResponse)
def get_issuework(issuework_id: str, db: SessionDep) -> IssueWorkResponse:
worktype = db.get(IssueWork, issuework_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Issuework could not be found")
response = issuework_to_response(worktype)
return response
@@ -1,37 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Publisher
from src.db.repository.comics.publisher import get_publisher_details
from src.db.session import SessionDep
from src.schema.comics.publisher import PublisherResponse, publisher_to_response
from src.schema.comics.publisher_details import PublisherDetailsResponse
router = APIRouter()
@router.get("/publishers", response_model=List[PublisherResponse])
def get_all_publishers(db: SessionDep) -> List[PublisherResponse]:
results: List[PublisherResponse] = []
publishers = db.query(Publisher).all()
for publisher in publishers:
response: PublisherResponse = publisher_to_response(publisher)
results.append(response)
return results
@router.get("/publishers/{publisher_id}", response_model=PublisherResponse)
def get_publisher(publisher_id: str, db: SessionDep) -> PublisherResponse:
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherResponse = publisher_to_response(publisher)
return response
@router.get("/publishers/details/{publisher_id}", response_model=PublisherDetailsResponse)
def get_publisher_details_by_id(publisher_id: str, db: SessionDep) -> PublisherDetailsResponse:
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherDetailsResponse = get_publisher_details(publisher)
return response
@@ -1,26 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import StoryArc
from src.db.session import SessionDep
from src.schema.comics.storyarc import StoryArcResponse, storyarc_to_response
router = APIRouter()
@router.get("/storyarcs", response_model=List[StoryArcResponse])
def get_storyarcs(db: SessionDep) -> List[StoryArcResponse]:
results: List[StoryArcResponse] = []
storyarcs = db.query(StoryArc).all()
for storyarc in storyarcs:
response = storyarc_to_response(storyarc)
results.append(response)
return results
@router.get("/storyarcs/{storyarc_id}", response_model=StoryArcResponse)
def get_storyarc(story_arc_id: str, db: SessionDep) -> StoryArcResponse:
storyarc = db.get(StoryArc, story_arc_id)
if storyarc is None:
raise HTTPException(status_code=404, detail="Storyarc could not be found")
response = storyarc_to_response(storyarc)
return response
@@ -1,26 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Volume
from src.db.session import SessionDep
from src.schema.comics.volume import VolumeResponse, volume_to_response
router = APIRouter()
@router.get("/volumes", response_model=List[VolumeResponse])
def volumes(db: SessionDep) -> List[VolumeResponse]:
results: List[VolumeResponse] = []
worktypes = db.query(Volume).all()
for worktype in worktypes:
response = volume_to_response(worktype)
results.append(response)
return results
@router.get("/volumes/{volume_id}", response_model=VolumeResponse)
def get_volume(volume_id: str, db: SessionDep) -> VolumeResponse:
worktype = db.get(Volume, volume_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Volume could not be found")
response = volume_to_response(worktype)
return response
@@ -1,27 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import WorkType
from src.db.session import SessionDep
from src.schema.comics.worktype import WorktypeResponse, worktype_to_response
router = APIRouter()
@router.get("/worktypes", response_model=List[WorktypeResponse])
def get_issues(db: SessionDep) -> List[WorktypeResponse]:
results: List[WorktypeResponse] = []
worktypes = db.query(WorkType).all()
for worktype in worktypes:
response = worktype_to_response(worktype)
results.append(response)
return results
@router.get("/worktypes/{worktype_id}", response_model=WorktypeResponse)
def get_issue(worktype_id: str, db: SessionDep) -> WorktypeResponse:
worktype = db.get(WorkType, worktype_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Worktype could not be found")
response = worktype_to_response(worktype)
return response
+1 -1
View File
@@ -1,6 +1,6 @@
from fastapi import APIRouter, status
from src.schema.admin.healthcheck import HealthCheck
from src.schema.admin import HealthCheck
health_router = APIRouter()
+41
View File
@@ -0,0 +1,41 @@
from datetime import timedelta
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from src.core.config import settings
from src.core.log_conf import logger
from src.core.security import authenticate_user, create_access_token
from src.schema.admin import Token
login_router = APIRouter()
class LoginRequest(BaseModel):
email: str | None = None
password: str | None = None
@login_router.post(
"/login",
tags=["login"],
summary="Login and get token",
response_description="Return HTTP status code 200 (OK)",
status_code=status.HTTP_200_OK,
)
def login(request: LoginRequest) -> Token:
logger.info(f"login with {request.email}")
user = authenticate_user(request.email, request.password)
scopes = ["admin", "read"]
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email, "scope": " ".join(scopes)},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
-163
View File
@@ -1,163 +0,0 @@
from typing import List
from fastapi import APIRouter, status, HTTPException
from src.core.log_conf import logger
from src.db.repository.media.actorfile import (
create_new_mediaactorfile,
delete_mediaactorfile,
)
from src.db.repository.media.file import delete_mediafile, import_mediafile
from src.db.session import SessionDep
from src.schema.media.actor import MediaActorResponse, actor_to_response
from src.schema.media.actorfile import MediaActorFileResponse, actorfile_to_response
from src.schema.media.file import (
MediaFileModel,
MediaFileResponse,
file_to_response,
file_to_model,
)
from src.db.models.media import MediaFile
router = APIRouter()
@router.get("/update-titles")
def update_titles(db: SessionDep) -> List[MediaFileResponse]:
"""
Update title for given MediaFile.
"""
results: list[MediaFileResponse] = []
files = db.query(MediaFile).filter(MediaFile.review.is_(True)).all()
for mediafile in files:
mediafile.update_title()
db.add(mediafile)
response = file_to_response(mediafile)
results.append(response)
db.commit()
return results
@router.get("/files", response_model=List[MediaFileResponse])
def get_all_files(
db: SessionDep, review: bool = False, download: bool = False
) -> List[MediaFileResponse]:
"""
Get all MediaFiles.
"""
results: List[MediaFileResponse] = []
files: List[MediaFile]
if review:
files = db.query(MediaFile).filter(MediaFile.review.is_(True)).all()
elif download:
files = db.query(MediaFile).filter(MediaFile.should_download.is_(True)).all()
else:
files = db.query(MediaFile).all()
for mediafile in files:
response = file_to_response(mediafile)
results.append(response)
return results
@router.get("/files/{file_id}", response_model=MediaFileResponse)
def get_file(file_id: str, db: SessionDep) -> MediaFileResponse:
"""
Get MediaFile with given id or return HTTPException.
"""
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
response = file_to_response(mediafile)
return response
@router.delete("/files/{file_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_file(file_id: str, db: SessionDep):
"""
Delete MediaFile by given id.
"""
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
logger.info("delete MediaFile: %s", file_id)
actor_files = mediafile.media_actor_files
logger.info("MediaActorFiles links %s", len(actor_files))
if len(actor_files) > 0:
logger.info("delete MediaActor relations first")
for actor_file in actor_files:
delete_mediaactorfile(db, actor_file.id)
delete_mediafile(db, mediafile.id)
@router.get("/files/{file_id}/actors", response_model=list[MediaActorResponse])
def get_file_actors(file_id: str, db: SessionDep) -> list[MediaActorResponse]:
"""
Get list of Actors for given MediaFile.
"""
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info("already known actors: %s", actor_files)
results: list[MediaActorResponse] = []
for actor_file in actor_files:
response = actor_to_response(actor_file.media_actor)
results.append(response)
return results
@router.put("/files/{file_id}/actors", response_model=List[MediaActorFileResponse])
def update_file_actors(
file_id: str, db: SessionDep, actors: List[MediaActorResponse]
) -> List[MediaActorFileResponse]:
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info("already known actors: %s", actor_files)
for actor in actors:
already_associated = False
for actor_file in actor_files:
if actor.id == actor_file.media_actor_id:
logger.info("alreay associated - do nothing")
already_associated = True
break
if not already_associated:
create_new_mediaactorfile(db, actor.id, mediafile.id)
db.refresh(mediafile)
actor_files = mediafile.media_actor_files
results: List[MediaActorFileResponse] = []
for actor_file in actor_files:
response = actorfile_to_response(actor_file)
results.append(response)
return results
@router.put("/files/{file_id}", response_model=MediaFileResponse)
def update_file(
file_id: str, db: SessionDep, info: MediaFileResponse
) -> MediaFileResponse:
"""
Update MediaFile with given id and data.
"""
media_file = db.get(MediaFile, file_id)
if not media_file:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
file_to_model(info, media_file)
db.add(media_file)
db.commit()
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be updated")
response = file_to_response(mediafile)
return response
@router.post("/files", status_code=status.HTTP_201_CREATED)
def add_file(new_file: MediaFileModel, db: SessionDep) -> MediaFileResponse:
logger.info("add mediafile %s", new_file)
try:
mediaFile: MediaFile = import_mediafile(db, new_file)
except:
raise HTTPException(status_code=409, detail="MediaFile duplicate")
response = file_to_response(mediaFile)
return response
@@ -1,25 +0,0 @@
from typing import List
from fastapi import APIRouter
from src.db.models.media import MediaArticle
from src.db.session import SessionDep
from src.schema.media.article import MediaArticleResponse, to_response
router = APIRouter()
@router.get("/articles", response_model=List[MediaArticleResponse])
def get_all_files(
db: SessionDep, review: bool = False, download: bool = False
) -> List[MediaArticleResponse]:
"""
Get all MediaVideos.
"""
results: List[MediaArticleResponse] = []
articles: List[MediaArticle]
articles = db.query(MediaArticle).all()
for article in articles:
response = to_response(article)
results.append(response)
return results
@@ -1,42 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.media import MediaVideo
from src.db.session import SessionDep
from src.schema.media.video import MediaVideoResponse, video_to_response
router = APIRouter()
@router.get("/videos", response_model=List[MediaVideoResponse])
def get_all_videos(
db: SessionDep, review: bool = False, download: bool = False
) -> List[MediaVideoResponse]:
"""
Get all MediaVideos.
"""
results: List[MediaVideoResponse] = []
files: List[MediaVideo]
if review:
files = db.query(MediaVideo).filter(MediaVideo.review.is_(True)).all()
elif download:
files = db.query(MediaVideo).filter(MediaVideo.should_download.is_(True)).all()
else:
files = db.query(MediaVideo).all()
for mediafile in files:
response = video_to_response(mediafile)
results.append(response)
return results
@router.get("/videos/{video_id}", response_model=MediaVideoResponse)
def get_video(video_id: str, db: SessionDep) -> MediaVideoResponse:
"""
Get MediaVideo by id.
"""
video = db.get(MediaVideo, video_id)
if video is None:
raise HTTPException(status_code=404, detail="MediaVideo could not be found")
response = video_to_response(video)
return response
@@ -1,33 +1,33 @@
from typing import List
from fastapi import APIRouter, status, HTTPException
from sqlalchemy import select
from src.core.log_conf import logger
from src.db.repository.media.actor import delete_mediaactor, import_mediaactor
from src.db.repository.media import create_new_mediaactor, delete_mediaactor, get_actor_details
from src.db.session import SessionDep
from src.schema.media.actor import MediaActorModel, MediaActorResponse, actor_to_response
from src.schema.media.actor import MediaActorModel, MediaActorResponse
from src.db.models.media import MediaActor
router = APIRouter()
@router.get("/actors", response_model=List[MediaActorResponse])
def get_all_actors(db: SessionDep) -> List[MediaActorResponse]:
@router.get("/actors", response_model=list[MediaActorResponse])
# def get_all_files(db: SessionDep, review: bool = False, download: bool = False, current_user: Profile = Depends(get_current_user_from_token)) -> List[MediaFileResponse]:
def get_all_actors(db: SessionDep, review: bool = False, download: bool = False) -> list[MediaActorResponse]: # type: ignore
results: list[MediaActorResponse] = []
actors = db.query(MediaActor).all()
actors = db.scalars(select(MediaActor)).all()
for mediaactor in actors:
response = actor_to_response(mediaactor)
response = MediaActorResponse(id=mediaactor.id, name=str(mediaactor.name), url=str(mediaactor.url))
results.append(response)
return results
@router.get("/actors/{actor_id}", response_model=MediaActorResponse)
def get_actor(actor_id: str, db: SessionDep) -> MediaActorResponse:
def get_actor(actor_id: str, db: SessionDep) -> MediaActorResponse: # type: ignore
media_actor = db.get(MediaActor, actor_id)
if not media_actor:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = actor_to_response(media_actor)
response = get_actor_details(media_actor)
return response
@router.delete("/actors/{actor_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_actor(actor_id: str, db: SessionDep):
def delete_actor(actor_id: str, db: SessionDep): # type: ignore
media_actor = db.get(MediaActor, actor_id)
if not media_actor:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
@@ -35,11 +35,11 @@ def delete_actor(actor_id: str, db: SessionDep):
delete_mediaactor(db, media_actor.id)
@router.post("/actors", status_code=status.HTTP_201_CREATED)
def add_actor(new_actor: MediaActorModel, db: SessionDep) -> MediaActorResponse:
def add_actor(new_actor: MediaActorModel, db: SessionDep) -> MediaActorResponse: # type: ignore
logger.info(f"add actor {new_actor.url}")
try:
mediaActor: MediaActor = import_mediaactor(db, new_actor)
except Exception as exception:
raise HTTPException(status_code=409, detail=f"Link duplicate: {exception}")
response = actor_to_response(mediaActor)
mediaActor: MediaActor = create_new_mediaactor(new_actor, db)
except:
raise HTTPException(status_code=409, detail="Link duplicate")
response = get_actor_details(mediaActor)
return response
@@ -1,31 +1,32 @@
from typing import List
from fastapi import APIRouter, status, HTTPException
from sqlalchemy import select
from src.db.models.media import MediaActorFile
from src.db.repository.media.actorfile import delete_mediaactorfile
from src.db.repository.media import delete_mediaactorfile, get_actorfile_details
from src.db.session import SessionDep
from src.schema.media.actorfile import MediaActorFileResponse, actorfile_to_response
from src.schema.media.actorfile import MediaActorFileResponse
router = APIRouter()
@router.get("/actorfiles", response_model=List[MediaActorFileResponse])
def get_all_actorfiles(db: SessionDep) -> List[MediaActorFileResponse]:
def get_all_actorfiles(db: SessionDep) -> List[MediaActorFileResponse]: # type: ignore
results: List[MediaActorFileResponse] = []
actorfiles = db.query(MediaActorFile).all()
actorfiles = db.scalars(select(MediaActorFile)).all()
for media_actorfile in actorfiles:
response = actorfile_to_response(media_actorfile)
response = get_actorfile_details(media_actorfile)
results.append(response)
return results
@router.get("/actorfiles/{actorfile_id}", response_model=MediaActorFileResponse)
def get_actorfile(actorfile_id: str, db: SessionDep) -> MediaActorFileResponse:
def get_actorfile(actorfile_id: str, db: SessionDep) -> MediaActorFileResponse: # type: ignore
media_actorfile = db.get(MediaActorFile, actorfile_id)
if not media_actorfile:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = actorfile_to_response(media_actorfile)
response = get_actorfile_details(media_actorfile)
return response
@router.delete("/actorfiles/{actorfile_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_actorfile(actorfile_id: str, db: SessionDep):
def delete_actorfile(actorfile_id: str, db: SessionDep): # type: ignore
media_actorfile = db.get(MediaActorFile, actorfile_id)
if not media_actorfile:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
+120
View File
@@ -0,0 +1,120 @@
from fastapi import APIRouter, status, HTTPException, Depends
from sqlalchemy import select, Sequence
from src.core.log_conf import logger
from src.db.repository.media import create_new_mediaactorfile, create_new_mediafile, delete_mediafile
from src.db.session import SessionDep
from src.schema.media.actor import MediaActorResponse
from src.schema.media.actorfile import MediaActorFileResponse
from src.schema.media.file import MediaFileResponse, Link, get_file_details, set_file
from src.db.models.media import MediaFile
router = APIRouter()
@router.get("/update-titles")
def update_titles(db: SessionDep) -> list[MediaFileResponse]: # type: ignore
results: list[MediaFileResponse] = []
files = db.query(MediaFile).filter(MediaFile.review == True).all()
for mediafile in files:
mediafile.update_title()
db.add(mediafile)
response = get_file_details(mediafile)
results.append(response)
db.commit()
return results
@router.get("/files", response_model=list[MediaFileResponse])
# def get_all_files(db: SessionDep, review: bool = False, download: bool = False, current_user: Profile = Depends(get_current_user_from_token)) -> List[MediaFileResponse]:
def get_all_files(db: SessionDep, review: bool = False, download: bool = False) -> list[MediaFileResponse]: # type: ignore
results: list[MediaFileResponse] = []
files: Sequence[MediaFile]
if review:
files = db.query(MediaFile).filter(MediaFile.review == True).all() # type: ignore
elif download:
files = db.query(MediaFile).filter(MediaFile.should_download == True).all() # type: ignore
else:
files = db.scalars(select(MediaFile)).all() # type: ignore
for mediafile in files: # type: ignore
response = get_file_details(mediafile)
results.append(response)
return results
@router.get("/files/{file_id}", response_model=MediaFileResponse)
def get_file(file_id: str, db: SessionDep) -> MediaFileResponse: # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
response = get_file_details(mediafile)
return response
@router.delete("/files/{file_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_file(file_id: str, db: SessionDep): # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
logger.info(f"delete MediaFile: {file_id}")
actor_files = mediafile.media_actor_files
logger.info(f"MediaActorFiles links {len(actor_files)}")
if len(actor_files) == 0:
delete_mediafile(db, mediafile.id)
@router.get("/files/{file_id}/actors", response_model=list[MediaActorResponse])
def get_file_actors(file_id: str, db: SessionDep) -> list[MediaActorResponse]: # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info(f"already known actors: {actor_files}")
results: list[MediaActorResponse] = []
for actor_file in actor_files:
response = MediaActorResponse(id=actor_file.media_actor.id, name=actor_file.media_actor.name, url=actor_file.media_actor.url)
results.append(response)
return results
@router.put("/files/{file_id}/actors", response_model=list[MediaActorFileResponse])
def update_file_actors(file_id: str, db: SessionDep, actors: list[MediaActorResponse]) -> list[MediaActorFileResponse]: # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info(f"already known actors: {actor_files}")
for actor in actors:
already_associated = False
for actor_file in actor_files:
if actor.id == actor_file.media_actor_id:
logger.info("alreay associated - do nothing")
already_associated = True
break
if not already_associated:
create_new_mediaactorfile(db, actor.id, mediafile.id)
db.refresh(mediafile)
actor_files = mediafile.media_actor_files
results: list[MediaActorFileResponse] = []
for actor_file in actor_files:
response = MediaActorFileResponse(id=actor_file.id, actor_id=actor_file.media_actor_id, file_id=actor_file.media_file_id)
results.append(response)
return results
@router.put("/files/{file_id}", response_model=MediaFileResponse)
def update_file(file_id: str, db: SessionDep, info: MediaFileResponse) -> MediaFileResponse: # type: ignore
mediaFile = db.get(MediaFile, file_id)
if not mediaFile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
set_file(info, mediaFile)
db.add(mediaFile)
db.commit()
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be updated")
response = get_file_details(mediafile)
return response
@router.post("/files", status_code=status.HTTP_201_CREATED)
def add_file(new_link: Link, db: SessionDep) -> MediaFileResponse: # type: ignore
logger.info(f"add url {new_link.url}")
try:
mediaFile: MediaFile = create_new_mediafile(new_link.url, db)
except:
raise HTTPException(status_code=409, detail="Link duplicate")
response = get_file_details(mediaFile)
return response
+16
View File
@@ -0,0 +1,16 @@
from typing import List
from fastapi import APIRouter
from src.db.session import SessionDep
from src.schema.tysc.sport import SportResponse
from src.db.models.tysc import Sport
router = APIRouter()
@router.get("/sports")
def get_all_sports(db: SessionDep) -> List[SportResponse]:
results: list[SportResponse] = []
sports = db.query(Sport).all()
for sport in sports:
results.append(SportResponse(id=sport.id, name=sport.name))
return results
-27
View File
@@ -1,27 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Card
from src.db.session import SessionDep
from src.schema.tysc.card import CardResponse, to_response
router = APIRouter()
@router.get("/cards")
def get_all_cards(db: SessionDep) -> List[CardResponse]:
results: List[CardResponse] = []
cards = db.query(Card).all()
for card in cards:
response = to_response(card)
results.append(response)
return results
@router.get("/cards/{card_id}", response_model=CardResponse)
def get_card(card_id: str, db: SessionDep) -> CardResponse:
card = db.get(Card, card_id)
if card is None:
raise HTTPException(status_code=404, detail="Card could not be found")
response = to_response(card)
return response
@@ -1,27 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import CardSet
from src.db.session import SessionDep
from src.schema.tysc.cardset import CardSetResponse, to_response
router = APIRouter()
@router.get("/cardsets")
def get_all_cardsets(db: SessionDep) -> List[CardSetResponse]:
results: List[CardSetResponse] = []
cardsets = db.query(CardSet).all()
for cardset in cardsets:
response = to_response(cardset)
results.append(response)
return results
@router.get("/cardsets/{cardset_id}", response_model=CardSetResponse)
def get_cardset(cardset_id: str, db: SessionDep) -> CardSetResponse:
cardset = db.get(CardSet, cardset_id)
if cardset is None:
raise HTTPException(status_code=404, detail="Cardset could not be found")
response = to_response(cardset)
return response
@@ -1,27 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import FieldPosition
from src.db.session import SessionDep
from src.schema.tysc.fieldposition import FieldPositionResponse, to_response
router = APIRouter()
@router.get("/positions")
def get_all_positions(db: SessionDep) -> List[FieldPositionResponse]:
results: list[FieldPositionResponse] = []
positions = db.query(FieldPosition).all()
for position in positions:
response = to_response(position)
results.append(response)
return results
@router.get("/positions/{position_id}", response_model=FieldPositionResponse)
def get_position(position_id: str, db: SessionDep) -> FieldPositionResponse:
position = db.get(FieldPosition, position_id)
if position is None:
raise HTTPException(status_code=404, detail="Fieldposition could not be found")
response = to_response(position)
return response
@@ -1,27 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Player
from src.db.session import SessionDep
from src.schema.tysc.player import PlayerResponse, to_response
router = APIRouter()
@router.get("/players")
def get_all_players(db: SessionDep) -> List[PlayerResponse]:
results: List[PlayerResponse] = []
players = db.query(Player).all()
for player in players:
response = to_response(player)
results.append(response)
return results
@router.get("/players/{player_id}", response_model=PlayerResponse)
def get_player(player_id: str, db: SessionDep) -> PlayerResponse:
player = db.get(Player, player_id)
if player is None:
raise HTTPException(status_code=404, detail="Player could not be found")
response = to_response(player)
return response
@@ -1,27 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Rooster
from src.db.session import SessionDep
from src.schema.tysc.rooster import RoosterResponse, to_response
router = APIRouter()
@router.get("/roosters")
def get_all_roosters(db: SessionDep) -> List[RoosterResponse]:
results: list[RoosterResponse] = []
roosters = db.query(Rooster).all()
for rooster in roosters:
response = to_response(rooster)
results.append(response)
return results
@router.get("/roosters/{rooster_id}", response_model=RoosterResponse)
def get_rooster(rooster_id: str, db: SessionDep) -> RoosterResponse:
rooster = db.get(Rooster, rooster_id)
if rooster is None:
raise HTTPException(status_code=404, detail="Rooster could not be found")
response = to_response(rooster)
return response
@@ -1,28 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.core.log_conf import logger
from src.db.session import SessionDep
from src.schema.tysc.sport import SportResponse, to_response
from src.db.models.tysc import Sport
router = APIRouter()
@router.get("/sports")
def get_all_sports(db: SessionDep) -> List[SportResponse]:
results: list[SportResponse] = []
sports = db.query(Sport).all()
for sport in sports:
response = to_response(sport)
results.append(response)
return results
@router.get("/sports/{sport_id}", response_model=SportResponse)
def get_sport(sport_id: str, db: SessionDep) -> SportResponse:
sport = db.get(Sport, sport_id)
if sport is None:
raise HTTPException(status_code=404, detail="Sport could not be found")
logger.debug(f"create SportResponse for {sport}")
response: SportResponse = to_response(sport)
return response
-27
View File
@@ -1,27 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Team
from src.db.session import SessionDep
from src.schema.tysc.team import TeamResponse, to_response
router = APIRouter()
@router.get("/teams")
def get_all_teams(db: SessionDep) -> List[TeamResponse]:
results: list[TeamResponse] = []
teams = db.query(Team).all()
for team in teams:
response = to_response(team)
results.append(response)
return results
@router.get("/teams/{team_id}", response_model=TeamResponse)
def get_team(team_id: str, db: SessionDep) -> TeamResponse:
team = db.get(Team, team_id)
if team is None:
raise HTTPException(status_code=404, detail="Team could not be found")
response = to_response(team)
return response
@@ -1,33 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Vendor
from src.db.session import SessionDep
from src.schema.tysc.vendor import VendorResponse, to_response
router = APIRouter()
@router.get("/vendors")
def get_all_vendors(db: SessionDep) -> List[VendorResponse]:
"""
retrieve all vendors as json response.
"""
results: list[VendorResponse] = []
vendors = db.query(Vendor).all()
for vendor in vendors:
response = to_response(vendor)
results.append(response)
return results
@router.get("/vendors/{vendor_id}", response_model=VendorResponse)
def get_vendor(vendor_id: str, db: SessionDep) -> VendorResponse:
"""
retrieve vendor by id as json response.
"""
vendor = db.get(Vendor, vendor_id)
if vendor is None:
raise HTTPException(status_code=404, detail="Vendor could not be found")
response = to_response(vendor)
return response
@@ -3,35 +3,29 @@ from fastapi import APIRouter, HTTPException, status
from sqlalchemy import select
from src.core.log_conf import logger
from src.core.security import CurrentUser
from src.db.models.admin import Profile
from src.db.repository.user import create_new_profile
from src.db.repository.user import create_new_profile, get_profile_details
from src.db.session import SessionDep
from src.schema.user.profile import ProfileResponse, ProfileModel, to_response
from src.schema.user.profile import ProfileResponse, ProfileModel
router = APIRouter()
@router.get("/profile", response_model=ProfileModel)
async def read_profile(current_user: CurrentUser):
return current_user
@router.get("/profiles", response_model=List[ProfileResponse])
def get_all_profiles(db: SessionDep) -> List[ProfileResponse]:
def get_all_profiles(db: SessionDep) -> List[ProfileResponse]: # type: ignore
results: List[ProfileResponse] = []
profiles = db.scalars(select(Profile)).all()
for profile in profiles:
response = to_response(profile)
response = get_profile_details(profile)
results.append(response)
return results
@router.get("/profiles/{profile_id}", response_model=ProfileResponse)
def get_profile(profile_id: str, db: SessionDep) -> ProfileResponse:
def get_profile(profile_id: str, db: SessionDep) -> ProfileResponse: # type: ignore
profile = db.get(Profile, profile_id)
if not profile:
raise HTTPException(status_code=404, detail="Profile could not be found")
response = to_response(profile)
raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = get_profile_details(profile)
return response
@router.delete("/profiles/{profile_id}", status_code=status.HTTP_204_NO_CONTENT)
@@ -43,11 +37,11 @@ def delete_profile(profile_id: str, db: SessionDep): # type: ignore
delete_profile(profile_id=profile_id, db=db)
@router.post("/profiles", status_code=status.HTTP_201_CREATED)
def add_profile(new_profile: ProfileModel, db: SessionDep) -> ProfileResponse:
logger.info(f"add profile {new_profile.username}")
def add_profile(new_profile: ProfileModel, db: SessionDep) -> ProfileResponse: # type: ignore
logger.info(f"add profile {new_profile.user_name}")
try:
profile: Profile = create_new_profile(new_profile, db)
except:
raise HTTPException(status_code=409, detail="Profile duplicate")
response = to_response(profile)
response = get_profile_details(profile)
return response
@@ -1,26 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.admin import Assignment
from src.db.session import SessionDep
from src.schema.user.assignment import AssignmentResponse, to_response
router = APIRouter()
@router.get("/assignments", response_model=List[AssignmentResponse])
def get_all_assignments(db: SessionDep) -> List[AssignmentResponse]:
results: List[AssignmentResponse] = []
assignments = db.query(Assignment).all()
for assignment in assignments:
response = to_response(assignment)
results.append(response)
return results
@router.get("/assignments/{assignment_id}", response_model=AssignmentResponse)
def get_assignment(assignment_id: str, db: SessionDep) -> AssignmentResponse:
assignment = db.get(Assignment, assignment_id)
if assignment is None:
raise HTTPException(status_code=404, detail="Assignment could not be found")
response = to_response(assignment)
return response
@@ -1,26 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.admin import Permission
from src.db.session import SessionDep
from src.schema.user.permission import PermissionResponse, to_response
router = APIRouter()
@router.get("/permissions", response_model=List[PermissionResponse])
def get_all_permissions(db: SessionDep) -> List[PermissionResponse]:
results: List[PermissionResponse] = []
permissions = db.query(Permission).all()
for permission in permissions:
response = to_response(permission)
results.append(response)
return results
@router.get("/permissions/{permission_id}", response_model=PermissionResponse)
def get_permission(permission_id: str, db: SessionDep) -> PermissionResponse:
permission = db.get(Permission, permission_id)
if permission is None:
raise HTTPException(status_code=404, detail="Permission could not be found")
response = to_response(permission)
return response
@@ -1,19 +0,0 @@
from typing import List
from fastapi import APIRouter
from src.db.models.admin import Token
from src.db.session import SessionDep
from src.schema.user.token import TokenResponse, to_response
router = APIRouter()
@router.get("/tokens", response_model=List[TokenResponse])
def get_all_profiles(db: SessionDep) -> List[TokenResponse]:
results: List[TokenResponse] = []
tokens = db.query(Token).all()
for token in tokens:
response = to_response(token)
results.append(response)
return results
-35
View File
@@ -1,35 +0,0 @@
import json
import time
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from src.core.log_conf import logger
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next) -> Response:
start_time = time.time()
path = request.url.path
if path != "/health":
# Log request info
request_info ={
"method": request.method,
"path": path,
"client_ip": request.client.host if request.client else "unknown"
}
logger.info("Incoming: %s", json.dumps(request_info))
# Process request
response = await call_next(request)
if path != "/health":
# Log response info
duration_ms = (time.time()- start_time)*1000
response_info = {
"status_code": response.status_code,
"duration_ms": round(duration_ms, 2)
}
logger.info("completed: %s", json.dumps(response_info))
return response
+55 -92
View File
@@ -1,74 +1,60 @@
import logging
from datetime import datetime, timedelta, timezone
from typing import Annotated, List, Optional
from typing import Annotated, Dict, List, Optional
import bcrypt
from fastapi import Depends, HTTPException, Security, status
#from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import (
#OAuth2,
OAuth2PasswordBearer,
SecurityScopes
)
#from fastapi.security.utils import get_authorization_scheme_param
from fastapi import Depends, HTTPException, Request, Security, status
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import OAuth2, OAuth2PasswordBearer, SecurityScopes
from fastapi.security.utils import get_authorization_scheme_param
from jose import JWTError, jwt
from pydantic import ValidationError
from src.core.config import settings
from src.core.log_conf import logger
from src.db.models.admin import Profile
from src.db.repository.admin import (
get_profile_by_username,
get_profile_by_email,
is_database_empty,
)
from src.db.repository.admin import get_profile, is_database_empty
from src.db.session import SessionLocal
from src.schema.admin.token import TokenData
from src.schema.user.profile import ProfileModel, to_model
from src.schema.admin import ProfileModel, TokenData
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/token",
scopes={
"me": "read",
"admin": "read",
"ROLE_ADMIN": "admin",
"ROLE_MEDIA": "media",
"ROLE_USER": "user",
},
tokenUrl="/api/login/token",
scopes={"me": "read", "admin": "read"},
)
# class OAuth2PasswordBearerWithCookie(OAuth2):
# def __init__(
# self,
# tokenUrl: str,
# scheme_name: Optional[str] = None,
# scopes: Optional[Dict[str, str]] = None,
# auto_error: bool = True,
# ):
# if not scopes:
# scopes = {}
# flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore
# super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
class OAuth2PasswordBearerWithCookie(OAuth2):
def __init__(
self,
tokenUrl: str,
scheme_name: Optional[str] = None,
scopes: Optional[Dict[str, str]] = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore
super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
# async def __call__(self, request: Request) -> Optional[str]:
# authorization: str = request.cookies.get("access_token") # changed to accept access token from httpOnly Cookie
# scheme, param = get_authorization_scheme_param(authorization)
# if not authorization or scheme.lower() != "bearer":
# if self.auto_error:
# raise HTTPException(
# status_code=status.HTTP_401_UNAUTHORIZED,
# detail="Not authenticated",
# headers={"WWW-Authenticate": "Bearer"},
# )
# else:
# return None
# return param
async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.cookies.get("access_token") # changed to accept access token from httpOnly Cookie
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None
return param
def authenticate_user_by_email(email: str, password: str) -> Optional[Profile]:
def authenticate_user(username: str, password: str) -> Optional[Profile]:
with SessionLocal() as db:
user = get_profile_by_email(email=email, db=db)
user = get_profile(username=username, db=db)
logger.debug(user)
if not user:
if is_database_empty(db):
@@ -79,26 +65,7 @@ def authenticate_user_by_email(email: str, password: str) -> Optional[Profile]:
return None
else:
if bcrypt.checkpw(password.encode(), user.password.encode()):
logger.info("User successful authenticated")
else:
logger.info("Authentication failed!")
return user
def authenticate_user_by_username(username: str, password: str) -> Optional[Profile]:
with SessionLocal() as db:
user = get_profile_by_username(username=username, db=db)
logger.debug(user)
if not user:
if is_database_empty(db):
logger.info("database is empty, use temporary access")
user = Profile()
user.email = "init_user@thpeetz.de"
return user
return None
else:
if bcrypt.checkpw(password.encode(), user.password.encode()):
logger.info("User successful authenticated")
print("User successful authenticated")
else:
logger.info("Authentication failed!")
return user
@@ -136,19 +103,17 @@ async def get_current_user(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
username: str = payload.get("sub") # type: ignore
logger.info("username/email extracted is %s", username)
logger.info("username/email extracted is ", username)
if username is None:
raise credentials_exception
scope: str = payload.get("scope", "")
token_scopes: List[str] = scope.split(" ")
token_data = TokenData(scopes=token_scopes, username=username)
except (JWTError, ValidationError):
logger.info("Exception raised", exc_info=True)
raise credentials_exception
with SessionLocal() as db:
user = get_profile_by_username(username=str(token_data.username), db=db)
user = get_profile(username=token_data.username, db=db) # type: ignore
if user is None:
logger.info("user not found")
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_scopes:
@@ -163,14 +128,19 @@ async def get_current_user(
async def get_current_active_user(
current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])],
) -> ProfileModel:
if not current_user.enabled:
if not current_user.enabled: # type: ignore
raise HTTPException(status_code=400, detail="Inactive user")
user_model = to_model(current_user)
user_model = ProfileModel(
username=current_user.user_name,
email=current_user.email, # type: ignore
first_name=current_user.first_name,
last_name=current_user.last_name, # type: ignore
active=current_user.enabled,
) # type: ignore
return user_model
def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
""" """
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
@@ -179,21 +149,14 @@ def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
username: Optional[str] = payload.get("sub")
logger.info("username/email extracted is %s", username)
username: str = payload.get("sub") # type: ignore
logger.info("username/email extracted is ", username)
if username is None:
raise credentials_exception
except JWTError as exception:
raise credentials_exception from exception
except JWTError:
raise credentials_exception
with SessionLocal() as db:
user = get_profile_by_email(email=username, db=db)
user = get_profile(username=username, db=db)
if user is None:
user = get_profile_by_username(username=username, db=db)
if user is None:
raise credentials_exception
raise credentials_exception
return user
UserDep = Annotated[Profile, Depends(get_current_user_from_token)]
CurrentUser = Annotated[Profile, Depends(get_current_active_user)]
+22 -24
View File
@@ -1,52 +1,50 @@
from typing import List
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.orm import relationship
from src.db.models.base import Base, BaseMixin
class Article(Base, BaseMixin):
__tablename__ = 'article'
title: Mapped[str] = mapped_column(unique=True)
title = Column(String, unique=True)
article_authors = relationship("ArticleAuthor")
class Author(Base, BaseMixin):
__tablename__ = 'author'
first_name: Mapped[str]
last_name: Mapped[str]
article_authors: Mapped[List["ArticleAuthor"]] = relationship(back_populates="author")
book_authors: Mapped[List["BookAuthor"]] = relationship(back_populates="author")
first_name = Column(String)
last_name = Column(String)
article_authors = relationship("ArticleAuthor")
book_authors = relationship("BookAuthor")
class BookshelfPublisher(Base, BaseMixin):
__tablename__ = 'bookshelf_publisher'
name: Mapped[str] = mapped_column(unique=True)
books: Mapped[List["Book"]] = relationship(back_populates="publisher")
name = Column(String, unique=True)
books = relationship("Book")
class Book(Base, BaseMixin):
__tablename__ = 'book'
isbn: Mapped[str] = mapped_column(unique=True)
title: Mapped[str]
year: Mapped[int] = mapped_column(nullable=False)
publisher_id: Mapped[str] = mapped_column(ForeignKey("bookshelf_publisher.id"), nullable=False)
publisher: Mapped[BookshelfPublisher] = relationship(back_populates="books")
book_authors: Mapped[List["BookAuthor"]] = relationship(back_populates="book")
isbn = Column(String, unique=True)
title = Column(String)
year = Column(Integer, nullable=False)
publisher_id = Column(String, ForeignKey('bookshelf_publisher.id'), nullable=False)
publisher = relationship('BookshelfPublisher', back_populates="books")
book_authors = relationship("BookAuthor")
class ArticleAuthor(Base, BaseMixin):
__tablename__ = 'article_author'
article_id: Mapped[str] = mapped_column(ForeignKey("article.id"), nullable=False)
article: Mapped[Article] = relationship(back_populates="article_authors")
author_id: Mapped[str] = mapped_column(ForeignKey("author.id"), nullable=False)
author: Mapped[Author] = relationship(back_populates="article_authors")
article_id = Column(String, ForeignKey('article.id'), nullable=False)
article = relationship('Article', back_populates="article_authors")
author_id = Column(String, ForeignKey('author.id'), nullable=False)
author = relationship('Author', back_populates="article_authors")
class BookAuthor(Base, BaseMixin):
__tablename__ = 'book_author'
author_id: Mapped[str] = mapped_column(ForeignKey("author.id"), nullable=False)
author: Mapped[Author] = relationship(back_populates="book_authors")
book_id: Mapped[str] = mapped_column(ForeignKey("book.id"), nullable=False)
book: Mapped[Book] = relationship(back_populates="book_authors")
author_id = Column(String, ForeignKey('author.id'), nullable=False)
author = relationship('Author', back_populates="book_authors")
book_id = Column(String, ForeignKey('book.id'), nullable=False)
book = relationship('Book', back_populates="book_authors")
+19 -39
View File
@@ -14,71 +14,51 @@ from src.db.models.base import Base, BaseMixin, BaseVideoMixin
class MediaFile(Base, BaseMixin, BaseVideoMixin):
"""
MediaFile represents video link.
"""
__tablename__ = "media_file"
media_actor_files: Mapped[List["MediaActorFile"]] = relationship(
back_populates="media_file"
)
__tablename__ = 'media_file'
media_actor_files: Mapped[List["MediaActorFile"]] = relationship(back_populates="media_file")
def __repr__(self):
return f"MediaFile({self.id} {self.title} {self.title})"
return f'MediaFile({self.id} {self.title} {self.title})'
def __str__(self):
return f"{self.title}({self.id})"
def update_title(self):
"""
Update title from url.
"""
return f'{self.title}({self.id})'
class MediaActor(Base, BaseMixin):
__tablename__ = "media_actor"
__tablename__ = 'media_actor'
name: Mapped[str]
url: Mapped[Optional[str]] = mapped_column(unique=True)
media_actor_files = relationship("MediaActorFile")
def __repr__(self) -> str:
return f"MediaActor({self.id} {self.name} {self.url})"
return f'MediaActor({self.id} {self.name} {self.url})'
def __str__(self) -> str:
return f"{self.url}({self.id})"
return f'{self.url}({self.id})'
class MediaActorFile(Base, BaseMixin):
__tablename__ = "media_actor_file"
media_actor_id: Mapped[str] = mapped_column(
ForeignKey("media_actor.id"), nullable=False
)
__tablename__ = 'media_actor_file'
media_actor_id: Mapped[str] = mapped_column(ForeignKey("media_actor.id"), nullable=False)
media_actor: Mapped[MediaActor] = relationship(back_populates="media_actor_files")
media_file_id: Mapped[str] = mapped_column(
ForeignKey("media_file.id"), nullable=True
)
media_file_id: Mapped[str] = mapped_column(ForeignKey("media_file.id"), nullable=True)
media_file: Mapped[MediaFile] = relationship(back_populates="media_actor_files")
def __repr__(self):
return f"MediaActorFile({self.id} {self.media_actor_id} {self.media_file_id})"
return f'MediaActorFile({self.id} {self.media_actor_id} {self.media_file_id})'
def __str__(self) -> str:
return f"{self.id} {self.media_actor_id} {self.media_file_id}"
return f'{self.id} {self.media_actor_id} {self.media_file_id}'
class MediaArticle(Base, BaseMixin):
__tablename__ = "media_article"
__tablename__ = 'media_article'
review: Mapped[bool]
title: Mapped[str]
url: Mapped[str] = mapped_column(unique=True)
class MediaVideo(Base, BaseMixin):
"""
MediaFile represents video link.
"""
__tablename__ = "media_video"
__tablename__ = 'media_video'
cloud_link: Mapped[str]
file_name: Mapped[str]
path: Mapped[str]
@@ -88,10 +68,10 @@ class MediaVideo(Base, BaseMixin):
should_download: Mapped[bool]
def __repr__(self):
return f"MediaFile({self.id} {self.title} {self.url})"
return f'MediaFile({self.id} {self.title} {self.url})'
def __str__(self):
if self.title is None:
return f"{self.url}({self.id})"
return f'{self.url}({self.id})'
else:
return f"{self.title}({self.id})"
return f'{self.title}({self.id})'
+3 -7
View File
@@ -1,16 +1,12 @@
from typing import Optional
from typing import AnyStr, Optional
from sqlalchemy.orm import Session
from src.db.models.admin import Profile
def get_profile_by_username(username: str, db: Session) -> Optional[Profile]:
profile = db.query(Profile).filter(Profile.user_name == username).first()
return profile
def get_profile_by_email(email: str, db: Session) -> Optional[Profile]:
profile = db.query(Profile).filter(Profile.email == email).first()
def get_profile(username: AnyStr, db: Session) -> Optional[Profile]:
profile = db.query(Profile).filter(Profile.email == username).first()
return profile
def is_database_empty(db: Session) -> bool:
+32 -17
View File
@@ -4,27 +4,29 @@ from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.comic import Comic, Issue
from src.schema.comics.artist import artist_to_response
from src.schema.comics.comic import ComicSchema, comic_to_response
from src.schema.comics.artist import ArtistResponse
from src.schema.comics.comic import ComicResponse, ComicSchema
from src.schema.comics.comic_details import ComicDetailsResponse, ComicWorktypeArtistResponse
from src.schema.comics.issue import IssueResponse, issue_to_response
from src.schema.comics.issue import IssueResponse
from src.schema.comics.issue_details import IssueDetailsResponse
from src.schema.comics.publisher import publisher_to_response
from src.schema.comics.volume import VolumeResponse, volume_to_response
from src.schema.comics.worktype import worktype_to_response
from src.schema.comics.publisher import PublisherResponse
from src.schema.comics.volume import VolumeResponse
from src.schema.comics.worktype import WorktypeResponse
def list_comics(db: Session) -> List[Comic]:
comics = db.query(Comic).all()
return comics
def get_issue_details(issue: Issue) -> IssueDetailsResponse:
volume = None
if issue.volume:
volume = volume_to_response(issue.volume)
response = IssueDetailsResponse(
id=issue.id,
issue_number=str(issue.issue_number),
in_stock=bool(issue.in_stock),
is_read=bool(issue.is_read),
comic=comic_to_response(issue.comic),
volume=volume
comic=ComicResponse(id=issue.comic.id, title=issue.comic.title, completed=issue.comic.completed),
volume=VolumeResponse(id=issue.volume.id, name=issue.volume.name)
)
return response
@@ -34,26 +36,39 @@ def update_comic(new_comic: ComicSchema, comic_id: str, db: Session) -> Optional
comic: Optional[Comic] = db.get(Comic, comic_id)
return comic
def get_short_info(comic: Comic) -> ComicResponse:
response = ComicResponse(
id=comic.id,
title=str(comic.title),
completed=bool(comic.completed == 1)
)
return response
def get_comic_details(comic: Comic) -> ComicDetailsResponse:
volumes: List[VolumeResponse] = []
for volume in comic.volumes:
volumes.append(volume_to_response(volume))
volumes.append(VolumeResponse(id=volume.id, name=volume.name))
issues: List[IssueResponse] = []
for issue in comic.issues:
issues.append(issue_to_response(issue))
issues.append(IssueResponse(
id=issue.id,
issue_number=issue.issue_number,
in_stock=issue.in_stock,
is_read=issue.is_read
))
works: List[ComicWorktypeArtistResponse] = []
works_map: Dict[str, ComicWorktypeArtistResponse] = {}
for work in comic.comic_works:
worktype_id = work.work_type.id
if worktype_id in works_map:
artist = artist_to_response(work.artist)
artist = ArtistResponse(id=work.artist.id, name=work.artist.name)
works_map[worktype_id].artists.append(artist)
logger.info(f"add artist to response map: {artist} -> {works_map}")
print(f"add artist to response map: {artist} -> {works_map}")
else:
works_map[worktype_id] = ComicWorktypeArtistResponse(
worktype=worktype_to_response(work.work_type),
artists=[artist_to_response(work.artist)]
worktype=WorktypeResponse(id=worktype_id, name=work.work_type.name),
artists=[ArtistResponse(id=work.artist.id, name=work.artist.name)]
)
for value in works_map.values():
works.append(value)
@@ -64,7 +79,7 @@ def get_comic_details(comic: Comic) -> ComicDetailsResponse:
completed=bool(comic.completed),
current_order=bool(comic.current_order),
weblink=str(comic.weblink),
publisher=publisher_to_response(comic.publisher),
publisher=PublisherResponse(id=comic.publisher.id, name=comic.publisher.name),
issues=issues,
volumes=volumes,
works=works
+105
View File
@@ -0,0 +1,105 @@
from sqlalchemy.orm import Session
import uuid
from datetime import datetime
from src.core.log_conf import logger
from src.db.models.media import MediaActor, MediaActorFile, MediaFile, MediaVideo
from src.schema.media.actor import MediaActorModel, MediaActorResponse
from src.schema.media.actorfile import MediaActorFileResponse
from src.webapps.media.forms import AddLinkForm
def create_new_video(video: AddLinkForm, db: Session) -> MediaVideo:
print(video.url)
media_video = MediaVideo()
media_video.id = str(uuid.uuid4())
media_video.url = video.url
media_video.created_date = datetime.now()
media_video.last_modified_date = datetime.now()
media_video.review = True
media_video.should_download = True
db.add(media_video)
db.commit()
db.refresh(media_video)
print(media_video)
return media_video
def create_new_mediafile(link: str, db: Session) -> MediaFile:
logger.info("create MediaFile with url {link}")
media_file: MediaFile = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.url = link
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.review = True
media_file.should_download = True
db.add(media_file)
db.commit()
db.refresh(media_file)
logger.info(f"created {media_file}")
return media_file
def delete_mediafile(db: Session, media_file_id: str):
logger.info(f"delete MediaFile with id {media_file_id}")
media_file = db.get(MediaFile, media_file_id)
db.delete(media_file)
db.commit()
def create_new_mediaactor(new_actor: MediaActorModel, db: Session) -> MediaActor:
logger.info(f"create MediaActor with url {new_actor.url}")
media_actor: MediaActor = MediaActor()
media_actor.id = str(uuid.uuid4())
if new_actor.name is not None:
media_actor.name = new_actor.name
media_actor.url = new_actor.url
media_actor.created_date = datetime.now()
media_actor.last_modified_date = datetime.now()
media_actor.version = 0
db.add(media_actor)
db.commit()
db.refresh(media_actor)
logger.info(f"created {media_actor}")
return media_actor
def delete_mediaactor(db: Session, actor_id: str):
logger.info(f"delete MediaActor with id {actor_id}")
media_actor = db.get(MediaActor, actor_id)
if media_actor is not None:
actor_files = media_actor.media_actor_files
for actor_file in actor_files:
delete_mediaactorfile(db, actorfile_id=actor_file.id)
db.refresh(media_actor)
db.delete(media_actor)
db.commit()
def get_actor_details(media_actor: MediaActor) -> MediaActorResponse:
reponse: MediaActorResponse = MediaActorResponse(id=media_actor.id, name=str(media_actor.name), url=str(media_actor.url))
return reponse
def create_new_mediaactorfile(db: Session, actor_id: str, file_id: str) -> MediaActorFile:
logger.info(f"create MediaActorFile with actor {actor_id} and file {file_id}")
media_actor_file: MediaActorFile = MediaActorFile()
media_actor_file.id = str(uuid.uuid4())
media_actor_file.created_date = datetime.now()
media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = 0
media_actor_file.media_actor_id = actor_id
media_actor_file.media_file_id = file_id
db.add(media_actor_file)
db.commit()
db.refresh(media_actor_file)
return media_actor_file
def delete_mediaactorfile(db: Session, actorfile_id: str):
logger.info(f"delete MediaActorFile with id {actorfile_id}")
media_actorfile = db.get(MediaActorFile, actorfile_id)
db.delete(media_actorfile)
db.commit()
def get_actorfile_details(media_actorfile: MediaActorFile) -> MediaActorFileResponse:
response: MediaActorFileResponse = MediaActorFileResponse(
id=media_actorfile.id,
file_id=str(media_actorfile.media_file_id),
actor_id=str(media_actorfile.media_actor_id)
)
return response
@@ -1,66 +0,0 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.media import MediaActor
from src.db.repository.media.actorfile import delete_mediaactorfile
from src.schema.media.actor import MediaActorModel
def create_new_mediaactor(new_actor: MediaActorModel, db: Session) -> MediaActor:
logger.info(f"create MediaActor with url {new_actor.url}")
media_actor: MediaActor = MediaActor()
media_actor.id = str(uuid.uuid4())
if new_actor.name is not None:
media_actor.name = new_actor.name
media_actor.url = new_actor.url
media_actor.created_date = datetime.now()
media_actor.last_modified_date = datetime.now()
media_actor.version = 0
db.add(media_actor)
db.commit()
db.refresh(media_actor)
logger.info(f"created {media_actor}")
return media_actor
def delete_mediaactor(db: Session, actor_id: str):
logger.info(f"delete MediaActor with id {actor_id}")
media_actor = db.get(MediaActor, actor_id)
if media_actor is not None:
actor_files = media_actor.media_actor_files
for actor_file in actor_files:
delete_mediaactorfile(db, actorfile_id=actor_file.id)
db.refresh(media_actor)
db.delete(media_actor)
db.commit()
def import_mediaactor(db: Session, new_actor: MediaActorModel) -> MediaActor:
"""
import MediaFile and set missing values with default ones.
"""
logger.info("import MediaActor with %s", new_actor)
media_actor: MediaActor = MediaActor()
media_actor.id = new_actor.id
if new_actor.created_date:
media_actor.created_date = new_actor.created_date
else:
media_actor.created_date = datetime.now()
if new_actor.last_modified_date:
media_actor.last_modified_date = new_actor.last_modified_date
else:
media_actor.last_modified_date = datetime.now()
media_actor.version = new_actor.version
if new_actor.name:
media_actor.name = new_actor.name
else:
media_actor.name = ""
if new_actor.url:
media_actor.url = new_actor.url
else:
media_actor.url = ""
db.add(media_actor)
db.commit()
db.refresh(media_actor)
return media_actor
@@ -1,49 +0,0 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.media import MediaActorFile
from src.schema.media.actorfile import MediaActorFileModel
def create_new_mediaactorfile(
db: Session, actor_id: str, file_id: str
) -> MediaActorFile:
"""
Create relation for MediaFile and MediaActor
"""
logger.info("create MediaActorFile with actor %s and file %s", actor_id, file_id)
media_actor_file: MediaActorFile = MediaActorFile()
media_actor_file.id = str(uuid.uuid4())
media_actor_file.created_date = datetime.now()
media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = 0
media_actor_file.media_actor_id = actor_id
media_actor_file.media_file_id = file_id
db.add(media_actor_file)
db.commit()
db.refresh(media_actor_file)
return media_actor_file
def delete_mediaactorfile(db: Session, actorfile_id: str):
"""
Delete relation between MediaFile and MediaActor.
"""
logger.info("delete MediaActorFile with id %s", actorfile_id)
media_actorfile = db.get(MediaActorFile, actorfile_id)
db.delete(media_actorfile)
db.commit()
def import_mediaactorfile(
db: Session, new_actorfile: MediaActorFileModel
) -> MediaActorFile:
"""
Import MediaFile and set missing values with default ones.
"""
logger.info("import MediaActorFile with %s", new_actorfile)
media_actor_file: MediaActorFile = MediaActorFile()
return media_actor_file
@@ -1,78 +0,0 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.media import MediaFile
from src.schema.media.file import MediaFileModel
def create_new_mediafile(link: str, db: Session) -> MediaFile:
"""
Create MediaFile with gievne URL.
"""
logger.info("create MediaFile with url {link}")
media_file: MediaFile = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.url = link
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.review = True
media_file.should_download = True
db.add(media_file)
db.commit()
db.refresh(media_file)
logger.info("created %s", media_file)
return media_file
def delete_mediafile(db: Session, media_file_id: str):
"""
Delete MediaFile with given ID from db.
"""
logger.info("delete MediaFile with id %s", media_file_id)
media_file = db.get(MediaFile, media_file_id)
db.delete(media_file)
db.commit()
def import_mediafile(db: Session, new_file: MediaFileModel) -> MediaFile:
"""
import MediaActor and set missing values with defautl ones.
"""
logger.info("import MediaFile with %s", new_file)
media_file: MediaFile = MediaFile()
media_file.id = new_file.id
if new_file.created_date:
media_file.created_date = new_file.created_date
else:
media_file.created_date = datetime.now()
if new_file.last_modified_date:
media_file.last_modified_date = new_file.last_modified_date
else:
media_file.last_modified_date = datetime.now()
media_file.version = new_file.version
if new_file.title:
media_file.title = new_file.title
else:
media_file.title = ""
if new_file.file_name:
media_file.file_name = new_file.file_name
else:
media_file.file_name = ""
if new_file.cloud_link:
media_file.cloud_link = new_file.cloud_link
else:
media_file.cloud_link = ""
if new_file.url:
media_file.url = new_file.url
else:
media_file.url = ""
media_file.review = new_file.review
media_file.should_download = new_file.should_download
db.add(media_file)
db.commit()
db.refresh(media_file)
return media_file
@@ -1,23 +0,0 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.db.models.media import MediaVideo
from src.webapps.media.forms import AddLinkForm
def create_new_video(video: AddLinkForm, db: Session) -> MediaVideo:
print(video.url)
media_video = MediaVideo()
media_video.id = str(uuid.uuid4())
media_video.url = str(video.url)
media_video.created_date = datetime.now()
media_video.last_modified_date = datetime.now()
media_video.review = True
media_video.should_download = True
db.add(media_video)
db.commit()
db.refresh(media_video)
print(media_video)
return media_video
+11 -3
View File
@@ -5,14 +5,15 @@ from src.core.log_conf import logger
from src.db.models.admin import Assignment, Profile
from sqlalchemy.orm import Session
from src.schema.user.profile import ProfileModel
from src.schema.user.profile import ProfileModel, ProfileResponse
def create_new_profile(new_profile: ProfileModel, db: Session) -> Profile:
logger.info(f"create MediaActor with url {new_profile.username}")
logger.info(f"create MediaActor with url {new_profile.user_name}")
profile: Profile = Profile()
profile.id = str(uuid.uuid4())
profile.user_name = new_profile.username
profile.user_name = new_profile.user_name
profile.first_name = new_profile.first_name
profile.last_name = new_profile.last_name
profile.created_date = datetime.now()
@@ -34,6 +35,13 @@ def delete_profile(db: Session, profile_id: str):
db.delete(profile)
db.commit()
def get_profile_details(profile: Profile) -> ProfileResponse:
reponse: ProfileResponse = ProfileResponse(
id=profile.id,
user_name=str(profile.user_name)
)
return reponse
def delete_assignment(db: Session, assignment_id: str) -> None:
logger.info(f"delete Assignment with id {assignment_id}")
assignment: Optional[Assignment] = db.get(Assignment, assignment_id)
+2 -4
View File
@@ -12,10 +12,8 @@ engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
def get_db() -> Generator[Session, None, None]:
""" """
def get_db() -> Generator:
with SessionLocal() as db:
yield db
SessionDep = Annotated[Session, Depends(get_db)]
SessionDep: type[Session] = Annotated[Session, Depends(get_db)]
+5 -7
View File
@@ -6,10 +6,9 @@ from fastapi.staticfiles import StaticFiles
from src.apis.base import api_router
from src.apis.version1.healthcheck import health_router
from src.apis.version1.admin.login import login_router
from src.apis.version1.login import login_router
from src.core.config import settings
from src.core.log_conf import logger
from src.core.middleware import RequestLoggingMiddleware
from src.db.models.base import Base
from src.db.session import engine
from src.db.utils import check_db_connected, check_db_disconnected
@@ -24,10 +23,10 @@ async def lifespan(app: FastAPI):
def include_router(app: FastAPI):
app.include_router(api_router, tags=["api"])
app.include_router(web_app_router, tags=["webapp"])
app.include_router(health_router, tags=["admin"])
app.include_router(login_router, tags=["webapp"])
app.include_router(api_router)
app.include_router(web_app_router)
app.include_router(health_router)
app.include_router(login_router)
def configure_static(app: FastAPI):
@@ -42,7 +41,6 @@ def add_middle_ware(app: FastAPI):
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(RequestLoggingMiddleware)
def create_tables():
+27
View File
@@ -0,0 +1,27 @@
from typing import Optional, List
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
scopes: List[str] = []
class ProfileModel(BaseModel):
username: str
email: str
first_name: str
last_name: str
active: bool
class HealthCheck(BaseModel):
"""
Health check model
"""
status: str = "ok"
@@ -1,9 +0,0 @@
from pydantic import BaseModel
class HealthCheck(BaseModel):
"""
Health check model
"""
status: str = "ok"
-8
View File
@@ -1,8 +0,0 @@
from typing import Optional
from pydantic import BaseModel
class LoginRequest(BaseModel):
email: Optional[str] = None
password: Optional[str] = None
@@ -1,34 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import MailAccount
class MailAccountResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
host: str
port: int
protocol: str
user_name: str
password: str
start_tls: bool
def to_response(account: MailAccount) -> MailAccountResponse:
response: MailAccountResponse = MailAccountResponse(
id=account.id,
created_date=account.created_date,
last_modified_date=account.last_modified_date,
version=account.version,
host=account.host,
port=account.port,
protocol=account.protocol,
user_name=account.user_name,
password=account.password,
start_tls=account.start_tls,
)
return response
-13
View File
@@ -1,13 +0,0 @@
from typing import List, Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
scopes: List[str] = []
@@ -1,23 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import Article
class ArticleResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
title: str
def to_response(article: Article) -> ArticleResponse:
response: ArticleResponse = ArticleResponse(
id=article.id,
created_date=article.created_date,
last_modified_date=article.last_modified_date,
version=article.version,
title=article.title
)
return response
@@ -1,25 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import ArticleAuthor
class ArticleAuthorResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
article_id: str
author_id: str
def to_response(articleauthor: ArticleAuthor) -> ArticleAuthorResponse:
response: ArticleAuthorResponse = ArticleAuthorResponse(
id=articleauthor.id,
created_date=articleauthor.created_date,
last_modified_date=articleauthor.last_modified_date,
version=articleauthor.version,
article_id=articleauthor.article_id,
author_id=articleauthor.author_id
)
return response
-25
View File
@@ -1,25 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import Author
class AuthorResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
first_name: str
last_name: str
def to_response(author: Author) -> AuthorResponse:
response: AuthorResponse = AuthorResponse(
id=author.id,
created_date=author.created_date,
last_modified_date=author.last_modified_date,
version=author.version,
first_name=author.first_name,
last_name=author.last_name
)
return response
-29
View File
@@ -1,29 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import Book
class BookResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
isbn: str
title: str
year: int
publisher_id: str
def to_response(book: Book) -> BookResponse:
response: BookResponse = BookResponse(
id=book.id,
created_date=book.created_date,
last_modified_date=book.last_modified_date,
version=book.version,
isbn=book.isbn,
title=book.title,
year=book.year,
publisher_id=book.publisher_id
)
return response
@@ -1,25 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import BookAuthor
class BookAuthorResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
author_id: str
book_id: str
def to_response(bookauthor: BookAuthor) -> BookAuthorResponse:
response: BookAuthorResponse = BookAuthorResponse(
id=bookauthor.id,
created_date=bookauthor.created_date,
last_modified_date=bookauthor.last_modified_date,
version=bookauthor.version,
author_id=bookauthor.author_id,
book_id=bookauthor.book_id
)
return response
@@ -1,23 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import BookshelfPublisher
class PublisherResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def to_response(publisher: BookshelfPublisher) -> PublisherResponse:
response: PublisherResponse = PublisherResponse(
id=publisher.id,
created_date=publisher.created_date,
last_modified_date=publisher.last_modified_date,
version=publisher.version,
name=publisher.name
)
return response
-21
View File
@@ -1,10 +1,5 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.comic import Artist
class ArtistCreation(BaseModel):
id: str
@@ -12,23 +7,7 @@ class ArtistCreation(BaseModel):
class ArtistResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
weblink: Optional[str]
def artist_to_response(artist: Artist) -> ArtistResponse:
response: ArtistResponse = ArtistResponse(
id=artist.id,
created_date=artist.created_date,
last_modified_date=artist.last_modified_date,
version=artist.version,
name=artist.name,
weblink=artist.weblink
)
return response
class AddArtist(BaseModel):
id: str
+1 -32
View File
@@ -1,46 +1,15 @@
"""
Schema definitions for Comics.
"""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, AnyUrl
from src.db.models.comic import Comic
from src.core.log_conf import logger
class ComicResponse(BaseModel):
"""
Pydantic model for returning Comic objects.
"""
id: str
created_date: datetime
last_modified_date: datetime
version: int
title: str
publisher_id: str
current_order: bool
completed: bool
weblink: Optional[str]
def comic_to_response(comic: Comic) -> ComicResponse:
response: ComicResponse = ComicResponse(
id=comic.id,
created_date=comic.created_date,
last_modified_date=comic.last_modified_date,
version=comic.version,
title=comic.title,
publisher_id=comic.publisher_id,
current_order=comic.current_order,
completed=comic.completed,
weblink=comic.weblink
)
return response
class ComicSchema(BaseModel):
"""
Pydantic model for uploading Comic object.
"""
id: str
title: str
weblink: Optional[AnyUrl]
-32
View File
@@ -1,32 +0,0 @@
"""
Model definitions for ComicWork.
"""
from datetime import datetime
from pydantic import BaseModel
from src.db.models.comic import ComicWork
class ComicWorkResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
comic_id: str
artist_id: str
work_type_id: str
def comicwork_to_response(comicwork: ComicWork) -> ComicWorkResponse:
response: ComicWorkResponse = ComicWorkResponse(
id=comicwork.id,
created_date=comicwork.created_date,
last_modified_date=comicwork.last_modified_date,
version=comicwork.version,
comic_id=comicwork.comic_id,
artist_id=comicwork.artist_id,
work_type_id=comicwork.work_type_id,
)
return response
-30
View File
@@ -1,38 +1,8 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.comic import Issue
class IssueResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
issue_number: str
title: Optional[str]
published_on: Optional[datetime]
in_stock: bool
is_read: bool
comic_id: str
volume_id: Optional[str]
story_arc_id: Optional[str]
def issue_to_response(issue: Issue) -> IssueResponse:
response: IssueResponse = IssueResponse(
id=issue.id,
created_date=issue.created_date,
last_modified_date=issue.last_modified_date,
version=issue.version,
issue_number=issue.issue_number,
title=issue.title,
published_on=issue.published_on,
in_stock=issue.in_stock,
is_read=issue.is_read,
comic_id=issue.comic_id,
volume_id=issue.volume_id,
story_arc_id=issue.story_arc_id
)
return response
@@ -11,4 +11,3 @@ class IssueDetailsResponse(BaseModel):
is_read: bool
comic: ComicResponse
volume: VolumeResponse | None
-32
View File
@@ -1,32 +0,0 @@
"""
Model definitions for IssueWork.
"""
from datetime import datetime
from pydantic import BaseModel
from src.db.models.comic import IssueWork
class IssueWorkResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
issue_id: str
artist_id: str
work_type_id: str
def issuework_to_response(issuework: IssueWork) -> IssueWorkResponse:
response: IssueWorkResponse = IssueWorkResponse(
id=issuework.id,
created_date=issuework.created_date,
last_modified_date=issuework.last_modified_date,
version=issuework.version,
issue_id=issuework.issue_id,
artist_id=issuework.artist_id,
work_type_id=issuework.work_type_id,
)
return response
-24
View File
@@ -1,30 +1,6 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.comic import Publisher
class PublisherResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
weblink: Optional[str]
parent_publisher_id: Optional[str]
def publisher_to_response(publisher: Publisher) -> PublisherResponse:
response: PublisherResponse = PublisherResponse(
id=publisher.id,
created_date=publisher.created_date,
last_modified_date=publisher.last_modified_date,
version=publisher.version,
name=publisher.name,
weblink=publisher.weblink,
parent_publisher_id=publisher.parent_publisher_id
)
return response
-30
View File
@@ -1,30 +0,0 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.comic import StoryArc
class StoryArcResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
comic_id: str
volume_id: Optional[str]
class AddLink(BaseModel):
url: str
def storyarc_to_response(storyarc: StoryArc) -> StoryArcResponse:
response: StoryArcResponse = StoryArcResponse(
id=storyarc.id,
created_date=storyarc.created_date,
last_modified_date=storyarc.last_modified_date,
version=storyarc.version,
name=storyarc.name,
comic_id=storyarc.comic_id,
volume_id=storyarc.volume_id
)
return response
-19
View File
@@ -1,25 +1,6 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.comic import Volume
class VolumeResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
comic_id: str
def volume_to_response(volume: Volume) -> VolumeResponse:
response: VolumeResponse = VolumeResponse(
id=volume.id,
created_date=volume.created_date,
last_modified_date=volume.last_modified_date,
version=volume.version,
name=volume.name,
comic_id=volume.comic_id
)
return response
-17
View File
@@ -1,26 +1,9 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.comic import WorkType
class AddWorkType(BaseModel):
worktype: str
class WorktypeResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def worktype_to_response(worktype: WorkType) -> WorktypeResponse:
response: WorktypeResponse = WorktypeResponse(
id=worktype.id,
created_date=worktype.created_date,
last_modified_date=worktype.last_modified_date,
version=worktype.version,
name=worktype.name
)
return response
+2 -24
View File
@@ -1,34 +1,12 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.media import MediaActor
class MediaActorResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: Optional[str]
url: Optional[str]
def actor_to_response(actor: MediaActor) -> MediaActorResponse:
response: MediaActorResponse = MediaActorResponse(
id=actor.id,
created_date=actor.created_date,
last_modified_date=actor.last_modified_date,
version=actor.version,
name=actor.name,
url=actor.url
)
return response
url: str
class MediaActorModel(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: Optional[str]
url: Optional[str]
url: str
+2 -27
View File
@@ -1,5 +1,4 @@
from datetime import datetime
from typing import Optional
from src.db.models.media import MediaActorFile
from pydantic import BaseModel
@@ -7,29 +6,5 @@ from pydantic import BaseModel
class MediaActorFileResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
media_actor_id: str
media_file_id: Optional[str]
def actorfile_to_response(actorfile: MediaActorFile) -> MediaActorFileResponse:
response: MediaActorFileResponse = MediaActorFileResponse(
id=actorfile.id,
created_date=actorfile.created_date,
last_modified_date=actorfile.last_modified_date,
version=actorfile.version,
media_actor_id=actorfile.media_actor_id,
media_file_id=actorfile.media_file_id
)
return response
class MediaActorFileModel(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
media_actor_id: str
media_file_id: Optional[str]
file_id: str
actor_id: str
-30
View File
@@ -1,30 +0,0 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.media import MediaArticle
class MediaArticleResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
review: bool = False
title: Optional[str] = None
url: Optional[str] = None
class AddLink(BaseModel):
url: str
def to_response(video: MediaArticle) -> MediaArticleResponse:
response: MediaArticleResponse = MediaArticleResponse(
id=video.id,
created_date=video.created_date,
last_modified_date=video.last_modified_date,
version=video.version,
review=video.review,
title=video.title,
url=video.url,
)
return response
+17 -58
View File
@@ -1,5 +1,4 @@
from datetime import datetime
from typing import Optional
from src.db.models.media import MediaFile
from pydantic import BaseModel
@@ -7,75 +6,35 @@ from pydantic import BaseModel
class MediaFileResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
title: Optional[str]
file_name: Optional[str]
cloud_link: Optional[str]
url: Optional[str]
title: str | None = None
file_name: str | None = None
cloud_link: str | None = None
url: str | None = None
review: bool = False
should_download: bool = False
class Link(BaseModel):
url: str
def file_to_response(mediafile: MediaFile) -> MediaFileResponse:
"""
Create MediaFileResponse from model.
"""
response: MediaFileResponse = MediaFileResponse(
id=mediafile.id,
created_date=mediafile.created_date,
last_modified_date=mediafile.last_modified_date,
version=mediafile.version,
title=mediafile.title,
file_name=mediafile.file_name,
cloud_link=mediafile.cloud_link,
url=mediafile.url,
review=mediafile.review,
should_download=mediafile.should_download,
)
def get_file_details(mediafile: MediaFile) -> MediaFileResponse:
response = MediaFileResponse(id=mediafile.id,
title=mediafile.title,
file_name=mediafile.file_name,
cloud_link=mediafile.cloud_link,
url=str(mediafile.url),
review=mediafile.review,
should_download=mediafile.should_download)
#print(f"id: {mediafile.id}: review: {response.review} <- {mediafile.review}")
#print(f"id: {mediafile.id}: download: {response.should_download} <- {mediafile.should_download}")
return response
def file_to_model(model: MediaFileResponse, mediafile: MediaFile) -> MediaFile:
"""
Set data of response to model.
"""
def set_file(model: MediaFileResponse, mediafile: MediaFile) -> None:
mediafile.file_name = model.file_name
mediafile.cloud_link = model.cloud_link
if model.url is not None:
mediafile.url = model.url
else:
mediafile.url = ""
if model.title is not None:
mediafile.title = model.title
else:
mediafile.title = ""
mediafile.last_modified_date = datetime.now()
mediafile.review = model.review
mediafile.should_download = model.should_download
return mediafile
class MediaFileModel(BaseModel):
"""
Pydantic model to import MediaFile.
"""
id: str
created_date: Optional[datetime]
last_modified_date: Optional[datetime]
version: int = 0
title: Optional[str]
file_name: Optional[str]
cloud_link: Optional[str]
url: Optional[str]
review: bool = True
should_download: bool = True
class Link(BaseModel):
"""
PYdantic model for uploading url.
"""
url: str
-33
View File
@@ -1,38 +1,5 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.media import MediaVideo
class MediaVideoResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
cloud_link: Optional[str] = None
file_name: Optional[str] = None
path: Optional[str] = None
review: bool = False
title: Optional[str] = None
url: Optional[str] = None
should_download: bool = False
class AddLink(BaseModel):
url: str
def video_to_response(video: MediaVideo) -> MediaVideoResponse:
response: MediaVideoResponse = MediaVideoResponse(
id=video.id,
created_date=video.created_date,
last_modified_date=video.last_modified_date,
version=video.version,
cloud_link=video.cloud_link,
file_name=video.file_name,
path=video.path,
review=video.review,
title=video.title,
url=video.url,
should_download=video.should_download
)
return response
-31
View File
@@ -1,31 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Card
class CardResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
card_number: int
year: int
card_set_id: str
rooster_id: str
vendor_id: str
def to_response(card: Card) -> CardResponse:
response: CardResponse = CardResponse(
id=card.id,
created_date=card.created_date,
last_modified_date=card.last_modified_date,
version=card.version,
card_number=card.card_number,
year=card.year,
card_set_id=card.card_set_id,
rooster_id=card.rooster_id,
vendor_id=card.vendor_id
)
return response
-30
View File
@@ -1,30 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import CardSet
class CardSetResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
parallel_set: bool
insert_set: bool
vendor_id: str
def to_response(cardset: CardSet) -> CardSetResponse:
response: CardSetResponse = CardSetResponse(
id=cardset.id,
created_date=cardset.created_date,
last_modified_date=cardset.last_modified_date,
version=cardset.version,
name=cardset.name,
parallel_set=cardset.parallel_set,
insert_set=cardset.insert_set,
vendor_id=cardset.vendor_id
)
return response
@@ -1,28 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import FieldPosition
class FieldPositionResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
short_name: str
sport_id: str
def to_response(fieldposition: FieldPosition) -> FieldPositionResponse:
response: FieldPositionResponse = FieldPositionResponse(
id=fieldposition.id,
created_date=fieldposition.created_date,
last_modified_date=fieldposition.last_modified_date,
version=fieldposition.version,
name=fieldposition.name,
short_name=fieldposition.short_name,
sport_id=fieldposition.sport_id
)
return response
-25
View File
@@ -1,25 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Player
class PlayerResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
first_name: str
last_name: str
def to_response(player: Player) -> PlayerResponse:
response: PlayerResponse = PlayerResponse(
id=player.id,
created_date=player.created_date,
last_modified_date=player.last_modified_date,
version=player.version,
first_name=player.first_name,
last_name=player.last_name
)
return response
-36
View File
@@ -1,36 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Rooster
class RoosterResponse(BaseModel):
"""
Pydantic model for returning Rooster objects.
"""
id: str
created_date: datetime
last_modified_date: datetime
version: int
year: int
team_id: str
player_id: str
position_id: str
def to_response(rooster: Rooster) -> RoosterResponse:
"""
convert database object to response object (Pydantic).
"""
response: RoosterResponse = RoosterResponse(
id=rooster.id,
created_date=rooster.created_date,
last_modified_date=rooster.last_modified_date,
version=rooster.version,
year=rooster.year,
team_id=rooster.team_id,
player_id=rooster.player_id,
position_id=rooster.position_id
)
return response
+2 -18
View File
@@ -1,24 +1,8 @@
from datetime import datetime
from typing import AnyStr
from pydantic import BaseModel
from src.db.models.tysc import Sport
class SportResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
id: AnyStr
name: str
def to_response(sport: Sport) -> SportResponse:
response: SportResponse = SportResponse(
id=sport.id,
created_date=sport.created_date,
last_modified_date=sport.last_modified_date,
version=sport.version,
name=sport.name
)
return response
-28
View File
@@ -1,28 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Team
class TeamResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
short_name: str
sport_id: str
def to_response(team: Team) -> TeamResponse:
response: TeamResponse = TeamResponse(
id=team.id,
created_date=team.created_date,
last_modified_date=team.last_modified_date,
version=team.version,
name=team.name,
short_name=team.short_name,
sport_id=team.sport_id
)
return response
-32
View File
@@ -1,32 +0,0 @@
"""
class and function for json response objects for Vendor.
"""
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Vendor
class VendorResponse(BaseModel):
"""
Pydantic model for Vendor reponse object.
"""
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def to_response(vendor: Vendor) -> VendorResponse:
"""
convert database object Vendor to response object VendorResponse.
"""
reponse: VendorResponse = VendorResponse(
id=vendor.id,
created_date=vendor.created_date,
last_modified_date=vendor.last_modified_date,
version=vendor.version,
name=vendor.name
)
return reponse
-26
View File
@@ -1,26 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import Assignment
class AssignmentResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
profile_id: str
permission_id: str
def to_response(assignment: Assignment) -> AssignmentResponse:
response: AssignmentResponse = AssignmentResponse(
id=assignment.id,
created_date=assignment.created_date,
last_modified_date=assignment.last_modified_date,
version=assignment.version,
profile_id=assignment.profile_id,
permission_id=assignment.permission_id
)
return response
-24
View File
@@ -1,24 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import Permission
class PermissionResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def to_response(permission: Permission) -> PermissionResponse:
response: PermissionResponse = PermissionResponse(
id=permission.id,
created_date=permission.created_date,
last_modified_date=permission.last_modified_date,
version=permission.version,
name=permission.name
)
return response
+1 -38
View File
@@ -1,5 +1,3 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import Profile
@@ -7,44 +5,9 @@ from src.db.models.admin import Profile
class ProfileResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
first_name: str
last_name: str
user_name: str
email: str
password: str
enabled: bool
def to_response(profile: Profile) -> ProfileResponse:
response: ProfileResponse = ProfileResponse(
id=profile.id,
created_date=profile.created_date,
last_modified_date=profile.last_modified_date,
version=profile.version,
first_name=profile.first_name,
last_name=profile.last_name,
user_name=profile.user_name,
email=profile.email,
password=profile.password,
enabled=profile.enabled
)
return response
class ProfileModel(BaseModel):
username: str
email: str
user_name: str
first_name: str
last_name: str
active: bool
def to_model(profile: Profile) -> ProfileModel:
model: ProfileModel = ProfileModel(
username=profile.user_name,
email=profile.email,
first_name=profile.first_name,
last_name=profile.last_name,
active=profile.enabled,
)
return model
-32
View File
@@ -1,32 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import Token
class TokenResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
token: str
name: str
last_used_date: datetime
enabled: bool
profile_id: str
def to_response(token: Token) -> TokenResponse:
response: TokenResponse = TokenResponse(
id=token.id,
created_date=token.created_date,
last_modified_date=token.last_modified_date,
version=token.version,
token=token.token,
name=token.name,
last_used_date=token.last_used_date,
enabled=token.enabled,
profile_id=token.profile_id
)
return response
+8 -6
View File
@@ -1,5 +1,9 @@
from src.apis.version1.admin.login import login_for_token_cookie
from fastapi import APIRouter
# from src.apis.version1.admin import login_for_access_token
from fastapi.security import OAuth2PasswordRequestForm
from src.apis.version1.admin import login_for_token_cookie
from src.db.session import SessionDep
from fastapi import APIRouter, Depends
from fastapi import HTTPException
from fastapi import Request
from fastapi.templating import Jinja2Templates
@@ -16,7 +20,7 @@ def login(request: Request):
@router.post("/login/")
async def validate_login(request: Request):
async def login(request: Request):
form = LoginForm(request)
await form.load_data()
if await form.is_valid():
@@ -27,8 +31,6 @@ async def validate_login(request: Request):
return response
except HTTPException:
form.__dict__.update(msg="")
errors = form.__dict__.get("errors")
if errors:
errors.append("Incorrect Email or Password")
form.__dict__.get("errors").append("Incorrect Email or Password")
return templates.TemplateResponse("auth/login.html", form.__dict__)
return templates.TemplateResponse("auth/login.html", form.__dict__)
+9 -9
View File
@@ -9,15 +9,15 @@ from src.webapps.media import route_actors, route_media, route_videos
templates = Jinja2Templates(directory="src/templates")
api_router = APIRouter()
api_router.include_router(route_comics.router, tags=["webapp"])
api_router.include_router(route_artists.router, tags=["webapp"])
api_router.include_router(route_worktype.router, tags=["webapp"])
api_router.include_router(route_media.router, tags=["webapp"])
api_router.include_router(route_actors.router, tags=["webapp"])
api_router.include_router(route_videos.router, tags=["webapp"])
api_router.include_router(route_login.router, tags=["webapp"])
api_router.include_router(route_admin.router, tags=["webapp"])
api_router.include_router(route_comics.router)
api_router.include_router(route_artists.router)
api_router.include_router(route_worktype.router)
api_router.include_router(route_media.router)
api_router.include_router(route_actors.router)
api_router.include_router(route_videos.router)
api_router.include_router(route_login.router)
api_router.include_router(route_admin.router)
@api_router.get("/", tags=["webapp"])
@api_router.get("/")
def home(request: Request, msg: str | None = None):
return templates.TemplateResponse("index.html", {"request": request, "msg": msg})
+2 -2
View File
@@ -11,7 +11,7 @@ from src.schema.comics.comic import ComicSchema
from src.webapps.comic.forms.comic import ValidateComicForm
templates = Jinja2Templates(directory="src/templates")
router = APIRouter(include_in_schema=True, prefix="/comic")
router = APIRouter(include_in_schema=False, prefix="/comic")
@router.get("/comics")
def get_comics(db: SessionDep, request: Request, msg: str | None = None):
@@ -58,7 +58,7 @@ async def validate_comic(request: Request, db: SessionDep, comic_id: str, action
if form.is_valid():
try:
comic = ComicSchema(**form.__dict__)
comic = update_comic(new_comic=comic, comic_id=comic_id, db=db)
comic = update_comic(comic=comic, comic_id=comic_id, db=db)
return RedirectResponse(f"/comic/comics/{comic.id}", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
print(e)
+1 -1
View File
@@ -10,7 +10,7 @@ class AddLinkForm:
async def load_data(self):
form = await self.request.form()
self.url = str(form.get("url"))
self.url = form.get("url")
def is_valid(self):
if not self.url or not (self.url.__contains__("http")):

Some files were not shown because too many files have changed in this diff Show More