A concurrent.futures-like API for executing coroutines using a pool of tasks
Project description
cf-taskpool
An asynchronous task pool with a concurrent.futures-like API for executing coroutines using a pool of asyncio tasks.
Features
- Simple, familiar API - If you've used
ThreadPoolExecutororProcessPoolExecutor, you'll feel right at home - Plays well with asyncio - Seamlessly integrates with
asyncio.wait(),asyncio.as_completed(), and other asyncio utilities - 100% test coverage - Most tests ported from Python's
concurrent.futurestest suite - No 3rd party dependencies - Pure Python, only requires Python 3.11+
- MIT licensed - Free to use, modify, and distribute
Installation
pip install cf-taskpool
Quick Start
import asyncio
from cf_taskpool import TaskPoolExecutor
async def fetch_data(url: str) -> str:
"""Simulate an async operation."""
await asyncio.sleep(0.1)
return f"Data from {url}"
async def main():
# Create an executor with a pool of 3 workers
async with TaskPoolExecutor(max_workers=3) as executor:
# Submit a single task
future = await executor.submit(fetch_data, "https://example.com")
result = await future
print(result) # Data from https://example.com
asyncio.run(main())
API Overview
TaskPoolExecutor(max_workers=None, task_name_prefix="")
Creates a new task pool executor.
max_workers: Maximum number of workers (defaults toos.cpu_count())task_name_prefix: Optional prefix for worker task names
async submit(fn, /, *args, **kwargs) -> asyncio.Future
Submits a callable to be executed. Returns an asyncio.Future.
async def multiply(x: int, y: int) -> int:
await asyncio.sleep(0.1)
return x * y
async with TaskPoolExecutor() as executor:
future = await executor.submit(multiply, 6, 7)
result = await future
print(result) # 42
You can also submit an awaitable directly:
async with TaskPoolExecutor() as executor:
coro = multiply(6, 7)
future = await executor.submit(coro)
result = await future
print(result) # 42
async map(fn, *iterables, buffersize=None) -> AsyncGenerator
Returns an async iterator that yields results as they complete.
async def process_item(item: int) -> int:
await asyncio.sleep(0.1)
return item * 2
async with TaskPoolExecutor(max_workers=3) as executor:
results = []
async for result in await executor.map(process_item, range(5)):
results.append(result)
print(results) # [0, 2, 4, 6, 8]
The buffersize parameter controls how many items are pre-fetched:
# Only buffer 2 items at a time (useful for large or infinite iterables)
async for result in await executor.map(process_item, range(100), buffersize=2):
print(result)
async shutdown(wait=True, cancel_futures=False)
Signals the executor to stop accepting new tasks and cleans up resources.
wait: IfTrue, waits for pending tasks to completecancel_futures: IfTrue, cancels all pending futures
executor = TaskPoolExecutor(max_workers=3)
# ... submit tasks ...
await executor.shutdown(wait=True)
Or use as a context manager (recommended):
async with TaskPoolExecutor(max_workers=3) as executor:
# ... submit tasks ...
# shutdown() is called automatically on exit
Integration with asyncio
Using asyncio.wait()
async def task(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return name
async with TaskPoolExecutor(max_workers=3) as executor:
futures = [
await executor.submit(task, "fast", 0.1),
await executor.submit(task, "medium", 0.2),
await executor.submit(task, "slow", 0.3),
]
# Wait for the first task to complete
done, pending = await asyncio.wait(
futures, return_when=asyncio.FIRST_COMPLETED
)
for future in done:
print(f"Completed: {await future}")
# Wait for all tasks to complete
done, pending = await asyncio.wait(
pending, return_when=asyncio.ALL_COMPLETED
)
for future in done:
print(f"Completed: {await future}")
Using asyncio.as_completed()
async with TaskPoolExecutor(max_workers=3) as executor:
futures = [
await executor.submit(task, "task1", 0.3),
await executor.submit(task, "task2", 0.1),
await executor.submit(task, "task3", 0.2),
]
# Process results as they complete
for coro in asyncio.as_completed(futures):
result = await coro
print(f"Got result: {result}")
Using asyncio.gather()
async with TaskPoolExecutor(max_workers=3) as executor:
futures = [
await executor.submit(task, "task1", 0.3),
await executor.submit(task, "task2", 0.1),
await executor.submit(task, "task3", 0.2),
]
# Wait for all and collect results
results = await asyncio.gather(*futures)
print(results) # ['task1', 'task2', 'task3']
Advanced Examples
Handling Exceptions
async def failing_task():
await asyncio.sleep(0.1)
raise ValueError("Something went wrong")
async with TaskPoolExecutor() as executor:
future = await executor.submit(failing_task)
try:
await future
except ValueError as e:
print(f"Caught exception: {e}")
# Or check the exception later
print(future.exception()) # ValueError("Something went wrong")
Cancellation
async def long_running_task():
await asyncio.sleep(10)
return "Done"
async with TaskPoolExecutor() as executor:
future = await executor.submit(long_running_task)
# Cancel the task
future.cancel()
try:
await future
except asyncio.CancelledError:
print("Task was cancelled")
Processing Multiple Iterables with map()
async def multiply(x: int, y: int) -> int:
await asyncio.sleep(0.1)
return x * y
async with TaskPoolExecutor(max_workers=3) as executor:
# Multiply corresponding elements from two iterables
async for result in await executor.map(multiply, range(5), range(5, 10)):
print(result) # 0, 6, 14, 24, 36
Buffered Processing with map()
import itertools as it
async def process(n: int) -> int:
await asyncio.sleep(0.1)
return n * 2
async with TaskPoolExecutor(max_workers=3) as executor:
# Process an infinite iterator with a buffer of 2
async for result in await executor.map(process, it.count(), buffersize=2):
print(result)
if result >= 20:
break
Comparison with concurrent.futures
| Feature | ThreadPoolExecutor | ProcessPoolExecutor | TaskPoolExecutor |
|---|---|---|---|
| Executes | Functions in threads | Functions in processes | Coroutines/Awaitables in tasks |
| Returns | concurrent.futures.Future |
concurrent.futures.Future |
asyncio.Future |
| Use with | concurrent.futures.wait() |
concurrent.futures.wait() |
asyncio.wait() |
| Use with | concurrent.futures.as_completed() |
concurrent.futures.as_completed() |
asyncio.as_completed() |
| Best for | I/O-bound blocking code | CPU-bound code | Async/await code |
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cf_taskpool-0.1.0.tar.gz.
File metadata
- Download URL: cf_taskpool-0.1.0.tar.gz
- Upload date:
- Size: 12.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bfe999c22d71d22b078f8e646e07a5b9d019ff39a04396f2c50add0fbcf67ac8
|
|
| MD5 |
7c07b71e31f3e258f2b2049ec2598b62
|
|
| BLAKE2b-256 |
84db5ac4f82a536d980184b918623afbeb6eaf32aca3052d27c4af75d1caaabb
|
File details
Details for the file cf_taskpool-0.1.0-py3-none-any.whl.
File metadata
- Download URL: cf_taskpool-0.1.0-py3-none-any.whl
- Upload date:
- Size: 7.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
acc88c22e33255bd9dca0c86bf8529e6289d92f78fbd82504321e2f1052dc188
|
|
| MD5 |
c757be00bb9e0849eda7b61af292e0c4
|
|
| BLAKE2b-256 |
1f66358dade359b656a936f80f4045e8bffc3e96d27d03b87a727a1846c5c055
|