Table sortable #55

Merged
tpeetz merged 3 commits from table_sortable into develop/0.1.0 2025-02-06 15:59:29 +00:00
11 changed files with 487 additions and 428 deletions
@@ -0,0 +1,7 @@
from enum import Enum
class ArgumentData(Enum):
EXPORT_TYPE = 'export_type'
DB_FILE = 'db_file'
DATA_TYPE = 'data_type'
DELETE_FIRST = 'delete_first'
@@ -1,6 +1,8 @@
import mariadb
from cement import Controller, ex
from kontor.controllers import ArgumentData
class Database(Controller):
class Meta:
@@ -24,16 +26,16 @@ class Database(Controller):
)
def export(self):
data = {
'db_file': 'data.json',
'export_type': 'JSON',
ArgumentData.DB_FILE: 'data.json',
ArgumentData.EXPORT_TYPE: 'JSON',
}
if self.app.pargs.db_file is not None:
data['db_file'] = self.app.pargs.db_file
data[ArgumentData.DB_FILE] = self.app.pargs.db_file
db = self.app.kontor_db
self.app.log.info(f"export DB to {data['db_file']} as {data['export_type']}")
results = db.export_db(data['export_type'], data['db_file'])
data['results'] = results
self.app.render(data, 'export_db.jinja2')
self.app.log.info(f"export DB to {data[ArgumentData.DB_FILE]} as {data[ArgumentData.EXPORT_TYPE]}")
results = db.export_db(data[ArgumentData.EXPORT_TYPE], data[ArgumentData.DB_FILE])
for key, value in results.items():
self.app.log.info(f"{key}: {value}")
@ex(
label='import',
@@ -51,16 +53,18 @@ class Database(Controller):
)
def import_cmd(self):
data = {
'db_file': 'data.json',
'data_type': 'JSON',
'delete_first': False,
ArgumentData.DB_FILE: 'data.json',
ArgumentData.DATA_TYPE: 'JSON',
ArgumentData.DELETE_FIRST: False,
}
if self.app.pargs.db_file is not None:
data['db_file'] = self.app.pargs.db_file
data[ArgumentData.DB_FILE] = self.app.pargs.db_file
if self.app.pargs.delete_first is not None:
data['delete_first'] = self.app.pargs.delete_first
data[ArgumentData.DELETE_FIRST] = self.app.pargs.delete_first
db = self.app.kontor_db
db.import_db(data['db_file'], data['delete_first'])
if data[ArgumentData.DELETE_FIRST]:
db.delete_entries()
db.import_db(data[ArgumentData.DB_FILE])
@ex(
help='check the db schema against MetaDataTable and MetaDataColumn'
@@ -1,5 +0,0 @@
Following tables were exported:
{% for key, value in results.items() %}
Table {{key}}: {{value}} entries
{% endfor %}
+4 -2
View File
@@ -1,5 +1,6 @@
from PySide6.QtCore import Signal, QSortFilterProxyModel
from PySide6.QtWidgets import QMainWindow, QWidget, QVBoxLayout, QTabWidget, QMenu, QTableView, QMdiSubWindow
from PySide6.QtWidgets import QMainWindow, QWidget, QVBoxLayout, QTabWidget, QMenu, QTableView, QMdiSubWindow, \
QHeaderView
from gui.model_config import KontorModelConfig
from gui.table_model import KontorTableModel
@@ -53,10 +54,11 @@ class ComicWindow(QMdiSubWindow):
self.data_views.append(model)
data_tab.setLayout(layout)
table_view = QTableView()
header = table_view.horizontalHeader()
header.setSectionResizeMode(QHeaderView.ResizeMode.ResizeToContents)
proxy_model = QSortFilterProxyModel()
proxy_model.setSourceModel(model)
table_view.setSortingEnabled(True)
#table_view.setModel(model)
table_view.setModel(proxy_model)
layout.addLayout(table_config.get_filter_layout())
layout.addWidget(table_view)
+1 -1
View File
@@ -181,7 +181,7 @@ class MainWindow(QMainWindow):
import_dlg = ImportKontorDialog(self)
if import_dlg.exec():
print(f"import DB from file {import_dlg.file_name}")
self.kontor_db.import_db(import_dlg.file_name, False)
self.kontor_db.import_db(import_dlg.file_name)
else:
print("do nothing for import")
+2 -2
View File
@@ -1,5 +1,5 @@
from PySide6.QtCore import Signal, QSortFilterProxyModel
from PySide6.QtWidgets import QMdiSubWindow, QWidget, QVBoxLayout, QTabWidget, QTableView
from PySide6.QtWidgets import QMdiSubWindow, QWidget, QVBoxLayout, QTabWidget, QTableView, QHeaderView
from gui.model_config import KontorModelConfig
from gui.table_model import KontorTableModel
@@ -57,9 +57,9 @@ class MediaWindow(QMdiSubWindow):
proxy_model = QSortFilterProxyModel()
proxy_model.setSourceModel(model)
table_view.setSortingEnabled(True)
#table_view.setModel(model)
table_view.setModel(proxy_model)
layout.addLayout(table_config.get_filter_layout())
layout.addWidget(table_view)
model.refresh()
table_view.resizeColumnToContents(0)
return data_tab
+7 -5
View File
@@ -1,5 +1,6 @@
from PySide6.QtCore import Signal, QSortFilterProxyModel
from PySide6.QtWidgets import QMainWindow, QWidget, QVBoxLayout, QTabWidget, QMenu, QTableView, QMdiSubWindow
from PySide6.QtWidgets import QMainWindow, QWidget, QVBoxLayout, QTabWidget, QMenu, QTableView, QMdiSubWindow, \
QHeaderView
from gui.model_config import KontorModelConfig
from gui.table_model import KontorTableModel
@@ -47,19 +48,20 @@ class MetaDataWindow(QMdiSubWindow):
def generate_data_tab(self, table_name):
data_tab = QWidget()
table_config = KontorModelConfig(self._main_window.kontor_db, self, table_name)
model = KontorTableModel(table_config)
layout = QVBoxLayout()
self.data_views.append(model)
data_tab.setLayout(layout)
table_view = QTableView()
proxy_model = QSortFilterProxyModel()
proxy_model.setSourceModel(model)
# proxy_model = QSortFilterProxyModel()
# proxy_model.setSourceModel(model)
table_view.setSortingEnabled(True)
#table_view.setModel(model)
# header = table_view.horizontalHeader()
# header.setSectionResizeMode(QHeaderView.ResizeMode.ResizeToContents)
table_view.setModel(proxy_model)
layout.addLayout(table_config.get_filter_layout())
layout.addWidget(table_view)
model.refresh()
table_view.resizeColumnToContents(0)
return data_tab
+4 -4
View File
@@ -1,5 +1,5 @@
from PySide6.QtWidgets import QHBoxLayout, QCheckBox, QMdiSubWindow
from kontor_schema import KontorDB
from kontor_schema import KontorDB, ColumnEntry
class KontorModelConfig:
@@ -29,7 +29,7 @@ class KontorModelConfig:
# print(self.filter["download"].isChecked())
for column, filter_info in self.filter.items():
# print(column, filter_info)
if filter_info['widget'].isChecked():
if filter_info[ColumnEntry.COLUMN_WIDGET].isChecked():
_filters[column] = True
# print(f"{filter_rule=}")
# self.log.info("filters -> %s", _filters)
@@ -46,9 +46,9 @@ class KontorModelConfig:
filter_layout = QHBoxLayout()
for column, filter_info in self.filter.items():
filter_checkbox = QCheckBox()
filter_checkbox.setText(filter_info['label'])
filter_checkbox.setText(filter_info[ColumnEntry.COLUMN_LABEL])
filter_checkbox.checkStateChanged.connect(self.main_window.refresh)
self.filter[column]['widget'] = filter_checkbox
self.filter[column][ColumnEntry.COLUMN_WIDGET] = filter_checkbox
filter_layout.addWidget(filter_checkbox)
filter_layout.addStretch()
# self.log.info("get_filter_layout: %s", self.filter)
+54 -46
View File
@@ -1,11 +1,45 @@
from datetime import datetime
from typing import Any
from PySide6.QtCore import QAbstractTableModel, QModelIndex
from PySide6.QtGui import Qt
from PySide6.QtGui import Qt, QColor
from kontor_schema.database import ColumnEntry
from .model_config import KontorModelConfig
def get_display_value(value: Any, column_config: dict, window) -> str:
if isinstance(value, datetime):
return value.strftime("%Y-%m-%d %M:%M:%S")
if column_config[ColumnEntry.COLUMN_TYPE] == 'BOOLEAN':
if value == 1:
return window.tick
else:
return window.cross
if value is None:
return ""
# window.log.info(f"unknown type: {column_config[ColumnEntry.COLUMN_TYPE]} - {type(value)}")
return str(value)
def get_edit_value(value, column_config, window):
# window.log.info(f"edit value {value}")
return str(value)
def get_decoration_value(value: Any, column_config: dict, window):
if column_config[ColumnEntry.COLUMN_TYPE] == 'BOOLEAN':
if value == 1:
return window.tick
else:
return window.cross
def get_background_value(value: Any, column_config: dict, window):
if value is None:
return QColor('lightgrey')
class KontorTableModel(QAbstractTableModel):
def __init__(self, model_config: KontorModelConfig):
@@ -42,71 +76,45 @@ class KontorTableModel(QAbstractTableModel):
return len(self._data)
def headerData(self, col, orientation, role=Qt.ItemDataRole.DisplayRole):
# self.log.info(f"{self._config.header[col]}")
if orientation == Qt.Orientation.Horizontal and role == Qt.ItemDataRole.DisplayRole:
return self._config.header[col]['label']
return self._config.header[col][ColumnEntry.COLUMN_LABEL]
if orientation == Qt.Orientation.Vertical and role == Qt.ItemDataRole.DisplayRole:
return str(col+1)
return str(col + 1)
def data(self, index, role=Qt.ItemDataRole.DisplayRole):
if self._data is None:
return None
value = self._data[index.row()][index.column()]
# print('{}:: {}:: {}: {}'.format(index, role, value, type(value)))
if role == Qt.ItemDataRole.DisplayRole or role == Qt.ItemDataRole.EditRole:
if isinstance(value, datetime):
return value.strftime("%Y-%m-%d %M:%M:%S")
if isinstance(value, str):
return value
if isinstance(value, bytes):
if value == b'\x01':
return self._config.main_window.tick
else:
return self._config.main_window.cross
if isinstance(value, int):
# print('{}:: {}: {}'.format(index, value, type(value)))
if value == 1:
return self._config.main_window.tick
else:
return self._config.main_window.cross
if isinstance(value, bool):
if value:
return self._config.main_window.tick
else:
return self._config.main_window.cross
return str(value)
if role == Qt.ItemDataRole.DecorationRole:
if isinstance(value, bytes):
if value == b'\x01':
return self._config.main_window.tick
else:
return self._config.main_window.cross
if isinstance(value, int):
if value == 1:
return self._config.main_window.tick
else:
return self._config.main_window.cross
if isinstance(value, bool):
if value:
return self._config.main_window.tick
else:
return self._config.main_window.cross
# print('{}:: {}:: {}:: {}: {}'.format(index, role, self._config.header[index.column()][ColumnEntry.COLUMN_TYPE], value, type(value)))
match role:
case Qt.ItemDataRole.DisplayRole:
return get_display_value(value, self._config.header[index.column()], self._config.main_window)
case Qt.ItemDataRole.EditRole:
return get_edit_value(value, self._config.header[index.column()], self._config.main_window)
case Qt.ItemDataRole.DecorationRole:
return get_decoration_value(value, self._config.header[index.column()], self._config.main_window)
case Qt.ItemDataRole.BackgroundRole:
return get_background_value(value, self._config.header[index.column()], self._config.main_window)
def columnCount(self, index=QModelIndex()):
# self.log.info("rowCount %s: %d", self, len(self._config.header))
return len(self._config.header)
def setData(self, index, value, role: int) -> bool:
# print(index, role)
def setData(self, index, value, role=Qt.ItemDataRole.EditRole) -> bool:
# self._config.log.info(f"{index}: {role}")
if role == Qt.ItemDataRole.EditRole:
self._data[index.row()][index.column()] = value
# print(self._data[index.row()][index.column()])
# self._config.log.info(f"{index.row()}-{index.column()}: {self._data[index.row()][index.column()]}")
self.dataChanged.emit(index, index)
return True
if role == Qt.ItemDataRole.CheckStateRole:
# print("role == Qt.ItemDataRole.CheckStateRole")
print("role == Qt.ItemDataRole.CheckStateRole")
checked = value == Qt.CheckState.Checked
self._data[index.row()][index.column()] = checked
return False
def flags(self, index):
if self._config.header[index.column()][ColumnEntry.COLUMN_NAME] == 'id':
return Qt.ItemFlag.ItemIsEnabled
return Qt.ItemFlag.ItemIsSelectable | Qt.ItemFlag.ItemIsEnabled | Qt.ItemFlag.ItemIsEditable | Qt.ItemFlag.ItemIsUserTristate
+3 -350
View File
@@ -1,357 +1,10 @@
import json
import uuid
from datetime import datetime
from logging import Logger
from pathlib import Path
from enum import Enum, auto
from sqlalchemy import Engine, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from .base import Base, BaseMixin
from .admin import User, Token, Role, AuthorizationMatrix, ModuleData, MailAccount, Mail
from .bookshelf import Article, Book, Author, BookshelfPublisher, ArticleAuthor, BookAuthor
from .comic import Comic, Artist, Publisher, Issue, StoryArc, TradePaperback, Volume, ComicWork, WorkType
from .metadata import MetaDataTable, MetaDataColumn
from .tysc import Card, CardSet, Sport, Team, FieldPosition, Rooster, Player, Vendor
from .media import MediaFile, MediaArticle, MediaVideo
class KontorDB:
def __init__(self, db_engine: Engine, log: Logger):
self.engine = db_engine
self.registry = {}
self.init_registry()
self.log = log
def init_registry(self):
self.registry[Card.__tablename__] = Card
self.registry[CardSet.__tablename__] = CardSet
self.registry[Rooster.__tablename__] = Rooster
self.registry[Team.__tablename__] = Team
self.registry[FieldPosition.__tablename__] = FieldPosition
self.registry[Player.__tablename__] = Player
self.registry[Vendor.__tablename__] = Vendor
self.registry[Sport.__tablename__] = Sport
self.registry[Issue.__tablename__] = Issue
self.registry[TradePaperback.__tablename__] = TradePaperback
self.registry[StoryArc.__tablename__] = StoryArc
self.registry[Volume.__tablename__] = Volume
self.registry[ComicWork.__tablename__] = ComicWork
self.registry[Artist.__tablename__] = Artist
self.registry[Comic.__tablename__] = Comic
self.registry[Publisher.__tablename__] = Publisher
self.registry[WorkType.__tablename__] = WorkType
self.registry[ArticleAuthor.__tablename__] = ArticleAuthor
self.registry[BookAuthor.__tablename__] = BookAuthor
self.registry[BookshelfPublisher.__tablename__] = BookshelfPublisher
self.registry[Article.__tablename__] = Article
self.registry[Book.__tablename__] = Book
self.registry[Author.__tablename__] = Author
self.registry[MediaFile.__tablename__] = MediaFile
self.registry[MediaArticle.__tablename__] = MediaArticle
self.registry[MediaVideo.__tablename__] = MediaVideo
self.registry[MetaDataColumn.__tablename__] = MetaDataColumn
self.registry[MetaDataTable.__tablename__] = MetaDataTable
self.registry[AuthorizationMatrix.__tablename__] = AuthorizationMatrix
self.registry[Token.__tablename__] = Token
self.registry[User.__tablename__] = User
self.registry[Role.__tablename__] = Role
self.registry[ModuleData.__tablename__] = ModuleData
self.registry[MailAccount.__tablename__] = MailAccount
self.registry[Mail.__tablename__] = Mail
def get_table_names(self) -> list:
result = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
tables = session.scalars(select(MetaDataTable)).all()
result = [table.table_name for table in tables]
return result
def get_column_meta_data(self, table_name: str, view_only=True) -> dict:
meta_data = {}
order = 0
__session__ = sessionmaker(self.engine)
with __session__() as session:
if view_only:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.is_shown == 1).all()):
# self.log.info("get_column_meta_data: %s %s %d", column.column_name, column.column_label, column.column_order)
meta_data[order] = {'column': column.column_name, 'label': column.column_label,
'order': column.column_order, 'ref_column': column.ref_column}
order += 1
else:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).all()):
meta_data[order] = {
'column': column.column_name,
'order': column.column_order,
'ref_column': column.ref_column
}
order += 1
# self.log.info("get_column_meta_data: %s", meta_data)
return meta_data
def get_columns(self, table_name: str) -> dict:
columns = {}
order = 0
__session__ = sessionmaker(self.engine)
with __session__() as session:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).all()):
columns[column.column_name] = {"order": column.column_order, "type": column.column_type}
return columns
def get_filters(self, table_name: str) -> dict:
_filter_map = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.show_filter == 1).all()):
_filter_map[column.column_name] = {'label': column.filter_label, 'widget': None}
return _filter_map
def data(self, table_name: str, columns: dict, filters: dict) -> list:
data = []
__session__ = sessionmaker(self.engine)
table = self.registry[table_name]
with __session__() as session:
entries = []
if len(filters) == 0:
entries = session.scalars(select(table)).all()
else:
entries = session.scalars(select(table).filter_by(**filters)).all()
for entry in entries:
# self.log.info("data: %s", entry)
row = []
for order in columns.keys():
column_name = columns[order]['column']
ref_column = columns[order]['ref_column']
if str(column_name).endswith("_id"):
ref_table = column_name[:-3]
ref = getattr(entry, ref_table)
value = getattr(ref, ref_column)
row.append(value)
else:
row.append(getattr(entry, column_name))
data.append(row)
# self.log.info("data: %s", data)
return data
def export_db(self, export_type: str, export_file_name: str) -> dict:
results = {}
db = {}
export_table_list = self.get_table_names()
for table in export_table_list:
columns = self.get_column_meta_data(table, view_only=False)
if table in self.registry:
model = self.registry[table]
else:
self.log.info(f"table {table} is not registered")
continue
__session__ = sessionmaker(self.engine)
with __session__() as session:
rows = session.query(model).all()
entries = []
for row in rows:
# print(row)
entry = {}
for order in columns:
# print(columns[order])
column_name = columns[order]['column']
# print(f"get value {column_name} from {row} of table {table}")
try:
value = getattr(row, column_name)
if isinstance(value, datetime):
entry[column_name] = str(value)
else:
entry[column_name] = value
except AttributeError:
pass
entries.append(entry)
db[table] = entries
results[table] = len(entries)
match export_type:
case "JSON":
json_dump = json.dumps(db, indent=4)
with open(export_file_name, "w") as dump_file:
dump_file.write(json_dump)
case "YAML":
export_file = Path(export_file_name)
case "SQLite":
export_file = Path(export_file_name)
self.log.info("%d tables exported", len(results))
return results
def import_db(self, import_file_name: str, delete_first: bool) -> dict:
result = {}
if delete_first:
self.delete_entries()
import_file = Path(import_file_name)
if not import_file.exists():
self.log.info(f"File {import_file_name} does not exist. Do nothing.")
return result
match import_file.suffix:
case '.json':
print("read json file")
with open(import_file_name, 'r') as json_file:
json_load = json.load(json_file)
for table in json_load:
self.log.info(f"{table}: {len(json_load[table])}")
result[table] = self.import_table(table, json_load[table])
case '.yml':
print("read yaml file")
case '.yaml':
print("read yaml file")
case '.db':
print("read sqlite file")
return result
def import_table(self, table_name: str, items:list) -> dict:
result = {}
updated = []
added = []
remaining = []
existing_ids = self.get_ids(table_name)
self.log.info(f"found {len(existing_ids)} existing ids for table {table_name}")
for item in items:
current_id = item['id']
# print(f"import item: {item}")
found_item = None
__session__ = sessionmaker(self.engine)
with __session__() as session:
found_item = session.get(self.registry[table_name], current_id)
# print(f"found item: {found_item}")
if found_item is not None:
changed = self.update_entry(table_name, current_id, item)
updated.append(item)
if changed:
self.log.info(f"{current_id} has changed")
updated.append(item)
existing_ids.remove(current_id)
else:
try:
self.add_entry(table_name, item)
added.append(item)
except IntegrityError as error:
self.log.info(f"Could not add item, due to: {error.detail}")
if len(existing_ids) > 0:
print(f"remaining items: {existing_ids}")
remaining.extend(existing_ids)
result['updated'] = updated
result['added'] = added
result['remaining'] = remaining
return result
def get_ids(self, table_name: str) -> list:
existing_ids = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(self.registry[table_name]).all()
for item in items:
existing_ids.append(getattr(item, 'id'))
return existing_ids
def add_entry(self, table_name: str, update_item: dict):
self.log.debug(f"add entry to table {table_name} with {update_item}")
__session__ = sessionmaker(self.engine)
with __session__() as session:
add_item = self.registry[table_name]()
for key in update_item.keys():
update_value = update_item[key]
setattr(add_item, key, update_value)
session.add(add_item)
session.commit()
def update_entry(self, table_name, current_id, update_item: dict) -> bool:
# self.log.info("update entry to table %s", table_name)
__session__ = sessionmaker(self.engine)
with __session__() as session:
existing_item = session.query(self.registry[table_name]).get(current_id)
changed = False
for key in update_item.keys():
update_value = update_item[key]
existing_value = getattr(existing_item, key)
if type(existing_value) is not type(update_value):
existing_value = str(existing_value)
if existing_value != update_value:
self.log.info(f"{key} has changed: {existing_value} != {update_value}")
setattr(existing_item, key, update_value)
session.commit()
changed = True
self.log.info(f"update {key} with {update_value}")
return changed
def add_link(self, link: str) -> dict:
result = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
media_file = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.url = link
media_file.review = 1
media_file.should_download = 1
try:
session.add(media_file)
session.commit()
result['added'] = {'url': media_file.url, 'title': media_file.title, 'review': media_file.review, 'download': media_file.should_download}
except IntegrityError as error:
session.rollback()
result['error'] = error.orig
return result
def update_titles(self) -> dict:
update_list = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.review == 1).all()
for link in links:
url = link.url
if url is None:
continue
link.update_title()
session.commit()
update_list[link.id] = link.title
return update_list
def get_download_list(self) -> list:
download_list = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.should_download == 1).all()
for link in links:
url = link.url
if url is None:
continue
download_list.append(link.id)
return download_list
def download_file(self, entry_id: str, download_dir = "/data/media", dl_tool = "yt-dlp") -> str:
__session__ = sessionmaker(self.engine)
with __session__() as session:
link = session.query(MediaFile).get(entry_id)
link.download_file(download_dir, dl_tool)
session.commit()
file_name = link.file_name
return file_name
def delete_entries(self):
for (table_name, table) in self.registry.items():
# self.log.info("delete entries from table %s", table_name)
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(table).all()
for item in items:
session.delete(item)
session.commit()
from .base import Base
from .database import KontorDB, ColumnEntry
@@ -0,0 +1,388 @@
import json
import uuid
from datetime import datetime
from enum import Enum, auto
from logging import Logger
from pathlib import Path
from sqlalchemy import Engine, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from .tysc import Card, CardSet, Rooster, Team, FieldPosition, Player, Vendor, Sport
from .comic import Issue, TradePaperback, StoryArc, Volume, ComicWork, Artist, Comic, Publisher, WorkType
from .bookshelf import ArticleAuthor, BookAuthor, BookshelfPublisher, Article, Book, Author
from .admin import Mail, MailAccount, ModuleData, Role, User, Token, AuthorizationMatrix
from .metadata import MetaDataTable, MetaDataColumn
from .media import MediaVideo, MediaArticle, MediaFile
class ColumnEntry(Enum):
COLUMN_NAME = 'column'
COLUMN_LABEL = 'label'
COLUMN_ORDER = 'order'
COLUMN_REF_COLUMN = 'ref_column'
COLUMN_TYPE = 'type'
COLUMN_WIDGET = 'widget'
class StatusType(Enum):
UNKNOWN = auto()
FILE_NAME = auto()
FILE_ID = auto()
DUPLICATE = auto()
CLOUD_LINK = auto()
CLOUD_LINK_ID = auto()
class KontorDB:
def __init__(self, db_engine: Engine, log: Logger):
self.engine = db_engine
self.registry = {}
self.init_registry()
self.log = log
def init_registry(self):
self.registry[Card.__tablename__] = Card
self.registry[CardSet.__tablename__] = CardSet
self.registry[Rooster.__tablename__] = Rooster
self.registry[Team.__tablename__] = Team
self.registry[FieldPosition.__tablename__] = FieldPosition
self.registry[Player.__tablename__] = Player
self.registry[Vendor.__tablename__] = Vendor
self.registry[Sport.__tablename__] = Sport
self.registry[Issue.__tablename__] = Issue
self.registry[TradePaperback.__tablename__] = TradePaperback
self.registry[StoryArc.__tablename__] = StoryArc
self.registry[Volume.__tablename__] = Volume
self.registry[ComicWork.__tablename__] = ComicWork
self.registry[Artist.__tablename__] = Artist
self.registry[Comic.__tablename__] = Comic
self.registry[Publisher.__tablename__] = Publisher
self.registry[WorkType.__tablename__] = WorkType
self.registry[ArticleAuthor.__tablename__] = ArticleAuthor
self.registry[BookAuthor.__tablename__] = BookAuthor
self.registry[BookshelfPublisher.__tablename__] = BookshelfPublisher
self.registry[Article.__tablename__] = Article
self.registry[Book.__tablename__] = Book
self.registry[Author.__tablename__] = Author
self.registry[MediaFile.__tablename__] = MediaFile
self.registry[MediaArticle.__tablename__] = MediaArticle
self.registry[MediaVideo.__tablename__] = MediaVideo
self.registry[MetaDataColumn.__tablename__] = MetaDataColumn
self.registry[MetaDataTable.__tablename__] = MetaDataTable
self.registry[AuthorizationMatrix.__tablename__] = AuthorizationMatrix
self.registry[Token.__tablename__] = Token
self.registry[User.__tablename__] = User
self.registry[Role.__tablename__] = Role
self.registry[ModuleData.__tablename__] = ModuleData
self.registry[MailAccount.__tablename__] = MailAccount
self.registry[Mail.__tablename__] = Mail
def get_table_names(self) -> list:
result = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
tables = session.scalars(select(MetaDataTable)).all()
result = [table.table_name for table in tables]
return result
def get_column_meta_data(self, table_name: str, view_only=True) -> dict:
meta_data = {}
order = 0
__session__ = sessionmaker(self.engine)
with __session__() as session:
if view_only:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.is_shown == 1).all()):
# self.log.info("get_column_meta_data: %s %s %d", column.column_name, column.column_label, column.column_order)
meta_data[order] = {
ColumnEntry.COLUMN_NAME: column.column_name,
ColumnEntry.COLUMN_LABEL: column.column_label,
ColumnEntry.COLUMN_ORDER: column.column_order,
ColumnEntry.COLUMN_REF_COLUMN: column.ref_column,
ColumnEntry.COLUMN_TYPE: column.column_type
}
order += 1
else:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).all()):
meta_data[order] = {
ColumnEntry.COLUMN_NAME: column.column_name,
ColumnEntry.COLUMN_ORDER: column.column_order,
ColumnEntry.COLUMN_REF_COLUMN: column.ref_column,
ColumnEntry.COLUMN_TYPE: column.column_type
}
order += 1
# self.log.info("get_column_meta_data: %s", meta_data)
return meta_data
def get_columns(self, table_name: str) -> dict:
columns = {}
order = 0
__session__ = sessionmaker(self.engine)
with __session__() as session:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).all()):
columns[column.column_name] = {
ColumnEntry.COLUMN_ORDER: column.column_order,
ColumnEntry.COLUMN_TYPE: column.column_type
}
return columns
def get_filters(self, table_name: str) -> dict:
_filter_map = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.show_filter == 1).all()):
_filter_map[column.column_name] = {
ColumnEntry.COLUMN_LABEL: column.filter_label,
ColumnEntry.COLUMN_WIDGET: None
}
return _filter_map
def data(self, table_name: str, columns: dict, filters: dict) -> list:
data = []
__session__ = sessionmaker(self.engine)
table = self.registry[table_name]
with __session__() as session:
entries = []
if len(filters) == 0:
entries = session.scalars(select(table)).all()
else:
entries = session.scalars(select(table).filter_by(**filters)).all()
for entry in entries:
# self.log.info("data: %s", entry)
row = []
for order in columns.keys():
column_name = columns[order][ColumnEntry.COLUMN_NAME]
ref_column = columns[order][ColumnEntry.COLUMN_REF_COLUMN]
if str(column_name).endswith("_id"):
ref_table = column_name[:-3]
ref = getattr(entry, ref_table)
value = getattr(ref, ref_column)
row.append(value)
else:
row.append(getattr(entry, column_name))
data.append(row)
# self.log.info("data: %s", data)
return data
def export_db(self, export_type: str, export_file_name: str) -> dict:
results = {}
db = {}
export_table_list = self.get_table_names()
for table in export_table_list:
columns = self.get_column_meta_data(table, view_only=False)
if table in self.registry:
model = self.registry[table]
else:
self.log.info(f"table {table} is not registered")
continue
__session__ = sessionmaker(self.engine)
with __session__() as session:
rows = session.query(model).all()
entries = []
for row in rows:
# print(row)
entry = {}
for order in columns:
# print(columns[order])
column_name = columns[order][ColumnEntry.COLUMN_NAME]
# print(f"get value {column_name} from {row} of table {table}")
try:
value = getattr(row, column_name)
if isinstance(value, datetime):
entry[column_name] = str(value)
else:
entry[column_name] = value
except AttributeError:
pass
entries.append(entry)
db[table] = entries
results[table] = len(entries)
match export_type:
case "JSON":
json_dump = json.dumps(db, indent=4)
with open(export_file_name, "w") as dump_file:
dump_file.write(json_dump)
case "YAML":
export_file = Path(export_file_name)
case "SQLite":
export_file = Path(export_file_name)
self.log.info("%d tables exported", len(results))
return results
def import_db(self, import_file_name: str) -> dict:
result = {}
import_file = Path(import_file_name)
if not import_file.exists():
self.log.info(f"File {import_file_name} does not exist. Do nothing.")
return result
match import_file.suffix:
case '.json':
print("read json file")
with open(import_file_name, 'r') as json_file:
json_load = json.load(json_file)
for table in json_load:
self.log.info(f"{table}: {len(json_load[table])}")
result[table] = self.import_table(table, json_load[table])
case '.yml':
print("read yaml file")
case '.yaml':
print("read yaml file")
case '.db':
print("read sqlite file")
return result
def import_table(self, table_name: str, items:list) -> dict:
result = {}
updated = []
added = []
remaining = []
existing_ids = self.get_ids(table_name)
self.log.info(f"found {len(existing_ids)} existing ids for table {table_name}")
for item in items:
current_id = item['id']
# print(f"import item: {item}")
found_item = None
__session__ = sessionmaker(self.engine)
with __session__() as session:
found_item = session.get(self.registry[table_name], current_id)
# print(f"found item: {found_item}")
if found_item is not None:
changed = self.update_entry(table_name, current_id, item)
updated.append(item)
if changed:
self.log.info(f"{current_id} has changed")
updated.append(item)
existing_ids.remove(current_id)
else:
try:
self.add_entry(table_name, item)
added.append(item)
except IntegrityError as error:
self.log.info(f"Could not add item, due to: {error.detail}")
if len(existing_ids) > 0:
print(f"remaining items: {existing_ids}")
remaining.extend(existing_ids)
result['updated'] = updated
result['added'] = added
result['remaining'] = remaining
return result
def get_ids(self, table_name: str) -> list:
existing_ids = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(self.registry[table_name]).all()
for item in items:
existing_ids.append(getattr(item, 'id'))
return existing_ids
def add_entry(self, table_name: str, update_item: dict):
self.log.debug(f"add entry to table {table_name} with {update_item}")
__session__ = sessionmaker(self.engine)
with __session__() as session:
add_item = self.registry[table_name]()
for key in update_item.keys():
update_value = update_item[key]
setattr(add_item, key, update_value)
session.add(add_item)
session.commit()
def update_entry(self, table_name, current_id, update_item: dict) -> bool:
# self.log.info("update entry to table %s", table_name)
__session__ = sessionmaker(self.engine)
with __session__() as session:
existing_item = session.query(self.registry[table_name]).get(current_id)
changed = False
for key in update_item.keys():
update_value = update_item[key]
existing_value = getattr(existing_item, key)
if type(existing_value) is not type(update_value):
existing_value = str(existing_value)
if existing_value != update_value:
self.log.info(f"{key} has changed: {existing_value} != {update_value}")
setattr(existing_item, key, update_value)
session.commit()
changed = True
self.log.info(f"update {key} with {update_value}")
return changed
def add_link(self, link: str) -> dict:
result = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
media_file = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.url = link
media_file.review = 1
media_file.should_download = 1
try:
session.add(media_file)
session.commit()
result['added'] = {'url': media_file.url, 'title': media_file.title, 'review': media_file.review, 'download': media_file.should_download}
except IntegrityError as error:
session.rollback()
result['error'] = error.orig
return result
def update_titles(self) -> dict:
update_list = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.review == 1).all()
for link in links:
url = link.url
if url is None:
continue
link.update_title()
session.commit()
update_list[link.id] = link.title
return update_list
def get_download_list(self) -> list:
download_list = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.should_download == 1).all()
for link in links:
url = link.url
if url is None:
continue
download_list.append(link.id)
return download_list
def download_file(self, entry_id: str, download_dir = "/data/media", dl_tool = "yt-dlp") -> str:
__session__ = sessionmaker(self.engine)
with __session__() as session:
link = session.query(MediaFile).get(entry_id)
link.download_file(download_dir, dl_tool)
session.commit()
file_name = link.file_name
return file_name
def delete_entries(self):
for (table_name, table) in self.registry.items():
# self.log.info("delete entries from table %s", table_name)
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(table).all()
for item in items:
session.delete(item)
session.commit()
def check_files(self):
pass