Skip to main content

Simple data transformation pipeline.

Project description

A simple data transformation pipeline based on python’s iteration protocol that runs on python versions 2.7, 3.3 and 3.4.

+----------+      +-------------+      +-------------+      +--------+      +----------+
| Producer | ---> | Transformer | ---> | Transformer | ---> | Tester | ---> | Consumer |
+----------+      +-------------+      +-------------+      +--------+      +----------+

A pipeline model expects 4 types of filters:

  • Producer: starting point, outbound only;
  • Transformer: input, processing, output;
  • Tester: input, discard or pass-thru;
  • Consumer: ending point, inbound only.
import plumber

@plumber.filter
def upper(data):
    return data.upper()

ppl = plumber.Pipeline(upper)
output = ppl.run("Hey Jude, don't make it bad")

print(''.join(output))
"HEY JUDE, DON'T MAKE IT BAD"

Since the design is based on python’s iteration protocol, both producers and consumers are ordinary iterable objects. Transformers are implemented as callables that accept a single argument, perform the processing and return the result.

Input data may also be checked against some preconditions in order to decide if the transformation should happen or be by-passed. For example:

import plumber

def is_vowel(data):
    if data not in 'aeiou':
        raise plumber.UnmetPrecondition()

@plumber.filter
@plumber.precondition(is_vowel)
def upper(data):
    return data.upper()

ppl = plumber.Pipeline(upper)
output = ppl.run("Hey Jude, don't make it bad")

print(''.join(output))
"hEy jUdE, dOn't mAkE It bAd"

Prefetching

If you think the pipes are taking too long to move data forward, you can use a prefetching feature. To use it, just define the upper limit of items to be pre fetched.

Using the same example as above:

ppl = plumber.Pipeline(stripper, upper)
transformed_data = ppl.run([" I am the Great Cornholio!", "Hey Jude, don't make it bad "],
                           prefetch=2)

for td in transformed_data:
    print(td)

I AM THE GREAT CORNHOLIO!
"HEY JUDE, DON'T MAKE IT BAD"

By default the prefetching mechanism is thread-based, so be careful with cpu-bound pipelines.

Installation

Pypi (recommended):

$ pip install picles.plumber

Source code (development version):

$ git clone https://github.com/picleslivre/plumber.git && cd plumber && python setup.py install

Use license

This project is licensed under FreeBSD 2-clause. See LICENSE for more details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Filename, size & hash SHA256 hash help File type Python version Upload date
picles.plumber-0.11-py2.py3-none-any.whl (6.9 kB) Copy SHA256 hash SHA256 Wheel py2.py3
picles.plumber-0.11.tar.gz (4.9 kB) Copy SHA256 hash SHA256 Source None

Supported by

Elastic Elastic Search Pingdom Pingdom Monitoring Google Google BigQuery Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN SignalFx SignalFx Supporter DigiCert DigiCert EV certificate StatusPage StatusPage Status page