A robotic middleware in Python.

rosia: robotic middleware for Python

Install

pip install rosia

Quick Start

First program

Let's write a first program in which one node generates numbers once per second and another node prints them.

First, write a node that generates numbers:

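import time

# Node and OutputPort come from the rosia package.
@Node
class IntGenerator:
    output_int = OutputPort[int]()
    def __init__(self):
        self.count = 0
    def start(self):
        # Emit an increasing counter once per second.
        while True:
            print("Sending:", self.count)
            self.output_int(self.count)
            self.count += 1
            time.sleep(1)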

Output ports are declared with <output_port> = OutputPort[<type>](). __init__ initializes the node, and start automatically runs after all nodes are initialized.

@Node
class Printer:
    input_int = InputPort[int]()
    @reaction([input_int])
    def print_message(self):
        print(f"Received: {self.input_int}")

This node prints the numbers. Input ports are declared with <input_port> = InputPort[<type>](). The received message can be referenced by self.<input_port>.

With the @reaction decorator, every time the input port self.input_int receives a message, the print_message method is executed.
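To make the trigger semantics concrete, here is a toy single-process model of a reaction decorator that uses only the standard library. It is a sketch, not rosia's implementation: ToyInputPort, toy_reaction, and deliver are hypothetical names, and the message is read through a .value attribute rather than directly from self.<input_port>.

```python
class ToyInputPort:
    """Hypothetical stand-in for an input port; holds the latest message."""

    def __init__(self):
        self.value = None


def toy_reaction(ports):
    """Mark a method as triggered by any of the given ports."""
    def decorate(method):
        method._trigger_ports = list(ports)
        return method
    return decorate


class ToyPrinter:
    input_int = ToyInputPort()

    def __init__(self):
        self.seen = []

    @toy_reaction([input_int])
    def record(self):
        self.seen.append(self.input_int.value)


def deliver(node, port, message):
    """Store the message on the port, then run every reaction it triggers."""
    port.value = message
    for name in dir(type(node)):
        member = getattr(type(node), name)
        if port in getattr(member, "_trigger_ports", ()):
            member(node)


printer = ToyPrinter()
deliver(printer, ToyPrinter.input_int, 1)
deliver(printer, ToyPrinter.input_int, 2)
print(printer.seen)  # [1, 2]
```

Each call to deliver runs record exactly once, which mirrors the "one message, one reaction" behavior described above.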

To connect the two nodes, create them within a coordinator and connect their ports using the >>= operator. Each node runs as a separate process, so nodes execute with true concurrency.

coor = Coordinator()
int_gen = coor.create_node(IntGenerator())
printer = coor.create_node(Printer())
int_gen.output_int >>= printer.input_int
coor.execute()

Nodes initialize and run after coor.execute() is called. Run this example with python tests/examples/easy.py.
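For readers curious how a >>= connection can work in Python at all, the sketch below overloads the operator with __irshift__. This is an assumption about the general technique, not a description of rosia's internals; ToyOutput, ToyInput, Producer, and Consumer are hypothetical names. The detail worth noting is that an augmented assignment on an attribute rebinds the attribute to whatever __irshift__ returns, so the method returns self.

```python
class ToyOutput:
    """Hypothetical output port that forwards values to connected inputs."""

    def __init__(self):
        self.targets = []

    def __irshift__(self, target):
        # `a.out >>= b.inp` calls this method, then rebinds a.out to the
        # return value, so returning self keeps the original port in place.
        self.targets.append(target)
        return self

    def __call__(self, value):
        for target in self.targets:
            target.received.append(value)


class ToyInput:
    """Hypothetical input port that records everything it receives."""

    def __init__(self):
        self.received = []


class Producer:
    def __init__(self):
        self.out = ToyOutput()


class Consumer:
    def __init__(self):
        self.inp = ToyInput()


producer, consumer = Producer(), Consumer()
port_before = producer.out
producer.out >>= consumer.inp
producer.out(42)
print(consumer.inp.received)  # [42]
```

Unlike this in-process toy, the nodes in the example above run in separate processes, so a real connection also has to move messages across process boundaries.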

Time and synchronization

Let's modify the int generator to be triggered by a timer.

@Node
class IntGenerator(Rosia):
    timer = InputPort[Time]()
    output = OutputPort[int]()

    def __init__(self):
        self.count = 0

    @reaction([timer])
    def generate(self):
        self.output(self.count)
        self.count += 1

Next, modify the printer to print two numbers from two input ports:

@Node
class Printer:
    input_int1 = InputPort[int]()
    input_int2 = InputPort[int]()

    @reaction([input_int1, input_int2])
    def print_message(self):
        print(f"Received message: {self.input_int1} {self.input_int2}")

We can then create two instances of IntGenerator in a coordinator, each driven by its own timer:

import logging

# Coordinator, Timer, ms, and s come from the rosia package.
coor = Coordinator(logging.INFO)
# Both timers share the same interval and offset, so their ticks coincide.
timer1 = coor.create_node(Timer(interval=1 * ms, offset=0 * s))
timer2 = coor.create_node(Timer(interval=1 * ms, offset=0 * s))
int_gen1 = coor.create_node(IntGenerator())
int_gen2 = coor.create_node(IntGenerator())
printer = coor.create_node(Printer())
timer1.output_timer >>= int_gen1.timer
timer2.output_timer >>= int_gen2.timer
int_gen1.output >>= printer.input_int1
int_gen2.output >>= printer.input_int2

coor.execute()

When you run this example with python tests/examples/parallel_timed.py, notice that the two inputs are always synchronized, because both timers use the same interval and offset. Rosia handles this synchronization internally, so you don't have to implement it yourself.
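The alignment can be illustrated with a small standard-library model. It assumes that each tick carries a timestamp of offset + k * interval and that a two-input reaction fires only once both inputs hold a message with the same timestamp. Rosia's actual mechanism is not described here, so tick_times and PairAligner are hypothetical names for illustration only.

```python
def tick_times(interval_us, offset_us, count):
    """Timestamps (in microseconds) of the first `count` ticks of a periodic timer."""
    return [offset_us + k * interval_us for k in range(count)]


class PairAligner:
    """Hypothetical helper: release a pair only when both inputs share a timestamp."""

    def __init__(self):
        self.pending = {1: {}, 2: {}}
        self.pairs = []

    def receive(self, input_id, timestamp, value):
        other = 2 if input_id == 1 else 1
        if timestamp in self.pending[other]:
            partner = self.pending[other].pop(timestamp)
            pair = (value, partner) if input_id == 1 else (partner, value)
            self.pairs.append((timestamp, pair))
        else:
            # No matching message yet: buffer this one until its partner arrives.
            self.pending[input_id][timestamp] = value


# Same interval (1 ms) and offset (0) as the two timers in the example.
timer1 = tick_times(interval_us=1000, offset_us=0, count=3)
timer2 = tick_times(interval_us=1000, offset_us=0, count=3)

aligner = PairAligner()
# Input 1 runs ahead of input 2, as can happen with independent processes.
for count, t in enumerate(timer1):
    aligner.receive(1, t, count)
for count, t in enumerate(timer2):
    aligner.receive(2, t, count)

print(timer1 == timer2)  # True
print(aligner.pairs)     # [(0, (0, 0)), (1000, (1, 1)), (2000, (2, 2))]
```

Even though one input arrives well before the other, every emitted pair carries matching counters, which is the behavior the printed output of the example shows.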

Contributing

Install Dev Dependencies

pip install --group dev .

Install Pre-commit Hooks

pre-commit install --hook-type commit-msg --hook-type pre-push --hook-type pre-commit
pre-commit run --all-files

Code is automatically formatted before committing. Commit messages should follow the Conventional Commits specification.
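As a rough illustration of the header shape that Conventional Commits describes (a type, an optional scope in parentheses, an optional "!", then a colon, a space, and a description), the sketch below checks the first line of a commit message. The regular expression and is_conventional are hypothetical helpers; the project's commit-msg hook may enforce a stricter or different rule set.

```python
import re

# type, optional (scope), optional "!", then ": " and a non-empty description.
HEADER = re.compile(
    r"^(?P<type>[a-z]+)(\((?P<scope>[^()\s]+)\))?(?P<breaking>!)?: (?P<description>\S.*)$"
)


def is_conventional(header):
    """Return True when the first line of a commit message matches the header shape."""
    return HEADER.match(header) is not None


print(is_conventional("feat(timer): add offset parameter"))  # True
print(is_conventional("fix: handle empty input port"))       # True
print(is_conventional("Updated stuff"))                      # False
```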

Distribution Archives

python -m build
python -m twine upload dist/*
