A concurrent.futures-like API for executing coroutines using a pool of tasks

cf-taskpool

An asynchronous task pool with a concurrent.futures-like API for executing coroutines using a pool of asyncio tasks.

Features

  • Simple, familiar API - If you've used ThreadPoolExecutor or ProcessPoolExecutor, you'll feel right at home
  • Plays well with asyncio - Seamlessly integrates with asyncio.wait(), asyncio.as_completed(), and other asyncio utilities
  • 100% test coverage - Most tests ported from Python's concurrent.futures test suite
  • No third-party dependencies - Pure Python; requires only Python 3.11+
  • MIT licensed - Free to use, modify, and distribute

Installation

pip install cf-taskpool

Quick Start

import asyncio
from cf_taskpool import TaskPoolExecutor


async def fetch_data(url: str) -> str:
    """Simulate an async operation."""
    await asyncio.sleep(0.1)
    return f"Data from {url}"


async def main():
    # Create an executor with a pool of 3 workers
    async with TaskPoolExecutor(max_workers=3) as executor:
        # Submit a single task
        future = await executor.submit(fetch_data, "https://example.com")
        result = await future
        print(result)  # Data from https://example.com


asyncio.run(main())

API Overview

TaskPoolExecutor(max_workers=None, task_name_prefix="")

Creates a new task pool executor.

  • max_workers: Maximum number of workers (defaults to os.cpu_count())
  • task_name_prefix: Optional prefix for worker task names
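To make the meaning of max_workers concrete, here is a minimal standard-library sketch of a fixed set of worker tasks draining a shared queue. It only illustrates the concurrency cap; run_with_workers and demo are hypothetical names, and this is not cf-taskpool's implementation.

```python
import asyncio


async def run_with_workers(coro_fns, max_workers):
    """Run zero-argument coroutine functions on at most `max_workers` tasks.

    Hypothetical helper for illustration; not part of cf-taskpool.
    """
    queue = asyncio.Queue()
    for fn in coro_fns:
        queue.put_nowait(fn)

    async def worker():
        # Each worker keeps pulling jobs until the queue is drained.
        while not queue.empty():
            fn = queue.get_nowait()
            await fn()

    workers = [asyncio.create_task(worker()) for _ in range(max_workers)]
    await asyncio.gather(*workers)


async def demo(n_jobs=6, max_workers=3):
    running = peak = 0

    async def job():
        nonlocal running, peak
        running += 1
        peak = max(peak, running)  # record the highest concurrency seen
        await asyncio.sleep(0.01)
        running -= 1

    await run_with_workers([job] * n_jobs, max_workers)
    return peak


print(asyncio.run(demo()))  # 3
```

Six jobs are submitted, but no more than three ever run at the same time because only three worker tasks exist.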

async submit(fn, /, *args, **kwargs) -> asyncio.Future

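Schedules a coroutine function, called as fn(*args, **kwargs), for execution by the pool. submit() is itself a coroutine: await it to obtain an asyncio.Future, then await that future to get the result.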

async def multiply(x: int, y: int) -> int:
    await asyncio.sleep(0.1)
    return x * y


async with TaskPoolExecutor() as executor:
    future = await executor.submit(multiply, 6, 7)
    result = await future
    print(result)  # 42

You can also submit an awaitable directly:

async with TaskPoolExecutor() as executor:
    coro = multiply(6, 7)
    future = await executor.submit(coro)
    result = await future
    print(result)  # 42
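One way an API could accept both call styles is to check whether the first argument is already awaitable. The sketch below uses only the standard library; as_awaitable is a hypothetical helper, and rejecting extra arguments alongside an awaitable is an assumption made for this example, not documented cf-taskpool behaviour.

```python
import asyncio
import inspect


def as_awaitable(fn_or_awaitable, /, *args, **kwargs):
    """Hypothetical helper: normalise both call styles to a single awaitable."""
    if inspect.isawaitable(fn_or_awaitable):
        # Assumption for this sketch: an awaitable takes no further arguments.
        if args or kwargs:
            raise TypeError("arguments are not accepted with an awaitable")
        return fn_or_awaitable
    # Otherwise treat it as a coroutine function and call it.
    return fn_or_awaitable(*args, **kwargs)


async def multiply(x: int, y: int) -> int:
    await asyncio.sleep(0)
    return x * y


async def main():
    a = await as_awaitable(multiply, 6, 7)  # coroutine function plus arguments
    b = await as_awaitable(multiply(6, 7))  # ready-made coroutine object
    return a, b


print(asyncio.run(main()))  # (42, 42)
```

Keep in mind that a coroutine object can be awaited only once, so a coroutine passed directly cannot be resubmitted.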

async map(fn, *iterables, buffersize=None) -> AsyncGenerator

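Applies fn to the items of the given iterables and returns an async iterator that yields results as they complete. map() is itself a coroutine, so await it first and then iterate with async for.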

async def process_item(item: int) -> int:
    await asyncio.sleep(0.1)
    return item * 2


async with TaskPoolExecutor(max_workers=3) as executor:
    results = []
    async for result in await executor.map(process_item, range(5)):
        results.append(result)
    print(results)  # [0, 2, 4, 6, 8]

The buffersize parameter controls how many items are pre-fetched:

# Only buffer 2 items at a time (useful for large or infinite iterables)
async for result in await executor.map(process_item, range(100), buffersize=2):
    print(result)

async shutdown(wait=True, cancel_futures=False)

Signals the executor to stop accepting new tasks and cleans up resources.

  • wait: If True, waits for pending tasks to complete
  • cancel_futures: If True, cancels all pending futures

executor = TaskPoolExecutor(max_workers=3)
# ... submit tasks ...
await executor.shutdown(wait=True)

Or use as a context manager (recommended):

async with TaskPoolExecutor(max_workers=3) as executor:
    ...  # submit tasks here
    # shutdown() is called automatically on exit
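The wait and cancel_futures parameters share their names with concurrent.futures.Executor.shutdown(). For reference, this is how the standard-library ThreadPoolExecutor treats them; that TaskPoolExecutor matches this in every detail is an assumption, not something this page states.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def demo():
    started = threading.Event()

    def slow(n):
        started.set()
        time.sleep(0.2)
        return n

    executor = ThreadPoolExecutor(max_workers=1)
    futures = [executor.submit(slow, n) for n in range(3)]
    started.wait()  # make sure the first job is running before shutting down
    # Queued jobs are cancelled; the job already running is allowed to finish.
    executor.shutdown(wait=True, cancel_futures=True)
    return [f.cancelled() for f in futures], futures[0].result()


print(demo())  # ([False, True, True], 0)
```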

Integration with asyncio

Using asyncio.wait()

async def task(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async with TaskPoolExecutor(max_workers=3) as executor:
    futures = [
        await executor.submit(task, "fast", 0.1),
        await executor.submit(task, "medium", 0.2),
        await executor.submit(task, "slow", 0.3),
    ]

    # Wait for the first task to complete
    done, pending = await asyncio.wait(
        futures, return_when=asyncio.FIRST_COMPLETED
    )
    for future in done:
        print(f"Completed: {await future}")

    # Wait for all tasks to complete
    done, pending = await asyncio.wait(
        pending, return_when=asyncio.ALL_COMPLETED
    )
    for future in done:
        print(f"Completed: {await future}")
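asyncio.wait() also accepts a timeout, which is handy for giving up on slow work. In this sketch plain asyncio tasks stand in for the futures returned by submit(), so that it runs with the standard library alone; the delays are arbitrary.

```python
import asyncio


async def task(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def main():
    # Plain asyncio tasks stand in for the futures returned by submit().
    futures = [
        asyncio.create_task(task("fast", 0.01)),
        asyncio.create_task(task("slow", 5)),
    ]
    done, pending = await asyncio.wait(futures, timeout=0.2)
    for future in pending:
        future.cancel()  # give up on anything that missed the deadline
    # Let the cancellations settle before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted(f.result() for f in done), len(pending)


print(asyncio.run(main()))  # (['fast'], 1)
```

Unlike asyncio.wait_for(), asyncio.wait() does not cancel anything itself when the timeout expires, which is why the pending set is cancelled explicitly.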

Using asyncio.as_completed()

async with TaskPoolExecutor(max_workers=3) as executor:
    futures = [
        await executor.submit(task, "task1", 0.3),
        await executor.submit(task, "task2", 0.1),
        await executor.submit(task, "task3", 0.2),
    ]

    # Process results as they complete
    for coro in asyncio.as_completed(futures):
        result = await coro
        print(f"Got result: {result}")

Using asyncio.gather()

async with TaskPoolExecutor(max_workers=3) as executor:
    futures = [
        await executor.submit(task, "task1", 0.3),
        await executor.submit(task, "task2", 0.1),
        await executor.submit(task, "task3", 0.2),
    ]

    # Wait for all and collect results
    results = await asyncio.gather(*futures)
    print(results)  # ['task1', 'task2', 'task3']

Advanced Examples

Handling Exceptions

async def failing_task():
    await asyncio.sleep(0.1)
    raise ValueError("Something went wrong")


async with TaskPoolExecutor() as executor:
    future = await executor.submit(failing_task)

    try:
        await future
    except ValueError as e:
        print(f"Caught exception: {e}")

    # Or check the exception later
    print(repr(future.exception()))  # ValueError('Something went wrong')
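When several futures may fail, asyncio.gather(..., return_exceptions=True) collects exceptions as ordinary results instead of raising the first one. Plain asyncio tasks stand in for submitted futures in this standard-library sketch; ok and boom are hypothetical example coroutines.

```python
import asyncio


async def ok() -> str:
    return "ok"


async def boom() -> str:
    raise ValueError("Something went wrong")


async def main():
    # Plain asyncio tasks stand in for the futures returned by submit().
    futures = [asyncio.create_task(ok()), asyncio.create_task(boom())]
    results = await asyncio.gather(*futures, return_exceptions=True)
    # Exceptions arrive as values, in the same order as the inputs.
    return [
        f"{type(r).__name__}: {r}" if isinstance(r, Exception) else r
        for r in results
    ]


print(asyncio.run(main()))  # ['ok', 'ValueError: Something went wrong']
```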

Cancellation

async def long_running_task():
    await asyncio.sleep(10)
    return "Done"


async with TaskPoolExecutor() as executor:
    future = await executor.submit(long_running_task)

    # Cancel the task
    future.cancel()

    try:
        await future
    except asyncio.CancelledError:
        print("Task was cancelled")
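A common reason to cancel is a deadline. asyncio.wait_for() cancels the awaitable it wraps when the timeout expires, so the explicit cancel() call is not needed. The sketch below uses a plain asyncio task as a stand-in for a submitted future.

```python
import asyncio


async def long_running_task() -> str:
    await asyncio.sleep(10)
    return "Done"


async def main():
    # A plain asyncio task stands in for the future returned by submit().
    future = asyncio.create_task(long_running_task())
    try:
        await asyncio.wait_for(future, timeout=0.05)
    except asyncio.TimeoutError:
        # wait_for() has already cancelled the task at this point.
        pass
    return future.cancelled()


print(asyncio.run(main()))  # True
```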

Processing Multiple Iterables with map()

async def multiply(x: int, y: int) -> int:
    await asyncio.sleep(0.1)
    return x * y


async with TaskPoolExecutor(max_workers=3) as executor:
    # Multiply corresponding elements from two iterables
    async for result in await executor.map(multiply, range(5), range(5, 10)):
        print(result)  # 0, 6, 14, 24, 36

Buffered Processing with map()

import itertools as it


async def process(n: int) -> int:
    await asyncio.sleep(0.1)
    return n * 2


async with TaskPoolExecutor(max_workers=3) as executor:
    # Process an infinite iterator with a buffer of 2
    async for result in await executor.map(process, it.count(), buffersize=2):
        print(result)
        if result >= 20:
            break

Comparison with concurrent.futures

Feature  | ThreadPoolExecutor                | ProcessPoolExecutor               | TaskPoolExecutor
---------+-----------------------------------+-----------------------------------+-------------------------------
Executes | Functions in threads              | Functions in processes            | Coroutines/Awaitables in tasks
Returns  | concurrent.futures.Future         | concurrent.futures.Future         | asyncio.Future
Use with | concurrent.futures.wait()         | concurrent.futures.wait()         | asyncio.wait()
Use with | concurrent.futures.as_completed() | concurrent.futures.as_completed() | asyncio.as_completed()
Best for | I/O-bound blocking code           | CPU-bound code                    | Async/await code
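The two future types in the "Returns" row are not interchangeable: a concurrent.futures.Future cannot be awaited directly. When thread-pool work has to be combined with async code, the standard library's asyncio.wrap_future() bridges the gap, as in this small sketch (blocking_square is a hypothetical example function).

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking_square(n: int) -> int:
    return n * n


async def main():
    with ThreadPoolExecutor(max_workers=2) as pool:
        cf_future = pool.submit(blocking_square, 7)  # concurrent.futures.Future
        aio_future = asyncio.wrap_future(cf_future)  # awaitable asyncio.Future
        # Once wrapped, it composes with other awaitables like any asyncio future.
        return await asyncio.gather(aio_future, asyncio.sleep(0, result="async"))


print(asyncio.run(main()))  # [49, 'async']
```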
