Aqute

Async QUeue Task Engine

Aqute is a minimalist yet potent Python library specifically designed for hassle-free asynchronous task processing. Leveraging the power of async programming, Aqute offers:

  • Efficient Producer-Consumer Model: Utilizing the Producer-Consumer pattern with multiple workers, Aqute ensures streamlined task distribution and swift concurrent processing.
  • Worker count & Rate Limiting: Regulate the execution rate of tasks and configure the number of workers for concurrent processing, ensuring optimal resource utilization. You can even provide your own rate limiting mechanism.
  • Resilient Retry Mechanism: Tasks that encounter errors can automatically retry, with options to specify which error types should trigger retries. An exception raised in the handler is returned as an error value.
  • Versatile task adding: You can process the whole batch or add tasks on the fly, depending on your needs.
  • Lightweight and simple: Aqute operates efficiently without relying on any external dependencies, ensuring seamless integration and minimal footprint in your projects.

Aqute simplifies task management in asynchronous landscapes, allowing developers to focus on the task logic rather than concurrency challenges.
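
To make the Producer-Consumer idea above concrete, here is a minimal sketch built only on the standard library. It is not Aqute's implementation: `run_workers` and `double` are hypothetical names, and the sketch assumes the handler never raises (no retries, no error wrapping).

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List


async def run_workers(
    handler: Callable[[int], Awaitable[int]],
    items: Iterable[int],
    workers_count: int,
) -> List[int]:
    """Apply handler to every item using a fixed pool of worker tasks."""
    items = list(items)
    queue: asyncio.Queue = asyncio.Queue()
    results: List[int] = [0] * len(items)

    async def worker() -> None:
        # Consumer side: take (position, item) pairs until cancelled
        while True:
            index, item = await queue.get()
            try:
                results[index] = await handler(item)
            finally:
                queue.task_done()

    # Producer side: enqueue every item together with its position
    for index, item in enumerate(items):
        queue.put_nowait((index, item))

    workers = [asyncio.create_task(worker()) for _ in range(workers_count)]
    await queue.join()  # returns once every queued item has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


async def double(i: int) -> int:
    await asyncio.sleep(0.001)
    return i * 2


if __name__ == "__main__":
    print(asyncio.run(run_workers(double, range(10), workers_count=3)))
```

Storing each result at its input position is what keeps the output ordered like the input, even though the workers finish in arbitrary order.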

Install

Python 3.9+ required:

pip install aqute

Quickstart

Apply your async function to each item of an iterable and get back a list of results, each wrapped in an AquteTask and ordered the same way as the input:

import asyncio
from aqute import Aqute

async def main():
    async def handler(i: int) -> int:
        await asyncio.sleep(i / 20)
        return i * 2

    aqute = Aqute(handle_coro=handler, workers_count=2)
    result = await aqute.apply_to_all(range(10))
    # Each item is an AquteTask wrapper; read the handler's return value from its `result` property
    assert [t.result for t in result] == [i * 2 for i in range(10)]

asyncio.run(main())

How to use it?

While a deep dive is available through Aqute's method docstrings, it's not necessary.

Aqute is easy to use for both simple and advanced workflows.

Simple batch processing

The easiest way to use Aqute is the apply_to_all() method:

import asyncio
import logging
from random import randint, random

from aqute.engine import Aqute, AquteError


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')

logger = logging.getLogger("main")

async def handler(i: int) -> str:
    """
    NOTE: This is a mock handler for demonstration purposes.
    Replace the logic below with your own specific task processing logic.
    """
    await asyncio.sleep(0.01 + (0.09) * random())
    
    # Guarantees failure for some of the example tasks
    if i >= 19:
        raise KeyError(f"The Key for {i}")
    
    # Here we have some randomness, so you can see retry after errors in play
    r = randint(1, 101)
    if r >= 80:
        raise ValueError(f"Got big number for {i}")

    return f"success {i}"

async def main():
    # The simplest getting-started example
    input_data = list(range(20))

    # This applies the handler to every item of the iterable and returns a list
    # of task results in the same order as the input
    aqute = Aqute(handle_coro=handler, workers_count=10)
    result = await aqute.apply_to_all(input_data)
    # Each task result is wrapped in AquteTask instance
    assert [t.data for t in result] == input_data

    ...


asyncio.run(main())

Results as an async generator, in completion order

    # Like the previous example, but the result is an async generator and the
    # tasks are yielded in completion order
    input_data = list(range(20))
    aqute = Aqute(handle_coro=handler, workers_count=10)

    done, with_errors = [], []
    # Check the final status of each task via its success field
    async for task in aqute.apply_to_each(input_data):
        if task.success:
            done.append(task)
        else:
            with_errors.append(task)

    assert len(done + with_errors) == len(input_data)
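
The difference between input order and completion order can be seen with plain asyncio. The sketch below does not use Aqute; `slow_echo`, `in_input_order` and `in_completion_order` are hypothetical helpers, and the sleep times are chosen so that later inputs finish first.

```python
import asyncio
from typing import List


async def slow_echo(i: int) -> int:
    # Larger inputs sleep less, so they complete earlier
    await asyncio.sleep((3 - i) * 0.05)
    return i


async def in_input_order() -> List[int]:
    # gather() returns results in the order the awaitables were passed in
    return list(await asyncio.gather(*(slow_echo(i) for i in range(3))))


async def in_completion_order() -> List[int]:
    # as_completed() yields awaitables as they finish
    tasks = [asyncio.create_task(slow_echo(i)) for i in range(3)]
    finished = []
    for next_done in asyncio.as_completed(tasks):
        finished.append(await next_done)
    return finished


if __name__ == "__main__":
    print(asyncio.run(in_input_order()))
    print(asyncio.run(in_completion_order()))
```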

Rate limiting

You can also pass a RateLimiter instance to Aqute for rate limiting:

    from aqute.ratelimiter import TokenBucketRateLimiter

    # Apply a rate limiter allowing 5 handler calls per second.
    input_data = list(range(20))
    r_limit = TokenBucketRateLimiter(5, 1)
    aqute = Aqute(handle_coro=handler, workers_count=10, rate_limiter=r_limit)
    result = []
    async for task in aqute.apply_to_each(input_data):
        result.append(task)

    assert len(result) == len(input_data)

There are two available RateLimiter implementations:

  • TokenBucketRateLimiter: steady rate by default, burstable with the allow_burst option;
  • SlidingRateLimiter: the next call becomes available once enough time has passed since the oldest one.

You can write your own RateLimiter implementation with a different algorithm if needed.
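
For readers unfamiliar with the token bucket idea, the following is a rough, standard-library-only sketch of the algorithm. It is not Aqute's TokenBucketRateLimiter and does not implement Aqute's RateLimiter interface: `SimpleTokenBucket`, `try_acquire` and the `clock` parameter are hypothetical names. This variant starts with a full bucket, so it allows an initial burst.

```python
import asyncio
import time
from typing import Callable


class SimpleTokenBucket:
    """Hypothetical limiter: about `max_rate` acquisitions per `period` seconds."""

    def __init__(
        self,
        max_rate: int,
        period: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._capacity = float(max_rate)
        self._refill_per_second = max_rate / period
        self._tokens = float(max_rate)  # start full: an initial burst is allowed
        self._clock = clock
        self._last = clock()

    def _refill(self) -> None:
        # Add tokens for the elapsed time, never exceeding the capacity
        now = self._clock()
        gained = (now - self._last) * self._refill_per_second
        self._tokens = min(self._capacity, self._tokens + gained)
        self._last = now

    def try_acquire(self) -> bool:
        self._refill()
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False

    async def acquire(self) -> None:
        while not self.try_acquire():
            # Sleep roughly until one whole token has been refilled
            await asyncio.sleep((1 - self._tokens) / self._refill_per_second)
```

The injectable `clock` is only there to make the behaviour easy to check without real waiting; with the default `time.monotonic`, `await bucket.acquire()` can be called before each handler invocation.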

Manual task adding, context manager and error retry

This is most useful when not all of your tasks are available at the start:

    # You can add tasks manually and start/stop aqute with a context
    # manager, and even add tasks on the fly.
    # Aqute retries failed tasks by default. You can set your own retry
    # count (use 0 for no retries) and list the error types to retry;
    # if you list none, all errors are retried.
    aqute = Aqute(
        handle_coro=handler,
        workers_count=10,
        # We will retry 5 more times after the first failure
        retry_count=5,
        # We retry only ValueError here
        specific_errors_to_retry=(ValueError,)
    )
    for i in range(10):
        # You can also use your own task id for identification
        await aqute.add_task(i, task_id=f"My task id: {i}")


    async with aqute:
        await asyncio.sleep(0.1)
        for i in range(10, 15):
            await aqute.add_task(i, task_id=f"My task id: {i}")

        await asyncio.sleep(0.1)
        for i in range(15, 20):
            await aqute.add_task(i, task_id=f"My task id: {i}")

        # Wait for completion once all tasks have been added
        await aqute.wait_till_end()

    # Once aqute has finished, this method extracts all results from the queue
    # and returns them as a list of AquteTask
    for tr in aqute.extract_all_results():
        logger.info(f"{tr.success, tr.error, tr.result}")
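
The retry behaviour described above (retry only selected error types, return the final exception as a value instead of raising) can be sketched in plain asyncio. This is an illustration under stated assumptions, not Aqute's code: `Outcome`, `run_with_retries`, `make_flaky` and `always_key_error` are hypothetical names.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional, Tuple, Type


@dataclass
class Outcome:
    """Hypothetical wrapper, loosely mirroring success/error/result fields."""
    result: Any = None
    error: Optional[BaseException] = None
    tries: int = 0

    @property
    def success(self) -> bool:
        return self.error is None


async def run_with_retries(
    handler: Callable[[Any], Awaitable[Any]],
    data: Any,
    retry_count: int = 3,
    errors_to_retry: Tuple[Type[Exception], ...] = (Exception,),
) -> Outcome:
    outcome = Outcome()
    for _ in range(retry_count + 1):  # first attempt plus retry_count retries
        outcome.tries += 1
        try:
            outcome.result = await handler(data)
            outcome.error = None
            return outcome
        except errors_to_retry as exc:
            outcome.error = exc  # retryable: loop again if attempts remain
        except Exception as exc:
            outcome.error = exc  # not retryable: give up immediately
            return outcome
    return outcome


def make_flaky(fail_times: int) -> Callable[[int], Awaitable[int]]:
    calls = {"n": 0}

    async def flaky(x: int) -> int:
        calls["n"] += 1
        if calls["n"] <= fail_times:
            raise ValueError("temporary failure")
        return x * 2

    return flaky


async def always_key_error(x: int) -> int:
    raise KeyError(x)
```

With `errors_to_retry=(ValueError,)`, a handler that fails twice with ValueError succeeds on the third try, while a KeyError ends the task after a single attempt and is stored in `error`.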

Manual flow management and custom result queue

    # You can manage the whole workflow manually if needed and use your own
    # result queue instance (with limited size for example)
    result_q = asyncio.Queue(5)

    aqute = Aqute(handle_coro=handler, workers_count=10, result_queue=result_q)
    for i in range(10):
        # We can't await here because we would hang: nothing empties the queue yet
        asyncio.create_task(aqute.add_task(i))
    await asyncio.sleep(0.1)

    # Starting the processing
    aqute.start()
    # Sleep long enough for all tasks to possibly finish
    await asyncio.sleep(1)

    # The result queue size limit is respected
    assert result_q.qsize() == 5
    for _ in range(5):
        await result_q.get()

    # Now wait until everything has finished via the dedicated method; this also
    # notifies aqute that we have added all tasks
    await aqute.wait_till_end()
    assert result_q.qsize() == 5
    # Stop the aqute
    await aqute.stop()
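
The size-limited result queue works through ordinary asyncio.Queue backpressure: a `put()` on a full queue waits until a consumer calls `get()`. A small sketch, independent of Aqute (`demo_backpressure` is a hypothetical name):

```python
import asyncio


async def demo_backpressure() -> tuple:
    results: asyncio.Queue = asyncio.Queue(maxsize=5)

    async def producer() -> None:
        for i in range(10):
            await results.put(i)  # waits while the queue already holds 5 items

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.05)

    # The producer is stuck after filling the queue
    size_while_blocked = results.qsize()
    done_while_blocked = task.done()

    # Draining the queue lets the producer continue
    drained = [await results.get() for _ in range(10)]
    await task
    return size_while_blocked, done_while_blocked, drained


if __name__ == "__main__":
    print(asyncio.run(demo_backpressure()))
```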

Even more manual management and internal worker queue size

    # You can configure internal queue size for consumers if you want it to be limited
    aqute = Aqute(
        handle_coro=handler, workers_count=10, input_task_queue_size=2
    )
    for i in range(10):
        await aqute.add_task(i)
    # Call this before awaiting a bare start(), so that aqute knows no more tasks will come
    aqute.set_all_tasks_added()

    aqute_run_aiotask = aqute.start()
    await aqute_run_aiotask
    await aqute.stop()

    assert aqute.result_queue.qsize() == 10

Bare-bones queue via Foreman

If you don't need automatic retries and helpers, you can use Foreman for a bare-bones flow that still supports rate limiting:

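import asyncio
import logging
from random import random

from aqute.worker import Foreman
from aqute.ratelimiter import TokenBucketRateLimiter
# NOTE: this import path is an assumption; adjust it if AquteTask lives elsewhere
from aqute.task import AquteTask

# Logger used in main() below
logger = logging.getLogger("main")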

async def handler(i: int) -> str:
    await asyncio.sleep(0.01 + (0.09) * random())
    return f"Success {i}"

async def main():
    # These are the supported options for Foreman
    foreman = Foreman(
        handle_coro=handler,
        workers_count=10,
        rate_limiter=TokenBucketRateLimiter(5, 1),
        input_task_queue_size=100,
    )
    for i in range(20):
        await foreman.add_task(AquteTask(i, f"{i}"))

    foreman.start()

    result = []
    for _ in range(20):
        # Be aware that status and retries are not relevant here
        # But you can check the error field of output
        r = await foreman.get_handeled_task()
        assert r.error is None
        logger.info(r.result)
        result.append(r)

    # Do not finalize before result extraction
    await foreman.finalize()

asyncio.run(main())

Some caveats

Start load timeout

If no tasks are provided within the configurable timeout (60 seconds by default), Aqute will fail:

    try:
        async with Aqute(
            handle_coro=handler,
            workers_count=10,
            start_timeout_seconds=1,
        ) as aqute:
            await asyncio.sleep(1.2)
    except AquteError as exc:
        logger.error(f"Aqute timed out: {exc}")
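
A start timeout of this kind can be approximated with `asyncio.wait_for` around the first `queue.get()`. The sketch below only illustrates the idea and makes no claim about how Aqute implements it; `StartTimeoutError` and `wait_for_first_task` are hypothetical names.

```python
import asyncio
from typing import Any


class StartTimeoutError(Exception):
    """Hypothetical error type used only in this sketch."""


async def wait_for_first_task(queue: asyncio.Queue, timeout_seconds: float) -> Any:
    try:
        # Give up if nothing arrives in the queue within the timeout
        return await asyncio.wait_for(queue.get(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        raise StartTimeoutError(
            f"no tasks provided within {timeout_seconds} seconds"
        ) from None


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    try:
        await wait_for_first_task(queue, 0.01)
        timed_out = False
    except StartTimeoutError:
        timed_out = True

    queue.put_nowait("job")
    first = await wait_for_first_task(queue, 0.01)
    return timed_out, first
```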

You can't wait on an Aqute that has not been started

    # wait_till_end() raises AquteError if aqute was never started
    aqute = Aqute(handle_coro=handler, workers_count=10)

    try:
        await aqute.wait_till_end()
    except AquteError as exc:
        logger.error(f"Aqute cannot be waited here: {exc}")

Misc

Instance reuse after stop()

    # You can reuse same aqute instance after proper stop() call
    aqute = Aqute(handle_coro=handler, workers_count=5)
    async with aqute:
        for i in range(10):
            await aqute.add_task(i)
        await aqute.wait_till_end()

    async with aqute:
        for i in range(10, 20):
            await aqute.add_task(i)
        await aqute.wait_till_end()

    assert aqute.result_queue.qsize() == 20

Type checking and generics

You should get an error during type checking if you try to use the wrong type with Aqute methods (types are inferred from the handler you provide):

from aqute import Aqute

async def handler(i: int) -> str:
    return f"success {i}"


async def main() -> None:
    aqute = Aqute(
        handle_coro=handler,
        workers_count=10
    )
    # Mypy error: error: Argument 1 to "add_task" of "Aqute" has incompatible type "str"; expected "int"  [arg-type]
    await aqute.add_task("10") 

You can also state the expected input and output types explicitly via the generics mechanism:

from aqute import Aqute

async def handler(i: int) -> str:
    return f"success {i}"


async def main() -> None:
    # Mypy error: Argument "handle_coro" to "Aqute" has incompatible type "Callable[[int], Coroutine[Any, Any, str]]"; expected "Callable[[int], Coroutine[Any, Any, int]]"  [arg-type]
    aqute = Aqute[int, int](
        handle_coro=handler,
        workers_count=10
    )

    await aqute.add_task(123)
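
To show how such inference can work in general, here is a small sketch of a class that is generic over its handler's input and output types, using only `typing`. `TypedRunner` is a hypothetical stand-in and not Aqute's class; a type checker such as mypy would infer `TypedRunner[int, str]` from the handler below and reject `add_task("10")`.

```python
import asyncio
from typing import Awaitable, Callable, Generic, List, TypeVar

TIn = TypeVar("TIn")
TOut = TypeVar("TOut")


class TypedRunner(Generic[TIn, TOut]):
    """Hypothetical runner whose type parameters come from the handler."""

    def __init__(self, handle_coro: Callable[[TIn], Awaitable[TOut]]) -> None:
        self._handle_coro = handle_coro
        self._pending: List[TIn] = []

    def add_task(self, data: TIn) -> None:
        # A type checker compares `data` against the handler's argument type
        self._pending.append(data)

    async def run(self) -> List[TOut]:
        return [await self._handle_coro(item) for item in self._pending]


async def handler(i: int) -> str:
    return f"success {i}"


async def main() -> List[str]:
    runner = TypedRunner(handler)  # inferred as TypedRunner[int, str]
    runner.add_task(1)
    runner.add_task(2)
    return await runner.run()
```

Type parameters have no effect at runtime; the checks happen only when a static type checker is run over the code.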
