""" import data from json file to PostgreSQL """
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from datetime import datetime
from typing import Any, AnyStr, Dict, List
import os
import json
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from db.models.base import Base
from db.models import registry
from psycopg2.errors import NotNullViolation
from config import get_logger

# Command line interface. Parsed at module level so that ``args`` is available
# to the script section at the bottom of the file.
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--verbose', '-v', action='count', default=0,
                    help='verbosity level passed to the logger (repeatable)')
parser.add_argument('--dry-run', '-m', action='store_true',
                    help='do not write any changes to the database')
parser.add_argument('--cleanup', '-c', action='store_true',
                    help='delete all entries of the registered tables before importing')
parser.add_argument('--file', '-f', default='~/data.json',
                    help='json file to import')
args = parser.parse_args()

# Database connection settings, overridable through the environment.
DB_USER: str = os.getenv("DB_USER", "kontor")
DB_PASSWORD: str = os.getenv("DB_PASSWORD", "kontor")
DB_SERVER: str = os.getenv("DB_SERVER", "127.0.0.1")
DB_PORT: int = int(os.getenv("DB_PORT", "5432"))
DB_DBNAME: str = os.getenv("DB_DBNAME", "kontor")
# NOTE(review): user and password are inserted verbatim; special characters
# such as '@' or '/' would need URL-escaping to be parsed correctly.
DATABASE_URL: str = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_SERVER}:{DB_PORT}/{DB_DBNAME}"


def cleanup_database(db: Session, log, dry_run: bool):
    """Delete every entry of every model listed in the registry.

    Args:
        db: open database session.
        log: logger used for progress messages.
        dry_run: when true, entries are queried but nothing is deleted.
    """
    log.debug("cleanup_database")
    # get tables from registry
    for table in registry:
        log.info(f"{table}")
        model = registry[table]
        if dry_run:
            # Run the query as before, but leave the data untouched.
            db.query(model).all()
            continue
        for entry in db.query(model).all():
            db.delete(entry)
    if not dry_run:
        db.commit()


def load_data(filename: str, log) -> Dict[str, Any]:
    """Read the json import file and return its parsed content.

    Args:
        filename: path of the json file; a leading ``~`` is expanded to the
            user's home directory.
        log: logger used for progress messages.

    Returns:
        The parsed json document. The script section indexes it by table name.

    Raises:
        FileNotFoundError: if the file does not exist.
    """
    log.debug("load_data")
    # Path.exists() does not expand '~', so the default '~/data.json' would
    # never be found without expanduser().
    import_file = Path(filename).expanduser()
    if not import_file.exists():
        log.info(f"File {filename} does not exist. Do nothing.")
        raise FileNotFoundError(f"File {import_file} does not exist")
    log.info("read json file")
    with open(import_file, 'r', encoding='utf-8') as json_file:
        return json.load(json_file)


def get_ids(items: List[Any]) -> List[Any]:
    """Return the ``id`` attribute of every item, in input order."""
    return [item.id for item in items]


def update_item(db: Session, import_data: Dict[str, Any], item: Any, dry_run: bool, log):
    """Copy differing values from ``import_data`` onto an existing item.

    Args:
        db: open database session.
        import_data: mapping of attribute name to imported value.
        item: existing object whose attributes are compared and updated.
        dry_run: when true, nothing is modified or committed.
        log: logger used for progress messages.

    Raises:
        AttributeError: if ``import_data`` has a key that is not an attribute
            of ``item``.
    """
    changed = False
    for key, value in import_data.items():
        attribute = str(key)
        existing_value = getattr(item, attribute)
        if isinstance(existing_value, datetime):
            # Imported values are compared with datetimes via their string form.
            differs = str(existing_value) != str(value)
        else:
            differs = existing_value != value
        if not differs or dry_run:
            continue
        log.debug(f"update {key}({existing_value}) with {value}")
        setattr(item, attribute, value)
        changed = True
    # Only touch the session when something was actually modified.
    if changed:
        db.add(item)
        db.commit()


def item_import(db: Session, import_data: Dict[str, Any], dry_run: bool, log, model: Any = None):
    """Create a new item from ``import_data`` and store it.

    Failures while creating or committing the item are logged, not raised.

    Args:
        db: open database session.
        import_data: values handed to the new item's ``import_dict`` method.
        dry_run: when true, nothing is created or committed.
        log: logger used for progress messages.
        model: class to instantiate. When omitted, the module-level name
            ``model`` is used, which is what this function relied on before
            the parameter existed.

    Raises:
        NameError: if no model is passed and no module-level ``model`` exists.
    """
    log.debug(f"import {import_data}")
    if dry_run:
        return
    if model is None:
        model = globals().get("model")
        if model is None:
            raise NameError("name 'model' is not defined")
    log.info(f"model: {repr(model)} {import_data}")
    try:
        new_item = model()
        new_item.import_dict(import_data)
        log.info(f"new item: {new_item}")
        db.add(new_item)
        db.commit()
    except Exception as error:
        # A failed commit leaves the session unusable until it is rolled back.
        db.rollback()
        # SQLAlchemy wraps driver errors and exposes the original as ``orig``.
        cause = getattr(error, "orig", error)
        if isinstance(cause, NotNullViolation):
            log.info(f"import failed: {cause} {import_data}")
        else:
            log.info(f"import failed: {error}")


if __name__ == '__main__':
    logger = get_logger(args.verbose, "kontor")
    logger.info('kontor.import started')
    engine = create_engine(DATABASE_URL)
    Base.metadata.create_all(bind=engine, checkfirst=True)
    SessionLocal = sessionmaker(bind=engine)
    with SessionLocal() as db:
        if args.cleanup:
            cleanup_database(db, logger, args.dry_run)
        data = load_data(args.file, logger)
        table_list: List = list(data.keys())
        logger.info(f"Liste der Tabellen: {table_list}")
        # NOTE: despite the name no sorting happens; tables are processed in
        # the order in which they appear in the json file.
        sorted_table_list: List = table_list
        for tablename in sorted_table_list:
            model = registry[tablename]
            existing_items = db.query(model).all()
            # A set gives constant-time membership tests and removals.
            existing_ids = set(get_ids(existing_items))
            logger.debug(f"found {len(existing_items)} for table {tablename}")
            import_items = data[tablename]
            for import_item in import_items:
                if import_item['id'] in existing_ids:
                    logger.debug(f"update {import_item['id']}")
                    existing_item = db.get(model, import_item['id'])
                    update_item(db, import_item, existing_item, args.dry_run, logger)
                    existing_ids.remove(import_item['id'])
                else:
                    logger.debug(f"import {import_item['id']}")
                    item_import(db, import_item, args.dry_run, logger, model)
            logger.info(f"remaining items for {tablename}: {len(existing_ids)}")
    logger.info('kontor.import finished')