
Project description

aiotaskq


A simple asynchronous task queue

Motivation

Popular asynchronous worker libraries like Celery don't support async/await and are hard to use for advanced use cases. aiotaskq aims to help users compose tasks in a native async/await manner.

Plus, it is fully typed, for better productivity and correctness.

Give it a try and let us know if you like it. For questions or feedback, feel free to file issues on this repository.

Sample code

  1. Simple Django App

Example Usage

Install aiotaskq

python -m pip install --upgrade pip
pip install aiotaskq

Define a simple app like the following:

tree .
.
└── app
    └── app.py

Where app.py contains the following:

import asyncio

import aiotaskq


@aiotaskq.task()
def some_task(b: int) -> int:
    # Some task with high cpu usage
    def _naive_fib(n: int) -> int:
        if n <= 2:
            return 1
        return _naive_fib(n - 1) + _naive_fib(n - 2)
    return _naive_fib(b)


async def main():
    async_result = await some_task.apply_async(41)
    sync_result = some_task(41)
    assert async_result == sync_result
    print("sync_result == async_result == 165580141. Awesome!")


if __name__ == "__main__":
    asyncio.run(main())
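If you want to double-check the printed value without waiting on the exponential recursion, a fast iterative Fibonacci (a hypothetical helper, not part of aiotaskq) gives the same number:

```python
def fib(n: int) -> int:
    """Iterative Fibonacci with fib(1) == fib(2) == 1, matching _naive_fib above."""
    a, b = 1, 1
    for _ in range(n - 2):
        a, b = b, a + b
    return b

print(fib(41))  # 165580141
```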

Start Redis

docker run --publish 127.0.0.1:6379:6379 redis

In a different terminal, start the aiotaskq worker

python -m aiotaskq worker app.app

Then, in a third terminal, run your app

python ./app.py
# Output: sync_result == async_result == 165580141. Awesome!

Advanced usage example

Let's say we want to compose a workflow where we want to break up some of the tasks and run them in parallel:

                    |-- task_2 --> |
                    |-- task_2 --> |     | task_3 --> |
START -> task_1 --> |-- task_2 --> | --> | task_3 --> | --> task_4 --> FINISH
                    |-- task_2 --> |     | task_3 --> |
                    |-- task_2 --> |
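Independent of either library, the staged fan-out in the diagram above can be sketched with plain asyncio, using placeholder coroutines in place of real tasks:

```python
import asyncio

# Placeholder coroutines standing in for the real tasks.
async def task_1() -> str:
    return "t1"

async def task_2(i: int) -> str:
    return f"t2-{i}"

async def task_3(i: int) -> str:
    return f"t3-{i}"

async def task_4() -> str:
    return "t4"

async def workflow() -> list:
    out_1 = await task_1()                                        # step 1
    out_2 = await asyncio.gather(*(task_2(i) for i in range(5)))  # 5x task_2 in parallel
    out_3 = await asyncio.gather(*(task_3(i) for i in range(3)))  # 3x task_3 in parallel
    out_4 = await task_4()                                        # step 4
    return [out_1, out_2, out_3, out_4]

print(asyncio.run(workflow()))
```

Each `await` marks a step boundary: a step starts only after the previous one has fully finished, while the branches inside a step run concurrently.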

Using Celery, we might end up with this:

from celery import Celery, chord

app = Celery()


@app.task
def task_1(*args, **kwargs):
    pass


@app.task
def task_2(*args, **kwargs):
    pass


@app.task
def task_3(*args, **kwargs):
    pass


@app.task
def task_4(*args, **kwargs):
    pass


if __name__ == "__main__":
    step_1 = task_1.si(some_arg="a")
    step_2 = [task_2.si(some_arg=f"{i}") for i in range(5)]
    step_3 = [task_3.si(some_arg=f"{i}") for i in range(3)]
    step_4 = task_4.si(some_arg="b")
    workflow = chord(
        header=step_1,
        body=chord(
            header=step_2,
            body=chord(
                header=step_3,
                body=step_4,
            ),
        ),
    )
    output = workflow.apply_async().get()
    print(output)
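One caveat if you try the Celery version: `chord` requires a result backend to be configured, otherwise the callback never fires. A minimal setup might look like the following (the broker/backend URLs are assumptions; adjust to your environment):

```python
from celery import Celery

# chord needs a result backend; these Redis URLs are illustrative only.
app = Celery(
    "example",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)
```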

Using aiotaskq we may end up with the following:

import asyncio

from aiotaskq import task


@task()
def task_1(*args, **kwargs):
        pass


@task()
def task_2(*args, **kwargs):
        pass


@task()
def task_3(*args, **kwargs):
        pass


@task()
def task_4(*args, **kwargs):
        pass


# So far the same as celery

# And now the workflow is just native python, and you're free
# to use any `asyncio` library of your choice to help with composing
# your workflow e.g. `trio` to handle more advanced scenarios like
# error propagation, task cancellation etc.
async def main():
    step_1 = await task_1.apply_async()
    step_2 = await asyncio.gather(*(task_2.apply_async(arg=f"{i}") for i in range(5)))
    step_3 = await asyncio.gather(*(task_3.apply_async(arg=f"{i}") for i in range(3)))
    step_4 = await task_4.apply_async()
    output = [step_1, step_2, step_3, step_4]
    print(output)


if __name__ == "__main__":
    asyncio.run(main())
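To illustrate the error-propagation point from the comments above: plain `asyncio.gather` already lets you choose between failing fast and collecting per-branch errors (placeholder coroutines again, not aiotaskq tasks):

```python
import asyncio

async def ok(i: int) -> int:
    return i * 2

async def boom(i: int) -> int:
    raise ValueError(f"task {i} failed")

async def main() -> list:
    # return_exceptions=True collects exceptions as results instead of raising
    # the first one, so one failed branch doesn't abort the whole fan-out.
    return await asyncio.gather(ok(1), boom(2), ok(3), return_exceptions=True)

print(asyncio.run(main()))  # [2, ValueError('task 2 failed'), 6]
```

Without `return_exceptions=True`, the first exception propagates out of the `gather` call, which is the fail-fast behavior you usually want for strictly sequential workflows.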

Install

pip install aiotaskq

Development

source ./activate.sh

Tests

In another terminal

./docker.sh

In the main terminal

source ./activate.sh
./test.sh
