Skip to main content

A domain specific language (DSL) for defining and managing data processing pipelines.

Project description

Highway DSL

PyPI version License: MIT

Highway DSL is a Python-based domain-specific language for defining complex workflows in a clear, concise, and fluent manner. It is part of the larger Highway project, an advanced workflow engine capable of running complex DAG-based workflows.

Features

  • Fluent API: A powerful and intuitive WorkflowBuilder for defining workflows programmatically.
  • Pydantic-based: All models are built on Pydantic, providing robust data validation, serialization, and documentation.
  • Rich Operators: A comprehensive set of operators for handling various workflow scenarios:
    • Task
    • Condition (if/else)
    • Parallel
    • ForEach
    • Wait
    • While
  • YAML/JSON Interoperability: Workflows can be defined in Python and exported to YAML or JSON, and vice-versa.
  • Extensible: The DSL is designed to be extensible with custom operators and policies.

Installation

pip install highway-dsl

Quick Start

Here's a simple example of how to define a workflow using the WorkflowBuilder:

from datetime import timedelta
from highway_dsl import WorkflowBuilder

workflow = (
    WorkflowBuilder("simple_etl")
    .task("extract", "etl.extract_data", result_key="raw_data")
    .task(
        "transform",
        "etl.transform_data",
        args=["{{raw_data}}"],
        result_key="transformed_data",
    )
    .retry(max_retries=3, delay=timedelta(seconds=10))
    .task("load", "etl.load_data", args=["{{transformed_data}}"])
    .timeout(timeout=timedelta(minutes=30))
    .wait("wait_next", timedelta(hours=24))
    .task("cleanup", "etl.cleanup")
    .build()
)

print(workflow.to_yaml())

Advanced Usage

Conditional Logic

from highway_dsl import WorkflowBuilder, RetryPolicy
from datetime import timedelta

builder = WorkflowBuilder("data_processing_pipeline")

builder.task("start", "workflows.tasks.initialize", result_key="init_data")
builder.task(
    "validate",
    "workflows.tasks.validate_data",
    args=["{{init_data}}"],
    result_key="validated_data",
)

builder.condition(
    "check_quality",
    condition="{{validated_data.quality_score}} > 0.8",
    if_true=lambda b: b.task(
        "high_quality_processing",
        "workflows.tasks.advanced_processing",
        args=["{{validated_data}}"],
        retry_policy=RetryPolicy(max_retries=5, delay=timedelta(seconds=10), backoff_factor=2.0),
    ),
    if_false=lambda b: b.task(
        "standard_processing",
        "workflows.tasks.basic_processing",
        args=["{{validated_data}}"],
    ),
)

workflow = builder.build()

While Loops

from highway_dsl import WorkflowBuilder

builder = WorkflowBuilder("qa_rework_workflow")

builder.task("start_qa", "workflows.tasks.start_qa", result_key="qa_results")

builder.while_loop(
    "qa_rework_loop",
    condition="{{qa_results.status}} == 'failed'",
    loop_body=lambda b: b.task("perform_rework", "workflows.tasks.perform_rework").task(
        "re_run_qa", "workflows.tasks.run_qa", result_key="qa_results"
    ),
)

builder.task("finalize_product", "workflows.tasks.finalize_product", dependencies=["qa_rework_loop"])

workflow = builder.build()

Development

To set up the development environment:

git clone https://github.com/your-username/highway.git
cd highway
python -m venv .venv
source .venv/bin/activate
pip install -e .[dev]

Running Tests

pytest

Type Checking

mypy .

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

highway_dsl-0.0.3.tar.gz (8.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

highway_dsl-0.0.3-py3-none-any.whl (6.7 kB view details)

Uploaded Python 3

File details

Details for the file highway_dsl-0.0.3.tar.gz.

File metadata

  • Download URL: highway_dsl-0.0.3.tar.gz
  • Upload date:
  • Size: 8.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.24

File hashes

Hashes for highway_dsl-0.0.3.tar.gz
Algorithm Hash digest
SHA256 9298ffebea639a5ebbbd036fe27918c8350296e4856ad269d050b173c568ffcc
MD5 316e5cce1ed6f40dd053bcaf0ab7178c
BLAKE2b-256 3749dc1bcdbad845778815d5f5424295e11633f0bd60ff077b1c708a78654cc8

See more details on using hashes here.

File details

Details for the file highway_dsl-0.0.3-py3-none-any.whl.

File metadata

  • Download URL: highway_dsl-0.0.3-py3-none-any.whl
  • Upload date:
  • Size: 6.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.24

File hashes

Hashes for highway_dsl-0.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 9da87dbc7a91241fb602b54adeb88f00e3dc6b9620bc4ddb0b29afd9b0907b67
MD5 c84e101707e359d4bfb6d1909b3ce558
BLAKE2b-256 17ae9ccb5e324727e8b264268e2de0605948b1fb30ff469954c0ef87f1514467

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page