A configuration-driven, stateless finite state machine library for Python

PyStator

A configuration-driven, stateless finite state machine library for Python with support for hierarchical states, parallel states (orthogonal regions), and async execution.

PyStator defines behavioral contracts through YAML/JSON specifications and computes state transitions without holding internal state. It is designed for high-integrity systems such as order management, trading workflows, and distributed applications.

Features

  • Configuration-Driven: Define state machines in YAML/JSON with schema validation
  • Stateless Design: Pure computation—takes state in, returns state/actions out
  • Hierarchical States: Compound states with parent/child; transitions from parent match any sub-state; exit/enter chains follow LCA (see Hierarchical States)
  • Parallel States: Orthogonal regions—each region is an independent sub-machine; events can trigger region-scoped transitions (see Parallel States)
  • Delayed Transitions: Schedule transitions to fire after a delay with pluggable schedulers (see Delayed Transitions)
  • Inline Guard Expressions: Define guards directly in YAML without Python code (see Inline Guard Expressions)
  • Action Parameters: Pass configuration to actions from YAML (see Action Parameters)
  • Async Actions: Register async callables and run them with sequential, parallel, or phased execution (see Async Support and Execution Modes (Actions))
  • Guards: Conditional transitions based on runtime context (sync and async)
  • Actions/Hooks: Entry/exit hooks and transition actions
  • Timeouts/TTL: Automatic transitions after configurable durations
  • Type-Safe: Full type hints and PEP 561 compliance
  • Retry Mechanism: Configurable retry with exponential backoff
  • Idempotency: Pluggable backends for duplicate detection
  • Visualization: Generate diagrams from FSM definitions
  • Scheduler Adapters: Pluggable backends for delayed transitions (asyncio, Redis, Celery)
  • Optional API: REST API via pip install pystator[api]

Installation

# Core library
pip install pystator

# With API server
pip install pystator[api]

# Development
pip install -e ".[dev]"

Quick Start

1. Define Your State Machine (YAML)

# order_fsm.yaml
meta:
  version: "1.0.0"
  machine_name: "order_management"
  strict_mode: true

states:
  - name: PENDING
    type: initial
    timeout:
      seconds: 5.0
      destination: TIMED_OUT

  - name: OPEN
    type: stable
    on_enter:
      - notify_ui
      - log_audit

  - name: FILLED
    type: terminal

transitions:
  - trigger: exchange_ack
    source: PENDING
    dest: OPEN
    actions:
      - update_order_id

  - trigger: execution_report
    source: OPEN
    dest: FILLED
    guards:
      - is_full_fill
    actions:
      - update_positions

2. Use the State Machine

from pystator import StateMachine, GuardRegistry, ActionRegistry, Event
from pystator.actions import ActionExecutor

# Load FSM
machine = StateMachine.from_yaml("order_fsm.yaml")

# Register guards and actions
guards = GuardRegistry()
guards.register("is_full_fill", lambda ctx: ctx["fill_qty"] >= ctx["order_qty"])
machine.bind_guards(guards)

actions = ActionRegistry()
actions.register("update_order_id", lambda ctx: print(f"Order ID: {ctx['order_id']}"))
actions.register("update_positions", lambda ctx: print("Positions updated"))

executor = ActionExecutor(actions)

# Process event (pure computation)
context = {"order_id": "order-123", "fill_qty": 100, "order_qty": 100}
result = machine.process(
    current_state="OPEN",
    event="execution_report",
    context=context,
)

# Execute actions after persistence
# (`db` stands for your own persistence layer; it is not part of PyStator)
if result.success:
    db.update_state(context["order_id"], result.target_state)
    executor.execute(result, context)

Core Concepts

States

States represent nodes in the state machine graph:

from pystator import State, StateType, Timeout

state = State(
    name="PENDING",
    type=StateType.INITIAL,  # initial, stable, terminal, error, parallel
    on_enter=("log_entry",),
    on_exit=("log_exit",),
    timeout=Timeout(seconds=5.0, destination="TIMED_OUT"),
)

Transitions

from pystator import Transition

transition = Transition(
    trigger="execution_report",
    source=frozenset({"OPEN", "PARTIALLY_FILLED"}),
    dest="FILLED",
    guards=("is_full_fill",),
    actions=("update_positions",),
)

Guards

Guards are pure functions that control whether transitions are allowed:

from pystator import GuardRegistry

guards = GuardRegistry()

@guards.decorator("has_buying_power")
def has_buying_power(ctx: dict) -> bool:
    return ctx["buying_power"] >= ctx["order_value"]

# Async guards
@guards.decorator("check_external")
async def check_external(ctx: dict) -> bool:
    account = await broker.get_account()
    return account.buying_power >= ctx["order_value"]

Actions

Actions handle side effects and are executed after state persistence. You can register sync or async callables; both take a single context dict (event payload, entity state, etc.).

Async actions: Register async callables with ActionRegistry and run them with executor.async_execute_*. Use Execution Modes (sequential, parallel, phased) to control how actions are run.

from pystator import ActionRegistry
from pystator.actions import ActionExecutor, ExecutionMode

actions = ActionRegistry()

@actions.decorator()
def send_notification(ctx: dict) -> None:
    email_service.send(ctx["user_email"], "Order updated")

@actions.decorator()
async def update_position(ctx: dict) -> None:
    await db.update_position(ctx["symbol"], ctx["quantity"])

executor = ActionExecutor(actions, default_mode=ExecutionMode.PHASED)

# Sync execution (sequential by default)
result = executor.execute(transition_result, context)

# Async execution: parallel (all at once) or phased (exit → transition → enter)
result = await executor.async_execute_parallel(transition_result, context)
result = await executor.async_execute_phased(transition_result, context)

Hierarchical States (Statecharts)

States can be nested with parent/child relationships. The machine resolves to a single active leaf: compound states specify an initial_child, and the engine follows that chain until it reaches a leaf. Transitions are matched by ancestry—a transition whose source is a parent applies when the current state is that parent or any descendant.

Exit/enter order: When transitioning between two leaves, the engine computes the lowest common ancestor (LCA). Exit actions run from the current leaf up to (but not including) the LCA; enter actions run from the LCA down to the target leaf. This preserves statechart semantics.

states:
  - name: active
    type: stable
    initial_child: active.scanning
    on_enter: [start_feed]
    on_exit: [stop_feed]
    
  - name: active.scanning
    parent: active
    type: stable
    
  - name: active.analyzing
    parent: active
    type: stable
    
  - name: halted
    type: terminal

transitions:
  # Transition between siblings
  - trigger: signal_detected
    source: active.scanning
    dest: active.analyzing
    
  # Transition from parent applies to any child
  - trigger: emergency_stop
    source: active
    dest: halted

When processing emergency_stop from active.scanning, the transition succeeds because active is an ancestor of the current leaf. get_initial_state() returns the resolved leaf (e.g. active.scanning), not the compound root.
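The ancestry matching and exit/enter ordering described above can be sketched without the library. This is a minimal standalone illustration, not PyStator's implementation: the PARENT map and the helper names are hypothetical, and it assumes the LCA itself is neither exited nor entered, which is the usual statechart reading.

```python
# Parent links for the states in the YAML above (None marks a root state).
PARENT = {
    "active": None,
    "active.scanning": "active",
    "active.analyzing": "active",
    "halted": None,
}


def ancestors(state: str) -> list:
    """Return [state, parent, grandparent, ...] up to the root."""
    chain = []
    current = state
    while current is not None:
        chain.append(current)
        current = PARENT[current]
    return chain


def matches_source(current_leaf: str, source: str) -> bool:
    """A transition applies if its source is the leaf or any ancestor of it."""
    return source in ancestors(current_leaf)


def exit_enter_chains(current_leaf: str, target_leaf: str):
    """Split the path between two leaves at their lowest common ancestor."""
    up = ancestors(current_leaf)
    down = ancestors(target_leaf)
    common = set(up) & set(down)  # the LCA and everything above it
    exits = [s for s in up if s not in common]               # leaf first, moving up
    enters = [s for s in reversed(down) if s not in common]  # outermost first
    return exits, enters
```

With this map, moving between the two siblings exits only active.scanning and enters only active.analyzing, so the start_feed/stop_feed hooks on active would not run. Moving to halted exits both active.scanning and active.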

Parallel States (Orthogonal Regions)

Parallel states contain orthogonal regions—each region is an independent sub-machine with its own current state. All regions are active at once; an event can trigger a transition in one or more regions via region-scoped transitions (region: region_name). Use parallel states when you need concurrent behaviors (e.g. trading workflow + risk monitor + data feed).

Each region has an initial state and a states list (the valid states in that region). You can define region states as separate state entries (with parent: parallel_state_name) to attach on_enter/on_exit hooks.

states:
  - name: active
    type: parallel
    regions:
      - name: trading
        initial: scanning
        states: [scanning, analyzing, executing]
        
      - name: risk_monitor
        initial: normal
        states: [normal, elevated, critical]
        
      - name: data_feed
        initial: connecting
        states: [connecting, connected, failed]

  # Optional: define region states for on_enter/on_exit
  - name: scanning
    type: stable
    parent: active
    on_enter: [start_scanners]
    
  - name: normal
    type: stable
    parent: active
    on_enter: [reset_risk_alerts]

transitions:
  # Region-scoped: only this region is considered
  - trigger: signal_detected
    source: scanning
    dest: analyzing
    region: trading
    
  - trigger: risk_warning
    source: normal
    dest: elevated
    region: risk_monitor

Using Parallel States

machine = StateMachine.from_yaml("trading_fsm.yaml")

# Enter parallel state - initializes all regions
config = machine.enter_parallel_state("active")
# config.region_states = {"trading": "scanning", "risk_monitor": "normal", "data_feed": "connecting"}

# Process events that affect specific regions
config, results = machine.process_parallel(config, "signal_detected", context)
# Trading region: scanning -> analyzing

config, results = machine.process_parallel(config, "risk_warning", context)
# Risk region: normal -> elevated (if guards pass)

# Each result contains actions for that region transition
for result in results:
    await executor.async_execute_phased(result, context)

# Exit parallel state
exit_actions = machine.get_parallel_exit_actions(config)
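To make the region model concrete, here is a small standalone sketch of region-scoped transitions over a plain dict of region states. It does not use PyStator; TRANSITIONS and step_regions are hypothetical names, and guards and actions are left out.

```python
# Region-scoped transitions from the YAML above: (region, trigger, source) -> dest.
TRANSITIONS = {
    ("trading", "signal_detected", "scanning"): "analyzing",
    ("risk_monitor", "risk_warning", "normal"): "elevated",
}


def step_regions(region_states: dict, trigger: str):
    """Apply `trigger` to every region independently.

    Returns the new region map and a list of (region, source, dest)
    tuples for the regions that actually moved.
    """
    new_states = dict(region_states)  # never mutate the caller's snapshot
    moved = []
    for region, current in region_states.items():
        dest = TRANSITIONS.get((region, trigger, current))
        if dest is not None:
            new_states[region] = dest
            moved.append((region, current, dest))
    return new_states, moved


initial = {"trading": "scanning", "risk_monitor": "normal", "data_feed": "connecting"}
after_signal, moved = step_regions(initial, "signal_detected")
```

Returning a new map instead of updating in place mirrors the stateless style of the rest of the library: the caller decides when the new configuration is persisted.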

Delayed Transitions

Schedule transitions to fire automatically after a specified delay using the after field. This is useful for timeouts, retries, and scheduled state changes.

YAML Syntax

transitions:
  - trigger: timeout
    source: waiting
    dest: retry
    after: 5000        # 5 seconds (milliseconds)
  
  - trigger: expire
    source: pending
    dest: cancelled
    after: "30m"       # 30 minutes (string with unit)
  
  - trigger: daily_close
    source: active
    dest: closed
    after: "1h"        # 1 hour

Delay formats:

  • Integer: milliseconds (e.g., 5000 = 5 seconds)
  • String with unit: "5s" (seconds), "30m" (minutes), "1h" (hours)
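The two delay formats can be normalized with a few lines of Python. This is a hedged sketch of one way to do it, not the loader's actual code: delay_to_seconds is a hypothetical helper, and it accepts only whole numbers with the s, m, and h units listed above.

```python
import re
from typing import Union

_UNIT_SECONDS = {"s": 1.0, "m": 60.0, "h": 3600.0}
_DELAY_RE = re.compile(r"^(\d+)([smh])$")


def delay_to_seconds(after: Union[int, str]) -> float:
    """Convert an `after` value to seconds.

    Integers are read as milliseconds; strings carry a unit suffix.
    """
    if isinstance(after, bool):
        # bool is a subclass of int, so reject it explicitly
        raise TypeError("after must be an int or a string, not a bool")
    if isinstance(after, int):
        return after / 1000.0
    match = _DELAY_RE.match(after.strip())
    if match is None:
        raise ValueError(f"unsupported delay format: {after!r}")
    value, unit = match.groups()
    return int(value) * _UNIT_SECONDS[unit]
```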

Implicit trigger: When a transition specifies after and dest (and optionally guards) but no trigger of its own, you can omit trigger; the loader assigns a synthetic trigger so the orchestrator can schedule the transition. A delayed transition with an implicit trigger must have exactly one source state.

Using Delayed Transitions

Delayed transitions require a scheduler adapter. Use AsyncioScheduler for zero-infrastructure scheduling:

from pystator import StateMachine, Orchestrator, AsyncioScheduler
from pystator.core.state_store import InMemoryStateStore

machine = StateMachine.from_yaml("order_fsm.yaml")
store = InMemoryStateStore()
scheduler = AsyncioScheduler()

orchestrator = Orchestrator(
    machine=machine,
    state_store=store,
    guards=guards,
    actions=actions,
    scheduler=scheduler,  # Enable delayed transitions
)

# Process events - delayed transitions are scheduled automatically
result = await orchestrator.async_process_event("order-123", "submit", context)
# If target state has delayed transitions, they'll be scheduled
# When the delay expires, the event fires automatically

Scheduler Adapters

| Adapter | Use Case | Infrastructure |
|---|---|---|
| AsyncioScheduler | Development, testing, single-process apps | None (in-memory) |
| RedisScheduler | Production, distributed, persistence needed | Redis server |
| CeleryScheduler | Production, task queue integration | Celery + broker |

# Zero infrastructure (asyncio)
from pystator.scheduler import AsyncioScheduler
scheduler = AsyncioScheduler()

# With Redis (production)
from pystator.scheduler import RedisScheduler
from redis.asyncio import Redis
redis = Redis.from_url("redis://localhost:6379")
scheduler = RedisScheduler(redis)
await scheduler.start()  # Start polling loop

# With Celery
from pystator.scheduler import CeleryScheduler
from celery import Celery
celery_app = Celery('tasks', broker='redis://localhost:6379')
scheduler = CeleryScheduler(celery_app)
scheduler.register_task()  # Register with Celery

Inline Guard Expressions

Define guards directly in YAML using the expr syntax. No Python code needed for simple conditions.

YAML Syntax

transitions:
  - trigger: fill
    source: open
    dest: filled
    guards:
      - expr: "fill_qty >= order_qty"           # Inline expression
      - expr: "price > 0 and symbol != ''"      # Multiple conditions
      - is_valid_order                           # Named guard function
  
  - trigger: partial_fill
    source: open
    dest: partial
    guards:
      - expr: "fill_qty > 0 and fill_qty < order_qty"

Expression Features

Context variables are directly available in expressions:

# If context = {"qty": 100, "filled": 50, "price": 10.5}
guards:
  - expr: "qty > 0"              # True
  - expr: "filled < qty"         # True
  - expr: "price * qty < 2000"   # True (1050 < 2000)
  - expr: "status == 'active'"   # String comparison
  - expr: "item in items"        # List membership
  - expr: "not is_cancelled"     # Boolean negation

Supported operations:

  • Comparisons: >, <, >=, <=, ==, !=
  • Boolean: and, or, not
  • Arithmetic: +, -, *, /
  • Membership: in, not in

Safe builtins: Inline expressions can use a small allowlist of builtins: len, min, max, abs, sum. For example: len(positions) < max_positions works without precomputing position_count in context.

Note: Inline expressions require simpleeval. Install with: pip install pystator[recipes]
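For intuition about what an allowlisted expression evaluator does, the sketch below walks a Python AST with the standard library only. It is a stand-in written for illustration and is neither simpleeval nor PyStator's code; eval_guard and the internal tables are hypothetical names. It supports just the operations and builtins listed above and rejects everything else.

```python
import ast
import operator

_BINARY = {ast.Add: operator.add, ast.Sub: operator.sub,
           ast.Mult: operator.mul, ast.Div: operator.truediv}
_COMPARE = {
    ast.Gt: operator.gt, ast.Lt: operator.lt, ast.GtE: operator.ge,
    ast.LtE: operator.le, ast.Eq: operator.eq, ast.NotEq: operator.ne,
    ast.In: lambda a, b: a in b, ast.NotIn: lambda a, b: a not in b,
}
_BUILTINS = {"len": len, "min": min, "max": max, "abs": abs, "sum": sum}


def eval_guard(expr: str, ctx: dict) -> bool:
    """Evaluate a guard expression against a flat context dict."""
    return bool(_eval(ast.parse(expr, mode="eval").body, ctx))


def _eval(node, ctx):
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Name):
        return ctx[node.id]  # KeyError if the variable is not in the context
    if isinstance(node, ast.BoolOp):
        values = (_eval(v, ctx) for v in node.values)  # lazy, so and/or short-circuit
        return all(values) if isinstance(node.op, ast.And) else any(values)
    if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
        return not _eval(node.operand, ctx)
    if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
        return -_eval(node.operand, ctx)
    if isinstance(node, ast.BinOp) and type(node.op) in _BINARY:
        return _BINARY[type(node.op)](_eval(node.left, ctx), _eval(node.right, ctx))
    if isinstance(node, ast.Compare):
        left = _eval(node.left, ctx)
        for op, right_node in zip(node.ops, node.comparators):
            if type(op) not in _COMPARE:
                raise ValueError(f"unsupported comparison: {type(op).__name__}")
            right = _eval(right_node, ctx)
            if not _COMPARE[type(op)](left, right):
                return False
            left = right  # supports chained comparisons such as 0 < x < 10
        return True
    if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
            and node.func.id in _BUILTINS and not node.keywords):
        return _BUILTINS[node.func.id](*(_eval(a, ctx) for a in node.args))
    # Attribute access, subscripts, lambdas, other calls, etc. are all rejected.
    raise ValueError(f"unsupported expression element: {type(node).__name__}")
```

The point of the design is that anything not explicitly listed fails closed: an expression such as __import__('os') raises ValueError instead of running.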

Event Normalization

The optional meta.event_normalizer setting (e.g. "lower" or "upper") normalizes both configured triggers and incoming event names at process time, before matching. Config and application code can therefore use UPPERCASE event names while the engine compares the normalized forms.

meta:
  event_normalizer: lower

transitions:
  - trigger: FILL
    source: open
    dest: filled

Both machine.process(state, "fill", context) and machine.process(state, "FILL", context) match the transition.
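The matching rule can be pictured as normalizing both sides before comparing them. The following is an illustrative sketch only; normalize and trigger_matches are hypothetical helpers, and the modes handled are just the two named above.

```python
from typing import Optional


def normalize(name: str, mode: Optional[str]) -> str:
    """Apply the configured normalizer; None leaves the name untouched."""
    if mode is None:
        return name
    if mode == "lower":
        return name.lower()
    if mode == "upper":
        return name.upper()
    raise ValueError(f"unknown event_normalizer: {mode!r}")


def trigger_matches(config_trigger: str, incoming_event: str, mode: Optional[str]) -> bool:
    """Compare the configured trigger and the incoming event after normalization."""
    return normalize(config_trigger, mode) == normalize(incoming_event, mode)
```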

Context for guards

Guard expressions see a flat namespace. Build it in your application before calling process, or use the helper from the recipes package:

from pystator.recipes import flatten_context_for_guards

# Nested context: add derived values for guards
ctx = flatten_context_for_guards(
    {"trading_context": {"positions": [...], "buying_power": 10000}},
    overrides={"position_count": 3},  # or len(trading_context["positions"])
)
result = machine.process(current_state, "signal", ctx)

We recommend building the flat dict in the application, or in the state store adapter that supplies the context.

Invoke (long-lived services)

A state can declare an optional invoke list of service references, each with an id, a src, and an optional on_done event. The orchestrator starts these services when the state is entered and stops them when it is exited, delegating both to an invoke adapter. When a service completes, the adapter emits the on_done event (e.g. the app calls process_event(entity_id, on_done)). Without an adapter, invoke is a no-op.

states:
  - name: monitoring
    on_enter: [start_metrics]
    invoke:
      - id: price_feed
        src: websocket
        on_done: feed_ended

Provide an InvokeAdapter when creating the orchestrator; implement start_services(entity_id, state_name, invoke_specs, context) and stop_services(entity_id, state_name). See orchestration.invoke.InvokeAdapter and NoOpInvokeAdapter.
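As a rough illustration of the adapter shape, the class below implements the two methods named above and only records what would be running. It is a sketch under assumptions: it does not subclass the real InvokeAdapter, it treats each invoke spec as a dict with id, src, and on_done keys, and it uses plain synchronous methods. Check the library for the actual base class and spec type.

```python
class RecordingInvokeAdapter:
    """Tracks which services are 'running' for each entity and state."""

    def __init__(self) -> None:
        # (entity_id, state_name) -> list of service ids started for that state
        self.running = {}

    def start_services(self, entity_id, state_name, invoke_specs, context) -> None:
        # A real adapter would open connections or spawn tasks here.
        ids = [spec["id"] for spec in invoke_specs]
        self.running[(entity_id, state_name)] = ids

    def stop_services(self, entity_id, state_name) -> None:
        # Stopping a state that has no services is treated as a no-op.
        self.running.pop((entity_id, state_name), None)
```

An adapter like this is handy in tests, where you want to assert that entering monitoring starts price_feed without opening a real websocket.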

Action Parameters

Pass configuration to actions directly from YAML using the params syntax. Actions receive parameters via the context. Parameterized entry/exit: on_enter and on_exit on states support the same shape as transition actions: a string (action name) or { name, params }.

YAML Syntax

transitions:
  - trigger: order_filled
    source: open
    dest: filled
    actions:
      - log_fill                    # Simple action (no params)
      - name: send_notification     # Parameterized action
        params:
          channel: slack
          priority: high
      - name: update_metrics
        params:
          metric_name: fills
          increment: 1

Accessing Parameters

Parameters are injected into the context under _action_params:

from pystator.actions import ACTION_PARAMS_KEY

@actions.decorator()
def send_notification(context: dict) -> None:
    params = context.get(ACTION_PARAMS_KEY, {})
    channel = params.get("channel", "email")  # "slack"
    priority = params.get("priority", "normal")  # "high"
    
    notify(channel=channel, priority=priority, message=context["message"])
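How the parameters might reach the action can be sketched as a small dispatcher that copies the context and adds the params under the _action_params key. This is an assumption about the mechanism, not the executor's real code; run_action and the plain-dict registry are hypothetical.

```python
ACTION_PARAMS_KEY = "_action_params"  # key name taken from the text above


def run_action(action, registry: dict, context: dict):
    """Run one action spec: either "name" or {"name": ..., "params": {...}}."""
    if isinstance(action, str):
        name, params = action, {}
    else:
        name, params = action["name"], action.get("params", {})
    # Copy the context so parameters for one action never leak into the next.
    call_context = {**context, ACTION_PARAMS_KEY: params}
    return registry[name](call_context)
```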

Async Support

PyStator supports async throughout: guards, actions, and processing.

  • Async guards: Register async callables; they are awaited when you use machine.async_process() or machine.async_process_parallel(). Use for checks that call external services (e.g. buying power, permissions).
  • Async actions: Register async callables; run them with executor.async_execute_* after persisting the transition. Use for side effects that call APIs, DBs, or message queues.
  • Async processing: Use async_process() / async_process_parallel() when guards are async or you want non-blocking transition computation.

# Async guards (evaluated during async_process / async_process_parallel)
@guards.decorator()
async def check_buying_power(ctx: dict) -> bool:
    account = await broker.get_account()
    return account.buying_power >= ctx["order_value"]

# Async actions (run with executor.async_execute_* after persistence)
@actions.decorator()
async def update_position(ctx: dict) -> None:
    async with db.transaction():
        await db.update_position(ctx["symbol"], ctx["qty"])

# Async transition computation
result = await machine.async_process("OPEN", "execution_report", context)

# Async parallel state processing
config, results = await machine.async_process_parallel(config, event, context)

# Async action execution (parallel or phased)
execution = await executor.async_execute_parallel(result, context)

Execution Modes (Actions)

How actions are run after a transition (sync with execute(), async with async_execute_*):

| Mode | Description | Use Case |
|---|---|---|
| SEQUENTIAL | Actions run one after another | Default, safest |
| PARALLEL | All actions run concurrently | Independent actions, latency critical |
| PHASED | Exit → transition → enter in parallel phases | Respects state machine semantics |

from pystator.actions import ActionExecutor, ExecutionMode

executor = ActionExecutor(actions, default_mode=ExecutionMode.PHASED)

# Or specify per-call
result = await executor.async_execute_with_mode(
    transition_result, context, mode=ExecutionMode.PARALLEL
)
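The difference between the three modes is easiest to see with plain asyncio. The sketch below is a standalone illustration of the scheduling idea, not the ActionExecutor implementation; all function names are hypothetical, and error handling is omitted.

```python
import asyncio


async def run_sequential(actions, ctx):
    """Await each action in order; the next starts only after the previous ends."""
    for action in actions:
        await action(ctx)


async def run_parallel(actions, ctx):
    """Start every action at once and wait for all of them."""
    await asyncio.gather(*(action(ctx) for action in actions))


async def run_phased(exit_actions, transition_actions, enter_actions, ctx):
    """Run exit, transition, and enter as three ordered phases;
    actions inside one phase run concurrently."""
    for phase in (exit_actions, transition_actions, enter_actions):
        await run_parallel(phase, ctx)


def make_action(name, log):
    """Build a demo action that records when it starts and ends."""
    async def action(ctx):
        log.append(f"{name}:start")
        await asyncio.sleep(0)  # yield control so concurrent actions can interleave
        log.append(f"{name}:end")
    return action
```

Running two demo actions through run_sequential logs each start/end pair back to back, while run_parallel lets the second action start before the first one finishes. run_phased keeps the exit, transition, enter ordering even though each phase is concurrent internally.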

The Sandwich Pattern

PyStator is designed around "Load → Decide → Commit → Act":

┌─────────────────────────────────────────────────────────────┐
│ 1. INGRESS: Receive event, normalize to trigger + payload   │
├─────────────────────────────────────────────────────────────┤
│ 2. HYDRATION: Load current state from database              │
├─────────────────────────────────────────────────────────────┤
│ 3. COMPUTE: machine.process() - pure, no side effects       │
├─────────────────────────────────────────────────────────────┤
│ 4. PERSIST: Atomic DB transaction (state + audit trail)     │
├─────────────────────────────────────────────────────────────┤
│ 5. EXECUTE: Run actions AFTER successful commit             │
└─────────────────────────────────────────────────────────────┘
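Steps 2 through 5 can be sketched end to end with a toy transition table and an in-memory dict in place of the database. This is a simplified illustration, not PyStator code: TABLE, decide, and handle_event are hypothetical names, and a real system would wrap step 4 in a database transaction.

```python
# Toy stand-in for machine.process(): (state, trigger) -> (target, actions)
TABLE = {
    ("PENDING", "exchange_ack"): ("OPEN", ["update_order_id"]),
    ("OPEN", "execution_report"): ("FILLED", ["update_positions"]),
}


def decide(state: str, trigger: str):
    """Pure step: no I/O, the same inputs always give the same answer."""
    return TABLE.get((state, trigger))


def handle_event(store: dict, entity_id: str, trigger: str, run_action) -> bool:
    state = store[entity_id]            # 2. hydration
    decision = decide(state, trigger)   # 3. compute
    if decision is None:
        return False                    # rejected: nothing persisted, nothing run
    target, actions = decision
    store[entity_id] = target           # 4. persist
    for name in actions:                # 5. execute only after the commit
        run_action(name)
    return True
```

Because side effects happen only after the state write, a failed commit never leaves an action executed for a transition that was not recorded.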

Configuration Schema

meta:
  version: "1.0.0"
  machine_name: "my_fsm"
  strict_mode: true

states:
  - name: STATE_NAME
    type: initial|stable|terminal|error|parallel
    parent: PARENT_STATE        # For hierarchical states
    initial_child: CHILD_STATE  # For compound states
    regions:                    # For parallel states
      - name: region_name
        initial: initial_state
        states: [state1, state2]
    on_enter: [action1]
    on_exit: [action2]
    timeout:
      seconds: 5.0
      destination: TIMEOUT_STATE

transitions:
  - trigger: event_name
    source: STATE_A
    dest: STATE_B
    region: region_name         # For parallel state transitions
    after: 5000                 # Delayed transition (milliseconds or "5s"/"5m"/"1h")
    guards:
      - named_guard             # Named guard function
      - expr: "qty > 0"         # Inline expression
    actions:
      - simple_action           # Simple action
      - name: parameterized     # Action with parameters
        params:
          key: value

error_policy:
  default_fallback: ERROR_STATE
  retry_attempts: 3

Examples

See the examples/ directory for complete examples:

  • order_fsm.yaml / basic_usage.py: Simple order lifecycle
  • day_trading_fsm.yaml / day_trading_example.py: Parallel states for trading
  • portfolio_optimization_fsm.yaml / portfolio_optimization_example.py: Hierarchical states for workflows

REST API

With pip install pystator[api]:

uvicorn pystator.api.main:app --port 8000

| Endpoint | Method | Description |
|---|---|---|
| /health | GET | Health check |
| /api/v1/validate | POST | Validate FSM config |
| /api/v1/process | POST | Compute transition |
| /api/v1/machines | GET/POST | List/create machines |
| /api/v1/machines/{id} | GET/PUT/DELETE | CRUD operations |

API Reference

StateMachine

# Creation
machine = StateMachine.from_yaml("config.yaml")
machine = StateMachine.from_dict(config_dict)

# Processing
result = machine.process(current_state, event, context)
result = await machine.async_process(current_state, event, context)

# Parallel states
config = machine.enter_parallel_state("parallel_state")
config, results = machine.process_parallel(config, event, context)
config, results = await machine.async_process_parallel(config, event, context)

# Queries
machine.get_state("STATE")
machine.get_initial_state()
machine.get_available_transitions("STATE")
machine.is_parallel_state("STATE")
machine.validate_parallel_config(config)

TransitionResult

result.success           # bool
result.source_state      # str
result.target_state      # str | None
result.trigger           # str
result.all_actions       # tuple[str, ...] (exit + transition + enter)
result.error             # FSMError | None

ParallelStateConfig

config.parallel_state       # str
config.region_states        # dict[str, str]
config.get_region_state("trading")  # str | None
config.is_in_state("scanning")      # bool
config.get_all_active()             # list[str]
config.to_string()                  # "active:trading=scanning,risk=normal"

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Type checking
mypy src/

# Linting
ruff check .
ruff format .

License

MIT
