
A pool of coroutine functions.


asyncio-pool-ng


About

AsyncioPoolNG takes the ideas used in asyncio-pool and wraps them around an asyncio.TaskGroup.

AsyncioPool has three main functions: spawn, map, and itermap.

  1. spawn: Schedule an async function on the pool and get a future back which will eventually have either the result or the exception from the function.
  2. map: Spawn an async function for each item in an iterable object, and return a set containing a future for each item.
     • asyncio.wait() can be used to wait for the set of futures to complete.
     • When the AsyncioPool closes, it will wait for all tasks to complete. All pending futures will be complete once it is closed.
  3. itermap: Works similarly to map but returns an async generator which yields each future as it completes.
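The two ways of consuming futures described above (waiting for the whole set, or handling each one as it completes) can be sketched with the standard library alone. Here plain asyncio tasks stand in for the futures a pool would return, and asyncio.as_completed() is used only as a rough analogue of itermap; the worker and demo functions are hypothetical.

```python
import asyncio


async def worker(number: int) -> int:
    # Larger inputs sleep for less time, so completion order differs
    # from submission order.
    await asyncio.sleep(0.01 * (5 - number))
    return number * 2


async def demo() -> tuple[list[int], int, list[int]]:
    # Wait for a whole set of futures, as one would with the set from map.
    futures = {asyncio.ensure_future(worker(n)) for n in range(5)}
    done, pending = await asyncio.wait(futures)
    waited = sorted(f.result() for f in done)

    # Handle results one by one as they complete, similar in spirit to itermap.
    tasks = [asyncio.ensure_future(worker(n)) for n in range(5)]
    in_completion_order = []
    for next_done in asyncio.as_completed(tasks):
        in_completion_order.append(await next_done)

    return waited, len(pending), in_completion_order


waited, pending_count, in_completion_order = asyncio.run(demo())
```

With the default arguments, asyncio.wait() returns only after every future has finished, so the pending set is empty.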

Differences from asyncio-pool

  1. asyncio-pool-ng implements Python typing and passes validation checks with mypy's strict mode. This helps IDEs and static type checkers know what type of result to expect when getting data from a completed future.
  2. asyncio-pool uses callbacks to process data before returning it; asyncio-pool-ng only returns Future instances directly. The future will contain either a result or an exception which can then be handled as needed.
  3. While asyncio-pool schedules Coroutine instances directly, asyncio-pool-ng takes the callable and arguments, and creates the Coroutine instance at execution time.
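Since a future carries either a result or an exception, the caller decides how to handle each outcome. The sketch below shows that pattern with standard asyncio futures; the flaky worker and collect helper are hypothetical, and tasks are used in place of pool futures so the example runs without the package installed.

```python
import asyncio
from typing import Union


async def flaky(number: int) -> int:
    """Hypothetical worker that rejects negative input."""
    if number < 0:
        raise ValueError(f"negative input: {number}")
    return number * 2


async def collect(values: list[int]) -> list[Union[int, str]]:
    # Tasks stand in for the futures a pool would hand back.
    futures = [asyncio.ensure_future(flaky(v)) for v in values]
    await asyncio.wait(futures)

    outcomes: list[Union[int, str]] = []
    for future in futures:
        error = future.exception()  # None when the worker succeeded
        if error is not None:
            outcomes.append(f"error: {error}")
        else:
            outcomes.append(future.result())
    return outcomes


outcomes = asyncio.run(collect([1, -1, 3]))
```

Calling future.result() on a failed future re-raises the stored exception, so checking future.exception() first (or wrapping result() in try/except) keeps one failure from hiding the other results.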

Example

import asyncio
import logging
from random import random

from asyncio_pool import AsyncioPool


logging.basicConfig(level=logging.INFO)


async def worker(number: int) -> int:
    await asyncio.sleep(random() / 2)
    return number * 2


async def main() -> None:
    result: int = 0
    results: list[int] = []

    async with AsyncioPool(2) as pool:
        # spawn a task and wait for the result
        result = await pool.spawn(worker, 5)
        assert result == 10
        logging.info(f"results for pool.spawn(worker, 5): {result}")

        # spawn a task and get the result later
        future: asyncio.Future[int] = pool.spawn(worker, 5)

        # do other stuff

        result = await future
        assert result == 10

        # map an async function to a set of values
        futures: set[asyncio.Future[int]] = pool.map(worker, range(10))
        await asyncio.wait(futures)
        results = [x.result() for x in futures]
        logging.info(f"results for pool.map(worker, range(10)): {results}")
        results.sort()
        assert results == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

        # iterate over futures as they complete
        logging.info("results for pool.itermap(worker, range(10)):")
        results = []
        async for future in pool.itermap(worker, range(10)):
            results.append(future.result())
            logging.info(f"> {future.result()}")

        results.sort()
        assert results == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


asyncio.run(main())

