Files
2026-01-29 23:50:41 +01:00

333 lines
15 KiB
Python

"""
download files with URLs from DB
"""
import logging.config
import sys
from typing import Any, Dict
import requests
import re
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from bs4 import BeautifulSoup
from config import get_api_config
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument("--config", "-c", default="kontor-docker")
parser.add_argument('--all', '-a', action='store_true')
parser.add_argument('--limit', '-l', type=int, help='maximum number of links to check')
parser.add_argument('--add-actor', action='store_true', help='add missing actors')
args = parser.parse_args()
def get_logger(level: int) -> logging.Logger:
logging.config.dictConfig({
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'simple': {
'format': '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S',
},
},
'handlers': {
'console': {
'class': logging.StreamHandler,
'level': logging.DEBUG,
'formatter': 'simple',
'stream': 'ext://sys.stdout'
},
},
'loggers': {
'urllib3.connectionpool': {
'level': 'WARNING',
'propagate': False,
},
'root': {
'level': 'DEBUG',
'handlers': ['console'],
},
},
})
logger = logging.getLogger(__file__)
if level is not None:
match level:
case 0:
logger.setLevel(logging.WARNING)
case 1:
logger.setLevel(logging.INFO)
case 2:
logger.setLevel(logging.DEBUG)
case _:
logger.setLevel(logging.CRITICAL)
return logger
def update_file(log: logging.Logger, media_file, api_data: Dict[str, Any]):
host = api_data["host"]
port = api_data["port"]
token = api_data['token']
headers: Dict[str, str] = {"Authorization": f"Bearer {token}"}
url = f"http://{host}:{port}/api/media/files/{media_file['id']}"
update = requests.put(url, headers=headers, json=media_file)
log.debug(f"update status: {update.status_code}")
log.debug(f"update result: {update.json()}")
def get_actor_links(log: logging.Logger, media_file_url: str, api_data: Dict[str, Any]) -> list[str]:
try:
r = requests.get(media_file_url)
soup = BeautifulSoup(r.content, "html.parser")
error404 = soup.css.select_one('.error404-title')
if error404 and error404.get_text() == "Video nicht gefunden":
log.warning(f"{error404.get_text()}")
media_file['url'] = None
media_file['review'] = False
update_file(log, media_file, api_data=api_data)
return []
anchors = soup.find_all('a', attrs={'href': re.compile("^https://.*pornstars/.*")})
actor_links = []
for anchor in anchors:
link_url = str(anchor.get("href")) # type: ignore
if link_url.endswith('all/countries'):
continue
if link_url in actor_links:
continue
actor_links.append(link_url)
log.debug(f"links({len(actor_links)}): {actor_links}")
return actor_links
except Exception as error:
log.warning(f"something went wrong: {error}")
return []
def get_media_files(all_files: bool, api_data: Dict[str, Any])-> Any:
files_url = ""
host = api_data["host"]
port = api_data["port"]
token = api_data['token']
headers: Dict[str, str] = {"Authorization": f"Bearer {token}"}
if all_files:
files_url= f"http://{host}:{port}/api/media/files"
else:
files_url = f"http://{host}:{port}/api/media/files?review=true"
response = requests.get(files_url, headers=headers)
log.debug(f"Status: {response.status_code}")
data = response.json()
return data
def update_media_file(item, log: logging.Logger, api_data: Dict[str, Any]) -> Any:
host = api_data["host"]
port = api_data["port"]
token = api_data['token']
url: str = f"http://{host}:{port}/api/media/files/{item['id']}"
headers: Dict[str, str] = {"Authorization": f"Bearer {token}"}
update = requests.put(url, headers=headers, json=item)
log.debug(f"update status: {update.status_code}")
log.debug(f"update result: {update.json()}")
return update.json()
def update_media_file_actors(mediafile: dict,
actor_id_list: list[dict[str, str]],
actor_links: list[str],
map_ids_actor: dict[str, str],
log: logging.Logger,
api_data: Dict[str, Any]):
media_file_id = mediafile['id']
host = api_data["host"]
port = api_data["port"]
token = api_data['token']
url: str = f"http://{host}:{port}/api/media/files/{media_file_id}/actors"
headers: Dict[str, str] = {"Authorization": f"Bearer {token}"}
actor_response = requests.put(url, headers=headers, json=actor_id_list)
files_actor_list = actor_response.json()
persisted_actor_links_count: int = len(files_actor_list)
found_actor_links_count: int = len(actor_links)
if persisted_actor_links_count < found_actor_links_count:
log.warning(f"{persisted_actor_links_count} links persisted, but {found_actor_links_count} links are available")
mediafile['review'] = True
elif persisted_actor_links_count > found_actor_links_count:
log.warning("more persisted links than found actors")
for file_actor in files_actor_list:
actor_id = file_actor['actor_id']
actor_url = map_ids_actor[actor_id]['url'] # type: ignore
log.debug(f"check if actor({actor_id}) with {actor_url} in list")
if actor_url not in actor_links:
log.info(f"actor not found in links, delete relation {file_actor['id']}")
delete_media_file_actor(file_actor['id'], log, api_data)
mediafile['review'] = True
else:
mediafile['review'] = False
log.debug(f"found {persisted_actor_links_count} actors")
log.debug(f"found actors: {files_actor_list}")
def delete_media_file_actor(media_actor_file_id: str, log: logging.Logger, api_data: Dict[str, Any]):
host = api_data["host"]
port = api_data["port"]
token = api_data['token']
url: str = f"http://{host}:{port}/api/media/actorfiles/{media_actor_file_id}"
headers: Dict[str, str] = {"Authorization": f"Bearer {token}"}
delete_response = requests.delete(url, headers=headers)
if delete_response.status_code == 204:
log.info(f"actor file relation with id {media_actor_file_id} successfully deleted")
def get_actor_ids(link_list: list[str],
map_url_actor: dict[str, str],
map_ids_actor: dict[str, str],
map_path_actor: dict[str, str],
missing_actors: dict[str, int],
log: logging.Logger) -> list[dict[str, str]]:
found_actors: list[dict[str, str]] = []
for link in link_list:
actor = get_persisted_actor(link, map_url_actor, map_ids_actor, map_path_actor, log)
if actor:
found_actors.append(actor)
else:
if link in missing_actors:
count = missing_actors[link]
missing_actors[link] = count +1
else:
missing_actors.update({link: 1})
return found_actors
def get_persisted_actor(actor_url: str,
map_url_actor: dict[str, str],
map_ids_actor: dict[str, str],
map_path_actor: dict[str, str],
log: logging.Logger) -> dict[str, str] | None:
alternate_url_actor: dict[str, dict[str, str]] = {
'https://ge.xhamster2.com/pornstars/jean-yves-lecastel':
{'id': 'e354b866-717c-4a66-ad38-bc7c23d97e36', 'name': 'Jean-Yves Le Castel', 'url': 'https://ge.xhamster.com/pornstars/jean-yves-le-castel'},
'https://ge.xhamster.com/pornstars/jean-yves-lecastel':
{'id': 'e354b866-717c-4a66-ad38-bc7c23d97e36', 'name': 'Jean-Yves Le Castel', 'url': 'https://ge.xhamster.com/pornstars/jean-yves-le-castel'},
'https://ge.xhamster.com/pornstars/gracie-green':
{'id': 'cbec2e0d-869c-40f1-923f-21958d938d9f', 'name': 'Gracie May Green', 'url':'https://ge.xhamster.com/pornstars/gracie-may-green'},
'https://ge.xhamster.com/pornstars/thomas-hyka':
{'id': '1d814b45-ea98-4acc-88a2-227d3ed36959', 'name': 'Thomas Crown', 'url':'https://ge.xhamster.com/pornstars/thomas-crown'},
'https://ge.xhamster.com/pornstars/chloe-couture':
{'id': 'e22003a5-60a9-4d86-a1df-ae09ecbe5200', 'name': 'Chloe Cherry', 'url':'https://ge.xhamster.com/pornstars/chloe-cherry'},
'https://ge.xhamster.com/pornstars/dava-fox':
{'id': 'd913b778-4507-421b-88e0-9da73bb80a63', 'name': 'Dava Foxx', 'url':'https://ge.xhamster.com/pornstars/dava-foxx'},
'https://ge.xhamster.com/pornstars/john-dough':
{'id': 'a2ecd50f-09b2-4d31-9fcf-1a1438700f51', 'name': 'Jon Dough', 'url':'https://ge.xhamster.com/pornstars/jon-dough'},
'https://ge.xhamster.com/pornstars/erica-mori':
{'id': '5379dab9-63da-44ed-baf1-929d74ac60b1', 'name': 'Polly Yangs', 'url':'https://ge.xhamster.com/pornstars/polly-yangs'},
'https://ge.xhamster.com/pornstars/elnara-cat':
{'id': '543952d7-59a9-4492-a70f-e384b5f8eb57', 'name': 'Renata Fox', 'url':'https://ge.xhamster.com/pornstars/renata-fox'},
'https://ge.xhamster.com/pornstars/melissa-grand':
{'id': '5d025bea-4af6-4197-b38d-3b3afa9d30b9', 'name': 'Melissa Benz', 'url':'https://ge.xhamster.com/pornstars/melissa-benz'},
'https://ge.xhamster.com/pornstars/sindy-dollar':
{'id': 'fa97769c-9e53-4613-b3c3-4cc1a2672d4b', 'name': 'Cindy Dollar', 'url':'https://ge.xhamster.com/pornstars/cindy-dollar'},
} # type: ignore
if actor_url in map_url_actor:
actor_id: str = map_url_actor[actor_url]['id'] # type: ignore
log.debug(f"found actor with id: {actor_id}")
return map_ids_actor[actor_id] # type: ignore
path = actor_url.split('/')[-1]
if path in map_path_actor:
actor_id: str = map_path_actor[path]['id'] # type: ignore
log.debug(f"found actor with id: {actor_id} by path {path}")
return map_ids_actor[actor_id] # type: ignore
if actor_url in alternate_url_actor:
actor_id: str = alternate_url_actor[actor_url]['id']
log.info(f"found actor with id: {actor_id} by alternative {path}")
return alternate_url_actor[actor_url]
log.info(f"found actor {actor_url} missing")
return None
def get_actors(log: logging.Logger, api_data: Dict[str, Any]):
actors_url = {}
actors_id = {}
actors_path = {}
host = api_data["host"]
port = api_data["port"]
token = api_data['token']
url: str = f"http://{host}:{port}/api/media/actors"
headers: Dict[str, str] = {"Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers)
data = response.json()
for media_actor in data:
actor_id = media_actor['id']
actor_name = media_actor['name']
actor_url = media_actor['url']
actor = {}
actor['id'] = actor_id
actor['name'] = actor_name
actor['url'] = actor_url
actors_url[actor_url] = actor
actors_id[actor_id] = actor
actors_path[actor_url.split('/')[-1]] = actor
log.debug(f'all actors: {actors_url}')
log.debug(f'all actors: {actors_path}')
return (actors_url, actors_id, actors_path)
def get_actor_name(actor_url: str, log: logging.Logger) -> str | None:
try:
r = requests.get(actor_url)
soup = BeautifulSoup(r.content, "html.parser")
titles = soup.find_all('h1')
for title in titles:
log.info(f"title: {title.get_text()}")
return title.get_text()
except Exception as error:
log.warning(f"something went wrong: {error}")
return None
def create_actor(actor_url: str, actor_name: str, log: logging.Logger, api_data: Dict[str, Any]):
new_actor = { 'name': actor_name, 'url': actor_url}
host = api_data["host"]
port = api_data["port"]
token = api_data['token']
url: str = f"http://{host}:{port}/api/media/actors"
headers: Dict[str, str] = {"Authorization": f"Bearer {token}"}
actor_response = requests.post(url, headers=headers, json=new_actor)
log.warning(f"add status: {actor_response.status_code}")
if actor_response.status_code == 201:
actor_data = actor_response.json()
log.warning(f"Actor {actor_data} persisted")
else:
log.info(f"Actor with {actor_url} not persisted")
if __name__ == '__main__':
log = get_logger(args.verbose)
log.warning('kontor.find_links started')
log.debug('get all actors')
api_data = get_api_config(log, args.config)
host = api_data["host"]
port = api_data["port"]
token = api_data['token']
(actors_url, actors_id, actors_path) = get_actors(log, api_data=api_data)
data = get_media_files(args.all, api_data)
entries_count = len(data)
mediafile_index = 1
log.debug(f"data: {len(data)}")
missing_actors = {}
if args.limit:
log.warning(f"check the first {args.limit} links")
for media_file in data:
link = media_file['url']
media_file_id = media_file['id']
if not link:
continue
if str(link) == "None":
continue
log.warning(f"{media_file['id']} - {str(link)}")
actor_links: list[str] = get_actor_links(log, link, api_data=api_data)
actor_id_list = get_actor_ids(actor_links, actors_url, actors_id, actors_path, missing_actors, log)
update_media_file_actors(media_file, actor_id_list, actor_links, actors_id, log, api_data=api_data)
result = update_media_file(media_file, log, api_data=api_data)
log.warning(f"processed {mediafile_index}/{entries_count}")
if args.limit and args.limit <= mediafile_index:
break
mediafile_index += 1
for link in missing_actors:
log.info(f"{link}: {missing_actors[link]}")
actor_name = get_actor_name(link, log)
if actor_name and args.add_actor:
create_actor(link, actor_name, log, api_data=api_data)
log.info("Sort missing actors by occurence count:")
sorted_missing = dict(sorted(missing_actors.items(), key=lambda item: item[1]))
for key in sorted_missing:
log.info(f"{key} : {sorted_missing[key]}")
log.warning('kontor.find_links finished')