import logging

import mariadb
from sqlalchemy import create_engine, select, text, MetaData, join
from sqlalchemy.engine import URL
from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker

from database.base import Base
from database.comic import Comic
from database.metadata import MetaDataTable, MetaDataColumn

logger = logging.getLogger(__name__)


class KontorDB:
    """Database access object holding two handles to the same MariaDB database.

    ``self.db_conn`` is a raw ``mariadb`` connection used for hand-written
    SQL; ``self.session`` is a SQLAlchemy ORM session used for the mapped
    classes (``MetaDataTable``, ``MetaDataColumn``, ``Comic``).
    """

    def __init__(self, db_config):
        """Open the raw connection and the ORM session.

        Args:
            db_config: Mapping with a ``'mariadb'`` entry providing the keys
                ``host``, ``port``, ``user``, ``password`` and ``database``.

        Raises:
            KeyError: If one of the expected configuration keys is missing.
        """
        settings = db_config['mariadb']
        self.db_conn = mariadb.connect(
            host=settings['host'],
            port=settings['port'],
            user=settings['user'],
            password=settings['password'],
            database=settings['database'],
        )
        # URL.create escapes each component, so credentials containing
        # characters such as '@', ':' or '/' no longer corrupt the URL the
        # way plain string formatting did.
        url = URL.create(
            'mariadb+mariadbconnector',
            username=settings['user'],
            password=settings['password'],
            host=settings['host'],
            port=settings['port'],
            database=settings['database'],
        )
        engine = create_engine(url)
        Base.metadata.create_all(bind=engine)
        session_factory = sessionmaker(bind=engine)
        self.session = session_factory()

    def get_table_id(self, table_name):
        """Return the ``MetaDataTable.id`` for *table_name*, or ``None``.

        ``None`` is returned when no row matches.
        """
        statement = select(MetaDataTable.id).where(
            MetaDataTable.table_name == table_name
        )
        return self.session.execute(statement).scalar()

    def get_table_names(self) -> list:
        """Return the ``table_name`` of every ``MetaDataTable`` row."""
        # Select only the needed column instead of loading full entities.
        return list(self.session.scalars(select(MetaDataTable.table_name)))

    def get_column_meta_data(self, table_id: str, table_name: str) -> dict:
        """Return metadata for the shown columns of *table_name*.

        Args:
            table_id: Unused; the lookup is done by *table_name*. Kept so
                existing callers keep working.
            table_name: Value matched against ``MetaDataTable.table_name``.

        Returns:
            A dict keyed by a running index starting at 0 (in the order the
            query returns the rows). Each value is a dict with the keys
            ``'column'``, ``'label'``, ``'order'`` and ``'ref_column'``.
        """
        shown_columns = (
            self.session.query(MetaDataTable, MetaDataColumn)
            .filter(MetaDataTable.id == MetaDataColumn.table_id)
            .filter(MetaDataTable.table_name == table_name)
            .filter(MetaDataColumn.is_shown == 1)
            .all()
        )
        return {
            position: {
                'column': column.column_name,
                'label': column.column_label,
                'order': column.column_order,
                'ref_column': column.ref_column,
            }
            for position, (_, column) in enumerate(shown_columns)
        }

    def get_filters(self, table_id):
        """Return the filter definitions configured for *table_id*.

        Returns:
            A dict mapping each column name to
            ``{'label': <filter_label>, 'widget': None}``.
        """
        rows = self._fetch_all(
            "SELECT column_name, filter_label from meta_data_column "
            "WHERE table_id=? AND show_filter is true",
            (table_id,),
        )
        return {row[0]: {'label': row[1], 'widget': None} for row in rows}

    def get_data(self, table_name: str, columns: dict, where_clause: str) -> list:
        """Return the rows of *table_name* as a list of lists.

        Args:
            table_name: Table to read from.
            columns: Column metadata in the shape produced by
                ``get_column_meta_data``; only the ``'column'`` entry of each
                value is used here.
            where_clause: SQL text appended after the table name; may be
                empty.

        Returns:
            One list per row, with the values in the order of *columns*.
        """
        # The unfiltered 'comic' table is read through the ORM so that
        # '*_id' columns can be replaced by the referenced object's name.
        # Checking this first avoids running a raw query whose result
        # would be thrown away.
        if table_name == 'comic' and not where_clause:
            return self._get_comic_rows(columns)

        rows = self._fetch_all(
            self.get_statement(table_name, columns, where_clause)
        )
        logger.debug("KontorDB.get_data: fetched %d rows", len(rows))
        return [list(row) for row in rows]

    def get_statement(self, table: str, header: dict, where_clause):
        """Build the ``SELECT`` statement text used by ``get_data``.

        Args:
            table: Table name placed after ``FROM``.
            header: Column metadata; the ``'column'`` entry of each value is
                selected. An empty header selects ``*``.
            where_clause: Text appended verbatim after the table name.

        Returns:
            The SQL statement as a string.
        """
        # NOTE(review): table, column names and where_clause are
        # interpolated directly into the SQL text. Callers must only pass
        # trusted values; identifiers cannot be bound as query parameters.
        columns = ", ".join(column['column'] for column in header.values()) or "*"
        statement = f"SELECT {columns} FROM {table} {where_clause}"
        logger.debug("statement=%r", statement)
        return statement

    def _fetch_all(self, statement, params=None):
        """Execute *statement* on the raw connection and return all rows.

        The cursor is closed even when execution or fetching raises.
        """
        cursor = self.db_conn.cursor()
        try:
            if params is None:
                cursor.execute(statement)
            else:
                cursor.execute(statement, params)
            return cursor.fetchall()
        finally:
            cursor.close()

    def _get_comic_rows(self, columns: dict) -> list:
        """Load all ``Comic`` objects and flatten them into row lists.

        For a column named ``<x>_id`` the attribute ``<x>`` of the comic is
        read and its ``name`` is used as the cell value; a missing
        reference (``None``) yields ``None``. Any other column is read
        directly as an attribute of the comic.
        """
        column_names = [spec['column'] for spec in columns.values()]
        rows = []
        for comic in self.session.query(Comic).all():
            row = []
            for name in column_names:
                if str(name).endswith("_id"):
                    ref = getattr(comic, name[:-3])
                    row.append(None if ref is None else getattr(ref, "name"))
                else:
                    row.append(getattr(comic, name))
            rows.append(row)
        return rows