A framework for a responsive hierarchical state machine

Pyrarchical State Machine

Pyrarchical State Machine (PHSM) is a framework for building a responsive hierarchical state machine (HSM). An HSM built with this framework provides callbacks that execute as the HSM responds to dispatched events. The PHSM loosely follows a hybrid dynamical system model, allowing a persistent analog (continuous) state to be held by the HSM in addition to the discrete state (which may be hierarchical). You construct your HSM by inheriting from the base classes.

The simplest two-state model with no hierarchy is shown here; more complete examples appear later on.

from src.pyrarchical import hsm


class TwoState(hsm.HSM):
    top = hsm.State("top")
    s1 = hsm.State("s1")
    s2 = hsm.State("s2")

    def __init__(self):
        self.x.append(0)
        self.top.configure(parent=None,
                           init=self.s1,
                           transitions=(hsm.Transition(event=Events.RESET,
                                                       target=self.s1),))
        self.s1.configure(parent=self.top,
                          transitions=(hsm.Transition(event=Events.FLIP,
                                                      target=self.s2),))
        self.s2.configure(parent=self.top,
                          transitions=(hsm.Transition(event=Events.FLOP,
                                                      target=self.s1),))
        super().__init__(self.top)


class Events(hsm.Event):
    FLIP = "flip"
    FLOP = "flop"
    RESET = "reset"


if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    flip_flop = TwoState()
    flip_flop.dispatch(Events.FLIP)
    flip_flop.dispatch(Events.FLOP)
    flip_flop.dispatch(Events.FLIP)
    flip_flop.dispatch(Events.RESET)
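
As a rough illustration of the dispatch behavior this example relies on, the sketch below models the same three states with plain dictionaries. It does not use hsm.py: PARENT, TRANSITIONS, and dispatch are hypothetical names, and the rules that an unhandled event bubbles up to the parent state and is otherwise ignored are assumptions about typical HSM behavior, not confirmed details of this framework.

```python
# Hypothetical stand-in for the TwoState machine above; not the hsm.py API.
PARENT = {"top": None, "s1": "top", "s2": "top"}

# (state that owns the transition, event) -> target state
TRANSITIONS = {
    ("top", "reset"): "s1",
    ("s1", "flip"): "s2",
    ("s2", "flop"): "s1",
}


def dispatch(state, event):
    """Return the next state, searching the current state and then its ancestors."""
    handler = state
    while handler is not None:
        target = TRANSITIONS.get((handler, event))
        if target is not None:
            return target
        handler = PARENT[handler]  # assumed: unhandled events bubble upward
    return state  # assumed: events nobody handles are ignored


state = "s1"  # top's init state
trace = []
for event in ("flip", "flop", "flip", "reset"):
    state = dispatch(state, event)
    trace.append(state)
print(trace)
```

The RESET event is not handled by s1 or s2, so in this sketch it is resolved by the transition attached to top.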

Description

This module provides a framework for a responsive hierarchical state machine.

For hybrid dynamical systems, the following update equations are commonly used.

x[k+1] = f_s(x[k], u[k])
y[k] = g_s(x[k], u[k])

where f and g are functions specific to each discrete state s, x is a continuous state that exists in all discrete states, u is a set of inputs, and y is the output of the system.
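
A minimal sketch of these update equations, independent of hsm.py, may make the notation concrete. The two modes ("integrate" and "decay"), their dynamics, and the names MODES and step are all invented for illustration.

```python
# Hypothetical per-mode dynamics: MODES[s] = (f_s, g_s).
MODES = {
    "integrate": (lambda x, u: x + u,    # f_s: accumulate the input
                  lambda x, u: x),       # g_s: output the state
    "decay":     (lambda x, u: 0.5 * x,  # f_s: halve the state each step
                  lambda x, u: x + u),   # g_s: output state plus input
}


def step(s, x, u):
    """One discrete-time step: y[k] = g_s(x[k], u[k]), x[k+1] = f_s(x[k], u[k])."""
    f, g = MODES[s]
    y = g(x, u)
    x_next = f(x, u)
    return x_next, y


x = 0.0
outputs = []
for s, u in (("integrate", 1.0), ("integrate", 1.0), ("decay", 0.0)):
    x, y = step(s, x, u)
    outputs.append(y)
print(x, outputs)
```

The continuous state x survives the switch from "integrate" to "decay"; only the functions applied to it change.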

In a Python HSM implementation, only the during action really follows the discrete-time step, k. However, the continuous state may also need to be updated during other HSM actions (init, entry, and exit), so each action takes x and u and returns x:

x = init_s(x, u)
x = entry_s(x, u)
x = during_s(x, u)
x = exit_s(x, u)
y = output_s(x, u)

The continuous state, x, is a member variable of the HSM. The output function is a static method. The input, continuous state, and output each need an object defined to describe them. At the simplest, this will just be a list.
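
The calling convention described above can be sketched without the framework. The Accumulator class below is a hypothetical stand-in for one state's actions, not part of hsm.py; it only shows a list-valued x being passed through each action and returned, with the output computed by a static method.

```python
class Accumulator:
    """Hypothetical holder of one state's actions; not the hsm.py API."""

    def __init__(self):
        self.x = [0]  # continuous state, kept as a simple list

    def entry(self, x, u):
        x[0] = 0  # reset the running total on entry
        return x

    def during(self, x, u):
        x[0] += u  # add the input on each discrete-time step
        return x

    def exit(self, x, u):
        return x  # nothing to clean up in this sketch

    @staticmethod
    def output(x, u):
        return [2 * x[0]]  # the output is a list as well


acc = Accumulator()
acc.x = acc.entry(acc.x, None)
acc.x = acc.during(acc.x, 3)
acc.x = acc.during(acc.x, 4)
y = Accumulator.output(acc.x, None)
acc.x = acc.exit(acc.x, None)
print(acc.x, y)
```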

Getting Started

The only file needed to use the state-machine is hsm.py. There is currently no installation. Copy the file to your active project to use it.

Prerequisites

hsm.py is implemented only in Python 3, using standard-library modules (enum and logging).

Installing

git clone git@gitlab.com:roelle/pyrarchical-state-machine.git
cp pyrarchical-state-machine/hsm.py ../your-project/hsm.py

Running the tests

Running hsm.py will run an example. Continuous integration tests are run by executing hsm_test.py in the pyrarchical-state-machine directory.

Break down into end to end tests

To test, you need a hierarchical state machine. The state machine shown in this figure is described in the following code. This particular implementation is based on an example that offers a comprehensive set of tests covering all types of transitions.

Canonical hierarchical state machine with all transitions

from src.pyrarchical import hsm
from hashlib import md5


class S211(hsm.State):

    def default_exit_action(self, x, u):
        hsm._log.info(self._name + " : Exit")
        return x


class Events(hsm.Event):
    A = "a"
    B = "b"
    C = "c"
    D = "d"
    E = "e"
    F = "f"
    G = "g"
    H = "h"


class Example(hsm.HSM):
    s0 = hsm.State("s0")
    s1 = hsm.State("s1")
    s11 = hsm.State("s11")
    s2 = hsm.State("s2")
    s21 = hsm.State("s21")
    s211 = S211("s211")

    def __init__(self):
        self.x.append(md5())
        self.s0.configure(parent=None,
                          init=self.s1,
                          transitions=(hsm.Transition(event=Events.E,
                                                      target=self.s211),))
        self.s1.configure(parent=self.s0,
                          init=self.s11,
                          transitions=(hsm.Transition(event=Events.A,
                                                      target=self.s1),
                                       hsm.Transition(event=Events.B,
                                                      target=self.s11),
                                       hsm.Transition(event=Events.C,
                                                      target=self.s2),
                                       hsm.Transition(event=Events.D,
                                                      target=self.s0),
                                       hsm.Transition(event=Events.F,
                                                      target=self.s211)))
        self.s11.configure(parent=self.s1,
                           transitions=(hsm.Transition(event=Events.G,
                                                       target=self.s211),))
        self.s2.configure(parent=self.s0,
                          init=self.s21,
                          transitions=(hsm.Transition(event=Events.C,
                                                      target=self.s1),
                                       hsm.Transition(event=Events.F,
                                                      target=self.s11)))
        self.s21.configure(parent=self.s2,
                           init=self.s211,
                           transitions=(hsm.Transition(event=Events.B,
                                                       target=self.s211),
                                        hsm.Transition(event=Events.H,
                                                       target=self.s21)))
        self.s211.configure(parent=self.s21,
                            transitions=(hsm.Transition(event=Events.B,
                                                        target=self.s21),
                                         hsm.Transition(event=Events.D,
                                                        target=self.s21),
                                         hsm.Transition(event=Events.G,
                                                        target=self.s0),
                                         hsm.Transition(guard=self.s211_s11_guard,
                                                        target=self.s11)))

        self.s0.init_action = lambda x, u: self.update(self.x, b"s0_init")
        self.s0.entry_action = lambda x, u: self.update(self.x, b"s0_entry")
        self.s0.exit_action = lambda x, u: self.update(self.x, b"s0_exit")
        self.s0.during_action = lambda x, u: self.update(self.x, b"s0_during")

        self.s1.init_action = lambda x, u: self.update(self.x, b"s1_init")
        self.s1.entry_action = lambda x, u: self.update(self.x, b"s1_entry")
        self.s1.exit_action = lambda x, u: self.update(self.x, b"s1_exit")
        self.s1.during_action = lambda x, u: self.update(self.x, b"s1_during")

        self.s11.init_action = lambda x, u: self.update(self.x, b"s11_init")
        self.s11.entry_action = lambda x, u: self.update(self.x, b"s11_entry")
        self.s11.exit_action = lambda x, u: self.update(self.x, b"s11_exit")
        self.s11.during_action = lambda x, u: self.update(self.x, b"s11_during")

        self.s2.init_action = lambda x, u: self.update(self.x, b"s2_init")
        self.s2.entry_action = lambda x, u: self.update(self.x, b"s2_entry")
        self.s2.exit_action = lambda x, u: self.update(self.x, b"s2_exit")
        self.s2.during_action = lambda x, u: self.update(self.x, b"s2_during")

        self.s21.init_action = lambda x, u: self.update(self.x, b"s21_init")
        self.s21.entry_action = lambda x, u: self.update(self.x, b"s21_entry")
        self.s21.exit_action = lambda x, u: self.update(self.x, b"s21_exit")
        self.s21.during_action = lambda x, u: self.update(self.x, b"s21_during")

        self.s211.init_action = lambda x, u: self.update(self.x, b"s211_init")
        self.s211.entry_action = lambda x, u: self.update(self.x, b"s211_entry")
        self.s211.exit_action = lambda x, u: self.update(self.x, b"s211_exit")
        self.s211.during_action = lambda x, u: self.update(self.x, b"s211_during")

        super().__init__(self.s0)

    @staticmethod
    def s211_s11_guard(x, u):
        try:
            return u > 10.0
        except TypeError:
            # u is not comparable to a float (e.g. None), so the guard fails
            return False

    @staticmethod
    def update(x, u):
        x[0].update(u)
        return x

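This machine may be tested for any transition. For example, dispatching E from the initial state s11 should exit s11 and s1, then enter s2, s21, and s211: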

test_hsm = Example()
test_hsm.x[0] = md5() # Reset the hsm continuous state
test_hsm.dispatch(Events.E)
x = [md5()]
x = Example.update(x, b"s11_exit")
x = Example.update(x, b"s1_exit")
x = Example.update(x, b"s2_entry")
x = Example.update(x, b"s21_entry")
x = Example.update(x, b"s211_entry")
assert x[0].digest() == test_hsm.x[0].digest()  # call digest(); comparing the bound methods is always False
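
The expected action sequence in a test like the one above can also be derived rather than typed by hand. The sketch below is independent of hsm.py and assumes a simple least-common-ancestor rule: exit from the current state up to, but not including, the common ancestor, then enter down to the target. PARENT, ancestors, transition_path, and expected_digest are hypothetical helpers, and this rule is not claimed to match the framework's handling of every transition type (self-transitions, for instance, are not covered).

```python
from hashlib import md5

# Parent map mirroring the configure(parent=...) calls in the Example class.
PARENT = {"s0": None, "s1": "s0", "s11": "s1",
          "s2": "s0", "s21": "s2", "s211": "s21"}


def ancestors(state):
    """Return [state, parent, grandparent, ...] up to the root."""
    chain = []
    while state is not None:
        chain.append(state)
        state = PARENT[state]
    return chain


def transition_path(current, target):
    """List the exit and entry action labels for moving from current to target."""
    up = ancestors(current)
    down = ancestors(target)
    common = next(s for s in up if s in down)  # least common ancestor
    exits = up[:up.index(common)]
    entries = reversed(down[:down.index(common)])
    return [s + "_exit" for s in exits] + [s + "_entry" for s in entries]


def expected_digest(labels):
    """Hash the labels in order, as Example.update does with its md5 state."""
    h = md5()
    for label in labels:
        h.update(label.encode())
    return h.hexdigest()


path = transition_path("s11", "s211")
print(path)
print(expected_digest(path))
```

For the E transition this yields the same five labels as the hand-written test: s11_exit, s1_exit, s2_entry, s21_entry, s211_entry.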

Versioning

We use SemVer for versioning. For the versions available, see the tags on this repository.

Authors

  • Matt Roelle - Initial work - roelle

See also the list of contributors who participated in this project.

License

This project is licensed under the MIT License. See the LICENSE.md file for details.

Acknowledgments

  • This README template was copied from PurpleBooth
  • Many discussions about hierarchical state machines informed this work.
