JSON Web Tokens (JWT) serve as a secure means of transmitting information in JSON format, providing a way to establish trust between different parties in a web application.
A JWT consists of three sections (header, payload, and signature), each separated by a dot.
For example, the header:
{
  "alg": "HS256",
  "typ": "JWT"
}
For example, the payload:
{
  "sub": "1234567890",
  "name": "John Doe",
  "admin": true
}
# Using the HMAC SHA256 algorithm, the signature is created as follows:
HMACSHA256(
  base64UrlEncode(header) + "." +
  base64UrlEncode(payload),
  secret)
The secret is used for signing JWT tokens.
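As a rough illustration of the formula above, the sketch below builds the three dot-separated sections using only Python's standard library. The function names and the secret "demo-secret" are made up for this example; in the rest of this guide PyJWT does this work.

```python
import base64
import hashlib
import hmac
import json


def base64url_encode(raw: bytes) -> str:
    """Base64url-encode bytes and strip the '=' padding, as JWTs do."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def sign_hs256(header: dict, payload: dict, secret: str) -> str:
    """Return 'header.payload.signature' signed with HMAC SHA256."""
    segments = [
        base64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    ]
    signing_input = ".".join(segments)
    digest = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return signing_input + "." + base64url_encode(digest)


token = sign_hs256(
    {"alg": "HS256", "typ": "JWT"},
    {"sub": "1234567890", "name": "John Doe", "admin": True},
    "demo-secret",  # hypothetical secret, for illustration only
)
print(token)
```

Anyone can decode the first two sections, so the payload should not carry confidential data; only the signature depends on the secret.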
Principles of REST
A JSON Web Token (JWT) is often used in the context of RESTful APIs to secure and authenticate communication between clients and servers.
PyJWT will be used to encode and decode JWT tokens. The required packages are:
Flask_SqlAlchemy
Flask_Migrate
Flask_Restful
Flask_Cors
PyJWT
Modify app.config['SECRET_KEY'] so that the secret key for encoding and decoding the JWT is read from the SECRET_KEY environment variable. If the variable is not set, the key defaults to the string "SECRET_KEY".
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
import os
"""
These objects can be used throughout project.
"""
# ... (lines omitted in the original diff; the Flask `app` object is assumed to be created there) ...
dbURI = 'sqlite:///volumes/sqlite.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_DATABASE_URI'] = dbURI
SECRET_KEY = os.environ.get('SECRET_KEY') or 'SECRET_KEY'
app.config['SECRET_KEY'] = SECRET_KEY
db = SQLAlchemy()
Migrate(app, db)
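The fallback string "SECRET_KEY" is easy to guess, so one option is to generate a random key when the environment variable is absent. A minimal sketch, assuming a hypothetical helper named load_secret_key that is not part of the project:

```python
import os
import secrets


def load_secret_key(environ=os.environ):
    """Return SECRET_KEY from the environment, or a random key if it is unset."""
    key = environ.get("SECRET_KEY")
    if key:
        return key
    # 32 random bytes rendered as 64 hex characters
    return secrets.token_hex(32)


print(len(load_secret_key({})))
```

The trade-off is that a randomly generated key changes on every restart, which invalidates tokens issued earlier, so a deployed server would still normally set the environment variable.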
Passwords must not be stored in plaintext, so they are hashed when the initial users are inserted into the database in the migration script. The set_password methods of the user models use the same hash method.
from alembic import op
import sqlalchemy as sa
from datetime import date
from werkzeug.security import generate_password_hash
# revision identifiers, used by Alembic.
revision = '5ac11951f352'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    players_table = op.create_table('players',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('_name', sa.String(length=255), nullable=False),
        sa.Column('_uid', sa.String(length=255), nullable=False),
        sa.Column('_password', sa.String(length=255), nullable=False),
        sa.Column('_tokens', sa.Integer(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('_uid')
    )
    users_table = op.create_table('users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('_name', sa.String(length=255), nullable=False),
        sa.Column('_uid', sa.String(length=255), nullable=False),
        sa.Column('_password', sa.String(length=255), nullable=False),
        sa.Column('_dob', sa.Date(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('_uid')
    )
    posts_table = op.create_table('posts',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('note', sa.Text(), nullable=False),
        sa.Column('image', sa.String(), nullable=True),
        sa.Column('userID', sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(['userID'], ['users.id'], ),
        sa.PrimaryKeyConstraint('id')
    )
    # Seed data: every password is stored as a salted PBKDF2 hash, never as plaintext
    op.bulk_insert(players_table,
        [
            {'id': 1,
             '_name': "Azeem Khan",
             '_uid': "azeemK",
             '_password': generate_password_hash("123qwerty", "pbkdf2:sha256", salt_length=10),
             "_tokens": 45},
            {'id': 2,
             '_name': "Ahad Biabani",
             '_uid': "ahadB",
             '_password': generate_password_hash("123qwerty", "pbkdf2:sha256", salt_length=10),
             "_tokens": 41},
            {'id': 3,
             '_name': "Akshat Parikh",
             '_uid': "akshatP",
             '_password': generate_password_hash("123qwerty", "pbkdf2:sha256", salt_length=10),
             "_tokens": 40},
            {'id': 4,
             '_name': "Josh Williams",
             '_uid': "joshW",
             '_password': generate_password_hash("123qwerty", "pbkdf2:sha256", salt_length=10),
             "_tokens": 38},
        ]
    )
    op.bulk_insert(users_table,
        [
            {'id': 1,
             '_name': "Thomas Edison",
             '_uid': "toby",
             '_password': generate_password_hash("123toby", "pbkdf2:sha256", salt_length=10),
             "_dob": date(1847, 2, 11)},
            {'id': 2,
             '_name': "Nicholas Tesla",
             '_uid': "niko",
             '_password': generate_password_hash("123niko", "pbkdf2:sha256", salt_length=10),
             "_dob": date(1856, 7, 10)},
            {'id': 3,
             '_name': "Alexander Graham Bell",
             '_uid': "lex",
             '_password': generate_password_hash("123qwerty", "pbkdf2:sha256", salt_length=10),
             "_dob": date.today()},
            {'id': 4,
             '_name': "Grace Hopper",
             '_uid': "hop",
             '_password': generate_password_hash("123hop", "pbkdf2:sha256", salt_length=10),
             "_dob": date(1906, 12, 9)},
        ]
    )
    op.bulk_insert(posts_table,
        [
            {'id': 1,
             'note': "#### Thomas Edison Test Note ####",
             'image': "ncs_logo.png",
             'userID': 1
            },
            {'id': 2,
             'note': "#### Nicholas Tesla Test Note ####",
             'image': "ncs_logo.png",
             'userID': 2
            },
            {'id': 3,
             'note': "#### Alexander Graham Bell Test Note ####",
             'image': "ncs_logo.png",
             'userID': 3
            },
            {'id': 4,
             'note': "#### Grace Hopper Test Note ####",
             'image': "ncs_logo.png",
             'userID': 4},
        ]
    )
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('posts')
    op.drop_table('users')
    op.drop_table('players')
    # ### end Alembic commands ###
# update password, this is conventional setter
def set_password(self, password):
    """Create a hashed password."""
    self._password = generate_password_hash(password, "pbkdf2:sha256", salt_length=10)

# check password parameter versus stored (hashed) password
# requires: from werkzeug.security import check_password_hash
def is_password(self, password):
    """Return True only if the password matches the stored hash."""
    return check_password_hash(self._password, password)
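To show roughly what salted password hashing and checking involve, here is a standard-library sketch built on PBKDF2 with SHA256. The function names, the iteration count, and the "salt$hash" storage format are assumptions made for this example; werkzeug uses its own storage format, so these hashes are not interchangeable with the ones in the migration.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumed work factor for this example


def hash_password(password, salt=None):
    """Return 'salt_hex$hash_hex' for the given password."""
    if salt is None:
        salt = os.urandom(16)  # fresh random salt per password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return salt.hex() + "$" + digest.hex()


def verify_password(stored, candidate):
    """Re-hash the candidate with the stored salt and compare in constant time."""
    salt_hex, _ = stored.split("$", 1)
    recomputed = hash_password(candidate, bytes.fromhex(salt_hex))
    return hmac.compare_digest(stored, recomputed)


stored = hash_password("123toby")
print(verify_password(stored, "123toby"), verify_password(stored, "wrong"))
```

Because the salt is random, hashing the same password twice gives different stored values, which is why a check must re-hash with the stored salt rather than compare against a new hash.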
This authentication middleware functions as a checkpoint for all incoming requests to the web server before reaching the API. Its primary task is to check for the presence of a JWT cookie in the incoming request. In cases where the cookie is absent, the middleware responds with a 401 Unauthorized Error, indicating that the required token is missing. If a cookie is found, the middleware proceeds to decode the token, extracting the user ID (uid). Following this, it validates the existence of a user associated with that ID.
from functools import wraps
import jwt
from flask import request, abort
from flask import current_app
from model.users import User

def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        # The token is expected in a cookie named "jwt"
        token = request.cookies.get("jwt")
        if not token:
            return {
                "message": "Authentication Token is missing!",
                "data": None,
                "error": "Unauthorized"
            }, 401
        try:
            # Verify the signature and decode the payload
            data = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=["HS256"])
            current_user = User.query.filter_by(_uid=data["_uid"]).first()
            if current_user is None:
                return {
                    "message": "Invalid Authentication token!",
                    "data": None,
                    "error": "Unauthorized"
                }, 401
        except jwt.InvalidTokenError as e:
            # Bad signature, malformed or expired token: an authentication failure, not a server error
            return {
                "message": "Invalid Authentication token!",
                "data": None,
                "error": str(e)
            }, 401
        except Exception as e:
            return {
                "message": "Something went wrong",
                "data": None,
                "error": str(e)
            }, 500

        return f(current_user, *args, **kwargs)

    return decorated
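The decode step in the middleware amounts to recomputing the signature and comparing it with the one in the token. The sketch below shows that check with the standard library only; the function names and "demo-secret" are hypothetical, and it omits the other checks a real library performs (for example the algorithm in the header and expiry).

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(raw):
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment):
    # Restore the '=' padding that JWTs strip
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def make_token(payload, secret):
    """Build an HS256 token so the example is self-contained."""
    header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode("utf-8"))
    body = b64url_encode(json.dumps(payload).encode("utf-8"))
    signature = hmac.new(
        secret.encode("utf-8"), (header + "." + body).encode("ascii"), hashlib.sha256
    ).digest()
    return header + "." + body + "." + b64url_encode(signature)


def verify_hs256(token, secret):
    """Return the payload dict if the signature matches, otherwise None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        signing_input = (header_b64 + "." + payload_b64).encode("ascii")
        supplied = b64url_decode(signature_b64)
    except ValueError:
        return None  # wrong number of sections or undecodable signature
    expected = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, supplied):
        return None
    return json.loads(b64url_decode(payload_b64))


demo_token = make_token({"_uid": "toby"}, "demo-secret")
print(verify_hs256(demo_token, "demo-secret"))
```

A token signed with a different secret, or one whose payload was edited after signing, fails the comparison and is rejected.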
The endpoint /api/users/authenticate now provides a JWT token based on the request’s JSON data, which includes the uid and password. Several checks are performed, ensuring the presence of the request body, existence of the uid, presence of a user with the specified uid, and correctness of the provided password. If all checks pass, the uid is encoded into a JWT token, which is then returned as a cookie.
Additionally, the @token_required decorator is applied to the get and post methods so that only authenticated users can reach and access resources.
import json, jwt
from flask import Blueprint, request, jsonify, current_app, Response
from flask_restful import Api, Resource # used for REST API building
from datetime import datetime
from auth_middleware import token_required

from model.users import User

user_api = Blueprint('user_api', __name__,
                     url_prefix='/api/users')

# API docs https://flask-restful.readthedocs.io/en/latest/api.html
api = Api(user_api)

class UserAPI:
    class _CRUD(Resource):  # User API operation for Create, Read. The Update, Delete methods need to be implemented
        @token_required
        def post(self, current_user): # Create method
            ''' Read data for json body '''
            body = request.get_json()

            ''' Avoid garbage in, error checking '''
            # validate name
            name = body.get('name')
            if name is None or len(name) < 2:
                return {'message': 'Name is missing, or is less than 2 characters'}, 400
            # validate uid
            uid = body.get('uid')
            if uid is None or len(uid) < 2:
                return {'message': 'User ID is missing, or is less than 2 characters'}, 400
            # look for password and dob
            password = body.get('password')
            dob = body.get('dob')

            ''' #1: Key code block, setup USER OBJECT '''
            uo = User(name=name,
                      uid=uid)

            ''' Additional garbage error checking '''
            # set password if provided
            if password is not None:
                uo.set_password(password)
            # convert to date type
            if dob is not None:
                try:
                    uo.dob = datetime.strptime(dob, '%Y-%m-%d').date()
                except ValueError:
                    return {'message': f'Date of birth format error {dob}, must be yyyy-mm-dd'}, 400

            ''' #2: Key Code block to add user to database '''
            # create user in database
            user = uo.create()
            # success returns json of user
            if user:
                return jsonify(user.read())
            # failure returns error
            return {'message': f'Processed {name}, either a format error or User ID {uid} is duplicate'}, 400

        @token_required
        def get(self, current_user): # Read Method
            users = User.query.all()    # read/extract all users from database
            json_ready = [user.read() for user in users]  # prepare output in json
            return jsonify(json_ready)  # jsonify creates Flask response object, more specific to APIs than json.dumps

    class _Security(Resource):
        def post(self):
            try:
                body = request.get_json()
                if not body:
                    return {
                        "message": "Please provide user details",
                        "data": None,
                        "error": "Bad request"
                    }, 400
                ''' Get Data '''
                uid = body.get('uid')
                if uid is None:
                    return {'message': 'User ID is missing'}, 400
                password = body.get('password')

                ''' Find user '''
                user = User.query.filter_by(_uid=uid).first()
                # is_password must return a boolean for this check to reject wrong passwords
                if user is None or not user.is_password(password):
                    return {'message': "Invalid user id or password"}, 400
                if user:
                    try:
                        # Encode the uid into a signed token
                        token = jwt.encode(
                            {"_uid": user._uid},
                            current_app.config["SECRET_KEY"],
                            algorithm="HS256"
                        )
                        resp = Response("Authentication for %s successful" % (user._uid))
                        # Return the token as an HTTP-only cookie
                        resp.set_cookie("jwt", token,
                                max_age=3600,
                                secure=True,
                                httponly=True,
                                path='/'
                                # domain="frontend.com"
                                )
                        return resp
                    except Exception as e:
                        return {
                            "error": "Something went wrong",
                            "message": str(e)
                        }, 500
                return {
                    "message": "Error fetching auth token!, invalid email or password",
                    "data": None,
                    "error": "Unauthorized"
                }, 401
            except Exception as e:
                return {
                    "message": "Something went wrong!",
                    "error": str(e),
                    "data": None
                }, 500

    # building RESTapi endpoint
    api.add_resource(_CRUD, '/')
    api.add_resource(_Security, '/authenticate')
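The token payload above contains only _uid, so the token itself carries no expiry even though the cookie has max_age=3600. JWT defines a registered "exp" claim for this purpose, and PyJWT checks it during decode when it is present. A minimal sketch of building and checking such claims, with hypothetical helper names build_claims and is_expired:

```python
import time


def build_claims(uid, ttl_seconds=3600, now=None):
    """Return a payload with issued-at and expiry times in seconds since the epoch."""
    issued_at = int(time.time() if now is None else now)
    return {"_uid": uid, "iat": issued_at, "exp": issued_at + ttl_seconds}


def is_expired(claims, now=None):
    """True once the current time has reached the 'exp' claim."""
    current = int(time.time() if now is None else now)
    return current >= claims["exp"]


claims = build_claims("toby", ttl_seconds=3600, now=1000)
print(claims, is_expired(claims, now=2000))
```

Matching the ttl to the cookie's max_age keeps the two lifetimes consistent, so a copied token stops working at the same time the cookie would have expired.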
pip install Flask-JWT-Extended
from flask import Flask
from flask_jwt_extended import JWTManager

# You don't have to add this again if you already have it.
app = Flask(__name__)

# Setup the Flask-JWT-Extended extension
app.config["JWT_SECRET_KEY"] = "SECRET_KEY"  # Remember to change "SECRET_KEY" to a more complex key
jwt = JWTManager(app)
POST is used because the request creates a token (POST is the HTTP method for creation).
POST /token
Content-type: application/json
Body:
{
"username": "alesanchezr",
"password": "12341234"
}
from flask import request, jsonify
from flask_jwt_extended import create_access_token
from werkzeug.security import check_password_hash

# Create a route to authenticate your users and return JWT Token
# The create_access_token() function is used to actually generate the JWT
@app.route("/token", methods=["POST"])
def create_token():
    username = request.json.get("username", None)
    password = request.json.get("password", None)

    # Look the user up by username, then compare the password against the stored hash
    # (assumes User.password holds a hash created with generate_password_hash)
    user = User.query.filter_by(username=username).first()
    if user is None or not check_password_hash(user.password, password or ""):
        # Unknown user or wrong password
        return jsonify({"msg": "Bad username or password"}), 401

    # Create a new token with the user id inside
    access_token = create_access_token(identity=user.id)
    return jsonify({ "token": access_token, "user_id": user.id })
Endpoints that require authorization (private endpoints) should use the @jwt_required() decorator. You can retrieve valid, authenticated user information using the get_jwt_identity function.
from flask_jwt_extended import jwt_required, get_jwt_identity

# Protect a route with jwt_required, which will kick out requests without a valid JWT
@app.route("/protected", methods=["GET"])
@jwt_required()
def protected():
    # Access the identity of the current user with get_jwt_identity
    current_user_id = get_jwt_identity()
    user = User.query.get(current_user_id)

    return jsonify({"id": user.id, "username": user.username }), 200
Based on the endpoints above, the frontend sends a POST request to /token with the username and password in the request body.
const login = async (username, password) => {
    const resp = await fetch(`https://your_api.com/token`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ username, password })
    })

    // Check the specific status codes first; resp.ok is false for all of them
    if (resp.status === 401) {
        throw Error("Invalid credentials")
    } else if (resp.status === 400) {
        throw Error("Invalid email or password format")
    } else if (!resp.ok) {
        throw Error("There was a problem in the login request")
    }

    const data = await resp.json()
    // Save your token in the localStorage
    // Also you should set your user into the store using the setItem function
    localStorage.setItem("jwt-token", data.token);

    return data
}
Now, let's say I've logged in to the front-end application and I want to retrieve sensitive data:
// Assuming "/protected" is a private endpoint
const getMyTasks = async () => {
    // Retrieve token from localStorage
    const token = localStorage.getItem('jwt-token');

    const resp = await fetch(`https://your_api.com/protected`, {
        method: 'GET',
        headers: {
            "Content-Type": "application/json",
            'Authorization': 'Bearer ' + token // ⬅⬅⬅ authorization token
        }
    });

    // Throw only on failure, so a successful response falls through to be parsed
    if (resp.status === 403) {
        throw Error("Missing or invalid token");
    } else if (!resp.ok) {
        throw Error("There was a problem fetching the protected data");
    }

    const data = await resp.json();
    console.log("This is the data you requested", data);
    return data
}
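On the server side, the Authorization header sent by this request has the form "Bearer <token>". Flask-JWT-Extended parses it for you; the sketch below only illustrates the idea, using a hypothetical function name extract_bearer_token.

```python
def extract_bearer_token(authorization_header):
    """Return the token from an 'Authorization: Bearer <token>' value, or None."""
    if not authorization_header:
        return None
    parts = authorization_header.split()
    # Expect exactly two parts: the scheme and the token
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return None
    return parts[1]


print(extract_bearer_token("Bearer abc.def.ghi"))
```

Returning None for a missing or malformed header lets the caller answer with a single "unauthorized" response instead of handling each case separately.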
Cross-Origin Resource Sharing (CORS) controls which origins are allowed to send requests to a server. The Flask application needs a CORS configuration so that it accepts requests from the frontend's origin while restricting requests from other origins.
Change: Added CORS support for deployed and local runs.
from flask_cors import CORS

cors = CORS(app, supports_credentials=True)
The concept of “Dynamic Allowed Origins” refers to the ability to dynamically set the allowed origins for Cross-Origin Resource Sharing (CORS) based on the incoming request’s ‘Origin’ header. In CORS, the ‘Origin’ header indicates the origin (domain) of the requesting client.
Change: A before_request hook is implemented to dynamically set the allowed origin based on the incoming request’s ‘Origin’ header.
@app.before_request
def before_request():
    # Accept the request's Origin only if it is on the allow-list
    allowed_origin = request.headers.get('Origin')
    if allowed_origin in ['http://localhost:4100', 'http://127.0.0.1:4100', 'https://nighthawkcoders.github.io']:
        cors._origins = allowed_origin
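The underlying idea is an allow-list lookup: echo the request's Origin back in the response only when it is on the list. When credentials such as cookies are allowed, the Access-Control-Allow-Origin header must name a specific origin rather than "*". A framework-independent sketch, where cors_headers_for is a hypothetical helper:

```python
ALLOWED_ORIGINS = frozenset({
    "http://localhost:4100",
    "http://127.0.0.1:4100",
    "https://nighthawkcoders.github.io",
})


def cors_headers_for(origin):
    """Return the CORS response headers for a request Origin, or {} if not allowed."""
    if origin not in ALLOWED_ORIGINS:
        return {}
    return {
        "Access-Control-Allow-Origin": origin,       # echo the specific origin
        "Access-Control-Allow-Credentials": "true",  # allow cookies to be sent
        "Vary": "Origin",                            # the response differs per origin
    }


print(cors_headers_for("http://localhost:4100"))
```

An origin that is not on the list gets no CORS headers at all, so the browser blocks the page from reading the response.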
The SameSite attribute in a cookie is a security feature that controls whether a cookie should be sent with cross-site requests. It is set within the Set-Cookie header when a server instructs a browser to store a cookie on the user’s device. The SameSite attribute helps mitigate certain types of cross-site request forgery (CSRF) attacks.
Change: The SameSite attribute is set to ‘None’ in the set_cookie method to allow cross-site requests.
# resp is the flask.Response object built in the authenticate endpoint
resp.set_cookie(
    "jwt",
    token,
    max_age=3600,
    secure=True,
    httponly=True,
    path='/',
    samesite='None'
)
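To see what these arguments turn into on the wire, the sketch below builds the equivalent Set-Cookie value with Python's standard http.cookies module (the samesite attribute needs Python 3.8 or later). The function name build_jwt_cookie_header and the token value are placeholders for illustration.

```python
from http.cookies import SimpleCookie


def build_jwt_cookie_header(token):
    """Return the Set-Cookie header value for a cross-site JWT cookie."""
    cookie = SimpleCookie()
    cookie["jwt"] = token
    morsel = cookie["jwt"]
    morsel["max-age"] = 3600     # lifetime in seconds
    morsel["path"] = "/"
    morsel["secure"] = True      # only sent over HTTPS
    morsel["httponly"] = True    # not readable from JavaScript
    morsel["samesite"] = "None"  # allow cross-site requests to carry the cookie
    return morsel.OutputString()


print(build_jwt_cookie_header("abc.def.ghi"))
```

Browsers accept SameSite=None only together with the Secure attribute, which is why both are set here.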
Nginx is configured to answer preflighted requests. A preflight request is an HTTP OPTIONS request that the browser sends before the actual request to check whether the server will accept it.
Change: Nginx configuration is added to handle preflighted requests, ensuring that only the specified frontend server is allowed.
if ($request_method = OPTIONS) {
    add_header "Access-Control-Allow-Credentials" "true" always;
    add_header "Access-Control-Allow-Origin" "https://nighthawkcoders.github.io" always;
    add_header "Access-Control-Allow-Methods" "GET, POST, PUT, OPTIONS, HEAD" always;
    add_header "Access-Control-Max-Age" 600 always;
    add_header "Access-Control-Allow-Headers" "Authorization, Origin, X-Requested-With, Content-Type, Accept" always;
    return 204;
}
Cookies are small pieces of data stored in the user's browser by a website.
Cookies can be used to store JWTs.
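Maintaining tokens in cookies and transmitting them in HTTP requests involves several steps, which we covered earlier; let's go through them again in the context of cookies: at login the server signs a JWT and sets it as a cookie, the browser stores the cookie and attaches it to later requests, and the server reads and verifies the token on each request.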
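The server-side half of that round trip, reading the token back out of the incoming Cookie header, can be sketched with the standard library. Flask exposes the same data as request.cookies; read_jwt_cookie is a hypothetical name used only here.

```python
from http.cookies import SimpleCookie


def read_jwt_cookie(cookie_header):
    """Return the value of the 'jwt' cookie from a Cookie header, or None."""
    jar = SimpleCookie()
    jar.load(cookie_header or "")
    morsel = jar.get("jwt")
    return morsel.value if morsel is not None else None


print(read_jwt_cookie("theme=dark; jwt=abc.def.ghi"))
```

A missing cookie yields None, which corresponds to the case where the middleware answers with 401.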
Postman is a tool for testing and interacting with Application Programming Interfaces (APIs).
Within Postman, users can craft and send HTTP requests to APIs and inspect the responses, which shows how different software components communicate. This makes it useful for testing and validating API behavior before an API is integrated into an application.
Postman lets you include JWTs in your requests in several ways.
In this way, Postman lets you simulate API requests with JWTs to verify that your authentication mechanisms work as expected. If they do, you will receive a status code in the 2xx range (e.g., 200 OK), which typically indicates success. Otherwise, the status code and response body shown in Postman help you find the error in your code.
As with JWTs, Postman lets you manage cookies efficiently, and there are several ways to work with them.