""" Checks the database kontor """ from logging import Logger from pathlib import Path import sys from typing import Any, Dict, List, Optional from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from urllib.parse import urlparse from simple_term_menu import TerminalMenu from api import Server, get_api_config, get_logger parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument("--verbose", "-v", action="count", default=0) parser.add_argument("--config", "-c", default="kontor-api") parser.add_argument("--dir", "-d", default="/data/media") parser.add_argument("--add-dir", "-a", action="append") parser.add_argument("--dry-run", "-m", action="store_true") parser.add_argument("--server", "-s") args = parser.parse_args() def create_item_id_mapping(log: Logger, data_list: List[dict]) -> Dict[str, dict]: """ create dictionary with id as key and dictionary as value. """ item_id_mapping: Dict[str, dict] = {} for data_item in data_list: log.debug(data_item) item_id_mapping[data_item["id"]] = data_item return item_id_mapping def remove_file(log: Logger, item_data: Dict[str, Any], media_dirs: List[str]): """ Delete file from path in dictionary. """ log.debug(item_data) cloud_link = item_data["cloud_link"] for file_dir in media_dirs: log.info("look in %s", file_dir) file_name = Path(cloud_link).name media_file = Path(file_dir, file_name) if media_file.exists(): log.info("File to remove %s", media_file.absolute()) media_file.unlink(missing_ok=True) break else: log.info("File not found %s", media_file.absolute()) def check_duplicate_links(log: Logger, server: Optional[Server], media_dirs: List[str]): """ Check if there are MediaFile URLs which only differ in hostname. """ if server is None: log.info("no server selected") return data = server.request(log=log, table="media_file") mapping = create_item_id_mapping(log=log, data_list=data) visited_link_path: Dict[str, str] = {} duplicate_link_paths: Dict[str, List[str]] = {} for item in data: link = item["url"] if len(link) == 0: continue file_id = item["id"] parsed_url = urlparse(link) link_path = parsed_url.path if link_path in visited_link_path: log.debug("duplicate url path found: %s", link_path) if link_path in duplicate_link_paths: duplicate_link_paths[link_path].append(file_id) else: duplicate_link_paths[link_path] = [] duplicate_link_paths[link_path].append(visited_link_path[link_path]) duplicate_link_paths[link_path].append(file_id) else: visited_link_path[link_path] = file_id log.info("found %s duplicate links", len(duplicate_link_paths.keys())) for _, value in duplicate_link_paths.items(): choices = [mapping[value[0]]["url"], mapping[value[1]]["url"], "Abbruch"] menu = TerminalMenu( choices, title="Choose an link to delete:", multi_select=False ) menu_choice = menu.show() if isinstance(menu_choice, int): if menu_choice == 2: break index: int = int(menu_choice) server.delete(log=log, table="media_file", item_id=value[index]) remove_file(log, mapping[value[index]], media_dirs) else: print("selection canceled") def check_media_dirs(log: Logger, server: Optional[Server], media_dirs: List[str]): """ Check if contents of directories match MediaFiles. """ if server is None: log.info("no server selected") return local_files = collect_files(media_dirs) data = server.request(log=log, table="media_file") for item in data: file_name = Path(item["cloud_link"]).name if file_name in local_files: log.debug("File %s found", file_name) local_files.remove(file_name) def collect_files(media_dirs: List[str]) -> List[str]: """ Collect file names from given directories. """ collected_files: List[str] = [] for media_dir in media_dirs: file_dir = Path(media_dir) if file_dir.is_dir(): for entry in file_dir.iterdir(): if entry.is_file(): collected_files.append(entry.name) return collected_files if __name__ == "__main__": logger = get_logger(args.verbose, args.config) logger.info("kontor.check_kontor started") APICONFIG = get_api_config(logger, args.config) first_server: Optional[Server] = APICONFIG.get_server(args.server) if not first_server: sys.exit(2) dirs: List[str] = args.add_dir if dirs is None: dirs = [args.dir] else: dirs.insert(0, args.dir) logger.info(dirs) logger.info("kontor.check_kontor.check_duplicate_links") check_duplicate_links(logger, first_server, dirs) logger.info("kontor.check_kontor.check_media_dirs") check_media_dirs(logger, first_server, dirs) # logger.info("kontor.check_kontor.get_ids_from_column_cloud_link") # get_ids_from_column_cloud_link(link_list, mariadb_cursor) # logger.info('found {} ids in column cloud_link'.format(len(link_list))) # logger.info("kontor.check_kontor.checking_ids_from_cloud_link") # checking_ids_from_cloud_link(link_list, mariadb_cursor) logger.info("kontor.check_kontor finished")