154 lines
6.2 KiB
Python
154 lines
6.2 KiB
Python
"""
|
|
copy data from JSON to Postgres
|
|
"""
|
|
import json
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from pathlib import Path
from typing import Dict, Iterable, List, Optional

import psycopg2
from psycopg2.sql import SQL, Identifier, Placeholder

from config import get_logger, get_database_cursors
|
|
|
|
# Command-line options as (flags, add_argument keyword arguments).
# --verbose is a repeatable counter (-v, -vv, ...) passed to get_logger;
# --config names the configuration used by get_logger / get_database_cursors;
# --file is the JSON dump to read.
_CLI_OPTIONS = (
    (('--verbose', '-v'), dict(action='count', default=0)),
    (('--config', '-c'), dict(default='kontor-docker')),
    (('--file', '-f'), dict(default='~/.sync/media/data.json')),
)

parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
for _flags, _options in _CLI_OPTIONS:
    parser.add_argument(*_flags, **_options)

# Parsed once at import time; the __main__ block reads this object.
args = parser.parse_args()
|
|
|
|
def copy_data(postgres_conn, data_file: Path, log):
    """Replace the rows of every table found in a JSON dump with the dumped rows.

    The dump is a JSON object mapping table names to lists of row objects
    (column name -> value). Every table named in the dump is first emptied with
    ``TRUNCATE ... CASCADE``; only after all truncations are the rows inserted,
    one statement and one commit per row.

    While loading, ``session_replication_role`` is set to ``replica``
    (presumably so foreign-key triggers do not reject rows inserted in
    arbitrary table order). It is reset to ``origin`` afterwards, also when
    loading fails.

    A statement that fails is logged and rolled back, so the connection does
    not stay in an aborted transaction and the remaining statements still run.

    :param postgres_conn: open psycopg2 connection.
    :param data_file: path to the JSON dump (str or Path); a leading ``~`` is
        expanded. If the file does not exist nothing happens.
    :param log: logger used for progress and failure messages.
    """
    import_file = Path(data_file).expanduser()
    if not import_file.exists():
        log.info(f"File {data_file} does not exist. Do nothing.")
        return

    log.info("read json file")
    with open(import_file, 'r', encoding='utf-8') as json_file:
        json_load = json.load(json_file)

    with postgres_conn.cursor() as postgres_cursor:
        # Commit the SET by itself: a rollback after a failed statement would
        # otherwise silently revert it.
        postgres_cursor.execute("SET session_replication_role='replica'")
        postgres_conn.commit()
        try:
            # Truncate every table before inserting anything. Interleaving
            # TRUNCATE ... CASCADE with the inserts would wipe rows already
            # loaded into an earlier table that references a later one.
            for table, items in json_load.items():
                log.info(f"{table}: {len(items)}")
                truncate_statement = SQL('TRUNCATE {} CASCADE').format(Identifier(table))
                try:
                    postgres_cursor.execute(truncate_statement)
                    postgres_conn.commit()
                except psycopg2.Error as err:
                    postgres_conn.rollback()
                    log.info(f"statement: {truncate_statement.as_string(postgres_conn)} FAILED: {err}")

            for table, items in json_load.items():
                for item in items:
                    columns = list(item)
                    row = tuple(item.values())
                    # Quote table/column names instead of pasting them into the SQL text.
                    insert_statement = SQL('INSERT INTO {}({}) VALUES({})').format(
                        Identifier(table),
                        SQL(', ').join([Identifier(column) for column in columns]),
                        SQL(', ').join([Placeholder() for _ in columns]),
                    )
                    try:
                        postgres_cursor.execute(insert_statement, row)
                        postgres_conn.commit()
                    except psycopg2.Error as err:
                        postgres_conn.rollback()
                        log.info(f'insert failed with {insert_statement.as_string(postgres_conn)}: {err}')
        finally:
            # Drop any half-finished statement, then restore the normal role and
            # commit so the reset is not left pending in an open transaction.
            postgres_conn.rollback()
            postgres_cursor.execute("SET session_replication_role='origin'")
            postgres_conn.commit()
|
|
|
|
|
|
def load_json(data_file, log) -> Optional[dict]:
    """Read a JSON dump from disk.

    :param data_file: path to the JSON file (str or Path); a leading ``~`` is
        expanded to the user's home directory, so defaults such as
        ``~/.sync/media/data.json`` resolve correctly.
    :param log: logger used for progress messages.
    :return: the decoded JSON document, or ``None`` if the file does not exist.
    """
    import_file = Path(data_file).expanduser()
    if not import_file.exists():
        log.info(f"File {data_file} does not exist. Do nothing.")
        return None

    log.info("read json file")
    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    with open(import_file, 'r', encoding='utf-8') as json_file:
        return json.load(json_file)
|
|
|
|
|
|
def insert_data(postgres_conn, data: dict, log, tables: Optional[Iterable[str]] = None):
    """Replace the rows of selected tables with rows from an already-loaded dump.

    For each table in ``tables`` (in the given order) all existing rows are
    removed with ``DELETE FROM``. The rows in ``data[table]`` are then inserted
    one at a time, each committed on its own. A statement that fails is logged
    and rolled back, so one bad row does not leave the connection in an aborted
    transaction and the remaining statements still run.

    Unlike ``copy_data``, this does not change ``session_replication_role``.
    Presumably foreign-key checks stay active, so ``tables`` should list
    referenced tables before the tables that refer to them.

    Tables known to be in the dump (37 in total, taken from earlier revisions
    of this function, in the order they used to be loaded):
        worktype, artist, publisher, volume, comic, issue, story_arc,
        trade_paperback, comic_work,
        sport, team, field_position, vendor, player, rooster, card_set, card,
        media_file, media_video, media_actor, media_actor_file, media_article,
        profile, permission, token, assignment,
        mail, mail_account, module_data, meta_data_table, meta_data_column,
        book, author, article, bookshelf_publisher, book_author, article_author

    :param postgres_conn: open psycopg2 connection.
    :param data: mapping of table name to a list of row dicts (column -> value),
        e.g. the result of ``load_json``. ``None`` or empty means nothing to do.
    :param log: logger used for progress and failure messages.
    :param tables: names of the tables to reload. Defaults to
        ``['meta_data_column']``, the selection this function previously
        hard-coded. Tables missing from ``data`` are logged and skipped.
    """
    if tables is None:
        tables = ['meta_data_column']

    log.info("insert data")
    if not data:
        # load_json returns None when the dump file is missing.
        log.info("no data to insert")
        return

    with postgres_conn.cursor() as postgres_cursor:
        for table in tables:
            if table not in data:
                log.info(f"{table}: not present in data, skipped")
                continue
            items = data[table]
            log.info(f"{table}: {len(items)}")

            truncate_statement = SQL('DELETE FROM {}').format(Identifier(table))
            log.info(f"truncate: {truncate_statement.as_string(postgres_conn)}")
            try:
                postgres_cursor.execute(truncate_statement)
                postgres_conn.commit()
            except psycopg2.Error as err:
                postgres_conn.rollback()
                log.info(f"statement: {truncate_statement.as_string(postgres_conn)} FAILED: {err}")

            for item in items:
                columns = list(item)
                row = tuple(item.values())
                # Quote table/column names instead of pasting them into the SQL text.
                insert_statement = SQL('INSERT INTO {}({}) VALUES({})').format(
                    Identifier(table),
                    SQL(', ').join([Identifier(column) for column in columns]),
                    SQL(', ').join([Placeholder() for _ in columns]),
                )
                try:
                    postgres_cursor.execute(insert_statement, row)
                    postgres_conn.commit()
                except psycopg2.Error as err:
                    # Roll back so the failed statement does not poison the
                    # transaction for every following insert.
                    postgres_conn.rollback()
                    log.info(f'insert failed with {insert_statement.as_string(postgres_conn)}: {err}')
|
|
|
|
|
|
def parse_table_order(data: dict, log) -> List[str]:
    """Derive a load order for the tables of a JSON dump from their ``*_id`` columns.

    A column named ``<name>_id`` in a table's first row is treated as a
    reference to the table ``<name>``. Only references to tables present in
    ``data`` are followed, and self-references are ignored. The result lists
    every table in ``data`` so that referenced tables come before the tables
    that refer to them; independent tables keep their order from ``data``.
    Reference cycles are logged and broken at the point they are detected.

    This is a naming-convention heuristic: a foreign key whose column does not
    follow the ``<table>_id`` pattern is not detected.

    :param data: mapping of table name to a list of row dicts.
    :param log: logger used for progress messages.
    :return: table names in a dependency-respecting order (also logged).
    """
    log.info("parse_table_order")

    table_refs: Dict[str, List[str]] = {}
    for table, items in data.items():
        log.info(f"{table}: {len(items)}")
        refs: List[str] = []
        if items:
            # Rows of one table are assumed to share the first row's columns.
            for key in items[0]:
                if key.endswith('_id'):
                    ref = key[:-len('_id')]
                    log.info(f"table {table} has reference to {ref}")
                    refs.append(ref)
        table_refs[table] = refs

    log.info(f"parsed refs: {table_refs}")

    table_order: List[str] = []
    state: Dict[str, str] = {}  # table -> 'visiting' while on the DFS stack, then 'done'

    def visit(table: str) -> None:
        """Append ``table`` after every table it (transitively) references."""
        state[table] = 'visiting'
        for ref in table_refs[table]:
            if ref == table or ref not in table_refs:
                continue
            if state.get(ref) == 'visiting':
                log.info(f"reference cycle between {table} and {ref}")
                continue
            if ref not in state:
                visit(ref)
        state[table] = 'done'
        table_order.append(table)

    for table in table_refs:
        if table not in state:
            visit(table)

    log.info(f"table_list: {len(table_order)}: {table_order}")
    return table_order
|
|
|
|
if __name__ == '__main__':
    logger = get_logger(args.verbose, args.config)
    logger.info('kontor.json_to_postgres started')

    _, _, p_conn = get_database_cursors(logger, args.config)
    try:
        data = load_json(args.file, logger)
        if data is None:
            # load_json already logged that the file is missing.
            logger.info('no data loaded, nothing to insert')
        else:
            # NOTE: parse_table_order(data, logger) can be used to inspect table
            # dependencies. copy_data(p_conn, args.file, logger) is the
            # alternative loader that truncates and reloads every table in the dump.
            insert_data(p_conn, data, logger)
    finally:
        # Close the connection even if loading or inserting raised.
        p_conn.close()

    logger.info('kontor.json_to_postgres finished')
|
|
|