A simple library for running complex DAG of async tasks
Project description
async-dag
A simple library for running complex DAG of async tasks.
Use case example
Let's assume that you have the following task dependency graph:
graph TD;
FastTask_A-->SlowTask_B;
SlowTask_B-->EndTask;
SlowTask_A-->FastTask_C;
FastTask_B-->FastTask_C;
FastTask_C-->EndTask;
The optimal way to run this flow would be:
- Run `FastTask_A`, `FastTask_B`, and `SlowTask_A` all at once
- as soon as `FastTask_A` ends, start executing `SlowTask_B`
- as soon as `SlowTask_A` and `FastTask_B` end, start executing `FastTask_C`
- as soon as `SlowTask_B` and `FastTask_C` end, start executing `EndTask`
Creating this flow in code isn't trivial and requires managing tasks manually. In my experience, most people miss the performance benefit of starting `SlowTask_B` as soon as possible
(because it's easy to just `gather(FastTask_A, SlowTask_A, FastTask_B)`).
This library provides a simple interface for creating the optimal execution path for async tasks that form a DAG.
Code example
import asyncio
from async_dag import build_dag
async def inc_task(name: str, delay: float, n: int) -> int:
    """Simulate ``delay`` seconds of async work and return ``n`` plus one.

    Start and finish messages are printed so the execution order of the DAG
    is visible on the console.
    """
    print(f"{name} task started...")
    await asyncio.sleep(delay)
    print(f"{name} task done!")
    incremented = n + 1
    return incremented
async def add_task(name: str, delay: float, a: int, b: int) -> int:
    """Simulate ``delay`` seconds of async work and return the sum ``a + b``.

    Start and finish messages are printed so the execution order of the DAG
    is visible on the console.
    """
    print(f"{name} task started...")
    await asyncio.sleep(delay)
    print(f"{name} task done!")
    total = a + b
    return total
# Define the DAG
with build_dag(int) as tm:
    # tm.add_immediate_node(...) defines a graph node that resolves immediately and returns its value;
    # this is useful for passing constants to tasks.
    # If the last parameter is omitted and matches the type passed into `build_dag`, the invoke
    # parameter (the value passed to `tm.invoke`) will be passed automatically.
    # Here `n` is omitted, so fast_task_a receives the invoke parameter (0 -> returns 1).
    fast_task_a = tm.add_node(
        inc_task, tm.add_immediate_node("fast_task_a"), tm.add_immediate_node(0.1)
    )
    # here we pass the result from fast_task_a as the n param to inc_task node
    # (with tm.invoke(0) this returns 1 + 1 = 2)
    slow_task_b = tm.add_node(
        inc_task,
        tm.add_immediate_node("slow_task_b"),
        tm.add_immediate_node(1),
        fast_task_a,
    )
    # independent of fast_task_a; also receives the invoke parameter as `n`
    slow_task_a = tm.add_node(
        inc_task, tm.add_immediate_node("slow_task_a"), tm.add_immediate_node(0.5)
    )
    fast_task_b = tm.add_node(
        inc_task, tm.add_immediate_node("fast_task_b"), tm.add_immediate_node(0.1)
    )
    # fan-in: waits for both slow_task_a and fast_task_b (a=1, b=1 -> 2 with tm.invoke(0))
    fast_task_c = tm.add_node(
        add_task,
        tm.add_immediate_node("fast_task_c"),
        tm.add_immediate_node(0.1),
        slow_task_a,
        fast_task_b,
    )
    # final fan-in: waits for fast_task_c and slow_task_b (2 + 2 -> 4 with tm.invoke(0))
    end_task = tm.add_node(
        add_task,
        tm.add_immediate_node("end_task"),
        tm.add_immediate_node(0.1),
        fast_task_c,
        slow_task_b,
    )
# Invoke the DAG
async def main():
    """Execute the DAG with 0 as the invoke parameter and print two node results.

    Console output (each task starts as soon as its inputs are ready):
        fast_task_a task started...
        slow_task_a task started...
        fast_task_b task started...
        fast_task_a task done!
        fast_task_b task done!
        slow_task_b task started...
        slow_task_a task done!
        fast_task_c task started...
        fast_task_c task done!
        slow_task_b task done!
        end_task task started...
        end_task task done!
    """
    result = await tm.invoke(0)
    # every node handle can pull its own return value out of the execution result
    for node in (fast_task_a, end_task):
        print(node.extract_result(result))  # prints 1, then 4


if __name__ == "__main__":
    asyncio.run(main())
Another code example
import asyncio
from dataclasses import dataclass
from datetime import datetime
from async_dag import build_dag
@dataclass
class Event:
    """A fetched observation: when it was taken and which location it came from."""

    # moment the event was produced
    timestamp: datetime
    # source location of the event (the example uses the fetched URL string)
    location: str
class DatabaseClient:
    """Stand-in database client for the example; every insert reports success."""

    async def insert(self, event: Event) -> bool:
        """Pretend to persist ``event`` and return ``True``."""
        await asyncio.sleep(0.5)  # stands in for a real async database round-trip
        succeeded = True
        return succeeded
class HttpClient:
    """Fake HTTP client whose calls only sleep to mimic network latency."""

    async def fetch(self, url: str) -> Event:
        """Return an Event stamped with the current time whose location is ``url``."""
        await asyncio.sleep(0.5)  # pretend network round-trip
        now = datetime.now()
        return Event(timestamp=now, location=url)

    async def publish_logs(self, results: list[bool]) -> None:
        """Pretend to upload ``results``; the values themselves are not used."""
        await asyncio.sleep(0.5)  # pretend network round-trip
@dataclass
class Parameters:
    """Per-invocation inputs passed to `tm.invoke` and forwarded to nodes that ask for them."""

    # shared clients reused across invocations
    http_client: HttpClient
    db_client: DatabaseClient
    # a single location name (compared with `!=` against Event.location), despite the plural name
    allowed_locations: str
async def fetch_event(url: str, params: Parameters) -> Event:
    """Fetch the Event for ``url`` through the HTTP client carried in the invoke parameters."""
    # NOTE: nodes get access to the invoke params, e.g. the shared http client
    client = params.http_client
    return await client.fetch(url)
async def insert_to_db(event: Event, params: Parameters) -> bool:
    """Insert ``event`` only when its location is the allowed one.

    Returns the database client's result for allowed events and ``False``
    (without touching the database) for every other event.
    """
    if event.location == params.allowed_locations:
        return await params.db_client.insert(event)
    return False
async def publish_results(result_1: bool, result_2: bool, params: Parameters) -> None:
    """Send both insert outcomes through the HTTP client's `publish_logs` call."""
    outcomes = [result_1, result_2]
    await params.http_client.publish_logs(outcomes)
# NOTE: we don't have to receive the Parameters argument; we can also request nodes
# that are not in the last batch
async def logger(
    event_1: Event, result_1: bool, event_2: Event, result_2: bool
) -> None:
    """Print both fetched events alongside their insert results on a single line."""
    fields = (event_1, result_1, event_2, result_2)
    print(*fields)
with build_dag(Parameters) as tm:
    # Each branch fetches an event for a constant URL and then tries to insert it.
    # fetch_event/insert_to_db omit their last (Parameters) argument here, so the
    # invoke parameter is passed in automatically.
    moon_url = tm.add_immediate_node("moon")
    moon_event = tm.add_node(fetch_event, moon_url)
    moon_insert = tm.add_node(insert_to_db, moon_event)
    sun_url = tm.add_immediate_node("sun")
    sun_event = tm.add_node(fetch_event, sun_url)
    sun_insert = tm.add_node(insert_to_db, sun_event)
    # fan-in nodes: both depend on both insert results; logger also consumes the
    # event nodes directly, not only the most recent ones
    tm.add_node(publish_results, moon_insert, sun_insert)
    tm.add_node(logger, moon_event, moon_insert, sun_event, sun_insert)
async def main():
    """Run the same DAG twice, first allowing "moon" and then allowing "sun".

    During each invocation the logger node prints a line such as:
        Event(timestamp=datetime.datetime(...), location='moon') True Event(timestamp=datetime.datetime(...), location='sun') False
    After each invocation the moon event is printed, followed by whether it was
    inserted (True on the "moon" run, False on the "sun" run).
    """
    http_client = HttpClient()
    db_client = DatabaseClient()
    # NOTE: the DAG built above can be invoked many times; there is no need to rebuild it
    for allowed_location in ("moon", "sun"):
        result = await tm.invoke(Parameters(http_client, db_client, allowed_location))
        # NOTE: each node's return value can be extracted from the ExecutionResult object
        print(moon_event.extract_result(result))  # Event(..., location='moon')
        print(moon_insert.extract_result(result))  # True on the first run, False on the second


if __name__ == "__main__":
    asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file async_dag-0.1.4.tar.gz.
File metadata
- Download URL: async_dag-0.1.4.tar.gz
- Upload date:
- Size: 14.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ace9c13e8f73c57fd69e7100f08db6bcc07e1454cad5196c0246775197dd5441 |
| MD5 | 30e98747bef3fc937236177574cdc263 |
| BLAKE2b-256 | bd87242bf2e5dbd0508c14d8abff8fcc55d7b57ec94d4d0ab1f653cef445582e |
File details
Details for the file async_dag-0.1.4-py3-none-any.whl.
File metadata
- Download URL: async_dag-0.1.4-py3-none-any.whl
- Upload date:
- Size: 7.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 66a30b8c51b97f281bb78eebfd5bfb6b4895cad8104ab2abcf5e5a2d86f927e5 |
| MD5 | 68790f0dcdd56fba633da65a7e3158b1 |
| BLAKE2b-256 | 8668998cb79bb0adfb166dafa92a1a88905ff6a9e438a2ff900d86e13c8e3f4f |