"""SQLAlchemy models for media files, actors, articles and videos."""
import logging
import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Iterable, Optional

import requests
from bs4 import BeautifulSoup
from sqlalchemy import Column, String, ForeignKey, Boolean
from sqlalchemy.orm import relationship

from src.db.models.base import Base, BaseMixin, BaseVideoMixin

# Seconds to wait for the page that supplies the title; without a timeout
# ``requests.get`` can block indefinitely on an unresponsive host.
_TITLE_REQUEST_TIMEOUT = 30

# Fragments of the download tool's stdout that reveal the output file name.
_DOWNLOAD_PREFIX = '[download] '
_ALREADY_DOWNLOADED_MARKER = ' has already been downloaded'
_DESTINATION_MARKER = 'Destination: '


class MediaFile(Base, BaseMixin, BaseVideoMixin):
    """ORM model mapped to the ``media_file`` table.

    The attributes the methods below read and write (``url``, ``title``,
    ``review``, ``file_name``, ``cloud_link``, ``should_download``,
    ``last_modified_date``) are not declared in this class; presumably they
    come from ``BaseMixin`` / ``BaseVideoMixin`` -- confirm against
    ``src.db.models.base``.
    """

    __tablename__ = 'media_file'

    # NOTE(review): MediaActorFile.media_file names this attribute through
    # back_populates, but this side declares no back_populates of its own --
    # confirm that the one-sided configuration is intended.
    media_actor_files = relationship("MediaActorFile")

    def __repr__(self):
        return f'MediaFile({self.id} {self.title} {self.url})'

    def __str__(self):
        return f'{self.title}({self.id})'

    def update_title(self) -> None:
        """Fetch ``self.url`` and store the page's ``<title>`` text.

        On success ``title`` is set and ``review`` is cleared. On any failure
        (network error, timeout, page without a ``<title>``) ``title`` is set
        to ``None`` and ``review`` is raised so the record can be checked by
        hand. ``last_modified_date`` is stamped in both cases.
        """
        logging.info("update title for %s", self.url)
        try:
            response = requests.get(str(self.url), timeout=_TITLE_REQUEST_TIMEOUT)
            soup = BeautifulSoup(response.content, "html.parser")
            # ``soup.title`` is None when the page has no <title>; the
            # resulting AttributeError is handled like any other failure.
            title = soup.title.get_text()  # type: ignore
        except Exception:
            # Best effort: never propagate, but leave a trace in the log and
            # do not swallow KeyboardInterrupt / SystemExit.
            logging.exception("could not update title for %s", self.url)
            self.title = None
            self.review = True
        else:
            self.title = title
            self.review = False
        self.last_modified_date = datetime.now()

    def download_file(self, download_dir: str, dl_tool: str) -> None:
        """Run ``dl_tool`` on ``self.url`` inside ``download_dir``.

        Args:
            download_dir: Working directory the tool is started in.
            dl_tool: Executable to invoke; it receives the URL as its only
                argument.

        When the tool exits with status 0 its stdout is scanned for the name
        of the downloaded file. If a name is found, ``file_name``,
        ``cloud_link`` and ``should_download`` are updated; if not, the record
        is flagged for review and stays queued for download. A non-zero exit
        status is logged and leaves those fields untouched.
        ``last_modified_date`` is stamped on every attempt.
        """
        logging.info("download file for %s to %s", self.url, download_dir)
        result = subprocess.run([dl_tool, self.url], cwd=download_dir, capture_output=True, text=True)  # type: ignore
        if result.returncode == 0:
            # Collapse runs of spaces so the fixed markers match the output.
            output = re.sub(' +', ' ', result.stdout)
            file_name = self.__parse_output__(output.splitlines())
            if file_name is None:
                self.review = True
                self.should_download = True
                self.file_name = None
            else:
                # The tool ran with cwd=download_dir, so a relative name in
                # its output is relative to that directory, not to this
                # process's cwd. An absolute name is left unchanged by "/".
                download_file = Path(download_dir) / file_name
                self.should_download = False
                self.file_name = download_file.name
                self.cloud_link = str(download_file.absolute())
        else:
            logging.warning(
                "%s exited with code %s for %s: %s",
                dl_tool, result.returncode, self.url, result.stderr.strip(),
            )
        self.last_modified_date = datetime.now()

    def __parse_output__(self, lines_list: Iterable[str]) -> Optional[str]:
        """Extract the downloaded file's name from the tool's output lines.

        Two line shapes are recognised::

            [download] <name> has already been downloaded
            [<tag>] Destination: <name>

        When several lines match, the last one wins.

        Args:
            lines_list: stdout of the download tool, one entry per line.

        Returns:
            The file name, or ``None`` when no line matched. The result is
            also stored in ``self.file_name``.
        """
        found = None
        for line in lines_list:
            if _ALREADY_DOWNLOADED_MARKER in line:
                # Keep what precedes the marker, so text after it (for
                # example a trailing " and merged") cannot corrupt the name.
                candidate = line.partition(_ALREADY_DOWNLOADED_MARKER)[0]
                if candidate.startswith(_DOWNLOAD_PREFIX):
                    candidate = candidate[len(_DOWNLOAD_PREFIX):]
            elif _DESTINATION_MARKER in line:
                # Split on the marker instead of slicing by a fixed prefix
                # length, so tags other than "[download]" work as well.
                candidate = line.partition(_DESTINATION_MARKER)[2]
            else:
                continue
            if candidate:
                found = candidate
        self.file_name = found
        return found


class MediaActor(Base, BaseMixin):
    """ORM model mapped to the ``media_actor`` table."""

    __tablename__ = 'media_actor'

    name = Column(String)
    url = Column(String, unique=True, nullable=True)
    # NOTE(review): MediaActorFile.media_actor names this attribute through
    # back_populates, but this side declares no back_populates of its own --
    # confirm that the one-sided configuration is intended.
    media_actor_files = relationship("MediaActorFile")

    def __repr__(self) -> str:
        return f'MediaActor({self.id} {self.name} {self.url})'

    def __str__(self) -> str:
        return f'{self.url}({self.id})'


class MediaActorFile(Base, BaseMixin):
    """Association row linking a ``MediaActor`` to a ``MediaFile``.

    The actor reference is mandatory; the file reference is nullable.
    """

    __tablename__ = 'media_actor_file'

    media_actor_id = Column(String, ForeignKey("media_actor.id"), nullable=False)
    media_actor = relationship("MediaActor", back_populates="media_actor_files")
    media_file_id = Column(String, ForeignKey("media_file.id"), nullable=True)
    media_file = relationship("MediaFile", back_populates="media_actor_files")

    def __repr__(self):
        return f'MediaActorFile({self.id} {self.media_actor_id} {self.media_file_id})'

    def __str__(self) -> str:
        return f'{self.id} {self.media_actor_id} {self.media_file_id}'


class MediaArticle(Base, BaseMixin):
    """ORM model mapped to the ``media_article`` table."""

    __tablename__ = 'media_article'

    review = Column(Boolean)
    title = Column(String)
    url = Column(String, unique=True)


class MediaVideo(Base, BaseMixin):
    """ORM model mapped to the ``media_video`` table."""

    __tablename__ = 'media_video'

    cloud_link = Column(String)
    file_name = Column(String)
    path = Column(String)
    review = Column(Boolean)
    title = Column(String)
    url = Column(String, unique=True)
    should_download = Column(Boolean)

    def __repr__(self):
        return f'MediaVideo({self.id} {self.title} {self.url})'

    def __str__(self):
        # Fall back to the URL while no title has been stored.
        label = self.url if self.title is None else self.title
        return f'{label}({self.id})'