The framework for algorithmic decision-makers.
The data science framework for decision-making research
Make the best decisions, consistently. Research decision-making processes with your data and deploy them with confidence.
🐍 Declare your processes in Python
Define and combine asynchronous schedules and events, and parameterize your processes with Pydantic fields.
See example
import random

from cubyc import Process, schedules

class MyProcess(Process):
    counter: int = 0

    def setup(self):
        (self.between(hours=(8, 12)) & self.every(minutes=5)).do(self.morning_function)
        self.when(lambda: not self.counter % 10).do(self.random_function)

    @schedules.every(hours=1)
    def hourly_function(self):
        print("I run every hour")
        self.counter += random.randint(1, 100)

    def morning_function(self):
        print("I run every 5 minutes between 8AM and noon")
        self.counter += 1

    def random_function(self):
        print("I run when the counter is a multiple of 10")
⏳ Backtest locally, deploy to production
Backtest your decision-making processes on historical data and deploy them to production in a single line of code.
See example
from datetime import timedelta

process = MyProcess(counter=10)

# backtests the year 2023
process.run(start="2023-1-1", end="2023-12-31")

# backtests from 2023 to today, then runs live for 10 days
process.run(start="2023-1-1", end=timedelta(days=10))

# runs live for 100 days
process.run(end=timedelta(days=100))
🚀 Integrate any data source
Access your data and utilize it across processes with modular data feeds.
See example
from datetime import timedelta

from cubyc import Process, DataFeed

class MyDataFeed(DataFeed):

    def name(self):
        return 'my_data_feed'

    def load_from_source(self, start_datetime, end_datetime):
        # query and return data from one of your data sources
        ...

class MyProcess(Process):
    datafeed: DataFeed = MyDataFeed()

    def setup(self):
        self.datafeed.last(values=10)          # returns the last 10 values
        self.datafeed.last(timedelta(days=5))  # returns the last 5 days of data
        self.datafeed.current()                # returns the current value
️🌊 Monitor and visualize your processes
Track experiments, visualize logs, and evaluate your decision-making processes with Cubyc’s Push platform.
Install Cubyc
Simple installation from PyPI
pip install cubyc
Other installation options
From source
Clone the repository, navigate to the root directory, and run
python setup.py install
FAQ
How is Cubyc different from workflow orchestration tools?
Workflow orchestration automates repetitive tasks or processes, specifically handling the movement of information or data between systems. In contrast, Cubyc serves as a decision support system (DSS) that automates decision-making to optimize or improve a specific outcome.
| | Algorithmic DSS | Workflow Orchestrators |
|---|---|---|
| Purpose | Automate decisions and improve or optimize outcomes | Automate repetitive tasks or processes |
| Use Cases | Algorithmic trading, recommendation systems, ad campaigns, pricing engines | ETL, order fulfillment, customer onboarding, DevOps automation |
| Libraries | Cubyc | Airflow, Luigi, Prefect, Dagster |
| Frequency | High-frequency, real-time decision-making | Low-frequency, batch processing |
How do backtests improve decision-making?
Evaluation metrics such as accuracy, precision, and recall offer insight into model performance but say little about decision-making effectiveness. For example, a model with 90% test accuracy does not necessarily improve revenue. To assess impact, actionable metrics like ROI, revenue, and conversion rate are crucial. Backtests offer actionable insight into the potential impact of different decisions by simulating past events. For instance, they can help you understand:
- A portfolio's Sharpe ratio over the past 10 years with monthly vs. quarterly rebalancing
- An ad campaign's annual ROI with different budget allocations
- A marketplace's 2020 revenue with alternative pricing processes
- A service's historical fraud rate with adjustments to the model's threshold
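As an entirely synthetic sketch of this idea (toy numbers and a hypothetical `backtest_revenue` helper, not real market data or Cubyc's API), the following plain-Python snippet backtests two fixed-price policies against the same simulated demand history and compares an actionable metric, revenue:

```python
import random

def backtest_revenue(price, days=365, seed=42):
    """Simulate total revenue for a fixed-price policy on synthetic demand.

    Demand falls linearly as price rises (a toy elasticity model);
    every number here is illustrative, not real data.
    """
    rng = random.Random(seed)  # fixed seed: both policies see the same history
    revenue = 0.0
    for _ in range(days):
        base_demand = rng.uniform(80, 120)            # synthetic daily demand
        units_sold = max(0.0, base_demand - 4 * price)
        revenue += units_sold * price
    return revenue

# Compare two candidate pricing decisions on the same simulated past
rev_low = backtest_revenue(price=10.0)
rev_high = backtest_revenue(price=15.0)
print(f"Price $10 -> ${rev_low:,.0f}, price $15 -> ${rev_high:,.0f}")
```

Because both policies are replayed on the same simulated history, the comparison isolates the decision itself, which is exactly what a backtest is for.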
Quickstart
Cubyc is divided into the core and data modules, and the research platform.
🔬 Core
The core module contains the classes and functions that power Cubyc's Process with scheduling, backtesting, and logging.
Learn more
Process
The cubyc.core.Process class is Cubyc's main class, providing an interface to define your schedules and events.
from cubyc.core import Process, schedules

class MyProcess(Process):
    hyperparameter: int

    def __init__(self, variable: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.variable = variable

    def setup(self):
        self.variable = self.hyperparameter

    @schedules.every(hours=1)
    def update(self):
        self.variable += 1
        self.log(f'Variable: {self.variable}')

process = MyProcess(variable=0, hyperparameter=7)
Setup
The setup() function is called before your Process is run live or backtested. Be sure to define your Process's initial state here!
from cubyc import Process

class MyProcess(Process):

    def setup(self):
        # define your initial state here
        self.state_boolean = False
        self.state_counter = 0
Versioning and hyperparameters
In addition to passing arguments to your Process's __init__ constructor, you can also define hyperparameters as attributes using Pydantic's field types.
from cubyc import Process

class MyProcess(Process):
    # define your hyperparameters as Pydantic-style typed fields here
    repeats: int
    name: str

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        for _ in range(self.repeats):
            self.log(f'Hello, {self.name}!')

my_process = MyProcess(repeats=3, name='Jens')
If you're using Cubyc's Experiment as a Service (EaaS) platform, your run hyperparameters will automatically be logged to the cloud.
Note
Be sure to add type hints to your hyperparameters.
Structured and Unstructured Logging
Logging messages or variables in Cubyc is easy! Simply call self.log() with the message or variable(s) you want to log.
- Unstructured logging: Pass a string to self.log() to log a message to the console.
- Structured logging: Pass a dictionary to self.log() to record the variable(s) for later analysis.
from cubyc import Process

class MyProcess(Process):

    def setup(self):
        self.log('Hello world!')
        self.log({'variable': 1, 'another_variable': 2})
Will log the following to console:
[07/03/23 10:30:00] INFO Hello world!
And generate the following record in the Process's .csv logfile:
| timestamp | key | value |
|---|---|---|
| 2023-07-03 10:30:00 | variable | 1 |
| 2023-07-03 10:30:00 | another_variable | 2 |
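As a sketch of how this key/value layout can be analyzed outside Cubyc (using pandas, with the two records above inlined rather than read from a real logfile), the log can be pivoted back into one column per variable:

```python
import io
import pandas as pd

# A structured logfile in the timestamp/key/value layout shown above
csv_log = io.StringIO(
    "timestamp,key,value\n"
    "2023-07-03 10:30:00,variable,1\n"
    "2023-07-03 10:30:00,another_variable,2\n"
)

logs = pd.read_csv(csv_log, parse_dates=["timestamp"])

# Pivot to one column per logged key, indexed by timestamp
wide = logs.pivot(index="timestamp", columns="key", values="value")
print(wide)
```

The wide form (one row per timestamp, one column per key) is usually the more convenient shape for plotting or joining with other data.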
You can then query the Process's structured .csv logfile with the logs() function.
run = my_process.run(...)
run.logs('variable')
The above code would return a pd.DataFrame containing all the Process's structured logs for the key variable.
Scheduling
Cubyc's core.scheduling.schedulers module provides powerful scheduling functionality for your Process.
from cubyc.core import schedules
from cubyc import Process

class MyProcess(Process):

    def setup(self):
        self.every(hours=1).do(self.say_hello_world)

    def say_hello_world(self):
        print('Hello, world!')

    @schedules.market_open(market='NYSE')
    def update(self):
        print('Updating...')
        self.after(hours=1).do(lambda: print('One hour later...'))
Scheduling Primitives
Cubyc defines six scheduling primitives that you can use to create your Process:
- on: runs a function on a specific date and time.
- every: runs a function every n years, months, days, hours, minutes, or seconds.
- between: runs a function between two dates.
- after: runs a function after a specific date.
- when: runs a function when a specific event occurs.
- once: runs a function when a specific event occurs, but only once.
Simple Schedules
The execution times of simple schedules (on, every, between, after) are deterministic and can be calculated in advance. To create a simple schedule, you can call

schedule = self.<SCHEDULE>(...)

Then, run a function on that schedule by calling the do method with the function as a parameter:

schedule.do(<FUNCTION>)
For example, to create a schedule that runs an update function every 4 hours, you can write:
self.every(hours=4).do(self.update)
Similarly, to schedule the function do_something_else 15 days after running the do_something function:

def do_something(self):
    self.after(days=15).do(self.do_something_else)
Decorated Simple Schedules
You can also create simple schedules by decorating functions with the desired simple schedule.
For example, to schedule your Process's hourly_update function to run every hour, and weekday_update to run on weekdays, we can decorate them as:

from cubyc import Process, schedules

class MyProcess(Process):

    @schedules.every(hours=1)
    def hourly_update(self):
        pass

    @schedules.between(days_of_week='mon-fri')
    def weekday_update(self):
        pass
Conditional Schedules
Sometimes, we want to run a function when a specific random event occurs. Conditional schedules (when, once) let us do this: they take a function that returns a boolean value as a parameter, and run the scheduled function when that condition returns True.

For example, to schedule update() to run when the function condition() returns True, we write the following code:
import numpy as np

from cubyc import Process

class MyProcess(Process):

    def setup(self):
        self.counter = 0
        self.when(self.condition).do(self.update)

    def condition(self):
        # returns True when a specific condition is met
        return np.random.rand() < 0.1

    def update(self):
        # update logic here
        self.counter += 1
If we instead wanted to run update only once after condition first returns True, we would schedule it with:

def setup(self):
    self.once(self.condition).do(self.update)
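The difference between the two conditional schedules can be illustrated with a framework-independent sketch (plain Python over a fixed number of ticks, not Cubyc's internals):

```python
# `when` fires every time the condition holds; `once` fires a single time.

def run_when(condition, action, ticks):
    # re-evaluate the condition on every tick and act each time it holds
    for _ in range(ticks):
        if condition():
            action()

def run_once(condition, action, ticks):
    # act the first time the condition holds, then never again
    fired = False
    for _ in range(ticks):
        if not fired and condition():
            action()
            fired = True

calls = {"when": 0, "once": 0}
always_true = lambda: True

run_when(always_true, lambda: calls.__setitem__("when", calls["when"] + 1), ticks=5)
run_once(always_true, lambda: calls.__setitem__("once", calls["once"] + 1), ticks=5)
print(calls)  # {'when': 5, 'once': 1}
```

With a condition that always holds, `when` fires on all five ticks while `once` fires only on the first.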
Compound Schedules
Schedules can be combined to create compound schedules. To do this, you can use the & and | operators to combine schedules with a logical AND or OR, respectively.
For example, if we want to run the update function every 5 minutes between 10 AM and 4 PM, we would define our setup function as:
def setup(self):
    (self.every(minutes=5) & self.between(hours=(10, 16))).do(self.update)
Alternatively, if we wanted to run the same function between 10 AM and 12 PM or between 1 PM and 4 PM:

def setup(self):
    (self.between(hours=(10, 12)) | self.between(hours=(13, 16))).do(self.update)
Now let's put it all together! Suppose we want to run the update function every 5 minutes, between 10 AM and 4 PM, and only on the 3rd of the month:

def setup(self):
    (self.every(minutes=5) & self.between(hours=(10, 16)) & self.on(day=3)).do(self.update)
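The combination logic behind compound schedules can be sketched framework-independently with predicates that overload & and | (an illustration of the semantics only, with a hypothetical Predicate class, not Cubyc's implementation):

```python
from datetime import datetime

class Predicate:
    """A boolean condition over a timestamp, combinable with & and |."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, t):
        return self.fn(t)

    def __and__(self, other):
        # true only when both conditions hold (logical AND)
        return Predicate(lambda t: self(t) and other(t))

    def __or__(self, other):
        # true when either condition holds (logical OR)
        return Predicate(lambda t: self(t) or other(t))

every_5_min = Predicate(lambda t: t.minute % 5 == 0)
between_10_16 = Predicate(lambda t: 10 <= t.hour < 16)
on_day_3 = Predicate(lambda t: t.day == 3)

combined = every_5_min & between_10_16 & on_day_3
print(combined(datetime(2023, 7, 3, 10, 5)))   # True: day 3, 10:05
print(combined(datetime(2023, 7, 4, 10, 5)))   # False: wrong day
```

A scheduler evaluating such a combined predicate at each candidate time would fire exactly when every component condition holds, which is the behavior the compound examples above describe.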
Note You can use lambda functions to create schedules with parameters.
📈 Data
The data module enables you to easily load, cache, and access data from any source.
Learn more
Data Feed
Cubyc's DataFeed class is a pluggable interface for loading data from any data source.
Creating a Data Feed
Data feeds automate loading and caching data, as well as retrieving it with the correct frequency and format. They can be created by subclassing the DataFeed class and implementing up to four methods:
- [required] name(): Returns the name of the data feed.
- [required] load_from_source(start_datetime, end_datetime): Loads data from your data source between the specified dates.
- [optional] schema(): Returns a dictionary defining the data feed's schema for each column.
- [optional] metadata(): Returns a dictionary containing the data feed's metadata for each column.
The following snippet defines a custom data feed that loads data from the Federal Reserve's FRED API:
from datetime import datetime

import numpy as np
import pandas as pd

from cubyc.data import DataFeed

class FREDDataFeed(DataFeed):

    def __init__(self, api_key):
        super().__init__()
        self.api_key = api_key

    def name(self):
        return 'FRED'

    def schema(self) -> dict:
        return {'series_id': np.dtype(str), 'value': np.dtype('float64')}

    def metadata(self) -> dict:
        return {'series_id': 'FRED series ID from https://fred.stlouisfed.org/categories',
                'value': 'Numerical value of the series.'}

    def load_from_source(self, start_datetime: datetime, end_datetime: datetime, **kwargs):
        # Load data from FRED's REST API here
        return pd.DataFrame(...)
Under the hood, Cubyc automatically checks that the data you load from your data source matches the specified schema.
You can also call <DATAFEED-OBJECT>.info() to get a summary of the data feed's schema and metadata.
fred_datafeed = FREDDataFeed('FRED-API-KEY-ID')
fred_datafeed.info()
This would return:
| | dtype | metadata |
|---|---|---|
| series_id | <U0 | FRED series ID from https://fred.stlouisfed.org/categories |
| value | float64 | Numerical value of the series. |
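The schema check described above can be sketched as a simple dtype comparison (an illustration with a hypothetical check_schema helper, not Cubyc's actual validation code):

```python
import numpy as np
import pandas as pd

def check_schema(df: pd.DataFrame, schema: dict) -> None:
    """Verify that a DataFrame's columns and dtypes match a declared schema."""
    for column, expected in schema.items():
        if column not in df.columns:
            raise ValueError(f"missing column: {column}")
        if df[column].dtype != expected:
            raise TypeError(
                f"column {column!r}: expected {expected}, got {df[column].dtype}"
            )

schema = {"value": np.dtype("float64")}

good = pd.DataFrame({"value": [1.0, 2.0]})
check_schema(good, schema)  # passes silently

bad = pd.DataFrame({"value": [1, 2]})  # int64, not float64
try:
    check_schema(bad, schema)
except TypeError as exc:
    print(exc)
```

Catching a dtype mismatch at load time, as sketched here, surfaces data-source problems before they silently corrupt a backtest.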
To add a data feed to your Process, simply add your data feed as a Pydantic field to your Process's class.
class MyProcess(Process):
    FRED_datafeed: DataFeed = FREDDataFeed('FRED-API-KEY-ID')

    def update(self):
        data = self.FRED_datafeed.last(...)
        ...
Accessing Data
DataFeeds have four important functions you need to be aware of:
- next: Returns the next data point(s) in the data feed, given the current time.
- last: Returns the last data point(s) in the data feed, given the current time.
- current: Returns the current data point(s) in the data feed, if available.
- between: Returns all data points between two dates.
For example, to retrieve the last 10 reviews from a Yelp data feed:
self.yelp_datafeed.last(values=10, columns=['reviews'], company='Chipotle')
Alternatively, if we wanted to retrieve the last 24 hours of movie reviews from an IMDb data feed:
self.imdb_datafeed.last(values=datetime.timedelta(hours=24), columns=['reviews'], movie='The Dark Knight')
To retrieve Apple's (AAPL) VWAP from a Yahoo Finance data feed between 2020-01-01 and 2023-01-11:
self.yf_datafeed.between(start_date=datetime.datetime(2020, 1, 1), end_date=datetime.datetime(2023, 1, 11), columns='vwap')
And finally, to retrieve the current price from a data feed:
self.yelp_datafeed.current(columns='price', company='Chipotle')
Note Data retrieved by data feeds is cached by default to a 'cubyc.sqlite' database in the current directory. To cache data to a different location, specify the url parameter in the data feed's constructor.
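The cache-then-serve pattern behind data feed caching can be sketched with the standard library's sqlite3 module (a toy CachedFeed with a stand-in data source, not Cubyc's implementation):

```python
import sqlite3

class CachedFeed:
    """Toy feed: look up a local SQLite cache first, hit the source on a miss."""

    def __init__(self, url=":memory:"):
        self.conn = sqlite3.connect(url)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS cache (ts TEXT PRIMARY KEY, value REAL)"
        )

    def load_from_source(self, ts):
        # Stand-in for a real query against an external data source
        return 42.0

    def get(self, ts):
        row = self.conn.execute(
            "SELECT value FROM cache WHERE ts = ?", (ts,)
        ).fetchone()
        if row is not None:
            return row[0]                     # cache hit: no source query
        value = self.load_from_source(ts)     # cache miss: go to the source
        self.conn.execute("INSERT INTO cache VALUES (?, ?)", (ts, value))
        return value

feed = CachedFeed()
feed.get("2023-01-01")  # fetched from the source, then cached
feed.get("2023-01-01")  # served from the cache
```

Caching fetched data locally like this is what makes repeated backtests over the same date range cheap: the source is queried once and subsequent runs read from disk.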
Data Streams
The built-in DataStream class enables streaming real-time data from WebSocket APIs.
Creating a Data Stream
To stream data from a WebSocket API, subclass the DataStream class and implement the run method:

from cubyc import DataStream
from polygon import WebSocketClient

class PolygonDataStream(DataStream):

    def __init__(self, polygon_api_key: str):
        super().__init__()
        self.ws = WebSocketClient(api_key=polygon_api_key)

    def run(self, func):
        self.ws.run(handle_msg=func)
Streaming
To connect to the WebSocket API with your custom data stream, simply use the listen decorator:
from cubyc import Process, schedules

class MyProcess(Process):

    @schedules.listen(PolygonDataStream('POLYGON-API-KEY-ID'))
    def update(self, msg):
        # handle msg here
        ...
Warning Data streams are only supported during live deployment.
🎨 Research
Cubyc's research control panel provides a powerful interface to visualize and monitor your processes.
Learn more
Experiment as a Service (EaaS)
Cubyc's Experiment as a Service (EaaS) platform lets you track and version experiments, visualize logs, and evaluate different hyperparameter configurations of your decision-making processes.
Runs
If you provide an API key, Cubyc automatically logs your process's runs to the cloud.
You can view all your runs under the Runs tab in the research control panel.
Logs
Structured data is automatically logged to the cloud in a tabular format.
Logs can be viewed and queried in the Logs tab in the research control panel.
License & Contact
Cubyc is distributed under the GNU Lesser General Public License. Contact us at growth@cubyc.com for any questions.
Contributing
We welcome contributions from the community! If you have any ideas, suggestions, or bug reports, please open an issue or a pull request.