add import functionality

This commit is contained in:
Thomas Peetz
2025-01-15 14:16:35 +01:00
parent 3466c10a88
commit f3cf1a17f3
2 changed files with 62 additions and 6 deletions
+2 -4
View File
@@ -49,7 +49,5 @@ class Database(Controller):
if self.app.pargs.db_file is not None:
data['db_file'] = self.app.pargs.db_file
kontor_db = KontorDB(self.app.session, self.app.log)
if self.app.pargs.dry_run:
self.app.render(data, 'import.jinja2')
else:
kontor_db.import_db(data['db_file'])
self.app.render(data, 'import.jinja2')
kontor_db.import_db(data['db_file'], self.app.pargs.dry_run)
+60 -2
View File
@@ -154,5 +154,63 @@ class KontorDB:
if export_file.exists():
self.log.debug(f"{export_file} exists")
def import_db(self, import_file_name: str):
pass
def import_db(self, import_file_name: str, dry_run: bool):
import_file = Path(import_file_name)
if not import_file.exists():
print(f"File {import_file_name} does not exist. Do nothing.")
return
self.log.debug(f"evaluate type from file extension: {import_file.suffix}")
match import_file.suffix:
case '.json':
print("read json file")
with open(import_file_name, 'r') as json_file:
json_load = json.load(json_file)
for table in json_load:
print(f"{table}: {len(json_load[table])}")
self.import_table(table, json_load[table], dry_run)
case '.yml':
print("read yaml file")
case '.yaml':
print("read yaml file")
case '.db':
print("read sqlite file")
def import_table(self, table_name, items, dry_run: bool):
existing_ids = self.get_ids(table_name)
for item in items:
self.log.debug(f"{item}")
current_id = item['id']
found_item = self.session.query(self.registry[table_name]).get(current_id)
self.log.debug(f"found: {found_item}")
if found_item is not None:
changed = self.update_entry(found_item, item, dry_run)
if changed:
print(f"{current_id} has changed")
existing_ids.remove(current_id)
if len(existing_ids) > 0:
print("remaining items")
self.session.commit()
def get_ids(self, table_name: str) -> list:
existing_ids = []
items = self.session.query(self.registry[table_name]).all()
for item in items:
existing_ids.append(getattr(item, 'id'))
return existing_ids
def update_entry(self, existing_item, update_item: dict, dry_run: bool) -> bool:
changed = False
for key in update_item.keys():
update_value = update_item[key]
existing_value = getattr(existing_item, key)
if type(existing_value) is not type(update_value):
self.log.debug(f"compare {type(existing_value)} with {type(update_value)}")
existing_value = str(existing_value)
if existing_value != update_value:
print(f"{key} has changed: {existing_value} != {update_value}")
if not dry_run:
setattr(existing_item, key, update_value)
# existing_item[key] = update_value
changed = True
self.log.info(f"update {key} with {update_value}")
return changed