import mariadb from sqlalchemy import create_engine, text, MetaData, join from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker from database.base import Base from database.comic import Comic from database.metadata import MetaDataTable, MetaDataColumn class KontorDB: def __init__(self, db_config): self.db_conn = mariadb.connect( host=db_config['mariadb']['host'], port=db_config['mariadb']['port'], user=db_config['mariadb']['user'], password=db_config['mariadb']['password'], database=db_config['mariadb']['database'] ) connect_string = ('mariadb+mariadbconnector://{}:{}@{}:{}/{}' .format( db_config['mariadb']['user'], db_config['mariadb']['password'], db_config['mariadb']['host'], db_config['mariadb']['port'], db_config['mariadb']['database'])) # engine = create_engine(connect_string, echo=True) engine = create_engine(connect_string) Base.metadata.create_all(bind=engine) __session__ = sessionmaker(bind=engine) self.session = __session__() def get_table_id(self, table_name): cursor = self.db_conn.cursor() cursor.execute("SELECT id, created_date, last_modified_date FROM meta_data_table WHERE table_name=?", (table_name,)) row = cursor.fetchone() cursor.close() # table_ids = self.session.query(MetaDataTable).filter(MetaDataTable.table_name == table_name).all() # print(type(table_ids)) return row[0] def get_table_names(self) -> list: tables_names = [] cursor = self.db_conn.cursor() cursor.execute("SELECT id, table_name from meta_data_table") rows = cursor.fetchall().all for row in rows: print(row) for (_, table_name) in rows: tables_names.append(table_name) cursor.close() tables = self.session.query(MetaDataTable).all() for table in tables: print(table) return tables_names def get_column_meta_data(self, table_id: str, table_name: str): cursor = self.db_conn.cursor() meta_data = {} cursor.execute( "SELECT column_name, column_order, column_label FROM meta_data_column WHERE table_id=? AND is_shown is true ORDER bY column_order", (table_id,)) rows = cursor.fetchall() order = 0 for (column_name, column_order, column_label) in rows: meta_data[order] = {'column': column_name, 'label': column_label, 'order': column_order} order += 1 cursor.close() columns = (self.session.query(MetaDataColumn). join(MetaDataTable, MetaDataTable.id == MetaDataColumn.table_id). filter(MetaDataTable.table_name == table_name). filter(MetaDataColumn.is_shown is True). order_by(MetaDataColumn.column_order).all()) print(columns) for column in columns: print(column) result = repr(column) if result is not None: print(result) # print(f"retrieved {len(rows)} columns, set {len(meta_data)} headers") return meta_data def get_filters(self, table_id): cursor = self.db_conn.cursor() filters = {} cursor.execute( "SELECT column_name, filter_label from meta_data_column WHERE table_id=? AND show_filter is true", (table_id,)) rows = cursor.fetchall() for row in rows: filters[row[0]] = {'label': row[1], 'widget': None} cursor.close() # print(f"retrieved {len(rows)} filters: {filters}") return filters def get_data(self, table_name: str, columns: dict, where_clause: str) -> list: data = [] cursor = self.db_conn.cursor() cursor.execute(self.get_statement(table_name, columns, where_clause)) rows = cursor.fetchall() print(len(rows)) for row in rows: # print(f"KontorDB.get_data: {row}") data.append(list(row)) cursor.close() print(f"KontorDB.getData: return {len(data)}") if table_name == 'comic' and len(where_clause) == 0: data.clear() comics = self.session.query(Comic).all() for item in comics: # print(item) row = [] for order in columns.keys(): column_name = columns[order]['column'] if column_name == 'publisher_id': row.append(item.publisher.name) else: row.append(getattr(item, column_name)) # print(repr(row)) data.append(row) return data def get_statement(self, table: str, header: dict, where_clause): columns = "" for index, column in header.items(): if index > 0: columns += ", " columns += column['column'] if len(columns) == 0: columns = "*" statement = f"SELECT {columns} FROM {table} {where_clause}" print(f"{statement=}") return statement