The framework for algorithmic decision-makers.


The data science framework for decision-making research

Make the best decisions, consistently. Research decision-making processes with your data and deploy them with confidence.

🐍 Declare your processes in Python

Define and combine asynchronous schedules and events, and parameterize your processes with Pydantic fields.

See example
import random
from cubyc import Process, schedules

class MyProcess(Process):
    counter: int = 0

    def setup(self):
        (self.between(hours=(8, 12)) & self.every(minutes=5)).do(self.morning_function)
        self.when(lambda: not self.counter % 10).do(self.random_function)

    @schedules.every(hours=1)
    def hourly_function(self):
        print("I run every hour")
        self.counter += random.randint(1, 100)

    def morning_function(self):
        print("I run every 5 minutes between 8AM and noon")
        self.counter += 1

    def random_function(self):
        print("I run when the counter is a multiple of 10")

⏳ Backtest locally, deploy to production

Backtest your decision-making processes on historical data and deploy them to production in a single line of code.

See example
from datetime import timedelta

process = MyProcess(counter=10)

# backtests the year 2023
process.run(start="2023-1-1", end="2023-12-31") 

# backtests from 2023 to today, and runs live for 10 days
process.run(start="2023-1-1", end=timedelta(days=10))

# runs live for 100 days
process.run(end=timedelta(days=100))          

🚀 Integrate any data source

Access your data and utilize it across processes with modular data feeds.

See example
from datetime import timedelta

from cubyc import Process, DataFeed


class MyDataFeed(DataFeed):

    def name(self):
        return 'my_data_feed'

    def load_from_source(self, start_datetime, end_datetime):
        # query and return data from one of your data sources
        ...


class MyProcess(Process):
    
    datafeed: DataFeed = MyDataFeed()

    def setup(self):
        self.datafeed.last(values=10)           # returns the last 10 values 
        self.datafeed.last(timedelta(days=5))   # returns the last 5 days of data
        self.datafeed.current()                 # returns the current value 

🌊 Monitor and visualize your processes

Track experiments, visualize logs, and evaluate your decision-making processes with Cubyc’s Push platform.

See example
Coming soon!

Stay in touch for updates.


Install Cubyc

Simple installation from PyPI

pip install cubyc
Other installation options

From source

Clone the repository, navigate to the root directory, and run

python setup.py install

FAQ

How is Cubyc different from workflow orchestration tools?
Workflow orchestration automates repetitive tasks or processes, specifically handling the movement of information or data between systems. Conversely, Cubyc serves as a decision support system (DSS) to automate decision-making and optimize or improve a specific outcome.
|           | Algorithmic DSS                                      | Workflow Orchestrators                                          |
|-----------|------------------------------------------------------|-----------------------------------------------------------------|
| Purpose   | Automate decisions and improve or optimize outcomes  | Automate repetitive tasks or processes                          |
| Use Cases | Algorithmic trading, recommendation systems, ad campaigns, pricing engines | ETL, order fulfillment, customer onboarding, DevOps automation |
| Libraries | Cubyc                                                | Airflow, Luigi, Prefect, Dagster                                |
| Frequency | High-frequency, real-time decision-making            | Low-frequency, batch processing                                 |
How do backtests improve decision-making?
Evaluation metrics such as accuracy, precision, and recall offer insights into model performance but lack actionable information on decision-making effectiveness. For example, a model with 90% test accuracy may not necessarily improve revenue. To assess impact, actionable metrics like ROI, revenue, and conversion rates are crucial. Backtests offer actionable insights into the potential impacts of different decisions through simulations of past events. For instance, they can help you understand:

  • A portfolio's Sharpe ratio over the past 10 years with monthly vs. quarterly rebalancing
  • An ad campaign's annual ROI with different budget allocations
  • A marketplace's 2020 revenue with alternative pricing processes
  • A service's historical fraud rate with adjustments to the model's threshold
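
As a hedged sketch of the idea (PortfolioProcess, its rebalance_months hyperparameter, and the 'sharpe' log key are hypothetical names, not part of Cubyc's API; run handles and logs() are introduced in the Quickstart below):

# hypothetical process comparing two rebalancing frequencies
monthly = PortfolioProcess(rebalance_months=1)
quarterly = PortfolioProcess(rebalance_months=3)

# backtest both configurations over the same historical window
run_m = monthly.run(start="2014-1-1", end="2023-12-31")
run_q = quarterly.run(start="2014-1-1", end="2023-12-31")

# compare the logged Sharpe ratios to pick the better policy
print(run_m.logs('sharpe').tail(1))
print(run_q.logs('sharpe').tail(1))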

Quickstart

Cubyc is divided into the core and data modules, and the research platform.

🔬 Core

The core module contains the classes and functions that power Cubyc's Process with scheduling, backtesting, and logging.

Learn more

Process

The cubyc.core.Process class is Cubyc's main class, providing an interface to define your schedules and events.

from cubyc.core import Process, schedules

class MyProcess(Process):
  
    hyperparameter: int
    
    def __init__(self, variable: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.variable = variable
    
    def setup(self):
        self.variable = self.hyperparameter
        
    @schedules.every(hours=1)
    def update(self):
        self.variable += 1
        self.log(f'Variable: {self.variable}')

process = MyProcess(variable=0, hyperparameter=7)
Setup

The setup() function gets called before your Process is run live or backtested. Be sure to define your Process's initial state here!

from cubyc import Process

class MyProcess(Process):

    def setup(self):
        # define your initial state here
        self.state_boolean = False
        self.state_counter = 0
Versioning and hyperparameters

In addition to passing arguments to your Process's __init__ constructor, you can also define attribute hyperparameters with Pydantic's field types.

from cubyc import Process

class MyProcess(Process):
  
    # declare your hyperparameters as Pydantic-style typed attributes here
    repeats: int
    name: str
  
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
      
        for _ in range(self.repeats):
            self.log(f'Hello, {self.name}!')

my_process = MyProcess(repeats=3, name='Jens')

If you're using Cubyc's Experiment as a Service (EaaS) platform, your run hyperparameters will be automatically versioned in the cloud.

Note

Be sure to add type hints to your hyperparameters.

Structured and Unstructured Logging

Logging messages or variables in Cubyc is easy! Simply call self.log() with the message or variable(s) you want to log.

  • Unstructured logging: Pass a string to self.log() to log a message to the console.
  • Structured logging: Pass a dictionary to self.log() to record the variable(s) for later analysis.
from cubyc import Process

class MyProcess(Process):
  
    def setup(self):
        self.log('Hello world!')
        self.log({'variable': 1, 'another_variable': 2})        

This will log the following to the console:

[07/03/23 10:30:00] INFO Hello world!

And generate the following records in the Process's .csv logfile:

| timestamp           | key              | value |
|---------------------|------------------|-------|
| 2023-07-03 10:30:00 | variable         | 1     |
| 2023-07-03 10:30:00 | another_variable | 2     |

You can then query the Process's structured .csv logfile with the logs() function.

run = my_process.run(...)
run.logs('variable')

The above code would return a pd.DataFrame containing all the Process's structured logs for the key variable.
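
Since logs() returns an ordinary pandas DataFrame, the usual pandas tools apply. A minimal sketch, assuming the returned frame keeps the timestamp and value columns from the logfile layout above:

df = run.logs('variable')
print(df['value'].describe())        # summary statistics of the logged values
df.plot(x='timestamp', y='value')    # quick time series plot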

Scheduling

Cubyc's core.scheduling.schedulers module provides powerful scheduling functionality for your Process.

from cubyc.core import schedules
from cubyc import Process

class MyProcess(Process):
  
    def setup(self):
        self.every(hours=1).do(self.say_hello_world)
        
    def say_hello_world(self):
        print('Hello, world!')

    @schedules.market_open(market='NYSE')
    def update(self):
        print('Updating...')
        self.after(hours=1).do(lambda: print('One hour later...'))
Scheduling Primitives

Cubyc defines six scheduling primitives that you can use to create your Process; a combined sketch follows the list:

  • on runs a function on a specific date and time.
  • every runs a function every n years, months, days, hours, minutes or seconds.
  • between runs a function between two dates or times.
  • after runs a function after a specific date.
  • when runs a function when a specific event occurs.
  • once runs a function when a specific event occurs, but only once.
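
A minimal sketch combining all six primitives inside a single setup() method (the handler methods and attributes such as queue_size are placeholder names, not part of Cubyc's API):

def setup(self):
    self.on(day=3).do(self.monthly_report)                    # on: the 3rd of every month
    self.every(hours=1).do(self.hourly_sync)                  # every: a fixed interval
    (self.every(minutes=10) & self.between(hours=(9, 17))).do(self.poll)  # between: a time window
    self.after(days=1).do(self.tomorrow_task)                 # after: a one-off delay
    self.when(lambda: self.queue_size > 100).do(self.drain)   # when: whenever the predicate is True
    self.once(lambda: self.warmed_up).do(self.go_live)        # once: only the first time it is True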
Simple Schedules

The execution times of simple schedules (on, every, between, after) are deterministic and can be calculated in advance. To create a simple schedule, you can call

schedule = self.<SCHEDULE>(...)

Then, run a function with said schedule by calling the do method with the function as a parameter:

schedule.do(<FUNCTION>)

For example, to create a schedule that runs an update function every 4 hours, you can write:

self.every(hours=4).do(self.update)

Similarly, to schedule the function do_something_else 15 days after running the do_something function:

def do_something(self):
    self.after(days=15).do(self.do_something_else)
Decorated Simple Schedules

You can also create simple schedules by decorating functions with the desired simple schedule. For example, to schedule your Process's hourly_update function to run every hour, and daily_update to run on weekdays, we can decorate them as:

from cubyc import Process, schedules

class MyProcess(Process):
    
    @schedules.every(hours=1)
    def hourly_update(self):
        pass

    @schedules.between(days_of_week='mon-fri')
    def daily_update(self):
        pass
Conditional Schedules

Sometimes, we want to run a function when a specific, unpredictable event occurs. Conditional schedules (when, once) make this possible: they take a predicate, a function that returns a boolean value, and run the scheduled function whenever the predicate returns True.

For example, to run update() whenever condition() returns True, we write the following code:

import numpy as np
from cubyc import Process

class MyProcess(Process):
    
    def setup(self):
        self.counter = 0
        self.when(self.condition).do(self.update)

    def condition(self):
        # returns True when a specific condition is met
        return np.random.rand() < 0.1

    def update(self):
        # update logic here
        self.counter += 1

If we wanted to run update only once, the first time condition returns True, we would instead schedule it with:

def setup(self):
    self.once(self.condition).do(self.update)
Compound Schedules

Schedules can be combined to create compound schedules. To do this, you can use the & and | operators to combine schedules with a logical AND or OR, respectively.

For example, if we want to run the update function every 5 minutes and between 10 AM and 4:00 PM, we would define our setup function as:

def setup(self):
    (self.every(minutes=5) & self.between(hours=(10, 16))).do(self.update)

Alternatively, if we wanted to run the same function between 10 AM and 12:00 PM or between 1:00 PM and 4:00 PM:

def setup(self):
    (self.between(hours=(10, 12)) | self.between(hours=(13, 16))).do(self.update)

Now let's put it all together! Suppose we want to run the function update every 5 minutes, between 10 AM and 4:00 PM, and only on the 3rd of the month:

def setup(self):
    (self.every(minutes=5) & self.between(hours=(10, 16)) & self.on(day=3)).do(self.update)

Note: You can use lambda functions to schedule calls that take parameters.
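
For example, to bind an argument at scheduling time (update and its threshold parameter are placeholder names):

def setup(self):
    # the lambda defers the call and fixes the parameter value
    self.every(minutes=5).do(lambda: self.update(threshold=0.5))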

📈 Data

The data module at Cubyc enables you to easily load, cache, and access data from any source.

Learn more

Data Feed

Cubyc's DataFeed class is a pluggable interface for loading data from any data source.

Creating a Data Feed

Data feeds automate loading and caching data, as well as retrieving it with the correct frequency and format. Create one by subclassing the DataFeed class and implementing at most four methods:

  • [required] name(): Returns the name of the data feed.
  • [required] load_from_source(start_datetime, end_datetime): Loads data from your data source between the specified dates.
  • [optional] schema(): Returns a dictionary defining the data feed's schema for each column.
  • [optional] metadata(): Returns a dictionary containing the data feed's metadata for each column.

The following snippet defines a custom data feed that loads data from the Federal Reserve's FRED API:

import numpy as np
from datetime import datetime
import pandas as pd
from cubyc.data import DataFeed

class FREDDataFeed(DataFeed):
  
    def __init__(self, api_key):
        super().__init__()
        self.api_key = api_key

    def name(self):
        return 'FRED'

    def schema(self) -> dict:
        return {'series_id': np.dtype(str), 'value': np.dtype('float64')}

    def metadata(self) -> dict:
        return {'series_id': 'FRED series ID from https://fred.stlouisfed.org/categories',
                'value': 'Numerical value of the series.'}
    
    def load_from_source(self, start_datetime: datetime, end_datetime: datetime, **kwargs):
        # Load data from FRED's REST API here
        return pd.DataFrame(...)

Under the hood, Cubyc automatically checks that the data you are loading from your data source matches the specified schema. You can also call <DATAFEED-OBJECT>.info() to get a summary of the data feed's schema and metadata.

fred_datafeed = FREDDataFeed('FRED-API-KEY-ID')
fred_datafeed.info()

This would return:

|           | dtype   | metadata                                                   |
|-----------|---------|------------------------------------------------------------|
| series_id | <U0     | FRED series ID from https://fred.stlouisfed.org/categories |
| value     | float64 | Numerical value of the series.                             |

To add a data feed to your Process, simply add your data feed as a Pydantic field to your Process's class.

class MyProcess(Process):
    FRED_datafeed: DataFeed = FREDDataFeed('FRED-API-KEY-ID')

    def update(self):
        data = self.FRED_datafeed.last(...)
        ...
Accessing Data

DataFeeds have four important functions you need to be aware of:

  • next: Returns the next data point(s) in the data feed, given the current time.
  • last: Returns the last data point(s) in the data feed, given the current time.
  • current: Returns the current data point(s) in the data feed if available.
  • between: Returns all data points between two dates.

For example, to retrieve the last 10 reviews from a Yelp data feed:

self.yelp_datafeed.last(values=10, columns=['reviews'], company='Chipotle')

Alternatively, if we wanted to retrieve the last 24 hours of movie reviews from an IMDb data feed:

self.imdb_datafeed.last(values=datetime.timedelta(hours=24), columns=['reviews'], movie='The Dark Knight')
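
The list above also includes next, last's forward-looking counterpart; a hedged sketch, assuming next mirrors last's signature (the docs above don't spell it out):

# assumption: next() accepts the same values/columns parameters as last()
self.yf_datafeed.next(values=1, columns=['close'])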

To retrieve Apple's (AAPL) VWAP from a Yahoo Finance data feed between 2020-01-01 and 2023-01-11:

self.yf_datafeed.between(start_date=datetime.datetime(2020, 1, 1), end_date=datetime.datetime(2023, 1, 11), columns='vwap')

And finally, to retrieve the current price from a Yelp data feed:

self.yelp_datafeed.current(columns='price', company='Chipotle')

Note: Data retrieved by data feeds is cached by default to a 'cubyc.sqlite' database in the current directory. To cache data to a different location, specify the url parameter in the data feed's constructor.
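
For example, to point the cache at a custom location (the exact URL format accepted by url is an assumption; a SQLAlchemy-style database URL is shown):

# assumption: url takes a SQLAlchemy-style database URL
datafeed = MyDataFeed(url='sqlite:///my_cache.sqlite')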

Data Streams

The built-in DataStream class enables streaming of real-time data from WebSocket APIs.

Creating a Data Stream

To stream data from a WebSocket API, subclass the DataStream class and implement the run method:
from cubyc import DataStream
from polygon import WebSocketClient


class PolygonDataStream(DataStream):

    def __init__(self, polygon_api_key: str):
        super().__init__()
        self.ws = WebSocketClient(api_key=polygon_api_key)

    def run(self, func):
        # forward each incoming WebSocket message to the decorated handler
        self.ws.run(handle_msg=func)
Streaming

To connect to the WebSocket API with your custom data stream, simply use the listen decorator:

from cubyc import Process, schedules

class MyProcess(Process):
   
    @schedules.listen(PolygonDataStream('POLYGON-API-KEY-ID'))
    def update(self, msg):
        # handle msg here
        ...

Warning: Data streams are only supported during live deployment.

🎨 Research

Cubyc's research control panel provides a powerful interface to visualize and monitor your processes.

Learn more

Experiment as a Service (EaaS)

Cubyc's Experiment as a Service (EaaS) platform lets you track and version experiments, visualize logs, and evaluate different hyperparameter configurations of your decision-making processes.

Runs

If you provide an API key, Cubyc automatically logs your process's runs to the cloud. You can view all your runs under the Runs tab in the research control panel.

[Screenshot: Runs tab]

Logs

When you log structured data, Cubyc automatically mirrors it to the cloud in a tabular format. Logs can be viewed and queried in the Logs tab in the research control panel.

[Screenshot: Logs tab]


License & Contact

Cubyc is distributed under the GNU Lesser General Public License. Contact us at growth@cubyc.com for any questions.


Contributing

We welcome contributions from the community! If you have any ideas, suggestions, or bug reports, please open an issue or a pull request.
