An abstract data flow framework for quantitative trading
python-compton
An abstract data-flow framework for quantitative trading, which decouples data initialization, data composition and data processing.
Install
pip install compton
Usage
from compton import (
    Orchestrator,
    Provider,
    Reducer,
    Consumer
)
Vector
A vector is a tuple of hashable parameters that identifies a certain kind of data.
from enum import Enum

class DataType(Enum):
    KLINE = 1
    ORDER_BOOK = 2

class TimeSpan(Enum):
    DAY = 1
    WEEK = 2

vector = (DataType.KLINE, TimeSpan.DAY)
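Because a vector is hashable, it can serve directly as a dictionary key. The following is a minimal sketch of that property only; the `latest` dictionary is a hypothetical store made up for illustration and is not part of compton.

```python
from enum import Enum


class DataType(Enum):
    KLINE = 1
    ORDER_BOOK = 2


class TimeSpan(Enum):
    DAY = 1
    WEEK = 2


# Enum members are hashable, so a tuple of them is hashable too
daily_kline = (DataType.KLINE, TimeSpan.DAY)
weekly_kline = (DataType.KLINE, TimeSpan.WEEK)

# Hypothetical store (not part of compton): latest payload per vector
latest = {
    daily_kline: {'close': 100.0},
    weekly_kline: {'close': 98.5},
}

# An equal tuple built elsewhere identifies the same kind of data
same_vector = (DataType.KLINE, TimeSpan.DAY)
daily_payload = latest[same_vector]
```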
Orchestrator(reducers, loop=None)
- reducers List[Reducer]: reducers to compose data
- loop? Optional[EventLoop]: the event loop object to use. In most cases, you should NOT pass this argument unless you know exactly what you are doing.
Orchestrator(
    reducers
).connect(
    provider
).subscribe(
    consumer
).add('TSLA')
orchestrator.connect(provider: Provider) -> self
Connects the orchestrator to a data provider.
orchestrator.subscribe(consumer: Consumer) -> self
Subscribes the consumer to orchestrator.
orchestrator.add(symbol: str) -> self
Adds a new symbol to the orchestrator and starts the data flow for that symbol.
Provider
Provider is an abstract class which provides initial data and data updates.
A provider should be implemented to support many symbols.
We must inherit from class Provider and implement its abstract members before use.
- @property vector returns a Vector
- async def init(symbol) method returns the initial data
- There is a protected method self.dispatch(symbol, payload) to mark the payload as updated. It should only be called in a coroutine, otherwise a RuntimeError is raised.
class MyProvider(Provider):
    @property
    def vector(self):
        return (DataType.KLINE, TimeSpan.DAY)

    async def init(self, symbol):
        return {}
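To show how init and dispatch might fit together, here is a rough sketch that does not import compton. `ProviderStandIn` is a hypothetical stand-in that only records dispatched payloads, and `push_update` is an invented name for whatever coroutine receives live updates. The check inside `dispatch` uses `asyncio.get_running_loop()`, which is one plausible way to enforce the coroutine-only rule; whether compton does it this way is an assumption.

```python
import asyncio


class ProviderStandIn:
    """Hypothetical stand-in for compton's Provider (assumption)."""

    def __init__(self):
        self.dispatched = []

    def dispatch(self, symbol, payload):
        # Raises RuntimeError when no event loop is running,
        # mimicking the documented coroutine-only rule
        asyncio.get_running_loop()
        self.dispatched.append((symbol, payload))


class PollingProvider(ProviderStandIn):
    @property
    def vector(self):
        return ('kline', 'day')

    async def init(self, symbol):
        # Initial snapshot for the symbol
        return {'close': 100.0}

    async def push_update(self, symbol, payload):
        # Hypothetical hook: called whenever fresh data arrives
        self.dispatch(symbol, payload)


async def main():
    provider = PollingProvider()
    initial = await provider.init('TSLA')
    await provider.push_update('TSLA', {'close': 101.0})
    return initial, provider.dispatched


initial, dispatched = asyncio.run(main())
```

Calling `dispatch` from plain synchronous code, with no event loop running, raises RuntimeError in this sketch.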
Reducer
Another abstract class which handles data composition.
reducer.vector can be a generic vector, which applies a partial match against other vectors.
class MyReducer(Reducer):
    @property
    def vector(self):
        # So, MyReducer supports both
        # - (DataType.KLINE, TimeSpan.DAY)
        # - and (DataType.KLINE, TimeSpan.WEEK)
        return (DataType.KLINE,)

    def merge(self, old, new):
        # `old` might be `None`, if `new` is the initial data
        if old is None:
            # We could clean the initial data
            # (`clean` is a user-defined helper, not shown here)
            return clean(new)

        return {**old, **new}
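The two ideas in the reducer above, partial vector matching and merging, can be tried in isolation. The sketch below assumes that a generic vector matches any vector it is a prefix of, which is consistent with the comments in MyReducer but is not confirmed as compton's exact rule. `matches` and the standalone `merge` are hypothetical helpers, and string tuples stand in for the Enum-based vectors.

```python
def matches(generic, vector):
    """Assumed rule: `generic` matches `vector` when it is a prefix of it."""
    return vector[:len(generic)] == generic


def merge(old, new):
    """Same logic as MyReducer.merge, without the cleaning step."""
    if old is None:
        # Initial data: nothing to merge into yet
        return new
    # Keys in `new` override keys in `old`
    return {**old, **new}


generic = ('kline',)

day_match = matches(generic, ('kline', 'day'))
week_match = matches(generic, ('kline', 'week'))
book_match = matches(generic, ('order_book', 'day'))

first = merge(None, {'open': 1, 'close': 2})
second = merge(first, {'close': 3})
```

With these definitions, the generic vector matches both kline vectors but not the order book one, and the second merge keeps `open` while overriding `close`.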
Consumer
A consumer can subscribe to more than one kind of data.
class MyConsumer(Consumer):
    @property
    def vectors(self):
        # Subscribe to two kinds of data
        return [
            (DataType.KLINE, TimeSpan.DAY),
            (DataType.KLINE, TimeSpan.WEEK)
        ]

    @property
    def all(self) -> bool:
        """
        `True` indicates that the consumer will only start processing
        once the data for both vectors has changed.

        By default, `Consumer::all` is `False`
        """
        return True

    @property
    def concurrency(self) -> int:
        """
        Concurrency limit for method `process()`

        By default, `Consumer::concurrency` is `0`, which means no limit
        """
        return 1

    def should_process(self, *payloads) -> bool:
        """
        If this method returns `False`, the data update will not be processed
        """
        return True

    # Both `kline_day` and `kline_week` are passed into method `process`,
    # in the same order as `vectors`
    async def process(self, symbol, kline_day, kline_week):
        # `doSomething` is a user-defined coroutine, not shown here
        await doSomething(symbol, kline_day, kline_week)
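The effect of the all flag can be illustrated without compton. The following sketch assumes that with all set to True a consumer waits until every subscribed vector has changed, and that with False any single change is enough. `should_run` is a hypothetical helper, not a compton function, and string tuples stand in for the Enum-based vectors.

```python
def should_run(subscribed, changed, require_all):
    """Decide whether processing may start (assumed semantics of `all`)."""
    hits = [vector in changed for vector in subscribed]
    return all(hits) if require_all else any(hits)


subscribed = [('kline', 'day'), ('kline', 'week')]

only_day_changed = {('kline', 'day')}
both_changed = {('kline', 'day'), ('kline', 'week')}

# With `all` enabled, a daily update alone is not enough
strict_partial = should_run(subscribed, only_day_changed, True)
strict_full = should_run(subscribed, both_changed, True)

# With `all` disabled (the documented default), any change triggers processing
loose_partial = should_run(subscribed, only_day_changed, False)
```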
License