Skip to main content

Python CTP Risk api

Project description

py_ctp

ctp risk api for python。支持windows(x86/x64) linux(x64).

示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
__title__ = ''
__author__ = 'HaiFeng'
__mtime__ = '201/9/13'
"""
import os
import platform
import signal
import sys
import threading
import time

from ctp_risk.ctp_struct import CShfeFtdcRspRiskUserLoginField, CShfeFtdcRspInfoField, CShfeFtdcSequencialTradeField, CShfeFtdcInstrumentField, CShfeFtdcExchangeField
from ctp_risk.ctp_trade import Trade


class Test:

    def __init__(self, addr, out_path, interval_seconds, user, pwd, broker='6000'):
        dllpath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'ctp_risk', f"lib{'64' if sys.maxsize > 2**32 else '32'}")
        # dllpath = os.path.dirname(os.path.abspath(__file__))  # 所有dll放在程序目录下
        # self.q = Quote(os.path.join(dllpath, 'ctp_quote.' + ('dll' if 'Windows' in platform.system() else 'so')))
        self.t = Trade(os.path.join(dllpath, 'riskapi.' + ('dll' if 'Windows' in platform.system() else 'so')))
        self.req = 0
        self.logined = False
        self.instruments: dict = {}
        self.exc_total: dict = {}
        '''保存交易所统计数据'''
        self.frontAddr = addr
        self.broker = broker
        self.investor = user
        self.pwd = pwd
        self.interval_seconds = int(interval_seconds)
        self.out_path = out_path
        signal.signal(signal.SIGINT, self.single_exit)  # ctrl c
        # signal.signal(signal.SIGTSTP, self.single_exit)  # ctrl z

    def single_exit(self, signal, frame):
        self.t.Release()
        time.sleep(1)
        sys.exit(0)

    def OnConnected(self):
        print('connected')
        threading.Thread(target=self.t.ReqRiskUserLogin, args=(self.broker, self.investor, self.pwd)).start()

    def OnDisconnected(self, reason):
        print(reason)
        self.logined = False

    def OnRspUserLogin(self, pRspRiskUserLogin: CShfeFtdcRspRiskUserLoginField, pRspInfo: CShfeFtdcRspInfoField, nRequestID: int, bIsLast: bool):
        print(f'== login == {pRspInfo.__dict__}')

        if pRspInfo.getErrorID() == 0:
            self.logined = True
            t = threading.Thread(target=self.show)
            t.setDaemon(True)
            t.start()
            self.t.ReqSubscribeTrade()

    def OnInstrument(self, inst: CShfeFtdcInstrumentField):
        self.instruments[inst.getInstrumentID()] = inst

    def OnExchange(self, exc: CShfeFtdcExchangeField):
        self.exc_total[exc.getExchangeID()] = {'volume': 0, 'turnover': 0}

    def OnTrade(self, pSequencialTrade: CShfeFtdcSequencialTradeField):
        # print(f'== trade == {pSequencialTrade.__dict__}')
        inst: CShfeFtdcInstrumentField = self.instruments[pSequencialTrade.getInstrumentID()]
        self.exc_total[pSequencialTrade.getExchangeID()]['volume'] += pSequencialTrade.getVolume()
        self.exc_total[pSequencialTrade.getExchangeID()]['turnover'] += pSequencialTrade.getPrice() * pSequencialTrade.getVolume() * inst.getVolumeMultiple()

    def show(self):
        while self.logined:
            # print(self.exc_total)
            with open(os.path.join(self.out_path, 'total.txt'), 'w', encoding='utf-8') as f:
                line = ''
                for k, v in self.exc_total.items():
                    line += f"{k},{v['volume']},{v['turnover']}\n"
                f.write(line)
            time.sleep(self.interval_seconds)

    def Run(self):
        # CreateApi时会用到log目录,需要在程序目录下创建**而非dll下**
        self.t.CreateApi()
        spi = self.t.CreateSpi()
        self.t.RegisterSpi(spi)

        self.t.OnFrontConnected = self.OnConnected
        # self.t.OnFrontDisconnected = self.OnDisconnected
        # self.t.OnRspRiskUserLogin = self.OnRspUserLogin
        #
        # self.t.OnRtnSequencialTrade = self.OnTrade
        # '''成交信息'''
        #
        # # 以下为自动推送的响应
        # self.t.OnRtnInstrument = self.OnInstrument
        # self.t.OnRtnExchange = self.OnExchange
        # self.t.OnRtnProduct = lambda x: None
        # '''交易所'''
        # self.t.OnRtnTradingCode = lambda x: None
        # '''交易编码回报'''
        # self.t.OnRtnRiskSyncAccount = lambda x: None
        # '''帐户信息'''
        # self.t.OnRtnTimeSync = lambda x: None
        # '''服务器时间'''
        # self.t.OnRtnRiskDepthMarketData = lambda x: None
        # '''实时行情'''
        # self.t.OnRtnDepartment = lambda x: None
        # '''部门'''
        # self.t.OnRtnInvestorSumInfo = lambda x: None
        # '''投资者总数'''
        # self.t.OnRtnNoticeToken = lambda x: None
        # '''模板替换字段回报'''
        # self.t.OnRtnNoticePattern = lambda x: None
        # '''通知模板回报'''
        # self.t.OnRtnRiskParamInfo = lambda x: None
        # '''风控参数信息回报'''
        # self.t.OnRtnRiskInvestorParam = lambda x: None
        # '''投资者参数回报'''
        # self.t.OnRtnInvestorPattern = lambda x: None
        # '''投资者通知模板回报'''
        # self.t.OnRtnRiskNotifyToken = lambda x: None
        # '''风险通知模板自动替换字段回报'''
        # self.t.OnRtnClientSGDataSyncStart = lambda x: None
        # '''客户端结算组数据同步开始'''
        # self.t.OnRtnClientSGDataSyncEnd = lambda x: None
        # '''客户端结算组数据同步结束'''
        # self.t.OnRtnRiskPattern = lambda x: None
        # '''风控通知模板回报'''
        # self.t.OnRtnTradeParam = lambda x: None
        # '''交易系统参数'''
        # self.t.OnRtnProductExchangeRate = lambda x: None
        # '''产品报价汇率信息'''
        # self.t.OnRtnRiskUserFunction = lambda x: None  # print(x)
        # '''风控用户权限回报'''

        self.t.RegCB()

        self.t.RegisterFront(self.frontAddr)
        self.t.Init()
        # self.t.Join()
        time.sleep(5)


if __name__ == '__main__':
    # path = os.path.dirname(os.path.abspath(__file__))
    # shutil.copy(path + '/dll/ctp_trade.dll', path + '/py_ctp/lib/ctp_trade.dll')

    a = sys.argv[1] if len(sys.argv) > 1 else 'tcp://222.68.181.130:50001'
    o = sys.argv[2] if len(sys.argv) > 2 else './'
    s = sys.argv[3] if len(sys.argv) > 3 else 10
    user = 'your investorid'
    pwd = 'your password'
    t = Test(addr=a, out_path=o, interval_seconds=s, user=user, pwd=pwd)
    t.Run()
    print('presess ctrl+c to exit')
    while True:
        pass

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ctp_risk-0.0.8.1.tar.gz (934.5 kB view details)

Uploaded Source

Built Distribution

ctp_risk-0.0.8.1-py3-none-any.whl (936.0 kB view details)

Uploaded Python 3

File details

Details for the file ctp_risk-0.0.8.1.tar.gz.

File metadata

  • Download URL: ctp_risk-0.0.8.1.tar.gz
  • Upload date:
  • Size: 934.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/40.2.0 requests-toolbelt/0.9.1 tqdm/4.36.1 CPython/3.7.4

File hashes

Hashes for ctp_risk-0.0.8.1.tar.gz
Algorithm Hash digest
SHA256 10ccb91a93f0be371f6d83a18642b8959f1d07a7ed1803d408bd9f0728c12d38
MD5 7492941d441419d002f7deb4380338b6
BLAKE2b-256 b0b24ccb34aa5d7faf7cb08ca1f91d196b5e100ace4044b47eb227e5ae7ae60b

See more details on using hashes here.

File details

Details for the file ctp_risk-0.0.8.1-py3-none-any.whl.

File metadata

  • Download URL: ctp_risk-0.0.8.1-py3-none-any.whl
  • Upload date:
  • Size: 936.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/40.2.0 requests-toolbelt/0.9.1 tqdm/4.36.1 CPython/3.7.4

File hashes

Hashes for ctp_risk-0.0.8.1-py3-none-any.whl
Algorithm Hash digest
SHA256 b141e582b601387b3f566272a169d3ae298aa5a3cf9723be7d328e7223da39bd
MD5 98f7fbedda9b5d508675530c96386244
BLAKE2b-256 f3164c15e8fd2cd831887fb8988a4074fa0ff19b734a6bd09f5f08be71d113f2

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page