Skip to main content

Some (maybe) useful extensions for asyncio

Project description

asyncio-extensions

PyPI - Version PyPI - Python Version codecov pre-commit.ci status


Installation

pip install asyncio-extensions

Usage

TaskGroup

asyncio-extensions provides a cancellable version of AsyncIO's TaskGroup.

import asyncio

from asyncio_extensions import TaskGroup

queue = asyncio.Queue()
async with TaskGroup() as tg:
    for _ in range(10):
        tg.create_task(consume_from_queue(queue))

    await add_to_queue(queue)
    await queue.join()
    tg.cancel()

LimitedTaskGroup

A version of TaskGroup that limits the number of concurrently running tasks.

import asyncio

from asyncio_extensions import LimitedTaskGroup

queue = asyncio.Queue()
async with LimitedTaskGroup(3) as tg:
    for _ in range(50):
        tg.create_task(some_expensive_operation(queue))

    await add_to_queue(queue)
    await queue.join()
    tg.cancel()

checkpoint

The checkpoint function yields control to the event loop. It is a more elegant approach to do-nothing tasks, giving other tasks a chance to run.

from asyncio_extensions import checkpoint

class DummyChannel:
    async def send_message(self, message):
        await checkpoint()

sleep_forever

The sleep_forever function never returns. It simply keeps yielding control to the event loop.

from asyncio_extensions import sleep_forever

class DummyChannel:
    async def receive_message(self):
        await sleep_forever()

heartbeat

The heartbeat function runs a given callable at a regular interval.

from asyncio_extensions import heartbeat

async def ping():
    pass

async with TaskGroup() as tg:
    tg.create_task(heartbeat(5, ping))

    await some_long_running_process()

identity

The identity function yields control back to the event loop once, and then returns the value that was passed in. It is useful when you already have a cached result and want to create a task that behaves like any other asynchronous operation.

from asyncio_extensions import TaskGroup, identity

async def get_product(product_id: int):
    cached_product = await product_cache.get(product_id)

    async with TaskGroup() as tg:
        if cached_product is not None:
            task = tg.create_task(identity(cached_product))
        else:
            task = tg.create_task(fetch_product_from_api(product_id))

        tg.create_task(update_search_metrics(product_id))

    product = task.result()
    return product

asyncify

The asyncify function ensures a callable can be awaited. If the callable is already a coroutine function, it is returned as-is. Otherwise, it is wrapped so that calls run in a separate thread.

from asyncio_extensions import asyncify

def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await asyncify(blocking_read)("data.txt")

It can also be used as a decorator:

from asyncio_extensions import asyncify

@asyncify
def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await blocking_read("data.txt")

iterate_queue

The iterate_queue async generator wraps an asyncio.Queue so it can be consumed with a plain async for loop. It calls task_done() automatically after each item and stops when the queue is shut down (Python 3.13+) or when a sentinel value is dequeued.

import asyncio

from asyncio_extensions import iterate_queue

async def process_items(queue: asyncio.Queue[str]) -> None:
    async for item in iterate_queue(queue):
        print(item)

To signal the end of the stream, call queue.shutdown() from the producer (Python 3.13+):

async def producer(queue: asyncio.Queue[str]) -> None:
    for item in ["a", "b", "c"]:
        await queue.put(item)
    queue.shutdown()

On older Python versions, put the STOP sentinel in the queue when done:

from asyncio_extensions import iterate_queue, STOP

async def producer(queue: asyncio.Queue[object]) -> None:
    for item in ["a", "b", "c"]:
        await queue.put(item)
    await queue.put(STOP)

async def consumer(queue: asyncio.Queue[object]) -> None:
    async for item in iterate_queue(queue):
        print(item)

Note: STOP is deprecated on Python 3.13+. Prefer queue.shutdown() instead.

fill_queue

The fill_queue coroutine fills an asyncio.Queue from any sync or async iterable. It accepts both Iterable and AsyncIterable sources and blocks if the queue is full until space becomes available.

import asyncio

from asyncio_extensions import fill_queue

async def main() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue()
    await fill_queue(range(10), queue)

It also works with async iterables:

async def source():
    for item in fetch_from_api():
        yield item

await fill_queue(source(), queue)

merge_iterables

The merge_iterables async context manager merges multiple sync or async iterables into a single interleaved stream, feeding all sources into a shared queue concurrently.

import asyncio

from asyncio_extensions import merge_iterables

async def main() -> None:
    async with merge_iterables([1, 2, 3], [4, 5, 6]) as stream:
        async for item in stream:
            print(item)

It also accepts async iterables, which can produce items in parallel:

async def fetch_page(page: int):
    ...  # yields items from a remote page

async with merge_iterables(fetch_page(1), fetch_page(2)) as stream:
    async for item in stream:
        process(item)

iscoroutinefunction and markcoroutinefunction

The iscoroutinefunction helper checks whether a callable is already a coroutine function. It is re-exported from inspect on newer Python versions and from asyncio on older versions, depending on the runtime.

The markcoroutinefunction helper marks a normal sync callable as a coroutine function. On Python 3.12+ this is inspect.markcoroutinefunction, but with the return type annotated so the function can be treated as a coroutine function in type-checked code.

from asyncio_extensions import iscoroutinefunction, markcoroutinefunction

async def main():
    def sync_task() -> int:
        return 42

    assert iscoroutinefunction(sync_task) is False

    marked = markcoroutinefunction(sync_task)
    assert iscoroutinefunction(marked) is True

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

asyncio-extensions is distributed under the terms of the MIT license.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncio_extensions-0.0.5.tar.gz (12.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

asyncio_extensions-0.0.5-py3-none-any.whl (9.8 kB view details)

Uploaded Python 3

File details

Details for the file asyncio_extensions-0.0.5.tar.gz.

File metadata

  • Download URL: asyncio_extensions-0.0.5.tar.gz
  • Upload date:
  • Size: 12.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.13

File hashes

Hashes for asyncio_extensions-0.0.5.tar.gz
Algorithm Hash digest
SHA256 33238277e3466c2e6f2caf5b9e1718cc3500db1b7524cf5c6eadb8bc9221af8b
MD5 ce79af07f91ece7beb800aae387a4d07
BLAKE2b-256 05de305c099ec2ed551505c631630fae6bc3863c0e1e4eeb42de3e3067d050e3

See more details on using hashes here.

Provenance

The following attestation bundles were made for asyncio_extensions-0.0.5.tar.gz:

Publisher: release.yml on hartungstenio/asyncio-extensions

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file asyncio_extensions-0.0.5-py3-none-any.whl.

File metadata

File hashes

Hashes for asyncio_extensions-0.0.5-py3-none-any.whl
Algorithm Hash digest
SHA256 387a0f4ac8cca986f0c9ba4479917d8d97a0ada5f89c094d5913d3d52cf267d3
MD5 c89303894c505770aed0e1ba98302e36
BLAKE2b-256 6d4d7bc3fdd30ac28c0361c884c239131b4638db327cbd0d2350877e3fb80a27

See more details on using hashes here.

Provenance

The following attestation bundles were made for asyncio_extensions-0.0.5-py3-none-any.whl:

Publisher: release.yml on hartungstenio/asyncio-extensions

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page