import mariadb


class KontorDB:
    """Thin query layer over a MariaDB connection.

    Reads table and column descriptions from the ``meta_data_table`` and
    ``meta_data_column`` tables and fetches rows from arbitrary tables
    using statements assembled from that meta data.
    """

    def __init__(self, db_config):
        """Open the database connection.

        Args:
            db_config: Mapping with a ``'mariadb'`` entry that provides the
                keys ``host``, ``port``, ``user``, ``password`` and
                ``database``.
        """
        settings = db_config['mariadb']
        self.db_conn = mariadb.connect(
            host=settings['host'],
            port=settings['port'],
            user=settings['user'],
            password=settings['password'],
            database=settings['database']
        )

    def _query(self, statement, params=None, fetch_one=False):
        """Run *statement* on a fresh cursor and return what it fetched.

        The cursor is closed in a ``finally`` clause so that it is released
        even when ``execute`` or the fetch raises.

        Args:
            statement: SQL text to execute.
            params: Values for the ``?`` placeholders, or ``None`` to execute
                the statement without any parameter argument.
            fetch_one: Return only the first row (``None`` if there is none)
                instead of the list of all rows.
        """
        cursor = self.db_conn.cursor()
        try:
            if params is None:
                cursor.execute(statement)
            else:
                cursor.execute(statement, params)
            return cursor.fetchone() if fetch_one else cursor.fetchall()
        finally:
            cursor.close()

    def get_table_id(self, table_name):
        """Return the ``id`` stored in ``meta_data_table`` for *table_name*.

        Raises:
            LookupError: If ``meta_data_table`` has no row for *table_name*.
        """
        row = self._query(
            "SELECT id, created_date, last_modified_date FROM meta_data_table WHERE table_name=?",
            (table_name, ),
            fetch_one=True,
        )
        if row is None:
            # Fail with a message naming the table instead of a bare
            # "'NoneType' object is not subscriptable".
            raise LookupError(f"no entry in meta_data_table for table {table_name!r}")
        return row[0]

    def get_table_names(self) -> list:
        """Return the ``table_name`` of every row in ``meta_data_table``."""
        rows = self._query("SELECT id, table_name from meta_data_table")
        return [table_name for (_, table_name) in rows]

    def get_column_meta_data(self, table_id):
        """Return the shown columns of a table, keyed by 0-based position.

        Only columns with ``is_shown`` set are included, in ``column_order``
        sequence. Each value is a dict with the keys ``'column'``,
        ``'label'`` and ``'order'``.
        """
        rows = self._query(
            "SELECT column_name, column_order, column_label FROM meta_data_column WHERE table_id=? AND is_shown is true ORDER bY column_order",
            (table_id, ),
        )
        return {
            position: {
                'column': column_name,
                'label': column_label,
                'order': column_order}
            for position, (column_name, column_order, column_label) in enumerate(rows)
        }

    def get_filters(self, table_id):
        """Return the filterable columns of a table, keyed by column name.

        Only columns with ``show_filter`` set are included. Each value is a
        dict holding the filter ``'label'`` and a ``'widget'`` entry that is
        initialised to ``None`` (presumably filled in by the caller).
        """
        rows = self._query(
            "SELECT column_name, filter_label from meta_data_column WHERE table_id=? AND show_filter is true",
            (table_id, ),
        )
        return {
            column_name: {'label': filter_label, 'widget': None}
            for (column_name, filter_label) in rows
        }

    def get_data(self, table_name: str, columns: dict, where_clause: str) -> list:
        """Fetch rows from *table_name* and return each row as a list.

        Args:
            table_name: Table to select from.
            columns: Column description in the shape produced by
                ``get_column_meta_data``; an empty dict selects ``*``.
            where_clause: SQL text appended verbatim after the table name.
        """
        # Build the statement first so that a failure there cannot leave an
        # open cursor behind.
        statement = self.get_statement(table_name, columns, where_clause)
        rows = self._query(statement)
        print(len(rows))
        data = [list(row) for row in rows]
        print(f"KontorDB.getData: return {len(data)}")
        return data

    def get_statement(self, table: str, header: dict, where_clause):
        """Build the ``SELECT`` statement for *table*.

        The column list is taken from the ``'column'`` entry of every value
        in *header*, in the dict's iteration order; an empty column list
        falls back to ``*``.

        NOTE(review): *table*, the column names and *where_clause* are
        interpolated into the SQL text as-is, so they must come from trusted
        sources only.
        """
        # Joining the values keeps the separators correct regardless of what
        # the header keys are (the keys are not required to start at 0).
        columns = ", ".join(column['column'] for column in header.values())
        if not columns:
            columns = "*"
        statement = f"SELECT {columns} FROM {table} {where_clause}"
        print(f"{statement=}")
        return statement