
Python utility functions and classes for KiwiBot AI&Robotics team




Kiwi Booster

Python utils and classes for KiwiBot AI&Robotics team
Make a Pull Request · Report Bug · Request Feature




About The Project

This library contains Python utility functions and classes that are commonly used in the AI&Robotics team. It is divided into six main sections:

  • common_utils: Some common utils that are normally used in most of the projects.

    • kiwi_booster.loggers: This module contains GCP and local loggers with a predefined format.

    • kiwi_booster.mixed: This module contains miscellaneous utils that serve multiple purposes.

    • kiwi_booster.requests: This module contains utils for working with HTTP requests.

    • kiwi_booster.video: This module contains utils for working with videos. This includes the VideoWriter class, which is used to write videos in a predefined format. IMPORTANT: This class needs OpenCV (the cv2 module) to be installed in the environment.

  • gcp_utils: Utils that are related to the Google Cloud Platform.

    • kiwi_booster.gcp_utils.bigquery: This module contains utils for working with BigQuery.

    • kiwi_booster.gcp_utils.kfp: This module contains utils for working with Vertex (Kubeflow) Pipelines.

    • kiwi_booster.gcp_utils.secrets: This module contains utils for working with Google Cloud Secret Manager.

    • kiwi_booster.gcp_utils.storage: This module contains utils for working with Google Cloud Storage.

  • ml_utils: Utils that are related to Machine Learning.

    • kiwi_booster.ml_utils.benchmarks: This module contains utils for benchmarking machine learning models.

    • kiwi_booster.ml_utils.prediction: This module contains utils to handle the predictions of the semantic segmentation model.

  • decorators: Decorators that are used to improve the codebase.

  • slack_utils: Utils that are related to Slack.

  • mcap_utils: Utils for reading and decoding ROSBag messages in MCAP format without the need for a ROS environment. The MCAP ROSBag Decoder section gives more information.
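Because the VideoWriter class in kiwi_booster.video depends on OpenCV, it can help to check for the cv2 module before importing it. The following is a minimal sketch using only the standard library; the helper name module_available is hypothetical and is not part of kiwi_booster.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if a top-level module can be found without importing it."""
    # find_spec returns None when no module with this top-level name is installed.
    return importlib.util.find_spec(name) is not None


if not module_available("cv2"):
    # The message text is illustrative; adapt it to your project.
    print("OpenCV (cv2) was not found; install it before using kiwi_booster.video")
```

The check looks the module up without importing it, so it is cheap to run at startup.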

MCAP ROSBag Decoder

The mcap utils are tools for decoding MCAP ROSBags without needing a ROS environment. They support various message types and provide a custom decoder factory for handling ROS2IDL-encoded messages.

IDLDecoderFactory Class

The IDLDecoderFactory class extends the DecoderFactory class and is designed specifically for ROS2IDL-encoded messages. Its decoder_for method returns a decoder for a given message encoding and schema. Currently it returns a dummy decoder that does nothing, but it can be extended to use more complex decoders.
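The decoder factory pattern described above can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: the DecoderFactory and Schema classes are local stand-ins so the sketch runs without the mcap package, the class name PassthroughIDLDecoderFactory is hypothetical, and the dummy decoder is assumed to hand back the raw bytes unchanged.

```python
from typing import Any, Callable, Optional


class Schema:
    """Hypothetical minimal schema record with only the fields the sketch needs."""

    def __init__(self, name: str, encoding: str) -> None:
        self.name = name
        self.encoding = encoding


class DecoderFactory:
    """Local stand-in for the base class, so the sketch runs without mcap."""

    def decoder_for(
        self, message_encoding: str, schema: Optional[Schema]
    ) -> Optional[Callable[[bytes], Any]]:
        return None


class PassthroughIDLDecoderFactory(DecoderFactory):
    """Returns a dummy decoder for ROS2IDL schemas and declines everything else."""

    def decoder_for(
        self, message_encoding: str, schema: Optional[Schema]
    ) -> Optional[Callable[[bytes], Any]]:
        # Assumption: ROS2IDL schemas are tagged with the encoding "ros2idl".
        if schema is None or schema.encoding != "ros2idl":
            return None  # Let another factory in the list handle this channel.

        def dummy_decoder(data: bytes) -> bytes:
            # No real decoding happens: the raw payload is returned unchanged.
            return data

        return dummy_decoder


factory = PassthroughIDLDecoderFactory()
decoder = factory.decoder_for("cdr", Schema("my_pkg/msg/Custom", "ros2idl"))
print(decoder(b"\x00\x01"))
```

Returning None for schemas the factory does not handle lets other factories in the decoder_factories list take over, which is why the factory can be passed alongside the regular ROS 2 one.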

Supported Messages

The module supports decoding the following message types:

  • CompressedImage
  • Image
  • NavSatFix
  • TFMessage
  • Odometry

Each message type has a corresponding decode function that takes in the message and returns the decoded data along with some metadata.

CompressedImage and Image

The decode_compressed_image and decode_image functions decode sensor_msgs/msg/CompressedImage and sensor_msgs/msg/Image messages, respectively. They return a tuple containing the image as a NumPy array and a metadata dictionary. The metadata includes the image format, the image's timestamp as a datetime object, the image's frame id, and the image's step (only for decode_image).

NavSatFix

The decode_navsatfix function decodes sensor_msgs/msg/NavSatFix messages. It returns a dictionary of the message's fields, the measurement's accuracy, and some metadata. The metadata includes the message's timestamp as a datetime object, the message's frame id, the measurement's accuracy, and the altitude, latitude, longitude, position covariance, and position covariance type of the measurement.

TFMessage

The decode_tfmessage function decodes tf2_msgs/msg/TFMessage messages. It returns a list of dictionaries, one per transform. Each dictionary includes the child frame id, frame id, timestamp as a datetime object, translation as a 3D vector, and rotation as a quaternion.

Odometry

The decode_odometry function decodes nav_msgs/msg/Odometry messages. It returns a dictionary with the timestamp as a datetime object, frame id, child frame id, pose (including covariance, position as a 3D vector, and orientation as a quaternion), twist (including covariance, linear as a 3D vector, and angular as a 3D vector), and covariance of the message.
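All of the decode functions above report timestamps as datetime objects. ROS 2 header stamps carry two integer fields, sec and nanosec, so a conversion along the following lines is needed somewhere. This is only a sketch: the function name stamp_to_datetime is hypothetical, and the choice of UTC is an assumption rather than something the library documents.

```python
from datetime import datetime, timedelta, timezone


def stamp_to_datetime(sec: int, nanosec: int) -> datetime:
    """Convert a ROS 2 header stamp (sec, nanosec) to a timezone-aware datetime."""
    # Assumption: the stamp counts from the Unix epoch and is reported in UTC.
    base = datetime.fromtimestamp(sec, tz=timezone.utc)
    # datetime resolves microseconds only, so sub-microsecond digits are dropped.
    return base + timedelta(microseconds=nanosec // 1000)


print(stamp_to_datetime(1, 500_000_000).isoformat())
# prints 1970-01-01T00:00:01.500000+00:00
```

Adding the nanoseconds as a timedelta instead of summing floats avoids rounding errors for large epoch values.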

Reading Messages

To read a message, call the corresponding decode function with the message as the argument. For example, to decode a CompressedImage message, you would do:

image, metadata = decode_compressed_image(msg)

Here, msg is the CompressedImage message to decode. The function returns the image as a NumPy array and the metadata as a dictionary.

Here is an example of how to read an MCAP file:

from kiwi_booster.mcap_utils.decode import (
    IDLDecoderFactory,
    decode_compressed_image,
    decode_image,
    decode_image_marker,
    decode_navsatfix,
    decode_odometry,
)
from mcap.reader import make_reader
from mcap_ros2.decoder import DecoderFactory

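# topics_to_read (an iterable of topic names) and rosbag_path (the path to the
# .mcap file) are assumed to be defined by the caller.
# Initialize an empty list of decoded messages for each topic
topics = {topic: [] for topic in topics_to_read}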

# Read the rosbag file
with open(rosbag_path, "rb") as f:
    reader = make_reader(
        f, decoder_factories=[DecoderFactory(), IDLDecoderFactory()]
    )
    for schema, channel, _, decoded_msg in reader.iter_decoded_messages(
        log_time_order=True  # Yield the messages ordered by their log time
    ):
        if channel.topic in topics_to_read:
            if schema.name == "sensor_msgs/msg/CompressedImage":
                image, image_metadata = decode_compressed_image(decoded_msg)
                topics[channel.topic].append([image, image_metadata])
            elif schema.name == "sensor_msgs/msg/NavSatFix":
                latlon = decode_navsatfix(decoded_msg)
                topics[channel.topic].append(latlon)
            elif schema.name == "nav_msgs/msg/Odometry":
                odometry = decode_odometry(decoded_msg)
                topics[channel.topic].append(odometry)
            elif schema.name == "sensor_msgs/msg/Image":
                image, image_metadata = decode_image(decoded_msg)
                topics[channel.topic].append([image, image_metadata])
            elif schema.name == "visualization_msgs/msg/ImageMarker":
                marker_array = decode_image_marker(decoded_msg)
                topics[channel.topic].append(marker_array)
            else:
                raise ValueError(f"Unknown schema {schema.name}")
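As a design alternative, the if/elif chain in the example above could be replaced with a lookup table that maps each schema name to its decode function. The sketch below uses stand-in decoders and plain tuples instead of real MCAP messages so that it runs on its own; collect_messages and stub_decoders are hypothetical names, and in practice the table values would be the kiwi_booster decode functions.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Tuple


def collect_messages(
    messages: Iterable[Tuple[str, str, Any]],
    decoders: Dict[str, Callable[[Any], Any]],
) -> Dict[str, List[Any]]:
    """Group decoded messages by topic using a schema-name to decoder lookup."""
    topics: Dict[str, List[Any]] = defaultdict(list)
    for topic, schema_name, raw_msg in messages:
        decoder = decoders.get(schema_name)
        if decoder is None:
            # Same behavior as the else branch of the if/elif chain.
            raise ValueError(f"Unknown schema {schema_name}")
        topics[topic].append(decoder(raw_msg))
    return dict(topics)


# Hypothetical stand-in decoders; replace them with the real decode functions.
stub_decoders = {
    "sensor_msgs/msg/NavSatFix": lambda msg: {"kind": "navsatfix", "raw": msg},
    "nav_msgs/msg/Odometry": lambda msg: {"kind": "odometry", "raw": msg},
}

# Each tuple is (topic, schema name, message), mimicking the reader's output.
messages = [
    ("/gps", "sensor_msgs/msg/NavSatFix", b"a"),
    ("/odom", "nav_msgs/msg/Odometry", b"b"),
    ("/gps", "sensor_msgs/msg/NavSatFix", b"c"),
]
result = collect_messages(messages, stub_decoders)
print({topic: len(items) for topic, items in result.items()})
# prints {'/gps': 2, '/odom': 1}
```

With a table, supporting a new message type means adding one entry instead of another branch.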



Getting started

Installation

To install the package, simply run the following command:

pip install kiwi-booster

Usage

To use the package, we recommend importing each function or class you need explicitly from its module, which improves readability. For example, if you want to use the SlackBot class, you can import it as follows:

from kiwi_booster.slack_utils import SlackBot

slack_bot = SlackBot(
    SLACK_TOKEN,
    SLACK_CHANNEL_ID,
    SLACK_BOT_IMAGE_URL,
    image_alt_text="Bot description",
)

Or any decorator as follows:

from kiwi_booster.decorators import try_catch_log

@try_catch_log
def my_function():
    # Do something
    pass

We also recommend grouping these imports in a separate section from the rest of the imports.
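For readers unfamiliar with the pattern, a decorator of this kind typically wraps the function call in a try/except block and logs any exception. The sketch below is a generic illustration and not the implementation of try_catch_log: the name try_catch_log_sketch is hypothetical, and returning None after a failure is an assumption.

```python
import functools
import logging

logger = logging.getLogger("try_catch_sketch")


def try_catch_log_sketch(func):
    """Run func and log any exception instead of letting it propagate."""

    @functools.wraps(func)  # Keep the wrapped function's name and docstring.
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception records the message together with the traceback.
            logger.exception("Error in %s", func.__name__)
            return None  # Assumption: failures are signalled by returning None.

    return wrapper


@try_catch_log_sketch
def divide(a, b):
    return a / b


print(divide(4, 2))  # prints 2.0
print(divide(1, 0))  # logs the ZeroDivisionError, then prints None
```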



Contributing

If you'd like to contribute to Kiwi Booster, please feel free to submit a pull request! We're always looking for ways to improve our codebase and make it more useful to a wider range of use cases. You can also request a new feature by submitting an issue.

License

Kiwi Booster is licensed under the GNU license. See the LICENSE file for more information.

Contact

Sebastian Hernández Reyes - Machine Learning Engineer - Mail contact

Carlos Alvarez - Machine Learning Engineer Lead - Mail contact

