A generic state machine engine for event-driven workflows
State Machine Engine
Event-driven state machine framework with real-time monitoring and database-backed job queue.
Features
- YAML-Based Configuration: Define workflows declaratively
- Pluggable Actions: Extensible action system with built-in actions
- Real-Time Monitoring: WebSocket server for live state visualization
- Database-Backed Queue: SQLite-based persistent job queue
- Unix Socket Communication: Low-latency inter-machine events
- Multi-Machine Coordination: Event-driven machine-to-machine communication
Installation
From Source (Development)
# Clone the repository
git clone https://github.com/sheikkinen/statemachine-engine.git
cd statemachine-engine
# Create virtual environment (recommended)
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Option 1: Install with pip (uses pyproject.toml)
pip install -e ".[dev]"
# Option 2: Install with requirements files
pip install -r requirements-dev.txt
From PyPI
pip install statemachine-engine
Dependencies
The package requires Python 3.9+ and automatically installs:
- PyYAML (YAML configuration parsing)
- FastAPI (WebSocket server)
- Uvicorn (ASGI server)
- websockets (WebSocket protocol)
Development dependencies (optional):
- pytest (testing framework)
- pytest-asyncio (async test support)
Quick Start
1. Try the Included Examples
The package includes working example configurations:
# Simple worker example
cd examples/simple_worker
statemachine config/worker.yaml --machine-name worker
# Controller/worker multi-machine example
cd examples/controller_worker
./run.sh
See examples/ directory for complete working configurations.
2. Create Your Own Configuration
# my_worker.yaml
name: "My Worker"
initial_state: waiting
transitions:
  - from: waiting
    to: processing
    event: new_job
    actions:
      - type: bash
        params:
          command: "echo Processing job"
          success: job_done
3. Run Your State Machine
statemachine my_worker.yaml --machine-name my_worker
Or using Python directly:
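import asyncio

from statemachine_engine.core.engine import StateMachineEngine

async def main():
    engine = StateMachineEngine(machine_name='my_worker')
    await engine.load_config('my_worker.yaml')
    await engine.execute_state_machine()

# The engine methods are awaited, so they must run inside an event loop
asyncio.run(main())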
Starting Services
The statemachine-engine system consists of several components that work together. Here's how to start each service:
Complete System Startup (Recommended)
For development and testing, use the integrated startup script:
# Start everything at once
./scripts/start-system.sh
# This automatically starts:
# - WebSocket monitoring server
# - Web UI (if Node.js is available)
# - Example state machines
# - Generates FSM diagrams
Individual Service Startup
For production or custom setups, start services individually:
1. State Machine (Core Service)
# Basic usage
statemachine config/worker.yaml --machine-name my_worker
# With debug logging
statemachine config/worker.yaml --machine-name my_worker --debug
# Multiple machines (run in separate terminals)
statemachine config/controller.yaml --machine-name controller
statemachine config/worker.yaml --machine-name worker
2. Web UI Server (Visualization)
# Start web UI with current project
statemachine-ui
# Start with custom project root
statemachine-ui --project-root /path/to/your/project
# Start on custom port
statemachine-ui --port 8080
# Skip WebSocket server (if already running)
statemachine-ui --no-websocket
Access at: http://localhost:3001
3. WebSocket Server (Real-time Monitoring)
# Start WebSocket server
python -m statemachine_engine.monitoring.websocket_server
# Custom port
python -m statemachine_engine.monitoring.websocket_server --port 8765
Endpoints:
- WebSocket: ws://localhost:8765/ws
- Health check: http://localhost:8765/health
4. Generate Diagrams
# Generate diagrams for UI
statemachine-diagrams config/worker.yaml
# Or use the alias
statemachine-fsm config/worker.yaml
External Project Setup
If you're using statemachine-engine in your own project:
# In your project directory
cd /path/to/your/project
# 1. Generate diagrams for your config
statemachine-diagrams config/worker.yaml
# 2. Start UI with your project root
statemachine-ui --project-root $(pwd)
# 3. Start your state machine
statemachine config/worker.yaml --machine-name my_worker
# 4. Test with events
statemachine-db send-event --target my_worker --type new_job
Database Commands
# Send events to trigger state transitions
statemachine-db send-event --target my_worker --type new_job
# Check machine states
statemachine-db machine-state
# List recent events
statemachine-db list-events --target my_worker --limit 10
# View job queue
statemachine-db list-jobs --status pending
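When events need to be sent from a Python script rather than a shell, the same command line can be assembled as an argument list. The sketch below only builds the arguments for statemachine-db send-event using the --target, --type and --payload flags shown in this document; the helper name build_send_event_argv is hypothetical and not part of the package.

```python
import json
import shlex


def build_send_event_argv(target, event_type, payload=None):
    """Build the argv list for `statemachine-db send-event` (hypothetical helper)."""
    argv = ["statemachine-db", "send-event", "--target", target, "--type", event_type]
    if payload is not None:
        # The CLI takes the payload as a JSON string, so serialize the dict here.
        argv += ["--payload", json.dumps(payload)]
    return argv


argv = build_send_event_argv("my_worker", "new_job", {"file": "image.png"})
# Print a shell-quoted version for inspection or copy/paste.
print(shlex.join(argv))
# To actually send the event: subprocess.run(argv, check=True)
```

Passing a list to subprocess.run avoids shell quoting problems with the JSON payload.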
Service Dependencies
Minimum Setup:
- State machine: statemachine config.yaml --machine-name name
With Monitoring:
- State machine + WebSocket server
- Access real-time events at ws://localhost:8765/ws
With Visualization:
- State machine + WebSocket server + Web UI
- Full visual interface at http://localhost:3001
Requirements:
- Python 3.9+ (required)
- Node.js (optional, for Web UI)
- npm (optional, for Web UI dependencies)
Troubleshooting
Web UI can't find diagrams:
# Ensure diagrams are generated in your project
statemachine-diagrams config/worker.yaml
# Start UI with correct project root
statemachine-ui --project-root $(pwd)
Port conflicts:
# Use custom ports
statemachine-ui --port 8080
python -m statemachine_engine.monitoring.websocket_server --port 9000
Missing dependencies:
# Install with all dependencies
pip install "statemachine-engine[dev]"
# Or install Node.js for Web UI
# macOS: brew install node
# Ubuntu: apt install nodejs npm
Built-In Actions
- bash: Execute shell commands
- log: Activity logging
- check_database_queue: Check job queue for pending jobs
- check_machine_state: Monitor machine states
- clear_events: Clean up processed events
- send_event: Send events to other machines
Custom Actions
Creating Custom Actions
Extend the framework with your own actions by inheriting from BaseAction:
1. Create action file (e.g., my_custom_action.py):
from statemachine_engine.actions import BaseAction

class MyCustomAction(BaseAction):
    async def execute(self, context):
        # Access config parameters from YAML
        param_value = self.config.get('params', {}).get('my_param')
        # Access execution context (job_id, machine_name, etc.)
        job_id = context.get('job_id')
        machine = self.get_machine_name(context)
        # Your custom logic
        self.logger.info(f"Processing {job_id} on {machine}")
        # Return event name to trigger next transition
        return self.config.get('params', {}).get('success', 'success')
2. Place in your project's actions directory:
my_project/
├── actions/
│ ├── my_custom_action.py
│ └── another_action.py
└── config/
└── worker.yaml
3. Use in YAML configuration:
actions:
  - type: my_custom # Maps to my_custom_action.py → MyCustomAction class
    params:
      my_param: "value"
      success: job_done
4. Run with custom actions directory:
# Use --actions-dir to specify your custom actions directory
statemachine config/worker.yaml \
--machine-name my_worker \
--actions-dir ./actions
# Supports absolute and relative paths
statemachine config/worker.yaml \
--machine-name my_worker \
--actions-dir /path/to/my_project/actions
# Supports ~ (home directory) expansion
statemachine config/worker.yaml \
--machine-name my_worker \
--actions-dir ~/projects/my_worker/actions
Action Discovery
The ActionLoader automatically discovers actions following these conventions:
- File naming: {action_type}_action.py
- Class naming: {ActionType}Action (PascalCase)
- Example: my_custom_action.py → MyCustomAction class
- YAML reference: type: my_custom
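The naming convention can be expressed as two small string transformations. This is a sketch of the convention as documented here, not the ActionLoader's actual code; both function names are hypothetical.

```python
def action_file_name(action_type: str) -> str:
    """Map a YAML action type to the expected file name."""
    return f"{action_type}_action.py"


def action_class_name(action_type: str) -> str:
    """Map a snake_case YAML action type to the expected PascalCase class name."""
    # Capitalize each underscore-separated part, then append the "Action" suffix.
    return "".join(part.capitalize() for part in action_type.split("_")) + "Action"


print(action_file_name("my_custom"), action_class_name("my_custom"))
```

A quick check like this can help when an action is not picked up: a mismatch between the file name, the class name and the YAML type is a likely cause.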
Discovery Locations:
- With --actions-dir: Discovers from BOTH custom directory AND built-in actions
- Without --actions-dir: Discovers only from the installed package's actions/ directory
Action Precedence:
- Custom actions can override built-in actions with the same name
- Custom actions take precedence when name conflicts occur
- Both custom and built-in actions are available in the same workflow
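One simple way to picture these precedence rules is a dictionary merge in which custom entries are applied last. The sketch below illustrates the documented behaviour only; how the ActionLoader implements it internally is an assumption, and the registry values are placeholder labels rather than real classes.

```python
def merge_action_registries(builtin: dict, custom: dict) -> dict:
    """Combine two name -> action mappings, letting custom entries win."""
    merged = dict(builtin)   # start from the built-in actions
    merged.update(custom)    # custom actions override entries with the same name
    return merged


builtin = {"bash": "built-in", "log": "built-in"}
custom = {"bash": "custom", "my_custom": "custom"}
registry = merge_action_registries(builtin, custom)
print(registry)
```

Here bash resolves to the custom implementation, while log stays built-in and my_custom is added alongside.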
Benefits of --actions-dir:
- ✅ No package installation required for custom actions
- ✅ Fast iteration: edit action → test immediately
- ✅ Simple project structure without setup.py/pyproject.toml
- ✅ Keep actions alongside your YAML configs where they belong
- ✅ Use both custom actions AND built-in actions (bash, log, send_event, etc.)
- ✅ Override built-in actions with custom implementations when needed
Multi-Machine Setup
State machines can communicate via events:
# worker.yaml
transitions:
  - from: processing
    to: notifying
    event: job_done
    actions:
      - type: send_event
        params:
          target: controller
          event_type: task_completed
Event Payload Forwarding
The send_event action can forward a received payload to another machine, either whole or as selected fields mixed with static values.
Automatic JSON Parsing
External event payloads sent as JSON strings are automatically parsed to dictionaries:
# Send event via CLI with JSON payload
statemachine-db send-event \
--target worker \
--type process_task \
--payload '{"file": "image.png", "user_id": 123}'
The receiving machine automatically parses the JSON string to a dictionary, making fields accessible in actions.
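The parsing step can be pictured roughly as follows. This is a minimal sketch, assuming that a payload that is not a valid JSON object is kept under a "raw" key; the function name parse_payload and that fallback are illustrative assumptions, not the engine's documented behaviour.

```python
import json


def parse_payload(raw):
    """Turn a JSON string payload into a dict (hypothetical helper)."""
    if isinstance(raw, dict):
        return raw                      # already parsed
    if raw is None or raw == "":
        return {}                       # no payload supplied
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):     # JSONDecodeError is a ValueError
        return {"raw": raw}             # assumption: keep unparseable input as-is
    # Only JSON objects map to dicts; wrap lists, numbers, etc.
    return parsed if isinstance(parsed, dict) else {"raw": parsed}


payload = parse_payload('{"file": "image.png", "user_id": 123}')
print(payload["file"], payload["user_id"])
```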
Extracting Specific Fields
Extract and forward specific fields from received payloads:
# controller.yaml
relaying_to_worker:
  - type: send_event
    target_machine: worker
    event_type: task_request
    payload:
      input_file: "{event_data.payload.file}"
      user_id: "{event_data.payload.user_id}"
      priority: "high" # Add static values
    success: relay_complete
Nested Field Access
Access nested fields using dot notation:
# Extract nested data
relaying_user_info:
  - type: send_event
    target_machine: logger
    event_type: log_activity
    payload:
      user_id: "{event_data.payload.user.id}"
      user_name: "{event_data.payload.user.name}"
      action: "{event_data.payload.metadata.action}"
Forwarding Entire Payloads
Forward the complete payload without modification:
# Simple relay pattern
relaying:
  - type: send_event
    target_machine: downstream_worker
    event_type: relay_complete
    payload: "{event_data.payload}" # Forward entire dict
    success: relay_sent
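The placeholder patterns above (single fields, nested fields, and whole-payload forwarding) can be modelled with a small resolver. This is a sketch of the idea under stated assumptions, not the engine's implementation: the names lookup and resolve are hypothetical, and returning None for a missing path is an assumption.

```python
import re

# Matches "{a.b.c}" style placeholders and captures the dotted path.
_PLACEHOLDER = re.compile(r"\{([A-Za-z_][\w.]*)\}")


def lookup(context, dotted_path):
    """Walk nested dicts along a dotted path; return None if a key is missing."""
    value = context
    for key in dotted_path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def resolve(template, context):
    """Resolve placeholders in a string or (recursively) in a dict of templates."""
    if isinstance(template, dict):
        return {k: resolve(v, context) for k, v in template.items()}
    if not isinstance(template, str):
        return template
    whole = _PLACEHOLDER.fullmatch(template)
    if whole:
        # A lone placeholder keeps the original type (dict, int, ...).
        return lookup(context, whole.group(1))
    # Placeholders embedded in longer text are substituted as strings.
    return _PLACEHOLDER.sub(lambda m: str(lookup(context, m.group(1))), template)


context = {"event_data": {"payload": {"file": "image.png", "user": {"id": 123}}}}
template = {
    "input_file": "{event_data.payload.file}",
    "user_id": "{event_data.payload.user.id}",
    "priority": "high",
}
print(resolve(template, context))
print(resolve("{event_data.payload}", context))
```

Treating a lone placeholder differently from an embedded one is what lets "{event_data.payload}" forward a whole dictionary instead of its string form.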
Multi-Machine Orchestration Example
A complete controller pattern that relays data between multiple workers:
# controller.yaml
metadata:
  name: "Image Processing Controller"
  machine_name: controller
initial_state: waiting
transitions:
  # Receive from generator
  - from: waiting
    to: relaying_to_processor
    event: image_generated
  # Relay to face processor
  - from: relaying_to_processor
    to: waiting_for_processor
    event: start_relay
  # Receive from processor
  - from: waiting_for_processor
    to: relaying_to_finalizer
    event: processing_complete
  # Relay to finalizer
  - from: relaying_to_finalizer
    to: waiting
    event: relay_complete
actions:
  # Extract specific fields and relay
  relaying_to_processor:
    - type: send_event
      target_machine: face_processor
      event_type: process_faces
      payload:
        base_image: "{event_data.payload.generated_image}"
        job_id: "{event_data.payload.job_id}"
        style: "{event_data.payload.face_style}"
      success: start_relay
  # Forward complete result
  relaying_to_finalizer:
    - type: send_event
      target_machine: finalizer
      event_type: finalize_image
      payload: "{event_data.payload}" # Forward everything
      success: relay_complete
Benefits of Payload Forwarding
- Performance: 10-50x faster than bash subprocess workarounds
- Type Safety: Automatic JSON parsing with error handling
- Clarity: Explicit field extraction shows data dependencies
- Flexibility: Mix extracted fields with static values
- Simplicity: No custom bash actions needed for relay patterns
Real-Time Monitoring
WebSocket Server
The WebSocket server provides real-time monitoring capabilities:
# Start WebSocket server
python -m statemachine_engine.monitoring.websocket_server
# Or use the integrated UI command (starts both WebSocket + Web UI)
statemachine-ui
Endpoints:
- WebSocket stream: ws://localhost:8765/ws
- Health check: http://localhost:8765/health
Web UI
The package includes a comprehensive web UI for visualizing and monitoring state machines:
# Start Web UI (includes WebSocket server)
statemachine-ui
# Start with custom settings
statemachine-ui --port 3001 --project-root /path/to/project
Features:
- Real-time state machine visualization with Mermaid diagrams
- Live machine status updates and event streaming
- Interactive state transition monitoring
- Event history and activity logs
- Multi-machine coordination display
Access: http://localhost:3001
Requirements:
- Node.js (for Web UI functionality)
- Generated diagrams (run statemachine-diagrams config.yaml first)
Examples
Running the Examples
Simple Worker
cd examples/simple_worker
statemachine config/worker.yaml --machine-name worker
# Or with debug logging:
statemachine config/worker.yaml --machine-name worker --debug
Controller/Worker (Multi-Machine)
cd examples/controller_worker
# Option 1: Use the run script
./run.sh
# Option 2: Run in separate terminals
# Terminal 1:
statemachine config/controller.yaml --machine-name controller
# Terminal 2:
statemachine config/worker.yaml --machine-name worker
Available Examples:
- Simple Worker - Basic job processing with database queue
- Controller/Worker - Multi-machine event coordination
Tools & Utilities
Validate Configurations
statemachine-validate config/worker.yaml # Single file
statemachine-validate config/*.yaml # All configs
statemachine-validate --strict config/*.yaml # Fail on warnings
Checks: event coverage, action emissions, unreachable states, self-loops
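To illustrate what an unreachable-state check involves, the sketch below walks the transition graph from the initial state and reports states that are never visited. It is an independent illustration, not the code behind statemachine-validate; the function name and the handling of "*" as "from any state" are assumptions based on the wildcard transition shown elsewhere in this document.

```python
from collections import deque


def unreachable_states(initial_state, transitions):
    """Return the set of states that cannot be reached from initial_state."""
    states = {initial_state}
    edges = {}
    wildcard_targets = set()
    for t in transitions:
        states.add(t["to"])
        if t["from"] == "*":
            wildcard_targets.add(t["to"])
        else:
            states.add(t["from"])
            edges.setdefault(t["from"], set()).add(t["to"])

    # Breadth-first search over the transition graph.
    seen = {initial_state}
    queue = deque([initial_state])
    while queue:
        state = queue.popleft()
        # A "*" transition can fire from any state, including this one.
        for nxt in edges.get(state, set()) | wildcard_targets:
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return states - seen


transitions = [
    {"from": "waiting", "to": "processing", "event": "new_job"},
    {"from": "processing", "to": "waiting", "event": "job_done"},
    {"from": "orphan", "to": "waiting", "event": "retry"},
]
print(unreachable_states("waiting", transitions))
```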
Monitor Real-Time Events
statemachine-events # All machines, human format
statemachine-events --machine simple_worker # Filter by machine
statemachine-events --format json > events.log # JSON output
statemachine-events --duration 60 # Time limit
Connects to /tmp/statemachine-events.sock to display all state changes in real-time
Production Templates
The templates/ directory contains production-ready templates:
Production startup script:
# Copy template to your project
cp templates/start-system.sh ./
chmod +x start-system.sh
# Customize for your configs
vim start-system.sh # Edit CONFIG_FILES and MACHINE_CONFIGS
# Run your system
./start-system.sh
See templates/README.md for full customization guide.
Start Worker
# Start with defaults
./scripts/start-worker.sh
# Specify config and machine name
./scripts/start-worker.sh examples/simple_worker/config/worker.yaml my_worker
Development System Startup
Development/testing script (repository only):
./scripts/start-system.sh
# Comprehensive startup that:
# - Validates all YAML configurations
# - Generates FSM documentation diagrams
# - Starts WebSocket monitoring server
# - Launches state machines
# - Starts Web UI (if Node.js available)
# - Handles graceful shutdown (Ctrl+C)
This script provides a complete system startup with:
- Virtual environment activation and validation
- Pre-flight configuration validation
- FSM diagram generation from YAML configs
- WebSocket server with health check polling
- State machine launching
- Web UI startup (optional, requires Node.js)
- Cleanup trap for graceful shutdown
Usage:
# Start the complete system
./scripts/start-system.sh
# View logs while running
tail -f logs/*.log
# Stop with Ctrl+C (automatic cleanup)
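The health check polling step mentioned above can be approximated in Python when a custom launcher is needed. This is a minimal sketch, assuming the /health endpoint answers with HTTP 200 once the server is ready; the function names, retry count and delay are illustrative and not taken from the script.

```python
import time
import urllib.error
import urllib.request


def http_ok(url, timeout=1.0):
    """Return True if a GET request to url succeeds with status 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or an HTTP error status.
        return False


def wait_until_healthy(url, attempts=10, delay=0.5, probe=http_ok):
    """Poll url until probe(url) is True or the attempts run out."""
    for attempt in range(attempts):
        if probe(url):
            return True
        if attempt < attempts - 1:
            time.sleep(delay)  # back off before the next try
    return False


# Example (requires the WebSocket server to be running):
# wait_until_healthy("http://localhost:8765/health")
```

The probe parameter makes the loop easy to test without a running server.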
Documentation
- Quickstart Guide - Get started in 5 minutes
- CLAUDE.md - Architecture and development guide
Development
Testing State Transitions
You can manually test state transitions by sending events to running machines:
# Start a machine (in one terminal)
statemachine examples/simple_worker/config/worker.yaml
# Send events to trigger transitions (in another terminal)
statemachine-db send-event \
--target simple_worker \
--type new_job
# Check machine state
statemachine-db machine-state --format json
# List recent events
statemachine-db list-events \
--target simple_worker \
--limit 10
Testing Simple Worker Transitions
The simple_worker example has these transitions:
- initializing → waiting (event: initialized) - automatic on startup
- waiting → processing (event: new_job) - trigger with send-event
- processing → completed (event: job_done) - automatic after processing
- completed → waiting (event: new_job) - trigger to loop back
- * → completed (event: stop) - graceful shutdown from any state
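The transition list above can be traced by hand or with a small table-driven simulation. The sketch below models only the transitions, not the engine or its actions; ignoring unknown events and preferring an exact source state over the "*" wildcard are assumptions made for the illustration.

```python
# (from_state, event, to_state) triples taken from the list above.
TRANSITIONS = [
    ("initializing", "initialized", "waiting"),
    ("waiting", "new_job", "processing"),
    ("processing", "job_done", "completed"),
    ("completed", "new_job", "waiting"),
    ("*", "stop", "completed"),
]


def next_state(state, event):
    """Return the state reached from `state` on `event`."""
    # Prefer an exact source-state match, then fall back to the "*" wildcard.
    for source in (state, "*"):
        for from_state, on_event, to_state in TRANSITIONS:
            if from_state == source and on_event == event:
                return to_state
    return state  # assumption: unknown events leave the state unchanged


def run(events, state="initializing"):
    """Apply events in order and return the list of visited states."""
    trace = [state]
    for event in events:
        state = next_state(state, event)
        trace.append(state)
    return trace


print(run(["initialized", "new_job", "job_done", "new_job", "stop"]))
```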
Test scenario:
# Terminal 1: Start the worker
statemachine examples/simple_worker/config/worker.yaml
# Terminal 2: Test transitions
# 1. Trigger a job (waiting → processing → completed)
statemachine-db send-event --target simple_worker --type new_job
# 2. Watch state changes in real-time
watch -n 1 'statemachine-db machine-state'
# 3. Trigger another job (completed → waiting → processing → completed)
statemachine-db send-event --target simple_worker --type new_job
# 4. Stop the machine (any state → completed)
statemachine-db send-event --target simple_worker --type stop
How Event Delivery Works
When you use send-event, the CLI:
- Writes event to database - Logs the event in the machine_events table (audit trail)
- Sends event via Unix socket - Delivers the actual event with payload to /tmp/statemachine-control-{machine_name}.sock
- Machine processes event - State machine receives event from socket and executes the transition immediately
- Broadcasts state change - Updates are sent to /tmp/statemachine-events.sock → WebSocket → UI
Important: The machine_events database table is an audit log only. The actual event delivery happens via Unix sockets in real-time. Events are not read from the database - they're delivered directly through the socket.
Unix Socket Paths:
- Control sockets: /tmp/statemachine-control-{machine_name}.sock (receives events with full payload)
- Event socket: /tmp/statemachine-events.sock (broadcasts state changes to WebSocket server)
- WebSocket: ws://localhost:3002/ws/events (real-time updates to browser UI)
This dual approach (database + Unix socket) ensures:
- Reliability: Events are logged for audit (database persistence)
- Speed: Low-latency event delivery via Unix socket (no polling)
- Monitoring: Real-time visibility via WebSocket broadcasting to UI
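The socket half of this approach can be demonstrated in isolation. The sketch below sends one JSON message over a Unix datagram socket to a listener it creates itself in a temporary directory. The socket type (SOCK_DGRAM) and the message shape ({"type": ..., "payload": ...}) are assumptions made for the demonstration; the engine's actual wire format is not described in this document, so this is not a drop-in client for the control sockets.

```python
import json
import os
import socket
import tempfile


def send_event(sock_path, event_type, payload):
    """Send one JSON-encoded event to a Unix datagram socket (assumed format)."""
    message = json.dumps({"type": event_type, "payload": payload}).encode("utf-8")
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as client:
        client.sendto(message, sock_path)


def demo():
    """Bind a throwaway listener, send it an event, and return what it received."""
    with tempfile.TemporaryDirectory() as tmp:
        sock_path = os.path.join(tmp, "control-demo.sock")
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as server:
            server.bind(sock_path)
            server.settimeout(2.0)  # fail fast instead of blocking forever
            send_event(sock_path, "new_job", {"file": "image.png"})
            data, _ = server.recvfrom(65536)
    return json.loads(data.decode("utf-8"))


print(demo())
```

Because the listener receives the message as soon as it is sent, no polling loop is needed, which is the property the list above attributes to socket delivery.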
CLI Commands
statemachine # Run state machines
statemachine-ui # Web UI server with real-time visualization
statemachine-db # Database operations (events, jobs, state)
statemachine-diagrams # Generate FSM diagrams from YAML
statemachine-validate # Validate YAML configurations
statemachine-events # Monitor real-time events from Unix socket
Database Commands
# Events
statemachine-db send-event --target <machine> --type <event>
statemachine-db list-events --target <machine> --limit 10
# Jobs
statemachine-db create-job --type <type> --data <json>
statemachine-db list-jobs --status pending
# State
statemachine-db machine-state
# State Transition History
statemachine-db transition-history # Show all state transitions
statemachine-db transition-history --machine worker1 # Filter by machine
statemachine-db transition-history --hours 24 # Last 24 hours
statemachine-db transition-history --limit 50 # Limit results
statemachine-db transition-history --format json # JSON output
# Error/Exception History
statemachine-db error-history # Show all errors
statemachine-db error-history --machine worker1 # Filter by machine
statemachine-db error-history --hours 1 # Last hour
statemachine-db error-history --format json # JSON output
Running Unit Tests
# Install dev dependencies first
pip install -e ".[dev]"
# Run all tests
pytest tests/ -v
# Run with detailed output
pytest tests/ -vv
# Run specific test files
pytest tests/actions/test_bash_action_fallback.py -v
pytest tests/communication/test_control_socket.py -v
# Run specific test categories
pytest tests/actions/ -v # Action tests
pytest tests/communication/ -v # Communication tests
pytest tests/database/ -v # Database tests
# Show test summary
pytest tests/ --tb=short
# Run tests with coverage (install pytest-cov first)
pytest tests/ --cov=statemachine_engine --cov-report=html
Current Test Status: 92 tests total (86 passing, 0 failing, 6 skipped) - 100% pass rate ✅
New in v0.0.18+:
- Comprehensive exception handling tests for realtime events
- CLI history command tests (transition-history, error-history)
- Engine error emission tests
Building the Package
# Build distribution packages
python -m build
# Check the built package
ls dist/
# statemachine_engine-<version>-py3-none-any.whl
# statemachine_engine-<version>.tar.gz
License
MIT License - see LICENSE file
Repository
https://github.com/sheikkinen/statemachine-engine