Skip to main content

Python async SDK for Surge — high-speed pub/sub data pipeline for quant trading

Project description

surge-python

Surge 的 Python 异步 SDK —— 为量化交易者打造的高速低延迟发布/订阅数据管道。

P50 延迟 < 30μs | P99 延迟 < 100μs | 吞吐量 100K+ msg/s

安装

pip install surge-python

依赖:Python >= 3.9,protobuf >= 4.0

快速开始

import asyncio
import json
from surge import Publisher, Subscriber

async def main():
    # ── 发布行情 ──
    pub = Publisher("127.0.0.1", 9800)
    await pub.connect()

    tick = json.dumps({"price": 15.20, "volume": 83400}).encode()
    await pub.send("md.sz.000001", tick)
    await pub.disconnect()

    # ── 订阅行情 ──
    sub = Subscriber("127.0.0.1", 9800)
    await sub.connect()
    await sub.subscribe(["md.sz.*"])

    async for msg in sub.messages():
        data = json.loads(msg.payload)
        print(f"{msg.topic} price={data['price']} latency={msg.latency_us:.1f}μs")

asyncio.run(main())

核心概念

Surge 采用 Topic 发布/订阅 模型:

  • Publisher 向指定 Topic 发布消息(如 md.sz.000001
  • Subscriber 订阅 Topic 模式(支持通配符 md.sz.*),实时接收消息
  • SurgeClient 是底层全功能客户端,同时支持发布和订阅
[行情源] ──publish──▶ [Surge Server] ──push──▶ [策略程序]
                        Topic 路由
                       通配符匹配

API 参考

Publisher — 发布者

纯发布场景的高级封装,内置自动重连。

from surge import Publisher

pub = Publisher(host="127.0.0.1", port=9800, token=None)
await pub.connect()
方法 说明
await pub.connect() 连接到 Surge 服务端
await pub.send(topic, payload, headers=None) 发布消息,payloadbytesheaders 为可选 dict[str, str]
await pub.send_batch(messages) 批量发布,messageslist[tuple[str, bytes]]
await pub.disconnect() 断开连接
pub.connected 属性,返回连接状态 bool

示例:批量发布

messages = [
    ("md.sz.000001", b'{"price":15.20}'),
    ("md.sz.000002", b'{"price":28.50}'),
    ("md.sh.600519", b'{"price":1680.00}'),
]
await pub.send_batch(messages)

示例:带 Headers 发布

await pub.send(
    "md.sz.000001",
    b'{"price":15.20,"volume":83400}',
    headers={"source": "ctp", "exchange": "SZSE"}
)

Subscriber — 订阅者

纯订阅场景的高级封装,支持回调和异步迭代两种消费模式。

from surge import Subscriber

sub = Subscriber(host="127.0.0.1", port=9800, token=None)
await sub.connect()
方法 说明
await sub.connect() 连接到 Surge 服务端
await sub.subscribe(topics, snapshot=False) 订阅 Topic 列表,支持通配符 *snapshot=True 获取最新快照
sub.on_message(callback) 注册消息回调,回调接收 SurgeMessage 对象
sub.on_snapshot(callback) 注册快照回调,回调接收 SurgeSnapshot 对象
sub.on_disconnect(callback) 注册断连回调
async for msg in sub.messages() 异步迭代接收消息
await sub.disconnect() 断开连接
sub.connected 属性,返回连接状态 bool

示例:回调模式

import json

def handle_msg(msg):
    tick = json.loads(msg.payload)
    print(f"[{msg.topic}] price={tick['price']} latency={msg.latency_us:.1f}μs")

sub.on_message(handle_msg)
await sub.subscribe(["md.sz.*", "md.sh.*"])

# 保持运行
await asyncio.Event().wait()

示例:异步迭代模式

await sub.subscribe(["md.sz.000001"], snapshot=True)

async for msg in sub.messages():
    if isinstance(msg, SurgeSnapshot):
        print(f"快照: {msg.topic} seq={msg.latest_sequence}")
    else:
        print(f"实时: {msg.topic} seq={msg.sequence}")

SurgeClient — 底层全功能客户端

同时支持发布和订阅,提供完整控制能力。

from surge import SurgeClient

client = SurgeClient(
    host="127.0.0.1",
    port=9800,
    token=None,                # Token 认证(可选)
    auto_reconnect=True,       # 断线自动重连
    reconnect_interval=1.0,    # 重连间隔(秒)
)
await client.connect()
方法 说明
await client.connect() 连接服务端,如设置 token 会自动完成认证
await client.publish(topic, payload, headers=None) 发布消息
await client.subscribe(topics, snapshot=False) 订阅 Topic 列表
await client.unsubscribe(topics=None) 取消订阅
client.on_message(callback) 注册消息回调
client.on_snapshot(callback) 注册快照回调
client.on_disconnect(callback) 注册断连回调
async for msg in client.messages() 异步迭代接收消息
await client.disconnect() 优雅断开连接
client.connected 属性,返回连接状态 bool

示例:同时发布和订阅

client = SurgeClient("127.0.0.1", 9800)
await client.connect()

# 订阅所有深市行情
client.on_message(lambda msg: print(msg.topic, msg.latency_us))
await client.subscribe(["md.sz.*"], snapshot=True)

# 同时发布
await client.publish("signal.buy", b'{"code":"000001","qty":1000}')

数据类型

SurgeMessage

收到的实时消息。

字段 类型 说明
topic str Topic 名称,如 md.sz.000001
sequence int 消息序号(Topic 内递增)
payload bytes 消息体(通常为 JSON)
timestamp int 发布时间戳(纳秒)
headers dict 附加头信息
latency_us float 端到端延迟(微秒)

SurgeSnapshot

订阅时获取的最新快照。

字段 类型 说明
topic str Topic 名称
latest_sequence int 该 Topic 最新序号
payload bytes 最新消息体

Topic 通配符

模式 说明 示例
精确匹配 完全匹配 Topic 名 md.sz.000001
* 通配 匹配单层任意字符 md.sz.* 匹配 md.sz.000001
多主题 传入列表订阅多个 ["md.sz.*", "md.sh.*"]

Token 认证

服务端开启 Token 认证时,客户端需传入 Token:

pub = Publisher("127.0.0.1", 9800, token="your-secret-token")
await pub.connect()  # 自动完成认证

认证失败将抛出 PermissionError

自动重连

所有客户端默认开启自动重连:

  • 断线后自动尝试重连(间隔 1 秒)
  • 重连成功后自动恢复之前的所有订阅
  • 通过 on_disconnect 回调感知断连事件
sub.on_disconnect(lambda: print("连接断开,正在自动重连..."))

如需关闭自动重连:

client = SurgeClient("127.0.0.1", 9800, auto_reconnect=False)

日志

SDK 使用 Python 标准 logging 模块,logger 名称为 surge

import logging
logging.basicConfig(level=logging.DEBUG)

# 或仅开启 surge 日志
logging.getLogger("surge").setLevel(logging.DEBUG)

完整示例:A 股行情转发

import asyncio
import json
from surge import Publisher, Subscriber

async def producer():
    """模拟行情源,发布 A 股 Tick 数据"""
    pub = Publisher("127.0.0.1", 9800)
    await pub.connect()

    ticks = [
        ("md.sz.000001", {"price": 15.20, "bid": 15.19, "ask": 15.21, "volume": 83400}),
        ("md.sh.600519", {"price": 1680.0, "bid": 1679.5, "ask": 1680.5, "volume": 1200}),
        ("md.sz.000858", {"price": 152.30, "bid": 152.28, "ask": 152.32, "volume": 5600}),
    ]

    for topic, data in ticks:
        await pub.send(topic, json.dumps(data).encode())
        print(f"[PUB] {topic}")

    await pub.disconnect()

async def consumer():
    """订阅深市所有行情,计算延迟"""
    sub = Subscriber("127.0.0.1", 9800)
    await sub.connect()
    await sub.subscribe(["md.sz.*"], snapshot=True)

    count = 0
    async for msg in sub.messages():
        tick = json.loads(msg.payload)
        print(f"[SUB] {msg.topic} price={tick['price']} latency={msg.latency_us:.1f}μs")
        count += 1
        if count >= 10:
            break

    await sub.disconnect()

async def main():
    await asyncio.gather(producer(), consumer())

asyncio.run(main())

许可证

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

surge_python-0.1.1.tar.gz (13.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

surge_python-0.1.1-py3-none-any.whl (12.0 kB view details)

Uploaded Python 3

File details

Details for the file surge_python-0.1.1.tar.gz.

File metadata

  • Download URL: surge_python-0.1.1.tar.gz
  • Upload date:
  • Size: 13.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for surge_python-0.1.1.tar.gz
Algorithm Hash digest
SHA256 f67cf8d7b4f42e586b48d4b3da13d99ad54ffacfb2980a4ba1603607bc89002c
MD5 aa10e5ec4797112cf818680acdd47114
BLAKE2b-256 7531d9275ddb499f66677b7413a4604e256e5710ac9b19723bf733b6d43ecc38

See more details on using hashes here.

File details

Details for the file surge_python-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: surge_python-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 12.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for surge_python-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 8eccfad49d94b1a5a975dc4b9e7eb6914ec748806a0a26488c9ddf136ec1e0db
MD5 d823b6d94d8a7c3059cac0af2b37b2df
BLAKE2b-256 ffa64d0aa3792e624b789f236016387d7616f6d72e9c01f77bac17847777e9f8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page