dramatiq-workflow
dramatiq-workflow allows running workflows (chains and groups of tasks) using
the Python background task processing library dramatiq.
A workflow combines chains (tasks that run in sequence) and groups (tasks that run in parallel), in any order and nested as needed.
Features
- Define workflows with tasks running in parallel and in sequence.
- Nest chains and groups of tasks to create complex workflows.
- Schedule workflows to run in the background using dramatiq.
Note: dramatiq-workflow does not support passing the results from one task
to the next one in a chain. We recommend using a database to store intermediate
results if needed.
Installation
You can install dramatiq-workflow from PyPI:
pip install dramatiq-workflow
Example
Let's assume we want a workflow that looks like this:
             ╭────────╮  ╭────────╮
             │ Task 2 │  │ Task 5 │
          ╭──┼●      ●┼──┼●      ●┼╮
╭────────╮│  ╰────────╯  ╰────────╯│  ╭────────╮
│ Task 1 ││  ╭────────╮            │  │ Task 8 │
│       ●┼╯  │ Task 3 │            ╰──┼●       │
│       ●┼───┼●      ●┼───────────────┼●       │
│       ●┼╮  ╰────────╯             ╭─┼●       │
╰────────╯│  ╭────────╮   ╭────────╮│╭┼●       │
          │  │ Task 4 │   │ Task 6 │││╰────────╯
          ╰──┼●      ●┼───┼●      ●┼╯│
             │       ●┼╮  ╰────────╯ │
             ╰────────╯│             │
                       │  ╭────────╮ │
                       │  │ Task 7 │ │
                       ╰──┼●      ●┼─╯
                          ╰────────╯
We can define this workflow as follows:
from dramatiq_workflow import Workflow, Chain, Group

# task1 ... task8 are assumed to be dramatiq actors defined elsewhere.
workflow = Workflow(
    Chain(
        task1.message(),
        Group(
            Chain(
                task2.message(),
                task5.message(),
            ),
            task3.message(),
            Chain(
                task4.message(),
                Group(
                    task6.message(),
                    task7.message(),
                ),
            ),
        ),
        task8.message(),
    ),
)
workflow.run()  # Schedules the workflow to run in the background
Execution Order
In this example, the execution would look like this:
- Task 1 runs
- Tasks 2, 3, and 4 run in parallel once Task 1 finishes
- Task 5 runs once Task 2 finishes
- Tasks 6 and 7 run in parallel once Task 4 finishes
- Task 8 runs once Tasks 3, 5, 6, and 7 finish
This is a simplified example. The actual execution order may vary because tasks that can run in parallel (i.e., in a Group) are not guaranteed to run in the order they are defined in the workflow.
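The ordering rules can be checked with a small model: in a chain each item waits for the previous one, while the items of a group start together and the group is finished once all of them have finished. The `Chain` and `Group` classes below are hypothetical stand-ins that hold task names as strings. They are not the dramatiq-workflow classes, and nothing is executed.

```python
class Chain:
    """Stand-in for a chain: items run one after another."""

    def __init__(self, *items):
        self.items = items


class Group:
    """Stand-in for a group: items may run in parallel."""

    def __init__(self, *items):
        self.items = items


def wire(node, waits_on, deps):
    """Record in `deps` which tasks each task waits on.

    `waits_on` is the set of tasks that must finish before `node` starts.
    Returns the set of tasks whose completion means `node` has finished.
    """
    if isinstance(node, str):
        deps[node] = sorted(waits_on)
        return {node}
    if isinstance(node, Chain):
        # Each item waits on whatever finishes the previous item.
        for item in node.items:
            waits_on = wire(item, waits_on, deps)
        return waits_on
    if isinstance(node, Group):
        if not node.items:
            return waits_on
        # Every item starts from the same point; the group is done
        # when all of its items are done.
        finished = set()
        for item in node.items:
            finished |= wire(item, waits_on, deps)
        return finished
    raise TypeError(f"unsupported node: {node!r}")


# Same shape as the example workflow, with names instead of messages.
workflow = Chain(
    "task1",
    Group(
        Chain("task2", "task5"),
        "task3",
        Chain("task4", Group("task6", "task7")),
    ),
    "task8",
)

deps = {}
wire(workflow, set(), deps)
for task, waits in deps.items():
    print(task, "waits on", ", ".join(waits) or "nothing")
```

In this model `task8` waits on `task3`, `task5`, `task6`, and `task7`, the last task of every branch in the outer group.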
License
This project is licensed under the MIT License. See the LICENSE file for details.
File details
Details for the file dramatiq_workflow-0.1.0.tar.gz.
File metadata
- Download URL: dramatiq_workflow-0.1.0.tar.gz
- Upload date:
- Size: 9.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.13.1
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9d47bd7850368fa064991b4c707ef538554e3cfdf2afe00dd9a3c3691e7cad5e |
| MD5 | 28fb9abbf0ac3da985d038df8d2e16c9 |
| BLAKE2b-256 | e224aa8bee1123ce2732d1a429ac9c5a21d7e3425806d4bb343a516b7debf5bf |
File details
Details for the file dramatiq_workflow-0.1.0-py3-none-any.whl.
File metadata
- Download URL: dramatiq_workflow-0.1.0-py3-none-any.whl
- Upload date:
- Size: 11.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.13.1
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7f3307fa4e07f256f69076c2d8e548c2190bf6e4bf6b54644bc1c977c537d03e |
| MD5 | 46b2f33387ddc8e80807f512e4977b7c |
| BLAKE2b-256 | 172272ec613e99721de6291db6215a1135bd5558958d516186b3d4f16bbeba7e |