Bounded-concurrency async for Python: run, map, and stream awaitables with limits, retries, rate limiting, and progress — in one typed call.
tasklane
Bounded-concurrency async for Python — run, map, and stream awaitables with a concurrency limit, retries, backoff, rate limiting, and progress, in one typed call.
Every Python project that fans out async work eventually rewrites the same block:
an asyncio.Semaphore to cap concurrency, a try/except retry loop, a counter
for progress, maybe a sleep to stay under a rate limit. tasklane is that block,
done once — correct, fully typed, and zero runtime dependencies.
import asyncio

import httpx
import tasklane


async def fetch(url: str) -> int:
    async with httpx.AsyncClient() as client:
        return len((await client.get(url)).text)


async def main() -> None:
    urls = [f"https://example.com/{i}" for i in range(1000)]
    sizes = await tasklane.amap(
        fetch, urls,
        limit=20,       # at most 20 requests in flight
        retries=3,      # retry failures up to 3x with exponential backoff
        rate_limit=50,  # start at most 50 requests per second
        timeout=10,     # per-attempt timeout (seconds)
    )
    print(sum(sizes))


asyncio.run(main())
Install
pip install tasklane
# or
uv add tasklane
Requires Python 3.10+. No third-party dependencies.
Why not just asyncio.gather?
asyncio.gather starts everything at once. Fan out 10,000 requests and you
open 10,000 sockets, trip rate limits, and OOM. The usual fixes are scattered
across the stdlib and third-party libs; tasklane brings them together:
| | asyncio.gather | Semaphore + gather | aiometer | tasklane |
|---|---|---|---|---|
| Concurrency limit | ✗ | manual | ✓ | ✓ |
| Results in input order | ✓ | ✓ | ✓ | ✓ |
| Stream results as completed | as_completed | manual | ✓ | ✓ |
| Retries + backoff | ✗ | ✗ | ✗ | ✓ |
| Rate limiting (per second) | ✗ | ✗ | ✓ | ✓ |
| Progress callbacks | ✗ | ✗ | ✗ | ✓ |
| Per-task timeout | ✗ | manual | ✗ | ✓ |
| Backpressure on huge inputs | ✗ | manual | ✓ | ✓ |
| Runtime dependencies | stdlib | stdlib | anyio | none |
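For reference, the "Semaphore + gather" column refers to a hand-rolled pattern along the lines of the sketch below. It uses only the standard library; `bounded_gather` and `work` are illustrative names, not part of tasklane. Note that it still creates one coroutine per item up front, which is why backpressure is marked "manual" for that column.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def bounded_gather(
    func: Callable[[T], Awaitable[R]], items: Iterable[T], limit: int
) -> list[R]:
    """Hand-rolled concurrency cap: a semaphore around each call."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: T) -> R:
        async with semaphore:  # at most `limit` calls run at once
            return await func(item)

    # gather keeps input order, but every coroutine is created immediately.
    return await asyncio.gather(*(guarded(item) for item in items))


async def demo() -> tuple[list[int], int]:
    in_flight = 0
    peak = 0

    async def work(n: int) -> int:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)  # track the highest observed concurrency
        await asyncio.sleep(0.01)
        in_flight -= 1
        return n * n

    results = await bounded_gather(work, range(10), limit=3)
    return results, peak


results, peak = asyncio.run(demo())
print(results, "peak concurrency:", peak)
```

Retries, timeouts, rate limiting, and progress would each need further code layered on top of this.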
Features
amap — concurrent map, results in order
results = await tasklane.amap(fetch, urls, limit=10)
# results[i] corresponds to urls[i]
Accepts both sync and async iterables, and works in constant memory thanks to a bounded internal queue — you can map over a million-item generator without materializing a million tasks.
stream — react to results as they finish
async for size in tasklane.stream(fetch, urls, limit=10):
    print(size)  # arrives in completion order, fastest first
gather — a drop-in asyncio.gather with a limit
results = await tasklane.gather(*(fetch(u) for u in urls), limit=10)
On fail-fast, the remaining coroutines are cancelled and closed, so you never
see a "coroutine ... was never awaited" warning.
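To illustrate why closing matters: a coroutine object that is created but never started triggers a RuntimeWarning when it is garbage-collected, unless `close()` is called on it first. The sketch below shows that mechanism with the standard library only. It awaits coroutines one at a time for simplicity, and `run_fail_fast` and `job` are hypothetical names, not tasklane's implementation.

```python
import asyncio
from collections.abc import Coroutine, Iterable
from typing import Any


async def job(n: int) -> int:
    await asyncio.sleep(0)
    if n == 1:
        raise ValueError("boom")
    return n


async def run_fail_fast(coros: Iterable[Coroutine[Any, Any, int]]) -> list[int]:
    """Await coroutines in turn; on the first error, close the ones never started."""
    pending = list(coros)
    results: list[int] = []
    try:
        while pending:
            results.append(await pending.pop(0))
    finally:
        for coro in pending:
            coro.close()  # marks the coroutine finished, so no warning at GC time
    return results


try:
    asyncio.run(run_fail_fast([job(n) for n in range(4)]))
except ValueError as exc:
    print("failed fast:", exc)
```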
Retries with backoff
from tasklane import Backoff
await tasklane.amap(
    fetch, urls,
    retries=5,
    backoff=Backoff.exponential(0.2, factor=2, max_delay=30),  # 0.2, 0.4, 0.8, ... + jitter
    retry_on=(TimeoutError, ConnectionError),                  # type, tuple, or predicate
)
Backoff.exponential() (the default when retries > 0), Backoff.linear(), and
Backoff.constant() cover the common cases. retry_on accepts an exception type,
a tuple of types, or a Callable[[BaseException], bool] predicate.
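The delay schedule in the comment above (0.2, 0.4, 0.8, ...) can be worked out by hand. The sketch below computes it under stated assumptions: the delay before retry number `attempt` is `base * factor**attempt`, capped at `max_delay`, with optional additive jitter. `exponential_delays` and its `jitter` parameter are hypothetical, and tasklane's actual jitter formula may differ.

```python
import random


def exponential_delays(
    base: float,
    *,
    factor: float = 2.0,
    max_delay: float = 30.0,
    retries: int = 5,
    jitter: float = 0.0,
) -> list[float]:
    """Delay before each retry: base * factor**attempt, capped, plus optional jitter."""
    delays = []
    for attempt in range(retries):
        delay = min(base * factor**attempt, max_delay)
        # Additive jitter of up to `jitter * delay`; this formula is an assumption.
        delay += random.uniform(0.0, jitter * delay)
        delays.append(delay)
    return delays


print(exponential_delays(0.2, retries=5))  # [0.2, 0.4, 0.8, 1.6, 3.2]
```

With `retries=10` the uncapped value would reach 102.4 seconds, so the cap of 30 applies from the ninth retry onward.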
Rate limiting
# Never start more than 100 tasks per second, regardless of the concurrency limit.
await tasklane.amap(call_api, items, limit=50, rate_limit=100)
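One possible reading of "start at most N tasks per second" is to space task starts at least 1/N seconds apart. The standard-library sketch below implements that reading; `StartRateLimiter` is a hypothetical name, and tasklane may use a different algorithm internally (for example a token bucket that allows short bursts).

```python
import asyncio


class StartRateLimiter:
    """Space task starts at least 1/per_second apart (illustrative only)."""

    def __init__(self, per_second: float) -> None:
        self._interval = 1.0 / per_second
        self._next_slot = 0.0
        self._lock = asyncio.Lock()

    async def wait(self) -> float:
        """Block until the next start slot; return the loop time it was granted."""
        loop = asyncio.get_running_loop()
        async with self._lock:  # waiters are served one at a time
            now = loop.time()
            while now < self._next_slot:
                await asyncio.sleep(self._next_slot - now)
                now = loop.time()
            self._next_slot = now + self._interval
            return now


async def demo(per_second: float = 100, count: int = 5) -> list[float]:
    limiter = StartRateLimiter(per_second)
    granted: list[float] = []

    async def task() -> None:
        granted.append(await limiter.wait())
        await asyncio.sleep(0.05)  # the work itself overlaps; only starts are spaced

    await asyncio.gather(*(task() for _ in range(count)))
    return granted


starts = asyncio.run(demo())
print([round(b - a, 3) for a, b in zip(starts, starts[1:])])
```

The limiter is independent of the concurrency limit: the limit caps how many tasks run at once, while the limiter caps how quickly new ones begin.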
Progress
from tasklane import Progress
def show(p: Progress) -> None:
    print(f"{p.completed}/{p.total} ({p.failed} failed) {p.rate:.0f}/s")
await tasklane.amap(fetch, urls, limit=10, on_progress=show)
Progress carries completed, total, succeeded, failed, in_flight, and
elapsed, plus remaining, fraction, and rate helpers. Plug it into tqdm,
a logger, or a web UI — no progress-bar dependency is imposed on you.
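To make the shape of such a snapshot concrete, here is a stand-in built with a frozen dataclass. The field and helper names follow the list above, but `ProgressSnapshot` itself and the formulas for `remaining`, `fraction`, and `rate` are assumptions for illustration, not tasklane's definitions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProgressSnapshot:
    """Hypothetical stand-in for tasklane's Progress."""

    completed: int
    total: int
    succeeded: int
    failed: int
    in_flight: int
    elapsed: float  # seconds since the run started

    @property
    def remaining(self) -> int:
        return self.total - self.completed

    @property
    def fraction(self) -> float:
        return self.completed / self.total if self.total else 0.0

    @property
    def rate(self) -> float:
        # Completed items per second; assumed definition.
        return self.completed / self.elapsed if self.elapsed > 0 else 0.0


def render(p: ProgressSnapshot) -> str:
    return f"{p.completed}/{p.total} ({p.failed} failed) {p.rate:.0f}/s"


snap = ProgressSnapshot(
    completed=50, total=200, succeeded=45, failed=5, in_flight=10, elapsed=2.0
)
print(render(snap))  # 50/200 (5 failed) 25/s
```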
Collect errors instead of raising
results = await tasklane.amap(fetch, urls, return_exceptions=True)
ok = [r for r in results if not isinstance(r, Exception)]
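Because results come back in input order, each outcome can be paired with the item that produced it. The sketch below shows that pairing using the standard library's `asyncio.gather(..., return_exceptions=True)`, which follows the same convention; `flaky` and `partition` are illustrative names.

```python
import asyncio
from typing import Any


async def flaky(n: int) -> int:
    await asyncio.sleep(0)
    if n % 3 == 0:
        raise ValueError(f"bad item {n}")
    return n * 10


def partition(
    items: list[int], results: list[Any]
) -> tuple[dict[int, Any], dict[int, BaseException]]:
    """Split in-order results into successes and failures, keyed by input item."""
    ok: dict[int, Any] = {}
    failed: dict[int, BaseException] = {}
    for item, result in zip(items, results):
        if isinstance(result, BaseException):
            failed[item] = result
        else:
            ok[item] = result
    return ok, failed


async def demo() -> tuple[dict[int, Any], dict[int, BaseException]]:
    items = list(range(6))
    results = await asyncio.gather(
        *(flaky(n) for n in items), return_exceptions=True
    )
    return partition(items, results)


ok, failed = asyncio.run(demo())
print(ok)              # {1: 10, 2: 20, 4: 40, 5: 50}
print(sorted(failed))  # [0, 3]
```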
Lane — configure once, reuse everywhere
from tasklane import Lane
# One policy for a specific downstream API.
github = Lane(limit=8, retries=3, rate_limit=20, timeout=10)
repos = await github.map(fetch_repo, repo_names)
async for issue in github.stream(fetch_issue, issue_ids):
    ...
# Lanes are immutable; derive a variant with .replace()
bulk = github.replace(limit=32)
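The immutable-settings pattern can be sketched with a frozen dataclass, where deriving a variant returns a new object and leaves the original untouched. `Policy` below is a hypothetical stand-in with assumed defaults; it is not tasklane's Lane class and carries no scheduling logic.

```python
import dataclasses
from typing import Optional


@dataclasses.dataclass(frozen=True)
class Policy:
    """Hypothetical immutable settings bundle (defaults are assumptions)."""

    limit: int = 10
    retries: int = 0
    rate_limit: Optional[float] = None
    timeout: Optional[float] = None

    def replace(self, **changes: object) -> "Policy":
        # dataclasses.replace builds a new instance with the given fields changed.
        return dataclasses.replace(self, **changes)


github = Policy(limit=8, retries=3, rate_limit=20, timeout=10)
bulk = github.replace(limit=32)

print(github.limit, bulk.limit)  # 8 32
```

Assigning to a field, such as `github.limit = 1`, raises `dataclasses.FrozenInstanceError`, which is what makes a shared policy safe to pass around.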
How it works
tasklane runs a fixed pool of limit worker coroutines that pull items off a
bounded asyncio.Queue. The bounded queue is what gives you backpressure and
constant memory; the worker pool is what enforces the concurrency limit exactly.
Retries, per-attempt timeouts, and rate limiting are applied inside each worker,
and completions are streamed back to the caller — collected into order for
amap, or yielded as-they-finish for stream. On any early exit (fail-fast,
break, or external cancellation) every in-flight task is cancelled and awaited,
so nothing leaks.
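The worker-pool design described above can be sketched in a few dozen lines of standard-library code. This is a minimal illustration under stated assumptions, not tasklane's source: `pooled_map` is a hypothetical name, it handles only sync iterables, and it omits retries, timeouts, rate limiting, and streaming.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import Any, TypeVar

T = TypeVar("T")
R = TypeVar("R")

_DONE = object()  # sentinel telling a worker to exit


async def pooled_map(
    func: Callable[[T], Awaitable[R]], items: Iterable[T], limit: int
) -> list[R]:
    """Map `func` over `items` with `limit` workers and a bounded queue."""
    queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=limit)
    results: dict[int, R] = {}

    async def producer() -> None:
        for index, item in enumerate(items):
            await queue.put((index, item))  # blocks when full: backpressure
        for _ in range(limit):
            await queue.put(_DONE)

    async def worker() -> None:
        while True:
            entry = await queue.get()
            if entry is _DONE:
                return
            index, item = entry
            results[index] = await func(item)

    tasks = [asyncio.create_task(producer())]
    tasks += [asyncio.create_task(worker()) for _ in range(limit)]
    try:
        await asyncio.gather(*tasks)  # the first failure propagates here
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished
        await asyncio.gather(*tasks, return_exceptions=True)  # await the cancellations
    return [results[i] for i in range(len(results))]


async def double(n: int) -> int:
    await asyncio.sleep(0.001)
    return n * 2


print(asyncio.run(pooled_map(double, (n for n in range(8)), limit=3)))
# [0, 2, 4, 6, 8, 10, 12, 14]
```

Because the producer can only run `limit` items ahead of the workers, a generator input is consumed lazily rather than being turned into one task per item.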
API reference
| Symbol | Description |
|---|---|
| amap(func, items, *, limit, retries, backoff, retry_on, timeout, return_exceptions, rate_limit, on_progress) | Concurrent map; returns a list in input order. |
| stream(func, items, *, ...) | Async iterator yielding results in completion order. |
| gather(*coros, limit, timeout, rate_limit, return_exceptions, on_progress) | Concurrency-limited asyncio.gather. |
| Lane(...) | Reusable, immutable bundle of settings with .map, .stream, .gather, .replace. |
| Backoff | Retry delay strategy: .exponential, .linear, .constant. |
| Progress | Immutable progress snapshot passed to on_progress. |
Full signatures and docstrings ship with the package and are surfaced by your
editor (the library is fully typed and marked with py.typed).
Contributing
Contributions are welcome — see CONTRIBUTING.md. In short:
uv sync
uv run pytest # tests
uv run ruff check . # lint
uv run mypy # types
License
MIT © tasklane contributors