A simple library for running complex DAG of async tasks while ensuring maximum possible parallelism
Project description
async-dag
A simple library for running complex DAG of async tasks while ensuring maximum possible parallelism.
Use case and example
Lets assume that you have the following task dependencies graph (each task is an async function that could take time to resolve, and some tasks may depends on each other):
graph TD;
FastTask_A-->SlowTask_B;
SlowTask_B-->EndTask;
SlowTask_A-->FastTask_C;
FastTask_B-->FastTask_C;
FastTask_C-->EndTask;
A naive way to write something like this would be:
await end_task(await slow_task_b(await fast_task_a()), await fast_task_c(await slow_task_a(), await fast_task_b()))
But that would be bad because we will miss a lot of opportunities to run tasks in parallel.
A better version would be:
fast_task_a_res, slow_task_a_res, fast_task_b_res = await asyncio.gather(fast_task_a(), slow_task_a(), fast_task_b())
await end_task(await slow_task_b(fast_task_a_res), await fast_task_c(slow_task_a_res, fast_task_b_res))
Where we run fast_task_a_res, slow_task_a_res, and fast_task_b_res in parallel.
but this is still suboptimal because we can start executing either slow_task_b once fast_task_a ends or fast_task_c once both slow_task_a and fast_task_b.
The optimal way to run this flow would be:
async def _left_branch():
return await slow_task_b(await fast_task_a())
async def _right_branch():
slow_task_a_res, fast_task_b_res = await asyncio.gather(
slow_task_a(), fast_task_b()
)
return await fast_task_c(slow_task_a_res, fast_task_b_res)
async def _end_node():
left_branch_res, right_branch_res = await asyncio.gather(
_left_branch(), _right_branch()
)
return await end_task(left_branch_res, right_branch_res)
await _end_node()
Which is very cumbersome and error prone to write by hand.
Using async_dag you can just write:
from async_dag import build_dag
# Define your DAG
with build_dag() as tm:
fast_task_a_node = tm.add_node(fast_task_a)
slow_task_b_node = tm.add_node(slow_task_b, fast_task_a_node)
slow_task_a_node = tm.add_node(slow_task_a)
fast_task_b_node = tm.add_node(fast_task_b)
fast_task_c_node = tm.add_node(fast_task_c, slow_task_a_node, fast_task_b_node)
end_task_node = tm.add_node(end_task, slow_task_b_node, fast_task_c_node)
# Execute your DAG
execution_result = await tm.invoke(None)
# Extract the return value of `end_task`
end_task_result = end_task_node.extract_result(execution_result)
And enjoy from maximum parallelism without the hassle.
Docs and more examples
We use Docstring in order to describe our API, the main concepts you need to know are:
- The TaskManager class.
- The add_node method on
TaskManager. - The parameter_node property of
TaskManager. - The sort method of
TaskManager. - The invoke method of
TaskManager. - The extract_result method of
TaskNode. - The build_dag helper function.
For a full example take a look at examples/readme.py.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file async_dag-0.3.1.tar.gz.
File metadata
- Download URL: async_dag-0.3.1.tar.gz
- Upload date:
- Size: 15.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
051b1be402cc6933eadc1f5b98c8dca0f70d77d1b21eab55b87179ea998fd457
|
|
| MD5 |
25673382f800df1d2ff5a7a57fa08c7e
|
|
| BLAKE2b-256 |
c34cd465599e4d4625dbcf569aa1981f3b1196c44864bd26814150227edd80bd
|
File details
Details for the file async_dag-0.3.1-py3-none-any.whl.
File metadata
- Download URL: async_dag-0.3.1-py3-none-any.whl
- Upload date:
- Size: 9.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5f543bea985dd42f4be7a425de5c21d3744f1833c469c2ae549f5522f0d905c6
|
|
| MD5 |
cce0efe6ad335106baa0e0d89678b1ae
|
|
| BLAKE2b-256 |
8d886489a4b20c4f46ae0d4106d34a47487313ad90b816e51474655bfb67ca7a
|