
A domain-specific language (DSL) for defining and managing data processing pipelines.


Highway DSL

Highway DSL is a Python-based domain-specific language (DSL) for defining and managing complex workflows. Users declaratively specify tasks, dependencies, and execution parameters, and can combine them with control-flow mechanisms such as conditional branching, parallel execution, and retries.

Features

  • Declarative Workflow Definition: Define workflows using a clear and concise Python API or through YAML/JSON configurations.
  • Pydantic Models: Leverages Pydantic for robust data validation and serialization/deserialization of workflow definitions.
  • Rich Task Types: Supports various operators including:
    • TaskOperator: Executes a Python function.
    • ConditionOperator: Enables conditional branching based on expressions.
    • WaitOperator: Pauses workflow execution for a specified duration or until a specific datetime.
    • ParallelOperator: Executes multiple branches of tasks concurrently.
    • ForEachOperator: Iterates over a collection, executing a chain of tasks for each item.
  • Retry and Timeout Policies: Define retry strategies and timeout limits for individual tasks.
  • Serialization/Deserialization: Seamless conversion of workflow definitions between Python objects, YAML, and JSON formats.
  • Workflow Builder: A fluent API for constructing workflows programmatically.
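
To make the retry policies more concrete, here is a minimal standard-library sketch of how a base delay and a backoff factor could translate into a wait schedule. The `retry_delays` helper and the exponential formula are assumptions for illustration only. The parameter names mirror the `max_retries`, `delay`, and `backoff_factor` fields shown in the YAML example later in this README, but how an engine actually applies them is not specified here.

```python
from datetime import timedelta
from typing import List


def retry_delays(
    max_retries: int, delay: timedelta, backoff_factor: float = 2.0
) -> List[timedelta]:
    """Return the wait before each retry, ASSUMING exponential backoff.

    Hypothetical helper: not part of Highway DSL.
    """
    # Retry 0 waits `delay`, retry 1 waits `delay * backoff_factor`, and so on.
    return [delay * (backoff_factor ** attempt) for attempt in range(max_retries)]


schedule = retry_delays(max_retries=3, delay=timedelta(seconds=10), backoff_factor=2.0)
print([d.total_seconds() for d in schedule])  # [10.0, 20.0, 40.0]
```

With a `backoff_factor` of 1.0 the same helper yields a constant delay between attempts.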

Feature Overview

graph TD
    A[Workflow] --> B{TaskOperator};
    A --> C{ConditionOperator};
    A --> D{WaitOperator};
    A --> E{ParallelOperator};
    A --> F{ForEachOperator};

    B --> G[Executes Python Function];
    C --> H{If/Else Branching};
    D --> I[Pauses Execution];
    E --> J[Concurrent Branches];
    F --> K[Iterates Over Items];

    subgraph Policies
        B --> L[RetryPolicy];
        B --> M[TimeoutPolicy];
    end

Installation

Install Highway DSL with pip:

pip install highway-dsl

To install it for development, including the testing dependencies:

pip install "highway-dsl[dev]"

Usage

Defining a Simple Workflow

from datetime import timedelta
from highway_dsl import WorkflowBuilder

def demonstrate_basic_workflow():
    """Show a simple complete workflow using just the builder"""

    workflow = (
        WorkflowBuilder("simple_etl")
        .task("extract", "etl.extract_data", result_key="raw_data")
        .task(
            "transform",
            "etl.transform_data",
            args=["{{raw_data}}"],
            result_key="transformed_data",
        )
        .retry(max_retries=3, delay=timedelta(seconds=10))
        .task("load", "etl.load_data", args=["{{transformed_data}}"])
        .timeout(timeout=timedelta(minutes=30))
        .wait("wait_next", timedelta(hours=24))
        .task("cleanup", "etl.cleanup")
        .build()
    )

    workflow.set_variables(
        {"database_url": "postgresql://localhost/mydb", "chunk_size": 1000}
    )

    return workflow

if __name__ == "__main__":
    basic_workflow = demonstrate_basic_workflow()
    print(basic_workflow.to_yaml())
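
The `{{raw_data}}` and `{{transformed_data}}` arguments refer to values stored under an earlier task's `result_key`. How these placeholders are resolved at run time is not shown in this README; the sketch below is one plausible interpretation using only the standard library. The `resolve_args` helper and the `results` dictionary are hypothetical and are not part of Highway DSL.

```python
import re
from typing import Any, Dict, List

# Matches a string that consists solely of one placeholder, e.g. "{{raw_data}}".
_PLACEHOLDER = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")


def resolve_args(args: List[Any], results: Dict[str, Any]) -> List[Any]:
    """Replace whole-string "{{key}}" placeholders with values from `results`.

    Hypothetical helper: an assumption about how result keys might be looked up.
    """
    resolved = []
    for arg in args:
        match = _PLACEHOLDER.match(arg) if isinstance(arg, str) else None
        if match:
            # Raises KeyError if the referenced result has not been produced yet.
            resolved.append(results[match.group(1)])
        else:
            resolved.append(arg)
    return resolved


results = {"raw_data": [1, 2, 3]}  # pretend output of the "extract" task
print(resolve_args(["{{raw_data}}", "literal"], results))  # [[1, 2, 3], 'literal']
```

Non-string arguments and strings without a placeholder pass through unchanged, so literal values can be mixed freely with references.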

Defining a Complex Workflow

Refer to example_usage.py for a more complex example demonstrating conditional logic, parallel execution, and iteration.

YAML Configuration

You can also define workflows directly in YAML:

name: simple_etl
version: 1.0.0
description: Simple ETL workflow with retry and timeout
variables:
  database_url: postgresql://localhost/mydb
  chunk_size: 1000
start_task: extract
tasks:
  extract:
    task_id: extract
    operator_type: task
    function: etl.extract_data
    result_key: raw_data
    dependencies: []
    metadata: {}
    
  transform:
    task_id: transform
    operator_type: task
    function: etl.transform_data
    args: ["{{raw_data}}"]
    result_key: transformed_data
    dependencies: ["extract"]
    retry_policy:
      max_retries: 3
      delay: PT10S
      backoff_factor: 2.0
    metadata: {}
    
  load:
    task_id: load
    operator_type: task
    function: etl.load_data
    args: ["{{transformed_data}}"]
    dependencies: ["transform"]
    timeout_policy:
      timeout: PT30M
      kill_on_timeout: true
    metadata: {}
    
  wait_next:
    task_id: wait_next
    operator_type: wait
    wait_for: "P1D"
    dependencies: ["load"]
    metadata: {}
    
  cleanup:
    task_id: cleanup
    operator_type: task
    function: etl.cleanup
    dependencies: ["wait_next"]
    metadata: {}
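
The duration fields in this YAML (`PT10S`, `PT30M`, `P1D`) are ISO 8601 durations and correspond to the `timedelta` values passed to the builder in the Python example (10 seconds, 30 minutes, 24 hours). For readers unfamiliar with the notation, here is a small standard-library sketch that converts the day/hour/minute/second subset into `timedelta`. The `parse_duration` helper is hypothetical and is not part of Highway DSL, which presumably relies on Pydantic for this conversion.

```python
import re
from datetime import timedelta

# Supports only days, hours, minutes, and seconds; years, months, and weeks are rejected.
_DURATION = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+(?:\.\d+)?)S)?)?$"
)


def parse_duration(text: str) -> timedelta:
    """Convert a simple ISO 8601 duration such as "PT30M" into a timedelta."""
    match = _DURATION.match(text)
    parts = (
        {}
        if match is None
        else {k: float(v) for k, v in match.groupdict().items() if v is not None}
    )
    if not parts:
        raise ValueError(f"unsupported ISO 8601 duration: {text!r}")
    return timedelta(**parts)


print(parse_duration("PT30M"))  # 0:30:00
```

Note that `P1D` and `PT24H` describe the same `timedelta`, so either spelling may appear depending on the serializer.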

To load this YAML:

from highway_dsl import Workflow

yaml_content = """
# ... (yaml content from above)
"""

workflow = Workflow.from_yaml(yaml_content)
print(workflow.name)
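
Each task's `dependencies` list makes the workflow a directed acyclic graph. As a rough illustration of how an execution order could be derived from it, the sketch below applies `graphlib.TopologicalSorter` from the standard library (Python 3.9+) to a plain dictionary that mirrors the YAML above. This is not Highway DSL's scheduler, and the `dependencies` dictionary is copied by hand rather than read from a loaded `Workflow`.

```python
from graphlib import TopologicalSorter

# Task id -> ids of the tasks it depends on, copied from the YAML example.
dependencies = {
    "extract": [],
    "transform": ["extract"],
    "load": ["transform"],
    "wait_next": ["load"],
    "cleanup": ["wait_next"],
}

# static_order() yields every task after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['extract', 'transform', 'load', 'wait_next', 'cleanup']
```

If the dependencies contained a cycle, `static_order()` would raise `graphlib.CycleError`, which makes the same approach usable as a quick validity check.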

Development

Running Tests

To run the unit tests, navigate to the project root and execute:

pytest

Type Checking

To run static type checking with mypy:

mypy .

Project Structure

.highway/
├── highway_dsl/
│   ├── __init__.py         # Exposes the public API
│   └── workflow_dsl.py     # Core DSL definitions (Pydantic models)
├── example_usage.py        # Examples of how to use the DSL
├── tests/
│   ├── __init__.py
│   ├── conftest.py         # Pytest configuration
│   └── test_workflow_dsl.py # Unit and integration tests
├── pyproject.toml          # Project metadata and dependencies
├── README.md               # This file
└── SUMMARY.md              # Summary of changes and future instructions
