Declarative state machine framework for machine apps
Project description
vention-state-machine
A lightweight wrapper around transitions for building async-safe, recoverable hierarchical state machines with minimal boilerplate.
Table of Contents
- ✨ Features
- 🧠 Concepts & Overview
- ⚙️ Installation & Setup
- 🚀 Quickstart Tutorial
- 🛠 How-to Guides
- 🧩 Extension Hooks
- 📖 API Reference
- 🔍 Troubleshooting & FAQ
✨ Features
- Built-in
ready/faultstates - Global transitions:
to_fault,reset - Optional state recovery (
recover__state) - Async task spawning and cancellation
- Timeouts and auto-fault handling
- Transition history recording with timestamps + durations
- Guard conditions for blocking transitions
- Global state change callbacks for logging/MQTT
- Optional RPC bundle for exposing state machine via Connect RPCs
- Extension hooks loaded from a folder for deployment-specific customization without forking
🧠 Concepts & Overview
This library uses a declarative domain-specific language (DSL) to define state machines in a readable, strongly typed way.
- State → A leaf node in the state machine
- StateGroup → Groups related states, creating hierarchical namespaces
- Trigger → Named events that initiate transitions
Example:
class MyStates(StateGroup):
idle: State = State()
working: State = State()
class Triggers:
begin = Trigger("begin")
finish = Trigger("finish")
TRANSITIONS = [
Triggers.finish.transition(MyStates.working, MyStates.idle),
]
Base States and Triggers
All machines include:
States:
ready(initial)fault(global error)
Triggers:
start,to_fault,reset
from state_machine.core import BaseStates, BaseTriggers
state_machine.trigger(BaseTriggers.RESET.value)
assert state_machine.state == BaseStates.READY.value
⚙️ Installation & Setup
pip install vention-state-machine
Optional dependencies:
- Graphviz (required for diagram generation)
- vention-communication (for RPC bundle integration)
Install optional tools:
MacOS:
brew install graphviz
pip install vention-communication
Linux (Debian/Ubuntu)
sudo apt-get install graphviz
pip install vention-communication
🚀 Quickstart Tutorial
1. Define States and Triggers
from state_machine.defs import StateGroup, State, Trigger
class Running(StateGroup):
picking: State = State()
placing: State = State()
homing: State = State()
class States:
running = Running()
class Triggers:
start = Trigger("start")
finished_picking = Trigger("finished_picking")
finished_placing = Trigger("finished_placing")
finished_homing = Trigger("finished_homing")
to_fault = Trigger("to_fault")
reset = Trigger("reset")
2. Define Transitions
TRANSITIONS = [
Triggers.start.transition("ready", States.running.picking),
Triggers.finished_picking.transition(States.running.picking, States.running.placing),
Triggers.finished_placing.transition(States.running.placing, States.running.homing),
Triggers.finished_homing.transition(States.running.homing, States.running.picking),
]
3. Implement Your State Machine
from state_machine.core import StateMachine
from state_machine.decorators import on_enter_state, auto_timeout, guard, on_state_change
class CustomMachine(StateMachine):
def __init__(self):
super().__init__(states=States, transitions=TRANSITIONS)
@on_enter_state(States.running.picking)
@auto_timeout(5.0, Triggers.to_fault)
def enter_picking(self, _):
print("🔹 Entering picking")
@on_enter_state(States.running.placing)
def enter_placing(self, _):
print("🔸 Entering placing")
@on_enter_state(States.running.homing)
def enter_homing(self, _):
print("🔺 Entering homing")
@guard(Triggers.reset)
def check_safety_conditions(self) -> bool:
return not self.estop_pressed
@on_state_change
def publish_state_to_mqtt(self, old_state: str, new_state: str, trigger: str):
mqtt_client.publish("machine/state", {
"old_state": old_state,
"new_state": new_state,
"trigger": trigger
})
4. Start It
start() is async — it awaits any @on_runtime_start hook handlers and then schedules @register_background_task coroutines. Even when you have no hooks, you still need to await it.
import asyncio
from state_machine.core import StateMachine
async def main():
state_machine = StateMachine()
await state_machine.start()
# ... run ...
await state_machine.stop_background_tasks() # cancels @register_background_task tasks only
asyncio.run(main())
If you don't use the hook system at all, state_machine.trigger("start") is a sync drop-in that fires the initial transition without the lifecycle dispatch.
🛠 How-to Guides
Expose Over RPC with VentionApp
import asyncio
from communication.app import VentionApp
from state_machine.vention_communication import build_state_machine_bundle
from state_machine.core import StateMachine
async def setup():
state_machine = StateMachine(...)
await state_machine.start()
app = VentionApp(name="MyApp")
bundle = build_state_machine_bundle(state_machine)
app.register_rpc_plugin(bundle)
app.finalize()
return state_machine, app
state_machine, app = asyncio.run(setup())
RPC Actions:
GetState→ Returns current state and last known stateGetHistory→ Returns transition history with timestampsTrigger_<TriggerName>→ Triggers a state transition (e.g.,Trigger_Start,Trigger_Activate)
Options:
# Customize which actions are included
bundle = build_state_machine_bundle(
state_machine,
include_state_actions=True, # Include GetState
include_history_action=True, # Include GetHistory
triggers=["start", "activate"], # Only include specific triggers
)
Timeout Example
@auto_timeout(5.0, Triggers.to_fault)
def enter_state(self, _):
...
Recovery Example
state_machine = StateMachine(enable_last_state_recovery=True)
await state_machine.start() # will attempt recover__{last_state}
Triggering state transitions via I/O
Here's an example of hooking up state transitions to I/O events via MQTT
import asyncio
import paho.mqtt.client as mqtt
from state_machine.core import StateMachine
from state_machine.defs import State, StateGroup, Trigger
from state_machine.decorators import on_enter_state
class MachineStates(StateGroup):
idle: State = State()
running: State = State()
class States:
machine = MachineStates()
class Triggers:
start_button = Trigger("start_button")
box_missing = Trigger("box_missing")
TRANSITIONS = [
Triggers.start_button.transition(States.machine.idle, States.machine.running),
Triggers.box_missing.transition(States.machine.running, States.machine.idle),
]
class MachineController(StateMachine):
def __init__(self):
super().__init__(states=States, transitions=TRANSITIONS)
self.mqtt_client = mqtt.Client()
self.setup_mqtt()
def setup_mqtt(self):
"""Configure MQTT client to listen for I/O signals."""
self.mqtt_client.on_connect = self.on_mqtt_connect
self.mqtt_client.on_message = self.on_mqtt_message
self.mqtt_client.connect("localhost", 1883, 60)
# Start MQTT loop in background
self.spawn(self.mqtt_loop())
async def mqtt_loop(self):
"""Background task to handle MQTT messages."""
self.mqtt_client.loop_start()
while True:
await asyncio.sleep(0.1)
def on_mqtt_connect(self, client, userdata, flags, rc):
"""Subscribe to I/O topics when connected."""
client.subscribe("machine/io/start_button")
client.subscribe("machine/sensors/box_sensor")
def on_mqtt_message(self, client, userdata, msg):
"""Handle incoming MQTT messages and trigger state transitions."""
topic = msg.topic
payload = msg.payload.decode()
# Map MQTT topics to state machine triggers
if topic == "machine/io/start_button" and payload == "pressed":
self.trigger(Triggers.start_button.value)
elif topic == "machine/sensors/box_sensor" and payload == "0":
self.trigger(Triggers.box_missing.value)
@on_enter_state(States.machine.running)
def enter_running(self, _):
print("🔧 Machine started - processing parts")
self.mqtt_client.publish("machine/status", "running")
@on_enter_state(States.machine.idle)
def enter_idle(self, _):
print("⏸️ Machine idle - ready for start")
self.mqtt_client.publish("machine/status", "idle")
🧩 Extension Hooks
Extension hooks let deployment-specific code attach to an existing state machine without forking the consumer's source. Handler files live in a folder on disk, self-register via decorators when imported, and are wired into the machine by a single load_hooks(...) call at boot. Once loaded they participate in the same guard / on-enter / on-exit / lifecycle machinery that class-level decorators use.
The library exposes the hook surface from state_machine.hooks. Consumers (palletizer, sanding, future controllers) own the integration root they hand to handlers via a context subclass.
When to reach for hooks
Use class-level decorators (@on_enter_state, @guard, etc.) for behavior the consuming application owns: anything that ships with the state machine and is the same for every deployment.
Use hooks for behavior a deployment adds on top: a barcode scan during validation for one customer, a tower-light renderer for another, a CSV logger that follows the firmware across over-the-air updates. Hooks live outside the consumer's source tree, persist across firmware updates, and never require touching application code.
add_* and replace_* exist so a deployment can either compose with class-level behavior or take over from it. See Composition rules below.
Decorators
All eight decorators are exported from state_machine.hooks. Each handler takes a single HookContext argument (or your subclass of it).
State entry and exit — sync or async. Async hooks are scheduled on the running event loop via machine.spawn(...) and do not block the transition. If no event loop is running, the coroutine is closed and a warning is logged.
from state_machine.hooks import (
add_on_enter_state, replace_on_enter_state,
add_on_exit_state, replace_on_exit_state,
)
@add_on_enter_state("Picking") # runs alongside class @on_enter_state
def telemetry_on_pick(context): ...
@replace_on_enter_state("Picking") # runs instead of class @on_enter_state
async def custom_pick(context): ...
@add_on_exit_state("Picking")
def telemetry_on_exit(context): ...
@replace_on_exit_state("Picking")
async def custom_exit(context): ...
Guards — sync only; async guards are rejected at boot because pytransitions does not await condition callables.
from state_machine.hooks import add_guard, replace_guard
@add_guard("start_pallet") # composes with class-level @guard(s)
def check_capacity(context) -> bool: ...
@replace_guard("start_pallet") # only gate for this trigger
def custom_safety_check(context) -> bool: ...
Lifecycle — run around state_machine.start(). @on_runtime_start handlers fire in registration order (sync or async, awaited if async; failure aborts startup). @register_background_task coroutines are scheduled after runtime-start completes, survive to_fault, and are only cancelled by stop_background_tasks().
from state_machine.hooks import on_runtime_start, register_background_task
@on_runtime_start
async def connect_mqtt(context):
...
@register_background_task
async def telemetry_loop(context):
while True:
await publish_status(context)
await asyncio.sleep(1)
Composition rules
| Decorator pair | What runs |
|---|---|
add_on_enter_state |
hook first, then class-level @on_enter_state |
replace_on_enter_state |
hook only; class-level binding skipped |
add_on_exit_state |
hook first, then class-level @on_exit_state |
replace_on_exit_state |
hook only; class-level binding skipped |
add_guard |
class-level @guards first, then hook; all must return True |
replace_guard |
hook only; class-level guards skipped |
on_runtime_start |
every handler runs in registration order; failure aborts startup |
register_background_task |
scheduled after runtime-start completes; cancelled by stop_background_tasks() |
One handler per (decorator, target) pair. A second registration for the same slot is a boot error.
Extending HookContext
Each handler receives a HookContext carrying the live state machine. Consumers extend it with one integration root — services, config, model accessors — and pass a context_factory to load_hooks(...).
from dataclasses import dataclass
from state_machine.hooks import HookContext
@dataclass(frozen=True)
class PalletizerHookContext(HookContext):
app: PalletizerApp # services, config, model, runtime
def palletizer_context_factory(base: HookContext) -> PalletizerHookContext:
return PalletizerHookContext(state_machine=base.state_machine, app=palletizer_app)
A fresh context is constructed for every hook invocation so handlers see consistent live state. The factory closes over whatever the consumer wants to expose.
Loading hooks
state_machine.load_hooks(hooks_directory, *, context_factory=None)
hooks_directory— aPathwhosehandlers/package contains the handler modules. One handler per file is a useful convention but not a constraint; the loader imports every top-level.pymodule insidehandlers/(no recursion into subpackages) and decorators self-register globally. Missing directory is a silent no-op (deployment installed no hooks). Missinghandlers/package raisesModuleNotFoundError.context_factory— optionalCallable[[HookContext], YourContext]. If omitted, handlers receive the baseHookContext.- One-shot per machine. A second call raises
RuntimeError— create a fresh machine to swap directories. - Boot-time validation catches unknown states / triggers, async guards, sync background tasks, and signatures that don't accept a context. All of these aggregate into a single
HookValidationErrorso consumers fix every offending handler in one pass, not one reboot at a time. Duplicate(decorator, target)registrations are the exception: they raiseDuplicateHookErrorat import time, before the aggregated validation runs.
Expected layout:
/data/vention/my-hooks/
└── handlers/
├── __init__.py
├── barcode_check.py
├── capacity_guard.py
├── startup_banner.py
└── telemetry_loop.py
Adding a file activates it. Deleting it (or removing the decorator) deactivates it. No manifest required.
Files can hold any number of decorators, including a mix of types — @add_guard, @add_on_enter_state, and @register_background_task can live side by side. The only registration rule is that (decorator, target) pairs are globally unique across the whole handlers/ package: two @add_on_enter_state("Picking") handlers in the same package (same file or different files) is a DuplicateHookError at boot. @on_runtime_start and @register_background_task can register any number and fire in import order.
End-to-end example
The consumer wires the state machine, declares its context type, and calls load_hooks(...):
import asyncio
from dataclasses import dataclass
from pathlib import Path
from state_machine.core import StateMachine
from state_machine.decorators import on_enter_state
from state_machine.defs import State, StateGroup, Trigger
from state_machine.hooks import HookContext
@dataclass(frozen=True)
class DemoApp:
inventory_threshold: int
@dataclass(frozen=True)
class DemoHookContext(HookContext):
app: DemoApp
class Running(StateGroup):
picking: State = State()
placing: State = State()
class States:
running = Running()
class Triggers:
start = Trigger("start")
picked = Trigger("picked")
TRANSITIONS = [
Triggers.start.transition("ready", States.running.picking),
Triggers.picked.transition(States.running.picking, States.running.placing),
]
class DemoMachine(StateMachine):
def __init__(self) -> None:
super().__init__(states=States, transitions=TRANSITIONS)
@on_enter_state(States.running.placing)
def enter_placing(self, _):
print("placing reached")
async def main() -> None:
machine = DemoMachine()
app = DemoApp(inventory_threshold=5)
machine.load_hooks(
Path("/data/vention/demo-hooks"),
context_factory=lambda base: DemoHookContext(state_machine=base.state_machine, app=app),
)
await machine.start()
# ... run ...
await machine.stop_background_tasks()
asyncio.run(main())
The deployment ships a handlers/ package, one decorator per file. Handlers pass state and trigger targets as strings ("Running_picking", "start") rather than typed descriptors (States.running.picking) because they don't import the consumer's States / Triggers containers — only the shared context type and the hook decorators cross the contract boundary:
# /data/vention/demo-hooks/handlers/capacity_guard.py
from state_machine.hooks import add_guard
from my_contract import DemoHookContext
@add_guard("start")
def check_capacity(context: DemoHookContext) -> bool:
return context.app.inventory_threshold > 0
# /data/vention/demo-hooks/handlers/barcode_check.py
from state_machine.hooks import add_on_enter_state
from my_contract import DemoHookContext
@add_on_enter_state("Running_picking")
async def scan_barcode(context: DemoHookContext) -> None:
# imagine an async barcode read here
context.state_machine.trigger("picked")
# /data/vention/demo-hooks/handlers/startup_banner.py
from state_machine.hooks import on_runtime_start
from my_contract import DemoHookContext
@on_runtime_start
async def announce_startup(context: DemoHookContext) -> None:
print(f"hooks online, threshold={context.app.inventory_threshold}")
# /data/vention/demo-hooks/handlers/telemetry_loop.py
import asyncio
from state_machine.hooks import register_background_task
from my_contract import DemoHookContext
@register_background_task
async def telemetry(context: DemoHookContext) -> None:
while True:
# publish whatever your app exposes
await asyncio.sleep(1)
DemoHookContext lives in a shared contract module (my_contract above) so both the consumer and the handler package import the same type and CI catches breaking changes on either side. Outside a monorepo, publish that module as a small companion package.
Boot-time validation
load_hooks(...) fails loudly if any handler is misconfigured. Two distinct failure paths:
Decorator-time (fail-fast, raised during handler import):
- Two handlers register for the same
(decorator, target)slot →DuplicateHookError.
Validate-time (aggregated into a single HookValidationError so consumers fix every offending handler in one pass):
- Unknown state target → lists the registered handler plus every known state name (top-level and nested).
- Unknown trigger target → lists every known trigger.
- Async function registered with
@add_guard/@replace_guard→ pytransitions does not await guard conditions; an async guard always returns a coroutine (truthy) and silently blocks every transition. - Sync function registered with
@register_background_task→ background tasks are scheduled on the event loop and must be coroutine functions. - Handler signature does not accept the context argument → caught by
inspect.signature.
Either failure runs before any hook is wired, so the machine isn't left half-modified. The instance itself is still consumed by the attempt though — fix the handlers and construct a fresh machine to load again. Retrying load_hooks() on the same instance raises RuntimeError.
📖 API Reference
StateMachine
class StateMachine(HierarchicalGraphMachine):
def __init__(
self,
states: Union[object, list[dict[str, Any]], None],
*,
transitions: Optional[list[dict[str, str]]] = None,
history_size: Optional[int] = None,
enable_last_state_recovery: bool = True,
**kw: Any,
)
Parameters:
states: Either a container of StateGroups or a list of state dicts.transitions: List of transition dictionaries, or[].history_size: Max number of entries in transition history (default 1000).enable_last_state_recovery: If True, machine can resume from last recorded state.
Methods
spawn(coro: Coroutine) -> asyncio.Task
Start a background coroutine and track it. Auto-cancelled on fault/reset.
cancel_tasks() -> None
Cancel all tracked tasks and timeouts.
set_timeout(state_name: str, seconds: float, trigger_fn: Callable[[], str]) -> None
Schedule a trigger if state_name stays active too long.
record_last_state() -> None
Save current state for recovery.
get_last_state() -> Optional[str]
Return most recently recorded state.
async start() -> None
Run @on_runtime_start handlers in registration order (awaited if async), then fire the initial transition (recover__<last_state> if recovery is enabled and a state was recorded, else start), then schedule @register_background_task coroutines. A failure in @on_runtime_start aborts startup, cancels anything scheduled in the same call, and propagates.
async stop_background_tasks() -> None
Cancel every @register_background_task task. Named explicitly so it doesn't shadow "stop" if a consumer uses that as a trigger. Does NOT touch cancel_tasks / timeouts — to_fault and stop_background_tasks have independent lifecycles, so background tasks survive faults. Safe to call without a prior start().
load_hooks(hooks_directory: Path, *, context_factory: Optional[Callable[[HookContext], T]] = None) -> None
Import every module under hooks_directory/handlers/ so decorators self-register, validate every registration against this machine, and wire the result into pytransitions' callback / condition machinery. Missing directory is a silent no-op. Validation aggregates into a single HookValidationError. One-shot per machine. See Extension Hooks.
Properties
history -> list[dict[str, Any]]
Full transition history with timestamps/durations.
get_last_history_entries(n: int) -> list[dict[str, Any]]
Return last n transitions.
Decorators
@on_enter_state(state: State)
Bind function to run on entry.
@on_exit_state(state: State)
Bind function to run on exit.
@auto_timeout(seconds: float, trigger: Trigger)
Auto-trigger if timeout expires.
@guard(*triggers: Trigger)
Guard transition; blocks if function returns False.
@on_state_change
Global callback (old_state, new_state, trigger) fired after each transition.
Extension Hook API
Exported from state_machine.hooks. See Extension Hooks for the full guide.
@dataclass(frozen=True) class HookContext
Argument passed to every hook handler. Carries state_machine: StateMachine. Consumers extend it with a subclass that adds their integration root and supply a context_factory to load_hooks(...).
@add_on_enter_state(state) / @replace_on_enter_state(state)
Run a handler when state is entered. add composes with class-level @on_enter_state (hook first, then class). replace runs instead of the class binding. Sync or async; async hooks are scheduled on the running event loop and do not block the transition. If no event loop is running, the coroutine is closed and a warning is logged.
@add_on_exit_state(state) / @replace_on_exit_state(state)
Symmetric to entry hooks, fired on exit.
@add_guard(trigger) / @replace_guard(trigger)
Gate a trigger. add composes with class-level @guard (all must return True). replace runs instead of every class-level guard. Sync only — async guards are rejected at boot because pytransitions does not await condition callables.
@on_runtime_start
Run once on state_machine.start() in registration order. Sync or async. Any failure aborts startup before the initial transition fires and propagates from start().
@register_background_task
Schedule a coroutine on the runtime loop after @on_runtime_start completes. Must be async def; sync functions are rejected at boot. Cancelled only by stop_background_tasks() — bg tasks survive to_fault so MQTT subscribers, telemetry loggers, and tower-light renderers keep running to observe and report the fault. Exceptions are logged and the machine continues.
HookValidationError(RuntimeError)
Raised by load_hooks(...) listing every failing handler. Read .errors for the list.
DuplicateHookError(Exception)
Raised during decorator import when two handlers register for the same (decorator, target) slot.
RPC Bundle
def build_state_machine_bundle(
sm: StateMachine,
*,
include_state_actions: bool = True,
include_history_action: bool = True,
triggers: Optional[Sequence[str]] = None,
) -> RpcBundle
Builds an RPC bundle exposing the state machine via Connect-style RPCs:
GetState- Returns current and last known stateGetHistory- Returns transition historyTrigger_<TriggerName>- One RPC per trigger (PascalCase naming)
The bundle can be registered with a VentionApp using app.register_rpc_plugin(bundle).
🔍 Troubleshooting & FAQ
- Transitions blocked unexpectedly → Check guard conditions.
- Callbacks not firing → Only successful transitions trigger them.
- State not restored after restart → Ensure
enable_last_state_recovery=True. - RPC actions not available → Ensure
app.finalize()is called after registering bundles.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file vention_state_machine-0.5.1.tar.gz.
File metadata
- Download URL: vention_state_machine-0.5.1.tar.gz
- Upload date:
- Size: 44.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c1a8ad65630a034369366d6deadb832acfd23003b49769e267384e08a71a6702
|
|
| MD5 |
8aa5dcec7d61fac7e87baa6fdf0dafd0
|
|
| BLAKE2b-256 |
75895e157307fa9785ffb36a3363a22f59c88c4d59840377158cd8cf8d79ac80
|
File details
Details for the file vention_state_machine-0.5.1-py3-none-any.whl.
File metadata
- Download URL: vention_state_machine-0.5.1-py3-none-any.whl
- Upload date:
- Size: 32.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2d94a8b7f2b78c1e37fb33c5a5e33a8cf66c373d984f87f8563e16e71e5cb3b0
|
|
| MD5 |
f00b8e513998123278e88d5b7fb38dad
|
|
| BLAKE2b-256 |
8f005c5ef159cbf9ff29417c90592322b84173ef9a1447c65e8429798acaf011
|