add endpoints to get items by id

This commit is contained in:
Thomas Peetz
2026-05-20 18:34:44 +02:00
committed by Thomas Peetz
parent faa1ec44c9
commit 3ba1fa40fd
10 changed files with 113 additions and 24 deletions
@@ -1,6 +1,6 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import FieldPosition
from src.db.session import SessionDep
@@ -10,10 +10,18 @@ from src.schema.tysc.fieldposition import FieldPositionResponse, to_response
router = APIRouter()
@router.get("/positions")
def get_all_teams(db: SessionDep) -> List[FieldPositionResponse]:
def get_all_positions(db: SessionDep) -> List[FieldPositionResponse]:
results: list[FieldPositionResponse] = []
sports = db.query(FieldPosition).all()
for sport in sports:
response = to_response(sport)
positions = db.query(FieldPosition).all()
for position in positions:
response = to_response(position)
results.append(response)
return results
@router.get("/positions/{position_id}", response_model=FieldPositionResponse)
def get_position(position_id: str, db: SessionDep) -> FieldPositionResponse:
position = db.get(FieldPosition, position_id)
if position is None:
raise HTTPException(status_code=404, detail="Fieldposition could not be found")
response = to_response(position)
return response
+9 -1
View File
@@ -1,6 +1,6 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Player
from src.db.session import SessionDep
@@ -17,3 +17,11 @@ def get_all_players(db: SessionDep) -> List[PlayerResponse]:
response = to_response(player)
results.append(response)
return results
@router.get("/players/{player_id}", response_model=PlayerResponse)
def get_player(player_id: str, db: SessionDep) -> PlayerResponse:
player = db.get(Player, player_id)
if player is None:
raise HTTPException(status_code=404, detail="Player could not be found")
response = to_response(player)
return response
+10 -2
View File
@@ -1,6 +1,6 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Rooster
from src.db.session import SessionDep
@@ -10,10 +10,18 @@ from src.schema.tysc.rooster import RoosterResponse, to_response
router = APIRouter()
@router.get("/roosters")
def get_all_sports(db: SessionDep) -> List[RoosterResponse]:
def get_all_roosters(db: SessionDep) -> List[RoosterResponse]:
results: list[RoosterResponse] = []
roosters = db.query(Rooster).all()
for rooster in roosters:
response = to_response(rooster)
results.append(response)
return results
@router.get("/roosters/{rooster_id}", response_model=RoosterResponse)
def get_rooster(rooster_id: str, db: SessionDep) -> RoosterResponse:
rooster = db.get(Rooster, rooster_id)
if rooster is None:
raise HTTPException(status_code=404, detail="Rooster could not be found")
response = to_response(rooster)
return response
+12 -1
View File
@@ -1,6 +1,7 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.core.log_conf import logger
from src.db.session import SessionDep
from src.schema.tysc.sport import SportResponse, to_response
from src.db.models.tysc import Sport
@@ -15,3 +16,13 @@ def get_all_sports(db: SessionDep) -> List[SportResponse]:
response = to_response(sport)
results.append(response)
return results
@router.get("/sports/{sport_id}", response_model=SportResponse)
def get_sport(sport_id: str, db: SessionDep) -> SportResponse:
sport = db.get(Sport, sport_id)
if sport is None:
raise HTTPException(status_code=404, detail="Sport could not be found")
logger.debug(f"create SportResponse for {sport}")
response: SportResponse = to_response(sport)
return response
+12 -4
View File
@@ -1,6 +1,6 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Team
from src.db.session import SessionDep
@@ -12,8 +12,16 @@ router = APIRouter()
@router.get("/teams")
def get_all_teams(db: SessionDep) -> List[TeamResponse]:
results: list[TeamResponse] = []
sports = db.query(Team).all()
for sport in sports:
response = to_response(sport)
teams = db.query(Team).all()
for team in teams:
response = to_response(team)
results.append(response)
return results
@router.get("/teams/{team_id}", response_model=TeamResponse)
def get_team(team_id: str, db: SessionDep) -> TeamResponse:
team = db.get(Team, team_id)
if team is None:
raise HTTPException(status_code=404, detail="Team could not be found")
response = to_response(team)
return response
+12 -1
View File
@@ -1,6 +1,6 @@
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from src.db.models.tysc import Vendor
from src.db.session import SessionDep
@@ -20,3 +20,14 @@ def get_all_vendors(db: SessionDep) -> List[VendorResponse]:
response = to_response(vendor)
results.append(response)
return results
@router.get("/vendors/{vendor_id}", response_model=VendorResponse)
def get_vendor(vendor_id: str, db: SessionDep) -> VendorResponse:
"""
retrieve vendor by id as json response.
"""
vendor = db.get(Vendor, vendor_id)
if vendor is None:
raise HTTPException(status_code=404, detail="Vendor could not be found")
response = to_response(vendor)
return response
+21 -2
View File
@@ -1,4 +1,5 @@
from dataclasses import dataclass
from enum import Enum, auto
import logging
import logging.config
from logging import Logger
@@ -47,6 +48,22 @@ MAPPING: Dict[str, str] = {
"mail_account": "api/admin/mailaccounts",
}
class OptionType(Enum):
PARAM = auto()
ID = auto()
class Option:
def __init__(self, option_type: OptionType, value: str) -> None:
self.type: Optional[OptionType] = option_type
self.value: Optional[str] = value
def __str__(self) -> str:
if self.type is OptionType.PARAM:
return f"?{self.value}"
else:
return f"/{self.value}"
class EndPointNotAvailableException(Exception):
"""
@@ -101,11 +118,11 @@ class Server:
self.token = str(token)
self.token_type = str(token_type)
def request(self, log: Logger, table: str, param: Optional[str] = None):
def request(self, log: Logger, table: str, param: Optional[Option] = None):
if not param:
url: str = f"{self.url}/{MAPPING[table]}"
else:
url: str = f"{self.url}/{MAPPING[table]}?{param}"
url: str = f"{self.url}/{MAPPING[table]}{str(param)}"
headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
response = requests.get(url, headers=headers, timeout=self.timeout)
log.debug(f"Status: {response.status_code}")
@@ -158,6 +175,8 @@ def get_logger(level, config: str):
case 0:
logger.setLevel(logging.CRITICAL)
case 1:
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logger.setLevel(logging.INFO)
case 2:
logger.setLevel(logging.DEBUG)
+1 -1
View File
@@ -1,6 +1,6 @@
import uuid
from datetime import datetime
from typing import AnyStr, Dict, List, Optional, Any
from typing import Dict, List, Optional, Any
from sqlalchemy import Column, ForeignKey, Integer, String, Boolean, func
from sqlalchemy.orm import relationship, Mapped, mapped_column
+1 -4
View File
@@ -9,13 +9,10 @@ from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from typing import Dict
from logging import Logger
from uuid import UUID
import requests
from api import Server, get_api_config, get_logger
from api import Option, OptionType, Server, get_api_config, get_logger
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("--verbose", "-v", action="count", default=0)
+22 -3
View File
@@ -1,6 +1,7 @@
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
import json
from typing import List
from api import MAPPING, EndPointNotAvailableException, Server, get_logger, get_api_config
from api import MAPPING, EndPointNotAvailableException, Option, OptionType, Server, get_logger, get_api_config
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
@@ -23,13 +24,31 @@ if __name__== "__main__":
server_list.append(server)
else:
server_list.extend(apiConfig.server)
export_data = {}
for server in server_list:
export_data[server.name] = {}
for table, path in MAPPING.items():
try:
data = server.request(logger, table=table)
logger.info("%s: %s", table, len(data))
#if len(data) == 1:
# logger.info("show data: %s", data)
export_data[server.name][table] = data
logger.info("%s: %s exported", table, len(data))
except EndPointNotAvailableException:
logger.info("Endpoint not implemented")
try:
json_dump = json.dumps(export_data[server.name], indent=4)
file_name = f"{server.name}-data.json"
with open(file_name, "w") as dump_file:
dump_file.write(json_dump)
except TypeError as error:
logger.info(f"{error}")
for server in server_list:
logger.info(f"{server.name}: {len(export_data[server.name])} tables exported")
for table, path in MAPPING.items():
for item in export_data[server_list[0].name][table]:
item_data = server_list[1].request(logger, table=table, param=Option(OptionType.ID, item['id']))
if item != item_data:
logger.info("diff: %s\n%s", item, item_data)
else:
logger.info("no changes for: %s", item)
logger.info("kontor.sync finished")