add sqlalchemy as orm tool

This commit is contained in:
Thomas Peetz
2025-01-10 17:39:54 +01:00
parent a1e8474149
commit 2ae11e24ef
10 changed files with 281 additions and 172 deletions
+134
View File
@@ -0,0 +1,134 @@
import mariadb
from sqlalchemy import create_engine, text, MetaData, join
from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker
from database.base import Base
from database.comic import Comic
from database.metadata import MetaDataTable, MetaDataColumn
class KontorDB:
def __init__(self, db_config):
self.db_conn = mariadb.connect(
host=db_config['mariadb']['host'],
port=db_config['mariadb']['port'],
user=db_config['mariadb']['user'],
password=db_config['mariadb']['password'],
database=db_config['mariadb']['database']
)
connect_string = ('mariadb+mariadbconnector://{}:{}@{}:{}/{}'
.format(
db_config['mariadb']['user'],
db_config['mariadb']['password'],
db_config['mariadb']['host'],
db_config['mariadb']['port'],
db_config['mariadb']['database']))
# engine = create_engine(connect_string, echo=True)
engine = create_engine(connect_string)
Base.metadata.create_all(bind=engine)
__session__ = sessionmaker(bind=engine)
self.session = __session__()
def get_table_id(self, table_name):
cursor = self.db_conn.cursor()
cursor.execute("SELECT id, created_date, last_modified_date FROM meta_data_table WHERE table_name=?",
(table_name,))
row = cursor.fetchone()
cursor.close()
# table_ids = self.session.query(MetaDataTable).filter(MetaDataTable.table_name == table_name).all()
# print(type(table_ids))
return row[0]
def get_table_names(self) -> list:
tables_names = []
cursor = self.db_conn.cursor()
cursor.execute("SELECT id, table_name from meta_data_table")
rows = cursor.fetchall().all
for row in rows:
print(row)
for (_, table_name) in rows:
tables_names.append(table_name)
cursor.close()
tables = self.session.query(MetaDataTable).all()
for table in tables:
print(table)
return tables_names
def get_column_meta_data(self, table_id: str, table_name: str):
cursor = self.db_conn.cursor()
meta_data = {}
cursor.execute(
"SELECT column_name, column_order, column_label FROM meta_data_column WHERE table_id=? AND is_shown is true ORDER bY column_order",
(table_id,))
rows = cursor.fetchall()
order = 0
for (column_name, column_order, column_label) in rows:
meta_data[order] = {'column': column_name, 'label': column_label, 'order': column_order}
order += 1
cursor.close()
columns = (self.session.query(MetaDataColumn).
join(MetaDataTable, MetaDataTable.id == MetaDataColumn.table_id).
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.is_shown is True).
order_by(MetaDataColumn.column_order).all())
print(columns)
for column in columns:
print(column)
result = repr(column)
if result is not None:
print(result)
# print(f"retrieved {len(rows)} columns, set {len(meta_data)} headers")
return meta_data
def get_filters(self, table_id):
cursor = self.db_conn.cursor()
filters = {}
cursor.execute(
"SELECT column_name, filter_label from meta_data_column WHERE table_id=? AND show_filter is true",
(table_id,))
rows = cursor.fetchall()
for row in rows:
filters[row[0]] = {'label': row[1], 'widget': None}
cursor.close()
# print(f"retrieved {len(rows)} filters: {filters}")
return filters
def get_data(self, table_name: str, columns: dict, where_clause: str) -> list:
data = []
cursor = self.db_conn.cursor()
cursor.execute(self.get_statement(table_name, columns, where_clause))
rows = cursor.fetchall()
print(len(rows))
for row in rows:
# print(f"KontorDB.get_data: {row}")
data.append(list(row))
cursor.close()
print(f"KontorDB.getData: return {len(data)}")
if table_name == 'comic' and len(where_clause) == 0:
data.clear()
comics = self.session.query(Comic).all()
for item in comics:
# print(item)
row = []
for order in columns.keys():
column_name = columns[order]['column']
if column_name == 'publisher_id':
row.append(item.publisher.name)
else:
row.append(getattr(item, column_name))
# print(repr(row))
data.append(row)
return data
def get_statement(self, table: str, header: dict, where_clause):
columns = ""
for index, column in header.items():
if index > 0:
columns += ", "
columns += column['column']
if len(columns) == 0:
columns = "*"
statement = f"SELECT {columns} FROM {table} {where_clause}"
print(f"{statement=}")
return statement
+7
View File
@@ -0,0 +1,7 @@
from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker
class Base(DeclarativeBase):
pass
+40
View File
@@ -0,0 +1,40 @@
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.mysql import BIT
from sqlalchemy.orm import relationship
from database.base import Base
class Publisher(Base):
__tablename__ = "publisher"
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
name = Column(String(length=255))
comics = relationship("Comic")
def __repr__(self):
return f'Publisher({self.id} {self.name})'
def __str__(self):
return self.__repr__()
class Comic(Base):
__tablename__ = 'comic'
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
title = Column(String(length=255))
publisher_id = Column(String, ForeignKey('publisher.id'))
publisher = relationship("Publisher", back_populates="comics")
current_order = Column(BIT(1))
completed = Column(BIT(1))
def __repr__(self):
return f'Comic({self.id} {self.version} {self.title} {self.publisher.name})'
def __str__(self):
return f'{self.title}({self.id})'
+25
View File
@@ -0,0 +1,25 @@
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from database.base import Base
class MediaFile(Base):
__tablename__ = 'media_file'
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
cloud_link = Column(String(255))
file_name = Column(String(255))
path = Column(String(255))
review = Column(Boolean, default=True)
title = Column(String(255))
url = Column(String(255))
should_download = Column(Boolean, default=True)
def __repr__(self):
return f'MediaFile({self.id} {self.title} {self.title})'
def __str__(self):
return f'{self.title}({self.id})'
+49
View File
@@ -0,0 +1,49 @@
from sqlalchemy import Column, String, ForeignKey, DateTime, Integer, Boolean
from sqlalchemy.orm import relationship
from database import Base
class MetaDataTable(Base):
__tablename__ = 'meta_data_table'
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
table_name = Column(String(255), unique=True)
table_columns = relationship("MetaDataColumn")
def __repr__(self):
print(f'MetaDataTable({self.id} {self.table_name})')
def __str__(self):
print(f'{self.table_name}({self.id})')
class MetaDataColumn(Base):
__tablename__ = 'meta_data_column'
id = Column(String, primary_key=True)
created_date = Column(DateTime)
last_modified_date = Column(DateTime)
version = Column(Integer)
column_modifier = Column(String(255), nullable=True)
column_name = Column(String(255))
column_order = Column(Integer)
column_sync_name = Column(String(255))
column_type = Column(String(255))
table_id = Column(String, ForeignKey('meta_data_table.id'))
table = relationship("MetaDataTable", back_populates="table_columns")
column_label = Column(String(255))
filter_label = Column(String(255))
is_shown = Column(Boolean)
show_filter = Column(Boolean)
def __repr__(self):
if self.column_name is None:
print(f'MetaDataColumn({self.id} {self.table.table_name}.__)')
else:
print(f'MetaDataColumn({self.id} {self.table.table_name}.{self.column_name})')
def __str__(self):
print(f'{self.column_name}({self.id})')