add scripts from repository python-scripts
This commit is contained in:
@@ -0,0 +1,116 @@
|
||||
"""
|
||||
Setup database connections
|
||||
"""
|
||||
import sqlite3
|
||||
import mariadb
|
||||
import logging.config
|
||||
from platformdirs import PlatformDirs
|
||||
from pathlib import Path
|
||||
import yaml
|
||||
|
||||
|
||||
def get_database_cursors(log):
|
||||
dirs = PlatformDirs("kontor")
|
||||
database_config = Path(dirs.user_config_dir, 'database-config.yaml')
|
||||
with open(database_config, 'rt') as f:
|
||||
db_config = yaml.safe_load(f.read())
|
||||
sqlite_db = db_config["sqlite"]["file"]
|
||||
log.info('using SQLite3 database {}'.format(sqlite_db))
|
||||
sqlite_conn = sqlite3.connect(sqlite_db, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
|
||||
mariadb_conn = mariadb.connect(
|
||||
host=db_config['mariadb']['host'],
|
||||
port=db_config['mariadb']['port'],
|
||||
user=db_config['mariadb']['user'],
|
||||
password=db_config['mariadb']['password'],
|
||||
database=db_config['mariadb']['database']
|
||||
)
|
||||
return sqlite_conn, mariadb_conn
|
||||
|
||||
|
||||
def create_tables(sqlite_conn, logger, recreate_db, scripts):
|
||||
logger.info('create_tables')
|
||||
for table_id in scripts:
|
||||
create_statement = scripts[table_id]['create']
|
||||
drop_statement = scripts[table_id]['drop']
|
||||
logger.debug(create_statement)
|
||||
cursor = sqlite_conn.cursor()
|
||||
if recreate_db:
|
||||
logger.debug(drop_statement)
|
||||
cursor.execute(drop_statement)
|
||||
cursor.execute(create_statement)
|
||||
|
||||
|
||||
def get_logger(level):
|
||||
dirs = PlatformDirs("kontor")
|
||||
logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
|
||||
with open(logging_config, 'rt') as f:
|
||||
config = yaml.safe_load(f.read())
|
||||
logging.config.dictConfig(config)
|
||||
logger = logging.getLogger('development')
|
||||
if level is not None:
|
||||
match level:
|
||||
case 0:
|
||||
logger.setLevel(logging.INFO)
|
||||
case 1:
|
||||
logger.setLevel(logging.DEBUG)
|
||||
case _:
|
||||
logger.setLevel(logging.CRITICAL)
|
||||
return logger
|
||||
|
||||
|
||||
def get_meta_data(mariadb_conn):
|
||||
mariadb_cursor = mariadb_conn.cursor()
|
||||
select_statement = "SELECT id, table_name FROM meta_data_table"
|
||||
mariadb_cursor.execute(select_statement)
|
||||
rows = mariadb_cursor.fetchall()
|
||||
meta_data = {}
|
||||
for (identifier, table_name) in rows:
|
||||
table_data = {"name": table_name}
|
||||
mariadb_cursor.execute("SELECT column_name, column_sync_name, column_type, column_modifier, column_order FROM meta_data_column WHERE table_id=?", (identifier, ))
|
||||
column_rows = mariadb_cursor.fetchall()
|
||||
column_list = []
|
||||
for (column_name, column_sync_name, column_type, column_modifier, column_order) in column_rows:
|
||||
column_data = {"column_name": column_name, "column_sync_name": column_sync_name, "column_type": column_type,
|
||||
"column_modifier": column_modifier, "column_order": column_order}
|
||||
column_list.append(column_data)
|
||||
# logger.info(column_list)
|
||||
table_data["columns"] = column_list
|
||||
meta_data[identifier] = table_data
|
||||
return meta_data
|
||||
|
||||
|
||||
def get_scripts(meta_data, logger):
|
||||
scripts_map = {}
|
||||
for table_id in meta_data:
|
||||
table_scripts = {}
|
||||
m_columns = []
|
||||
s_columns = []
|
||||
columns = []
|
||||
for column_data in meta_data[table_id]["columns"]:
|
||||
column_line = "{} {}".format(column_data["column_sync_name"], column_data["column_type"])
|
||||
if column_data["column_modifier"]:
|
||||
column_line += " " + column_data["column_modifier"]
|
||||
columns.append(column_line)
|
||||
m_columns.append(column_data['column_name'])
|
||||
s_columns.append(column_data['column_sync_name'])
|
||||
table_name = meta_data[table_id]["name"]
|
||||
create_statement = "CREATE TABLE IF NOT EXISTS {} ({});".format(table_name, ", ".join(columns))
|
||||
drop_statement = 'DROP TABLE IF EXISTS {}'.format(table_name)
|
||||
select_mariadb_statement = 'SELECT {} FROM {}'.format(', '.join(m_columns), table_name)
|
||||
select_sqlite_statement = 'SELECT {} FROM {}'.format(', '.join(s_columns), table_name)
|
||||
insert_sqlite_statement = 'INSERT INTO {}({}) VALUES({})'.format(table_name, ', '.join(s_columns), ', '.join(['?']*len(s_columns)))
|
||||
insert_mariadb_statement = 'INSERT INTO {}({}) VALUES({})'.format(table_name, ', '.join(m_columns), ', '.join(['?']*len(m_columns)))
|
||||
truncate_mariadb_statement = 'TRUNCATE {}'.format(table_name)
|
||||
#logger.debug(create_statement)
|
||||
#logger.debug(select_mariadb_statement)
|
||||
table_scripts["create"] = create_statement
|
||||
table_scripts["drop"] = drop_statement
|
||||
table_scripts["select_mariadb"] = select_mariadb_statement
|
||||
table_scripts["select_sqlite"] = select_sqlite_statement
|
||||
table_scripts["insert_sqlite"] = insert_sqlite_statement
|
||||
table_scripts["insert_mariadb"] = insert_mariadb_statement
|
||||
table_scripts["truncate_mariadb"] = truncate_mariadb_statement
|
||||
table_scripts["count"] = "SELECT COUNT(*) FROM {}".format(table_name)
|
||||
table_scripts["name"] = table_name
|
||||
scripts_map[table_id] = table_scripts
|
||||
return scripts_map
|
||||
Reference in New Issue
Block a user