A powerful parallel pipelining tool

Olympipe

Zero-boilerplate parallel pipelines for Python.

📖 Documentation & examples: datatoy.fr/en/olympipe

Turn any iterator into a multi-process pipeline with a single line. Each .task() runs in its own process, bypassing the GIL — no multiprocessing.Pool, no queues, no plumbing.

from olympipe import Pipeline

results = (
    Pipeline(range(1000))
    .task(heavy_compute, count=8)   # 8 parallel workers
    .filter(lambda x: x > 0)
    .batch(32)
    .wait_for_result()
)

Installation

pip install olympipe

Why Olympipe?

  • Chainable API — compose steps like pandas or streams
  • True multiprocessing — each step is a separate process, GIL-free
  • PyTorch-safe — auto-detects torch and switches to spawn context to avoid CUDA/DataLoader deadlocks
  • Type-safe — fully typed, pipeline errors caught at IDE time
  • Batteries included — split/gather, time-windows, disk caching, HTTP server source

Basic usage

from olympipe import Pipeline

results = (
    Pipeline(range(20))
    .task(lambda x: x * 2)           # transform
    .filter(lambda x: x % 4 == 0)    # keep multiples of 4
    .batch(3)                         # group into lists of 3
    .wait_for_result()
)
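# [[0, 4, 8], [12, 16, 20], [24, 28, 32], ...]
# (a trailing partial batch [36] may follow, depending on batch's `complete` flag)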
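
For intuition, the same chain can be written sequentially in plain Python. This is only a sketch of what the steps compute, not how Olympipe runs them. The `batched` helper is hypothetical, and it keeps the trailing partial batch, which Olympipe may or may not do depending on batch's `complete` flag.

```python
from itertools import islice


def batched(items, n):
    """Yield lists of up to n consecutive items (hypothetical helper)."""
    it = iter(items)
    while chunk := list(islice(it, n)):
        yield chunk


doubled = (x * 2 for x in range(20))        # like .task(lambda x: x * 2)
kept = (x for x in doubled if x % 4 == 0)   # like .filter(lambda x: x % 4 == 0)
result = list(batched(kept, 3))             # like .batch(3)
print(result)
# [[0, 4, 8], [12, 16, 20], [24, 28, 32], [36]]
```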

Parallel workers

Scale any step horizontally by passing count=N. Workers share the same input queue and output queue — no manual coordination needed.

import time
from olympipe import Pipeline

def slow_io(url: str) -> bytes:
    # simulate network call
    time.sleep(0.5)
    return b"data"

# my_urls is your own list of URLs (e.g. 200 of them), fetched by 20 concurrent workers
results = (
    Pipeline(my_urls)
    .task(slow_io, count=20)
    .wait_for_result()
)
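
To make the shared-queue idea concrete, here is a minimal standard-library sketch of several workers draining one input queue and writing to one output queue. It uses threads for brevity, whereas Olympipe uses processes; `run_workers` and `_STOP` are hypothetical names, and this is not Olympipe's implementation.

```python
import queue
import threading

_STOP = object()  # sentinel telling a worker to exit


def run_workers(fn, items, count):
    """Apply fn to every item using `count` workers sharing two queues."""
    inbox = queue.Queue()
    outbox = queue.Queue()

    def worker():
        while True:
            item = inbox.get()
            if item is _STOP:
                break
            outbox.put(fn(item))

    threads = [threading.Thread(target=worker) for _ in range(count)]
    for t in threads:
        t.start()
    for item in items:
        inbox.put(item)
    for _ in threads:
        inbox.put(_STOP)      # one sentinel per worker
    for t in threads:
        t.join()

    results = []
    while not outbox.empty():  # safe: all workers have finished
        results.append(outbox.get())
    return results


squares = run_workers(lambda x: x * x, range(10), count=4)
```

Because whichever worker finishes first writes first, the output order of such a pool is not guaranteed to match the input order; sort or key the results if order matters.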

Machine learning inference

Olympipe shines for ML workloads: keep your GPU(s) busy with a pipeline that loads data in parallel, batches it, runs inference, and post-processes results, all without writing any thread- or process-management code.

import torch
from torch import nn
from olympipe import Pipeline

class Classifier:
    """One instance per worker process — each gets its own model copy."""

    def __init__(self, device: str = "cuda"):
        self.device = device
        self.model = nn.Sequential(nn.Linear(512, 128), nn.ReLU(), nn.Linear(128, 10))
        self.model.load_state_dict(torch.load("weights.pt"))
        self.model.eval().to(device)

    def predict(self, batch: list) -> list:
        with torch.no_grad():
            x = torch.stack(batch).to(self.device)
            return self.model(x).argmax(dim=1).tolist()


def load_and_preprocess(path: str) -> torch.Tensor:
    # CPU-bound preprocessing, runs in parallel with GPU inference.
    # load_image and preprocess are placeholders for your own functions.
    img = load_image(path)
    return preprocess(img)


predictions = (
    Pipeline(image_paths)
    .task(load_and_preprocess, count=4)   # 4 CPU workers loading/preprocessing
    .batch(64)                             # feed GPU in batches of 64
    .class_task(Classifier, Classifier.predict, ["cuda"])  # 1 GPU worker
    .explode(lambda x: x)                 # flatten back to individual predictions
    .wait_for_result()
)

Note: Olympipe detects torch at startup and automatically uses spawn context instead of fork, preventing CUDA deadlocks. Set OLYMPIPE_FORCE_FORK=1 to override.
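
The decision described in the note can be pictured with the standard `multiprocessing` module. This is a hypothetical sketch of such logic, not Olympipe's actual code: the function name `pick_start_method` is invented for illustration, and the assumption that fork is the default when torch is absent is inferred from the note.

```python
import multiprocessing as mp
import os
import sys


def pick_start_method(loaded_modules, env):
    """Return the start method this sketch would choose (hypothetical logic)."""
    if env.get("OLYMPIPE_FORCE_FORK") == "1":
        return "fork"    # explicit override wins
    if "torch" in loaded_modules:
        return "spawn"   # fresh interpreter, no inherited CUDA state
    return "fork"        # assumed default when torch is not loaded


method = pick_start_method(sys.modules, os.environ)

# A spawn context is available on every platform supported by multiprocessing.
ctx = mp.get_context("spawn")
print(method, ctx.get_start_method())
```

With spawn, worker functions and their arguments must be picklable, so prefer module-level functions over lambdas in torch pipelines.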


Stateful workers

Use .class_task() when each worker needs persistent state (model weights, DB connection, accumulator…). The class is instantiated once per worker.

from olympipe import Pipeline

class RunningStats:
    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> dict:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}

results = Pipeline(sensor_stream).class_task(RunningStats, RunningStats.update).wait_for_result()
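
Conceptually, a single stateful worker builds the class once and then calls the method for every item. The sketch below shows that behavior in plain Python; `run_single_worker` is a hypothetical helper, not part of Olympipe.

```python
class RunningStats:
    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> dict:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}


def run_single_worker(cls, method, items, init_args=()):
    """Instantiate cls once, then apply the unbound method to each item."""
    instance = cls(*init_args)                          # built once
    return [method(instance, item) for item in items]   # called per item


out = run_single_worker(RunningStats, RunningStats.update, [1.0, 2.0, 3.0])
print(out[-1])
# {'n': 3, 'mean': 2.0}
```

Since the class is instantiated once per worker, running several workers would presumably give each its own independent state, so an accumulator like this one would only see the items routed to its worker.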

Split & gather

Branch your pipeline into independent streams and merge them back.

from typing import Optional, Tuple
from olympipe import Pipeline

def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)

evens, odds = Pipeline(range(20)).split(route, n=2)

results = (
    evens.task(lambda x: x * 10)          # multiply evens
         .gather(odds.task(lambda x: -x)) # negate odds, then merge
         .wait_for_result()
)
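
The routing convention in the example above appears to be: the function returns one slot per branch, and a `None` slot means the item does not go to that branch. Assuming that reading, the data flow can be sketched sequentially as follows (`split_items` is a hypothetical helper, not Olympipe's API).

```python
from typing import Optional, Tuple


def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)


def split_items(items, route_fn, n):
    """Distribute items into n lists; None slots are skipped (assumed semantics)."""
    branches = [[] for _ in range(n)]
    for item in items:
        for branch, value in zip(branches, route_fn(item)):
            if value is not None:
                branch.append(value)
    return branches


evens, odds = split_items(range(20), route, 2)
merged = [x * 10 for x in evens] + [-x for x in odds]   # like gather
```

In a real multi-process pipeline the merged stream interleaves items from both branches in arrival order, so do not rely on a particular ordering after a gather.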

HTTP server pipeline

Turn an HTTP endpoint into a pipeline source. Each incoming request becomes a packet; downstream tasks process and respond.

import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response

def handle_request(pair):
    conn: socket.socket
    data: dict
    conn, data = pair

    result = heavy_computation(data["payload"])
    send_json_response(conn, {"result": result})
    return result

Pipeline.server(
    [("POST", "/compute", lambda body: body)],
    port=8000,
    inactivity_timeout=60.0,
).task(handle_request, count=4).wait_for_completion()
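
A client for the endpoint above could look like the following standard-library sketch. It assumes the server is reachable at localhost:8000, that it accepts a JSON body, and that the reply is the JSON object passed to send_json_response; `build_compute_request` and `call_compute` are hypothetical names.

```python
import json
import urllib.request


def build_compute_request(payload, host="http://localhost:8000"):
    """Build a POST /compute request carrying {"payload": ...} as JSON."""
    body = json.dumps({"payload": payload}).encode("utf-8")
    return urllib.request.Request(
        f"{host}/compute",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def call_compute(payload):
    """Send the request and decode the JSON reply (needs a running server)."""
    with urllib.request.urlopen(build_compute_request(payload), timeout=10) as resp:
        return json.loads(resp.read())


req = build_compute_request([1, 2, 3])
print(req.get_method(), req.full_url)
```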

LLM inference server (HuggingFace)

Serve a HuggingFace model as an HTTP API. The model loads once per worker process; incoming requests queue up for the worker(s) while the server stays responsive.

import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response


class LLMWorker:
    """Loaded once per worker — keeps the model in GPU memory across requests."""

    def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.3"):
        from transformers import pipeline as hf_pipeline

        self.pipe = hf_pipeline(
            "text-generation",
            model=model_name,
            device_map="auto",      # spreads across available GPUs
            torch_dtype="auto",
        )

    def generate(self, pair: tuple) -> tuple:
        conn: socket.socket
        data: dict
        conn, data = pair

        prompt = data.get("prompt", "")
        max_new_tokens = data.get("max_new_tokens", 256)

        output = self.pipe(prompt, max_new_tokens=max_new_tokens, do_sample=False)
        generated = output[0]["generated_text"][len(prompt):]

        send_json_response(conn, {"response": generated, "model": self.pipe.model.name_or_path})
        return {"prompt": prompt, "response": generated}


Pipeline.server(
    [("POST", "/generate", lambda body: body)],
    port=8000,
).class_task(LLMWorker, LLMWorker.generate, ["mistralai/Mistral-7B-Instruct-v0.3"]).wait_for_completion()

Call it:

curl -X POST http://localhost:8000/generate \
     -H "Content-Type: application/json" \
     -d '{"prompt": "Explain transformers in one sentence:", "max_new_tokens": 128}'

Tip: Scale to multiple GPUs by passing count=N to .class_task() — each worker gets its own model replica on a separate device.


Step caching

Cache intermediate results to disk. Reruns skip already-computed steps automatically — great for iterating on the end of a slow pipeline.

import tempfile
import time
from olympipe import Pipeline

def expensive_step_1(x: int) -> int:
    time.sleep(1)  # simulate heavy computation
    return x ** 2

def expensive_step_2(x: int) -> int:
    time.sleep(1)
    return x + 1

with tempfile.TemporaryDirectory() as cache_dir:
    # First run: computes everything, writes .pkl files
    results = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )

    # Second run: instant — reads from disk, skips all computation
    results_again = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )
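
The general idea behind disk caching of a step can be sketched with `pickle` and a hashed file name. This illustrates the technique only; the key scheme and file layout here are assumptions, not Olympipe's actual cache format, and `cached_call` is a hypothetical helper.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path


def cached_call(fn, x, cache_dir):
    """Return fn(x), reusing a pickled result on disk when one exists."""
    key = hashlib.sha256(pickle.dumps((fn.__name__, x))).hexdigest()
    path = Path(cache_dir) / f"{key}.pkl"
    if path.exists():
        return pickle.loads(path.read_bytes())   # cache hit: skip computation
    result = fn(x)
    path.write_bytes(pickle.dumps(result))       # cache miss: compute and persist
    return result


calls = []


def square(x):
    calls.append(x)   # record real invocations
    return x ** 2


with tempfile.TemporaryDirectory() as cache_dir:
    first = [cached_call(square, x, cache_dir) for x in range(5)]
    second = [cached_call(square, x, cache_dir) for x in range(5)]

print(first == second, calls)
# True [0, 1, 2, 3, 4]
```

Keying on the function name means a changed function body would still hit stale entries; clear the cache directory after editing a cached step if your caching scheme works this way.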

Temporal batching (streams)

Group items arriving within a time window — ideal for audio frames, sensor feeds, or any real-time stream.

import time
from olympipe import Pipeline

def slow_producer(x: int) -> int:
    time.sleep(0.05)
    return x

results = (
    Pipeline(range(100))
    .task(slow_producer)
    .temporal_batch(0.5)     # collect items for 500ms, then emit as a list
    .task(lambda batch: sum(batch))
    .wait_for_result()
)
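
Time-window grouping can be sketched deterministically by pairing each item with an arrival timestamp. This is a simplified illustration, not Olympipe's implementation: it closes a window only when the next item arrives, whereas a real-time pipeline would likely also flush on a timer. The generator name and the timestamped input format are assumptions.

```python
def temporal_batches(stamped_items, window):
    """Group (timestamp, item) pairs into lists spanning at most `window` seconds."""
    batch, deadline = [], None
    for ts, item in stamped_items:
        if deadline is None:
            deadline = ts + window            # first item opens a window
        elif ts >= deadline:
            yield batch                       # window elapsed: emit and reopen
            batch, deadline = [], ts + window
        batch.append(item)
    if batch:
        yield batch                           # flush whatever is left


arrivals = [(0.0, "a"), (0.2, "b"), (0.6, "c"), (0.7, "d"), (1.3, "e")]
groups = list(temporal_batches(arrivals, window=0.5))
print(groups)
# [['a', 'b'], ['c', 'd'], ['e']]
```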

API reference

Method                           Description
.task(fn, count=1)               Apply fn to each item; count parallel workers
.filter(fn=None)                 Keep items where fn(x) is truthy (or non-None)
.batch(n, complete=True)         Group into lists of size n
.explode(fn)                     Flatten: one item → many
.split(fn, n=2)                  Route items to n independent branches
.gather(*pipes)                  Merge multiple pipelines into one
.reduce(acc, fn)                 Fold stream into a single accumulated value
.temporal_batch(s)               Group items arriving within s seconds
.cached_task(fn, cache_dir=…)    Compute + persist to disk; skip on re-run
.class_task(Cls, Cls.method)     Stateful per-worker instance
.timeout(s)                      Abort if no item arrives within s seconds
.limit(n)                        Stop after n items pass through
.debug()                         Print each item as it passes (inspect mid-pipeline)
Pipeline.server(routes, port=…)  HTTP server as a pipeline source
.wait_for_result()               Block and collect all results as a list
.wait_for_completion()           Block until the pipeline finishes (discard output)
.wait_and_reduce(acc, fn)        Combine reduce + wait_for_result in one call
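
For readers unfamiliar with folding, the reduce and limit entries correspond to well-known standard-library operations. The sketch below shows the sequential equivalents; the argument order of the function passed to Olympipe's reduce is an assumption here (accumulator first, item second).

```python
from functools import reduce
from itertools import islice

items = range(10)

# Like .reduce(0, fn): fold the stream into one value, starting from 0.
total = reduce(lambda acc, x: acc + x, items, 0)

# Like .limit(3): stop after three items have passed through.
first_three = list(islice(items, 3))

print(total, first_three)
# 45 [0, 1, 2]
```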
