""" copy data from JSON to Postgres """ from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from pathlib import Path from typing import Dict, List from config import get_logger, get_database_cursors import json from psycopg2.sql import SQL parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument('--verbose', '-v', action='count', default=0) parser.add_argument('--config', '-c', default='kontor-docker') parser.add_argument('--file', '-f', default='~/.sync/media/data.json') args = parser.parse_args() def copy_data(postgres_conn, data_file: Path, log): postgres_cursor = postgres_conn.cursor() import_file = Path(data_file) if not import_file.exists(): log.info(f"File {data_file} does not exist. Do nothing.") return log.info("read json file") with open(data_file, 'r') as json_file: json_load = json.load(json_file) postgres_cursor.execute("SET session_replication_role='replica'") for table in json_load: log.info(f"{table}: {len(json_load[table])}") # result[table] = import_table(table, json_load[table]) truncate_statement = 'TRUNCATE {} CASCADE'.format(table) #log.info(f"truncate: {truncate_statement}") try: postgres_cursor.execute(truncate_statement) except: log.info(f"statement: {insert_statement} FAILED") items = json_load[table] for item in items: #log.info(f"item: {item}") values = [] columns = [] for (key, value) in item.items(): columns.append(key) values.append(value) row = tuple(values) #log.info(f"values: {row}") insert_statement = 'INSERT INTO {}({}) VALUES({})'.format(table, ', '.join(columns), ', '.join(['%s']*len(columns))) #log.info(f"statement: {insert_statement}") try: postgres_cursor.execute(SQL(insert_statement), row) postgres_conn.commit() except: log.info(f'insert failed with {insert_statement}') postgres_cursor.execute("SET session_replication_role='origin'") def load_json(data_file, log) -> dict: import_file = Path(data_file) if not import_file.exists(): log.info(f"File {data_file} does not exist. Do nothing.") return log.info("read json file") with open(data_file, 'r') as json_file: json_load = json.load(json_file) return json_load def insert_data(postgres_conn, data: dict, log): postgres_cursor = postgres_conn.cursor() log.info("insert data") table_list = [] #table_list = ['worktype', 'artist', 'publisher', 'volume', 'comic', 'issue', 'story_arc', 'trade_paperback', 'comic_work'] #table_list.extend(['sport', 'team', 'field_position', 'vendor', 'player', 'rooster', 'card_set', 'card']) #table_list.extend(['card']) #table_list.extend(['media_file', 'media_video', 'media_actor', 'media_article', 'media_actor_file']) #table_list.extend(['media_actor_file']) table_list.extend(['profile', 'permission', 'token', 'assignment']) #table_list.extend(['mail', 'mail_account', 'module_data', 'meta_data_table', 'meta_data_column']) table_list.extend(['meta_data_table', 'meta_data_column']) #table_list.extend(['book', 'author', 'article', 'bookshelf_publisher', 'book_author', 'article_author']) #if len(table_list) != 37: # log.info(f"number of tables incorrect: {len(table_list)}") # return for table in table_list: log.info(f"{table}: {len(data[table])}") truncate_statement = 'DELETE FROM {}'.format(table) log.info(f"truncate: {truncate_statement}") try: postgres_cursor.execute(truncate_statement) postgres_conn.commit() except: log.info(f"statement: {truncate_statement} FAILED") items = data[table] for item in items: # log.info(f"item: {item}") values = [] columns = [] for (key, value) in item.items(): columns.append(key) values.append(value) row = tuple(values) # log.info(f"values: {row}") insert_statement = 'INSERT INTO {}({}) VALUES({})'.format(table, ', '.join(columns), ', '.join(['%s'] * len(columns))) # log.info(f"statement: {insert_statement}") try: postgres_cursor.execute(SQL(insert_statement), row) postgres_conn.commit() except: log.info(f'insert failed with {insert_statement}') def parse_table_order(data: dict, log): log.info("parse_table_order") table_refs: Dict[str, List[str]] = {} for table in data: log.info(f"{table}: {len(data[table])}") items = data[table] table_refs[table] = [] if len(items) == 0: continue item = items[0] for key, _ in item.items(): if key.endswith('_id'): ref = key[0:-3] log.info(f"table {table} has reference to {ref}") if table in table_refs: table_refs[table].append(ref) else: table_refs[table] = [ref] log.info(f"parsed refs: {table_refs}") table_order = [] for table in table_refs: if len(table_refs[table]) == 0: log.info(f"insert {table} at beginning") table_order.insert(0, table) else: log.info(f"insert {table} at end") table_order.append(table) log.info(f"table_list: {len(table_order)}: {table_order}") if __name__ == '__main__': logger = get_logger(args.verbose, args.config) logger.info('kontor.json_to_postgres started') _, _, p_conn = get_database_cursors(logger, args.config) data = load_json(args.file, logger) #parse_table_order(data, logger) insert_data(p_conn, data, logger) #copy_data(p_conn, args.file, logger) p_conn.close() logger.info('kontor.json_to_postgres finished')