add endpoints to get items by id
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s

This commit is contained in:
Thomas Peetz
2026-05-20 18:34:44 +02:00
parent 944420802f
commit bbc00ae2cc
10 changed files with 114 additions and 25 deletions
+21 -2
View File
@@ -1,4 +1,5 @@
from dataclasses import dataclass
from enum import Enum, auto
import logging
import logging.config
from logging import Logger
@@ -47,6 +48,22 @@ MAPPING: Dict[str, str] = {
"mail_account": "api/admin/mailaccounts",
}
class OptionType(Enum):
PARAM = auto()
ID = auto()
class Option:
def __init__(self, option_type: OptionType, value: str) -> None:
self.type: Optional[OptionType] = option_type
self.value: Optional[str] = value
def __str__(self) -> str:
if self.type is OptionType.PARAM:
return f"?{self.value}"
else:
return f"/{self.value}"
class EndPointNotAvailableException(Exception):
"""
@@ -101,11 +118,11 @@ class Server:
self.token = str(token)
self.token_type = str(token_type)
def request(self, log: Logger, table: str, param: Optional[str] = None):
def request(self, log: Logger, table: str, param: Optional[Option] = None):
if not param:
url: str = f"{self.url}/{MAPPING[table]}"
else:
url: str = f"{self.url}/{MAPPING[table]}?{param}"
url: str = f"{self.url}/{MAPPING[table]}{str(param)}"
headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
response = requests.get(url, headers=headers, timeout=self.timeout)
log.debug(f"Status: {response.status_code}")
@@ -158,6 +175,8 @@ def get_logger(level, config: str):
case 0:
logger.setLevel(logging.CRITICAL)
case 1:
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logger.setLevel(logging.INFO)
case 2:
logger.setLevel(logging.DEBUG)
+1 -1
View File
@@ -1,6 +1,6 @@
import uuid
from datetime import datetime
from typing import AnyStr, Dict, List, Optional, Any
from typing import Dict, List, Optional, Any
from sqlalchemy import Column, ForeignKey, Integer, String, Boolean, func
from sqlalchemy.orm import relationship, Mapped, mapped_column
+2 -5
View File
@@ -9,13 +9,10 @@ from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from typing import Dict
from logging import Logger
from uuid import UUID
import requests
from api import Server, get_api_config, get_logger
from api import Option, OptionType, Server, get_api_config, get_logger
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("--verbose", "-v", action="count", default=0)
@@ -130,7 +127,7 @@ if __name__ == "__main__":
log.info("kontor.download started")
apiConfig = get_api_config(log, args.config)
server: Server = apiConfig.server[0]
data = server.request(log=log, table="media_file", param="download=true")
data = server.request(log=log, table="media_file", param=Option(OptionType.PARAM, "download=true"))
# url: str = f"http://{host}:{port}/api/media/files?download=true"
# headers: Dict[str, str] = {"Authorization": f"Bearer {token}"}
# response = requests.get(url, headers=headers)
+22 -3
View File
@@ -1,6 +1,7 @@
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
import json
from typing import List
from api import MAPPING, EndPointNotAvailableException, Server, get_logger, get_api_config
from api import MAPPING, EndPointNotAvailableException, Option, OptionType, Server, get_logger, get_api_config
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
@@ -23,13 +24,31 @@ if __name__== "__main__":
server_list.append(server)
else:
server_list.extend(apiConfig.server)
export_data = {}
for server in server_list:
export_data[server.name] = {}
for table, path in MAPPING.items():
try:
data = server.request(logger, table=table)
logger.info("%s: %s", table, len(data))
#if len(data) == 1:
# logger.info("show data: %s", data)
export_data[server.name][table] = data
logger.info("%s: %s exported", table, len(data))
except EndPointNotAvailableException:
logger.info("Endpoint not implemented")
try:
json_dump = json.dumps(export_data[server.name], indent=4)
file_name = f"{server.name}-data.json"
with open(file_name, "w") as dump_file:
dump_file.write(json_dump)
except TypeError as error:
logger.info(f"{error}")
for server in server_list:
logger.info(f"{server.name}: {len(export_data[server.name])} tables exported")
for table, path in MAPPING.items():
for item in export_data[server_list[0].name][table]:
item_data = server_list[1].request(logger, table=table, param=Option(OptionType.ID, item['id']))
if item != item_data:
logger.info("diff: %s\n%s", item, item_data)
else:
logger.info("no changes for: %s", item)
logger.info("kontor.sync finished")