From b87f0fc60a9a1867d4ac6702e6fc86215914cd7e Mon Sep 17 00:00:00 2001 From: Thomas Peetz Date: Mon, 29 Dec 2025 17:24:01 +0100 Subject: [PATCH] refactor kontor-spring --- kontor-scripts/add_links.py | 89 ++++++++- kontor-scripts/db/models/__init__.py | 3 +- kontor-scripts/db/models/admin.py | 46 ++--- kontor-scripts/db/models/bookshelf.py | 36 ++-- kontor-scripts/db/models/comic.py | 63 +++--- kontor-scripts/db/models/database.py | 272 -------------------------- kontor-scripts/db/models/media.py | 55 ++++-- kontor-scripts/db/models/tysc.py | 50 ++--- kontor-scripts/import.py | 43 ++-- 9 files changed, 237 insertions(+), 420 deletions(-) diff --git a/kontor-scripts/add_links.py b/kontor-scripts/add_links.py index 3ed76d7..786a679 100644 --- a/kontor-scripts/add_links.py +++ b/kontor-scripts/add_links.py @@ -1,9 +1,11 @@ """ read file with links and store it in DB """ +from datetime import datetime import logging.config import re -from typing import List +from typing import Dict, List +import uuid from bs4 import BeautifulSoup import requests import yaml @@ -16,13 +18,14 @@ from sqlalchemy.orm import sessionmaker, Session from db.models.base import Base import os -from db.models.media import MediaFile +from db.models.media import MediaActor, MediaActorFile, MediaFile parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument('--file', '-f', help='file with links', default='~/.sync/media/list.txt') parser.add_argument('--video', help='store Url as VideoFile', action="store_true") parser.add_argument('--config', '-c', default='kontor-docker') parser.add_argument('--verbose', '-v', action='count', default=0) +parser.add_argument('--limit', '-l', type=int, help='maximum number of links to check') parser.add_argument('--dry-run', '-m', help='excute script without storing', action="store_true") args = parser.parse_args() @@ -72,7 +75,14 @@ def load_data(filename: str, log) -> List[str]: links.append(line.rstrip()) return links -def get_meta_info(media_file: MediaFile, log): +def get_actors_mapping(actor_list: List[MediaActor]) -> Dict[str, MediaActor]: + mapping: Dict[str, MediaActor] = {} + for actor in actor_list: + mapping[str(actor.url)] = actor + return mapping + +def get_meta_info(media_file: MediaFile, log) -> List[str]: + actor_links: List[str] = [] try: r = requests.get(media_file.url) soup = BeautifulSoup(r.content, "html.parser") @@ -81,13 +91,12 @@ def get_meta_info(media_file: MediaFile, log): log.warning(f"{error404.get_text()}") media_file.url = None media_file.review = False - return + return actor_links title_tag = soup.find('title') if title_tag: media_file.title = title_tag.get_text() media_file.review = False anchors = soup.find_all('a', attrs={'href': re.compile("^https://.*pornstars/.*")}) - actor_links = [] for anchor in anchors: link_url = str(anchor.get("href")) # type: ignore if link_url.endswith('all/countries'): @@ -95,35 +104,101 @@ def get_meta_info(media_file: MediaFile, log): if link_url in actor_links: continue actor_links.append(link_url) - log.info(f"links({len(actor_links)}): {actor_links}") except Exception as error: log.info(f"something went wrong: {error}") media_file.title = None media_file.review = True log.info(f"update MediaFile with MetaInfos to {repr(media_file)}") + log.info(f"links({len(actor_links)}): {actor_links}") + return actor_links + +def get_actor_name(actor_url: str, log: logging.Logger) -> str | None: + try: + r = requests.get(actor_url) + soup = BeautifulSoup(r.content, "html.parser") + titles = soup.find_all('h1') + for title in titles: + log.info(f"title: {title.get_text()}") + return title.get_text() + except Exception as error: + log.warning(f"something went wrong: {error}") + return None if __name__ == '__main__': logger = get_logger(args.verbose, "kontor") logger.info('kontor.add_links started') + if args.limit: + logger.warning(f"check the first {args.limit} links") session = get_session() + links_index = 1 with session as db: links = load_data(args.file, logger) for link in links: logger.debug(f"process {link}") media_files = db.query(MediaFile).filter(MediaFile.url == link).all() + media_actors = db.query(MediaActor).all() + actor_mapping = get_actors_mapping(media_actors) if len(media_files) == 0: logger.info(f"MediaFile for link {link} not found") media_file = MediaFile() + media_file.id = str(uuid.uuid4()) + media_file.created_date = datetime.now() + media_file.last_modified_date = datetime.now() + media_file.version = 0 media_file.url = link media_file.review = True media_file.should_download = True - get_meta_info(media_file, logger) + media_file.path = None + media_file.cloud_link = None + media_file.file_name = None + actor_urls: List[str] = get_meta_info(media_file, logger) if not args.dry_run: db.add(media_file) db.commit() db.refresh(media_file) + for actor_url in actor_urls: + if actor_url in actor_mapping: + media_actor: MediaActor = actor_mapping[actor_url] + # logger.info(f"create mapping for {repr(media_actor)}") + media_actor_file = MediaActorFile() + media_actor_file.id = str(uuid.uuid4()) + media_actor_file.created_date = datetime.now() + media_actor_file.last_modified_date = datetime.now() + media_actor_file.version = 0 + media_actor_file.media_file_id = media_file.id + media_actor_file.media_actor_id = media_actor.id + logger.info(f"create mapping with {media_actor_file}") + if not args.dry_run: + db.add(media_actor_file) + db.commit() + else: + media_actor = MediaActor() + media_actor.id = str(uuid.uuid4()) + media_actor.created_date = datetime.now() + media_actor.last_modified_date = datetime.now() + media_actor.version = 0 + media_actor.name = get_actor_name(actor_url, logger) + media_actor.url = actor_url + logger.info(f"update MediaActor with {repr(media_actor)}") + if not args.dry_run: + db.add(media_actor) + db.commit() + media_actor_file = MediaActorFile() + media_actor_file.id = str(uuid.uuid4()) + media_actor_file.created_date = datetime.now() + media_actor_file.last_modified_date = datetime.now() + media_actor_file.version = 0 + media_actor_file.media_file_id = media_file.id + media_actor_file.media_actor_id = media_actor.id + logger.info(f"create mapping with {media_actor_file}") + if not args.dry_run: + db.add(media_actor_file) + db.commit() else: for media_file in media_files: logger.debug(f"MediaFile with {media_file.id} is found") + links_index += 1 + if args.limit and args.limit < links_index: + break logger.info('kontor.add_link finished') diff --git a/kontor-scripts/db/models/__init__.py b/kontor-scripts/db/models/__init__.py index 22b716a..05911d7 100644 --- a/kontor-scripts/db/models/__init__.py +++ b/kontor-scripts/db/models/__init__.py @@ -1,3 +1,4 @@ +from typing import Any, Dict from db.models.admin import ( Assignment, Token, @@ -44,7 +45,7 @@ from db.models.tysc import ( Sport, ) -registry = { +registry: Dict[str, Any] = { Sport.__tablename__: Sport, Player.__tablename__: Player, Team.__tablename__: Team, diff --git a/kontor-scripts/db/models/admin.py b/kontor-scripts/db/models/admin.py index 5c41345..6acfa58 100644 --- a/kontor-scripts/db/models/admin.py +++ b/kontor-scripts/db/models/admin.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Any, AnyStr, Dict +from typing import Any, Dict from sqlalchemy import Column, ForeignKey, Integer, String, Boolean from sqlalchemy.orm import relationship, mapped_column, Mapped @@ -19,16 +19,16 @@ class Profile(Base, BaseMixin): tokens = relationship("Token") def get_full_name(self) -> str: - full_name = "" + full_name: str = "" if self.first_name is not None: - full_name += self.first_name + full_name += str(self.first_name) if self.last_name is not None: if len(full_name) > 0: full_name += " " - full_name += self.last_name + full_name += str(self.last_name) return full_name - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -40,8 +40,8 @@ class Profile(Base, BaseMixin): self.password = import_data['password'] self.enabled = import_data['enabled'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -64,7 +64,7 @@ class Token(Base, BaseMixin): profile_id = Column(String, ForeignKey("profile.id"), nullable=False) profile = relationship("Profile", back_populates="tokens") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -75,8 +75,8 @@ class Token(Base, BaseMixin): self.enabled = import_data['enabled'] self.profile_id = import_data['profile_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -87,22 +87,22 @@ class Token(Base, BaseMixin): item['enabled'] = self.enabled item['profile_id'] = self.profile_id return item - + class Permission(Base, BaseMixin): __tablename__ = "permission" name = Column(String, nullable=False) assignments = relationship("Assignment") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] self.version = import_data['version'] self.name = import_data['name'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -117,7 +117,7 @@ class Assignment(Base, BaseMixin): permission_id = Column(String, ForeignKey("permission.id"), nullable=False) permission = relationship("Permission", back_populates="assignments") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -125,8 +125,8 @@ class Assignment(Base, BaseMixin): self.profile_id = import_data['profile_id'] self.permission_id = import_data['permission_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -145,7 +145,7 @@ class MailAccount(Base, BaseMixin): password = Column(String) start_tls = Column(Boolean) - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -157,8 +157,8 @@ class MailAccount(Base, BaseMixin): self.password = import_data['password'] self.start_tls = import_data['start_tls'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -180,7 +180,7 @@ class Mail(Base, BaseMixin): sent_date: Mapped[datetime] = mapped_column() received_date: Mapped[datetime] = mapped_column() - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -191,8 +191,8 @@ class Mail(Base, BaseMixin): self.sent_date = import_data['sent_date'] self.received_date = import_data['received_date'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) diff --git a/kontor-scripts/db/models/bookshelf.py b/kontor-scripts/db/models/bookshelf.py index 078bea6..83a00db 100644 --- a/kontor-scripts/db/models/bookshelf.py +++ b/kontor-scripts/db/models/bookshelf.py @@ -10,15 +10,15 @@ class Article(Base, BaseMixin): title = Column(String, unique=True) article_authors = relationship("ArticleAuthor") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] self.version = import_data['version'] self.title = import_data['title'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -34,7 +34,7 @@ class Author(Base, BaseMixin): article_authors = relationship("ArticleAuthor") book_authors = relationship("BookAuthor") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -42,8 +42,8 @@ class Author(Base, BaseMixin): self.first_name = import_data['first_name'] self.last_name = import_data['last_name'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -58,15 +58,15 @@ class BookshelfPublisher(Base, BaseMixin): name = Column(String, unique=True) books = relationship("Book") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] self.version = import_data['version'] self.name = import_data['name'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -84,7 +84,7 @@ class Book(Base, BaseMixin): publisher = relationship('BookshelfPublisher', back_populates="books") book_authors = relationship("BookAuthor") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -94,8 +94,8 @@ class Book(Base, BaseMixin): self.year = import_data['year'] self.publisher_id = import_data['publisher_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -114,7 +114,7 @@ class ArticleAuthor(Base, BaseMixin): author_id = Column(String, ForeignKey('author.id'), nullable=False) author = relationship('Author', back_populates="article_authors") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -122,8 +122,8 @@ class ArticleAuthor(Base, BaseMixin): self.article_id = import_data['article_id'] self.author_id = import_data['author_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -140,7 +140,7 @@ class BookAuthor(Base, BaseMixin): book_id = Column(String, ForeignKey('book.id'), nullable=False) book = relationship('Book', back_populates="book_authors") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -148,8 +148,8 @@ class BookAuthor(Base, BaseMixin): self.author_id = import_data['author_id'] self.book_id = import_data['book_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) diff --git a/kontor-scripts/db/models/comic.py b/kontor-scripts/db/models/comic.py index 180618f..afa1059 100644 --- a/kontor-scripts/db/models/comic.py +++ b/kontor-scripts/db/models/comic.py @@ -26,7 +26,7 @@ class Publisher(Base): def __str__(self): return self.__repr__() - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -34,8 +34,8 @@ class Publisher(Base): self.name = import_data['name'] self.parent_publisher_id = import_data['parent_publisher_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = { + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = { 'id': self.id, 'created_date': str(self.created_date), 'last_modified_date': str(self.last_modified_date), @@ -67,7 +67,7 @@ class Comic(Base, BaseMixin): def __str__(self): return f'{self.title}({self.id})' - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -79,8 +79,8 @@ class Comic(Base, BaseMixin): if 'weblink' in import_data: self.weblink = import_data['weblink'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = { + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = { 'id': self.id, 'created_date': str(self.created_date), 'last_modified_date': str(self.last_modified_date), @@ -102,7 +102,7 @@ class Volume(Base, BaseMixin): story_arcs = relationship("StoryArc") issues = relationship("Issue") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -110,8 +110,8 @@ class Volume(Base, BaseMixin): self.name = import_data['name'] self.comic_id = import_data['comic_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -128,7 +128,7 @@ class TradePaperback(Base, BaseMixin): comic_id = Column(String, ForeignKey("comic.id"), nullable=False) comic = relationship("Comic", back_populates="trade_paperbacks") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -138,8 +138,8 @@ class TradePaperback(Base, BaseMixin): self.issue_end = import_data['issue_end'] self.comic_id = import_data['comic_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -160,7 +160,7 @@ class StoryArc(Base, BaseMixin): volume = relationship("Volume", back_populates="story_arcs") issues = relationship("Issue") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -169,8 +169,8 @@ class StoryArc(Base, BaseMixin): self.comic_id = import_data['comic_id'] self.volume_id = import_data['volume_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = { + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = { 'id': self.id, 'created_date': str(self.created_date), 'last_modified_date': str(self.last_modified_date), @@ -197,7 +197,7 @@ class Issue(Base, BaseMixin): story_arc = relationship("StoryArc", back_populates="issues") issue_works = relationship("IssueWork") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -205,7 +205,7 @@ class Issue(Base, BaseMixin): self.issue_number = import_data['issue_number'] self.title = import_data['title'] if import_data['published_on'] == 'None': - self.published_on = None + self.published_on = None # type: ignore else: self.published_on = import_data['published_on'] self.in_stock = import_data['in_stock'] @@ -214,8 +214,8 @@ class Issue(Base, BaseMixin): self.volume_id = import_data['volume_id'] self.story_arc_id = import_data['story_arc_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = { + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = { 'id': self.id, 'created_date': str(self.created_date), 'last_modified_date': str(self.last_modified_date), @@ -239,7 +239,7 @@ class Artist(Base, BaseMixin): comic_works = relationship("ComicWork") issue_works = relationship("IssueWork") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -248,8 +248,8 @@ class Artist(Base, BaseMixin): if 'weblink' in import_data: self.weblink = import_data['weblink'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = { + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = { 'id': self.id, 'created_date': str(self.created_date), 'last_modified_date': str(self.last_modified_date), @@ -272,15 +272,15 @@ class WorkType(Base, BaseMixin): def __str__(self): return f'{self.name}({self.id})' - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] self.version = import_data['version'] self.name = import_data['name'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = { + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = { 'id': self.id, 'created_date': str(self.created_date), 'last_modified_date': str(self.last_modified_date), @@ -299,7 +299,7 @@ class ComicWork(Base, BaseMixin): work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False) work_type = relationship("WorkType", back_populates="comic_works") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -308,8 +308,8 @@ class ComicWork(Base, BaseMixin): self.artist_id = import_data['artist_id'] self.work_type_id = import_data['work_type_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = { + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = { 'id': self.id, 'created_date': str(self.created_date), 'last_modified_date': str(self.last_modified_date), @@ -330,7 +330,7 @@ class IssueWork(Base, BaseMixin): work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False) work_type = relationship("WorkType", back_populates="issue_works") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -339,8 +339,8 @@ class IssueWork(Base, BaseMixin): self.artist_id = import_data['artist_id'] self.work_type_id = import_data['work_type_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = { + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = { 'id': self.id, 'created_date': str(self.created_date), 'last_modified_date': str(self.last_modified_date), @@ -350,4 +350,3 @@ class IssueWork(Base, BaseMixin): 'work_type_id': self.work_type_id } return item - diff --git a/kontor-scripts/db/models/database.py b/kontor-scripts/db/models/database.py index ba8b6c0..5eb9dbb 100644 --- a/kontor-scripts/db/models/database.py +++ b/kontor-scripts/db/models/database.py @@ -1,22 +1,11 @@ -import json -import uuid -from datetime import datetime from enum import Enum, auto from logging import Logger -from pathlib import Path from typing import Any from sqlalchemy import select -from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import sessionmaker from db.models import registry -from db.models.tysc import Card, CardSet, Rooster, Team, FieldPosition, Player, Vendor, Sport -from db.models.comic import Issue, TradePaperback, StoryArc, Volume, ComicWork, Artist, Comic, Publisher, WorkType -from db.models.bookshelf import ArticleAuthor, BookAuthor, BookshelfPublisher, Article, Book, Author -from db.models.admin import Mail, MailAccount, ModuleData, Permission, Profile, Token, Assignment -from db.models.metadata import MetaDataTable, MetaDataColumn -from db.models.media import MediaVideo, MediaArticle, MediaFile, MediaActor, MediaActorFile class ColumnEntry(Enum): @@ -49,52 +38,6 @@ class KontorDB: self.engine = db_engine self.log = log - def get_table_by_name(self, table_name: str) -> dict: - result = {} - __session__ = sessionmaker(self.engine) - _filter = {'table_name': table_name} - with __session__() as session: - table = session.query(MetaDataTable).filter_by(**_filter).one() - result['id'] = table.id - result['table_name'] = table.table_name - return result - - def get_column_meta_data(self, table_name: str, view_only=True) -> dict: - meta_data = {} - order = 0 - __session__ = sessionmaker(self.engine) - columns = list() - table_info = self.get_table_by_name(table_name) - _filters = {'table_id': table_info['id']} - if view_only: - _filters['is_shown'] = True - with __session__() as session: - columns = session.query(MetaDataColumn).filter_by(**_filters).all() - for column in columns: - # self.log.info("get_column_meta_data: %s %s %d", column.column_name, column.column_label, column.column_order) - meta_data[order] = { - ColumnEntry.COLUMN_NAME: column.column_name, - ColumnEntry.COLUMN_LABEL: column.column_label, - ColumnEntry.COLUMN_ORDER: column.column_order, - ColumnEntry.COLUMN_REF_COLUMN: column.ref_column, - ColumnEntry.COLUMN_TYPE: column.column_type - } - order += 1 - return meta_data - - def get_filters(self, table_name: str) -> dict: - _filter_map = {} - __session__ = sessionmaker(self.engine) - table_info = self.get_table_by_name(table_name) - _filters = {'table_id': table_info['id'], 'show_filter': True} - with __session__() as session: - for column in session.query(MetaDataColumn).filter_by(**_filters).all(): - _filter_map[column.column_name] = { - ColumnEntry.COLUMN_LABEL: column.filter_label, - ColumnEntry.COLUMN_WIDGET: None - } - return _filter_map - def data(self, table_name: str, columns: dict, filters: dict) -> list: data = [] __session__ = sessionmaker(self.engine) @@ -121,218 +64,3 @@ class KontorDB: data.append(row) # self.log.info("data: %s", data) return data - - def export_db(self, export_type: ExportType, export_file_name: str) -> dict: - results = {} - db = {} - export_table_list = self.get_table_names() - for table in export_table_list: - columns = self.get_column_meta_data(table, view_only=False) - if table in self.registry: - model = self.registry[table] - else: - self.log.info(f"table {table} is not registered") - continue - __session__ = sessionmaker(self.engine) - with __session__() as session: - rows = session.query(model).all() - entries = [] - for row in rows: - # print(row) - entry = {} - for order in columns: - # print(columns[order]) - column_name = columns[order][ColumnEntry.COLUMN_NAME] - # print(f"get value {column_name} from {row} of table {table}") - try: - value = getattr(row, column_name) - if isinstance(value, datetime): - entry[column_name] = str(value) - else: - entry[column_name] = value - except AttributeError: - pass - entries.append(entry) - db[table] = entries - results[table] = len(entries) - match export_type: - case ExportType.JSON: - json_dump = json.dumps(db, indent=4) - with open(export_file_name, "w") as dump_file: - dump_file.write(json_dump) - case ExportType.YAML: - pass - case ExportType.SQLITE: - pass - self.log.info(f"{len(results)} tables exported") - return results - - def import_db(self, import_file_name: str) -> dict: - result = {} - import_file = Path(import_file_name) - if not import_file.exists(): - self.log.info(f"File {import_file_name} does not exist. Do nothing.") - return result - match import_file.suffix: - case '.json': - print("read json file") - with open(import_file_name, 'r') as json_file: - json_load = json.load(json_file) - for table in json_load: - self.log.info(f"{table}: {len(json_load[table])}") - result[table] = self.import_table(table, json_load[table]) - case '.yml': - print("read yaml file") - case '.yaml': - print("read yaml file") - case '.db': - print("read sqlite file") - return result - - def import_table(self, table_name: str, items: list) -> dict: - result = {} - updated = [] - added = [] - remaining = [] - existing_ids = self.get_ids(table_name) - self.log.info(f"found {len(existing_ids)} existing ids for table {table_name}") - for item in items: - current_id = item['id'] - # print(f"import item: {item}") - found_item = None - __session__ = sessionmaker(self.engine) - with __session__() as session: - found_item = session.get(self.registry[table_name], current_id) - # print(f"found item: {found_item}") - if found_item is not None: - changed = self.update_entry(table_name, current_id, item) - updated.append(item) - if changed: - self.log.info(f"{current_id} has changed") - updated.append(item) - existing_ids.remove(current_id) - else: - try: - self.add_entry(table_name, item) - added.append(item) - except IntegrityError as error: - self.log.info(f"Could not add item, due to: {error.detail}") - if len(existing_ids) > 0: - print(f"remaining items for {table_name}: {existing_ids}") - remaining.extend(existing_ids) - result['updated'] = updated - result['added'] = added - result['remaining'] = remaining - return result - - def get_ids(self, table_name: str) -> list: - existing_ids = [] - __session__ = sessionmaker(self.engine) - with __session__() as session: - items = session.query(self.registry[table_name]).all() - for item in items: - existing_ids.append(getattr(item, 'id')) - return existing_ids - - def add_entry(self, table_name: str, update_item: dict): - self.log.debug(f"add entry to table {table_name} with {update_item}") - __session__ = sessionmaker(self.engine) - with __session__() as session: - add_item = self.registry[table_name]() - for key in update_item.keys(): - update_value = update_item[key] - setattr(add_item, key, update_value) - session.add(add_item) - session.commit() - - def update_entry(self, table_name, current_id, update_item: dict) -> bool: - # self.log.info("update entry to table %s", table_name) - __session__ = sessionmaker(self.engine) - with __session__() as session: - existing_item = session.query(self.registry[table_name]).get(current_id) - changed = False - for key in update_item.keys(): - update_value = update_item[key] - existing_value = getattr(existing_item, key) - if type(existing_value) is not type(update_value): - existing_value = str(existing_value) - if existing_value != update_value: - self.log.info(f"{key} has changed: {existing_value} != {update_value}") - setattr(existing_item, key, update_value) - session.commit() - changed = True - self.log.info(f"update {key} with {update_value}") - return changed - - def add_link(self, link: str) -> dict: - result = {} - __session__ = sessionmaker(self.engine) - with __session__() as session: - media_file = MediaFile() - media_file.id = str(uuid.uuid4()) - media_file.created_date = datetime.now() - media_file.last_modified_date = datetime.now() - media_file.version = 0 - media_file.url = link - media_file.review = 1 - media_file.should_download = 1 - try: - session.add(media_file) - session.commit() - result['added'] = {'url': media_file.url, 'title': media_file.title, 'review': media_file.review, - 'download': media_file.should_download} - except IntegrityError as error: - session.rollback() - result['error'] = error.orig - return result - - def update_titles(self) -> dict: - update_list = {} - __session__ = sessionmaker(self.engine) - _filter = {'review': True} - with __session__() as session: - links = session.query(MediaFile).filter_by(**_filter).all() - self.log.info("%d entries found for updating titles", len(links)) - for link in links: - url = link.url - if url is None: - continue - link.update_title() - session.commit() - update_list[link.id] = link.title - return update_list - - def get_download_list(self) -> list[str]: - download_list = [] - __session__ = sessionmaker(self.engine) - _filter = {'should_download': True} - with __session__() as session: - links = session.query(MediaFile).filter_by(**_filter).all() - for link in links: - url = link.url - if url is None: - continue - download_list.append(link.id) - return download_list - - def download_file(self, entry_id: str, download_dir="/data/media", dl_tool="yt-dlp") -> str: - __session__ = sessionmaker(self.engine) - with __session__() as session: - link = session.query(MediaFile).get(entry_id) - link.download_file(download_dir, dl_tool) - session.commit() - file_name = link.file_name - return file_name - - def delete_entries(self): - for (table_name, table) in self.registry.items(): - # self.log.info("delete entries from table %s", table_name) - __session__ = sessionmaker(self.engine) - with __session__() as session: - items = session.query(table).all() - for item in items: - session.delete(item) - session.commit() - - def check_files(self): - pass diff --git a/kontor-scripts/db/models/media.py b/kontor-scripts/db/models/media.py index 776547e..436df4b 100644 --- a/kontor-scripts/db/models/media.py +++ b/kontor-scripts/db/models/media.py @@ -2,10 +2,10 @@ import re import subprocess from datetime import datetime from pathlib import Path -from typing import Any, AnyStr, Dict +from typing import Any, Dict import requests from bs4 import BeautifulSoup -from sqlalchemy import Boolean, Column, False_, String, ForeignKey +from sqlalchemy import Boolean, Column, String, ForeignKey from sqlalchemy.orm import relationship from db.models.base import Base, BaseMixin, BaseVideoMixin @@ -16,12 +16,12 @@ class MediaFile(Base, BaseMixin, BaseVideoMixin): media_actor_files = relationship("MediaActorFile") def __repr__(self): - return f'MediaFile(\n\tID: {self.id}\n\tTitle: {self.title}\n\tURL: {self.url}\n\tReview: {self.review}\n\tDownload: {self.should_download})' + return f'MediaFile(\n\tID: {self.id}\n\tTitle: {self.title}\n\tURL: {self.url}\n\tReview: {self.review}\n\tDownload: {self.should_download}\n\tPath: {self.path}\n\tCloudlink: {self.cloud_link})' def __str__(self): return f'{self.title}({self.id})' - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -34,8 +34,8 @@ class MediaFile(Base, BaseMixin, BaseVideoMixin): self.url = import_data['url'] self.should_download = import_data['should_download'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -54,9 +54,10 @@ class MediaFile(Base, BaseMixin, BaseVideoMixin): try: r = requests.get(self.url) soup = BeautifulSoup(r.content, "html.parser") - title = soup.title.string - self.title = title - self.review = False + title_tag = soup.find('title') + if title_tag: + self.title = title_tag.get_text() + self.review = False except: self.title = None self.review = True @@ -101,7 +102,13 @@ class MediaActor(Base, BaseMixin): url = Column(String, unique=True) media_actor_files = relationship("MediaActorFile") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def __repr__(self): + return f'MediaActor(\n\tID: {self.id}\n\tName: {self.name}\n\tURL: {self.url})' + + def __str__(self): + return f'{self.name}({self.id})' + + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -109,8 +116,8 @@ class MediaActor(Base, BaseMixin): self.name = import_data['name'] self.url = import_data['url'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -127,7 +134,13 @@ class MediaActorFile(Base, BaseMixin): media_file_id = Column(String, ForeignKey("media_file.id"), nullable=True) media_file = relationship("MediaFile", back_populates="media_actor_files") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def __repr__(self): + return f'MediaActorFile(\n\tID: {self.id}\n\tMediaActor: {self.media_actor_id}\n\tMediaFile: {self.media_file_id})' + + def __str__(self): + return f'{self.id}: MediaActor: {self.media_actor_id} - MediaFile: {self.media_file_id}' + + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -135,8 +148,8 @@ class MediaActorFile(Base, BaseMixin): self.media_actor_id = import_data['media_actor_id'] self.media_file_id = import_data['media_file_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -151,7 +164,7 @@ class MediaArticle(Base, BaseMixin): title = Column(String) url = Column(String, unique=True) - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -160,8 +173,8 @@ class MediaArticle(Base, BaseMixin): self.title = import_data['title'] self.url = import_data['url'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -182,7 +195,7 @@ class MediaVideo(Base, BaseMixin): url = Column(String, unique=True) should_download = Column(Boolean) - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -195,8 +208,8 @@ class MediaVideo(Base, BaseMixin): self.url = import_data['url'] self.should_download = import_data['should_download'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) diff --git a/kontor-scripts/db/models/tysc.py b/kontor-scripts/db/models/tysc.py index c9d70fc..d937173 100644 --- a/kontor-scripts/db/models/tysc.py +++ b/kontor-scripts/db/models/tysc.py @@ -1,4 +1,4 @@ -from typing import AnyStr, Dict, Any +from typing import Dict, Any from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Boolean from sqlalchemy.orm import relationship @@ -17,15 +17,15 @@ class Sport(Base, BaseMixin): def __repr__(self): return f"Sport(id={self.id}, name={self.name}, created_date={self.created_date})" - def import_dict(self, import_data: Dict[AnyStr, AnyStr]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] self.version = import_data['version'] self.name = import_data['name'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -41,7 +41,7 @@ class Team(Base, BaseMixin): sport = relationship("Sport", back_populates="teams") roosters = relationship("Rooster") - def import_dict(self, import_data: Dict[AnyStr, AnyStr]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -50,8 +50,8 @@ class Team(Base, BaseMixin): self.short_name = import_data['short_name'] self.sport_id = import_data['sport_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -73,7 +73,7 @@ class FieldPosition(Base, BaseMixin): sport = relationship("Sport", back_populates="positions") roosters = relationship("Rooster") - def import_dict(self, import_data: Dict[AnyStr, AnyStr]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -82,8 +82,8 @@ class FieldPosition(Base, BaseMixin): self.short_name = import_data['short_name'] self.sport_id = import_data['sport_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -106,7 +106,7 @@ class Player(Base, BaseMixin): def get_full_name(self) -> str: return f"{self.last_name}, {self.first_name}" - def import_dict(self, import_data: Dict[AnyStr, AnyStr]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -114,8 +114,8 @@ class Player(Base, BaseMixin): self.first_name = import_data['first_name'] self.last_name = import_data['last_name'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -139,7 +139,7 @@ class Rooster(Base, BaseMixin): position = relationship("FieldPosition", back_populates="roosters") cards = relationship("Card") - def import_dict(self, import_data: Dict[AnyStr, AnyStr]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -149,8 +149,8 @@ class Rooster(Base, BaseMixin): self.player_id = import_data['player_id'] self.position_id = import_data['position_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -168,15 +168,15 @@ class Vendor(Base, BaseMixin): card_sets = relationship("CardSet") cards = relationship("Card") - def import_dict(self, import_data: Dict[AnyStr, AnyStr]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] self.version = import_data['version'] self.name = import_data['name'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -197,7 +197,7 @@ class CardSet(Base, BaseMixin): vendor = relationship("Vendor", back_populates="card_sets") cards = relationship("Card") - def import_dict(self, import_data: Dict[AnyStr, AnyStr]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -207,8 +207,8 @@ class CardSet(Base, BaseMixin): self.insert_set = import_data['insert_set'] self.vendor_id = import_data['vendor_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) @@ -234,7 +234,7 @@ class Card(Base, BaseMixin): vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False) vendor = relationship("Vendor", back_populates="cards") - def import_dict(self, import_data: Dict[AnyStr, Any]): + def import_dict(self, import_data: Dict[str, Any]): self.id = import_data['id'] self.created_date = import_data['created_date'] self.last_modified_date = import_data['last_modified_date'] @@ -245,8 +245,8 @@ class Card(Base, BaseMixin): self.rooster_id = import_data['rooster_id'] self.vendor_id = import_data['vendor_id'] - def export_dict(self) -> Dict[AnyStr, Any]: - item: Dict[AnyStr, Any] = {} + def export_dict(self) -> Dict[str, Any]: + item: Dict[str, Any] = {} item['id'] = self.id item['created_date'] = str(self.created_date) item['last_modified_date'] = str(self.last_modified_date) diff --git a/kontor-scripts/import.py b/kontor-scripts/import.py index 20c77f6..d4cb821 100644 --- a/kontor-scripts/import.py +++ b/kontor-scripts/import.py @@ -4,7 +4,7 @@ import data from json file to PostgreSQL from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from datetime import datetime, date -from typing import Any, AnyStr, Dict, List +from typing import Any, Dict, List import os import json @@ -43,7 +43,7 @@ def cleanup_database(db: Session, log, dry_run: bool): db.delete(entry) db.commit() -def load_data(filename: str, log) -> Dict[AnyStr, Dict[AnyStr, Any]]: +def load_data(filename: str, log) -> Dict[str, List[Dict[str, Any]]]: log.debug("load_data") import_file = Path(filename) if not import_file.exists(): @@ -54,13 +54,13 @@ def load_data(filename: str, log) -> Dict[AnyStr, Dict[AnyStr, Any]]: json_load = json.load(json_file) return json_load -def get_ids(items: List[Any]) -> List[AnyStr]: - result: List[AnyStr] = [] +def get_ids(items: List[Any]) -> List[str]: + result: List[str] = [] for item in items: result.append(item.id) return result -def update_item(db: Session, import_data: Dict[AnyStr, Any], item: Any, dry_run: bool, log): +def update_item(db: Session, import_data: Dict[str, Any], item: Any, dry_run: bool, log): for (key, value) in import_data.items(): existing_value = getattr(item, str(key)) update: bool = has_changed(existing_value, value, log) @@ -76,7 +76,7 @@ def update_item(db: Session, import_data: Dict[AnyStr, Any], item: Any, dry_run: db.add(item) db.commit() -def has_changed(existing_data: Any, import_data: AnyStr, log) -> bool: +def has_changed(existing_data: Any, import_data: str, log) -> bool: if existing_data is None and import_data == 'None': return False if isinstance(existing_data, str): @@ -94,7 +94,7 @@ def has_changed(existing_data: Any, import_data: AnyStr, log) -> bool: return existing_data != import_data -def item_import(db: Session, import_data: Dict[AnyStr, Any], dry_run: bool, log): +def item_import(db: Session, import_data: Dict[str, Any], dry_run: bool, log): log.info(f"import {import_data}") if not dry_run: log.debug(f"model: {repr(model)} {import_data}") @@ -119,26 +119,27 @@ if __name__ == '__main__': with SessionLocal() as db: if args.cleanup: cleanup_database(db, logger, args.dry_run) - data = load_data(args.file, logger) - table_list: List = list(data.keys()) - logger.info(f"Liste der Tabellen: {table_list}") - sorted_table_list: List = table_list + data: Dict[str, List[Dict[str, Any]]] = load_data(args.file, logger) + table_list: List[str] = list(data.keys()) + logger.debug(f"Liste der Tabellen: {table_list}") + sorted_table_list: List[str] = table_list for tablename in sorted_table_list: model = registry[tablename] existing_items = db.query(model).all() - existing_ids = get_ids(existing_items) + existing_ids: List[str] = get_ids(existing_items) logger.debug(f"found {len(existing_items)} for table {tablename}") - import_items = data[tablename] + import_items: List[Dict[str, Any]] = data[tablename] for import_item in import_items: - if import_item['id'] in existing_ids: - logger.debug(f"update {import_item['id']}") - existing_item = db.get(model, import_item['id']) + item_id: str = import_item['id'] + if item_id in existing_ids: + logger.debug(f"update {item_id}") + existing_item = db.get(model, item_id) update_item(db, import_item, existing_item, args.dry_run, logger) - existing_ids.remove(import_item['id']) + existing_ids.remove(item_id) else: - logger.debug(f"import {import_item['id']}") + logger.debug(f"import {item_id}") item_import(db, import_item, args.dry_run, logger) - logger.info(f"remaining items for {tablename}: {len(existing_ids)}") - logger.info(f"remaining items for {tablename}: {existing_ids}") + logger.debug(f"remaining items for {tablename}: {len(existing_ids)}") + if len(existing_ids) > 0: + logger.info(f"remaining items for {tablename}: {existing_ids}") logger.info('kontor.import finished') -