A ZeroMQ-based message queue system designed for distributing financial market data
PulseMQ
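A high-performance, ZeroMQ-based message queue system designed for distributing financial market data.
Features
- High performance: built on a ZMQ ROUTER/XPUB architecture, P50 latency < 1 ms
- Multiple formats: BYTES, STRING, and DataFrame (MessagePack/Apache Arrow)
- Access control: namespace-level role-based authorization (sub/pub/admin), fully cached in memory
- Admin console: JWT authentication, real-time monitoring dashboard, complete CRUD API
- Client SDK: Python client with async and sync modes, automatic reconnection, heartbeats, and subscription recovery
- Compressed transport: zlib, lz4, and zstd compression, decompressed automatically on the subscriber side
- Zero configuration: starts without a config file; YAML and CLI overrides are optional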
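Quick start
Installation
```bash
pip install pulse-mq
```
Installing from source (development mode)
```bash
# Clone the repository
git clone https://github.com/yourname/pulse-mq.git
cd pulse-mq
# Create a virtual environment (requires Python >= 3.13)
uv venv --python 3.13
# Activate the virtual environment (run the line for your platform)
source .venv/Scripts/activate  # Windows (Git Bash)
source .venv/bin/activate      # Linux/macOS
# Install in editable mode with development dependencies
uv pip install -e ".[dev]"
```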
Starting the broker
```bash
# Zero-config start (default ports: PUB=5555, SUB=5556, Admin=8080)
pulsemq
```
On startup, an admin account is created automatically and its credentials are saved to the .pulsemq_admin file. The startup banner looks like this:
```text
==================================================
PulseMQ Broker v4.3.0
PUB(XSUB) : tcp://0.0.0.0:5555
SUB(XPUB) : tcp://0.0.0.0:5556
Admin     : http://0.0.0.0:8080
Health    : http://0.0.0.0:8080/health
==================================================
```
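Other ways to start:
```bash
# Use a configuration file
pulsemq --config config.yaml
# Override individual settings on the command line (precedence: CLI > YAML > defaults)
pulsemq --host 127.0.0.1 --pub-port 6000 --sub-port 6001 --admin-port 9090 --log-level debug
# Set the database path
pulsemq --db-path /data/pulsemq.db
```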
Starting from a Python script:
```python
"""run_server.py: start the broker from a Python script"""
from pulse_mq import PulseMQServer

# Option 1: zero configuration
server = PulseMQServer()

# Option 2: override settings with keyword arguments
# server = PulseMQServer(
#     pub_port=6000,
#     sub_port=6001,
#     admin_port=9090,
#     log_level="debug",
# )

# Option 3: pass a complete Config object
# from pulse_mq.config import Config, ServerConfig
# config = Config(server=ServerConfig(pub_port=6000, sub_port=6001))
# server = PulseMQServer(config)

server.run()
```
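Admin console
Open http://localhost:8080 in a browser and log in with the username and password from .pulsemq_admin.
Usage examples
1. End-to-end workflow: create users → grant permissions → publish/subscribe
This example walks through the complete workflow from scratch.
1.1 Start the broker
```bash
pulsemq
```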
1.2 Prepare users and permissions through the admin API
```python
"""prepare.py: create a namespace and users, then grant permissions"""
import ast

import requests

BASE = "http://localhost:8080"

# 1. Log in to obtain a JWT (credentials come from the .pulsemq_admin file)
with open(".pulsemq_admin") as f:
    # ast.literal_eval only parses literals and never executes code, unlike eval();
    # adapt this line to the actual format of the file
    admin_creds = ast.literal_eval(f.read())
resp = requests.post(f"{BASE}/login", json={
    "username": admin_creds["username"],
    "password": admin_creds["password"],
})
resp.raise_for_status()
jwt_token = resp.json()["token"]
headers = {"Authorization": f"Bearer {jwt_token}"}

# 2. Create the namespace
resp = requests.post(f"{BASE}/api/namespaces", headers=headers, json={
    "name": "market_data",
    "description": "Stock market data",
})
print(f"namespace: {resp.json()}")

# 3. Create the publisher user (mq_normal users get a token generated automatically)
resp = requests.post(f"{BASE}/api/users", headers=headers, json={
    "username": "publisher_01",
    "user_type": "mq_normal",
})
publisher = resp.json()
print(f"publisher token: {publisher['token']}")

# 4. Create the subscriber user
resp = requests.post(f"{BASE}/api/users", headers=headers, json={
    "username": "subscriber_01",
    "user_type": "mq_normal",
})
subscriber = resp.json()
print(f"subscriber token: {subscriber['token']}")

# 5. Look up the namespace ID
resp = requests.get(f"{BASE}/api/namespaces", headers=headers)
ns_list = resp.json()
ns_id = next(ns["id"] for ns in ns_list if ns["name"] == "market_data")

# 6. Allow the publisher to publish in market_data (pub role)
resp = requests.post(
    f"{BASE}/api/namespaces/{ns_id}/permissions",
    headers=headers,
    json={"user_id": publisher["id"], "role": "pub"},
)
print(f"granted publisher: {resp.json()}")

# 7. Allow the subscriber to subscribe in market_data (sub role)
resp = requests.post(
    f"{BASE}/api/namespaces/{ns_id}/permissions",
    headers=headers,
    json={"user_id": subscriber["id"], "role": "sub"},
)
print(f"granted subscriber: {resp.json()}")
```
1.3 Publisher: sending market data
```python
"""publisher.py: publish stock market data"""
import asyncio
import json
import time

from pulse_mq import PulseMQClient


async def main():
    # Connect to the broker with the token generated through the admin API
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="publisher_01",
        token="<token generated above>",
    )
    await client.async_connect()
    print("Publisher connected")
    try:
        # Simulate 1-minute K-line (candlestick) data, sent as raw bytes
        for i in range(100):
            kline = json.dumps({
                "symbol": "sz000651",
                "type": "kline_1m",
                "open": 10.5 + i * 0.01,
                "high": 10.6 + i * 0.01,
                "low": 10.4 + i * 0.01,
                "close": 10.55 + i * 0.01,
                "volume": 10000 + i * 100,
                "timestamp": int(time.time() * 1000),
            }).encode("utf-8")
            await client.async_publish(
                namespace="market_data",
                topic="sz000651.kline.1m",
                data=kline,
                format="bytes",
            )
            print(f"Sent K-line #{i + 1}")
            await asyncio.sleep(1)
        print("Publishing finished")
    finally:
        await client.async_close()


asyncio.run(main())
```
1.4 Subscriber: receiving market data
```python
"""subscriber.py: subscribe to stock market data"""
import asyncio

from pulse_mq import PulseMQClient


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="subscriber_01",
        token="<token generated above>",
    )
    await client.async_connect()
    print("Subscriber connected")
    try:
        # Subscribe to every K-line interval for sz000651 (the > wildcard matches multiple levels)
        async for msg in client.async_subscribe(namespace="market_data", topic="sz000651.kline.>"):
            print(
                f"[{msg.namespace}] {msg.topic}: "
                f"{len(msg.data)} bytes"
            )
    finally:
        await client.async_close()


asyncio.run(main())
```
Note:
pub_port is the broker's XSUB port (default 5555) and sub_port is its XPUB port (default 5556). Both must be specified explicitly.
2. Publishing in multiple formats
PulseMQ supports four data formats:
```python
import asyncio
import json
import random

import pandas as pd

from pulse_mq import PulseMQClient


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="demo",
        token="your-token",
    )
    await client.async_connect()
    try:
        # BYTES: raw byte stream (e.g. encrypted data or binary protocols)
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.raw",
            data=b"\x01\x02\x03\x04",
            format="bytes",
        )
        # STRING: UTF-8 text
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.signal",
            data=json.dumps({"action": "BUY", "price": 10.5}),
            format="string",
        )
        # DF_MSGPACK: small DataFrames (< 1 MB)
        df_small = pd.DataFrame({
            "symbol": ["sz000651", "sz000651", "sz000651"],
            "price": [10.5, 10.6, 10.55],
            "volume": [100, 200, 150],
        })
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.snapshot",
            data=df_small,
            format="df_msgpack",
        )
        # DF_PYARROW: large DataFrames (< 64 MB, better performance)
        n = 100_000
        df_large = pd.DataFrame({
            "symbol": ["sz000651"] * n,
            "timestamp": pd.date_range("2025-01-01", periods=n, freq="ms"),
            "price": [10.5 + random.random() * 0.1 for _ in range(n)],
            "volume": [random.randint(100, 500) for _ in range(n)],
        })
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.tick_batch",
            data=df_large,
            format="df_pyarrow",
        )
        print("All formats published")
    finally:
        await client.async_close()


asyncio.run(main())
```
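3. Wildcard subscriptions
Topics support two wildcards:
| Wildcard | Meaning | Example |
|---|---|---|
| `*` | Matches exactly one level | `sz000651.kline.*` matches `sz000651.kline.1m` but not `sz000651.kline.1m.raw` |
| `>` | Matches multiple levels (allowed only at the end) | `sz000651.>` matches `sz000651.kline.1m`, `sz000651.trade`, etc. |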
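```python
import asyncio

from pulse_mq import PulseMQClient


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="trader",
        token="your-token",
    )
    await client.async_connect()

    async def one_stock():
        # Every message for a single stock
        async for msg in client.async_subscribe(namespace="market_data", topic="sz000651.>"):
            print(f"[all] {msg.topic}")

    async def all_1m_klines():
        # 1-minute K-lines for every stock
        async for msg in client.async_subscribe(namespace="market_data", topic="*.kline.1m"):
            print(f"[1m K-line] {msg.topic}")

    try:
        # Each async for loop runs until cancelled, so consume both subscriptions
        # concurrently; written one after the other, the second loop would never start.
        # (Assumes one client can iterate several subscriptions at once; otherwise
        # use one client per subscription, as in section 8.)
        await asyncio.gather(one_stock(), all_1m_klines())
    finally:
        await client.async_close()


asyncio.run(main())
```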
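The wildcard rules in the table can be modeled with a small matcher. This is an illustrative sketch, not PulseMQ's actual matching code. `topic_matches` is a hypothetical name. The sketch also assumes that `>` must match at least one level, because the table does not say whether `sz000651.>` matches `sz000651` itself.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if `topic` matches `pattern` under the * and > rules.

    Assumption: '>' must match at least one trailing level (NATS-style).
    """
    p_parts = pattern.split(".")
    t_parts = topic.split(".")
    for i, p in enumerate(p_parts):
        if p == ">":
            # '>' is only valid as the last level and swallows the rest
            if i != len(p_parts) - 1:
                raise ValueError("'>' is only allowed at the end of a pattern")
            return len(t_parts) > i
        if i >= len(t_parts):
            return False  # topic has fewer levels than the pattern
        if p != "*" and p != t_parts[i]:
            return False  # literal level does not match
    # Every pattern level matched; without '>' the level counts must agree
    return len(p_parts) == len(t_parts)


print(topic_matches("sz000651.kline.*", "sz000651.kline.1m"))      # True
print(topic_matches("sz000651.kline.*", "sz000651.kline.1m.raw"))  # False
```

Splitting on `.` keeps the check linear in the number of levels, which stays small because topics are limited to a few levels.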
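4. Synchronous mode
Besides async mode, the client also supports synchronous calls. Calls to subscribe() and publish() made before run() only register the intent, and run() then carries them out:
```python
from pulse_mq import PulseMessage, PulseMQClient

client = PulseMQClient(
    host="localhost",
    pub_port=5555,
    sub_port=5556,
    username="subscriber_01",
    token="your-token",
)


def on_message(msg: PulseMessage):
    print(f"[{msg.namespace}] {msg.topic}: {msg.data}")


# Register a subscription
client.subscribe(namespace="market_data", topic="sz000651.>", callback=on_message)
# Register a publish (recorded before run(), sent inside run())
client.publish(namespace="market_data", topic="sz000651.signal", data="BUY", format="string")
# Blocks: connect → authenticate → send registered publishes → enter the message loop
client.run()
```
While run() is active, publish() can still be called (for example, from another thread) to send new messages. publish() is thread-safe.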
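5. Compressed transport
async_publish accepts a compress parameter, and subscribers decompress automatically:
```python
# Inside an async function with a connected client; large_dataframe is a pandas DataFrame
await client.async_publish(
    namespace="market_data",
    topic="sz000651.tick_batch",
    data=large_dataframe,
    format="df_pyarrow",
    compress="zstd",  # one of "zlib" | "lz4" | "zstd"
)
```
Supported compression algorithms:
| Algorithm | Parameter value | Characteristics |
|---|---|---|
| zlib | "zlib" | Built into the Python standard library; no extra dependency |
| lz4 | "lz4" | Very fast compression and decompression |
| zstd | "zstd" | High compression ratio and good overall performance |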
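6. PulseMessage
Messages received by a subscription are PulseMessage objects:
```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class PulseMessage:
    namespace: str          # namespace
    topic: str              # topic
    data: Any               # payload, already deserialized
    format: str             # "bytes" | "string" | "df_msgpack" | "df_pyarrow"
    timestamp: int          # timestamp in milliseconds
    compressed: str | None  # compression algorithm, or None if uncompressed
```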
7. Querying namespaces and topics
```python
import asyncio

from pulse_mq import PulseMQClient


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="query_user",
        token="your-token",
    )
    await client.async_connect()
    try:
        # List the namespaces this user can access
        namespaces = await client.async_query_namespaces()
        print("Namespaces:", namespaces)
        # [{'name': 'market_data', 'topics': ['sz000651.kline.1m', ...], 'topic_count': 5}]

        # List the topics in a given namespace
        topics = await client.async_query_topics("market_data")
        print("Topics:", topics)
        # ['sz000651.kline.1m', 'sz000651.kline.5m', 'sz000651.trade']
    finally:
        await client.async_close()


asyncio.run(main())
```
8. Concurrent subscribers
```python
"""multi_subscriber.py: several subscribers receiving concurrently"""
import asyncio

from pulse_mq import PulseMQClient


async def subscriber(name: str, token: str, namespace: str, topic: str):
    """One subscriber coroutine with its own client connection"""
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username=name,
        token=token,
    )
    await client.async_connect()
    try:
        async for msg in client.async_subscribe(namespace=namespace, topic=topic):
            print(f"[{name}] {msg.topic}: {len(msg.data)} bytes")
    finally:
        await client.async_close()


async def main():
    # Run three subscribers concurrently
    await asyncio.gather(
        subscriber("sub_kline", "token-1", "market_data", "sz000651.kline.>"),
        subscriber("sub_trade", "token-2", "market_data", "sz000651.trade"),
        subscriber("sub_all", "token-3", "market_data", "sz000651.>"),
    )


asyncio.run(main())
```
9. Starting with a YAML configuration
Create config.yaml:
```yaml
server:
  host: "0.0.0.0"
  pub_port: 6000
  sub_port: 6001
  admin_port: 9090
storage:
  db_path: "/data/pulsemq.db"
auth:
  jwt_secret: "your-production-secret-key-at-least-32-bytes"
  jwt_expiry: 43200  # seconds (12 hours)
limits:
  max_connections_per_user: 20
  max_subscriptions_per_conn: 1000
broker:
  hwm: 50000
logging:
  dir: "/var/log/pulsemq"
  level: "info"
  max_days: 30
```
Then start the broker with it:
```bash
pulsemq --config config.yaml
```
10. Calling the admin API with curl
```bash
# Log in to obtain a JWT (use the password from .pulsemq_admin)
TOKEN=$(curl -s -X POST http://localhost:8080/login \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":"<password from .pulsemq_admin>"}' \
  | python -c "import sys, json; print(json.load(sys.stdin)['token'])")

# List all namespaces
curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/namespaces | python -m json.tool

# Create a namespace
curl -s -X POST http://localhost:8080/api/namespaces \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"name":"trade_signal","description":"Trading signals"}'

# Create a user
curl -s -X POST http://localhost:8080/api/users \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"username":"signal_bot","user_type":"mq_normal"}'

# Grant a permission (look up namespace_id and user_id first; 1 and 2 are example IDs)
curl -s -X POST http://localhost:8080/api/namespaces/1/permissions \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"user_id":2,"role":"pub"}'

# List active connections
curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/connections

# Show system statistics
curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/stats/system

# Health check (no authentication required)
curl http://localhost:8080/health

# Show audit logs
curl -s -H "Authorization: Bearer $TOKEN" \
  "http://localhost:8080/api/audit-logs?limit=20" | python -m json.tool
```
11. Automatic reconnection and subscription recovery
The client SDK reconnects automatically using exponential backoff (1 s → 2 s → 4 s → ..., capped at 30 s, at most 10 attempts). After it reconnects, it restores all active subscriptions.
```python
import asyncio

from pulse_mq import PulseMQClient


async def main():
    # reconnect=True is the default
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="robust_sub",
        token="your-token",
    )
    await client.async_connect()
    try:
        async for msg in client.async_subscribe(namespace="market_data", topic="sz000651.>"):
            print(f"Received: {msg.topic}")
            # If the broker restarts, the client reconnects and restores this subscription automatically
    finally:
        await client.async_close()


asyncio.run(main())
```
To disable automatic reconnection:
```python
client = PulseMQClient(
    host="localhost",
    pub_port=5555,
    sub_port=5556,
    username="user",
    token="token",
    reconnect=False,
)
```
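The reconnection policy described at the start of this section (doubling from 1 s, capped at 30 s, at most 10 attempts) yields the delay schedule below. This sketch models the stated numbers only. It assumes one delay before each attempt and no jitter, and `backoff_delays` is a hypothetical name rather than an SDK function.

```python
def backoff_delays(base: float = 1.0, cap: float = 30.0, max_attempts: int = 10) -> list[float]:
    """Delay in seconds before each reconnect attempt: base * 2**n, capped at `cap`."""
    return [min(base * 2 ** n, cap) for n in range(max_attempts)]


delays = backoff_delays()
print(delays)
# [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0, 30.0, 30.0, 30.0]
print(f"total waiting before giving up: {sum(delays):.0f}s")
# total waiting before giving up: 181s
```

Under these assumptions, the cap is reached on the sixth attempt. A client that exhausts all 10 attempts has waited roughly three minutes, not counting connection time.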
Architecture
```text
Publisher → ROUTER (port 5555) → [Broker] → XPUB (port 5556) → Subscriber
                                     │
                                     ├── AUTH   authentication
                                     ├── PUB    message forwarding
                                     ├── SUB    subscription management
                                     └── QUERY  query responses
```
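Message format
Each message consists of 6 ZMQ frames:
| Frame | Name | Description |
|---|---|---|
| 0 | Header | 20-byte binary header (version/type/format/flags/length/count/timestamp) |
| 1 | Namespace | UTF-8 string, defaults to "default" |
| 2 | Topic | UTF-8 string, at most 3 levels (e.g. sz000651.kline.1m) |
| 3 | Username | UTF-8 string |
| 4 | Payload | Binary payload |
| 5 | Reserved | Reserved for future extensions |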
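The six-frame layout can be illustrated with Python's `struct` module. The field widths here are an assumption. The table only lists the header fields and their 20-byte total, and one plausible packing is four 1-byte fields, two 4-byte fields and an 8-byte millisecond timestamp, big-endian. `HEADER` and `build_frames` are hypothetical names, and the values for version, message type and count are placeholders.

```python
import struct
import time

# Hypothetical 20-byte header, big-endian, no padding:
#   version:u8 type:u8 format:u8 flags:u8 length:u32 count:u32 timestamp_ms:u64
HEADER = struct.Struct(">BBBBIIQ")
assert HEADER.size == 20


def build_frames(namespace: str, topic: str, username: str, payload: bytes,
                 msg_type: int = 0, fmt: int = 0x00, flags: int = 0) -> list[bytes]:
    """Assemble the six frames in the order given by the frame table."""
    header = HEADER.pack(
        1,                         # version (placeholder)
        msg_type,                  # message type code (placeholder)
        fmt,                       # data format code, 0x00 = BYTES
        flags,                     # flag bits, e.g. compression (assumed)
        len(payload),              # payload length in bytes
        1,                         # count (assumed: one record)
        int(time.time() * 1000),   # timestamp in milliseconds
    )
    return [
        header,
        namespace.encode("utf-8"),
        topic.encode("utf-8"),
        username.encode("utf-8"),
        payload,
        b"",                       # reserved frame
    ]


frames = build_frames("default", "sz000651.kline.1m", "publisher_01", b"\x01\x02\x03")
version, msg_type, fmt, flags, length, count, ts = HEADER.unpack(frames[0])
print(len(frames), len(frames[0]), length)
```

With pyzmq, a list like this could be sent as one multipart message via `socket.send_multipart(frames)`. The real byte layout is defined in the protocol design document.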
Data formats
| Format | Code | Description | Size limit |
|---|---|---|---|
| BYTES | 0x00 | Raw byte stream | 10 MB |
| STRING | 0x01 | UTF-8 text | 8 KB |
| DF_MSGPACK | 0x02 | DataFrame → MessagePack | 1 MB |
| DF_PYARROW | 0x03 | DataFrame → Apache Arrow IPC | 64 MB |
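A publisher could check the per-format limits from the table before sending. This sketch assumes binary units (1 MB = 1024 × 1024 bytes) and checks the serialized payload, but the table does not say which units it uses or whether the limit applies before or after compression. `FORMAT_LIMITS` and `check_payload` are hypothetical names.

```python
KB = 1024
MB = 1024 * KB

# format name -> (wire code, max payload size in bytes), from the data format table
FORMAT_LIMITS = {
    "bytes": (0x00, 10 * MB),
    "string": (0x01, 8 * KB),
    "df_msgpack": (0x02, 1 * MB),
    "df_pyarrow": (0x03, 64 * MB),
}


def check_payload(fmt: str, payload: bytes) -> int:
    """Return the wire format code, or raise ValueError if the payload is too large."""
    try:
        code, limit = FORMAT_LIMITS[fmt]
    except KeyError:
        raise ValueError(f"unsupported format: {fmt!r}") from None
    if len(payload) > limit:
        raise ValueError(f"{fmt} payload is {len(payload)} bytes; limit is {limit}")
    return code


print(check_payload("string", "BUY".encode("utf-8")))  # 1
```

For STRING, the limit is measured here on the UTF-8 bytes rather than the character count, so text with many non-ASCII characters reaches the limit sooner.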
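Permission model
Permissions are enforced per namespace using three hierarchical roles:
- sub: subscribe
- pub: publish (includes sub)
- admin: manage (includes pub)

User types:
- super_admin: admin console plus all MQ permissions
- mq_super: all MQ permissions (no admin console access)
- mq_normal: authorized per namespace through namespace_permissions
- admin_only: admin console only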
Configuration
Zero-config startup
With no parameters, the following defaults apply:
```yaml
server:
  host: "0.0.0.0"
  pub_port: 5555
  sub_port: 5556
  admin_port: 8080
storage:
  db_path: "./pulsemq.db"
auth:
  token_hash: "sha256"
  ping_interval: 30
  ping_timeout: 3
limits:
  max_connections_per_user: 10
  max_subscriptions_per_conn: 500
  cache_refresh_sec: 300
broker:
  hwm: 10000
logging:
  dir: "./logs"
  max_days: 7
  level: "info"
```
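YAML configuration file
Create config.yaml and override only the settings you need:
```yaml
server:
  pub_port: 6000
  admin_port: 9090
storage:
  db_path: "/data/pulsemq.db"
```
Start the broker with the configuration file:
```bash
pulsemq --config config.yaml
```
CLI arguments
Command-line arguments take the highest precedence: CLI > YAML > defaults.
```bash
pulsemq --host 127.0.0.1 --pub-port 7000 --log-level debug
```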
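The CLI > YAML > defaults precedence behaves like a nested dictionary merge applied from lowest to highest priority. This sketch illustrates the rule and is not PulseMQ's `config.py`. `deep_merge` is a hypothetical name, and the plain dicts stand in for the parsed YAML file (which could come from `yaml.safe_load`) and the parsed CLI flags.

```python
import copy


def deep_merge(base: dict, override: dict) -> dict:
    """Return a copy of `base` with `override` applied; nested dicts merge key by key."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


defaults = {
    "server": {"host": "0.0.0.0", "pub_port": 5555, "sub_port": 5556, "admin_port": 8080},
    "logging": {"level": "info"},
}
yaml_cfg = {"server": {"pub_port": 6000, "admin_port": 9090}}          # from config.yaml
cli_cfg = {"server": {"host": "127.0.0.1", "pub_port": 7000},          # from CLI flags
           "logging": {"level": "debug"}}

# Apply lowest to highest priority: defaults, then YAML, then CLI
config = deep_merge(deep_merge(defaults, yaml_cfg), cli_cfg)
print(config["server"])
# {'host': '127.0.0.1', 'pub_port': 7000, 'sub_port': 5556, 'admin_port': 9090}
```

Merging key by key, rather than replacing whole sections, is what lets a YAML file "override only the settings you need".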
API
Health check
```bash
curl http://localhost:8080/health
# {"status":"ok","version":"4.3.0","uptime_seconds":1234,"connections":5}
```
Admin API
| Endpoint | Method | Description |
|---|---|---|
| /login | POST | Log in and obtain a JWT |
| /api/namespaces | GET/POST | List/create namespaces |
| /api/namespaces/{id} | GET/PUT/DELETE | Get/update/delete a namespace |
| /api/namespaces/{id}/permissions | POST | Grant a permission |
| /api/topics | GET | List topics (filterable by namespace) |
| /api/topics/{id} | GET/DELETE | Get/delete a topic |
| /api/users | GET/POST | List/create users |
| /api/users/{id} | PUT/DELETE | Update/delete a user |
| /api/users/{id}/reset-token | POST | Reset a user's token |
| /api/connections | GET | List active connections |
| /api/connections/disconnect-user | POST | Disconnect a user |
| /api/stats/system | GET | System statistics |
| /api/stats/topics | GET | Topic statistics |
| /api/audit-logs | GET | Audit logs |

All /api/* endpoints require a JWT in the request header:
```bash
curl -H "Authorization: Bearer <token>" http://localhost:8080/api/namespaces
```
Directory structure
```text
pulse-mq/
├── pulse_mq/
│   ├── __init__.py         # version number
│   ├── errors.py           # custom exceptions
│   ├── config.py           # configuration system
│   ├── protocol.py         # protocol encoding/decoding
│   ├── auth.py             # authentication (token/password)
│   ├── permission.py       # permission cache and authorization
│   ├── stats.py            # statistics collection
│   ├── proxy.py            # ZMQ message proxy
│   ├── broker.py           # broker main process
│   ├── server.py           # PulseMQServer (public server API)
│   ├── client.py           # PulseMQClient + PulseMessage (public client API)
│   ├── _client.py          # client SDK (internal module)
│   ├── cli.py              # CLI entry point
│   ├── admin/
│   │   ├── app.py          # HTTP API
│   │   └── static/
│   │       └── index.html  # admin console front end
│   └── db/
│       ├── base.py         # database abstract base class
│       ├── sqlite.py       # SQLite adapter
│       ├── schema.py       # DDL definitions
│       └── queries.py      # CRUD operations
├── tests/                  # tests
├── docs/                   # design documents
├── pyproject.toml          # project configuration
└── README.md
```
Testing
```bash
# Run the full test suite
python -m pytest tests/ -v
# Run the tests for a single module
python -m pytest tests/test_broker.py -v
```
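Dependencies
Core dependencies
- pyzmq: ZeroMQ Python bindings
- aiosqlite: async SQLite
- bcrypt: password hashing
- msgpack: MessagePack serialization
- pyarrow: Apache Arrow
- pandas: DataFrame support
- pyyaml: YAML configuration parsing
- zstandard / lz4: compression algorithms
- aiohttp: HTTP server
- PyJWT: JWT authentication

Development dependencies
- pytest: test framework
- pytest-asyncio: async test support
- pytest-aiohttp: aiohttp test support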
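Design documents
| Document | Contents |
|---|---|
| docs/PulseMQ协议模型-v4.0.md | Message frame structure, data formats, message types, topic rules, wildcards |
| docs/消息队列权限模型.md | Permission model, authentication, caching strategy, auditing |
| docs/PulseMQ后台管理系统设计.md | Admin console features, monitoring metrics, dashboard, SQLite schema |
| docs/PulseMQ Broker 运行设计.md | Configuration, startup sequence, flow control, health checks, shutdown, logging, client SDK |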
License
MIT
Download files
File details
Details for the file pulse_mq-0.4.0.tar.gz.
File metadata
- Download URL: pulse_mq-0.4.0.tar.gz
- Upload date:
- Size: 79.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7156e1e108340cf0d64df8b06bd19bac032c08262278b4dbc9fe3b4578c7001a |
| MD5 | d1ef85f22a05afb65c04939740aafcf2 |
| BLAKE2b-256 | 168cb0cea706ffb8b66e84425af79a86b1d9694ac26ad6105725618de5cedf8e |
File details
Details for the file pulse_mq-0.4.0-py3-none-any.whl.
File metadata
- Download URL: pulse_mq-0.4.0-py3-none-any.whl
- Upload date:
- Size: 87.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b778bfaae441a0e865add94afa20cb950fb8a61bfc99ee8627be0c21252d8339 |
| MD5 | 77c54d3409b76117bbbdc2e00cba16d8 |
| BLAKE2b-256 | dbf5560792f7c0587de1b48766de2c815d221583e9e52afcf1438ff222e78840 |