基于 asyncio 的轻量级事件总线,支持发布/订阅、正则匹配、背压控制与优雅停机
Project description
InfinityBus — 异步事件总线
强类型、可扩展的异步事件总线——中间件管道 + 高级模板。
📑 目录
✨ 特性
| 类别 | 能力 |
|---|---|
| 类型安全 | Pydantic 负载校验 · pyright strict · 业务代码零 # type: ignore |
| 灵活订阅 | 正则表达式匹配事件名 · 通配符处理器 |
| 中间件管道 | 洋葱模型 · before_publish / on_publish 双钩子 · 5 个内置中间件 |
| 高级模板 | expect 一次性监听 · request RPC 调用 · pipe 双向管道 · register 批量注册 |
| 生产可靠 | 优雅停机 · 背压控制 · 超时保护 · 错误隔离 · 可观测性 |
| 工程纪律 | 90%+ 测试覆盖 · 85%+ docstring 覆盖 · pre-commit 自动门禁 |
和同类项目不同:InfinityBus 是可扩展的(中间件洋葱管道),而非把所有功能硬编码在核心类里。 详见 中间件系统。
🔍 同类对比
| 特性 | InfinityBus | bubus | pyee | PyPubSub |
|---|---|---|---|---|
| 异步原生 | ✅ asyncio | ✅ asyncio / anyio | ✅ asyncio / trio | ❌ 纯同步 |
| 类型安全 | ✅ pyright strict · 业务零 type: ignore |
⚠️ pyright strict · 业务 ~30 处遮蔽 | ✅ mypy + pyright | ❌ 无类型注解 |
| 负载校验 | ✅ Pydantic 自动校验 | ✅ Pydantic 自动校验 | ❌ 无 | ❌ 无(任意对象) |
| 订阅方式 | 正则表达式 | 类名 + 通配符 * |
字符串精确匹配 | 主题层级(a.b.c) |
| 中间件管道 | ✅ 洋葱模型双钩子 | ❌ | ❌ | ❌ |
| 高级模板 | ✅ expect/request/pipe/register | ⚠️ 仅 bus.expect() |
❌ | ❌ |
| 事件返回值 | ⚠️ 通过 request 模板 |
✅ 内置 event.event_result() |
❌ | ❌ |
| 事件转发 | ❌ | ✅ 内置 event.forward_to() |
❌ | ❌ |
| 依赖数量 | 1 核心(pydantic) | 6(anyio, aiofiles...) | 0 | 0 |
| 优雅停机 | ✅ 排空队列 + 等待活跃任务 | ✅ wait_until_idle + 队列关闭 | ⚠️ wait_for_complete + cancel |
N/A(同步) |
| 超时保护 | ✅ 每 handler 独立超时 | ✅ event_result.timeout | ❌ | N/A |
| 错误隔离 | ✅ TaskErrorEvent 统一上报 | ✅ 错误记录不中断总线 | ❌ | N/A |
| FIFO 处理 | ❌ | ✅ 全局锁保证 | ❌ | N/A |
| 慢 handler 告警 | ❌ | ✅ 15s 超时告警 | ❌ | N/A |
| 防递归 | ✅ 中间件(可配置) | ✅ 内置(不可配置) | ❌ | N/A |
| 日志与审计 | ✅ JSONL + SQLite(中间件) | ✅ 内置 WAL 日志 | ❌ 无内置 | ⚠️ 调试追踪 |
| 测试覆盖 | ✅ 94%(147 tests) | ✅ 83%(138 tests) | ✅ 94%(43 tests) | ✅ 86%(167 tests) |
| Python 版本 | 3.12+ | 3.11+ | 3.12+ | 3.7–3.14 |
| 基准版本 | v1.3.6 0b7f50f |
v1.5.6 7c09342 |
v13.0.1 5157de2 |
v4.0.7 4ec2c47 |
选择 InfinityBus 的理由:你需要通过中间件扩展功能而非修改核心代码;你无法容忍生产代码中出现
# type: ignore;你想用最少的依赖获得完整的生产级特性(背压、优雅停机、正则订阅)。选择 bubus 的理由:你需要事件自带返回值、事件转发这类开箱即用的功能;你需要支持 Python 3.11;你偏好
bus.on(EventClass, fn)这种类驱动的 API 风格。选择 pyee 的理由:你熟悉 Node.js EventEmitter 风格;你需要同时支持 asyncio 和 trio;你偏好极简 API(
ee.on('event', fn));v13 起类型系统大幅增强,同时通过 mypy 和 pyright 检查。选择 PyPubSub 的理由:你不使用 asyncio;你需要极宽的 Python 版本兼容(3.7–3.14);你偏好传统的主题层级字符串匹配。
完整 commit: · InfinityBus
0b7f50f· bubus7c09342· pyee5157de2(官方 CI 无覆盖率,此处为手动pytest-cov测量) · PyPubSub4ec2c47
📦 安装
推荐使用 uv —— 极速 Python 包管理器,比 pip 快 10–100 倍, 自动管理 venv、锁定依赖、解析冲突。无需单独安装虚拟环境工具。
pip
# 核心(仅发布/订阅、中间件管道)
pip install infinity_bus
# 核心 + 高级模板(expect、request、pipe、SQLite 日志等)
pip install infinity_bus[templates]
uv(推荐)
# 核心
uv add infinity_bus
# 核心 + 高级模板
uv add infinity_bus --extra templates
提示:高级模板(
expect、request、pipe、SQLiteLoggingMiddleware) 需要[templates]extra。若未安装而使用这些功能,会收到ImportError提示。
或从源码安装:
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra dev
🚀 快速开始
基础发布/订阅
import asyncio
from pydantic import BaseModel
from event_bus import (
EventBus, EventDeclaration, EventHandler,
EventRegistry, EventHandlerRegistry,
)
# 1. 定义负载
class MyPayload(BaseModel):
message: str
# 2. 声明事件
class MyEvent(EventDeclaration):
name = "my.event"
payload_type = MyPayload
# 3. 实现处理器
class MyHandler(EventHandler):
def __init__(self):
super().__init__(subscriptions=["my.event"])
async def handle(self, payload, bus_proxy, raw_event):
print(f"Received: {payload.message}")
# 4. 组装并运行
async def main():
reg = EventRegistry()
reg.register(MyEvent)
h_reg = EventHandlerRegistry()
h_reg.register(MyHandler())
async with EventBus(reg, h_reg) as bus:
await bus.proxy("cli").publish("my.event", {"message": "Hello, EventBus!"})
await asyncio.sleep(1) # 等待处理器输出
asyncio.run(main())
请求-响应模式
使用
request/expect/pipe等高级模板需安装:pip install infinity_bus[templates]uv 使用uv add infinity_bus --extra templates
import asyncio
from pydantic import BaseModel
from event_bus import (
EventBus, EventDeclaration, EventHandler,
EventRegistry, EventHandlerRegistry,
)
from event_bus.templates.request import (
request, RequestProtocol, ResponseProtocol,
)
# 1. 定义请求/响应负载
class GetUserRequest(RequestProtocol):
user_id: int
class GetUserResponse(ResponseProtocol):
user_name: str
email: str
# 2. 声明事件
class GetUserRequestEvent(EventDeclaration):
name = "user.get.request"
payload_type = GetUserRequest
class GetUserResponseEvent(EventDeclaration):
name = "user.get.response"
payload_type = GetUserResponse
# 3. 实现服务端处理器
class GetUserHandler(EventHandler):
def __init__(self):
super().__init__(subscriptions=["user.get.request"])
async def handle(self, payload, bus_proxy, raw_event):
if not isinstance(payload, GetUserRequest):
return
resp = GetUserResponse(
session_id=payload.session_id,
request_id=payload.request_id,
success=True,
user_name="Alice",
email="alice@example.com",
)
await bus_proxy.publish("user.get.response", resp)
# 4. 组装并运行
async def main():
reg = EventRegistry()
reg.register(GetUserRequestEvent)
reg.register(GetUserResponseEvent)
h_reg = EventHandlerRegistry()
h_reg.register(GetUserHandler())
async with EventBus(reg, h_reg) as bus:
proxy = bus.proxy("cli")
resp = await request(
bus_proxy=proxy,
req_event="user.get.request",
req_data={"user_id": 123},
resp_event="user.get.response",
timeout=10.0,
)
resp.raise_if_failed()
print(f"User: {resp.user_name} ({resp.email})")
asyncio.run(main())
🧱 架构
| 组件 | 职责 |
|---|---|
| Event | 运行时事件实例,含名称、负载、处理链追踪 |
| EventDeclaration | 事件类型元数据声明(名称 + 可选 Pydantic 负载模型) |
| EventRegistry | 集中管理已注册的事件声明,发布时校验 |
| EventHandler | 处理器基类,实现 handle 方法定义业务逻辑 |
| EventHandlerRegistry | 管理处理器实例,按事件名匹配处理器列表 |
| EventBus | 事件分发中枢:任务队列、并发控制、错误上报、生命周期 |
| Middleware | 中间件基类,洋葱管道:before_publish / on_publish 双钩子 |
| MiddlewareChain | 责任链管理器,按序包裹发布流程 |
| templates | 高级模板:expect 监听、request RPC、pipe 管道、register 批量注册 |
| middlewares | 内置中间件:日志(JSONL+SQLite)、限流、转换、屏蔽、递归防护 |
📚 文档
| 文档 | 内容 |
|---|---|
| 核心总览 | Event / EventDeclaration / EventHandler / EventBus / Middleware 核心概念 |
| 中间件系统 | Middleware 基类、MiddlewareChain 洋葱管道 |
| 高级模板 | expect、request、pipe、register 四大模板总览 |
| 内置中间件 | 日志、限流、转换、屏蔽、递归防护 |
| 工程质量 | 类型安全、测试覆盖、pre-commit 门禁、模块化规范 |
🧪 测试
pip
# 克隆并安装测试依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[test]"
# 运行全部测试
pytest --cov=src -v
# 仅运行模板测试
pytest tests/templates/ -v
uv
# 克隆并同步测试依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra test
# 运行全部测试
uv run pytest --cov=src -v
# 仅运行模板测试
uv run pytest tests/templates/ -v
🧑💻 开发
pip
# 1. 克隆并安装全部开发依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[dev]"
# 2. 安装 pre-commit 门禁
pre-commit install
uv(推荐)
# 1. 克隆并同步全部依赖(自动创建 venv)
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra dev
# 2. 安装 pre-commit 门禁
uv run pre-commit install
开发循环
# lint + 格式化
ruff check src/ && ruff format src/ --check
# 类型检查
pyright src/
# 测试 + 覆盖率
pytest --cov=src -v
# 手动运行全部门禁
pre-commit run --all-files
使用 uv 时,命令前加
uv run即可自动使用项目 venv,例如uv run pytest --cov=src -v。
| 工具 | 用途 |
|---|---|
uv |
极速 Python 包管理器(替代 pip/venv) |
ruff |
Lint + 格式化 |
pyright |
严格类型检查 |
pytest + pytest-cov |
测试 + 覆盖率 |
interrogate |
docstring 覆盖率 |
pre-commit |
提交前自动门禁 |
📄 许可证
此项目属于 InfinitySystem
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file infinity_bus-1.4.0.tar.gz.
File metadata
- Download URL: infinity_bus-1.4.0.tar.gz
- Upload date:
- Size: 32.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7ba533b4db01159a1a6c1efa2ae2b79649334c6b3eedffd09cb4b9ca6a9ec864
|
|
| MD5 |
47bfb812bd51140d01df1ad83d34815c
|
|
| BLAKE2b-256 |
17b037e2054848b53114153be6ad3fa71db086b41c4a9acd1c0e2127df06d1b4
|
File details
Details for the file infinity_bus-1.4.0-py3-none-any.whl.
File metadata
- Download URL: infinity_bus-1.4.0-py3-none-any.whl
- Upload date:
- Size: 35.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b260f15c264c01f2e553ac8e129d4b7760d29ddad52e2f4f3d7f7e74c411f0c2
|
|
| MD5 |
2bb8a280a07e94d87e0ff49295f995aa
|
|
| BLAKE2b-256 |
a9dfc092880fd7c6c0b82562b0f82a5ebd078d0bb1732797e7485139ba182398
|