add missing endpoints
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 5s

This commit is contained in:
2026-05-17 19:52:00 +02:00
parent 1b58ec8e27
commit cd033f458d
17 changed files with 466 additions and 143 deletions
+22 -2
View File
@@ -1,12 +1,32 @@
"""
add router for different parts (like comics, tysc, media)
"""
from fastapi import APIRouter from fastapi import APIRouter
from src.apis.version1 import comic, mediaactor, mediafile, mediaactorfile, tysc, admin, user from src.apis.version1 import (
comic,
mediaactor,
mediafile,
mediaactorfile,
sport,
player,
team,
fieldposition,
vendor,
admin,
user,
)
api_router = APIRouter(prefix="/api") api_router = APIRouter(prefix="/api")
api_router.include_router(comic.router, prefix="/comics", tags=["comics"]) api_router.include_router(comic.router, prefix="/comics", tags=["comics"])
api_router.include_router(mediafile.router, prefix="/media", tags=["media"]) api_router.include_router(mediafile.router, prefix="/media", tags=["media"])
api_router.include_router(mediaactor.router, prefix="/media", tags=["media"]) api_router.include_router(mediaactor.router, prefix="/media", tags=["media"])
api_router.include_router(mediaactorfile.router, prefix="/media", tags=["media"]) api_router.include_router(mediaactorfile.router, prefix="/media", tags=["media"])
api_router.include_router(tysc.router, prefix="/tysc", tags=["tysc"]) api_router.include_router(sport.router, prefix="/tysc", tags=["tysc"])
api_router.include_router(player.router, prefix="/tysc", tags=["tysc"])
api_router.include_router(team.router, prefix="/tysc", tags=["tysc"])
api_router.include_router(fieldposition.router, prefix="/tysc", tags=["tysc"])
api_router.include_router(vendor.router, prefix="/tysc", tags=["tysc"])
api_router.include_router(admin.router, prefix="/login", tags=["login"]) api_router.include_router(admin.router, prefix="/login", tags=["login"])
api_router.include_router(user.router, prefix="/user", tags=["user"]) api_router.include_router(user.router, prefix="/user", tags=["user"])
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.tysc import FieldPosition
from src.db.session import SessionDep
from src.schema.tysc.fieldposition import FieldPositionResponse, to_response
router = APIRouter()
@router.get("/positions")
def get_all_teams(db: SessionDep) -> List[FieldPositionResponse]:
results: list[FieldPositionResponse] = []
sports = db.query(FieldPosition).all()
for sport in sports:
response = to_response(sport)
results.append(response)
return results
+19
View File
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.tysc import Player
from src.db.session import SessionDep
from src.schema.tysc.player import PlayerResponse, to_response
router = APIRouter()
@router.get("/players")
def get_all_players(db: SessionDep) -> List[PlayerResponse]:
results: List[PlayerResponse] = []
players = db.query(Player).all()
for player in players:
response = to_response(player)
results.append(response)
return results
@@ -2,7 +2,7 @@ from typing import List
from fastapi import APIRouter from fastapi import APIRouter
from src.db.session import SessionDep from src.db.session import SessionDep
from src.schema.tysc.sport import SportResponse from src.schema.tysc.sport import SportResponse, to_response
from src.db.models.tysc import Sport from src.db.models.tysc import Sport
router = APIRouter() router = APIRouter()
@@ -12,5 +12,6 @@ def get_all_sports(db: SessionDep) -> List[SportResponse]:
results: list[SportResponse] = [] results: list[SportResponse] = []
sports = db.query(Sport).all() sports = db.query(Sport).all()
for sport in sports: for sport in sports:
results.append(SportResponse(id=sport.id, name=sport.name)) response = to_response(sport)
results.append(response)
return results return results
+19
View File
@@ -0,0 +1,19 @@
from typing import List
from fastapi import APIRouter
from src.db.models.tysc import Team
from src.db.session import SessionDep
from src.schema.tysc.team import TeamResponse, to_response
router = APIRouter()
@router.get("/teams")
def get_all_teams(db: SessionDep) -> List[TeamResponse]:
results: list[TeamResponse] = []
sports = db.query(Team).all()
for sport in sports:
response = to_response(sport)
results.append(response)
return results
+22
View File
@@ -0,0 +1,22 @@
from typing import List
from fastapi import APIRouter
from src.db.models.tysc import Vendor
from src.db.session import SessionDep
from src.schema.tysc.vendor import VendorResponse, to_response
router = APIRouter()
@router.get("/vendors")
def get_all_vendors(db: SessionDep) -> List[VendorResponse]:
"""
retrieve all vendors as json response.
"""
results: list[VendorResponse] = []
vendors = db.query(Vendor).all()
for vendor in vendors:
response = to_response(vendor)
results.append(response)
return results
+9 -1
View File
@@ -1,15 +1,23 @@
"""
Schema definitions for Comics.
"""
from typing import Optional from typing import Optional
from pydantic import BaseModel, AnyUrl from pydantic import BaseModel, AnyUrl
from src.core.log_conf import logger
class ComicResponse(BaseModel): class ComicResponse(BaseModel):
"""
Pydantic model for returning Comic objects.
"""
id: str id: str
title: str title: str
completed: bool completed: bool
class ComicSchema(BaseModel): class ComicSchema(BaseModel):
"""
Pydantic model for uploading Comic object.
"""
id: str id: str
title: str title: str
weblink: Optional[AnyUrl] weblink: Optional[AnyUrl]
@@ -0,0 +1,28 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import FieldPosition, Team
class FieldPositionResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
short_name: str
sport_id: str
def to_response(fieldposition: FieldPosition) -> FieldPositionResponse:
response: FieldPositionResponse = FieldPositionResponse(
id=fieldposition.id,
created_date=fieldposition.created_date,
last_modified_date=fieldposition.last_modified_date,
version=fieldposition.version,
name=fieldposition.name,
short_name=fieldposition.short_name,
sport_id=fieldposition.sport_id
)
return response
+25
View File
@@ -0,0 +1,25 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Player
class PlayerResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
first_name: str
last_name: str
def to_response(player: Player) -> PlayerResponse:
response: PlayerResponse = PlayerResponse(
id=player.id,
created_date=player.created_date,
last_modified_date=player.last_modified_date,
version=player.version,
first_name=player.first_name,
last_name=player.last_name
)
return response
+36
View File
@@ -0,0 +1,36 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Rooster
class RoosterResponse(BaseModel):
"""
Pydantic model for returning Rooster objects.
"""
id: str
created_date: datetime
last_modified_date: datetime
version: int
year: int
team_id: str
player_id: str
position_id: str
def to_reponse(rooster: Rooster) -> RoosterResponse:
"""
convert database object to response object (Pydantic).
"""
response: RoosterResponse = RoosterResponse(
id=rooster.id,
created_date=rooster.created_date,
last_modified_date=rooster.last_modified_date,
version=rooster.version,
year=rooster.year,
team_id=rooster.team_id,
player_id=rooster.player_id,
position_id=rooster.position_id
)
return response
+18 -2
View File
@@ -1,8 +1,24 @@
from typing import AnyStr from datetime import datetime
from pydantic import BaseModel from pydantic import BaseModel
from src.db.models.tysc import Sport
class SportResponse(BaseModel): class SportResponse(BaseModel):
id: AnyStr id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str name: str
def to_response(sport: Sport) -> SportResponse:
response: SportResponse = SportResponse(
id=sport.id,
created_date=sport.created_date,
last_modified_date=sport.last_modified_date,
version=sport.version,
name=sport.name
)
return response
+28
View File
@@ -0,0 +1,28 @@
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Team
class TeamResponse(BaseModel):
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
short_name: str
sport_id: str
def to_response(team: Team) -> TeamResponse:
response: TeamResponse = TeamResponse(
id=team.id,
created_date=team.created_date,
last_modified_date=team.last_modified_date,
version=team.version,
name=team.name,
short_name=team.short_name,
sport_id=team.sport_id
)
return response
+32
View File
@@ -0,0 +1,32 @@
"""
class and function for json response objects for Vendor.
"""
from datetime import datetime
from pydantic import BaseModel
from src.db.models.tysc import Vendor
class VendorResponse(BaseModel):
"""
Pydantic model for Vendor reponse object.
"""
id: str
created_date: datetime
last_modified_date: datetime
version: int
name: str
def to_response(vendor: Vendor) -> VendorResponse:
"""
convert database object Vendor to response object VendorResponse.
"""
reponse: VendorResponse = VendorResponse(
id=vendor.id,
created_date=vendor.created_date,
last_modified_date=vendor.last_modified_date,
version=vendor.version,
name=vendor.name
)
return reponse
-15
View File
@@ -1,15 +0,0 @@
clean:
find . -name '*.py[co]' -delete
test:
python -m pytest \
-v \
--cov=kontor \
--cov-report=term \
--cov-report=html:coverage-report \
tests/
docker: clean
docker build -t kontor:latest .
+166
View File
@@ -0,0 +1,166 @@
from dataclasses import dataclass
import logging
import logging.config
from logging import Logger
from pathlib import Path
from typing import Dict, List, Optional
from uuid import UUID
from platformdirs import PlatformDirs
import requests
import yaml
MAPPING: Dict[str, str] = {
"sport": "api/tysc/sports",
"player": "api/tysc/players",
"team": "api/tysc/teams",
"field_position": "api/tysc/positions",
"rooster": "api/tysc/roosters",
"vendor": "api/tysc/vendors",
"card_set": "api/tysc/cardsets",
"card": "api/tysc/cards",
"artist": "api/comics/artists",
"publisher": "api/comics/publishers",
"worktype": "api/comics/worktypes",
"comic": "api/comics/comics",
"volume": "api/comics/volumes",
"story_arc": "api/comics/storyarcs",
"issue": "api/comics/issues",
"issue_work": "api/comics/issueworks",
"article": "",
"bookshelf_publisher": "",
"book": "",
"author": "",
"article_author": "",
"book_author": "",
"media_article": "",
"media_video": "api/media/videos",
"media_file": "api/media/files",
"media_actor": "api/media/actors",
"media_actor_file": "api/media/actorfiles",
"profile": "",
"permission": "",
"assignment": "",
"token": "",
"mail_account": "",
}
@dataclass
class Login:
"""
Dataclass to store login information.
"""
email: str
password: str
@dataclass
class Server:
"""
Dataclass to represent a Kontor-API instance.
"""
name: str
url: str
token: str
token_type: str
timeout: int
def login(self, login: Login, log: Logger):
"""
get token from server by calling login endpoint.
"""
if not self.token:
log.info("Call login first")
login_url = f"{self.url}/login"
login_data = {}
login_data["email"] = login.email
login_data["password"] = login.password
response = requests.post(login_url, json=login_data, timeout=self.timeout)
status = response.status_code
log.info(f"Status: {status}")
if status != 200:
log.fatal("authentication failed")
return
data = response.json()
log.debug(f"got data: {data}")
token = data["access_token"]
token_type = data["token_type"]
self.token = str(token)
self.token_type = str(token_type)
def request(self, log: Logger, table: str, param: Optional[str] = None):
if not param:
url: str = f"{self.url}/{MAPPING[table]}"
else:
url: str = f"{self.url}/{MAPPING[table]}?{param}"
headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
response = requests.get(url, headers=headers, timeout=self.timeout)
log.info(f"Status: {response.status_code}")
data = response.json()
return data
def update(self, log: Logger, table: str, item_id: UUID, file_info: dict):
url: str = f"{self.url}/{MAPPING[table]}/{item_id}"
headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
update = requests.put(
url, headers=headers, json=file_info, timeout=self.timeout
)
log.info(f"Status: {update.status_code}")
return update
@dataclass
class ApiConfig:
"""
Dataclass to define required contents of configuration file.
"""
login: Login
server: List[Server]
def get_logger(level, config: str):
"""
get Logger according to given log level by verbosity.
"""
dirs = PlatformDirs(config)
logging_config = Path(dirs.user_config_dir, "logging-config.yaml")
log_config = None
with open(logging_config, "rt", encoding="utf-8") as f:
log_config = yaml.safe_load(f.read())
logging.config.dictConfig(log_config)
logger = logging.getLogger("development")
if level is not None:
match level:
case 0:
logger.setLevel(logging.CRITICAL)
case 1:
logger.setLevel(logging.INFO)
case 2:
logger.setLevel(logging.DEBUG)
case _:
logger.setLevel(logging.INFO)
return logger
def get_api_config(log: Logger, config: str) -> ApiConfig:
dirs = PlatformDirs(config)
api_config = Path(dirs.user_config_dir, "api.yaml")
with open(api_config, "rt") as f:
api_data = yaml.safe_load(f.read())
servers = [Server(**server) for server in api_data["server"]]
login = Login(**(api_data["login"]))
apiConfig = ApiConfig(server=servers, login=login)
log.debug(apiConfig)
if not api_data:
log.fatal("API configuration is missing")
return apiConfig
for server in apiConfig.server:
server.login(apiConfig.login, log)
with open(api_config, "w") as f:
yaml.dump(api_data, f)
return apiConfig
+17 -22
View File
@@ -2,7 +2,6 @@
download files with URLs from DB download files with URLs from DB
""" """
import os
import re import re
import subprocess import subprocess
import sys import sys
@@ -10,16 +9,17 @@ from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from datetime import datetime from datetime import datetime
from enum import Enum, auto from enum import Enum, auto
from pathlib import Path from pathlib import Path
from typing import Any, Dict from typing import Dict
from logging import Logger
from uuid import UUID from uuid import UUID
import requests import requests
from config import get_api_config, get_logger from api import Server, get_api_config, get_logger
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("--verbose", "-v", action="count", default=0) parser.add_argument("--verbose", "-v", action="count", default=0)
parser.add_argument("--config", "-c", default="kontor-docker") parser.add_argument("--config", "-c", default="kontor-api")
parser.add_argument("--dir", "-d", default="/data/media") parser.add_argument("--dir", "-d", default="/data/media")
parser.add_argument("--limit", "-l", type=int, help="maximum number of links to check") parser.add_argument("--limit", "-l", type=int, help="maximum number of links to check")
parser.add_argument("--tool", "-t", default="yt-dlp") parser.add_argument("--tool", "-t", default="yt-dlp")
@@ -104,12 +104,8 @@ def is_file_downloaded(media_file: dict, dir: Path) -> FileStatus:
return FileStatus.UNKNOWN return FileStatus.UNKNOWN
def update_status(item_id: UUID, file_info: dict, api_data: Dict[str, Any]): def update_status(item_id: UUID, file_info: dict, server: Server, log: Logger):
host = api_data["host"] update = server.update(log, "media_file", item_id, file_info)
token = api_data["token"]
url: str = f"http://{host}:{port}/api/media/files/{item_id}"
headers: Dict[str, str] = {"Authorization": f"Bearer {token}"}
update = requests.put(url, headers=headers, json=file_info)
log.info(f"update status: {update.status_code}") log.info(f"update status: {update.status_code}")
log.info(f"update result: {update.json()}") log.info(f"update result: {update.json()}")
@@ -132,15 +128,14 @@ def rename_file(file_info: dict):
if __name__ == "__main__": if __name__ == "__main__":
log = get_logger(args.verbose, args.config) log = get_logger(args.verbose, args.config)
log.info("kontor.download started") log.info("kontor.download started")
api_data = get_api_config(log, args.config) apiConfig = get_api_config(log, args.config)
host = api_data["host"] server: Server = apiConfig.server[0]
port = api_data["port"] data = server.request(log=log, table="media_file", param="download=true")
token = api_data["token"] # url: str = f"http://{host}:{port}/api/media/files?download=true"
url: str = f"http://{host}:{port}/api/media/files?download=true" # headers: Dict[str, str] = {"Authorization": f"Bearer {token}"}
headers: Dict[str, str] = {"Authorization": f"Bearer {token}"} # response = requests.get(url, headers=headers)
response = requests.get(url, headers=headers) # log.info(f"Status: {response.status_code}")
log.info(f"Status: {response.status_code}") # data = response.json()
data = response.json()
entries_count = len(data) entries_count = len(data)
log.info(f"data: {entries_count}") log.info(f"data: {entries_count}")
mediafile_index = 1 mediafile_index = 1
@@ -158,15 +153,15 @@ if __name__ == "__main__":
match download_status: match download_status:
case FileStatus.DOWNLOADED: case FileStatus.DOWNLOADED:
rename_file(item) rename_file(item)
update_status(file_id, item, api_data) update_status(file_id, item, server=server, log=log)
case FileStatus.RENAMED: case FileStatus.RENAMED:
log.info("update status") log.info("update status")
update_status(file_id, item, api_data) update_status(file_id, item, server=server, log=log)
case FileStatus.UNKNOWN: case FileStatus.UNKNOWN:
download_file(link, item, args.dir) download_file(link, item, args.dir)
rename_file(item) rename_file(item)
log.info(f"{item}") log.info(f"{item}")
update_status(file_id, item, api_data) update_status(file_id, item, server=server, log=log)
log.warning(f"processed {mediafile_index}/{entries_count}") log.warning(f"processed {mediafile_index}/{entries_count}")
if args.limit and args.limit <= mediafile_index: if args.limit and args.limit <= mediafile_index:
break break
+2 -98
View File
@@ -1,14 +1,6 @@
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from dataclasses import dataclass from dataclasses import dataclass
import logging from api import get_logger, get_api_config
import logging.config
from logging import Logger
from pathlib import Path
from typing import Dict, List
from platformdirs import PlatformDirs
import requests
import yaml
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
@@ -19,99 +11,11 @@ parser.add_argument("--cleanup", "-d", action="store_true")
args = parser.parse_args() args = parser.parse_args()
@dataclass
class Login:
email: str
password: str
@dataclass
class Server:
name: str
url: str
token: str
token_type: str
def login(self, login: Login, log: Logger):
if not self.token:
log.info("Call login first")
login_url = f"{self.url}/login"
login_data = {}
login_data["email"] = login.email
login_data["password"] = login.password
response = requests.post(login_url, json=login_data)
status = response.status_code
log.info(f"Status: {status}")
if status != 200:
log.fatal("authentication failed")
return
data = response.json()
log.debug(f"got data: {data}")
token = data["access_token"]
token_type = data["token_type"]
self.token = str(token)
self.token_type = str(token_type)
def request(self, log: Logger):
url: str = f"{self.url}/api/media/files"
headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
response = requests.get(url, headers=headers)
log.info(f"Status: {response.status_code}")
data = response.json()
return data
@dataclass
class ApiConfig:
login: Login
server: List[Server]
def get_logger(level, config: str):
dirs = PlatformDirs(config)
logging_config = Path(dirs.user_config_dir, "logging-config.yaml")
log_config = None
with open(logging_config, "rt") as f:
log_config = yaml.safe_load(f.read())
logging.config.dictConfig(log_config)
logger = logging.getLogger("development")
if level is not None:
match level:
case 0:
logger.setLevel(logging.CRITICAL)
case 1:
logger.setLevel(logging.INFO)
case 2:
logger.setLevel(logging.DEBUG)
case _:
logger.setLevel(logging.INFO)
return logger
def get_api_config(log: Logger, config: str) -> ApiConfig:
# api_data: Dict[str, Any] = {}
dirs = PlatformDirs(config)
api_config = Path(dirs.user_config_dir, "api.yaml")
with open(api_config, "rt") as f:
api_data = yaml.safe_load(f.read())
servers = [Server(**server) for server in api_data['server']]
login = Login(**(api_data["login"]))
apiConfig = ApiConfig(server=servers, login=login)
log.info(apiConfig)
if not api_data:
log.fatal("API configuration is missing")
return apiConfig
for server in apiConfig.server:
server.login(apiConfig.login, log)
with open(api_config, "w") as f:
yaml.dump(api_data, f)
return apiConfig
if __name__== "__main__": if __name__== "__main__":
logger = get_logger(args.verbose, "kontor") logger = get_logger(args.verbose, "kontor")
logger.info("kontor.sync started") logger.info("kontor.sync started")
apiConfig = get_api_config(logger, args.config) apiConfig = get_api_config(logger, args.config)
for server in apiConfig.server: for server in apiConfig.server:
data = server.request(logger) data = server.request(logger, "media_file")
logger.info(len(data)) logger.info(len(data))
logger.info("kontor.sync finished") logger.info("kontor.sync finished")