A stable domain-specific language (DSL) for defining and managing data processing pipelines and workflow engines.
Highway DSL
Highway DSL is a Python-based domain-specific language for defining complex workflows in a clear, concise, and fluent manner. It is part of the larger Highway project, an advanced workflow engine capable of running complex DAG-based workflows.
Version 1.3.0 - Critical Fixes & Enhancements
This release includes critical bug fixes, validation improvements, and developer experience enhancements:
New Features & Fixes
1. Universal Result Storage
All operators can now store results in the workflow context (not just TaskOperator):
from highway_dsl import WorkflowBuilder, Duration
builder = WorkflowBuilder("result_storage_demo")
# Now works for all operator types
builder.wait("sleep_5s", wait_for=Duration.seconds(5), result_key="sleep_result")
builder.emit_event("notify", event_name="done", result_key="emit_result")
builder.wait_for_event("wait", event_name="start", result_key="event_data")
2. Callback Validation
Prevents runtime errors from typos in callback references:
builder.task("main", "func").on_success("handler_task")
workflow = builder.build() # Now validates "handler_task" exists!
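The check that build() performs can be pictured as a set-membership test over task ids. The sketch below is a minimal, library-independent illustration, not Highway's code: TaskSpec and validate_callbacks are hypothetical names, and the real builder may report errors with a different exception type or message.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskSpec:
    # Hypothetical minimal task record; Highway's real models are Pydantic-based.
    task_id: str
    on_success: Optional[str] = None
    on_failure: Optional[str] = None


def validate_callbacks(tasks: list[TaskSpec]) -> None:
    """Raise ValueError if any callback points at a task id that does not exist."""
    known = {t.task_id for t in tasks}
    errors = []
    for t in tasks:
        for hook in ("on_success", "on_failure"):
            target = getattr(t, hook)
            if target is not None and target not in known:
                errors.append(f"{t.task_id}.{hook} -> unknown task {target!r}")
    if errors:
        raise ValueError("; ".join(errors))


tasks = [TaskSpec("main", on_success="handler_tsk")]  # deliberate typo
try:
    validate_callbacks(tasks)
except ValueError as exc:
    print(exc)  # main.on_success -> unknown task 'handler_tsk'
```

Collecting every bad reference before raising lets a single build report all typos at once rather than one per run.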
3. Duration Helper Class
Create time durations without importing datetime:
from highway_dsl import Duration
builder.wait("pause", wait_for=Duration.minutes(30))
builder.retry(delay=Duration.seconds(5))
builder.timeout(timeout=Duration.hours(1))
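Because this README passes both Duration values and plain timedelta objects to parameters such as delay, a Duration-style helper plausibly reduces to factories that return datetime.timedelta. The class below is a hypothetical stand-in (DurationSketch) under that assumption, not the library's implementation.

```python
from datetime import timedelta


class DurationSketch:
    """Hypothetical stand-in for a Duration helper: each factory returns a timedelta."""

    @staticmethod
    def seconds(n: float) -> timedelta:
        return timedelta(seconds=n)

    @staticmethod
    def minutes(n: float) -> timedelta:
        return timedelta(minutes=n)

    @staticmethod
    def hours(n: float) -> timedelta:
        return timedelta(hours=n)


print(DurationSketch.minutes(30).total_seconds())  # 1800.0
```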
4. Workflow Metadata
Set workflow description and version programmatically:
builder = WorkflowBuilder("my_workflow", version="2.0.0")
builder.set_description("Production ETL pipeline")
builder.set_version("2.1.0") # Update version later
Breaking Changes
- Default workflow version changed from "1.1.0" to "1.3.0"
- WaitOperator now serializes durations in ISO 8601 format (PT<seconds>S) instead of duration:<seconds>
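Under the new form, a five-minute wait would be written PT300S. The sketch below serializes and parses only this PT<seconds>S subset of ISO 8601; the helper names are hypothetical, and the way Highway formats fractional seconds is an assumption.

```python
import re
from datetime import timedelta

_PT_SECONDS = re.compile(r"^PT(\d+(?:\.\d+)?)S$")


def to_pt_seconds(d: timedelta) -> str:
    """Serialize a non-negative timedelta as an ISO 8601 duration PT<seconds>S."""
    # Six decimals cover timedelta's microsecond resolution; trailing zeros are trimmed.
    text = f"{d.total_seconds():f}".rstrip("0").rstrip(".")
    return f"PT{text}S"


def from_pt_seconds(value: str) -> timedelta:
    """Parse the PT<seconds>S subset only; other ISO 8601 forms (e.g. PT5M) are rejected."""
    match = _PT_SECONDS.match(value)
    if match is None:
        raise ValueError(f"unsupported duration: {value!r}")
    return timedelta(seconds=float(match.group(1)))


print(to_pt_seconds(timedelta(minutes=5)))  # PT300S
```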
Version 1.1.0 - Feature Release
This major feature release adds Airflow-parity features to enable production-grade workflows:
New Features
1. Scheduling Metadata (Airflow Parity)
Define cron-based schedules directly in your workflow:
from datetime import datetime
from highway_dsl import WorkflowBuilder
builder = (
WorkflowBuilder("daily_pipeline")
.set_schedule("0 2 * * *") # Run daily at 2 AM
.set_start_date(datetime(2025, 1, 1))
.set_catchup(False)
.add_tags("production", "daily")
.set_max_active_runs(1)
)
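The catchup flag borrows Airflow's meaning: with catchup enabled, every schedule interval missed since the start date gets a run; with it disabled, only the latest one does. The sketch below illustrates that idea with a fixed interval instead of a cron expression; due_runs is a hypothetical helper, and how Highway's runtime actually interprets set_catchup is not specified in this README.

```python
from datetime import datetime, timedelta


def due_runs(start: datetime, now: datetime, every: timedelta, catchup: bool) -> list[datetime]:
    """Return run times due between start and now for a fixed interval.

    Simplification: a real scheduler would expand a cron expression such as
    "0 2 * * *" instead of stepping by a fixed timedelta.
    """
    runs = []
    t = start
    while t <= now:
        runs.append(t)
        t += every
    # Without catchup, missed runs are skipped and only the most recent one is kept.
    return runs if catchup else runs[-1:]


start = datetime(2025, 1, 1, 2, 0)
now = datetime(2025, 1, 4, 12, 0)
print(len(due_runs(start, now, timedelta(days=1), catchup=True)))  # 4
print(due_runs(start, now, timedelta(days=1), catchup=False))  # [datetime.datetime(2025, 1, 4, 2, 0)]
```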
2. Event-Based Operators (Absurd Integration)
First-class support for event-driven workflows:
# Emit an event that other workflows can wait for
builder.emit_event(
"notify_completion",
event_name="pipeline_done",
payload={"status": "success"}
)
# Wait for an external event
builder.wait_for_event(
"wait_upstream",
event_name="data_ready",
timeout_seconds=3600
)
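Emit/wait semantics can be illustrated with an in-memory store guarded by a threading.Condition. This is a single-process sketch under stated assumptions (events are kept after emission so a late waiter still sees them, and a missed timeout raises TimeoutError); a durable runtime such as Absurd would persist events rather than hold them in a dict. InMemoryEventBus is a hypothetical name.

```python
import threading
from typing import Any


class InMemoryEventBus:
    """Hypothetical single-process event store illustrating emit/wait semantics."""

    def __init__(self) -> None:
        self._events: dict[str, Any] = {}
        self._cond = threading.Condition()

    def emit(self, event_name: str, payload: Any = None) -> None:
        with self._cond:
            self._events[event_name] = payload  # kept, so late waiters still see it
            self._cond.notify_all()

    def wait_for(self, event_name: str, timeout_seconds: float) -> Any:
        with self._cond:
            # Condition.wait_for re-checks the predicate on each wakeup; False means timeout.
            ok = self._cond.wait_for(lambda: event_name in self._events, timeout=timeout_seconds)
            if not ok:
                raise TimeoutError(f"event {event_name!r} not received within {timeout_seconds}s")
            return self._events[event_name]


bus = InMemoryEventBus()
threading.Timer(0.1, bus.emit, args=("data_ready", {"rows": 42})).start()
print(bus.wait_for("data_ready", timeout_seconds=5))  # {'rows': 42}
```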
3. Callback Hooks (Production Workflows)
Durable success/failure handlers as first-class workflow nodes:
builder.task("risky_operation", "process.data")
builder.task("send_alert", "alerts.notify")
builder.on_failure("send_alert") # Runs if risky_operation fails
builder.task("cleanup", "cleanup.resources")
builder.on_success("cleanup") # Runs if risky_operation succeeds
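The routing that on_success and on_failure describe amounts to: run the main step, then run exactly one handler depending on the outcome. The function below is a hypothetical sketch of that control flow; whether Highway re-raises the original failure after the handler runs is an assumption here.

```python
from typing import Any, Callable, Optional


def run_with_callbacks(
    main: Callable[[], Any],
    on_success: Optional[Callable[[Any], None]] = None,
    on_failure: Optional[Callable[[Exception], None]] = None,
) -> Any:
    """Run main, then exactly one handler depending on the outcome."""
    try:
        result = main()
    except Exception as exc:
        if on_failure is not None:
            on_failure(exc)
        raise  # assumption: the step is still reported as failed after the handler runs
    # Called outside the try block, so an error in on_success is not routed to on_failure.
    if on_success is not None:
        on_success(result)
    return result


alerts: list[str] = []


def risky_operation() -> None:
    raise RuntimeError("disk full")


try:
    run_with_callbacks(risky_operation, on_failure=lambda exc: alerts.append(f"alert: {exc}"))
except RuntimeError:
    pass
print(alerts)  # ['alert: disk full']
```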
4. Switch/Case Operator
Multi-branch routing with cleaner syntax than nested conditions:
builder.switch(
"route_by_status",
switch_on="{{data.status}}",
cases={
"approved": "approve_task",
"rejected": "reject_task",
"pending": "review_task"
},
default="unknown_handler"
)
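Switch resolution reduces to a dictionary lookup with a fallback. The sketch assumes the rendered switch_on value is compared as a string against the case keys; resolve_switch is a hypothetical helper, not part of highway_dsl.

```python
from collections.abc import Mapping
from typing import Optional


def resolve_switch(value: object, cases: Mapping[str, str], default: Optional[str] = None) -> str:
    """Return the task id for the matching case, else the default branch."""
    key = str(value)  # assumption: the rendered value is matched as a string
    if key in cases:
        return cases[key]
    if default is None:
        raise KeyError(f"no case matches {key!r} and no default is set")
    return default


cases = {"approved": "approve_task", "rejected": "reject_task", "pending": "review_task"}
print(resolve_switch("rejected", cases, default="unknown_handler"))  # reject_task
print(resolve_switch("on_hold", cases, default="unknown_handler"))  # unknown_handler
```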
5. Task Descriptions
Document your workflow inline:
builder.task(
"process",
"data.transform",
description="Transform raw data into analytics format"
)
6. Workflow-Level Default Retry Policy
Set a default retry policy for all tasks:
from datetime import timedelta
from highway_dsl import RetryPolicy
builder.set_default_retry_policy(
RetryPolicy(max_retries=3, delay=timedelta(seconds=60))
)
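A workflow-level default matters mainly through precedence. The sketch assumes the usual rule that a task's own retry_policy overrides the default; RetrySketch and effective_policy are hypothetical names carrying only the fields used in this README.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass(frozen=True)
class RetrySketch:
    """Hypothetical stand-in for RetryPolicy with only max_retries and delay."""
    max_retries: int
    delay: timedelta


def effective_policy(task_policy: Optional[RetrySketch], default: Optional[RetrySketch]) -> Optional[RetrySketch]:
    # Assumed precedence: an explicit per-task policy overrides the workflow default.
    return task_policy if task_policy is not None else default


default = RetrySketch(max_retries=3, delay=timedelta(seconds=60))
per_task = {"extract": None, "load": RetrySketch(max_retries=5, delay=timedelta(seconds=10))}
resolved = {task_id: effective_policy(p, default) for task_id, p in per_task.items()}
print(resolved["extract"].max_retries, resolved["load"].max_retries)  # 3 5
```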
See examples/scheduled_event_workflow.py for a comprehensive example using all new features.
RFC-Style Specification
For implementers and advanced users, Highway DSL v1.1.0 includes a comprehensive 3,215-line RFC-style specification (spec.txt) modeled after IETF RFCs (RFC 2119, RFC 8259). This authoritative document provides:
- Complete operator specifications with execution semantics
- Integration guidance for Absurd and other runtimes
- Security considerations and best practices
- Comprehensive examples for all features
- Formal data model definitions
Access the specification at /dsl/spec.txt in the repository.
Architecture Diagram
graph TB
subgraph "Highway DSL v1.3.0 Features"
A[WorkflowBuilder<br/>Fluent API] --> B[Core Operators]
A --> C[Scheduling]
A --> D[Events]
A --> E[Error Handling]
B --> B1[Task]
B --> B2[Condition]
B --> B3[Parallel]
B --> B4[ForEach]
B --> B5[While]
B --> B6[Wait]
B --> B7[Switch]
C --> C1[Cron Schedules]
C --> C2[Start Date]
C --> C3[Catchup]
C --> C4[Tags]
D --> D1[EmitEvent]
D --> D2[WaitForEvent]
E --> E1[RetryPolicy]
E --> E2[TimeoutPolicy]
E --> E3[Callbacks]
end
subgraph "Output Formats"
F[YAML]
G[JSON]
end
subgraph "Runtime Integration"
H[Absurd Runtime]
I[Airflow]
J[Temporal]
K[Custom Engines]
end
A --> F
A --> G
F --> H
F --> I
F --> J
F --> K
G --> H
G --> I
G --> J
G --> K
style A fill:#2563eb,stroke:#1e40af,color:#fff
style B fill:#8b5cf6,stroke:#7c3aed,color:#fff
style C fill:#10b981,stroke:#059669,color:#fff
style D fill:#f59e0b,stroke:#d97706,color:#fff
style E fill:#ef4444,stroke:#dc2626,color:#fff
Features
- Fluent API: A powerful and intuitive WorkflowBuilder for defining workflows programmatically.
- Pydantic-based: All models are built on Pydantic, providing robust data validation, serialization, and documentation.
- Rich Operators: A comprehensive set of operators for handling various workflow scenarios:
  - Task - Basic workflow steps
  - Condition (if/else) - Conditional branching
  - Parallel - Execute multiple branches simultaneously
  - ForEach - Iterate over collections with proper dependency management
  - Wait - Pause execution for scheduled tasks
  - While - Execute loops based on conditions
  - NEW in v1.1: EmitEvent - Emit events for cross-workflow coordination
  - NEW in v1.1: WaitForEvent - Wait for external events with timeout
  - NEW in v1.1: Switch - Multi-branch routing (switch/case)
- Scheduling: Built-in support for cron-based scheduling, start dates, and catchup configuration
- Event-Driven: First-class support for event emission and waiting (Absurd integration)
- Callback Hooks: Durable success/failure handlers as workflow nodes
- YAML/JSON Interoperability: Workflows can be defined in Python and exported to YAML or JSON, and vice versa.
- Retry and Timeout Policies: Built-in error handling and execution time management.
- Extensible: The DSL is designed to be extensible with custom operators and policies.
Installation
pip install highway-dsl
Quick Start
Here's a simple example of how to define a workflow using the WorkflowBuilder:
from datetime import timedelta
from highway_dsl import WorkflowBuilder
workflow = (
WorkflowBuilder("simple_etl")
.task("extract", "etl.extract_data", result_key="raw_data")
.task(
"transform",
"etl.transform_data",
args=["{{raw_data}}"],
result_key="transformed_data",
)
.retry(max_retries=3, delay=timedelta(seconds=10))
.task("load", "etl.load_data", args=["{{transformed_data}}"])
.timeout(timeout=timedelta(minutes=30))
.wait("wait_next", wait_for=timedelta(hours=24))
.task("cleanup", "etl.cleanup")
.build()
)
print(workflow.to_yaml())
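Arguments such as "{{raw_data}}" refer to values stored earlier under a result_key. The sketch below shows one plausible way such placeholders could be resolved, using a dotted-path lookup into a dict of results; render_arg is hypothetical, and Highway's actual template rules (for example, how expressions like {{orders.length}} are handled) are not described here.

```python
import re
from typing import Any

_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def lookup(context: dict[str, Any], path: str) -> Any:
    """Follow a dotted path such as 'validated_data.quality_score' through dicts."""
    value: Any = context
    for part in path.split("."):
        value = value[part]
    return value


def render_arg(arg: Any, context: dict[str, Any]) -> Any:
    """Resolve '{{name}}' placeholders in a string argument against stored results."""
    if not isinstance(arg, str):
        return arg
    whole = _PLACEHOLDER.fullmatch(arg)
    if whole:
        # A lone placeholder passes the stored object through unchanged (not stringified).
        return lookup(context, whole.group(1))
    return _PLACEHOLDER.sub(lambda m: str(lookup(context, m.group(1))), arg)


context = {"raw_data": {"rows": [1, 2, 3]}}
print(render_arg("{{raw_data}}", context))  # {'rows': [1, 2, 3]}
print(render_arg("got {{raw_data.rows}}", context))  # got [1, 2, 3]
```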
Real-World Example: E-Commerce Order Processing
from highway_dsl import WorkflowBuilder, RetryPolicy
from datetime import datetime, timedelta
# Production-ready e-commerce order workflow
workflow = (
WorkflowBuilder("order_processing")
.set_schedule("*/5 * * * *") # Run every 5 minutes
.set_start_date(datetime(2025, 1, 1))
.add_tags("production", "orders", "critical")
.set_default_retry_policy(RetryPolicy(max_retries=3, delay=timedelta(seconds=30)))
# Fetch pending orders
.task("fetch_orders", "orders.get_pending", result_key="orders")
# Process each order
.foreach(
"process_each_order",
items="{{orders}}",
loop_body=lambda b: (
b.task("validate", "orders.validate", args=["{{item}}"])
.task("charge_payment", "payments.charge", args=["{{item}}"],
result_key="payment_result")
.task("send_failure_email", "email.send_failure",
args=["{{item.customer_email}}"])
.on_failure("send_failure_email") # Alert on payment failure
.switch(
"route_by_amount",
switch_on="{{item.amount_tier}}", # hypothetical field holding "high", "medium" or "low"
cases={
"high": "priority_shipping", # > $500
"medium": "standard_shipping", # $100-500
"low": "economy_shipping" # < $100
},
default="standard_shipping"
)
)
)
# Emit completion event for analytics workflow
.emit_event(
"notify_analytics",
event_name="orders_processed_{{ds}}",
payload={"count": "{{orders.length}}", "timestamp": "{{run.started_at}}"}
)
.build()
)
This workflow demonstrates:
- Scheduled execution every 5 minutes
- Default retry policy for all tasks
- ForEach loop processing multiple orders
- Payment failure callbacks
- Switch/case routing based on order amount
- Event emission for cross-workflow coordination
Mermaid Diagram Generation
You can generate a Mermaid state diagram of your workflow using the to_mermaid method:
print(workflow.to_mermaid())
This will output a Mermaid diagram in the stateDiagram-v2 format, which can be used with a variety of tools to visualize your workflow.
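For orientation, stateDiagram-v2 text is a list of transitions between states, with [*] standing for the start and end states. The hypothetical helper below renders a strictly linear chain of task ids in that format; it does not reproduce what to_mermaid emits for branches, loops or parallel blocks.

```python
def linear_state_diagram(task_ids: list[str]) -> str:
    """Render a straight-line workflow as Mermaid stateDiagram-v2 text (sketch only)."""
    lines = ["stateDiagram-v2"]
    if not task_ids:
        return "\n".join(lines)
    lines.append(f"    [*] --> {task_ids[0]}")
    # One transition per consecutive pair of tasks.
    for src, dst in zip(task_ids, task_ids[1:]):
        lines.append(f"    {src} --> {dst}")
    lines.append(f"    {task_ids[-1]} --> [*]")
    return "\n".join(lines)


print(linear_state_diagram(["extract", "transform", "load"]))
```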
Bank ETL Example
A more complex example of a bank's end-of-day ETL process can be found in examples/bank_end_of_the_day_etl_workflow.py.
A Mermaid diagram of this workflow can be found here.
Advanced Usage
Conditional Logic
from highway_dsl import WorkflowBuilder, RetryPolicy
from datetime import timedelta
builder = WorkflowBuilder("data_processing_pipeline")
builder.task("start", "workflows.tasks.initialize", result_key="init_data")
builder.task(
"validate",
"workflows.tasks.validate_data",
args=["{{init_data}}"],
result_key="validated_data",
)
builder.condition(
"check_quality",
condition="{{validated_data.quality_score}} > 0.8",
if_true=lambda b: b.task(
"high_quality_processing",
"workflows.tasks.advanced_processing",
args=["{{validated_data}}"],
retry_policy=RetryPolicy(max_retries=5, delay=timedelta(seconds=10), backoff_factor=2.0),
),
if_false=lambda b: b.task(
"standard_processing",
"workflows.tasks.basic_processing",
args=["{{validated_data}}"],
),
)
workflow = builder.build()
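After placeholders are filled in, a condition such as "{{validated_data.quality_score}} > 0.8" becomes a plain comparison like "0.93 > 0.8". The sketch below evaluates a single comparison without calling eval(); evaluate_condition is a hypothetical helper, and its operator set and operand parsing are assumptions rather than Highway's documented grammar.

```python
import ast
import operator
import re
from typing import Any

# Two-character operators come first in the alternation so ">=" is not read as ">".
_COMPARISON = re.compile(r"^\s*(.+?)\s*(==|!=|>=|<=|>|<)\s*(.+?)\s*$")
_OPS = {
    "==": operator.eq, "!=": operator.ne,
    ">=": operator.ge, "<=": operator.le,
    ">": operator.gt, "<": operator.lt,
}


def _operand(text: str) -> Any:
    try:
        return ast.literal_eval(text)  # numbers, quoted strings, True/False/None
    except (ValueError, SyntaxError):
        return text  # bare words such as failed are compared as plain strings


def evaluate_condition(rendered: str) -> bool:
    """Evaluate one rendered comparison such as "0.93 > 0.8" without using eval()."""
    match = _COMPARISON.match(rendered)
    if match is None:
        raise ValueError(f"unsupported condition: {rendered!r}")
    lhs, op, rhs = match.groups()
    return bool(_OPS[op](_operand(lhs), _operand(rhs)))


print(evaluate_condition("0.93 > 0.8"))  # True
print(evaluate_condition("failed == 'failed'"))  # True
```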
While Loops
from highway_dsl import WorkflowBuilder
builder = WorkflowBuilder("qa_rework_workflow")
builder.task("start_qa", "workflows.tasks.start_qa", result_key="qa_results")
builder.while_loop(
"qa_rework_loop",
condition="{{qa_results.status}} == 'failed'",
loop_body=lambda b: b.task("perform_rework", "workflows.tasks.perform_rework").task(
"re_run_qa", "workflows.tasks.run_qa", result_key="qa_results"
),
)
builder.task("finalize_product", "workflows.tasks.finalize_product", dependencies=["qa_rework_loop"])
workflow = builder.build()
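The while_loop semantics can be sketched as re-checking the condition after every pass through the body, with the body responsible for updating the value the condition reads (here, re_run_qa overwrites qa_results). run_while and its max_iterations cap are hypothetical; this README does not say whether Highway bounds loop iterations.

```python
from typing import Any, Callable


def run_while(
    condition: Callable[[dict[str, Any]], bool],
    body: Callable[[dict[str, Any]], None],
    context: dict[str, Any],
    max_iterations: int = 100,  # hypothetical safety cap, not a documented Highway option
) -> int:
    """Run body while condition(context) holds; return the number of passes made."""
    passes = 0
    while condition(context):
        if passes >= max_iterations:
            raise RuntimeError(f"loop still running after {max_iterations} passes")
        body(context)
        passes += 1
    return passes


# Simulate QA failing twice before passing; each pass overwrites qa_results.
qa_outcomes = iter(["failed", "failed", "passed"])
ctx: dict[str, Any] = {"qa_results": {"status": next(qa_outcomes)}}  # start_qa
passes = run_while(
    lambda c: c["qa_results"]["status"] == "failed",
    lambda c: c.update(qa_results={"status": next(qa_outcomes)}),  # perform_rework + re_run_qa
    ctx,
)
print(passes)  # 2
```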
For-Each Loops with Proper Dependency Management
A bug in which ForEach loops incorrectly inherited dependencies from an enclosing Parallel operator has been fixed:
# This loop now properly encapsulates its internal tasks
builder.foreach(
"process_items",
items="{{data.items}}",
loop_body=lambda fb: fb.task("process_item", "processor.handle_item", args=["{{item.id}}"])
# Loop body tasks only have proper dependencies, not unwanted "grandparent" dependencies
)
Retry Policies
from highway_dsl import RetryPolicy
from datetime import timedelta
builder.task(
"reliable_task",
"service.operation",
retry_policy=RetryPolicy(
max_retries=5,
delay=timedelta(seconds=10),
backoff_factor=2.0
)
)
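The README does not spell out how backoff_factor is applied; a common convention is exponential backoff, where the wait before retry n is delay * backoff_factor ** (n - 1). Under that assumption, the policy above would wait 10, 20, 40, 80 and 160 seconds. backoff_delays is a hypothetical helper.

```python
from datetime import timedelta


def backoff_delays(max_retries: int, delay: timedelta, backoff_factor: float = 1.0) -> list[timedelta]:
    """Wait before retry n (1-based) = delay * backoff_factor ** (n - 1)."""
    return [delay * (backoff_factor ** attempt) for attempt in range(max_retries)]


print([d.total_seconds() for d in backoff_delays(5, timedelta(seconds=10), 2.0)])
# [10.0, 20.0, 40.0, 80.0, 160.0]
```

With backoff_factor=1.0 the same helper yields a constant delay, which is the fixed-interval case.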
Timeout Policies
from highway_dsl import TimeoutPolicy
from datetime import timedelta
builder.task(
"timed_task",
"service.operation",
timeout_policy=TimeoutPolicy(
timeout=timedelta(hours=1),
kill_on_timeout=True
)
)
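Python threads cannot be forcibly stopped, so a runtime that honors kill_on_timeout plausibly runs work in a separate process. The sketch below shows that idea with subprocess.Popen; run_with_timeout is a hypothetical helper, and Highway's actual enforcement mechanism is not described in this README.

```python
import subprocess
import sys


def run_with_timeout(cmd: list[str], timeout_seconds: float, kill_on_timeout: bool = True) -> int:
    """Run cmd as a child process and return its exit code.

    If it exceeds timeout_seconds, subprocess.TimeoutExpired is re-raised; with
    kill_on_timeout=True the child is killed and reaped first, otherwise it keeps running.
    """
    proc = subprocess.Popen(cmd)
    try:
        return proc.wait(timeout=timeout_seconds)
    except subprocess.TimeoutExpired:
        if kill_on_timeout:
            proc.kill()
            proc.wait()  # reap the killed child so it does not linger as a zombie
        raise


try:
    run_with_timeout([sys.executable, "-c", "import time; time.sleep(5)"], timeout_seconds=0.5)
except subprocess.TimeoutExpired:
    print("timed out; child process was killed")
```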
Version History
Version 1.3.0 - Critical Fixes & Enhancements (Current)
Critical Fixes:
- Universal result_key support for all operators
- Callback validation in build() prevents typos
- ISO 8601 duration format for WaitOperator
Developer Experience:
- Duration helper class for time durations
- Workflow metadata setters (description, version)
- Improved error messages
Version 1.1.0 - Feature Release
Airflow-Parity Features:
- Scheduling metadata (cron, start_date, catchup, tags, max_active_runs)
- Workflow-level default retry policy
Event-Driven Features:
- EmitEventOperator for cross-workflow coordination
- WaitForEventOperator with timeout support
Production Features:
- Durable callback hooks (on_success, on_failure)
- SwitchOperator for multi-branch routing
- Task descriptions for documentation
- RFC-style specification document (3,215 lines)
Version 1.0.3 - Stable Release
This is a stable release with important bug fixes and enhancements, including a critical fix for the ForEach operator dependency management issue.
Development
To set up the development environment:
git clone https://github.com/your-username/highway.git
cd highway/dsl
python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
Running Tests
pytest
Type Checking
mypy .
Documentation
- README.md (this file) - Getting started and examples
- spec.txt - RFC-style formal specification (3,215 lines)
- examples/ - Comprehensive workflow examples
License
MIT License
File details
Details for the file highway_dsl-1.4.0.tar.gz.
File metadata
- Download URL: highway_dsl-1.4.0.tar.gz
- Size: 32.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6831302e18e6bd5ccf56781d6bd512bbd889f00ffc8065f368a12011aff6baf5 |
| MD5 | 9025991433e910a38b051b2535d2ffd5 |
| BLAKE2b-256 | 4ba6004fa3c600d197c31a5328362363032bc8b9d95994b55d7eda5bd8c683bf |
File details
Details for the file highway_dsl-1.4.0-py3-none-any.whl.
File metadata
- Download URL: highway_dsl-1.4.0-py3-none-any.whl
- Size: 14.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8e4c710123242299f8d13ad27ff30478bb0cadefc7a076f6cbd45e3aaef50b04 |
| MD5 | 92fb99017b221ae95ce9391db9807a89 |
| BLAKE2b-256 | 85fc1471f90ff05e67c41c622eafa0f65ea0e4384d1e3256890829143049bb3e |