Skip to main content

pipelining generators like python, not like shell

Project description

Rohrleitung
===========

Rohrleitung (ger.): _pipeline_

Didn't like the shell-like approach the other pipelining packages took, where you
would concatenate specially written generators with the UNIX pipe `|`. While that's
great for the CLI and short shell scripts, I wanted a more programmable way.

Worth mentioning is surely [genpipeline](https://github.com/fkarb/genpipeline) that
implements coroutine-based pipelines as presented by [David Beazley's Coroutines
intro](http://www.dabeaz.com/coroutines/). But this means you have to do everything
in coroutines, I built Rohrleitung so I could just use classic generator-based
filters. Look into both and decide which way is cleaner for your environment.

ISC licenced, see the LICENCE file.

Examples:
---------

from functools import partial
# pip install toolz
from toolz.curried import interpose

from rohrleitung import Pipeline, L


def three_n_plus_one(n):
if n % 2:
return 3 * n + 1
else:
return n / 2


@L
def collatz_length(n, l=0):
if int(n) < 1:
raise ValueError('Nope')
if n == 1:
return l
else:
# L changes call signatures of decorated function, so in recursive ones
# you have to adapt.
return collatz_length.__wrapped__(three_n_plus_one(n), l + 1)

# building pipelines in a programmatic way, without immediately executing them
# standard list manipulation can change pipeline on the fly
pipeline = [
partial(filter, lambda x: x % 2), # Standard python's filter function
lambda y: (2 * x for x in y), # Like using L, but manually
L(lambda x: x ** 3), # Using L helper function
collatz_length,
]
pipeline.append(interpose('a'))
print(list(Pipeline(pipeline, range(10))))

a = Pipeline([L(lambda x: 2 * x), L(lambda x: x + 1), L(lambda x: x ** 2)])
for i, k in enumerate(a(range(3))):
print("(2x+1)^2, x={}: {}".format(i, k))

# With toolz' curried filter function we don't need partial
from toolz.curried import filter
newpipeline = [
collatz_length,
filter(lambda x: not(x % 2)),
L(lambda x: bin(x)),
L(lambda x: x.count('1'))
]
p1 = Pipeline(newpipeline)
for i in p1(range(1, 10)):
print(i)

# Reuse same pipeline
for i in p1(range(1, 3)):
print(i)

# Or modify it, then use again
p1.pipeline.insert(2, L(lambda x: x + 1))
for i in p1(range(1, 10)):
print(i)

p2 = Pipeline(pipeline)
p3 = 2 * (p2 + [L(lambda x: int(str(x), 16))]) + p1
# Alternativly 2 * p2 | p1 if you prefer shell syntax
for i in p3(range(1, 10)):
print(i)

# Slicing and cutting pipelines on the fly
for i in p1[1:4:2](range(10)):
print(i)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rohrleitung-0.1.1.tar.gz (3.9 kB view details)

Uploaded Source

File details

Details for the file rohrleitung-0.1.1.tar.gz.

File metadata

File hashes

Hashes for rohrleitung-0.1.1.tar.gz
Algorithm Hash digest
SHA256 3bb1e7dadd0d8bcdbdc747326bca5879d0385e1b2848372730bef6ee27800336
MD5 a3e0ea58492f307c4eb286b3fbdaad1a
BLAKE2b-256 30df361998adeee46f04a19ebe3ebf285f8e076f4114e4cac1ed369ec6bbad68

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page