from datetime import datetime from typing import List from uuid import uuid4, UUID from fastapi import APIRouter, status, HTTPException from sqlalchemy import select, Sequence from app.models.media.file import MediaFileResponse, Link, get_file_details from app.schema import MediaFile, SessionDep router = APIRouter( prefix="/media", tags=["media"] ) @router.get("/update-titles") def update_titles(db: SessionDep) -> list[MediaFileResponse]: results: list[MediaFileResponse] = [] files = db.query(MediaFile).filter(MediaFile.review == 1).all() for mediafile in files: mediafile.update_title() db.add(mediafile) response = MediaFileResponse(id=mediafile.id, title=mediafile.title, file_name=mediafile.file_name, cloud_link=mediafile.cloud_link, url=str(mediafile.url), review=(mediafile.review == 1), should_download=(mediafile.should_download == 1)) results.append(response) db.commit() return results @router.get("/files", response_model=List[MediaFileResponse]) def get_all_files(db: SessionDep, review: bool = False, download: bool = False) -> List[MediaFileResponse]: results: list[MediaFileResponse] = [] files: Sequence[MediaFile] if review: files = db.query(MediaFile).filter(MediaFile.review == 1).all() elif download: files = db.query(MediaFile).filter(MediaFile.should_download == 1).all() else: files = db.scalars(select(MediaFile)).all() for mediafile in files: response = MediaFileResponse(id=mediafile.id, title=mediafile.title, file_name=mediafile.file_name, cloud_link=mediafile.cloud_link, url=str(mediafile.url), review=(mediafile.review == 1), should_download=(mediafile.should_download == 1)) results.append(response) return results @router.get("/files/{file_id}", response_model=MediaFileResponse) def get_file(file_id: UUID, db: SessionDep) -> MediaFileResponse: mediafile = db.get(MediaFile, file_id) if not mediafile: raise HTTPException(status_code=404, detail="MediaFile could not be found") response = get_file_details(mediafile) return response @router.put("/files/{file_id}", response_model=MediaFileResponse) def update_file(file_id: UUID, db: SessionDep, info: MediaFileResponse) -> MediaFileResponse: mediaFile = db.get(MediaFile, file_id) if not mediaFile: raise HTTPException(status_code=404, detail="MediaFile could not be found") mediaFile.file_name = info.file_name mediaFile.cloud_link = info.cloud_link mediaFile.url = info.url mediaFile.title = info.title mediaFile.last_modified_date = datetime.now() mediaFile.review = info.review mediaFile.should_download = info.should_download db.add(mediaFile) db.commit() return info @router.post("/files", status_code=status.HTTP_201_CREATED) def add_file(new_link: Link, db: SessionDep) -> MediaFileResponse: print(new_link.url) try: mediaFile: MediaFile = MediaFile() setattr(mediaFile, "url", new_link.url) setattr(mediaFile, "review", True) setattr(mediaFile, "should_download", True) db.add(mediaFile) db.commit() except: raise HTTPException(status_code=409, detail="Link duplicate") response = MediaFileResponse(id=uuid4(), title=mediaFile.title, file_name=mediaFile.file_name, cloud_link=mediaFile.cloud_link, url=new_link.url, review=(mediaFile.review == 1), should_download=(mediaFile.should_download == 1)) return response