基于 asyncio 的轻量级事件总线,支持发布/订阅、正则匹配、背压控制与优雅停机
Project description
InfinityBus — 异步事件总线
强类型、可扩展的异步事件总线——中间件管道 + 高级模板。
✨ 特性
| 类别 | 能力 |
|---|---|
| 类型安全 | Pydantic 负载校验 · pyright strict · 业务代码零 # type: ignore |
| 灵活订阅 | 正则表达式匹配事件名 · 通配符处理器 |
| 中间件管道 | 洋葱模型 · before_publish / on_publish 双钩子 · 5 个内置中间件 |
| 高级模板 | expect 一次性监听 · request RPC 调用 · pipe 双向管道 · register 批量注册 |
| 生产可靠 | 优雅停机 · 背压控制 · 超时保护 · 错误隔离 · 可观测性 |
| 工程纪律 | 90%+ 测试覆盖 · 85%+ docstring 覆盖 · pre-commit 自动门禁 |
和同类项目不同:InfinityBus 是可扩展的(中间件洋葱管道),而非把所有功能硬编码在核心类里。 详见 中间件系统。
📦 安装
推荐使用 uv —— 极速 Python 包管理器,比 pip 快 10–100 倍, 自动管理 venv、锁定依赖、解析冲突。无需单独安装虚拟环境工具。
pip
# 核心(仅发布/订阅、中间件管道)
pip install infinity_bus
# 核心 + 高级模板(expect、request、pipe、SQLite 日志等)
pip install infinity_bus[templates]
uv(推荐)
# 核心
uv add infinity_bus
# 核心 + 高级模板
uv add infinity_bus --extra templates
提示:高级模板(
expect、request、pipe、SQLiteLoggingMiddleware) 需要[templates]extra。若未安装而使用这些功能,会收到ImportError提示。
或从源码安装:
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra dev
🚀 快速开始
基础发布/订阅
import asyncio
from pydantic import BaseModel
from event_bus import (
EventBus, EventDeclaration, EventHandler,
EventRegistry, EventHandlerRegistry,
)
# 1. 定义负载
class MyPayload(BaseModel):
message: str
# 2. 声明事件
class MyEvent(EventDeclaration):
name = "my.event"
payload_type = MyPayload
# 3. 实现处理器
class MyHandler(EventHandler):
def __init__(self):
super().__init__(subscriptions=["my.event"])
async def handle(self, payload, bus_proxy, raw_event):
print(f"Received: {payload.message}")
# 4. 组装并运行
async def main():
reg = EventRegistry()
reg.register(MyEvent)
h_reg = EventHandlerRegistry()
h_reg.register(MyHandler())
async with EventBus(reg, h_reg) as bus:
await bus.proxy("cli").publish("my.event", {"message": "Hello, EventBus!"})
await asyncio.sleep(1) # 等待处理器输出
asyncio.run(main())
请求-响应模式
使用
request/expect/pipe等高级模板需安装:pip install infinity_bus[templates]uv 使用uv add infinity_bus --extra templates
import asyncio
from pydantic import BaseModel
from event_bus import (
EventBus, EventDeclaration, EventHandler,
EventRegistry, EventHandlerRegistry,
)
from event_bus.templates.request import (
request, RequestProtocol, ResponseProtocol,
)
# 1. 定义请求/响应负载
class GetUserRequest(RequestProtocol):
user_id: int
class GetUserResponse(ResponseProtocol):
user_name: str
email: str
# 2. 声明事件
class GetUserRequestEvent(EventDeclaration):
name = "user.get.request"
payload_type = GetUserRequest
class GetUserResponseEvent(EventDeclaration):
name = "user.get.response"
payload_type = GetUserResponse
# 3. 实现服务端处理器
class GetUserHandler(EventHandler):
def __init__(self):
super().__init__(subscriptions=["user.get.request"])
async def handle(self, payload, bus_proxy, raw_event):
if not isinstance(payload, GetUserRequest):
return
resp = GetUserResponse(
session_id=payload.session_id,
request_id=payload.request_id,
success=True,
user_name="Alice",
email="alice@example.com",
)
await bus_proxy.publish("user.get.response", resp)
# 4. 组装并运行
async def main():
reg = EventRegistry()
reg.register(GetUserRequestEvent)
reg.register(GetUserResponseEvent)
h_reg = EventHandlerRegistry()
h_reg.register(GetUserHandler())
async with EventBus(reg, h_reg) as bus:
proxy = bus.proxy("cli")
resp = await request(
bus_proxy=proxy,
req_event="user.get.request",
req_data={"user_id": 123},
resp_event="user.get.response",
timeout=10.0,
)
resp.raise_if_failed()
print(f"User: {resp.user_name} ({resp.email})")
asyncio.run(main())
🧱 架构
| 组件 | 职责 |
|---|---|
| Event | 运行时事件实例,含名称、负载、处理链追踪 |
| EventDeclaration | 事件类型元数据声明(名称 + 可选 Pydantic 负载模型) |
| EventRegistry | 集中管理已注册的事件声明,发布时校验 |
| EventHandler | 处理器基类,实现 handle 方法定义业务逻辑 |
| EventHandlerRegistry | 管理处理器实例,按事件名匹配处理器列表 |
| EventBus | 事件分发中枢:任务队列、并发控制、错误上报、生命周期 |
| Middleware | 中间件基类,洋葱管道:before_publish / on_publish 双钩子 |
| MiddlewareChain | 责任链管理器,按序包裹发布流程 |
| templates | 高级模板:expect 监听、request RPC、pipe 管道、register 批量注册 |
| middlewares | 内置中间件:日志(JSONL+SQLite)、限流、转换、屏蔽、递归防护 |
📚 文档
| 文档 | 内容 |
|---|---|
| 核心总览 | Event / EventDeclaration / EventHandler / EventBus / Middleware 核心概念 |
| 中间件系统 | Middleware 基类、MiddlewareChain 洋葱管道 |
| 高级模板 | expect、request、pipe、register 四大模板总览 |
| 内置中间件 | 日志、限流、转换、屏蔽、递归防护 |
| 工程质量 | 类型安全、测试覆盖、pre-commit 门禁、模块化规范 |
🧪 测试
pip
# 克隆并安装测试依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[test]"
# 运行全部测试
pytest --cov=src -v
# 仅运行模板测试
pytest tests/templates/ -v
uv
# 克隆并同步测试依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra test
# 运行全部测试
uv run pytest --cov=src -v
# 仅运行模板测试
uv run pytest tests/templates/ -v
🧑💻 开发
pip
# 1. 克隆并安装全部开发依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[dev]"
# 2. 安装 pre-commit 门禁
pre-commit install
uv(推荐)
# 1. 克隆并同步全部依赖(自动创建 venv)
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra dev
# 2. 安装 pre-commit 门禁
uv run pre-commit install
开发循环
# lint + 格式化
ruff check src/ && ruff format src/ --check
# 类型检查
pyright src/
# 测试 + 覆盖率
pytest --cov=src -v
# 手动运行全部门禁
pre-commit run --all-files
使用 uv 时,命令前加
uv run即可自动使用项目 venv,例如uv run pytest --cov=src -v。
| 工具 | 用途 |
|---|---|
uv |
极速 Python 包管理器(替代 pip/venv) |
ruff |
Lint + 格式化 |
pyright |
严格类型检查 |
pytest + pytest-cov |
测试 + 覆盖率 |
interrogate |
docstring 覆盖率 |
pre-commit |
提交前自动门禁 |
📄 许可证
此项目属于 InfinitySystem
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file infinity_bus-1.3.6.tar.gz.
File metadata
- Download URL: infinity_bus-1.3.6.tar.gz
- Upload date:
- Size: 29.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8c99f1afc73b6d9b7568312599c29ca31d8330f8d48f34bbafa05e69b9dbc580
|
|
| MD5 |
6f03c92a59a190556bd4da526a5f90e8
|
|
| BLAKE2b-256 |
f8fd96253083f29a71e932a45a0a960779051e58065124250463b14e26610ccc
|
File details
Details for the file infinity_bus-1.3.6-py3-none-any.whl.
File metadata
- Download URL: infinity_bus-1.3.6-py3-none-any.whl
- Upload date:
- Size: 34.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2277887e75faad3cfa1a942d560de92ea0639225ac4244be2c819ef3140c4af9
|
|
| MD5 |
aef00f722742763203d53c41208e7ee3
|
|
| BLAKE2b-256 |
599323173cc9c6f30c09c8eaed8f5503ad25ed556eea002c7864d871290c462e
|