Skip to main content

Highway Workflow Engine - Stabilize execution layer

Project description

Stabilize

Highway Workflow Engine - Stabilize execution layer.

A lightweight Python workflow execution engine with DAG-based stage orchestration.

Requirements

  • Python 3.11+
  • SQLite (included) or PostgreSQL 12+

Installation

pip install stabilize            # SQLite support only
pip install stabilize[postgres]  # PostgreSQL support
pip install stabilize[rag]       # RAG-powered pipeline generation
pip install stabilize[all]       # All features

Features

  • Message-driven DAG execution engine
  • Parallel and sequential stage execution
  • Synthetic stages (before/after/onFailure)
  • PostgreSQL and SQLite persistence
  • Pluggable task system
  • Retry and timeout support
  • RAG-powered pipeline generation from natural language

Quick Start

from stabilize import Workflow, StageExecution, TaskExecution, WorkflowStatus
from stabilize.persistence.sqlite import SqliteWorkflowStore
from stabilize.queue.sqlite_queue import SqliteQueue
from stabilize.queue.processor import QueueProcessor
from stabilize.orchestrator import Orchestrator
from stabilize.tasks.interface import Task
from stabilize.tasks.result import TaskResult
from stabilize.tasks.registry import TaskRegistry

# Define a custom task
class HelloTask(Task):
    def execute(self, stage: StageExecution) -> TaskResult:
        name = stage.context.get("name", "World")
        return TaskResult.success(outputs={"greeting": f"Hello, {name}!"})

# Create a workflow
workflow = Workflow.create(
    application="my-app",
    name="Hello Workflow",
    stages=[
        StageExecution(
            ref_id="1",
            type="hello",
            name="Say Hello",
            tasks=[
                TaskExecution.create(
                    name="Hello Task",
                    implementing_class="hello",
                    stage_start=True,
                    stage_end=True,
                ),
            ],
            context={"name": "Stabilize"},
        ),
    ],
)

# Setup persistence and queue
store = SqliteWorkflowStore(":memory:")
queue = SqliteQueue(":memory:")

# Register tasks
registry = TaskRegistry()
registry.register("hello", HelloTask)

# Create processor and orchestrator
processor = QueueProcessor.create(queue, store, registry)
orchestrator = Orchestrator(queue)

# Run workflow
store.store(workflow)
orchestrator.start(workflow)
processor.process_all(timeout=10.0)

# Check result
result = store.retrieve(workflow.id)
print(f"Status: {result.status}")  # WorkflowStatus.SUCCEEDED
print(f"Output: {result.stages[0].outputs}")  # {'greeting': 'Hello, Stabilize!'}

Built-in Tasks

Stabilize includes ready-to-use tasks for common operations:

ShellTask - Execute Shell Commands

from stabilize.tasks.shell import ShellTask

registry.register("shell", ShellTask)

# Use in stage context
context = {
    "command": "npm install && npm test",
    "cwd": "/app",
    "timeout": 300,
    "env": {"NODE_ENV": "test"},
}

HTTPTask - HTTP/API Requests

from stabilize.tasks.http import HTTPTask

registry.register("http", HTTPTask)

# GET with JSON parsing
context = {"url": "https://api.example.com/data", "parse_json": True}

# POST with JSON body
context = {"url": "https://api.example.com/users", "method": "POST", "json": {"name": "John"}}

# With authentication
context = {"url": "https://api.example.com/private", "bearer_token": "token"}

# File upload
context = {"url": "https://api.example.com/upload", "method": "POST", "upload_file": "/path/to/file.pdf"}

See examples/ directory for complete examples.

Parallel Stages

Stages with shared dependencies run in parallel:

#     Setup
#    /     \
#  Test   Lint
#    \     /
#    Deploy

workflow = Workflow.create(
    application="my-app",
    name="CI/CD Pipeline",
    stages=[
        StageExecution(ref_id="setup", type="setup", name="Setup", ...),
        StageExecution(ref_id="test", type="test", name="Test",
                      requisite_stage_ref_ids={"setup"}, ...),
        StageExecution(ref_id="lint", type="lint", name="Lint",
                      requisite_stage_ref_ids={"setup"}, ...),
        StageExecution(ref_id="deploy", type="deploy", name="Deploy",
                      requisite_stage_ref_ids={"test", "lint"}, ...),
    ],
)

Database Setup

SQLite

No setup required. Schema is created automatically.

PostgreSQL

Apply migrations using the CLI:

# Using mg.yaml in current directory
stabilize mg-up

# Using database URL
stabilize mg-up --db-url postgres://user:pass@host:5432/dbname

# Using environment variable
MG_DATABASE_URL=postgres://user:pass@host:5432/dbname stabilize mg-up

# Check migration status
stabilize mg-status

Example mg.yaml:

database:
  host: localhost
  port: 5432
  user: postgres
  password: postgres
  dbname: stabilize

RAG-Powered Pipeline Generation

Stabilize includes an AI-powered assistant that generates pipeline code from natural language descriptions using RAG (Retrieval-Augmented Generation).

Requirements

pip install stabilize[rag]  # Installs ragit dependency

You also need:

  • Local Ollama (required for embeddings - ollama.com doesn't support embeddings API):
    # Install from https://ollama.com/download, then:
    ollama serve                    # Start the server
    ollama pull nomic-embed-text    # Download embedding model
    
  • Ollama API key for LLM generation (uses ollama.com cloud)

Setup

Create a .env file with your API key:

OLLAMA_API_KEY=your_api_key_here

Initialize the embedding cache:

# Initialize with default context (Stabilize docs + examples)
stabilize rag init

# Include your own code as additional training context
stabilize rag init --additional-context /path/to/your/code/

# Force regenerate embeddings
stabilize rag init --force

Generate Pipelines

# Generate a pipeline from natural language
stabilize rag generate "create a pipeline that processes CSV files in parallel"

# Save to file
stabilize rag generate "build a CI/CD pipeline with test and deploy stages" > my_pipeline.py

Clear Cache

stabilize rag clear

Configuration

Environment variables:

Variable Default Description
OLLAMA_API_KEY (required) API key for ollama.com
OLLAMA_BASE_URL https://ollama.com LLM endpoint URL
OLLAMA_EMBEDDING_URL http://localhost:11434 Local Ollama for embeddings

Example

$ stabilize rag generate "create a hello world pipeline"

from stabilize import Workflow, StageExecution, TaskExecution
from stabilize.tasks.interface import Task
from stabilize.tasks.result import TaskResult
...

CLI Reference

stabilize mg-up [--db-url URL]      Apply pending PostgreSQL migrations
stabilize mg-status [--db-url URL]  Show migration status
stabilize rag init [--force] [--additional-context PATH]  Initialize RAG embeddings
stabilize rag generate "prompt"     Generate pipeline from natural language
stabilize rag clear                 Clear embedding cache

Naming Alignment with highway_dsl

highway_dsl stabilize
Workflow Workflow
TaskOperator Task interface
RetryPolicy RetryableTask
TimeoutPolicy OverridableTimeoutRetryableTask

Running Tests

# All tests (requires Docker for PostgreSQL)
pytest tests/ -v

# SQLite tests only (no Docker)
pytest tests/ -v -k sqlite

License

Apache 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

stabilize-0.10.1.tar.gz (153.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

stabilize-0.10.1-py3-none-any.whl (169.7 kB view details)

Uploaded Python 3

File details

Details for the file stabilize-0.10.1.tar.gz.

File metadata

  • Download URL: stabilize-0.10.1.tar.gz
  • Upload date:
  • Size: 153.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.3

File hashes

Hashes for stabilize-0.10.1.tar.gz
Algorithm Hash digest
SHA256 5d3593e35d66af9ecf87e43519076ab16158a85bce147ffa16161278080525c2
MD5 ffcf9d5b6dae1bcc94627ceda0fcbb35
BLAKE2b-256 321883ce0e1651f39b08b22f11a3cec0c7aa151e39905682dfbc5bfdc53343c8

See more details on using hashes here.

File details

Details for the file stabilize-0.10.1-py3-none-any.whl.

File metadata

  • Download URL: stabilize-0.10.1-py3-none-any.whl
  • Upload date:
  • Size: 169.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.3

File hashes

Hashes for stabilize-0.10.1-py3-none-any.whl
Algorithm Hash digest
SHA256 948ec1d0e04ffa3f9d7a57aa060d798c85aea531f2158bdad14c9a15d6a54c7e
MD5 62c0582861d97fb947b3f6a7d6480c67
BLAKE2b-256 7233534c998a3c2345d4c18e20eb62326a88edeca0ffcdcf455a1f6d3809c1f5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page