高性能纯 pub→sub 消息系统
Project description
PulseMQ
面向金融行情的高性能 pub → sub 消息中间件,基于 ZeroMQ 构建。采用单进程 pub → sub 无 broker架构,publisher 进程同时承担数据生产、权限控制、流量统计和后台管理界面。
特性
- 单进程架构 — publisher 即服务,无独立 broker,部署极简
- 高性能 — 基于 ZeroMQ PUB,SNDHWM=0 无丢消息;burst 模式可压榨到硬件极限
- 多数据格式 —
str/bytes/DataFrame/list[dict]等类型,发布端零配置自动推断 record_count - 多种序列化 —
str、msgpack(默认)、json、pyarrowIPC、bytes透传 - 可选压缩 —
none(默认)、snappy、lz4、zstd - PLAIN 认证 — ZeroMQ PLAIN 协议 + ZAP handler,api_key 白名单机制
- 实时监控 — 分钟粒度流量统计,内存 8 小时窗口 + SQLite 持久化
- 可视化后台 — 内置深色 Web UI(ECharts 折线图 + SSE 实时推送),支持 1H/6H 时间范围切换,60 秒滚动均值
- 优雅关闭 — Producer 任务 drain、Admin 停止、PUB socket linger 后退出
- 纳秒时间戳 — 帧级时间戳独立成帧,端到端延迟可精确测量
安装
要求 Python >= 3.13
pip install pulse-mq
依赖项:ZeroMQ、msgspec、python-snappy、lz4、zstandard、pyarrow、pandas 全部开箱即用。
快速开始
启动 Publisher
# CLI 零配置启动
pulse-mq
更常见的用法是在 Python 中注册自己的 producer:
from pulsemq import PulsePublisher
pub = PulsePublisher()
@pub.producer(name="sh_market", interval=2.0)
async def sh_market():
# 任意可序列化对象
return {"symbol": "600000", "price": 10.5, "volume": 12345}
@pub.producer(name="deep_quote", interval=0.5, compression="lz4")
async def deep_quote():
import pandas as pd
return pd.DataFrame({
"price": [10.5, 10.6, 10.7],
"volume": [100, 200, 300],
})
pub.start() # 阻塞运行
PulsePublisher 也提供 start_async() 方便嵌入其他 asyncio 程序。
订阅消息
import asyncio
from pulsemq import PulseSubscriber
async def main():
# 关闭认证时 username/password 可省略
async with PulseSubscriber("tcp://localhost:5555") as sub:
async for msg in sub.subscribe("sh_market"):
print(msg.topic, msg.payload, msg.timestamp_ns)
# 开启认证时必须传入凭证
async with PulseSubscriber(
"tcp://localhost:5555",
username="user1",
password="pulse_sk_xxx",
) as sub:
async for msg in sub.subscribe("sh_market", "deep_quote"):
print(msg.topic, msg.payload)
asyncio.run(main())
PulseMessage 字段:
| 字段 | 类型 | 说明 |
|---|---|---|
topic |
str |
topic 名称 |
payload |
Any |
解码后的数据 |
raw_payload |
bytes |
解码前的原始字节 |
record_count |
int |
本帧包含的记录条数 |
timestamp_ns |
int |
publisher 发送时的纳秒时间戳 |
serializer |
str |
使用的序列化格式名 |
compression |
str |
使用的压缩算法名 |
数据类型与序列化
支持的返回类型(白名单)
Producer 回调只接受以下 7 种返回类型,其余一律抛 TypeError:
pd.DataFrame / list[pd.DataFrame] / list[dict] / list[str] / dict / str / bytes
数据类型 × 序列化器 强绑定对照表
PulseMQ 采用强类型绑定(方案 A):数据类型与序列化器一一对应,不匹配会在发布时抛 TypeError。单元格 = record_count 值(合法)或 ❌(不匹配,报错):
| 返回类型 | msgpack |
json |
pyarrow |
str |
bytes |
record_count |
|---|---|---|---|---|---|---|
pd.DataFrame(N 行) |
✅ | ✅ | ✅ | ❌ | ❌ | N(行数) |
list[pd.DataFrame] |
✅ | ✅ | ✅ | ❌ | ❌ | 行数和 |
list[dict](N 个) |
✅ | ✅ | ✅ | ❌ | ❌ | N |
list[str](N 个) |
✅ | ✅ | ❌ | ❌ | ❌ | N |
dict |
✅ | ✅ | ✅ | ❌ | ❌ | 1 |
str |
❌ | ❌ | ❌ | ✅ | ❌ | 1 |
bytes |
❌ | ❌ | ❌ | ❌ | ✅ | 1 |
绑定规则:
str数据 → 只能用str序列化器(纯 UTF-8,最快)bytes数据 → 只能用bytes序列化器(零拷贝透传,最快)pd.DataFrame/list[pd.DataFrame]/list[dict]/dict→ 可选msgpack/json/pyarrowlist[str]→ 可选msgpack/json
return "hello" # str → 1 record, 用 str
return b"\x00\x01" # bytes → 1 record, 用 bytes
return {"a": 1} # dict → 1 record, 用 msgpack/json/pyarrow
return [{"a": 1}, {"a": 2}] # list[dict] → 2 records, 用 msgpack/json/pyarrow
return ["a", "b", "c"] # list[str] → 3 records, 用 msgpack/json
return pd.DataFrame({"a": [1, 2]}) # DataFrame → 2 records, 用 msgpack/json/pyarrow
return [df1, df2] # list[DataFrame]→ 行数和, 用 msgpack/json/pyarrow
record_count 推断:DataFrame/Table 按行数;
list[dict]/list[str]按 list 长度;list[DataFrame]按各 DataFrame 行数之和;dict/str/bytes按 1。单帧上限 1,000,000 条。list 元素必须类型一致:
list[dict]要求所有元素都是 dict,list[str]要求都是 str。混合类型(如[{"a":1}, "hello"])会抛TypeError。白名单外类型全部报错:标量(int/float/bool)、
pa.Table、list[bytes]、list[int]、set、tuple等均不支持。
序列化格式(5 种)
通过 producer 装饰器的 serializer 参数声明。序列化器会根据数据类型自动校验,无需手动匹配(配错会报错提示):
@pub.producer(name="market", serializer="msgpack", compression="none")
async def market():
return {"symbol": "600000", "price": 10.5}
@pub.producer(name="ticks", serializer="pyarrow", compression="zstd")
async def ticks():
return pd.DataFrame(...)
@pub.producer(name="log", serializer="str") # str 数据必须用 str
async def log():
return "some log line"
@pub.producer(name="raw", serializer="bytes") # bytes 数据必须用 bytes
async def raw():
return b"\x01\x02\x03"
| 格式 | 后端 | 适用数据类型 | 特点 |
|---|---|---|---|
msgpack |
msgspec.msgpack |
dict / list[dict] / list[str] / DataFrame / list[DataFrame] | 通用结构化,二进制紧凑 |
json |
msgspec.json |
同 msgpack(不含 bytes) | 人类可读、跨语言 |
pyarrow |
pyarrow IPC |
dict / list[dict] / DataFrame / list[DataFrame] | 列存 IPC,分析场景(可选依赖) |
str |
UTF-8 | 仅 str | 纯文本透传,最快 |
bytes |
透传 | 仅 bytes | 二进制透传,最快 |
pyarrow为可选依赖:未安装时该格式不注册,使用会抛KeyError。其余 4 种为硬依赖,始终可用。
pyarrow类型严格:返回list[str]或标量时会抛TypeError,提示改用msgpack/json。
压缩算法(4 种)
通过 compression 参数声明,默认 none:
| 算法 | 后端 | 压缩比 | 速度 | 适用场景 |
|---|---|---|---|---|
none(默认) |
— | 1.00x | 最快 | 调试 / 极小数据 |
snappy |
python-snappy |
低 | 极快 | 速度优先 |
lz4 |
lz4.frame |
中 | 极快 | 速度与压缩比平衡,金融行情常用 |
zstd |
zstandard |
高 | 中 | 压缩比优先,带宽受限场景 |
4 种压缩算法可与任意序列化格式自由组合(5×4 = 20 种合法组合)。
Burst 模式
极限性能测试场景可用 burst_producer 装饰器,无间隔连续发送(回调返回 None 时停止):
@pub.burst_producer(name="bench", cache_size=200_000)
async def bench():
if not has_more():
return None
return [generate_record() for _ in range(1000)]
配置
环境变量
| 变量名 | 说明 | 默认值 |
|---|---|---|
PULSEMQ_BIND |
ZMQ PUB 绑定地址 | tcp://*:5555 |
PULSEMQ_ADMIN_BIND |
Admin 后台绑定地址 | 0.0.0.0:9090 |
PULSEMQ_STATS_DB |
统计 SQLite 路径 | sqlite://./stats.sqlite |
PULSEMQ_API_KEYS |
API Key 列表 user1:pass1,user2:pass2,空=关闭认证 |
"" |
Python 配置
from pulsemq import PublisherConfig, PulsePublisher
config = PublisherConfig(
bind="tcp://*:5555",
admin_bind="0.0.0.0:9090",
stats_db="sqlite://./stats.sqlite",
stats_retention_minutes=480, # 内存窗口,默认 8 小时
api_keys_str="alice:pulse_sk_alice,bob:pulse_sk_bob",
)
pub = PulsePublisher(config)
# 或运行时追加 key
pub.add_api_key("carol", "pulse_sk_carol")
PulsePublisher 构造参数 bind / admin_bind / api_keys 可在启动前覆盖配置。
监控与 Admin 后台
Publisher 启动后,Admin 后台默认监听 0.0.0.0:9090,提供深色 Web UI 和 REST/SSE 接口。
Web UI
浏览器打开 http://localhost:9090/ 即可看到实时监控面板:
- 顶部指标卡片:活跃主题数、消息量/秒(记录数口径,60 秒滚动均值)、流量/秒(压缩后字节,60 秒滚动均值)、运行时间
- ECharts 流量折线图:点击 topic 卡片叠加折线(最多 5 个,LRU 淘汰),支持 1H / 6H 时间范围切换,30 秒自动刷新历史数据,玻璃态美化 + 渐变填充
- Topic 列表:实时显示每个 topic 的记录速率、当前分钟记录数和缓存用量
REST API
# 实时指标快照(含 60 秒滚动均值)
curl http://localhost:9090/api/v1/stats/realtime
# 所有 topic 列表
curl http://localhost:9090/api/v1/topics
# 单个 topic 分钟级历史(支持 minutes 参数)
curl http://localhost:9090/api/v1/topics/sh_market/history?minutes=60
curl http://localhost:9090/api/v1/topics/sh_market/history?minutes=360
# 系统状态
curl http://localhost:9090/api/v1/system/status
# 健康检查
curl http://localhost:9090/healthz
SSE 实时推送
curl -N http://localhost:9090/api/v1/stats/stream
每 1 秒一帧 JSON,结构与 /api/v1/stats/realtime 一致。Web UI 与外部看板可直接订阅。
协议帧格式
每条 ZMQ 消息由 4 帧组成:
| 帧序号 | 内容 | 说明 |
|---|---|---|
| 1 | topic | UTF-8 字节串 |
| 2 | meta | 6 字节:[msg_type(1)][flags(1)][record_count(4, big-endian uint32)] |
| 3 | timestamp | 8 字节 big-endian int64,纳秒 |
| 4 | payload | 序列化 + 压缩后的字节 |
msg_type:0x01= DATA,0x02= PINGflags:bit[0:2]序列化格式编码,bit[3:4]压缩算法编码- 单帧
record_count上限 1,000,000
性能基准
Burst 极限测试
scripts/bench_burst.py 提供单场景 burst 极限性能测试:
python scripts/bench_burst.py
全矩阵 Benchmark
scripts/bench_pubsub_matrix.py 对所有合法的 (序列化 × 压缩 × 数据形态) 组合做全面测试:
python scripts/bench_pubsub_matrix.py
覆盖 48 个合法组合,同时测试:
- 纯编解码性能(序列化 + 压缩,不经过网络)
- 端到端 pub→sub 性能(吞吐量、延迟 p50/p90/p99、压缩率)
- 正确性验证(pub 端发送数据在 sub 端完整还原)
v2.1.0 典型测试结果
纯编解码性能(200 次迭代平均):
| 组合 | 编码 ops/s | 解码 ops/s | 编码 μs | 压缩率 |
|---|---|---|---|---|
| bytes+none | 14.6M | 29.9M | 0.07 | 1.00x |
| msgpack+none | 5.6M | 9.3M | 0.18 | 1.00x |
| msgpack+lz4+list_dict | 172K | 96K | 5.8 | 0.12x |
| msgpack+zstd+large_dict | 27K | 209K | 37.6 | 0.00x |
端到端 pub→sub(经过 ZMQ 网络,单 subscriber,50 条消息/组合):
| 组合 | 记录吞吐/s | 延迟 p50 | 延迟 p99 |
|---|---|---|---|
| json+none+list_dict | 880,514 | 2.68ms | 3.51ms |
| msgpack+none+list_dict | 825,900 | 2.74ms | 3.05ms |
| msgpack+none+dataframe | 135,096 | 17.8ms | 34.2ms |
| pyarrow+none+dataframe | 86,663 | 27.6ms | 53.7ms |
测试环境:Windows 11,Python 3.13,单机 localhost
更新日志
v2.3.0
⚠️ Breaking Change:数据类型收紧为白名单,序列化器改为强类型绑定。
- 数据类型白名单:producer 回调只接受 7 种类型——
pd.DataFrame/list[pd.DataFrame]/list[dict]/list[str]/dict/str/bytes。其余(标量、pa.Table、list[bytes]/list[int]、混合 list、set/tuple 等)一律抛TypeError - 序列化器强绑定(方案 A):
str数据只能用str序列化器,bytes数据只能用bytes序列化器,结构化数据(DataFrame/dict/list)用 msgpack/json/pyarrow。配错会在发布时报错 - 修复
list[pd.DataFrame]的 record_count bug:原按 list 长度(DataFrame 个数)计,现按各 DataFrame 行数之和(与 payload 展平后的实际记录数一致) bytes × json报错:json 序列化器明确拒绝 bytes(避免 base64 编码后解码类型变形为 str 的语义不一致)- 缓存按记录数淘汰:
TopicBuffer从"按帧数(deque maxlen)"改为"按累计记录数"淘汰,DataFrame 一批 N 条占 N 配额。监控显示N / 上限(满)格式 - pyarrow 序列化器严格化:遇到不支持的类型(list[str]、标量等)抛
TypeError,而非静默退回 msgpack 导致编解码不一致 - 监控 UI 文案精确化:消息量/流量卡片副标题标注"近60秒估算",tooltip 说明算法口径;主题卡片去掉 record_count_current,缓存显示
N/M(满) - 文档:新增「数据类型 × 序列化器 强绑定对照表」;更新序列化器/压缩算法表格
- 测试:新增
tests/test_data_types.py(59 个专项用例);e2e 矩阵扩展至 7 种数据形态;修复 burst 测试跨分钟 flaky
v2.2.2
- 文档修正:README 安装命令包名
pulsemq→pulse-mq(PyPI 实际包名) - 文档同步:监控卡片描述对齐 v2.2.0 的记录数口径与玻璃态 UI;补全 v2.2.0/v2.2.1 更新日志
v2.2.1
- 启动修复:
publisher.py补if __name__ == "__main__"守卫,修复python -m pulsemq.publisher无法启动的问题 - 版本号统一:新增
pulsemq/_version.py作为版本号单一来源,publisher.__version__与/api/v1/system/status的SERVER_VERSION动态读取一致(修复后者写死 2.0.0) - 健壮性增强:
subscriber遵守 asyncio 取消协议,CancelledError时清理 socket 后重新抛出- admin 路由异常补
logger.debug日志(不再静默吞掉) _respond_html/_respond_json复用_STATUS_TEXT状态文本映射TrafficStats读路径快照迭代,规避并发clear()的RuntimeError- SSE 队列满时主动断开死客户端,避免内存泄漏
_topic_historyoff-by-one 修正(>= minutes→>= minutes - 1)
v2.2.0
- 消息量口径变更:监控指标从"帧数"(发送次数)切换为"记录数"(record_count)。卡片速率、折线图、topic 列表全部改用
record_rate_1min,一条带 N 条记录的批量消息现在如实显示为 N 条/秒 - 监控 UI 美化与中文化:玻璃态卡片 + 渐变发光 + ECharts 渐变填充 + 全面中文文案 + emoji 图标
v2.1.0
- 监控 UI 全面升级:深色渐变主题,ECharts 折线图支持 1H/6H 时间范围切换
- 60 秒滚动均值:Messages/s 和 Data/s 改为近 60 秒的加权均值,不再每分钟重置(注:v2.2.0 起 Messages/s 改用记录数口径)
- 折线图交互优化:首次进入自动选中第一个 topic,30 秒自动刷新历史,hover tooltip 不再闪烁
- 后端去重:history API 合并内存 + SQLite 数据,按 timestamp 去重
- 全矩阵 Benchmark:新增
scripts/bench_pubsub_matrix.py,覆盖 48 种组合的性能与正确性测试
v2.0.2
- 协议帧 record_count 从 uint16 扩展到 uint32,单帧上限 1,000,000 条
- 重写 README 对齐 v2 架构
许可证
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pulse_mq-2.3.0.tar.gz.
File metadata
- Download URL: pulse_mq-2.3.0.tar.gz
- Upload date:
- Size: 438.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3731a17508f07dbf69c787421aff9a9f39e7dceafc4cd4f3fd1fe35d0b50aa45
|
|
| MD5 |
d5385a1a172e4fbad961d90410b79040
|
|
| BLAKE2b-256 |
bf378240594d55e5a6d54143ccca659352eb1c2fdf1dd91c97bbd8eb6858527d
|
File details
Details for the file pulse_mq-2.3.0-py3-none-any.whl.
File metadata
- Download URL: pulse_mq-2.3.0-py3-none-any.whl
- Upload date:
- Size: 380.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
11d8fc0e67614a57582098f72e2b726ac0dbe3f5f36ab9f5ba2392839cbd3def
|
|
| MD5 |
d0028e0096d595726ef8c3a4cc6abc4f
|
|
| BLAKE2b-256 |
0619d364b87c65d3bf291d99476f213f4a84f60af86b2398b24256dc9c2e9513
|