Fix request JSON access issues

This commit is contained in:
april 2023-12-19 13:07:55 -06:00
parent 43e90f8b07
commit ce1d2c0918

View File

@ -1,14 +1,15 @@
import functools
import json, os, sys
import json
import os
from datetime import timedelta, datetime, timezone
import bcrypt
from flask import Flask, request, Response, jsonify, session
from pymongo import database
from flask import Flask, request, jsonify
from mongoengine import connect, ValidationError, DoesNotExist
from flask_jwt_extended import create_access_token, get_jwt, get_jwt_identity, unset_jwt_cookies, jwt_required, \
JWTManager
from database.models import Flight, User, AuthLevel
from mongoengine import connect, ValidationError, DoesNotExist
from flask_jwt_extended import create_access_token, get_jwt , get_jwt_identity, unset_jwt_cookies, jwt_required, JWTManager
# Initialize Flask app
api = Flask(__name__)
@ -37,6 +38,7 @@ def auth_level_required(level: AuthLevel):
:param level: required authorization level to access this endpoint
:return: 403 Unauthorized upon auth failure or response of decorated function on auth success
"""
def auth_inner(func):
def auth_wrapper(*args, **kwargs):
user = User.objects.get(username=get_jwt_identity())
@ -45,8 +47,10 @@ def auth_level_required(level: AuthLevel):
return '', 403
else:
return func(*args, **kwargs)
auth_wrapper.__name__ = func.__name__
return auth_wrapper
return auth_inner
@ -86,9 +90,15 @@ def add_user():
:return: Failure message if user already exists, otherwise ID of newly created user
"""
body = request.get_json()
username = body.username
password = body.password
auth_level = AuthLevel(body.auth_level)
try:
username = body["username"]
password = body["password"]
except KeyError:
return jsonify({"msg": "Missing username or password"})
try:
auth_level = AuthLevel(body["auth_level"])
except KeyError:
auth_level = AuthLevel.USER
try:
existing_user = User.objects.get(username=username)
@ -101,7 +111,7 @@ def add_user():
user = User(username=username, password=hashed_password, level=auth_level.value)
user.save()
return jsonify({"id": user.id}), 201
return jsonify({"id": str(user.id)}), 201
@api.route('/users/<user_id>', methods=['DELETE'])
@ -143,8 +153,12 @@ def create_token():
:return: 401 if username or password invalid, else JWT
"""
username = request.json.get("username", None)
password = request.json.get("password", None)
body = request.get_json()
try:
username = body["username"]
password = body["password"]
except KeyError:
return jsonify({"msg": "Missing username or password"})
try:
user = User.objects.get(username=username)
@ -206,7 +220,7 @@ def update_user_profile(user_id):
return jsonify({"msg": "User not found"}), 401
body = request.get_json()
return update_profile(user.id, body.username, body.password, body.auth_level)
return update_profile(user.id, body["username"], body["password"], body["auth_level"])
@api.route('/profile', methods=["GET"])
@ -240,7 +254,7 @@ def update_profile():
return {"msg": "user not found"}, 401
body = request.get_json()
return update_profile(user.id, body.username, body.password, body.auth_level)
return update_profile(user.id, body["username"], body["password"], body["auth_level"])
@api.route('/flights', methods=['GET'])
@ -251,8 +265,12 @@ def get_flights():
:return: List of flights
"""
user = User.objects.get(username=get_jwt_identity()).id
flights = Flight.objects(user=user).to_json()
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
flights = Flight.objects(user=user.id).to_json()
return flights, 200
@ -278,9 +296,14 @@ def get_flight(flight_id):
:param flight_id: ID of requested flight
:return: Flight details
"""
user = User.objects.get(username=get_jwt_identity()).id
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
flight = Flight.objects(id=flight_id).to_json()
if flight.user != user and AuthLevel(user.level) != AuthLevel.ADMIN:
if flight.user != user.id and AuthLevel(user.level) != AuthLevel.ADMIN:
api.logger.warning("Attempted access to unauthorized flight by %s", user.username)
return {"msg": "Unauthorized access"}, 403
return flight, 200
@ -294,7 +317,11 @@ def add_flight():
:return: Error message if request invalid, else ID of newly created log
"""
user = User.objects(username=get_jwt_identity())
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
body = request.get_json()
try:
@ -375,7 +402,7 @@ if __name__ == '__main__':
api.logger.warning("'TAILFIN_ADMIN_PASSWORD' not set, using default password 'admin'\n"
"Change this as soon as possible")
hashed_password = bcrypt.hashpw(admin_password.encode('utf-8'), bcrypt.gensalt())
User(username=admin_username, password=hashed_password, level=AuthLevel.ADMIN).save()
User(username=admin_username, password=hashed_password, level=AuthLevel.ADMIN.value).save()
api.logger.info("Default admin user created with username %s", User.objects.get(level=AuthLevel.ADMIN).username)
# Start the app