Skip to main content

基于 ZeroMQ 的轻量级消息队列系统

Project description

PulseMQ

基于 ZeroMQ 的轻量级消息队列系统,配套 Grafana 风格的 Web 管理后台。

特性

  • 双 Socket 架构 — ROUTER + PULL,消息延迟低
  • 多格式支持 — 字符串、msgpack、PyArrow
  • Web 管理后台 — Grafana 暗色主题,ECharts 监控图表
  • 权限管理 — 用户 token 认证,按 topic 控制推送/订阅权限
  • 单进程部署 — 一条命令启动全部服务
  • PyPI 安装pip install pulse-mq 即可使用

快速安装

pip install pulse-mq

快速开始

方式一:命令行启动

# 复制配置文件
cp .env.example .env

# 启动服务(同时启动 ZMQ 消息服务 + HTTP 管理后台)
pulse-mq-server

首次启动会自动生成 admin 密码,输出到控制台和 admin_credentials.txt

管理后台默认地址: http://localhost:8080

方式二:Python 代码启动

from pulse_mq import PulseMQServer

server = PulseMQServer()  # 自动读取 .env 配置
server.start()            # 启动全部服务,阻塞运行(Ctrl+C 停止)

客户端使用

from pulse_mq import PulseMQClient

client = PulseMQClient(
    host="localhost",
    port=5555,
    username="your-username",
    token="your-token",
)

# 订阅消息(回调方式)
def on_message(msg):
    print(f"Topic: {msg.topic}, Data: {msg.data}, Format: {msg.format}")

client.subscribe("metrics/cpu", callback=on_message)

# 推送消息
client.publish("metrics/cpu", data=42.5, format="string")

# 运行(阻塞,处理心跳和消息接收)
client.run()

PulseMQServer 参数

from pulse_mq import PulseMQServer, Config

# 方式 1:自动读取 .env 配置
server = PulseMQServer()

# 方式 2:指定 .env 文件路径
server = PulseMQServer(env_file="/path/to/.env")

# 方式 3:编程方式传入配置
config = Config(
    host="0.0.0.0",
    http_port=8080,
    zmq_router_port=5555,
    zmq_pull_port=5556,
    db_type="sqlite",
    db_path="./pulse_mq.db",
    admin_password="my-password",
)
server = PulseMQServer(config=config)

server.start() 参数

参数 类型 默认值 说明
with_http bool True 是否同时启动 HTTP 管理后台
block bool True 是否阻塞主线程(保持服务运行)
# 默认行为:启动全部服务,阻塞运行
server.start()

# 仅启动 ZMQ 消息服务,不启动 HTTP 管理后台
server.start(with_http=False)

# 启动全部服务,但不阻塞(嵌入已有应用)
server.start(block=False)

# 非阻塞模式下,需要自行保持进程运行
# 例如在 Flask/FastAPI 应用中嵌入
server.start(with_http=False, block=False)

server.publish() 参数

参数 类型 默认值 说明
topic str 必填 Topic 名称,格式 group/topic
data str/bytes/dict/DataFrame 必填 消息数据
format str "string" 数据格式:"string" / "msgpack" / "pyarrow"
# 字符串消息
server.publish("alerts/system", data="CPU 过高", format="string")

# msgpack 消息(自动序列化 dict/list)
server.publish("data/events", data={"key": "value"}, format="msgpack")

# PyArrow 消息(自动序列化 pandas DataFrame)
import pandas as pd
df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
server.publish("metrics/batch", data=df, format="pyarrow")

server.stop()

停止服务,关闭所有 ZMQ socket 和线程。


PulseMQClient 参数

from pulse_mq import PulseMQClient

# 方式 1:构造函数参数
client = PulseMQClient(
    host="localhost",    # 服务端地址
    port=5555,           # ZMQ ROUTER 端口
    username="user1",    # 用户名(在管理后台创建)
    token="your-token",  # Token(在管理后台生成)
)

# 方式 2:从环境变量读取
# export PULSE_CLIENT_HOST=localhost
# export PULSE_CLIENT_PORT=5555
# export PULSE_CLIENT_USERNAME=user1
# export PULSE_CLIENT_TOKEN=your-token
client = PulseMQClient.from_env()

PulseMQClient() 构造参数

参数 类型 默认值 说明
host str "localhost" 服务端地址
port int 5555 ZMQ ROUTER 端口(对应服务端 PULSE_ZMQ_ROUTER_PORT
username str "" 用户名(在管理后台 Users 页面创建)
token str "" 用户 Token(在管理后台 Users 页面生成)

client.subscribe() 参数

参数 类型 说明
topic str 订阅的 Topic,格式 group/topic
callback Callable 回调函数,消息到达时调用,参数为 PulseMQMessage 对象
def on_message(msg):
    print(f"Topic: {msg.topic}")     # Topic 名称
    print(f"Data: {msg.data}")       # 消息数据(已反序列化)
    print(f"Format: {msg.format}")   # 数据格式:"string"/"msgpack"/"pyarrow"

client.subscribe("metrics/cpu", callback=on_message)

client.publish() 参数

参数 类型 默认值 说明
topic str 必填 Topic 名称,格式 group/topic
data str/bytes/dict/DataFrame 必填 消息数据
format str "string" 数据格式:"string" / "msgpack" / "pyarrow"

client.run() / client.run_async()

方法 说明
client.run() 阻塞运行,处理心跳和消息接收(适合独立脚本)
client.run_async() 异步运行(适合 asyncio 环境)

client.unsubscribe(topic)

取消订阅指定 topic。

client.disconnect()

断开连接,关闭 ZMQ socket。


消息格式

格式 说明 示例
string 原生字符串,最长 4096 字节 client.publish("t/d", data="hello", format="string")
msgpack MessagePack 序列化 client.publish("t/d", data={"key": "val"}, format="msgpack")
pyarrow PyArrow IPC 格式(支持 DataFrame) client.publish("t/d", data=df, format="pyarrow")

Topic 命名规则

格式: group/topic(强制一层层级)

  • 允许字符: a-z A-Z 0-9 _ - .
  • 每段长度: 1-128 字符
  • 示例: metrics/cpu, logs/app-error, data_1/test.topic
  • 客户端推送/订阅时,topic 不存在会自动创建

配置项

所有配置通过 .env 文件或环境变量设置:

环境变量 默认值 说明
PULSE_HOST 0.0.0.0 HTTP 服务监听地址
PULSE_HTTP_PORT 8080 HTTP 服务端口
PULSE_ZMQ_ROUTER_PORT 5555 ZMQ ROUTER 端口(客户端连接入口)
PULSE_ZMQ_PULL_PORT 5556 ZMQ PULL 端口(接收客户端推送的消息)
PULSE_DB_TYPE sqlite 数据库类型:sqlitemysql
PULSE_DB_PATH ./pulse_mq.db SQLite 文件路径(仅 sqlite 模式)
PULSE_DB_URL - MySQL 连接 URL(仅 mysql 模式,格式:mysql+pymysql://user:pass@host:3306/db
PULSE_QUEUE_SIZE 1000 每个 topic 内存队列保留的最大消息条数
PULSE_AUTH_CACHE_TTL 60 客户端认证缓存从数据库刷新的间隔(秒)
PULSE_JWT_SECRET 自动生成 JWT 签名密钥(留空则首次启动自动生成)
PULSE_JWT_EXPIRE_HOURS 24 JWT Token 过期时间(小时)
PULSE_ADMIN_PASSWORD 自动生成 admin 初始密码(留空则首次启动自动生成)
PULSE_HEARTBEAT_INTERVAL 30 客户端心跳发送间隔(秒)
PULSE_HEARTBEAT_TIMEOUT 60 服务端判定客户端断线的超时时间(秒)
PULSE_STATS_RETENTION_DAYS 7 消息统计数据保留天数

客户端环境变量

客户端通过 PulseMQClient.from_env() 从环境变量读取配置:

环境变量 说明
PULSE_CLIENT_HOST 服务端地址
PULSE_CLIENT_PORT ZMQ ROUTER 端口
PULSE_CLIENT_USERNAME 用户名
PULSE_CLIENT_TOKEN 用户 Token

管理后台

启动服务后访问 http://localhost:8080,使用 admin 账号登录。

  • Dashboard — 概览统计、Topic 监控图表、系统配置、快速开始代码
  • Topics — Topic 列表、今日消息量、最近消息时间、7 天趋势图
  • Users — 用户管理、Token 生成/复制、启停状态
  • Permissions — 权限矩阵,按用户/Topic 控制推送(P)和订阅(S)权限

数据库

默认使用 SQLite,可通过配置切换到 MySQL:

# SQLite(默认)
PULSE_DB_TYPE=sqlite
PULSE_DB_PATH=./pulse_mq.db

# MySQL
PULSE_DB_TYPE=mysql
PULSE_DB_URL=mysql+pymysql://user:password@host:3306/pulse_mq

开发

# 克隆项目
git clone https://github.com/your-username/pulse-mq.git
cd pulse-mq

# 创建虚拟环境
uv venv
source .venv/bin/activate  # Linux/macOS
# 或 .venv\Scripts\activate  # Windows

# 安装依赖
uv pip install -e ".[dev]"

# 运行测试
pytest tests/ -v

许可证

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pulse_mq-0.1.5.tar.gz (171.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pulse_mq-0.1.5-py3-none-any.whl (36.8 kB view details)

Uploaded Python 3

File details

Details for the file pulse_mq-0.1.5.tar.gz.

File metadata

  • Download URL: pulse_mq-0.1.5.tar.gz
  • Upload date:
  • Size: 171.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for pulse_mq-0.1.5.tar.gz
Algorithm Hash digest
SHA256 ba6a4a1f503eef4a6b058cb09264e53e20423a7ee6b17d5c0f3b52e726a2021b
MD5 3bdb6bdbc142bcf8464cc2fe98a364a3
BLAKE2b-256 0c87fb35deac84350f147e66d664d2065e8516b18a1b08fbd66bde7225211ad3

See more details on using hashes here.

File details

Details for the file pulse_mq-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: pulse_mq-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 36.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for pulse_mq-0.1.5-py3-none-any.whl
Algorithm Hash digest
SHA256 3a1a0e7c19f0ed639c2fc63a0495045f08f5916f2af4c08dae450f6cd0cc6de3
MD5 ee27afc1b7e303a7e9918838353f603a
BLAKE2b-256 89c0cb1631de2f74f22401c3474cbf0c1266afdcd24042514e60069888b24afd

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page