async-dag
A simple library for running complex DAGs of async tasks.
Use case example
Let's assume you have the following task dependency graph:
```mermaid
graph TD;
    FastTask_A-->SlowTask_B;
    SlowTask_B-->EndTask;
    SlowTask_A-->FastTask_C;
    FastTask_B-->FastTask_C;
    FastTask_C-->EndTask;
```
The optimal way to run this flow would be:
- Run FastTask_A, FastTask_B, and SlowTask_A all at once.
- As soon as FastTask_A ends, start executing SlowTask_B.
- As soon as SlowTask_A and FastTask_B end, start executing FastTask_C.
- As soon as SlowTask_B and FastTask_C end, start executing EndTask.
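The sketch below puts rough numbers on this schedule. It computes each task's earliest finish time, which is the longest path through the graph when every task starts the moment its dependencies end. The delays come from the code example further down: FastTask_A 0.1 s, FastTask_B 0.2 s, FastTask_C 0.1 s, SlowTask_A 0.5 s, SlowTask_B 1 s, EndTask 0.1 s.

For comparison, it also computes the "gather the three root tasks first" approach. That calculation assumes SlowTask_B and FastTask_C are then run concurrently. The names `DEPS`, `DURATIONS`, and `earliest_finish` are illustrative only, and scheduling overhead is ignored.

```python
# Task durations in seconds, mirroring the delays used in the code example.
DURATIONS = {
    "FastTask_A": 0.1,
    "SlowTask_B": 1.0,
    "SlowTask_A": 0.5,
    "FastTask_B": 0.2,
    "FastTask_C": 0.1,
    "EndTask": 0.1,
}

# Each task mapped to the tasks it depends on (the edges of the graph above).
DEPS = {
    "FastTask_A": [],
    "SlowTask_A": [],
    "FastTask_B": [],
    "SlowTask_B": ["FastTask_A"],
    "FastTask_C": ["SlowTask_A", "FastTask_B"],
    "EndTask": ["SlowTask_B", "FastTask_C"],
}


def earliest_finish(deps, durations):
    """Finish time of each task if it starts as soon as all its dependencies end."""
    finish = {}

    def visit(task):
        if task not in finish:
            # A task can start only when its slowest dependency has finished.
            start = max((visit(dep) for dep in deps[task]), default=0.0)
            finish[task] = start + durations[task]
        return finish[task]

    for task in deps:
        visit(task)
    return finish


optimal = earliest_finish(DEPS, DURATIONS)["EndTask"]

# Gather-first: wait for all three root tasks, then run SlowTask_B and
# FastTask_C concurrently, then run EndTask.
stage_1 = max(DURATIONS["FastTask_A"], DURATIONS["SlowTask_A"], DURATIONS["FastTask_B"])
stage_2 = max(DURATIONS["SlowTask_B"], DURATIONS["FastTask_C"])
gather_first = stage_1 + stage_2 + DURATIONS["EndTask"]

print(f"optimal: {optimal:.1f}s, gather-first: {gather_first:.1f}s")
# prints: optimal: 1.2s, gather-first: 1.6s
```

Under these assumptions the optimal schedule finishes in about 1.2 s, while the gather-first version takes about 1.6 s. The difference comes from SlowTask_B, which has to wait for SlowTask_A (0.5 s) even though it only depends on FastTask_A.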
Creating this flow in code isn't trivial and requires managing tasks manually. In my experience, most people miss the performance benefit of starting SlowTask_B as soon as possible, because it's easy to just write `gather(FastTask_A, SlowTask_A, FastTask_B)`.
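The sketch below shows roughly what managing the tasks manually can look like with only the standard library. It is not async-dag's implementation. It uses `graphlib.TopologicalSorter` (Python 3.9+) to track which tasks are ready. It uses `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)` to react as soon as any running task finishes. `run_task`, `run_graph`, `DEPS`, and `DELAYS` are hypothetical names, and the delays mirror the code example.

```python
import asyncio
from graphlib import TopologicalSorter

# Each task mapped to the tasks it depends on (the edges of the graph above).
DEPS = {
    "FastTask_A": [],
    "SlowTask_A": [],
    "FastTask_B": [],
    "SlowTask_B": ["FastTask_A"],
    "FastTask_C": ["SlowTask_A", "FastTask_B"],
    "EndTask": ["SlowTask_B", "FastTask_C"],
}
# Delays in seconds, mirroring the code example.
DELAYS = {
    "FastTask_A": 0.1,
    "SlowTask_B": 1.0,
    "SlowTask_A": 0.5,
    "FastTask_B": 0.2,
    "FastTask_C": 0.1,
    "EndTask": 0.1,
}


async def run_task(name: str, delay: float) -> None:
    print(f"{name} task started...")
    await asyncio.sleep(delay)
    print(f"{name} task done!")


async def run_graph(deps: dict, delays: dict) -> list:
    """Start each task as soon as every task it depends on has finished."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    running = {}  # asyncio.Task -> task name
    started = []  # task names in the order they were started

    while sorter.is_active():
        # Launch every task whose dependencies have all finished.
        for name in sorter.get_ready():
            started.append(name)
            running[asyncio.create_task(run_task(name, delays[name]))] = name
        # Wait until at least one running task finishes, then unlock its dependents.
        done, _ = await asyncio.wait(set(running), return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            task.result()  # re-raise the exception if the task failed
            sorter.done(running.pop(task))
    return started


if __name__ == "__main__":
    print(asyncio.run(run_graph(DEPS, DELAYS)))
```

Running it starts the three root tasks together, then SlowTask_B at about 0.1 s, FastTask_C at about 0.5 s, and EndTask at about 1.1 s. The sketch only handles tasks that take no inputs from each other. Passing results between tasks needs more bookkeeping on top of this.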
This library provides a simple interface for building the optimal execution path for async tasks that form a DAG.
Code example
```python
import asyncio

from async_dag import build_dag


async def inc_task(n: int, name: str, delay: float) -> int:
    print(f"{name} task started...")
    await asyncio.sleep(delay)
    print(f"{name} task done!")
    return n + 1


async def add_task(a: int, b: int, name: str, delay: float) -> int:
    print(f"{name} task started...")
    await asyncio.sleep(delay)
    print(f"{name} task done!")
    return a + b


# Define the DAG (an O(|V| + |E|) operation, where |V| is the number of vertices (nodes) and |E| is the number of edges in the DAG)
with build_dag(int) as tm:
    # Each node is made of an async function and the parameters that will be passed to it at invoke time.
    # A parameter can be either a value or another node.
    # We are essentially creating a partially applied async function, just like `functools.partial`.
    # tm.parameter_node is a special node that gets resolved into the invoke parameter (the value passed to `tm.invoke`).
    # You can also pass an immediate value to the node as a constant that will be the same across all invocations.
    fast_task_a = tm.add_node(
        inc_task,
        tm.parameter_node,
        "fast_task_a",
        0.1,
    )
    # Here we pass the result of fast_task_a as the `n` parameter of the inc_task node.
    slow_task_b = tm.add_node(
        inc_task,
        fast_task_a,
        "slow_task_b",
        1,
    )
    slow_task_a = tm.add_node(
        inc_task,
        tm.parameter_node,
        "slow_task_a",
        0.5,
    )
    fast_task_b = tm.add_node(
        inc_task,
        tm.parameter_node,
        "fast_task_b",
        0.2,
    )
    fast_task_c = tm.add_node(
        add_task,
        slow_task_a,
        fast_task_b,
        "fast_task_c",
        0.1,
    )
    end_task = tm.add_node(add_task, fast_task_c, slow_task_b, "end_task", 0.1)


# Invoke the DAG (an O(|V|) operation, where |V| is the number of vertices (nodes) in the DAG)
async def main() -> None:
    # To execute our partially applied DAG, we call `tm.invoke` and pass in the parameter.
    # Once the DAG is fully built, we can invoke it as many times as we like.
    # Each run returns an `ExecutionResult`, which can be used to extract the return value of each node by calling `extract_result` on the node.
    # prints:
    # fast_task_a task started...
    # slow_task_a task started...
    # fast_task_b task started...
    # fast_task_a task done!
    # slow_task_b task started...
    # fast_task_b task done!
    # slow_task_a task done!
    # fast_task_c task started...
    # fast_task_c task done!
    # slow_task_b task done!
    # end_task task started...
    # end_task task done!
    execution_result = await tm.invoke(0)
    # We can extract each node's return value.
    print(fast_task_a.extract_result(execution_result))  # 1
    print(end_task.extract_result(execution_result))  # 4


# Here is what you would have needed to do without async-dag in order to achieve maximum parallelism.
# Note that this is a simple example: each additional branch would require another step function, and each merge would require another gather.
async def builtin_alternative() -> None:
    # Note that we need to create a coroutine for each "branch" of our DAG.
    # Also note that, unlike with async-dag, we can't access any of the intermediate results without explicitly returning them.
    async def _left_branch(param: int) -> int:
        fast_task_a_res = await inc_task(param, "fast_task_a", 0.1)
        return await inc_task(fast_task_a_res, "slow_task_b", 1)

    async def _right_branch(param: int) -> int:
        slow_task_a_res, fast_task_b_res = await asyncio.gather(
            inc_task(param, "slow_task_a", 0.5), inc_task(param, "fast_task_b", 0.2)
        )
        return await add_task(slow_task_a_res, fast_task_b_res, "fast_task_c", 0.1)

    async def _my_task(param: int) -> int:
        left_branch_res, right_branch_res = await asyncio.gather(
            _left_branch(param), _right_branch(param)
        )
        return await add_task(left_branch_res, right_branch_res, "end_task", 0.1)

    # prints:
    # fast_task_a task started...
    # slow_task_a task started...
    # fast_task_b task started...
    # fast_task_a task done!
    # slow_task_b task started...
    # fast_task_b task done!
    # slow_task_a task done!
    # fast_task_c task started...
    # fast_task_c task done!
    # slow_task_b task done!
    # end_task task started...
    # end_task task done!
    result = await _my_task(0)
    print(result)  # 4


if __name__ == "__main__":
    asyncio.run(main())
    asyncio.run(builtin_alternative())
```
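The comments in the example describe each node as a partially applied async function. Its arguments can be constants, other nodes, or the invoke-time parameter.

Below is a minimal sketch of that idea in plain asyncio. It assumes nothing about how async-dag is implemented internally. `Node`, `PARAM`, and `invoke` are hypothetical stand-ins for `tm.add_node`, `tm.parameter_node`, and `tm.invoke`. Each invocation creates one task per node. A node awaits only its own argument nodes, so it starts as soon as they finish.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Iterable, Optional


class Node:
    """Hypothetical node: an async function plus its partially applied arguments."""

    def __init__(self, fn: Optional[Callable[..., Awaitable[Any]]], *args: Any) -> None:
        self.fn = fn
        self.args = args  # each argument is either a plain value or another Node


# Hypothetical placeholder node that resolves to the value passed to invoke().
PARAM = Node(None)


async def invoke(nodes: Iterable[Node], param: Any) -> Dict[Node, Any]:
    """Run each node once, starting it as soon as all of its argument nodes finish."""
    tasks = {}  # Node -> asyncio.Task for this invocation, so shared nodes run once

    def schedule(node: Node) -> "asyncio.Task[Any]":
        if node not in tasks:
            tasks[node] = asyncio.create_task(run(node))
        return tasks[node]

    async def run(node: Node) -> Any:
        if node is PARAM:
            return param
        # Wait only for this node's own dependencies, not for unrelated branches.
        deps = [schedule(arg) for arg in node.args if isinstance(arg, Node)]
        await asyncio.gather(*deps)
        values = [tasks[arg].result() if isinstance(arg, Node) else arg for arg in node.args]
        return await node.fn(*values)

    for node in nodes:
        schedule(node)
    await asyncio.gather(*list(tasks.values()))
    return {node: task.result() for node, task in tasks.items()}


async def inc_task(n: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return n + 1


async def add_task(a: int, b: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return a + b


# The same graph as the async-dag example, built with the hypothetical Node class.
fast_task_a = Node(inc_task, PARAM, 0.1)
slow_task_b = Node(inc_task, fast_task_a, 1)
slow_task_a = Node(inc_task, PARAM, 0.5)
fast_task_b = Node(inc_task, PARAM, 0.2)
fast_task_c = Node(add_task, slow_task_a, fast_task_b, 0.1)
end_task = Node(add_task, fast_task_c, slow_task_b, 0.1)
ALL_NODES = [fast_task_a, slow_task_b, slow_task_a, fast_task_b, fast_task_c, end_task]

if __name__ == "__main__":
    results = asyncio.run(invoke(ALL_NODES, 0))
    print(results[fast_task_a], results[end_task])  # 1 4
```

The graph is built once and the tasks are created on each call. This means the same nodes can be invoked repeatedly with different parameters, similar to the reuse described for `tm.invoke`.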
Download files
File details
Details for the file async_dag-0.2.2.tar.gz.
File metadata
- Download URL: async_dag-0.2.2.tar.gz
- Upload date:
- Size: 13.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a8617fbf6a73f800e658d6d9908a6782c5ca7913a68994884d0ad6c284d60283 |
| MD5 | 611ab4380edc622490292faf75c62a50 |
| BLAKE2b-256 | b5c6a8e956ee708355707f50242c405d0794654abfa3a0a384b196be08e37c61 |
File details
Details for the file async_dag-0.2.2-py3-none-any.whl.
File metadata
- Download URL: async_dag-0.2.2-py3-none-any.whl
- Upload date:
- Size: 7.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 53171ce21992fbfa47d7a4965a17c4e381739fdfd21924b15bfb8aa087c85c1c |
| MD5 | 3d48f7bfb57f908d316828e3fb69db06 |
| BLAKE2b-256 | 102624e24388d343e2e66d0ed4fe0b6253f819ee88a9d4406703af6989d17edd |