A configuration-driven, stateless finite state machine library for Python
PyStator
Configuration-driven, stateless finite state machines for Python: define behavior in YAML, compute transitions, run guards and actions.
Quick start (2 minutes)
Install, define a tiny state machine, and process one event. Copy-paste into a new terminal:
pip install pystator
1. Save this as order_fsm.yaml:
meta:
  version: "1.0.0"
  machine_name: "order_management"
  strict_mode: true
states:
  - name: PENDING
    type: initial
  - name: OPEN
    type: stable
  - name: FILLED
    type: terminal
transitions:
  - trigger: exchange_ack
    source: PENDING
    dest: OPEN
  - trigger: fill
    source: OPEN
    dest: FILLED
2. Run this Python:
from pystator import StateMachine
machine = StateMachine.from_yaml("order_fsm.yaml")
# Pure computation: current state + event → next state
result = machine.process("PENDING", "exchange_ack", {})
print(result.success) # True
print(result.target_state) # OPEN
Then add guards (conditions) and actions (side effects), or use the REST API and UI with pip install pystator[api] (see Concepts and Documentation).
What is PyStator?
PyStator is a stateless finite state machine (FSM) library for Python. You define behavior in YAML or JSON; the engine computes transitions from (current state + event + context) and returns the next state and any actions to run. No internal state is held—ideal for APIs, workers, and distributed systems.
- Configuration-driven: Define states, transitions, guards, and actions in YAML/JSON with schema validation.
- Stateless: Pure computation—pass state in, get state and actions out; you persist state in your database.
- Hierarchical & parallel: Compound states, orthogonal regions, and statechart-style exit/enter semantics.
- Guards & actions: Conditional transitions (sync/async guards) and side effects executed after you persist the transition.
- Delayed transitions: Schedule transitions after a delay (asyncio, Redis, or Celery).
- Optional API & UI: REST API and web UI for validation, process, and machine CRUD (pip install pystator[api]).
Core vs optional: The core is the FSM library (StateMachine, EntitySession, guards, actions, stores, Orchestrator). The API, DB, UI, and schedulers are optional layers; see Package structure.
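To make the stateless model concrete, here is a minimal sketch in plain Python. It does not use PyStator and is not how the library is implemented; the TRANSITIONS table and the Result class are hypothetical stand-ins. It only shows the shape of the idea: the caller passes the current state in and gets the next state back, and the function stores nothing between calls.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical transition table: (current_state, trigger) -> next_state.
TRANSITIONS = {
    ("PENDING", "exchange_ack"): "OPEN",
    ("OPEN", "fill"): "FILLED",
}


@dataclass(frozen=True)
class Result:
    success: bool
    target_state: Optional[str]


def process(current_state: str, trigger: str) -> Result:
    """Pure function: same inputs give the same output; nothing is stored."""
    target = TRANSITIONS.get((current_state, trigger))
    return Result(success=target is not None, target_state=target)


# The caller owns the state and passes it in on every call.
state = "PENDING"
for event in ["exchange_ack", "fill"]:
    result = process(state, event)
    if result.success:
        state = result.target_state

print(state)  # FILLED
```

Because the function holds no state, any process or replica can evaluate the next event as long as it can read the entity's current state.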
Concepts
A short mental model so you know what to reach for.
| Concept | What it is | When you use it |
|---|---|---|
| State | A node in the graph: initial, stable, terminal, or parallel. | Define the possible states of your entity (e.g. order: PENDING, OPEN, FILLED). |
| Transition | A rule: from state(s), on trigger event, to state; optional guards and actions. | Define how events move the entity between states. |
| EntitySession | One stateful object per entity: holds current state and context; use send(trigger, **payload). | The main way to run a stateful machine in memory (e.g. one order per request). |
| Guard | A condition (sync or async) that must be true for the transition to fire. | Business rules (e.g. "full fill only if fill_qty >= order_qty"). |
| Action | A side effect (sync or async) run after you persist the new state. | Notifications, DB updates, messaging—never for transition logic. |
| Context | A dict passed into send() (as payload) or process(current_state, trigger, context). | Event payload, entity data, and anything guards/actions need. |
Flow from "just compute" to "full app":
Option A: In-memory (EntitySession or stateless)
YAML FSM → StateMachine.from_yaml() → machine.create(context=...) → instance.send(trigger, **payload)
Or: machine.process(state, event, context) with state you supply. State stays in memory unless you persist snapshot() yourself.
Option B: With persistence by entity ID
State store (DB/Redis) → Orchestrator → load state → process() → Persist → Execute actions
(Sandwich pattern). Use a [StateStore](docs/guides/state-stores.md) adapter.
Option C: With API & UI
pystator api → Validate configs, run process, manage machines via REST/UI
Start with Option A (Quick start above); use EntitySession when you want a stateful object per entity; add Orchestrator when you need persistence by entity ID; add API/UI when you want HTTP and a visual builder.
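The sandwich pattern in Option B (load, compute, persist, then run actions) can be sketched without the library. Everything below is a hypothetical stand-in: a dict plays the state store, TABLE plays the FSM definition, and the action names are made up. The point is only the ordering: the new state is written before any side effect runs.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical table: (state, trigger) -> (next_state, action names).
TABLE: Dict[Tuple[str, str], Tuple[str, List[str]]] = {
    ("PENDING", "exchange_ack"): ("OPEN", ["notify_open"]),
    ("OPEN", "fill"): ("FILLED", ["update_positions"]),
}

store: Dict[str, str] = {}  # stand-in for a DB or Redis
log: List[str] = []         # records the order in which things happen

ACTIONS: Dict[str, Callable[[dict], None]] = {
    "notify_open": lambda ctx: log.append("action:notify_open"),
    "update_positions": lambda ctx: log.append("action:update_positions"),
}


def handle_event(entity_id: str, trigger: str, ctx: dict) -> Optional[str]:
    state = store.get(entity_id, "PENDING")  # 1. load current state
    step = TABLE.get((state, trigger))       # 2. compute (pure lookup)
    if step is None:
        return None                          # no transition: nothing changes
    next_state, action_names = step
    store[entity_id] = next_state            # 3. persist first
    log.append(f"persist:{next_state}")
    for name in action_names:                # 4. then run side effects
        ACTIONS[name](ctx)
    return next_state


handle_event("order-123", "exchange_ack", {})
handle_event("order-123", "fill", {"fill_qty": 100})
print(store["order-123"])  # FILLED
print(log)
```

Persisting before running actions means a crash between the two steps leaves the stored state correct; the worst case is a missed or repeated action, which is what retry and idempotency handling are for.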
How PyStator runs (execution model)
PyStator is event-driven and stateless. You do not run a long-lived loop per FSM.
- Events drive transitions: When an event occurs (HTTP request, message from a queue, cron job, or a delayed-transition callback), you call process(current_state, trigger, context) or the Orchestrator’s async_process_event(entity_id, trigger, context) once. State is loaded from your store, the transition is computed, and you persist the new state and run actions.
- Scale by replicas: Run multiple copies of your API or worker (e.g. several pods). They share the same state store (database or Redis). Any replica can process any event for any entity—there is no “one pod per FSM.” See Deployment.
- Delayed transitions (after: "30s" in YAML) need a scheduler so that when the delay expires, something calls the orchestrator again. One process (or Redis/Celery) can track many delays for many entities and FSM types. See Choosing a scheduler below.
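When several replicas share one store, two of them can load the same entity at the same moment. The error list later in this README includes StaleVersionError for optimistic locking conflicts. The sketch below illustrates the general technique only; VersionedStore and StaleVersion are hypothetical names, not PyStator classes, and a real store would do the version check atomically in the database.

```python
class StaleVersion(Exception):
    """Raised when another writer updated the entity first (hypothetical name)."""


class VersionedStore:
    """In-memory stand-in for a shared table with a version column."""

    def __init__(self):
        self._rows = {}  # entity_id -> (state, version)

    def load(self, entity_id):
        return self._rows.get(entity_id, ("PENDING", 0))

    def save(self, entity_id, new_state, expected_version):
        _, current = self.load(entity_id)
        if current != expected_version:
            raise StaleVersion(
                f"{entity_id}: expected v{expected_version}, found v{current}"
            )
        self._rows[entity_id] = (new_state, current + 1)


store = VersionedStore()

# Two replicas read the same entity before either writes.
state_a, version_a = store.load("order-123")
state_b, version_b = store.load("order-123")

store.save("order-123", "OPEN", version_a)  # replica A wins
try:
    store.save("order-123", "OPEN", version_b)  # replica B is now stale
    conflict = False
except StaleVersion:
    conflict = True

print(conflict)                 # True
print(store.load("order-123"))  # ('OPEN', 1)
```

The losing replica would typically reload the entity and process the event again against the fresh state.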
Glossary:
| Term | Meaning |
|---|---|
| Machine (FSM definition) | The YAML/config: states and transitions. One per “type” (e.g. order_management). |
| Entity | The domain object whose state you model (e.g. order-123). Each entity has a current FSM state. |
| EntitySession | The in-process object for one entity: machine.create(context=...); use instance.send(trigger) to process events. |
| Replica | Another copy of your service (e.g. API pod). All replicas use the same state store and can process any entity. |
Choosing a scheduler
Only needed if your FSM uses delayed transitions (after: "5s" or after: 5000 in a transition).
| Need | Scheduler | Extra infra |
|---|---|---|
| Delayed transitions, single process or dev | AsyncioScheduler | None |
| Multiple replicas, HA, or survive restarts | RedisScheduler or CeleryScheduler | Redis or Celery (+ broker) |
With AsyncioScheduler, one pod can track delays for all entities and all FSM types; delays are lost if that process exits. With Redis or Celery, delays are stored externally so any healthy worker can fire them. See Schedulers and delayed transitions.
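The trade-off is easier to see with a toy in-process scheduler. This is a sketch of the general approach using asyncio tasks, not the AsyncioScheduler implementation; TinyScheduler, on_timeout, and the EXPIRED state are hypothetical. Pending delays exist only as tasks inside the running process, so they disappear when it exits.

```python
import asyncio


class TinyScheduler:
    """Keeps pending delays as asyncio tasks; nothing survives a process exit."""

    def __init__(self):
        self._tasks = set()

    def schedule(self, delay_s, callback, *args):
        async def _fire():
            await asyncio.sleep(delay_s)
            callback(*args)

        task = asyncio.create_task(_fire())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def close(self):
        pending = list(self._tasks)
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)


states = {"order-1": "OPEN"}


def on_timeout(entity_id):
    # A real system would call the orchestrator here instead.
    states[entity_id] = "EXPIRED"


async def main():
    scheduler = TinyScheduler()
    scheduler.schedule(0.01, on_timeout, "order-1")
    await asyncio.sleep(0.1)  # give the delay time to fire
    await scheduler.close()


asyncio.run(main())
print(states["order-1"])  # EXPIRED
```

An external scheduler replaces the in-memory task set with durable storage, which is why it survives restarts and works across replicas.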
Features
- Configuration-driven: YAML/JSON definitions with schema validation
- Stateless: Pure computation—no internal state
- Hierarchical states: Compound states, parent/child, LCA exit/enter
- Parallel states: Orthogonal regions—multiple active sub-states
- Delayed transitions: after: 5s or after: 5000 with pluggable schedulers (asyncio, Redis, Celery)
- Inline guards: expr: "fill_qty >= order_qty" in YAML (no Python for simple rules)
- Guards & actions: Sync and async; decorator-based registration
- Action parameters: Pass config from YAML into actions via params
- Timeouts: State-level timeout to a destination state
- Type-safe: Full type hints and PEP 561
- Retry & idempotency: Configurable retry, pluggable idempotency backends
- REST API & UI: Optional server and web UI for FSM validation and process
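For readers new to hierarchical states, "LCA exit/enter" refers to the usual statechart rule: on a transition, exit states from the source up to (but not including) the lowest common ancestor, then enter states from there down to the destination. The sketch below shows that rule in plain Python. The hierarchy and state names are hypothetical, self-transitions are not treated specially, and PyStator's own implementation may differ.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical hierarchy: child -> parent (None marks a top-level state).
PARENT: Dict[str, Optional[str]] = {
    "ACTIVE": None,
    "WORKING": "ACTIVE",
    "PARTIALLY_FILLED": "ACTIVE",
    "CLOSED": None,
}


def ancestors(state: str) -> List[str]:
    """Return [state, parent, grandparent, ...] up to the root."""
    chain: List[str] = []
    current: Optional[str] = state
    while current is not None:
        chain.append(current)
        current = PARENT[current]
    return chain


def exit_enter(source: str, dest: str) -> Tuple[List[str], List[str]]:
    """States to exit (innermost first) and to enter (outermost first)."""
    src_chain = ancestors(source)
    dst_chain = ancestors(dest)
    shared = set(src_chain) & set(dst_chain)  # common ancestors stay active
    exits = [s for s in src_chain if s not in shared]
    enters = [s for s in reversed(dst_chain) if s not in shared]
    return exits, enters


# Siblings under ACTIVE: the parent is neither exited nor re-entered.
print(exit_enter("WORKING", "PARTIALLY_FILLED"))  # (['WORKING'], ['PARTIALLY_FILLED'])
# Leaving the compound state exits the child first, then the parent.
print(exit_enter("WORKING", "CLOSED"))            # (['WORKING', 'ACTIVE'], ['CLOSED'])
```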
Installation
Core library
pip install pystator
With API and UI
pip install pystator[api]
Installs FastAPI, Uvicorn, and PyJWT for the REST API (and optional auth). The UI is served by the same server when you run pystator ui serve (requires a built UI; see below).
With UI (development)
To build and serve the Next.js UI from source:
pip install pystator[api,ui]
cd src/pystator/ui && npm install && npm run build
pystator ui serve # Serves UI + proxies API
From the project root you can also run pystator ui dev for hot-reload development.
Optional: recipes (inline guards)
For inline guard expressions in YAML (expr: "qty > 0"):
pip install pystator[recipes]
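To give a feel for what an inline guard expression does, here is a small sketch that evaluates a single comparison against a context dict using Python's ast module. It is an illustration only and says nothing about how the recipes extra evaluates expressions; the evaluate function and its restrictions (one comparison, names and constants only) are assumptions made for the example.

```python
import ast
import operator

# Comparison operators the sketch accepts.
_COMPARE = {
    ast.Gt: operator.gt,
    ast.GtE: operator.ge,
    ast.Lt: operator.lt,
    ast.LtE: operator.le,
    ast.Eq: operator.eq,
    ast.NotEq: operator.ne,
}


def _value(node, ctx):
    """Resolve a literal or a context key; reject everything else."""
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Name):
        return ctx[node.id]
    raise ValueError(f"unsupported expression element: {type(node).__name__}")


def evaluate(expr: str, ctx: dict) -> bool:
    """Evaluate one comparison such as 'fill_qty >= order_qty' without eval()."""
    tree = ast.parse(expr, mode="eval").body
    if not isinstance(tree, ast.Compare) or len(tree.ops) != 1:
        raise ValueError("only a single comparison is supported")
    op = _COMPARE.get(type(tree.ops[0]))
    if op is None:
        raise ValueError("unsupported operator")
    return bool(op(_value(tree.left, ctx), _value(tree.comparators[0], ctx)))


print(evaluate("fill_qty >= order_qty", {"fill_qty": 100, "order_qty": 100}))  # True
print(evaluate("qty > 0", {"qty": 0}))                                         # False
```

Walking the syntax tree instead of calling eval() keeps function calls and attribute access out of configuration files.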
Development
pip install -e ".[dev]"
Quick start (extended)
From a YAML file
from pystator import StateMachine
machine = StateMachine.from_yaml("order_fsm.yaml")
result = machine.process("PENDING", "exchange_ack", {})
From a dict
from pystator import StateMachine
config = {
    "meta": {"version": "1.0.0", "machine_name": "my_fsm", "strict_mode": True},
    "states": [
        {"name": "A", "type": "initial"},
        {"name": "B", "type": "stable"},
        {"name": "C", "type": "terminal"},
    ],
    "transitions": [
        {"trigger": "go", "source": "A", "dest": "B"},
        {"trigger": "done", "source": "B", "dest": "C"},
    ],
}
machine = StateMachine.from_dict(config)
result = machine.process("A", "go", {})
EntitySession — per-entity stateful wrapper
For per-entity stateful processing in memory (one instance per order, user, trade, etc.), use EntitySession:
from pystator import EntitySession, StateMachine
machine = StateMachine.from_yaml("order_fsm.yaml")
# Create a stateful instance for one entity (e.g. order-123)
order = machine.create(context={"order_id": "123", "order_qty": 100})
print(order.current_state) # PENDING
print(order.is_pending) # True — auto-generated per-state property
# Send events; payload merges into context
order.send("exchange_ack")
print(order.current_state) # OPEN
order.send("fill", fill_qty=100)
print(order.current_state) # FILLED
print(order.is_final) # True — entity has reached a terminal state
# Auto-generated trigger methods (same as send):
# order.exchange_ack()
# order.fill(fill_qty=100)
# Serialize/restore (e.g. to Redis or a DB)
snapshot = order.snapshot()
restored = EntitySession.from_snapshot(machine, snapshot)
EntitySession auto-generates instance.is_<state> properties and instance.<trigger>() methods for every state and trigger in your FSM. Import EntitySession from pystator to use from_snapshot.
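If you are curious how per-state properties and per-trigger methods can be generated, the sketch below shows one common Python technique, a __getattr__ fallback. TinySession is a hypothetical class written for this example; it is not EntitySession and PyStator may generate these attributes differently.

```python
class TinySession:
    """Toy stateful wrapper with generated is_<state> and <trigger>() attributes."""

    STATES = ("PENDING", "OPEN", "FILLED")
    TRANSITIONS = {
        ("PENDING", "exchange_ack"): "OPEN",
        ("OPEN", "fill"): "FILLED",
    }

    def __init__(self):
        self.current_state = "PENDING"
        self.context = {}

    def send(self, trigger, **payload):
        self.context.update(payload)  # payload merges into context
        target = self.TRANSITIONS.get((self.current_state, trigger))
        if target is None:
            return False
        self.current_state = target
        return True

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        if name.startswith("is_") and name[3:].upper() in self.STATES:
            return self.current_state == name[3:].upper()
        if name in {trigger for _, trigger in self.TRANSITIONS}:
            return lambda **payload: self.send(name, **payload)
        raise AttributeError(name)


order = TinySession()
print(order.is_pending)      # True
order.exchange_ack()         # same as order.send("exchange_ack")
print(order.is_open)         # True
order.fill(fill_qty=100)
print(order.current_state)   # FILLED
print(order.context)         # {'fill_qty': 100}
```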
With guards and actions
You can bind guards and actions in two ways: decorators on the machine, or registries plus bind_guards / bind_actions.
Option 1 — decorators:
from pystator import StateMachine
machine = StateMachine.from_yaml("order_fsm.yaml")
@machine.guard("is_full_fill")
def is_full_fill(ctx):
    return ctx.get("fill_qty", 0) >= ctx.get("order_qty", 1)

@machine.action("update_positions")
def update_positions(ctx):
    print("Positions updated")
# Then: machine.process(...) and ActionExecutor(machine.action_registry).execute(result, context) to run actions
Option 2 — registries:
from pystator import StateMachine, GuardRegistry, ActionRegistry
from pystator.actions import ActionExecutor
machine = StateMachine.from_yaml("order_fsm.yaml")
guards = GuardRegistry()
guards.register("is_full_fill", lambda ctx: ctx.get("fill_qty", 0) >= ctx.get("order_qty", 1))
machine.bind_guards(guards)
actions = ActionRegistry()
actions.register("update_positions", lambda ctx: print("Positions updated"))
executor = ActionExecutor(actions)
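# Assumes order_fsm.yaml defines an execution_report trigger from OPEN
# (the Quick start file above only defines exchange_ack and fill).
result = machine.process("OPEN", "execution_report", {"fill_qty": 100, "order_qty": 100})
if result.success:
    # 1. Persist state change to your DB
    # 2. Then run actions
    executor.execute(result, {"fill_qty": 100, "order_qty": 100})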
Orchestrator and delayed transitions
For persistence plus delayed transitions (after: "5s" in YAML), use the Orchestrator with a state store and a scheduler. The orchestrator runs the full loop: load state → process → persist → schedule delayed transitions → execute actions.
import asyncio
from pystator import StateMachine, Orchestrator, GuardRegistry, ActionRegistry
from pystator import InMemoryStateStore
from pystator.scheduler import AsyncioScheduler
machine = StateMachine.from_yaml("my_fsm.yaml") # has a transition with after: "2s"
store = InMemoryStateStore()
guards = GuardRegistry()
actions = ActionRegistry()
scheduler = AsyncioScheduler()
orchestrator = Orchestrator(
    machine=machine, state_store=store, guards=guards, actions=actions, scheduler=scheduler
)

async def main():
    await orchestrator.async_process_event("entity-1", "start", {})
    await asyncio.sleep(2.5)  # delayed transition fires
    await orchestrator.close()
asyncio.run(main())
No extra infrastructure: AsyncioScheduler keeps delays in memory. For multiple replicas or restarts, use RedisScheduler or CeleryScheduler.
Context validation (e.g. with PyCharter)
Pass a context_validator to the Orchestrator to validate context before any guards run. The canonical use case is integrating PyCharter data contracts:
from pystator import Orchestrator, ContextValidatorFn
from pycharter import Validator
validator = Validator.from_dir("contracts/order")
def order_context_validator(entity_id: str, state: str, trigger: str, ctx: dict):
    result = validator.validate_for_state(ctx, state)
    if result.is_valid:
        return True, []
    return False, [str(e) for e in result.errors]

orchestrator = Orchestrator(
    machine=machine,
    state_store=store,
    context_validator=order_context_validator,  # (entity_id, state, trigger, ctx) -> (ok, errors)
)
ContextValidatorFn is a type alias: Callable[[str, str, str, dict], tuple[bool, list[str]]]. The validator runs before guards; on failure the orchestrator returns a TransitionResult with success=False and metadata["reason"] == "context_validation".
Three context validation mechanisms exist and can be combined:
| Mechanism | Runs | Best for |
|---|---|---|
| context_validator | Before guards | Schema/contract validation (PyCharter) |
| machine.meta["validate_context"] | Before guards | Required-key checks (config-driven, no deps) |
| Guards | After both | Business-rule conditions ("can this transition fire?") |
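A context_validator does not have to depend on PyCharter. As a minimal sketch, the function below follows the (entity_id, state, trigger, ctx) -> (ok, errors) shape described above and checks for required keys per trigger. The REQUIRED_KEYS mapping and the error wording are hypothetical; adapt them to your own contexts.

```python
from typing import Dict, List, Tuple

# Hypothetical requirements: which context keys each trigger needs.
REQUIRED_KEYS: Dict[str, List[str]] = {
    "fill": ["fill_qty", "order_qty"],
}


def required_keys_validator(
    entity_id: str, state: str, trigger: str, ctx: dict
) -> Tuple[bool, List[str]]:
    """Return (ok, errors) in the shape a context validator is expected to use."""
    missing = [key for key in REQUIRED_KEYS.get(trigger, []) if key not in ctx]
    errors = [
        f"{entity_id}: missing '{key}' for trigger '{trigger}' in state {state}"
        for key in missing
    ]
    return (not errors, errors)


print(required_keys_validator("order-123", "PENDING", "exchange_ack", {}))
# (True, [])
print(required_keys_validator("order-123", "OPEN", "fill", {"fill_qty": 100}))
# (False, ["order-123: missing 'order_qty' for trigger 'fill' in state OPEN"])
```

A function like this would be passed as context_validator=required_keys_validator when constructing the Orchestrator.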
Hooks and metrics
Attach lifecycle hooks to the TransitionObserver to add logging, tracing, or custom metrics:
from pystator import LoggingHook, MetricsCollector, StateMachine, TransitionObserver
machine = StateMachine.from_yaml("order_fsm.yaml")
# Optional: attach hooks to a standalone observer (use at orchestrator/process level)
observer = TransitionObserver()
observer.add_hook(LoggingHook())
metrics = MetricsCollector()
observer.add_hook(metrics)
# ... process events ...
machine.process("PENDING", "exchange_ack", {})
summary = metrics.get_summary()
print(summary["total_transitions"]) # 1
print(summary["success_rate"]) # 1.0
print(summary["duration"]) # {"min_ms": ..., "avg_ms": ..., "p95_ms": ...}
To write a custom hook, implement the TransitionHook protocol (four methods: on_before_process, on_transition_start, on_transition_complete, on_transition_error).
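As a rough sketch of a custom hook, the class below defines the four method names listed above and counts how often each is called. The argument lists of these methods are not documented in this README, so the sketch accepts *args and **kwargs as an assumption; check the TransitionHook protocol for the real signatures before relying on specific parameters.

```python
from collections import Counter


class CountingHook:
    """Counts hook invocations; method names follow the four listed above."""

    def __init__(self):
        self.calls = Counter()

    # Signatures are an assumption: the real protocol defines the parameters.
    def on_before_process(self, *args, **kwargs):
        self.calls["before_process"] += 1

    def on_transition_start(self, *args, **kwargs):
        self.calls["transition_start"] += 1

    def on_transition_complete(self, *args, **kwargs):
        self.calls["transition_complete"] += 1

    def on_transition_error(self, *args, **kwargs):
        self.calls["transition_error"] += 1


# Simulate one successful transition by calling the methods directly.
hook = CountingHook()
hook.on_before_process()
hook.on_transition_start()
hook.on_transition_complete()
print(dict(hook.calls))
# {'before_process': 1, 'transition_start': 1, 'transition_complete': 1}
```

In real use you would register the instance with observer.add_hook(...) rather than calling the methods yourself.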
Error handling
All exceptions inherit from FSMError. Catch the specific subclass you care about:
from pystator import (
    FSMError,  # base — catch everything
    GuardRejectedError,  # guard returned False
    InvalidTransitionError, UndefinedTriggerError,
    TerminalStateError,  # trying to leave a terminal state
    StaleVersionError,  # optimistic locking conflict
    ErrorCode,  # enum: GUARD_REJECTED, VALIDATION_FAILED, TERMINAL_STATE, …
)

try:
    result = machine.process(current_state, trigger, context)
except GuardRejectedError as e:
    print(e.guard_name, e.current_state)  # which guard failed and in which state
except TerminalStateError:
    print("Entity already in a terminal state")
except FSMError as e:
    print(e.code)  # ErrorCode member, e.g. ErrorCode.INVALID_TRIGGER
TransitionResult (the non-exception path) carries the same information for soft failures:
result = machine.process(current_state, trigger, context)
if not result.success:
    print(result.error)  # FSMError or subclass
    print(result.metadata)  # {"reason": "guard_rejected", "guard_name": "is_full_fill", …}
Common pitfalls
- Guards vs actions: Use guards for pure logic (can this transition run?). Use actions for side effects (notify, persist to another system). Don’t put side effects in guards.
- AsyncioScheduler: Delays are in-memory; they are lost if the process exits. Use Redis or Celery for production or multiple replicas.
- State store: With Option B, the Orchestrator persists through the StateStore adapter you supply, and it does so before running actions. If you call process() directly instead, the library does not persist for you.
REST API
With pip install pystator[api]:
# Start API (default: http://localhost:8000)
pystator api
# or: uvicorn pystator.api.main:app --reload
# Optional: use pystator.cfg for database and auth (copy pystator.cfg.example to pystator.cfg)
| Endpoint | Method | Description |
|---|---|---|
| /health | GET | Health check |
| /api/v1/auth/me | GET | Current user (auth) |
| /api/v1/validate | POST | Validate FSM config |
| /api/v1/process | POST | Compute transition |
| /api/v1/machines | GET/POST | List/create machines |
| /api/v1/machines/{id} | GET/PUT/DELETE | CRUD machine |
API docs: http://localhost:8000/docs.
Documentation
- Quick start (detailed) — Step-by-step first FSM and first API call
- Concepts — States, transitions, guards, actions, hierarchical and parallel
- Architecture — Design goals, core flow, sandwich pattern, components
- Configuration — Config file, environment, database (for API)
- Worker — Continuous event-processing service: pystator worker, submit_event, worker_events
- Tutorials — Order workflow, API & UI, delayed transitions
- Examples — List of runnable examples with descriptions
- FSM config reference — Full YAML/JSON schema (meta, states, transitions, validation)
- API reference — StateMachine, Orchestrator, Worker, schedulers, execution modes
Examples and tutorials
Runnable examples live in the examples/ directory:
| Example | Description |
|---|---|
| basic_usage.py + order_fsm.yaml | Order lifecycle: load FSM, register guards/actions, process events |
| day_trading_example.py + day_trading_fsm.yaml | Parallel states (trading + risk monitor + data feed) |
| portfolio_optimization_example.py + portfolio_optimization_fsm.yaml | Hierarchical states and workflows |
See examples/README.md for how to run each. Tutorials in docs/tutorials/ walk through building an order workflow and using the API and UI.
API reference (condensed)
StateMachine
# Create
machine = StateMachine.from_yaml("config.yaml")
machine = StateMachine.from_dict(config_dict)
# Process (sync)
result = machine.process(current_state, trigger, context)
# Process (async, for async guards)
result = await machine.aprocess(current_state, trigger, context)
# Parallel states
config = machine.enter_parallel_state("parallel_state_name")
config, results = machine.process_parallel(config, event, context)
# Queries
machine.get_initial_state()
machine.get_available_transitions("STATE_NAME")
EntitySession
instance = machine.create(context={...}, initial_state=None)
instance.current_state # str
instance.context # dict
instance.is_<state> # bool — auto-generated (e.g. instance.is_pending)
instance.<trigger>(**payload) # auto-generated method (e.g. instance.confirm())
instance.send(trigger, **payload)
await instance.asend(trigger, **payload) # async
instance.allowed_triggers # list[str]
instance.is_final # bool — True if in a terminal state
snapshot = instance.snapshot()
instance2 = EntitySession.from_snapshot(machine, snapshot)
TransitionResult
result.success # bool
result.source_state # str
result.target_state # str | None
result.trigger # str
result.all_actions # tuple[str, ...] (exit + transition + enter)
result.error # FSMError | None
result.metadata # dict — e.g. {"reason": "guard_rejected", "guard_name": "..."}
Guards and actions
guards = GuardRegistry()
guards.register("name", lambda ctx: bool)
@guards.decorator("name")
def my_guard(ctx: dict) -> bool: ...
actions = ActionRegistry()
actions.register("name", lambda ctx: None)
@actions.decorator()
def my_action(ctx: dict) -> None: ...
machine.bind_guards(guards)
executor = ActionExecutor(actions)
executor.execute(transition_result, context)
# Async: await executor.async_execute_parallel(result, context)
Orchestrator
from pystator import Orchestrator, ContextValidatorFn
orchestrator = Orchestrator(
    machine=machine,
    state_store=store,
    guards=guards,
    actions=actions,
    context_validator=fn,  # ContextValidatorFn: (entity_id, state, trigger, ctx) -> (ok, errors)
    scheduler=scheduler,
    use_initial_state_when_missing=True,
)
result = orchestrator.process_event(entity_id, trigger, context)
result = await orchestrator.async_process_event(entity_id, trigger, context)
await orchestrator.close() # shut down scheduler
Hooks
from pystator import LoggingHook, MetricsCollector, TransitionHook, TransitionObserver
observer = TransitionObserver()
observer.add_hook(LoggingHook())
metrics = MetricsCollector()
observer.add_hook(metrics)
# Use observer at your process/orchestrator level; then metrics.get_summary()
# returns {"total_transitions", "success_rate", "duration": {"avg_ms", "p95_ms", ...}}
Error codes
from pystator import ErrorCode
# ErrorCode.GUARD_REJECTED, VALIDATION_FAILED, INVALID_TRIGGER, INVALID_STATE,
# TERMINAL_STATE, TIMEOUT, CONFIG, ACTION_FAILED, STALE_VERSION,
# GUARD_NOT_FOUND, ACTION_NOT_FOUND, UNDEFINED_STATE
Development
pip install -e ".[dev]"
pytest
mypy src/
ruff check . && ruff format .
License
MIT — see LICENSE.
Links
- Repository: GitHub
- Issues: GitHub Issues
- Documentation: docs/ — quick start, guides, tutorials, examples