""" download files with URLs from DB """ import logging.config import sys from typing import Any, Dict import requests import re from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from bs4 import BeautifulSoup from config import get_api_config parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument('--verbose', '-v', action='count', default=0) parser.add_argument("--config", "-c", default="kontor-docker") parser.add_argument('--all', '-a', action='store_true') parser.add_argument('--limit', '-l', type=int, help='maximum number of links to check') parser.add_argument('--add-actor', action='store_true', help='add missing actors') args = parser.parse_args() def get_logger(level: int) -> logging.Logger: logging.config.dictConfig({ 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'simple': { 'format': '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', 'datefmt': '%Y-%m-%d %H:%M:%S', }, }, 'handlers': { 'console': { 'class': logging.StreamHandler, 'level': logging.DEBUG, 'formatter': 'simple', 'stream': 'ext://sys.stdout' }, }, 'loggers': { 'urllib3.connectionpool': { 'level': 'WARNING', 'propagate': False, }, 'root': { 'level': 'DEBUG', 'handlers': ['console'], }, }, }) logger = logging.getLogger(__file__) if level is not None: match level: case 0: logger.setLevel(logging.WARNING) case 1: logger.setLevel(logging.INFO) case 2: logger.setLevel(logging.DEBUG) case _: logger.setLevel(logging.CRITICAL) return logger def update_file(log: logging.Logger, media_file, api_data: Dict[str, Any]): host = api_data["host"] port = api_data["port"] token = api_data['token'] headers: Dict[str, str] = {"Authorization": f"Bearer {token}"} url = f"http://{host}:{port}/api/media/files/{media_file['id']}" update = requests.put(url, headers=headers, json=media_file) log.debug(f"update status: {update.status_code}") log.debug(f"update result: {update.json()}") def get_actor_links(log: logging.Logger, media_file_url: str, api_data: Dict[str, Any]) -> list[str]: try: r = requests.get(media_file_url) soup = BeautifulSoup(r.content, "html.parser") error404 = soup.css.select_one('.error404-title') if error404 and error404.get_text() == "Video nicht gefunden": log.warning(f"{error404.get_text()}") media_file['url'] = None media_file['review'] = False update_file(log, media_file, api_data=api_data) return [] anchors = soup.find_all('a', attrs={'href': re.compile("^https://.*pornstars/.*")}) actor_links = [] for anchor in anchors: link_url = str(anchor.get("href")) # type: ignore if link_url.endswith('all/countries'): continue if link_url in actor_links: continue actor_links.append(link_url) log.debug(f"links({len(actor_links)}): {actor_links}") return actor_links except Exception as error: log.warning(f"something went wrong: {error}") return [] def get_media_files(all_files: bool, api_data: Dict[str, Any])-> Any: files_url = "" host = api_data["host"] port = api_data["port"] token = api_data['token'] headers: Dict[str, str] = {"Authorization": f"Bearer {token}"} if all_files: files_url= f"http://{host}:{port}/api/media/files" else: files_url = f"http://{host}:{port}/api/media/files?review=true" response = requests.get(files_url, headers=headers) log.debug(f"Status: {response.status_code}") data = response.json() return data def update_media_file(item, log: logging.Logger, api_data: Dict[str, Any]) -> Any: host = api_data["host"] port = api_data["port"] token = api_data['token'] url: str = f"http://{host}:{port}/api/media/files/{item['id']}" headers: Dict[str, str] = {"Authorization": f"Bearer {token}"} update = requests.put(url, headers=headers, json=item) log.debug(f"update status: {update.status_code}") log.debug(f"update result: {update.json()}") return update.json() def update_media_file_actors(mediafile: dict, actor_id_list: list[dict[str, str]], actor_links: list[str], map_ids_actor: dict[str, str], log: logging.Logger, api_data: Dict[str, Any]): media_file_id = mediafile['id'] host = api_data["host"] port = api_data["port"] token = api_data['token'] url: str = f"http://{host}:{port}/api/media/files/{media_file_id}/actors" headers: Dict[str, str] = {"Authorization": f"Bearer {token}"} actor_response = requests.put(url, headers=headers, json=actor_id_list) files_actor_list = actor_response.json() persisted_actor_links_count: int = len(files_actor_list) found_actor_links_count: int = len(actor_links) if persisted_actor_links_count < found_actor_links_count: log.warning(f"{persisted_actor_links_count} links persisted, but {found_actor_links_count} links are available") mediafile['review'] = True elif persisted_actor_links_count > found_actor_links_count: log.warning("more persisted links than found actors") for file_actor in files_actor_list: actor_id = file_actor['actor_id'] actor_url = map_ids_actor[actor_id]['url'] # type: ignore log.debug(f"check if actor({actor_id}) with {actor_url} in list") if actor_url not in actor_links: log.info(f"actor not found in links, delete relation {file_actor['id']}") delete_media_file_actor(file_actor['id'], log, api_data) mediafile['review'] = True else: mediafile['review'] = False log.debug(f"found {persisted_actor_links_count} actors") log.debug(f"found actors: {files_actor_list}") def delete_media_file_actor(media_actor_file_id: str, log: logging.Logger, api_data: Dict[str, Any]): host = api_data["host"] port = api_data["port"] token = api_data['token'] url: str = f"http://{host}:{port}/api/media/actorfiles/{media_actor_file_id}" headers: Dict[str, str] = {"Authorization": f"Bearer {token}"} delete_response = requests.delete(url, headers=headers) if delete_response.status_code == 204: log.info(f"actor file relation with id {media_actor_file_id} successfully deleted") def get_actor_ids(link_list: list[str], map_url_actor: dict[str, str], map_ids_actor: dict[str, str], map_path_actor: dict[str, str], missing_actors: dict[str, int], log: logging.Logger) -> list[dict[str, str]]: found_actors: list[dict[str, str]] = [] for link in link_list: actor = get_persisted_actor(link, map_url_actor, map_ids_actor, map_path_actor, log) if actor: found_actors.append(actor) else: if link in missing_actors: count = missing_actors[link] missing_actors[link] = count +1 else: missing_actors.update({link: 1}) return found_actors def get_persisted_actor(actor_url: str, map_url_actor: dict[str, str], map_ids_actor: dict[str, str], map_path_actor: dict[str, str], log: logging.Logger) -> dict[str, str] | None: alternate_url_actor: dict[str, dict[str, str]] = { 'https://ge.xhamster2.com/pornstars/jean-yves-lecastel': {'id': 'e354b866-717c-4a66-ad38-bc7c23d97e36', 'name': 'Jean-Yves Le Castel', 'url': 'https://ge.xhamster.com/pornstars/jean-yves-le-castel'}, 'https://ge.xhamster.com/pornstars/jean-yves-lecastel': {'id': 'e354b866-717c-4a66-ad38-bc7c23d97e36', 'name': 'Jean-Yves Le Castel', 'url': 'https://ge.xhamster.com/pornstars/jean-yves-le-castel'}, 'https://ge.xhamster.com/pornstars/gracie-green': {'id': 'cbec2e0d-869c-40f1-923f-21958d938d9f', 'name': 'Gracie May Green', 'url':'https://ge.xhamster.com/pornstars/gracie-may-green'}, 'https://ge.xhamster.com/pornstars/thomas-hyka': {'id': '1d814b45-ea98-4acc-88a2-227d3ed36959', 'name': 'Thomas Crown', 'url':'https://ge.xhamster.com/pornstars/thomas-crown'}, 'https://ge.xhamster.com/pornstars/chloe-couture': {'id': 'e22003a5-60a9-4d86-a1df-ae09ecbe5200', 'name': 'Chloe Cherry', 'url':'https://ge.xhamster.com/pornstars/chloe-cherry'}, 'https://ge.xhamster.com/pornstars/dava-fox': {'id': 'd913b778-4507-421b-88e0-9da73bb80a63', 'name': 'Dava Foxx', 'url':'https://ge.xhamster.com/pornstars/dava-foxx'}, 'https://ge.xhamster.com/pornstars/john-dough': {'id': 'a2ecd50f-09b2-4d31-9fcf-1a1438700f51', 'name': 'Jon Dough', 'url':'https://ge.xhamster.com/pornstars/jon-dough'}, 'https://ge.xhamster.com/pornstars/erica-mori': {'id': '5379dab9-63da-44ed-baf1-929d74ac60b1', 'name': 'Polly Yangs', 'url':'https://ge.xhamster.com/pornstars/polly-yangs'}, 'https://ge.xhamster.com/pornstars/elnara-cat': {'id': '543952d7-59a9-4492-a70f-e384b5f8eb57', 'name': 'Renata Fox', 'url':'https://ge.xhamster.com/pornstars/renata-fox'}, 'https://ge.xhamster.com/pornstars/melissa-grand': {'id': '5d025bea-4af6-4197-b38d-3b3afa9d30b9', 'name': 'Melissa Benz', 'url':'https://ge.xhamster.com/pornstars/melissa-benz'}, 'https://ge.xhamster.com/pornstars/sindy-dollar': {'id': 'fa97769c-9e53-4613-b3c3-4cc1a2672d4b', 'name': 'Cindy Dollar', 'url':'https://ge.xhamster.com/pornstars/cindy-dollar'}, } # type: ignore if actor_url in map_url_actor: actor_id: str = map_url_actor[actor_url]['id'] # type: ignore log.debug(f"found actor with id: {actor_id}") return map_ids_actor[actor_id] # type: ignore path = actor_url.split('/')[-1] if path in map_path_actor: actor_id: str = map_path_actor[path]['id'] # type: ignore log.debug(f"found actor with id: {actor_id} by path {path}") return map_ids_actor[actor_id] # type: ignore if actor_url in alternate_url_actor: actor_id: str = alternate_url_actor[actor_url]['id'] log.info(f"found actor with id: {actor_id} by alternative {path}") return alternate_url_actor[actor_url] log.info(f"found actor {actor_url} missing") return None def get_actors(log: logging.Logger, api_data: Dict[str, Any]): actors_url = {} actors_id = {} actors_path = {} host = api_data["host"] port = api_data["port"] token = api_data['token'] url: str = f"http://{host}:{port}/api/media/actors" headers: Dict[str, str] = {"Authorization": f"Bearer {token}"} response = requests.get(url, headers=headers) data = response.json() for media_actor in data: actor_id = media_actor['id'] actor_name = media_actor['name'] actor_url = media_actor['url'] actor = {} actor['id'] = actor_id actor['name'] = actor_name actor['url'] = actor_url actors_url[actor_url] = actor actors_id[actor_id] = actor actors_path[actor_url.split('/')[-1]] = actor log.debug(f'all actors: {actors_url}') log.debug(f'all actors: {actors_path}') return (actors_url, actors_id, actors_path) def get_actor_name(actor_url: str, log: logging.Logger) -> str | None: try: r = requests.get(actor_url) soup = BeautifulSoup(r.content, "html.parser") titles = soup.find_all('h1') for title in titles: log.info(f"title: {title.get_text()}") return title.get_text() except Exception as error: log.warning(f"something went wrong: {error}") return None def create_actor(actor_url: str, actor_name: str, log: logging.Logger, api_data: Dict[str, Any]): new_actor = { 'name': actor_name, 'url': actor_url} host = api_data["host"] port = api_data["port"] token = api_data['token'] url: str = f"http://{host}:{port}/api/media/actors" headers: Dict[str, str] = {"Authorization": f"Bearer {token}"} actor_response = requests.post(url, headers=headers, json=new_actor) log.warning(f"add status: {actor_response.status_code}") if actor_response.status_code == 201: actor_data = actor_response.json() log.warning(f"Actor {actor_data} persisted") else: log.info(f"Actor with {actor_url} not persisted") if __name__ == '__main__': log = get_logger(args.verbose) log.warning('kontor.find_links started') log.debug('get all actors') api_data = get_api_config(log, args.config) host = api_data["host"] port = api_data["port"] token = api_data['token'] (actors_url, actors_id, actors_path) = get_actors(log, api_data=api_data) data = get_media_files(args.all, api_data) entries_count = len(data) mediafile_index = 1 log.debug(f"data: {len(data)}") missing_actors = {} if args.limit: log.warning(f"check the first {args.limit} links") for media_file in data: link = media_file['url'] media_file_id = media_file['id'] if not link: continue if str(link) == "None": continue log.warning(f"{media_file['id']} - {str(link)}") actor_links: list[str] = get_actor_links(log, link, api_data=api_data) actor_id_list = get_actor_ids(actor_links, actors_url, actors_id, actors_path, missing_actors, log) update_media_file_actors(media_file, actor_id_list, actor_links, actors_id, log, api_data=api_data) result = update_media_file(media_file, log, api_data=api_data) log.warning(f"processed {mediafile_index}/{entries_count}") if args.limit and args.limit <= mediafile_index: break mediafile_index += 1 for link in missing_actors: log.info(f"{link}: {missing_actors[link]}") actor_name = get_actor_name(link, log) if actor_name and args.add_actor: create_actor(link, actor_name, log, api_data=api_data) log.info("Sort missing actors by occurence count:") sorted_missing = dict(sorted(missing_actors.items(), key=lambda item: item[1])) for key in sorted_missing: log.info(f"{key} : {sorted_missing[key]}") log.warning('kontor.find_links finished')