Skip to main content

ErisPulse的云湖协议适配模块

Project description

YunhuAdapter 模块文档

简介

YunhuAdapter 是基于 ErisPulse 架构的云湖协议适配器,整合了所有云湖功能模块,提供统一的事件处理和消息操作接口。

使用示例

事件映射关系

官方事件命名 Adapter事件命名
message.receive.normal message
message.receive.instruction command
bot.followed follow
bot.unfollowed unfollow
group.join group_join
group.leave group_leave
button.report.inline button_click
bot.shortcut.menu shortcut_menu

官方事件内容示例

{
    "version": "1.0",
    "header": {
        "eventId": "xxxxx",
        "eventTime": 1647735644000,
        "eventType": "message.receive.instruction"
    },
    "event": {
        "sender": {
            "senderId": "xxxxx",
            "senderType": "user",
            "senderUserLevel": "member",
            "senderNickname": "昵称"
        },
        "chat": {
            "chatId": "xxxxx",
            "chatType": "group"
        },
        "message": {
            "msgId": "xxxxxx",
            "parentId": "xxxx",
            "sendTime": 1647735644000,
            "chatId": "xxxxxxxx",
            "chatType": "group",
            "contentType": "text",
            "content": {
                "text": "早上好"
            },
            "commandId": 98,
            "commandName": "计算器"
        }
    }
}

初始化与事件处理

from ErisPulse import sdk

async def main():
    # 初始化 SDK
    sdk.init()

    # 获取适配器实例
    yunhu = sdk.adapter.Yunhu

    # 注册事件处理器
    @yunhu.on("message")
    async def handle_message(data):
        """处理普通消息事件"""
        sender = data["event"]["sender"]["senderId"]
        message = data["event"]["message"]["content"]["text"]
        print(f"收到消息: {message}")
        await yunhu.Send.To("user", sender).Text(f"已收到消息: {message}")

    @yunhu.on("command")
    async def handle_command(data):
        """处理指令事件"""
        command_info = data["event"]["message"]
        sender_id = data["event"]["sender"]["senderId"]
        command_name = command_info["commandName"]
        
        print(f"收到指令: {command_name}, 参数: {command_args}")
        
        if command_name == "计算器":
            await yunhu.Send.To("user", sender_id).Text(f"计算结果: 114514")
        else:
            await yunhu.Send.To("user", sender_id).Text(f"未知指令: {command_name}")

    @yunhu.on("follow")
    async def handle_follow(data):
        print(f"新关注: {data}")
        user_id = data["event"]["sender"]["senderId"]
        await yunhu.Send.To("user", user_id).Text("感谢关注!")

    # 启动适配器
    await sdk.adapter.startup()

    # 保持程序运行
    await asyncio.Event().wait()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

消息发送示例

# 发送文本消息
await yunhu.Send.To("user", "user123").Text("Hello World!")

# 发送图片(需先读取为 bytes)
with open("image.png", "rb") as f:
    image_data = f.read()
await yunhu.Send.To("user", "user123").Image(image_data)

# 发送视频(需先读取为 bytes)
with open("video.mp4", "rb") as f:
    video_data = f.read()
await yunhu.Send.To("group", "group456").Video(video_data)

# 发送文件(需先读取为 bytes)
with open("file.txt", "rb") as f:
    file_data = f.read()
await yunhu.Send.To("group", "group456").File(file_data)

# 发送富文本 (HTML)
await yunhu.Send.To("group", "group456").Html("<b>加粗</b>消息")

# 发送 Markdown 格式消息
await yunhu.Send.To("user", "user123").Markdown("# 标题\n- 列表项")

# 批量发送消息 (过时的)
# 该方法批量发送文本/富文本消息时, 更推荐的方法是使用: 
#   Send.To('user'/'group', user_ids: list/group_ids: list).Text/Html/Markdown(message, buttons = None, parent_id = None)
await yunhu.Send.To("users", ["user1", "user2"]).Batch("批量通知")

# 编辑已有消息
# 可以在编辑时添加按钮
# Send.To('user'/'group', user_ids: list/group_ids: list).Edit(message_id, message, buttons = None)
await yunhu.Send.To("user", "user123").Edit("msg_abc123", "修改后的内容")

# 撤回消息
await yunhu.Send.To("group", "group456").Recall("msg_abc123")

# 流式消息传输
async def stream_generator():
    for i in range(5):
        yield f"这是第 {i+1} 段内容\n".encode("utf-8")
        await asyncio.sleep(1)

await yunhu.Send.To("user", "user123").Stream("text", stream_generator())

Text/Html/Markdown 的发送支持使用list传入多个id进行批量发送 | 而不再推荐使用 await yunhu.Send.To("users", ["user1", "user2"]).Batch("批量通知")


配置说明

在 env.py 中进行如下配置:

sdk.env.set("YunhuAdapter", {
    "token": "your_bot_token",
    "mode": "server",  # 可选: server 或 polling
    "server": {
        "host": "0.0.0.0",      # Webhook 监听地址
        "port": 25888,          # Webhook 监听端口
        "path": "/yunhu/webhook" # Webhook 路径
    },
    "polling": {
        "url": "https://sse.bot.anran.xyz/sse"  # SSE 轮询地址(polling 模式)
    }
})

公告看板管理

# 发布全局公告
await yunhu.Send.To("user", "user123").Board("global", "重要公告", expire_time=86400)

# 发布群组公告
await yunhu.Send.To("user", "user123").Board("local", "指定用户看板")

# 撤销公告
await yunhu.Send.To("user", "user123").DismissBoard("local" / "global")

新特性

2.6.0

File/Image/Video 已支持流式上传模式
async def generate_file():
    with open('large_file.mp4', 'rb') as f:
        while chunk := f.read(1024*1024):
            yield chunk
            await asyncio.sleep(0.1)

await yunhu.Send.Video(generate_file(), stream=True)

2.7.0

编辑消息支持传入按钮 上传文件时可以传入文件名(包括流式)

2.8.0

添加 ErisPulse 2.0.0 对于OneBot12协议对转的兼容

参数说明

参数 类型 说明
file bytes/AsyncGenerator 文件内容或异步生成器
stream bool 是否使用流式模式(默认False)
parent_id str 父消息ID(可选)

注意事项:

  1. 确保在调用 startup() 前完成所有处理器的注册
  2. 生产环境建议配置服务器反向代理指向 webhook 地址以实现 HTTPS
  3. 二进制内容(图片/视频等)需以 bytes 形式传入
  4. 程序退出时请调用 shutdown() 确保资源释放
  5. 指令事件中的 commandId 是唯一标识符,可用于区分不同的指令
  6. 官方事件数据结构需通过 data["event"] 访问

参考链接

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

erispulse_yunhuadapter-3.0.0.tar.gz (15.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

erispulse_yunhuadapter-3.0.0-py3-none-any.whl (14.3 kB view details)

Uploaded Python 3

File details

Details for the file erispulse_yunhuadapter-3.0.0.tar.gz.

File metadata

  • Download URL: erispulse_yunhuadapter-3.0.0.tar.gz
  • Upload date:
  • Size: 15.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for erispulse_yunhuadapter-3.0.0.tar.gz
Algorithm Hash digest
SHA256 6e0ce2dadb33f2b0de130f19468e0df0a26f04ac3331fce5cc5dcf8152abee31
MD5 bf551bef35218b69de46b55bb2f273e8
BLAKE2b-256 bd971d4bd831b1a98118c95b53dea823618a08f9a00abff2dd63296376e4577f

See more details on using hashes here.

Provenance

The following attestation bundles were made for erispulse_yunhuadapter-3.0.0.tar.gz:

Publisher: python-publish.yml on ErisPulse/ErisPulse-YunhuAdapter

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file erispulse_yunhuadapter-3.0.0-py3-none-any.whl.

File metadata

File hashes

Hashes for erispulse_yunhuadapter-3.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ea774575db33a3149ef234cb3e9e27f6671e281519525a671f951f53ffd75416
MD5 cc03a9e9673af450524375d3ef92692b
BLAKE2b-256 73bc9c28e9ad222c86d7fec53a038beb1325096972327772d5e83ddac3ddbac3

See more details on using hashes here.

Provenance

The following attestation bundles were made for erispulse_yunhuadapter-3.0.0-py3-none-any.whl:

Publisher: python-publish.yml on ErisPulse/ErisPulse-YunhuAdapter

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page