Skip to main content

基于 asyncio 的轻量级事件总线,支持发布/订阅、正则匹配、背压控制与优雅停机

Project description

InfinityBus — 异步事件总线

Test Python License

基于 asyncio 的轻量级事件总线,实现发布/订阅模式,用于在异步应用中解耦组件间的通信。

✨ 特性

  • 强类型负载校验 — 基于 Pydantic,发布时自动校验数据类型与结构
  • 正则表达式订阅 — 支持灵活的事件名匹配规则
  • 背压控制 — 队列大小与并发信号量双重限流,防止过载
  • 超时保护 — 每个处理器可独立设置超时,避免单任务阻塞总线
  • 错误隔离 — 单个处理器异常不影响其他处理器,错误通过内置事件统一上报
  • 优雅停机 — 保证停止过程中已入队事件被完整处理,避免数据丢失
  • 可观测性 — 提供活跃任务数、队列长度等监控指标

📦 安装

pip install infinity_bus

或从源码安装:

git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[test]"

🚀 快速开始

基础发布/订阅

import asyncio
from pydantic import BaseModel
from event_bus import (
    EventBus, EventDeclaration, EventHandler,
    EventRegistry, EventHandlerRegistry,
)

# 1. 定义负载
class MyPayload(BaseModel):
    message: str

# 2. 声明事件
class MyEvent(EventDeclaration):
    name = "my.event"
    payload_type = MyPayload

# 3. 实现处理器
class MyHandler(EventHandler):
    def __init__(self):
        super().__init__(subscriptions=["my.event"])

    async def handle(self, payload, bus_proxy, raw_event):
        print(f"Received: {payload.message}")

# 4. 组装并运行
async def main():
    reg = EventRegistry()
    reg.register(MyEvent)
    h_reg = EventHandlerRegistry()
    h_reg.register(MyHandler())

    async with EventBus(reg, h_reg) as bus:
        await bus.proxy("cli").publish("my.event", {"message": "Hello, EventBus!"})
        await asyncio.sleep(1)  # 等待处理器输出

asyncio.run(main())

请求-响应模式

from event_bus.templates.request import request, RequestProtocol, ResponseProtocol

# 定义请求/响应负载与事件(详见文档)
# ...

resp = await request(
    bus_proxy=proxy,
    req_event="user.get.request",
    req_data={"user_id": 123},
    resp_event="user.get.response",
    timeout=10.0,
)
resp.raise_if_failed()

🧱 架构

组件 职责
Event 运行时事件实例,含名称、负载、处理链追踪
EventDeclaration 事件类型元数据声明(名称 + 可选 Pydantic 负载模型)
EventRegistry 集中管理已注册的事件声明,发布时校验
EventHandler 处理器基类,实现 handle 方法定义业务逻辑
EventHandlerRegistry 管理处理器实例,按事件名匹配处理器列表
EventBus 事件分发中枢:任务队列、并发控制、错误上报、生命周期

📚 文档

🧪 测试

# 运行全部测试
pytest --cov=src -v

# 仅运行核心测试
pytest tests/event_bus_test.py -v

# 仅运行模板测试
pytest tests/templates/ -v

📄 许可证

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

infinity_bus-1.3.3.tar.gz (24.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

infinity_bus-1.3.3-py3-none-any.whl (27.8 kB view details)

Uploaded Python 3

File details

Details for the file infinity_bus-1.3.3.tar.gz.

File metadata

  • Download URL: infinity_bus-1.3.3.tar.gz
  • Upload date:
  • Size: 24.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.3.3.tar.gz
Algorithm Hash digest
SHA256 7ed5ed8805f0256b775f51c72e5bb83a78b36741cfe2f046da6d396e308dd429
MD5 b60b2269a54d845a205aca903c5b20f5
BLAKE2b-256 79b5b0168c2d64194fbf64a9100c2f17f0702305725018f67af991c8f7c7f6ee

See more details on using hashes here.

File details

Details for the file infinity_bus-1.3.3-py3-none-any.whl.

File metadata

  • Download URL: infinity_bus-1.3.3-py3-none-any.whl
  • Upload date:
  • Size: 27.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.3.3-py3-none-any.whl
Algorithm Hash digest
SHA256 2a3a1bcb7960fd2845b51594ca7f07f54faa7e8e94a774a1e5b4754e117cb15c
MD5 109498f4efa78036faaddb0eec53f2fb
BLAKE2b-256 6e7eaf6cd9ac39c19f8085a32063df06bd5d3ff706e9b2ca38f57e54bc1288c0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page