117 lines
5.0 KiB
Python
117 lines
5.0 KiB
Python
"""
|
|
Setup database connections
|
|
"""
|
|
import sqlite3
|
|
import mariadb
|
|
import logging.config
|
|
from platformdirs import PlatformDirs
|
|
from pathlib import Path
|
|
import yaml
|
|
|
|
|
|
def get_database_cursors(log, config: str):
    """Open the SQLite and MariaDB connections described in the user config.

    Reads ``database-config.yaml`` from the per-user configuration directory
    resolved by ``PlatformDirs(config)`` and connects to both databases.

    Note: despite the function name, the returned objects are *connections*,
    not cursors.

    Args:
        log: Logger-like object; only ``log.info`` is called.
        config: Value passed to ``PlatformDirs`` to locate the config
            directory.

    Returns:
        A ``(sqlite_conn, mariadb_conn)`` tuple.

    Raises:
        OSError: If the configuration file cannot be opened.
        Exception: Anything raised while reading the configuration keys or
            connecting is propagated unchanged.
    """
    dirs = PlatformDirs(config)
    database_config = Path(dirs.user_config_dir, 'database-config.yaml')
    # YAML streams are UTF-8 by convention; do not rely on the locale default.
    with open(database_config, 'rt', encoding='utf-8') as f:
        db_config = yaml.safe_load(f)

    sqlite_db = db_config["sqlite"]["file"]
    log.info(f'using SQLite3 database {sqlite_db}')
    sqlite_conn = sqlite3.connect(
        sqlite_db,
        detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
    )

    try:
        mariadb_config = db_config['mariadb']
        mariadb_conn = mariadb.connect(
            host=mariadb_config['host'],
            port=mariadb_config['port'],
            user=mariadb_config['user'],
            password=mariadb_config['password'],
            database=mariadb_config['database'],
        )
    except Exception:
        # Do not leak the already-open SQLite connection when the MariaDB
        # side cannot be set up.
        sqlite_conn.close()
        raise

    return sqlite_conn, mariadb_conn
|
|
|
|
|
|
def create_tables(sqlite_conn, logger, recreate_db, scripts):
    """Run the ``create`` (and optionally ``drop``) statement of every table.

    Args:
        sqlite_conn: Connection whose ``cursor()`` executes the statements.
        logger: Logger receiving one info line and the statements at debug
            level.
        recreate_db: When truthy, each table's ``'drop'`` statement is
            executed before its ``'create'`` statement.
        scripts: Mapping of table id to a dict containing at least the
            ``'create'`` and ``'drop'`` SQL strings.

    No commit is issued here.
    NOTE(review): confirm the caller commits if the connection requires it.
    """
    logger.info('create_tables')
    # One cursor for the whole run, closed even if a statement fails.
    cursor = sqlite_conn.cursor()
    try:
        for table_scripts in scripts.values():
            create_statement = table_scripts['create']
            drop_statement = table_scripts['drop']
            logger.debug(create_statement)
            if recreate_db:
                logger.debug(drop_statement)
                cursor.execute(drop_statement)
            cursor.execute(create_statement)
    finally:
        cursor.close()
|
|
|
|
|
|
def get_logger(level, config: str):
|
|
dirs = PlatformDirs(config)
|
|
logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
|
|
with open(logging_config, 'rt') as f:
|
|
config = yaml.safe_load(f.read())
|
|
logging.config.dictConfig(config)
|
|
logger = logging.getLogger('development')
|
|
if level is not None:
|
|
match level:
|
|
case 0:
|
|
logger.setLevel(logging.INFO)
|
|
case 1:
|
|
logger.setLevel(logging.DEBUG)
|
|
case _:
|
|
logger.setLevel(logging.CRITICAL)
|
|
return logger
|
|
|
|
|
|
def get_meta_data(mariadb_conn):
    """Load table and column metadata from the metadata tables.

    Reads every row of ``meta_data_table`` and, for each one, the matching
    rows of ``meta_data_column``.

    Args:
        mariadb_conn: DB-API connection supporting ``?`` placeholders.

    Returns:
        A dict keyed by table id. Each value is
        ``{"name": <table_name>, "columns": [<column dict>, ...]}`` where a
        column dict has the keys ``column_name``, ``column_sync_name``,
        ``column_type``, ``column_modifier`` and ``column_order``.
        Columns are listed in ascending ``column_order``.
    """
    table_query = "SELECT id, table_name FROM meta_data_table"
    # ORDER BY makes the column sequence deterministic instead of depending
    # on whatever row order the server happens to return.
    column_query = (
        "SELECT column_name, column_sync_name, column_type, column_modifier, column_order "
        "FROM meta_data_column WHERE table_id=? ORDER BY column_order"
    )
    column_keys = (
        "column_name",
        "column_sync_name",
        "column_type",
        "column_modifier",
        "column_order",
    )

    meta_data = {}
    mariadb_cursor = mariadb_conn.cursor()
    try:
        mariadb_cursor.execute(table_query)
        # Fetch all tables first: the same cursor is reused for the
        # per-table column queries below.
        rows = mariadb_cursor.fetchall()
        for identifier, table_name in rows:
            mariadb_cursor.execute(column_query, (identifier,))
            column_list = [
                dict(zip(column_keys, column_row))
                for column_row in mariadb_cursor.fetchall()
            ]
            meta_data[identifier] = {"name": table_name, "columns": column_list}
    finally:
        # The cursor was previously never released.
        mariadb_cursor.close()
    return meta_data
|
|
|
|
|
|
def get_scripts(meta_data, logger):
    """Build the SQL statements for every table described in *meta_data*.

    Args:
        meta_data: Mapping of table id to
            ``{"name": <table name>, "columns": [<column dict>, ...]}``.
            Each column dict is read for ``column_name``,
            ``column_sync_name``, ``column_type`` and ``column_modifier``.
        logger: Currently unused; kept so existing callers keep working.

    Returns:
        A dict keyed by table id. Each value maps the keys ``create``,
        ``drop``, ``select_mariadb``, ``select_sqlite``, ``insert_sqlite``,
        ``insert_mariadb``, ``truncate_mariadb``, ``count`` (SQL strings)
        and ``name`` (the table name).

    NOTE(review): table and column names are interpolated into the SQL text
    without quoting or validation — confirm the metadata source is trusted.
    """
    scripts_map = {}
    for table_id, table_data in meta_data.items():
        table_name = table_data["name"]
        column_entries = table_data["columns"]

        # ``column_name`` is used in the MariaDB statements and
        # ``column_sync_name`` in the SQLite ones.
        m_columns = [column["column_name"] for column in column_entries]
        s_columns = [column["column_sync_name"] for column in column_entries]

        # Column definitions for CREATE TABLE: "<name> <type>[ <modifier>]".
        definitions = []
        for column in column_entries:
            definition = "{} {}".format(column["column_sync_name"], column["column_type"])
            if column["column_modifier"]:
                definition += " " + column["column_modifier"]
            definitions.append(definition)

        m_column_list = ', '.join(m_columns)
        s_column_list = ', '.join(s_columns)
        # Both lists have the same length, so one placeholder string serves
        # both INSERT statements.
        placeholders = ', '.join(['?'] * len(column_entries))

        scripts_map[table_id] = {
            "create": "CREATE TABLE IF NOT EXISTS {} ({});".format(table_name, ", ".join(definitions)),
            "drop": 'DROP TABLE IF EXISTS {}'.format(table_name),
            "select_mariadb": 'SELECT {} FROM {}'.format(m_column_list, table_name),
            "select_sqlite": 'SELECT {} FROM {}'.format(s_column_list, table_name),
            "insert_sqlite": 'INSERT INTO {}({}) VALUES({})'.format(table_name, s_column_list, placeholders),
            "insert_mariadb": 'INSERT INTO {}({}) VALUES({})'.format(table_name, m_column_list, placeholders),
            "truncate_mariadb": 'TRUNCATE {}'.format(table_name),
            "count": "SELECT COUNT(*) FROM {}".format(table_name),
            "name": table_name,
        }
    return scripts_map
|