""" download files with URLs from DB """ import logging.config import sys from typing import Any import requests import re from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from bs4 import BeautifulSoup parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument('--verbose', '-v', action='count', default=0) parser.add_argument('--all', '-a', action='store_true') parser.add_argument('--limit', '-l', type=int, help='maximum number of links to check') parser.add_argument('--add-actor', action='store_true', help='add missing actors') args = parser.parse_args() def get_logger(level: int) -> logging.Logger: logging.config.dictConfig({ 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'simple': { 'format': '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', 'datefmt': '%Y-%m-%d %H:%M:%S', }, }, 'handlers': { 'console': { 'class': logging.StreamHandler, 'level': logging.DEBUG, 'formatter': 'simple', 'stream': 'ext://sys.stdout' }, }, 'loggers': { 'urllib3.connectionpool': { 'level': 'WARNING', 'propagate': False, }, 'root': { 'level': 'DEBUG', 'handlers': ['console'], }, }, }) logger = logging.getLogger(__file__) if level is not None: match level: case 0: logger.setLevel(logging.WARNING) case 1: logger.setLevel(logging.INFO) case 2: logger.setLevel(logging.DEBUG) case _: logger.setLevel(logging.CRITICAL) return logger def update_file(log: logging.Logger, media_file): update = requests.put(f"http://127.0.0.1:8800/api/media/files/{media_file['id']}", json=media_file) log.debug(f"update status: {update.status_code}") log.debug(f"update result: {update.json()}") def get_actor_links(log: logging.Logger, media_file_url: str) -> list[str]: try: r = requests.get(media_file_url) soup = BeautifulSoup(r.content, "html.parser") error404 = soup.css.select_one('.error404-title') if error404 and error404.get_text() == "Video nicht gefunden": log.warning(f"{error404.get_text()}") media_file['url'] = None media_file['review'] = False update_file(log, media_file) return [] anchors = soup.find_all('a', attrs={'href': re.compile("^https://.*pornstars/.*")}) actor_links = [] for anchor in anchors: link_url = str(anchor.get("href")) # type: ignore if link_url.endswith('all/countries'): continue if link_url in actor_links: continue actor_links.append(link_url) log.debug(f"links({len(actor_links)}): {actor_links}") return actor_links except Exception as error: log.warning(f"something went wrong: {error}") return [] def get_media_files(all_files: bool)-> Any: files_url = "" if all_files: files_url= "http://127.0.0.1:8800/api/media/files" else: files_url = "http://127.0.0.1:8800/api/media/files?review=true" response = requests.get(files_url) log.debug(f"Status: {response.status_code}") data = response.json() return data def update_media_file(item, log: logging.Logger) -> Any: update = requests.put(f"http://127.0.0.1:8800/api/media/files/{item['id']}", json=item) log.debug(f"update status: {update.status_code}") log.debug(f"update result: {update.json()}") return update.json() def update_media_file_actors(mediafile: dict, actor_id_list: list[dict[str, str]], actor_links: list[str], map_ids_actor: dict[str, str], log: logging.Logger): media_file_id = mediafile['id'] actor_response = requests.put(f"http://127.0.0.1:8800/api/media/files/{media_file_id}/actors", json=actor_id_list) files_actor_list = actor_response.json() persisted_actor_links_count: int = len(files_actor_list) found_actor_links_count: int = len(actor_links) if persisted_actor_links_count < found_actor_links_count: log.warning(f"{persisted_actor_links_count} links persisted, but {found_actor_links_count} links are available") mediafile['review'] = True elif persisted_actor_links_count > found_actor_links_count: log.warning("more persisted links than found actors") for file_actor in files_actor_list: actor_id = file_actor['actor_id'] actor_url = map_ids_actor[actor_id]['url'] # type: ignore log.debug(f"check if actor({actor_id}) with {actor_url} in list") if actor_url not in actor_links: log.info(f"actor not found in links, delete relation {file_actor['id']}") delete_media_file_actor(file_actor['id'], log) mediafile['review'] = True else: mediafile['review'] = False log.debug(f"found {persisted_actor_links_count} actors") log.debug(f"found actors: {files_actor_list}") def delete_media_file_actor(media_actor_file_id: str, log: logging.Logger): delete_response = requests.delete(f"http://127.0.0.1:8800/api/media/actorfiles/{media_actor_file_id}") if delete_response.status_code == 204: log.info(f"actor file relation with id {media_actor_file_id} successfully deleted") def get_actor_ids(link_list: list[str], map_url_actor: dict[str, str], map_ids_actor: dict[str, str], map_path_actor: dict[str, str], missing_actors: dict[str, int], log: logging.Logger) -> list[dict[str, str]]: found_actors: list[dict[str, str]] = [] for link in link_list: actor = get_persisted_actor(link, map_url_actor, map_ids_actor, map_path_actor, log) if actor: found_actors.append(actor) else: if link in missing_actors: count = missing_actors[link] missing_actors[link] = count +1 else: missing_actors.update({link: 1}) return found_actors def get_persisted_actor(actor_url: str, map_url_actor: dict[str, str], map_ids_actor: dict[str, str], map_path_actor: dict[str, str], log: logging.Logger) -> dict[str, str] | None: alternate_url_actor: dict[str, dict[str, str]] = { 'https://ge.xhamster2.com/pornstars/jean-yves-lecastel': {'id': 'e354b866-717c-4a66-ad38-bc7c23d97e36', 'name': 'Jean-Yves Le Castel', 'url': 'https://ge.xhamster.com/pornstars/jean-yves-le-castel'}, 'https://ge.xhamster.com/pornstars/jean-yves-lecastel': {'id': 'e354b866-717c-4a66-ad38-bc7c23d97e36', 'name': 'Jean-Yves Le Castel', 'url': 'https://ge.xhamster.com/pornstars/jean-yves-le-castel'}, 'https://ge.xhamster.com/pornstars/gracie-green': {'id': 'cbec2e0d-869c-40f1-923f-21958d938d9f', 'name': 'Gracie May Green', 'url':'https://ge.xhamster.com/pornstars/gracie-may-green'}, 'https://ge.xhamster.com/pornstars/thomas-hyka': {'id': '1d814b45-ea98-4acc-88a2-227d3ed36959', 'name': 'Thomas Crown', 'url':'https://ge.xhamster.com/pornstars/thomas-crown'}, 'https://ge.xhamster.com/pornstars/chloe-couture': {'id': 'e22003a5-60a9-4d86-a1df-ae09ecbe5200', 'name': 'Chloe Cherry', 'url':'https://ge.xhamster.com/pornstars/chloe-cherry'}, 'https://ge.xhamster.com/pornstars/dava-fox': {'id': 'd913b778-4507-421b-88e0-9da73bb80a63', 'name': 'Dava Foxx', 'url':'https://ge.xhamster.com/pornstars/dava-foxx'}, 'https://ge.xhamster.com/pornstars/john-dough': {'id': 'a2ecd50f-09b2-4d31-9fcf-1a1438700f51', 'name': 'Jon Dough', 'url':'https://ge.xhamster.com/pornstars/jon-dough'}, 'https://ge.xhamster.com/pornstars/erica-mori': {'id': '5379dab9-63da-44ed-baf1-929d74ac60b1', 'name': 'Polly Yangs', 'url':'https://ge.xhamster.com/pornstars/polly-yangs'}, 'https://ge.xhamster.com/pornstars/elnara-cat': {'id': '543952d7-59a9-4492-a70f-e384b5f8eb57', 'name': 'Renata Fox', 'url':'https://ge.xhamster.com/pornstars/renata-fox'}, 'https://ge.xhamster.com/pornstars/melissa-grand': {'id': '5d025bea-4af6-4197-b38d-3b3afa9d30b9', 'name': 'Melissa Benz', 'url':'https://ge.xhamster.com/pornstars/melissa-benz'}, 'https://ge.xhamster.com/pornstars/sindy-dollar': {'id': 'fa97769c-9e53-4613-b3c3-4cc1a2672d4b', 'name': 'Cindy Dollar', 'url':'https://ge.xhamster.com/pornstars/cindy-dollar'}, } # type: ignore if actor_url in map_url_actor: actor_id: str = map_url_actor[actor_url]['id'] # type: ignore log.debug(f"found actor with id: {actor_id}") return map_ids_actor[actor_id] # type: ignore path = actor_url.split('/')[-1] if path in map_path_actor: actor_id: str = map_path_actor[path]['id'] # type: ignore log.debug(f"found actor with id: {actor_id} by path {path}") return map_ids_actor[actor_id] # type: ignore if actor_url in alternate_url_actor: actor_id: str = alternate_url_actor[actor_url]['id'] log.info(f"found actor with id: {actor_id} by alternative {path}") return alternate_url_actor[actor_url] log.info(f"found actor {actor_url} missing") return None def get_actors(log: logging.Logger): actors_url = {} actors_id = {} actors_path = {} response = requests.get("http://127.0.0.1:8800/api/media/actors") data = response.json() for media_actor in data: actor_id = media_actor['id'] actor_name = media_actor['name'] actor_url = media_actor['url'] actor = {} actor['id'] = actor_id actor['name'] = actor_name actor['url'] = actor_url actors_url[actor_url] = actor actors_id[actor_id] = actor actors_path[actor_url.split('/')[-1]] = actor log.debug(f'all actors: {actors_url}') log.debug(f'all actors: {actors_path}') return (actors_url, actors_id, actors_path) def get_actor_name(actor_url: str, log: logging.Logger) -> str | None: try: r = requests.get(actor_url) soup = BeautifulSoup(r.content, "html.parser") titles = soup.find_all('h1') for title in titles: log.info(f"title: {title.get_text()}") return title.get_text() except Exception as error: log.warning(f"something went wrong: {error}") return None def create_actor(actor_url: str, actor_name: str, log: logging.Logger): new_actor = { 'name': actor_name, 'url': actor_url} actor_response = requests.post(f"http://127.0.0.1:8800/api/media/actors", json=new_actor) log.warning(f"add status: {actor_response.status_code}") if actor_response.status_code == 201: actor_data = actor_response.json() log.warning(f"Actor {actor_data} persisted") else: log.info(f"Actor with {actor_url} not persisted") if __name__ == '__main__': log = get_logger(args.verbose) log.warning('kontor.find_links started') log.debug('get all actors') (actors_url, actors_id, actors_path) = get_actors(log) data = get_media_files(args.all) entries_count = len(data) mediafile_index = 1 log.debug(f"data: {len(data)}") missing_actors = {} if args.limit: log.warning(f"check the first {args.limit} links") for media_file in data: link = media_file['url'] media_file_id = media_file['id'] if not link: continue if str(link) == "None": continue log.warning(f"{media_file['id']} - {str(link)}") actor_links: list[str] = get_actor_links(log, link) actor_id_list = get_actor_ids(actor_links, actors_url, actors_id, actors_path, missing_actors, log) update_media_file_actors(media_file, actor_id_list, actor_links, actors_id, log) result = update_media_file(media_file, log) log.warning(f"processed {mediafile_index}/{entries_count}") if args.limit and args.limit <= mediafile_index: break mediafile_index += 1 for link in missing_actors: log.info(f"{link}: {missing_actors[link]}") actor_name = get_actor_name(link, log) if actor_name and args.add_actor: create_actor(link, actor_name, log) log.info("Sort missing actors by occurence count:") sorted_missing = dict(sorted(missing_actors.items(), key=lambda item: item[1])) for key in sorted_missing: log.info(f"{key} : {sorted_missing[key]}") log.warning('kontor.find_links finished')