Efficient Ray-powered map/imap with backpressure, checkpointing, timeouts, and async
ray-map
Efficient Ray-powered imap/map with backpressure, checkpointing, per-item timeouts, safe exceptions, and async variants.
✨ Features
- ⚡ Parallel map/imap on Ray with back-pressure and batching
- 💾 Checkpointing + replay of already computed points
- ⏱️ Per-item timeouts (worker-side, via threads)
- 🧯 Safe exceptions: return Exception objects instead of failing the whole run
- 🔁 Ordered or as-ready modes, (arg, res) or plain res
- 🧰 Async API: imap_async, map_async
- 🧩 Single-file module ray_map.py, src-layout friendly
📦 Install
pip install ray-map
Requires Python 3.9+ and Ray 2.6+.
🚀 Quickstart
from ray_map import RayMap

def foo(x):
    if x == 3:
        raise ValueError("oops")
    return x * x

rmap = RayMap(foo, batch_size=8, max_pending=-1, checkpoint_path="res.pkl")

# 1) Stream (ordered). Exceptions raise by default (safe_exceptions=False)
for y in rmap.imap(range(10)):
    print(y)

# 2) Stream (as-ready), safe exceptions, return (arg, res_or_exc)
for arg, res in rmap.imap(
    range(10), keep_order=False, safe_exceptions=True, ret_args=True, timeout=2.0
):
    print(arg, "->", res)

# 3) Collect to list
lst = rmap.map(range(1000), timeout=2.0, safe_exceptions=True)
⚙️ ray.init / ray.shutdown — how to run Ray
RayMap supports lazy initialization of Ray: if Ray is not initialized when you start computing, RayMap will call ray.init() for you (local, no dashboard) with the runtime env you pass to RayMap.
You can also manage Ray yourself. Typical patterns:
A) Manage Ray yourself (recommended in apps/tests/CI)
import os, ray
# Make your codebase visible to workers. Usually the repository root.
ray.init(runtime_env={"working_dir": os.getcwd()}, include_dashboard=False)
# ... use RayMap normally
ray.shutdown() # ← you are responsible for shutting down Ray explicitly
Notes
- If Ray is already initialized, RayMap will not call ray.init() again; it just prepares remote functions.
- Prefer to set runtime_env explicitly (see working_dir below).
- Call ray.shutdown() yourself when the program finishes or tests are done.
B) Let RayMap do it lazily
If you don’t call ray.init() yourself, the first call to imap/map will:
- start a local Ray instance with runtime_env taken from RayMap(..., runtime_env=...) (if provided), otherwise a minimal default;
- auto-tune max_pending if set to -1 (CPU + 16 batches);
- run without a dashboard, with logs suppressed by default.
You still need to shut Ray down yourself if you want a clean exit:
import ray
ray.shutdown()
What arguments can be passed to ray.init() via RayMap
RayMap forwards a subset of options to Ray:
- address: connect to a cluster, e.g. "ray://host:port" (requires ray[default] / ray[client]). If omitted, a local instance is started.
- password: _redis_password for legacy clusters using Redis authentication.
- runtime_env: the runtime environment for your job, see below.
- remote_options: forwarded to .options(...) of the remote function (e.g., {"num_cpus": 0.5, "resources": {...}}).
📂 About working_dir (runtime_env)
working_dir defines what source files are shipped to workers. That’s critical when your function fn is defined in your project code (or even inside tests/).
- When you manage Ray yourself: set it explicitly
  ray.init(runtime_env={"working_dir": "."}) # or os.getcwd()
- When RayMap initializes lazily: it uses the runtime_env you passed to RayMap. If you didn’t pass any, use RayMap(fn, runtime_env={"working_dir": "."}) to avoid import errors on workers.
If your test functions live in tests/, either:
- include tests/ in the runtime env (e.g. via py_modules: ["tests"]), or
- move helpers to src/test_helpers.py and import from there in tests.
ray.shutdown()
ray.shutdown() is not called by RayMap. You should call it yourself when you want to cleanly stop Ray (end of script, end of test session, etc.). In pytest, use a session-scoped fixture that starts Ray once and calls ray.shutdown() in teardown.
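One way to make that responsibility explicit is a small context manager that starts Ray only when it is not already running, and shuts it down only in that case. This is a minimal sketch and not part of ray-map: the name ray_session and its working_dir parameter are hypothetical, and it relies only on ray.is_initialized(), ray.init(), and ray.shutdown().

```python
import contextlib
import os


@contextlib.contextmanager
def ray_session(working_dir=None):
    """Hypothetical helper: own the Ray lifecycle only if we started it."""
    import ray  # imported lazily so defining the helper does not require Ray

    started_here = not ray.is_initialized()
    if started_here:
        # Ship the project sources to workers, as in pattern A above.
        ray.init(
            runtime_env={"working_dir": working_dir or os.getcwd()},
            include_dashboard=False,
        )
    try:
        yield
    finally:
        # Leave an externally managed Ray instance untouched.
        if started_here:
            ray.shutdown()
```

In pytest, the same body could live in a session-scoped fixture in conftest.py, with the yield separating setup from teardown.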
🧭 API Reference
RayMap.imap(iterable, *, timeout=None, safe_exceptions=False, keep_order=True, ret_args=False) -> iterator
RayMap.map(iterable, *, timeout=None, safe_exceptions=False, keep_order=True, ret_args=False) -> list
# Async variants
RayMap.imap_async(...): async iterator
RayMap.map_async(...): list
- timeout: per-item timeout (seconds) on worker via ThreadPoolExecutor
- safe_exceptions=True: return exception objects (no crash)
- keep_order=True: preserve input order (1:1); False → yield as-ready
- ret_args=True: yield (arg, res_or_exc) instead of just res_or_exc
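The timeout, safe_exceptions, and ret_args semantics can be illustrated without Ray. The sketch below shows the general technique (run the call in a helper thread and wait with a deadline); it is an assumption about the approach, not ray-map's actual worker code, and call_item is a hypothetical name.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def call_item(fn, arg, timeout=None, safe_exceptions=False, ret_args=False):
    """Run fn(arg) in a helper thread, waiting at most `timeout` seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, arg)
    try:
        res = future.result(timeout=timeout)
    except Exception as exc:  # includes the timeout raised by future.result
        if not safe_exceptions:
            raise
        res = exc  # hand the exception back as a value instead of raising
    finally:
        # Do not block on a timed-out call. Threads cannot be killed, so the
        # helper thread keeps running until fn returns on its own.
        pool.shutdown(wait=False)
    return (arg, res) if ret_args else res


def slow(x):
    """Stand-in for a call that exceeds its deadline."""
    time.sleep(0.3)
    return x


if __name__ == "__main__":
    print(call_item(lambda x: x * x, 4))
    print(call_item(slow, 1, timeout=0.05, safe_exceptions=True, ret_args=True))
```

A thread-based timeout only stops waiting for the result; it does not interrupt the function, which is worth keeping in mind for long-running or blocking calls.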
💾 Checkpointing
- Stores (key, arg, result_or_exc) to checkpoint_path.
- On restart, previously computed results are yielded first; then Ray resumes the rest.
- (Optional, future) flag to skip storing exceptions.
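The checkpoint-and-replay idea can be sketched with the standard library alone. Everything here is an assumption made for illustration: the append-only pickle layout, the use of repr(arg) as the key, and the names load_checkpoint and checkpointed_map are hypothetical, and ray-map's real file format may differ. Unlike the behaviour described above, this sketch replays stored results in input order rather than yielding them all first.

```python
import os
import pickle


def load_checkpoint(path):
    """Read every (key, arg, result) record previously appended to `path`."""
    records = {}
    if os.path.exists(path):
        with open(path, "rb") as fh:
            while True:
                try:
                    key, arg, res = pickle.load(fh)
                except EOFError:
                    break  # reached the end of the record stream
                records[key] = (arg, res)
    return records


def checkpointed_map(fn, items, path):
    """Yield fn(arg) for each item, skipping work already stored in `path`."""
    done = load_checkpoint(path)
    with open(path, "ab") as fh:
        for arg in items:
            key = repr(arg)  # assumed key scheme, for illustration only
            if key in done:
                yield done[key][1]  # replay the stored result
                continue
            res = fn(arg)
            pickle.dump((key, arg, res), fh)
            fh.flush()  # make the record visible to a later restart
            yield res
```

Appending one record per item keeps a crash from losing more than the item in flight, at the cost of a flush per result.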
🧪 Examples
See examples/ directory for quickstarts and CI-ready snippets (including pytest fixtures for ray.init/ray.shutdown).
🛠️ Ray configuration & environments (details)
- Local vs remote cluster; choosing batch_size / max_pending.
- Shipping code to workers via runtime_env (working_dir, py_modules).
- Error callback and tuple/dict arg calling conventions.
📐 Performance tips
- Keep batch_size modest (8–64). Too small → overhead; too big → latency/memory.
- Start with max_pending=-1 and reduce if memory use spikes.
- For lower latency, consider keep_order=False.
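To see why batch_size and max_pending trade throughput against memory, here is a minimal sketch of batching with back-pressure in as-ready mode. It uses a thread pool as a stand-in for Ray tasks, so it illustrates the technique only and is not ray-map's implementation; batched, bounded_imap, and the workers parameter are hypothetical names.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice


def batched(iterable, size):
    """Yield lists of up to `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def bounded_imap(fn, iterable, batch_size=8, max_pending=4, workers=4):
    """Yield results as batches finish, with at most `max_pending` in flight."""
    if max_pending < 1:
        raise ValueError("max_pending must be at least 1 in this sketch")

    def run_batch(chunk):
        return [fn(x) for x in chunk]

    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = set()
        for chunk in batched(iterable, batch_size):
            # Back-pressure: stop pulling input until a slot frees up.
            while len(pending) >= max_pending:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for fut in done:
                    yield from fut.result()
            pending.add(pool.submit(run_batch, chunk))
        # Drain whatever is still running once the input is exhausted.
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                yield from fut.result()
```

Memory held by unconsumed work is bounded by roughly batch_size × max_pending items, which is why lowering either value helps when memory use spikes.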
🧪 Testing
Provide a pytest conftest.py that starts Ray once per session with proper runtime_env and shuts it down in teardown. See README “ray.init / ray.shutdown”.
🧑‍💻 Development (Dev Container)
This repository includes a ready-to-use VS Code Dev Container configuration (same approach as zen-fronts).
Prerequisites
- Docker (Docker Desktop / Docker Engine)
- VS Code
- VS Code extension: Dev Containers (ms-vscode-remote.remote-containers)
How to use
- Open the repository in VS Code.
- Run: Dev Containers: Open Folder in Container
- After the container is built, the postCreate hook will:
  - pip install -e ".[dev,test]"
  - run pytest -q as a sanity check
Ray-specific note
The Dev Container sets --shm-size=2g to avoid Ray object store issues caused by the small default /dev/shm size inside Docker containers.
📄 License
MIT — see LICENSE.