47df61af32
(cherry picked from commit f42735326b4dd490351cebb0fc751d62b3a187d0)
152 lines
5.8 KiB
Python
152 lines
5.8 KiB
Python
"""
|
|
Import data from a JSON file into PostgreSQL.
|
|
"""
|
|
|
|
from datetime import datetime, date
|
|
from typing import Any, AnyStr, Dict, List
|
|
import os
|
|
import json
|
|
import os
|
|
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
|
|
from datetime import date, datetime
|
|
from logging import Logger
|
|
from pathlib import Path
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker, Session
|
|
from db.models.base import Base
|
|
from db.models import registry
|
|
from psycopg2.errors import NotNullViolation
|
|
from config import get_logger
|
|
|
|
# Command-line interface.
# ArgumentDefaultsHelpFormatter only appends "(default: ...)" to options that
# carry a help text, so every option gets one; otherwise the formatter is a no-op.
parser = ArgumentParser(
    description="Import data from a JSON file into PostgreSQL.",
    formatter_class=ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
    '--verbose', '-v', action='count', default=0,
    help="verbosity level passed to get_logger (repeat to increase)",
)
parser.add_argument(
    '--dry-run', '-m', action='store_true',
    help="log what would change without deleting, updating or inserting rows",
)
parser.add_argument(
    '--cleanup', '-c', action='store_true',
    help="delete all rows of every registered model before importing",
)
parser.add_argument(
    '--file', '-f', default='~/data.json',
    help="JSON file to import",
)

# NOTE: parsed at import time, so merely importing this module reads sys.argv.
args = parser.parse_args()
|
|
|
|
# Database connection settings; each can be overridden via an environment variable.
DB_USER: str = os.getenv("DB_USER", "kontor")
DB_PASSWORD: str = os.getenv("DB_PASSWORD", "kontor")
DB_SERVER: str = os.getenv("DB_SERVER", "127.0.0.1")
DB_PORT: int = int(os.getenv("DB_PORT", 5432))
DB_DBNAME: str = os.getenv("DB_DBNAME", "kontor")


def build_database_url(user: str, password: str, server: str, port: int, dbname: str) -> str:
    """Build a PostgreSQL connection URL for SQLAlchemy's ``create_engine``.

    User name and password are percent-encoded so that characters such as
    ``@``, ``/`` or ``:`` inside them cannot break the URL structure.

    :param user: database user name
    :param password: database password
    :param server: host name or IP address
    :param port: TCP port
    :param dbname: database name
    :return: ``postgresql://user:password@server:port/dbname``
    """
    from urllib.parse import quote_plus

    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{server}:{port}/{dbname}"


DATABASE_URL: str = build_database_url(DB_USER, DB_PASSWORD, DB_SERVER, DB_PORT, DB_DBNAME)
|
|
|
|
|
|
def cleanup_database(db: Session, log, dry_run: bool):
    """Remove every row of every model listed in ``registry``.

    Tables are visited in ``registry`` iteration order. Each table name is
    logged and its rows are loaded. Outside of a dry run each row is then
    deleted through the session. A single commit is issued after all tables
    have been visited (also in a dry run, where there is nothing pending).

    :param db: open SQLAlchemy session
    :param log: logger used for progress output
    :param dry_run: when true, rows are loaded but not deleted
    """
    log.debug("cleanup_database")

    for table_name in registry:
        log.info(f"{table_name}")
        rows = db.query(registry[table_name]).all()
        if dry_run:
            # Dry run: rows were queried, but nothing is marked for deletion.
            continue
        for row in rows:
            db.delete(row)

    db.commit()
|
|
|
|
def load_data(filename: str, log) -> Dict[str, List[Dict[str, Any]]]:
    """Read the JSON import file and return its decoded top-level object.

    The ``__main__`` block consumes the result as a mapping of table name to a
    list of row dicts (each with an ``'id'`` key). This function does not
    validate that structure itself.

    :param filename: path to the JSON file; a leading ``~`` is expanded to the
        user's home directory (the CLI default is ``~/data.json``)
    :param log: logger used for progress output
    :return: the decoded JSON document
    :raises FileNotFoundError: if the file does not exist
    """
    log.debug("load_data")
    # Path() does not expand "~" on its own; without this the default never resolves.
    import_file = Path(filename).expanduser()
    if not import_file.exists():
        log.error(f"File {filename} does not exist. Do nothing.")
        raise FileNotFoundError(f"import file not found: {import_file}")

    log.info("read json file")
    # JSON exchanged between systems is UTF-8 (RFC 8259); don't depend on the locale.
    with import_file.open("r", encoding="utf-8") as json_file:
        return json.load(json_file)
|
|
|
|
def get_ids(items: List[Any]) -> List[Any]:
    """Return the ``id`` attribute of each item, preserving input order.

    :param items: objects exposing an ``id`` attribute (e.g. ORM rows)
    :return: list of ids; their type is whatever ``item.id`` holds, not
        necessarily a string
    """
    return [item.id for item in items]
|
|
|
|
def update_item(db: Session, import_data: Dict[AnyStr, Any], item: Any, dry_run: bool, log):
    """Copy every changed field from ``import_data`` onto an existing ORM ``item``.

    Each key of ``import_data`` is read from ``item`` via ``getattr`` and
    compared with ``has_changed``. Differences are logged. Outside of a dry run
    they are assigned to ``item``, with the string ``"None"`` written as ``None``.

    The session is only touched (``add`` + ``commit``) when at least one field
    was actually assigned. A commit expires all loaded instances, so
    committing unchanged rows would force needless reloads.

    :param db: open SQLAlchemy session
    :param import_data: mapping of attribute name to imported value
    :param item: the persisted object to update
    :param dry_run: when true, only log what would change
    :param log: logger used for progress output
    :raises AttributeError: if ``import_data`` contains a key ``item`` lacks
    """
    modified = False
    for key, value in import_data.items():
        attribute = str(key)
        existing_value = getattr(item, attribute)
        if not has_changed(existing_value, value, log):
            continue

        log.info(f"update {key}({existing_value}) with {value}")
        if not dry_run:
            # The import file spells SQL NULL as the string "None".
            setattr(item, attribute, None if value == "None" else value)
            modified = True

    if modified:
        db.add(item)
        db.commit()
|
|
|
|
def has_changed(existing_data: Any, import_data: Any, log) -> bool:
    """Return True when an imported value differs from the stored one.

    Rules:
      * stored ``None`` vs. imported string ``'None'`` counts as unchanged;
      * stored strings are compared verbatim;
      * stored ``date``/``datetime`` values vs. imported strings are handled as follows:
          - strings longer than 19 characters are parsed as
            ``%Y-%m-%d %H:%M:%S.%f``;
          - strings of 11-19 characters are parsed as ``%Y-%m-%d %H:%M:%S``;
          - shorter strings are compared against the stored value formatted as
            ``%Y-%m-%d``;
          - a plain ``date`` (not ``datetime``) is compared with the calendar
            day of the parsed value only;
      * everything else (including non-string imports such as JSON ``null``
        or numbers) falls back to ``!=``.

    :param existing_data: value currently stored on the ORM object
    :param import_data: value from the JSON import file
    :param log: logger used for debug output
    :return: True if the stored value should be overwritten
    :raises ValueError: if a long date string does not match the formats above
    """
    if existing_data is None and import_data == 'None':
        return False

    if isinstance(existing_data, str):
        return existing_data != import_data

    # datetime is a subclass of date, so this covers DATE and TIMESTAMP columns.
    # Only strings can be parsed; anything else (None, numbers) uses plain "!=".
    if isinstance(existing_data, date) and isinstance(import_data, str):
        if len(import_data) <= 10:
            return existing_data.strftime("%Y-%m-%d") != import_data

        fmt = "%Y-%m-%d %H:%M:%S.%f" if len(import_data) > 19 else "%Y-%m-%d %H:%M:%S"
        import_date: Any = datetime.strptime(import_data, fmt)
        if not isinstance(existing_data, datetime):
            # A date never equals a datetime, which would flag every DATE column
            # as changed; compare the calendar day instead.
            import_date = import_date.date()
        changed = existing_data != import_date
        log.debug(f"{type(existing_data)}:{existing_data} == {import_date} : {changed}")
        return changed

    return existing_data != import_data
|
|
|
|
|
|
def item_import(db: Session, import_data: Dict[AnyStr, Any], dry_run: bool, log, model_class: Any = None):
    """Create a new row from ``import_data`` and commit it.

    Failures are logged and the session is rolled back, so one bad row does
    not poison the session for the rest of the import. A failed commit leaves
    the session unusable until ``rollback()``.

    :param db: open SQLAlchemy session
    :param import_data: mapping of attribute name to imported value, applied
        via the model's ``import_dict`` method
    :param dry_run: when true, only log the row that would be imported
    :param log: logger used for progress output
    :param model_class: ORM class to instantiate. Defaults to the module-level
        ``model`` name that the ``__main__`` loop binds for the current table.
    """
    log.info(f"import {import_data}")
    if dry_run:
        return

    if model_class is None:
        # Backward compatibility: fall back to the global bound by the main loop.
        model_class = model

    log.debug(f"model: {repr(model_class)} {import_data}")
    try:
        new_item = model_class()
        new_item.import_dict(import_data)
        log.info(f"new item: {new_item}")
        db.add(new_item)
        db.commit()
    except Exception as error:  # best effort per row: log and continue with the next one
        db.rollback()
        # SQLAlchemy wraps DB-API errors (e.g. IntegrityError); the original
        # psycopg2 exception is available as ``.orig``.
        cause = getattr(error, "orig", error)
        if isinstance(cause, NotNullViolation):
            log.warning(f"import failed: {cause} {import_data}")
        else:
            log.warning(f"import failed: {error}")
|
|
|
|
|
|
if __name__ == "__main__":
    logger = get_logger(args.verbose, "kontor")
    logger.info('kontor.import started')

    engine = create_engine(DATABASE_URL)
    # checkfirst=True: only create tables that do not exist yet.
    Base.metadata.create_all(bind=engine, checkfirst=True)
    SessionLocal = sessionmaker(bind=engine)

    with SessionLocal() as db:
        if args.cleanup:
            cleanup_database(db, logger, args.dry_run)

        data = load_data(args.file, logger)
        table_list: List = list(data.keys())
        logger.info(f"Liste der Tabellen: {table_list}")

        # Tables are processed in the order they appear in the import file.
        # NOTE(review): foreign-key dependencies are not resolved here; this
        # presumably relies on the export listing parent tables first — confirm.
        for tablename in table_list:
            if tablename not in registry:
                logger.warning(f"unknown table {tablename} in import file, skipped")
                continue

            # Keep this a module-level name: item_import() falls back to the global `model`.
            model = registry[tablename]
            existing_items = db.query(model).all()
            # Index by id: O(1) lookups instead of list scans, and the loaded
            # object is reused instead of a separate db.get() per row.
            existing_by_id = {item.id: item for item in existing_items}
            logger.debug(f"found {len(existing_items)} for table {tablename}")

            for import_item in data[tablename]:
                item_id = import_item['id']
                existing_item = existing_by_id.pop(item_id, None)
                if existing_item is not None:
                    logger.debug(f"update {item_id}")
                    update_item(db, import_item, existing_item, args.dry_run, logger)
                else:
                    logger.debug(f"import {item_id}")
                    item_import(db, import_item, args.dry_run, logger)

            # Rows present in the database but absent from the import file.
            logger.info(f"remaining items for {tablename}: {len(existing_by_id)}")

    logger.info('kontor.import finished')
|
|
|