Skip to main content

Python framework for IoT applications

Project description

ECUframework

ECUframework (Electronic Central Unit) was born with the intent to simplify and speed up the process of writing code of IoT applications, making the developer avoid tedious and repetitive operations. It offer a solid and consolidated structure that lays the foundation for your application. The goal of the framework is to introduce as many people as possible to the world of IoT making the writing of an application accessible, legible, and fun.

For further information about ECUframework read the wiki

Documentation

https://github.com/tommasoviciani/ecuframework/wiki

Installation

PyPI: https://pypi.org/project/ecuframework/

pip install ecuframework

Example

from enum import Enum

from ecuframework.job import Job
from ecuframework.mcu import Mcu
from ecuframework.module import Module


class JobGoal(Enum):
    GOAL1 = 1
    GOAL2 = 2


class Module1(Module):

    # The pattern object must always be present in our modules because it stores our decorated methods
    mp = Module.Pattern()

    # The tag is a string that identifies a specific module for distributing jobs
    tag = 'm1'

    def __init__(self):
        # Call the constructor of the Module class. As arguments, it accepts the tag of the module that we are going
        # to initialize, and the instance of module needed to reach the jobs of the other modules.
        # Always put on top of the code
        super().__init__(instance=self, tag=self.tag)

        # Always make sure to call this method after each call to the Module class constructor.
        # Record the module pattern
        self.controller.register_pattern(self.mp)

        # Some variables of example
        self.count = 2
        self.recevied_count = 0

    @mp.setup()
    def setup(self):
        """
        The method decorated with @setup() will be executed only once when starting our module. It is used to configure our
        variables
        """
        print(f'{self.tag}: setup')

    @mp.on_incoming_data()
    def on_incoming_data(self, job):
        """
        The method decorated with @on_incoming_data() will be called when a new job is available in the queue. When
        we use this decorator, there must be an input job which is precisely the one obtained from the queue
        """
        print(f'{self.tag}: received a new job')

        # Call this method to execute the job just obtained from the queue
        self.controller.run_job(job)

    @mp.timer(name='timer', interval=5)
    def timer(self):
        """
        The method decorated with @timer() is a subprocess which will be executed cyclically. There can be multiple timers in
        a module and each of them needs a name to distinguish one from the other and a time interval expressed in seconds
        """
        print(f'{self.tag}[timer]: send job')
        self.controller.send_job(
            Job(data={'x': 'module1'}, goal=JobGoal.GOAL1, producer=self.tag, recipient='m2'))

    @mp.main_loop(interval=1)
    def main_loop(self):
        """
        The method decorated with @main_loop() is the subprocess that continues to run indefinitely until the module process
        is killed. It is performed cyclically and is the last decorated method that is called
        """
        print(f'{self.tag}: count = {self.count + self.recevied_count}')
        self.count += self.recevied_count

    @mp.solve(JobGoal.GOAL2)
    def solve_goal1(self, job):
        """
        The method decorated with @solve() defines how the module should behave when a given job with the usual JobGoal passed
        as input to the decorator is executed by the run_job() method of the controller.
        The decorated method should, therefore, accept this job as an argument
        """
        print(f'{self.tag}: Received {job.data} from {job.producer}')
        self.recevied_count = job.data['x']


class Module2(Module):

    # The pattern object must always be present in our modules because it stores our decorated methods
    mp = Module.Pattern()

    # The tag is a string that identifies a specific module for distributing jobs
    tag = 'm2'

    def __init__(self):
        # Call the constructor of the Module class. As arguments, it accepts the tag of the module that we are going
        # to initialize, and the instance of module needed to reach the jobs of the other modules.
        # Always put on top of the code
        super().__init__(instance=self, tag=self.tag)

        # Always make sure to call this method after each call to the Module class constructor.
        # Record the module pattern
        self.controller.register_pattern(self.mp)

        # Some variables of example
        self.count = 0

    @mp.setup()
    def setup(self):
        """
        The method decorated with @setup() will be executed only once when starting our module. It is used to configure our
        variables
        """
        print(f'{self.tag}: setup')

    @mp.on_incoming_data()
    def on_incoming_data(self, job):
        """
        The method decorated with @on_incoming_data() will be called when a new job is available in the queue. When
        we use this decorator, there must be an input job which is precisely the one obtained from the queue
        """

        # Call this method to execute the job just obtained from the queue
        self.controller.run_job(job)

    @mp.timer(name='timer', interval=8)
    def timer(self):
        """
        The method decorated with @timer() is a subprocess which will be executed cyclically. There can be multiple timers in
        a module and each of them needs a name to distinguish one from the other and a time interval expressed in seconds
        """
        print(f'{self.tag}[timer]: send job')
        self.controller.send_job(
            Job(data={'x': 2}, goal=JobGoal.GOAL2, producer=self.tag, recipient='m1'))

    @mp.solve(JobGoal.GOAL1)
    def solve_goal2(self, job):
        """
        The method decorated with @solve() defines how the module should behave when a given job with the usual JobGoal passed
        as input to the decorator is executed by the run_job() method of the controller.
        The decorated method should, therefore, accept this job as an argument
        """
        print(f'{self.tag}: Received {job.data} from {job.producer}')


class MyMcu(Mcu):

    # The pattern object must always be present in our MCU because it stores our decorated methods
    mp = Mcu.Pattern()

    tag = 'mcu'

    def __init__(self):
        # Call to the constructor of the MCU class. As arguments, it accepts the name of the MCU that we are going
        # to initialize. Always put on top of the code
        super().__init__(instance=self, tag=self.tag)

        # Always make sure to call this method after each call to the MCU class constructor.
        # Record the MCU pattern
        self.controller.register_pattern(self.mp)

    @mp.on_receiver()
    def on_receiver(self, job):
        """
        The method decorated with @on_receiver() as a receiver has as input the job sent by the sending module
        """
        if job is None:
            return
        self.shared_queue.put(job)

    @mp.assigning_job()
    def assigning_job(self, job):
        """
        The method decorated with @assinging_job() is called when a job shows up in the queue. In a nutshell, this decorated
        method is used to distribute shared queue jobs to modules
        """

        # Here the recipient module is obtained to which the job in question must be assigned
        module_recipient_result = self.controller.get_recipient_module(lambda module_recipient: module_recipient.tag == job.recipient)

        module_recipient_result.queue.put(job) if module_recipient_result else print(
            f'{self.tag}: the {job.target} destination of the job {job.goal} is unreachable')


if __name__ == '__main__':
    print('Start')
    mcu = MyMcu()
    mcu.register_modules([Module1(), Module2()])
    mcu.start()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ecuframework-0.0.4.tar.gz (7.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ecuframework-0.0.4-py3-none-any.whl (20.5 kB view details)

Uploaded Python 3

File details

Details for the file ecuframework-0.0.4.tar.gz.

File metadata

  • Download URL: ecuframework-0.0.4.tar.gz
  • Upload date:
  • Size: 7.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.20.1 setuptools/46.4.0 requests-toolbelt/0.9.1 tqdm/4.46.0 CPython/3.7.3

File hashes

Hashes for ecuframework-0.0.4.tar.gz
Algorithm Hash digest
SHA256 3add68007531d9437c42cb2bd2581459465e260f611a84c315462dbd8cffce40
MD5 26954812aaa8a121704f727250a6e07a
BLAKE2b-256 b1b8effcb20fed0913a6e40169cf2a046ddc90887aada1faeabaa7e87953931c

See more details on using hashes here.

File details

Details for the file ecuframework-0.0.4-py3-none-any.whl.

File metadata

  • Download URL: ecuframework-0.0.4-py3-none-any.whl
  • Upload date:
  • Size: 20.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.20.1 setuptools/46.4.0 requests-toolbelt/0.9.1 tqdm/4.46.0 CPython/3.7.3

File hashes

Hashes for ecuframework-0.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 96caa4cda2577a161811b6042bd8bcbd91e2d1355c8d5fece90fd4019ff06d05
MD5 543c1bb32e40cb8714426944c371540e
BLAKE2b-256 ea26bb34659a1690121d7632aecb453f8598ebde8d7688fc0e9048297e170fc5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page