A powerful parallel pipelining tool

Olympipe

Zero-boilerplate parallel pipelines for Python.

Turn any iterator into a multi-process pipeline with a single line. Each .task() runs in its own process, bypassing the GIL — no multiprocessing.Pool, no queues, no plumbing.

from olympipe import Pipeline

results = (
    Pipeline(range(1000))
    .task(heavy_compute, count=8)   # 8 parallel workers
    .filter(lambda x: x > 0)
    .batch(32)
    .wait_for_result()
)

Installation

pip install olympipe

Why Olympipe?

  • Chainable API: compose steps like pandas or streams
  • True multiprocessing: each step is a separate process, GIL-free
  • PyTorch-safe: auto-detects torch and switches to the spawn context to avoid CUDA/DataLoader deadlocks
  • Type-safe: fully typed, so type errors between steps are caught in your IDE
  • Batteries included: split/gather, time windows, disk caching, HTTP server source

Basic usage

from olympipe import Pipeline

results = (
    Pipeline(range(20))
    .task(lambda x: x * 2)           # transform
    .filter(lambda x: x % 4 == 0)    # keep multiples of 4
    .batch(3)                         # group into lists of 3
    .wait_for_result()
)
# [[0, 4, 8], [12, 16, 20], [24, 28, 32], ...]  (the leftover 36 forms a final partial batch if batch() keeps incomplete ones)
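
To make the data flow concrete, here is the same chain written with plain generators in a single process. It is only a sketch of what each step does to the items, not of how Olympipe runs them. The batched helper is a hypothetical stand-in for .batch(), and it assumes the trailing partial batch is emitted.

```python
from itertools import islice

def batched(items, n):
    """Yield lists of up to n items (hypothetical stand-in for .batch(n))."""
    it = iter(items)
    while chunk := list(islice(it, n)):
        yield chunk

doubled = (x * 2 for x in range(20))        # like .task(lambda x: x * 2)
kept = (x for x in doubled if x % 4 == 0)   # like .filter(lambda x: x % 4 == 0)
result = list(batched(kept, 3))             # like .batch(3)
print(result)
# [[0, 4, 8], [12, 16, 20], [24, 28, 32], [36]]
```

Doubling range(20) gives the even numbers 0 to 38, ten of which are multiples of 4, so three full batches are followed by a single leftover item.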

Parallel workers

Scale any step horizontally by passing count=N. Workers share the same input queue and output queue — no manual coordination needed.

import time
from olympipe import Pipeline

def slow_io(url: str) -> bytes:
    # simulate network call
    time.sleep(0.5)
    return b"data"

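# Placeholder input; replace with your own list of URLs
my_urls = [f"https://example.com/{i}" for i in range(200)]

# 200 URLs processed with 20 concurrent workers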
results = (
    Pipeline(my_urls)
    .task(slow_io, count=20)
    .wait_for_result()
)
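
The shared-queue arrangement described above can be sketched with the standard library. This is not Olympipe's implementation: threads stand in for processes to keep the example self-contained, and run_workers is a hypothetical helper name.

```python
import queue
import threading

def run_workers(items, fn, count):
    """Apply fn to every item using `count` workers on shared queues."""
    inbox, outbox = queue.Queue(), queue.Queue()
    stop = object()  # sentinel that tells a worker to exit

    def worker():
        while True:
            item = inbox.get()
            if item is stop:
                break
            outbox.put(fn(item))

    workers = [threading.Thread(target=worker) for _ in range(count)]
    for w in workers:
        w.start()

    submitted = 0
    for item in items:
        inbox.put(item)
        submitted += 1
    for _ in workers:
        inbox.put(stop)  # one sentinel per worker
    for w in workers:
        w.join()

    # Results arrive in completion order, not input order.
    return [outbox.get() for _ in range(submitted)]

squares = run_workers(range(10), lambda x: x * x, count=4)
print(sorted(squares))
```

Because every worker pulls from the same input queue, a slow item only delays the worker that picked it up; the price is that output order is no longer guaranteed once count is greater than 1.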

Machine learning inference

Olympipe shines for ML workloads: keep your GPU(s) saturated with a pipeline that loads data in parallel, batches it, runs inference, and post-processes results, without writing any thread or process management code.

import torch
from torch import nn
from olympipe import Pipeline

class Classifier:
    """One instance per worker process — each gets its own model copy."""

    def __init__(self, device: str = "cuda"):
        self.device = device
        self.model = nn.Sequential(nn.Linear(512, 128), nn.ReLU(), nn.Linear(128, 10))
        self.model.load_state_dict(torch.load("weights.pt"))
        self.model.eval().to(device)

    def predict(self, batch: list) -> list:
        with torch.no_grad():
            x = torch.stack(batch).to(self.device)
            return self.model(x).argmax(dim=1).tolist()


def load_and_preprocess(path: str) -> torch.Tensor:
    # CPU-bound preprocessing, runs in parallel with GPU inference.
    # load_image and preprocess are placeholders for your own loading and transform code.
    img = load_image(path)
    return preprocess(img)


predictions = (
    Pipeline(image_paths)
    .task(load_and_preprocess, count=4)   # 4 CPU workers loading/preprocessing
    .batch(64)                             # feed GPU in batches of 64
    .class_task(Classifier, Classifier.predict, ["cuda"])  # 1 GPU worker
    .explode(lambda x: x)                 # flatten back to individual predictions
    .wait_for_result()
)

Note: Olympipe detects torch at startup and automatically uses spawn context instead of fork, preventing CUDA deadlocks. Set OLYMPIPE_FORCE_FORK=1 to override.
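
For readers unfamiliar with start methods, the sketch below shows how a spawn context can be selected with the standard multiprocessing module. The policy in pick_start_method is an illustrative assumption, not Olympipe's actual detection logic; only the OLYMPIPE_FORCE_FORK variable name comes from the note above.

```python
import importlib.util
import multiprocessing as mp
import os

def pick_start_method() -> str:
    """Choose a multiprocessing start method (illustrative policy only)."""
    available = mp.get_all_start_methods()  # first entry is the platform default
    if os.environ.get("OLYMPIPE_FORCE_FORK") == "1" and "fork" in available:
        return "fork"
    if importlib.util.find_spec("torch") is not None:
        return "spawn"  # a spawned child starts fresh and inherits no CUDA state
    return available[0]

# A context object exposes Process, Queue, etc. bound to that start method.
ctx = mp.get_context(pick_start_method())
print(ctx.get_start_method())
```

With spawn, the worker function and its arguments must be picklable and importable from the child process, which is why module-level functions and classes are the safest things to pass to a pipeline step.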


Stateful workers

Use .class_task() when each worker needs persistent state (model weights, DB connection, accumulator…). The class is instantiated once per worker.

from olympipe import Pipeline

class RunningStats:
    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> dict:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}

# sensor_stream is a placeholder for any iterable of floats
results = Pipeline(sensor_stream).class_task(RunningStats, RunningStats.update).wait_for_result()
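
To see what "persistent state" means here, the class can be exercised directly, outside any pipeline. This sketch holds one instance, as a single worker would, and feeds it three sample readings chosen for illustration.

```python
class RunningStats:
    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> dict:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}

stats = RunningStats()  # one instance, as a single worker would hold
outputs = [stats.update(x) for x in [2.0, 4.0, 6.0]]
print(outputs)
# [{'n': 1, 'mean': 2.0}, {'n': 2, 'mean': 3.0}, {'n': 3, 'mean': 4.0}]
```

Each call sees the counters left by the previous one. Since the class is instantiated once per worker, running several workers would presumably give each instance only its own share of the stream, so a global running mean needs a single worker.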

Split & gather

Branch your pipeline into independent streams and merge them back.

from typing import Optional, Tuple
from olympipe import Pipeline

def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)

evens, odds = Pipeline(range(20)).split(route, n=2)

results = (
    evens.task(lambda x: x * 10)          # multiply evens
         .gather(odds.task(lambda x: -x)) # negate odds, then merge
         .wait_for_result()
)
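
The routing convention can be illustrated in plain Python. The split function below is a hypothetical single-process stand-in, and it assumes that a None in slot i of the returned tuple means "nothing for branch i", which is how the route function above reads.

```python
from typing import Optional, Tuple

def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)

def split(items, fn, n=2):
    """Hypothetical stand-in for .split(): slot i of fn(item) feeds branch i."""
    branches = [[] for _ in range(n)]
    for item in items:
        for branch, value in zip(branches, fn(item)):
            if value is not None:  # assumption: None means "not for this branch"
                branch.append(value)
    return branches

evens, odds = split(range(20), route)
merged = [x * 10 for x in evens] + [-x for x in odds]  # like .gather()
print(evens[:3], odds[:3])
# [0, 2, 4] [1, 3, 5]
```

In a real pipeline the two branches run concurrently, so the merged stream interleaves items rather than listing all evens first.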

HTTP server pipeline

Turn an HTTP endpoint into a pipeline source. Each incoming request becomes a packet; downstream tasks process and respond.

import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response

def handle_request(pair):
    conn: socket.socket
    data: dict
    conn, data = pair

    result = heavy_computation(data["payload"])  # placeholder for your own function
    send_json_response(conn, {"result": result})
    return result

Pipeline.server(
    [("POST", "/compute", lambda body: body)],
    port=8000,
    inactivity_timeout=60.0,
).task(handle_request, count=4).wait_for_completion()
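
A client for this endpoint can be written with the standard library alone. The sketch assumes the server is reachable at http://localhost:8000 and that it decodes a JSON body into the dict seen by handle_request; build_compute_request is a hypothetical helper name.

```python
import json
import urllib.request

def build_compute_request(payload, host="http://localhost:8000"):
    """Build a POST /compute request carrying {"payload": ...} as JSON."""
    body = json.dumps({"payload": payload}).encode("utf-8")
    return urllib.request.Request(
        f"{host}/compute",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_compute_request([1, 2, 3])
print(req.get_method(), req.full_url)
# POST http://localhost:8000/compute

# With the server running, send it and decode the JSON reply:
# with urllib.request.urlopen(req, timeout=30) as resp:
#     print(json.loads(resp.read()))
```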

LLM inference server (HuggingFace)

Serve a HuggingFace model as an HTTP API. The model loads once per worker process and stays in memory across requests, so each request only pays for generation while the server keeps accepting new connections.

import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response


class LLMWorker:
    """Loaded once per worker — keeps the model in GPU memory across requests."""

    def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.3"):
        from transformers import pipeline as hf_pipeline

        self.pipe = hf_pipeline(
            "text-generation",
            model=model_name,
            device_map="auto",      # spreads across available GPUs
            torch_dtype="auto",
        )

    def generate(self, pair: tuple) -> dict:
        conn: socket.socket
        data: dict
        conn, data = pair

        prompt = data.get("prompt", "")
        max_new_tokens = data.get("max_new_tokens", 256)

        output = self.pipe(prompt, max_new_tokens=max_new_tokens, do_sample=False)
        generated = output[0]["generated_text"][len(prompt):]

        send_json_response(conn, {"response": generated, "model": self.pipe.model.name_or_path})
        return {"prompt": prompt, "response": generated}


Pipeline.server(
    [("POST", "/generate", lambda body: body)],
    port=8000,
).class_task(LLMWorker, LLMWorker.generate, ["mistralai/Mistral-7B-Instruct-v0.3"]).wait_for_completion()

Call it:

curl -X POST http://localhost:8000/generate \
     -H "Content-Type: application/json" \
     -d '{"prompt": "Explain transformers in one sentence:", "max_new_tokens": 128}'

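Tip: Scale to multiple GPUs by passing count=N to .class_task(); each worker loads its own model replica. Device placement is decided inside the worker, so if you want one replica per GPU, set the device explicitly per worker instead of relying on device_map="auto".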


Step caching

Cache intermediate results to disk. Reruns skip already-computed steps automatically — great for iterating on the end of a slow pipeline.

import tempfile
import time
from olympipe import Pipeline

def expensive_step_1(x: int) -> int:
    time.sleep(1)  # simulate heavy computation
    return x ** 2

def expensive_step_2(x: int) -> int:
    time.sleep(1)
    return x + 1

with tempfile.TemporaryDirectory() as cache_dir:
    # First run: computes everything, writes .pkl files
    results = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )

    # Second run: instant — reads from disk, skips all computation
    results_again = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )
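
The idea behind step caching can be sketched with pickle and a per-input file. This is a minimal illustration, not Olympipe's cache format: the cached_call helper and its file naming scheme are assumptions made for the example.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path

calls = []  # records every real computation

def square(x: int) -> int:
    calls.append(x)
    return x ** 2

def cached_call(fn, x, cache_dir):
    """Return fn(x), reading from or writing to a .pkl file in cache_dir."""
    key = hashlib.sha256(pickle.dumps((fn.__name__, x))).hexdigest()
    path = Path(cache_dir) / f"{key}.pkl"  # hypothetical naming scheme
    if path.exists():
        return pickle.loads(path.read_bytes())
    result = fn(x)
    path.write_bytes(pickle.dumps(result))
    return result

with tempfile.TemporaryDirectory() as cache_dir:
    first = [cached_call(square, x, cache_dir) for x in range(5)]
    second = [cached_call(square, x, cache_dir) for x in range(5)]

print(first == second, len(calls))
# True 5
```

The second pass returns the same values without calling square again. Keying on the function name means a cache like this does not notice when the function body changes, so clear the directory after editing a cached step.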

Temporal batching (streams)

Group items arriving within a time window — ideal for audio frames, sensor feeds, or any real-time stream.

import time
from olympipe import Pipeline

def slow_producer(x: int) -> int:
    time.sleep(0.05)
    return x

results = (
    Pipeline(range(100))
    .task(slow_producer)
    .temporal_batch(0.5)     # collect items for 500ms, then emit as a list
    .task(lambda batch: sum(batch))
    .wait_for_result()
)
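
The grouping rule can be shown deterministically by attaching explicit arrival times to items instead of sleeping. The sketch assumes a window opens when its first item arrives and closes `window` time units later; whether Olympipe anchors windows the same way is not stated here, and the temporal_batch function below is a hypothetical stand-in.

```python
def temporal_batch(timed_items, window):
    """Group (timestamp, item) pairs into lists spanning at most `window`."""
    batch, deadline = [], None
    for ts, item in timed_items:
        if deadline is None:
            deadline = ts + window       # first item opens the first window
        if ts >= deadline:
            yield batch                  # window elapsed: emit and start anew
            batch, deadline = [], ts + window
        batch.append(item)
    if batch:
        yield batch                      # flush whatever is left at the end

# One item every 50 ms, grouped into 500 ms windows (integer milliseconds).
arrivals = [(i * 50, i) for i in range(100)]
batches = list(temporal_batch(arrivals, 500))
print(len(batches), [sum(b) for b in batches[:2]])
# 10 [45, 145]
```

In a live stream the batch sizes vary with the arrival rate, which is the main difference from .batch(n): the window bounds latency rather than list length.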

API reference

Method                             Description
.task(fn, count=1)                 Apply fn to each item; count parallel workers
.filter(fn=None)                   Keep items where fn(x) is truthy (or non-None)
.batch(n, complete=True)           Group into lists of size n
.explode(fn)                       Flatten: one item → many
.split(fn, n=2)                    Route items to n independent branches
.gather(*pipes)                    Merge multiple pipelines into one
.reduce(acc, fn)                   Fold stream into a single accumulated value
.temporal_batch(s)                 Group items arriving within s seconds
.cached_task(fn, cache_dir=…)      Compute + persist to disk; skip on re-run
.class_task(Cls, Cls.method)       Stateful per-worker instance
.timeout(s)                        Abort if no item arrives within s seconds
.limit(n)                          Stop after n items pass through
.debug()                           Print each item as it passes (inspect mid-pipeline)
Pipeline.server(routes, port=…)    HTTP server as a pipeline source
.wait_for_result()                 Block and collect all results as a list
.wait_for_completion()             Block until the pipeline finishes (discard output)
.wait_and_reduce(acc, fn)          Combine reduce + wait_for_result in one call
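
For the less familiar entries, the standard-library equivalents below show the intended effect on a small list. They are analogies only: the table does not state the argument order Olympipe passes to the reduce function, so the example uses addition, where the order does not matter.

```python
from functools import reduce
from itertools import chain, islice

batches = [[1, 2], [3], [4, 5, 6]]

flat = list(chain.from_iterable(batches))        # like .explode(lambda x: x)
first_four = list(islice(flat, 4))               # like .limit(4)
total = reduce(lambda acc, x: acc + x, flat, 0)  # like .reduce(0, fn)
print(flat, first_four, total)
# [1, 2, 3, 4, 5, 6] [1, 2, 3, 4] 21
```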
