Simple library to compose pipelines in the sklearn style using generators

Genpipes

Library to write readable and reproducible pipelines using decorators and generators. Tested for Python > 3.6.9.

Installation

pip install genpipes

Usage

Below are some use cases showing how to use the library.

Quick Start

This quick start assumes that you have pandas installed as a dependency in your project.

import pandas as pd
from genpipes import declare, compose
from typing import Iterable  # typing.Iterable can be subscripted on every supported Python version

@declare.generator(inputs=["some_file.csv"])
def data_to_be_processed(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df


@declare.processor(inputs=["col1"])
def filter_by(stream: Iterable[pd.DataFrame], col_to_filter:str, value:str):
    for df in stream:
        dff = df[df[col_to_filter] == value]
        yield dff

pipe = compose.Pipeline(steps=[
    ("fetching datasource from some csv file", data_to_be_processed, {}),
    ("performing some filtering based on col1", filter_by, {"value": "some_value"} )
])

output = pipe.run()

Declaring a data source

The first task in data processing is usually to write code that acquires data. The library provides a decorator to declare your data sources so that they are readable and easy to share.

The decorator takes a list of inputs that are passed as positional arguments to the decorated function. This binds the arguments to the function without hardcoding them inside the function body.

# my_datasource.py
import pandas as pd
from genpipes import declare

@declare.generator(inputs=["some_file.csv"])
def data_to_be_processed(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df

# other_file.py
from my_datasource import data_to_be_processed

df = data_to_be_processed()

However, if you want some arguments to be defined later, you can pass them as keyword arguments at call time:

# my_datasource.py
import pandas as pd
from genpipes import declare

@declare.generator(inputs=["some_file.csv"])
def data_to_be_processed(path:str, read_options:dict) -> pd.DataFrame:
    df = pd.read_csv(path, **read_options)
    return df

# other_file.py
from my_datasource import data_to_be_processed

df = data_to_be_processed(read_options={"encoding":"latin1"})
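
The binding described above can be illustrated without the library. The following is a minimal sketch in plain Python, not genpipes's actual implementation: `bind_inputs` and `describe_source` are hypothetical names, and the function returns a dict instead of reading a CSV so that the example runs without pandas.

```python
import functools


def bind_inputs(inputs=()):
    """Hypothetical decorator factory: binds positional inputs at declaration time."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(**kwargs):
            # The bound inputs are passed first, as positional arguments;
            # keyword arguments are supplied later by the caller.
            return func(*inputs, **kwargs)
        return wrapper
    return decorator


@bind_inputs(inputs=["some_file.csv"])
def describe_source(path, read_options=None):
    # Stand-in for pd.read_csv: only reports what would be read.
    return {"path": path, "options": read_options or {}}


result = describe_source(read_options={"encoding": "latin1"})
print(result)
```

The path is fixed where the source is declared, while `read_options` stays open until the call site.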

Declaring generator

The generator decorator is used to initialize a stream. A decorated function is transformed into a function that returns a Python generator object. You can decorate any function with @declare.generator, for example a simple one:

import pandas as pd
from genpipes import declare, compose

@declare.generator(inputs=["some_file.csv"])
def data_to_be_processed(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df

Or a more complex function:

from typing import Callable

import pandas as pd
from genpipes import declare, compose

@declare.generator(inputs=["some_file.csv"])
def data_one(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df

@declare.generator(inputs=["some_file_bis.csv"])
def data_two(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df


@declare.generator(inputs=[data_one, data_two])
def merging_data(input_one:Callable, input_two:Callable) -> pd.DataFrame:
    df_one = input_one()
    df_two = input_two()
    df_merged = df_one.merge(df_two, on="key")
    return df_merged

The decorated function does not receive the values from the stream. The wrapper does receive them and pushes them downstream unchanged.

That is why, once decorated, your function must be called with an iterable representing the stream (such as an empty tuple) as its first argument. To test your function, you can do the following:

empty_stream = ()  # used to feed the generator-decorated function

gen = merging_data(empty_stream)

df_merge = next(gen)  # consuming merging_data
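
To make the pass-through behaviour concrete, here is a minimal sketch of a wrapper that forwards the incoming stream and then emits the decorated function's result. It is an assumption about how such a wrapper could look, not the library's source; `source` and `add` are hypothetical names.

```python
import functools


def source(inputs=()):
    """Hypothetical stand-in for a stream-initializing decorator."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(stream, **kwargs):
            # Forward whatever is already in the stream, untouched...
            yield from stream
            # ...then emit the value produced by the decorated function,
            # which itself never sees the stream.
            yield func(*inputs, **kwargs)
        return wrapper
    return decorator


@source(inputs=[2, 3])
def add(a, b):
    return a + b


empty_stream = ()
first = next(add(empty_stream))

# With a non-empty upstream, existing values come out first, unchanged.
values = list(add(iter(["already here"])))
print(first, values)
```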

Because the decorator returns a function that creates a generator object, you can create many generator objects and feed several consumers.

empty_stream = ()  # used to feed the generator-decorated function

gen_one = merging_data(empty_stream)
gen_two = merging_data(empty_stream)

# multiple consuming
consumer_one = next(gen_one)
consumer_two = next(gen_two)

assert consumer_one.equals(consumer_two) # True
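
This independence is a property of Python generators in general, as the short sketch below shows with a plain generator function (`numbers` is a hypothetical name, unrelated to the library).

```python
def numbers():
    yield 1
    yield 2


# Each call builds a fresh generator object with its own state.
gen_one = numbers()
gen_two = numbers()

a = next(gen_one)
b = next(gen_one)
c = next(gen_two)  # gen_two starts from the beginning, unaffected by gen_one
print(a, b, c)
```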

Declaring processing functions

Now that we have seen how to declare data sources and how to generate a stream with the generator decorator, let's see how to declare processing functions.

from typing import Callable, Iterable, List

import pandas as pd
from genpipes import declare, compose

@declare.generator(inputs=["some_file.csv"])
def data_one(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df

@declare.generator(inputs=["some_file_bis.csv"])
def data_two(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df


@declare.generator(inputs=[data_one, data_two])
def merging_data(input_one:Callable, input_two:Callable) -> pd.DataFrame:
    df_one = input_one()
    df_two = input_two()
    df_merged = df_one.merge(df_two, on="key")
    return df_merged

@declare.processor(inputs=[["col1", "col2"]])
def deduplicate(stream:Iterable[pd.DataFrame], subset:List):
    for df in stream:
        # subset is already a list of column names, so pass it as-is
        df_nodup = df[~df.duplicated(subset=subset)]
        yield df_nodup

As you can see, a function decorated with processor MUST be a generator function whose first argument is a generator representing the stream of values.
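
The shape of such a processing function can be tried out in plain Python, without pandas or the decorators. In this sketch, lists of dicts stand in for dataframes, and `keep_matching` and `drop_duplicates` are hypothetical names chosen for illustration.

```python
from typing import Dict, Iterable, Iterator, List

Row = Dict[str, str]


def keep_matching(stream: Iterable[List[Row]], column: str, value: str) -> Iterator[List[Row]]:
    # The stream comes first; every batch pulled from it is filtered and re-yielded.
    for batch in stream:
        yield [row for row in batch if row[column] == value]


def drop_duplicates(stream: Iterable[List[Row]], subset: List[str]) -> Iterator[List[Row]]:
    for batch in stream:
        seen = set()
        unique = []
        for row in batch:
            key = tuple(row[col] for col in subset)
            if key not in seen:
                seen.add(key)
                unique.append(row)
        yield unique


batches = [[
    {"col1": "a", "col2": "x"},
    {"col1": "a", "col2": "x"},
    {"col1": "b", "col2": "y"},
]]

# Chaining only builds generator objects; work happens when list() consumes them.
chained = drop_duplicates(keep_matching(iter(batches), "col1", "a"), subset=["col1", "col2"])
result = list(chained)
print(result)
```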

Composing pipelines

Although the decorators can be used on their own, the library provides a Pipeline class that helps assemble functions decorated with generator and processor.

A pipeline object is composed of steps, each of which is a tuple with 3 components:
1- The description of the step
2- The decorated function
3- The keyword arguments to forward, as a dict (an empty dict if there are none)

from typing import Callable, Iterable, List

import pandas as pd
from genpipes import compose, declare

@declare.generator(inputs=["some_file.csv"])
def data_one(path:str) -> pd.DataFrame:
   df = pd.read_csv(path)
   return df

@declare.generator(inputs=["some_file_bis.csv"])
def data_two(path:str) -> pd.DataFrame:
   df = pd.read_csv(path)
   return df

@declare.generator(inputs=[data_one, data_two])
def merging_data(input_one:Callable, input_two:Callable) -> pd.DataFrame:
   df_one = input_one()
   df_two = input_two()
   df_merged = df_one.merge(df_two, on="key")
   return df_merged

@declare.processor()
def deduplicate(stream:Iterable[pd.DataFrame], subset:List):
   for df in stream:
       # subset is already a list of column names, so pass it as-is
       df_nodup = df[~df.duplicated(subset=subset)]
       yield df_nodup


pipe = compose.Pipeline(
   steps=[
       # empty dict used here as there are no kwargs
       ("data source is the merging of data one and data two", merging_data, {}),
       # forwarding subset as kwarg
       ("dropping dups", deduplicate, {"subset": ["col1"]}),
   ]
)

Declaring a pipeline object does not evaluate it. To evaluate it, call the run method, which returns the last object pulled out of the stream. In our case, that is the deduplicated dataframe from the last step.

dedup_df = pipe.run()

We can run the pipeline multiple times; each run redoes all the steps:

dedup_df = pipe.run()
dedup_df_bis = pipe.run()
assert dedup_df.equals(dedup_df_bis) # True
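
The run semantics described above (thread a stream through each step, then return the last object pulled out) can be sketched in a few lines. This is an illustration under assumptions, not the Pipeline class itself; `run_steps`, `numbers` and `keep_even` are hypothetical names.

```python
def run_steps(steps):
    """Hypothetical runner: chains the steps lazily, then drains the stream."""
    stream = iter(())  # every run starts from a fresh, empty stream
    for _description, func, kwargs in steps:
        stream = func(stream, **kwargs)
    last = None
    for last in stream:  # pulling values is what actually executes the steps
        pass
    return last


def numbers(stream, upto):
    yield from stream
    yield list(range(upto))


def keep_even(stream):
    for values in stream:
        yield [v for v in values if v % 2 == 0]


steps = [
    ("make numbers", numbers, {"upto": 6}),
    ("keep even values", keep_even, {}),
]

first_run = run_steps(steps)
second_run = run_steps(steps)  # all steps are executed again
print(first_run, second_run)
```

Because the steps are stored as functions rather than as already-created generators, each run rebuilds the chain and produces the same result.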

Pipeline objects can be used as a step in another pipeline instance:

@declare.processor()
def filtering_df(stream:Iterable[pd.DataFrame]):
    for df in stream:
        dff = df.query("some expr")  # placeholder: replace with a real pandas query expression
        yield dff

other_pipe = compose.Pipeline(steps=[
    ("take input other pipeline instance",pipe, {} ),
    ("filtering the output of the first pipe", filtering_df, {})
])

output_from_second_pipe = other_pipe.run() # will run the first pipe instance
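
One way to picture why a pipeline can serve as a step is to make the pipeline object callable with a stream, like any decorated function. The sketch below assumes that design; `MiniPipeline` and the step functions are hypothetical and are not taken from the library.

```python
class MiniPipeline:
    """Hypothetical pipeline that can itself be used as a step."""

    def __init__(self, steps):
        self.steps = steps

    def __call__(self, stream):
        # Same calling convention as a step function: take a stream, return a stream.
        for _description, func, kwargs in self.steps:
            stream = func(stream, **kwargs)
        return stream

    def run(self):
        last = None
        for last in self(iter(())):
            pass
        return last


def words(stream):
    yield from stream
    yield ["b", "a", "b"]


def unique(stream):
    for items in stream:
        yield sorted(set(items))


def upper(stream):
    for items in stream:
        yield [item.upper() for item in items]


inner = MiniPipeline([("source", words, {}), ("dedupe", unique, {})])
outer = MiniPipeline([("reuse the inner pipeline", inner, {}), ("uppercase", upper, {})])

result = outer.run()  # running the outer pipeline also runs the inner one
print(result)
```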
