基于 ZeroMQ 的轻量级消息队列系统
Project description
PulseMQ
基于 ZeroMQ 的轻量级消息队列系统,配套 Grafana 风格的 Web 管理后台。
特性
- 双 Socket 架构 — ROUTER + PULL,消息延迟低
- 多格式支持 — 字符串、msgpack、PyArrow
- Web 管理后台 — Grafana 暗色主题,ECharts 监控图表
- 权限管理 — 用户 token 认证,按 topic 控制推送/订阅权限
- 单进程部署 — 一条命令启动全部服务
- PyPI 安装 —
pip install pulse-mq即可使用
快速安装
pip install pulse-mq
快速开始
启动服务端
# 复制配置文件
cp .env.example .env
# 启动服务
pulse-mq-server
首次启动会自动生成 admin 密码,输出到控制台和 admin_credentials.txt。
管理后台默认地址: http://localhost:8080
客户端使用
from pulse_mq import PulseMQClient
client = PulseMQClient(
host="localhost",
port=5555,
username="your-username",
token="your-token",
)
# 订阅消息(回调方式)
def on_message(msg):
print(f"Topic: {msg.topic}, Data: {msg.data}, Format: {msg.format}")
client.subscribe("metrics/cpu", callback=on_message)
# 推送消息
client.publish("metrics/cpu", data=42.5, format="string")
# 运行(阻塞,处理心跳和消息接收)
client.run()
服务端 SDK
from pulse_mq import PulseMQServer
server = PulseMQServer() # 自动读取 .env 配置
server.start()
# 服务端直接推送消息
server.publish("alerts/system", data="CPU 过高", format="string")
server.publish("metrics/batch", data=df, format="pyarrow") # DataFrame
server.stop()
消息格式
| 格式 | 说明 | 示例 |
|---|---|---|
string |
原生字符串,最长 4096 字节 | client.publish("t/d", data="hello", format="string") |
msgpack |
MessagePack 序列化 | client.publish("t/d", data={"key": "val"}, format="msgpack") |
pyarrow |
PyArrow IPC 格式(支持 DataFrame) | client.publish("t/d", data=df, format="pyarrow") |
Topic 命名规则
格式: group/topic(强制一层层级)
- 允许字符:
a-z A-Z 0-9 _ - . - 每段长度: 1-128 字符
- 示例:
metrics/cpu,logs/app-error,data_1/test.topic - 客户端推送/订阅时,topic 不存在会自动创建
配置项
所有配置通过 .env 文件或环境变量设置:
| 环境变量 | 默认值 | 说明 |
|---|---|---|
PULSE_HOST |
0.0.0.0 |
HTTP 服务监听地址 |
PULSE_HTTP_PORT |
8080 |
HTTP 服务端口 |
PULSE_ZMQ_ROUTER_PORT |
5555 |
ZMQ ROUTER 端口 |
PULSE_ZMQ_PULL_PORT |
5556 |
ZMQ PULL 端口 |
PULSE_DB_TYPE |
sqlite |
数据库类型 (sqlite/mysql) |
PULSE_DB_PATH |
./pulse_mq.db |
SQLite 文件路径 |
PULSE_DB_URL |
- | MySQL 连接 URL |
PULSE_QUEUE_SIZE |
1000 |
每 topic 内存队列大小 |
PULSE_AUTH_CACHE_TTL |
60 |
auth 缓存刷新间隔(秒) |
PULSE_JWT_SECRET |
自动生成 | JWT 签名密钥 |
PULSE_JWT_EXPIRE_HOURS |
24 |
JWT 过期时间(小时) |
PULSE_ADMIN_PASSWORD |
自动生成 | admin 初始密码 |
PULSE_HEARTBEAT_INTERVAL |
30 |
客户端心跳间隔(秒) |
PULSE_HEARTBEAT_TIMEOUT |
60 |
服务端断线超时(秒) |
PULSE_STATS_RETENTION_DAYS |
7 |
统计数据保留天数 |
管理后台
启动服务后访问 http://localhost:8080,使用 admin 账号登录。
- Dashboard — 概览统计、Topic 监控图表、系统配置、快速开始代码
- Topics — Topic 列表、今日消息量、最近消息时间、7 天趋势图
- Users — 用户管理、Token 生成/复制、启停状态
- Permissions — 权限矩阵,按用户/Topic 控制推送(P)和订阅(S)权限
数据库
默认使用 SQLite,可通过配置切换到 MySQL:
# SQLite(默认)
PULSE_DB_TYPE=sqlite
PULSE_DB_PATH=./pulse_mq.db
# MySQL
PULSE_DB_TYPE=mysql
PULSE_DB_URL=mysql+pymysql://user:password@host:3306/pulse_mq
开发
# 克隆项目
git clone https://github.com/your-username/pulse-mq.git
cd pulse-mq
# 创建虚拟环境
uv venv
source .venv/bin/activate # Linux/macOS
# 或 .venv\Scripts\activate # Windows
# 安装依赖
uv pip install -e ".[dev]"
# 运行测试
pytest tests/ -v
许可证
MIT License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pulse_mq-0.1.0.tar.gz
(166.6 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
pulse_mq-0.1.0-py3-none-any.whl
(34.7 kB
view details)
File details
Details for the file pulse_mq-0.1.0.tar.gz.
File metadata
- Download URL: pulse_mq-0.1.0.tar.gz
- Upload date:
- Size: 166.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cb05efcf31192517c5bed6090563d457293f96e3c1c3f333cc7f29b8d75b8d5b
|
|
| MD5 |
22e2d2aa30a9e07a14fca6eda8bb6f29
|
|
| BLAKE2b-256 |
c286e8932ca0796bd0a9eebdf533c13f67275be62f71b0472443afb7f902cb5b
|
File details
Details for the file pulse_mq-0.1.0-py3-none-any.whl.
File metadata
- Download URL: pulse_mq-0.1.0-py3-none-any.whl
- Upload date:
- Size: 34.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
dd14ce9fbf1bfc68d1d65f57cf549f936c463ebd2b358c60b1f53894528ef4dc
|
|
| MD5 |
fb5baa5bea662ba12eec333298e0caa8
|
|
| BLAKE2b-256 |
83c17a631f07163c6c5847b62279fe4e2c669e2f5c2889b62fc2e259e4ca74ac
|