diff --git a/kontor-api/src/apis/version1/login.py b/kontor-api/src/apis/version1/login.py index 73a0d2e..f6b28b7 100644 --- a/kontor-api/src/apis/version1/login.py +++ b/kontor-api/src/apis/version1/login.py @@ -1,4 +1,5 @@ from datetime import timedelta + from fastapi import APIRouter, HTTPException, status from pydantic import BaseModel @@ -9,6 +10,7 @@ from src.schema.admin import Token login_router = APIRouter() + class LoginRequest(BaseModel): email: str | None = None password: str | None = None @@ -19,12 +21,11 @@ class LoginRequest(BaseModel): tags=["login"], summary="Login and get token", response_description="Return HTTP status code 200 (OK)", - status_code=status.HTTP_200_OK + status_code=status.HTTP_200_OK, ) def login(request: LoginRequest) -> Token: - logger.info(f"login with {request.email} with {request.password}") - print(f"login with {request.email} with {request.password}") - user = authenticate_user(request.email, request.password) # type: ignore + logger.info(f"login with {request.email}") + user = authenticate_user(request.email, request.password) # type: ignore scopes = ["admin", "read"] if not user: raise HTTPException( @@ -33,5 +34,8 @@ def login(request: LoginRequest) -> Token: headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) - access_token = create_access_token(data={"sub": user.email, "scope": " ".join(scopes)}, expires_delta=access_token_expires) + access_token = create_access_token( + data={"sub": user.email, "scope": " ".join(scopes)}, + expires_delta=access_token_expires, + ) return Token(access_token=access_token, token_type="bearer") diff --git a/kontor-api/src/core/log_conf.py b/kontor-api/src/core/log_conf.py index 0ca6fe2..c33186f 100644 --- a/kontor-api/src/core/log_conf.py +++ b/kontor-api/src/core/log_conf.py @@ -8,15 +8,15 @@ LOGGING_CONFIG: dict[str, Any] = { "formatters": { "default": { "()": "uvicorn.logging.DefaultFormatter", - "fmt": "%(asctime)s - %(name)s - %(levelprefix)s %(message)s" + "fmt": "%(asctime)s - %(name)s - %(levelprefix)s %(message)s", }, "access": { "()": "uvicorn.logging.AccessFormatter", - "fmt": '%(asctime)s - %(name)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s', # noqa: E501 + "fmt": '%(asctime)s - %(name)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s', # noqa: E501 }, "access_file": { "()": "uvicorn.logging.AccessFormatter", - "fmt": '%(asctime)s - %(name)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s', # noqa: E501 + "fmt": '%(asctime)s - %(name)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s', # noqa: E501 "use_colors": False, }, }, @@ -34,12 +34,16 @@ LOGGING_CONFIG: dict[str, Any] = { }, "loggers": { "root": {"handlers": ["default"], "level": "INFO", "propagate": False}, - "kontor": {"handlers": ["default"], "level": "INFO", "propagate": True}, + "kontor": {"handlers": ["default"], "level": "INFO", "propagate": False}, "uvicorn": {"handlers": ["default"], "level": "INFO", "propagate": False}, "uvicorn.error": {"level": "INFO"}, - "uvicorn.access": {"handlers": ["default"], "level": "WARNING", "propagate": False}, + "uvicorn.access": { + "handlers": ["default"], + "level": "WARNING", + "propagate": False, + }, }, } logging.config.dictConfig(LOGGING_CONFIG) -logger = logging.getLogger('kontor') +logger = logging.getLogger("kontor") diff --git a/kontor-api/src/core/security.py b/kontor-api/src/core/security.py index ad3e42e..10c8796 100644 --- a/kontor-api/src/core/security.py +++ b/kontor-api/src/core/security.py @@ -1,29 +1,21 @@ import logging -from datetime import datetime, timezone -from datetime import timedelta -from typing import Optional, Annotated, List -from typing import Dict -from typing import Optional - -from fastapi import HTTPException, Security -from fastapi import Request -from fastapi import status -from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel -from fastapi.security.utils import get_authorization_scheme_param +from datetime import datetime, timedelta, timezone +from typing import Annotated, Dict, List, Optional import bcrypt -from fastapi import Depends -from fastapi.security import SecurityScopes, OAuth2PasswordBearer, OAuth2 +from fastapi import Depends, HTTPException, Request, Security, status +from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel +from fastapi.security import OAuth2, OAuth2PasswordBearer, SecurityScopes +from fastapi.security.utils import get_authorization_scheme_param +from jose import JWTError, jwt from pydantic import ValidationError from src.core.config import settings -from jose import jwt, JWTError - from src.core.log_conf import logger from src.db.models.admin import Profile from src.db.repository.admin import get_profile from src.db.session import SessionLocal -from src.schema.admin import TokenData, ProfileModel +from src.schema.admin import ProfileModel, TokenData oauth2_scheme = OAuth2PasswordBearer( tokenUrl="/api/login/token", @@ -33,15 +25,15 @@ oauth2_scheme = OAuth2PasswordBearer( class OAuth2PasswordBearerWithCookie(OAuth2): def __init__( - self, - tokenUrl: str, - scheme_name: Optional[str] = None, - scopes: Optional[Dict[str, str]] = None, - auto_error: bool = True, + self, + tokenUrl: str, + scheme_name: Optional[str] = None, + scopes: Optional[Dict[str, str]] = None, + auto_error: bool = True, ): if not scopes: scopes = {} - flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore + flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) # type: ignore super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error) async def __call__(self, request: Request) -> Optional[str]: @@ -63,7 +55,7 @@ class OAuth2PasswordBearerWithCookie(OAuth2): def authenticate_user(username: str, password: str) -> Optional[Profile]: with SessionLocal() as db: user = get_profile(username=username, db=db) - logger.info(user) + logger.debug(user) if not user: return None if bcrypt.checkpw(password.encode(), user.password.encode()): @@ -88,7 +80,9 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): return encoded_jwt -async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]): +async def get_current_user( + security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)] +): if security_scopes.scopes: authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' else: @@ -99,8 +93,10 @@ async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str headers={"WWW-Authenticate": authenticate_value}, ) try: - payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) - username: str = payload.get("sub") # type: ignore + payload = jwt.decode( + token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] + ) + username: str = payload.get("sub") # type: ignore logger.info("username/email extracted is ", username) if username is None: raise credentials_exception @@ -110,7 +106,7 @@ async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str except (JWTError, ValidationError): raise credentials_exception with SessionLocal() as db: - user = get_profile(username=token_data.username, db=db) # type: ignore + user = get_profile(username=token_data.username, db=db) # type: ignore if user is None: raise credentials_exception for scope in security_scopes.scopes: @@ -124,17 +120,20 @@ async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str async def get_current_active_user( - current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])], + current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])], ) -> ProfileModel: - if not current_user.enabled: # type: ignore + if not current_user.enabled: # type: ignore raise HTTPException(status_code=400, detail="Inactive user") - user_model = ProfileModel(username=current_user.user_name, email=current_user.email, # type: ignore - first_name=current_user.first_name, last_name=current_user.last_name, # type: ignore - active=current_user.enabled) # type: ignore + user_model = ProfileModel( + username=current_user.user_name, + email=current_user.email, # type: ignore + first_name=current_user.first_name, + last_name=current_user.last_name, # type: ignore + active=current_user.enabled, + ) # type: ignore return user_model - def get_current_user_from_token(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -144,7 +143,7 @@ def get_current_user_from_token(token: str = Depends(oauth2_scheme)): payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) - username: str = payload.get("sub") # type: ignore + username: str = payload.get("sub") # type: ignore logger.info("username/email extracted is ", username) if username is None: raise credentials_exception diff --git a/kontor-scripts/config.py b/kontor-scripts/config.py index afad375..830667a 100644 --- a/kontor-scripts/config.py +++ b/kontor-scripts/config.py @@ -1,35 +1,41 @@ """ Setup database connections """ -from logging import Logger -import sqlite3 -from typing import Any, Dict -import psycopg2 + import logging.config -from platformdirs import PlatformDirs +import sqlite3 +from logging import Logger from pathlib import Path +from typing import Any, Dict + +import psycopg2 import requests import yaml +from platformdirs import PlatformDirs def get_database_cursors(log, config: str): dirs = PlatformDirs(config) - database_config = Path(dirs.user_config_dir, 'database-config.yaml') - with open(database_config, 'rt') as f: + database_config = Path(dirs.user_config_dir, "database-config.yaml") + with open(database_config, "rt") as f: db_config = yaml.safe_load(f.read()) sqlite_db = db_config["sqlite"]["file"] - log.info('using SQLite3 database {}'.format(sqlite_db)) - sqlite_conn = sqlite3.connect(sqlite_db, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) + log.info("using SQLite3 database {}".format(sqlite_db)) + sqlite_conn = sqlite3.connect( + sqlite_db, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES + ) mariadb_conn = None - postgres_conn = psycopg2.connect(f"host={db_config['postgres']['host']} port={db_config['postgres']['port']} user={db_config['postgres']['user']} password={db_config['postgres']['password']} dbname={db_config['postgres']['database']}") + postgres_conn = psycopg2.connect( + f"host={db_config['postgres']['host']} port={db_config['postgres']['port']} user={db_config['postgres']['user']} password={db_config['postgres']['password']} dbname={db_config['postgres']['database']}" + ) return sqlite_conn, mariadb_conn, postgres_conn def create_tables(sqlite_conn, logger, recreate_db, scripts): - logger.info('create_tables') + logger.info("create_tables") for table_id in scripts: - create_statement = scripts[table_id]['create'] - drop_statement = scripts[table_id]['drop'] + create_statement = scripts[table_id]["create"] + drop_statement = scripts[table_id]["drop"] logger.debug(create_statement) cursor = sqlite_conn.cursor() if recreate_db: @@ -40,11 +46,11 @@ def create_tables(sqlite_conn, logger, recreate_db, scripts): def get_logger(level, config: str): dirs = PlatformDirs(config) - logging_config = Path(dirs.user_config_dir, 'logging-config.yaml') - with open(logging_config, 'rt') as f: + logging_config = Path(dirs.user_config_dir, "logging-config.yaml") + with open(logging_config, "rt") as f: log_config = yaml.safe_load(f.read()) logging.config.dictConfig(log_config) - logger = logging.getLogger('development') + logger = logging.getLogger("development") if level is not None: match level: case 0: @@ -57,14 +63,15 @@ def get_logger(level, config: str): logger.setLevel(logging.INFO) return logger + def get_api_config(log: Logger, config: str) -> Dict[str, Any]: api_data: Dict[str, Any] = {} token: str | None = None host: str | None = None port: int = 0 dirs = PlatformDirs(config) - api_config = Path(dirs.user_config_dir, 'api.yaml') - with open(api_config, 'rt') as f: + api_config = Path(dirs.user_config_dir, "api.yaml") + with open(api_config, "rt") as f: api_data = yaml.safe_load(f.read()) if not api_data: log.fatal("API configuration is missing") @@ -75,8 +82,8 @@ def get_api_config(log: Logger, config: str) -> Dict[str, Any]: log.info("Call login first") login_url = f"http://{host}:{port}/login" login_data = {} - login_data['email'] = api_data["email"] - login_data['password'] = api_data["password"] + login_data["email"] = api_data["email"] + login_data["password"] = api_data["password"] response = requests.post(login_url, json=login_data) status = response.status_code log.info(f"Status: {status}") @@ -85,12 +92,10 @@ def get_api_config(log: Logger, config: str) -> Dict[str, Any]: return api_data data = response.json() log.debug(f"got data: {data}") - token = data['access_token'] - token_type = data['token_type'] - api_data['token'] = token - api_data['token_type'] = token_type - with open(api_config, 'w') as f: + token = data["access_token"] + token_type = data["token_type"] + api_data["token"] = token + api_data["token_type"] = token_type + with open(api_config, "w") as f: yaml.dump(api_data, f) - else: - token = api_data['token'] return api_data