aiotools

Idiomatic asyncio utilities

Modules

Full API documentation: https://aiotools.readthedocs.io

See also

  • anyio: High-level asynchronous concurrency and networking framework that works on top of either Trio or asyncio
  • trio: An alternative implementation of asyncio focusing on structured concurrency
  • aiometer: A Python concurrency scheduling library compatible with asyncio and trio
  • aiojobs: A concurrency-limiting, task-shielding scheduler for asyncio tasks for graceful shutdown

Currently aiotools targets only the vanilla asyncio ecosystem (with some tight coupling to asyncio internals), but you may find similar problem definitions and alternative solutions in the above libraries.

Examples

Below are some highlighted use cases of aiotools. Please refer to the documentation for more detailed logic and background.

Safe Cancellation

Consider the following commonly used pattern:

task = asyncio.create_task(...)
task.cancel()
await task  # PROBLEM: would it raise CancelledError or not? should we propagate it or not?

It has been the responsibility of both the task author and the caller to coordinate whether to re-raise the injected cancellation error.

Now we can use the structured cancellation introduced in Python 3.11:

task = asyncio.create_task(...)
await aiotools.cancel_and_wait(task)

which will re-raise the cancellation when there is an external cancellation request and absorb it otherwise.

Relying on this API whenever you need to cancel asyncio tasks makes your codebase more consistent, because you no longer need to decide whether to (re-)raise or suppress CancelledError in your task code.

You may also combine cancel_and_wait() with ShieldScope to guard a block of code from cancellation in the middle, deferring the cancellation to the end of the block.

async def work():
    try:
        ...
    except asyncio.CancelledError:
        with aiotools.ShieldScope():
            await cleanup()  # any async code here is not affected by multiple cancellation
            raise

async def parent():
    work_task = asyncio.create_task(work())
    ...
    await aiotools.cancel_and_wait(work_task)

parent_task = asyncio.create_task(parent())
...
await aiotools.cancel_and_wait(parent_task)  # may trigger double cancellation, but returns only after the shielded block completes

Async Context Manager

This is an asynchronous version of contextlib.contextmanager to make it easier to write asynchronous context managers without creating boilerplate classes.

import asyncio
import aiotools

@aiotools.actxmgr
async def mygen(a):
    await asyncio.sleep(1)
    yield a + 1
    await asyncio.sleep(1)

async def somewhere():
    async with mygen(1) as b:
        assert b == 2

Note that you need to wrap the yield with a try-finally block to ensure resources are released (e.g., locks), even when an exception occurs inside the async-with block.

import asyncio
import aiotools

lock = asyncio.Lock()

@aiotools.actxmgr
async def mygen(a):
    await lock.acquire()
    try:
        yield a + 1
    finally:
        lock.release()

async def somewhere():
    try:
        async with mygen(1) as b:
            raise RuntimeError('oops')
    except RuntimeError:
        print('caught!')  # you can catch exceptions here.

You can also create a group of async context managers, which are entered/exited all at once using asyncio.gather().

import asyncio
import aiotools

@aiotools.actxmgr
async def mygen(a):
    yield a + 10

async def somewhere():
    ctxgrp = aiotools.actxgroup(mygen(i) for i in range(10))
    async with ctxgrp as values:
        assert len(values) == 10
        for i in range(10):
            assert values[i] == i + 10

Async Server

This implements a common pattern to launch asyncio-based server daemons.

import asyncio
import aiotools

async def echo(reader, writer):
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()

@aiotools.server
async def myworker(loop, pidx, args):
    server = await asyncio.start_server(echo, '0.0.0.0', 8888, reuse_port=True)
    print(f'[{pidx}] started')
    yield  # wait until terminated
    server.close()
    await server.wait_closed()
    print(f'[{pidx}] terminated')

if __name__ == '__main__':
    # Run the above server using 4 worker processes.
    aiotools.start_server(myworker, num_workers=4)

It handles SIGINT/SIGTERM signals automatically to stop the server, and manages the lifecycle of event loops running on multiple processes. Internally it uses the aiotools.fork module, which leverages PID file descriptors on supported platforms (Python 3.9+ and Linux kernel 5.4+) to resolve potential signal/PID races with kernel support.

Async TaskScope

TaskScope is a variant of TaskGroup which ensures that all child tasks run to completion (ending with either results or exceptions) unless the scope itself is cancelled.

It can be considered a safer version (in terms of lifecycle tracking) of asyncio.gather(..., return_exceptions=True), and a more convenient one, because you can decouple the timing of task creation from their scheduling within the TaskScope context.

import aiotools

async def do():
    async with aiotools.TaskScope() as ts:
        ts.create_task(...)
        ts.create_task(...)
        # each task will run to completion regardless of sibling failures.
        ...
    # at this point, all subtasks are either cancelled or done.

You may customize the exception handler for each scope to receive and process unhandled exceptions from child tasks. To suit long-running server contexts, TaskScope does not store any exceptions or results by itself.

See also high-level coroutine utilities such as as_completed_safe(), gather_safe(), and race() functions in the utils module.
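The idea behind race() can be illustrated with a stdlib-only sketch: return the first result and cancel the rest. The helper name race_sketch is ours, and this is a rough approximation of the concept rather than the aiotools API:

```python
import asyncio

async def race_sketch(coros):
    # Hypothetical sketch of a race(): wait for the first task to finish,
    # then cancel and reap the remaining ones before returning.
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for t in pending:
        t.cancel()
    # Reap cancelled siblings so no task outlives this call.
    await asyncio.gather(*pending, return_exceptions=True)
    return next(iter(done)).result()

async def main():
    async def after(delay, value):
        await asyncio.sleep(delay)
        return value

    return await race_sketch([after(0.05, "fast"), after(1.0, "slow")])

print(asyncio.run(main()))  # prints fast
```

The aiotools versions add the lifecycle guarantees described above (e.g., error propagation rules and integration with TaskScope), so prefer them over hand-rolled variants like this.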

TaskScope itself and these utilities integrate with the call-graph inspection introduced in Python 3.14.

Async Timer

import aiotools

i = 0

async def mytick(interval):
    global i
    print(i)
    i += 1

async def somewhere():
    task = aiotools.create_timer(mytick, 1.0)
    ...
    await aiotools.cancel_and_wait(task)

The returned task is an asyncio.Task object. To stop the timer, it should be cancelled explicitly. Use cancel_and_wait() to ensure complete shutdown of any ongoing tick tasks. To make your timer function cancellable, add a try-except clause catching asyncio.CancelledError, since it is used as the termination signal.
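The recommended shape of a cancellable tick function can be shown without aiotools, using a plain task in place of create_timer (the names tick and cleaned_up are illustrative):

```python
import asyncio

cleaned_up = False

async def tick(interval):
    # CancelledError is the termination signal: release resources in the
    # handler and re-raise so the cancellation completes normally.
    global cleaned_up
    try:
        await asyncio.sleep(100)  # simulate long-running tick work
    except asyncio.CancelledError:
        cleaned_up = True  # release locks/files here in real code
        raise

async def main() -> bool:
    task = asyncio.create_task(tick(1.0))
    await asyncio.sleep(0)  # let the tick start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return cleaned_up

print(asyncio.run(main()))  # prints True
```

Re-raising at the end of the handler matters: swallowing the CancelledError would make the task appear to finish normally, defeating the timer's shutdown logic.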

You may pass a TimerDelayPolicy argument to control the behavior when a timer-fired task takes longer than the timer interval. DEFAULT accumulates such tasks and cancels all remaining ones at once when the timer is cancelled. CANCEL cancels any still-pending tasks from previous firings on every interval.

import asyncio
import aiotools

async def mytick(interval):
    await asyncio.sleep(100)  # cancelled on every next interval.

async def somewhere():
    t = aiotools.create_timer(mytick, 1.0, aiotools.TimerDelayPolicy.CANCEL)
    ...
    await aiotools.cancel_and_wait(t)

Virtual Clock

VirtualClock provides a virtual clock that instantly advances the event loop time upon any combination of asyncio.sleep() calls in multiple coroutine tasks, by temporarily patching the event loop selector.

This is also used in our timer test suite.

import asyncio
import aiotools
import pytest

@pytest.mark.asyncio
async def test_sleeps():
    loop = aiotools.compat.get_running_loop()
    vclock = aiotools.VirtualClock()
    with vclock.patch_loop():
        print(loop.time())  # -> prints 0
        await asyncio.sleep(3600)
        print(loop.time())  # -> prints 3600
