A Python package for running directed acyclic graphs of asynchronous I/O operations

dagio: Asynchronous I/O - with DAGs!

dagio is an embarrassingly simple Python package for running directed acyclic graphs of asynchronous I/O operations. It is built on, and meant to be used with, Python's built-in asyncio module, and provides a veeeery thin layer of functionality on top of it. :sweat_smile:

Getting Started

Suppose you have a set of potentially long-running I/O tasks (e.g. hit a web service, query a database, read a large file from disk, etc.), where some of the tasks depend on other tasks having finished. That is, you've got a directed acyclic graph (DAG) of tasks, where tasks that don't depend on each other can run concurrently.

For example, say you've got a task G which depends on E and F, where E depends on D, F depends on both C and D, and so on:

A
|
B   C
 \ /|
  D |
 / \|
E   F
 \ /
  G
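To make "tasks that don't depend on each other can run concurrently" concrete, here's the same DAG written as a plain dict of dependencies and grouped into stages with graphlib from the standard library (Python 3.9+). This is just an illustration of the graph above, not part of dagio; the names deps and waves are made up for the example.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (from the diagram above).
deps = {
    "A": set(),
    "B": {"A"},
    "C": set(),
    "D": {"B", "C"},
    "E": {"D"},
    "F": {"C", "D"},
    "G": {"E", "F"},
}

sorter = TopologicalSorter(deps)
sorter.prepare()

# Collect tasks in "waves": every task in a wave has all of its
# dependencies satisfied by earlier waves, so a wave can run concurrently.
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)

for i, wave in enumerate(waves):
    print(i, wave)
```

Waves are a bit conservative, though: B only needs A, so a scheduler that starts each task as soon as its own dependencies are done (as the hand-written asyncio version below does for B) doesn't have to wait for C to finish first.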

Coding that up using raw asyncio might look something like this:

import asyncio


class MyDag:

    async def task_a(self):
        # does task a stuff...
        ...

    async def task_b(self):
        # does task b stuff...
        ...

    async def task_c(self):
        # does task c stuff...
        ...

    async def task_d(self):
        # does task d stuff...
        ...

    async def task_e(self):
        # does task e stuff...
        ...

    async def task_f(self):
        # does task f stuff...
        ...

    async def task_g(self):
        # does task g stuff...
        ...


async def run():

    obj = MyDag()

    task_a = asyncio.create_task(obj.task_a())
    task_c = asyncio.create_task(obj.task_c())

    await task_a

    await obj.task_b()

    await task_c

    await obj.task_d()

    task_e = asyncio.create_task(obj.task_e())
    task_f = asyncio.create_task(obj.task_f())

    await task_e
    await task_f

    await obj.task_g()


asyncio.run(run())

Which is... fine, I guess :roll_eyes: But you have to be careful about which task you start before which other task, and which tasks can safely run concurrently vs. which can't. And then you have to type out all that logic and ordering manually! With the confusing asyncio API! So a lot of thought has to go into it, especially for complex DAGs.

And thinking is hard! Less thinking! :fist:

With dagio, you just use the depends decorator to specify which methods a given method depends on, and it'll figure out the rest for you, running everything in the correct order and concurrently where possible:

import asyncio
from dagio import depends


class MyDag:

    async def task_a(self):
        # does task a stuff...
        ...

    @depends("task_a")
    async def task_b(self):
        # does task b stuff...
        ...

    async def task_c(self):
        # does task c stuff...
        ...

    @depends("task_b", "task_c")
    async def task_d(self):
        # does task d stuff...
        ...

    @depends("task_d")
    async def task_e(self):
        # does task e stuff...
        ...

    @depends("task_c", "task_d")
    async def task_f(self):
        # does task f stuff...
        ...

    @depends("task_e", "task_f")
    async def task_g(self):
        # does task g stuff...
        ...


async def run():
    obj = MyDag()
    await obj.task_g()


asyncio.run(run())
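Curious how something like this can work? Here's a rough sketch of a depends-style decorator in plain asyncio. To be clear, this is NOT dagio's actual source; the decorator is called depends here only for readability, and _dag_tasks, _work and log are hypothetical names. The idea: before running the decorated method, start every named dependency as an asyncio.Task, cache those tasks on the object so a shared dependency (like task_c, which both task_d and task_f need) runs only once, and wait for them all with asyncio.gather.

```python
import asyncio
import functools

log = []  # records the order in which tasks finish


def depends(*names):
    # Hypothetical stand-in for dagio.depends; NOT dagio's implementation.
    def decorator(method):
        @functools.wraps(method)
        async def wrapper(self):
            # Per-object cache of name -> asyncio.Task, so a dependency shared
            # by several tasks is started only once.
            cache = self.__dict__.setdefault("_dag_tasks", {})
            for name in names:
                if name not in cache:
                    cache[name] = asyncio.create_task(getattr(self, name)())
            # Wait for all dependencies concurrently, then run this task.
            await asyncio.gather(*(cache[name] for name in names))
            await method(self)
        return wrapper
    return decorator


class MyDag:
    async def _work(self, name):
        await asyncio.sleep(0.01)  # stand-in for real I/O
        log.append(name)

    async def task_a(self):
        await self._work("a")

    @depends("task_a")
    async def task_b(self):
        await self._work("b")

    async def task_c(self):
        await self._work("c")

    @depends("task_b", "task_c")
    async def task_d(self):
        await self._work("d")

    @depends("task_d")
    async def task_e(self):
        await self._work("e")

    @depends("task_c", "task_d")
    async def task_f(self):
        await self._work("f")

    @depends("task_e", "task_f")
    async def task_g(self):
        await self._work("g")


async def run():
    obj = MyDag()
    await obj.task_g()


asyncio.run(run())
print(log)
```

A real implementation also has to decide what happens on errors (here, if a dependency raises, gather propagates the exception while the other tasks keep running) and whether calling a task a second time should re-run its dependencies (in this sketch it won't, because the finished tasks stay cached).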

Note that:

  1. Each task in your DAG has to be a method of the same class
  2. Task methods must be async methods
  3. Calling a task method decorated with depends runs that task and all its dependencies
  4. Task methods should neither take arguments nor return values. You can handle inter-task communication using object attributes (e.g. self._task_a_output = ...). If you need a lock, you can set up an asyncio.Lock in your class's __init__.
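Here's what note 4 might look like in practice. This is a minimal sketch in plain asyncio (no dagio, so the ordering is wired by hand with asyncio.gather): fetch leaves its output on self for parse to read, and two concurrent tasks bump a shared counter guarded by an asyncio.Lock created in __init__. The class Pipeline and the attributes _raw, _parsed and _hits are made up for illustration.

```python
import asyncio


class Pipeline:
    def __init__(self):
        # Created inside main(), i.e. while the event loop is running; on
        # Python < 3.10 an asyncio.Lock binds to the current loop at creation.
        self._lock = asyncio.Lock()
        self._hits = 0
        self._raw = None
        self._parsed = None

    async def _record_hit(self):
        async with self._lock:
            current = self._hits
            await asyncio.sleep(0)  # yield mid-update; the lock prevents lost updates
            self._hits = current + 1

    async def fetch(self):
        await asyncio.sleep(0.01)  # stand-in for real I/O
        self._raw = "1,2,3"        # output left on self for the next task
        await self._record_hit()

    async def ping(self):
        await asyncio.sleep(0.01)  # another independent I/O task
        await self._record_hit()

    async def parse(self):
        # Reads the attribute written by fetch(), so it must run after it.
        self._parsed = [int(x) for x in self._raw.split(",")]
        await self._record_hit()


async def main():
    obj = Pipeline()
    await asyncio.gather(obj.fetch(), obj.ping())  # independent tasks
    await obj.parse()                              # depends on fetch
    return obj


obj = asyncio.run(main())
print(obj._parsed, obj._hits)  # prints: [1, 2, 3] 3
```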

You can also run a non-async method asynchronously in a thread pool using the run_async decorator:

import asyncio
from dagio import depends, run_async


class MyDag:

    @run_async
    def task_a(self):
        # a sync method which does task a stuff...
        ...

    @run_async
    def task_b(self):
        # a sync method which does task b stuff...
        ...

    @depends("task_a", "task_b")
    async def task_c(self):
        # does task c stuff...
        ...


async def run():
    obj = MyDag()
    await obj.task_c()  # runs a and b concurrently, then c


asyncio.run(run())

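If you're wondering what "runs in a thread pool" means in practice, here's a minimal sketch of a run_async-style decorator that hands the blocking call to the event loop's default thread pool via loop.run_in_executor. It's a hypothetical stand-in, not dagio's source; the class Blocking and its threads attribute are made up for the demo.

```python
import asyncio
import functools
import threading
import time


def run_async(method):
    # Hypothetical stand-in for dagio.run_async; NOT dagio's implementation.
    # Turns a blocking method into a coroutine method that runs the original
    # in the event loop's default ThreadPoolExecutor.
    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(method, self, *args, **kwargs)
        return await loop.run_in_executor(None, call)
    return wrapper


class Blocking:
    def __init__(self):
        self.threads = []  # names of the threads the tasks ran on

    @run_async
    def task_a(self):
        time.sleep(0.2)  # blocking call, e.g. a sync database driver
        self.threads.append(threading.current_thread().name)

    @run_async
    def task_b(self):
        time.sleep(0.2)
        self.threads.append(threading.current_thread().name)


async def main():
    obj = Blocking()
    start = time.perf_counter()
    await asyncio.gather(obj.task_a(), obj.task_b())
    return obj, time.perf_counter() - start


obj, elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s", obj.threads)
```

Because the two blocking sleeps run on separate worker threads, the elapsed time should come out close to 0.2 s rather than 0.4 s. On Python 3.9+, asyncio.to_thread is a shorthand for roughly the same idea.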
That's it. That's all this package does.

Installation

pip install dagio

Support

Post bug reports, feature requests, and tutorial requests in GitHub issues.

Contributing

Pull requests are totally welcome! Any contribution would be appreciated, from things as minor as fixing typos to things as major as adding new functionality. :smile:

Why the name, dagio?

It's for making DAGs of IO operations. DAG IO. Technically it's asynchronous DAG-based I/O, and the name adagio would have been siiiick, but it was already taken! :sob:

Download files

Source Distribution

dagio-0.0.2.tar.gz (5.8 kB)

Built Distribution

dagio-0.0.2-py3-none-any.whl (5.2 kB)
