Highway Workflow Engine - Stabilize execution layer
Project description
Stabilize
A lightweight full featured Python workflow execution engine with DAG-based stage orchestration.
Requirements
- Python 3.11+
- SQLite (included) or PostgreSQL 12+
Installation
pip install stabilize # SQLite support only
pip install stabilize[postgres] # PostgreSQL support
pip install stabilize[all] # All features
Features
- Message-driven DAG execution engine
- Parallel and sequential stage execution
- Synthetic stages (before/after/onFailure)
- PostgreSQL and SQLite persistence
- Pluggable task system
- Retry and timeout support
Comparison to Industry Standards
┌───────────────┬────────────────────┬──────────────────────┬───────────────────┐
│ Feature │ stabilize │ Spinnaker (Orca) │ Airflow │
├───────────────┼────────────────────┼──────────────────────┼───────────────────┤
│ State Storage │ Atomic (DB+Queue) │ Atomic (Redis/SQL) │ Atomic (SQL) │
│ Concurrency │ Optimistic Locking │ Distributed Lock │ Database Row Lock │
│ Resilience │ Queue-based (DLQ) │ Queue-based (DLQ) │ Scheduler Loop │
│ Flow Control │ Dynamic (Jumps) │ Rigid DAG │ Rigid DAG │
│ Complexity │ Low (Library) │ High (Microservices) │ High (Platform) │
└───────────────┴────────────────────┴──────────────────────┴───────────────────┘
- If you are looking for a strictly atomic and highly distributed system, please take a look into Highway.
Quick Start
from stabilize import (
Workflow, StageExecution, TaskExecution,
SqliteWorkflowStore, SqliteQueue, QueueProcessor, Orchestrator,
Task, TaskResult, TaskRegistry,
# All 12 handlers are required
StartWorkflowHandler, StartWaitingWorkflowsHandler, StartStageHandler,
SkipStageHandler, CancelStageHandler, ContinueParentStageHandler,
JumpToStageHandler, StartTaskHandler, RunTaskHandler, CompleteTaskHandler,
CompleteStageHandler, CompleteWorkflowHandler,
)
# Define a custom task
class HelloTask(Task):
def execute(self, stage: StageExecution) -> TaskResult:
name = stage.context.get("name", "World")
return TaskResult.success(outputs={"greeting": f"Hello, {name}!"})
# Create a workflow
workflow = Workflow.create(
application="my-app",
name="Hello Workflow",
stages=[
StageExecution(
ref_id="1",
type="hello",
name="Say Hello",
tasks=[
TaskExecution.create(
name="Hello Task",
implementing_class="hello",
stage_start=True,
stage_end=True,
),
],
context={"name": "Stabilize"},
),
],
)
# Setup persistence and queue
store = SqliteWorkflowStore("sqlite:///:memory:", create_tables=True)
queue = SqliteQueue("sqlite:///:memory:")
queue._create_table()
# Register tasks
registry = TaskRegistry()
registry.register("hello", HelloTask)
# Create processor and register handlers
processor = QueueProcessor(queue)
for handler in [
StartWorkflowHandler(queue, store),
StartWaitingWorkflowsHandler(queue, store),
StartStageHandler(queue, store),
SkipStageHandler(queue, store),
CancelStageHandler(queue, store),
ContinueParentStageHandler(queue, store),
JumpToStageHandler(queue, store),
StartTaskHandler(queue, store, registry),
RunTaskHandler(queue, store, registry),
CompleteTaskHandler(queue, store),
CompleteStageHandler(queue, store),
CompleteWorkflowHandler(queue, store),
]:
processor.register_handler(handler)
orchestrator = Orchestrator(queue)
# Run workflow
store.store(workflow)
orchestrator.start(workflow)
processor.process_all(timeout=10.0)
# Check result
result = store.retrieve(workflow.id)
print(f"Status: {result.status}") # WorkflowStatus.SUCCEEDED
print(f"Output: {result.stages[0].outputs}") # {'greeting': 'Hello, Stabilize!'}
Built-in Tasks
Stabilize includes ready-to-use tasks for common operations:
ShellTask - Execute Shell Commands
from stabilize import ShellTask
registry.register("shell", ShellTask)
# Use in stage context
context = {
"command": "npm install && npm test",
"cwd": "/app",
"timeout": 300,
"env": {"NODE_ENV": "test"},
}
HTTPTask - HTTP/API Requests
from stabilize import HTTPTask
registry.register("http", HTTPTask)
# GET with JSON parsing
context = {"url": "https://api.example.com/data", "parse_json": True}
# POST with JSON body
context = {"url": "https://api.example.com/users", "method": "POST", "json": {"name": "John"}}
# With authentication
context = {"url": "https://api.example.com/private", "bearer_token": "token"}
# File upload
context = {"url": "https://api.example.com/upload", "method": "POST", "upload_file": "/path/to/file.pdf"}
See examples/ directory for complete examples.
Parallel Stages
Stages with shared dependencies run in parallel:
# Setup
# / \
# Test Lint
# \ /
# Deploy
workflow = Workflow.create(
application="my-app",
name="CI/CD Pipeline",
stages=[
StageExecution(ref_id="setup", type="setup", name="Setup", ...),
StageExecution(ref_id="test", type="test", name="Test",
requisite_stage_ref_ids={"setup"}, ...),
StageExecution(ref_id="lint", type="lint", name="Lint",
requisite_stage_ref_ids={"setup"}, ...),
StageExecution(ref_id="deploy", type="deploy", name="Deploy",
requisite_stage_ref_ids={"test", "lint"}, ...),
],
)
Dynamic Routing
Stabilize supports dynamic flow control with TaskResult.jump_to() for conditional branching and retry loops:
from stabilize import Task, TaskResult, TransientError
class RouterTask(Task):
"""Route to different stages based on conditions."""
def execute(self, stage: StageExecution) -> TaskResult:
if stage.context.get("tests_passed"):
return TaskResult.success()
else:
# Jump to another stage with context
return TaskResult.jump_to(
"retry_stage",
context={"retry_reason": "tests failed"}
)
Stateful Retries
Preserve progress across transient error retries with context_update:
class ProgressTask(Task):
def execute(self, stage: StageExecution) -> TaskResult:
processed = stage.context.get("processed_items", 0)
try:
# Process next batch
new_processed = process_batch(processed)
return TaskResult.success(outputs={"total": new_processed})
except RateLimitError:
# Preserve progress for next retry
raise TransientError(
"Rate limited",
retry_after=30,
context_update={"processed_items": processed + 10}
)
The context_update is merged into the stage context before the retry, allowing tasks to resume from where they left off.
Database Setup
SQLite
No setup required. Schema is created automatically.
PostgreSQL
Apply migrations using the CLI:
# Using mg.yaml in current directory
stabilize mg-up
# Using database URL
stabilize mg-up --db-url postgres://user:pass@host:5432/dbname
# Using environment variable
MG_DATABASE_URL=postgres://user:pass@host:5432/dbname stabilize mg-up
# Check migration status
stabilize mg-status
Example mg.yaml:
database:
host: localhost
port: 5432
user: postgres
password: postgres
dbname: stabilize
CLI Reference
stabilize mg-up [--db-url URL] Apply pending PostgreSQL migrations
stabilize mg-status [--db-url URL] Show migration status
stabilize monitor [--db-url URL] Real-time workflow monitoring dashboard
stabilize prompt Output documentation for pipeline code generation
Running Tests
# All tests (requires Docker for PostgreSQL)
pytest tests/ -v
# SQLite tests only (no Docker)
pytest tests/ -v -k sqlite
License
Apache 2.0
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file stabilize-0.15.4.tar.gz.
File metadata
- Download URL: stabilize-0.15.4.tar.gz
- Upload date:
- Size: 216.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a91305b2e09ec63ee6b6856344abd2ebc7afda12ad3f682b7c9736e54b7331c2
|
|
| MD5 |
cfa824ecbe301ebf79e011642e5996e3
|
|
| BLAKE2b-256 |
662377695ee08d72937fa1c3b85da611dec8f7f45bd4c9817de7d0ecb2622d72
|
File details
Details for the file stabilize-0.15.4-py3-none-any.whl.
File metadata
- Download URL: stabilize-0.15.4-py3-none-any.whl
- Upload date:
- Size: 303.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
fdec5e4383054087b0741a742701becae3e0ad6542ceda1b67ad5a3c7a3ef037
|
|
| MD5 |
6e00bcea210fbf7d7741341777238a4d
|
|
| BLAKE2b-256 |
7a49c5b71681fd8384847f47224a3662221670bb06693ff9e97dc679b22c6670
|