A Python package for running directed acyclic graphs of asynchronous I/O operations
dagio: Asynchronous I/O - with DAGs!
dagio is an embarrassingly simple Python package for running directed acyclic graphs of asynchronous I/O operations. It is built on, and meant to be used with, Python's built-in asyncio module, and provides a veeeery thin layer of functionality on top of it. :sweat_smile:
- Git repository: https://github.com/brendanhasz/dagio
- Bug reports: https://github.com/brendanhasz/dagio/issues
Getting Started
Suppose you have a set of potentially long-running I/O tasks (e.g. hitting a web service, querying a database, reading a large file from disk, etc.), where some of the tasks depend on other tasks having finished. That is, you've got a directed acyclic graph (DAG) of tasks, where non-interdependent tasks can be run asynchronously.
For example, if you've got a task G which depends on E and F, but E depends on D, and F depends on both C and D, etc:
A
|
B   C
 \ /|
  D |
 / \|
E   F
 \ /
  G
Coding that up using raw asyncio might look something like this:
import asyncio

class MyDag:
    async def task_a(self):
        # does task a stuff...
        ...

    async def task_b(self):
        # does task b stuff...
        ...

    async def task_c(self):
        # does task c stuff...
        ...

    async def task_d(self):
        # does task d stuff...
        ...

    async def task_e(self):
        # does task e stuff...
        ...

    async def task_f(self):
        # does task f stuff...
        ...

    async def task_g(self):
        # does task g stuff...
        ...

async def run():
    obj = MyDag()
    # a and c have no dependencies, so start both right away
    task_a = asyncio.create_task(obj.task_a())
    task_c = asyncio.create_task(obj.task_c())
    await task_a
    await obj.task_b()
    await task_c
    await obj.task_d()
    # e and f can run concurrently once d has finished
    task_e = asyncio.create_task(obj.task_e())
    task_f = asyncio.create_task(obj.task_f())
    await task_e
    await task_f
    await obj.task_g()

asyncio.run(run())
Which is... fine, I guess :roll_eyes: But, you have to be careful about what task you start before what other task, and which tasks can safely be run asynchronously vs those which can't. And then you have to type out all that logic and ordering manually! With the confusing asyncio API! So: a lot of thought has to go into it, especially for complex DAGs.
And thinking is hard! Less thinking! :fist:
With dagio, you just use the depends decorator to specify what methods any other given method depends on, and it'll figure everything out for you, and run them in the correct order, asynchronously where possible:
import asyncio
from dagio import depends

class MyDag:
    async def task_a(self):
        # does task a stuff...
        ...

    @depends("task_a")
    async def task_b(self):
        # does task b stuff...
        ...

    async def task_c(self):
        # does task c stuff...
        ...

    @depends("task_b", "task_c")
    async def task_d(self):
        # does task d stuff...
        ...

    @depends("task_d")
    async def task_e(self):
        # does task e stuff...
        ...

    @depends("task_c", "task_d")
    async def task_f(self):
        # does task f stuff...
        ...

    @depends("task_e", "task_f")
    async def task_g(self):
        # does task g stuff...
        ...

async def run():
    obj = MyDag()
    await obj.task_g()

asyncio.run(run())
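To give a feel for how a decorator like this can work, here is a minimal sketch in plain asyncio. It is not dagio's actual implementation: `depends_sketch` and the `_sketch_tasks` attribute are made-up names, and the sketch assumes every task (even one with no dependencies) is decorated so that each task runs only once per object.

```python
import asyncio
import functools


def depends_sketch(*dep_names):
    """Hypothetical stand-in for a depends-style decorator (not dagio's code)."""

    def decorator(method):
        async def run_with_deps(self):
            # Start all dependencies concurrently, then run the method body.
            if dep_names:
                await asyncio.gather(*(getattr(self, n)() for n in dep_names))
            await method(self)

        @functools.wraps(method)
        async def wrapper(self):
            # One asyncio.Task per (object, method), so a task shared by
            # several dependents is only executed once.
            tasks = self.__dict__.setdefault("_sketch_tasks", {})
            if method.__name__ not in tasks:
                tasks[method.__name__] = asyncio.create_task(run_with_deps(self))
            await tasks[method.__name__]

        return wrapper

    return decorator


class Demo:
    def __init__(self):
        self.order = []  # records the order in which task bodies finish

    @depends_sketch()
    async def task_a(self):
        await asyncio.sleep(0.01)  # stand-in for real I/O
        self.order.append("a")

    @depends_sketch()
    async def task_c(self):
        await asyncio.sleep(0.01)
        self.order.append("c")

    @depends_sketch("task_a")
    async def task_b(self):
        self.order.append("b")

    @depends_sketch("task_b", "task_c")
    async def task_d(self):
        self.order.append("d")

    @depends_sketch("task_c", "task_d")
    async def task_f(self):
        self.order.append("f")


async def demo():
    obj = Demo()
    await obj.task_f()  # pulls in a, b, c and d first
    return obj.order


order = asyncio.run(demo())
print(order)
```

Caching one task per method name is what keeps `task_c` from running twice, even though both `task_d` and `task_f` depend on it.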
Note that:
- Each task in your DAG has to be a method of the same class
- Task methods must be async methods
- Calling a task method decorated with depends runs that task and all its dependencies
- Task methods should not take arguments nor return values. You can handle inter-task communication using object attributes (e.g. self._task_a_output = ...). If you need a lock, you can set up an asyncio.Lock in your class's __init__.
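The attribute-plus-lock pattern from the last note can be tried out without dagio at all. The sketch below uses plain asyncio; the class name `SharedState`, the `_total` attribute, and the `add` method are all made up for illustration. The lock matters here because the update reads, yields control, and then writes.

```python
import asyncio


class SharedState:
    def __init__(self):
        self._lock = asyncio.Lock()  # guards self._total
        self._total = 0

    async def add(self, value):
        async with self._lock:
            current = self._total
            await asyncio.sleep(0)  # yield point: another task could run here
            self._total = current + value


async def main():
    state = SharedState()  # created inside the running event loop
    await asyncio.gather(state.add(1), state.add(2))
    return state._total


total = asyncio.run(main())
print(total)
```

Without the lock, both calls could read `0` before either one writes, and one of the updates would be lost.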
You can also run a non-async method asynchronously in a thread pool using the run_async decorator:
import asyncio
from dagio import depends, run_async

class MyDag:
    @run_async
    def task_a(self):
        # a sync method which does task a stuff...
        ...

    @run_async
    def task_b(self):
        # a sync method which does task b stuff...
        ...

    @depends("task_a", "task_b")
    async def task_c(self):
        # does task c stuff...
        ...

async def run():
    obj = MyDag()
    await obj.task_c()  # runs a and b concurrently, then c

asyncio.run(run())
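If you're curious what "running a sync method in a thread pool" can look like underneath, here is a rough sketch using asyncio's own `run_in_executor`. This is an assumption about the general technique, not a copy of how dagio's run_async is written, and `run_in_thread_sketch` is a made-up name.

```python
import asyncio
import functools
import time


def run_in_thread_sketch(func):
    """Hypothetical decorator: run a sync method in the default thread pool."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)
        # Passing None selects the event loop's default ThreadPoolExecutor.
        return await loop.run_in_executor(None, call)

    return wrapper


class Demo:
    def __init__(self):
        self.done = []

    @run_in_thread_sketch
    def task_a(self):
        time.sleep(0.1)  # blocking call, stand-in for sync I/O
        self.done.append("a")

    @run_in_thread_sketch
    def task_b(self):
        time.sleep(0.1)
        self.done.append("b")


async def main():
    obj = Demo()
    # Both blocking methods run in worker threads at the same time.
    await asyncio.gather(obj.task_a(), obj.task_b())
    return sorted(obj.done)


done = asyncio.run(main())
print(done)
```

Because the blocking calls run in worker threads, the event loop stays free to make progress on other coroutines while they sleep.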
That's it. That's all this package does.
Installation
pip install dagio
Support
Post bug reports, feature requests, and tutorial requests in GitHub issues.
Contributing
Pull requests are totally welcome! Any contribution would be appreciated, from things as minor as fixing typos to things as major as adding new functionality. :smile:
Why the name, dagio?
It's for making DAGs of IO operations. DAG IO. Technically it's asynchronous DAG-based I/O, and the name adagio would have been siiiick, but it was already taken! :sob: