基于 ZeroMQ 的轻量级消息队列系统
Project description
PulseMQ
基于 ZeroMQ 的轻量级消息队列系统,配套 Grafana 风格的 Web 管理后台。
特性
- 双 Socket 架构 — ROUTER + PULL,消息延迟低
- 多格式支持 — 字符串、msgpack、PyArrow
- Web 管理后台 — Grafana 暗色主题,ECharts 监控图表
- 权限管理 — 用户 token 认证,按 topic 控制推送/订阅权限
- 单进程部署 — 一条命令启动全部服务
- PyPI 安装 —
pip install pulse-mq即可使用
快速安装
pip install pulse-mq
快速开始
方式一:命令行启动
# 复制配置文件
cp .env.example .env
# 启动服务(同时启动 ZMQ 消息服务 + HTTP 管理后台)
pulse-mq-server
首次启动会自动生成 admin 密码,输出到控制台和 admin_credentials.txt。
管理后台默认地址: http://localhost:8080
方式二:Python 代码启动
from pulse_mq import PulseMQServer
server = PulseMQServer() # 自动读取 .env 配置
server.run() # 启动全部服务,阻塞运行(Ctrl+C 停止)
客户端使用
from pulse_mq import PulseMQClient
client = PulseMQClient(
host="localhost",
port=5555,
username="your-username",
token="your-token",
)
# 订阅消息(回调方式)
def on_message(msg):
print(f"Topic: {msg.topic}, Data: {msg.data}, Format: {msg.format}")
client.subscribe("metrics/cpu", callback=on_message)
# 推送消息
client.publish("metrics/cpu", data=42.5, format="string")
# 运行(阻塞,处理心跳和消息接收)
client.run()
PulseMQServer 参数
from pulse_mq import PulseMQServer, Config
# 方式 1:自动读取 .env 配置
server = PulseMQServer()
# 方式 2:指定 .env 文件路径
server = PulseMQServer(env_file="/path/to/.env")
# 方式 3:编程方式传入配置
config = Config(
host="0.0.0.0",
http_port=8080,
zmq_router_port=5555,
zmq_pull_port=5556,
db_type="sqlite",
db_path="./pulse_mq.db",
admin_password="my-password",
)
server = PulseMQServer(config=config)
server.start() 参数
启动服务,非阻塞,立即返回。可继续调用 server.publish() 推送消息。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
with_http |
bool |
True |
是否同时启动 HTTP 管理后台 |
# 启动全部服务(非阻塞)
server.start()
# 仅启动 ZMQ 消息服务,不启动 HTTP 管理后台
server.start(with_http=False)
# 启动后可以继续推送消息
server.start()
server.publish("alerts/system", data="CPU 过高", format="string")
# 需要自行保持进程运行(例如在其他框架中嵌入)
import time
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
server.stop()
server.run() 参数
启动服务并阻塞运行(适合独立脚本),Ctrl+C 停止。等同于 start() + 阻塞等待。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
with_http |
bool |
True |
是否同时启动 HTTP 管理后台 |
# 独立脚本用法
server.run() # 阻塞运行,Ctrl+C 停止
# 不启动 HTTP 管理后台
server.run(with_http=False)
server.publish() 参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
topic |
str |
必填 | Topic 名称,格式 group/topic |
data |
str/bytes/dict/DataFrame |
必填 | 消息数据 |
format |
str |
"string" |
数据格式:"string" / "msgpack" / "pyarrow" |
# 字符串消息
server.publish("alerts/system", data="CPU 过高", format="string")
# msgpack 消息(自动序列化 dict/list)
server.publish("data/events", data={"key": "value"}, format="msgpack")
# PyArrow 消息(自动序列化 pandas DataFrame)
import pandas as pd
df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
server.publish("metrics/batch", data=df, format="pyarrow")
server.stop()
停止服务,关闭所有 ZMQ socket 和线程。
PulseMQClient 参数
from pulse_mq import PulseMQClient
# 方式 1:构造函数参数
client = PulseMQClient(
host="localhost", # 服务端地址
port=5555, # ZMQ ROUTER 端口
username="user1", # 用户名(在管理后台创建)
token="your-token", # Token(在管理后台生成)
)
# 方式 2:从环境变量读取
# export PULSE_CLIENT_HOST=localhost
# export PULSE_CLIENT_PORT=5555
# export PULSE_CLIENT_USERNAME=user1
# export PULSE_CLIENT_TOKEN=your-token
client = PulseMQClient.from_env()
PulseMQClient() 构造参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
host |
str |
"localhost" |
服务端地址 |
port |
int |
5555 |
ZMQ ROUTER 端口(对应服务端 PULSE_ZMQ_ROUTER_PORT) |
username |
str |
"" |
用户名(在管理后台 Users 页面创建) |
token |
str |
"" |
用户 Token(在管理后台 Users 页面生成) |
client.subscribe() 参数
| 参数 | 类型 | 说明 |
|---|---|---|
topic |
str |
订阅的 Topic,格式 group/topic |
callback |
Callable |
回调函数,消息到达时调用,参数为 PulseMQMessage 对象 |
def on_message(msg):
print(f"Topic: {msg.topic}") # Topic 名称
print(f"Data: {msg.data}") # 消息数据(已反序列化)
print(f"Format: {msg.format}") # 数据格式:"string"/"msgpack"/"pyarrow"
client.subscribe("metrics/cpu", callback=on_message)
client.publish() 参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
topic |
str |
必填 | Topic 名称,格式 group/topic |
data |
str/bytes/dict/DataFrame |
必填 | 消息数据 |
format |
str |
"string" |
数据格式:"string" / "msgpack" / "pyarrow" |
client.run() / client.run_async()
| 方法 | 说明 |
|---|---|
client.run() |
阻塞运行,处理心跳和消息接收(适合独立脚本) |
client.run_async() |
异步运行(适合 asyncio 环境) |
client.unsubscribe(topic)
取消订阅指定 topic。
client.disconnect()
断开连接,关闭 ZMQ socket。
消息格式
| 格式 | 说明 | 示例 |
|---|---|---|
string |
原生字符串,最长 4096 字节 | client.publish("t/d", data="hello", format="string") |
msgpack |
MessagePack 序列化 | client.publish("t/d", data={"key": "val"}, format="msgpack") |
pyarrow |
PyArrow IPC 格式(支持 DataFrame) | client.publish("t/d", data=df, format="pyarrow") |
Topic 命名规则
格式: group/topic(强制一层层级)
- 允许字符:
a-z A-Z 0-9 _ - . - 每段长度: 1-128 字符
- 示例:
metrics/cpu,logs/app-error,data_1/test.topic - 客户端推送/订阅时,topic 不存在会自动创建
配置项
所有配置通过 .env 文件或环境变量设置:
| 环境变量 | 默认值 | 说明 |
|---|---|---|
PULSE_HOST |
0.0.0.0 |
HTTP 服务监听地址 |
PULSE_HTTP_PORT |
8080 |
HTTP 服务端口 |
PULSE_ZMQ_ROUTER_PORT |
5555 |
ZMQ ROUTER 端口(客户端连接入口) |
PULSE_ZMQ_PULL_PORT |
5556 |
ZMQ PULL 端口(接收客户端推送的消息) |
PULSE_DB_TYPE |
sqlite |
数据库类型:sqlite 或 mysql |
PULSE_DB_PATH |
./pulse_mq.db |
SQLite 文件路径(仅 sqlite 模式) |
PULSE_DB_URL |
- | MySQL 连接 URL(仅 mysql 模式,格式:mysql+pymysql://user:pass@host:3306/db) |
PULSE_QUEUE_SIZE |
1000 |
每个 topic 内存队列保留的最大消息条数 |
PULSE_AUTH_CACHE_TTL |
60 |
客户端认证缓存从数据库刷新的间隔(秒) |
PULSE_JWT_SECRET |
自动生成 | JWT 签名密钥(留空则首次启动自动生成) |
PULSE_JWT_EXPIRE_HOURS |
24 |
JWT Token 过期时间(小时) |
PULSE_ADMIN_PASSWORD |
自动生成 | admin 初始密码(留空则首次启动自动生成) |
PULSE_HEARTBEAT_INTERVAL |
30 |
客户端心跳发送间隔(秒) |
PULSE_HEARTBEAT_TIMEOUT |
60 |
服务端判定客户端断线的超时时间(秒) |
PULSE_STATS_RETENTION_DAYS |
7 |
消息统计数据保留天数 |
客户端环境变量
客户端通过 PulseMQClient.from_env() 从环境变量读取配置:
| 环境变量 | 说明 |
|---|---|
PULSE_CLIENT_HOST |
服务端地址 |
PULSE_CLIENT_PORT |
ZMQ ROUTER 端口 |
PULSE_CLIENT_USERNAME |
用户名 |
PULSE_CLIENT_TOKEN |
用户 Token |
管理后台
启动服务后访问 http://localhost:8080,使用 admin 账号登录。
- Dashboard — 概览统计、Topic 监控图表、系统配置、快速开始代码
- Topics — Topic 列表、今日消息量、最近消息时间、7 天趋势图
- Users — 用户管理、Token 生成/复制、启停状态
- Permissions — 权限矩阵,按用户/Topic 控制推送(P)和订阅(S)权限
数据库
默认使用 SQLite,可通过配置切换到 MySQL:
# SQLite(默认)
PULSE_DB_TYPE=sqlite
PULSE_DB_PATH=./pulse_mq.db
# MySQL
PULSE_DB_TYPE=mysql
PULSE_DB_URL=mysql+pymysql://user:password@host:3306/pulse_mq
开发
# 克隆项目
git clone https://github.com/your-username/pulse-mq.git
cd pulse-mq
# 创建虚拟环境
uv venv
source .venv/bin/activate # Linux/macOS
# 或 .venv\Scripts\activate # Windows
# 安装依赖
uv pip install -e ".[dev]"
# 运行测试
pytest tests/ -v
许可证
MIT License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pulse_mq-0.1.6.tar.gz.
File metadata
- Download URL: pulse_mq-0.1.6.tar.gz
- Upload date:
- Size: 171.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c161db3388f85613ad6dd04593bdedb46ac9d57b01f41690f1a63a6ae915f8de
|
|
| MD5 |
2054f3de425a19b9b0fb82650dbbab28
|
|
| BLAKE2b-256 |
48272ffe941fbd15de42310d94ba3f671045aca2679e8fbfe018f2a1b453130b
|
File details
Details for the file pulse_mq-0.1.6-py3-none-any.whl.
File metadata
- Download URL: pulse_mq-0.1.6-py3-none-any.whl
- Upload date:
- Size: 36.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f45702abc26ce04e70ca48145396f370b1e961e62fba4ad03233c75f3e06a565
|
|
| MD5 |
6192074f644fae31b1387a0b9581dff2
|
|
| BLAKE2b-256 |
1df53eb9b9601e061a19c59449655bea617e7e1f34c86b47eb987b78772ff76c
|