Files
kontor/kontor-scripts/json_to_postgres.py
T
Thomas Peetz ace568a800 setup changed
2025-05-02 13:58:09 +02:00

154 lines
6.2 KiB
Python

"""
copy data from JSON to Postgres
"""
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from pathlib import Path
from typing import Dict, List
from config import get_logger, get_database_cursors
import json
import psycopg2
from psycopg2.sql import SQL
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--config', '-c', default='kontor-docker')
parser.add_argument('--file', '-f', default='~/.sync/media/data.json')
args = parser.parse_args()
def copy_data(postgres_conn, data_file: Path, log):
postgres_cursor = postgres_conn.cursor()
import_file = Path(data_file)
if not import_file.exists():
log.info(f"File {data_file} does not exist. Do nothing.")
return
log.info("read json file")
with open(data_file, 'r') as json_file:
json_load = json.load(json_file)
postgres_cursor.execute("SET session_replication_role='replica'")
for table in json_load:
log.info(f"{table}: {len(json_load[table])}")
# result[table] = import_table(table, json_load[table])
truncate_statement = 'TRUNCATE {} CASCADE'.format(table)
#log.info(f"truncate: {truncate_statement}")
try:
postgres_cursor.execute(truncate_statement)
except:
log.info(f"statement: {insert_statement} FAILED")
items = json_load[table]
for item in items:
#log.info(f"item: {item}")
values = []
columns = []
for (key, value) in item.items():
columns.append(key)
values.append(value)
row = tuple(values)
#log.info(f"values: {row}")
insert_statement = 'INSERT INTO {}({}) VALUES({})'.format(table, ', '.join(columns), ', '.join(['%s']*len(columns)))
#log.info(f"statement: {insert_statement}")
try:
postgres_cursor.execute(SQL(insert_statement), row)
postgres_conn.commit()
except:
log.info(f'insert failed with {insert_statement}')
postgres_cursor.execute("SET session_replication_role='origin'")
def load_json(data_file, log) -> dict:
import_file = Path(data_file)
if not import_file.exists():
log.info(f"File {data_file} does not exist. Do nothing.")
return
log.info("read json file")
with open(data_file, 'r') as json_file:
json_load = json.load(json_file)
return json_load
def insert_data(postgres_conn, data: dict, log):
postgres_cursor = postgres_conn.cursor()
log.info("insert data")
table_list = []
#table_list = ['worktype', 'artist', 'publisher', 'volume', 'comic', 'issue', 'story_arc', 'trade_paperback', 'comic_work']
#table_list.extend(['sport', 'team', 'field_position', 'vendor', 'player', 'rooster', 'card_set', 'card'])
#table_list.extend(['card'])
#table_list.extend(['media_file', 'media_video', 'media_actor', 'media_article', 'media_actor_file'])
#table_list.extend(['media_actor_file'])
table_list.extend(['profile', 'permission', 'token', 'assignment'])
#table_list.extend(['mail', 'mail_account', 'module_data', 'meta_data_table', 'meta_data_column'])
table_list.extend(['meta_data_table', 'meta_data_column'])
#table_list.extend(['book', 'author', 'article', 'bookshelf_publisher', 'book_author', 'article_author'])
#if len(table_list) != 37:
# log.info(f"number of tables incorrect: {len(table_list)}")
# return
for table in table_list:
log.info(f"{table}: {len(data[table])}")
truncate_statement = 'DELETE FROM {}'.format(table)
log.info(f"truncate: {truncate_statement}")
try:
postgres_cursor.execute(truncate_statement)
postgres_conn.commit()
except:
log.info(f"statement: {truncate_statement} FAILED")
items = data[table]
for item in items:
# log.info(f"item: {item}")
values = []
columns = []
for (key, value) in item.items():
columns.append(key)
values.append(value)
row = tuple(values)
# log.info(f"values: {row}")
insert_statement = 'INSERT INTO {}({}) VALUES({})'.format(table, ', '.join(columns),
', '.join(['%s'] * len(columns)))
# log.info(f"statement: {insert_statement}")
try:
postgres_cursor.execute(SQL(insert_statement), row)
postgres_conn.commit()
except:
log.info(f'insert failed with {insert_statement}')
def parse_table_order(data: dict, log):
log.info("parse_table_order")
table_refs: Dict[str, List[str]] = {}
for table in data:
log.info(f"{table}: {len(data[table])}")
items = data[table]
table_refs[table] = []
if len(items) == 0:
continue
item = items[0]
for key, _ in item.items():
if key.endswith('_id'):
ref = key[0:-3]
log.info(f"table {table} has reference to {ref}")
if table in table_refs:
table_refs[table].append(ref)
else:
table_refs[table] = [ref]
log.info(f"parsed refs: {table_refs}")
table_order = []
for table in table_refs:
if len(table_refs[table]) == 0:
log.info(f"insert {table} at beginning")
table_order.insert(0, table)
else:
log.info(f"insert {table} at end")
table_order.append(table)
log.info(f"table_list: {len(table_order)}: {table_order}")
if __name__ == '__main__':
logger = get_logger(args.verbose, args.config)
logger.info('kontor.json_to_postgres started')
_, _, p_conn = get_database_cursors(logger, args.config)
data = load_json(args.file, logger)
#parse_table_order(data, logger)
insert_data(p_conn, data, logger)
#copy_data(p_conn, args.file, logger)
p_conn.close()
logger.info('kontor.json_to_postgres finished')