Skip to main content

A ZeroMQ based message system with topic subscription, authentication, and heartbeat

Project description

ZMQ Server

一个基于 ZeroMQ 的通用消息系统,支持 Topic 订阅、心跳机制、用户认证和消息转发。

功能特性

  • Topic 订阅发布:客户端可以订阅感兴趣的 Topic,服务器只将消息发送给订阅了该 Topic 的客户端
  • 用户认证:支持用户名密码认证,确保系统安全
  • 心跳机制:自动检测客户端连接状态,清理超时客户端
  • 消息转发:服务器只负责消息转发,不关心消息内容
  • 客户端ID管理:支持手动设置客户端ID(最大8位),相同ID不能同时在线
  • 自定义处理:支持自定义消息处理器,灵活处理各种消息类型
  • 详细日志:完整的日志系统,方便调试和监控

安装

从 PyPI 安装

pip install zmq-server

本地安装

pip install -r requirements.txt

快速开始

服务器端

from zmq_server import ZMQServer
import threading

# 创建服务器
server = ZMQServer(
    host="*", 
    port=5555, 
    users={
        "admin": "admin123",
        "user1": "password1"
    },
    heartbeat_interval=30,  # 心跳间隔(秒)
    client_timeout=60  # 客户端超时时间(秒)
)

# 启动服务器
server_thread = threading.Thread(target=server.start, daemon=True)
server_thread.start()

客户端

from zmq_server import ZMQClient

# 创建客户端(client_id 必须且最大8位)
client = ZMQClient(
    client_id="client001",  # 客户端ID(必填,最大8位)
    server_host="localhost",
    server_port=5555
)
client.connect()

# 认证
client.authenticate("user1", "password1")

# 订阅 Topic
client.subscribe("tech")
client.subscribe("news")

# 发布消息
client.publish("tech", "Hello World!")

# 开始接收消息
client.start_receiving()

详细使用

服务器配置

创建服务器

from zmq_server import ZMQServer

# 基本配置
server = ZMQServer(
    host="*",            # 监听地址
    port=5555,           # 监听端口
    users={              # 用户字典
        "admin": "admin123",
        "user1": "password1"
    },
    heartbeat_interval=30,  # 心跳间隔(秒)
    client_timeout=60      # 客户端超时时间(秒)
)

# 添加用户(可选)
server.add_user("user2", "password2")

启动服务器

# 方式1:在主线程启动
server.start()

# 方式2:在后台线程启动
import threading
server_thread = threading.Thread(target=server.start, daemon=True)
server_thread.start()

停止服务器

server.stop()

客户端配置

创建客户端

from zmq_server import ZMQClient

# 创建客户端(client_id 为必填参数)
client = ZMQClient(
    client_id="client001",  # 客户端ID(必填,最大8位)
    server_host="localhost",  # 服务器地址
    server_port=5555          # 服务器端口
)

# 连接服务器
client.connect()

客户端ID限制

  • 必填参数:创建客户端时必须提供 client_id
  • 长度限制:客户端ID不能超过8位
  • 唯一性:相同的客户端ID不能同时在线
# 正确的客户端ID
client = ZMQClient(client_id="client001")  # 6位,有效
client = ZMQClient(client_id="c1234567")  # 8位,有效

# 错误的客户端ID
client = ZMQClient(client_id="client00123")  # 9位,会抛出 ValueError

认证

# 认证
if client.authenticate("user1", "password1"):
    print("认证成功")
else:
    print("认证失败")

Topic 操作

# 订阅 Topic
client.subscribe("tech")
client.subscribe("news")

# 取消订阅
client.unsubscribe("news")

# 获取已订阅的 Topic
subscribed = client.get_subscribed_topics()
print(f"已订阅: {subscribed}")

消息发布

# 发布消息
client.publish("tech", "Python 4.0 即将发布!")
client.publish("news", "今天天气晴朗")

消息接收

# 开始接收消息
client.start_receiving()

# 停止接收消息
client.stop_receiving()

自定义消息处理器

# 自定义消息处理器
def custom_handler(message):
    topic = message.get("topic", "")
    content = message.get("content", "")
    print(f"收到消息 - 主题: {topic}, 内容: {content}")

# 注册处理器
client.register_handler("message", custom_handler)

# 设置默认处理器(处理未注册类型的消息)
client.set_default_handler(lambda msg: print(f"默认处理: {msg}"))

消息格式

认证消息

{
  "type": "auth",
  "username": "user1",
  "password": "password1"
}

订阅消息

{
  "type": "subscribe",
  "topic": "tech"
}

发布消息

{
  "type": "publish",
  "topic": "tech",
  "content": "Hello World!"
}

广播消息(服务器发送给客户端)

{
  "type": "message",
  "topic": "tech",
  "from": "user1",
  "content": "Hello World!",
  "timestamp": 1640995200
}

心跳消息

{
  "type": "heartbeat",
  "timestamp": 1640995200
}

日志系统

日志路径

日志文件保存在 logs/ 目录下,按日期命名:

  • logs/zmq_2024-01-01.log

日志内容

  • 服务器启动/停止
  • 客户端认证(成功/失败)
  • Topic 订阅/取消订阅
  • 消息发布/转发
  • 心跳检测
  • 客户端超时
  • 错误信息

高级功能

在线客户端管理

服务器会自动管理在线客户端,认证成功后会在日志中显示当前在线客户端列表:

[2024-01-01 12:00:00] INFO     客户端 user1 认证成功 - 客户端ID: client001
[2024-01-01 12:00:00] INFO     当前在线客户端: 1
[2024-01-01 12:00:00] INFO     =============在线客户端信息============
[2024-01-01 12:00:00] INFO     client_id:client001 - client_user: user1
[2024-01-01 12:00:00] INFO     =============在线客户端信息============

自定义消息处理器

服务器端也支持自定义消息处理器:

def custom_message_handler(identity, message):
    # 处理自定义消息类型
    return {
        "type": "custom_response",
        "status": "success",
        "message": "自定义消息处理成功"
    }

# 注册自定义处理器
server.register_handler("custom_message", custom_message_handler)

服务器统计

# 获取服务器统计信息
stats = server.get_statistics()
print(f"连接客户端数: {stats['connected_clients']}")
print(f"Topic数量: {stats['topic_statistics']['total_topics']}")

技术参数

服务器参数

参数 类型 默认值 说明
host str "*" 服务器监听地址
port int 5555 服务器监听端口
users dict None 用户字典 {"username": "password"}
heartbeat_interval int 30 心跳间隔(秒)
client_timeout int 60 客户端超时时间(秒)

客户端参数

参数 类型 默认值 说明
client_id str 必填 客户端ID(最大8位)
server_host str "localhost" 服务器地址
server_port int 5555 服务器端口

注意事项

  1. 客户端ID:创建客户端时必须提供 client_id,且长度不能超过8位
  2. 认证:客户端必须先认证才能使用其他功能
  3. Topic:客户端只能接收已订阅 Topic 的消息
  4. 心跳:客户端会自动响应服务器的心跳包
  5. 超时:超过客户端超时时间未响应的客户端会被自动清理
  6. 日志:确保 logs/ 目录存在且可写

版本信息

故障排查

常见问题

客户端ID已在线

认证失败:客户端ID client001 已在线

解决方案:使用不同的客户端ID,或确保之前的客户端已正确关闭

认证失败

认证失败:用户名或密码错误

解决方案:检查用户名和密码是否正确

客户端ID过长

ValueError: 客户端ID不能超过8位

解决方案:使用长度不超过8位的客户端ID

未认证错误

未认证,无法订阅主题

解决方案:先调用 authenticate() 进行认证

示例代码

完整服务器示例

from zmq_server import ZMQServer

if __name__ == '__main__':
    users = {
        "admin": "admin123",
        "user1": "password1",
        "user2": "password2"
    }

    server = ZMQServer(
        host="*",
        port=5555,
        users=users,
        heartbeat_interval=10,
        client_timeout=20
    )
    
    print("服务器启动,按 Ctrl+C 退出")
    server.start()

完整客户端示例

from zmq_server import ZMQClient
import time

if __name__ == '__main__':
    # 创建客户端
    client = ZMQClient(
        client_id="test001",
        server_host="localhost",
        server_port=5555
    )
    client.connect()
    
    # 认证
    if client.authenticate("user1", "password1"):
        print("认证成功")
        
        # 订阅
        client.subscribe("chat")
        client.subscribe("通知")
        
        # 开始接收
        client.start_receiving()
        
        # 发布消息
        time.sleep(1)
        client.publish("chat", "大家好!")
        client.publish("通知", "系统维护通知")
        
        print("客户端运行中,按 Ctrl+C 退出")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("退出")
    
    client.close()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zmq_server-1.0.0.tar.gz (13.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

zmq_server-1.0.0-py3-none-any.whl (10.2 kB view details)

Uploaded Python 3

File details

Details for the file zmq_server-1.0.0.tar.gz.

File metadata

  • Download URL: zmq_server-1.0.0.tar.gz
  • Upload date:
  • Size: 13.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for zmq_server-1.0.0.tar.gz
Algorithm Hash digest
SHA256 04aa02d6e5c7f3eb0b9654565a4972f2e3518958326267c5ae3ce2bb063264b7
MD5 470ee0f466176fc3afef347ea4cf0adc
BLAKE2b-256 289baf3101259bde8047f21a9f7da3981e64151a3cc84c9a71b942009211ec51

See more details on using hashes here.

File details

Details for the file zmq_server-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: zmq_server-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 10.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for zmq_server-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1c92dc90c618f39b202fb9ea7d55fe0b4410319b4cd1386dc40d730041966741
MD5 c5a307d5b9590a2e770e5352784010d4
BLAKE2b-256 0233e60620019958b6d28740a0f6a8ef4e92c3938f07828e4ca7567332ac11c5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page