Skip to main content

A powerful parallel pipelining tool

Project description

Olympipe

coverage status

Zero-boilerplate parallel pipelines for Python.

Turn any iterator into a multi-process pipeline with a single line. Each .task() runs in its own process, bypassing the GIL — no multiprocessing.Pool, no queues, no plumbing.

from olympipe import Pipeline

results = (
    Pipeline(range(1000))
    .task(heavy_compute, count=8)   # 8 parallel workers
    .filter(lambda x: x > 0)
    .batch(32)
    .wait_for_result()
)

Installation

pip install olympipe

Why Olympipe?

  • Chainable API — compose steps like pandas or streams
  • True multiprocessing — each step is a separate process, GIL-free
  • PyTorch-safe — auto-detects torch and switches to spawn context to avoid CUDA/DataLoader deadlocks
  • Type-safe — fully typed, pipeline errors caught at IDE time
  • Batteries included — split/gather, time-windows, disk caching, HTTP server source

Basic usage

from olympipe import Pipeline

results = (
    Pipeline(range(20))
    .task(lambda x: x * 2)           # transform
    .filter(lambda x: x % 4 == 0)    # keep even multiples of 4
    .batch(3)                         # group into lists of 3
    .wait_for_result()
)
# [[0, 8, 16], [24, 32, 40]]

Parallel workers

Scale any step horizontally by passing count=N. Workers share the same input queue and output queue — no manual coordination needed.

import time
from olympipe import Pipeline

def slow_io(url: str) -> bytes:
    # simulate network call
    time.sleep(0.5)
    return b"data"

# 200 URLs processed with 20 concurrent workers
results = (
    Pipeline(my_urls)
    .task(slow_io, count=20)
    .wait_for_result()
)

Machine learning inference

Olympipe shines for ML workloads: saturate your GPU(s) with a pipeline that loads data in parallel, batches it, runs inference, and post-processes results — all without a single thread/process management line.

import torch
from torch import nn
from olympipe import Pipeline

class Classifier:
    """One instance per worker process — each gets its own model copy."""

    def __init__(self, device: str = "cuda"):
        self.device = device
        self.model = nn.Sequential(nn.Linear(512, 128), nn.ReLU(), nn.Linear(128, 10))
        self.model.load_state_dict(torch.load("weights.pt"))
        self.model.eval().to(device)

    def predict(self, batch: list) -> list:
        with torch.no_grad():
            x = torch.stack(batch).to(self.device)
            return self.model(x).argmax(dim=1).tolist()


def load_and_preprocess(path: str) -> torch.Tensor:
    # CPU-bound preprocessing, runs in parallel with GPU inference
    img = load_image(path)
    return preprocess(img)


predictions = (
    Pipeline(image_paths)
    .task(load_and_preprocess, count=4)   # 4 CPU workers loading/preprocessing
    .batch(64)                             # feed GPU in batches of 64
    .class_task(Classifier, Classifier.predict, ["cuda"])  # 1 GPU worker
    .explode(lambda x: x)                 # flatten back to individual predictions
    .wait_for_result()
)

Note: Olympipe detects torch at startup and automatically uses spawn context instead of fork, preventing CUDA deadlocks. Set OLYMPIPE_FORCE_FORK=1 to override.


Stateful workers

Use .class_task() when each worker needs persistent state (model weights, DB connection, accumulator…). The class is instantiated once per worker.

from olympipe import Pipeline

class RunningStats:
    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x: float) -> dict:
        self.n += 1
        self.total += x
        return {"n": self.n, "mean": self.total / self.n}

results = Pipeline(sensor_stream).class_task(RunningStats, RunningStats.update).wait_for_result()

Split & gather

Branch your pipeline into independent streams and merge them back.

from typing import Optional, Tuple
from olympipe import Pipeline

def route(x: int) -> Tuple[Optional[int], Optional[int]]:
    return (x, None) if x % 2 == 0 else (None, x)

evens, odds = Pipeline(range(20)).split(route, n=2)

results = (
    evens.task(lambda x: x * 10)          # multiply evens
         .gather(odds.task(lambda x: -x)) # negate odds, then merge
         .wait_for_result()
)

HTTP server pipeline

Turn an HTTP endpoint into a pipeline source. Each incoming request becomes a packet; downstream tasks process and respond.

import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response

def handle_request(pair):
    conn: socket.socket
    data: dict
    conn, data = pair

    result = heavy_computation(data["payload"])
    send_json_response(conn, {"result": result})
    return result

Pipeline.server(
    [("POST", "/compute", lambda body: body)],
    port=8000,
    inactivity_timeout=60.0,
).task(handle_request, count=4).wait_for_completion()

LLM inference server (HuggingFace)

Serve a HuggingFace model as an HTTP API. The model loads once per worker process; requests are batched and processed in parallel while the server stays responsive.

import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response


class LLMWorker:
    """Loaded once per worker — keeps the model in GPU memory across requests."""

    def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.3"):
        from transformers import pipeline as hf_pipeline

        self.pipe = hf_pipeline(
            "text-generation",
            model=model_name,
            device_map="auto",      # spreads across available GPUs
            torch_dtype="auto",
        )

    def generate(self, pair: tuple) -> tuple:
        conn: socket.socket
        data: dict
        conn, data = pair

        prompt = data.get("prompt", "")
        max_new_tokens = data.get("max_new_tokens", 256)

        output = self.pipe(prompt, max_new_tokens=max_new_tokens, do_sample=False)
        generated = output[0]["generated_text"][len(prompt):]

        send_json_response(conn, {"response": generated, "model": self.pipe.model.name_or_path})
        return {"prompt": prompt, "response": generated}


Pipeline.server(
    [("POST", "/generate", lambda body: body)],
    port=8000,
).class_task(LLMWorker, LLMWorker.generate, ["mistralai/Mistral-7B-Instruct-v0.3"]).wait_for_completion()

Call it:

curl -X POST http://localhost:8000/generate \
     -H "Content-Type: application/json" \
     -d '{"prompt": "Explain transformers in one sentence:", "max_new_tokens": 128}'

Tip: Scale to multiple GPUs by passing count=N to .class_task() — each worker gets its own model replica on a separate device.


Step caching

Cache intermediate results to disk. Reruns skip already-computed steps automatically — great for iterating on the end of a slow pipeline.

import tempfile
from olympipe import Pipeline

def expensive_step_1(x: int) -> int:
    time.sleep(1)  # simulate heavy computation
    return x ** 2

def expensive_step_2(x: int) -> int:
    time.sleep(1)
    return x + 1

with tempfile.TemporaryDirectory() as cache_dir:
    # First run: computes everything, writes .pkl files
    results = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )

    # Second run: instant — reads from disk, skips all computation
    results_again = (
        Pipeline(range(100))
        .cached_task(expensive_step_1, cache_dir=cache_dir)
        .cached_task(expensive_step_2, cache_dir=cache_dir)
        .uncache()
        .wait_for_result()
    )

Temporal batching (streams)

Group items arriving within a time window — ideal for audio frames, sensor feeds, or any real-time stream.

import time
from olympipe import Pipeline

def slow_producer(x: int) -> int:
    time.sleep(0.05)
    return x

results = (
    Pipeline(range(100))
    .task(slow_producer)
    .temporal_batch(0.5)     # collect items for 500ms, then emit as a list
    .task(lambda batch: sum(batch))
    .wait_for_result()
)

API reference

Method Description
.task(fn, count=1) Apply fn to each item; count parallel workers
.filter(fn=None) Keep items where fn(x) is truthy (or non-None)
.batch(n, complete=True) Group into lists of size n
.explode(fn) Flatten: one item → many
.split(fn, n=2) Route items to n independent branches
.gather(*pipes) Merge multiple pipelines into one
.reduce(acc, fn) Fold stream into a single accumulated value
.temporal_batch(s) Group items arriving within s seconds
.cached_task(fn, cache_dir=…) Compute + persist to disk; skip on re-run
.class_task(Cls, Cls.method) Stateful per-worker instance
.timeout(s) Abort if no item arrives within s seconds
.limit(n) Stop after n items pass through
.debug() Print each item as it passes (inspect mid-pipeline)
Pipeline.server(routes, port=…) HTTP server as a pipeline source
.wait_for_result() Block and collect all results as a list
.wait_for_completion() Block until the pipeline finishes (discard output)
.wait_and_reduce(acc, fn) Combine reduce + wait_for_result in one call

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

olympipe-1.8.6.tar.gz (25.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

olympipe-1.8.6-py3-none-any.whl (32.6 kB view details)

Uploaded Python 3

File details

Details for the file olympipe-1.8.6.tar.gz.

File metadata

  • Download URL: olympipe-1.8.6.tar.gz
  • Upload date:
  • Size: 25.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.4 CPython/3.12.13 Linux/5.15.154+

File hashes

Hashes for olympipe-1.8.6.tar.gz
Algorithm Hash digest
SHA256 709a313ffbccf78982c8a28a82dcfa53de4fd7185fa2406ee2223028cbb7e7c3
MD5 2046bbc51db794970d2b787456ecaaba
BLAKE2b-256 8f54b8b4c5b533cd33eaef49e3d00e55f946828bc799297fe3d1d1a4756ba90a

See more details on using hashes here.

File details

Details for the file olympipe-1.8.6-py3-none-any.whl.

File metadata

  • Download URL: olympipe-1.8.6-py3-none-any.whl
  • Upload date:
  • Size: 32.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.4 CPython/3.12.13 Linux/5.15.154+

File hashes

Hashes for olympipe-1.8.6-py3-none-any.whl
Algorithm Hash digest
SHA256 46cf09a9a3c634f3d9a43e3ecb37e47e7b03b97dbc4472676794b55fdde221cd
MD5 4727dda0f81f3a8eeb47b683e8827149
BLAKE2b-256 a5027e4098060220d5a7fccc255e0071104dc3e6010f500d7674631c4d8a635c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page