Skip to main content

企业微信机器人通知组件,支持频率控制、长文本分段等功能

Project description

企业微信通知器 (WeComNotifier)

一个功能完善的企业微信机器人通知组件,专为 Python 项目设计,支持频率控制、长文本自动分段、多webhook并发管理等高级功能。

✨ 特性

  • 🚀 多webhook并发管理 - 每个webhook独立队列和频率控制,互不影响
  • 🌟 Webhook池负载均衡 - 突破单webhook频率限制,3个webhook可达60条/分钟,理论无上限
  • ⏱️ 双层频率控制 - 本地预防(20条/分钟)+ 服务端频控智能重试,确保消息必达
  • 🔐 跨程序频控保护 - 即使webhook被其他程序触发频控,也能自动等待并重试(最多5分钟)
  • ✂️ 长文本自动分段 - 超过4096字节自动分段,支持Markdown语法保护
  • 📝 三种消息格式 - 支持text、markdown_v2、image
  • 🎯 @all功能增强 - 为不支持@all的格式(markdown_v2、image)自动追加text消息
  • 🔄 同步/异步模式 - 灵活选择发送模式
  • 🛡️ 智能重试机制 - 网络错误(指数退避)和频率限制(固定65秒)分别处理
  • 📊 详细日志记录 - 完整的调试和错误日志

📦 安装

从PyPI安装(未来发布后)

pip install wecom-notifier

从源码安装(当前)

# 克隆仓库
git clone https://github.com/yourusername/wecom-notifier.git
cd wecom-notifier

# 安装依赖
pip install -r requirements.txt

# 安装包
pip install -e .

在其他项目中使用

# 方式1:通过相对路径安装(本地开发)
pip install -e /path/to/wecom-notifier

# 方式2:通过requirements.txt
# 在你的项目的requirements.txt中添加:
-e /path/to/wecom-notifier

🚀 快速开始

基础用法

from wecom_notifier import WeComNotifier

# 初始化通知器
notifier = WeComNotifier()

# 发送文本消息
result = notifier.send_text(
    webhook_url="https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR-KEY",
    content="Hello, 企业微信!"
)

# 检查结果
if result.is_success():
    print("发送成功")
else:
    print(f"发送失败: {result.error}")

发送Markdown消息

markdown_content = """# 项目部署通知

## 更新内容
- 新增用户导出功能
- 修复登录超时问题

| 测试项 | 结果 |
|--------|------|
| 单元测试 | ✅ 通过 |
| 集成测试 | ✅ 通过 |
"""

result = notifier.send_markdown(
    webhook_url=WEBHOOK_URL,
    content=markdown_content,
    mention_all=True,  # 自动追加@all
    async_send=False   # 同步等待
)

发送图片

# 通过文件路径
result = notifier.send_image(
    webhook_url=WEBHOOK_URL,
    image_path="report.png",
    mention_all=True
)

# 或通过base64
result = notifier.send_image(
    webhook_url=WEBHOOK_URL,
    image_base64="your-base64-string",
    mention_all=True
)

@特定用户

result = notifier.send_text(
    webhook_url=WEBHOOK_URL,
    content="紧急通知!",
    mentioned_list=["user1", "user2", "@all"],  # @指定用户或所有人
    mentioned_mobile_list=["13800138000"]  # 也可以通过手机号@
)

📖 高级用法

并发发送

notifier = WeComNotifier()

# 异步发送多条消息
results = []
for i in range(10):
    result = notifier.send_text(
        webhook_url=WEBHOOK_URL,
        content=f"消息 {i}",
        async_send=True  # 异步发送,立即返回
    )
    results.append(result)

# 等待所有消息发送完成
for result in results:
    result.wait()  # 阻塞直到完成
    print(f"状态: {'成功' if result.is_success() else '失败'}")

多Webhook管理

方式1:向不同群组发送

notifier = WeComNotifier()

webhooks = {
    "group1": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=KEY1",
    "group2": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=KEY2",
}

# 向不同群组发送消息(自动管理频率限制)
for name, url in webhooks.items():
    notifier.send_text(
        webhook_url=url,
        content=f"发送到 {name}"
    )

方式2:Webhook池 - 突破频率限制(新功能)

notifier = WeComNotifier()

# 在同一个群聊中添加多个机器人,获取多个webhook地址
webhook_pool = [
    "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=KEY1",
    "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=KEY2",
    "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=KEY3"
]

# 传入列表,系统自动负载均衡
result = notifier.send_text(
    webhook_url=webhook_pool,  # 传入列表
    content="批量数据推送..." * 100,
    async_send=False
)

# 查看使用情况
print(f"使用的webhooks: {len(result.used_webhooks)}")
print(f"分段数: {result.segment_count}")

性能提升

  • 单webhook:20条/分钟
  • 3个webhook池:60条/分钟
  • 10个webhook池:200条/分钟
  • 理论无上限(添加更多webhook)

自定义配置

notifier = WeComNotifier(
    max_retries=5,        # 最大重试次数
    retry_delay=3.0,      # 重试延迟(秒)
    log_level="DEBUG"     # 日志级别
)

超长文本处理

# 自动分段发送
long_text = "\n".join([f"第 {i} 行" for i in range(1000)])

result = notifier.send_text(
    webhook_url=WEBHOOK_URL,
    content=long_text,
    async_send=False
)
# 会自动分段,每段不超过4096字节,并添加"续上页"/"未完待续"提示

表格智能分段

# 对于超长Markdown表格,会保留表头分段
table_markdown = """
| 姓名 | 年龄 | 地址 |
|------|------|------|
""" + "\n".join([f"| 用户{i} | {20+i} | 城市{i} |" for i in range(100)])

result = notifier.send_markdown(
    webhook_url=WEBHOOK_URL,
    content=table_markdown
)
# 每个分段都会保留表头,并添加续页提示

🏗️ 架构设计

单Webhook模式

WeComNotifier (主类)
    ↓
WebhookManager (每个webhook一个实例)
    ↓
├── RateLimiter (频率控制:20条/分钟)
├── MessageSegmenter (智能分段)
└── Sender (HTTP发送 + 重试)

Webhook池模式(新架构)

WeComNotifier (主类)
    ↓
WebhookPool (webhook池管理器)
    ↓
├── WebhookResource 1
│   └── RateLimiter (独立频控:20条/分钟)
├── WebhookResource 2
│   └── RateLimiter (独立频控:20条/分钟)
├── WebhookResource 3
│   └── RateLimiter (独立频控:20条/分钟)
├── Scheduler (单线程调度器 - 保证顺序)
│   ├── 智能webhook选择(最空闲优先)
│   ├── 自动容错切换
│   └── 负载均衡
├── MessageSegmenter (智能分段)
└── Sender (HTTP发送 + 重试)

关键设计

  • 全局RateLimiter共享:同一webhook在单/池模式下共享频控
  • 单线程串行处理:严格保证消息顺序
  • 最空闲优先策略:自动选择配额最多的webhook
  • 自动容错恢复:失败webhook进入冷却期,过后自动恢复

核心特性说明

1. 频率控制(双层保护)

本组件采用双层频率控制机制,确保即使webhook被其他程序触发频控,消息也能最终送达:

本地预防性控制

  • 使用滑动窗口算法,默认限制20条/分钟
  • 每个webhook独立队列和限制
  • 自动阻塞等待,避免触发服务端限制

服务端频控智能重试

  • 如果收到企业微信的频控错误(45009),说明webhook可能被其他程序刷爆
  • 自动等待65秒后重试(足够让频控窗口过期)
  • 最多重试5次,总计等待约5分钟
  • 与网络错误重试分开处理(网络错误使用指数退避)

示例场景: 假设你的webhook被另一个监控程序每分钟发送20条消息,已达到频控上限。 当你的程序尝试发送消息时:

  1. 第一条消息会触发服务端频控(45009错误)
  2. 自动等待65秒(让频控窗口过期)
  3. 重试发送,成功
  4. 后续消息通过本地频控器,以20条/分钟的速率顺利发送

核心保证:只要webhook地址有效,消息一定会被送达(最多等待约5分钟)

2. 智能分段

  • 文本: 按行分割,尽量填满每段
  • Markdown:
    • 保护链接、图片、代码块语法
    • 表格分段保留表头
    • 添加"续上页"/"未完待续"提示

3. 消息顺序保证

  • 同一消息的分段连续发送
  • 不同消息按入队顺序处理
  • 多webhook互不影响

4. 错误处理

  • 网络错误:自动重试(指数退避)
  • Webhook无效:立即失败并返回错误
  • 分段失败:立即停止,避免不完整消息

📋 API 参考

WeComNotifier

初始化参数

WeComNotifier(
    max_retries=3,         # HTTP请求最大重试次数
    retry_delay=2.0,       # 重试延迟(秒)
    log_level="INFO",      # 日志级别: DEBUG/INFO/WARNING/ERROR
    logger=None            # 自定义日志记录器
)

send_text()

send_text(
    webhook_url: str,                    # Webhook地址
    content: str,                        # 文本内容
    mentioned_list: List[str] = None,    # @的用户ID列表
    mentioned_mobile_list: List[str] = None,  # @的手机号列表
    async_send: bool = True              # 是否异步发送
) -> SendResult

send_markdown()

send_markdown(
    webhook_url: str,      # Webhook地址
    content: str,          # Markdown内容
    mention_all: bool = False,  # 是否@所有人
    async_send: bool = True     # 是否异步发送
) -> SendResult

send_image()

send_image(
    webhook_url: str,              # Webhook地址
    image_path: str = None,        # 图片文件路径
    image_base64: str = None,      # 图片base64编码(二选一)
    mention_all: bool = False,     # 是否@所有人
    async_send: bool = True        # 是否异步发送
) -> SendResult

SendResult

result.message_id        # 消息ID
result.is_success()      # 是否成功
result.error             # 错误信息(如果失败)
result.wait(timeout)     # 等待发送完成(异步模式)

🔍 常见问题

Q: 如何在多个项目中共享此组件?

A: 有以下几种方式:

  1. 本地开发模式(推荐用于开发):

    pip install -e /path/to/wecom-notifier
    
  2. 发布到PyPI(推荐用于生产):

    # 构建
    python setup.py sdist bdist_wheel
    # 上传
    twine upload dist/*
    # 在其他项目中安装
    pip install wecom-notifier
    
  3. Git子模块

    git submodule add https://github.com/yourusername/wecom-notifier.git
    pip install -e ./wecom-notifier
    

Q: 如何更新其他项目中的此组件?

A:

  • 如果使用 pip install -e:组件代码自动同步
  • 如果从PyPI安装:pip install --upgrade wecom-notifier
  • 如果使用git子模块:git submodule update --remote

Q: 消息发送顺序会乱吗?

A: 不会。同一消息的分段保证连续发送,不会被其他消息插入。

Q: 如果超过20条/分钟会怎样?

A: 本地频率控制器会自动等待,确保不超过20条/分钟的速率。

Q: 如果webhook已经被其他程序刷爆了怎么办?

A: 组件会自动处理:

  1. 检测到服务端频控错误(45009)
  2. 等待65秒让频控窗口过期
  3. 自动重试(最多5次)
  4. 确保消息最终送达

核心设计理念:不管webhook之前是什么状态(即使被其他程序触发频控),只要调用本组件,消息就一定会成功发送(最多等待约5分钟)。

Q: 如何突破单webhook的频率限制?

A: 使用Webhook池功能(v2.0新增)

notifier = WeComNotifier()

# 在同一个群聊中添加多个机器人webhook
webhook_pool = [
    "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=KEY1",
    "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=KEY2",
    "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=KEY3"
]

# 传入列表即可,系统自动负载均衡
notifier.send_text(webhook_url=webhook_pool, content="消息")

性能提升

  • 1个webhook:20条/分钟
  • 3个webhook:60条/分钟
  • 10个webhook:200条/分钟

关键特性

  • ✅ 严格保证消息顺序
  • ✅ 智能负载均衡(最空闲优先)
  • ✅ 自动容错恢复
  • ✅ 完全向后兼容

Q: 可以创建多个 WeComNotifier 实例吗?

A: 技术上可以,但强烈不推荐针对同一个 webhook 创建多个实例。

推荐做法(单例模式)

# 在应用启动时创建一个全局实例
notifier = WeComNotifier()

# 在整个应用中复用这个实例
notifier.send_text(webhook_url, "消息1")
notifier.send_text(webhook_url, "消息2")

不推荐做法

# ❌ 每次都创建新实例
def send_msg():
    notifier = WeComNotifier()  # 会创建新的工作线程
    notifier.send_text(webhook_url, "消息")

原因

  • 每个实例会为每个 webhook 创建独立的工作线程和频控器
  • 多个实例无法协调频率限制,容易触发服务端频控
  • 造成资源浪费和消息顺序混乱

详见 USAGE_GUIDE.md 的最佳实践部分。

Q: 支持哪些Python版本?

A: Python 3.7+

🤝 贡献

欢迎提交Issue和Pull Request!

📄 许可证

MIT License

📞 联系方式

🙏 致谢

感谢企业微信开放平台提供的API文档。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wecom_notifier-0.1.3.tar.gz (35.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

wecom_notifier-0.1.3-py3-none-any.whl (29.4 kB view details)

Uploaded Python 3

File details

Details for the file wecom_notifier-0.1.3.tar.gz.

File metadata

  • Download URL: wecom_notifier-0.1.3.tar.gz
  • Upload date:
  • Size: 35.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.7

File hashes

Hashes for wecom_notifier-0.1.3.tar.gz
Algorithm Hash digest
SHA256 b59c191335b85e38d943f4a03748d6781d635a6a78ae0444d790fe420751ca18
MD5 e4ef98e4cf9ba8e244a063c87c39e1a3
BLAKE2b-256 9a89826f6e806b1bc5abae35551b9cbb76b88968535c140cc07561956a8be543

See more details on using hashes here.

File details

Details for the file wecom_notifier-0.1.3-py3-none-any.whl.

File metadata

  • Download URL: wecom_notifier-0.1.3-py3-none-any.whl
  • Upload date:
  • Size: 29.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.7

File hashes

Hashes for wecom_notifier-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 1e30cc25a6064cc2eecf1f7b0c9a3d6ed28691ac4e8d5789371917268f694e2a
MD5 0859007997c8452a87f7bf91df57057b
BLAKE2b-256 6267a4196868f6912a9321388758e564820a64420f6097febc504255dffff113

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page