搭建属于自己的飞书机器人

飞书文档指北☞


飞书机器人原理

根据飞书提供的文档和API可知道,想要开发出自己的飞书机器人,需要先创建一个企业应用,然后通过飞书的接口获取到tenant_access_token,这个。token的有效期是两个小时,当刷新时间小于一个半小时的时候,会返回原token,刷新时间大于一个半小时的时候会返回新的token,此时会存在两个同时生效的token,需要自己做好平滑过渡。
假如我们需要设置回调的话,我们需要先在回调配置中设置回调地址,订阅方式选择自己服务器的api地址,这里的api地址必须是公网可以访问的(本地可能需要内网穿透),当我们点击保存的时候,飞书会向此api发送一个加密的encrypt,你需要将其在1s内解密出其中的challenge值并将其返回。
假如你想要实用飞书的卡片消息的话,你需要提前编辑卡片模板并发布,记住其中的模板id和版本。

创建飞书机器人指北

在探索飞书平台的奇妙世界时,我发现开发一个飞书机器人就像拼图一样有趣。以下是我整理的详细步骤,希望能帮助你也成为飞书机器人开发的高手。

创建企业应用

首先,登录飞书开放平台,按照提示创建一个企业应用。这个过程中,你会被要求填写应用名称、描述、图标等信息。完成这些步骤后,你会得到一个应用ID和密钥,这些是你在后续开发中的身份凭证。
打开飞书开发者后台,创建自己的企业自建应用

应用能力里选择添加应用能力,选择按能力添加机器人选项,点击添加

飞书权限管理

开发配置 > 权限管理 页面,为应用添加以下权限。

飞书权限

获取与发送单聊、群组消息
获取用户发给机器人的单聊消息
读取用户发给机器人的单聊消息
接收群聊中@机器人消息事件
获取群组中所有消息
获取用户在群组中@机器人的消息
获取与上传图片或文件资源


你可以直接将以下权限 Keys 粘贴到权限搜索框,点击批量开通权限,最后点击确认并前往创建应用版本
im:message,im:message.p2p_msg,im:message.p2p_msg:readonly,im:message.group_at_msg:readonly,im:message.group_msg,im:message.group_at_msg,im:resource


点击创建版本

添加相关的信息,点击保存,点击申请线上发布

通过自己的飞书来同意版本发布

创建卡片消息模板

打开飞书卡片搭建工具,创建卡片

这里我们创建一个文字模块,一个图片模块,和一个按钮来使用卡片消息的不同的功能示例


创建机器人回调地址

配置加密策略

打开开发配置 > 事件与回调,选择加密策略,点击重置按钮或者编辑按钮来创建Encrypt Key

创建回调

以下操作均有公网Ip域名
安装FlaskCrypto

pip install flask
pip install pycryptodome
pip install Werkzeug
pip install requests
pip install requests_toolbelt

创建app.py,代码如下:
from flask import Flask, jsonify, request
from src.app.api.feishu_callback import feishu_callback

# 基础设置
basic_settings = {
    'debug': True,  # 如果设置为True,那么在应用出错时,会显示详细的错误信息
    'host': '0.0.0.0',  # 应用运行的主机,'0.0.0.0'表示监听所有的公网IP
    'port': 5000,  # 应用运行的端口
    'cors': True,  # 是否启用CORS
}

# 创建 Flask 应用实例
app = Flask(__name__)

# 在每个请求之前运行的代码
@app.before_request
def before():
    # 如果请求的路径不是 '/feishu_callback' 并且请求方法不是 'OPTIONS',则返回一个 JSON 响应
    if request.path not in ['/feishu_callback'] and request.method != 'OPTIONS':
        return jsonify({"code": "正常", "message": "{}".format("输入正确参数")})

# 注册蓝图
app.register_blueprint(feishu_callback)

# 在每个请求之后运行的代码
def after_request(resp):
    # 允许跨域请求
    resp.headers['Access-Control-Allow-Origin'] = '*'
    # 允许携带Content-Type Authorization请求头
    resp.headers['Access-Control-Allow-Headers'] = 'Content-Type,Access-Control-Request-Headers,Authorization'  
    return resp

# 运行 Flask 应用
def run_app(app, config):
    # 如果启用了 CORS,则在每个请求之后运行 after_request 函数
    if config['cors']:
        app.after_request(after_request)
    # 运行 Flask 应用
    app.run(debug=config['debug'], host=config['host'], port=config['port'], use_reloader=False)

# 如果这个文件是直接运行的,而不是被导入的,那么运行 Flask 应用
if __name__ == '__main__':
    run_app(app, basic_settings)

创建src/app/api/feishu_callback.py文件,内容如下:
from flask import Blueprint, jsonify, request
import hashlib
import base64
from Crypto.Cipher import AES
import json

# 你的加密密钥
encrypt_key = 'your_encrypt_key' 

# 创建一个新的蓝图
feishu_callback = Blueprint('feishu_callback', __name__)

# 定义一个路由和处理函数
@feishu_callback.route('/feishu_callback', methods=['POST'])
def webhook_event():
    # 获取请求的 JSON 数据
    data = request.get_json()

    # 定义一个 AES 加密/解密类
    class AESCipher(object):
        def __init__(self, key):
            # AES 的块大小
            self.bs = AES.block_size
            # 使用 SHA256 哈希函数处理密钥
            self.key = hashlib.sha256(AESCipher.str_to_bytes(key)).digest()

        # 将字符串转换为字节
        @staticmethod
        def str_to_bytes(data):
            u_type = type(b"".decode('utf8'))
            if isinstance(data, u_type):
                return data.encode('utf8')
            return data

        # 移除填充
        @staticmethod
        def _unpad(s):
            return s[:-ord(s[len(s) - 1:])]

        # 解密数据
        def decrypt(self, enc):
            iv = enc[:AES.block_size]
            cipher = AES.new(self.key, AES.MODE_CBC, iv)
            return self._unpad(cipher.decrypt(enc[AES.block_size:]))

        # 解密字符串
        def decrypt_string(self, enc):
            enc = base64.b64decode(enc)
            return self.decrypt(enc).decode('utf8')

    # 获取加密的数据
    encrypt = data['encrypt']

    # 创建一个新的 AESCipher 实例
    cipher = AESCipher(encrypt_key)

    # 解密字符串
    decrypted_string = cipher.decrypt_string(encrypt)

    # 将解密后的字符串转换为字典
    decrypted_dict = json.loads(decrypted_string)

    # 如果字典中有 'challenge' 键,并且它的值不为空,那么返回这个值
    if 'challenge' in decrypted_dict and decrypted_dict['challenge']:
        return jsonify({"challenge": decrypted_dict['challenge']})

此时当你运行的时候,可能会出现类似报错,这个错误可能是由于Flask和Werkzeug库的版本不兼容导致的。

报错信息
root@ser474577053711:/www/teach# python app.py 
Traceback (most recent call last):
  File "app.py", line 1, in <module>
    from flask import Flask, jsonify, request
  File "/usr/local/lib/python3.8/dist-packages/flask/__init__.py", line 5, in <module>
    from . import json as json
  File "/usr/local/lib/python3.8/dist-packages/flask/json/__init__.py", line 6, in <module>
    from ..globals import current_app
  File "/usr/local/lib/python3.8/dist-packages/flask/globals.py", line 25, in <module>
    app_ctx: AppContext = LocalProxy(  # type: ignore[assignment]
TypeError: __init__() got an unexpected keyword argument 'unbound_message'

解决办法

pip install --upgrade Flask Werkzeug

添加回调

打开 开发配置 > 事件与回调 中的回调配置
在订阅方式中输入域名/feishu_callback,点击保存,此时会向你的服务器发送一个加密的json,需要你进行解密
成功之后是这样的

点击右侧的添加回调,选择卡片交互回传

此时我们点击页面上方出现的创建版本,来发布我们的应用,此时回调就已经设置好了

获取tenant_access_token

以下内容仅为教学所使,你可以根据自己的情况尽情的修改代码内容(所以我会使用一种简单的方法来进行教程的书写)
修改app.py,修改内容如下:

from multiprocessing import Process
from src.utils.feishu_refreshtoken import feishu_refresh

if __name__ == '__main__':

    # 启动飞书token刷新进程
    p1 = Process(target=feishu_refresh, args=('app_id','app_secret')) #此处输入你的app_id 和 app_secret
    p1.start()

    run_app(app, basic_settings)

    p1.join()

创建src/utils/feishu_refreshtoken.py
import time
import traceback
import requests
import json

# 定义一个函数,用于刷新飞书的 tenant_access_token
def feishu_refreshtoken(app_id,app_secret):
    # 飞书的 API 地址
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    # 请求头
    headers = {
        'Content-Type': 'application/json',
        'charset': 'utf-8'
    }
    # 请求体,包含 app_id 和 app_secret
    payload = json.dumps({
        "app_id": app_id,
        "app_secret": app_secret
    })
    # 发送 POST 请求
    response = requests.request("POST", url, headers=headers, data=payload)
    # 从响应中获取 tenant_access_token
    token = response.json().get("tenant_access_token")
    # 从响应中获取 token 的过期时间
    expire = response.json().get("expire")
    # 返回 token 和过期时间
    return [token,expire]

# 定义一个函数,用于定时刷新 tenant_access_token
def feishu_refresh(app_id,app_secret):
    # 无限循环
    while True:
        try:
            # 尝试刷新 tenant_access_token
            data = feishu_refreshtoken(app_id, app_secret)
        except Exception as e:
            # 如果出现异常,打印异常信息
            print(f"An error occurred: {e}")
            traceback.print_exc()
        finally:
            # 等待 token 过期后再次刷新
            time.sleep(data[1])

此时你就可以获取到tenant_access_token了,但是这个token只能在这个线程中使用,我们要是想在其他线程中使用可以使用multiprocessing.Manager来同步线程之间的变量,也可以使用数据库来同步token,这里我们使用json文件来持久化数据(相比线程间同步更不容易出错,相比于数据库更简单)。

持久化token和message_id

修改src/utils/feishu_refreshtoken.py文件,内容如下:

import os

# 定义一个函数,用于定时刷新 tenant_access_token 并保存到文件
def feishu_refresh(app_id,app_secret):
    while True:
        try:
            # 尝试刷新 tenant_access_token
            data = feishu_refreshtoken(app_id, app_secret)
            # 保存 token 到文件
            savatoken(data)
        except Exception as e:
            # 如果出现异常,打印异常信息
            print(f"An error occurred: {e}")
            traceback.print_exc()
        finally:
            # 等待 token 过期后再次刷新
            time.sleep(data[1])

# 定义一个函数,用于将 token 保存到文件
def savatoken(data):
    # 将 token 和过期时间封装成字典
    data = {"token": {"tenant_access_token": data[0], "expire": data[1]}}
    # 如果文件不存在,创建文件并写入数据
    if not os.path.exists('./data.json'):
        with open('./data.json', 'w') as f:
            json.dump(data, f)
    else:
        # 如果文件已存在,读取文件中的数据
        with open('./data.json', 'r') as f:
            file_data = json.load(f)
            # 更新文件中的 token 数据
            file_data['token'].update(data['token'])
        # 将更新后的数据写回文件
        with open('./data.json', 'w') as f:
            json.dump(file_data, f)

发送消息

发送普通消息

创建send_message.py文件

发送最普通的文本消息也是十分的简单,我们修改app.py文件,修改内容如下:

from src.app.api.send_message import send_message

@app.before_request
def before():
    if request.path not in ['/feishu_callback','/send_message'] and request.method != 'OPTIONS':
        return jsonify({"code": "正常", "message": "{}".format("输入正确参数")})

app.register_blueprint(feishu_callback)
app.register_blueprint(send_message)

接下来我们写发送信息的api,此处你可以选择在本地传固定内容,也可以像我一样通过post传参
创建src/app/api/send_message.py文件,内容如下:
import json
from flask import Blueprint, jsonify, request
import requests

# 创建一个新的蓝图
send_message = Blueprint('send_message', __name__)

# 定义一个路由和处理函数
@send_message.route('/send_message', methods=['POST'])
def sendMessage():
    # 从文件中读取数据
    with open('data.json', 'r') as f:
        data = json.load(f)
    # 获取飞书的 tenant_access_token
    feishu_token = data['token']['tenant_access_token']

    # 获取请求的 JSON 数据
    message = request.get_json()
    # 如果数据中有 'normal' 字段,处理这个字段的数据
    if 'normal' in message:
        message = message['normal']
        # 发送消息并获取消息 ID
        message_id = normalMessage(message,feishu_token)
    # 封装结果数据
    result = {
        "message_id": message_id,
        "user": message['user']
    }
    # 如果数据中没有 'message' 字段,添加这个字段
    if 'message' not in data:
        data['message'] = []
    # 如果数据中已经有这个用户的消息,更新这个消息
    if any(message['user'] == result['user'] for message in data['message']):
        for message in data['message']:
            if message['user'] == result['user']:
                message.update(result)
    else:
        # 如果数据中没有这个用户的消息,添加这个消息
        data['message'].append(result)
    # 将更新后的数据写回文件
    with open('data.json', 'w') as f:
        json.dump(data, f)
    # 返回响应
    return jsonify({"code": "正常", "message": "发送成功"})

# 定义一个函数,用于发送消息
def normalMessage(message,feishu_token):
    # 飞书的 API 地址
    url = "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id"
    # 请求体,包含接收者 ID、消息类型和内容
    payload = json.dumps({
        "receive_id": "receive_id", #你的chat_id,可选其他的
        "msg_type": "text",
        "content": "{\"text\":\""+"user:"+message['user']+"\"}",
    })
    # 请求头,包含 Content-Type 和 Authorization
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer '+ feishu_token
    }
    # 发送 POST 请求
    response = requests.request("POST", url, headers=headers, data=payload)
    # 打印响应
    print(response.json())
    # 从响应中获取消息 ID
    message_id = response.json()['data']['message_id']
    # 返回消息 ID
    return message_id

获取receive_id

那么此时我们的代码部分就已经完成,现在我们需要去飞书进行设置,打开右上角飞书API调试台
在左侧搜索栏输入消息,点击发送消息后在右侧选择查询参数,在下拉列表选择chat_id,点击选择群组,找到你的群组,复制其id,填入代码中的receive_id中。
我们可以看到下拉列表有许多选项这些选项的意思如下:

receive_id

open_id:标识一个用户在某个应用中的身份。同一个用户在不同应用中的 Open ID 不同。
user_id:标识一个用户在某个租户内的身份。同一个用户在租户 A 和租户 B 内的 User ID 是不同的。在同一个租户内,一个用户的 User ID 在所有应用(包括商店应用)中都保持一致。User ID 主要用于在不同的应用间打通用户数据。
union_id:标识一个用户在某个应用开发商下的身份。同一用户在同一开发商下的应用中的 Union ID 是相同的,在不同开发商下的应用中的 Union ID 是不同的。通过 Union ID,应用开发商可以把同个用户在多个应用中的身份关联起来。
email:以用户的真实邮箱来标识用户。
chat_id:以群ID来标识群聊。
更多内容点我查看


测试发送普通消息

现在我们要开始在postman(网页可以用postWoman)中测试我们的代码是否能正常运行
填入我们的接口api域名/send_message,选择json类型,输入如下内容{"user":"1"},点击发送就可以看到我们的飞书已经收到了一条消息,并且data.json中储存了这消息的message_id

发送卡片消息

发布消息模板

在此前面我们已经成功的创建了卡片消息模板,我们需要获取其中的template_id
打开卡片,选择你的卡片,点击左上角id会自动复制template_id,点击右上角机器人图标,选择加号,点击指定应用,选择你的应用,点击确认,选择发布

修改send_message.py文件

修改src/app/api/send_message.py文件,内容如下:

    # 检查消息中是否包含 'normal' 字段
    if 'normal' in message:
        # 如果包含,将 'normal' 字段的值赋给 message
        message = message['normal']
        # 调用 normalMessage 函数发送消息,并获取消息 ID
        message_id = normalMessage(message,feishu_token)

    # 检查消息中是否包含 'card_message' 字段
    if 'card_message' in message:
        # 如果包含,将 'card_message' 字段的值赋给 message
        message = message['card_message']
        # 调用 cardMessage 函数发送消息,并获取消息 ID
        message_id = cardMessage(message,feishu_token)

    # 定义一个函数,用于发送卡片消息
def cardMessage(message,feishu_token):
    # 飞书的 API 地址
    url = "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id"
    # 请求体,包含接收者 ID、消息类型和内容
    payload = json.dumps({
        "receive_id": "receive_id", #你的chat_id,可选其他的
        "msg_type": "interactive",
        "content": "{\"type\":\"template\",\"data\":{\"template_id\":\"template_id\",\"template_variable\":{\"test_text\":\""+message['test_text']+"\",\"test_img\":\""+message['test_img']+"\"}}}" #template_id是你的模板id
    })
    # 请求头,包含 Content-Type 和 Authorization
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer '+ feishu_token
    }
    # 发送 POST 请求
    response = requests.request("POST", url, headers=headers, data=payload)
    # 从响应中获取消息 ID
    message_id = response.json()['data']['message_id']
    # 返回消息 ID
    return message_id

测试发送卡片消息

现在我们可以在postman(网页可以用postWoman)中测试我们的代码是否能正常运行
填入我们的接口api域名/send_message,选择json类型,输入如下内容{"card_message":{"test_text":"测试消息","test_img":"img_v2_9dd98485-2900-4d65-ada9-e31d1408dcfg","user":"1"}},点击发送就可以看到我们的飞书已经收到了一条消息,并且data.json中储存了这消息的message_id

上传图片

我们可以看到我们上传的图片是一个img_v2_9dd98485-2900-4d65-ada9-e31d1408dcfg的东西,那么我们需要使用飞书的api来上传图片,文档如下,有需要的可以自己阅读


修改src/app/api/send_message.py文件,内容如下:
from src.utils.upload_feishu_image import upload_feishu_image

    if 'card_message' in message:
        message = message['card_message']
        message['test_img'] = upload_feishu_image(feishu_token,message)
        message_id = cardMessage(message,feishu_token)

创建src/utils/upload_feishu_image.py文件,内容如下:
import requests
from requests_toolbelt import MultipartEncoder

# 定义一个函数,用于上传飞书图片
def upload_feishu_image(feishu_token,data):
    # 检查 data 中的 'test_img' 字段是否有值
    if data['test_img'] != "NONE" and data['test_img'] != "" and data['test_img'] is not None:
        # 如果有值,下载图片
        image_content = download_image(data['test_img'])
        # 如果图片下载成功,上传图片
        if image_content is not None:
            img = feish_uploadImage(image_content, feishu_token)
        else:
            # 如果图片下载失败,使用默认的图片
            img = "img_v2_9dd98485-2900-4d65-ada9-e31d1408dcfg"
    # 返回图片的 key
    return img

# 定义一个函数,用于下载图片
def download_image(image_url):
    # 发送 HEAD 请求,检查图片是否存在
    response = requests.head(image_url)

    # 如果图片存在,下载图片
    if response.status_code == 200:
        response = requests.get(image_url)
        response.raise_for_status()
        return response.content

    # 如果图片不存在,返回 None
    elif response.status_code == 404:
        return None

    # 如果出现其他错误,抛出异常
    else:
        response.raise_for_status()

# 定义一个函数,用于上传图片到飞书
def feish_uploadImage(image,token):
    # 飞书的 API 地址
    url = "https://open.feishu.cn/open-apis/im/v1/images"
    # 请求体,包含图片类型和图片内容
    form = {'image_type': 'message',
            'image': image}
    # 使用 MultipartEncoder 封装请求体
    multi_form = MultipartEncoder(form)
    # 请求头,包含 Authorization 和 Content-Type
    headers = {
        'Authorization': 'Bearer '+ token,
    }
    headers['Content-Type'] = multi_form.content_type
    # 发送 POST 请求
    response = requests.request("POST", url, headers=headers, data=multi_form)
    # 打印响应
    print(response.json())
    # 从响应中获取图片的 key
    return response.json()['data']['image_key']

现在我们可以在postman(网页可以用postWoman)中测试我们的代码是否能正常运行
填入我们的接口api域名/send_message,选择json类型,输入如下内容{"card_message":{"test_text":"测试消息","test_img":"https://q1.qlogo.cn/headimg_dl?dst_uin=1842105028&spec=140","user":"1"}},点击发送就可以看到我们的飞书已经收到了一条消息,并且是我们自己设置的图片

卡片回调

可以看到我们的飞书卡片上的按钮点击后会出现出错了,请稍后重试,这是因为我们没有正确处理回调导致的,根据上面的教程,我们的回调地址是feishu_callback,这里我们还要使用这个api来处理回调,我们按钮绑定的事件的参数类型是字符串,字符串内容是test_botton接下来我们就使用这些信息来处理回调。
修改src/app/api/feishu_callback.py文件,内容如下:

# 检查解密后的字典中是否包含 'challenge' 字段,并且该字段的值不为空
if 'challenge' in decrypted_dict and decrypted_dict['challenge']:
    # 如果满足条件,返回一个包含 'challenge' 字段的 JSON 对象
    return jsonify({"challenge": decrypted_dict['challenge']})
else:
    # 否则,从解密后的字典中获取消息 ID
    message_id = decrypted_dict['event']['context']['open_message_id']
    # 打开数据文件
    with open('data.json', 'r') as f:
        # 读取 JSON 数据
        data = json.load(f)
    # 遍历消息列表
    for message in data['message']:
        # 如果找到了匹配的消息 ID
        if message['message_id'] == message_id:
            # 获取该消息的用户
            item = message['user']
            # 结束循环
            break

    # 检查用户列表的长度
    if len(item) > 0:
        # 如果用户列表不为空,获取第一个用户
        item = item[0]
        # 获取动作的值
        action = decrypted_dict['event']['action']['value']
        # 如果动作的值为 'test_botton'
        if action == 'test_botton':
            # 返回一个包含成功提示的 JSON 对象
            return jsonify({
                "toast": {
                    "type": "success",
                    "content": "成功"
                }
            })
    else:
        # 如果用户列表为空,返回一个包含错误代码和消息的 JSON 对象
        return jsonify({"code": "err", "message": "No data to process"})

现在我们可以在postman(网页可以用postWoman)中测试我们的代码是否能正常运行
填入我们的接口api域名/send_message,选择json类型,输入如下内容{"card_message":{"test_text":"测试消息","test_img":"https://q1.qlogo.cn/headimg_dl?dst_uin=1842105028&spec=140","user":"1"}},点击发送就可以看到我们的飞书已经收到了一条消息,此时点击按钮会出现成功二字,当然这个功能你也可以实现其他的东西,比如更新数据库什么的

经过这些简单的教程,相信你已经创建出了一个简单的飞书机器人,并且对其开发有了一定的想法和认知。
以下是本篇文章的全部代码,你可以对照着跟着文章一起走(强烈建议自己写一遍),假如文章有哪些错误,欢迎指出。