A collection of task utils for asyncio

Utils for managing concurrent asyncio tasks.

You can use the concurrent async generator to run asyncio tasks concurrently.

It works much like asyncio.as_completed, but with a couple of differences.

  • coros can be any iterable, including sync/async generators

  • limit can be supplied to specify the maximum number of concurrent tasks

Setting limit to -1 will make all tasks run concurrently.

The default limit is the number of cores + 4, to a maximum of 32. This (somewhat arbitrarily) mirrors the default worker count for concurrent.futures.ThreadPoolExecutor, which asyncio uses as its default executor.

For network tasks it may make sense to set the concurrency limit lower than the default if, for example, opening many concurrent connections would trigger rate-limiting or soak up available bandwidth.
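For reference, the default limit described above is easy to compute; this is a sketch of the formula (the same one concurrent.futures.ThreadPoolExecutor uses for its default max_workers), not aio.tasks' internal code:

```python
import os

# Default concurrency limit: CPU count + 4, capped at 32 -- mirroring
# concurrent.futures.ThreadPoolExecutor's default worker count.
default_limit = min(32, (os.cpu_count() or 1) + 4)
print(default_limit)
```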

If an error is raised while trying to iterate the provided coroutines, it is wrapped in a ConcurrentIteratorError and raised immediately.

In this case, no further handling occurs, and yield_exceptions has no effect.

Any errors raised while trying to create or run tasks are wrapped in ConcurrentError.

Any errors raised during task execution are wrapped in ConcurrentExecutionError.

If you specify yield_exceptions as True then the wrapped errors will be yielded in the results.

If yield_exceptions is False (the default), then the wrapped error will be raised immediately.

If you use any kind of Generator or AsyncGenerator to produce the awaitables, and yield_exceptions is False, then in the event of an error it is your responsibility to close any remaining awaitables that you have created but that have not yet been fired.
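One way to honour that responsibility is to create the coroutines eagerly and close any leftovers in the generator's finally block. This is a standard-library-only sketch (the names work, provider, and main are illustrative, not part of the aio.tasks API):

```python
import asyncio

async def work(n):
    await asyncio.sleep(0)
    return n

def provider(count):
    # Create the coroutines up front so we can clean up the leftovers.
    coros = [work(n) for n in range(count)]
    try:
        yield from coros
    finally:
        for coro in coros:
            # close() is a no-op on coroutines that already ran to
            # completion, and prevents "never awaited" warnings on
            # the ones that were never fired.
            coro.close()

async def main():
    results = []
    gen = provider(5)
    for coro in gen:
        results.append(await coro)
        if len(results) == 2:
            break  # abandon the rest; the generator's finally cleans up
    gen.close()
    return results

print(asyncio.run(main()))  # [0, 1]
```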

This utility is useful for running io-bound (as opposed to cpu-bound) tasks concurrently.
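For comparison, the core idea — bounding how many coroutines are in flight at once — can be sketched with just the standard library. This is an illustration, not aio.tasks' implementation (the names bounded and job are hypothetical), and it differs from concurrent in that it materialises every task up front and returns results in input order rather than as they complete:

```python
import asyncio

async def bounded(coros, limit):
    # Run awaitables with at most `limit` executing concurrently.
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))

async def main():
    async def job(n):
        await asyncio.sleep(0)
        return n * 2

    return await bounded((job(n) for n in range(5)), limit=2)

print(asyncio.run(main()))  # [0, 2, 4, 6, 8]
```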

Usage

Let's first create a coroutine that waits for a random amount of time and then returns its id and how long it waited.

>>> import random

>>> async def task_to_run(task_id):
...     print(f"{task_id} starting")
...     wait = random.random() * 5
...     await asyncio.sleep(wait)
...     return task_id, wait

Next, let's create a generator that yields 10 of the coroutines.

Note that the coroutines are not awaited; they will be created as tasks.

>>> def provider():
...     for task_id in range(0, 10):
...         yield task_to_run(task_id)
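Since concurrent also accepts async iterables, the provider could equally be an async generator — for example, one whose task ids arrive from an asynchronous source. A self-contained sketch (async_provider and demo are illustrative names; demo awaits each coroutine sequentially as a stand-in for passing the generator to concurrent):

```python
import asyncio
import random

async def task_to_run(task_id):
    wait = random.random() / 100
    await asyncio.sleep(wait)
    return task_id, wait

async def async_provider():
    # An async generator works just as well as a sync one:
    # concurrent accepts sync and async iterables alike.
    for task_id in range(10):
        await asyncio.sleep(0)  # e.g. ids arriving from an async source
        yield task_to_run(task_id)

async def demo():
    # Stand-in consumer: await each coroutine in order.
    ids = []
    async for coro in async_provider():
        task_id, _ = await coro
        ids.append(task_id)
    return ids

print(asyncio.run(demo()))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```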

Finally, let's create a function to asynchronously iterate the results, and fire it with the generator.

As we limit the concurrency to 3, the first 3 jobs start; as each one returns, the next fires.

This continues until all have completed.

>>> import asyncio
>>> from aio.tasks import concurrent

>>> async def run(coros):
...     async for (task_id, wait) in concurrent(coros, limit=3):
...         print(f"{task_id} waited {wait}")

>>> asyncio.run(run(provider()))
0 starting
1 starting
2 starting
... waited ...
3 starting
... waited ...
...
... waited ...

