基于 asyncio 的轻量级事件总线,支持发布/订阅、正则匹配、背压控制与优雅停机
Project description
InfinityBus — 异步事件总线
基于 asyncio 的轻量级事件总线,实现发布/订阅模式,用于在异步应用中解耦组件间的通信。
✨ 特性
- 强类型负载校验 — 基于 Pydantic,发布时自动校验数据类型与结构
- 正则表达式订阅 — 支持灵活的事件名匹配规则
- 背压控制 — 队列大小与并发信号量双重限流,防止过载
- 超时保护 — 每个处理器可独立设置超时,避免单任务阻塞总线
- 错误隔离 — 单个处理器异常不影响其他处理器,错误通过内置事件统一上报
- 优雅停机 — 保证停止过程中已入队事件被完整处理,避免数据丢失
- 可观测性 — 提供活跃任务数、队列长度等监控指标
📦 安装
pip install infinity_bus
或从源码安装:
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[test]"
🚀 快速开始
基础发布/订阅
import asyncio
from pydantic import BaseModel
from event_bus import (
EventBus, EventDeclaration, EventHandler,
EventRegistry, EventHandlerRegistry,
)
# 1. 定义负载
class MyPayload(BaseModel):
message: str
# 2. 声明事件
class MyEvent(EventDeclaration):
name = "my.event"
payload_type = MyPayload
# 3. 实现处理器
class MyHandler(EventHandler):
def __init__(self):
super().__init__(subscriptions=["my.event"])
async def handle(self, payload, bus_proxy, raw_event):
print(f"Received: {payload.message}")
# 4. 组装并运行
async def main():
reg = EventRegistry()
reg.register(MyEvent)
h_reg = EventHandlerRegistry()
h_reg.register(MyHandler())
async with EventBus(reg, h_reg) as bus:
await bus.proxy("cli").publish("my.event", {"message": "Hello, EventBus!"})
await asyncio.sleep(1) # 等待处理器输出
asyncio.run(main())
请求-响应模式
from event_bus.templates.request import request, RequestProtocol, ResponseProtocol
# 定义请求/响应负载与事件(详见文档)
# ...
resp = await request(
bus_proxy=proxy,
req_event="user.get.request",
req_data={"user_id": 123},
resp_event="user.get.response",
timeout=10.0,
)
resp.raise_if_failed()
🧱 架构
| 组件 | 职责 |
|---|---|
| Event | 运行时事件实例,含名称、负载、处理链追踪 |
| EventDeclaration | 事件类型元数据声明(名称 + 可选 Pydantic 负载模型) |
| EventRegistry | 集中管理已注册的事件声明,发布时校验 |
| EventHandler | 处理器基类,实现 handle 方法定义业务逻辑 |
| EventHandlerRegistry | 管理处理器实例,按事件名匹配处理器列表 |
| EventBus | 事件分发中枢:任务队列、并发控制、错误上报、生命周期 |
📚 文档
🧪 测试
# 运行全部测试
pytest --cov=src -v
# 仅运行核心测试
pytest tests/event_bus_test.py -v
# 仅运行模板测试
pytest tests/templates/ -v
📄 许可证
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
infinity_bus-1.3.2.tar.gz
(24.7 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file infinity_bus-1.3.2.tar.gz.
File metadata
- Download URL: infinity_bus-1.3.2.tar.gz
- Upload date:
- Size: 24.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cbf5338553d5b31e7caf65795a17dcef617c046f53b13eabbe6eec477dcffe75
|
|
| MD5 |
485473bc132e52216e8f7d0caa660353
|
|
| BLAKE2b-256 |
367539799169e72b534d6da6c2925fdcaeffbd1b25581c7442b9fd52937b4c45
|
File details
Details for the file infinity_bus-1.3.2-py3-none-any.whl.
File metadata
- Download URL: infinity_bus-1.3.2-py3-none-any.whl
- Upload date:
- Size: 27.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
76d5b25c904d27aa288cde4d3c233adfbbb072cbf5224bae53bcab92d3b5787d
|
|
| MD5 |
83aa65424e6d6cf6266c73c2de577c79
|
|
| BLAKE2b-256 |
1adcee5d792fbcba328f3ffaf9a2856bdcc3e28fbef99b352bab90cda3afc0b4
|