A domain-specific language (DSL) for defining and managing data processing pipelines.
Highway DSL
Highway DSL is a Python-based domain-specific language (DSL) for defining and managing complex workflows. Users declaratively specify tasks, their dependencies, and execution parameters. Control flow covers conditional branching, parallel execution, iteration, and waits, and individual tasks can carry retry and timeout policies.
Features
- Declarative Workflow Definition: Define workflows using a clear and concise Python API or through YAML/JSON configurations.
- Pydantic Models: Leverages Pydantic for robust data validation and serialization/deserialization of workflow definitions.
- Rich Task Types: Supports various operators including:
  - TaskOperator: Executes a Python function.
  - ConditionOperator: Enables conditional branching based on expressions.
  - WaitOperator: Pauses workflow execution for a specified duration or until a specific datetime.
  - ParallelOperator: Executes multiple branches of tasks concurrently.
  - ForEachOperator: Iterates over a collection, executing a chain of tasks for each item.
- Retry and Timeout Policies: Define retry strategies and timeout limits for individual tasks.
- Serialization/Deserialization: Seamless conversion of workflow definitions between Python objects, YAML, and JSON formats.
- Workflow Builder: A fluent API for constructing workflows programmatically.
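To make the retry settings more concrete, here is a minimal sketch of how an executor might turn `max_retries`, `delay`, and `backoff_factor` (the field names used in the YAML example in this README) into a wait schedule. It assumes exponential backoff, which this README does not confirm, and `retry_delays` is a hypothetical helper that is not part of Highway DSL.

```python
from datetime import timedelta
from typing import List


def retry_delays(
    max_retries: int, delay: timedelta, backoff_factor: float = 1.0
) -> List[timedelta]:
    """Return the wait before each retry attempt.

    Assumption: retry number n (counting from 0) waits
    delay * backoff_factor ** n. A real executor may apply the
    factor differently or add jitter.
    """
    return [delay * (backoff_factor ** attempt) for attempt in range(max_retries)]


if __name__ == "__main__":
    # Values taken from the retry_policy in the YAML example.
    for wait in retry_delays(3, timedelta(seconds=10), backoff_factor=2.0):
        print(wait.total_seconds())
```

Under this assumption, a `backoff_factor` of 1.0 gives a constant delay, and larger values space retries further apart.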
Feature Overview
graph TD
A[Workflow] --> B{TaskOperator};
A --> C{ConditionOperator};
A --> D{WaitOperator};
A --> E{ParallelOperator};
A --> F{ForEachOperator};
B --> G[Executes Python Function];
C --> H{If/Else Branching};
D --> I[Pauses Execution];
E --> J[Concurrent Branches];
F --> K[Iterates Over Items];
subgraph Policies
B --> L[RetryPolicy];
B --> M[TimeoutPolicy];
end
Installation
To install Highway DSL, you can use pip:
pip install highway-dsl
If you want to install it for development, including testing dependencies:
pip install "highway-dsl[dev]"
Usage
Defining a Simple Workflow
from datetime import timedelta

from highway_dsl import WorkflowBuilder


def demonstrate_basic_workflow():
    """Show a simple complete workflow using just the builder"""
    workflow = (
        WorkflowBuilder("simple_etl")
        .task("extract", "etl.extract_data", result_key="raw_data")
        .task(
            "transform",
            "etl.transform_data",
            args=["{{raw_data}}"],
            result_key="transformed_data",
        )
        .retry(max_retries=3, delay=timedelta(seconds=10))
        .task("load", "etl.load_data", args=["{{transformed_data}}"])
        .timeout(timeout=timedelta(minutes=30))
        .wait("wait_next", timedelta(hours=24))
        .task("cleanup", "etl.cleanup")
        .build()
    )
    workflow.set_variables(
        {"database_url": "postgresql://localhost/mydb", "chunk_size": 1000}
    )
    return workflow


if __name__ == "__main__":
    basic_workflow = demonstrate_basic_workflow()
    print(basic_workflow.to_yaml())
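The `"{{raw_data}}"` and `"{{transformed_data}}"` arguments above refer to values stored under an earlier task's `result_key`. How Highway DSL or its executor resolves them is not described here, so the following is only a sketch of one plausible approach: `resolve_args` is a hypothetical helper that swaps whole-string placeholders for previously stored results.

```python
import re
from typing import Any, Dict, List

# Matches a string that consists solely of one "{{name}}" placeholder.
PLACEHOLDER = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")


def resolve_args(args: List[Any], results: Dict[str, Any]) -> List[Any]:
    """Replace "{{key}}" arguments with the value stored under key.

    Assumption: a placeholder always occupies the whole argument string.
    Non-string and non-placeholder arguments pass through unchanged, and
    an unknown key raises KeyError.
    """
    resolved = []
    for arg in args:
        match = PLACEHOLDER.match(arg) if isinstance(arg, str) else None
        resolved.append(results[match.group(1)] if match else arg)
    return resolved


if __name__ == "__main__":
    # Pretend the "extract" task already stored its output as raw_data.
    stored = {"raw_data": [{"id": 1}, {"id": 2}]}
    print(resolve_args(["{{raw_data}}", 1000], stored))
```

Keeping placeholders as plain strings is what lets the same workflow definition round-trip through YAML and JSON.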
Defining a Complex Workflow
Refer to example_usage.py for a more complex example demonstrating conditional logic, parallel execution, and iteration.
YAML Configuration
You can also define workflows directly in YAML:
name: simple_etl
version: 1.0.0
description: Simple ETL workflow with retry and timeout
variables:
  database_url: postgresql://localhost/mydb
  chunk_size: 1000
start_task: extract
tasks:
  extract:
    task_id: extract
    operator_type: task
    function: etl.extract_data
    result_key: raw_data
    dependencies: []
    metadata: {}
  transform:
    task_id: transform
    operator_type: task
    function: etl.transform_data
    args: ["{{raw_data}}"]
    result_key: transformed_data
    dependencies: ["extract"]
    retry_policy:
      max_retries: 3
      delay: PT10S
      backoff_factor: 2.0
    metadata: {}
  load:
    task_id: load
    operator_type: task
    function: etl.load_data
    args: ["{{transformed_data}}"]
    dependencies: ["transform"]
    timeout_policy:
      timeout: PT30M
      kill_on_timeout: true
    metadata: {}
  wait_next:
    task_id: wait_next
    operator_type: wait
    wait_for: "P1D"
    dependencies: ["load"]
    metadata: {}
  cleanup:
    task_id: cleanup
    operator_type: task
    function: etl.cleanup
    dependencies: ["wait_next"]
    metadata: {}
To load this YAML:
from highway_dsl import Workflow
yaml_content = """
# ... (yaml content from above)
"""
workflow = Workflow.from_yaml(yaml_content)
print(workflow.name)
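Two details of the YAML format are worth illustrating. The values `PT10S`, `PT30M`, and `P1D` look like ISO 8601 durations, and each task's `dependencies` list implies an execution order. The sketch below uses only the standard library (Python 3.9+ for `graphlib`); `parse_duration` and `execution_order` are hypothetical helpers, not Highway DSL functions, and the parser covers only the day/hour/minute/second subset of ISO 8601.

```python
import re
from datetime import timedelta
from graphlib import TopologicalSorter
from typing import Dict, List

# Subset of ISO 8601 durations: PnDTnHnMnS (no weeks, months, or years).
_DURATION = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?"
    r"(?:(?P<seconds>\d+(?:\.\d+)?)S)?)?$"
)


def parse_duration(text: str) -> timedelta:
    """Convert a duration such as "PT30M" or "P1D" into a timedelta."""
    match = _DURATION.match(text)
    if not match or not any(match.groupdict().values()):
        raise ValueError(f"unsupported duration: {text!r}")
    parts = {k: float(v) for k, v in match.groupdict().items() if v is not None}
    return timedelta(**parts)


def execution_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Order task ids so that every task comes after its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


if __name__ == "__main__":
    print(parse_duration("PT10S"), parse_duration("PT30M"), parse_duration("P1D"))
    # task_id -> dependencies, copied from the YAML example.
    deps = {
        "extract": [],
        "transform": ["extract"],
        "load": ["transform"],
        "wait_next": ["load"],
        "cleanup": ["wait_next"],
    }
    print(execution_order(deps))
```

`TopologicalSorter` raises `graphlib.CycleError` if the dependencies form a cycle, which makes it a convenient sanity check for hand-written YAML.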
Development
Running Tests
To run the unit tests, navigate to the project root and execute:
pytest
Type Checking
To perform static type checking with MyPy:
mypy .
Project Structure
.highway/
├── highway_dsl/
│ ├── __init__.py # Exposes the public API
│ └── workflow_dsl.py # Core DSL definitions (Pydantic models)
├── example_usage.py # Examples of how to use the DSL
├── tests/
│ ├── __init__.py
│ ├── conftest.py # Pytest configuration
│ └── test_workflow_dsl.py # Unit and integration tests
├── pyproject.toml # Project metadata and dependencies
├── README.md # This file
└── SUMMARY.md # Summary of changes and future instructions
File details
Details for the file highway_dsl-0.0.2.tar.gz.
File metadata
- Download URL: highway_dsl-0.0.2.tar.gz
- Upload date:
- Size: 9.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.24
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f0ca8cd51d5c70282af34260929e3ed5058e252ed2e333a89be83c284d8b1db3 |
| MD5 | 076b74f2e3875592e2b719bbaa74a17c |
| BLAKE2b-256 | 216f15715cdfe6bf67d4c17d4f0536c734a825b947a6c78a4f67c4640e2ed30f |
File details
Details for the file highway_dsl-0.0.2-py3-none-any.whl.
File metadata
- Download URL: highway_dsl-0.0.2-py3-none-any.whl
- Upload date:
- Size: 6.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.24
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f7a2721bbd45a73d4d32e2356d05a4f6517173f857b23f1d0f6dbefe61601d84 |
| MD5 | 1e911e72b22ccc326b204d487579bee8 |
| BLAKE2b-256 | c20347b58512b166fa2d50408847d58e6d8b49c3442dade2eae95fe44bc512ff |