最近在写博客的友链的登录界面的时候总是无法实用@jwt_required()装饰器来保护我的登录时,总是不能使用,于是我找了个折中的办法来实现了登录。

@jwt_required()装饰器用于保护你的路由,要求必须携带有效的令牌才能访问该路由。当访问该路由时,JWT中间件会自动验证令牌的有效性。如果令牌无效或已过期,将返回错误信息(Invalid token)和HTTP状态码401(未授权)。

安装软件包

1
2
3
4
pip install PyJWT
pip install flask
pip install Flask-JWT-Extended
pip install werkzeug

创建登录

首先我们创建一个app.py,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from flask import Flask, abort, jsonify, request
from flask_jwt_extended import JWTManager, create_access_token
from werkzeug.security import generate_password_hash, check_password_hash

app = Flask(__name__)
access_token = None
#生成密码哈希
password_hash = generate_password_hash('123456')
#配置JWT密钥
app.config['JWT_SECRET_KEY'] = 'jwt-secret-string'
jwt = JWTManager(app)


@app.route('/login', methods=['POST'])
def login():
#获取请求参数
password = request.json.get('password', None)
if not check_password_hash(password_hash, password):
#密码错误
abort(403)
#生成access_token,并全局使用
global access_token
access_token = create_access_token(identity=password)
return jsonify(code=200,access_token=access_token)
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=3005)

打开postman进行调试(在Edge也有可能叫Postwoman)
选择Post请求,接口地址输入http://127.0.0.1:3005/login,参数选择自定义格式application/json,输入{ "password": "123456"},点击发送,下方就会返回我们的access_token

自定义需要token的接口

打开app.py,添加如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def custom():
#获取请求token
access_token_headers = request.headers.get('Authorization', None)
#判断token是否存在
if not access_token_headers:
return jsonify({"code": "err", "msg": "Missing authorization token"}), 401
#判断本地是否存在token
if access_token == None:
return jsonify({"code": "err", "msg": "Please log in first!"}), 401
#判断token是否相等
if access_token_headers == access_token and access_token:
try:
json_data = request.get_json()
print(json_data)
except:
abort(400, 'Invalid JSON')
# 可以改为你自己需要的内容
return jsonify(json_data)

按照上面的步骤,重新发送/login请求,复制你的token,然后修改接口地址改为http://127.0.0.1:3005/custom,请求头中key输入AuthorizationValue为你复制的token,参数选择自定义格式application/json,输入{ "code": "200"},点击发送,下方就会返回我们的json内容(可以自定义)

设置token失效时间

打开app.py,添加如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import threading
···
app = Flask(__name__)
# 设置定时器
timer = None
#配置失效token列表
BLACKLIST = set()
···
def login():
···
#判断timer定时器是否存在,存在则取消
if timer is not None:
timer.cancel()
block_token(access_token)
#设置定时器,24小时后失效
timer = threading.Timer(24 * 60 * 60, block_token, args=[access_token])
timer.start()
···
def block_token(token):
#将失效token加入到失效token列表
BLACKLIST.add(token)
···
def custom():
···
#判断token是否相等且不存在于失效token列表中
if access_token_headers == access_token and access_token not in BLACKLIST:
···

仅放行需要的接口

打开app.py,添加如下代码:

1
2
3
4
5
6
7
···
#设置接口列表
@app.before_request
def before():
if request.path not in ['/custom', '/login']:
return jsonify({"code": "正常", "message": "{}".format("输入正确参数")})
···

设置@app.before_request后只有/custom和/login才会返回数据

完整代码

创建app.py,添加如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from flask import Flask, abort, jsonify, request
from flask_jwt_extended import JWTManager, create_access_token
from werkzeug.security import generate_password_hash, check_password_hash
import threading

app = Flask(__name__)
access_token = None
#生成密码哈希
password_hash = generate_password_hash('123456')
#配置JWT密钥
app.config['JWT_SECRET_KEY'] = 'jwt-secret-string'
# 设置定时器
timer = None
#配置失效token列表
BLACKLIST = set()
jwt = JWTManager(app)

#设置接口列表
@app.before_request
def before():
if request.path not in ['/custom', '/login']:
return jsonify({"code": "正常", "message": "{}".format("输入正确参数")})

@app.route('/login', methods=['POST'])
def login():
#获取请求参数
password = request.json.get('password', None)
if not check_password_hash(password_hash, password):
#密码错误
abort(403)
#生成access_token,timer并全局使用
global access_token, timer
access_token = create_access_token(identity=password)
#判断timer定时器是否存在,存在则取消
if timer is not None:
timer.cancel()
block_token(access_token)
#设置定时器,24小时后失效
timer = threading.Timer(24 * 60 * 60, block_token, args=[access_token])
timer.start()
return jsonify(code=200,access_token=access_token)

def block_token(token):
#将失效token加入到失效token列表
BLACKLIST.add(token)

@app.route('/custom', methods=['POST'])
def custom():
#获取请求token
access_token_headers = request.headers.get('Authorization', None)
#判断token是否存在
if not access_token_headers:
return jsonify({"code": "err", "msg": "Missing authorization token"}), 401
#判断本地是否存在token
if access_token == None:
return jsonify({"code": "err", "msg": "Please log in first!"}), 401
#判断token是否相等且不存在于失效token列表中
if access_token_headers == access_token and access_token not in BLACKLIST:
try:
json_data = request.get_json()
print(json_data)
except:
abort(400, 'Invalid JSON')
# 可以改为你自己需要的内容
return jsonify(json_data)
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=3005)