refactor kontor-api to use SQLAlchemy 2.0 features for mapping fields

(cherry picked from commit e57abdbef7e13e3880738cd639225df5db0c37be)
This commit is contained in:
Thomas Peetz
2026-01-29 14:43:37 +01:00
parent 25fa07d517
commit cf770f4814
20 changed files with 364 additions and 685 deletions
+2 -1
View File
@@ -1,6 +1,6 @@
from fastapi import APIRouter from fastapi import APIRouter
from src.apis.version1 import comic, mediaactor, mediafile, mediaactorfile, tysc, admin from src.apis.version1 import comic, mediaactor, mediafile, mediaactorfile, tysc, admin, user
api_router = APIRouter(prefix="/api") api_router = APIRouter(prefix="/api")
api_router.include_router(comic.router, prefix="/comics", tags=["comics"]) api_router.include_router(comic.router, prefix="/comics", tags=["comics"])
@@ -9,3 +9,4 @@ api_router.include_router(mediaactor.router, prefix="/media", tags=["media"])
api_router.include_router(mediaactorfile.router, prefix="/media", tags=["media"]) api_router.include_router(mediaactorfile.router, prefix="/media", tags=["media"])
api_router.include_router(tysc.router, prefix="/tysc", tags=["tysc"]) api_router.include_router(tysc.router, prefix="/tysc", tags=["tysc"])
api_router.include_router(admin.router, prefix="/login", tags=["login"]) api_router.include_router(admin.router, prefix="/login", tags=["login"])
api_router.include_router(user.router, prefix="/user", tags=["user"])
+8 -8
View File
@@ -25,7 +25,7 @@ router = APIRouter()
@router.get("/comics") @router.get("/comics")
def get_all_comics(db: SessionDep) -> List[ComicResponse]: def get_all_comics(db: SessionDep) -> List[ComicResponse]: # type: ignore
results: List[ComicResponse] = [] results: List[ComicResponse] = []
comics = list_comics(db) comics = list_comics(db)
for comic in comics: for comic in comics:
@@ -35,7 +35,7 @@ def get_all_comics(db: SessionDep) -> List[ComicResponse]:
@router.get("/comics/{comic_id}", response_model=ComicDetailsResponse) @router.get("/comics/{comic_id}", response_model=ComicDetailsResponse)
def get_comic(comic_id: str, db: SessionDep) -> ComicDetailsResponse: def get_comic(comic_id: str, db: SessionDep) -> ComicDetailsResponse: # type: ignore
comic = db.get(Comic, comic_id) comic = db.get(Comic, comic_id)
if comic is None: if comic is None:
raise HTTPException(status_code=404, detail="Comic could not be found") raise HTTPException(status_code=404, detail="Comic could not be found")
@@ -46,7 +46,7 @@ def get_comic(comic_id: str, db: SessionDep) -> ComicDetailsResponse:
@router.get("/artists", response_model=List[ArtistResponse]) @router.get("/artists", response_model=List[ArtistResponse])
def get_all_artists(db: SessionDep) -> List[ArtistResponse]: def get_all_artists(db: SessionDep) -> List[ArtistResponse]: # type: ignore
results: List[ArtistResponse] = [] results: List[ArtistResponse] = []
artists = db.query(Artist).all() artists = db.query(Artist).all()
for artist in artists: for artist in artists:
@@ -55,7 +55,7 @@ def get_all_artists(db: SessionDep) -> List[ArtistResponse]:
@router.get("/artists/{artist_id}", response_model=ArtistDetailResponse) @router.get("/artists/{artist_id}", response_model=ArtistDetailResponse)
def get_artist(artist_id: str, db: SessionDep) -> ArtistDetailResponse: def get_artist(artist_id: str, db: SessionDep) -> ArtistDetailResponse: # type: ignore
artist = db.get(Artist, artist_id) artist = db.get(Artist, artist_id)
if artist is None: if artist is None:
raise HTTPException(status_code=404, detail="Artist could not be found") raise HTTPException(status_code=404, detail="Artist could not be found")
@@ -64,7 +64,7 @@ def get_artist(artist_id: str, db: SessionDep) -> ArtistDetailResponse:
@router.post("/artists", status_code=status.HTTP_201_CREATED) @router.post("/artists", status_code=status.HTTP_201_CREATED)
def add_artist(db: SessionDep, artist_creation: ArtistCreation) -> ArtistResponse: def add_artist(db: SessionDep, artist_creation: ArtistCreation) -> ArtistResponse: # type: ignore
artist: Artist = Artist() artist: Artist = Artist()
setattr(artist, "name", artist_creation.name) setattr(artist, "name", artist_creation.name)
try: try:
@@ -77,7 +77,7 @@ def add_artist(db: SessionDep, artist_creation: ArtistCreation) -> ArtistRespons
@router.get("/publishers", response_model=List[PublisherResponse]) @router.get("/publishers", response_model=List[PublisherResponse])
def get_all_publishers(db: SessionDep) -> List[PublisherResponse]: def get_all_publishers(db: SessionDep) -> List[PublisherResponse]: # type: ignore
results: List[PublisherResponse] = [] results: List[PublisherResponse] = []
publishers = db.query(Publisher).all() publishers = db.query(Publisher).all()
for publisher in publishers: for publisher in publishers:
@@ -86,7 +86,7 @@ def get_all_publishers(db: SessionDep) -> List[PublisherResponse]:
@router.get("/publishers/{publisher_id}", response_model=PublisherDetailsResponse) @router.get("/publishers/{publisher_id}", response_model=PublisherDetailsResponse)
def get_publisher(publisher_id: str, db: SessionDep) -> PublisherDetailsResponse: def get_publisher(publisher_id: str, db: SessionDep) -> PublisherDetailsResponse: # type: ignore
publisher = db.get(Publisher, publisher_id) publisher = db.get(Publisher, publisher_id)
if publisher is None: if publisher is None:
raise HTTPException(status_code=404, detail="Publisher could not be found") raise HTTPException(status_code=404, detail="Publisher could not be found")
@@ -95,7 +95,7 @@ def get_publisher(publisher_id: str, db: SessionDep) -> PublisherDetailsResponse
@router.get("/issues", response_model=List[IssueDetailsResponse]) @router.get("/issues", response_model=List[IssueDetailsResponse])
def get_issues(db: SessionDep) -> List[IssueDetailsResponse]: def get_issues(db: SessionDep) -> List[IssueDetailsResponse]: # type: ignore
results: List[IssueDetailsResponse] = [] results: List[IssueDetailsResponse] = []
issues = db.query(Issue).all() issues = db.query(Issue).all()
for issue in issues: for issue in issues:
+4 -7
View File
@@ -1,9 +1,9 @@
from fastapi import APIRouter, status, HTTPException from fastapi import APIRouter, status, HTTPException
from sqlalchemy import select from sqlalchemy import select
from src.core.log_conf import logger from src.core.log_conf import logger
from src.db.repository.media import create_new_mediaactor, delete_mediaactor from src.db.repository.media import create_new_mediaactor, delete_mediaactor, get_actor_details
from src.db.session import SessionDep from src.db.session import SessionDep
from src.schema.media.actor import Actor, MediaActorResponse, get_actor_details from src.schema.media.actor import MediaActorModel, MediaActorResponse
from src.db.models.media import MediaActor from src.db.models.media import MediaActor
router = APIRouter() router = APIRouter()
@@ -32,13 +32,10 @@ def delete_actor(actor_id: str, db: SessionDep): # type: ignore
if not media_actor: if not media_actor:
raise HTTPException(status_code=404, detail="MediaActor could not be found") raise HTTPException(status_code=404, detail="MediaActor could not be found")
logger.info(f"delete MediaActor: {actor_id}") logger.info(f"delete MediaActor: {actor_id}")
actor_files = media_actor.media_actor_files delete_mediaactor(db, media_actor.id)
logger.info(f"MediaActorFiles links {len(actor_files)}")
if len(actor_files) == 0:
delete_mediaactor(db, media_actor.id)
@router.post("/actors", status_code=status.HTTP_201_CREATED) @router.post("/actors", status_code=status.HTTP_201_CREATED)
def add_actor(new_actor: Actor, db: SessionDep) -> MediaActorResponse: # type: ignore def add_actor(new_actor: MediaActorModel, db: SessionDep) -> MediaActorResponse: # type: ignore
logger.info(f"add actor {new_actor.url}") logger.info(f"add actor {new_actor.url}")
try: try:
mediaActor: MediaActor = create_new_mediaactor(new_actor, db) mediaActor: MediaActor = create_new_mediaactor(new_actor, db)
@@ -1,18 +1,19 @@
from typing import List
from fastapi import APIRouter, status, HTTPException from fastapi import APIRouter, status, HTTPException
from sqlalchemy import select from sqlalchemy import select
from src.db.models.media import MediaActorFile from src.db.models.media import MediaActorFile
from src.db.repository.media import delete_mediaactorfile from src.db.repository.media import delete_mediaactorfile, get_actorfile_details
from src.db.session import SessionDep from src.db.session import SessionDep
from src.schema.media.actorfile import MediaActorFileResponse, get_actorfile_details from src.schema.media.actorfile import MediaActorFileResponse
router = APIRouter() router = APIRouter()
@router.get("/actorfiles", response_model=list[MediaActorFileResponse]) @router.get("/actorfiles", response_model=List[MediaActorFileResponse])
def get_all_actorfiles(db: SessionDep) -> list[MediaActorFileResponse]: # type: ignore def get_all_actorfiles(db: SessionDep) -> List[MediaActorFileResponse]: # type: ignore
results: list[MediaActorFileResponse] = [] results: List[MediaActorFileResponse] = []
actorfiles = db.scalars(select(MediaActorFile)).all() actorfiles = db.scalars(select(MediaActorFile)).all()
for mediaactorfile in actorfiles: for media_actorfile in actorfiles:
response = MediaActorFileResponse(id=mediaactorfile.id, actor_id=str(mediaactorfile.media_actor_id), file_id=str(mediaactorfile.media_file_id)) response = get_actorfile_details(media_actorfile)
results.append(response) results.append(response)
return results return results
@@ -30,4 +31,3 @@ def delete_actorfile(actorfile_id: str, db: SessionDep): # type: ignore
if not media_actorfile: if not media_actorfile:
raise HTTPException(status_code=404, detail="MediaActor could not be found") raise HTTPException(status_code=404, detail="MediaActor could not be found")
delete_mediaactorfile(db, media_actorfile.id) delete_mediaactorfile(db, media_actorfile.id)
+47
View File
@@ -0,0 +1,47 @@
from typing import List
from fastapi import APIRouter, HTTPException, status
from sqlalchemy import select
from src.core.log_conf import logger
from src.db.models.admin import Profile
from src.db.repository.user import create_new_profile, get_profile_details
from src.db.session import SessionDep
from src.schema.user.profile import ProfileResponse, ProfileModel
router = APIRouter()
@router.get("/profiles", response_model=List[ProfileResponse])
def get_all_profiles(db: SessionDep) -> List[ProfileResponse]: # type: ignore
results: List[ProfileResponse] = []
profiles = db.scalars(select(Profile)).all()
for profile in profiles:
response = get_profile_details(profile)
results.append(response)
return results
@router.get("/profiles/{profile_id}", response_model=ProfileResponse)
def get_profile(profile_id: str, db: SessionDep) -> ProfileResponse: # type: ignore
profile = db.get(Profile, profile_id)
if not profile:
raise HTTPException(status_code=404, detail="MediaActor could not be found")
response = get_profile_details(profile)
return response
@router.delete("/profiles/{profile_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_profile(profile_id: str, db: SessionDep): # type: ignore
profile = db.get(Profile, profile_id)
if not profile:
raise HTTPException(status_code=404, detail="Profile could not be found")
logger.info(f"delete Profile: {profile_id}")
delete_profile(profile_id=profile_id, db=db)
@router.post("/profiles", status_code=status.HTTP_201_CREATED)
def add_profile(new_profile: ProfileModel, db: SessionDep) -> ProfileResponse: # type: ignore
logger.info(f"add profile {new_profile.user_name}")
try:
profile: Profile = create_new_profile(new_profile, db)
except:
raise HTTPException(status_code=409, detail="Profile duplicate")
response = get_profile_details(profile)
return response
+36 -35
View File
@@ -1,6 +1,7 @@
from datetime import datetime from datetime import datetime
from typing import List
from sqlalchemy import Column, ForeignKey, Integer, String, Boolean from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship, mapped_column, Mapped from sqlalchemy.orm import relationship, mapped_column, Mapped
from src.db.models.base import Base, BaseMixin from src.db.models.base import Base, BaseMixin
@@ -8,23 +9,23 @@ from src.db.models.base import Base, BaseMixin
class Profile(Base, BaseMixin): class Profile(Base, BaseMixin):
__tablename__ = 'profile' __tablename__ = 'profile'
first_name = Column(String) first_name: Mapped[str]
last_name = Column(String) last_name: Mapped[str]
user_name = Column(String, nullable=False) user_name: Mapped[str] = mapped_column(nullable=False)
email = Column(String) email: Mapped[str]
password = Column(String) password: Mapped[str]
enabled = Column(Boolean) enabled: Mapped[bool]
assignments = relationship("Assignment") assignments: Mapped[List["Assignment"]] = relationship(back_populates="profile")
tokens = relationship("Token") tokens: Mapped[List["Token"]] = relationship(back_populates="profile")
def get_full_name(self) -> str: def get_full_name(self) -> str:
full_name = "" full_name: str = ""
if self.first_name is not None: if self.first_name is not None:
full_name += self.first_name full_name += str(self.first_name)
if self.last_name is not None: if self.last_name is not None:
if len(full_name) > 0: if len(full_name) > 0:
full_name += " " full_name += " "
full_name += self.last_name full_name += str(self.last_name)
return full_name return full_name
def __str__(self): def __str__(self):
@@ -33,42 +34,42 @@ class Profile(Base, BaseMixin):
class Token(Base, BaseMixin): class Token(Base, BaseMixin):
__tablename__ = "token" __tablename__ = "token"
token = Column(String, nullable=False, unique=True) token: Mapped[str] = mapped_column(nullable=False, unique=True)
name = Column(String) name: Mapped[str]
last_used_date: Mapped[datetime] = mapped_column() last_used_date: Mapped[datetime]
enabled = Column(Boolean) enabled: Mapped[bool]
profile_id = Column(String, ForeignKey("profile.id"), nullable=False) profile_id = mapped_column(ForeignKey("profile.id"), nullable=False)
profile = relationship("Profile", back_populates="tokens") profile: Mapped[Profile] = relationship(back_populates="tokens")
class Permission(Base, BaseMixin): class Permission(Base, BaseMixin):
__tablename__ = "permission" __tablename__ = "permission"
name = Column(String, nullable=False) name: Mapped[str] = mapped_column(nullable=False)
assignments = relationship("Assignment") assignments: Mapped[List["Assignment"]] = relationship(back_populates="permission")
class Assignment(Base, BaseMixin): class Assignment(Base, BaseMixin):
__tablename__ = "assignment" __tablename__ = "assignment"
profile_id = Column(String, ForeignKey("profile.id"), nullable=False) profile_id = mapped_column(ForeignKey("profile.id"), nullable=False)
profile = relationship("Profile", back_populates="assignments") profile: Mapped[Profile] = relationship(back_populates="assignments")
permission_id = Column(String, ForeignKey("permission.id"), nullable=False) permission_id = mapped_column(ForeignKey("permission.id"), nullable=False)
permission = relationship("Permission", back_populates="assignments") permission: Mapped[Permission] = relationship(back_populates="assignments")
class MailAccount(Base, BaseMixin): class MailAccount(Base, BaseMixin):
__tablename__ = "mail_account" __tablename__ = "mail_account"
host = Column(String) host: Mapped[str]
port = Column(Integer) port: Mapped[int]
protocol = Column(String) protocol: Mapped[str]
user_name = Column(String) user_name: Mapped[str]
password = Column(String) password: Mapped[str]
start_tls = Column(Boolean) start_tls: Mapped[bool]
class Mail(Base, BaseMixin): class Mail(Base, BaseMixin):
__tablename__ = "mail" __tablename__ = "mail"
folder: Mapped[str] = mapped_column() folder: Mapped[str]
subject: Mapped[str] = mapped_column() subject: Mapped[str]
body: Mapped[str] = mapped_column() body: Mapped[str]
sent_date: Mapped[datetime] = mapped_column() sent_date: Mapped[datetime]
received_date: Mapped[datetime] = mapped_column() received_date: Mapped[datetime]
+9 -12
View File
@@ -1,7 +1,8 @@
from typing import Optional
import uuid import uuid
from datetime import datetime from datetime import datetime
from sqlalchemy import func, Column, String, Boolean from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
@@ -10,21 +11,17 @@ class Base(DeclarativeBase):
class BaseMixin: class BaseMixin:
#id = Column(String, primary_key=True, default=uuid.uuid4)
id: Mapped[str] = mapped_column(primary_key=True, default=str(uuid.uuid4())) id: Mapped[str] = mapped_column(primary_key=True, default=str(uuid.uuid4()))
# created_date = Column(DateTime)
created_date: Mapped[datetime] = mapped_column(default=func.now()) created_date: Mapped[datetime] = mapped_column(default=func.now())
# last_modified_date = Column(DateTime)
last_modified_date: Mapped[datetime] = mapped_column(default=func.now()) last_modified_date: Mapped[datetime] = mapped_column(default=func.now())
# version = Column(Integer)
version: Mapped[int] = mapped_column(default=0) version: Mapped[int] = mapped_column(default=0)
class BaseVideoMixin: class BaseVideoMixin:
cloud_link = Column(String, nullable=True) cloud_link: Mapped[Optional[str]]
file_name = Column(String, nullable=True) file_name: Mapped[Optional[str]]
path = Column(String) path: Mapped[str]
review = Column(Boolean) review: Mapped[bool]
title = Column(String) title: Mapped[str]
url = Column(String, nullable=True) url: Mapped[str]
should_download = Column(Boolean) should_download: Mapped[bool]
+67 -67
View File
@@ -1,8 +1,8 @@
import uuid import uuid
from datetime import datetime from datetime import datetime
from typing import AnyStr, Dict, List, Optional, Any from typing import Dict, List, Optional, Any
from natsort import natsorted from natsort import natsorted
from sqlalchemy import Column, ForeignKey, Integer, String, Boolean, func from sqlalchemy import ForeignKey, func
from sqlalchemy.orm import relationship, Mapped, mapped_column from sqlalchemy.orm import relationship, Mapped, mapped_column
from src.db.models.base import Base, BaseMixin from src.db.models.base import Base, BaseMixin
@@ -14,12 +14,12 @@ class Publisher(Base):
created_date: Mapped[datetime] = mapped_column(default=func.now()) created_date: Mapped[datetime] = mapped_column(default=func.now())
last_modified_date: Mapped[datetime] = mapped_column(default=func.now()) last_modified_date: Mapped[datetime] = mapped_column(default=func.now())
version: Mapped[int] = mapped_column(default=0) version: Mapped[int] = mapped_column(default=0)
name = Column(String, unique=True) name: Mapped[str] = mapped_column(unique=True)
weblink = Column(String, nullable=True) weblink: Mapped[Optional[str]]
parent_publisher_id: Mapped[Optional[str]] = mapped_column(ForeignKey('publisher.id')) parent_publisher_id: Mapped[Optional[str]] = mapped_column(ForeignKey('publisher.id'))
parent_publisher: Mapped[Optional['Publisher']] = relationship("Publisher", back_populates="imprints", remote_side=[id]) parent_publisher: Mapped[Optional['Publisher']] = relationship(back_populates="imprints", remote_side=[id])
imprints: Mapped[List['Publisher']] = relationship('Publisher', back_populates="parent_publisher") imprints: Mapped[List['Publisher']] = relationship(back_populates="parent_publisher")
comics = relationship("Comic") comics = relationship(back_populates="publisher")
def __repr__(self): def __repr__(self):
return f'Publisher({self.id} {self.name})' return f'Publisher({self.id} {self.name})'
@@ -30,17 +30,17 @@ class Publisher(Base):
class Comic(Base, BaseMixin): class Comic(Base, BaseMixin):
__tablename__ = 'comic' __tablename__ = 'comic'
title = Column(String, unique=True) title: Mapped[str] = mapped_column(unique=True)
publisher_id = Column(String, ForeignKey('publisher.id'), nullable=False) publisher_id: Mapped[str] = mapped_column(ForeignKey('publisher.id'), nullable=False)
publisher = relationship("Publisher", back_populates="comics") publisher: Mapped[Publisher] = relationship(back_populates="comics")
current_order = Column(Boolean) current_order: Mapped[bool]
completed = Column(Boolean) completed: Mapped[bool]
weblink = Column(String, nullable=True) weblink: Mapped[Optional[str]]
issues = relationship("Issue", order_by="Issue.issue_number") issues: Mapped[List["Issue"]] = relationship(back_populates="comic", order_by="Issue.issue_number")
story_arcs = relationship("StoryArc") story_arcs: Mapped[List["StoryArc"]] = relationship(back_populates="comic")
trade_paperbacks = relationship("TradePaperback") trade_paperbacks: Mapped[List["TradePaperback"]] = relationship(back_populates="comic")
volumes = relationship("Volume") volumes: Mapped[List["Volume"]] = relationship(back_populates="comic")
comic_works = relationship("ComicWork") comic_works: Mapped[List["ComicWork"]] = relationship(back_populates="comic")
def __repr__(self): def __repr__(self):
return f'Comic({self.id} {self.version} {self.title} {self.publisher.name})' return f'Comic({self.id} {self.version} {self.title} {self.publisher.name})'
@@ -66,46 +66,46 @@ class Comic(Base, BaseMixin):
class Volume(Base, BaseMixin): class Volume(Base, BaseMixin):
__tablename__ = "volume" __tablename__ = "volume"
name = Column(String, nullable=False) name: Mapped[str] = mapped_column(nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False) comic_id: Mapped[str] = mapped_column(ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="volumes") comic: Mapped[Comic] = relationship(back_populates="volumes")
story_arcs = relationship("StoryArc") story_arcs: Mapped[List["StoryArc"]] = relationship(back_populates="volume")
issues = relationship("Issue") issues: Mapped[List["Issue"]] = relationship(back_populates="volume")
class TradePaperback(Base, BaseMixin): class TradePaperback(Base, BaseMixin):
__tablename__ = "trade_paperback" __tablename__ = "trade_paperback"
name = Column(String, nullable=False) name: Mapped[str] = mapped_column(nullable=False)
issue_start = Column(Integer) issue_start: Mapped[int]
issue_end = Column(Integer) issue_end: Mapped[int]
comic_id = Column(String, ForeignKey("comic.id"), nullable=False) comic_id: Mapped[str] = mapped_column(ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="trade_paperbacks") comic: Mapped[Comic] = relationship(back_populates="trade_paperbacks")
class StoryArc(Base, BaseMixin): class StoryArc(Base, BaseMixin):
__tablename__ = "story_arc" __tablename__ = "story_arc"
name = Column(String, nullable=False) name: Mapped[str] = mapped_column(nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False) comic_id: Mapped[str] = mapped_column(ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="story_arcs") comic: Mapped[Comic] = relationship(back_populates="story_arcs")
volume_id = Column(String, ForeignKey("volume.id"), nullable=True) volume_id: Mapped[str] = mapped_column(ForeignKey("volume.id"), nullable=True)
volume = relationship("Volume", back_populates="story_arcs") volume: Mapped[Volume] = relationship(back_populates="story_arcs")
issues = relationship("Issue") issues: Mapped[List["Issue"]] = relationship(back_populates="story_arc")
class Issue(Base, BaseMixin): class Issue(Base, BaseMixin):
__tablename__ = "issue" __tablename__ = "issue"
issue_number = Column(String) issue_number: Mapped[str]
title = Column(String, nullable=True) title: Mapped[Optional[str]]
published_on: Mapped[datetime] = mapped_column(nullable=True) published_on: Mapped[Optional[datetime]]
in_stock = Column(Boolean) in_stock: Mapped[bool]
is_read = Column(Boolean) is_read: Mapped[bool]
comic_id = Column(String, ForeignKey("comic.id"), nullable=False) comic_id: Mapped[str] = mapped_column(ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="issues") comic: Mapped[Comic] = relationship(back_populates="issues")
volume_id = Column(String, ForeignKey("volume.id"), nullable=True) volume_id: Mapped[str] = mapped_column(ForeignKey("volume.id"), nullable=True)
volume = relationship("Volume", back_populates="issues") volume: Mapped[Volume] = relationship(back_populates="issues")
story_arc_id = Column(String, ForeignKey("story_arc.id"), nullable=True) story_arc_id: Mapped[str] = mapped_column(ForeignKey("story_arc.id"), nullable=True)
story_arc = relationship("StoryArc", back_populates="issues") story_arc: Mapped[StoryArc] = relationship(back_populates="issues")
issue_works = relationship("IssueWork") issue_works: Mapped[List["IssueWork"]] = relationship(back_populates="issue")
def get_full_title(self) -> str: def get_full_title(self) -> str:
full_title: str = str(self.issue_number) full_title: str = str(self.issue_number)
@@ -127,10 +127,10 @@ class Issue(Base, BaseMixin):
class Artist(Base, BaseMixin): class Artist(Base, BaseMixin):
__tablename__ = "artist" __tablename__ = "artist"
name = Column(String, nullable=False) name: Mapped[str] = mapped_column(nullable=False)
weblink = Column(String, nullable=True) weblink: Mapped[str] = mapped_column(nullable=True)
comic_works = relationship("ComicWork") comic_works: Mapped[List["ComicWork"]] = relationship(back_populates="artist")
issue_works = relationship("IssueWork") issue_works: Mapped[List["IssueWork"]] = relationship(back_populates="artist")
def get_comics(self) -> Dict[Any, List[Comic]]: def get_comics(self) -> Dict[Any, List[Comic]]:
works: Dict[Any, List[Comic]] = {} works: Dict[Any, List[Comic]] = {}
@@ -146,12 +146,12 @@ class Artist(Base, BaseMixin):
class WorkType(Base, BaseMixin): class WorkType(Base, BaseMixin):
__tablename__ = "worktype" __tablename__ = "worktype"
name = Column(String, nullable=False, unique=True) name: Mapped[str] = mapped_column(nullable=False, unique=True)
comic_works = relationship("ComicWork") comic_works: Mapped[List["ComicWork"]] = relationship(back_populates="work_type")
issue_works = relationship("IssueWork") issue_works: Mapped[List["IssueWork"]] = relationship(back_populates="work_type")
def get_artists(self) -> Dict[str, List[str]]: def get_artists(self) -> Dict[str, List[Artist]]:
works: Dict[str, List[str]] = {} works: Dict[str, List[Artist]] = {}
for work in self.comic_works: for work in self.comic_works:
comic = work.comic.title comic = work.comic.title
artist = work.artist artist = work.artist
@@ -170,19 +170,19 @@ class WorkType(Base, BaseMixin):
class ComicWork(Base, BaseMixin): class ComicWork(Base, BaseMixin):
__tablename__ = "comic_work" __tablename__ = "comic_work"
comic_id = Column(String, ForeignKey("comic.id"), nullable=False) comic_id: Mapped[str] = mapped_column(ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="comic_works") comic: Mapped[Comic] = relationship(back_populates="comic_works")
artist_id = Column(String, ForeignKey("artist.id"), nullable=False) artist_id: Mapped[str] = mapped_column(ForeignKey("artist.id"), nullable=False)
artist = relationship("Artist", back_populates="comic_works") artist: Mapped[Artist] = relationship(back_populates="comic_works")
work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False) work_type_id: Mapped[str] = mapped_column(ForeignKey("worktype.id"), nullable=False)
work_type = relationship("WorkType", back_populates="comic_works") work_type: Mapped[WorkType] = relationship(back_populates="comic_works")
class IssueWork(Base, BaseMixin): class IssueWork(Base, BaseMixin):
__tablename__ = "issue_work" __tablename__ = "issue_work"
issue_id = Column(String, ForeignKey("issue.id"), nullable=False) issue_id: Mapped[str] = mapped_column(ForeignKey("issue.id"), nullable=False)
issue = relationship("Issue", back_populates="issue_works") issue: Mapped[Issue] = relationship(back_populates="issue_works")
artist_id = Column(String, ForeignKey("artist.id"), nullable=False) artist_id: Mapped[str] = mapped_column(ForeignKey("artist.id"), nullable=False)
artist = relationship("Artist", back_populates="issue_works") artist: Mapped[Artist] = relationship(back_populates="issue_works")
work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False) work_type_id: Mapped[str] = mapped_column(ForeignKey("worktype.id"), nullable=False)
work_type = relationship("WorkType", back_populates="issue_works") work_type: Mapped[WorkType] = relationship(back_populates="issue_works")
-395
View File
@@ -1,395 +0,0 @@
import json
import logging
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from typing import Any, List
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from src.db.models.tysc import Card, CardSet, Rooster, Team, FieldPosition, Player, Vendor, Sport
from src.db.models.comic import Issue, TradePaperback, StoryArc, Volume, ComicWork, Artist, Comic, Publisher, WorkType
from src.db.models.bookshelf import ArticleAuthor, BookAuthor, BookshelfPublisher, Article, Book, Author
from src.db.models.admin import Mail, MailAccount, ModuleData, Token, Assignment, Permission, Profile
from src.db.models.metadata import MetaDataTable, MetaDataColumn
from src.db.models.media import MediaVideo, MediaArticle, MediaFile, MediaActor, MediaActorFile
class ColumnEntry(Enum):
COLUMN_NAME = 'column'
COLUMN_LABEL = 'label'
COLUMN_ORDER = 'order'
COLUMN_REF_COLUMN = 'ref_column'
COLUMN_TYPE = 'type'
COLUMN_WIDGET = 'widget'
class StatusType(Enum):
UNKNOWN = auto()
FILE_NAME = auto()
FILE_ID = auto()
DUPLICATE = auto()
CLOUD_LINK = auto()
CLOUD_LINK_ID = auto()
class ExportType(Enum):
JSON = "JSON"
YAML = "YAML"
SQLITE = "SQLite"
class KontorDB:
def __init__(self, db_engine: Any):
self.engine = db_engine
self.registry = {}
self.init_registry()
def init_registry(self):
self.registry[Card.__tablename__] = Card
self.registry[CardSet.__tablename__] = CardSet
self.registry[Rooster.__tablename__] = Rooster
self.registry[Team.__tablename__] = Team
self.registry[FieldPosition.__tablename__] = FieldPosition
self.registry[Player.__tablename__] = Player
self.registry[Vendor.__tablename__] = Vendor
self.registry[Sport.__tablename__] = Sport
self.registry[Issue.__tablename__] = Issue
self.registry[TradePaperback.__tablename__] = TradePaperback
self.registry[StoryArc.__tablename__] = StoryArc
self.registry[Volume.__tablename__] = Volume
self.registry[ComicWork.__tablename__] = ComicWork
self.registry[Artist.__tablename__] = Artist
self.registry[Comic.__tablename__] = Comic
self.registry[Publisher.__tablename__] = Publisher
self.registry[WorkType.__tablename__] = WorkType
self.registry[ArticleAuthor.__tablename__] = ArticleAuthor
self.registry[BookAuthor.__tablename__] = BookAuthor
self.registry[BookshelfPublisher.__tablename__] = BookshelfPublisher
self.registry[Article.__tablename__] = Article
self.registry[Book.__tablename__] = Book
self.registry[Author.__tablename__] = Author
self.registry[MediaFile.__tablename__] = MediaFile
self.registry[MediaActor.__tablename__] = MediaActor
self.registry[MediaActorFile.__tablename__] = MediaActorFile
self.registry[MediaArticle.__tablename__] = MediaArticle
self.registry[MediaVideo.__tablename__] = MediaVideo
self.registry[MetaDataColumn.__tablename__] = MetaDataColumn
self.registry[MetaDataTable.__tablename__] = MetaDataTable
self.registry[Assignment.__tablename__] = Assignment
self.registry[Token.__tablename__] = Token
self.registry[Profile.__tablename__] = Profile
self.registry[Permission.__tablename__] = Permission
self.registry[ModuleData.__tablename__] = ModuleData
self.registry[MailAccount.__tablename__] = MailAccount
self.registry[Mail.__tablename__] = Mail
def get_table_names(self) -> list:
result = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
tables = session.scalars(select(MetaDataTable)).all()
result = [table.table_name for table in tables]
return result
def get_table_by_name(self, table_name: str) -> dict:
result = {}
__session__ = sessionmaker(self.engine)
_filter = {'table_name': table_name}
with __session__() as session:
table = session.query(MetaDataTable).filter_by(**_filter).one()
result['id'] = table.id
result['table_name'] = table.table_name
return result
def get_column_meta_data(self, table_name: str, view_only=True) -> dict:
meta_data = {}
order = 0
__session__ = sessionmaker(self.engine)
columns = list()
table_info = self.get_table_by_name(table_name)
_filters = {'table_id': table_info['id']}
if view_only:
_filters['is_shown'] = True
with __session__() as session:
columns = session.query(MetaDataColumn).filter_by(**_filters).all()
for column in columns:
# self.log.info("get_column_meta_data: %s %s %d", column.column_name, column.column_label, column.column_order)
meta_data[order] = {
ColumnEntry.COLUMN_NAME: column.column_name,
ColumnEntry.COLUMN_LABEL: column.column_label,
ColumnEntry.COLUMN_ORDER: column.column_order,
ColumnEntry.COLUMN_REF_COLUMN: column.ref_column,
ColumnEntry.COLUMN_TYPE: column.column_type
}
order += 1
return meta_data
def get_columns(self, table_name: str) -> dict:
columns = {}
__session__ = sessionmaker(self.engine)
table_info = self.get_table_by_name(table_name)
_filters = {'table_id': table_info['id']}
with __session__() as session:
for column in session.query(MetaDataColumn).filter_by(**_filters).all():
columns[column.column_name] = {
ColumnEntry.COLUMN_ORDER: column.column_order,
ColumnEntry.COLUMN_TYPE: column.column_type
}
return columns
def get_filters(self, table_name: str) -> dict:
_filter_map = {}
__session__ = sessionmaker(self.engine)
table_info = self.get_table_by_name(table_name)
_filters = {'table_id': table_info['id'], 'show_filter': True}
with __session__() as session:
for column in session.query(MetaDataColumn).filter_by(**_filters).all():
_filter_map[column.column_name] = {
ColumnEntry.COLUMN_LABEL: column.filter_label,
ColumnEntry.COLUMN_WIDGET: None
}
return _filter_map
def data(self, table_name: str, columns: dict, filters: dict) -> list:
data = []
__session__ = sessionmaker(self.engine)
table = self.registry[table_name]
with __session__() as session:
entries = []
if len(filters) == 0:
entries = session.scalars(select(table)).all()
else:
entries = session.scalars(select(table).filter_by(**filters)).all()
for entry in entries:
# self.log.info("data: %s", entry)
row = []
for order in columns.keys():
column_name = columns[order][ColumnEntry.COLUMN_NAME]
ref_column = columns[order][ColumnEntry.COLUMN_REF_COLUMN]
if str(column_name).endswith("_id"):
ref_table = column_name[:-3]
ref = getattr(entry, ref_table)
value = getattr(ref, ref_column)
row.append(value)
else:
row.append(getattr(entry, column_name))
data.append(row)
# self.log.info("data: %s", data)
return data
def export_db(self, export_type: str, export_file_name: str) -> dict:
results = {}
db = {}
export_table_list = self.get_table_names()
for table in export_table_list:
columns = self.get_column_meta_data(table, view_only=False)
if table in self.registry:
model = self.registry[table]
else:
logging.info(f"table {table} is not registered")
continue
__session__ = sessionmaker(self.engine)
with __session__() as session:
rows = session.query(model).all()
entries = []
for row in rows:
# print(row)
entry = {}
for order in columns:
# print(columns[order])
column_name = columns[order][ColumnEntry.COLUMN_NAME]
# print(f"get value {column_name} from {row} of table {table}")
try:
value = getattr(row, column_name)
if isinstance(value, datetime):
entry[column_name] = str(value)
else:
entry[column_name] = value
except AttributeError:
pass
entries.append(entry)
db[table] = entries
results[table] = len(entries)
match export_type:
case "JSON":
json_dump = json.dumps(db, indent=4)
with open(export_file_name, "w") as dump_file:
dump_file.write(json_dump)
case "YAML":
pass
case "SQLite":
pass
logging.info(f"{len(results)} tables exported")
return results
def import_db(self, import_file_name: str) -> dict:
result = {}
import_file = Path(import_file_name)
if not import_file.exists():
logging.info(f"File {import_file_name} does not exist. Do nothing.")
return result
match import_file.suffix:
case '.json':
print("read json file")
with open(import_file_name, 'r') as json_file:
json_load = json.load(json_file)
for table in json_load:
logging.info(f"{table}: {len(json_load[table])}")
result[table] = self.import_table(table, json_load[table])
case '.yml':
print("read yaml file")
case '.yaml':
print("read yaml file")
case '.db':
print("read sqlite file")
return result
def import_table(self, table_name: str, items:list) -> dict:
result = {}
updated = []
added = []
remaining = []
existing_ids = self.get_ids(table_name)
logging.info(f"found {len(existing_ids)} existing ids for table {table_name}")
for item in items:
current_id = item['id']
# print(f"import item: {item}")
found_item = None
__session__ = sessionmaker(self.engine)
with __session__() as session:
found_item = session.get(self.registry[table_name], current_id)
# print(f"found item: {found_item}")
if found_item is not None:
changed = self.update_entry(table_name, current_id, item)
updated.append(item)
if changed:
logging.info(f"{current_id} has changed")
updated.append(item)
existing_ids.remove(current_id)
else:
try:
self.add_entry(table_name, item)
added.append(item)
except IntegrityError as error:
logging.info(f"Could not add item, due to: {error.detail}")
if len(existing_ids) > 0:
print(f"remaining items for {table_name}: {existing_ids}")
remaining.extend(existing_ids)
result['updated'] = updated
result['added'] = added
result['remaining'] = remaining
return result
def get_ids(self, table_name: str) -> list:
existing_ids = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(self.registry[table_name]).all()
for item in items:
existing_ids.append(getattr(item, 'id'))
return existing_ids
def add_entry(self, table_name: str, update_item: dict):
logging.debug(f"add entry to table {table_name} with {update_item}")
__session__ = sessionmaker(self.engine)
with __session__() as session:
add_item = self.registry[table_name]()
for key in update_item.keys():
update_value = update_item[key]
setattr(add_item, key, update_value)
session.add(add_item)
session.commit()
def update_entry(self, table_name, current_id, update_item: dict) -> bool:
# self.log.info("update entry to table %s", table_name)
__session__ = sessionmaker(self.engine)
with __session__() as session:
existing_item = session.query(self.registry[table_name]).get(current_id)
changed = False
for key in update_item.keys():
update_value = update_item[key]
existing_value = getattr(existing_item, key)
if type(existing_value) is not type(update_value):
existing_value = str(existing_value)
if existing_value != update_value:
logging.info(f"{key} has changed: {existing_value} != {update_value}")
setattr(existing_item, key, update_value)
session.commit()
changed = True
logging.info(f"update {key} with {update_value}")
return changed
def add_link(self, link: str) -> dict:
result = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
media_file = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.url = link
media_file.review = 1
media_file.should_download = 1
try:
session.add(media_file)
session.commit()
result['added'] = {'url': media_file.url, 'title': media_file.title, 'review': media_file.review, 'download': media_file.should_download}
except IntegrityError as error:
session.rollback()
result['error'] = error.orig
return result
def update_titles(self) -> dict:
update_list = {}
__session__ = sessionmaker(self.engine)
_filter = { 'review': True}
with __session__() as session:
links = session.query(MediaFile).filter_by(**_filter).all()
self.log.info("%d entries found for updating titles", len(links))
for link in links:
url = link.url
if url is None:
continue
link.update_title()
session.commit()
update_list[link.id] = link.title
return update_list
def get_download_list(self) -> List[str]:
download_list = []
__session__ = sessionmaker(self.engine)
_filter = { 'should_download': True}
with __session__() as session:
links = session.query(MediaFile).filter_by(**_filter).all()
for link in links:
url = link.url
if url is None:
continue
download_list.append(link.id)
return download_list
def download_file(self, entry_id: str, download_dir = "/data/media", dl_tool = "yt-dlp") -> str:
__session__ = sessionmaker(self.engine)
with __session__() as session:
link = session.query(MediaFile).get(entry_id)
link.download_file(download_dir, dl_tool)
session.commit()
file_name = link.file_name
return file_name
def delete_entries(self):
for (table_name, table) in self.registry.items():
# self.log.info("delete entries from table %s", table_name)
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(table).all()
for item in items:
session.delete(item)
session.commit()
def check_files(self):
pass
+20 -64
View File
@@ -3,18 +3,19 @@ import re
import subprocess import subprocess
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import List, Optional
import requests import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from sqlalchemy import Column, String, ForeignKey, Boolean from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship from sqlalchemy.orm import Mapped, relationship, mapped_column
from src.db.models.base import Base, BaseMixin, BaseVideoMixin from src.db.models.base import Base, BaseMixin, BaseVideoMixin
class MediaFile(Base, BaseMixin, BaseVideoMixin): class MediaFile(Base, BaseMixin, BaseVideoMixin):
__tablename__ = 'media_file' __tablename__ = 'media_file'
media_actor_files = relationship("MediaActorFile") media_actor_files: Mapped[List["MediaActorFile"]] = relationship(back_populates="media_file")
def __repr__(self): def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.title})' return f'MediaFile({self.id} {self.title} {self.title})'
@@ -22,56 +23,11 @@ class MediaFile(Base, BaseMixin, BaseVideoMixin):
def __str__(self): def __str__(self):
return f'{self.title}({self.id})' return f'{self.title}({self.id})'
def update_title(self) -> None:
logging.info(f"update title for {self.url}")
try:
r = requests.get(str(self.url))
soup = BeautifulSoup(r.content, "html.parser")
title = soup.title.get_text() # type: ignore
self.title = title
self.review = False
except:
self.title = None
self.review = True
self.last_modified_date = datetime.now()
def download_file(self, download_dir: str, dl_tool: str):
logging.info(f"download file for {self.url} to {download_dir}")
result = subprocess.run([dl_tool, self.url], cwd=download_dir, capture_output=True, text=True) # type: ignore
if result.returncode == 0:
output = result.stdout
output = re.sub(' +', ' ', output)
lines_list = output.splitlines()
file_name = self.__parse_output__(lines_list)
if file_name is None:
self.review = True
self.should_download = True
self.file_name = None
else:
download_file = Path(file_name)
self.should_download = False
self.file_name = download_file.name
self.cloud_link = str(download_file.absolute())
self.last_modified_date = datetime.now()
def __parse_output__(self, lines_list):
self.file_name = None
for line in lines_list:
if 'has already been downloaded' in line:
end_len = len(' has already been downloaded')
self.file_name = line[11:-end_len]
if 'Destination' in line:
line_len = len(line)
start_len = len('[download] Destination: ')
file_len = line_len - start_len
self.file_name = line[-file_len:]
return self.file_name
class MediaActor(Base, BaseMixin): class MediaActor(Base, BaseMixin):
__tablename__ = 'media_actor' __tablename__ = 'media_actor'
name = Column(String) name: Mapped[str]
url = Column(String, unique=True, nullable=True) url: Mapped[Optional[str]] = mapped_column(unique=True)
media_actor_files = relationship("MediaActorFile") media_actor_files = relationship("MediaActorFile")
def __repr__(self) -> str: def __repr__(self) -> str:
@@ -83,10 +39,10 @@ class MediaActor(Base, BaseMixin):
class MediaActorFile(Base, BaseMixin): class MediaActorFile(Base, BaseMixin):
__tablename__ = 'media_actor_file' __tablename__ = 'media_actor_file'
media_actor_id = Column(String, ForeignKey("media_actor.id"), nullable=False) media_actor_id: Mapped[str] = mapped_column(ForeignKey("media_actor.id"), nullable=False)
media_actor = relationship("MediaActor", back_populates="media_actor_files") media_actor: Mapped[MediaActor] = relationship(back_populates="media_actor_files")
media_file_id = Column(String, ForeignKey("media_file.id"), nullable=True) media_file_id: Mapped[str] = mapped_column(ForeignKey("media_file.id"), nullable=True)
media_file = relationship("MediaFile", back_populates="media_actor_files") media_file: Mapped[MediaFile] = relationship(back_populates="media_actor_files")
def __repr__(self): def __repr__(self):
return f'MediaActorFile({self.id} {self.media_actor_id} {self.media_file_id})' return f'MediaActorFile({self.id} {self.media_actor_id} {self.media_file_id})'
@@ -96,20 +52,20 @@ class MediaActorFile(Base, BaseMixin):
class MediaArticle(Base, BaseMixin): class MediaArticle(Base, BaseMixin):
__tablename__ = 'media_article' __tablename__ = 'media_article'
review = Column(Boolean) review: Mapped[bool]
title = Column(String) title: Mapped[str]
url = Column(String, unique=True) url: Mapped[str] = mapped_column(unique=True)
class MediaVideo(Base, BaseMixin): class MediaVideo(Base, BaseMixin):
__tablename__ = 'media_video' __tablename__ = 'media_video'
cloud_link = Column(String) cloud_link: Mapped[str]
file_name = Column(String) file_name: Mapped[str]
path = Column(String) path: Mapped[str]
review = Column(Boolean) review: Mapped[bool]
title = Column(String) title: Mapped[str]
url = Column(String, unique=True) url: Mapped[str] = mapped_column(unique=True)
should_download = Column(Boolean) should_download: Mapped[bool]
def __repr__(self): def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.url})' return f'MediaFile({self.id} {self.title} {self.url})'
+44 -43
View File
@@ -1,5 +1,6 @@
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Boolean from typing import List
from sqlalchemy.orm import relationship from sqlalchemy import ForeignKey, UniqueConstraint
from sqlalchemy.orm import relationship, mapped_column, Mapped
from src.db.models.base import Base, BaseMixin from src.db.models.base import Base, BaseMixin
@@ -9,18 +10,18 @@ class Sport(Base, BaseMixin):
__table_args__ = ( __table_args__ = (
UniqueConstraint("name"), UniqueConstraint("name"),
) )
name = Column(String, nullable=False, index=True, unique=True) name: Mapped[str] = mapped_column(nullable=False, index=True, unique=True)
teams = relationship("Team") teams: Mapped[List["Team"]] = relationship(back_populates="sport")
positions = relationship("FieldPosition") positions: Mapped[List["FieldPosition"]] = relationship(back_populates="sport")
class Team(Base, BaseMixin): class Team(Base, BaseMixin):
__tablename__ = "team" __tablename__ = "team"
name = Column(String, nullable=False, index=True, unique=True) name: Mapped[str] = mapped_column(nullable=False, index=True, unique=True)
short_name = Column(String, nullable=False, ) short_name: Mapped[str] = mapped_column(nullable=False)
sport_id = Column(String, ForeignKey("sport.id"), nullable=False) sport_id: Mapped[str] = mapped_column(ForeignKey("sport.id"), nullable=False)
sport = relationship("Sport", back_populates="teams") sport: Mapped[Sport] = relationship(back_populates="teams")
roosters = relationship("Rooster") roosters: Mapped[List["Rooster"]] = relationship(back_populates="team")
class FieldPosition(Base, BaseMixin): class FieldPosition(Base, BaseMixin):
@@ -29,11 +30,11 @@ class FieldPosition(Base, BaseMixin):
UniqueConstraint("name", "sport_id"), UniqueConstraint("name", "sport_id"),
UniqueConstraint("short_name", "sport_id"), UniqueConstraint("short_name", "sport_id"),
) )
name = Column(String, nullable=False, index=True) name: Mapped[str] = mapped_column(nullable=False, index=True)
short_name = Column(String, nullable=False) short_name: Mapped[str] = mapped_column(nullable=False)
sport_id = Column(String, ForeignKey("sport.id"), nullable=False, index=True) sport_id: Mapped[str] = mapped_column(ForeignKey("sport.id"), nullable=False, index=True)
sport = relationship("Sport", back_populates="positions") sport: Mapped[Sport] = relationship(back_populates="positions")
roosters = relationship("Rooster") roosters: Mapped[List["Rooster"]] = relationship(back_populates="position")
class Player(Base, BaseMixin): class Player(Base, BaseMixin):
@@ -41,9 +42,9 @@ class Player(Base, BaseMixin):
__table_args__ = ( __table_args__ = (
UniqueConstraint("first_name", "last_name"), UniqueConstraint("first_name", "last_name"),
) )
first_name = Column(String, nullable=False, index=True) first_name: Mapped[str] = mapped_column(nullable=False, index=True)
last_name = Column(String, nullable=False, index=True) last_name: Mapped[str] = mapped_column(nullable=False, index=True)
roosters = relationship("Rooster") roosters: Mapped[List["Rooster"]] = relationship(back_populates="player")
def get_full_name(self) -> str: def get_full_name(self) -> str:
return f"{self.last_name}, {self.first_name}" return f"{self.last_name}, {self.first_name}"
@@ -54,21 +55,21 @@ class Rooster(Base, BaseMixin):
__table_args__ = ( __table_args__ = (
UniqueConstraint("year", "team_id", "player_id", "position_id"), UniqueConstraint("year", "team_id", "player_id", "position_id"),
) )
year = Column(Integer) yea: Mapped[int]
team_id = Column(String, ForeignKey("team.id"), nullable=False, index=True) team_id: Mapped[str] = mapped_column(ForeignKey("team.id"), nullable=False, index=True)
team = relationship("Team", back_populates="roosters") team: Mapped[Team] = relationship(back_populates="roosters")
player_id = Column(String, ForeignKey("player.id"), nullable=False, index=True) player_id: Mapped[str] = mapped_column(ForeignKey("player.id"), nullable=False, index=True)
player = relationship("Player", back_populates="roosters") player: Mapped[Player] = relationship(back_populates="roosters")
position_id = Column(String, ForeignKey("field_position.id"), nullable=False, index=True) position_id: Mapped[str] = mapped_column(ForeignKey("field_position.id"), nullable=False, index=True)
position = relationship("FieldPosition", back_populates="roosters") position: Mapped[FieldPosition] = relationship(back_populates="roosters")
cards = relationship("Card") cards: Mapped[List["Card"]] = relationship(back_populates="rooster")
class Vendor(Base, BaseMixin): class Vendor(Base, BaseMixin):
__tablename__ = "vendor" __tablename__ = "vendor"
name = Column(String, nullable=False, unique=True, index=True) name: Mapped[str] = mapped_column(nullable=False, unique=True, index=True)
card_sets = relationship("CardSet") card_sets: Mapped[List["CardSet"]] = relationship(back_populates="vendor")
cards = relationship("Card") cards: Mapped[List["Card"]] = relationship(back_populates="vendor")
class CardSet(Base, BaseMixin): class CardSet(Base, BaseMixin):
@@ -76,12 +77,12 @@ class CardSet(Base, BaseMixin):
__table_args__ = ( __table_args__ = (
UniqueConstraint("name", "vendor_id"), UniqueConstraint("name", "vendor_id"),
) )
name = Column(String, index=True) name: Mapped[str] = mapped_column(index=True)
parallel_set = Column(Boolean) parallel_set: Mapped[bool]
insert_set = Column(Boolean) insert_set: Mapped[bool]
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False, index=True) vendor_id: Mapped[str] = mapped_column(ForeignKey("vendor.id"), nullable=False, index=True)
vendor = relationship("Vendor", back_populates="card_sets") vendor: Mapped[Vendor] = relationship(back_populates="card_sets")
cards = relationship("Card") cards: Mapped[List["Card"]] = relationship(back_populates="card_set")
class Card(Base, BaseMixin): class Card(Base, BaseMixin):
@@ -89,11 +90,11 @@ class Card(Base, BaseMixin):
__table_args__ = ( __table_args__ = (
UniqueConstraint("card_number", "year", "vendor_id", "card_set_id"), UniqueConstraint("card_number", "year", "vendor_id", "card_set_id"),
) )
card_number = Column(Integer, index=True) card_number: Mapped[int] = mapped_column(index=True)
year = Column(Integer, index=True) year: Mapped[int] = mapped_column(index=True)
card_set_id = Column(String, ForeignKey("card_set.id"), nullable=False) card_set_id: Mapped[str] = mapped_column(ForeignKey("card_set.id"), nullable=False)
card_set = relationship("CardSet", back_populates="cards") card_set: Mapped[CardSet] = relationship(back_populates="cards")
rooster_id = Column(String, ForeignKey("rooster.id"), nullable=False) rooster_id: Mapped[str] = mapped_column(ForeignKey("rooster.id"), nullable=False)
rooster = relationship("Rooster", back_populates="cards") rooster: Mapped[Rooster] = relationship(back_populates="cards")
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False) vendor_id: Mapped[str] = mapped_column(ForeignKey("vendor.id"), nullable=False)
vendor = relationship("Vendor", back_populates="cards") vendor: Mapped[Vendor] = relationship(back_populates="cards")
@@ -36,11 +36,12 @@ def get_artist_details(artist: Artist) -> ArtistDetailResponse:
) )
return response return response
def update_artist(add_artist: AddArtist, artist_id: str, db: Session) -> Artist: def update_artist(add_artist: AddArtist, artist_id: str, db: Session) -> Optional[Artist]:
logger.info("update artist") logger.info("update artist")
artist: Optional[Artist] = db.get(Artist, artist_id) artist: Optional[Artist] = db.get(Artist, artist_id)
artist.name = add_artist.name if artist is not None:
db.add(artist) artist.name = add_artist.name
db.commit() db.add(artist)
db.refresh(artist) db.commit()
db.refresh(artist)
return artist return artist
+5 -5
View File
@@ -1,4 +1,4 @@
from typing import Dict, List from typing import Dict, List, Optional
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@@ -31,10 +31,10 @@ def get_issue_details(issue: Issue) -> IssueDetailsResponse:
return response return response
def update_comic(comic: ComicSchema, comic_id: str, db: Session) -> type[Comic] | None: def update_comic(new_comic: ComicSchema, comic_id: str, db: Session) -> Optional[Comic]:
logger.info(f"update_comic: {comic} with {comic_id}") logger.info(f"update_comic: {new_comic} with {comic_id}")
comic = db.get(Comic, comic_id) # type: ignore comic: Optional[Comic] = db.get(Comic, comic_id)
return comic # type: ignore return comic
def get_short_info(comic: Comic) -> ComicResponse: def get_short_info(comic: Comic) -> ComicResponse:
response = ComicResponse( response = ComicResponse(
@@ -1,6 +1,6 @@
import uuid import uuid
from datetime import datetime from datetime import datetime
from typing import AnyStr from typing import AnyStr, Optional
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@@ -22,11 +22,12 @@ def create_new_worktype(work: AddWorkType, db: Session) -> WorkType:
return worktype return worktype
def update_worktype(work: AddWorkType, worktype_id: AnyStr, db: Session) -> WorkType: def update_worktype(work: AddWorkType, worktype_id: AnyStr, db: Session) -> Optional[WorkType]:
logger.info("update worktype") logger.info("update worktype")
worktype = db.get(WorkType, worktype_id) worktype: Optional[WorkType] = db.get(WorkType, worktype_id)
worktype.name = work.worktype if worktype is not None:
db.add(worktype) worktype.name = work.worktype
db.commit() db.add(worktype)
db.refresh(worktype) db.commit()
db.refresh(worktype)
return worktype return worktype
+27 -8
View File
@@ -3,7 +3,8 @@ import uuid
from datetime import datetime from datetime import datetime
from src.core.log_conf import logger from src.core.log_conf import logger
from src.db.models.media import MediaActor, MediaActorFile, MediaFile, MediaVideo from src.db.models.media import MediaActor, MediaActorFile, MediaFile, MediaVideo
from src.schema.media.actor import Actor from src.schema.media.actor import MediaActorModel, MediaActorResponse
from src.schema.media.actorfile import MediaActorFileResponse
from src.webapps.media.forms import AddLinkForm from src.webapps.media.forms import AddLinkForm
@@ -44,12 +45,13 @@ def delete_mediafile(db: Session, media_file_id: str):
db.delete(media_file) db.delete(media_file)
db.commit() db.commit()
def create_new_mediaactor(new_actor: Actor, db: Session) -> MediaActor: def create_new_mediaactor(new_actor: MediaActorModel, db: Session) -> MediaActor:
logger.info(f"create MediaActor with url {new_actor.url}") logger.info(f"create MediaActor with url {new_actor.url}")
media_actor: MediaActor = MediaActor() media_actor: MediaActor = MediaActor()
media_actor.id = str(uuid.uuid4()) media_actor.id = str(uuid.uuid4())
media_actor.name = str(new_actor.name) # type: ignore if new_actor.name is not None:
media_actor.url = str(new_actor.url) # type: ignore media_actor.name = new_actor.name
media_actor.url = new_actor.url
media_actor.created_date = datetime.now() media_actor.created_date = datetime.now()
media_actor.last_modified_date = datetime.now() media_actor.last_modified_date = datetime.now()
media_actor.version = 0 media_actor.version = 0
@@ -62,9 +64,18 @@ def create_new_mediaactor(new_actor: Actor, db: Session) -> MediaActor:
def delete_mediaactor(db: Session, actor_id: str): def delete_mediaactor(db: Session, actor_id: str):
logger.info(f"delete MediaActor with id {actor_id}") logger.info(f"delete MediaActor with id {actor_id}")
media_actor = db.get(MediaActor, actor_id) media_actor = db.get(MediaActor, actor_id)
if media_actor is not None:
actor_files = media_actor.media_actor_files
for actor_file in actor_files:
delete_mediaactorfile(db, actorfile_id=actor_file.id)
db.refresh(media_actor)
db.delete(media_actor) db.delete(media_actor)
db.commit() db.commit()
def get_actor_details(media_actor: MediaActor) -> MediaActorResponse:
reponse: MediaActorResponse = MediaActorResponse(id=media_actor.id, name=str(media_actor.name), url=str(media_actor.url))
return reponse
def create_new_mediaactorfile(db: Session, actor_id: str, file_id: str) -> MediaActorFile: def create_new_mediaactorfile(db: Session, actor_id: str, file_id: str) -> MediaActorFile:
logger.info(f"create MediaActorFile with actor {actor_id} and file {file_id}") logger.info(f"create MediaActorFile with actor {actor_id} and file {file_id}")
media_actor_file: MediaActorFile = MediaActorFile() media_actor_file: MediaActorFile = MediaActorFile()
@@ -72,8 +83,8 @@ def create_new_mediaactorfile(db: Session, actor_id: str, file_id: str) -> Media
media_actor_file.created_date = datetime.now() media_actor_file.created_date = datetime.now()
media_actor_file.last_modified_date = datetime.now() media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = 0 media_actor_file.version = 0
media_actor_file.media_actor_id = actor_id # type: ignore media_actor_file.media_actor_id = actor_id
media_actor_file.media_file_id = file_id # type: ignore media_actor_file.media_file_id = file_id
db.add(media_actor_file) db.add(media_actor_file)
db.commit() db.commit()
db.refresh(media_actor_file) db.refresh(media_actor_file)
@@ -83,4 +94,12 @@ def delete_mediaactorfile(db: Session, actorfile_id: str):
logger.info(f"delete MediaActorFile with id {actorfile_id}") logger.info(f"delete MediaActorFile with id {actorfile_id}")
media_actorfile = db.get(MediaActorFile, actorfile_id) media_actorfile = db.get(MediaActorFile, actorfile_id)
db.delete(media_actorfile) db.delete(media_actorfile)
db.commit() db.commit()
def get_actorfile_details(media_actorfile: MediaActorFile) -> MediaActorFileResponse:
response: MediaActorFileResponse = MediaActorFileResponse(
id=media_actorfile.id,
file_id=str(media_actorfile.media_file_id),
actor_id=str(media_actorfile.media_actor_id)
)
return response
+50
View File
@@ -0,0 +1,50 @@
from datetime import datetime
from typing import Optional
import uuid
from src.core.log_conf import logger
from src.db.models.admin import Assignment, Profile
from sqlalchemy.orm import Session
from src.schema.user.profile import ProfileModel, ProfileResponse
def create_new_profile(new_profile: ProfileModel, db: Session) -> Profile:
logger.info(f"create MediaActor with url {new_profile.user_name}")
profile: Profile = Profile()
profile.id = str(uuid.uuid4())
profile.user_name = new_profile.user_name
profile.first_name = new_profile.first_name
profile.last_name = new_profile.last_name
profile.created_date = datetime.now()
profile.last_modified_date = datetime.now()
profile.version = 0
db.add(profile)
db.commit()
db.refresh(profile)
logger.info(f"created {profile}")
return profile
def delete_profile(db: Session, profile_id: str):
logger.info(f"delete Profile with id {profile_id}")
profile: Optional[Profile] = db.get(Profile, profile_id)
if profile is not None:
assignments = profile.assignments
for assignment in assignments:
delete_assignment(db, assignment.id)
db.delete(profile)
db.commit()
def get_profile_details(profile: Profile) -> ProfileResponse:
reponse: ProfileResponse = ProfileResponse(
id=profile.id,
user_name=str(profile.user_name)
)
return reponse
def delete_assignment(db: Session, assignment_id: str) -> None:
logger.info(f"delete Assignment with id {assignment_id}")
assignment: Optional[Assignment] = db.get(Assignment, assignment_id)
if assignment is not None:
db.delete(assignment)
db.commit()
+4 -10
View File
@@ -1,18 +1,12 @@
from datetime import datetime from typing import Optional
from src.db.models.media import MediaActor
from pydantic import BaseModel from pydantic import BaseModel
class MediaActorResponse(BaseModel): class MediaActorResponse(BaseModel):
id: str id: str
name: str | None name: Optional[str]
url: str url: str
class Actor(BaseModel): class MediaActorModel(BaseModel):
name: str | None name: Optional[str]
url: str url: str
def get_actor_details(media_actor: MediaActor) -> MediaActorResponse:
reponse: MediaActorResponse = MediaActorResponse(id=media_actor.id, name=str(media_actor.name), url=str(media_actor.url))
return reponse
+1 -7
View File
@@ -1,6 +1,6 @@
from datetime import datetime from datetime import datetime
from src.db.models.media import MediaActorFile, MediaFile from src.db.models.media import MediaActorFile
from pydantic import BaseModel from pydantic import BaseModel
@@ -8,9 +8,3 @@ class MediaActorFileResponse(BaseModel):
id: str id: str
file_id: str file_id: str
actor_id: str actor_id: str
def get_actorfile_details(media_actorfile: MediaActorFile) -> MediaActorFileResponse:
response: MediaActorFileResponse = MediaActorFileResponse(id=media_actorfile.id,
file_id=str(media_actorfile.media_file_id),
actor_id=str(media_actorfile.media_actor_id))
return response
+5 -3
View File
@@ -30,9 +30,11 @@ def get_file_details(mediafile: MediaFile) -> MediaFileResponse:
def set_file(model: MediaFileResponse, mediafile: MediaFile) -> None: def set_file(model: MediaFileResponse, mediafile: MediaFile) -> None:
mediafile.file_name = model.file_name mediafile.file_name = model.file_name
mediafile.cloud_link = model.cloud_link # type: ignore mediafile.cloud_link = model.cloud_link
mediafile.url = model.url # type: ignore if model.url is not None:
mediafile.title = model.title mediafile.url = model.url
if model.title is not None:
mediafile.title = model.title
mediafile.last_modified_date = datetime.now() mediafile.last_modified_date = datetime.now()
mediafile.review = model.review mediafile.review = model.review
mediafile.should_download = model.should_download mediafile.should_download = model.should_download
+13
View File
@@ -0,0 +1,13 @@
from pydantic import BaseModel
from src.db.models.admin import Profile
class ProfileResponse(BaseModel):
id: str
user_name: str
class ProfileModel(BaseModel):
user_name: str
first_name: str
last_name: str