Merge branch 'feature/9-evaluate-sqlmodel' into 'develop/0.1.0'

evaluate sqlmodel

See merge request tpeetz/kontor!13
This commit was merged in pull request #57.
This commit is contained in:
2025-04-13 16:16:10 +02:00
107 changed files with 2517 additions and 6 deletions
+4
View File
@@ -7,3 +7,7 @@ icons-shadowless/
springboot/.factorypath
java-ee/.settings
java-ee/.project
kontor-schema/env
kontor-schema/kontor_schema.egg-info
kontor-gui/.pdm-python
kontor-gui/dist
+15
View File
@@ -0,0 +1,15 @@
FROM python:3.11
WORKDIR /code
COPY ./requirements.txt /code/requirements.txt
COPY ./config/kontor-docker/logging-config.yaml /root/.config/kontor-docker/logging-config.yaml
COPY ./config/kontor-docker/database-config.yaml /root/.config/kontor-docker/database-config.yaml
RUN pip install --no-cache-dir --upgrade pip
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
COPY ./app /code/app
CMD ["fastapi", "run", "app/main.py", "--port", "8800"]
+31
View File
@@ -0,0 +1,31 @@
.PHONY: clean virtualenv test docker dist dist-upload
clean:
find . -name '*.py[co]' -delete
virtualenv:
virtualenv --prompt '|> kontor <| ' env
env/bin/pip install -r requirements.txt
env/bin/python setup.py develop
@echo
@echo "VirtualENV Setup Complete. Now run: source env/bin/activate"
@echo
test:
python -m pytest \
-v \
--cov=kontor \
--cov-report=term \
--cov-report=html:coverage-report \
tests/
docker: clean
docker build -t kontor-api:latest .
dist: clean
rm -rf dist/*
python setup.py sdist
python setup.py bdist_wheel
dist-upload:
twine upload dist/*
+19
View File
@@ -0,0 +1,19 @@
from typing import Union
from fastapi import FastAPI
from .routers import comic, media
app = FastAPI()
app.include_router(comic.router)
app.include_router(media.router)
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
return {"item_id": item_id, "q": q}
View File
+10
View File
@@ -0,0 +1,10 @@
from uuid import UUID
from pydantic import BaseModel
class ComicResponse(BaseModel):
id: UUID
title: str
completed: bool
+16
View File
@@ -0,0 +1,16 @@
from uuid import UUID
from pydantic import BaseModel
class MediaFileResponse(BaseModel):
id: UUID
title: str | None = None
file_name: str | None = None
cloud_link: str | None = None
url: str
review: bool = False
should_download: bool = False
class Link(BaseModel):
url: str
View File
+28
View File
@@ -0,0 +1,28 @@
from uuid import UUID
from fastapi import APIRouter
from sqlalchemy import select
from app.models.comic import ComicResponse
from app.schema import Comic, __session__
router = APIRouter(
prefix="/comic",
tags=["comics"],
responses={404: {"description": "Not found"}},
)
@router.get("/comics")
def get_all_comics() -> list[ComicResponse]:
results: list[ComicResponse] = []
with __session__() as session:
comics = session.scalars(select(Comic)).all()
for comic in comics:
results.append(ComicResponse(id=comic.id, title=comic.title, completed=(comic.completed == 1)))
return results
@router.get("/comics/{comic_id}")
def get_comic(comic_id: UUID) -> ComicResponse:
return ComicResponse(id=comic_id, title="Comic2", completed=False)
+52
View File
@@ -0,0 +1,52 @@
from uuid import uuid4
import mariadb
from fastapi import APIRouter, status, HTTPException
from fastapi.openapi.utils import status_code_ranges
from sqlalchemy import select
from app.models.media import MediaFileResponse, Link
from app.schema import MediaFile, __session__
router = APIRouter(
prefix="/media",
tags=["media"]
)
@router.get("/files")
def get_files() -> list[MediaFileResponse]:
results: list[MediaFileResponse] = []
with __session__() as session:
files = session.scalars(select(MediaFile)).all()
for mediafile in files:
response = MediaFileResponse(id=mediafile.id,
title=mediafile.title,
file_name=mediafile.file_name,
cloud_link= mediafile.cloud_link,
url=str(mediafile.url),
review=(mediafile.review == 1),
shoud_download=(mediafile.should_download==1))
results.append(response)
return results
@router.post("/files", status_code=status.HTTP_201_CREATED)
def add_file(new_link: Link) -> MediaFileResponse:
print(new_link.url)
try:
with __session__() as session:
mediaFile: MediaFile = MediaFile()
setattr(mediaFile, "url", new_link.url)
setattr(mediaFile, "review", True)
setattr(mediaFile, "should_download", True)
session.add(mediaFile)
session.commit()
except :
raise HTTPException(status_code=409, detail="Link duplicate")
response = MediaFileResponse(id=uuid4(),
title=mediaFile.title,
file_name=mediaFile.file_name,
cloud_link=mediaFile.cloud_link,
url=new_link.url,
review=(mediaFile.review == 1),
shoud_download=(mediaFile.should_download==1))
return response
+38
View File
@@ -0,0 +1,38 @@
import logging
from pathlib import Path
import yaml
from platformdirs import PlatformDirs
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from .admin import User, Token, Role, AuthorizationMatrix, ModuleData, MailAccount, Mail
from .bookshelf import Article, Book, Author, BookshelfPublisher, ArticleAuthor, BookAuthor
from .comic import Comic, Artist, Publisher, Issue, StoryArc, TradePaperback, Volume, ComicWork, WorkType
from .metadata import MetaDataTable, MetaDataColumn
from .tysc import Card, CardSet, Sport, Team, FieldPosition, Rooster, Player, Vendor
from .media import MediaFile, MediaArticle, MediaVideo
from .base import Base
from .database import KontorDB, ColumnEntry
dirs = PlatformDirs('kontor-docker')
logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
with open(logging_config, 'rt') as f:
configDict = yaml.safe_load(f.read())
logging.config.dictConfig(configDict)
logger = logging.getLogger('development')
logger.setLevel(logging.DEBUG)
database_config = Path(dirs.user_config_dir, 'database-config.yaml')
with open(database_config, 'rt') as f:
db_config = yaml.safe_load(f.read())
print(db_config)
connect_string = ('mariadb+mariadbconnector://{}:{}@{}:{}/{}'.format(
db_config['mariadb']['user'],
db_config['mariadb']['password'],
db_config['mariadb']['host'],
db_config['mariadb']['port'],
db_config['mariadb']['database']
))
engine = create_engine(connect_string)
Base.metadata.create_all(bind=engine, checkfirst=True)
__session__ = sessionmaker(engine)
@@ -4,8 +4,9 @@ from datetime import datetime
from enum import Enum, auto
from logging import Logger
from pathlib import Path
from typing import Any
from sqlalchemy import Engine, select
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
@@ -37,7 +38,7 @@ class StatusType(Enum):
class KontorDB:
def __init__(self, db_engine: Engine, log: Logger):
def __init__(self, db_engine: Any, log: Logger):
self.engine = db_engine
self.registry = {}
self.init_registry()
@@ -0,0 +1,12 @@
version: 1
sqlite:
file: /home/tpeetz/.sync/media/kontor.db
mariadb:
host: mariadb
port: 3306
user: kontor
password: kontor
database: kontor
@@ -0,0 +1,42 @@
version: 1
disable_existing_loggers: False
formatters:
simple:
format: '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s'
datefmt: '%Y-%m-%d %H:%M:%S'
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
file:
class: logging.FileHandler
level: INFO
formatter: simple
filename: /root/kontor-api.log
mode: a
loggers:
development:
level: DEBUG
handlers: [console, file]
propagate: no
staging:
level: INFO
handlers: [console, file]
propagate: no
production:
level: WARNING
handlers: [file]
propagate: no
root:
level: DEBUG
handlers: [console]
+10
View File
@@ -0,0 +1,10 @@
mariadb
sqlalchemy
pathlib
platformdirs
pyyaml
beautifulsoup4
sqlmodel
requests
fastapi[standard]
View File
+31
View File
@@ -0,0 +1,31 @@
.PHONY: clean virtualenv test docker dist dist-upload
clean:
find . -name '*.py[co]' -delete
virtualenv:
virtualenv --prompt '|> kontor <| ' env
env/bin/pip install -r requirements.txt
env/bin/python setup.py develop
@echo
@echo "VirtualENV Setup Complete. Now run: source env/bin/activate"
@echo
test:
python -m pytest \
-v \
--cov=kontor \
--cov-report=term \
--cov-report=html:coverage-report \
tests/
docker: clean
docker build -t kontor-api:latest .
dist: clean
rm -rf dist/*
python setup.py sdist
python setup.py bdist_wheel
dist-upload:
twine upload dist/*
+1
View File
@@ -0,0 +1 @@
# kontor-gui
View File
@@ -11,12 +11,18 @@ from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from kontor_schema.base import Base
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from gui.main_window import MainWindow
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--config', '-c', default='kontor-docker')
args = parser.parse_args()
if __name__ == '__main__':
app = QApplication(sys.argv)
dirs = PlatformDirs("kontor")
dirs = PlatformDirs(args.config)
database_config = Path(dirs.user_config_dir, 'database-config.yaml')
with open(database_config, 'rt') as f:
db_config = yaml.safe_load(f.read())
+26
View File
@@ -0,0 +1,26 @@
[project]
name = "kontor-gui"
version = "0.1.0"
description = "Kontor GUI"
authors = [
{name = "Thomas Peetz", email = "thomas.peetz@thpeetz.de"},
]
dependencies = [
"platformdirs",
"pyyaml",
"PySide6",
]
requires-python = ">=3.11"
readme = "README.md"
license = {text = "MIT"}
[build-system]
requires = ["pdm-backend"]
build-backend = "pdm.backend"
[tool.pdm]
distribution = true
[dependency-groups]
dev = ["-e file:///${PROJECT_ROOT}/../kontor-schema#egg=kontor-schema"]
+98
View File
@@ -0,0 +1,98 @@
[app]
# title of your application
title = kontor
# project directory. the general assumption is that project_dir is the parent directory
# of input_file
project_dir = /home/tpeetz/projects/kontor/python/kontor-gui
# source file path
input_file = /home/tpeetz/projects/kontor/python/kontor-gui/main.py
# directory where exec is stored
exec_directory = .
# path to .pyproject project file
project_file =
# application icon
icon = /usr/local/lib/python3.11/dist-packages/PySide6/scripts/deploy_lib/pyside_icon.jpg
[python]
# python path
python_path = /home/tpeetz/projects/kontor/python/kontor-gui/env/bin/python
# python packages to install
packages = Nuitka==2.4.8
# buildozer = for deploying Android application
android_packages = buildozer==1.5.0,cython==0.29.33
[qt]
# comma separated path to qml files required
# normally all the qml files required by the project are added automatically
qml_files =
# excluded qml plugin binaries
excluded_qml_plugins =
# qt modules used. comma separated
modules = Widgets,DBus,Core,Gui
# qt plugins used by the application
plugins = platformthemes,imageformats,platforms,generic,iconengines,egldeviceintegrations,styles,xcbglintegrations,platforms/darwin,platforminputcontexts,accessiblebridge
[android]
# path to pyside wheel
wheel_pyside =
# path to shiboken wheel
wheel_shiboken =
# plugins to be copied to libs folder of the packaged application. comma separated
plugins =
[nuitka]
# usage description for permissions requested by the app as found in the info.plist file
# of the app bundle
# eg = extra_args = --show-modules --follow-stdlib
macos.permissions =
# mode of using nuitka. accepts standalone or onefile. default is onefile.
mode = onefile
# (str) specify any extra nuitka arguments
extra_args = --quiet --noinclude-qt-translations
[buildozer]
# build mode
# possible options = [release, debug]
# release creates an aab, while debug creates an apk
mode = debug
# contrains path to pyside6 and shiboken6 recipe dir
recipe_dir =
# path to extra qt android jars to be loaded by the application
jars_dir =
# if empty uses default ndk path downloaded by buildozer
ndk_path =
# if empty uses default sdk path downloaded by buildozer
sdk_path =
# other libraries to be loaded. comma separated.
# loaded at app startup
local_libs =
# architecture of deployed platform
# possible values = ["aarch64", "armv7a", "i686", "x86_64"]
arch =

Before

Width:  |  Height:  |  Size: 513 B

After

Width:  |  Height:  |  Size: 513 B

Before

Width:  |  Height:  |  Size: 524 B

After

Width:  |  Height:  |  Size: 524 B

Before

Width:  |  Height:  |  Size: 836 B

After

Width:  |  Height:  |  Size: 836 B

Before

Width:  |  Height:  |  Size: 544 B

After

Width:  |  Height:  |  Size: 544 B

Before

Width:  |  Height:  |  Size: 634 B

After

Width:  |  Height:  |  Size: 634 B

View File
@@ -0,0 +1,366 @@
import json
import re
import subprocess
import uuid
from datetime import datetime
from pathlib import Path
import requests
from bs4 import BeautifulSoup
from cement.core.config import ConfigHandler
from sqlalchemy import Engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from .bookshelf import Article, Book, Author, BookshelfPublisher, ArticleAuthor, BookAuthor
from .comic import Comic, Artist, Publisher, Issue, StoryArc, TradePaperback, Volume, ComicWork, WorkType
from .metadata import MetaDataTable, MetaDataColumn
from .tysc import Card, CardSet, Sport, Team, FieldPosition, Rooster, Player, Vendor
from .media import MediaFile, MediaArticle, MediaVideo
class KontorDB:
def __init__(self, db_engine: Engine, config: ConfigHandler, log):
self.engine = db_engine
self.config = config
self.log = log
self.registry = {}
self.init_registry()
def init_registry(self):
self.registry['card'] = Card
self.registry['card_set'] = CardSet
self.registry['sport'] = Sport
self.registry['team'] = Team
self.registry['field_position'] = FieldPosition
self.registry['rooster'] = Rooster
self.registry['player'] = Player
self.registry['vendor'] = Vendor
self.registry['artist'] = Artist
self.registry['publisher'] = Publisher
self.registry['comic'] = Comic
self.registry['issue'] = Issue
self.registry['story_arc'] = StoryArc
self.registry['trade_paperback'] = TradePaperback
self.registry['volume'] = Volume
self.registry['comic_work'] = ComicWork
self.registry['worktype'] = WorkType
self.registry['article'] = Article
self.registry['book'] = Book
self.registry['author'] = Author
self.registry['bookshelf_publisher'] = BookshelfPublisher
self.registry['article_author'] = ArticleAuthor
self.registry['book_author'] = BookAuthor
self.registry['media_file'] = MediaFile
self.registry['media_article'] = MediaArticle
self.registry['media_video'] = MediaVideo
self.registry['meta_data_table'] = MetaDataTable
self.registry[MetaDataColumn.__tablename__] = MetaDataColumn
def get_table_names(self) -> list:
result = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
tables = session.query(MetaDataTable).all()
result = [table.table_name for table in tables]
return result
def get_column_meta_data(self, table_name: str, view_only=True) -> dict:
meta_data = {}
order = 0
__session__ = sessionmaker(self.engine)
with __session__() as session:
if view_only:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.is_shown == 1).all()):
meta_data[order] = {'column': column.column_name, 'label': column.column_label,
'order': column.column_order, 'ref_column': column.ref_column}
order += 1
else:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).all()):
meta_data[order] = {
'column': column.column_name,
'order': column.column_order,
'ref_column': column.ref_column
}
order += 1
return meta_data
def get_filters(self, table_name):
_filter_map = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
for (_, column) in (session.query(MetaDataTable, MetaDataColumn).
filter(MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.show_filter == 1).all()):
_filter_map[column.column_name] = {'label': column.filter_label, 'widget': None}
self.log.debug(f"retrieved {len(_filter_map)} filters: {_filter_map}")
return _filter_map
def data(self, table, columns: dict, filters) -> list:
data = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
entries = []
if len(filters) == 0:
entries = session.query(table).all()
else:
entries = session.query(table).filter_by(**filters)
for entry in entries:
row = []
for order in columns.keys():
column_name = columns[order]['column']
if str(column_name).endswith("_id"):
ref_table = column_name[:-3]
# print(f"{ref_table=}")
ref = getattr(entry, ref_table)
value = getattr(ref, "name")
# print(f"{value=}")
row.append(value)
else:
row.append(getattr(entry, column_name))
# print(repr(row))
data.append(row)
return data
def export_db(self, export_type: str, export_file_name: str):
self.log.info(f"export DB to {export_file_name} as {export_type}")
db = {}
export_table_list = self.get_table_names()
for table in export_table_list:
columns = self.get_column_meta_data(table, view_only=False)
if table in self.registry:
model = self.registry[table]
else:
print(f"table {table} is not registered")
continue
__session__ = sessionmaker(self.engine)
with __session__() as session:
rows = session.query(model).all()
entries = []
self.log.debug(f"found {len(rows)} entries")
self.log.debug(f"found {len(columns)} columns")
for row in rows:
# print(row)
entry = {}
for order in columns:
# print(columns[order])
column_name = columns[order]['column']
# print(f"get value {column_name} from {row} of table {table}")
try:
value = getattr(row, column_name)
if isinstance(value, datetime):
entry[column_name] = str(value)
else:
entry[column_name] = value
except AttributeError as error:
self.log.debug("could not get value")
entries.append(entry)
db[table] = entries
export_file = Path(export_file_name)
match export_type:
case "JSON":
json_dump = json.dumps(db, indent=4)
with open(export_file_name, "w") as dump_file:
dump_file.write(json_dump)
case "YAML":
export_file = Path(export_file_name)
case "SQLite":
export_file = Path(export_file_name)
case _:
self.log.debug("unknown export type")
if export_file.exists():
self.log.debug(f"{export_file} exists")
def import_db(self, import_file_name: str, dry_run: bool):
import_file = Path(import_file_name)
if not import_file.exists():
print(f"File {import_file_name} does not exist. Do nothing.")
return
self.log.debug(f"evaluate type from file extension: {import_file.suffix}")
match import_file.suffix:
case '.json':
print("read json file")
with open(import_file_name, 'r') as json_file:
json_load = json.load(json_file)
for table in json_load:
print(f"{table}: {len(json_load[table])}")
self.import_table(table, json_load[table], dry_run)
case '.yml':
print("read yaml file")
case '.yaml':
print("read yaml file")
case '.db':
print("read sqlite file")
def import_table(self, table_name, items, dry_run: bool):
existing_ids = self.get_ids(table_name)
for item in items:
# self.log.debug(f"{item}")
current_id = item['id']
found_item = None
__session__ = sessionmaker(self.engine)
with __session__() as session:
found_item = session.query(self.registry[table_name]).get(current_id)
self.log.debug(f"found: {found_item}")
if found_item is not None:
changed = self.update_entry(found_item, item, dry_run)
if changed:
print(f"{current_id} has changed")
existing_ids.remove(current_id)
else:
self.log.info("item to import not found in database, add new one...")
self.add_entry(table_name, item, session, dry_run)
if len(existing_ids) > 0:
print("remaining items")
def get_ids(self, table_name: str) -> list:
existing_ids = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(self.registry[table_name]).all()
for item in items:
existing_ids.append(getattr(item, 'id'))
return existing_ids
def add_entry(self, table_name: str, update_item: dict, session, dry_run: bool):
add_item = self.registry[table_name]()
for key in update_item.keys():
update_value = update_item[key]
setattr(add_item, key, update_value)
if dry_run:
self.log.info(f"add item {type(add_item)} with id {update_item['id']}")
else:
session.add(add_item)
session.commit()
def update_entry(self, existing_item, update_item: dict, dry_run: bool) -> bool:
changed = False
for key in update_item.keys():
update_value = update_item[key]
existing_value = getattr(existing_item, key)
if type(existing_value) is not type(update_value):
# self.log.debug(f"compare {type(existing_value)} with {type(update_value)}")
existing_value = str(existing_value)
if existing_value != update_value:
print(f"{key} has changed: {existing_value} != {update_value}")
if not dry_run:
setattr(existing_item, key, update_value)
# existing_item[key] = update_value
changed = True
self.log.info(f"update {key} with {update_value}")
return changed
def add_link(self, link: str, dry_run: bool):
self.log.info(f"add link {link} to media_file")
__session__ = sessionmaker(self.engine)
with __session__() as session:
media_file = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.url = link
media_file.review = 1
media_file.should_download = 1
try:
session.add(media_file)
session.commit()
self.log.info(f"entry {media_file} successfully added")
except IntegrityError as error:
session.rollback()
self.log.info(error.orig)
def update_title(self, dry_run=False):
self.log.info(f"get links to review of media_file")
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.review == 1).all()
self.log.info(f"try to update {len(links)} items")
for link in links:
url = link.url
if url is None:
self.log.info(f"url has not been set for {link.id}")
continue
self.log.info('get title for url {}'.format(url))
if dry_run:
continue
try:
r = requests.get(url)
soup = BeautifulSoup(r.content, "html.parser")
title = soup.title.string
except:
self.log.info("Sorry, could not retrieve title")
continue
self.log.info('ID {} has title {}'.format(link.id, title))
link.title = title
link.review = 0
session.commit()
def download_file(self, dry_run=False):
self.log.info(f"download marked files of media_file")
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.should_download == 1).all()
self.log.info(f"try to download {len(links)} items")
for link in links:
url = link.url
if url is None:
self.log.info(f"url has not been set for {link.id}")
continue
if dry_run:
self.log.info(f"download {link.url} to {self.config.get('media', 'dir')}")
continue
filename = self.download_url(link)
if filename is None:
link.file_name = filename
link.should_download = 1
else:
download_file = Path(filename)
download_file.with_name(f"{link.id}{download_file.suffix}")
link.file_name = download_file.name
link.should_download = 0
link.cloud_link = download_file.absolute()
session.commit()
def parse_output(self, lines_list):
file_name = ""
for line in lines_list:
if 'has already been downloaded' in line:
end_len = len(' has already been downloaded')
file_name = line[11:-end_len]
self.log.info('found file: "%s"', file_name)
if 'Destination' in line:
line_len = len(line)
start_len = len('[download] Destination: ')
file_len = line_len - start_len
file_name = line[-file_len:]
self.log.info('new file: "%s"', file_name)
return file_name
def download_url(self, video_url):
media_dir = Path(self.config.get('media', 'dir'))
if not media_dir.exists():
media_dir = Path().absolute()
self.log.info(f"download video to {media_dir}")
result = subprocess.run([self.config.get('media', 'yt-dlp'), video_url], cwd=media_dir, capture_output=True,
text=True)
if result.returncode == 0:
output = result.stdout
output = re.sub(' +', ' ', output)
lines_list = output.splitlines()
return self.parse_output(lines_list)
else:
return None
def check_files(self):
media_dir = Path(self.config.get('media', 'dir'))
if not media_dir.exists():
return
self.log.info(f"check files in {media_dir}")
@@ -0,0 +1,20 @@
import uuid
from datetime import datetime
from sqlalchemy import Integer, func, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class BaseMixin:
# id = Column(String, primary_key=True)
id: Mapped[str] = mapped_column(primary_key=True, default=uuid.uuid4())
# created_date = Column(DateTime)
created_date: Mapped[datetime] = mapped_column(default=func.now())
# last_modified_date = Column(DateTime)
last_modified_date: Mapped[datetime] = mapped_column(default=func.now())
# version = Column(Integer)
version: Mapped[int] = mapped_column(default=0)
@@ -0,0 +1,39 @@
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.dialects.mysql import BIT
from .base import Base, BaseMixin
class MediaFile(Base, BaseMixin):
__tablename__ = 'media_file'
cloud_link = Column(String(255))
file_name = Column(String(255))
path = Column(String(255))
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
should_download = Column(BIT(1))
def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.title})'
def __str__(self):
return f'{self.title}({self.id})'
class MediaArticle(Base, BaseMixin):
__tablename__ = 'media_article'
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
class MediaVideo(Base, BaseMixin):
__tablename__ = 'media_video'
cloud_link = Column(String(255))
file_name = Column(String(255))
path = Column(String(255))
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
should_download = Column(BIT(1))
@@ -0,0 +1,42 @@
from sqlalchemy import Column, String, ForeignKey, DateTime, Integer, Boolean
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class MetaDataTable(Base, BaseMixin):
__tablename__ = 'meta_data_table'
table_name = Column(String(255), unique=True)
table_columns = relationship("MetaDataColumn")
def __repr__(self):
return f'MetaDataTable({self.id} {self.table_name})'
def __str__(self):
return f'{self.table_name}({self.id})'
class MetaDataColumn(Base, BaseMixin):
__tablename__ = 'meta_data_column'
column_modifier = Column(String(255), nullable=True)
column_name = Column(String(255))
column_order = Column(Integer)
column_sync_name = Column(String(255))
column_type = Column(String(255))
table_id = Column(String, ForeignKey('meta_data_table.id'))
table = relationship("MetaDataTable", back_populates="table_columns")
column_label = Column(String(255))
filter_label = Column(String(255))
is_shown = Column(BIT(1))
show_filter = Column(BIT(1))
ref_column = Column(String, nullable=True)
def __repr__(self):
if self.column_name is None:
return f'MetaDataColumn({self.id} {self.table.table_name}.__)'
else:
return f'MetaDataColumn({self.id} {self.table.table_name}.{self.column_name})'
def __str__(self):
return f'{self.column_name}({self.id})'
Binary file not shown.
Binary file not shown.
Binary file not shown.
+51
View File
@@ -0,0 +1,51 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class Article(Base, BaseMixin):
__tablename__ = 'article'
title = Column(String(length=255), unique=True)
article_authors = relationship("ArticleAuthor")
class Author(Base, BaseMixin):
__tablename__ = 'author'
first_name = Column(String(255))
last_name = Column(String(255))
article_authors = relationship("ArticleAuthor")
book_authors = relationship("BookAuthor")
class BookshelfPublisher(Base, BaseMixin):
__tablename__ = 'bookshelf_publisher'
name = Column(String(length=255), unique=True)
books = relationship("Book")
class Book(Base, BaseMixin):
__tablename__ = 'book'
isbn = Column(String(255), unique=True)
title = Column(String(255))
year = Column(Integer, nullable=False)
publisher_id = Column(String, ForeignKey('bookshelf_publisher.id'), nullable=False)
publisher = relationship('BookshelfPublisher', back_populates="books")
book_authors = relationship("BookAuthor")
class ArticleAuthor(Base, BaseMixin):
__tablename__ = 'article_author'
article_id = Column(String, ForeignKey('article.id'), nullable=False)
article = relationship('Article', back_populates="article_authors")
author_id = Column(String, ForeignKey('author.id'), nullable=False)
author = relationship('Author', back_populates="article_authors")
class BookAuthor(Base, BaseMixin):
__tablename__ = 'book_author'
author_id = Column(String, ForeignKey('author.id'), nullable=False)
author = relationship('Author', back_populates="book_authors")
book_id = Column(String, ForeignKey('book.id'), nullable=False)
book = relationship('Book', back_populates="book_authors")
+100
View File
@@ -0,0 +1,100 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class Publisher(Base, BaseMixin):
__tablename__ = "publisher"
name = Column(String(length=255), unique=True)
comics = relationship("Comic")
def __repr__(self):
return f'Publisher({self.id} {self.name})'
def __str__(self):
return self.__repr__()
class Comic(Base, BaseMixin):
__tablename__ = 'comic'
title = Column(String(length=255), unique=True)
publisher_id = Column(String, ForeignKey('publisher.id'), nullable=False)
publisher = relationship("Publisher", back_populates="comics")
current_order = Column(BIT(1))
completed = Column(BIT(1))
issues = relationship("Issue")
story_arcs = relationship("StoryArc")
trade_paperbacks = relationship("TradePaperback")
volumes = relationship("Volume")
comic_works = relationship("ComicWork")
def __repr__(self):
return f'Comic({self.id} {self.version} {self.title} {self.publisher.name})'
def __str__(self):
return f'{self.title}({self.id})'
class Volume(Base, BaseMixin):
__tablename__ = "volume"
name = Column(String(length=255), nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="volumes")
issues = relationship("Issue")
class TradePaperback(Base, BaseMixin):
__tablename__ = "trade_paperback"
name = Column(String(length=255), nullable=False)
issue_start = Column(Integer)
issue_end = Column(Integer)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="trade_paperbacks")
class StoryArc(Base, BaseMixin):
__tablename__ = "story_arc"
name = Column(String(length=255), nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="story_arcs")
class Issue(Base, BaseMixin):
__tablename__ = "issue"
issue_number = Column(String(255))
in_stock = Column(BIT(1))
is_read = Column(BIT(1))
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="issues")
volume_id = Column(String, ForeignKey("volume.id"), nullable=True)
volume = relationship("Volume", back_populates="issues")
class Artist(Base, BaseMixin):
__tablename__ = "artist"
name = Column(String(length=255), nullable=False)
comic_works = relationship("ComicWork")
class WorkType(Base, BaseMixin):
__tablename__ = "worktype"
name = Column(String(length=255), nullable=False, unique=True)
comic_works = relationship("ComicWork")
def __repr__(self):
return f'Worktype({self.id} {self.version} {self.name} {len(self.comic_works)})'
def __str__(self):
return f'{self.name}({self.id})'
class ComicWork(Base, BaseMixin):
__tablename__ = "comic_work"
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="comic_works")
artist_id = Column(String, ForeignKey("artist.id"), nullable=False)
artist = relationship("Artist", back_populates="comic_works")
work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False)
work_type = relationship("WorkType", back_populates="comic_works")
+100
View File
@@ -0,0 +1,100 @@
from sqlalchemy import Column, DateTime, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class Sport(Base, BaseMixin):
__tablename__ = "sport"
__table_args__ = (
UniqueConstraint("name"),
)
name = Column(String(255), nullable=False, index=True, unique=True)
teams = relationship("Team")
positions = relationship("FieldPosition")
class Team(Base, BaseMixin):
__tablename__ = "team"
name = Column(String(255), nullable=False, index=True, unique=True)
short_name = Column(String(255), nullable=False, )
sport_id = Column(String, ForeignKey("sport.id"), nullable=False)
sport = relationship("Sport", back_populates="teams")
roosters = relationship("Rooster")
class FieldPosition(Base, BaseMixin):
__tablename__ = "field_position"
__table_args__ = (
UniqueConstraint("name", "sport_id"),
UniqueConstraint("short_name", "sport_id"),
)
name = Column(String(255), nullable=False, index=True)
short_name = Column(String(255), nullable=False)
sport_id = Column(String, ForeignKey("sport.id"), nullable=False, index=True)
sport = relationship("Sport", back_populates="positions")
roosters = relationship("Rooster")
class Player(Base, BaseMixin):
__tablename__ = "player"
__table_args__ = (
UniqueConstraint("first_name", "last_name"),
)
first_name = Column(String(255), nullable=False, index=True)
last_name = Column(String(255), nullable=False, index=True)
roosters = relationship("Rooster")
def get_full_name(self) -> str:
return f"{self.last_name}, {self.first_name}"
class Rooster(Base, BaseMixin):
__tablename__ = "rooster"
__table_args__ = (
UniqueConstraint("year", "team_id", "player_id", "position_id"),
)
year = Column(Integer)
team_id = Column(String, ForeignKey("team.id"), nullable=False, index=True)
team = relationship("Team", back_populates="roosters")
player_id = Column(String, ForeignKey("player.id"), nullable=False, index=True)
player = relationship("Player", back_populates="roosters")
position_id = Column(String, ForeignKey("field_position.id"), nullable=False, index=True)
position = relationship("FieldPosition", back_populates="roosters")
cards = relationship("Card")
class Vendor(Base, BaseMixin):
__tablename__ = "vendor"
name = Column(String(255), nullable=False, unique=True, index=True)
card_sets = relationship("CardSet")
cards = relationship("Card")
class CardSet(Base, BaseMixin):
__tablename__ = "card_set"
__table_args__ = (
UniqueConstraint("name", "vendor_id"),
)
name = Column(String(255), index=True)
parallel_set = Column(BIT(1))
insert_set = Column(BIT(1))
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False, index=True)
vendor = relationship("Vendor", back_populates="card_sets")
cards = relationship("Card")
class Card(Base, BaseMixin):
__tablename__ = "card"
__table_args__ = (
UniqueConstraint("card_number", "year", "vendor_id", "card_set_id"),
)
card_number = Column(Integer, index=True)
year = Column(Integer, index=True)
card_set_id = Column(String, ForeignKey("card_set.id"), nullable=False)
card_set = relationship("CardSet", back_populates="cards")
rooster_id = Column(String, ForeignKey("rooster.id"), nullable=False)
rooster = relationship("Rooster", back_populates="cards")
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False)
vendor = relationship("Vendor", back_populates="cards")
+1 -1
View File
@@ -15,7 +15,7 @@ CONFIG = init_defaults('kontor', 'mariadb', 'media')
CONFIG['mariadb']['user'] = 'kontor'
CONFIG['mariadb']['password'] = 'kontor'
CONFIG['mariadb']['host'] = '127.0.0.1'
CONFIG['mariadb']['port'] = '3306'
CONFIG['mariadb']['port'] = '3316'
CONFIG['mariadb']['database'] = 'kontor'
CONFIG['media']['yt-dlp'] = '/home/tpeetz/bin/yt-dlp'
CONFIG['media']['dir'] = '/data/media'
+116
View File
@@ -0,0 +1,116 @@
"""
Setup database connections
"""
import sqlite3
import mariadb
import logging.config
from platformdirs import PlatformDirs
from pathlib import Path
import yaml
def get_database_cursors(log):
dirs = PlatformDirs("kontor")
database_config = Path(dirs.user_config_dir, 'database-config.yaml')
with open(database_config, 'rt') as f:
db_config = yaml.safe_load(f.read())
sqlite_db = db_config["sqlite"]["file"]
log.info('using SQLite3 database {}'.format(sqlite_db))
sqlite_conn = sqlite3.connect(sqlite_db, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
mariadb_conn = mariadb.connect(
host=db_config['mariadb']['host'],
port=db_config['mariadb']['port'],
user=db_config['mariadb']['user'],
password=db_config['mariadb']['password'],
database=db_config['mariadb']['database']
)
return sqlite_conn, mariadb_conn
def create_tables(sqlite_conn, logger, recreate_db, scripts):
logger.info('create_tables')
for table_id in scripts:
create_statement = scripts[table_id]['create']
drop_statement = scripts[table_id]['drop']
logger.debug(create_statement)
cursor = sqlite_conn.cursor()
if recreate_db:
logger.debug(drop_statement)
cursor.execute(drop_statement)
cursor.execute(create_statement)
def get_logger(level):
dirs = PlatformDirs("kontor")
logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
with open(logging_config, 'rt') as f:
config = yaml.safe_load(f.read())
logging.config.dictConfig(config)
logger = logging.getLogger('development')
if level is not None:
match level:
case 0:
logger.setLevel(logging.INFO)
case 1:
logger.setLevel(logging.DEBUG)
case _:
logger.setLevel(logging.CRITICAL)
return logger
def get_meta_data(mariadb_conn):
mariadb_cursor = mariadb_conn.cursor()
select_statement = "SELECT id, table_name FROM meta_data_table"
mariadb_cursor.execute(select_statement)
rows = mariadb_cursor.fetchall()
meta_data = {}
for (identifier, table_name) in rows:
table_data = {"name": table_name}
mariadb_cursor.execute("SELECT column_name, column_sync_name, column_type, column_modifier, column_order FROM meta_data_column WHERE table_id=?", (identifier, ))
column_rows = mariadb_cursor.fetchall()
column_list = []
for (column_name, column_sync_name, column_type, column_modifier, column_order) in column_rows:
column_data = {"column_name": column_name, "column_sync_name": column_sync_name, "column_type": column_type,
"column_modifier": column_modifier, "column_order": column_order}
column_list.append(column_data)
# logger.info(column_list)
table_data["columns"] = column_list
meta_data[identifier] = table_data
return meta_data
def get_scripts(meta_data, logger):
scripts_map = {}
for table_id in meta_data:
table_scripts = {}
m_columns = []
s_columns = []
columns = []
for column_data in meta_data[table_id]["columns"]:
column_line = "{} {}".format(column_data["column_sync_name"], column_data["column_type"])
if column_data["column_modifier"]:
column_line += " " + column_data["column_modifier"]
columns.append(column_line)
m_columns.append(column_data['column_name'])
s_columns.append(column_data['column_sync_name'])
table_name = meta_data[table_id]["name"]
create_statement = "CREATE TABLE IF NOT EXISTS {} ({});".format(table_name, ", ".join(columns))
drop_statement = 'DROP TABLE IF EXISTS {}'.format(table_name)
select_mariadb_statement = 'SELECT {} FROM {}'.format(', '.join(m_columns), table_name)
select_sqlite_statement = 'SELECT {} FROM {}'.format(', '.join(s_columns), table_name)
insert_sqlite_statement = 'INSERT INTO {}({}) VALUES({})'.format(table_name, ', '.join(s_columns), ', '.join(['?']*len(s_columns)))
insert_mariadb_statement = 'INSERT INTO {}({}) VALUES({})'.format(table_name, ', '.join(m_columns), ', '.join(['?']*len(m_columns)))
truncate_mariadb_statement = 'TRUNCATE {}'.format(table_name)
#logger.debug(create_statement)
#logger.debug(select_mariadb_statement)
table_scripts["create"] = create_statement
table_scripts["drop"] = drop_statement
table_scripts["select_mariadb"] = select_mariadb_statement
table_scripts["select_sqlite"] = select_sqlite_statement
table_scripts["insert_sqlite"] = insert_sqlite_statement
table_scripts["insert_mariadb"] = insert_mariadb_statement
table_scripts["truncate_mariadb"] = truncate_mariadb_statement
table_scripts["count"] = "SELECT COUNT(*) FROM {}".format(table_name)
table_scripts["name"] = table_name
scripts_map[table_id] = table_scripts
return scripts_map
@@ -10,7 +10,7 @@ from platformdirs import PlatformDirs
from pathlib import Path
from schema import Base, KontorDB
from setup import get_database_cursors, create_tables, get_logger, get_scripts, get_meta_data
from config import get_database_cursors, create_tables, get_logger, get_scripts, get_meta_data
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--recreate-db', action='store_true')
+64
View File
@@ -0,0 +1,64 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from pathlib import Path
import logging.config
import yaml
from platformdirs import PlatformDirs
from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker
from schema import Comic, Publisher, Base
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--recreate-db', action='store_true')
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--file', '-f', default='~/data.json')
parser.add_argument('--config', '-c', default='kontor-docker')
args = parser.parse_args()
def get_logger(level: int, config: str):
dirs = PlatformDirs(config)
logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
with open(logging_config, 'rt') as f:
configDict = yaml.safe_load(f.read())
logging.config.dictConfig(configDict)
logger = logging.getLogger('development')
if level is not None:
match level:
case 0:
logger.setLevel(logging.INFO)
case 1:
logger.setLevel(logging.DEBUG)
case _:
logger.setLevel(logging.CRITICAL)
return logger
if __name__ == '__main__':
log = get_logger(args.verbose, args.config)
log.info('kontor started')
dirs = PlatformDirs(args.config)
database_config = Path(dirs.user_config_dir, 'database-config.yaml')
with open(database_config, 'rt') as f:
db_config = yaml.safe_load(f.read())
print(db_config)
connect_string = ('mariadb+mariadbconnector://{}:{}@{}:{}/{}'.format(
db_config['mariadb']['user'],
db_config['mariadb']['password'],
db_config['mariadb']['host'],
db_config['mariadb']['port'],
db_config['mariadb']['database']
))
engine = create_engine(connect_string)
Base.metadata.create_all(bind=engine, checkfirst=True)
__session__ = sessionmaker(engine)
with __session__() as session:
comics = session.scalars(select(Comic)).all()
for comic in comics:
print(comic)
publishers = session.scalars(select(Publisher)).all()
for publisher in publishers:
print(publisher)
print(len(publisher.comics))
log.info('kontor finished')
+33
View File
@@ -0,0 +1,33 @@
[build-system]
requires = ["setuptools>=67", "wheel", "setuptools-git-versioning>=2.0,<3"]
build-backend = "setuptools.build_meta"
[project]
name = "kontor-cli"
version = "0.1.0"
dependencies = [
"mariadb",
"sqlmodel",
"pathlib",
"platformdirs",
"pyyaml",
"beautifulsoup4",
]
requires-python = ">=3.10"
authors = [
{name = "Thomas Peetz", email = "thomas.peetz@thpeetz.de"}
]
maintainers = [
{name = "Thomas Peetz", email = "thomas.peetz@thpeetz.de"}
]
description = "CLI for Kontor application"
readme = "README.md"
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python"
]
[project-scripts]
kontor = "kontor::main"
[tool.setuptools.packages.find]
where = ["."]
+10
View File
@@ -0,0 +1,10 @@
mariadb
sqlalchemy
pathlib
platformdirs
pyyaml
beautifulsoup4
sqlmodel
requests
fastapi[standard]
+78
View File
@@ -0,0 +1,78 @@
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship, mapped_column, Mapped
from .base import Base, BaseMixin
class User(Base, BaseMixin):
__tablename__ = 'user'
first_name = Column(String(255))
last_name = Column(String(255))
user_name = Column(String(255), nullable=False)
email = Column(String(255))
password = Column(String(255))
enabled = Column(BIT(1))
matrix = relationship("AuthorizationMatrix")
tokens = relationship("Token")
def get_full_name(self) -> str:
full_name = ""
if self.first_name is not None:
full_name += self.first_name
if self.last_name is not None:
if len(full_name) > 0:
full_name += " "
full_name += self.last_name
return full_name
class Token(Base, BaseMixin):
__tablename__ = "token"
token = Column(String(255), nullable=False, unique=True)
name = Column(String(255))
last_used_date: Mapped[datetime] = mapped_column()
enabled = Column(BIT(1))
user_id = Column(String(255), ForeignKey("user.id"), nullable=False)
user = relationship("User", back_populates="tokens")
class Role(Base, BaseMixin):
__tablename__ = "role"
name = Column(String(255), nullable=False)
matrix = relationship("AuthorizationMatrix")
class AuthorizationMatrix(Base, BaseMixin):
__tablename__ = "authorization_matrix"
user_id = Column(String, ForeignKey("user.id"), nullable=False)
user = relationship("User", back_populates="matrix")
role_id = Column(String, ForeignKey("role.id"), nullable=False)
role = relationship("Role", back_populates="matrix")
class ModuleData(Base, BaseMixin):
__tablename__ = "module_data"
module_name = Column(String(255), nullable=False)
import_data = Column(BIT(1))
class MailAccount(Base, BaseMixin):
__tablename__ = "mail_account"
host = Column(String(255))
port = Column(Integer)
protocol = Column(String(255))
user_name = Column(String(255))
password = Column(String(255))
start_tls = Column(BIT(1))
class Mail(Base, BaseMixin):
__tablename__ = "mail"
folder: Mapped[str] = mapped_column()
subject: Mapped[str] = mapped_column()
body: Mapped[str] = mapped_column()
sent_date: Mapped[datetime] = mapped_column()
received_date: Mapped[datetime] = mapped_column()
+31
View File
@@ -0,0 +1,31 @@
import uuid
from datetime import datetime
from sqlalchemy import func, Column, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class BaseMixin:
id = Column(String(255), primary_key=True, default=uuid.uuid4())
# id: Mapped[str] = mapped_column(primary_key=True, default=uuid.uuid4())
# created_date = Column(DateTime)
created_date: Mapped[datetime] = mapped_column(default=func.now())
# last_modified_date = Column(DateTime)
last_modified_date: Mapped[datetime] = mapped_column(default=func.now())
# version = Column(Integer)
version: Mapped[int] = mapped_column(default=0)
class BaseVideoMixin:
cloud_link = Column(String(255))
file_name = Column(String(255))
path = Column(String(255))
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
should_download = Column(BIT(1))
+51
View File
@@ -0,0 +1,51 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class Article(Base, BaseMixin):
__tablename__ = 'article'
title = Column(String(length=255), unique=True)
article_authors = relationship("ArticleAuthor")
class Author(Base, BaseMixin):
__tablename__ = 'author'
first_name = Column(String(255))
last_name = Column(String(255))
article_authors = relationship("ArticleAuthor")
book_authors = relationship("BookAuthor")
class BookshelfPublisher(Base, BaseMixin):
__tablename__ = 'bookshelf_publisher'
name = Column(String(length=255), unique=True)
books = relationship("Book")
class Book(Base, BaseMixin):
__tablename__ = 'book'
isbn = Column(String(255), unique=True)
title = Column(String(255))
year = Column(Integer, nullable=False)
publisher_id = Column(String, ForeignKey('bookshelf_publisher.id'), nullable=False)
publisher = relationship('BookshelfPublisher', back_populates="books")
book_authors = relationship("BookAuthor")
class ArticleAuthor(Base, BaseMixin):
__tablename__ = 'article_author'
article_id = Column(String, ForeignKey('article.id'), nullable=False)
article = relationship('Article', back_populates="article_authors")
author_id = Column(String, ForeignKey('author.id'), nullable=False)
author = relationship('Author', back_populates="article_authors")
class BookAuthor(Base, BaseMixin):
__tablename__ = 'book_author'
author_id = Column(String, ForeignKey('author.id'), nullable=False)
author = relationship('Author', back_populates="book_authors")
book_id = Column(String, ForeignKey('book.id'), nullable=False)
book = relationship('Book', back_populates="book_authors")
+100
View File
@@ -0,0 +1,100 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class Publisher(Base, BaseMixin):
__tablename__ = "publisher"
name = Column(String(length=255), unique=True)
comics = relationship("Comic")
def __repr__(self):
return f'Publisher({self.id} {self.name})'
def __str__(self):
return self.__repr__()
class Comic(Base, BaseMixin):
__tablename__ = 'comic'
title = Column(String(length=255), unique=True)
publisher_id = Column(String, ForeignKey('publisher.id'), nullable=False)
publisher = relationship("Publisher", back_populates="comics")
current_order = Column(BIT(1))
completed = Column(BIT(1))
issues = relationship("Issue")
story_arcs = relationship("StoryArc")
trade_paperbacks = relationship("TradePaperback")
volumes = relationship("Volume")
comic_works = relationship("ComicWork")
def __repr__(self):
return f'Comic({self.id} {self.version} {self.title} {self.publisher.name})'
def __str__(self):
return f'{self.title}({self.id})'
class Volume(Base, BaseMixin):
__tablename__ = "volume"
name = Column(String(length=255), nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="volumes")
issues = relationship("Issue")
class TradePaperback(Base, BaseMixin):
__tablename__ = "trade_paperback"
name = Column(String(length=255), nullable=False)
issue_start = Column(Integer)
issue_end = Column(Integer)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="trade_paperbacks")
class StoryArc(Base, BaseMixin):
__tablename__ = "story_arc"
name = Column(String(length=255), nullable=False)
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="story_arcs")
class Issue(Base, BaseMixin):
__tablename__ = "issue"
issue_number = Column(String(255))
in_stock = Column(BIT(1))
is_read = Column(BIT(1))
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="issues")
volume_id = Column(String, ForeignKey("volume.id"), nullable=True)
volume = relationship("Volume", back_populates="issues")
class Artist(Base, BaseMixin):
__tablename__ = "artist"
name = Column(String(length=255), nullable=False)
comic_works = relationship("ComicWork")
class WorkType(Base, BaseMixin):
__tablename__ = "worktype"
name = Column(String(length=255), nullable=False, unique=True)
comic_works = relationship("ComicWork")
def __repr__(self):
return f'Worktype({self.id} {self.version} {self.name} {len(self.comic_works)})'
def __str__(self):
return f'{self.name}({self.id})'
class ComicWork(Base, BaseMixin):
__tablename__ = "comic_work"
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="comic_works")
artist_id = Column(String, ForeignKey("artist.id"), nullable=False)
artist = relationship("Artist", back_populates="comic_works")
work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False)
work_type = relationship("WorkType", back_populates="comic_works")
+392
View File
@@ -0,0 +1,392 @@
import json
import uuid
from datetime import datetime
from enum import Enum, auto
from logging import Logger
from pathlib import Path
from typing import Any
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from .tysc import Card, CardSet, Rooster, Team, FieldPosition, Player, Vendor, Sport
from .comic import Issue, TradePaperback, StoryArc, Volume, ComicWork, Artist, Comic, Publisher, WorkType
from .bookshelf import ArticleAuthor, BookAuthor, BookshelfPublisher, Article, Book, Author
from .admin import Mail, MailAccount, ModuleData, Role, User, Token, AuthorizationMatrix
from .metadata import MetaDataTable, MetaDataColumn
from .media import MediaVideo, MediaArticle, MediaFile, MediaActor, MediaActorFile
class ColumnEntry(Enum):
COLUMN_NAME = 'column'
COLUMN_LABEL = 'label'
COLUMN_ORDER = 'order'
COLUMN_REF_COLUMN = 'ref_column'
COLUMN_TYPE = 'type'
COLUMN_WIDGET = 'widget'
class StatusType(Enum):
UNKNOWN = auto()
FILE_NAME = auto()
FILE_ID = auto()
DUPLICATE = auto()
CLOUD_LINK = auto()
CLOUD_LINK_ID = auto()
class KontorDB:
def __init__(self, db_engine: Any, log: Logger):
self.engine = db_engine
self.registry = {}
self.init_registry()
self.log = log
def init_registry(self):
self.registry[Card.__tablename__] = Card
self.registry[CardSet.__tablename__] = CardSet
self.registry[Rooster.__tablename__] = Rooster
self.registry[Team.__tablename__] = Team
self.registry[FieldPosition.__tablename__] = FieldPosition
self.registry[Player.__tablename__] = Player
self.registry[Vendor.__tablename__] = Vendor
self.registry[Sport.__tablename__] = Sport
self.registry[Issue.__tablename__] = Issue
self.registry[TradePaperback.__tablename__] = TradePaperback
self.registry[StoryArc.__tablename__] = StoryArc
self.registry[Volume.__tablename__] = Volume
self.registry[ComicWork.__tablename__] = ComicWork
self.registry[Artist.__tablename__] = Artist
self.registry[Comic.__tablename__] = Comic
self.registry[Publisher.__tablename__] = Publisher
self.registry[WorkType.__tablename__] = WorkType
self.registry[ArticleAuthor.__tablename__] = ArticleAuthor
self.registry[BookAuthor.__tablename__] = BookAuthor
self.registry[BookshelfPublisher.__tablename__] = BookshelfPublisher
self.registry[Article.__tablename__] = Article
self.registry[Book.__tablename__] = Book
self.registry[Author.__tablename__] = Author
self.registry[MediaFile.__tablename__] = MediaFile
self.registry[MediaActor.__tablename__] = MediaActor
self.registry[MediaActorFile.__tablename__] = MediaActorFile
self.registry[MediaArticle.__tablename__] = MediaArticle
self.registry[MediaVideo.__tablename__] = MediaVideo
self.registry[MetaDataColumn.__tablename__] = MetaDataColumn
self.registry[MetaDataTable.__tablename__] = MetaDataTable
self.registry[AuthorizationMatrix.__tablename__] = AuthorizationMatrix
self.registry[Token.__tablename__] = Token
self.registry[User.__tablename__] = User
self.registry[Role.__tablename__] = Role
self.registry[ModuleData.__tablename__] = ModuleData
self.registry[MailAccount.__tablename__] = MailAccount
self.registry[Mail.__tablename__] = Mail
def get_table_names(self) -> list:
result = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
tables = session.scalars(select(MetaDataTable)).all()
result = [table.table_name for table in tables]
return result
def get_table_by_name(self, table_name: str) -> dict:
result = {}
__session__ = sessionmaker(self.engine)
_filter = {'table_name': table_name}
with __session__() as session:
table = session.query(MetaDataTable).filter_by(**_filter).one()
result['id'] = table.id
result['table_name'] = table.table_name
return result
def get_column_meta_data(self, table_name: str, view_only=True) -> dict:
meta_data = {}
order = 0
__session__ = sessionmaker(self.engine)
columns = list()
table_info = self.get_table_by_name(table_name)
_filters = {'table_id': table_info['id']}
if view_only:
_filters['is_shown'] = True
with __session__() as session:
columns = session.query(MetaDataColumn).filter_by(**_filters).all()
for column in columns:
# self.log.info("get_column_meta_data: %s %s %d", column.column_name, column.column_label, column.column_order)
meta_data[order] = {
ColumnEntry.COLUMN_NAME: column.column_name,
ColumnEntry.COLUMN_LABEL: column.column_label,
ColumnEntry.COLUMN_ORDER: column.column_order,
ColumnEntry.COLUMN_REF_COLUMN: column.ref_column,
ColumnEntry.COLUMN_TYPE: column.column_type
}
order += 1
return meta_data
def get_columns(self, table_name: str) -> dict:
columns = {}
order = 0
__session__ = sessionmaker(self.engine)
table_info = self.get_table_by_name(table_name)
_filters = {'table_id': table_info['id']}
with __session__() as session:
for column in session.query(MetaDataColumn).filter_by(**_filters).all():
columns[column.column_name] = {
ColumnEntry.COLUMN_ORDER: column.column_order,
ColumnEntry.COLUMN_TYPE: column.column_type
}
return columns
def get_filters(self, table_name: str) -> dict:
_filter_map = {}
__session__ = sessionmaker(self.engine)
table_info = self.get_table_by_name(table_name)
_filters = {'table_id': table_info['id'], 'show_filter': True}
with __session__() as session:
for column in session.query(MetaDataColumn).filter_by(**_filters).all():
_filter_map[column.column_name] = {
ColumnEntry.COLUMN_LABEL: column.filter_label,
ColumnEntry.COLUMN_WIDGET: None
}
return _filter_map
def data(self, table_name: str, columns: dict, filters: dict) -> list:
data = []
__session__ = sessionmaker(self.engine)
table = self.registry[table_name]
with __session__() as session:
entries = []
if len(filters) == 0:
entries = session.scalars(select(table)).all()
else:
entries = session.scalars(select(table).filter_by(**filters)).all()
for entry in entries:
# self.log.info("data: %s", entry)
row = []
for order in columns.keys():
column_name = columns[order][ColumnEntry.COLUMN_NAME]
ref_column = columns[order][ColumnEntry.COLUMN_REF_COLUMN]
if str(column_name).endswith("_id"):
ref_table = column_name[:-3]
ref = getattr(entry, ref_table)
value = getattr(ref, ref_column)
row.append(value)
else:
row.append(getattr(entry, column_name))
data.append(row)
# self.log.info("data: %s", data)
return data
def export_db(self, export_type: str, export_file_name: str) -> dict:
results = {}
db = {}
export_table_list = self.get_table_names()
for table in export_table_list:
columns = self.get_column_meta_data(table, view_only=False)
if table in self.registry:
model = self.registry[table]
else:
self.log.info(f"table {table} is not registered")
continue
__session__ = sessionmaker(self.engine)
with __session__() as session:
rows = session.query(model).all()
entries = []
for row in rows:
# print(row)
entry = {}
for order in columns:
# print(columns[order])
column_name = columns[order][ColumnEntry.COLUMN_NAME]
# print(f"get value {column_name} from {row} of table {table}")
try:
value = getattr(row, column_name)
if isinstance(value, datetime):
entry[column_name] = str(value)
else:
entry[column_name] = value
except AttributeError:
pass
entries.append(entry)
db[table] = entries
results[table] = len(entries)
match export_type:
case "JSON":
json_dump = json.dumps(db, indent=4)
with open(export_file_name, "w") as dump_file:
dump_file.write(json_dump)
case "YAML":
export_file = Path(export_file_name)
case "SQLite":
export_file = Path(export_file_name)
self.log.info(f"{len(results)} tables exported")
return results
def import_db(self, import_file_name: str) -> dict:
result = {}
import_file = Path(import_file_name)
if not import_file.exists():
self.log.info(f"File {import_file_name} does not exist. Do nothing.")
return result
match import_file.suffix:
case '.json':
print("read json file")
with open(import_file_name, 'r') as json_file:
json_load = json.load(json_file)
for table in json_load:
self.log.info(f"{table}: {len(json_load[table])}")
result[table] = self.import_table(table, json_load[table])
case '.yml':
print("read yaml file")
case '.yaml':
print("read yaml file")
case '.db':
print("read sqlite file")
return result
def import_table(self, table_name: str, items:list) -> dict:
result = {}
updated = []
added = []
remaining = []
existing_ids = self.get_ids(table_name)
self.log.info(f"found {len(existing_ids)} existing ids for table {table_name}")
for item in items:
current_id = item['id']
# print(f"import item: {item}")
found_item = None
__session__ = sessionmaker(self.engine)
with __session__() as session:
found_item = session.get(self.registry[table_name], current_id)
# print(f"found item: {found_item}")
if found_item is not None:
changed = self.update_entry(table_name, current_id, item)
updated.append(item)
if changed:
self.log.info(f"{current_id} has changed")
updated.append(item)
existing_ids.remove(current_id)
else:
try:
self.add_entry(table_name, item)
added.append(item)
except IntegrityError as error:
self.log.info(f"Could not add item, due to: {error.detail}")
if len(existing_ids) > 0:
print(f"remaining items for {table_name}: {existing_ids}")
remaining.extend(existing_ids)
result['updated'] = updated
result['added'] = added
result['remaining'] = remaining
return result
def get_ids(self, table_name: str) -> list:
existing_ids = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(self.registry[table_name]).all()
for item in items:
existing_ids.append(getattr(item, 'id'))
return existing_ids
def add_entry(self, table_name: str, update_item: dict):
self.log.debug(f"add entry to table {table_name} with {update_item}")
__session__ = sessionmaker(self.engine)
with __session__() as session:
add_item = self.registry[table_name]()
for key in update_item.keys():
update_value = update_item[key]
setattr(add_item, key, update_value)
session.add(add_item)
session.commit()
def update_entry(self, table_name, current_id, update_item: dict) -> bool:
# self.log.info("update entry to table %s", table_name)
__session__ = sessionmaker(self.engine)
with __session__() as session:
existing_item = session.query(self.registry[table_name]).get(current_id)
changed = False
for key in update_item.keys():
update_value = update_item[key]
existing_value = getattr(existing_item, key)
if type(existing_value) is not type(update_value):
existing_value = str(existing_value)
if existing_value != update_value:
self.log.info(f"{key} has changed: {existing_value} != {update_value}")
setattr(existing_item, key, update_value)
session.commit()
changed = True
self.log.info(f"update {key} with {update_value}")
return changed
def add_link(self, link: str) -> dict:
result = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
media_file = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.url = link
media_file.review = 1
media_file.should_download = 1
try:
session.add(media_file)
session.commit()
result['added'] = {'url': media_file.url, 'title': media_file.title, 'review': media_file.review, 'download': media_file.should_download}
except IntegrityError as error:
session.rollback()
result['error'] = error.orig
return result
def update_titles(self) -> dict:
update_list = {}
__session__ = sessionmaker(self.engine)
_filter = { 'review': True}
with __session__() as session:
links = session.query(MediaFile).filter_by(**_filter).all()
for link in links:
url = link.url
if url is None:
continue
link.update_title()
session.commit()
update_list[link.id] = link.title
return update_list
def get_download_list(self) -> list:
download_list = []
__session__ = sessionmaker(self.engine)
_filter = { 'should_download': True}
with __session__() as session:
links = session.query(MediaFile).filter_by(**_filter).all()
for link in links:
url = link.url
if url is None:
continue
download_list.append(link.id)
return download_list
def download_file(self, entry_id: str, download_dir = "/data/media", dl_tool = "yt-dlp") -> str:
__session__ = sessionmaker(self.engine)
with __session__() as session:
link = session.query(MediaFile).get(entry_id)
link.download_file(download_dir, dl_tool)
session.commit()
file_name = link.file_name
return file_name
def delete_entries(self):
for (table_name, table) in self.registry.items():
# self.log.info("delete entries from table %s", table_name)
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(table).all()
for item in items:
session.delete(item)
session.commit()
def check_files(self):
pass
+100
View File
@@ -0,0 +1,100 @@
import re
import subprocess
from datetime import datetime
from pathlib import Path
import requests
from bs4 import BeautifulSoup
from sqlalchemy import Column, DateTime, Integer, String, ForeignKey
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin, BaseVideoMixin
class MediaFile(Base, BaseMixin, BaseVideoMixin):
__tablename__ = 'media_file'
media_actor_files = relationship("MediaActorFile")
def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.title})'
def __str__(self):
return f'{self.title}({self.id})'
def update_title(self) -> None:
print(f"update title for {self.url}")
try:
r = requests.get(self.url)
soup = BeautifulSoup(r.content, "html.parser")
title = soup.title.string
self.title = title
self.review = 0
except:
self.title = None
self.review = 1
self.last_modified_date = datetime.now()
def download_file(self, download_dir: str, dl_tool: str):
print(f"download file for {self.url} to {download_dir}")
result = subprocess.run([dl_tool, self.url], cwd=download_dir, capture_output=True, text=True)
if result.returncode == 0:
output = result.stdout
output = re.sub(' +', ' ', output)
lines_list = output.splitlines()
file_name = self.__parse_output__(lines_list)
if file_name is None:
self.review = 1
self.should_download = 1
self.file_name = None
else:
download_file = Path(file_name)
self.should_download = 0
self.file_name = download_file.name
self.cloud_link = str(download_file.absolute())
self.last_modified_date = datetime.now()
def __parse_output__(self, lines_list):
self.file_name = None
for line in lines_list:
if 'has already been downloaded' in line:
end_len = len(' has already been downloaded')
self.file_name = line[11:-end_len]
if 'Destination' in line:
line_len = len(line)
start_len = len('[download] Destination: ')
file_len = line_len - start_len
self.file_name = line[-file_len:]
return self.file_name
class MediaActor(Base, BaseMixin):
__tablename__ = 'media_actor'
name = Column(String(255))
media_actor_files = relationship("MediaActorFile")
class MediaActorFile(Base, BaseMixin):
__tablename__ = 'media_actor_file'
media_actor_id = Column(String(255), ForeignKey("media_actor.id"), nullable=False)
media_actor = relationship("MediaActor", back_populates="media_actor_files")
media_file_id = Column(String(255), ForeignKey("media_file.id"), nullable=True)
media_file = relationship("MediaFile", back_populates="media_actor_files")
class MediaArticle(Base, BaseMixin):
__tablename__ = 'media_article'
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
class MediaVideo(Base, BaseMixin):
__tablename__ = 'media_video'
cloud_link = Column(String(255))
file_name = Column(String(255))
path = Column(String(255))
review = Column(BIT(1))
title = Column(String(255))
url = Column(String(255), unique=True)
should_download = Column(BIT(1))
+42
View File
@@ -0,0 +1,42 @@
from sqlalchemy import Column, String, ForeignKey, DateTime, Integer, Boolean
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class MetaDataTable(Base, BaseMixin):
__tablename__ = 'meta_data_table'
table_name = Column(String(255), unique=True)
table_columns = relationship("MetaDataColumn")
def __repr__(self):
return f'MetaDataTable({self.id} {self.table_name})'
def __str__(self):
return f'{self.table_name}({self.id})'
class MetaDataColumn(Base, BaseMixin):
__tablename__ = 'meta_data_column'
column_name = Column(String(255), nullable=False)
column_sync_name = Column(String(255))
column_type = Column(String(255))
column_modifier = Column(String(255), nullable=True)
column_order = Column(Integer)
table_id = Column(String, ForeignKey('meta_data_table.id'))
table = relationship("MetaDataTable", back_populates="table_columns")
column_label = Column(String(255))
filter_label = Column(String(255))
is_shown = Column(BIT(1))
show_filter = Column(BIT(1))
ref_column = Column(String, nullable=True)
def __repr__(self):
if self.column_name is None:
return f'MetaDataColumn({self.id} {self.table.table_name}.__)'
else:
return f'MetaDataColumn({self.id} {self.table.table_name}.{self.column_name})'
def __str__(self):
return f'{self.column_name}({self.id})'
+100
View File
@@ -0,0 +1,100 @@
from sqlalchemy import Column, DateTime, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from .base import Base, BaseMixin
class Sport(Base, BaseMixin):
__tablename__ = "sport"
__table_args__ = (
UniqueConstraint("name"),
)
name = Column(String(255), nullable=False, index=True, unique=True)
teams = relationship("Team")
positions = relationship("FieldPosition")
class Team(Base, BaseMixin):
__tablename__ = "team"
name = Column(String(255), nullable=False, index=True, unique=True)
short_name = Column(String(255), nullable=False, )
sport_id = Column(String, ForeignKey("sport.id"), nullable=False)
sport = relationship("Sport", back_populates="teams")
roosters = relationship("Rooster")
class FieldPosition(Base, BaseMixin):
__tablename__ = "field_position"
__table_args__ = (
UniqueConstraint("name", "sport_id"),
UniqueConstraint("short_name", "sport_id"),
)
name = Column(String(255), nullable=False, index=True)
short_name = Column(String(255), nullable=False)
sport_id = Column(String, ForeignKey("sport.id"), nullable=False, index=True)
sport = relationship("Sport", back_populates="positions")
roosters = relationship("Rooster")
class Player(Base, BaseMixin):
__tablename__ = "player"
__table_args__ = (
UniqueConstraint("first_name", "last_name"),
)
first_name = Column(String(255), nullable=False, index=True)
last_name = Column(String(255), nullable=False, index=True)
roosters = relationship("Rooster")
def get_full_name(self) -> str:
return f"{self.last_name}, {self.first_name}"
class Rooster(Base, BaseMixin):
__tablename__ = "rooster"
__table_args__ = (
UniqueConstraint("year", "team_id", "player_id", "position_id"),
)
year = Column(Integer)
team_id = Column(String, ForeignKey("team.id"), nullable=False, index=True)
team = relationship("Team", back_populates="roosters")
player_id = Column(String, ForeignKey("player.id"), nullable=False, index=True)
player = relationship("Player", back_populates="roosters")
position_id = Column(String, ForeignKey("field_position.id"), nullable=False, index=True)
position = relationship("FieldPosition", back_populates="roosters")
cards = relationship("Card")
class Vendor(Base, BaseMixin):
__tablename__ = "vendor"
name = Column(String(255), nullable=False, unique=True, index=True)
card_sets = relationship("CardSet")
cards = relationship("Card")
class CardSet(Base, BaseMixin):
__tablename__ = "card_set"
__table_args__ = (
UniqueConstraint("name", "vendor_id"),
)
name = Column(String(255), index=True)
parallel_set = Column(BIT(1))
insert_set = Column(BIT(1))
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False, index=True)
vendor = relationship("Vendor", back_populates="card_sets")
cards = relationship("Card")
class Card(Base, BaseMixin):
__tablename__ = "card"
__table_args__ = (
UniqueConstraint("card_number", "year", "vendor_id", "card_set_id"),
)
card_number = Column(Integer, index=True)
year = Column(Integer, index=True)
card_set_id = Column(String, ForeignKey("card_set.id"), nullable=False)
card_set = relationship("CardSet", back_populates="cards")
rooster_id = Column(String, ForeignKey("rooster.id"), nullable=False)
rooster = relationship("Rooster", back_populates="cards")
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False)
vendor = relationship("Vendor", back_populates="cards")
View File
+31
View File
@@ -0,0 +1,31 @@
.PHONY: clean virtualenv test docker dist dist-upload
clean:
find . -name '*.py[co]' -delete
virtualenv:
virtualenv --prompt '|> kontor <| ' env
env/bin/pip install -r requirements-dev.txt
env/bin/python setup.py develop
@echo
@echo "VirtualENV Setup Complete. Now run: source env/bin/activate"
@echo
test:
python -m pytest \
-v \
--cov=kontor \
--cov-report=term \
--cov-report=html:coverage-report \
tests/
docker: clean
docker build -t kontor-api:latest .
dist: clean
rm -rf dist/*
python setup.py sdist
python setup.py bdist_wheel
dist-upload:
twine upload dist/*
+59
View File
@@ -0,0 +1,59 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from pathlib import Path
import logging.config
import yaml
from platformdirs import PlatformDirs
from sqlmodel import SQLModel, create_engine, Session, select
from model import Comic, Publisher
from model.media import MediaFile
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--recreate-db', action='store_true')
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--file', '-f', default='~/data.json')
parser.add_argument('--config', '-c', default='kontor')
args = parser.parse_args()
def get_logger(level: int, config: str):
dirs = PlatformDirs(config)
logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
with open(logging_config, 'rt') as f:
configDict = yaml.safe_load(f.read())
logging.config.dictConfig(configDict)
logger = logging.getLogger('development')
if level is not None:
match level:
case 0:
logger.setLevel(logging.INFO)
case 1:
logger.setLevel(logging.DEBUG)
case _:
logger.setLevel(logging.CRITICAL)
return logger
if __name__ == '__main__':
log = get_logger(args.verbose, args.config)
log.info('kontor started')
dirs = PlatformDirs(args.config)
database_config = Path(dirs.user_config_dir, 'database-config.yaml')
with open(database_config, 'rt') as f:
db_config = yaml.safe_load(f.read())
print(db_config)
connect_string = ('mariadb+mariadbconnector://{}:{}@{}:{}/{}'.format(
db_config['mariadb']['user'],
db_config['mariadb']['password'],
db_config['mariadb']['host'],
db_config['mariadb']['port'],
db_config['mariadb']['database']
))
engine = create_engine(connect_string, echo=True)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
files = session.exec(select(MediaFile)).all()
for file in files:
print("{} {}".format(file, file.actors))
log.info('kontor finished')
+1
View File
@@ -0,0 +1 @@
from .comic import Comic, Publisher, ComicWork, Artist, Worktype
+11
View File
@@ -0,0 +1,11 @@
import uuid
from datetime import datetime
from sqlmodel import SQLModel, Field
class AbstractEntity(SQLModel, table=False):
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
created_date: datetime = Field(default_factory=datetime.now, nullable=False)
last_modified_date: datetime = Field(default_factory=datetime.now, nullable=False)
version: int = Field(default=0)
+58
View File
@@ -0,0 +1,58 @@
import uuid
from datetime import datetime
from sqlmodel import Field, Relationship, SQLModel
from model.base import AbstractEntity
class Publisher(AbstractEntity, table=True):
name: str = Field(index=True, unique=True)
comics: list["Comic"] = Relationship(back_populates="publisher")
def __repr__(self):
return f'Publisher({self.id} {self.name})'
def __str__(self):
return self.__repr__()
class Comic(AbstractEntity, table=True):
title: str = Field(index=True, unique=True)
publisher_id: uuid.UUID | None = Field(default=None, foreign_key="publisher.id")
publisher: Publisher | None = Relationship(back_populates="comics")
current_order: int = Field(default=False)
completed: int = Field(nullable=False)
#issues: list["Issue"] = Relationship(back_populates="comic")
#story_arcs: list["StoryArc"] = Relationship(back_populates="comic")
#trade_paperbacks: list["TradePaperback"] = Relationship(back_populates="comic")
#volumes: list["Volume"] = Relationship(back_populates="comic")
#comic_works: list["ComicWork"] = Relationship(back_populates="comic")
class Artist(AbstractEntity, table=True):
name: str = Field(nullable=False)
comic_works: list["ComicWork"] = Relationship(back_populates="artist")
class Worktype(AbstractEntity, table=True):
name: str = Field(nullable=False, unique=True)
#comic_works: list["ComicWork"] = Relationship(back_populates="worktype")
def __repr__(self):
return f'Worktype({self.id} {self.version} {self.name} {len(self.comic_works)})'
def __str__(self):
return f'{self.name}({self.id})'
class ComicWork(AbstractEntity, table=True):
__tablename__ = "comic_work"
comic_id: uuid.UUID | None = Field(nullable=False, foreign_key="comic.id")
#comic: Comic = Relationship(back_populates="comic_works")
artist_id: uuid.UUID | None = Field(nullable=False, foreign_key="artist.id")
artist: Artist = Relationship(back_populates="comic_works")
work_type_id: uuid.UUID | None = Field(nullable=False, foreign_key="worktype.id")
#worktype = Relationship(back_populates="comic_works")

Some files were not shown because too many files have changed in this diff Show More