A powerful parallel pipelining tool
Olympipe
This project makes pipelines easy to use for parallel computing, building on the standard multiprocessing module. It uses type checking to validate your data processing steps from the start.
Basic usage
Each pipeline starts from an iterator as a source of packets (a list, a tuple, or any more complex iterator). You then extend the pipeline by adding steps such as .task(<function>). The pipeline processes join the main process when you call the .wait_for_results() or .wait_for_completion() functions.
from olympipe import Pipeline
def times_2(x: int) -> int:
    return x * 2
p = Pipeline(range(10))
p1 = p.task(times_2) # Multiply each packet by 2
# or
p1 = p.task(lambda x: x * 2) # using a lambda function
res = p1.wait_for_result()
print(res) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Filtering
You can choose which packets to keep with .filter(<keep_function>), passing a function that returns True or False when applied to each packet.
from olympipe import Pipeline
p = Pipeline(range(20))
p1 = p.filter(lambda x: x % 2 == 0) # Keep even numbers
p2 = p1.batch(2) # Group into lists of 2 elements
res = p2.wait_for_result()
print(res) # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]
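To make the expected output above concrete, here is a sequential, plain-Python sketch of the same filter-then-batch logic. It is not olympipe's implementation and runs in a single process; the helper names keep_if and batched are hypothetical, and emitting a shorter final group is an assumption of this sketch.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def keep_if(items: Iterable[T], keep: Callable[[T], bool]) -> Iterator[T]:
    """Yield only the items for which keep(item) is true."""
    return (item for item in items if keep(item))


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Group consecutive items into lists of `size` elements.

    Assumption of this sketch: a shorter final group is still emitted.
    """
    iterator = iter(items)
    while True:
        group = list(islice(iterator, size))
        if not group:
            return
        yield group


evens = keep_if(range(20), lambda x: x % 2 == 0)
result = list(batched(evens, 2))
print(result)  # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]
```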
Inline declaration
You can chain declarations to have a more readable pipeline.
from olympipe import Pipeline
[res] = Pipeline(range(20)).filter(lambda x: x % 2 == 0).batch(2).wait_for_results()
print(res) # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]
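Chaining works when every step returns a new pipeline object. The following is a minimal single-process sketch of that fluent pattern; MiniPipe and its collect method are hypothetical names used only for illustration and say nothing about how olympipe is built internally.

```python
from typing import Any, Callable, Iterable, List


class MiniPipe:
    """Toy fluent pipeline: each step wraps the previous one lazily."""

    def __init__(self, source: Iterable[Any]) -> None:
        self._source = source

    def task(self, func: Callable[[Any], Any]) -> "MiniPipe":
        # Returning a new MiniPipe is what allows the next call to be chained.
        return MiniPipe(func(x) for x in self._source)

    def filter(self, keep: Callable[[Any], bool]) -> "MiniPipe":
        return MiniPipe(x for x in self._source if keep(x))

    def collect(self) -> List[Any]:
        # Nothing is computed until the chain is consumed here.
        return list(self._source)


result = MiniPipe(range(10)).filter(lambda x: x % 2 == 0).task(lambda x: x * 10).collect()
print(result)  # [0, 20, 40, 60, 80]
```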
Debugging
Insert the .debug() function anywhere in the pipe to print packets as they arrive at that point.
from olympipe import Pipeline
p = Pipeline(range(20))
p1 = p.filter(lambda x: x % 2 == 0).debug() # Keep even numbers
p2 = p1.batch(2).debug() # Group into lists of 2 elements
p2.wait_for_completion()
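The idea of a debug step can be pictured as a pass-through "tap": it prints each packet and forwards it unchanged. Below is a small plain-Python sketch of that idea; the tap function and its label parameter are hypothetical, and the printed format is not the one olympipe uses.

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def tap(items: Iterable[T], label: str = "debug") -> Iterator[T]:
    """Print every item as it passes through, then yield it unchanged."""
    for item in items:
        print(f"[{label}] {item!r}")
        yield item


evens = (x for x in range(6) if x % 2 == 0)
seen = list(tap(evens, label="after filter"))
# The tap does not alter the data flowing through it.
print(seen)  # [0, 2, 4]
```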
Real time processing (for sound, video...)
Use the .temporal_batch(<seconds_float>) pipe to aggregate the packets received at this point every <seconds_float> seconds.
import time
from olympipe import Pipeline
def delay(x: int) -> int:
    time.sleep(0.1)
    return x
p = Pipeline(range(20)).task(delay) # Wait 0.1 s for each queue element
p1 = p.filter(lambda x: x % 2 == 0) # Keep even numbers
p2 = p1.temporal_batch(1.0) # Group the packets received during each 1.0 s window
[res] = p2.wait_for_results()
print(res) # [[0, 2, 4, 6, 8], [10, 12, 14, 16, 18], []]
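The grouping in the example above follows from arrival times: with 0.1 s per packet, the even packets 0 to 8 arrive within the first second and 10 to 18 within the next. The sketch below models this with simulated arrival timestamps in milliseconds instead of real sleeping, so it is deterministic. The window_batches helper is hypothetical, not olympipe's code, and it does not reproduce the trailing empty batch shown in the example output.

```python
from typing import Iterable, List, Tuple


def window_batches(arrivals: Iterable[Tuple[int, int]], window_ms: int) -> List[List[int]]:
    """Group (arrival_ms, packet) pairs, sorted by time, into fixed time windows."""
    batches: List[List[int]] = []
    current: List[int] = []
    current_window = 0
    for arrival_ms, packet in arrivals:
        index = arrival_ms // window_ms
        # Close every window that ended before this packet arrived
        # (a window with no packets yields an empty list).
        while index > current_window:
            batches.append(current)
            current = []
            current_window += 1
        current.append(packet)
    batches.append(current)  # Flush the last, still-open window.
    return batches


# Packet i is assumed to arrive at 100 * (i + 1) ms; only even packets are kept.
arrivals = [(100 * (i + 1), i) for i in range(20) if i % 2 == 0]
batches = window_batches(arrivals, window_ms=1000)
print(batches)  # [[0, 2, 4, 6, 8], [10, 12, 14, 16, 18]]
```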
Using classes in a pipeline
You can add a stateful class instance to a pipeline. The method used is type-checked as well to ensure data coherence. Use the .class_task(<Class>, <Class.method>, ...) method, where Class.method is the method that will process each packet.
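from olympipe import Pipeline
item_count = 5
class StockPile:
    def __init__(self, mul: int):
        self.mul = mul  # Multiplier applied to each incoming packet
        self.last = 0  # Value held back until the next packet arrives
    def pile(self, num: int) -> int:
        # Return the previously stored value and store the new one
        out = self.last
        self.last = num * self.mul
        return out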
p1 = Pipeline(range(item_count))
p2 = p1.class_task(StockPile, StockPile.pile, [3])
[res] = p2.wait_for_results()
print(res) # [0, 0, 3, 6, 9]
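The output above is easier to follow with a sequential equivalent. The sketch below assumes that the class is instantiated once from the argument list ([3]) and that packets are fed to the method in order; it runs in a single process and does not use olympipe.

```python
class StockPile:
    def __init__(self, mul: int) -> None:
        self.mul = mul
        self.last = 0

    def pile(self, num: int) -> int:
        # Emit the value stored on the previous call, then store the new one.
        out = self.last
        self.last = num * self.mul
        return out


worker = StockPile(*[3])  # Assumption: one instance, built from the args list
results = [worker.pile(n) for n in range(5)]
print(results)  # [0, 0, 3, 6, 9]
```

Each output is the previous input multiplied by 3, which is why the first two values are both 0 and the last input (4) never appears in the results.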
This project is still at an early stage, and feedback is very welcome.
File details
Details for the file olympipe-1.4.5.tar.gz.
File metadata
- Download URL: olympipe-1.4.5.tar.gz
- Upload date:
- Size: 12.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.7.1 CPython/3.12.1 Linux/5.4.109+
File hashes
Algorithm | Hash digest
---|---
SHA256 | 8ae77f9499f942189256f11f7445fce3d976fcb576266a9ba6e562f002e1e34c
MD5 | 1215080be30f743d1884ef04539d9710
BLAKE2b-256 | 0771aa16adac918f964e0e3b3702df8e6634fe9a178eedcbcf63b7635d6616a8
File details
Details for the file olympipe-1.4.5-py3-none-any.whl.
File metadata
- Download URL: olympipe-1.4.5-py3-none-any.whl
- Upload date:
- Size: 17.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.7.1 CPython/3.12.1 Linux/5.4.109+
File hashes
Algorithm | Hash digest
---|---
SHA256 | e7623c856cacfff0b0b536364e02df2ad8661bff03276edaef0125e89f454d7f
MD5 | ec4a5319038c5b4a498c461772997c06
BLAKE2b-256 | ad237f390c403d9ea5222b829a0f28479ffe9f3e144d6c35a3529123ab66c34b