aiotaskq
A simple asynchronous task queue
Motivation
Popular asynchronous worker libraries like Celery don't support async-await and are hard to use for advanced workflows. aiotaskq aims to help users compose tasks in a native async-await manner.
It is also fully typed, for better productivity and correctness.
Give it a try and let us know if you like it. For questions or feedback, feel free to file issues on this repository.
Sample code
Example Usage
Install aiotaskq
python -m pip install --upgrade pip
pip install aiotaskq
Define a simple app like the following:
tree .
.
└── app
    └── app.py
Where app.py contains the following:
import asyncio

import aiotaskq


@aiotaskq.task()
def some_task(b: int) -> int:
    # Some task with high cpu usage
    def _naive_fib(n: int) -> int:
        if n <= 2:
            return 1
        return _naive_fib(n - 1) + _naive_fib(n - 2)

    return _naive_fib(b)


async def main():
    # Run the task on a worker and await its result
    async_result = await some_task.apply_async(42)
    # Run the same task directly in this process
    sync_result = some_task(42)
    assert async_result == sync_result
    print(f"sync_result == async_result == {sync_result}. Awesome!")


if __name__ == "__main__":
    asyncio.run(main())
Start redis
docker run --publish 127.0.0.1:6379:6379 redis
In a different terminal, start the aiotaskq worker
python -m aiotaskq worker app.app
Then, in a third terminal, run your app
python ./app.py
# Output: sync_result == async_result == 267914296. Awesome!
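The example calls the same task in two ways: `some_task(42)` runs the function directly in the current process, while `some_task.apply_async(42)` hands it off and awaits the result. The toy below mimics only that calling convention inside a single process, with a thread standing in for the Redis-backed worker. It is not aiotaskq's implementation; `ToyTask`, `toy_task`, and `add` are hypothetical names made up for this sketch.

```python
import asyncio
from typing import Callable, Generic, TypeVar

R = TypeVar("R")


class ToyTask(Generic[R]):
    """Hypothetical stand-in offering a direct call and an awaitable call."""

    def __init__(self, func: Callable[..., R]) -> None:
        self._func = func

    def __call__(self, *args, **kwargs) -> R:
        # Direct call: run the wrapped function in the current process.
        return self._func(*args, **kwargs)

    async def apply_async(self, *args, **kwargs) -> R:
        # Awaitable call: a thread stands in for the remote worker here.
        return await asyncio.to_thread(self._func, *args, **kwargs)


def toy_task() -> Callable[[Callable[..., R]], ToyTask[R]]:
    """Decorator factory, used as `@toy_task()` to mirror `@aiotaskq.task()`."""
    return ToyTask


@toy_task()
def add(x: int, y: int) -> int:
    return x + y


async def demo() -> tuple[int, int]:
    async_result = await add.apply_async(2, 3)
    sync_result = add(2, 3)
    return async_result, sync_result


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the return type in a `TypeVar` means a type checker can infer `int` for both `add(2, 3)` and `await add.apply_async(2, 3)`, which is the kind of benefit a fully typed task API is meant to give.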
Advanced usage example
Let's say we want to compose a workflow in which some stages fan out into several tasks that run in parallel:
                    |-- task_2 --> |
                    |-- task_2 --> |     | task_3 --> |
START -> task_1 --> |-- task_2 --> | --> | task_3 --> | --> task_4 --> FINISH
                    |-- task_2 --> |     | task_3 --> |
                    |-- task_2 --> |
Using Celery, we might end up with this:
from celery import Celery, chord

app = Celery()


@app.task
def task_1(*args, **kwargs):
    pass


@app.task
def task_2(*args, **kwargs):
    pass


@app.task
def task_3(*args, **kwargs):
    pass


@app.task
def task_4(*args, **kwargs):
    pass


if __name__ == "__main__":
    # Build immutable signatures for each stage of the workflow
    step_1 = task_1.si(some_arg="a")
    step_2 = [task_2.si(some_arg=f"{i}") for i in range(5)]
    step_3 = [task_3.si(some_arg=f"{i}") for i in range(3)]
    step_4 = task_4.si(some_arg="b")
    # Nest chords so that each stage starts only after the previous one
    workflow = chord(
        header=step_1,
        body=chord(
            header=step_2,
            body=chord(
                header=step_3,
                body=step_4,
            ),
        ),
    )
    output = workflow.apply_async().get()
    print(output)
Using aiotaskq, we may end up with the following:
import asyncio

from aiotaskq import task


@task()
def task_1(*args, **kwargs):
    pass


@task()
def task_2(*args, **kwargs):
    pass


@task()
def task_3(*args, **kwargs):
    pass


@task()
def task_4(*args, **kwargs):
    pass


# So far the same as celery

# And now the workflow is just native python, and you're free
# to use any `asyncio` library of your choice to help with composing
# your workflow e.g. `trio` to handle more advanced scenarios like
# error propagation, task cancellation etc.
async def main():
    # Each stage is awaited before the next one starts, matching the diagram
    output_1 = await task_1.apply_async()
    output_2 = await asyncio.gather(*(task_2.apply_async(arg=f"{i}") for i in range(5)))
    output_3 = await asyncio.gather(*(task_3.apply_async(arg=f"{i}") for i in range(3)))
    output_4 = await task_4.apply_async()
    print([output_1, output_2, output_3, output_4])


if __name__ == "__main__":
    asyncio.run(main())
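The staged shape from the diagram can be tried without Redis or a worker by swapping each task for a plain coroutine. In the sketch below, `fake_task` is a hypothetical stand-in for a `task_N.apply_async(...)` call (it is not part of aiotaskq), each stage is awaited before the next one starts, and `asyncio.gather` provides the fan-out inside a stage.

```python
import asyncio


async def fake_task(name: str, arg: str = "") -> str:
    # Hypothetical stand-in for `some_task.apply_async(...)`: yields control
    # once to simulate I/O, then returns a label describing the call.
    await asyncio.sleep(0)
    return f"{name}({arg})"


async def run_workflow() -> list:
    # Stage 1: a single task.
    out_1 = await fake_task("task_1", "a")
    # Stage 2: five task_2 calls run concurrently; gather keeps input order.
    out_2 = await asyncio.gather(*(fake_task("task_2", str(i)) for i in range(5)))
    # Stage 3: three task_3 calls run concurrently.
    out_3 = await asyncio.gather(*(fake_task("task_3", str(i)) for i in range(3)))
    # Stage 4: a single task closes the workflow.
    out_4 = await fake_task("task_4", "b")
    return [out_1, out_2, out_3, out_4]


if __name__ == "__main__":
    print(asyncio.run(run_workflow()))
```

Because the stages are ordinary awaits, an exception raised in any stage surfaces at that await and the later stages are never started.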
Install
pip install aiotaskq
Development
source ./activate.sh
Tests
In another terminal
./docker.sh
In the main terminal
source ./activate.sh
./test.sh
File details
Details for the file aiotaskq-0.0.17.tar.gz.
File metadata
- Download URL: aiotaskq-0.0.17.tar.gz
- Upload date:
- Size: 18.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.10.17
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | fb2fb5f01f7da2ab694c6bbf90344fa48903cd1e944d3240ece0ee386430e00d |
| MD5 | 57ac45acf1532d19111749d32a3a2659 |
| BLAKE2b-256 | 78bb86eb47570b69b7e204f9f6e2e7431ca6e0bb9f7c22df3d9ab0f9bc73d2a7 |
File details
Details for the file aiotaskq-0.0.17-py3-none-any.whl.
File metadata
- Download URL: aiotaskq-0.0.17-py3-none-any.whl
- Upload date:
- Size: 21.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.10.17
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 803982a7129adf06d8bb5bda91ed819bd7a9486ccb7355e02897bf96853a55dd |
| MD5 | 5e45cbc0e9a79a7e4472c293c6ba984e |
| BLAKE2b-256 | aec9e1bc6748b55e7df40241054d4f8b31a3bca686409cff9f0f25fb2918dd3d |