Files
kontor/kontor-scripts/import.py
T
Thomas Peetz 1b636efdb7 change logging
2025-05-26 11:35:51 +02:00

126 lines
4.7 KiB
Python

"""
Import data from a JSON file into PostgreSQL.
"""
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from datetime import datetime
from typing import Any, AnyStr, Dict, List
import os
import json
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from db.models.base import Base
from db.models import registry
from psycopg2.errors import NotNullViolation
from config import get_logger
# Command-line interface. Parsing happens at import time, so ``args`` is a
# module-level name that the script entry point reads.
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
# Repeatable counter; the count is handed to get_logger() as the verbosity.
parser.add_argument(
    "--verbose", "-v",
    action="count",
    default=0,
)
# When set, changes are logged but nothing is written to the database.
parser.add_argument(
    "--dry-run", "-m",
    action="store_true",
)
# When set, cleanup_database() runs before the import.
parser.add_argument(
    "--cleanup", "-c",
    action="store_true",
)
# Path of the JSON file handed to load_data().
parser.add_argument(
    "--file", "-f",
    default="~/data.json",
)
args = parser.parse_args()
# Connection settings; each one can be overridden through the environment
# variable of the same name and otherwise falls back to a local default.
DB_USER: str = os.environ.get("DB_USER", "kontor")
DB_PASSWORD: str = os.environ.get("DB_PASSWORD", "kontor")
DB_SERVER: str = os.environ.get("DB_SERVER", "127.0.0.1")
DB_PORT: int = int(os.environ.get("DB_PORT", "5432"))
DB_DBNAME: str = os.environ.get("DB_DBNAME", "kontor")
# NOTE(review): user and password are inserted verbatim; values containing
# characters such as "@" or "/" would need URL-escaping - confirm whether
# that can occur in deployments.
DATABASE_URL: str = (
    "postgresql://"
    f"{DB_USER}:{DB_PASSWORD}"
    f"@{DB_SERVER}:{DB_PORT}"
    f"/{DB_DBNAME}"
)
def cleanup_database(db: Session, log, dry_run: bool):
    """Delete all rows of every model listed in ``registry``.

    Args:
        db: open SQLAlchemy session used for querying and deleting.
        log: logger-like object providing ``debug()`` and ``info()``.
        dry_run: when true, the rows are still queried and the table names
            logged, but nothing is deleted or committed.
    """
    log.debug("cleanup_database")
    # get tables from registry
    for table in registry:
        log.info(f"{table}")
        rows = db.query(registry[table]).all()
        if dry_run:
            # Nothing to do for this table besides the query above.
            continue
        for row in rows:
            # Each deletion is committed on its own.
            db.delete(row)
            db.commit()
def load_data(filename: str, log) -> Dict[AnyStr, Dict[AnyStr, Any]]:
    """Read the JSON import file and return its decoded content.

    Args:
        filename: path of the JSON file. A leading ``~`` is expanded to the
            user's home directory, so the command-line default
            ``~/data.json`` resolves correctly.
        log: logger-like object providing ``debug()`` and ``info()``.

    Returns:
        The object produced by ``json.load``. The script entry point treats
        it as a mapping from table name to a list of row dictionaries.

    Raises:
        FileNotFoundError: if the (expanded) path does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    log.debug("load_data")
    # Path.exists() does not interpret "~", so expand it explicitly.
    import_file = Path(filename).expanduser()
    if not import_file.exists():
        log.info(f"File {filename} does not exist. Do nothing.")
        raise FileNotFoundError(str(import_file))
    log.info("read json file")
    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    with import_file.open('r', encoding='utf-8') as json_file:
        return json.load(json_file)
def get_ids(items: List[Any]) -> List[AnyStr]:
    """Return the ``id`` attribute of every item, in input order."""
    return [entry.id for entry in items]
def update_item(db: Session, import_data: Dict[AnyStr, Any], item: Any, dry_run: bool, log):
    """Bring an existing row in line with the values from the import file.

    Every key of ``import_data`` is compared with the attribute of the same
    name on ``item``. Differences are logged; unless ``dry_run`` is set they
    are applied to ``item`` and persisted with a single commit.

    Args:
        db: open SQLAlchemy session that ``item`` is saved through.
        import_data: field name -> new value for this row.
        item: the already stored object; must have an attribute for every
            key in ``import_data``.
        dry_run: when true, differences are only logged.
        log: logger-like object providing ``info()``.

    Raises:
        AttributeError: if ``item`` lacks an attribute named in
            ``import_data``.
    """
    changed = False
    for key, value in import_data.items():
        attr = str(key)
        existing_value = getattr(item, attr)
        if isinstance(existing_value, datetime):
            # The imported value is not compared as a datetime; both sides
            # are reduced to their str() form instead.
            differs = str(existing_value) != str(value)
        else:
            differs = existing_value != value
        if not differs:
            continue
        log.info(f"update {key}({existing_value}) with {value}")
        if not dry_run:
            setattr(item, attr, value)
            changed = True
    if changed:
        # One commit per item rather than one per field: the update is
        # atomic and the object is not expired and re-read between fields.
        db.add(item)
        db.commit()
def item_import(db: Session, import_data: Dict[AnyStr, Any], dry_run: bool, log):
    """Insert one record from the import file as a new row.

    NOTE: the class to instantiate is read from the module-level name
    ``model``, which the script entry point assigns before calling this
    function; it is not a parameter.

    Args:
        db: open SQLAlchemy session the new object is added to.
        import_data: field name -> value, passed to the new object's
            ``import_dict()``.
        dry_run: when true, the record is only logged and nothing is written.
        log: logger-like object providing ``debug()`` and ``info()``.

    Failures while building or committing the object are logged and
    swallowed so that the remaining records can still be processed.
    """
    log.info(f"import {import_data}")
    if dry_run:
        return
    log.debug(f"model: {repr(model)} {import_data}")
    try:
        new_item = model()
        new_item.import_dict(import_data)
        log.info(f"new item: {new_item}")
        db.add(new_item)
        db.commit()
    except NotNullViolation as notnull:
        # Reset the session: without a rollback the failed object stays
        # pending and every later commit on this session fails as well.
        db.rollback()
        log.info(f"import failed: {notnull} {import_data}")
    except Exception as error:
        # Best-effort import: same reset as above, then carry on.
        db.rollback()
        log.info(f"import failed: {error}")
# Script entry point: optionally wipe the database, then insert or update
# every record of the JSON file table by table.
if __name__ == '__main__':
    logger = get_logger(args.verbose, "kontor")
    logger.info('kontor.import started')
    engine = create_engine(DATABASE_URL)
    # Create tables that do not exist yet; existing ones are left alone.
    Base.metadata.create_all(bind=engine, checkfirst=True)
    SessionLocal = sessionmaker(bind=engine)
    with SessionLocal() as db:
        if args.cleanup:
            cleanup_database(db, logger, args.dry_run)
        data = load_data(args.file, logger)
        table_list: List = list(data.keys())
        logger.info(f"Liste der Tabellen: {table_list}")
        # Tables are processed in the order in which they appear in the file.
        for tablename in table_list:
            # Keep ``model`` a module-level name: item_import() reads it as
            # a global rather than receiving it as an argument.
            model = registry[tablename]
            existing_items = db.query(model).all()
            # A set gives O(1) membership tests and removals per record.
            existing_ids = set(get_ids(existing_items))
            logger.debug(f"found {len(existing_items)} for table {tablename}")
            for import_item in data[tablename]:
                item_id = import_item['id']
                if item_id in existing_ids:
                    logger.debug(f"update {item_id}")
                    existing_item = db.get(model, item_id)
                    update_item(db, import_item, existing_item, args.dry_run, logger)
                    # Whatever is left afterwards exists in the database but
                    # not in the import file.
                    existing_ids.discard(item_id)
                else:
                    logger.debug(f"import {item_id}")
                    item_import(db, import_item, args.dry_run, logger)
            logger.info(f"remaining items for {tablename}: {len(existing_ids)}")
    logger.info('kontor.import finished')