
A powerful parallel pipelining tool


Olympipe


Zero-boilerplate parallel pipelines for Python.

Turn any iterable into a multi-process pipeline with a single chained expression. Each .task() runs in its own process, so CPU-bound steps are not serialized by the GIL, and you never touch multiprocessing.Pool, queues, or other plumbing.

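from olympipe import Pipeline

def heavy_compute(x: int) -> int:
    # stand-in for any CPU-bound function
    return sum(i * i for i in range(x))

results = (
    Pipeline(range(1000))
    .task(heavy_compute, count=8)   # 8 parallel worker processes
    .filter(lambda x: x > 0)
    .batch(32)
    .wait_for_result()
)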

Installation

pip install olympipe

Why Olympipe?

  • Chainable API: compose steps the way you would with pandas or streams
  • True multiprocessing: each step runs in its own process(es), free of the GIL
  • PyTorch-safe: auto-detects torch and switches to the spawn context to avoid CUDA/DataLoader deadlocks
  • Type-safe: fully typed, so mismatches between steps can be flagged by your IDE or type checker
  • Batteries included: split/gather, time windows, disk caching, HTTP server source

Basic usage

from olympipe import Pipeline

results = (
    Pipeline(range(24))
    .task(lambda x: x * 2)           # transform: 0, 2, 4, ..., 46
    .filter(lambda x: x % 4 == 0)    # keep multiples of 4
    .batch(3)                         # group into lists of 3
    .wait_for_result()
)
# [[0, 4, 8], [12, 16, 20], [24, 28, 32], [36, 40, 44]]
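To sanity-check expected output before parallelising, the same dataflow can be written sequentially with plain generators. This is a single-process sketch of the semantics only, not how olympipe runs it; the batched helper is hypothetical and keeps a trailing partial batch, whereas olympipe's own handling of a short final batch is governed by its batch options (see the API reference).

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def batched(items: Iterable[T], n: int) -> Iterator[List[T]]:
    """Hypothetical helper: yield lists of n items; the last list may be shorter."""
    it = iter(items)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

doubled = (x * 2 for x in range(24))          # like .task(lambda x: x * 2)
kept = (x for x in doubled if x % 4 == 0)     # like .filter(lambda x: x % 4 == 0)
result = list(batched(kept, 3))               # like .batch(3)
```

Because every stage is a lazy generator, items flow through one at a time, which mirrors how a streaming pipeline processes them.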

Parallel workers

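Scale any step horizontally by passing count=N. The workers share one input queue and one output queue, so no manual coordination is needed. With count > 1, items come out in the order the workers finish them, which may differ from the input order.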

import time
from olympipe import Pipeline

def slow_io(url: str) -> bytes:
    # simulate network call
    time.sleep(0.5)
    return b"data"

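# Hypothetical input: 200 URLs. At 0.5 s each, that is ~100 s sequentially vs ~5 s with 20 workers.
my_urls = [f"https://example.com/item/{i}" for i in range(200)]

results = (
    Pipeline(my_urls)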
    .task(slow_io, count=20)
    .wait_for_result()
)
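The shared-queue pattern described above can be sketched by hand to show what count=N saves you from writing. This is an illustration, not olympipe's code: it uses threads so it runs anywhere (olympipe uses processes), and run_stage and the sentinel scheme are hypothetical names. Each worker pulls from one inbox, pushes to one outbox, and exits on its own sentinel; the collector stops once every worker has reported done.

```python
import queue
import threading
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")

_DONE = object()  # sentinel: one per worker means "no more input"

def run_stage(items: Iterable[T], fn: Callable[[T], R], count: int) -> List[R]:
    """Illustrative worker pool: count threads share one input and one output queue."""
    inbox: "queue.Queue[object]" = queue.Queue()
    outbox: "queue.Queue[object]" = queue.Queue()

    def worker() -> None:
        while True:
            item = inbox.get()
            if item is _DONE:
                outbox.put(_DONE)  # tell the collector this worker has finished
                return
            outbox.put(fn(item))

    threads = [threading.Thread(target=worker) for _ in range(count)]
    for t in threads:
        t.start()
    for item in items:
        inbox.put(item)
    for _ in range(count):  # FIFO: sentinels are only reached after all real items
        inbox.put(_DONE)

    results: List[R] = []
    finished = 0
    while finished < count:
        out = outbox.get()
        if out is _DONE:
            finished += 1
        else:
            results.append(out)  # completion order, not input order
    for t in threads:
        t.join()
    return results
```

Results arrive in completion order, so callers that need input order must sort or tag items with their index.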

Machine learning inference

Olympipe is well suited to ML inference: a single pipeline can load data in parallel, batch it, run the model, and post-process the results, keeping your GPU(s) busy without any thread or process management code.

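import torch
from torch import nn
from olympipe import Pipeline

class Classifier:
    """One instance per worker process; each gets its own model copy."""

    def __init__(self, device: str = "cuda"):
        self.device = device
        self.model = nn.Sequential(nn.Linear(512, 128), nn.ReLU(), nn.Linear(128, 10))
        # map_location="cpu" lets the weights load regardless of the device they were saved from
        self.model.load_state_dict(torch.load("weights.pt", map_location="cpu"))
        self.model.eval().to(device)

    def predict(self, batch: list) -> list:
        with torch.no_grad():
            x = torch.stack(batch).to(self.device)  # list of (512,) tensors -> (B, 512)
            return self.model(x).argmax(dim=1).tolist()


def load_and_preprocess(path: str) -> torch.Tensor:
    # CPU-bound preprocessing, runs in parallel with GPU inference.
    # load_image and preprocess are your own helpers (not part of olympipe);
    # preprocess must return a 512-dim float tensor to match the model input.
    img = load_image(path)
    return preprocess(img)


def flatten(batch_predictions: list) -> list:
    # A named module-level function rather than a lambda: under the spawn
    # context (used when torch is imported) step functions may need to be
    # picklable, and the standard pickle module cannot pickle lambdas.
    return batch_predictions


if __name__ == "__main__":  # needed under spawn: child processes re-import this module
    predictions = (
        Pipeline(image_paths)                  # image_paths: your list of file paths
        .task(load_and_preprocess, count=4)   # 4 CPU workers loading/preprocessing
        .batch(64)                             # feed the GPU in batches of 64
        .class_task(Classifier, Classifier.predict, ["cuda"])  # 1 GPU worker
        .explode(flatten)                      # flatten back to individual predictions
        .wait_for_result()
    )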

Note: Olympipe detects torch at startup and automatically uses spawn context instead of fork, preventing CUDA deadlocks. Set OLYMPIPE_FORCE_FORK=1 to override.
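The start-method rule in the note above can be expressed as a small decision function. This is a hypothetical re-creation for illustration, not olympipe's source: it assumes "detects torch" means torch is already imported (present in sys.modules) and that OLYMPIPE_FORCE_FORK=1 takes precedence.

```python
import os
import sys
from typing import Mapping

def choose_start_method(
    env: Mapping[str, str] = os.environ,
    loaded: Mapping[str, object] = sys.modules,
) -> str:
    """Hypothetical sketch of the rule described in the note, not olympipe's code."""
    if env.get("OLYMPIPE_FORCE_FORK") == "1":
        return "fork"   # explicit override wins
    if "torch" in loaded:
        return "spawn"  # forking after CUDA initialisation can deadlock
    return "fork"       # default when torch is not in play
```

Whichever method ends up in use, a script that may run under spawn should build its pipeline inside `if __name__ == "__main__":`, because spawned children re-import the main module.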


Stateful workers

Use .class_task() when each worker needs persistent state (model weights, DB connection, accumulator…). The class is instantiated once per worker, so with count > 1 each worker keeps its own, separate state.

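from olympipe import Pipeline

class RunningStats:
    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> dict:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}

sensor_stream = [20.5, 21.0, 19.8, 20.2]  # example readings; any iterable of floats works

# One worker (the default), so a single RunningStats instance sees every reading
results = (
    Pipeline(sensor_stream)
    .class_task(RunningStats, RunningStats.update)
    .wait_for_result()
)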
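To see what "one instance per worker" means for an accumulator, the sketch below simulates class_task in a single process. simulate_class_task is a hypothetical helper, and it deals items round-robin as a deterministic stand-in for olympipe's shared queue, where the real split between workers depends on timing.

```python
from typing import Any, Callable, Iterable, List, Sequence, Type


class RunningStats:  # same class as above, repeated so this block runs on its own
    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> dict:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}


def simulate_class_task(
    items: Iterable[Any],
    cls: Type[Any],
    method: Callable[[Any, Any], Any],
    count: int,
    args: Sequence[Any] = (),
) -> List[Any]:
    """Hypothetical simulation: one cls instance per worker, items dealt round-robin."""
    workers = [cls(*args) for _ in range(count)]
    out: List[Any] = []
    for i, item in enumerate(items):
        worker = workers[i % count]  # round-robin stands in for the shared queue
        out.append(method(worker, item))
    return out
```

With count=2, each instance averages only the readings it received, so neither reports the global mean; aggregate across workers downstream (for example with .reduce) if you need one.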

Split & gather

Branch your pipeline into independent streams and merge them back.

from typing import Optional, Tuple
from olympipe import Pipeline

def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)

evens, odds = Pipeline(range(20)).split(route, n=2)

results = (
    evens.task(lambda x: x * 10)          # multiply evens
         .gather(odds.task(lambda x: -x)) # negate odds, then merge
         .wait_for_result()
)
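The routing contract in this example, where route returns one slot per branch, can be sketched in plain Python. This reading (a non-None value in slot i goes to branch i) is inferred from the example rather than taken from olympipe's documentation, and split_sketch is a hypothetical name. The sketch tests slots against None rather than truthiness, so a value of 0 is still routed.

```python
from typing import Callable, Iterable, List, Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")


def split_sketch(
    items: Iterable[T],
    route: Callable[[T], Sequence[Optional[T]]],
    n: int,
) -> List[List[T]]:
    """Hypothetical sequential version of .split(route, n=n)."""
    branches: List[List[T]] = [[] for _ in range(n)]
    for item in items:
        slots = route(item)
        if len(slots) != n:
            raise ValueError(f"route returned {len(slots)} slots, expected {n}")
        for branch, value in zip(branches, slots):
            if value is not None:  # `is not None`, not truthiness: 0 must still be routed
                branch.append(value)
    return branches


def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)
```

Since the branches run concurrently in olympipe, the order of items after .gather() is best treated as unspecified; sort afterwards if order matters.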

HTTP server pipeline

Turn an HTTP endpoint into a pipeline source. Each incoming request enters the pipeline as a (connection, data) pair; a downstream task does the work and writes the response back on that connection.

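import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response

def heavy_computation(payload):
    # placeholder for your own CPU-bound work
    return payload

def handle_request(pair):
    conn: socket.socket
    data: dict
    conn, data = pair  # client connection + request body as returned by the route function

    result = heavy_computation(data["payload"])
    send_json_response(conn, {"result": result})  # reply on the same connection
    return result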

Pipeline.server(
    [("POST", "/compute", lambda body: body)],
    port=8000,
    inactivity_timeout=60.0,
).task(handle_request, count=4).wait_for_completion()
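send_json_response comes from olympipe.helpers.server, and its internals are not shown here. To illustrate what a handler ultimately has to put on the wire, here is a hypothetical stand-in that writes a minimal HTTP/1.1 JSON response to a connected socket. It is not olympipe's implementation; the status line and headers are standard HTTP/1.1.

```python
import json
import socket


def send_json_response_sketch(conn: socket.socket, payload: dict, status: str = "200 OK") -> None:
    """Hypothetical stand-in: serialise payload and write a minimal HTTP/1.1 response."""
    body = json.dumps(payload).encode("utf-8")
    head = (
        f"HTTP/1.1 {status}\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {len(body)}\r\n"  # byte length of the encoded body
        "Connection: close\r\n"
        "\r\n"                               # blank line ends the headers
    ).encode("ascii")
    conn.sendall(head + body)               # sendall retries until every byte is written
```

Setting Content-Length lets the client know exactly where the body ends, and Connection: close signals that the server will not reuse the socket.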

LLM inference server (HuggingFace)

Serve a HuggingFace model as an HTTP API. The model is loaded once per worker process and stays in memory across requests; incoming requests queue up in the pipeline and are handed to the worker(s) one at a time while the server keeps accepting connections.

import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response


class LLMWorker:
    """Loaded once per worker — keeps the model in GPU memory across requests."""

    def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.3"):
        from transformers import pipeline as hf_pipeline

        self.pipe = hf_pipeline(
            "text-generation",
            model=model_name,
            device_map="auto",      # spreads across available GPUs
            torch_dtype="auto",
        )

    def generate(self, pair: tuple) -> dict:
        conn: socket.socket
        data: dict
        conn, data = pair

        prompt = data.get("prompt", "")
        max_new_tokens = data.get("max_new_tokens", 256)

        output = self.pipe(prompt, max_new_tokens=max_new_tokens, do_sample=False)
        generated = output[0]["generated_text"][len(prompt):]

        send_json_response(conn, {"response": generated, "model": self.pipe.model.name_or_path})
        return {"prompt": prompt, "response": generated}


Pipeline.server(
    [("POST", "/generate", lambda body: body)],
    port=8000,
).class_task(LLMWorker, LLMWorker.generate, ["mistralai/Mistral-7B-Instruct-v0.3"]).wait_for_completion()

Call it:

curl -X POST http://localhost:8000/generate \
     -H "Content-Type: application/json" \
     -d '{"prompt": "Explain transformers in one sentence:", "max_new_tokens": 128}'
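The same call can be made from Python using only the standard library. This sketch assumes the server above is running locally on port 8000; build_generate_request and generate are hypothetical helper names, and the response keys ("response", "model") are the ones LLMWorker.generate sends back.

```python
import json
import urllib.request


def build_generate_request(
    prompt: str,
    max_new_tokens: int = 128,
    url: str = "http://localhost:8000/generate",
) -> urllib.request.Request:
    """Build the same POST request as the curl command above."""
    body = json.dumps({"prompt": prompt, "max_new_tokens": max_new_tokens}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def generate(prompt: str, max_new_tokens: int = 128, timeout: float = 300.0) -> dict:
    """Send the request and decode the JSON reply (generation can be slow, hence the long timeout)."""
    req = build_generate_request(prompt, max_new_tokens)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Usage: `generate("Explain transformers in one sentence:")["response"]`.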

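Tip: Scale out by passing count=N to .class_task(); each worker loads its own model replica. Note that device_map="auto" spreads each replica across all visible GPUs, so to give every replica a GPU of its own you would need to pin a device per worker instead.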


Step caching

Cache intermediate results to disk. On a rerun, already-computed steps are read back from disk instead of recomputed, which makes it cheap to iterate on the tail end of a slow pipeline.

import tempfile
import time
from olympipe import Pipeline

def expensive_step_1(x: int) -> int:
    time.sleep(1)  # simulate heavy computation
    return x ** 2

def expensive_step_2(x: int) -> int:
    time.sleep(1)
    return x + 1

with tempfile.TemporaryDirectory() as cache_dir:
    # First run: computes everything, writes .pkl files
    results = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )

    # Second run: near-instant, results are read back from disk instead of recomputed
    results_again = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )
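The idea behind step caching can be sketched per item: derive a key from the function and its input, store the result as a .pkl file, and load it on later runs. This is an illustration of the technique, not olympipe's cache layout; cached_call and the SHA-256 key scheme are assumptions. Writing to a temporary file and renaming keeps an interrupted run from leaving a half-written cache entry.

```python
import hashlib
import os
import pickle
from typing import Any, Callable


def cached_call(fn: Callable[[Any], Any], x: Any, cache_dir: str) -> Any:
    """Hypothetical per-item disk cache keyed on (function name, pickled input)."""
    key = hashlib.sha256(fn.__qualname__.encode("utf-8") + pickle.dumps(x)).hexdigest()
    path = os.path.join(cache_dir, f"{key}.pkl")
    if os.path.exists(path):            # cache hit: skip the computation
        with open(path, "rb") as f:
            return pickle.load(f)
    result = fn(x)                      # cache miss: compute, then persist
    tmp = path + ".tmp"
    with open(tmp, "wb") as f:
        pickle.dump(result, f)
    os.replace(tmp, path)               # atomic rename on the same filesystem
    return result
```

Keying on the function name means that editing a function's body does not invalidate its cache in this sketch; clear the directory when step logic changes.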

Temporal batching (streams)

Group items arriving within a time window — ideal for audio frames, sensor feeds, or any real-time stream.

import time
from olympipe import Pipeline

def slow_producer(x: int) -> int:
    time.sleep(0.05)
    return x

results = (
    Pipeline(range(100))
    .task(slow_producer)
    .temporal_batch(0.5)     # collect items for 500 ms, then emit them as a list (~10 items per window here)
    .task(lambda batch: sum(batch))
    .wait_for_result()
)
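olympipe's exact windowing rules are not spelled out here, so the sketch below shows one common policy, tumbling windows anchored at the first arrival, applied to precomputed (arrival_time, item) pairs so the result is deterministic. tumbling_windows is a hypothetical helper, not the library's implementation.

```python
from typing import Iterable, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def tumbling_windows(events: Iterable[Tuple[float, T]], window_s: float) -> List[List[T]]:
    """Group (arrival_time, item) pairs into consecutive window_s-second windows.

    Windows start at the first item's arrival; windows with no items are skipped.
    """
    batches: List[List[T]] = []
    current: List[T] = []
    window_end: Optional[float] = None
    for t, item in events:
        if window_end is None:
            window_end = t + window_s
        while t >= window_end:          # close every window this arrival has passed
            if current:
                batches.append(current)
                current = []
            window_end += window_s
        current.append(item)
    if current:                         # flush the last, still-open window
        batches.append(current)
    return batches
```

In a live stream the same logic runs against the clock instead of timestamps, emitting each batch when its window closes.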

API reference

Method | Description
--- | ---
.task(fn, count=1) | Apply fn to each item, using count parallel workers
.filter(fn=None) | Keep items where fn(x) is truthy (with fn=None: keep non-None items)
.batch(n, complete=True) | Group into lists of size n
.explode(fn) | Flatten: one item → many
.split(fn, n=2) | Route items to n independent branches
.gather(*pipes) | Merge multiple pipelines into one
.reduce(acc, fn) | Fold the stream into a single accumulated value
.temporal_batch(s) | Group items arriving within s seconds
.cached_task(fn, cache_dir=…) | Compute and persist to disk; skip on re-run
.class_task(Cls, Cls.method) | Stateful per-worker instance
.timeout(s) | Abort if no item arrives within s seconds
.limit(n) | Stop after n items pass through
.debug() | Print each item as it passes (inspect mid-pipeline)
Pipeline.server(routes, port=…) | HTTP server as a pipeline source
.wait_for_result() | Block and collect all results as a list
.wait_for_completion() | Block until the pipeline finishes (discard output)
.wait_and_reduce(acc, fn) | Combine reduce and wait_for_result in one call
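Several of these operators have close standard-library analogues, which can help when reasoning about their behaviour on a finite input. The sketch below is illustrative only: the function names are hypothetical, and it assumes .reduce calls fn(accumulator, item) as functools.reduce does, which you should confirm against olympipe's signature.

```python
import functools
import itertools
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")
A = TypeVar("A")


def explode(items: Iterable[T], fn: Callable[[T], Iterable[U]]) -> Iterator[U]:
    # one item -> many: chain together the iterables fn returns
    return itertools.chain.from_iterable(map(fn, items))


def limit(items: Iterable[T], n: int) -> Iterator[T]:
    # stop after n items; islice stops pulling from the source once n are emitted
    return itertools.islice(items, n)


def reduce_stream(items: Iterable[T], acc: A, fn: Callable[[A, T], A]) -> A:
    # left fold: acc = fn(acc, item) for each item (argument order assumed)
    return functools.reduce(fn, items, acc)
```

Because limit is lazy, `limit(explode(...), n)` never computes more than it needs, which matches the intent of .limit(n) on an expensive upstream step.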



Download files


Source Distribution

olympipe-1.8.2.tar.gz (25.0 kB)

Uploaded Source

Built Distribution


olympipe-1.8.2-py3-none-any.whl (32.4 kB)

Uploaded Python 3

File details

Details for the file olympipe-1.8.2.tar.gz.

File metadata

  • Download URL: olympipe-1.8.2.tar.gz
  • Upload date:
  • Size: 25.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.4 CPython/3.12.13 Linux/5.15.154+

File hashes

Hashes for olympipe-1.8.2.tar.gz
Algorithm | Hash digest
--- | ---
SHA256 | 6a865c330c21ab28242d4beb2996321a6a6613c606c046efaf910a0dbf42a86f
MD5 | 91bbc28eb063a15d22e80624225ece7a
BLAKE2b-256 | 6b7efc8defa3b7b9ab89b371c553648131ad6e05dffe536eda98f85f25a93445


File details

Details for the file olympipe-1.8.2-py3-none-any.whl.

File metadata

  • Download URL: olympipe-1.8.2-py3-none-any.whl
  • Upload date:
  • Size: 32.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.4 CPython/3.12.13 Linux/5.15.154+

File hashes

Hashes for olympipe-1.8.2-py3-none-any.whl
Algorithm | Hash digest
--- | ---
SHA256 | aaa8ff02cb5fc683edeaea69790b781ca69e94ec3d8c1b3f6831f6daa5eebc0f
MD5 | b48e9b406d55c4594e0760276de615b8
BLAKE2b-256 | 444065801cb1a4380e38ccd3d8a1517ae0429b059c93a6ff611892c6ac034eba

