add missing endpoints for creating items
This commit is contained in:
+60
-24
@@ -78,6 +78,12 @@ class Option:
|
||||
return f"/{self.value}"
|
||||
|
||||
|
||||
class CredentialsValidationException(Exception):
|
||||
"""
|
||||
Raised when login failed or token is outdated.
|
||||
"""
|
||||
|
||||
|
||||
class EndPointNotAvailableException(Exception):
|
||||
"""
|
||||
Raised when calling an not existing endpoint.
|
||||
@@ -104,30 +110,38 @@ class Server:
|
||||
url: str
|
||||
token: str
|
||||
token_type: str
|
||||
email: str
|
||||
password: str
|
||||
timeout: int
|
||||
|
||||
def login(self, login: Login, log: Logger):
|
||||
def login(self, log: Logger, refresh_token: bool= False):
|
||||
"""
|
||||
get token from server by calling login endpoint.
|
||||
"""
|
||||
log.debug("call login and retrieve token")
|
||||
if not self.token:
|
||||
log.info("Call login first")
|
||||
login_url = f"{self.url}/login"
|
||||
login_data = {}
|
||||
login_data["email"] = login.email
|
||||
login_data["password"] = login.password
|
||||
response = requests.post(login_url, json=login_data, timeout=self.timeout)
|
||||
status = response.status_code
|
||||
log.info(f"Status: {status}")
|
||||
if status != 200:
|
||||
log.fatal("authentication failed")
|
||||
return
|
||||
data = response.json()
|
||||
log.debug(f"got data: {data}")
|
||||
token = data["access_token"]
|
||||
token_type = data["token_type"]
|
||||
self.token = str(token)
|
||||
self.token_type = str(token_type)
|
||||
self.__get_token__(log=log)
|
||||
if refresh_token:
|
||||
self.__get_token__(log=log)
|
||||
|
||||
def __get_token__(self, log: Logger):
|
||||
log.info("Call login first")
|
||||
login_url = f"{self.url}/login"
|
||||
login_data = {}
|
||||
login_data["email"] = self.email
|
||||
login_data["password"] = self.password
|
||||
response = requests.post(login_url, json=login_data, timeout=self.timeout)
|
||||
status = response.status_code
|
||||
log.info(f"Status: {status}")
|
||||
if status != 200:
|
||||
log.fatal("authentication failed")
|
||||
return
|
||||
data = response.json()
|
||||
log.debug(f"got data: {data}")
|
||||
token = data["access_token"]
|
||||
token_type = data["token_type"]
|
||||
self.token = str(token)
|
||||
self.token_type = str(token_type)
|
||||
|
||||
def request(self, log: Logger, table: str, param: Optional[Option] = None):
|
||||
"""
|
||||
@@ -140,6 +154,10 @@ class Server:
|
||||
headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
|
||||
response = requests.get(url, headers=headers, timeout=self.timeout)
|
||||
log.debug(f"Status: {response.status_code}")
|
||||
if response.status_code == 401:
|
||||
self.login(log, refresh_token=True)
|
||||
headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
|
||||
response = requests.get(url, headers=headers, timeout=self.timeout)
|
||||
if response.status_code == 404:
|
||||
raise EndPointNotAvailableException
|
||||
data = response.json()
|
||||
@@ -159,6 +177,22 @@ class Server:
|
||||
raise EndPointNotAvailableException
|
||||
data = update.json()
|
||||
return data
|
||||
|
||||
def create(self, log: Logger, table: str, new_item: dict):
|
||||
"""
|
||||
Create item in Kontor-API instance.
|
||||
"""
|
||||
url: str = f"{self.url}/{MAPPING[table]}"
|
||||
headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
|
||||
create = requests.post(url, headers=headers, json=new_item, timeout=self.timeout)
|
||||
log.info(f"Status: {create.status_code}")
|
||||
if create.status_code == 404:
|
||||
raise EndPointNotAvailableException
|
||||
if create.status_code == 409:
|
||||
log.fatal("Create Exception %s", create.json())
|
||||
data = create.json()
|
||||
return data
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -167,7 +201,6 @@ class ApiConfig:
|
||||
Dataclass to define required contents of configuration file.
|
||||
"""
|
||||
|
||||
login: Login
|
||||
server: List[Server]
|
||||
|
||||
def get_server(self, server_name: str) -> Optional[Server]:
|
||||
@@ -199,13 +232,17 @@ def get_logger(level, config: str):
|
||||
logging.getLogger("requests").setLevel(logging.WARNING)
|
||||
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
||||
case 1:
|
||||
logging.getLogger("requests").setLevel(logging.WARNING)
|
||||
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
||||
logging.getLogger("requests").setLevel(logging.INFO)
|
||||
logging.getLogger("urllib3").setLevel(logging.INFO)
|
||||
logger.setLevel(logging.INFO)
|
||||
case 2:
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logging.getLogger("requests").setLevel(logging.DEBUG)
|
||||
logging.getLogger("urllib3").setLevel(logging.DEBUG)
|
||||
case _:
|
||||
logger.setLevel(logging.INFO)
|
||||
logging.getLogger("requests").setLevel(logging.INFO)
|
||||
logging.getLogger("urllib3").setLevel(logging.INFO)
|
||||
return logger
|
||||
|
||||
|
||||
@@ -218,14 +255,13 @@ def get_api_config(log: Logger, config: str) -> ApiConfig:
|
||||
with open(api_config, "rt", encoding="utf-8") as f:
|
||||
api_data = yaml.safe_load(f.read())
|
||||
servers = [Server(**server) for server in api_data["server"]]
|
||||
login = Login(**(api_data["login"]))
|
||||
api_config_data = ApiConfig(server=servers, login=login)
|
||||
api_config_data = ApiConfig(server=servers)
|
||||
log.debug(api_config_data)
|
||||
if not api_data:
|
||||
log.fatal("API configuration is missing")
|
||||
return api_config_data
|
||||
for server in api_config_data.server:
|
||||
server.login(api_config_data.login, log)
|
||||
server.login(log)
|
||||
with open(api_config, "w", encoding="utf-8") as f:
|
||||
yaml.dump(api_data, f)
|
||||
return api_config_data
|
||||
|
||||
Reference in New Issue
Block a user