Highway Workflow Engine - Stabilize execution layer

Stabilize

A lightweight, full-featured Python workflow execution engine with DAG-based stage orchestration.

Requirements

  • Python 3.11+
  • SQLite (included) or PostgreSQL 12+

Installation

pip install stabilize              # SQLite support only
pip install "stabilize[postgres]"  # PostgreSQL support (quoted so shells like zsh do not expand the brackets)
pip install "stabilize[all]"       # All features

Features

  • Message-driven DAG execution engine
  • Parallel and sequential stage execution
  • Synthetic stages (before/after/onFailure)
  • PostgreSQL and SQLite persistence
  • Pluggable task system
  • Retry and timeout support

Quick Start

from stabilize import (
    Workflow, StageExecution, TaskExecution,
    SqliteWorkflowStore, SqliteQueue, QueueProcessor, Orchestrator,
    Task, TaskResult, TaskRegistry,
    # All 11 handlers are required
    StartWorkflowHandler, StartWaitingWorkflowsHandler, StartStageHandler,
    SkipStageHandler, CancelStageHandler, ContinueParentStageHandler,
    StartTaskHandler, RunTaskHandler, CompleteTaskHandler,
    CompleteStageHandler, CompleteWorkflowHandler,
)

# Define a custom task
class HelloTask(Task):
    def execute(self, stage: StageExecution) -> TaskResult:
        name = stage.context.get("name", "World")
        return TaskResult.success(outputs={"greeting": f"Hello, {name}!"})

# Create a workflow
workflow = Workflow.create(
    application="my-app",
    name="Hello Workflow",
    stages=[
        StageExecution(
            ref_id="1",
            type="hello",
            name="Say Hello",
            tasks=[
                TaskExecution.create(
                    name="Hello Task",
                    implementing_class="hello",
                    stage_start=True,
                    stage_end=True,
                ),
            ],
            context={"name": "Stabilize"},
        ),
    ],
)

# Setup persistence and queue
store = SqliteWorkflowStore("sqlite:///:memory:", create_tables=True)
queue = SqliteQueue("sqlite:///:memory:")
queue._create_table()

# Register tasks
registry = TaskRegistry()
registry.register("hello", HelloTask)

# Create processor and register handlers
processor = QueueProcessor(queue)
for handler in [
    StartWorkflowHandler(queue, store),
    StartWaitingWorkflowsHandler(queue, store),
    StartStageHandler(queue, store),
    SkipStageHandler(queue, store),
    CancelStageHandler(queue, store),
    ContinueParentStageHandler(queue, store),
    StartTaskHandler(queue, store, registry),
    RunTaskHandler(queue, store, registry),
    CompleteTaskHandler(queue, store),
    CompleteStageHandler(queue, store),
    CompleteWorkflowHandler(queue, store),
]:
    processor.register_handler(handler)

orchestrator = Orchestrator(queue)

# Run workflow
store.store(workflow)
orchestrator.start(workflow)
processor.process_all(timeout=10.0)

# Check result
result = store.retrieve(workflow.id)
print(f"Status: {result.status}")  # WorkflowStatus.SUCCEEDED
print(f"Output: {result.stages[0].outputs}")  # {'greeting': 'Hello, Stabilize!'}

Built-in Tasks

Stabilize includes ready-to-use tasks for common operations:

ShellTask - Execute Shell Commands

from stabilize import ShellTask

registry.register("shell", ShellTask)

# Use in stage context
context = {
    "command": "npm install && npm test",
    "cwd": "/app",
    "timeout": 300,
    "env": {"NODE_ENV": "test"},
}
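
To make the context keys above more concrete, here is a minimal sketch of how such a context could map onto a plain `subprocess` call. It is an illustration only: `run_shell_context` is a hypothetical helper, not part of Stabilize, and the real ShellTask may treat these keys differently.

```python
import os
import subprocess


# Hypothetical helper, NOT Stabilize's implementation: it only shows one
# plausible reading of the "command", "cwd", "timeout" and "env" keys.
def run_shell_context(context: dict) -> dict:
    # Layer the extra variables over the current environment so PATH survives.
    env = {**os.environ, **context.get("env", {})}
    completed = subprocess.run(
        context["command"],
        shell=True,                      # the command string uses shell syntax such as &&
        cwd=context.get("cwd"),          # None means "use the current directory"
        timeout=context.get("timeout"),  # seconds; TimeoutExpired is raised when exceeded
        env=env,
        capture_output=True,
        text=True,
    )
    return {
        "returncode": completed.returncode,
        "stdout": completed.stdout,
        "stderr": completed.stderr,
    }


shell_result = run_shell_context({
    "command": "echo $GREETING && echo done",
    "timeout": 30,
    "env": {"GREETING": "hello"},
})
print(shell_result["returncode"], shell_result["stdout"])
```

The example command assumes a POSIX shell; on Windows the variable syntax would differ.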

HTTPTask - HTTP/API Requests

from stabilize import HTTPTask

registry.register("http", HTTPTask)

# GET with JSON parsing
context = {"url": "https://api.example.com/data", "parse_json": True}

# POST with JSON body
context = {"url": "https://api.example.com/users", "method": "POST", "json": {"name": "John"}}

# With authentication
context = {"url": "https://api.example.com/private", "bearer_token": "token"}

# File upload
context = {"url": "https://api.example.com/upload", "method": "POST", "upload_file": "/path/to/file.pdf"}
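
As a rough guide to what these context keys express, the sketch below builds an equivalent request with the standard library. `build_request` is a hypothetical helper, not part of Stabilize; the `GET` default and the header names are assumptions about a typical mapping, and nothing is sent over the network.

```python
import json
import urllib.request


# Hypothetical helper, NOT Stabilize's implementation: translates the
# "url", "method", "json" and "bearer_token" keys into a urllib request.
def build_request(context: dict) -> urllib.request.Request:
    headers = {}
    data = None
    if "json" in context:
        # Serialize the body and declare its type.
        data = json.dumps(context["json"]).encode("utf-8")
        headers["Content-Type"] = "application/json"
    if "bearer_token" in context:
        headers["Authorization"] = f"Bearer {context['bearer_token']}"
    return urllib.request.Request(
        context["url"],
        data=data,
        headers=headers,
        method=context.get("method", "GET"),  # assumed default
    )


# Build the request only; calling urllib.request.urlopen(req) would send it.
req = build_request({
    "url": "https://api.example.com/users",
    "method": "POST",
    "json": {"name": "John"},
    "bearer_token": "token",
})
print(req.get_method(), req.full_url)
```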

See the examples/ directory for complete examples.

Parallel Stages

Stages that depend on the same upstream stage run in parallel once it completes; a stage with several dependencies waits for all of them:

#     Setup
#    /     \
#  Test   Lint
#    \     /
#    Deploy

workflow = Workflow.create(
    application="my-app",
    name="CI/CD Pipeline",
    stages=[
        StageExecution(ref_id="setup", type="setup", name="Setup", ...),
        StageExecution(ref_id="test", type="test", name="Test",
                      requisite_stage_ref_ids={"setup"}, ...),
        StageExecution(ref_id="lint", type="lint", name="Lint",
                      requisite_stage_ref_ids={"setup"}, ...),
        StageExecution(ref_id="deploy", type="deploy", name="Deploy",
                      requisite_stage_ref_ids={"test", "lint"}, ...),
    ],
)
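
The ordering implied by `requisite_stage_ref_ids` can be checked independently of Stabilize. The sketch below uses Python's standard `graphlib` module to group the ref IDs from the example into waves of stages that could run at the same time. It illustrates the DAG idea only and does not describe how the engine schedules stages internally; `execution_waves` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# ref_id -> set of ref_ids it depends on, mirroring requisite_stage_ref_ids above.
dependencies = {
    "setup": set(),
    "test": {"setup"},
    "lint": {"setup"},
    "deploy": {"test", "lint"},
}


def execution_waves(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group stages into waves; every stage in a wave has all its dependencies done."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to make the output stable
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(execution_waves(dependencies))
# [['setup'], ['lint', 'test'], ['deploy']]
```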

Database Setup

SQLite

No setup required; the schema is created automatically.

PostgreSQL

Apply migrations using the CLI:

# Using mg.yaml in current directory
stabilize mg-up

# Using database URL
stabilize mg-up --db-url postgres://user:pass@host:5432/dbname

# Using environment variable
MG_DATABASE_URL=postgres://user:pass@host:5432/dbname stabilize mg-up

# Check migration status
stabilize mg-status

Example mg.yaml:

database:
  host: localhost
  port: 5432
  user: postgres
  password: postgres
  dbname: stabilize
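
The fields in mg.yaml line up with the parts of the URL form shown above (`postgres://user:pass@host:5432/dbname`). As a sketch of that correspondence, assuming the two forms are meant to be equivalent, the hypothetical helper below assembles a URL from the same fields. The config is written as a Python dict to avoid a YAML dependency.

```python
from urllib.parse import quote

# The mg.yaml example above, expressed as a plain dict.
config = {
    "database": {
        "host": "localhost",
        "port": 5432,
        "user": "postgres",
        "password": "postgres",
        "dbname": "stabilize",
    }
}


# Hypothetical helper, not part of Stabilize: builds a --db-url style string.
def database_url(cfg: dict) -> str:
    db = cfg["database"]
    # Percent-encode credentials so characters such as "@" or ":" stay unambiguous.
    user = quote(str(db["user"]), safe="")
    password = quote(str(db["password"]), safe="")
    return f"postgres://{user}:{password}@{db['host']}:{db['port']}/{db['dbname']}"


print(database_url(config))
# postgres://postgres:postgres@localhost:5432/stabilize
```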

CLI Reference

stabilize mg-up [--db-url URL]      Apply pending PostgreSQL migrations
stabilize mg-status [--db-url URL]  Show migration status
stabilize monitor [--db-url URL]    Real-time workflow monitoring dashboard
stabilize prompt                    Output documentation for pipeline code generation

Running Tests

# All tests (requires Docker for PostgreSQL)
pytest tests/ -v

# SQLite tests only (no Docker)
pytest tests/ -v -k sqlite

License

Apache 2.0
