38 Commits

Author SHA1 Message Date
tpeetz 0d5ee0dd63 implement REST API for adding MediaFileActors
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 11s
2026-06-14 01:58:57 +02:00
tpeetz b039ae97a9 add subproject kontor-rails
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-06-01 23:47:57 +02:00
tpeetz 71bd7641dc add checking dirs to cjeck_kontor.py
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 3s
2026-05-31 23:17:32 +02:00
tpeetz e70b3ab486 fix problem when deleting MediaFile with MediaActor relations
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 3s
2026-05-31 17:47:27 +02:00
tpeetz 6c4ff8bcad check for duplicate links
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-31 00:18:40 +02:00
Thomas Peetz c885f6cc02 add missing endpoints for creating items
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-26 22:43:04 +02:00
tpeetz 0f9c90b883 synchronize data between configured servers
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-23 20:32:04 +02:00
Thomas Peetz 8d684908e6 update cli scripts to use REST API
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 2s
2026-05-21 16:51:38 +02:00
Thomas Peetz 40b498ed2a complete loading single items
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 3s
2026-05-21 13:43:53 +02:00
tpeetz 6269b54ee8 check if more than one server is configured before syncing
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 9s
2026-05-20 21:43:17 +02:00
Thomas Peetz bbc00ae2cc add endpoints to get items by id
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-20 18:34:44 +02:00
Thomas Peetz 944420802f added all missing endpoints
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 3s
2026-05-20 14:08:10 +02:00
tpeetz d23d1f306a add missing comics endpoints
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-19 23:00:19 +02:00
tpeetz 6077f685e0 Remove obsolete endpoints (#89)
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
Remove endpoints api/login/token and api/login/profile

---------

Co-authored-by: Thomas Peetz <thomas.peetz@cimt-ag.de>
Reviewed-on: #89
2026-05-19 17:52:30 +00:00
tpeetz f9f4a70a79 add mailaccount endpoint
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 3s
2026-05-18 22:01:20 +02:00
Thomas Peetz 71724ac800 add missing endpoints
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 5s
2026-05-18 16:45:56 +02:00
tpeetz 6dd8e12218 secure media endpoints
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 2s
2026-05-17 21:48:40 +02:00
tpeetz cd033f458d add missing endpoints
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 5s
2026-05-17 19:52:00 +02:00
tpeetz 1b58ec8e27 remove obsolete scripts 2026-05-17 01:08:50 +02:00
Thomas Peetz 85b5168445 fix typo in workflow demo.yaml
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-05 18:05:19 +02:00
Thomas Peetz e8763fe635 test gitea actions
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 13s
2026-05-05 18:02:08 +02:00
Thomas Peetz 69e58e6dfb Add environment variable for DB server 2026-05-04 15:21:18 +02:00
Thomas Peetz d8914627a3 move *.service files to subdirectory container and add service file for standalone operations in standalone 2026-05-04 15:02:41 +02:00
Thomas Peetz b256e20491 renamed systemd unit files 2026-04-27 07:08:41 +02:00
tpeetz a8df860752 Merge branch 'feature/44-evaluate-typst-for-documentation' into 'develop/0.3.0'
Resolve "Evaluate Typst for documentation"

Closes #44

See merge request tpeetz/kontor!42
2026-04-26 21:09:54 +02:00
tpeetz 4e88b52dc3 use separate files for chapters 2026-04-26 21:09:06 +02:00
tpeetz 9772c9f554 add subprojekt kontor-doc 2026-04-26 16:08:17 +02:00
tpeetz e81d473a69 Einrichtung Build 2026-04-23 23:14:26 +02:00
tpeetz aca9a1aa4a integrate kontor-angular 2026-04-19 20:05:50 +02:00
tpeetz 57129c2d37 integration von kontor-angular 2026-04-18 00:29:25 +02:00
tpeetz e09f7cddf3 rename Dockerfile to Containerfile and integrate kontor-angular in pod kontor 2026-04-17 00:18:50 +02:00
Thomas Peetz a68dd05794 update kontor-api to reflect api changes 2026-04-15 14:38:38 +02:00
Thomas Peetz 62e22c56c4 update usage of api for login by email or username 2026-04-15 11:32:18 +02:00
Thomas Peetz 7aa6a12044 improve konto-vue/Containerfile by explicitly stating images 2026-04-15 10:26:35 +02:00
tpeetz 39a0759b0b fix OAuth authentication 2026-04-12 20:08:42 +02:00
tpeetz e82f2825ae update systemd service files and add script to copy these files 2026-04-12 17:51:19 +02:00
tpeetz 74d29ae8ef Merge branch 'feature/43-typeerror-not-all-arguments-converted-during-string-formatting' into 'develop/0.3.0'
Resolve "TypeError: not all arguments converted during string formatting"

Closes #43

See merge request tpeetz/kontor!41
2026-04-12 17:14:46 +02:00
tpeetz 614c5f20aa Resolve "TypeError: not all arguments converted during string formatting" 2026-04-12 17:14:46 +02:00
305 changed files with 7884 additions and 2057 deletions
+19
View File
@@ -0,0 +1,19 @@
name: Gitea Actions Demo
run-name: ${{ gitea.actor }} ist testing out Gitea Actions
on: [push]
jobs:
Explore-Gitea-Actions:
runs-on: inky
steps:
- run: echo "The job was automatically triggered by a ${{ gitea.event_name }} event."
- run: echo "This job is now running on a ${{ runner.os }} server hosted by Gitea!"
- run: echo "The name of your branch is ${{ gitea.ref }} and your repository is ${{ gitea.repository }}."
- name: Check out repository code
uses: actions/checkout@v4
- run: echo "The ${{ gitea.repository }} repository has been cloned to the runner."
- run: echo "The workflow is now ready to test your code on the runner."
- name: List files in the directory
run: |
ls ${{ gitea.workspace }}
- run: echo "The job's status is ${{ job.status }}."
@@ -1,5 +1,5 @@
### STAGE 1: Build ### ### STAGE 1: Build ###
FROM node:22.15-alpine AS build FROM docker.io/node:22-alpine AS build
WORKDIR /app WORKDIR /app
COPY package.json package-lock.json ./ COPY package.json package-lock.json ./
RUN npm install RUN npm install
@@ -7,8 +7,9 @@ COPY . .
RUN npm run build RUN npm run build
### STAGE 2: Run ### ### STAGE 2: Run ###
FROM nginx:1.17.1-alpine FROM docker.io/library/nginx:stable-alpine
COPY nginx.conf /etc/nginx/conf.d/default.conf COPY nginx.conf /etc/nginx/conf.d/default.conf
COPY --from=build /app/dist/kontor-angular/browser /usr/share/nginx/html COPY --from=build /app/dist/kontor-angular/browser /usr/share/nginx/html
EXPOSE 80 EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]
+1
View File
@@ -1,4 +1,5 @@
server { server {
listen 8800;
# Root-Verzeichnis für den Server setzen (wir kopieren unsere Anwendung hierher) # Root-Verzeichnis für den Server setzen (wir kopieren unsere Anwendung hierher)
root /usr/share/nginx/html; root /usr/share/nginx/html;
+236 -10
View File
@@ -1,12 +1,238 @@
from fastapi import APIRouter """
add router for different parts (like comics, tysc, media)
"""
from src.apis.version1 import comic, mediaactor, mediafile, mediaactorfile, tysc, admin, user from fastapi import APIRouter, Depends
from src.apis.version1.admin import mailaccount
from src.apis.version1.comics import (
artist,
publisher,
comic,
issue,
worktype,
volume,
storyarc,
comicwork,
issuework,
)
from src.apis.version1.media import (
actor,
file,
mediaactorfile,
mediavideo,
mediaarticle,
)
from src.apis.version1.tysc import (
card,
cardset,
fieldposition,
player,
rooster,
sport,
team,
vendor,
)
from src.core.security import get_current_user_from_token
api_router = APIRouter(prefix="/api") from src.apis.version1.user import assignment, permission, profile, token
api_router.include_router(comic.router, prefix="/comics", tags=["comics"]) from src.apis.version1.bookshelf import article, bookshelf_publisher, book, author, articleauthor, bookauthor
api_router.include_router(mediafile.router, prefix="/media", tags=["media"])
api_router.include_router(mediaactor.router, prefix="/media", tags=["media"]) api_router = APIRouter(prefix="/api")
api_router.include_router(mediaactorfile.router, prefix="/media", tags=["media"]) api_router.include_router(
api_router.include_router(tysc.router, prefix="/tysc", tags=["tysc"]) comic.router,
api_router.include_router(admin.router, prefix="/login", tags=["login"]) prefix="/comics",
api_router.include_router(user.router, prefix="/user", tags=["user"]) tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
publisher.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
artist.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
issue.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
worktype.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
volume.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
storyarc.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
comicwork.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
issuework.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
file.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mediavideo.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mediaarticle.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
actor.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mediaactorfile.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
sport.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
player.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
team.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
fieldposition.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
rooster.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
vendor.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
cardset.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
card.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
article.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
bookshelf_publisher.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
book.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
author.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
articleauthor.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
bookauthor.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
profile.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
token.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
permission.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
assignment.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mailaccount.router,
prefix="/admin",
tags=["admin"],
dependencies=[Depends(get_current_user_from_token)],
)
-52
View File
@@ -1,52 +0,0 @@
from datetime import timedelta
from typing import Annotated
from fastapi import APIRouter, Body, HTTPException, status, Depends, Response
from fastapi.security import OAuth2PasswordRequestForm
from src.core.config import settings
from src.core.security import create_access_token, authenticate_user, get_current_active_user
from src.db.models.admin import Profile
from src.schema.admin import Token, ProfileModel
from src.webapps.auth.forms import LoginForm
router = APIRouter()
@router.post("/token")
def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email, "scope": " ".join(form_data.scopes)}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
# @router.post("/token-cookie", response_model=Token)
def login_for_token_cookie(response: Response, form_data: LoginForm = Depends()):
user = authenticate_user(form_data.username, form_data.password) # type: ignore
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
response.set_cookie(
key="access_token", value=f"Bearer {access_token}", httponly=True
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/profile", response_model=ProfileModel)
async def read_profile(current_user: Annotated[Profile, Depends(get_current_active_user)]):
return current_user
@@ -0,0 +1,74 @@
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, Response, status
from fastapi.security import OAuth2PasswordRequestForm
from src.core.config import settings
from src.core.log_conf import logger
from src.core.security import (
authenticate_user_by_email,
authenticate_user_by_username,
create_access_token,
)
from src.schema.admin.login import LoginRequest
from src.schema.admin.token import Token
from src.webapps.auth.forms import LoginForm
login_router = APIRouter()
@login_router.post(
"/login",
tags=["login"],
summary="Login and get token",
response_description="Return HTTP status code 200 (OK)",
status_code=status.HTTP_200_OK,
)
def login(request: LoginRequest) -> Token:
logger.info("login with %s", request.email)
user = authenticate_user_by_email(str(request.email), str(request.password))
scopes = ["admin", "read"]
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email, "scope": " ".join(scopes)},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@login_router.post("/token", tags=["login"], summary="Login for access token")
# async def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user_by_username(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.user_name, "scope": " ".join(form_data.scopes)},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
def login_for_token_cookie(response: Response, form_data: LoginForm = Depends()):
user = authenticate_user_by_email(str(form_data.username), str(form_data.password))
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
response.set_cookie(
key="access_token", value=f"Bearer {access_token}", httponly=True
)
return {"access_token": access_token, "token_type": "bearer"}
@@ -0,0 +1,32 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.admin import MailAccount
from src.db.session import SessionDep
from src.schema.admin.mailaccount import MailAccountResponse, to_response
router = APIRouter()
@router.get("/mailaccounts", response_model=List[MailAccountResponse])
def get_all_mailaccounts(db: SessionDep) -> List[MailAccountResponse]:
"""
return all MailAccounts as JSON.
"""
results: List[MailAccountResponse] = []
mailaccounts = db.query(MailAccount).all()
for mailaccount in mailaccounts:
response = to_response(mailaccount)
results.append(response)
return results
@router.get("/mailaccounts/{mailaccount_id}", response_model=MailAccountResponse)
def get_mailaccount(mailaccount_id: str, db: SessionDep) -> MailAccountResponse:
"""
return MailAccounts by id.
"""
mailaccount = db.get(MailAccount, mailaccount_id)
if mailaccount is None:
raise HTTPException(status_code=409, detail="Mailaccount could not be found")
response = to_response(mailaccount)
return response
@@ -0,0 +1,20 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import Article
from src.db.session import SessionDep
from src.schema.bookshelf.article import ArticleResponse, to_response
router = APIRouter()
@router.get("/articles", response_model=List[ArticleResponse])
def get_all_artists(db: SessionDep) -> List[ArticleResponse]:
results: List[ArticleResponse] = []
articles = db.query(Article).all()
for article in articles:
response = to_response(article)
results.append(response)
return results
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import ArticleAuthor
from src.db.session import SessionDep
from src.schema.bookshelf.articleauthor import ArticleAuthorResponse, to_response
router = APIRouter()
@router.get("/articleauthors", response_model=List[ArticleAuthorResponse])
def get_all_artists(db: SessionDep) -> List[ArticleAuthorResponse]:
results: List[ArticleAuthorResponse] = []
articleauthors = db.query(ArticleAuthor).all()
for articleauthor in articleauthors:
response = to_response(articleauthor)
results.append(response)
return results
@@ -0,0 +1,26 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.bookshelf import Author
from src.db.session import SessionDep
from src.schema.bookshelf.author import AuthorResponse, to_response
router = APIRouter()
@router.get("/authors", response_model=List[AuthorResponse])
def get_all_authors(db: SessionDep) -> List[AuthorResponse]:
results: List[AuthorResponse] = []
authors = db.query(Author).all()
for author in authors:
response = to_response(author)
results.append(response)
return results
@router.get("/authors/{author_id}", response_model=AuthorResponse)
def get_author(author_id: str, db: SessionDep) -> AuthorResponse:
author = db.get(Author, author_id)
if author is None:
raise HTTPException(status_code=404, detail="Author could not be found")
response = to_response(author)
return response
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import Book
from src.db.session import SessionDep
from src.schema.bookshelf.book import BookResponse, to_response
router = APIRouter()
@router.get("/books", response_model=List[BookResponse])
def get_all_artists(db: SessionDep) -> List[BookResponse]:
results: List[BookResponse] = []
books = db.query(Book).all()
for book in books:
response = to_response(book)
results.append(response)
return results
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import BookAuthor
from src.db.session import SessionDep
from src.schema.bookshelf.bookauthor import BookAuthorResponse, to_response
router = APIRouter()
@router.get("/bookauthors", response_model=List[BookAuthorResponse])
def get_all_artists(db: SessionDep) -> List[BookAuthorResponse]:
results: List[BookAuthorResponse] = []
bookauthors = db.query(BookAuthor).all()
for bookauthor in bookauthors:
response = to_response(bookauthor)
results.append(response)
return results
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import BookshelfPublisher
from src.db.session import SessionDep
from src.schema.bookshelf.publisher import PublisherResponse, to_response
router = APIRouter()
@router.get("/publishers", response_model=List[PublisherResponse])
def get_all_artists(db: SessionDep) -> List[PublisherResponse]:
results: List[PublisherResponse] = []
publishers = db.query(BookshelfPublisher).all()
for publisher in publishers:
response = to_response(publisher)
results.append(response)
return results
-103
View File
@@ -1,103 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException, status
from src.core.log_conf import logger
from src.db.models.comic import Artist, Comic, Issue, Publisher
from src.db.repository.comics.artist import get_artist_details
from src.db.repository.comics.comic import (
get_comic_details,
get_issue_details,
get_short_info,
list_comics,
)
from src.db.repository.comics.publisher import get_publisher_details
from src.db.session import SessionDep
from src.schema.comics.artist import ArtistCreation, ArtistResponse
from src.schema.comics.artist_details import ArtistDetailResponse
from src.schema.comics.comic import ComicResponse
from src.schema.comics.comic_details import ComicDetailsResponse
from src.schema.comics.issue_details import IssueDetailsResponse
from src.schema.comics.publisher import PublisherResponse
from src.schema.comics.publisher_details import PublisherDetailsResponse
router = APIRouter()
@router.get("/comics")
def get_all_comics(db: SessionDep) -> List[ComicResponse]: # type: ignore
results: List[ComicResponse] = []
comics = list_comics(db)
for comic in comics:
response = get_short_info(comic)
results.append(response)
return results
@router.get("/comics/{comic_id}", response_model=ComicDetailsResponse)
def get_comic(comic_id: str, db: SessionDep) -> ComicDetailsResponse: # type: ignore
comic = db.get(Comic, comic_id)
if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found")
logger.info(f"create ComicDetailsResponse for {comic}")
response: ComicDetailsResponse = get_comic_details(comic)
logger.info(f"ComicDetailsResponse: {response}")
return response
@router.get("/artists", response_model=List[ArtistResponse])
def get_all_artists(db: SessionDep) -> List[ArtistResponse]: # type: ignore
results: List[ArtistResponse] = []
artists = db.query(Artist).all()
for artist in artists:
results.append(ArtistResponse(id=artist.id, name=str(artist.name))) # type: ignore
return results
@router.get("/artists/{artist_id}", response_model=ArtistDetailResponse)
def get_artist(artist_id: str, db: SessionDep) -> ArtistDetailResponse: # type: ignore
artist = db.get(Artist, artist_id)
if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found")
response: ArtistDetailResponse = get_artist_details(artist)
return response
@router.post("/artists", status_code=status.HTTP_201_CREATED)
def add_artist(db: SessionDep, artist_creation: ArtistCreation) -> ArtistResponse: # type: ignore
artist: Artist = Artist()
setattr(artist, "name", artist_creation.name)
try:
db.add(artist)
db.commit()
except:
raise HTTPException(status_code=409, detail="Artist already added")
response = ArtistResponse(id=artist.id, name=str(artist.name))
return response
@router.get("/publishers", response_model=List[PublisherResponse])
def get_all_publishers(db: SessionDep) -> List[PublisherResponse]: # type: ignore
results: List[PublisherResponse] = []
publishers = db.query(Publisher).all()
for publisher in publishers:
results.append(PublisherResponse(id=publisher.id, name=str(publisher.name)))
return results
@router.get("/publishers/{publisher_id}", response_model=PublisherDetailsResponse)
def get_publisher(publisher_id: str, db: SessionDep) -> PublisherDetailsResponse: # type: ignore
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherDetailsResponse = get_publisher_details(publisher)
return response
@router.get("/issues", response_model=List[IssueDetailsResponse])
def get_issues(db: SessionDep) -> List[IssueDetailsResponse]: # type: ignore
results: List[IssueDetailsResponse] = []
issues = db.query(Issue).all()
for issue in issues:
results.append(get_issue_details(issue))
return results
@@ -0,0 +1,53 @@
from typing import List
from fastapi import APIRouter, HTTPException, status
from src.db.models.comic import Artist
from src.db.repository.comics.artist import get_artist_details
from src.db.session import SessionDep
from src.schema.comics.artist import ArtistCreation, ArtistResponse, artist_to_response
from src.schema.comics.artist_details import ArtistDetailResponse
router = APIRouter()
@router.get("/artists", response_model=List[ArtistResponse])
def get_all_artists(db: SessionDep) -> List[ArtistResponse]:
results: List[ArtistResponse] = []
artists = db.query(Artist).all()
for artist in artists:
response = artist_to_response(artist)
results.append(response)
return results
@router.get("/artists/{artist_id}", response_model=ArtistResponse)
def get_artist(artist_id: str, db: SessionDep) -> ArtistResponse:
artist = db.get(Artist, artist_id)
if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found")
response: ArtistResponse = artist_to_response(artist)
return response
@router.get("/artists/details/{artist_id}", response_model=ArtistDetailResponse)
def get_artist_details_by_id(artist_id: str, db: SessionDep) -> ArtistDetailResponse:
artist = db.get(Artist, artist_id)
if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found")
response: ArtistDetailResponse = get_artist_details(artist)
return response
@router.post("/artists", status_code=status.HTTP_201_CREATED)
def add_artist(db: SessionDep, artist_creation: ArtistCreation) -> ArtistResponse:
artist: Artist = Artist()
setattr(artist, "name", artist_creation.name)
try:
db.add(artist)
db.commit()
except:
raise HTTPException(status_code=409, detail="Artist already added")
response = artist_to_response(artist)
return response
@@ -0,0 +1,44 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.core.log_conf import logger
from src.db.models.comic import Comic
from src.db.repository.comics.comic import (
get_comic_details,
)
from src.db.session import SessionDep
from src.schema.comics.comic import ComicResponse, comic_to_response
from src.schema.comics.comic_details import ComicDetailsResponse
router = APIRouter()
@router.get("/comics")
def get_all_comics(db: SessionDep) -> List[ComicResponse]:
results: List[ComicResponse] = []
comics = db.query(Comic).all()
for comic in comics:
response = comic_to_response(comic)
results.append(response)
return results
@router.get("/comics/{comic_id}", response_model=ComicResponse)
def get_comic(comic_id: str, db: SessionDep) -> ComicResponse:
comic = db.get(Comic, comic_id)
if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found")
response: ComicResponse = comic_to_response(comic)
return response
@router.get("/comics/details/{comic_id}", response_model=ComicDetailsResponse)
def get_comic_details_by_id(comic_id: str, db: SessionDep) -> ComicDetailsResponse:
comic = db.get(Comic, comic_id)
if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found")
logger.info(f"create ComicDetailsResponse for {comic}")
response: ComicDetailsResponse = get_comic_details(comic)
logger.info(f"ComicDetailsResponse: {response}")
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import ComicWork
from src.db.session import SessionDep
from src.schema.comics.comicwork import ComicWorkResponse, comicwork_to_response
router = APIRouter()
@router.get("/comicworks", response_model=List[ComicWorkResponse])
def get_comicworks(db: SessionDep) -> List[ComicWorkResponse]:
results: List[ComicWorkResponse] = []
worktypes = db.query(ComicWork).all()
for worktype in worktypes:
response = comicwork_to_response(worktype)
results.append(response)
return results
@router.get("/comicworks/{comicwork_id}", response_model=ComicWorkResponse)
def get_comicwork(comicwork_id: str, db: SessionDep) -> ComicWorkResponse:
worktype = db.get(ComicWork, comicwork_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Comicwork could not be found")
response = comicwork_to_response(worktype)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Issue
from src.db.session import SessionDep
from src.schema.comics.issue import IssueResponse, issue_to_response
router = APIRouter()
@router.get("/issues", response_model=List[IssueResponse])
def get_issues(db: SessionDep) -> List[IssueResponse]:
results: List[IssueResponse] = []
issues = db.query(Issue).all()
for issue in issues:
response = issue_to_response(issue)
results.append(response)
return results
@router.get("/issues/{issue_id}", response_model=IssueResponse)
def get_issue(issue_id: str, db: SessionDep) -> IssueResponse:
issue = db.get(Issue, issue_id)
if issue is None:
raise HTTPException(status_code=404, detail="Issue could not be found")
response = issue_to_response(issue)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import IssueWork
from src.db.session import SessionDep
from src.schema.comics.issuework import IssueWorkResponse, issuework_to_response
router = APIRouter()
@router.get("/issueworks", response_model=List[IssueWorkResponse])
def get_issueworks(db: SessionDep) -> List[IssueWorkResponse]:
results: List[IssueWorkResponse] = []
worktypes = db.query(IssueWork).all()
for worktype in worktypes:
response = issuework_to_response(worktype)
results.append(response)
return results
@router.get("/issueworks/{issuework_id}", response_model=IssueWorkResponse)
def get_issuework(issuework_id: str, db: SessionDep) -> IssueWorkResponse:
worktype = db.get(IssueWork, issuework_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Issuework could not be found")
response = issuework_to_response(worktype)
return response
@@ -0,0 +1,37 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Publisher
from src.db.repository.comics.publisher import get_publisher_details
from src.db.session import SessionDep
from src.schema.comics.publisher import PublisherResponse, publisher_to_response
from src.schema.comics.publisher_details import PublisherDetailsResponse
router = APIRouter()
@router.get("/publishers", response_model=List[PublisherResponse])
def get_all_publishers(db: SessionDep) -> List[PublisherResponse]:
results: List[PublisherResponse] = []
publishers = db.query(Publisher).all()
for publisher in publishers:
response: PublisherResponse = publisher_to_response(publisher)
results.append(response)
return results
@router.get("/publishers/{publisher_id}", response_model=PublisherResponse)
def get_publisher(publisher_id: str, db: SessionDep) -> PublisherResponse:
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherResponse = publisher_to_response(publisher)
return response
@router.get("/publishers/details/{publisher_id}", response_model=PublisherDetailsResponse)
def get_publisher_details_by_id(publisher_id: str, db: SessionDep) -> PublisherDetailsResponse:
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherDetailsResponse = get_publisher_details(publisher)
return response
@@ -0,0 +1,26 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import StoryArc
from src.db.session import SessionDep
from src.schema.comics.storyarc import StoryArcResponse, storyarc_to_response
router = APIRouter()
@router.get("/storyarcs", response_model=List[StoryArcResponse])
def get_storyarcs(db: SessionDep) -> List[StoryArcResponse]:
results: List[StoryArcResponse] = []
storyarcs = db.query(StoryArc).all()
for storyarc in storyarcs:
response = storyarc_to_response(storyarc)
results.append(response)
return results
@router.get("/storyarcs/{storyarc_id}", response_model=StoryArcResponse)
def get_storyarc(story_arc_id: str, db: SessionDep) -> StoryArcResponse:
storyarc = db.get(StoryArc, story_arc_id)
if storyarc is None:
raise HTTPException(status_code=404, detail="Storyarc could not be found")
response = storyarc_to_response(storyarc)
return response
@@ -0,0 +1,26 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Volume
from src.db.session import SessionDep
from src.schema.comics.volume import VolumeResponse, volume_to_response
router = APIRouter()
@router.get("/volumes", response_model=List[VolumeResponse])
def volumes(db: SessionDep) -> List[VolumeResponse]:
results: List[VolumeResponse] = []
worktypes = db.query(Volume).all()
for worktype in worktypes:
response = volume_to_response(worktype)
results.append(response)
return results
@router.get("/volumes/{volume_id}", response_model=VolumeResponse)
def get_volume(volume_id: str, db: SessionDep) -> VolumeResponse:
worktype = db.get(Volume, volume_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Volume could not be found")
response = volume_to_response(worktype)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import WorkType
from src.db.session import SessionDep
from src.schema.comics.worktype import WorktypeResponse, worktype_to_response
router = APIRouter()
@router.get("/worktypes", response_model=List[WorktypeResponse])
def get_issues(db: SessionDep) -> List[WorktypeResponse]:
results: List[WorktypeResponse] = []
worktypes = db.query(WorkType).all()
for worktype in worktypes:
response = worktype_to_response(worktype)
results.append(response)
return results
@router.get("/worktypes/{worktype_id}", response_model=WorktypeResponse)
def get_issue(worktype_id: str, db: SessionDep) -> WorktypeResponse:
worktype = db.get(WorkType, worktype_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Worktype could not be found")
response = worktype_to_response(worktype)
return response
+1 -1
View File
@@ -1,6 +1,6 @@
from fastapi import APIRouter, status from fastapi import APIRouter, status
from src.schema.admin import HealthCheck from src.schema.admin.healthcheck import HealthCheck
health_router = APIRouter() health_router = APIRouter()
-41
View File
@@ -1,41 +0,0 @@
from datetime import timedelta
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from src.core.config import settings
from src.core.log_conf import logger
from src.core.security import authenticate_user, create_access_token
from src.schema.admin import Token
login_router = APIRouter()
class LoginRequest(BaseModel):
email: str | None = None
password: str | None = None
@login_router.post(
"/login",
tags=["login"],
summary="Login and get token",
response_description="Return HTTP status code 200 (OK)",
status_code=status.HTTP_200_OK,
)
def login(request: LoginRequest) -> Token:
logger.info(f"login with {request.email}")
user = authenticate_user(request.email, request.password)
scopes = ["admin", "read"]
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email, "scope": " ".join(scopes)},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@@ -1,33 +1,33 @@
from typing import List
from fastapi import APIRouter, status, HTTPException from fastapi import APIRouter, status, HTTPException
from sqlalchemy import select
from src.core.log_conf import logger from src.core.log_conf import logger
from src.db.repository.media import create_new_mediaactor, delete_mediaactor, get_actor_details from src.db.repository.media.actor import delete_mediaactor, import_mediaactor
from src.db.session import SessionDep from src.db.session import SessionDep
from src.schema.media.actor import MediaActorModel, MediaActorResponse from src.schema.media.actor import MediaActorModel, MediaActorResponse, actor_to_response
from src.db.models.media import MediaActor from src.db.models.media import MediaActor
router = APIRouter() router = APIRouter()
@router.get("/actors", response_model=list[MediaActorResponse]) @router.get("/actors", response_model=List[MediaActorResponse])
# def get_all_files(db: SessionDep, review: bool = False, download: bool = False, current_user: Profile = Depends(get_current_user_from_token)) -> List[MediaFileResponse]: def get_all_actors(db: SessionDep) -> List[MediaActorResponse]:
def get_all_actors(db: SessionDep, review: bool = False, download: bool = False) -> list[MediaActorResponse]: # type: ignore
results: list[MediaActorResponse] = [] results: list[MediaActorResponse] = []
actors = db.scalars(select(MediaActor)).all() actors = db.query(MediaActor).all()
for mediaactor in actors: for mediaactor in actors:
response = MediaActorResponse(id=mediaactor.id, name=str(mediaactor.name), url=str(mediaactor.url)) response = actor_to_response(mediaactor)
results.append(response) results.append(response)
return results return results
@router.get("/actors/{actor_id}", response_model=MediaActorResponse) @router.get("/actors/{actor_id}", response_model=MediaActorResponse)
def get_actor(actor_id: str, db: SessionDep) -> MediaActorResponse: # type: ignore def get_actor(actor_id: str, db: SessionDep) -> MediaActorResponse:
media_actor = db.get(MediaActor, actor_id) media_actor = db.get(MediaActor, actor_id)
if not media_actor: if not media_actor:
raise HTTPException(status_code=404, detail="MediaActor could not be found") raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = get_actor_details(media_actor) response = actor_to_response(media_actor)
return response return response
@router.delete("/actors/{actor_id}", status_code=status.HTTP_204_NO_CONTENT) @router.delete("/actors/{actor_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_actor(actor_id: str, db: SessionDep): # type: ignore def delete_actor(actor_id: str, db: SessionDep):
media_actor = db.get(MediaActor, actor_id) media_actor = db.get(MediaActor, actor_id)
if not media_actor: if not media_actor:
raise HTTPException(status_code=404, detail="MediaActor could not be found") raise HTTPException(status_code=404, detail="MediaActor could not be found")
@@ -35,11 +35,11 @@ def delete_actor(actor_id: str, db: SessionDep): # type: ignore
delete_mediaactor(db, media_actor.id) delete_mediaactor(db, media_actor.id)
@router.post("/actors", status_code=status.HTTP_201_CREATED) @router.post("/actors", status_code=status.HTTP_201_CREATED)
def add_actor(new_actor: MediaActorModel, db: SessionDep) -> MediaActorResponse: # type: ignore def add_actor(new_actor: MediaActorModel, db: SessionDep) -> MediaActorResponse:
logger.info(f"add actor {new_actor.url}") logger.info(f"add actor {new_actor.url}")
try: try:
mediaActor: MediaActor = create_new_mediaactor(new_actor, db) mediaActor: MediaActor = import_mediaactor(db, new_actor)
except: except Exception as exception:
raise HTTPException(status_code=409, detail="Link duplicate") raise HTTPException(status_code=409, detail=f"Link duplicate: {exception}")
response = get_actor_details(mediaActor) response = actor_to_response(mediaActor)
return response return response
+163
View File
@@ -0,0 +1,163 @@
from typing import List
from fastapi import APIRouter, status, HTTPException
from src.core.log_conf import logger
from src.db.repository.media.actorfile import (
create_new_mediaactorfile,
delete_mediaactorfile,
)
from src.db.repository.media.file import delete_mediafile, import_mediafile
from src.db.session import SessionDep
from src.schema.media.actor import MediaActorResponse, actor_to_response
from src.schema.media.actorfile import MediaActorFileResponse, actorfile_to_response
from src.schema.media.file import (
MediaFileModel,
MediaFileResponse,
file_to_response,
file_to_model,
)
from src.db.models.media import MediaFile
router = APIRouter()
@router.get("/update-titles")
def update_titles(db: SessionDep) -> List[MediaFileResponse]:
"""
Update title for given MediaFile.
"""
results: list[MediaFileResponse] = []
files = db.query(MediaFile).filter(MediaFile.review.is_(True)).all()
for mediafile in files:
mediafile.update_title()
db.add(mediafile)
response = file_to_response(mediafile)
results.append(response)
db.commit()
return results
@router.get("/files", response_model=List[MediaFileResponse])
def get_all_files(
db: SessionDep, review: bool = False, download: bool = False
) -> List[MediaFileResponse]:
"""
Get all MediaFiles.
"""
results: List[MediaFileResponse] = []
files: List[MediaFile]
if review:
files = db.query(MediaFile).filter(MediaFile.review.is_(True)).all()
elif download:
files = db.query(MediaFile).filter(MediaFile.should_download.is_(True)).all()
else:
files = db.query(MediaFile).all()
for mediafile in files:
response = file_to_response(mediafile)
results.append(response)
return results
@router.get("/files/{file_id}", response_model=MediaFileResponse)
def get_file(file_id: str, db: SessionDep) -> MediaFileResponse:
"""
Get MediaFile with given id or return HTTPException.
"""
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
response = file_to_response(mediafile)
return response
@router.delete("/files/{file_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_file(file_id: str, db: SessionDep):
"""
Delete MediaFile by given id.
"""
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
logger.info("delete MediaFile: %s", file_id)
actor_files = mediafile.media_actor_files
logger.info("MediaActorFiles links %s", len(actor_files))
if len(actor_files) > 0:
logger.info("delete MediaActor relations first")
for actor_file in actor_files:
delete_mediaactorfile(db, actor_file.id)
delete_mediafile(db, mediafile.id)
@router.get("/files/{file_id}/actors", response_model=list[MediaActorResponse])
def get_file_actors(file_id: str, db: SessionDep) -> list[MediaActorResponse]:
"""
Get list of Actors for given MediaFile.
"""
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info("already known actors: %s", actor_files)
results: list[MediaActorResponse] = []
for actor_file in actor_files:
response = actor_to_response(actor_file.media_actor)
results.append(response)
return results
@router.put("/files/{file_id}/actors", response_model=List[MediaActorFileResponse])
def update_file_actors(
file_id: str, db: SessionDep, actors: List[MediaActorResponse]
) -> List[MediaActorFileResponse]:
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info("already known actors: %s", actor_files)
for actor in actors:
already_associated = False
for actor_file in actor_files:
if actor.id == actor_file.media_actor_id:
logger.info("alreay associated - do nothing")
already_associated = True
break
if not already_associated:
create_new_mediaactorfile(db, actor.id, mediafile.id)
db.refresh(mediafile)
actor_files = mediafile.media_actor_files
results: List[MediaActorFileResponse] = []
for actor_file in actor_files:
response = actorfile_to_response(actor_file)
results.append(response)
return results
@router.put("/files/{file_id}", response_model=MediaFileResponse)
def update_file(
file_id: str, db: SessionDep, info: MediaFileResponse
) -> MediaFileResponse:
"""
Update MediaFile with given id and data.
"""
media_file = db.get(MediaFile, file_id)
if not media_file:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
file_to_model(info, media_file)
db.add(media_file)
db.commit()
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be updated")
response = file_to_response(mediafile)
return response
@router.post("/files", status_code=status.HTTP_201_CREATED)
def add_file(new_file: MediaFileModel, db: SessionDep) -> MediaFileResponse:
logger.info("add mediafile %s", new_file)
try:
mediaFile: MediaFile = import_mediafile(db, new_file)
except:
raise HTTPException(status_code=409, detail="MediaFile duplicate")
response = file_to_response(mediaFile)
return response
@@ -1,33 +1,43 @@
from typing import List from typing import List
from fastapi import APIRouter, status, HTTPException from fastapi import APIRouter, status, HTTPException
from sqlalchemy import select from src.core.log_conf import logger
from src.db.models.media import MediaActorFile from src.db.models.media import MediaActorFile
from src.db.repository.media import delete_mediaactorfile, get_actorfile_details from src.db.repository.media.actorfile import delete_mediaactorfile, import_mediaactorfile
from src.db.session import SessionDep from src.db.session import SessionDep
from src.schema.media.actorfile import MediaActorFileResponse from src.schema.media.actorfile import MediaActorFileModel, MediaActorFileResponse, actorfile_to_response
router = APIRouter() router = APIRouter()
@router.get("/actorfiles", response_model=List[MediaActorFileResponse]) @router.get("/actorfiles", response_model=List[MediaActorFileResponse])
def get_all_actorfiles(db: SessionDep) -> List[MediaActorFileResponse]: # type: ignore def get_all_actorfiles(db: SessionDep) -> List[MediaActorFileResponse]:
results: List[MediaActorFileResponse] = [] results: List[MediaActorFileResponse] = []
actorfiles = db.scalars(select(MediaActorFile)).all() actorfiles = db.query(MediaActorFile).all()
for media_actorfile in actorfiles: for media_actorfile in actorfiles:
response = get_actorfile_details(media_actorfile) response = actorfile_to_response(media_actorfile)
results.append(response) results.append(response)
return results return results
@router.get("/actorfiles/{actorfile_id}", response_model=MediaActorFileResponse) @router.get("/actorfiles/{actorfile_id}", response_model=MediaActorFileResponse)
def get_actorfile(actorfile_id: str, db: SessionDep) -> MediaActorFileResponse: # type: ignore def get_actorfile(actorfile_id: str, db: SessionDep) -> MediaActorFileResponse:
media_actorfile = db.get(MediaActorFile, actorfile_id) media_actorfile = db.get(MediaActorFile, actorfile_id)
if not media_actorfile: if not media_actorfile:
raise HTTPException(status_code=404, detail="MediaActor could not be found") raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = get_actorfile_details(media_actorfile) response = actorfile_to_response(media_actorfile)
return response return response
@router.delete("/actorfiles/{actorfile_id}", status_code=status.HTTP_204_NO_CONTENT) @router.delete("/actorfiles/{actorfile_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_actorfile(actorfile_id: str, db: SessionDep): # type: ignore def delete_actorfile(actorfile_id: str, db: SessionDep):
media_actorfile = db.get(MediaActorFile, actorfile_id) media_actorfile = db.get(MediaActorFile, actorfile_id)
if not media_actorfile: if not media_actorfile:
raise HTTPException(status_code=404, detail="MediaActor could not be found") raise HTTPException(status_code=404, detail="MediaActor could not be found")
delete_mediaactorfile(db, media_actorfile.id) delete_mediaactorfile(db, media_actorfile.id)
@router.post("/actorfiles", status_code=status.HTTP_201_CREATED)
def add_actorfile(new_actorfile: MediaActorFileModel, db: SessionDep) -> MediaActorFileResponse:
logger.info("add actorfile %s - %s", new_actorfile.media_actor_id, new_actorfile.media_file_id)
try:
mediaActorFile: MediaActorFile = import_mediaactorfile(db, new_actorfile)
except Exception as exception:
raise HTTPException(status_code=409, detail=f"Link duplicate: {exception}")
response = actorfile_to_response(mediaActorFile)
return response
@@ -0,0 +1,25 @@
from typing import List
from fastapi import APIRouter
from src.db.models.media import MediaArticle
from src.db.session import SessionDep
from src.schema.media.article import MediaArticleResponse, to_response
router = APIRouter()
@router.get("/articles", response_model=List[MediaArticleResponse])
def get_all_files(
db: SessionDep, review: bool = False, download: bool = False
) -> List[MediaArticleResponse]:
"""
Get all MediaVideos.
"""
results: List[MediaArticleResponse] = []
articles: List[MediaArticle]
articles = db.query(MediaArticle).all()
for article in articles:
response = to_response(article)
results.append(response)
return results
@@ -0,0 +1,42 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.media import MediaVideo
from src.db.session import SessionDep
from src.schema.media.video import MediaVideoResponse, video_to_response
router = APIRouter()
@router.get("/videos", response_model=List[MediaVideoResponse])
def get_all_videos(
db: SessionDep, review: bool = False, download: bool = False
) -> List[MediaVideoResponse]:
"""
Get all MediaVideos.
"""
results: List[MediaVideoResponse] = []
files: List[MediaVideo]
if review:
files = db.query(MediaVideo).filter(MediaVideo.review.is_(True)).all()
elif download:
files = db.query(MediaVideo).filter(MediaVideo.should_download.is_(True)).all()
else:
files = db.query(MediaVideo).all()
for mediafile in files:
response = video_to_response(mediafile)
results.append(response)
return results
@router.get("/videos/{video_id}", response_model=MediaVideoResponse)
def get_video(video_id: str, db: SessionDep) -> MediaVideoResponse:
"""
Get MediaVideo by id.
"""
video = db.get(MediaVideo, video_id)
if video is None:
raise HTTPException(status_code=404, detail="MediaVideo could not be found")
response = video_to_response(video)
return response
-120
View File
@@ -1,120 +0,0 @@
from fastapi import APIRouter, status, HTTPException, Depends
from sqlalchemy import select, Sequence
from src.core.log_conf import logger
from src.db.repository.media import create_new_mediaactorfile, create_new_mediafile, delete_mediafile
from src.db.session import SessionDep
from src.schema.media.actor import MediaActorResponse
from src.schema.media.actorfile import MediaActorFileResponse
from src.schema.media.file import MediaFileResponse, Link, get_file_details, set_file
from src.db.models.media import MediaFile
router = APIRouter()
@router.get("/update-titles")
def update_titles(db: SessionDep) -> list[MediaFileResponse]: # type: ignore
results: list[MediaFileResponse] = []
files = db.query(MediaFile).filter(MediaFile.review == True).all()
for mediafile in files:
mediafile.update_title()
db.add(mediafile)
response = get_file_details(mediafile)
results.append(response)
db.commit()
return results
@router.get("/files", response_model=list[MediaFileResponse])
# def get_all_files(db: SessionDep, review: bool = False, download: bool = False, current_user: Profile = Depends(get_current_user_from_token)) -> List[MediaFileResponse]:
def get_all_files(db: SessionDep, review: bool = False, download: bool = False) -> list[MediaFileResponse]: # type: ignore
results: list[MediaFileResponse] = []
files: Sequence[MediaFile]
if review:
files = db.query(MediaFile).filter(MediaFile.review == True).all() # type: ignore
elif download:
files = db.query(MediaFile).filter(MediaFile.should_download == True).all() # type: ignore
else:
files = db.scalars(select(MediaFile)).all() # type: ignore
for mediafile in files: # type: ignore
response = get_file_details(mediafile)
results.append(response)
return results
@router.get("/files/{file_id}", response_model=MediaFileResponse)
def get_file(file_id: str, db: SessionDep) -> MediaFileResponse: # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
response = get_file_details(mediafile)
return response
@router.delete("/files/{file_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_file(file_id: str, db: SessionDep): # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
logger.info(f"delete MediaFile: {file_id}")
actor_files = mediafile.media_actor_files
logger.info(f"MediaActorFiles links {len(actor_files)}")
if len(actor_files) == 0:
delete_mediafile(db, mediafile.id)
@router.get("/files/{file_id}/actors", response_model=list[MediaActorResponse])
def get_file_actors(file_id: str, db: SessionDep) -> list[MediaActorResponse]: # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info(f"already known actors: {actor_files}")
results: list[MediaActorResponse] = []
for actor_file in actor_files:
response = MediaActorResponse(id=actor_file.media_actor.id, name=actor_file.media_actor.name, url=actor_file.media_actor.url)
results.append(response)
return results
@router.put("/files/{file_id}/actors", response_model=list[MediaActorFileResponse])
def update_file_actors(file_id: str, db: SessionDep, actors: list[MediaActorResponse]) -> list[MediaActorFileResponse]: # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info(f"already known actors: {actor_files}")
for actor in actors:
already_associated = False
for actor_file in actor_files:
if actor.id == actor_file.media_actor_id:
logger.info("alreay associated - do nothing")
already_associated = True
break
if not already_associated:
create_new_mediaactorfile(db, actor.id, mediafile.id)
db.refresh(mediafile)
actor_files = mediafile.media_actor_files
results: list[MediaActorFileResponse] = []
for actor_file in actor_files:
response = MediaActorFileResponse(id=actor_file.id, actor_id=actor_file.media_actor_id, file_id=actor_file.media_file_id)
results.append(response)
return results
@router.put("/files/{file_id}", response_model=MediaFileResponse)
def update_file(file_id: str, db: SessionDep, info: MediaFileResponse) -> MediaFileResponse: # type: ignore
mediaFile = db.get(MediaFile, file_id)
if not mediaFile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
set_file(info, mediaFile)
db.add(mediaFile)
db.commit()
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be updated")
response = get_file_details(mediafile)
return response
@router.post("/files", status_code=status.HTTP_201_CREATED)
def add_file(new_link: Link, db: SessionDep) -> MediaFileResponse: # type: ignore
logger.info(f"add url {new_link.url}")
try:
mediaFile: MediaFile = create_new_mediafile(new_link.url, db)
except:
raise HTTPException(status_code=409, detail="Link duplicate")
response = get_file_details(mediaFile)
return response
-16
View File
@@ -1,16 +0,0 @@
from typing import List
from fastapi import APIRouter
from src.db.session import SessionDep
from src.schema.tysc.sport import SportResponse
from src.db.models.tysc import Sport
router = APIRouter()
@router.get("/sports")
def get_all_sports(db: SessionDep) -> List[SportResponse]:
results: list[SportResponse] = []
sports = db.query(Sport).all()
for sport in sports:
results.append(SportResponse(id=sport.id, name=sport.name))
return results
+27
View File
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Card
from src.db.session import SessionDep
from src.schema.tysc.card import CardResponse, to_response
router = APIRouter()
@router.get("/cards")
def get_all_cards(db: SessionDep) -> List[CardResponse]:
results: List[CardResponse] = []
cards = db.query(Card).all()
for card in cards:
response = to_response(card)
results.append(response)
return results
@router.get("/cards/{card_id}", response_model=CardResponse)
def get_card(card_id: str, db: SessionDep) -> CardResponse:
card = db.get(Card, card_id)
if card is None:
raise HTTPException(status_code=404, detail="Card could not be found")
response = to_response(card)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import CardSet
from src.db.session import SessionDep
from src.schema.tysc.cardset import CardSetResponse, to_response
router = APIRouter()
@router.get("/cardsets")
def get_all_cardsets(db: SessionDep) -> List[CardSetResponse]:
results: List[CardSetResponse] = []
cardsets = db.query(CardSet).all()
for cardset in cardsets:
response = to_response(cardset)
results.append(response)
return results
@router.get("/cardsets/{cardset_id}", response_model=CardSetResponse)
def get_cardset(cardset_id: str, db: SessionDep) -> CardSetResponse:
cardset = db.get(CardSet, cardset_id)
if cardset is None:
raise HTTPException(status_code=404, detail="Cardset could not be found")
response = to_response(cardset)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import FieldPosition
from src.db.session import SessionDep
from src.schema.tysc.fieldposition import FieldPositionResponse, to_response
router = APIRouter()
@router.get("/positions")
def get_all_positions(db: SessionDep) -> List[FieldPositionResponse]:
results: list[FieldPositionResponse] = []
positions = db.query(FieldPosition).all()
for position in positions:
response = to_response(position)
results.append(response)
return results
@router.get("/positions/{position_id}", response_model=FieldPositionResponse)
def get_position(position_id: str, db: SessionDep) -> FieldPositionResponse:
position = db.get(FieldPosition, position_id)
if position is None:
raise HTTPException(status_code=404, detail="Fieldposition could not be found")
response = to_response(position)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Player
from src.db.session import SessionDep
from src.schema.tysc.player import PlayerResponse, to_response
router = APIRouter()
@router.get("/players")
def get_all_players(db: SessionDep) -> List[PlayerResponse]:
results: List[PlayerResponse] = []
players = db.query(Player).all()
for player in players:
response = to_response(player)
results.append(response)
return results
@router.get("/players/{player_id}", response_model=PlayerResponse)
def get_player(player_id: str, db: SessionDep) -> PlayerResponse:
player = db.get(Player, player_id)
if player is None:
raise HTTPException(status_code=404, detail="Player could not be found")
response = to_response(player)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Rooster
from src.db.session import SessionDep
from src.schema.tysc.rooster import RoosterResponse, to_response
router = APIRouter()
@router.get("/roosters")
def get_all_roosters(db: SessionDep) -> List[RoosterResponse]:
results: list[RoosterResponse] = []
roosters = db.query(Rooster).all()
for rooster in roosters:
response = to_response(rooster)
results.append(response)
return results
@router.get("/roosters/{rooster_id}", response_model=RoosterResponse)
def get_rooster(rooster_id: str, db: SessionDep) -> RoosterResponse:
rooster = db.get(Rooster, rooster_id)
if rooster is None:
raise HTTPException(status_code=404, detail="Rooster could not be found")
response = to_response(rooster)
return response
@@ -0,0 +1,28 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.core.log_conf import logger
from src.db.session import SessionDep
from src.schema.tysc.sport import SportResponse, to_response
from src.db.models.tysc import Sport
router = APIRouter()
@router.get("/sports")
def get_all_sports(db: SessionDep) -> List[SportResponse]:
results: list[SportResponse] = []
sports = db.query(Sport).all()
for sport in sports:
response = to_response(sport)
results.append(response)
return results
@router.get("/sports/{sport_id}", response_model=SportResponse)
def get_sport(sport_id: str, db: SessionDep) -> SportResponse:
sport = db.get(Sport, sport_id)
if sport is None:
raise HTTPException(status_code=404, detail="Sport could not be found")
logger.debug(f"create SportResponse for {sport}")
response: SportResponse = to_response(sport)
return response
+27
View File
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Team
from src.db.session import SessionDep
from src.schema.tysc.team import TeamResponse, to_response
router = APIRouter()
@router.get("/teams")
def get_all_teams(db: SessionDep) -> List[TeamResponse]:
results: list[TeamResponse] = []
teams = db.query(Team).all()
for team in teams:
response = to_response(team)
results.append(response)
return results
@router.get("/teams/{team_id}", response_model=TeamResponse)
def get_team(team_id: str, db: SessionDep) -> TeamResponse:
team = db.get(Team, team_id)
if team is None:
raise HTTPException(status_code=404, detail="Team could not be found")
response = to_response(team)
return response
@@ -0,0 +1,33 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Vendor
from src.db.session import SessionDep
from src.schema.tysc.vendor import VendorResponse, to_response
router = APIRouter()
@router.get("/vendors")
def get_all_vendors(db: SessionDep) -> List[VendorResponse]:
"""
retrieve all vendors as json response.
"""
results: list[VendorResponse] = []
vendors = db.query(Vendor).all()
for vendor in vendors:
response = to_response(vendor)
results.append(response)
return results
@router.get("/vendors/{vendor_id}", response_model=VendorResponse)
def get_vendor(vendor_id: str, db: SessionDep) -> VendorResponse:
"""
retrieve vendor by id as json response.
"""
vendor = db.get(Vendor, vendor_id)
if vendor is None:
raise HTTPException(status_code=404, detail="Vendor could not be found")
response = to_response(vendor)
return response
@@ -0,0 +1,26 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.admin import Assignment
from src.db.session import SessionDep
from src.schema.user.assignment import AssignmentResponse, to_response
router = APIRouter()
@router.get("/assignments", response_model=List[AssignmentResponse])
def get_all_assignments(db: SessionDep) -> List[AssignmentResponse]:
results: List[AssignmentResponse] = []
assignments = db.query(Assignment).all()
for assignment in assignments:
response = to_response(assignment)
results.append(response)
return results
@router.get("/assignments/{assignment_id}", response_model=AssignmentResponse)
def get_assignment(assignment_id: str, db: SessionDep) -> AssignmentResponse:
assignment = db.get(Assignment, assignment_id)
if assignment is None:
raise HTTPException(status_code=404, detail="Assignment could not be found")
response = to_response(assignment)
return response
@@ -0,0 +1,26 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.admin import Permission
from src.db.session import SessionDep
from src.schema.user.permission import PermissionResponse, to_response
router = APIRouter()
@router.get("/permissions", response_model=List[PermissionResponse])
def get_all_permissions(db: SessionDep) -> List[PermissionResponse]:
results: List[PermissionResponse] = []
permissions = db.query(Permission).all()
for permission in permissions:
response = to_response(permission)
results.append(response)
return results
@router.get("/permissions/{permission_id}", response_model=PermissionResponse)
def get_permission(permission_id: str, db: SessionDep) -> PermissionResponse:
permission = db.get(Permission, permission_id)
if permission is None:
raise HTTPException(status_code=404, detail="Permission could not be found")
response = to_response(permission)
return response
@@ -3,29 +3,35 @@ from fastapi import APIRouter, HTTPException, status
from sqlalchemy import select from sqlalchemy import select
from src.core.log_conf import logger from src.core.log_conf import logger
from src.core.security import CurrentUser
from src.db.models.admin import Profile from src.db.models.admin import Profile
from src.db.repository.user import create_new_profile, get_profile_details from src.db.repository.user import create_new_profile
from src.db.session import SessionDep from src.db.session import SessionDep
from src.schema.user.profile import ProfileResponse, ProfileModel from src.schema.user.profile import ProfileResponse, ProfileModel, to_response
router = APIRouter() router = APIRouter()
@router.get("/profile", response_model=ProfileModel)
async def read_profile(current_user: CurrentUser):
return current_user
@router.get("/profiles", response_model=List[ProfileResponse]) @router.get("/profiles", response_model=List[ProfileResponse])
def get_all_profiles(db: SessionDep) -> List[ProfileResponse]: # type: ignore def get_all_profiles(db: SessionDep) -> List[ProfileResponse]:
results: List[ProfileResponse] = [] results: List[ProfileResponse] = []
profiles = db.scalars(select(Profile)).all() profiles = db.scalars(select(Profile)).all()
for profile in profiles: for profile in profiles:
response = get_profile_details(profile) response = to_response(profile)
results.append(response) results.append(response)
return results return results
@router.get("/profiles/{profile_id}", response_model=ProfileResponse) @router.get("/profiles/{profile_id}", response_model=ProfileResponse)
def get_profile(profile_id: str, db: SessionDep) -> ProfileResponse: # type: ignore def get_profile(profile_id: str, db: SessionDep) -> ProfileResponse:
profile = db.get(Profile, profile_id) profile = db.get(Profile, profile_id)
if not profile: if not profile:
raise HTTPException(status_code=404, detail="MediaActor could not be found") raise HTTPException(status_code=404, detail="Profile could not be found")
response = get_profile_details(profile) response = to_response(profile)
return response return response
@router.delete("/profiles/{profile_id}", status_code=status.HTTP_204_NO_CONTENT) @router.delete("/profiles/{profile_id}", status_code=status.HTTP_204_NO_CONTENT)
@@ -37,11 +43,11 @@ def delete_profile(profile_id: str, db: SessionDep): # type: ignore
delete_profile(profile_id=profile_id, db=db) delete_profile(profile_id=profile_id, db=db)
@router.post("/profiles", status_code=status.HTTP_201_CREATED) @router.post("/profiles", status_code=status.HTTP_201_CREATED)
def add_profile(new_profile: ProfileModel, db: SessionDep) -> ProfileResponse: # type: ignore def add_profile(new_profile: ProfileModel, db: SessionDep) -> ProfileResponse:
logger.info(f"add profile {new_profile.user_name}") logger.info(f"add profile {new_profile.username}")
try: try:
profile: Profile = create_new_profile(new_profile, db) profile: Profile = create_new_profile(new_profile, db)
except: except:
raise HTTPException(status_code=409, detail="Profile duplicate") raise HTTPException(status_code=409, detail="Profile duplicate")
response = get_profile_details(profile) response = to_response(profile)
return response return response
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.admin import Token
from src.db.session import SessionDep
from src.schema.user.token import TokenResponse, to_response
router = APIRouter()
@router.get("/tokens", response_model=List[TokenResponse])
def get_all_profiles(db: SessionDep) -> List[TokenResponse]:
results: List[TokenResponse] = []
tokens = db.query(Token).all()
for token in tokens:
response = to_response(token)
results.append(response)
return results
+35
View File
@@ -0,0 +1,35 @@
import json
import time
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from src.core.log_conf import logger
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next) -> Response:
start_time = time.time()
path = request.url.path
if path != "/health":
# Log request info
request_info ={
"method": request.method,
"path": path,
"client_ip": request.client.host if request.client else "unknown"
}
logger.info("Incoming: %s", json.dumps(request_info))
# Process request
response = await call_next(request)
if path != "/health":
# Log response info
duration_ms = (time.time()- start_time)*1000
response_info = {
"status_code": response.status_code,
"duration_ms": round(duration_ms, 2)
}
logger.info("completed: %s", json.dumps(response_info))
return response
+92 -55
View File
@@ -1,60 +1,74 @@
import logging
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Annotated, Dict, List, Optional from typing import Annotated, List, Optional
import bcrypt import bcrypt
from fastapi import Depends, HTTPException, Request, Security, status from fastapi import Depends, HTTPException, Security, status
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel #from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import OAuth2, OAuth2PasswordBearer, SecurityScopes from fastapi.security import (
from fastapi.security.utils import get_authorization_scheme_param #OAuth2,
OAuth2PasswordBearer,
SecurityScopes
)
#from fastapi.security.utils import get_authorization_scheme_param
from jose import JWTError, jwt from jose import JWTError, jwt
from pydantic import ValidationError from pydantic import ValidationError
from src.core.config import settings from src.core.config import settings
from src.core.log_conf import logger from src.core.log_conf import logger
from src.db.models.admin import Profile from src.db.models.admin import Profile
from src.db.repository.admin import get_profile, is_database_empty from src.db.repository.admin import (
get_profile_by_username,
get_profile_by_email,
is_database_empty,
)
from src.db.session import SessionLocal from src.db.session import SessionLocal
from src.schema.admin import ProfileModel, TokenData from src.schema.admin.token import TokenData
from src.schema.user.profile import ProfileModel, to_model
oauth2_scheme = OAuth2PasswordBearer( oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/api/login/token", tokenUrl="/token",
scopes={"me": "read", "admin": "read"}, scopes={
"me": "read",
"admin": "read",
"ROLE_ADMIN": "admin",
"ROLE_MEDIA": "media",
"ROLE_USER": "user",
},
) )
class OAuth2PasswordBearerWithCookie(OAuth2): # class OAuth2PasswordBearerWithCookie(OAuth2):
def __init__( # def __init__(
self, # self,
tokenUrl: str, # tokenUrl: str,
scheme_name: Optional[str] = None, # scheme_name: Optional[str] = None,
scopes: Optional[Dict[str, str]] = None, # scopes: Optional[Dict[str, str]] = None,
auto_error: bool = True, # auto_error: bool = True,
): # ):
if not scopes: # if not scopes:
scopes = {} # scopes = {}
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore # flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore
super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error) # super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
async def __call__(self, request: Request) -> Optional[str]: # async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.cookies.get("access_token") # changed to accept access token from httpOnly Cookie # authorization: str = request.cookies.get("access_token") # changed to accept access token from httpOnly Cookie
scheme, param = get_authorization_scheme_param(authorization) # scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer": # if not authorization or scheme.lower() != "bearer":
if self.auto_error: # if self.auto_error:
raise HTTPException( # raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, # status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated", # detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"}, # headers={"WWW-Authenticate": "Bearer"},
) # )
else: # else:
return None # return None
return param # return param
def authenticate_user(username: str, password: str) -> Optional[Profile]: def authenticate_user_by_email(email: str, password: str) -> Optional[Profile]:
with SessionLocal() as db: with SessionLocal() as db:
user = get_profile(username=username, db=db) user = get_profile_by_email(email=email, db=db)
logger.debug(user) logger.debug(user)
if not user: if not user:
if is_database_empty(db): if is_database_empty(db):
@@ -65,7 +79,26 @@ def authenticate_user(username: str, password: str) -> Optional[Profile]:
return None return None
else: else:
if bcrypt.checkpw(password.encode(), user.password.encode()): if bcrypt.checkpw(password.encode(), user.password.encode()):
print("User successful authenticated") logger.info("User successful authenticated")
else:
logger.info("Authentication failed!")
return user
def authenticate_user_by_username(username: str, password: str) -> Optional[Profile]:
with SessionLocal() as db:
user = get_profile_by_username(username=username, db=db)
logger.debug(user)
if not user:
if is_database_empty(db):
logger.info("database is empty, use temporary access")
user = Profile()
user.email = "init_user@thpeetz.de"
return user
return None
else:
if bcrypt.checkpw(password.encode(), user.password.encode()):
logger.info("User successful authenticated")
else: else:
logger.info("Authentication failed!") logger.info("Authentication failed!")
return user return user
@@ -103,17 +136,19 @@ async def get_current_user(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
) )
username: str = payload.get("sub") # type: ignore username: str = payload.get("sub") # type: ignore
logger.info("username/email extracted is ", username) logger.info("username/email extracted is %s", username)
if username is None: if username is None:
raise credentials_exception raise credentials_exception
scope: str = payload.get("scope", "") scope: str = payload.get("scope", "")
token_scopes: List[str] = scope.split(" ") token_scopes: List[str] = scope.split(" ")
token_data = TokenData(scopes=token_scopes, username=username) token_data = TokenData(scopes=token_scopes, username=username)
except (JWTError, ValidationError): except (JWTError, ValidationError):
logger.info("Exception raised", exc_info=True)
raise credentials_exception raise credentials_exception
with SessionLocal() as db: with SessionLocal() as db:
user = get_profile(username=token_data.username, db=db) # type: ignore user = get_profile_by_username(username=str(token_data.username), db=db)
if user is None: if user is None:
logger.info("user not found")
raise credentials_exception raise credentials_exception
for scope in security_scopes.scopes: for scope in security_scopes.scopes:
if scope not in token_scopes: if scope not in token_scopes:
@@ -128,19 +163,14 @@ async def get_current_user(
async def get_current_active_user( async def get_current_active_user(
current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])], current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])],
) -> ProfileModel: ) -> ProfileModel:
if not current_user.enabled: # type: ignore if not current_user.enabled:
raise HTTPException(status_code=400, detail="Inactive user") raise HTTPException(status_code=400, detail="Inactive user")
user_model = ProfileModel( user_model = to_model(current_user)
username=current_user.user_name,
email=current_user.email, # type: ignore
first_name=current_user.first_name,
last_name=current_user.last_name, # type: ignore
active=current_user.enabled,
) # type: ignore
return user_model return user_model
def get_current_user_from_token(token: str = Depends(oauth2_scheme)): def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
""" """
credentials_exception = HTTPException( credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials", detail="Could not validate credentials",
@@ -149,14 +179,21 @@ def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
payload = jwt.decode( payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
) )
username: str = payload.get("sub") # type: ignore username: Optional[str] = payload.get("sub")
logger.info("username/email extracted is ", username) logger.info("username/email extracted is %s", username)
if username is None: if username is None:
raise credentials_exception raise credentials_exception
except JWTError: except JWTError as exception:
raise credentials_exception raise credentials_exception from exception
with SessionLocal() as db: with SessionLocal() as db:
user = get_profile(username=username, db=db) user = get_profile_by_email(email=username, db=db)
if user is None: if user is None:
raise credentials_exception user = get_profile_by_username(username=username, db=db)
if user is None:
raise credentials_exception
return user return user
UserDep = Annotated[Profile, Depends(get_current_user_from_token)]
CurrentUser = Annotated[Profile, Depends(get_current_active_user)]
+24 -22
View File
@@ -1,50 +1,52 @@
from typing import List
from sqlalchemy import Column, ForeignKey, Integer, String from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.db.models.base import Base, BaseMixin from src.db.models.base import Base, BaseMixin
class Article(Base, BaseMixin): class Article(Base, BaseMixin):
__tablename__ = 'article' __tablename__ = 'article'
title = Column(String, unique=True) title: Mapped[str] = mapped_column(unique=True)
article_authors = relationship("ArticleAuthor") article_authors = relationship("ArticleAuthor")
class Author(Base, BaseMixin): class Author(Base, BaseMixin):
__tablename__ = 'author' __tablename__ = 'author'
first_name = Column(String) first_name: Mapped[str]
last_name = Column(String) last_name: Mapped[str]
article_authors = relationship("ArticleAuthor") article_authors: Mapped[List["ArticleAuthor"]] = relationship(back_populates="author")
book_authors = relationship("BookAuthor") book_authors: Mapped[List["BookAuthor"]] = relationship(back_populates="author")
class BookshelfPublisher(Base, BaseMixin): class BookshelfPublisher(Base, BaseMixin):
__tablename__ = 'bookshelf_publisher' __tablename__ = 'bookshelf_publisher'
name = Column(String, unique=True) name: Mapped[str] = mapped_column(unique=True)
books = relationship("Book") books: Mapped[List["Book"]] = relationship(back_populates="publisher")
class Book(Base, BaseMixin): class Book(Base, BaseMixin):
__tablename__ = 'book' __tablename__ = 'book'
isbn = Column(String, unique=True) isbn: Mapped[str] = mapped_column(unique=True)
title = Column(String) title: Mapped[str]
year = Column(Integer, nullable=False) year: Mapped[int] = mapped_column(nullable=False)
publisher_id = Column(String, ForeignKey('bookshelf_publisher.id'), nullable=False) publisher_id: Mapped[str] = mapped_column(ForeignKey("bookshelf_publisher.id"), nullable=False)
publisher = relationship('BookshelfPublisher', back_populates="books") publisher: Mapped[BookshelfPublisher] = relationship(back_populates="books")
book_authors = relationship("BookAuthor") book_authors: Mapped[List["BookAuthor"]] = relationship(back_populates="book")
class ArticleAuthor(Base, BaseMixin): class ArticleAuthor(Base, BaseMixin):
__tablename__ = 'article_author' __tablename__ = 'article_author'
article_id = Column(String, ForeignKey('article.id'), nullable=False) article_id: Mapped[str] = mapped_column(ForeignKey("article.id"), nullable=False)
article = relationship('Article', back_populates="article_authors") article: Mapped[Article] = relationship(back_populates="article_authors")
author_id = Column(String, ForeignKey('author.id'), nullable=False) author_id: Mapped[str] = mapped_column(ForeignKey("author.id"), nullable=False)
author = relationship('Author', back_populates="article_authors") author: Mapped[Author] = relationship(back_populates="article_authors")
class BookAuthor(Base, BaseMixin): class BookAuthor(Base, BaseMixin):
__tablename__ = 'book_author' __tablename__ = 'book_author'
author_id = Column(String, ForeignKey('author.id'), nullable=False) author_id: Mapped[str] = mapped_column(ForeignKey("author.id"), nullable=False)
author = relationship('Author', back_populates="book_authors") author: Mapped[Author] = relationship(back_populates="book_authors")
book_id = Column(String, ForeignKey('book.id'), nullable=False) book_id: Mapped[str] = mapped_column(ForeignKey("book.id"), nullable=False)
book = relationship('Book', back_populates="book_authors") book: Mapped[Book] = relationship(back_populates="book_authors")
+39 -19
View File
@@ -14,51 +14,71 @@ from src.db.models.base import Base, BaseMixin, BaseVideoMixin
class MediaFile(Base, BaseMixin, BaseVideoMixin): class MediaFile(Base, BaseMixin, BaseVideoMixin):
__tablename__ = 'media_file' """
media_actor_files: Mapped[List["MediaActorFile"]] = relationship(back_populates="media_file") MediaFile represents video link.
"""
__tablename__ = "media_file"
media_actor_files: Mapped[List["MediaActorFile"]] = relationship(
back_populates="media_file"
)
def __repr__(self): def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.title})' return f"MediaFile({self.id} {self.title} {self.title})"
def __str__(self): def __str__(self):
return f'{self.title}({self.id})' return f"{self.title}({self.id})"
def update_title(self):
"""
Update title from url.
"""
class MediaActor(Base, BaseMixin): class MediaActor(Base, BaseMixin):
__tablename__ = 'media_actor' __tablename__ = "media_actor"
name: Mapped[str] name: Mapped[str]
url: Mapped[Optional[str]] = mapped_column(unique=True) url: Mapped[Optional[str]] = mapped_column(unique=True)
media_actor_files = relationship("MediaActorFile") media_actor_files = relationship("MediaActorFile")
def __repr__(self) -> str: def __repr__(self) -> str:
return f'MediaActor({self.id} {self.name} {self.url})' return f"MediaActor({self.id} {self.name} {self.url})"
def __str__(self) -> str: def __str__(self) -> str:
return f'{self.url}({self.id})' return f"{self.url}({self.id})"
class MediaActorFile(Base, BaseMixin): class MediaActorFile(Base, BaseMixin):
__tablename__ = 'media_actor_file' __tablename__ = "media_actor_file"
media_actor_id: Mapped[str] = mapped_column(ForeignKey("media_actor.id"), nullable=False) media_actor_id: Mapped[str] = mapped_column(
ForeignKey("media_actor.id"), nullable=False
)
media_actor: Mapped[MediaActor] = relationship(back_populates="media_actor_files") media_actor: Mapped[MediaActor] = relationship(back_populates="media_actor_files")
media_file_id: Mapped[str] = mapped_column(ForeignKey("media_file.id"), nullable=True) media_file_id: Mapped[str] = mapped_column(
ForeignKey("media_file.id"), nullable=True
)
media_file: Mapped[MediaFile] = relationship(back_populates="media_actor_files") media_file: Mapped[MediaFile] = relationship(back_populates="media_actor_files")
def __repr__(self): def __repr__(self):
return f'MediaActorFile({self.id} {self.media_actor_id} {self.media_file_id})' return f"MediaActorFile({self.id} {self.media_actor_id} {self.media_file_id})"
def __str__(self) -> str: def __str__(self) -> str:
return f'{self.id} {self.media_actor_id} {self.media_file_id}' return f"{self.id} {self.media_actor_id} {self.media_file_id}"
class MediaArticle(Base, BaseMixin): class MediaArticle(Base, BaseMixin):
__tablename__ = 'media_article' __tablename__ = "media_article"
review: Mapped[bool] review: Mapped[bool]
title: Mapped[str] title: Mapped[str]
url: Mapped[str] = mapped_column(unique=True) url: Mapped[str] = mapped_column(unique=True)
class MediaVideo(Base, BaseMixin): class MediaVideo(Base, BaseMixin):
__tablename__ = 'media_video' """
MediaFile represents video link.
"""
__tablename__ = "media_video"
cloud_link: Mapped[str] cloud_link: Mapped[str]
file_name: Mapped[str] file_name: Mapped[str]
path: Mapped[str] path: Mapped[str]
@@ -68,10 +88,10 @@ class MediaVideo(Base, BaseMixin):
should_download: Mapped[bool] should_download: Mapped[bool]
def __repr__(self): def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.url})' return f"MediaFile({self.id} {self.title} {self.url})"
def __str__(self): def __str__(self):
if self.title is None: if self.title is None:
return f'{self.url}({self.id})' return f"{self.url}({self.id})"
else: else:
return f'{self.title}({self.id})' return f"{self.title}({self.id})"
+7 -3
View File
@@ -1,12 +1,16 @@
from typing import AnyStr, Optional from typing import Optional
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from src.db.models.admin import Profile from src.db.models.admin import Profile
def get_profile(username: AnyStr, db: Session) -> Optional[Profile]: def get_profile_by_username(username: str, db: Session) -> Optional[Profile]:
profile = db.query(Profile).filter(Profile.email == username).first() profile = db.query(Profile).filter(Profile.user_name == username).first()
return profile
def get_profile_by_email(email: str, db: Session) -> Optional[Profile]:
profile = db.query(Profile).filter(Profile.email == email).first()
return profile return profile
def is_database_empty(db: Session) -> bool: def is_database_empty(db: Session) -> bool:
+17 -32
View File
@@ -4,29 +4,27 @@ from sqlalchemy.orm import Session
from src.core.log_conf import logger from src.core.log_conf import logger
from src.db.models.comic import Comic, Issue from src.db.models.comic import Comic, Issue
from src.schema.comics.artist import ArtistResponse from src.schema.comics.artist import artist_to_response
from src.schema.comics.comic import ComicResponse, ComicSchema from src.schema.comics.comic import ComicSchema, comic_to_response
from src.schema.comics.comic_details import ComicDetailsResponse, ComicWorktypeArtistResponse from src.schema.comics.comic_details import ComicDetailsResponse, ComicWorktypeArtistResponse
from src.schema.comics.issue import IssueResponse from src.schema.comics.issue import IssueResponse, issue_to_response
from src.schema.comics.issue_details import IssueDetailsResponse from src.schema.comics.issue_details import IssueDetailsResponse
from src.schema.comics.publisher import PublisherResponse from src.schema.comics.publisher import publisher_to_response
from src.schema.comics.volume import VolumeResponse from src.schema.comics.volume import VolumeResponse, volume_to_response
from src.schema.comics.worktype import WorktypeResponse from src.schema.comics.worktype import worktype_to_response
def list_comics(db: Session) -> List[Comic]:
comics = db.query(Comic).all()
return comics
def get_issue_details(issue: Issue) -> IssueDetailsResponse: def get_issue_details(issue: Issue) -> IssueDetailsResponse:
volume = None
if issue.volume:
volume = volume_to_response(issue.volume)
response = IssueDetailsResponse( response = IssueDetailsResponse(
id=issue.id, id=issue.id,
issue_number=str(issue.issue_number), issue_number=str(issue.issue_number),
in_stock=bool(issue.in_stock), in_stock=bool(issue.in_stock),
is_read=bool(issue.is_read), is_read=bool(issue.is_read),
comic=ComicResponse(id=issue.comic.id, title=issue.comic.title, completed=issue.comic.completed), comic=comic_to_response(issue.comic),
volume=VolumeResponse(id=issue.volume.id, name=issue.volume.name) volume=volume
) )
return response return response
@@ -36,39 +34,26 @@ def update_comic(new_comic: ComicSchema, comic_id: str, db: Session) -> Optional
comic: Optional[Comic] = db.get(Comic, comic_id) comic: Optional[Comic] = db.get(Comic, comic_id)
return comic return comic
def get_short_info(comic: Comic) -> ComicResponse:
response = ComicResponse(
id=comic.id,
title=str(comic.title),
completed=bool(comic.completed == 1)
)
return response
def get_comic_details(comic: Comic) -> ComicDetailsResponse: def get_comic_details(comic: Comic) -> ComicDetailsResponse:
volumes: List[VolumeResponse] = [] volumes: List[VolumeResponse] = []
for volume in comic.volumes: for volume in comic.volumes:
volumes.append(VolumeResponse(id=volume.id, name=volume.name)) volumes.append(volume_to_response(volume))
issues: List[IssueResponse] = [] issues: List[IssueResponse] = []
for issue in comic.issues: for issue in comic.issues:
issues.append(IssueResponse( issues.append(issue_to_response(issue))
id=issue.id,
issue_number=issue.issue_number,
in_stock=issue.in_stock,
is_read=issue.is_read
))
works: List[ComicWorktypeArtistResponse] = [] works: List[ComicWorktypeArtistResponse] = []
works_map: Dict[str, ComicWorktypeArtistResponse] = {} works_map: Dict[str, ComicWorktypeArtistResponse] = {}
for work in comic.comic_works: for work in comic.comic_works:
worktype_id = work.work_type.id worktype_id = work.work_type.id
if worktype_id in works_map: if worktype_id in works_map:
artist = ArtistResponse(id=work.artist.id, name=work.artist.name) artist = artist_to_response(work.artist)
works_map[worktype_id].artists.append(artist) works_map[worktype_id].artists.append(artist)
logger.info(f"add artist to response map: {artist} -> {works_map}") logger.info(f"add artist to response map: {artist} -> {works_map}")
print(f"add artist to response map: {artist} -> {works_map}") print(f"add artist to response map: {artist} -> {works_map}")
else: else:
works_map[worktype_id] = ComicWorktypeArtistResponse( works_map[worktype_id] = ComicWorktypeArtistResponse(
worktype=WorktypeResponse(id=worktype_id, name=work.work_type.name), worktype=worktype_to_response(work.work_type),
artists=[ArtistResponse(id=work.artist.id, name=work.artist.name)] artists=[artist_to_response(work.artist)]
) )
for value in works_map.values(): for value in works_map.values():
works.append(value) works.append(value)
@@ -79,7 +64,7 @@ def get_comic_details(comic: Comic) -> ComicDetailsResponse:
completed=bool(comic.completed), completed=bool(comic.completed),
current_order=bool(comic.current_order), current_order=bool(comic.current_order),
weblink=str(comic.weblink), weblink=str(comic.weblink),
publisher=PublisherResponse(id=comic.publisher.id, name=comic.publisher.name), publisher=publisher_to_response(comic.publisher),
issues=issues, issues=issues,
volumes=volumes, volumes=volumes,
works=works works=works
-105
View File
@@ -1,105 +0,0 @@
from sqlalchemy.orm import Session
import uuid
from datetime import datetime
from src.core.log_conf import logger
from src.db.models.media import MediaActor, MediaActorFile, MediaFile, MediaVideo
from src.schema.media.actor import MediaActorModel, MediaActorResponse
from src.schema.media.actorfile import MediaActorFileResponse
from src.webapps.media.forms import AddLinkForm
def create_new_video(video: AddLinkForm, db: Session) -> MediaVideo:
print(video.url)
media_video = MediaVideo()
media_video.id = str(uuid.uuid4())
media_video.url = video.url
media_video.created_date = datetime.now()
media_video.last_modified_date = datetime.now()
media_video.review = True
media_video.should_download = True
db.add(media_video)
db.commit()
db.refresh(media_video)
print(media_video)
return media_video
def create_new_mediafile(link: str, db: Session) -> MediaFile:
logger.info("create MediaFile with url {link}")
media_file: MediaFile = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.url = link
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.review = True
media_file.should_download = True
db.add(media_file)
db.commit()
db.refresh(media_file)
logger.info(f"created {media_file}")
return media_file
def delete_mediafile(db: Session, media_file_id: str):
logger.info(f"delete MediaFile with id {media_file_id}")
media_file = db.get(MediaFile, media_file_id)
db.delete(media_file)
db.commit()
def create_new_mediaactor(new_actor: MediaActorModel, db: Session) -> MediaActor:
logger.info(f"create MediaActor with url {new_actor.url}")
media_actor: MediaActor = MediaActor()
media_actor.id = str(uuid.uuid4())
if new_actor.name is not None:
media_actor.name = new_actor.name
media_actor.url = new_actor.url
media_actor.created_date = datetime.now()
media_actor.last_modified_date = datetime.now()
media_actor.version = 0
db.add(media_actor)
db.commit()
db.refresh(media_actor)
logger.info(f"created {media_actor}")
return media_actor
def delete_mediaactor(db: Session, actor_id: str):
logger.info(f"delete MediaActor with id {actor_id}")
media_actor = db.get(MediaActor, actor_id)
if media_actor is not None:
actor_files = media_actor.media_actor_files
for actor_file in actor_files:
delete_mediaactorfile(db, actorfile_id=actor_file.id)
db.refresh(media_actor)
db.delete(media_actor)
db.commit()
def get_actor_details(media_actor: MediaActor) -> MediaActorResponse:
reponse: MediaActorResponse = MediaActorResponse(id=media_actor.id, name=str(media_actor.name), url=str(media_actor.url))
return reponse
def create_new_mediaactorfile(db: Session, actor_id: str, file_id: str) -> MediaActorFile:
logger.info(f"create MediaActorFile with actor {actor_id} and file {file_id}")
media_actor_file: MediaActorFile = MediaActorFile()
media_actor_file.id = str(uuid.uuid4())
media_actor_file.created_date = datetime.now()
media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = 0
media_actor_file.media_actor_id = actor_id
media_actor_file.media_file_id = file_id
db.add(media_actor_file)
db.commit()
db.refresh(media_actor_file)
return media_actor_file
def delete_mediaactorfile(db: Session, actorfile_id: str):
logger.info(f"delete MediaActorFile with id {actorfile_id}")
media_actorfile = db.get(MediaActorFile, actorfile_id)
db.delete(media_actorfile)
db.commit()
def get_actorfile_details(media_actorfile: MediaActorFile) -> MediaActorFileResponse:
response: MediaActorFileResponse = MediaActorFileResponse(
id=media_actorfile.id,
file_id=str(media_actorfile.media_file_id),
actor_id=str(media_actorfile.media_actor_id)
)
return response
@@ -0,0 +1,66 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.media import MediaActor
from src.db.repository.media.actorfile import delete_mediaactorfile
from src.schema.media.actor import MediaActorModel
def create_new_mediaactor(new_actor: MediaActorModel, db: Session) -> MediaActor:
logger.info(f"create MediaActor with url {new_actor.url}")
media_actor: MediaActor = MediaActor()
media_actor.id = str(uuid.uuid4())
if new_actor.name is not None:
media_actor.name = new_actor.name
media_actor.url = new_actor.url
media_actor.created_date = datetime.now()
media_actor.last_modified_date = datetime.now()
media_actor.version = 0
db.add(media_actor)
db.commit()
db.refresh(media_actor)
logger.info(f"created {media_actor}")
return media_actor
def delete_mediaactor(db: Session, actor_id: str):
logger.info(f"delete MediaActor with id {actor_id}")
media_actor = db.get(MediaActor, actor_id)
if media_actor is not None:
actor_files = media_actor.media_actor_files
for actor_file in actor_files:
delete_mediaactorfile(db, actorfile_id=actor_file.id)
db.refresh(media_actor)
db.delete(media_actor)
db.commit()
def import_mediaactor(db: Session, new_actor: MediaActorModel) -> MediaActor:
"""
import MediaFile and set missing values with default ones.
"""
logger.info("import MediaActor with %s", new_actor)
media_actor: MediaActor = MediaActor()
media_actor.id = new_actor.id
if new_actor.created_date:
media_actor.created_date = new_actor.created_date
else:
media_actor.created_date = datetime.now()
if new_actor.last_modified_date:
media_actor.last_modified_date = new_actor.last_modified_date
else:
media_actor.last_modified_date = datetime.now()
media_actor.version = new_actor.version
if new_actor.name:
media_actor.name = new_actor.name
else:
media_actor.name = ""
if new_actor.url:
media_actor.url = new_actor.url
else:
media_actor.url = ""
db.add(media_actor)
db.commit()
db.refresh(media_actor)
return media_actor
@@ -0,0 +1,64 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.media import MediaActorFile
from src.schema.media.actorfile import MediaActorFileModel
def create_new_mediaactorfile(
db: Session, actor_id: str, file_id: str
) -> MediaActorFile:
"""
Create relation for MediaFile and MediaActor
"""
logger.info("create MediaActorFile with actor %s and file %s", actor_id, file_id)
media_actor_file: MediaActorFile = MediaActorFile()
media_actor_file.id = str(uuid.uuid4())
media_actor_file.created_date = datetime.now()
media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = 0
media_actor_file.media_actor_id = actor_id
media_actor_file.media_file_id = file_id
db.add(media_actor_file)
db.commit()
db.refresh(media_actor_file)
return media_actor_file
def delete_mediaactorfile(db: Session, actorfile_id: str):
"""
Delete relation between MediaFile and MediaActor.
"""
logger.info("delete MediaActorFile with id %s", actorfile_id)
media_actorfile = db.get(MediaActorFile, actorfile_id)
db.delete(media_actorfile)
db.commit()
def import_mediaactorfile(
db: Session, new_actorfile: MediaActorFileModel
) -> MediaActorFile:
"""
Import MediaFile and set missing values with default ones.
"""
logger.info("import MediaActorFile with %s", new_actorfile)
media_actor_file: MediaActorFile = MediaActorFile()
media_actor_file.id = new_actorfile.id
if new_actorfile.created_date:
media_actor_file.created_date = new_actorfile.created_date
else:
media_actor_file.created_date = datetime.now()
if new_actorfile.last_modified_date:
media_actor_file.last_modified_date = new_actorfile.last_modified_date
else:
media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = new_actorfile.version
media_actor_file.media_actor_id = new_actorfile.media_actor_id
media_actor_file.media_file_id = new_actorfile.media_file_id
db.add(media_actor_file)
db.commit()
db.refresh(media_actor_file)
return media_actor_file
@@ -0,0 +1,78 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.media import MediaFile
from src.schema.media.file import MediaFileModel
def create_new_mediafile(link: str, db: Session) -> MediaFile:
"""
Create MediaFile with gievne URL.
"""
logger.info("create MediaFile with url {link}")
media_file: MediaFile = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.url = link
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.review = True
media_file.should_download = True
db.add(media_file)
db.commit()
db.refresh(media_file)
logger.info("created %s", media_file)
return media_file
def delete_mediafile(db: Session, media_file_id: str):
"""
Delete MediaFile with given ID from db.
"""
logger.info("delete MediaFile with id %s", media_file_id)
media_file = db.get(MediaFile, media_file_id)
db.delete(media_file)
db.commit()
def import_mediafile(db: Session, new_file: MediaFileModel) -> MediaFile:
"""
import MediaActor and set missing values with defautl ones.
"""
logger.info("import MediaFile with %s", new_file)
media_file: MediaFile = MediaFile()
media_file.id = new_file.id
if new_file.created_date:
media_file.created_date = new_file.created_date
else:
media_file.created_date = datetime.now()
if new_file.last_modified_date:
media_file.last_modified_date = new_file.last_modified_date
else:
media_file.last_modified_date = datetime.now()
media_file.version = new_file.version
if new_file.title:
media_file.title = new_file.title
else:
media_file.title = ""
if new_file.file_name:
media_file.file_name = new_file.file_name
else:
media_file.file_name = ""
if new_file.cloud_link:
media_file.cloud_link = new_file.cloud_link
else:
media_file.cloud_link = ""
if new_file.url:
media_file.url = new_file.url
else:
media_file.url = ""
media_file.review = new_file.review
media_file.should_download = new_file.should_download
db.add(media_file)
db.commit()
db.refresh(media_file)
return media_file
@@ -0,0 +1,23 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.db.models.media import MediaVideo
from src.webapps.media.forms import AddLinkForm
def create_new_video(video: AddLinkForm, db: Session) -> MediaVideo:
print(video.url)
media_video = MediaVideo()
media_video.id = str(uuid.uuid4())
media_video.url = str(video.url)
media_video.created_date = datetime.now()
media_video.last_modified_date = datetime.now()
media_video.review = True
media_video.should_download = True
db.add(media_video)
db.commit()
db.refresh(media_video)
print(media_video)
return media_video
+3 -11
View File
@@ -5,15 +5,14 @@ from src.core.log_conf import logger
from src.db.models.admin import Assignment, Profile from src.db.models.admin import Assignment, Profile
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from src.schema.user.profile import ProfileModel, ProfileResponse from src.schema.user.profile import ProfileModel
def create_new_profile(new_profile: ProfileModel, db: Session) -> Profile: def create_new_profile(new_profile: ProfileModel, db: Session) -> Profile:
logger.info(f"create MediaActor with url {new_profile.user_name}") logger.info(f"create MediaActor with url {new_profile.username}")
profile: Profile = Profile() profile: Profile = Profile()
profile.id = str(uuid.uuid4()) profile.id = str(uuid.uuid4())
profile.user_name = new_profile.user_name profile.user_name = new_profile.username
profile.first_name = new_profile.first_name profile.first_name = new_profile.first_name
profile.last_name = new_profile.last_name profile.last_name = new_profile.last_name
profile.created_date = datetime.now() profile.created_date = datetime.now()
@@ -35,13 +34,6 @@ def delete_profile(db: Session, profile_id: str):
db.delete(profile) db.delete(profile)
db.commit() db.commit()
def get_profile_details(profile: Profile) -> ProfileResponse:
reponse: ProfileResponse = ProfileResponse(
id=profile.id,
user_name=str(profile.user_name)
)
return reponse
def delete_assignment(db: Session, assignment_id: str) -> None: def delete_assignment(db: Session, assignment_id: str) -> None:
logger.info(f"delete Assignment with id {assignment_id}") logger.info(f"delete Assignment with id {assignment_id}")
assignment: Optional[Assignment] = db.get(Assignment, assignment_id) assignment: Optional[Assignment] = db.get(Assignment, assignment_id)
+4 -2
View File
@@ -12,8 +12,10 @@ engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(bind=engine) SessionLocal = sessionmaker(bind=engine)
def get_db() -> Generator: def get_db() -> Generator[Session, None, None]:
""" """
with SessionLocal() as db: with SessionLocal() as db:
yield db yield db
SessionDep: type[Session] = Annotated[Session, Depends(get_db)]
SessionDep = Annotated[Session, Depends(get_db)]
+7 -5
View File
@@ -6,9 +6,10 @@ from fastapi.staticfiles import StaticFiles
from src.apis.base import api_router from src.apis.base import api_router
from src.apis.version1.healthcheck import health_router from src.apis.version1.healthcheck import health_router
from src.apis.version1.login import login_router from src.apis.version1.admin.login import login_router
from src.core.config import settings from src.core.config import settings
from src.core.log_conf import logger from src.core.log_conf import logger
from src.core.middleware import RequestLoggingMiddleware
from src.db.models.base import Base from src.db.models.base import Base
from src.db.session import engine from src.db.session import engine
from src.db.utils import check_db_connected, check_db_disconnected from src.db.utils import check_db_connected, check_db_disconnected
@@ -23,10 +24,10 @@ async def lifespan(app: FastAPI):
def include_router(app: FastAPI): def include_router(app: FastAPI):
app.include_router(api_router) app.include_router(api_router, tags=["api"])
app.include_router(web_app_router) app.include_router(web_app_router, tags=["webapp"])
app.include_router(health_router) app.include_router(health_router, tags=["admin"])
app.include_router(login_router) app.include_router(login_router, tags=["webapp"])
def configure_static(app: FastAPI): def configure_static(app: FastAPI):
@@ -41,6 +42,7 @@ def add_middle_ware(app: FastAPI):
allow_methods=["*"], allow_methods=["*"],
allow_headers=["*"], allow_headers=["*"],
) )
app.add_middleware(RequestLoggingMiddleware)
def create_tables(): def create_tables():
-27
View File
@@ -1,27 +0,0 @@
from typing import Optional, List
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
scopes: List[str] = []
class ProfileModel(BaseModel):
username: str
email: str
first_name: str
last_name: str
active: bool
class HealthCheck(BaseModel):
"""
Health check model
"""
status: str = "ok"
@@ -0,0 +1,9 @@
from pydantic import BaseModel
class HealthCheck(BaseModel):
"""
Health check model
"""
status: str = "ok"
+8
View File
@@ -0,0 +1,8 @@
from typing import Optional
from pydantic import BaseModel
class LoginRequest(BaseModel):
email: Optional[str] = None
password: Optional[str] = None
@@ -0,0 +1,34 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import MailAccount
class MailAccountResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
host: str
port: int
protocol: str
user_name: str
password: str
start_tls: bool
def to_response(account: MailAccount) -> MailAccountResponse:
response: MailAccountResponse = MailAccountResponse(
id=account.id,
created_date=account.created_date,
last_modified_date=account.last_modified_date,
version=account.version,
host=account.host,
port=account.port,
protocol=account.protocol,
user_name=account.user_name,
password=account.password,
start_tls=account.start_tls,
)
return response
+13
View File
@@ -0,0 +1,13 @@
from typing import List, Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
scopes: List[str] = []
@@ -0,0 +1,23 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import Article
class ArticleResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
title: str
def to_response(article: Article) -> ArticleResponse:
response: ArticleResponse = ArticleResponse(
id=article.id,
created_date=article.created_date,
last_modified_date=article.last_modified_date,
version=article.version,
title=article.title
)
return response
@@ -0,0 +1,25 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import ArticleAuthor
class ArticleAuthorResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
article_id: str
author_id: str
def to_response(articleauthor: ArticleAuthor) -> ArticleAuthorResponse:
response: ArticleAuthorResponse = ArticleAuthorResponse(
id=articleauthor.id,
created_date=articleauthor.created_date,
last_modified_date=articleauthor.last_modified_date,
version=articleauthor.version,
article_id=articleauthor.article_id,
author_id=articleauthor.author_id
)
return response
+25
View File
@@ -0,0 +1,25 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import Author
class AuthorResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
first_name: str
last_name: str
def to_response(author: Author) -> AuthorResponse:
response: AuthorResponse = AuthorResponse(
id=author.id,
created_date=author.created_date,
last_modified_date=author.last_modified_date,
version=author.version,
first_name=author.first_name,
last_name=author.last_name
)
return response
+29
View File
@@ -0,0 +1,29 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import Book
class BookResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
isbn: str
title: str
year: int
publisher_id: str
def to_response(book: Book) -> BookResponse:
response: BookResponse = BookResponse(
id=book.id,
created_date=book.created_date,
last_modified_date=book.last_modified_date,
version=book.version,
isbn=book.isbn,
title=book.title,
year=book.year,
publisher_id=book.publisher_id
)
return response
@@ -0,0 +1,25 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import BookAuthor
class BookAuthorResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
author_id: str
book_id: str
def to_response(bookauthor: BookAuthor) -> BookAuthorResponse:
response: BookAuthorResponse = BookAuthorResponse(
id=bookauthor.id,
created_date=bookauthor.created_date,
last_modified_date=bookauthor.last_modified_date,
version=bookauthor.version,
author_id=bookauthor.author_id,
book_id=bookauthor.book_id
)
return response
@@ -0,0 +1,23 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import BookshelfPublisher
class PublisherResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def to_response(publisher: BookshelfPublisher) -> PublisherResponse:
response: PublisherResponse = PublisherResponse(
id=publisher.id,
created_date=publisher.created_date,
last_modified_date=publisher.last_modified_date,
version=publisher.version,
name=publisher.name
)
return response
+21
View File
@@ -1,5 +1,10 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
from src.db.models.comic import Artist
class ArtistCreation(BaseModel): class ArtistCreation(BaseModel):
id: str id: str
@@ -7,7 +12,23 @@ class ArtistCreation(BaseModel):
class ArtistResponse(BaseModel): class ArtistResponse(BaseModel):
id: str id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str name: str
weblink: Optional[str]
def artist_to_response(artist: Artist) -> ArtistResponse:
response: ArtistResponse = ArtistResponse(
id=artist.id,
created_date=artist.created_date,
last_modified_date=artist.last_modified_date,
version=artist.version,
name=artist.name,
weblink=artist.weblink
)
return response
class AddArtist(BaseModel): class AddArtist(BaseModel):
id: str id: str
+32 -1
View File
@@ -1,15 +1,46 @@
"""
Schema definitions for Comics.
"""
from datetime import datetime
from typing import Optional from typing import Optional
from pydantic import BaseModel, AnyUrl from pydantic import BaseModel, AnyUrl
from src.core.log_conf import logger
from src.db.models.comic import Comic
class ComicResponse(BaseModel): class ComicResponse(BaseModel):
"""
Pydantic model for returning Comic objects.
"""
id: str id: str
created_date: datetime
last_modified_date: datetime
version: int
title: str title: str
publisher_id: str
current_order: bool
completed: bool completed: bool
weblink: Optional[str]
def comic_to_response(comic: Comic) -> ComicResponse:
response: ComicResponse = ComicResponse(
id=comic.id,
created_date=comic.created_date,
last_modified_date=comic.last_modified_date,
version=comic.version,
title=comic.title,
publisher_id=comic.publisher_id,
current_order=comic.current_order,
completed=comic.completed,
weblink=comic.weblink
)
return response
class ComicSchema(BaseModel): class ComicSchema(BaseModel):
"""
Pydantic model for uploading Comic object.
"""
id: str id: str
title: str title: str
weblink: Optional[AnyUrl] weblink: Optional[AnyUrl]
+32
View File
@@ -0,0 +1,32 @@
"""
Model definitions for ComicWork.
"""
from datetime import datetime
from pydantic import BaseModel
from src.db.models.comic import ComicWork
class ComicWorkResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
comic_id: str
artist_id: str
work_type_id: str
def comicwork_to_response(comicwork: ComicWork) -> ComicWorkResponse:
response: ComicWorkResponse = ComicWorkResponse(
id=comicwork.id,
created_date=comicwork.created_date,
last_modified_date=comicwork.last_modified_date,
version=comicwork.version,
comic_id=comicwork.comic_id,
artist_id=comicwork.artist_id,
work_type_id=comicwork.work_type_id,
)
return response
+30
View File
@@ -1,8 +1,38 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
from src.db.models.comic import Issue
class IssueResponse(BaseModel): class IssueResponse(BaseModel):
id: str id: str
created_date: datetime
last_modified_date: datetime
version: int
issue_number: str issue_number: str
title: Optional[str]
published_on: Optional[datetime]
in_stock: bool in_stock: bool
is_read: bool is_read: bool
comic_id: str
volume_id: Optional[str]
story_arc_id: Optional[str]
def issue_to_response(issue: Issue) -> IssueResponse:
response: IssueResponse = IssueResponse(
id=issue.id,
created_date=issue.created_date,
last_modified_date=issue.last_modified_date,
version=issue.version,
issue_number=issue.issue_number,
title=issue.title,
published_on=issue.published_on,
in_stock=issue.in_stock,
is_read=issue.is_read,
comic_id=issue.comic_id,
volume_id=issue.volume_id,
story_arc_id=issue.story_arc_id
)
return response
@@ -11,3 +11,4 @@ class IssueDetailsResponse(BaseModel):
is_read: bool is_read: bool
comic: ComicResponse comic: ComicResponse
volume: VolumeResponse | None volume: VolumeResponse | None
+32
View File
@@ -0,0 +1,32 @@
"""
Model definitions for IssueWork.
"""
from datetime import datetime
from pydantic import BaseModel
from src.db.models.comic import IssueWork
class IssueWorkResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
issue_id: str
artist_id: str
work_type_id: str
def issuework_to_response(issuework: IssueWork) -> IssueWorkResponse:
response: IssueWorkResponse = IssueWorkResponse(
id=issuework.id,
created_date=issuework.created_date,
last_modified_date=issuework.last_modified_date,
version=issuework.version,
issue_id=issuework.issue_id,
artist_id=issuework.artist_id,
work_type_id=issuework.work_type_id,
)
return response
+24
View File
@@ -1,6 +1,30 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
from src.db.models.comic import Publisher
class PublisherResponse(BaseModel): class PublisherResponse(BaseModel):
id: str id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str name: str
weblink: Optional[str]
parent_publisher_id: Optional[str]
def publisher_to_response(publisher: Publisher) -> PublisherResponse:
response: PublisherResponse = PublisherResponse(
id=publisher.id,
created_date=publisher.created_date,
last_modified_date=publisher.last_modified_date,
version=publisher.version,
name=publisher.name,
weblink=publisher.weblink,
parent_publisher_id=publisher.parent_publisher_id
)
return response
+30
View File
@@ -0,0 +1,30 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.comic import StoryArc
class StoryArcResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
comic_id: str
volume_id: Optional[str]
class AddLink(BaseModel):
url: str
def storyarc_to_response(storyarc: StoryArc) -> StoryArcResponse:
response: StoryArcResponse = StoryArcResponse(
id=storyarc.id,
created_date=storyarc.created_date,
last_modified_date=storyarc.last_modified_date,
version=storyarc.version,
name=storyarc.name,
comic_id=storyarc.comic_id,
volume_id=storyarc.volume_id
)
return response
+19
View File
@@ -1,6 +1,25 @@
from datetime import datetime
from pydantic import BaseModel from pydantic import BaseModel
from src.db.models.comic import Volume
class VolumeResponse(BaseModel): class VolumeResponse(BaseModel):
id: str id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str name: str
comic_id: str
def volume_to_response(volume: Volume) -> VolumeResponse:
response: VolumeResponse = VolumeResponse(
id=volume.id,
created_date=volume.created_date,
last_modified_date=volume.last_modified_date,
version=volume.version,
name=volume.name,
comic_id=volume.comic_id
)
return response
+17
View File
@@ -1,9 +1,26 @@
from datetime import datetime
from pydantic import BaseModel from pydantic import BaseModel
from src.db.models.comic import WorkType
class AddWorkType(BaseModel): class AddWorkType(BaseModel):
worktype: str worktype: str
class WorktypeResponse(BaseModel): class WorktypeResponse(BaseModel):
id: str id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str name: str
def worktype_to_response(worktype: WorkType) -> WorktypeResponse:
response: WorktypeResponse = WorktypeResponse(
id=worktype.id,
created_date=worktype.created_date,
last_modified_date=worktype.last_modified_date,
version=worktype.version,
name=worktype.name
)
return response
+24 -2
View File
@@ -1,12 +1,34 @@
from datetime import datetime
from typing import Optional from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
from src.db.models.media import MediaActor
class MediaActorResponse(BaseModel): class MediaActorResponse(BaseModel):
id: str id: str
created_date: datetime
last_modified_date: datetime
version: int
name: Optional[str] name: Optional[str]
url: str url: Optional[str]
def actor_to_response(actor: MediaActor) -> MediaActorResponse:
response: MediaActorResponse = MediaActorResponse(
id=actor.id,
created_date=actor.created_date,
last_modified_date=actor.last_modified_date,
version=actor.version,
name=actor.name,
url=actor.url
)
return response
class MediaActorModel(BaseModel): class MediaActorModel(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: Optional[str] name: Optional[str]
url: str url: Optional[str]
+27 -2
View File
@@ -1,4 +1,5 @@
from datetime import datetime from datetime import datetime
from typing import Optional
from src.db.models.media import MediaActorFile from src.db.models.media import MediaActorFile
from pydantic import BaseModel from pydantic import BaseModel
@@ -6,5 +7,29 @@ from pydantic import BaseModel
class MediaActorFileResponse(BaseModel): class MediaActorFileResponse(BaseModel):
id: str id: str
file_id: str created_date: datetime
actor_id: str last_modified_date: datetime
version: int
media_actor_id: str
media_file_id: Optional[str]
def actorfile_to_response(actorfile: MediaActorFile) -> MediaActorFileResponse:
response: MediaActorFileResponse = MediaActorFileResponse(
id=actorfile.id,
created_date=actorfile.created_date,
last_modified_date=actorfile.last_modified_date,
version=actorfile.version,
media_actor_id=actorfile.media_actor_id,
media_file_id=actorfile.media_file_id
)
return response
class MediaActorFileModel(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
media_actor_id: str
media_file_id: Optional[str]
+30
View File
@@ -0,0 +1,30 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.media import MediaArticle
class MediaArticleResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
review: bool = False
title: Optional[str] = None
url: Optional[str] = None
class AddLink(BaseModel):
url: str
def to_response(video: MediaArticle) -> MediaArticleResponse:
response: MediaArticleResponse = MediaArticleResponse(
id=video.id,
created_date=video.created_date,
last_modified_date=video.last_modified_date,
version=video.version,
review=video.review,
title=video.title,
url=video.url,
)
return response
+58 -17
View File
@@ -1,4 +1,5 @@
from datetime import datetime from datetime import datetime
from typing import Optional
from src.db.models.media import MediaFile from src.db.models.media import MediaFile
from pydantic import BaseModel from pydantic import BaseModel
@@ -6,35 +7,75 @@ from pydantic import BaseModel
class MediaFileResponse(BaseModel): class MediaFileResponse(BaseModel):
id: str id: str
title: str | None = None created_date: datetime
file_name: str | None = None last_modified_date: datetime
cloud_link: str | None = None version: int
url: str | None = None title: Optional[str]
file_name: Optional[str]
cloud_link: Optional[str]
url: Optional[str]
review: bool = False review: bool = False
should_download: bool = False should_download: bool = False
class Link(BaseModel):
url: str
def get_file_details(mediafile: MediaFile) -> MediaFileResponse: def file_to_response(mediafile: MediaFile) -> MediaFileResponse:
response = MediaFileResponse(id=mediafile.id, """
title=mediafile.title, Create MediaFileResponse from model.
file_name=mediafile.file_name, """
cloud_link=mediafile.cloud_link, response: MediaFileResponse = MediaFileResponse(
url=str(mediafile.url), id=mediafile.id,
review=mediafile.review, created_date=mediafile.created_date,
should_download=mediafile.should_download) last_modified_date=mediafile.last_modified_date,
#print(f"id: {mediafile.id}: review: {response.review} <- {mediafile.review}") version=mediafile.version,
#print(f"id: {mediafile.id}: download: {response.should_download} <- {mediafile.should_download}") title=mediafile.title,
file_name=mediafile.file_name,
cloud_link=mediafile.cloud_link,
url=mediafile.url,
review=mediafile.review,
should_download=mediafile.should_download,
)
return response return response
def set_file(model: MediaFileResponse, mediafile: MediaFile) -> None:
def file_to_model(model: MediaFileResponse, mediafile: MediaFile) -> MediaFile:
"""
Set data of response to model.
"""
mediafile.file_name = model.file_name mediafile.file_name = model.file_name
mediafile.cloud_link = model.cloud_link mediafile.cloud_link = model.cloud_link
if model.url is not None: if model.url is not None:
mediafile.url = model.url mediafile.url = model.url
else:
mediafile.url = ""
if model.title is not None: if model.title is not None:
mediafile.title = model.title mediafile.title = model.title
else:
mediafile.title = ""
mediafile.last_modified_date = datetime.now() mediafile.last_modified_date = datetime.now()
mediafile.review = model.review mediafile.review = model.review
mediafile.should_download = model.should_download mediafile.should_download = model.should_download
return mediafile
class MediaFileModel(BaseModel):
"""
Pydantic model to import MediaFile.
"""
id: str
created_date: Optional[datetime]
last_modified_date: Optional[datetime]
version: int = 0
title: Optional[str]
file_name: Optional[str]
cloud_link: Optional[str]
url: Optional[str]
review: bool = True
should_download: bool = True
class Link(BaseModel):
"""
PYdantic model for uploading url.
"""
url: str
+33
View File
@@ -1,5 +1,38 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
from src.db.models.media import MediaVideo
class MediaVideoResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
cloud_link: Optional[str] = None
file_name: Optional[str] = None
path: Optional[str] = None
review: bool = False
title: Optional[str] = None
url: Optional[str] = None
should_download: bool = False
class AddLink(BaseModel): class AddLink(BaseModel):
url: str url: str
def video_to_response(video: MediaVideo) -> MediaVideoResponse:
response: MediaVideoResponse = MediaVideoResponse(
id=video.id,
created_date=video.created_date,
last_modified_date=video.last_modified_date,
version=video.version,
cloud_link=video.cloud_link,
file_name=video.file_name,
path=video.path,
review=video.review,
title=video.title,
url=video.url,
should_download=video.should_download
)
return response
+31
View File
@@ -0,0 +1,31 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Card
class CardResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
card_number: int
year: int
card_set_id: str
rooster_id: str
vendor_id: str
def to_response(card: Card) -> CardResponse:
response: CardResponse = CardResponse(
id=card.id,
created_date=card.created_date,
last_modified_date=card.last_modified_date,
version=card.version,
card_number=card.card_number,
year=card.year,
card_set_id=card.card_set_id,
rooster_id=card.rooster_id,
vendor_id=card.vendor_id
)
return response
+30
View File
@@ -0,0 +1,30 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import CardSet
class CardSetResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
parallel_set: bool
insert_set: bool
vendor_id: str
def to_response(cardset: CardSet) -> CardSetResponse:
response: CardSetResponse = CardSetResponse(
id=cardset.id,
created_date=cardset.created_date,
last_modified_date=cardset.last_modified_date,
version=cardset.version,
name=cardset.name,
parallel_set=cardset.parallel_set,
insert_set=cardset.insert_set,
vendor_id=cardset.vendor_id
)
return response
@@ -0,0 +1,28 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import FieldPosition
class FieldPositionResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
short_name: str
sport_id: str
def to_response(fieldposition: FieldPosition) -> FieldPositionResponse:
response: FieldPositionResponse = FieldPositionResponse(
id=fieldposition.id,
created_date=fieldposition.created_date,
last_modified_date=fieldposition.last_modified_date,
version=fieldposition.version,
name=fieldposition.name,
short_name=fieldposition.short_name,
sport_id=fieldposition.sport_id
)
return response
+25
View File
@@ -0,0 +1,25 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Player
class PlayerResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
first_name: str
last_name: str
def to_response(player: Player) -> PlayerResponse:
response: PlayerResponse = PlayerResponse(
id=player.id,
created_date=player.created_date,
last_modified_date=player.last_modified_date,
version=player.version,
first_name=player.first_name,
last_name=player.last_name
)
return response
+36
View File
@@ -0,0 +1,36 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Rooster
class RoosterResponse(BaseModel):
"""
Pydantic model for returning Rooster objects.
"""
id: str
created_date: datetime
last_modified_date: datetime
version: int
year: int
team_id: str
player_id: str
position_id: str
def to_response(rooster: Rooster) -> RoosterResponse:
"""
convert database object to response object (Pydantic).
"""
response: RoosterResponse = RoosterResponse(
id=rooster.id,
created_date=rooster.created_date,
last_modified_date=rooster.last_modified_date,
version=rooster.version,
year=rooster.year,
team_id=rooster.team_id,
player_id=rooster.player_id,
position_id=rooster.position_id
)
return response
+18 -2
View File
@@ -1,8 +1,24 @@
from typing import AnyStr from datetime import datetime
from pydantic import BaseModel from pydantic import BaseModel
from src.db.models.tysc import Sport
class SportResponse(BaseModel): class SportResponse(BaseModel):
id: AnyStr id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str name: str
def to_response(sport: Sport) -> SportResponse:
response: SportResponse = SportResponse(
id=sport.id,
created_date=sport.created_date,
last_modified_date=sport.last_modified_date,
version=sport.version,
name=sport.name
)
return response
+28
View File
@@ -0,0 +1,28 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Team
class TeamResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
short_name: str
sport_id: str
def to_response(team: Team) -> TeamResponse:
response: TeamResponse = TeamResponse(
id=team.id,
created_date=team.created_date,
last_modified_date=team.last_modified_date,
version=team.version,
name=team.name,
short_name=team.short_name,
sport_id=team.sport_id
)
return response
+32
View File
@@ -0,0 +1,32 @@
"""
class and function for json response objects for Vendor.
"""
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Vendor
class VendorResponse(BaseModel):
"""
Pydantic model for Vendor reponse object.
"""
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def to_response(vendor: Vendor) -> VendorResponse:
"""
convert database object Vendor to response object VendorResponse.
"""
reponse: VendorResponse = VendorResponse(
id=vendor.id,
created_date=vendor.created_date,
last_modified_date=vendor.last_modified_date,
version=vendor.version,
name=vendor.name
)
return reponse
+26
View File
@@ -0,0 +1,26 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import Assignment
class AssignmentResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
profile_id: str
permission_id: str
def to_response(assignment: Assignment) -> AssignmentResponse:
response: AssignmentResponse = AssignmentResponse(
id=assignment.id,
created_date=assignment.created_date,
last_modified_date=assignment.last_modified_date,
version=assignment.version,
profile_id=assignment.profile_id,
permission_id=assignment.permission_id
)
return response
+24
View File
@@ -0,0 +1,24 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import Permission
class PermissionResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def to_response(permission: Permission) -> PermissionResponse:
response: PermissionResponse = PermissionResponse(
id=permission.id,
created_date=permission.created_date,
last_modified_date=permission.last_modified_date,
version=permission.version,
name=permission.name
)
return response
+41 -4
View File
@@ -1,3 +1,5 @@
from datetime import datetime
from pydantic import BaseModel from pydantic import BaseModel
from src.db.models.admin import Profile from src.db.models.admin import Profile
@@ -5,9 +7,44 @@ from src.db.models.admin import Profile
class ProfileResponse(BaseModel): class ProfileResponse(BaseModel):
id: str id: str
user_name: str created_date: datetime
last_modified_date: datetime
class ProfileModel(BaseModel): version: int
user_name: str
first_name: str first_name: str
last_name: str last_name: str
user_name: str
email: str
password: str
enabled: bool
def to_response(profile: Profile) -> ProfileResponse:
response: ProfileResponse = ProfileResponse(
id=profile.id,
created_date=profile.created_date,
last_modified_date=profile.last_modified_date,
version=profile.version,
first_name=profile.first_name,
last_name=profile.last_name,
user_name=profile.user_name,
email=profile.email,
password=profile.password,
enabled=profile.enabled
)
return response
class ProfileModel(BaseModel):
username: str
email: str
first_name: str
last_name: str
active: bool
def to_model(profile: Profile) -> ProfileModel:
model: ProfileModel = ProfileModel(
username=profile.user_name,
email=profile.email,
first_name=profile.first_name,
last_name=profile.last_name,
active=profile.enabled,
)
return model
+32
View File
@@ -0,0 +1,32 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import Token
class TokenResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
token: str
name: str
last_used_date: datetime
enabled: bool
profile_id: str
def to_response(token: Token) -> TokenResponse:
response: TokenResponse = TokenResponse(
id=token.id,
created_date=token.created_date,
last_modified_date=token.last_modified_date,
version=token.version,
token=token.token,
name=token.name,
last_used_date=token.last_used_date,
enabled=token.enabled,
profile_id=token.profile_id
)
return response
+6 -8
View File
@@ -1,9 +1,5 @@
# from src.apis.version1.admin import login_for_access_token from src.apis.version1.admin.login import login_for_token_cookie
from fastapi.security import OAuth2PasswordRequestForm from fastapi import APIRouter
from src.apis.version1.admin import login_for_token_cookie
from src.db.session import SessionDep
from fastapi import APIRouter, Depends
from fastapi import HTTPException from fastapi import HTTPException
from fastapi import Request from fastapi import Request
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
@@ -20,7 +16,7 @@ def login(request: Request):
@router.post("/login/") @router.post("/login/")
async def login(request: Request): async def validate_login(request: Request):
form = LoginForm(request) form = LoginForm(request)
await form.load_data() await form.load_data()
if await form.is_valid(): if await form.is_valid():
@@ -31,6 +27,8 @@ async def login(request: Request):
return response return response
except HTTPException: except HTTPException:
form.__dict__.update(msg="") form.__dict__.update(msg="")
form.__dict__.get("errors").append("Incorrect Email or Password") errors = form.__dict__.get("errors")
if errors:
errors.append("Incorrect Email or Password")
return templates.TemplateResponse("auth/login.html", form.__dict__) return templates.TemplateResponse("auth/login.html", form.__dict__)
return templates.TemplateResponse("auth/login.html", form.__dict__) return templates.TemplateResponse("auth/login.html", form.__dict__)
+9 -9
View File
@@ -9,15 +9,15 @@ from src.webapps.media import route_actors, route_media, route_videos
templates = Jinja2Templates(directory="src/templates") templates = Jinja2Templates(directory="src/templates")
api_router = APIRouter() api_router = APIRouter()
api_router.include_router(route_comics.router) api_router.include_router(route_comics.router, tags=["webapp"])
api_router.include_router(route_artists.router) api_router.include_router(route_artists.router, tags=["webapp"])
api_router.include_router(route_worktype.router) api_router.include_router(route_worktype.router, tags=["webapp"])
api_router.include_router(route_media.router) api_router.include_router(route_media.router, tags=["webapp"])
api_router.include_router(route_actors.router) api_router.include_router(route_actors.router, tags=["webapp"])
api_router.include_router(route_videos.router) api_router.include_router(route_videos.router, tags=["webapp"])
api_router.include_router(route_login.router) api_router.include_router(route_login.router, tags=["webapp"])
api_router.include_router(route_admin.router) api_router.include_router(route_admin.router, tags=["webapp"])
@api_router.get("/") @api_router.get("/", tags=["webapp"])
def home(request: Request, msg: str | None = None): def home(request: Request, msg: str | None = None):
return templates.TemplateResponse("index.html", {"request": request, "msg": msg}) return templates.TemplateResponse("index.html", {"request": request, "msg": msg})
+2 -2
View File
@@ -11,7 +11,7 @@ from src.schema.comics.comic import ComicSchema
from src.webapps.comic.forms.comic import ValidateComicForm from src.webapps.comic.forms.comic import ValidateComicForm
templates = Jinja2Templates(directory="src/templates") templates = Jinja2Templates(directory="src/templates")
router = APIRouter(include_in_schema=False, prefix="/comic") router = APIRouter(include_in_schema=True, prefix="/comic")
@router.get("/comics") @router.get("/comics")
def get_comics(db: SessionDep, request: Request, msg: str | None = None): def get_comics(db: SessionDep, request: Request, msg: str | None = None):
@@ -58,7 +58,7 @@ async def validate_comic(request: Request, db: SessionDep, comic_id: str, action
if form.is_valid(): if form.is_valid():
try: try:
comic = ComicSchema(**form.__dict__) comic = ComicSchema(**form.__dict__)
comic = update_comic(comic=comic, comic_id=comic_id, db=db) comic = update_comic(new_comic=comic, comic_id=comic_id, db=db)
return RedirectResponse(f"/comic/comics/{comic.id}", status_code=status.HTTP_303_SEE_OTHER) return RedirectResponse(f"/comic/comics/{comic.id}", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e: except Exception as e:
print(e) print(e)
+1 -1
View File
@@ -10,7 +10,7 @@ class AddLinkForm:
async def load_data(self): async def load_data(self):
form = await self.request.form() form = await self.request.form()
self.url = form.get("url") self.url = str(form.get("url"))
def is_valid(self): def is_valid(self):
if not self.url or not (self.url.__contains__("http")): if not self.url or not (self.url.__contains__("http")):

Some files were not shown because too many files have changed in this diff Show More