A pipelining framework designed for data analysis that can also be useful in other applications
Project description
A parallel pipelining framework for Python. Developers can create nodes and chain them together to create pipelines.
Classes that extend Node must implement a run method, which is called whenever new data is available.
Installation
pip install pypiper
Example Usage
from pyPiper import Node, Pipeline

class Generate(Node):
    def setup(self, size):
        self.size = size
        self.pos = 0

    def run(self, data):
        # Emit 0 .. size-1, one value per call, then close the node
        if self.pos < self.size:
            self.emit(self.pos)
            self.pos = self.pos + 1
        else:
            self.close()

class Square(Node):
    def run(self, data):
        # Pass the square of each incoming value downstream
        self.emit(data**2)

pipeline = Pipeline(Generate("gen", size=10) | Square("square"))
print(pipeline)
pipeline.run()
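The `|` syntax in the example above is ordinary Python operator overloading. As a rough mental model only, and not pyPiper's actual implementation, the toy classes below show how `__or__` can collect nodes into a chain and how a sequential driver can call run on each stage whenever upstream data arrives. MiniNode, Chain, Count and SquareIt are hypothetical names introduced for this sketch.

```python
class MiniNode:
    """Toy stand-in for a pipeline node (an assumption, not pyPiper's Node)."""

    def __init__(self, name):
        self.name = name
        self.outbox = []     # values emitted during the current run() call
        self.closed = False

    def emit(self, value):
        self.outbox.append(value)

    def close(self):
        self.closed = True

    def run(self, data):
        raise NotImplementedError

    def __or__(self, other):
        # "a | b" starts a chain with this node and appends the right operand
        return Chain([self]) | other


class Chain:
    def __init__(self, stages):
        self.stages = list(stages)

    def __or__(self, other):
        more = other.stages if isinstance(other, Chain) else [other]
        return Chain(self.stages + more)

    def run(self):
        source, rest = self.stages[0], self.stages[1:]
        results = []
        while not source.closed:
            source.run(None)                 # a source node receives no input
            batch, source.outbox = source.outbox, []
            for stage in rest:
                next_batch = []
                for item in batch:
                    stage.run(item)          # run() is called once per new item
                    next_batch.extend(stage.outbox)
                    stage.outbox = []
                batch = next_batch
            results.extend(batch)
        return results


class Count(MiniNode):
    def __init__(self, name, size):
        super().__init__(name)
        self.size, self.pos = size, 0

    def run(self, data):
        if self.pos < self.size:
            self.emit(self.pos)
            self.pos += 1
        else:
            self.close()


class SquareIt(MiniNode):
    def run(self, data):
        self.emit(data ** 2)


chain = Count("gen", size=5) | SquareIt("square")
squares = chain.run()
print(squares)
```

The sketch runs everything in one process and collects the final stage's output in a list; a real framework would also handle branching, batching and parallel workers.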
Nodes can also specify a batch size, which dictates how many items are pushed to the node at a time. The following builds on the previous example and sets batch_size in the node's setup method. Alternatively, it can be set when creating the node (e.g. Printer("print", batch_size=5)).
class Printer(Node):
    def setup(self):
        # Ask for all upstream data to be delivered in a single batch
        self.batch_size = Node.BATCH_SIZE_ALL

    def run(self, data):
        print(data)

pipeline = Pipeline(Generate("gen", size=10) | Square("square") | Printer("print"))
print(pipeline)
pipeline.run()
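To make the effect of a batch size concrete, here is a small standard-library sketch of the grouping idea. It does not use pyPiper: the helper name `batches`, the use of `None` to stand in for Node.BATCH_SIZE_ALL, and the delivery of a shorter final batch are all assumptions made for illustration.

```python
from itertools import islice


def batches(stream, batch_size):
    """Yield lists of up to batch_size items; None means everything at once."""
    it = iter(stream)
    if batch_size is None:
        # Stand-in for "deliver all data in one batch"
        yield list(it)
        return
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        yield chunk  # the last chunk may be shorter than batch_size


squares = [n ** 2 for n in range(10)]
print(list(batches(squares, 5)))     # two lists of five values each
print(list(batches(squares, None)))  # one list holding all ten values
```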
Parallel Execution
To process pipelines in parallel, pass n_threads > 1 when creating the pipeline. Despite the parameter name, parallel execution is done using multiprocessing, so it is well suited to CPU-intensive tasks such as audio processing and feature extraction. For example:
class Generate(Node):
    def setup(self, size):
        self.size = size  # run() compares self.pos against this limit
        self.pos = 0

    def run(self, data):
        if self.pos < self.size:
            self.emit(self.pos)
            self.pos = self.pos + 1
        else:
            self.close()

pipeline = Pipeline(Generate("gen", size=10) | Square("square") | Printer("print"), n_threads=2)
print(pipeline)
pipeline.run()
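Because parallel execution relies on multiprocessing, the usual multiprocessing conventions are worth keeping in mind. On platforms that start workers with the spawn method (Windows, and macOS by default), a script that launches worker processes should do so under an `if __name__ == "__main__":` guard. The sketch below shows that pattern using only the standard library; it is not pyPiper code, and `square` and `run_parallel` are hypothetical names.

```python
import multiprocessing as mp


def square(x):
    # Defined at module level so worker processes can locate it by name
    return x ** 2


def run_parallel(values, n_workers=2):
    """Square every value using a pool of worker processes."""
    with mp.Pool(processes=n_workers) as pool:
        # map() splits the input across workers and preserves input order
        return pool.map(square, values)


if __name__ == "__main__":
    # Without this guard, spawn-based platforms would re-run the pool setup
    # each time a worker re-imports the script
    print(run_parallel(range(10)))
```

The same guard would presumably wrap the Pipeline(...) construction and pipeline.run() call in a script that uses n_threads > 1.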
Stream Names
You can also name input and output streams. For example:
gen = EvenOddGenerate("gen", size=20, out_streams=["even", "odd"])
double = Double("double", out_streams="num", in_streams="even")
square = Square("square", out_streams="num", in_streams="odd")
printer1 = Printer("p1", in_streams="num", batch_size=Node.BATCH_SIZE_ALL)
printer2 = Printer("p2", in_streams="num", batch_size=Node.BATCH_SIZE_ALL)
p = Pipeline(gen | [double | printer1, square | printer2], quiet=False)
p.run()
EvenOddGenerate generates a pair of numbers. Using the out_streams parameter, we name the first number even and the second number odd. When initializing the double and square nodes, we tell double to take the even number and square to take the odd number.
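The routing described here can be modelled in plain Python. The sketch below is not pyPiper's implementation: `label` and `select` are hypothetical helpers, and it assumes that the generator emits (n, n + 1) pairs and that Double multiplies its input by two.

```python
def label(values, out_streams):
    """Attach stream names to the values a node emits."""
    return dict(zip(out_streams, values))


def select(record, in_streams):
    """Pick the stream (or streams) a downstream node subscribed to."""
    if isinstance(in_streams, str):
        return record[in_streams]
    return [record[name] for name in in_streams]


doubled, squared = [], []
for n in range(0, 6, 2):
    record = label((n, n + 1), ["even", "odd"])
    doubled.append(select(record, "even") * 2)   # what "double" would see
    squared.append(select(record, "odd") ** 2)   # what "square" would see

print(doubled)
print(squared)
```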
If multiple output streams are passed into a node, by default they arrive at the node as a list. For example,
gen = EvenOddGenerate("gen", size=10, out_streams=["even", "odd"])
printer = Printer("p1", batch_size=1)
p = Pipeline(gen | printer, quiet=False)
p.run()
This will output:
[0,1],
[2,3],
...
However, you can split the streams by listing their names in the downstream node's in_streams parameter:
```python
gen = EvenOddGenerate("gen", size=20, out_streams=["even", "odd"])
printer = Printer("p1", in_streams=["even", "odd"], batch_size=1)
p = Pipeline(gen | printer, quiet=False)
p.run()
```
This will output:
0,
1,
2,
3,
...
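The difference between the two outputs can be summarised with a small model. This is a sketch of the behaviour shown above, not of pyPiper's internals; `deliver` is a hypothetical function, and the per-name delivery order is assumed to follow the order of in_streams.

```python
def deliver(pairs, out_streams, in_streams=None):
    """Yield what a downstream node would receive for each emitted tuple."""
    for values in pairs:
        if in_streams is None:
            # Default: all output streams arrive together as one list
            yield list(values)
        else:
            # Named inputs: each requested stream arrives as its own item
            record = dict(zip(out_streams, values))
            for name in in_streams:
                yield record[name]


pairs = [(0, 1), (2, 3)]
print(list(deliver(pairs, ["even", "odd"])))
print(list(deliver(pairs, ["even", "odd"], in_streams=["even", "odd"])))
```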
Progress Updates
When calling pipeline.run(), you can provide a callback function for progress updates. Whenever the pipeline makes progress, it calls this function with the number of items that have been processed so far and the total number of items that need to be processed. For example, if you were using a tqdm progress bar, you could use the following code:
from tqdm import tqdm

class TqdmUpdate(tqdm):
    # Adapter so that (done, total) progress reports can drive a tqdm bar
    def update(self, done, total_size=None):
        if total_size is not None:
            self.total = total_size
        self.n = done
        super().refresh()

if __name__ == '__main__':
    # Double and Sleep are Node subclasses that are not shown here
    gen = Generate("gen", size=10)
    double = Double("double")
    sleeper = Sleep("sleep")
    p = Pipeline(gen | [double, sleeper], n_threads=4, quiet=True)
    with TqdmUpdate(desc="Progress") as pbar:
        p.run(update_callback=pbar.update)
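If tqdm is not available, a plain function can serve as the callback. The sketch below assumes the callback receives the processed count and, optionally, the total, as described above; `make_progress_printer` and `on_progress` are hypothetical names, and the loop at the end only simulates the calls a pipeline might make.

```python
def make_progress_printer(label="Progress"):
    """Return a progress callback and the list of lines it has printed."""
    lines = []

    def on_progress(done, total=None):
        if total:
            line = f"{label}: {done}/{total} ({100 * done // total}%)"
        else:
            # Total unknown: report only the running count
            line = f"{label}: {done} items"
        lines.append(line)
        print(line)

    return on_progress, lines


callback, lines = make_progress_printer()
for done in (1, 5, 10):
    callback(done, 10)   # simulated progress reports
callback(3)              # simulated report without a known total
```

Under that assumption, the function could be passed in the same way as the tqdm adapter, for example p.run(update_callback=callback).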
Projects Using PyPiper
COVFEFE: A feature extraction tool focusing on lexical, syntactic and pragmatic features from text and audio features from audio.