A domain specific language (DSL) for defining and managing data processing pipelines.

Project description

Highway DSL

Highway DSL is a Python-based Domain Specific Language (DSL) for defining and managing complex workflows. It lets users declaratively specify tasks, dependencies, and execution parameters, and supports control-flow mechanisms such as conditional branching, parallel execution, waits, and retries.

Features

  • Declarative Workflow Definition: Define workflows using a clear and concise Python API or through YAML/JSON configurations.
  • Pydantic Models: Leverages Pydantic for robust data validation and serialization/deserialization of workflow definitions.
  • Rich Task Types: Supports various operators including:
    • TaskOperator: Executes a Python function.
    • ConditionOperator: Enables conditional branching based on expressions.
    • WaitOperator: Pauses workflow execution for a specified duration or until a specific datetime.
    • ParallelOperator: Executes multiple branches of tasks concurrently.
    • ForEachOperator: Iterates over a collection, executing a chain of tasks for each item.
  • Retry and Timeout Policies: Define retry strategies and timeout limits for individual tasks.
  • Serialization/Deserialization: Seamless conversion of workflow definitions between Python objects, YAML, and JSON formats.
  • Workflow Builder: A fluent API for constructing workflows programmatically.
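Retry policies with a backoff factor (as in the YAML example later on this page) typically grow the delay geometrically between attempts. A minimal sketch of that schedule, assuming a `delay * backoff_factor ** attempt` rule (the exact formula Highway DSL applies is not documented here):

```python
from datetime import timedelta

def retry_delays(max_retries: int, delay: timedelta, backoff_factor: float) -> list[timedelta]:
    """Compute the wait before each retry attempt under exponential backoff."""
    return [delay * (backoff_factor ** attempt) for attempt in range(max_retries)]

# Same numbers as the retry_policy in the YAML example:
# max_retries: 3, delay: PT10S (10 s), backoff_factor: 2.0
delays = retry_delays(3, timedelta(seconds=10), 2.0)
print([d.total_seconds() for d in delays])  # → [10.0, 20.0, 40.0]
```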

Installation

To install Highway DSL, you can use pip:

pip install highway-dsl

If you want to install it for development, including testing dependencies:

pip install "highway-dsl[dev]"

Usage

Defining a Simple Workflow

from datetime import timedelta
from workflow_dsl import WorkflowBuilder

def demonstrate_basic_workflow():
    """Show a simple, complete workflow using only the builder API."""

    workflow = (
        WorkflowBuilder("simple_etl")
        .task("extract", "etl.extract_data", result_key="raw_data")
        .task(
            "transform",
            "etl.transform_data",
            args=["{{raw_data}}"],
            result_key="transformed_data",
        )
        .retry(max_retries=3, delay=timedelta(seconds=10))
        .task("load", "etl.load_data", args=["{{transformed_data}}"])
        .timeout(timeout=timedelta(minutes=30))
        .wait("wait_next", timedelta(hours=24))
        .task("cleanup", "etl.cleanup")
        .build()
    )

    workflow.set_variables(
        {"database_url": "postgresql://localhost/mydb", "chunk_size": 1000}
    )

    return workflow

if __name__ == "__main__":
    basic_workflow = demonstrate_basic_workflow()
    print(basic_workflow.to_yaml())
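The `{{raw_data}}` and `{{transformed_data}}` placeholders above refer to earlier tasks' `result_key` values. How the runtime resolves them is up to the executor; the substitution itself can be sketched as a simple lookup (illustrative only — `resolve_args` is not part of Highway DSL):

```python
import re

def resolve_args(args: list[str], results: dict[str, object]) -> list[object]:
    """Replace whole-string {{key}} placeholders with stored task results."""
    pattern = re.compile(r"^\{\{(\w+)\}\}$")
    resolved = []
    for arg in args:
        match = pattern.match(arg)
        resolved.append(results[match.group(1)] if match else arg)
    return resolved

results = {"raw_data": [1, 2, 3]}
print(resolve_args(["{{raw_data}}"], results))  # → [[1, 2, 3]]
```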

Defining a Complex Workflow

Refer to example_usage.py for a more complex example demonstrating conditional logic, parallel execution, and iteration.

YAML Configuration

You can also define workflows directly in YAML:

name: simple_etl
version: 1.0.0
description: Simple ETL workflow with retry and timeout
variables:
  database_url: postgresql://localhost/mydb
  chunk_size: 1000
start_task: extract
tasks:
  extract:
    task_id: extract
    operator_type: task
    function: etl.extract_data
    result_key: raw_data
    dependencies: []
    metadata: {}
    
  transform:
    task_id: transform
    operator_type: task
    function: etl.transform_data
    args: ["{{raw_data}}"]
    result_key: transformed_data
    dependencies: ["extract"]
    retry_policy:
      max_retries: 3
      delay: PT10S
      backoff_factor: 2.0
    metadata: {}
    
  load:
    task_id: load
    operator_type: task
    function: etl.load_data
    args: ["{{transformed_data}}"]
    dependencies: ["transform"]
    timeout_policy:
      timeout: PT30M
      kill_on_timeout: true
    metadata: {}
    
  wait_next:
    task_id: wait_next
    operator_type: wait
    wait_for: "P1D"
    dependencies: ["load"]
    metadata: {}
    
  cleanup:
    task_id: cleanup
    operator_type: task
    function: etl.cleanup
    dependencies: ["wait_next"]
    metadata: {}
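Durations in the YAML above (PT10S, PT30M, P1D) are ISO 8601 duration strings. A minimal parser for the simple forms used here — a sketch only; the library's own parsing may handle more of the standard:

```python
import re
from datetime import timedelta

def parse_iso_duration(value: str) -> timedelta:
    """Parse simple ISO 8601 durations like PT10S, PT30M, PT2H, or P1D."""
    match = re.fullmatch(
        r"P(?:(?P<days>\d+)D)?"
        r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?",
        value,
    )
    if not match or not any(match.groupdict().values()):
        raise ValueError(f"unsupported duration: {value!r}")
    parts = {k: int(v) for k, v in match.groupdict().items() if v}
    return timedelta(**parts)

print(parse_iso_duration("PT10S"), parse_iso_duration("P1D"))  # → 0:00:10 1 day, 0:00:00
```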

To load this YAML:

from workflow_dsl import Workflow

yaml_content = """
# ... (yaml content from above)
"""

workflow = Workflow.from_yaml(yaml_content)
print(workflow.name)
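Once loaded, the `dependencies` lists define a directed acyclic graph. Highway DSL's own scheduling is not shown here, but the ordering an executor could derive from those lists can be demonstrated with the standard library's `graphlib` alone:

```python
from graphlib import TopologicalSorter

# dependencies as declared in the YAML above (task -> tasks it depends on)
dependencies = {
    "extract": [],
    "transform": ["extract"],
    "load": ["transform"],
    "wait_next": ["load"],
    "cleanup": ["wait_next"],
}

# static_order() yields each task only after all of its dependencies
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # → ['extract', 'transform', 'load', 'wait_next', 'cleanup']
```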

Development

Running Tests

To run the unit tests, navigate to the project root and execute:

pytest

Type Checking

To perform static type checking with MyPy:

mypy .

Project Structure

.highway/
├── workflow_dsl.py         # Core DSL definitions (Pydantic models)
├── example_usage.py        # Examples of how to use the DSL
├── tests/
│   ├── __init__.py
│   ├── conftest.py         # Pytest configuration
│   └── test_workflow_dsl.py # Unit and integration tests
├── pyproject.toml          # Project metadata and dependencies
├── README.md               # This file
└── SUMMARY.md              # Summary of changes and future instructions

Download files

Download the file for your platform.

Source Distribution

highway_dsl-0.0.1.tar.gz (8.8 kB)

Built Distribution

highway_dsl-0.0.1-py3-none-any.whl (6.5 kB)

File details

Details for the file highway_dsl-0.0.1.tar.gz.

File metadata

  • Download URL: highway_dsl-0.0.1.tar.gz
  • Size: 8.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.24

File hashes

Hashes for highway_dsl-0.0.1.tar.gz:

  • SHA256: 09e2b42745476aaa745315253e2de6c7fc6173f10d68d8f29d7b5a5010a6eb98
  • MD5: ea6144c39caa7a3e6b3a333fc9b7f5a1
  • BLAKE2b-256: 72773b925faaa1a3756f637de4a67b9b21e62d1a9bddf52506fc04fe3480ddc5

File details

Details for the file highway_dsl-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: highway_dsl-0.0.1-py3-none-any.whl
  • Size: 6.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.24

File hashes

Hashes for highway_dsl-0.0.1-py3-none-any.whl:

  • SHA256: 97608e6a653b9c741ce7877d09ed69e044f64ff0730da3ee571e12ee02f41c9f
  • MD5: 2cb28b2aed913beeebb3378b49ef8a77
  • BLAKE2b-256: 2c39c991ca37a467862e2160fbd1e730a2cd724e183806cd9f2f346e128e516b
