""" import data from json file to PostgreSQL """ from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from datetime import datetime, date from typing import Any, Dict, List import os import json from pathlib import Path from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session from db.models.base import Base from db.models import registry from psycopg2.errors import NotNullViolation from config import get_logger parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument('--verbose', '-v', action='count', default=0) parser.add_argument('--dry-run', '-m', action='store_true') parser.add_argument('--cleanup', '-c', action='store_true') parser.add_argument('--file', '-f', default='~/data.json') args = parser.parse_args() DB_USER: str = os.getenv("DB_USER", "kontor") DB_PASSWORD: str = os.getenv("DB_PASSWORD", "kontor") DB_SERVER: str = os.getenv("DB_SERVER", "127.0.0.1") DB_PORT: int = int(os.getenv("DB_PORT", 5432)) DB_DBNAME: str = os.getenv("DB_DBNAME", "kontor") DATABASE_URL: str = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_SERVER}:{DB_PORT}/{DB_DBNAME}" def cleanup_database(db: Session, log, dry_run: bool): log.debug("cleanup_database") # get tables from registry for table in registry: log.info(f"{table}") model = registry[table] entries = db.query(model).all() for entry in entries: if not dry_run: db.delete(entry) db.commit() def load_data(filename: str, log) -> Dict[str, List[Dict[str, Any]]]: log.debug("load_data") import_file = Path(filename) if not import_file.exists(): log.info(f"File {filename} does not exist. Do nothing.") raise FileNotFoundError() log.info("read json file") with open(filename, 'r') as json_file: json_load = json.load(json_file) return json_load def get_ids(items: List[Any]) -> List[str]: result: List[str] = [] for item in items: result.append(item.id) return result def update_item(db: Session, import_data: Dict[str, Any], item: Any, dry_run: bool, log): for (key, value) in import_data.items(): existing_value = getattr(item, str(key)) update: bool = has_changed(existing_value, value, log) #if key == 'published_on': # log.info(f"{type(value)}:{value} != {type(existing_value)}:{existing_value} : {update}") if update: log.info(f"update {key}({existing_value}) with {value}") if not dry_run: if value == 'None': setattr(item, str(key), None) else: setattr(item, str(key), value) db.add(item) db.commit() def has_changed(existing_data: Any, import_data: str, log) -> bool: if existing_data is None and import_data == 'None': return False if isinstance(existing_data, str): return existing_data != import_data if isinstance(existing_data, date): if len(import_data) > 19: import_date = datetime.strptime(import_data, "%Y-%m-%d %H:%M:%S.%f") log.debug(f"{type(existing_data)}:{existing_data} == {import_date} : {existing_data != import_date}") return existing_data != import_date if len(import_data) > 10: import_date = datetime.strptime(import_data, "%Y-%m-%d %H:%M:%S") log.debug(f"{type(existing_data)}:{existing_data} == {import_date} : {existing_data != import_date}") return existing_data != import_date return existing_data.strftime("%Y-%m-%d") != import_data return existing_data != import_data def item_import(db: Session, import_data: Dict[str, Any], dry_run: bool, log): log.info(f"import {import_data}") if not dry_run: log.debug(f"model: {repr(model)} {import_data}") try: new_item = model() new_item.import_dict(import_data) log.info(f"new item: {new_item}") db.add(new_item) db.commit() except NotNullViolation as notnull: log.info(f"import failed: {notnull} {import_data}") except Exception as error: log.info(f"import failed: {error}") if __name__ == '__main__': logger = get_logger(args.verbose, "kontor") logger.info('kontor.import started') engine = create_engine(DATABASE_URL) Base.metadata.create_all(bind=engine, checkfirst=True) SessionLocal = sessionmaker(bind=engine) with SessionLocal() as db: if args.cleanup: cleanup_database(db, logger, args.dry_run) data: Dict[str, List[Dict[str, Any]]] = load_data(args.file, logger) table_list: List[str] = list(data.keys()) logger.debug(f"Liste der Tabellen: {table_list}") sorted_table_list: List[str] = table_list for tablename in sorted_table_list: model = registry[tablename] existing_items = db.query(model).all() existing_ids: List[str] = get_ids(existing_items) logger.debug(f"found {len(existing_items)} for table {tablename}") import_items: List[Dict[str, Any]] = data[tablename] for import_item in import_items: item_id: str = import_item['id'] if item_id in existing_ids: logger.debug(f"update {item_id}") existing_item = db.get(model, item_id) update_item(db, import_item, existing_item, args.dry_run, logger) existing_ids.remove(item_id) else: logger.debug(f"import {item_id}") item_import(db, import_item, args.dry_run, logger) logger.debug(f"remaining items for {tablename}: {len(existing_ids)}") if len(existing_ids) > 0: logger.info(f"remaining items for {tablename}: {existing_ids}") logger.info('kontor.import finished')