130 lines
4.8 KiB
Python
130 lines
4.8 KiB
Python
"""
|
|
read file with links and store it in DB
|
|
"""
|
|
import logging.config
|
|
import re
|
|
from typing import List
|
|
from bs4 import BeautifulSoup
|
|
import requests
|
|
import yaml
|
|
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
|
|
from pathlib import Path
|
|
from platformdirs import PlatformDirs
|
|
from pathlib import Path
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker, Session
|
|
from db.models.base import Base
|
|
import os
|
|
|
|
from db.models.media import MediaFile
|
|
|
|
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
|
|
parser.add_argument('--file', '-f', help='file with links', default='~/.sync/media/list.txt')
|
|
parser.add_argument('--video', help='store Url as VideoFile', action="store_true")
|
|
parser.add_argument('--config', '-c', default='kontor-docker')
|
|
parser.add_argument('--verbose', '-v', action='count', default=0)
|
|
parser.add_argument('--dry-run', '-m', help='excute script without storing', action="store_true")
|
|
args = parser.parse_args()
|
|
|
|
DB_USER: str = os.getenv("DB_USER", "kontor")
|
|
DB_PASSWORD: str = os.getenv("DB_PASSWORD", "kontor")
|
|
DB_SERVER: str = os.getenv("DB_SERVER", "127.0.0.1")
|
|
DB_PORT: int = int(os.getenv("DB_PORT", 5432))
|
|
DB_DBNAME: str = os.getenv("DB_DBNAME", "kontor")
|
|
DATABASE_URL: str = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_SERVER}:{DB_PORT}/{DB_DBNAME}"
|
|
|
|
def get_logger(level, config: str):
|
|
dirs = PlatformDirs(config)
|
|
logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
|
|
with open(logging_config, 'rt') as f:
|
|
log_config = yaml.safe_load(f.read())
|
|
logging.config.dictConfig(log_config)
|
|
logger = logging.getLogger('development')
|
|
if level is not None:
|
|
match level:
|
|
case 0:
|
|
logger.setLevel(logging.CRITICAL)
|
|
case 1:
|
|
logger.setLevel(logging.INFO)
|
|
case 2:
|
|
logger.setLevel(logging.DEBUG)
|
|
case _:
|
|
logger.setLevel(logging.INFO)
|
|
return logger
|
|
|
|
def get_session() -> Session:
|
|
engine = create_engine(DATABASE_URL)
|
|
Base.metadata.create_all(bind=engine, checkfirst=True)
|
|
SessionLocal = sessionmaker(bind=engine)
|
|
return SessionLocal()
|
|
|
|
def load_data(filename: str, log) -> List[str]:
|
|
links: List[str] = []
|
|
log.debug("load_data")
|
|
import_file = Path(filename)
|
|
if not import_file.exists():
|
|
log.info(f"File {filename} does not exist. Do nothing.")
|
|
raise FileNotFoundError()
|
|
log.info("read txt file")
|
|
with open(filename, 'r') as txt_file:
|
|
while line := txt_file.readline():
|
|
# log.info(line.rstrip())
|
|
links.append(line.rstrip())
|
|
return links
|
|
|
|
def get_meta_info(media_file: MediaFile, log):
|
|
try:
|
|
r = requests.get(media_file.url)
|
|
soup = BeautifulSoup(r.content, "html.parser")
|
|
error404 = soup.css.select_one('.error404-title')
|
|
if error404 and error404.get_text() == "Video nicht gefunden":
|
|
log.warning(f"{error404.get_text()}")
|
|
media_file.url = None
|
|
media_file.review = False
|
|
return
|
|
title_tag = soup.find('title')
|
|
if title_tag:
|
|
media_file.title = title_tag.get_text()
|
|
media_file.review = False
|
|
anchors = soup.find_all('a', attrs={'href': re.compile("^https://.*pornstars/.*")})
|
|
actor_links = []
|
|
for anchor in anchors:
|
|
link_url = str(anchor.get("href")) # type: ignore
|
|
if link_url.endswith('all/countries'):
|
|
continue
|
|
if link_url in actor_links:
|
|
continue
|
|
actor_links.append(link_url)
|
|
log.info(f"links({len(actor_links)}): {actor_links}")
|
|
except Exception as error:
|
|
log.info(f"something went wrong: {error}")
|
|
media_file.title = None
|
|
media_file.review = True
|
|
log.info(f"update MediaFile with MetaInfos to {repr(media_file)}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
logger = get_logger(args.verbose, "kontor")
|
|
logger.info('kontor.add_links started')
|
|
session = get_session()
|
|
with session as db:
|
|
links = load_data(args.file, logger)
|
|
for link in links:
|
|
logger.debug(f"process {link}")
|
|
media_files = db.query(MediaFile).filter(MediaFile.url == link).all()
|
|
if len(media_files) == 0:
|
|
logger.info(f"MediaFile for link {link} not found")
|
|
media_file = MediaFile()
|
|
media_file.url = link
|
|
media_file.review = True
|
|
media_file.should_download = True
|
|
get_meta_info(media_file, logger)
|
|
if not args.dry_run:
|
|
db.add(media_file)
|
|
db.commit()
|
|
db.refresh(media_file)
|
|
else:
|
|
for media_file in media_files:
|
|
logger.debug(f"MediaFile with {media_file.id} is found")
|
|
logger.info('kontor.add_link finished')
|