搭建属于自己的飞书机器人
白夜飞书文档指北☞
飞书机器人原理
根据飞书提供的文档和API可知道,想要开发出自己的飞书机器人,需要先创建一个企业应用,然后通过飞书的接口获取到tenant_access_token
,这个。token的有效期是两个小时,当刷新时间小于一个半小时的时候,会返回原token,刷新时间大于一个半小时的时候会返回新的token,此时会存在两个同时生效的token,需要自己做好平滑过渡。
假如我们需要设置回调的话,我们需要先在回调配置中设置回调地址,订阅方式选择自己服务器的api地址,这里的api地址必须是公网可以访问的(本地可能需要内网穿透),当我们点击保存的时候,飞书会向此api发送一个加密的encrypt
,你需要将其在1s
内解密出其中的challenge
值并将其返回。
假如你想要实用飞书的卡片消息的话,你需要提前编辑卡片模板
并发布,记住其中的模板id和版本。
创建飞书机器人指北
在探索飞书平台的奇妙世界时,我发现开发一个飞书机器人就像拼图一样有趣。以下是我整理的详细步骤,希望能帮助你也成为飞书机器人开发的高手。
创建企业应用
首先,登录飞书开放平台,按照提示创建一个企业应用。这个过程中,你会被要求填写应用名称、描述、图标等信息。完成这些步骤后,你会得到一个应用ID和密钥,这些是你在后续开发中的身份凭证。
打开飞书开发者后台,创建自己的企业自建应用
在应用能力
里选择添加应用能力
,选择按能力添加
的机器人
选项,点击添加
飞书权限管理
在 开发配置
> 权限管理
页面,为应用添加以下权限。
飞书权限
获取与发送单聊、群组消息
获取用户发给机器人的单聊消息
读取用户发给机器人的单聊消息
接收群聊中@机器人消息事件
获取群组中所有消息
获取用户在群组中@机器人的消息
获取与上传图片或文件资源
你可以直接将以下权限 Keys 粘贴到权限搜索框,点击
批量开通
权限,最后点击确认并前往创建应用版本
。im:message,im:message.p2p_msg,im:message.p2p_msg:readonly,im:message.group_at_msg:readonly,im:message.group_msg,im:message.group_at_msg,im:resource
点击
创建版本
添加相关的信息,点击保存,点击申请线上发布
通过自己的飞书来同意版本发布
创建卡片消息模板
打开飞书卡片搭建工具,创建卡片
这里我们创建一个文字模块,一个图片模块,和一个按钮来使用卡片消息的不同的功能示例
创建机器人回调地址
配置加密策略
打开开发配置 > 事件与回调,选择加密策略
,点击重置
按钮或者编辑
按钮来创建Encrypt Key
创建回调
以下操作均有公网Ip
和域名
安装Flask
和Crypto
pip install flask
pip install pycryptodome
pip install Werkzeug
pip install requests
pip install requests_toolbelt
创建
app.py
,代码如下:from flask import Flask, jsonify, request
from src.app.api.feishu_callback import feishu_callback
# 基础设置
basic_settings = {
'debug': True, # 如果设置为True,那么在应用出错时,会显示详细的错误信息
'host': '0.0.0.0', # 应用运行的主机,'0.0.0.0'表示监听所有的公网IP
'port': 5000, # 应用运行的端口
'cors': True, # 是否启用CORS
}
# 创建 Flask 应用实例
app = Flask(__name__)
# 在每个请求之前运行的代码
@app.before_request
def before():
# 如果请求的路径不是 '/feishu_callback' 并且请求方法不是 'OPTIONS',则返回一个 JSON 响应
if request.path not in ['/feishu_callback'] and request.method != 'OPTIONS':
return jsonify({"code": "正常", "message": "{}".format("输入正确参数")})
# 注册蓝图
app.register_blueprint(feishu_callback)
# 在每个请求之后运行的代码
def after_request(resp):
# 允许跨域请求
resp.headers['Access-Control-Allow-Origin'] = '*'
# 允许携带Content-Type Authorization请求头
resp.headers['Access-Control-Allow-Headers'] = 'Content-Type,Access-Control-Request-Headers,Authorization'
return resp
# 运行 Flask 应用
def run_app(app, config):
# 如果启用了 CORS,则在每个请求之后运行 after_request 函数
if config['cors']:
app.after_request(after_request)
# 运行 Flask 应用
app.run(debug=config['debug'], host=config['host'], port=config['port'], use_reloader=False)
# 如果这个文件是直接运行的,而不是被导入的,那么运行 Flask 应用
if __name__ == '__main__':
run_app(app, basic_settings)
创建
src/app/api/feishu_callback.py
文件,内容如下:from flask import Blueprint, jsonify, request
import hashlib
import base64
from Crypto.Cipher import AES
import json
# 你的加密密钥
encrypt_key = 'your_encrypt_key'
# 创建一个新的蓝图
feishu_callback = Blueprint('feishu_callback', __name__)
# 定义一个路由和处理函数
@feishu_callback.route('/feishu_callback', methods=['POST'])
def webhook_event():
# 获取请求的 JSON 数据
data = request.get_json()
# 定义一个 AES 加密/解密类
class AESCipher(object):
def __init__(self, key):
# AES 的块大小
self.bs = AES.block_size
# 使用 SHA256 哈希函数处理密钥
self.key = hashlib.sha256(AESCipher.str_to_bytes(key)).digest()
# 将字符串转换为字节
@staticmethod
def str_to_bytes(data):
u_type = type(b"".decode('utf8'))
if isinstance(data, u_type):
return data.encode('utf8')
return data
# 移除填充
@staticmethod
def _unpad(s):
return s[:-ord(s[len(s) - 1:])]
# 解密数据
def decrypt(self, enc):
iv = enc[:AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return self._unpad(cipher.decrypt(enc[AES.block_size:]))
# 解密字符串
def decrypt_string(self, enc):
enc = base64.b64decode(enc)
return self.decrypt(enc).decode('utf8')
# 获取加密的数据
encrypt = data['encrypt']
# 创建一个新的 AESCipher 实例
cipher = AESCipher(encrypt_key)
# 解密字符串
decrypted_string = cipher.decrypt_string(encrypt)
# 将解密后的字符串转换为字典
decrypted_dict = json.loads(decrypted_string)
# 如果字典中有 'challenge' 键,并且它的值不为空,那么返回这个值
if 'challenge' in decrypted_dict and decrypted_dict['challenge']:
return jsonify({"challenge": decrypted_dict['challenge']})
此时当你运行的时候,可能会出现类似报错,这个错误可能是由于Flask和Werkzeug库的版本不兼容导致的。
报错信息
root@ser474577053711:/www/teach# python app.py
Traceback (most recent call last):
File "app.py", line 1, in <module>
from flask import Flask, jsonify, request
File "/usr/local/lib/python3.8/dist-packages/flask/__init__.py", line 5, in <module>
from . import json as json
File "/usr/local/lib/python3.8/dist-packages/flask/json/__init__.py", line 6, in <module>
from ..globals import current_app
File "/usr/local/lib/python3.8/dist-packages/flask/globals.py", line 25, in <module>
app_ctx: AppContext = LocalProxy( # type: ignore[assignment]
TypeError: __init__() got an unexpected keyword argument 'unbound_message'
解决办法
pip install --upgrade Flask Werkzeug
添加回调
打开 开发配置 > 事件与回调 中的回调配置
在订阅方式中输入域名/feishu_callback
,点击保存,此时会向你的服务器发送一个加密的json,需要你进行解密
成功之后是这样的
点击右侧的添加回调
,选择卡片交互回传
此时我们点击页面上方出现的创建版本
,来发布我们的应用,此时回调就已经设置好了
获取tenant_access_token
以下内容仅为教学所使,你可以根据自己的情况尽情的修改代码内容(所以我会使用一种简单的方法来进行教程的书写)
修改app.py
,修改内容如下:
from multiprocessing import Process
from src.utils.feishu_refreshtoken import feishu_refresh
if __name__ == '__main__':
# 启动飞书token刷新进程
p1 = Process(target=feishu_refresh, args=('app_id','app_secret')) #此处输入你的app_id 和 app_secret
p1.start()
run_app(app, basic_settings)
p1.join()
创建
src/utils/feishu_refreshtoken.py
import time
import traceback
import requests
import json
# 定义一个函数,用于刷新飞书的 tenant_access_token
def feishu_refreshtoken(app_id,app_secret):
# 飞书的 API 地址
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
# 请求头
headers = {
'Content-Type': 'application/json',
'charset': 'utf-8'
}
# 请求体,包含 app_id 和 app_secret
payload = json.dumps({
"app_id": app_id,
"app_secret": app_secret
})
# 发送 POST 请求
response = requests.request("POST", url, headers=headers, data=payload)
# 从响应中获取 tenant_access_token
token = response.json().get("tenant_access_token")
# 从响应中获取 token 的过期时间
expire = response.json().get("expire")
# 返回 token 和过期时间
return [token,expire]
# 定义一个函数,用于定时刷新 tenant_access_token
def feishu_refresh(app_id,app_secret):
# 无限循环
while True:
try:
# 尝试刷新 tenant_access_token
data = feishu_refreshtoken(app_id, app_secret)
except Exception as e:
# 如果出现异常,打印异常信息
print(f"An error occurred: {e}")
traceback.print_exc()
finally:
# 等待 token 过期后再次刷新
time.sleep(data[1])
此时你就可以获取到
tenant_access_token
了,但是这个token只能在这个线程中使用,我们要是想在其他线程中使用可以使用multiprocessing.Manager
来同步线程之间的变量,也可以使用数据库来同步token,这里我们使用json
文件来持久化数据(相比线程间同步更不容易出错,相比于数据库更简单)。持久化token和message_id
修改src/utils/feishu_refreshtoken.py
文件,内容如下:
import os
# 定义一个函数,用于定时刷新 tenant_access_token 并保存到文件
def feishu_refresh(app_id,app_secret):
while True:
try:
# 尝试刷新 tenant_access_token
data = feishu_refreshtoken(app_id, app_secret)
# 保存 token 到文件
savatoken(data)
except Exception as e:
# 如果出现异常,打印异常信息
print(f"An error occurred: {e}")
traceback.print_exc()
finally:
# 等待 token 过期后再次刷新
time.sleep(data[1])
# 定义一个函数,用于将 token 保存到文件
def savatoken(data):
# 将 token 和过期时间封装成字典
data = {"token": {"tenant_access_token": data[0], "expire": data[1]}}
# 如果文件不存在,创建文件并写入数据
if not os.path.exists('./data.json'):
with open('./data.json', 'w') as f:
json.dump(data, f)
else:
# 如果文件已存在,读取文件中的数据
with open('./data.json', 'r') as f:
file_data = json.load(f)
# 更新文件中的 token 数据
file_data['token'].update(data['token'])
# 将更新后的数据写回文件
with open('./data.json', 'w') as f:
json.dump(file_data, f)
发送消息
发送普通消息
创建send_message.py文件
发送最普通的文本消息也是十分的简单,我们修改app.py
文件,修改内容如下:
from src.app.api.send_message import send_message
@app.before_request
def before():
if request.path not in ['/feishu_callback','/send_message'] and request.method != 'OPTIONS':
return jsonify({"code": "正常", "message": "{}".format("输入正确参数")})
app.register_blueprint(feishu_callback)
app.register_blueprint(send_message)
接下来我们写发送信息的api,此处你可以选择在本地传固定内容,也可以像我一样通过post传参
创建
src/app/api/send_message.py
文件,内容如下:import json
from flask import Blueprint, jsonify, request
import requests
# 创建一个新的蓝图
send_message = Blueprint('send_message', __name__)
# 定义一个路由和处理函数
@send_message.route('/send_message', methods=['POST'])
def sendMessage():
# 从文件中读取数据
with open('data.json', 'r') as f:
data = json.load(f)
# 获取飞书的 tenant_access_token
feishu_token = data['token']['tenant_access_token']
# 获取请求的 JSON 数据
message = request.get_json()
# 如果数据中有 'normal' 字段,处理这个字段的数据
if 'normal' in message:
message = message['normal']
# 发送消息并获取消息 ID
message_id = normalMessage(message,feishu_token)
# 封装结果数据
result = {
"message_id": message_id,
"user": message['user']
}
# 如果数据中没有 'message' 字段,添加这个字段
if 'message' not in data:
data['message'] = []
# 如果数据中已经有这个用户的消息,更新这个消息
if any(message['user'] == result['user'] for message in data['message']):
for message in data['message']:
if message['user'] == result['user']:
message.update(result)
else:
# 如果数据中没有这个用户的消息,添加这个消息
data['message'].append(result)
# 将更新后的数据写回文件
with open('data.json', 'w') as f:
json.dump(data, f)
# 返回响应
return jsonify({"code": "正常", "message": "发送成功"})
# 定义一个函数,用于发送消息
def normalMessage(message,feishu_token):
# 飞书的 API 地址
url = "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id"
# 请求体,包含接收者 ID、消息类型和内容
payload = json.dumps({
"receive_id": "receive_id", #你的chat_id,可选其他的
"msg_type": "text",
"content": "{\"text\":\""+"user:"+message['user']+"\"}",
})
# 请求头,包含 Content-Type 和 Authorization
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer '+ feishu_token
}
# 发送 POST 请求
response = requests.request("POST", url, headers=headers, data=payload)
# 打印响应
print(response.json())
# 从响应中获取消息 ID
message_id = response.json()['data']['message_id']
# 返回消息 ID
return message_id
获取receive_id
那么此时我们的代码部分就已经完成,现在我们需要去飞书进行设置,打开右上角飞书API调试台
在左侧搜索栏
输入消息
,点击发送消息
后在右侧选择查询参数
,在下拉列表选择chat_id
,点击选择群组
,找到你的群组,复制其id,填入代码中的receive_id
中。
我们可以看到下拉列表有许多选项这些选项的意思如下:
receive_id
open_id:标识一个用户在某个应用中的身份。同一个用户在不同应用中的 Open ID 不同。
user_id:标识一个用户在某个租户内的身份。同一个用户在租户 A 和租户 B 内的 User ID 是不同的。在同一个租户内,一个用户的 User ID 在所有应用(包括商店应用)中都保持一致。User ID 主要用于在不同的应用间打通用户数据。
union_id:标识一个用户在某个应用开发商下的身份。同一用户在同一开发商下的应用中的 Union ID 是相同的,在不同开发商下的应用中的 Union ID 是不同的。通过 Union ID,应用开发商可以把同个用户在多个应用中的身份关联起来。
email:以用户的真实邮箱来标识用户。
chat_id:以群ID来标识群聊。
更多内容点我查看
测试发送普通消息
现在我们要开始在postman(网页可以用postWoman)中测试我们的代码是否能正常运行
填入我们的接口api域名/send_message
,选择json类型
,输入如下内容{"user":"1"}
,点击发送就可以看到我们的飞书已经收到了一条消息,并且data.json
中储存了这消息的message_id
发送卡片消息
发布消息模板
在此前面我们已经成功的创建了卡片消息模板
,我们需要获取其中的template_id
打开卡片,选择你的卡片,点击左上角id会自动复制template_id
,点击右上角机器人图标,选择加号,点击指定应用,选择你的应用,点击确认,选择发布
修改send_message.py文件
修改src/app/api/send_message.py
文件,内容如下:
# 检查消息中是否包含 'normal' 字段
if 'normal' in message:
# 如果包含,将 'normal' 字段的值赋给 message
message = message['normal']
# 调用 normalMessage 函数发送消息,并获取消息 ID
message_id = normalMessage(message,feishu_token)
# 检查消息中是否包含 'card_message' 字段
if 'card_message' in message:
# 如果包含,将 'card_message' 字段的值赋给 message
message = message['card_message']
# 调用 cardMessage 函数发送消息,并获取消息 ID
message_id = cardMessage(message,feishu_token)
# 定义一个函数,用于发送卡片消息
def cardMessage(message,feishu_token):
# 飞书的 API 地址
url = "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id"
# 请求体,包含接收者 ID、消息类型和内容
payload = json.dumps({
"receive_id": "receive_id", #你的chat_id,可选其他的
"msg_type": "interactive",
"content": "{\"type\":\"template\",\"data\":{\"template_id\":\"template_id\",\"template_variable\":{\"test_text\":\""+message['test_text']+"\",\"test_img\":\""+message['test_img']+"\"}}}" #template_id是你的模板id
})
# 请求头,包含 Content-Type 和 Authorization
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer '+ feishu_token
}
# 发送 POST 请求
response = requests.request("POST", url, headers=headers, data=payload)
# 从响应中获取消息 ID
message_id = response.json()['data']['message_id']
# 返回消息 ID
return message_id
测试发送卡片消息
现在我们可以在postman(网页可以用postWoman)中测试我们的代码是否能正常运行
填入我们的接口api域名/send_message
,选择json类型
,输入如下内容{"card_message":{"test_text":"测试消息","test_img":"img_v2_9dd98485-2900-4d65-ada9-e31d1408dcfg","user":"1"}}
,点击发送就可以看到我们的飞书已经收到了一条消息,并且data.json
中储存了这消息的message_id
上传图片
我们可以看到我们上传的图片是一个img_v2_9dd98485-2900-4d65-ada9-e31d1408dcfg
的东西,那么我们需要使用飞书的api来上传图片,文档如下,有需要的可以自己阅读
修改
src/app/api/send_message.py
文件,内容如下:from src.utils.upload_feishu_image import upload_feishu_image
if 'card_message' in message:
message = message['card_message']
message['test_img'] = upload_feishu_image(feishu_token,message)
message_id = cardMessage(message,feishu_token)
创建
src/utils/upload_feishu_image.py
文件,内容如下:import requests
from requests_toolbelt import MultipartEncoder
# 定义一个函数,用于上传飞书图片
def upload_feishu_image(feishu_token,data):
# 检查 data 中的 'test_img' 字段是否有值
if data['test_img'] != "NONE" and data['test_img'] != "" and data['test_img'] is not None:
# 如果有值,下载图片
image_content = download_image(data['test_img'])
# 如果图片下载成功,上传图片
if image_content is not None:
img = feish_uploadImage(image_content, feishu_token)
else:
# 如果图片下载失败,使用默认的图片
img = "img_v2_9dd98485-2900-4d65-ada9-e31d1408dcfg"
# 返回图片的 key
return img
# 定义一个函数,用于下载图片
def download_image(image_url):
# 发送 HEAD 请求,检查图片是否存在
response = requests.head(image_url)
# 如果图片存在,下载图片
if response.status_code == 200:
response = requests.get(image_url)
response.raise_for_status()
return response.content
# 如果图片不存在,返回 None
elif response.status_code == 404:
return None
# 如果出现其他错误,抛出异常
else:
response.raise_for_status()
# 定义一个函数,用于上传图片到飞书
def feish_uploadImage(image,token):
# 飞书的 API 地址
url = "https://open.feishu.cn/open-apis/im/v1/images"
# 请求体,包含图片类型和图片内容
form = {'image_type': 'message',
'image': image}
# 使用 MultipartEncoder 封装请求体
multi_form = MultipartEncoder(form)
# 请求头,包含 Authorization 和 Content-Type
headers = {
'Authorization': 'Bearer '+ token,
}
headers['Content-Type'] = multi_form.content_type
# 发送 POST 请求
response = requests.request("POST", url, headers=headers, data=multi_form)
# 打印响应
print(response.json())
# 从响应中获取图片的 key
return response.json()['data']['image_key']
现在我们可以在postman(网页可以用postWoman)中测试我们的代码是否能正常运行
填入我们的接口api
域名/send_message
,选择json类型
,输入如下内容{"card_message":{"test_text":"测试消息","test_img":"https://q1.qlogo.cn/headimg_dl?dst_uin=1842105028&spec=140","user":"1"}}
,点击发送就可以看到我们的飞书已经收到了一条消息,并且是我们自己设置的图片卡片回调
可以看到我们的飞书卡片上的按钮点击后会出现出错了,请稍后重试
,这是因为我们没有正确处理回调导致的,根据上面的教程,我们的回调地址是feishu_callback
,这里我们还要使用这个api来处理回调,我们按钮绑定的事件的参数类型是字符串
,字符串内容是test_botton
接下来我们就使用这些信息来处理回调。
修改src/app/api/feishu_callback.py
文件,内容如下:
# 检查解密后的字典中是否包含 'challenge' 字段,并且该字段的值不为空
if 'challenge' in decrypted_dict and decrypted_dict['challenge']:
# 如果满足条件,返回一个包含 'challenge' 字段的 JSON 对象
return jsonify({"challenge": decrypted_dict['challenge']})
else:
# 否则,从解密后的字典中获取消息 ID
message_id = decrypted_dict['event']['context']['open_message_id']
# 打开数据文件
with open('data.json', 'r') as f:
# 读取 JSON 数据
data = json.load(f)
# 遍历消息列表
for message in data['message']:
# 如果找到了匹配的消息 ID
if message['message_id'] == message_id:
# 获取该消息的用户
item = message['user']
# 结束循环
break
# 检查用户列表的长度
if len(item) > 0:
# 如果用户列表不为空,获取第一个用户
item = item[0]
# 获取动作的值
action = decrypted_dict['event']['action']['value']
# 如果动作的值为 'test_botton'
if action == 'test_botton':
# 返回一个包含成功提示的 JSON 对象
return jsonify({
"toast": {
"type": "success",
"content": "成功"
}
})
else:
# 如果用户列表为空,返回一个包含错误代码和消息的 JSON 对象
return jsonify({"code": "err", "message": "No data to process"})
现在我们可以在postman(网页可以用postWoman)中测试我们的代码是否能正常运行
填入我们的接口api
域名/send_message
,选择json类型
,输入如下内容{"card_message":{"test_text":"测试消息","test_img":"https://q1.qlogo.cn/headimg_dl?dst_uin=1842105028&spec=140","user":"1"}}
,点击发送就可以看到我们的飞书已经收到了一条消息,此时点击按钮会出现成功
二字,当然这个功能你也可以实现其他的东西,比如更新数据库什么的经过这些简单的教程,相信你已经创建出了一个简单的飞书机器人,并且对其开发有了一定的想法和认知。
以下是本篇文章的全部代码,你可以对照着跟着文章一起走(强烈建议自己写一遍),假如文章有哪些错误,欢迎指出。