高性能金融行情消息中间件
Project description
PulseMQ
高性能金融行情消息中间件,基于 ZeroMQ 构建。
特性
- 高性能 — 基于 ZeroMQ 异步 I/O
- Pub/Sub 模式 — 支持 topic 通配符匹配(
*单段、>多段) - 三层消息模型 —
str(文本/JSON)、bytes(二进制透传)、DataFrame(结构化数据) - 多种序列化 — String、Msgpack、PyArrow IPC、Bytes
- 可选压缩 — Snappy、LZ4、Zstandard
- 认证与权限 — ZAP 认证 + 基于 topic 的细粒度权限控制
- 过载保护 — 双缓冲 + 优先级丢弃,控制消息永不饿死
- 实时监控 — EWMA 速率统计、P50/P99 延迟追踪、HTTP 指标 API
- 自动重连 — 指数退避重连策略
- 优雅关闭 — drain 缓冲区后再关闭连接
安装
要求 Python >= 3.13
pip install pulsemq
包含全部依赖:压缩(Snappy / LZ4 / Zstandard)、序列化(Msgpack / PyArrow / Pandas)均开箱即用。
快速开始
启动服务端
# 使用 CLI 命令
pulse-mq
# 或在 Python 中
from pulsemq import PulseServer
server = PulseServer()
await server.start()
客户端使用
from pulsemq import PulseClient
async with PulseClient("tcp://localhost:5555", api_key="your_key") as client:
# 发布文本消息
await client.publish("market.sh.600000", '{"price": 10.5}')
# 发布二进制数据
await client.publish("raw.feed", b'\x00\x01\x02')
# 发布 DataFrame
import pandas as pd
df = pd.DataFrame({"price": [10.5, 10.6], "volume": [100, 200]})
await client.publish("market.data", df, format="msgpack")
await client.publish("market.data", df, format="pyarrow")
# 订阅消息(支持通配符)
async for msg in client.subscribe("market.sh.*"):
print(msg.topic, msg.payload)
# 多 topic 订阅
async for msg in client.subscribe("topic-a", "topic-b", "team-a.>"):
print(msg.topic, msg.payload)
使用压缩
await client.publish("topic", data, compression="snappy")
await client.publish("topic", data, compression="lz4")
await client.publish("topic", data, compression="zstd")
配置
环境变量
| 变量名 | 说明 | 默认值 |
|---|---|---|
PULSEMQ_BIND |
ROUTER 绑定地址 | tcp://*:5555 |
PULSEMQ_XPUB_BIND |
XPUB 绑定地址 | tcp://*:5556 |
PULSEMQ_DB_URL |
数据库路径 | sqlite://./pulse_mq.db |
PULSEMQ_AUTH_ENABLED |
启用认证 | true |
PULSEMQ_ADMIN_KEY |
默认管理员 API Key | pulse_sk_admin_default |
PULSEMQ_CONCURRENCY |
最大并发数 | 100 |
PULSEMQ_BATCH_SIZE |
最大批处理大小 | 64 |
PULSEMQ_SERIALIZER |
默认序列化格式 | msgpack |
PULSEMQ_COMPRESSOR |
默认压缩算法 | none |
PULSEMQ_USE_UVLOOP |
使用 uvloop | true |
PULSEMQ_ZMQ_RCVHWM |
ZMQ 接收高水位 | 10000 |
PULSEMQ_ZMQ_SNDHWM |
ZMQ 发送高水位 | 10000 |
Python 配置
from pulsemq import ServerConfig, PulseServer
config = ServerConfig(
bind="tcp://*:5555",
xpub_bind="tcp://*:5556",
auth_enabled=True,
max_concurrency=200,
default_serializer="msgpack",
)
server = PulseServer(config)
监控
服务端启动后默认在 0.0.0.0:9090 暴露 HTTP 指标接口:
curl http://localhost:9090/metrics
返回 JSON 格式的实时指标:
{
"timestamp": 1717660800.0,
"msg_rate": 1250.3,
"record_rate": 5000.0,
"bytes_rate": 1048576.0,
"latency_p50_ms": 0.125,
"latency_p99_ms": 2.340,
"active_connections": 42,
"active_subscriptions": 128,
"error_rate": 0.0,
"dropped_total": 0,
"backpressure": false,
"engine_batch_size": 32,
"engine_pending_tasks": 5,
"engine_concurrency_usage": 0.05
}
错误处理
客户端提供完整的异常层级:
from pulsemq import (
PulseError, # 基类
PulseConnectionError, # 连接失败
PulseAuthError, # 认证失败
PulsePermissionError, # 权限不足
PulseTimeoutError, # 超时
PulseServerError, # 服务端错误(含错误码)
)
Topic 通配符
| 通配符 | 说明 | 示例 |
|---|---|---|
* |
中间位置匹配恰好一个段;末尾匹配一个或多个段 | market.sh.* 匹配 market.sh.600000 |
> |
匹配一个或多个段 | team-a.> 匹配 team-a.mkt.sh.600000 |
许可证
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pulse_mq-0.6.0.tar.gz
(216.8 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
pulse_mq-0.6.0-py3-none-any.whl
(55.5 kB
view details)
File details
Details for the file pulse_mq-0.6.0.tar.gz.
File metadata
- Download URL: pulse_mq-0.6.0.tar.gz
- Upload date:
- Size: 216.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.16 {"installer":{"name":"uv","version":"0.9.16","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
dc585cc04d5b8cd463d3518f73d9a3c92c5c143b8a9ee7fefc204def339a71c8
|
|
| MD5 |
9ce8a1f59d80bcd0e1631c908fa4aec4
|
|
| BLAKE2b-256 |
9497151546026936c63fd093a0068decca35c47fed745d8fb70426f822404827
|
File details
Details for the file pulse_mq-0.6.0-py3-none-any.whl.
File metadata
- Download URL: pulse_mq-0.6.0-py3-none-any.whl
- Upload date:
- Size: 55.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.16 {"installer":{"name":"uv","version":"0.9.16","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1357acea89c078623034ee8cf1a6be98802a937cd1f16e0ce32d632f99df5ccd
|
|
| MD5 |
d3e3c4467b464001bc4ce1019e71ad3e
|
|
| BLAKE2b-256 |
cb6f2104ebb35e08ab0a1e77ad729ccf89e7377b5b1b4930ae90c9f85604c8ce
|