Skip to main content

CTP client v6.3.15

Project description

CTP期货 API Python Wrapper

Build Status Build status Codacy Badge pypi status pyversion implementation Downloads

CTP Website

Version: v6.3.15

Platform: Linux 64bit, Windows 64bit

Python Requirement: x86-64

Also supports PyPy3-v5.10.1-3.5.3 on Linux 64bit

Inspired by lovelylain

Install

Before you install the ctpwrapper package, make sure you have already installed the cython package.

>>>pip install cython --upgrade
>>>pip install ctpwrapper --upgrade

Donate [捐助]

Usage

MdApi Class Wrapper

import sys

from ctpwrapper import ApiStructure
from ctpwrapper import MdApiPy


class Md(MdApiPy):
    """Market-data API client that sends a login request once the front connects.

    All callbacks simply print what they receive.
    """

    def __init__(self, broker_id, investor_id, password, request_id=1):
        """Store the login credentials and the request id used for the login call.

        :param broker_id: value sent as ``BrokerID`` in the login request
        :param investor_id: value sent as ``UserID`` in the login request
        :param password: value sent as ``Password`` in the login request
        :param request_id: request id passed to ``ReqUserLogin``
        """
        self.broker_id = broker_id
        self.investor_id = investor_id
        self.password = password
        self.request_id = request_id

    def OnRspError(self, pRspInfo, nRequestID, bIsLast):
        """Forward an error response to :meth:`ErrorRspInfo` for printing."""
        self.ErrorRspInfo(pRspInfo, nRequestID)

    def ErrorRspInfo(self, info, request_id):
        """Print the error carried by ``info``, if any.

        :param info: response info exposing ``ErrorID`` and a GBK-encoded ``ErrorMsg``
        :param request_id: id of the request the response belongs to
        :return: ``True`` when ``info.ErrorID`` is non-zero, ``False`` otherwise
        """
        has_error = info.ErrorID != 0
        if has_error:
            # print() does not interpolate "%" placeholders itself; format explicitly.
            print('request_id=%s ErrorID=%d, ErrorMsg=%s'
                  % (request_id, info.ErrorID, info.ErrorMsg.decode('gbk')))
        return has_error

    def OnFrontConnected(self):
        """Build a login request from the stored credentials and send it."""
        user_login = ApiStructure.ReqUserLoginField(BrokerID=self.broker_id,
                                                    UserID=self.investor_id,
                                                    Password=self.password)
        self.ReqUserLogin(user_login, self.request_id)

    def OnFrontDisconnected(self, nReason):
        """Print the disconnect reason and exit via ``sys.exit()``.

        :param nReason: reason code reported for the disconnect
        """
        print("Md OnFrontDisconnected %s" % (nReason,))
        sys.exit()

    def OnHeartBeatWarning(self, nTimeLapse):
        """Heartbeat timeout warning.

        Called when no message has been received for a long time.

        :param nTimeLapse: time elapsed since the last message was received
        """
        print('Md OnHeartBeatWarning, time = %s' % (nTimeLapse,))

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """User login response: print the failure, or the returned structures on success.

        :param pRspUserLogin: login response payload (printed on success)
        :param pRspInfo: response info exposing ``ErrorID`` and a GBK-encoded ``ErrorMsg``
        :param nRequestID: id of the originating request (unused here)
        :param bIsLast: last-response flag (unused here)
        :return: None
        """
        if pRspInfo.ErrorID != 0:
            print("Md OnRspUserLogin failed error_id=%s msg:%s"
                  % (pRspInfo.ErrorID, pRspInfo.ErrorMsg.decode('gbk')))
        else:
            print("Md user login successfully")
            print(pRspUserLogin)
            print(pRspInfo)


# Connection settings for the market-data example; all are empty placeholders here.
# NOTE(review): "BORDKER_ID" looks like a typo of "BROKER_ID"; the name is left
# unchanged so that any existing reference to it keeps working.
BORDKER_ID = ""
INVESTOR_ID = ""
PASSWORD = ""
SERVER = ""

# Create the API object, register the front address, then start it, in that order.
md = Md(BORDKER_ID, INVESTOR_ID, PASSWORD)
md.Create()
md.RegisterFront(SERVER)
md.Init()
# NOTE(review): the trader example calls Join() after Init(); this script does
# not. Confirm whether the process should block here to keep receiving callbacks.
day = md.GetTradingDay()

print(day)
print("api worker!")

Trader Class Wrapper

from ctpwrapper import ApiStructure
from ctpwrapper import TraderApiPy

class Trader(TraderApiPy):
    """Trader API client that logs in on connect, then queries the investor
    and confirms settlement info once the login succeeds.
    """

    def __init__(self, broker_id, investor_id, password, request_id=1):
        """Store the credentials (encoded to bytes) and the starting request id.

        :param broker_id: broker id as ``str``; stored as ``broker_id.encode()``
        :param investor_id: investor id as ``str``; stored as ``investor_id.encode()``
        :param password: password as ``str``; stored as ``password.encode()``
        :param request_id: initial request id; incremented by :meth:`inc_request_id`
        """
        self.request_id = request_id
        self.broker_id = broker_id.encode()
        self.investor_id = investor_id.encode()
        self.password = password.encode()

    def OnRspError(self, pRspInfo, nRequestID, bIsLast):
        """Forward an error response to :meth:`ErrorRspInfo` for printing."""
        self.ErrorRspInfo(pRspInfo, nRequestID)

    def ErrorRspInfo(self, info, request_id):
        """Print the error carried by ``info``, if any.

        :param info: response info exposing ``ErrorID`` and a GBK-encoded ``ErrorMsg``
        :param request_id: id of the request the response belongs to
        :return: ``True`` when ``info.ErrorID`` is non-zero, ``False`` otherwise
        """
        has_error = info.ErrorID != 0
        if has_error:
            # print() does not interpolate "%" placeholders itself; format explicitly.
            print('request_id=%s ErrorID=%d, ErrorMsg=%s'
                  % (request_id, info.ErrorID, info.ErrorMsg.decode('gbk')))
        return has_error

    def OnHeartBeatWarning(self, nTimeLapse):
        """Heartbeat timeout warning.

        Called when no message has been received for a long time.

        :param nTimeLapse: time elapsed since the last message was received
        """
        print("on OnHeartBeatWarning time: ", nTimeLapse)

    def OnFrontDisconnected(self, nReason):
        """Print the reason code reported for the disconnect."""
        print("on FrontDisConnected disconnected", nReason)

    def OnFrontConnected(self):
        """Build a login request from the stored credentials and send it."""
        req = ApiStructure.ReqUserLoginField(BrokerID=self.broker_id,
                                             UserID=self.investor_id,
                                             Password=self.password)
        self.ReqUserLogin(req, self.request_id)
        print("trader on front connection")

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """Handle the login response.

        On failure the error is printed. On success an investor query and a
        settlement-info confirmation are sent, each with a fresh request id.

        :param pRspUserLogin: login response payload (unused here)
        :param pRspInfo: response info exposing ``ErrorID`` and a GBK-encoded ``ErrorMsg``
        :param nRequestID: id of the originating request (unused here)
        :param bIsLast: last-response flag (unused here)
        """
        if pRspInfo.ErrorID != 0:
            # Message prefix says "Trader": the original text was copied from
            # the market-data example and misreported the source as "Md".
            print("Trader OnRspUserLogin failed error_id=%s msg:%s"
                  % (pRspInfo.ErrorID, pRspInfo.ErrorMsg.decode('gbk')))
        else:
            print("Trader user login successfully")

            inv = ApiStructure.QryInvestorField(BrokerID=self.broker_id, InvestorID=self.investor_id)

            self.ReqQryInvestor(inv, self.inc_request_id())
            req = ApiStructure.SettlementInfoConfirmField.from_dict({"BrokerID": self.broker_id,
                                                                     "InvestorID": self.investor_id})

            self.ReqSettlementInfoConfirm(req, self.inc_request_id())

    def OnRspSettlementInfoConfirm(self, pSettlementInfoConfirm, pRspInfo, nRequestID, bIsLast):
        """Print the settlement confirmation response and its GBK-decoded message."""
        print(pSettlementInfoConfirm, pRspInfo)
        print(pRspInfo.ErrorMsg.decode("GBK"))

    def inc_request_id(self):
        """Increment the stored request id and return the new value."""
        self.request_id += 1
        return self.request_id

    def OnRspQryInvestor(self, pInvestor, pRspInfo, nRequestID, bIsLast):
        """Print the investor query response."""
        print(pInvestor, pRspInfo)

if __name__ == "__main__":
    # Account and server settings; empty placeholders in this example.
    investor_id = ""
    broker_id = ""
    password = ""
    server = ""

    user_trader = Trader(broker_id=broker_id, investor_id=investor_id, password=password)

    # Create the API object and register the front address before starting it.
    user_trader.Create()
    user_trader.RegisterFront(server)
    # The original subscribed to the private topic twice; the second call is
    # presumably meant for the public topic, so both streams are subscribed.
    user_trader.SubscribePrivateTopic(2)  # only deliver stream content published after login
    user_trader.SubscribePublicTopic(2)  # only deliver stream content published after login

    user_trader.Init()

    print("trader started")
    print(user_trader.GetTradingDay())

    # Join() is the last call so the callbacks above can keep running.
    user_trader.Join()

Documentation

CTP's documentation can be found in the docs

Contact

If you have any questions about the ctpwrapper API, contact 365504029@qq.com

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for ctpwrapper, version 6.3.15.3
Filename, size & hash File type Python version Upload date
ctpwrapper-6.3.15.3.tar.gz (12.7 MB) View hashes Source None

Supported by

Elastic Elastic Search Pingdom Pingdom Monitoring Google Google BigQuery Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN SignalFx SignalFx Supporter DigiCert DigiCert EV certificate StatusPage StatusPage Status page