High-availability event-driven extension for FastAPI
Project description
fastapi-eventbus
高可用事件驱动 pub/sub 扩展,专为 FastAPI 设计。
特性
- 可插拔后端:内存同步、Redis、Kafka
- 装饰器注册事件处理器(
@on_event) - FastAPI 依赖注入集成
- 生命周期感知的启动/关闭
- 指数退避重试 + 死信队列(DLQ)
- 请求级 Correlation ID 中间件
- 后端健康检查
- 可插拔 Metrics 采集(支持对接 Prometheus/StatsD/Datadog)
- 可插拔 Tracing(支持对接 OpenTelemetry/Jaeger)
- Kafka Consumer Rebalance 安全处理
- 完整类型标注(PEP 561)
安装
pip install fastapi-eventbus
# Redis 支持
pip install fastapi-eventbus[redis]
# Kafka 支持
pip install fastapi-eventbus[kafka]
# 全部安装
pip install fastapi-eventbus[all]
# 开发环境
pip install -e ".[all,dev]"
快速开始
from fastapi import FastAPI, Depends
from fastapi_eventbus import (
BaseEvent, EventBus, EventBusMiddleware,
create_lifespan, get_event_bus, on_event,
)
# 1. 定义事件
class UserCreated(BaseEvent):
topic: str = "user.created"
user_id: int
username: str
# 2. 注册处理器
@on_event("user.created")
async def handle_user_created(event: BaseEvent) -> None:
print(f"新用户: {event}")
# 3. 创建应用
bus = EventBus()
app = FastAPI(lifespan=create_lifespan(bus))
app.add_middleware(EventBusMiddleware)
# 4. 发布事件
@app.post("/users")
async def create_user(bus: EventBus = Depends(get_event_bus)):
event = UserCreated(user_id=1, username="alice")
await bus.publish(event)
return {"status": "ok"}
配置
通过环境变量配置(前缀 EVENTBUS_):
| 变量 | 默认值 | 说明 |
|---|---|---|
EVENTBUS_BACKEND |
sync |
后端类型:sync / redis / kafka |
EVENTBUS_MAX_RETRIES |
3 |
最大重试次数 |
EVENTBUS_RETRY_DELAY |
1.0 |
初始重试延迟(秒) |
EVENTBUS_RETRY_BACKOFF_FACTOR |
2.0 |
退避倍数 |
EVENTBUS_RETRY_MAX_DELAY |
60.0 |
最大重试延迟(秒) |
EVENTBUS_DLQ_ENABLED |
false |
启用死信队列 |
EVENTBUS_DLQ_TOPIC_SUFFIX |
.dlq |
DLQ topic 后缀 |
EVENTBUS_REDIS_URL |
redis://localhost:6379/0 |
Redis 连接地址 |
EVENTBUS_REDIS_MAX_CONNECTIONS |
10 |
Redis 最大连接数 |
EVENTBUS_KAFKA_BOOTSTRAP_SERVERS |
localhost:9092 |
Kafka broker 地址 |
EVENTBUS_KAFKA_GROUP_ID |
fastapi-eventbus |
Kafka 消费组 ID |
也可以通过代码直接配置:
from fastapi_eventbus.config import EventBusSettings
settings = EventBusSettings(
backend="redis",
redis_url="redis://my-redis:6379/1",
max_retries=5,
dlq_enabled=True,
)
bus = EventBus(settings=settings)
后端
内存同步后端(默认)
零依赖,适合开发和测试:
bus = EventBus() # 默认使用 sync 后端
Redis 后端
pip install fastapi-eventbus[redis]
# 通过环境变量
# EVENTBUS_BACKEND=redis
# EVENTBUS_REDIS_URL=redis://localhost:6379/0
# 或代码配置
from fastapi_eventbus.backends.redis import RedisBackend
backend = RedisBackend(url="redis://localhost:6379/0")
bus = EventBus(backend=backend)
Kafka 后端
pip install fastapi-eventbus[kafka]
from fastapi_eventbus.backends.kafka import KafkaBackend
backend = KafkaBackend(
bootstrap_servers="localhost:9092",
group_id="my-service",
auto_offset_reset="earliest",
session_timeout_ms=30000,
enable_auto_commit=False, # 推荐:手动提交 offset
)
bus = EventBus(backend=backend)
Kafka 后端特性:
- Rebalance 安全:rebalance 期间自动暂停事件处理,完成后恢复
- 手动 offset 提交:每条消息处理成功后才提交,保证 at-least-once 语义
- 异常自动重连:consumer 异常时 5 秒退避后重连
- 批量发布:
publish_many()使用 batch send + flush
生命周期管理
简单模式
bus = EventBus()
app = FastAPI(lifespan=create_lifespan(bus))
组合模式
与你自己的 lifespan 逻辑组合:
from contextlib import asynccontextmanager
from fastapi_eventbus import EventBus, eventbus_lifespan
bus = EventBus()
@asynccontextmanager
async def lifespan(app):
# 你的启动逻辑
async with eventbus_lifespan(bus, app):
yield
# 你的关闭逻辑
app = FastAPI(lifespan=lifespan)
事件处理器
装饰器注册
from fastapi_eventbus import on_event, BaseEvent
@on_event("order.created")
async def handle_order(event: BaseEvent) -> None:
print(f"订单创建: {event.event_id}")
# 同步处理器也支持(会自动在线程池中执行)
@on_event("order.created")
def sync_handler(event: BaseEvent) -> None:
print("同步处理")
运行时动态注册
async def my_handler(event: BaseEvent) -> None:
pass
await bus.registry.register("some.topic", my_handler)
await bus.subscribe("some.topic")
重试与死信队列
from fastapi_eventbus import EventBus, RetryPolicy
policy = RetryPolicy(
max_retries=5,
delay=0.5,
backoff_factor=2.0,
max_delay=30.0,
)
bus = EventBus(
retry_policy=policy,
settings=EventBusSettings(dlq_enabled=True),
)
# 查看 DLQ 中的失败事件
failed = bus.dlq.peek(limit=20)
for f in failed:
print(f"事件 {f.event.event_id} 失败 {f.attempts} 次: {f.error}")
Metrics 采集
内置可插拔的 metrics 采集协议,零开销默认关闭。
内存采集器(测试/调试)
from fastapi_eventbus import EventBus, InMemoryMetricsCollector
metrics = InMemoryMetricsCollector()
bus = EventBus(metrics=metrics)
# 使用后查看指标
print(metrics.snapshot())
# {'published': {'user.created': 10}, 'dispatched': {'user.created': 10}, ...}
自定义采集器(对接 Prometheus 等)
实现 MetricsCollector 协议即可:
from fastapi_eventbus import MetricsCollector
class PrometheusCollector:
def __init__(self):
from prometheus_client import Counter, Histogram
self.published = Counter("eventbus_published_total", "Events published", ["topic"])
self.dispatched = Counter("eventbus_dispatched_total", "Events dispatched", ["topic"])
self.errors = Counter("eventbus_errors_total", "Dispatch errors", ["topic"])
self.retries = Counter("eventbus_retries_total", "Retries", ["topic"])
self.dlq = Counter("eventbus_dlq_total", "DLQ events", ["topic"])
self.pub_duration = Histogram("eventbus_publish_seconds", "Publish duration", ["topic"])
self.disp_duration = Histogram("eventbus_dispatch_seconds", "Dispatch duration", ["topic"])
def inc_published(self, topic: str) -> None:
self.published.labels(topic=topic).inc()
def inc_dispatched(self, topic: str) -> None:
self.dispatched.labels(topic=topic).inc()
def inc_dispatch_error(self, topic: str) -> None:
self.errors.labels(topic=topic).inc()
def inc_retry(self, topic: str, attempt: int) -> None:
self.retries.labels(topic=topic).inc()
def inc_dlq(self, topic: str) -> None:
self.dlq.labels(topic=topic).inc()
def observe_publish_duration(self, topic: str, duration: float) -> None:
self.pub_duration.labels(topic=topic).observe(duration)
def observe_dispatch_duration(self, topic: str, duration: float) -> None:
self.disp_duration.labels(topic=topic).observe(duration)
bus = EventBus(metrics=PrometheusCollector())
采集的指标
| 指标 | 说明 |
|---|---|
inc_published |
事件发布计数(按 topic) |
inc_dispatched |
事件成功分发计数 |
inc_dispatch_error |
分发错误计数 |
inc_retry |
重试计数(含 attempt 序号) |
inc_dlq |
进入死信队列计数 |
observe_publish_duration |
发布耗时 |
observe_dispatch_duration |
分发耗时 |
Tracing 追踪
内置轻量级 span 追踪,可对接 OpenTelemetry/Jaeger/Zipkin。
LoggingTracer(开发调试)
from fastapi_eventbus import EventBus, LoggingTracer
tracer = LoggingTracer()
bus = EventBus(tracer=tracer)
# 每次 publish/dispatch 都会自动创建 span 并输出日志:
# SPAN START publish [a1b2c3d4] topic=user.created event=xxx
# SPAN END publish [a1b2c3d4] status=ok duration=1.23ms
# 查看所有 span
for span in tracer.spans:
print(span.to_dict())
自定义 Tracer(对接 OpenTelemetry)
from fastapi_eventbus import EventTracer, EventSpan
from opentelemetry import trace
otel_tracer = trace.get_tracer("fastapi-eventbus")
class OTelTracer:
def start_span(self, operation, topic="", event_id="", parent_span=None):
otel_span = otel_tracer.start_span(
operation,
attributes={"topic": topic, "event_id": event_id},
)
span = EventSpan(operation=operation, topic=topic, event_id=event_id)
span.attributes["otel_span"] = str(id(otel_span))
self._otel_spans = getattr(self, "_otel_spans", {})
self._otel_spans[span.span_id] = otel_span
return span
def finish_span(self, span):
otel_span = getattr(self, "_otel_spans", {}).pop(span.span_id, None)
if otel_span:
if span.status.value == "error":
otel_span.set_status(trace.StatusCode.ERROR, span.error_message)
otel_span.end()
bus = EventBus(tracer=OTelTracer())
Span 数据结构
每个 span 包含:
{
"span_id": "a1b2c3d4e5f6g7h8",
"trace_id": "correlation-id-from-middleware",
"parent_span_id": "",
"operation": "publish", # publish | dispatch
"topic": "user.created",
"event_id": "abc123",
"status": "ok", # ok | error
"duration_ms": 1.23,
"attributes": {},
"error_message": "",
}
中间件
from fastapi_eventbus import EventBusMiddleware, get_correlation_id
app.add_middleware(EventBusMiddleware)
@app.get("/test")
async def test():
cid = get_correlation_id() # 当前请求的 correlation ID
return {"correlation_id": cid}
- 自动为每个请求生成 Correlation ID(或从
X-Correlation-ID请求头读取) - 响应头自动附带
X-Correlation-ID - Tracing 的
trace_id自动关联 Correlation ID
健康检查
from fastapi import Depends
from fastapi_eventbus import EventBus, get_event_bus
@app.get("/health")
async def health(bus: EventBus = Depends(get_event_bus)):
results = await bus.health.check()
healthy = all(r.status.value == "healthy" for r in results)
return {
"status": "healthy" if healthy else "unhealthy",
"backends": [
{"name": r.backend, "status": r.status.value, "detail": r.detail}
for r in results
],
}
开发
# 克隆并安装
git clone https://github.com/xjaqil/fastapi-eventbus.git
cd fastapi-eventbus
python3 -m venv .venv
source .venv/bin/activate
pip install -e ".[all,dev]"
# 运行测试
pytest
# 代码检查
ruff check .
ruff format .
# 类型检查
mypy src/
项目结构
src/fastapi_eventbus/
├── __init__.py # 公共 API
├── typing.py # 类型定义
├── py.typed # PEP 561 标记
├── core/ # 核心抽象(BaseEvent, Handler, Publisher, Subscriber, 异常)
├── backends/ # 后端实现(sync, redis, kafka)
├── middleware/ # 请求级事件追踪中间件
├── ext/ # FastAPI 集成(EventBus, 依赖注入, 生命周期)
├── registry/ # Handler 注册表 + @on_event 装饰器
├── retry/ # 重试策略 + 死信队列
├── config/ # Pydantic Settings 配置
├── metrics/ # 可插拔 Metrics 采集
├── tracing/ # 可插拔 Tracing 追踪
└── health/ # 后端健康检查
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
fastapi_eventbus-0.1.1.tar.gz
(32.2 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fastapi_eventbus-0.1.1.tar.gz.
File metadata
- Download URL: fastapi_eventbus-0.1.1.tar.gz
- Upload date:
- Size: 32.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bda1eb90c64762cd10c2321cf1e93fb067888bfa12c0081bcd73b0c11552ee61
|
|
| MD5 |
f93b8ae124227e835edf6b6d8cab7c7e
|
|
| BLAKE2b-256 |
5902b096366df51c1ea64367fa619eaea2fc7d7e5c80b7b517eb306d8443ce60
|
File details
Details for the file fastapi_eventbus-0.1.1-py3-none-any.whl.
File metadata
- Download URL: fastapi_eventbus-0.1.1-py3-none-any.whl
- Upload date:
- Size: 33.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7db7e0a3f63a408c369ed455d3b0d4f49f96af2cf930172e571d7a17c87ca044
|
|
| MD5 |
01d42fb03753fa1448746a5cab0976c0
|
|
| BLAKE2b-256 |
7773851e6458bd0421d02de73a5c096a27b93cea441e3660711b16824146f573
|