An abstract data flow framework for quantitative trading


An abstract data-flow framework for quantitative trading, which decouples data initialization, data composition and data processing.


pip install compton


from compton import (
    Orchestrator,
    Provider,
    Reducer,
    Consumer
)


We call a tuple of hashable parameters a vector. A vector identifies a certain kind of data.

from enum import Enum

class DataType(Enum):
    KLINE = 1
    ORDER_BOOK = 2

class TimeSpan(Enum):
    DAY = 1
    WEEK = 2

vector = (DataType.KLINE, TimeSpan.DAY)
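Because a vector is a plain tuple of hashable values, it can be used directly as a dictionary key. The sketch below is independent of compton and only illustrates that property; the `latest` store and the sample payloads are hypothetical.

```python
from enum import Enum


class DataType(Enum):
    KLINE = 1
    ORDER_BOOK = 2


class TimeSpan(Enum):
    DAY = 1
    WEEK = 2


# Hypothetical store: one slot of data per vector.
latest = {}

latest[(DataType.KLINE, TimeSpan.DAY)] = {'close': 100}
latest[(DataType.KLINE, TimeSpan.WEEK)] = {'close': 98}

# Tuples compare by value, so an equal tuple built elsewhere
# finds the same slot.
vector = (DataType.KLINE, TimeSpan.DAY)
daily = latest[vector]
```

Using enum members rather than bare strings keeps the parameters hashable while ruling out typos in the identifying values.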

Orchestrator(reducers, loop=None)

  • reducers List[Reducer] The reducers used to compose data.
  • loop? Optional[EventLoop] The event loop object to use. In most cases, you should NOT pass this argument, unless you know exactly what you are doing.

orchestrator.connect(provider: Provider) -> self

Connects to a data provider.

orchestrator.subscribe(consumer: Consumer) -> self

Subscribes the consumer to the orchestrator.

orchestrator.add(symbol: str) -> self

Adds a new symbol to the orchestrator and starts the data flow for that symbol.
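Since each of the three methods above returns self, calls can be chained. The sketch below uses a hypothetical stand-in class, `SketchOrchestrator`, which only records what it is given. It is not compton's Orchestrator and does not start any data flow; it only shows the chaining pattern that the `-> self` signatures allow.

```python
class SketchOrchestrator:
    """Stand-in that only records calls; not compton's Orchestrator."""

    def __init__(self, reducers):
        self.reducers = list(reducers)
        self.providers = []
        self.consumers = []
        self.symbols = []

    def connect(self, provider):
        self.providers.append(provider)
        return self  # returning self is what enables chaining

    def subscribe(self, consumer):
        self.consumers.append(consumer)
        return self

    def add(self, symbol):
        self.symbols.append(symbol)
        return self


# Placeholder strings stand in for real reducer, provider
# and consumer instances.
orchestrator = (
    SketchOrchestrator(['reducer'])
    .connect('provider')
    .subscribe('consumer')
    .add('TSLA')
    .add('AAPL')
)
```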


Provider is an abstract class which provides initial data and data updates.

A provider should be implemented to support many symbols.

We must inherit from class Provider and implement its abstract methods before use.

  • @property vector returns a Vector
  • async def init(symbol) method returns the initial data for the symbol
  • There is a protected method self.dispatch(symbol, payload) to mark the payload as updated. It should only be called inside a coroutine, otherwise a RuntimeError is raised.

class MyProvider(Provider):
    @property
    def vector(self):
        return (DataType.KLINE, TimeSpan.DAY)

    async def init(self, symbol):
        # Return the initial data for `symbol`
        return {}
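The rule that self.dispatch() may only be called inside a coroutine can be pictured with a small stand-alone sketch. The `dispatch` function and the `updates` list below are hypothetical and are not compton's implementation. The sketch relies only on the standard-library fact that `asyncio.get_running_loop()` raises RuntimeError when no event loop is running.

```python
import asyncio


def dispatch(symbol, payload, updates):
    """Hypothetical stand-in for a dispatch-like method."""
    # Raises RuntimeError when no event loop is running
    # in the current thread.
    asyncio.get_running_loop()
    updates.append((symbol, payload))


updates = []

# Outside a coroutine: the call is rejected.
try:
    dispatch('TSLA', {'close': 100}, updates)
    rejected = False
except RuntimeError:
    rejected = True


async def main():
    # Inside a coroutine: the call is accepted.
    dispatch('TSLA', {'close': 101}, updates)


asyncio.run(main())
```

In a real provider, the update would typically come from a coroutine that listens to a data source and calls self.dispatch(symbol, payload) for each new message.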


Reducer is another abstract class, which handles data composition.

The reducer.vector can be a generic vector, which partially matches other, more specific vectors.

class MyReducer(Reducer):
    @property
    def vector(self):
        # So, MyReducer supports both
        # - (DataType.KLINE, TimeSpan.DAY)
        # - and (DataType.KLINE, TimeSpan.WEEK)
        return (DataType.KLINE,)

    def merge(self, old, new):
        # `old` is `None` when `new` is the initial data
        if old is None:
            # We could clean the initial data here;
            # `clean` stands for a user-defined helper
            return clean(new)

        return {**old, **new}
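The partial match and the merge step can be sketched without compton. The sketch assumes that "partial match" means the generic vector is a prefix of the concrete one, which is consistent with the comments in MyReducer but is not confirmed to be how compton implements it. The `matches` helper is hypothetical, and plain strings replace the enum members for brevity.

```python
def matches(generic, vector):
    """True if `generic` is a prefix of `vector` (assumed semantics)."""
    return vector[:len(generic)] == generic


def merge(old, new):
    # Same shape as MyReducer.merge, minus the cleaning step.
    if old is None:
        return dict(new)
    # Keys in `new` override keys in `old`.
    return {**old, **new}


generic = ('KLINE',)
day = ('KLINE', 'DAY')
book = ('ORDER_BOOK',)

# The first call receives the initial data, later calls receive updates.
state = merge(None, {'open': 1, 'close': 2})
state = merge(state, {'close': 3})
```

Here `matches(generic, day)` holds while `matches(generic, book)` does not, and the merged state keeps `open` from the initial data while taking `close` from the update.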


A consumer can subscribe to more than one kind of data.

class MyConsumer(Consumer):
    @property
    def vectors(self):
        # Subscribe to two kinds of data
        return [
            (DataType.KLINE, TimeSpan.DAY),
            (DataType.KLINE, TimeSpan.WEEK)
        ]

    @property
    def all(self) -> bool:
        """
        `True` indicates that the consumer will only start processing
        if the data for both of the two vectors has changed

        By default, `Consumer::all` is `False`
        """
        return True

    @property
    def concurrency(self) -> int:
        """
        Concurrency limit for method `process()`

        By default, `Consumer::concurrency` is `0`, which means no limit
        """
        return 1

    def should_process(self, *payloads) -> bool:
        """
        If this method returns `False`, the data update will not be processed
        """
        return True

    # Both `kline_day` and `kline_week` are passed into method `process`,
    # in the same order as `vectors`
    async def process(self, symbol, kline_day, kline_week):
        await doSomething(symbol, kline_day, kline_week)
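The effect of `all` can be pictured with a stand-alone sketch. The `should_trigger` helper is hypothetical and not part of compton. It assumes that with `all` set to `False` a change to any single subscribed vector is enough to trigger processing, which is how the docstring above reads. Plain strings replace the enum members for brevity.

```python
def should_trigger(changed, vectors, require_all):
    """Decide whether a consumer would run, given the changed vectors.

    Hypothetical helper, not part of compton.
    """
    hits = [vector in changed for vector in vectors]
    return all(hits) if require_all else any(hits)


vectors = [('KLINE', 'DAY'), ('KLINE', 'WEEK')]

# Only the daily kline has changed.
only_day = {('KLINE', 'DAY')}

# Both the daily and the weekly kline have changed.
both = {('KLINE', 'DAY'), ('KLINE', 'WEEK')}

runs_on_any = should_trigger(only_day, vectors, require_all=False)
waits_for_both = not should_trigger(only_day, vectors, require_all=True)
runs_on_both = should_trigger(both, vectors, require_all=True)
```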




Files for compton, version 4.0.6

Filename                         Size      File type   Python version
compton-4.0.6-py3-none-any.whl   10.5 kB   Wheel       py3
compton-4.0.6.tar.gz             11.6 kB   Source      None
