This is a pre-production deployment of Warehouse. Changes made here affect the production instance of PyPI (
Help us improve Python packaging - Donate today!

pipelining generators like python, not like shell

Project Description

Rohrleitung (ger.): _pipeline_

Didn't like the shell-like approach the other pipelining packages took, where you
would concatenate specially written generators with the UNIX pipe `|`. While that's
great for the CLI and short shell scripts, I wanted a more programmable way.

Worth mentioning is surely [genpipeline]( that
implements coroutine-based pipelines as presented by [David Beazley's Coroutines
intro]( But this means you have to do everything
in coroutines, I built Rohrleitung so I could just use classic generator-based
filters. Look into both and decide which way is cleaner for your environment.

ISC licenced, see the LICENCE file.


from functools import partial
# pip install toolz
from toolz.curried import interpose

from rohrleitung import Pipeline, L

def three_n_plus_one(n):
if n % 2:
return 3 * n + 1
return n / 2

def collatz_length(n, l=0):
if int(n) < 1:
raise ValueError('Nope')
if n == 1:
return l
# L changes call signatures of decorated function, so in recursive ones
# you have to adapt.
return collatz_length.__wrapped__(three_n_plus_one(n), l + 1)

# building pipelines in a programmatic way, without immediately executing them
# standard list manipulation can change pipeline on the fly
pipeline = [
partial(filter, lambda x: x % 2), # Standard python's filter function
lambda y: (2 * x for x in y), # Like using L, but manually
L(lambda x: x ** 3), # Using L helper function
print(list(Pipeline(pipeline, range(10))))

a = Pipeline([L(lambda x: 2 * x), L(lambda x: x + 1), L(lambda x: x ** 2)])
for i, k in enumerate(a(range(3))):
print("(2x+1)^2, x={}: {}".format(i, k))

# With toolz' curried filter function we don't need partial
from toolz.curried import filter
newpipeline = [
filter(lambda x: not(x % 2)),
L(lambda x: bin(x)),
L(lambda x: x.count('1'))
p1 = Pipeline(newpipeline)
for i in p1(range(1, 10)):

# Reuse same pipeline
for i in p1(range(1, 3)):

# Or modify it, then use again
p1.pipeline.insert(2, L(lambda x: x + 1))
for i in p1(range(1, 10)):

p2 = Pipeline(pipeline)
p3 = 2 * (p2 + [L(lambda x: int(str(x), 16))]) + p1
# Alternativly 2 * p2 | p1 if you prefer shell syntax
for i in p3(range(1, 10)):

# Slicing and cutting pipelines on the fly
for i in p1[1:4:2](range(10)):
Release History

Release History

This version
History Node


History Node


Download Files

Download Files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

File Name & Checksum SHA256 Checksum Help Version File Type Upload Date
rohrleitung-0.1.1.tar.gz (3.9 kB) Copy SHA256 Checksum SHA256 Source Jun 1, 2016

Supported By

WebFaction WebFaction Technical Writing Elastic Elastic Search Pingdom Pingdom Monitoring Dyn Dyn DNS Sentry Sentry Error Logging CloudAMQP CloudAMQP RabbitMQ Heroku Heroku PaaS Kabu Creative Kabu Creative UX & Design Fastly Fastly CDN DigiCert DigiCert EV Certificate Rackspace Rackspace Cloud Servers DreamHost DreamHost Log Hosting