Simple concurrency pool executors for Python
Project description
Concurio
Concurio is a tiny asyncio-based concurrency pool for running many async callables with:
- Concurrency limiting via a semaphore
- Optional rate limiting (max executions per minute)
- Per-task timeouts that return a
TimeoutedTaskrecord (instead of raising) - Streaming results as tasks complete, with optional
tqdmprogress
Project status: alpha (API may change).
Installation
If you’ve published this package to PyPI:
pip install concurio
From source (this repo):
pip install -e .
Python: 3.8+. Dependency: tqdm.
Quickstart (sync entrypoint)
Use AsyncPoolExecutor.as_events() when you’re in a normal (non-async) script and want Concurio to manage the event loop internally.
import asyncio
from concurio import AsyncPoolExecutor, TqdmConfig, TimeoutedTask
async def fetch(i: int) -> int:
await asyncio.sleep(0.1)
return i * 2
def main() -> None:
results: list[int] = []
errors: list[Exception] = []
timeouts: list[TimeoutedTask] = []
with AsyncPoolExecutor(concurrency=10) as pool:
for i in range(100):
pool.submit(fetch, i, wait_timeout=2.0)
pool.as_events(
on_done=results.append,
on_error=errors.append,
on_timeout=timeouts.append,
tqdm_config=TqdmConfig(desc="Fetching", total=100),
)
print(f"done={len(results)} errors={len(errors)} timeouts={len(timeouts)}")
if __name__ == "__main__":
main()
Usage inside an async app
If you’re already in an async context (FastAPI, async CLI, notebooks, etc.), use async_as_events() and do not call as_events() (because as_events() uses asyncio.run() internally).
import asyncio
from concurio import AsyncPoolExecutor, TqdmConfig
async def work(i: int) -> int:
await asyncio.sleep(0.05)
return i
async def run() -> None:
results: list[int] = []
pool = AsyncPoolExecutor(concurrency=25)
for i in range(500):
pool.submit(work, i)
await pool.async_as_events(
on_done=results.append,
on_error=lambda e: print("error:", repr(e)),
on_timeout=lambda t: print("timeout:", t),
tqdm_config=TqdmConfig(desc="Working"),
)
print("results:", len(results))
asyncio.run(run())
Timeouts
Concurio supports two different timeout concepts:
- Per-task timeout (
wait_timeoutpassed tosubmit()): if the task takes too long, the result is aTimeoutedTaskinstance (it does not raise). - Iteration timeout (
timeoutpassed toas_events()/async_as_events()): passed toasyncio.as_completed(...). If it expires, iteration may stop early (and an exception may be surfaced throughon_error).
Rate limiting
Limit executions to a maximum number of task starts per minute:
from concurio import AsyncPoolExecutor
pool = AsyncPoolExecutor(concurrency=20, max_executions_per_minute=120) # 2/sec average
This is helpful for external APIs where you want both:
- Parallelism (concurrency), and
- A global request rate cap (rate limiter).
Progress reporting
- If
tqdm_configis provided and stdout is a TTY, Concurio usestqdm.asyncio.tqdm_asyncio.as_completed(...). - If stdout is not a TTY (e.g. CI logs), it falls back to plain
asyncio.as_completed(...)and logs progress periodically vialogging.
API overview
The public imports are exposed from concurio:
AsyncPoolExecutor(concurrency: int | None = None, max_executions_per_minute: int | None = None)submit(func, *args, wait_timeout: float | None = None, **kwargs) -> Awaitableas_events(on_done, on_error, on_timeout, timeout: float | None = None, tqdm_config: TqdmConfig | None = None) -> Noneasync_as_events(on_done, on_error, on_timeout, timeout: float | None = None, tqdm_config: TqdmConfig | None = None) -> Nonecount() -> int(alsolen(pool))
TqdmConfig(desc: str = "Processing tasks", total: int | None = None, disable: bool = False)TimeoutedTask(dataclass withfunc_name,timeout,args,kwargs)
Notes / gotchas
- Async callables only:
submit()expects anasync deffunction (i.e. returns an awaitable). If you need to run sync work, wrap it withasyncio.to_thread(...)in your own async wrapper. - Reuse:
AsyncPoolExecutorcollects coroutines you submit; exiting the context manager clears the internal list.
License
Apache-2.0. See LICENSE.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file concurio-0.0.1.tar.gz.
File metadata
- Download URL: concurio-0.0.1.tar.gz
- Upload date:
- Size: 11.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.10.0 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/1.0.0 urllib3/1.26.20 tqdm/4.64.1 importlib-metadata/4.8.3 keyring/23.4.1 rfc3986/1.5.0 colorama/0.4.5 CPython/3.6.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2f50c493883fb58b79bd9dc28759b25b8e68e434cc3bd0549945f747db640c0c
|
|
| MD5 |
094a5f206c9674d94e5d59b43da80b8d
|
|
| BLAKE2b-256 |
ab5bc939ea1136beb4da15e10741bca45ecfb668b97fe9df09aab93d1e8b18d8
|
File details
Details for the file concurio-0.0.1-py3-none-any.whl.
File metadata
- Download URL: concurio-0.0.1-py3-none-any.whl
- Upload date:
- Size: 13.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.10.0 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/1.0.0 urllib3/1.26.20 tqdm/4.64.1 importlib-metadata/4.8.3 keyring/23.4.1 rfc3986/1.5.0 colorama/0.4.5 CPython/3.6.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
820d034a85ef9555ea7e1a863f8253fe15ab762cc19b6c44857ee1d7390dabef
|
|
| MD5 |
1eb7ac7c559820ea3b948a4133901154
|
|
| BLAKE2b-256 |
0e3a130b719541648a230892148baffa451ddf4ccc35f2103d1aa155f665a814
|