
A stable domain-specific language (DSL) for defining and managing data processing pipelines and workflows.


Highway DSL


Highway DSL is a Python-based domain-specific language for defining complex workflows in a clear, concise, and fluent manner. It is part of the larger Highway project, an advanced workflow engine capable of running complex DAG-based workflows.

Version 1.3.0 - Critical Fixes & Enhancements

This release includes critical bug fixes, validation improvements, and developer experience enhancements:

New Features & Fixes

1. Universal Result Storage

All operators can now store results in the workflow context (not just TaskOperator):

# Now works for all operator types
builder.wait("sleep_5s", wait_for=Duration.seconds(5), result_key="sleep_result")
builder.emit_event("notify", event_name="done", result_key="emit_result")
builder.wait_for_event("wait", event_name="start", result_key="event_data")

2. Callback Validation

Prevents runtime errors from typos in callback references:

builder.task("main", "func").on_success("handler_task")
workflow = builder.build()  # Now validates "handler_task" exists!

3. Duration Helper Class

Simplified time duration creation without datetime imports:

from highway_dsl import Duration

builder.wait("pause", wait_for=Duration.minutes(30))
builder.retry(delay=Duration.seconds(5))
builder.timeout(timeout=Duration.hours(1))

4. Workflow Metadata

Set workflow description and version programmatically:

builder = WorkflowBuilder("my_workflow", version="2.0.0")
builder.set_description("Production ETL pipeline")
builder.set_version("2.1.0")  # Update version later

Breaking Changes

  • Default workflow version changed from "1.1.0" to "1.3.0"
  • WaitOperator now serializes durations in ISO 8601 format (PT<seconds>S) instead of duration:<seconds>
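The new wire format follows the seconds-only subset of ISO 8601 durations. A minimal sketch of the encoding (an illustration of the format, not the library's internal code):

```python
from datetime import timedelta

def to_iso8601_seconds(delta: timedelta) -> str:
    """Encode a timedelta as an ISO 8601 duration in whole seconds,
    e.g. timedelta(minutes=5) -> "PT300S" (the new WaitOperator format)."""
    return f"PT{int(delta.total_seconds())}S"

def from_iso8601_seconds(text: str) -> timedelta:
    """Parse the seconds-only PT<seconds>S subset used above."""
    if not (text.startswith("PT") and text.endswith("S")):
        raise ValueError(f"not a PT<seconds>S duration: {text!r}")
    return timedelta(seconds=int(text[2:-1]))

print(to_iso8601_seconds(timedelta(minutes=5)))  # PT300S
```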

Version 1.1.0 - Feature Release

This release adds Airflow-parity features to enable production-grade workflows:

New Features

1. Scheduling Metadata (Airflow Parity)

Define cron-based schedules directly in your workflow:

builder = (
    WorkflowBuilder("daily_pipeline")
    .set_schedule("0 2 * * *")  # Run daily at 2 AM
    .set_start_date(datetime(2025, 1, 1))
    .set_catchup(False)
    .add_tags("production", "daily")
    .set_max_active_runs(1)
)
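Schedules use standard five-field cron syntax (minute, hour, day-of-month, month, day-of-week). A rough sketch of a shape check for such expressions (an illustration only, not the library's validator, which may accept more syntax):

```python
import re

# A cron field here is any run of digits, "*", ",", "/", or "-",
# which covers forms like "0", "*", "*/5", and "1-5".
CRON_FIELD = re.compile(r"^[\d*,/-]+$")

def looks_like_cron(expr: str) -> bool:
    """Check that expr has five whitespace-separated cron-like fields."""
    fields = expr.split()
    return len(fields) == 5 and all(CRON_FIELD.match(f) for f in fields)

print(looks_like_cron("0 2 * * *"))    # daily at 2 AM -> True
print(looks_like_cron("*/5 * * * *"))  # every 5 minutes -> True
print(looks_like_cron("0 2 * *"))      # only four fields -> False
```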

2. Event-Based Operators (Absurd Integration)

First-class support for event-driven workflows:

# Emit an event that other workflows can wait for
builder.emit_event(
    "notify_completion",
    event_name="pipeline_done",
    payload={"status": "success"}
)

# Wait for an external event
builder.wait_for_event(
    "wait_upstream",
    event_name="data_ready",
    timeout_seconds=3600
)
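Conceptually, wait_for_event blocks until another workflow emits the named event or the timeout elapses. A stdlib analogy of that behavior using threading.Event (an analogy only, not the runtime's implementation):

```python
import threading

# Stands in for the "data_ready" event channel between two workflows.
done = threading.Event()

def upstream():
    # ... produce data, then signal completion ...
    done.set()  # analogous to emit_event with event_name="data_ready"

threading.Thread(target=upstream).start()

# Analogous to wait_for_event(..., timeout_seconds=3600):
# returns True if the event fired, False if the timeout elapsed first.
arrived = done.wait(timeout=5.0)
print("data_ready" if arrived else "timed out")
```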

3. Callback Hooks (Production Workflows)

Durable success/failure handlers as first-class workflow nodes:

builder.task("risky_operation", "process.data")

builder.task("send_alert", "alerts.notify")
builder.on_failure("send_alert")  # Runs if risky_operation fails

builder.task("cleanup", "cleanup.resources")
builder.on_success("cleanup")  # Runs if risky_operation succeeds

4. Switch/Case Operator

Multi-branch routing with cleaner syntax than nested conditions:

builder.switch(
    "route_by_status",
    switch_on="{{data.status}}",
    cases={
        "approved": "approve_task",
        "rejected": "reject_task",
        "pending": "review_task"
    },
    default="unknown_handler"
)
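The routing semantics amount to a dictionary lookup with a fallback branch: the resolved switch_on value is matched exactly against the case keys, and the default is taken otherwise. A plain-Python sketch of this presumed behavior:

```python
def resolve_switch(value, cases, default):
    """Pick the next task id for a switch: exact match on a case key,
    otherwise the default branch."""
    return cases.get(value, default)

cases = {
    "approved": "approve_task",
    "rejected": "reject_task",
    "pending": "review_task",
}
print(resolve_switch("approved", cases, "unknown_handler"))  # approve_task
print(resolve_switch("on_hold", cases, "unknown_handler"))   # unknown_handler
```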

5. Task Descriptions

Document your workflow inline:

builder.task(
    "process",
    "data.transform",
    description="Transform raw data into analytics format"
)

6. Workflow-Level Default Retry Policy

Set a default retry policy for all tasks:

builder.set_default_retry_policy(
    RetryPolicy(max_retries=3, delay=timedelta(seconds=60))
)

See examples/scheduled_event_workflow.py for a comprehensive example using all new features.

RFC-Style Specification

For implementers and advanced users, Highway DSL v1.1.0 includes a comprehensive 3,215-line RFC-style specification (spec.txt) modeled after IETF RFCs (RFC 2119, RFC 8259). This authoritative document provides:

  • Complete operator specifications with execution semantics
  • Integration guidance for Absurd and other runtimes
  • Security considerations and best practices
  • Comprehensive examples for all features
  • Formal data model definitions

Access the specification at /dsl/spec.txt in the repository.

Architecture Diagram

graph TB
    subgraph "Highway DSL v1.3.0 Features"
        A[WorkflowBuilder<br/>Fluent API] --> B[Core Operators]
        A --> C[Scheduling]
        A --> D[Events]
        A --> E[Error Handling]

        B --> B1[Task]
        B --> B2[Condition]
        B --> B3[Parallel]
        B --> B4[ForEach]
        B --> B5[While]
        B --> B6[Wait]
        B --> B7[Switch]

        C --> C1[Cron Schedules]
        C --> C2[Start Date]
        C --> C3[Catchup]
        C --> C4[Tags]

        D --> D1[EmitEvent]
        D --> D2[WaitForEvent]

        E --> E1[RetryPolicy]
        E --> E2[TimeoutPolicy]
        E --> E3[Callbacks]
    end

    subgraph "Output Formats"
        F[YAML]
        G[JSON]
    end

    subgraph "Runtime Integration"
        H[Absurd Runtime]
        I[Airflow]
        J[Temporal]
        K[Custom Engines]
    end

    A --> F
    A --> G
    F --> H
    F --> I
    F --> J
    F --> K
    G --> H
    G --> I
    G --> J
    G --> K

    style A fill:#2563eb,stroke:#1e40af,color:#fff
    style B fill:#8b5cf6,stroke:#7c3aed,color:#fff
    style C fill:#10b981,stroke:#059669,color:#fff
    style D fill:#f59e0b,stroke:#d97706,color:#fff
    style E fill:#ef4444,stroke:#dc2626,color:#fff

Features

  • Fluent API: A powerful and intuitive WorkflowBuilder for defining workflows programmatically.
  • Pydantic-based: All models are built on Pydantic, providing robust data validation, serialization, and documentation.
  • Rich Operators: A comprehensive set of operators for handling various workflow scenarios:
    • Task - Basic workflow steps
    • Condition (if/else) - Conditional branching
    • Parallel - Execute multiple branches simultaneously
    • ForEach - Iterate over collections with proper dependency management
    • Wait - Pause execution for scheduled tasks
    • While - Execute loops based on conditions
    • NEW in v1.1: EmitEvent - Emit events for cross-workflow coordination
    • NEW in v1.1: WaitForEvent - Wait for external events with timeout
    • NEW in v1.1: Switch - Multi-branch routing (switch/case)
  • Scheduling: Built-in support for cron-based scheduling, start dates, and catchup configuration
  • Event-Driven: First-class support for event emission and waiting (Absurd integration)
  • Callback Hooks: Durable success/failure handlers as workflow nodes
  • YAML/JSON Interoperability: Workflows can be defined in Python and exported to YAML or JSON, and vice-versa.
  • Retry and Timeout Policies: Built-in error handling and execution time management.
  • Extensible: The DSL is designed to be extensible with custom operators and policies.

Installation

pip install highway-dsl

Quick Start

Here's a simple example of how to define a workflow using the WorkflowBuilder:

from datetime import timedelta
from highway_dsl import WorkflowBuilder

workflow = (
    WorkflowBuilder("simple_etl")
    .task("extract", "etl.extract_data", result_key="raw_data")
    .task(
        "transform",
        "etl.transform_data",
        args=["{{raw_data}}"],
        result_key="transformed_data",
    )
    .retry(max_retries=3, delay=timedelta(seconds=10))
    .task("load", "etl.load_data", args=["{{transformed_data}}"])
    .timeout(timeout=timedelta(minutes=30))
    .wait("wait_next", timedelta(hours=24))
    .task("cleanup", "etl.cleanup")
    .build()
)

print(workflow.to_yaml())

Real-World Example: E-Commerce Order Processing

from highway_dsl import WorkflowBuilder, RetryPolicy
from datetime import datetime, timedelta

# Production-ready e-commerce order workflow
workflow = (
    WorkflowBuilder("order_processing")
    .set_schedule("*/5 * * * *")  # Run every 5 minutes
    .set_start_date(datetime(2025, 1, 1))
    .add_tags("production", "orders", "critical")
    .set_default_retry_policy(RetryPolicy(max_retries=3, delay=timedelta(seconds=30)))

    # Fetch pending orders
    .task("fetch_orders", "orders.get_pending", result_key="orders")

    # Process each order
    .foreach(
        "process_each_order",
        items="{{orders}}",
        loop_body=lambda b: (
            b.task("validate", "orders.validate", args=["{{item}}"])
            .task("charge_payment", "payments.charge", args=["{{item}}"],
                  result_key="payment_result")
            .task("send_failure_email", "email.send_failure",
                  args=["{{item.customer_email}}"])
            .on_failure("send_failure_email")  # Alert on payment failure
            .switch(
                "route_by_amount",
                switch_on="{{item.price_tier}}",  # a tier computed from item.total
                cases={
                    "high": "priority_shipping",    # > $500
                    "medium": "standard_shipping",  # $100-$500
                    "low": "economy_shipping"       # < $100
                },
                default="standard_shipping"
            )
        )
    )

    # Emit completion event for analytics workflow
    .emit_event(
        "notify_analytics",
        event_name="orders_processed_{{ds}}",
        payload={"count": "{{orders.length}}", "timestamp": "{{run.started_at}}"}
    )

    .build()
)

This workflow demonstrates:

  • Scheduled execution every 5 minutes
  • Default retry policy for all tasks
  • ForEach loop processing multiple orders
  • Payment failure callbacks
  • Switch/case routing based on order amount
  • Event emission for cross-workflow coordination

Mermaid Diagram Generation

You can generate a Mermaid state diagram of your workflow using the to_mermaid method:

print(workflow.to_mermaid())

This will output a Mermaid diagram in the stateDiagram-v2 format, which can be used with a variety of tools to visualize your workflow.

Bank ETL Example

A more complex example of a bank's end-of-day ETL process can be found in examples/bank_end_of_the_day_etl_workflow.py.

A Mermaid diagram of this workflow is included in the repository.

Advanced Usage

Conditional Logic

from highway_dsl import WorkflowBuilder, RetryPolicy
from datetime import timedelta

builder = WorkflowBuilder("data_processing_pipeline")

builder.task("start", "workflows.tasks.initialize", result_key="init_data")
builder.task(
    "validate",
    "workflows.tasks.validate_data",
    args=["{{init_data}}"],
    result_key="validated_data",
)

builder.condition(
    "check_quality",
    condition="{{validated_data.quality_score}} > 0.8",
    if_true=lambda b: b.task(
        "high_quality_processing",
        "workflows.tasks.advanced_processing",
        args=["{{validated_data}}"],
        retry_policy=RetryPolicy(max_retries=5, delay=timedelta(seconds=10), backoff_factor=2.0),
    ),
    if_false=lambda b: b.task(
        "standard_processing",
        "workflows.tasks.basic_processing",
        args=["{{validated_data}}"],
    ),
)

workflow = builder.build()
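Expressions such as "{{validated_data.quality_score}} > 0.8" are resolved against the workflow context at runtime. A minimal sketch of dotted-path substitution (an assumption about the mechanism for illustration, not the engine's code):

```python
import re

def resolve(template: str, context: dict) -> str:
    """Replace each {{a.b.c}} placeholder with the value found by
    walking that dotted path through the context dictionary."""
    def lookup(match):
        value = context
        for part in match.group(1).split("."):
            value = value[part]
        return str(value)
    return re.sub(r"\{\{([\w.]+)\}\}", lookup, template)

context = {"validated_data": {"quality_score": 0.91}}
print(resolve("{{validated_data.quality_score}} > 0.8", context))  # 0.91 > 0.8
```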

While Loops

from highway_dsl import WorkflowBuilder

builder = WorkflowBuilder("qa_rework_workflow")

builder.task("start_qa", "workflows.tasks.start_qa", result_key="qa_results")

builder.while_loop(
    "qa_rework_loop",
    condition="{{qa_results.status}} == 'failed'",
    loop_body=lambda b: b.task("perform_rework", "workflows.tasks.perform_rework").task(
        "re_run_qa", "workflows.tasks.run_qa", result_key="qa_results"
    ),
)

builder.task("finalize_product", "workflows.tasks.finalize_product", dependencies=["qa_rework_loop"])

workflow = builder.build()
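The loop semantics mirror an ordinary while loop: the body re-runs, refreshing qa_results each pass, until the condition stops holding. A plain-Python analogue of the workflow above (the task functions are hypothetical stand-ins):

```python
# Stand-ins for workflows.tasks.perform_rework and workflows.tasks.run_qa.
def perform_rework(results):
    results["attempts"] += 1

def run_qa(results):
    # Pretend QA passes after two rework rounds.
    status = "passed" if results["attempts"] >= 2 else "failed"
    return {"status": status, "attempts": results["attempts"]}

qa_results = {"status": "failed", "attempts": 0}

# Analogous to while_loop(condition="{{qa_results.status}} == 'failed'", ...)
while qa_results["status"] == "failed":
    perform_rework(qa_results)
    qa_results = run_qa(qa_results)

print(qa_results["status"])  # passed
```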

For-Each Loops with Proper Dependency Management

This fixes a bug where foreach loops incorrectly inherited dependencies from containing parallel operators:

# This loop now properly encapsulates its internal tasks
builder.foreach(
    "process_items",
    items="{{data.items}}",
    loop_body=lambda fb: fb.task("process_item", "processor.handle_item", args=["{{item.id}}"])
    # Loop body tasks only have proper dependencies, not unwanted "grandparent" dependencies
)

Retry Policies

from highway_dsl import RetryPolicy
from datetime import timedelta

builder.task(
    "reliable_task",
    "service.operation",
    retry_policy=RetryPolicy(
        max_retries=5,
        delay=timedelta(seconds=10),
        backoff_factor=2.0
    )
)
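With backoff_factor=2.0, a runtime would typically scale the delay exponentially per attempt. A sketch of the presumed schedule (assumed semantics: delay * backoff_factor**attempt; the engine's exact formula may differ):

```python
def retry_delays(max_retries: int, delay_seconds: float, backoff_factor: float):
    """Delay in seconds before each retry attempt, growing geometrically."""
    return [delay_seconds * backoff_factor ** attempt
            for attempt in range(max_retries)]

# For the policy above: max_retries=5, delay=10s, backoff_factor=2.0
print(retry_delays(5, 10, 2.0))  # [10.0, 20.0, 40.0, 80.0, 160.0]
```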

Timeout Policies

from highway_dsl import TimeoutPolicy
from datetime import timedelta

builder.task(
    "timed_task",
    "service.operation",
    timeout_policy=TimeoutPolicy(
        timeout=timedelta(hours=1),
        kill_on_timeout=True
    )
)

Version History

Version 1.3.0 - Critical Fixes & Enhancements (Current)

Critical Fixes:

  • Universal result_key support for all operators
  • Callback validation in build() prevents typos
  • ISO 8601 duration format for WaitOperator

Developer Experience:

  • Duration helper class for time durations
  • Workflow metadata setters (description, version)
  • Improved error messages

Version 1.1.0 - Feature Release

Airflow-Parity Features:

  • Scheduling metadata (cron, start_date, catchup, tags, max_active_runs)
  • Workflow-level default retry policy

Event-Driven Features:

  • EmitEventOperator for cross-workflow coordination
  • WaitForEventOperator with timeout support

Production Features:

  • Durable callback hooks (on_success, on_failure)
  • SwitchOperator for multi-branch routing
  • Task descriptions for documentation
  • RFC-style specification document (3,215 lines)

Version 1.0.3 - Stable Release

This is a stable release with important bug fixes and enhancements, including a critical fix for the ForEach operator dependency management issue.

Development

To set up the development environment:

git clone https://github.com/your-username/highway.git
cd highway/dsl
python -m venv .venv
source .venv/bin/activate
pip install -e .[dev]

Running Tests

pytest

Type Checking

mypy .

Documentation

  • README.md (this file) - Getting started and examples
  • spec.txt - RFC-style formal specification (3,215 lines)
  • examples/ - Comprehensive workflow examples

License

MIT License
