高性能金融行情消息中间件
Project description
PulseMQ
高性能金融行情消息中间件,基于 ZeroMQ 构建。
特性
- 高性能 — 基于 ZeroMQ 异步 I/O,支持 uvloop 加速(Linux/macOS)
- Pub/Sub 模式 — 支持 topic 通配符匹配(
*单段、>多段) - 三层消息模型 —
str(文本/JSON)、bytes(二进制透传)、DataFrame(结构化数据) - 多种序列化 — String、Msgpack、PyArrow IPC、Bytes
- 可选压缩 — Snappy、LZ4、Zstandard
- 认证与权限 — ZAP 认证 + 基于 topic 的细粒度权限控制
- 过载保护 — 双缓冲 + 优先级丢弃,控制消息永不饿死
- 实时监控 — EWMA 速率统计、P50/P99 延迟追踪、HTTP 指标 API
- 自动重连 — 指数退避重连策略
- 优雅关闭 — drain 缓冲区后再关闭连接
安装
pip install pulsemq
可选依赖:
# 压缩支持(Snappy / LZ4 / Zstandard)
pip install pulsemq[compress]
# PyArrow + Pandas 支持(DataFrame 序列化)
pip install pulsemq[pyarrow]
# 开发工具(pytest)
pip install pulsemq[dev]
快速开始
启动服务端
# 使用 CLI 命令
pulse-mq
# 或在 Python 中
from pulsemq import PulseServer
server = PulseServer()
await server.start()
客户端使用
from pulsemq import PulseClient
async with PulseClient("tcp://localhost:5555", api_key="your_key") as client:
# 发布文本消息
await client.publish("market.sh.600000", '{"price": 10.5}')
# 发布二进制数据
await client.publish("raw.feed", b'\x00\x01\x02')
# 发布 DataFrame
import pandas as pd
df = pd.DataFrame({"price": [10.5, 10.6], "volume": [100, 200]})
await client.publish("market.data", df, format="msgpack")
await client.publish("market.data", df, format="pyarrow")
# 订阅消息(支持通配符)
async for msg in client.subscribe("market.sh.*"):
print(msg.topic, msg.payload)
# 多 topic 订阅
async for msg in client.subscribe("topic-a", "topic-b", "team-a.>"):
print(msg.topic, msg.payload)
使用压缩
await client.publish("topic", data, compression="snappy")
await client.publish("topic", data, compression="lz4")
await client.publish("topic", data, compression="zstd")
配置
环境变量
| 变量名 | 说明 | 默认值 |
|---|---|---|
PULSEMQ_BIND |
ROUTER 绑定地址 | tcp://*:5555 |
PULSEMQ_XPUB_BIND |
XPUB 绑定地址 | tcp://*:5556 |
PULSEMQ_DB_URL |
数据库路径 | sqlite://./pulse_mq.db |
PULSEMQ_AUTH_ENABLED |
启用认证 | true |
PULSEMQ_ADMIN_KEY |
默认管理员 API Key | pulse_sk_admin_default |
PULSEMQ_CONCURRENCY |
最大并发数 | 100 |
PULSEMQ_BATCH_SIZE |
最大批处理大小 | 64 |
PULSEMQ_SERIALIZER |
默认序列化格式 | msgpack |
PULSEMQ_COMPRESSOR |
默认压缩算法 | none |
PULSEMQ_USE_UVLOOP |
使用 uvloop | true |
PULSEMQ_ZMQ_RCVHWM |
ZMQ 接收高水位 | 10000 |
PULSEMQ_ZMQ_SNDHWM |
ZMQ 发送高水位 | 10000 |
Python 配置
from pulsemq import ServerConfig, PulseServer
config = ServerConfig(
bind="tcp://*:5555",
xpub_bind="tcp://*:5556",
auth_enabled=True,
max_concurrency=200,
default_serializer="msgpack",
)
server = PulseServer(config)
监控
服务端启动后默认在 0.0.0.0:9090 暴露 HTTP 指标接口:
curl http://localhost:9090/metrics
返回 JSON 格式的实时指标:
{
"timestamp": 1717660800.0,
"msg_rate": 1250.3,
"record_rate": 5000.0,
"bytes_rate": 1048576.0,
"latency_p50_ms": 0.125,
"latency_p99_ms": 2.340,
"active_connections": 42,
"active_subscriptions": 128,
"error_rate": 0.0,
"dropped_total": 0,
"backpressure": false,
"engine_batch_size": 32,
"engine_pending_tasks": 5,
"engine_concurrency_usage": 0.05
}
错误处理
客户端提供完整的异常层级:
from pulsemq import (
PulseError, # 基类
PulseConnectionError, # 连接失败
PulseAuthError, # 认证失败
PulsePermissionError, # 权限不足
PulseTimeoutError, # 超时
PulseServerError, # 服务端错误(含错误码)
)
Topic 通配符
| 通配符 | 说明 | 示例 |
|---|---|---|
* |
中间位置匹配恰好一个段;末尾匹配一个或多个段 | market.sh.* 匹配 market.sh.600000 |
> |
匹配一个或多个段 | team-a.> 匹配 team-a.mkt.sh.600000 |
许可证
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pulse_mq-0.5.1.tar.gz
(217.3 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
pulse_mq-0.5.1-py3-none-any.whl
(55.8 kB
view details)
File details
Details for the file pulse_mq-0.5.1.tar.gz.
File metadata
- Download URL: pulse_mq-0.5.1.tar.gz
- Upload date:
- Size: 217.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
749975dfec922220dd32b2505d0efd44bf500e0c16d81f130733f46aa7c7215d
|
|
| MD5 |
8d5f5d1873c23372bd498bb1821f64d9
|
|
| BLAKE2b-256 |
4333b549e466f27af06973d289cd8dc7660a812ac18ffff25746e3650559a5d5
|
File details
Details for the file pulse_mq-0.5.1-py3-none-any.whl.
File metadata
- Download URL: pulse_mq-0.5.1-py3-none-any.whl
- Upload date:
- Size: 55.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e21ee9508c0313abe5e36b1ae23dde76369f6e6b38065c1a0570d1ae8b3d41d1
|
|
| MD5 |
988c7931a6dd88284198dfcab97f6ef6
|
|
| BLAKE2b-256 |
fa608042447cb8d4082e6763d9fcf54067f3fd61cfb4e3a8b084c86eb0ba4d69
|