update log config for kontor-api

fix double occurances of log output and change level to Debug for
Profile information
This commit is contained in:
2026-01-10 15:39:06 +01:00
parent e5d4b748dc
commit 56c6808508
4 changed files with 84 additions and 72 deletions
+9 -5
View File
@@ -1,4 +1,5 @@
from datetime import timedelta
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
@@ -9,6 +10,7 @@ from src.schema.admin import Token
login_router = APIRouter()
class LoginRequest(BaseModel):
email: str | None = None
password: str | None = None
@@ -19,12 +21,11 @@ class LoginRequest(BaseModel):
tags=["login"],
summary="Login and get token",
response_description="Return HTTP status code 200 (OK)",
status_code=status.HTTP_200_OK
status_code=status.HTTP_200_OK,
)
def login(request: LoginRequest) -> Token:
logger.info(f"login with {request.email} with {request.password}")
print(f"login with {request.email} with {request.password}")
user = authenticate_user(request.email, request.password) # type: ignore
logger.info(f"login with {request.email}")
user = authenticate_user(request.email, request.password) # type: ignore
scopes = ["admin", "read"]
if not user:
raise HTTPException(
@@ -33,5 +34,8 @@ def login(request: LoginRequest) -> Token:
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": user.email, "scope": " ".join(scopes)}, expires_delta=access_token_expires)
access_token = create_access_token(
data={"sub": user.email, "scope": " ".join(scopes)},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
+10 -6
View File
@@ -8,15 +8,15 @@ LOGGING_CONFIG: dict[str, Any] = {
"formatters": {
"default": {
"()": "uvicorn.logging.DefaultFormatter",
"fmt": "%(asctime)s - %(name)s - %(levelprefix)s %(message)s"
"fmt": "%(asctime)s - %(name)s - %(levelprefix)s %(message)s",
},
"access": {
"()": "uvicorn.logging.AccessFormatter",
"fmt": '%(asctime)s - %(name)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s', # noqa: E501
"fmt": '%(asctime)s - %(name)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s', # noqa: E501
},
"access_file": {
"()": "uvicorn.logging.AccessFormatter",
"fmt": '%(asctime)s - %(name)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s', # noqa: E501
"fmt": '%(asctime)s - %(name)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s', # noqa: E501
"use_colors": False,
},
},
@@ -34,12 +34,16 @@ LOGGING_CONFIG: dict[str, Any] = {
},
"loggers": {
"root": {"handlers": ["default"], "level": "INFO", "propagate": False},
"kontor": {"handlers": ["default"], "level": "INFO", "propagate": True},
"kontor": {"handlers": ["default"], "level": "INFO", "propagate": False},
"uvicorn": {"handlers": ["default"], "level": "INFO", "propagate": False},
"uvicorn.error": {"level": "INFO"},
"uvicorn.access": {"handlers": ["default"], "level": "WARNING", "propagate": False},
"uvicorn.access": {
"handlers": ["default"],
"level": "WARNING",
"propagate": False,
},
},
}
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger('kontor')
logger = logging.getLogger("kontor")
+33 -34
View File
@@ -1,29 +1,21 @@
import logging
from datetime import datetime, timezone
from datetime import timedelta
from typing import Optional, Annotated, List
from typing import Dict
from typing import Optional
from fastapi import HTTPException, Security
from fastapi import Request
from fastapi import status
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security.utils import get_authorization_scheme_param
from datetime import datetime, timedelta, timezone
from typing import Annotated, Dict, List, Optional
import bcrypt
from fastapi import Depends
from fastapi.security import SecurityScopes, OAuth2PasswordBearer, OAuth2
from fastapi import Depends, HTTPException, Request, Security, status
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import OAuth2, OAuth2PasswordBearer, SecurityScopes
from fastapi.security.utils import get_authorization_scheme_param
from jose import JWTError, jwt
from pydantic import ValidationError
from src.core.config import settings
from jose import jwt, JWTError
from src.core.log_conf import logger
from src.db.models.admin import Profile
from src.db.repository.admin import get_profile
from src.db.session import SessionLocal
from src.schema.admin import TokenData, ProfileModel
from src.schema.admin import ProfileModel, TokenData
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/api/login/token",
@@ -33,15 +25,15 @@ oauth2_scheme = OAuth2PasswordBearer(
class OAuth2PasswordBearerWithCookie(OAuth2):
def __init__(
self,
tokenUrl: str,
scheme_name: Optional[str] = None,
scopes: Optional[Dict[str, str]] = None,
auto_error: bool = True,
self,
tokenUrl: str,
scheme_name: Optional[str] = None,
scopes: Optional[Dict[str, str]] = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore
super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
async def __call__(self, request: Request) -> Optional[str]:
@@ -63,7 +55,7 @@ class OAuth2PasswordBearerWithCookie(OAuth2):
def authenticate_user(username: str, password: str) -> Optional[Profile]:
with SessionLocal() as db:
user = get_profile(username=username, db=db)
logger.info(user)
logger.debug(user)
if not user:
return None
if bcrypt.checkpw(password.encode(), user.password.encode()):
@@ -88,7 +80,9 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
return encoded_jwt
async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]):
async def get_current_user(
security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
@@ -99,8 +93,10 @@ async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
username: str = payload.get("sub") # type: ignore
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
username: str = payload.get("sub") # type: ignore
logger.info("username/email extracted is ", username)
if username is None:
raise credentials_exception
@@ -110,7 +106,7 @@ async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str
except (JWTError, ValidationError):
raise credentials_exception
with SessionLocal() as db:
user = get_profile(username=token_data.username, db=db) # type: ignore
user = get_profile(username=token_data.username, db=db) # type: ignore
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
@@ -124,17 +120,20 @@ async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str
async def get_current_active_user(
current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])],
current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])],
) -> ProfileModel:
if not current_user.enabled: # type: ignore
if not current_user.enabled: # type: ignore
raise HTTPException(status_code=400, detail="Inactive user")
user_model = ProfileModel(username=current_user.user_name, email=current_user.email, # type: ignore
first_name=current_user.first_name, last_name=current_user.last_name, # type: ignore
active=current_user.enabled) # type: ignore
user_model = ProfileModel(
username=current_user.user_name,
email=current_user.email, # type: ignore
first_name=current_user.first_name,
last_name=current_user.last_name, # type: ignore
active=current_user.enabled,
) # type: ignore
return user_model
def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -144,7 +143,7 @@ def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
username: str = payload.get("sub") # type: ignore
username: str = payload.get("sub") # type: ignore
logger.info("username/email extracted is ", username)
if username is None:
raise credentials_exception
+32 -27
View File
@@ -1,35 +1,41 @@
"""
Setup database connections
"""
from logging import Logger
import sqlite3
from typing import Any, Dict
import psycopg2
import logging.config
from platformdirs import PlatformDirs
import sqlite3
from logging import Logger
from pathlib import Path
from typing import Any, Dict
import psycopg2
import requests
import yaml
from platformdirs import PlatformDirs
def get_database_cursors(log, config: str):
dirs = PlatformDirs(config)
database_config = Path(dirs.user_config_dir, 'database-config.yaml')
with open(database_config, 'rt') as f:
database_config = Path(dirs.user_config_dir, "database-config.yaml")
with open(database_config, "rt") as f:
db_config = yaml.safe_load(f.read())
sqlite_db = db_config["sqlite"]["file"]
log.info('using SQLite3 database {}'.format(sqlite_db))
sqlite_conn = sqlite3.connect(sqlite_db, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
log.info("using SQLite3 database {}".format(sqlite_db))
sqlite_conn = sqlite3.connect(
sqlite_db, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
)
mariadb_conn = None
postgres_conn = psycopg2.connect(f"host={db_config['postgres']['host']} port={db_config['postgres']['port']} user={db_config['postgres']['user']} password={db_config['postgres']['password']} dbname={db_config['postgres']['database']}")
postgres_conn = psycopg2.connect(
f"host={db_config['postgres']['host']} port={db_config['postgres']['port']} user={db_config['postgres']['user']} password={db_config['postgres']['password']} dbname={db_config['postgres']['database']}"
)
return sqlite_conn, mariadb_conn, postgres_conn
def create_tables(sqlite_conn, logger, recreate_db, scripts):
logger.info('create_tables')
logger.info("create_tables")
for table_id in scripts:
create_statement = scripts[table_id]['create']
drop_statement = scripts[table_id]['drop']
create_statement = scripts[table_id]["create"]
drop_statement = scripts[table_id]["drop"]
logger.debug(create_statement)
cursor = sqlite_conn.cursor()
if recreate_db:
@@ -40,11 +46,11 @@ def create_tables(sqlite_conn, logger, recreate_db, scripts):
def get_logger(level, config: str):
dirs = PlatformDirs(config)
logging_config = Path(dirs.user_config_dir, 'logging-config.yaml')
with open(logging_config, 'rt') as f:
logging_config = Path(dirs.user_config_dir, "logging-config.yaml")
with open(logging_config, "rt") as f:
log_config = yaml.safe_load(f.read())
logging.config.dictConfig(log_config)
logger = logging.getLogger('development')
logger = logging.getLogger("development")
if level is not None:
match level:
case 0:
@@ -57,14 +63,15 @@ def get_logger(level, config: str):
logger.setLevel(logging.INFO)
return logger
def get_api_config(log: Logger, config: str) -> Dict[str, Any]:
api_data: Dict[str, Any] = {}
token: str | None = None
host: str | None = None
port: int = 0
dirs = PlatformDirs(config)
api_config = Path(dirs.user_config_dir, 'api.yaml')
with open(api_config, 'rt') as f:
api_config = Path(dirs.user_config_dir, "api.yaml")
with open(api_config, "rt") as f:
api_data = yaml.safe_load(f.read())
if not api_data:
log.fatal("API configuration is missing")
@@ -75,8 +82,8 @@ def get_api_config(log: Logger, config: str) -> Dict[str, Any]:
log.info("Call login first")
login_url = f"http://{host}:{port}/login"
login_data = {}
login_data['email'] = api_data["email"]
login_data['password'] = api_data["password"]
login_data["email"] = api_data["email"]
login_data["password"] = api_data["password"]
response = requests.post(login_url, json=login_data)
status = response.status_code
log.info(f"Status: {status}")
@@ -85,12 +92,10 @@ def get_api_config(log: Logger, config: str) -> Dict[str, Any]:
return api_data
data = response.json()
log.debug(f"got data: {data}")
token = data['access_token']
token_type = data['token_type']
api_data['token'] = token
api_data['token_type'] = token_type
with open(api_config, 'w') as f:
token = data["access_token"]
token_type = data["token_type"]
api_data["token"] = token
api_data["token_type"] = token_type
with open(api_config, "w") as f:
yaml.dump(api_data, f)
else:
token = api_data['token']
return api_data