add login functions for cookie and non-cookie authentication
This commit is contained in:
@@ -1,17 +1,86 @@
|
||||
from datetime import datetime
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from datetime import timedelta
|
||||
from typing import Optional, Annotated, List
|
||||
from typing import Dict
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import HTTPException, Security
|
||||
from fastapi import Request
|
||||
from fastapi import status
|
||||
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
|
||||
from fastapi.security.utils import get_authorization_scheme_param
|
||||
|
||||
import bcrypt
|
||||
from fastapi import Depends
|
||||
from fastapi.security import SecurityScopes, OAuth2PasswordBearer, OAuth2
|
||||
from pydantic import ValidationError
|
||||
|
||||
from src.core.config import settings
|
||||
from jose import jwt
|
||||
from jose import jwt, JWTError
|
||||
|
||||
from src.core.log_conf import logger
|
||||
from src.db.models.admin import Profile
|
||||
from src.db.repository.admin import get_profile
|
||||
from src.db.session import SessionLocal
|
||||
from src.schema.admin import TokenData, ProfileModel
|
||||
|
||||
oauth2_scheme = OAuth2PasswordBearer(
|
||||
tokenUrl="/api/login/token",
|
||||
scopes={"me": "read", "admin": "read"},
|
||||
)
|
||||
|
||||
|
||||
class OAuth2PasswordBearerWithCookie(OAuth2):
|
||||
def __init__(
|
||||
self,
|
||||
tokenUrl: str,
|
||||
scheme_name: Optional[str] = None,
|
||||
scopes: Optional[Dict[str, str]] = None,
|
||||
auto_error: bool = True,
|
||||
):
|
||||
if not scopes:
|
||||
scopes = {}
|
||||
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
|
||||
super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
|
||||
|
||||
async def __call__(self, request: Request) -> Optional[str]:
|
||||
authorization: str = request.cookies.get(
|
||||
"access_token"
|
||||
) # changed to accept access token from httpOnly Cookie
|
||||
|
||||
scheme, param = get_authorization_scheme_param(authorization)
|
||||
if not authorization or scheme.lower() != "bearer":
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Not authenticated",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
else:
|
||||
return None
|
||||
return param
|
||||
|
||||
|
||||
def authenticate_user(username: str, password: str) -> Optional[Profile]:
|
||||
with SessionLocal() as db:
|
||||
user = get_profile(username=username, db=db)
|
||||
print(user)
|
||||
if not user:
|
||||
return None
|
||||
if bcrypt.checkpw(password.encode(), user.password.encode()):
|
||||
print("User successful authenticated")
|
||||
else:
|
||||
logger.info("Authentication failed!")
|
||||
return user
|
||||
|
||||
|
||||
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
|
||||
to_encode = data.copy()
|
||||
if expires_delta:
|
||||
expire = datetime.utcnow() + expires_delta
|
||||
expire = datetime.now(timezone.utc) + expires_delta
|
||||
else:
|
||||
expire = datetime.utcnow() + timedelta(
|
||||
expire = datetime.now(timezone.utc) + timedelta(
|
||||
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
)
|
||||
to_encode.update({"exp": expire})
|
||||
@@ -19,3 +88,72 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
|
||||
to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
|
||||
)
|
||||
return encoded_jwt
|
||||
|
||||
|
||||
async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]):
|
||||
if security_scopes.scopes:
|
||||
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
|
||||
else:
|
||||
authenticate_value = "Bearer"
|
||||
credentials_exception = HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Could not validate credentials",
|
||||
headers={"WWW-Authenticate": authenticate_value},
|
||||
)
|
||||
try:
|
||||
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
|
||||
username: str = payload.get("sub")
|
||||
print("username/email extracted is ", username)
|
||||
if username is None:
|
||||
raise credentials_exception
|
||||
scope: str = payload.get("scope", "")
|
||||
token_scopes: List[str] = scope.split(" ")
|
||||
token_data = TokenData(scopes=token_scopes, username=username)
|
||||
except (JWTError, ValidationError):
|
||||
raise credentials_exception
|
||||
with SessionLocal() as db:
|
||||
user = get_profile(username=token_data.username, db=db)
|
||||
if user is None:
|
||||
raise credentials_exception
|
||||
for scope in security_scopes.scopes:
|
||||
if scope not in token_scopes:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="not enough permissions",
|
||||
headers={"WWW-Authenticate": authenticate_value},
|
||||
)
|
||||
return user
|
||||
|
||||
|
||||
async def get_current_active_user(
|
||||
current_user: Annotated[Profile, Security(get_current_user, scopes=["me"])],
|
||||
) -> ProfileModel:
|
||||
if not current_user.enabled:
|
||||
raise HTTPException(status_code=400, detail="Inactive user")
|
||||
user_model = ProfileModel(username=current_user.user_name, email=current_user.email,
|
||||
first_name=current_user.first_name, last_name=current_user.last_name,
|
||||
active=current_user.enabled)
|
||||
return user_model
|
||||
|
||||
|
||||
|
||||
def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
|
||||
credentials_exception = HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Could not validate credentials",
|
||||
)
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
|
||||
)
|
||||
username: str = payload.get("sub")
|
||||
print("username/email extracted is ", username)
|
||||
if username is None:
|
||||
raise credentials_exception
|
||||
except JWTError:
|
||||
raise credentials_exception
|
||||
with SessionLocal() as db:
|
||||
user = get_profile(username=username, db=db)
|
||||
if user is None:
|
||||
raise credentials_exception
|
||||
return user
|
||||
|
||||
Reference in New Issue
Block a user