A simple asynchronous task queue
Project description
aiotaskq
A simple asynchronous task queue
Motivation
Popular asynchronous worker library like Celery doesn't support asyncio and is hard to use for advanced usage. aiotaskq
aims to help users compose tasks in a very native async-await manner.
Plus, it is also fully-typed for better productivity and correctness.
Give it a try and let us know if you like it. For questions or feedback feel to file issues on this repository.
Example Usage
Install aiotaskq
python -m pip install --upgrade pip
pip install aiotaskq
Define a simple app like the following:
tree .
.
└── app
└── app.py
Where app.py
contains the following:
import asyncio
import aiotaskq
@aiotaskq.task()
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
if n <= 2:
return 1
return _naive_fib(n - 1) + _naive_fib(n - 2)
return _naive_fib(b)
async def main():
async_result = await some_task.apply_async(42)
sync_result = some_task(42)
assert async_result == sync_result
print(f"sync_result == async_result == 165580141. Awesome!")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Start redis
docker run --publish 127.0.0.1:6379:6379 redis
In a different terminal, start the aiotaskq worker
python -m aiotaskq worker app.app
Then in another different terminal, run your app
python ./app.py
# Output: sync_result == async_result == 165580141. Awesome!
Advanced usage example
Let's say we want to compose a workflow where we want to break up some of the tasks and run them in parallel:
|-- task_2 --> |
|-- task_2 --> | | task_3 --> |
START -> task_1 --> |-- task_2 --> | --> | task_3 --> | --> task_4 --> FINISH
|-- task_2 --> | | task_3 --> |
|-- task_2 --> |
Using celery
we might end up with this
from celery import Celery
app = Celery()
@app.task
def task_1(*args, **kwargs):
pass
@app.task
def task_2(*args, **kwargs):
pass
@app.task
def task_3(*args, **kwargs):
pass
@app.task
def task_4(*args, **kwargs):
pass
if __name__ == "__main__":
step_1 = task_1.si(some_arg="a")
step_2 = [task_2.si(some_arg=f"{i}") for i in range(5)]
step_3 = [task_3.si(some_arg=f"{i}") for i in range(3)]
step_4 = task_4.si(some_arg="b")
workflow = chord(
header=step_1,
body=chord(
header=step_2,
body=chord(
header=step_3,
body=step_4,
),
),
)
output = workflow.apply_async().get()
print(output)
Using aiotaskq
we may end up with the following:
import asyncio
from aiotaskq import task
@task()
def task_1(*args, **kwargs):
pass
@task()
def task_2(*args, **kwargs):
pass
@task()
def task_3(*args, **kwargs):
pass
@task()
def task_4(*args, **kwargs):
pass
# So far the same as celery
# And now the workflow is just native python, and you're free
# to use any `asyncio` library of your choice to help with composing
# your workflow e.g. `trio` to handle more advanced scenarios like
# error propagation, task cancellation etc.
if __name__ == "__main__":
step_1 = task_1.apply_async()
step_2 = asyncio.gather(task_2.apply_async(arg=f"{i}" for i in range(5)))
step_3 = asyncio.gather(task_3.apply_async(arg=f"{i}" for i in range(3)))
step_4 = task_4.apply_async()
workflow = [step_1, step_2, step_3, step_4]
output = await asyncio.gather(workflow)
print(output)
Install
pip install aiotaskq
Development
source ./activate.sh
Tests
In another terminal
./docker.sh
In the main terminal
source ./activate.sh
./test.sh
Links
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file aiotaskq-0.0.13.tar.gz
.
File metadata
- Download URL: aiotaskq-0.0.13.tar.gz
- Upload date:
- Size: 17.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.14
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 45088a1ceb6c6cf28a5045b2c6d628f7df5d3cfe47226aad383971410a1df554 |
|
MD5 | 90b4dff50b02f27097906ac06afcfaee |
|
BLAKE2b-256 | e7a9d32ce8270de1a0bffa2b580048fd1ef709b9971ef87dcf49bcc5a31309d2 |
File details
Details for the file aiotaskq-0.0.13-py3-none-any.whl
.
File metadata
- Download URL: aiotaskq-0.0.13-py3-none-any.whl
- Upload date:
- Size: 19.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.14
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 94cb62c417563cfb860a4ab1913fbcf5c881ae2841d871d44be74590b4b8065b |
|
MD5 | c492dc8193d4e73c8f295efa3ac71760 |
|
BLAKE2b-256 | 1f3e409d168715d019443611d09e26d6ad8f9c815a614624f0d6985dacabb638 |