180 lines
6.2 KiB
Python
180 lines
6.2 KiB
Python
"""
|
|
Download the media files whose URLs are stored in the database.
|
|
"""
|
|
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
|
|
from datetime import datetime
|
|
from enum import Enum, auto
|
|
from pathlib import Path
|
|
from logging import Logger
|
|
from typing import Any, Dict, Optional
|
|
from uuid import UUID
|
|
|
|
from api import Option, OptionType, Server, get_api_config, get_logger
|
|
|
|
# Command line interface of the script.  The options are declared in one table
# and registered in exactly the order listed, so the generated help text keeps
# its layout.
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
for _flags, _settings in (
    (("--verbose", "-v"), {"action": "count", "default": 0}),
    (("--config", "-c"), {"default": "kontor-api"}),
    (("--dir", "-d"), {"default": "/data/media"}),
    (("--limit", "-l"), {"type": int, "help": "maximum number of links to check"}),
    (("--tool", "-t"), {"default": "yt-dlp"}),
    (("--dry-run", "-m"), {"action": "store_true"}),
):
    parser.add_argument(*_flags, **_settings)

# Parsed at import time; `args` is read as a module-level global further down.
args = parser.parse_args()
|
|
|
|
|
|
class FileStatus(Enum):
    """
    On-disk state of a media file.
    """

    # The explicit numbers are the ones ``auto()`` would hand out (1, 2, 3).
    DOWNLOADED = 1  # found under its file name
    RENAMED = 2  # found under the record id
    UNKNOWN = 3  # not found (or no file name known)
|
|
|
|
|
|
def download_file(
    url: str,
    file_info: dict,
    download_dir: str = "/data/media",
    dl_tool: str = "yt-dlp",
) -> Dict[str, Any]:
    """
    Download the file behind ``url`` and record the outcome in ``file_info``.

    The tool is started as ``<dl_tool> <url>`` with ``download_dir`` as working
    directory; its standard output is scanned for the name of the written file.

    ``file_info`` is updated in place:

    * tool exited with a non-zero code: the error is logged and ``review`` and
      ``should_download`` are set to ``True``; all other keys stay untouched.
    * tool succeeded but no file name could be parsed: ``review`` and
      ``should_download`` are set to ``True`` and ``file_name`` to ``None``.
    * tool succeeded and a file name was parsed: ``review`` and
      ``should_download`` are set to ``False``; ``file_name``, ``cloud_link``
      (absolute path) and ``last_modified_date`` are filled in.

    Args:
        url: address handed to the download tool.
        file_info: record of the media file; mutated in place.
        download_dir: directory the tool is executed in.
        dl_tool: executable used for the download.

    Returns:
        The same ``file_info`` dict that was passed in.
    """
    logger.info("download file for %s to %s", url, download_dir)
    result = subprocess.run(
        [dl_tool, url], cwd=download_dir, capture_output=True, text=True, check=False
    )

    if result.returncode != 0:
        # A failed run used to pass silently; make it visible and flag the
        # record so that it is neither lost nor treated as downloaded.
        logger.error(
            "%s failed for %s with exit code %s: %s",
            dl_tool,
            url,
            result.returncode,
            (result.stderr or "").strip(),
        )
        file_info["review"] = True
        file_info["should_download"] = True
        return file_info

    # Collapse runs of blanks before the output is split into lines.
    output = re.sub(" +", " ", result.stdout)
    file_name = __parse_output__(output.splitlines())
    logger.info("found file: %s", file_name)

    if file_name is None or not file_name.strip():
        file_info["review"] = True
        file_info["should_download"] = True
        file_info["file_name"] = None
        return file_info

    download_file_name = Path(download_dir, file_name)
    file_info["should_download"] = False
    file_info["review"] = False
    file_info["file_name"] = download_file_name.name
    file_info["cloud_link"] = str(download_file_name.absolute())
    file_info["last_modified_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return file_info
|
|
|
|
|
|
def __parse_output__(lines_list: list[str]) -> Optional[str]:
    """
    Extract the name of the downloaded file from the download tool's output.

    Two kinds of lines are recognised, the first match wins:

    * ``[download] <name> has already been downloaded`` (any trailing text
      after the phrase is ignored)
    * ``[download] Destination: <name>``

    Lines that merely contain the word "Destination" somewhere else (for
    example inside a URL) are skipped, as are matches with an empty name.

    Args:
        lines_list: output of the download tool, one entry per line.

    Returns:
        The file name, or ``None`` if no line carries one.
    """
    download_tag = "[download] "
    destination_prefix = "[download] Destination: "
    already_marker = " has already been downloaded"

    for line in lines_list:
        logger.debug("parse line: %s", line)
        if already_marker in line:
            # Cut at the phrase itself instead of a fixed number of characters,
            # so text following the phrase cannot leak into the name.
            file_name = line.removeprefix(download_tag).rpartition(already_marker)[0]
            if file_name.strip():
                logger.info("file_name: %s", file_name)
                return file_name
        elif line.startswith(destination_prefix):
            file_name = line.removeprefix(destination_prefix)
            if file_name.strip():
                return file_name
    return None
|
|
|
|
|
|
def is_file_downloaded(media_file: dict, path: Path) -> FileStatus:
    """
    Check whether the file of a media record is already present in ``path``.

    Two locations are probed, in this order:

    1. ``<path>/<file_name>.mp4`` -> ``FileStatus.DOWNLOADED``
    2. ``<path>/<id>.mp4``        -> ``FileStatus.RENAMED``; ``cloud_link`` is
       set to that path.

    In both cases ``should_download`` is set to ``False``.  If ``file_name`` is
    missing, ``None`` or blank, or neither file exists, ``FileStatus.UNKNOWN``
    is returned and the record is left untouched.

    Args:
        media_file: record of the media file; mutated in place.
        path: directory that holds the downloaded files.

    Returns:
        The detected ``FileStatus``.
    """
    # Inspect the raw value: formatting ``None`` into a string yields the
    # truthy text "None", which would defeat the "not set" check below.
    title = media_file.get("file_name")
    if title is None or not str(title).strip():
        logger.info("title has not been set - start download")
        return FileStatus.UNKNOWN

    # NOTE(review): ".mp4" is appended unconditionally, so a file_name that
    # already carries an extension is looked up as "<name>.<ext>.mp4" - confirm
    # that this is intended.
    if Path(path, f"{title}.mp4").exists():
        logger.info("%s has been downloaded", title)
        media_file["should_download"] = False
        return FileStatus.DOWNLOADED

    file_with_id_as_name = Path(path, f"{media_file['id']}.mp4")
    if file_with_id_as_name.exists():
        logger.info("%s has been downloaded and renamed", file_with_id_as_name)
        media_file["cloud_link"] = str(file_with_id_as_name)
        media_file["should_download"] = False
        return FileStatus.RENAMED

    logger.info("could not find file - start download")
    return FileStatus.UNKNOWN
|
|
|
|
|
|
def update_status(item_id: UUID, file_info: dict, api_server: Server, log: Logger):
    """
    Push the record of a media file to the API and log the answer.

    Calls ``api_server.update`` for the ``media_file`` table with ``item_id``
    and ``file_info`` and writes whatever it returns to ``log`` at INFO level.
    """
    log.info(
        "update result: %s",
        api_server.update(log, "media_file", item_id, file_info),
    )
|
|
|
|
|
|
def rename_file(file_info: dict):
    """
    Rename the downloaded file of a record to ``<id><original suffix>``.

    The file is looked up as ``<args.dir>/<file_name>``.  On success
    ``cloud_link`` is set to the new path.  If ``file_name`` is ``None`` or
    blank, or the file does not exist, nothing is renamed; the situation is
    logged and ``review`` and ``should_download`` are set to ``True``.

    Args:
        file_info: record of the media file (needs the keys ``id`` and
            ``file_name``); mutated in place.
    """
    item_id = file_info["id"]
    file_name = file_info["file_name"]
    if file_name is None or not file_name.strip():
        logger.info("file_name is not set, rename is not executed")
        file_info["review"] = True
        file_info["should_download"] = True
        return

    file = Path(args.dir, file_name)
    if not file.exists():
        # Path.rename() would raise FileNotFoundError and abort the whole run;
        # flag the record instead so the remaining entries are still handled.
        logger.warning("%s does not exist, rename is not executed", file)
        file_info["review"] = True
        file_info["should_download"] = True
        return

    new_file_path = file.with_name(f"{item_id}{file.suffix}")
    logger.info("rename %s to %s", file, new_file_path)
    file.rename(new_file_path)
    file_info["cloud_link"] = str(new_file_path)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logger = get_logger(args.verbose, args.config)
|
|
logger.info("kontor.download started")
|
|
APICONFIG = get_api_config(logger, args.config)
|
|
server: Server = APICONFIG.server[0]
|
|
data = server.request(
|
|
log=logger, table="media_file", param=Option(OptionType.PARAM, "download=true")
|
|
)
|
|
entries_count = len(data)
|
|
logger.info("data: %s", entries_count)
|
|
mediafile_index = 1
|
|
logger.debug("data: %s", data)
|
|
missing_actors = {}
|
|
if args.dry_run:
|
|
sys.exit(0)
|
|
if args.limit:
|
|
logger.warning("check the first %s links", args.limit)
|
|
for item in data:
|
|
link = item["url"]
|
|
file_id = item["id"]
|
|
logger.info("%s - %s", file_id, link)
|
|
download_status: FileStatus = is_file_downloaded(item, args.dir)
|
|
match download_status:
|
|
case FileStatus.DOWNLOADED:
|
|
rename_file(item)
|
|
update_status(file_id, item, api_server=server, log=logger)
|
|
case FileStatus.RENAMED:
|
|
logger.info("update status")
|
|
update_status(file_id, item, api_server=server, log=logger)
|
|
case FileStatus.UNKNOWN:
|
|
download_file(link, item, args.dir)
|
|
rename_file(item)
|
|
logger.info(item)
|
|
update_status(file_id, item, api_server=server, log=logger)
|
|
logger.warning("processed %s/%s", mediafile_index, entries_count)
|
|
if args.limit and args.limit <= mediafile_index:
|
|
break
|
|
mediafile_index += 1
|
|
logger.info("kontor.download finished")
|