A simple asynchronous task queue

aiotaskq

Motivation

Popular asynchronous worker libraries like Celery don't support asyncio and are hard to use for advanced use cases. aiotaskq aims to help users compose tasks in a native async-await manner.

It is also fully typed, for better productivity and correctness.

Give it a try and let us know if you like it. For questions or feedback, feel free to file issues on this repository.

Example Usage

Install aiotaskq

python -m pip install --upgrade pip
pip install aiotaskq

Define a simple app like the following:

tree .
.
└── app
    └── app.py

Where app.py contains the following:

import asyncio

import aiotaskq


@aiotaskq.task
def some_task(b: int) -> int:
    # Some task with high cpu usage
    def _naive_fib(n: int) -> int:
        if n <= 2:
            return 1
        return _naive_fib(n - 1) + _naive_fib(n - 2)
    return _naive_fib(b)


async def main():
    # With the base case n <= 2, _naive_fib(41) == 165580141
    async_result = await some_task.apply_async(41)
    sync_result = some_task(41)
    assert async_result == sync_result
    print(f"sync_result == async_result == {sync_result}. Awesome!")


if __name__ == "__main__":
    asyncio.run(main())

Start redis

docker run --publish 127.0.0.1:6379:6379 redis
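
Before starting the worker, it can help to confirm that something is listening on the published port. The following is a minimal sketch using only the standard library. The helper name `is_port_open` is hypothetical and not part of aiotaskq, and a successful TCP connection only shows that the port accepts connections, not that the server behind it is Redis.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # 127.0.0.1:6379 is the address published by the docker command above.
    print("redis port reachable:", is_port_open("127.0.0.1", 6379))
```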

In a different terminal, start the aiotaskq worker

python -m aiotaskq worker app.app

Then, in a third terminal, run your app

python ./app.py
# Output: sync_result == async_result == 165580141. Awesome!

Advanced usage example

Let's say we want to compose a workflow where we want to break up some of the tasks and run them in parallel:

                    |-- task_2 --> |
                    |-- task_2 --> |     | task_3 --> |
START -> task_1 --> |-- task_2 --> | --> | task_3 --> | --> task_4 --> FINISH
                    |-- task_2 --> |     | task_3 --> |
                    |-- task_2 --> |
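
The shape of this workflow, independent of any task queue, can be sketched with plain `asyncio` and stand-in coroutines. This is only an illustration of the ordering in the diagram: the `stage` helper and `run_workflow` are hypothetical names, and nothing here talks to a worker or to Redis.

```python
import asyncio


async def stage(name: str, arg: str = "") -> str:
    """Stand-in for a real task: yield to the event loop, then return a label."""
    await asyncio.sleep(0)
    return f"{name}({arg})"


async def run_workflow() -> list:
    # task_1 runs alone and must finish before the fan-out starts.
    out_1 = await stage("task_1", "a")
    # Five task_2 calls run concurrently; gather waits for all of them
    # and returns their results in call order.
    out_2 = await asyncio.gather(*(stage("task_2", str(i)) for i in range(5)))
    # Three task_3 calls start only after every task_2 has finished.
    out_3 = await asyncio.gather(*(stage("task_3", str(i)) for i in range(3)))
    # task_4 closes the workflow.
    out_4 = await stage("task_4", "b")
    return [out_1, out_2, out_3, out_4]


if __name__ == "__main__":
    print(asyncio.run(run_workflow()))
```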

Using Celery, we might end up with this:

from celery import Celery, chord

app = Celery()


@app.task
def task_1(*args, **kwargs):
    pass


@app.task
def task_2(*args, **kwargs):
    pass


@app.task
def task_3(*args, **kwargs):
    pass


@app.task
def task_4(*args, **kwargs):
    pass


if __name__ == "__main__":
    step_1 = task_1.si(some_arg="a")
    step_2 = [task_2.si(some_arg=f"{i}") for i in range(5)]
    step_3 = [task_3.si(some_arg=f"{i}") for i in range(3)]
    step_4 = task_4.si(some_arg="b")
    workflow = chord(
        header=step_1,
        body=chord(
            header=step_2,
            body=chord(
                header=step_3,
                body=step_4,
            ),
        ),
    )
    output = workflow.apply_async().get()
    print(output)

Using aiotaskq we may end up with the following:

import asyncio

from aiotaskq import task


@task
def task_1(*args, **kwargs):
    pass


@task
def task_2(*args, **kwargs):
    pass


@task
def task_3(*args, **kwargs):
    pass


@task
def task_4(*args, **kwargs):
    pass


# So far the same as celery

# And now the workflow is just native python, and you're free
# to use any `asyncio` library of your choice to help with composing
# your workflow e.g. `trio` to handle more advanced scenarios like
# error propagation, task cancellation etc.
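async def main():
    # Await each step before starting the next, following the diagram above.
    output_1 = await task_1.apply_async()
    # Unpack the generator so gather receives one awaitable per task call.
    output_2 = await asyncio.gather(*(task_2.apply_async(arg=f"{i}") for i in range(5)))
    output_3 = await asyncio.gather(*(task_3.apply_async(arg=f"{i}") for i in range(3)))
    output_4 = await task_4.apply_async()
    print([output_1, output_2, output_3, output_4])


if __name__ == "__main__":
    asyncio.run(main())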
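
The comment above mentions error propagation and task cancellation. As a rough sketch of what plain `asyncio` offers for both, the example below uses stand-in coroutines (`flaky` and `slow` are hypothetical and do not call aiotaskq). It shows how awaitables behave on the calling side only; whether cancelling an awaited call also stops work already picked up by an aiotaskq worker is not something stated here.

```python
import asyncio


async def flaky(i: int) -> int:
    """Stand-in task (hypothetical): fails for i == 2, otherwise returns i * 10."""
    await asyncio.sleep(0)
    if i == 2:
        raise ValueError(f"task {i} failed")
    return i * 10


async def slow() -> str:
    """Stand-in task (hypothetical) that takes longer than the caller will wait."""
    await asyncio.sleep(10)
    return "done"


async def main() -> tuple:
    # return_exceptions=True collects failures alongside results instead of
    # raising the first exception out of gather.
    results = await asyncio.gather(
        *(flaky(i) for i in range(4)), return_exceptions=True
    )
    ok = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]

    # wait_for cancels the awaited coroutine when the timeout expires.
    try:
        await asyncio.wait_for(slow(), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return ok, failed, timed_out


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Without `return_exceptions=True`, the first exception is raised out of `gather` while the remaining awaitables keep running, so the choice depends on whether one failure should abort the whole step.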

Install

pip install aiotaskq

Development

source ./activate.sh

Tests

In another terminal

./docker.sh

In the main terminal

source ./activate.sh
./test.sh

