Highway Workflow Engine - Stabilize execution layer
Stabilize
A lightweight, full-featured Python workflow execution engine with DAG-based stage orchestration.
Requirements
- Python 3.11+
- SQLite (included) or PostgreSQL 12+
Installation
pip install stabilize # SQLite support only
pip install stabilize[postgres] # PostgreSQL support
pip install stabilize[all] # All features
Features
- Message-driven DAG execution engine
- Parallel and sequential stage execution
- Synthetic stages (before/after/onFailure)
- PostgreSQL and SQLite persistence
- Pluggable task system
- Retry and timeout support
Quick Start
from stabilize import (
    Workflow, StageExecution, TaskExecution,
    SqliteWorkflowStore, SqliteQueue, QueueProcessor, Orchestrator,
    Task, TaskResult, TaskRegistry,
    # All 11 handlers are required
    StartWorkflowHandler, StartWaitingWorkflowsHandler, StartStageHandler,
    SkipStageHandler, CancelStageHandler, ContinueParentStageHandler,
    StartTaskHandler, RunTaskHandler, CompleteTaskHandler,
    CompleteStageHandler, CompleteWorkflowHandler,
)
# Define a custom task
class HelloTask(Task):
    def execute(self, stage: StageExecution) -> TaskResult:
        name = stage.context.get("name", "World")
        return TaskResult.success(outputs={"greeting": f"Hello, {name}!"})
# Create a workflow
workflow = Workflow.create(
    application="my-app",
    name="Hello Workflow",
    stages=[
        StageExecution(
            ref_id="1",
            type="hello",
            name="Say Hello",
            tasks=[
                TaskExecution.create(
                    name="Hello Task",
                    implementing_class="hello",
                    stage_start=True,
                    stage_end=True,
                ),
            ],
            context={"name": "Stabilize"},
        ),
    ],
)
# Setup persistence and queue
store = SqliteWorkflowStore("sqlite:///:memory:", create_tables=True)
queue = SqliteQueue("sqlite:///:memory:")
queue._create_table()
# Register tasks
registry = TaskRegistry()
registry.register("hello", HelloTask)
# Create processor and register handlers
processor = QueueProcessor(queue)
for handler in [
    StartWorkflowHandler(queue, store),
    StartWaitingWorkflowsHandler(queue, store),
    StartStageHandler(queue, store),
    SkipStageHandler(queue, store),
    CancelStageHandler(queue, store),
    ContinueParentStageHandler(queue, store),
    StartTaskHandler(queue, store, registry),
    RunTaskHandler(queue, store, registry),
    CompleteTaskHandler(queue, store),
    CompleteStageHandler(queue, store),
    CompleteWorkflowHandler(queue, store),
]:
    processor.register_handler(handler)
orchestrator = Orchestrator(queue)
# Run workflow
store.store(workflow)
orchestrator.start(workflow)
processor.process_all(timeout=10.0)
# Check result
result = store.retrieve(workflow.id)
print(f"Status: {result.status}") # WorkflowStatus.SUCCEEDED
print(f"Output: {result.stages[0].outputs}") # {'greeting': 'Hello, Stabilize!'}
Built-in Tasks
Stabilize includes ready-to-use tasks for common operations:
ShellTask - Execute Shell Commands
from stabilize import ShellTask
registry.register("shell", ShellTask)
# Use in stage context
context = {
    "command": "npm install && npm test",
    "cwd": "/app",
    "timeout": 300,
    "env": {"NODE_ENV": "test"},
}
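To make the context keys above concrete, here is a minimal sketch of how a task could act on them using only the standard library. It is not ShellTask's actual implementation: the helper name `run_shell_context` and the shape of the returned dictionary are assumptions made for illustration, and a POSIX-style shell is assumed.

```python
import os
import subprocess


def run_shell_context(context: dict) -> dict:
    """Run context["command"] in a shell and report exit code and output.

    Hypothetical helper: it mirrors the context keys shown above and is
    not part of Stabilize.
    """
    # Extend the current environment rather than replacing it, so PATH survives.
    env = {**os.environ, **context.get("env", {})}
    completed = subprocess.run(
        context["command"],
        shell=True,                      # "&&" chains need a shell
        cwd=context.get("cwd"),          # None means the current directory
        env=env,
        timeout=context.get("timeout"),  # seconds; raises TimeoutExpired
        capture_output=True,
        text=True,
    )
    return {
        "returncode": completed.returncode,
        "stdout": completed.stdout,
        "stderr": completed.stderr,
    }
```

A non-zero `returncode` is returned rather than raised here, which leaves the decision about failing the stage to the caller.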
HTTPTask - HTTP/API Requests
from stabilize import HTTPTask
registry.register("http", HTTPTask)
# GET with JSON parsing
context = {"url": "https://api.example.com/data", "parse_json": True}
# POST with JSON body
context = {"url": "https://api.example.com/users", "method": "POST", "json": {"name": "John"}}
# With authentication
context = {"url": "https://api.example.com/private", "bearer_token": "token"}
# File upload
context = {"url": "https://api.example.com/upload", "method": "POST", "upload_file": "/path/to/file.pdf"}
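As a rough illustration of what these context keys imply, the sketch below translates a context into a standard-library `urllib.request.Request` without sending it. This is not HTTPTask's implementation: the function name `build_request` is hypothetical, and mapping `bearer_token` to an `Authorization: Bearer ...` header is an assumption based on common practice. The `parse_json` and `upload_file` keys are left out to keep the sketch short.

```python
import json
import urllib.request


def build_request(context: dict) -> urllib.request.Request:
    """Translate an HTTPTask-style context into a urllib Request (not sent).

    Hypothetical helper, not part of Stabilize.
    """
    headers = {}
    body = None
    if "json" in context:
        # Serialize the payload and declare its content type.
        body = json.dumps(context["json"]).encode("utf-8")
        headers["Content-Type"] = "application/json"
    if "bearer_token" in context:
        # Assumed mapping: bearer tokens travel in the Authorization header.
        headers["Authorization"] = f"Bearer {context['bearer_token']}"
    return urllib.request.Request(
        context["url"],
        data=body,
        headers=headers,
        method=context.get("method", "GET"),  # GET when no method is given
    )
```

Passing the result to `urllib.request.urlopen` would perform the call; building the request separately keeps the mapping easy to inspect.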
See the examples/ directory for complete examples.
Parallel Stages
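Stages that share the same dependencies run in parallel once those dependencies complete. Below, Test and Lint both depend only on Setup, and Deploy waits for both: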
#      Setup
#      /   \
#   Test   Lint
#      \   /
#      Deploy
workflow = Workflow.create(
    application="my-app",
    name="CI/CD Pipeline",
    stages=[
        StageExecution(ref_id="setup", type="setup", name="Setup", ...),
        StageExecution(ref_id="test", type="test", name="Test",
                       requisite_stage_ref_ids={"setup"}, ...),
        StageExecution(ref_id="lint", type="lint", name="Lint",
                       requisite_stage_ref_ids={"setup"}, ...),
        StageExecution(ref_id="deploy", type="deploy", name="Deploy",
                       requisite_stage_ref_ids={"test", "lint"}, ...),
    ],
)
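To see which stages of this graph can overlap, the dependency sets can be fed to the standard library's `graphlib.TopologicalSorter`. This is only a sketch of the idea, not how Stabilize schedules work: the function name `parallel_waves` is hypothetical, and grouping stages into discrete waves is a simplification, since a message-driven engine can start a stage as soon as its own requisites finish.

```python
from graphlib import TopologicalSorter

# ref_id -> requisite_stage_ref_ids, copied from the workflow above
requisites = {
    "setup": set(),
    "test": {"setup"},
    "lint": {"setup"},
    "deploy": {"test", "lint"},
}


def parallel_waves(requisites: dict[str, set[str]]) -> list[list[str]]:
    """Group stages into waves; stages in one wave have no unmet requisites."""
    sorter = TopologicalSorter(requisites)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave finished
    return waves


print(parallel_waves(requisites))
# [['setup'], ['lint', 'test'], ['deploy']]
```

The middle wave contains both `lint` and `test`, which matches the diamond shape of the diagram.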
Database Setup
SQLite
No setup required. Schema is created automatically.
PostgreSQL
Apply migrations using the CLI:
# Using mg.yaml in current directory
stabilize mg-up
# Using database URL
stabilize mg-up --db-url postgres://user:pass@host:5432/dbname
# Using environment variable
MG_DATABASE_URL=postgres://user:pass@host:5432/dbname stabilize mg-up
# Check migration status
stabilize mg-status
Example mg.yaml:
database:
  host: localhost
  port: 5432
  user: postgres
  password: postgres
  dbname: stabilize
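The same connection details can be written as a URL for `--db-url` or `MG_DATABASE_URL`. The sketch below assembles one from the fields of the `database` section; it assumes the two forms are interchangeable, which the commands above suggest but do not state. The function name `database_url` is hypothetical, and a plain dictionary stands in for the parsed YAML to avoid a third-party parser.

```python
from urllib.parse import quote


def database_url(cfg: dict) -> str:
    """Build a postgres:// URL from the fields of the mg.yaml database section.

    Hypothetical helper, not part of Stabilize.
    """
    # Percent-encode credentials so characters such as "@" or "/" stay valid.
    user = quote(str(cfg["user"]), safe="")
    password = quote(str(cfg["password"]), safe="")
    return f"postgres://{user}:{password}@{cfg['host']}:{cfg['port']}/{cfg['dbname']}"


config = {
    "host": "localhost",
    "port": 5432,
    "user": "postgres",
    "password": "postgres",
    "dbname": "stabilize",
}
print(database_url(config))
# postgres://postgres:postgres@localhost:5432/stabilize
```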
CLI Reference
stabilize mg-up [--db-url URL]      Apply pending PostgreSQL migrations
stabilize mg-status [--db-url URL]  Show migration status
stabilize monitor [--db-url URL]    Real-time workflow monitoring dashboard
stabilize prompt                    Output documentation for pipeline code generation
Running Tests
# All tests (requires Docker for PostgreSQL)
pytest tests/ -v
# SQLite tests only (no Docker)
pytest tests/ -v -k sqlite
License
Apache 2.0
File details
Details for the file stabilize-0.13.5.tar.gz.
File metadata
- Download URL: stabilize-0.13.5.tar.gz
- Upload date:
- Size: 237.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | da55873b1445627d86f1e0d8cbb6136ac9f458a25ac549cee20b0f1c73b7d13f |
| MD5 | fdbba8a596937597be116f16385f9a89 |
| BLAKE2b-256 | 3e6ab58d1f4f0112608a759e4d0b1ea4b763f3c1c60b01ff646f9f027f764c59 |
File details
Details for the file stabilize-0.13.5-py3-none-any.whl.
File metadata
- Download URL: stabilize-0.13.5-py3-none-any.whl
- Upload date:
- Size: 230.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2973f9eeb03b6ab47285920e5fde3caf3b6bdf9f10a3f1acf62a6a969f7df8fd |
| MD5 | 80ec16a9e1687084ff64607e159b440c |
| BLAKE2b-256 | d84c7841ede3474a613a2d921ed05d1ebc51a6eab5276f109b8713da6e01ccd5 |