async-dag

A simple library for running a complex DAG of async tasks while ensuring maximum possible parallelism.

Use case and example

Let's assume that you have the following task dependency graph (each task is an async function that could take time to resolve, and some tasks may depend on each other):

graph TD;
    FastTask_A-->SlowTask_B;
    SlowTask_B-->EndTask;

    SlowTask_A-->FastTask_C;
    FastTask_B-->FastTask_C;
    FastTask_C-->EndTask;

A naive way to write something like this would be:

await end_task(await slow_task_b(await fast_task_a()), await fast_task_c(await slow_task_a(), await fast_task_b()))

But that would be bad: each inner await must finish before the next call even starts, so all six tasks run one after another and we miss every opportunity to run tasks in parallel.
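To see why the nested form is sequential, here is a small sketch that logs when each coroutine starts and ends. The step and combine helpers, the task names, and the delays are made up for illustration and are not part of async_dag.

```python
import asyncio


async def step(name, delay, log):
    # Hypothetical stand-in for a real task: record start and end around a sleep.
    log.append(f"start {name}")
    await asyncio.sleep(delay)
    log.append(f"end {name}")
    return name


async def combine(left, right):
    return f"{left}+{right}"


async def nested(log):
    # Arguments are evaluated left to right, and each `await` must finish
    # before the next coroutine is even created.
    return await combine(await step("a", 0.01, log), await step("b", 0.03, log))


async def gathered(log):
    # Both coroutines are scheduled before either one is awaited to completion.
    a, b = await asyncio.gather(step("a", 0.01, log), step("b", 0.03, log))
    return await combine(a, b)


nested_log, gathered_log = [], []
asyncio.run(nested(nested_log))
asyncio.run(gathered(gathered_log))
print(nested_log)    # ['start a', 'end a', 'start b', 'end b']
print(gathered_log)  # ['start a', 'start b', 'end a', 'end b']
```

In the nested version "b" does not start until "a" has ended, while with asyncio.gather both start before either ends.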

A better version would be:

fast_task_a_res, slow_task_a_res, fast_task_b_res = await asyncio.gather(fast_task_a(), slow_task_a(), fast_task_b())
await end_task(await slow_task_b(fast_task_a_res), await fast_task_c(slow_task_a_res, fast_task_b_res))

Here we run fast_task_a, slow_task_a, and fast_task_b in parallel, but this is still suboptimal: slow_task_b could start as soon as fast_task_a ends, and fast_task_c could start as soon as both slow_task_a and fast_task_b end, yet both wait for the whole gather to finish and then run one after the other.

The optimal way to run this flow would be:

async def _left_branch():
    return await slow_task_b(await fast_task_a())

async def _right_branch():
    slow_task_a_res, fast_task_b_res = await asyncio.gather(
        slow_task_a(), fast_task_b()
    )

    return await fast_task_c(slow_task_a_res, fast_task_b_res)

async def _end_node():
    left_branch_res, right_branch_res = await asyncio.gather(
        _left_branch(), _right_branch()
    )

    return await end_task(left_branch_res, right_branch_res)

await _end_node()

This is very cumbersome and error-prone to write by hand.
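To make the difference between the three hand-written versions concrete, here is a rough timing sketch. The task bodies are stubs that only sleep, and the durations (0.05 s for fast tasks, 0.2 s for slow ones) are assumptions picked for illustration, so real numbers will differ.

```python
import asyncio
import time

FAST, SLOW = 0.05, 0.2  # assumed durations in seconds, for illustration only


async def _work(delay, value):
    await asyncio.sleep(delay)
    return value


# Stub versions of the tasks from the graph above.
async def fast_task_a():
    return await _work(FAST, "fast_a")


async def slow_task_a():
    return await _work(SLOW, "slow_a")


async def fast_task_b():
    return await _work(FAST, "fast_b")


async def slow_task_b(a):
    return await _work(SLOW, f"slow_b({a})")


async def fast_task_c(a, b):
    return await _work(FAST, f"fast_c({a}, {b})")


async def end_task(left, right):
    return await _work(FAST, f"end({left}, {right})")


async def sequential():
    # Naive version: every task waits for the previous one.
    return await end_task(
        await slow_task_b(await fast_task_a()),
        await fast_task_c(await slow_task_a(), await fast_task_b()),
    )


async def gather_first():
    # Only the three root tasks overlap; slow_task_b and fast_task_c still run in turn.
    a, sa, b = await asyncio.gather(fast_task_a(), slow_task_a(), fast_task_b())
    return await end_task(await slow_task_b(a), await fast_task_c(sa, b))


async def per_branch():
    # Each branch proceeds as soon as its own inputs are ready.
    async def left():
        return await slow_task_b(await fast_task_a())

    async def right():
        sa, b = await asyncio.gather(slow_task_a(), fast_task_b())
        return await fast_task_c(sa, b)

    left_res, right_res = await asyncio.gather(left(), right())
    return await end_task(left_res, right_res)


async def timed(strategy):
    start = time.perf_counter()
    result = await strategy()
    return result, time.perf_counter() - start


async def main():
    for strategy in (sequential, gather_first, per_branch):
        _, elapsed = await timed(strategy)
        print(f"{strategy.__name__:>12}: {elapsed:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

With these assumed durations the sequential version takes about 0.6 s (the sum of all six tasks), the gather-first version about 0.5 s, and the per-branch version about 0.3 s, which is the length of the longest path through the graph.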

Using async_dag you can just write:

from async_dag import build_dag

# Define your DAG
with build_dag() as tm:
    fast_task_a_node = tm.add_node(fast_task_a)
    slow_task_b_node = tm.add_node(slow_task_b, fast_task_a_node)

    slow_task_a_node = tm.add_node(slow_task_a)
    fast_task_b_node = tm.add_node(fast_task_b)
    fast_task_c_node = tm.add_node(fast_task_c, slow_task_a_node, fast_task_b_node)

    end_task_node = tm.add_node(end_task, slow_task_b_node, fast_task_c_node)

# Execute your DAG
execution_result = await tm.invoke(None) 

# Extract the return value of `end_task`
end_task_result = end_task_node.extract_result(execution_result)

And enjoy maximum parallelism without the hassle.
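If you are curious how a scheduler like this can work, here is a minimal sketch of the general idea using only asyncio. It is a toy written for this explanation and not async_dag's actual implementation; the MiniDag class and its methods are hypothetical. Every node becomes its own task that waits only for its direct dependencies.

```python
import asyncio


class MiniDag:
    """Toy DAG runner (hypothetical; not async_dag's implementation)."""

    def __init__(self):
        self._nodes = []  # (async function, indices of dependency nodes)

    def add_node(self, func, *deps):
        # Dependencies must already be registered, which keeps the graph acyclic.
        if any(not 0 <= d < len(self._nodes) for d in deps):
            raise ValueError("unknown dependency")
        self._nodes.append((func, deps))
        return len(self._nodes) - 1

    async def invoke(self):
        tasks = []

        async def run(func, deps):
            # Wait only for this node's own dependencies, then run it.
            args = [await tasks[d] for d in deps]
            return await func(*args)

        # Every node becomes a task right away: nodes without dependencies
        # start immediately, the rest start as soon as their inputs finish.
        for func, deps in self._nodes:
            tasks.append(asyncio.create_task(run(func, deps)))
        return await asyncio.gather(*tasks)


async def demo():
    async def one():
        await asyncio.sleep(0.01)
        return 1

    async def two():
        await asyncio.sleep(0.02)
        return 2

    async def add(x, y):
        return x + y

    dag = MiniDag()
    a = dag.add_node(one)
    b = dag.add_node(two)
    total = dag.add_node(add, a, b)
    results = await dag.invoke()
    return results[total]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # 3
```

Because a node can only depend on nodes registered before it, cycles cannot be expressed, and no explicit topological sort is needed in this toy version.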

Docs and more examples

We use docstrings to describe our API. The main concepts you need to know are:

  1. The TaskManager class.
  2. The add_node method on TaskManager.
  3. The parameter_node property of TaskManager.
  4. The sort method of TaskManager.
  5. The invoke method of TaskManager.
  6. The extract_result method of TaskNode.
  7. The build_dag helper function.

For a full example take a look at ./examples/readme.py.
