API for betfair exchange stream

Asynclightweight Example

Goals:

  • Make betfairlightweight more modular, adding custom processors.
  • Make the network part asynchronous.
  • Make CPU-bound code run in its own process.
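
As a rough illustration of the last two goals, the sketch below keeps the event loop free by handing a CPU-bound step to an executor. The names parse_message, process_off_loop and demo are hypothetical and are not part of this package. It only shows the standard asyncio pattern, not how the library does it.

```python
import asyncio
import json
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Optional


def parse_message(raw: bytes) -> dict:
    """Stand-in for CPU-bound work: decode and parse one stream message."""
    return json.loads(raw.decode("utf-8"))


async def process_off_loop(executor: Optional[Executor], raw: bytes) -> dict:
    """Run parse_message in the given executor so the event loop stays free.

    Passing None falls back to asyncio's default thread pool.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, parse_message, raw)


async def demo() -> dict:
    # A process pool moves the work into a separate OS process.
    with ProcessPoolExecutor(max_workers=1) as pool:
        return await process_off_loop(pool, b'{"op": "mcm", "ct": "HEARTBEAT"}')
```

Calling asyncio.run(demo()) from a script's main guard would run the parse step in a worker process. The function has to be defined at module level so it can be pickled.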

DAG

The idea is that each process can send its output to one or more "child" processes:

                         _ _ filewriter_processor
                       /
raw_data_processor ----
                       \ _ _ MarketProcessor -> TradingStrategy
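
The fan-out in the diagram can be sketched with plain callables. This is a minimal single-process model under assumptions: the Node class and the lambda processors are hypothetical stand-ins, and distributing the nodes across processes is left out.

```python
from typing import Any, Callable, List, Optional


class Node:
    """One step in the graph: a processor plus the child nodes it feeds."""

    def __init__(self, processor: Callable[[Any], Any],
                 children: Optional[List["Node"]] = None):
        self.processor = processor
        self.children = children or []

    def feed(self, data: Any) -> None:
        # Process once, then hand the same result to every child.
        result = self.processor(data)
        for child in self.children:
            child.feed(result)


written: List[dict] = []   # what the file writer branch received
signals: List[str] = []    # what the strategy branch produced

file_writer = Node(written.append)
strategy = Node(lambda book: signals.append(f"seen {book['marketId']}"))
market = Node(lambda msg: {"marketId": msg["id"]}, [strategy])
raw = Node(lambda text: {"id": text.strip()}, [file_writer, market])

raw.feed("1.234\r\n")
```

Here the raw node's output reaches both the file writer and the market branch, mirroring the two arms of the diagram.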

Usage example:

Full example main.py

import asyncio
import json
import logging

import betfairlightweight
from betfairlightweight import APIClient
from betfair.StreamAPI import StreamAPI
from betfairlightweight.filters import streaming_market_filter, streaming_market_data_filter

logging.basicConfig(level=logging.DEBUG)


def p1(data):
    unique_id, part = data
    return part


def p2(data):
    return json.loads(data.decode('utf-8'))


class P3:

    def __call__(self, data):
        print("I'm also callable: ", data)


async def main():
    ...

    trading: APIClient = betfairlightweight.APIClient(username=USERNAME, password=PASSWORD, app_key=APP_KEY, locale="sweden")

    mf = streaming_market_filter(
        country_codes=[country_code],
        market_types=["MATCH_ODDS"]
    )

    mdf = streaming_market_data_filter(
        fields=['EX_BEST_OFFERS', 'EX_MARKET_DEF'],
        ladder_levels=3,
    )

    streaming_api = StreamAPI(trading, client_cert_path="/certs/client-2048.crt", client_key_path="/certs/client-2048.key")

    await streaming_api.set_pipeline([p1, p2, P3()])
    await streaming_api.subscribe_to_markets(market_data_filter=mdf, market_filter=mf)
    await streaming_api.wait_forever()


asyncio.run(main())

Case: Subscribe to other markets while running another subscription

    .... # Same as above

    country_codes = ['FR', 'DE', 'ES']

    while True:
        # Pick one country at random (needs "import random" at the top of main.py)
        country_code = random.choice(country_codes)

        mf = streaming_market_filter(
            country_codes=[country_code],
            market_types=["MATCH_ODDS"]
        )

        mdf = streaming_market_data_filter(
            fields=['EX_BEST_OFFERS', 'EX_MARKET_DEF'],
            ladder_levels=3,
        )
        await streaming_api.subscribe_to_markets(market_data_filter=mdf, market_filter=mf)
        await asyncio.sleep(30)

Multiple streams

Supply a unique name for the stream you want to start.

TODO: Remove stream. Replacing a stream does not change its name; replace is already available, just supply the same name.

await streaming_api.subscribe_to_markets(market_data_filter=mdf, market_filter=mf, name="StreamName")
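
The naming rule can be pictured with a toy registry. This is only a model of the behaviour described above, not this package's implementation. StreamRegistry and its methods are hypothetical, and the filters are plain dicts.

```python
from typing import Dict, List


class StreamRegistry:
    """Toy model: one subscription per name; reusing a name replaces it."""

    def __init__(self) -> None:
        self._streams: Dict[str, dict] = {}

    def subscribe(self, name: str, market_filter: dict) -> bool:
        """Store the filter under name. Returns True if an entry was replaced."""
        replaced = name in self._streams
        self._streams[name] = market_filter
        return replaced

    def names(self) -> List[str]:
        return sorted(self._streams)


registry = StreamRegistry()
first = registry.subscribe("French", {"countryCodes": ["FR"]})
second = registry.subscribe("German", {"countryCodes": ["DE"]})
# Same name again: the earlier "French" entry is replaced, not duplicated.
third = registry.subscribe("French", {"countryCodes": ["FR", "ES"]})
```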

Processors

A processor is any callable that takes a single parameter, data, and returns the processed data in any form (data -> data).
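
Under the assumption that a pipeline simply feeds each processor's return value to the next one, left to right, the chaining can be sketched as below. run_pipeline and Doubler are hypothetical names. How set_pipeline works internally is not shown in this README.

```python
from functools import reduce
from typing import Any, Callable, Iterable

Processor = Callable[[Any], Any]


def run_pipeline(processors: Iterable[Processor], data: Any) -> Any:
    """Pass data through each processor in order, feeding each output to the next."""
    return reduce(lambda value, processor: processor(value), processors, data)


class Doubler:
    """Objects work too, as long as they implement __call__."""

    def __call__(self, data: int) -> int:
        return data * 2


# Functions, built-in types and callable instances can be mixed freely.
result = run_pipeline([int, Doubler(), str], "21")
```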

Example:

There are currently three processors in the repository, under asynclightweight.example.market_processors.

# The first processor always receives raw data from the socket.

def raw_data_processor(data):
    data = data.decode(encoding="utf-8").split('\r\n')[0]
    return json.loads(data)

# Market processor (it should maybe have another name). This code is taken
# directly from betfairlightweight. It creates a cache that holds new orders and outputs
# each update aggregated with its respective market definition.
# Note that we implement __call__ so that the instance can handle new incoming data.

class MarketProcessor:
    ...

    def __call__(self, data):
        op, ct = data['op'], data.get('ct', 'UPDATE')

        if op == "mcm" and data.get(self._lookup) and (ct == 'SUB_IMAGE' or ct == 'UPDATE'):
            return self.on_update(data)
        elif ct == 'HEARTBEAT':
            print("MY HEART IS BEATING!")

        return []

# Prints the marketbooks from MarketProcessor
def print_processor(data):
    for market_book in data:
        print(market_book['marketId'])


# To make these processors feed into each other, add them to a list in
# pipeline order, e.g. [raw_data_processor, MarketProcessor, print_processor]
# (pass an instance of MarketProcessor, since __call__ is an instance method).
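
Note that raw_data_processor above keeps only the first CRLF-delimited message from each read. If a single read can carry more than one message (an assumption about how the socket is read, not something this README states), a variant that parses every complete message might look like this. split_messages is a hypothetical helper.

```python
import json
from typing import List


def split_messages(chunk: bytes) -> List[dict]:
    """Parse every complete CRLF-terminated JSON message in a chunk."""
    lines = chunk.decode("utf-8").split("\r\n")
    # The last element is either empty (chunk ended on CRLF) or a partial
    # message; a real reader would buffer it for the next read.
    return [json.loads(line) for line in lines[:-1] if line]


chunk = b'{"op":"mcm","ct":"HEARTBEAT"}\r\n{"op":"mcm","id":2}\r\n'
messages = split_messages(chunk)
```

A processor like this returns a list, so the next processor in the pipeline would need to accept a list of messages rather than a single dict.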

Package: betfairasync, version 0.0.1