Skip to main content

基于 asyncio 的轻量级事件总线,支持发布/订阅、正则匹配、背压控制与优雅停机

Project description

InfinityBus — 异步事件总线

Test License PyPI Version PyPI Downloads Supported Python

基于 asyncio 的轻量级事件总线,实现发布/订阅模式,用于在异步应用中解耦组件间的通信。

✨ 特性

  • 强类型负载校验 — 基于 Pydantic,发布时自动校验数据类型与结构
  • 正则表达式订阅 — 支持灵活的事件名匹配规则
  • 背压控制 — 队列大小与并发信号量双重限流,防止过载
  • 超时保护 — 每个处理器可独立设置超时,避免单任务阻塞总线
  • 错误隔离 — 单个处理器异常不影响其他处理器,错误通过内置事件统一上报
  • 优雅停机 — 保证停止过程中已入队事件被完整处理,避免数据丢失
  • 可观测性 — 提供活跃任务数、队列长度等监控指标

📦 安装

pip install infinity_bus

或从源码安装:

git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[test]"

🚀 快速开始

基础发布/订阅

import asyncio
from pydantic import BaseModel
from event_bus import (
    EventBus, EventDeclaration, EventHandler,
    EventRegistry, EventHandlerRegistry,
)

# 1. 定义负载
class MyPayload(BaseModel):
    message: str

# 2. 声明事件
class MyEvent(EventDeclaration):
    name = "my.event"
    payload_type = MyPayload

# 3. 实现处理器
class MyHandler(EventHandler):
    def __init__(self):
        super().__init__(subscriptions=["my.event"])

    async def handle(self, payload, bus_proxy, raw_event):
        print(f"Received: {payload.message}")

# 4. 组装并运行
async def main():
    reg = EventRegistry()
    reg.register(MyEvent)
    h_reg = EventHandlerRegistry()
    h_reg.register(MyHandler())

    async with EventBus(reg, h_reg) as bus:
        await bus.proxy("cli").publish("my.event", {"message": "Hello, EventBus!"})
        await asyncio.sleep(1)  # 等待处理器输出

asyncio.run(main())

请求-响应模式

import asyncio
from pydantic import BaseModel
from event_bus import (
    EventBus, EventDeclaration, EventHandler,
    EventRegistry, EventHandlerRegistry,
)
from event_bus.templates.request import (
    request, RequestProtocol, ResponseProtocol,
)

# 1. 定义请求/响应负载
class GetUserRequest(RequestProtocol):
    user_id: int

class GetUserResponse(ResponseProtocol):
    user_name: str
    email: str

# 2. 声明事件
class GetUserRequestEvent(EventDeclaration):
    name = "user.get.request"
    payload_type = GetUserRequest

class GetUserResponseEvent(EventDeclaration):
    name = "user.get.response"
    payload_type = GetUserResponse

# 3. 实现服务端处理器
class GetUserHandler(EventHandler):
    def __init__(self):
        super().__init__(subscriptions=["user.get.request"])

    async def handle(self, payload, bus_proxy, raw_event):
        if not isinstance(payload, GetUserRequest):
            return
        resp = GetUserResponse(
            session_id=payload.session_id,
            request_id=payload.request_id,
            success=True,
            user_name="Alice",
            email="alice@example.com",
        )
        await bus_proxy.publish("user.get.response", resp)

# 4. 组装并运行
async def main():
    reg = EventRegistry()
    reg.register(GetUserRequestEvent)
    reg.register(GetUserResponseEvent)
    h_reg = EventHandlerRegistry()
    h_reg.register(GetUserHandler())

    async with EventBus(reg, h_reg) as bus:
        proxy = bus.proxy("cli")
        resp = await request(
            bus_proxy=proxy,
            req_event="user.get.request",
            req_data={"user_id": 123},
            resp_event="user.get.response",
            timeout=10.0,
        )
        resp.raise_if_failed()
        print(f"User: {resp.user_name} ({resp.email})")

asyncio.run(main())

🧱 架构

组件 职责
Event 运行时事件实例,含名称、负载、处理链追踪
EventDeclaration 事件类型元数据声明(名称 + 可选 Pydantic 负载模型)
EventRegistry 集中管理已注册的事件声明,发布时校验
EventHandler 处理器基类,实现 handle 方法定义业务逻辑
EventHandlerRegistry 管理处理器实例,按事件名匹配处理器列表
EventBus 事件分发中枢:任务队列、并发控制、错误上报、生命周期
Middleware 中间件基类,洋葱管道:before_publish / on_publish 双钩子
MiddlewareChain 责任链管理器,按序包裹发布流程
templates 高级模板:expect 监听、request RPC、pipe 管道、register 批量注册
middlewares 内置中间件:日志(JSONL+SQLite)、限流、转换、屏蔽、递归防护

📚 文档

文档 内容
核心总览 Event / EventDeclaration / EventHandler / EventBus / Middleware 核心概念
中间件系统 Middleware 基类、MiddlewareChain 洋葱管道
高级模板 expectrequestpiperegister 四大模板总览
内置中间件 日志、限流、转换、屏蔽、递归防护

🧪 测试

# 运行全部测试
pytest --cov=src -v

# 仅运行核心测试
pytest tests/event_bus_test.py -v

# 仅运行模板测试
pytest tests/templates/ -v

📄 许可证

MIT

此项目属于 InfinitySystem

icon

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

infinity_bus-1.3.5.tar.gz (27.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

infinity_bus-1.3.5-py3-none-any.whl (32.9 kB view details)

Uploaded Python 3

File details

Details for the file infinity_bus-1.3.5.tar.gz.

File metadata

  • Download URL: infinity_bus-1.3.5.tar.gz
  • Upload date:
  • Size: 27.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.3.5.tar.gz
Algorithm Hash digest
SHA256 b3139599d34cebb0ca702079ad7cb8416b1ef6c055d776b09057162d45b98a1b
MD5 7d28b8aaa66641f2907d005ea267e642
BLAKE2b-256 1529875321ba51d7e0cd1dd5e6437862be1324647ce2fd24bd0c8d0388b0fe14

See more details on using hashes here.

File details

Details for the file infinity_bus-1.3.5-py3-none-any.whl.

File metadata

  • Download URL: infinity_bus-1.3.5-py3-none-any.whl
  • Upload date:
  • Size: 32.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.3.5-py3-none-any.whl
Algorithm Hash digest
SHA256 f6b34cd637c9a3884276408658b0e242d6fb5b1480a02ddc44f90fbbe61355ad
MD5 f1318e00ae23188c4aa9cf2be8d01d36
BLAKE2b-256 2b32fdb9dc0b529c08253fc4c9b67470d474570df6aa2f85f7f717f190c25710

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page