
Simple library to compose pipelines in the sklearn way using generators

Project description


Genpipes

Library to write readable and reproducible pipelines using decorators and generators. Tested with Python > 3.6.9.

Installation

pip install genpipes

Usage

Below are some use cases showing how to use the library.

Quick Start

This quick start assumes that you have pandas installed as a dependency in your project.

import pandas as pd
from genpipes import declare, compose
from typing import Iterable  # typing.Iterable can be subscripted on Python < 3.9

@declare.generator(inputs=["some_file.csv"])  # binds the CSV path as the first positional argument
def data_to_be_processed(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df


@declare.processor(inputs=["col1"])
def filter_by(stream: Iterable[pd.DataFrame], col_to_filter:str, value:str):
    for df in stream:
        dff = df[df[col_to_filter] == value]
        yield dff

pipe = compose.Pipeline(steps=[
    ("fetching datasource from some csv file", data_to_be_processed, {}),
    ("performing some filtering based on col1", filter_by, {"value": "some_value"} )
])

output = pipe.run()

Declaring a data source

The first task in data processing is usually to write code to acquire data. The library provides a decorator to declare your data sources so that they are easy to share and read.

The decorator takes a list of inputs that are passed as positional arguments to the decorated function. This way you bind arguments to the function without hardcoding them inside it.

# my_datasource.py
import pandas as pd
from genpipes import declare

@declare.generator(inputs=["some_file.csv"])
def data_to_be_processed(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df

# other_file.py
from my_datasource import data_to_be_processed

df = data_to_be_processed()

However, if you want some arguments to be defined later, you can use keyword arguments like so:

# my_datasource.py
import pandas as pd
from genpipes import declare

@declare.generator(inputs=["some_file.csv"])
def data_to_be_processed(path:str, read_options:dict) -> pd.DataFrame:
    df = pd.read_csv(path, **read_options)
    return df

# other_file.py
from my_datasource import data_to_be_processed

df = data_to_be_processed(read_options={"encoding":"latin1"})
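The binding described above (inputs passed as leading positional arguments, remaining arguments supplied later as keywords) can be sketched with a plain decorator. This is a minimal illustration, not genpipes' implementation: `bind_inputs` and `describe_source` are hypothetical names, and the wrapper simply returns the function's result instead of producing a stream.

```python
from functools import wraps
from typing import Any, Callable, Iterable, Optional


def bind_inputs(inputs: Optional[Iterable[Any]] = None) -> Callable:
    """Hypothetical decorator: pre-binds `inputs` as leading positional arguments."""
    bound = tuple(inputs or ())

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Bound inputs come first; anything supplied at call time follows them.
            return func(*bound, *args, **kwargs)
        return wrapper
    return decorator


@bind_inputs(inputs=["some_file.csv"])
def describe_source(path: str, read_options: Optional[dict] = None) -> str:
    # Stand-in for pd.read_csv: only reports what would be read, and how.
    return f"read {path} with {read_options or {}}"


print(describe_source())                                   # read some_file.csv with {}
print(describe_source(read_options={"encoding": "latin1"}))  # read some_file.csv with {'encoding': 'latin1'}
```

The path is fixed once at declaration time, while `read_options` stays open until the call site decides it.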

Declaring generators

The generator decorator is used to initialize a stream. Decorated functions are transformed into functions that return a Python generator object. You can decorate any function with @declare.generator:

import pandas as pd
from genpipes import declare, compose

@declare.generator(inputs=["some_file.csv"])
def data_to_be_processed(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df

Or a more complex function:

import pandas as pd
from typing import Callable
from genpipes import declare, compose

@declare.generator(inputs=["some_file.csv"])
def data_one(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df

@declare.generator(inputs=["some_file_bis.csv"])
def data_two(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df


@declare.generator(inputs=[data_one, data_two])
def merging_data(input_one:Callable, input_two:Callable) -> pd.DataFrame:
    df_one = input_one()
    df_two = input_two()
    df_merged = df_one.merge(df_two, on="key")
    return df_merged

The decorated function does not receive the values from the stream. The wrapper does: it receives each value from the stream and pushes it downstream unchanged.

That's why, once your function is decorated, you have to pass it an iterable representing the stream as its first argument. To test your function on its own, you can feed it an empty stream:

empty_stream = ()  # used to feed the decorated generator function

gen = merging_data(empty_stream)

df_merge = next(gen)  # consuming merging_data

Because the decorator returns a function that creates a new generator object on each call, you can create many generator objects and feed several consumers.

empty_stream = ()  # used to feed the decorated generator function

gen_one = merging_data(empty_stream)
gen_two = merging_data(empty_stream)

# multiple consumers
consumer_one = next(gen_one)
consumer_two = next(gen_two)

assert consumer_one.equals(consumer_two) # True
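The wrapper behaviour described in this section can be sketched in plain Python. This is a hypothetical stand-in, not genpipes' source: it assumes the wrapper first forwards every upstream value unchanged and then yields the decorated function's return value, and it uses small lists instead of DataFrames so it runs without pandas or genpipes.

```python
from functools import wraps
from typing import Any, Callable, Iterable, Iterator, Optional


def generator(inputs: Optional[Iterable[Any]] = None) -> Callable:
    """Hypothetical stream-initialising decorator (not genpipes' declare.generator)."""
    bound = tuple(inputs or ())

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(stream: Iterable[Any], *args: Any, **kwargs: Any) -> Iterator[Any]:
            # Upstream values are pushed downstream unchanged...
            for value in stream:
                yield value
            # ...then the decorated function's result is emitted.
            # The decorated function itself never sees the stream.
            yield func(*bound, *args, **kwargs)
        return wrapper
    return decorator


@generator(inputs=[[1, 2], [3, 4]])
def merging_data(left: list, right: list) -> list:
    return left + right


empty_stream = ()
gen_one = merging_data(empty_stream)
gen_two = merging_data(empty_stream)  # an independent generator object
assert gen_one is not gen_two
print(next(gen_one))                     # [1, 2, 3, 4]
print(next(gen_two))                     # [1, 2, 3, 4]
print(list(merging_data(["upstream"])))  # ['upstream', [1, 2, 3, 4]]
```

Each call to the wrapper builds a fresh generator, which is why several consumers can pull from the same declared source without interfering with each other.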

Declaring processing functions

Now that we have seen how to declare data sources and how to generate a stream with the generator decorator, let's see how to declare processing functions.

import pandas as pd
from typing import Callable, Iterable, List
from genpipes import declare, compose

@declare.generator(inputs=["some_file.csv"])
def data_one(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df

@declare.generator(inputs=["some_file_bis.csv"])
def data_two(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df


@declare.generator(inputs=[data_one, data_two])
def merging_data(input_one:Callable, input_two:Callable) -> pd.DataFrame:
    df_one = input_one()
    df_two = input_two()
    df_merged = df_one.merge(df_two, on="key")
    return df_merged

@declare.processor(inputs=[["col1", "col2"]])  # binds subset=["col1", "col2"]
def deduplicate(stream:Iterable[pd.DataFrame], subset:List):
    for df in stream:
        df_nodup = df[~df.duplicated(subset=subset)]  # keep the first occurrence of each key
        yield df_nodup

As you can see, a function decorated with processor MUST be a generator function that takes, as its first argument, a generator representing the stream of values, and yields the processed values downstream.
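The processor shape can be shown without pandas or genpipes. In this hypothetical sketch each batch of row dicts stands in for one DataFrame, and `batches` and `deduplicate` are illustrative names; the deduplication keeps the first occurrence of each key, like `df[~df.duplicated(subset=...)]` with pandas' default `keep="first"`.

```python
from typing import Dict, Iterable, Iterator, List

Row = Dict[str, str]


def batches() -> Iterator[List[Row]]:
    # Hypothetical source: one batch of rows stands in for one DataFrame.
    yield [
        {"col1": "a", "col2": "x"},
        {"col1": "a", "col2": "x"},
        {"col1": "b", "col2": "y"},
    ]


def deduplicate(stream: Iterable[List[Row]], subset: List[str]) -> Iterator[List[Row]]:
    # Processor shape: consume the stream, transform each value, yield it on.
    for batch in stream:
        seen = set()
        kept = []
        for row in batch:
            key = tuple(row[col] for col in subset)  # the columns that define a duplicate
            if key not in seen:  # keep the first occurrence only
                seen.add(key)
                kept.append(row)
        yield kept


for batch in deduplicate(batches(), subset=["col1", "col2"]):
    print(batch)  # [{'col1': 'a', 'col2': 'x'}, {'col1': 'b', 'col2': 'y'}]
```

Nothing runs until the outer loop pulls a value, so processors can be chained freely before any data is read.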

Composing pipelines

Even though the decorated functions can be used on their own, the library provides a Pipeline class that helps assemble functions decorated with both generator and processor.

A pipeline object is composed of steps, each of which is a tuple with 3 components:
1- The description of the step
2- The decorated function
3- The keyword arguments to forward, as a dict (an empty dict if there are none)

import pandas as pd
from typing import Callable, Iterable, List
from genpipes import compose, declare

@declare.generator(inputs=["some_file.csv"])
def data_one(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df

@declare.generator(inputs=["some_file_bis.csv"])
def data_two(path:str) -> pd.DataFrame:
    df = pd.read_csv(path)
    return df

@declare.generator(inputs=[data_one, data_two])
def merging_data(input_one:Callable, input_two:Callable) -> pd.DataFrame:
    df_one = input_one()
    df_two = input_two()
    df_merged = df_one.merge(df_two, on="key")
    return df_merged

@declare.processor()
def deduplicate(stream:Iterable[pd.DataFrame], subset:List):
    for df in stream:
        df_nodup = df[~df.duplicated(subset=subset)]
        yield df_nodup


pipe = compose.Pipeline(
    steps=[
        ("data source is the merging of data one and data two", merging_data, {}),  # empty dict: no kwargs to forward
        ("dropping dups", deduplicate, {"subset": ["col1"]}),  # forwarding subset as a kwarg
    ]
)

Declaring a pipeline object does not evaluate it; for that we need to call the run method. The run method returns the last object pulled out of the stream, which here is the deduplicated dataframe produced by the last step.

dedup_df = pipe.run()

We can run the pipeline multiple times; each run redoes all the steps:

dedup_df = pipe.run()
dedup_df_bis = pipe.run()
assert dedup_df.equals(dedup_df_bis) # True
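The run semantics described above (nothing is evaluated at declaration, run returns the last value pulled out of the stream, and every run redoes all the steps) can be sketched in plain Python. `MiniPipeline`, `source` and `deduplicate` here are hypothetical names; this is an illustration of step chaining under those assumptions, not genpipes' `compose.Pipeline`.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple

# A step is (description, generator function, keyword arguments to forward).
Step = Tuple[str, Callable[..., Iterator[Any]], Dict[str, Any]]


class MiniPipeline:
    """Hypothetical sketch of step chaining; not genpipes' compose.Pipeline."""

    def __init__(self, steps: List[Step]) -> None:
        self.steps = steps  # nothing is evaluated at construction time

    def run(self) -> Any:
        stream: Iterable[Any] = ()  # the first step starts from an empty stream
        for _description, func, kwargs in self.steps:
            stream = func(stream, **kwargs)  # each step wraps the previous stream
        last = None
        for last in stream:  # pulling values drives every step lazily
            pass
        return last  # the last object pulled out of the stream


def source(stream: Iterable[Any]) -> Iterator[Any]:
    yield from stream  # forward upstream values unchanged
    yield [3, 1, 3, 2]  # then emit this step's own value


def deduplicate(stream: Iterable[Any], sort: bool = False) -> Iterator[Any]:
    for values in stream:
        unique = list(dict.fromkeys(values))  # drop repeats, keep first-seen order
        yield sorted(unique) if sort else unique


pipe = MiniPipeline(steps=[
    ("hypothetical data source", source, {}),  # empty dict: no kwargs
    ("dropping dups", deduplicate, {"sort": True}),
])
first = pipe.run()
second = pipe.run()  # a second run rebuilds the generators and redoes all steps
print(first, first == second)  # [1, 2, 3] True
```

Because the generators are created inside run rather than in the constructor, a pipeline object can be run any number of times.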

Pipeline objects can also be used as a step in another pipeline instance:

@declare.processor()
def filtering_df(stream:Iterable[pd.DataFrame]):
    for df in stream:
        dff = df.query("col1 == 'some_value'")  # keep only the rows matching a boolean expression
        yield dff

other_pipe = compose.Pipeline(steps=[
    ("take another pipeline instance as input", pipe, {}),
    ("filtering the output of the first pipe", filtering_df, {})
])

output_from_second_pipe = other_pipe.run() # will run the first pipe instance
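One way a pipeline can double as a step is to make the pipeline object itself callable with a stream, just like a decorated function. The sketch below assumes that design; `MiniPipeline`, `source` and `keep_above` are hypothetical names and this is not how genpipes is confirmed to implement nesting.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple

Step = Tuple[str, Callable[..., Iterator[Any]], Dict[str, Any]]


class MiniPipeline:
    """Hypothetical sketch: a pipeline that can also act as a step."""

    def __init__(self, steps: List[Step]) -> None:
        self.steps = steps

    def __call__(self, stream: Iterable[Any]) -> Iterator[Any]:
        # Used as a step: chain this pipeline's steps onto the incoming stream.
        for _description, func, kwargs in self.steps:
            stream = func(stream, **kwargs)
        yield from stream

    def run(self) -> Any:
        last = None
        for last in self(()):  # a top-level run starts from an empty stream
            pass
        return last


def source(stream: Iterable[Any]) -> Iterator[Any]:
    yield from stream  # forward upstream values unchanged
    yield [5, 12, 7, 30]


def keep_above(stream: Iterable[Any], threshold: int) -> Iterator[Any]:
    for values in stream:
        yield [v for v in values if v > threshold]


inner = MiniPipeline(steps=[("hypothetical source", source, {})])
outer = MiniPipeline(steps=[
    ("take another pipeline instance as input", inner, {}),
    ("filtering the output of the first pipe", keep_above, {"threshold": 6}),
])
print(outer.run())  # [12, 7, 30], running the inner pipeline along the way
```

Since `__call__` is itself a generator function, the inner pipeline's steps only execute when the outer pipeline pulls values through them.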


Download files


Source Distribution

genpipes-1.0.0.tar.gz (4.9 kB)

Uploaded Source

Built Distribution

genpipes-1.0.0-py3-none-any.whl (4.8 kB)

Uploaded Python 3

File details

Details for the file genpipes-1.0.0.tar.gz.

File metadata

  • Download URL: genpipes-1.0.0.tar.gz
  • Upload date:
  • Size: 4.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.12 CPython/3.8.12 Linux/5.11.0-1027-azure

File hashes

Hashes for genpipes-1.0.0.tar.gz
Algorithm Hash digest
SHA256 bce023d9867aa43cf2f4968848f81e7d1488bbfdd4d56dedea6210fbfa584849
MD5 e19fbcd604a7a6ef365e4f3b82d69676
BLAKE2b-256 3d6ecd97e2e5de8a6d7faebab8ec9ca8e44cb1e187949595b6140b133b1dfc99


File details

Details for the file genpipes-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: genpipes-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 4.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.12 CPython/3.8.12 Linux/5.11.0-1027-azure

File hashes

Hashes for genpipes-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 c437d66d56adcb0e01eacc07e110173479053aae338507ee9d78ed0e47b82a2a
MD5 9471bf3e25622ada3cac87e29cc61f59
BLAKE2b-256 a625da9dc4f8ff0e38e75edbbef43aff2e64c972746778890883f83543c441ba
