import json from datetime import datetime from pathlib import Path import mariadb from sqlalchemy import create_engine, select, text, MetaData, join from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker from .base import Base from .comic import Comic, Artist, Publisher, ComicWork, WorkType, StoryArc, Volume, Issue, TradePaperback from .tysc import Sport, Team, Card, CardSet, Vendor, Rooster, Player, FieldPosition from .media import MediaFile from .metadata import MetaDataTable, MetaDataColumn class KontorDB: def __init__(self, db_config): self.db_conn = mariadb.connect( host=db_config['mariadb']['host'], port=db_config['mariadb']['port'], user=db_config['mariadb']['user'], password=db_config['mariadb']['password'], database=db_config['mariadb']['database'] ) connect_string = ('mariadb+mariadbconnector://{}:{}@{}:{}/{}' .format( db_config['mariadb']['user'], db_config['mariadb']['password'], db_config['mariadb']['host'], db_config['mariadb']['port'], db_config['mariadb']['database'])) # engine = create_engine(connect_string, echo=True) engine = create_engine(connect_string) Base.metadata.create_all(bind=engine) __session__ = sessionmaker(bind=engine) self.session = __session__() self.registry = {} self.init_registry() def init_registry(self): self.registry['card'] = Card self.registry['card_set'] = CardSet self.registry['sport'] = Sport self.registry['team'] = Team self.registry['field_position'] = FieldPosition self.registry['rooster'] = Rooster self.registry['player'] = Player self.registry['vendor'] = Vendor self.registry['artist'] = Artist self.registry['publisher'] = Publisher self.registry['comic'] = Comic self.registry['issue'] = Issue self.registry['story_arc'] = StoryArc self.registry['trade_paperback'] = TradePaperback self.registry['volume'] = Volume self.registry['comic_work'] = ComicWork self.registry['worktype'] = WorkType self.registry['media_file'] = MediaFile def get_table_names(self) -> list: tables = self.session.query(MetaDataTable).all() result = [table.table_name for table in tables] return result def get_column_meta_data(self, table_name: str, view_only=True) -> dict: meta_data = {} order = 0 if view_only: for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn). filter(MetaDataTable.id == MetaDataColumn.table_id). filter(MetaDataTable.table_name == table_name). filter(MetaDataColumn.is_shown == 1).all()): meta_data[order] = {'column': column.column_name, 'label': column.column_label, 'order': column.column_order, 'ref_column': column.ref_column} order += 1 else: for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn). filter(MetaDataTable.id == MetaDataColumn.table_id). filter(MetaDataTable.table_name == table_name).all()): meta_data[order] = { 'column': column.column_name, 'order': column.column_order, 'ref_column': column.ref_column } order += 1 return meta_data def get_filters(self, table_name): _filter_map = {} for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn). filter(MetaDataTable.id == MetaDataColumn.table_id). filter(MetaDataTable.table_name == table_name). filter(MetaDataColumn.show_filter == 1).all()): _filter_map[column.column_name] = {'label': column.filter_label, 'widget': None} print(f"retrieved {len(_filter_map)} filters: {_filter_map}") return _filter_map def data(self, table, columns: dict, filters) -> list: data = [] entries = [] if len(filters) == 0: entries = self.session.query(table).all() else: entries = self.session.query(table).filter_by(**filters) for entry in entries: row = [] for order in columns.keys(): column_name = columns[order]['column'] if str(column_name).endswith("_id"): ref_table = column_name[:-3] # print(f"{ref_table=}") ref = getattr(entry, ref_table) value = getattr(ref, "name") # print(f"{value=}") row.append(value) else: row.append(getattr(entry, column_name)) # print(repr(row)) data.append(row) return data def get_data(self, table_name: str, columns: dict, where_clause: str) -> list: data = [] cursor = self.db_conn.cursor() cursor.execute(self.get_statement(table_name, columns, where_clause)) rows = cursor.fetchall() print(len(rows)) for row in rows: # print(f"KontorDB.get_data: {row}") data.append(list(row)) cursor.close() # print(f"KontorDB.getData: return {len(data)}") if table_name == 'comic' and len(where_clause) == 0: data.clear() comics = self.session.query(Comic).all() for item in comics: # print(item) row = [] for order in columns.keys(): column_name = columns[order]['column'] if str(column_name).endswith("_id"): ref_table = column_name[:-3] # print(f"{ref_table=}") ref = getattr(item, ref_table) value = getattr(ref, "name") # print(f"{value=}") row.append(value) else: row.append(getattr(item, column_name)) # print(repr(row)) data.append(row) return data def get_statement(self, table: str, header: dict, where_clause): columns = "" for index, column in header.items(): if index > 0: columns += ", " columns += column['column'] if len(columns) == 0: columns = "*" statement = f"SELECT {columns} FROM {table} {where_clause}" print(f"{statement=}") return statement def export_db(self, export_type: str, export_file_name: str, export_table_list: list): print(f"export DB to {export_file_name} as {export_type}") db = {} for table in export_table_list: columns = self.get_column_meta_data(table, view_only=False) if table in self.registry: model = self.registry[table] else: print(f"table {table} is not registered") continue rows = self.session.query(model).all() entries = [] print(f"found {len(rows)} entries") print(f"found {len(columns)} columns") for row in rows: # print(row) entry = {} for order in columns: # print(columns[order]) column_name = columns[order]['column'] # print(f"get value {column_name} from {row} of table {table}") try: value = getattr(row, column_name) if isinstance(value, datetime): entry[column_name] = str(value) else: entry[column_name] = value except AttributeError as error: print("could not get value") entries.append(entry) db[table] = entries export_file = Path(export_file_name) match export_type: case "JSON": json_dump = json.dumps(db, indent=4) with open(export_file_name, "w") as dump_file: dump_file.write(json_dump) case "YAML": export_file = Path(export_file_name) case "SQLite": export_file = Path(export_file_name) case _: print("unknown export type") if export_file.exists(): print(f"{export_file} exists")