Framework to implement collection pipelines in Python.
Experimental collection pipeline pattern implementation in Python.
cat('/tmp/file.txt') | filter('some line') | filter('some line 2') | out()
The library works only on Python 3. There are no plans to support earlier Python versions.
Every pipeline starts with a data source (cat(), http(), etc.), followed by optional transformation/filtering processors and an output processor.
            Source            Transformations     Output
    +---------------------+----------------------+-------+
    |                     |                      |       |
    v                     v                      v       v
    echo('18.104.22.168') | split('.') | count() | out()
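To make the source / transformation / output layout more concrete, here is a minimal sketch of how `|` chaining can be built in plain Python by overloading `__or__`. The classes `Stage`, `Echo`, `Double`, `Output` and `Collect` are hypothetical stand-ins written for this illustration; they are not the classes shipped in collection_pipelines, and the library's real wiring may differ.

```python
# Illustrative stand-ins only; these are NOT the collection_pipelines classes.

class Stage:
    """A pipeline element that can be chained with `|`."""

    def __init__(self):
        self.receiver = None  # the next stage in the chain
        self.source = None    # the first stage in the chain

    def __or__(self, other):
        # `a | b` makes b the receiver of a and returns b, so chaining continues.
        self.receiver = other
        other.source = self.source if self.source is not None else self
        if isinstance(other, Output):
            # Attaching an output is what starts the data flow in this sketch.
            other.source.on_begin()
        return other

    def send(self, item):
        self.process(item)

    def process(self, item):
        # Default behaviour: pass the item on unchanged.
        self.receiver.send(item)


class Echo(Stage):
    """Source: pushes every element of a list into the pipeline."""

    def __init__(self, items):
        super().__init__()
        self.items = items

    def on_begin(self):
        for item in self.items:
            self.receiver.send(item)


class Double(Stage):
    """Transformation: forwards each item multiplied by two."""

    def process(self, item):
        self.receiver.send(item * 2)


class Output(Stage):
    """Marker base class for stages that end a pipeline."""


class Collect(Output):
    """Output: stores the items instead of forwarding them."""

    def __init__(self):
        super().__init__()
        self.items = []

    def process(self, item):
        self.items.append(item)


sink = Echo([1, 2, 3]) | Double() | Collect()
print(sink.items)  # [2, 4, 6]
```

Because `|` is left-associative, `Echo(...) | Double()` is evaluated first and returns the `Double` stage, which is then linked to `Collect`. Nothing flows until the output is attached.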
You can save a partial pipe and reuse it later.
    from collection_pipelines import *

    word_list = echo('word1 word2 word3') | words()
    word_list | out()  # will print the words to stdout
    word_list | filter('word2') | freq() | bar()  # will draw a bar chart for word frequencies

If you want to write your own sources, transformers or outputs, there are a couple of base classes you should get familiar with.
Let’s implement a very basic filter that forwards only even numbers.
    from collection_pipelines import *

    class even(CollectionPipelineProcessor):
        def process(self, item):
            if isinstance(item, int):
                if item % 2 == 0:
                    self.receiver.send(item)

    echo([1, 2, 3]) | even() | out()
Every source object must extend the CollectionPipelineSource class and implement the on_begin() method.
E.g. this source sends a single random integer into the pipeline:
    import random

    class rand_int(CollectionPipelineSource):
        def on_begin(self):
            self.receiver.send(random.randint(0, 1000))
            self.receiver.close()
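A source is not limited to one item. The sketch below shows the same on_begin() shape sending several items before closing. To keep it runnable without the library, `countdown` does not extend CollectionPipelineSource and the receiver is a hypothetical `RecordingReceiver` attached by hand. Both are assumptions made for this illustration, as is the reading of close() as "no more items will follow".

```python
class RecordingReceiver:
    """Hypothetical stand-in for the next pipeline stage."""

    def __init__(self):
        self.items = []
        self.closed = False

    def send(self, item):
        self.items.append(item)

    def close(self):
        self.closed = True


class countdown:
    """In a real pipeline this would extend CollectionPipelineSource."""

    def __init__(self, start):
        self.start = start
        self.receiver = None  # attached by hand below (assumption)

    def on_begin(self):
        # Send start, start - 1, ..., 1 and then close the receiver.
        for n in range(self.start, 0, -1):
            self.receiver.send(n)
        self.receiver.close()


source = countdown(3)
source.receiver = RecordingReceiver()
source.on_begin()
print(source.receiver.items)  # [3, 2, 1]
```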
Every transformer and filter is an instance of a class that extends CollectionPipelineProcessor. All the work is done in the process() method, which receives each item passing through the pipeline.
You can ignore an item, transform it, or simply pass it forward. To send an item further down the pipe, call self.receiver.send(item).
E.g. if you wanted to multiply all items by two, you could implement the method like this:
    def process(self, item):
        self.receiver.send(item * 2)
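The three options (ignore, transform, pass forward) can live in a single process() method. The following is a sketch under stated assumptions: `normalize` is a hypothetical processor that would extend CollectionPipelineProcessor in a real pipeline, and `RecordingReceiver` is a hypothetical stand-in for the next stage, attached by hand so the snippet runs without the library.

```python
class RecordingReceiver:
    """Hypothetical stand-in for the next stage: remembers what it was sent."""

    def __init__(self):
        self.items = []

    def send(self, item):
        self.items.append(item)


class normalize:
    """In a real pipeline this would extend CollectionPipelineProcessor."""

    def __init__(self):
        self.receiver = None  # attached by hand below (assumption)

    def process(self, item):
        if item is None:
            return  # ignore: nothing is sent downstream
        if isinstance(item, str):
            # transform: strip surrounding whitespace and lowercase
            self.receiver.send(item.strip().lower())
        else:
            # pass forward unchanged
            self.receiver.send(item)


stage = normalize()
stage.receiver = RecordingReceiver()
for item in [None, '  Word ', 3]:
    stage.process(item)
print(stage.receiver.items)  # ['word', 3]
```

Since process() depends only on self.receiver.send(), a stand-in receiver like this is also a simple way to unit-test a processor in isolation.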
Pipeline output processors must extend the CollectionPipelineOutput class. Output processors are special in that they don’t forward items any further; instead, they trigger the pipeline execution.
Implementing an output processor is very similar to implementing a transformer.
    class stdout(CollectionPipelineOutput):
        def process(self, item):
            print(item)

Such a processor prints each item as soon as it receives it. There’s also a special method, on_done(), which is called once all items in the pipeline have been processed.
E.g. if you wanted an output processor to print items only after it has received all of them, the class would look like this:
    class stdout(CollectionPipelineOutput):
        def __init__(self):
            self.items = []  # buffer for items until the pipeline is done

        def process(self, item):
            self.items.append(item)

        def on_done(self):
            for item in self.items:
                print(item)
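on_done() is also the natural place for results that are only known once every item has been seen, such as an average. The sketch below assumes a hypothetical `mean` output. It would extend CollectionPipelineOutput in a real pipeline, and the calls at the bottom stand in for what the pipeline would do (process() once per item, then on_done() once).

```python
class mean:
    """In a real pipeline this would extend CollectionPipelineOutput."""

    def __init__(self):
        self.count = 0
        self.total = 0
        self.result = None

    def process(self, item):
        # Accumulate running totals; nothing is printed yet.
        self.count += 1
        self.total += item

    def on_done(self):
        # Called once after the last item: the average is now known.
        if self.count:
            self.result = self.total / self.count
        print(self.result)


# Simulate the pipeline by hand (assumption: this is the call order).
sink = mean()
for item in [2, 4, 9]:
    sink.process(item)
sink.on_done()  # prints 5.0
```

Unlike the buffering stdout example, this version keeps only two running totals rather than every item, which matters for long inputs.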