A powerful parallel pipelining tool
Olympipe
Zero-boilerplate parallel pipelines for Python.
Turn any iterable into a multi-process pipeline with a single chained expression. Each `.task()` step runs in its own worker process (or `count` processes), bypassing the GIL: no `multiprocessing.Pool`, no queues, no plumbing.
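```python
from olympipe import Pipeline


def heavy_compute(x: int) -> int:
    # Placeholder for your own CPU-bound function
    return sum(i * i for i in range(x))


results = (
    Pipeline(range(1000))
    .task(heavy_compute, count=8)  # 8 parallel workers
    .filter(lambda x: x > 0)
    .batch(32)
    .wait_for_result()
)
```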
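For comparison, here is roughly what the opening example takes with the standard library alone. It is a hand-written sketch, not how Olympipe works internally; `heavy_compute`, `batched` and `run_by_hand` are hypothetical names, and unlike the pipeline above, filtering and batching here run sequentially in the parent process.

```python
import multiprocessing as mp
from typing import Iterable, Iterator, List


def heavy_compute(x: int) -> int:
    # Hypothetical CPU-bound function standing in for real work.
    return sum(i * i for i in range(x))


def batched(items: Iterable[int], size: int) -> Iterator[List[int]]:
    # Group consecutive items into lists of `size`; the last list may be shorter.
    batch: List[int] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def run_by_hand(n: int = 1000, workers: int = 8) -> List[List[int]]:
    with mp.Pool(processes=workers) as pool:
        # imap streams results back in input order.
        computed = pool.imap(heavy_compute, range(n), chunksize=16)
        kept = (x for x in computed if x > 0)
        return list(batched(kept, 32))
```

Call `run_by_hand()` from under an `if __name__ == "__main__":` guard so that start methods which re-import the main module (such as `spawn`) work; the pool, the generator plumbing and the batching helper are exactly the code the one-liner replaces.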
Installation
```bash
pip install olympipe
```
Why Olympipe?
- Chainable API: compose steps fluently, like pandas method chains or stream APIs
- True multiprocessing: each step runs in separate process(es), so CPU-bound work is not limited by the GIL
- PyTorch-safe: auto-detects torch and switches to the `spawn` start method to avoid CUDA/DataLoader deadlocks
- Type-safe: fully typed, so many mismatches between steps are flagged by your IDE or type checker before runtime
- Batteries included: split/gather, time windows, disk caching, and an HTTP server source
Basic usage
```python
from olympipe import Pipeline

results = (
    Pipeline(range(21))
    .task(lambda x: x * 2)  # transform: 0, 2, ..., 40
    .filter(lambda x: x % 8 == 0)  # keep multiples of 8
    .batch(3)  # group into lists of 3
    .wait_for_result()
)
# [[0, 8, 16], [24, 32, 40]]
```
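Because each step in this example runs with a single worker, the expected output can be cross-checked with a plain sequential version of the chain. This sketch uses only built-ins; `sequential_chain` is a hypothetical helper, and it assumes single-worker steps preserve input order and that a trailing partial batch is kept.

```python
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> List[List[T]]:
    # Group consecutive items into lists of `size`; a shorter final list is kept.
    out: List[List[T]] = []
    current: List[T] = []
    for item in items:
        current.append(item)
        if len(current) == size:
            out.append(current)
            current = []
    if current:
        out.append(current)
    return out


def sequential_chain(source: Iterable[int], modulus: int, size: int) -> List[List[int]]:
    doubled = (x * 2 for x in source)                # .task(lambda x: x * 2)
    kept = (x for x in doubled if x % modulus == 0)  # .filter(...)
    return batched(kept, size)                       # .batch(size)


print(sequential_chain(range(21), 8, 3))  # [[0, 8, 16], [24, 32, 40]]
```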
Parallel workers
Scale any step horizontally by passing `count=N`. The N workers of a step pull from one shared input queue and push to one shared output queue, so no manual coordination is needed.
```python
import time

from olympipe import Pipeline


def slow_io(url: str) -> bytes:
    # Simulate a network call
    time.sleep(0.5)
    return b"data"


# Hypothetical input: 200 URLs
my_urls = [f"https://example.com/items/{i}" for i in range(200)]

# 200 URLs processed by 20 concurrent workers
results = (
    Pipeline(my_urls)
    .task(slow_io, count=20)
    .wait_for_result()
)
```
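The shared-queue model described above can be sketched with the standard library. This version uses threads and `queue.Queue` for brevity, whereas Olympipe uses separate processes; `run_shared_queue` is a hypothetical name, and the one-sentinel-per-worker shutdown is one common way to stop workers, not a description of Olympipe's internals.

```python
import queue
import threading
from typing import Any, Callable, Iterable, List

_STOP = object()  # sentinel: tells a worker to exit


def run_shared_queue(fn: Callable[[Any], Any], items: Iterable[Any], count: int) -> List[Any]:
    inbox: queue.Queue = queue.Queue()
    outbox: queue.Queue = queue.Queue()

    def worker() -> None:
        # Every worker pulls from the same inbox and pushes to the same outbox.
        while True:
            item = inbox.get()
            if item is _STOP:
                break
            outbox.put(fn(item))

    threads = [threading.Thread(target=worker) for _ in range(count)]
    for t in threads:
        t.start()
    for item in items:
        inbox.put(item)
    for _ in threads:
        inbox.put(_STOP)  # one sentinel per worker, queued after all real items
    for t in threads:
        t.join()
    # Results are in completion order, which need not match input order.
    return [outbox.get() for _ in range(outbox.qsize())]


squares = run_shared_queue(lambda x: x * x, range(50), count=5)
```

No worker is assigned specific items: whichever is free takes the next one, which is why adding workers needs no coordination code.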
Machine learning inference
Olympipe shines for ML workloads: keep your GPU(s) busy with a pipeline that loads data in parallel, batches it, runs inference, and post-processes the results, without writing a single line of thread or process management.
```python
import glob

import torch
from torch import nn

from olympipe import Pipeline


class Classifier:
    """One instance per worker process: each gets its own model copy."""

    def __init__(self, device: str = "cuda"):
        self.device = device
        self.model = nn.Sequential(nn.Linear(512, 128), nn.ReLU(), nn.Linear(128, 10))
        self.model.load_state_dict(torch.load("weights.pt", map_location="cpu"))
        self.model.eval().to(device)

    def predict(self, batch: list) -> list:
        with torch.no_grad():
            x = torch.stack(batch).to(self.device)  # shape (batch_size, 512)
            return self.model(x).argmax(dim=1).tolist()


def load_and_preprocess(path: str) -> torch.Tensor:
    # CPU-bound preprocessing, runs in parallel with GPU inference.
    # load_image and preprocess are your own functions; the result
    # must be a 512-dim float tensor to match the model input.
    img = load_image(path)
    return preprocess(img)


if __name__ == "__main__":  # required with the "spawn" start method
    image_paths = sorted(glob.glob("images/*.png"))  # hypothetical location
    predictions = (
        Pipeline(image_paths)
        .task(load_and_preprocess, count=4)  # 4 CPU workers loading/preprocessing
        .batch(64)  # feed the GPU in batches of 64
        .class_task(Classifier, Classifier.predict, ["cuda"])  # 1 GPU worker
        .explode(lambda x: x)  # flatten back to individual predictions
        .wait_for_result()
    )
```
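The `.batch(64)`, `.class_task(...)`, `.explode(lambda x: x)` sequence only lines up if the predictor returns exactly one label per input, in order. The framework-free sketch below checks that round trip with a hypothetical `fake_predict` in place of `Classifier.predict`; keeping the trailing partial batch is an assumption about `.batch()`'s default.

```python
from typing import Iterable, List


def batched(items: Iterable[int], size: int) -> List[List[int]]:
    seq = list(items)
    return [seq[i:i + size] for i in range(0, len(seq), size)]


def fake_predict(batch: List[int]) -> List[int]:
    # Hypothetical stand-in for Classifier.predict: one label per input, same order.
    return [x % 10 for x in batch]


def batch_predict_explode(items: Iterable[int], size: int) -> List[int]:
    predictions: List[int] = []
    for batch in batched(items, size):
        predictions.extend(fake_predict(batch))  # the flattening .explode(lambda x: x) does
    return predictions


preds = batch_predict_explode(range(150), 64)
print(len(preds))  # 150
```

With 150 inputs the batches hold 64, 64 and 22 items, and flattening restores one prediction per input.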
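Note: Olympipe detects torch at startup and automatically uses the `spawn` start method instead of `fork`, preventing CUDA deadlocks. Set `OLYMPIPE_FORCE_FORK=1` to override. Because `spawn` re-imports your main module in every worker, keep the code that launches the pipeline under an `if __name__ == "__main__":` guard.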
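The kind of check this note describes can be sketched with `multiprocessing` and `sys.modules`. This is a hypothetical helper, not Olympipe's code: only the `OLYMPIPE_FORCE_FORK` variable comes from the note, and "torch is already in `sys.modules`" is an assumed detection rule.

```python
import multiprocessing as mp
import os
import sys
from typing import Mapping, Optional


def choose_start_method(env: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical helper: pick a multiprocessing start method name."""
    env = os.environ if env is None else env
    # An explicit override wins (variable name taken from the note above).
    if env.get("OLYMPIPE_FORCE_FORK") == "1":
        return "fork"
    # torch already imported: forking could copy CUDA state, so use spawn.
    if "torch" in sys.modules:
        return "spawn"
    # Otherwise use the platform default (first entry of get_all_start_methods()).
    return mp.get_all_start_methods()[0]


ctx = mp.get_context(choose_start_method())
```

Using a context object rather than `mp.set_start_method()` keeps the choice local to the code that creates the worker processes.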
Stateful workers
Use `.class_task()` when each worker needs persistent state (model weights, DB connection, accumulator…). The class is instantiated once per worker, so with `count=N` each of the N workers holds its own independent instance.
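```python
from olympipe import Pipeline


class RunningStats:
    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> dict:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}


sensor_stream = [20.5, 21.0, 19.8, 20.2]  # hypothetical readings; any iterable works

# With one worker, every item updates the same instance
results = Pipeline(sensor_stream).class_task(RunningStats, RunningStats.update).wait_for_result()
```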
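Because the class is instantiated once per worker, several workers produce several independent accumulators. The sequential sketch below simulates that by dealing items round-robin to separate instances; `simulate_workers` is a hypothetical helper, and round-robin is an assumption for illustration, since real workers take items from a shared queue as they become free.

```python
from typing import Dict, List


class RunningStats:
    def __init__(self) -> None:
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> Dict[str, float]:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}


def simulate_workers(values: List[float], count: int) -> List[Dict[str, float]]:
    workers = [RunningStats() for _ in range(count)]  # one instance per worker
    return [workers[i % count].update(v) for i, v in enumerate(values)]


single = simulate_workers([1.0, 2.0, 3.0, 4.0], count=1)
split = simulate_workers([1.0, 2.0, 3.0, 4.0], count=2)
print(single[-1])  # {'n': 4, 'mean': 2.5}
print(split[-1])   # {'n': 2, 'mean': 3.0}
```

If you need one global statistic, keep the step at a single worker or combine the per-worker partial results afterwards.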
Split & gather
Branch your pipeline into independent streams and merge them back.
```python
from typing import Optional, Tuple

from olympipe import Pipeline


def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    # Slot 0 feeds the first branch (evens), slot 1 the second (odds)
    return (x, None) if x % 2 == 0 else (None, x)


evens, odds = Pipeline(range(20)).split(route, n=2)

results = (
    evens.task(lambda x: x * 10)  # multiply evens
    .gather(odds.task(lambda x: -x))  # negate odds, then merge
    .wait_for_result()
)
```
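One plausible reading of the routing contract is that `route` returns one slot per branch and an item goes to every branch whose slot is not `None`. The sketch below models that reading with plain lists; it is an assumption about `.split()`, not its documented behavior, `split_lists` is a hypothetical helper, and the merged output is sorted because gathered branches need not interleave in a fixed order.

```python
from typing import Callable, List, Optional, Sequence, Tuple


def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)


def split_lists(
    items: Sequence[int], fn: Callable[[int], Tuple[Optional[int], ...]], n: int
) -> List[List[int]]:
    branches: List[List[int]] = [[] for _ in range(n)]
    for item in items:
        for i, value in enumerate(fn(item)):
            if value is not None:  # None means "not sent to branch i"; 0 still counts
                branches[i].append(value)
    return branches


evens, odds = split_lists(range(20), route, n=2)
merged = sorted([x * 10 for x in evens] + [-x for x in odds])
```

Testing against `None` rather than truthiness matters here: `route(0)` returns `(0, None)`, and a falsy check would silently drop the item `0`.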
HTTP server pipeline
Turn an HTTP endpoint into a pipeline source. Each incoming request becomes a packet, a `(connection, data)` pair; downstream tasks process it and write the response to the connection.
```python
import socket
from typing import Any, Tuple

from olympipe import Pipeline
from olympipe.helpers.server import send_json_response


def heavy_computation(payload: Any) -> Any:
    # Placeholder for your own processing
    return payload


def handle_request(pair: Tuple[socket.socket, dict]) -> Any:
    conn, data = pair
    result = heavy_computation(data["payload"])
    send_json_response(conn, {"result": result})  # reply to this client
    return result


Pipeline.server(
    [("POST", "/compute", lambda body: body)],
    port=8000,
    inactivity_timeout=60.0,
).task(handle_request, count=4).wait_for_completion()
```
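The exact bytes `send_json_response` writes are not documented on this page. To illustrate what replying with JSON on a raw connection involves, here is a standard-library sketch with a hypothetical `write_json_response` helper, exercised over `socket.socketpair()` instead of a real HTTP client.

```python
import json
import socket
from typing import Any


def write_json_response(conn: socket.socket, payload: Any, status: str = "200 OK") -> None:
    # Hypothetical helper: a minimal HTTP/1.1 response with a JSON body.
    body = json.dumps(payload).encode("utf-8")
    head = (
        f"HTTP/1.1 {status}\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {len(body)}\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode("ascii")
    conn.sendall(head + body)


server_side, client_side = socket.socketpair()
write_json_response(server_side, {"result": 42})
server_side.close()  # closing signals end-of-response to the reader
raw = b""
while chunk := client_side.recv(4096):
    raw += chunk
client_side.close()
headers, _, body = raw.partition(b"\r\n\r\n")
print(json.loads(body))  # {'result': 42}
```

`Content-Length` lets the client know where the body ends even if the connection were kept open.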
LLM inference server (HuggingFace)
Serve a HuggingFace model as an HTTP API. The model loads once per worker process and stays in memory across requests; incoming requests are queued for the worker(s) while the server keeps accepting connections.
```python
import socket

from olympipe import Pipeline
from olympipe.helpers.server import send_json_response


class LLMWorker:
    """Loaded once per worker: keeps the model in GPU memory across requests."""

    def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.3"):
        from transformers import pipeline as hf_pipeline

        self.pipe = hf_pipeline(
            "text-generation",
            model=model_name,
            device_map="auto",  # spreads the model across available GPUs
            torch_dtype="auto",
        )

    def generate(self, pair: tuple) -> dict:
        conn: socket.socket
        data: dict
        conn, data = pair
        prompt = data.get("prompt", "")
        max_new_tokens = data.get("max_new_tokens", 256)
        output = self.pipe(
            prompt,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            return_full_text=False,  # return only the newly generated text
        )
        generated = output[0]["generated_text"]
        send_json_response(conn, {"response": generated, "model": self.pipe.model.name_or_path})
        return {"prompt": prompt, "response": generated}


Pipeline.server(
    [("POST", "/generate", lambda body: body)],
    port=8000,
).class_task(LLMWorker, LLMWorker.generate, ["mistralai/Mistral-7B-Instruct-v0.3"]).wait_for_completion()
```
Call it:
```bash
curl -X POST http://localhost:8000/generate \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Explain transformers in one sentence:", "max_new_tokens": 128}'
```
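The same request can be sent from Python with only the standard library. This sketch assumes the server above is running on `localhost:8000` and replies with the JSON fields sent by `LLMWorker.generate`; `build_generate_request` and `generate` are hypothetical helper names.

```python
import json
import urllib.request
from typing import Any, Dict


def build_generate_request(
    prompt: str, max_new_tokens: int = 128, url: str = "http://localhost:8000/generate"
) -> urllib.request.Request:
    data = json.dumps({"prompt": prompt, "max_new_tokens": max_new_tokens}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def generate(prompt: str, max_new_tokens: int = 128, timeout: float = 120.0) -> Dict[str, Any]:
    # Sends the request and decodes the JSON reply, e.g. {"response": ..., "model": ...}.
    req = build_generate_request(prompt, max_new_tokens)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, `generate("Explain transformers in one sentence:")["response"]` returns the model's answer. The generous default timeout reflects that generation on a 7B model can take a while.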
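Tip: Scale out by passing `count=N` to `.class_task()`: each worker loads its own model replica. With `device_map="auto"`, however, every replica spreads across all visible GPUs, so giving each worker its own device requires assigning devices per worker yourself.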
Step caching
Cache intermediate results to disk. Reruns skip already-computed steps automatically — great for iterating on the end of a slow pipeline.
```python
import tempfile
import time

from olympipe import Pipeline


def expensive_step_1(x: int) -> int:
    time.sleep(1)  # simulate heavy computation
    return x ** 2


def expensive_step_2(x: int) -> int:
    time.sleep(1)
    return x + 1


with tempfile.TemporaryDirectory() as cache_dir:
    # First run: computes everything and writes .pkl files to cache_dir
    results = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )

    # Second run: near-instant, reads results from disk and skips the computation
    results_again = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )
```
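A minimal sketch of per-item disk caching with `pickle`, assuming one file per (function, input) pair. `cached_call` and the file naming are hypothetical; Olympipe's own cache layout is not described on this page.

```python
import hashlib
import os
import pickle
import tempfile
from typing import Any, Callable, Dict


def cached_call(fn: Callable[[Any], Any], x: Any, cache_dir: str) -> Any:
    # Key the cache on the function name and a hash of the pickled input.
    digest = hashlib.sha256(pickle.dumps(x)).hexdigest()[:16]
    path = os.path.join(cache_dir, f"{fn.__name__}-{digest}.pkl")
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)  # cache hit: skip computation
    result = fn(x)
    with open(path, "wb") as f:
        pickle.dump(result, f)
    return result


calls: Dict[str, int] = {"square": 0}


def square(x: int) -> int:
    calls["square"] += 1
    return x * x


with tempfile.TemporaryDirectory() as cache_dir:
    first = [cached_call(square, i, cache_dir) for i in range(5)]
    second = [cached_call(square, i, cache_dir) for i in range(5)]

print(first == second, calls["square"])  # True 5
```

One design consequence of keying on the function name: editing a function's body does not invalidate its old entries, so clear the cache directory after changing a step.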
Temporal batching (streams)
Group items arriving within a time window — ideal for audio frames, sensor feeds, or any real-time stream.
```python
import time

from olympipe import Pipeline


def slow_producer(x: int) -> int:
    time.sleep(0.05)
    return x


results = (
    Pipeline(range(100))
    .task(slow_producer)
    .temporal_batch(0.5)  # collect items for 500 ms, then emit them as a list
    .task(lambda batch: sum(batch))
    .wait_for_result()
)
```
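The windowing idea can be modeled deterministically by working on arrival timestamps instead of sleeping. This sketch assumes each window opens when its first item arrives and lasts `window` time units; whether Olympipe uses that rule or clock-aligned windows is not stated here. `temporal_batches` is a hypothetical helper, and unlike a timer-driven implementation it only closes a window when the next item arrives or the input ends.

```python
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def temporal_batches(events: Sequence[Tuple[float, T]], window: float) -> List[List[T]]:
    # events: (arrival_time, item) pairs sorted by arrival time.
    batches: List[List[T]] = []
    current: List[T] = []
    window_start = 0.0
    for ts, item in events:
        if current and ts - window_start >= window:
            batches.append(current)  # window elapsed: emit what was collected
            current = []
        if not current:
            window_start = ts  # first item opens a new window
        current.append(item)
    if current:
        batches.append(current)
    return batches


# Items arriving every 50 ms (as with slow_producer), 500 ms windows.
events = [(i * 50, i) for i in range(100)]
batches = temporal_batches(events, window=500)
sums = [sum(b) for b in batches]
```

Integer milliseconds keep the window boundaries exact; with float seconds, `10 * 0.05` may not compare equal to `0.5`.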
API reference
| Method | Description |
|---|---|
| `.task(fn, count=1)` | Apply `fn` to each item with `count` parallel workers |
| `.filter(fn=None)` | Keep items where `fn(x)` is truthy (or non-`None`) |
| `.batch(n, complete=True)` | Group items into lists of size `n` |
| `.explode(fn)` | Flatten: one item → many |
| `.split(fn, n=2)` | Route items to `n` independent branches |
| `.gather(*pipes)` | Merge multiple pipelines into one |
| `.reduce(acc, fn)` | Fold the stream into a single accumulated value |
| `.temporal_batch(s)` | Group items arriving within `s` seconds |
| `.cached_task(fn, cache_dir=…)` | Compute and persist to disk; skip on rerun |
| `.class_task(Cls, Cls.method)` | Call `Cls.method` on a stateful per-worker instance of `Cls` |
| `.timeout(s)` | Abort if no item arrives within `s` seconds |
| `.limit(n)` | Stop after `n` items pass through |
| `.debug()` | Print each item as it passes (inspect mid-pipeline) |
| `Pipeline.server(routes, port=…)` | HTTP server as a pipeline source |
| `.wait_for_result()` | Block and collect all results as a list |
| `.wait_for_completion()` | Block until the pipeline finishes (discard output) |
| `.wait_and_reduce(acc, fn)` | Combine `reduce` and `wait_for_result` in one call |
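Sequentially, folding a stream into one value is what `functools.reduce` does with an initial accumulator. The sketch below assumes `fn` receives the accumulator first and the item second, as `functools.reduce` does; check Olympipe's `.reduce()` signature before relying on that order. `fold` is a hypothetical name.

```python
from functools import reduce
from typing import Callable, Iterable, TypeVar

A = TypeVar("A")
T = TypeVar("T")


def fold(items: Iterable[T], acc: A, fn: Callable[[A, T], A]) -> A:
    # Start from acc and combine it with each item in turn.
    return reduce(fn, items, acc)


total = fold(range(10), 0, lambda acc, x: acc + x)
counts = fold("abca", {}, lambda acc, ch: {**acc, ch: acc.get(ch, 0) + 1})
print(total, counts)  # 45 {'a': 2, 'b': 1, 'c': 1}
```

If items reach the reducer in completion order (for instance after a `count=N` step), choose an `fn` whose result does not depend on order, such as the sum and the counter here.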
Download files
File details
Details for the file olympipe-1.8.3.tar.gz.
File metadata
- Download URL: olympipe-1.8.3.tar.gz
- Upload date:
- Size: 25.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.4 CPython/3.12.13 Linux/5.15.154+
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 12f7466d73961a470d9aebb5dce150e590764edfe312fcec1d40ceeb63192557 |
| MD5 | 65566b2107b8f5a93810145f18053e1a |
| BLAKE2b-256 | c24e20295056b452516d738e8e1656eb9cf91dd7255eeff8d9468311f74170a9 |
File details
Details for the file olympipe-1.8.3-py3-none-any.whl.
File metadata
- Download URL: olympipe-1.8.3-py3-none-any.whl
- Upload date:
- Size: 32.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.4 CPython/3.12.13 Linux/5.15.154+
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 81acfa17660ee95df7160a89f9eafb4aa39c2fc85b33192d2772e62dc65670e1 |
| MD5 | 629fa37b8a95b218a0fdbf7fbab693c7 |
| BLAKE2b-256 | 13c5061c4b27cd46f66b3fa52ef72b7d115fc3320d22c36f34361f3ceb0f2c9b |