Skip to main content

A framework for a responsive heirarchical state-machine

Project description

Pyrarchical State Machine

Pyrarchical state machine is a framework for a responsive heirarchical state-machine. An HSM built with this framework provides callbacks to execute functionality as the HSM responds to dispatched events. The PHSM loosely follows a hybrid dynamic system model allowing for a persistant analog state held by the HSM in addition to discrete state (which may be heirarchical). Your HSM is constructed through inheritance of the base functionality.

The simplest two-state model with no heirarchy is shown here with more complete examples later on.

from src.pyrarchical import hsm


class TwoState(hsm.HSM):
    top = hsm.State("top")
    s1 = hsm.State("s1")
    s2 = hsm.State("s2")

    def __init__(self):
        self.x.append(0)
        self.top.configure(parent=None,
                           init=self.s1,
                           transitions=(hsm.Transition(event=Events.RESET,
                                                       target=self.s1),))
        self.s1.configure(parent=self.top,
                          transitions=(hsm.Transition(event=Events.FLIP,
                                                      target=self.s2),))
        self.s2.configure(parent=self.top,
                          transitions=(hsm.Transition(event=Events.FLOP,
                                                      target=self.s1),))
        super().__init__(self.top)


class Events(hsm.Event):
    FLIP = "flip"
    FLOP = "flop"
    RESET = "reset"


if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    flip_flop = TwoState()
    flip_flop.dispatch(Events.FLIP)
    flip_flop.dispatch(Events.FLOP)
    flip_flop.dispatch(Events.FLIP)
    flip_flop.dispatch(Events.RESET)

Description

This module provides a framework for a responsive heirarchical state-machine.

For hybrid dynamical systems, the following update equations are commonly used.

x[k+1] = f_s(x[k], u(k]))
y[k] = g_s(x[k], u(k]))

where f and g are functions specific to each discrete state s, x is a continuous state that exists in all discrete states, u is a set of inputs, and y is the output of the system.

In a Python HSM implementation, only the during state will really obey the discrete-time step, k. However, we may need to update states during other HSM actions.

x = init_s(x, u)
x = entry_s(x, u)
x = during_s(x, u)
x = exit_s(x, u)
y = output_s(x, u)

The continuous state, x, is a member variable of the HSM. The output function is a static method. The input, continuous state, and output each need an object defined to describe them. At the simplest, this will just be a list.

Getting Started

The only file needed to use the state-machine is hsm.py. There is currently no installation. Copy the file to your active project to use it.

Prerequisites

hsm.py is only implemented in Python3 using standard Python libraries (enum and logging).

Installing

git clone git@gitlab.com:roelle/pyrarchical-state-machine.git
cp pyrarchical-state-machine/hsm.py ../your-project/hsm.py

Running the tests

Running hsm.py will run an example. Continupous integration tests are run by executing hsm_test.py in the pyrarchical-state-machine directory.

Break down into end to end tests

To test, you need a heirarchincal state state-machine. The state machine shown in this figure is described in the following code. This particular implementation is based on an example offers a comprehensive set of tests of all types of transitions.

Canonical heirarchical-state machine with all transitions

from src.pyrarchical import hsm
from hashlib import md5


class S211(hsm.State):

    def default_exit_action(self, x, u):
        hsm._log.info(self._name + " : Exit")
        return x


class Events(hsm.Event):
    A = "a"
    B = "b"
    C = "c"
    D = "d"
    E = "e"
    F = "f"
    G = "g"
    H = "h"


class Example(hsm.HSM):
    s0 = hsm.State("s0")
    s1 = hsm.State("s1")
    s11 = hsm.State("s11")
    s2 = hsm.State("s2")
    s21 = hsm.State("s21")
    s211 = S211("s211")

    def __init__(self):
        self.x.append(md5())
        self.s0.configure(parent=None,
                          init=self.s1,
                          transitions=(hsm.Transition(event=Events.E,
                                                      target=self.s211),))
        self.s1.configure(parent=self.s0,
                          init=self.s11,
                          transitions=(hsm.Transition(event=Events.A,
                                                      target=self.s1),
                                       hsm.Transition(event=Events.B,
                                                      target=self.s11),
                                       hsm.Transition(event=Events.C,
                                                      target=self.s2),
                                       hsm.Transition(event=Events.D,
                                                      target=self.s0),
                                       hsm.Transition(event=Events.F,
                                                      target=self.s211)))
        self.s11.configure(parent=self.s1,
                           transitions=(hsm.Transition(event=Events.G,
                                                       target=self.s211),))
        self.s2.configure(parent=self.s0,
                          init=self.s21,
                          transitions=(hsm.Transition(event=Events.C,
                                                      target=self.s1),
                                       hsm.Transition(event=Events.F,
                                                      target=self.s11)))
        self.s21.configure(parent=self.s2,
                           init=self.s211,
                           transitions=(hsm.Transition(event=Events.B,
                                                       target=self.s211),
                                        hsm.Transition(event=Events.H,
                                                       target=self.s21)))
        self.s211.configure(parent=self.s21,
                            transitions=(hsm.Transition(event=Events.B,
                                                        target=self.s21),
                                         hsm.Transition(event=Events.D,
                                                        target=self.s21),
                                         hsm.Transition(event=Events.G,
                                                        target=self.s0),
                                         hsm.Transition(guard=self.s211_s11_guard,
                                                        target=self.s11)))

        self.s0.init_action = lambda x, u: self.update(self.x, b"s0_init")
        self.s0.entry_action = lambda x, u: self.update(self.x, b"s0_entry")
        self.s0.exit_action = lambda x, u: self.update(self.x, b"s0_exit")
        self.s0.during_action = lambda x, u: self.update(self.x, b"s0_during")

        self.s1.init_action = lambda x, u: self.update(self.x, b"s1_init")
        self.s1.entry_action = lambda x, u: self.update(self.x, b"s1_entry")
        self.s1.exit_action = lambda x, u: self.update(self.x, b"s1_exit")
        self.s1.during_action = lambda x, u: self.update(self.x, b"s1_during")

        self.s11.init_action = lambda x, u: self.update(self.x, b"s11_init")
        self.s11.entry_action = lambda x, u: self.update(self.x, b"s11_entry")
        self.s11.exit_action = lambda x, u: self.update(self.x, b"s11_exit")
        self.s11.during_action = lambda x, u: self.update(self.x, b"s11_during")

        self.s2.init_action = lambda x, u: self.update(self.x, b"s2_init")
        self.s2.entry_action = lambda x, u: self.update(self.x, b"s2_entry")
        self.s2.exit_action = lambda x, u: self.update(self.x, b"s2_exit")
        self.s2.during_action = lambda x, u: self.update(self.x, b"s2_during")

        self.s21.init_action = lambda x, u: self.update(self.x, b"s21_init")
        self.s21.entry_action = lambda x, u: self.update(self.x, b"s21_entry")
        self.s21.exit_action = lambda x, u: self.update(self.x, b"s21_exit")
        self.s21.during_action = lambda x, u: self.update(self.x, b"s21_during")

        self.s211.init_action = lambda x, u: self.update(self.x, b"s211_init")
        self.s211.entry_action = lambda x, u: self.update(self.x, b"s211_entry")
        self.s211.exit_action = lambda x, u: self.update(self.x, b"s211_exit")
        self.s211.during_action = lambda x, u: self.update(self.x, b"s211_during")

        super().__init__(self.s0)

    @staticmethod
    def s211_s11_guard(x, u):
        try:
            return u > 10.0
        except:
            return False

    @staticmethod
    def update(x, u):
        x[0].update(u)
        return x

This machine may be tested for any transition

test_hsm = Example()
test_hsm.x[0] = md5() # Reset the hsm continuous state
test_hsm.dispatch(Events.E)
x = [md5()]
x = Example.update(x, b"s11_exit")
x = Example.update(x, b"s1_exit")
x = Example.update(x, b"s2_entry")
x = Example.update(x, b"s21_entry")
x = Example.update(x, b"s211_entry")
x[0].digest == test_hsm.x[0].digest

Versioning

We use SemVer for versioning. For the versions available, see the tags on this repository.

Authors

  • Matt Roelle - Initial work - roelle

See also the list of contributors who participated in this project.

License

This project is licensed under the MIT License - see the LICENSE.md file for details

Acknowledgments

  • This README template was copied from PurpleBooth
  • Many discussions about heirarchical-state machines informed this work.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyrarchical-0.8.0.tar.gz (35.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyrarchical-0.8.0-py3-none-any.whl (28.0 kB view details)

Uploaded Python 3

File details

Details for the file pyrarchical-0.8.0.tar.gz.

File metadata

  • Download URL: pyrarchical-0.8.0.tar.gz
  • Upload date:
  • Size: 35.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.9

File hashes

Hashes for pyrarchical-0.8.0.tar.gz
Algorithm Hash digest
SHA256 4c82c8fa08705ce001c4d96e9a35915cace103516c64adc85ed703b6f3953463
MD5 2f3f18a8ed8dd8721b1fc817c99da2b1
BLAKE2b-256 5d7a32328ecd882a9ca9e8434a251ef0a5aff14c85746dd45d19e61d945336b1

See more details on using hashes here.

File details

Details for the file pyrarchical-0.8.0-py3-none-any.whl.

File metadata

  • Download URL: pyrarchical-0.8.0-py3-none-any.whl
  • Upload date:
  • Size: 28.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.9

File hashes

Hashes for pyrarchical-0.8.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8aee472460425ec0e97763a0a74ea5c2e1952ff5bed7f2c3314f96469c1984a5
MD5 702e3d6b04448d263e0b21be7a0bb7b7
BLAKE2b-256 e3378014f079fefbf4ae6eca37a8518cd75ad32a17ce2ae521bb973704b426df

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page