An attempt at comingling async-code and nonasync-code in an ergonomic way.

One of the difficulties in adapting non-async code for use in an async world is that anything asynchronous needs to be turtles all the way down: a single blocking synchronous call anywhere in the call stack blocks the async event loop.
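To see the problem concretely, here is a small stdlib-only sketch. It is not part of cs.naysync, and the function names are hypothetical. It starts three 0.2 second blocking calls concurrently, first by calling time.sleep directly inside a coroutine and then by handing the call to asyncio.to_thread. The direct calls stall the event loop and run one after another. The offloaded calls overlap.

```python
import asyncio
import time


def blocking_work(seconds):
    time.sleep(seconds)  # an ordinary synchronous, blocking call
    return seconds


async def naive(seconds):
    # Calls the blocking function directly: the whole event loop stalls.
    return blocking_work(seconds)


async def offloaded(seconds):
    # Runs the blocking function in a worker thread: the loop stays free.
    return await asyncio.to_thread(blocking_work, seconds)


async def timed(coro_func, n=3, seconds=0.2):
    # Start n calls "concurrently" and measure the total wall-clock time.
    start = time.monotonic()
    await asyncio.gather(*(coro_func(seconds) for _ in range(n)))
    return time.monotonic() - start


naive_elapsed = asyncio.run(timed(naive))          # roughly 0.6s: serialised
offloaded_elapsed = asyncio.run(timed(offloaded))  # roughly 0.2s: overlapped
```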

This module presently provides:

  • @afunc: a decorator to make a synchronous function asynchronous
  • @agen: a decorator to make a synchronous generator asynchronous
  • amap(func,iterable): asynchronous mapping of func over an iterable
  • aqget(q): asynchronous function to get an item from a queue.Queue or similar
  • aqiter(q): asynchronous generator to yield items from a queue.Queue or similar
  • async_iter(iterable): return an asynchronous iterator of an iterable
  • IterableAsyncQueue: an iterable flavour of asyncio.Queue with no get methods
  • AsyncPipeLine: a pipeline of functions connected together with IterableAsyncQueues

afunc(*da, **dkw)

A decorator for a synchronous function which turns it into an asynchronous function. If func is already an asynchronous function it is returned unchanged. If fast is true (default False) then func is presumed to consume negligible time and is simply wrapped in an asynchronous function. Otherwise each call is run via asyncio.to_thread, so that it does not block the event loop.

Example:

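import asyncio
import math
import time

from cs.naysync import afunc

@afunc
def func(count):
    time.sleep(count)  # blocking call, run via asyncio.to_thread
    return count

@afunc(fast=True)
def asqrt(n):
    # negligible cost, so no thread is used
    return math.sqrt(n)

async def main():
    slept = await func(5)
    root = await asqrt(16)

asyncio.run(main())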
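A minimal sketch of how a decorator like @afunc can be built on asyncio.to_thread, for illustration only. afunc_sketch is a hypothetical name. The way it handles both bare and parameterised use is an assumption made for this sketch and does not reproduce the library's (*da, **dkw) mechanism.

```python
import asyncio
import functools
import inspect
import math
import time


def afunc_sketch(func=None, *, fast=False):
    """Hypothetical stand-in for @afunc: use bare or as @afunc_sketch(fast=True)."""
    if func is None:
        # Called with parameters only: return a decorator awaiting the function.
        return functools.partial(afunc_sketch, fast=fast)
    if inspect.iscoroutinefunction(func):
        return func  # already asynchronous: returned unchanged
    if fast:
        @functools.wraps(func)
        async def fast_wrapper(*a, **kw):
            return func(*a, **kw)  # cheap call, run inline on the event loop
        return fast_wrapper

    @functools.wraps(func)
    async def thread_wrapper(*a, **kw):
        # Possibly slow call: run it in a worker thread.
        return await asyncio.to_thread(func, *a, **kw)
    return thread_wrapper


@afunc_sketch
def slow_square(n):
    time.sleep(0.1)
    return n * n


@afunc_sketch(fast=True)
def asqrt(n):
    return math.sqrt(n)


async def main():
    return await slow_square(3), await asqrt(16)


print(asyncio.run(main()))  # (9, 4.0)
```

The fast path avoids the overhead of a thread hop, but it is only safe when the wrapped call really is quick, because it runs on the event loop itself.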

agen(*da, **dkw)

A decorator for a synchronous generator which turns it into an asynchronous generator. If genfunc is already an asynchronous generator it is returned unchanged. Exceptions in the synchronous generator are re-raised in the asynchronous generator.

Example:

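import asyncio
import time

from cs.naysync import agen

@agen
def gen(count):
    for i in range(count):
        yield i
        time.sleep(1.0)  # blocking, but run in a worker thread

async def main():
    async for item in gen(5):
        print(item)

asyncio.run(main())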
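The 20241215 release notes say that @agen runs each next() step via asyncio.to_thread(). The sketch below follows that description using only the standard library. agen_sketch and _END are hypothetical names, and this is not the library's actual code.

```python
import asyncio
import functools
import inspect

_END = object()  # private marker for "generator exhausted"


def agen_sketch(genfunc):
    """Hypothetical stand-in for @agen."""
    if inspect.isasyncgenfunction(genfunc):
        return genfunc  # already an async generator: returned unchanged

    @functools.wraps(genfunc)
    async def wrapper(*a, **kw):
        it = genfunc(*a, **kw)
        while True:
            # next() may block, so run it in a worker thread. Passing a default
            # avoids raising StopIteration, which cannot be propagated through an
            # asyncio Future. Any other exception propagates to the caller.
            item = await asyncio.to_thread(next, it, _END)
            if item is _END:
                return
            yield item
    return wrapper


@agen_sketch
def gen(count):
    for i in range(count):
        yield i


@agen_sketch
def failing():
    yield 1
    raise ValueError("boom")


async def main():
    items = [item async for item in gen(3)]
    try:
        async for _ in failing():
            pass
    except ValueError as exc:  # re-raised from the synchronous generator
        return items, str(exc)
```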

amap(func: Callable[[Any], Any], it: Union[Iterable, AsyncIterable], concurrent=False, unordered=False, indexed=False)

An asynchronous generator yielding the results of func(item) for each item in the iterable it.

it may be a synchronous or asynchronous iterable.

func may be a synchronous or asynchronous callable.

If concurrent is False (the default), run each func(item) call in series.

If concurrent is true, run the function calls concurrently as asyncio tasks. If unordered is true (default False), yield results as they complete. Otherwise, yield results in the order of the items in it, each as soon as it and all the results before it are available. In both cases the tasks still run concurrently if concurrent is true.

If indexed is true (default False), yield 2-tuples of (i,result) instead of just result, where i is the index of each item from it, counting from 0.

Example of an async function to fetch URLs in parallel.

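from typing import List

import requests

from cs.naysync import amap

async def get_urls(urls: List[str]):
    """ Fetch `urls` in parallel.
        Yield `(url,response)` 2-tuples.
    """
    # requests.get is an ordinary synchronous function; amap accepts it
    async for i, response in amap(
        requests.get, urls,
        concurrent=True, unordered=True, indexed=True,
    ):
        # results arrive in completion order, so use i to recover the URL
        yield urls[i], response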
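A simplified sketch of the concurrent, unordered and indexed options, for illustration only. It assumes a synchronous iterable, runs synchronous callables via asyncio.to_thread, and uses hypothetical names (amap_sketch, _call). It is not cs.naysync's implementation. In ordered mode it awaits the tasks in input order. In unordered mode it uses asyncio.as_completed.

```python
import asyncio
import inspect


async def _call(func, item):
    # Await an async callable directly; run a sync one in a worker thread.
    if inspect.iscoroutinefunction(func):
        return await func(item)
    return await asyncio.to_thread(func, item)


async def amap_sketch(func, items, concurrent=False, unordered=False, indexed=False):
    """Hypothetical simplified amap over a synchronous iterable."""
    if not concurrent:
        # One call at a time, in input order.
        for i, item in enumerate(items):
            result = await _call(func, item)
            yield (i, result) if indexed else result
        return

    async def indexed_call(i, item):
        return i, await _call(func, item)

    # Start every call at once as an asyncio task.
    tasks = [asyncio.create_task(indexed_call(i, item)) for i, item in enumerate(items)]
    # Ordered: await the tasks in input order. Unordered: in completion order.
    awaitables = asyncio.as_completed(tasks) if unordered else tasks
    for aw in awaitables:
        i, result = await aw
        yield (i, result) if indexed else result


async def slow_identity(x):
    await asyncio.sleep(x)
    return x


async def collect(**kw):
    return [r async for r in amap_sketch(slow_identity, [0.3, 0.1, 0.2], **kw)]
```

With concurrent=True the ordered run takes about 0.3s in total rather than 0.6s, because all three sleeps overlap. Only the order in which results are yielded differs from the unordered run.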

aqget(q: queue.Queue)

An asynchronous function to get an item from a queue.Queue-like object q. It must support the .get() and .get_nowait() methods.

aqiter(q: queue.Queue, sentinel=<private sentinel object>)

An asynchronous generator to yield items from a queue.Queue-like object q. It must support the .get() and .get_nowait() methods.

An optional sentinel object may be supplied, which ends iteration if encountered. If a sentinel is specified then this must be the only consumer of the queue because the sentinel is consumed.
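A stdlib-only sketch of the idea, for illustration. It tries the non-blocking get_nowait() first and only falls back to a blocking get() in a worker thread when the queue is empty. That strategy is an assumption suggested by the requirement for both methods. aqget_sketch, aqiter_sketch and the producer thread are hypothetical.

```python
import asyncio
import queue
import threading

_NO_SENTINEL = object()  # marks "no sentinel supplied"


async def aqget_sketch(q):
    """Hypothetical aqget: get an item from a queue.Queue without blocking the loop."""
    try:
        return q.get_nowait()  # fast path: an item is already waiting
    except queue.Empty:
        # Slow path: wait for an item in a worker thread.
        return await asyncio.to_thread(q.get)


async def aqiter_sketch(q, sentinel=_NO_SENTINEL):
    """Hypothetical aqiter: yield items from q, stopping at `sentinel` if given."""
    while True:
        item = await aqget_sketch(q)
        if sentinel is not _NO_SENTINEL and item is sentinel:
            return  # the sentinel is consumed, hence the single-consumer rule
        yield item


def producer(q, count, end):
    # An ordinary thread feeding a synchronous queue.
    for i in range(count):
        q.put(i)
    q.put(end)


async def main():
    q = queue.Queue()
    end = object()
    threading.Thread(target=producer, args=(q, 5, end)).start()
    return [item async for item in aqiter_sketch(q, end)]


print(asyncio.run(main()))  # [0, 1, 2, 3, 4]
```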

async_iter(it: Union[Iterable, AsyncIterable], fast=None)

Return an asynchronous iterator yielding items from the iterable it. If it is already an asynchronous iterable, aiter(it) is returned directly.

If fast is true then it is iterated directly instead of via a distinct async generator. If not specified, fast defaults to True when it is a list, tuple or set, and to False otherwise. A true value for this parameter indicates that fetching the next item from it is always effectively instant and never blocks.

Class AsyncPipeLine

An AsyncPipeLine is an asynchronous iterable with a put method to provide input for processing.

A new pipeline is usually constructed via the factory method AsyncPipeLine.from_stages(stage_func,...).

It has the same methods as an IterableAsyncQueue:

  • async put(item) to queue an item for processing
  • async close() to close the input, indicating end of the input items
  • iteration to consume the processed results

It also has the following methods:

  • async submit(AnyIterable) to submit multiple items for processing
  • async __call__(AnyIterable) to submit the iterable for processing and consume the results by iteration

Example:

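import asyncio

from cs.naysync import AsyncPipeLine

def double(item):
    # a generator stage: yield each input item twice
    yield item
    yield item

async def main():
    # construct and consume the pipeline inside the event loop
    pipeline = AsyncPipeLine.from_stages(double, double)
    # two doubling stages: each input item emerges 4 times
    async for result in pipeline([1,2,3,4]):
        print(result)

asyncio.run(main())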

AsyncPipeLine.__call__(self, it: Union[Iterable, AsyncIterable], fast=None): Call the pipeline with an iterable.

AsyncPipeLine.close(self): Close the input queue.

AsyncPipeLine.from_stages(*stage_specs, maxsize=0) -> Tuple[cs.naysync.IterableAsyncQueue, cs.naysync.IterableAsyncQueue]: Prepare an AsyncPipeLine from stage specifications. Return (inq,tasks,outq) 3-tuple being an input IterableAsyncQueue to receive items to process, a list of asyncio.Tasks per stage specification, and an output IterableAsyncQueue to produce results. If there are no stage_specs the 2 queues are the same queue.

Each stage specification is either:

  • a stage function suitable for run_stage, or
  • a 2-tuple of (stage_func,batchsize), where stage_func is a stage function suitable for run_stage and batchsize is an int.

A batchsize of 0 gathers all the items from inq and supplies them to stage_func as a single batch, so stage_func is called exactly once with all the input items, even if there are no input items. A batchsize greater than 0 collects items into batches of up to batchsize items and supplies each batch to stage_func.

AsyncPipeLine.put(self, item): Put item onto the input queue.

AsyncPipeLine.run_stage(inq: cs.naysync.IterableAsyncQueue, stage_func, outq: cs.naysync.IterableAsyncQueue, batchsize: Union[int, NoneType, cs.naysync.StageMode] = None): Run a pipeline stage, copying items from inq to the stage_func and putting results onto outq. After processing, outq is closed.

stage_func is a callable which may be:

  • a sync or async generator which yields results to place onto outq
  • a sync or async function which returns a single result

If batchsize is None, the default, each input item is passed to stage_func(item), which yields the results from the single item.

If batchsize is an int, items from inq are collected into batches up to a limit of batchsize (no limit if batchsize is 0) and passed to stage_func(batch), which yields the results from the batch of items. If the batchsize is 0 the stage_func is called exactly once with all the input items, even if there are no input items.
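A sketch of the stage semantics described above, for illustration only. It makes several assumptions. Plain asyncio.Queue objects stand in for IterableAsyncQueue, and a private _CLOSED marker stands in for close(). Synchronous generator stages run inline for simplicity, although a real implementation may offload them. StageMode.STREAM is not covered. run_stage_sketch and _results are hypothetical names.

```python
import asyncio
import inspect

_CLOSED = object()  # hypothetical end-of-input marker standing in for close()


async def _results(stage_func, arg):
    # Normalise the four kinds of stage_func into one async stream of results.
    if inspect.isasyncgenfunction(stage_func):
        async for result in stage_func(arg):
            yield result
    elif inspect.isgeneratorfunction(stage_func):
        for result in stage_func(arg):  # run inline here, for simplicity
            yield result
    elif inspect.iscoroutinefunction(stage_func):
        yield await stage_func(arg)
    else:
        yield stage_func(arg)


async def run_stage_sketch(inq, stage_func, outq, batchsize=None):
    """Hypothetical run_stage: copy items from inq through stage_func to outq."""
    batch = []
    while (item := await inq.get()) is not _CLOSED:
        if batchsize is None:
            # One call per input item.
            async for result in _results(stage_func, item):
                await outq.put(result)
            continue
        batch.append(item)
        if batchsize > 0 and len(batch) >= batchsize:
            async for result in _results(stage_func, batch):
                await outq.put(result)
            batch = []
    # Flush a final partial batch; batchsize 0 means exactly one call,
    # even when there were no input items at all.
    if batchsize is not None and (batch or batchsize == 0):
        async for result in _results(stage_func, batch):
            await outq.put(result)
    await outq.put(_CLOSED)  # "close" the output queue


def double(item):
    yield item
    yield item


async def main():
    q1, q2, q3 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    stages = [
        asyncio.create_task(run_stage_sketch(q1, double, q2)),
        asyncio.create_task(run_stage_sketch(q2, sum, q3, batchsize=3)),
    ]
    for x in [1, 2, 3, 4]:
        await q1.put(x)
    await q1.put(_CLOSED)
    results = []
    while (result := await q3.get()) is not _CLOSED:
        results.append(result)
    await asyncio.gather(*stages)
    return results  # [4, 8, 8]: sums of [1, 1, 2], [2, 3, 3], [4, 4]
```

Each stage closes its output queue only after its input is exhausted. That is how end-of-input propagates down a chain of stages.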

AsyncPipeLine.submit(self, it: Union[Iterable, AsyncIterable], fast=None): Submit the items from it to the pipeline.

Class IterableAsyncQueue(asyncio.queues.Queue)

An iterable subclass of asyncio.Queue.

This modifies asyncio.Queue by:

  • adding a .close() async method
  • making the queue iterable, with each iteration consuming an item via .get()

IterableAsyncQueue.__anext__(self): Fetch the next item from the queue.

IterableAsyncQueue.close(self): Close the queue. It is not an error to close the queue more than once.

IterableAsyncQueue.get(self): We do not allow .get().

IterableAsyncQueue.get_nowat(self): We do not allow .get_nowait().

IterableAsyncQueue.put(self, item): Put item onto the queue.

IterableAsyncQueue.put_nowait(self, item): Put an item onto the queue without blocking.
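One way to get the iterable-queue behaviour described above, sketched under assumptions. close() enqueues a private end marker. Iteration stops when it sees the marker and puts it back, so later iterations also stop. put() after close() raises RuntimeError, which is a policy chosen for this sketch because the documentation does not say what happens. IterableQueueSketch is a hypothetical name, and unlike the documented class it leaves .get() and .get_nowait() available.

```python
import asyncio


class IterableQueueSketch(asyncio.Queue):
    """Hypothetical sketch: an asyncio.Queue consumed with `async for`."""

    _END = object()  # private end-of-queue marker

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._closed = False

    async def put(self, item):
        if self._closed:
            raise RuntimeError("put() on a closed queue")  # this sketch's policy
        await super().put(item)

    async def close(self):
        # Closing more than once is harmless: only the first call enqueues _END.
        if not self._closed:
            self._closed = True
            await super().put(self._END)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await super().get()
        if item is self._END:
            # Put the marker back so that any later iteration also stops.
            super().put_nowait(item)
            raise StopAsyncIteration
        return item


async def main():
    q = IterableQueueSketch()
    for x in (1, 2, 3):
        await q.put(x)
    await q.close()
    await q.close()  # a second close is not an error
    first = [x async for x in q]
    second = [x async for x in q]  # already drained and closed: empty
    return first, second
```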

Class StageMode(enum.StrEnum)

Special modes for AsyncPipeLine pipeline stages.

StageMode.__format__

StageMode.__str__

Release Log

Release 20250103:

  • @afunc now accepts an async function and returns it unchanged.
  • @agen now accepts an async generator and returns it unchanged.
  • async_iter: now accepts an async iterable, returning aiter(it) directly.
  • New AnyIterable = Union[Iterable, AsyncIterable] type alias, to allow both sync and async iterators.
  • async_iter: new optional fast=False parameter; if true, iterate the iterator directly instead of via asyncio.to_thread.
  • async_iter: if fast= is not specified, make it True for a list/tuple/set and False otherwise.
  • @afunc: new optional fast=False parameter; if true, do not divert through asyncio.to_thread.
  • New AsyncPipeLine, an asynchronous iterable with a put method to provide input for processing.
  • New StageMode class with a STREAM member for streaming stages, implemented in AsyncPipeLine.run_stage.
  • New aqget(Queue), an async interface to queue.Queue.get.
  • New aqiter(Queue[,sentinel]), an async generator yielding from a queue.Queue.

Release 20241221.1: Doc fix for amap().

Release 20241221:

  • Simpler implementation of @afunc.
  • Simplify implementation of @agen by using async_iter.
  • Docstring improvements.

Release 20241220:

  • New async_iter(Iterable) returning an asynchronous iterator of a synchronous iterable.
  • New amap(func,iterable) asynchronously mapping a function over an iterable.

Release 20241215:

  • @afunc: now uses asyncio.to_thread() instead of wrapping @agen, drop the decorator parameters since no queue or polling are now used.
  • @agen: nonpolling implementation - now uses asyncio.to_thread() for the next(genfunc) step, drop the decorator parameters since no queue or polling are now used.

Release 20241214.1: Doc update.

Release 20241214: Initial release with @agen and @afunc decorators.

Download files

Source Distribution

cs_naysync-20250103.tar.gz (11.6 kB)

Built Distribution

cs_naysync-20250103-py3-none-any.whl (11.1 kB)

File details

Details for the file cs_naysync-20250103.tar.gz.

File metadata

  • Download URL: cs_naysync-20250103.tar.gz
  • Size: 11.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.12.7

File hashes

Hashes for cs_naysync-20250103.tar.gz:

  • SHA256: 9c51be6bffb1603d09efd0223aa8833e1e3935dec978635755447842b09f8990
  • MD5: ad87c4df065612ac56bba7f8ea918c28
  • BLAKE2b-256: ffeb7d443029de92dc8c3c47d93c115461253f62f53a8eb0b814303c484c6e88

Details for the file cs_naysync-20250103-py3-none-any.whl.

File metadata

  • Download URL: cs_naysync-20250103-py3-none-any.whl
  • Size: 11.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.12.7

File hashes

Hashes for cs_naysync-20250103-py3-none-any.whl:

  • SHA256: cfc8d1c0994d4f606548bb1826baf5f98f8b802c912135528213ec8e8073a700
  • MD5: 2f2ce74ea9208de0d17638b0027fe79f
  • BLAKE2b-256: 9a99ee1a3c6318a40e415a4decfdccbcd09f91823cf3c519bff0e37963540765
