""" import data from json file to MariaDB """ from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from typing import Any, AnyStr, Dict, List import os import json from pathlib import Path from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session from db.models.base import Base from db.models import registry from config import get_logger parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument('--verbose', '-v', action='count', default=0) parser.add_argument('--dry-run', '-m', action='store_true') parser.add_argument('--cleanup', '-c', action='store_true') parser.add_argument('--file', '-f', default='~/data.json') args = parser.parse_args() DB_USER: str = os.getenv("DB_USER", "kontor") DB_PASSWORD: str = os.getenv("DB_PASSWORD", "kontor") DB_SERVER: str = os.getenv("DB_SERVER", "127.0.0.1") DB_PORT: int = int(os.getenv("DB_PORT", 5432)) DB_DBNAME: str = os.getenv("DB_DBNAME", "kontor") DATABASE_URL: str = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_SERVER}:{DB_PORT}/{DB_DBNAME}" def cleanup_database(db: Session, log, dry_run: bool): log.info("cleanup_database") # get tables from registry for table in registry: log.info(f"{table}") model = registry[table] entries = db.query(model).all() for entry in entries: if not dry_run: db.delete(entry) db.commit() def load_data(filename: str, log) -> Dict[AnyStr, Dict[AnyStr, Any]]: log.info("load_data") import_file = Path(filename) if not import_file.exists(): log.info(f"File {filename} does not exist. Do nothing.") raise FileNotFoundError() log.info("read json file") with open(filename, 'r') as json_file: json_load = json.load(json_file) return json_load def get_ids(items: List[Any]) -> List[AnyStr]: result: List[AnyStr] = [] for item in items: result.append(item.id) return result if __name__ == '__main__': logger = get_logger(args.verbose, "kontor") logger.info('kontor.import started') engine = create_engine(DATABASE_URL) Base.metadata.create_all(bind=engine, checkfirst=True) SessionLocal = sessionmaker(bind=engine) with SessionLocal() as db: if args.cleanup: cleanup_database(db, logger, args.dry_run) data = load_data(args.file, logger) for tablename in data: model = registry[tablename] existing_items = db.query(model).all() existing_ids = get_ids(existing_items) logger.debug(f"found {len(existing_items)} for table {tablename}") import_items = data[tablename] for import_item in import_items: if import_item['id'] in existing_ids: logger.info(f"update {import_item['id']}") else: logger.info(f"import {import_item['id']}") logger.info('kontor.import finished')