""" download files with URLs from DB """ import re import subprocess from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from datetime import datetime from enum import Enum, auto from pathlib import Path from uuid import UUID import requests from config import get_logger parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument('--verbose', '-v', action='count', default=0) parser.add_argument('--config', '-c', default='kontor-docker') parser.add_argument('--dir', '-d', default='/data/media') parser.add_argument('--tool', '-t', default='yt-dlp') parser.add_argument('--dry-run', '-m', action='store_true') args = parser.parse_args() class FileStatus(Enum): DOWNLOADED = auto() RENAMED = auto() UNKNOWN = auto() def download_file(url: str, file_info: dict, download_dir: str = "/data/media", dl_tool: str = "yt-dlp") -> dict: print(f"download file for {url} to {download_dir}") result = subprocess.run([dl_tool, url], cwd=download_dir, capture_output=True, text=True) if result.returncode == 0: output = result.stdout output = re.sub(' +', ' ', output) lines_list = output.splitlines() file_name = __parse_output__(lines_list) if file_name is None: file_info['review'] = True file_info['should_download'] = True file_info['file_name'] = None else: download_file_name = Path(download_dir, file_name) file_info['should_download'] = False file_info['file_name'] = download_file_name.name file_info['cloud_link'] = str(download_file_name.absolute()) file_info['last_modified_date'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") return file_info def __parse_output__(lines_list: list[str]) -> str | None: file_name = None for line in lines_list: if 'has already been downloaded' in line: end_len = len(' has already been downloaded') file_name = line[11:-end_len] if 'Destination' in line: line_len = len(line) start_len = len('[download] Destination: ') file_len = line_len - start_len file_name = line[-file_len:] return file_name def is_file_downloaded(item: dict, dir: Path) -> FileStatus: file_name_as_title = f"{item['file_name']}" file_title = Path(dir, file_name_as_title, ".mp4") if file_title.exists(): log.info(f"{file_name_as_title} has been downloaded") item['should_download'] = 0 return FileStatus.DOWNLOADED file_name_as_id = f"{item['id']}" file_with_id_as_name = Path(dir, file_name_as_id, ".mp4") if file_with_id_as_name.exists(): log.info(f"{file_with_id_as_name} has been downloaded and renamed") item['cloud_link'] = file_with_id_as_name item['should_download'] = 0 return FileStatus.RENAMED log.info("could not find file - start download") return FileStatus.UNKNOWN def update_status(item_id: UUID, file_info: dict): update = requests.put(f"http://127.0.0.1:8800/media/files/{item_id}", json=file_info) log.info(f"update status: {update.status_code}") log.info(f"update result: {update.json()}") def rename_file(file_info: dict): item_id = file_info['id'] file = Path(args.dir, file_info['file_name']) new_file_path = file.with_name(f"{item_id}{file.suffix}") log.info(f"rename {file} to {new_file_path}") file.rename(Path(new_file_path)) file_info['cloud_link'] = str(new_file_path) if __name__ == '__main__': log = get_logger(args.verbose, args.config) log.info('kontor.download started') response = requests.get("http://127.0.0.1:8800/media/files?download=true") log.info(f"Status: {response.status_code}") data = response.json() log.info(f"data: {len(data)}") for item in data: link = item['url'] file_id = item['id'] log.info(f"{file_id} - {link}") download_status: FileStatus = is_file_downloaded(item, args.dir) match download_status: case FileStatus.DOWNLOADED: rename_file(item) update_status(file_id, item) case FileStatus.RENAMED: log.info("update status") update_status(file_id, item) case FileStatus.UNKNOWN: download_file(link, item) rename_file(item) log.info(f'{item}') update_status(file_id, item) log.info('kontor.download finished')