dramatiq-workflow

dramatiq-workflow allows running workflows (chains and groups of tasks) using the Python background task processing library dramatiq.

Sponsors

Outset

Motivation

While dramatiq allows running tasks in parallel via groups, and in sequence via pipelines, it does not provide a way to combine these two concepts. dramatiq-workflow aims to fill this gap and allows creating complex workflows, similar to the canvas feature in Celery.

Features

  • Define workflows with tasks running in parallel and in sequence using chains and groups.
  • Nest chains and groups of tasks to create complex workflows.
  • Schedule workflows to run in the background using dramatiq.

Note: dramatiq-workflow does not support passing the results from one task to the next one in a chain. We recommend using a database to store intermediate results if needed.
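
As a rough illustration of the database approach, the sketch below stores each step's output under a run ID that every task receives as an argument. It uses only the standard library; the table layout, the helper names (save_result, load_result), the task names (extract, summarize), and the database path are all assumptions made for this example and are not part of dramatiq-workflow.

```python
import json
import os
import sqlite3
import tempfile
import uuid
from contextlib import closing

# Hypothetical location; workers must all be able to reach the same database.
DB_PATH = os.path.join(tempfile.gettempdir(), "workflow_results_example.db")


def _connect():
    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS results ("
        "run_id TEXT, step TEXT, value TEXT, PRIMARY KEY (run_id, step))"
    )
    return conn


def save_result(run_id, step, value):
    """Store a JSON-serializable result for one step of one workflow run."""
    with closing(_connect()) as conn, conn:
        conn.execute(
            "INSERT OR REPLACE INTO results VALUES (?, ?, ?)",
            (run_id, step, json.dumps(value)),
        )


def load_result(run_id, step):
    """Return the stored result, or None if the step has not written one."""
    with closing(_connect()) as conn:
        row = conn.execute(
            "SELECT value FROM results WHERE run_id = ? AND step = ?",
            (run_id, step),
        ).fetchone()
    return json.loads(row[0]) if row else None


# In a real project these two functions would be decorated with @dramatiq.actor.
def extract(run_id):
    save_result(run_id, "extract", [1, 2, 3])


def summarize(run_id):
    numbers = load_result(run_id, "extract")
    save_result(run_id, "summarize", sum(numbers))


run_id = uuid.uuid4().hex
extract(run_id)    # in a workflow: extract.message(run_id)
summarize(run_id)  # in a workflow: summarize.message(run_id), chained after extract
print(load_result(run_id, "summarize"))
```

Passing the same run ID to every message keeps concurrent runs of the same workflow from overwriting each other's intermediate results.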

Installation

You can install dramatiq-workflow from PyPI:

pip install dramatiq-workflow

Then, add the dramatiq-workflow middleware to your dramatiq broker:

from dramatiq.rate_limits.backends import RedisBackend
from dramatiq_workflow import WorkflowMiddleware

# `broker` is the dramatiq broker instance your application already uses.
backend = RedisBackend()
broker.add_middleware(WorkflowMiddleware(backend))

Please refer to the dramatiq documentation for details on how to set up a broker.

Example

Let's assume we want a workflow that looks like this:

             ╭────────╮  ╭────────╮
             │ Task 2 │  │ Task 5 │
          ╭──┼●      ●┼──┼●      ●┼╮
╭────────╮│  ╰────────╯  ╰────────╯│  ╭────────╮
│ Task 1 ││  ╭────────╮            │  │ Task 8 │
│       ●┼╯  │ Task 3 │            ╰──┼●       │
│       ●┼───┼●      ●┼───────────────┼●       │
│       ●┼╮  ╰────────╯             ╭─┼●       │
╰────────╯│  ╭────────╮   ╭────────╮│╭┼●       │
          │  │ Task 4 │   │ Task 6 │││╰────────╯
          ╰──┼●      ●┼───┼●      ●┼╯│
             │       ●┼╮  ╰────────╯ │
             ╰────────╯│             │
                       │  ╭────────╮ │
                       │  │ Task 7 │ │
                       ╰──┼●      ●┼─╯
                          ╰────────╯

We can define this workflow as follows:

import dramatiq
from dramatiq_workflow import Workflow, Chain, Group

@dramatiq.actor
def task1(arg1, arg2, arg3):
    print("Task 1")

@dramatiq.actor
def task2():
    print("Task 2")

# ... (task3 through task8 must also be defined as actors)

workflow = Workflow(
    Chain(
        task1.message("arguments", "go", "here"),
        Group(
            Chain(
                task2.message(),
                task5.message(),
            ),
            task3.message(),
            Chain(
                task4.message(),
                Group(
                    task6.message(),
                    task7.message(),
                ),
            ),
        ),
        task8.message(),
    ),
)
workflow.run()  # Schedules the workflow to run in the background

Execution Order

In this example, the execution would look like this:

  1. Task 1 runs (with arguments "arguments", "go", and "here")
  2. Tasks 2, 3, and 4 run in parallel once Task 1 finishes
  3. Task 5 runs once Task 2 finishes
  4. Tasks 6 and 7 run in parallel once Task 4 finishes
  5. Task 8 runs once Tasks 3, 5, 6, and 7 finish

This is a simplified example. The actual execution order may vary because tasks that can run in parallel (i.e. in a Group) are not guaranteed to run in the order they are defined in the workflow.
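
The ordering rules above can be modeled in a few lines of plain Python. This is only a sketch of the semantics, not how dramatiq-workflow is implemented: the workflow is written as nested tuples tagged "chain" or "group" (a notation invented for this example), and the hypothetical prerequisites function computes which tasks must finish before each task may start.

```python
def prerequisites(node, after=frozenset(), deps=None):
    """Return (deps, last) for a nested workflow description.

    deps maps each task name to the set of tasks that must finish first.
    last is the set of tasks whose completion means `node` has finished.
    """
    if deps is None:
        deps = {}
    if isinstance(node, str):  # a single task
        deps[node] = set(after)
        return deps, frozenset({node})
    kind, *children = node
    if kind == "chain":
        # Each step waits for whatever finished the previous step.
        for child in children:
            deps, after = prerequisites(child, after, deps)
        return deps, after
    if kind == "group":
        # All members start together; the group is done when all are done.
        finished = frozenset()
        for child in children:
            deps, last = prerequisites(child, after, deps)
            finished |= last
        return deps, finished or after
    raise ValueError(f"unknown node type: {kind!r}")


WORKFLOW = (
    "chain",
    "task1",
    (
        "group",
        ("chain", "task2", "task5"),
        "task3",
        ("chain", "task4", ("group", "task6", "task7")),
    ),
    "task8",
)

deps, _ = prerequisites(WORKFLOW)
for task in sorted(deps):
    print(task, "waits for", sorted(deps[task]))
```

In this model, task8 waits for the last task of every branch in the group, which is why it cannot start until the single-task branch (task3) has finished as well.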

Advanced Usage

WithDelay

The WithDelay class allows delaying the execution of a task or a group of tasks:

from dramatiq_workflow import Chain, Group, WithDelay, Workflow

workflow = Workflow(
    Chain(
        task1.message("arguments", "go", "here"),
        WithDelay(task2.message(), delay=1_000),
        WithDelay(
            Group(
                task3.message(),
                task4.message(),
            ),
            delay=2_000,
        ),
    )
)

In this example, Task 2 will run roughly 1 second after Task 1 finishes, and Tasks 3 and 4 will run roughly 2 seconds after Task 2 finishes. The delay is given in milliseconds.
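
To make the timing concrete, here is a small back-of-the-envelope calculation in plain Python. The task durations are made up for illustration, and the function is a hypothetical helper rather than part of the library; real start times will be somewhat later because of queueing and worker latency.

```python
# Hypothetical run times in milliseconds; real durations depend on your tasks.
DURATION_MS = {"task1": 500, "task2": 300, "task3": 200, "task4": 400}


def earliest_starts(durations):
    """Earliest start offsets (ms) for the WithDelay example above."""
    starts = {"task1": 0}
    task1_done = starts["task1"] + durations["task1"]
    # WithDelay(task2.message(), delay=1_000): wait 1 s after Task 1 finishes.
    starts["task2"] = task1_done + 1_000
    task2_done = starts["task2"] + durations["task2"]
    # WithDelay(Group(task3, task4), delay=2_000): both wait 2 s after Task 2.
    starts["task3"] = starts["task4"] = task2_done + 2_000
    return starts


print(earliest_starts(DURATION_MS))
```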

License

This project is licensed under the MIT License. See the LICENSE file for details.
