Migrate to motor for DB interaction
This commit is contained in:
@@ -4,21 +4,21 @@ from typing import Annotated
|
||||
from fastapi import Depends, HTTPException
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from jose import jwt
|
||||
from mongoengine import DoesNotExist
|
||||
from pydantic import ValidationError
|
||||
|
||||
from app.config import get_settings, Settings
|
||||
from database.models import User, TokenBlacklist
|
||||
from schemas import GetSystemUserSchema, TokenPayload, AuthLevel
|
||||
from database.tokens import is_blacklisted
|
||||
from database.users import get_user_system_info, get_user_system_info_id
|
||||
from schemas.user import TokenPayload, AuthLevel, UserDisplaySchema
|
||||
|
||||
reusable_oath = OAuth2PasswordBearer(
|
||||
tokenUrl="/login",
|
||||
tokenUrl="/auth/login",
|
||||
scheme_name="JWT"
|
||||
)
|
||||
|
||||
|
||||
async def get_current_user(settings: Annotated[Settings, Depends(get_settings)],
|
||||
token: str = Depends(reusable_oath)) -> GetSystemUserSchema:
|
||||
token: str = Depends(reusable_oath)) -> UserDisplaySchema:
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]
|
||||
@@ -30,20 +30,19 @@ async def get_current_user(settings: Annotated[Settings, Depends(get_settings)],
|
||||
except (jwt.JWTError, ValidationError):
|
||||
raise HTTPException(403, "Could not validate credentials", {"WWW-Authenticate": "Bearer"})
|
||||
|
||||
try:
|
||||
TokenBlacklist.objects.get(token=token)
|
||||
blacklisted = await is_blacklisted(token)
|
||||
if blacklisted:
|
||||
raise HTTPException(403, "Token expired", {"WWW-Authenticate": "Bearer"})
|
||||
except DoesNotExist:
|
||||
try:
|
||||
user = User.objects.get(id=token_data.sub)
|
||||
except DoesNotExist:
|
||||
raise HTTPException(404, "Could not find user")
|
||||
|
||||
return GetSystemUserSchema(id=str(user.id), username=user.username, level=user.level, password=user.password)
|
||||
user = await get_user_system_info_id(id=token_data.sub)
|
||||
if user is None:
|
||||
raise HTTPException(404, "Could not find user")
|
||||
|
||||
return user
|
||||
|
||||
|
||||
async def get_current_user_token(settings: Annotated[Settings, Depends(get_settings)],
|
||||
token: str = Depends(reusable_oath)) -> (GetSystemUserSchema, str):
|
||||
token: str = Depends(reusable_oath)) -> (UserDisplaySchema, str):
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]
|
||||
@@ -55,19 +54,17 @@ async def get_current_user_token(settings: Annotated[Settings, Depends(get_setti
|
||||
except (jwt.JWTError, ValidationError):
|
||||
raise HTTPException(403, "Could not validate credentials", {"WWW-Authenticate": "Bearer"})
|
||||
|
||||
try:
|
||||
TokenBlacklist.objects.get(token=token)
|
||||
blacklisted = await is_blacklisted(token)
|
||||
if blacklisted:
|
||||
raise HTTPException(403, "Token expired", {"WWW-Authenticate": "Bearer"})
|
||||
except DoesNotExist:
|
||||
try:
|
||||
user = User.objects.get(id=token_data.sub)
|
||||
except DoesNotExist:
|
||||
raise HTTPException(404, "Could not find user")
|
||||
|
||||
return GetSystemUserSchema(id=str(user.id), username=user.username, level=user.level,
|
||||
password=user.password), token
|
||||
user = await get_user_system_info(id=token_data.sub)
|
||||
if user is None:
|
||||
raise HTTPException(404, "Could not find user")
|
||||
|
||||
return user
|
||||
|
||||
|
||||
async def admin_required(user: Annotated[GetSystemUserSchema, Depends(get_current_user)]):
|
||||
async def admin_required(user: Annotated[UserDisplaySchema, Depends(get_current_user)]):
|
||||
if user.level < AuthLevel.ADMIN:
|
||||
raise HTTPException(403, "Access unauthorized")
|
||||
|
Reference in New Issue
Block a user