Asyncio interface for ROS 2, Zenoh, and other robotic middlewares.
Asyncio For Robotics
The Asyncio For Robotics (afor) library makes asyncio usable with ROS 2, Zenoh and more, letting you write linear, testable, and non-blocking Python code.
- Better syntax.
- Only native Python: better docs and support.
- Simplifies testing.
Will this make my code slower? Likely not.
Will this make my code faster? No. However, asyncio will help YOU write
better, faster code.
[!TIP]
asyncio_for_robotics interfaces do not replace their primary interfaces! We add capabilities, giving you more choices, not fewer.
Install
Barebone
Compatible with ROS 2 (Humble, Jazzy, and newer) out of the box. This library is pure Python (>=3.10), so it installs easily.
pip install asyncio_for_robotics
Along with Zenoh
pip install asyncio_for_robotics eclipse-zenoh
Read more
- Detailed ROS 2 tutorial
- Lifetime with afor.Scope and Backend's Sessions
- Detailed examples
- Cross-Platform deployment even with ROS
- Usage for software testing
Available interfaces:
- Rate: Every tick of a clock. (native)
- TextIO: stdout lines of a Popen process (and other TextIO files). (native)
- ROS 2: Subscriber, Service Client, Service Server.
- Zenoh: Subscriber.
- Implement your own interface!
Additional Projects and Interfaces
- gogo_keyboard: Subscribe to keyboard key presses and releases.
- asyncio_gazebo: Subscribe to Gazebo transport.
[!TIP] An interface is not required for every operation. ROS 2 native publishers and nodes work just fine. Furthermore, advanced behavior can be composed from generic afor objects (see ROS 2 Event Callback Example).
Code sample
Syntax is identical between ROS 2, Zenoh, TextIO, Rate...
Wait for messages one by one
Application:
- Get the latest sensor data
- Get clock value
- Wait for trigger
- Wait for next tick of the Rate
- Wait for system to be operational
sub = afor.Sub(...)
# Get the latest message
latest = await sub.wait_for_value()
# Get a new message
new = await sub.wait_for_new()
# Get the next message received
next_msg = await sub.wait_for_next()  # avoids shadowing the built-in next()
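To make the difference between "latest" and "next" concrete, here is a minimal pure-asyncio sketch. It is not the afor implementation: `TinySub` and `push` are hypothetical names, and the semantics shown (return a stored message immediately versus always wait for a message arriving after the call) are one plausible reading of the calls above.

```python
import asyncio


class TinySub:
    """Hypothetical stand-in for a subscriber; not the afor implementation."""

    def __init__(self) -> None:
        self._has_value = False
        self._value = None
        self._waiters: list[asyncio.Future] = []

    def push(self, msg) -> None:
        """Called by the (simulated) transport when a message arrives."""
        self._has_value = True
        self._value = msg
        # Wake up every coroutine currently waiting for a fresh message.
        waiters, self._waiters = self._waiters, []
        for fut in waiters:
            if not fut.done():
                fut.set_result(msg)

    async def wait_for_next(self):
        """Wait for a message that arrives after this call."""
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        return await fut

    async def wait_for_value(self):
        """Return the stored message, or wait for the first one."""
        if self._has_value:
            return self._value
        return await self.wait_for_next()


async def demo():
    sub = TinySub()
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, sub.push, "first")
    first = await sub.wait_for_value()   # nothing stored yet, so this waits
    cached = await sub.wait_for_value()  # returns the stored message at once
    loop.call_later(0.01, sub.push, "second")
    fresh = await sub.wait_for_next()    # ignores the stored message
    return first, cached, fresh


print(asyncio.run(demo()))  # ('first', 'first', 'second')
```
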
Continuously listen to a data stream
Application:
- Process a whole data stream
- React to changes in sensor data
- Execute on every tick of the Rate
# Continuously process the latest messages
async for msg in sub.listen():
    status = foo(msg)
    if status == DONE:
        break
# Continuously process all incoming messages
async for msg in sub.listen_reliable():
    status = foo(msg)
    if status == DONE:
        break
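The sketch below illustrates, with plain asyncio only, why a "latest" stream can skip messages while a "reliable" stream cannot. It assumes the first keeps a single overwritable slot and the second keeps a queue; `listen_latest` and `listen_all` are hypothetical helpers, not afor functions.

```python
import asyncio


async def listen_latest(slot: dict, event: asyncio.Event):
    """Yield only the most recent message; older unread ones are overwritten."""
    while True:
        await event.wait()
        event.clear()
        yield slot["msg"]


async def listen_all(queue: asyncio.Queue):
    """Yield every message in arrival order; nothing is skipped."""
    while True:
        yield await queue.get()


async def demo():
    slot, event, queue = {}, asyncio.Event(), asyncio.Queue()

    # Simulate a burst of three messages arriving before the consumer runs.
    for msg in (1, 2, 3):
        slot["msg"] = msg
        event.set()
        queue.put_nowait(msg)

    latest_seen, all_seen = [], []
    async for msg in listen_latest(slot, event):
        latest_seen.append(msg)
        break  # the first item is already the last message of the burst
    async for msg in listen_all(queue):
        all_seen.append(msg)
        if queue.empty():
            break
    return latest_seen, all_seen


print(asyncio.run(demo()))  # ([3], [1, 2, 3])
```

A slow consumer on the slot-based stream always works on fresh data, at the cost of dropped messages. The queue-based stream trades freshness for completeness.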
Scope and Lifetimes
Application:
- Destroy subscribers automatically when leaving a block
- Keep ownership visible
- Group several afor objects under one lifetime
async with afor.Scope():
    sub1 = afor.ros2.Sub(String, "/chatter1")
    sub2 = afor.ros2.Sub(String, "/chatter2")
    # sub1 and sub2 are alive inside this scope/code block
    await sub1.wait_for_value()
    ...
# Exiting the scope cleans up the resources of sub1 and sub2.
# Here, the ROS 2 subscriptions are destroyed from the transport.
See Lifetime and afor.Scope for details.
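For readers unfamiliar with scope-based cleanup, the same idea can be sketched with the standard library's `contextlib.AsyncExitStack`. This is only an analogy: `FakeSub` is a hypothetical resource, and unlike the afor.Scope example above (where objects created in the block are tied to the scope without an explicit call), the cleanup is registered by hand here.

```python
import asyncio
from contextlib import AsyncExitStack


class FakeSub:
    """Hypothetical resource with an explicit close(); stands in for a subscriber."""

    def __init__(self, topic: str) -> None:
        self.topic = topic
        self.closed = False

    def close(self) -> None:
        self.closed = True


async def demo():
    async with AsyncExitStack() as scope:
        sub1 = FakeSub("/chatter1")
        sub2 = FakeSub("/chatter2")
        # Register cleanup; callbacks run in reverse order when the block exits.
        scope.callback(sub1.close)
        scope.callback(sub2.close)
        alive_inside = not (sub1.closed or sub2.closed)
    # Leaving the block has closed both resources.
    return alive_inside, sub1.closed, sub2.closed


print(asyncio.run(demo()))  # (True, True, True)
```
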
Improved Services / Queryable for ROS 2
[!NOTE] This is only for ROS 2.
Application:
- A client requests a reply from a server.
- Servers can delay their response without blocking (not possible in native ROS 2)
# The server is once again an afor subscriber, but it generates responder objects
server = afor.Server(...)
# Process all requests.
# The listen_reliable method is recommended because it cannot skip requests
async for responder in server.listen_reliable():
    if responder.request == "PING!":
        responder.response = "PONG!"
        await asyncio.sleep(...)  # the reply can be deferred
        responder.send()
    else:
        ...  # a reply is not necessary
# The client implements an async call method
client = afor.Client(...)
response = await client.call("PING!")
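The responder pattern can be sketched without any middleware: each request carries a future, and the server resolves it whenever it is ready. Everything below (`Responder`, `serve`, `call`) is a hypothetical pure-asyncio illustration of a deferred reply, not the afor or ROS 2 implementation.

```python
import asyncio


class Responder:
    """Hypothetical request/response pair; send() resolves the caller's future."""

    def __init__(self, request, reply_future: asyncio.Future) -> None:
        self.request = request
        self.response = None
        self._reply_future = reply_future

    def send(self) -> None:
        if not self._reply_future.done():
            self._reply_future.set_result(self.response)


async def serve(requests: asyncio.Queue) -> None:
    """Handle requests one by one; other tasks keep running during the sleep."""
    while True:
        responder = await requests.get()
        if responder.request == "PING!":
            responder.response = "PONG!"
            await asyncio.sleep(0.01)  # deferred reply; the event loop stays free
            responder.send()


async def call(requests: asyncio.Queue, request):
    """Client side: enqueue a request and await its reply."""
    reply = asyncio.get_running_loop().create_future()
    requests.put_nowait(Responder(request, reply))
    return await reply


async def demo():
    requests = asyncio.Queue()
    server_task = asyncio.create_task(serve(requests))
    try:
        return await asyncio.wait_for(call(requests, "PING!"), timeout=1)
    finally:
        server_task.cancel()


print(asyncio.run(demo()))  # PONG!
```
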
Process for the right amount of time
Application:
- Test if the system is responding as expected
- Run small tasks with small and local code
# Listen with a timeout
data = await afor.soft_wait_for(sub.wait_for_new(), timeout=1)
if isinstance(data, TimeoutError):
    pytest.fail("Failed to get new data in under 1 second")
# Process a code block with a timeout
async with afor.soft_timeout(1):
    running_sum = 0
    count = 0
    async for msg in sub.listen_reliable():
        number = process(msg)
        running_sum += number
        count += 1
# The block above stops after 1 second; average what was received
last_second_average = running_sum / count
assert last_second_average == pytest.approx(expected_average)
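A "soft" timeout returns the timeout as a value instead of raising it, which keeps test code linear. The helper below is a minimal sketch of that idea built on `asyncio.wait_for`; `soft_wait` is a hypothetical name, and this is not how afor.soft_wait_for is necessarily implemented.

```python
import asyncio


async def soft_wait(awaitable, timeout: float):
    """Return the awaited result, or a TimeoutError instance instead of raising."""
    try:
        return await asyncio.wait_for(awaitable, timeout)
    except asyncio.TimeoutError:
        # Return a built-in TimeoutError so isinstance checks work on Python 3.10 too.
        return TimeoutError(f"no result within {timeout} s")


async def demo():
    fast = await soft_wait(asyncio.sleep(0.01, result="data"), timeout=1)
    slow = await soft_wait(asyncio.sleep(1, result="data"), timeout=0.01)
    return fast, isinstance(slow, TimeoutError)


print(asyncio.run(demo()))  # ('data', True)
```
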
Apply pre-processing to a data-stream
Application:
- Parse payloads from different transports into a common type.
# ROS2 String type afor subscriber
inner_sub: Sub[String] = afor.ros2.Sub(String, "topic_name")
# converted into a subscriber generating python `str`
ros2str_func = lambda msg: msg.data
sub: BaseSub[str] = afor.ConverterSub(sub=inner_sub, convert_func=ros2str_func)
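Conceptually, a converter is a map over an asynchronous stream. The sketch below shows that idea with a plain async generator; `convert`, `fake_topic`, and `FakeString` are hypothetical stand-ins (the last one mimics a message with a single `data` field), not part of afor or ROS 2.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Callable, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


@dataclass
class FakeString:
    """Hypothetical stand-in for a message type with one `data` field."""

    data: str


async def convert(
    stream: AsyncIterator[In], func: Callable[[In], Out]
) -> AsyncIterator[Out]:
    """Apply func to each item of stream as it arrives."""
    async for item in stream:
        yield func(item)


async def fake_topic() -> AsyncIterator[FakeString]:
    """Simulated data stream producing two messages."""
    for text in ("hello", "world"):
        yield FakeString(text)


async def demo() -> list[str]:
    return [text async for text in convert(fake_topic(), lambda msg: msg.data)]


print(asyncio.run(demo()))  # ['hello', 'world']
```

Downstream code then only deals with `str`, regardless of which transport produced the payload.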
About Speed
The obvious question is whether this adds latency compared to native ROS 2.
In this benchmark, the answer is: a little on ROS 2, very little on Zenoh.
- On ROS 2 Jazzy with SingleThreadedExecutor and rmw_zenoh_cpp, trip duration increases from about 70 μs to about 140 μs when using afor, for an added overhead of about 70 μs.
- On Zenoh, afor adds only about 7 μs over the native path. This suggests that most of the ROS 2 cost comes from cross-thread operations with the rclpy machinery.
- Even with this added overhead, Zenoh + afor remains about an order of magnitude faster than ROS 2 + rclpy in this benchmark.
- For many Python robotics applications, an extra few dozen microseconds is negligible relative to the benefits of a uniform asyncio interface.
- This benchmark measures latency; it does not measure throughput. A 2x latency increase does not imply a 2x throughput decrease.
| Backend | Interface | Latency (μs) |
|---|---|---|
| No-backend | afor | 4 |
| Zenoh | native | 3 |
| Zenoh | afor | 10 |
| ROS Single Thrd | native | 70 |
| ROS Single Thrd | afor | 136 |
| ROS Multi Thrd | native | 125 |
| ROS Multi Thrd | afor | 217 |
| ros_loop Method | afor | 280 |
Benchmark code is available at https://github.com/2lian/afor_benchmarks. The benchmark uses two pub/sub pairs that continuously echo messages on localhost, with a single participant and a local Zenoh router.
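As a quick check of the figures quoted in this section, the snippet below recomputes the added overhead and the relative slowdown from the latencies in the table. It only does arithmetic on the published numbers and measures nothing itself.

```python
# Latencies in microseconds, copied from the table above: (native, afor).
latency_us = {
    "Zenoh": (3, 10),
    "ROS Single Thrd": (70, 136),
    "ROS Multi Thrd": (125, 217),
}

# Absolute overhead added by afor, and the afor/native latency ratio.
overhead_us = {name: afor - native for name, (native, afor) in latency_us.items()}
ratio = {name: round(afor / native, 2) for name, (native, afor) in latency_us.items()}

print(overhead_us)  # {'Zenoh': 7, 'ROS Single Thrd': 66, 'ROS Multi Thrd': 92}
print(ratio)        # {'Zenoh': 3.33, 'ROS Single Thrd': 1.94, 'ROS Multi Thrd': 1.74}
```

Note that Zenoh shows the largest relative increase but by far the smallest absolute one, which is why the absolute overhead is the more useful number here.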