
A powerful parallel pipelining tool


Olympipe


Zero-boilerplate parallel pipelines for Python.

Turn any iterator into a multi-process pipeline with a single line. Each .task() runs in its own process, bypassing the GIL — no multiprocessing.Pool, no queues, no plumbing.

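from olympipe import Pipeline

def heavy_compute(x: int) -> int:
    # Placeholder for your own CPU-bound function; it may return values <= 0
    return sum(i * i for i in range(x)) % 101 - 50

results = (
    Pipeline(range(1000))
    .task(heavy_compute, count=8)   # 8 parallel workers
    .filter(lambda x: x > 0)        # keep positive results
    .batch(32)                      # group into lists of 32
    .wait_for_result()
)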

Installation

pip install olympipe

Why Olympipe?

  • Chainable API — compose steps like pandas or streams
  • True multiprocessing — each step is a separate process, GIL-free
  • PyTorch-safe — auto-detects torch and switches to spawn context to avoid CUDA/DataLoader deadlocks
  • Type-safe — fully typed, pipeline errors caught at IDE time
  • Batteries included — split/gather, time-windows, disk caching, HTTP server source

Basic usage

from olympipe import Pipeline

results = (
    Pipeline(range(18))
    .task(lambda x: x * 2)           # transform
    .filter(lambda x: x % 4 == 0)    # keep multiples of 4
    .batch(3)                         # group into lists of 3
    .wait_for_result()
)
# [[0, 4, 8], [12, 16, 20], [24, 28, 32]]
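To check what a task → filter → batch chain produces, you can run the same steps sequentially in plain Python. The sketch below is a reference model only, not Olympipe's implementation. The helpers batched and run_sequential are hypothetical names. This model keeps a trailing partial batch, but Olympipe's .batch() may handle a partial batch differently. The input range(18) is chosen so that every batch is full.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], n: int) -> Iterator[List[T]]:
    """Yield consecutive lists of n items (a final shorter list if items run out)."""
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == n:
            yield batch
            batch = []
    if batch:
        yield batch


def run_sequential(source: Iterable[int]) -> List[List[int]]:
    doubled = (x * 2 for x in source)          # mirrors .task(lambda x: x * 2)
    kept = (x for x in doubled if x % 4 == 0)  # mirrors .filter(lambda x: x % 4 == 0)
    return list(batched(kept, 3))              # mirrors .batch(3) + .wait_for_result()


print(run_sequential(range(18)))
# [[0, 4, 8], [12, 16, 20], [24, 28, 32]]
```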

Parallel workers

Scale any step horizontally by passing count=N. Workers share the same input queue and output queue — no manual coordination needed.

import time
from olympipe import Pipeline

def slow_io(url: str) -> bytes:
    # simulate network call
    time.sleep(0.5)
    return b"data"

my_urls = [f"https://example.com/item/{i}" for i in range(200)]  # placeholder URLs

# 200 URLs processed by 20 concurrent workers
results = (
    Pipeline(my_urls)
    .task(slow_io, count=20)
    .wait_for_result()
)
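With count=N, N workers consume from one shared input queue and write to one shared output queue. The sketch below shows that pattern using only the standard library. It uses threads instead of processes to stay self-contained. Unlike Olympipe, it therefore does not sidestep the GIL, which is acceptable for I/O-bound work like slow_io. The run_workers helper and the sentinel-based shutdown are illustrative choices, not Olympipe internals.

```python
import queue
import threading
import time
from typing import Callable, List

_STOP = object()  # sentinel that tells a worker to exit


def run_workers(fn: Callable, items: List, count: int) -> List:
    """Run fn over items with `count` workers sharing one input and one output queue."""
    inbox: queue.Queue = queue.Queue()
    outbox: queue.Queue = queue.Queue()

    def worker() -> None:
        while True:
            item = inbox.get()
            if item is _STOP:
                return
            outbox.put(fn(item))  # no error handling: an exception here stalls the sketch

    threads = [threading.Thread(target=worker) for _ in range(count)]
    for t in threads:
        t.start()
    for item in items:
        inbox.put(item)
    for _ in threads:
        inbox.put(_STOP)  # one sentinel per worker, queued after all real items
    for t in threads:
        t.join()
    # Results come back in completion order, which need not match input order.
    return [outbox.get() for _ in items]


def slow_io(url: str) -> str:
    time.sleep(0.05)  # stand-in for a network call
    return url.upper()


urls = [f"https://example.com/{i}" for i in range(20)]
start = time.perf_counter()
results = run_workers(slow_io, urls, count=10)
print(len(results), f"{time.perf_counter() - start:.2f}s")
```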

Machine learning inference

Olympipe shines for ML workloads: saturate your GPU(s) with a pipeline that loads data in parallel, batches it, runs inference, and post-processes results — all without a single thread/process management line.

import torch
from torch import nn
from olympipe import Pipeline

class Classifier:
    """One instance per worker process — each gets its own model copy."""

    def __init__(self, device: str = "cuda"):
        self.device = device
        self.model = nn.Sequential(nn.Linear(512, 128), nn.ReLU(), nn.Linear(128, 10))
        self.model.load_state_dict(torch.load("weights.pt"))
        self.model.eval().to(device)

    def predict(self, batch: list) -> list:
        with torch.no_grad():
            x = torch.stack(batch).to(self.device)
            return self.model(x).argmax(dim=1).tolist()


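def load_and_preprocess(path: str) -> torch.Tensor:
    # CPU-bound preprocessing, runs in parallel with GPU inference.
    # load_image and preprocess stand for your own code; the result must be a
    # 512-element tensor to match the first nn.Linear(512, 128) layer.
    img = load_image(path)
    return preprocess(img)


predictions = (
    Pipeline(image_paths)                  # your list of image file paths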
    .task(load_and_preprocess, count=4)   # 4 CPU workers loading/preprocessing
    .batch(64)                             # feed GPU in batches of 64
    .class_task(Classifier, Classifier.predict, ["cuda"])  # 1 GPU worker
    .explode(lambda x: x)                 # flatten back to individual predictions
    .wait_for_result()
)

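Note: Olympipe detects torch at startup and automatically uses the spawn start method instead of fork, preventing CUDA deadlocks. Set OLYMPIPE_FORCE_FORK=1 to override. Because spawn re-imports your main module in every worker process, place the pipeline code under an if __name__ == "__main__": guard.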
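The note implies a simple decision rule. The sketch below assumes that "torch is already imported" is the detection signal; Olympipe may check something else, such as whether torch is installed at all. The function name choose_start_method is hypothetical. The env and loaded_modules parameters exist only so the rule can be tested without touching the real process state.

```python
import os
import sys
from typing import Mapping, Optional


def choose_start_method(
    env: Optional[Mapping[str, str]] = None,
    loaded_modules: Optional[Mapping[str, object]] = None,
) -> str:
    """Return "spawn" when torch is loaded, unless OLYMPIPE_FORCE_FORK=1 forces "fork"."""
    env = os.environ if env is None else env
    loaded_modules = sys.modules if loaded_modules is None else loaded_modules
    if env.get("OLYMPIPE_FORCE_FORK") == "1":
        return "fork"  # explicit override wins
    if "torch" in loaded_modules:
        return "spawn"  # avoid forking a process that may hold CUDA state
    return "fork"


print(choose_start_method())
```

You could pass the result to multiprocessing.get_context(). Keep in mind that fork exists only on POSIX systems; on Windows, spawn is the only option.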


Stateful workers

Use .class_task() when each worker needs persistent state (model weights, DB connection, accumulator…). The class is instantiated once per worker.

from olympipe import Pipeline

class RunningStats:
    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> dict:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}

sensor_stream = [20.5, 21.0, 19.8, 22.3]  # example readings

results = Pipeline(sensor_stream).class_task(RunningStats, RunningStats.update).wait_for_result()
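What .class_task() does per worker can be modeled sequentially: build the instance once, then call the unbound method on it for every item. The helper run_class_worker is hypothetical; its class_args parameter mirrors the ["cuda"] argument list in the ML example. The second half shows a consequence of per-worker state. With count greater than 1, each worker holds its own RunningStats, so the mean it reports covers only the items that worker received. In a real pipeline, the scheduler decides which worker gets which item; the split here is manual.

```python
from typing import Any, Callable, Iterable, List, Sequence


class RunningStats:
    def __init__(self) -> None:
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> dict:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}


def run_class_worker(
    cls: type, method: Callable[..., Any], items: Iterable[Any], class_args: Sequence[Any] = ()
) -> List[Any]:
    instance = cls(*class_args)                  # constructed once for this worker
    return [method(instance, x) for x in items]  # state carries over between items


# One worker sees every reading, so its mean covers the whole stream.
print(run_class_worker(RunningStats, RunningStats.update, [2.0, 4.0, 6.0])[-1])
# {'n': 3, 'mean': 4.0}

# Two workers keep separate state; here the stream is split between them by hand.
a = run_class_worker(RunningStats, RunningStats.update, [2.0, 6.0])
b = run_class_worker(RunningStats, RunningStats.update, [4.0])
print(a[-1], b[-1])
# {'n': 2, 'mean': 4.0} {'n': 1, 'mean': 4.0}
```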

Split & gather

Branch your pipeline into independent streams and merge them back.

from typing import Optional, Tuple
from olympipe import Pipeline

def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)

evens, odds = Pipeline(range(20)).split(route, n=2)

results = (
    evens.task(lambda x: x * 10)          # multiply evens
         .gather(odds.task(lambda x: -x)) # negate odds, then merge
         .wait_for_result()
)
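Here is a sequential model of the routing. route returns one slot per branch, and each non-None slot goes to the matching branch. The split helper below is hypothetical. It tests slots with "is not None" rather than truthiness, so that 0 still reaches the evens branch; confirm Olympipe's rule before relying on it. Plain concatenation stands in for .gather(). In the real pipeline, the merged order presumably follows arrival order, so sort the results if order matters.

```python
from typing import Callable, Iterable, List, Optional, Sequence, Tuple


def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)


def split(
    items: Iterable[int], fn: Callable[[int], Sequence[Optional[int]]], n: int
) -> List[List[int]]:
    branches: List[List[int]] = [[] for _ in range(n)]
    for item in items:
        for i, value in enumerate(fn(item)):
            if value is not None:  # None means "not for this branch"; 0 still counts
                branches[i].append(value)
    return branches


evens, odds = split(range(20), route, n=2)
gathered = [x * 10 for x in evens] + [-x for x in odds]  # stand-in for .gather()
print(sorted(gathered))
```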

HTTP server pipeline

Turn an HTTP endpoint into a pipeline source. Each incoming request becomes a packet; downstream tasks process and respond.

import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response

def heavy_computation(payload):
    # Placeholder for your own processing
    return payload

def handle_request(pair):
    conn: socket.socket
    data: dict
    conn, data = pair

    result = heavy_computation(data["payload"])
    send_json_response(conn, {"result": result})
    return result

Pipeline.server(
    [("POST", "/compute", lambda body: body)],
    port=8000,
    inactivity_timeout=60.0,
).task(handle_request, count=4).wait_for_completion()
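Each packet carries the raw client socket (conn), so responding means writing an HTTP response onto conn before the task returns. The sketch below shows, using only the standard library, the kind of bytes a helper like send_json_response has to write. The name send_json_response_sketch is hypothetical; this is not Olympipe's implementation, and it ignores keep-alive and error handling. A connected socket pair stands in for a real client connection.

```python
import json
import socket


def send_json_response_sketch(conn: socket.socket, payload: dict, status: str = "200 OK") -> None:
    """Write a minimal HTTP/1.1 JSON response to an already-connected socket."""
    body = json.dumps(payload).encode("utf-8")
    header = (
        f"HTTP/1.1 {status}\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {len(body)}\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode("ascii")
    conn.sendall(header + body)


# Demo without a real server: one end plays the server, the other the client.
server_side, client_side = socket.socketpair()
send_json_response_sketch(server_side, {"result": 42})
server_side.close()  # closing signals end of response to the reader

raw = b""
while chunk := client_side.recv(4096):
    raw += chunk
client_side.close()

head, _, body = raw.partition(b"\r\n\r\n")
print(head.split(b"\r\n")[0], json.loads(body))
```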

LLM inference server (HuggingFace)

Serve a HuggingFace model as an HTTP API. Each worker process loads the model once and keeps it in memory. Incoming requests are queued and handled one at a time per worker, while the server stays responsive.

import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response


class LLMWorker:
    """Loaded once per worker — keeps the model in GPU memory across requests."""

    def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.3"):
        from transformers import pipeline as hf_pipeline

        self.pipe = hf_pipeline(
            "text-generation",
            model=model_name,
            device_map="auto",      # spreads across available GPUs
            torch_dtype="auto",
        )

    def generate(self, pair: tuple) -> dict:
        conn: socket.socket
        data: dict
        conn, data = pair

        prompt = data.get("prompt", "")
        max_new_tokens = data.get("max_new_tokens", 256)

        output = self.pipe(prompt, max_new_tokens=max_new_tokens, do_sample=False)
        generated = output[0]["generated_text"][len(prompt):]

        send_json_response(conn, {"response": generated, "model": self.pipe.model.name_or_path})
        return {"prompt": prompt, "response": generated}


Pipeline.server(
    [("POST", "/generate", lambda body: body)],
    port=8000,
).class_task(LLMWorker, LLMWorker.generate, ["mistralai/Mistral-7B-Instruct-v0.3"]).wait_for_completion()

Call it:

curl -X POST http://localhost:8000/generate \
     -H "Content-Type: application/json" \
     -d '{"prompt": "Explain transformers in one sentence:", "max_new_tokens": 128}'
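The same call can be made from Python using only urllib from the standard library. make_generate_request and generate are hypothetical helper names. The reply keys "response" and "model" match what LLMWorker.generate sends back.

```python
import json
import urllib.request


def make_generate_request(
    prompt: str, max_new_tokens: int = 128, url: str = "http://localhost:8000/generate"
) -> urllib.request.Request:
    """Build the same POST as the curl command: a JSON body with prompt and max_new_tokens."""
    body = json.dumps({"prompt": prompt, "max_new_tokens": max_new_tokens}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def generate(prompt: str, max_new_tokens: int = 128, timeout: float = 120.0) -> dict:
    """Send the request and decode the JSON reply (requires the server to be running)."""
    req = make_generate_request(prompt, max_new_tokens)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


req = make_generate_request("Explain transformers in one sentence:")
print(req.get_method(), req.full_url)
```

With the server running, generate("Explain transformers in one sentence:")["response"] returns the generated text. The long default timeout allows for slow generation.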

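Tip: Passing count=N to .class_task() starts N workers, each loading its own model replica. With device_map="auto", however, every replica is placed across the same visible GPUs, so the replicas compete for the same devices. To give each worker a GPU of its own, you need to pin each worker to a device yourself; this example does not do that.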


Step caching

Cache intermediate results to disk. Reruns skip already-computed steps automatically — great for iterating on the end of a slow pipeline.

import tempfile
import time
from olympipe import Pipeline

def expensive_step_1(x: int) -> int:
    time.sleep(1)  # simulate heavy computation
    return x ** 2

def expensive_step_2(x: int) -> int:
    time.sleep(1)
    return x + 1

with tempfile.TemporaryDirectory() as cache_dir:
    # First run: computes everything, writes .pkl files
    results = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )

    # Second run: instant — reads from disk, skips all computation
    results_again = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )
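You can sketch a per-item disk cache of the kind described using pickle and a hash of each input. Olympipe's file layout and cache key are not documented here, so cached_call and its file-naming scheme are assumptions. A counter records which calls actually ran, which shows that the second pass is served from disk.

```python
import hashlib
import os
import pickle
import tempfile
from typing import Any, Callable


def cached_call(fn: Callable[[Any], Any], x: Any, cache_dir: str) -> Any:
    """Return fn(x), reusing a pickled result from cache_dir when one exists."""
    # Key on the function's qualified name plus a hash of the pickled input.
    digest = hashlib.sha256(pickle.dumps(x)).hexdigest()[:16]
    path = os.path.join(cache_dir, f"{fn.__qualname__}-{digest}.pkl")
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    result = fn(x)
    with open(path, "wb") as f:
        pickle.dump(result, f)
    return result


calls = []


def square(x: int) -> int:
    calls.append(x)  # record computations that actually run
    return x ** 2


with tempfile.TemporaryDirectory() as cache_dir:
    first = [cached_call(square, x, cache_dir) for x in range(5)]
    second = [cached_call(square, x, cache_dir) for x in range(5)]

print(first == second, len(calls))
# True 5
```

Because this sketch keys entries on the function name, editing a function's body does not invalidate its entries, and all lambdas share the name "<lambda>". Clear the directory whenever the logic changes.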

Temporal batching (streams)

Group items arriving within a time window — ideal for audio frames, sensor feeds, or any real-time stream.

import time
from olympipe import Pipeline

def slow_producer(x: int) -> int:
    time.sleep(0.05)
    return x

results = (
    Pipeline(range(100))
    .task(slow_producer)
    .temporal_batch(0.5)     # collect items for 500ms, then emit as a list
    .task(lambda batch: sum(batch))
    .wait_for_result()
)
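The sketch below models time-window batching over (arrival_time, item) pairs, so it runs deterministically without real sleeps. It assumes a window opens when the first item arrives after the previous emit; Olympipe may instead use fixed clock intervals. A live implementation would also emit on a timer even when no new item arrives. This model only notices a closed window when the next item arrives, or when the stream ends. The name temporal_batches is hypothetical.

```python
from typing import Any, List, Optional, Sequence, Tuple


def temporal_batches(events: Sequence[Tuple[float, Any]], window: float) -> List[List[Any]]:
    """Group (arrival_time, item) pairs into windows of `window` seconds."""
    batches: List[List[Any]] = []
    current: List[Any] = []
    window_start: Optional[float] = None
    for t, item in events:
        if window_start is not None and t >= window_start + window:
            batches.append(current)  # window elapsed: emit what was collected
            current, window_start = [], None
        if window_start is None:
            window_start = t  # first item of a new window
        current.append(item)
    if current:
        batches.append(current)  # flush the last window at end of stream
    return batches


events = [(0.0, "a"), (0.1, "b"), (0.4, "c"), (0.6, "d"), (1.2, "e")]
print(temporal_batches(events, 0.5))
# [['a', 'b', 'c'], ['d'], ['e']]
```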

API reference

Method                            Description
.task(fn, count=1)                Apply fn to each item; count parallel workers
.filter(fn=None)                  Keep items where fn(x) is truthy (or non-None)
.batch(n, complete=True)          Group into lists of size n
.explode(fn)                      Flatten: one item → many
.split(fn, n=2)                   Route items to n independent branches
.gather(*pipes)                   Merge multiple pipelines into one
.reduce(acc, fn)                  Fold stream into a single accumulated value
.temporal_batch(s)                Group items arriving within s seconds
.cached_task(fn, cache_dir=…)     Compute + persist to disk; skip on re-run
.class_task(Cls, Cls.method)      Stateful per-worker instance
.timeout(s)                       Abort if no item arrives within s seconds
.limit(n)                         Stop after n items pass through
.debug()                          Print each item as it passes (inspect mid-pipeline)
Pipeline.server(routes, port=…)   HTTP server as a pipeline source
.wait_for_result()                Block and collect all results as a list
.wait_for_completion()            Block until the pipeline finishes (discard output)
.wait_and_reduce(acc, fn)         Combine reduce + wait_for_result in one call
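For intuition, several operators in the table correspond to standard-library idioms on an ordinary iterable. These are sequential analogies, not Olympipe code. In particular, the table does not say in which argument order .reduce(acc, fn) calls fn; this sketch follows functools.reduce's (acc, x) order. The helper names are hypothetical.

```python
import functools
import itertools
from typing import Iterable, List


def explode(items: Iterable[List[int]]) -> List[int]:
    return list(itertools.chain.from_iterable(items))  # one item -> many


def limit(items: Iterable[int], n: int) -> List[int]:
    return list(itertools.islice(items, n))  # stop after n items


def reduce_sum(items: Iterable[int]) -> int:
    return functools.reduce(lambda acc, x: acc + x, items, 0)  # fold from an initial accumulator


batches = [[1, 2], [3], [4, 5, 6]]
flat = explode(batches)
print(flat, limit(flat, 4), reduce_sum(flat))
# [1, 2, 3, 4, 5, 6] [1, 2, 3, 4] 21
```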


Download files


Source Distribution

olympipe-1.8.3.tar.gz (25.0 kB)

Uploaded Source

Built Distribution


olympipe-1.8.3-py3-none-any.whl (32.3 kB)

Uploaded Python 3

File details

Details for the file olympipe-1.8.3.tar.gz.

File metadata

  • Download URL: olympipe-1.8.3.tar.gz
  • Upload date:
  • Size: 25.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.4 CPython/3.12.13 Linux/5.15.154+

File hashes

Hashes for olympipe-1.8.3.tar.gz
Algorithm Hash digest
SHA256 12f7466d73961a470d9aebb5dce150e590764edfe312fcec1d40ceeb63192557
MD5 65566b2107b8f5a93810145f18053e1a
BLAKE2b-256 c24e20295056b452516d738e8e1656eb9cf91dd7255eeff8d9468311f74170a9


File details

Details for the file olympipe-1.8.3-py3-none-any.whl.

File metadata

  • Download URL: olympipe-1.8.3-py3-none-any.whl
  • Upload date:
  • Size: 32.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.4 CPython/3.12.13 Linux/5.15.154+

File hashes

Hashes for olympipe-1.8.3-py3-none-any.whl
Algorithm Hash digest
SHA256 81acfa17660ee95df7160a89f9eafb4aa39c2fc85b33192d2772e62dc65670e1
MD5 629fa37b8a95b218a0fdbf7fbab693c7
BLAKE2b-256 13c5061c4b27cd46f66b3fa52ef72b7d115fc3320d22c36f34361f3ceb0f2c9b

