Asyncio interface for ROS 2, Zenoh, and other robotic middlewares.

Asyncio For Robotics

The Asyncio For Robotics (afor) library makes asyncio usable with ROS 2, Zenoh and more, letting you write linear, testable, and non-blocking Python code.

  • Better syntax.
  • Only native Python: better docs and support.
  • Simplifies testing.

Will this make my code slower? Likely not.

Will this make my code faster? No. However, asyncio will help YOU write better, faster code.

[!TIP] asyncio_for_robotics interfaces do not replace their primary interfaces! We add capabilities, giving you more choices, not less.

Install

Barebone

Compatible with ROS 2 (Humble, Jazzy, and newer) out of the box. This library is pure Python (>=3.10), so it installs easily.

pip install asyncio_for_robotics

Along with Zenoh

pip install asyncio_for_robotics eclipse-zenoh

Read more

Available interfaces:

  • Rate: Every tick of a clock. (native)
  • TextIO: stdout lines of a Popen process (and other TextIO files). (native)
  • ROS 2: Subscriber, Service Client, Service Server.
  • Zenoh: Subscriber.
  • Implement your own interface!

Additional Projects and Interfaces

[!TIP] An interface is not required for every operation. ROS 2 native publishers and nodes work just fine. Furthermore, advanced behavior can be composed of generic afor objects (see ROS2 Event Callback Example).

Code sample

Syntax is identical between ROS 2, Zenoh, TextIO, Rate...

Wait for messages one by one

Application:

  • Get the latest sensor data
  • Get clock value
  • Wait for trigger
  • Wait for next tick of the Rate
  • Wait for system to be operational
sub = afor.Sub(...)

# Get the latest message
latest = await sub.wait_for_value()

# Get a new message
new = await sub.wait_for_new()

# Get the next message received
# (named next_msg to avoid shadowing the built-in next())
next_msg = await sub.wait_for_next()

Continuously listen to a data stream

Application:

  • Process a whole data stream
  • React to changes in sensor data
  • Execute on every tick of the Rate
# Continuously process the latest messages
async for msg in sub.listen():
    status = foo(msg)
    if status == DONE:
        break

# Continuously process all incoming messages
async for msg in sub.listen_reliable():
    status = foo(msg)
    if status == DONE:
        break
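
The two loops above differ in what happens when messages arrive faster than they are consumed. The toy model below is a minimal sketch of that difference using only the standard library. It assumes, based on the comments above, that listen() may skip to the newest message while listen_reliable() delivers every message. ToyStream and its methods are hypothetical names for illustration and are not afor's implementation.

```python
import asyncio


class ToyStream:
    """Hypothetical stand-in for a subscriber; not part of afor."""

    def __init__(self):
        self._latest = None              # single slot, overwritten on every publish
        self._fresh = asyncio.Event()    # set when the slot holds an unread value
        self._queue = asyncio.Queue()    # unbounded queue, keeps every message

    def publish(self, value):
        self._latest = value
        self._fresh.set()
        self._queue.put_nowait(value)

    async def listen_latest(self):
        """Yield only the newest value; older unread values are dropped."""
        while True:
            await self._fresh.wait()
            self._fresh.clear()
            yield self._latest

    async def listen_all(self):
        """Yield every published value, in order."""
        while True:
            yield await self._queue.get()


async def demo():
    stream = ToyStream()
    for value in range(5):               # burst of 5 messages before anyone reads
        stream.publish(value)

    latest_seen = []
    async for msg in stream.listen_latest():
        latest_seen.append(msg)
        break                            # one read: only the newest value is left

    all_seen = []
    async for msg in stream.listen_all():
        all_seen.append(msg)
        if len(all_seen) == 5:
            break
    return latest_seen, all_seen


latest_seen, all_seen = asyncio.run(demo())
print(latest_seen)  # [4]
print(all_seen)     # [0, 1, 2, 3, 4]
```

A latest-value loop suits control code that only cares about the current state. A queued loop suits logging or request handling, where dropping a message is not acceptable.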

Scope and Lifetimes

Application:

  • Destroy subscribers automatically when leaving a block
  • Keep ownership visible
  • Group several afor objects under one lifetime
async with afor.Scope():
    sub1 = afor.ros2.Sub(String, "/chatter1")
    sub2 = afor.ros2.Sub(String, "/chatter2")

    # sub1 and sub2 are alive inside this scope/codeblock
    await sub1.wait_for_value()
    ...
# exiting the scope cleans up resources of sub1 and sub2
# here the ROS 2 subscriptions are destroyed from the transport

See Lifetime and afor.Scope for details.
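
For readers unfamiliar with scoped lifetimes, the same idea can be sketched with the standard library's contextlib.AsyncExitStack. This is an analogy under stated assumptions, not how afor.Scope is implemented: toy_subscription is a hypothetical resource, and it is registered with the stack explicitly, whereas the afor example above shows no explicit registration.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []  # records the order in which things happen


@asynccontextmanager
async def toy_subscription(topic):
    """Hypothetical resource that logs when it is opened and closed."""
    events.append(f"open {topic}")
    try:
        yield topic
    finally:
        events.append(f"close {topic}")


async def main():
    async with AsyncExitStack() as scope:
        # Both resources are tied to the lifetime of the `scope` block.
        await scope.enter_async_context(toy_subscription("/chatter1"))
        await scope.enter_async_context(toy_subscription("/chatter2"))
        events.append("work")
    # Leaving the block closed both resources, newest first.


asyncio.run(main())
print(events)
# ['open /chatter1', 'open /chatter2', 'work', 'close /chatter2', 'close /chatter1']
```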

Improved Services / Queryable for ROS 2

[!NOTE] This is only for ROS 2.

Application:

  • A client requests a reply from a server.
  • Servers can delay their response without blocking (not possible in native ROS 2).

# The server is once again an afor subscriber, but it generates responder objects
server = afor.Server(...)

# Process all requests.
# listen_reliable is recommended because it cannot skip requests
async for responder in server.listen_reliable():
    if responder.request == "PING!":
        responder.response = "PONG!"
        await asyncio.sleep(...)  # the reply can be deferred
        responder.send()
    else:
        ...  # a reply is not necessary

# The client implements an async call method
client = afor.Client(...)

response = await client.call("PING!")
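
The deferred-reply pattern above can be pictured with plain asyncio primitives. The sketch below is a toy model only: ToyResponder, toy_server and toy_call are hypothetical names, and the request travels through an in-process asyncio.Queue rather than ROS 2. It shows the key point that the server awaits before replying and the event loop stays free in the meantime.

```python
import asyncio


class ToyResponder:
    """Hypothetical request/response pair; not afor's responder object."""

    def __init__(self, request, reply_future):
        self.request = request
        self.response = None
        self._reply_future = reply_future

    def send(self):
        # Completing the future wakes up the waiting client.
        self._reply_future.set_result(self.response)


async def toy_server(requests):
    while True:
        responder = await requests.get()
        if responder.request == "PING!":
            responder.response = "PONG!"
            await asyncio.sleep(0.01)    # reply is deferred, the loop is not blocked
            responder.send()


async def toy_call(requests, request):
    reply_future = asyncio.get_running_loop().create_future()
    requests.put_nowait(ToyResponder(request, reply_future))
    return await reply_future


async def main():
    requests = asyncio.Queue()
    server_task = asyncio.create_task(toy_server(requests))
    try:
        return await toy_call(requests, "PING!")
    finally:
        server_task.cancel()


result = asyncio.run(main())
print(result)  # PONG!
```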

Process for the right amount of time

Application:

  • Test if the system is responding as expected
  • Run small tasks with small and local code
# Listen with a timeout
data = await afor.soft_wait_for(sub.wait_for_new(), timeout=1)
if isinstance(data, TimeoutError):
    pytest.fail("Failed to get new data in under 1 second")


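# Process a codeblock with a timeout
async with afor.soft_timeout(1):
    running_sum = 0  # avoids shadowing the built-in sum()
    count = 0
    async for msg in sub.listen_reliable():
        number = process(msg)
        running_sum += number
        count += 1

# Guard against dividing by zero when nothing arrived in time
assert count > 0, "No message received within 1 second"
last_second_average = running_sum / count
assert last_second_average == pytest.approx(expected_average)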
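
The first snippet above checks the result with isinstance(data, TimeoutError), which indicates that the timeout is returned as a value rather than raised. A standard-library approximation of that behavior is sketched below. The helper name soft_wait is hypothetical and this is not afor's implementation. It catches asyncio.TimeoutError, which on Python 3.10 is distinct from the built-in TimeoutError and from 3.11 onward is an alias of it.

```python
import asyncio


async def soft_wait(awaitable, timeout):
    """Hypothetical helper: return the timeout exception instead of raising it."""
    try:
        return await asyncio.wait_for(awaitable, timeout)
    except asyncio.TimeoutError as exc:
        return exc


async def main():
    # Finishes well within the timeout: the result is returned as-is.
    fast = await soft_wait(asyncio.sleep(0.01, result="data"), timeout=1)
    # Takes longer than the timeout: the exception object is returned.
    slow = await soft_wait(asyncio.sleep(1, result="data"), timeout=0.01)
    return fast, slow


fast, slow = asyncio.run(main())
print(fast)                                    # data
print(isinstance(slow, asyncio.TimeoutError))  # True
```

Returning the exception keeps the happy path and the timeout path in the same linear flow, which is convenient in tests.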

Apply pre-processing to a data-stream

Application:

  • Parse payload of different transport into a common type.
# ROS2 String type afor subscriber
inner_sub: Sub[String] = afor.ros2.Sub(String, "topic_name")
# converted into a subscriber generating python `str`
ros2str_func = lambda msg: msg.data
sub: BaseSub[str] = afor.ConverterSub(sub=inner_sub, convert_func=ros2str_func)
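
Conceptually, a converting subscriber applies a function to every message before handing it over. The sketch below shows that idea on a plain async generator. convert_stream, ToyString and toy_source are hypothetical names for illustration, and ToyString only mimics the data field of a ROS 2 String message.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyString:
    """Hypothetical stand-in for a message type with a `data` field."""
    data: str


async def toy_source():
    """Hypothetical message stream yielding two messages."""
    for text in ("hello", "world"):
        yield ToyString(text)


async def convert_stream(source, convert_func):
    """Apply convert_func to each item of an async stream."""
    async for item in source:
        yield convert_func(item)


async def main():
    stream = convert_stream(toy_source(), lambda msg: msg.data)
    return [text async for text in stream]


converted = asyncio.run(main())
print(converted)  # ['hello', 'world']
```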

About Speed

The obvious question is whether this adds latency compared to native ROS 2.

In this benchmark, the answer is: a little on ROS 2, very little on Zenoh.

  • On ROS 2 Jazzy with SingleThreadedExecutor and rmw_zenoh_cpp, trip duration increases from 70 μs to 140 μs when using afor, for an added overhead of about 70 μs.
  • On Zenoh, afor adds only about 7 μs over the native path. This suggests that most of the ROS 2 cost comes from cross-thread operations with the rclpy machinery.
  • Even with this added overhead, Zenoh + afor remains about an order of magnitude faster than ROS 2 + rclpy in this benchmark.
  • For many Python robotics applications, an extra few dozen microseconds is negligible relative to the benefits of a uniform asyncio interface.
  • This benchmark measures latency; it does not measure throughput. A 2x latency increase does not imply a 2x throughput decrease.

Backend           Interface   Latency (μs)
No-backend        afor        4
Zenoh             native      3
Zenoh             afor        10
ROS Single Thrd   native      70
ROS Single Thrd   afor        136
ROS Multi Thrd    native      125
ROS Multi Thrd    afor        217
ros_loop Method   afor        280

Benchmark code is available at https://github.com/2lian/afor_benchmarks. The benchmark uses two pub/sub pairs that continuously echo messages on localhost, with a single participant and a local Zenoh router.
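
The overhead figures quoted in the bullets can be recomputed from the table. The short script below does only that arithmetic. The numbers are copied from the table, and the dictionary layout and backend labels are illustrative choices.

```python
# Latency in microseconds, copied from the benchmark table.
latency_us = {
    ("Zenoh", "native"): 3,
    ("Zenoh", "afor"): 10,
    ("ROS single-threaded", "native"): 70,
    ("ROS single-threaded", "afor"): 136,
    ("ROS multi-threaded", "native"): 125,
    ("ROS multi-threaded", "afor"): 217,
}

# Added latency of afor over the native interface, per backend.
overhead_us = {
    backend: latency_us[(backend, "afor")] - latency_us[(backend, "native")]
    for backend, interface in latency_us
    if interface == "native"
}
print(overhead_us)
# {'Zenoh': 7, 'ROS single-threaded': 66, 'ROS multi-threaded': 92}

# How much faster Zenoh + afor is than native single-threaded ROS 2.
speedup = latency_us[("ROS single-threaded", "native")] / latency_us[("Zenoh", "afor")]
print(speedup)  # 7.0
```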
