An attempt at comingling async-code and nonasync-code in an ergonomic way.
Project description
Latest release 20250103:
- @afunc now accepts an async function and returns it unchanged.
- @agen now accepts an async generator and returns it unchanged.
- async_iter: now accepts an async iterable, returning aiter(it) directly.
- New AnyIterable = Union[Iterable, AsyncIterable] type alias, to allow both sync and async iterators.
- async_iter: new optional fast=False parameter, if true then iterate the iterator directly instead of via asyncio.to_thread.
- async_iter: make missing fast= be True for list/tuple/set and False otherwise.
- @afunc: new optional fast=False parameter - if true then do not divert through asyncio.to_thread.
- New AsyncPipeLine, an asynchronous iterable with a put method to provide input for processing.
- New StageMode class with a STREAM enum for streaming stages, implement in AsyncPipeLine.run_stage.
- New aqget(Queue), an async interface to queue.Queue.get.
- New aqiter(Queue[,sentinel]), an async generator yielding from a queue.Queue.
One of the difficulties in adapting non-async code for use in an async world is that anything asynchronous needs to be turtles all the way down: a single blocking synchronous call anywhere in the call stack blocks the async event loop.
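The effect can be seen with the standard library alone. The sketch below is illustrative and not part of this module: the names ticker and longest_gap are hypothetical. It measures the longest stall of a background task while a 0.2 second sleep runs either directly on the event loop or via asyncio.to_thread.

```python
import asyncio
import time


async def ticker(ticks: list, interval: float = 0.01) -> None:
    """Record a timestamp on every wakeup; a stalled loop shows up as a gap."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def longest_gap(blocking: bool) -> float:
    """Return the longest gap between ticks while a 0.2s sleep runs."""
    ticks = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0.05)  # let the ticker get going
    if blocking:
        time.sleep(0.2)  # synchronous call: the whole event loop stalls
    else:
        await asyncio.to_thread(time.sleep, 0.2)  # loop keeps running
    await asyncio.sleep(0.05)
    task.cancel()
    return max(b - a for a, b in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print("blocking call :", asyncio.run(longest_gap(True)))
    print("via to_thread :", asyncio.run(longest_gap(False)))
```

With the blocking call the ticker cannot run for the whole sleep; with asyncio.to_thread it should keep ticking at roughly its normal interval.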
This module presently provides:
- @afunc: a decorator to make a synchronous function asynchronous
- @agen: a decorator to make a synchronous generator asynchronous
- amap(func,iterable): asynchronous mapping of func over an iterable
- aqget(q): asynchronous function to get an item from a queue.Queue or similar
- aqiter(q): asynchronous generator to yield items from a queue.Queue or similar
- async_iter(iterable): return an asynchronous iterator of an iterable
- IterableAsyncQueue: an iterable flavour of asyncio.Queue with no get methods
- AsyncPipeLine: a pipeline of functions connected together with IterableAsyncQueues
afunc(*da, **dkw)
A decorator for a synchronous function which turns it into
an asynchronous function.
If func is already an asynchronous function it is returned unchanged.
If fast is true (default False) then func is presumed to consume
negligible time and it is simply wrapped in an asynchronous function.
Otherwise it is wrapped in asyncio.to_thread.
Example:
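    import math
    import time
    from cs.naysync import afunc

    @afunc
    def func(count):
        time.sleep(count)
        return count

    # inside a coroutine: the sleep runs via asyncio.to_thread
    slept = await func(5)

    @afunc(fast=True)
    def asqrt(n):
        return math.sqrt(n)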
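For readers curious how such a decorator can work, here is a minimal sketch built only on the standard library. It is not this module's implementation: to_async is a hypothetical name, and the only behaviour it models is what is described above (async functions returned unchanged, fast calls made inline, everything else sent through asyncio.to_thread).

```python
import asyncio
import functools
import inspect
import math


def to_async(func=None, *, fast=False):
    """Hypothetical stand-in for a decorator like @afunc."""
    if func is None:
        # Used with parameters, e.g. @to_async(fast=True)
        return functools.partial(to_async, fast=fast)
    if inspect.iscoroutinefunction(func):
        return func  # already asynchronous: return it unchanged

    @functools.wraps(func)
    async def wrapper(*a, **kw):
        if fast:
            return func(*a, **kw)  # presumed negligible: call inline
        return await asyncio.to_thread(func, *a, **kw)

    return wrapper


@to_async(fast=True)
def asqrt(n):
    return math.sqrt(n)


@to_async
def slow_double(n):
    return 2 * n


if __name__ == "__main__":
    print(asyncio.run(asqrt(9)), asyncio.run(slow_double(21)))
```

The fast path avoids the cost of a thread handoff, which is only safe when the wrapped function genuinely never blocks.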
agen(*da, **dkw)
A decorator for a synchronous generator which turns it into
an asynchronous generator.
If genfunc is already an asynchronous generator it is returned unchanged.
Exceptions in the synchronous generator are reraised in the asynchronous
generator.
Example:
    import time
    from cs.naysync import agen

    @agen
    def gen(count):
        for i in range(count):
            yield i
            time.sleep(1.0)

    # inside a coroutine:
    async for item in gen(5):
        print(item)
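The release log notes that @agen uses asyncio.to_thread for the next() step. A minimal sketch of that idea follows; it is an assumption about the general technique, not this module's code, and iterate_in_thread, countdown and collect are hypothetical names. Because asyncio.to_thread propagates exceptions from the worker thread, an exception raised inside the synchronous generator surfaces in the asynchronous one.

```python
import asyncio

_DONE = object()  # marker returned by next() when the iterator is exhausted


async def iterate_in_thread(iterable):
    """Yield items from a synchronous iterable, running each next() in a thread."""
    it = iter(iterable)
    while True:
        # next(it, _DONE) avoids raising StopIteration across the thread boundary
        item = await asyncio.to_thread(next, it, _DONE)
        if item is _DONE:
            break
        yield item


def countdown(n):
    """A plain synchronous generator used for the demonstration."""
    while n > 0:
        yield n
        n -= 1


async def collect(aiterable):
    """Gather every item of an asynchronous iterable into a list."""
    return [item async for item in aiterable]


if __name__ == "__main__":
    print(asyncio.run(collect(iterate_in_thread(countdown(3)))))
```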
amap(func: Callable[[Any], Any], it: Union[Iterable, AsyncIterable], concurrent=False, unordered=False, indexed=False)
An asynchronous generator yielding the results of func(item)
for each item in the iterable it.
it may be a synchronous or asynchronous iterable.
func may be a synchronous or asynchronous callable.
If concurrent is False (the default), run each func(item)
call in series.
If concurrent is true run the function calls as asyncio
tasks concurrently.
If unordered is true (default False) yield results as
they arrive; otherwise yield results in the order of the items
in it, each as soon as it is available. Tasks still evaluate
concurrently if concurrent is true.
If indexed is true (default False) yield 2-tuples of
(i,result) instead of just result, where i is the index
of each item from it counting from 0.
Example of an async function to fetch URLs in parallel.
    from typing import List
    import requests
    from cs.naysync import amap

    async def get_urls(urls : List[str]):
        """ Fetch `urls` in parallel.
            Yield `(url,response)` 2-tuples.
        """
        async for i, response in amap(
            requests.get, urls,
            concurrent=True, unordered=True, indexed=True,
        ):
            yield urls[i], response
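The difference between ordered and unordered results can be sketched with asyncio tasks directly. This is an illustration under assumptions, not the amap implementation: map_concurrently and nap are hypothetical names, the function is always run in a thread, and results are always indexed.

```python
import asyncio
import time


async def map_concurrently(func, items, unordered=False):
    """Yield (index, result) pairs for func(item), each call in a worker thread."""

    async def call(i, item):
        return i, await asyncio.to_thread(func, item)

    # Start every call up front so that they all run concurrently.
    tasks = [asyncio.create_task(call(i, item)) for i, item in enumerate(items)]
    if unordered:
        # Completion order: whichever call finishes first is yielded first.
        for earliest in asyncio.as_completed(tasks):
            yield await earliest
    else:
        # Input order: wait for each task in turn; later ones keep running.
        for task in tasks:
            yield await task


def nap(seconds):
    """Stand-in for a slow blocking call such as an HTTP request."""
    time.sleep(seconds)
    return seconds


async def demo(unordered):
    return [
        pair
        async for pair in map_concurrently(nap, [0.2, 0.02], unordered=unordered)
    ]


if __name__ == "__main__":
    print(asyncio.run(demo(False)))
    print(asyncio.run(demo(True)))
```

In the unordered case the index is what lets the caller match each result back to its input, which is why the URL example above passes indexed=True.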
aqget(q: queue.Queue)
An asynchronous function to get an item from a queue.Queue-like object q.
It must support the .get() and .get_nowait() methods.
aqiter(q: queue.Queue, sentinel=<object object at 0x10f37a300>)
An asynchronous generator to yield items from a queue.Queue-like object q.
It must support the .get() and .get_nowait() methods.
An optional sentinel object may be supplied, which ends iteration
if encountered. If a sentinel is specified then this must be the only
consumer of the queue because the sentinel is consumed.
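One plausible reason both .get() and .get_nowait() are required is a two-step fetch: try the non-blocking call first, and only fall back to a blocking .get() in a thread when the queue is empty. The sketch below shows that approach with the standard library; it is a guess at the technique rather than this module's code, and get_async and iter_queue are hypothetical names.

```python
import asyncio
import queue


async def get_async(q):
    """Get an item from a queue.Queue-like object without blocking the loop."""
    try:
        return q.get_nowait()  # fast path: an item is already waiting
    except queue.Empty:
        return await asyncio.to_thread(q.get)  # slow path: block in a thread


async def iter_queue(q, sentinel):
    """Yield items from q until the sentinel is seen; the sentinel is consumed."""
    while True:
        item = await get_async(q)
        if item is sentinel:
            return
        yield item


async def drain(q, sentinel):
    return [item async for item in iter_queue(q, sentinel)]


if __name__ == "__main__":
    END = object()
    q = queue.Queue()
    for item in (1, 2, 3, END):
        q.put(item)
    print(asyncio.run(drain(q, END)))
```

Because the sentinel is removed from the queue when it is seen, a second consumer of the same queue would never learn that the input had ended.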
async_iter(it: Union[Iterable, AsyncIterable], fast=None)
Return an asynchronous iterator yielding items from the iterable it.
An asynchronous iterable returns aiter(it) directly.
If fast is true then it is iterated directly instead of
via a distinct async generator. If not specified, fast is
set to True if it is a list or tuple or set. A true
value for this parameter indicates that fetching the next
item from it is always effectively instant and never blocks.
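The dispatch described above can be sketched as follows. This is an illustration only: pick_fast and iterate are hypothetical names, and unlike async_iter the sketch re-yields from an asynchronous iterable rather than returning aiter(it) directly.

```python
import asyncio
from collections.abc import AsyncIterable

_DONE = object()  # marker returned by next() when the iterator is exhausted


def pick_fast(it, fast=None):
    """Resolve the default for fast: true only for an in-memory list, tuple or set."""
    if fast is None:
        return isinstance(it, (list, tuple, set))
    return bool(fast)


async def iterate(it, fast=None):
    """Yield the items of a synchronous or asynchronous iterable."""
    if isinstance(it, AsyncIterable):
        async for item in it:
            yield item
        return
    sync_it = iter(it)
    if pick_fast(it, fast):
        # Fetching the next item never blocks, so iterate inline.
        for item in sync_it:
            yield item
        return
    # Otherwise fetch each item in a worker thread.
    while (item := await asyncio.to_thread(next, sync_it, _DONE)) is not _DONE:
        yield item


async def collect(aiterable):
    return [item async for item in aiterable]


if __name__ == "__main__":
    print(asyncio.run(collect(iterate([1, 2, 3]))))
    print(asyncio.run(collect(iterate(range(3)))))
```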
Class AsyncPipeLine
An AsyncPipeLine is an asynchronous iterable with a put method
to provide input for processing.
A new pipeline is usually constructed via the factory method
AsyncPipeLine.from_stages(stage_func,...).
It has the same methods as an IterableAsyncQueue:
- async put(item) to queue an item for processing
- async close() to close the input, indicating end of the input items
- iteration to consume the processed results
It also has the following methods:
- async submit(AnyIterable) to submit multiple items for processing
- async __call__(AnyIterable) to submit the iterable for processing and consume the results by iteration
Example:
    from cs.naysync import AsyncPipeLine

    def double(item):
        yield item
        yield item

    pipeline = AsyncPipeLine.from_stages(
        double,
        double,
    )

    # inside a coroutine:
    async for result in pipeline([1,2,3,4]):
        print(result)
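To make the data flow concrete, here is a minimal stand-alone sketch of stages connected by queues, using plain asyncio.Queue and a private end-of-input marker. It is not the AsyncPipeLine implementation: stage_worker and run_pipeline are hypothetical names, and the stage functions are assumed to be fast synchronous generators, so they are called inline.

```python
import asyncio

_CLOSED = object()  # hypothetical end-of-input marker passed between stages


async def stage_worker(inq, stage_func, outq):
    """Copy items from inq through stage_func onto outq, then close outq."""
    while (item := await inq.get()) is not _CLOSED:
        for result in stage_func(item):
            await outq.put(result)
    await outq.put(_CLOSED)


async def run_pipeline(items, *stage_funcs):
    """Feed items through the stages in order and return all the results."""
    # One queue before the first stage and one after each stage.
    queues = [asyncio.Queue() for _ in range(len(stage_funcs) + 1)]
    tasks = [
        asyncio.create_task(stage_worker(inq, func, outq))
        for inq, func, outq in zip(queues, stage_funcs, queues[1:])
    ]
    for item in items:
        await queues[0].put(item)
    await queues[0].put(_CLOSED)
    results = []
    while (item := await queues[-1].get()) is not _CLOSED:
        results.append(item)
    await asyncio.gather(*tasks)
    return results


def double(item):
    yield item
    yield item


if __name__ == "__main__":
    print(asyncio.run(run_pipeline([1, 2], double, double)))
```

Two double stages turn each input item into four results. With no stages at all, the input queue and the output queue in this sketch are the same queue, so the items pass straight through.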
AsyncPipeLine.__call__(self, it: Union[Iterable, AsyncIterable], fast=None):
Call the pipeline with an iterable.
AsyncPipeLine.close(self):
Close the input queue.
AsyncPipeLine.from_stages(*stage_specs, maxsize=0) -> Tuple[cs.naysync.IterableAsyncQueue, cs.naysync.IterableAsyncQueue]:
Prepare an AsyncPipeLine from stage specifications.
Return (inq,tasks,outq) 3-tuple being an input IterableAsyncQueue
to receive items to process, a list of asyncio.Tasks per
stage specification, and an output IterableAsyncQueue to
produce results. If there are no stage_specs the 2 queues
are the same queue.
Each stage specification is either:
- a stage function suitable for run_stage
- a 2-tuple of (stage_func,batchsize)
In the latter case:
- stage_func is a stage function suitable for run_stage
- batchsize is an int, where 0 means to gather all the items from inq and supply them as a single batch to stage_func and where a value >0 collects items up to a limit of batchsize and supplies each batch to stage_func
If the batchsize is 0 the stage_func is called exactly once with all the input items, even if there are no input items.
AsyncPipeLine.put(self, item):
Put item onto the input queue.
AsyncPipeLine.run_stage(inq: cs.naysync.IterableAsyncQueue, stage_func, outq: cs.naysync.IterableAsyncQueue, batchsize: Union[int, NoneType, cs.naysync.StageMode] = None):
Run a pipeline stage, copying items from inq to the stage_func
and putting results onto outq. After processing, outq is
closed.
stage_func is a callable which may be:
- a sync or async generator which yields results to place onto outq
- a sync or async function which returns a single result
If batchsize is None, the default, each input item is
passed to stage_func(item), which yields the results from the
single item.
If batchsize is an int, items from inq are collected
into batches up to a limit of batchsize (no limit if
batchsize is 0) and passed to stage_func(batch), which
yields the results from the batch of items.
If the batchsize is 0 the stage_func is called exactly
once with all the input items, even if there are no input
items.
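The batching rules for an int batchsize can be illustrated with a small synchronous helper. This is a sketch of the rules as described, not code from this module: batches is a hypothetical name, and delivering a final partial batch is an assumption the text does not state explicitly.

```python
def batches(items, batchsize):
    """Group items into lists following the int batchsize rules."""
    if batchsize == 0:
        # No limit: exactly one batch with everything, even if it is empty.
        yield list(items)
        return
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) >= batchsize:
            yield batch
            batch = []
    if batch:
        # Assumption: a final partial batch is still delivered.
        yield batch


if __name__ == "__main__":
    print(list(batches(range(5), 2)))
    print(list(batches(range(5), 0)))
    print(list(batches([], 0)))
```

The batchsize of 0 case is the one that guarantees a call with an empty input, which suits stages such as a sort or a summary that must always produce output.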
AsyncPipeLine.submit(self, it: Union[Iterable, AsyncIterable], fast=None):
Submit the items from it to the pipeline.
Class IterableAsyncQueue(asyncio.queues.Queue)
An iterable subclass of asyncio.Queue.
This modifies asyncio.Queue by:
- adding a .close() async method
- making the queue iterable, with each iteration consuming an item via .get()
IterableAsyncQueue.__anext__(self):
Fetch the next item from the queue.
IterableAsyncQueue.close(self):
Close the queue.
It is not an error to close the queue more than once.
IterableAsyncQueue.get(self):
We do not allow .get().
IterableAsyncQueue.get_nowat(self):
We do not allow .get_nowait().
IterableAsyncQueue.put(self, item):
Put item onto the queue.
IterableAsyncQueue.put_nowait(self, item):
Put an item onto the queue without blocking.
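A closable, iterable queue along these lines can be sketched as a small asyncio.Queue subclass. This is not the IterableAsyncQueue source: ClosableQueue and its marker object are hypothetical, and the sketch does not disable .get() or .get_nowait() as the real class does.

```python
import asyncio


class ClosableQueue(asyncio.Queue):
    """Hypothetical iterable asyncio.Queue with a close() method."""

    _CLOSED = object()  # private marker queued by close()

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self.closed = False

    async def close(self):
        """Mark the end of input; closing more than once is harmless."""
        if not self.closed:
            self.closed = True
            await super().put(self._CLOSED)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await super().get()
        if item is self._CLOSED:
            # Put the marker back so that any other consumer also stops.
            await super().put(self._CLOSED)
            raise StopAsyncIteration
        return item


async def demo():
    q = ClosableQueue()
    for item in (1, 2, 3):
        await q.put(item)
    await q.close()
    await q.close()  # a second close does nothing
    return [item async for item in q]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```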
Class StageMode(enum.StrEnum)
Special modes for AsyncPipeLine pipeline stages.
StageMode.__format__
StageMode.__str__
Release Log
Release 20250103:
- @afunc now accepts an async function and returns it unchanged.
- @agen now accepts an async generator and returns it unchanged.
- async_iter: now accepts an async iterable, returning aiter(it) directly.
- New AnyIterable = Union[Iterable, AsyncIterable] type alias, to allow both sync and async iterators.
- async_iter: new optional fast=False parameter, if true then iterate the iterator directly instead of via asyncio.to_thread.
- async_iter: make missing fast= be True for list/tuple/set and False otherwise.
- @afunc: new optional fast=False parameter - if true then do not divert through asyncio.to_thread.
- New AsyncPipeLine, an asynchronous iterable with a put method to provide input for processing.
- New StageMode class with a STREAM enum for streaming stages, implement in AsyncPipeLine.run_stage.
- New aqget(Queue), an async interface to queue.Queue.get.
- New aqiter(Queue[,sentinel]), an async generator yielding from a queue.Queue.
Release 20241221.1: Doc fix for amap().
Release 20241221:
- Simpler implementation of @afunc.
- Simplify implementation of @agen by using async_iter.
- Docstring improvements.
Release 20241220:
- New async_iter(Iterable) returning an asynchronous iterator of a synchronous iterable.
- New amap(func,iterable) asynchronously mapping a function over an iterable.
Release 20241215:
- @afunc: now uses asyncio.to_thread() instead of wrapping @agen, drop the decorator parameters since no queue or polling are now used.
- @agen: nonpolling implementation - now uses asyncio.to_thread() for the next(genfunc) step, drop the decorator parameters since no queue or polling are now used.
Release 20241214.1: Doc update.
Release 20241214: Initial release with @agen and @afunc decorators.
File details
Details for the file cs_naysync-20250103.tar.gz.
File metadata
- Download URL: cs_naysync-20250103.tar.gz
- Upload date:
- Size: 11.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.12.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9c51be6bffb1603d09efd0223aa8833e1e3935dec978635755447842b09f8990 |
| MD5 | ad87c4df065612ac56bba7f8ea918c28 |
| BLAKE2b-256 | ffeb7d443029de92dc8c3c47d93c115461253f62f53a8eb0b814303c484c6e88 |
File details
Details for the file cs_naysync-20250103-py3-none-any.whl.
File metadata
- Download URL: cs_naysync-20250103-py3-none-any.whl
- Upload date:
- Size: 11.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.12.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | cfc8d1c0994d4f606548bb1826baf5f98f8b802c912135528213ec8e8073a700 |
| MD5 | 2f2ce74ea9208de0d17638b0027fe79f |
| BLAKE2b-256 | 9a99ee1a3c6318a40e415a4decfdccbcd09f91823cf3c519bff0e37963540765 |