Skip to main content

Bounded-concurrency async for Python: run, map, and stream awaitables with limits, retries, rate limiting, and progress — in one typed call.

Project description

tasklane

Bounded-concurrency async for Python — run, map, and stream awaitables with a concurrency limit, retries, backoff, rate limiting, and progress, in one typed call.

CI PyPI Python License: MIT Types: typed Ruff

Every Python project that fans out async work eventually rewrites the same block: an asyncio.Semaphore to cap concurrency, a try/except retry loop, a counter for progress, maybe a sleep to stay under a rate limit. tasklane is that block, done once — correct, fully typed, and zero runtime dependencies.

import asyncio
import httpx
import tasklane

async def fetch(url: str) -> int:
    async with httpx.AsyncClient() as client:
        return len((await client.get(url)).text)

async def main() -> None:
    urls = [f"https://example.com/{i}" for i in range(1000)]

    sizes = await tasklane.amap(
        fetch, urls,
        limit=20,          # at most 20 requests in flight
        retries=3,         # retry failures up to 3x with exponential backoff
        rate_limit=50,     # start at most 50 requests per second
        timeout=10,        # per-attempt timeout (seconds)
    )
    print(sum(sizes))

asyncio.run(main())

Install

pip install tasklane
# or
uv add tasklane

Requires Python 3.10+. No third-party dependencies.

Why not just asyncio.gather?

asyncio.gather starts everything at once. Fan out 10,000 requests and you open 10,000 sockets, trip rate limits, and OOM. The usual fixes are scattered across the stdlib and third-party libs; tasklane brings them together:

asyncio.gather Semaphore + gather aiometer tasklane
Concurrency limit manual
Results in input order
Stream results as completed as_completed manual
Retries + backoff
Rate limiting (per second)
Progress callbacks
Per-task timeout manual
Backpressure on huge inputs manual
Runtime dependencies stdlib stdlib anyio none

Features

amap — concurrent map, results in order

results = await tasklane.amap(fetch, urls, limit=10)
# results[i] corresponds to urls[i]

Accepts both sync and async iterables, and works in constant memory thanks to a bounded internal queue — you can map over a million-item generator without materializing a million tasks.

stream — react to results as they finish

async for size in tasklane.stream(fetch, urls, limit=10):
    print(size)  # arrives in completion order, fastest first

gather — a drop-in asyncio.gather with a limit

results = await tasklane.gather(*(fetch(u) for u in urls), limit=10)

On fail-fast, the remaining coroutines are cancelled and closed, so you never see a coroutine was never awaited warning.

Retries with backoff

from tasklane import Backoff

await tasklane.amap(
    fetch, urls,
    retries=5,
    backoff=Backoff.exponential(0.2, factor=2, max_delay=30),  # 0.2, 0.4, 0.8, ... + jitter
    retry_on=(TimeoutError, ConnectionError),                  # type, tuple, or predicate
)

Backoff.exponential() (the default when retries > 0), Backoff.linear(), and Backoff.constant() cover the common cases. retry_on accepts an exception type, a tuple of types, or a Callable[[BaseException], bool] predicate.

Rate limiting

# Never start more than 100 tasks per second, regardless of the concurrency limit.
await tasklane.amap(call_api, items, limit=50, rate_limit=100)

Progress

from tasklane import Progress

def show(p: Progress) -> None:
    print(f"{p.completed}/{p.total}  ({p.failed} failed)  {p.rate:.0f}/s")

await tasklane.amap(fetch, urls, limit=10, on_progress=show)

Progress carries completed, total, succeeded, failed, in_flight, and elapsed, plus remaining, fraction, and rate helpers. Plug it into tqdm, a logger, or a web UI — no progress-bar dependency is imposed on you.

Collect errors instead of raising

results = await tasklane.amap(fetch, urls, return_exceptions=True)
ok = [r for r in results if not isinstance(r, Exception)]

Lane — configure once, reuse everywhere

from tasklane import Lane

# One policy for a specific downstream API.
github = Lane(limit=8, retries=3, rate_limit=20, timeout=10)

repos = await github.map(fetch_repo, repo_names)
async for issue in github.stream(fetch_issue, issue_ids):
    ...

# Lanes are immutable; derive a variant with .replace()
bulk = github.replace(limit=32)

How it works

tasklane runs a fixed pool of limit worker coroutines that pull items off a bounded asyncio.Queue. The bounded queue is what gives you backpressure and constant memory; the worker pool is what enforces the concurrency limit exactly. Retries, per-attempt timeouts, and rate limiting are applied inside each worker, and completions are streamed back to the caller — collected into order for amap, or yielded as-they-finish for stream. On any early exit (fail-fast, break, or external cancellation) every in-flight task is cancelled and awaited, so nothing leaks.

API reference

Symbol Description
amap(func, items, *, limit, retries, backoff, retry_on, timeout, return_exceptions, rate_limit, on_progress) Concurrent map; returns a list in input order.
stream(func, items, *, ...) Async iterator yielding results in completion order.
gather(*coros, limit, timeout, rate_limit, return_exceptions, on_progress) Concurrency-limited asyncio.gather.
Lane(...) Reusable, immutable bundle of settings with .map, .stream, .gather, .replace.
Backoff Retry delay strategy: .exponential, .linear, .constant.
Progress Immutable progress snapshot passed to on_progress.

Full signatures and docstrings ship with the package and are surfaced by your editor (the library is fully typed and marked with py.typed).

Contributing

Contributions are welcome — see CONTRIBUTING.md. In short:

uv sync
uv run pytest          # tests
uv run ruff check .    # lint
uv run mypy            # types

License

MIT © tasklane contributors

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tasklane-0.1.0.tar.gz (59.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

tasklane-0.1.0-py3-none-any.whl (13.8 kB view details)

Uploaded Python 3

File details

Details for the file tasklane-0.1.0.tar.gz.

File metadata

  • Download URL: tasklane-0.1.0.tar.gz
  • Upload date:
  • Size: 59.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for tasklane-0.1.0.tar.gz
Algorithm Hash digest
SHA256 0db4f83db6d249c316e2581202fb00de33b811582513293e20f44b840afccd5c
MD5 1a8b1d369433604115f089b775841956
BLAKE2b-256 ccdd3dd304abe1deb7c4e287302c7827ffd5968a769d8d3e1e9c5361ac1ff89f

See more details on using hashes here.

Provenance

The following attestation bundles were made for tasklane-0.1.0.tar.gz:

Publisher: release.yml on jpwm2/tasklane

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file tasklane-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: tasklane-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 13.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for tasklane-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a48882e9989dde70073fdb0dd4aced3a78c146cc5f8124d35c0cbed35a5a6938
MD5 5d4704568d2dbd716cc4ed4ad4acb1e2
BLAKE2b-256 09a035b8c2f6ad448383ab691f2ebd8124873ed60ea401fd2236df26492c57c3

See more details on using hashes here.

Provenance

The following attestation bundles were made for tasklane-0.1.0-py3-none-any.whl:

Publisher: release.yml on jpwm2/tasklane

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page