add missing endpoints for creating items
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 4s

This commit is contained in:
Thomas Peetz
2026-05-26 22:43:04 +02:00
parent 0f9c90b883
commit c885f6cc02
16 changed files with 314 additions and 145 deletions
+60 -24
View File
@@ -78,6 +78,12 @@ class Option:
return f"/{self.value}"
class CredentialsValidationException(Exception):
"""
Raised when login failed or token is outdated.
"""
class EndPointNotAvailableException(Exception):
"""
Raised when calling an not existing endpoint.
@@ -104,30 +110,38 @@ class Server:
url: str
token: str
token_type: str
email: str
password: str
timeout: int
def login(self, login: Login, log: Logger):
def login(self, log: Logger, refresh_token: bool= False):
"""
get token from server by calling login endpoint.
"""
log.debug("call login and retrieve token")
if not self.token:
log.info("Call login first")
login_url = f"{self.url}/login"
login_data = {}
login_data["email"] = login.email
login_data["password"] = login.password
response = requests.post(login_url, json=login_data, timeout=self.timeout)
status = response.status_code
log.info(f"Status: {status}")
if status != 200:
log.fatal("authentication failed")
return
data = response.json()
log.debug(f"got data: {data}")
token = data["access_token"]
token_type = data["token_type"]
self.token = str(token)
self.token_type = str(token_type)
self.__get_token__(log=log)
if refresh_token:
self.__get_token__(log=log)
def __get_token__(self, log: Logger):
log.info("Call login first")
login_url = f"{self.url}/login"
login_data = {}
login_data["email"] = self.email
login_data["password"] = self.password
response = requests.post(login_url, json=login_data, timeout=self.timeout)
status = response.status_code
log.info(f"Status: {status}")
if status != 200:
log.fatal("authentication failed")
return
data = response.json()
log.debug(f"got data: {data}")
token = data["access_token"]
token_type = data["token_type"]
self.token = str(token)
self.token_type = str(token_type)
def request(self, log: Logger, table: str, param: Optional[Option] = None):
"""
@@ -140,6 +154,10 @@ class Server:
headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
response = requests.get(url, headers=headers, timeout=self.timeout)
log.debug(f"Status: {response.status_code}")
if response.status_code == 401:
self.login(log, refresh_token=True)
headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
response = requests.get(url, headers=headers, timeout=self.timeout)
if response.status_code == 404:
raise EndPointNotAvailableException
data = response.json()
@@ -159,6 +177,22 @@ class Server:
raise EndPointNotAvailableException
data = update.json()
return data
def create(self, log: Logger, table: str, new_item: dict):
"""
Create item in Kontor-API instance.
"""
url: str = f"{self.url}/{MAPPING[table]}"
headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
create = requests.post(url, headers=headers, json=new_item, timeout=self.timeout)
log.info(f"Status: {create.status_code}")
if create.status_code == 404:
raise EndPointNotAvailableException
if create.status_code == 409:
log.fatal("Create Exception %s", create.json())
data = create.json()
return data
@dataclass
@@ -167,7 +201,6 @@ class ApiConfig:
Dataclass to define required contents of configuration file.
"""
login: Login
server: List[Server]
def get_server(self, server_name: str) -> Optional[Server]:
@@ -199,13 +232,17 @@ def get_logger(level, config: str):
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
case 1:
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
logger.setLevel(logging.INFO)
case 2:
logger.setLevel(logging.DEBUG)
logging.getLogger("requests").setLevel(logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.DEBUG)
case _:
logger.setLevel(logging.INFO)
logging.getLogger("requests").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
return logger
@@ -218,14 +255,13 @@ def get_api_config(log: Logger, config: str) -> ApiConfig:
with open(api_config, "rt", encoding="utf-8") as f:
api_data = yaml.safe_load(f.read())
servers = [Server(**server) for server in api_data["server"]]
login = Login(**(api_data["login"]))
api_config_data = ApiConfig(server=servers, login=login)
api_config_data = ApiConfig(server=servers)
log.debug(api_config_data)
if not api_data:
log.fatal("API configuration is missing")
return api_config_data
for server in api_config_data.server:
server.login(api_config_data.login, log)
server.login(log)
with open(api_config, "w", encoding="utf-8") as f:
yaml.dump(api_data, f)
return api_config_data
+5 -3
View File
@@ -25,12 +25,13 @@ parser.add_argument("--cleanup", "-d", action="store_true")
args = parser.parse_args()
def create_item_id_mapping(data_list: List[dict]) -> Dict[str, dict]:
def create_item_id_mapping(log: Logger, data_list: List[dict]) -> Dict[str, dict]:
"""
create dictionary with id as key and dictionary as value.
"""
item_id_mapping: Dict[str, dict] = {}
for data_item in data_list:
log.debug(data_item)
item_id_mapping[data_item["id"]] = data_item
return item_id_mapping
@@ -87,7 +88,7 @@ if __name__ == "__main__":
)
if len(server_list) > 1:
for table, path in MAPPING.items():
mapping = create_item_id_mapping(export_data[server_list[1].name][table])
mapping = create_item_id_mapping(logger, export_data[server_list[1].name][table])
for item in export_data[server_list[0].name][table]:
logger.debug("checking %s:%s", table, item["id"])
check_item_id = item["id"]
@@ -111,8 +112,9 @@ if __name__ == "__main__":
)
else:
logger.info(
"item %s in %s missing", check_item_id, server_list[1].name
"item %s in %s missing: ", check_item_id, server_list[1].name, item
)
server_list[1].create(logger, table, item)
logger.info("synchronization of %s finished", table)
logger.info("all tables synchronized")
else: