Skip to main content

基于 asyncio 的轻量级事件总线,支持发布/订阅、正则匹配、背压控制与优雅停机

Project description

InfinityBus — 异步事件总线

Test Coverage Pyright License PyPI Version Supported Python

强类型、可扩展的异步事件总线——中间件管道 + 高级模板。

📑 目录

✨ 特性

类别 能力
类型安全 Pydantic 负载校验 · pyright strict · 业务代码零 # type: ignore
灵活订阅 正则表达式匹配事件名 · 通配符处理器
中间件管道 洋葱模型 · before_publish / on_publish 双钩子 · 5 个内置中间件
高级模板 expect 一次性监听 · request RPC 调用 · pipe 双向管道 · register 批量注册
生产可靠 优雅停机 · 背压控制 · 超时保护 · 错误隔离 · 可观测性
工程纪律 90%+ 测试覆盖 · 85%+ docstring 覆盖 · pre-commit 自动门禁

和同类项目不同:InfinityBus 是可扩展的(中间件洋葱管道),而非把所有功能硬编码在核心类里。 详见 中间件系统

🔍 同类对比

特性 InfinityBus bubus pyee PyPubSub
异步原生 ✅ asyncio ✅ asyncio / anyio ✅ asyncio / trio ❌ 纯同步
类型安全 ✅ pyright strict · 业务零 type: ignore ⚠️ pyright strict · 业务 ~30 处遮蔽 ✅ mypy + pyright ❌ 无类型注解
负载校验 ✅ Pydantic 自动校验 ✅ Pydantic 自动校验 ❌ 无 ❌ 无(任意对象)
订阅方式 正则表达式 类名 + 通配符 * 字符串精确匹配 主题层级(a.b.c
中间件管道 ✅ 洋葱模型双钩子
高级模板 ✅ expect/request/pipe/register ⚠️ 仅 bus.expect()
事件返回值 ⚠️ 通过 request 模板 ✅ 内置 event.event_result()
事件转发 ✅ 内置 event.forward_to()
依赖数量 1 核心(pydantic) 6(anyio, aiofiles...) 0 0
优雅停机 ✅ 排空队列 + 等待活跃任务 ✅ wait_until_idle + 队列关闭 ⚠️ wait_for_complete + cancel N/A(同步)
超时保护 ✅ 每 handler 独立超时 ✅ event_result.timeout N/A
错误隔离 ✅ TaskErrorEvent 统一上报 ✅ 错误记录不中断总线 N/A
FIFO 处理 ✅ 全局锁保证 N/A
慢 handler 告警 ✅ 15s 超时告警 N/A
防递归 ✅ 中间件(可配置) ✅ 内置(不可配置) N/A
日志与审计 ✅ JSONL + SQLite(中间件) ✅ 内置 WAL 日志 ❌ 无内置 ⚠️ 调试追踪
测试覆盖 ✅ 94%(147 tests) ✅ 83%(138 tests) ✅ 94%(43 tests) ✅ 86%(167 tests)
Python 版本 3.12+ 3.11+ 3.12+ 3.7–3.14
基准版本 v1.3.6 0b7f50f v1.5.6 7c09342 v13.0.1 5157de2 v4.0.7 4ec2c47

选择 InfinityBus 的理由:你需要通过中间件扩展功能而非修改核心代码;你无法容忍生产代码中出现 # type: ignore;你想用最少的依赖获得完整的生产级特性(背压、优雅停机、正则订阅)。

选择 bubus 的理由:你需要事件自带返回值、事件转发这类开箱即用的功能;你需要支持 Python 3.11;你偏好 bus.on(EventClass, fn) 这种类驱动的 API 风格。

选择 pyee 的理由:你熟悉 Node.js EventEmitter 风格;你需要同时支持 asyncio 和 trio;你偏好极简 API(ee.on('event', fn));v13 起类型系统大幅增强,同时通过 mypy 和 pyright 检查。

选择 PyPubSub 的理由:你不使用 asyncio;你需要极宽的 Python 版本兼容(3.7–3.14);你偏好传统的主题层级字符串匹配。

完整 commit: · InfinityBus 0b7f50f · bubus 7c09342 · pyee 5157de2(官方 CI 无覆盖率,此处为手动 pytest-cov 测量) · PyPubSub 4ec2c47

📦 安装

推荐使用 uv —— 极速 Python 包管理器,比 pip 快 10–100 倍, 自动管理 venv、锁定依赖、解析冲突。无需单独安装虚拟环境工具。

pip

# 核心(仅发布/订阅、中间件管道)
pip install infinity_bus

# 核心 + 高级模板(expect、request、pipe、SQLite 日志等)
pip install infinity_bus[templates]

uv(推荐)

# 核心
uv add infinity_bus

# 核心 + 高级模板
uv add infinity_bus --extra templates

提示:高级模板(expectrequestpipeSQLiteLoggingMiddleware) 需要 [templates] extra。若未安装而使用这些功能,会收到 ImportError 提示。

或从源码安装:

git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra dev

🚀 快速开始

基础发布/订阅

import asyncio
from pydantic import BaseModel
from event_bus import (
    EventBus, EventDeclaration, EventHandler,
    EventRegistry, EventHandlerRegistry,
)

# 1. 定义负载
class MyPayload(BaseModel):
    message: str

# 2. 声明事件
class MyEvent(EventDeclaration):
    name = "my.event"
    payload_type = MyPayload

# 3. 实现处理器
class MyHandler(EventHandler):
    def __init__(self):
        super().__init__(subscriptions=["my.event"])

    async def handle(self, payload, bus_proxy, raw_event):
        print(f"Received: {payload.message}")

# 4. 组装并运行
async def main():
    reg = EventRegistry()
    reg.register(MyEvent)
    h_reg = EventHandlerRegistry()
    h_reg.register(MyHandler())

    async with EventBus(reg, h_reg) as bus:
        await bus.proxy("cli").publish("my.event", {"message": "Hello, EventBus!"})
        await asyncio.sleep(1)  # 等待处理器输出

asyncio.run(main())

请求-响应模式

使用 request / expect / pipe 等高级模板需安装:pip install infinity_bus[templates] uv 使用 uv add infinity_bus --extra templates

import asyncio
from pydantic import BaseModel
from event_bus import (
    EventBus, EventDeclaration, EventHandler,
    EventRegistry, EventHandlerRegistry,
)
from event_bus.templates.request import (
    request, RequestProtocol, ResponseProtocol,
)

# 1. 定义请求/响应负载
class GetUserRequest(RequestProtocol):
    user_id: int

class GetUserResponse(ResponseProtocol):
    user_name: str
    email: str

# 2. 声明事件
class GetUserRequestEvent(EventDeclaration):
    name = "user.get.request"
    payload_type = GetUserRequest

class GetUserResponseEvent(EventDeclaration):
    name = "user.get.response"
    payload_type = GetUserResponse

# 3. 实现服务端处理器
class GetUserHandler(EventHandler):
    def __init__(self):
        super().__init__(subscriptions=["user.get.request"])

    async def handle(self, payload, bus_proxy, raw_event):
        if not isinstance(payload, GetUserRequest):
            return
        resp = GetUserResponse(
            session_id=payload.session_id,
            request_id=payload.request_id,
            success=True,
            user_name="Alice",
            email="alice@example.com",
        )
        await bus_proxy.publish("user.get.response", resp)

# 4. 组装并运行
async def main():
    reg = EventRegistry()
    reg.register(GetUserRequestEvent)
    reg.register(GetUserResponseEvent)
    h_reg = EventHandlerRegistry()
    h_reg.register(GetUserHandler())

    async with EventBus(reg, h_reg) as bus:
        proxy = bus.proxy("cli")
        resp = await request(
            bus_proxy=proxy,
            req_event="user.get.request",
            req_data={"user_id": 123},
            resp_event="user.get.response",
            timeout=10.0,
        )
        resp.raise_if_failed()
        print(f"User: {resp.user_name} ({resp.email})")

asyncio.run(main())

🧱 架构

组件 职责
Event 运行时事件实例,含名称、负载、处理链追踪
EventDeclaration 事件类型元数据声明(名称 + 可选 Pydantic 负载模型)
EventRegistry 集中管理已注册的事件声明,发布时校验
EventHandler 处理器基类,实现 handle 方法定义业务逻辑
EventHandlerRegistry 管理处理器实例,按事件名匹配处理器列表
EventBus 事件分发中枢:任务队列、并发控制、错误上报、生命周期
Middleware 中间件基类,洋葱管道:before_publish / on_publish 双钩子
MiddlewareChain 责任链管理器,按序包裹发布流程
templates 高级模板:expect 监听、request RPC、pipe 管道、register 批量注册
middlewares 内置中间件:日志(JSONL+SQLite)、限流、转换、屏蔽、递归防护

📚 文档

文档 内容
核心总览 Event / EventDeclaration / EventHandler / EventBus / Middleware 核心概念
中间件系统 Middleware 基类、MiddlewareChain 洋葱管道
高级模板 expectrequestpiperegister 四大模板总览
内置中间件 日志、限流、转换、屏蔽、递归防护
工程质量 类型安全、测试覆盖、pre-commit 门禁、模块化规范

🧪 测试

pip

# 克隆并安装测试依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[test]"

# 运行全部测试
pytest --cov=src -v

# 仅运行模板测试
pytest tests/templates/ -v

uv

# 克隆并同步测试依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra test

# 运行全部测试
uv run pytest --cov=src -v

# 仅运行模板测试
uv run pytest tests/templates/ -v

🧑‍💻 开发

pip

# 1. 克隆并安装全部开发依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[dev]"

# 2. 安装 pre-commit 门禁
pre-commit install

uv(推荐)

# 1. 克隆并同步全部依赖(自动创建 venv)
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra dev

# 2. 安装 pre-commit 门禁
uv run pre-commit install

开发循环

# lint + 格式化
ruff check src/ && ruff format src/ --check

# 类型检查
pyright src/

# 测试 + 覆盖率
pytest --cov=src -v

# 手动运行全部门禁
pre-commit run --all-files

使用 uv 时,命令前加 uv run 即可自动使用项目 venv,例如 uv run pytest --cov=src -v

工具 用途
uv 极速 Python 包管理器(替代 pip/venv)
ruff Lint + 格式化
pyright 严格类型检查
pytest + pytest-cov 测试 + 覆盖率
interrogate docstring 覆盖率
pre-commit 提交前自动门禁

📄 许可证

MIT

此项目属于 InfinitySystem

icon

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

infinity_bus-1.4.0.tar.gz (32.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

infinity_bus-1.4.0-py3-none-any.whl (35.6 kB view details)

Uploaded Python 3

File details

Details for the file infinity_bus-1.4.0.tar.gz.

File metadata

  • Download URL: infinity_bus-1.4.0.tar.gz
  • Upload date:
  • Size: 32.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.4.0.tar.gz
Algorithm Hash digest
SHA256 7ba533b4db01159a1a6c1efa2ae2b79649334c6b3eedffd09cb4b9ca6a9ec864
MD5 47bfb812bd51140d01df1ad83d34815c
BLAKE2b-256 17b037e2054848b53114153be6ad3fa71db086b41c4a9acd1c0e2127df06d1b4

See more details on using hashes here.

File details

Details for the file infinity_bus-1.4.0-py3-none-any.whl.

File metadata

  • Download URL: infinity_bus-1.4.0-py3-none-any.whl
  • Upload date:
  • Size: 35.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b260f15c264c01f2e553ac8e129d4b7760d29ddad52e2f4f3d7f7e74c411f0c2
MD5 2bb8a280a07e94d87e0ff49295f995aa
BLAKE2b-256 a9dfc092880fd7c6c0b82562b0f82a5ebd078d0bb1732797e7485139ba182398

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page