An attempt at comingling async-code and nonasync-code in an ergonomic way.

Project description

Latest release 20251119: Make most optional parameters keyword only.

One of the difficulties in adapting non-async code for use in an async world is that anything asynchronous needs to be turtles all the way down: a single blocking synchronous call anywhere in the call stack blocks the async event loop.
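
The effect described above can be observed with the standard library alone. The following is a minimal sketch, not part of this module: the helper names `count_ticks`, `blocking` and `threaded` are made up for illustration. A ticker task counts how often the event loop gets control while some work runs, first as a direct blocking call and then via `asyncio.to_thread`.

```python
import asyncio
import time
from contextlib import suppress

async def count_ticks(work):
    """Run `work()` and count how often a background ticker task gets to run."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # give the ticker a chance to start
    await work()
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task
    return ticks

async def blocking():
    time.sleep(0.2)  # synchronous sleep: the event loop is stalled

async def threaded():
    await asyncio.to_thread(time.sleep, 0.2)  # the loop keeps running

async def main():
    return await count_ticks(blocking), await count_ticks(threaded)

blocked_ticks, threaded_ticks = asyncio.run(main())
print("ticks while blocked:", blocked_ticks)
print("ticks while threaded:", threaded_ticks)
```

The ticker never advances during the direct call, because the coroutine does not yield to the loop until the blocking call has returned.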

This module presently provides:

  • @afunc: a decorator to make a synchronous function asynchronous
  • @agen: a decorator to make a synchronous generator asynchronous
  • amap(func,iterable): asynchronous mapping of func over an iterable
  • aqget(q): asynchronous function to get an item from a queue.Queue or similar
  • aqiter(q): asynchronous generator to yield items from a queue.Queue or similar
  • async_iter(iterable): return an asynchronous iterator of an iterable
  • IterableAsyncQueue: an iterable flavour of asyncio.Queue with no get methods
  • AsyncPipeLine: a pipeline of functions connected together with IterableAsyncQueues

Short summary:

  • afunc: A decorator for a synchronous function which turns it into an asynchronous function. If func is already an asynchronous function it is returned unchanged. If fast is true (default False) then func is presumed to consume negligible time and it is simply wrapped in an asynchronous function. Otherwise it is wrapped in asyncio.to_thread.
  • agen: A decorator for a synchronous generator which turns it into an asynchronous generator. If genfunc is already an asynchronous generator it is returned unchanged. Exceptions in the synchronous generator are reraised in the asynchronous generator.
  • amap: An asynchronous generator yielding the results of func(item) for each item in the iterable it.
  • aqget: An asynchronous function to get an item from a queue.Queue-like object q. It must support the .get() and .get_nowait() methods.
  • aqiter: An asynchronous generator to yield items from a queue.Queue-like object q. It must support the .get() and .get_nowait() methods.
  • async_iter: Return an asynchronous iterator yielding items from the iterable it. If it is already an asynchronous iterable, aiter(it) is returned directly.
  • AsyncPipeLine: An AsyncPipeLine is an asynchronous iterable with a put method to provide input for processing.
  • IterableAsyncQueue: An iterable subclass of asyncio.Queue.
  • StageMode: Special modes for AsyncPipeLine pipeline stages.

Module contents:

  • afunc(*da, **dkw): A decorator for a synchronous function which turns it into an asynchronous function. If func is already an asynchronous function it is returned unchanged. If fast is true (default False) then func is presumed to consume negligible time and it is simply wrapped in an asynchronous function. Otherwise it is wrapped in asyncio.to_thread.

    Example:

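    import math
    import time
    
    from cs.naysync import afunc
    
    @afunc
    def func(count):
        # blocking call, so it is run via asyncio.to_thread
        time.sleep(count)
        return count
    
    # inside a coroutine:
    slept = await func(5)
    
    @afunc(fast=True)
    def asqrt(n):
        # negligible run time, so no thread is used
        return math.sqrt(n)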
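
The behaviour described for afunc can be approximated with the standard library. This is a minimal sketch under stated assumptions, not the module's implementation: `to_async`, `slow_identity` and the local `asqrt` are hypothetical names, and only the three documented cases are covered (already-async passthrough, fast inline call, `asyncio.to_thread` otherwise).

```python
import asyncio
import functools
import inspect
import math
import time

def to_async(func=None, *, fast=False):
    """Hypothetical stand-in for an @afunc-style decorator."""
    if func is None:
        # used with parameters, e.g. @to_async(fast=True)
        return functools.partial(to_async, fast=fast)
    if inspect.iscoroutinefunction(func):
        return func  # already asynchronous: return unchanged

    @functools.wraps(func)
    async def wrapper(*a, **kw):
        if fast:
            return func(*a, **kw)  # presumed negligible: call inline
        return await asyncio.to_thread(func, *a, **kw)  # may block: use a thread

    return wrapper

@to_async
def slow_identity(x):
    time.sleep(0.05)
    return x

@to_async(fast=True)
def asqrt(n):
    return math.sqrt(n)

async def main():
    return await asyncio.gather(slow_identity(5), asqrt(16))

results = asyncio.run(main())
print(results)  # [5, 4.0]
```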
    
  • agen(*da, **dkw): A decorator for a synchronous generator which turns it into an asynchronous generator. If genfunc is already an asynchronous generator it is returned unchanged. Exceptions in the synchronous generator are reraised in the asynchronous generator.

    The optional parameter fast is passed through to async_iter.

    Example:

    import time
    
    from cs.naysync import agen
    
    @agen
    def gen(count):
        for i in range(count):
            yield i
            time.sleep(1.0)  # blocking pause between items
    
    # inside a coroutine:
    async for item in gen(5):
        print(item)
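
One way to get this behaviour, including the reraising of exceptions, is to advance the synchronous generator one step at a time in a worker thread. The sketch below assumes that approach; `async_gen` and `countdown` are hypothetical names and this is not the module's code.

```python
import asyncio
import functools

def async_gen(genfunc):
    """Hypothetical @agen-style decorator: step a sync generator in a thread."""
    @functools.wraps(genfunc)
    async def wrapper(*a, **kw):
        it = genfunc(*a, **kw)
        done = object()  # marks exhaustion; StopIteration cannot cross a Future
        while True:
            # an exception raised inside the generator propagates out of this await
            item = await asyncio.to_thread(next, it, done)
            if item is done:
                return
            yield item
    return wrapper

@async_gen
def countdown(n):
    while n > 0:
        yield n
        n -= 1
    raise ValueError("liftoff")

async def main():
    seen = []
    try:
        async for item in countdown(3):
            seen.append(item)
    except ValueError as e:
        seen.append(str(e))  # the generator's exception arrives here
    return seen

seen = asyncio.run(main())
print(seen)  # [3, 2, 1, 'liftoff']
```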
    
  • amap(func: Callable[[Any], Any], it: Union[Iterable, AsyncIterable], *, concurrent=False, unordered=False, indexed=False, fast=False): An asynchronous generator yielding the results of func(item) for each item in the iterable it.

    it may be a synchronous or asynchronous iterable.

    func may be a synchronous or asynchronous callable.

    If concurrent is False (the default), run each func(item) call in series. If concurrent is true run the function calls as asyncio tasks concurrently.

    If unordered is true (default False) yield results as they arrive. Otherwise yield results in the order of the items in it, each as soon as it becomes available. In either case the tasks still evaluate concurrently if concurrent is true.

    If indexed is true (default False) yield 2-tuples of (i,result) instead of just result, where i is the index of each item from it, counting from 0.

    If fast is true (default False) assume that func does not block or otherwise take a long time.

    Example of an async function to fetch URLs in parallel.

    from typing import List
    
    import requests
    
    from cs.naysync import amap
    
    async def get_urls(urls: List[str]):
        """ Fetch `urls` in parallel.
            Yield `(url,response)` 2-tuples.
        """
        async for i, response in amap(
            requests.get, urls,
            concurrent=True, unordered=True, indexed=True,
        ):
            yield urls[i], response
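
The difference between ordered and unordered results can be illustrated with plain asyncio tasks. This is a sketch of the idea only, not how amap is implemented: `map_unordered_indexed` and `slow_square` are hypothetical, with `asyncio.as_completed` standing in for unordered, indexed delivery and `asyncio.gather` for ordered delivery.

```python
import asyncio

async def slow_square(n):
    await asyncio.sleep(n / 20)  # larger inputs finish later
    return n * n

async def map_unordered_indexed(func, items):
    """Yield (index, result) pairs as each concurrent call completes."""
    async def indexed_call(i, item):
        return i, await func(item)

    tasks = [
        asyncio.create_task(indexed_call(i, item))
        for i, item in enumerate(items)
    ]
    for next_done in asyncio.as_completed(tasks):
        yield await next_done

async def main():
    items = [3, 1, 2]
    unordered = [pair async for pair in map_unordered_indexed(slow_square, items)]
    ordered = await asyncio.gather(*(slow_square(n) for n in items))
    return unordered, ordered

unordered, ordered = asyncio.run(main())
print(unordered)  # [(1, 1), (2, 4), (0, 9)]
print(ordered)    # [9, 1, 4]
```

The index in each pair is what lets a caller such as get_urls above match an out-of-order result back to its input.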
    
  • aqget(q: queue.Queue): An asynchronous function to get an item from a queue.Queue-like object q. It must support the .get() and .get_nowait() methods.

  • aqiter(q: queue.Queue, *, sentinel=<object object at 0x1097e5f70>): An asynchronous generator to yield items from a queue.Queue-like object q. It must support the .get() and .get_nowait() methods.

    An optional sentinel object may be supplied, which ends iteration if encountered. If a sentinel is specified then this must be the only consumer of the queue because the sentinel is consumed.
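
The requirement for both .get() and .get_nowait() suggests one plausible strategy: try the nonblocking call first and fall back to a blocking get in a worker thread. The sketch below assumes that strategy and is not taken from the module; `queue_get`, `queue_iter` and `producer` are hypothetical names.

```python
import asyncio
import queue
import threading

async def queue_get(q):
    """Get an item from a queue.Queue without blocking the event loop."""
    try:
        return q.get_nowait()  # fast path: an item is already available
    except queue.Empty:
        return await asyncio.to_thread(q.get)  # slow path: wait in a thread

async def queue_iter(q, sentinel):
    """Yield items from `q` until `sentinel` is received (and consumed)."""
    while True:
        item = await queue_get(q)
        if item is sentinel:
            return
        yield item

def producer(q, sentinel):
    for n in range(3):
        q.put(n)
    q.put(sentinel)

async def main():
    q = queue.Queue()
    end = object()
    threading.Thread(target=producer, args=(q, end)).start()
    return [item async for item in queue_iter(q, end)]

items = asyncio.run(main())
print(items)  # [0, 1, 2]
```

Because the sentinel is removed from the queue when it is seen, a second consumer would never receive it, which is why a sentinel implies a single consumer.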

  • async_iter(it: Union[Iterable, AsyncIterable], *, fast=None): Return an asynchronous iterator yielding items from the iterable it. If it is already an asynchronous iterable, aiter(it) is returned directly.

    If fast is true then it is iterated directly instead of via a distinct async generator. If not specified, fast is set to True if it is a list or tuple or set. A true value for this parameter indicates that fetching the next item from it is always effectively instant and never blocks.
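
The dispatch on fast described above might look like the following. This is an illustrative sketch, not the module's code: `as_async_iter` is a hypothetical name, and the non-fast branch assumes that each `next()` call is made in a worker thread.

```python
import asyncio
from collections.abc import AsyncIterable

def as_async_iter(it, *, fast=None):
    """Hypothetical async_iter-style helper."""
    if isinstance(it, AsyncIterable):
        return it.__aiter__()  # equivalent to aiter(it) on Python 3.10+
    if fast is None:
        # in-memory containers never block when fetching the next item
        fast = isinstance(it, (list, tuple, set))

    async def direct():
        for item in it:
            yield item

    async def threaded():
        iterator = iter(it)
        done = object()  # marks exhaustion of the iterator
        while True:
            item = await asyncio.to_thread(next, iterator, done)
            if item is done:
                return
            yield item

    return direct() if fast else threaded()

async def main():
    from_list = [x async for x in as_async_iter([1, 2, 3])]  # fast path
    from_gen = [x async for x in as_async_iter(n * n for n in range(4))]  # threaded
    return from_list, from_gen

from_list, from_gen = asyncio.run(main())
print(from_list, from_gen)  # [1, 2, 3] [0, 1, 4, 9]
```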

  • class AsyncPipeLine: An AsyncPipeLine is an asynchronous iterable with a put method to provide input for processing.

    A new pipeline is usually constructed via the factory method AsyncPipeLine.from_stages(stage_func,...).

    It has the same methods as an IterableAsyncQueue:

    • async put(item) to queue an item for processing
    • async close() to close the input, indicating end of the input items
    • iteration to consume the processed results

    It also has the following methods:

    • async submit(AnyIterable) to submit multiple items for processing
    • async __call__(AnyIterable) to submit the iterable for processing and consume the results by iteration

    Example:

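    from cs.naysync import AsyncPipeLine
    
    def double(item):
        # a generator stage: yields each input item twice
        yield item
        yield item
    
    pipeline = AsyncPipeLine.from_stages(
        double,
        double,
    )
    # inside a coroutine:
    async for result in pipeline([1,2,3,4]):
        print(result)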
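
The general shape of such a pipeline, stages connected by queues with an end marker passed downstream, can be sketched with plain `asyncio.Queue` objects. This is not the AsyncPipeLine implementation: the `CLOSED` marker and this standalone `run_stage` function are assumptions made for illustration.

```python
import asyncio

CLOSED = object()  # hypothetical end-of-stream marker

async def run_stage(inq, stage_func, outq):
    """Feed each item from `inq` to `stage_func`, putting its results on `outq`."""
    while (item := await inq.get()) is not CLOSED:
        for result in stage_func(item):
            await outq.put(result)
    await outq.put(CLOSED)  # pass the end of stream downstream

def double(item):
    yield item
    yield item

async def main():
    q0, q1, q2 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    stages = [
        asyncio.create_task(run_stage(q0, double, q1)),
        asyncio.create_task(run_stage(q1, double, q2)),
    ]
    for item in [1, 2, 3, 4]:
        await q0.put(item)
    await q0.put(CLOSED)
    results = []
    while (item := await q2.get()) is not CLOSED:
        results.append(item)
    await asyncio.gather(*stages)
    return results

results = asyncio.run(main())
print(results)  # each input appears four times, in input order
```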
    

AsyncPipeLine.__call__(self, it: Union[Iterable, AsyncIterable], fast=None): Call the pipeline with an iterable. Close self.inq after submitting items from it. Return an asynchronous iterable (self.outq) yielding results.

AsyncPipeLine.close(self): Close the input queue.

AsyncPipeLine.from_stages(*stage_specs, maxsize=0) -> Tuple[cs.naysync.IterableAsyncQueue, cs.naysync.IterableAsyncQueue]: Prepare an AsyncPipeLine from stage specifications. Return an (inq,tasks,outq) 3-tuple: an input IterableAsyncQueue to receive items to process, a list of asyncio.Tasks, one per stage specification, and an output IterableAsyncQueue to produce results. If there are no stage_specs the two queues are the same queue.

Each stage specification is either:

  • a stage function, implying a batchsize of None
  • a 2-tuple of (stage_func,batchsize)

The stage_func and batchsize are as for the run_stage method. In particular the batchsize should be:

  • None, for a stage_func accepting a single item
  • an int >=0, for a stage_func accepting a list of items
  • StageMode.STREAM, for a stage_func accepting an IterableAsyncQueue

AsyncPipeLine.put(self, item): Put item onto the input queue.

AsyncPipeLine.run_stage(inq: cs.naysync.IterableAsyncQueue, stage_func, outq: cs.naysync.IterableAsyncQueue, batchsize: Union[int, NoneType, cs.naysync.StageMode] = None): Run a pipeline stage, copying items from inq to the stage_func and putting results onto outq. After processing, outq is closed.

stage_func is a callable which may be:

  • a sync or async generator which yields results to place onto outq
  • a sync or async function which returns a single result

If batchsize is None, the default, each input item is passed to stage_func(item), which yields the results from the single item.

If batchsize is StageMode.STREAM then stage_func should be a synchronous or asynchronous generator function which receives inq as its sole parameter and yields results.

If batchsize is an int, items from inq are collected into batches up to a limit of batchsize (no limit if batchsize is 0) and passed to stage_func(batch), which yields the results from the batch of items. If the batchsize is 0 the stage_func is called exactly once with all the input items, even if there are no input items.
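
The batching rule for an int batchsize can be shown with a small synchronous analogue. This is a sketch, not the module's code: `batches` is a hypothetical helper, and emitting a final short batch, as well as emitting nothing for empty input when batchsize is greater than 0, are assumptions.

```python
def batches(items, batchsize):
    """Group items into lists of at most `batchsize`; 0 means a single unbounded batch."""
    batch = []
    for item in items:
        batch.append(item)
        if batchsize and len(batch) >= batchsize:
            yield batch
            batch = []
    # emit the trailing partial batch; for batchsize 0 always emit exactly one batch
    if batch or batchsize == 0:
        yield batch

print(list(batches([1, 2, 3, 4, 5], 2)))  # [[1, 2], [3, 4], [5]]
print(list(batches([1, 2, 3], 0)))        # [[1, 2, 3]]
print(list(batches([], 0)))               # [[]]
```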

AsyncPipeLine.submit(self, it: Union[Iterable, AsyncIterable], fast=None): Submit the items from it to the pipeline.

  • class IterableAsyncQueue(asyncio.queues.Queue): An iterable subclass of asyncio.Queue.

    This modifies asyncio.Queue by:

    • adding a .close() async method
    • making the queue asynchronously iterable, with each iteration consuming an item via .get()

IterableAsyncQueue.__anext__(self): Fetch the next item from the queue.

IterableAsyncQueue.close(self): Close the queue. It is not an error to close the queue more than once.

IterableAsyncQueue.get(self): We do not allow .get().

IterableAsyncQueue.get_nowait(self): We do not allow .get_nowait().

IterableAsyncQueue.put(self, item): Put item onto the queue.

IterableAsyncQueue.put_nowait(self, item): Put an item onto the queue without blocking.
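
A closable, asynchronously iterable queue of this kind can be sketched as a small `asyncio.Queue` subclass. The `ClosableQueue` class below is hypothetical and is not IterableAsyncQueue itself: it assumes a private end marker is queued on close, and unlike the real class it does not disable .get() and .get_nowait().

```python
import asyncio

class ClosableQueue(asyncio.Queue):
    """Hypothetical closable, async-iterable queue."""

    _END = object()  # private end-of-stream marker

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._close_requested = False

    async def close(self):
        if not self._close_requested:  # closing more than once is harmless
            self._close_requested = True
            await self.put(self._END)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self.get()
        if item is self._END:
            raise StopAsyncIteration
        return item

async def main():
    q = ClosableQueue()
    for n in (1, 2, 3):
        await q.put(n)
    await q.close()
    await q.close()  # second close is a no-op
    return [item async for item in q]

items = asyncio.run(main())
print(items)  # [1, 2, 3]
```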

  • class StageMode(enum.StrEnum): Special modes for AsyncPipeLine pipeline stages.

StageMode.__format__

StageMode.__str__

Release Log

Release 20251119: Make most optional parameters keyword only.

Release 20250306:

  • AsyncPipeLine.__call__: just return self.outq, use try/finally in submit.
  • @agen: new optional fast parameter, plumbed to async_iter().
  • amap: progressive async consume and dispatch, allowing yield of results as items come in - no longer waits for all items to be dispatched before yielding results.
  • amap: new fast=False parameter to indicate that func does not block.
  • Some small fixes.

Release 20250103:

  • @afunc now accepts an async function and returns it unchanged.
  • @agen now accepts an async generator and returns it unchanged.
  • async_iter: now accepts an async iterable, returning aiter(it) directly.
  • New AnyIterable = Union[Iterable, AsyncIterable] type alias, to allow both sync and async iterators.
  • async_iter: new optional fast=False parameter, if true then iterate the iterator directly instead of via asyncio.to_thread.
  • async_iter: make missing fast= be True for list/tuple/set and False otherwise.
  • @afunc: new optional fast=False parameter - if true then do not divert through asyncio.to_thread.
  • New AsyncPipeLine, an asynchronous iterable with a put method to provide input for processing.
  • New StageMode class with a STREAM enum for streaming stages, implement in AsyncPipeLine.run_stage.
  • New aqget(Queue), an async interface to queue.Queue.get.
  • New aqiter(Queue[,sentinel]), an async generator yielding from a queue.Queue.

Release 20241221.1: Doc fix for amap().

Release 20241221:

  • Simpler implementation of @afunc.
  • Simplify implementation of @agen by using async_iter.
  • Docstring improvements.

Release 20241220:

  • New async_iter(Iterable) returning an asynchronous iterator of a synchronous iterable.
  • New amap(func,iterable) asynchronously mapping a function over an iterable.

Release 20241215:

  • @afunc: now uses asyncio.to_thread() instead of wrapping @agen, drop the decorator parameters since no queue or polling are now used.
  • @agen: nonpolling implementation - now uses asyncio.to_thread() for the next(genfunc) step, drop the decorator parameters since no queue or polling are now used.

Release 20241214.1: Doc update.

Release 20241214: Initial release with @agen and @afunc decorators.
