65 lines
2.2 KiB
Python
65 lines
2.2 KiB
Python
import logging
|
|
from typing import Annotated
|
|
|
|
from fastapi import Depends, APIRouter, HTTPException
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
|
|
from app.config import Settings, get_settings
|
|
from app.deps import get_current_user_token
|
|
from database import tokens, users
|
|
from schemas.user import TokenSchema, UserDisplaySchema
|
|
from routes.utils import verify_password, create_access_token, create_refresh_token
|
|
|
|
router = APIRouter()
|
|
|
|
logger = logging.getLogger("api")
|
|
|
|
|
|
@router.post('/login', summary="Create access and refresh tokens for user", status_code=200, response_model=TokenSchema)
|
|
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
|
|
settings: Annotated[Settings, Depends(get_settings)]) -> TokenSchema:
|
|
"""
|
|
Log in as given user - create associated JWT for API access
|
|
|
|
:return: JWT for given user
|
|
"""
|
|
# Get requested user
|
|
user = await users.get_user_system_info(username=form_data.username)
|
|
if user is None:
|
|
raise HTTPException(401, "Invalid username or password")
|
|
|
|
# Verify given password
|
|
hashed_pass = user.password
|
|
if not verify_password(form_data.password, hashed_pass):
|
|
raise HTTPException(401, "Invalid username or password")
|
|
|
|
# Create access and refresh tokens
|
|
return TokenSchema(
|
|
access_token=create_access_token(settings, str(user.id)),
|
|
refresh_token=create_refresh_token(settings, str(user.id))
|
|
)
|
|
|
|
|
|
@router.post('/logout', summary="Invalidate current user's token", status_code=200)
|
|
async def logout(user_token: (UserDisplaySchema, TokenSchema) = Depends(get_current_user_token)) -> dict:
|
|
"""
|
|
Log out given user by adding JWT to a blacklist database
|
|
|
|
:return: Logout message
|
|
"""
|
|
user, token = user_token
|
|
|
|
# Blacklist token
|
|
blacklisted = tokens.blacklist_token(token)
|
|
|
|
if not blacklisted:
|
|
logger.debug("Failed to add token to blacklist")
|
|
return {"msg": "Logout failed"}
|
|
|
|
return {"msg": "Logout successful"}
|
|
|
|
# @router.post('/refresh', summary="Refresh JWT token", status_code=200)
|
|
# async def refresh(form: OAuth2RefreshRequestForm = Depends()):
|
|
# if request.method == 'POST':
|
|
# form = await request.json()
|