A lightweight, event-driven state machine framework with WebSocket support and plugin architecture
State Machine Engine
Event-driven state machine framework with real-time monitoring and database-backed job queue.
Features
- YAML-Based Configuration: Define workflows declaratively
- Pluggable Actions: Extensible action system with built-in actions
- Real-Time Monitoring: WebSocket server for live state visualization
- Kanban View: Visual board for monitoring multiple FSM instances with state groups
- Database-Backed Queue: SQLite-based persistent job queue
- Unix Socket Communication: Low-latency inter-machine events
- Multi-Machine Coordination: Event-driven machine-to-machine communication
- Multiple Engine Support: Run multiple engines simultaneously with configurable socket paths
- Event Context Promotion: Declare `context_map` on events to promote payload fields to durable context keys
Installation
From Source (Development)
# Clone the repository
git clone https://github.com/sheikkinen/statemachine-engine.git
cd statemachine-engine
# Create virtual environment (recommended)
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Option 1: Install with pip (uses pyproject.toml)
pip install -e ".[dev]"
# Option 2: Install with requirements files
pip install -r requirements-dev.txt
From PyPI (Coming Soon)
pip install statemachine-engine
Dependencies
The package requires Python 3.10+ and automatically installs:
- PyYAML (YAML configuration parsing)
- FastAPI (WebSocket server)
- Uvicorn (ASGI server)
- websockets (WebSocket protocol)
Development dependencies (optional):
- pytest (testing framework)
- pytest-asyncio (async test support)
Quick Start
1. Try the Included Examples
The package includes working example configurations:
# Simple worker example
cd examples/simple_worker
statemachine config/worker.yaml --machine-name worker
# Controller/worker multi-machine example
cd examples/controller_worker
./run.sh
See examples/ directory for complete working configurations.
2. Create Your Own Configuration
# my_worker.yaml
name: "My Worker"
initial_state: waiting
transitions:
- from: waiting
to: processing
event: new_job
actions:
- type: bash
params:
command: "echo Processing job"
success: job_done
3. Run Your State Machine
statemachine my_worker.yaml --machine-name my_worker
Or using Python directly:
from statemachine_engine.core.engine import StateMachineEngine
engine = StateMachineEngine(machine_name='my_worker')
await engine.load_config('my_worker.yaml')
await engine.execute_state_machine()
Starting Services
The statemachine-engine system consists of several components that work together. Here's how to start each service:
Complete System Startup (Recommended)
For development and testing, use the integrated startup script:
# Start everything at once
./scripts/start-system.sh
# This automatically starts:
# - WebSocket monitoring server
# - Web UI (if Node.js is available)
# - Example state machines
# - Generates FSM diagrams
Individual Service Startup
For production or custom setups, start services individually:
1. State Machine (Core Service)
# Basic usage
statemachine config/worker.yaml --machine-name my_worker
# With debug logging
statemachine config/worker.yaml --machine-name my_worker --debug
# Multiple machines (run in separate terminals)
statemachine config/controller.yaml --machine-name controller
statemachine config/worker.yaml --machine-name worker
2. Web UI Server (Visualization)
# Start web UI with current project
statemachine-ui
# Start with custom project root
statemachine-ui --project-root /path/to/your/project
# Start on custom port
statemachine-ui --port 8080
# Skip WebSocket server (if already running)
statemachine-ui --no-websocket
# Start with custom WebSocket configuration (NEW in v1.0.63)
statemachine-ui \
--port 3001 \
--websocket-port 3002 \
--event-socket-path /tmp/custom-events.sock
Enhanced UI Options (v1.0.63+):
statemachine-ui [options]
Options:
--port PORT Port for the web server (default: 3001)
--project-root PROJECT_ROOT Project root directory (default: current directory)
--no-websocket Skip starting the WebSocket server
--event-socket-path PATH Custom event socket path for WebSocket server
--websocket-port PORT Custom port for WebSocket server (default: 3002)
Access at: http://localhost:3001
3. WebSocket Server (Real-time Monitoring)
# Start WebSocket server
python -m statemachine_engine.monitoring.websocket_server
# Custom port
python -m statemachine_engine.monitoring.websocket_server --port 8765
Endpoints:
- WebSocket: `ws://localhost:8765/ws`
- Health check: `http://localhost:8765/health`
Multiple Engines Support
As of version 1.0.63, you can run multiple state machine engines simultaneously with configurable socket paths and ports.
Configuration Options
State Machine CLI:
statemachine config.yaml [options]
Options:
--event-socket-path PATH Custom event socket path (default: /tmp/statemachine-events.sock)
--control-socket-prefix PATH Custom control socket prefix (default: /tmp/statemachine-control)
WebSocket Server:
python -m statemachine_engine.monitoring.websocket_server [options]
Options:
--host HOST Host to bind to (default: 127.0.0.1)
--port PORT Port to bind to (default: 3002)
--event-socket-path PATH Path to event socket (default: /tmp/statemachine-events.sock)
Example: Running Multiple Engines
# Terminal 1 - Engine 1
statemachine engine1_config.yaml \
--machine-name engine1 \
--event-socket-path /tmp/engine1-events.sock \
--control-socket-prefix /tmp/engine1-control
# Terminal 2 - Engine 2
statemachine engine2_config.yaml \
--machine-name engine2 \
--event-socket-path /tmp/engine2-events.sock \
--control-socket-prefix /tmp/engine2-control
# Terminal 3 - Monitor Engine 1 (Option 1: Separate WebSocket server)
python -m statemachine_engine.monitoring.websocket_server \
--port 3002 \
--event-socket-path /tmp/engine1-events.sock
# Terminal 4 - Monitor Engine 2 (Option 1: Separate WebSocket server)
python -m statemachine_engine.monitoring.websocket_server \
--port 3003 \
--event-socket-path /tmp/engine2-events.sock
# OR
# Terminal 3 - Monitor Engine 1 (Option 2: Enhanced UI with integrated WebSocket)
statemachine-ui \
--port 3001 \
--websocket-port 3002 \
--event-socket-path /tmp/engine1-events.sock
# Terminal 4 - Monitor Engine 2 (Option 2: Enhanced UI with integrated WebSocket)
statemachine-ui \
--port 3004 \
--websocket-port 3003 \
--event-socket-path /tmp/engine2-events.sock
Web interfaces:
- Engine 1: http://localhost:3001 (UI) + http://localhost:3002 (WebSocket)
- Engine 2: http://localhost:3004 (UI) + http://localhost:3003 (WebSocket)
Backward compatibility: All default values remain unchanged, so existing scripts continue to work without modification.
4. Generate Diagrams
# Generate diagrams for UI
statemachine-diagrams config/worker.yaml
# Or use the alias
statemachine-fsm config/worker.yaml
External Project Setup
If you're using statemachine-engine in your own project:
# In your project directory
cd /path/to/your/project
# 1. Generate diagrams for your config
statemachine-diagrams config/worker.yaml
# 2. Start UI with your project root
statemachine-ui --project-root $(pwd)
# 3. Start your state machine
statemachine config/worker.yaml --machine-name my_worker
# 4. Test with events
statemachine-db send-event --target my_worker --type new_job
Database Commands
⚠️ Breaking Change in v1.0.3: add-job Command
The add-job command has been redesigned to be fully generic. Update your scripts:
# OLD (v1.0.2 and earlier) - DEPRECATED
statemachine-db add-job job123 \
--type face_processing \
--input-image photo.jpg \
--prompt "enhance faces"
# NEW (v1.0.3+) - Use --payload for all custom data
statemachine-db add-job job123 \
--type face_processing \
--input-file photo.jpg \
--payload '{"prompt": "enhance faces"}'
Quick Migration:
- Remove: `--input-image`, `--prompt`, `--pony-prompt`, `--flux-prompt`, `--padding-factor`, `--mask-padding-factor`
- Use: `--input-file` (for file paths) and `--payload '{"key": "value"}'` (for all other data)
- `--type` now accepts any string (no hardcoded choices)
# Add jobs to the queue
statemachine-db add-job job_001 \
--type image_processing \
--payload '{"input": "image.jpg", "output": "result.png"}'
# Send events to trigger state transitions
statemachine-db send-event --target my_worker --type new_job
# Check machine states
statemachine-db machine-state
# List recent events
statemachine-db list-events --target my_worker --limit 10
# View job queue
statemachine-db list --status pending
# View specific job details
statemachine-db details <job-id>
Service Dependencies
Minimum Setup:
- State machine: `statemachine config.yaml --machine-name name`
With Monitoring:
- State machine + WebSocket server
- Access real-time events at `ws://localhost:8765/ws`
With Visualization:
- State machine + WebSocket server + Web UI
- Full visual interface at `http://localhost:3001`
Requirements:
- Python 3.10+ (required)
- Node.js (optional, for Web UI)
- npm (optional, for Web UI dependencies)
Troubleshooting
Web UI can't find diagrams:
# Ensure diagrams are generated in your project
statemachine-diagrams config/worker.yaml
# Start UI with correct project root
statemachine-ui --project-root $(pwd)
Port conflicts:
# Use custom ports
statemachine-ui --port 8080
python -m statemachine_engine.monitoring.websocket_server --port 9000
Missing dependencies:
# Install with all dependencies
pip install statemachine-engine[dev]
# Or install Node.js for Web UI
# macOS: brew install node
# Ubuntu: apt install nodejs npm
Timeout Events
Timeout events automatically fire after a specified duration if no other events occur. This is useful for watchdog timers, retry logic, polling intervals, and graceful degradation.
Syntax
Add timeout transitions using the special timeout(N) event syntax where N is the duration in seconds (supports decimals):
transitions:
- from: waiting
to: timed_out
event: timeout(5) # Fires after 5 seconds if still in 'waiting' state
- from: processing
to: timeout_error
event: timeout(30.5) # Fires after 30.5 seconds
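A minimal sketch of how such an event string could be parsed (illustrative only; `parse_timeout_event` is a hypothetical helper, not the engine's actual parser):

```python
import re

# timeout(N) where N is an integer or decimal number of seconds
TIMEOUT_RE = re.compile(r"^timeout\((\d+(?:\.\d+)?)\)$")

def parse_timeout_event(event: str):
    """Return the timeout duration in seconds, or None for ordinary events."""
    m = TIMEOUT_RE.match(event)
    return float(m.group(1)) if m else None
```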
How It Works
- State Entry: When entering a state with timeout transitions, the engine starts an asyncio timer task
- Timer Active: The timer counts down in the background
- Event Cancels: If ANY other event arrives, all timeout timers are cancelled
- Timeout Fires: If the timer completes, the timeout event is automatically processed
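The four steps above can be sketched with asyncio (an illustration of the mechanism under assumed names; `start_timeout_timer` and the `fire_event` callback are not the engine's actual API):

```python
import asyncio

async def start_timeout_timer(duration: float, fire_event):
    """Start a background task that delivers a timeout event after `duration` seconds.

    Cancelling the returned task (done whenever any other event arrives)
    suppresses the timeout.
    """
    async def timer():
        try:
            await asyncio.sleep(duration)
            await fire_event(f"timeout({duration:g})")
        except asyncio.CancelledError:
            pass  # another event won the race; timeout never fires

    return asyncio.create_task(timer())
```

Cancelling the task on every non-timeout event is what gives `timeout(N)` its "fires only if nothing else happens first" semantics.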
Example
# examples/timeout_demo/config/timeout_worker.yaml
transitions:
# Wait for work with 5-second timeout
- from: waiting
to: timed_out
event: timeout(5)
- from: waiting
to: processing
event: start_work # Cancels the timeout if received
# Process with 10-second timeout
- from: processing
to: timed_out
event: timeout(10)
- from: processing
to: completed
event: work_done # Cancels the timeout if received
# Retry after timeout
- from: timed_out
to: waiting
event: retry
actions:
timed_out:
- type: log
message: "⏰ TIMEOUT! Operation took too long"
level: warning
- type: bash
command: "sleep 3 && echo 'Retrying...'"
success: retry
Use Cases
- Watchdog timers: Ensure states don't hang indefinitely
- Retry logic: Retry failed operations after a delay
- Polling intervals: Periodically check for conditions
- Graceful degradation: Fall back to alternate paths when operations are slow
- Resource cleanup: Clean up stale resources after inactivity
- SLA enforcement: Ensure operations complete within time limits
Multiple Timeouts
You can have multiple timeout transitions from the same state, but only the shortest timeout will fire (as entering a new state cancels all active timeouts):
transitions:
- from: waiting
to: short_timeout_path
event: timeout(5) # Fires first after 5 seconds
- from: waiting
to: long_timeout_path
event: timeout(30) # Would fire after 30s, but short fires first
Testing Timeout Events
See the timeout demo for a complete working example:
# Run the timeout demo
cd examples/timeout_demo
python -m statemachine_engine.cli config/timeout_worker.yaml
# In another terminal, send events to cancel the timeout
echo '{"type": "start_work", "payload": {}}' | nc -U /tmp/statemachine-control-timeout_worker.sock
Built-In Actions
log - Activity Logging
Log messages that appear in the Web UI's activity log panel.
YAML Configuration:
actions:
processing:
- type: log
message: "🔄 Processing job {id}"
level: info # Optional: info (default), error, success
success: continue # Optional: event to emit on success
Features:
- Variable substitution: `{id}`, `{job_id}`, `{current_state}`, `{machine_name}`
- Event payload access: `{event_data.payload.field_name}`
- Log levels: `info` (blue), `error` (red), `success` (green)
- Real-time display: Messages appear instantly in Web UI
Examples:
# Simple info message
- type: log
message: "Worker ready - waiting for jobs"
# With context variables
- type: log
message: "Processing job {id} in state {current_state}"
level: info
# Error logging
- type: log
message: "❌ Job {id} failed: {error_message}"
level: error
# Success notification
- type: log
message: "✅ Completed {id} - generated {output_count} results"
level: success
bash - Execute Shell Commands
Execute shell commands with timeout and error handling.
YAML Configuration:
- type: bash
description: "Process the job"
command: "python process.py --input {input_file}"
timeout: 30
success: job_done
error: job_failed
Other Built-In Actions
- check_database_queue: Check job queue for pending jobs
- check_machine_state: Monitor machine states
- clear_events: Clean up processed events
- send_event: Send events to other machines
- start_fsm: Spawn new state machine instances with context passing
- complete_job: Mark jobs as completed in the database queue
See examples/ directory for complete working examples.
Custom Actions
Creating Custom Actions
Extend the framework with your own actions by inheriting from BaseAction:
1. Create action file (e.g., my_custom_action.py):
from statemachine_engine.actions import BaseAction
class MyCustomAction(BaseAction):
async def execute(self, context):
# Access config parameters from YAML
param_value = self.config.get('params', {}).get('my_param')
# Access execution context (job_id, machine_name, etc.)
job_id = context.get('job_id')
machine = self.get_machine_name(context)
# Your custom logic
self.logger.info(f"Processing {job_id} on {machine}")
# Return event name to trigger next transition
return self.config.get('params', {}).get('success', 'success')
2. Place in your project's actions directory:
my_project/
├── actions/
│ ├── my_custom_action.py
│ └── another_action.py
└── config/
└── worker.yaml
3. Use in YAML configuration:
actions:
- type: my_custom # Maps to my_custom_action.py → MyCustomAction class
params:
my_param: "value"
success: job_done
4. Run with custom actions directory:
# Use --actions-dir to specify your custom actions directory
statemachine config/worker.yaml \
--machine-name my_worker \
--actions-dir ./actions
# Supports absolute and relative paths
statemachine config/worker.yaml \
--machine-name my_worker \
--actions-dir /path/to/my_project/actions
# Supports ~ (home directory) expansion
statemachine config/worker.yaml \
--machine-name my_worker \
--actions-dir ~/projects/my_worker/actions
Action Discovery
The ActionLoader automatically discovers actions following these conventions:
- File naming: `{action_type}_action.py`
- Class naming: `{ActionType}Action` (PascalCase)
- Example: `my_custom_action.py` → `MyCustomAction` class
- YAML reference: `type: my_custom`
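The file-to-class convention is a snake_case to PascalCase mapping, which can be expressed as follows (illustrative helper, not the ActionLoader's actual code):

```python
def action_class_name(action_type: str) -> str:
    """Map a YAML action type to the class name the convention expects.

    'my_custom' (file my_custom_action.py) -> 'MyCustomAction'
    """
    pascal = "".join(part.capitalize() for part in action_type.split("_"))
    return f"{pascal}Action"
```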
Discovery Locations:
- With `--actions-dir`: Discovers from BOTH custom directory AND built-in actions
- Without `--actions-dir`: Discovers only from the installed package's `actions/` directory
Action Precedence:
- Custom actions can override built-in actions with the same name
- Custom actions take precedence when name conflicts occur
- Both custom and built-in actions are available in the same workflow
Benefits of --actions-dir:
- ✅ No package installation required for custom actions
- ✅ Fast iteration: edit action → test immediately
- ✅ Simple project structure without setup.py/pyproject.toml
- ✅ Keep actions alongside your YAML configs where they belong
- ✅ Use both custom actions AND built-in actions (bash, log, send_event, etc.)
- ✅ Override built-in actions with custom implementations when needed
Variable Interpolation
New in v0.1.0: The engine now provides automatic variable interpolation at the engine level, making context data available to all actions consistently.
How It Works
The engine automatically substitutes {variable} placeholders in action configurations before passing them to actions. This happens transparently for all action types (built-in and custom).
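Conceptually the substitution behaves like the following sketch (a simplified model of the documented behavior, not the engine's `_interpolate_config()` implementation):

```python
import re

def interpolate(template: str, context: dict) -> str:
    """Replace {name} and {dotted.path} placeholders with context values.

    Matches the documented behavior: values are stringified, and unknown
    placeholders are left untouched for easier debugging.
    """
    def resolve(match: re.Match) -> str:
        value = context
        for part in match.group(1).split("."):
            if isinstance(value, dict) and part in value:
                value = value[part]
            else:
                return match.group(0)  # unknown placeholder preserved
        return str(value)

    return re.sub(r"\{([\w.]+)\}", resolve, template)
```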
Supported Variable Types
Simple Variables
Access any value in the context dictionary:
actions:
processing:
- type: log
message: "Processing job {job_id} with status {status}"
- type: bash
command: "python process.py --id {job_id} --state {current_state}"
Available context variables:
- `{job_id}` - Current job ID
- `{id}` - Alias for job_id
- `{current_state}` - Current state machine state
- `{machine_name}` - Name of the current machine
- `{status}` - Job status
- Any custom variables added by actions to context
Nested Variables
Access nested data using dot notation:
actions:
relaying:
- type: bash
command: "process {event_data.payload.job_id}"
- type: log
message: "Input: {event_data.payload.input_file}, Prompt: {event_data.payload.user_prompt}"
- type: send_event
target_machine: worker
event_type: task_request
payload:
file: "{event_data.payload.input_image}"
user: "{event_data.payload.user.name}"
priority: "{event_data.payload.metadata.priority}"
Common nested paths:
- `{event_data.payload.*}` - Event payload fields
- `{event_data.event_name}` - The event that triggered this action
- `{current_job.data.*}` - Job data fields (if job_model is used)
Custom Actions and Context Modification
Custom actions can now modify the context dictionary, and those changes will be visible to subsequent actions through variable interpolation:
# custom_extract_action.py
class CustomExtractAction(BaseAction):
async def execute(self, context):
# Extract data from event payload
payload = context['event_data']['payload']
# Add to context for subsequent actions
context['user_id'] = payload.get('user_id')
context['file_path'] = payload.get('input_file')
context['processing_mode'] = 'fast'
return 'extracted'
# worker.yaml
actions:
extracting:
- type: custom_extract
success: extracted
# These actions now see the extracted variables
- type: log
message: "Processing file {file_path} for user {user_id} in {processing_mode} mode"
- type: bash
command: "process --user {user_id} --file {file_path} --mode {processing_mode}"
Benefits
- ✅ No repetitive references: Extract once, use everywhere (no repeated `{event_data.payload.field}` lookups)
- ✅ Cleaner YAML: Shorter, more readable action configurations
- ✅ Type safety: Values are automatically converted to strings
- ✅ Consistent behavior: All actions (built-in and custom) use the same interpolation
- ✅ Unknown placeholders preserved: If a variable doesn't exist, the placeholder remains for debugging
- ✅ Special characters supported: Handles spaces, quotes, and special characters correctly
Advanced Examples
Combining static and dynamic values:
- type: bash
command: "convert {input_file} -resize {width}x{height} {output_file}"
params:
output_file: "/tmp/{job_id}_resized.png" # Interpolated
width: "800" # Static
Deeply nested structures:
- type: send_event
target_machine: logger
event_type: log_activity
payload:
user:
id: "{event_data.payload.user.id}"
name: "{event_data.payload.user.name}"
action: "{event_data.payload.metadata.action}"
timestamp: "{current_timestamp}"
List processing:
- type: multi_step
steps:
- "step1 {job_id}"
- "step2 {output_dir}/{file_name}"
- "step3 {status}"
Implementation Details
The interpolation happens in the StateMachineEngine._interpolate_config() method before actions are executed. This ensures:
- All action types benefit automatically
- Custom actions don't need to implement their own interpolation
- Variables are resolved consistently across the entire workflow
- Performance is optimized (single pass per action config)
Event Context Promotion (context_map)
New in v1.0.74: Events can declare a context_map that promotes payload fields
to durable top-level context keys the moment an event fires — before any action runs.
The Problem
context["event_data"] is overwritten on every incoming event. Any payload field
accessed via {event_data.payload.field} is lost as soon as the next event arrives.
This forces actions to manually copy values into context as workarounds.
Solution
Declare context_map on events in YAML. The engine promotes fields atomically at
the boundary where external data enters:
events:
transcribed:
context_map:
user_utterance: payload.user_utterance # context["user_utterance"] = event["payload"]["user_utterance"]
speak_done: {} # no promotion needed
incoming_call:
context_map:
call_sid: payload.call_sid
caller: payload.caller
Downstream actions reference {user_utterance} — a durable context key that
survives any number of subsequent events.
How It Works
- Config load: `load_config()` builds an index of event → context_map entries
- Event arrives: `_check_control_socket()` writes `context["event_data"]` (as before)
- Promotion: Engine applies `context_map` — walks the dot-path in the event dict, writes the value to a top-level context key
- Action runs: Actions see both `event_data` (transient) and promoted keys (durable)
Backward Compatibility
The `events:` block accepts both formats:
# Old format (flat list) — still works, no promotion
events:
- transcribed
- speak_done
- hangup
# New format (dict) — enables context_map
events:
transcribed:
context_map:
user_utterance: payload.user_utterance
speak_done: {}
hangup: {}
Rules
- Missing payload fields are silently skipped — no error if the path doesn't exist
- Null values are skipped — only non-None values are promoted
- Dot notation traverses nested dicts: `payload.user.name` → `event["payload"]["user"]["name"]`
- Overwrites are intentional — each event firing re-promotes its mapped fields
- No action needed — promotion happens at engine level before `process_event()`
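Taken together, the rules amount to a short dot-path walk, sketched here (hypothetical `apply_context_map` helper, not the engine's code):

```python
def apply_context_map(context: dict, event: dict, context_map: dict) -> None:
    """Promote mapped event fields to durable top-level context keys."""
    for key, path in context_map.items():
        value = event
        for part in path.split("."):
            if isinstance(value, dict) and part in value:
                value = value[part]
            else:
                value = None  # missing path: skip silently
                break
        if value is not None:  # null values are skipped as well
            context[key] = value  # overwrite is intentional
```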
Example: Voice Coordinator
events:
transcribed:
context_map:
user_utterance: payload.user_utterance
actions:
classifying:
- type: yamlgraph
params:
input_value: "{user_utterance}" # durable — survives speak_done
forwarding_to_ninchat:
- type: ninchat_send
params:
text: "{user_utterance}" # still available after acknowledging
Without context_map, user_utterance would be lost after any speak_done event
overwrites event_data.
Multi-Machine Setup
State machines can communicate via events:
# worker.yaml
transitions:
- from: processing
to: notifying
event: job_done
actions:
- type: send_event
params:
target: controller
event_type: task_completed
Event Payload Forwarding
The send_event action supports powerful payload forwarding and transformation capabilities for multi-machine orchestration.
Automatic JSON Parsing
External event payloads sent as JSON strings are automatically parsed to dictionaries:
# Send event via CLI with JSON payload
statemachine-db send-event \
--target worker \
--type process_task \
--payload '{"file": "image.png", "user_id": 123}'
The receiving machine automatically parses the JSON string to a dictionary, making fields accessible in actions.
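That parsing step behaves roughly like this sketch (the pass-through for non-JSON strings is an assumption, not documented engine behavior):

```python
import json

def normalize_payload(payload):
    """Return a dict for JSON-string payloads; pass dicts through unchanged."""
    if isinstance(payload, str):
        try:
            return json.loads(payload)
        except json.JSONDecodeError:
            return payload  # assumption: non-JSON strings are left as-is
    return payload
```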
Extracting Specific Fields
Extract and forward specific fields from received payloads:
# controller.yaml
relaying_to_worker:
- type: send_event
target_machine: worker
event_type: task_request
payload:
input_file: "{event_data.payload.file}"
user_id: "{event_data.payload.user_id}"
priority: "high" # Add static values
success: relay_complete
Nested Field Access
Access nested fields using dot notation:
# Extract nested data
relaying_user_info:
- type: send_event
target_machine: logger
event_type: log_activity
payload:
user_id: "{event_data.payload.user.id}"
user_name: "{event_data.payload.user.name}"
action: "{event_data.payload.metadata.action}"
Forwarding Entire Payloads
Forward the complete payload without modification:
# Simple relay pattern
relaying:
- type: send_event
target_machine: downstream_worker
event_type: relay_complete
payload: "{event_data.payload}" # Forward entire dict
success: relay_sent
Multi-Machine Orchestration Example
A complete controller pattern that relays data between multiple workers:
# controller.yaml
metadata:
name: "Image Processing Controller"
machine_name: controller
initial_state: waiting
transitions:
# Receive from generator
- from: waiting
to: relaying_to_processor
event: image_generated
# Relay to face processor
- from: relaying_to_processor
to: waiting_for_processor
event: start_relay
# Receive from processor
- from: waiting_for_processor
to: relaying_to_finalizer
event: processing_complete
# Relay to finalizer
- from: relaying_to_finalizer
to: waiting
event: relay_complete
actions:
# Extract specific fields and relay
relaying_to_processor:
- type: send_event
target_machine: face_processor
event_type: process_faces
payload:
base_image: "{event_data.payload.generated_image}"
job_id: "{event_data.payload.job_id}"
style: "{event_data.payload.face_style}"
success: start_relay
# Forward complete result
relaying_to_finalizer:
- type: send_event
target_machine: finalizer
event_type: finalize_image
payload: "{event_data.payload}" # Forward everything
success: relay_complete
Benefits of Payload Forwarding
- Performance: 10-50x faster than bash subprocess workarounds
- Type Safety: Automatic JSON parsing with error handling
- Clarity: Explicit field extraction shows data dependencies
- Flexibility: Mix extracted fields with static values
- Simplicity: No custom bash actions needed for relay patterns
Real-Time Monitoring
WebSocket Server
The WebSocket server provides real-time monitoring capabilities:
# Start WebSocket server
python -m statemachine_engine.monitoring.websocket_server
# Or use the integrated UI command (starts both WebSocket + Web UI)
statemachine-ui
Endpoints:
- WebSocket stream: `ws://localhost:8765/ws`
- Health check: `http://localhost:8765/health`
Web UI
The package includes a comprehensive web UI for visualizing and monitoring state machines:
# Start Web UI (includes WebSocket server)
statemachine-ui
# Start with custom settings
statemachine-ui --port 3001 --project-root /path/to/project
Features:
- Real-time state machine visualization with Mermaid diagrams
- Live machine status updates and event streaming
- Kanban board view for monitoring multiple FSM instances (Press 'K' to toggle)
- Interactive state transition monitoring
- Event history and activity logs
- Multi-machine coordination display
Kanban View:
The UI includes a Kanban board view for visualizing multiple instances of the same FSM template:
- Toggle: Press 'K' or click the "Show Kanban View" button
- State Groups: States are organized into logical groups (e.g., IDLE, PROCESSING, COMPLETION)
- Horizontal Layout: Groups flow left-to-right, states stack vertically within each group
- Real-time Updates: Cards move automatically as machines transition between states
- Multi-Instance: Shows all running instances of the current template (e.g., patient_record_1, patient_record_2, patient_record_3)
Example with patient records demo:
IDLE │ PROCESSING │ COMPLETION
─────────────── │ ───────────────── │ ──────────────
waiting_for_ │ summarizing │ ready
report │ • patient_1 │ • patient_3
│ │
│ fact_checking │ failed
│ • patient_2 │
│ │
│ │ shutdown
Access: http://localhost:3001
Tab Consolidation (v1.0.71+):
- Templated machines automatically consolidate into single tabs with count badges
- Example: `patient_record_job_001`, `_002`, `_003` → `Patient Record Job (3)`
- Detects `_NNN` suffix pattern automatically
- Individual (non-templated) machines remain as separate tabs
- Scales efficiently: 100 instances = 1 tab with "(100)" badge
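The `_NNN` suffix detection can be pictured as a small regex check (illustrative only; assumes exactly three digits, which the UI's actual pattern may relax):

```python
import re

# machine names like "patient_record_job_001" share the base "patient_record_job"
TEMPLATE_RE = re.compile(r"^(?P<base>.+)_(?P<n>\d{3})$")

def template_base(machine_name: str):
    """Return the template base name if the machine looks templated, else None."""
    m = TEMPLATE_RE.match(machine_name)
    return m.group("base") if m else None
```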
Auto-View Switching (v1.0.70+):
- UI automatically switches to Kanban view for templated machines
- Automatically switches to Diagram view for unique machines
- No manual 'K' key press needed
- View selection based on machine template metadata
Requirements:
- Node.js (for Web UI functionality)
- Generated diagrams (run `statemachine-diagrams config.yaml` first)
Examples
Running the Examples
Simple Worker
cd examples/simple_worker
statemachine config/worker.yaml --machine-name worker
# Or with debug logging:
statemachine config/worker.yaml --machine-name worker --debug
Controller/Worker (Multi-Machine)
cd examples/controller_worker
# Option 1: Use the run script
./run.sh
# Option 2: Run in separate terminals
# Terminal 1:
statemachine config/controller.yaml --machine-name controller
# Terminal 2:
statemachine config/worker.yaml --machine-name worker
Available Examples:
- Simple Worker - Basic job processing with database queue
- Controller/Worker - Multi-machine event coordination
Tools & Utilities
Validate Configurations
statemachine-validate config/worker.yaml # Single file
statemachine-validate config/*.yaml # All configs
statemachine-validate --strict config/*.yaml # Fail on warnings
Checks: event coverage, action emissions, unreachable states, self-loops
Monitor Real-Time Events
statemachine-events # All machines, human format
statemachine-events --machine simple_worker # Filter by machine
statemachine-events --format json > events.log # JSON output
statemachine-events --duration 60 # Time limit
Connects to /tmp/statemachine-events.sock to display all state changes in real-time
Production Templates
The templates/ directory contains production-ready templates:
Production startup script:
# Copy template to your project
cp templates/start-system.sh ./
chmod +x start-system.sh
# Customize for your configs
vim start-system.sh # Edit CONFIG_FILES and MACHINE_CONFIGS
# Run your system
./start-system.sh
See templates/README.md for full customization guide.
Start Worker
# Start with defaults
./scripts/start-worker.sh
# Specify config and machine name
./scripts/start-worker.sh examples/simple_worker/config/worker.yaml my_worker
Development System Startup
Development/testing script (repository only):
./scripts/start-system.sh
# Comprehensive startup that:
# - Validates all YAML configurations
# - Generates FSM documentation diagrams
# - Starts WebSocket monitoring server
# - Launches state machines
# - Starts Web UI (if Node.js available)
# - Handles graceful shutdown (Ctrl+C)
This script provides a complete system startup with:
- Virtual environment activation and validation
- Pre-flight configuration validation
- FSM diagram generation from YAML configs
- WebSocket server with health check polling
- State machine launching
- Web UI startup (optional, requires Node.js)
- Cleanup trap for graceful shutdown
Usage:
# Start the complete system
./scripts/start-system.sh
# View logs while running
tail -f logs/*.log
# Stop with Ctrl+C (automatic cleanup)
Documentation
- Quickstart Guide - Get started in 5 minutes
- CLAUDE.md - Architecture and development guide
Development
Development Setup
# Install in development mode with all dev dependencies
pip install -e ".[dev]"
# Install pre-commit hooks (BOTH required)
pre-commit install
pre-commit install --hook-type commit-msg
Pre-commit hooks enforce: ruff lint + format, file size gate (450 max), forbidden terms, pytest, and conventional commits.
Migrating add-job Scripts (v1.0.2 → v1.0.3)
If you have existing scripts using add-job, update them as follows:
# Pattern 1: Image processing with prompt
# OLD:
add-job $JOB_ID --type face_processing --input-image "$IMAGE" --prompt "$PROMPT"
# NEW:
add-job $JOB_ID --type face_processing --input-file "$IMAGE" --payload "{\"prompt\":\"$PROMPT\"}"
# Pattern 2: Image generation with multiple prompts
# OLD:
add-job $JOB_ID --type pony_flux --pony-prompt "$PONY" --flux-prompt "$FLUX"
# NEW:
add-job $JOB_ID --type pony_flux --payload "{\"pony_prompt\":\"$PONY\",\"flux_prompt\":\"$FLUX\"}"
# Pattern 3: With padding factors
# OLD:
add-job $JOB_ID --type face_processing --input-image "$IMG" --padding-factor 1.5 --mask-padding-factor 1.2
# NEW:
add-job $JOB_ID --type face_processing --input-file "$IMG" --payload '{"padding_factor":1.5,"mask_padding_factor":1.2}'
# Pattern 4: Custom job types (now supported!)
# NEW: You can now use ANY job type string
add-job $JOB_ID --type custom_workflow --payload '{"config":"value"}'
Helper function for easy migration:
# Add to your scripts for backward compatibility
add_job_v103() {
    local job_id="$1"
    local job_type="$2"
    local input_file="$3"
    local payload="$4"
    statemachine-db add-job "$job_id" \
        --type "$job_type" \
        ${input_file:+--input-file "$input_file"} \
        ${payload:+--payload "$payload"}
}
# Usage:
add_job_v103 "job123" "image_processing" "/path/to/image.jpg" '{"prompt":"enhance"}'
Testing State Transitions
You can manually test state transitions by sending events to running machines:
# Start a machine (in one terminal)
statemachine examples/simple_worker/config/worker.yaml
# Send events to trigger transitions (in another terminal)
statemachine-db send-event \
--target simple_worker \
--type new_job
# Check machine state
statemachine-db machine-state --format json
# List recent events
statemachine-db list-events \
--target simple_worker \
--limit 10
Testing Simple Worker Transitions
The simple_worker example has these transitions:
- `initializing → waiting` (event: `initialized`) - automatic on startup
- `waiting → processing` (event: `new_job`) - trigger with send-event
- `processing → completed` (event: `job_done`) - automatic after processing
- `completed → waiting` (event: `new_job`) - trigger to loop back
- `* → completed` (event: `stop`) - graceful shutdown from any state
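These transitions correspond to a declarative YAML workflow definition. The fragment below is only a sketch of how such a config might look (the key names here are assumptions for illustration; the authoritative format is in `examples/simple_worker/config/worker.yaml`):

```yaml
# Hypothetical sketch of the simple_worker transitions.
# Field names are illustrative - see examples/simple_worker/config/worker.yaml
# for the real schema used by statemachine-engine.
name: simple_worker
initial_state: initializing
transitions:
  - from: initializing
    to: waiting
    event: initialized
  - from: waiting
    to: processing
    event: new_job
  - from: processing
    to: completed
    event: job_done
  - from: completed
    to: waiting
    event: new_job
  - from: "*"            # wildcard: stop is accepted in any state
    to: completed
    event: stop
```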
Test scenario:
# Terminal 1: Start the worker
statemachine examples/simple_worker/config/worker.yaml
# Terminal 2: Test transitions
# 1. Trigger a job (waiting → processing → completed)
statemachine-db send-event --target simple_worker --type new_job
# 2. Watch state changes in real-time
watch -n 1 'statemachine-db machine-state'
# 3. Trigger another job (completed → waiting → processing → completed)
statemachine-db send-event --target simple_worker --type new_job
# 4. Stop the machine (any state → completed)
statemachine-db send-event --target simple_worker --type stop
How Event Delivery Works
When you use send-event, the CLI:
- **Writes event to database** - logs the event in the `machine_events` table (audit trail)
- **Sends event via Unix socket** - delivers the actual event with payload to `/tmp/statemachine-control-{machine_name}.sock`
- **Machine processes event** - the state machine receives the event from the socket and executes the transition immediately
- **Broadcasts state change** - updates are sent to `/tmp/statemachine-events.sock` → WebSocket → UI
Important: The machine_events database table is an audit log only. The actual event delivery happens via Unix sockets in real-time. Events are not read from the database - they're delivered directly through the socket.
Unix Socket Paths:
- Control sockets: `/tmp/statemachine-control-{machine_name}.sock` (receives events with full payload)
- Event socket: `/tmp/statemachine-events.sock` (broadcasts state changes to WebSocket server)
- WebSocket: `ws://localhost:3002/ws/events` (real-time updates to browser UI)
This dual approach (database + Unix socket) ensures:
- Reliability: Events are logged for audit (database persistence)
- Speed: Zero-latency event delivery via Unix socket (no polling)
- Monitoring: Real-time visibility via WebSocket broadcasting to UI
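The socket half of this pipeline can be illustrated with a short Python sketch. Everything below is an assumption for illustration only - the engine defines its own wire format and socket semantics; this just shows the general "JSON event over a Unix datagram socket" pattern, with a local listener standing in for a machine's control socket:

```python
import json
import os
import socket
import tempfile

def send_event(socket_path, event_type, payload=None):
    """Deliver one JSON-encoded event to a Unix datagram socket (sketch)."""
    msg = json.dumps({"type": event_type, "payload": payload or {}}).encode()
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
        s.sendto(msg, socket_path)

# Demo: a local listener stands in for a machine's control socket.
sock_dir = tempfile.mkdtemp()
path = os.path.join(sock_dir, "statemachine-control-demo.sock")
listener = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
listener.bind(path)

send_event(path, "new_job", {"job_id": "job123"})
data, _ = listener.recvfrom(4096)
event = json.loads(data)
print(event["type"])  # new_job
listener.close()
```

Because the socket delivers the full payload directly, no database polling is involved on the hot path - which is where the "zero-latency" claim above comes from.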
CLI Commands
statemachine # Run state machines
statemachine-ui # Web UI server with real-time visualization
statemachine-db # Database operations (events, jobs, state)
statemachine-diagrams # Generate FSM diagrams from YAML
statemachine-validate # Validate YAML configurations
statemachine-events # Monitor real-time events from Unix socket
Database Commands
# Events
statemachine-db send-event --target <machine> --type <event>
statemachine-db list-events --target <machine> --limit 10
# Send events with real-time UI updates (NEW)
# Sends to both database AND Unix socket for instant UI display
statemachine-db send-event --target ui --type activity_log \
--payload '{"message": "Task completed", "level": "SUCCESS"}'
# Custom source attribution for UI display
statemachine-db send-event --target ui --type activity_log \
--source my_tool --payload '{"message": "Processing...", "level": "INFO"}'
# Send to state machines (goes to database + machine control socket + WebSocket UI)
statemachine-db send-event --target worker1 --type custom_event \
--job-id job123 --payload '{"data": "value"}'
# Jobs (NEW in v1.0.3: Fully generic job creation)
# Add jobs with any job type and custom JSON payload
statemachine-db add-job job_001 \
--type image_processing \
--payload '{"input": "image.jpg", "config": {"quality": 95}}'
# Add job with machine type (routes to specific worker type)
statemachine-db add-job job_002 \
--type video_transcode \
--machine-type video_worker \
--payload '{"source": "video.mp4", "format": "h264"}'
# Add job with input file reference
statemachine-db add-job job_003 \
--type document_convert \
--input-file /path/to/document.pdf \
--payload '{"output_format": "docx"}'
# Add job with complex nested data
statemachine-db add-job ml_batch_001 \
--type ml_inference \
--payload '{
"model": "resnet50",
"input": {"image": "photo.jpg"},
"options": {"batch_size": 32, "gpu": true}
}'
# List and filter jobs
statemachine-db list --status pending
statemachine-db list --type image_processing --limit 20
statemachine-db list --status completed
# Job details
statemachine-db details <job-id>
statemachine-db details test_job_001
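The database-backed queue behind these commands follows a common SQLite pattern: jobs are inserted as pending and then atomically claimed by a worker. The schema below is invented for illustration (the engine's real table layout is internal to it):

```python
import json
import sqlite3

# Illustrative schema only - not the engine's actual internal schema.
db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE jobs (
    job_id   TEXT PRIMARY KEY,
    job_type TEXT NOT NULL,
    payload  TEXT,
    status   TEXT NOT NULL DEFAULT 'pending')""")

# Enqueue: roughly what an add-job style command records.
db.execute("INSERT INTO jobs (job_id, job_type, payload) VALUES (?, ?, ?)",
           ("job_001", "image_processing", json.dumps({"input": "image.jpg"})))

# Claim: flip one pending job to 'processing' in a single UPDATE so that
# two workers never grab the same row.
cur = db.cursor()
cur.execute("""UPDATE jobs SET status = 'processing'
               WHERE status = 'pending'
               AND job_id = (SELECT job_id FROM jobs
                             WHERE status = 'pending' LIMIT 1)""")
row = None
if cur.rowcount == 1:
    row = db.execute("SELECT job_id, job_type, payload FROM jobs "
                     "WHERE status = 'processing'").fetchone()
print(row[0], row[1])  # job_001 image_processing
```

Because the queue is a plain SQLite file, jobs survive process restarts - which is what "persistent job queue" means in the feature list.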
# State
statemachine-db machine-state
# State Transition History
statemachine-db transition-history # Show all state transitions
statemachine-db transition-history --machine worker1 # Filter by machine
statemachine-db transition-history --hours 24 # Last 24 hours
statemachine-db transition-history --limit 50 # Limit results
statemachine-db transition-history --format json # JSON output
# Error/Exception History
statemachine-db error-history # Show all errors
statemachine-db error-history --machine worker1 # Filter by machine
statemachine-db error-history --hours 1 # Last hour
statemachine-db error-history --format json # JSON output
Real-time Event Delivery (NEW in v0.0.20):
- `send-event` now delivers events to the Web UI instantly via Unix socket
- Activity logs sent via CLI appear immediately in the UI (no refresh needed)
- Requires the WebSocket server (`statemachine-ui`) to be running
- Falls back gracefully to database-only if the server is unavailable
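The graceful fallback described above is a standard try-the-socket-then-degrade pattern. The sketch below is hypothetical (function names and the list standing in for the database are invented; the engine's actual implementation may differ):

```python
import json
import socket

def deliver_event(socket_path, event, audit_log):
    """Best-effort delivery: always record the audit entry, then try the socket."""
    audit_log.append(event)  # stands in for the database write
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
            s.sendto(json.dumps(event).encode(), socket_path)
        return "delivered"
    except (FileNotFoundError, ConnectionRefusedError):
        # Server not running: the audit trail is still intact.
        return "database-only"

audit = []
# No listener is bound at this path, so delivery degrades gracefully.
result = deliver_event("/tmp/statemachine-nonexistent-demo.sock",
                       {"type": "activity_log",
                        "payload": {"message": "Task completed", "level": "INFO"}},
                       audit)
print(result, len(audit))  # database-only 1
```

The key property is that the audit write happens unconditionally, so the "reliability" and "speed" guarantees listed earlier are independent of each other.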
Running Unit Tests
# Install dev dependencies first
pip install -e ".[dev]"
# Run all tests
pytest tests/ -v
# Run with detailed output
pytest tests/ -vv
# Run specific test files
pytest tests/actions/test_bash_action_fallback.py -v
pytest tests/communication/test_control_socket.py -v
# Run specific test categories
pytest tests/actions/ -v # Action tests
pytest tests/communication/ -v # Communication tests
pytest tests/database/ -v # Database tests
# Show test summary
pytest tests/ --tb=short
# Run tests with coverage (install pytest-cov first)
pytest tests/ --cov=statemachine_engine --cov-report=html
Current Test Status: 143 tests total (136 passing, 0 failing, 7 skipped) - 100% pass rate ✅
New in v0.0.18+:
- Comprehensive exception handling tests for realtime events
- CLI history command tests (transition-history, error-history)
- Engine error emission tests
- Real-time socket delivery tests for send-event CLI (v0.0.20)
Building the Package
# Build distribution packages
python -m build
# Check the built package
ls dist/
# statemachine_engine-1.0.0-py3-none-any.whl
# statemachine_engine-1.0.0.tar.gz
License
MIT License - see LICENSE file
Repository