Vorbereitung Release 0.2.0 #83

Merged
tpeetz merged 178 commits from develop/0.2.0 into main 2026-01-29 22:50:42 +00:00
9 changed files with 237 additions and 420 deletions
Showing only changes of commit b87f0fc60a - Show all commits
+82 -7
View File
@@ -1,9 +1,11 @@
"""
read file with links and store it in DB
"""
from datetime import datetime
import logging.config
import re
from typing import List
from typing import Dict, List
import uuid
from bs4 import BeautifulSoup
import requests
import yaml
@@ -16,13 +18,14 @@ from sqlalchemy.orm import sessionmaker, Session
from db.models.base import Base
import os
from db.models.media import MediaFile
from db.models.media import MediaActor, MediaActorFile, MediaFile
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--file', '-f', help='file with links', default='~/.sync/media/list.txt')
parser.add_argument('--video', help='store Url as VideoFile', action="store_true")
parser.add_argument('--config', '-c', default='kontor-docker')
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--limit', '-l', type=int, help='maximum number of links to check')
parser.add_argument('--dry-run', '-m', help='excute script without storing', action="store_true")
args = parser.parse_args()
@@ -72,7 +75,14 @@ def load_data(filename: str, log) -> List[str]:
links.append(line.rstrip())
return links
def get_meta_info(media_file: MediaFile, log):
def get_actors_mapping(actor_list: List[MediaActor]) -> Dict[str, MediaActor]:
mapping: Dict[str, MediaActor] = {}
for actor in actor_list:
mapping[str(actor.url)] = actor
return mapping
def get_meta_info(media_file: MediaFile, log) -> List[str]:
actor_links: List[str] = []
try:
r = requests.get(media_file.url)
soup = BeautifulSoup(r.content, "html.parser")
@@ -81,13 +91,12 @@ def get_meta_info(media_file: MediaFile, log):
log.warning(f"{error404.get_text()}")
media_file.url = None
media_file.review = False
return
return actor_links
title_tag = soup.find('title')
if title_tag:
media_file.title = title_tag.get_text()
media_file.review = False
anchors = soup.find_all('a', attrs={'href': re.compile("^https://.*pornstars/.*")})
actor_links = []
for anchor in anchors:
link_url = str(anchor.get("href")) # type: ignore
if link_url.endswith('all/countries'):
@@ -95,35 +104,101 @@ def get_meta_info(media_file: MediaFile, log):
if link_url in actor_links:
continue
actor_links.append(link_url)
log.info(f"links({len(actor_links)}): {actor_links}")
except Exception as error:
log.info(f"something went wrong: {error}")
media_file.title = None
media_file.review = True
log.info(f"update MediaFile with MetaInfos to {repr(media_file)}")
log.info(f"links({len(actor_links)}): {actor_links}")
return actor_links
def get_actor_name(actor_url: str, log: logging.Logger) -> str | None:
try:
r = requests.get(actor_url)
soup = BeautifulSoup(r.content, "html.parser")
titles = soup.find_all('h1')
for title in titles:
log.info(f"title: {title.get_text()}")
return title.get_text()
except Exception as error:
log.warning(f"something went wrong: {error}")
return None
if __name__ == '__main__':
logger = get_logger(args.verbose, "kontor")
logger.info('kontor.add_links started')
if args.limit:
logger.warning(f"check the first {args.limit} links")
session = get_session()
links_index = 1
with session as db:
links = load_data(args.file, logger)
for link in links:
logger.debug(f"process {link}")
media_files = db.query(MediaFile).filter(MediaFile.url == link).all()
media_actors = db.query(MediaActor).all()
actor_mapping = get_actors_mapping(media_actors)
if len(media_files) == 0:
logger.info(f"MediaFile for link {link} not found")
media_file = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.url = link
media_file.review = True
media_file.should_download = True
get_meta_info(media_file, logger)
media_file.path = None
media_file.cloud_link = None
media_file.file_name = None
actor_urls: List[str] = get_meta_info(media_file, logger)
if not args.dry_run:
db.add(media_file)
db.commit()
db.refresh(media_file)
for actor_url in actor_urls:
if actor_url in actor_mapping:
media_actor: MediaActor = actor_mapping[actor_url]
# logger.info(f"create mapping for {repr(media_actor)}")
media_actor_file = MediaActorFile()
media_actor_file.id = str(uuid.uuid4())
media_actor_file.created_date = datetime.now()
media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = 0
media_actor_file.media_file_id = media_file.id
media_actor_file.media_actor_id = media_actor.id
logger.info(f"create mapping with {media_actor_file}")
if not args.dry_run:
db.add(media_actor_file)
db.commit()
else:
media_actor = MediaActor()
media_actor.id = str(uuid.uuid4())
media_actor.created_date = datetime.now()
media_actor.last_modified_date = datetime.now()
media_actor.version = 0
media_actor.name = get_actor_name(actor_url, logger)
media_actor.url = actor_url
logger.info(f"update MediaActor with {repr(media_actor)}")
if not args.dry_run:
db.add(media_actor)
db.commit()
media_actor_file = MediaActorFile()
media_actor_file.id = str(uuid.uuid4())
media_actor_file.created_date = datetime.now()
media_actor_file.last_modified_date = datetime.now()
media_actor_file.version = 0
media_actor_file.media_file_id = media_file.id
media_actor_file.media_actor_id = media_actor.id
logger.info(f"create mapping with {media_actor_file}")
if not args.dry_run:
db.add(media_actor_file)
db.commit()
else:
for media_file in media_files:
logger.debug(f"MediaFile with {media_file.id} is found")
links_index += 1
if args.limit and args.limit < links_index:
break
logger.info('kontor.add_link finished')
+2 -1
View File
@@ -1,3 +1,4 @@
from typing import Any, Dict
from db.models.admin import (
Assignment,
Token,
@@ -44,7 +45,7 @@ from db.models.tysc import (
Sport,
)
registry = {
registry: Dict[str, Any] = {
Sport.__tablename__: Sport,
Player.__tablename__: Player,
Team.__tablename__: Team,
+23 -23
View File
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, AnyStr, Dict
from typing import Any, Dict
from sqlalchemy import Column, ForeignKey, Integer, String, Boolean
from sqlalchemy.orm import relationship, mapped_column, Mapped
@@ -19,16 +19,16 @@ class Profile(Base, BaseMixin):
tokens = relationship("Token")
def get_full_name(self) -> str:
full_name = ""
full_name: str = ""
if self.first_name is not None:
full_name += self.first_name
full_name += str(self.first_name)
if self.last_name is not None:
if len(full_name) > 0:
full_name += " "
full_name += self.last_name
full_name += str(self.last_name)
return full_name
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -40,8 +40,8 @@ class Profile(Base, BaseMixin):
self.password = import_data['password']
self.enabled = import_data['enabled']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -64,7 +64,7 @@ class Token(Base, BaseMixin):
profile_id = Column(String, ForeignKey("profile.id"), nullable=False)
profile = relationship("Profile", back_populates="tokens")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -75,8 +75,8 @@ class Token(Base, BaseMixin):
self.enabled = import_data['enabled']
self.profile_id = import_data['profile_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -87,22 +87,22 @@ class Token(Base, BaseMixin):
item['enabled'] = self.enabled
item['profile_id'] = self.profile_id
return item
class Permission(Base, BaseMixin):
__tablename__ = "permission"
name = Column(String, nullable=False)
assignments = relationship("Assignment")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
self.version = import_data['version']
self.name = import_data['name']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -117,7 +117,7 @@ class Assignment(Base, BaseMixin):
permission_id = Column(String, ForeignKey("permission.id"), nullable=False)
permission = relationship("Permission", back_populates="assignments")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -125,8 +125,8 @@ class Assignment(Base, BaseMixin):
self.profile_id = import_data['profile_id']
self.permission_id = import_data['permission_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -145,7 +145,7 @@ class MailAccount(Base, BaseMixin):
password = Column(String)
start_tls = Column(Boolean)
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -157,8 +157,8 @@ class MailAccount(Base, BaseMixin):
self.password = import_data['password']
self.start_tls = import_data['start_tls']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -180,7 +180,7 @@ class Mail(Base, BaseMixin):
sent_date: Mapped[datetime] = mapped_column()
received_date: Mapped[datetime] = mapped_column()
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -191,8 +191,8 @@ class Mail(Base, BaseMixin):
self.sent_date = import_data['sent_date']
self.received_date = import_data['received_date']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
+18 -18
View File
@@ -10,15 +10,15 @@ class Article(Base, BaseMixin):
title = Column(String, unique=True)
article_authors = relationship("ArticleAuthor")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
self.version = import_data['version']
self.title = import_data['title']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -34,7 +34,7 @@ class Author(Base, BaseMixin):
article_authors = relationship("ArticleAuthor")
book_authors = relationship("BookAuthor")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -42,8 +42,8 @@ class Author(Base, BaseMixin):
self.first_name = import_data['first_name']
self.last_name = import_data['last_name']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -58,15 +58,15 @@ class BookshelfPublisher(Base, BaseMixin):
name = Column(String, unique=True)
books = relationship("Book")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
self.version = import_data['version']
self.name = import_data['name']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -84,7 +84,7 @@ class Book(Base, BaseMixin):
publisher = relationship('BookshelfPublisher', back_populates="books")
book_authors = relationship("BookAuthor")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -94,8 +94,8 @@ class Book(Base, BaseMixin):
self.year = import_data['year']
self.publisher_id = import_data['publisher_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -114,7 +114,7 @@ class ArticleAuthor(Base, BaseMixin):
author_id = Column(String, ForeignKey('author.id'), nullable=False)
author = relationship('Author', back_populates="article_authors")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -122,8 +122,8 @@ class ArticleAuthor(Base, BaseMixin):
self.article_id = import_data['article_id']
self.author_id = import_data['author_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -140,7 +140,7 @@ class BookAuthor(Base, BaseMixin):
book_id = Column(String, ForeignKey('book.id'), nullable=False)
book = relationship('Book', back_populates="book_authors")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -148,8 +148,8 @@ class BookAuthor(Base, BaseMixin):
self.author_id = import_data['author_id']
self.book_id = import_data['book_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
+31 -32
View File
@@ -26,7 +26,7 @@ class Publisher(Base):
def __str__(self):
return self.__repr__()
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -34,8 +34,8 @@ class Publisher(Base):
self.name = import_data['name']
self.parent_publisher_id = import_data['parent_publisher_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {
'id': self.id,
'created_date': str(self.created_date),
'last_modified_date': str(self.last_modified_date),
@@ -67,7 +67,7 @@ class Comic(Base, BaseMixin):
def __str__(self):
return f'{self.title}({self.id})'
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -79,8 +79,8 @@ class Comic(Base, BaseMixin):
if 'weblink' in import_data:
self.weblink = import_data['weblink']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {
'id': self.id,
'created_date': str(self.created_date),
'last_modified_date': str(self.last_modified_date),
@@ -102,7 +102,7 @@ class Volume(Base, BaseMixin):
story_arcs = relationship("StoryArc")
issues = relationship("Issue")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -110,8 +110,8 @@ class Volume(Base, BaseMixin):
self.name = import_data['name']
self.comic_id = import_data['comic_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -128,7 +128,7 @@ class TradePaperback(Base, BaseMixin):
comic_id = Column(String, ForeignKey("comic.id"), nullable=False)
comic = relationship("Comic", back_populates="trade_paperbacks")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -138,8 +138,8 @@ class TradePaperback(Base, BaseMixin):
self.issue_end = import_data['issue_end']
self.comic_id = import_data['comic_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -160,7 +160,7 @@ class StoryArc(Base, BaseMixin):
volume = relationship("Volume", back_populates="story_arcs")
issues = relationship("Issue")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -169,8 +169,8 @@ class StoryArc(Base, BaseMixin):
self.comic_id = import_data['comic_id']
self.volume_id = import_data['volume_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {
'id': self.id,
'created_date': str(self.created_date),
'last_modified_date': str(self.last_modified_date),
@@ -197,7 +197,7 @@ class Issue(Base, BaseMixin):
story_arc = relationship("StoryArc", back_populates="issues")
issue_works = relationship("IssueWork")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -205,7 +205,7 @@ class Issue(Base, BaseMixin):
self.issue_number = import_data['issue_number']
self.title = import_data['title']
if import_data['published_on'] == 'None':
self.published_on = None
self.published_on = None # type: ignore
else:
self.published_on = import_data['published_on']
self.in_stock = import_data['in_stock']
@@ -214,8 +214,8 @@ class Issue(Base, BaseMixin):
self.volume_id = import_data['volume_id']
self.story_arc_id = import_data['story_arc_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {
'id': self.id,
'created_date': str(self.created_date),
'last_modified_date': str(self.last_modified_date),
@@ -239,7 +239,7 @@ class Artist(Base, BaseMixin):
comic_works = relationship("ComicWork")
issue_works = relationship("IssueWork")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -248,8 +248,8 @@ class Artist(Base, BaseMixin):
if 'weblink' in import_data:
self.weblink = import_data['weblink']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {
'id': self.id,
'created_date': str(self.created_date),
'last_modified_date': str(self.last_modified_date),
@@ -272,15 +272,15 @@ class WorkType(Base, BaseMixin):
def __str__(self):
return f'{self.name}({self.id})'
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
self.version = import_data['version']
self.name = import_data['name']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {
'id': self.id,
'created_date': str(self.created_date),
'last_modified_date': str(self.last_modified_date),
@@ -299,7 +299,7 @@ class ComicWork(Base, BaseMixin):
work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False)
work_type = relationship("WorkType", back_populates="comic_works")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -308,8 +308,8 @@ class ComicWork(Base, BaseMixin):
self.artist_id = import_data['artist_id']
self.work_type_id = import_data['work_type_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {
'id': self.id,
'created_date': str(self.created_date),
'last_modified_date': str(self.last_modified_date),
@@ -330,7 +330,7 @@ class IssueWork(Base, BaseMixin):
work_type_id = Column(String, ForeignKey("worktype.id"), nullable=False)
work_type = relationship("WorkType", back_populates="issue_works")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -339,8 +339,8 @@ class IssueWork(Base, BaseMixin):
self.artist_id = import_data['artist_id']
self.work_type_id = import_data['work_type_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {
'id': self.id,
'created_date': str(self.created_date),
'last_modified_date': str(self.last_modified_date),
@@ -350,4 +350,3 @@ class IssueWork(Base, BaseMixin):
'work_type_id': self.work_type_id
}
return item
-272
View File
@@ -1,22 +1,11 @@
import json
import uuid
from datetime import datetime
from enum import Enum, auto
from logging import Logger
from pathlib import Path
from typing import Any
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from db.models import registry
from db.models.tysc import Card, CardSet, Rooster, Team, FieldPosition, Player, Vendor, Sport
from db.models.comic import Issue, TradePaperback, StoryArc, Volume, ComicWork, Artist, Comic, Publisher, WorkType
from db.models.bookshelf import ArticleAuthor, BookAuthor, BookshelfPublisher, Article, Book, Author
from db.models.admin import Mail, MailAccount, ModuleData, Permission, Profile, Token, Assignment
from db.models.metadata import MetaDataTable, MetaDataColumn
from db.models.media import MediaVideo, MediaArticle, MediaFile, MediaActor, MediaActorFile
class ColumnEntry(Enum):
@@ -49,52 +38,6 @@ class KontorDB:
self.engine = db_engine
self.log = log
def get_table_by_name(self, table_name: str) -> dict:
result = {}
__session__ = sessionmaker(self.engine)
_filter = {'table_name': table_name}
with __session__() as session:
table = session.query(MetaDataTable).filter_by(**_filter).one()
result['id'] = table.id
result['table_name'] = table.table_name
return result
def get_column_meta_data(self, table_name: str, view_only=True) -> dict:
meta_data = {}
order = 0
__session__ = sessionmaker(self.engine)
columns = list()
table_info = self.get_table_by_name(table_name)
_filters = {'table_id': table_info['id']}
if view_only:
_filters['is_shown'] = True
with __session__() as session:
columns = session.query(MetaDataColumn).filter_by(**_filters).all()
for column in columns:
# self.log.info("get_column_meta_data: %s %s %d", column.column_name, column.column_label, column.column_order)
meta_data[order] = {
ColumnEntry.COLUMN_NAME: column.column_name,
ColumnEntry.COLUMN_LABEL: column.column_label,
ColumnEntry.COLUMN_ORDER: column.column_order,
ColumnEntry.COLUMN_REF_COLUMN: column.ref_column,
ColumnEntry.COLUMN_TYPE: column.column_type
}
order += 1
return meta_data
def get_filters(self, table_name: str) -> dict:
_filter_map = {}
__session__ = sessionmaker(self.engine)
table_info = self.get_table_by_name(table_name)
_filters = {'table_id': table_info['id'], 'show_filter': True}
with __session__() as session:
for column in session.query(MetaDataColumn).filter_by(**_filters).all():
_filter_map[column.column_name] = {
ColumnEntry.COLUMN_LABEL: column.filter_label,
ColumnEntry.COLUMN_WIDGET: None
}
return _filter_map
def data(self, table_name: str, columns: dict, filters: dict) -> list:
data = []
__session__ = sessionmaker(self.engine)
@@ -121,218 +64,3 @@ class KontorDB:
data.append(row)
# self.log.info("data: %s", data)
return data
def export_db(self, export_type: ExportType, export_file_name: str) -> dict:
results = {}
db = {}
export_table_list = self.get_table_names()
for table in export_table_list:
columns = self.get_column_meta_data(table, view_only=False)
if table in self.registry:
model = self.registry[table]
else:
self.log.info(f"table {table} is not registered")
continue
__session__ = sessionmaker(self.engine)
with __session__() as session:
rows = session.query(model).all()
entries = []
for row in rows:
# print(row)
entry = {}
for order in columns:
# print(columns[order])
column_name = columns[order][ColumnEntry.COLUMN_NAME]
# print(f"get value {column_name} from {row} of table {table}")
try:
value = getattr(row, column_name)
if isinstance(value, datetime):
entry[column_name] = str(value)
else:
entry[column_name] = value
except AttributeError:
pass
entries.append(entry)
db[table] = entries
results[table] = len(entries)
match export_type:
case ExportType.JSON:
json_dump = json.dumps(db, indent=4)
with open(export_file_name, "w") as dump_file:
dump_file.write(json_dump)
case ExportType.YAML:
pass
case ExportType.SQLITE:
pass
self.log.info(f"{len(results)} tables exported")
return results
def import_db(self, import_file_name: str) -> dict:
result = {}
import_file = Path(import_file_name)
if not import_file.exists():
self.log.info(f"File {import_file_name} does not exist. Do nothing.")
return result
match import_file.suffix:
case '.json':
print("read json file")
with open(import_file_name, 'r') as json_file:
json_load = json.load(json_file)
for table in json_load:
self.log.info(f"{table}: {len(json_load[table])}")
result[table] = self.import_table(table, json_load[table])
case '.yml':
print("read yaml file")
case '.yaml':
print("read yaml file")
case '.db':
print("read sqlite file")
return result
def import_table(self, table_name: str, items: list) -> dict:
result = {}
updated = []
added = []
remaining = []
existing_ids = self.get_ids(table_name)
self.log.info(f"found {len(existing_ids)} existing ids for table {table_name}")
for item in items:
current_id = item['id']
# print(f"import item: {item}")
found_item = None
__session__ = sessionmaker(self.engine)
with __session__() as session:
found_item = session.get(self.registry[table_name], current_id)
# print(f"found item: {found_item}")
if found_item is not None:
changed = self.update_entry(table_name, current_id, item)
updated.append(item)
if changed:
self.log.info(f"{current_id} has changed")
updated.append(item)
existing_ids.remove(current_id)
else:
try:
self.add_entry(table_name, item)
added.append(item)
except IntegrityError as error:
self.log.info(f"Could not add item, due to: {error.detail}")
if len(existing_ids) > 0:
print(f"remaining items for {table_name}: {existing_ids}")
remaining.extend(existing_ids)
result['updated'] = updated
result['added'] = added
result['remaining'] = remaining
return result
def get_ids(self, table_name: str) -> list:
existing_ids = []
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(self.registry[table_name]).all()
for item in items:
existing_ids.append(getattr(item, 'id'))
return existing_ids
def add_entry(self, table_name: str, update_item: dict):
self.log.debug(f"add entry to table {table_name} with {update_item}")
__session__ = sessionmaker(self.engine)
with __session__() as session:
add_item = self.registry[table_name]()
for key in update_item.keys():
update_value = update_item[key]
setattr(add_item, key, update_value)
session.add(add_item)
session.commit()
def update_entry(self, table_name, current_id, update_item: dict) -> bool:
# self.log.info("update entry to table %s", table_name)
__session__ = sessionmaker(self.engine)
with __session__() as session:
existing_item = session.query(self.registry[table_name]).get(current_id)
changed = False
for key in update_item.keys():
update_value = update_item[key]
existing_value = getattr(existing_item, key)
if type(existing_value) is not type(update_value):
existing_value = str(existing_value)
if existing_value != update_value:
self.log.info(f"{key} has changed: {existing_value} != {update_value}")
setattr(existing_item, key, update_value)
session.commit()
changed = True
self.log.info(f"update {key} with {update_value}")
return changed
def add_link(self, link: str) -> dict:
result = {}
__session__ = sessionmaker(self.engine)
with __session__() as session:
media_file = MediaFile()
media_file.id = str(uuid.uuid4())
media_file.created_date = datetime.now()
media_file.last_modified_date = datetime.now()
media_file.version = 0
media_file.url = link
media_file.review = 1
media_file.should_download = 1
try:
session.add(media_file)
session.commit()
result['added'] = {'url': media_file.url, 'title': media_file.title, 'review': media_file.review,
'download': media_file.should_download}
except IntegrityError as error:
session.rollback()
result['error'] = error.orig
return result
def update_titles(self) -> dict:
update_list = {}
__session__ = sessionmaker(self.engine)
_filter = {'review': True}
with __session__() as session:
links = session.query(MediaFile).filter_by(**_filter).all()
self.log.info("%d entries found for updating titles", len(links))
for link in links:
url = link.url
if url is None:
continue
link.update_title()
session.commit()
update_list[link.id] = link.title
return update_list
def get_download_list(self) -> list[str]:
download_list = []
__session__ = sessionmaker(self.engine)
_filter = {'should_download': True}
with __session__() as session:
links = session.query(MediaFile).filter_by(**_filter).all()
for link in links:
url = link.url
if url is None:
continue
download_list.append(link.id)
return download_list
def download_file(self, entry_id: str, download_dir="/data/media", dl_tool="yt-dlp") -> str:
__session__ = sessionmaker(self.engine)
with __session__() as session:
link = session.query(MediaFile).get(entry_id)
link.download_file(download_dir, dl_tool)
session.commit()
file_name = link.file_name
return file_name
def delete_entries(self):
for (table_name, table) in self.registry.items():
# self.log.info("delete entries from table %s", table_name)
__session__ = sessionmaker(self.engine)
with __session__() as session:
items = session.query(table).all()
for item in items:
session.delete(item)
session.commit()
def check_files(self):
pass
+34 -21
View File
@@ -2,10 +2,10 @@ import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Any, AnyStr, Dict
from typing import Any, Dict
import requests
from bs4 import BeautifulSoup
from sqlalchemy import Boolean, Column, False_, String, ForeignKey
from sqlalchemy import Boolean, Column, String, ForeignKey
from sqlalchemy.orm import relationship
from db.models.base import Base, BaseMixin, BaseVideoMixin
@@ -16,12 +16,12 @@ class MediaFile(Base, BaseMixin, BaseVideoMixin):
media_actor_files = relationship("MediaActorFile")
def __repr__(self):
return f'MediaFile(\n\tID: {self.id}\n\tTitle: {self.title}\n\tURL: {self.url}\n\tReview: {self.review}\n\tDownload: {self.should_download})'
return f'MediaFile(\n\tID: {self.id}\n\tTitle: {self.title}\n\tURL: {self.url}\n\tReview: {self.review}\n\tDownload: {self.should_download}\n\tPath: {self.path}\n\tCloudlink: {self.cloud_link})'
def __str__(self):
return f'{self.title}({self.id})'
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -34,8 +34,8 @@ class MediaFile(Base, BaseMixin, BaseVideoMixin):
self.url = import_data['url']
self.should_download = import_data['should_download']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -54,9 +54,10 @@ class MediaFile(Base, BaseMixin, BaseVideoMixin):
try:
r = requests.get(self.url)
soup = BeautifulSoup(r.content, "html.parser")
title = soup.title.string
self.title = title
self.review = False
title_tag = soup.find('title')
if title_tag:
self.title = title_tag.get_text()
self.review = False
except:
self.title = None
self.review = True
@@ -101,7 +102,13 @@ class MediaActor(Base, BaseMixin):
url = Column(String, unique=True)
media_actor_files = relationship("MediaActorFile")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def __repr__(self):
return f'MediaActor(\n\tID: {self.id}\n\tName: {self.name}\n\tURL: {self.url})'
def __str__(self):
return f'{self.name}({self.id})'
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -109,8 +116,8 @@ class MediaActor(Base, BaseMixin):
self.name = import_data['name']
self.url = import_data['url']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -127,7 +134,13 @@ class MediaActorFile(Base, BaseMixin):
media_file_id = Column(String, ForeignKey("media_file.id"), nullable=True)
media_file = relationship("MediaFile", back_populates="media_actor_files")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def __repr__(self):
return f'MediaActorFile(\n\tID: {self.id}\n\tMediaActor: {self.media_actor_id}\n\tMediaFile: {self.media_file_id})'
def __str__(self):
return f'{self.id}: MediaActor: {self.media_actor_id} - MediaFile: {self.media_file_id}'
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -135,8 +148,8 @@ class MediaActorFile(Base, BaseMixin):
self.media_actor_id = import_data['media_actor_id']
self.media_file_id = import_data['media_file_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -151,7 +164,7 @@ class MediaArticle(Base, BaseMixin):
title = Column(String)
url = Column(String, unique=True)
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -160,8 +173,8 @@ class MediaArticle(Base, BaseMixin):
self.title = import_data['title']
self.url = import_data['url']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -182,7 +195,7 @@ class MediaVideo(Base, BaseMixin):
url = Column(String, unique=True)
should_download = Column(Boolean)
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -195,8 +208,8 @@ class MediaVideo(Base, BaseMixin):
self.url = import_data['url']
self.should_download = import_data['should_download']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
+25 -25
View File
@@ -1,4 +1,4 @@
from typing import AnyStr, Dict, Any
from typing import Dict, Any
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Boolean
from sqlalchemy.orm import relationship
@@ -17,15 +17,15 @@ class Sport(Base, BaseMixin):
def __repr__(self):
return f"Sport(id={self.id}, name={self.name}, created_date={self.created_date})"
def import_dict(self, import_data: Dict[AnyStr, AnyStr]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
self.version = import_data['version']
self.name = import_data['name']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -41,7 +41,7 @@ class Team(Base, BaseMixin):
sport = relationship("Sport", back_populates="teams")
roosters = relationship("Rooster")
def import_dict(self, import_data: Dict[AnyStr, AnyStr]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -50,8 +50,8 @@ class Team(Base, BaseMixin):
self.short_name = import_data['short_name']
self.sport_id = import_data['sport_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -73,7 +73,7 @@ class FieldPosition(Base, BaseMixin):
sport = relationship("Sport", back_populates="positions")
roosters = relationship("Rooster")
def import_dict(self, import_data: Dict[AnyStr, AnyStr]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -82,8 +82,8 @@ class FieldPosition(Base, BaseMixin):
self.short_name = import_data['short_name']
self.sport_id = import_data['sport_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -106,7 +106,7 @@ class Player(Base, BaseMixin):
def get_full_name(self) -> str:
return f"{self.last_name}, {self.first_name}"
def import_dict(self, import_data: Dict[AnyStr, AnyStr]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -114,8 +114,8 @@ class Player(Base, BaseMixin):
self.first_name = import_data['first_name']
self.last_name = import_data['last_name']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -139,7 +139,7 @@ class Rooster(Base, BaseMixin):
position = relationship("FieldPosition", back_populates="roosters")
cards = relationship("Card")
def import_dict(self, import_data: Dict[AnyStr, AnyStr]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -149,8 +149,8 @@ class Rooster(Base, BaseMixin):
self.player_id = import_data['player_id']
self.position_id = import_data['position_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -168,15 +168,15 @@ class Vendor(Base, BaseMixin):
card_sets = relationship("CardSet")
cards = relationship("Card")
def import_dict(self, import_data: Dict[AnyStr, AnyStr]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
self.version = import_data['version']
self.name = import_data['name']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -197,7 +197,7 @@ class CardSet(Base, BaseMixin):
vendor = relationship("Vendor", back_populates="card_sets")
cards = relationship("Card")
def import_dict(self, import_data: Dict[AnyStr, AnyStr]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -207,8 +207,8 @@ class CardSet(Base, BaseMixin):
self.insert_set = import_data['insert_set']
self.vendor_id = import_data['vendor_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
@@ -234,7 +234,7 @@ class Card(Base, BaseMixin):
vendor_id = Column(String, ForeignKey("vendor.id"), nullable=False)
vendor = relationship("Vendor", back_populates="cards")
def import_dict(self, import_data: Dict[AnyStr, Any]):
def import_dict(self, import_data: Dict[str, Any]):
self.id = import_data['id']
self.created_date = import_data['created_date']
self.last_modified_date = import_data['last_modified_date']
@@ -245,8 +245,8 @@ class Card(Base, BaseMixin):
self.rooster_id = import_data['rooster_id']
self.vendor_id = import_data['vendor_id']
def export_dict(self) -> Dict[AnyStr, Any]:
item: Dict[AnyStr, Any] = {}
def export_dict(self) -> Dict[str, Any]:
item: Dict[str, Any] = {}
item['id'] = self.id
item['created_date'] = str(self.created_date)
item['last_modified_date'] = str(self.last_modified_date)
+22 -21
View File
@@ -4,7 +4,7 @@ import data from json file to PostgreSQL
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from datetime import datetime, date
from typing import Any, AnyStr, Dict, List
from typing import Any, Dict, List
import os
import json
@@ -43,7 +43,7 @@ def cleanup_database(db: Session, log, dry_run: bool):
db.delete(entry)
db.commit()
def load_data(filename: str, log) -> Dict[AnyStr, Dict[AnyStr, Any]]:
def load_data(filename: str, log) -> Dict[str, List[Dict[str, Any]]]:
log.debug("load_data")
import_file = Path(filename)
if not import_file.exists():
@@ -54,13 +54,13 @@ def load_data(filename: str, log) -> Dict[AnyStr, Dict[AnyStr, Any]]:
json_load = json.load(json_file)
return json_load
def get_ids(items: List[Any]) -> List[AnyStr]:
result: List[AnyStr] = []
def get_ids(items: List[Any]) -> List[str]:
result: List[str] = []
for item in items:
result.append(item.id)
return result
def update_item(db: Session, import_data: Dict[AnyStr, Any], item: Any, dry_run: bool, log):
def update_item(db: Session, import_data: Dict[str, Any], item: Any, dry_run: bool, log):
for (key, value) in import_data.items():
existing_value = getattr(item, str(key))
update: bool = has_changed(existing_value, value, log)
@@ -76,7 +76,7 @@ def update_item(db: Session, import_data: Dict[AnyStr, Any], item: Any, dry_run:
db.add(item)
db.commit()
def has_changed(existing_data: Any, import_data: AnyStr, log) -> bool:
def has_changed(existing_data: Any, import_data: str, log) -> bool:
if existing_data is None and import_data == 'None':
return False
if isinstance(existing_data, str):
@@ -94,7 +94,7 @@ def has_changed(existing_data: Any, import_data: AnyStr, log) -> bool:
return existing_data != import_data
def item_import(db: Session, import_data: Dict[AnyStr, Any], dry_run: bool, log):
def item_import(db: Session, import_data: Dict[str, Any], dry_run: bool, log):
log.info(f"import {import_data}")
if not dry_run:
log.debug(f"model: {repr(model)} {import_data}")
@@ -119,26 +119,27 @@ if __name__ == '__main__':
with SessionLocal() as db:
if args.cleanup:
cleanup_database(db, logger, args.dry_run)
data = load_data(args.file, logger)
table_list: List = list(data.keys())
logger.info(f"Liste der Tabellen: {table_list}")
sorted_table_list: List = table_list
data: Dict[str, List[Dict[str, Any]]] = load_data(args.file, logger)
table_list: List[str] = list(data.keys())
logger.debug(f"Liste der Tabellen: {table_list}")
sorted_table_list: List[str] = table_list
for tablename in sorted_table_list:
model = registry[tablename]
existing_items = db.query(model).all()
existing_ids = get_ids(existing_items)
existing_ids: List[str] = get_ids(existing_items)
logger.debug(f"found {len(existing_items)} for table {tablename}")
import_items = data[tablename]
import_items: List[Dict[str, Any]] = data[tablename]
for import_item in import_items:
if import_item['id'] in existing_ids:
logger.debug(f"update {import_item['id']}")
existing_item = db.get(model, import_item['id'])
item_id: str = import_item['id']
if item_id in existing_ids:
logger.debug(f"update {item_id}")
existing_item = db.get(model, item_id)
update_item(db, import_item, existing_item, args.dry_run, logger)
existing_ids.remove(import_item['id'])
existing_ids.remove(item_id)
else:
logger.debug(f"import {import_item['id']}")
logger.debug(f"import {item_id}")
item_import(db, import_item, args.dry_run, logger)
logger.info(f"remaining items for {tablename}: {len(existing_ids)}")
logger.info(f"remaining items for {tablename}: {existing_ids}")
logger.debug(f"remaining items for {tablename}: {len(existing_ids)}")
if len(existing_ids) > 0:
logger.info(f"remaining items for {tablename}: {existing_ids}")
logger.info('kontor.import finished')