HomePythonPython REST API – Authentication

Python REST API – Authentication

Authentication is a very part of any rest api. Authentication means certain routes are protected via login and certain routes are protected via roles.

We will use jwt token to our authentication purposes and use the flask library https://flask-jwt-extended.readthedocs.io/en/latest/

If you are not familiar with what is JWT token, its best to google about it as there would be many resources online.

Let’s start by installing the token in our app

pip install flask-jwt-extended

Next follow this guide https://flask-jwt-extended.readthedocs.io/en/latest/basic_usage.html and setup the basic route for login/ and protected/ route its very straight forward.

Next test it on postman as follows

access token generated!

and now next lets test the protected route

works!

So this basic authentication system works well.

In a more real world auth system, we would have a registration route, users table, roles, permissions and database. Let’s try to implement all these step by step for now without the database.

User Registration

Let’s create a simple route to register a user, we will encrypt password and generate a unique id for every user. Store users in a list.

We will be using some new packages in this e.g passlib so that needs to be installed first

pip install passlib

Here is the code

 
users = []

@app.route('/register', methods=['POST'])
def register():
if not request.json:
abort(500)

username = request.json.get("username", None)
password = request.json.get("password", None)
name = request.json.get("name", None)

if username is None or password is None or name is None:
abort(500)

global users

users = [user for user in users if user["username"] == username]

if len(users) > 0:
return jsonify("username already exists!") , 500

id = uuid.uuid4().hex
hash = pbkdf2_sha256.hash(password)

users.append({
"id" : id,
"username" : username,
"password" : hash,
"name" : name
})

return jsonify(id)
Registration Works!
username validation!

all good, real easy!

LOGIN

 
@app.route('/login', methods=['POST'])
def login():
if not request.is_json:
return jsonify(msg="Missing JSON in request"), 500

username = request.json.get('username', None)
password = request.json.get('password', None)
if not username:
return jsonify(msg="Missing username parameter"), 500
if not password:
return jsonify(msg="Missing password parameter"), 500

user = [user for user in users if user["username"] == username]

if len(user) == 0:
return jsonify("username failed!"), 400

if not pbkdf2_sha256.verify(password, user[0]["password"]):
return jsonify("password failed!"), 400

access_token = create_access_token(identity=user[0]["id"])
return jsonify(access_token=access_token), 200

In the above login code, we are using user id to create access token, but many times its much easier simply use a user object to create access token and code looks much readable. To do that we need make a custom function to create access token as described here https://flask-jwt-extended.readthedocs.io/en/latest/tokens_from_complex_object.html

Let’s see how the code looks

 
# Create a function that will be called whenever create_access_token
# is used. It will take whatever object is passed into the
# create_access_token method, and lets us define what custom claims
# should be added to the access token.

@jwt.user_identity_loader
def user_identity_lookup(user):
return user["id"]

@app.route('/login', methods=['POST'])
def login():
if not request.is_json:
return jsonify(msg="Missing JSON in request"), 500

username = request.json.get('username', None)
password = request.json.get('password', None)
if not username:
return jsonify({msg="Missing username parameter"), 500
if not password:
return jsonify({msg="Missing password parameter"), 500

global users
users = [user for user in users if user["username"] == username]

if len(users) == 0:
return jsonify("username failed!"), 400

user = users[0]

if not pbkdf2_sha256.verify(password, user["password"]):
return jsonify("password failed!"), 400

access_token = create_access_token(identity=user)
return jsonify(access_token=access_token), 200

Profile

Now let’s look at the profile/ route, this will be a protected route to access data

 
# Protect a view with jwt_required, which requires a valid access token
# in the request to access.
@app.route('/profile', methods=['GET'])
@jwt_required
def profile():
# Access the identity of the current user with get_jwt_identity
current_user = get_jwt_identity()
return jsonify(logged_in_as=current_user), 200

Profile Route

This looks good, except we need a generic method to load full user data from access key. As almost all protected routes would need user data, so it needs to be a generic global function.

Let’s see how to do it

 
@jwt.user_loader_callback_loader
def user_loader_callback(identity):
print(identity)
global users
print(users)
users = [user for user in users if user["id"] == identity]
print(users)
if len(users) > 0:
del users[0]["password"]
return users[0]
return []

# Protect a view with jwt_required, which requires a valid access token
# in the request to access.


@app.route('/profile', methods=['GET'])
@jwt_required
def profile():
# Access the identity of the current user with get_current_user
current_user = get_current_user()
return jsonify(logged_in_as=current_user), 200

ROLES

Now we need to assign different user roles to user, like assume we have only two roles for now ADMIN and NORMAL

For now, we will just put roles based on username instead of having an extra column for roles.

 
def admin_required(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
verify_jwt_in_request()
user = get_current_user()
if user["username"] == "manish":
return fn(*args, **kwargs)

return jsonify(msg='Admins only!'), 403
return wrapper


@app.route("/admin_only")
@jwt_required
@admin_required
def admin_only():
return ""

At this stage we have most of the things need to setup authorization with rest api.

At good assignment would be integrate the authorization with todo api’s. The way to setup that would be

  1. /todo GET to have optional authorization i.e if user is logged in he can see only his tasks, if he is admin user then he can see tasks of all users and if its not logged in at all it should return only the latest 5 tasks
  2. /todo DELETE, PUT, POST can only be done via logged in user and admin can delete anyone’s task
  3. /todo can only marked done by admin

Its best to do this fully on your own, i will add full code reference below

 
import datetime
from flask import Flask, jsonify, abort, request

from flask_jwt_extended import (
JWTManager, jwt_required, create_access_token,
get_jwt_identity, get_current_user, verify_jwt_in_request,
jwt_optional
)
from passlib.hash import pbkdf2_sha256

from functools import wraps


import uuid


app = Flask(__name__)

app.config['JWT_SECRET_KEY'] = 'xxxxxxxxxxxxxx' # Change this!
jwt = JWTManager(app)


tasks = [
{
'id': 1,
'title': 'Buy groceries',
'description': 'Milk, Cheese, Pizza, Fruit, Tylenol',
'done': False,
'due': datetime.datetime.now()
},
{
'id': 2,
'title': 'Learn Python',
'description': 'Need to find a good Python tutorial on the web',
'done': False,
'due': datetime.datetime.now()
}
]

users = []


@app.route('/register', methods=['POST'])
def register():
if not request.json:
abort(500)

username = request.json.get("username", None)
password = request.json.get("password", None)
name = request.json.get("name", None)

if username is None or password is None or name is None:
abort(500)

global users

users = [user for user in users if user["username"] == username]

if len(users) > 0:
return jsonify("username already exists!"), 500

id = uuid.uuid4().hex
hash = pbkdf2_sha256.hash(password)

users.append({
"id": id,
"username": username,
"password": hash,
"name": name
})

return jsonify(id)


# Create a function that will be called whenever create_access_token
# is used. It will take whatever object is passed into the
# create_access_token method, and lets us define what custom claims
# should be added to the access token.

@jwt.user_identity_loader
def user_identity_lookup(user):
return user["id"]


@app.route('/login', methods=['POST'])
def login():
if not request.is_json:
return jsonify({msg="Missing JSON in request"), 500

username = request.json.get('username', None)
password = request.json.get('password', None)
if not username:
return jsonify({msg="Missing username parameter"), 500
if not password:
return jsonify(msg="Missing password parameter"), 500

global users
users = [user for user in users if user["username"] == username]

if len(users) == 0:
return jsonify("username failed!"), 400

user = users[0]

if not pbkdf2_sha256.verify(password, user["password"]):
return jsonify("password failed!"), 400

access_token = create_access_token(identity=user)
return jsonify(access_token=access_token), 200


@jwt.user_loader_callback_loader
def user_loader_callback(identity):
global users
users = [user for user in users if user["id"] == identity]
if len(users) > 0:
if users[0]["username"] == "manish":
users[0]["role"] = "admin"
else:
users[0]["role"] = "normal"
return users[0]
return {}

# Protect a view with jwt_required, which requires a valid access token
# in the request to access.


@app.route('/profile', methods=['GET'])
@jwt_required
def profile():
# Access the identity of the current user with get_current_user
current_user = get_current_user()
return jsonify(logged_in_as=current_user), 200


def admin_required(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
verify_jwt_in_request()
user = get_current_user()
if user["username"] == "manish":
return fn(*args, **kwargs)

return jsonify(msg='Admins only!'), 403
return wrapper


@app.route("/admin_only")
@jwt_required
@admin_required
def admin_only():
return ""


def sort_due_date(x):
return x["due"]


@app.route('/todo', methods=["GET"])
@app.route('/todo/<string:direction>', methods=["GET"])
@jwt_optional
def todo(direction=None):
# direction is optional
current_user = get_current_user()
print(current_user)
if direction == "ASC":
direction = True
else:
direction = False

global tasks
if direction is not None:
tasks.sort(reverse=direction, key=sort_due_date)

if current_user is not None and "id" in current_user:
if current_user["role"] == "normal":
tasks = [task for task in tasks if task["user"] == current_user["id"] ]
else:
tasks = [task for i, task, in enumerate(tasks) if i < 5 ]
#https://stackoverflow.com/questions/41003206/getting-index-with-list-comprehension-in-a-list-in-python
return jsonify(tasks)


@app.route('/todo', methods=["POST"])
@jwt_required
def add_todo():
if not request.json:
abort(500)

title = request.json.get("title", None)
desc = request.json.get("description", "")

due = request.json.get("due", None)

if due is not None:
due = datetime.datetime.strptime(due, "%d-%m-%Y")
else:
due = datetime.datetime.now()

current_user = get_current_user()

global tasks
tasks.append({
"id": len(tasks) + 1,
"title": title,
"description": desc,
"done": False,
"due": due,
"user" : current_user["id"]
})
return jsonify(len(tasks))


def update(task, task_id, data):
if task["id"] == task_id:
if 'title' in data:
task["title"] = data['title']
if 'description' in data:
task["description"] = data["description"]
return task


@app.route("/todo/<int:id>", methods=['PUT'])
@jwt_required
def update_todo(id):

if not request.json:
abort(500)

todo_id = request.json.get("id", None)
title = request.json.get("title", None)
desc = request.json.get("description", "")

if todo_id is None or title is None:
return jsonify(message="Invalid Request"), 500

global tasks
tasks = [update(task, id, request.json) for task in tasks]

return jsonify(tasks)


@app.route("/todo/<int:id>", methods=["DELETE"])
def delete_todo(id):

global tasks
tasks = [task for task in tasks if task["id"] != id]

return jsonify(tasks)


def mark(task, status, task_id):
if task_id == task["id"]:
task["done"] = status

return task


@app.route("/todo/mark/<int:task_id>/<int:status>", methods=["PUT"])
@jwt_required
@admin_required
def mark_task(task_id, status):

global tasks
if status == 1:
status = True
else:
status = False

tasks = [mark(task, status, task_id) for task in tasks]

return jsonify(tasks)

Leave a Reply

Your email address will not be published. Required fields are marked *

%d bloggers like this: