Skip to main content

A configuration-driven, stateless finite state machine library for Python

Project description

PyStator

Configuration-driven, stateless finite state machines for Python: define behavior in YAML, compute transitions, run guards and actions.

Python 3.10+ License: MIT


Quick start (2 minutes)

Install, define a tiny state machine, and process one event. Copy-paste into a new terminal:

pip install pystator

1. Save this as order_fsm.yaml:

meta:
  version: "1.0.0"
  machine_name: "order_management"
  strict_mode: true

states:
  - name: PENDING
    type: initial
  - name: OPEN
    type: stable
  - name: FILLED
    type: terminal

transitions:
  - trigger: exchange_ack
    source: PENDING
    dest: OPEN
  - trigger: fill
    source: OPEN
    dest: FILLED

2. Run this Python:

from pystator import StateMachine

machine = StateMachine.from_yaml("order_fsm.yaml")

# Pure computation: current state + event → next state
result = machine.process("PENDING", "exchange_ack", {})

print(result.success)        # True
print(result.target_state)   # OPEN

Then add guards (conditions) and actions (side effects), or use the REST API and UI with pip install pystator[api] (see Concepts and Documentation).


What is PyStator?

PyStator is a stateless finite state machine (FSM) library for Python. You define behavior in YAML or JSON; the engine computes transitions from (current state + event + context) and returns the next state and any actions to run. No internal state is held—ideal for APIs, workers, and distributed systems.

  • Configuration-driven: Define states, transitions, guards, and actions in YAML/JSON with schema validation.
  • Stateless: Pure computation—pass state in, get state and actions out; you persist state in your database.
  • Hierarchical & parallel: Compound states, orthogonal regions, and statechart-style exit/enter semantics.
  • Guards & actions: Conditional transitions (sync/async guards) and side effects executed after you persist the transition.
  • Delayed transitions: Schedule transitions after a delay (asyncio, Redis, or Celery).
  • Optional API & UI: REST API and web UI for validation, process, and machine CRUD (pip install pystator[api]).

Core vs optional: The core is the FSM library (StateMachine, EntitySession, guards, actions, stores, Orchestrator). The API, DB, UI, and schedulers are optional layers; see Package structure.


Concepts

A short mental model so you know what to reach for.

Concept What it is When you use it
State A node in the graph: initial, stable, terminal, or parallel. Define the possible states of your entity (e.g. order: PENDING, OPEN, FILLED).
Transition A rule: from state(s), on trigger event, to state; optional guards and actions. Define how events move the entity between states.
EntitySession One stateful object per entity: holds current state and context; use send(trigger, **payload). The main way to run a stateful machine in memory (e.g. one order per request).
Guard A condition (sync or async) that must be true for the transition to fire. Business rules (e.g. "full fill only if fill_qty >= order_qty").
Action A side effect (sync or async) run after you persist the new state. Notifications, DB updates, messaging—never for transition logic.
Context A dict passed into send() (as payload) or process(current_state, trigger, context). Event payload, entity data, and anything guards/actions need.

Flow from "just compute" to "full app":

  Option A: In-memory (EntitySession or stateless)
  YAML FSM  →  StateMachine.from_yaml()  →  machine.create(context=...)  →  instance.send(trigger, **payload)
  Or: machine.process(state, event, context) with state you supply. State in memory or you persist snapshot() yourself.

  Option B: With persistence by entity ID
  State store (DB/Redis)  →  Orchestrator  →  load state  →  process()  →  Persist  →  Execute actions
  (Sandwich pattern). Use a [StateStore](docs/guides/state-stores.md) adapter.

  Option C: With API & UI
  pystator api  →  Validate configs, run process, manage machines via REST/UI

Start with Option A (Quick start above); use EntitySession when you want a stateful object per entity; add Orchestrator when you need persistence by entity ID; add API/UI when you want HTTP and a visual builder.

How PyStator runs (execution model)

PyStator is event-driven and stateless. You do not run a long-lived loop per FSM.

  • Events drive transitions: When an event occurs (HTTP request, message from a queue, cron job, or a delayed-transition callback), you call process(current_state, trigger, context) or the Orchestrator’s async_process_event(entity_id, trigger, context) once. State is loaded from your store, the transition is computed, and you persist the new state and run actions.
  • Scale by replicas: Run multiple copies of your API or worker (e.g. several pods). They share the same state store (database or Redis). Any replica can process any event for any entity—there is no “one pod per FSM.” See Deployment.
  • Delayed transitions (after: "30s" in YAML) need a scheduler so that when the delay expires, something calls the orchestrator again. One process (or Redis/Celery) can track many delays for many entities and FSM types. See Choosing a scheduler below.

Glossary:

Term Meaning
Machine (FSM definition) The YAML/config: states and transitions. One per “type” (e.g. order_management).
Entity The domain object whose state you model (e.g. order-123). Each entity has a current FSM state.
EntitySession The in-process object for one entity: machine.create(context=...); use instance.send(trigger) to process events.
Replica Another copy of your service (e.g. API pod). All replicas use the same state store and can process any entity.

Choosing a scheduler

Only needed if your FSM uses delayed transitions (after: "5s" or after: 5000 in a transition).

Need Scheduler Extra infra
Delayed transitions, single process or dev AsyncioScheduler None
Multiple replicas, HA, or survive restarts RedisScheduler or CeleryScheduler Redis or Celery (+ broker)

With AsyncioScheduler, one pod can track delays for all entities and all FSM types; delays are lost if that process exits. With Redis or Celery, delays are stored externally so any healthy worker can fire them. See Schedulers and delayed transitions.


Features

  • Configuration-driven: YAML/JSON definitions with schema validation
  • Stateless: Pure computation—no internal state
  • Hierarchical states: Compound states, parent/child, LCA exit/enter
  • Parallel states: Orthogonal regions—multiple active sub-states
  • Delayed transitions: after: 5s or after: 5000 with pluggable schedulers (asyncio, Redis, Celery)
  • Inline guards: expr: "fill_qty >= order_qty" in YAML (no Python for simple rules)
  • Guards & actions: Sync and async; decorator-based registration
  • Action parameters: Pass config from YAML into actions via params
  • Timeouts: State-level timeout to a destination state
  • Type-safe: Full type hints and PEP 561
  • Retry & idempotency: Configurable retry, pluggable idempotency backends
  • REST API & UI: Optional server and web UI for FSM validation and process

Installation

Core library

pip install pystator

With API and UI

pip install pystator[api]

Installs FastAPI, Uvicorn, and PyJWT for the REST API (and optional auth). The UI is served by the same server when you run pystator ui serve (requires a built UI; see below).

With UI (development)

To build and serve the Next.js UI from source:

pip install pystator[api,ui]
cd src/pystator/ui && npm install && npm run build
pystator ui serve   # Serves UI + proxies API

From the project root you can also run pystator ui dev for hot-reload development.

Optional: recipes (inline guards)

For inline guard expressions in YAML (expr: "qty > 0"):

pip install pystator[recipes]

Development

pip install -e ".[dev]"

Quick start (extended)

From a YAML file

from pystator import StateMachine

machine = StateMachine.from_yaml("order_fsm.yaml")
result = machine.process("PENDING", "exchange_ack", {})

From a dict

from pystator import StateMachine

config = {
    "meta": {"version": "1.0.0", "machine_name": "my_fsm", "strict_mode": True},
    "states": [
        {"name": "A", "type": "initial"},
        {"name": "B", "type": "stable"},
        {"name": "C", "type": "terminal"},
    ],
    "transitions": [
        {"trigger": "go", "source": "A", "dest": "B"},
        {"trigger": "done", "source": "B", "dest": "C"},
    ],
}
machine = StateMachine.from_dict(config)
result = machine.process("A", "go", {})

EntitySession — per-entity stateful wrapper

For per-entity stateful processing in memory (one instance per order, user, trade, etc.), use EntitySession:

from pystator import StateMachine

machine = StateMachine.from_yaml("order_fsm.yaml")

# Create a stateful instance for one entity (e.g. order-123)
order = machine.create(context={"order_id": "123", "order_qty": 100})

print(order.current_state)   # PENDING
print(order.is_pending)      # True  — auto-generated per-state property

# Send events; payload merges into context
order.send("exchange_ack")
print(order.current_state)   # OPEN

order.send("fill", fill_qty=100)
print(order.current_state)   # FILLED
print(order.is_final)        # True — entity has reached a terminal state

# Auto-generated trigger methods (same as send):
# order.exchange_ack()
# order.fill(fill_qty=100)

# Serialize/restore (e.g. to Redis or a DB)
snapshot = order.snapshot()
restored = EntitySession.from_snapshot(machine, snapshot)

EntitySession auto-generates instance.is_<state> properties and instance.<trigger>() methods for every state and trigger in your FSM. Import EntitySession from pystator to use from_snapshot.

With guards and actions

You can bind guards and actions in two ways: decorators on the machine, or registries plus bind_guards / bind_actions.

Option 1 — decorators:

from pystator import StateMachine

machine = StateMachine.from_yaml("order_fsm.yaml")

@machine.guard("is_full_fill")
def is_full_fill(ctx):
    return ctx.get("fill_qty", 0) >= ctx.get("order_qty", 1)

@machine.action("update_positions")
def update_positions(ctx):
    print("Positions updated")

# Then: machine.process(...) and ActionExecutor(machine.action_registry).execute(result, context) to run actions

Option 2 — registries:

from pystator import StateMachine, GuardRegistry, ActionRegistry
from pystator.actions import ActionExecutor

machine = StateMachine.from_yaml("order_fsm.yaml")

guards = GuardRegistry()
guards.register("is_full_fill", lambda ctx: ctx.get("fill_qty", 0) >= ctx.get("order_qty", 1))
machine.bind_guards(guards)

actions = ActionRegistry()
actions.register("update_positions", lambda ctx: print("Positions updated"))
executor = ActionExecutor(actions)

result = machine.process("OPEN", "execution_report", {"fill_qty": 100, "order_qty": 100})
if result.success:
    # 1. Persist state change to your DB
    # 2. Then run actions
    executor.execute(result, {"fill_qty": 100, "order_qty": 100})

Orchestrator and delayed transitions

For persistence plus delayed transitions (after: "5s" in YAML), use the Orchestrator with a state store and a scheduler. The orchestrator runs the full loop: load state → process → persist → schedule delayed transitions → execute actions.

import asyncio
from pystator import StateMachine, Orchestrator, GuardRegistry, ActionRegistry
from pystator import InMemoryStateStore
from pystator.scheduler import AsyncioScheduler

machine = StateMachine.from_yaml("my_fsm.yaml")  # has a transition with after: "2s"
store = InMemoryStateStore()
guards = GuardRegistry()
actions = ActionRegistry()
scheduler = AsyncioScheduler()

orchestrator = Orchestrator(
    machine=machine, state_store=store, guards=guards, actions=actions, scheduler=scheduler
)

async def main():
    await orchestrator.async_process_event("entity-1", "start", {})
    await asyncio.sleep(2.5)  # delayed transition fires
    await orchestrator.close()

asyncio.run(main())

No extra infrastructure: AsyncioScheduler keeps delays in memory. For multiple replicas or restarts, use RedisScheduler or CeleryScheduler.

Context validation (e.g. with PyCharter)

Pass a context_validator to the Orchestrator to validate context before any guards run. The canonical use case is integrating PyCharter data contracts:

from pystator import Orchestrator, ContextValidatorFn
from pycharter import Validator

validator = Validator.from_dir("contracts/order")

def order_context_validator(entity_id: str, state: str, trigger: str, ctx: dict):
    result = validator.validate_for_state(ctx, state)
    if result.is_valid:
        return True, []
    return False, [str(e) for e in result.errors]

orchestrator = Orchestrator(
    machine=machine,
    state_store=store,
    context_validator=order_context_validator,   # (entity_id, state, trigger, ctx) -> (ok, errors)
)

ContextValidatorFn is a type alias: Callable[[str, str, str, dict], tuple[bool, list[str]]]. The validator runs before guards; on failure the orchestrator returns a TransitionResult with success=False and metadata["reason"] == "context_validation".

Three context validation mechanisms exist and can be combined:

Mechanism Runs Best for
context_validator Before guards Schema/contract validation (PyCharter)
machine.meta["validate_context"] Before guards Required-key checks (config-driven, no deps)
Guards After both Business-rule conditions ("can this transition fire?")

Hooks and metrics

Attach lifecycle hooks to the TransitionObserver to add logging, tracing, or custom metrics:

from pystator import LoggingHook, MetricsCollector, StateMachine, TransitionObserver

machine = StateMachine.from_yaml("order_fsm.yaml")

# Optional: attach hooks to a standalone observer (use at orchestrator/process level)
observer = TransitionObserver()
observer.add_hook(LoggingHook())
metrics = MetricsCollector()
observer.add_hook(metrics)

# ... process events ...
machine.process("PENDING", "exchange_ack", {})

summary = metrics.get_summary()
print(summary["total_transitions"])   # 1
print(summary["success_rate"])        # 1.0
print(summary["duration"])            # {"min_ms": ..., "avg_ms": ..., "p95_ms": ...}

To write a custom hook implement the TransitionHook protocol (four methods: on_before_process, on_transition_start, on_transition_complete, on_transition_error).

Error handling

All exceptions inherit from FSMError. Catch the specific subclass you care about:

from pystator import (
    FSMError,             # base — catch everything
    GuardRejectedError,   # guard returned False
    InvalidTransitionError, UndefinedTriggerError,
    TerminalStateError,   # trying to leave a terminal state
    StaleVersionError,    # optimistic locking conflict
    ErrorCode,            # enum: GUARD_REJECTED, VALIDATION_FAILED, TERMINAL_STATE, …
)

try:
    result = machine.process(current_state, trigger, context)
except GuardRejectedError as e:
    print(e.guard_name, e.current_state)   # which guard failed and in which state
except TerminalStateError:
    print("Entity already in a terminal state")
except FSMError as e:
    print(e.code)   # ErrorCode member, e.g. ErrorCode.INVALID_TRIGGER

TransitionResult (the non-exception path) carries the same information for soft failures:

result = machine.process(current_state, trigger, context)
if not result.success:
    print(result.error)          # FSMError or subclass
    print(result.metadata)       # {"reason": "guard_rejected", "guard_name": "is_full_fill", …}

Common pitfalls

  • Guards vs actions: Use guards for pure logic (can this transition run?). Use actions for side effects (notify, persist to another system). Don’t put side effects in guards.
  • AsyncioScheduler: Delays are in-memory; they are lost if the process exits. Use Redis or Celery for production or multiple replicas.
  • State store: With Option B, you must implement a StateStore and persist before running actions; the library does not persist for you.

REST API

With pip install pystator[api]:

# Start API (default: http://localhost:8000)
pystator api
# or: uvicorn pystator.api.main:app --reload
# Optional: use pystator.cfg for database and auth (copy pystator.cfg.example to pystator.cfg)
Endpoint Method Description
/health GET Health check
/api/v1/auth/me GET Current user (auth)
/api/v1/validate POST Validate FSM config
/api/v1/process POST Compute transition
/api/v1/machines GET/POST List/create machines
/api/v1/machines/{id} GET/PUT/DELETE CRUD machine

API docs: http://localhost:8000/docs.


Documentation

  • Quick start (detailed) — Step-by-step first FSM and first API call
  • Concepts — States, transitions, guards, actions, hierarchical and parallel
  • Architecture — Design goals, core flow, sandwich pattern, components
  • Configuration — Config file, environment, database (for API)
  • Worker — Continuous event-processing service: pystator worker, submit_event, worker_events
  • Tutorials — Order workflow, API & UI, delayed transitions
  • Examples — List of runnable examples with descriptions
  • FSM config reference — Full YAML/JSON schema (meta, states, transitions, validation)
  • API reference — StateMachine, Orchestrator, Worker, schedulers, execution modes

Examples and tutorials

Runnable examples live in the examples/ directory:

Example Description
basic_usage.py + order_fsm.yaml Order lifecycle: load FSM, register guards/actions, process events
day_trading_example.py + day_trading_fsm.yaml Parallel states (trading + risk monitor + data feed)
portfolio_optimization_example.py + portfolio_optimization_fsm.yaml Hierarchical states and workflows

See examples/README.md for how to run each. Tutorials in docs/tutorials/ walk through building an order workflow and using the API and UI.


API reference (condensed)

StateMachine

# Create
machine = StateMachine.from_yaml("config.yaml")
machine = StateMachine.from_dict(config_dict)

# Process (sync)
result = machine.process(current_state, trigger, context)

# Process (async, for async guards)
result = await machine.aprocess(current_state, trigger, context)

# Parallel states
config = machine.enter_parallel_state("parallel_state_name")
config, results = machine.process_parallel(config, event, context)

# Queries
machine.get_initial_state()
machine.get_available_transitions("STATE_NAME")

EntitySession

instance = machine.create(context={...}, initial_state=None)
instance.current_state       # str
instance.context             # dict
instance.is_<state>          # bool — auto-generated (e.g. instance.is_pending)
instance.<trigger>(**payload) # auto-generated method (e.g. instance.confirm())
instance.send(trigger, **payload)
await instance.asend(trigger, **payload)  # async
instance.allowed_triggers    # list[str]
instance.is_final            # bool — True if in a terminal state
snapshot = instance.snapshot()
instance2 = EntitySession.from_snapshot(machine, snapshot)

TransitionResult

result.success          # bool
result.source_state     # str
result.target_state     # str | None
result.trigger          # str
result.all_actions      # tuple[str, ...]  (exit + transition + enter)
result.error            # FSMError | None
result.metadata         # dict — e.g. {"reason": "guard_rejected", "guard_name": "..."}

Guards and actions

guards = GuardRegistry()
guards.register("name", lambda ctx: bool)
@guards.decorator("name")
def my_guard(ctx: dict) -> bool: ...

actions = ActionRegistry()
actions.register("name", lambda ctx: None)
@actions.decorator()
def my_action(ctx: dict) -> None: ...

machine.bind_guards(guards)
executor = ActionExecutor(actions)
executor.execute(transition_result, context)
# Async: await executor.async_execute_parallel(result, context)

Orchestrator

from pystator import Orchestrator, ContextValidatorFn

orchestrator = Orchestrator(
    machine=machine,
    state_store=store,
    guards=guards,
    actions=actions,
    context_validator=fn,    # ContextValidatorFn: (entity_id, state, trigger, ctx) -> (ok, errors)
    scheduler=scheduler,
    use_initial_state_when_missing=True,
)
result = orchestrator.process_event(entity_id, trigger, context)
result = await orchestrator.async_process_event(entity_id, trigger, context)
await orchestrator.close()  # shut down scheduler

Hooks

from pystator import LoggingHook, MetricsCollector, TransitionHook, TransitionObserver

observer = TransitionObserver()
observer.add_hook(LoggingHook())
metrics = MetricsCollector()
observer.add_hook(metrics)
# Use observer at your process/orchestrator level; then metrics.get_summary()
# returns {"total_transitions", "success_rate", "duration": {"avg_ms", "p95_ms", ...}}

Error codes

from pystator import ErrorCode
# ErrorCode.GUARD_REJECTED, VALIDATION_FAILED, INVALID_TRIGGER, INVALID_STATE,
# TERMINAL_STATE, TIMEOUT, CONFIG, ACTION_FAILED, STALE_VERSION,
# GUARD_NOT_FOUND, ACTION_NOT_FOUND, UNDEFINED_STATE

Development

pip install -e ".[dev]"
pytest
mypy src/
ruff check . && ruff format .

License

MIT — see LICENSE.


Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pystator-0.0.10.tar.gz (1.2 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pystator-0.0.10-py3-none-any.whl (1.4 MB view details)

Uploaded Python 3

File details

Details for the file pystator-0.0.10.tar.gz.

File metadata

  • Download URL: pystator-0.0.10.tar.gz
  • Upload date:
  • Size: 1.2 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pystator-0.0.10.tar.gz
Algorithm Hash digest
SHA256 ab2bc9937fad380cd039a315cf2b266f2f90871cc3f0f91a531dc70067dcdd92
MD5 a01a76cb3ae94d56128571e7cf78b928
BLAKE2b-256 1fabd48dcca55c52e9be72d53ec162ef42c9cf301ef1ab6003dbc7b89d4bbe5d

See more details on using hashes here.

Provenance

The following attestation bundles were made for pystator-0.0.10.tar.gz:

Publisher: publish.yml on auscheng/pystator

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pystator-0.0.10-py3-none-any.whl.

File metadata

  • Download URL: pystator-0.0.10-py3-none-any.whl
  • Upload date:
  • Size: 1.4 MB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pystator-0.0.10-py3-none-any.whl
Algorithm Hash digest
SHA256 511e2407329544734e91dce7c9071e74909770dcdc607d1f2c5a703b59cf2c2f
MD5 9eede12ca195e6e188d02af5dbb2a6de
BLAKE2b-256 64cce36e349fec6c73350f2a58e329fc88045c5eee5549d59a99c9c5b0adb13c

See more details on using hashes here.

Provenance

The following attestation bundles were made for pystator-0.0.10-py3-none-any.whl:

Publisher: publish.yml on auscheng/pystator

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page