A pipelining framework designed for data analysis, but useful for other applications as well
Project description
A pipelining framework for Python. Developers can create nodes and chain them together to build pipelines.
Classes that extend ```Node``` must implement a ```run``` method, which is called whenever new data is available.
A simple example
```python
from pyPipelining import Node, Pipeline


class Generate(Node):
    def setup(self):
        self.pos = 0

    def run(self, data):
        # Source node: emit 0, 1, ..., size - 1, then close the stream
        if self.pos < self.size:
            self.emit(self.pos)
            self.pos = self.pos + 1
        else:
            self.close()


class Square(Node):
    def run(self, data):
        # Emit the square of each incoming value
        self.emit(data**2)


pipeline = Pipeline(Generate("gen", size=10) | Square("square"))
print(pipeline)
pipeline.run()
```
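The README does not show how ```Node``` and ```Pipeline``` work internally. The sketch below is a toy, self-contained re-implementation written only to illustrate the behaviour the example relies on: ```|``` chaining nodes, ```emit``` pushing values downstream, and a source node being driven until it calls ```close```. It does not import or reproduce pyPipelining. The ```Chain``` class, the ```downstream``` attribute, the ```Collect``` sink, the rule that constructor keyword arguments (such as ```size=10```) become attributes, and calling the source's ```run``` with ```None``` are all assumptions made for illustration.

```python
from typing import Any, List, Optional, Union


class Node:
    """Toy stand-in for pyPipelining's Node; NOT the library's implementation."""

    def __init__(self, name: str, **kwargs: Any) -> None:
        self.name = name
        self.downstream: Optional["Node"] = None
        self.closed = False
        # Assumption: keyword arguments such as size=10 become attributes.
        for key, value in kwargs.items():
            setattr(self, key, value)
        self.setup()

    def setup(self) -> None:
        """Hook for initialising per-node state; does nothing by default."""

    def run(self, data: Any) -> None:
        raise NotImplementedError("subclasses must implement run()")

    def emit(self, value: Any) -> None:
        # Hand the value straight to the next node in the chain, if any.
        if self.downstream is not None:
            self.downstream.run(value)

    def close(self) -> None:
        # Mark the node as finished; the toy Pipeline stops driving it.
        self.closed = True

    def __or__(self, other: "Node") -> "Chain":
        return Chain([self, other])


class Chain:
    """Ordered list of nodes built up by the | operator (hypothetical)."""

    def __init__(self, nodes: List[Node]) -> None:
        self.nodes = nodes

    def __or__(self, other: Node) -> "Chain":
        return Chain(self.nodes + [other])


class Pipeline:
    def __init__(self, chain: Union[Chain, Node]) -> None:
        self.nodes = chain.nodes if isinstance(chain, Chain) else [chain]
        # Wire each node to its successor.
        for upstream, downstream in zip(self.nodes, self.nodes[1:]):
            upstream.downstream = downstream

    def __str__(self) -> str:
        return " | ".join(node.name for node in self.nodes)

    def run(self) -> None:
        # Assumption: the first node is a source with no input, so it is
        # called with None repeatedly until it closes itself.
        source = self.nodes[0]
        while not source.closed:
            source.run(None)


class Generate(Node):
    def setup(self):
        self.pos = 0

    def run(self, data):
        if self.pos < self.size:
            self.emit(self.pos)
            self.pos = self.pos + 1
        else:
            self.close()


class Square(Node):
    def run(self, data):
        self.emit(data**2)


class Collect(Node):
    """Hypothetical sink that records everything it receives."""

    def setup(self):
        self.items = []

    def run(self, data):
        self.items.append(data)


sink = Collect("collect")
pipeline = Pipeline(Generate("gen", size=5) | Square("square") | sink)
print(pipeline)  # gen | square | collect
pipeline.run()
print(sink.items)  # [0, 1, 4, 9, 16]
```

This version simply calls each downstream ```run``` synchronously inside ```emit```; the real library may schedule nodes differently (for example with queues or threads).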
Nodes can also specify a batch size that dictates how much data is pushed to the node at a time.
The following example builds on the previous one. Here, ```batch_size``` is set in the node's ```setup``` method; alternatively, it can be passed when creating the node (e.g. ```Printer("print", batch_size=5)```).
```python
class Printer(Node):
    def setup(self):
        # Ask for all upstream data to be delivered in a single batch
        self.batch_size = Node.BATCH_SIZE_ALL

    def run(self, data):
        print(data)


pipeline = Pipeline(Generate("gen", size=10) | Square("square") | Printer("print"))
print(pipeline)
pipeline.run()
```
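The README does not spell out exactly how batching behaves. A minimal sketch of one plausible interpretation, assuming that a node with a numeric ```batch_size``` receives lists of that many items, that ```BATCH_SIZE_ALL``` means "buffer until the upstream closes", and that any leftover partial batch is delivered on close. ```BatchBuffer``` and the sentinel value ```-1``` are hypothetical; the library's actual ```Node.BATCH_SIZE_ALL``` value and flushing rules may differ.

```python
from typing import Any, Callable, List

# Hypothetical sentinel; the real Node.BATCH_SIZE_ALL value is not documented here.
BATCH_SIZE_ALL = -1


class BatchBuffer:
    """Groups incoming items into lists before handing them to a callback."""

    def __init__(self, batch_size: int, deliver: Callable[[List[Any]], None]) -> None:
        self.batch_size = batch_size
        self.deliver = deliver
        self.pending: List[Any] = []

    def push(self, item: Any) -> None:
        self.pending.append(item)
        # A fixed-size batch is delivered as soon as it is full;
        # BATCH_SIZE_ALL keeps buffering until flush() is called.
        if self.batch_size != BATCH_SIZE_ALL and len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        # Deliver whatever is buffered (when full, or when upstream closes).
        if self.pending:
            batch, self.pending = self.pending, []
            self.deliver(batch)


squares = [n**2 for n in range(10)]

for size in (5, BATCH_SIZE_ALL):
    received: List[List[int]] = []
    buffer = BatchBuffer(size, received.append)
    for value in squares:
        buffer.push(value)
    buffer.flush()  # upstream closed: hand over any remainder
    print(size, received)
```

With a batch size of 5 the ten squares arrive as two lists of five; with ```BATCH_SIZE_ALL``` they arrive as one list of ten.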
Download files
Source Distribution
File details
Details for the file ```pyPipelineStream-0.2.0.tar.gz```.
File metadata
- Download URL: pyPipelineStream-0.2.0.tar.gz
- Upload date:
- Size: 2.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | cc4e78a2daca2575457a16794adf872421bfa6a9572e4066b4490bae3feb6072 |
| MD5 | d884607eb6cb7b6af721cf150ffe030f |
| BLAKE2b-256 | 1128f56f0e1025e7cbceca320d2b1727cdf0bbbb57c9a66f2ef1f426bdbe0f1c |
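Before installing a downloaded archive, its digests can be compared against the published hashes. A short sketch using only the standard ```hashlib``` module; ```file_digests``` and ```verify``` are illustrative helper names, and BLAKE2b-256 is computed as BLAKE2b with a 32-byte digest.

```python
import hashlib
from typing import Any, Callable, Dict

# Digests published for pyPipelineStream-0.2.0.tar.gz.
EXPECTED: Dict[str, str] = {
    "SHA256": "cc4e78a2daca2575457a16794adf872421bfa6a9572e4066b4490bae3feb6072",
    "MD5": "d884607eb6cb7b6af721cf150ffe030f",
    "BLAKE2b-256": "1128f56f0e1025e7cbceca320d2b1727cdf0bbbb57c9a66f2ef1f426bdbe0f1c",
}

# Factories for fresh hash objects; BLAKE2b-256 means a 32-byte BLAKE2b digest.
HASHERS: Dict[str, Callable[[], Any]] = {
    "SHA256": hashlib.sha256,
    "MD5": hashlib.md5,
    "BLAKE2b-256": lambda: hashlib.blake2b(digest_size=32),
}


def file_digests(path: str, chunk_size: int = 65536) -> Dict[str, str]:
    """Compute every digest in HASHERS for the file at `path`, reading it in chunks."""
    hashers = {name: make() for name, make in HASHERS.items()}
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            for hasher in hashers.values():
                hasher.update(chunk)
    return {name: hasher.hexdigest() for name, hasher in hashers.items()}


def verify(path: str, expected: Dict[str, str] = EXPECTED) -> bool:
    """Return True only if every expected digest matches the computed one."""
    actual = file_digests(path)
    return all(actual[name] == digest.lower() for name, digest in expected.items())
```

For example, ```verify("pyPipelineStream-0.2.0.tar.gz")``` returns ```True``` only if all three digests match. pip can also enforce SHA256 checks itself through its hash-checking mode (```--require-hashes``` together with ```--hash=sha256:...``` entries in a requirements file).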