Efficient Ray-powered map/imap with backpressure, checkpointing, timeouts, and async
ray-map
Efficient Ray-powered imap/map with backpressure, checkpointing, per-item timeouts, safe exceptions, and async variants.
✨ Features
- ⚡ Parallel map/imap on Ray with backpressure and batching
- 🧠 Adaptive tail batching (optional): when the iterable length is known, batch_size is reduced exponentially near the end to minimize straggler tail latency
- 💾 Checkpointing + replay of already computed points
- ⏱️ Per-item timeouts (worker-side, via threads)
- 🧯 Safe exceptions: return Exception objects instead of failing the whole run
- 🔁 Ordered or as-ready modes, (arg, res) or plain res
- 🧰 Async API: imap_async, map_async
- 🧩 Single-file module ray_map.py, src-layout friendly
📦 Install
```bash
pip install ray-map
```
Requires Python 3.9+ and Ray 2.6+.
🚀 Quickstart
```python
from ray_map import RayMap


def foo(x):
    if x == 3:
        raise ValueError("oops")
    return x * x


rmap = RayMap(foo, batch_size=8, max_pending=-1, checkpoint_path="res.pkl")

# 1) Stream (ordered). Exceptions raise by default (safe_exceptions=False),
#    so foo(3) aborts this loop; catch the error to keep the script running.
try:
    for y in rmap.imap(range(10)):
        print(y)
except Exception as exc:
    print("stopped on error:", exc)

# 2) Stream (as-ready), safe exceptions, return (arg, res_or_exc)
for arg, res in rmap.imap(
    range(10), keep_order=False, safe_exceptions=True, ret_args=True, timeout=2.0
):
    print(arg, "->", res)

# 3) Collect to list
lst = rmap.map(range(1000), timeout=2.0, safe_exceptions=True)
```
🧠 Adaptive tail batching
What it is
When the input length is known upfront (i.e. len(iterable) works), RayMap can reduce straggler tail latency by shrinking the computation batch_size as the work nears completion.
Rule of thumb:
- Let CPU = available Ray CPUs.
- Let tail_threshold = tail_batches_per_core * CPU (default 2 * CPU).
- If the remaining batches (already submitted + not yet submitted) become <= tail_threshold, then batch_size is reduced exponentially (halved) down to tail_min_batch_size (default 1).
This yields smaller last tasks, keeping all workers busy and reducing long “tails”.
Important: adaptive tail batching is only applied when len(iterable) is available. For generators/streams with unknown length, this feature is disabled by design.
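To make the halving rule concrete, here is a small, single-threaded model of it. It is a sketch under simplifying assumptions, not ray_map's implementation: batches are submitted one at a time, "remaining batches" is computed only from items not yet submitted (ray_map also counts batches already in flight), and the function name tail_batch_sizes is hypothetical.

```python
def tail_batch_sizes(
    n_items: int,
    batch_size: int,
    cpus: int,
    tail_batches_per_core: int = 2,
    tail_min_batch_size: int = 1,
) -> list[int]:
    """Hypothetical model of adaptive tail batching (not ray_map's code).

    Returns the size of each batch in submission order.
    """
    tail_threshold = tail_batches_per_core * cpus
    sizes: list[int] = []
    remaining = n_items
    bs = batch_size
    while remaining > 0:
        # Batches still to submit at the current size (ceiling division).
        remaining_batches = (remaining + bs - 1) // bs
        # Near the end: halve the batch size, but never below the minimum.
        if remaining_batches <= tail_threshold and bs > tail_min_batch_size:
            bs = max(tail_min_batch_size, bs // 2)
        take = min(bs, remaining)
        sizes.append(take)
        remaining -= take
    return sizes


print(tail_batch_sizes(100, batch_size=16, cpus=2))
# [16, 16, 16, 8, 8, 8, 4, 4, 4, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1]
```

With 100 items, batch_size=16 and 2 CPUs (threshold 4), the model keeps full batches until about four batches remain, then steps down through 8, 4 and 2 to single items at the very end.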
Configuration
These are RayMap(...) constructor arguments:
- adaptive_tail_batching: bool = True: enables or disables adaptive tail batching.
- tail_batches_per_core: int = 2: threshold multiplier (2 * CPU by default).
- tail_min_batch_size: int = 1: the minimum batch size to shrink to.
Example:
```python
rmap = RayMap(
    foo,
    batch_size=32,
    adaptive_tail_batching=True,
    tail_batches_per_core=2,
    tail_min_batch_size=1,
)
```
⚙️ ray.init / ray.shutdown — how to run Ray
RayMap supports lazy initialization of Ray: if Ray is not initialized when you start computing, RayMap will call ray.init() for you (local, no dashboard) with the runtime env you pass to RayMap.
You can also manage Ray yourself. Typical patterns:
A) Manage Ray yourself (recommended in apps/tests/CI)
```python
import os

import ray

# Make your codebase visible to workers. Usually the repository root.
ray.init(runtime_env={"working_dir": os.getcwd()}, include_dashboard=False)

# ... use RayMap normally

ray.shutdown()  # ← you are responsible for shutting down Ray explicitly
```
Notes
- If Ray is already initialized, RayMap will not call ray.init() again; it just prepares remote functions.
- Prefer to set runtime_env explicitly (see working_dir below).
- Call ray.shutdown() yourself when the program finishes or tests are done.
B) Let RayMap do it lazily
If you don’t call ray.init() yourself, the first call to imap/map will:
- start a local Ray instance with the runtime_env taken from RayMap(..., runtime_env=...) (if provided), otherwise a minimal default;
- auto-tune max_pending if it is set to -1 (CPU + 16 batches);
- start without the dashboard, with logs suppressed by default.
You still need to shut Ray down yourself if you want a clean exit:
```python
import ray

ray.shutdown()
```
What arguments can be passed to ray.init() via RayMap
RayMap forwards a subset of options to Ray:
- address: connect to a cluster, e.g. "ray://host:port" (requires ray[default]/ray[client]). If omitted, a local instance is started.
- password: _redis_password for legacy clusters that use Redis authentication.
- runtime_env: the runtime environment for your job (see below).
- remote_options: forwarded to .options(...) of the remote function (e.g. {"num_cpus": 0.5, "resources": {...}}).
📂 About working_dir (runtime_env)
working_dir defines what source files are shipped to workers. That’s critical when your function fn is defined in your project code (or even inside tests/).
- When you manage Ray yourself: set it explicitly.
  ```python
  ray.init(runtime_env={"working_dir": "."})  # or os.getcwd()
  ```
- When RayMap initializes lazily: it uses the runtime_env you passed to RayMap. If you didn't pass any, use RayMap(fn, runtime_env={"working_dir": "."}) to avoid import errors on workers.
If your test functions live in tests/, either:
- include tests/ in the runtime env (e.g. via py_modules: ["tests"]), or
- move helpers to src/test_helpers.py and import them from there in tests.
ray.shutdown()
ray.shutdown() is not called by RayMap. You should call it yourself when you want to cleanly stop Ray (end of script, end of test session, etc.). In pytest, use a session-scoped fixture that starts Ray once and calls ray.shutdown() in teardown.
🧭 API Reference
Constructor
```python
RayMap(
    fn,
    *,
    batch_size=1,
    max_pending=-1,
    checkpoint_path=None,
    checkpoint_every=100,
    address=None,
    password=None,
    runtime_env=None,
    remote_options=None,
    on_error=None,
    init_if_needed=False,
    # Adaptive tail batching
    adaptive_tail_batching=True,
    tail_batches_per_core=2,
    tail_min_batch_size=1,
)
```
Iteration
```
RayMap.imap(iterable, *, timeout=None, safe_exceptions=False, keep_order=True, ret_args=False) -> iterator
RayMap.map(iterable, *, timeout=None, safe_exceptions=False, keep_order=True, ret_args=False) -> list

# Async variants
RayMap.imap_async(...) -> async iterator
RayMap.map_async(...) -> list
```
- timeout: per-item timeout in seconds, enforced on the worker via ThreadPoolExecutor
- safe_exceptions=True: return exception objects instead of raising (the run does not crash)
- keep_order=True: preserve input order (1:1); False → yield results as they become ready
- ret_args=True: yield (arg, res_or_exc) instead of just res_or_exc
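The worker-side behaviour of timeout and safe_exceptions can be approximated with the standard library alone. The sketch below assumes a simple per-item loop over one batch; run_batch and call_with_timeout are hypothetical names, not ray_map functions. Each call runs in a helper thread and Future.result(timeout=...) bounds the wait. One caveat of any thread-based timeout: Python cannot kill a thread, so a timed-out call keeps running in the background and only the wait is abandoned.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def call_with_timeout(fn, arg, timeout=None):
    """Run fn(arg) in a helper thread and wait at most `timeout` seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        # Raises concurrent.futures.TimeoutError if the call takes too long.
        return pool.submit(fn, arg).result(timeout=timeout)
    finally:
        # Do not block on a timed-out call; its thread finishes on its own.
        pool.shutdown(wait=False)


def run_batch(fn, batch, timeout=None, safe_exceptions=False):
    """Hypothetical worker-side loop over one batch (not ray_map's code)."""
    results = []
    for arg in batch:
        try:
            results.append(call_with_timeout(fn, arg, timeout))
        except Exception as exc:
            if not safe_exceptions:
                raise
            # Safe mode: the exception object takes the place of the result.
            results.append(exc)
    return results


def slow_square(x):
    if x == 3:
        raise ValueError("oops")
    if x == 4:
        time.sleep(0.5)  # exceeds the 0.1 s timeout used below
    return x * x


print(run_batch(slow_square, range(6), timeout=0.1, safe_exceptions=True))
```

A fresh single-thread executor per item keeps a hung call from delaying the next item in the batch.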
💾 Checkpointing
- Stores (key, arg, result_or_exc) to checkpoint_path.
- On restart, previously computed results are yielded first; then Ray resumes the rest.
- Future (optional): a flag to skip storing exceptions.
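A minimal sketch of checkpoint-and-replay, assuming a pickle file with one (key, arg, result_or_exc) record appended per item and the item's input position used as the key. The file layout and the helper names (save_record, load_records, imap_with_checkpoint) are illustrative assumptions; ray_map's actual format may differ, and computation here is serial instead of on Ray.

```python
import os
import pickle
import tempfile


def save_record(path, key, arg, result):
    """Append one (key, arg, result_or_exc) record to the checkpoint file."""
    with open(path, "ab") as f:
        pickle.dump((key, arg, result), f)


def load_records(path):
    """Read all saved records; return {key: (arg, result_or_exc)}."""
    done = {}
    if not os.path.exists(path):
        return done
    with open(path, "rb") as f:
        while True:
            try:
                key, arg, result = pickle.load(f)
            except (EOFError, pickle.UnpicklingError):
                break  # end of file (or a truncated final record)
            done[key] = (arg, result)
    return done


def imap_with_checkpoint(fn, iterable, path):
    """Yield checkpointed results first, then compute (serially) the rest."""
    items = list(enumerate(iterable))  # key = position in the input
    done = load_records(path)
    for key, _ in items:
        if key in done:
            yield done[key][1]  # replay a stored result (or exception)
    for key, arg in items:
        if key not in done:
            try:
                result = fn(arg)
            except Exception as exc:
                result = exc  # exceptions are stored like results
            save_record(path, key, arg, result)
            yield result


path = os.path.join(tempfile.mkdtemp(), "res.pkl")
calls = []


def square(x):
    calls.append(x)
    return x * x


# First run: stop after three results (simulating an interrupted job).
gen = imap_with_checkpoint(square, range(5), path)
first = [next(gen) for _ in range(3)]
gen.close()
# Second run: three results are replayed; only inputs 3 and 4 are computed.
second = list(imap_with_checkpoint(square, range(5), path))
print(first, second, calls)  # [0, 1, 4] [0, 1, 4, 9, 16] [0, 1, 2, 3, 4]
```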
📐 Performance tips
- Start with batch_size in the range 8–64. Too small → Ray scheduling overhead dominates; too big → higher latency and memory use.
- Start with max_pending=-1 (auto-tunes to CPU + 16 batches).
- For lower latency, consider keep_order=False.
- Adaptive tail batching is helpful when:
  - the input length is known (len(iterable) works),
  - you care about tail latency,
  - your per-item workload is not extremely small (otherwise Ray overhead dominates).
- If you need deterministic behavior in tests/benchmarks, consider fixing the number of Ray CPUs:
  ```python
  ray.init(num_cpus=4, runtime_env={"working_dir": "."}, include_dashboard=False)
  ```
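How batch_size, max_pending and keep_order interact can be modeled with concurrent.futures threads standing in for Ray tasks. In this hypothetical sketch (bounded_imap is not a ray_map function), the input is pulled lazily in batch_size chunks, at most max_pending batches are in flight at any time, and -1 is resolved to CPU + 16 following the auto-tune rule above. With keep_order=True, finished batches are buffered until all earlier ones are done; with keep_order=False, each batch is yielded as soon as it completes, which is why as-ready mode can lower latency.

```python
import os
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice


def _apply(fn, batch):
    return [fn(x) for x in batch]


def bounded_imap(fn, iterable, batch_size=8, max_pending=-1, keep_order=True):
    """Hypothetical backpressure model: at most max_pending batches in flight."""
    cpus = os.cpu_count() or 1
    if max_pending == -1:
        max_pending = cpus + 16  # auto-tune rule quoted in the README
    it = iter(iterable)
    with ThreadPoolExecutor(max_workers=cpus) as pool:
        pending = {}  # future -> batch index
        buffered = {}  # batch index -> results waiting for earlier batches
        next_to_yield = 0
        submitted = 0
        exhausted = False
        while pending or not exhausted:
            # Refill up to the limit; input is pulled only when there is room.
            while not exhausted and len(pending) < max_pending:
                batch = list(islice(it, batch_size))
                if not batch:
                    exhausted = True
                    break
                pending[pool.submit(_apply, fn, batch)] = submitted
                submitted += 1
            if not pending:
                break
            finished, _ = wait(pending, return_when=FIRST_COMPLETED)
            for fut in finished:
                idx = pending.pop(fut)
                if keep_order:
                    buffered[idx] = fut.result()
                else:
                    yield from fut.result()  # as-ready mode
            # Ordered mode: emit every batch that is now contiguous.
            while keep_order and next_to_yield in buffered:
                yield from buffered.pop(next_to_yield)
                next_to_yield += 1


def slow_square(x):
    time.sleep(0.01 * (x % 3))  # uneven work so batches finish out of order
    return x * x


print(list(bounded_imap(slow_square, range(20), batch_size=4, max_pending=2)))
```

Because new batches are only cut from the input when a slot frees up, a slow consumer or slow workers throttle how far ahead the producer reads.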
🧪 Examples
See the examples/ directory for quickstarts and CI-ready snippets (including pytest fixtures for ray.init/ray.shutdown).
🧪 Testing
```bash
pytest -q
```
🧑💻 Development (formatting & lint)
This repo uses:
- ruff (lint)
- black (format)
```bash
ruff check .
black --check .
```
To auto-format:
```bash
black .
```
🧑💻 Development (Dev Container)
This repository includes a ready-to-use VS Code Dev Container configuration.
Prerequisites
- Docker (Docker Desktop / Docker Engine)
- VS Code
- VS Code extension: Dev Containers (ms-vscode-remote.remote-containers)
How to use
- Open the repository in VS Code.
- Run the command Dev Containers: Open Folder in Container.
- After the container is built, the postCreate hook will:
  - pip install -e ".[dev,test]"
  - run pytest -q as a sanity check
Ray-specific note
The Dev Container sets --shm-size=2g to avoid Ray object store issues caused by the small default /dev/shm size inside Docker containers.
📄 License
MIT — see LICENSE.
Project details
File details
Details for the file ray_map-1.0.3.tar.gz.
File metadata
- Download URL: ray_map-1.0.3.tar.gz
- Size: 22.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 9257803fa2ff9c5dcc6b4425a16bcdec7e2ec2dd52fe0ea59fef83778c773cd8 |
| MD5 | 243966dc9416d34fb3ca803f096ecd5a |
| BLAKE2b-256 | 9827f649b14b9c1750038976aaca39fc37c0fd031025e69dbb763540af2a9662 |

Provenance
The following attestation bundles were made for ray_map-1.0.3.tar.gz:
Publisher: publish.yml on TovarnovM/ray_map
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: ray_map-1.0.3.tar.gz
- Subject digest: 9257803fa2ff9c5dcc6b4425a16bcdec7e2ec2dd52fe0ea59fef83778c773cd8
- Sigstore transparency entry: 862915108
- Permalink: TovarnovM/ray_map@16522250f2149a095d65c76369388ebceb3e13b6
- Branch / Tag: refs/heads/main
- Owner: https://github.com/TovarnovM
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@16522250f2149a095d65c76369388ebceb3e13b6
- Trigger Event: workflow_dispatch
File details
Details for the file ray_map-1.0.3-py3-none-any.whl.
File metadata
- Download URL: ray_map-1.0.3-py3-none-any.whl
- Size: 15.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 7354fd8a8cd2502c3cfd99fd6974239e2708890cabfd9756176a37a6bb370e50 |
| MD5 | 4f0052b5636afdca98ebc9933c1a6779 |
| BLAKE2b-256 | bbff585f266528239ff8e1c584efde5173d147247ec0db2143029b8b1df0601c |

Provenance
The following attestation bundles were made for ray_map-1.0.3-py3-none-any.whl:
Publisher: publish.yml on TovarnovM/ray_map
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: ray_map-1.0.3-py3-none-any.whl
- Subject digest: 7354fd8a8cd2502c3cfd99fd6974239e2708890cabfd9756176a37a6bb370e50
- Sigstore transparency entry: 862915183
- Permalink: TovarnovM/ray_map@16522250f2149a095d65c76369388ebceb3e13b6
- Branch / Tag: refs/heads/main
- Owner: https://github.com/TovarnovM
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@16522250f2149a095d65c76369388ebceb3e13b6
- Trigger Event: workflow_dispatch