A simple library for running complex DAG of async tasks while ensuring maximum possible parallelism

Project description

async-dag

Use case and example

Let's assume that you have the following task dependency graph (each task is an async function that may take a while to resolve, and some tasks depend on the results of others):

```mermaid
graph TD;
    FastTask_A-->SlowTask_B;
    SlowTask_B-->EndTask;

    SlowTask_A-->FastTask_C;
    FastTask_B-->FastTask_C;
    FastTask_C-->EndTask;
```
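To make "maximum possible parallelism" concrete, here is a small sketch that computes the critical path of the graph above: the earliest time EndTask can finish if every task starts the moment all of its dependencies are done. The durations (1 s for the fast tasks and EndTask, 3 s for the slow tasks) are made-up assumptions for illustration only.

```python
from functools import cache

# Hypothetical durations (seconds) for each task in the graph above.
DURATIONS = {
    "FastTask_A": 1, "FastTask_B": 1, "FastTask_C": 1,
    "SlowTask_A": 3, "SlowTask_B": 3, "EndTask": 1,
}

# Each task mapped to the tasks it depends on (the arrows in the graph, reversed).
DEPENDENCIES = {
    "FastTask_A": [],
    "SlowTask_A": [],
    "FastTask_B": [],
    "SlowTask_B": ["FastTask_A"],
    "FastTask_C": ["SlowTask_A", "FastTask_B"],
    "EndTask": ["SlowTask_B", "FastTask_C"],
}


@cache
def finish_time(task: str) -> int:
    """Earliest finish time of `task` if it starts as soon as its dependencies finish."""
    start = max((finish_time(dep) for dep in DEPENDENCIES[task]), default=0)
    return start + DURATIONS[task]


sequential_time = sum(DURATIONS.values())  # every task run one after another
optimal_time = finish_time("EndTask")      # length of the critical path
print(f"sequential: {sequential_time}s, optimal: {optimal_time}s")
# prints "sequential: 10s, optimal: 5s"
```

With these assumed durations, running everything one after another takes 10 s, while the critical path (for example FastTask_A, SlowTask_B, EndTask) is only 5 s, which is the best any scheduler can achieve.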

A naive way to write something like this would be:

```python
await end_task(await slow_task_b(await fast_task_a()), await fast_task_c(await slow_task_a(), await fast_task_b()))
```

But this runs all six tasks one after another: each await has to finish before the next task even starts, so we miss every opportunity to run tasks in parallel.

A better version would be:

```python
import asyncio

fast_task_a_res, slow_task_a_res, fast_task_b_res = await asyncio.gather(fast_task_a(), slow_task_a(), fast_task_b())
await end_task(await slow_task_b(fast_task_a_res), await fast_task_c(slow_task_a_res, fast_task_b_res))
```

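Here fast_task_a, slow_task_a, and fast_task_b run in parallel. But this is still suboptimal: asyncio.gather waits for all three, so slow_task_b cannot start until slow_task_a has also finished, even though it only needs fast_task_a. Likewise, fast_task_c only starts after slow_task_b completes, although it depends only on slow_task_a and fast_task_b. Ideally, slow_task_b would start as soon as fast_task_a ends, and fast_task_c as soon as both slow_task_a and fast_task_b end.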

The optimal way to run this flow would be:

```python
import asyncio

# Left branch: fast_task_a -> slow_task_b
async def _left_branch():
    return await slow_task_b(await fast_task_a())

# Right branch: (slow_task_a, fast_task_b) -> fast_task_c
async def _right_branch():
    slow_task_a_res, fast_task_b_res = await asyncio.gather(
        slow_task_a(), fast_task_b()
    )

    return await fast_task_c(slow_task_a_res, fast_task_b_res)

# Run both branches concurrently, then feed their results to end_task
async def _end_node():
    left_branch_res, right_branch_res = await asyncio.gather(
        _left_branch(), _right_branch()
    )

    return await end_task(left_branch_res, right_branch_res)

await _end_node()
```

This is cumbersome and error-prone to write by hand.
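To see the difference between the three versions above, the following sketch times each of them. The task bodies are hypothetical stand-ins that only call asyncio.sleep (0.1 s for the fast tasks and end_task, 0.3 s for the slow tasks); substitute your own coroutines.

```python
import asyncio
import time

# Hypothetical durations in seconds; the tasks below are stand-ins, not real work.
FAST, SLOW = 0.1, 0.3


async def _work(seconds: float, value: str) -> str:
    """Simulate an I/O-bound task that takes `seconds` and returns `value`."""
    await asyncio.sleep(seconds)
    return value


async def fast_task_a() -> str:
    return await _work(FAST, "fast_a")

async def slow_task_a() -> str:
    return await _work(SLOW, "slow_a")

async def fast_task_b() -> str:
    return await _work(FAST, "fast_b")

async def slow_task_b(a: str) -> str:
    return await _work(SLOW, f"slow_b({a})")

async def fast_task_c(a: str, b: str) -> str:
    return await _work(FAST, f"fast_c({a}, {b})")

async def end_task(b: str, c: str) -> str:
    return await _work(FAST, f"end({b}, {c})")


async def naive() -> str:
    # Every await finishes before the next task starts: fully sequential.
    return await end_task(
        await slow_task_b(await fast_task_a()),
        await fast_task_c(await slow_task_a(), await fast_task_b()),
    )


async def with_gather() -> str:
    # The three root tasks run concurrently; everything after them is sequential.
    fa, sa, fb = await asyncio.gather(fast_task_a(), slow_task_a(), fast_task_b())
    return await end_task(await slow_task_b(fa), await fast_task_c(sa, fb))


async def optimal() -> str:
    # Each branch moves on as soon as its own inputs are ready.
    async def left() -> str:
        return await slow_task_b(await fast_task_a())

    async def right() -> str:
        sa, fb = await asyncio.gather(slow_task_a(), fast_task_b())
        return await fast_task_c(sa, fb)

    left_res, right_res = await asyncio.gather(left(), right())
    return await end_task(left_res, right_res)


async def main() -> dict[str, tuple[str, float]]:
    runs = {}
    for strategy in (naive, with_gather, optimal):
        start = time.perf_counter()
        result = await strategy()
        elapsed = time.perf_counter() - start
        runs[strategy.__name__] = (result, elapsed)
        print(f"{strategy.__name__:>11}: {elapsed:.2f}s -> {result}")
    return runs


runs = asyncio.run(main())
```

All three versions return the same value, but with these assumed durations the naive version takes about 1.0 s, the gather version about 0.8 s, and the hand-written optimal version about 0.5 s, which is the critical path of the graph.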

Using async_dag you can just write:

```python
from async_dag import build_dag

# Define your DAG
with build_dag() as tm:
    fast_task_a_node = tm.add_node(fast_task_a)
    slow_task_b_node = tm.add_node(slow_task_b, fast_task_a_node)

    slow_task_a_node = tm.add_node(slow_task_a)
    fast_task_b_node = tm.add_node(fast_task_b)
    fast_task_c_node = tm.add_node(fast_task_c, slow_task_a_node, fast_task_b_node)

    end_task_node = tm.add_node(end_task, slow_task_b_node, fast_task_c_node)

# Execute your DAG
execution_result = await tm.invoke(None)

# Extract the return value of `end_task`
end_task_result = end_task_node.extract_result(execution_result)
```

You get maximum parallelism without the hassle.
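One common way to get this behaviour is to wrap every node in an asyncio.Task that awaits only the tasks of its own dependencies. The sketch below shows that technique; it is not async_dag's actual implementation or API, and run_dag, the node-spec format, and sleep_then are hypothetical names chosen for illustration.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable

# Hypothetical node format: name -> (async callable, names of the nodes whose results it receives).
NodeSpec = dict[str, tuple[Callable[..., Awaitable[Any]], list[str]]]


async def run_dag(nodes: NodeSpec) -> dict[str, Any]:
    """Run every node as soon as all of its dependencies have finished."""
    tasks: dict[str, asyncio.Task] = {}
    in_progress: set[str] = set()

    def schedule(name: str) -> asyncio.Task:
        if name in tasks:
            return tasks[name]
        if name in in_progress:
            raise ValueError(f"cycle detected at node {name!r}")
        in_progress.add(name)
        func, deps = nodes[name]
        # Schedule dependencies first so their tasks exist before this node needs them.
        dep_tasks = [schedule(dep) for dep in deps]
        in_progress.discard(name)

        async def run_node() -> Any:
            # Wait only for this node's own dependencies, then call it with their results.
            args = await asyncio.gather(*dep_tasks)
            return await func(*args)

        tasks[name] = asyncio.create_task(run_node())
        return tasks[name]

    for name in nodes:
        schedule(name)
    results = await asyncio.gather(*tasks.values())
    return dict(zip(tasks, results))


async def sleep_then(value: str, seconds: float) -> str:
    """Hypothetical stand-in task: sleep for `seconds`, then return `value`."""
    await asyncio.sleep(seconds)
    return value


# The graph from the use case above, with assumed durations (0.1 s fast, 0.3 s slow).
graph: NodeSpec = {
    "fast_task_a": (lambda: sleep_then("fast_a", 0.1), []),
    "slow_task_a": (lambda: sleep_then("slow_a", 0.3), []),
    "fast_task_b": (lambda: sleep_then("fast_b", 0.1), []),
    "slow_task_b": (lambda a: sleep_then(f"slow_b({a})", 0.3), ["fast_task_a"]),
    "fast_task_c": (lambda a, b: sleep_then(f"fast_c({a}, {b})", 0.1), ["slow_task_a", "fast_task_b"]),
    "end_task": (lambda b, c: sleep_then(f"end({b}, {c})", 0.1), ["slow_task_b", "fast_task_c"]),
}

start = time.perf_counter()
results = asyncio.run(run_dag(graph))
elapsed = time.perf_counter() - start
print(f"{results['end_task']} in {elapsed:.2f}s")
```

Because each node awaits only the tasks it depends on, slow_task_b starts as soon as fast_task_a finishes, and the whole graph completes in roughly the 0.5 s critical path. The sketch handles cycles but nothing else: if a node raises, the outer asyncio.gather propagates the exception without cancelling the remaining tasks.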

Docs and more examples

The API is documented with docstrings. The main concepts you need to know are:

  1. The TaskManager class.
  2. The add_node method on TaskManager.
  3. The parameter_node property of TaskManager.
  4. The sort method of TaskManager.
  5. The invoke method of TaskManager.
  6. The extract_result method of TaskNode.
  7. The build_dag helper function.

For a full example, take a look at examples/readme.py.


Download files

Source Distribution

async_dag-0.3.1.tar.gz (15.2 kB)

Uploaded Source

Built Distribution

async_dag-0.3.1-py3-none-any.whl (9.3 kB)

Uploaded Python 3

File details

Details for the file async_dag-0.3.1.tar.gz.

File metadata

  • Download URL: async_dag-0.3.1.tar.gz
  • Upload date:
  • Size: 15.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.6.9

File hashes

Hashes for async_dag-0.3.1.tar.gz
Algorithm Hash digest
SHA256 051b1be402cc6933eadc1f5b98c8dca0f70d77d1b21eab55b87179ea998fd457
MD5 25673382f800df1d2ff5a7a57fa08c7e
BLAKE2b-256 c34cd465599e4d4625dbcf569aa1981f3b1196c44864bd26814150227edd80bd

File details

Details for the file async_dag-0.3.1-py3-none-any.whl.

File metadata

  • Download URL: async_dag-0.3.1-py3-none-any.whl
  • Upload date:
  • Size: 9.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.6.9

File hashes

Hashes for async_dag-0.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 5f543bea985dd42f4be7a425de5c21d3744f1833c469c2ae549f5522f0d905c6
MD5 cce0efe6ad335106baa0e0d89678b1ae
BLAKE2b-256 8d886489a4b20c4f46ae0d4106d34a47487313ad90b816e51474655bfb67ca7a
