ray-map

Efficient Ray-powered imap/map with backpressure, checkpointing, per-item timeouts, safe exceptions, and async variants.


✨ Features

  • ⚡ Parallel map/imap on Ray with backpressure and batching
  • 🧠 Adaptive tail batching (optional): when the iterable length is known, batch_size is reduced exponentially near the end to minimize straggler tail latency
  • 💾 Checkpointing + replay of already computed points
  • ⏱️ Per-item timeouts (worker-side, via threads)
  • 🧯 Safe exceptions: return Exception objects instead of failing the whole run
  • 🔁 Ordered or as-ready modes, (arg, res) or plain res
  • 🧰 Async API: imap_async, map_async
  • 🧩 Single-file module ray_map.py, src-layout friendly

📦 Install

pip install ray-map

Requires Python 3.9+ and Ray 2.6+.


🚀 Quickstart

from ray_map import RayMap

def foo(x):
    if x == 3:
        raise ValueError("oops")
    return x * x

rmap = RayMap(foo, batch_size=8, max_pending=-1, checkpoint_path="res.pkl")

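# 1) Stream (ordered). Exceptions raise by default (safe_exceptions=False),
#    so foo(3) stops this loop; catch the error to keep the script going.
try:
    for y in rmap.imap(range(10)):
        print(y)
except Exception as exc:
    print("failed:", exc)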

# 2) Stream (as-ready), safe exceptions, return (arg, res_or_exc)
for arg, res in rmap.imap(
    range(10), keep_order=False, safe_exceptions=True, ret_args=True, timeout=2.0
):
    print(arg, "->", res)

# 3) Collect to list
lst = rmap.map(range(1000), timeout=2.0, safe_exceptions=True)
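
With safe_exceptions=True and ret_args=True, each yielded pair carries either a result or an exception object, so the caller has to tell the two apart. A minimal sketch of one way to do that follows. split_results is a hypothetical helper, not part of ray-map, and the sample pairs are hand-written to mimic what the second loop above would yield.

```python
from typing import Any, Dict, Iterable, Tuple


def split_results(
    pairs: Iterable[Tuple[Any, Any]],
) -> Tuple[Dict[Any, Any], Dict[Any, Exception]]:
    """Partition (arg, res_or_exc) pairs into successes and failures.

    Hypothetical helper (not part of ray-map); assumes args are hashable.
    """
    ok: Dict[Any, Any] = {}
    failed: Dict[Any, Exception] = {}
    for arg, res in pairs:
        if isinstance(res, Exception):
            failed[arg] = res
        else:
            ok[arg] = res
    return ok, failed


# Hand-written stand-in for:
#   rmap.imap(range(10), safe_exceptions=True, ret_args=True)
sample = [(2, 4), (3, ValueError("oops")), (4, 16)]
ok, failed = split_results(sample)
print("ok:", ok)
print("failed args:", sorted(failed))
```

The same helper works for keep_order=False, since it does not rely on the order in which pairs arrive.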

🧠 Adaptive tail batching

What it is

When the input length is known upfront (i.e. len(iterable) works), RayMap can reduce straggler tail latency by shrinking the computation batch_size as the work nears completion.

Rule of thumb:

  • Let CPU = available Ray CPUs.
  • Let tail_threshold = tail_batches_per_core * CPU (default 2 * CPU).
  • Once the number of remaining batches (already submitted plus not yet submitted) drops to tail_threshold or below, batch_size is halved repeatedly until it reaches tail_min_batch_size (default 1).

This yields smaller last tasks, keeping all workers busy and reducing long “tails”.

Important: adaptive tail batching is only applied when len(iterable) is available. For generators/streams with unknown length, this feature is disabled by design.
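
To make the rule of thumb concrete, here is a small pure-Python simulation of the batch sizes it would produce. It is a sketch of the rule as described above, not the library's actual code: tail_batch_sizes is a hypothetical function, and it simplifies by ignoring in-flight batches, so "remaining batches" counts only work that has not been submitted yet.

```python
import math
from typing import List


def tail_batch_sizes(
    n_items: int,
    batch_size: int,
    cpus: int,
    tail_batches_per_core: int = 2,
    tail_min_batch_size: int = 1,
) -> List[int]:
    """Simulate the sequence of batch sizes under the tail-shrinking rule.

    Simplification (assumption): batches already running are not counted.
    """
    tail_threshold = tail_batches_per_core * cpus
    floor = max(1, tail_min_batch_size)  # never shrink to an empty batch
    sizes: List[int] = []
    remaining = n_items
    while remaining > 0:
        remaining_batches = math.ceil(remaining / batch_size)
        if remaining_batches <= tail_threshold:
            # Near the end: halve the batch size, but stay at or above the floor.
            batch_size = min(batch_size, max(floor, batch_size // 2))
        size = min(batch_size, remaining)
        sizes.append(size)
        remaining -= size
    return sizes


print(tail_batch_sizes(n_items=100, batch_size=16, cpus=2))
```

With 2 CPUs the threshold is 4 batches, so the sizes step down from 16 through 8, 4 and 2 to 1 as the end of the input approaches, instead of finishing with a few large batches.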

Configuration

These are RayMap(...) constructor arguments:

  • adaptive_tail_batching: bool = True
  • tail_batches_per_core: int = 2. Threshold multiplier; the threshold is 2 * CPU by default.
  • tail_min_batch_size: int = 1. The minimum batch size to shrink to.

Example:

rmap = RayMap(
    foo,
    batch_size=32,
    adaptive_tail_batching=True,
    tail_batches_per_core=2,
    tail_min_batch_size=1,
)

⚙️ ray.init / ray.shutdown — how to run Ray

RayMap supports lazy initialization of Ray: if Ray is not initialized when you start computing, RayMap will call ray.init() for you (local, no dashboard) with the runtime env you pass to RayMap.

You can also manage Ray yourself. Typical patterns:

A) Manage Ray yourself (recommended in apps/tests/CI)

import os, ray

# Make your codebase visible to workers. Usually the repository root.
ray.init(runtime_env={"working_dir": os.getcwd()}, include_dashboard=False)

# ... use RayMap normally

ray.shutdown()  # ← you are responsible for shutting down Ray explicitly

Notes

  • If Ray is already initialized, RayMap will not call ray.init() again — it just prepares remote functions.
  • Prefer to set runtime_env explicitly (see working_dir below).
  • Call ray.shutdown() yourself when the program finishes or tests are done.

B) Let RayMap do it lazily

If you don’t call ray.init() yourself, the first call to imap/map will:

  • start a local Ray instance with runtime_env taken from RayMap(..., runtime_env=...) (if provided), otherwise a minimal default;
  • auto-tune max_pending if it is set to -1 (CPU + 16 batches);
  • run without the dashboard and with logs suppressed by default.

You still need to shut Ray down yourself if you want a clean exit:

import ray
ray.shutdown()

What arguments can be passed to ray.init() via RayMap

RayMap forwards a subset of options to Ray:

  • address: connect to a cluster, e.g. "ray://host:port" (requires ray[default]/ray[client]). If omitted, a local instance is started.
  • password: passed to Ray as _redis_password, for legacy clusters that use Redis authentication.
  • runtime_env: the runtime environment for your job, see below.
  • remote_options: forwarded to .options(...) of the remote function (e.g., {"num_cpus": 0.5, "resources": {...}}).

📂 About working_dir (runtime_env)

working_dir defines what source files are shipped to workers. That’s critical when your function fn is defined in your project code (or even inside tests/).

  • When you manage Ray yourself: set it explicitly

    ray.init(runtime_env={"working_dir": "."})  # or os.getcwd()
    
  • When RayMap initializes lazily: it uses the runtime_env you passed to RayMap. If you didn’t pass any, use RayMap(fn, runtime_env={"working_dir": "."}) to avoid import errors on workers.

If your test functions live in tests/, either:

  • include tests/ in the runtime env (e.g. via py_modules: ["tests"]), or
  • move helpers to src/test_helpers.py and import from there in tests.
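
As an illustration of the first option, a runtime_env that ships the repository root and additionally exposes tests/ could look like the fragment below. The directory layout (running from the repository root, helpers under ./tests) is an assumption; adjust the paths to your project.

```python
# Assumed layout: the script runs from the repository root and test helpers
# live in ./tests. Both keys are standard Ray runtime_env fields.
runtime_env = {
    "working_dir": ".",       # ship the current directory to workers
    "py_modules": ["tests"],  # additionally make the tests package importable
}

# Either hand it to Ray directly:
#   ray.init(runtime_env=runtime_env, include_dashboard=False)
# or let RayMap use it during lazy initialization:
#   rmap = RayMap(fn, runtime_env=runtime_env)
```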

ray.shutdown()

ray.shutdown() is not called by RayMap. You should call it yourself when you want to cleanly stop Ray (end of script, end of test session, etc.). In pytest, use a session-scoped fixture that starts Ray once and calls ray.shutdown() in teardown.
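
One way to make the start/stop responsibility explicit is a small context manager around ray.init / ray.shutdown. The sketch below is a hypothetical helper, not something ray-map provides; it only uses ray.is_initialized(), ray.init() and ray.shutdown(), and it leaves an already running Ray instance untouched.

```python
from contextlib import contextmanager


@contextmanager
def ray_session(**init_kwargs):
    """Start Ray if it is not running and shut it down on exit.

    Hypothetical helper; init_kwargs are forwarded to ray.init().
    """
    import ray  # imported lazily so defining the helper does not load Ray

    started_here = not ray.is_initialized()
    if started_here:
        ray.init(**init_kwargs)
    try:
        yield
    finally:
        # Only stop what this helper started; leave a pre-existing Ray alone.
        if started_here:
            ray.shutdown()


# Usage (requires Ray and ray-map to be installed):
#   with ray_session(num_cpus=4, include_dashboard=False,
#                    runtime_env={"working_dir": "."}):
#       results = RayMap(foo).map(range(100), safe_exceptions=True)
```

In pytest, the same init/yield/shutdown body fits into a session-scoped fixture in conftest.py, so Ray starts once per test session.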


🧭 API Reference

Constructor

RayMap(
  fn,
  *,
  batch_size=1,
  max_pending=-1,
  checkpoint_path=None,
  checkpoint_every=100,
  address=None,
  password=None,
  runtime_env=None,
  remote_options=None,
  on_error=None,
  init_if_needed=False,

  # Adaptive tail batching
  adaptive_tail_batching=True,
  tail_batches_per_core=2,
  tail_min_batch_size=1,
)

Iteration

RayMap.imap(iterable, *, timeout=None, safe_exceptions=False, keep_order=True, ret_args=False) -> iterator
RayMap.map(iterable,  *, timeout=None, safe_exceptions=False, keep_order=True, ret_args=False) -> list

# Async variants
RayMap.imap_async(...) -> async iterator
RayMap.map_async(...) -> list

  • timeout: per-item timeout in seconds, enforced on the worker via ThreadPoolExecutor
  • safe_exceptions=True: return exception objects instead of raising (the run does not crash)
  • keep_order=True: preserve input order (1:1); False → yield as-ready
  • ret_args=True: yield (arg, res_or_exc) instead of just res_or_exc
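
The timeout and safe_exceptions options can be pictured with the standard library alone. The following is a minimal sketch of a thread-based per-item timeout, assuming the worker runs each call in a helper thread as the timeout bullet describes. call_with_timeout and slow_square are hypothetical names, and this is not the library's code.

```python
import concurrent.futures
import time
from typing import Any, Callable


def call_with_timeout(fn: Callable[[Any], Any], arg: Any, timeout: float) -> Any:
    """Run fn(arg) in a helper thread; return its result or an exception object.

    Mirrors safe_exceptions=True: errors and timeouts are returned, not raised.
    A timed-out thread cannot be killed; it keeps running until fn returns.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, arg)
    try:
        return future.result(timeout=timeout)
    except Exception as exc:
        # Covers concurrent.futures.TimeoutError and any error raised by fn.
        return exc
    finally:
        # Do not block on a thread that may still be running.
        executor.shutdown(wait=False)


def slow_square(x: int) -> int:
    time.sleep(0.3)
    return x * x


print(call_with_timeout(slow_square, 3, timeout=1.0))
print(type(call_with_timeout(slow_square, 3, timeout=0.05)).__name__)
```

Because the timed-out call keeps running in its thread, a thread-based timeout bounds how long the caller waits, not how long the work itself takes.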

💾 Checkpointing

  • Stores (key, arg, result_or_exc) records at checkpoint_path.
  • On restart, previously computed results are yielded first; the remaining items are then computed on Ray.
  • Planned (not yet available): an optional flag to skip storing exceptions.
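
The replay behavior can be sketched without Ray. In the example below, the key (the item's position), the file format (one pickled list of tuples) and the write-after-every-item policy are all assumptions made for illustration; checkpointed_map is a hypothetical function, and the real on-disk format may differ.

```python
import os
import pickle
import tempfile
from typing import Any, Callable, Iterable, Iterator, List, Tuple


def checkpointed_map(
    fn: Callable[[Any], Any], items: Iterable[Any], path: str
) -> Iterator[Any]:
    """Yield fn(item) for each item, replaying stored results first.

    Assumptions: key = position of the item; the file holds a single pickled
    list of (key, arg, result) tuples and is rewritten after every item.
    """
    records: List[Tuple[int, Any, Any]] = []
    if os.path.exists(path):
        with open(path, "rb") as fh:
            records = pickle.load(fh)
    done = {key for key, _, _ in records}
    for _, _, result in records:  # replay phase: no recomputation
        yield result
    for key, arg in enumerate(items):
        if key in done:
            continue
        result = fn(arg)
        records.append((key, arg, result))
        with open(path, "wb") as fh:
            pickle.dump(records, fh)
        yield result


calls = []


def square(x: int) -> int:
    calls.append(x)  # record which inputs were really computed
    return x * x


with tempfile.TemporaryDirectory() as tmp:
    ckpt = os.path.join(tmp, "res.pkl")
    first = list(checkpointed_map(square, range(3), ckpt))
    second = list(checkpointed_map(square, range(5), ckpt))

print(first, second, calls)
```

The second run replays the three stored results and only computes the two new items, which is why each input appears exactly once in calls.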

📐 Performance tips

  • Start with batch_size in the range 8–64. Too small → Ray scheduling overhead; too big → latency/memory.

  • Start with max_pending=-1 (auto-tunes to CPU + 16 batches).

  • For lower latency, consider keep_order=False.

  • Adaptive tail batching is helpful when:

    • the input length is known (len(iterable) works),
    • you care about tail latency,
    • your per-item workload is not extremely small (otherwise Ray overhead dominates).
  • If you need deterministic behavior in tests/benchmarks, consider fixing Ray CPUs via:

    ray.init(num_cpus=4, runtime_env={"working_dir": "."}, include_dashboard=False)
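
The max_pending tip above is about backpressure: as the name and the "CPU + 16 batches" default suggest, it limits how many batches are in flight at once, so a large or unbounded input is not submitted all at once. A minimal stdlib sketch of that idea, using threads instead of Ray tasks (bounded_imap is a hypothetical function, not the library's implementation):

```python
import concurrent.futures
from typing import Any, Callable, Iterable, Iterator


def bounded_imap(
    fn: Callable[[Any], Any],
    items: Iterable[Any],
    max_pending: int,
    workers: int = 4,
) -> Iterator[Any]:
    """Yield results as they finish, keeping at most max_pending tasks in flight."""
    if max_pending < 1:
        raise ValueError("max_pending must be >= 1")
    pending = set()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        for item in items:
            if len(pending) >= max_pending:
                # Backpressure: stop pulling input until a slot frees up.
                done, pending = concurrent.futures.wait(
                    pending, return_when=concurrent.futures.FIRST_COMPLETED
                )
                for fut in done:
                    yield fut.result()
            pending.add(pool.submit(fn, item))
        # Input exhausted: drain whatever is still running.
        for fut in concurrent.futures.as_completed(pending):
            yield fut.result()


print(sorted(bounded_imap(lambda x: x * x, range(10), max_pending=3)))
```

Results arrive in completion order here, which corresponds to keep_order=False; an ordered variant would additionally buffer results until their turn comes.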
    

🧪 Examples

See examples/ directory for quickstarts and CI-ready snippets (including pytest fixtures for ray.init/ray.shutdown).


🧪 Testing

pytest -q

🧑‍💻 Development (formatting & lint)

This repo uses:

  • ruff (lint)
  • black (format)

To check:

ruff check .
black --check .

To auto-format:

black .

🧑‍💻 Development (Dev Container)

This repository includes a ready-to-use VS Code Dev Container configuration.

Prerequisites

  • Docker (Docker Desktop / Docker Engine)
  • VS Code
  • VS Code extension: Dev Containers (ms-vscode-remote.remote-containers)

How to use

  1. Open the repository in VS Code.

  2. Run: Dev Containers: Open Folder in Container

  3. After the container is built, the postCreate hook will:

    • pip install -e ".[dev,test]"
    • run pytest -q as a sanity check

Ray-specific note

The Dev Container sets --shm-size=2g to avoid Ray object store issues caused by the small default /dev/shm size inside Docker containers.


📄 License

MIT — see LICENSE.
