""" read file with links and store it in DB

Every link from the input file that has no MediaFile row yet is fetched, its
page title and actor links are scraped, and MediaFile / MediaActor /
MediaActorFile rows are created for it (unless --dry-run is given).
"""
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from datetime import datetime
import logging.config
import os
from pathlib import Path
import re
from typing import Dict, List
import uuid

from bs4 import BeautifulSoup
from platformdirs import PlatformDirs
import requests
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
import yaml

from db.models.base import Base
from db.models.media import MediaActor, MediaActorFile, MediaFile

parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--file', '-f', help='file with links', default='~/.sync/media/list.txt')
parser.add_argument('--video', help='store Url as VideoFile', action="store_true")
# NOTE(review): --config is parsed but never read; the logger is always
# configured with the hard-coded name "kontor" - confirm which is intended.
parser.add_argument('--config', '-c', default='kontor-docker')
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--limit', '-l', type=int, help='maximum number of links to check')
parser.add_argument('--dry-run', '-m', help='excute script without storing', action="store_true")
args = parser.parse_args()

DB_USER: str = os.getenv("DB_USER", "kontor")
DB_PASSWORD: str = os.getenv("DB_PASSWORD", "kontor")
DB_SERVER: str = os.getenv("DB_SERVER", "127.0.0.1")
DB_PORT: int = int(os.getenv("DB_PORT", 5432))
DB_DBNAME: str = os.getenv("DB_DBNAME", "kontor")
DATABASE_URL: str = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_SERVER}:{DB_PORT}/{DB_DBNAME}"

# Seconds to wait for a remote host. Without a timeout a single stalled
# server would block the whole import run indefinitely.
REQUEST_TIMEOUT = 30

# Maps the number of -v flags to the level applied to the logger;
# any other count falls back to INFO.
_VERBOSITY_LEVELS = {0: logging.CRITICAL, 1: logging.INFO, 2: logging.DEBUG}

# Anchors whose href matches this pattern are treated as actor links.
_ACTOR_LINK_PATTERN = re.compile("^https://.*pornstars/.*")


def get_logger(level, config: str):
    """Configure logging from the user's YAML config and return the logger.

    The configuration is read from ``logging-config.yaml`` inside the
    platform-specific user config directory of the application *config*.

    Args:
        level: verbosity count (0 -> CRITICAL, 1 -> INFO, 2 -> DEBUG,
            anything else -> INFO); ``None`` keeps the configured level.
        config: application name handed to ``PlatformDirs``.

    Returns:
        The logger named ``development``.
    """
    dirs = PlatformDirs(config)
    logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
    with open(logging_config, 'rt', encoding='utf-8') as f:
        log_config = yaml.safe_load(f)
    logging.config.dictConfig(log_config)

    logger = logging.getLogger('development')
    if level is not None:
        logger.setLevel(_VERBOSITY_LEVELS.get(level, logging.INFO))
    return logger


def get_session() -> Session:
    """Create the engine, make sure all tables exist and open a session."""
    engine = create_engine(DATABASE_URL)
    Base.metadata.create_all(bind=engine, checkfirst=True)
    SessionLocal = sessionmaker(bind=engine)
    return SessionLocal()


def load_data(filename: str, log) -> List[str]:
    """Read the links file and return one entry per non-blank line.

    Args:
        filename: path of the text file; a leading ``~`` is expanded to the
            user's home directory.
        log: logger used for progress messages.

    Returns:
        The links in file order, stripped of surrounding whitespace.

    Raises:
        FileNotFoundError: if the file does not exist.
    """
    log.debug("load_data")
    # The default --file value starts with "~", which open() and
    # Path.exists() do not expand on their own.
    import_file = Path(filename).expanduser()
    if not import_file.exists():
        log.info(f"File {filename} does not exist. Do nothing.")
        raise FileNotFoundError(str(import_file))

    log.info("read txt file")
    with open(import_file, 'r', encoding='utf-8') as txt_file:
        # Blank lines are skipped so they are not stored as empty URLs.
        return [link for line in txt_file if (link := line.strip())]


def get_actors_mapping(actor_list: List[MediaActor]) -> Dict[str, MediaActor]:
    """Index the given actors by their URL (the last one wins on duplicates)."""
    return {str(actor.url): actor for actor in actor_list}


def get_meta_info(media_file: MediaFile, log) -> List[str]:
    """Fetch the page behind ``media_file.url`` and fill in its meta data.

    Side effects on *media_file*:
      * page reports "Video nicht gefunden": ``url`` is set to ``None`` and
        ``review`` to ``False``;
      * a ``<title>`` tag is found: ``title`` is set and ``review`` cleared;
      * any exception (including a request timeout): ``title`` is set to
        ``None`` and ``review`` to ``True``.

    Args:
        media_file: record to update in place.
        log: logger used for progress messages.

    Returns:
        The distinct actor links found on the page, in document order.
    """
    actor_links: List[str] = []
    try:
        r = requests.get(media_file.url, timeout=REQUEST_TIMEOUT)
        soup = BeautifulSoup(r.content, "html.parser")

        error404 = soup.css.select_one('.error404-title')
        if error404 and error404.get_text() == "Video nicht gefunden":
            log.warning(f"{error404.get_text()}")
            media_file.url = None
            media_file.review = False
            return actor_links

        title_tag = soup.find('title')
        if title_tag:
            media_file.title = title_tag.get_text()
            media_file.review = False

        for anchor in soup.find_all('a', attrs={'href': _ACTOR_LINK_PATTERN}):
            link_url = str(anchor.get("href"))  # type: ignore
            # Skip the country overview link and links already collected.
            if link_url.endswith('all/countries') or link_url in actor_links:
                continue
            actor_links.append(link_url)
    except Exception as error:
        # Best effort: a failing page must not abort the whole import; the
        # record is flagged for manual review instead.
        log.info(f"something went wrong: {error}")
        media_file.title = None
        media_file.review = True

    log.info(f"update MediaFile with MetaInfos to {repr(media_file)}")
    log.info(f"links({len(actor_links)}): {actor_links}")
    return actor_links


def get_actor_name(actor_url: str, log: logging.Logger) -> str | None:
    """Return the text of the first ``<h1>`` on the actor page.

    Returns ``None`` when the page has no ``<h1>`` or the request or the
    parsing fails (the failure is logged as a warning).
    """
    try:
        r = requests.get(actor_url, timeout=REQUEST_TIMEOUT)
        soup = BeautifulSoup(r.content, "html.parser")
        title = soup.find('h1')
        if title is not None:
            name = title.get_text()
            log.info(f"title: {name}")
            return name
    except Exception as error:
        log.warning(f"something went wrong: {error}")
    return None


def _stamp(entity):
    """Fill the id and audit columns shared by every newly created record."""
    now = datetime.now()
    entity.id = str(uuid.uuid4())
    entity.created_date = now
    entity.last_modified_date = now
    entity.version = 0
    return entity


def _new_media_file(link: str) -> MediaFile:
    """Build an unsaved MediaFile for *link*, flagged for review and download."""
    media_file = _stamp(MediaFile())
    media_file.url = link
    media_file.review = True
    media_file.should_download = True
    media_file.path = None
    media_file.cloud_link = None
    media_file.file_name = None
    return media_file


def _new_media_actor(actor_url: str, log: logging.Logger) -> MediaActor:
    """Build an unsaved MediaActor whose name is scraped from *actor_url*."""
    media_actor = _stamp(MediaActor())
    media_actor.name = get_actor_name(actor_url, log)
    media_actor.url = actor_url
    return media_actor


def _new_actor_file(media_file: MediaFile, media_actor: MediaActor) -> MediaActorFile:
    """Build the unsaved association row between a file and an actor."""
    media_actor_file = _stamp(MediaActorFile())
    media_actor_file.media_file_id = media_file.id
    media_actor_file.media_actor_id = media_actor.id
    return media_actor_file


def _store(db: Session, entity, dry_run: bool) -> None:
    """Add and commit *entity* unless this is a dry run."""
    if not dry_run:
        db.add(entity)
        db.commit()


def _process_link(db: Session, link: str, log: logging.Logger, dry_run: bool) -> None:
    """Create the MediaFile, its actors and their associations for one link.

    Links that already have a MediaFile row are only logged.
    """
    media_files = db.query(MediaFile).filter(MediaFile.url == link).all()
    if media_files:
        for media_file in media_files:
            log.debug(f"MediaFile with {media_file.id} is found")
        return

    log.info(f"MediaFile for link {link} not found")
    # Known actors are only needed for new links, so they are not loaded
    # for links that already exist.
    actor_mapping = get_actors_mapping(db.query(MediaActor).all())

    media_file = _new_media_file(link)
    actor_urls: List[str] = get_meta_info(media_file, log)
    if not dry_run:
        db.add(media_file)
        db.commit()
        db.refresh(media_file)

    for actor_url in actor_urls:
        media_actor = actor_mapping.get(actor_url)
        if media_actor is None:
            media_actor = _new_media_actor(actor_url, log)
            log.info(f"update MediaActor with {repr(media_actor)}")
            _store(db, media_actor, dry_run)

        media_actor_file = _new_actor_file(media_file, media_actor)
        log.info(f"create mapping with {media_actor_file}")
        _store(db, media_actor_file, dry_run)


def main() -> None:
    """Import the links file given on the command line into the database."""
    logger = get_logger(args.verbose, "kontor")
    logger.info('kontor.add_links started')
    if args.limit:
        logger.warning(f"check the first {args.limit} links")

    with get_session() as db:
        links = load_data(args.file, logger)
        for index, link in enumerate(links, start=1):
            logger.debug(f"process {link}")
            _process_link(db, link, logger, args.dry_run)
            # Stop once --limit links have been handled (existing and new
            # links both count).
            if args.limit and index >= args.limit:
                break

    logger.info('kontor.add_link finished')


if __name__ == '__main__':
    main()