Collection of utilities for working with asynchronous Python code
Aiodrive
Aiodrive is a Python package for working with asynchronous code powered by asyncio. It provides various utilities for producing and consuming iterators, managing tasks, and coordinating between threads and asynchronous code.
It supports Python 3.12 and later.
Installation
Aiodrive is available on PyPI under the name aiodrive.
API
Button
A primitive similar to `asyncio.Event` but that resets immediately after being set, useful for triggering waiters.
Cargo
A primitive similar to `Button` but that carries a value.
FutureState
A primitive for storing a future's state.
GuaranteedTask
A variant of `asyncio.Task` that is guaranteed to be awaited before any cancellation can happen.
Latch
A primitive similar to `asyncio.Event` but that can be awaited for both set and reset occurrences.
NestableLock
A lock that can be held simultaneously by different callers in the same context.
ThreadsafeButton
A thread-safe variant of `Button`.
ThreadsafeState
A thread-safe primitive for storing and watching a state.
ThreadsafeLock
A lock that can be acquired from different threads.
arun()
Run an awaitable in a new event loop while enforcing structured concurrency.
buffer_aiter()
Pre-fetch items of an async iterable.
cancel_task()
Cancel a task and await it.
cleanup_shield()
Shield a task against cancellation if it has not been cancelled yet, and await it.
collect()
Collect items of an async iterable into a list.
concurrent_contexts()
Run multiple context managers concurrently.
contextualize()
Run an awaitable as a context manager, ensuring that its task is cancelled when the context manager exits.
eager_task_group()
Create a task group that automatically terminates when the current task exits the associated context manager.
ensure_aiter()
Ensure that the provided iterable is an async iterable.
ensure_correct_cancellation()
Ensure that an `asyncio.CancelledError` is re-raised if the current task has been cancelled.
handle_signal()
Register a signal handler that cancels the current task when the signal is received.
launch_in_thread_loop()
Launch an awaitable in a separate thread with its own event loop.
prime()
Immediately execute as much code of a coroutine as possible before it is awaited.
race()
Run multiple tasks and return the result of the first one that finishes, after having cancelled and awaited the other tasks.
repeat_periodically()
Create an iterator that yields periodically, taking into account the time taken when the yielded value is consumed.
run_in_thread_loop_contextualized()
Run an awaitable in a separate thread with its own event loop, using a context manager.
run_in_thread_loop()
Run an awaitable in a separate thread with its own event loop.
shield()
Shield a task against cancellation and await it.
suppress()
A context manager that suppresses specified exceptions, ensuring that an `asyncio.CancelledError` is re-raised if necessary.
try_all()
Run multiple tasks and cancel those still running if one of them raises an exception.
use_scope()
Create a context manager that allows for a silent cancellation of the current task.
wait_all()
Run multiple tasks without cancelling any if one raises an exception.
wait_for_signal()
Wait for a signal.
zip_concurrently()
Zip multiple async iterables concurrently into a new async iterable.
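To make the behaviour described for `race()` concrete, here is a minimal sketch of the same idea written with the standard library only. It is not Aiodrive's implementation and does not use its API: `race_sketch`, `worker`, `main` and `cancelled` are hypothetical names introduced for illustration, and the real `race()` may differ in signature and in how it handles edge cases.

```python
import asyncio
from collections.abc import Awaitable
from typing import Any

cancelled: list[str] = []  # names of workers that observed a cancellation


async def race_sketch(*awaitables: Awaitable[Any]) -> Any:
    """Hypothetical helper; not Aiodrive's implementation or signature."""
    tasks = [asyncio.ensure_future(aw) for aw in awaitables]
    try:
        # Suspend until at least one task has finished.
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Cancel the tasks still running, then await every task so that
        # none of them outlives this call.
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    # Return the result (or re-raise the exception) of a finished task.
    return next(iter(done)).result()


async def worker(name: str, delay: float) -> str:
    try:
        await asyncio.sleep(delay)
    except asyncio.CancelledError:
        cancelled.append(name)
        raise
    return name


async def main() -> str:
    return await race_sketch(worker("slow", 10.0), worker("fast", 0.01))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The point of awaiting the cancelled tasks before returning is that their cleanup code has already run by the time the caller sees the result.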
Notes
- Unlike certain libraries such as `asyncstdlib`, Aiodrive performs no implicit cleanup of async generators. If cleanup is required, as is the case for certain functions such as `buffer_aiter()`, generators must be closed explicitly. For example:

      agen = get_some_async_generator()

      async with contextlib.aclosing(agen):
          async for item in agen:
              if some_condition:
                  break

          # The generator may not be exhausted here and must therefore be closed explicitly.

- If a function accepts an awaitable, it is guaranteed to be awaited as long as the function itself is awaited.
- Only a single call to an async iterator's `__anext__()` method is ever pending at a time. The same is expected from consumers of a returned async iterator.
- Unless stated otherwise, functions and classes may not be used across multiple event loops.
- All functions and classes support using an eager task factory.
File details
Details for the file aiodrive-2.0-py3-none-any.whl.
File metadata
- Download URL: aiodrive-2.0-py3-none-any.whl
- Upload date:
- Size: 52.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 280aa6bf25386bdd530fb8937ccb4225506bbe260d333dc51cae928e8acc4d7b |
| MD5 | 8f9c307b16b41b5e6486f5db677a8a99 |
| BLAKE2b-256 | 70f9bc4fea6ecd47bde882f1dcf3461566071d2260186993c3242c7fc3dee0e3 |
Provenance
The following attestation bundles were made for aiodrive-2.0-py3-none-any.whl:
Publisher: publish.yml on slietar/aiodrive
- Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: aiodrive-2.0-py3-none-any.whl
- Subject digest: 280aa6bf25386bdd530fb8937ccb4225506bbe260d333dc51cae928e8acc4d7b
- Sigstore transparency entry: 557765753
- Sigstore integration time:
- Permalink: slietar/aiodrive@344e54d0259ad3cf100953f25e58ad12edf18473
- Branch / Tag: refs/tags/2.0
- Owner: https://github.com/slietar
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@344e54d0259ad3cf100953f25e58ad12edf18473
- Trigger Event: push