remove kontor-cli.backup

This commit is contained in:
Thomas Peetz
2025-01-21 09:42:33 +01:00
parent 65288a53a1
commit 4c330a1138
40 changed files with 196 additions and 889 deletions
+67 -161
View File
@@ -5,7 +5,6 @@ import uuid
from datetime import datetime
from pathlib import Path
import requests
from sqlalchemy import Engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
@@ -20,9 +19,8 @@ from .media import MediaFile, MediaArticle, MediaVideo
class KontorDB:
def __init__(self, db_engine: Engine, log):
def __init__(self, db_engine: Engine):
self.engine = db_engine
self.log = log
self.registry = {}
self.init_registry()
@@ -107,7 +105,7 @@ class KontorDB:
columns[column.column_name] = {"order": column.column_order, "type": column.column_type}
return columns
def get_filters(self, table_name):
def get_filters(self, table_name: str) -> dict:
_filter_map = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
@@ -116,7 +114,6 @@ class KontorDB:
filter(MetaDataTable.table_name == table_name).
filter(MetaDataColumn.show_filter == 1).all()):
_filter_map[column.column_name] = {'label': column.filter_label, 'widget': None}
self.log.debug(f"retrieved {len(_filter_map)} filters: {_filter_map}")
return _filter_map
def data(self, table_name: str, columns: dict, filters: dict) -> list:
@@ -135,19 +132,16 @@ class KontorDB:
column_name = columns[order]['column']
if str(column_name).endswith("_id"):
ref_table = column_name[:-3]
# print(f"{ref_table=}")
ref = getattr(entry, ref_table)
value = getattr(ref, "name")
# print(f"{value=}")
row.append(value)
else:
row.append(getattr(entry, column_name))
# print(repr(row))
data.append(row)
return data
def export_db(self, export_type: str, export_file_name: str):
self.log.info(f"export DB to {export_file_name} as {export_type}")
def export_db(self, export_type: str, export_file_name: str) -> dict:
results = {}
db = {}
export_table_list = self.get_table_names()
for table in export_table_list:
@@ -161,8 +155,6 @@ class KontorDB:
with __session__() as session:
rows = session.query(model).all()
entries = []
self.log.debug(f"found {len(rows)} entries")
self.log.debug(f"found {len(columns)} columns")
for row in rows:
# print(row)
entry = {}
@@ -176,11 +168,11 @@ class KontorDB:
entry[column_name] = str(value)
else:
entry[column_name] = value
except AttributeError as error:
self.log.debug("could not get value")
except AttributeError:
pass
entries.append(entry)
db[table] = entries
export_file = Path(export_file_name)
results[table] = len(entries)
match export_type:
case "JSON":
json_dump = json.dumps(db, indent=4)
@@ -190,17 +182,14 @@ class KontorDB:
export_file = Path(export_file_name)
case "SQLite":
export_file = Path(export_file_name)
case _:
self.log.debug("unknown export type")
if export_file.exists():
self.log.debug(f"{export_file} exists")
return results
def import_db(self, import_file_name: str, dry_run: bool):
def import_db(self, import_file_name: str, dry_run: bool) -> dict:
result = {}
import_file = Path(import_file_name)
if not import_file.exists():
print(f"File {import_file_name} does not exist. Do nothing.")
return
self.log.debug(f"evaluate type from file extension: {import_file.suffix}")
return result
match import_file.suffix:
case '.json':
print("read json file")
@@ -208,34 +197,44 @@ class KontorDB:
json_load = json.load(json_file)
for table in json_load:
print(f"{table}: {len(json_load[table])}")
self.import_table(table, json_load[table], dry_run)
result[table] = self.import_table(table, json_load[table], dry_run)
case '.yml':
print("read yaml file")
case '.yaml':
print("read yaml file")
case '.db':
print("read sqlite file")
return result
def import_table(self, table_name, items, dry_run: bool):
def import_table(self, table_name, items, dry_run: bool) -> dict:
result = {}
updated = []
added = []
remaining = []
existing_ids = self.get_ids(table_name)
for item in items:
# self.log.debug(f"{item}")
current_id = item['id']
found_item = None
__session__ = sessionmaker(self.engine)
with __session__() as session:
found_item = session.query(self.registry[table_name]).get(current_id)
self.log.debug(f"found: {found_item}")
if found_item is not None:
changed = self.update_entry(found_item, item, dry_run)
if changed:
print(f"{current_id} has changed")
existing_ids.remove(current_id)
else:
self.log.info("item to import not found in database, add new one...")
self.add_entry(table_name, item, session, dry_run)
if found_item is not None:
changed = self.update_entry(table_name, current_id, item)
updated.append(item)
if changed:
print(f"{current_id} has changed")
updated.append(item)
existing_ids.remove(current_id)
else:
self.add_entry(table_name, item)
added.append(item)
if len(existing_ids) > 0:
print("remaining items")
remaining.extend(existing_ids)
result['updated'] = updated
result['added'] = added
result['remaining'] = remaining
return result
def get_ids(self, table_name: str) -> list:
existing_ids = []
@@ -246,36 +245,36 @@ class KontorDB:
existing_ids.append(getattr(item, 'id'))
return existing_ids
def add_entry(self, table_name: str, update_item: dict, session, dry_run: bool):
add_item = self.registry[table_name]()
for key in update_item.keys():
update_value = update_item[key]
setattr(add_item, key, update_value)
if dry_run:
self.log.info(f"add item {type(add_item)} with id {update_item['id']}")
else:
session.add(add_item)
session.commit()
def add_entry(self, table_name: str, update_item: dict):
__session__ = sessionmaker(self.engine)
with __session__() as session:
add_item = self.registry[table_name]()
for key in update_item.keys():
update_value = update_item[key]
setattr(add_item, key, update_value)
session.add(add_item)
session.commit()
def update_entry(self, existing_item, update_item: dict, dry_run: bool) -> bool:
changed = False
for key in update_item.keys():
update_value = update_item[key]
existing_value = getattr(existing_item, key)
if type(existing_value) is not type(update_value):
# self.log.debug(f"compare {type(existing_value)} with {type(update_value)}")
existing_value = str(existing_value)
if existing_value != update_value:
print(f"{key} has changed: {existing_value} != {update_value}")
if not dry_run:
def update_entry(self, table_name, current_id, update_item: dict) -> bool:
__session__ = sessionmaker(self.engine)
with __session__() as session:
existing_item = session.query(self.registry[table_name]).get(current_id)
changed = False
for key in update_item.keys():
update_value = update_item[key]
existing_value = getattr(existing_item, key)
if type(existing_value) is not type(update_value):
existing_value = str(existing_value)
if existing_value != update_value:
print(f"{key} has changed: {existing_value} != {update_value}")
setattr(existing_item, key, update_value)
# existing_item[key] = update_value
session.commit()
changed = True
self.log.info(f"update {key} with {update_value}")
print(f"update {key} with {update_value}")
return changed
def add_link(self, link: str, dry_run: bool):
self.log.info(f"add link {link} to media_file")
def add_link(self, link: str) -> dict:
result = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
media_file = MediaFile()
@@ -289,125 +288,32 @@ class KontorDB:
try:
session.add(media_file)
session.commit()
self.log.info(f"entry {media_file} successfully added")
result['added'] = media_file
except IntegrityError as error:
session.rollback()
self.log.info(error.orig)
result['error'] = error.orig
return result
def update_title(self, dry_run=False):
self.log.info(f"get links to review of media_file")
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.review == 1).all()
self.log.info(f"try to update {len(links)} items")
for link in links:
url = link.url
if url is None:
self.log.info(f"url has not been set for {link.id}")
continue
self.log.info('get title for url {}'.format(url))
if dry_run:
continue
try:
r = requests.get(url)
soup = BeautifulSoup(r.content, "html.parser")
title = soup.title.string
except:
self.log.info("Sorry, could not retrieve title")
continue
self.log.info('ID {} has title {}'.format(link.id, title))
link.title = title
link.review = 0
session.commit()
def get_update_list(self) -> list[str]:
self.log.debug("get links marked as review")
update_list = []
def get_update_list(self) -> dict:
update_list = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.review == 1).all()
for link in links:
url = link.url
if url is None:
self.log.info(f"url has not been set for {link.id}")
continue
update_list.append(url)
self.log.debug(f"found {len(update_list)} urls for updates")
update_list[link.id] = url
return update_list
def get_download_list(self) -> list[str]:
self.log.debug("get links marked as should_download")
download_list = []
def get_download_list(self) -> dict:
download_list = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.should_download == 1).all()
for link in links:
url = link.url
if url is None:
self.log.info(f"url has not been set for {link.id}")
continue
download_list.append(url)
self.log.debug(f"found {len(download_list)} urls for downloads")
download_list[link.id] = url
return download_list
def download_file(self, dry_run=False):
self.log.info(f"download marked urls of media_file")
__session__ = sessionmaker(self.engine)
with __session__() as session:
links = session.query(MediaFile).filter(MediaFile.should_download == 1).all()
self.log.info(f"try to download {len(links)} items")
for link in links:
url = link.url
if url is None:
self.log.info(f"url has not been set for {link.id}")
continue
if dry_run:
self.log.info(f"download {link.url} to {self.config.get('media', 'dir')}")
continue
filename = self.download_url(link)
if filename is None:
link.file_name = filename
link.should_download = 1
else:
download_file = Path(filename)
download_file.with_name(f"{link.id}{download_file.suffix}")
link.file_name = download_file.name
link.should_download = 0
link.cloud_link = download_file.absolute()
session.commit()
def parse_output(self, lines_list):
file_name = ""
for line in lines_list:
if 'has already been downloaded' in line:
end_len = len(' has already been downloaded')
file_name = line[11:-end_len]
self.log.info('found file: "%s"', file_name)
if 'Destination' in line:
line_len = len(line)
start_len = len('[download] Destination: ')
file_len = line_len - start_len
file_name = line[-file_len:]
self.log.info('new file: "%s"', file_name)
return file_name
def download_url(self, video_url):
media_dir = Path(self.config.get('media', 'dir'))
if not media_dir.exists():
media_dir = Path().absolute()
self.log.info(f"download video to {media_dir}")
result = subprocess.run([self.config.get('media', 'yt-dlp'), video_url], cwd=media_dir, capture_output=True,
text=True)
if result.returncode == 0:
output = result.stdout
output = re.sub(' +', ' ', output)
lines_list = output.splitlines()
return self.parse_output(lines_list)
else:
return None
def check_files(self):
media_dir = Path(self.config.get('media', 'dir'))
if not media_dir.exists():
return
self.log.info(f"check files in {media_dir}")