Skip to main content

An attempt at comingling async-code and nonasync-code in an ergonomic way.

Project description

An attempt at comingling async-code and nonasync-code in an ergonomic way.

Latest release 20260415:

  • New to_threadpool() function like to_thread(0 but utilitiing a ThreadPoolExecutor.
  • aqget: new optional tpe:ThreadPoolExecutor parameter to use a thread pool for the to_thread call, if used.
  • New Lock context manager class which can be used from sync and async contexts.

One of the difficulties in adapting non-async code for use in an async world is that anything asynchronous needs to be turtles all the way down: a single blocking synchronous call anywhere in the call stack blocks the async event loop.

This module presently provides:

  • @afunc: a decorator to make a synchronous function asynchronous
  • @agen: a decorator to make a synchronous generator asynchronous
  • amap(func,iterable): asynchronous mapping of func over an iterable
  • aqget(q): asynchronous function to get an item from a queue.Queue or similar
  • aqiter(q): asynchronous generator to yield items from a queue.Queue or similar
  • async_iter(iterable): return an asynchronous iterator of an iterable
  • IterableAsyncQueue: an iterable flavour of asyncio.Queue with no get methods
  • AsyncPipeLine: a pipeline of functions connected together with IterableAsyncQueues

Short summary:

  • afunc: A decorator for a synchronous function which turns it into an asynchronous function. If func is already an asynchronous function it is returned unchanged. If fast is true (default False) then func is presumed to consume negligible time and it is simply wrapped in an asynchronous function. Otherwise it is wrapped in asyncio.to_thread.
  • agen: A decorator for a synchronous generator which turns it into an asynchronous generator. If genfunc already an asynchronous generator it is returned unchanged. Exceptions in the synchronous generator are reraised in the asynchronous generator.
  • amap: An asynchronous generator yielding the results of func(item) for each item in the iterable it.
  • aqget: An asynchronous function to get an item from a queue.Queuelike object q. It must support the .get() and .get_nowait() methods; if requires the .get() is dispatched using to_thread().
  • aqiter: An asynchronous generator to yield items from a queue.Queuelike object q. It must support the .get() and .get_nowait() methods.
  • async_iter: Return an asynchronous iterator yielding items from the iterable it. An asynchronous iterable returns aiter(it) directly.
  • AsyncPipeLine: An AsyncPipeLine is an asynchronous iterable with a put method to provide input for processing.
  • IterableAsyncQueue: An iterable subclass of asyncio.Queue.
  • Lock: A lock object which can be used in synchronous and asynchonous contexts.
  • StageMode: Special modes for AsyncPipeLine pipeline stages.
  • to_threadpool: A variant on asyncio.to_thread which dispatches the thread via a concurrent.futures.ThreadPoolExecutor.

Module contents:

  • afunc(*da, **dkw): A decorator for a synchronous function which turns it into an asynchronous function. If func is already an asynchronous function it is returned unchanged. If fast is true (default False) then func is presumed to consume negligible time and it is simply wrapped in an asynchronous function. Otherwise it is wrapped in asyncio.to_thread.

    Example:

    @afunc
    def func(count):
        time.sleep(count)
        return count
    
    slept = await func(5)
    
    @afunc(fast=True)
    def asqrt(n):
        return math.sqrt(n)
    
  • agen(*da, **dkw): A decorator for a synchronous generator which turns it into an asynchronous generator. If genfunc already an asynchronous generator it is returned unchanged. Exceptions in the synchronous generator are reraised in the asynchronous generator.

    The optional parameter fast is passed through to async_iter.

    Example:

    @agen
    def gen(count):
        for i in range(count):
            yield i
            time.sleep(1.0)
    
    async for item in gen(5):
        print(item)
    
  • amap(func: Callable[[Any], Any], it: Union[Iterable, AsyncIterable], *, concurrent=False, unordered=False, indexed=False, fast=False): An asynchronous generator yielding the results of func(item) for each item in the iterable it.

    it may be a synchronous or asynchronous iterable.

    func may be a synchronous or asynchronous callable.

    If concurrent is False (the default), run each func(item) call in series. If concurrent is true run the function calls as asyncio tasks concurrently.

    If unordered is true (default False) yield results as they arrive, otherwise yield results in the order of the items in it, but as they arrive - tasks still evaluate concurrently if concurrent is true.

    If indexed is true (default False) yield 2-tuples of (i,result) instead of just result, where i is the index if each item from it counting from 0.

    If fast is true (default False) assume that func does not block or otherwise take a long time.

    Example of an async function to fetch URLs in parallel.

    async def get_urls(urls : List[str]):
        """ Fetch `urls` in parallel.
            Yield `(url,response)` 2-tuples.
        """
        async for i, response in amap(
            requests.get, urls,
            concurrent=True, unordered=True, indexed=True,
        ):
            yield urls[i], response
    
  • aqget(q: queue.Queue, tpe: concurrent.futures.thread.ThreadPoolExecutor = None): An asynchronous function to get an item from a queue.Queuelike object q. It must support the .get() and .get_nowait() methods; if requires the .get() is dispatched using to_thread().

    The optional tpe parameter may specify a concurrent.futures.ThreadPoolExecutor to use instead of to_thread. For example, the Lock class uses a thread pool to avoid deadlocking with the general purpose thread pool used by to_thread.

  • aqiter(q: queue.Queue, *, sentinel=<object object at 0x104dca3b0>): An asynchronous generator to yield items from a queue.Queuelike object q. It must support the .get() and .get_nowait() methods.

    An optional sentinel object may be supplied, which ends iteration if encountered. If a sentinel is specified then this must be the only consumer of the queue because the sentinel is consumed.

  • async_iter(it: Union[Iterable, AsyncIterable], *, fast=None): Return an asynchronous iterator yielding items from the iterable it. An asynchronous iterable returns aiter(it) directly.

    If fast is true then it is iterated directly instead of via a distinct async generator. If not specified, fast is set to True if it is a list or tuple or set. A true value for this parameter indicates that fetching the next item from it is always effectively instant and never blocks.

  • class AsyncPipeLine: An AsyncPipeLine is an asynchronous iterable with a put method to provide input for processing.

    A new pipeline is usually constructed via the factory method AsyncPipeLine.from_stages(stage_func,...).

    It has the same methods as an IterableAsyncQueue:

    • async put(item) to queue an item for processing
    • async close() to close the input, indicating end of the input items
    • iteration to consume the processed results

    It also has the following methods:

    • async submit(AnyIterable) to submit multiple items for processing
    • async __call__(AnyIterable) to submit the iterable for processing and consume the results by iteration

    Example:

    def double(item):
        yield item
        yield item
    pipeline = AsyncPipeLine.from_stages(
        double,
        double,
    )
    async for result in pipeline([1,2,3,4]):
        print(result)
    

AsyncPipeLine.__call__(self, it: Union[Iterable, AsyncIterable], fast=None): Call the pipeline with an iterable. Close self.inq after submitting items from it. Return an asynchronous iterable (self.outq) yielding results.

AsyncPipeLine.close(self): Close the input queue.

AsyncPipeLine.from_stages(*stage_specs, maxsize=0) -> Tuple[cs.naysync.IterableAsyncQueue, cs.naysync.IterableAsyncQueue]: Prepare an AsyncPipeLine from stage specifications. Return (inq,tasks,outq) 3-tuple being an input IterableAsyncQueue to receive items to process, a list of asyncio.Tasks per stage specification, and an output IterableAsyncQueue to produce results. If there are no stage_specs the 2 queues are the same queue.

Each stage specification is either:

  • a stage function, implying a batchsize of None
  • a 2-tuple of (stage_func,batchsize) The stage_func and batchsize are as for the run_stage method. In particular the batchsize should be:
  • None, for a stage_func accepting a single item
  • an int >=0, for a stage_func accepting a list of items
  • StageMode.STREAM, for a stage_func accepting an AsyncIterableQueue

AsyncPipeLine.put(self, item): Put item onto the input queue.

AsyncPipeLine.run_stage(inq: cs.naysync.IterableAsyncQueue, stage_func, outq: cs.naysync.IterableAsyncQueue, batchsize: Union[int, NoneType, cs.naysync.StageMode] = None): Run a pipeline stage, copying items from inq to the stage_func and putting results onto outq. After processing, outq is closed.

stage_func is a callable which may be:

  • a sync or async generator which yields results to place onto outq
  • a sync or async function which returns a single result

If batchsize is None, the default, each input item is passed to stage_func(item), which yields the results from the single item.

If batchsize is StageMode.STREAM then stage_func should be a synchronous or asynchronous generator function which receives inq as its sole parameter and yields results.

If batchsize is an int, items from inq are collected into batches up to a limit of batchsize (no limit if batchsize is 0) and passed to stage_func(batch), which yields the results from the batch of items. If the batchsize is 0 the stage_func is called exactly once with all the input items, even if there are no input items.

AsyncPipeLine.submit(self, it: Union[Iterable, AsyncIterable], fast=None): Submit the items from it to the pipeline.

  • class IterableAsyncQueue(asyncio.queues.Queue): An iterable subclass of asyncio.Queue.

    This modifies asyncio.Queue by:

    • adding a .close() async method
    • making the queue iterable, with each iteration consuming an item via .get()

IterableAsyncQueue.__anext__(self): Fetch the next item from the queue.

IterableAsyncQueue.close(self): Close the queue. It is not an error to close the queue more than once.

IterableAsyncQueue.get(self): We do not allow .get().

IterableAsyncQueue.get_nowat(self): We do not allow .get_nowait().

IterableAsyncQueue.put(self, item): Put item onto the queue.

IterableAsyncQueue.put_nowait(self, item): Put an item onto the queue without blocking.

  • class Lock: A lock object which can be used in synchronous and asynchonous contexts.

    The async enter step is built on the aqget() function.

    Example use:

    shared_lock = Lock("my shared lock")
    

    Synchronous use from some thread:

    with shared_lock:
        critical section
    

    Asynchronous use from some event loop:

    async with shared_lock:
        critical section
    

Lock.__init__(self, name=None, *, max_workers=64): Initialise the Lock.

Parameters:

  • name: optional name for the lock
  • mac_workers: optional number of threads in the per-lock thread pool, arbitrarily set to 64 since the defaults for ThreadPoolExecutor seems insanely low and run_in_executor is synchronous!
  • class StageMode(enum.StrEnum): Special modes for AsyncPipeLine pipeline stages.

StageMode.__format__

StageMode.__str__

  • to_threadpool(tpe: concurrent.futures.thread.ThreadPoolExecutor, func, *a, **kw): A variant on asyncio.to_thread which dispatches the thread via a concurrent.futures.ThreadPoolExecutor.

    You can just use the run_in_executor event loop method directly but this is handy if you've already got a thread pool for a specific purpose and code which fits to_thread.

Release Log

Release 20260415:

  • New to_threadpool() function like to_thread(0 but utilitiing a ThreadPoolExecutor.
  • aqget: new optional tpe:ThreadPoolExecutor parameter to use a thread pool for the to_thread call, if used.
  • New Lock context manager class which can be used from sync and async contexts.

Release 20251119: Make most optional parameters keyword only.

Release 20250306:

  • AsyncPipeLine.call: just return self.outq, use try/finally in submit.
  • @agen: new optional fast parameter, plumbed to async_iter().
  • amap: progressive async consume and dispatch, allowing yield of results as items come in - no longer waits for all items to be dispatched before yielding results.
  • amap: new fast=False parameter to indicate that func does not block.
  • Some small fixes.

Release 20250103:

  • @afunc now accepts an async function and returns it unchanged.
  • @agen now accepts an async generator and returns it unchanged.
  • async_iter: now accepts an async iterable, return aiter(it) of it directly.
  • New AnyIterable = Union[Iterable, AsyncIterable] type alias, to allow both sync and async iterators.
  • async_iter: new optional fast=False parameter, if true then iterate the iterator directly instead of via asyncio.to_thread.
  • async_iter: make missing fast= be True for list/tuple/set and False otherwise.
  • @afunc: new optional fast=False parameter - if true then do not divert through asyncio.to_thread.
  • New AsyncPipeLine, an asynchronous iterable with a put method to provide input for processing.
  • New StageMode class with a STREAM enum for streaming stages, implement in AsyncPipeLine.run_stage.
  • New aqget(Queue), an async interface to queue.Queue.get.
  • New aqiter(Queue[,sentinel]), an async generator yielding from a queue.Queue.

Release 20241221.1: Doc fix for amap().

Release 20241221:

  • Simpler implementation of @afunc.
  • Simplify implementation of @agen by using async_iter.
  • Docstring improvements.

Release 20241220:

  • New async_iter(Iterable) returning an asynchronous iterator of a synchronous iterable.
  • New amap(func,iterable) asynchronously mapping a function over an iterable.

Release 20241215:

  • @afunc: now uses asyncio.to_thread() instead of wrapping @agen, drop the decorator parameters since no queue or polling are now used.
  • @agen: nonpolling implementation - now uses asyncio.to_thread() for the next(genfunc) step, drop the decorator parameters since no queue or polling are now used.

Release 20241214.1: Doc update.

Release 20241214: Initial release with @agen and @afunc decorators.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cs_naysync-20260415.tar.gz (13.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

cs_naysync-20260415-py2.py3-none-any.whl (14.2 kB view details)

Uploaded Python 2Python 3

File details

Details for the file cs_naysync-20260415.tar.gz.

File metadata

  • Download URL: cs_naysync-20260415.tar.gz
  • Upload date:
  • Size: 13.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.1

File hashes

Hashes for cs_naysync-20260415.tar.gz
Algorithm Hash digest
SHA256 983a68d512a60ebe02ce6ef01bb7faac34d61e8cbd388e4c5e83d61e82f7ae17
MD5 4ae67d45322ace73049009bcfae7dfd5
BLAKE2b-256 c2083ec96b4f4903bd6bc010f1094985f13c8b555fe04f3c3356509acb43785e

See more details on using hashes here.

File details

Details for the file cs_naysync-20260415-py2.py3-none-any.whl.

File metadata

  • Download URL: cs_naysync-20260415-py2.py3-none-any.whl
  • Upload date:
  • Size: 14.2 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.1

File hashes

Hashes for cs_naysync-20260415-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 19c17070a3b8e06686a3b0b5b3d9ced90f6d1608d63c12d0ac1eed12ac69c950
MD5 2233e1601d68f322a693fe815d8b2e53
BLAKE2b-256 5c25dd7c00bd94c32e44559a2f958c37449e6ca4262f1f780300635b904a6a90

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page