An abstract data flow framework for quantitative trading
python-compton
An abstract data-flow framework for quantitative trading, which decouples data initialization, data composition and data processing.
Install
pip install compton
Usage
from compton import (
    Orchestrator,
    Provider,
    Reducer,
    Consumer
)
Vector
A vector is a tuple of hashable parameters that identifies a certain kind of data.
from enum import Enum

class DataType(Enum):
    KLINE = 1
    ORDER_BOOK = 2

class TimeSpan(Enum):
    DAY = 1
    WEEK = 2

vector = (DataType.KLINE, TimeSpan.DAY)
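Hashability is what makes a vector usable as an identifier. The sketch below illustrates this with plain Python only: the `store` dict is a hypothetical per-vector container invented for the example and is not part of compton.

```python
from enum import Enum


class DataType(Enum):
    KLINE = 1
    ORDER_BOOK = 2


class TimeSpan(Enum):
    DAY = 1
    WEEK = 2


daily = (DataType.KLINE, TimeSpan.DAY)
weekly = (DataType.KLINE, TimeSpan.WEEK)

# Every element is hashable, so the tuple is hashable too and can key a dict.
# `store` is a hypothetical container for this example, not a compton object.
store = {daily: [], weekly: []}
store[daily].append({"close": 100.0})

# An equal tuple built elsewhere identifies the same kind of data.
same_as_daily = (DataType.KLINE, TimeSpan.DAY)
latest = store[same_as_daily][-1]
```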
Orchestrator(reducers, loop=None)
- reducers (List[Reducer]): the reducers used to compose data
- loop (EventLoop, defaults to None): the event loop object to use
The following code shows how to use compton in a non-coroutine environment
import asyncio

loop = asyncio.new_event_loop()

# `reducers` is a list of Reducer instances
orchestrator = Orchestrator(
    reducers,
    loop
)

orchestrator.add('US.TSLA')

loop.run_forever()
orchestrator.connect(provider: Provider) -> self
Connects the orchestrator to a data provider.
orchestrator.subscribe(consumer: Consumer) -> self
Subscribes the consumer to the orchestrator.
orchestrator.add(symbol: str) -> self
Adds a new symbol to the orchestrator and starts the data flow for that symbol.
Provider
Provider is an abstract class which provides initial data and data updates.
A provider should be implemented to support many symbols.
We must inherit from class Provider and implement its abstract methods before use.
- @property vector: returns a Vector
- async def init(symbol): returns the initial data
- def when_update(dispatch): registers the dispatcher, which is a callable
class MyProvider(Provider):
    @property
    def vector(self):
        return (DataType.KLINE, TimeSpan.DAY)

    def when_update(self, dispatch):
        pass

    async def init(self, symbol):
        return {}
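In practice `when_update` would usually keep the dispatcher so that updates can be pushed later. The following is a minimal sketch of that callback-registration pattern, not compton's implementation. `SketchProvider` is a stand-in that does not inherit from `Provider` so it runs without the package, `on_feed_message` is a hypothetical hook, and the `(symbol, payload)` arguments passed to `dispatch` are an assumption because the dispatcher's signature is not stated here.

```python
import asyncio


class SketchProvider:
    """Stand-in mirroring the three members listed above (not compton's Provider)."""

    def __init__(self):
        self._dispatch = None

    @property
    def vector(self):
        # Plain strings keep the sketch short; enum members work the same way
        return ("kline", "day")

    def when_update(self, dispatch):
        # Keep the callable so that later updates can be pushed through it
        self._dispatch = dispatch

    async def init(self, symbol):
        # Placeholder initial data; a real provider would fetch it
        return {"symbol": symbol, "close": 100.0}

    def on_feed_message(self, symbol, payload):
        # Hypothetical hook that a data feed would call.
        # ASSUMPTION: the dispatcher accepts (symbol, payload).
        if self._dispatch is not None:
            self._dispatch(symbol, payload)


received = []
provider = SketchProvider()
provider.when_update(lambda symbol, payload: received.append((symbol, payload)))

initial = asyncio.run(provider.init("US.TSLA"))
provider.on_feed_message("US.TSLA", {"close": 101.0})
```

Storing the callable on the instance lets a single provider serve many symbols, since the symbol travels with each update rather than with the provider.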
Reducer
Reducer is another abstract class, which handles data composition.
The reducer.vector can be a generic vector which partially matches other vectors
class MyReducer(Reducer):
    @property
    def vector(self):
        # So, MyReducer supports both
        # - (DataType.KLINE, TimeSpan.DAY)
        # - and (DataType.KLINE, TimeSpan.WEEK)
        return (DataType.KLINE,)

    def merge(self, old, new):
        # `old` might be `None`, if `new` is the initial data
        if old is None:
            # We could clean the initial data
            # (`clean` is a user-defined function, not shown here)
            return clean(new)

        return {**old, **new}
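One way to read the partial match above is as a prefix test: a generic vector matches any vector that starts with the same elements. The sketch below assumes that interpretation. `matches` is a hypothetical helper written for illustration, not a compton function, and the library's actual matching rule may differ.

```python
from enum import Enum
from typing import Hashable, Tuple

Vector = Tuple[Hashable, ...]


class DataType(Enum):
    KLINE = 1
    ORDER_BOOK = 2


class TimeSpan(Enum):
    DAY = 1
    WEEK = 2


def matches(generic: Vector, target: Vector) -> bool:
    """Hypothetical helper: True if `generic` is a prefix of `target`."""
    # Slicing never raises, so a `generic` longer than `target` simply fails
    return target[:len(generic)] == generic


kline_any = (DataType.KLINE,)

day_ok = matches(kline_any, (DataType.KLINE, TimeSpan.DAY))
week_ok = matches(kline_any, (DataType.KLINE, TimeSpan.WEEK))
book_ok = matches(kline_any, (DataType.ORDER_BOOK, TimeSpan.DAY))
```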
Consumer
A consumer can subscribe to more than one kind of data
class MyConsumer(Consumer):
    @property
    def vectors(self):
        # Subscribe to two kinds of data
        return [
            (DataType.KLINE, TimeSpan.DAY),
            (DataType.KLINE, TimeSpan.WEEK)
        ]

    @property
    def all(self):
        # `True` indicates that the consumer only starts processing
        # when the data for both vectors has changed.
        # By default, `Consumer::all` is False
        return True

    @property
    def concurrency(self):
        # Concurrency limit for method `process()`
        # By default, `Consumer::concurrency` is `0` which means no limit
        return 1

    # Both `kline_day` and `kline_week` are then passed
    # into method `process`
    async def process(self, symbol, kline_day, kline_week):
        await doSomething(symbol, kline_day, kline_week)
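The effect of `all` can be pictured as a gate that tracks which vectors have changed for each symbol. The sketch below is an illustration under assumptions, not compton's implementation: `ChangeGate` and its `mark` method are hypothetical names, and clearing the recorded changes once processing starts is a guess about behavior that is not described here.

```python
from typing import Dict, Hashable, List, Set, Tuple

Vector = Tuple[Hashable, ...]


class ChangeGate:
    """Hypothetical helper that tracks changed vectors per symbol."""

    def __init__(self, vectors: List[Vector], require_all: bool):
        self._vectors = set(vectors)
        self._require_all = require_all
        self._changed: Dict[str, Set[Vector]] = {}

    def mark(self, symbol: str, vector: Vector) -> bool:
        """Record a change and return True if processing should start now."""
        if vector not in self._vectors:
            return False

        changed = self._changed.setdefault(symbol, set())
        changed.add(vector)

        # With require_all, wait until every subscribed vector has changed
        if self._require_all and changed != self._vectors:
            return False

        # ASSUMPTION: recorded changes are reset once processing starts
        changed.clear()
        return True


DAY = ("kline", "day")
WEEK = ("kline", "week")

strict = ChangeGate([DAY, WEEK], require_all=True)
first = strict.mark("US.TSLA", DAY)    # only one vector has changed so far
second = strict.mark("US.TSLA", WEEK)  # both have changed now
third = strict.mark("US.TSLA", DAY)    # changes were reset after `second`

relaxed = ChangeGate([DAY, WEEK], require_all=False)
any_change = relaxed.mark("US.TSLA", DAY)
```

With `require_all=False`, which corresponds to the default `all` of False, any single change is enough to start processing.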
License