separate cli and gui application in own python packages. provide database schema as python package.

This commit is contained in:
Thomas Peetz
2025-01-19 23:36:52 +01:00
parent f07c7b74ee
commit ada723dc48
113 changed files with 1224 additions and 1223 deletions
@@ -0,0 +1,378 @@
import json
import re
import subprocess
import uuid
from datetime import datetime
from pathlib import Path
from sqlalchemy import Engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from .bookshelf import Article, Book, Author, BookshelfPublisher, ArticleAuthor, BookAuthor
from .comic import Comic, Artist, Publisher, Issue, StoryArc, TradePaperback, Volume, ComicWork, WorkType
from .metadata import MetaDataTable, MetaDataColumn
from .tysc import Card, CardSet, Sport, Team, FieldPosition, Rooster, Player, Vendor
from .media import MediaFile, MediaArticle, MediaVideo
class KontorDB:
def __init__(self, db_engine: Engine, log):
self.engine = db_engine
self.log = log
self.registry = {}
self.init_registry()
def init_registry(self):
self.registry['card'] = Card
self.registry['card_set'] = CardSet
self.registry['sport'] = Sport
self.registry['team'] = Team
self.registry['field_position'] = FieldPosition
self.registry['rooster'] = Rooster
self.registry['player'] = Player
self.registry['vendor'] = Vendor
self.registry['artist'] = Artist
self.registry['publisher'] = Publisher
self.registry['comic'] = Comic
self.registry['issue'] = Issue
self.registry['story_arc'] = StoryArc
self.registry['trade_paperback'] = TradePaperback
self.registry['volume'] = Volume
self.registry['comic_work'] = ComicWork
self.registry['worktype'] = WorkType
self.registry['article'] = Article
self.registry['book'] = Book
self.registry['author'] = Author
self.registry['bookshelf_publisher'] = BookshelfPublisher
self.registry['article_author'] = ArticleAuthor
self.registry['book_author'] = BookAuthor
self.registry['media_file'] = MediaFile
self.registry['media_article'] = MediaArticle
self.registry['media_video'] = MediaVideo
self.registry['meta_data_table'] = MetaDataTable
self.registry[MetaDataColumn.__tablename__] = MetaDataColumn
def get_table_names(self) -> list:
result = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
tables = session.query(MetaDataTable).all()
result = [table.table_name for table in tables]
return result
def get_column_meta_data(self, table_name: str, view_only=True) -> dict:
meta_data = {}
order = 0
__session__ = sessionmaker(self.engine)
with __session__() as session:
if view_only:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.is_shown == 1).all()):
meta_data[order] = {'column': column.column_name, 'label': column.column_label,
'order': column.column_order, 'ref_column': column.ref_column}
order += 1
else:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).all()):
meta_data[order] = {
'column': column.column_name,
'order': column.column_order,
'ref_column': column.ref_column
}
order += 1
return meta_data
def get_filters(self, table_name):
_filter_map = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.show_filter == 1).all()):
_filter_map[column.column_name] = {'label': column.filter_label, 'widget': None}
self.log.debug(f"retrieved {len(_filter_map)} filters: {_filter_map}")
return _filter_map
def data(self, table_name: str, columns: dict, filters: dict) -> list:
data = []
__session__ = sessionmaker(self.engine)
table = self.registry[table_name]
with __session__() as session:
entries = []
if len(filters) == 0:
entries = session.query(table).all()
else:
entries = session.query(table).filter_by(**filters)
for entry in entries:
row = []
for order in columns.keys():
column_name = columns[order]['column']
if str(column_name).endswith("_id"):
ref_table = column_name[:-3]
# print(f"{ref_table=}")
ref = getattr(entry, ref_table)
value = getattr(ref, "name")
# print(f"{value=}")
row.append(value)
else:
row.append(getattr(entry, column_name))
# print(repr(row))
data.append(row)
return data
def export_db(self, export_type: str, export_file_name: str):
self.log.info(f"export DB to {export_file_name} as {export_type}")
db = {}
export_table_list = self.get_table_names()
for table in export_table_list:
columns = self.get_column_meta_data(table, view_only=False)
if table in self.registry:
model = self.registry[table]
else:
print(f"table {table} is not registered")
continue
__session__ = sessionmaker(self.engine)
with __session__() as session:
rows = session.query(model).all()
entries = []
self.log.debug(f"found {len(rows)} entries")
self.log.debug(f"found {len(columns)} columns")
for row in rows:
# print(row)
entry = {}
for order in columns:
# print(columns[order])
column_name = columns[order]['column']
# print(f"get value {column_name} from {row} of table {table}")
try:
value = getattr(row, column_name)
if isinstance(value, datetime):
entry[column_name] = str(value)
else:
entry[column_name] = value
except AttributeError as error:
self.log.debug("could not get value")
entries.append(entry)
db[table] = entries
export_file = Path(export_file_name)
match export_type:
case "JSON":
json_dump = json.dumps(db, indent=4)
with open(export_file_name, "w") as dump_file:
dump_file.write(json_dump)
case "YAML":
export_file = Path(export_file_name)
case "SQLite":
export_file = Path(export_file_name)
case _:
self.log.debug("unknown export type")
if export_file.exists():
self.log.debug(f"{export_file} exists")
def import_db(self, import_file_name: str, dry_run: bool):
import_file = Path(import_file_name)
if not import_file.exists():
print(f"File {import_file_name} does not exist. Do nothing.")
return
self.log.debug(f"evaluate type from file extension: {import_file.suffix}")
match import_file.suffix:
case '.json':
print("read json file")
with open(import_file_name, 'r') as json_file:
json_load = json.load(json_file)
for table in json_load:
print(f"{table}: {len(json_load[table])}")
self.import_table(table, json_load[table], dry_run)
case '.yml':
print("read yaml file")
case '.yaml':
print("read yaml file")
case '.db':
print("read sqlite file")
def import_table(self, table_name, items, dry_run: bool):
existing_ids = self.get_ids(table_name)
for item in items:
# self.log.debug(f"{item}")
current_id = item['id']
found_item = None
__session__ = sessionmaker(self.engine)
with __session__() as session:
found_item = session.query(self.registry[table_name]).get(current_id)
self.log.debug(f"found: {found_item}")
if found_item is not None:
changed = self.update_entry(found_item, item, dry_run)
if changed:
print(f"{current_id} has changed")
existing_ids.remove(current_id)
else:
self.log.info("item to import not found in database, add new one...")
self.add_entry(table_name, item, session, dry_run)
if len(existing_ids) > 0:
print("remaining items")
def get_ids(self, table_name: str) -> list:
existing_ids = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(self.registry[table_name]).all()
for item in items:
existing_ids.append(getattr(item, 'id'))
return existing_ids
def add_entry(self, table_name: str, update_item: dict, session, dry_run: bool):
add_item = self.registry[table_name]()
for key in update_item.keys():
update_value = update_item[key]
setattr(add_item, key, update_value)
if dry_run:
self.log.info(f"add item {type(add_item)} with id {update_item['id']}")
else:
session.add(add_item)
session.commit()
def update_entry(self, existing_item, update_item: dict, dry_run: bool) -> bool:
changed = False
for key in update_item.keys():
update_value = update_item[key]
existing_value = getattr(existing_item, key)
if type(existing_value) is not type(update_value):
# self.log.debug(f"compare {type(existing_value)} with {type(update_value)}")
existing_value = str(existing_value)
if existing_value != update_value:
print(f"{key} has changed: {existing_value} != {update_value}")
if not dry_run:
setattr(existing_item, key, update_value)
# existing_item[key] = update_value
changed = True
self.log.info(f"update {key} with {update_value}")
return changed
def add_link(self, link: str, dry_run: bool):
self.log.info(f"add link {link} to media_file")
__session__ = sessionmaker(self.engine)
with __session__() as session:
media_file = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.url = link
media_file.review = 1
media_file.should_download = 1
try:
session.add(media_file)
session.commit()
self.log.info(f"entry {media_file} successfully added")
except IntegrityError as error:
session.rollback()
self.log.info(error.orig)
def update_title(self, dry_run=False):
self.log.info(f"get links to review of media_file")
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.review == 1).all()
self.log.info(f"try to update {len(links)} items")
for link in links:
url = link.url
if url is None:
self.log.info(f"url has not been set for {link.id}")
continue
self.log.info('get title for url {}'.format(url))
if dry_run:
continue
try:
r = requests.get(url)
soup = BeautifulSoup(r.content, "html.parser")
title = soup.title.string
except:
self.log.info("Sorry, could not retrieve title")
continue
self.log.info('ID {} has title {}'.format(link.id, title))
link.title = title
link.review = 0
session.commit()
def get_download_list(self) -> list[str]:
self.log.debug("get links marked as should_download")
download_list = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.should_download == 1).all()
for link in links:
url = link.url
if url is None:
self.log.info(f"url has not been set for {link.id}")
continue
download_list.append(url)
self.log.debug(f"found {len(download_list)} urls for downloads")
return download_list
def download_file(self, dry_run=False):
self.log.info(f"download marked urls of media_file")
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.should_download == 1).all()
self.log.info(f"try to download {len(links)} items")
for link in links:
url = link.url
if url is None:
self.log.info(f"url has not been set for {link.id}")
continue
if dry_run:
self.log.info(f"download {link.url} to {self.config.get('media', 'dir')}")
continue
filename = self.download_url(link)
if filename is None:
link.file_name = filename
link.should_download = 1
else:
download_file = Path(filename)
download_file.with_name(f"{link.id}{download_file.suffix}")
link.file_name = download_file.name
link.should_download = 0
link.cloud_link = download_file.absolute()
session.commit()
def parse_output(self, lines_list):
file_name = ""
for line in lines_list:
if 'has already been downloaded' in line:
end_len = len(' has already been downloaded')
file_name = line[11:-end_len]
self.log.info('found file: "%s"', file_name)
if 'Destination' in line:
line_len = len(line)
start_len = len('[download] Destination: ')
file_len = line_len - start_len
file_name = line[-file_len:]
self.log.info('new file: "%s"', file_name)
return file_name
def download_url(self, video_url):
media_dir = Path(self.config.get('media', 'dir'))
if not media_dir.exists():
media_dir = Path().absolute()
self.log.info(f"download video to {media_dir}")
result = subprocess.run([self.config.get('media', 'yt-dlp'), video_url], cwd=media_dir, capture_output=True,
text=True)
if result.returncode == 0:
output = result.stdout
output = re.sub(' +', ' ', output)
lines_list = output.splitlines()
return self.parse_output(lines_list)
else:
return None
def check_files(self):
media_dir = Path(self.config.get('media', 'dir'))
if not media_dir.exists():
return
self.log.info(f"check files in {media_dir}")
@@ -0,0 +1,20 @@
import uuid
from datetime import datetime
from sqlalchemy import Integer, func, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class BaseMixin:
# id = Column(String, primary_key=True)
id: Mapped[str] = mapped_column(primary_key=True, default=uuid.uuid4())
# created_date = Column(DateTime)
created_date: Mapped[datetime] = mapped_column(default=func.now())
# last_modified_date = Column(DateTime)
last_modified_date: Mapped[datetime] = mapped_column(default=func.now())
# version = Column(Integer)
version: Mapped[int] = mapped_column(default=0)
@@ -0,0 +1,51 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class Article(Base, BaseMixin):
__tablename__ = 'article'
title = Column(String(length=255), unique=True)
article_authors = relationship("ArticleAuthor")
class Author(Base, BaseMixin):
__tablename__ = 'author'
first_name = Column(String(255))
last_name = Column(String(255))
article_authors = relationship("ArticleAuthor")
book_authors = relationship("BookAuthor")
class BookshelfPublisher(Base, BaseMixin):
__tablename__ = 'bookshelf_publisher'
name = Column(String(length=255), unique=True)
books = relationship("Book")
class Book(Base, BaseMixin):
__tablename__ = 'book'
isbn = Column(String(255), unique=True)
title = Column(String(255))
year = Column(Integer, nullable=False)
publisher_id = Column(String, ForeignKey('bookshelf_publisher.id'), nullable=False)
publisher = relationship('BookshelfPublisher', back_populates="books")
book_authors = relationship("BookAuthor")
class ArticleAuthor(Base, BaseMixin):
__tablename__ = 'article_author'
article_id = Column(String, ForeignKey('article.id'), nullable=False)
article = relationship('Article', back_populates="article_authors")
author_id = Column(String, ForeignKey('author.id'), nullable=False)
author = relationship('Author', back_populates="article_authors")
class BookAuthor(Base, BaseMixin):
__tablename__ = 'book_author'
author_id = Column(String, ForeignKey('author.id'), nullable=False)
author = relationship('Author', back_populates="book_authors")
book_id = Column(String, ForeignKey('book.id'), nullable=False)
book = relationship('Book', back_populates="book_authors")
+100
View File
@@ -0,0 +1,100 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class Publisher(Base, BaseMixin):
__tablename__ = "publisher"
name = Column(String(length=255), unique=True)
comics = relationship("Comic")
def __repr__(self):
return f'Publisher({self.id} {self.name})'
def __str__(self):
return self.__repr__()
class Comic(Base, BaseMixin):
__tablename__ = 'comic'
title = Column(String(length=255), unique=True)
publisher_id = Column(String, ForeignKey('publisher.id'), nullable=False)
publisher = relationship("Publisher", back_populates="comics")
current_order = Column(BIT(1))
completed = Column(BIT(1))
issues = relationship("Issue")
story_arcs = relationship("StoryArc")
trade_paperbacks = relationship("TradePaperback")
volumes = relationship("Volume")
comic_works = relationship("ComicWork")
def __repr__(self):
return f'Comic({self.id} {self.version} {self.title} {self.publisher.name})'
def __str__(self):
return f'{self.title}({self.id})'
class Volume(Base, BaseMixin):
__tablename__ = "volume"
name = Column(String(length=255), nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="volumes")
issues = relationship("Issue")
class TradePaperback(Base, BaseMixin):
__tablename__ = "trade_paperback"
name = Column(String(length=255), nullable=False)
issue_start = Column(Integer)
issue_end = Column(Integer)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="trade_paperbacks")
class StoryArc(Base, BaseMixin):
__tablename__ = "story_arc"
name = Column(String(length=255), nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="story_arcs")
class Issue(Base, BaseMixin):
__tablename__ = "issue"
issue_number = Column(String(255))
in_stock = Column(BIT(1))
is_read = Column(BIT(1))
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="issues")
volume_id = Column(String, ForeignKey("volume.id"), nullable=True)
volume = relationship("Volume", back_populates="issues")
class Artist(Base, BaseMixin):
__tablename__ = "artist"
name = Column(String(length=255), nullable=False)
comic_works = relationship("ComicWork")
class WorkType(Base, BaseMixin):
__tablename__ = "worktype"
name = Column(String(length=255), nullable=False, unique=True)
comic_works = relationship("ComicWork")
def __repr__(self):
return f'Worktype({self.id} {self.version} {self.name} {len(self.comic_works)})'
def __str__(self):
return f'{self.name}({self.id})'
class ComicWork(Base, BaseMixin):
__tablename__ = "comic_work"
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="comic_works")
artist_id = Column(String, ForeignKey("artist.id"), nullable=False)
artist = relationship("Artist", back_populates="comic_works")
work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False)
work_type = relationship("WorkType", back_populates="comic_works")
@@ -0,0 +1,39 @@
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.dialects.mysql import BIT
from .base import Base, BaseMixin
class MediaFile(Base, BaseMixin):
__tablename__ = 'media_file'
cloud_link = Column(String(255))
file_name = Column(String(255))
path = Column(String(255))
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
should_download = Column(BIT(1))
def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.title})'
def __str__(self):
return f'{self.title}({self.id})'
class MediaArticle(Base, BaseMixin):
__tablename__ = 'media_article'
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
class MediaVideo(Base, BaseMixin):
__tablename__ = 'media_video'
cloud_link = Column(String(255))
file_name = Column(String(255))
path = Column(String(255))
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
should_download = Column(BIT(1))
@@ -0,0 +1,42 @@
from sqlalchemy import Column, String, ForeignKey, DateTime, Integer, Boolean
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class MetaDataTable(Base, BaseMixin):
__tablename__ = 'meta_data_table'
table_name = Column(String(255), unique=True)
table_columns = relationship("MetaDataColumn")
def __repr__(self):
return f'MetaDataTable({self.id} {self.table_name})'
def __str__(self):
return f'{self.table_name}({self.id})'
class MetaDataColumn(Base, BaseMixin):
__tablename__ = 'meta_data_column'
column_modifier = Column(String(255), nullable=True)
column_name = Column(String(255))
column_order = Column(Integer)
column_sync_name = Column(String(255))
column_type = Column(String(255))
table_id = Column(String, ForeignKey('meta_data_table.id'))
table = relationship("MetaDataTable", back_populates="table_columns")
column_label = Column(String(255))
filter_label = Column(String(255))
is_shown = Column(BIT(1))
show_filter = Column(BIT(1))
ref_column = Column(String, nullable=True)
def __repr__(self):
if self.column_name is None:
return f'MetaDataColumn({self.id} {self.table.table_name}.__)'
else:
return f'MetaDataColumn({self.id} {self.table.table_name}.{self.column_name})'
def __str__(self):
return f'{self.column_name}({self.id})'
+100
View File
@@ -0,0 +1,100 @@
from sqlalchemy import Column, DateTime, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class Sport(Base, BaseMixin):
__tablename__ = "sport"
__table_args__ = (
UniqueConstraint("name"),
)
name = Column(String(255), nullable=False, index=True, unique=True)
teams = relationship("Team")
positions = relationship("FieldPosition")
class Team(Base, BaseMixin):
__tablename__ = "team"
name = Column(String(255), nullable=False, index=True, unique=True)
short_name = Column(String(255), nullable=False, )
sport_id = Column(String, ForeignKey("sport.id"), nullable=False)
sport = relationship("Sport", back_populates="teams")
roosters = relationship("Rooster")
class FieldPosition(Base, BaseMixin):
__tablename__ = "field_position"
__table_args__ = (
UniqueConstraint("name", "sport_id"),
UniqueConstraint("short_name", "sport_id"),
)
name = Column(String(255), nullable=False, index=True)
short_name = Column(String(255), nullable=False)
sport_id = Column(String, ForeignKey("sport.id"), nullable=False, index=True)
sport = relationship("Sport", back_populates="positions")
roosters = relationship("Rooster")
class Player(Base, BaseMixin):
__tablename__ = "player"
__table_args__ = (
UniqueConstraint("first_name", "last_name"),
)
first_name = Column(String(255), nullable=False, index=True)
last_name = Column(String(255), nullable=False, index=True)
roosters = relationship("Rooster")
def get_full_name(self) -> str:
return f"{self.last_name}, {self.first_name}"
class Rooster(Base, BaseMixin):
__tablename__ = "rooster"
__table_args__ = (
UniqueConstraint("year", "team_id", "player_id", "position_id"),
)
year = Column(Integer)
team_id = Column(String, ForeignKey("team.id"), nullable=False, index=True)
team = relationship("Team", back_populates="roosters")
player_id = Column(String, ForeignKey("player.id"), nullable=False, index=True)
player = relationship("Player", back_populates="roosters")
position_id = Column(String, ForeignKey("field_position.id"), nullable=False, index=True)
position = relationship("FieldPosition", back_populates="roosters")
cards = relationship("Card")
class Vendor(Base, BaseMixin):
__tablename__ = "vendor"
name = Column(String(255), nullable=False, unique=True, index=True)
card_sets = relationship("CardSet")
cards = relationship("Card")
class CardSet(Base, BaseMixin):
__tablename__ = "card_set"
__table_args__ = (
UniqueConstraint("name", "vendor_id"),
)
name = Column(String(255), index=True)
parallel_set = Column(BIT(1))
insert_set = Column(BIT(1))
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False, index=True)
vendor = relationship("Vendor", back_populates="card_sets")
cards = relationship("Card")
class Card(Base, BaseMixin):
__tablename__ = "card"
__table_args__ = (
UniqueConstraint("card_number", "year", "vendor_id", "card_set_id"),
)
card_number = Column(Integer, index=True)
year = Column(Integer, index=True)
card_set_id = Column(String, ForeignKey("card_set.id"), nullable=False)
card_set = relationship("CardSet", back_populates="cards")
rooster_id = Column(String, ForeignKey("rooster.id"), nullable=False)
rooster = relationship("Rooster", back_populates="cards")
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False)
vendor = relationship("Vendor", back_populates="cards")