A library for running workflows (chains and groups of tasks) using the Python background task processing library dramatiq.
dramatiq-workflow
dramatiq-workflow allows running workflows (chains and groups of tasks) using
the Python background task processing library dramatiq.
Motivation
While dramatiq allows running tasks in parallel via groups, and in sequence via
pipelines, it does not provide a way to combine these two concepts.
dramatiq-workflow aims to fill this gap and allows creating complex
workflows, similar to the canvas feature in Celery.
Features
- Define workflows with tasks running in parallel and in sequence using chains and groups.
- Nest chains and groups of tasks to create complex workflows.
- Schedule workflows to run in the background using dramatiq.
Note: dramatiq-workflow does not support passing the results from one task
to the next one in a chain. We recommend using a database to store intermediate
results if needed.
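As a rough illustration of that recommendation, the sketch below has each step write its output to a shared table keyed by a workflow ID, and the next step read it back. Everything here is an assumption made for illustration: the table layout, the helper names (`open_store`, `save_result`, `load_result`), and the use of SQLite are not part of dramatiq-workflow. Plain functions stand in for actors so the example runs on its own; in a real deployment each actor would receive only the workflow ID as a message argument and open its own connection to a database that all workers can reach (an in-memory SQLite database would not be shared between processes).

```python
import json
import sqlite3


def open_store(path=":memory:"):
    """Open (and initialise if needed) a hypothetical results table."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS results ("
        "workflow_id TEXT, step TEXT, payload TEXT, "
        "PRIMARY KEY (workflow_id, step))"
    )
    return conn


def save_result(conn, workflow_id, step, value):
    """Store a JSON-serialisable result for one step of one workflow run."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO results VALUES (?, ?, ?)",
            (workflow_id, step, json.dumps(value)),
        )


def load_result(conn, workflow_id, step):
    """Fetch the result an earlier step stored; raise KeyError if missing."""
    row = conn.execute(
        "SELECT payload FROM results WHERE workflow_id = ? AND step = ?",
        (workflow_id, step),
    ).fetchone()
    if row is None:
        raise KeyError((workflow_id, step))
    return json.loads(row[0])


# Plain functions standing in for two actors that run back to back in a chain.
def fetch_numbers(conn, workflow_id):
    save_result(conn, workflow_id, "fetch_numbers", [1, 2, 3])


def sum_numbers(conn, workflow_id):
    numbers = load_result(conn, workflow_id, "fetch_numbers")
    save_result(conn, workflow_id, "sum_numbers", sum(numbers))


conn = open_store()
fetch_numbers(conn, "wf-1")
sum_numbers(conn, "wf-1")
print(load_result(conn, "wf-1", "sum_numbers"))  # prints 6
```

Keying rows by a workflow ID keeps concurrent runs of the same workflow from overwriting each other's intermediate results.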
Installation
You can install dramatiq-workflow from PyPI:
pip install dramatiq-workflow
Then, add the dramatiq-workflow middleware to your dramatiq broker:
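from dramatiq.brokers.redis import RedisBroker
from dramatiq.rate_limits.backends import RedisBackend
from dramatiq_workflow import WorkflowMiddleware

# Assumption: a Redis broker on localhost; substitute the broker your project already uses.
broker = RedisBroker(host="localhost")
backend = RedisBackend()
broker.add_middleware(WorkflowMiddleware(backend))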
Please refer to the dramatiq documentation for details on how to set up a broker.
Example
Let's assume we want a workflow that looks like this:
             ╭────────╮  ╭────────╮
             │ Task 2 │  │ Task 5 │
          ╭──┼●      ●┼──┼●      ●┼╮
╭────────╮│  ╰────────╯  ╰────────╯│  ╭────────╮
│ Task 1 ││  ╭────────╮            │  │ Task 8 │
│       ●┼╯  │ Task 3 │            ╰──┼●       │
│       ●┼───┼●      ●┼───────────────┼●       │
│       ●┼╮  ╰────────╯             ╭─┼●       │
╰────────╯│  ╭────────╮   ╭────────╮│╭┼●       │
          │  │ Task 4 │   │ Task 6 │││╰────────╯
          ╰──┼●      ●┼───┼●      ●┼╯│
             │       ●┼╮  ╰────────╯ │
             ╰────────╯│             │
                       │  ╭────────╮ │
                       │  │ Task 7 │ │
                       ╰──┼●      ●┼─╯
                          ╰────────╯
We can define this workflow as follows:
import dramatiq
from dramatiq_workflow import Workflow, Chain, Group

@dramatiq.actor
def task1(arg1, arg2, arg3):
    print("Task 1")

@dramatiq.actor
def task2():
    print("Task 2")

# ...

workflow = Workflow(
    Chain(
        task1.message("arguments", "go", "here"),
        Group(
            Chain(
                task2.message(),
                task5.message(),
            ),
            task3.message(),
            Chain(
                task4.message(),
                Group(
                    task6.message(),
                    task7.message(),
                ),
            ),
        ),
        task8.message(),
    ),
)
workflow.run()  # Schedules the workflow to run in the background
Execution Order
In this example, the execution would look like this:
- Task 1 runs (with arguments "arguments", "go", and "here")
- Task 2, 3, and 4 run in parallel once Task 1 finishes
- Task 5 runs once Task 2 finishes
- Task 6 and 7 run in parallel once Task 4 finishes
- Task 8 runs once Task 3, 5, 6, and 7 finish
This is a simplified example. The actual execution order may vary because
tasks that can run in parallel (i.e. in a Group) are not guaranteed to run in
the order they are defined in the workflow.
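To make the ordering rules concrete, here is a small standalone model that derives, for each task, which tasks must finish before it can start. It does not use dramatiq or dramatiq-workflow: `Seq` and `Par` are hypothetical stand-ins for a chain and a group, and `dependencies` is an illustrative helper, not how the library schedules work internally.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Seq:
    """Stand-in for a chain: items run one after another."""
    items: tuple


@dataclass(frozen=True)
class Par:
    """Stand-in for a group: items may run at the same time."""
    items: tuple


def dependencies(node, waits_for=frozenset(), deps=None):
    """Return (deps, last).

    deps maps each task name to the set of tasks it waits for;
    last is the set of tasks that must finish before whatever follows node.
    """
    if deps is None:
        deps = {}
    if isinstance(node, str):
        deps[node] = set(waits_for)
        return deps, frozenset({node})
    if isinstance(node, Seq):
        last = waits_for
        for item in node.items:
            # Each item waits for everything the previous item ended with.
            _, last = dependencies(item, last, deps)
        return deps, last
    if isinstance(node, Par):
        last = frozenset()
        for item in node.items:
            # Every item in a group starts from the same prerequisites.
            _, item_last = dependencies(item, waits_for, deps)
            last |= item_last
        return deps, last or waits_for
    raise TypeError(f"unsupported node: {node!r}")


example = Seq((
    "task1",
    Par((
        Seq(("task2", "task5")),
        "task3",
        Seq(("task4", Par(("task6", "task7")))),
    )),
    "task8",
))

deps, _ = dependencies(example)
for task in sorted(deps):
    print(task, "waits for", sorted(deps[task]))
```

In this model, the last task waits for every branch of the group, including the single-task branch, which matches the three lines entering Task 8 in the diagram.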
Advanced Usage
WithDelay
The WithDelay class allows delaying the execution of a task or a group of tasks:
from dramatiq_workflow import Chain, Group, WithDelay, Workflow

workflow = Workflow(
    Chain(
        task1.message("arguments", "go", "here"),
        WithDelay(task2.message(), delay=1_000),
        WithDelay(
            Group(
                task3.message(),
                task4.message(),
            ),
            delay=2_000,
        ),
    )
)
In this example, Task 2 will run roughly 1 second after Task 1 finishes, and Tasks 3 and 4 will run roughly 2 seconds after Task 2 finishes. Delays are given in milliseconds.
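Because each delay counts from the moment the previous step finishes, not from the start of the workflow, the delays add up along a chain. The arithmetic can be sketched as follows. The task durations are made-up numbers, `start_times` is a hypothetical helper, and queueing latency is ignored, which is why real timings are only approximate.

```python
def start_times(steps):
    """Compute start times for steps that run one after another.

    steps is a list of (name, delay_ms, duration_ms) tuples.
    Returns {name: start_ms}, measured from when the first step starts.
    """
    starts = {}
    previous_finish = 0
    for name, delay_ms, duration_ms in steps:
        # A step starts delay_ms after the previous step has finished.
        starts[name] = previous_finish + delay_ms
        previous_finish = starts[name] + duration_ms
    return starts


# Delays match the example above; durations are assumed for illustration.
steps = [
    ("task1", 0, 500),
    ("task2", 1_000, 300),
    ("task3 and task4", 2_000, 400),
]
timeline = start_times(steps)
for name, start_ms in timeline.items():
    print(f"{name} starts at about {start_ms} ms")
```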
License
This project is licensed under the MIT License. See the LICENSE file for details.