import json import uuid from datetime import datetime from enum import Enum, auto from logging import Logger from pathlib import Path from sqlalchemy import Engine, select from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import sessionmaker from .tysc import Card, CardSet, Rooster, Team, FieldPosition, Player, Vendor, Sport from .comic import Issue, TradePaperback, StoryArc, Volume, ComicWork, Artist, Comic, Publisher, WorkType from .bookshelf import ArticleAuthor, BookAuthor, BookshelfPublisher, Article, Book, Author from .admin import Mail, MailAccount, ModuleData, Role, User, Token, AuthorizationMatrix from .metadata import MetaDataTable, MetaDataColumn from .media import MediaVideo, MediaArticle, MediaFile, MediaActor, MediaActorFile class ColumnEntry(Enum): COLUMN_NAME = 'column' COLUMN_LABEL = 'label' COLUMN_ORDER = 'order' COLUMN_REF_COLUMN = 'ref_column' COLUMN_TYPE = 'type' COLUMN_WIDGET = 'widget' class StatusType(Enum): UNKNOWN = auto() FILE_NAME = auto() FILE_ID = auto() DUPLICATE = auto() CLOUD_LINK = auto() CLOUD_LINK_ID = auto() class KontorDB: def __init__(self, db_engine: Engine, log: Logger): self.engine = db_engine self.registry = {} self.init_registry() self.log = log def init_registry(self): self.registry[Card.__tablename__] = Card self.registry[CardSet.__tablename__] = CardSet self.registry[Rooster.__tablename__] = Rooster self.registry[Team.__tablename__] = Team self.registry[FieldPosition.__tablename__] = FieldPosition self.registry[Player.__tablename__] = Player self.registry[Vendor.__tablename__] = Vendor self.registry[Sport.__tablename__] = Sport self.registry[Issue.__tablename__] = Issue self.registry[TradePaperback.__tablename__] = TradePaperback self.registry[StoryArc.__tablename__] = StoryArc self.registry[Volume.__tablename__] = Volume self.registry[ComicWork.__tablename__] = ComicWork self.registry[Artist.__tablename__] = Artist self.registry[Comic.__tablename__] = Comic self.registry[Publisher.__tablename__] = Publisher self.registry[WorkType.__tablename__] = WorkType self.registry[ArticleAuthor.__tablename__] = ArticleAuthor self.registry[BookAuthor.__tablename__] = BookAuthor self.registry[BookshelfPublisher.__tablename__] = BookshelfPublisher self.registry[Article.__tablename__] = Article self.registry[Book.__tablename__] = Book self.registry[Author.__tablename__] = Author self.registry[MediaFile.__tablename__] = MediaFile self.registry[MediaActor.__tablename__] = MediaActor self.registry[MediaActorFile.__tablename__] = MediaActorFile self.registry[MediaArticle.__tablename__] = MediaArticle self.registry[MediaVideo.__tablename__] = MediaVideo self.registry[MetaDataColumn.__tablename__] = MetaDataColumn self.registry[MetaDataTable.__tablename__] = MetaDataTable self.registry[AuthorizationMatrix.__tablename__] = AuthorizationMatrix self.registry[Token.__tablename__] = Token self.registry[User.__tablename__] = User self.registry[Role.__tablename__] = Role self.registry[ModuleData.__tablename__] = ModuleData self.registry[MailAccount.__tablename__] = MailAccount self.registry[Mail.__tablename__] = Mail def get_table_names(self) -> list: result = [] __session__ = sessionmaker(self.engine) with __session__() as session: tables = session.scalars(select(MetaDataTable)).all() result = [table.table_name for table in tables] return result def get_table_by_name(self, table_name: str) -> dict: result = {} __session__ = sessionmaker(self.engine) _filter = {'table_name': table_name} with __session__() as session: table = session.query(MetaDataTable).filter_by(**_filter).one() result['id'] = table.id result['table_name'] = table.table_name return result def get_column_meta_data(self, table_name: str, view_only=True) -> dict: meta_data = {} order = 0 __session__ = sessionmaker(self.engine) columns = list() table_info = self.get_table_by_name(table_name) _filters = {'table_id': table_info['id']} if view_only: _filters['is_shown'] = True with __session__() as session: columns = session.query(MetaDataColumn).filter_by(**_filters).all() for column in columns: # self.log.info("get_column_meta_data: %s %s %d", column.column_name, column.column_label, column.column_order) meta_data[order] = { ColumnEntry.COLUMN_NAME: column.column_name, ColumnEntry.COLUMN_LABEL: column.column_label, ColumnEntry.COLUMN_ORDER: column.column_order, ColumnEntry.COLUMN_REF_COLUMN: column.ref_column, ColumnEntry.COLUMN_TYPE: column.column_type } order += 1 return meta_data def get_columns(self, table_name: str) -> dict: columns = {} order = 0 __session__ = sessionmaker(self.engine) table_info = self.get_table_by_name(table_name) _filters = {'table_id': table_info['id']} with __session__() as session: for column in session.query(MetaDataColumn).filter_by(**_filters).all(): columns[column.column_name] = { ColumnEntry.COLUMN_ORDER: column.column_order, ColumnEntry.COLUMN_TYPE: column.column_type } return columns def get_filters(self, table_name: str) -> dict: _filter_map = {} __session__ = sessionmaker(self.engine) table_info = self.get_table_by_name(table_name) _filters = {'table_id': table_info['id'], 'show_filter': True} with __session__() as session: for column in session.query(MetaDataColumn).filter_by(**_filters).all(): _filter_map[column.column_name] = { ColumnEntry.COLUMN_LABEL: column.filter_label, ColumnEntry.COLUMN_WIDGET: None } return _filter_map def data(self, table_name: str, columns: dict, filters: dict) -> list: data = [] __session__ = sessionmaker(self.engine) table = self.registry[table_name] with __session__() as session: entries = [] if len(filters) == 0: entries = session.scalars(select(table)).all() else: entries = session.scalars(select(table).filter_by(**filters)).all() for entry in entries: # self.log.info("data: %s", entry) row = [] for order in columns.keys(): column_name = columns[order][ColumnEntry.COLUMN_NAME] ref_column = columns[order][ColumnEntry.COLUMN_REF_COLUMN] if str(column_name).endswith("_id"): ref_table = column_name[:-3] ref = getattr(entry, ref_table) value = getattr(ref, ref_column) row.append(value) else: row.append(getattr(entry, column_name)) data.append(row) # self.log.info("data: %s", data) return data def export_db(self, export_type: str, export_file_name: str) -> dict: results = {} db = {} export_table_list = self.get_table_names() for table in export_table_list: columns = self.get_column_meta_data(table, view_only=False) if table in self.registry: model = self.registry[table] else: self.log.info(f"table {table} is not registered") continue __session__ = sessionmaker(self.engine) with __session__() as session: rows = session.query(model).all() entries = [] for row in rows: # print(row) entry = {} for order in columns: # print(columns[order]) column_name = columns[order][ColumnEntry.COLUMN_NAME] # print(f"get value {column_name} from {row} of table {table}") try: value = getattr(row, column_name) if isinstance(value, datetime): entry[column_name] = str(value) else: entry[column_name] = value except AttributeError: pass entries.append(entry) db[table] = entries results[table] = len(entries) match export_type: case "JSON": json_dump = json.dumps(db, indent=4) with open(export_file_name, "w") as dump_file: dump_file.write(json_dump) case "YAML": export_file = Path(export_file_name) case "SQLite": export_file = Path(export_file_name) self.log.info(f"{len(results)} tables exported") return results def import_db(self, import_file_name: str) -> dict: result = {} import_file = Path(import_file_name) if not import_file.exists(): self.log.info(f"File {import_file_name} does not exist. Do nothing.") return result match import_file.suffix: case '.json': print("read json file") with open(import_file_name, 'r') as json_file: json_load = json.load(json_file) for table in json_load: self.log.info(f"{table}: {len(json_load[table])}") result[table] = self.import_table(table, json_load[table]) case '.yml': print("read yaml file") case '.yaml': print("read yaml file") case '.db': print("read sqlite file") return result def import_table(self, table_name: str, items:list) -> dict: result = {} updated = [] added = [] remaining = [] existing_ids = self.get_ids(table_name) self.log.info(f"found {len(existing_ids)} existing ids for table {table_name}") for item in items: current_id = item['id'] # print(f"import item: {item}") found_item = None __session__ = sessionmaker(self.engine) with __session__() as session: found_item = session.get(self.registry[table_name], current_id) # print(f"found item: {found_item}") if found_item is not None: changed = self.update_entry(table_name, current_id, item) updated.append(item) if changed: self.log.info(f"{current_id} has changed") updated.append(item) existing_ids.remove(current_id) else: try: self.add_entry(table_name, item) added.append(item) except IntegrityError as error: self.log.info(f"Could not add item, due to: {error.detail}") if len(existing_ids) > 0: print(f"remaining items: {existing_ids}") remaining.extend(existing_ids) result['updated'] = updated result['added'] = added result['remaining'] = remaining return result def get_ids(self, table_name: str) -> list: existing_ids = [] __session__ = sessionmaker(self.engine) with __session__() as session: items = session.query(self.registry[table_name]).all() for item in items: existing_ids.append(getattr(item, 'id')) return existing_ids def add_entry(self, table_name: str, update_item: dict): self.log.debug(f"add entry to table {table_name} with {update_item}") __session__ = sessionmaker(self.engine) with __session__() as session: add_item = self.registry[table_name]() for key in update_item.keys(): update_value = update_item[key] setattr(add_item, key, update_value) session.add(add_item) session.commit() def update_entry(self, table_name, current_id, update_item: dict) -> bool: # self.log.info("update entry to table %s", table_name) __session__ = sessionmaker(self.engine) with __session__() as session: existing_item = session.query(self.registry[table_name]).get(current_id) changed = False for key in update_item.keys(): update_value = update_item[key] existing_value = getattr(existing_item, key) if type(existing_value) is not type(update_value): existing_value = str(existing_value) if existing_value != update_value: self.log.info(f"{key} has changed: {existing_value} != {update_value}") setattr(existing_item, key, update_value) session.commit() changed = True self.log.info(f"update {key} with {update_value}") return changed def add_link(self, link: str) -> dict: result = {} __session__ = sessionmaker(self.engine) with __session__() as session: media_file = MediaFile() media_file.id = str(uuid.uuid4()) media_file.created_date = datetime.now() media_file.last_modified_date = datetime.now() media_file.version = 0 media_file.url = link media_file.review = 1 media_file.should_download = 1 try: session.add(media_file) session.commit() result['added'] = {'url': media_file.url, 'title': media_file.title, 'review': media_file.review, 'download': media_file.should_download} except IntegrityError as error: session.rollback() result['error'] = error.orig return result def update_titles(self) -> dict: update_list = {} __session__ = sessionmaker(self.engine) _filter = { 'review': True} with __session__() as session: links = session.query(MediaFile).filter_by(**_filter).all() for link in links: url = link.url if url is None: continue link.update_title() session.commit() update_list[link.id] = link.title return update_list def get_download_list(self) -> list: download_list = [] __session__ = sessionmaker(self.engine) _filter = { 'should_download': True} with __session__() as session: links = session.query(MediaFile).filter_by(**_filter).all() for link in links: url = link.url if url is None: continue download_list.append(link.id) return download_list def download_file(self, entry_id: str, download_dir = "/data/media", dl_tool = "yt-dlp") -> str: __session__ = sessionmaker(self.engine) with __session__() as session: link = session.query(MediaFile).get(entry_id) link.download_file(download_dir, dl_tool) session.commit() file_name = link.file_name return file_name def delete_entries(self): for (table_name, table) in self.registry.items(): # self.log.info("delete entries from table %s", table_name) __session__ = sessionmaker(self.engine) with __session__() as session: items = session.query(table).all() for item in items: session.delete(item) session.commit() def check_files(self): pass