evaluate sqlmodel

This commit is contained in:
2025-04-13 16:16:10 +02:00
parent a43e2c806c
commit b14a267b5b
107 changed files with 2517 additions and 6 deletions
+116
View File
@@ -0,0 +1,116 @@
"""
Setup database connections
"""
import sqlite3
import mariadb
import logging.config
from platformdirs import PlatformDirs
from pathlib import Path
import yaml
def get_database_cursors(log):
dirs = PlatformDirs("kontor")
database_config = Path(dirs.user_config_dir, 'database-config.yaml')
with open(database_config, 'rt') as f:
db_config = yaml.safe_load(f.read())
sqlite_db = db_config["sqlite"]["file"]
log.info('using SQLite3 database {}'.format(sqlite_db))
sqlite_conn = sqlite3.connect(sqlite_db, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
mariadb_conn = mariadb.connect(
host=db_config['mariadb']['host'],
port=db_config['mariadb']['port'],
user=db_config['mariadb']['user'],
password=db_config['mariadb']['password'],
database=db_config['mariadb']['database']
)
return sqlite_conn, mariadb_conn
def create_tables(sqlite_conn, logger, recreate_db, scripts):
logger.info('create_tables')
for table_id in scripts:
create_statement = scripts[table_id]['create']
drop_statement = scripts[table_id]['drop']
logger.debug(create_statement)
cursor = sqlite_conn.cursor()
if recreate_db:
logger.debug(drop_statement)
cursor.execute(drop_statement)
cursor.execute(create_statement)
def get_logger(level):
dirs = PlatformDirs("kontor")
logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
with open(logging_config, 'rt') as f:
config = yaml.safe_load(f.read())
logging.config.dictConfig(config)
logger = logging.getLogger('development')
if level is not None:
match level:
case 0:
logger.setLevel(logging.INFO)
case 1:
logger.setLevel(logging.DEBUG)
case _:
logger.setLevel(logging.CRITICAL)
return logger
def get_meta_data(mariadb_conn):
mariadb_cursor = mariadb_conn.cursor()
select_statement = "SELECT id, table_name FROM meta_data_table"
mariadb_cursor.execute(select_statement)
rows = mariadb_cursor.fetchall()
meta_data = {}
for (identifier, table_name) in rows:
table_data = {"name": table_name}
mariadb_cursor.execute("SELECT column_name, column_sync_name, column_type, column_modifier, column_order FROM meta_data_column WHERE table_id=?", (identifier, ))
column_rows = mariadb_cursor.fetchall()
column_list = []
for (column_name, column_sync_name, column_type, column_modifier, column_order) in column_rows:
column_data = {"column_name": column_name, "column_sync_name": column_sync_name, "column_type": column_type,
"column_modifier": column_modifier, "column_order": column_order}
column_list.append(column_data)
# logger.info(column_list)
table_data["columns"] = column_list
meta_data[identifier] = table_data
return meta_data
def get_scripts(meta_data, logger):
scripts_map = {}
for table_id in meta_data:
table_scripts = {}
m_columns = []
s_columns = []
columns = []
for column_data in meta_data[table_id]["columns"]:
column_line = "{} {}".format(column_data["column_sync_name"], column_data["column_type"])
if column_data["column_modifier"]:
column_line += " " + column_data["column_modifier"]
columns.append(column_line)
m_columns.append(column_data['column_name'])
s_columns.append(column_data['column_sync_name'])
table_name = meta_data[table_id]["name"]
create_statement = "CREATE TABLE IF NOT EXISTS {} ({});".format(table_name, ", ".join(columns))
drop_statement = 'DROP TABLE IF EXISTS {}'.format(table_name)
select_mariadb_statement = 'SELECT {} FROM {}'.format(', '.join(m_columns), table_name)
select_sqlite_statement = 'SELECT {} FROM {}'.format(', '.join(s_columns), table_name)
insert_sqlite_statement = 'INSERT INTO {}({}) VALUES({})'.format(table_name, ', '.join(s_columns), ', '.join(['?']*len(s_columns)))
insert_mariadb_statement = 'INSERT INTO {}({}) VALUES({})'.format(table_name, ', '.join(m_columns), ', '.join(['?']*len(m_columns)))
truncate_mariadb_statement = 'TRUNCATE {}'.format(table_name)
#logger.debug(create_statement)
#logger.debug(select_mariadb_statement)
table_scripts["create"] = create_statement
table_scripts["drop"] = drop_statement
table_scripts["select_mariadb"] = select_mariadb_statement
table_scripts["select_sqlite"] = select_sqlite_statement
table_scripts["insert_sqlite"] = insert_sqlite_statement
table_scripts["insert_mariadb"] = insert_mariadb_statement
table_scripts["truncate_mariadb"] = truncate_mariadb_statement
table_scripts["count"] = "SELECT COUNT(*) FROM {}".format(table_name)
table_scripts["name"] = table_name
scripts_map[table_id] = table_scripts
return scripts_map
@@ -10,7 +10,7 @@ from platformdirs import PlatformDirs
from pathlib import Path
from schema import Base, KontorDB
from setup import get_database_cursors, create_tables, get_logger, get_scripts, get_meta_data
from config import get_database_cursors, create_tables, get_logger, get_scripts, get_meta_data
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--recreate-db', action='store_true')
+64
View File
@@ -0,0 +1,64 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from pathlib import Path
import logging.config
import yaml
from platformdirs import PlatformDirs
from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker
from schema import Comic, Publisher, Base
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--recreate-db', action='store_true')
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--file', '-f', default='~/data.json')
parser.add_argument('--config', '-c', default='kontor-docker')
args = parser.parse_args()
def get_logger(level: int, config: str):
dirs = PlatformDirs(config)
logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
with open(logging_config, 'rt') as f:
configDict = yaml.safe_load(f.read())
logging.config.dictConfig(configDict)
logger = logging.getLogger('development')
if level is not None:
match level:
case 0:
logger.setLevel(logging.INFO)
case 1:
logger.setLevel(logging.DEBUG)
case _:
logger.setLevel(logging.CRITICAL)
return logger
if __name__ == '__main__':
log = get_logger(args.verbose, args.config)
log.info('kontor started')
dirs = PlatformDirs(args.config)
database_config = Path(dirs.user_config_dir, 'database-config.yaml')
with open(database_config, 'rt') as f:
db_config = yaml.safe_load(f.read())
print(db_config)
connect_string = ('mariadb+mariadbconnector://{}:{}@{}:{}/{}'.format(
db_config['mariadb']['user'],
db_config['mariadb']['password'],
db_config['mariadb']['host'],
db_config['mariadb']['port'],
db_config['mariadb']['database']
))
engine = create_engine(connect_string)
Base.metadata.create_all(bind=engine, checkfirst=True)
__session__ = sessionmaker(engine)
with __session__() as session:
comics = session.scalars(select(Comic)).all()
for comic in comics:
print(comic)
publishers = session.scalars(select(Publisher)).all()
for publisher in publishers:
print(publisher)
print(len(publisher.comics))
log.info('kontor finished')
+33
View File
@@ -0,0 +1,33 @@
[build-system]
requires = ["setuptools>=67", "wheel", "setuptools-git-versioning>=2.0,<3"]
build-backend = "setuptools.build_meta"
[project]
name = "kontor-cli"
version = "0.1.0"
dependencies = [
"mariadb",
"sqlmodel",
"pathlib",
"platformdirs",
"pyyaml",
"beautifulsoup4",
]
requires-python = ">=3.10"
authors = [
{name = "Thomas Peetz", email = "thomas.peetz@thpeetz.de"}
]
maintainers = [
{name = "Thomas Peetz", email = "thomas.peetz@thpeetz.de"}
]
description = "CLI for Kontor application"
readme = "README.md"
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python"
]
[project-scripts]
kontor = "kontor::main"
[tool.setuptools.packages.find]
where = ["."]
+10
View File
@@ -0,0 +1,10 @@
mariadb
sqlalchemy
pathlib
platformdirs
pyyaml
beautifulsoup4
sqlmodel
requests
fastapi[standard]
@@ -4,8 +4,9 @@ from datetime import datetime
from enum import Enum, auto
from logging import Logger
from pathlib import Path
from typing import Any
from sqlalchemy import Engine, select
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
@@ -37,7 +38,7 @@ class StatusType(Enum):
class KontorDB:
def __init__(self, db_engine: Engine, log: Logger):
def __init__(self, db_engine: Any, log: Logger):
self.engine = db_engine
self.registry = {}
self.init_registry()
View File
+31
View File
@@ -0,0 +1,31 @@
.PHONY: clean virtualenv test docker dist dist-upload
clean:
find . -name '*.py[co]' -delete
virtualenv:
virtualenv --prompt '|> kontor <| ' env
env/bin/pip install -r requirements-dev.txt
env/bin/python setup.py develop
@echo
@echo "VirtualENV Setup Complete. Now run: source env/bin/activate"
@echo
test:
python -m pytest \
-v \
--cov=kontor \
--cov-report=term \
--cov-report=html:coverage-report \
tests/
docker: clean
docker build -t kontor-api:latest .
dist: clean
rm -rf dist/*
python setup.py sdist
python setup.py bdist_wheel
dist-upload:
twine upload dist/*
+59
View File
@@ -0,0 +1,59 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from pathlib import Path
import logging.config
import yaml
from platformdirs import PlatformDirs
from sqlmodel import SQLModel, create_engine, Session, select
from model import Comic, Publisher
from model.media import MediaFile
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--recreate-db', action='store_true')
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--file', '-f', default='~/data.json')
parser.add_argument('--config', '-c', default='kontor')
args = parser.parse_args()
def get_logger(level: int, config: str):
dirs = PlatformDirs(config)
logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
with open(logging_config, 'rt') as f:
configDict = yaml.safe_load(f.read())
logging.config.dictConfig(configDict)
logger = logging.getLogger('development')
if level is not None:
match level:
case 0:
logger.setLevel(logging.INFO)
case 1:
logger.setLevel(logging.DEBUG)
case _:
logger.setLevel(logging.CRITICAL)
return logger
if __name__ == '__main__':
log = get_logger(args.verbose, args.config)
log.info('kontor started')
dirs = PlatformDirs(args.config)
database_config = Path(dirs.user_config_dir, 'database-config.yaml')
with open(database_config, 'rt') as f:
db_config = yaml.safe_load(f.read())
print(db_config)
connect_string = ('mariadb+mariadbconnector://{}:{}@{}:{}/{}'.format(
db_config['mariadb']['user'],
db_config['mariadb']['password'],
db_config['mariadb']['host'],
db_config['mariadb']['port'],
db_config['mariadb']['database']
))
engine = create_engine(connect_string, echo=True)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
files = session.exec(select(MediaFile)).all()
for file in files:
print("{} {}".format(file, file.actors))
log.info('kontor finished')
+1
View File
@@ -0,0 +1 @@
from .comic import Comic, Publisher, ComicWork, Artist, Worktype
+11
View File
@@ -0,0 +1,11 @@
import uuid
from datetime import datetime
from sqlmodel import SQLModel, Field
class AbstractEntity(SQLModel, table=False):
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
created_date: datetime = Field(default_factory=datetime.now, nullable=False)
last_modified_date: datetime = Field(default_factory=datetime.now, nullable=False)
version: int = Field(default=0)
+58
View File
@@ -0,0 +1,58 @@
import uuid
from datetime import datetime
from sqlmodel import Field, Relationship, SQLModel
from model.base import AbstractEntity
class Publisher(AbstractEntity, table=True):
name: str = Field(index=True, unique=True)
comics: list["Comic"] = Relationship(back_populates="publisher")
def __repr__(self):
return f'Publisher({self.id} {self.name})'
def __str__(self):
return self.__repr__()
class Comic(AbstractEntity, table=True):
title: str = Field(index=True, unique=True)
publisher_id: uuid.UUID | None = Field(default=None, foreign_key="publisher.id")
publisher: Publisher | None = Relationship(back_populates="comics")
current_order: int = Field(default=False)
completed: int = Field(nullable=False)
#issues: list["Issue"] = Relationship(back_populates="comic")
#story_arcs: list["StoryArc"] = Relationship(back_populates="comic")
#trade_paperbacks: list["TradePaperback"] = Relationship(back_populates="comic")
#volumes: list["Volume"] = Relationship(back_populates="comic")
#comic_works: list["ComicWork"] = Relationship(back_populates="comic")
class Artist(AbstractEntity, table=True):
name: str = Field(nullable=False)
comic_works: list["ComicWork"] = Relationship(back_populates="artist")
class Worktype(AbstractEntity, table=True):
name: str = Field(nullable=False, unique=True)
#comic_works: list["ComicWork"] = Relationship(back_populates="worktype")
def __repr__(self):
return f'Worktype({self.id} {self.version} {self.name} {len(self.comic_works)})'
def __str__(self):
return f'{self.name}({self.id})'
class ComicWork(AbstractEntity, table=True):
__tablename__ = "comic_work"
comic_id: uuid.UUID | None = Field(nullable=False, foreign_key="comic.id")
#comic: Comic = Relationship(back_populates="comic_works")
artist_id: uuid.UUID | None = Field(nullable=False, foreign_key="artist.id")
artist: Artist = Relationship(back_populates="comic_works")
work_type_id: uuid.UUID | None = Field(nullable=False, foreign_key="worktype.id")
#worktype = Relationship(back_populates="comic_works")
+28
View File
@@ -0,0 +1,28 @@
from sqlmodel import Field, Relationship, table
from uuid import UUID
from .base import AbstractEntity
class MediaActorFile(AbstractEntity, table=True):
__tablename__ = "media_actor_file"
media_actor_id: UUID = Field(nullable=False, foreign_key="media_actor.id")
media_file_id: UUID = Field(nullable=False, foreign_key="media_file.id")
class MediaFile(AbstractEntity, table=True):
__tablename__ = "media_file"
cloud_link: str = Field(nullable=True, max_length=255)
file_name: str = Field(nullable=True, max_length=255)
path : str = Field(nullable=True, max_length=255)
title: str = Field(nullable=True, max_length=255)
url: str = Field(nullable=True, max_length=255)
actors : list["MediaActor"] = Relationship(back_populates="videos", link_model=MediaActorFile)
class MediaActor(AbstractEntity, table=True):
__tablename__ = "media_actor"
name: str = Field(nullable=True, max_length=255)
videos : list["MediaFile"] = Relationship(back_populates="actors", link_model=MediaActorFile)
+29
View File
@@ -0,0 +1,29 @@
[build-system]
requires = ["setuptools"]
[project]
name = "kontor-cli"
version = "0.1.0"
dependencies = [
"mariadb",
"sqlmodel",
"pathlib",
"platformdirs",
"pyyaml",
"beautifulsoup4",
]
requires-python = ">=3.10"
authors = [
{name = "Thomas Peetz", email = "thomas.peetz@thpeetz.de"}
]
maintainers = [
{name = "Thomas Peetz", email = "thomas.peetz@thpeetz.de"}
]
description = "CLI for Kontor application"
readme = "README.md"
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python"
]
[project-scripts]
kontor = "kontor::main"
+8
View File
@@ -0,0 +1,8 @@
-r requirements.txt
pytest
pytest-cov
coverage
twine>=1.11.0
setuptools>=38.6.0
wheel>=0.31.0
@@ -4,3 +4,4 @@ pathlib
platformdirs
pyyaml
beautifulsoup4
sqlmodel
View File
+2 -1
View File
@@ -10,10 +10,11 @@ import requests
from bs4 import BeautifulSoup
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import mariadb
from setup import get_database_cursors, create_tables, get_logger
from setup import get_database_cursors, get_logger
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--config', '-c', default='kontor-docker')
args = parser.parse_args()