A stable domain-specific language (DSL) for defining and managing data processing pipelines and workflow engines.

Highway DSL

Highway DSL is a stable, Python-based domain-specific language for defining complex workflows in a clear, concise, and fluent manner. It is part of the larger Highway project, an advanced workflow engine capable of running complex DAG-based workflows.

Version 1.0.2 - Stable Release

This is a stable release with important bug fixes and enhancements, including a critical fix for dependency management in the ForEach operator.

Features

  • Fluent API: A powerful and intuitive WorkflowBuilder for defining workflows programmatically.
  • Pydantic-based: All models are built on Pydantic, providing robust data validation, serialization, and documentation.
  • Rich Operators: A comprehensive set of operators for handling various workflow scenarios:
    • Task - Basic workflow steps
    • Condition (if/else) - Conditional branching
    • Parallel - Execute multiple branches simultaneously
    • ForEach - Iterate over collections with proper dependency management
    • Wait - Pause execution for scheduled tasks
    • While - Execute loops based on conditions
  • Fixed ForEach Bug: Loop body tasks are now encapsulated, so they no longer inherit unwanted "grandparent" dependencies from an enclosing Parallel operator.
  • YAML/JSON Interoperability: Workflows can be defined in Python and exported to YAML or JSON, and vice versa.
  • Retry and Timeout Policies: Built-in error handling and execution time management.
  • Extensible: The DSL is designed to be extensible with custom operators and policies.
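
The YAML/JSON interoperability bullet describes a round trip: a workflow defined in Python is serialized to text and can be loaded back into an equivalent object. As a rough illustration of that idea only, the sketch below uses standard-library dataclasses and `json` in place of the Pydantic models. `TaskSpec`, `WorkflowSpec`, `to_json`, and `from_json` are hypothetical names invented for this example and are not part of highway_dsl.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class TaskSpec:
    """Hypothetical stand-in for a task model (not a highway_dsl class)."""
    task_id: str
    function: str
    args: list = field(default_factory=list)


@dataclass
class WorkflowSpec:
    """Hypothetical stand-in for a workflow model."""
    name: str
    tasks: list = field(default_factory=list)

    def to_json(self) -> str:
        # asdict() recursively converts nested dataclasses to plain dicts.
        return json.dumps(asdict(self), indent=2)

    @classmethod
    def from_json(cls, text: str) -> "WorkflowSpec":
        data = json.loads(text)
        tasks = [TaskSpec(**task) for task in data["tasks"]]
        return cls(name=data["name"], tasks=tasks)


original = WorkflowSpec(
    name="simple_etl",
    tasks=[
        TaskSpec("extract", "etl.extract_data"),
        TaskSpec("transform", "etl.transform_data", args=["{{raw_data}}"]),
    ],
)

# Serialize, parse again, and compare with the original definition.
restored = WorkflowSpec.from_json(original.to_json())
round_trip_ok = restored == original
```

A round trip that compares equal is a useful property to test for any serialized workflow format, whichever model library produces it.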

Installation

pip install highway-dsl

Quick Start

Here's a simple example of how to define a workflow using the WorkflowBuilder:

from datetime import timedelta
from highway_dsl import WorkflowBuilder

workflow = (
    WorkflowBuilder("simple_etl")
    .task("extract", "etl.extract_data", result_key="raw_data")
    .task(
        "transform",
        "etl.transform_data",
        args=["{{raw_data}}"],
        result_key="transformed_data",
    )
    .retry(max_retries=3, delay=timedelta(seconds=10))
    .task("load", "etl.load_data", args=["{{transformed_data}}"])
    .timeout(timeout=timedelta(minutes=30))
    .wait("wait_next", timedelta(hours=24))
    .task("cleanup", "etl.cleanup")
    .build()
)

print(workflow.to_yaml())
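
The Quick Start passes values between tasks by storing a result under `result_key` and referring to it later as `{{raw_data}}`. How the Highway engine resolves these placeholders is not described here, so the following is only a sketch of one plausible approach, using a plain dictionary as the result store. `PLACEHOLDER`, `lookup`, and `resolve` are hypothetical helpers, and the data values are invented.

```python
import re

# Matches placeholders such as {{raw_data}} or {{validated_data.quality_score}}.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def lookup(path: str, context: dict):
    """Follow a dotted path like 'a.b' through nested dictionaries."""
    value = context
    for part in path.split("."):
        value = value[part]
    return value


def resolve(arg, context: dict):
    """Replace placeholders in one argument using stored task results."""
    if not isinstance(arg, str):
        return arg
    whole = PLACEHOLDER.fullmatch(arg)
    if whole:
        # A lone placeholder passes the stored object through unchanged.
        return lookup(whole.group(1), context)
    # Otherwise interpolate each placeholder into the surrounding text.
    return PLACEHOLDER.sub(lambda m: str(lookup(m.group(1), context)), arg)


# Results stored under each task's result_key (values invented for the example).
context = {"raw_data": [1, 2, 3]}
context["transformed_data"] = [x * 10 for x in resolve("{{raw_data}}", context)]

# Arguments the "load" task would receive after resolution.
load_args = [resolve(arg, context) for arg in ["{{transformed_data}}"]]
```

Returning the stored object for a lone placeholder, rather than its string form, keeps lists and dictionaries intact when they are handed to the next task.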

Advanced Usage

Conditional Logic

from highway_dsl import WorkflowBuilder, RetryPolicy
from datetime import timedelta

builder = WorkflowBuilder("data_processing_pipeline")

builder.task("start", "workflows.tasks.initialize", result_key="init_data")
builder.task(
    "validate",
    "workflows.tasks.validate_data",
    args=["{{init_data}}"],
    result_key="validated_data",
)

builder.condition(
    "check_quality",
    condition="{{validated_data.quality_score}} > 0.8",
    if_true=lambda b: b.task(
        "high_quality_processing",
        "workflows.tasks.advanced_processing",
        args=["{{validated_data}}"],
        retry_policy=RetryPolicy(max_retries=5, delay=timedelta(seconds=10), backoff_factor=2.0),
    ),
    if_false=lambda b: b.task(
        "standard_processing",
        "workflows.tasks.basic_processing",
        args=["{{validated_data}}"],
    ),
)

workflow = builder.build()
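
The `condition` string above compares a stored result against a literal and selects one of two branches. The expression grammar the engine accepts is not specified in this README, so the sketch below assumes the simplest form, `{{path}} OPERATOR literal`, and evaluates it without `eval`. `CONDITION`, `OPERATORS`, `evaluate`, and `choose_branch` are hypothetical names for illustration.

```python
import ast
import operator
import re

# Comparison operators supported by this sketch only.
OPERATORS = {
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    "!=": operator.ne,
}

# Two-character operators are listed first so ">=" is not read as ">".
CONDITION = re.compile(r"^\{\{\s*([\w.]+)\s*\}\}\s*(>=|<=|==|!=|>|<)\s*(.+)$")


def evaluate(condition: str, context: dict) -> bool:
    """Evaluate '{{dotted.path}} OP literal' against stored results."""
    match = CONDITION.match(condition)
    if match is None:
        raise ValueError(f"unsupported condition: {condition!r}")
    path, symbol, literal = match.groups()
    value = context
    for part in path.split("."):
        value = value[part]
    # literal_eval parses numbers and quoted strings without executing code.
    return OPERATORS[symbol](value, ast.literal_eval(literal.strip()))


def choose_branch(condition: str, context: dict) -> str:
    """Return the task id of the branch that would run."""
    if evaluate(condition, context):
        return "high_quality_processing"
    return "standard_processing"


good = {"validated_data": {"quality_score": 0.93}}
poor = {"validated_data": {"quality_score": 0.5}}
condition = "{{validated_data.quality_score}} > 0.8"
branches = [choose_branch(condition, good), choose_branch(condition, poor)]
```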

While Loops

from highway_dsl import WorkflowBuilder

builder = WorkflowBuilder("qa_rework_workflow")

builder.task("start_qa", "workflows.tasks.start_qa", result_key="qa_results")

builder.while_loop(
    "qa_rework_loop",
    condition="{{qa_results.status}} == 'failed'",
    loop_body=lambda b: b.task("perform_rework", "workflows.tasks.perform_rework").task(
        "re_run_qa", "workflows.tasks.run_qa", result_key="qa_results"
    ),
)

builder.task("finalize_product", "workflows.tasks.finalize_product", dependencies=["qa_rework_loop"])

workflow = builder.build()
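
The loop above only terminates because `re_run_qa` writes to the same `result_key` that the condition reads. The following simulation, which does not use highway_dsl, walks through that behavior with plain functions. `run_while` and its `max_iterations` guard are assumptions made for this sketch; whether the engine enforces an iteration limit is not stated here.

```python
def run_while(condition, body, context, max_iterations=10):
    """Re-run body while condition(context) is true; return the iteration count."""
    iterations = 0
    while condition(context):
        if iterations >= max_iterations:
            raise RuntimeError("loop exceeded max_iterations")
        body(context)
        iterations += 1
    return iterations


# Simulated QA outcomes for successive re-runs (invented for this example).
qa_outcomes = iter(["failed", "passed"])


def loop_body(context):
    # perform_rework: record that one rework pass happened.
    context["rework_count"] = context.get("rework_count", 0) + 1
    # re_run_qa: overwrite qa_results, as result_key="qa_results" suggests.
    context["qa_results"] = {"status": next(qa_outcomes)}


# Initial state as if start_qa had reported a failure.
context = {"qa_results": {"status": "failed"}}
iterations = run_while(
    lambda ctx: ctx["qa_results"]["status"] == "failed",
    loop_body,
    context,
)
```

If the body never updated `qa_results`, the condition would stay true forever, which is why the sketch includes an explicit guard.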

For-Each Loops with Proper Dependency Management

Version 1.0.2 fixes a bug in which ForEach loops incorrectly inherited dependencies from an enclosing Parallel operator:

from highway_dsl import WorkflowBuilder

# The builder name is only an example.
builder = WorkflowBuilder("item_processing")

# The loop now encapsulates its internal tasks: body tasks depend only on the
# loop operator and on each other, not on unwanted "grandparent" operators.
builder.foreach(
    "process_items",
    items="{{data.items}}",
    loop_body=lambda fb: fb.task("process_item", "processor.handle_item", args=["{{item.id}}"]),
)
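
To make the encapsulation rule concrete, the sketch below models dependencies as a plain dictionary: the first body task depends on the loop operator, and each later body task depends on the one before it. This is an illustration of the rule as described in this README, not the library's internal code. The function names, the second body task `record_result`, and the enclosing operator id `fan_out` are all hypothetical.

```python
def encapsulated_dependencies(loop_id, body_task_ids):
    """Map each body task to its dependencies: parent loop, then internal chain."""
    deps = {}
    previous = loop_id
    for task_id in body_task_ids:
        deps[task_id] = [previous]
        previous = task_id
    return deps


def has_grandparent_leak(deps, grandparent_id):
    """True if any body task depends directly on the enclosing operator."""
    return any(grandparent_id in task_deps for task_deps in deps.values())


deps = encapsulated_dependencies("process_items", ["process_item", "record_result"])

# For contrast, a graph in which the enclosing operator has leaked into the body.
leaked = {task: ["fan_out"] + task_deps for task, task_deps in deps.items()}
```

A check like `has_grandparent_leak` is the kind of assertion a regression test for this bug could make against a built workflow graph.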

Retry Policies

from datetime import timedelta

from highway_dsl import RetryPolicy, WorkflowBuilder

# The builder name is only an example.
builder = WorkflowBuilder("retry_example")

builder.task(
    "reliable_task",
    "service.operation",
    retry_policy=RetryPolicy(
        max_retries=5,
        delay=timedelta(seconds=10),
        backoff_factor=2.0,
    ),
)
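
The three `RetryPolicy` parameters together determine how long a task may spend waiting between attempts. The exact schedule the engine uses is not documented here. Assuming the common exponential scheme, where the wait before retry n is `delay * backoff_factor ** n` with n starting at 0, the waits can be worked out as follows. `retry_delays` is a hypothetical helper, not a highway_dsl function.

```python
from datetime import timedelta


def retry_delays(max_retries, delay, backoff_factor):
    """Wait before each retry, assuming delay * backoff_factor ** attempt."""
    return [delay * (backoff_factor ** attempt) for attempt in range(max_retries)]


delays = retry_delays(max_retries=5, delay=timedelta(seconds=10), backoff_factor=2.0)

# Worst-case time spent waiting if every retry is needed.
total_wait = sum(delays, timedelta())

for attempt, wait in enumerate(delays, start=1):
    print(f"retry {attempt}: wait {wait.total_seconds():.0f}s")
print(f"total wait: {total_wait.total_seconds():.0f}s")
```

Under that assumption the waits are 10, 20, 40, 80, and 160 seconds, for a total of 310 seconds, which is worth comparing against any timeout applied to the same task.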

Timeout Policies

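from datetime import timedelta

from highway_dsl import TimeoutPolicy, WorkflowBuilder

# The builder name is only an example.
builder = WorkflowBuilder("timeout_example")

builder.task(
    "timed_task",
    "service.operation",
    timeout_policy=TimeoutPolicy(
        timeout=timedelta(hours=1),
        kill_on_timeout=True,
    ),
)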
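
A timeout with `kill_on_timeout=True` implies that work still running at the deadline is stopped rather than left in the background. How the Highway engine enforces this is not described here. As an analogy only, the standard library's `subprocess.run` behaves this way: when its `timeout` expires it kills the child process before raising `TimeoutExpired`. `run_with_timeout` below is a hypothetical helper built on that call.

```python
import subprocess
import sys
from datetime import timedelta


def run_with_timeout(code: str, timeout: timedelta) -> str:
    """Run a Python snippet in a child process; report how it ended."""
    try:
        subprocess.run(
            [sys.executable, "-c", code],
            timeout=timeout.total_seconds(),
            check=True,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run() has already killed and reaped the child here.
        return "timed_out"
    return "completed"


# A trivial snippet finishes well inside a generous limit.
fast = run_with_timeout("pass", timedelta(seconds=30))

# A long sleep is cut off when the short limit expires.
slow = run_with_timeout("import time; time.sleep(30)", timedelta(seconds=0.5))
```

Running the task in a separate process is what makes a hard kill possible; a thread in the same process cannot be forcibly stopped in Python.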

What's New in Version 1.0.2

Bug Fixes

  • Fixed ForEach Operator Bug: ForEach loops no longer inherit "grandparent" dependencies from enclosing Parallel operators. Loop body tasks are now properly encapsulated and depend only on their parent loop operator and on the preceding tasks in their own chain.

Enhancements

  • Improved Loop Dependency Management: While and ForEach loops now encapsulate their internal dependencies and are unaffected by enclosing Parallel operators.
  • Better Error Handling: Enhanced error handling throughout the DSL.
  • Comprehensive Test Suite: Added functional tests for all example workflows to ensure consistency.

Development

To set up the development environment:

git clone https://github.com/your-username/highway.git
cd highway
python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"

Running Tests

pytest

Type Checking

mypy .

License

MIT License
