
Simple concurrency pool executors for Python


Concurio

Concurio is a tiny asyncio-based concurrency pool for running many async callables with:

  • Concurrency limiting via a semaphore
  • Optional rate limiting (max executions per minute)
  • Per-task timeouts that return a TimeoutedTask record (instead of raising)
  • Streaming results as tasks complete, with optional tqdm progress

Project status: alpha (API may change).

Installation

From PyPI:

pip install concurio

From source (this repo):

pip install -e .

Python: 3.8+. Dependency: tqdm.

Quickstart (sync entrypoint)

Use AsyncPoolExecutor.as_events() when you’re in a normal (non-async) script and want Concurio to manage the event loop internally.

import asyncio
from concurio import AsyncPoolExecutor, TqdmConfig, TimeoutedTask


async def fetch(i: int) -> int:
    await asyncio.sleep(0.1)
    return i * 2


def main() -> None:
    results: list[int] = []
    errors: list[Exception] = []
    timeouts: list[TimeoutedTask] = []

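    with AsyncPoolExecutor(concurrency=10) as pool:
        # Queue 100 calls; each one gets its own per-task timeout.
        for i in range(100):
            pool.submit(fetch, i, wait_timeout=2.0)

        # Runs the event loop internally and streams outcomes to the callbacks.
        pool.as_events(
            on_done=results.append,
            on_error=errors.append,
            on_timeout=timeouts.append,
            tqdm_config=TqdmConfig(desc="Fetching", total=100),
        )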

    print(f"done={len(results)} errors={len(errors)} timeouts={len(timeouts)}")


if __name__ == "__main__":
    main()

Usage inside an async app

If you are already inside a running event loop (FastAPI, an async CLI, a notebook, etc.), use async_as_events() instead of as_events(). as_events() calls asyncio.run() internally, and asyncio.run() raises RuntimeError when called from a running event loop.

import asyncio
from concurio import AsyncPoolExecutor, TqdmConfig


async def work(i: int) -> int:
    await asyncio.sleep(0.05)
    return i


async def run() -> None:
    results: list[int] = []

    pool = AsyncPoolExecutor(concurrency=25)
    for i in range(500):
        pool.submit(work, i)

    await pool.async_as_events(
        on_done=results.append,
        on_error=lambda e: print("error:", repr(e)),
        on_timeout=lambda t: print("timeout:", t),
        tqdm_config=TqdmConfig(desc="Working"),
    )

    print("results:", len(results))


asyncio.run(run())
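
The restriction on as_events() comes from asyncio itself rather than from anything specific to Concurio. The following stdlib-only sketch reproduces the failure mode by calling asyncio.run() from inside a coroutine; the function names inner and outer are illustrative and not part of Concurio.

```python
import asyncio


async def inner() -> int:
    return 1


async def outer() -> str:
    # We are already inside a running event loop here, so a nested
    # asyncio.run() call is rejected before it runs anything.
    coro = inner()
    try:
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine never ran; close it to avoid a warning
        return str(exc)
    return "no error"


message = asyncio.run(outer())
print(message)
```

In async code, awaiting async_as_events() avoids the nested asyncio.run() call altogether.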

Timeouts

Concurio supports two different timeout concepts:

  • Per-task timeout (wait_timeout passed to submit()): if the task takes too long, the result is a TimeoutedTask instance (it does not raise).
  • Iteration timeout (timeout passed to as_events() / async_as_events()): forwarded to asyncio.as_completed(...), which raises a timeout error once the deadline passes with tasks still pending. Iteration may then stop early, and the exception may be surfaced through on_error.
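
The difference between the two timeouts can be illustrated with plain asyncio. This is a minimal sketch, not Concurio's implementation: TimedOut, run_with_timeout, and iterate_with_deadline are hypothetical names, and TimedOut only imitates the idea of returning a record instead of raising.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List, Tuple


@dataclass
class TimedOut:
    """Hypothetical timeout record; a stand-in, not Concurio's TimeoutedTask."""

    func_name: str
    timeout: float
    args: Tuple[Any, ...]


async def run_with_timeout(
    func: Callable[..., Awaitable[Any]], *args: Any, wait_timeout: float
) -> Any:
    """Per-task timeout: return a TimedOut record instead of raising."""
    try:
        return await asyncio.wait_for(func(*args), timeout=wait_timeout)
    except asyncio.TimeoutError:
        return TimedOut(func.__name__, wait_timeout, args)


async def sleepy(delay: float) -> str:
    await asyncio.sleep(delay)
    return "finished"


async def iterate_with_deadline(delays: List[float], timeout: float) -> List[str]:
    """Iteration timeout: stop consuming results once the overall deadline passes."""
    tasks = [asyncio.ensure_future(sleepy(d)) for d in delays]
    finished: List[str] = []
    try:
        for next_done in asyncio.as_completed(tasks, timeout=timeout):
            finished.append(await next_done)
    except asyncio.TimeoutError:
        # The deadline expired with tasks still pending; cancel the stragglers.
        for task in tasks:
            task.cancel()
    return finished


async def demo() -> Tuple[Any, Any, List[str]]:
    on_time = await run_with_timeout(sleepy, 0.01, wait_timeout=0.5)
    too_slow = await run_with_timeout(sleepy, 0.5, wait_timeout=0.05)
    partial = await iterate_with_deadline([0.01, 1.0], timeout=0.2)
    return on_time, too_slow, partial


on_time, too_slow, partial = asyncio.run(demo())
```

Here the per-task timeout affects only the slow call, while the iteration timeout ends the whole loop and leaves the caller with whatever results arrived before the deadline.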

Rate limiting

Use max_executions_per_minute to cap the number of task starts per minute:

from concurio import AsyncPoolExecutor

pool = AsyncPoolExecutor(concurrency=20, max_executions_per_minute=120)  # 2/sec average

This is helpful for external APIs where you want both:

  • Parallelism (concurrency), and
  • A global request rate cap (rate limiter).
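
How a concurrency cap and a rate cap can combine is sketched below using only the standard library. It is an assumption-laden illustration, not Concurio's internals: StartRateLimiter and run_limited are hypothetical names, and the limiter simply spaces starts evenly across the minute.

```python
import asyncio
import time
from typing import List, Tuple


class StartRateLimiter:
    """Hypothetical helper: spaces task starts evenly, max_per_minute per minute."""

    def __init__(self, max_per_minute: int) -> None:
        self._interval = 60.0 / max_per_minute
        self._next_slot = 0.0

    async def wait_turn(self) -> None:
        # Reserve the next free slot. There is no await between reading and
        # updating _next_slot, so concurrent tasks cannot claim the same slot.
        now = time.monotonic()
        slot = max(now, self._next_slot)
        self._next_slot = slot + self._interval
        await asyncio.sleep(slot - now)


async def run_limited(
    count: int, concurrency: int, max_per_minute: int
) -> Tuple[List[int], List[float]]:
    semaphore = asyncio.Semaphore(concurrency)  # caps tasks in flight
    limiter = StartRateLimiter(max_per_minute)  # caps the start rate
    starts: List[float] = []

    async def one(i: int) -> int:
        async with semaphore:
            await limiter.wait_turn()
            starts.append(time.monotonic())
            await asyncio.sleep(0.01)  # stand-in for a real request
            return i

    results = await asyncio.gather(*(one(i) for i in range(count)))
    return list(results), starts


# 1200 starts per minute means one start every 0.05 seconds.
results, starts = asyncio.run(run_limited(count=5, concurrency=2, max_per_minute=1200))
```

The semaphore bounds how many tasks run at once, and the limiter bounds how quickly new ones may begin, so short tasks cannot exceed the rate cap even when free concurrency slots are available.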

Progress reporting

  • If tqdm_config is provided and stdout is a TTY, Concurio uses tqdm.asyncio.tqdm_asyncio.as_completed(...).
  • If stdout is not a TTY (e.g. CI logs), it falls back to plain asyncio.as_completed(...) and logs progress periodically via logging.
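
The non-TTY fallback can be approximated with the standard library alone. The sketch below assumes a logging interval of every 10 completions and a logger named progress_sketch; both are illustrative choices, as are the helper names, and none of them reflect Concurio's actual settings.

```python
import asyncio
import logging
import sys
from typing import Any, Awaitable, Iterable, List

logger = logging.getLogger("progress_sketch")  # hypothetical logger name


def stdout_is_tty() -> bool:
    """True when stdout is an interactive terminal, where a progress bar makes sense."""
    return sys.stdout.isatty()


async def drain_with_logging(
    aws: Iterable[Awaitable[Any]], log_every: int = 10
) -> List[Any]:
    """Non-TTY path: consume results in completion order and log progress periodically."""
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    results: List[Any] = []
    for done_count, next_done in enumerate(asyncio.as_completed(tasks), start=1):
        results.append(await next_done)
        if done_count % log_every == 0 or done_count == len(tasks):
            logger.info("completed %d/%d tasks", done_count, len(tasks))
    return results


async def job(i: int) -> int:
    await asyncio.sleep(0.001 * i)
    return i


async def demo() -> List[int]:
    # A TTY branch would wrap the same tasks in a tqdm progress bar instead.
    return await drain_with_logging([job(i) for i in range(25)])


results = asyncio.run(demo())
```

A caller would check stdout_is_tty() first and take the logging path only when it returns False, which keeps CI logs free of progress-bar control characters.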

API overview

The public imports are exposed from concurio:

  • AsyncPoolExecutor(concurrency: int | None = None, max_executions_per_minute: int | None = None)
    • submit(func, *args, wait_timeout: float | None = None, **kwargs) -> Awaitable
    • as_events(on_done, on_error, on_timeout, timeout: float | None = None, tqdm_config: TqdmConfig | None = None) -> None
    • async_as_events(on_done, on_error, on_timeout, timeout: float | None = None, tqdm_config: TqdmConfig | None = None) -> None
    • count() -> int (also len(pool))
  • TqdmConfig(desc: str = "Processing tasks", total: int | None = None, disable: bool = False)
  • TimeoutedTask (dataclass with func_name, timeout, args, kwargs)

Notes / gotchas

  • Async callables only: submit() expects an async def function (i.e. one that returns an awaitable). To run sync work, call it through asyncio.to_thread(...) (Python 3.9+) inside your own async wrapper.
  • Reuse: AsyncPoolExecutor collects the coroutines you submit, and exiting the context manager clears that internal list. Submit tasks again before reusing a pool after its with block.
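
The wrapper described in the first note might look like the following sketch. blocking_square and square_async are hypothetical names, and asyncio.to_thread requires Python 3.9 or newer.

```python
import asyncio
import time
from typing import List


def blocking_square(n: int) -> int:
    """Ordinary sync function; time.sleep stands in for blocking I/O."""
    time.sleep(0.01)
    return n * n


async def square_async(n: int) -> int:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_square, n)


async def demo() -> List[int]:
    # gather preserves argument order in its result list.
    return list(await asyncio.gather(*(square_async(n) for n in range(5))))


squares = asyncio.run(demo())
print(squares)
```

Going by the submit(func, *args) signature listed in the API overview, a wrapper like square_async would then be submitted as pool.submit(square_async, 3).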

License

Apache-2.0. See LICENSE.

