Skip to main content

A framework for a responsive heirarchical state-machine

Project description

Pyrarchical State Machine

Pyrarchical state machine is a framework for a responsive heirarchical state-machine. An HSM built with this framework provides callbacks to execute functionality as the HSM responds to dispatched events. The PHSM loosely follows a hybrid dynamic system model allowing for a persistant analog state held by the HSM in addition to discrete state (which may be heirarchical). Your HSM is constructed through inheritance of the base functionality.

The simplest two-state model with no heirarchy is shown here with more complete examples later on.

from src.pyrarchical import hsm


class TwoState(hsm.HSM):
    top = hsm.State("top")
    s1 = hsm.State("s1")
    s2 = hsm.State("s2")

    def __init__(self):
        self.x.append(0)
        self.top.configure(parent=None,
                           init=self.s1,
                           transitions=(hsm.Transition(event=Events.RESET,
                                                       target=self.s1),))
        self.s1.configure(parent=self.top,
                          transitions=(hsm.Transition(event=Events.FLIP,
                                                      target=self.s2),))
        self.s2.configure(parent=self.top,
                          transitions=(hsm.Transition(event=Events.FLOP,
                                                      target=self.s1),))
        super().__init__(self.top)


class Events(hsm.Event):
    FLIP = "flip"
    FLOP = "flop"
    RESET = "reset"


if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    flip_flop = TwoState()
    flip_flop.dispatch(Events.FLIP)
    flip_flop.dispatch(Events.FLOP)
    flip_flop.dispatch(Events.FLIP)
    flip_flop.dispatch(Events.RESET)

Description

This module provides a framework for a responsive heirarchical state-machine.

For hybrid dynamical systems, the following update equations are commonly used.

x[k+1] = f_s(x[k], u(k]))
y[k] = g_s(x[k], u(k]))

where f and g are functions specific to each discrete state s, x is a continuous state that exists in all discrete states, u is a set of inputs, and y is the output of the system.

In a Python HSM implementation, only the during state will really obey the discrete-time step, k. However, we may need to update states during other HSM actions.

x = init_s(x, u)
x = entry_s(x, u)
x = during_s(x, u)
x = exit_s(x, u)
y = output_s(x, u)

The continuous state, x, is a member variable of the HSM. The output function is a static method. The input, continuous state, and output each need an object defined to describe them. At the simplest, this will just be a list.

Getting Started

The only file needed to use the state-machine is hsm.py. There is currently no installation. Copy the file to your active project to use it.

Prerequisites

hsm.py is only implemented in Python3 using standard Python libraries (enum and logging).

Installing

git clone git@gitlab.com:roelle/pyrarchical-state-machine.git
cp pyrarchical-state-machine/hsm.py ../your-project/hsm.py

Running the tests

Running hsm.py will run an example. Continupous integration tests are run by executing hsm_test.py in the pyrarchical-state-machine directory.

Break down into end to end tests

To test, you need a heirarchincal state state-machine. The state machine shown in this figure is described in the following code. This particular implementation is based on an example offers a comprehensive set of tests of all types of transitions.

Canonical heirarchical-state machine with all transitions

from src.pyrarchical import hsm
from hashlib import md5


class S211(hsm.State):

    def default_exit_action(self, x, u):
        hsm._log.info(self._name + " : Exit")
        return x


class Events(hsm.Event):
    A = "a"
    B = "b"
    C = "c"
    D = "d"
    E = "e"
    F = "f"
    G = "g"
    H = "h"


class Example(hsm.HSM):
    s0 = hsm.State("s0")
    s1 = hsm.State("s1")
    s11 = hsm.State("s11")
    s2 = hsm.State("s2")
    s21 = hsm.State("s21")
    s211 = S211("s211")

    def __init__(self):
        self.x.append(md5())
        self.s0.configure(parent=None,
                          init=self.s1,
                          transitions=(hsm.Transition(event=Events.E,
                                                      target=self.s211),))
        self.s1.configure(parent=self.s0,
                          init=self.s11,
                          transitions=(hsm.Transition(event=Events.A,
                                                      target=self.s1),
                                       hsm.Transition(event=Events.B,
                                                      target=self.s11),
                                       hsm.Transition(event=Events.C,
                                                      target=self.s2),
                                       hsm.Transition(event=Events.D,
                                                      target=self.s0),
                                       hsm.Transition(event=Events.F,
                                                      target=self.s211)))
        self.s11.configure(parent=self.s1,
                           transitions=(hsm.Transition(event=Events.G,
                                                       target=self.s211),))
        self.s2.configure(parent=self.s0,
                          init=self.s21,
                          transitions=(hsm.Transition(event=Events.C,
                                                      target=self.s1),
                                       hsm.Transition(event=Events.F,
                                                      target=self.s11)))
        self.s21.configure(parent=self.s2,
                           init=self.s211,
                           transitions=(hsm.Transition(event=Events.B,
                                                       target=self.s211),
                                        hsm.Transition(event=Events.H,
                                                       target=self.s21)))
        self.s211.configure(parent=self.s21,
                            transitions=(hsm.Transition(event=Events.B,
                                                        target=self.s21),
                                         hsm.Transition(event=Events.D,
                                                        target=self.s21),
                                         hsm.Transition(event=Events.G,
                                                        target=self.s0),
                                         hsm.Transition(guard=self.s211_s11_guard,
                                                        target=self.s11)))

        self.s0.init_action = lambda x, u: self.update(self.x, b"s0_init")
        self.s0.entry_action = lambda x, u: self.update(self.x, b"s0_entry")
        self.s0.exit_action = lambda x, u: self.update(self.x, b"s0_exit")
        self.s0.during_action = lambda x, u: self.update(self.x, b"s0_during")

        self.s1.init_action = lambda x, u: self.update(self.x, b"s1_init")
        self.s1.entry_action = lambda x, u: self.update(self.x, b"s1_entry")
        self.s1.exit_action = lambda x, u: self.update(self.x, b"s1_exit")
        self.s1.during_action = lambda x, u: self.update(self.x, b"s1_during")

        self.s11.init_action = lambda x, u: self.update(self.x, b"s11_init")
        self.s11.entry_action = lambda x, u: self.update(self.x, b"s11_entry")
        self.s11.exit_action = lambda x, u: self.update(self.x, b"s11_exit")
        self.s11.during_action = lambda x, u: self.update(self.x, b"s11_during")

        self.s2.init_action = lambda x, u: self.update(self.x, b"s2_init")
        self.s2.entry_action = lambda x, u: self.update(self.x, b"s2_entry")
        self.s2.exit_action = lambda x, u: self.update(self.x, b"s2_exit")
        self.s2.during_action = lambda x, u: self.update(self.x, b"s2_during")

        self.s21.init_action = lambda x, u: self.update(self.x, b"s21_init")
        self.s21.entry_action = lambda x, u: self.update(self.x, b"s21_entry")
        self.s21.exit_action = lambda x, u: self.update(self.x, b"s21_exit")
        self.s21.during_action = lambda x, u: self.update(self.x, b"s21_during")

        self.s211.init_action = lambda x, u: self.update(self.x, b"s211_init")
        self.s211.entry_action = lambda x, u: self.update(self.x, b"s211_entry")
        self.s211.exit_action = lambda x, u: self.update(self.x, b"s211_exit")
        self.s211.during_action = lambda x, u: self.update(self.x, b"s211_during")

        super().__init__(self.s0)

    @staticmethod
    def s211_s11_guard(x, u):
        try:
            return u > 10.0
        except:
            return False

    @staticmethod
    def update(x, u):
        x[0].update(u)
        return x

This machine may be tested for any transition

test_hsm = Example()
test_hsm.x[0] = md5() # Reset the hsm continuous state
test_hsm.dispatch(Events.E)
x = [md5()]
x = Example.update(x, b"s11_exit")
x = Example.update(x, b"s1_exit")
x = Example.update(x, b"s2_entry")
x = Example.update(x, b"s21_entry")
x = Example.update(x, b"s211_entry")
x[0].digest == test_hsm.x[0].digest

Versioning

We use SemVer for versioning. For the versions available, see the tags on this repository.

Authors

  • Matt Roelle - Initial work - roelle

See also the list of contributors who participated in this project.

License

This project is licensed under the MIT License - see the LICENSE.md file for details

Acknowledgments

  • This README template was copied from PurpleBooth
  • Many discussions about heirarchical-state machines informed this work.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyrarchical-0.7.8.tar.gz (33.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyrarchical-0.7.8-py3-none-any.whl (26.0 kB view details)

Uploaded Python 3

File details

Details for the file pyrarchical-0.7.8.tar.gz.

File metadata

  • Download URL: pyrarchical-0.7.8.tar.gz
  • Upload date:
  • Size: 33.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.9

File hashes

Hashes for pyrarchical-0.7.8.tar.gz
Algorithm Hash digest
SHA256 117c5b97edd0a1f378e8942f34fafe298d0b1a6a87cfc7c759e5f99853096188
MD5 0f0afcd1bbd239b3e81756ac6e93d51f
BLAKE2b-256 067df0473a05720d506afbc7d4aef8642952b3c50ca99c3f3304ec49472f346a

See more details on using hashes here.

File details

Details for the file pyrarchical-0.7.8-py3-none-any.whl.

File metadata

  • Download URL: pyrarchical-0.7.8-py3-none-any.whl
  • Upload date:
  • Size: 26.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.9

File hashes

Hashes for pyrarchical-0.7.8-py3-none-any.whl
Algorithm Hash digest
SHA256 c8b0a7b5070c39cd8dd7f2183a1f8a6019b104ba6c417a182d239f77cc51b2e0
MD5 aab6f88aafe52b807b150fb934a90159
BLAKE2b-256 a89bf3d01e0c3c9edab01fe1a4639d14309d0ff2717aa70132758a5878f29c09

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page