# kontor/kontor-scripts/find_links.py - 2025-09-06 19:55:33 +02:00 - 205 lines, 7.6 KiB, Python
"""
Scan the pages of media files for actor links and store the matching actors
through the local media API.
"""
import logging.config
import sys
from typing import Any, AnyStr, Dict, List
import requests
import re
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from bs4 import BeautifulSoup
# Command-line interface: a verbosity counter, an "all files" switch and an
# optional cap on the number of links that get checked.
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument(
    '--verbose', '-v',
    action='count',
    default=0,
)
parser.add_argument(
    '--all', '-a',
    action='store_true',
)
parser.add_argument(
    '--limit', '-l',
    type=int,
    help='maximum number of links to check',
)
# NOTE: the arguments are parsed at module level, i.e. as soon as this file is
# executed or imported, sys.argv is read.
args = parser.parse_args()
def get_logger(level: int) -> logging.Logger:
    """Configure console logging and return the logger used by this script.

    The logging setup sends all records through a single stdout handler and
    silences ``urllib3.connectionpool`` below WARNING.

    Args:
        level: verbosity count (as produced by ``-v`` flags). ``0`` maps to
            WARNING, ``1`` to INFO, ``2`` to DEBUG and every other number to
            CRITICAL. ``None`` leaves the logger's level untouched.

    Returns:
        The logger registered under this file's path (``__file__``).
    """
    logging.config.dictConfig({
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'simple': {
                'format': '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
                'datefmt': '%Y-%m-%d %H:%M:%S',
            },
        },
        'handlers': {
            'console': {
                # dictConfig documents 'class' as the fully qualified class
                # name; a class object is not accepted by every supported
                # interpreter version, the dotted string is.
                'class': 'logging.StreamHandler',
                'level': logging.DEBUG,
                'formatter': 'simple',
                'stream': 'ext://sys.stdout'
            },
        },
        'loggers': {
            'urllib3.connectionpool': {
                'level': 'WARNING',
                'propagate': False,
            },
            'root': {
                'level': 'DEBUG',
                'handlers': ['console'],
            },
        },
    })
    logger = logging.getLogger(__file__)
    if level is not None:
        # NOTE(review): counts above 2 fall through to CRITICAL, so "-vvv" is
        # quieter than "-vv" - confirm that this is intended.
        verbosity_to_level = {
            0: logging.WARNING,
            1: logging.INFO,
            2: logging.DEBUG,
        }
        logger.setLevel(verbosity_to_level.get(level, logging.CRITICAL))
    return logger
def update_file(log: logging.Logger, media_file):
    """Send a media file record to the local media API with a PUT request.

    Args:
        log: logger that receives the status code and the decoded response.
        media_file: record to store; its ``'id'`` entry selects the endpoint
            and the whole record is sent as the JSON body.
    """
    endpoint = f"http://127.0.0.1:8800/api/media/files/{media_file['id']}"
    response = requests.put(endpoint, json=media_file)
    log.debug(f"update status: {response.status_code}")
    log.debug(f"update result: {response.json()}")
def get_actor_links(log: logging.Logger, media_file_url: str, media_file: dict | None = None) -> list:
    """Fetch a media file page and collect the distinct actor links on it.

    When the page shows the "Video nicht gefunden" error title, the media file
    record is cleared (``url`` set to ``None``, ``review`` set to ``False``),
    written back via ``update_file`` and an empty list is returned.

    Args:
        log: logger for progress and error messages.
        media_file_url: address of the page to scan.
        media_file: record that belongs to ``media_file_url``. When omitted,
            the module-level ``media_file`` variable is used, which is what
            this function implicitly relied on before the parameter existed.

    Returns:
        The matching anchor targets in page order without duplicates; an
        empty list when the page is gone or anything goes wrong.
    """
    if media_file is None:
        # Backward compatibility: existing callers pass only two arguments and
        # expect the record currently held in the module-level variable.
        media_file = globals().get('media_file')
    try:
        r = requests.get(media_file_url)
        soup = BeautifulSoup(r.content, "html.parser")
        error404 = soup.css.select_one('.error404-title')
        if error404 and error404.get_text() == "Video nicht gefunden":
            log.warning(f"{error404.get_text()}")
            if media_file is None:
                log.warning("no media file record available, skipping update")
                return []
            media_file['url'] = None
            media_file['review'] = False
            update_file(log, media_file)
            return []
        anchors = soup.find_all('a', attrs={'href': re.compile(r"^https://.*pornstars/.*")})
        actor_links = []
        seen = set()
        for anchor in anchors:
            link_url = anchor.get('href')
            # the country overview matches the pattern but is not an actor page
            if link_url.endswith('all/countries'):
                continue
            if link_url in seen:
                continue
            seen.add(link_url)
            actor_links.append(link_url)
        log.debug(f"links({len(actor_links)}): {actor_links}")
        return actor_links
    except Exception as error:
        # best effort: a failing page must not abort the whole run
        log.warning(f"something went wrong: {error}")
        return []
def get_media_files(all_files: bool)-> Any:
    """Load media file records from the local media API.

    Uses the module-level ``log`` for its debug output; it is not passed in.

    Args:
        all_files: request every record when true, otherwise only the ones
            selected by the ``review=true`` query.

    Returns:
        The decoded JSON body of the response.
    """
    files_url = (
        "http://127.0.0.1:8800/api/media/files"
        if all_files
        else "http://127.0.0.1:8800/api/media/files?review=true"
    )
    response = requests.get(files_url)
    log.debug(f"Status: {response.status_code}")
    return response.json()
def update_media_file(item, log: logging.Logger):
    """PUT a media file record to the local media API and return the answer.

    Args:
        item: record to store; ``item['id']`` selects the endpoint and the
            whole record is sent as the JSON body.
        log: logger that receives the status code and the decoded response.

    Returns:
        The decoded JSON body of the response.
    """
    response = requests.put(f"http://127.0.0.1:8800/api/media/files/{item['id']}", json=item)
    log.debug(f"update status: {response.status_code}")
    payload = response.json()
    log.debug(f"update result: {payload}")
    return payload
def update_media_file_actors(mediafile: dict, actor_id_list: List[Dict[str, Any]], log: logging.Logger, found_link_count: int | None = None):
    """Store the actors of a media file and flag the record for review.

    The actor list is sent to the API with a PUT request. The number of
    entries in the response is then compared with the number of links that
    were found for the file; ``mediafile['review']`` is set to ``True`` when
    the two differ and to ``False`` when they are equal. The record is only
    changed in memory, it is not written back here.

    Args:
        mediafile: media file record; ``'id'`` is read, ``'review'`` is set.
        actor_id_list: actors to attach, sent unchanged as the JSON body.
        log: logger for warnings and debug output.
        found_link_count: number of actor links found for this file. When
            omitted, the length of the module-level ``actor_links`` list is
            used (the value this function implicitly read before the
            parameter existed); if that does not exist either, the length of
            ``actor_id_list`` is used.
    """
    media_file_id = mediafile['id']
    actor_response = requests.put(f"http://127.0.0.1:8800/api/media/files/{media_file_id}/actors", json=actor_id_list)
    actor_data = actor_response.json()
    persisted_actor_links: int = len(actor_data)
    if found_link_count is None:
        # Backward compatibility: existing callers pass three arguments and
        # rely on the list kept in the module-level variable.
        scraped_links = globals().get('actor_links')
        found_link_count = len(scraped_links) if scraped_links is not None else len(actor_id_list)
    found_actor_links: int = found_link_count
    if persisted_actor_links < found_actor_links:
        log.warning(f"{persisted_actor_links} links persisted, but {found_actor_links} links are available")
    elif persisted_actor_links > found_actor_links:
        log.warning("more persisted links than found actors")
    # any mismatch between stored and found links needs a manual look
    mediafile['review'] = persisted_actor_links != found_actor_links
    log.debug(f"found {persisted_actor_links} actors")
    log.debug(f"found actors: {actor_data}")
def get_actor_ids(link_list: list, map_url_actor, map_ids_actor, map_path_actor, missing_actors: dict, log: logging.Logger) -> list:
    """Resolve actor links to the known actor records.

    Each link is first looked up by its full URL. If that fails, its last
    non-empty path segment is looked up in ``map_path_actor``. Links that
    cannot be resolved are counted in ``missing_actors``.

    Args:
        link_list: actor links to resolve.
        map_url_actor: actor records keyed by full URL.
        map_ids_actor: actor records keyed by id; the returned entries are
            taken from this mapping.
        map_path_actor: actor records keyed by the last URL path segment.
        missing_actors: counter of unresolved links (link -> occurrences);
            updated in place.
        log: logger for debug/info output.

    Returns:
        The resolved actor records in the order of ``link_list``.
    """
    found_actors: list = []
    for link in link_list:
        if link in map_url_actor:
            actor_id = map_url_actor[link]['id']
            log.debug(f"found actor with id: {actor_id}")
            found_actors.append(map_ids_actor[actor_id])
            continue
        # Strip trailing slashes first: otherwise ".../name/" yields an empty
        # segment, which could match an unrelated '' key in the path index.
        path = link.rstrip('/').split('/')[-1]
        if path and path in map_path_actor:
            actor_id = map_path_actor[path]['id']
            log.debug(f"found actor with id: {actor_id} by path {path}")
            found_actors.append(map_ids_actor[actor_id])
            continue
        log.info(f"found actor {link} missing")
        missing_actors[link] = missing_actors.get(link, 0) + 1
    return found_actors
def get_actors(log: logging.Logger):
    """Load all actors from the local media API and index them three ways.

    Every actor is reduced to a dict with the keys ``'id'``, ``'name'`` and
    ``'url'``. Actors without a URL are only added to the id index, because
    they cannot be found by URL or by path.

    Args:
        log: logger for debug output.

    Returns:
        A tuple ``(actors_url, actors_id, actors_path)``: the actor dicts
        keyed by full URL, by id and by the last non-empty URL path segment.
    """
    actors_url = {}
    actors_id = {}
    actors_path = {}
    response = requests.get("http://127.0.0.1:8800/api/media/actors")
    data = response.json()
    for media_actor in data:
        actor_url = media_actor['url']
        actor = {
            'id': media_actor['id'],
            'name': media_actor['name'],
            'url': actor_url,
        }
        actors_id[actor['id']] = actor
        if not actor_url:
            # an empty/None url would crash on split() and is no usable key
            log.debug(f"actor {actor['id']} has no url, skipping url/path index")
            continue
        actors_url[actor_url] = actor
        # ignore trailing slashes so ".../name/" is indexed as "name", not ""
        path = actor_url.rstrip('/').split('/')[-1]
        if path:
            actors_path[path] = actor
    log.debug(f'all actors: {actors_url}')
    log.debug(f'all actors: {actors_path}')
    return (actors_url, actors_id, actors_path)
if __name__ == '__main__':
    # NOTE: `log`, `media_file` and `actor_links` have to stay module-level
    # names with exactly this spelling - helper functions in this file read
    # them as globals.
    log = get_logger(args.verbose)
    log.warning('kontor.find_links started')

    # Load the lookup tables for actors and the media files to process.
    log.debug('get all actors')
    actors_url, actors_id, actors_path = get_actors(log)
    data = get_media_files(args.all)
    entries_count = len(data)
    log.debug(f"data: {len(data)}")

    missing_actors = {}
    mediafile_index = 1  # 1-based count of files that were actually processed
    if args.limit:
        log.warning(f"check the first {args.limit} links")

    for media_file in data:
        link = media_file['url']
        media_file_id = media_file['id']
        # skip records without a usable link (empty, None or the text "None")
        if not link or str(link) == "None":
            continue
        log.warning(f"{media_file['id']} - {str(link)}")

        # scrape the page, resolve the links and write the outcome back
        actor_links = get_actor_links(log, link)
        actor_id_list = get_actor_ids(
            actor_links, actors_url, actors_id, actors_path, missing_actors, log
        )
        update_media_file_actors(media_file, actor_id_list, log)
        result = update_media_file(media_file, log)

        log.warning(f"processed {mediafile_index}/{entries_count}")
        if args.limit and args.limit <= mediafile_index:
            break
        mediafile_index += 1

    # Report unresolved links: first in discovery order, then sorted by count.
    for link in missing_actors:
        log.info(f"{link}: {missing_actors[link]}")
    sorted_missing = dict(sorted(missing_actors.items(), key=lambda entry: entry[1]))
    for key in sorted_missing:
        log.info(f"{key} : {sorted_missing[key]}")
    log.warning('kontor.find_links finished')