Skip to main content

A ZeroMQ based message system with topic subscription, authentication, and heartbeat

Project description

ZMQ Server

一个基于 ZeroMQ 的通用消息系统,支持 Topic 订阅、心跳机制、用户认证和消息转发。

功能特性

  • Topic 订阅发布:客户端可以订阅感兴趣的 Topic,服务器只将消息发送给订阅了该 Topic 的客户端
  • 用户认证:支持用户名密码认证,确保系统安全
  • 心跳机制:自动检测客户端连接状态,清理超时客户端
  • 消息转发:服务器只负责消息转发,不关心消息内容
  • 客户端ID管理:支持手动设置客户端ID(最大8位),相同ID不能同时在线
  • 自定义处理:支持自定义消息处理器,灵活处理各种消息类型
  • 详细日志:完整的日志系统,方便调试和监控

安装

从 PyPI 安装

pip install zmq-server

本地安装

pip install -r requirements.txt

快速开始

服务器端

from zmq_server import ZMQServer
import threading

# 创建服务器
server = ZMQServer(
    host="*", 
    port=5555, 
    users={
        "admin": "admin123",
        "user1": "password1"
    },
    heartbeat_interval=30,  # 心跳间隔(秒)
    client_timeout=60  # 客户端超时时间(秒)
)

# 启动服务器
server_thread = threading.Thread(target=server.start, daemon=True)
server_thread.start()

客户端

from zmq_server import ZMQClient

# 创建客户端(client_id 必须且最大8位)
client = ZMQClient(
    client_id="client001",  # 客户端ID(必填,最大8位)
    server_host="localhost",
    server_port=5555
)
client.connect()

# 认证
client.authenticate("user1", "password1")

# 订阅 Topic
client.subscribe("tech")
client.subscribe("news")

# 发布消息
client.publish("tech", "Hello World!")

# 开始接收消息
client.start_receiving()

详细使用

服务器配置

创建服务器

from zmq_server import ZMQServer

# 基本配置
server = ZMQServer(
    host="*",            # 监听地址
    port=5555,           # 监听端口
    users={              # 用户字典
        "admin": "admin123",
        "user1": "password1"
    },
    heartbeat_interval=30,  # 心跳间隔(秒)
    client_timeout=60      # 客户端超时时间(秒)
)

# 添加用户(可选)
server.add_user("user2", "password2")

启动服务器

# 方式1:在主线程启动
server.start()

# 方式2:在后台线程启动
import threading
server_thread = threading.Thread(target=server.start, daemon=True)
server_thread.start()

停止服务器

server.stop()

客户端配置

创建客户端

from zmq_server import ZMQClient

# 创建客户端(client_id 为必填参数)
client = ZMQClient(
    client_id="client001",  # 客户端ID(必填,最大8位)
    server_host="localhost",  # 服务器地址
    server_port=5555          # 服务器端口
)

# 连接服务器
client.connect()

客户端ID限制

  • 必填参数:创建客户端时必须提供 client_id
  • 长度限制:客户端ID不能超过8位
  • 唯一性:相同的客户端ID不能同时在线
# 正确的客户端ID
client = ZMQClient(client_id="client001")  # 6位,有效
client = ZMQClient(client_id="c1234567")  # 8位,有效

# 错误的客户端ID
client = ZMQClient(client_id="client00123")  # 9位,会抛出 ValueError

认证

# 认证
if client.authenticate("user1", "password1"):
    print("认证成功")
else:
    print("认证失败")

Topic 操作

# 订阅 Topic
client.subscribe("tech")
client.subscribe("news")

# 取消订阅
client.unsubscribe("news")

# 获取已订阅的 Topic
subscribed = client.get_subscribed_topics()
print(f"已订阅: {subscribed}")

消息发布

# 发布消息
client.publish("tech", "Python 4.0 即将发布!")
client.publish("news", "今天天气晴朗")

消息接收

# 开始接收消息
client.start_receiving()

# 停止接收消息
client.stop_receiving()

自定义消息处理器

# 自定义消息处理器
def custom_handler(message):
    topic = message.get("topic", "")
    content = message.get("content", "")
    print(f"收到消息 - 主题: {topic}, 内容: {content}")

# 注册处理器
client.register_handler("message", custom_handler)

# 设置默认处理器(处理未注册类型的消息)
client.set_default_handler(lambda msg: print(f"默认处理: {msg}"))

消息格式

认证消息

{
  "type": "auth",
  "username": "user1",
  "password": "password1"
}

订阅消息

{
  "type": "subscribe",
  "topic": "tech"
}

发布消息

{
  "type": "publish",
  "topic": "tech",
  "content": "Hello World!"
}

广播消息(服务器发送给客户端)

{
  "type": "message",
  "topic": "tech",
  "from": "user1",
  "content": "Hello World!",
  "timestamp": 1640995200
}

心跳消息

{
  "type": "heartbeat",
  "timestamp": 1640995200
}

日志系统

日志路径

日志文件保存在 logs/ 目录下,按日期命名:

  • logs/zmq_2024-01-01.log

日志内容

  • 服务器启动/停止
  • 客户端认证(成功/失败)
  • Topic 订阅/取消订阅
  • 消息发布/转发
  • 心跳检测
  • 客户端超时
  • 错误信息

高级功能

在线客户端管理

服务器会自动管理在线客户端,认证成功后会在日志中显示当前在线客户端列表:

[2024-01-01 12:00:00] INFO     客户端 user1 认证成功 - 客户端ID: client001
[2024-01-01 12:00:00] INFO     当前在线客户端: 1
[2024-01-01 12:00:00] INFO     =============在线客户端信息============
[2024-01-01 12:00:00] INFO     client_id:client001 - client_user: user1
[2024-01-01 12:00:00] INFO     =============在线客户端信息============

自定义消息处理器

服务器端也支持自定义消息处理器:

def custom_message_handler(identity, message):
    # 处理自定义消息类型
    return {
        "type": "custom_response",
        "status": "success",
        "message": "自定义消息处理成功"
    }

# 注册自定义处理器
server.register_handler("custom_message", custom_message_handler)

服务器统计

# 获取服务器统计信息
stats = server.get_statistics()
print(f"连接客户端数: {stats['connected_clients']}")
print(f"Topic数量: {stats['topic_statistics']['total_topics']}")

技术参数

服务器参数

参数 类型 默认值 说明
host str "*" 服务器监听地址
port int 5555 服务器监听端口
users dict None 用户字典 {"username": "password"}
heartbeat_interval int 30 心跳间隔(秒)
client_timeout int 60 客户端超时时间(秒)

客户端参数

参数 类型 默认值 说明
client_id str 必填 客户端ID(最大8位)
server_host str "localhost" 服务器地址
server_port int 5555 服务器端口

注意事项

  1. 客户端ID:创建客户端时必须提供 client_id,且长度不能超过8位
  2. 认证:客户端必须先认证才能使用其他功能
  3. Topic:客户端只能接收已订阅 Topic 的消息
  4. 心跳:客户端会自动响应服务器的心跳包
  5. 超时:超过客户端超时时间未响应的客户端会被自动清理
  6. 日志:确保 logs/ 目录存在且可写

版本信息

故障排查

常见问题

客户端ID已在线

认证失败:客户端ID client001 已在线

解决方案:使用不同的客户端ID,或确保之前的客户端已正确关闭

认证失败

认证失败:用户名或密码错误

解决方案:检查用户名和密码是否正确

客户端ID过长

ValueError: 客户端ID不能超过8位

解决方案:使用长度不超过8位的客户端ID

未认证错误

未认证,无法订阅主题

解决方案:先调用 authenticate() 进行认证

示例代码

完整服务器示例

from zmq_server import ZMQServer

if __name__ == '__main__':
    users = {
        "admin": "admin123",
        "user1": "password1",
        "user2": "password2"
    }

    server = ZMQServer(
        host="*",
        port=5555,
        users=users,
        heartbeat_interval=10,
        client_timeout=20
    )
    
    print("服务器启动,按 Ctrl+C 退出")
    server.start()

完整客户端示例

from zmq_server import ZMQClient
import time

if __name__ == '__main__':
    # 创建客户端
    client = ZMQClient(
        client_id="test001",
        server_host="localhost",
        server_port=5555
    )
    client.connect()
    
    # 认证
    if client.authenticate("user1", "password1"):
        print("认证成功")
        
        # 订阅
        client.subscribe("chat")
        client.subscribe("通知")
        
        # 开始接收
        client.start_receiving()
        
        # 发布消息
        time.sleep(1)
        client.publish("chat", "大家好!")
        client.publish("通知", "系统维护通知")
        
        print("客户端运行中,按 Ctrl+C 退出")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("退出")
    
    client.close()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zmq_server-1.2.0.tar.gz (13.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

zmq_server-1.2.0-py3-none-any.whl (10.0 kB view details)

Uploaded Python 3

File details

Details for the file zmq_server-1.2.0.tar.gz.

File metadata

  • Download URL: zmq_server-1.2.0.tar.gz
  • Upload date:
  • Size: 13.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for zmq_server-1.2.0.tar.gz
Algorithm Hash digest
SHA256 0ffb319056a00be5cbfa87d5374217439df6d858e82953799488657d15672164
MD5 e9d990bc00f9e93ba06d3960f2cc5857
BLAKE2b-256 00e6ee204459e60a132f8a6572516dbb8e097f3c6ddfed79a07b31ab2f9e49ee

See more details on using hashes here.

File details

Details for the file zmq_server-1.2.0-py3-none-any.whl.

File metadata

  • Download URL: zmq_server-1.2.0-py3-none-any.whl
  • Upload date:
  • Size: 10.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for zmq_server-1.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 169922b735f1de479bed160213afbb40d686e84a4a289a552e86165f97b4537e
MD5 6397e3de89a2b78352e544134d2203bc
BLAKE2b-256 e1e62ed60a942ed8c64414b2b21f156006696cf51eab42a767f01264defcfef7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page