Skip to main content

基于 ZeroMQ 的轻量级消息队列系统

Project description

PulseMQ

基于 ZeroMQ 的轻量级消息队列系统,配套 Grafana 风格的 Web 管理后台。

特性

  • 双 Socket 架构 — ROUTER + PULL,消息延迟低
  • 多格式支持 — 字符串、msgpack、PyArrow
  • Web 管理后台 — Grafana 暗色主题,ECharts 监控图表
  • 权限管理 — 用户 token 认证,按 topic 控制推送/订阅权限,支持自动创建 topic
  • 单进程部署 — 一条命令启动全部服务
  • PyPI 安装pip install pulse-mq 即可使用

快速安装

pip install pulse-mq

快速开始

方式一:命令行启动

# 复制配置文件
cp .env.example .env

# 启动服务(同时启动 ZMQ 消息服务 + HTTP 管理后台)
pulse-mq-server

首次启动会自动生成 admin 密码,输出到控制台和 admin_credentials.txt

管理后台默认地址: http://localhost:8080

方式二:Python 代码启动

from pulse_mq import PulseMQServer

server = PulseMQServer()  # 自动读取 .env 配置
server.run()              # 启动全部服务,阻塞运行(Ctrl+C 停止)

客户端使用

from pulse_mq import PulseMQClient

client = PulseMQClient(
    host="localhost",
    port=5555,
    username="your-username",
    token="your-token",      # 在管理后台 Users 页面生成
    heartbeat_interval=30,   # 心跳间隔(秒),默认 30
)

# 订阅消息(回调方式)
def on_message(msg):
    print(f"Topic: {msg.topic}, Data: {msg.data}, Format: {msg.format}")

client.subscribe("metrics/cpu", callback=on_message)

# 运行(阻塞,处理心跳和消息接收)
client.run()

发布消息

from pulse_mq import PulseMQClient

# 发布端也需要先连接
pub = PulseMQClient(host="localhost", port=5555, username="user1", token="your-token")
pub.run()  # 阻塞运行

# 注意:subscribe() 必须在 run() 之前调用,这样连接后会自动订阅
# publish() 在 run() 运行期间调用(通常在另一个线程)
pub.publish("metrics/cpu", data="42.5")

异步客户端

import asyncio
from pulse_mq import PulseMQClient

async def main():
    client = PulseMQClient(host="localhost", port=5555, username="user1", token="your-token")
    client.subscribe("metrics/cpu", lambda msg: print(msg.data))
    await client.run_async()  # 异步运行

asyncio.run(main())

PulseMQServer 参数

from pulse_mq import PulseMQServer, Config

# 方式 1:自动读取 .env 配置
server = PulseMQServer()

# 方式 2:指定 .env 文件路径
server = PulseMQServer(env_file="/path/to/.env")

# 方式 3:编程方式传入配置
config = Config(
    host="0.0.0.0",
    http_port=8080,
    zmq_router_port=5555,
    zmq_pull_port=5556,
    db_type="sqlite",
    db_path="./pulse_mq.db",
    admin_password="my-password",
)
server = PulseMQServer(config=config)

server.start() — 非阻塞启动

启动服务,非阻塞,立即返回。可继续调用 server.publish() 推送消息。

参数 类型 默认值 说明
with_http bool True 是否同时启动 HTTP 管理后台
# 启动全部服务(非阻塞)
server.start()

# 仅启动 ZMQ 消息服务,不启动 HTTP 管理后台
server.start(with_http=False)

# 启动后可以继续推送消息
server.start()
server.publish("alerts/system", data="CPU 过高", format="string")

# 需要自行保持进程运行(例如在其他框架中嵌入)
import time
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    server.stop()

server.run() — 阻塞运行

启动服务并阻塞运行(适合独立脚本),Ctrl+C 停止。等同于 start() + 阻塞等待。

参数 类型 默认值 说明
with_http bool True 是否同时启动 HTTP 管理后台
server.run()              # 阻塞运行,Ctrl+C 停止
server.run(with_http=False)

server.publish()

参数 类型 默认值 说明
topic str 必填 Topic 名称,格式 group/topic
data str/bytes/dict/DataFrame 必填 消息数据
format str "string" 数据格式:"string" / "msgpack" / "pyarrow"
server.publish("alerts/system", data="CPU 过高", format="string")
server.publish("data/events", data={"key": "value"}, format="msgpack")

server.stop()

停止服务,关闭所有 ZMQ socket 和线程。


PulseMQClient 参数

from pulse_mq import PulseMQClient

# 构造函数
client = PulseMQClient(
    host="localhost",          # 服务端地址
    port=5555,                 # ZMQ ROUTER 端口
    username="user1",          # 用户名(在管理后台创建)
    token="your-token",        # Token(在管理后台生成)
    heartbeat_interval=30,     # 心跳间隔(秒)
)

# 从环境变量读取
client = PulseMQClient.from_env()

PulseMQClient() 构造参数

参数 类型 默认值 说明
host str "localhost" 服务端地址
port int 5555 ZMQ ROUTER 端口
username str "" 用户名(管理后台 Users 页面创建)
token str "" 用户 Token(管理后台 Users 页面生成)
heartbeat_interval int 30 心跳发送间隔(秒)

client.subscribe(topic, callback)

参数 类型 说明
topic str 订阅的 Topic,格式 group/topic
callback Callable 回调函数,参数为 PulseMQMessage 对象

注意: subscribe() 必须在 run() 之前调用。连接认证后会自动发送订阅请求。

def on_message(msg):
    print(f"Topic: {msg.topic}")     # Topic 名称
    print(f"Data: {msg.data}")       # 消息数据(已反序列化)
    print(f"Format: {msg.format}")   # 数据格式:"string"/"msgpack"/"pyarrow"

client.subscribe("metrics/cpu", callback=on_message)

client.publish(topic, data, format)

参数 类型 默认值 说明
topic str 必填 Topic 名称,格式 group/topic
data str/bytes/dict/DataFrame 必填 消息数据
format str "string" 数据格式

注意: publish() 需要在 run() 之后调用(即连接建立后),否则会抛出 RuntimeError

client.run() / client.run_async()

方法 说明
client.run() 阻塞运行,处理心跳和消息接收(适合独立脚本)
client.run_async() 异步运行,内部使用线程池(适合 asyncio 环境)

client.unsubscribe(topic)

取消订阅指定 topic。

client.disconnect()

断开连接,关闭 ZMQ socket。


消息格式

格式 说明 示例
string 原生字符串,最长 4096 字节 publish("t/d", data="hello", format="string")
msgpack MessagePack 序列化 publish("t/d", data={"key": "val"}, format="msgpack")
pyarrow PyArrow IPC 格式(支持 DataFrame) publish("t/d", data=df, format="pyarrow")

Topic 命名规则

格式: group/topic(强制一层 / 分隔)

  • 允许字符: a-z A-Z 0-9 _ - .
  • 每段长度: 1-128 字符
  • 示例: metrics/cpu, logs/app-error, data_1/test.topic

权限说明

用户权限

权限 说明
is_active 用户是否启用,禁用后无法连接
can_create_topic 推送消息到不存在的 topic 时自动创建

Topic 权限

通过管理后台 Permissions 页面设置:

权限 说明
can_pub 允许推送到该 topic
can_sub 允许订阅该 topic

推送消息到已有 topic 需要 can_pub 权限。如果 topic 不存在且用户有 can_create_topic 权限,则自动创建 topic 并推送成功。


配置项

所有配置通过 .env 文件或环境变量设置:

环境变量 默认值 说明
PULSE_HOST 0.0.0.0 HTTP 服务监听地址
PULSE_HTTP_PORT 8080 HTTP 服务端口
PULSE_ZMQ_ROUTER_PORT 5555 ZMQ ROUTER 端口(客户端连接入口)
PULSE_ZMQ_PULL_PORT 5556 ZMQ PULL 端口(接收客户端推送的消息)
PULSE_DB_TYPE sqlite 数据库类型:sqlitemysql
PULSE_DB_PATH ./pulse_mq.db SQLite 文件路径(仅 sqlite 模式)
PULSE_DB_URL - MySQL 连接 URL(仅 mysql 模式,格式:mysql+pymysql://user:pass@host:3306/db
PULSE_QUEUE_SIZE 1000 每个 topic 内存队列保留的最大消息条数
PULSE_AUTH_CACHE_TTL 60 客户端认证缓存从数据库刷新的间隔(秒)
PULSE_JWT_SECRET 自动生成 JWT 签名密钥(留空则首次启动自动生成)
PULSE_JWT_EXPIRE_HOURS 24 JWT Token 过期时间(小时)
PULSE_ADMIN_PASSWORD 自动生成 admin 初始密码(留空则首次启动自动生成)
PULSE_HEARTBEAT_TIMEOUT 60 服务端判定客户端断线的超时时间(秒)
PULSE_STATS_RETENTION_DAYS 7 消息统计数据保留天数

客户端环境变量

客户端通过 PulseMQClient.from_env() 从环境变量读取配置:

环境变量 说明
PULSE_CLIENT_HOST 服务端地址
PULSE_CLIENT_PORT ZMQ ROUTER 端口
PULSE_CLIENT_USERNAME 用户名
PULSE_CLIENT_TOKEN 用户 Token

管理后台

启动服务后访问 http://localhost:8080,使用 admin 账号登录。

  • Dashboard — 概览统计、Topic 监控图表、系统配置、快速开始代码
  • Topics — Topic 列表、今日消息量、最近消息时间、7 天趋势图
  • Users — 用户管理、Token 生成/复制、启停状态、创建 topic 权限
  • Permissions — 权限矩阵,按用户/Topic 控制推送(P)和订阅(S)权限

数据库

默认使用 SQLite,可通过配置切换到 MySQL:

# SQLite(默认)
PULSE_DB_TYPE=sqlite
PULSE_DB_PATH=./pulse_mq.db

# MySQL
PULSE_DB_TYPE=mysql
PULSE_DB_URL=mysql+pymysql://user:password@host:3306/pulse_mq

开发

# 克隆项目
git clone https://github.com/your-username/pulse-mq.git
cd pulse-mq

# 创建虚拟环境
uv venv
source .venv/bin/activate  # Linux/macOS
# 或 .venv\Scripts\activate  # Windows

# 安装依赖
uv pip install -e ".[dev]"

# 运行测试
pytest tests/ -v

许可证

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pulse_mq-0.1.7.tar.gz (189.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pulse_mq-0.1.7-py3-none-any.whl (39.8 kB view details)

Uploaded Python 3

File details

Details for the file pulse_mq-0.1.7.tar.gz.

File metadata

  • Download URL: pulse_mq-0.1.7.tar.gz
  • Upload date:
  • Size: 189.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for pulse_mq-0.1.7.tar.gz
Algorithm Hash digest
SHA256 d814c03bf070d9900f753eb0e7f3267c8641be46e3da90e8c655972c3988ecb5
MD5 ee01755ce8c14afadc70256a26c1de52
BLAKE2b-256 f26edbb25383749f2bc87d675f8cc1e6a0df8fb0b6b7d0f20c55ba3bfcd7ea60

See more details on using hashes here.

File details

Details for the file pulse_mq-0.1.7-py3-none-any.whl.

File metadata

  • Download URL: pulse_mq-0.1.7-py3-none-any.whl
  • Upload date:
  • Size: 39.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for pulse_mq-0.1.7-py3-none-any.whl
Algorithm Hash digest
SHA256 4c44eccbd01071a59a1626489ec028cafd8df77a9701ef5ef5c6208cdc9174c3
MD5 f9b2079f50f3b7255e7b00e748492014
BLAKE2b-256 e71921e987fbdcff9063f3fb7e8c0f0ba14bf0e6a20a5b954e0b2ab798434144

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page