Skip to main content

基于 ZeroMQ 的轻量级消息队列系统

Project description

PulseMQ

基于 ZeroMQ 的轻量级消息队列系统,配套 Grafana 风格的 Web 管理后台。

特性

  • 双 Socket 架构 — ROUTER + PULL,消息延迟低
  • 多格式支持 — 字符串、msgpack、PyArrow
  • Web 管理后台 — Grafana 暗色主题,ECharts 监控图表
  • 权限管理 — 用户 token 认证,按 topic 控制推送/订阅权限
  • 单进程部署 — 一条命令启动全部服务
  • PyPI 安装pip install pulse-mq 即可使用

快速安装

pip install pulse-mq

快速开始

启动服务端

# 复制配置文件
cp .env.example .env

# 启动服务
pulse-mq-server

首次启动会自动生成 admin 密码,输出到控制台和 admin_credentials.txt

管理后台默认地址: http://localhost:8080

客户端使用

from pulse_mq import PulseMQClient

client = PulseMQClient(
    host="localhost",
    port=5555,
    username="your-username",
    token="your-token",
)

# 订阅消息(回调方式)
def on_message(msg):
    print(f"Topic: {msg.topic}, Data: {msg.data}, Format: {msg.format}")

client.subscribe("metrics/cpu", callback=on_message)

# 推送消息
client.publish("metrics/cpu", data=42.5, format="string")

# 运行(阻塞,处理心跳和消息接收)
client.run()

服务端 SDK

from pulse_mq import PulseMQServer

server = PulseMQServer()  # 自动读取 .env 配置
server.start()

# 服务端直接推送消息
server.publish("alerts/system", data="CPU 过高", format="string")
server.publish("metrics/batch", data=df, format="pyarrow")  # DataFrame

server.stop()

消息格式

格式 说明 示例
string 原生字符串,最长 4096 字节 client.publish("t/d", data="hello", format="string")
msgpack MessagePack 序列化 client.publish("t/d", data={"key": "val"}, format="msgpack")
pyarrow PyArrow IPC 格式(支持 DataFrame) client.publish("t/d", data=df, format="pyarrow")

Topic 命名规则

格式: group/topic(强制一层层级)

  • 允许字符: a-z A-Z 0-9 _ - .
  • 每段长度: 1-128 字符
  • 示例: metrics/cpu, logs/app-error, data_1/test.topic
  • 客户端推送/订阅时,topic 不存在会自动创建

配置项

所有配置通过 .env 文件或环境变量设置:

环境变量 默认值 说明
PULSE_HOST 0.0.0.0 HTTP 服务监听地址
PULSE_HTTP_PORT 8080 HTTP 服务端口
PULSE_ZMQ_ROUTER_PORT 5555 ZMQ ROUTER 端口
PULSE_ZMQ_PULL_PORT 5556 ZMQ PULL 端口
PULSE_DB_TYPE sqlite 数据库类型 (sqlite/mysql)
PULSE_DB_PATH ./pulse_mq.db SQLite 文件路径
PULSE_DB_URL - MySQL 连接 URL
PULSE_QUEUE_SIZE 1000 每 topic 内存队列大小
PULSE_AUTH_CACHE_TTL 60 auth 缓存刷新间隔(秒)
PULSE_JWT_SECRET 自动生成 JWT 签名密钥
PULSE_JWT_EXPIRE_HOURS 24 JWT 过期时间(小时)
PULSE_ADMIN_PASSWORD 自动生成 admin 初始密码
PULSE_HEARTBEAT_INTERVAL 30 客户端心跳间隔(秒)
PULSE_HEARTBEAT_TIMEOUT 60 服务端断线超时(秒)
PULSE_STATS_RETENTION_DAYS 7 统计数据保留天数

管理后台

启动服务后访问 http://localhost:8080,使用 admin 账号登录。

  • Dashboard — 概览统计、Topic 监控图表、系统配置、快速开始代码
  • Topics — Topic 列表、今日消息量、最近消息时间、7 天趋势图
  • Users — 用户管理、Token 生成/复制、启停状态
  • Permissions — 权限矩阵,按用户/Topic 控制推送(P)和订阅(S)权限

数据库

默认使用 SQLite,可通过配置切换到 MySQL:

# SQLite(默认)
PULSE_DB_TYPE=sqlite
PULSE_DB_PATH=./pulse_mq.db

# MySQL
PULSE_DB_TYPE=mysql
PULSE_DB_URL=mysql+pymysql://user:password@host:3306/pulse_mq

开发

# 克隆项目
git clone https://github.com/your-username/pulse-mq.git
cd pulse-mq

# 创建虚拟环境
uv venv
source .venv/bin/activate  # Linux/macOS
# 或 .venv\Scripts\activate  # Windows

# 安装依赖
uv pip install -e ".[dev]"

# 运行测试
pytest tests/ -v

许可证

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pulse_mq-0.1.1.tar.gz (166.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pulse_mq-0.1.1-py3-none-any.whl (35.0 kB view details)

Uploaded Python 3

File details

Details for the file pulse_mq-0.1.1.tar.gz.

File metadata

  • Download URL: pulse_mq-0.1.1.tar.gz
  • Upload date:
  • Size: 166.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for pulse_mq-0.1.1.tar.gz
Algorithm Hash digest
SHA256 687c9e75fd64179d670f88e480c77e755f5aa460ed5b71659180d6915e3f2364
MD5 21549e8d04e76c329ffbd8699653e585
BLAKE2b-256 f119a6716cb029110147a688372f796c9e7f16c4a1319ada48b0187aea9e44ac

See more details on using hashes here.

File details

Details for the file pulse_mq-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: pulse_mq-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 35.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for pulse_mq-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 89a8bf54ff9dbe6782b2b6773dde1708821676c89d5538a08373985642aabac5
MD5 c83c1c981f69dd488d06d01e9bb7b6fe
BLAKE2b-256 328c71607c743b5bacd0f5fd913e45c782f540fa103cd2c6f97acea9edfba50c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page