Some (maybe) useful extensions for asyncio
asyncio-extensions
Installation
pip install asyncio-extensions
Usage
TaskGroup
asyncio-extensions provides a cancellable version of asyncio's TaskGroup. Calling cancel() on the group cancels the tasks still running in it.
import asyncio
from asyncio_extensions import TaskGroup

queue = asyncio.Queue()
async with TaskGroup() as tg:
    for _ in range(10):
        tg.create_task(consume_from_queue(queue))
    await add_to_queue(queue)
    await queue.join()
    tg.cancel()
LimitedTaskGroup
A version of TaskGroup that limits the number of concurrently running tasks.
import asyncio
from asyncio_extensions import LimitedTaskGroup

queue = asyncio.Queue()
async with LimitedTaskGroup(3) as tg:
    for _ in range(50):
        tg.create_task(some_expensive_operation(queue))
    await add_to_queue(queue)
    await queue.join()
    tg.cancel()
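One common way to cap concurrency with only the standard library is an asyncio.Semaphore. The sketch below shows that general technique, assuming nothing about how LimitedTaskGroup works internally; run_limited, guarded, and demo are hypothetical names.

```python
# Illustrative sketch only; not the asyncio-extensions implementation.
import asyncio


async def run_limited(limit, jobs):
    """Run coroutine functions with at most `limit` of them active at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(job):
        # Each job waits here until one of the `limit` slots is free.
        async with semaphore:
            return await job()

    return await asyncio.gather(*(guarded(job) for job in jobs))


async def demo():
    running = 0
    peak = 0

    async def job():
        nonlocal running, peak
        running += 1
        peak = max(peak, running)  # record the highest concurrency seen
        await asyncio.sleep(0.01)
        running -= 1

    await run_limited(3, [job] * 10)
    return peak
```

Here demo() reports the highest number of jobs that were active at the same time, which never exceeds the limit of 3.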
checkpoint
The checkpoint function yields control to the event loop. It is a more elegant way to write coroutines that otherwise do nothing, while still giving other tasks a chance to run.
from asyncio_extensions import checkpoint

class DummyChannel:
    async def send_message(self, message):
        await checkpoint()
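The effect of yielding to the event loop can be seen with a stand-in built on asyncio.sleep(0). This is a sketch under the assumption that checkpoint behaves like a single zero-length sleep; the local checkpoint, worker, and demo functions are hypothetical.

```python
# Illustrative sketch only; not the asyncio-extensions implementation.
import asyncio


async def checkpoint():
    # Assumption: one zero-length sleep hands control back to the event loop.
    await asyncio.sleep(0)


async def worker(name, log):
    for step in range(2):
        log.append((name, step))
        await checkpoint()  # let the other worker take a turn


async def demo():
    log = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log
```

Because each worker yields after every step, the two workers take turns instead of one running to completion before the other starts.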
sleep_forever
The sleep_forever function never returns. It simply keeps yielding control to the event loop.
from asyncio_extensions import sleep_forever

class DummyChannel:
    async def receive_message(self):
        await sleep_forever()
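A coroutine that never returns can be approximated with a loop of long sleeps. The sketch below assumes that behaviour (the one-hour interval is arbitrary) and uses asyncio.wait_for to show that only a timeout or cancellation ends the wait; the local sleep_forever and demo functions are hypothetical.

```python
# Illustrative sketch only; not the asyncio-extensions implementation.
import asyncio


async def sleep_forever():
    # Assumption: sleep in long chunks, forever; the interval is arbitrary.
    while True:
        await asyncio.sleep(3600)


async def demo():
    try:
        await asyncio.wait_for(sleep_forever(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "returned"
```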
heartbeat
The heartbeat function runs a given callable at a regular interval.
from asyncio_extensions import TaskGroup, heartbeat

async def ping():
    pass

async with TaskGroup() as tg:
    tg.create_task(heartbeat(5, ping))
    await some_long_running_process()
    tg.cancel()  # stop the heartbeat so the group can exit
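A periodic runner of this kind can be sketched as a loop that awaits the callable and then sleeps. Whether the real heartbeat calls first or sleeps first, and whether the interval is in seconds, are assumptions here; the local heartbeat and demo functions are hypothetical.

```python
# Illustrative sketch only; not the asyncio-extensions implementation.
import asyncio


async def heartbeat(interval, callback):
    # Assumption: call first, then wait `interval` seconds, until cancelled.
    while True:
        await callback()
        await asyncio.sleep(interval)


async def demo():
    beats = 0

    async def ping():
        nonlocal beats
        beats += 1

    task = asyncio.create_task(heartbeat(0.01, ping))
    await asyncio.sleep(0.05)  # stands in for the long-running process
    task.cancel()              # the loop never ends on its own
    try:
        await task
    except asyncio.CancelledError:
        pass
    return beats
```

Since the loop runs until it is cancelled, whoever starts it is responsible for cancelling it once the main work is done.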
identity
The identity function yields control back to the event loop once, and then returns the value that was passed in. It is useful when you already have a cached result and want to create a task that behaves like any other asynchronous operation.
from asyncio_extensions import TaskGroup, identity

async def get_product(product_id: int):
    cached_product = await product_cache.get(product_id)
    async with TaskGroup() as tg:
        if cached_product is not None:
            task = tg.create_task(identity(cached_product))
        else:
            task = tg.create_task(fetch_product_from_api(product_id))
        tg.create_task(update_search_metrics(product_id))
    product = task.result()
    return product
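The behaviour described above (yield once, then return the value) can be sketched in a few lines. This assumes a single asyncio.sleep(0) as the yield point; the local identity and demo functions are hypothetical.

```python
# Illustrative sketch only; not the asyncio-extensions implementation.
import asyncio


async def identity(value):
    await asyncio.sleep(0)  # yield to the event loop exactly once
    return value


async def demo():
    task = asyncio.create_task(identity({"id": 1}))
    pending_at_creation = not task.done()  # the task has not run yet
    product = await task
    return pending_at_creation, product
```

The task is still pending right after creation, so callers can treat a cached value the same way as a value that is being fetched.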
asyncify
The asyncify function ensures a callable can be awaited. If the callable is already a coroutine function, it is returned as-is. Otherwise, it is wrapped so that calls run in a separate thread.
from asyncio_extensions import asyncify

def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await asyncify(blocking_read)("data.txt")
It can also be used as a decorator:
from asyncio_extensions import asyncify

@asyncify
def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await blocking_read("data.txt")
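The two branches described above (return coroutine functions unchanged, run everything else in a thread) could look roughly like this with the standard library. It is a sketch that assumes asyncio.to_thread (Python 3.9+) as the threading mechanism; the library may use a different one. The local asyncify, slow_add, and already_async names are hypothetical.

```python
# Illustrative sketch only; not the asyncio-extensions implementation.
import asyncio
import functools
import inspect


def asyncify(func):
    if inspect.iscoroutinefunction(func):
        return func  # already awaitable: hand it back untouched

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Assumption: blocking calls are pushed to the default thread pool.
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


def slow_add(a, b):
    return a + b


async def already_async():
    return "ok"
```

With this sketch, asyncify(already_async) is the same object as already_async, while asyncify(slow_add)(2, 3) returns a coroutine that resolves to 5.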
iterate_queue
The iterate_queue async generator wraps an asyncio.Queue so it can be consumed with a plain async for loop. It calls task_done() automatically after each item and stops when the queue is shut down (Python 3.13+) or when a sentinel value is dequeued.
import asyncio
from asyncio_extensions import iterate_queue

async def process_items(queue: asyncio.Queue[str]) -> None:
    async for item in iterate_queue(queue):
        print(item)
To signal the end of the stream, call queue.shutdown() from the producer (Python 3.13+):
async def producer(queue: asyncio.Queue[str]) -> None:
    for item in ["a", "b", "c"]:
        await queue.put(item)
    queue.shutdown()
On older Python versions, put the STOP sentinel in the queue when done:
from asyncio_extensions import iterate_queue, STOP

async def producer(queue: asyncio.Queue[object]) -> None:
    for item in ["a", "b", "c"]:
        await queue.put(item)
    await queue.put(STOP)

async def consumer(queue: asyncio.Queue[object]) -> None:
    async for item in iterate_queue(queue):
        print(item)
Note:
STOP is deprecated on Python 3.13+. Prefer queue.shutdown() instead.
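The two stop conditions described in this section can be sketched as an async generator. This is an illustration and not the library's code: the local STOP object, the _QueueShutDown fallback, and the demo function are hypothetical, and the point at which task_done() is called is an assumption.

```python
# Illustrative sketch only; not the asyncio-extensions implementation.
import asyncio

STOP = object()  # hypothetical sentinel, standing in for the library's STOP

# asyncio.QueueShutDown exists on Python 3.13+; an empty tuple in an
# `except` clause matches nothing, so older versions skip that branch.
_QueueShutDown = getattr(asyncio, "QueueShutDown", ())


async def iterate_queue(queue):
    while True:
        try:
            item = await queue.get()
        except _QueueShutDown:
            return  # queue was shut down and is empty
        try:
            if item is STOP:
                return  # sentinel reached
            yield item
        finally:
            # Assumption: mark the item done once the consumer is finished
            # with it (or the sentinel has been seen).
            queue.task_done()


async def demo():
    queue = asyncio.Queue()
    for item in ["a", "b", "c"]:
        queue.put_nowait(item)
    queue.put_nowait(STOP)
    seen = [item async for item in iterate_queue(queue)]
    await queue.join()  # returns at once: every get() was matched
    return seen
```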
fill_queue
The fill_queue coroutine fills an asyncio.Queue from any sync or async iterable. It accepts both Iterable and AsyncIterable sources and blocks if the queue is full until space becomes available.
import asyncio
from asyncio_extensions import fill_queue

async def main() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue()
    await fill_queue(range(10), queue)
It also works with async iterables:
async def source():
    for item in fetch_from_api():
        yield item

await fill_queue(source(), queue)
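Accepting both kinds of iterable usually comes down to one isinstance check. The sketch below assumes that approach and uses a bounded queue to show the blocking behaviour; the local fill_queue, numbers, and demo functions are hypothetical.

```python
# Illustrative sketch only; not the asyncio-extensions implementation.
import asyncio
from collections.abc import AsyncIterable


async def fill_queue(source, queue):
    if isinstance(source, AsyncIterable):
        async for item in source:
            await queue.put(item)  # waits while the queue is full
    else:
        for item in source:
            await queue.put(item)


async def demo():
    queue = asyncio.Queue(maxsize=2)  # small on purpose, to force waiting
    received = []

    async def consumer():
        for _ in range(5):
            received.append(await queue.get())

    async def numbers():
        for n in (3, 4):
            yield n

    consumer_task = asyncio.create_task(consumer())
    await fill_queue(range(3), queue)   # sync iterable
    await fill_queue(numbers(), queue)  # async iterable
    await consumer_task
    return received
```

With maxsize=2, the producer pauses inside put() until the consumer frees a slot, which is the blocking behaviour described above.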
merge_iterables
The merge_iterables async context manager merges multiple sync or async iterables into a single interleaved stream, feeding all sources into a shared queue concurrently.
import asyncio
from asyncio_extensions import merge_iterables

async def main() -> None:
    async with merge_iterables([1, 2, 3], [4, 5, 6]) as stream:
        async for item in stream:
            print(item)
It also accepts async iterables, which can produce items in parallel:
async def fetch_page(page: int):
    ...  # yields items from a remote page

async with merge_iterables(fetch_page(1), fetch_page(2)) as stream:
    async for item in stream:
        process(item)
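The shared-queue design described above can be sketched with one feeder task per source and an end marker per feeder. This is an illustration under those assumptions, not the library's code; _DONE, _feed, the local merge_iterables, and demo are hypothetical names.

```python
# Illustrative sketch only; not the asyncio-extensions implementation.
import asyncio
from collections.abc import AsyncIterable
from contextlib import asynccontextmanager

_DONE = object()  # hypothetical marker: one per finished source


async def _feed(source, queue):
    try:
        if isinstance(source, AsyncIterable):
            async for item in source:
                queue.put_nowait(item)
        else:
            for item in source:
                queue.put_nowait(item)
                await asyncio.sleep(0)  # let other feeders run between items
    finally:
        queue.put_nowait(_DONE)  # always report completion, even on error


@asynccontextmanager
async def merge_iterables(*sources):
    queue = asyncio.Queue()  # unbounded, so put_nowait never fails
    feeders = [asyncio.create_task(_feed(s, queue)) for s in sources]

    async def stream():
        remaining = len(feeders)
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item

    try:
        yield stream()
    finally:
        # Leaving the block early must not leak background tasks.
        for task in feeders:
            task.cancel()
        await asyncio.gather(*feeders, return_exceptions=True)


async def demo():
    async def numbers():
        for n in (4, 5, 6):
            await asyncio.sleep(0)
            yield n

    merged = []
    async with merge_iterables([1, 2, 3], numbers()) as stream:
        async for item in stream:
            merged.append(item)
    return sorted(merged)  # arrival order depends on scheduling
```

Using a context manager gives the feeders a clear lifetime: they are cancelled when the block exits, even if the consumer stops iterating early.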
iscoroutinefunction and markcoroutinefunction
The iscoroutinefunction helper checks whether a callable is already a coroutine function. It is re-exported from inspect on newer Python versions and from asyncio on older ones.
The markcoroutinefunction helper marks a regular synchronous callable as a coroutine function. On Python 3.12+ this is inspect.markcoroutinefunction with an annotated return type, so type checkers also treat the marked function as a coroutine function.
from asyncio_extensions import iscoroutinefunction, markcoroutinefunction

async def main():
    def sync_task() -> int:
        return 42

    assert iscoroutinefunction(sync_task) is False
    marked = markcoroutinefunction(sync_task)
    assert iscoroutinefunction(marked) is True
Contributing
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate.
License
asyncio-extensions is distributed under the terms of the MIT license.