Skip to main content

基于 asyncio 的轻量级事件总线,支持发布/订阅、正则匹配、背压控制与优雅停机

Project description

InfinityBus — 异步事件总线

Test Coverage Pyright License PyPI Version Supported Python

强类型、可扩展的异步事件总线——中间件管道 + 高级模板。

✨ 特性

类别 能力
类型安全 Pydantic 负载校验 · pyright strict · 业务代码零 # type: ignore
灵活订阅 正则表达式匹配事件名 · 通配符处理器
中间件管道 洋葱模型 · before_publish / on_publish 双钩子 · 5 个内置中间件
高级模板 expect 一次性监听 · request RPC 调用 · pipe 双向管道 · register 批量注册
生产可靠 优雅停机 · 背压控制 · 超时保护 · 错误隔离 · 可观测性
工程纪律 90%+ 测试覆盖 · 85%+ docstring 覆盖 · pre-commit 自动门禁

和同类项目不同:InfinityBus 是可扩展的(中间件洋葱管道),而非把所有功能硬编码在核心类里。 详见 中间件系统

📦 安装

推荐使用 uv —— 极速 Python 包管理器,比 pip 快 10–100 倍, 自动管理 venv、锁定依赖、解析冲突。无需单独安装虚拟环境工具。

pip

# 核心(仅发布/订阅、中间件管道)
pip install infinity_bus

# 核心 + 高级模板(expect、request、pipe、SQLite 日志等)
pip install infinity_bus[templates]

uv(推荐)

# 核心
uv add infinity_bus

# 核心 + 高级模板
uv add infinity_bus --extra templates

提示:高级模板(expectrequestpipeSQLiteLoggingMiddleware) 需要 [templates] extra。若未安装而使用这些功能,会收到 ImportError 提示。

或从源码安装:

git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra dev

🚀 快速开始

基础发布/订阅

import asyncio
from pydantic import BaseModel
from event_bus import (
    EventBus, EventDeclaration, EventHandler,
    EventRegistry, EventHandlerRegistry,
)

# 1. 定义负载
class MyPayload(BaseModel):
    message: str

# 2. 声明事件
class MyEvent(EventDeclaration):
    name = "my.event"
    payload_type = MyPayload

# 3. 实现处理器
class MyHandler(EventHandler):
    def __init__(self):
        super().__init__(subscriptions=["my.event"])

    async def handle(self, payload, bus_proxy, raw_event):
        print(f"Received: {payload.message}")

# 4. 组装并运行
async def main():
    reg = EventRegistry()
    reg.register(MyEvent)
    h_reg = EventHandlerRegistry()
    h_reg.register(MyHandler())

    async with EventBus(reg, h_reg) as bus:
        await bus.proxy("cli").publish("my.event", {"message": "Hello, EventBus!"})
        await asyncio.sleep(1)  # 等待处理器输出

asyncio.run(main())

请求-响应模式

使用 request / expect / pipe 等高级模板需安装:pip install infinity_bus[templates] uv 使用 uv add infinity_bus --extra templates

import asyncio
from pydantic import BaseModel
from event_bus import (
    EventBus, EventDeclaration, EventHandler,
    EventRegistry, EventHandlerRegistry,
)
from event_bus.templates.request import (
    request, RequestProtocol, ResponseProtocol,
)

# 1. 定义请求/响应负载
class GetUserRequest(RequestProtocol):
    user_id: int

class GetUserResponse(ResponseProtocol):
    user_name: str
    email: str

# 2. 声明事件
class GetUserRequestEvent(EventDeclaration):
    name = "user.get.request"
    payload_type = GetUserRequest

class GetUserResponseEvent(EventDeclaration):
    name = "user.get.response"
    payload_type = GetUserResponse

# 3. 实现服务端处理器
class GetUserHandler(EventHandler):
    def __init__(self):
        super().__init__(subscriptions=["user.get.request"])

    async def handle(self, payload, bus_proxy, raw_event):
        if not isinstance(payload, GetUserRequest):
            return
        resp = GetUserResponse(
            session_id=payload.session_id,
            request_id=payload.request_id,
            success=True,
            user_name="Alice",
            email="alice@example.com",
        )
        await bus_proxy.publish("user.get.response", resp)

# 4. 组装并运行
async def main():
    reg = EventRegistry()
    reg.register(GetUserRequestEvent)
    reg.register(GetUserResponseEvent)
    h_reg = EventHandlerRegistry()
    h_reg.register(GetUserHandler())

    async with EventBus(reg, h_reg) as bus:
        proxy = bus.proxy("cli")
        resp = await request(
            bus_proxy=proxy,
            req_event="user.get.request",
            req_data={"user_id": 123},
            resp_event="user.get.response",
            timeout=10.0,
        )
        resp.raise_if_failed()
        print(f"User: {resp.user_name} ({resp.email})")

asyncio.run(main())

🧱 架构

组件 职责
Event 运行时事件实例,含名称、负载、处理链追踪
EventDeclaration 事件类型元数据声明(名称 + 可选 Pydantic 负载模型)
EventRegistry 集中管理已注册的事件声明,发布时校验
EventHandler 处理器基类,实现 handle 方法定义业务逻辑
EventHandlerRegistry 管理处理器实例,按事件名匹配处理器列表
EventBus 事件分发中枢:任务队列、并发控制、错误上报、生命周期
Middleware 中间件基类,洋葱管道:before_publish / on_publish 双钩子
MiddlewareChain 责任链管理器,按序包裹发布流程
templates 高级模板:expect 监听、request RPC、pipe 管道、register 批量注册
middlewares 内置中间件:日志(JSONL+SQLite)、限流、转换、屏蔽、递归防护

📚 文档

文档 内容
核心总览 Event / EventDeclaration / EventHandler / EventBus / Middleware 核心概念
中间件系统 Middleware 基类、MiddlewareChain 洋葱管道
高级模板 expectrequestpiperegister 四大模板总览
内置中间件 日志、限流、转换、屏蔽、递归防护
工程质量 类型安全、测试覆盖、pre-commit 门禁、模块化规范

🧪 测试

pip

# 克隆并安装测试依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[test]"

# 运行全部测试
pytest --cov=src -v

# 仅运行模板测试
pytest tests/templates/ -v

uv

# 克隆并同步测试依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra test

# 运行全部测试
uv run pytest --cov=src -v

# 仅运行模板测试
uv run pytest tests/templates/ -v

🧑‍💻 开发

pip

# 1. 克隆并安装全部开发依赖
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[dev]"

# 2. 安装 pre-commit 门禁
pre-commit install

uv(推荐)

# 1. 克隆并同步全部依赖(自动创建 venv)
git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
uv sync --extra dev

# 2. 安装 pre-commit 门禁
uv run pre-commit install

开发循环

# lint + 格式化
ruff check src/ && ruff format src/ --check

# 类型检查
pyright src/

# 测试 + 覆盖率
pytest --cov=src -v

# 手动运行全部门禁
pre-commit run --all-files

使用 uv 时,命令前加 uv run 即可自动使用项目 venv,例如 uv run pytest --cov=src -v

工具 用途
uv 极速 Python 包管理器(替代 pip/venv)
ruff Lint + 格式化
pyright 严格类型检查
pytest + pytest-cov 测试 + 覆盖率
interrogate docstring 覆盖率
pre-commit 提交前自动门禁

📄 许可证

MIT

此项目属于 InfinitySystem

icon

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

infinity_bus-1.3.6.tar.gz (29.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

infinity_bus-1.3.6-py3-none-any.whl (34.0 kB view details)

Uploaded Python 3

File details

Details for the file infinity_bus-1.3.6.tar.gz.

File metadata

  • Download URL: infinity_bus-1.3.6.tar.gz
  • Upload date:
  • Size: 29.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.3.6.tar.gz
Algorithm Hash digest
SHA256 8c99f1afc73b6d9b7568312599c29ca31d8330f8d48f34bbafa05e69b9dbc580
MD5 6f03c92a59a190556bd4da526a5f90e8
BLAKE2b-256 f8fd96253083f29a71e932a45a0a960779051e58065124250463b14e26610ccc

See more details on using hashes here.

File details

Details for the file infinity_bus-1.3.6-py3-none-any.whl.

File metadata

  • Download URL: infinity_bus-1.3.6-py3-none-any.whl
  • Upload date:
  • Size: 34.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.3.6-py3-none-any.whl
Algorithm Hash digest
SHA256 2277887e75faad3cfa1a942d560de92ea0639225ac4244be2c819ef3140c4af9
MD5 aef00f722742763203d53c41208e7ee3
BLAKE2b-256 599323173cc9c6f30c09c8eaed8f5503ad25ed556eea002c7864d871290c462e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page