Tools to control async code execution

asynchronously

Asyncio building blocks for service lifecycle, retries, periodic jobs, and structured concurrent awaiting.

Installation

pip install asynchronously

Requires Python 3.11+.

Why asynchronously?

A typical Python service has several long-lived components (a database connection, an HTTP server, Kafka consumers, background workers) that must come up in dependency order and shut down in reverse. The standard-library asyncio does not ship primitives for declaring that ordering.

asynchronously provides those primitives, plus a few other utilities for common async patterns.

Quick Start

A Context dataclass holds all components. Each component is an AsyncInitable, an AsyncDeinitable, or both, and each dependency between components is declared with a single call.

import asyncio
from dataclasses import dataclass

from asynchronously import AsyncDeinitable, AsyncInitable


class Database(AsyncInitable, AsyncDeinitable):
    def __init__(self, url: str) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)
        self.url = url

    async def _async_init(self) -> None:
        ...  # open connection pool, run migrations

    async def _async_deinit(self) -> None:
        ...  # close pool


class HttpServer(AsyncInitable, AsyncDeinitable):
    def __init__(self, port: int) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)
        self.port = port

    async def _async_init(self) -> None:
        ...

    async def _async_deinit(self) -> None:
        ...


@dataclass
class Context(AsyncInitable, AsyncDeinitable):
    database: Database = None
    http_server: HttpServer = None

    def __post_init__(self) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)


async def amain() -> None:
    context = Context(
        database=Database("postgres://localhost/app"),
        http_server=HttpServer(port=8000),
    )

    # HTTP server starts only after DB init is done
    context.http_server.wait_before_async_init_for(
        asyncio.create_task(context.database.async_init_finished__event.wait())
    )

    await context.async_init()
    try:
        await asyncio.Event().wait()
    finally:
        await context.async_deinit()


asyncio.run(amain())

Overview

Lifecycle: AsyncInitable | AsyncDeinitable

Background tasks: Periodic

Retries: retry

Helpers: acall | DummyAsyncContextManager

Concurrent awaiting: strict_gather | await_dict_values | wait_for_all_other_tasks


Lifecycle

AsyncInitable

Mixin that adds a structured asynchronous initialization phase. Override _async_init to do your component's own work. By default the base implementation walks vars(self) and concurrently initializes any nested AsyncInitable attributes (also those inside list, set, or dict containers), so a single await context.async_init() on the top-level container initializes the entire tree.

class FileManager(AsyncInitable):
    def __init__(self, context: Context) -> None:
        AsyncInitable.__init__(self)
        self.context = context

    async def _async_init(self) -> None:
        await self._reset_stuck_uploads()
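The default tree walk described above can be pictured with a small stdlib-only sketch. It is not the library's implementation: the `Component`, `Leaf`, and `Root` classes and the `_children` helper are hypothetical names used only to illustrate how walking `vars(self)` might find nested components, including those inside list, set, or dict containers.

```python
import asyncio


class Component:
    """Hypothetical stand-in for an AsyncInitable-style mixin (not the library's code)."""

    async def async_init(self) -> None:
        await self._async_init()

    async def _async_init(self) -> None:
        # Default behaviour in this sketch: initialize nested components concurrently.
        await asyncio.gather(*(child.async_init() for child in self._children()))

    def _children(self) -> list["Component"]:
        # Look at direct attributes and one level into list/set/dict containers.
        found: list[Component] = []
        for value in vars(self).values():
            if isinstance(value, Component):
                found.append(value)
            elif isinstance(value, (list, set)):
                found.extend(v for v in value if isinstance(v, Component))
            elif isinstance(value, dict):
                found.extend(v for v in value.values() if isinstance(v, Component))
        return found


class Leaf(Component):
    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def _async_init(self) -> None:
        # A leaf overrides the default and does its own work.
        self.log.append(self.name)


class Root(Component):
    def __init__(self, log: list[str]) -> None:
        self.database = Leaf("database", log)
        self.workers = [Leaf("worker-1", log), Leaf("worker-2", log)]
        self.caches = {"users": Leaf("cache", log)}


async def demo() -> list[str]:
    log: list[str] = []
    await Root(log).async_init()  # one call initializes the whole tree
    return sorted(log)
```

In this sketch a single `await Root(log).async_init()` reaches every leaf, which mirrors the "one call on the top-level container" behaviour the paragraph describes.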

Each instance exposes three asyncio.Event attributes that mark phases of its lifecycle and that other components can await:

Attribute | Set when
async_init_scheduled__event | async_init() was called (does not mean work started)
async_init_started__event | All wait_before_async_init_for waits have completed
async_init_finished__event | _async_init returned

Declaring init-time dependencies

wait_before_async_init_for(awaitable) registers an awaitable that must complete before this component's _async_init runs. Typically you await another component's async_init_finished__event:

http_server.wait_before_async_init_for(
    asyncio.create_task(database.async_init_finished__event.wait())
)

Raises RuntimeError if called after async_init() was already scheduled.
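To make the ordering guarantee concrete, here is a minimal stdlib-only sketch of event-gated startup. It is an illustration under simplifying assumptions, not the library's code: the `Component` class and its `wait_before_init_for` method are hypothetical, and the method accepts an `asyncio.Event` directly rather than an arbitrary awaitable.

```python
import asyncio


class Component:
    """Hypothetical illustration of event-gated startup (not the library's class)."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log
        self.scheduled = False
        self.finished = asyncio.Event()
        self._wait_for: list[asyncio.Event] = []

    def wait_before_init_for(self, event: asyncio.Event) -> None:
        # Dependencies can only be declared before init is scheduled.
        if self.scheduled:
            raise RuntimeError("init already scheduled")
        self._wait_for.append(event)

    async def init(self) -> None:
        self.scheduled = True
        for event in self._wait_for:
            await event.wait()  # block until every dependency is ready
        await asyncio.sleep(0)  # stands in for real initialization work
        self.log.append(self.name)
        self.finished.set()


async def demo() -> list[str]:
    log: list[str] = []
    database = Component("database", log)
    http_server = Component("http_server", log)
    http_server.wait_before_init_for(database.finished)
    # The dependent component is started first on purpose; it still finishes second.
    await asyncio.gather(http_server.init(), database.init())
    return log
```

Both components are started concurrently, yet the recorded order is always database first, because the HTTP server's work is gated on the database's finished event.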

Inheriting alongside other base classes

When mixing with another base (e.g. FastAPI), call AsyncInitable.__init__(self) explicitly after the other parent's __init__. With @dataclass, do the same from __post_init__:

class Application(FastAPI, AsyncInitable, AsyncDeinitable):
    def __init__(self, *args, **kwargs) -> None:
        # Initialize the other parent first, then each mixin explicitly.
        super().__init__(*args, **kwargs)
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)

AsyncDeinitable

The teardown counterpart of AsyncInitable. Mirrors the same API with deinit in place of init: override _async_deinit, observe async_deinit_{scheduled,started,finished}__event, declare dependencies with wait_before_async_deinit_for.

The two mixins are independent — a class can be only AsyncInitable, only AsyncDeinitable, or both:

class Database(AsyncInitable, AsyncDeinitable):
    def __init__(self, url: str) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)
        self.url = url

    async def _async_init(self) -> None:
        self.pool = await create_pool(self.url)

    async def _async_deinit(self) -> None:
        await self.pool.close()

Deinit dependencies typically run in the opposite direction to init dependencies — e.g. the HTTP server stops accepting traffic before the DB pool closes:

database.wait_before_async_deinit_for(
    asyncio.create_task(http_server.async_deinit_finished__event.wait())
)

Background tasks

Periodic

Runs an async or sync callable on a fixed period. Construction with is_active=True (the default) schedules the loop immediately, so a Periodic must be created from within a running event loop.

import asyncio
from datetime import timedelta

from asynchronously import Periodic


async def amain():
    periodic_print = Periodic(lambda: print("i'm ok"), timedelta(minutes=1))
    # do other stuff
    await asyncio.Event().wait()


asyncio.run(amain())

Full signature:

class Periodic:
    def __init__(
        self,
        job: Callable,
        period: datetime.timedelta,
        is_active: bool = True,
        first_at: datetime.datetime | None = None,
        decrease_sleep_time_by_evaluation_time: bool = False,
    ) -> None

Scheduling rules:

  • first_at=None (default): the first invocation happens one period after construction.
  • first_at set: the first invocation happens at that moment. first_at may be naive or timezone-aware; the current time is read with the same tzinfo.
  • first_at in the past: a warning is logged and the job runs immediately.
  • By default, the next invocation is scheduled one period after the previous job finishes. With decrease_sleep_time_by_evaluation_time=True, the next sleep is shortened by the job's wall-clock duration, so successive job starts stay one period apart.

Exceptions raised by the job are caught and logged — the loop keeps running.

Public API:

  • start(): called automatically when is_active=True; a no-op on the second call.
  • async stop(): sets is_active=False and awaits the loop.
  • future: the underlying asyncio.Future.

Retries

retry

Decorator that retries an async function on exceptions. attempts_limit is the maximum number of retries after the first failure, so attempts_limit=N means up to N + 1 total calls. With attempts_limit=None (the default) the function is retried indefinitely.

import logging

from asynchronously import retry

logger = logging.getLogger(__name__)


class VimeoConnector:
    @retry(attempts_limit=3, sleep_time_sec=2, retrying_exceptions=ConnectionError)
    async def get_video_duration(self, vimeo_id: str) -> int:
        return await self._http.post("/video/duration/get", payload={"vimeo_id": vimeo_id})


async def amain() -> None:
    connector = VimeoConnector()
    try:
        duration = await connector.get_video_duration("12345")
    except ConnectionError:
        # all 4 attempts exhausted — the original exception is re-raised
        logger.error("vimeo unavailable, falling back to default duration")
        duration = 0

Full signature:

def retry(
    func: Callable[..., Any] | None = None,
    attempts_limit: int | None = None,
    sleep_time_sec: float = 5.0,
    retrying_exceptions: type[Exception] | tuple[type[Exception], ...] = Exception,
) -> Callable[..., Any]

Usable both with and without parentheses (@retry retries forever, @retry(attempts_limit=3) retries up to 3 times).

asyncio.CancelledError and other BaseException subclasses are not caught — task cancellation works as expected. Each retry is logged at ERROR level as Retry: K/N, with K starting from 1. Decorating a sync function raises TypeError.

Helpers

acall

Call target if it's callable, then await it if the result is awaitable. Lets a caller accept "anything that produces a value" — sync function, async function, coroutine, or a plain value — without branching on its kind:

from asynchronously import acall

await acall(async_function)      # calls, awaits, returns result
await acall(sync_function)       # calls, returns result
await acall(coroutine_object)    # awaits, returns result
await acall(42)                  # returns 42

Used internally by Periodic so jobs can be sync or async.

Any callable is invoked — including classes and instances with __call__. To pass such a value through without invoking it, wrap it: acall(lambda: my_value).
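The behaviour described in this section can be approximated in a few lines of standard-library code. The sketch below is an assumption about how such a helper might work, not the library's source; `call_or_await`, `async_answer`, and `sync_answer` are hypothetical names.

```python
import asyncio
import inspect
from typing import Any


async def call_or_await(target: Any) -> Any:
    """Hypothetical equivalent: call if callable, then await if awaitable."""
    result = target() if callable(target) else target
    if inspect.isawaitable(result):
        result = await result
    return result


async def async_answer() -> int:
    return 1


def sync_answer() -> int:
    return 2


async def demo() -> list[int]:
    return [
        await call_or_await(async_answer),    # async function: called, then awaited
        await call_or_await(sync_answer),     # sync function: called
        await call_or_await(async_answer()),  # coroutine object: awaited
        await call_or_await(42),              # plain value: returned unchanged
    ]
```

The sketch also shows why classes need wrapping: passing `int` would call it and return `0`, whereas `lambda: int` passes the class through untouched.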

DummyAsyncContextManager

A no-op async context manager. Useful where the code structure requires async with but there is nothing to do, for example to accept an optional lock without branching at the call site:

import asyncio

from asynchronously import DummyAsyncContextManager


async def update(lock: asyncio.Lock | None = None) -> None:
    async with lock or DummyAsyncContextManager():
        ...

Exceptions inside the block propagate normally — __aexit__ does not suppress them.

Concurrent awaiting

strict_gather

Like asyncio.gather, but on the first error cancels and awaits the remaining siblings before re-raising. Bare asyncio.gather leaves the other children running, which can later surface as "Task was destroyed but it is pending!" warnings. The original exception (and its traceback) is preserved — unlike TaskGroup's ExceptionGroup wrapping.

from asynchronously import strict_gather

results = await strict_gather(
    fetch_user(user_id),
    fetch_profile(user_id),
    fetch_settings(user_id),
)
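The cancel-then-re-raise behaviour can be sketched on top of plain `asyncio.gather`. This is an illustration of the technique under stated assumptions, not the library's implementation; `gather_or_cancel`, `fails`, and `slow` are hypothetical names.

```python
import asyncio
from collections.abc import Coroutine
from typing import Any


async def gather_or_cancel(*coros: Coroutine[Any, Any, Any]) -> list[Any]:
    """Hypothetical gather that cancels and awaits siblings on the first error."""
    tasks = [asyncio.ensure_future(coro) for coro in coros]
    try:
        return await asyncio.gather(*tasks)
    except BaseException:
        for task in tasks:
            task.cancel()  # a no-op for tasks that are already done
        # Wait until every sibling has actually finished unwinding.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise  # re-raise the original exception unchanged


async def fails() -> None:
    await asyncio.sleep(0)
    raise ValueError("boom")


async def slow(state: dict[str, bool]) -> None:
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        state["cancelled"] = True  # record that cancellation reached this task
        raise


async def demo() -> tuple[str, bool]:
    state = {"cancelled": False}
    try:
        await gather_or_cancel(fails(), slow(state))
    except ValueError as error:
        return str(error), state["cancelled"]
    return "no error", state["cancelled"]
```

The caller sees the original `ValueError`, and by the time it propagates the slow sibling has already been cancelled and awaited, so no pending task is left behind.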

await_dict_values

Awaits every value in a mapping concurrently and returns a plain dict of results, keeping the original keys. On the first failure, siblings are cancelled (via strict_gather) and the exception is re-raised.

from asynchronously import await_dict_values

balances = await await_dict_values({
    "stripe": stripe_client.get_balance(),
    "paypal": paypal_client.get_balance(),
    "internal": internal_ledger.get_balance(),
})
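The key-preserving part of this helper is a small zip over gathered results. The sketch below shows only that part, using plain `asyncio.gather`, so unlike the library function it does not cancel siblings on failure; `gather_mapping` and `balance` are hypothetical names.

```python
import asyncio
from collections.abc import Awaitable, Mapping
from typing import Any


async def gather_mapping(awaitables: Mapping[str, Awaitable[Any]]) -> dict[str, Any]:
    """Hypothetical helper: await all values concurrently, keep the keys."""
    keys = list(awaitables)
    # gather returns results in the order the awaitables were passed in.
    results = await asyncio.gather(*(awaitables[key] for key in keys))
    return dict(zip(keys, results))


async def balance(amount: int) -> int:
    await asyncio.sleep(0)  # stands in for a network call
    return amount


async def demo() -> dict[str, int]:
    return await gather_mapping({"stripe": balance(10), "paypal": balance(20)})
```

Because `asyncio.gather` preserves argument order, zipping the key list with the results is enough to rebuild the mapping.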

wait_for_all_other_tasks

Awaits every task in the running event loop except the caller's own. Mainly for test teardown and end-of-process drains:

import pytest
from asynchronously import wait_for_all_other_tasks


@pytest.fixture(autouse=True)
async def drain():
    yield
    await wait_for_all_other_tasks()

Because it waits for every task — including ones the caller did not start — it can easily produce surprising deadlocks if used inside a long-running service. For ordinary coordination, prefer awaiting specific tasks.
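A rough stdlib-only picture of what "every task except the caller's own" means is given below. It is a sketch rather than the library's code: `wait_for_others` and `background` are hypothetical names, and this version takes a single snapshot of the task set, so tasks spawned later are not awaited.

```python
import asyncio


async def wait_for_others() -> None:
    """Hypothetical helper: await all tasks in the loop except the current one."""
    current = asyncio.current_task()
    others = [task for task in asyncio.all_tasks() if task is not current]
    if others:
        await asyncio.gather(*others)


async def background(log: list[str]) -> None:
    await asyncio.sleep(0.01)
    log.append("background done")


async def demo() -> list[str]:
    log: list[str] = []
    # Keep a reference so the task cannot be garbage-collected mid-flight.
    task = asyncio.create_task(background(log))
    await wait_for_others()
    assert task.done()
    return log
```

The sketch also makes the deadlock risk visible: if any other task in the loop never finishes, `wait_for_others()` never returns.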

The old name complete_all_tasks is kept as an alias for backward compatibility.

License

MIT
