153 lines
5.5 KiB
Python
153 lines
5.5 KiB
Python
"""
|
|
Check the kontor database for duplicate media-file links and compare the media directories with the stored media files.
|
|
"""
|
|
|
|
from logging import Logger
|
|
from pathlib import Path
|
|
import sys
|
|
from typing import Any, Dict, List, Optional
|
|
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
|
|
from urllib.parse import urlparse
|
|
from simple_term_menu import TerminalMenu
|
|
|
|
from api import Server, get_api_config, get_logger
|
|
|
|
|
|
# Command line interface of the script.
# Every option carries a help text: ArgumentDefaultsHelpFormatter only appends
# "(default: ...)" to options that have one, so without help texts the
# formatter would have no visible effect.
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument(
    "--verbose",
    "-v",
    action="count",
    default=0,
    help="verbosity level handed to the logger setup; may be repeated",
)
parser.add_argument(
    "--config",
    "-c",
    default="kontor-api",
    help="configuration name handed to the logger and API setup",
)
parser.add_argument(
    "--dir",
    "-d",
    default="/data/media",
    help="media directory that is searched first",
)
parser.add_argument(
    "--add-dir",
    "-a",
    action="append",
    help="additional media directory; may be given multiple times",
)
# NOTE(review): --dry-run is parsed but not read anywhere in the visible
# code - confirm whether the checks are supposed to honour it.
parser.add_argument(
    "--dry-run",
    "-m",
    action="store_true",
    help="dry run flag",
)
parser.add_argument(
    "--server",
    "-s",
    help="server to select from the API configuration",
)
args = parser.parse_args()
|
|
|
|
|
|
def create_item_id_mapping(log: Logger, data_list: List[dict]) -> Dict[str, dict]:
    """
    Index the given records by their ``"id"`` entry.

    Each record is written to the debug log before it is stored. When
    several records carry the same id, the one appearing last in
    ``data_list`` is the one kept in the result.
    """
    records_by_id: Dict[str, dict] = {}
    for record in data_list:
        log.debug(record)
        records_by_id.update({record["id"]: record})
    return records_by_id
|
|
|
|
|
|
def remove_file(log: Logger, item_data: Dict[str, Any], media_dirs: List[str]):
    """
    Delete the local copy of the file referenced by a record's ``cloud_link``.

    Only the final path component of ``item_data["cloud_link"]`` is used.
    It is looked up in each directory of ``media_dirs`` in order and the
    first regular file found is deleted; the remaining directories are not
    searched. Nothing is deleted when the link is empty or missing, or
    when no directory contains the file.

    :param log: logger receiving the progress messages
    :param item_data: record with a ``cloud_link`` entry
    :param media_dirs: directories to search, in order
    """
    log.debug(item_data)
    file_name = Path(item_data.get("cloud_link") or "").name
    if not file_name:
        # An empty name would make Path(directory, "") refer to the directory
        # itself, which must never be handed to unlink().
        log.info("no file name in cloud_link, nothing to remove")
        return
    for file_dir in media_dirs:
        log.info("look in %s", file_dir)
        media_file = Path(file_dir, file_name)
        # is_file() instead of exists(): directories are never a valid target.
        if media_file.is_file():
            log.info("File to remove %s", media_file.absolute())
            media_file.unlink(missing_ok=True)
            return
    log.info("File not found %s", file_name)
|
|
|
|
|
|
def _collect_duplicate_link_ids(log: Logger, data: List[dict]) -> Dict[str, List[str]]:
    """
    Group the ids of records whose URLs share the same path component.

    Only paths that occur more than once are returned. Each list starts
    with the id of the first record seen for that path, followed by the
    ids of all later records in input order. Records whose URL is empty
    or ``None`` are skipped.
    """
    first_id_by_path: Dict[str, str] = {}
    duplicates: Dict[str, List[str]] = {}
    for item in data:
        link = item["url"]
        if not link:
            # Covers "" as well as None; there is nothing to compare.
            continue
        link_path = urlparse(link).path
        if link_path not in first_id_by_path:
            first_id_by_path[link_path] = item["id"]
            continue
        log.debug("duplicate url path found: %s", link_path)
        duplicates.setdefault(link_path, [first_id_by_path[link_path]]).append(
            item["id"]
        )
    return duplicates


def check_duplicate_links(log: Logger, server: Optional[Server], media_dirs: List[str]):
    """
    Check if there are MediaFile URLs which only differ in hostname.

    All ``media_file`` records are requested from the server and grouped by
    the path component of their URL. For every path used by more than one
    record a terminal menu lists the URLs of all records in the group plus
    an abort entry. The selected record is deleted on the server and its
    local file is removed from ``media_dirs``. At most one record per
    group is deleted per run.

    Choosing the abort entry stops the whole check; any non-integer result
    from the menu is treated as a cancelled selection and skips only the
    current group.

    :param log: logger receiving the progress messages
    :param server: server to query; when ``None`` nothing is checked
    :param media_dirs: directories searched for the local file to remove
    """
    if server is None:
        log.info("no server selected")
        return
    data = server.request(log=log, table="media_file")
    mapping = create_item_id_mapping(log=log, data_list=data)
    duplicate_link_paths = _collect_duplicate_link_ids(log, data)
    log.info("found %s duplicate links", len(duplicate_link_paths))
    for file_ids in duplicate_link_paths.values():
        # Offer every record of the group, not only the first two.
        choices = [mapping[file_id]["url"] for file_id in file_ids]
        # The abort entry ("Abbruch" is German for "abort") is always last.
        abort_index = len(choices)
        choices.append("Abbruch")
        menu = TerminalMenu(
            choices, title="Choose an link to delete:", multi_select=False
        )
        menu_choice = menu.show()
        if not isinstance(menu_choice, int):
            print("selection canceled")
            continue
        if menu_choice == abort_index:
            break
        selected_id = file_ids[menu_choice]
        server.delete(log=log, table="media_file", item_id=selected_id)
        # NOTE(review): if the records of a group share the same cloud_link
        # file name, this also removes the file of the record that is kept -
        # confirm against the data model.
        remove_file(log, mapping[selected_id], media_dirs)
|
|
|
|
|
|
def check_media_dirs(log: Logger, server: Optional[Server], media_dirs: List[str]):
    """
    Check if contents of directories match MediaFiles.

    The file names found in ``media_dirs`` are compared with the final
    path component of the ``cloud_link`` of every ``media_file`` record
    requested from the server. Records whose file is not present locally
    and local files that no record refers to are reported as warnings.
    Records with an empty or ``None`` ``cloud_link`` are skipped.

    :param log: logger receiving the findings
    :param server: server to query; when ``None`` nothing is checked
    :param media_dirs: directories whose files are compared
    """
    if server is None:
        log.info("no server selected")
        return
    local_files = collect_files(media_dirs)
    data = server.request(log=log, table="media_file")
    for item in data:
        cloud_link = item["cloud_link"]
        if not cloud_link:
            # Path(None) raises and Path("").name is empty; neither names a file.
            continue
        file_name = Path(cloud_link).name
        if file_name in local_files:
            log.debug("File %s found", file_name)
            # Drop a single occurrence so that equally named files in
            # several directories are matched one record at a time.
            local_files.remove(file_name)
        else:
            log.warning(
                "File %s of media_file %s not found in %s",
                file_name,
                item.get("id"),
                media_dirs,
            )
    # Whatever is left was not claimed by any record.
    for file_name in local_files:
        log.warning("File %s is not referenced by any media_file", file_name)
|
|
|
|
|
|
def collect_files(media_dirs: List[str]) -> List[str]:
    """
    Gather the names of all regular files directly inside the given directories.

    Entries of ``media_dirs`` that are not existing directories are
    skipped. Sub-directories are not descended into, and only the bare
    file names (without the directory part) are returned, so the same
    name can appear more than once.
    """
    names: List[str] = []
    for directory in map(Path, media_dirs):
        if not directory.is_dir():
            continue
        names.extend(child.name for child in directory.iterdir() if child.is_file())
    return names
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: select the server, assemble the media directories
    # and run both checks one after the other.
    logger = get_logger(args.verbose, args.config)
    logger.info("kontor.check_kontor started")
    APICONFIG = get_api_config(logger, args.config)
    first_server: Optional[Server] = APICONFIG.get_server(args.server)
    if not first_server:
        # Say why the script stops instead of exiting silently.
        logger.error("no server found for %r, aborting", args.server)
        sys.exit(2)
    # --dir always comes first, followed by every --add-dir in the order
    # given; built as a new list so the parsed arguments stay untouched.
    dirs: List[str] = [args.dir, *(args.add_dir or [])]
    logger.info(dirs)
    logger.info("kontor.check_kontor.check_duplicate_links")
    check_duplicate_links(logger, first_server, dirs)
    logger.info("kontor.check_kontor.check_media_dirs")
    check_media_dirs(logger, first_server, dirs)
    logger.info("kontor.check_kontor finished")
|