A robotic middleware in Python.
Project description
rosia: robotic middleware for python
Install
pip install rosia
Quick Start
First program
Let's write a first program that generates and prints numbers using timers.
First, write a node that generates numbers:
@Node
class IntGenerator:
output_int = OutputPort[int]()
def __init__(self):
self.count = 0
def start(self):
while True:
print("Sending:", self.count)
self.output_int(self.count)
self.count += 1
time.sleep(1)
Output ports are declared with <output_port> = OutputPort[<type>](). __init__ initializes the node, and start automatically runs after all nodes are initialized.
@Node
class Printer:
input_int = InputPort[int]()
@reaction([input_int])
def print_message(self):
print(f"Received: {self.input_int}")
This nodes prints the numbers. Input ports are declared with <input_port> = InputPort[<type>](). The received message can be referenced by self.<input_port>.
With the @reaction decorator, every time the input port self.input_int receives a message, the print_message method is executed.
To connect the two nodes, use a coordinator and create nodes within the coordinator. Connect ports using the >>= operator. Each node is a separate process for true concurrency.
coor = Coordinator()
int_gen = coor.create_node(IntGenerator())
printer = coor.create_node(Printer())
int_gen.output_int >>= printer.input_int
coor.execute()
Nodes initialize and run after coor.execute() is called. Run this example with python tests/examples/easy.py.
Time and synchronization
Let's modify the int generator to be triggered by a timer.
@Node
class IntGenerator(Rosia):
timer = InputPort[Time]()
output = OutputPort[int]()
def __init__(self):
self.count = 0
@reaction([timer])
def generate(self):
self.output(self.count)
self.count += 1
and the printer to print two numbers from two input ports.
@Node
class Printer:
input_int1 = InputPort[int]()
input_int2 = InputPort[int]()
@reaction([input_int1, input_int2])
def print_message(self):
print(f"Received message: {self.input_int1} {self.input_int2}")
We can create two instances of IntGenerator in a coordinator:
coor = Coordinator(logging.INFO)
timer1 = coor.create_node(Timer(interval=1 * ms, offset=0 * s))
timer2 = coor.create_node(Timer(interval=1 * ms, offset=0 * s))
int_gen1 = coor.create_node(IntGenerator())
int_gen2 = coor.create_node(IntGenerator())
printer = coor.create_node(Printer())
timer1.output_timer >>= int_gen1.timer
timer2.output_timer >>= int_gen2.timer
int_gen1.output >>= printer.input_int1
int_gen2.output >>= printer.input_int2
coor.execute()
When we execute this example with python tests/examples/parallel_timed.py, notice how the two inputs are always synchronized since the timers are aligned. Rosia handles synchronization internally so you don't have to worry about it!
Contributing
Install Dev Dependencies
pip install --group dev .
Install Pre-commit Hooks
pre-commit install --hook-type commit-msg --hook-type pre-push --hook-type pre-commit
pre-commit run --all-files
Code is automatically formatted before committing. Commit messages should follow conventional commit.
Distribution Archives
python -m build
python -m twine upload dist/*
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rosia-0.0.4.tar.gz.
File metadata
- Download URL: rosia-0.0.4.tar.gz
- Upload date:
- Size: 19.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
44a99412a36a52ae8bf159b6e66c8a133b94b9273e966c774f23a29253732f75
|
|
| MD5 |
c952db78ff92d23ec9c83e469503f132
|
|
| BLAKE2b-256 |
1e17de7ded06a246b94629b8f677a0df15d10cb57724675cc830fa8a495729da
|
File details
Details for the file rosia-0.0.4-py3-none-any.whl.
File metadata
- Download URL: rosia-0.0.4-py3-none-any.whl
- Upload date:
- Size: 24.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7615d972180210d45304487541b5f2c7651efbb550739ac28e4e6332d14207ab
|
|
| MD5 |
7a9a1083033ffa0be5173b5de2d7991d
|
|
| BLAKE2b-256 |
d4d0495f4770e0b9b759dc10f058df2a43187345f7fa5c333d95d7a3811589a6
|