奇技淫巧实现JWT认证登录

最近在写博客的友链的登录界面的时候总是无法实用@jwt_required()装饰器来保护我的登录时,总是不能使用,于是我找了个折中的办法来实现了登录。

@jwt_required()装饰器用于保护你的路由,要求必须携带有效的令牌才能访问该路由。当访问该路由时,JWT中间件会自动验证令牌的有效性。如果令牌无效或已过期,将返回错误信息(Invalid token)和HTTP状态码401(未授权)。

安装软件包

pip install PyJWT
pip install flask
pip install Flask-JWT-Extended
pip install werkzeug

创建登录

首先我们创建一个app.py,代码如下:

from flask import Flask, abort, jsonify, request
from flask_jwt_extended import JWTManager, create_access_token
from werkzeug.security import generate_password_hash, check_password_hash

app = Flask(__name__)
access_token = None
#生成密码哈希
password_hash = generate_password_hash('123456')
#配置JWT密钥
app.config['JWT_SECRET_KEY'] = 'jwt-secret-string'
jwt = JWTManager(app)


@app.route('/login', methods=['POST'])
def login():
    #获取请求参数
    password = request.json.get('password', None)
    if not check_password_hash(password_hash, password):
        #密码错误
        abort(403)
    #生成access_token,并全局使用
    global access_token
    access_token = create_access_token(identity=password)
    return jsonify(code=200,access_token=access_token)
if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=3005)

打开postman进行调试(在Edge也有可能叫Postwoman)
选择Post请求,接口地址输入http://127.0.0.1:3005/login,参数选择自定义格式application/json,输入{ "password": "123456"},点击发送,下方就会返回我们的access_token

自定义需要token的接口

打开app.py,添加如下代码:

def custom():
    #获取请求token
    access_token_headers = request.headers.get('Authorization', None)
    #判断token是否存在
    if not access_token_headers:
        return jsonify({"code": "err", "msg": "Missing authorization token"}), 401
    #判断本地是否存在token
    if access_token == None:
        return jsonify({"code": "err", "msg": "Please log in first!"}), 401
    #判断token是否相等
    if access_token_headers == access_token and access_token:
        try:
            json_data = request.get_json()
            print(json_data)
        except:
            abort(400, 'Invalid JSON')
        # 可以改为你自己需要的内容
        return jsonify(json_data)

按照上面的步骤,重新发送/login请求,复制你的token,然后修改接口地址改为http://127.0.0.1:3005/custom,请求头中key输入AuthorizationValue为你复制的token,参数选择自定义格式application/json,输入{ "code": "200"},点击发送,下方就会返回我们的json内容(可以自定义)

设置token失效时间

打开app.py,添加如下代码:

import threading
···
app = Flask(__name__)
# 设置定时器
timer = None
#配置失效token列表
BLACKLIST = set()
···
def login():
    ···
    #判断timer定时器是否存在,存在则取消
    if timer is not None:
        timer.cancel()
        block_token(access_token)
    #设置定时器,24小时后失效
    timer = threading.Timer(24 * 60 * 60, block_token, args=[access_token])
    timer.start()
    ···
def block_token(token):
    #将失效token加入到失效token列表
    BLACKLIST.add(token)
···
def custom():
    ···
    #判断token是否相等且不存在于失效token列表中
    if access_token_headers == access_token and access_token not in BLACKLIST:
        ···

仅放行需要的接口

打开app.py,添加如下代码:

···
#设置接口列表
@app.before_request
def before():
    if request.path not in ['/custom', '/login']:
        return jsonify({"code": "正常", "message": "{}".format("输入正确参数")})
···

设置@app.before_request后只有/custom和/login才会返回数据

完整代码

创建app.py,添加如下代码:

from flask import Flask, abort, jsonify, request
from flask_jwt_extended import JWTManager, create_access_token
from werkzeug.security import generate_password_hash, check_password_hash
import threading

app = Flask(__name__)
access_token = None
#生成密码哈希
password_hash = generate_password_hash('123456')
#配置JWT密钥
app.config['JWT_SECRET_KEY'] = 'jwt-secret-string'
# 设置定时器
timer = None
#配置失效token列表
BLACKLIST = set()
jwt = JWTManager(app)

#设置接口列表
@app.before_request
def before():
    if request.path not in ['/custom', '/login']:
        return jsonify({"code": "正常", "message": "{}".format("输入正确参数")})
    
@app.route('/login', methods=['POST'])
def login():
    #获取请求参数
    password = request.json.get('password', None)
    if not check_password_hash(password_hash, password):
        #密码错误
        abort(403)
    #生成access_token,timer并全局使用
    global access_token, timer
    access_token = create_access_token(identity=password)
    #判断timer定时器是否存在,存在则取消
    if timer is not None:
        timer.cancel()
        block_token(access_token)
    #设置定时器,24小时后失效
    timer = threading.Timer(24 * 60 * 60, block_token, args=[access_token])
    timer.start()
    return jsonify(code=200,access_token=access_token)

def block_token(token):
    #将失效token加入到失效token列表
    BLACKLIST.add(token)

@app.route('/custom', methods=['POST'])
def custom():
    #获取请求token
    access_token_headers = request.headers.get('Authorization', None)
    #判断token是否存在
    if not access_token_headers:
        return jsonify({"code": "err", "msg": "Missing authorization token"}), 401
    #判断本地是否存在token
    if access_token == None:
        return jsonify({"code": "err", "msg": "Please log in first!"}), 401
    #判断token是否相等且不存在于失效token列表中
    if access_token_headers == access_token and access_token not in BLACKLIST:
        try:
            json_data = request.get_json()
            print(json_data)
        except:
            abort(400, 'Invalid JSON')
        # 可以改为你自己需要的内容
        return jsonify(json_data)
if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=3005)