"""Download files whose URLs are stored in the DB."""
import os
import re
import subprocess
import sys
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from typing import Any, Dict
from uuid import UUID

import requests
import yaml
from platformdirs import PlatformDirs

from config import get_api_config, get_logger

parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("--verbose", "-v", action="count", default=0)
parser.add_argument("--config", "-c", default="kontor-docker")
parser.add_argument("--dir", "-d", default="/data/media")
parser.add_argument("--limit", "-l", type=int, help="maximum number of links to check")
parser.add_argument("--tool", "-t", default="yt-dlp")
parser.add_argument("--dry-run", "-m", action="store_true")
args = parser.parse_args()


class FileStatus(Enum):
    DOWNLOADED = auto()
    RENAMED = auto()
    UNKNOWN = auto()


def download_file(
    url: str,
    file_info: dict,
    download_dir: str = "/data/media",
    dl_tool: str = "yt-dlp",
) -> dict:
    """Run the download tool for `url` and record the outcome in `file_info`."""
    log.info(f"download file for {url} to {download_dir}")
    result = subprocess.run(
        [dl_tool, url], cwd=download_dir, capture_output=True, text=True
    )
    if result.returncode == 0:
        # Collapse runs of spaces so the fixed-width prefixes below line up.
        output = re.sub(" +", " ", result.stdout)
        lines_list = output.splitlines()
        file_name = __parse_output__(lines_list)
        log.info(f"found file: {file_name}")
        if file_name is None or not file_name.strip():
            # No file name could be parsed: flag the entry for manual review.
            file_info["review"] = True
            file_info["should_download"] = True
            file_info["file_name"] = None
        else:
            download_file_name = Path(download_dir, file_name)
            file_info["should_download"] = False
            file_info["review"] = False
            file_info["file_name"] = download_file_name.name
            file_info["cloud_link"] = str(download_file_name.absolute())
            file_info["last_modified_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    else:
        # A failed run leaves file_info unchanged; log why for later inspection.
        log.warning(f"{dl_tool} exited with code {result.returncode}: {result.stderr}")
    return file_info


def __parse_output__(lines_list: list[str]) -> str | None:
    """Extract the file name from the download tool's output lines."""
    file_name = None
    for line in lines_list:
        log.debug(f"parse line: {line}")
        if "has already been downloaded" in line:
            # Format: "[download] <file name> has already been downloaded"
            start_len = len("[download] ")
            end_len = len(" has already been downloaded")
            file_name = line[start_len:-end_len]
            log.info(f"file_name: {file_name}")
            break
        if "Destination" in line:
            # Format: "[download] Destination: <file name>"
            start_len = len("[download] Destination: ")
            file_name = line[start_len:]
            break
    return file_name


def is_file_downloaded(media_file: dict, directory: str | Path) -> FileStatus:
    """Check whether the file already exists under its title or its id."""
    # Read the raw value: formatting None with an f-string yields the truthy
    # string "None", which would skip the "not set" branch.
    file_name_as_title = media_file["file_name"]
    if not file_name_as_title:
        log.info("title has not been set - start download")
        return FileStatus.UNKNOWN

    file_title = Path(directory, f"{file_name_as_title}.mp4")
    if file_title.exists():
        log.info(f"{file_name_as_title} has been downloaded")
        media_file["should_download"] = False
        return FileStatus.DOWNLOADED

    file_with_id_as_name = Path(directory, f"{media_file['id']}.mp4")
    if file_with_id_as_name.exists():
        log.info(f"{file_with_id_as_name} has been downloaded and renamed")
        media_file["cloud_link"] = str(file_with_id_as_name)
        media_file["should_download"] = False
        return FileStatus.RENAMED

    log.info("could not find file - start download")
    return FileStatus.UNKNOWN


def update_status(item_id: UUID, file_info: dict):
    """Send the updated file info back to the API."""
    update = requests.put(
        f"http://127.0.0.1:8800/api/media/files/{item_id}", json=file_info
    )
    log.info(f"update status: {update.status_code}")
    log.info(f"update result: {update.json()}")


def rename_file(file_info: dict):
    """Rename the downloaded file to `<id><suffix>` and record the new path."""
    item_id = file_info["id"]
    file_name = file_info["file_name"]
    if file_name is None or not file_name.strip():
        log.info("file_name is not set, rename is not executed")
        file_info["review"] = True
        file_info["should_download"] = True
        return
    file = Path(args.dir, file_name)
    new_file_path = file.with_name(f"{item_id}{file.suffix}")
    log.info(f"rename {file} to {new_file_path}")
    file.rename(new_file_path)
    file_info["cloud_link"] = str(new_file_path)


if __name__ == "__main__":
    log = get_logger(args.verbose, args.config)
    log.info("kontor.download started")
    response = requests.get("http://127.0.0.1:8800/api/media/files?download=true")
    log.info(f"Status: {response.status_code}")
    data = response.json()
    entries_count = len(data)
    log.info(f"data: {entries_count}")
    log.debug(f"data: {data}")

    if args.dry_run:
        sys.exit(0)

    if args.limit:
        log.warning(f"check the first {args.limit} links")

    for mediafile_index, item in enumerate(data, start=1):
        link = item["url"]
        file_id = item["id"]
        log.info(f"{file_id} - {link}")
        download_status: FileStatus = is_file_downloaded(item, args.dir)
        # update_status() takes two arguments; the original calls passed an
        # undefined third argument (api_data), which would raise a NameError.
        match download_status:
            case FileStatus.DOWNLOADED:
                rename_file(item)
                update_status(file_id, item)
            case FileStatus.RENAMED:
                log.info("update status")
                update_status(file_id, item)
            case FileStatus.UNKNOWN:
                # Pass --tool through so the option is not silently ignored.
                download_file(link, item, args.dir, args.tool)
                rename_file(item)
                log.info(f"{item}")
                update_status(file_id, item)
        log.warning(f"processed {mediafile_index}/{entries_count}")
        if args.limit and args.limit <= mediafile_index:
            break

    log.info("kontor.download finished")