Files
kontor/kontor-scripts/download.py
T
2025-12-09 17:05:20 +01:00

162 lines
5.6 KiB
Python

"""
download files with URLs from DB
"""
import re
import subprocess
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from uuid import UUID
import requests
from config import get_logger
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("--verbose", "-v", action="count", default=0)
parser.add_argument("--config", "-c", default="kontor-docker")
parser.add_argument("--dir", "-d", default="/data/media")
parser.add_argument("--limit", "-l", type=int, help="maximum number of links to check")
parser.add_argument("--tool", "-t", default="yt-dlp")
parser.add_argument("--dry-run", "-m", action="store_true")
args = parser.parse_args()
class FileStatus(Enum):
DOWNLOADED = auto()
RENAMED = auto()
UNKNOWN = auto()
def download_file(
url: str,
file_info: dict,
download_dir: str = "/data/media",
dl_tool: str = "yt-dlp",
) -> dict:
print(f"download file for {url} to {download_dir}")
result = subprocess.run(
[dl_tool, url], cwd=download_dir, capture_output=True, text=True
)
if result.returncode == 0:
output = result.stdout
output = re.sub(" +", " ", output)
lines_list = output.splitlines()
file_name = __parse_output__(lines_list)
log.info(f"found file: {file_name}")
if file_name is None or not file_name.strip():
file_info["review"] = True
file_info["should_download"] = True
file_info["file_name"] = None
else:
download_file_name = Path(download_dir, file_name)
file_info["should_download"] = False
file_info["review"] = False
file_info["file_name"] = download_file_name.name
file_info["cloud_link"] = str(download_file_name.absolute())
file_info["last_modified_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
return file_info
def __parse_output__(lines_list: list[str]) -> str | None:
file_name = None
for line in lines_list:
log.debug(f"parse line: {line}")
if "has already been downloaded" in line:
end_len = len(" has already been downloaded")
file_name = line[11:-end_len]
log.info(f"file_name: {file_name}")
break
if "Destination" in line:
line_len = len(line)
start_len = len("[download] Destination: ")
file_len = line_len - start_len
file_name = line[-file_len:]
break
else:
file_name = None
return file_name
def is_file_downloaded(media_file: dict, dir: Path) -> FileStatus:
file_name_as_title = f"{media_file['file_name']}"
if not file_name_as_title:
log.info("title has not been set - start download")
return FileStatus.UNKNOWN
file_title = Path(dir, f"{file_name_as_title}.mp4")
if file_title.exists():
log.info(f"{file_name_as_title} has been downloaded")
media_file["should_download"] = False
return FileStatus.DOWNLOADED
file_name_as_id = f"{media_file['id']}"
file_with_id_as_name = Path(dir, f"{file_name_as_id}.mp4")
if file_with_id_as_name.exists():
log.info(f"{file_with_id_as_name} has been downloaded and renamed")
media_file["cloud_link"] = str(file_with_id_as_name)
media_file["should_download"] = False
return FileStatus.RENAMED
log.info("could not find file - start download")
return FileStatus.UNKNOWN
def update_status(item_id: UUID, file_info: dict):
update = requests.put(
f"http://127.0.0.1:8800/api/media/files/{item_id}", json=file_info
)
log.info(f"update status: {update.status_code}")
log.info(f"update result: {update.json()}")
def rename_file(file_info: dict):
item_id = file_info["id"]
file_name = file_info["file_name"]
if file_name is None or not file_name.strip():
log.info("file_name is not set, rename is not executed")
file_info["review"] = True
file_info["should_download"] = True
return
file = Path(args.dir, file_name)
new_file_path = file.with_name(f"{item_id}{file.suffix}")
log.info(f"rename {file} to {new_file_path}")
file.rename(Path(new_file_path))
file_info["cloud_link"] = str(new_file_path)
if __name__ == "__main__":
log = get_logger(args.verbose, args.config)
log.info("kontor.download started")
response = requests.get("http://127.0.0.1:8800/api/media/files?download=true")
log.info(f"Status: {response.status_code}")
data = response.json()
entries_count = len(data)
log.info(f"data: {entries_count}")
mediafile_index = 1
log.debug(f"data: {len(data)}")
missing_actors = {}
if args.limit:
log.warning(f"check the first {args.limit} links")
for item in data:
link = item["url"]
file_id = item["id"]
log.info(f"{file_id} - {link}")
download_status: FileStatus = is_file_downloaded(item, args.dir)
match download_status:
case FileStatus.DOWNLOADED:
rename_file(item)
update_status(file_id, item)
case FileStatus.RENAMED:
log.info("update status")
update_status(file_id, item)
case FileStatus.UNKNOWN:
download_file(link, item, args.dir)
rename_file(item)
log.info(f"{item}")
update_status(file_id, item)
log.warning(f"processed {mediafile_index}/{entries_count}")
if args.limit and args.limit <= mediafile_index:
break
mediafile_index += 1
log.info("kontor.download finished")