
Conduits - A Declarative Pipelining Tool For Pandas

Traditional tools for declaring pipelines in Python suck. They are mostly imperative, and can sometimes require that you adhere to strict contracts in order to use them (looking at you Scikit Learn pipelines ಠ_ಠ). Pipelines also usually end up declared completely differently to the way they were developed during the ideation phase, requiring a significant rewrite to get them working in the new paradigm.

Modelled on Flask's declarative, decorator-based style, Conduits aims to give you a nicer, simpler, and more flexible way of declaring your data processing pipelines.

Installation

pip install conduits

Quickstart

import pandas as pd
from conduits import Pipeline

##########################
## Pipeline Declaration ##
##########################

pipeline = Pipeline()
pipeline["transformed"] = False


@pipeline.step(dependencies=["first_step"])
def second_step(data, *, adder=0):
    return data + adder


@pipeline.step()
def first_step(data, *, power=1):
    return data ** power 


@pipeline.step(dependencies=["second_step"])
def third_step(data, fit: bool, transform: bool):
    if transform:
        pipeline["transformed"] = True

    return data


###############
## Execution ##
###############

df = pd.DataFrame({"X": [1, 2, 3], "Y": [10, 20, 30]})

assert pipeline["transformed"] == False

output = pipeline.fit_transform(df, adder=1, power=2)

assert output.X.sum() != 29  # Addition before square => False!
assert output.X.sum() == 17  # Square before addition => True!
assert pipeline["transformed"] == True

pipeline.save("pipeline.joblib")

reloaded = Pipeline().load("pipeline.joblib")
assert reloaded["transformed"] == True  # State is persisted on reload.

Usage Guide

Declarations

Pipeline Decorator

Your pipeline is defined using a standard decorator syntax. You can wrap your pipeline steps using the decorator:

@pipeline.step()
def transformer(df):
    return df + 1

Function Signature

The decorated function's signature carries all the information Conduits needs to infer what functionality it should activate. You can pass a single dataframe or many dataframes through the signature, and they will be carried through the fit(), transform(), and fit_transform() calls. Hence both of the following function signatures work:

pipeline = Pipeline()

...

@pipeline.step()
def single_frame(data):
    return data + 1

...

df = pipeline.fit_transform(df)

pipeline = Pipeline()

...

@pipeline.step()
def Xy_transformer(X, y):
    return X + 1, y

...

X, y = pipeline.fit_transform(X, y)

You can also define hyperparameters that your function has access to by including them in the function signature after the * separator (see PEP 3102). It is recommended that you set a default value for each hyperparameter, but it is not required:

pipeline = Pipeline()

...

@pipeline.step()
def adder(data, *, n):
    return data + n

@pipeline.step()
def multiplier(data, *, m=1):
    return data * m

...

df = pipeline.fit_transform(df)  # Fails! `n` isn't passed in
df = pipeline.fit_transform(df, n=1)  # Will succeed! `n`=1, `m`=1
df = pipeline.fit_transform(df, n=1, m=2)  # Will succeed! `n`=1, `m`=2

Stateful Transformers

If your transformer is stateful, you can optionally add fit and transform boolean arguments to the function signature. Each will be set to True when the corresponding method is called. Both arguments are optional and independent of one another (i.e. you can have just the fit argument without the transform argument).

from sklearn.preprocessing import StandardScaler


@pipeline.step()
def stateful(data: pd.DataFrame, fit: bool, transform: bool):
    if fit:
        scaler = StandardScaler()
        pipeline["scaler"] = scaler.fit(data)

    if transform:
        data = pipeline["scaler"].transform(data)

    return data
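With a step like this, fit() fits the scaler and stores it as a pipeline artifact, while transform() reuses the stored scaler on new data. A minimal usage sketch (the frames below are purely illustrative):

train = pd.DataFrame({"X": [1.0, 2.0, 3.0]})
test = pd.DataFrame({"X": [4.0, 5.0]})

pipeline.fit(train)                # fit=True here: fits and stores the scaler artifact
scaled = pipeline.transform(test)  # transform=True here: applies the stored scaler to unseen data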

Pipeline Serialisation

You should not serialise the pipeline object itself. Rather, you should use the pipeline.save(path) and pipeline.load(path) methods to handle serialisation and deserialisation.
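For example, once the pipeline has been fitted you can persist its state and restore it into a fresh Pipeline object, as in the quickstart above:

pipeline.save("pipeline.joblib")
restored = Pipeline().load("pipeline.joblib")  # stored artifacts and state are restored from the file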

Step Dependencies

If there are any dependencies between your pipeline steps, you may specify these in the decorator and they will be run before this step in the pipeline. If a step has no dependencies specified, it is assumed that it can run at any point.

@pipeline.step(dependencies=["add_feature_X", "add_feature_Y"])
def combine_X_with_Y(df):
    return df.X + df.Y

API

Conduits attempts to mirror the Scikit Learn API as closely as possible. Your defined pipelines have the standard methods:

pipeline.fit(df)
out = pipeline.transform(df)
out = pipeline.fit_transform(df)

Note that for the current release you can only supply pandas DataFrame or Series objects. It will not accept numpy arrays.
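For instance, a Series passes through in the same way as a DataFrame (a minimal sketch):

s = pd.Series([1, 2, 3])
out = pipeline.fit_transform(s)  # a pandas Series works; a numpy array would not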

You can save artifacts into the pipeline using standard dictionary notation.

pipeline["artifact"] = [1, 2, 3]
artifact = pipeline["artifact"]

You can serialise all artifacts within the pipeline using the pipeline.save(path) and pipeline.load(path) methods. The pipeline will be serialised using the joblib library.

pipeline = Pipeline()
...
pipeline.save("pipeline.joblib")
pipeline = Pipeline().load("pipeline.joblib")

Tests

In order to run the testing suite you should install the dependencies listed in dev.requirements.txt (e.g. pip install -r dev.requirements.txt); it contains all the core dependencies used in testing and packaging. Once your dependencies are installed, you can run the tests via the make target:

make tests

The tests rely on pytest-regressions to test some functionality. If you make a change you can refresh the regression targets with:

make regressions
