Skip to main content

基于 asyncio 的轻量级事件总线,支持发布/订阅、正则匹配、背压控制与优雅停机

Project description

InfinityBus — 异步事件总线

Test Python License

基于 asyncio 的轻量级事件总线,实现发布/订阅模式,用于在异步应用中解耦组件间的通信。

✨ 特性

  • 强类型负载校验 — 基于 Pydantic,发布时自动校验数据类型与结构
  • 正则表达式订阅 — 支持灵活的事件名匹配规则
  • 背压控制 — 队列大小与并发信号量双重限流,防止过载
  • 超时保护 — 每个处理器可独立设置超时,避免单任务阻塞总线
  • 错误隔离 — 单个处理器异常不影响其他处理器,错误通过内置事件统一上报
  • 优雅停机 — 保证停止过程中已入队事件被完整处理,避免数据丢失
  • 可观测性 — 提供活跃任务数、队列长度等监控指标

📦 安装

pip install git+https://github.com/yinbailiang/infinity_bus.git

或从源码安装:

git clone https://github.com/yinbailiang/infinity_bus.git
cd event_bus
pip install -e ".[test]"

🚀 快速开始

基础发布/订阅

import asyncio
from pydantic import BaseModel
from event_bus import (
    EventBus, EventDeclaration, EventHandler,
    EventRegistry, EventHandlerRegistry,
)

# 1. 定义负载
class MyPayload(BaseModel):
    message: str

# 2. 声明事件
class MyEvent(EventDeclaration):
    name = "my.event"
    payload_type = MyPayload

# 3. 实现处理器
class MyHandler(EventHandler):
    def __init__(self):
        super().__init__(subscriptions=["my.event"])

    async def handle(self, payload, bus_proxy, raw_event):
        print(f"Received: {payload.message}")

# 4. 组装并运行
async def main():
    reg = EventRegistry()
    reg.register(MyEvent)
    h_reg = EventHandlerRegistry()
    h_reg.register(MyHandler())

    async with EventBus(reg, h_reg) as bus:
        await bus.proxy("cli").publish("my.event", {"message": "Hello, EventBus!"})
        await asyncio.sleep(1)  # 等待处理器输出

asyncio.run(main())

请求-响应模式

from event_bus.templates.request import request, RequestProtocol, ResponseProtocol

# 定义请求/响应负载与事件(详见文档)
# ...

resp = await request(
    bus_proxy=proxy,
    req_event="user.get.request",
    req_data={"user_id": 123},
    resp_event="user.get.response",
    timeout=10.0,
)
resp.raise_if_failed()

🧱 架构

组件 职责
Event 运行时事件实例,含名称、负载、处理链追踪
EventDeclaration 事件类型元数据声明(名称 + 可选 Pydantic 负载模型)
EventRegistry 集中管理已注册的事件声明,发布时校验
EventHandler 处理器基类,实现 handle 方法定义业务逻辑
EventHandlerRegistry 管理处理器实例,按事件名匹配处理器列表
EventBus 事件分发中枢:任务队列、并发控制、错误上报、生命周期

📚 文档

🧪 测试

# 运行全部测试
pytest --cov=src -v

# 仅运行核心测试
pytest tests/event_bus_test.py -v

# 仅运行模板测试
pytest tests/templates/ -v

📄 许可证

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

infinity_bus-1.3.1.tar.gz (24.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

infinity_bus-1.3.1-py3-none-any.whl (27.8 kB view details)

Uploaded Python 3

File details

Details for the file infinity_bus-1.3.1.tar.gz.

File metadata

  • Download URL: infinity_bus-1.3.1.tar.gz
  • Upload date:
  • Size: 24.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.3.1.tar.gz
Algorithm Hash digest
SHA256 fecbb6bb54edb32a2ed93ca10c041d369a20b2c5e103a81e208e3692083d4e79
MD5 b306a095a07235456a297f16d676b1d6
BLAKE2b-256 a073c4d0fa7274c98825b5f1c01c6db806bf5eb8f434eaa7632dbd25591b8fce

See more details on using hashes here.

File details

Details for the file infinity_bus-1.3.1-py3-none-any.whl.

File metadata

  • Download URL: infinity_bus-1.3.1-py3-none-any.whl
  • Upload date:
  • Size: 27.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for infinity_bus-1.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 d56e25369c5deb3de30346e3a053d1a8b238e0999dde18a8811ec4446a889c69
MD5 fe6d614296a65fc77ff93622b0a22590
BLAKE2b-256 4ba7835a2c57f25853c837bb6f7a5ab02370299cd8f07e048abb111c323bf6a3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page