Vorbereitung Release 0.2.0

This commit is contained in:
2026-01-29 23:50:41 +01:00
parent 58f80b3e76
commit b26b5ecc9c
571 changed files with 35728 additions and 5022 deletions
+20 -24
View File
@@ -1,7 +1,6 @@
from datetime import datetime
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy import Column, ForeignKey, Integer, String, Boolean
from sqlalchemy.orm import relationship, mapped_column, Mapped
from src.db.models.base import Base, BaseMixin
@@ -9,12 +8,12 @@ from src.db.models.base import Base, BaseMixin
class Profile(Base, BaseMixin):
__tablename__ = 'profile'
first_name = Column(String(255))
last_name = Column(String(255))
user_name = Column(String(255), nullable=False)
email = Column(String(255))
password = Column(String(255))
enabled = Column(BIT(1))
first_name = Column(String)
last_name = Column(String)
user_name = Column(String, nullable=False)
email = Column(String)
password = Column(String)
enabled = Column(Boolean)
assignments = relationship("Assignment")
tokens = relationship("Token")
@@ -28,20 +27,23 @@ class Profile(Base, BaseMixin):
full_name += self.last_name
return full_name
def __str__(self):
return f"Profile({self.id} {self.user_name}, {self.email})"
class Token(Base, BaseMixin):
__tablename__ = "token"
token = Column(String(255), nullable=False, unique=True)
name = Column(String(255))
token = Column(String, nullable=False, unique=True)
name = Column(String)
last_used_date: Mapped[datetime] = mapped_column()
enabled = Column(BIT(1))
profile_id = Column(String(255), ForeignKey("profile.id"), nullable=False)
enabled = Column(Boolean)
profile_id = Column(String, ForeignKey("profile.id"), nullable=False)
profile = relationship("Profile", back_populates="tokens")
class Permission(Base, BaseMixin):
__tablename__ = "permission"
name = Column(String(255), nullable=False)
name = Column(String, nullable=False)
assignments = relationship("Assignment")
@@ -53,20 +55,14 @@ class Assignment(Base, BaseMixin):
permission = relationship("Permission", back_populates="assignments")
class ModuleData(Base, BaseMixin):
__tablename__ = "module_data"
module_name = Column(String(255), nullable=False)
import_data = Column(BIT(1))
class MailAccount(Base, BaseMixin):
__tablename__ = "mail_account"
host = Column(String(255))
host = Column(String)
port = Column(Integer)
protocol = Column(String(255))
user_name = Column(String(255))
password = Column(String(255))
start_tls = Column(BIT(1))
protocol = Column(String)
user_name = Column(String)
password = Column(String)
start_tls = Column(Boolean)
class Mail(Base, BaseMixin):
+10 -11
View File
@@ -1,8 +1,7 @@
import uuid
from datetime import datetime
from sqlalchemy import func, Column, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy import func, Column, String, Boolean
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
@@ -11,8 +10,8 @@ class Base(DeclarativeBase):
class BaseMixin:
id = Column(String(255), primary_key=True, default=uuid.uuid4())
# id: Mapped[str] = mapped_column(primary_key=True, default=uuid.uuid4())
#id = Column(String, primary_key=True, default=uuid.uuid4)
id: Mapped[str] = mapped_column(primary_key=True, default=str(uuid.uuid4()))
# created_date = Column(DateTime)
created_date: Mapped[datetime] = mapped_column(default=func.now())
# last_modified_date = Column(DateTime)
@@ -22,10 +21,10 @@ class BaseMixin:
class BaseVideoMixin:
cloud_link = Column(String(255))
file_name = Column(String(255))
path = Column(String(255))
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
should_download = Column(BIT(1))
cloud_link = Column(String, nullable=True)
file_name = Column(String, nullable=True)
path = Column(String)
review = Column(Boolean)
title = Column(String)
url = Column(String, nullable=True)
should_download = Column(Boolean)
+6 -6
View File
@@ -6,28 +6,28 @@ from src.db.models.base import Base, BaseMixin
class Article(Base, BaseMixin):
__tablename__ = 'article'
title = Column(String(length=255), unique=True)
title = Column(String, unique=True)
article_authors = relationship("ArticleAuthor")
class Author(Base, BaseMixin):
__tablename__ = 'author'
first_name = Column(String(255))
last_name = Column(String(255))
first_name = Column(String)
last_name = Column(String)
article_authors = relationship("ArticleAuthor")
book_authors = relationship("BookAuthor")
class BookshelfPublisher(Base, BaseMixin):
__tablename__ = 'bookshelf_publisher'
name = Column(String(length=255), unique=True)
name = Column(String, unique=True)
books = relationship("Book")
class Book(Base, BaseMixin):
__tablename__ = 'book'
isbn = Column(String(255), unique=True)
title = Column(String(255))
isbn = Column(String, unique=True)
title = Column(String)
year = Column(Integer, nullable=False)
publisher_id = Column(String, ForeignKey('bookshelf_publisher.id'), nullable=False)
publisher = relationship('BookshelfPublisher', back_populates="books")
+83 -23
View File
@@ -1,15 +1,24 @@
from typing import Dict, List
import uuid
from datetime import datetime
from typing import AnyStr, Dict, List, Optional, Any
from natsort import natsorted
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from sqlalchemy import Column, ForeignKey, Integer, String, Boolean, func
from sqlalchemy.orm import relationship, Mapped, mapped_column
from src.db.models.base import Base, BaseMixin
class Publisher(Base, BaseMixin):
class Publisher(Base):
__tablename__ = "publisher"
name = Column(String(length=255), unique=True)
id: Mapped[str] = mapped_column(primary_key=True, default=uuid.uuid4)
created_date: Mapped[datetime] = mapped_column(default=func.now())
last_modified_date: Mapped[datetime] = mapped_column(default=func.now())
version: Mapped[int] = mapped_column(default=0)
name = Column(String, unique=True)
weblink = Column(String, nullable=True)
parent_publisher_id: Mapped[Optional[str]] = mapped_column(ForeignKey('publisher.id'))
parent_publisher: Mapped[Optional['Publisher']] = relationship("Publisher", back_populates="imprints", remote_side=[id])
imprints: Mapped[List['Publisher']] = relationship('Publisher', back_populates="parent_publisher")
comics = relationship("Comic")
def __repr__(self):
@@ -21,11 +30,12 @@ class Publisher(Base, BaseMixin):
class Comic(Base, BaseMixin):
__tablename__ = 'comic'
title = Column(String(length=255), unique=True)
title = Column(String, unique=True)
publisher_id = Column(String, ForeignKey('publisher.id'), nullable=False)
publisher = relationship("Publisher", back_populates="comics")
current_order = Column(BIT(1))
completed = Column(BIT(1))
current_order = Column(Boolean)
completed = Column(Boolean)
weblink = Column(String, nullable=True)
issues = relationship("Issue", order_by="Issue.issue_number")
story_arcs = relationship("StoryArc")
trade_paperbacks = relationship("TradePaperback")
@@ -38,10 +48,10 @@ class Comic(Base, BaseMixin):
def __str__(self):
return f'{self.title}({self.id})'
def get_artists(self) -> Dict[str, List[str]]:
works: Dict[str, List[str]] = {}
def get_artists(self) -> Dict[Any, List[Any]]:
works: Dict[Any, List[Any]] = {}
for work in self.comic_works:
work_type = work.work_type.name
work_type = work.work_type
artist = work.artist
if work_type in works:
works[work_type].append(artist)
@@ -56,15 +66,16 @@ class Comic(Base, BaseMixin):
class Volume(Base, BaseMixin):
__tablename__ = "volume"
name = Column(String(length=255), nullable=False)
name = Column(String, nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="volumes")
story_arcs = relationship("StoryArc")
issues = relationship("Issue")
class TradePaperback(Base, BaseMixin):
__tablename__ = "trade_paperback"
name = Column(String(length=255), nullable=False)
name = Column(String, nullable=False)
issue_start = Column(Integer)
issue_end = Column(Integer)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
@@ -73,31 +84,58 @@ class TradePaperback(Base, BaseMixin):
class StoryArc(Base, BaseMixin):
__tablename__ = "story_arc"
name = Column(String(length=255), nullable=False)
name = Column(String, nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="story_arcs")
volume_id = Column(String, ForeignKey("volume.id"), nullable=True)
volume = relationship("Volume", back_populates="story_arcs")
issues = relationship("Issue")
class Issue(Base, BaseMixin):
__tablename__ = "issue"
issue_number = Column(String(255))
in_stock = Column(BIT(1))
is_read = Column(BIT(1))
issue_number = Column(String)
title = Column(String, nullable=True)
published_on: Mapped[datetime] = mapped_column(nullable=True)
in_stock = Column(Boolean)
is_read = Column(Boolean)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="issues")
volume_id = Column(String, ForeignKey("volume.id"), nullable=True)
volume = relationship("Volume", back_populates="issues")
story_arc_id = Column(String, ForeignKey("story_arc.id"), nullable=True)
story_arc = relationship("StoryArc", back_populates="issues")
issue_works = relationship("IssueWork")
def get_full_title(self) -> str:
full_title: str = str(self.issue_number)
if self.title:
full_title += str(": " + self.title)
return full_title
def get_artists(self) -> Dict[Any, List[Any]]:
works: Dict[Any, List[Any]] = {}
for work in self.issue_works:
work_type = work.work_type
artist = work.artist
if work_type in works:
works[work_type].append(artist)
else:
works[work_type] = [artist]
return works
class Artist(Base, BaseMixin):
__tablename__ = "artist"
name = Column(String(length=255), nullable=False)
name = Column(String, nullable=False)
weblink = Column(String, nullable=True)
comic_works = relationship("ComicWork")
issue_works = relationship("IssueWork")
def get_comics(self) -> Dict[str, List[str]]:
works: Dict[str, List[str]] = {}
def get_comics(self) -> Dict[Any, List[Comic]]:
works: Dict[Any, List[Comic]] = {}
for work in self.comic_works:
work_type = work.work_type.name
work_type = work.work_type
comic = work.comic
if work_type in works:
works[work_type].append(comic)
@@ -108,8 +146,20 @@ class Artist(Base, BaseMixin):
class WorkType(Base, BaseMixin):
__tablename__ = "worktype"
name = Column(String(length=255), nullable=False, unique=True)
name = Column(String, nullable=False, unique=True)
comic_works = relationship("ComicWork")
issue_works = relationship("IssueWork")
def get_artists(self) -> Dict[str, List[str]]:
works: Dict[str, List[str]] = {}
for work in self.comic_works:
comic = work.comic.title
artist = work.artist
if comic in works:
works[comic].append(artist)
else:
works[comic] = [artist]
return works
def __repr__(self):
return f'Worktype({self.id} {self.version} {self.name} {len(self.comic_works)})'
@@ -126,3 +176,13 @@ class ComicWork(Base, BaseMixin):
artist = relationship("Artist", back_populates="comic_works")
work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False)
work_type = relationship("WorkType", back_populates="comic_works")
class IssueWork(Base, BaseMixin):
__tablename__ = "issue_work"
issue_id = Column(String, ForeignKey("issue.id"), nullable=False)
issue = relationship("Issue", back_populates="issue_works")
artist_id = Column(String, ForeignKey("artist.id"), nullable=False)
artist = relationship("Artist", back_populates="issue_works")
work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False)
work_type = relationship("WorkType", back_populates="issue_works")
+6 -7
View File
@@ -1,10 +1,9 @@
import json
import logging
import uuid
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from typing import Any
from typing import Any, List
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
@@ -13,7 +12,7 @@ from sqlalchemy.orm import sessionmaker
from src.db.models.tysc import Card, CardSet, Rooster, Team, FieldPosition, Player, Vendor, Sport
from src.db.models.comic import Issue, TradePaperback, StoryArc, Volume, ComicWork, Artist, Comic, Publisher, WorkType
from src.db.models.bookshelf import ArticleAuthor, BookAuthor, BookshelfPublisher, Article, Book, Author
from src.db.models.admin import Mail, MailAccount, ModuleData, Role, User, Token, AuthorizationMatrix
from src.db.models.admin import Mail, MailAccount, ModuleData, Token, Assignment, Permission, Profile
from src.db.models.metadata import MetaDataTable, MetaDataColumn
from src.db.models.media import MediaVideo, MediaArticle, MediaFile, MediaActor, MediaActorFile
@@ -79,10 +78,10 @@ class KontorDB:
self.registry[MediaVideo.__tablename__] = MediaVideo
self.registry[MetaDataColumn.__tablename__] = MetaDataColumn
self.registry[MetaDataTable.__tablename__] = MetaDataTable
self.registry[AuthorizationMatrix.__tablename__] = AuthorizationMatrix
self.registry[Assignment.__tablename__] = Assignment
self.registry[Token.__tablename__] = Token
self.registry[User.__tablename__] = User
self.registry[Role.__tablename__] = Role
self.registry[Profile.__tablename__] = Profile
self.registry[Permission.__tablename__] = Permission
self.registry[ModuleData.__tablename__] = ModuleData
self.registry[MailAccount.__tablename__] = MailAccount
self.registry[Mail.__tablename__] = Mail
@@ -360,7 +359,7 @@ class KontorDB:
update_list[link.id] = link.title
return update_list
def get_download_list(self) -> list[uuid.UUID]:
def get_download_list(self) -> List[str]:
download_list = []
__session__ = sessionmaker(self.engine)
_filter = { 'should_download': True}
+43 -23
View File
@@ -6,8 +6,7 @@ from pathlib import Path
import requests
from bs4 import BeautifulSoup
from sqlalchemy import Column, String, ForeignKey
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy import Column, String, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from src.db.models.base import Base, BaseMixin, BaseVideoMixin
@@ -26,31 +25,31 @@ class MediaFile(Base, BaseMixin, BaseVideoMixin):
def update_title(self) -> None:
logging.info(f"update title for {self.url}")
try:
r = requests.get(self.url)
r = requests.get(str(self.url))
soup = BeautifulSoup(r.content, "html.parser")
title = soup.title.string
title = soup.title.get_text() # type: ignore
self.title = title
self.review = 0
self.review = False
except:
self.title = None
self.review = 1
self.review = True
self.last_modified_date = datetime.now()
def download_file(self, download_dir: str, dl_tool: str):
logging.info(f"download file for {self.url} to {download_dir}")
result = subprocess.run([dl_tool, self.url], cwd=download_dir, capture_output=True, text=True)
result = subprocess.run([dl_tool, self.url], cwd=download_dir, capture_output=True, text=True) # type: ignore
if result.returncode == 0:
output = result.stdout
output = re.sub(' +', ' ', output)
lines_list = output.splitlines()
file_name = self.__parse_output__(lines_list)
if file_name is None:
self.review = 1
self.should_download = 1
self.review = True
self.should_download = True
self.file_name = None
else:
download_file = Path(file_name)
self.should_download = 0
self.should_download = False
self.file_name = download_file.name
self.cloud_link = str(download_file.absolute())
self.last_modified_date = datetime.now()
@@ -71,31 +70,52 @@ class MediaFile(Base, BaseMixin, BaseVideoMixin):
class MediaActor(Base, BaseMixin):
__tablename__ = 'media_actor'
name = Column(String(255))
name = Column(String)
url = Column(String, unique=True, nullable=True)
media_actor_files = relationship("MediaActorFile")
def __repr__(self) -> str:
return f'MediaActor({self.id} {self.name} {self.url})'
def __str__(self) -> str:
return f'{self.url}({self.id})'
class MediaActorFile(Base, BaseMixin):
__tablename__ = 'media_actor_file'
media_actor_id = Column(String(255), ForeignKey("media_actor.id"), nullable=False)
media_actor_id = Column(String, ForeignKey("media_actor.id"), nullable=False)
media_actor = relationship("MediaActor", back_populates="media_actor_files")
media_file_id = Column(String(255), ForeignKey("media_file.id"), nullable=True)
media_file_id = Column(String, ForeignKey("media_file.id"), nullable=True)
media_file = relationship("MediaFile", back_populates="media_actor_files")
def __repr__(self):
return f'MediaActorFile({self.id} {self.media_actor_id} {self.media_file_id})'
def __str__(self) -> str:
return f'{self.id} {self.media_actor_id} {self.media_file_id}'
class MediaArticle(Base, BaseMixin):
__tablename__ = 'media_article'
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
review = Column(Boolean)
title = Column(String)
url = Column(String, unique=True)
class MediaVideo(Base, BaseMixin):
__tablename__ = 'media_video'
cloud_link = Column(String(255))
file_name = Column(String(255))
path = Column(String(255))
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
should_download = Column(BIT(1))
cloud_link = Column(String)
file_name = Column(String)
path = Column(String)
review = Column(Boolean)
title = Column(String)
url = Column(String, unique=True)
should_download = Column(Boolean)
def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.url})'
def __str__(self):
if self.title is None:
return f'{self.url}({self.id})'
else:
return f'{self.title}({self.id})'
-42
View File
@@ -1,42 +0,0 @@
from sqlalchemy import Column, String, ForeignKey, Integer
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from src.db.models.base import Base, BaseMixin
class MetaDataTable(Base, BaseMixin):
__tablename__ = 'meta_data_table'
table_name = Column(String(255), unique=True)
table_columns = relationship("MetaDataColumn")
def __repr__(self):
return f'MetaDataTable({self.id} {self.table_name})'
def __str__(self):
return f'{self.table_name}({self.id})'
class MetaDataColumn(Base, BaseMixin):
__tablename__ = 'meta_data_column'
column_name = Column(String(255), nullable=False)
column_sync_name = Column(String(255))
column_type = Column(String(255))
column_modifier = Column(String(255), nullable=True)
column_order = Column(Integer)
table_id = Column(String, ForeignKey('meta_data_table.id'))
table = relationship("MetaDataTable", back_populates="table_columns")
column_label = Column(String(255))
filter_label = Column(String(255))
is_shown = Column(BIT(1))
show_filter = Column(BIT(1))
ref_column = Column(String, nullable=True)
def __repr__(self):
if self.column_name is None:
return f'MetaDataColumn({self.id} {self.table.table_name}.__)'
else:
return f'MetaDataColumn({self.id} {self.table.table_name}.{self.column_name})'
def __str__(self):
return f'{self.column_name}({self.id})'
+12 -13
View File
@@ -1,5 +1,4 @@
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Boolean
from sqlalchemy.orm import relationship
from src.db.models.base import Base, BaseMixin
@@ -10,15 +9,15 @@ class Sport(Base, BaseMixin):
__table_args__ = (
UniqueConstraint("name"),
)
name = Column(String(255), nullable=False, index=True, unique=True)
name = Column(String, nullable=False, index=True, unique=True)
teams = relationship("Team")
positions = relationship("FieldPosition")
class Team(Base, BaseMixin):
__tablename__ = "team"
name = Column(String(255), nullable=False, index=True, unique=True)
short_name = Column(String(255), nullable=False, )
name = Column(String, nullable=False, index=True, unique=True)
short_name = Column(String, nullable=False, )
sport_id = Column(String, ForeignKey("sport.id"), nullable=False)
sport = relationship("Sport", back_populates="teams")
roosters = relationship("Rooster")
@@ -30,8 +29,8 @@ class FieldPosition(Base, BaseMixin):
UniqueConstraint("name", "sport_id"),
UniqueConstraint("short_name", "sport_id"),
)
name = Column(String(255), nullable=False, index=True)
short_name = Column(String(255), nullable=False)
name = Column(String, nullable=False, index=True)
short_name = Column(String, nullable=False)
sport_id = Column(String, ForeignKey("sport.id"), nullable=False, index=True)
sport = relationship("Sport", back_populates="positions")
roosters = relationship("Rooster")
@@ -42,8 +41,8 @@ class Player(Base, BaseMixin):
__table_args__ = (
UniqueConstraint("first_name", "last_name"),
)
first_name = Column(String(255), nullable=False, index=True)
last_name = Column(String(255), nullable=False, index=True)
first_name = Column(String, nullable=False, index=True)
last_name = Column(String, nullable=False, index=True)
roosters = relationship("Rooster")
def get_full_name(self) -> str:
@@ -67,7 +66,7 @@ class Rooster(Base, BaseMixin):
class Vendor(Base, BaseMixin):
__tablename__ = "vendor"
name = Column(String(255), nullable=False, unique=True, index=True)
name = Column(String, nullable=False, unique=True, index=True)
card_sets = relationship("CardSet")
cards = relationship("Card")
@@ -77,9 +76,9 @@ class CardSet(Base, BaseMixin):
__table_args__ = (
UniqueConstraint("name", "vendor_id"),
)
name = Column(String(255), index=True)
parallel_set = Column(BIT(1))
insert_set = Column(BIT(1))
name = Column(String, index=True)
parallel_set = Column(Boolean)
insert_set = Column(Boolean)
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False, index=True)
vendor = relationship("Vendor", back_populates="card_sets")
cards = relationship("Card")
+10
View File
@@ -0,0 +1,10 @@
from typing import AnyStr, Optional
from sqlalchemy.orm import Session
from src.db.models.admin import Profile
def get_profile(username: AnyStr, db: Session) -> Optional[Profile]:
profile = db.query(Profile).filter(Profile.email == username).first()
return profile
@@ -0,0 +1,46 @@
import uuid
from typing import List, Optional
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.comic import Artist
from src.schema.comics.artist import AddArtist
from src.schema.comics.artist_details import ArtistDetailResponse, ArtistWorktypeComicResponse, ArtistWorktypeIssueResponse
from src.schema.comics.comic import ComicResponse
from src.schema.comics.worktype import WorktypeResponse
def get_artist_details(artist: Artist) -> ArtistDetailResponse:
comic_works: List[ArtistWorktypeComicResponse] = []
comic_works_map = {}
for work in artist.comic_works:
worktype_id = work.work_type.id
if worktype_id in comic_works_map:
comic = ComicResponse(id=work.comic.id, title=work.comic.title, completed=work.comic.completed)
comic_works_map[worktype_id].comics.append(comic)
else:
comic_works_map[worktype_id] = ArtistWorktypeComicResponse(
worktype=WorktypeResponse(id=worktype_id, name=work.work_type.name),
comics=[ComicResponse(id=work.comic.id, title=work.comic.title, completed=work.comic.completed)]
)
for value in comic_works_map.values():
comic_works.append(value)
issue_works: List[ArtistWorktypeIssueResponse] = []
response = ArtistDetailResponse(
id=artist.id,
name=str(artist.name),
weblink=str(artist.weblink),
comic_works=comic_works,
issue_works=issue_works,
)
return response
def update_artist(add_artist: AddArtist, artist_id: str, db: Session) -> Artist:
logger.info("update artist")
artist: Optional[Artist] = db.get(Artist, artist_id)
artist.name = add_artist.name
db.add(artist)
db.commit()
db.refresh(artist)
return artist
@@ -0,0 +1,87 @@
from typing import Dict, List
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.comic import Comic, Issue
from src.schema.comics.artist import ArtistResponse
from src.schema.comics.comic import ComicResponse, ComicSchema
from src.schema.comics.comic_details import ComicDetailsResponse, ComicWorktypeArtistResponse
from src.schema.comics.issue import IssueResponse
from src.schema.comics.issue_details import IssueDetailsResponse
from src.schema.comics.publisher import PublisherResponse
from src.schema.comics.volume import VolumeResponse
from src.schema.comics.worktype import WorktypeResponse
def list_comics(db: Session) -> List[Comic]:
comics = db.query(Comic).all()
return comics
def get_issue_details(issue: Issue) -> IssueDetailsResponse:
response = IssueDetailsResponse(
id=issue.id,
issue_number=str(issue.issue_number),
in_stock=bool(issue.in_stock),
is_read=bool(issue.is_read),
comic=ComicResponse(id=issue.comic.id, title=issue.comic.title, completed=issue.comic.completed),
volume=VolumeResponse(id=issue.volume.id, name=issue.volume.name)
)
return response
def update_comic(comic: ComicSchema, comic_id: str, db: Session) -> type[Comic] | None:
logger.info(f"update_comic: {comic} with {comic_id}")
comic = db.get(Comic, comic_id) # type: ignore
return comic # type: ignore
def get_short_info(comic: Comic) -> ComicResponse:
response = ComicResponse(
id=comic.id,
title=str(comic.title),
completed=bool(comic.completed == 1)
)
return response
def get_comic_details(comic: Comic) -> ComicDetailsResponse:
volumes: List[VolumeResponse] = []
for volume in comic.volumes:
volumes.append(VolumeResponse(id=volume.id, name=volume.name))
issues: List[IssueResponse] = []
for issue in comic.issues:
issues.append(IssueResponse(
id=issue.id,
issue_number=issue.issue_number,
in_stock=issue.in_stock,
is_read=issue.is_read
))
works: List[ComicWorktypeArtistResponse] = []
works_map: Dict[str, ComicWorktypeArtistResponse] = {}
for work in comic.comic_works:
worktype_id = work.work_type.id
if worktype_id in works_map:
artist = ArtistResponse(id=work.artist.id, name=work.artist.name)
works_map[worktype_id].artists.append(artist)
logger.info(f"add artist to response map: {artist} -> {works_map}")
print(f"add artist to response map: {artist} -> {works_map}")
else:
works_map[worktype_id] = ComicWorktypeArtistResponse(
worktype=WorktypeResponse(id=worktype_id, name=work.work_type.name),
artists=[ArtistResponse(id=work.artist.id, name=work.artist.name)]
)
for value in works_map.values():
works.append(value)
response = ComicDetailsResponse(
id=str(comic.id),
created=str(comic.created_date),
title=str(comic.title),
completed=bool(comic.completed),
current_order=bool(comic.current_order),
weblink=str(comic.weblink),
publisher=PublisherResponse(id=comic.publisher.id, name=comic.publisher.name),
issues=issues,
volumes=volumes,
works=works
)
return response
@@ -0,0 +1,24 @@
from typing import List
from src.db.models.comic import Publisher
from src.schema.comics.comic import ComicResponse
from src.schema.comics.publisher import PublisherResponse
from src.schema.comics.publisher_details import PublisherDetailsResponse
def get_publisher_details(publisher: Publisher) -> PublisherDetailsResponse:
imprints: List[PublisherResponse] = []
for imprint in publisher.imprints:
imprints.append(PublisherResponse(id=imprint.id, name=str(imprint.name)))
comics: List[ComicResponse] = []
for comic in publisher.comics:
comics.append(
ComicResponse(id=comic.id, title=comic.title, completed=comic.completed)
)
parent_publisher: PublisherResponse | None = None
if publisher.parent_publisher:
parent_publisher = PublisherResponse(id=publisher.parent_publisher.id, name=str(publisher.parent_publisher.name))
response: PublisherDetailsResponse = PublisherDetailsResponse(
id=publisher.id, name=str(publisher.name), parent_publisher=parent_publisher, imprints=imprints, comics=comics
)
return response
@@ -0,0 +1,32 @@
import uuid
from datetime import datetime
from typing import AnyStr
from sqlalchemy.orm import Session
from src.core.log_conf import logger
from src.db.models.comic import WorkType
from src.schema.comics.worktype import AddWorkType
def create_new_worktype(work: AddWorkType, db: Session) -> WorkType:
worktype = WorkType()
worktype.id = str(uuid.uuid4())
worktype.created_date = datetime.now()
worktype.last_modified_date = datetime.now()
worktype.name = work.worktype
db.add(worktype)
db.commit()
db.refresh(worktype)
logger.info(f"create_new_worktype: {worktype}")
return worktype
def update_worktype(work: AddWorkType, worktype_id: AnyStr, db: Session) -> WorkType:
logger.info("update worktype")
worktype = db.get(WorkType, worktype_id)
worktype.name = work.worktype
db.add(worktype)
db.commit()
db.refresh(worktype)
return worktype
+86
View File
@@ -0,0 +1,86 @@
from sqlalchemy.orm import Session
import uuid
from datetime import datetime
from src.core.log_conf import logger
from src.db.models.media import MediaActor, MediaActorFile, MediaFile, MediaVideo
from src.schema.media.actor import Actor
from src.webapps.media.forms import AddLinkForm
def create_new_video(video: AddLinkForm, db: Session) -> MediaVideo:
print(video.url)
media_video = MediaVideo()
media_video.id = str(uuid.uuid4())
media_video.url = video.url # type: ignore
media_video.created_date = datetime.now()
media_video.last_modified_date = datetime.now()
media_video.review = True # type: ignore
media_video.should_download = True # type: ignore
db.add(media_video)
db.commit()
db.refresh(media_video)
print(media_video)
return media_video
def create_new_mediafile(link: str, db: Session) -> MediaFile:
logger.info("create MediaFile with url {link}")
media_file: MediaFile = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.url = link # type: ignore
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.review = True
media_file.should_download = True
db.add(media_file)
db.commit()
db.refresh(media_file)
logger.info(f"created {media_file}")
return media_file
def delete_mediafile(db: Session, media_file_id: str):
logger.info(f"delete MediaFile with id {media_file_id}")
media_file = db.get(MediaFile, media_file_id)
db.delete(media_file)
db.commit()
def create_new_mediaactor(new_actor: Actor, db: Session) -> MediaActor:
logger.info(f"create MediaActor with url {new_actor.url}")
media_actor: MediaActor = MediaActor()
media_actor.id = str(uuid.uuid4())
media_actor.name = str(new_actor.name) # type: ignore
media_actor.url = str(new_actor.url) # type: ignore
media_actor.created_date = datetime.now()
media_actor.last_modified_date = datetime.now()
media_actor.version = 0
db.add(media_actor)
db.commit()
db.refresh(media_actor)
logger.info(f"created {media_actor}")
return media_actor
def delete_mediaactor(db: Session, actor_id: str):
logger.info(f"delete MediaActor with id {actor_id}")
media_actor = db.get(MediaActor, actor_id)
db.delete(media_actor)
db.commit()
def create_new_mediaactorfile(db: Session, actor_id: str, file_id: str) -> MediaActorFile:
logger.info(f"create MediaActorFile with actor {actor_id} and file {file_id}")
media_actor_file: MediaActorFile = MediaActorFile()
media_actor_file.id = str(uuid.uuid4())
media_actor_file.created_date = datetime.now()
media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = 0
media_actor_file.media_actor_id = actor_id # type: ignore
media_actor_file.media_file_id = file_id # type: ignore
db.add(media_actor_file)
db.commit()
db.refresh(media_actor_file)
return media_actor_file
def delete_mediaactorfile(db: Session, actorfile_id: str):
logger.info(f"delete MediaActorFile with id {actorfile_id}")
media_actorfile = db.get(MediaActorFile, actorfile_id)
db.delete(media_actorfile)
db.commit()
+4
View File
@@ -1,5 +1,6 @@
from typing import Generator, Annotated
from fastapi import Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
@@ -10,6 +11,9 @@ engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
def get_db() -> Generator:
with SessionLocal() as db:
yield db
SessionDep: type[Session] = Annotated[Session, Depends(get_db)]
+30
View File
@@ -0,0 +1,30 @@
import databases
from src.core.log_conf import logger
from src.db.session import SQLALCHEMY_DATABASE_URL
async def check_db_connected():
try:
if not str(SQLALCHEMY_DATABASE_URL).__contains__("sqlite"):
database = databases.Database(SQLALCHEMY_DATABASE_URL)
if not database.is_connected:
await database.connect()
await database.execute("SELECT 1")
logger.info("Database is connected (^_^)")
except Exception as e:
print(
"Looks like db is missing or is there is some problem in connection,see below traceback"
)
raise e
async def check_db_disconnected():
try:
if not str(SQLALCHEMY_DATABASE_URL).__contains__("sqlite"):
database = databases.Database(SQLALCHEMY_DATABASE_URL)
if database.is_connected:
await database.disconnect()
logger.info("Database is Disconnected (-_-) zZZ")
except Exception as e:
raise e