Skip to main content

A library for running workflows (chains and groups of tasks) using the Python background task processing library dramatiq.

Project description

dramatiq-workflow

dramatiq-workflow allows running workflows (chains and groups of tasks) using the Python background task processing library dramatiq.

A workflow allows running tasks in parallel and in sequence. It is a way to define a workflow of tasks, a combination of chains and groups in any order and nested as needed.

Features

  • Define workflows with tasks running in parallel and in sequence.
  • Nest chains and groups of tasks to create complex workflows.
  • Schedules workflows to run in the background using dramatiq.

Note: dramatiq-workflow does not support passing the results from one task to the next one in a chain. We recommend using a database to store intermediate results if needed.

Installation

You can install dramatiq-workflow from PyPI:

pip install dramatiq-workflow

Example

Let's assume we want a workflow that looks like this:

             ╭────────╮  ╭────────╮
             │ Task 2 │  │ Task 5 │
          ╭──┼●      ●┼──┼●      ●┼╮
╭────────╮│  ╰────────╯  ╰────────╯│  ╭────────╮
│ Task 1 ││  ╭────────╮            │  │ Task 8 │
│       ●┼╯  │ Task 3 │            ╰──┼●       │
│       ●┼───┼●      ●┼───────────────┼●       │
│       ●┼╮  ╰────────╯             ╭─┼●       │
╰────────╯│  ╭────────╮   ╭────────╮│╭┼●       │
          │  │ Task 4 │   │ Task 6 │││╰────────╯
          ╰──┼●      ●┼───┼●      ●┼╯│
             │       ●┼╮  ╰────────╯ │
             ╰────────╯│             │
                       │  ╭────────╮ │
                       │  │ Task 7 │ │
                       ╰──┼●      ●┼─╯
                          ╰────────╯

We can define this workflow as follows:

from dramatiq_workflow import Workflow, Chain, Group

workflow = Workflow(
    Chain(
        task1.message(),
        Group(
            Chain(
                task2.message(),
                task5.message(),
            ),
            task3.message(),
            Chain(
                task4.message(),
                Group(
                    task6.message(),
                    task7.message(),
                ),
            ),
        ),
        task8.message(),
    ),
)
workflow.run()  # Schedules the workflow to run in the background

Execution Order

In this example, the execution would look like this:

  1. Task 1 runs
  2. Task 2, 3, and 4 run in parallel once Task 1 finishes
  3. Task 5 runs once Task 2 finishes
  4. Task 6 and 7 run in parallel once Task 4 finishes
  5. Task 8 runs once Task 5, 6, and 7 finish

This is a simplified example. The actual execution order may vary because tasks that can run in parallel (i.e., in a Group) are not guaranteed to run in the order they are defined in the workflow.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dramatiq_workflow-0.1.0.tar.gz (9.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dramatiq_workflow-0.1.0-py3-none-any.whl (11.6 kB view details)

Uploaded Python 3

File details

Details for the file dramatiq_workflow-0.1.0.tar.gz.

File metadata

  • Download URL: dramatiq_workflow-0.1.0.tar.gz
  • Upload date:
  • Size: 9.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.13.1

File hashes

Hashes for dramatiq_workflow-0.1.0.tar.gz
Algorithm Hash digest
SHA256 9d47bd7850368fa064991b4c707ef538554e3cfdf2afe00dd9a3c3691e7cad5e
MD5 28fb9abbf0ac3da985d038df8d2e16c9
BLAKE2b-256 e224aa8bee1123ce2732d1a429ac9c5a21d7e3425806d4bb343a516b7debf5bf

See more details on using hashes here.

File details

Details for the file dramatiq_workflow-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for dramatiq_workflow-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7f3307fa4e07f256f69076c2d8e548c2190bf6e4bf6b54644bc1c977c537d03e
MD5 46b2f33387ddc8e80807f512e4977b7c
BLAKE2b-256 172272ec613e99721de6291db6215a1135bd5558958d516186b3d4f16bbeba7e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page