Skip to main content

基于 asyncio 的轻量级事件总线,支持发布/订阅、正则匹配、背压控制与优雅停机

Project description

InfinityBus — 异步事件总线

Test Python License

基于 asyncio 的轻量级事件总线,实现发布/订阅模式,用于在异步应用中解耦组件间的通信。

✨ 特性

  • 强类型负载校验 — 基于 Pydantic,发布时自动校验数据类型与结构
  • 正则表达式订阅 — 支持灵活的事件名匹配规则
  • 背压控制 — 队列大小与并发信号量双重限流,防止过载
  • 超时保护 — 每个处理器可独立设置超时,避免单任务阻塞总线
  • 错误隔离 — 单个处理器异常不影响其他处理器,错误通过内置事件统一上报
  • 优雅停机 — 保证停止过程中已入队事件被完整处理,避免数据丢失
  • 可观测性 — 提供活跃任务数、队列长度等监控指标

📦 安装

pip install infinity_bus

或从源码安装:

git clone https://github.com/yinbailiang/event_bus.git
cd event_bus
pip install -e ".[test]"

🚀 快速开始

基础发布/订阅

import asyncio
from pydantic import BaseModel
from event_bus import (
    EventBus, EventDeclaration, EventHandler,
    EventRegistry, EventHandlerRegistry,
)

# 1. 定义负载
class MyPayload(BaseModel):
    message: str

# 2. 声明事件
class MyEvent(EventDeclaration):
    name = "my.event"
    payload_type = MyPayload

# 3. 实现处理器
class MyHandler(EventHandler):
    def __init__(self):
        super().__init__(subscriptions=["my.event"])

    async def handle(self, payload, bus_proxy, raw_event):
        print(f"Received: {payload.message}")

# 4. 组装并运行
async def main():
    reg = EventRegistry()
    reg.register(MyEvent)
    h_reg = EventHandlerRegistry()
    h_reg.register(MyHandler())

    async with EventBus(reg, h_reg) as bus:
        await bus.proxy("cli").publish("my.event", {"message": "Hello, EventBus!"})
        await asyncio.sleep(1)  # 等待处理器输出

asyncio.run(main())

请求-响应模式

from event_bus.templates.request import request, RequestProtocol, ResponseProtocol

# 定义请求/响应负载与事件(详见文档)
# ...

resp = await request(
    bus_proxy=proxy,
    req_event="user.get.request",
    req_data={"user_id": 123},
    resp_event="user.get.response",
    timeout=10.0,
)
resp.raise_if_failed()

🧱 架构

组件 职责
Event 运行时事件实例,含名称、负载、处理链追踪
EventDeclaration 事件类型元数据声明(名称 + 可选 Pydantic 负载模型)
EventRegistry 集中管理已注册的事件声明,发布时校验
EventHandler 处理器基类,实现 handle 方法定义业务逻辑
EventHandlerRegistry 管理处理器实例,按事件名匹配处理器列表
EventBus 事件分发中枢:任务队列、并发控制、错误上报、生命周期

📚 文档

🧪 测试

# 运行全部测试
pytest --cov=src -v

# 仅运行核心测试
pytest tests/event_bus_test.py -v

# 仅运行模板测试
pytest tests/templates/ -v

📄 许可证

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

infinity_bus-1.3.2.tar.gz (24.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

infinity_bus-1.3.2-py3-none-any.whl (27.8 kB view details)

Uploaded Python 3

File details

Details for the file infinity_bus-1.3.2.tar.gz.

File metadata

  • Download URL: infinity_bus-1.3.2.tar.gz
  • Upload date:
  • Size: 24.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.3.2.tar.gz
Algorithm Hash digest
SHA256 cbf5338553d5b31e7caf65795a17dcef617c046f53b13eabbe6eec477dcffe75
MD5 485473bc132e52216e8f7d0caa660353
BLAKE2b-256 367539799169e72b534d6da6c2925fdcaeffbd1b25581c7442b9fd52937b4c45

See more details on using hashes here.

File details

Details for the file infinity_bus-1.3.2-py3-none-any.whl.

File metadata

  • Download URL: infinity_bus-1.3.2-py3-none-any.whl
  • Upload date:
  • Size: 27.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.3.2-py3-none-any.whl
Algorithm Hash digest
SHA256 76d5b25c904d27aa288cde4d3c233adfbbb072cbf5224bae53bcab92d3b5787d
MD5 83aa65424e6d6cf6266c73c2de577c79
BLAKE2b-256 1adcee5d792fbcba328f3ffaf9a2856bdcc3e28fbef99b352bab90cda3afc0b4

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page