Skip to main content

pipelining generators like python, not like shell

Project description

Rohrleitung
===========

Rohrleitung (ger.): _pipeline_

Didn't like the shell-like approach the other pipelining packages took, where you
would concatenate specially written generators with the UNIX pipe `|`. While that's
great for the CLI and short shell scripts, I wanted a more programmable way.

Worth mentioning is surely [genpipeline](https://github.com/fkarb/genpipeline) that
implements coroutine-based pipelines as presented by [David Beazley's Coroutines
intro](http://www.dabeaz.com/coroutines/). But this means you have to do everything
in coroutines, I built Rohrleitung so I could just use classic generator-based
filters. Look into both and decide which way is cleaner for your environment.

ISC licenced, see the LICENCE file.

Examples:
---------

from functools import partial
# pip install toolz
from toolz.curried import interpose

from rohrleitung import Pipeline, L


def three_n_plus_one(n):
if n % 2:
return 3 * n + 1
else:
return n / 2


@L
def collatz_length(n, l=0):
if int(n) < 1:
raise ValueError('Nope')
if n == 1:
return l
else:
# L changes call signatures of decorated function, so in recursive ones
# you have to adapt.
return collatz_length.__wrapped__(three_n_plus_one(n), l + 1)

# building pipelines in a programmatic way, without immediately executing them
# standard list manipulation can change pipeline on the fly
pipeline = [
partial(filter, lambda x: x % 2), # Standard python's filter function
lambda y: (2 * x for x in y), # Like using L, but manually
L(lambda x: x ** 3), # Using L helper function
collatz_length,
]
pipeline.append(interpose('a'))
print(list(Pipeline(pipeline, range(10))))

a = Pipeline([L(lambda x: 2 * x), L(lambda x: x + 1), L(lambda x: x ** 2)])
for i, k in enumerate(a(range(3))):
print("(2x+1)^2, x={}: {}".format(i, k))

# With toolz' curried filter function we don't need partial
from toolz.curried import filter
newpipeline = [
collatz_length,
filter(lambda x: not(x % 2)),
L(lambda x: bin(x)),
L(lambda x: x.count('1'))
]
p1 = Pipeline(newpipeline)
for i in p1(range(1, 10)):
print(i)

# Reuse same pipeline
for i in p1(range(1, 3)):
print(i)

# Or modify it, then use again
p1.pipeline.insert(2, L(lambda x: x + 1))
for i in p1(range(1, 10)):
print(i)

p2 = Pipeline(pipeline)
p3 = 2 * (p2 + [L(lambda x: int(str(x), 16))]) + p1
# Alternativly 2 * p2 | p1 if you prefer shell syntax
for i in p3(range(1, 10)):
print(i)

# Slicing and cutting pipelines on the fly
for i in p1[1:4:2](range(10)):
print(i)

Project details


Release history Release notifications

This version
History Node

0.1.1

History Node

0.1

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Filename, size & hash SHA256 hash help File type Python version Upload date
rohrleitung-0.1.1.tar.gz (3.9 kB) Copy SHA256 hash SHA256 Source None Jun 1, 2016

Supported by

Elastic Elastic Search Pingdom Pingdom Monitoring Google Google BigQuery Sentry Sentry Error logging CloudAMQP CloudAMQP RabbitMQ AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page