Skip to main content

Asyncio interface for ROS 2, Zenoh, and other robotic middlewares.

Project description

Asyncio For Robotics

Requirements Compatibility Tests
python
mit
ros
zenoh
Python
ROS 2

The Asyncio For Robotics (afor) library makes asyncio usable with ROS 2, Zenoh and more, letting you write linear, testable, and non-blocking Python code.

  • Better syntax.
  • Only native python: Better docs and support.
  • Simplifies testing.

Will this make my code slower? Likely not.

Will this make my code faster? No. However, asyncio will help YOU write better, faster code.

[!TIP] asyncio_for_robotics interfaces do not replace their primary interfaces! We add capabilities, giving you more choices, not less.

Install

Barebone

Compatible with ROS 2 (jazzy,humble and newer) out of the box. This library is pure python (>=3.10), so it installs easily.

pip install asyncio_for_robotics

Along with Zenoh

pip install asyncio_for_robotics eclipse-zenoh

Read more

Available interfaces:

  • Rate: Every tick of a clock. (native)
  • TextIO: stdout lines of a Popen process (and other TextIO files). (native)
  • ROS 2: Subscriber, Service Client, Service Server.
  • Zenoh: Subscriber.
  • Implement your own interface!

Additional Projects and Interfaces

[!TIP] An interface is not required for every operation. ROS 2 native publishers and nodes work just fine. Furthermore, advanced behavior can be composed of generic afor object (see ROS2 Event Callback Example).

Code sample

Syntax is identical between ROS 2, Zenoh, TextIO, Rate...

Wait for messages one by one

Application:

  • Get the latest sensor data
  • Get clock value
  • Wait for trigger
  • Wait for next tick of the Rate
  • Wait for system to be operational
sub = afor.Sub(...)

# Get the latest message
latest = await sub.wait_for_value()

# Get a new message
new = await sub.wait_for_new()

# Get the next message received
next = await sub.wait_for_next()

Continuously listen to a data stream

Application:

  • Process a whole data stream
  • React to changes in sensor data
  • Execute on every tick of the Rate
# Continuously process the latest messages
async for msg in sub.listen():
    status = foo(msg)
    if status == DONE:
        break

# Continuously process all incoming messages
async for msg in sub.listen_reliable():
    status = foo(msg)
    if status == DONE:
        break

Scope and Lifetimes

Application:

  • Destroy subscribers automatically when leaving a block
  • Keep ownership visible
  • Group several afor objects under one lifetime
async with afor.Scope():
    sub1 = afor.ros2.Sub(String, "/chatter1")
    sub2 = afor.ros2.Sub(String, "/chatter2")

    # sub1 and sub2 are alive inside this scope/codeblock
    await sub1.wait_for_value()
    ...
# exiting the scope cleans up resources of sub1 and sub2
# here ROS 2 subscription are destroyed from the transport

See Lifetime and afor.Scope for details.

Improved Services / Queryable for ROS 2

[!NOTE] This is only for ROS 2.

Application:

  • Client request reply from a server.
  • Servers can delay their response without blocking (not possible in native ROS 2)
# Server is once again a afor subscriber, but generating responder objects
server = afor.Server(...)

# processes all requests.
# listen_reliable method is recommanded as it cannot skip requests
async for responder in server.listen_reliable():
    if responder.request == "PING!":
        reponder.response = "PONG!"
        await asyncio.sleep(...) # reply can be differed
        reponder.send()
    else:
        ... # reply is not necessary
# the client implements a async call method
client = afor.Client(...)

response = await client.call("PING!")

Process for the right amount of time

Application:

  • Test if the system is responding as expected
  • Run small tasks with small and local code
# Listen with a timeout
data = await afor.soft_wait_for(sub.wait_for_new(), timeout=1)
if isinstance(data, TimeoutError):
    pytest.fail(f"Failed to get new data in under 1 second")


# Process a codeblock with a timeout
async with afor.soft_timeout(1):
    sum = 0
    total = 0
    async for msg in sub.listen_reliable():
        number = process(msg)
        sum += number
        total += 1

last_second_average = sum/total
assert last_second_average == pytest.approx(expected_average)

Apply pre-processing to a data-stream

Application:

  • Parse payload of different transport into a common type.
# ROS2 String type afor subscriber
inner_sub: Sub[String] = afor.ros2.Sub(String, "topic_name")
# converted into a subscriber generating python `str`
ros2str_func = lambda msg: msg.data
sub: BaseSub[str] = afor.ConverterSub(sub=inner_sub, convert_func=ros2str_func)

About Speed

The obvious question is whether this adds latency compared to native ROS 2.

In this benchmark, the answer is: a little on ROS 2, very little on Zenoh.

  • On ROS 2 Jazzy with SingleThreadedExecutor and rmw_zenoh_cpp, trip duration increases from 70 μs to 140 μs when using afor, for an added overhead of about 70 μs.
  • On Zenoh, afor adds only about 7 μs over the native path. This suggests that most of the ROS 2 cost comes from cross-thread operations with the rclpy machinery.
  • Even with this added overhead, Zenoh + afor remains about an order of magnitude faster than ROS 2 + rclpy in this benchmark.
  • For many Python robotics applications, an extra few dozen microseconds is negligible relative to the benefits of a uniform asyncio interface.
  • This benchmark measures latency; it does not measure throughput. A 2x latency increase, does not imply a 2x throughput decrease.
Backend Interface Latency (μs)
No-backend afor 4
Zenoh native 3
Zenoh afor 10
ROS Single Thrd native 70
ROS Single Thrd afor 136
ROS Multi Thrd native 125
ROS Multi Thrd afor 217
ros_loop Method afor 280

Benchmark code is available at https://github.com/2lian/afor_benchmarks. The benchmark uses two pub/sub pairs that continuously echo messages on localhost, with a single participant and a local Zenoh router.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncio_for_robotics-1.4.0a3.tar.gz (43.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

asyncio_for_robotics-1.4.0a3-py3-none-any.whl (47.6 kB view details)

Uploaded Python 3

File details

Details for the file asyncio_for_robotics-1.4.0a3.tar.gz.

File metadata

File hashes

Hashes for asyncio_for_robotics-1.4.0a3.tar.gz
Algorithm Hash digest
SHA256 5d1dd7d9b7cc06f58510b2777285683be21064ab75655176a47d85fe11b1f0b2
MD5 9482f2accfdfe16e059feec3b8d9ea4b
BLAKE2b-256 256f5ef8597ece002a201395985ea3ad78a55551750bee3abbf521af9d137408

See more details on using hashes here.

File details

Details for the file asyncio_for_robotics-1.4.0a3-py3-none-any.whl.

File metadata

File hashes

Hashes for asyncio_for_robotics-1.4.0a3-py3-none-any.whl
Algorithm Hash digest
SHA256 52c311003ebf1b4a3c240481a2324636cb6545439c8a31044e71a2a7f0e6c790
MD5 71ae7afad30bbed8ea934c00ed4d7e24
BLAKE2b-256 9c374fb7dc73b17772993f7116c98c09e81aafa6b41cadd5bd83ff3314e90764

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page