Files
kontor/qt/database/__init__.py
T

106 lines
4.2 KiB
Python

import logging

import mariadb
from sqlalchemy import create_engine, select, text, MetaData, join
from sqlalchemy.engine import URL
from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker

from database.base import Base
from database.comic import Comic
from database.metadata import MetaDataTable, MetaDataColumn
class KontorDB:
    """Database access layer backed by MariaDB.

    Two handles onto the same database are kept side by side:

    * ``self.db_conn`` -- a raw ``mariadb`` connection used for the
      hand-written SQL in :meth:`get_filters` and :meth:`get_data`.
    * ``self.session`` -- a SQLAlchemy ORM session used for the mapped
      classes (``MetaDataTable``, ``MetaDataColumn``, ``Comic``).
    """

    _log = logging.getLogger(__name__)

    def __init__(self, db_config):
        """Open the raw connection and the ORM session.

        Args:
            db_config: Mapping with a ``'mariadb'`` entry that provides the
                keys ``host``, ``port``, ``user``, ``password`` and
                ``database``.
        """
        settings = db_config['mariadb']
        self.db_conn = mariadb.connect(
            host=settings['host'],
            port=settings['port'],
            user=settings['user'],
            password=settings['password'],
            database=settings['database'],
        )
        # Build the URL from its components instead of formatting a string:
        # credentials containing '@', ':' or '/' would otherwise corrupt it.
        url = URL.create(
            'mariadb+mariadbconnector',
            username=settings['user'],
            password=settings['password'],
            host=settings['host'],
            port=settings['port'],
            database=settings['database'],
        )
        # Pass echo=True to create_engine to have SQLAlchemy log its SQL.
        engine = create_engine(url)
        # Create any mapped tables that do not exist yet.
        Base.metadata.create_all(bind=engine)
        self.session = sessionmaker(bind=engine)()

    def get_table_id(self, table_name):
        """Return the ``MetaDataTable.id`` registered for *table_name*.

        Returns ``None`` when no matching row exists.
        """
        query = select(MetaDataTable.id).where(
            MetaDataTable.table_name == table_name
        )
        return self.session.execute(query).scalar()

    def get_table_names(self) -> list:
        """Return the ``table_name`` of every ``MetaDataTable`` row."""
        return list(self.session.scalars(select(MetaDataTable.table_name)))

    def get_column_meta_data(self, table_id: str, table_name: str) -> dict:
        """Return the metadata of the shown columns of *table_name*.

        Args:
            table_id: Unused; the lookup is done by *table_name*. Kept so
                existing callers keep working.
            table_name: Name of the table as stored in ``MetaDataTable``.

        Returns:
            A dict keyed by a running index (0, 1, 2, ... in the order the
            database returned the rows). Each value is a dict with the keys
            ``'column'``, ``'label'``, ``'order'`` and ``'ref_column'``.
        """
        shown_columns = (
            self.session.query(MetaDataTable, MetaDataColumn)
            .filter(MetaDataTable.id == MetaDataColumn.table_id)
            .filter(MetaDataTable.table_name == table_name)
            .filter(MetaDataColumn.is_shown == 1)
            .all()
        )
        return {
            position: {
                'column': column.column_name,
                'label': column.column_label,
                'order': column.column_order,
                'ref_column': column.ref_column,
            }
            for position, (_, column) in enumerate(shown_columns)
        }

    def get_filters(self, table_id):
        """Return the filter definitions configured for *table_id*.

        Returns:
            A dict mapping each column name to
            ``{'label': <filter_label>, 'widget': None}``. The ``'widget'``
            slot is always ``None`` here; presumably the caller fills it in.
        """
        cursor = self.db_conn.cursor()
        try:
            cursor.execute(
                "SELECT column_name, filter_label from meta_data_column WHERE table_id=? AND show_filter is true",
                (table_id,))
            rows = cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
        return {row[0]: {'label': row[1], 'widget': None} for row in rows}

    def get_data(self, table_name: str, columns: dict, where_clause: str) -> list:
        """Return the rows of *table_name* as a list of lists.

        Args:
            table_name: Table to read from.
            columns: Column metadata in the shape produced by
                :meth:`get_column_meta_data`.
            where_clause: Raw SQL appended to the statement, or an empty
                string / ``None`` for no restriction.

        Returns:
            One list per row, with values in the order given by *columns*.
            An unfiltered read of the ``comic`` table goes through the ORM
            so that ``*_id`` columns are shown as the referenced ``name``.
        """
        if table_name == 'comic' and not where_clause:
            # The ORM result replaces the raw rows entirely in this case,
            # so there is no point in running the raw query first.
            return self._get_comic_rows(columns)

        cursor = self.db_conn.cursor()
        try:
            cursor.execute(self.get_statement(table_name, columns, where_clause))
            rows = cursor.fetchall()
        finally:
            cursor.close()
        self._log.debug("get_data(%s): %d rows", table_name, len(rows))
        return [list(row) for row in rows]

    def _get_comic_rows(self, columns: dict) -> list:
        """Load all comics via the ORM, resolving ``*_id`` columns to names."""
        column_names = [spec['column'] for spec in columns.values()]
        data = []
        for comic in self.session.query(Comic).all():
            row = []
            for column_name in column_names:
                if str(column_name).endswith("_id"):
                    # "publisher_id" -> attribute "publisher"; the mapped
                    # class is expected to expose it as a related object
                    # that has a ``name`` attribute.
                    ref = getattr(comic, column_name[:-3])
                    # An unset reference yields an empty cell, not a crash.
                    row.append(None if ref is None else getattr(ref, "name"))
                else:
                    row.append(getattr(comic, column_name))
            data.append(row)
        return data

    def get_statement(self, table: str, header: dict, where_clause):
        """Build the ``SELECT`` statement used by :meth:`get_data`.

        Args:
            table: Table name, inserted verbatim.
            header: Column metadata; each value's ``'column'`` entry is
                selected, in dict order. An empty dict selects ``*``.
            where_clause: Raw SQL appended after the table name; an empty
                string or ``None`` adds nothing.

        Returns:
            The SQL statement as a string.

        NOTE(review): table, column names and where_clause are interpolated
        without escaping. Only pass values that come from trusted sources.
        """
        columns = ", ".join(column['column'] for column in header.values())
        if not columns:
            columns = "*"
        statement = f"SELECT {columns} FROM {table}"
        if where_clause:
            statement = f"{statement} {where_clause}"
        self._log.debug("statement=%r", statement)
        return statement