Declarative state machine framework for machine apps
vention-state-machine
A lightweight wrapper around transitions for building async-safe, recoverable hierarchical state machines with minimal boilerplate.
Table of Contents
- ✨ Features
- 🧠 Concepts & Overview
- ⚙️ Installation & Setup
- 🚀 Quickstart Tutorial
- 🛠 How-to Guides
- 📖 API Reference
- 🔍 Troubleshooting & FAQ
✨ Features
- Built-in ready/fault states
- Global transitions: to_fault, reset
- Optional state recovery (recover__state)
- Async task spawning and cancellation
- Timeouts and auto-fault handling
- Transition history recording with timestamps + durations
- Guard conditions for blocking transitions
- Global state change callbacks for logging/MQTT
- Optional RPC bundle for exposing state machine via Connect RPCs
🧠 Concepts & Overview
This library uses a declarative domain-specific language (DSL) to define state machines in a readable, strongly typed way.
- State → A leaf node in the state machine
- StateGroup → Groups related states, creating hierarchical namespaces
- Trigger → Named events that initiate transitions
Example:
class MyStates(StateGroup):
    idle: State = State()
    working: State = State()

class Triggers:
    begin = Trigger("begin")
    finish = Trigger("finish")

TRANSITIONS = [
    Triggers.finish.transition(MyStates.working, MyStates.idle),
]
Base States and Triggers
All machines include:
- States: ready (initial), fault (global error)
- Triggers: start, to_fault, reset
from state_machine.core import BaseStates, BaseTriggers
state_machine.trigger(BaseTriggers.RESET.value)
assert state_machine.state == BaseStates.READY.value
⚙️ Installation & Setup
pip install vention-state-machine
Optional dependencies:
- Graphviz (required for diagram generation)
- vention-communication (for RPC bundle integration)
Install optional tools:
macOS:
brew install graphviz
pip install vention-communication
Linux (Debian/Ubuntu):
sudo apt-get install graphviz
pip install vention-communication
🚀 Quickstart Tutorial
1. Define States and Triggers
from state_machine.defs import StateGroup, State, Trigger

class Running(StateGroup):
    picking: State = State()
    placing: State = State()
    homing: State = State()

class States:
    running = Running()

class Triggers:
    start = Trigger("start")
    finished_picking = Trigger("finished_picking")
    finished_placing = Trigger("finished_placing")
    finished_homing = Trigger("finished_homing")
    to_fault = Trigger("to_fault")
    reset = Trigger("reset")
2. Define Transitions
TRANSITIONS = [
    Triggers.start.transition("ready", States.running.picking),
    Triggers.finished_picking.transition(States.running.picking, States.running.placing),
    Triggers.finished_placing.transition(States.running.placing, States.running.homing),
    Triggers.finished_homing.transition(States.running.homing, States.running.picking),
]
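The API reference types transitions as a list of dicts, and the underlying transitions library describes each transition with trigger, source and dest keys. As a rough sketch, assuming Trigger.transition() yields a dict of that shape (SketchTrigger and the flattened state names such as running_picking are hypothetical stand-ins, not the library's own classes or naming):

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class SketchTrigger:
    """Hypothetical stand-in for a named trigger (not the library's class)."""

    name: str

    def transition(self, source: str, dest: str) -> Dict[str, str]:
        # The `transitions` library accepts dicts keyed by trigger/source/dest.
        return {"trigger": self.name, "source": source, "dest": dest}


start = SketchTrigger("start")
finished_picking = SketchTrigger("finished_picking")

# Assumed flattened names for nested states; the real library may differ.
SKETCH_TRANSITIONS = [
    start.transition("ready", "running_picking"),
    finished_picking.transition("running_picking", "running_placing"),
]
```

Each entry is plain data, which is why a transition table like this stays easy to read and to diff.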
3. Implement Your State Machine
from state_machine.core import StateMachine
from state_machine.decorators import on_enter_state, auto_timeout, guard, on_state_change

class CustomMachine(StateMachine):
    def __init__(self):
        super().__init__(states=States, transitions=TRANSITIONS)

    @on_enter_state(States.running.picking)
    @auto_timeout(5.0, Triggers.to_fault)
    def enter_picking(self, _):
        print("🔹 Entering picking")

    @on_enter_state(States.running.placing)
    def enter_placing(self, _):
        print("🔸 Entering placing")

    @on_enter_state(States.running.homing)
    def enter_homing(self, _):
        print("🔺 Entering homing")

    @guard(Triggers.reset)
    def check_safety_conditions(self) -> bool:
        return not self.estop_pressed

    @on_state_change
    def publish_state_to_mqtt(self, old_state: str, new_state: str, trigger: str):
        mqtt_client.publish("machine/state", {
            "old_state": old_state,
            "new_state": new_state,
            "trigger": trigger
        })
4. Start It
state_machine = CustomMachine()
state_machine.start()
🛠 How-to Guides
Expose Over RPC with VentionApp
from communication.app import VentionApp
from state_machine.vention_communication import build_state_machine_bundle
from state_machine.core import StateMachine
state_machine = StateMachine(...)
state_machine.start()
app = VentionApp(name="MyApp")
bundle = build_state_machine_bundle(state_machine)
app.register_rpc_plugin(bundle)
app.finalize()
RPC Actions:
- GetState → Returns current state and last known state
- GetHistory → Returns transition history with timestamps
- Trigger_<TriggerName> → Triggers a state transition (e.g., Trigger_Start, Trigger_Activate)
Options:
# Customize which actions are included
bundle = build_state_machine_bundle(
    state_machine,
    include_state_actions=True,      # Include GetState
    include_history_action=True,     # Include GetHistory
    triggers=["start", "activate"],  # Only include specific triggers
)
Timeout Example
@auto_timeout(5.0, Triggers.to_fault)
def enter_state(self, _):
    ...
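To illustrate the idea behind a state timeout, here is a minimal asyncio sketch. It is not the library's implementation: fire_if_still_in_state and demo are hypothetical names, and the sketch assumes a timeout should fire only if the machine is still in the same state when the delay elapses.

```python
import asyncio
from typing import Callable, List


async def fire_if_still_in_state(
    get_state: Callable[[], str],
    state_name: str,
    seconds: float,
    fire: Callable[[], None],
) -> None:
    """Wait `seconds`, then call `fire` only if the state has not changed."""
    await asyncio.sleep(seconds)
    if get_state() == state_name:
        fire()


async def demo() -> List[str]:
    fired: List[str] = []
    current = {"state": "picking"}

    # Case 1: the machine stays in "picking", so the timeout fires.
    await fire_if_still_in_state(
        lambda: current["state"], "picking", 0.01, lambda: fired.append("to_fault")
    )

    # Case 2: the timeout task is cancelled before it expires,
    # which is what leaving the state in time would amount to.
    task = asyncio.create_task(
        fire_if_still_in_state(
            lambda: current["state"], "picking", 1.0, lambda: fired.append("late")
        )
    )
    await asyncio.sleep(0)  # let the task start
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return fired


result = asyncio.run(demo())
```

Only the first timeout fires; the cancelled one never reaches its trigger.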
Recovery Example
state_machine = StateMachine(enable_last_state_recovery=True)
state_machine.start() # will attempt recover__{last_state}
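The choice start() makes between recovering and starting fresh can be pictured with a small sketch. This is an assumption based on the recover__{last_state} naming above and on the documented fallback to start; entry_trigger is a hypothetical helper, and the state name format shown is illustrative only.

```python
from typing import Optional


def entry_trigger(last_state: Optional[str]) -> str:
    """Pick the trigger used to enter the machine (hypothetical helper)."""
    # With a recorded state, attempt recovery; otherwise fall back to "start".
    return f"recover__{last_state}" if last_state else "start"


fresh = entry_trigger(None)
resumed = entry_trigger("running_picking")  # assumed state name format
```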
Triggering state transitions via I/O
Here's an example of hooking up state transitions to I/O events via MQTT:
import asyncio
import paho.mqtt.client as mqtt
from state_machine.core import StateMachine
from state_machine.defs import State, StateGroup, Trigger
from state_machine.decorators import on_enter_state

class MachineStates(StateGroup):
    idle: State = State()
    running: State = State()

class States:
    machine = MachineStates()

class Triggers:
    start_button = Trigger("start_button")
    box_missing = Trigger("box_missing")

TRANSITIONS = [
    Triggers.start_button.transition(States.machine.idle, States.machine.running),
    Triggers.box_missing.transition(States.machine.running, States.machine.idle),
]

class MachineController(StateMachine):
    def __init__(self):
        super().__init__(states=States, transitions=TRANSITIONS)
        # With paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1 as the first argument.
        self.mqtt_client = mqtt.Client()
        self.setup_mqtt()

    def setup_mqtt(self):
        """Configure MQTT client to listen for I/O signals."""
        self.mqtt_client.on_connect = self.on_mqtt_connect
        self.mqtt_client.on_message = self.on_mqtt_message
        self.mqtt_client.connect("localhost", 1883, 60)
        # Start MQTT loop in background
        self.spawn(self.mqtt_loop())

    async def mqtt_loop(self):
        """Background task to handle MQTT messages."""
        self.mqtt_client.loop_start()
        while True:
            await asyncio.sleep(0.1)

    def on_mqtt_connect(self, client, userdata, flags, rc):
        """Subscribe to I/O topics when connected."""
        client.subscribe("machine/io/start_button")
        client.subscribe("machine/sensors/box_sensor")

    def on_mqtt_message(self, client, userdata, msg):
        """Handle incoming MQTT messages and trigger state transitions."""
        topic = msg.topic
        payload = msg.payload.decode()
        # Map MQTT topics to state machine triggers
        if topic == "machine/io/start_button" and payload == "pressed":
            self.trigger(Triggers.start_button.value)
        elif topic == "machine/sensors/box_sensor" and payload == "0":
            self.trigger(Triggers.box_missing.value)

    @on_enter_state(States.machine.running)
    def enter_running(self, _):
        print("🔧 Machine started - processing parts")
        self.mqtt_client.publish("machine/status", "running")

    @on_enter_state(States.machine.idle)
    def enter_idle(self, _):
        print("⏸️ Machine idle - ready for start")
        self.mqtt_client.publish("machine/status", "idle")
📖 API Reference
StateMachine
class StateMachine(HierarchicalGraphMachine):
    def __init__(
        self,
        states: Union[object, list[dict[str, Any]], None],
        *,
        transitions: Optional[list[dict[str, str]]] = None,
        history_size: Optional[int] = None,
        enable_last_state_recovery: bool = True,
        **kw: Any,
    )
Parameters:
- states: Either a container of StateGroups or a list of state dicts.
- transitions: List of transition dictionaries, or [].
- history_size: Max number of entries in transition history (default 1000).
- enable_last_state_recovery: If True, machine can resume from last recorded state.
Methods
spawn(coro: Coroutine) -> asyncio.Task
Start a background coroutine and track it. Auto-cancelled on fault/reset.
cancel_tasks() -> None
Cancel all tracked tasks and timeouts.
set_timeout(state_name: str, seconds: float, trigger_fn: Callable[[], str]) -> None
Schedule a trigger if state_name stays active too long.
record_last_state() -> None
Save current state for recovery.
get_last_state() -> Optional[str]
Return most recently recorded state.
start() -> None
Enter machine (recover__... if applicable, else start).
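The spawn and cancel_tasks pair above follows a common asyncio pattern: keep a reference to every background task so all of them can be cancelled together. A minimal sketch of that pattern, assuming nothing about the library's internals (TaskTracker and demo are hypothetical names):

```python
import asyncio
from typing import Coroutine, Set


class TaskTracker:
    """Hypothetical helper mirroring the spawn/cancel_tasks idea."""

    def __init__(self) -> None:
        self._tasks: Set[asyncio.Task] = set()

    def spawn(self, coro: Coroutine) -> asyncio.Task:
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        # Forget the task once it finishes so the set does not grow forever.
        task.add_done_callback(self._tasks.discard)
        return task

    def cancel_tasks(self) -> None:
        for task in list(self._tasks):
            task.cancel()


async def demo() -> bool:
    tracker = TaskTracker()
    task = tracker.spawn(asyncio.sleep(60))
    await asyncio.sleep(0)   # give the task a chance to start
    tracker.cancel_tasks()   # e.g. what a fault or reset would do
    await asyncio.gather(task, return_exceptions=True)
    return task.cancelled()


was_cancelled = asyncio.run(demo())
```

Note that this sketch needs a running event loop when spawn is called, since it uses asyncio.get_running_loop().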
Properties
history -> list[dict[str, Any]]
Full transition history with timestamps/durations.
get_last_history_entries(n: int) -> list[dict[str, Any]]
Return last n transitions.
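A bounded history with timestamps and durations can be modelled with a deque. The sketch below is illustrative only: HistorySketch and the entry fields state, timestamp and duration are assumptions, not the library's actual record format.

```python
import time
from collections import deque
from typing import Any, Deque, Dict, List, Optional


class HistorySketch:
    """Hypothetical bounded transition history."""

    def __init__(self, history_size: int = 1000) -> None:
        # deque(maxlen=...) silently drops the oldest entry when full.
        self._entries: Deque[Dict[str, Any]] = deque(maxlen=history_size)
        self._entered_at: Optional[float] = None

    def record(self, state: str, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        # Close out the previous entry with how long that state lasted.
        if self._entries and self._entered_at is not None:
            self._entries[-1]["duration"] = now - self._entered_at
        self._entries.append({"state": state, "timestamp": now, "duration": None})
        self._entered_at = now

    def last(self, n: int) -> List[Dict[str, Any]]:
        return list(self._entries)[-n:] if n > 0 else []


history = HistorySketch(history_size=2)
history.record("ready", now=0.0)
history.record("picking", now=1.5)
history.record("placing", now=4.0)  # "ready" is evicted here
```

The explicit now argument exists only to make the example deterministic; in normal use the wall clock would be read instead.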
Decorators
@on_enter_state(state: State)
Bind function to run on entry.
@on_exit_state(state: State)
Bind function to run on exit.
@auto_timeout(seconds: float, trigger: Trigger)
Auto-trigger if timeout expires.
@guard(*triggers: Trigger)
Guard transition; blocks if function returns False.
@on_state_change
Global callback (old_state, new_state, trigger) fired after each transition.
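How a guard and a state-change callback interact can be shown with a toy machine. This is a sketch of the described behaviour, not the library's code: GuardedMachine is hypothetical, and it assumes a blocked transition leaves the state unchanged and fires no callback.

```python
from typing import Callable, Dict, List, Tuple


class GuardedMachine:
    """Hypothetical toy machine with one guarded trigger."""

    def __init__(self) -> None:
        self.state = "fault"
        self.estop_pressed = True
        # Guards per trigger name; every guard must return True to proceed.
        self._guards: Dict[str, List[Callable[[], bool]]] = {
            "reset": [lambda: not self.estop_pressed],
        }
        self.changes: List[Tuple[str, str, str]] = []

    def trigger(self, name: str, dest: str) -> bool:
        if not all(check() for check in self._guards.get(name, [])):
            return False  # blocked: no state change, no callback
        old, self.state = self.state, dest
        # Stand-in for an on_state_change callback: (old_state, new_state, trigger).
        self.changes.append((old, dest, name))
        return True


machine = GuardedMachine()
blocked = machine.trigger("reset", "ready")   # e-stop still pressed
machine.estop_pressed = False
allowed = machine.trigger("reset", "ready")
```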
RPC Bundle
def build_state_machine_bundle(
sm: StateMachine,
*,
include_state_actions: bool = True,
include_history_action: bool = True,
triggers: Optional[Sequence[str]] = None,
) -> RpcBundle
Builds an RPC bundle exposing the state machine via Connect-style RPCs:
- GetState - Returns current and last known state
- GetHistory - Returns transition history
- Trigger_<TriggerName> - One RPC per trigger (PascalCase naming)
The bundle can be registered with a VentionApp using app.register_rpc_plugin(bundle).
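The Trigger_<TriggerName> naming can be pictured with a small conversion helper. This is a guess at the convention from the Trigger_Start and Trigger_Activate examples; rpc_action_name is hypothetical, and how the library handles multi-word trigger names is an assumption.

```python
def rpc_action_name(trigger_name: str) -> str:
    """Hypothetical helper: 'finished_picking' -> 'Trigger_FinishedPicking'."""
    pascal = "".join(part.capitalize() for part in trigger_name.split("_") if part)
    return f"Trigger_{pascal}"


names = [rpc_action_name(t) for t in ("start", "to_fault", "finished_picking")]
```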
🔍 Troubleshooting & FAQ
- Transitions blocked unexpectedly → Check guard conditions.
- Callbacks not firing → Only successful transitions trigger them.
- State not restored after restart → Ensure enable_last_state_recovery=True.
- RPC actions not available → Ensure app.finalize() is called after registering bundles.