
python-compton

An abstract data-flow framework for quantitative trading, which decouples data initialization, data composition and data processing.

Install

pip install compton

Usage

from compton import (
  Orchestrator,
  Provider,
  Reducer,
  Consumer
)

Vector

We call a tuple of hashable parameters a vector, which is used to identify a certain kind of data.

from enum import Enum

class DataType(Enum):
    KLINE = 1
    ORDER_BOOK = 2

class TimeSpan(Enum):
    DAY = 1
    WEEK = 2

vector = (DataType.KLINE, TimeSpan.DAY)
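
Because a vector is a tuple of hashables, it can serve as a dictionary key — presumably how the framework indexes data internally (an assumption for illustration, not documented behavior):

```python
from enum import Enum

class DataType(Enum):
    KLINE = 1
    ORDER_BOOK = 2

class TimeSpan(Enum):
    DAY = 1
    WEEK = 2

# Vectors are plain tuples, so they hash and compare by value
store = {}
store[(DataType.KLINE, TimeSpan.DAY)] = {'close': 100.0}

print(store[(DataType.KLINE, TimeSpan.DAY)])  # {'close': 100.0}
```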

Orchestrator(reducers, loop=None)

  • reducers List[Reducer] The reducers that compose data
  • loop? Optional[EventLoop] The event loop to use. In most cases, you should NOT pass this argument unless you know exactly what you are doing.
Orchestrator(
    reducers
).connect(
    provider
).subscribe(
    consumer
).add('TSLA')

orchestrator.connect(provider: Provider) -> self

Connects the orchestrator to a data provider.

orchestrator.subscribe(consumer: Consumer) -> self

Subscribes the consumer to the orchestrator.

orchestrator.add(symbol: str) -> self

Adds a new symbol to the orchestrator and starts the data flow for that symbol.

Provider

Provider is an abstract class which provides initial data and data updates.

A provider should be implemented to support many symbols.

We must subclass Provider and implement its abstract methods before use.

  • @property vector returns a Vector
  • async def init(symbol) returns the initial data for the symbol
  • There is a protected method self.dispatch(symbol, payload) to dispatch a payload update, which should only be called in a coroutine, otherwise a RuntimeError is raised.
class MyProvider(Provider):
    @property
    def vector(self) -> Vector:
        return (DataType.KLINE, TimeSpan.DAY)

    # @abstractmethod
    async def init(self, symbol):
        return {}

    # Optional
    def when_update(self, dispatch: Dispatcher):
        # Save the `dispatch` method for further data updates;
        # by default, it is stored as `self.dispatch`
        self.dispatch = dispatch
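
The dispatch pattern can be sketched with a stand-in callable — the real `dispatch` is supplied by compton's Orchestrator, so the names below are illustrative only:

```python
import asyncio

# Stand-in for the `dispatch` callable the framework passes to
# `when_update`; it collects updates instead of feeding reducers
updates = []

def dispatch(symbol, payload):
    updates.append((symbol, payload))

async def poll_updates():
    # A provider would typically run a loop like this inside the
    # event loop, calling `self.dispatch` whenever new data arrives
    for price in (100.0, 101.5):
        await asyncio.sleep(0)  # simulate waiting for new data
        dispatch('TSLA', {'close': price})

asyncio.run(poll_updates())
print(updates)  # [('TSLA', {'close': 100.0}), ('TSLA', {'close': 101.5})]
```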

Reducer

Another abstract class which handles data composition.

The reducer.vector can be a generic vector, which partially matches other vectors.

class MyReducer(Reducer):
    @property
    def vector(self):
        # So, MyReducer supports both
        # - (DataType.KLINE, TimeSpan.DAY)
        # - and (DataType.KLINE, TimeSpan.WEEK)
        return (DataType.KLINE,)

    def merge(self, previous, payload):
        # `previous` might be `None`, if `payload` is the initial data
        if previous is None:
            # We could clean the initial data
            return clean(payload)

        return {**previous, **payload}
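
The partial match above can be read as a prefix match, and `merge` as a dict update — a minimal sketch of the semantics, not compton's actual code:

```python
def partial_match(generic, concrete):
    # A generic vector matches any vector it is a prefix of
    return concrete[:len(generic)] == generic

# Plain values stand in for the enum members here
KLINE, DAY, WEEK = 1, 2, 3

# A reducer declaring (KLINE,) would match both day and week klines
print(partial_match((KLINE,), (KLINE, DAY)))       # True
print(partial_match((KLINE,), (KLINE, WEEK)))      # True
print(partial_match((KLINE, DAY), (KLINE, WEEK)))  # False

def merge(previous, payload):
    # Mirrors MyReducer.merge: the initial payload passes through,
    # later payloads are shallow-merged over the previous state
    if previous is None:
        return payload
    return {**previous, **payload}

print(merge({'open': 1}, {'close': 2}))  # {'open': 1, 'close': 2}
```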

Consumer

A consumer can subscribe to more than one kind of data type.

class MyConsumer(Consumer):
    @property
    def vectors(self):
        # Subscribe to two kinds of data types
        return [
            (DataType.KLINE, TimeSpan.DAY),
            (DataType.KLINE, TimeSpan.WEEK)
        ]

    @property
    def all(self) -> bool:
        """
        `True` indicates that the consumer will only process
        when the data for both vectors has changed

        By default, `Consumer::all` is `False`
        """
        return True

    @property
    def concurrency(self) -> int:
        """
        Concurrency limit for method `process()`

        By default, `Consumer::concurrency` is `0` which means no limit
        """
        return 1

    def should_process(self, *payloads) -> bool:
        """
        If this method returns `False`, then the data update will not be processed
        """
        return True

    # Then there will be
    # both `kline_day` and `kline_week` passed into method `process`
    async def process(self, symbol, kline_day, kline_week):
        await doSomething(symbol, kline_day, kline_week)
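
A concurrency limit like the one above is commonly enforced with a semaphore. Here is a hedged sketch of the idea (not compton's internals), mirroring `concurrency = 1`:

```python
import asyncio

async def main():
    # Allow at most one `process()` call at a time
    semaphore = asyncio.Semaphore(1)
    running, peak = 0, 0

    async def process(symbol):
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0)  # simulate work
            running -= 1

    # Fire five concurrent updates; the semaphore serializes them
    await asyncio.gather(*(process('TSLA') for _ in range(5)))
    return peak

peak = asyncio.run(main())
print(peak)  # 1: calls never overlapped
```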

License

MIT
