
A stable domain-specific language (DSL) for defining and managing data processing pipelines and workflow engines.


Highway DSL


Highway DSL is a Python-based domain-specific language for defining complex workflows in a clear, concise, and fluent manner. It is part of the larger Highway project, an advanced workflow engine capable of running complex DAG-based workflows.

Version 1.1.0 - Feature Release

This feature release adds Airflow-parity features to enable production-grade workflows:

New Features

1. Scheduling Metadata (Airflow Parity)

Define cron-based schedules directly in your workflow:

builder = (
    WorkflowBuilder("daily_pipeline")
    .set_schedule("0 2 * * *")  # Run daily at 2 AM
    .set_start_date(datetime(2025, 1, 1))
    .set_catchup(False)
    .add_tags("production", "daily")
    .set_max_active_runs(1)
)

2. Event-Based Operators (Absurd Integration)

First-class support for event-driven workflows:

# Emit an event that other workflows can wait for
builder.emit_event(
    "notify_completion",
    event_name="pipeline_done",
    payload={"status": "success"}
)

# Wait for an external event
builder.wait_for_event(
    "wait_upstream",
    event_name="data_ready",
    timeout_seconds=3600
)
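A toy in-memory sketch of these emit/wait semantics (illustrative only; a real runtime such as Absurd persists events durably and suspends the waiting workflow rather than polling):

```python
import time

class ToyEventBus:
    """Illustrative in-memory event bus; real runtimes persist events durably."""

    def __init__(self):
        self._events = {}  # event_name -> payload

    def emit(self, event_name, payload=None):
        self._events[event_name] = payload

    def wait_for(self, event_name, timeout_seconds, poll_interval=0.01):
        """Poll until the event arrives or the timeout elapses."""
        deadline = time.monotonic() + timeout_seconds
        while time.monotonic() < deadline:
            if event_name in self._events:
                return self._events[event_name]
            time.sleep(poll_interval)
        raise TimeoutError(f"no '{event_name}' event within {timeout_seconds}s")

bus = ToyEventBus()
bus.emit("pipeline_done", {"status": "success"})
print(bus.wait_for("pipeline_done", timeout_seconds=1))  # {'status': 'success'}
```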

3. Callback Hooks (Production Workflows)

Durable success/failure handlers as first-class workflow nodes:

builder.task("risky_operation", "process.data")

builder.task("send_alert", "alerts.notify")
builder.on_failure("send_alert")  # Runs if risky_operation fails

builder.task("cleanup", "cleanup.resources")
builder.on_success("cleanup")  # Runs if risky_operation succeeds
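A hedged sketch of how an engine might dispatch these hooks (not Highway's actual implementation): execute the task, then route to the success or failure node depending on the outcome.

```python
def run_with_callbacks(task_fn, on_success=None, on_failure=None):
    """Run task_fn, then dispatch to the matching callback node (sketch)."""
    try:
        result = task_fn()
    except Exception as exc:
        if on_failure:
            on_failure(exc)   # e.g. the "send_alert" node
        raise
    if on_success:
        on_success(result)    # e.g. the "cleanup" node
    return result

run_with_callbacks(
    lambda: "processed",
    on_success=lambda r: print(f"cleanup after {r!r}"),
    on_failure=lambda e: print(f"alert: {e}"),
)
```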

4. Switch/Case Operator

Multi-branch routing with cleaner syntax than nested conditions:

builder.switch(
    "route_by_status",
    switch_on="{{data.status}}",
    cases={
        "approved": "approve_task",
        "rejected": "reject_task",
        "pending": "review_task"
    },
    default="unknown_handler"
)
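At execution time the rendered `switch_on` value is matched against the case keys, falling back to `default` when nothing matches. A sketch of the assumed resolution semantics:

```python
def resolve_switch(value, cases, default=None):
    """Pick the next task id for a switch node (illustrative sketch)."""
    return cases.get(str(value), default)

cases = {
    "approved": "approve_task",
    "rejected": "reject_task",
    "pending": "review_task",
}
print(resolve_switch("approved", cases, default="unknown_handler"))  # approve_task
print(resolve_switch("archived", cases, default="unknown_handler"))  # unknown_handler
```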

5. Task Descriptions

Document your workflow inline:

builder.task(
    "process",
    "data.transform",
    description="Transform raw data into analytics format"
)

6. Workflow-Level Default Retry Policy

Set a default retry policy for all tasks:

builder.set_default_retry_policy(
    RetryPolicy(max_retries=3, delay=timedelta(seconds=60))
)

See examples/scheduled_event_workflow.py for a comprehensive example using all new features.

RFC-Style Specification

For implementers and advanced users, Highway DSL v1.1.0 includes a comprehensive 3,215-line RFC-style specification (spec.txt) modeled after IETF RFCs (RFC 2119, RFC 8259). This authoritative document provides:

  • Complete operator specifications with execution semantics
  • Integration guidance for Absurd and other runtimes
  • Security considerations and best practices
  • Comprehensive examples for all features
  • Formal data model definitions

Access the specification at /dsl/spec.txt in the repository.

Architecture Diagram

graph TB
    subgraph "Highway DSL v1.1.0 Features"
        A[WorkflowBuilder<br/>Fluent API] --> B[Core Operators]
        A --> C[Scheduling]
        A --> D[Events]
        A --> E[Error Handling]

        B --> B1[Task]
        B --> B2[Condition]
        B --> B3[Parallel]
        B --> B4[ForEach]
        B --> B5[While]
        B --> B6[Wait]
        B --> B7[Switch]

        C --> C1[Cron Schedules]
        C --> C2[Start Date]
        C --> C3[Catchup]
        C --> C4[Tags]

        D --> D1[EmitEvent]
        D --> D2[WaitForEvent]

        E --> E1[RetryPolicy]
        E --> E2[TimeoutPolicy]
        E --> E3[Callbacks]
    end

    subgraph "Output Formats"
        F[YAML]
        G[JSON]
    end

    subgraph "Runtime Integration"
        H[Absurd Runtime]
        I[Airflow]
        J[Temporal]
        K[Custom Engines]
    end

    A --> F
    A --> G
    F --> H
    F --> I
    F --> J
    F --> K
    G --> H
    G --> I
    G --> J
    G --> K

    style A fill:#2563eb,stroke:#1e40af,color:#fff
    style B fill:#8b5cf6,stroke:#7c3aed,color:#fff
    style C fill:#10b981,stroke:#059669,color:#fff
    style D fill:#f59e0b,stroke:#d97706,color:#fff
    style E fill:#ef4444,stroke:#dc2626,color:#fff

Features

  • Fluent API: A powerful and intuitive WorkflowBuilder for defining workflows programmatically.
  • Pydantic-based: All models are built on Pydantic, providing robust data validation, serialization, and documentation.
  • Rich Operators: A comprehensive set of operators for handling various workflow scenarios:
    • Task - Basic workflow steps
    • Condition (if/else) - Conditional branching
    • Parallel - Execute multiple branches simultaneously
    • ForEach - Iterate over collections with proper dependency management
    • Wait - Pause execution for scheduled tasks
    • While - Execute loops based on conditions
    • NEW in v1.1: EmitEvent - Emit events for cross-workflow coordination
    • NEW in v1.1: WaitForEvent - Wait for external events with timeout
    • NEW in v1.1: Switch - Multi-branch routing (switch/case)
  • Scheduling: Built-in support for cron-based scheduling, start dates, and catchup configuration
  • Event-Driven: First-class support for event emission and waiting (Absurd integration)
  • Callback Hooks: Durable success/failure handlers as workflow nodes
  • YAML/JSON Interoperability: Workflows can be defined in Python and exported to YAML or JSON, and vice-versa.
  • Retry and Timeout Policies: Built-in error handling and execution time management.
  • Extensible: The DSL is designed to be extensible with custom operators and policies.

Installation

pip install highway-dsl

Quick Start

Here's a simple example of how to define a workflow using the WorkflowBuilder:

from datetime import timedelta
from highway_dsl import WorkflowBuilder

workflow = (
    WorkflowBuilder("simple_etl")
    .task("extract", "etl.extract_data", result_key="raw_data")
    .task(
        "transform",
        "etl.transform_data",
        args=["{{raw_data}}"],
        result_key="transformed_data",
    )
    .retry(max_retries=3, delay=timedelta(seconds=10))
    .task("load", "etl.load_data", args=["{{transformed_data}}"])
    .timeout(timeout=timedelta(minutes=30))
    .wait("wait_next", timedelta(hours=24))
    .task("cleanup", "etl.cleanup")
    .build()
)

print(workflow.to_yaml())
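The `{{raw_data}}` placeholders above are rendered from earlier `result_key` values at runtime. A minimal sketch of that substitution for simple keys (Highway's actual template engine may support richer expressions such as dotted paths):

```python
import re

def render(template, context):
    """Replace {{key}} placeholders with values from the results context."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: str(context[m.group(1)]),
        template,
    )

context = {"raw_data": "rows.csv", "transformed_data": "rows.parquet"}
print(render("{{transformed_data}}", context))  # rows.parquet
```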

Real-World Example: E-Commerce Order Processing

from highway_dsl import WorkflowBuilder, RetryPolicy
from datetime import datetime, timedelta

# Production-ready e-commerce order workflow
workflow = (
    WorkflowBuilder("order_processing")
    .set_schedule("*/5 * * * *")  # Run every 5 minutes
    .set_start_date(datetime(2025, 1, 1))
    .add_tags("production", "orders", "critical")
    .set_default_retry_policy(RetryPolicy(max_retries=3, delay=timedelta(seconds=30)))

    # Fetch pending orders
    .task("fetch_orders", "orders.get_pending", result_key="orders")

    # Process each order
    .foreach(
        "process_each_order",
        items="{{orders}}",
        loop_body=lambda b: (
            b.task("validate", "orders.validate", args=["{{item}}"])
            .task("charge_payment", "payments.charge", args=["{{item}}"],
                  result_key="payment_result")
            .task("send_failure_email", "email.send_failure",
                  args=["{{item.customer_email}}"])
            .on_failure("send_failure_email")  # Alert on payment failure
            .switch(
                "route_by_amount",
                switch_on="{{item.tier}}",  # tier ('high'/'medium'/'low') assumed bucketed upstream from item.total
                cases={
                    "high": "priority_shipping",  # > $500
                    "medium": "standard_shipping",  # $100-500
                    "low": "economy_shipping"  # < $100
                },
                default="standard_shipping"
            )
        )
    )

    # Emit completion event for analytics workflow
    .emit_event(
        "notify_analytics",
        event_name="orders_processed_{{ds}}",
        payload={"count": "{{orders.length}}", "timestamp": "{{run.started_at}}"}
    )

    .build()
)

This workflow demonstrates:

  • Scheduled execution every 5 minutes
  • Default retry policy for all tasks
  • ForEach loop processing multiple orders
  • Payment failure callbacks
  • Switch/case routing based on order amount
  • Event emission for cross-workflow coordination
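The `*/5 * * * *` schedule above fires on every fifth minute. A toy matcher for just the minute field (not a full cron parser) makes the semantics concrete:

```python
def minute_matches(field, minute):
    """Check one cron minute field: '*', '*/n', or a comma list (sketch)."""
    if field == "*":
        return True
    if field.startswith("*/"):
        return minute % int(field[2:]) == 0
    return minute in {int(part) for part in field.split(",")}

print([m for m in range(60) if minute_matches("*/5", m)][:4])  # [0, 5, 10, 15]
```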

Mermaid Diagram Generation

You can generate a Mermaid state diagram of your workflow using the to_mermaid method:

print(workflow.to_mermaid())

This will output a Mermaid diagram in the stateDiagram-v2 format, which can be used with a variety of tools to visualize your workflow.

Bank ETL Example

A more complex example of a bank's end-of-day ETL process can be found in examples/bank_end_of_the_day_etl_workflow.py.

A Mermaid diagram of this workflow accompanies the example in the repository.

Advanced Usage

Conditional Logic

from highway_dsl import WorkflowBuilder, RetryPolicy
from datetime import timedelta

builder = WorkflowBuilder("data_processing_pipeline")

builder.task("start", "workflows.tasks.initialize", result_key="init_data")
builder.task(
    "validate",
    "workflows.tasks.validate_data",
    args=["{{init_data}}"],
    result_key="validated_data",
)

builder.condition(
    "check_quality",
    condition="{{validated_data.quality_score}} > 0.8",
    if_true=lambda b: b.task(
        "high_quality_processing",
        "workflows.tasks.advanced_processing",
        args=["{{validated_data}}"],
        retry_policy=RetryPolicy(max_retries=5, delay=timedelta(seconds=10), backoff_factor=2.0),
    ),
    if_false=lambda b: b.task(
        "standard_processing",
        "workflows.tasks.basic_processing",
        args=["{{validated_data}}"],
    ),
)

workflow = builder.build()
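Conceptually, the condition string resolves a dotted path against prior results and then compares. A hedged sketch of those semantics (Highway's real expression language may differ):

```python
def lookup(context, dotted):
    """Resolve a dotted path like 'validated_data.quality_score' in nested dicts."""
    value = context
    for part in dotted.split("."):
        value = value[part]
    return value

context = {"validated_data": {"quality_score": 0.92}}
branch = "if_true" if lookup(context, "validated_data.quality_score") > 0.8 else "if_false"
print(branch)  # if_true
```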

While Loops

from highway_dsl import WorkflowBuilder

builder = WorkflowBuilder("qa_rework_workflow")

builder.task("start_qa", "workflows.tasks.start_qa", result_key="qa_results")

builder.while_loop(
    "qa_rework_loop",
    condition="{{qa_results.status}} == 'failed'",
    loop_body=lambda b: b.task("perform_rework", "workflows.tasks.perform_rework").task(
        "re_run_qa", "workflows.tasks.run_qa", result_key="qa_results"
    ),
)

builder.task("finalize_product", "workflows.tasks.finalize_product", dependencies=["qa_rework_loop"])

workflow = builder.build()

For-Each Loops with Proper Dependency Management

A bug where foreach loops incorrectly inherited dependencies from a containing parallel operator has been fixed; loop bodies are now properly encapsulated:

# This loop now properly encapsulates its internal tasks
builder.foreach(
    "process_items",
    items="{{data.items}}",
    loop_body=lambda fb: fb.task("process_item", "processor.handle_item", args=["{{item.id}}"])
    # Loop body tasks only have proper dependencies, not unwanted "grandparent" dependencies
)

Retry Policies

from highway_dsl import RetryPolicy
from datetime import timedelta

builder.task(
    "reliable_task",
    "service.operation",
    retry_policy=RetryPolicy(
        max_retries=5,
        delay=timedelta(seconds=10),
        backoff_factor=2.0
    )
)
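With these settings, a runtime applying exponential backoff would wait roughly `delay * backoff_factor ** attempt` between tries. A sketch of the resulting schedule (assumed semantics; the engine's exact formula may differ):

```python
from datetime import timedelta

def backoff_schedule(max_retries, delay, backoff_factor):
    """Delays before each retry under exponential backoff (illustrative)."""
    return [delay * backoff_factor ** attempt for attempt in range(max_retries)]

for wait in backoff_schedule(5, timedelta(seconds=10), 2.0):
    print(wait.total_seconds())  # 10.0, 20.0, 40.0, 80.0, 160.0
```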

Timeout Policies

from highway_dsl import TimeoutPolicy
from datetime import timedelta

builder.task(
    "timed_task",
    "service.operation",
    timeout_policy=TimeoutPolicy(
        timeout=timedelta(hours=1),
        kill_on_timeout=True
    )
)
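One way an engine might enforce such a policy is to run the task under a wall-clock deadline and abandon it on expiry. A thread-based sketch using only the standard library (not Highway's mechanism; note that a running Python thread cannot truly be killed, so `kill_on_timeout` would need process-level enforcement):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def run_with_timeout(fn, timeout_seconds):
    """Run fn, raising TimeoutError if it exceeds the wall-clock budget (sketch)."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(fn)
        try:
            return future.result(timeout=timeout_seconds)
        except FutureTimeout:
            future.cancel()  # best effort; a running thread cannot be killed
            raise TimeoutError("task exceeded its timeout policy")

print(run_with_timeout(lambda: "done", timeout_seconds=1.0))  # done
```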

Version History

Version 1.1.0 - Feature Release (Current)

Airflow-Parity Features:

  • Scheduling metadata (cron, start_date, catchup, tags, max_active_runs)
  • Workflow-level default retry policy

Event-Driven Features:

  • EmitEventOperator for cross-workflow coordination
  • WaitForEventOperator with timeout support

Production Features:

  • Durable callback hooks (on_success, on_failure)
  • SwitchOperator for multi-branch routing
  • Task descriptions for documentation
  • RFC-style specification document (3,215 lines)

Version 1.0.3 - Stable Release

This is a stable release with important bug fixes and enhancements, including a critical fix for the ForEach operator dependency management issue.

Development

To set up the development environment:

git clone https://github.com/your-username/highway.git
cd highway/dsl
python -m venv .venv
source .venv/bin/activate
pip install -e .[dev]

Running Tests

pytest

Type Checking

mypy .

Documentation

  • README.md (this file) - Getting started and examples
  • spec.txt - RFC-style formal specification (3,215 lines)
  • examples/ - Comprehensive workflow examples

License

MIT License

