A ZeroMQ based message system with topic subscription, authentication, and heartbeat
Project description
ZMQ Server
一个基于 ZeroMQ 的通用消息系统,支持 Topic 订阅、心跳机制、用户认证和消息转发。
功能特性
- Topic 订阅发布:客户端可以订阅感兴趣的 Topic,服务器只将消息发送给订阅了该 Topic 的客户端
- 用户认证:支持用户名密码认证,确保系统安全
- 心跳机制:自动检测客户端连接状态,清理超时客户端
- 消息转发:服务器只负责消息转发,不关心消息内容
- 客户端ID管理:支持手动设置客户端ID(最大8位),相同ID不能同时在线
- 自定义处理:支持自定义消息处理器,灵活处理各种消息类型
- 详细日志:完整的日志系统,方便调试和监控
安装
从 PyPI 安装
pip install zmq-server
本地安装
pip install -r requirements.txt
快速开始
服务器端
from zmq_server import ZMQServer
import threading
# 创建服务器
server = ZMQServer(
host="*",
port=5555,
users={
"admin": "admin123",
"user1": "password1"
},
heartbeat_interval=30, # 心跳间隔(秒)
client_timeout=60 # 客户端超时时间(秒)
)
# 启动服务器
server_thread = threading.Thread(target=server.start, daemon=True)
server_thread.start()
客户端
from zmq_server import ZMQClient
# 创建客户端(client_id 必须且最大8位)
client = ZMQClient(
client_id="client001", # 客户端ID(必填,最大8位)
server_host="localhost",
server_port=5555
)
client.connect()
# 认证
client.authenticate("user1", "password1")
# 订阅 Topic
client.subscribe("tech")
client.subscribe("news")
# 发布消息
client.publish("tech", "Hello World!")
# 开始接收消息
client.start_receiving()
详细使用
服务器配置
创建服务器
from zmq_server import ZMQServer
# 基本配置
server = ZMQServer(
host="*", # 监听地址
port=5555, # 监听端口
users={ # 用户字典
"admin": "admin123",
"user1": "password1"
},
heartbeat_interval=30, # 心跳间隔(秒)
client_timeout=60 # 客户端超时时间(秒)
)
# 添加用户(可选)
server.add_user("user2", "password2")
启动服务器
# 方式1:在主线程启动
server.start()
# 方式2:在后台线程启动
import threading
server_thread = threading.Thread(target=server.start, daemon=True)
server_thread.start()
停止服务器
server.stop()
客户端配置
创建客户端
from zmq_server import ZMQClient
# 创建客户端(client_id 为必填参数)
client = ZMQClient(
client_id="client001", # 客户端ID(必填,最大8位)
server_host="localhost", # 服务器地址
server_port=5555 # 服务器端口
)
# 连接服务器
client.connect()
客户端ID限制
- 必填参数:创建客户端时必须提供 client_id
- 长度限制:客户端ID不能超过8位
- 唯一性:相同的客户端ID不能同时在线
# 正确的客户端ID
client = ZMQClient(client_id="client001") # 6位,有效
client = ZMQClient(client_id="c1234567") # 8位,有效
# 错误的客户端ID
client = ZMQClient(client_id="client00123") # 9位,会抛出 ValueError
认证
# 认证
if client.authenticate("user1", "password1"):
print("认证成功")
else:
print("认证失败")
Topic 操作
# 订阅 Topic
client.subscribe("tech")
client.subscribe("news")
# 取消订阅
client.unsubscribe("news")
# 获取已订阅的 Topic
subscribed = client.get_subscribed_topics()
print(f"已订阅: {subscribed}")
消息发布
# 发布消息
client.publish("tech", "Python 4.0 即将发布!")
client.publish("news", "今天天气晴朗")
消息接收
# 开始接收消息
client.start_receiving()
# 停止接收消息
client.stop_receiving()
自定义消息处理器
# 自定义消息处理器
def custom_handler(message):
topic = message.get("topic", "")
content = message.get("content", "")
print(f"收到消息 - 主题: {topic}, 内容: {content}")
# 注册处理器
client.register_handler("message", custom_handler)
# 设置默认处理器(处理未注册类型的消息)
client.set_default_handler(lambda msg: print(f"默认处理: {msg}"))
消息格式
认证消息
{
"type": "auth",
"username": "user1",
"password": "password1"
}
订阅消息
{
"type": "subscribe",
"topic": "tech"
}
发布消息
{
"type": "publish",
"topic": "tech",
"content": "Hello World!"
}
广播消息(服务器发送给客户端)
{
"type": "message",
"topic": "tech",
"from": "user1",
"content": "Hello World!",
"timestamp": 1640995200
}
心跳消息
{
"type": "heartbeat",
"timestamp": 1640995200
}
日志系统
日志路径
日志文件保存在 logs/ 目录下,按日期命名:
logs/zmq_2024-01-01.log
日志内容
- 服务器启动/停止
- 客户端认证(成功/失败)
- Topic 订阅/取消订阅
- 消息发布/转发
- 心跳检测
- 客户端超时
- 错误信息
高级功能
在线客户端管理
服务器会自动管理在线客户端,认证成功后会在日志中显示当前在线客户端列表:
[2024-01-01 12:00:00] INFO 客户端 user1 认证成功 - 客户端ID: client001
[2024-01-01 12:00:00] INFO 当前在线客户端: 1
[2024-01-01 12:00:00] INFO =============在线客户端信息============
[2024-01-01 12:00:00] INFO client_id:client001 - client_user: user1
[2024-01-01 12:00:00] INFO =============在线客户端信息============
自定义消息处理器
服务器端也支持自定义消息处理器:
def custom_message_handler(identity, message):
# 处理自定义消息类型
return {
"type": "custom_response",
"status": "success",
"message": "自定义消息处理成功"
}
# 注册自定义处理器
server.register_handler("custom_message", custom_message_handler)
服务器统计
# 获取服务器统计信息
stats = server.get_statistics()
print(f"连接客户端数: {stats['connected_clients']}")
print(f"Topic数量: {stats['topic_statistics']['total_topics']}")
技术参数
服务器参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| host | str | "*" | 服务器监听地址 |
| port | int | 5555 | 服务器监听端口 |
| users | dict | None | 用户字典 {"username": "password"} |
| heartbeat_interval | int | 30 | 心跳间隔(秒) |
| client_timeout | int | 60 | 客户端超时时间(秒) |
客户端参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| client_id | str | 必填 | 客户端ID(最大8位) |
| server_host | str | "localhost" | 服务器地址 |
| server_port | int | 5555 | 服务器端口 |
注意事项
- 客户端ID:创建客户端时必须提供 client_id,且长度不能超过8位
- 认证:客户端必须先认证才能使用其他功能
- Topic:客户端只能接收已订阅 Topic 的消息
- 心跳:客户端会自动响应服务器的心跳包
- 超时:超过客户端超时时间未响应的客户端会被自动清理
- 日志:确保
logs/目录存在且可写
版本信息
- 版本:1.0.0
- 作者:haifeng
- 邮箱:fenglex@126.com
- 许可证:MIT
故障排查
常见问题
客户端ID已在线
认证失败:客户端ID client001 已在线
解决方案:使用不同的客户端ID,或确保之前的客户端已正确关闭
认证失败
认证失败:用户名或密码错误
解决方案:检查用户名和密码是否正确
客户端ID过长
ValueError: 客户端ID不能超过8位
解决方案:使用长度不超过8位的客户端ID
未认证错误
未认证,无法订阅主题
解决方案:先调用 authenticate() 进行认证
示例代码
完整服务器示例
from zmq_server import ZMQServer
if __name__ == '__main__':
users = {
"admin": "admin123",
"user1": "password1",
"user2": "password2"
}
server = ZMQServer(
host="*",
port=5555,
users=users,
heartbeat_interval=10,
client_timeout=20
)
print("服务器启动,按 Ctrl+C 退出")
server.start()
完整客户端示例
from zmq_server import ZMQClient
import time
if __name__ == '__main__':
# 创建客户端
client = ZMQClient(
client_id="test001",
server_host="localhost",
server_port=5555
)
client.connect()
# 认证
if client.authenticate("user1", "password1"):
print("认证成功")
# 订阅
client.subscribe("chat")
client.subscribe("通知")
# 开始接收
client.start_receiving()
# 发布消息
time.sleep(1)
client.publish("chat", "大家好!")
client.publish("通知", "系统维护通知")
print("客户端运行中,按 Ctrl+C 退出")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("退出")
client.close()
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
zmq_server-1.1.0.tar.gz
(13.1 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file zmq_server-1.1.0.tar.gz.
File metadata
- Download URL: zmq_server-1.1.0.tar.gz
- Upload date:
- Size: 13.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d0bf663849215ff018a02f00b964dcd9919d47bba5e41c599714a55dc73e8646
|
|
| MD5 |
83a6fffba9c023b21780b7e9cbd4346c
|
|
| BLAKE2b-256 |
660fed92dd128de6d3a4e7661efbc304720ac3fbeed0f2085dd867aea3ea5493
|
File details
Details for the file zmq_server-1.1.0-py3-none-any.whl.
File metadata
- Download URL: zmq_server-1.1.0-py3-none-any.whl
- Upload date:
- Size: 10.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
10a69aabaf04de8910c53845b33decf062246e926675a1a3660d8026aa2c20f9
|
|
| MD5 |
1e88f5c46dc94845bbbaff1714989ee7
|
|
| BLAKE2b-256 |
2b4b787f8f1be15443ce2c416d104fa49bac718918a403c2a61185bed31658af
|