Tools to control async code execution
asynchronously
Asyncio building blocks for service lifecycle, retries, periodic jobs, and structured concurrent awaiting.
Installation
pip install asynchronously
Requires Python 3.11+.
Why asynchronously?
A typical Python service has several long-lived components (a database connection, an HTTP server, Kafka consumers, background workers) that must come up in dependency order and shut down in reverse. The standard-library asyncio does not ship primitives for declaring that ordering.
asynchronously provides those primitives, plus a few other utilities for common async patterns.
Quick Start
A Context dataclass holds all components. Each component is an AsyncInitable (and/or AsyncDeinitable), and each dependency is declared with a single call.
import asyncio
from dataclasses import dataclass

from asynchronously import AsyncDeinitable, AsyncInitable


class Database(AsyncInitable, AsyncDeinitable):
    def __init__(self, url: str) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)
        self.url = url

    async def _async_init(self) -> None:
        ...  # open connection pool, run migrations

    async def _async_deinit(self) -> None:
        ...  # close pool


class HttpServer(AsyncInitable, AsyncDeinitable):
    def __init__(self, port: int) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)
        self.port = port

    async def _async_init(self) -> None:
        ...

    async def _async_deinit(self) -> None:
        ...


@dataclass
class Context(AsyncInitable, AsyncDeinitable):
    database: Database = None
    http_server: HttpServer = None

    def __post_init__(self) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)


async def amain() -> None:
    context = Context(
        database=Database("postgres://localhost/app"),
        http_server=HttpServer(port=8000),
    )
    # HTTP server starts only after DB init is done
    context.http_server.wait_before_async_init_for(
        asyncio.create_task(context.database.async_init_finished__event.wait())
    )
    await context.async_init()
    try:
        await asyncio.Event().wait()
    finally:
        await context.async_deinit()


asyncio.run(amain())
Overview
Lifecycle: AsyncInitable | AsyncDeinitable
Background tasks: Periodic
Retries: retry
Helpers: acall | DummyAsyncContextManager
Concurrent awaiting: strict_gather | await_dict_values | wait_for_all_other_tasks
Lifecycle
AsyncInitable
Mixin that adds a structured asynchronous initialization phase. Override _async_init to do your component's own work. By default the base implementation walks vars(self) and concurrently initializes any nested AsyncInitable attributes (also those inside list, set, or dict containers), so a single await context.async_init() on the top-level container initializes the entire tree.
class FileManager(AsyncInitable):
    def __init__(self, context: Context) -> None:
        AsyncInitable.__init__(self)
        self.context = context

    async def _async_init(self) -> None:
        await self._reset_stuck_uploads()
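The tree walk performed by the default implementation can be pictured with a stdlib-only sketch. It is not the library's code: InitableSketch, Leaf, and Root are hypothetical names, and the sketch assumes that the default _async_init is what walks vars(self), looking at direct attributes and one level of list, set, or dict containers.

```python
import asyncio
from typing import Any, Iterator


class InitableSketch:
    """Hypothetical stand-in for AsyncInitable; not the library's code."""

    async def async_init(self) -> None:
        await self._async_init()

    async def _async_init(self) -> None:
        # Default behavior: initialize every nested component concurrently.
        await asyncio.gather(*(child.async_init() for child in self._nested()))

    def _nested(self) -> Iterator["InitableSketch"]:
        # Inspect direct attributes and the contents of list/set/dict values.
        for value in vars(self).values():
            if isinstance(value, dict):
                candidates: Any = value.values()
            elif isinstance(value, (list, set)):
                candidates = value
            else:
                candidates = (value,)
            for item in candidates:
                if isinstance(item, InitableSketch):
                    yield item


class Leaf(InitableSketch):
    def __init__(self, name: str, log: list) -> None:
        self.name, self.log = name, log

    async def _async_init(self) -> None:
        # A component's own work replaces the default tree walk.
        self.log.append(self.name)


class Root(InitableSketch):
    def __init__(self, log: list) -> None:
        self.db = Leaf("db", log)
        self.workers = [Leaf("w1", log), Leaf("w2", log)]


log: list = []
asyncio.run(Root(log).async_init())  # one call initializes the whole tree
```

Root does not override _async_init, so a single async_init() call reaches the attribute db and both entries of the workers list.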
Each instance exposes three asyncio.Event attributes that mark phases of its lifecycle and that other components can await:
| Attribute | Set when |
|---|---|
| async_init_scheduled__event | async_init() was called (does not mean work started) |
| async_init_started__event | All wait_before_async_init_for waits have completed |
| async_init_finished__event | _async_init returned |
Declaring init-time dependencies
wait_before_async_init_for(awaitable) registers an awaitable that must complete before this component's _async_init runs. Typically you await another component's async_init_finished__event:
http_server.wait_before_async_init_for(
    asyncio.create_task(database.async_init_finished__event.wait())
)
Raises RuntimeError if called after async_init() was already scheduled.
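To see how the three events and a registered wait interact, here is a toy model built only on asyncio.Event. It is a sketch under assumptions, not the library's implementation: ComponentSketch and its attribute names (scheduled, started, finished, wait_before_init_for, init) are hypothetical and shortened for readability.

```python
import asyncio


class ComponentSketch:
    """Hypothetical model of the lifecycle events; not the library's code."""

    def __init__(self, name: str, log: list) -> None:
        self.name, self.log = name, log
        self.scheduled = asyncio.Event()
        self.started = asyncio.Event()
        self.finished = asyncio.Event()
        self._waits: list = []

    def wait_before_init_for(self, awaitable) -> None:
        # Registering a dependency after init was scheduled is an error.
        if self.scheduled.is_set():
            raise RuntimeError("init was already scheduled")
        self._waits.append(awaitable)

    async def init(self) -> None:
        self.scheduled.set()
        for awaitable in self._waits:
            await awaitable          # block until every dependency is ready
        self.started.set()
        self.log.append(self.name)   # stands in for the component's own work
        self.finished.set()


async def demo() -> list:
    log: list = []
    database = ComponentSketch("database", log)
    http_server = ComponentSketch("http_server", log)
    http_server.wait_before_init_for(
        asyncio.create_task(database.finished.wait())
    )
    # The server is listed first on purpose; it still initializes second.
    await asyncio.gather(http_server.init(), database.init())
    return log


order = asyncio.run(demo())
```

Even though both init calls start concurrently, the server's work runs only after the database's finished event is set.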
Inheriting alongside other base classes
When mixing with another base (e.g. FastAPI), call AsyncInitable.__init__(self) explicitly after the other parent's __init__. With @dataclass, do the same from __post_init__:
class Application(FastAPI, AsyncInitable, AsyncDeinitable):
    def __init__(self, ...):
        super().__init__(...)
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)
AsyncDeinitable
The teardown counterpart of AsyncInitable. Mirrors the same API with deinit in place of init: override _async_deinit, observe async_deinit_{scheduled,started,finished}__event, declare dependencies with wait_before_async_deinit_for.
The two mixins are independent — a class can be only AsyncInitable, only AsyncDeinitable, or both:
class Database(AsyncInitable, AsyncDeinitable):
    def __init__(self, url: str) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)
        self.url = url

    async def _async_init(self) -> None:
        self.pool = await create_pool(self.url)

    async def _async_deinit(self) -> None:
        await self.pool.close()
Deinit dependencies typically run in the opposite direction to init dependencies — e.g. the HTTP server stops accepting traffic before the DB pool closes:
database.wait_before_async_deinit_for(
    asyncio.create_task(http_server.async_deinit_finished__event.wait())
)
Background tasks
Periodic
Runs an async or sync callable on a fixed period. Construction with is_active=True (the default) schedules the loop immediately, so a Periodic must be created from within a running event loop.
import asyncio
from datetime import timedelta

from asynchronously import Periodic


async def amain():
    periodic_print = Periodic(lambda: print("i'm ok"), timedelta(minutes=1))
    # do other stuff
    await asyncio.Event().wait()


asyncio.run(amain())
Full signature:
class Periodic:
    def __init__(
        self,
        job: Callable,
        period: datetime.timedelta,
        is_active: bool = True,
        first_at: datetime.datetime | None = None,
        decrease_sleep_time_by_evaluation_time: bool = False,
    ) -> None
Scheduling rules:
- first_at=None (default): the first invocation happens one period after construction.
- first_at set: the first invocation happens at that moment. first_at may be naive or timezone-aware; the current time is read with the same tzinfo.
- first_at in the past: a warning is logged and the job runs immediately.
- By default, the next invocation is scheduled one period after the previous job finishes. With decrease_sleep_time_by_evaluation_time=True, the next sleep is shortened by the job's wall-clock duration so the cadence stays aligned with period between successive job starts.
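To make the arithmetic concrete, here is a small sketch of the delays these rules imply. The helpers first_delay and next_sleep are hypothetical and not part of the package, and clamping to zero when a job runs longer than its period is an assumption rather than documented behavior.

```python
from datetime import datetime, timedelta
from typing import Optional


def first_delay(period: timedelta, first_at: Optional[datetime], now: datetime) -> timedelta:
    """Delay before the first run."""
    if first_at is None:
        return period
    # A first_at in the past means the job runs immediately.
    return max(first_at - now, timedelta(0))


def next_sleep(period: timedelta, job_duration: timedelta, decrease: bool) -> timedelta:
    """Sleep between the end of one run and the start of the next."""
    if not decrease:
        return period
    # Assumption: never sleep a negative amount if the job overruns the period.
    return max(period - job_duration, timedelta(0))


period = timedelta(seconds=60)
job_duration = timedelta(seconds=5)

# Default: 60 s sleep after a 5 s job, so starts are 65 s apart.
default_gap = next_sleep(period, job_duration, decrease=False) + job_duration
# With the flag: 55 s sleep after a 5 s job, so starts are 60 s apart.
aligned_gap = next_sleep(period, job_duration, decrease=True) + job_duration
```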
Exceptions raised by the job are caught and logged — the loop keeps running.
Public API: start() (called automatically when is_active=True, no-op on second call), async stop() (sets is_active=False and awaits the loop), future (the underlying asyncio.Future).
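The error handling and stop behavior can be pictured roughly as the loop below. This is a stdlib-only sketch, not the library's code: run_periodically and the stop event are hypothetical names, and the sketch handles only sync jobs.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def run_periodically(job, period_sec: float, stop: asyncio.Event) -> None:
    while True:
        try:
            # Sleep for one period, but wake up early if a stop is requested.
            await asyncio.wait_for(stop.wait(), timeout=period_sec)
            return
        except asyncio.TimeoutError:
            pass
        try:
            job()
        except Exception:
            # A failing job is logged and the loop carries on.
            logger.exception("periodic job failed; continuing")


async def demo() -> int:
    calls = 0

    def flaky() -> None:
        nonlocal calls
        calls += 1
        raise ValueError("boom")

    stop = asyncio.Event()
    task = asyncio.create_task(run_periodically(flaky, 0.005, stop))
    await asyncio.sleep(0.1)
    stop.set()
    await task  # mirrors awaiting the loop after requesting a stop
    return calls


calls = asyncio.run(demo())
```

The job raises on every invocation, yet it keeps being called until the stop event is set.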
Retries
retry
Decorator that retries an async function on exceptions. attempts_limit is the maximum number of retries after the first failure, so attempts_limit=N means up to N + 1 total calls. With attempts_limit=None (the default) the function is retried indefinitely.
import logging

from asynchronously import retry

logger = logging.getLogger(__name__)


class VimeoConnector:
    @retry(attempts_limit=3, sleep_time_sec=2, retrying_exceptions=ConnectionError)
    async def get_video_duration(self, vimeo_id: str) -> int:
        return await self._http.post("/video/duration/get", payload={"vimeo_id": vimeo_id})


async def amain() -> None:
    connector = VimeoConnector()
    try:
        duration = await connector.get_video_duration("12345")
    except ConnectionError:
        # all 4 attempts exhausted; the original exception is re-raised
        logger.error("vimeo unavailable, falling back to default duration")
        duration = 0
Full signature:
def retry(
    func: Callable[..., Any] | None = None,
    attempts_limit: int | None = None,
    sleep_time_sec: float = 5.0,
    retrying_exceptions: type[Exception] | tuple[type[Exception], ...] = Exception,
) -> Callable[..., Any]
Usable both with and without parentheses (@retry retries forever, @retry(attempts_limit=3) retries up to 3 times).
asyncio.CancelledError and other BaseException subclasses are not caught — task cancellation works as expected. Each retry is logged at ERROR level as Retry: K/N, with K starting from 1. Decorating a sync function raises TypeError.
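The counting rule (attempts_limit=N allows up to N + 1 calls) is easy to get wrong, so here is a minimal re-implementation of the documented semantics. retry_sketch is a hypothetical name and this is not the package's source; it omits logging and the bare @retry form.

```python
import asyncio
import functools
import inspect
from typing import Optional


def retry_sketch(attempts_limit: Optional[int] = None,
                 sleep_time_sec: float = 0.0,
                 retrying_exceptions=Exception):
    def decorator(func):
        if not inspect.iscoroutinefunction(func):
            raise TypeError("retry_sketch only supports async functions")

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            retries = 0
            while True:
                try:
                    return await func(*args, **kwargs)
                except retrying_exceptions:
                    # CancelledError is a BaseException, so it is never caught here.
                    if attempts_limit is not None and retries >= attempts_limit:
                        raise  # limit reached: re-raise the original exception
                    retries += 1
                    await asyncio.sleep(sleep_time_sec)

        return wrapper

    return decorator


calls = 0


@retry_sketch(attempts_limit=3, retrying_exceptions=ConnectionError)
async def always_fails() -> None:
    global calls
    calls += 1
    raise ConnectionError("down")


async def demo() -> int:
    try:
        await always_fails()
    except ConnectionError:
        pass
    return calls


total_calls = asyncio.run(demo())  # 1 initial call + 3 retries
```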
Helpers
acall
Call target if it's callable, then await it if the result is awaitable. Lets a caller accept "anything that produces a value" — sync function, async function, coroutine, or a plain value — without branching on its kind:
from asynchronously import acall
await acall(async_function) # calls, awaits, returns result
await acall(sync_function) # calls, returns result
await acall(coroutine_object) # awaits, returns result
await acall(42) # returns 42
Used internally by Periodic so jobs can be sync or async.
Any callable is invoked — including classes and instances with __call__. To pass such a value through without invoking it, wrap it: acall(lambda: my_value).
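The "call if callable, await if awaitable" rule fits in a few lines. The sketch below is a hypothetical equivalent named acall_sketch, written only from the description above, and shows the lambda wrapping trick for passing a callable through untouched.

```python
import asyncio
import inspect
from typing import Any


async def acall_sketch(target: Any) -> Any:
    # Step 1: invoke the target if it can be called.
    result = target() if callable(target) else target
    # Step 2: await the result if it is awaitable.
    if inspect.isawaitable(result):
        result = await result
    return result


async def async_answer() -> int:
    return 1


def sync_answer() -> int:
    return 2


async def demo() -> list:
    return [
        await acall_sketch(async_answer),    # called, then awaited
        await acall_sketch(sync_answer),     # called
        await acall_sketch(async_answer()),  # coroutine object: awaited
        await acall_sketch(42),              # plain value: returned as-is
        await acall_sketch(lambda: int),     # wrapped: the class int is returned, not called
    ]


results = asyncio.run(demo())
```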
DummyAsyncContextManager
A no-op async context manager. Useful where the structure demands async with but there's nothing to do, for example to accept an optional lock without branching at the call site:
import asyncio

from asynchronously import DummyAsyncContextManager


async def update(lock: asyncio.Lock | None = None) -> None:
    async with lock or DummyAsyncContextManager():
        ...
Exceptions inside the block propagate normally — __aexit__ does not suppress them.
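A no-op async context manager needs only two methods. The class below, NoopAsyncContext, is a hypothetical stand-in written from the description above; returning False from __aexit__ is what lets exceptions propagate.

```python
import asyncio
from typing import Optional


class NoopAsyncContext:
    async def __aenter__(self) -> None:
        return None

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        return False  # False means: do not suppress the exception


async def update(counter: list, lock: Optional[asyncio.Lock] = None) -> None:
    # With no lock supplied, the no-op manager keeps the same code path.
    async with lock or NoopAsyncContext():
        counter.append(1)


async def demo() -> tuple:
    counter: list = []
    await update(counter)                  # no lock
    await update(counter, asyncio.Lock())  # real lock
    propagated = False
    try:
        async with NoopAsyncContext():
            raise ValueError("boom")
    except ValueError:
        propagated = True
    return len(counter), propagated


outcome = asyncio.run(demo())
```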
Concurrent awaiting
strict_gather
Like asyncio.gather, but on the first error cancels and awaits the remaining siblings before re-raising. Bare asyncio.gather leaves the other children running, which can later surface as "Task was destroyed but it is pending!" warnings. The original exception (and its traceback) is preserved — unlike TaskGroup's ExceptionGroup wrapping.
from asynchronously import strict_gather
results = await strict_gather(
    fetch_user(user_id),
    fetch_profile(user_id),
    fetch_settings(user_id),
)
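The cancel-and-await behavior can be approximated with plain asyncio. The function below, strict_gather_sketch, is a hypothetical illustration of the idea and not the package's implementation.

```python
import asyncio


async def strict_gather_sketch(*awaitables):
    tasks = [asyncio.ensure_future(awaitable) for awaitable in awaitables]
    try:
        return await asyncio.gather(*tasks)
    except BaseException:
        # First failure: cancel the siblings that are still running...
        for task in tasks:
            task.cancel()
        # ...and wait until each one has actually finished unwinding.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise  # re-raise the original exception unchanged


async def demo():
    cancelled = []

    async def slow() -> None:
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            cancelled.append("slow")
            raise

    async def failing() -> None:
        await asyncio.sleep(0)
        raise ValueError("boom")

    try:
        await strict_gather_sketch(slow(), failing())
    except ValueError as error:
        return str(error), cancelled


outcome = asyncio.run(demo())
```

By the time the ValueError reaches the caller, the slow sibling has already been cancelled and awaited, so no pending task is left behind.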
await_dict_values
Awaits every value in a mapping concurrently and returns a plain dict of results, keeping the original keys. On the first failure, siblings are cancelled (via strict_gather) and the exception is re-raised.
from asynchronously import await_dict_values
balances = await await_dict_values({
    "stripe": stripe_client.get_balance(),
    "paypal": paypal_client.get_balance(),
    "internal": internal_ledger.get_balance(),
})
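The key-preserving part can be sketched as gather plus zip. The helper below, await_dict_values_sketch, is hypothetical; for brevity it uses plain asyncio.gather, so it does not reproduce the sibling cancellation described above.

```python
import asyncio


async def await_dict_values_sketch(mapping):
    keys = list(mapping)
    # Await all values concurrently; gather returns results in input order.
    values = await asyncio.gather(*(mapping[key] for key in keys))
    return dict(zip(keys, values))


async def balance(amount: int) -> int:
    await asyncio.sleep(0)  # stands in for a network call
    return amount


balances = asyncio.run(
    await_dict_values_sketch({"stripe": balance(10), "paypal": balance(20)})
)
```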
wait_for_all_other_tasks
Awaits every task in the running event loop except the caller's own. Mainly for test teardown and end-of-process drains:
import pytest

from asynchronously import wait_for_all_other_tasks


@pytest.fixture(autouse=True)
async def drain():
    yield
    await wait_for_all_other_tasks()
Because it waits for every task — including ones the caller did not start — it can easily produce surprising deadlocks if used inside a long-running service. For ordinary coordination, prefer awaiting specific tasks.
The old name complete_all_tasks is kept as an alias for backward compatibility.
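One plausible way to build such a drain on the stdlib is asyncio.all_tasks() minus asyncio.current_task(). The function below, wait_for_others_sketch, is a hypothetical sketch and may differ from what the package does internally.

```python
import asyncio


async def wait_for_others_sketch() -> None:
    current = asyncio.current_task()
    others = [task for task in asyncio.all_tasks() if task is not current]
    if others:  # asyncio.wait rejects an empty collection
        await asyncio.wait(others)


async def demo() -> list:
    done: list = []

    async def background(name: str) -> None:
        await asyncio.sleep(0.01)
        done.append(name)

    # Keep references so the tasks cannot be garbage-collected mid-flight.
    tasks = [asyncio.create_task(background(name)) for name in ("a", "b")]
    await wait_for_others_sketch()
    return sorted(done)


finished = asyncio.run(demo())
```

The sketch also makes the deadlock risk visible: if any other task never finishes, the await never returns.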
License