Define meta data model for Kontor DB #52

Merged
tpeetz merged 2 commits from feature/define-meta-data-model-python into develop/0.1.0 2025-01-14 12:22:54 +00:00
11 changed files with 203 additions and 55 deletions
+98 -20
View File
@@ -1,3 +1,7 @@
import json
from datetime import datetime
from pathlib import Path
import mariadb
from sqlalchemy import create_engine, select, text, MetaData, join
from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker
@@ -30,35 +34,67 @@ class KontorDB:
__session__ = sessionmaker(bind=engine)
self.session = __session__()
def get_table_id(self, table_name):
result = self.session.execute(select(MetaDataTable.id).where(MetaDataTable.table_name == table_name)).scalar()
return result
def get_table_names(self) -> list:
tables = self.session.query(MetaDataTable).all()
result = [table.table_name for table in tables]
return result
def get_column_meta_data(self, table_id: str, table_name: str) -> dict:
def get_column_meta_data(self, table_name: str, view_only=True) -> dict:
meta_data = {}
order = 0
for (_, column) in self.session.query(MetaDataTable, MetaDataColumn).filter(MetaDataTable.id == MetaDataColumn.table_id).filter(MetaDataTable.table_name == table_name).filter(MetaDataColumn.is_shown == 1).all():
meta_data[order] = {'column': column.column_name, 'label': column.column_label, 'order': column.column_order, 'ref_column': column.ref_column}
order += 1
if view_only:
for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.is_shown == 1).all()):
meta_data[order] = {'column': column.column_name, 'label': column.column_label,
'order': column.column_order, 'ref_column': column.ref_column}
order += 1
else:
for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).all()):
meta_data[order] = {
'column': column.column_name,
'order': column.column_order,
'ref_column': column.ref_column
}
order += 1
return meta_data
def get_filters(self, table_id):
cursor = self.db_conn.cursor()
filters = {}
cursor.execute(
"SELECT column_name, filter_label from meta_data_column WHERE table_id=? AND show_filter is true",
(table_id,))
rows = cursor.fetchall()
for row in rows:
filters[row[0]] = {'label': row[1], 'widget': None}
cursor.close()
# print(f"retrieved {len(rows)} filters: {filters}")
return filters
def get_filters(self, table_name):
_filter_map = {}
for (_, column) in (self.session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.show_filter == 1).all()):
_filter_map[column.column_name] = {'label': column.filter_label, 'widget': None}
print(f"retrieved {len(_filter_map)} filters: {_filter_map}")
return _filter_map
def data(self, table, columns: dict, filters) -> list:
data = []
entries = []
if len(filters) == 0:
entries = self.session.query(table).all()
else:
entries = self.session.query(table).filter_by(**filters)
for entry in entries:
row = []
for order in columns.keys():
column_name = columns[order]['column']
if str(column_name).endswith("_id"):
ref_table = column_name[:-3]
# print(f"{ref_table=}")
ref = getattr(entry, ref_table)
value = getattr(ref, "name")
# print(f"{value=}")
row.append(value)
else:
row.append(getattr(entry, column_name))
# print(repr(row))
data.append(row)
return data
def get_data(self, table_name: str, columns: dict, where_clause: str) -> list:
data = []
@@ -103,3 +139,45 @@ class KontorDB:
statement = f"SELECT {columns} FROM {table} {where_clause}"
print(f"{statement=}")
return statement
def export_db(self, export_type: str, export_file_name: str, export_table_list: list):
print(f"export DB to {export_file_name} as {export_type}")
db = {}
for table in export_table_list:
columns = self.get_column_meta_data(table, view_only=False)
model = Base.model_lookup_by_table_name(table)
rows = self.session.query(model).all()
entries = []
print(f"found {len(rows)} entries")
print(f"found {len(columns)} columns")
for row in rows:
print(row)
entry = {}
for order in columns:
print(columns[order])
column_name = columns[order]['column']
print(f"get value {column_name} from {row} of table {table}")
try:
value = getattr(row, column_name)
if isinstance(value, datetime):
entry[column_name] = str(value)
else:
entry[column_name] = value
except AttributeError as error:
print("could not get value")
entries.append(entry)
db[table] = entries
export_file = Path(export_file_name)
match export_type:
case "JSON":
json_dump = json.dumps(db, indent=4)
with open(export_file_name, "w") as dump_file:
dump_file.write(json_dump)
case "YAML":
export_file = Path(export_file_name)
case "SQLite":
export_file = Path(export_file_name)
case _:
print("unknown export type")
if export_file.exists():
print(f"{export_file} exists")
+16 -3
View File
@@ -1,7 +1,20 @@
from sqlalchemy import Column, String, DateTime, Integer
from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker
from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker, declarative_base
class Base(DeclarativeBase):
pass
# class Base(DeclarativeBase):
# pass
class BaseModel:
@classmethod
def model_lookup_by_table_name(cls, table_name):
registry_instance = getattr(cls, "registry")
for mapper_ in registry_instance.mappers:
model = mapper_.class_
model_class_name = model.__tablename__
if model_class_name == table_name:
return model
Base = declarative_base(cls=BaseModel)
+4 -4
View File
@@ -1,4 +1,4 @@
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
@@ -106,7 +106,7 @@ class Artist(Base):
comic_works = relationship("ComicWork")
class Worktype(Base):
class WorkType(Base):
__tablename__ = "worktype"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
@@ -126,5 +126,5 @@ class ComicWork(Base):
comic = relationship("Comic", back_populates="comic_works")
artist_id = Column(String, ForeignKey("artist.id"), nullable=False)
artist = relationship("Artist", back_populates="comic_works")
worktype_id = Column(String, ForeignKey("worktype.id"), nullable=False)
worktype = relationship("Worktype", back_populates="comic_works")
work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False)
work_type = relationship("WorkType", back_populates="comic_works")
+3 -3
View File
@@ -1,4 +1,4 @@
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.dialects.mysql import BIT
from database.base import Base
@@ -13,10 +13,10 @@ class MediaFile(Base):
cloud_link = Column(String(255))
file_name = Column(String(255))
path = Column(String(255))
review = Column(BIT(1), default=True)
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255))
should_download = Column(Boolean, default=True)
should_download = Column(BIT(1))
def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.title})'
+1 -1
View File
@@ -1,4 +1,4 @@
from sqlalchemy import Boolean, Column, DateTime, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy import Column, DateTime, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
+12
View File
@@ -0,0 +1,12 @@
from abc import ABC, abstractmethod
class DataViewMeta(ABC):
@abstractmethod
def get_header(self):
pass
class ComicView(DataViewMeta):
def get_header(self):
pass
+32
View File
@@ -0,0 +1,32 @@
from typing import List
from PySide6.QtCore import QModelIndex, QAbstractTableModel
from PySide6.QtGui import Qt
from gui.data_view import DataViewMeta
class DataViewModel(QAbstractTableModel):
def __init__(self):
super().__init__()
self.main_window = None
self._config = None
self._data = List[DataViewMeta]
def rowCount(self, parent=QModelIndex()):
return len(self._data)
def columnCount(self, parent=QModelIndex()):
return 0
def headerData(self, section, orientation, role=Qt.ItemDataRole.DisplayRole):
return None
def data(self, index, role=Qt.ItemDataRole.DisplayRole):
return None
def setData(self, index, value, role=Qt.ItemDataRole.EditRole):
return False
def flags(self, index):
return None
+2 -2
View File
@@ -1,7 +1,7 @@
from pathlib import Path
from PySide6.QtWidgets import QDialog, QDialogButtonBox, QVBoxLayout, QLabel, QHBoxLayout, QPushButton, QFileDialog, \
QGroupBox, QCheckBox, QComboBox
QCheckBox, QComboBox
class ExportKontorDialog(QDialog):
@@ -10,7 +10,7 @@ class ExportKontorDialog(QDialog):
self.parent = parent
self.kontor_db = kontor_db
self.file_name = None
self.file_name = "data.json"
self.tables = []
self._table_options = {}
+7 -4
View File
@@ -3,6 +3,8 @@ from PySide6.QtWidgets import QWidget, QVBoxLayout, QMenu, QMessageBox, QTabWidg
from PySide6.QtWidgets import QLabel, QMainWindow
from database import KontorDB
from database.media import MediaFile
from database.comic import Comic
from gui.dialogs import ExportKontorDialog, ImportKontorDialog
from gui.model_config import KontorModelConfig
from gui.table_model import KontorTableModel
@@ -33,8 +35,8 @@ class MainWindow(QMainWindow):
parent_layout = QVBoxLayout()
self.central_widget.setLayout(parent_layout)
self.tabs = QTabWidget()
self.tabs.addTab(self.generate_data_tab("comic"), "Comics")
self.tabs.addTab(self.generate_data_tab("media_file"), "MediaFile")
self.tabs.addTab(self.generate_data_tab("comic", Comic), "Comics")
self.tabs.addTab(self.generate_data_tab("media_file", MediaFile), "MediaFile")
self.tabs.currentChanged.connect(self._tab_changed)
#label.setAlignment(Qt.AlignmentFlag.AlignCenter)
parent_layout.addWidget(self.tabs)
@@ -111,6 +113,7 @@ class MainWindow(QMainWindow):
print(export_dlg.get_tables_to_export())
print(f"export DB to {export_dlg.file_name}")
self.statusBar.showMessage(f"export DB to {export_dlg.file_name}", 3000)
self.kontor_db.export_db(export_dlg.current_export_type, export_dlg.file_name, export_dlg.get_tables_to_export())
else:
self.statusBar.showMessage("Export cancelled", 3000)
@@ -120,9 +123,9 @@ class MainWindow(QMainWindow):
def _tab_changed(self, tab_index):
self.data[tab_index].refresh()
def generate_data_tab(self, table_name):
def generate_data_tab(self, table_name, table):
data_tab = QWidget()
table_config = KontorModelConfig(self.kontor_db, self, table_name)
table_config = KontorModelConfig(self.kontor_db, self, table_name, table)
model = KontorTableModel(table_config)
layout = QVBoxLayout()
self.data.append(model)
+18 -13
View File
@@ -6,25 +6,18 @@ from database import KontorDB
class KontorModelConfig:
def __init__(self, kontor_db: KontorDB, main_window, table_name: str):
def __init__(self, kontor_db: KontorDB, main_window, table_name: str, table):
self.header = {}
self.filter = {}
self.main_window = main_window
self._table = table_name
self._table_id = None
self._table_name = table_name
self._table = table
self.kontor_db = kontor_db
self.get_table_config()
def get_table_id(self):
if self._table_id is not None:
return
self._table_id = self.kontor_db.get_table_id(self._table)
def get_table_config(self):
if self._table_id is None:
self.get_table_id()
self.header = self.kontor_db.get_column_meta_data(self._table_id, self._table)
self.filter = self.kontor_db.get_filters(self._table_id)
self.header = self.kontor_db.get_column_meta_data(self._table_name)
self.filter = self.kontor_db.get_filters(self._table_name)
def get_filter(self) -> str:
filter_rule = ""
@@ -41,8 +34,20 @@ class KontorModelConfig:
# print(f"{filter_rule=}")
return filter_rule
def filters(self) -> dict:
_filters = {}
# print(self.filter["download"].isChecked())
for column, filter_info in self.filter.items():
# print(column, filter_info)
if filter_info['widget'].isChecked():
_filters[column] = True
# print(f"{filter_rule=}")
return _filters
def get_data(self) -> list:
data = self.kontor_db.get_data(self._table, self.header, self.get_filter())
# data = self.kontor_db.get_data(self._table_name, self.header, self.get_filter())
# data.clear()
data = self.kontor_db.data(self._table, self.header, self.filters())
# print(f"KontorModelConfig.get_data: {len(data)}")
# comics = self.kontor_db.session.query(Comic).all()
# print(f'{len(comics)} Comics loaded')
+10 -5
View File
@@ -45,8 +45,8 @@ class KontorTableModel(QAbstractTableModel):
if self._data is None:
return None
value = self._data[index.row()][index.column()]
if role == Qt.ItemDataRole.DisplayRole:
# print('{}: {}'.format(value, type(value)))
# print('{}:: {}:: {}: {}'.format(index, role, value, type(value)))
if role == Qt.ItemDataRole.DisplayRole or role == Qt.ItemDataRole.EditRole:
if isinstance(value, datetime):
return value.strftime("%Y-%m-%d %M:%M:%S")
if isinstance(value, str):
@@ -57,6 +57,7 @@ class KontorTableModel(QAbstractTableModel):
else:
return self._main_window.cross
if isinstance(value, int):
# print('{}:: {}: {}'.format(index, value, type(value)))
if value == 1:
return self._main_window.tick
else:
@@ -69,7 +70,6 @@ class KontorTableModel(QAbstractTableModel):
return str(value)
if role == Qt.ItemDataRole.DecorationRole:
if isinstance(value, bytes):
# print('{}: {}'.format(value, type(value)))
if value == b'\x01':
return self._main_window.tick
else:
@@ -91,13 +91,18 @@ class KontorTableModel(QAbstractTableModel):
# print(f"Header count: {len(self._config.get_header())}")
return len(self._config.header)
def setData(self, index, value, role=Qt.ItemDataRole.EditRole):
def setData(self, index, value, role: int) -> bool:
print(index, role)
if role == Qt.ItemDataRole.EditRole:
self._data[index.row()][index.column()] = value
print(self._data[index.row()][index.column()])
self.dataChanged.emit(index, index)
return True
if role == Qt.ItemDataRole.CheckStateRole:
print("role == Qt.ItemDataRole.CheckStateRole")
checked = value == Qt.CheckState.Checked
self._data[index.row()][index.column()] = checked
return True
return False
def flags(self, index):
return Qt.ItemFlag.ItemIsSelectable | Qt.ItemFlag.ItemIsEnabled | Qt.ItemFlag.ItemIsEditable | Qt.ItemFlag.ItemIsUserTristate