The ECU framework is a project that wants to simplify writing for Raspberry applications
Project description
ECUframework
Documentation: work in progress...
Example:
from enum import Enum
from ecuframework.job import Job
from ecuframework.mcu import Mcu, McuPattern
from ecuframework.module import Module, ModulePattern
class JobGoal(Enum):
GOAL1 = 1
GOAL2 = 2
class ModuleType(Enum):
MODULE1 = 1,
MODULE2 = 2
class Module1(Module):
mp = ModulePattern()
name = 'm1'
count = 2
recevied_count = 0
def __init__(self):
super().__init__(name=self.name, module_type=ModuleType.MODULE1)
self.controller.init(self, self.mp)
@mp.setup()
def setup(self):
print(f'{self.name}: setup')
@mp.on_incoming_data()
def on_incoming_data(self, job):
print(f'{self.name}: received a new job')
self.controller.run_job(job)
@mp.timer(name='timer1', interval=5)
def timer1(self):
print(f'{self.name}: send job from timer1')
self.controller.send_new_job(
Job(data={'x': 'module1'}, goal=JobGoal.GOAL1, producer=self.name, target=ModuleType.MODULE2))
@mp.solve(JobGoal.GOAL2)
def goal2(self, job):
print(f'{self.name}: Received {job.data} from {job.producer}')
self.recevied_count = job.data['x']
@mp.main_loop()
def main_loop(self):
print(f'{self.name}: count = {self.count + self.recevied_count}')
self.count += self.recevied_count
class Module2(Module):
mp = ModulePattern()
name = 'm2'
count = 0
def __init__(self):
super().__init__(name=self.name, module_type=ModuleType.MODULE2)
self.controller.init(self, self.mp)
@mp.setup()
def setup(self):
print(f'{self.name}: setup')
@mp.on_incoming_data()
def on_incoming_data(self, job):
self.controller.run_job(job)
@mp.timer(name='timer2', interval=8)
def timer1(self):
print(f'{self.name}: send job from timer2')
self.controller.send_new_job(
Job(data={'x': 2}, goal=JobGoal.GOAL2, producer=self.name, target=ModuleType.MODULE1))
@mp.solve(JobGoal.GOAL1)
def goal2(self, job):
print(f'{self.name}: Received {job.data} from {job.producer}')
class MyMcu(Mcu):
mp = McuPattern()
name = 'mcu'
def __init__(self):
super().__init__(name=self.name)
self.controller.init(self, self.mp)
@mp.on_receiver()
def on_receiver(self, job):
if job is None:
return
self.controller.shared_queue.put(job)
@mp.assigning_job()
def assigning_job(self, job):
module_target = self.controller.get_module_target(job.target)
module_target.controller.put_job(job) if module_target else print(
f'The {job.target} destination of the job {job.goal} is unreachable')
if __name__ == '__main__':
print('Start')
mcu = MyMcu()
mcu.register_modules([Module1(), Module2()])
mcu.start()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
ecuframework-0.0.3.tar.gz
(5.5 kB
view hashes)
Built Distribution
Close
Hashes for ecuframework-0.0.3-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 07a886875900ed3e7c34b75833bcd3916d551b26eaa1385f519f8fcf2dcbbec6 |
|
MD5 | 5d21ee14433365be6656e92c0f6d3a3b |
|
BLAKE2b-256 | 6ece7f6ced8c1ab0a44d35a76df90ab8d3b4b381f8a4ca345d49600551b89b78 |