"""Read a text file of links and store them as media files via the API.

For every link that is not yet known to the server a ``MediaFile`` is
created.  The linked page is scraped for a title and for actor links; each
actor link is resolved to an existing ``MediaActor`` (by URL, then by name)
or a new one is created, and a ``MediaActorFile`` mapping connects the actor
to the media file.  With ``--dry-run`` nothing is written to the server.
"""
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from datetime import datetime
import logging
from pathlib import Path
import re
from typing import Dict, List, Optional
import uuid

from bs4 import BeautifulSoup
import requests

from api import Server, get_api_config, get_logger
from db.models.media import MediaActor, MediaActorFile, MediaFile

parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument(
    "--file", "-f", help="file with links", default="~/.sync/media/list.txt"
)
parser.add_argument("--video", help="store Url as VideoFile", action="store_true")
parser.add_argument("--config", "-c", default="kontor-api")
parser.add_argument("--server", "-s")
parser.add_argument("--verbose", "-v", action="count", default=0)
parser.add_argument("--limit", "-l", type=int, help="maximum number of links to check")
parser.add_argument(
    "--dry-run", "-m", help="excute script without storing", action="store_true"
)
# NOTE: arguments are parsed at import time (kept for backward compatibility),
# so importing this module consumes sys.argv.
args = parser.parse_args()

# Anchors whose href matches this pattern are treated as links to actor pages.
_ACTOR_LINK_PATTERN = re.compile(r"^https://.*pornstars/.*")

# Timeout in seconds for every HTTP request made while scraping.
_REQUEST_TIMEOUT = 5


def load_data(filename: str, log) -> List[str]:
    """Read the list of links from a text file.

    Args:
        filename: Path of the text file, one link per line.  A leading ``~``
            is expanded to the user's home directory.
        log: Logger used for progress messages.

    Returns:
        The links in file order, stripped of trailing whitespace.  Blank
        lines are skipped.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    log.debug("load_data")
    # expanduser() is required for the default "~/.sync/..." path to resolve.
    import_file = Path(filename).expanduser()
    if not import_file.exists():
        log.info(f"File {filename} does not exist. Do nothing.")
        raise FileNotFoundError(f"File {filename} does not exist")
    log.info("read txt file")
    with open(import_file, "r", encoding="utf-8") as txt_file:
        stripped_lines = (line.rstrip() for line in txt_file)
        # An empty line is not a link; skip it instead of processing "".
        return [line for line in stripped_lines if line]


def get_actors_mapping(actor_list: List[MediaActor]) -> Dict[str, MediaActor]:
    """Map actor URLs to their actor entries.

    Args:
        actor_list: Actors, either as dicts with a ``"url"`` key or as
            objects with a ``url`` attribute.

    Returns:
        Dictionary with the actor URL as key and the actor entry as value.
        If several actors share a URL the last one wins.
    """
    mapping: Dict[str, MediaActor] = {}
    for actor in actor_list:
        if isinstance(actor, dict):
            url: str = actor["url"]
        else:
            url = str(actor.url)
        mapping[url] = actor
    return mapping


def get_actornames_mapping(actor_list: List[MediaActor]) -> Dict[str, MediaActor]:
    """Map actor names to their actor entries.

    Args:
        actor_list: Actors, either as dicts with a ``"name"`` key or as
            objects with a ``name`` attribute.

    Returns:
        Dictionary with the actor name as key and the actor entry as value.
        If several actors share a name the last one wins.
    """
    mapping: Dict[str, MediaActor] = {}
    for actor in actor_list:
        if isinstance(actor, dict):
            name: str = actor["name"]
        else:
            name = str(actor.name)
        mapping[name] = actor
    return mapping


def get_meta_info(media_file_obj: MediaFile, log) -> List[str]:
    """Scrape title and actor links for a media file from its URL.

    The object is updated in place: on success ``title`` is set from the
    page's ``<title>`` tag (when present) and ``review`` is cleared; if the
    page reports that the video was not found, ``url`` is set to ``None``
    and ``review`` is cleared; on any error ``title`` is set to ``None``
    and ``review`` is set.

    Args:
        media_file_obj: Media file whose ``url`` is fetched.
        log: Logger used for progress and error messages.

    Returns:
        The distinct actor links found on the page, in document order.
        Empty if the page was not found or the request failed early.
    """
    actor_links: List[str] = []
    try:
        response = requests.get(media_file_obj.url, timeout=_REQUEST_TIMEOUT)
        soup = BeautifulSoup(response.content, "html.parser")

        error404 = soup.css.select_one(".error404-title")
        if error404 and error404.get_text() == "Video nicht gefunden":
            log.warning(f"{error404.get_text()}")
            media_file_obj.url = None
            media_file_obj.review = False
            return actor_links

        title_tag = soup.find("title")
        if title_tag:
            media_file_obj.title = title_tag.get_text()
        media_file_obj.review = False

        for anchor in soup.find_all("a", attrs={"href": _ACTOR_LINK_PATTERN}):
            link_url = str(anchor.get("href"))  # type: ignore
            # Skip the country overview page and links already collected.
            if link_url.endswith("all/countries") or link_url in actor_links:
                continue
            actor_links.append(link_url)
    except Exception as error:
        # Scraping is best effort: keep the file but flag it for review.
        log.info(f"something went wrong: {error}")
        media_file_obj.title = None
        media_file_obj.review = True
    log.info(f"update MediaFile with MetaInfos to {repr(media_file_obj)}")
    log.info(f"links({len(actor_links)}): {actor_links}")
    return actor_links


def get_actor_name(actor_link: str, log: logging.Logger) -> str | None:
    """Fetch an actor page and return the text of its first ``<h1>``.

    Args:
        actor_link: URL of the actor page.
        log: Logger used for progress and error messages.

    Returns:
        The heading text, or ``None`` if the page has no ``<h1>`` or the
        request or parsing failed.
    """
    try:
        response = requests.get(actor_link, timeout=_REQUEST_TIMEOUT)
        soup = BeautifulSoup(response.content, "html.parser")
        heading = soup.find("h1")
        if heading is not None:
            log.info(f"title: {heading.get_text()}")
            return heading.get_text()
    except Exception as error:
        log.warning(f"something went wrong: {error}")
    return None


def _actor_id(actor):
    """Return the id of an actor given as API dict or as model object."""
    if isinstance(actor, dict):
        return actor["id"]
    return actor.id


def _select_server(api_config, name: Optional[str]) -> Server:
    """Return the server called *name*, falling back to the first one."""
    server: Optional[Server] = api_config.get_server(name) if name else None
    if not server:
        server = api_config.server[0]
    return server


def _new_media_file(link: str) -> MediaFile:
    """Build a fresh, not yet stored MediaFile for *link*."""
    now = datetime.now()
    media_file = MediaFile()
    media_file.id = str(uuid.uuid4())
    media_file.created_date = now
    media_file.last_modified_date = now
    media_file.version = 0
    media_file.url = link
    media_file.review = True
    media_file.should_download = True
    media_file.path = None
    media_file.cloud_link = None
    media_file.file_name = None
    return media_file


def _new_media_actor(name: Optional[str], url: str) -> MediaActor:
    """Build a fresh, not yet stored MediaActor."""
    now = datetime.now()
    media_actor = MediaActor()
    media_actor.id = str(uuid.uuid4())
    media_actor.created_date = now
    media_actor.last_modified_date = now
    media_actor.version = 0
    media_actor.name = name
    media_actor.url = url
    return media_actor


def _new_actor_file(media_file_id, media_actor_id) -> MediaActorFile:
    """Build a fresh, not yet stored mapping between a file and an actor."""
    now = datetime.now()
    media_actor_file = MediaActorFile()
    media_actor_file.id = str(uuid.uuid4())
    media_actor_file.created_date = now
    media_actor_file.last_modified_date = now
    media_actor_file.version = 0
    media_actor_file.media_file_id = media_file_id
    media_actor_file.media_actor_id = media_actor_id
    return media_actor_file


def _resolve_actor(
    server: Server,
    log: logging.Logger,
    actor_url: str,
    actor_mapping: Dict[str, MediaActor],
    actorname_mapping: Dict[str, MediaActor],
    dry_run: bool,
):
    """Return the actor for *actor_url*, creating and storing it if unknown."""
    if actor_url in actor_mapping:
        return actor_mapping[actor_url]

    # The page is fetched once; the name is reused for lookup and creation.
    actor_name = get_actor_name(actor_url, log)
    if actor_name in actorname_mapping:
        return actorname_mapping[actor_name]

    media_actor = _new_media_actor(actor_name, actor_url)
    log.info("update MediaActor with %s", repr(media_actor))
    if not dry_run:
        log.info("Update MediaActor %s", media_actor)
        server.create(log, "media_actor", media_actor.export_dict())

    # Remember the new actor so later links reuse it instead of creating
    # a duplicate for the same URL or name.
    actor_mapping[actor_url] = media_actor
    if actor_name:
        actorname_mapping[actor_name] = media_actor
    return media_actor


def _import_link(
    server: Server,
    log: logging.Logger,
    link: str,
    actor_mapping: Dict[str, MediaActor],
    actorname_mapping: Dict[str, MediaActor],
    dry_run: bool,
) -> None:
    """Create the MediaFile for *link* together with its actor mappings."""
    media_file = _new_media_file(link)
    actor_urls: List[str] = get_meta_info(media_file, log)
    if not dry_run:
        log.info("add MediaFile %s", media_file)
        server.create(log, "media_file", media_file.export_dict())

    for actor_url in actor_urls:
        media_actor = _resolve_actor(
            server, log, actor_url, actor_mapping, actorname_mapping, dry_run
        )
        # Known actors arrive as dicts from the API, new ones as objects.
        media_actor_file = _new_actor_file(media_file.id, _actor_id(media_actor))
        log.info("create mapping with %s", media_actor_file)
        if not dry_run:
            # The mapping is stored for every actor, including newly created
            # and name-matched ones.
            log.info("add MediaFile Actor mapping %s", media_actor_file)
            server.create(log, "media_actor_file", media_actor_file.export_dict())


def main() -> None:
    """Process the link file according to the parsed command line."""
    logger = get_logger(args.verbose, args.config)
    logger.info("kontor.add_links started")
    if args.limit:
        logger.warning("check the first %s links", args.limit)

    api_config = get_api_config(logger, args.config)
    server = _select_server(api_config, args.server)

    links = load_data(args.file, logger)
    all_media_files = server.request(logger, table="media_file")
    media_actors: List[MediaActor] = server.request(log=logger, table="media_actor")
    actor_mapping = get_actors_mapping(media_actors)
    actorname_mapping = get_actornames_mapping(media_actors)

    for processed, link in enumerate(links, start=1):
        logger.info("process %s", link)
        media_files = [
            media_file for media_file in all_media_files if media_file["url"] == link
        ]
        if media_files:
            for media_file in media_files:
                logger.info("MediaFile with %s is found", media_file["id"])
        else:
            logger.info("MediaFile for link %s not found", link)
            _import_link(
                server, logger, link, actor_mapping, actorname_mapping, args.dry_run
            )
        # Stop once --limit links have been looked at (found or not).
        if args.limit and processed >= args.limit:
            break

    logger.info("kontor.add_link finished")


if __name__ == "__main__":
    main()