Skip to main content

SDK for interacting with the Fenix Pipeline API

Project description

Fenix Pipeline Python SDK

The Fenix Pipeline Python SDK facilitates connection to and interaction with the Fenix Pipeline API from any Python application.

Prerequisites and Notes

The following should be considered when using the SDK:

  • The Fenix Pipeline API requires an authentication token. Tokens are currently only available to select beta customers.
  • The Python SDK is written using asyncio for performance. There are not currently specific plans to add other async libraries (such as gevent) but this may be considered in future releases. If you need such support, contact Fenix to discuss.

Sample Usage

The following program demonstrates the basic use of the SDK:

import asyncio
import os

from fenix_pipeline import ConnectionClosed
from fenix_pipeline import RawDataSocket
from fenix_pipeline import SubscriptionTypes
from fenix_pipeline import Trade

async def simple_sample(event_loop):
    # read the API key from a local environment variable called `FENIX_API_KEY`
    socket = RawDataSocket(os.environ.get('FENIX_API_KEY'))
    # using a context manager
    async with await socket.connect(message_handler=print_messages) as subscriber:
        # subscribe to the `btc-usdt` stream
        await subscriber.subscribe(
            SubscriptionTypes.TRADES_BY_MARKET, 'btc-usdt')
        # just receive messages for the next 10 seconds
        await subscriber.monitor(10)
        # unsubscribe from the `btc-usdt` stream
        await subscriber.unsubscribe(
            SubscriptionTypes.TRADES_BY_MARKET, 'btc-usdt')
    # done

async def print_messages(item):
    if isinstance(item, Trade):'received: %r', item)
    else:'other message: %s', item)

event_loop = asyncio.get_event_loop()

Detailed Documentation

The SDK uses a context manager to manage connection state and an async coroutine to provide data to the user.

RawDataSocket Context Manager

The RawDataSocket class provides the context manager and manages all async activity. Create a RawDataSocket by passing an API key, and then call connect(message_handler) on the object to establish the connection:

socket = RawDataSocket(my_api_key)

# alternate, condensed version
with socket.connect(my_message_handler) as subscriber:
    # ... interact with server
    # now sleep this coroutine
    await subscriber.monitor(10)

Note: all following sections assume you are using the name subscriber as shown above.

Once you have a subscriber, you can check it's current connection state with the .connected property:

if not subscriber.connected:
    # the connection has been closed

You can let the subscriber receive events for a prescribed amount of time or indefinitely using the .monitor() method:

    # monitor for 10 seconds
    await subscriber.monitor(timeout=10)
    # monitor indefinitely
    await subscriber.monitor()
except ConnectionClosed:
    # the connection to the API was closed unexpectedly

The subscriber will continue to receive data after the call to .monitor() returns; this is simply a convenience method to sleep the context manager while data is being received.

Subscribing to Channels

Within the context manager, the subscriber object gives you methods for requesting the channels you receive.

Trade Data

You can get trade data in three basic subscription types. Each of these types is selectable from the SubscriptionTypes enum:

  • SubscriptionTypes.TRADES_BY_EXCHANGE: all trades on any market within a certain exchange
  • SubscriptionTypes.TRADES_BY_MARKET: all trades from any exchange associated with a specific market (such as BTC-USDT)
  • SubscriptionTypes.ALL_TRADES: every received trade regardless of market and regardless of exchange
from fenix_pipeline import SubscriptionType
subscription_type = SubscriptionTypes.ALL_TRADES
All Trades

To subscribe to all trades:

await subscriber.subscribe(SubscriptionTypes.ALL_TRADES)

Note: the ALL_TRADES subscription will contain a very large number of trades; ensure your code is performant and your network connection is sufficient to avoid losing data in the stream.

Trades by Market

To subscribe to all trades within a given market (in this example, BTC-USDT):

await subscriber.subscribe(SubscriptionTypes.TRADES_BY_MARKET, 'btc-usdt')
Trades by Exchange

To subscribe to all trades coming from a given exchange (in this example, Binance):

await subscriber.subscribe(SubscriptionTypes.TRADES_BY_EXCHANGE, 'binance')
Subscription Response

After sending a subscription request, a message will be sent back confirming the channel has been successfully subscribed:

await subscriber.subscribe(SubscriptionTypes.TRADES_BY_EXCHANGE, 'binance')

... will result in the following message being passed into the message_handler coroutine:

{'type': 'subscribed', 'message': 'trades/exchange/binance'}

You should then begin receiving trade data from that exchange being passed into the message_handler coroutine.

If you supply an invalid channel reference:

await subscriber.subscribe(SubscriptionTypes.TRADES_BY_MARKET, 'btc-%')

... will result in the following message being passed into the message_handler coroutine:

{'type': 'error', 'message': 'trades/market/btc-% invalid'}

Duplicate Data

Note that subscriptions may include overlapping data; for instance, if you subscribe to all trades on the BTC-USDT market and also all trades on the Binance exchange, trades that take place on Binance related to the BTC-USDT market belong to both subscriptions.

Data coming from the API may contain duplicate entities as documented in the API reference documentation. This behavior applies equally to all subscription types.

Unsubscribing from Channels

Unsubscribing from channels is exactly like subscribing but calling the unsubscribe() method of subscriber:

await subscriber.unsubscribe(SubscriptionTypes.TRADES_BY_EXCHANGE, 'binance')

... will result in the following message being passed into the message_handler coroutine:

{'type': 'unsubscribed', 'message': 'trades/exchange/binance'}

If you supply a reference to a channel you are not subscribed to:

await subscriber.unsubscribe(SubscriptionTypes.TRADES_BY_MARKET, 'btc-eudt')

... will result in the following message being passed into the message_handler coroutine:

{'type': 'error', 'message': 'trades/market/btc-eudt not subscribed'}

Data Model

Apart from subscribe, unsubscribe, and error messages described above, the messages passed into the message_handler coroutine will depend on the type of object being transferred. Currently only Trade obejcts are supported; other types will be added in the future.

Trade Data

Trade data will be received as an object of type fenix_pipeline.Trade:

>>> repr(trade)
Trade(id=binance:btc-usdt:189128621, timestamp=1570839641.248, exchange=binance, pair=btc-usdt, euid=189128621, price=8272.17, quantity=0.012112, direction=buy)
>>> trade.timestamp

Trades consist of the following fields:

  • exchange: the exchange where the trade was effected
  • market: the market the trade encompasses
  • euid: the unique ID as reported by the exchange for that trade
    • the combination of exchange, market, and euid is globally unique amongst all trades in the API
  • direction: buy or sell
  • price: a Python float representation of the price per unit of the trade
  • quantity: a Python float representation of the quantity traded
  • timestamp: a timestamp in seconds since the epoch with millisecond resolution

Trades also have a method _key() that gives a globally-unique key to this trade:

>>> trade._key()

Note: all floats will be represented by IEEE-754 double-precision values through the PI (with 53 bits of precision, or resolution to 16 significant digits); to ensure you do not lose precision, run only on 64-bit Python implementations.

Note: all timestamps can be converted to a datetime using datetime.datetime.utcfromtimestamp(t).astimezone(pytz.utc); the value above resolves to datetime.datetime(2019, 10, 12, 10, 20, 41, 248000, tzinfo=<UTC>)

Closing the Connection

Exiting the context manager automatically closes the connection and cleans up all resources.

Error Conditions

The context manager will exit and clean up any resources it is using in any of the following cases:

  • an unhandled exception in user code the context of the context manager
  • the socket is closed for any reason (by the server, the client, or the transport layer)

The socket will close and clean up resources it is using but the context manager will still remain open in the following case:

  • the message_handler coroutine raises an unhandled exception

It is the responsibility of the user to ensure any exceptions in thier message_handler coroutine are properly handled. This behavior will be simplified in a future version of the SDK.

Project details

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fenix-pipeline-sdk-0.9.3.tar.gz (7.4 kB view hashes)

Uploaded Source

Built Distribution

fenix_pipeline_sdk-0.9.3-py3-none-any.whl (7.3 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page