PyStator
A configuration-driven, stateless finite state machine library for Python with support for hierarchical states, parallel states (orthogonal regions), and async execution.
PyStator defines behavioral contracts through YAML/JSON specifications, computing state transitions without holding internal state. Designed for high-integrity systems like order management, trading workflows, and distributed applications.
Features
- Configuration-Driven: Define state machines in YAML/JSON with schema validation
- Stateless Design: Pure computation—takes state in, returns state/actions out
- Hierarchical States: Compound states with parent/child; transitions from parent match any sub-state; exit/enter chains follow LCA (see Hierarchical States)
- Parallel States: Orthogonal regions—each region is an independent sub-machine; events can trigger region-scoped transitions (see Parallel States)
- Delayed Transitions: Schedule transitions to fire after a delay with pluggable schedulers (see Delayed Transitions)
- Inline Guard Expressions: Define guards directly in YAML without Python code (see Inline Guard Expressions)
- Action Parameters: Pass configuration to actions from YAML (see Action Parameters)
- Async Actions: Register async callables and run them with sequential, parallel, or phased execution (see Async Support and Execution Modes (Actions))
- Guards: Conditional transitions based on runtime context (sync and async)
- Actions/Hooks: Entry/exit hooks and transition actions
- Timeouts/TTL: Automatic transitions after configurable durations
- Type-Safe: Full type hints and PEP 561 compliance
- Retry Mechanism: Configurable retry with exponential backoff
- Idempotency: Pluggable backends for duplicate detection
- Visualization: Generate diagrams from FSM definitions
- Scheduler Adapters: Pluggable backends for delayed transitions (asyncio, Redis, Celery)
- Optional API: REST API via pip install pystator[api]
Installation
# Core library
pip install pystator
# With API server
pip install pystator[api]
# Development
pip install -e ".[dev]"
Quick Start
1. Define Your State Machine (YAML)
# order_fsm.yaml
meta:
  version: "1.0.0"
  machine_name: "order_management"
  strict_mode: true
states:
  - name: PENDING
    type: initial
    timeout:
      seconds: 5.0
      destination: TIMED_OUT
  - name: OPEN
    type: stable
    on_enter:
      - notify_ui
      - log_audit
  - name: FILLED
    type: terminal
transitions:
  - trigger: exchange_ack
    source: PENDING
    dest: OPEN
    actions:
      - update_order_id
  - trigger: execution_report
    source: OPEN
    dest: FILLED
    guards:
      - is_full_fill
    actions:
      - update_positions
2. Use the State Machine
from pystator import StateMachine, GuardRegistry, ActionRegistry, Event
from pystator.actions import ActionExecutor
# Load FSM
machine = StateMachine.from_yaml("order_fsm.yaml")
# Register guards and actions
guards = GuardRegistry()
guards.register("is_full_fill", lambda ctx: ctx["fill_qty"] >= ctx["order_qty"])
machine.bind_guards(guards)
actions = ActionRegistry()
actions.register("update_order_id", lambda ctx: print(f"Order ID: {ctx['order_id']}"))
actions.register("update_positions", lambda ctx: print("Positions updated"))
executor = ActionExecutor(actions)
# Build the context once so the same dict reaches both guards and actions
context = {"order_id": "order-123", "fill_qty": 100, "order_qty": 100}
# Process event (pure computation)
result = machine.process(
    current_state="OPEN",
    event="execution_report",
    context=context,
)
# Execute actions after persistence
# `db` stands in for your application's persistence layer; it is not part of PyStator
if result.success:
    db.update_state(context["order_id"], result.target_state)
    executor.execute(result, context)
Core Concepts
States
States represent nodes in the state machine graph:
from pystator import State, StateType, Timeout
state = State(
    name="PENDING",
    type=StateType.INITIAL,  # initial, stable, terminal, error, parallel
    on_enter=("log_entry",),
    on_exit=("log_exit",),
    timeout=Timeout(seconds=5.0, destination="TIMED_OUT"),
)
Transitions
from pystator import Transition
transition = Transition(
    trigger="execution_report",
    source=frozenset({"OPEN", "PARTIALLY_FILLED"}),
    dest="FILLED",
    guards=("is_full_fill",),
    actions=("update_positions",),
)
Guards
Guards are pure functions that control whether transitions are allowed:
from pystator import GuardRegistry
guards = GuardRegistry()
@guards.decorator("has_buying_power")
def has_buying_power(ctx: dict) -> bool:
    return ctx["buying_power"] >= ctx["order_value"]
# Async guards (`broker` is an application-provided client)
@guards.decorator("check_external")
async def check_external(ctx: dict) -> bool:
    account = await broker.get_account()
    return account.buying_power >= ctx["order_value"]
Actions
Actions handle side effects and are executed after state persistence. You can register sync or async callables; both take a single context dict (event payload, entity state, etc.).
Async actions: Register async callables with ActionRegistry and run them with executor.async_execute_*. Use Execution Modes (sequential, parallel, phased) to control how actions are run.
from pystator import ActionRegistry
from pystator.actions import ActionExecutor, ExecutionMode
actions = ActionRegistry()
@actions.decorator()
def send_notification(ctx: dict) -> None:
    email_service.send(ctx["user_email"], "Order updated")
@actions.decorator()
async def update_position(ctx: dict) -> None:
    await db.update_position(ctx["symbol"], ctx["quantity"])
executor = ActionExecutor(actions, default_mode=ExecutionMode.PHASED)
# Sync execution (sequential by default)
result = executor.execute(transition_result, context)
# Async execution: parallel (all at once) or phased (exit → transition → enter)
result = await executor.async_execute_parallel(transition_result, context)
result = await executor.async_execute_phased(transition_result, context)
Hierarchical States (Statecharts)
States can be nested with parent/child relationships. The machine resolves to a single active leaf: compound states specify an initial_child, and the engine follows that chain until it reaches a leaf. Transitions are matched by ancestry—a transition whose source is a parent applies when the current state is that parent or any descendant.
Exit/enter order: When transitioning between two leaves, the engine computes the lowest common ancestor (LCA). Exit actions run from the current leaf up to (but not including) the LCA; enter actions run from the LCA down to the target leaf. This preserves statechart semantics.
states:
  - name: active
    type: stable
    initial_child: active.scanning
    on_enter: [start_feed]
    on_exit: [stop_feed]
  - name: active.scanning
    parent: active
    type: stable
  - name: active.analyzing
    parent: active
    type: stable
  - name: halted
    type: terminal
transitions:
  # Transition between siblings
  - trigger: signal_detected
    source: active.scanning
    dest: active.analyzing
  # Transition from parent applies to any child
  - trigger: emergency_stop
    source: active
    dest: halted
When processing emergency_stop from active.scanning, the transition succeeds because active is an ancestor of the current leaf. get_initial_state() returns the resolved leaf (e.g. active.scanning), not the compound root.
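The exit/enter ordering described above can be sketched in plain Python. This is an illustrative sketch, not PyStator's implementation: the `ancestors` and `exit_enter_chains` helpers and the `PARENTS` map are hypothetical names, and the sketch assumes the LCA itself is neither exited nor re-entered.

```python
def ancestors(state, parents):
    """Return [state, parent, grandparent, ...] up to the root."""
    chain = [state]
    while parents.get(chain[-1]) is not None:
        chain.append(parents[chain[-1]])
    return chain


def exit_enter_chains(source, target, parents):
    """Return (states to exit, leaf upward; states to enter, top downward)."""
    src_chain = ancestors(source, parents)
    tgt_chain = ancestors(target, parents)
    tgt_set = set(tgt_chain)
    # The LCA is the first ancestor of the source that is also an ancestor of the target.
    lca = next((s for s in src_chain if s in tgt_set), None)
    if lca is None:
        # No shared ancestor: exit the whole source chain, enter the whole target chain.
        exits, enters = src_chain, tgt_chain
    else:
        exits = src_chain[: src_chain.index(lca)]
        enters = tgt_chain[: tgt_chain.index(lca)]
    return exits, list(reversed(enters))


# Parent map mirroring the YAML example above (None marks a root state).
PARENTS = {
    "active": None,
    "active.scanning": "active",
    "active.analyzing": "active",
    "halted": None,
}

sibling = exit_enter_chains("active.scanning", "active.analyzing", PARENTS)
stop = exit_enter_chains("active.scanning", "halted", PARENTS)
```

For the sibling transition only the two leaves are involved, so `active`'s `stop_feed` and `start_feed` hooks would not run under this assumption; for `emergency_stop` both the leaf and `active` are exited before `halted` is entered.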
Parallel States (Orthogonal Regions)
Parallel states contain orthogonal regions—each region is an independent sub-machine with its own current state. All regions are active at once; an event can trigger a transition in one or more regions via region-scoped transitions (region: region_name). Use parallel states when you need concurrent behaviors (e.g. trading workflow + risk monitor + data feed).
Each region has an initial state and a states list (the valid states in that region). You can define region states as separate state entries (with parent: parallel_state_name) to attach on_enter/on_exit hooks.
states:
  - name: active
    type: parallel
    regions:
      - name: trading
        initial: scanning
        states: [scanning, analyzing, executing]
      - name: risk_monitor
        initial: normal
        states: [normal, elevated, critical]
      - name: data_feed
        initial: connecting
        states: [connecting, connected, failed]
  # Optional: define region states for on_enter/on_exit
  - name: scanning
    type: stable
    parent: active
    on_enter: [start_scanners]
  - name: normal
    type: stable
    parent: active
    on_enter: [reset_risk_alerts]
transitions:
  # Region-scoped: only this region is considered
  - trigger: signal_detected
    source: scanning
    dest: analyzing
    region: trading
  - trigger: risk_warning
    source: normal
    dest: elevated
    region: risk_monitor
Using Parallel States
machine = StateMachine.from_yaml("trading_fsm.yaml")
# Enter parallel state - initializes all regions
config = machine.enter_parallel_state("active")
# config.region_states = {"trading": "scanning", "risk_monitor": "normal", "data_feed": "connecting"}
# Process events that affect specific regions
config, results = machine.process_parallel(config, "signal_detected", context)
# Trading region: scanning -> analyzing
config, results = machine.process_parallel(config, "risk_warning", context)
# Risk region: normal -> elevated (if guards pass)
# Each result contains actions for that region transition
for result in results:
    await executor.async_execute_phased(result, context)
# Exit parallel state
exit_actions = machine.get_parallel_exit_actions(config)
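To make the region-scoped matching concrete, here is a minimal stand-alone sketch. It is not PyStator's code: `REGION_TRANSITIONS` and `process_regions` are hypothetical names, guards are omitted, and the transition table simply mirrors the YAML above.

```python
REGION_TRANSITIONS = [
    {"trigger": "signal_detected", "source": "scanning", "dest": "analyzing", "region": "trading"},
    {"trigger": "risk_warning", "source": "normal", "dest": "elevated", "region": "risk_monitor"},
]


def process_regions(region_states, event):
    """Return (new region->state mapping, list of (region, source, dest) moves).

    The input mapping is not mutated, which keeps the computation stateless.
    """
    new_states = dict(region_states)
    moves = []
    for t in REGION_TRANSITIONS:
        region = t["region"]
        # A transition fires only if its own region is currently in the source state.
        if t["trigger"] == event and region_states.get(region) == t["source"]:
            new_states[region] = t["dest"]
            moves.append((region, t["source"], t["dest"]))
    return new_states, moves


initial = {"trading": "scanning", "risk_monitor": "normal", "data_feed": "connecting"}
after, moves = process_regions(initial, "signal_detected")
```

Only the `trading` region changes here; the other regions keep their current states, which is the behavior the region-scoped transitions above describe.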
Delayed Transitions
Schedule transitions to fire automatically after a specified delay using the after field. This is useful for timeouts, retries, and scheduled state changes.
YAML Syntax
transitions:
  - trigger: timeout
    source: waiting
    dest: retry
    after: 5000  # 5 seconds (milliseconds)
  - trigger: expire
    source: pending
    dest: cancelled
    after: "30m"  # 30 minutes (string with unit)
  - trigger: daily_close
    source: active
    dest: closed
    after: "1h"  # 1 hour
Delay formats:
- Integer: milliseconds (e.g., 5000 = 5 seconds)
- String with unit: "5s" (seconds), "30m" (minutes), "1h" (hours)
Implicit trigger: When only after (and dest, optional guards) is set, you can omit trigger; the loader assigns a synthetic trigger so the orchestrator can schedule the transition. Delayed transitions with implicit trigger must have exactly one source state.
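As an illustration of the delay formats listed above, the following sketch converts an `after` value to milliseconds. The `parse_delay_ms` function is a hypothetical helper written for this example, not PyStator's loader, and it assumes only the `s`, `m`, and `h` units shown in this section.

```python
import re

# Milliseconds per unit for the string formats shown above.
_UNIT_MS = {"s": 1_000, "m": 60_000, "h": 3_600_000}
_DELAY_RE = re.compile(r"^(\d+)([smh])$")


def parse_delay_ms(value):
    """Convert an `after` value (int milliseconds or "<n><unit>") to milliseconds."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(value, bool):
        raise ValueError(f"unsupported delay: {value!r}")
    if isinstance(value, int):
        if value < 0:
            raise ValueError("delay must be non-negative")
        return value
    match = _DELAY_RE.match(str(value).strip())
    if match is None:
        raise ValueError(f"unsupported delay: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_MS[unit]
```

Normalizing every delay to one unit up front means a scheduler only ever has to deal with a single integer.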
Using Delayed Transitions
Delayed transitions require a scheduler adapter. Use AsyncioScheduler for zero-infrastructure scheduling:
from pystator import StateMachine, Orchestrator, AsyncioScheduler
from pystator.core.state_store import InMemoryStateStore
machine = StateMachine.from_yaml("order_fsm.yaml")
store = InMemoryStateStore()
scheduler = AsyncioScheduler()
orchestrator = Orchestrator(
    machine=machine,
    state_store=store,
    guards=guards,
    actions=actions,
    scheduler=scheduler,  # Enable delayed transitions
)
# Process events - delayed transitions are scheduled automatically
result = await orchestrator.async_process_event("order-123", "submit", context)
# If target state has delayed transitions, they'll be scheduled
# When the delay expires, the event fires automatically
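For intuition about what an in-process scheduler does, here is a minimal sketch built on asyncio's `loop.call_later`. `TinyScheduler` is a hypothetical class for illustration only; it is not `AsyncioScheduler` and makes no claim about that adapter's interface.

```python
import asyncio


class TinyScheduler:
    """Keep at most one pending timer per key (e.g. per entity and trigger)."""

    def __init__(self):
        self._handles = {}

    def schedule(self, key, delay_ms, callback):
        # Rescheduling the same key replaces the earlier timer.
        self.cancel(key)
        loop = asyncio.get_running_loop()
        self._handles[key] = loop.call_later(delay_ms / 1000, callback)

    def cancel(self, key):
        handle = self._handles.pop(key, None)
        if handle is not None:
            handle.cancel()


async def demo():
    fired = []
    scheduler = TinyScheduler()
    scheduler.schedule("order-1", 10, lambda: fired.append("timeout"))
    scheduler.schedule("order-2", 10, lambda: fired.append("expire"))
    # Leaving the source state before the delay elapses would cancel the timer.
    scheduler.cancel("order-2")
    await asyncio.sleep(0.1)
    return fired


fired = asyncio.run(demo())
```

Timers held this way live only in the current process, which is why an in-memory scheduler suits development and single-process apps rather than deployments that need persistence.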
Scheduler Adapters
| Adapter | Use Case | Infrastructure |
|---|---|---|
| AsyncioScheduler | Development, testing, single-process apps | None (in-memory) |
| RedisScheduler | Production, distributed, persistence needed | Redis server |
| CeleryScheduler | Production, task queue integration | Celery + broker |
# Zero infrastructure (asyncio)
from pystator.scheduler import AsyncioScheduler
scheduler = AsyncioScheduler()
# With Redis (production)
from pystator.scheduler import RedisScheduler
from redis.asyncio import Redis
redis = Redis.from_url("redis://localhost:6379")
scheduler = RedisScheduler(redis)
await scheduler.start() # Start polling loop
# With Celery
from pystator.scheduler import CeleryScheduler
from celery import Celery
celery_app = Celery('tasks', broker='redis://localhost:6379')
scheduler = CeleryScheduler(celery_app)
scheduler.register_task() # Register with Celery
Inline Guard Expressions
Define guards directly in YAML using the expr syntax. No Python code needed for simple conditions.
YAML Syntax
transitions:
  - trigger: fill
    source: open
    dest: filled
    guards:
      - expr: "fill_qty >= order_qty"  # Inline expression
      - expr: "price > 0 and symbol != ''"  # Multiple conditions
      - is_valid_order  # Named guard function
  - trigger: partial_fill
    source: open
    dest: partial
    guards:
      - expr: "fill_qty > 0 and fill_qty < order_qty"
Expression Features
Context variables are directly available in expressions:
# If context = {"qty": 100, "filled": 50, "price": 10.5}
guards:
  - expr: "qty > 0"  # True
  - expr: "filled < qty"  # True
  - expr: "price * qty < 2000"  # True (1050 < 2000)
  - expr: "status == 'active'"  # String comparison
  - expr: "item in items"  # List membership
  - expr: "not is_cancelled"  # Boolean negation
Supported operations:
- Comparisons: >, <, >=, <=, ==, !=
- Boolean: and, or, not
- Arithmetic: +, -, *, /
- Membership: in, not in
Safe builtins: Inline expressions can use a small allowlist of builtins: len, min, max, abs, sum. For example: len(positions) < max_positions works without precomputing position_count in context.
Note: Inline expressions require simpleeval. Install with: pip install pystator[recipes]
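To show how a restricted expression evaluator of this kind can work, here is a stand-alone sketch using only the standard library's `ast` module. It is not how PyStator or simpleeval is implemented; `eval_guard` and `_eval` are hypothetical names, and the sketch supports only the operators and builtins listed above.

```python
import ast
import operator

_BIN_OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
            ast.Mult: operator.mul, ast.Div: operator.truediv}
_CMP_OPS = {ast.Gt: operator.gt, ast.Lt: operator.lt, ast.GtE: operator.ge,
            ast.LtE: operator.le, ast.Eq: operator.eq, ast.NotEq: operator.ne,
            ast.In: lambda a, b: a in b, ast.NotIn: lambda a, b: a not in b}
_SAFE_FUNCS = {"len": len, "min": min, "max": max, "abs": abs, "sum": sum}


def eval_guard(expr, ctx):
    """Evaluate a guard expression against a flat context dict; returns a bool."""
    return bool(_eval(ast.parse(expr, mode="eval").body, ctx))


def _eval(node, ctx):
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Name):
        return ctx[node.id]  # unknown variables raise KeyError
    if isinstance(node, ast.BoolOp):
        values = (_eval(v, ctx) for v in node.values)  # lazy, so it short-circuits
        return all(values) if isinstance(node.op, ast.And) else any(values)
    if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
        return not _eval(node.operand, ctx)
    if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
        return _BIN_OPS[type(node.op)](_eval(node.left, ctx), _eval(node.right, ctx))
    if isinstance(node, ast.Compare):
        left = _eval(node.left, ctx)
        for op, comparator in zip(node.ops, node.comparators):
            if type(op) not in _CMP_OPS:
                raise ValueError(f"unsupported comparison: {type(op).__name__}")
            right = _eval(comparator, ctx)
            if not _CMP_OPS[type(op)](left, right):
                return False
            left = right  # supports chained comparisons such as 0 < x < 10
        return True
    if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
            and node.func.id in _SAFE_FUNCS and not node.keywords):
        return _SAFE_FUNCS[node.func.id](*(_eval(a, ctx) for a in node.args))
    # Anything else (attribute access, lambdas, other calls) is rejected.
    raise ValueError(f"unsupported expression element: {type(node).__name__}")


ctx = {"qty": 100, "filled": 50, "price": 10.5, "is_cancelled": False,
       "positions": ["AAPL", "MSFT"], "max_positions": 3}
```

The key design choice is an allowlist: every node type must be explicitly handled, so anything not listed fails with an error instead of being executed.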
Event Normalization
Optional meta.event_normalizer (e.g. "lower" or "upper") normalizes trigger and incoming event names at process time before matching. Config and application code can use UPPERCASE event names while the engine normalizes for comparison.
meta:
  event_normalizer: lower
transitions:
  - trigger: FILL
    source: open
    dest: filled
Calling either machine.process(state, "fill", context) or machine.process(state, "FILL", context) matches the transition.
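The matching rule can be sketched as follows. This is an illustration under assumptions, not PyStator's matcher: `make_normalizer` and `find_transition` are hypothetical names, and transitions are represented as plain dicts.

```python
def make_normalizer(mode):
    """Return a function that normalizes event names for the given mode."""
    if mode is None:
        return lambda name: name
    if mode == "lower":
        return str.lower
    if mode == "upper":
        return str.upper
    raise ValueError(f"unknown event_normalizer: {mode!r}")


def find_transition(transitions, state, event, mode=None):
    """Return the first transition whose source and normalized trigger match."""
    normalize = make_normalizer(mode)
    wanted = normalize(event)
    for t in transitions:
        # Both sides are normalized, so config and callers may differ in case.
        if t["source"] == state and normalize(t["trigger"]) == wanted:
            return t
    return None


TRANSITIONS = [{"trigger": "FILL", "source": "open", "dest": "filled"}]
```

Without a normalizer the comparison is case-sensitive, so "fill" would not match a trigger declared as FILL.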
Context for guards
Guard expressions see a flat namespace. Build it in your application before calling process, or use the helper from the recipes package:
from pystator.recipes import flatten_context_for_guards
# Nested context: add derived values for guards
ctx = flatten_context_for_guards(
    {"trading_context": {"positions": [...], "buying_power": 10000}},
    overrides={"position_count": 3},  # or len(trading_context["positions"])
)
result = machine.process(current_state, "signal", ctx)
We recommend building the flat dict in the application, or in the state store adapter that supplies the context.
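If you build the flat namespace yourself, one possible approach is sketched below. `flatten_one_level` is a hypothetical helper written for this example; it is not the recipes helper and may behave differently from it. It assumes one level of nesting and lets later keys and overrides win on name collisions.

```python
def flatten_one_level(nested, overrides=None):
    """Promote the keys of nested dicts to the top level, then apply overrides."""
    flat = {}
    for key, value in nested.items():
        if isinstance(value, dict):
            flat.update(value)  # inner keys become top-level guard variables
        else:
            flat[key] = value
    flat.update(overrides or {})  # derived values such as counts go here
    return flat


ctx = flatten_one_level(
    {
        "trading_context": {"positions": ["AAPL", "MSFT", "NVDA"], "buying_power": 10000},
        "symbol": "TSLA",
    },
    overrides={"position_count": 3},
)
```

A guard such as "position_count < 5 and buying_power > 0" can then refer to these names directly.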
Invoke (long-lived services)
Optional invoke on a state: list of service refs (id, src, optional on_done). The orchestrator starts services on enter and stops them on exit via an invoke adapter. When a service completes, the adapter emits the on_done event (e.g. the app calls process_event(entity_id, on_done)). Without an adapter, invoke is a no-op.
states:
  - name: monitoring
    on_enter: [start_metrics]
    invoke:
      - id: price_feed
        src: websocket
        on_done: feed_ended
Provide an InvokeAdapter when creating the orchestrator; implement start_services(entity_id, state_name, invoke_specs, context) and stop_services(entity_id, state_name). See orchestration.invoke.InvokeAdapter and NoOpInvokeAdapter.
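The two methods named above could be implemented along these lines. This is a sketch under assumptions: `RecordingInvokeAdapter` is a hypothetical class that does not inherit from the library's base class, whether the real methods are sync or async is not stated here, and invoke specs are assumed to be dicts with the `id`, `src`, and `on_done` keys from the YAML.

```python
class RecordingInvokeAdapter:
    """Track which services are running per (entity, state) without starting real ones."""

    def __init__(self):
        self.running = {}

    def start_services(self, entity_id, state_name, invoke_specs, context):
        # A real adapter would open connections or spawn tasks here.
        self.running[(entity_id, state_name)] = [spec["id"] for spec in invoke_specs]

    def stop_services(self, entity_id, state_name):
        # Return the stopped service ids so callers can log or verify them.
        return self.running.pop((entity_id, state_name), [])


adapter = RecordingInvokeAdapter()
adapter.start_services(
    "order-123",
    "monitoring",
    [{"id": "price_feed", "src": "websocket", "on_done": "feed_ended"}],
    context={},
)
started = dict(adapter.running)
stopped = adapter.stop_services("order-123", "monitoring")
```

Keying the bookkeeping by entity and state keeps stop calls idempotent: stopping a state that has no running services simply returns an empty list.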
Action Parameters
Pass configuration to actions directly from YAML using the params syntax. Actions receive parameters via the context. Parameterized entry/exit: on_enter and on_exit on states support the same shape as transition actions: a string (action name) or { name, params }.
YAML Syntax
transitions:
  - trigger: order_filled
    source: open
    dest: filled
    actions:
      - log_fill  # Simple action (no params)
      - name: send_notification  # Parameterized action
        params:
          channel: slack
          priority: high
      - name: update_metrics
        params:
          metric_name: fills
          increment: 1
Accessing Parameters
Parameters are injected into the context under _action_params:
from pystator.actions import ACTION_PARAMS_KEY
@actions.decorator()
def send_notification(context: dict) -> None:
    params = context.get(ACTION_PARAMS_KEY, {})
    channel = params.get("channel", "email")  # "slack"
    priority = params.get("priority", "normal")  # "high"
    notify(channel=channel, priority=priority, message=context["message"])
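The injection step can be pictured with a small stand-alone dispatcher. This is a sketch, not PyStator's executor: `run_actions` is a hypothetical function, the registry is a plain dict, and the key constant is redefined locally with the `_action_params` name mentioned above.

```python
ACTION_PARAMS_KEY = "_action_params"  # local stand-in for the library constant


def run_actions(specs, registry, context):
    """Call each action with a copy of the context that carries its own params."""
    for spec in specs:
        # A spec is either a bare action name or a {name, params} mapping.
        if isinstance(spec, str):
            name, params = spec, {}
        else:
            name, params = spec["name"], spec.get("params", {})
        # Copying avoids leaking one action's params into the next action.
        registry[name]({**context, ACTION_PARAMS_KEY: params})


calls = []
registry = {
    "log_fill": lambda ctx: calls.append(("log_fill", ctx[ACTION_PARAMS_KEY])),
    "send_notification": lambda ctx: calls.append(
        ("send_notification", ctx[ACTION_PARAMS_KEY]["channel"])
    ),
}
base_context = {"message": "Order filled"}
run_actions(
    ["log_fill", {"name": "send_notification", "params": {"channel": "slack", "priority": "high"}}],
    registry,
    base_context,
)
```

Because each action receives its own copy, the caller's context dict is left unchanged after dispatch.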
Async Support
PyStator supports async throughout: guards, actions, and processing.
- Async guards: Register async callables; they are awaited when you use machine.async_process() or machine.async_process_parallel(). Use for checks that call external services (e.g. buying power, permissions).
- Async actions: Register async callables; run them with executor.async_execute_* after persisting the transition. Use for side effects that call APIs, DBs, or message queues.
- Async processing: Use async_process() / async_process_parallel() when guards are async or you want non-blocking transition computation.
# Async guards (evaluated during async_process / async_process_parallel)
@guards.decorator()
async def check_buying_power(ctx: dict) -> bool:
    account = await broker.get_account()
    return account.buying_power >= ctx["order_value"]
# Async actions (run with executor.async_execute_* after persistence)
@actions.decorator()
async def update_position(ctx: dict) -> None:
    async with db.transaction():
        await db.update_position(ctx["symbol"], ctx["qty"])
# Async transition computation
result = await machine.async_process("OPEN", "execution_report", context)
# Async parallel state processing
config, results = await machine.async_process_parallel(config, event, context)
# Async action execution (parallel or phased)
execution = await executor.async_execute_parallel(result, context)
Execution Modes (Actions)
How actions are run after a transition (sync with execute(), async with async_execute_*):
| Mode | Description | Use Case |
|---|---|---|
| SEQUENTIAL | Actions run one after another | Default, safest |
| PARALLEL | All actions run concurrently | Independent actions, latency critical |
| PHASED | Exit → transition → enter in parallel phases | Respects state machine semantics |
from pystator.actions import ExecutionMode
executor = ActionExecutor(actions, default_mode=ExecutionMode.PHASED)
# Or specify per-call
result = await executor.async_execute_with_mode(
    transition_result, context, mode=ExecutionMode.PARALLEL
)
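The difference between the three modes can be sketched with plain asyncio. This is illustrative only and not the library's executor: `run_sequential`, `run_parallel`, `run_phased`, and `make_action` are hypothetical names.

```python
import asyncio


async def run_sequential(actions, ctx):
    """Await each action before starting the next."""
    for action in actions:
        await action(ctx)


async def run_parallel(actions, ctx):
    """Start all actions together and wait for all of them."""
    await asyncio.gather(*(action(ctx) for action in actions))


async def run_phased(exit_actions, transition_actions, enter_actions, ctx):
    """Run each phase concurrently, but keep the phases themselves in order."""
    for phase in (exit_actions, transition_actions, enter_actions):
        await run_parallel(phase, ctx)


def make_action(name, log):
    """Build a toy async action that records its name when it runs."""
    async def action(ctx):
        await asyncio.sleep(0)  # yield to the event loop like real I/O would
        log.append(name)
    return action


log = []
asyncio.run(
    run_phased(
        [make_action("stop_feed", log)],
        [make_action("update_positions", log)],
        [make_action("notify_ui", log), make_action("log_audit", log)],
        {},
    )
)
```

In the phased run the exit action always completes before the transition action, and both complete before any enter action starts; the relative order of the two enter actions is not guaranteed.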
The Sandwich Pattern
PyStator is designed around "Load → Decide → Commit → Act":
┌─────────────────────────────────────────────────────────────┐
│ 1. INGRESS: Receive event, normalize to trigger + payload │
├─────────────────────────────────────────────────────────────┤
│ 2. HYDRATION: Load current state from database │
├─────────────────────────────────────────────────────────────┤
│ 3. COMPUTE: machine.process() - pure, no side effects │
├─────────────────────────────────────────────────────────────┤
│ 4. PERSIST: Atomic DB transaction (state + audit trail) │
├─────────────────────────────────────────────────────────────┤
│ 5. EXECUTE: Run actions AFTER successful commit │
└─────────────────────────────────────────────────────────────┘
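The five steps can be sketched end to end with in-memory stand-ins. This is a simplified illustration, not PyStator's orchestrator: `Decision`, `decide`, and `handle_event` are hypothetical names, a dict plays the role of the database, and a real system would wrap step 4 in one transaction.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Decision:
    success: bool
    target_state: Optional[str]
    actions: Tuple[str, ...] = ()


# (state, trigger) -> (target state, action names)
TABLE = {("PENDING", "exchange_ack"): ("OPEN", ("update_order_id",))}


def decide(state, event):
    """Pure step: look up the transition with no I/O and no mutation."""
    hit = TABLE.get((state, event))
    if hit is None:
        return Decision(False, None)
    return Decision(True, hit[0], hit[1])


def handle_event(store, audit, order_id, event, run_action):
    state = store[order_id]                    # 2. hydration
    decision = decide(state, event)            # 3. compute
    if not decision.success:
        return decision                        # nothing persisted, nothing executed
    store[order_id] = decision.target_state    # 4. persist state ...
    audit.append((order_id, state, decision.target_state))  # ... and audit trail
    for name in decision.actions:              # 5. execute only after the commit
        run_action(name, order_id)
    return decision


store = {"order-123": "PENDING"}
audit = []
executed = []
accepted = handle_event(store, audit, "order-123", "exchange_ack",
                        lambda name, oid: executed.append(name))
rejected = handle_event(store, audit, "order-123", "unknown_event",
                        lambda name, oid: executed.append(name))
```

Because side effects run only after the state is committed, a rejected event leaves both the store and the outside world untouched.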
Configuration Schema
meta:
  version: "1.0.0"
  machine_name: "my_fsm"
  strict_mode: true
states:
  - name: STATE_NAME
    type: initial|stable|terminal|error|parallel
    parent: PARENT_STATE  # For hierarchical states
    initial_child: CHILD_STATE  # For compound states
    regions:  # For parallel states
      - name: region_name
        initial: initial_state
        states: [state1, state2]
    on_enter: [action1]
    on_exit: [action2]
    timeout:
      seconds: 5.0
      destination: TIMEOUT_STATE
transitions:
  - trigger: event_name
    source: STATE_A
    dest: STATE_B
    region: region_name  # For parallel state transitions
    after: 5000  # Delayed transition (milliseconds or "5s"/"5m"/"1h")
    guards:
      - named_guard  # Named guard function
      - expr: "qty > 0"  # Inline expression
    actions:
      - simple_action  # Simple action
      - name: parameterized  # Action with parameters
        params:
          key: value
error_policy:
  default_fallback: ERROR_STATE
  retry_attempts: 3
Examples
See the examples/ directory for complete examples:
- order_fsm.yaml / basic_usage.py: Simple order lifecycle
- day_trading_fsm.yaml / day_trading_example.py: Parallel states for trading
- portfolio_optimization_fsm.yaml / portfolio_optimization_example.py: Hierarchical states for workflows
REST API
With pip install pystator[api]:
uvicorn pystator.api.main:app --port 8000
| Endpoint | Method | Description |
|---|---|---|
| /health | GET | Health check |
| /api/v1/validate | POST | Validate FSM config |
| /api/v1/process | POST | Compute transition |
| /api/v1/machines | GET/POST | List/create machines |
| /api/v1/machines/{id} | GET/PUT/DELETE | CRUD operations |
API Reference
StateMachine
# Creation
machine = StateMachine.from_yaml("config.yaml")
machine = StateMachine.from_dict(config_dict)
# Processing
result = machine.process(current_state, event, context)
result = await machine.async_process(current_state, event, context)
# Parallel states
config = machine.enter_parallel_state("parallel_state")
config, results = machine.process_parallel(config, event, context)
config, results = await machine.async_process_parallel(config, event, context)
# Queries
machine.get_state("STATE")
machine.get_initial_state()
machine.get_available_transitions("STATE")
machine.is_parallel_state("STATE")
machine.validate_parallel_config(config)
TransitionResult
result.success # bool
result.source_state # str
result.target_state # str | None
result.trigger # str
result.all_actions # tuple[str, ...] (exit + transition + enter)
result.error # FSMError | None
ParallelStateConfig
config.parallel_state # str
config.region_states # dict[str, str]
config.get_region_state("trading") # str | None
config.is_in_state("scanning") # bool
config.get_all_active() # list[str]
config.to_string() # "active:trading=scanning,risk=normal"
Development
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Type checking
mypy src/
# Linting
ruff check .
ruff format .
License
MIT