
Pool of asyncio coroutines with familiar interface

Project description

asyncio-pool-ng

Forked from asyncio-pool to include typing/mypy support and remove support for older versions of Python.

Pool of asyncio coroutines with a familiar interface. Supports Python 3.10+.

AioPool makes sure that no more and, if possible, no fewer than size spawned coroutines are active at the same time. Spawned means created and scheduled with one of the pool interface methods; active means the coroutine function has started executing its code, as opposed to waiting, which means it waits for pool space without entering the coroutine function.
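
For example, here is a minimal sketch of this guarantee (not part of the library's own examples; it assumes the asyncio_pool import path referenced later in this document). With size=3 the pool never lets more than three workers run at once:

import asyncio as aio

from asyncio_pool import AioPool


async def size_limit_sketch(todo=range(10)):
    active = 0  # how many workers are running right now

    async def limited(n):
        nonlocal active
        active += 1
        assert active <= 3  # never more than `size` coroutines are active
        await aio.sleep(0.01)
        active -= 1
        return n

    async with AioPool(size=3) as pool:
        futures = [await pool.spawn(limited(i)) for i in todo]

    assert [f.result() for f in futures] == list(todo)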

Interface

Read the code docstrings for details.

AioPool(size=4, *, loop=None)

Creates a pool of size concurrent tasks. Supports the async context manager interface.

spawn(coro, cb=None, ctx=None)

Waits for pool space, then creates a task for the coro coroutine and returns a future for its result. Optionally spawns a callback coroutine created by cb, which receives the result of coro as its first argument; the ctx context is passed to the callback as its third positional argument.

exec(coro, cb=None, ctx=None)

Waits for pool space, creates a task for coro, and waits for it to finish. Returns the result of coro if no callback is provided; otherwise creates a task for the callback, waits for it, and returns the callback's result.

spawn_n(coro, cb=None, ctx=None)

Creates a waiting task for coro and returns a future without waiting for pool space. The task is executed "in the pool" once pool space becomes available.

join()

Waits for all spawned (active and waiting) tasks to finish. Joining the pool from a coroutine spawned by the same pool leads to a deadlock; see the sketch below.
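
As an illustration of the deadlock caveat, here is a hedged sketch of the pattern to avoid (it reuses the imports from the sketch above): a coroutine spawned by the pool must not await the same pool's join.

async def join_deadlock_antipattern():
    pool = AioPool(size=2)

    async def inner():
        # `inner` was spawned by `pool`, so `pool.join()` waits for `inner`
        # to finish, while `inner` itself waits for `join` -- a deadlock.
        await pool.join()

    await pool.spawn(inner())
    await pool.join()  # never returns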

cancel(*futures)

Cancels spawned tasks (active and waiting), finding them by the provided futures. If no futures are provided, cancels all spawned tasks.

map(fn, iterable, cb=None, ctx=None, *, get_result=getres.flat)

Spawns coroutines created by the fn function for each item in iterable using spawn, waits for all of them to finish (including callbacks), and returns the results in the order of iterable.

map_n(fn, iterable, cb=None, ctx=None, *, get_result=getres.flat)

Spawns coroutines created by the fn function for each item in iterable using spawn_n, and returns futures for the task results in the order of iterable.

itermap(fn, iterable, cb=None, ctx=None, *, flat=True, get_result=getres.flat, timeout=None, yield_when=asyncio.ALL_COMPLETED)

Spawns tasks with map_n(fn, iterable, cb, ctx), then waits for results with the asyncio.wait function, yielding ready results one by one if flat == True, otherwise yielding lists of ready results.
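
For instance, with flat=False each iteration yields a batch (a list) of results that became ready within timeout, rather than single values. A hedged sketch, reusing the dummy worker from the usage examples below:

async def itermap_batches_sketch(todo=range(1, 11)):
    total = 0
    async with AioPool(size=5) as pool:
        # each `batch` is a list of results that became ready within 0.5s
        async for batch in pool.itermap(worker, todo, flat=False, timeout=0.5):
            total += sum(batch)
    assert total == sum(todo)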

Usage

The spawn and map methods are probably what you should use in 99% of cases. Their overhead is minimal (~3% of execution time), and even in the worst cases memory usage is insignificant.

The spawn_n, map_n and itermap methods give you more control and flexibility, but they come at the price of higher overhead. They spawn all the tasks you ask for, and most of those tasks wait their turn "in the background". If you spawn too many (10**6+ tasks), you will use most of the memory in your system and lose a lot of time on "concurrency management" of all the spawned tasks.

Play with python tests/loadtest.py -h to understand which methods you want to use.

Usage examples (more in tests/ and examples/):

import asyncio as aio
import itertools

from asyncio_pool import AioPool


async def worker(n):  # dummy worker
    await aio.sleep(1 / n)
    return n


async def spawn_n_usage(todo=[range(1,51), range(51,101), range(101,200)]):
    futures = []
    async with AioPool(size=20) as pool:
        for tasks in todo:
            for i in tasks:  # too many tasks
                # Returns quickly for all tasks, does not wait for pool space.
                # Workers are not spawned, they wait for pool space in their
                # own background tasks.
                fut = pool.spawn_n(worker(i))
                futures.append(fut)
        # At this point not a single worker should start.

        # Context manager calls `join` at exit, so this will finish when all
        # workers return, crash, or are cancelled.

    assert sum(itertools.chain.from_iterable(todo)) == \
        sum(f.result() for f in futures)


async def spawn_usage(todo=range(1,4)):
    futures = []
    async with AioPool(size=2) as pool:
        for i in todo:  # 1, 2, 3
            # Returns quickly for 1 and 2, then waits for empty space for 3,
            # spawns 3 and returns. Can save some resources I guess.
            fut = await pool.spawn(worker(i))
            futures.append(fut)
        # At this point some of the workers already started.

        # Context manager calls `join` at exit, so this will finish when all
        # workers return, crash, or are cancelled.

    assert sum(todo) == sum(fut.result() for fut in futures)  # all done


async def map_usage(todo=range(100)):
    pool = AioPool(size=10)
    # Waits for and collects results from all spawned workers and
    # returns them in the same order as `todo`. If a worker crashes or
    # is cancelled, the exception object is returned as its result.
    # Basically, it wraps the `spawn_usage` code into one call.
    results = await pool.map(worker, todo)

    # await pool.join()  # not needed here, because no other tasks were spawned

    assert isinstance(results[0], ZeroDivisionError) \
        and sum(results[1:]) == sum(todo)


async def itermap_usage(todo=range(1,11)):
    result = 0
    async with AioPool(size=10) as pool:
        # Combines spawn_n and iterwait, which is a wrapper for asyncio.wait,
        # which yields results of finished workers according to the `timeout` and
        # `yield_when` params passed to asyncio.wait (see its docs for details).
        async for res in pool.itermap(worker, todo, timeout=0.5):
            result += res
        # technically, you can skip join call

    assert result == sum(todo)


async def callbacks_usage():

    async def wrk(n):  # custom dummy worker
        await aio.sleep(1 / n)
        return n

    async def cb(res, err, ctx):  # callback
        if err:  # error handling
            exc, tb = err
            assert tb  # the only purpose of this is logging
            return exc

        pool, n = ctx  # context can be anything you like
        await aio.sleep(1 / (n-1))
        return res + n

    todo = range(5)
    futures = []

    async with AioPool(size=2) as pool:
        for i in todo:
            fut = pool.spawn_n(wrk(i), cb, (pool, i))
            futures.append(fut)

    results = []
    for fut in futures:
        # there are helpers for result extraction. `flat` one will do
        # exactly what's written below
        #   from asyncio_pool import getres
        #   results.append(getres.flat(fut))
        try:
            results.append(fut.result())
        except Exception as e:
            results.append(e)

    # The first error happens for n == 0 in wrk; its exception is passed to the
    # callback, and the callback returns it to us. The second one happens in the
    # callback itself and is passed to us by the pool.
    assert all(isinstance(e, ZeroDivisionError) for e in results[:2])

    # All n's in `todo` are passed through `wrk` and `cb` (cb adds the wrk result
    # and the number passed inside the context), except for n == 0 and n == 1.
    assert sum(results[2:]) == 2 * (sum(todo) - 0 - 1)


async def exec_usage(todo=range(1,11)):
    async with AioPool(size=4) as pool:
        futures = pool.map_n(worker, todo)

        # While other workers are waiting or active, you can "synchronously"
        # execute one task. It does not interrupt others, just waits for pool
        # space, then waits for the task to finish and returns its result.
        important_res = await pool.exec(worker(2))
        assert 2 == important_res

        # You can continue working as usual:
        moar = await pool.spawn(worker(10))

    assert sum(todo) == sum(f.result() for f in futures)


async def cancel_usage():

    async def wrk(*arg, **kw):
        await aio.sleep(0.5)
        return 1

    pool = AioPool(size=2)

    f_quick = pool.spawn_n(aio.sleep(0.1))
    f12 = await pool.spawn(wrk()), pool.spawn_n(wrk())
    f35 = pool.map_n(wrk, range(3))

    # At this point, if you cancel the futures returned by pool methods,
    # you just won't be able to retrieve the spawned task results; the tasks
    # themselves will continue working. Don't do this:
    #   f_quick.cancel()
    # use `pool.cancel` instead:

    # cancel some
    await aio.sleep(0.1)
    cancelled, results = await pool.cancel(f12[0], f35[2])  # running and waiting
    assert 2 == cancelled  # none of them had time to finish
    assert 2 == len(results) and \
        all(isinstance(res, aio.CancelledError) for res in results)

    # cancel all others
    await aio.sleep(0.1)

    # not interrupted and finished successfully
    assert f_quick.done() and f_quick.result() is None

    cancelled, results = await pool.cancel()  # all
    assert 3 == cancelled
    assert len(results) == 3 and \
        all(isinstance(res, aio.CancelledError) for res in results)

    assert await pool.join()  # joins successfully


async def details(todo=range(1,11)):
    pool = AioPool(size=5)

    # This code:
    f1 = []
    for i in todo:
        f1.append(pool.spawn_n(worker(i)))
    # is equivalent to one call of `map_n`:
    f2 = pool.map_n(worker, todo)

    # Afterwards you can await any given future:
    try:
        assert 3 == await f1[2]  # result of spawn_n(worker(3))
    except Exception as e:
        # an exception raised in the worker (or CancelledError) will be re-raised
        pass

    # Or use `asyncio.wait` to handle results in batches (see `iterwait` also):
    important_res = 0
    more_important = [f1[1], f2[1], f2[2]]
    while more_important:
        done, more_important = await aio.wait(more_important, timeout=0.5)
        # handle result, note it will re-raise exceptions
        important_res += sum(f.result() for f in done)

    assert important_res == 2 + 2 + 3

    # But you need to join to allow all spawned workers to finish
    # (of course you can `asyncio.wait` all of the futures if you want to).
    await pool.join()

    assert all(f.done() for f in itertools.chain(f1,f2))  # this is guaranteed
    assert 2 * sum(todo) == sum(f.result() for f in itertools.chain(f1,f2))
