Simple data transformation pipeline.

A simple data transformation pipeline based on Python's iteration protocol. It runs on Python 2.7, 3.3 and 3.4.

+----------+      +-------------+      +-------------+      +--------+      +----------+
| Producer | ---> | Transformer | ---> | Transformer | ---> | Tester | ---> | Consumer |
+----------+      +-------------+      +-------------+      +--------+      +----------+

A pipeline model expects 4 types of filters:

  • Producer: starting point, outbound only;
  • Transformer: input, processing, output;
  • Tester: input, discard or pass-thru;
  • Consumer: ending point, inbound only.

A minimal pipeline with a single transformer:

import plumber

@plumber.filter
def upper(data):
    return data.upper()

ppl = plumber.Pipeline(upper)
output = ppl.run("Hey Jude, don't make it bad")

print(''.join(output))
HEY JUDE, DON'T MAKE IT BAD

Since the design is based on Python's iteration protocol, both producers and consumers are ordinary iterable objects. Transformers are implemented as callables that accept a single argument, process it and return the result.
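The four roles can be sketched in plain Python without plumber at all. This is only an illustration of the model, not the library's implementation; the names producer, strip, non_empty and run are hypothetical.

```python
def producer():
    # Producer: any iterable; here a generator yielding raw strings.
    for line in [" hey jude ", "", " let it be "]:
        yield line


def strip(data):
    # Transformer: one argument in, processed value out.
    return data.strip()


def upper(data):
    # A second transformer, chained after strip.
    return data.upper()


def non_empty(data):
    # Tester: decides whether an item passes through or is discarded.
    return bool(data)


def run(source, transformers, tester):
    # Pull items lazily through every transformer, then through the tester.
    for item in source:
        for transform in transformers:
            item = transform(item)
        if tester(item):
            yield item


# Consumer: anything that iterates over the result.
result = list(run(producer(), [strip, upper], non_empty))
print(result)  # → ['HEY JUDE', 'LET IT BE']
```

Because run is a generator, nothing is processed until the consumer starts iterating.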

Input data may also be checked against preconditions to decide whether the transformation should run or be bypassed. For example:

import plumber

def is_vowel(data):
    if data not in 'aeiou':
        raise plumber.UnmetPrecondition()

@plumber.filter
@plumber.precondition(is_vowel)
def upper(data):
    return data.upper()

ppl = plumber.Pipeline(upper)
output = ppl.run("Hey Jude, don't make it bad")

print(''.join(output))
HEy JUdE, dOn't mAkE It bAd
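One way such a precondition decorator could work is sketched below in plain Python. It assumes that a bypassed item is passed on unchanged; the UnmetPrecondition class and precondition function here are hypothetical stand-ins written for this example, not plumber's own code.

```python
import functools


class UnmetPrecondition(Exception):
    """Hypothetical stand-in for plumber.UnmetPrecondition."""


def precondition(check):
    # Build a decorator that runs `check` before the wrapped transformer.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(data):
            try:
                check(data)
            except UnmetPrecondition:
                return data  # bypass: hand the item on unchanged
            return func(data)
        return wrapper
    return decorator


def is_vowel(data):
    if data not in 'aeiou':
        raise UnmetPrecondition()


@precondition(is_vowel)
def upper(data):
    return data.upper()


result = ''.join(upper(ch) for ch in "let it be")
print(result)  # → lEt It bE
```

Signalling "skip" with an exception keeps the check function free of return-value conventions: it either returns quietly or raises.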

Prefetching

If the pipes take too long to move data forward, you can enable prefetching. To use it, pass the upper limit of items to be prefetched as the prefetch argument of run.

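Using the unconditional upper filter from the first example, plus a stripper filter that removes surrounding whitespace:

# stripper is not defined elsewhere in this document; this definition is an assumption.
@plumber.filter
def stripper(data):
    return data.strip()

ppl = plumber.Pipeline(stripper, upper)
transformed_data = ppl.run([" I am the Great Cornholio!", "Hey Jude, don't make it bad "],
                           prefetch=2)

for td in transformed_data:
    print(td)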

I AM THE GREAT CORNHOLIO!
HEY JUDE, DON'T MAKE IT BAD

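By default the prefetching mechanism is thread-based, so it mainly helps I/O-bound pipelines. On CPython, threads contend for the global interpreter lock, so CPU-bound filters are unlikely to run any faster.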
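The general idea of thread-based prefetching can be sketched with a bounded queue and a worker thread. This is a minimal illustration written for Python 3, not plumber's actual mechanism; the prefetch function and its limit parameter are hypothetical.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def prefetch(iterable, limit):
    """Yield items from `iterable` while a worker thread runs up to
    `limit` items ahead of the consumer."""
    buffer = queue.Queue(maxsize=limit)

    def worker():
        for item in iterable:
            buffer.put(item)  # blocks while the buffer is full
        buffer.put(_DONE)

    threading.Thread(target=worker, daemon=True).start()

    while True:
        item = buffer.get()
        if item is _DONE:
            return
        yield item


data = [" I am the Great Cornholio!", "Hey Jude, don't make it bad "]
stage = (s.strip().upper() for s in data)  # lazy upstream stage
result = list(prefetch(stage, limit=2))
for line in result:
    print(line)
```

The bounded queue caps how far the worker can run ahead, which is what an upper limit on prefetched items amounts to. The sketch does not forward exceptions raised in the worker thread; a fuller version would need to.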

Installation

PyPI (recommended):

$ pip install picles.plumber

Source code (development version):

$ git clone https://github.com/picleslivre/plumber.git && cd plumber && python setup.py install

Use license

This project is licensed under the 2-clause BSD (FreeBSD) license. See LICENSE for more details.

Release history

  • 0.11 (this version)
  • 0.10
  • 0.9

Download files

Filename                                  Size    File type  Python version  Upload date
picles.plumber-0.11-py2.py3-none-any.whl  6.9 kB  Wheel      py2.py3         Feb 1, 2016
picles.plumber-0.11.tar.gz                4.9 kB  Source     None            Feb 1, 2016