Skip to main content

Python SDK for Highway Workflow Engine - build durable workflows with simple decorators

Project description

Python SDK for Highway Workflow Engine. Build durable rock solid workflows with ease.

Highway Workflow Engine UI

Installation

pip install highway

Quick Start

from highway import Driver

driver = Driver()  # Uses HIGHWAY_API_KEY env var

@driver.task(shell=True)
def backup_db():
    return "pg_dump mydb > backup.sql"

@driver.task(py=True, depends=["backup_db"])
def verify():
    import os
    return {"exists": os.path.exists("backup.sql")}

result = driver.run()
print(result.status)  # "completed"

Configuration

Set environment variables:

export HIGHWAY_API_KEY="hw_k1_..."
export HIGHWAY_API_ENDPOINT="https://highway.solutions"

Or pass directly:

driver = Driver(
    api_key="hw_k1_...",
    endpoint="https://highway.solutions"
)

Task Types

Shell Tasks

Execute shell commands:

@driver.task(shell=True)
def list_files():
    return "ls -la /tmp"

Python Tasks

Execute Python code in Highway’s sandboxed environment:

@driver.task(py=True)
def compute():
    import math
    return {"factorial": math.factorial(10)}

HTTP Tasks

Make HTTP requests:

@driver.task(http=True)
def call_api():
    return {
        "url": "https://<organisation>.highway.solutions/webhook",
        "method": "POST",
        "json": {"status": "done"},
    }

Generic Tool Tasks

Call any Highway tool directly:

@driver.task(tool="tools.llm.call")
def summarize():
    return {
        "provider": "ollama",
        "model": "qwen3-vl:235b-instruct-cloud",
        "prompt": "Summarize: {{backup_result.stdout}}",
        "temperature": 0.7
    }

@driver.task(tool="tools.database.query")
def query_users():
    return {
        "connection_string": "vault:db/postgres",
        "query": "SELECT * FROM users"
    }

LLM Agentic Workflows

Build multi-step LLM workflows using tools.llm.call:

@driver.task(http=True)
def fetch_data():
    return {"url": "https://api.example.com/data", "method": "GET"}

@driver.task(tool="tools.llm.call", depends=["fetch_data"])
def analyze():
    return {
        "provider": "ollama",  # or "openai", "anthropic"
        "model": "qwen3-vl:235b-instruct-cloud",
        "prompt": "Analyze: {{fetch_data_result.body}}",
        "temperature": 0.3
    }

@driver.task(tool="tools.llm.call", depends=["analyze"])
def summarize():
    return {
        "provider": "ollama",
        "model": "qwen3-vl:235b-instruct-cloud",
        "prompt": "Summarize: {{analyze_result.response}}"
    }

Workflow Execution

Execute other workflows:

@driver.task(workflow="daily_report")
def run_report():
    return {"inputs": {"date": "2024-01-01"}}

@driver.task(workflow_id="uuid-here")
def run_specific_version():
    return {"inputs": {"mode": "production"}}

Dependencies

Tasks can depend on other tasks:

@driver.task(shell=True)
def step_1():
    return "echo 'Step 1'"

@driver.task(shell=True, depends=["step_1"])
def step_2():
    return "echo 'Step 2'"

@driver.task(shell=True, depends=["step_2"])
def step_3():
    return "echo 'Step 3'"

Workflow Inputs

Pass variables to workflows:

result = driver.run(
    inputs={"email": "user@example.com", "env": "prod"},
    timeout=300
)

Access inputs in tasks via {{inputs.key}} syntax.

Retry Configuration

Configure automatic retries with exponential backoff:

@driver.task(http=True, retries=3, retry_delay=2.0, backoff=2.0)
def call_flaky_api():
    return {"url": "https://<organisation>.highway.solutions/endpoint"}

Durable Delays

Use Highway’s native WaitOperator for delays that consume zero worker resources:

from datetime import timedelta

@driver.task(shell=True, delay=timedelta(hours=2))
def delayed_task():
    return "echo 'Runs after 2 hour delay'"

Scheduling

Schedule recurring workflows:

@driver.task(shell=True, schedule="0 * * * *")  # Every hour
def hourly_backup():
    return "pg_dump mydb > backup.sql"

@driver.task(shell=True, schedule=timedelta(minutes=30))
def interval_check():
    return "curl https://<organisation>.highway.solutions/health"

Observability

Track running workflows:

# Non-blocking submit
result = driver.run(wait=False)
run_id = result.run_id

# Check status
status = driver.status(run_id)
print(status.state)

# Get logs
logs = driver.logs(run_id)

# Cancel
driver.cancel(run_id)

Long-Running Workflows

For workflows that take minutes to complete, submit and check status later:

# Submit and exit immediately
result = driver.run(wait=False, timeout=600)
print(f"Run ID: {result.run_id}")
# Driver can exit, workflow continues on Highway

# Later, check status
status = driver.status(result.run_id)
if status.status == "completed":
    print("Workflow finished!")

For tasks longer than 30 seconds, set appropriate timeout values:

@driver.task(shell=True, timeout=120)  # 2 minute timeout
def long_running_task():
    return "sleep 90 && echo 'Done'"

Persistence

Highway Driver uses SQLite for local state persistence at stabilize.db. This enables:

  • Submit workflow, exit driver, check status later

  • Resume from crashes

  • Idempotent execution

The database is created automatically in your working directory.

Framework Integration

Highway Driver integrates with web frameworks. See examples/integrations/:

FastAPI Example

from fastapi import FastAPI
from highway import Driver

app = FastAPI()
driver = Driver()

@app.post("/submit")
async def submit_workflow(data: dict):
    result = driver.run(wait=False)
    return {"run_id": result.run_id}

@app.get("/status/{run_id}")
async def get_status(run_id: str):
    return driver.status(run_id)

Flask Example

from flask import Flask, jsonify
from highway import Driver

app = Flask(__name__)
driver = Driver()

@app.post("/submit")
def submit_workflow():
    result = driver.run(wait=False)
    return jsonify({"run_id": result.run_id})

See examples/integrations/ for full implementations with WebSocket/SSE support.

Idempotency

Use workflow IDs for idempotent execution:

result1 = driver.run(workflow_id="order-12345")
result2 = driver.run(workflow_id="order-12345")  # Returns cached result

Examples

The examples/ directory contains runnable examples:

  • simple_shell.py - Basic shell task

  • multi_step.py - Task dependencies

  • python_func.py - Python code execution

  • http_tasks.py - HTTP requests

  • parallel.py - Parallel execution

  • llm_agentic.py - LLM-powered workflow

  • long_running/ - Submit and check status later

  • integrations/ - Flask/FastAPI integration

Run examples:

export HIGHWAY_API_KEY="your-key"
python examples/simple_shell.py

Requirements

License

Apache 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

highway-0.9.1.tar.gz (35.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

highway-0.9.1-py3-none-any.whl (36.3 kB view details)

Uploaded Python 3

File details

Details for the file highway-0.9.1.tar.gz.

File metadata

  • Download URL: highway-0.9.1.tar.gz
  • Upload date:
  • Size: 35.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.3

File hashes

Hashes for highway-0.9.1.tar.gz
Algorithm Hash digest
SHA256 b536b7db02af558a899f98e96d2b15cb68d9a5dc1a9003efc36621ff2d11fb5e
MD5 5f938c86794aaf5c644437adbe5a7b55
BLAKE2b-256 59f00b03a3b772fc6117f57a0e05acedd33a874a48e52854905d5b907945701e

See more details on using hashes here.

File details

Details for the file highway-0.9.1-py3-none-any.whl.

File metadata

  • Download URL: highway-0.9.1-py3-none-any.whl
  • Upload date:
  • Size: 36.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.3

File hashes

Hashes for highway-0.9.1-py3-none-any.whl
Algorithm Hash digest
SHA256 494929fda4066914ffdf5c8461c62d9fa7c2f295fffa82da7abcae6ed7a24986
MD5 df4b9aa19b51675b045f5429a8edafea
BLAKE2b-256 82dcf5425a245577c1c6536021b0508e444c4745741eff7e3f8f2b9d41890a6c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page