An attempt at comingling async-code and nonasync-code in an ergonomic way.
Latest release 20251119: Make most optional parameters keyword only.
One of the difficulties in adapting non-async code for use in an async world is that anything asynchronous needs to be turtles all the way down: a single blocking synchronous call anywhere in the call stack blocks the async event loop.
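To make the problem concrete, here is a minimal standard-library sketch. It is not part of this module, and `count_ticks`, `blocking` and `threaded` are names invented for the illustration. A ticker task counts how often the event loop gets to run it while some work is in progress: a direct `time.sleep` starves it completely, while the same sleep dispatched via `asyncio.to_thread` does not.

```python
import asyncio
import time

async def count_ticks(work):
    """Run the coroutine function `work` while a ticker task counts
    how many times the event loop managed to resume it."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # give the ticker a chance to start
    await work()
    task.cancel()
    return ticks

async def blocking():
    # a synchronous call inside a coroutine: the event loop is stuck here
    time.sleep(0.2)

async def threaded():
    # the same call in a worker thread: the event loop stays responsive
    await asyncio.to_thread(time.sleep, 0.2)

blocked_ticks = asyncio.run(count_ticks(blocking))
threaded_ticks = asyncio.run(count_ticks(threaded))
print(blocked_ticks, threaded_ticks)
```

The first number printed is `0` because the ticker never gets to run while `time.sleep` holds the loop; the second is positive.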
This module presently provides:
- `@afunc`: a decorator to make a synchronous function asynchronous
- `@agen`: a decorator to make a synchronous generator asynchronous
- `amap(func,iterable)`: asynchronous mapping of `func` over an iterable
- `aqget(q)`: asynchronous function to get an item from a `queue.Queue` or similar
- `aqiter(q)`: asynchronous generator to yield items from a `queue.Queue` or similar
- `async_iter(iterable)`: return an asynchronous iterator of an iterable
- `IterableAsyncQueue`: an iterable flavour of `asyncio.Queue` with no `get` methods
- `AsyncPipeLine`: a pipeline of functions connected together with `IterableAsyncQueue`s
Short summary:
- `afunc`: A decorator for a synchronous function which turns it into an asynchronous function. If `func` is already an asynchronous function it is returned unchanged. If `fast` is true (default `False`) then `func` is presumed to consume negligible time and it is simply wrapped in an asynchronous function. Otherwise it is wrapped in `asyncio.to_thread`.
- `agen`: A decorator for a synchronous generator which turns it into an asynchronous generator. If `genfunc` is already an asynchronous generator it is returned unchanged. Exceptions in the synchronous generator are reraised in the asynchronous generator.
- `amap`: An asynchronous generator yielding the results of `func(item)` for each `item` in the iterable `it`.
- `aqget`: An asynchronous function to get an item from a `queue.Queue`-like object `q`. It must support the `.get()` and `.get_nowait()` methods.
- `aqiter`: An asynchronous generator to yield items from a `queue.Queue`-like object `q`. It must support the `.get()` and `.get_nowait()` methods.
- `async_iter`: Return an asynchronous iterator yielding items from the iterable `it`. An asynchronous iterable returns `aiter(it)` directly.
- `AsyncPipeLine`: An `AsyncPipeLine` is an asynchronous iterable with a `put` method to provide input for processing.
- `IterableAsyncQueue`: An iterable subclass of `asyncio.Queue`.
- `StageMode`: Special modes for `AsyncPipeLine` pipeline stages.
Module contents:
- `afunc(*da, **dkw)`: A decorator for a synchronous function which turns it into an asynchronous function. If `func` is already an asynchronous function it is returned unchanged. If `fast` is true (default `False`) then `func` is presumed to consume negligible time and it is simply wrapped in an asynchronous function. Otherwise it is wrapped in `asyncio.to_thread`.

  Example:

  ```python
  import math
  import time

  from cs.naysync import afunc

  @afunc
  def func(count):
      time.sleep(count)
      return count

  # inside a coroutine:
  slept = await func(5)

  @afunc(fast=True)
  def asqrt(n):
      return math.sqrt(n)
  ```
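The three cases described for `@afunc` (already asynchronous, `fast`, and the default thread dispatch) can be sketched with the standard library alone. This is an illustration of the technique under those stated rules, not the module's actual code; `afunc_sketch` and `slow_square` are hypothetical names, and the sketch is called as a plain function rather than supporting the `@afunc(fast=True)` decorator form.

```python
import asyncio
import functools
import inspect
import math
import time

def afunc_sketch(func, *, fast=False):
    """Return an asynchronous version of `func` (hypothetical helper)."""
    if inspect.iscoroutinefunction(func):
        # already asynchronous: return it unchanged
        return func
    if fast:
        # negligible run time: call it directly inside a coroutine
        @functools.wraps(func)
        async def fast_wrapper(*a, **kw):
            return func(*a, **kw)
        return fast_wrapper

    # may block: run it in a worker thread
    @functools.wraps(func)
    async def thread_wrapper(*a, **kw):
        return await asyncio.to_thread(func, *a, **kw)
    return thread_wrapper

def slow_square(n):
    time.sleep(0.05)  # stands in for a blocking call
    return n * n

aslow_square = afunc_sketch(slow_square)
asqrt = afunc_sketch(math.sqrt, fast=True)

async def main():
    return await aslow_square(3), await asqrt(16)

squared, root = asyncio.run(main())
```

Passing an already wrapped function back in, as in `afunc_sketch(asqrt)`, returns the same object.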
- `agen(*da, **dkw)`: A decorator for a synchronous generator which turns it into an asynchronous generator. If `genfunc` is already an asynchronous generator it is returned unchanged. Exceptions in the synchronous generator are reraised in the asynchronous generator.

  The optional parameter `fast` is passed through to `async_iter`.

  Example:

  ```python
  import time

  from cs.naysync import agen

  @agen
  def gen(count):
      for i in range(count):
          yield i
          time.sleep(1.0)

  # inside a coroutine:
  async for item in gen(5):
      print(item)
  ```
- `amap(func: Callable[[Any], Any], it: Union[Iterable, AsyncIterable], *, concurrent=False, unordered=False, indexed=False, fast=False)`: An asynchronous generator yielding the results of `func(item)` for each `item` in the iterable `it`. `it` may be a synchronous or asynchronous iterable. `func` may be a synchronous or asynchronous callable.

  If `concurrent` is `False` (the default), run each `func(item)` call in series. If `concurrent` is true run the function calls as `asyncio` tasks concurrently.

  If `unordered` is true (default `False`) yield results as they arrive. Otherwise yield results in the order of the items in `it`, each as soon as it is available; tasks still evaluate concurrently if `concurrent` is true.

  If `indexed` is true (default `False`) yield 2-tuples of `(i,result)` instead of just `result`, where `i` is the index of each item from `it` counting from `0`.

  If `fast` is true (default `False`) assume that `func` does not block or otherwise take a long time.

  Example of an async function to fetch URLs in parallel:

  ```python
  from typing import List

  import requests

  from cs.naysync import amap

  async def get_urls(urls: List[str]):
      """ Fetch `urls` in parallel.
          Yield `(url,response)` 2-tuples.
      """
      async for i, response in amap(
          requests.get,
          urls,
          concurrent=True,
          unordered=True,
          indexed=True,
      ):
          yield urls[i], response
  ```
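The difference between ordered and unordered results can be seen in a small standard-library sketch. It assumes an asynchronous `func` and dispatches every item up front, which is simpler than what the release log describes for `amap` itself; `amap_sketch` and `slow_echo` are hypothetical names used only here.

```python
import asyncio

async def amap_sketch(func, items, *, unordered=False):
    """Yield `(index, result)` pairs for the async `func` applied
    concurrently to each item (illustration only)."""
    async def indexed_call(i, item):
        return i, await func(item)

    # one task per item, all running concurrently
    tasks = [
        asyncio.create_task(indexed_call(i, item))
        for i, item in enumerate(items)
    ]
    if unordered:
        # completion order: whichever task finishes first is yielded first
        for fut in asyncio.as_completed(tasks):
            yield await fut
    else:
        # input order: wait for each task in turn
        for task in tasks:
            yield await task

async def slow_echo(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    delays = [0.3, 0.1, 0.2]
    ordered = [pair async for pair in amap_sketch(slow_echo, delays)]
    unordered = [
        pair async for pair in amap_sketch(slow_echo, delays, unordered=True)
    ]
    return ordered, unordered

ordered, unordered = asyncio.run(main())
```

Here `ordered` follows the input order, while `unordered` starts with the shortest delay. The index in each pair is what lets a caller match an unordered result back to its input, as `get_urls` does above.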
- `aqget(q: queue.Queue)`: An asynchronous function to get an item from a `queue.Queue`-like object `q`. It must support the `.get()` and `.get_nowait()` methods.
- `aqiter(q: queue.Queue, *, sentinel=<object object at 0x1097e5f70>)`: An asynchronous generator to yield items from a `queue.Queue`-like object `q`. It must support the `.get()` and `.get_nowait()` methods.

  An optional `sentinel` object may be supplied, which ends iteration if encountered. If a sentinel is specified then this must be the only consumer of the queue because the sentinel is consumed.
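One plausible way to read a `queue.Queue` from async code, sketched with the standard library: try `.get_nowait()` first and fall back to a blocking `.get()` in a worker thread. That this is why both methods are required is an assumption, and `aqget_sketch`, `aqiter_sketch` and `END` are hypothetical names, not the module's implementation.

```python
import asyncio
import queue

async def aqget_sketch(q):
    """Get an item from `q` without blocking the event loop."""
    try:
        # cheap path: an item is already available
        return q.get_nowait()
    except queue.Empty:
        # slow path: wait for an item in a worker thread
        return await asyncio.to_thread(q.get)

async def aqiter_sketch(q, sentinel):
    """Yield items from `q` until `sentinel` is seen (and consumed)."""
    while True:
        item = await aqget_sketch(q)
        if item is sentinel:
            break
        yield item

END = object()  # hypothetical sentinel marking the end of the items
q = queue.Queue()
for item in ("a", "b", END):
    q.put(item)

async def main():
    return [item async for item in aqiter_sketch(q, END)]

items = asyncio.run(main())
```

The sentinel is removed from the queue by the iterator, which is why a second consumer of the same queue would never see it.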
- `async_iter(it: Union[Iterable, AsyncIterable], *, fast=None)`: Return an asynchronous iterator yielding items from the iterable `it`. If `it` is an asynchronous iterable, `aiter(it)` is returned directly.

  If `fast` is true then `it` is iterated directly instead of via a distinct async generator. If not specified, `fast` is set to `True` if `it` is a `list` or `tuple` or `set`. A true value for this parameter indicates that fetching the next item from `it` is always effectively instant and never blocks.
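The role of `fast` can be illustrated with a standard-library sketch. It is a simplification: it handles synchronous iterables only and is an async generator in both modes, whereas the text above says the fast mode avoids a distinct async generator. `async_iter_sketch`, `countdown` and `_DONE` are hypothetical names.

```python
import asyncio

_DONE = object()  # private marker: the synchronous iterator is exhausted

async def async_iter_sketch(it, *, fast=None):
    """Yield the items of the synchronous iterable `it` asynchronously."""
    if fast is None:
        # in-memory containers never block when fetching the next item
        fast = isinstance(it, (list, tuple, set))
    iterator = iter(it)
    while True:
        if fast:
            item = next(iterator, _DONE)
        else:
            # fetch the next item in a worker thread so that a slow
            # iterator does not stall the event loop
            item = await asyncio.to_thread(next, iterator, _DONE)
        if item is _DONE:
            return
        yield item

def countdown(n):
    while n > 0:
        yield n
        n -= 1

async def main():
    from_list = [x async for x in async_iter_sketch([1, 2, 3])]
    from_gen = [x async for x in async_iter_sketch(countdown(3))]
    return from_list, from_gen

from_list, from_gen = asyncio.run(main())
```

Using a default value with `next` avoids raising `StopIteration` inside the worker thread, and any other exception raised by the underlying iterator propagates to the `async for` loop.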
- `class AsyncPipeLine`: An `AsyncPipeLine` is an asynchronous iterable with a `put` method to provide input for processing.

  A new pipeline is usually constructed via the factory method `AsyncPipeLine.from_stages(stage_func,...)`.

  It has the same methods as an `IterableAsyncQueue`:
  - `async put(item)` to queue an item for processing
  - `async close()` to close the input, indicating end of the input items
  - iteration to consume the processed results

  It also has the following methods:
  - `async submit(AnyIterable)` to submit multiple items for processing
  - `async __call__(AnyIterable)` to submit the iterable for processing and consume the results by iteration

  Example:

  ```python
  from cs.naysync import AsyncPipeLine

  def double(item):
      yield item
      yield item

  pipeline = AsyncPipeLine.from_stages(
      double,
      double,
  )
  # inside a coroutine:
  async for result in pipeline([1,2,3,4]):
      print(result)
  ```
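The idea of stages connected by queues can be sketched with plain `asyncio.Queue` objects and an end-of-input marker. This is a minimal illustration assuming synchronous generator stages and one task per stage; `run_stage_sketch` and `_CLOSED` are hypothetical names and the code is not the `AsyncPipeLine` implementation.

```python
import asyncio

_CLOSED = object()  # hypothetical end-of-input marker

async def run_stage_sketch(inq, stage_func, outq):
    """Feed each item from `inq` to `stage_func`, put its results on `outq`."""
    while True:
        item = await inq.get()
        if item is _CLOSED:
            break
        for result in stage_func(item):
            await outq.put(result)
    # propagate the end of input to the next stage
    await outq.put(_CLOSED)

def double(item):
    yield item
    yield item

async def main():
    stages = [double, double]
    # one queue before each stage plus a final output queue
    queues = [asyncio.Queue() for _ in range(len(stages) + 1)]
    tasks = [
        asyncio.create_task(run_stage_sketch(queues[i], stage, queues[i + 1]))
        for i, stage in enumerate(stages)
    ]
    for item in [1, 2]:
        await queues[0].put(item)
    await queues[0].put(_CLOSED)
    results = []
    while (result := await queues[-1].get()) is not _CLOSED:
        results.append(result)
    await asyncio.gather(*tasks)
    return results

results = asyncio.run(main())
```

Each input item is doubled twice, so two inputs produce eight results, in input order because each stage processes its queue sequentially.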
AsyncPipeLine.__call__(self, it: Union[Iterable, AsyncIterable], fast=None):
Call the pipeline with an iterable. Close `self.inq` after submitting items from `it`. Return an asynchronous iterable (`self.outq`) yielding results.
AsyncPipeLine.close(self):
Close the input queue.
AsyncPipeLine.from_stages(*stage_specs, maxsize=0) -> Tuple[cs.naysync.IterableAsyncQueue, cs.naysync.IterableAsyncQueue]:
Prepare an `AsyncPipeLine` from stage specifications. Return an `(inq,tasks,outq)` 3-tuple being an input `IterableAsyncQueue` to receive items to process, a list of `asyncio.Task`s per stage specification, and an output `IterableAsyncQueue` to produce results. If there are no `stage_specs` the 2 queues are the same queue.

Each stage specification is either:
- a stage function, implying a `batchsize` of `None`
- a 2-tuple of `(stage_func,batchsize)`

The `stage_func` and `batchsize` are as for the `run_stage` method. In particular the `batchsize` should be:
- `None`, for a `stage_func` accepting a single item
- an `int>=0`, for a `stage_func` accepting a list of items
- `StageMode.STREAM`, for a `stage_func` accepting an `IterableAsyncQueue`
AsyncPipeLine.put(self, item):
Put `item` onto the input queue.
AsyncPipeLine.run_stage(inq: cs.naysync.IterableAsyncQueue, stage_func, outq: cs.naysync.IterableAsyncQueue, batchsize: Union[int, NoneType, cs.naysync.StageMode] = None):
Run a pipeline stage, copying items from `inq` to the `stage_func` and putting results onto `outq`. After processing, `outq` is closed.

`stage_func` is a callable which may be:
- a sync or async generator which yields results to place onto `outq`
- a sync or async function which returns a single result

If `batchsize` is `None`, the default, each input item is passed to `stage_func(item)`, which yields the results from the single item.

If `batchsize` is `StageMode.STREAM` then `stage_func` should be a synchronous or asynchronous generator function which receives `inq` as its sole parameter and yields results.

If `batchsize` is an `int`, items from `inq` are collected into batches up to a limit of `batchsize` (no limit if `batchsize` is `0`) and passed to `stage_func(batch)`, which yields the results from the batch of items. If the `batchsize` is `0` the `stage_func` is called exactly once with all the input items, even if there are no input items.
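The batching rules can be illustrated with a small synchronous helper. It assumes that a batch is handed over only when it is full or when the input ends; whether `run_stage` may flush a partial batch earlier is not stated above. `batches` is a hypothetical name, not part of the module.

```python
def batches(items, batchsize):
    """Group `items` into lists of at most `batchsize` items.
    A `batchsize` of 0 means a single batch holding every item."""
    if batchsize == 0:
        # exactly one batch, even when there are no items at all
        yield list(items)
        return
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) >= batchsize:
            yield batch
            batch = []
    if batch:
        # the final, partially filled batch
        yield batch

pairs = list(batches(range(5), 2))
everything = list(batches(range(5), 0))
empty_unlimited = list(batches([], 0))
empty_limited = list(batches([], 2))
```

With no input, a `batchsize` of `0` still yields one empty batch (so the stage function would be called once), while a positive `batchsize` yields nothing.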
AsyncPipeLine.submit(self, it: Union[Iterable, AsyncIterable], fast=None):
Submit the items from `it` to the pipeline.
- `class IterableAsyncQueue(asyncio.queues.Queue)`: An iterable subclass of `asyncio.Queue`.

  This modifies `asyncio.Queue` by:
  - adding a `.close()` async method
  - making the queue iterable, with each iteration consuming an item via `.get()`
IterableAsyncQueue.__anext__(self):
Fetch the next item from the queue.
IterableAsyncQueue.close(self):
Close the queue.
It is not an error to close the queue more than once.
IterableAsyncQueue.get(self):
We do not allow .get().
IterableAsyncQueue.get_nowat(self):
We do not allow .get_nowait().
IterableAsyncQueue.put(self, item):
Put `item` onto the queue.
IterableAsyncQueue.put_nowait(self, item):
Put an item onto the queue without blocking.
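A closable, iterable queue of this kind can be sketched as a small `asyncio.Queue` subclass that enqueues a private marker on close. This is an assumption about one workable design with a single consumer, not the class's actual code; `ClosableQueue` and its attributes are hypothetical names, and the sketch does not disable `.get()`.

```python
import asyncio

class ClosableQueue(asyncio.Queue):
    """An `asyncio.Queue` which can be closed and iterated (sketch)."""

    _CLOSED = object()      # marker placed on the queue by close()
    _sketch_closed = False  # becomes True once close() has been called

    async def close(self):
        # closing more than once is harmless
        if not self._sketch_closed:
            self._sketch_closed = True
            await super().put(self._CLOSED)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await super().get()
        if item is self._CLOSED:
            raise StopAsyncIteration
        return item

async def main():
    q = ClosableQueue()
    for item in (1, 2, 3):
        await q.put(item)
    await q.close()
    await q.close()  # not an error
    return [item async for item in q]

items = asyncio.run(main())
```

Iteration ends when the marker is reached, so a producer signals the end of its output simply by calling `close()`.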
StageMode.__format__
StageMode.__str__
Release Log
Release 20251119: Make most optional parameters keyword only.
Release 20250306:
- AsyncPipeLine.__call__: just return self.outq, use try/finally in submit.
- @agen: new optional `fast` parameter, plumbed to async_iter().
- amap: progressive async consume and dispatch, allowing yield of results as items come in - no longer waits for all items to be dispatched before yielding results.
- amap: new fast=False parameter to indicate that func does not block.
- Some small fixes.
Release 20250103:
- @afunc now accepts an async function and returns it unchanged.
- @agen now accepts an async generator and returns it unchanged.
- async_iter: now accepts an async iterable, return aiter(it) of it directly.
- New AnyIterable = Union[Iterable, AsyncIterable] type alias, to allow both sync and async iterators.
- async_iter: new optional fast=False parameter, if true then iterate the iterator directly instead of via asyncio.to_thread.
- async_iter: make missing `fast=` be True for list/tuple/set and False otherwise.
- @afunc: new optional fast=False parameter - if true then do not divert through asyncio.to_thread.
- New AsyncPipeLine, an asynchronous iterable with a `put` method to provide input for processing.
- New StageMode class with a STREAM enum for streaming stages, implemented in AsyncPipeLine.run_stage.
- New aqget(Queue), an async interface to queue.Queue.get.
- New aqiter(Queue[,sentinel]), an async generator yielding from a queue.Queue.
Release 20241221.1: Doc fix for amap().
Release 20241221:
- Simpler implementation of @afunc.
- Simplify implementation of @agen by using async_iter.
- Docstring improvements.
Release 20241220:
- New async_iter(Iterable) returning an asynchronous iterator of a synchronous iterable.
- New amap(func,iterable) asynchronously mapping a function over an iterable.
Release 20241215:
- @afunc: now uses asyncio.to_thread() instead of wrapping @agen, drop the decorator parameters since no queue or polling are now used.
- @agen: nonpolling implementation - now uses asyncio.to_thread() for the next(genfunc) step, drop the decorator parameters since no queue or polling are now used.
Release 20241214.1: Doc update.
Release 20241214: Initial release with @agen and @afunc decorators.