import json from datetime import datetime from pathlib import Path import mariadb from sqlalchemy import create_engine, select, text, MetaData, join from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker from database.base import Base from database.comic import Comic from database.metadata import MetaDataTable, MetaDataColumn class KontorDB: def __init__(self, db_config): self.db_conn = mariadb.connect( host=db_config['mariadb']['host'], port=db_config['mariadb']['port'], user=db_config['mariadb']['user'], password=db_config['mariadb']['password'], database=db_config['mariadb']['database'] ) connect_string = ('mariadb+mariadbconnector://{}:{}@{}:{}/{}' .format( db_config['mariadb']['user'], db_config['mariadb']['password'], db_config['mariadb']['host'], db_config['mariadb']['port'], db_config['mariadb']['database'])) # engine = create_engine(connect_string, echo=True) engine = create_engine(connect_string) Base.metadata.create_all(bind=engine) __session__ = sessionmaker(bind=engine) self.session = __session__() def get_table_names(self) -> list: tables = self.session.query(MetaDataTable).all() result = [table.table_name for table in tables] return result def get_column_meta_data(self, table_name: str, view_only=True) -> dict: meta_data = {} order = 0 if view_only: for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn). filter(MetaDataTable.id == MetaDataColumn.table_id). filter(MetaDataTable.table_name == table_name). filter(MetaDataColumn.is_shown == 1).all()): meta_data[order] = {'column': column.column_name, 'label': column.column_label, 'order': column.column_order, 'ref_column': column.ref_column} order += 1 else: for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn). filter(MetaDataTable.id == MetaDataColumn.table_id). filter(MetaDataTable.table_name == table_name).all()): meta_data[order] = { 'column': column.column_name, 'order': column.column_order, 'ref_column': column.ref_column } order += 1 return meta_data def get_filters(self, table_name): _filter_map = {} for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn). filter(MetaDataTable.id == MetaDataColumn.table_id). filter(MetaDataTable.table_name == table_name). filter(MetaDataColumn.show_filter == 1).all()): _filter_map[column.column_name] = {'label': column.filter_label, 'widget': None} print(f"retrieved {len(_filter_map)} filters: {_filter_map}") return _filter_map def data(self, table, columns: dict, filters) -> list: data = [] entries = [] if len(filters) == 0: entries = self.session.query(table).all() else: entries = self.session.query(table).filter_by(**filters) for entry in entries: row = [] for order in columns.keys(): column_name = columns[order]['column'] if str(column_name).endswith("_id"): ref_table = column_name[:-3] # print(f"{ref_table=}") ref = getattr(entry, ref_table) value = getattr(ref, "name") # print(f"{value=}") row.append(value) else: row.append(getattr(entry, column_name)) # print(repr(row)) data.append(row) return data def get_data(self, table_name: str, columns: dict, where_clause: str) -> list: data = [] cursor = self.db_conn.cursor() cursor.execute(self.get_statement(table_name, columns, where_clause)) rows = cursor.fetchall() print(len(rows)) for row in rows: # print(f"KontorDB.get_data: {row}") data.append(list(row)) cursor.close() # print(f"KontorDB.getData: return {len(data)}") if table_name == 'comic' and len(where_clause) == 0: data.clear() comics = self.session.query(Comic).all() for item in comics: # print(item) row = [] for order in columns.keys(): column_name = columns[order]['column'] if str(column_name).endswith("_id"): ref_table = column_name[:-3] # print(f"{ref_table=}") ref = getattr(item, ref_table) value = getattr(ref, "name") # print(f"{value=}") row.append(value) else: row.append(getattr(item, column_name)) # print(repr(row)) data.append(row) return data def get_statement(self, table: str, header: dict, where_clause): columns = "" for index, column in header.items(): if index > 0: columns += ", " columns += column['column'] if len(columns) == 0: columns = "*" statement = f"SELECT {columns} FROM {table} {where_clause}" print(f"{statement=}") return statement def export_db(self, export_type: str, export_file_name: str, export_table_list: list): print(f"export DB to {export_file_name} as {export_type}") db = {} for table in export_table_list: columns = self.get_column_meta_data(table, view_only=False) model = Base.model_lookup_by_table_name(table) rows = self.session.query(model).all() entries = [] print(f"found {len(rows)} entries") print(f"found {len(columns)} columns") for row in rows: print(row) entry = {} for order in columns: print(columns[order]) column_name = columns[order]['column'] print(f"get value {column_name} from {row} of table {table}") try: value = getattr(row, column_name) if isinstance(value, datetime): entry[column_name] = str(value) else: entry[column_name] = value except AttributeError as error: print("could not get value") entries.append(entry) db[table] = entries export_file = Path(export_file_name) match export_type: case "JSON": json_dump = json.dumps(db, indent=4) with open(export_file_name, "w") as dump_file: dump_file.write(json_dump) case "YAML": export_file = Path(export_file_name) case "SQLite": export_file = Path(export_file_name) case _: print("unknown export type") if export_file.exists(): print(f"{export_file} exists")