A powerful parallel execution library for Python
Pyarallel
Apply one function to many inputs — with rate limiting, retry, batching, and structured errors. Sync and async.
Pyarallel is for "fan out one function over N items" workloads: API calls, file processing, data crunching. Not DAGs, not queues, not distributed systems. Just concurrent.futures and asyncio with the common policies and result handling already built in.
Zero dependencies. Python 3.12+.
Before / After
Fetch 10,000 URLs with rate limiting and error handling.
concurrent.futures:
```python
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(url):
    return requests.get(url, timeout=10).json()

urls = ["https://api.example.com/users/1", "https://api.example.com/users/2", ...]

results = [None] * len(urls)
errors = []
with ThreadPoolExecutor(max_workers=10) as pool:
    futures = {pool.submit(fetch, url): i for i, url in enumerate(urls)}
    for f in as_completed(futures):
        i = futures[f]
        try:
            results[i] = f.result()
        except Exception as e:
            errors.append((i, e))

# No rate limiting, no retry, no batching. You have to
# wire those up yourself every time.
```
pyarallel:
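```python
from pyarallel import parallel_map, RateLimit, Retry

result = parallel_map(
    fetch, urls,
    workers=10,
    rate_limit=RateLimit(100, "minute"),
    # requests raises its own exception types, which do not subclass the
    # built-in ConnectionError/TimeoutError, so filter on those
    retry=Retry(attempts=3, on=(requests.ConnectionError, requests.Timeout)),
)

for idx, val in result.successes():
    save(val)
for idx, exc in result.failures():
    log_error(idx, exc)
```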
Same thing, async:
```python
import httpx
from pyarallel import async_parallel_map, RateLimit, Retry

async def fetch_async(url):
    async with httpx.AsyncClient() as client:
        return (await client.get(url, timeout=10)).json()

# run inside an async function (or an asyncio-aware REPL)
result = await async_parallel_map(
    fetch_async, urls,
    concurrency=10,
    rate_limit=RateLimit(100, "minute"),
    # httpx exceptions do not subclass the built-in ConnectionError/TimeoutError
    retry=Retry(attempts=3, on=(httpx.ConnectError, httpx.TimeoutException)),
)
# Same result model: result.ok, result.successes(), result.failures()
```
Install
```bash
pip install pyarallel
```
What You Get
- Rate limiting (token bucket, per-second/minute/hour): `rate_limit=RateLimit(100, "minute")`
- Retry with backoff (per-item, exponential, jitter, exception filtering): `retry=Retry(attempts=3, on=(ConnectionError,))`
- Batched execution (lazy input consumption for generators, memory control): `batch_size=500`
- Streaming: constant-memory processing via `parallel_iter` / `async_parallel_iter`
- Structured errors: `ParallelResult` with `.ok`, `.successes()`, `.failures()`, `.raise_on_failure()`
- Timeouts: wall-clock for the whole operation (`timeout=30.0`) or per-task in async (`task_timeout=5.0`)
- Progress callbacks: `on_progress=lambda done, total: print(f"{done}/{total}")`
- Process executor for CPU-bound work: `executor="process"`
- Decorator API: `@parallel` / `@async_parallel` with `.map()`, `.starmap()`, `.stream()`
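The rate limiter is described as a token bucket. A minimal thread-safe version of that technique might look like the sketch below. The class name `TokenBucket`, its `acquire()`/`try_acquire()` methods, and the mapping of `RateLimit(100, "minute")` to "capacity 100, refilled at 100/60 tokens per second" are assumptions for illustration, not pyarallel's internals.

```python
import threading
import time

# seconds per period name (assumed units, matching "second"/"minute"/"hour")
_PERIODS = {"second": 1.0, "minute": 60.0, "hour": 3600.0}


class TokenBucket:
    """Hypothetical limiter: `count` calls per `per`, refilled continuously."""

    def __init__(self, count: int, per: str = "second") -> None:
        self.capacity = float(count)
        self.rate = count / _PERIODS[per]  # tokens added per second
        self.tokens = self.capacity  # start full, so an initial burst is allowed
        self.last = time.monotonic()
        self.lock = threading.Lock()

    def _refill(self) -> None:
        # caller must hold self.lock
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def try_acquire(self) -> bool:
        """Take a token if one is available right now."""
        with self.lock:
            self._refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

    def acquire(self) -> None:
        """Block until a token is available, then take it."""
        while True:
            with self.lock:
                self._refill()
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait = (1 - self.tokens) / self.rate
            time.sleep(wait)  # sleep outside the lock so other threads aren't blocked
```

Each worker would call `bucket.acquire()` before every request; with `TokenBucket(100, "minute")`, throughput settles at about 100 calls per minute after an initial burst of up to 100.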
Quick Start
Sync
```python
import requests
from pyarallel import parallel_map, RateLimit, Retry

def fetch(url):
    return requests.get(url, timeout=10).json()

# Fan out over a list, get ordered results
result = parallel_map(fetch, urls, workers=10)

# Rate-limited API calls with retry
def call_api(user_id):
    return requests.get(f"https://api.example.com/users/{user_id}", timeout=10).json()

result = parallel_map(
    call_api, user_ids,
    workers=10,
    rate_limit=RateLimit(100, "minute"),
    # retry on requests' own exception types (they don't subclass the built-ins)
    retry=Retry(attempts=3, backoff=1.0, on=(requests.ConnectionError, requests.Timeout)),
)

# CPU-bound with processes
from pathlib import Path
from PIL import Image

def resize_image(path):
    src = Path(path)
    dst = src.with_name(f"{src.stem}_thumb{src.suffix}")  # never overwrite the original
    with Image.open(src) as img:
        img.thumbnail((800, 600))
        img.save(dst)
    return str(dst)

# Process pools re-import the main module on spawn-based platforms
# (Windows, macOS), so guard the entry point
if __name__ == "__main__":
    result = parallel_map(resize_image, paths, executor="process")
```
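`Retry(attempts=3, backoff=1.0, on=...)` is described as per-item retry with exponential backoff, jitter, and exception filtering. The sketch below shows that technique for a single call. The name `call_with_retry`, the doubling schedule `backoff * 2**attempt`, and "full jitter" (a uniform random delay up to that cap) are assumptions for illustration; the exact schedule pyarallel uses is not documented here.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    attempts: int = 3,
    backoff: float = 1.0,
    on: tuple[type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn up to `attempts` times, retrying only on exception types in `on`."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return fn()
        except on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # exponential cap (backoff, 2*backoff, 4*backoff, ...) with full jitter
            sleep(random.uniform(0, backoff * 2**attempt))
    raise AssertionError("unreachable")
```

Exceptions outside `on` are not caught, so they fail the item immediately; that is what makes filtering on the right exception types (such as `requests.ConnectionError` rather than the built-in `ConnectionError`) matter.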
Async
```python
import httpx
from pyarallel import async_parallel_map

async def fetch_async(url):
    async with httpx.AsyncClient() as client:
        return (await client.get(url, timeout=10)).json()

# inside an async function: up to 20 requests in flight, each with a 5 s per-task timeout
result = await async_parallel_map(
    fetch_async, urls, concurrency=20, task_timeout=5.0,
)
```
Decorator
The decorators add `.map()`, `.starmap()`, and `.stream()` to a function while leaving normal calls unchanged:
```python
import httpx
import requests
from pyarallel import parallel, async_parallel, RateLimit

@parallel(workers=8, rate_limit=RateLimit(100, "minute"))
def fetch(url):
    return requests.get(url, timeout=10).json()

fetch("https://api.example.com/users/1")  # normal call: returns dict
fetch.map(urls)                           # parallel: returns ParallelResult
fetch.stream(urls, batch_size=500)        # streaming: yields ItemResult

@async_parallel(concurrency=10)
async def fetch_async(url):
    async with httpx.AsyncClient() as c:
        return (await c.get(url, timeout=10)).json()

await fetch_async.map(urls)  # async parallel (inside an async function)
```
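The decorator pattern itself, a callable that behaves exactly like the original function but also carries a `.map()` helper, can be sketched in a few lines. The name `parallel_sketch` and its thread-pool implementation are assumptions for illustration; the real `@parallel` also offers `.starmap()`, `.stream()`, rate limiting, retry, and a `ParallelResult` return type, none of which this sketch attempts.

```python
import functools
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable


def parallel_sketch(workers: int = 4) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator: direct calls pass through, .map() fans out."""

    def decorate(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)  # keep __name__, __doc__, etc.
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return fn(*args, **kwargs)  # normal call, unchanged behavior

        def map_(items: Iterable[Any]) -> list[Any]:
            with ThreadPoolExecutor(max_workers=workers) as pool:
                # ordered results; unlike ParallelResult, the first error propagates
                return list(pool.map(fn, items))

        wrapper.map = map_  # attach the parallel helper as an attribute
        return wrapper

    return decorate
```

Because `wrapper` is still an ordinary function, `fetch(url)` keeps working everywhere the undecorated function did, which is the property the decorator API advertises.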
Streaming — Constant Memory
For ETL, pipelines, or datasets too large to hold in memory:
```python
from pyarallel import parallel_iter

def transform(row):
    return {"id": row["id"], "name": row["name"].strip().title()}

for item in parallel_iter(transform, ten_million_rows, batch_size=1000):
    if item.ok:
        db.save(item.value)
    else:
        log_error(item.index, item.error)
```
Error Handling
Every error is collected; none is silently swallowed:
```python
def send_email(msg):
    return smtp.send(msg["to"], msg["subject"], msg["body"])

result = parallel_map(send_email, messages)

if result.ok:
    values = result.values()  # list of all results, in order
else:
    for idx, exc in result.failures():
        log_error(idx, exc)
    result.raise_on_failure()  # or instead raise one ExceptionGroup with all errors
```
API Summary
| Function | Decorator | Returns | Use case |
|---|---|---|---|
| `parallel_map(fn, items)` | `.map(items)` | `ParallelResult` | Results fit in memory |
| `parallel_starmap(fn, items)` | `.starmap(items)` | `ParallelResult` | Multi-arg, fits in memory |
| `parallel_iter(fn, items)` | `.stream(items)` | `Iterator[ItemResult]` | Streaming, constant memory |
Async mirrors: async_parallel_map, async_parallel_starmap, async_parallel_iter
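The `starmap` variants are for multi-argument functions. Assuming they follow the same convention as `itertools.starmap` (each item is a tuple unpacked into positional arguments), the equivalent fan-out with a plain thread pool looks like this; `thread_starmap` is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable


def thread_starmap(
    fn: Callable[..., Any],
    arg_tuples: Iterable[tuple[Any, ...]],
    workers: int = 4,
) -> list[Any]:
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # submit fn(*args) for each tuple, then collect in input order
        futures = [pool.submit(fn, *args) for args in arg_tuples]
        return [f.result() for f in futures]


print(thread_starmap(pow, [(2, 3), (10, 2)]))  # [8, 100]
```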
| Config | Example |
|---|---|
| `RateLimit(count, per)` | `RateLimit(100, "minute")` |
| `Retry(attempts, backoff, on)` | `Retry(attempts=3, on=(ConnectionError,))` |
The `@parallel` decorator also works with instance methods and static methods; see the full docs.
Documentation
Full docs — API reference, advanced features, best practices.
License
MIT — see LICENSE.md.
Download files
Source Distribution
Built Distribution
File details
Details for the file pyarallel-0.3.0.tar.gz.
File metadata
- Download URL: pyarallel-0.3.0.tar.gz
- Upload date:
- Size: 92.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `9d6bf80999664c8950fd0695ef50de1c03b594f7e9c20b297f5701a71924fc83` |
| MD5 | `c551ef88bd92cb91bce49c727ba95449` |
| BLAKE2b-256 | `018c26c5c86a9b3b9820fc7ffbe4b5efb4cc155c7d1267488759182ba18a13e9` |
File details
Details for the file pyarallel-0.3.0-py3-none-any.whl.
File metadata
- Download URL: pyarallel-0.3.0-py3-none-any.whl
- Upload date:
- Size: 16.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `fe4544a768165270b988ad7203ff1b5632b3eab5cd1c4e271b3a85958bdfa886` |
| MD5 | `244d8d2273d281d96f54380b77c77f16` |
| BLAKE2b-256 | `ccd7d1d5004f832247c5df93f5dc37528cf2739921ccdd8a66510bce05f74226` |