184 lines
7.4 KiB
Python
184 lines
7.4 KiB
Python
import json
from datetime import datetime
from pathlib import Path

import mariadb
from sqlalchemy import create_engine, select, text, MetaData, join
from sqlalchemy.engine import URL
from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker

from database.base import Base
from database.comic import Comic
from database.metadata import MetaDataTable, MetaDataColumn


class KontorDB:
    """Data-access layer for the Kontor database.

    Holds two handles onto the same MariaDB database, both built from the
    same configuration: a raw ``mariadb`` DB-API connection (``db_conn``)
    used for hand-written SQL, and a SQLAlchemy ORM session (``session``)
    used for the mapped models and the ``MetaDataTable`` /
    ``MetaDataColumn`` tables that describe which columns and filters exist
    for each table.
    """

    def __init__(self, db_config):
        """Open the raw connection, create missing tables and start a session.

        Args:
            db_config: Mapping with a ``'mariadb'`` entry that provides the
                keys ``host``, ``port``, ``user``, ``password`` and
                ``database``.
        """
        settings = db_config['mariadb']
        self.db_conn = mariadb.connect(
            host=settings['host'],
            port=settings['port'],
            user=settings['user'],
            password=settings['password'],
            database=settings['database'],
        )
        # Build the URL from its components rather than by string formatting:
        # credentials containing '@', ':' or '/' would otherwise produce a
        # URL that SQLAlchemy parses incorrectly.
        url = URL.create(
            'mariadb+mariadbconnector',
            username=settings['user'],
            password=settings['password'],
            host=settings['host'],
            port=settings['port'],
            database=settings['database'],
        )
        # Pass echo=True to create_engine() to log the emitted SQL.
        self._engine = create_engine(url)
        Base.metadata.create_all(bind=self._engine)
        session_factory = sessionmaker(bind=self._engine)
        self.session = session_factory()

    def close(self):
        """Release the ORM session, the engine's pool and the raw connection."""
        self.session.close()
        self._engine.dispose()
        self.db_conn.close()

    def _metadata_columns(self, table_name: str, *criteria) -> list:
        """Return the ``MetaDataColumn`` rows registered for ``table_name``.

        Args:
            table_name: Value matched against ``MetaDataTable.table_name``.
            *criteria: Additional SQLAlchemy filter expressions, ANDed on.
        """
        # NOTE(review): no ORDER BY is applied, so the row order is whatever
        # the database returns -- confirm whether ``column_order`` should
        # drive the ordering.
        query = (
            self.session.query(MetaDataTable, MetaDataColumn)
            .filter(MetaDataTable.id == MetaDataColumn.table_id)
            .filter(MetaDataTable.table_name == table_name)
        )
        if criteria:
            query = query.filter(*criteria)
        return [column for _, column in query.all()]

    @staticmethod
    def _entity_to_row(entity, columns: dict) -> list:
        """Flatten one ORM entity into a list of cell values.

        A column whose name ends in ``_id`` is resolved through the attribute
        named like the column without that suffix, and the ``name`` of that
        related object is used as the cell value. A missing related object
        yields ``None`` instead of raising ``AttributeError``.
        """
        row = []
        for spec in columns.values():
            column_name = spec['column']
            if str(column_name).endswith("_id"):
                related = getattr(entity, column_name[:-3])
                row.append(None if related is None else getattr(related, "name"))
            else:
                row.append(getattr(entity, column_name))
        return row

    def get_table_names(self) -> list:
        """Return the ``table_name`` of every ``MetaDataTable`` row."""
        return [table.table_name for table in self.session.query(MetaDataTable).all()]

    def get_column_meta_data(self, table_name: str, view_only=True) -> dict:
        """Describe the columns registered for ``table_name``.

        Args:
            table_name: Table whose column metadata is requested.
            view_only: When true, only columns with ``is_shown == 1`` are
                returned and each entry carries a ``'label'``. When false,
                all columns are returned and entries have no ``'label'``.

        Returns:
            Dict keyed by a running index (0, 1, ...) whose values are dicts
            with the keys ``'column'``, ``'order'`` and ``'ref_column'``
            (plus ``'label'`` when ``view_only`` is true).
        """
        if view_only:
            columns = self._metadata_columns(table_name, MetaDataColumn.is_shown == 1)
        else:
            columns = self._metadata_columns(table_name)

        meta_data = {}
        for position, column in enumerate(columns):
            entry = {'column': column.column_name}
            if view_only:
                entry['label'] = column.column_label
            entry['order'] = column.column_order
            entry['ref_column'] = column.ref_column
            meta_data[position] = entry
        return meta_data

    def get_filters(self, table_name):
        """Return the filters configured for ``table_name``.

        Returns:
            Dict mapping each column name with ``show_filter == 1`` to
            ``{'label': <filter_label>, 'widget': None}``.
        """
        _filter_map = {
            column.column_name: {'label': column.filter_label, 'widget': None}
            for column in self._metadata_columns(table_name, MetaDataColumn.show_filter == 1)
        }
        print(f"retrieved {len(_filter_map)} filters: {_filter_map}")
        return _filter_map

    def data(self, table, columns: dict, filters) -> list:
        """Load rows of a mapped model through the ORM session.

        Args:
            table: Mapped class passed to ``session.query``.
            columns: Column metadata as returned by ``get_column_meta_data``.
            filters: Keyword filters passed to ``filter_by``; an empty (or
                ``None``) value selects every row.

        Returns:
            One list of cell values per matching row.
        """
        query = self.session.query(table)
        entries = query.filter_by(**filters) if filters else query.all()
        return [self._entity_to_row(entry, columns) for entry in entries]

    def get_data(self, table_name: str, columns: dict, where_clause: str) -> list:
        """Load rows of ``table_name`` as lists of cell values.

        The unfiltered ``comic`` table is read through the ORM so that
        ``*_id`` columns are shown as the related object's name; every other
        request runs the statement built by ``get_statement`` on the raw
        connection.
        """
        if table_name == 'comic' and not where_clause:
            # Decide up front: the raw query result would be thrown away.
            comics = self.session.query(Comic).all()
            return [self._entity_to_row(item, columns) for item in comics]

        cursor = self.db_conn.cursor()
        try:
            cursor.execute(self.get_statement(table_name, columns, where_clause))
            rows = cursor.fetchall()
        finally:
            # Close the cursor even when the statement fails.
            cursor.close()
        print(len(rows))
        return [list(row) for row in rows]

    def get_statement(self, table: str, header: dict, where_clause):
        """Build the ``SELECT`` statement for ``table``.

        Args:
            table: Table name, inserted verbatim.
            header: Column metadata; each value's ``'column'`` entry is
                selected. An empty header selects ``*``.
            where_clause: Text appended verbatim after the table name.

        Returns:
            The SQL statement as a string.
        """
        # NOTE(review): table, column names and where_clause are interpolated
        # unescaped -- callers must never pass untrusted input here.
        columns = ", ".join(spec['column'] for spec in header.values()) or "*"
        statement = f"SELECT {columns} FROM {table} {where_clause}"
        print(f"{statement=}")
        return statement

    def _table_entries(self, table: str) -> list:
        """Return every row of ``table`` as a dict keyed by column name."""
        columns = self.get_column_meta_data(table, view_only=False)
        model = Base.model_lookup_by_table_name(table)
        rows = self.session.query(model).all()
        print(f"found {len(rows)} entries")
        print(f"found {len(columns)} columns")

        entries = []
        for row in rows:
            print(row)
            entry = {}
            for spec in columns.values():
                print(spec)
                column_name = spec['column']
                print(f"get value {column_name} from {row} of table {table}")
                try:
                    value = getattr(row, column_name)
                except AttributeError:
                    # The model has no such attribute: leave the key out.
                    print("could not get value")
                    continue
                # datetime objects are not JSON serializable.
                entry[column_name] = str(value) if isinstance(value, datetime) else value
            entries.append(entry)
        return entries

    def export_db(self, export_type: str, export_file_name: str, export_table_list: list):
        """Export the listed tables to ``export_file_name``.

        Args:
            export_type: ``"JSON"`` writes the file; ``"YAML"`` and
                ``"SQLite"`` are recognised but not implemented; anything
                else is reported as unknown.
            export_file_name: Path of the file to write.
            export_table_list: Names of the tables to export.
        """
        print(f"export DB to {export_file_name} as {export_type}")
        db = {table: self._table_entries(table) for table in export_table_list}

        export_file = Path(export_file_name)
        if export_type == "JSON":
            # Serialize first so a failure does not truncate an existing file.
            json_dump = json.dumps(db, indent=4)
            export_file.write_text(json_dump, encoding="utf-8")
        elif export_type in ("YAML", "SQLite"):
            # Previously a silent no-op; make the missing feature visible.
            print(f"export type {export_type} is not implemented yet")
        else:
            print("unknown export type")

        if export_file.exists():
            print(f"{export_file} exists")