Skip to main content

High-availability event-driven extension for FastAPI

Project description

fastapi-eventbus

高可用事件驱动 pub/sub 扩展,专为 FastAPI 设计。

Python License Status

特性

  • 可插拔后端:内存同步、Redis、Kafka
  • 装饰器注册事件处理器(@on_event
  • FastAPI 依赖注入集成
  • 生命周期感知的启动/关闭
  • 指数退避重试 + 死信队列(DLQ)
  • 请求级 Correlation ID 中间件
  • 后端健康检查
  • 可插拔 Metrics 采集(支持对接 Prometheus/StatsD/Datadog)
  • 可插拔 Tracing(支持对接 OpenTelemetry/Jaeger)
  • Kafka Consumer Rebalance 安全处理
  • 完整类型标注(PEP 561)

安装

pip install fastapi-eventbus

# Redis 支持
pip install fastapi-eventbus[redis]

# Kafka 支持
pip install fastapi-eventbus[kafka]

# 全部安装
pip install fastapi-eventbus[all]

# 开发环境
pip install -e ".[all,dev]"

快速开始

from fastapi import FastAPI, Depends
from fastapi_eventbus import (
    BaseEvent, EventBus, EventBusMiddleware,
    create_lifespan, get_event_bus, on_event,
)


# 1. 定义事件
class UserCreated(BaseEvent):
    topic: str = "user.created"
    user_id: int
    username: str


# 2. 注册处理器
@on_event("user.created")
async def handle_user_created(event: BaseEvent) -> None:
    print(f"新用户: {event}")


# 3. 创建应用
bus = EventBus()
app = FastAPI(lifespan=create_lifespan(bus))
app.add_middleware(EventBusMiddleware)


# 4. 发布事件
@app.post("/users")
async def create_user(bus: EventBus = Depends(get_event_bus)):
    event = UserCreated(user_id=1, username="alice")
    await bus.publish(event)
    return {"status": "ok"}

配置

通过环境变量配置(前缀 EVENTBUS_):

变量 默认值 说明
EVENTBUS_BACKEND sync 后端类型:sync / redis / kafka
EVENTBUS_MAX_RETRIES 3 最大重试次数
EVENTBUS_RETRY_DELAY 1.0 初始重试延迟(秒)
EVENTBUS_RETRY_BACKOFF_FACTOR 2.0 退避倍数
EVENTBUS_RETRY_MAX_DELAY 60.0 最大重试延迟(秒)
EVENTBUS_DLQ_ENABLED false 启用死信队列
EVENTBUS_DLQ_TOPIC_SUFFIX .dlq DLQ topic 后缀
EVENTBUS_REDIS_URL redis://localhost:6379/0 Redis 连接地址
EVENTBUS_REDIS_MAX_CONNECTIONS 10 Redis 最大连接数
EVENTBUS_KAFKA_BOOTSTRAP_SERVERS localhost:9092 Kafka broker 地址
EVENTBUS_KAFKA_GROUP_ID fastapi-eventbus Kafka 消费组 ID

也可以通过代码直接配置:

from fastapi_eventbus.config import EventBusSettings

settings = EventBusSettings(
    backend="redis",
    redis_url="redis://my-redis:6379/1",
    max_retries=5,
    dlq_enabled=True,
)
bus = EventBus(settings=settings)

后端

内存同步后端(默认)

零依赖,适合开发和测试:

bus = EventBus()  # 默认使用 sync 后端

Redis 后端

pip install fastapi-eventbus[redis]
# 通过环境变量
# EVENTBUS_BACKEND=redis
# EVENTBUS_REDIS_URL=redis://localhost:6379/0

# 或代码配置
from fastapi_eventbus.backends.redis import RedisBackend

backend = RedisBackend(url="redis://localhost:6379/0")
bus = EventBus(backend=backend)

Kafka 后端

pip install fastapi-eventbus[kafka]
from fastapi_eventbus.backends.kafka import KafkaBackend

backend = KafkaBackend(
    bootstrap_servers="localhost:9092",
    group_id="my-service",
    auto_offset_reset="earliest",
    session_timeout_ms=30000,
    enable_auto_commit=False,  # 推荐:手动提交 offset
)
bus = EventBus(backend=backend)

Kafka 后端特性:

  • Rebalance 安全:rebalance 期间自动暂停事件处理,完成后恢复
  • 手动 offset 提交:每条消息处理成功后才提交,保证 at-least-once 语义
  • 异常自动重连:consumer 异常时 5 秒退避后重连
  • 批量发布:publish_many() 使用 batch send + flush

生命周期管理

简单模式

bus = EventBus()
app = FastAPI(lifespan=create_lifespan(bus))

组合模式

与你自己的 lifespan 逻辑组合:

from contextlib import asynccontextmanager
from fastapi_eventbus import EventBus, eventbus_lifespan

bus = EventBus()

@asynccontextmanager
async def lifespan(app):
    # 你的启动逻辑
    async with eventbus_lifespan(bus, app):
        yield
    # 你的关闭逻辑

app = FastAPI(lifespan=lifespan)

事件处理器

装饰器注册

from fastapi_eventbus import on_event, BaseEvent

@on_event("order.created")
async def handle_order(event: BaseEvent) -> None:
    print(f"订单创建: {event.event_id}")

# 同步处理器也支持(会自动在线程池中执行)
@on_event("order.created")
def sync_handler(event: BaseEvent) -> None:
    print("同步处理")

运行时动态注册

async def my_handler(event: BaseEvent) -> None:
    pass

await bus.registry.register("some.topic", my_handler)
await bus.subscribe("some.topic")

重试与死信队列

from fastapi_eventbus import EventBus, RetryPolicy

policy = RetryPolicy(
    max_retries=5,
    delay=0.5,
    backoff_factor=2.0,
    max_delay=30.0,
)

bus = EventBus(
    retry_policy=policy,
    settings=EventBusSettings(dlq_enabled=True),
)

# 查看 DLQ 中的失败事件
failed = bus.dlq.peek(limit=20)
for f in failed:
    print(f"事件 {f.event.event_id} 失败 {f.attempts} 次: {f.error}")

Metrics 采集

内置可插拔的 metrics 采集协议,零开销默认关闭。

内存采集器(测试/调试)

from fastapi_eventbus import EventBus, InMemoryMetricsCollector

metrics = InMemoryMetricsCollector()
bus = EventBus(metrics=metrics)

# 使用后查看指标
print(metrics.snapshot())
# {'published': {'user.created': 10}, 'dispatched': {'user.created': 10}, ...}

自定义采集器(对接 Prometheus 等)

实现 MetricsCollector 协议即可:

from fastapi_eventbus import MetricsCollector

class PrometheusCollector:
    def __init__(self):
        from prometheus_client import Counter, Histogram
        self.published = Counter("eventbus_published_total", "Events published", ["topic"])
        self.dispatched = Counter("eventbus_dispatched_total", "Events dispatched", ["topic"])
        self.errors = Counter("eventbus_errors_total", "Dispatch errors", ["topic"])
        self.retries = Counter("eventbus_retries_total", "Retries", ["topic"])
        self.dlq = Counter("eventbus_dlq_total", "DLQ events", ["topic"])
        self.pub_duration = Histogram("eventbus_publish_seconds", "Publish duration", ["topic"])
        self.disp_duration = Histogram("eventbus_dispatch_seconds", "Dispatch duration", ["topic"])

    def inc_published(self, topic: str) -> None:
        self.published.labels(topic=topic).inc()

    def inc_dispatched(self, topic: str) -> None:
        self.dispatched.labels(topic=topic).inc()

    def inc_dispatch_error(self, topic: str) -> None:
        self.errors.labels(topic=topic).inc()

    def inc_retry(self, topic: str, attempt: int) -> None:
        self.retries.labels(topic=topic).inc()

    def inc_dlq(self, topic: str) -> None:
        self.dlq.labels(topic=topic).inc()

    def observe_publish_duration(self, topic: str, duration: float) -> None:
        self.pub_duration.labels(topic=topic).observe(duration)

    def observe_dispatch_duration(self, topic: str, duration: float) -> None:
        self.disp_duration.labels(topic=topic).observe(duration)

bus = EventBus(metrics=PrometheusCollector())

采集的指标

指标 说明
inc_published 事件发布计数(按 topic)
inc_dispatched 事件成功分发计数
inc_dispatch_error 分发错误计数
inc_retry 重试计数(含 attempt 序号)
inc_dlq 进入死信队列计数
observe_publish_duration 发布耗时
observe_dispatch_duration 分发耗时

Tracing 追踪

内置轻量级 span 追踪,可对接 OpenTelemetry/Jaeger/Zipkin。

LoggingTracer(开发调试)

from fastapi_eventbus import EventBus, LoggingTracer

tracer = LoggingTracer()
bus = EventBus(tracer=tracer)

# 每次 publish/dispatch 都会自动创建 span 并输出日志:
# SPAN START publish [a1b2c3d4] topic=user.created event=xxx
# SPAN END publish [a1b2c3d4] status=ok duration=1.23ms

# 查看所有 span
for span in tracer.spans:
    print(span.to_dict())

自定义 Tracer(对接 OpenTelemetry)

from fastapi_eventbus import EventTracer, EventSpan
from opentelemetry import trace

otel_tracer = trace.get_tracer("fastapi-eventbus")

class OTelTracer:
    def start_span(self, operation, topic="", event_id="", parent_span=None):
        otel_span = otel_tracer.start_span(
            operation,
            attributes={"topic": topic, "event_id": event_id},
        )
        span = EventSpan(operation=operation, topic=topic, event_id=event_id)
        span.attributes["otel_span"] = str(id(otel_span))
        self._otel_spans = getattr(self, "_otel_spans", {})
        self._otel_spans[span.span_id] = otel_span
        return span

    def finish_span(self, span):
        otel_span = getattr(self, "_otel_spans", {}).pop(span.span_id, None)
        if otel_span:
            if span.status.value == "error":
                otel_span.set_status(trace.StatusCode.ERROR, span.error_message)
            otel_span.end()

bus = EventBus(tracer=OTelTracer())

Span 数据结构

每个 span 包含:

{
    "span_id": "a1b2c3d4e5f6g7h8",
    "trace_id": "correlation-id-from-middleware",
    "parent_span_id": "",
    "operation": "publish",       # publish | dispatch
    "topic": "user.created",
    "event_id": "abc123",
    "status": "ok",               # ok | error
    "duration_ms": 1.23,
    "attributes": {},
    "error_message": "",
}

中间件

from fastapi_eventbus import EventBusMiddleware, get_correlation_id

app.add_middleware(EventBusMiddleware)

@app.get("/test")
async def test():
    cid = get_correlation_id()  # 当前请求的 correlation ID
    return {"correlation_id": cid}
  • 自动为每个请求生成 Correlation ID(或从 X-Correlation-ID 请求头读取)
  • 响应头自动附带 X-Correlation-ID
  • Tracing 的 trace_id 自动关联 Correlation ID

健康检查

from fastapi import Depends
from fastapi_eventbus import EventBus, get_event_bus

@app.get("/health")
async def health(bus: EventBus = Depends(get_event_bus)):
    results = await bus.health.check()
    healthy = all(r.status.value == "healthy" for r in results)
    return {
        "status": "healthy" if healthy else "unhealthy",
        "backends": [
            {"name": r.backend, "status": r.status.value, "detail": r.detail}
            for r in results
        ],
    }

开发

# 克隆并安装
git clone https://github.com/xjaqil/fastapi-eventbus.git
cd fastapi-eventbus
python3 -m venv .venv
source .venv/bin/activate
pip install -e ".[all,dev]"

# 运行测试
pytest

# 代码检查
ruff check .
ruff format .

# 类型检查
mypy src/

项目结构

src/fastapi_eventbus/
├── __init__.py          # 公共 API
├── typing.py            # 类型定义
├── py.typed             # PEP 561 标记
├── core/                # 核心抽象(BaseEvent, Handler, Publisher, Subscriber, 异常)
├── backends/            # 后端实现(sync, redis, kafka)
├── middleware/          # 请求级事件追踪中间件
├── ext/                 # FastAPI 集成(EventBus, 依赖注入, 生命周期)
├── registry/            # Handler 注册表 + @on_event 装饰器
├── retry/               # 重试策略 + 死信队列
├── config/              # Pydantic Settings 配置
├── metrics/             # 可插拔 Metrics 采集
├── tracing/             # 可插拔 Tracing 追踪
└── health/              # 后端健康检查

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastapi_eventbus-0.1.1.tar.gz (32.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fastapi_eventbus-0.1.1-py3-none-any.whl (33.2 kB view details)

Uploaded Python 3

File details

Details for the file fastapi_eventbus-0.1.1.tar.gz.

File metadata

  • Download URL: fastapi_eventbus-0.1.1.tar.gz
  • Upload date:
  • Size: 32.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.11

File hashes

Hashes for fastapi_eventbus-0.1.1.tar.gz
Algorithm Hash digest
SHA256 bda1eb90c64762cd10c2321cf1e93fb067888bfa12c0081bcd73b0c11552ee61
MD5 f93b8ae124227e835edf6b6d8cab7c7e
BLAKE2b-256 5902b096366df51c1ea64367fa619eaea2fc7d7e5c80b7b517eb306d8443ce60

See more details on using hashes here.

File details

Details for the file fastapi_eventbus-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for fastapi_eventbus-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 7db7e0a3f63a408c369ed455d3b0d4f49f96af2cf930172e571d7a17c87ca044
MD5 01d42fb03753fa1448746a5cab0976c0
BLAKE2b-256 7773851e6458bd0421d02de73a5c096a27b93cea441e3660711b16824146f573

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page