Skip to main content

Some (maybe) useful extensions for asyncio

Project description

asyncio-extensions

PyPI - Version PyPI - Python Version codecov pre-commit.ci status


Installation

pip install asyncio-extensions

Usage

TaskGroup

asyncio-extensions provides a cancellable version of AsyncIO's TaskGroup.

import asyncio

from asyncio_extensions import TaskGroup

queue = asyncio.Queue()
async with TaskGroup() as tg:
    for _ in range(10):
        tg.create_task(consume_from_queue(queue))

    await add_to_queue(queue)
    await queue.join()
    tg.cancel()

LimitedTaskGroup

A version of TaskGroup that limits the number of concurrently running tasks.

import asyncio

from asyncio_extensions import LimitedTaskGroup

queue = asyncio.Queue()
async with LimitedTaskGroup(3) as tg:
    for _ in range(50):
        tg.create_task(some_expensive_operation(queue))

    await add_to_queue(queue)
    await queue.join()
    tg.cancel()

TerminateTaskGroup and force_terminate_task_group

TerminateTaskGroup and force_terminate_task_group implement the terminating a task group pattern from the Python docs. Schedule force_terminate_task_group() as a task to stop the entire group early; catch TerminateTaskGroup with except* to suppress it.

When using asyncio_extensions.TaskGroup, suppression is automatic — no except* block needed.

import asyncio

from asyncio_extensions import TerminateTaskGroup, force_terminate_task_group

async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            task = tg.create_task(do_work())
            tg.create_task(force_terminate_task_group())
    except* TerminateTaskGroup:
        pass

checkpoint

The checkpoint function yields control to the event loop. It is a more elegant approach to do-nothing tasks, giving other tasks a chance to run.

from asyncio_extensions import checkpoint

class DummyChannel:
    async def send_message(self, message):
        await checkpoint()

sleep_forever

The sleep_forever function never returns. It simply keeps yielding control to the event loop.

from asyncio_extensions import sleep_forever

class DummyChannel:
    async def receive_message(self):
        await sleep_forever()

heartbeat

The heartbeat function runs a given callable at a regular interval.

from asyncio_extensions import heartbeat

async def ping():
    pass

async with TaskGroup() as tg:
    tg.create_task(heartbeat(5, ping))

    await some_long_running_process()

identity

The identity function yields control back to the event loop once, and then returns the value that was passed in. It is useful when you already have a cached result and want to create a task that behaves like any other asynchronous operation.

from asyncio_extensions import TaskGroup, identity

async def get_product(product_id: int):
    cached_product = await product_cache.get(product_id)

    async with TaskGroup() as tg:
        if cached_product is not None:
            task = tg.create_task(identity(cached_product))
        else:
            task = tg.create_task(fetch_product_from_api(product_id))

        tg.create_task(update_search_metrics(product_id))

    product = task.result()
    return product

asyncify

The asyncify function ensures a callable can be awaited. If the callable is already a coroutine function, it is returned as-is. Otherwise, it is wrapped so that calls run in a separate thread.

from asyncio_extensions import asyncify

def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await asyncify(blocking_read)("data.txt")

It can also be used as a decorator:

from asyncio_extensions import asyncify

@asyncify
def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await blocking_read("data.txt")

asyncify_iterable

The asyncify_iterable function converts any sync or async iterable into an AsyncIterable. If the input is already async, it is returned unchanged. Otherwise it is wrapped in an async generator that yields each item and calls checkpoint() between items to avoid monopolising the event loop on large inputs.

from asyncio_extensions import asyncify_iterable

async def process(items):
    async for item in asyncify_iterable(items):
        await handle(item)

iterate_queue

The iterate_queue async generator wraps an asyncio.Queue so it can be consumed with a plain async for loop. It calls task_done() automatically after each item and stops when the queue is shut down (Python 3.13+) or when a sentinel value is dequeued.

import asyncio

from asyncio_extensions import iterate_queue

async def process_items(queue: asyncio.Queue[str]) -> None:
    async for item in iterate_queue(queue):
        print(item)

To signal the end of the stream, call queue.shutdown() from the producer (Python 3.13+):

async def producer(queue: asyncio.Queue[str]) -> None:
    for item in ["a", "b", "c"]:
        await queue.put(item)
    queue.shutdown()

On older Python versions, put the STOP sentinel in the queue when done:

from asyncio_extensions import iterate_queue, STOP

async def producer(queue: asyncio.Queue[object]) -> None:
    for item in ["a", "b", "c"]:
        await queue.put(item)
    await queue.put(STOP)

async def consumer(queue: asyncio.Queue[object]) -> None:
    async for item in iterate_queue(queue):
        print(item)

Note: STOP is deprecated on Python 3.13+. Prefer queue.shutdown() instead.

fill_queue

The fill_queue coroutine fills an asyncio.Queue from any sync or async iterable. It accepts both Iterable and AsyncIterable sources and blocks if the queue is full until space becomes available.

import asyncio

from asyncio_extensions import fill_queue

async def main() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue()
    await fill_queue(range(10), queue)

It also works with async iterables:

async def source():
    for item in fetch_from_api():
        yield item

await fill_queue(source(), queue)

merge_iterables

The merge_iterables async context manager merges multiple sync or async iterables into a single interleaved stream, feeding all sources into a shared queue concurrently.

import asyncio

from asyncio_extensions import merge_iterables

async def main() -> None:
    async with merge_iterables([1, 2, 3], [4, 5, 6]) as stream:
        async for item in stream:
            print(item)

It also accepts async iterables, which can produce items in parallel:

async def fetch_page(page: int):
    ...  # yields items from a remote page

async with merge_iterables(fetch_page(1), fetch_page(2)) as stream:
    async for item in stream:
        process(item)

ManagedStream

ManagedStream[T] is the type alias for an async context manager that yields an AsyncIterator[T]. It is the return type of safe_gen and merge_iterables, and the accepted parameter type of flatten_stream. Use it to annotate your own functions that expose a context-managed async stream:

from asyncio_extensions import ManagedStream

def my_stream() -> ManagedStream[int]:
    ...

safe_gen

The safe_gen decorator converts an async generator function into a context manager, enforcing correct handling of early exits. A plain async generator abandoned before exhaustion leaks resources and keeps running indefinitely — the caller has no way to know it needs to call aclose(). By returning a context manager instead, safe_gen makes cleanup syntactically mandatory: callers must use async with, which guarantees aclose() is called on exit regardless of how iteration ends. It also suppresses GeneratorExit raised inside an exception group, so it composes safely with TaskGroup.

from asyncio_extensions import safe_gen

@safe_gen
async def paginate(url: str) -> AsyncGenerator[dict]:
    while url:
        response = await fetch(url)
        for item in response["results"]:
            yield item
        url = response.get("next")

async with paginate("https://api.example.com/items") as stream:
    async for item in stream:
        if should_stop(item):
            break  # generator is closed automatically

flatten_stream

The flatten_stream async generator enters a ManagedStream context manager and yields its items directly, without requiring an explicit async with block. This is particularly useful when a plain async for loop is the only interface available — such as Django's StreamingHttpResponse.

The recommended pattern is to keep the async with block until the last possible moment, and only apply flatten_stream at the interface boundary:

from contextlib import asynccontextmanager
from django.http import StreamingHttpResponse
from asyncio_extensions import flatten_stream, merge_iterables

async def export_view(request):
    async def serialized(stream):
        async for row in stream:
            yield serialize(row)

    @asynccontextmanager
    async def response_body():
        async with merge_iterables(fetch_orders(), fetch_invoices()) as stream:
            yield serialized(stream)

    return StreamingHttpResponse(flatten_stream(response_body()), content_type="text/csv")

For early-exit scenarios, wrap the result in aclosing to ensure proper cleanup:

from contextlib import aclosing

async with aclosing(flatten_stream(merge_iterables(source_a, source_b))) as stream:
    async for item in stream:
        if done(item):
            break

iscoroutinefunction, markcoroutinefunction, and is_awaitable

The iscoroutinefunction helper checks whether a callable is already a coroutine function. It is re-exported from inspect on newer Python versions and from asyncio on older versions, depending on the runtime.

The markcoroutinefunction helper marks a normal sync callable as a coroutine function. On Python 3.12+ this is inspect.markcoroutinefunction, but with the return type annotated so the function can be treated as a coroutine function in type-checked code.

is_awaitable is a typed variant of iscoroutinefunction that returns a TypeIs guard, allowing type checkers to narrow the callable's return type to Awaitable in the True branch.

from asyncio_extensions import iscoroutinefunction, is_awaitable, markcoroutinefunction

async def main():
    def sync_task() -> int:
        return 42

    assert iscoroutinefunction(sync_task) is False

    marked = markcoroutinefunction(sync_task)
    assert iscoroutinefunction(marked) is True

async def call(fn: Callable[[], int] | Callable[[], Awaitable[int]]) -> int:
    if is_awaitable(fn):
        return await fn()  # type checker knows fn() returns Awaitable[int] here
    return fn()

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

asyncio-extensions is distributed under the terms of the MIT license.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncio_extensions-0.2.0.tar.gz (17.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

asyncio_extensions-0.2.0-py3-none-any.whl (12.8 kB view details)

Uploaded Python 3

File details

Details for the file asyncio_extensions-0.2.0.tar.gz.

File metadata

  • Download URL: asyncio_extensions-0.2.0.tar.gz
  • Upload date:
  • Size: 17.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.13

File hashes

Hashes for asyncio_extensions-0.2.0.tar.gz
Algorithm Hash digest
SHA256 9c5741959dd3f08b14fe1681db19c2304fae8e3dce9d9fd914fa58f781e1f7ca
MD5 2091b61b84e401ed61419b86d3f58aee
BLAKE2b-256 69b01f2bd4c5f22e123e856095330f06284da5eca098dd7522581e0d16f5b13a

See more details on using hashes here.

Provenance

The following attestation bundles were made for asyncio_extensions-0.2.0.tar.gz:

Publisher: release.yml on hartungstenio/asyncio-extensions

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file asyncio_extensions-0.2.0-py3-none-any.whl.

File metadata

File hashes

Hashes for asyncio_extensions-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 707d91768498f0ad1b772bbe2e72a70d42657b00e5b6fd536f7d7cda923f617b
MD5 73c6b6ab391c43eb0a1459d50228f0e5
BLAKE2b-256 f29e720db0b4351916c8c379f2e0cf4860afb6603bb721852c4ef42f014a1ca9

See more details on using hashes here.

Provenance

The following attestation bundles were made for asyncio_extensions-0.2.0-py3-none-any.whl:

Publisher: release.yml on hartungstenio/asyncio-extensions

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page