An abstract data flow framework for quantitative trading
python-compton
An abstract data-flow framework for quantitative trading, which decouples data initialization, data composition and data processing.
Install
pip install compton
Usage
from compton import (
    Orchestrator,
    Provider,
    Reducer,
    Consumer
)
Vector
A vector is a tuple of hashable parameters that identifies a certain kind of data.
from enum import Enum

class DataType(Enum):
    KLINE = 1
    ORDER_BOOK = 2

class TimeSpan(Enum):
    DAY = 1
    WEEK = 2

vector = (DataType.KLINE, TimeSpan.DAY)
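Because a vector is a plain tuple of hashable values, it can be used directly as a dictionary key. The following is a minimal sketch that reuses the enums above; the `latest` registry and its string values are hypothetical and not part of compton.

```python
from enum import Enum


class DataType(Enum):
    KLINE = 1
    ORDER_BOOK = 2


class TimeSpan(Enum):
    DAY = 1
    WEEK = 2


# Hypothetical registry keyed by vector; not part of compton itself.
latest = {
    (DataType.KLINE, TimeSpan.DAY): 'daily candles',
    (DataType.KLINE, TimeSpan.WEEK): 'weekly candles',
}

# Tuples compare by value, so an equal tuple built elsewhere finds the same entry.
vector = (DataType.KLINE, TimeSpan.DAY)
found = latest[vector]
```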
Orchestrator(reducers)
- reducers: List[Reducer], the reducers used to compose data
orchestrator.connect(provider: Provider) -> self
Connects the orchestrator to a data provider.
orchestrator.subscribe(consumer: Consumer) -> self
Subscribes the consumer to the orchestrator.
orchestrator.add(symbol: str) -> self
Adds a new symbol to the orchestrator and starts the data flow for that symbol.
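Since each of the three methods returns self, calls can be chained. The sketch below only illustrates that chaining pattern with a toy stand-in class: `SketchOrchestrator`, the placeholder strings and the symbol 'AAPL' are all hypothetical, and none of this reflects how compton's Orchestrator works internally.

```python
class SketchOrchestrator:
    """Toy stand-in that only records calls; NOT compton's Orchestrator."""

    def __init__(self, reducers):
        self.reducers = list(reducers)
        self.providers = []
        self.consumers = []
        self.symbols = []

    def connect(self, provider):
        self.providers.append(provider)
        return self  # returning self is what allows chaining

    def subscribe(self, consumer):
        self.consumers.append(consumer)
        return self

    def add(self, symbol):
        self.symbols.append(symbol)
        return self


# Placeholder strings stand in for real Provider / Consumer instances.
orchestrator = (
    SketchOrchestrator(reducers=[])
    .connect('provider placeholder')
    .subscribe('consumer placeholder')
    .add('AAPL')
)
```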
Provider
Provider is an abstract class which provides initial data and data updates.
We must inherit from class Provider and implement its abstract methods before use.
- @property vector: returns a Vector
- async def init(): returns the initial data
- def when_update(dispatch): registers the dispatcher, which is a callable
class MyProvider(Provider):
    @property
    def vector(self):
        return (DataType.KLINE, TimeSpan.DAY)

    async def init(self, symbol):
        return {}

    def when_update(self, dispatch):
        pass
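The text does not say which arguments the dispatcher receives. The sketch below assumes it is called as dispatch(symbol, payload), which is a guess, and uses a standalone class instead of compton's Provider so that it runs without the library. The `push` helper and the sample payloads are hypothetical.

```python
import asyncio


class SketchProvider:
    """Standalone illustration; does not inherit from compton's Provider."""

    def __init__(self):
        self._dispatch = None

    @property
    def vector(self):
        return ('kline', 'day')  # any tuple of hashable values

    async def init(self, symbol):
        # Initial snapshot for the symbol (hard-coded here).
        return {'close': 100}

    def when_update(self, dispatch):
        # Keep the callable so later updates can be pushed through it.
        self._dispatch = dispatch

    def push(self, symbol, payload):
        # Hypothetical helper: forward an update if a dispatcher is registered.
        # The (symbol, payload) argument list is an assumption.
        if self._dispatch is not None:
            self._dispatch(symbol, payload)


received = []
provider = SketchProvider()
provider.when_update(lambda symbol, payload: received.append((symbol, payload)))

initial = asyncio.run(provider.init('AAPL'))
provider.push('AAPL', {'close': 101})
```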
Reducer
Another abstract class which handles data composition.
The reducer.vector could be a generic vector, which is matched partially against other vectors.
class MyReducer(Reducer):
    @property
    def vector(self):
        # So, MyReducer supports both
        # - (DataType.KLINE, TimeSpan.DAY)
        # - and (DataType.KLINE, TimeSpan.WEEK)
        return (DataType.KLINE,)

    def merge(self, old, new):
        return {**old, **new}
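The comments in the reducer example suggest that a generic vector matches any vector it is a prefix of. The sketch below is one reading of that partial match, not compton's actual matching code: `matches` is a hypothetical helper, and plain strings stand in for the enum members. It also shows what the dictionary merge does when keys collide.

```python
def matches(generic, specific):
    """Return True if `generic` is a prefix of `specific` (assumed rule)."""
    return specific[:len(generic)] == generic


generic = ('KLINE',)
day_match = matches(generic, ('KLINE', 'DAY'))
week_match = matches(generic, ('KLINE', 'WEEK'))
book_match = matches(generic, ('ORDER_BOOK',))

# {**old, **new} keeps keys found only in `old` and lets `new` win on conflicts.
old = {'open': 1, 'close': 2}
new = {'close': 3}
merged = {**old, **new}
```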
Consumer
A consumer can subscribe to more than one kind of data.
class MyConsumer(Consumer):
    @property
    def vectors(self):
        # Subscribe to two kinds of data types
        return [
            (DataType.KLINE, TimeSpan.DAY),
            (DataType.KLINE, TimeSpan.WEEK)
        ]

    # Then there will be
    # both `kline_day` and `kline_week` passed into method `process`
    async def process(self, symbol, kline_day, kline_week):
        # `doSomething` is a placeholder for your own coroutine
        await doSomething(symbol, kline_day, kline_week)
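The consumer example implies that payloads reach process in the same order as the vectors property lists them. The sketch below shows that mapping under this ordering assumption. The `deliver` function and the `store` dictionary are hypothetical, and the class does not inherit from compton's Consumer, so the snippet runs without the library.

```python
import asyncio


class SketchConsumer:
    """Standalone illustration; does not inherit from compton's Consumer."""

    def __init__(self):
        self.seen = []

    @property
    def vectors(self):
        return [('KLINE', 'DAY'), ('KLINE', 'WEEK')]

    async def process(self, symbol, kline_day, kline_week):
        self.seen.append((symbol, kline_day, kline_week))


async def deliver(consumer, symbol, store):
    # Look up one payload per vector, preserving the declared order,
    # then pass them as positional arguments after the symbol.
    payloads = [store[(symbol, vector)] for vector in consumer.vectors]
    await consumer.process(symbol, *payloads)


store = {
    ('AAPL', ('KLINE', 'DAY')): 'day data',
    ('AAPL', ('KLINE', 'WEEK')): 'week data',
}
consumer = SketchConsumer()
asyncio.run(deliver(consumer, 'AAPL', store))
```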
License