Python async SDK for Surge — high-speed pub/sub data pipeline for quant trading
Project description
surge-python
Surge 的 Python 异步 SDK —— 为量化交易者打造的高速低延迟发布/订阅数据管道。
P50 延迟 < 30μs | P99 延迟 < 100μs | 吞吐量 100K+ msg/s
安装
pip install surge-python
依赖:Python >= 3.9,protobuf >= 4.0
快速开始
import asyncio
import json
from surge import Publisher, Subscriber
async def main():
# ── 发布行情 ──
pub = Publisher("127.0.0.1", 9800)
await pub.connect()
tick = json.dumps({"price": 15.20, "volume": 83400}).encode()
await pub.send("md.sz.000001", tick)
await pub.disconnect()
# ── 订阅行情 ──
sub = Subscriber("127.0.0.1", 9800)
await sub.connect()
await sub.subscribe(["md.sz.*"])
async for msg in sub.messages():
data = json.loads(msg.payload)
print(f"{msg.topic} price={data['price']} latency={msg.latency_us:.1f}μs")
asyncio.run(main())
核心概念
Surge 采用 Topic 发布/订阅 模型:
- Publisher 向指定 Topic 发布消息(如
md.sz.000001) - Subscriber 订阅 Topic 模式(支持通配符
md.sz.*),实时接收消息 - SurgeClient 是底层全功能客户端,同时支持发布和订阅
[行情源] ──publish──▶ [Surge Server] ──push──▶ [策略程序]
Topic 路由
通配符匹配
API 参考
Publisher — 发布者
纯发布场景的高级封装,内置自动重连。
from surge import Publisher
pub = Publisher(host="127.0.0.1", port=9800, token=None)
await pub.connect()
| 方法 | 说明 |
|---|---|
await pub.connect() |
连接到 Surge 服务端 |
await pub.send(topic, payload, headers=None) |
发布消息,payload 为 bytes,headers 为可选 dict[str, str] |
await pub.send_batch(messages) |
批量发布,messages 为 list[tuple[str, bytes]] |
await pub.disconnect() |
断开连接 |
pub.connected |
属性,返回连接状态 bool |
示例:批量发布
messages = [
("md.sz.000001", b'{"price":15.20}'),
("md.sz.000002", b'{"price":28.50}'),
("md.sh.600519", b'{"price":1680.00}'),
]
await pub.send_batch(messages)
示例:带 Headers 发布
await pub.send(
"md.sz.000001",
b'{"price":15.20,"volume":83400}',
headers={"source": "ctp", "exchange": "SZSE"}
)
Subscriber — 订阅者
纯订阅场景的高级封装,支持回调和异步迭代两种消费模式。
from surge import Subscriber
sub = Subscriber(host="127.0.0.1", port=9800, token=None)
await sub.connect()
| 方法 | 说明 |
|---|---|
await sub.connect() |
连接到 Surge 服务端 |
await sub.subscribe(topics, snapshot=False) |
订阅 Topic 列表,支持通配符 *,snapshot=True 获取最新快照 |
sub.on_message(callback) |
注册消息回调,回调接收 SurgeMessage 对象 |
sub.on_snapshot(callback) |
注册快照回调,回调接收 SurgeSnapshot 对象 |
sub.on_disconnect(callback) |
注册断连回调 |
async for msg in sub.messages() |
异步迭代接收消息 |
await sub.disconnect() |
断开连接 |
sub.connected |
属性,返回连接状态 bool |
示例:回调模式
import json
def handle_msg(msg):
tick = json.loads(msg.payload)
print(f"[{msg.topic}] price={tick['price']} latency={msg.latency_us:.1f}μs")
sub.on_message(handle_msg)
await sub.subscribe(["md.sz.*", "md.sh.*"])
# 保持运行
await asyncio.Event().wait()
示例:异步迭代模式
await sub.subscribe(["md.sz.000001"], snapshot=True)
async for msg in sub.messages():
if isinstance(msg, SurgeSnapshot):
print(f"快照: {msg.topic} seq={msg.latest_sequence}")
else:
print(f"实时: {msg.topic} seq={msg.sequence}")
SurgeClient — 底层全功能客户端
同时支持发布和订阅,提供完整控制能力。
from surge import SurgeClient
client = SurgeClient(
host="127.0.0.1",
port=9800,
token=None, # Token 认证(可选)
auto_reconnect=True, # 断线自动重连
reconnect_interval=1.0, # 重连间隔(秒)
)
await client.connect()
| 方法 | 说明 |
|---|---|
await client.connect() |
连接服务端,如设置 token 会自动完成认证 |
await client.publish(topic, payload, headers=None) |
发布消息 |
await client.subscribe(topics, snapshot=False) |
订阅 Topic 列表 |
await client.unsubscribe(topics=None) |
取消订阅 |
client.on_message(callback) |
注册消息回调 |
client.on_snapshot(callback) |
注册快照回调 |
client.on_disconnect(callback) |
注册断连回调 |
async for msg in client.messages() |
异步迭代接收消息 |
await client.disconnect() |
优雅断开连接 |
client.connected |
属性,返回连接状态 bool |
示例:同时发布和订阅
client = SurgeClient("127.0.0.1", 9800)
await client.connect()
# 订阅所有深市行情
client.on_message(lambda msg: print(msg.topic, msg.latency_us))
await client.subscribe(["md.sz.*"], snapshot=True)
# 同时发布
await client.publish("signal.buy", b'{"code":"000001","qty":1000}')
数据类型
SurgeMessage
收到的实时消息。
| 字段 | 类型 | 说明 |
|---|---|---|
topic |
str |
Topic 名称,如 md.sz.000001 |
sequence |
int |
消息序号(Topic 内递增) |
payload |
bytes |
消息体(通常为 JSON) |
timestamp |
int |
发布时间戳(纳秒) |
headers |
dict |
附加头信息 |
latency_us |
float |
端到端延迟(微秒) |
SurgeSnapshot
订阅时获取的最新快照。
| 字段 | 类型 | 说明 |
|---|---|---|
topic |
str |
Topic 名称 |
latest_sequence |
int |
该 Topic 最新序号 |
payload |
bytes |
最新消息体 |
Topic 通配符
| 模式 | 说明 | 示例 |
|---|---|---|
| 精确匹配 | 完全匹配 Topic 名 | md.sz.000001 |
* 通配 |
匹配单层任意字符 | md.sz.* 匹配 md.sz.000001 |
| 多主题 | 传入列表订阅多个 | ["md.sz.*", "md.sh.*"] |
Token 认证
服务端开启 Token 认证时,客户端需传入 Token:
pub = Publisher("127.0.0.1", 9800, token="your-secret-token")
await pub.connect() # 自动完成认证
认证失败将抛出 PermissionError。
自动重连
所有客户端默认开启自动重连:
- 断线后自动尝试重连(间隔 1 秒)
- 重连成功后自动恢复之前的所有订阅
- 通过
on_disconnect回调感知断连事件
sub.on_disconnect(lambda: print("连接断开,正在自动重连..."))
如需关闭自动重连:
client = SurgeClient("127.0.0.1", 9800, auto_reconnect=False)
日志
SDK 使用 Python 标准 logging 模块,logger 名称为 surge:
import logging
logging.basicConfig(level=logging.DEBUG)
# 或仅开启 surge 日志
logging.getLogger("surge").setLevel(logging.DEBUG)
完整示例:A 股行情转发
import asyncio
import json
from surge import Publisher, Subscriber
async def producer():
"""模拟行情源,发布 A 股 Tick 数据"""
pub = Publisher("127.0.0.1", 9800)
await pub.connect()
ticks = [
("md.sz.000001", {"price": 15.20, "bid": 15.19, "ask": 15.21, "volume": 83400}),
("md.sh.600519", {"price": 1680.0, "bid": 1679.5, "ask": 1680.5, "volume": 1200}),
("md.sz.000858", {"price": 152.30, "bid": 152.28, "ask": 152.32, "volume": 5600}),
]
for topic, data in ticks:
await pub.send(topic, json.dumps(data).encode())
print(f"[PUB] {topic}")
await pub.disconnect()
async def consumer():
"""订阅深市所有行情,计算延迟"""
sub = Subscriber("127.0.0.1", 9800)
await sub.connect()
await sub.subscribe(["md.sz.*"], snapshot=True)
count = 0
async for msg in sub.messages():
tick = json.loads(msg.payload)
print(f"[SUB] {msg.topic} price={tick['price']} latency={msg.latency_us:.1f}μs")
count += 1
if count >= 10:
break
await sub.disconnect()
async def main():
await asyncio.gather(producer(), consumer())
asyncio.run(main())
许可证
MIT License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file surge_python-0.1.1.tar.gz.
File metadata
- Download URL: surge_python-0.1.1.tar.gz
- Upload date:
- Size: 13.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f67cf8d7b4f42e586b48d4b3da13d99ad54ffacfb2980a4ba1603607bc89002c
|
|
| MD5 |
aa10e5ec4797112cf818680acdd47114
|
|
| BLAKE2b-256 |
7531d9275ddb499f66677b7413a4604e256e5710ac9b19723bf733b6d43ecc38
|
File details
Details for the file surge_python-0.1.1-py3-none-any.whl.
File metadata
- Download URL: surge_python-0.1.1-py3-none-any.whl
- Upload date:
- Size: 12.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8eccfad49d94b1a5a975dc4b9e7eb6914ec748806a0a26488c9ddf136ec1e0db
|
|
| MD5 |
d823b6d94d8a7c3059cac0af2b37b2df
|
|
| BLAKE2b-256 |
ffa64d0aa3792e624b789f236016387d7616f6d72e9c01f77bac17847777e9f8
|