A Python package for running directed acyclic graphs of asynchronous I/O operations

dagio: Asynchronous I/O - with DAGs!

dagio is an embarrassingly simple Python package for running directed acyclic graphs of asynchronous I/O operations. It's built on, and meant to be used with, Python's built-in asyncio module, and provides a veeeery thin layer of functionality on top of it. :sweat_smile:

Getting Started

Suppose you have a set of potentially long-running I/O tasks (e.g. hit a web service, query a database, read a large file from disk, etc.), where some of the tasks depend on other tasks having finished. That is, you've got a directed acyclic graph (DAG) of tasks, where tasks that don't depend on each other can run concurrently.

For example, say task G depends on E and F, E depends on D, F depends on both C and D, and so on:

A
|
B   C
 \ /|
  D |
 / \|
E   F
 \ /
  G
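One way to see which tasks in this DAG could run at the same time is to group them into "waves" with the standard library's graphlib module (Python 3.9+). This is only an illustration of the graph's structure, using the task names from the diagram; it isn't how dagio works internally.

```python
from graphlib import TopologicalSorter

# Each key depends on the tasks in its set (edges from the diagram above).
# A and C have no dependencies; graphlib adds them as nodes automatically.
DEPENDENCIES = {
    "B": {"A"},
    "D": {"B", "C"},
    "E": {"D"},
    "F": {"C", "D"},
    "G": {"E", "F"},
}


def waves(graph):
    """Group tasks into waves: every task's dependencies sit in earlier waves,
    so the tasks within one wave could run concurrently."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    result = []
    while sorter.is_active():
        ready = sorter.get_ready()  # tasks whose dependencies are all done
        result.append(sorted(ready))
        sorter.done(*ready)
    return result


print(waves(DEPENDENCIES))
# [['A', 'C'], ['B'], ['D'], ['E', 'F'], ['G']]
```

Waves are a simplification, though: on a wave-by-wave schedule B would also wait for C, even though B only needs A.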

Coding that up using raw asyncio might look something like this:

import asyncio


class MyDag:

    async def task_a(self):
        # does task a stuff...
        pass

    async def task_b(self):
        # does task b stuff...
        pass

    async def task_c(self):
        # does task c stuff...
        pass

    async def task_d(self):
        # does task d stuff...
        pass

    async def task_e(self):
        # does task e stuff...
        pass

    async def task_f(self):
        # does task f stuff...
        pass

    async def task_g(self):
        # does task g stuff...
        pass


async def run():

    obj = MyDag()

    task_a = asyncio.create_task(obj.task_a())
    task_c = asyncio.create_task(obj.task_c())

    await task_a

    await obj.task_b()

    await task_c

    await obj.task_d()

    task_e = asyncio.create_task(obj.task_e())
    task_f = asyncio.create_task(obj.task_f())

    await task_e
    await task_f

    await obj.task_g()


asyncio.run(run())

Which is... fine, I guess :roll_eyes: But you have to be careful about which tasks you start before which others, and which tasks can safely run concurrently and which can't. And then you have to type out all that logic and ordering by hand! With the confusing asyncio API! So a lot of thought has to go into it, especially for complex DAGs.

And thinking is hard! Less thinking! :fist:

With dagio, you just use the depends decorator to declare which methods a given method depends on, and it'll figure everything out for you, running them in the correct order and concurrently where possible:

import asyncio
from dagio import depends


class MyDag:

    async def task_a(self):
        # does task a stuff...
        pass

    @depends("task_a")
    async def task_b(self):
        # does task b stuff...
        pass

    async def task_c(self):
        # does task c stuff...
        pass

    @depends("task_b", "task_c")
    async def task_d(self):
        # does task d stuff...
        pass

    @depends("task_d")
    async def task_e(self):
        # does task e stuff...
        pass

    @depends("task_c", "task_d")
    async def task_f(self):
        # does task f stuff...
        pass

    @depends("task_e", "task_f")
    async def task_g(self):
        # does task g stuff...
        pass


async def run():
    obj = MyDag()
    await obj.task_g()


asyncio.run(run())
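If you're curious how a decorator like depends *could* work, here's a minimal sketch on plain asyncio. It's a hypothetical stand-in called toy_depends, not dagio's actual source, and it assumes each task should run at most once per object: the first request for a task creates an asyncio.Task and caches it on the instance, so a shared dependency like D (needed by both E and F) runs only once. The _task_for helper, the _toy_dag_tasks attribute, and the log list are made up for this example.

```python
import asyncio
import functools


def _task_for(obj, name):
    """Return the one shared asyncio.Task running obj.<name>(), creating it on first use."""
    cache = obj.__dict__.setdefault("_toy_dag_tasks", {})  # hypothetical per-object cache
    if name not in cache:
        cache[name] = asyncio.create_task(getattr(obj, name)())
    return cache[name]


def toy_depends(*names):
    """Hypothetical depends-style decorator (NOT dagio's code): start the named
    sibling methods concurrently (each at most once per object), wait for all
    of them, then run the decorated method."""
    def decorator(method):
        @functools.wraps(method)
        async def wrapper(self):
            await asyncio.gather(*(_task_for(self, name) for name in names))
            await method(self)
        return wrapper
    return decorator


class MyDag:
    def __init__(self):
        self.log = []  # records the order in which task bodies ran

    async def task_a(self):
        self.log.append("a")

    @toy_depends("task_a")
    async def task_b(self):
        self.log.append("b")

    async def task_c(self):
        self.log.append("c")

    @toy_depends("task_b", "task_c")
    async def task_d(self):
        self.log.append("d")

    @toy_depends("task_d")
    async def task_e(self):
        self.log.append("e")

    @toy_depends("task_c", "task_d")
    async def task_f(self):
        self.log.append("f")

    @toy_depends("task_e", "task_f")
    async def task_g(self):
        self.log.append("g")


obj = MyDag()
asyncio.run(obj.task_g())
print(obj.log)  # every task once, each after its dependencies
```

One limitation of this sketch: calling the same top-level method twice on one object re-runs its body, while its dependencies are served from the cache.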

Note that:

  1. Each task in your DAG has to be a method of the same class
  2. Task methods must be async methods
  3. Calling a task method decorated with depends runs that task and all its dependencies
  4. Task methods should neither take arguments nor return values. You can handle inter-task communication using object attributes (e.g. self._task_a_output = ...). If concurrently running tasks need to update shared state safely, you can set up an asyncio.Lock in your class's __init__.
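Here's a sketch of the attribute-and-lock pattern from note 4. To keep it self-contained it uses asyncio.gather in place of depends; the Pipeline class, its attributes, and the _count helper are hypothetical.

```python
import asyncio


class Pipeline:
    """Hypothetical example: tasks pass data through attributes, and a lock
    guards a counter that two concurrently running tasks both update."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self.records_seen = 0
        self._task_a_output = None
        self._task_b_output = None
        self.summary = None

    async def _count(self, n):
        async with self._lock:
            current = self.records_seen
            await asyncio.sleep(0)  # yields mid-update; without the lock an update could be lost
            self.records_seen = current + n

    async def task_a(self):
        await asyncio.sleep(0.01)  # stand-in for real I/O
        self._task_a_output = [1, 2, 3]
        await self._count(len(self._task_a_output))

    async def task_b(self):
        await asyncio.sleep(0.01)  # stand-in for real I/O
        self._task_b_output = [10, 20]
        await self._count(len(self._task_b_output))

    async def task_c(self):
        # Runs after a and b, reading their outputs from attributes.
        self.summary = sum(self._task_a_output) + sum(self._task_b_output)


async def run():
    obj = Pipeline()  # created inside the running event loop
    await asyncio.gather(obj.task_a(), obj.task_b())  # a and b concurrently
    await obj.task_c()
    return obj


obj = asyncio.run(run())
print(obj.records_seen, obj.summary)  # 5 36
```

The object is created inside the coroutine, so the lock is created while the event loop is running. On Python versions before 3.10, an asyncio.Lock created outside a running loop could end up bound to a different loop than the one asyncio.run starts.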

You can also run a regular (non-async) method in a thread pool, so it runs concurrently with other tasks instead of blocking the event loop, using the run_async decorator:

import asyncio
from dagio import depends, run_async


class MyDag:

    @run_async
    def task_a(self):
        # a sync method which does task a stuff...
        pass

    @run_async
    def task_b(self):
        # a sync method which does task b stuff...
        pass

    @depends("task_a", "task_b")
    async def task_c(self):
        # does task c stuff...
        pass


async def run():
    obj = MyDag()
    await obj.task_c()  # runs a and b concurrently, then c


asyncio.run(run())
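A decorator like run_async can be sketched with loop.run_in_executor, which hands a blocking call to the event loop's default thread pool. toy_run_async below is a hypothetical stand-in, not dagio's implementation; the timing and thread-name bookkeeping is only there to show that the two blocking calls overlap.

```python
import asyncio
import functools
import threading
import time


def toy_run_async(func):
    """Hypothetical run_async-style decorator (NOT dagio's code): turns a
    blocking method into a coroutine method that runs it in the event loop's
    default thread pool executor."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, call)
    return wrapper


class MyDag:
    def __init__(self):
        self.threads = {}  # task name -> name of the thread it ran on

    @toy_run_async
    def task_a(self):
        time.sleep(0.25)  # blocking call, e.g. a synchronous client library
        self.threads["a"] = threading.current_thread().name

    @toy_run_async
    def task_b(self):
        time.sleep(0.25)  # blocking call
        self.threads["b"] = threading.current_thread().name


async def run():
    obj = MyDag()
    start = time.perf_counter()
    await asyncio.gather(obj.task_a(), obj.task_b())  # both sleeps overlap
    return obj, time.perf_counter() - start


obj, elapsed = asyncio.run(run())
print(f"{elapsed:.2f}s", obj.threads)  # roughly 0.25s rather than 0.5s
```

This helps with blocking I/O (and calls like time.sleep) because they release the GIL while waiting; CPU-bound pure-Python work won't get faster this way.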

That's it. That's all this package does.

Installation

pip install dagio

Support

Post bug reports, feature requests, and tutorial requests in GitHub issues.

Contributing

Pull requests are totally welcome! Any contribution would be appreciated, from things as minor as fixing typos to things as major as adding new functionality. :smile:

Why the name, dagio?

It's for making DAGs of IO operations. DAG IO. Technically it's asynchronous DAG-based I/O, and the name adagio would have been siiiick, but it was already taken! :sob:

Download files

Source Distribution

dagio-0.0.2.tar.gz (5.8 kB)

Uploaded Source

Built Distribution

dagio-0.0.2-py3-none-any.whl (5.2 kB)

Uploaded Python 3

File details

Details for the file dagio-0.0.2.tar.gz.

File metadata

  • Download URL: dagio-0.0.2.tar.gz
  • Size: 5.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.7

File hashes

Hashes for dagio-0.0.2.tar.gz
Algorithm Hash digest
SHA256 f01076dcf478c5d7523a9a48f5cb4b54b97270d12434ba99562a4d4df84ddde4
MD5 e19c408eebf93a2085b9c53ba597a61e
BLAKE2b-256 4977506d18203ea0c2e1dc553e44e777f0d55481c6a5d7a1c54d22d88a743ccd

File details

Details for the file dagio-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: dagio-0.0.2-py3-none-any.whl
  • Size: 5.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.7

File hashes

Hashes for dagio-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 a8e5226d44b7e099eb5d3485f8462e63b0d5ba2eafe6cc221de739dfe0216ce5
MD5 965392bbe305edcb4fa03d20d2b0d331
BLAKE2b-256 08b514a2953f9559f3b3fb82f614ee5d686a89387bf5fcb7dba5c57c0db61067
