A ZeroMQ-based message queue system designed for financial market data distribution

PulseMQ

PulseMQ is a high-performance message queue system built on ZeroMQ and designed for distributing financial market data.

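Features

  • High performance: built on a ZMQ ROUTER/XPUB architecture, with P50 latency < 1 ms
  • Multiple formats: BYTES, STRING, and DataFrame (MessagePack/Apache Arrow)
  • Access control: namespace-level role-based authorization (sub/pub/admin), fully cached in memory
  • Admin console: JWT authentication, a real-time monitoring dashboard, and a full CRUD API
  • Client SDK: Python client with async and sync modes, automatic reconnection, heartbeats, and subscription recovery
  • Compressed transport: zlib, lz4, and zstd compression, with automatic decompression on the subscriber side
  • Zero configuration: starts without a config file; YAML and CLI overrides are optional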

Quick start

Installation

pip install pulse-mq

Install from source (development mode):

# Clone the repository
git clone https://github.com/yourname/pulse-mq.git
cd pulse-mq

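# Create a virtual environment (Python >= 3.13)
uv venv

# Activate the virtual environment
source .venv/Scripts/activate  # Windows (Git Bash)
source .venv/bin/activate      # Linux/Mac

# Install the package in editable mode with its dev dependencies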
uv pip install -e ".[dev]"

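Start the Broker

# Zero-config start (default ports: PUB=5555, SUB=5556, Admin=8080)
pulsemq

On startup, an administrator account is created automatically and its credentials are saved to the .pulsemq_admin file. The startup banner looks like this: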

==================================================
  PulseMQ Broker v4.3.0
  PUB(XSUB) : tcp://0.0.0.0:5555
  SUB(XPUB) : tcp://0.0.0.0:5556
  Admin     : http://0.0.0.0:8080
  Health    : http://0.0.0.0:8080/health
==================================================

More ways to start:

# Use a config file
pulsemq --config config.yaml

# Override individual settings from the CLI (precedence: CLI > YAML > defaults)
pulsemq --host 127.0.0.1 --pub-port 6000 --sub-port 6001 --admin-port 9090 --log-level debug

# Set the database path
pulsemq --db-path /data/pulsemq.db

Start from a Python script:

"""run_server.py: start the Broker from a Python script"""
from pulse_mq import PulseMQServer

# Option 1: zero configuration
server = PulseMQServer()

# Option 2: override settings with keyword arguments
# server = PulseMQServer(
#     pub_port=6000,
#     sub_port=6001,
#     admin_port=9090,
#     log_level="debug",
# )

# Option 3: pass a full Config object
# from pulse_mq.config import Config, ServerConfig, StorageConfig, LoggingConfig
# config = Config(server=ServerConfig(pub_port=6000, sub_port=6001))
# server = PulseMQServer(config)

server.run()

Open the admin console

Open http://localhost:8080 in a browser and log in with the username and password from .pulsemq_admin.

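Usage examples

1. End-to-end flow: create users → grant permissions → publish/subscribe

The following example walks through the complete workflow from scratch.

1.1 Start the Broker

pulsemq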

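1.2 Prepare users and permissions through the admin API

"""prepare.py: create a namespace and users, then grant permissions"""
import ast

import requests

BASE = "http://localhost:8080"

# 1. Log in to obtain a JWT (credentials come from the .pulsemq_admin file)
with open(".pulsemq_admin") as f:
    # literal_eval parses a Python-literal dict without executing code;
    # adjust the parsing to the file's actual format
    admin_creds = ast.literal_eval(f.read())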

resp = requests.post(f"{BASE}/login", json={
    "username": admin_creds["username"],
    "password": admin_creds["password"],
})
jwt_token = resp.json()["token"]
headers = {"Authorization": f"Bearer {jwt_token}"}

# 2. Create a namespace
resp = requests.post(f"{BASE}/api/namespaces", headers=headers, json={
    "name": "market_data",
    "description": "Stock market data",
})
print(f"namespace: {resp.json()}")

# 3. Create the publisher user (type mq_normal; a token is generated automatically)
resp = requests.post(f"{BASE}/api/users", headers=headers, json={
    "username": "publisher_01",
    "user_type": "mq_normal",
})
publisher = resp.json()
print(f"publisher token: {publisher['token']}")

# 4. Create the subscriber user
resp = requests.post(f"{BASE}/api/users", headers=headers, json={
    "username": "subscriber_01",
    "user_type": "mq_normal",
})
subscriber = resp.json()
print(f"subscriber token: {subscriber['token']}")

# 5. Look up the namespace ID
resp = requests.get(f"{BASE}/api/namespaces", headers=headers)
ns_list = resp.json()
ns_id = next(ns["id"] for ns in ns_list if ns["name"] == "market_data")

# 6. Allow the publisher to publish in the market_data namespace (pub role)
resp = requests.post(
    f"{BASE}/api/namespaces/{ns_id}/permissions",
    headers=headers,
    json={"user_id": publisher["id"], "role": "pub"},
)
print(f"granted publisher: {resp.json()}")

# 7. Allow the subscriber to subscribe in the market_data namespace (sub role)
resp = requests.post(
    f"{BASE}/api/namespaces/{ns_id}/permissions",
    headers=headers,
    json={"user_id": subscriber["id"], "role": "sub"},
)
print(f"granted subscriber: {resp.json()}")

1.3 Publisher: send market data

"""publisher.py: publish stock market data"""
import asyncio
import json
import time
from pulse_mq import PulseMQClient


async def main():
    # Connect to the Broker (using the token generated through the admin API)
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="publisher_01",
        token="the publisher token printed by prepare.py",
    )
    await client.async_connect()
    print("Publisher connected")

    try:
        # Simulate sending K-line (candlestick) data in bytes format
        for i in range(100):
            kline = json.dumps({
                "symbol": "sz000651",
                "type": "kline_1m",
                "open": 10.5 + i * 0.01,
                "high": 10.6 + i * 0.01,
                "low": 10.4 + i * 0.01,
                "close": 10.55 + i * 0.01,
                "volume": 10000 + i * 100,
                "timestamp": int(time.time() * 1000),
            }).encode("utf-8")

            await client.async_publish(
                namespace="market_data",
                topic="sz000651.kline.1m",
                data=kline,
                format="bytes",
            )
            print(f"Sent K-line message {i+1}")
            await asyncio.sleep(1)

        print("Publishing finished")
    finally:
        await client.async_close()


asyncio.run(main())

1.4 Subscriber: receive market data

"""subscriber.py: subscribe to stock market data"""
import asyncio
from pulse_mq import PulseMQClient


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="subscriber_01",
        token="the subscriber token printed by prepare.py",
    )
    await client.async_connect()
    print("Subscriber connected")

    try:
        # Subscribe to every K-line interval for sz000651 (the > wildcard matches multiple topic levels)
        async for msg in client.async_subscribe(namespace="market_data", topic="sz000651.kline.>"):
            print(
                f"[{msg.namespace}] {msg.topic}: "
                f"{len(msg.data)} bytes"
            )
    finally:
        await client.async_close()


asyncio.run(main())

Note: pub_port corresponds to the Broker's XSUB port (default 5555) and sub_port to its XPUB port (default 5556). Both must be specified explicitly.

2. Publishing in multiple formats

PulseMQ supports four data formats:

import asyncio
import json
import random

import pandas as pd
from pulse_mq import PulseMQClient


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="demo",
        token="your-token",
    )
    await client.async_connect()

    try:
        # ── BYTES: raw byte stream (for encrypted data or binary protocols) ──
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.raw",
            data=b"\x01\x02\x03\x04",
            format="bytes",
        )

        # ── STRING: UTF-8 text ──
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.signal",
            data=json.dumps({"action": "BUY", "price": 10.5}),
            format="string",
        )

        # ── DF_MSGPACK: small DataFrame batches (< 1 MB) ──
        df_small = pd.DataFrame({
            "symbol": ["sz000651", "sz000651", "sz000651"],
            "price": [10.5, 10.6, 10.55],
            "volume": [100, 200, 150],
        })
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.snapshot",
            data=df_small,
            format="df_msgpack",
        )

        # ── DF_PYARROW: large DataFrame batches (< 64 MB, better performance) ──
        n_rows = 100000
        df_large = pd.DataFrame({
            "symbol": ["sz000651"] * n_rows,
            "timestamp": pd.date_range("2025-01-01", periods=n_rows, freq="ms"),
            "price": [10.5 + random.random() * 0.1 for _ in range(n_rows)],
            "volume": [random.randint(100, 500) for _ in range(n_rows)],
        })
        await client.async_publish(
            namespace="market_data",
            topic="sz000651.tick_batch",
            data=df_large,
            format="df_pyarrow",
        )

        print("All formats published")
    finally:
        await client.async_close()


asyncio.run(main())

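3. Wildcard subscriptions

Topics support two wildcards:

Wildcard  Meaning                                    Example
*         Matches exactly one level                  sz000651.kline.* matches sz000651.kline.1m but not sz000651.kline.1m.raw
>         Matches multiple levels (trailing only)    sz000651.> matches sz000651.kline.1m and sz000651.trade

import asyncio
from pulse_mq import PulseMQClient


async def consume(client: PulseMQClient, topic: str, label: str):
    """Print every message received on one subscription."""
    async for msg in client.async_subscribe(namespace="market_data", topic=topic):
        print(f"[{label}] {msg.topic}")


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="trader",
        token="your-token",
    )
    await client.async_connect()

    try:
        # A subscription loop runs until the client closes, so the two loops
        # run concurrently here; written back to back, the second would never
        # start. This assumes one client can serve several subscriptions.
        await asyncio.gather(
            # All market data for a single stock
            consume(client, "sz000651.>", "all data"),
            # 1-minute K-lines for every stock
            consume(client, "*.kline.1m", "1m K-line"),
        )
    finally:
        await client.async_close()


asyncio.run(main())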
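
The wildcard rules in the table can be sketched as a small matcher. This is an illustration only, not the Broker's actual matching code: the function name topic_matches is hypothetical, and it assumes that > must match at least one remaining level, which the table does not state.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if a dot-separated topic matches a wildcard pattern."""
    pattern_parts = pattern.split(".")
    topic_parts = topic.split(".")
    for i, part in enumerate(pattern_parts):
        if part == ">":
            # '>' is only valid as the last pattern level; assumed here to
            # require at least one remaining topic level.
            return i == len(pattern_parts) - 1 and len(topic_parts) > i
        if i >= len(topic_parts):
            return False  # the topic has fewer levels than the pattern
        if part != "*" and part != topic_parts[i]:
            return False  # literal level differs
    # Without a trailing '>', both must have the same number of levels.
    return len(pattern_parts) == len(topic_parts)
```

For example, topic_matches("sz000651.kline.*", "sz000651.kline.1m.raw") is False because * covers exactly one level.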

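4. Synchronous mode

Besides async mode, the client also supports synchronous calls. Before run() is called, subscribe() and publish() only register intent; run() then carries them out in one place: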

from pulse_mq import PulseMQClient, PulseMessage

client = PulseMQClient(
    host="localhost",
    pub_port=5555,
    sub_port=5556,
    username="subscriber_01",
    token="your-token",
)

def on_message(msg: PulseMessage):
    print(f"[{msg.namespace}] {msg.topic}: {msg.data}")

# Register a subscription
client.subscribe(namespace="market_data", topic="sz000651.>", callback=on_message)

# Register a publish (only recorded before run(); sent inside run())
client.publish(namespace="market_data", topic="sz000651.signal", data="BUY", format="string")

# Blocking run: connect → authenticate → send registered publishes → enter the message loop
client.run()

Once run() has started, publish() can still be called to send new messages (it is thread-safe).

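5. Compressed transport

async_publish accepts a compress parameter, and the subscriber decompresses automatically:

await client.async_publish(
    namespace="market_data",
    topic="sz000651.tick_batch",
    data=large_dataframe,
    format="df_pyarrow",
    compress="zstd",    # one of "zlib" | "lz4" | "zstd"
)

Supported compression algorithms:

Algorithm  Value   Characteristics
zlib       "zlib"  Part of the standard library; no extra dependency
lz4        "lz4"   Very fast compression and decompression
zstd       "zstd"  High compression ratio with good overall performance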
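
To show what the compress parameter implies on each side of the wire, here is a minimal round-trip sketch that uses only the standard-library zlib module. The helper names compress_payload and decompress_payload are hypothetical and are not part of the PulseMQ SDK; lz4 and zstd would need their third-party packages and are left out.

```python
import zlib
from typing import Optional


def compress_payload(payload: bytes, algorithm: Optional[str]) -> bytes:
    """Publisher side: compress the serialized payload if an algorithm is set."""
    if algorithm is None:
        return payload
    if algorithm == "zlib":
        return zlib.compress(payload)
    raise ValueError(f"unsupported algorithm in this sketch: {algorithm}")


def decompress_payload(payload: bytes, algorithm: Optional[str]) -> bytes:
    """Subscriber side: undo the compression named in the message metadata."""
    if algorithm is None:
        return payload
    if algorithm == "zlib":
        return zlib.decompress(payload)
    raise ValueError(f"unsupported algorithm in this sketch: {algorithm}")


# Repetitive tick data compresses well and round-trips unchanged.
raw = b"sz000651,10.5,100\n" * 1000
packed = compress_payload(raw, "zlib")
restored = decompress_payload(packed, "zlib")
```

Compression pays off mainly for large, repetitive payloads such as DataFrame batches; for tiny messages the CPU cost can outweigh the bandwidth saved.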

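6. PulseMessage

Messages delivered to subscribers are PulseMessage objects:

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class PulseMessage:
    namespace: str          # Namespace
    topic: str              # Topic
    data: Any               # Payload after automatic deserialization
    format: str             # "bytes" | "string" | "df_msgpack" | "df_pyarrow"
    timestamp: int          # Timestamp in milliseconds
    compressed: str | None  # Compression algorithm, or None if uncompressed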
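
Because data arrives already deserialized, a callback usually branches on format. The sketch below uses a stand-in dataclass that mirrors the fields listed above so that it runs without the SDK. It assumes that "bytes" yields bytes, "string" yields str, and the two DataFrame formats yield an object that supports len(); the describe helper is hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class PulseMessage:
    """Stand-in with the same fields as the SDK class, for illustration."""
    namespace: str
    topic: str
    data: Any
    format: str
    timestamp: int
    compressed: Optional[str]


def describe(msg: PulseMessage) -> str:
    """Summarize a message according to its declared format."""
    if msg.format == "bytes":
        detail = f"{len(msg.data)} raw bytes"
    elif msg.format == "string":
        detail = f"text {msg.data!r}"
    elif msg.format in ("df_msgpack", "df_pyarrow"):
        detail = f"DataFrame with {len(msg.data)} rows"
    else:
        raise ValueError(f"unknown format: {msg.format}")
    return f"[{msg.namespace}] {msg.topic}: {detail}"


signal = PulseMessage("market_data", "sz000651.signal", "BUY", "string", 0, None)
summary = describe(signal)
```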

7. Querying metadata

import asyncio
from pulse_mq import PulseMQClient


async def main():
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="query_user",
        token="your-token",
    )
    await client.async_connect()

    try:
        # List the namespaces this user can access
        namespaces = await client.async_query_namespaces()
        print("Namespaces:", namespaces)
        # [{'name': 'market_data', 'topics': ['sz000651.kline.1m', ...], 'topic_count': 5}]

        # List the topics in a given namespace
        topics = await client.async_query_topics("market_data")
        print("Topics:", topics)
        # ['sz000651.kline.1m', 'sz000651.kline.5m', 'sz000651.trade']
    finally:
        await client.async_close()


asyncio.run(main())

8. Concurrent subscribers

"""multi_subscriber.py: several subscribers receiving concurrently"""
import asyncio
from pulse_mq import PulseMQClient


async def subscriber(name: str, token: str, namespace: str, topic: str):
    """Coroutine for a single subscriber."""
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username=name,
        token=token,
    )
    await client.async_connect()
    try:
        async for msg in client.async_subscribe(namespace=namespace, topic=topic):
            print(f"[{name}] {msg.topic}: {len(msg.data)} bytes")
    finally:
        await client.async_close()


async def main():
    # Run three subscribers concurrently
    await asyncio.gather(
        subscriber("sub_kline", "token-1", "market_data", "sz000651.kline.>"),
        subscriber("sub_trade", "token-2", "market_data", "sz000651.trade"),
        subscriber("sub_all", "token-3", "market_data", "sz000651.>"),
    )


asyncio.run(main())

9. Starting with a YAML config

Create config.yaml:

server:
  host: "0.0.0.0"
  pub_port: 6000
  sub_port: 6001
  admin_port: 9090

storage:
  db_path: "/data/pulsemq.db"

auth:
  jwt_secret: "your-production-secret-key-at-least-32-bytes"
  jwt_expiry: 43200  # 12 hours

limits:
  max_connections_per_user: 20
  max_subscriptions_per_conn: 1000

broker:
  hwm: 50000

logging:
  dir: "/var/log/pulsemq"
  level: "info"
  max_days: 30

Then start the Broker with this file:

pulsemq --config config.yaml

10. Calling the admin API with curl

# Log in to obtain a JWT
TOKEN=$(curl -s -X POST http://localhost:8080/login \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":"from_.pulsemq_admin"}' | python -c "import sys,json; print(json.load(sys.stdin)['token'])")

# List all namespaces
curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/namespaces | python -m json.tool

# Create a namespace
curl -s -X POST http://localhost:8080/api/namespaces \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"name":"trade_signal","description":"Trading signals"}'

# Create a user
curl -s -X POST http://localhost:8080/api/users \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"username":"signal_bot","user_type":"mq_normal"}'

# Grant a permission (look up the namespace ID and user ID first)
curl -s -X POST http://localhost:8080/api/namespaces/1/permissions \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"user_id":2,"role":"pub"}'

# List active connections
curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/connections

# Show system statistics
curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/stats/system

# Health check (no authentication required)
curl http://localhost:8080/health

# View audit logs
curl -s -H "Authorization: Bearer $TOKEN" \
  "http://localhost:8080/api/audit-logs?limit=20" | python -m json.tool

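11. Automatic reconnection and subscription recovery

The client SDK has built-in automatic reconnection with exponential backoff (1s → 2s → 4s → ..., capped at 30s, at most 10 attempts). After a successful reconnect, all active subscriptions are restored automatically.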

import asyncio
from pulse_mq import PulseMQClient


async def main():
    # reconnect=True (enabled by default)
    client = PulseMQClient(
        host="localhost",
        pub_port=5555,
        sub_port=5556,
        username="robust_sub",
        token="your-token",
    )
    await client.async_connect()
    try:
        async for msg in client.async_subscribe(namespace="market_data", topic="sz000651.>"):
            print(f"Received: {msg.topic}")
            # Even if the Broker restarts, the client reconnects and restores its subscriptions
    finally:
        await client.async_close()


asyncio.run(main())

To disable automatic reconnection:

client = PulseMQClient(
    host="localhost",
    pub_port=5555,
    sub_port=5556,
    username="user",
    token="token",
    reconnect=False,
)
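
The backoff schedule described in this section (start at 1s, double each time, cap at 30s, stop after 10 attempts) can be written out as a short sketch. The function name backoff_delays and its parameters are illustrative and are not taken from the SDK.

```python
from typing import List


def backoff_delays(base: float = 1.0, cap: float = 30.0,
                   max_attempts: int = 10) -> List[float]:
    """Return the wait before each reconnect attempt, in seconds."""
    # Double the delay on every attempt, but never exceed the cap.
    return [min(base * 2 ** attempt, cap) for attempt in range(max_attempts)]


delays = backoff_delays()
```

With these defaults the delays are 1, 2, 4, 8, 16 and then 30 seconds for each of the remaining five attempts, about 181 seconds of waiting in total before the client gives up.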

Architecture

Publisher → ROUTER (port 5555) → [Broker] → XPUB (port 5556) → Subscriber
                                    │
                                    ├── AUTH: authentication
                                    ├── PUB: message forwarding
                                    ├── SUB: subscription management
                                    └── QUERY: query responses

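Message format

Each message consists of 6 ZMQ frames:

Frame  Name       Description
0      Header     20 bytes, binary (version/type/format/flags/length/count/timestamp)
1      Namespace  UTF-8 string, defaults to "default"
2      Topic      UTF-8 string, at most 3 levels (e.g. sz000651.kline.1m)
3      Username   UTF-8 string
4      Payload    Binary payload
5      Reserved   Reserved for future extensions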
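
As a rough illustration of the six-frame layout, the sketch below assembles one message as a list of byte strings. The header field widths are an assumption: the table gives only the field names and the 20-byte total, so the struct layout here (four 1-byte fields, two 4-byte fields, one 8-byte timestamp) is simply one combination that adds up to 20 bytes. The version, type, and count values are placeholders, and build_frames is a hypothetical helper.

```python
import struct
import time
from typing import List, Optional

# Hypothetical layout: version, type, format, flags (1 byte each),
# payload length and count (4 bytes each), timestamp in ms (8 bytes).
HEADER = struct.Struct("!BBBBIIQ")


def build_frames(namespace: str, topic: str, username: str, payload: bytes,
                 fmt: int = 0x00,
                 timestamp_ms: Optional[int] = None) -> List[bytes]:
    """Assemble the six frames of one message in table order."""
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)
    # Placeholder values: version=1, type=0, flags=0, count=1.
    header = HEADER.pack(1, 0, fmt, 0, len(payload), 1, timestamp_ms)
    return [
        header,                     # Frame 0: Header
        namespace.encode("utf-8"),  # Frame 1: Namespace
        topic.encode("utf-8"),      # Frame 2: Topic
        username.encode("utf-8"),   # Frame 3: Username
        payload,                    # Frame 4: Payload
        b"",                        # Frame 5: Reserved
    ]


frames = build_frames("market_data", "sz000651.kline.1m", "publisher_01",
                      b"\x01\x02", timestamp_ms=1735689600000)
```

A list like this is the shape that pyzmq's send_multipart accepts; the protocol design document is the authority on the real header layout.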

Data formats

Format      Code  Description                   Size limit
BYTES       0x00  Raw byte stream               10 MB
STRING      0x01  UTF-8 text                    8 KB
DF_MSGPACK  0x02  DataFrame → MessagePack       1 MB
DF_PYARROW  0x03  DataFrame → Apache Arrow IPC  64 MB
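
A publisher can check a serialized payload against these limits before sending. The sketch below encodes the table as an enum and a lookup. It assumes binary units (1 KB = 1024 bytes) and that a payload exactly at the limit is accepted, neither of which the table specifies; DataFormat and fits_limit are illustrative names.

```python
from enum import IntEnum

KB = 1024       # assumption: binary units
MB = 1024 * KB


class DataFormat(IntEnum):
    BYTES = 0x00
    STRING = 0x01
    DF_MSGPACK = 0x02
    DF_PYARROW = 0x03


SIZE_LIMITS = {
    DataFormat.BYTES: 10 * MB,
    DataFormat.STRING: 8 * KB,
    DataFormat.DF_MSGPACK: 1 * MB,
    DataFormat.DF_PYARROW: 64 * MB,
}


def fits_limit(fmt: DataFormat, payload: bytes) -> bool:
    """Return True if the serialized payload is within the format's limit."""
    return len(payload) <= SIZE_LIMITS[fmt]
```

For instance, a 9 KB JSON string fails the STRING check but would pass if sent as BYTES.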

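Permission model

Permissions are enforced at the namespace level using a three-tier role hierarchy:

  • sub: subscribe permission
  • pub: publish permission (includes sub)
  • admin: administrative permission (includes pub)

User types:

  • super_admin: admin console access plus all MQ permissions
  • mq_super: all MQ permissions (no admin console access)
  • mq_normal: authorized per namespace through namespace_permissions
  • admin_only: admin console access only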
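
The role hierarchy and user types above can be condensed into one authorization check. This is a sketch of the rules as described, not the Broker's permission code: is_allowed is a hypothetical helper, and it assumes that admin_only users have no MQ permissions at all.

```python
from typing import Optional

# Higher rank includes every lower role: admin ⊇ pub ⊇ sub.
ROLE_RANK = {"sub": 1, "pub": 2, "admin": 3}

# User types that hold all MQ permissions regardless of namespace grants.
MQ_SUPER_TYPES = {"super_admin", "mq_super"}


def is_allowed(user_type: str, granted_role: Optional[str],
               required_role: str) -> bool:
    """Decide whether a user may act with required_role in one namespace.

    granted_role is the role the user holds in that namespace, or None.
    """
    if user_type in MQ_SUPER_TYPES:
        return True
    if user_type != "mq_normal" or granted_role is None:
        return False  # admin_only users and users without a grant are denied
    return ROLE_RANK[granted_role] >= ROLE_RANK[required_role]
```

So a mq_normal user holding pub in market_data may both publish and subscribe there, while a user holding only sub may not publish.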

Configuration

Zero-config startup

When no parameters are given, the following defaults apply:

server:
  host: "0.0.0.0"
  pub_port: 5555
  sub_port: 5556
  admin_port: 8080

storage:
  db_path: "./pulsemq.db"

auth:
  token_hash: "sha256"
  ping_interval: 30
  ping_timeout: 3

limits:
  max_connections_per_user: 10
  max_subscriptions_per_conn: 500
  cache_refresh_sec: 300

broker:
  hwm: 10000

logging:
  dir: "./logs"
  max_days: 7
  level: "info"

YAML config file

Create config.yaml and override only the settings you need to change:

server:
  pub_port: 6000
  admin_port: 9090

storage:
  db_path: "/data/pulsemq.db"

Pass the config file at startup:

pulsemq --config config.yaml

CLI parameters

Command-line parameters have the highest precedence: CLI > YAML > defaults.

pulsemq --host 127.0.0.1 --pub-port 7000 --log-level debug
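
The precedence rule amounts to layering three dictionaries, with later layers overriding earlier ones key by key. The sketch below shows that idea with a recursive merge. It is not PulseMQ's config loader: deep_merge is a hypothetical helper, and the CLI flags are written as an already-parsed nested dict for simplicity.

```python
from typing import Any, Dict


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict in which override wins, merging nested sections."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


# A subset of the documented defaults.
DEFAULTS = {
    "server": {"host": "0.0.0.0", "pub_port": 5555, "sub_port": 5556, "admin_port": 8080},
    "logging": {"level": "info"},
}
# A partial YAML file, already parsed.
yaml_cfg = {"server": {"pub_port": 6000, "admin_port": 9090}}
# Equivalent of: --host 127.0.0.1 --pub-port 7000 --log-level debug
cli_cfg = {"server": {"host": "127.0.0.1", "pub_port": 7000}, "logging": {"level": "debug"}}

# Apply the layers in order of increasing precedence.
effective = deep_merge(deep_merge(DEFAULTS, yaml_cfg), cli_cfg)
```

Here pub_port ends up as 7000 (CLI beats YAML), admin_port as 9090 (YAML beats the default), and sub_port stays at the default 5556.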

API

Health check

curl http://localhost:8080/health
# {"status":"ok","version":"4.3.0","uptime_seconds":1234,"connections":5}
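
A monitoring script might interpret the response body as follows. This sketch relies only on the status field shown in the sample response; the is_healthy helper is hypothetical, and treating any value other than "ok" as unhealthy is an assumption.

```python
import json


def is_healthy(body: str) -> bool:
    """Return True if a /health response body reports status "ok"."""
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return False  # not JSON at all, e.g. a proxy error page
    return isinstance(payload, dict) and payload.get("status") == "ok"


sample = '{"status":"ok","version":"4.3.0","uptime_seconds":1234,"connections":5}'
healthy = is_healthy(sample)
```

The body itself could be fetched with urllib.request.urlopen("http://localhost:8080/health") while a Broker is running.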

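Admin API

Endpoint                          Methods         Description
/login                            POST            Log in and obtain a JWT
/api/namespaces                   GET/POST        List/create namespaces
/api/namespaces/{id}              GET/PUT/DELETE  Get/update/delete a namespace
/api/namespaces/{id}/permissions  POST            Grant a permission
/api/topics                       GET             List topics (filterable by namespace)
/api/topics/{id}                  GET/DELETE      Get/delete a topic
/api/users                        GET/POST        List/create users
/api/users/{id}                   PUT/DELETE      Update/delete a user
/api/users/{id}/reset-token       POST            Reset a user's token
/api/connections                  GET             List active connections
/api/connections/disconnect-user  POST            Disconnect a user's connections
/api/stats/system                 GET             System statistics
/api/stats/topics                 GET             Topic statistics
/api/audit-logs                   GET             Audit logs

All /api/* endpoints require the JWT in the request header (/login and /health do not):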

curl -H "Authorization: Bearer <token>" http://localhost:8080/api/namespaces

Directory layout

pulse-mq/
├── pulse_mq/
│   ├── __init__.py          # Version number
│   ├── errors.py            # Custom exceptions
│   ├── config.py            # Configuration system
│   ├── protocol.py          # Protocol encoding/decoding
│   ├── auth.py              # Authentication (token/password)
│   ├── permission.py        # Permission cache and authorization
│   ├── stats.py             # Statistics collection
│   ├── proxy.py             # ZMQ message proxy
│   ├── broker.py            # Broker main process
│   ├── server.py            # PulseMQServer (public server API)
│   ├── client.py            # PulseMQClient + PulseMessage (public client API)
│   ├── _client.py           # Client SDK (internal module)
│   ├── cli.py               # CLI entry point
│   ├── admin/
│   │   ├── app.py           # HTTP API
│   │   └── static/
│   │       └── index.html   # Admin console frontend
│   └── db/
│       ├── base.py          # Abstract database base class
│       ├── sqlite.py        # SQLite adapter
│       ├── schema.py        # DDL definitions
│       └── queries.py       # CRUD operations
├── tests/                   # Tests
├── docs/                    # Design documents
├── pyproject.toml           # Project configuration
└── README.md

Testing

# Run the full test suite
python -m pytest tests/ -v

# Run the tests for a specific module
python -m pytest tests/test_broker.py -v

Dependencies

Core dependencies

  • pyzmq - ZeroMQ Python bindings
  • aiosqlite - async SQLite
  • bcrypt - password hashing
  • msgpack - MessagePack serialization
  • pyarrow - Apache Arrow
  • pandas - DataFrame support
  • pyyaml - YAML config parsing
  • zstandard / lz4 - compression algorithms
  • aiohttp - HTTP server
  • PyJWT - JWT authentication

Development dependencies

  • pytest - test framework
  • pytest-asyncio - async test support
  • pytest-aiohttp - aiohttp test support

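Design documents

Document  Contents
docs/PulseMQ协议模型-v4.0.md  Message frame structure, data formats, message types, topic rules, wildcards
docs/消息队列权限模型.md  Permission model, authentication mechanism, caching strategy, auditing
docs/PulseMQ后台管理系统设计.md  Admin console features, monitoring metrics, dashboard, SQLite table schema
docs/PulseMQ Broker 运行设计.md  Configuration, startup flow, flow control, health checks, shutdown, logging, client SDK

License

MIT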
