Skip to main content

基于 ZeroMQ 的轻量级消息队列系统

Project description

PulseMQ

基于 ZeroMQ 的轻量级消息队列系统,配套 Grafana 风格的 Web 管理后台。

特性

  • 双 Socket 架构 — ROUTER + PULL,消息延迟低
  • 多格式支持 — 字符串、msgpack、PyArrow
  • Web 管理后台 — Grafana 暗色主题,ECharts 监控图表
  • 权限管理 — 用户 token 认证,按 topic 控制推送/订阅权限
  • 单进程部署 — 一条命令启动全部服务
  • PyPI 安装pip install pulse-mq 即可使用

快速安装

pip install pulse-mq

快速开始

启动服务端

# 复制配置文件
cp .env.example .env

# 启动服务
pulse-mq-server

首次启动会自动生成 admin 密码,输出到控制台和 admin_credentials.txt

管理后台默认地址: http://localhost:8080

客户端使用

from pulse_mq import PulseMQClient

client = PulseMQClient(
    host="localhost",
    port=5555,
    username="your-username",
    token="your-token",
)

# 订阅消息(回调方式)
def on_message(msg):
    print(f"Topic: {msg.topic}, Data: {msg.data}, Format: {msg.format}")

client.subscribe("metrics/cpu", callback=on_message)

# 推送消息
client.publish("metrics/cpu", data=42.5, format="string")

# 运行(阻塞,处理心跳和消息接收)
client.run()

服务端 SDK

from pulse_mq import PulseMQServer

server = PulseMQServer()  # 自动读取 .env 配置
server.start()

# 服务端直接推送消息
server.publish("alerts/system", data="CPU 过高", format="string")
server.publish("metrics/batch", data=df, format="pyarrow")  # DataFrame

server.stop()

消息格式

格式 说明 示例
string 原生字符串,最长 4096 字节 client.publish("t/d", data="hello", format="string")
msgpack MessagePack 序列化 client.publish("t/d", data={"key": "val"}, format="msgpack")
pyarrow PyArrow IPC 格式(支持 DataFrame) client.publish("t/d", data=df, format="pyarrow")

Topic 命名规则

格式: group/topic(强制一层层级)

  • 允许字符: a-z A-Z 0-9 _ - .
  • 每段长度: 1-128 字符
  • 示例: metrics/cpu, logs/app-error, data_1/test.topic
  • 客户端推送/订阅时,topic 不存在会自动创建

配置项

所有配置通过 .env 文件或环境变量设置:

环境变量 默认值 说明
PULSE_HOST 0.0.0.0 HTTP 服务监听地址
PULSE_HTTP_PORT 8080 HTTP 服务端口
PULSE_ZMQ_ROUTER_PORT 5555 ZMQ ROUTER 端口
PULSE_ZMQ_PULL_PORT 5556 ZMQ PULL 端口
PULSE_DB_TYPE sqlite 数据库类型 (sqlite/mysql)
PULSE_DB_PATH ./pulse_mq.db SQLite 文件路径
PULSE_DB_URL - MySQL 连接 URL
PULSE_QUEUE_SIZE 1000 每 topic 内存队列大小
PULSE_AUTH_CACHE_TTL 60 auth 缓存刷新间隔(秒)
PULSE_JWT_SECRET 自动生成 JWT 签名密钥
PULSE_JWT_EXPIRE_HOURS 24 JWT 过期时间(小时)
PULSE_ADMIN_PASSWORD 自动生成 admin 初始密码
PULSE_HEARTBEAT_INTERVAL 30 客户端心跳间隔(秒)
PULSE_HEARTBEAT_TIMEOUT 60 服务端断线超时(秒)
PULSE_STATS_RETENTION_DAYS 7 统计数据保留天数

管理后台

启动服务后访问 http://localhost:8080,使用 admin 账号登录。

  • Dashboard — 概览统计、Topic 监控图表、系统配置、快速开始代码
  • Topics — Topic 列表、今日消息量、最近消息时间、7 天趋势图
  • Users — 用户管理、Token 生成/复制、启停状态
  • Permissions — 权限矩阵,按用户/Topic 控制推送(P)和订阅(S)权限

数据库

默认使用 SQLite,可通过配置切换到 MySQL:

# SQLite(默认)
PULSE_DB_TYPE=sqlite
PULSE_DB_PATH=./pulse_mq.db

# MySQL
PULSE_DB_TYPE=mysql
PULSE_DB_URL=mysql+pymysql://user:password@host:3306/pulse_mq

开发

# 克隆项目
git clone https://github.com/your-username/pulse-mq.git
cd pulse-mq

# 创建虚拟环境
uv venv
source .venv/bin/activate  # Linux/macOS
# 或 .venv\Scripts\activate  # Windows

# 安装依赖
uv pip install -e ".[dev]"

# 运行测试
pytest tests/ -v

许可证

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pulse_mq-0.1.2.tar.gz (167.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pulse_mq-0.1.2-py3-none-any.whl (35.1 kB view details)

Uploaded Python 3

File details

Details for the file pulse_mq-0.1.2.tar.gz.

File metadata

  • Download URL: pulse_mq-0.1.2.tar.gz
  • Upload date:
  • Size: 167.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for pulse_mq-0.1.2.tar.gz
Algorithm Hash digest
SHA256 e5584a352e167a41a588614c95fcd6b82a61beddd42a4864565e15a95597a70f
MD5 749c2f423b1d80f2a08adf3506cad49f
BLAKE2b-256 ee963e7e5a56f1c39865e89a1a3c09ef9ac42a547cd20d0df3129d5ebf3070e2

See more details on using hashes here.

File details

Details for the file pulse_mq-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: pulse_mq-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 35.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for pulse_mq-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 8d3088f47e9df8817cce3d4aa39d4c18d59e56d4c65ad1c959c24951b8a930d1
MD5 873cd10a50e8fdb2a57dcf9db583d2af
BLAKE2b-256 ae17f7a1dce6ce50d4c243aee0addf968a55fd2ea0512ce869aa1247956ecea5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page