38 Commits

Author SHA1 Message Date
tpeetz 0d5ee0dd63 implement REST API for adding MediaFileActors
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 11s
2026-06-14 01:58:57 +02:00
tpeetz b039ae97a9 add subproject kontor-rails
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-06-01 23:47:57 +02:00
tpeetz 71bd7641dc add checking dirs to cjeck_kontor.py
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 3s
2026-05-31 23:17:32 +02:00
tpeetz e70b3ab486 fix problem when deleting MediaFile with MediaActor relations
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 3s
2026-05-31 17:47:27 +02:00
tpeetz 6c4ff8bcad check for duplicate links
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-31 00:18:40 +02:00
Thomas Peetz c885f6cc02 add missing endpoints for creating items
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-26 22:43:04 +02:00
tpeetz 0f9c90b883 synchronize data between configured servers
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-23 20:32:04 +02:00
Thomas Peetz 8d684908e6 update cli scripts to use REST API
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 2s
2026-05-21 16:51:38 +02:00
Thomas Peetz 40b498ed2a complete loading single items
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 3s
2026-05-21 13:43:53 +02:00
tpeetz 6269b54ee8 check if more than one server is configured before syncing
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 9s
2026-05-20 21:43:17 +02:00
Thomas Peetz bbc00ae2cc add endpoints to get items by id
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-20 18:34:44 +02:00
Thomas Peetz 944420802f added all missing endpoints
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 3s
2026-05-20 14:08:10 +02:00
tpeetz d23d1f306a add missing comics endpoints
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-19 23:00:19 +02:00
tpeetz 6077f685e0 Remove obsolete endpoints (#89)
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
Remove endpoints api/login/token and api/login/profile

---------

Co-authored-by: Thomas Peetz <thomas.peetz@cimt-ag.de>
Reviewed-on: #89
2026-05-19 17:52:30 +00:00
tpeetz f9f4a70a79 add mailaccount endpoint
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 3s
2026-05-18 22:01:20 +02:00
Thomas Peetz 71724ac800 add missing endpoints
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 5s
2026-05-18 16:45:56 +02:00
tpeetz 6dd8e12218 secure media endpoints
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 2s
2026-05-17 21:48:40 +02:00
tpeetz cd033f458d add missing endpoints
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 5s
2026-05-17 19:52:00 +02:00
tpeetz 1b58ec8e27 remove obsolete scripts 2026-05-17 01:08:50 +02:00
Thomas Peetz 85b5168445 fix typo in workflow demo.yaml
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s
2026-05-05 18:05:19 +02:00
Thomas Peetz e8763fe635 test gitea actions
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 13s
2026-05-05 18:02:08 +02:00
Thomas Peetz 69e58e6dfb Add environment variable for DB server 2026-05-04 15:21:18 +02:00
Thomas Peetz d8914627a3 move *.service files to subdirectory container and add service file for standalone operations in standalone 2026-05-04 15:02:41 +02:00
Thomas Peetz b256e20491 renamed systemd unit files 2026-04-27 07:08:41 +02:00
tpeetz a8df860752 Merge branch 'feature/44-evaluate-typst-for-documentation' into 'develop/0.3.0'
Resolve "Evaluate Typst for documentation"

Closes #44

See merge request tpeetz/kontor!42
2026-04-26 21:09:54 +02:00
tpeetz 4e88b52dc3 use separate files for chapters 2026-04-26 21:09:06 +02:00
tpeetz 9772c9f554 add subprojekt kontor-doc 2026-04-26 16:08:17 +02:00
tpeetz e81d473a69 Einrichtung Build 2026-04-23 23:14:26 +02:00
tpeetz aca9a1aa4a integrate kontor-angular 2026-04-19 20:05:50 +02:00
tpeetz 57129c2d37 integration von kontor-angular 2026-04-18 00:29:25 +02:00
tpeetz e09f7cddf3 rename Dockerfile to Containerfile and integrate kontor-angular in pod kontor 2026-04-17 00:18:50 +02:00
Thomas Peetz a68dd05794 update kontor-api to reflect api changes 2026-04-15 14:38:38 +02:00
Thomas Peetz 62e22c56c4 update usage of api for login by email or username 2026-04-15 11:32:18 +02:00
Thomas Peetz 7aa6a12044 improve konto-vue/Containerfile by explicitly stating images 2026-04-15 10:26:35 +02:00
tpeetz 39a0759b0b fix OAuth authentication 2026-04-12 20:08:42 +02:00
tpeetz e82f2825ae update systemd service files and add script to copy these files 2026-04-12 17:51:19 +02:00
tpeetz 74d29ae8ef Merge branch 'feature/43-typeerror-not-all-arguments-converted-during-string-formatting' into 'develop/0.3.0'
Resolve "TypeError: not all arguments converted during string formatting"

Closes #43

See merge request tpeetz/kontor!41
2026-04-12 17:14:46 +02:00
tpeetz 614c5f20aa Resolve "TypeError: not all arguments converted during string formatting" 2026-04-12 17:14:46 +02:00
305 changed files with 7884 additions and 2057 deletions
+19
View File
@@ -0,0 +1,19 @@
name: Gitea Actions Demo
run-name: ${{ gitea.actor }} ist testing out Gitea Actions
on: [push]
jobs:
Explore-Gitea-Actions:
runs-on: inky
steps:
- run: echo "The job was automatically triggered by a ${{ gitea.event_name }} event."
- run: echo "This job is now running on a ${{ runner.os }} server hosted by Gitea!"
- run: echo "The name of your branch is ${{ gitea.ref }} and your repository is ${{ gitea.repository }}."
- name: Check out repository code
uses: actions/checkout@v4
- run: echo "The ${{ gitea.repository }} repository has been cloned to the runner."
- run: echo "The workflow is now ready to test your code on the runner."
- name: List files in the directory
run: |
ls ${{ gitea.workspace }}
- run: echo "The job's status is ${{ job.status }}."
@@ -1,5 +1,5 @@
### STAGE 1: Build ###
FROM node:22.15-alpine AS build
FROM docker.io/node:22-alpine AS build
WORKDIR /app
COPY package.json package-lock.json ./
RUN npm install
@@ -7,8 +7,9 @@ COPY . .
RUN npm run build
### STAGE 2: Run ###
FROM nginx:1.17.1-alpine
FROM docker.io/library/nginx:stable-alpine
COPY nginx.conf /etc/nginx/conf.d/default.conf
COPY --from=build /app/dist/kontor-angular/browser /usr/share/nginx/html
EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]
+1
View File
@@ -1,4 +1,5 @@
server {
listen 8800;
# Root-Verzeichnis für den Server setzen (wir kopieren unsere Anwendung hierher)
root /usr/share/nginx/html;
+236 -10
View File
@@ -1,12 +1,238 @@
from fastapi import APIRouter
"""
add router for different parts (like comics, tysc, media)
"""
from src.apis.version1 import comic, mediaactor, mediafile, mediaactorfile, tysc, admin, user
from fastapi import APIRouter, Depends
from src.apis.version1.admin import mailaccount
from src.apis.version1.comics import (
artist,
publisher,
comic,
issue,
worktype,
volume,
storyarc,
comicwork,
issuework,
)
from src.apis.version1.media import (
actor,
file,
mediaactorfile,
mediavideo,
mediaarticle,
)
from src.apis.version1.tysc import (
card,
cardset,
fieldposition,
player,
rooster,
sport,
team,
vendor,
)
from src.core.security import get_current_user_from_token
api_router = APIRouter(prefix="/api")
api_router.include_router(comic.router, prefix="/comics", tags=["comics"])
api_router.include_router(mediafile.router, prefix="/media", tags=["media"])
api_router.include_router(mediaactor.router, prefix="/media", tags=["media"])
api_router.include_router(mediaactorfile.router, prefix="/media", tags=["media"])
api_router.include_router(tysc.router, prefix="/tysc", tags=["tysc"])
api_router.include_router(admin.router, prefix="/login", tags=["login"])
api_router.include_router(user.router, prefix="/user", tags=["user"])
from src.apis.version1.user import assignment, permission, profile, token
from src.apis.version1.bookshelf import article, bookshelf_publisher, book, author, articleauthor, bookauthor
api_router = APIRouter(prefix="/api")
api_router.include_router(
comic.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
publisher.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
artist.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
issue.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
worktype.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
volume.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
storyarc.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
comicwork.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
issuework.router,
prefix="/comics",
tags=["comics"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
file.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mediavideo.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mediaarticle.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
actor.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mediaactorfile.router,
prefix="/media",
tags=["media"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
sport.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
player.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
team.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
fieldposition.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
rooster.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
vendor.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
cardset.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
card.router,
prefix="/tysc",
tags=["tysc"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
article.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
bookshelf_publisher.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
book.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
author.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
articleauthor.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
bookauthor.router,
prefix="/bookshelf",
tags=["bookshelf"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
profile.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
token.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
permission.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
assignment.router,
prefix="/user",
tags=["user"],
dependencies=[Depends(get_current_user_from_token)],
)
api_router.include_router(
mailaccount.router,
prefix="/admin",
tags=["admin"],
dependencies=[Depends(get_current_user_from_token)],
)
-52
View File
@@ -1,52 +0,0 @@
from datetime import timedelta
from typing import Annotated
from fastapi import APIRouter, Body, HTTPException, status, Depends, Response
from fastapi.security import OAuth2PasswordRequestForm
from src.core.config import settings
from src.core.security import create_access_token, authenticate_user, get_current_active_user
from src.db.models.admin import Profile
from src.schema.admin import Token, ProfileModel
from src.webapps.auth.forms import LoginForm
router = APIRouter()
@router.post("/token")
def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email, "scope": " ".join(form_data.scopes)}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
# @router.post("/token-cookie", response_model=Token)
def login_for_token_cookie(response: Response, form_data: LoginForm = Depends()):
user = authenticate_user(form_data.username, form_data.password) # type: ignore
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
response.set_cookie(
key="access_token", value=f"Bearer {access_token}", httponly=True
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/profile", response_model=ProfileModel)
async def read_profile(current_user: Annotated[Profile, Depends(get_current_active_user)]):
return current_user
@@ -0,0 +1,74 @@
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, Response, status
from fastapi.security import OAuth2PasswordRequestForm
from src.core.config import settings
from src.core.log_conf import logger
from src.core.security import (
authenticate_user_by_email,
authenticate_user_by_username,
create_access_token,
)
from src.schema.admin.login import LoginRequest
from src.schema.admin.token import Token
from src.webapps.auth.forms import LoginForm
login_router = APIRouter()
@login_router.post(
"/login",
tags=["login"],
summary="Login and get token",
response_description="Return HTTP status code 200 (OK)",
status_code=status.HTTP_200_OK,
)
def login(request: LoginRequest) -> Token:
logger.info("login with %s", request.email)
user = authenticate_user_by_email(str(request.email), str(request.password))
scopes = ["admin", "read"]
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email, "scope": " ".join(scopes)},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@login_router.post("/token", tags=["login"], summary="Login for access token")
# async def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user_by_username(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.user_name, "scope": " ".join(form_data.scopes)},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
def login_for_token_cookie(response: Response, form_data: LoginForm = Depends()):
user = authenticate_user_by_email(str(form_data.username), str(form_data.password))
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
response.set_cookie(
key="access_token", value=f"Bearer {access_token}", httponly=True
)
return {"access_token": access_token, "token_type": "bearer"}
@@ -0,0 +1,32 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.admin import MailAccount
from src.db.session import SessionDep
from src.schema.admin.mailaccount import MailAccountResponse, to_response
router = APIRouter()
@router.get("/mailaccounts", response_model=List[MailAccountResponse])
def get_all_mailaccounts(db: SessionDep) -> List[MailAccountResponse]:
"""
return all MailAccounts as JSON.
"""
results: List[MailAccountResponse] = []
mailaccounts = db.query(MailAccount).all()
for mailaccount in mailaccounts:
response = to_response(mailaccount)
results.append(response)
return results
@router.get("/mailaccounts/{mailaccount_id}", response_model=MailAccountResponse)
def get_mailaccount(mailaccount_id: str, db: SessionDep) -> MailAccountResponse:
"""
return MailAccounts by id.
"""
mailaccount = db.get(MailAccount, mailaccount_id)
if mailaccount is None:
raise HTTPException(status_code=409, detail="Mailaccount could not be found")
response = to_response(mailaccount)
return response
@@ -0,0 +1,20 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import Article
from src.db.session import SessionDep
from src.schema.bookshelf.article import ArticleResponse, to_response
router = APIRouter()
@router.get("/articles", response_model=List[ArticleResponse])
def get_all_artists(db: SessionDep) -> List[ArticleResponse]:
results: List[ArticleResponse] = []
articles = db.query(Article).all()
for article in articles:
response = to_response(article)
results.append(response)
return results
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import ArticleAuthor
from src.db.session import SessionDep
from src.schema.bookshelf.articleauthor import ArticleAuthorResponse, to_response
router = APIRouter()
@router.get("/articleauthors", response_model=List[ArticleAuthorResponse])
def get_all_artists(db: SessionDep) -> List[ArticleAuthorResponse]:
results: List[ArticleAuthorResponse] = []
articleauthors = db.query(ArticleAuthor).all()
for articleauthor in articleauthors:
response = to_response(articleauthor)
results.append(response)
return results
@@ -0,0 +1,26 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.bookshelf import Author
from src.db.session import SessionDep
from src.schema.bookshelf.author import AuthorResponse, to_response
router = APIRouter()
@router.get("/authors", response_model=List[AuthorResponse])
def get_all_authors(db: SessionDep) -> List[AuthorResponse]:
results: List[AuthorResponse] = []
authors = db.query(Author).all()
for author in authors:
response = to_response(author)
results.append(response)
return results
@router.get("/authors/{author_id}", response_model=AuthorResponse)
def get_author(author_id: str, db: SessionDep) -> AuthorResponse:
author = db.get(Author, author_id)
if author is None:
raise HTTPException(status_code=404, detail="Author could not be found")
response = to_response(author)
return response
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import Book
from src.db.session import SessionDep
from src.schema.bookshelf.book import BookResponse, to_response
router = APIRouter()
@router.get("/books", response_model=List[BookResponse])
def get_all_artists(db: SessionDep) -> List[BookResponse]:
results: List[BookResponse] = []
books = db.query(Book).all()
for book in books:
response = to_response(book)
results.append(response)
return results
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import BookAuthor
from src.db.session import SessionDep
from src.schema.bookshelf.bookauthor import BookAuthorResponse, to_response
router = APIRouter()
@router.get("/bookauthors", response_model=List[BookAuthorResponse])
def get_all_artists(db: SessionDep) -> List[BookAuthorResponse]:
results: List[BookAuthorResponse] = []
bookauthors = db.query(BookAuthor).all()
for bookauthor in bookauthors:
response = to_response(bookauthor)
results.append(response)
return results
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.bookshelf import BookshelfPublisher
from src.db.session import SessionDep
from src.schema.bookshelf.publisher import PublisherResponse, to_response
router = APIRouter()
@router.get("/publishers", response_model=List[PublisherResponse])
def get_all_artists(db: SessionDep) -> List[PublisherResponse]:
results: List[PublisherResponse] = []
publishers = db.query(BookshelfPublisher).all()
for publisher in publishers:
response = to_response(publisher)
results.append(response)
return results
-103
View File
@@ -1,103 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException, status
from src.core.log_conf import logger
from src.db.models.comic import Artist, Comic, Issue, Publisher
from src.db.repository.comics.artist import get_artist_details
from src.db.repository.comics.comic import (
get_comic_details,
get_issue_details,
get_short_info,
list_comics,
)
from src.db.repository.comics.publisher import get_publisher_details
from src.db.session import SessionDep
from src.schema.comics.artist import ArtistCreation, ArtistResponse
from src.schema.comics.artist_details import ArtistDetailResponse
from src.schema.comics.comic import ComicResponse
from src.schema.comics.comic_details import ComicDetailsResponse
from src.schema.comics.issue_details import IssueDetailsResponse
from src.schema.comics.publisher import PublisherResponse
from src.schema.comics.publisher_details import PublisherDetailsResponse
router = APIRouter()
@router.get("/comics")
def get_all_comics(db: SessionDep) -> List[ComicResponse]: # type: ignore
results: List[ComicResponse] = []
comics = list_comics(db)
for comic in comics:
response = get_short_info(comic)
results.append(response)
return results
@router.get("/comics/{comic_id}", response_model=ComicDetailsResponse)
def get_comic(comic_id: str, db: SessionDep) -> ComicDetailsResponse: # type: ignore
comic = db.get(Comic, comic_id)
if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found")
logger.info(f"create ComicDetailsResponse for {comic}")
response: ComicDetailsResponse = get_comic_details(comic)
logger.info(f"ComicDetailsResponse: {response}")
return response
@router.get("/artists", response_model=List[ArtistResponse])
def get_all_artists(db: SessionDep) -> List[ArtistResponse]: # type: ignore
results: List[ArtistResponse] = []
artists = db.query(Artist).all()
for artist in artists:
results.append(ArtistResponse(id=artist.id, name=str(artist.name))) # type: ignore
return results
@router.get("/artists/{artist_id}", response_model=ArtistDetailResponse)
def get_artist(artist_id: str, db: SessionDep) -> ArtistDetailResponse: # type: ignore
artist = db.get(Artist, artist_id)
if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found")
response: ArtistDetailResponse = get_artist_details(artist)
return response
@router.post("/artists", status_code=status.HTTP_201_CREATED)
def add_artist(db: SessionDep, artist_creation: ArtistCreation) -> ArtistResponse: # type: ignore
artist: Artist = Artist()
setattr(artist, "name", artist_creation.name)
try:
db.add(artist)
db.commit()
except:
raise HTTPException(status_code=409, detail="Artist already added")
response = ArtistResponse(id=artist.id, name=str(artist.name))
return response
@router.get("/publishers", response_model=List[PublisherResponse])
def get_all_publishers(db: SessionDep) -> List[PublisherResponse]: # type: ignore
results: List[PublisherResponse] = []
publishers = db.query(Publisher).all()
for publisher in publishers:
results.append(PublisherResponse(id=publisher.id, name=str(publisher.name)))
return results
@router.get("/publishers/{publisher_id}", response_model=PublisherDetailsResponse)
def get_publisher(publisher_id: str, db: SessionDep) -> PublisherDetailsResponse: # type: ignore
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherDetailsResponse = get_publisher_details(publisher)
return response
@router.get("/issues", response_model=List[IssueDetailsResponse])
def get_issues(db: SessionDep) -> List[IssueDetailsResponse]: # type: ignore
results: List[IssueDetailsResponse] = []
issues = db.query(Issue).all()
for issue in issues:
results.append(get_issue_details(issue))
return results
@@ -0,0 +1,53 @@
from typing import List
from fastapi import APIRouter, HTTPException, status
from src.db.models.comic import Artist
from src.db.repository.comics.artist import get_artist_details
from src.db.session import SessionDep
from src.schema.comics.artist import ArtistCreation, ArtistResponse, artist_to_response
from src.schema.comics.artist_details import ArtistDetailResponse
router = APIRouter()
@router.get("/artists", response_model=List[ArtistResponse])
def get_all_artists(db: SessionDep) -> List[ArtistResponse]:
results: List[ArtistResponse] = []
artists = db.query(Artist).all()
for artist in artists:
response = artist_to_response(artist)
results.append(response)
return results
@router.get("/artists/{artist_id}", response_model=ArtistResponse)
def get_artist(artist_id: str, db: SessionDep) -> ArtistResponse:
artist = db.get(Artist, artist_id)
if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found")
response: ArtistResponse = artist_to_response(artist)
return response
@router.get("/artists/details/{artist_id}", response_model=ArtistDetailResponse)
def get_artist_details_by_id(artist_id: str, db: SessionDep) -> ArtistDetailResponse:
artist = db.get(Artist, artist_id)
if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found")
response: ArtistDetailResponse = get_artist_details(artist)
return response
@router.post("/artists", status_code=status.HTTP_201_CREATED)
def add_artist(db: SessionDep, artist_creation: ArtistCreation) -> ArtistResponse:
artist: Artist = Artist()
setattr(artist, "name", artist_creation.name)
try:
db.add(artist)
db.commit()
except:
raise HTTPException(status_code=409, detail="Artist already added")
response = artist_to_response(artist)
return response
@@ -0,0 +1,44 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.core.log_conf import logger
from src.db.models.comic import Comic
from src.db.repository.comics.comic import (
get_comic_details,
)
from src.db.session import SessionDep
from src.schema.comics.comic import ComicResponse, comic_to_response
from src.schema.comics.comic_details import ComicDetailsResponse
router = APIRouter()
@router.get("/comics")
def get_all_comics(db: SessionDep) -> List[ComicResponse]:
results: List[ComicResponse] = []
comics = db.query(Comic).all()
for comic in comics:
response = comic_to_response(comic)
results.append(response)
return results
@router.get("/comics/{comic_id}", response_model=ComicResponse)
def get_comic(comic_id: str, db: SessionDep) -> ComicResponse:
comic = db.get(Comic, comic_id)
if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found")
response: ComicResponse = comic_to_response(comic)
return response
@router.get("/comics/details/{comic_id}", response_model=ComicDetailsResponse)
def get_comic_details_by_id(comic_id: str, db: SessionDep) -> ComicDetailsResponse:
comic = db.get(Comic, comic_id)
if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found")
logger.info(f"create ComicDetailsResponse for {comic}")
response: ComicDetailsResponse = get_comic_details(comic)
logger.info(f"ComicDetailsResponse: {response}")
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import ComicWork
from src.db.session import SessionDep
from src.schema.comics.comicwork import ComicWorkResponse, comicwork_to_response
router = APIRouter()
@router.get("/comicworks", response_model=List[ComicWorkResponse])
def get_comicworks(db: SessionDep) -> List[ComicWorkResponse]:
results: List[ComicWorkResponse] = []
worktypes = db.query(ComicWork).all()
for worktype in worktypes:
response = comicwork_to_response(worktype)
results.append(response)
return results
@router.get("/comicworks/{comicwork_id}", response_model=ComicWorkResponse)
def get_comicwork(comicwork_id: str, db: SessionDep) -> ComicWorkResponse:
worktype = db.get(ComicWork, comicwork_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Comicwork could not be found")
response = comicwork_to_response(worktype)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Issue
from src.db.session import SessionDep
from src.schema.comics.issue import IssueResponse, issue_to_response
router = APIRouter()
@router.get("/issues", response_model=List[IssueResponse])
def get_issues(db: SessionDep) -> List[IssueResponse]:
results: List[IssueResponse] = []
issues = db.query(Issue).all()
for issue in issues:
response = issue_to_response(issue)
results.append(response)
return results
@router.get("/issues/{issue_id}", response_model=IssueResponse)
def get_issue(issue_id: str, db: SessionDep) -> IssueResponse:
issue = db.get(Issue, issue_id)
if issue is None:
raise HTTPException(status_code=404, detail="Issue could not be found")
response = issue_to_response(issue)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import IssueWork
from src.db.session import SessionDep
from src.schema.comics.issuework import IssueWorkResponse, issuework_to_response
router = APIRouter()
@router.get("/issueworks", response_model=List[IssueWorkResponse])
def get_issueworks(db: SessionDep) -> List[IssueWorkResponse]:
results: List[IssueWorkResponse] = []
worktypes = db.query(IssueWork).all()
for worktype in worktypes:
response = issuework_to_response(worktype)
results.append(response)
return results
@router.get("/issueworks/{issuework_id}", response_model=IssueWorkResponse)
def get_issuework(issuework_id: str, db: SessionDep) -> IssueWorkResponse:
worktype = db.get(IssueWork, issuework_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Issuework could not be found")
response = issuework_to_response(worktype)
return response
@@ -0,0 +1,37 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Publisher
from src.db.repository.comics.publisher import get_publisher_details
from src.db.session import SessionDep
from src.schema.comics.publisher import PublisherResponse, publisher_to_response
from src.schema.comics.publisher_details import PublisherDetailsResponse
router = APIRouter()
@router.get("/publishers", response_model=List[PublisherResponse])
def get_all_publishers(db: SessionDep) -> List[PublisherResponse]:
results: List[PublisherResponse] = []
publishers = db.query(Publisher).all()
for publisher in publishers:
response: PublisherResponse = publisher_to_response(publisher)
results.append(response)
return results
@router.get("/publishers/{publisher_id}", response_model=PublisherResponse)
def get_publisher(publisher_id: str, db: SessionDep) -> PublisherResponse:
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherResponse = publisher_to_response(publisher)
return response
@router.get("/publishers/details/{publisher_id}", response_model=PublisherDetailsResponse)
def get_publisher_details_by_id(publisher_id: str, db: SessionDep) -> PublisherDetailsResponse:
publisher = db.get(Publisher, publisher_id)
if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found")
response: PublisherDetailsResponse = get_publisher_details(publisher)
return response
@@ -0,0 +1,26 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import StoryArc
from src.db.session import SessionDep
from src.schema.comics.storyarc import StoryArcResponse, storyarc_to_response
router = APIRouter()
@router.get("/storyarcs", response_model=List[StoryArcResponse])
def get_storyarcs(db: SessionDep) -> List[StoryArcResponse]:
results: List[StoryArcResponse] = []
storyarcs = db.query(StoryArc).all()
for storyarc in storyarcs:
response = storyarc_to_response(storyarc)
results.append(response)
return results
@router.get("/storyarcs/{storyarc_id}", response_model=StoryArcResponse)
def get_storyarc(story_arc_id: str, db: SessionDep) -> StoryArcResponse:
storyarc = db.get(StoryArc, story_arc_id)
if storyarc is None:
raise HTTPException(status_code=404, detail="Storyarc could not be found")
response = storyarc_to_response(storyarc)
return response
@@ -0,0 +1,26 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import Volume
from src.db.session import SessionDep
from src.schema.comics.volume import VolumeResponse, volume_to_response
router = APIRouter()
@router.get("/volumes", response_model=List[VolumeResponse])
def volumes(db: SessionDep) -> List[VolumeResponse]:
results: List[VolumeResponse] = []
worktypes = db.query(Volume).all()
for worktype in worktypes:
response = volume_to_response(worktype)
results.append(response)
return results
@router.get("/volumes/{volume_id}", response_model=VolumeResponse)
def get_volume(volume_id: str, db: SessionDep) -> VolumeResponse:
worktype = db.get(Volume, volume_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Volume could not be found")
response = volume_to_response(worktype)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.comic import WorkType
from src.db.session import SessionDep
from src.schema.comics.worktype import WorktypeResponse, worktype_to_response
router = APIRouter()
@router.get("/worktypes", response_model=List[WorktypeResponse])
def get_issues(db: SessionDep) -> List[WorktypeResponse]:
results: List[WorktypeResponse] = []
worktypes = db.query(WorkType).all()
for worktype in worktypes:
response = worktype_to_response(worktype)
results.append(response)
return results
@router.get("/worktypes/{worktype_id}", response_model=WorktypeResponse)
def get_issue(worktype_id: str, db: SessionDep) -> WorktypeResponse:
worktype = db.get(WorkType, worktype_id)
if worktype is None:
raise HTTPException(status_code=404, detail="Worktype could not be found")
response = worktype_to_response(worktype)
return response
+1 -1
View File
@@ -1,6 +1,6 @@
from fastapi import APIRouter, status
from src.schema.admin import HealthCheck
from src.schema.admin.healthcheck import HealthCheck
health_router = APIRouter()
-41
View File
@@ -1,41 +0,0 @@
from datetime import timedelta
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from src.core.config import settings
from src.core.log_conf import logger
from src.core.security import authenticate_user, create_access_token
from src.schema.admin import Token
login_router = APIRouter()
class LoginRequest(BaseModel):
email: str | None = None
password: str | None = None
@login_router.post(
"/login",
tags=["login"],
summary="Login and get token",
response_description="Return HTTP status code 200 (OK)",
status_code=status.HTTP_200_OK,
)
def login(request: LoginRequest) -> Token:
logger.info(f"login with {request.email}")
user = authenticate_user(request.email, request.password)
scopes = ["admin", "read"]
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email, "scope": " ".join(scopes)},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@@ -1,33 +1,33 @@
from typing import List
from fastapi import APIRouter, status, HTTPException
from sqlalchemy import select
from src.core.log_conf import logger
from src.db.repository.media import create_new_mediaactor, delete_mediaactor, get_actor_details
from src.db.repository.media.actor import delete_mediaactor, import_mediaactor
from src.db.session import SessionDep
from src.schema.media.actor import MediaActorModel, MediaActorResponse
from src.schema.media.actor import MediaActorModel, MediaActorResponse, actor_to_response
from src.db.models.media import MediaActor
router = APIRouter()
@router.get("/actors", response_model=list[MediaActorResponse])
# def get_all_files(db: SessionDep, review: bool = False, download: bool = False, current_user: Profile = Depends(get_current_user_from_token)) -> List[MediaFileResponse]:
def get_all_actors(db: SessionDep, review: bool = False, download: bool = False) -> list[MediaActorResponse]: # type: ignore
@router.get("/actors", response_model=List[MediaActorResponse])
def get_all_actors(db: SessionDep) -> List[MediaActorResponse]:
results: list[MediaActorResponse] = []
actors = db.scalars(select(MediaActor)).all()
actors = db.query(MediaActor).all()
for mediaactor in actors:
response = MediaActorResponse(id=mediaactor.id, name=str(mediaactor.name), url=str(mediaactor.url))
response = actor_to_response(mediaactor)
results.append(response)
return results
@router.get("/actors/{actor_id}", response_model=MediaActorResponse)
def get_actor(actor_id: str, db: SessionDep) -> MediaActorResponse: # type: ignore
def get_actor(actor_id: str, db: SessionDep) -> MediaActorResponse:
media_actor = db.get(MediaActor, actor_id)
if not media_actor:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = get_actor_details(media_actor)
response = actor_to_response(media_actor)
return response
@router.delete("/actors/{actor_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_actor(actor_id: str, db: SessionDep): # type: ignore
def delete_actor(actor_id: str, db: SessionDep):
media_actor = db.get(MediaActor, actor_id)
if not media_actor:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
@@ -35,11 +35,11 @@ def delete_actor(actor_id: str, db: SessionDep): # type: ignore
delete_mediaactor(db, media_actor.id)
@router.post("/actors", status_code=status.HTTP_201_CREATED)
def add_actor(new_actor: MediaActorModel, db: SessionDep) -> MediaActorResponse: # type: ignore
def add_actor(new_actor: MediaActorModel, db: SessionDep) -> MediaActorResponse:
logger.info(f"add actor {new_actor.url}")
try:
mediaActor: MediaActor = create_new_mediaactor(new_actor, db)
except:
raise HTTPException(status_code=409, detail="Link duplicate")
response = get_actor_details(mediaActor)
mediaActor: MediaActor = import_mediaactor(db, new_actor)
except Exception as exception:
raise HTTPException(status_code=409, detail=f"Link duplicate: {exception}")
response = actor_to_response(mediaActor)
return response
+163
View File
@@ -0,0 +1,163 @@
from typing import List
from fastapi import APIRouter, status, HTTPException
from src.core.log_conf import logger
from src.db.repository.media.actorfile import (
create_new_mediaactorfile,
delete_mediaactorfile,
)
from src.db.repository.media.file import delete_mediafile, import_mediafile
from src.db.session import SessionDep
from src.schema.media.actor import MediaActorResponse, actor_to_response
from src.schema.media.actorfile import MediaActorFileResponse, actorfile_to_response
from src.schema.media.file import (
MediaFileModel,
MediaFileResponse,
file_to_response,
file_to_model,
)
from src.db.models.media import MediaFile
router = APIRouter()
@router.get("/update-titles")
def update_titles(db: SessionDep) -> List[MediaFileResponse]:
"""
Update title for given MediaFile.
"""
results: list[MediaFileResponse] = []
files = db.query(MediaFile).filter(MediaFile.review.is_(True)).all()
for mediafile in files:
mediafile.update_title()
db.add(mediafile)
response = file_to_response(mediafile)
results.append(response)
db.commit()
return results
@router.get("/files", response_model=List[MediaFileResponse])
def get_all_files(
db: SessionDep, review: bool = False, download: bool = False
) -> List[MediaFileResponse]:
"""
Get all MediaFiles.
"""
results: List[MediaFileResponse] = []
files: List[MediaFile]
if review:
files = db.query(MediaFile).filter(MediaFile.review.is_(True)).all()
elif download:
files = db.query(MediaFile).filter(MediaFile.should_download.is_(True)).all()
else:
files = db.query(MediaFile).all()
for mediafile in files:
response = file_to_response(mediafile)
results.append(response)
return results
@router.get("/files/{file_id}", response_model=MediaFileResponse)
def get_file(file_id: str, db: SessionDep) -> MediaFileResponse:
"""
Get MediaFile with given id or return HTTPException.
"""
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
response = file_to_response(mediafile)
return response
@router.delete("/files/{file_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_file(file_id: str, db: SessionDep):
"""
Delete MediaFile by given id.
"""
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
logger.info("delete MediaFile: %s", file_id)
actor_files = mediafile.media_actor_files
logger.info("MediaActorFiles links %s", len(actor_files))
if len(actor_files) > 0:
logger.info("delete MediaActor relations first")
for actor_file in actor_files:
delete_mediaactorfile(db, actor_file.id)
delete_mediafile(db, mediafile.id)
@router.get("/files/{file_id}/actors", response_model=list[MediaActorResponse])
def get_file_actors(file_id: str, db: SessionDep) -> list[MediaActorResponse]:
"""
Get list of Actors for given MediaFile.
"""
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info("already known actors: %s", actor_files)
results: list[MediaActorResponse] = []
for actor_file in actor_files:
response = actor_to_response(actor_file.media_actor)
results.append(response)
return results
@router.put("/files/{file_id}/actors", response_model=List[MediaActorFileResponse])
def update_file_actors(
file_id: str, db: SessionDep, actors: List[MediaActorResponse]
) -> List[MediaActorFileResponse]:
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info("already known actors: %s", actor_files)
for actor in actors:
already_associated = False
for actor_file in actor_files:
if actor.id == actor_file.media_actor_id:
logger.info("alreay associated - do nothing")
already_associated = True
break
if not already_associated:
create_new_mediaactorfile(db, actor.id, mediafile.id)
db.refresh(mediafile)
actor_files = mediafile.media_actor_files
results: List[MediaActorFileResponse] = []
for actor_file in actor_files:
response = actorfile_to_response(actor_file)
results.append(response)
return results
@router.put("/files/{file_id}", response_model=MediaFileResponse)
def update_file(
file_id: str, db: SessionDep, info: MediaFileResponse
) -> MediaFileResponse:
"""
Update MediaFile with given id and data.
"""
media_file = db.get(MediaFile, file_id)
if not media_file:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
file_to_model(info, media_file)
db.add(media_file)
db.commit()
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be updated")
response = file_to_response(mediafile)
return response
@router.post("/files", status_code=status.HTTP_201_CREATED)
def add_file(new_file: MediaFileModel, db: SessionDep) -> MediaFileResponse:
logger.info("add mediafile %s", new_file)
try:
mediaFile: MediaFile = import_mediafile(db, new_file)
except:
raise HTTPException(status_code=409, detail="MediaFile duplicate")
response = file_to_response(mediaFile)
return response
@@ -1,33 +1,43 @@
from typing import List
from fastapi import APIRouter, status, HTTPException
from sqlalchemy import select
from src.core.log_conf import logger
from src.db.models.media import MediaActorFile
from src.db.repository.media import delete_mediaactorfile, get_actorfile_details
from src.db.repository.media.actorfile import delete_mediaactorfile, import_mediaactorfile
from src.db.session import SessionDep
from src.schema.media.actorfile import MediaActorFileResponse
from src.schema.media.actorfile import MediaActorFileModel, MediaActorFileResponse, actorfile_to_response
router = APIRouter()
@router.get("/actorfiles", response_model=List[MediaActorFileResponse])
def get_all_actorfiles(db: SessionDep) -> List[MediaActorFileResponse]: # type: ignore
def get_all_actorfiles(db: SessionDep) -> List[MediaActorFileResponse]:
results: List[MediaActorFileResponse] = []
actorfiles = db.scalars(select(MediaActorFile)).all()
actorfiles = db.query(MediaActorFile).all()
for media_actorfile in actorfiles:
response = get_actorfile_details(media_actorfile)
response = actorfile_to_response(media_actorfile)
results.append(response)
return results
@router.get("/actorfiles/{actorfile_id}", response_model=MediaActorFileResponse)
def get_actorfile(actorfile_id: str, db: SessionDep) -> MediaActorFileResponse: # type: ignore
def get_actorfile(actorfile_id: str, db: SessionDep) -> MediaActorFileResponse:
media_actorfile = db.get(MediaActorFile, actorfile_id)
if not media_actorfile:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = get_actorfile_details(media_actorfile)
response = actorfile_to_response(media_actorfile)
return response
@router.delete("/actorfiles/{actorfile_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_actorfile(actorfile_id: str, db: SessionDep): # type: ignore
def delete_actorfile(actorfile_id: str, db: SessionDep):
media_actorfile = db.get(MediaActorFile, actorfile_id)
if not media_actorfile:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
delete_mediaactorfile(db, media_actorfile.id)
@router.post("/actorfiles", status_code=status.HTTP_201_CREATED)
def add_actorfile(new_actorfile: MediaActorFileModel, db: SessionDep) -> MediaActorFileResponse:
logger.info("add actorfile %s - %s", new_actorfile.media_actor_id, new_actorfile.media_file_id)
try:
mediaActorFile: MediaActorFile = import_mediaactorfile(db, new_actorfile)
except Exception as exception:
raise HTTPException(status_code=409, detail=f"Link duplicate: {exception}")
response = actorfile_to_response(mediaActorFile)
return response
@@ -0,0 +1,25 @@
from typing import List
from fastapi import APIRouter
from src.db.models.media import MediaArticle
from src.db.session import SessionDep
from src.schema.media.article import MediaArticleResponse, to_response
router = APIRouter()
@router.get("/articles", response_model=List[MediaArticleResponse])
def get_all_files(
db: SessionDep, review: bool = False, download: bool = False
) -> List[MediaArticleResponse]:
"""
Get all MediaVideos.
"""
results: List[MediaArticleResponse] = []
articles: List[MediaArticle]
articles = db.query(MediaArticle).all()
for article in articles:
response = to_response(article)
results.append(response)
return results
@@ -0,0 +1,42 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.media import MediaVideo
from src.db.session import SessionDep
from src.schema.media.video import MediaVideoResponse, video_to_response
router = APIRouter()
@router.get("/videos", response_model=List[MediaVideoResponse])
def get_all_videos(
db: SessionDep, review: bool = False, download: bool = False
) -> List[MediaVideoResponse]:
"""
Get all MediaVideos.
"""
results: List[MediaVideoResponse] = []
files: List[MediaVideo]
if review:
files = db.query(MediaVideo).filter(MediaVideo.review.is_(True)).all()
elif download:
files = db.query(MediaVideo).filter(MediaVideo.should_download.is_(True)).all()
else:
files = db.query(MediaVideo).all()
for mediafile in files:
response = video_to_response(mediafile)
results.append(response)
return results
@router.get("/videos/{video_id}", response_model=MediaVideoResponse)
def get_video(video_id: str, db: SessionDep) -> MediaVideoResponse:
"""
Get MediaVideo by id.
"""
video = db.get(MediaVideo, video_id)
if video is None:
raise HTTPException(status_code=404, detail="MediaVideo could not be found")
response = video_to_response(video)
return response
-120
View File
@@ -1,120 +0,0 @@
from fastapi import APIRouter, status, HTTPException, Depends
from sqlalchemy import select, Sequence
from src.core.log_conf import logger
from src.db.repository.media import create_new_mediaactorfile, create_new_mediafile, delete_mediafile
from src.db.session import SessionDep
from src.schema.media.actor import MediaActorResponse
from src.schema.media.actorfile import MediaActorFileResponse
from src.schema.media.file import MediaFileResponse, Link, get_file_details, set_file
from src.db.models.media import MediaFile
router = APIRouter()
@router.get("/update-titles")
def update_titles(db: SessionDep) -> list[MediaFileResponse]: # type: ignore
results: list[MediaFileResponse] = []
files = db.query(MediaFile).filter(MediaFile.review == True).all()
for mediafile in files:
mediafile.update_title()
db.add(mediafile)
response = get_file_details(mediafile)
results.append(response)
db.commit()
return results
@router.get("/files", response_model=list[MediaFileResponse])
# def get_all_files(db: SessionDep, review: bool = False, download: bool = False, current_user: Profile = Depends(get_current_user_from_token)) -> List[MediaFileResponse]:
def get_all_files(db: SessionDep, review: bool = False, download: bool = False) -> list[MediaFileResponse]: # type: ignore
results: list[MediaFileResponse] = []
files: Sequence[MediaFile]
if review:
files = db.query(MediaFile).filter(MediaFile.review == True).all() # type: ignore
elif download:
files = db.query(MediaFile).filter(MediaFile.should_download == True).all() # type: ignore
else:
files = db.scalars(select(MediaFile)).all() # type: ignore
for mediafile in files: # type: ignore
response = get_file_details(mediafile)
results.append(response)
return results
@router.get("/files/{file_id}", response_model=MediaFileResponse)
def get_file(file_id: str, db: SessionDep) -> MediaFileResponse: # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
response = get_file_details(mediafile)
return response
@router.delete("/files/{file_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_file(file_id: str, db: SessionDep): # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
logger.info(f"delete MediaFile: {file_id}")
actor_files = mediafile.media_actor_files
logger.info(f"MediaActorFiles links {len(actor_files)}")
if len(actor_files) == 0:
delete_mediafile(db, mediafile.id)
@router.get("/files/{file_id}/actors", response_model=list[MediaActorResponse])
def get_file_actors(file_id: str, db: SessionDep) -> list[MediaActorResponse]: # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info(f"already known actors: {actor_files}")
results: list[MediaActorResponse] = []
for actor_file in actor_files:
response = MediaActorResponse(id=actor_file.media_actor.id, name=actor_file.media_actor.name, url=actor_file.media_actor.url)
results.append(response)
return results
@router.put("/files/{file_id}/actors", response_model=list[MediaActorFileResponse])
def update_file_actors(file_id: str, db: SessionDep, actors: list[MediaActorResponse]) -> list[MediaActorFileResponse]: # type: ignore
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
actor_files = mediafile.media_actor_files
logger.info(f"already known actors: {actor_files}")
for actor in actors:
already_associated = False
for actor_file in actor_files:
if actor.id == actor_file.media_actor_id:
logger.info("alreay associated - do nothing")
already_associated = True
break
if not already_associated:
create_new_mediaactorfile(db, actor.id, mediafile.id)
db.refresh(mediafile)
actor_files = mediafile.media_actor_files
results: list[MediaActorFileResponse] = []
for actor_file in actor_files:
response = MediaActorFileResponse(id=actor_file.id, actor_id=actor_file.media_actor_id, file_id=actor_file.media_file_id)
results.append(response)
return results
@router.put("/files/{file_id}", response_model=MediaFileResponse)
def update_file(file_id: str, db: SessionDep, info: MediaFileResponse) -> MediaFileResponse: # type: ignore
mediaFile = db.get(MediaFile, file_id)
if not mediaFile:
raise HTTPException(status_code=404, detail="MediaFile could not be found")
set_file(info, mediaFile)
db.add(mediaFile)
db.commit()
mediafile = db.get(MediaFile, file_id)
if not mediafile:
raise HTTPException(status_code=404, detail="MediaFile could not be updated")
response = get_file_details(mediafile)
return response
@router.post("/files", status_code=status.HTTP_201_CREATED)
def add_file(new_link: Link, db: SessionDep) -> MediaFileResponse: # type: ignore
logger.info(f"add url {new_link.url}")
try:
mediaFile: MediaFile = create_new_mediafile(new_link.url, db)
except:
raise HTTPException(status_code=409, detail="Link duplicate")
response = get_file_details(mediaFile)
return response
-16
View File
@@ -1,16 +0,0 @@
from typing import List
from fastapi import APIRouter
from src.db.session import SessionDep
from src.schema.tysc.sport import SportResponse
from src.db.models.tysc import Sport
router = APIRouter()
@router.get("/sports")
def get_all_sports(db: SessionDep) -> List[SportResponse]:
results: list[SportResponse] = []
sports = db.query(Sport).all()
for sport in sports:
results.append(SportResponse(id=sport.id, name=sport.name))
return results
+27
View File
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Card
from src.db.session import SessionDep
from src.schema.tysc.card import CardResponse, to_response
router = APIRouter()
@router.get("/cards")
def get_all_cards(db: SessionDep) -> List[CardResponse]:
results: List[CardResponse] = []
cards = db.query(Card).all()
for card in cards:
response = to_response(card)
results.append(response)
return results
@router.get("/cards/{card_id}", response_model=CardResponse)
def get_card(card_id: str, db: SessionDep) -> CardResponse:
card = db.get(Card, card_id)
if card is None:
raise HTTPException(status_code=404, detail="Card could not be found")
response = to_response(card)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import CardSet
from src.db.session import SessionDep
from src.schema.tysc.cardset import CardSetResponse, to_response
router = APIRouter()
@router.get("/cardsets")
def get_all_cardsets(db: SessionDep) -> List[CardSetResponse]:
results: List[CardSetResponse] = []
cardsets = db.query(CardSet).all()
for cardset in cardsets:
response = to_response(cardset)
results.append(response)
return results
@router.get("/cardsets/{cardset_id}", response_model=CardSetResponse)
def get_cardset(cardset_id: str, db: SessionDep) -> CardSetResponse:
cardset = db.get(CardSet, cardset_id)
if cardset is None:
raise HTTPException(status_code=404, detail="Cardset could not be found")
response = to_response(cardset)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import FieldPosition
from src.db.session import SessionDep
from src.schema.tysc.fieldposition import FieldPositionResponse, to_response
router = APIRouter()
@router.get("/positions")
def get_all_positions(db: SessionDep) -> List[FieldPositionResponse]:
results: list[FieldPositionResponse] = []
positions = db.query(FieldPosition).all()
for position in positions:
response = to_response(position)
results.append(response)
return results
@router.get("/positions/{position_id}", response_model=FieldPositionResponse)
def get_position(position_id: str, db: SessionDep) -> FieldPositionResponse:
position = db.get(FieldPosition, position_id)
if position is None:
raise HTTPException(status_code=404, detail="Fieldposition could not be found")
response = to_response(position)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Player
from src.db.session import SessionDep
from src.schema.tysc.player import PlayerResponse, to_response
router = APIRouter()
@router.get("/players")
def get_all_players(db: SessionDep) -> List[PlayerResponse]:
results: List[PlayerResponse] = []
players = db.query(Player).all()
for player in players:
response = to_response(player)
results.append(response)
return results
@router.get("/players/{player_id}", response_model=PlayerResponse)
def get_player(player_id: str, db: SessionDep) -> PlayerResponse:
player = db.get(Player, player_id)
if player is None:
raise HTTPException(status_code=404, detail="Player could not be found")
response = to_response(player)
return response
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Rooster
from src.db.session import SessionDep
from src.schema.tysc.rooster import RoosterResponse, to_response
router = APIRouter()
@router.get("/roosters")
def get_all_roosters(db: SessionDep) -> List[RoosterResponse]:
results: list[RoosterResponse] = []
roosters = db.query(Rooster).all()
for rooster in roosters:
response = to_response(rooster)
results.append(response)
return results
@router.get("/roosters/{rooster_id}", response_model=RoosterResponse)
def get_rooster(rooster_id: str, db: SessionDep) -> RoosterResponse:
rooster = db.get(Rooster, rooster_id)
if rooster is None:
raise HTTPException(status_code=404, detail="Rooster could not be found")
response = to_response(rooster)
return response
@@ -0,0 +1,28 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.core.log_conf import logger
from src.db.session import SessionDep
from src.schema.tysc.sport import SportResponse, to_response
from src.db.models.tysc import Sport
router = APIRouter()
@router.get("/sports")
def get_all_sports(db: SessionDep) -> List[SportResponse]:
results: list[SportResponse] = []
sports = db.query(Sport).all()
for sport in sports:
response = to_response(sport)
results.append(response)
return results
@router.get("/sports/{sport_id}", response_model=SportResponse)
def get_sport(sport_id: str, db: SessionDep) -> SportResponse:
sport = db.get(Sport, sport_id)
if sport is None:
raise HTTPException(status_code=404, detail="Sport could not be found")
logger.debug(f"create SportResponse for {sport}")
response: SportResponse = to_response(sport)
return response
+27
View File
@@ -0,0 +1,27 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Team
from src.db.session import SessionDep
from src.schema.tysc.team import TeamResponse, to_response
router = APIRouter()
@router.get("/teams")
def get_all_teams(db: SessionDep) -> List[TeamResponse]:
results: list[TeamResponse] = []
teams = db.query(Team).all()
for team in teams:
response = to_response(team)
results.append(response)
return results
@router.get("/teams/{team_id}", response_model=TeamResponse)
def get_team(team_id: str, db: SessionDep) -> TeamResponse:
team = db.get(Team, team_id)
if team is None:
raise HTTPException(status_code=404, detail="Team could not be found")
response = to_response(team)
return response
@@ -0,0 +1,33 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Vendor
from src.db.session import SessionDep
from src.schema.tysc.vendor import VendorResponse, to_response
router = APIRouter()
@router.get("/vendors")
def get_all_vendors(db: SessionDep) -> List[VendorResponse]:
"""
retrieve all vendors as json response.
"""
results: list[VendorResponse] = []
vendors = db.query(Vendor).all()
for vendor in vendors:
response = to_response(vendor)
results.append(response)
return results
@router.get("/vendors/{vendor_id}", response_model=VendorResponse)
def get_vendor(vendor_id: str, db: SessionDep) -> VendorResponse:
"""
retrieve vendor by id as json response.
"""
vendor = db.get(Vendor, vendor_id)
if vendor is None:
raise HTTPException(status_code=404, detail="Vendor could not be found")
response = to_response(vendor)
return response
@@ -0,0 +1,26 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.admin import Assignment
from src.db.session import SessionDep
from src.schema.user.assignment import AssignmentResponse, to_response
router = APIRouter()
@router.get("/assignments", response_model=List[AssignmentResponse])
def get_all_assignments(db: SessionDep) -> List[AssignmentResponse]:
results: List[AssignmentResponse] = []
assignments = db.query(Assignment).all()
for assignment in assignments:
response = to_response(assignment)
results.append(response)
return results
@router.get("/assignments/{assignment_id}", response_model=AssignmentResponse)
def get_assignment(assignment_id: str, db: SessionDep) -> AssignmentResponse:
assignment = db.get(Assignment, assignment_id)
if assignment is None:
raise HTTPException(status_code=404, detail="Assignment could not be found")
response = to_response(assignment)
return response
@@ -0,0 +1,26 @@
from typing import List
from fastapi import APIRouter, HTTPException
from src.db.models.admin import Permission
from src.db.session import SessionDep
from src.schema.user.permission import PermissionResponse, to_response
router = APIRouter()
@router.get("/permissions", response_model=List[PermissionResponse])
def get_all_permissions(db: SessionDep) -> List[PermissionResponse]:
results: List[PermissionResponse] = []
permissions = db.query(Permission).all()
for permission in permissions:
response = to_response(permission)
results.append(response)
return results
@router.get("/permissions/{permission_id}", response_model=PermissionResponse)
def get_permission(permission_id: str, db: SessionDep) -> PermissionResponse:
permission = db.get(Permission, permission_id)
if permission is None:
raise HTTPException(status_code=404, detail="Permission could not be found")
response = to_response(permission)
return response
@@ -3,29 +3,35 @@ from fastapi import APIRouter, HTTPException, status
from sqlalchemy import select
from src.core.log_conf import logger
from src.core.security import CurrentUser
from src.db.models.admin import Profile
from src.db.repository.user import create_new_profile, get_profile_details
from src.db.repository.user import create_new_profile
from src.db.session import SessionDep
from src.schema.user.profile import ProfileResponse, ProfileModel
from src.schema.user.profile import ProfileResponse, ProfileModel, to_response
router = APIRouter()
@router.get("/profile", response_model=ProfileModel)
async def read_profile(current_user: CurrentUser):
return current_user
@router.get("/profiles", response_model=List[ProfileResponse])
def get_all_profiles(db: SessionDep) -> List[ProfileResponse]: # type: ignore
def get_all_profiles(db: SessionDep) -> List[ProfileResponse]:
results: List[ProfileResponse] = []
profiles = db.scalars(select(Profile)).all()
for profile in profiles:
response = get_profile_details(profile)
response = to_response(profile)
results.append(response)
return results
@router.get("/profiles/{profile_id}", response_model=ProfileResponse)
def get_profile(profile_id: str, db: SessionDep) -> ProfileResponse: # type: ignore
def get_profile(profile_id: str, db: SessionDep) -> ProfileResponse:
profile = db.get(Profile, profile_id)
if not profile:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = get_profile_details(profile)
raise HTTPException(status_code=404, detail="Profile could not be found")
response = to_response(profile)
return response
@router.delete("/profiles/{profile_id}", status_code=status.HTTP_204_NO_CONTENT)
@@ -37,11 +43,11 @@ def delete_profile(profile_id: str, db: SessionDep): # type: ignore
delete_profile(profile_id=profile_id, db=db)
@router.post("/profiles", status_code=status.HTTP_201_CREATED)
def add_profile(new_profile: ProfileModel, db: SessionDep) -> ProfileResponse: # type: ignore
logger.info(f"add profile {new_profile.user_name}")
def add_profile(new_profile: ProfileModel, db: SessionDep) -> ProfileResponse:
logger.info(f"add profile {new_profile.username}")
try:
profile: Profile = create_new_profile(new_profile, db)
except:
raise HTTPException(status_code=409, detail="Profile duplicate")
response = get_profile_details(profile)
response = to_response(profile)
return response
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.admin import Token
from src.db.session import SessionDep
from src.schema.user.token import TokenResponse, to_response
router = APIRouter()
@router.get("/tokens", response_model=List[TokenResponse])
def get_all_profiles(db: SessionDep) -> List[TokenResponse]:
results: List[TokenResponse] = []
tokens = db.query(Token).all()
for token in tokens:
response = to_response(token)
results.append(response)
return results
+35
View File
@@ -0,0 +1,35 @@
import json
import time
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from src.core.log_conf import logger
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next) -> Response:
start_time = time.time()
path = request.url.path
if path != "/health":
# Log request info
request_info ={
"method": request.method,
"path": path,
"client_ip": request.client.host if request.client else "unknown"
}
logger.info("Incoming: %s", json.dumps(request_info))
# Process request
response = await call_next(request)
if path != "/health":
# Log response info
duration_ms = (time.time()- start_time)*1000
response_info = {
"status_code": response.status_code,
"duration_ms": round(duration_ms, 2)
}
logger.info("completed: %s", json.dumps(response_info))
return response
+92 -55
View File
@@ -1,60 +1,74 @@
import logging
from datetime import datetime, timedelta, timezone
from typing import Annotated, Dict, List, Optional
from typing import Annotated, List, Optional
import bcrypt
from fastapi import Depends, HTTPException, Request, Security, status
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import OAuth2, OAuth2PasswordBearer, SecurityScopes
from fastapi.security.utils import get_authorization_scheme_param
from fastapi import Depends, HTTPException, Security, status
#from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import (
#OAuth2,
OAuth2PasswordBearer,
SecurityScopes
)
#from fastapi.security.utils import get_authorization_scheme_param
from jose import JWTError, jwt
from pydantic import ValidationError
from src.core.config import settings
from src.core.log_conf import logger
from src.db.models.admin import Profile
from src.db.repository.admin import get_profile, is_database_empty
from src.db.repository.admin import (
get_profile_by_username,
get_profile_by_email,
is_database_empty,
)
from src.db.session import SessionLocal
from src.schema.admin import ProfileModel, TokenData
from src.schema.admin.token import TokenData
from src.schema.user.profile import ProfileModel, to_model
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/api/login/token",
scopes={"me": "read", "admin": "read"},
tokenUrl="/token",
scopes={
"me": "read",
"admin": "read",
"ROLE_ADMIN": "admin",
"ROLE_MEDIA": "media",
"ROLE_USER": "user",
},
)
class OAuth2PasswordBearerWithCookie(OAuth2):
def __init__(
self,
tokenUrl: str,
scheme_name: Optional[str] = None,
scopes: Optional[Dict[str, str]] = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore
super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
# class OAuth2PasswordBearerWithCookie(OAuth2):
# def __init__(
# self,
# tokenUrl: str,
# scheme_name: Optional[str] = None,
# scopes: Optional[Dict[str, str]] = None,
# auto_error: bool = True,
# ):
# if not scopes:
# scopes = {}
# flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore
# super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.cookies.get("access_token") # changed to accept access token from httpOnly Cookie
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None
return param
# async def __call__(self, request: Request) -> Optional[str]:
# authorization: str = request.cookies.get("access_token") # changed to accept access token from httpOnly Cookie
# scheme, param = get_authorization_scheme_param(authorization)
# if not authorization or scheme.lower() != "bearer":
# if self.auto_error:
# raise HTTPException(
# status_code=status.HTTP_401_UNAUTHORIZED,
# detail="Not authenticated",
# headers={"WWW-Authenticate": "Bearer"},
# )
# else:
# return None
# return param
def authenticate_user(username: str, password: str) -> Optional[Profile]:
def authenticate_user_by_email(email: str, password: str) -> Optional[Profile]:
with SessionLocal() as db:
user = get_profile(username=username, db=db)
user = get_profile_by_email(email=email, db=db)
logger.debug(user)
if not user:
if is_database_empty(db):
@@ -65,7 +79,26 @@ def authenticate_user(username: str, password: str) -> Optional[Profile]:
return None
else:
if bcrypt.checkpw(password.encode(), user.password.encode()):
print("User successful authenticated")
logger.info("User successful authenticated")
else:
logger.info("Authentication failed!")
return user
def authenticate_user_by_username(username: str, password: str) -> Optional[Profile]:
with SessionLocal() as db:
user = get_profile_by_username(username=username, db=db)
logger.debug(user)
if not user:
if is_database_empty(db):
logger.info("database is empty, use temporary access")
user = Profile()
user.email = "init_user@thpeetz.de"
return user
return None
else:
if bcrypt.checkpw(password.encode(), user.password.encode()):
logger.info("User successful authenticated")
else:
logger.info("Authentication failed!")
return user
@@ -103,17 +136,19 @@ async def get_current_user(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
username: str = payload.get("sub") # type: ignore
logger.info("username/email extracted is ", username)
logger.info("username/email extracted is %s", username)
if username is None:
raise credentials_exception
scope: str = payload.get("scope", "")
token_scopes: List[str] = scope.split(" ")
token_data = TokenData(scopes=token_scopes, username=username)
except (JWTError, ValidationError):
logger.info("Exception raised", exc_info=True)
raise credentials_exception
with SessionLocal() as db:
user = get_profile(username=token_data.username, db=db) # type: ignore
user = get_profile_by_username(username=str(token_data.username), db=db)
if user is None:
logger.info("user not found")
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_scopes:
@@ -128,19 +163,14 @@ async def get_current_user(
async def get_current_active_user(
current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])],
) -> ProfileModel:
if not current_user.enabled: # type: ignore
if not current_user.enabled:
raise HTTPException(status_code=400, detail="Inactive user")
user_model = ProfileModel(
username=current_user.user_name,
email=current_user.email, # type: ignore
first_name=current_user.first_name,
last_name=current_user.last_name, # type: ignore
active=current_user.enabled,
) # type: ignore
user_model = to_model(current_user)
return user_model
def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
""" """
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
@@ -149,14 +179,21 @@ def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
username: str = payload.get("sub") # type: ignore
logger.info("username/email extracted is ", username)
username: Optional[str] = payload.get("sub")
logger.info("username/email extracted is %s", username)
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
except JWTError as exception:
raise credentials_exception from exception
with SessionLocal() as db:
user = get_profile(username=username, db=db)
user = get_profile_by_email(email=username, db=db)
if user is None:
raise credentials_exception
user = get_profile_by_username(username=username, db=db)
if user is None:
raise credentials_exception
return user
UserDep = Annotated[Profile, Depends(get_current_user_from_token)]
CurrentUser = Annotated[Profile, Depends(get_current_active_user)]
+24 -22
View File
@@ -1,50 +1,52 @@
from typing import List
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.db.models.base import Base, BaseMixin
class Article(Base, BaseMixin):
__tablename__ = 'article'
title = Column(String, unique=True)
title: Mapped[str] = mapped_column(unique=True)
article_authors = relationship("ArticleAuthor")
class Author(Base, BaseMixin):
__tablename__ = 'author'
first_name = Column(String)
last_name = Column(String)
article_authors = relationship("ArticleAuthor")
book_authors = relationship("BookAuthor")
first_name: Mapped[str]
last_name: Mapped[str]
article_authors: Mapped[List["ArticleAuthor"]] = relationship(back_populates="author")
book_authors: Mapped[List["BookAuthor"]] = relationship(back_populates="author")
class BookshelfPublisher(Base, BaseMixin):
__tablename__ = 'bookshelf_publisher'
name = Column(String, unique=True)
books = relationship("Book")
name: Mapped[str] = mapped_column(unique=True)
books: Mapped[List["Book"]] = relationship(back_populates="publisher")
class Book(Base, BaseMixin):
__tablename__ = 'book'
isbn = Column(String, unique=True)
title = Column(String)
year = Column(Integer, nullable=False)
publisher_id = Column(String, ForeignKey('bookshelf_publisher.id'), nullable=False)
publisher = relationship('BookshelfPublisher', back_populates="books")
book_authors = relationship("BookAuthor")
isbn: Mapped[str] = mapped_column(unique=True)
title: Mapped[str]
year: Mapped[int] = mapped_column(nullable=False)
publisher_id: Mapped[str] = mapped_column(ForeignKey("bookshelf_publisher.id"), nullable=False)
publisher: Mapped[BookshelfPublisher] = relationship(back_populates="books")
book_authors: Mapped[List["BookAuthor"]] = relationship(back_populates="book")
class ArticleAuthor(Base, BaseMixin):
__tablename__ = 'article_author'
article_id = Column(String, ForeignKey('article.id'), nullable=False)
article = relationship('Article', back_populates="article_authors")
author_id = Column(String, ForeignKey('author.id'), nullable=False)
author = relationship('Author', back_populates="article_authors")
article_id: Mapped[str] = mapped_column(ForeignKey("article.id"), nullable=False)
article: Mapped[Article] = relationship(back_populates="article_authors")
author_id: Mapped[str] = mapped_column(ForeignKey("author.id"), nullable=False)
author: Mapped[Author] = relationship(back_populates="article_authors")
class BookAuthor(Base, BaseMixin):
__tablename__ = 'book_author'
author_id = Column(String, ForeignKey('author.id'), nullable=False)
author = relationship('Author', back_populates="book_authors")
book_id = Column(String, ForeignKey('book.id'), nullable=False)
book = relationship('Book', back_populates="book_authors")
author_id: Mapped[str] = mapped_column(ForeignKey("author.id"), nullable=False)
author: Mapped[Author] = relationship(back_populates="book_authors")
book_id: Mapped[str] = mapped_column(ForeignKey("book.id"), nullable=False)
book: Mapped[Book] = relationship(back_populates="book_authors")
+39 -19
View File
@@ -14,51 +14,71 @@ from src.db.models.base import Base, BaseMixin, BaseVideoMixin
class MediaFile(Base, BaseMixin, BaseVideoMixin):
__tablename__ = 'media_file'
media_actor_files: Mapped[List["MediaActorFile"]] = relationship(back_populates="media_file")
"""
MediaFile represents video link.
"""
__tablename__ = "media_file"
media_actor_files: Mapped[List["MediaActorFile"]] = relationship(
back_populates="media_file"
)
def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.title})'
return f"MediaFile({self.id} {self.title} {self.title})"
def __str__(self):
return f'{self.title}({self.id})'
return f"{self.title}({self.id})"
def update_title(self):
"""
Update title from url.
"""
class MediaActor(Base, BaseMixin):
__tablename__ = 'media_actor'
__tablename__ = "media_actor"
name: Mapped[str]
url: Mapped[Optional[str]] = mapped_column(unique=True)
media_actor_files = relationship("MediaActorFile")
def __repr__(self) -> str:
return f'MediaActor({self.id} {self.name} {self.url})'
return f"MediaActor({self.id} {self.name} {self.url})"
def __str__(self) -> str:
return f'{self.url}({self.id})'
return f"{self.url}({self.id})"
class MediaActorFile(Base, BaseMixin):
__tablename__ = 'media_actor_file'
media_actor_id: Mapped[str] = mapped_column(ForeignKey("media_actor.id"), nullable=False)
__tablename__ = "media_actor_file"
media_actor_id: Mapped[str] = mapped_column(
ForeignKey("media_actor.id"), nullable=False
)
media_actor: Mapped[MediaActor] = relationship(back_populates="media_actor_files")
media_file_id: Mapped[str] = mapped_column(ForeignKey("media_file.id"), nullable=True)
media_file_id: Mapped[str] = mapped_column(
ForeignKey("media_file.id"), nullable=True
)
media_file: Mapped[MediaFile] = relationship(back_populates="media_actor_files")
def __repr__(self):
return f'MediaActorFile({self.id} {self.media_actor_id} {self.media_file_id})'
return f"MediaActorFile({self.id} {self.media_actor_id} {self.media_file_id})"
def __str__(self) -> str:
return f'{self.id} {self.media_actor_id} {self.media_file_id}'
return f"{self.id} {self.media_actor_id} {self.media_file_id}"
class MediaArticle(Base, BaseMixin):
__tablename__ = 'media_article'
__tablename__ = "media_article"
review: Mapped[bool]
title: Mapped[str]
url: Mapped[str] = mapped_column(unique=True)
class MediaVideo(Base, BaseMixin):
__tablename__ = 'media_video'
"""
MediaFile represents video link.
"""
__tablename__ = "media_video"
cloud_link: Mapped[str]
file_name: Mapped[str]
path: Mapped[str]
@@ -68,10 +88,10 @@ class MediaVideo(Base, BaseMixin):
should_download: Mapped[bool]
def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.url})'
return f"MediaFile({self.id} {self.title} {self.url})"
def __str__(self):
if self.title is None:
return f'{self.url}({self.id})'
return f"{self.url}({self.id})"
else:
return f'{self.title}({self.id})'
return f"{self.title}({self.id})"
+7 -3
View File
@@ -1,12 +1,16 @@
from typing import AnyStr, Optional
from typing import Optional
from sqlalchemy.orm import Session
from src.db.models.admin import Profile
def get_profile(username: AnyStr, db: Session) -> Optional[Profile]:
profile = db.query(Profile).filter(Profile.email == username).first()
def get_profile_by_username(username: str, db: Session) -> Optional[Profile]:
profile = db.query(Profile).filter(Profile.user_name == username).first()
return profile
def get_profile_by_email(email: str, db: Session) -> Optional[Profile]:
profile = db.query(Profile).filter(Profile.email == email).first()
return profile
def is_database_empty(db: Session) -> bool:
+17 -32
View File
@@ -4,29 +4,27 @@ from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.comic import Comic, Issue
from src.schema.comics.artist import ArtistResponse
from src.schema.comics.comic import ComicResponse, ComicSchema
from src.schema.comics.artist import artist_to_response
from src.schema.comics.comic import ComicSchema, comic_to_response
from src.schema.comics.comic_details import ComicDetailsResponse, ComicWorktypeArtistResponse
from src.schema.comics.issue import IssueResponse
from src.schema.comics.issue import IssueResponse, issue_to_response
from src.schema.comics.issue_details import IssueDetailsResponse
from src.schema.comics.publisher import PublisherResponse
from src.schema.comics.volume import VolumeResponse
from src.schema.comics.worktype import WorktypeResponse
def list_comics(db: Session) -> List[Comic]:
comics = db.query(Comic).all()
return comics
from src.schema.comics.publisher import publisher_to_response
from src.schema.comics.volume import VolumeResponse, volume_to_response
from src.schema.comics.worktype import worktype_to_response
def get_issue_details(issue: Issue) -> IssueDetailsResponse:
volume = None
if issue.volume:
volume = volume_to_response(issue.volume)
response = IssueDetailsResponse(
id=issue.id,
issue_number=str(issue.issue_number),
in_stock=bool(issue.in_stock),
is_read=bool(issue.is_read),
comic=ComicResponse(id=issue.comic.id, title=issue.comic.title, completed=issue.comic.completed),
volume=VolumeResponse(id=issue.volume.id, name=issue.volume.name)
comic=comic_to_response(issue.comic),
volume=volume
)
return response
@@ -36,39 +34,26 @@ def update_comic(new_comic: ComicSchema, comic_id: str, db: Session) -> Optional
comic: Optional[Comic] = db.get(Comic, comic_id)
return comic
def get_short_info(comic: Comic) -> ComicResponse:
response = ComicResponse(
id=comic.id,
title=str(comic.title),
completed=bool(comic.completed == 1)
)
return response
def get_comic_details(comic: Comic) -> ComicDetailsResponse:
volumes: List[VolumeResponse] = []
for volume in comic.volumes:
volumes.append(VolumeResponse(id=volume.id, name=volume.name))
volumes.append(volume_to_response(volume))
issues: List[IssueResponse] = []
for issue in comic.issues:
issues.append(IssueResponse(
id=issue.id,
issue_number=issue.issue_number,
in_stock=issue.in_stock,
is_read=issue.is_read
))
issues.append(issue_to_response(issue))
works: List[ComicWorktypeArtistResponse] = []
works_map: Dict[str, ComicWorktypeArtistResponse] = {}
for work in comic.comic_works:
worktype_id = work.work_type.id
if worktype_id in works_map:
artist = ArtistResponse(id=work.artist.id, name=work.artist.name)
artist = artist_to_response(work.artist)
works_map[worktype_id].artists.append(artist)
logger.info(f"add artist to response map: {artist} -> {works_map}")
print(f"add artist to response map: {artist} -> {works_map}")
else:
works_map[worktype_id] = ComicWorktypeArtistResponse(
worktype=WorktypeResponse(id=worktype_id, name=work.work_type.name),
artists=[ArtistResponse(id=work.artist.id, name=work.artist.name)]
worktype=worktype_to_response(work.work_type),
artists=[artist_to_response(work.artist)]
)
for value in works_map.values():
works.append(value)
@@ -79,7 +64,7 @@ def get_comic_details(comic: Comic) -> ComicDetailsResponse:
completed=bool(comic.completed),
current_order=bool(comic.current_order),
weblink=str(comic.weblink),
publisher=PublisherResponse(id=comic.publisher.id, name=comic.publisher.name),
publisher=publisher_to_response(comic.publisher),
issues=issues,
volumes=volumes,
works=works
-105
View File
@@ -1,105 +0,0 @@
from sqlalchemy.orm import Session
import uuid
from datetime import datetime
from src.core.log_conf import logger
from src.db.models.media import MediaActor, MediaActorFile, MediaFile, MediaVideo
from src.schema.media.actor import MediaActorModel, MediaActorResponse
from src.schema.media.actorfile import MediaActorFileResponse
from src.webapps.media.forms import AddLinkForm
def create_new_video(video: AddLinkForm, db: Session) -> MediaVideo:
print(video.url)
media_video = MediaVideo()
media_video.id = str(uuid.uuid4())
media_video.url = video.url
media_video.created_date = datetime.now()
media_video.last_modified_date = datetime.now()
media_video.review = True
media_video.should_download = True
db.add(media_video)
db.commit()
db.refresh(media_video)
print(media_video)
return media_video
def create_new_mediafile(link: str, db: Session) -> MediaFile:
logger.info("create MediaFile with url {link}")
media_file: MediaFile = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.url = link
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.review = True
media_file.should_download = True
db.add(media_file)
db.commit()
db.refresh(media_file)
logger.info(f"created {media_file}")
return media_file
def delete_mediafile(db: Session, media_file_id: str):
logger.info(f"delete MediaFile with id {media_file_id}")
media_file = db.get(MediaFile, media_file_id)
db.delete(media_file)
db.commit()
def create_new_mediaactor(new_actor: MediaActorModel, db: Session) -> MediaActor:
logger.info(f"create MediaActor with url {new_actor.url}")
media_actor: MediaActor = MediaActor()
media_actor.id = str(uuid.uuid4())
if new_actor.name is not None:
media_actor.name = new_actor.name
media_actor.url = new_actor.url
media_actor.created_date = datetime.now()
media_actor.last_modified_date = datetime.now()
media_actor.version = 0
db.add(media_actor)
db.commit()
db.refresh(media_actor)
logger.info(f"created {media_actor}")
return media_actor
def delete_mediaactor(db: Session, actor_id: str):
logger.info(f"delete MediaActor with id {actor_id}")
media_actor = db.get(MediaActor, actor_id)
if media_actor is not None:
actor_files = media_actor.media_actor_files
for actor_file in actor_files:
delete_mediaactorfile(db, actorfile_id=actor_file.id)
db.refresh(media_actor)
db.delete(media_actor)
db.commit()
def get_actor_details(media_actor: MediaActor) -> MediaActorResponse:
reponse: MediaActorResponse = MediaActorResponse(id=media_actor.id, name=str(media_actor.name), url=str(media_actor.url))
return reponse
def create_new_mediaactorfile(db: Session, actor_id: str, file_id: str) -> MediaActorFile:
logger.info(f"create MediaActorFile with actor {actor_id} and file {file_id}")
media_actor_file: MediaActorFile = MediaActorFile()
media_actor_file.id = str(uuid.uuid4())
media_actor_file.created_date = datetime.now()
media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = 0
media_actor_file.media_actor_id = actor_id
media_actor_file.media_file_id = file_id
db.add(media_actor_file)
db.commit()
db.refresh(media_actor_file)
return media_actor_file
def delete_mediaactorfile(db: Session, actorfile_id: str):
logger.info(f"delete MediaActorFile with id {actorfile_id}")
media_actorfile = db.get(MediaActorFile, actorfile_id)
db.delete(media_actorfile)
db.commit()
def get_actorfile_details(media_actorfile: MediaActorFile) -> MediaActorFileResponse:
response: MediaActorFileResponse = MediaActorFileResponse(
id=media_actorfile.id,
file_id=str(media_actorfile.media_file_id),
actor_id=str(media_actorfile.media_actor_id)
)
return response
@@ -0,0 +1,66 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.media import MediaActor
from src.db.repository.media.actorfile import delete_mediaactorfile
from src.schema.media.actor import MediaActorModel
def create_new_mediaactor(new_actor: MediaActorModel, db: Session) -> MediaActor:
logger.info(f"create MediaActor with url {new_actor.url}")
media_actor: MediaActor = MediaActor()
media_actor.id = str(uuid.uuid4())
if new_actor.name is not None:
media_actor.name = new_actor.name
media_actor.url = new_actor.url
media_actor.created_date = datetime.now()
media_actor.last_modified_date = datetime.now()
media_actor.version = 0
db.add(media_actor)
db.commit()
db.refresh(media_actor)
logger.info(f"created {media_actor}")
return media_actor
def delete_mediaactor(db: Session, actor_id: str):
logger.info(f"delete MediaActor with id {actor_id}")
media_actor = db.get(MediaActor, actor_id)
if media_actor is not None:
actor_files = media_actor.media_actor_files
for actor_file in actor_files:
delete_mediaactorfile(db, actorfile_id=actor_file.id)
db.refresh(media_actor)
db.delete(media_actor)
db.commit()
def import_mediaactor(db: Session, new_actor: MediaActorModel) -> MediaActor:
"""
import MediaFile and set missing values with default ones.
"""
logger.info("import MediaActor with %s", new_actor)
media_actor: MediaActor = MediaActor()
media_actor.id = new_actor.id
if new_actor.created_date:
media_actor.created_date = new_actor.created_date
else:
media_actor.created_date = datetime.now()
if new_actor.last_modified_date:
media_actor.last_modified_date = new_actor.last_modified_date
else:
media_actor.last_modified_date = datetime.now()
media_actor.version = new_actor.version
if new_actor.name:
media_actor.name = new_actor.name
else:
media_actor.name = ""
if new_actor.url:
media_actor.url = new_actor.url
else:
media_actor.url = ""
db.add(media_actor)
db.commit()
db.refresh(media_actor)
return media_actor
@@ -0,0 +1,64 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.media import MediaActorFile
from src.schema.media.actorfile import MediaActorFileModel
def create_new_mediaactorfile(
db: Session, actor_id: str, file_id: str
) -> MediaActorFile:
"""
Create relation for MediaFile and MediaActor
"""
logger.info("create MediaActorFile with actor %s and file %s", actor_id, file_id)
media_actor_file: MediaActorFile = MediaActorFile()
media_actor_file.id = str(uuid.uuid4())
media_actor_file.created_date = datetime.now()
media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = 0
media_actor_file.media_actor_id = actor_id
media_actor_file.media_file_id = file_id
db.add(media_actor_file)
db.commit()
db.refresh(media_actor_file)
return media_actor_file
def delete_mediaactorfile(db: Session, actorfile_id: str):
"""
Delete relation between MediaFile and MediaActor.
"""
logger.info("delete MediaActorFile with id %s", actorfile_id)
media_actorfile = db.get(MediaActorFile, actorfile_id)
db.delete(media_actorfile)
db.commit()
def import_mediaactorfile(
db: Session, new_actorfile: MediaActorFileModel
) -> MediaActorFile:
"""
Import MediaFile and set missing values with default ones.
"""
logger.info("import MediaActorFile with %s", new_actorfile)
media_actor_file: MediaActorFile = MediaActorFile()
media_actor_file.id = new_actorfile.id
if new_actorfile.created_date:
media_actor_file.created_date = new_actorfile.created_date
else:
media_actor_file.created_date = datetime.now()
if new_actorfile.last_modified_date:
media_actor_file.last_modified_date = new_actorfile.last_modified_date
else:
media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = new_actorfile.version
media_actor_file.media_actor_id = new_actorfile.media_actor_id
media_actor_file.media_file_id = new_actorfile.media_file_id
db.add(media_actor_file)
db.commit()
db.refresh(media_actor_file)
return media_actor_file
@@ -0,0 +1,78 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.media import MediaFile
from src.schema.media.file import MediaFileModel
def create_new_mediafile(link: str, db: Session) -> MediaFile:
"""
Create MediaFile with gievne URL.
"""
logger.info("create MediaFile with url {link}")
media_file: MediaFile = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.url = link
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.review = True
media_file.should_download = True
db.add(media_file)
db.commit()
db.refresh(media_file)
logger.info("created %s", media_file)
return media_file
def delete_mediafile(db: Session, media_file_id: str):
"""
Delete MediaFile with given ID from db.
"""
logger.info("delete MediaFile with id %s", media_file_id)
media_file = db.get(MediaFile, media_file_id)
db.delete(media_file)
db.commit()
def import_mediafile(db: Session, new_file: MediaFileModel) -> MediaFile:
"""
import MediaActor and set missing values with defautl ones.
"""
logger.info("import MediaFile with %s", new_file)
media_file: MediaFile = MediaFile()
media_file.id = new_file.id
if new_file.created_date:
media_file.created_date = new_file.created_date
else:
media_file.created_date = datetime.now()
if new_file.last_modified_date:
media_file.last_modified_date = new_file.last_modified_date
else:
media_file.last_modified_date = datetime.now()
media_file.version = new_file.version
if new_file.title:
media_file.title = new_file.title
else:
media_file.title = ""
if new_file.file_name:
media_file.file_name = new_file.file_name
else:
media_file.file_name = ""
if new_file.cloud_link:
media_file.cloud_link = new_file.cloud_link
else:
media_file.cloud_link = ""
if new_file.url:
media_file.url = new_file.url
else:
media_file.url = ""
media_file.review = new_file.review
media_file.should_download = new_file.should_download
db.add(media_file)
db.commit()
db.refresh(media_file)
return media_file
@@ -0,0 +1,23 @@
from datetime import datetime
import uuid
from sqlalchemy.orm import Session
from src.db.models.media import MediaVideo
from src.webapps.media.forms import AddLinkForm
def create_new_video(video: AddLinkForm, db: Session) -> MediaVideo:
print(video.url)
media_video = MediaVideo()
media_video.id = str(uuid.uuid4())
media_video.url = str(video.url)
media_video.created_date = datetime.now()
media_video.last_modified_date = datetime.now()
media_video.review = True
media_video.should_download = True
db.add(media_video)
db.commit()
db.refresh(media_video)
print(media_video)
return media_video
+3 -11
View File
@@ -5,15 +5,14 @@ from src.core.log_conf import logger
from src.db.models.admin import Assignment, Profile
from sqlalchemy.orm import Session
from src.schema.user.profile import ProfileModel, ProfileResponse
from src.schema.user.profile import ProfileModel
def create_new_profile(new_profile: ProfileModel, db: Session) -> Profile:
logger.info(f"create MediaActor with url {new_profile.user_name}")
logger.info(f"create MediaActor with url {new_profile.username}")
profile: Profile = Profile()
profile.id = str(uuid.uuid4())
profile.user_name = new_profile.user_name
profile.user_name = new_profile.username
profile.first_name = new_profile.first_name
profile.last_name = new_profile.last_name
profile.created_date = datetime.now()
@@ -35,13 +34,6 @@ def delete_profile(db: Session, profile_id: str):
db.delete(profile)
db.commit()
def get_profile_details(profile: Profile) -> ProfileResponse:
reponse: ProfileResponse = ProfileResponse(
id=profile.id,
user_name=str(profile.user_name)
)
return reponse
def delete_assignment(db: Session, assignment_id: str) -> None:
logger.info(f"delete Assignment with id {assignment_id}")
assignment: Optional[Assignment] = db.get(Assignment, assignment_id)
+4 -2
View File
@@ -12,8 +12,10 @@ engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
def get_db() -> Generator:
def get_db() -> Generator[Session, None, None]:
""" """
with SessionLocal() as db:
yield db
SessionDep: type[Session] = Annotated[Session, Depends(get_db)]
SessionDep = Annotated[Session, Depends(get_db)]
+7 -5
View File
@@ -6,9 +6,10 @@ from fastapi.staticfiles import StaticFiles
from src.apis.base import api_router
from src.apis.version1.healthcheck import health_router
from src.apis.version1.login import login_router
from src.apis.version1.admin.login import login_router
from src.core.config import settings
from src.core.log_conf import logger
from src.core.middleware import RequestLoggingMiddleware
from src.db.models.base import Base
from src.db.session import engine
from src.db.utils import check_db_connected, check_db_disconnected
@@ -23,10 +24,10 @@ async def lifespan(app: FastAPI):
def include_router(app: FastAPI):
app.include_router(api_router)
app.include_router(web_app_router)
app.include_router(health_router)
app.include_router(login_router)
app.include_router(api_router, tags=["api"])
app.include_router(web_app_router, tags=["webapp"])
app.include_router(health_router, tags=["admin"])
app.include_router(login_router, tags=["webapp"])
def configure_static(app: FastAPI):
@@ -41,6 +42,7 @@ def add_middle_ware(app: FastAPI):
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(RequestLoggingMiddleware)
def create_tables():
-27
View File
@@ -1,27 +0,0 @@
from typing import Optional, List
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
scopes: List[str] = []
class ProfileModel(BaseModel):
username: str
email: str
first_name: str
last_name: str
active: bool
class HealthCheck(BaseModel):
"""
Health check model
"""
status: str = "ok"
@@ -0,0 +1,9 @@
from pydantic import BaseModel
class HealthCheck(BaseModel):
"""
Health check model
"""
status: str = "ok"
+8
View File
@@ -0,0 +1,8 @@
from typing import Optional
from pydantic import BaseModel
class LoginRequest(BaseModel):
email: Optional[str] = None
password: Optional[str] = None
@@ -0,0 +1,34 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import MailAccount
class MailAccountResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
host: str
port: int
protocol: str
user_name: str
password: str
start_tls: bool
def to_response(account: MailAccount) -> MailAccountResponse:
response: MailAccountResponse = MailAccountResponse(
id=account.id,
created_date=account.created_date,
last_modified_date=account.last_modified_date,
version=account.version,
host=account.host,
port=account.port,
protocol=account.protocol,
user_name=account.user_name,
password=account.password,
start_tls=account.start_tls,
)
return response
+13
View File
@@ -0,0 +1,13 @@
from typing import List, Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
scopes: List[str] = []
@@ -0,0 +1,23 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import Article
class ArticleResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
title: str
def to_response(article: Article) -> ArticleResponse:
response: ArticleResponse = ArticleResponse(
id=article.id,
created_date=article.created_date,
last_modified_date=article.last_modified_date,
version=article.version,
title=article.title
)
return response
@@ -0,0 +1,25 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import ArticleAuthor
class ArticleAuthorResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
article_id: str
author_id: str
def to_response(articleauthor: ArticleAuthor) -> ArticleAuthorResponse:
response: ArticleAuthorResponse = ArticleAuthorResponse(
id=articleauthor.id,
created_date=articleauthor.created_date,
last_modified_date=articleauthor.last_modified_date,
version=articleauthor.version,
article_id=articleauthor.article_id,
author_id=articleauthor.author_id
)
return response
+25
View File
@@ -0,0 +1,25 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import Author
class AuthorResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
first_name: str
last_name: str
def to_response(author: Author) -> AuthorResponse:
response: AuthorResponse = AuthorResponse(
id=author.id,
created_date=author.created_date,
last_modified_date=author.last_modified_date,
version=author.version,
first_name=author.first_name,
last_name=author.last_name
)
return response
+29
View File
@@ -0,0 +1,29 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import Book
class BookResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
isbn: str
title: str
year: int
publisher_id: str
def to_response(book: Book) -> BookResponse:
response: BookResponse = BookResponse(
id=book.id,
created_date=book.created_date,
last_modified_date=book.last_modified_date,
version=book.version,
isbn=book.isbn,
title=book.title,
year=book.year,
publisher_id=book.publisher_id
)
return response
@@ -0,0 +1,25 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import BookAuthor
class BookAuthorResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
author_id: str
book_id: str
def to_response(bookauthor: BookAuthor) -> BookAuthorResponse:
response: BookAuthorResponse = BookAuthorResponse(
id=bookauthor.id,
created_date=bookauthor.created_date,
last_modified_date=bookauthor.last_modified_date,
version=bookauthor.version,
author_id=bookauthor.author_id,
book_id=bookauthor.book_id
)
return response
@@ -0,0 +1,23 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.bookshelf import BookshelfPublisher
class PublisherResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def to_response(publisher: BookshelfPublisher) -> PublisherResponse:
response: PublisherResponse = PublisherResponse(
id=publisher.id,
created_date=publisher.created_date,
last_modified_date=publisher.last_modified_date,
version=publisher.version,
name=publisher.name
)
return response
+21
View File
@@ -1,5 +1,10 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.comic import Artist
class ArtistCreation(BaseModel):
id: str
@@ -7,7 +12,23 @@ class ArtistCreation(BaseModel):
class ArtistResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
weblink: Optional[str]
def artist_to_response(artist: Artist) -> ArtistResponse:
response: ArtistResponse = ArtistResponse(
id=artist.id,
created_date=artist.created_date,
last_modified_date=artist.last_modified_date,
version=artist.version,
name=artist.name,
weblink=artist.weblink
)
return response
class AddArtist(BaseModel):
id: str
+32 -1
View File
@@ -1,15 +1,46 @@
"""
Schema definitions for Comics.
"""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, AnyUrl
from src.core.log_conf import logger
from src.db.models.comic import Comic
class ComicResponse(BaseModel):
"""
Pydantic model for returning Comic objects.
"""
id: str
created_date: datetime
last_modified_date: datetime
version: int
title: str
publisher_id: str
current_order: bool
completed: bool
weblink: Optional[str]
def comic_to_response(comic: Comic) -> ComicResponse:
response: ComicResponse = ComicResponse(
id=comic.id,
created_date=comic.created_date,
last_modified_date=comic.last_modified_date,
version=comic.version,
title=comic.title,
publisher_id=comic.publisher_id,
current_order=comic.current_order,
completed=comic.completed,
weblink=comic.weblink
)
return response
class ComicSchema(BaseModel):
"""
Pydantic model for uploading Comic object.
"""
id: str
title: str
weblink: Optional[AnyUrl]
+32
View File
@@ -0,0 +1,32 @@
"""
Model definitions for ComicWork.
"""
from datetime import datetime
from pydantic import BaseModel
from src.db.models.comic import ComicWork
class ComicWorkResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
comic_id: str
artist_id: str
work_type_id: str
def comicwork_to_response(comicwork: ComicWork) -> ComicWorkResponse:
response: ComicWorkResponse = ComicWorkResponse(
id=comicwork.id,
created_date=comicwork.created_date,
last_modified_date=comicwork.last_modified_date,
version=comicwork.version,
comic_id=comicwork.comic_id,
artist_id=comicwork.artist_id,
work_type_id=comicwork.work_type_id,
)
return response
+30
View File
@@ -1,8 +1,38 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.comic import Issue
class IssueResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
issue_number: str
title: Optional[str]
published_on: Optional[datetime]
in_stock: bool
is_read: bool
comic_id: str
volume_id: Optional[str]
story_arc_id: Optional[str]
def issue_to_response(issue: Issue) -> IssueResponse:
response: IssueResponse = IssueResponse(
id=issue.id,
created_date=issue.created_date,
last_modified_date=issue.last_modified_date,
version=issue.version,
issue_number=issue.issue_number,
title=issue.title,
published_on=issue.published_on,
in_stock=issue.in_stock,
is_read=issue.is_read,
comic_id=issue.comic_id,
volume_id=issue.volume_id,
story_arc_id=issue.story_arc_id
)
return response
@@ -11,3 +11,4 @@ class IssueDetailsResponse(BaseModel):
is_read: bool
comic: ComicResponse
volume: VolumeResponse | None
+32
View File
@@ -0,0 +1,32 @@
"""
Model definitions for IssueWork.
"""
from datetime import datetime
from pydantic import BaseModel
from src.db.models.comic import IssueWork
class IssueWorkResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
issue_id: str
artist_id: str
work_type_id: str
def issuework_to_response(issuework: IssueWork) -> IssueWorkResponse:
response: IssueWorkResponse = IssueWorkResponse(
id=issuework.id,
created_date=issuework.created_date,
last_modified_date=issuework.last_modified_date,
version=issuework.version,
issue_id=issuework.issue_id,
artist_id=issuework.artist_id,
work_type_id=issuework.work_type_id,
)
return response
+24
View File
@@ -1,6 +1,30 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.comic import Publisher
class PublisherResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
weblink: Optional[str]
parent_publisher_id: Optional[str]
def publisher_to_response(publisher: Publisher) -> PublisherResponse:
response: PublisherResponse = PublisherResponse(
id=publisher.id,
created_date=publisher.created_date,
last_modified_date=publisher.last_modified_date,
version=publisher.version,
name=publisher.name,
weblink=publisher.weblink,
parent_publisher_id=publisher.parent_publisher_id
)
return response
+30
View File
@@ -0,0 +1,30 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.comic import StoryArc
class StoryArcResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
comic_id: str
volume_id: Optional[str]
class AddLink(BaseModel):
url: str
def storyarc_to_response(storyarc: StoryArc) -> StoryArcResponse:
response: StoryArcResponse = StoryArcResponse(
id=storyarc.id,
created_date=storyarc.created_date,
last_modified_date=storyarc.last_modified_date,
version=storyarc.version,
name=storyarc.name,
comic_id=storyarc.comic_id,
volume_id=storyarc.volume_id
)
return response
+19
View File
@@ -1,6 +1,25 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.comic import Volume
class VolumeResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
comic_id: str
def volume_to_response(volume: Volume) -> VolumeResponse:
response: VolumeResponse = VolumeResponse(
id=volume.id,
created_date=volume.created_date,
last_modified_date=volume.last_modified_date,
version=volume.version,
name=volume.name,
comic_id=volume.comic_id
)
return response
+17
View File
@@ -1,9 +1,26 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.comic import WorkType
class AddWorkType(BaseModel):
worktype: str
class WorktypeResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def worktype_to_response(worktype: WorkType) -> WorktypeResponse:
response: WorktypeResponse = WorktypeResponse(
id=worktype.id,
created_date=worktype.created_date,
last_modified_date=worktype.last_modified_date,
version=worktype.version,
name=worktype.name
)
return response
+24 -2
View File
@@ -1,12 +1,34 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.media import MediaActor
class MediaActorResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: Optional[str]
url: str
url: Optional[str]
def actor_to_response(actor: MediaActor) -> MediaActorResponse:
response: MediaActorResponse = MediaActorResponse(
id=actor.id,
created_date=actor.created_date,
last_modified_date=actor.last_modified_date,
version=actor.version,
name=actor.name,
url=actor.url
)
return response
class MediaActorModel(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: Optional[str]
url: str
url: Optional[str]
+27 -2
View File
@@ -1,4 +1,5 @@
from datetime import datetime
from typing import Optional
from src.db.models.media import MediaActorFile
from pydantic import BaseModel
@@ -6,5 +7,29 @@ from pydantic import BaseModel
class MediaActorFileResponse(BaseModel):
id: str
file_id: str
actor_id: str
created_date: datetime
last_modified_date: datetime
version: int
media_actor_id: str
media_file_id: Optional[str]
def actorfile_to_response(actorfile: MediaActorFile) -> MediaActorFileResponse:
response: MediaActorFileResponse = MediaActorFileResponse(
id=actorfile.id,
created_date=actorfile.created_date,
last_modified_date=actorfile.last_modified_date,
version=actorfile.version,
media_actor_id=actorfile.media_actor_id,
media_file_id=actorfile.media_file_id
)
return response
class MediaActorFileModel(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
media_actor_id: str
media_file_id: Optional[str]
+30
View File
@@ -0,0 +1,30 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.media import MediaArticle
class MediaArticleResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
review: bool = False
title: Optional[str] = None
url: Optional[str] = None
class AddLink(BaseModel):
url: str
def to_response(video: MediaArticle) -> MediaArticleResponse:
response: MediaArticleResponse = MediaArticleResponse(
id=video.id,
created_date=video.created_date,
last_modified_date=video.last_modified_date,
version=video.version,
review=video.review,
title=video.title,
url=video.url,
)
return response
+58 -17
View File
@@ -1,4 +1,5 @@
from datetime import datetime
from typing import Optional
from src.db.models.media import MediaFile
from pydantic import BaseModel
@@ -6,35 +7,75 @@ from pydantic import BaseModel
class MediaFileResponse(BaseModel):
id: str
title: str | None = None
file_name: str | None = None
cloud_link: str | None = None
url: str | None = None
created_date: datetime
last_modified_date: datetime
version: int
title: Optional[str]
file_name: Optional[str]
cloud_link: Optional[str]
url: Optional[str]
review: bool = False
should_download: bool = False
class Link(BaseModel):
url: str
def get_file_details(mediafile: MediaFile) -> MediaFileResponse:
response = MediaFileResponse(id=mediafile.id,
title=mediafile.title,
file_name=mediafile.file_name,
cloud_link=mediafile.cloud_link,
url=str(mediafile.url),
review=mediafile.review,
should_download=mediafile.should_download)
#print(f"id: {mediafile.id}: review: {response.review} <- {mediafile.review}")
#print(f"id: {mediafile.id}: download: {response.should_download} <- {mediafile.should_download}")
def file_to_response(mediafile: MediaFile) -> MediaFileResponse:
"""
Create MediaFileResponse from model.
"""
response: MediaFileResponse = MediaFileResponse(
id=mediafile.id,
created_date=mediafile.created_date,
last_modified_date=mediafile.last_modified_date,
version=mediafile.version,
title=mediafile.title,
file_name=mediafile.file_name,
cloud_link=mediafile.cloud_link,
url=mediafile.url,
review=mediafile.review,
should_download=mediafile.should_download,
)
return response
def set_file(model: MediaFileResponse, mediafile: MediaFile) -> None:
def file_to_model(model: MediaFileResponse, mediafile: MediaFile) -> MediaFile:
"""
Set data of response to model.
"""
mediafile.file_name = model.file_name
mediafile.cloud_link = model.cloud_link
if model.url is not None:
mediafile.url = model.url
else:
mediafile.url = ""
if model.title is not None:
mediafile.title = model.title
else:
mediafile.title = ""
mediafile.last_modified_date = datetime.now()
mediafile.review = model.review
mediafile.should_download = model.should_download
return mediafile
class MediaFileModel(BaseModel):
"""
Pydantic model to import MediaFile.
"""
id: str
created_date: Optional[datetime]
last_modified_date: Optional[datetime]
version: int = 0
title: Optional[str]
file_name: Optional[str]
cloud_link: Optional[str]
url: Optional[str]
review: bool = True
should_download: bool = True
class Link(BaseModel):
"""
PYdantic model for uploading url.
"""
url: str
+33
View File
@@ -1,5 +1,38 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from src.db.models.media import MediaVideo
class MediaVideoResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
cloud_link: Optional[str] = None
file_name: Optional[str] = None
path: Optional[str] = None
review: bool = False
title: Optional[str] = None
url: Optional[str] = None
should_download: bool = False
class AddLink(BaseModel):
url: str
def video_to_response(video: MediaVideo) -> MediaVideoResponse:
response: MediaVideoResponse = MediaVideoResponse(
id=video.id,
created_date=video.created_date,
last_modified_date=video.last_modified_date,
version=video.version,
cloud_link=video.cloud_link,
file_name=video.file_name,
path=video.path,
review=video.review,
title=video.title,
url=video.url,
should_download=video.should_download
)
return response
+31
View File
@@ -0,0 +1,31 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Card
class CardResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
card_number: int
year: int
card_set_id: str
rooster_id: str
vendor_id: str
def to_response(card: Card) -> CardResponse:
response: CardResponse = CardResponse(
id=card.id,
created_date=card.created_date,
last_modified_date=card.last_modified_date,
version=card.version,
card_number=card.card_number,
year=card.year,
card_set_id=card.card_set_id,
rooster_id=card.rooster_id,
vendor_id=card.vendor_id
)
return response
+30
View File
@@ -0,0 +1,30 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import CardSet
class CardSetResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
parallel_set: bool
insert_set: bool
vendor_id: str
def to_response(cardset: CardSet) -> CardSetResponse:
response: CardSetResponse = CardSetResponse(
id=cardset.id,
created_date=cardset.created_date,
last_modified_date=cardset.last_modified_date,
version=cardset.version,
name=cardset.name,
parallel_set=cardset.parallel_set,
insert_set=cardset.insert_set,
vendor_id=cardset.vendor_id
)
return response
@@ -0,0 +1,28 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import FieldPosition
class FieldPositionResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
short_name: str
sport_id: str
def to_response(fieldposition: FieldPosition) -> FieldPositionResponse:
response: FieldPositionResponse = FieldPositionResponse(
id=fieldposition.id,
created_date=fieldposition.created_date,
last_modified_date=fieldposition.last_modified_date,
version=fieldposition.version,
name=fieldposition.name,
short_name=fieldposition.short_name,
sport_id=fieldposition.sport_id
)
return response
+25
View File
@@ -0,0 +1,25 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Player
class PlayerResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
first_name: str
last_name: str
def to_response(player: Player) -> PlayerResponse:
response: PlayerResponse = PlayerResponse(
id=player.id,
created_date=player.created_date,
last_modified_date=player.last_modified_date,
version=player.version,
first_name=player.first_name,
last_name=player.last_name
)
return response
+36
View File
@@ -0,0 +1,36 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Rooster
class RoosterResponse(BaseModel):
"""
Pydantic model for returning Rooster objects.
"""
id: str
created_date: datetime
last_modified_date: datetime
version: int
year: int
team_id: str
player_id: str
position_id: str
def to_response(rooster: Rooster) -> RoosterResponse:
"""
convert database object to response object (Pydantic).
"""
response: RoosterResponse = RoosterResponse(
id=rooster.id,
created_date=rooster.created_date,
last_modified_date=rooster.last_modified_date,
version=rooster.version,
year=rooster.year,
team_id=rooster.team_id,
player_id=rooster.player_id,
position_id=rooster.position_id
)
return response
+18 -2
View File
@@ -1,8 +1,24 @@
from typing import AnyStr
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Sport
class SportResponse(BaseModel):
id: AnyStr
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def to_response(sport: Sport) -> SportResponse:
response: SportResponse = SportResponse(
id=sport.id,
created_date=sport.created_date,
last_modified_date=sport.last_modified_date,
version=sport.version,
name=sport.name
)
return response
+28
View File
@@ -0,0 +1,28 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Team
class TeamResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
short_name: str
sport_id: str
def to_response(team: Team) -> TeamResponse:
response: TeamResponse = TeamResponse(
id=team.id,
created_date=team.created_date,
last_modified_date=team.last_modified_date,
version=team.version,
name=team.name,
short_name=team.short_name,
sport_id=team.sport_id
)
return response
+32
View File
@@ -0,0 +1,32 @@
"""
class and function for json response objects for Vendor.
"""
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Vendor
class VendorResponse(BaseModel):
"""
Pydantic model for Vendor reponse object.
"""
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def to_response(vendor: Vendor) -> VendorResponse:
"""
convert database object Vendor to response object VendorResponse.
"""
reponse: VendorResponse = VendorResponse(
id=vendor.id,
created_date=vendor.created_date,
last_modified_date=vendor.last_modified_date,
version=vendor.version,
name=vendor.name
)
return reponse
+26
View File
@@ -0,0 +1,26 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import Assignment
class AssignmentResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
profile_id: str
permission_id: str
def to_response(assignment: Assignment) -> AssignmentResponse:
response: AssignmentResponse = AssignmentResponse(
id=assignment.id,
created_date=assignment.created_date,
last_modified_date=assignment.last_modified_date,
version=assignment.version,
profile_id=assignment.profile_id,
permission_id=assignment.permission_id
)
return response
+24
View File
@@ -0,0 +1,24 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import Permission
class PermissionResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def to_response(permission: Permission) -> PermissionResponse:
response: PermissionResponse = PermissionResponse(
id=permission.id,
created_date=permission.created_date,
last_modified_date=permission.last_modified_date,
version=permission.version,
name=permission.name
)
return response
+41 -4
View File
@@ -1,3 +1,5 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import Profile
@@ -5,9 +7,44 @@ from src.db.models.admin import Profile
class ProfileResponse(BaseModel):
id: str
user_name: str
class ProfileModel(BaseModel):
user_name: str
created_date: datetime
last_modified_date: datetime
version: int
first_name: str
last_name: str
user_name: str
email: str
password: str
enabled: bool
def to_response(profile: Profile) -> ProfileResponse:
response: ProfileResponse = ProfileResponse(
id=profile.id,
created_date=profile.created_date,
last_modified_date=profile.last_modified_date,
version=profile.version,
first_name=profile.first_name,
last_name=profile.last_name,
user_name=profile.user_name,
email=profile.email,
password=profile.password,
enabled=profile.enabled
)
return response
class ProfileModel(BaseModel):
username: str
email: str
first_name: str
last_name: str
active: bool
def to_model(profile: Profile) -> ProfileModel:
model: ProfileModel = ProfileModel(
username=profile.user_name,
email=profile.email,
first_name=profile.first_name,
last_name=profile.last_name,
active=profile.enabled,
)
return model
+32
View File
@@ -0,0 +1,32 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.admin import Token
class TokenResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
token: str
name: str
last_used_date: datetime
enabled: bool
profile_id: str
def to_response(token: Token) -> TokenResponse:
response: TokenResponse = TokenResponse(
id=token.id,
created_date=token.created_date,
last_modified_date=token.last_modified_date,
version=token.version,
token=token.token,
name=token.name,
last_used_date=token.last_used_date,
enabled=token.enabled,
profile_id=token.profile_id
)
return response
+6 -8
View File
@@ -1,9 +1,5 @@
# from src.apis.version1.admin import login_for_access_token
from fastapi.security import OAuth2PasswordRequestForm
from src.apis.version1.admin import login_for_token_cookie
from src.db.session import SessionDep
from fastapi import APIRouter, Depends
from src.apis.version1.admin.login import login_for_token_cookie
from fastapi import APIRouter
from fastapi import HTTPException
from fastapi import Request
from fastapi.templating import Jinja2Templates
@@ -20,7 +16,7 @@ def login(request: Request):
@router.post("/login/")
async def login(request: Request):
async def validate_login(request: Request):
form = LoginForm(request)
await form.load_data()
if await form.is_valid():
@@ -31,6 +27,8 @@ async def login(request: Request):
return response
except HTTPException:
form.__dict__.update(msg="")
form.__dict__.get("errors").append("Incorrect Email or Password")
errors = form.__dict__.get("errors")
if errors:
errors.append("Incorrect Email or Password")
return templates.TemplateResponse("auth/login.html", form.__dict__)
return templates.TemplateResponse("auth/login.html", form.__dict__)
+9 -9
View File
@@ -9,15 +9,15 @@ from src.webapps.media import route_actors, route_media, route_videos
templates = Jinja2Templates(directory="src/templates")
api_router = APIRouter()
api_router.include_router(route_comics.router)
api_router.include_router(route_artists.router)
api_router.include_router(route_worktype.router)
api_router.include_router(route_media.router)
api_router.include_router(route_actors.router)
api_router.include_router(route_videos.router)
api_router.include_router(route_login.router)
api_router.include_router(route_admin.router)
api_router.include_router(route_comics.router, tags=["webapp"])
api_router.include_router(route_artists.router, tags=["webapp"])
api_router.include_router(route_worktype.router, tags=["webapp"])
api_router.include_router(route_media.router, tags=["webapp"])
api_router.include_router(route_actors.router, tags=["webapp"])
api_router.include_router(route_videos.router, tags=["webapp"])
api_router.include_router(route_login.router, tags=["webapp"])
api_router.include_router(route_admin.router, tags=["webapp"])
@api_router.get("/")
@api_router.get("/", tags=["webapp"])
def home(request: Request, msg: str | None = None):
return templates.TemplateResponse("index.html", {"request": request, "msg": msg})
+2 -2
View File
@@ -11,7 +11,7 @@ from src.schema.comics.comic import ComicSchema
from src.webapps.comic.forms.comic import ValidateComicForm
templates = Jinja2Templates(directory="src/templates")
router = APIRouter(include_in_schema=False, prefix="/comic")
router = APIRouter(include_in_schema=True, prefix="/comic")
@router.get("/comics")
def get_comics(db: SessionDep, request: Request, msg: str | None = None):
@@ -58,7 +58,7 @@ async def validate_comic(request: Request, db: SessionDep, comic_id: str, action
if form.is_valid():
try:
comic = ComicSchema(**form.__dict__)
comic = update_comic(comic=comic, comic_id=comic_id, db=db)
comic = update_comic(new_comic=comic, comic_id=comic_id, db=db)
return RedirectResponse(f"/comic/comics/{comic.id}", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
print(e)
+1 -1
View File
@@ -10,7 +10,7 @@ class AddLinkForm:
async def load_data(self):
form = await self.request.form()
self.url = form.get("url")
self.url = str(form.get("url"))
def is_valid(self):
if not self.url or not (self.url.__contains__("http")):

Some files were not shown because too many files have changed in this diff Show More