Python SDK for pq-futures CTP trading system
Project description
CTPLite
Python SDK for CTPLite. 提供 gRPC 和 REST 两种客户端接口,方便开发人员在 Python 基于CTP API进行交易。
功能特性
gRPC 客户端
- 认证服务
- 登录、登出、刷新token
- 查询登录状态(查询UserInstance的详细登录状态信息)
- Token自动刷新机制(后台自动维护token有效性)
- 连接自动重连机制(支持指数退避重试)
- 行情服务
- 订阅/取消订阅行情数据(流式推送)
- CTP行情登出
- 交易服务
- 下单、撤单
- 查询持仓、资金账户、订单、成交
- 流式接收订单状态更新(支持自动重连)
- 流式接收连接状态更新(交易/行情连接状态监控)
- 查询合约信息、保证金率、手续费率
- 结算确认、查询结算信息
- 查询交易所、投资者信息
- 查询最大报单量
- CTP交易登出
REST 客户端
- 认证服务
- 登录、登出
- 查询登录状态(查询UserInstance的详细登录状态信息)
- 行情服务
- 订阅/取消订阅行情数据(支持Kafka topic)
- 交易服务
- 下单、撤单
- 查询持仓、资金账户、订单、成交
- 查询合约信息、保证金率、手续费率
- 结算确认、查询结算信息
- 查询交易所、投资者信息
- 查询最大报单量
- CTP交易登出
安装
pip install ctplite
pip install -U ctplite -i https://pypi.org/simple
配置说明
所有配置项通过 .env 文件进行管理,SDK 会自动读取 .env 文件中的配置。
配置项说明
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
CTPLITE_GRPC_HOST |
str | localhost |
gRPC服务器地址 |
CTPLITE_GRPC_PORT |
int | 50051 |
gRPC服务器端口 |
CTPLITE_REST_HOST |
str | localhost |
REST服务器地址 |
CTPLITE_REST_PORT |
int | 8080 |
REST服务器端口 |
CTP_MD_FRONT |
str | - | CTP行情前置地址(必填) |
CTP_TD_FRONT |
str | - | CTP交易前置地址(必填) |
CTP_BROKER_ID |
str | - | CTP经纪商代码(必填) |
CTP_USER_ID |
str | - | CTP用户代码(必填) |
CTP_PASSWORD |
str | - | CTP密码(必填) |
CTP_APP_ID |
str | simnow_client_test |
CTP应用标识(可选) |
CTP_AUTH_CODE |
str | 0000000000000000 |
CTP认证码(可选) |
CTP_INVESTOR_ID |
str | - | CTP投资者代码(可选) |
CTPLITE_TOKEN |
str | - | 会话token(登录后自动设置,可选) |
快速开始
配置环境变量
推荐使用 .env 文件来管理配置,避免在代码中硬编码敏感信息。
.env 文件应该放在运行 Python 脚本的目录(当前工作目录)。
推荐位置:项目根目录
my_project/
├── .env # 放在这里(推荐)
├── main.py # 运行: python main.py
.env 文件内容示例:
# gRPC 服务器配置
CTPLITE_GRPC_HOST=localhost
CTPLITE_GRPC_PORT=50051
# REST 服务器配置
CTPLITE_REST_HOST=localhost
CTPLITE_REST_PORT=8080
# CTP 前置地址(必填)
CTP_MD_FRONT=tcp://182.254.243.31:40011
CTP_TD_FRONT=tcp://182.254.243.31:40001
# CTP 认证信息
CTP_BROKER_ID=9999
CTP_USER_ID=your_user_id
CTP_PASSWORD=your_password
CTP_APP_ID=simnow_client_test
CTP_AUTH_CODE=0000000000000000
CTP_INVESTOR_ID=244753
# Token 认证(如果使用token认证,可替代密码认证)
CTPLITE_TOKEN=your_token
使用 gRPC 客户端
from ctplite import GrpcClient
# 创建客户端并连接(配置会自动从 .env 文件读取)
client = GrpcClient()
try:
# 连接到gRPC服务器(建立网络连接)
client.connect()
# 登录并获取token(用户认证)
client.login()
# 查询登录状态
status_resp = client.query_login_status()
if status_resp.error_code == 0:
print(f"实例状态: {status_resp.state}")
print(f"MD登录状态: {status_resp.md_logged_in}, MD连接状态: {status_resp.md_connected}")
print(f"TD登录状态: {status_resp.td_logged_in}, TD连接状态: {status_resp.td_connected}")
# 查询资金账户
account_resp = client.query_trading_account()
if account_resp.error_code == 0:
account = account_resp.account
print(f"账户余额: {account.balance:.2f}")
print(f"可用资金: {account.available:.2f}")
except Exception as e:
print(f"发生错误: {e}")
finally:
# 关闭连接
client.close()
使用 REST 客户端
from ctplite import RestClient
# 创建客户端(配置会自动从 .env 文件读取)
client = RestClient()
try:
# 登录
result = client.login()
print(f"登录成功: {result['success']}")
# 查询登录状态
result = client.query_login_status()
if result.get('success'):
data = result['data']
print(f"实例状态: {data.get('state')}")
print(f"MD登录状态: {data.get('md_logged_in')}, MD连接状态: {data.get('md_connected')}")
print(f"TD登录状态: {data.get('td_logged_in')}, TD连接状态: {data.get('td_connected')}")
# 查询资金账户
result = client.query_trading_account()
if result.get('success'):
account = result['data'].get('account', {})
print(f"账户余额: {account.get('balance', 0):.2f}")
print(f"可用资金: {account.get('available', 0):.2f}")
except Exception as e:
print(f"发生错误: {e}")
finally:
# 登出
client.logout()
SDK 客户端接口详情
gRPC 客户端接口
connect
函数定义
def connect(self, max_retries: int = -1, initial_retry_delay: float = 1.0, max_retry_delay: float = 60.0) -> bool
功能说明
连接到gRPC服务器,建立网络连接。支持自动重连机制,使用指数退避策略。
参数说明
- max_retries (int, optional): 最大重试次数,默认为
-1(无限重试) - initial_retry_delay (float, optional): 初始重试延迟(秒),默认为
1.0 - max_retry_delay (float, optional): 最大重试延迟(秒),默认为
60.0
返回值
bool: 连接是否成功
使用案例
from ctplite import GrpcClient
client = GrpcClient()
# 连接到服务器(无限重试,指数退避)
client.connect()
# 或指定最大重试次数
client.connect(max_retries=5, initial_retry_delay=2.0, max_retry_delay=30.0)
login
函数定义
def login(self, ctp_md_front: Optional[str] = None, ctp_td_front: Optional[str] = None) -> bool
功能说明
登录并获取token。登录成功后,token会自动保存,后续请求自动使用。同时会启动token自动刷新机制。
参数说明
- ctp_md_front (Optional[str], optional): CTP行情前置地址(格式:
tcp://IP:PORT),如果不提供则从config读取 - ctp_td_front (Optional[str], optional): CTP交易前置地址(格式:
tcp://IP:PORT),如果不提供则从config读取
返回值
bool: 是否登录成功
使用案例
from ctplite import GrpcClient
client = GrpcClient()
client.connect()
# 使用配置文件中的前置地址登录
client.login()
# 或指定前置地址
client.login(
ctp_md_front="tcp://182.254.243.31:40011",
ctp_td_front="tcp://182.254.243.31:40001"
)
logout
函数定义
def logout(self) -> bool
功能说明
登出并清除token。登出时会停止token自动刷新机制。
返回值
bool: 是否登出成功
使用案例
client.logout()
query_login_status
函数定义
def query_login_status(self) -> auth_pb2.QueryLoginStatusResponse
功能说明
查询UserInstance的详细登录状态信息,包括实例状态、MD/TD登录状态、连接状态等。
返回值
QueryLoginStatusResponse: 包含以下字段:error_code(int): 错误码,0表示成功error_message(str): 错误消息state(str): 实例状态(CREATING/ONLINE/MIGRATING/RECOVERING/RECONNECTING/OFFLINE/FAILED)md_logged_in(bool): MD登录状态md_connected(bool): MD连接状态td_logged_in(bool): TD登录状态td_connected(bool): TD连接状态md_reconnecting(bool): MD是否正在重连td_reconnecting(bool): TD是否正在重连ref_count(int): 引用该实例的有效会话数量last_access_time(int): 最后访问时间(Unix秒)node_address(str): 实例所在节点地址
使用案例
status_resp = client.query_login_status()
if status_resp.error_code == 0:
print(f"实例状态: {status_resp.state}")
print(f"MD登录状态: {status_resp.md_logged_in}, MD连接状态: {status_resp.md_connected}")
print(f"TD登录状态: {status_resp.td_logged_in}, TD连接状态: {status_resp.td_connected}")
print(f"节点地址: {status_resp.node_address}")
subscribe_market_data
函数定义
def subscribe_market_data(self, symbols: List[str], kafka_topic: str) -> market_data_pb2.SubscribeResponse
功能说明
订阅行情数据。订阅成功后,行情数据会统一推送到指定的Kafka topic,不通过gRPC流返回。
参数说明
- symbols (List[str]): 合约代码列表,例如:
['IF2603', 'IH2603'] - kafka_topic (str): Kafka topic(必填),指定数据推送的Kafka主题
返回值
SubscribeResponse: 包含以下字段:error_code(int): 错误码,0表示成功error_message(str): 错误消息subscribed_symbols(List[str]): 成功订阅的合约代码列表
使用案例
from ctplite import GrpcClient
client = GrpcClient()
client.connect()
client.login()
# 订阅行情,数据推送到Kafka
response = client.subscribe_market_data(
symbols=['IF2603', 'IH2603', 'IC2603'],
kafka_topic='mkt_ctp_ticks'
)
if response.error_code == 0:
print(f"订阅成功: {list(response.subscribed_symbols)}")
print("行情数据将推送到Kafka topic: mkt_ctp_ticks")
unsubscribe_market_data
函数定义
def unsubscribe_market_data(self, symbols: List[str]) -> market_data_pb2.UnsubscribeResponse
功能说明
取消订阅行情数据。
参数说明
- symbols (List[str]): 要取消订阅的合约代码列表
返回值
UnsubscribeResponse: 包含错误码和错误消息
使用案例
response = client.unsubscribe_market_data(['IF2603', 'IH2603'])
if response.error_code == 0:
print("取消订阅成功")
place_order
函数定义
def place_order(
self,
symbol: str,
exchange: str,
direction: trading_pb2.Direction,
offset: trading_pb2.Offset,
price: float,
volume: int,
order_type: trading_pb2.OrderType = trading_pb2.OrderType.LIMIT
) -> trading_pb2.OrderResponse
功能说明
下单。支持限价单、市价单、FAK、FOK等多种订单类型。
参数说明
- symbol (str): 合约代码,例如:
'IF2603' - exchange (str): 交易所代码,例如:
'CFFEX' - direction (trading_pb2.Direction): 买卖方向
trading_pb2.Direction.BUY: 买入trading_pb2.Direction.SELL: 卖出
- offset (trading_pb2.Offset): 开平标志
trading_pb2.Offset.OPEN: 开仓trading_pb2.Offset.CLOSE: 平仓trading_pb2.Offset.CLOSE_TODAY: 平今trading_pb2.Offset.CLOSE_YESTERDAY: 平昨
- price (float): 价格(市价单为0)
- volume (int): 数量
- order_type (trading_pb2.OrderType, optional): 订单类型,默认为
LIMITtrading_pb2.OrderType.LIMIT: 限价单trading_pb2.OrderType.MARKET: 市价单trading_pb2.OrderType.FAK: FAK(立即成交剩余撤销)trading_pb2.OrderType.FOK: FOK(全部成交否则撤销)
返回值
OrderResponse: 包含以下字段:error_code(int): 错误码,0表示成功error_message(str): 错误消息order_id(str): 订单编号
使用案例
from ctplite import GrpcClient
from ctplite.proto import trading_pb2
client = GrpcClient()
client.connect()
client.login()
# 限价单:买入开仓
response = client.place_order(
symbol='IF2603',
exchange='CFFEX',
direction=trading_pb2.Direction.BUY,
offset=trading_pb2.Offset.OPEN,
price=3500.0,
volume=1,
order_type=trading_pb2.OrderType.LIMIT
)
if response.error_code == 0:
print(f"下单成功,订单编号: {response.order_id}")
# 市价单:卖出平仓
response = client.place_order(
symbol='IF2603',
exchange='CFFEX',
direction=trading_pb2.Direction.SELL,
offset=trading_pb2.Offset.CLOSE,
price=0, # 市价单价格为0
volume=1,
order_type=trading_pb2.OrderType.MARKET
)
cancel_order
函数定义
def cancel_order(
self,
order_id: str,
symbol: str,
exchange: str
) -> trading_pb2.OrderResponse
功能说明
撤单。
参数说明
- order_id (str): 订单编号
- symbol (str): 合约代码
- exchange (str): 交易所代码
返回值
OrderResponse: 包含错误码、错误消息和订单编号
使用案例
response = client.cancel_order(
order_id='123456',
symbol='IF2603',
exchange='CFFEX'
)
if response.error_code == 0:
print("撤单成功")
query_position
函数定义
def query_position(
self,
symbol: str = "",
exchange: str = ""
) -> trading_pb2.PositionResponse
功能说明
查询持仓。可以查询所有持仓或指定合约的持仓。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有持仓
- exchange (str, optional): 交易所代码(可选)
返回值
PositionResponse: 包含以下字段:error_code(int): 错误码,0表示成功error_message(str): 错误消息positions(List[Position]): 持仓列表,每个Position包含:symbol(str): 合约代码exchange(str): 交易所代码direction(str): 持仓方向('LONG'/'SHORT')position(int): 持仓数量today_position(int): 今日持仓yesterday_position(int): 昨日持仓open_cost(float): 开仓成本position_cost(float): 持仓成本profit(float): 持仓盈亏
使用案例
# 查询所有持仓
position_resp = client.query_position()
if position_resp.error_code == 0:
print(f"持仓数量: {len(position_resp.positions)}")
for pos in position_resp.positions:
print(f"{pos.symbol}: {pos.position}手, 盈亏: {pos.profit:.2f}")
# 查询指定合约的持仓
position_resp = client.query_position(symbol='IF2603', exchange='CFFEX')
query_trading_account
函数定义
def query_trading_account(self) -> trading_pb2.TradingAccountResponse
功能说明
查询资金账户信息。
返回值
TradingAccountResponse: 包含以下字段:error_code(int): 错误码,0表示成功error_message(str): 错误消息account(TradingAccount): 账户信息,包含:balance(float): 账户余额available(float): 可用资金frozen(float): 冻结资金margin(float): 占用保证金profit(float): 持仓盈亏commission(float): 手续费
使用案例
account_resp = client.query_trading_account()
if account_resp.error_code == 0:
account = account_resp.account
print(f"账户余额: {account.balance:.2f}")
print(f"可用资金: {account.available:.2f}")
print(f"占用保证金: {account.margin:.2f}")
print(f"持仓盈亏: {account.profit:.2f}")
query_order
函数定义
def query_order(
self,
symbol: str = "",
exchange: str = "",
order_sys_id: str = ""
) -> trading_pb2.OrderQueryResponse
功能说明
查询订单。可以查询所有订单或通过系统订单号查询指定订单。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有订单
- exchange (str, optional): 交易所代码(可选)
- order_sys_id (str, optional): 系统订单号(可选,CTP API仅支持通过系统订单号查询)
返回值
OrderQueryResponse: 包含错误码、错误消息和订单列表
使用案例
# 查询所有订单
order_resp = client.query_order()
if order_resp.error_code == 0:
print(f"订单数量: {len(order_resp.orders)}")
# 通过系统订单号查询
order_resp = client.query_order(order_sys_id='123456')
query_trade
函数定义
def query_trade(
self,
symbol: str = "",
exchange: str = "",
trade_id: str = ""
) -> trading_pb2.TradeQueryResponse
功能说明
查询成交记录。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有成交
- exchange (str, optional): 交易所代码(可选)
- trade_id (str, optional): 成交编号(可选)
返回值
TradeQueryResponse: 包含错误码、错误消息和成交列表
使用案例
# 查询所有成交
trade_resp = client.query_trade()
if trade_resp.error_code == 0:
print(f"成交数量: {len(trade_resp.trades)}")
# 查询指定合约的成交
trade_resp = client.query_trade(symbol='IF2603', exchange='CFFEX')
stream_order_status
函数定义
def stream_order_status(
self,
max_retries: int = -1,
initial_retry_delay: float = 1.0,
max_retry_delay: float = 60.0
) -> Iterator[trading_pb2.OrderStatusUpdate]
功能说明
流式接收订单状态更新。支持自动重连机制,使用指数退避策略。
参数说明
- max_retries (int, optional): 最大重试次数,默认为
-1(无限重试) - initial_retry_delay (float, optional): 初始重试延迟(秒),默认为
1.0 - max_retry_delay (float, optional): 最大重试延迟(秒),默认为
60.0
返回值
Iterator[OrderStatusUpdate]: 订单状态更新消息的迭代器,每个消息包含:error_code(int): 错误码error_message(str): 错误消息order(Order): 订单信息,包含订单编号、状态、成交数量等
使用案例
# 在后台线程中监听订单状态
import threading
def listen_order_status():
try:
for update in client.stream_order_status():
if update.error_code == 0 and update.order:
order = update.order
print(f"订单状态更新: {order.order_id}, 状态: {order.status}, 成交数量: {order.filled_volume}")
except Exception as e:
print(f"监听订单状态出错: {e}")
# 启动监听线程
thread = threading.Thread(target=listen_order_status, daemon=True)
thread.start()
stream_connection_status
函数定义
def stream_connection_status(
self,
connection_type: str = "TD",
max_retries: int = -1,
initial_retry_delay: float = 1.0,
max_retry_delay: float = 60.0
) -> Iterator[common_pb2.ConnectionStatusUpdate]
功能说明
流式接收连接状态更新。用于监控交易或行情连接状态。
参数说明
- connection_type (str, optional): 连接类型,默认为
"TD""TD": 交易连接"MD": 行情连接
- max_retries (int, optional): 最大重试次数,默认为
-1(无限重试) - initial_retry_delay (float, optional): 初始重试延迟(秒),默认为
1.0 - max_retry_delay (float, optional): 最大重试延迟(秒),默认为
60.0
返回值
Iterator[ConnectionStatusUpdate]: 连接状态更新消息的迭代器
使用案例
# 监听交易连接状态
for update in client.stream_connection_status(connection_type="TD"):
print(f"交易连接状态: {update.connected}, 消息: {update.message}")
# 监听行情连接状态
for update in client.stream_connection_status(connection_type="MD"):
print(f"行情连接状态: {update.connected}, 消息: {update.message}")
query_instrument
函数定义
def query_instrument(
self,
symbol: str = "",
exchange: str = ""
) -> trading_pb2.InstrumentQueryResponse
功能说明
查询合约信息。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有合约
- exchange (str, optional): 交易所代码(可选)
返回值
InstrumentQueryResponse: 包含错误码、错误消息和合约信息列表
使用案例
# 查询指定合约信息
instrument_resp = client.query_instrument(symbol='IF2603', exchange='CFFEX')
if instrument_resp.error_code == 0:
for inst in instrument_resp.instruments:
print(f"合约代码: {inst.symbol}, 合约名称: {inst.instrument_name}")
print(f"合约乘数: {inst.volume_multiple}, 最小变动价位: {inst.price_tick}")
query_instrument_margin_rate
函数定义
def query_instrument_margin_rate(
self,
symbol: str = "",
exchange: Optional[str] = None,
hedge_flag: str = "1"
) -> trading_pb2.InstrumentMarginRateResponse
功能说明
查询合约保证金率。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有
- exchange (Optional[str], optional): 交易所代码(可选)
- hedge_flag (str, optional): 投机套保标志,默认为
"1"(投机)
返回值
InstrumentMarginRateResponse: 包含错误码、错误消息和保证金率列表
使用案例
margin_resp = client.query_instrument_margin_rate(symbol='IF2603', exchange='CFFEX')
if margin_resp.error_code == 0:
for rate in margin_resp.margin_rates:
print(f"合约: {rate.symbol}, 保证金率: {rate.long_margin_ratio}")
query_instrument_commission_rate
函数定义
def query_instrument_commission_rate(
self,
symbol: str = "",
exchange: str = ""
) -> trading_pb2.InstrumentCommissionRateResponse
功能说明
查询合约手续费率。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有
- exchange (str, optional): 交易所代码(可选)
返回值
InstrumentCommissionRateResponse: 包含错误码、错误消息和手续费率列表
使用案例
commission_resp = client.query_instrument_commission_rate(symbol='IF2603', exchange='CFFEX')
if commission_resp.error_code == 0:
for rate in commission_resp.commission_rates:
print(f"合约: {rate.symbol}, 开仓手续费: {rate.open_ratio_by_money}")
query_max_order_volume
函数定义
def query_max_order_volume(
self,
symbol: str,
exchange: str,
direction: trading_pb2.Direction,
offset: trading_pb2.Offset,
hedge_flag: str = ""
) -> trading_pb2.MaxOrderVolumeResponse
功能说明
查询最大报单量。
参数说明
- symbol (str): 合约代码(必需)
- exchange (str): 交易所代码(必需)
- direction (trading_pb2.Direction): 买卖方向
- offset (trading_pb2.Offset): 开平标志
- hedge_flag (str, optional): 投机套保标志(可选)
返回值
MaxOrderVolumeResponse: 包含错误码、错误消息和最大报单量
使用案例
max_vol_resp = client.query_max_order_volume(
symbol='IF2603',
exchange='CFFEX',
direction=trading_pb2.Direction.BUY,
offset=trading_pb2.Offset.OPEN
)
if max_vol_resp.error_code == 0:
print(f"最大报单量: {max_vol_resp.max_volume}")
REST 客户端接口
login
函数定义
def login(
self,
use_token: bool = True,
ctp_md_front: Optional[str] = None,
ctp_td_front: Optional[str] = None
) -> Dict[str, Any]
功能说明
登录并获取token。如果已有token且use_token=True,则直接返回成功(不重复登录)。
参数说明
- use_token (bool, optional): 如果为True且已有token,则直接返回成功,默认为
True - ctp_md_front (Optional[str], optional): CTP行情前置地址(格式:
tcp://IP:PORT),如果不提供则从config读取 - ctp_td_front (Optional[str], optional): CTP交易前置地址(格式:
tcp://IP:PORT),如果不提供则从config读取
返回值
Dict[str, Any]: 响应数据字典,标准格式包含:success(bool): 是否成功code(int): 错误码,0表示成功msg(str): 消息data(dict): 数据,包含token字段
使用案例
from ctplite import RestClient
client = RestClient()
# 登录
result = client.login()
if result.get('success'):
print(f"登录成功,token: {result['data'].get('token')}")
# 指定前置地址登录
result = client.login(
ctp_md_front="tcp://182.254.243.31:40011",
ctp_td_front="tcp://182.254.243.31:40001"
)
logout
函数定义
def logout(self) -> Dict[str, Any]
功能说明
登出并清除token。
返回值
Dict[str, Any]: 响应数据字典,标准格式包含success、code、msg、data
使用案例
result = client.logout()
if result.get('success'):
print("登出成功")
query_login_status
函数定义
def query_login_status(self, use_token: bool = True) -> Dict[str, Any]
功能说明
查询UserInstance的详细登录状态信息。
参数说明
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,data字段包含:instance_key(str): 实例键user_id(str): 用户IDbroker_id(str): 经纪商IDinvestor_id(str): 投资者IDstate(str): 实例状态,可能的值:creating: 正在创建(获取锁后)online: 在线(CTP已连接并登录)migrating: 正在迁移(路由表更新中)recovering: 正在恢复(目标节点重建中)reconnecting: 正在重连(CTP断线重连中)offline: 离线(实例已停止)failed: 失败(创建/恢复失败)
md_logged_in(bool): MD登录状态md_connected(bool): MD连接状态td_logged_in(bool): TD登录状态td_connected(bool): TD连接状态md_reconnecting(bool): MD是否正在重连td_reconnecting(bool): TD是否正在重连ref_count(int): 引用计数last_access_time(int): 最后访问时间node_address(str): 节点地址
使用案例
result = client.query_login_status()
if result.get('success'):
data = result['data']
print(f"实例状态: {data.get('state')}")
print(f"MD登录状态: {data.get('md_logged_in')}, MD连接状态: {data.get('md_connected')}")
print(f"TD登录状态: {data.get('td_logged_in')}, TD连接状态: {data.get('td_connected')}")
subscribe_market_data
函数定义
def subscribe_market_data(
self,
symbols: List[str],
kafka_topic: str,
use_token: bool = True
) -> Dict[str, Any]
功能说明
订阅行情数据。订阅成功后,行情数据会统一推送到指定的Kafka topic。
参数说明
- symbols (List[str]): 合约代码列表
- kafka_topic (str): Kafka topic(必填),指定数据推送的Kafka主题
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,标准格式包含success、code、msg、data
使用案例
result = client.subscribe_market_data(
symbols=['IF2603', 'IH2603'],
kafka_topic='mkt_ctp_ticks'
)
if result.get('success'):
print("订阅成功,行情数据将推送到Kafka topic: mkt_ctp_ticks")
unsubscribe_market_data
函数定义
def unsubscribe_market_data(self, symbols: List[str], use_token: bool = True) -> Dict[str, Any]
功能说明
取消订阅行情数据。
参数说明
- symbols (List[str]): 要取消订阅的合约代码列表
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,标准格式包含success、code、msg、data
使用案例
result = client.unsubscribe_market_data(['IF2603', 'IH2603'])
if result.get('success'):
print("取消订阅成功")
place_order
函数定义
def place_order(
self,
symbol: str,
exchange: str,
direction: str, # 'BUY' or 'SELL'
offset: str, # 'OPEN', 'CLOSE', 'CLOSE_TODAY', 'CLOSE_YESTERDAY'
price: float,
volume: int,
order_type: str = 'LIMIT', # 'LIMIT', 'MARKET', 'FAK', 'FOK'
use_token: bool = True
) -> Dict[str, Any]
功能说明
下单。支持限价单、市价单、FAK、FOK等多种订单类型。
参数说明
- symbol (str): 合约代码
- exchange (str): 交易所代码
- direction (str): 买卖方向,
'BUY'或'SELL' - offset (str): 开平标志,
'OPEN'、'CLOSE'、'CLOSE_TODAY'、'CLOSE_YESTERDAY' - price (float): 价格(市价单为0)
- volume (int): 数量
- order_type (str, optional): 订单类型,默认为
'LIMIT',可选值:'LIMIT'、'MARKET'、'FAK'、'FOK' - use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,data字段包含order_id(订单编号)
使用案例
# 限价单:买入开仓
result = client.place_order(
symbol='IF2603',
exchange='CFFEX',
direction='BUY',
offset='OPEN',
price=3500.0,
volume=1,
order_type='LIMIT'
)
if result.get('success'):
print(f"下单成功,订单编号: {result['data'].get('order_id')}")
# 市价单:卖出平仓
result = client.place_order(
symbol='IF2603',
exchange='CFFEX',
direction='SELL',
offset='CLOSE',
price=0, # 市价单价格为0
volume=1,
order_type='MARKET'
)
cancel_order
函数定义
def cancel_order(
self,
order_id: str,
symbol: str,
exchange: str,
use_token: bool = True
) -> Dict[str, Any]
功能说明
撤单。
参数说明
- order_id (str): 订单编号
- symbol (str): 合约代码
- exchange (str): 交易所代码
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,标准格式包含success、code、msg、data
使用案例
result = client.cancel_order(
order_id='123456',
symbol='IF2603',
exchange='CFFEX'
)
if result.get('success'):
print("撤单成功")
query_position
函数定义
def query_position(
self,
symbol: str = "",
exchange: str = "",
use_token: bool = True
) -> Dict[str, Any]
功能说明
查询持仓。可以查询所有持仓或指定合约的持仓。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有持仓
- exchange (str, optional): 交易所代码(可选)
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,data字段包含positions列表
使用案例
# 查询所有持仓
result = client.query_position()
if result.get('success'):
positions = result['data'].get('positions', [])
print(f"持仓数量: {len(positions)}")
for pos in positions:
print(f"{pos.get('symbol')}: {pos.get('position')}手")
# 查询指定合约的持仓
result = client.query_position(symbol='IF2603', exchange='CFFEX')
query_trading_account
函数定义
def query_trading_account(self, use_token: bool = True) -> Dict[str, Any]
功能说明
查询资金账户信息。
参数说明
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,data字段包含account字典,包含:balance(float): 账户余额available(float): 可用资金frozen(float): 冻结资金margin(float): 占用保证金profit(float): 持仓盈亏commission(float): 手续费
使用案例
result = client.query_trading_account()
if result.get('success'):
account = result['data'].get('account', {})
print(f"账户余额: {account.get('balance', 0):.2f}")
print(f"可用资金: {account.get('available', 0):.2f}")
print(f"占用保证金: {account.get('margin', 0):.2f}")
query_order
函数定义
def query_order(
self,
symbol: str = "",
exchange: str = "",
order_sys_id: str = "",
use_token: bool = True
) -> Dict[str, Any]
功能说明
查询订单。可以查询所有订单或通过系统订单号查询指定订单。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有订单
- exchange (str, optional): 交易所代码(可选)
- order_sys_id (str, optional): 系统订单号(可选,CTP API仅支持通过系统订单号查询)
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,data字段包含orders列表
使用案例
# 查询所有订单
result = client.query_order()
if result.get('success'):
orders = result['data'].get('orders', [])
print(f"订单数量: {len(orders)}")
# 通过系统订单号查询
result = client.query_order(order_sys_id='123456')
query_trade
函数定义
def query_trade(
self,
symbol: str = "",
exchange: str = "",
trade_id: str = "",
use_token: bool = True
) -> Dict[str, Any]
功能说明
查询成交记录。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有成交
- exchange (str, optional): 交易所代码(可选)
- trade_id (str, optional): 成交编号(可选)
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,data字段包含trades列表
使用案例
# 查询所有成交
result = client.query_trade()
if result.get('success'):
trades = result['data'].get('trades', [])
print(f"成交数量: {len(trades)}")
# 查询指定合约的成交
result = client.query_trade(symbol='IF2603', exchange='CFFEX')
query_instrument
函数定义
def query_instrument(
self,
symbol: str = "",
exchange: str = "",
use_token: bool = True
) -> Dict[str, Any]
功能说明
查询合约信息。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有合约
- exchange (str, optional): 交易所代码(可选)
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,data字段包含instruments列表
使用案例
result = client.query_instrument(symbol='IF2603', exchange='CFFEX')
if result.get('success'):
instruments = result['data'].get('instruments', [])
for inst in instruments:
print(f"合约代码: {inst.get('symbol')}, 合约名称: {inst.get('instrument_name')}")
query_instrument_margin_rate
函数定义
def query_instrument_margin_rate(
self,
symbol: str = "",
exchange: str = "",
hedge_flag: str = "",
use_token: bool = True
) -> Dict[str, Any]
功能说明
查询合约保证金率。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有
- exchange (str, optional): 交易所代码(可选)
- hedge_flag (str, optional): 投机套保标志(可选)
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,data字段包含margin_rates列表
使用案例
result = client.query_instrument_margin_rate(symbol='IF2603', exchange='CFFEX')
if result.get('success'):
rates = result['data'].get('margin_rates', [])
for rate in rates:
print(f"合约: {rate.get('symbol')}, 保证金率: {rate.get('long_margin_ratio')}")
query_instrument_commission_rate
函数定义
def query_instrument_commission_rate(
self,
symbol: str = "",
exchange: str = "",
use_token: bool = True
) -> Dict[str, Any]
功能说明
查询合约手续费率。
参数说明
- symbol (str, optional): 合约代码,空字符串表示查询所有
- exchange (str, optional): 交易所代码(可选)
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,data字段包含commission_rates列表
使用案例
result = client.query_instrument_commission_rate(symbol='IF2603', exchange='CFFEX')
if result.get('success'):
rates = result['data'].get('commission_rates', [])
for rate in rates:
print(f"合约: {rate.get('symbol')}, 开仓手续费: {rate.get('open_ratio_by_money')}")
query_max_order_volume
函数定义
def query_max_order_volume(
self,
symbol: str,
exchange: str,
direction: str, # 'BUY' or 'SELL'
offset: str, # 'OPEN', 'CLOSE', 'CLOSE_TODAY', 'CLOSE_YESTERDAY'
hedge_flag: str = "",
use_token: bool = True
) -> Dict[str, Any]
功能说明
查询最大报单量。
参数说明
- symbol (str): 合约代码(必需)
- exchange (str): 交易所代码(必需)
- direction (str): 买卖方向,
'BUY'或'SELL' - offset (str): 开平标志,
'OPEN'、'CLOSE'、'CLOSE_TODAY'、'CLOSE_YESTERDAY' - hedge_flag (str, optional): 投机套保标志(可选)
- use_token (bool, optional): 是否使用token认证(如果已登录),默认为
True
返回值
Dict[str, Any]: 响应数据字典,data字段包含max_volume(最大报单量)
使用案例
result = client.query_max_order_volume(
symbol='IF2603',
exchange='CFFEX',
direction='BUY',
offset='OPEN'
)
if result.get('success'):
print(f"最大报单量: {result['data'].get('max_volume')}")
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file ctplite-0.1.10.tar.gz.
File metadata
- Download URL: ctplite-0.1.10.tar.gz
- Upload date:
- Size: 47.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.8.20
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
247cfd4690305490511b65547069cc11de6cdb40ee9e31e406516e4e6493b784
|
|
| MD5 |
62624f1d40e9df1f3949649a096bf08c
|
|
| BLAKE2b-256 |
01b4305eadbaea2401a3b284acf4e61a585ccffd1089b9c0f88766ac3e93dbb6
|