Files
kontor/kontor-scripts/check_kontor.py
2025-04-25 01:19:59 +02:00

184 lines
7.3 KiB
Python

"""
Checks the database kontor
"""
from enum import Enum, auto
import json
import mariadb
import requests
from pathlib import Path
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from config import get_logger, get_database_cursors
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--config', '-c', default='kontor')
parser.add_argument('--file', '-f')
parser.add_argument('--dir', '-d')
parser.add_argument('--dry-run', '-m', action='store_true')
parser.add_argument('--reset-cloud-link', '-r', action='store_true')
args = parser.parse_args()
class StatusType(Enum):
UNKNOWN = auto()
FILE_NAME = auto()
FILE_ID = auto()
DUPLICATE = auto()
CLOUD_LINK = auto()
CLOUD_LINK_ID = auto()
class FileStatus:
id: str | None = None
status_type: StatusType = StatusType.UNKNOWN
def get_response(self, response: dict):
self.status_type = StatusType.FILE_NAME
self.id = response['id']
def get_status_of_file(found_file: Path, cursor, log) -> FileStatus:
status = FileStatus()
try:
cursor.execute(f'SELECT id, cloud_link FROM media_file WHERE file_name="{found_file.name}"')
rows = cursor.fetchall()
if len(rows) == 1:
status.status_type = StatusType.FILE_NAME
status.id = rows[0][0]
except mariadb.Error as error:
log.debug(f'select failed with {error}')
try:
cursor.execute(f'SELECT id FROM media_file WHERE id="{found_file.stem}"')
rows = cursor.fetchall()
if len(rows) == 1:
status.status_type = StatusType.FILE_ID
status.id = rows[0][0]
if len(rows) > 1:
status.status_type = StatusType.DUPLICATE
for row in rows:
log.info(f"found {row[0]} with {found_file}")
except mariadb.Error as error:
log.debug(f'select failed with {error}')
try:
cursor.execute(f'SELECT id FROM media_file WHERE cloud_link LIKE "%{found_file.stem}%"')
rows = cursor.fetchall()
if len(rows) == 1:
status.id = rows[0][0]
if rows[0][0] == found_file.stem:
status.status_type = StatusType.CLOUD_LINK_ID
else:
status.status_type = StatusType.CLOUD_LINK
except mariadb.Error as error:
log.debug(f'select failed with {error}')
response = requests.get(f"http://127.0.0.1:8800/media/files/{found_file.stem}")
log.debug(f"Status: {response.status_code}")
if response.status_code == 200:
status.status_type = StatusType.FILE_ID
status.id = response.json()['id']
return status
def rename_files_to_id(media_dir, dry_run, conn, log):
media_path = Path(media_dir)
cursor = conn.cursor()
for file in media_path.iterdir():
log.debug('found file: {}'.format(file.name))
status: FileStatus = get_status_of_file(file, cursor, log)
file_id = status.id
if not file_id:
log.info(f"ID of file {file.name} is unknown")
continue
new_file_path = file.with_name(f"{file_id}{file.suffix}")
match status.status_type:
case StatusType.FILE_NAME:
log.info(f'status of {file.name} is file_name')
rename_file(file, new_file_path, dry_run, log)
update_cloud_link(file_id, new_file_path, conn, dry_run, log)
case StatusType.FILE_ID:
log.info(f'status of {file.name} is file_id')
update_cloud_link(file_id, new_file_path, conn, dry_run, log)
case StatusType.CLOUD_LINK:
log.info(f'status of {file.name} is cloud_link')
rename_file(file, new_file_path, dry_run, log)
update_cloud_link(file_id, new_file_path, conn, dry_run, log)
case StatusType.CLOUD_LINK_ID:
log.debug(f'status of {file.name} is cloud_link_id')
update_cloud_link(file_id, new_file_path, conn, dry_run, log)
case StatusType.DUPLICATE:
log.info(f'status of {file.name} is duplicate')
case StatusType.UNKNOWN:
log.info(f'status of {file.name} is unknown')
def rename_file(current_file, new_file_path, dry_run, log):
if dry_run:
log.info('rename file {} to {}'.format(current_file.name, new_file_path.name))
else:
current_file.rename(Path(new_file_path))
def update_cloud_link(file_id, file_path, conn, dry_run, log):
cursor = conn.cursor()
log.debug(f'update entry {file_id} with {file_path.absolute()}')
if dry_run:
log.debug(f'UPDATE media_file: cloud_link={file_path.absolute()}')
else:
cursor.execute('UPDATE media_file SET cloud_link="{}" WHERE id="{}"'.format(file_path.absolute(), file_id))
conn.commit()
def reset_cloud_link(conn, dry_run, log):
cursor = conn.cursor()
if dry_run:
log.info('UPDATE media_file SET cloud_link=""')
else:
cursor.execute('UPDATE media_file SET cloud_link="" WHERE id is NOT NULL')
conn.commit()
def check_file_with_db(data_file: Path, m_conn, log):
log.info(f"read json file: {data_file}")
cursor = m_conn.cursor()
with open(data_file, 'r') as json_file:
json_load = json.load(json_file)
for table in json_load:
log.info(f"{table}: {len(json_load[table])}")
items = json_load[table]
for item in items:
item_id = item['id']
select_statement = f"SELECT * FROM {table} WHERE id='{item_id}'"
cursor.execute(select_statement)
rows = cursor.fetchall()
count = len(rows)
log.info(f"{count} entries found for {item_id}")
if count == 0:
log.info(f"entry for {item_id} not found")
if count == 1:
log.info(f"check entry {item_id}")
#log.info(f"entry {rows[0]}")
columns = []
values = []
for (key, value) in item.items():
columns.append(key)
values.append(value)
for index, _ in enumerate(columns):
log.info(f"compare {values[index]} with {rows[0][index]}")
if __name__ == '__main__':
log = get_logger(args.verbose, args.config)
log.info("kontor.check_kontor started")
_, m_conn = get_database_cursors(log, args.config)
if args.dir:
log.info("kontor.check_kontor.rename_files_to_id")
rename_files_to_id(args.dir, args.dry_run, m_conn, log)
if args.file:
data_file = Path(args.file)
if data_file.exists():
log.info("kontor.check_kontor.check_file_with_db")
check_file_with_db(data_file, m_conn, log)
#logger.info("kontor.check_kontor.update_cloud_link_with_found_files")
#update_cloud_link_with_found_files(data_dir, mariadb_conn, args.dry_run)
#logger.info("kontor.check_kontor.get_ids_from_column_cloud_link")
#get_ids_from_column_cloud_link(link_list, mariadb_cursor)
#logger.info('found {} ids in column cloud_link'.format(len(link_list)))
#logger.info("kontor.check_kontor.checking_ids_from_cloud_link")
#checking_ids_from_cloud_link(link_list, mariadb_cursor)
log.info("kontor.check_kontor finished")