
aiotaskq


A simple asynchronous task queue

Motivation

Popular asynchronous worker libraries like Celery don't support asyncio and are hard to use for advanced use cases. aiotaskq aims to help users compose tasks in a native async-await manner.

Plus, it is fully typed for better productivity and correctness.

Give it a try and let us know if you like it. For questions or feedback, feel free to file issues on this repository.

Example Usage

Install aiotaskq

python -m pip install --upgrade pip
pip install aiotaskq

Define a simple app like the following:

tree .
.
└── app
    └── app.py

Where app.py contains the following:

import asyncio

import aiotaskq


@aiotaskq.task()
def some_task(b: int) -> int:
    # Some task with high cpu usage
    def _naive_fib(n: int) -> int:
        if n <= 2:
            return 1
        return _naive_fib(n - 1) + _naive_fib(n - 2)
    return _naive_fib(b)


async def main():
    async_result = await some_task.apply_async(42)
    sync_result = some_task(42)
    assert async_result == sync_result
    print("sync_result == async_result == 165580141. Awesome!")


if __name__ == "__main__":
    asyncio.run(main())

Start Redis

docker run --publish 127.0.0.1:6379:6379 redis
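
Optionally, sanity-check that the broker is reachable before starting the worker. A minimal sketch using redis-py (assuming the broker lives at 127.0.0.1:6379, the address the docker command above publishes):

import asyncio

from redis.asyncio import Redis  # redis-py >= 4.2


async def check_broker():
    # Ping the Redis instance the worker will connect to; raises if unreachable.
    redis = Redis(host="127.0.0.1", port=6379)
    print(await redis.ping())  # True
    await redis.aclose()  # aclose() requires redis-py >= 5; use close() on 4.x


if __name__ == "__main__":
    asyncio.run(check_broker())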

In a different terminal, start the aiotaskq worker

python -m aiotaskq worker app.app

Then, in yet another terminal, run your app

python ./app/app.py
# Output: sync_result == async_result == 165580141. Awesome!
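
Because apply_async returns an awaitable, fanning the same task out concurrently is plain asyncio. A minimal sketch, assuming the app.py above and a running worker (the import path follows the tree layout shown earlier):

import asyncio

from app.app import some_task


async def main():
    # Queue three tasks at once and await all of their results together;
    # apply_async returns an awaitable, so asyncio.gather composes naturally.
    results = await asyncio.gather(*(some_task.apply_async(n) for n in (30, 31, 32)))
    print(results)  # [832040, 1346269, 2178309]


if __name__ == "__main__":
    asyncio.run(main())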

Advanced usage example

Let's say we want to compose a workflow where some of the tasks are broken up and run in parallel:

                    |-- task_2 --> |
                    |-- task_2 --> |     | task_3 --> |
START -> task_1 --> |-- task_2 --> | --> | task_3 --> | --> task_4 --> FINISH
                    |-- task_2 --> |     | task_3 --> |
                    |-- task_2 --> |

Using Celery, we might end up with this:

from celery import Celery, chord

app = Celery()


@app.task
def task_1(*args, **kwargs):
    pass


@app.task
def task_2(*args, **kwargs):
    pass


@app.task
def task_3(*args, **kwargs):
    pass


@app.task
def task_4(*args, **kwargs):
    pass


if __name__ == "__main__":
    step_1 = task_1.si(some_arg="a")
    step_2 = [task_2.si(some_arg=f"{i}") for i in range(5)]
    step_3 = [task_3.si(some_arg=f"{i}") for i in range(3)]
    step_4 = task_4.si(some_arg="b")
    workflow = chord(
        header=step_1,
        body=chord(
            header=step_2,
            body=chord(
                header=step_3,
                body=step_4,
            ),
        ),
    )
    output = workflow.apply_async().get()
    print(output)

Using aiotaskq, we may end up with the following:

import asyncio

from aiotaskq import task


@task()
def task_1(*args, **kwargs):
    pass


@task()
def task_2(*args, **kwargs):
    pass


@task()
def task_3(*args, **kwargs):
    pass


@task()
def task_4(*args, **kwargs):
    pass


# So far the same as celery

# And now the workflow is just native Python, and you're free
# to use any async library of your choice to help with composing
# your workflow, e.g. `trio` to handle more advanced scenarios like
# error propagation, task cancellation etc.
async def main():
    # Each step awaits the previous one, matching the diagram above;
    # task_2 and task_3 fan out in parallel via asyncio.gather.
    step_1 = await task_1.apply_async(some_arg="a")
    step_2 = await asyncio.gather(*(task_2.apply_async(some_arg=f"{i}") for i in range(5)))
    step_3 = await asyncio.gather(*(task_3.apply_async(some_arg=f"{i}") for i in range(3)))
    step_4 = await task_4.apply_async(some_arg="b")
    output = [step_1, step_2, step_3, step_4]
    print(output)


if __name__ == "__main__":
    asyncio.run(main())
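
As a concrete sketch of the error propagation mentioned in the comment above: with stock asyncio, return_exceptions=True lets a failed task surface as a value in the results instead of cancelling its siblings. A hypothetical variant reusing the tasks defined above (the function name is illustrative, not part of aiotaskq's API):

async def main_collecting_errors():
    # With return_exceptions=True, a failed task shows up as an Exception
    # value in the results list instead of cancelling the other tasks.
    results = await asyncio.gather(
        *(task_2.apply_async(some_arg=f"{i}") for i in range(5)),
        return_exceptions=True,
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"task_2({i}) failed: {result!r}")
        else:
            print(f"task_2({i}) -> {result}")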

Install

pip install aiotaskq

Development

source ./activate.sh

Tests

In another terminal

./docker.sh

In the main terminal

source ./activate.sh
./test.sh

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiotaskq-0.0.13.tar.gz (17.7 kB)

Built Distribution

aiotaskq-0.0.13-py3-none-any.whl (19.8 kB)

File details

Details for the file aiotaskq-0.0.13.tar.gz.

File metadata

  • Download URL: aiotaskq-0.0.13.tar.gz
  • Upload date:
  • Size: 17.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.10.14

File hashes

Hashes for aiotaskq-0.0.13.tar.gz

  • SHA256: 45088a1ceb6c6cf28a5045b2c6d628f7df5d3cfe47226aad383971410a1df554
  • MD5: 90b4dff50b02f27097906ac06afcfaee
  • BLAKE2b-256: e7a9d32ce8270de1a0bffa2b580048fd1ef709b9971ef87dcf49bcc5a31309d2


File details

Details for the file aiotaskq-0.0.13-py3-none-any.whl.

File metadata

  • Download URL: aiotaskq-0.0.13-py3-none-any.whl
  • Upload date:
  • Size: 19.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.10.14

File hashes

Hashes for aiotaskq-0.0.13-py3-none-any.whl

  • SHA256: 94cb62c417563cfb860a4ab1913fbcf5c881ae2841d871d44be74590b4b8065b
  • MD5: c492dc8193d4e73c8f295efa3ac71760
  • BLAKE2b-256: 1f3e409d168715d019443611d09e26d6ad8f9c815a614624f0d6985dacabb638

