Skip to main content

Declarative state machine framework for machine apps

Project description

vention-state-machine

A lightweight wrapper around transitions for building async-safe, recoverable hierarchical state machines with minimal boilerplate.

Table of Contents

✨ Features

  • Built-in ready / fault states
  • Global transitions: to_fault, reset
  • Optional state recovery (recover__state)
  • Async task spawning and cancellation
  • Timeouts and auto-fault handling
  • Transition history recording with timestamps + durations
  • Guard conditions for blocking transitions
  • Global state change callbacks for logging/MQTT
  • Optional RPC bundle for exposing state machine via Connect RPCs

🧠 Concepts & Overview

This library uses a declarative domain-specific language (DSL) to define state machines in a readable, strongly typed way.

  • State → A leaf node in the state machine
  • StateGroup → Groups related states, creating hierarchical namespaces
  • Trigger → Named events that initiate transitions

Example:

class MyStates(StateGroup):
    idle: State = State()
    working: State = State()

class Triggers:
    begin = Trigger("begin")
    finish = Trigger("finish")

TRANSITIONS = [
    Triggers.finish.transition(MyStates.working, MyStates.idle),
]

Base States and Triggers

All machines include:

States:

  • ready (initial)
  • fault (global error)

Triggers:

  • start, to_fault, reset
from state_machine.core import BaseStates, BaseTriggers

state_machine.trigger(BaseTriggers.RESET.value)
assert state_machine.state == BaseStates.READY.value

⚙️ Installation & Setup

pip install vention-state-machine

Optional dependencies:

  • Graphviz (required for diagram generation)
  • vention-communication (for RPC bundle integration)

Install optional tools:

MacOS:

brew install graphviz
pip install vention-communication

Linux (Debian/Ubuntu)

sudo apt-get install graphviz
pip install vention-communication

🚀 Quickstart Tutorial

1. Define States and Triggers

from state_machine.defs import StateGroup, State, Trigger

class Running(StateGroup):
    picking: State = State()
    placing: State = State()
    homing: State = State()

class States:
    running = Running()

class Triggers:
    start = Trigger("start")
    finished_picking = Trigger("finished_picking")
    finished_placing = Trigger("finished_placing")
    finished_homing = Trigger("finished_homing")
    to_fault = Trigger("to_fault")
    reset = Trigger("reset")

2. Define Transitions

TRANSITIONS = [
    Triggers.start.transition("ready", States.running.picking),
    Triggers.finished_picking.transition(States.running.picking, States.running.placing),
    Triggers.finished_placing.transition(States.running.placing, States.running.homing),
    Triggers.finished_homing.transition(States.running.homing, States.running.picking),
]

3. Implement Your State Machine

from state_machine.core import StateMachine
from state_machine.decorators import on_enter_state, auto_timeout, guard, on_state_change

class CustomMachine(StateMachine):
    def __init__(self):
        super().__init__(states=States, transitions=TRANSITIONS)

    @on_enter_state(States.running.picking)
    @auto_timeout(5.0, Triggers.to_fault)
    def enter_picking(self, _):
        print("🔹 Entering picking")

    @on_enter_state(States.running.placing)
    def enter_placing(self, _):
        print("🔸 Entering placing")

    @on_enter_state(States.running.homing)
    def enter_homing(self, _):
        print("🔺 Entering homing")

    @guard(Triggers.reset)
    def check_safety_conditions(self) -> bool:
        return not self.estop_pressed

    @on_state_change
    def publish_state_to_mqtt(self, old_state: str, new_state: str, trigger: str):
        mqtt_client.publish("machine/state", {
            "old_state": old_state,
            "new_state": new_state,
            "trigger": trigger
        })

4. Start It

state_machine = StateMachine()
state_machine.start()

🛠 How-to Guides

Expose Over RPC with VentionApp

from communication.app import VentionApp
from state_machine.vention_communication import build_state_machine_bundle
from state_machine.core import StateMachine

state_machine = StateMachine(...)
state_machine.start()

app = VentionApp(name="MyApp")
bundle = build_state_machine_bundle(state_machine)
app.register_rpc_plugin(bundle)
app.finalize()

RPC Actions:

  • GetState → Returns current state and last known state
  • GetHistory → Returns transition history with timestamps
  • Trigger_<TriggerName> → Triggers a state transition (e.g., Trigger_Start, Trigger_Activate)

Options:

# Customize which actions are included
bundle = build_state_machine_bundle(
    state_machine,
    include_state_actions=True,  # Include GetState
    include_history_action=True,  # Include GetHistory
    triggers=["start", "activate"],  # Only include specific triggers
)

Timeout Example

@auto_timeout(5.0, Triggers.to_fault)
def enter_state(self, _):
    ...

Recovery Example

state_machine = StateMachine(enable_last_state_recovery=True)
state_machine.start()  # will attempt recover__{last_state}

Triggering state transitions via I/O

Here's an example of hooking up state transitions to I/O events via MQTT

import asyncio
import paho.mqtt.client as mqtt
from state_machine.core import StateMachine
from state_machine.defs import State, StateGroup, Trigger
from state_machine.decorators import on_enter_state

class MachineStates(StateGroup):
    idle: State = State()
    running: State = State()

class States:
    machine = MachineStates()

class Triggers:
    start_button = Trigger("start_button")
    box_missing = Trigger("box_missing")

TRANSITIONS = [
    Triggers.start_button.transition(States.machine.idle, States.machine.running),
    Triggers.box_missing.transition(States.machine.running, States.machine.idle),
]

class MachineController(StateMachine):
    def __init__(self):
        super().__init__(states=States, transitions=TRANSITIONS)
        self.mqtt_client = mqtt.Client()
        self.setup_mqtt()

    def setup_mqtt(self):
        """Configure MQTT client to listen for I/O signals."""
        self.mqtt_client.on_connect = self.on_mqtt_connect
        self.mqtt_client.on_message = self.on_mqtt_message
        self.mqtt_client.connect("localhost", 1883, 60)
        
        # Start MQTT loop in background
        self.spawn(self.mqtt_loop())

    async def mqtt_loop(self):
        """Background task to handle MQTT messages."""
        self.mqtt_client.loop_start()
        while True:
            await asyncio.sleep(0.1)

    def on_mqtt_connect(self, client, userdata, flags, rc):
        """Subscribe to I/O topics when connected."""
        client.subscribe("machine/io/start_button")
        client.subscribe("machine/sensors/box_sensor")

    def on_mqtt_message(self, client, userdata, msg):
        """Handle incoming MQTT messages and trigger state transitions."""
        topic = msg.topic
        payload = msg.payload.decode()
        
        # Map MQTT topics to state machine triggers
        if topic == "machine/io/start_button" and payload == "pressed":
            self.trigger(Triggers.start_button.value)
        elif topic == "machine/sensors/box_sensor" and payload == "0":
            self.trigger(Triggers.box_missing.value)

    @on_enter_state(States.machine.running)
    def enter_running(self, _):
        print("🔧 Machine started - processing parts")
        self.mqtt_client.publish("machine/status", "running")

    @on_enter_state(States.machine.idle)
    def enter_idle(self, _):
        print("⏸️ Machine idle - ready for start")
        self.mqtt_client.publish("machine/status", "idle")

📖 API Reference

StateMachine

class StateMachine(HierarchicalGraphMachine):
    def __init__(
        self,
        states: Union[object, list[dict[str, Any]], None],
        *,
        transitions: Optional[list[dict[str, str]]] = None,
        history_size: Optional[int] = None,
        enable_last_state_recovery: bool = True,
        **kw: Any,
    )

Parameters:

  • states: Either a container of StateGroups or a list of state dicts.
  • transitions: List of transition dictionaries, or [].
  • history_size: Max number of entries in transition history (default 1000).
  • enable_last_state_recovery: If True, machine can resume from last recorded state.

Methods

spawn(coro: Coroutine) -> asyncio.Task Start a background coroutine and track it. Auto-cancelled on fault/reset.

cancel_tasks() -> None Cancel all tracked tasks and timeouts.

set_timeout(state_name: str, seconds: float, trigger_fn: Callable[[], str]) -> None Schedule a trigger if state_name stays active too long.

record_last_state() -> None Save current state for recovery.

get_last_state() -> Optional[str] Return most recently recorded state.

start() -> None Enter machine (recover__... if applicable, else start).

Properties

history -> list[dict[str, Any]] Full transition history with timestamps/durations.

get_last_history_entries(n: int) -> list[dict[str, Any]] Return last n transitions.

Decorators

@on_enter_state(state: State) Bind function to run on entry.

@on_exit_state(state: State) Bind function to run on exit.

@auto_timeout(seconds: float, trigger: Trigger) Auto-trigger if timeout expires.

@guard(*triggers: Trigger) Guard transition; blocks if function returns False.

@on_state_change Global callback (old_state, new_state, trigger) fired after each transition.

RPC Bundle

def build_state_machine_bundle(
    sm: StateMachine,
    *,
    include_state_actions: bool = True,
    include_history_action: bool = True,
    triggers: Optional[Sequence[str]] = None,
) -> RpcBundle

Builds an RPC bundle exposing the state machine via Connect-style RPCs:

  • GetState - Returns current and last known state
  • GetHistory - Returns transition history
  • Trigger_<TriggerName> - One RPC per trigger (PascalCase naming)

The bundle can be registered with a VentionApp using app.register_rpc_plugin(bundle).

🔍 Troubleshooting & FAQ

  • Transitions blocked unexpectedly → Check guard conditions.
  • Callbacks not firing → Only successful transitions trigger them.
  • State not restored after restart → Ensure enable_last_state_recovery=True.
  • RPC actions not available → Ensure app.finalize() is called after registering bundles.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

vention_state_machine-0.4.tar.gz (15.0 kB view details)

Uploaded Source

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

vention_state_machine-0.4.0-py3-none-any.whl (15.6 kB view details)

Uploaded Python 3

vention_state_machine-0.4-py3-none-any.whl (15.6 kB view details)

Uploaded Python 3

File details

Details for the file vention_state_machine-0.4.tar.gz.

File metadata

  • Download URL: vention_state_machine-0.4.tar.gz
  • Upload date:
  • Size: 15.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.10.12 Linux/6.11.0-1018-azure

File hashes

Hashes for vention_state_machine-0.4.tar.gz
Algorithm Hash digest
SHA256 ebd9462435d818f901bcc7ea4bff0b6c5d5b280d03b0602c82a876427f7f0c1c
MD5 e5d747bae154d342807060e3fef2e297
BLAKE2b-256 ab623f5d465a99954a871657f18df8747ad259c8950f629dc9efb0b38db2215f

See more details on using hashes here.

File details

Details for the file vention_state_machine-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: vention_state_machine-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 15.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.10.12 Linux/6.11.0-1018-azure

File hashes

Hashes for vention_state_machine-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 86d4388f50aa48906a5db0415ec93de4757fae79cbe2777ec82d7c35b33b7522
MD5 49a4f8d1462b8a11988fe90b2b2c07c8
BLAKE2b-256 88428b48b1ef265f25541cff38d88e9359c0f8f1155838f2cd76da66630a27e9

See more details on using hashes here.

File details

Details for the file vention_state_machine-0.4-py3-none-any.whl.

File metadata

  • Download URL: vention_state_machine-0.4-py3-none-any.whl
  • Upload date:
  • Size: 15.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.10.12 Linux/6.11.0-1018-azure

File hashes

Hashes for vention_state_machine-0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 f0ed2c7dc95070ee5fb4540cabd7acac0a836e2d6c44d6fda1336e7a8708b351
MD5 175142333888323e47ec8257b9743202
BLAKE2b-256 1f1f799172f5e55e0e74f7a64eb616aa964b12b8077d7be114b0c696c0cbfc3a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page