Refactor into multiple files

This commit is contained in:
april 2023-12-19 13:27:07 -06:00
parent ce1d2c0918
commit 1f275ec195
5 changed files with 398 additions and 350 deletions

View File

@ -2,18 +2,22 @@ import json
import os import os
from datetime import timedelta, datetime, timezone from datetime import timedelta, datetime, timezone
import bcrypt from flask import Flask
from flask import Flask, request, jsonify
from mongoengine import connect, ValidationError, DoesNotExist from mongoengine import connect
from flask_jwt_extended import create_access_token, get_jwt, get_jwt_identity, unset_jwt_cookies, jwt_required, \ from flask_jwt_extended import create_access_token, get_jwt, get_jwt_identity, JWTManager
JWTManager
from database.models import Flight, User, AuthLevel from routes.flights import flights_api
from routes.users import users_api
from routes.utils import create_admin_user
# Initialize Flask app # Initialize Flask app
api = Flask(__name__) api = Flask(__name__)
# Register route blueprints
api.register_blueprint(users_api)
api.register_blueprint(flights_api)
# Set JWT key from environment variable # Set JWT key from environment variable
try: try:
api.config["JWT_SECRET_KEY"] = os.environ["TAILFIN_DB_KEY"] api.config["JWT_SECRET_KEY"] = os.environ["TAILFIN_DB_KEY"]
@ -31,29 +35,6 @@ jwt = JWTManager(api)
connect('tailfin') connect('tailfin')
def auth_level_required(level: AuthLevel):
"""
Limit access to given authorization level.
:param level: required authorization level to access this endpoint
:return: 403 Unauthorized upon auth failure or response of decorated function on auth success
"""
def auth_inner(func):
def auth_wrapper(*args, **kwargs):
user = User.objects.get(username=get_jwt_identity())
if AuthLevel(user.level) < level:
api.logger.warning("Attempted access to unauthorized resource by %s", user.username)
return '', 403
else:
return func(*args, **kwargs)
auth_wrapper.__name__ = func.__name__
return auth_wrapper
return auth_inner
@api.after_request @api.after_request
def refresh_expiring_jwts(response): def refresh_expiring_jwts(response):
""" """
@ -80,330 +61,12 @@ def refresh_expiring_jwts(response):
return response return response
@api.route('/users', methods=["POST"])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def add_user():
"""
Add user to database.
:return: Failure message if user already exists, otherwise ID of newly created user
"""
body = request.get_json()
try:
username = body["username"]
password = body["password"]
except KeyError:
return jsonify({"msg": "Missing username or password"})
try:
auth_level = AuthLevel(body["auth_level"])
except KeyError:
auth_level = AuthLevel.USER
try:
existing_user = User.objects.get(username=username)
api.logger.info("User %s already exists at auth level %s", existing_user.username, existing_user.level)
return jsonify({"msg": "Username already exists"})
except DoesNotExist:
api.logger.info("Creating user %s with auth level %s", username, auth_level)
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
user = User(username=username, password=hashed_password, level=auth_level.value)
user.save()
return jsonify({"id": str(user.id)}), 201
@api.route('/users/<user_id>', methods=['DELETE'])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def remove_user(user_id):
"""
Delete given user from database
:param user_id: ID of user to delete
:return: 200 if success, 401 if user does not exist
"""
try:
User.objects.get(id=user_id).delete()
except DoesNotExist:
api.logger.info("Attempt to delete nonexistent user %s by %s", user_id, get_jwt_identity())
return {"msg": "User does not exist"}, 401
Flight.objects(user=user_id).delete()
return '', 200
@api.route('/users', methods=["GET"])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def get_users():
"""
Get a list of all users
:return: List of users in the database
"""
users = User.objects.to_json()
return users, 200
@api.route('/login', methods=["POST"])
def create_token():
"""
Log in as given user and return JWT for API access
:return: 401 if username or password invalid, else JWT
"""
body = request.get_json()
try:
username = body["username"]
password = body["password"]
except KeyError:
return jsonify({"msg": "Missing username or password"})
try:
user = User.objects.get(username=username)
except DoesNotExist:
return jsonify({"msg": "Invalid username or password"}), 401
else:
if bcrypt.checkpw(password.encode('utf-8'), user.password.encode('utf-8')):
access_token = create_access_token(identity=username)
api.logger.info("%s successfully logged in", username)
response = {"access_token": access_token}
return jsonify(response), 200
api.logger.info("Failed login attempt from %s", request.remote_addr)
return jsonify({"msg": "Invalid username or password"}), 401
@api.route('/logout', methods=["POST"])
def logout():
"""
Log out given user. Note that JWTs cannot be natively revoked so this must also be handled by the frontend
:return: Message with JWT removed from headers
"""
response = jsonify({"msg": "logout successful"})
unset_jwt_cookies(response)
return response
@api.route('/profile/<user_id>', methods=["GET"])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def get_user_profile(user_id):
"""
Get profile of the given user
:param user_id: ID of the requested user
:return: 401 is user does not exist, else username and auth level
"""
try:
user = User.objects.get(id=user_id)
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "User not found"}, 401
return jsonify({"username": user.username, "auth_level:": str(user.level)}), 200
@api.route('/profile/<user_id>', methods=["PUT"])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def update_user_profile(user_id):
"""
Update the profile of the given user
:param user_id: ID of the user to update
:return: Error messages if request is invalid, else 200
"""
try:
user = User.objects.get(id=user_id)
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return jsonify({"msg": "User not found"}), 401
body = request.get_json()
return update_profile(user.id, body["username"], body["password"], body["auth_level"])
@api.route('/profile', methods=["GET"])
@jwt_required()
def get_profile():
"""
Return basic user information for the currently logged-in user
:return: 401 if user not found, else username and auth level
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return jsonify({"msg": "User not found"}), 401
return jsonify({"username": user.username, "auth_level:": str(user.level)}), 200
@api.route('/profile', methods=["PUT"])
@jwt_required()
def update_profile():
"""
Update the profile of the currently logged-in user
:return: Error messages if request is invalid, else 200
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
body = request.get_json()
return update_profile(user.id, body["username"], body["password"], body["auth_level"])
@api.route('/flights', methods=['GET'])
@jwt_required()
def get_flights():
"""
Get a list of the flights logged by the currently logged-in user
:return: List of flights
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
flights = Flight.objects(user=user.id).to_json()
return flights, 200
@api.route('/flights/all', methods=['GET'])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def get_all_flights():
"""
Get a list of all flights logged by any user
:return: List of flights
"""
flights = Flight.objects.to_json()
return flights, 200
@api.route('/flights/<flight_id>', methods=['GET'])
@jwt_required()
def get_flight(flight_id):
"""
Get all details of a given flight
:param flight_id: ID of requested flight
:return: Flight details
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
flight = Flight.objects(id=flight_id).to_json()
if flight.user != user.id and AuthLevel(user.level) != AuthLevel.ADMIN:
api.logger.warning("Attempted access to unauthorized flight by %s", user.username)
return {"msg": "Unauthorized access"}, 403
return flight, 200
@api.route('/flights', methods=['POST'])
@jwt_required()
def add_flight():
"""
Add a flight logbook entry
:return: Error message if request invalid, else ID of newly created log
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
body = request.get_json()
try:
flight = Flight(user=user, **body).save()
except ValidationError:
return jsonify({"msg": "Invalid request"})
id = flight.id
return jsonify({'id': str(id)}), 201
@api.route('/flights/<flight_id>', methods=['PUT'])
@jwt_required()
def update_flight(flight_id):
"""
Update the given flight with new information
:param flight_id: ID of flight to update
:return: Error messages if user not found or access unauthorized, else 200
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
flight = Flight.objects(id=flight_id)
if flight.user != user and AuthLevel(user.level) != AuthLevel.ADMIN:
api.logger.warning("Attempted access to unauthorized flight by %s", user.username)
return {"msg": "Unauthorized access"}, 403
body = request.get_json()
flight.update(**body)
return '', 200
@api.route('/flights/<flight_id>', methods=['DELETE'])
def delete_flight(flight_id):
"""
Delete the given flight
:param flight_id: ID of flight to delete
:return: Error messages if user not found or access unauthorized, else 200
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
api.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
flight = Flight.objects(id=flight_id)
if flight.user != user and AuthLevel(user.level) != AuthLevel.ADMIN:
api.logger.warning("Attempted access to unauthorized flight by %s", user.username)
return {"msg": "Unauthorized access"}, 403
flight.delete()
return '', 200
if __name__ == '__main__': if __name__ == '__main__':
# Create default admin user if no admin users found # Create default admin user if it doesn't exist
if User.objects(level=AuthLevel.ADMIN.value).count() == 0: create_admin_user()
api.logger.info("No admin users exist. Creating default admin user...")
try:
admin_username = os.environ["TAILFIN_ADMIN_USERNAME"]
api.logger.info("Setting admin username to 'TAILFIN_ADMIN_USERNAME': %s", admin_username)
except KeyError:
admin_username = "admin"
api.logger.info("'TAILFIN_ADMIN_USERNAME' not set, using default username 'admin'")
try:
admin_password = os.environ["TAILFIN_ADMIN_PASSWORD"]
api.logger.info("Setting admin password to 'TAILFIN_ADMIN_PASSWORD'")
except KeyError:
admin_password = "admin"
api.logger.warning("'TAILFIN_ADMIN_PASSWORD' not set, using default password 'admin'\n"
"Change this as soon as possible")
hashed_password = bcrypt.hashpw(admin_password.encode('utf-8'), bcrypt.gensalt())
User(username=admin_username, password=hashed_password, level=AuthLevel.ADMIN.value).save()
api.logger.info("Default admin user created with username %s", User.objects.get(level=AuthLevel.ADMIN).username)
# Start the app # Start the app
api.run() api.run()

View File

@ -1,5 +1,5 @@
import bcrypt import bcrypt
from flask import jsonify from flask import jsonify, current_app
from mongoengine import DoesNotExist from mongoengine import DoesNotExist
from database.models import User, AuthLevel from database.models import User, AuthLevel
@ -28,6 +28,7 @@ def update_profile(user_id, username=None, password=None, auth_level=None):
hashed_password = bcrypt.hashpw(password.encode('UTF-8'), bcrypt.gensalt()) hashed_password = bcrypt.hashpw(password.encode('UTF-8'), bcrypt.gensalt())
if auth_level: if auth_level:
if AuthLevel(user.level) < AuthLevel.ADMIN: if AuthLevel(user.level) < AuthLevel.ADMIN:
current_app.logger.warning("Unauthorized attempt by %s to change auth level", user.username)
return jsonify({"msg": "Unauthorized attempt to change auth level"}), 403 return jsonify({"msg": "Unauthorized attempt to change auth level"}), 403
if username: if username:

136
api/routes/flights.py Normal file
View File

@ -0,0 +1,136 @@
from flask import Blueprint, current_app, request, jsonify
from mongoengine import DoesNotExist, ValidationError
from flask_jwt_extended import get_jwt_identity, jwt_required
from database.models import User, Flight, AuthLevel
from routes.utils import auth_level_required
flights_api = Blueprint('flights_api', __name__)
@flights_api.route('/flights', methods=['GET'])
@jwt_required()
def get_flights():
"""
Get a list of the flights logged by the currently logged-in user
:return: List of flights
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
current_app.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
flights = Flight.objects(user=user.id).to_json()
return flights, 200
@flights_api.route('/flights/all', methods=['GET'])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def get_all_flights():
"""
Get a list of all flights logged by any user
:return: List of flights
"""
flights = Flight.objects.to_json()
return flights, 200
@flights_api.route('/flights/<flight_id>', methods=['GET'])
@jwt_required()
def get_flight(flight_id):
"""
Get all details of a given flight
:param flight_id: ID of requested flight
:return: Flight details
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
current_app.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
flight = Flight.objects(id=flight_id).to_json()
if flight.user != user.id and AuthLevel(user.level) != AuthLevel.ADMIN:
current_app.logger.warning("Attempted access to unauthorized flight by %s", user.username)
return {"msg": "Unauthorized access"}, 403
return flight, 200
@flights_api.route('/flights', methods=['POST'])
@jwt_required()
def add_flight():
"""
Add a flight logbook entry
:return: Error message if request invalid, else ID of newly created log
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
current_app.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
body = request.get_json()
try:
flight = Flight(user=user, **body).save()
except ValidationError:
return jsonify({"msg": "Invalid request"})
id = flight.id
return jsonify({'id': str(id)}), 201
@flights_api.route('/flights/<flight_id>', methods=['PUT'])
@jwt_required()
def update_flight(flight_id):
"""
Update the given flight with new information
:param flight_id: ID of flight to update
:return: Error messages if user not found or access unauthorized, else 200
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
current_app.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
flight = Flight.objects(id=flight_id)
if flight.user != user and AuthLevel(user.level) != AuthLevel.ADMIN:
current_app.logger.warning("Attempted access to unauthorized flight by %s", user.username)
return {"msg": "Unauthorized access"}, 403
body = request.get_json()
flight.update(**body)
return '', 200
@flights_api.route('/flights/<flight_id>', methods=['DELETE'])
def delete_flight(flight_id):
"""
Delete the given flight
:param flight_id: ID of flight to delete
:return: Error messages if user not found or access unauthorized, else 200
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
current_app.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
flight = Flight.objects(id=flight_id)
if flight.user != user and AuthLevel(user.level) != AuthLevel.ADMIN:
current_app.logger.warning("Attempted access to unauthorized flight by %s", user.username)
return {"msg": "Unauthorized access"}, 403
flight.delete()
return '', 200

191
api/routes/users.py Normal file
View File

@ -0,0 +1,191 @@
import bcrypt
from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import create_access_token, get_jwt, get_jwt_identity, unset_jwt_cookies, jwt_required, \
JWTManager
from mongoengine import DoesNotExist, ValidationError
from database.models import AuthLevel, User, Flight
from routes.utils import auth_level_required
users_api = Blueprint('users_api', __name__)
@users_api.route('/users', methods=["POST"])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def add_user():
"""
Add user to database.
:return: Failure message if user already exists, otherwise ID of newly created user
"""
body = request.get_json()
try:
username = body["username"]
password = body["password"]
except KeyError:
return jsonify({"msg": "Missing username or password"})
try:
auth_level = AuthLevel(body["auth_level"])
except KeyError:
auth_level = AuthLevel.USER
try:
existing_user = User.objects.get(username=username)
current_app.logger.info("User %s already exists at auth level %s", existing_user.username, existing_user.level)
return jsonify({"msg": "Username already exists"})
except DoesNotExist:
current_app.logger.info("Creating user %s with auth level %s", username, auth_level)
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
user = User(username=username, password=hashed_password, level=auth_level.value)
try:
user.save()
except ValidationError:
return jsonify({"msg": "Invalid request"})
return jsonify({"id": str(user.id)}), 201
@users_api.route('/users/<user_id>', methods=['DELETE'])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def remove_user(user_id):
"""
Delete given user from database
:param user_id: ID of user to delete
:return: 200 if success, 401 if user does not exist
"""
try:
User.objects.get(id=user_id).delete()
except DoesNotExist:
current_app.logger.info("Attempt to delete nonexistent user %s by %s", user_id, get_jwt_identity())
return {"msg": "User does not exist"}, 401
Flight.objects(user=user_id).delete()
return '', 200
@users_api.route('/users', methods=["GET"])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def get_users():
"""
Get a list of all users
:return: List of users in the database
"""
users = User.objects.to_json()
return users, 200
@users_api.route('/login', methods=["POST"])
def create_token():
"""
Log in as given user and return JWT for API access
:return: 401 if username or password invalid, else JWT
"""
body = request.get_json()
try:
username = body["username"]
password = body["password"]
except KeyError:
return jsonify({"msg": "Missing username or password"})
try:
user = User.objects.get(username=username)
except DoesNotExist:
return jsonify({"msg": "Invalid username or password"}), 401
else:
if bcrypt.checkpw(password.encode('utf-8'), user.password.encode('utf-8')):
access_token = create_access_token(identity=username)
current_app.logger.info("%s successfully logged in", username)
response = {"access_token": access_token}
return jsonify(response), 200
current_app.logger.info("Failed login attempt from %s", request.remote_addr)
return jsonify({"msg": "Invalid username or password"}), 401
@users_api.route('/logout', methods=["POST"])
def logout():
"""
Log out given user. Note that JWTs cannot be natively revoked so this must also be handled by the frontend
:return: Message with JWT removed from headers
"""
response = jsonify({"msg": "logout successful"})
unset_jwt_cookies(response)
return response
@users_api.route('/profile/<user_id>', methods=["GET"])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def get_user_profile(user_id):
"""
Get profile of the given user
:param user_id: ID of the requested user
:return: 401 is user does not exist, else username and auth level
"""
try:
user = User.objects.get(id=user_id)
except DoesNotExist:
current_app.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "User not found"}, 401
return jsonify({"username": user.username, "auth_level:": str(user.level)}), 200
@users_api.route('/profile/<user_id>', methods=["PUT"])
@jwt_required()
@auth_level_required(AuthLevel.ADMIN)
def update_user_profile(user_id):
"""
Update the profile of the given user
:param user_id: ID of the user to update
:return: Error messages if request is invalid, else 200
"""
try:
user = User.objects.get(id=user_id)
except DoesNotExist:
current_app.logger.warning("User %s not found", get_jwt_identity())
return jsonify({"msg": "User not found"}), 401
body = request.get_json()
return update_profile(user.id, body["username"], body["password"], body["auth_level"])
@users_api.route('/profile', methods=["GET"])
@jwt_required()
def get_profile():
"""
Return basic user information for the currently logged-in user
:return: 401 if user not found, else username and auth level
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
current_app.logger.warning("User %s not found", get_jwt_identity())
return jsonify({"msg": "User not found"}), 401
return jsonify({"username": user.username, "auth_level:": str(user.level)}), 200
@users_api.route('/profile', methods=["PUT"])
@jwt_required()
def update_profile():
"""
Update the profile of the currently logged-in user
:return: Error messages if request is invalid, else 200
"""
try:
user = User.objects.get(username=get_jwt_identity())
except DoesNotExist:
current_app.logger.warning("User %s not found", get_jwt_identity())
return {"msg": "user not found"}, 401
body = request.get_json()
return update_profile(user.id, body["username"], body["password"], body["auth_level"])

57
api/routes/utils.py Normal file
View File

@ -0,0 +1,57 @@
import os
import bcrypt
from flask import current_app
from flask_jwt_extended import get_jwt_identity
from database.models import AuthLevel, User
def auth_level_required(level: AuthLevel):
"""
Limit access to given authorization level.
:param level: Required authorization level to access this endpoint
:return: 403 Unauthorized upon auth failure or response of decorated function on auth success
"""
def auth_inner(func):
def auth_wrapper(*args, **kwargs):
user = User.objects.get(username=get_jwt_identity())
if AuthLevel(user.level) < level:
current_app.logger.warning("Attempted access to unauthorized resource by %s", user.username)
return '', 403
else:
return func(*args, **kwargs)
auth_wrapper.__name__ = func.__name__
return auth_wrapper
return auth_inner
def create_admin_user():
"""
Create default admin user if no admin users are present in the database
:return: None
"""
if User.objects(level=AuthLevel.ADMIN.value).count() == 0:
current_app.logger.info("No admin users exist. Creating default admin user...")
try:
admin_username = os.environ["TAILFIN_ADMIN_USERNAME"]
current_app.logger.info("Setting admin username to 'TAILFIN_ADMIN_USERNAME': %s", admin_username)
except KeyError:
admin_username = "admin"
current_app.logger.info("'TAILFIN_ADMIN_USERNAME' not set, using default username 'admin'")
try:
admin_password = os.environ["TAILFIN_ADMIN_PASSWORD"]
current_app.logger.info("Setting admin password to 'TAILFIN_ADMIN_PASSWORD'")
except KeyError:
admin_password = "admin"
current_app.logger.warning("'TAILFIN_ADMIN_PASSWORD' not set, using default password 'admin'\n"
"Change this as soon as possible")
hashed_password = bcrypt.hashpw(admin_password.encode('utf-8'), bcrypt.gensalt())
User(username=admin_username, password=hashed_password, level=AuthLevel.ADMIN.value).save()
current_app.logger.info("Default admin user created with username %s", User.objects.get(level=AuthLevel.ADMIN).username)