Files
kontor/kontor-scripts/check_kontor.py
T

123 lines
4.6 KiB
Python

"""
Checks the database kontor
"""
from logging import Logger
from pathlib import Path
from typing import Any, Dict, List, Optional
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from urllib.parse import urlparse
import click
from simple_term_menu import TerminalMenu
from api import Server, get_api_config, get_logger
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("--verbose", "-v", action="count", default=0)
parser.add_argument("--config", "-c", default="kontor-api")
parser.add_argument("--dir", "-d", default="/data/media")
parser.add_argument("--add-dir", "-a", action="append")
parser.add_argument("--dry-run", "-m", action="store_true")
parser.add_argument("--server", "-s")
args = parser.parse_args()
def create_item_id_mapping(log: Logger, data_list: List[dict]) -> Dict[str, dict]:
"""
create dictionary with id as key and dictionary as value.
"""
item_id_mapping: Dict[str, dict] = {}
for data_item in data_list:
log.debug(data_item)
item_id_mapping[data_item["id"]] = data_item
return item_id_mapping
def remove_file(log: Logger, item_data: Dict[str, Any], media_dirs: List[str]):
"""
Delete file from path in dictionary.
"""
log.debug(item_data)
cloud_link = item_data["cloud_link"]
for file_dir in media_dirs:
log.info("look in %s", file_dir)
file_name = Path(cloud_link).name
media_file = Path(file_dir, file_name)
if media_file.exists():
log.info("File to remove %s", media_file.absolute())
media_file.unlink(missing_ok=True)
break
else:
log.info("File not found %s", media_file.absolute())
def check_duplicate_links(log: Logger, server: Optional[Server], media_dirs: List[str]):
"""
Check if there are MediaFile URLs which only differ in hostname.
"""
if server is None:
log.info("no server selected")
return
data = server.request(log=log, table="media_file")
mapping = create_item_id_mapping(log=log, data_list=data)
visited_link_path: Dict[str, str] = {}
duplicate_link_paths: Dict[str, List[str]] = {}
for item in data:
link = item["url"]
if len(link) == 0:
continue
file_id = item["id"]
parsed_url = urlparse(link)
link_path = parsed_url.path
if link_path in visited_link_path:
log.debug("duplicate url path found: %s", link_path)
if link_path in duplicate_link_paths:
duplicate_link_paths[link_path].append(file_id)
else:
duplicate_link_paths[link_path] = []
duplicate_link_paths[link_path].append(visited_link_path[link_path])
duplicate_link_paths[link_path].append(file_id)
else:
visited_link_path[link_path] = file_id
log.info("found %s duplicate links", len(duplicate_link_paths.keys()))
for _, value in duplicate_link_paths.items():
choices = [mapping[value[0]]["url"], mapping[value[1]]["url"], "Abbruch"]
menu = TerminalMenu(
choices, title="Choose an link to delete:", multi_select=False
)
menu_choice = menu.show()
if isinstance(menu_choice, int):
if menu_choice == 2:
break
index: int = int(menu_choice)
server.delete(log=log, table="media_file", item_id=value[index])
remove_file(log, mapping[value[index]], media_dirs)
else:
print("selection canceled")
if __name__ == "__main__":
logger = get_logger(args.verbose, args.config)
logger.info("kontor.check_kontor started")
APICONFIG = get_api_config(logger, args.config)
first_server: Optional[Server] = APICONFIG.get_server(args.server)
if not first_server:
SystemExit(2)
dirs: List[str] = args.add_dir
if dirs is None:
dirs = [args.dir]
else:
dirs.insert(0, args.dir)
logger.info(dirs)
logger.info("kontor.check_kontor.check_duplicate_links")
check_duplicate_links(logger, first_server, dirs)
# logger.info("kontor.check_kontor.update_cloud_link_with_found_files")
# update_cloud_link_with_found_files(data_dir, mariadb_conn, args.dry_run)
# logger.info("kontor.check_kontor.get_ids_from_column_cloud_link")
# get_ids_from_column_cloud_link(link_list, mariadb_cursor)
# logger.info('found {} ids in column cloud_link'.format(len(link_list)))
# logger.info("kontor.check_kontor.checking_ids_from_cloud_link")
# checking_ids_from_cloud_link(link_list, mariadb_cursor)
logger.info("kontor.check_kontor finished")