
Suite of tools for sensor transposition and camera intrinsic/extrinsic calibration

Project description

sensor_transposition

A Python toolkit for multi-sensor calibration, data parsing, and coordinate-frame management in autonomous-vehicle and robotics pipelines.


Features

  • Sensor collection – YAML-driven multi-sensor configuration storing extrinsics (translation + quaternion), temporal extrinsic calibration (time offset per sensor), and per-sensor intrinsics/parameters for cameras, LiDARs, radars, GPS, and IMUs.
  • Pinhole camera model – focal-length derivation from FOV or physical sensor geometry, camera matrix construction, point projection / unprojection, and Brown–Conrady lens distortion/undistortion.
  • Fisheye / omnidirectional camera model – Kannala-Brandt equidistant projection supporting fields of view up to 360°, with focal-length derivation, point projection / unprojection, and Kannala-Brandt distortion/undistortion (compatible with OpenCV's cv2.fisheye module).
  • Homogeneous transforms – composable 4×4 Transform objects with helpers for building from quaternions or rotation matrices, inverting, and applying to point clouds.
  • LiDAR–camera fusion – project 3-D LiDAR point clouds onto a camera image plane and colour the cloud by sampling pixel values.
  • LiDAR parsers – binary readers for Velodyne (KITTI .bin), Ouster (4-column and 8-column .bin), and Livox (LVX / LVX2) file formats.
  • LiDAR scan matching – point-to-point ICP (Iterative Closest Point) with the Kabsch SVD algorithm and a KD-tree nearest-neighbour search; point-to-plane ICP with per-point surface normal estimation via PCA for faster convergence on structured scenes (walls, floors, roads); supports a maximum correspondence-distance filter, an optional initial transform, and a configurable convergence tolerance.
  • KISS-ICP odometry – minimal-parameter LiDAR odometry with adaptive correspondence-distance threshold, voxel-hashed local map, and point-to-point ICP; pure NumPy/SciPy — no additional dependencies required. An optional kiss_icp extra pulls in the upstream reference implementation.
  • Ground plane segmentation – three complementary methods for separating ground points from obstacles: height-threshold (fast, flat terrain), RANSAC plane fitting (robust to outliers and unknown sensor height), and normal-based filtering (handles uneven/hilly terrain via per-point PCA normals); all pure NumPy/SciPy.
  • LiDAR motion distortion correction – IMU-based scan deskewing that corrects per-point timestamps across a spinning LiDAR sweep.
  • GPS / GNSS – NMEA 0183 parser supporting GGA and RMC sentence types, plus a coordinate-frame converter for ECEF ↔ ENU and geodetic ↔ UTM conversions.
  • GPS fusion – GpsFuser converts GPS fixes to local ENU and integrates them into an ImuEkf state or a FramePoseSequence; hdop_to_noise converts HDOP to a 3×3 position noise covariance; GNSS outage detection via max_fix_age_sec and an on_outage callback automatically skips stale fixes and falls back to IMU/wheel-odometry dead-reckoning.
  • RTK GPS – RTCM 3.x correction-stream parser supporting MT1005 (base-station ECEF position) and MSM4/MSM7 pseudorange/carrier-phase messages for GPS, GLONASS, Galileo, and BeiDou; pure Python, no additional dependencies. See docs/rtk_gps_setup.md for NTRIP client setup and centimetre-level sigma configuration.
  • IMU – binary parser for 32-byte (accel + gyro) and 48-byte (accel + gyro + quaternion) records.
  • IMU Error-State EKF – 15-state error-state extended Kalman filter fusing IMU, GPS, and pose observations.
  • IMU pre-integration – accumulates raw IMU measurements between keyframes into compact (ΔR, Δv, Δp) increments for tight IMU–LiDAR coupling.
  • Radar – binary parser for 5-field (range, azimuth, elevation, velocity, SNR) detection records with spherical → Cartesian conversion.
  • Radar odometry – Doppler-based ego-velocity estimation and scan-to-scan ICP radar odometry.
  • Visual odometry – essential-matrix estimation (normalised 8-point + RANSAC), pose recovery, and Perspective-n-Point (PnP) solver.
  • Feature detection & matching – Harris corner detector, normalised intensity-patch descriptors, and brute-force L2 matching with Lowe's ratio test; all pure NumPy/SciPy, enabling a complete visual odometry pipeline without OpenCV. An optional opencv extra is available for users who need production-quality ORB/SIFT features.
  • Stereo camera – stereo rectification (Bouguet algorithm), dense block-matching disparity (pure NumPy SAD), and stereo triangulation for metric 3-D depth recovery; recovers absolute scale unavailable in monocular VO.
  • Wheel odometry – differential-drive and Ackermann (bicycle-model) dead-reckoning with midpoint integration.
  • Loop closure – Scan Context and M2DP LiDAR place-recognition descriptors with ScanContextDatabase for efficient loop-closure candidate retrieval; plus compute_image_descriptor (HOG-like) and ImageLoopClosureDatabase for camera-based visual loop closure — all pure NumPy/SciPy.
  • Pose graph – pose graph data structure and Gauss-Newton optimisation back-end for graph-SLAM; supports standard 6-DOF odometry and loop-closure edges plus ImuFactor edges that wrap IMU pre-integration results for tightly-coupled IMU–LiDAR optimisation.
  • Sliding-window smoother – fixed-lag online SLAM smoother that bounds per-step cost to O(window_size³).
  • Submap manager – keyframe selection and submap division for large-scale long-duration SLAM.
  • Occupancy grid – 2-D probabilistic occupancy grid with log-odds ray-casting; exports to ROS nav_msgs/OccupancyGrid int8 format. SparseOccupancyGrid stores only observed cells in a hash map, reducing memory by up to 14× for large outdoor environments.
  • Voxel map (TSDF) – Truncated Signed-Distance Function volumetric map for dense 3-D reconstruction. SparseTSDFVolume stores only observed voxels, reducing memory by up to 64× for large outdoor volumes.
  • Point-cloud map – accumulated coloured point-cloud map with voxel-grid downsampling, PCD / PLY I/O, and dynamic-object filtering: filter_dynamic_points (Doppler velocity threshold) and consistency_filter (cross-scan support check) remove moving-object "ghost" trails from the map.
  • Visualisation – BEV rendering, trajectory overlay, LiDAR-on-image overlay, Open3D and RViz export helpers.
  • Bag recorder / player – lightweight multi-topic binary bag format (.sbag) with streaming write and indexed playback; no external dependencies. sbag_to_rosbag() converts to MCAP for use with ros2 bag (requires pip install ".[mcap]").
  • SLAM session – SLAMSession orchestration class that wires ICP odometry, Scan Context loop closure, pose-graph optimisation, and point-cloud map accumulation into a single object with sensible defaults and per-topic callbacks. Supports scan-to-local-submap odometry via use_local_map=True to reduce frame-to-frame drift. A LocalMap helper class maintains a downsampled sliding window of the last N keyframe scans. Pass imu_topic to automatically accumulate IMU measurements between keyframes, pre-integrate them, and add an ImuFactor edge to the pose graph for tightly-coupled IMU–LiDAR optimisation.
  • Camera–LiDAR extrinsic calibration – target-based extrinsic calibration using plane correspondences (fit_plane, ransac_plane, calibrate_lidar_camera).
  • ROS examples – ready-to-use launch / parameter files for Velodyne and Ouster LiDARs in both ROS 1 and ROS 2.

Installation

pip install .

Development dependencies (pytest):

pip install ".[dev]"

Requires Python ≥ 3.9 and numpy, pyyaml, and scipy.

Optional extras

Install optional extras to unlock additional integrations:

Extra Package installed Enables
open3d open3d>=0.17 export_point_cloud_open3d() and Open3D visualisation
rerun rerun-sdk>=0.14 rerun.io visualisation logging
mcap mcap>=1.0 sbag_to_rosbag() MCAP conversion
kiss_icp kiss-icp>=1.0 Upstream reference KISS-ICP implementation
opencv opencv-python>=4.0 Production-quality ORB/SIFT features via cv2

pip install ".[open3d]"    # Open3D support
pip install ".[rerun]"     # rerun.io support
pip install ".[mcap]"      # MCAP / ROS 2 bag conversion
pip install ".[kiss_icp]"  # Upstream KISS-ICP reference implementation
pip install ".[opencv]"    # OpenCV for ORB/SIFT feature detection

Coordinate Systems

All sensors declare their native coordinate frame convention. The ego (vehicle) frame is FLU – x forward, y left, z up.

Convention Axes Typical use
FLU Forward, Left, Up Ego frame; ROS; Velodyne / Ouster
RDF Right, Down, Forward Camera optical frame
FRD Forward, Right, Down NED-like robotics frames
ENU East, North, Up GPS / local tangent plane
NED North, East, Down Aviation / navigation
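For example, re-expressing an ego-frame (FLU) point in a camera's RDF optical frame is a fixed axis permutation. A sketch with plain NumPy (in practice transform_between handles this, together with the translation):

```python
import numpy as np

# Rotation that re-expresses an FLU (ego) point in the RDF (camera optical)
# convention: right = -left, down = -up, forward = forward.
R_FLU_TO_RDF = np.array([
    [0.0, -1.0,  0.0],   # camera x (right)   = -ego y (left)
    [0.0,  0.0, -1.0],   # camera y (down)    = -ego z (up)
    [1.0,  0.0,  0.0],   # camera z (forward) =  ego x (forward)
])

p_flu = np.array([5.0, 1.0, 2.0])   # 5 m ahead, 1 m left, 2 m up
p_rdf = R_FLU_TO_RDF @ p_flu        # → [-1., -2., 5.]
```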

Modules

SensorCollection

Loads a YAML file describing a rig of sensors and provides transform queries between any two sensors.

from sensor_transposition import SensorCollection

col = SensorCollection.from_yaml("examples/sensor_collection.yaml")

# 4×4 transform: front_lidar → front_camera
T = col.transform_between("front_lidar", "front_camera")

# Modify and save
col.to_yaml("my_rig.yaml")

YAML sensor types: camera, lidar, radar, gps, imu.
Each entry specifies coordinate_system, extrinsics (translation + quaternion + optional time_offset_sec), and optional type-specific parameter blocks.
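A minimal camera entry might look like the following (the top-level layout and exact key names here are illustrative; see examples/sensor_collection.yaml for the authoritative schema):

```yaml
sensors:
  front_camera:
    type: camera
    coordinate_system: RDF
    extrinsics:
      translation: [1.84, 0.0, 1.91]
      quaternion: [1.0, 0.0, 0.0, 0.0]   # [w, x, y, z]
      time_offset_sec: 0.033
    intrinsics:
      fx: 1266.4
      fy: 1266.4
      cx: 816.0
      cy: 612.0
      width: 1632
      height: 1224
```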

from_yaml() validation – when loading from YAML, SensorCollection.validate() is automatically called to catch common mistakes (e.g. a camera sensor missing its intrinsics block, or non-positive fx/fy/width/height values) before they cause cryptic errors downstream:

# Raises ValueError immediately with a clear message:
# "Camera sensor 'front_camera' is missing a required 'intrinsics' block."
col = SensorCollection.from_yaml("bad_config.yaml")

# You can also call validate() explicitly on a programmatically-built collection:
col.validate()

Temporal extrinsic calibration – each sensor can declare a time_offset_sec in its extrinsics block that captures the delay between the sensor's hardware clock and the reference/ego clock. Use SensorCollection.time_offset_between(source, target) to compute the time difference between any two sensors:

# How many seconds ahead is the camera clock relative to the LiDAR clock?
dt = col.time_offset_between("front_lidar", "front_camera")   # e.g. +0.033

# Synchronise a LiDAR timestamp to the camera's timebase
t_camera_equiv = t_lidar + dt

Sensor Synchroniser

Resample multiple sensor streams onto a common reference timeline.

from sensor_transposition.sync import (
    SensorSynchroniser,
    SensorSynchronizer,   # American-spelling alias
)
import numpy as np

sync = SensorSynchroniser()
sync.add_stream("lidar", lidar_times, lidar_data,
                time_offset_sec=col.get_sensor("front_lidar").time_offset_sec)
sync.add_stream("imu",   imu_times,   imu_accel,
                time_offset_sec=col.get_sensor("imu").time_offset_sec)

# Check that the streams actually overlap before resampling:
overlap = sync.temporal_overlap()
if overlap is None:
    raise RuntimeError("Streams do not overlap — check time offsets.")
t_start, t_end = overlap

# Resample all streams at LiDAR reference times (synchronise / synchronize both work):
aligned = sync.synchronise(lidar_times)
imu_at_lidar_times = aligned["imu"]

Method Description
add_stream(name, times, data) Register a named sensor stream
temporal_overlap() Return (start, end) of the common overlap, or None
stream_start_time(name) / stream_end_time(name) Per-stream time bounds
synchronise(ref_times) / synchronize(ref_times) Resample all streams
interpolate(name, query_times) Resample a single stream

Camera Intrinsics

Pinhole model utilities.

from sensor_transposition.camera_intrinsics import (
    focal_length_from_fov,
    focal_length_from_sensor,
    fov_from_focal_length,
    camera_matrix,
    project_point,
    unproject_pixel,
    distort_point,
    undistort_point,
)

# Focal length from horizontal FOV
fx = focal_length_from_fov(image_size=1920, fov_deg=90.0)

# Focal length from physical sensor geometry
fx = focal_length_from_sensor(image_size_px=1920, sensor_size_mm=6.4, focal_length_mm=4.0)

# Build 3×3 K matrix
K = camera_matrix(fx=1266.4, fy=1266.4, cx=816.0, cy=612.0)

# Project a 3-D point to pixel coordinates
u, v = project_point(K, [1.0, 0.5, 5.0])

# Unproject a pixel at a known depth back to 3-D
xyz = unproject_pixel(K, (u, v), depth=5.0)

# Apply / remove Brown–Conrady distortion
coeffs = (-0.05, 0.08, 0.0, 0.0, -0.03)  # (k1, k2, p1, p2, k3)
distorted = distort_point([x_n, y_n], coeffs)
undistorted = undistort_point(distorted, coeffs)

Fisheye / Omnidirectional Camera

Kannala-Brandt equidistant projection for wide-FOV and omnidirectional cameras. Supports fields of view up to 360° and is compatible with coefficients from OpenCV's cv2.fisheye.calibrate.

from sensor_transposition.camera_intrinsics import (
    fisheye_focal_length_from_fov,
    fisheye_project_point,
    fisheye_unproject_pixel,
    fisheye_distort_point,
    fisheye_undistort_point,
    camera_matrix,
)
import numpy as np

# Focal length for a 190° horizontal FOV on a 1920-pixel-wide sensor
fx = fisheye_focal_length_from_fov(image_size=1920, fov_deg=190.0)
fy = fisheye_focal_length_from_fov(image_size=1080, fov_deg=130.0)

K = camera_matrix(fx=fx, fy=fy, cx=960.0, cy=540.0)

# Distortion coefficients (k1, k2, k3, k4) from cv2.fisheye.calibrate
dist_coeffs = (0.05, -0.02, 0.003, -0.001)

# Project a 3-D point (including points with large off-axis angles)
point_3d = np.array([0.8, 0.4, 3.0])
u, v = fisheye_project_point(K, point_3d, dist_coeffs=dist_coeffs)

# Unproject: depth is the Euclidean distance ‖[X, Y, Z]‖, not Z
depth = float(np.linalg.norm(point_3d))
recovered = fisheye_unproject_pixel(K, (u, v), depth=depth,
                                    dist_coeffs=dist_coeffs)

# Apply / remove Kannala-Brandt distortion on normalised coordinates
x_n, y_n = point_3d[0] / point_3d[2], point_3d[1] / point_3d[2]
distorted   = fisheye_distort_point([x_n, y_n], dist_coeffs)
undistorted = fisheye_undistort_point(distorted, dist_coeffs)

See docs/fisheye_camera_intrinsics_guide.md for a full guide including how to calibrate with OpenCV and how to handle omnidirectional cameras with FOV > 180°.


Transform

Composable 4×4 homogeneous transformation matrices.

from sensor_transposition.transform import Transform, sensor_to_sensor
import numpy as np

# Build from a quaternion [w, x, y, z] and translation [x, y, z]
T = Transform.from_quaternion([1.0, 0.0, 0.0, 0.0], translation=[1.84, 0.0, 1.91])

# Compose transforms
T_combined = T1 @ T2

# Apply to a single point or an (N, 3) cloud
pt_ego = T.apply_to_point([0.0, 0.0, 0.0])
cloud_ego = T.apply_to_points(lidar_cloud_xyz)

# Invert
T_ego_to_sensor = T.inverse()

# Compute sensor-to-sensor transform from two sensor-to-ego matrices
T_src_to_tgt = sensor_to_sensor(T_src_to_ego, T_tgt_to_ego)

LiDAR–Camera Fusion

Project a 3-D LiDAR point cloud onto a camera image plane, or colour the cloud from an image.

from sensor_transposition.lidar_camera import project_lidar_to_image, color_lidar_from_image

# pixel_coords: (N, 2) [u, v]; valid_mask: (N,) bool
pixel_coords, valid_mask = project_lidar_to_image(
    points=lidar_xyz,          # (N, 3) float array
    lidar_to_camera=T,         # 4×4 extrinsic matrix
    camera_matrix=K,           # 3×3 intrinsic matrix
    image_width=1632,
    image_height=1224,
)

# colours: (N, C); valid_mask: (N,) bool
colours, valid_mask = color_lidar_from_image(lidar_xyz, T, K, image)

Both methods are also exposed through SensorCollection:

pixel_coords, valid = col.project_lidar_to_image("front_lidar", "front_camera", xyz)
colours, valid     = col.color_lidar_from_image("front_lidar", "front_camera", xyz, image)

LiDAR Parsers

Velodyne (KITTI .bin)

from sensor_transposition.lidar.velodyne import VelodyneParser, load_velodyne_bin

parser = VelodyneParser("frame0000.bin")
cloud  = parser.read()          # structured array: x, y, z, intensity
xyz    = parser.xyz()           # (N, 3) float32
xyzI   = parser.xyz_intensity() # (N, 4) float32

# Or as a one-liner
cloud = load_velodyne_bin("frame0000.bin")

Supported models: HDL-32E, HDL-64E, VLP-16, VLP-32C (all share the same KITTI binary format).
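Because the KITTI layout is simply a flat stream of float32 [x, y, z, intensity] records, the parser's output can be reproduced with plain NumPy (illustrative sketch; writes a two-point demo frame, then reads it back):

```python
import numpy as np

# KITTI .bin = flat float32 stream of [x, y, z, intensity] records.
demo = np.array([[1.0, 2.0, 3.0, 0.5],
                 [4.0, 5.0, 6.0, 0.9]], dtype=np.float32)
demo.tofile("demo_frame.bin")

# Equivalent of VelodyneParser.xyz_intensity() for this format:
raw = np.fromfile("demo_frame.bin", dtype=np.float32).reshape(-1, 4)
xyz, intensity = raw[:, :3], raw[:, 3]   # (N, 3) points, (N,) intensities
```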

Ouster (.bin)

from sensor_transposition.lidar.ouster import OusterParser, load_ouster_bin

parser = OusterParser("ouster_frame.bin")
cloud  = parser.read()   # 4-column (x,y,z,intensity) or 8-column variant auto-detected
xyz    = parser.xyz()

The 8-column extended format additionally provides t, reflectivity, ring, and ambient fields.

Livox (LVX / LVX2)

from sensor_transposition.lidar.livox import LivoxParser, load_livox_lvx

parser = LivoxParser("recording.lvx2")
cloud  = parser.read()   # structured array: x, y, z, intensity (coordinates in metres)
xyz    = parser.xyz()

Supported data types: Cartesian float32 (type 0), Spherical float32 (type 1), Cartesian int32 mm-precision (type 2, LVX2).


LiDAR Scan Matching

Point-to-point and point-to-plane ICP (Iterative Closest Point) scan matching for LiDAR frame-to-frame odometry and map-to-scan localisation.

Point-to-point ICP

from sensor_transposition.lidar.scan_matching import icp_align
import numpy as np

# Align source cloud to target cloud
result = icp_align(source_xyz, target_xyz, max_iterations=50)

if result.converged:
    print("Transform:\n", result.transform)   # 4×4 homogeneous matrix
    print("MSE:", result.mean_squared_error)  # mean squared point distance

# Apply the recovered transform to the source points
R = result.transform[:3, :3]
t = result.transform[:3, 3]
aligned = (R @ source_xyz.T).T + t

Parameters:

Parameter Default Description
max_iterations 50 Maximum ICP iterations
tolerance 1e-6 Convergence threshold on MSE change per iteration
max_correspondence_dist inf Reject source–target pairs farther apart than this (metres)
initial_transform None Optional 4×4 initial guess applied before the first iteration
callback None Optional callable (iteration, mse) for progress monitoring

# Monitor ICP progress with a callback:
result = icp_align(
    source_xyz, target_xyz,
    callback=lambda i, mse: print(f"iter {i}: mse={mse:.6f}"),
)

The returned IcpResult contains:

  • transform – 4×4 homogeneous matrix mapping source → target frame
  • converged – True if the tolerance condition was met
  • num_iterations – iterations actually performed
  • mean_squared_error – final mean squared point-to-point distance (inliers only)

Point-to-plane ICP

Point-to-plane ICP minimises the sum of squared distances from each source point to the tangent plane of its nearest target point. It converges in roughly half as many iterations as point-to-point ICP on structured indoor/outdoor scenes and is the standard variant used in production LiDAR SLAM systems (Cartographer, LOAM, LIO-SAM).

from sensor_transposition.lidar.scan_matching import (
    icp_align_point_to_plane,
    point_cloud_normals,
)

# Point-to-plane ICP (normals estimated automatically)
result = icp_align_point_to_plane(source_xyz, target_xyz, max_iterations=50)

if result.converged:
    R = result.transform[:3, :3]
    t = result.transform[:3, 3]
    aligned = (R @ source_xyz.T).T + t

# Pre-compute normals once for repeated use with the same target
target_normals = point_cloud_normals(target_xyz, k=20)
result = icp_align_point_to_plane(
    source_xyz, target_xyz,
    target_normals=target_normals,
)

Additional parameters for icp_align_point_to_plane:

Parameter Default Description
target_normals None Pre-computed (M, 3) unit normals for the target cloud. Auto-estimated if not supplied.
normals_k 20 Neighbours used for automatic normal estimation

Per-point surface normals

from sensor_transposition.lidar.scan_matching import point_cloud_normals

# Estimate surface normals via PCA on k-nearest neighbours
normals = point_cloud_normals(cloud, k=20)   # (N, 3) unit vectors

point_cloud_normals returns one unit-length surface normal per point, computed via PCA on the k-nearest neighbours of each point using a KD-tree. Normals are consistently oriented toward the cloud centroid.


KISS-ICP Odometry

KISS-ICP (Keep It Small and Simple ICP) is a minimal-parameter LiDAR odometry algorithm that uses an adaptive correspondence-distance threshold and a voxel-hashed local map to outperform vanilla ICP with essentially no tuning. The implementation here is pure NumPy/SciPy — no additional dependencies required.

from sensor_transposition.lidar.kiss_icp_odometry import KissIcpOdometry

# Create the odometry estimator
odometry = KissIcpOdometry(voxel_size=0.5)

# Register each LiDAR scan; returns the current 4×4 ego pose (world ← sensor)
poses = []
for scan in lidar_scans:
    pose = odometry.register_frame(scan)
    poses.append(pose)

Constructor parameters:

Parameter Default Description
voxel_size 0.5 Voxel edge length for input downsampling and local map (metres)
max_iterations 50 Maximum ICP iterations per frame
tolerance 1e-4 ICP convergence tolerance
initial_threshold 2.0 Initial adaptive-threshold σ̂ (metres)
min_threshold 0.1 Minimum correspondence threshold (metres)
max_threshold 10.0 Maximum correspondence threshold (metres)
local_map_max_voxels 100 000 Maximum voxels retained in the local map

Properties and methods:

odometry.pose             # Current 4×4 ego pose (copy)
odometry.local_map        # VoxelHashMap instance
odometry.adaptive_threshold   # AdaptiveThreshold instance
odometry.reset()          # Reset pose, local map, and threshold

Installing the upstream reference implementation (optional):

pip install ".[kiss_icp]"   # installs kiss-icp>=1.0

Users who want the full, C++-accelerated upstream KISS-ICP library can install the kiss_icp optional extra and use it directly alongside or instead of the pure-Python implementation provided here.


Ground Plane Segmentation

Separate ground points from obstacles in a LiDAR point cloud using one of three methods. All methods assume the cloud is in a frame where the ground normal points toward +Z (e.g. the FLU ego frame). See docs/ground_plane_identification.md for a detailed guide with worked examples.

from sensor_transposition.ground_plane import (
    height_threshold_segment,
    ransac_ground_plane,
    normal_based_segment,
)
import numpy as np

xyz = np.load("frame.npy")  # (N, 3) float array in the ego frame

# ── Method 1: Height threshold (fast, flat terrain) ──────────────────────
ground_mask, non_ground_mask = height_threshold_segment(xyz, threshold=0.3)
ground_pts = xyz[ground_mask]
obstacles  = xyz[non_ground_mask]

# ── Method 2: RANSAC plane fitting (robust, works on tilted ground) ───────
ground_mask, plane = ransac_ground_plane(
    xyz,
    distance_threshold=0.2,   # inlier tolerance in metres
    max_iterations=1000,
    normal_threshold=0.9,     # min cos(angle) between plane normal and +Z
)
a, b, c, d = plane  # ax + by + cz + d = 0  (unit normal oriented to +Z)

# ── Method 3: Normal-based (best for uneven / hilly terrain) ─────────────
ground_mask, normals = normal_based_segment(
    xyz,
    k=20,                     # k-nearest neighbours for PCA
    verticality_threshold=0.85,  # min dot product with +Z
)

Method comparison:

Method Speed Terrain Requires
height_threshold_segment ★★★ Flat Known sensor height
ransac_ground_plane ★★ Flat / mildly sloped Nothing
normal_based_segment ★ Hilly / uneven SciPy

For best results on structured environments, combine a coarse height pre-filter with RANSAC refinement — see the docs guide for the pattern.
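The combined pattern can be sketched in pure NumPy (an illustration of the idea, not the library's implementation; in practice you would chain height_threshold_segment and ransac_ground_plane directly):

```python
import numpy as np

rng = np.random.default_rng(0)

def coarse_then_ransac(xyz, height_cut=0.5, dist_thresh=0.2, iters=200):
    """Sketch: a cheap height pre-filter shrinks the candidate set,
    then RANSAC refines the ground plane on the survivors."""
    candidates = xyz[xyz[:, 2] < height_cut]          # coarse pre-filter
    best_inliers, best_plane = 0, None
    for _ in range(iters):
        sample = candidates[rng.choice(len(candidates), 3, replace=False)]
        n = np.cross(sample[1] - sample[0], sample[2] - sample[0])
        norm = np.linalg.norm(n)
        if norm < 1e-9:
            continue                                   # degenerate sample
        n /= norm
        if n[2] < 0:
            n = -n                                     # orient normal to +Z
        d = -n @ sample[0]
        count = int((np.abs(candidates @ n + d) < dist_thresh).sum())
        if count > best_inliers:
            best_inliers, best_plane = count, (*n, d)
    # Classify the FULL cloud against the refined plane:
    a, b, c, d = best_plane
    ground_mask = np.abs(xyz @ np.array([a, b, c]) + d) < dist_thresh
    return ground_mask, best_plane

# Synthetic scene: flat ground at z≈0 plus a box of obstacle points
ground   = np.c_[rng.uniform(-10, 10, (500, 2)), rng.normal(0, 0.02, 500)]
obstacle = rng.uniform([2, 2, 0.5], [3, 3, 2.0], (100, 3))
xyz = np.vstack([ground, obstacle])
mask, plane = coarse_then_ransac(xyz)
```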


LiDAR Motion Distortion Correction

Correct per-point motion distortion in a spinning LiDAR scan using IMU data.

from sensor_transposition.lidar.motion_distortion import deskew_scan
from sensor_transposition.imu.imu import ImuParser

imu_parser = ImuParser("imu.bin")
imu_times  = imu_parser.timestamps()            # (N,) float64 seconds
accel      = imu_parser.linear_acceleration()   # (N, 3) m/s²
gyro       = imu_parser.angular_velocity()      # (N, 3) rad/s

# per_point_timestamps: (N,) float array in the same clock as the IMU
# ref_timestamp: the scan's reference time (start or end of sweep)
deskewed = deskew_scan(
    points=lidar_scan,
    per_point_timestamps=per_point_timestamps,
    imu_timestamps=imu_times,
    imu_accel=accel,
    imu_gyro=gyro,
    ref_timestamp=sweep_end_time,
)

Both per_point_timestamps and ref_timestamp must be expressed in the same reference clock as the IMU (use apply_time_offset from sync.py first if needed). ref_timestamp is typically the end of the sweep, so that the corrected cloud represents the sensor pose at the time the last point was captured.


GPS / GNSS

Parse NMEA 0183 log files (GGA and RMC sentences).

from sensor_transposition.gps.nmea import NmeaParser, GgaFix, RmcFix, load_nmea

parser = NmeaParser("gps_log.nmea")

for record in parser.records():
    if isinstance(record, GgaFix):
        print(record.latitude, record.longitude, record.altitude)
    elif isinstance(record, RmcFix) and record.is_valid:
        print(record.speed_knots, record.course)

# Filtered helpers
gga_fixes = parser.gga_fixes()
rmc_fixes = parser.rmc_fixes()

# One-liner
records = load_nmea("gps_log.nmea")

GPS Coordinate-Frame Converter

Convert raw GPS latitude / longitude / altitude into local Cartesian frames needed for SLAM initialisation, global constraints, or sensor-fusion.

ECEF ↔ Geodetic

from sensor_transposition.gps.converter import geodetic_to_ecef, ecef_to_geodetic

# Geodetic (lat/lon/alt) → ECEF Cartesian
X, Y, Z = geodetic_to_ecef(lat_deg=51.5074, lon_deg=-0.1278, alt_m=11.0)

# ECEF → Geodetic
lat, lon, alt = ecef_to_geodetic(X, Y, Z)

ECEF / Geodetic ↔ ENU (local tangent plane)

from sensor_transposition.gps.converter import (
    ecef_to_enu,
    enu_to_ecef,
    geodetic_to_enu,
)

# Define a local ENU origin (e.g. the first GPS fix)
lat0, lon0, alt0 = 51.5074, -0.1278, 11.0

# Geodetic → ENU (convenience wrapper)
east, north, up = geodetic_to_enu(
    lat_deg=51.5075, lon_deg=-0.1279, alt_m=11.0,
    lat0_deg=lat0, lon0_deg=lon0, alt0_m=alt0,
)

# ECEF → ENU
east, north, up = ecef_to_enu(X, Y, Z, lat0, lon0, alt0)

# ENU → ECEF (inversion)
X2, Y2, Z2 = enu_to_ecef(east, north, up, lat0, lon0, alt0)

Geodetic ↔ UTM

from sensor_transposition.gps.converter import geodetic_to_utm, utm_to_geodetic

# Geodetic → UTM
easting, northing, zone_number, zone_letter = geodetic_to_utm(51.5074, -0.1278)
# → (699_330.6, 5_710_155.4, 30, 'U')

# UTM → Geodetic
lat, lon = utm_to_geodetic(easting, northing, zone_number, zone_letter)

GPS Fusion

Integrate GPS fixes into the local ENU map frame used by the SLAM pipeline.

from sensor_transposition.gps.fusion import GpsFuser, hdop_to_noise
from sensor_transposition.gps.nmea import NmeaParser
from sensor_transposition.imu.ekf import ImuEkf, EkfState
from sensor_transposition.frame_pose import FramePoseSequence

# Load GPS fixes.
fixes = NmeaParser("gps_log.nmea").gga_fixes()

# 1. Anchor the local ENU map frame to the first valid fix.
origin = fixes[0]
fuser = GpsFuser(
    ref_lat=origin.latitude,
    ref_lon=origin.longitude,
    ref_alt=origin.altitude,
)

# 2. Fuse all fixes into a FramePoseSequence (GPS-only trajectory).
seq = FramePoseSequence()
for i, fix in enumerate(fixes):
    fuser.fuse_into_sequence(seq, timestamp=float(i) * 0.1, fix=fix)

# 3. Fuse GPS into a running EKF state (after IMU prediction steps).
ekf   = ImuEkf()
state = EkfState()
for fix in fixes:
    noise = hdop_to_noise(fix.hdop)   # 3×3 ENU position covariance
    state = fuser.fuse_into_ekf(ekf, state, fix, noise)

hdop_to_noise(hdop) converts the HDOP field from a GGA sentence into a 3×3 diagonal ENU position noise covariance matrix, ready to pass to ImuEkf.position_update.
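The exact mapping is internal to hdop_to_noise, but a plausible sketch consistent with the sigmas quoted in the RTK section below (horizontal σ ≈ base_sigma_m × HDOP, with a larger vertical σ) is:

```python
import numpy as np

def hdop_to_noise_sketch(hdop, base_sigma_m=3.0, vertical_sigma_m=None):
    """Illustrative HDOP → covariance mapping (assumed form, not the
    library's exact implementation): horizontal sigma scales with HDOP;
    vertical sigma defaults to a larger multiple of the base sigma."""
    sigma_h = base_sigma_m * hdop
    sigma_v = (vertical_sigma_m if vertical_sigma_m is not None
               else 2.0 * base_sigma_m) * hdop
    return np.diag([sigma_h**2, sigma_h**2, sigma_v**2])

R = hdop_to_noise_sketch(1.2)   # 3×3 ENU position covariance
```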

FramePoseSequence convenience accessors:

# Get trajectory as numpy arrays for analysis or export:
positions   = seq.positions    # (N, 3) float64 array of [x, y, z]
quaternions = seq.quaternions  # (N, 4) float64 array of [w, x, y, z]

# Export to / load from CSV (timestamp, x, y, z, qw, qx, qy, qz):
seq.to_csv("trajectory.csv")
seq2 = FramePoseSequence.from_csv("trajectory.csv")

RTK GPS & NTRIP

Real-Time Kinematic (RTK) GPS uses carrier-phase corrections from a reference station to achieve 1–5 cm horizontal accuracy, down from the 2–5 m typical of standalone GPS. See docs/rtk_gps_setup.md for a full setup guide covering:

  • RTK base-station hardware requirements
  • NTRIP caster connection (using str2str from RTKLIB, or a pure-Python socket client)
  • Setting the correct base_sigma_m in hdop_to_noise for RTK-fixed (≈ 2 cm) vs RTK-float (≈ 30 cm) solutions

from sensor_transposition.gps.fusion import hdop_to_noise

# Standard GPS:  σ_H ≈ 3 m × HDOP
noise_standard = hdop_to_noise(fix.hdop)

# RTK fixed solution: σ_H ≈ 2 cm × HDOP
noise_rtk = hdop_to_noise(fix.hdop, base_sigma_m=0.02, vertical_sigma_m=0.05)

RTCM 3.x Parser

Parse RTCM 3.x binary correction streams — e.g. from an NTRIP caster, UHF radio, or serial port:

from sensor_transposition.gps.rtcm import parse_rtcm_file, Rtcm1005, RtcmMsm

# Parse from a file
messages = parse_rtcm_file("corrections.rtcm3")

for msg in messages:
    if isinstance(msg, Rtcm1005):
        # MT1005: base-station antenna reference-point position in ECEF
        print(f"Base ARP: X={msg.x_m:.4f} m  Y={msg.y_m:.4f} m  Z={msg.z_m:.4f} m")
    elif isinstance(msg, RtcmMsm):
        # MSM4 / MSM7: pseudorange & carrier-phase observations
        print(f"MSM{msg.msm_type} {msg.constellation}  "
              f"epoch={msg.epoch_ms} ms  "
              f"sats=0x{msg.satellite_mask:016x}  "
              f"n_pr={len(msg.pseudoranges_m)}")

# Stream from a binary socket or serial port
import io
from sensor_transposition.gps.rtcm import RtcmParser

with open("stream.rtcm3", "rb") as fh:
    for msg in RtcmParser(fh).messages():
        ...

Supported message types:

Message Description
MT1005 Stationary reference station ECEF ARP position
MT1074 / 1077 GPS MSM4 / MSM7 observations
MT1084 / 1087 GLONASS MSM4 / MSM7 observations
MT1094 / 1097 Galileo MSM4 / MSM7 observations
MT1124 / 1127 BeiDou MSM4 / MSM7 observations

GNSS Outage Handling

Configure GpsFuser with a max_fix_age_sec threshold and an optional on_outage callback to detect GNSS outages and automatically skip EKF updates when fixes are stale:

from sensor_transposition.gps.fusion import GpsFuser, hdop_to_noise
from sensor_transposition.imu.ekf import ImuEkf, EkfState

def on_outage(age_sec: float) -> None:
    print(f"GNSS outage — last fix {age_sec:.1f} s ago, "
          f"switching to LiDAR/wheel odometry")

fuser = GpsFuser(
    ref_lat=51.5080, ref_lon=-0.1281,
    max_fix_age_sec=2.0,      # skip updates when fix is > 2 s old
    on_outage=on_outage,
)

ekf   = ImuEkf()
state = EkfState()
t = 0.0

for fix, accel, gyro, dt in sensor_stream:
    state = ekf.predict(state, accel, gyro, dt)
    t += dt
    noise = hdop_to_noise(fix.hdop)
    # Pass current_timestamp so the fuser can track fix age
    state = fuser.fuse_into_ekf(ekf, state, fix, noise, current_timestamp=t)

# Query fix age at any time:
age = fuser.fix_age(t)   # None if no fix has been fused yet

When age > max_fix_age_sec the EKF update is skipped and on_outage is called. When a fresh fix eventually arrives, fusion resumes automatically. See docs/gps_fusion.md for a detailed treatment.
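The gating logic amounts to a simple staleness check (illustrative sketch, not the library code):

```python
def should_fuse(fix_timestamp: float, now: float,
                max_fix_age_sec: float = 2.0) -> tuple[bool, float]:
    """Fuse only when the newest fix is fresh enough; otherwise the
    EKF keeps dead-reckoning on its prediction step alone."""
    age = now - fix_timestamp
    return age <= max_fix_age_sec, age

ok, age = should_fuse(fix_timestamp=10.0, now=13.5)   # → (False, 3.5)
```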


IMU

Parse binary IMU data files.

from sensor_transposition.imu.imu import ImuParser, load_imu_bin

parser = ImuParser("imu_data.bin")
data   = parser.read()          # structured array: timestamp, ax, ay, az, wx, wy, wz [, qw, qx, qy, qz]

accel  = parser.linear_acceleration()  # (N, 3) float32 – m/s²
gyro   = parser.angular_velocity()     # (N, 3) float32 – rad/s
times  = parser.timestamps()           # (N,)   float64 – UNIX seconds

Two record formats are auto-detected by file size:

  • 32 bytes/record – timestamp (f64) + accelerometer (3×f32) + gyroscope (3×f32)
  • 48 bytes/record – above + orientation quaternion (4×f32)
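
The two layouts map naturally onto NumPy structured dtypes; a sketch assuming little-endian packing (the field names are illustrative, not the parser's actual dtype):

```python
import numpy as np

# 32-byte layout: f64 timestamp + 3×f32 accelerometer + 3×f32 gyroscope.
REC32 = np.dtype([("timestamp", "<f8"),
                  ("accel",     "<f4", (3,)),
                  ("gyro",      "<f4", (3,))])

# 48-byte layout: the above plus a 4×f32 orientation quaternion.
REC48 = np.dtype(REC32.descr + [("quat", "<f4", (4,))])

# Format detection: a file evenly divisible by 48 but not by 32 must be the
# 48-byte format; sizes divisible by both need a tie-break rule in the parser.
records = np.frombuffer(bytes(96), dtype=REC48)   # two zeroed 48-byte records
```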

IMU Error-State EKF

15-state Error-State Extended Kalman Filter (ES-EKF) for fusing IMU with GPS and pose observations.

from sensor_transposition.imu.ekf import ImuEkf, EkfState
import numpy as np

ekf   = ImuEkf(gyro_noise=1e-4, accel_noise=1e-3)
state = EkfState()   # starts at origin, zero velocity, identity orientation

# Predict with IMU sample
accel = np.array([0.0, 0.0, 9.81])  # m/s²
gyro  = np.array([0.0, 0.0, 0.01])  # rad/s
dt    = 0.01                        # seconds
state = ekf.predict(state, accel, gyro, dt)

# Update with a GPS position observation
z_pos   = np.array([1.0, 0.0, 0.0])  # ENU position (metres)
R_noise = np.eye(3) * 0.25           # 3×3 position noise covariance
state   = ekf.position_update(state, z_pos, R_noise)

The 15-D error state is [δp (3), δv (3), δθ (3), δba (3), δbg (3)]. See docs/state_estimation.md for a full derivation.


IMU Pre-integration

Accumulate raw IMU measurements between two keyframe timestamps into compact (ΔR, Δv, Δp) relative-motion increments.

from sensor_transposition.imu.preintegration import ImuPreintegrator
import numpy as np

integrator = ImuPreintegrator()

for accel, gyro, dt in imu_samples:
    integrator.integrate(accel, gyro, dt)

result = integrator.get_result()
print("ΔR:\n", result.delta_rotation)   # (3, 3) rotation matrix
print("Δv:", result.delta_velocity)      # (3,)   m/s
print("Δp:", result.delta_position)      # (3,)   metres

Radar

Parse binary radar detection files.

from sensor_transposition.radar.radar import RadarParser, load_radar_bin

parser     = RadarParser("radar_frame.bin")
detections = parser.read()   # structured array: range, azimuth, elevation, velocity, snr
xyz        = parser.xyz()    # (N, 3) Cartesian float32 converted from spherical coords

Each 20-byte record contains: range (m), azimuth (°), elevation (°), velocity (m/s, negative = approaching), snr (dB).
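
That record layout and the spherical-to-Cartesian conversion behind parser.xyz() can be sketched as follows (little-endian packing and an x-forward, y-left, z-up axis convention are assumptions; verify both against your data):

```python
import numpy as np

RADAR_REC = np.dtype([("range",     "<f4"),   # metres
                      ("azimuth",   "<f4"),   # degrees
                      ("elevation", "<f4"),   # degrees
                      ("velocity",  "<f4"),   # m/s, negative = approaching
                      ("snr",       "<f4")])  # dB

def spherical_to_xyz(det):
    """Convert a structured detection array to (N, 3) Cartesian points."""
    az = np.deg2rad(det["azimuth"])
    el = np.deg2rad(det["elevation"])
    r  = det["range"]
    return np.stack([r * np.cos(el) * np.cos(az),   # x: forward
                     r * np.cos(el) * np.sin(az),   # y: left
                     r * np.sin(el)], axis=-1)      # z: up

det = np.array([(10.0, 0.0, 0.0, -1.0, 20.0)], dtype=RADAR_REC)
xyz = spherical_to_xyz(det)   # detection straight ahead → [[10, 0, 0]]
```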


Radar Odometry

Estimate ego-velocity from Doppler measurements, or run scan-to-scan ICP radar odometry.

from sensor_transposition.radar.radar import RadarParser
from sensor_transposition.radar.radar_odometry import (
    estimate_ego_velocity,
    RadarOdometer,
)

# Doppler-based ego-velocity (single frame)
detections = RadarParser("frame.bin").read()
result = estimate_ego_velocity(detections, min_snr=10.0)
if result.valid:
    print("Ego velocity (m/s):", result.velocity)   # (3,) m/s

# Scan-to-scan odometry over a sequence
odom = RadarOdometer()
for path in sorted(frame_paths):
    xyz = RadarParser(path).xyz()
    step = odom.update(xyz)
    if step is not None:
        print("Relative transform:\n", step.transform)
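
Doppler ego-velocity estimation rests on the static-world model v_r = -d · v_ego: each detection's radial speed is the negated projection of the ego-velocity onto its line of sight. A least-squares sketch under that model (illustrative, not the library's implementation, which also applies the min_snr gate shown above):

```python
import numpy as np

def ego_velocity_lsq(directions, radial_speeds):
    """Solve v_r = -d · v_ego for v_ego by linear least squares.

    directions    : (N, 3) unit line-of-sight vectors to the detections
    radial_speeds : (N,)   Doppler speeds (negative = approaching)
    """
    v_ego, *_ = np.linalg.lstsq(-directions, radial_speeds, rcond=None)
    return v_ego

# Synthetic check: ego moving 5 m/s forward past three static detections.
dirs = np.array([[1.0, 0.0, 0.0],
                 [0.0, 1.0, 0.0],
                 [0.6, 0.8, 0.0]])
vr = -dirs @ np.array([5.0, 0.0, 0.0])   # model-consistent radial speeds
v_est = ego_velocity_lsq(dirs, vr)       # recovers ≈ [5, 0, 0]
```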

Visual Odometry

Essential-matrix estimation, pose recovery, and Perspective-n-Point (PnP) solver for monocular or stereo visual odometry.

from sensor_transposition.visual_odometry import (
    estimate_essential_matrix,
    recover_pose_from_essential,
    solve_pnp,
)

# 1. Estimate E from matched pixel pairs (same camera)
result = estimate_essential_matrix(pts1, pts2, K)
E, mask = result.essential_matrix, result.inlier_mask

# 2. Recover relative camera pose
R, t = recover_pose_from_essential(E, pts1[mask], pts2[mask], K)

# 3. Estimate absolute pose from 3-D / 2-D correspondences
pnp = solve_pnp(points_3d, pixels_2d, K)
if pnp.success:
    print("R:\n", pnp.rotation)
    print("t:", pnp.translation)

Feature Detection & Matching

Pure NumPy/SciPy Harris corner detection, intensity-patch descriptors, and brute-force feature matching — enabling a complete visual odometry pipeline without OpenCV or any additional dependencies.

from sensor_transposition.feature_detection import (
    detect_harris_corners,
    compute_patch_descriptor,
    match_features,
)
from sensor_transposition.visual_odometry import (
    estimate_essential_matrix,
    recover_pose_from_essential,
)
import numpy as np

# Camera intrinsics
K = np.array([[718.856, 0, 607.193],
              [0, 718.856, 185.216],
              [0, 0, 1.0]])

# 1. Detect Harris corners in both frames
kp1 = detect_harris_corners(gray1, threshold=0.01, max_corners=500)
kp2 = detect_harris_corners(gray2, threshold=0.01, max_corners=500)

# 2. Compute normalised intensity-patch descriptors
desc1 = compute_patch_descriptor(gray1, kp1, patch_size=11)
desc2 = compute_patch_descriptor(gray2, kp2, patch_size=11)

# 3. Match with Lowe's ratio test
matches = match_features(desc1, desc2, ratio_threshold=0.75)
pts1 = kp1[matches[:, 0], ::-1].astype(float)   # (row, col) → (u, v)
pts2 = kp2[matches[:, 1], ::-1].astype(float)

# 4. Estimate essential matrix and recover relative pose
if len(pts1) >= 8:
    result = estimate_essential_matrix(pts1, pts2, K)
    E, mask = result.essential_matrix, result.inlier_mask
    R, t = recover_pose_from_essential(E, pts1[mask], pts2[mask], K)
    print("Rotation:\n", R)
    print("Translation (unit):", t)

  • detect_harris_corners(image, ...) – Harris corner detection with NMS; returns an (M, 2) (row, col) array
  • compute_patch_descriptor(image, keypoints, ...) – normalised flat intensity-patch descriptors
  • match_features(desc1, desc2, ratio_threshold=0.75) – brute-force L2 matching with Lowe's ratio test; returns a (K, 2) match-index array

For production-quality ORB or SIFT features, install the optional opencv extra and use cv2 directly in your own code:

pip install ".[opencv]"

Stereo Camera

Stereo rectification, dense block-matching disparity, and stereo triangulation for metric 3-D depth recovery. Unlike monocular visual odometry (which recovers pose only up to a scale factor), stereo VO recovers the metric scale directly from the known baseline.

from sensor_transposition.stereo import (
    stereo_rectify,
    compute_disparity_sgbm,
    triangulate_stereo,
)
import numpy as np

# Camera calibration (from stereo calibration procedure)
K = np.array([[718.856, 0, 607.193],
              [0, 718.856, 185.216],
              [0, 0, 1.0]])
baseline = 0.537   # metres (KITTI stereo baseline)
R_stereo = np.eye(3)
t_stereo = np.array([-baseline, 0.0, 0.0])

# 1. Compute rectification transforms
R1, R2, P1, P2 = stereo_rectify(K, (), K, (), R_stereo, t_stereo,
                                image_size=(375, 1242))   # empty tuples = zero distortion

# 2. (Apply R1/R2 with your preferred image remapping, e.g. cv2.remap)

# 3. Compute dense disparity from the rectified stereo pair
disp = compute_disparity_sgbm(img_left_rect, img_right_rect,
                               block_size=11, num_disparities=64)

# 4a. Recover 3-D point cloud from the disparity map
pts3d = triangulate_stereo(K=K, baseline=baseline, disp=disp)
print(f"Reconstructed {len(pts3d)} 3-D points")

# 4b. Or use matched pixel pairs from sparse feature matching
pts3d_sparse = triangulate_stereo(pts_left, pts_right, K=K, baseline=baseline)

  • stereo_rectify(K1, D1, K2, D2, R, t, image_size) – Bouguet rectification → (R1, R2, P1, P2)
  • compute_disparity_sgbm(img_left, img_right, ...) – block-matching (SAD) disparity map in pure NumPy
  • triangulate_stereo(pts_left, pts_right, K=K, baseline=b) – pixel pairs → metric XYZ
  • triangulate_stereo(K=K, baseline=b, disp=disp) – disparity map → metric point cloud
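
The metric scale comes from the standard pinhole relation Z = fx · B / d, which is essentially all that disparity-mode triangulation needs. A standalone sketch (not the library implementation):

```python
import numpy as np

def depth_from_disparity(disp, fx, baseline):
    """Z = fx * baseline / d; non-positive disparities become NaN."""
    disp = np.asarray(disp, dtype=np.float64)
    with np.errstate(divide="ignore", invalid="ignore"):
        return np.where(disp > 0, fx * baseline / disp, np.nan)

# KITTI-like numbers: fx = 718.856 px, B = 0.537 m, d = 64 px → Z ≈ 6.03 m
Z = depth_from_disparity(np.array([64.0, 0.0]), fx=718.856, baseline=0.537)
```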

Wheel Odometry

Dead-reckoning pose estimation for differential-drive and Ackermann vehicles.

from sensor_transposition.wheel_odometry import (
    DifferentialDriveOdometer,
    AckermannOdometer,
)

# Differential drive (encoder ticks)
odom = DifferentialDriveOdometer(wheel_base=0.54, wheel_radius=0.1)
result = odom.integrate(timestamps, left_ticks, right_ticks,
                        ticks_per_revolution=360)
print(result.x, result.y, result.theta)   # SE(2) pose

# Ackermann / bicycle model
odom = AckermannOdometer(wheel_base=2.7)
result = odom.integrate(timestamps, speeds, steering_angles)
print(result.x, result.y, result.theta)
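
Under the hood, a differential-drive odometer integrates the standard unicycle kinematics at each step; a self-contained sketch (illustrative, not the library's internals):

```python
import numpy as np

def diff_drive_step(x, y, theta, d_left, d_right, wheel_base):
    """One dead-reckoning step from per-wheel travel distances (metres)."""
    d_center = 0.5 * (d_left + d_right)          # distance of the midpoint
    d_theta  = (d_right - d_left) / wheel_base   # heading change (rad)
    # Midpoint integration: evaluate heading halfway through the turn.
    x += d_center * np.cos(theta + 0.5 * d_theta)
    y += d_center * np.sin(theta + 0.5 * d_theta)
    return x, y, theta + d_theta

# Straight line: both wheels travel 1 m, so the pose advances 1 m along +x.
pose = diff_drive_step(0.0, 0.0, 0.0, 1.0, 1.0, wheel_base=0.54)   # → (1.0, 0.0, 0.0)
```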

Loop Closure

Place recognition using Scan Context and M2DP LiDAR descriptors, plus a HOG-like image descriptor for visual loop closure.

LiDAR loop closure (Scan Context / M2DP)

from sensor_transposition.loop_closure import (
    ScanContextDatabase,
    compute_m2dp,
)

db = ScanContextDatabase(num_rings=20, num_sectors=60, max_range=80.0)

for frame_id, cloud in enumerate(lidar_frames):
    # compute_descriptor() uses the database's own parameters — no duplication:
    desc = db.compute_descriptor(cloud)
    candidates = db.query(desc, top_k=1)
    db.add(desc, frame_id=frame_id)

    if candidates and candidates[0].distance < 0.15:
        loop_from = candidates[0].match_frame_id
        print(f"Loop closure: {frame_id} → {loop_from}, "
              f"d={candidates[0].distance:.3f}")

# M2DP as a viewpoint-insensitive alternative
from sensor_transposition.loop_closure import m2dp_distance
desc_a = compute_m2dp(cloud_a)
desc_b = compute_m2dp(cloud_b)
print("M2DP distance:", m2dp_distance(desc_a, desc_b))

Visual loop closure (camera images)

For camera-heavy platforms or environments where the LiDAR loop-closure database is sparse, use the HOG-like image descriptor and ImageLoopClosureDatabase:

from sensor_transposition.loop_closure import (
    compute_image_descriptor,
    image_descriptor_distance,
    ImageLoopClosureDatabase,
)

# Standalone descriptor comparison
desc_a = compute_image_descriptor(gray_frame_a, grid=(4, 4), bins=8)
desc_b = compute_image_descriptor(gray_frame_b, grid=(4, 4), bins=8)
dist = image_descriptor_distance(desc_a, desc_b)
print(f"Visual similarity: {dist:.4f}  (0=identical, 1=maximally different)")

# Incremental database with exclusion window (mirrors ScanContextDatabase API)
db = ImageLoopClosureDatabase(grid=(4, 4), bins=8, exclusion_window=20)

for frame_id, gray_img in enumerate(camera_frames):
    desc = db.compute_descriptor(gray_img)
    candidates = db.query(desc, top_k=1)
    db.add(desc, frame_id=frame_id)

    if candidates and candidates[0].distance < 0.20:
        loop_from = candidates[0].match_frame_id
        print(f"Visual loop closure: {frame_id} → {loop_from}, "
              f"d={candidates[0].distance:.3f}")

Pose Graph

Pose graph construction and Gauss-Newton optimisation for graph-SLAM.

from sensor_transposition.pose_graph import PoseGraph, optimize_pose_graph
import numpy as np

graph = PoseGraph()

# Add keyframe nodes
for i, (t, q) in enumerate(zip(translations, quaternions)):
    graph.add_node(i, translation=t, quaternion=q)

# Add odometry edges from ICP
for i in range(len(translations) - 1):
    result = icp_align(clouds[i], clouds[i + 1])
    graph.add_edge(from_id=i, to_id=i + 1,
                   transform=result.transform,
                   information=np.eye(6) * 100.0)

# Add a loop-closure edge
graph.add_edge(from_id=loop_from, to_id=loop_to,
               transform=loop_transform,
               information=np.eye(6) * 50.0)

opt = optimize_pose_graph(graph)
if opt.success:
    for node_id, pose in opt.optimized_poses.items():
        print(node_id, pose["translation"])

# Optional progress callback (iteration number + current cost):
opt = optimize_pose_graph(
    graph,
    callback=lambda i, c: print(f"iter {i}: cost={c:.4f}"),
)

IMU Factor edges

ImuFactor edges encode tightly-coupled IMU constraints by wrapping a PreintegrationResult (ΔR, Δv, Δp) from sensor_transposition.imu.preintegration as a 6-DOF relative-pose constraint alongside ICP odometry edges. This improves robustness when LiDAR scans are sparse or degenerate.

from sensor_transposition.imu.preintegration import ImuPreintegrator
from sensor_transposition.pose_graph import PoseGraph, optimize_pose_graph
import numpy as np

graph = PoseGraph()
graph.add_node(0, translation=[0.0, 0.0, 0.0])
graph.add_node(1, translation=[0.5, 0.0, 0.0])

# ICP odometry edge.
graph.add_edge(from_id=0, to_id=1,
               transform=icp_result.transform,
               information=np.eye(6) * 100.0)

# IMU pre-integration factor edge.
integrator = ImuPreintegrator()
for accel, gyro, dt in imu_samples:   # samples between keyframes 0 and 1
    integrator.integrate(accel, gyro, dt)
preint = integrator.get_result()
factor = graph.add_imu_factor(
    from_id=0,
    to_id=1,
    preint_result=preint,
    information=np.eye(6) * 10.0,   # lower confidence than ICP
)

# All ImuFactor objects are accessible via graph.imu_factors.
print(len(graph.imu_factors))   # 1

opt = optimize_pose_graph(graph)

Sliding-Window Smoother

Fixed-lag online SLAM smoother that keeps only the most recent window_size keyframes in the active optimisation window.

from sensor_transposition.sliding_window import SlidingWindowSmoother
import numpy as np

smoother = SlidingWindowSmoother(window_size=5)

for i, (trans, rel_tf) in enumerate(keyframe_stream):
    smoother.add_node(i, translation=trans)
    if i > 0:
        smoother.add_edge(i - 1, i, transform=rel_tf,
                          information=np.eye(6) * 200.0)
    result = smoother.optimize()
    if result.success:
        print(f"Node {i}: {result.optimized_poses[i]['translation']}")

# Progress callback is supported:
result = smoother.optimize(callback=lambda i, c: print(f"iter {i}: cost={c:.4f}"))

Submap Manager

Keyframe selection and submap division for large-scale or long-duration SLAM.

from sensor_transposition.submap_manager import KeyframeSelector, SubmapManager
import numpy as np

selector = KeyframeSelector(translation_threshold=1.0,
                            rotation_threshold_deg=10.0)
manager  = SubmapManager(max_keyframes_per_submap=20, overlap=2)

for frame_id, (pose, scan) in enumerate(zip(frame_poses, lidar_scans)):
    if selector.check_and_accept(pose.transform):
        manager.add_keyframe(frame_id, pose.transform, scan)

submaps = manager.get_all_submaps()
print(f"Created {len(submaps)} submaps.")

Occupancy Grid

2-D probabilistic occupancy grid built from LiDAR scans using log-odds ray-casting. Two implementations are provided:

  • OccupancyGrid – dense (height × width) NumPy array; constant memory allocation; best for small-to-medium environments.
  • SparseOccupancyGrid – identical API but stores only observed cells in a Python dict; no up-front memory allocation; best for large outdoor environments where most of the grid is never visited. For a 1 km × 1 km grid at 1 m resolution, a 1 % observation density yields a ~14× memory reduction.

from sensor_transposition.occupancy_grid import OccupancyGrid, SparseOccupancyGrid
import numpy as np

# Dense variant (small environments):
grid = OccupancyGrid(
    resolution=0.10,                    # 10 cm/cell
    width=200, height=200,              # 20 m × 20 m
    origin=np.array([-10.0, -10.0]),    # world coords of cell (0, 0)
)

# Sparse variant (large outdoor environments — drop-in replacement):
grid = SparseOccupancyGrid(
    resolution=0.10,
    width=10000, height=10000,          # 1 km × 1 km — no memory reserved
    origin=np.array([-500.0, -500.0]),
)

for frame_pose, lidar_scan in zip(trajectory, scans):
    sensor_origin = frame_pose.transform[:3, 3]
    grid.insert_scan(lidar_scan, frame_pose.transform, sensor_origin)

occupancy = grid.get_grid()       # (height, width) int8 — ROS convention
probs     = grid.to_probability() # (height, width) float64 in [0, 1]

# ROS nav_msgs/OccupancyGrid compatibility:
# to_ros_int8() is an explicit alias for get_grid() that maps cell states to
# the ROS nav_msgs/OccupancyGrid data convention:
#   -1  →  unknown (cell not yet observed)
#    0  →  free
#  100  →  occupied
# Flatten with .ravel() to assign to the message's `data` field:
ros_grid = grid.to_ros_int8()        # (height, width) int8
ros_msg_data = ros_grid.ravel()      # flat int8 array for nav_msgs/OccupancyGrid.data
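
The log-odds mechanics both grid classes rely on are compact enough to show inline; the increment and clamp constants below are illustrative, not necessarily the library's defaults:

```python
import numpy as np

L_OCC, L_FREE = 0.85, -0.4    # log-odds increments for hit / pass-through cells
L_MIN, L_MAX  = -4.0, 4.0     # clamping keeps saturated cells recoverable

def update_logodds(l, hit):
    """Bayesian log-odds update for a single cell observation."""
    return float(np.clip(l + (L_OCC if hit else L_FREE), L_MIN, L_MAX))

def logodds_to_prob(l):
    return 1.0 - 1.0 / (1.0 + np.exp(l))

l = 0.0                        # prior log-odds 0 → p = 0.5 (unknown)
for _ in range(3):             # three consecutive "hit" observations
    l = update_logodds(l, hit=True)
p = logodds_to_prob(l)         # ≈ 0.93 after three hits
```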

Voxel Map (TSDF)

Truncated Signed-Distance Function (TSDF) volumetric map for dense 3-D reconstruction. Two implementations are provided:

  • TSDFVolume – dense (nx × ny × nz) NumPy arrays; constant memory allocation; best for bounded indoor scenes.
  • SparseTSDFVolume – identical API but stores only voxels that have received at least one observation in a Python dict; no up-front memory; best for large outdoor volumes where the vast majority of voxels are never near a surface. For a 200 × 200 × 50 voxel volume with 5 000 surface voxels updated, memory drops from ~32 MB to ~500 KB — a ~64× reduction.

import numpy as np
from sensor_transposition.voxel_map import TSDFVolume, SparseTSDFVolume

# Dense variant (bounded indoor scenes):
volume = TSDFVolume(
    voxel_size=0.10,
    origin=np.array([-10.0, -10.0, -0.5]),
    dims=(200, 200, 50),   # 200×200×50 voxels
)

# Sparse variant (large outdoor volumes — drop-in replacement):
volume = SparseTSDFVolume(
    voxel_size=0.10,
    origin=np.array([-500.0, -500.0, -2.0]),
    dims=(10000, 10000, 50),   # no memory reserved up front
)

for frame_pose, lidar_scan in zip(trajectory, scans):
    volume.integrate(lidar_scan, frame_pose.transform)

surface_pts  = volume.extract_surface_points(threshold=0.1)
tsdf_array   = volume.get_tsdf()     # (nx, ny, nz) float64; NaN = unseen
weight_array = volume.get_weights()  # (nx, ny, nz) float64

Point-Cloud Map

Accumulated coloured point-cloud map assembled from successive LiDAR scans, with voxel-grid downsampling, PCD / PLY file I/O, and dynamic-object filtering utilities. See docs/point_cloud_map.md for the full guide.

from sensor_transposition.point_cloud_map import PointCloudMap

pcd_map = PointCloudMap()

for frame_pose, lidar_scan in zip(trajectory, scans):
    pcd_map.add_scan(lidar_scan, frame_pose.transform)

# Downsample to 10-cm voxels
pcd_map.voxel_downsample(voxel_size=0.10)

world_points = pcd_map.get_points()   # (N, 3) float64
world_colors = pcd_map.get_colors()   # (N, 3) uint8, or None

# Save / load
pcd_map.save_pcd("map.pcd")
pcd_map.save_ply("map.ply")
loaded = PointCloudMap.from_pcd("map.pcd")

Dynamic Object Filtering

Moving objects leave "ghost" trails in an accumulated map. Two filters are provided to remove dynamic points before they enter the map:

from sensor_transposition.point_cloud_map import (
    filter_dynamic_points,
    consistency_filter,
)

# Remove points whose Doppler speed exceeds 0.5 m/s (requires co-registered radar).
static_mask = filter_dynamic_points(lidar_cloud, doppler_velocities, doppler_threshold=0.5)

# Remove points with no support within 0.5 m in a reference scan.
static_mask = consistency_filter(current_scan, reference_scan, threshold_m=0.5)

pcd_map.add_scan(lidar_cloud[static_mask], ego_to_world)

Visualisation

BEV rendering, trajectory overlay, LiDAR-on-image overlay, and export helpers for Open3D and RViz. All renderers return plain NumPy uint8 RGB arrays.

from sensor_transposition.visualisation import (
    render_birdseye_view,
    render_trajectory_birdseye,
    overlay_lidar_on_image,
    export_point_cloud_open3d,
    colour_by_height,
    color_by_height,      # American-spelling alias
    SensorFrameVisualiser,
)

# Bird's-eye view of the accumulated map
bev = render_birdseye_view(map_points, resolution=0.10)

# Colour a point cloud by height (both spellings accepted)
colours = colour_by_height(points[:, 2])   # (N, 3) uint8
colours = color_by_height(points[:, 2])    # identical result

# Overlay depth-coded LiDAR on a camera image
from sensor_transposition.lidar_camera import project_lidar_to_image
pixel_coords, valid = project_lidar_to_image(lidar_scan, T, K, W, H)   # T: LiDAR→camera 4×4; K: intrinsics; W, H: image size
depth   = lidar_scan[:, 0]
overlay = overlay_lidar_on_image(camera_image, pixel_coords, valid, depth)

# Per-frame container — populate field-by-field:
vis = SensorFrameVisualiser()
vis.set_point_cloud(lidar_scan)
vis.set_camera_image(camera_image)
bev_frame = vis.render_birdseye(resolution=0.10)

# Or construct from a bag message payload in one step:
vis = SensorFrameVisualiser.from_dict({
    "point_cloud": lidar_scan,
    "camera_image": camera_image,
    "trajectory": trajectory_xy,
    "radar_scan": radar_points,
})
bev_frame = vis.render_birdseye(resolution=0.10)

# Export for Open3D
o3d_dict = export_point_cloud_open3d(map_points)

Bag Recorder / Player

Lightweight multi-topic binary bag format (.sbag) for recording and replaying multi-sensor data. No external dependencies — pure Python standard library.

from sensor_transposition.rosbag import BagWriter, BagReader
import numpy as np

# Record — numpy arrays are automatically converted (no .tolist() needed):
with BagWriter("session.sbag") as bag:
    bag.write("/lidar/points", timestamp, {"xyz": lidar_pts})  # numpy OK
    bag.write("/imu/data",     timestamp, {"accel": accel_arr, "gyro": gyro_arr})

# Replay
with BagReader("session.sbag") as bag:
    print("Topics:", bag.topics)
    for msg in bag.read_messages(topics=["/lidar/points"]):
        xyz = msg.data["xyz"]   # plain Python list after round-trip

The .sbag extension stands for sensor bag. Each message record stores a UTF-8 JSON payload, so files can be inspected with any text editor after stripping the binary header. See docs/rosbag.md for the full format specification.

Converting to MCAP (ROS 2 compatible)

Use sbag_to_rosbag to convert a .sbag file to MCAP format, which can be opened with ros2 bag, the MCAP CLI, or Foxglove Studio (requires pip install ".[mcap]"):

from sensor_transposition.rosbag import sbag_to_rosbag

sbag_to_rosbag("session.sbag", "session.mcap")
# Inspect with: ros2 bag info session.mcap

Camera–LiDAR Extrinsic Calibration

Target-based extrinsic calibration using plane correspondences observed from both the camera and the LiDAR.

from sensor_transposition.calibration import (
    fit_plane,
    ransac_plane,
    calibrate_lidar_camera,
)
# Also importable directly from sensor_transposition:
from sensor_transposition import fit_plane, ransac_plane, calibrate_lidar_camera
import numpy as np

# For each pose of a planar calibration target:
lidar_normals    = []
lidar_distances  = []
camera_normals   = []
camera_distances = []

# LiDAR side — fit a plane to the board region
normal, dist, inliers = ransac_plane(board_lidar_pts, distance_threshold=0.02)
lidar_normals.append(normal)
lidar_distances.append(dist)

# Camera side — use a PnP solver (e.g. cv2.solvePnP) to obtain the
# board normal and signed distance in the camera frame, then append.

# Solve for the 4×4 LiDAR → camera transform (need ≥ 3 observations)
T_lidar_to_cam = calibrate_lidar_camera(
    np.array(lidar_normals),   np.array(lidar_distances),
    np.array(camera_normals),  np.array(camera_distances),
)

See docs/camera_lidar_extrinsic_calibration.md for a complete worked example including the camera-side PnP procedure.


SLAM Session (Pipeline Orchestration)

SLAMSession wires together the core SLAM modules — ICP scan matching, Scan Context loop closure, pose-graph optimisation, and point-cloud map accumulation — into a single object with sensible defaults.

from sensor_transposition.rosbag import BagReader
from sensor_transposition.slam_session import SLAMSession

# Run the pipeline
with BagReader("session.sbag") as bag:
    session = SLAMSession(
        icp_max_iterations=50,
        loop_closure_threshold=0.15,
    )
    session.run(bag, lidar_topic="/lidar/points")

# Optimise pose graph and rebuild map with corrected poses
session.optimize()

# Save outputs
session.point_cloud_map.voxel_downsample(voxel_size=0.10)
session.point_cloud_map.save_pcd("map.pcd")
session.trajectory.to_csv("trajectory.csv")

Scan-to-local-submap odometry

By default each incoming scan is matched against the immediately preceding scan. Pass use_local_map=True to match against a sliding-window accumulation of the last N keyframes (a LocalMap) instead. This provides more ICP constraints per alignment and significantly reduces frame-to-frame odometry drift before it enters the pose graph.

from sensor_transposition.slam_session import SLAMSession, LocalMap

with BagReader("session.sbag") as bag:
    session = SLAMSession()
    session.run(
        bag,
        use_local_map=True,
        local_map_size=10,           # keep last 10 keyframes
        local_map_voxel_size=0.2,    # downsample each keyframe to 20-cm voxels
    )

You can also use LocalMap directly outside of SLAMSession:

local_map = LocalMap(window_size=10, voxel_size=0.2)

for scan in lidar_scans:
    local_map.add_keyframe(scan)
    submap = local_map.get_submap()   # (M, 3) float array
    result = icp_align(scan, submap)

Tightly-coupled IMU integration

Pass imu_topic to run() to automatically accumulate IMU readings between consecutive LiDAR keyframes and add an ImuFactor edge (from sensor_transposition.pose_graph) to the pose graph for each inter-keyframe window. The factor encodes the pre-integrated (ΔR, Δv, Δp) increment as a 6-DOF relative-pose constraint alongside the ICP odometry edge, providing tighter coupling between IMU and LiDAR in the pose-graph back-end.

from sensor_transposition.slam_session import SLAMSession
from sensor_transposition.rosbag import BagReader

with BagReader("session.sbag") as bag:
    session = SLAMSession()
    session.run(
        bag,
        lidar_topic="/lidar/points",
        imu_topic="/imu/data",        # activates IMU factor integration
        imu_information=10.0,         # confidence scalar for ImuFactor edges
    )

session.optimize()

IMU messages are expected to carry "accel" and "gyro" keys in their payload dict. Use imu_accel_key and imu_gyro_key to customise the key names if your bag uses different field names. A factor is only added when at least two IMU samples arrive between consecutive LiDAR keyframes. The last sample of each window is carried forward to ensure continuity across keyframe boundaries.

Register callbacks for other sensor topics:

session = SLAMSession()

@session.on_topic("/imu/data")
def handle_imu(msg):
    print(f"IMU @ {msg.timestamp:.3f}s: accel={msg.data['accel']}")

with BagReader("session.sbag") as bag:
    session.run(bag)

All internal components are accessible as properties for advanced use: session.pose_graph, session.loop_db, session.trajectory, session.point_cloud_map.


Localization Against a Pre-Built Map

SLAMSession supports a localization-only mode where a previously built map is loaded from a .pcd or .ply file and each incoming LiDAR scan is matched against it via ICP. In this mode the map is never modified; pose-graph construction and loop-closure detection are skipped. The resulting ego poses are accumulated in SLAMSession.trajectory.

Loading a map and running localization

from sensor_transposition.rosbag import BagReader
from sensor_transposition.slam_session import SLAMSession

# Load a previously saved PCD map to activate localization-only mode.
session = SLAMSession(icp_max_iterations=30)
session.load_map("map.pcd")          # also works with .ply

with BagReader("live.sbag") as bag:
    session.run(bag, lidar_topic="/lidar/points")

# Trajectory contains ego poses in the pre-built map frame.
session.trajectory.to_csv("localization_trajectory.csv")

LocalizationSession convenience subclass

LocalizationSession combines the load_map call with the constructor, so you never forget to activate localization mode:

from sensor_transposition.rosbag import BagReader
from sensor_transposition.slam_session import LocalizationSession

with BagReader("live.sbag") as bag:
    session = LocalizationSession("map.pcd", icp_max_iterations=30)
    session.run(bag, lidar_topic="/lidar/points")

session.trajectory.to_csv("localization_trajectory.csv")

Workflow summary

  1. Build a map with a SLAMSession mapping run and save it:

    session.optimize()
    session.point_cloud_map.voxel_downsample(voxel_size=0.10)
    session.point_cloud_map.save_pcd("map.pcd")
    
  2. Localize against the saved map on subsequent runs using LocalizationSession (or SLAMSession.load_map).

The SLAMSession.optimize() method is a no-op in localization-only mode because no pose graph is built.


Multi-Session SLAM and Map Merging

SLAMSession supports two complementary workflows that go beyond a single mapping run:

  1. Saving and reloading a completed session so that its pose graph, trajectory, point-cloud map, and scan-context database can be inspected or extended in a future process.
  2. Merging two independently built sessions when a loop-closure between them is detected (e.g. from two survey robots or two separate survey runs of overlapping areas).

Saving and loading a session

After a mapping run, call SLAMSession.save(path) to write a .slam ZIP archive containing all session state. The archive holds the following entries:

  • metadata.json – session construction parameters
  • graph.json – pose-graph nodes and edges
  • trajectory.json – ego-pose trajectory
  • scan_context_db.npz – Scan Context descriptor matrices and frame IDs
  • scans.npz – raw sensor-frame scans (for map rebuilding after re-optimisation)
  • points.pcd – accumulated world-frame point-cloud map

from sensor_transposition.slam_session import SLAMSession

# Build the map.
session = SLAMSession()
# ... run session with a BagReader ...
session.optimize()

# Persist to disk.
session.save("run1.slam")

# Load back in a later process.
restored = SLAMSession.load("run1.slam")
restored.point_cloud_map.save_pcd("map.pcd")

Merging two sessions

merge_sessions(session_a, session_b, loop_edge) creates a new SLAMSession that contains the combined pose graph, trajectory, point-cloud map, and scan-context database from both input sessions.

Node ID remapping: Nodes from session_a keep their original IDs. Nodes from session_b are shifted by max(session_a node IDs) + 1 to avoid collisions. The to_id of loop_edge must use session_b's original node numbering — the offset is applied automatically.
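
The remapping rule amounts to a constant offset (a sketch with hypothetical node IDs, not the library's code):

```python
# session_a holds nodes 0..42; session_b holds nodes 0..2 in its own numbering.
ids_a = list(range(43))
ids_b = [0, 1, 2]

offset = max(ids_a) + 1                          # 43
ids_b_merged = [nid + offset for nid in ids_b]   # [43, 44, 45]

# loop_edge.to_id stays in session_b's original numbering (e.g. 0);
# merge_sessions() applies the same offset internally (0 → 43).
```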

import numpy as np
from sensor_transposition.slam_session import SLAMSession, merge_sessions
from sensor_transposition.pose_graph import PoseGraphEdge

session_a = SLAMSession.load("area_a.slam")
session_b = SLAMSession.load("area_b.slam")

# A loop closure was detected between node 42 in session_a and node 0 in
# session_b.  Provide the measured relative SE(3) transform.
loop_transform = np.eye(4)
loop_transform[:3, 3] = [2.0, 0.5, 0.0]   # approximate translation
loop_edge = PoseGraphEdge(
    from_id=42, to_id=0,
    transform=loop_transform,
    information=np.eye(6) * 50.0,
)

merged = merge_sessions(session_a, session_b, loop_edge)
merged.optimize()                          # joint optimisation over both maps
merged.point_cloud_map.save_pcd("merged.pcd")
merged.save("merged_session.slam")

Typical multi-session workflow

  1. Independent mapping runs — each robot or survey run produces its own .slam file via SLAMSession.save().
  2. Loop-closure detection — use ScanContextDatabase or ImageLoopClosureDatabase to find matching frames across sessions. Verify the candidate geometrically with icp_align to obtain the inter-session transform.
  3. Merge — call merge_sessions() with the verified loop edge.
  4. Joint optimisation — call merged.optimize() to jointly optimise all node poses across both sessions using the loop edge as a global constraint.
  5. Export — save the merged map with merged.point_cloud_map.save_pcd("merged.pcd") or persist the entire merged session with merged.save("merged_session.slam") for future runs.

Error Handling

sensor_transposition defines a small hierarchy of library-specific exceptions in sensor_transposition.exceptions. Every exception inherits from the base class SensorTranspositionError so a single except clause can catch any library error. Each subclass also inherits from the appropriate standard Python exception so existing except KeyError / RuntimeError / ValueError handlers continue to work without change.

  • SensorTranspositionError (Exception) – base class for all library errors
  • SensorNotFoundError (SensorTranspositionError, KeyError) – SensorCollection.get_sensor() with an unknown sensor name
  • BagError (SensorTranspositionError, RuntimeError) – BagWriter.write() / BagReader.read_messages() on a closed writer or reader
  • CalibrationError (SensorTranspositionError, ValueError) – calibration operations that fail with invalid input

from sensor_transposition.exceptions import (
    SensorTranspositionError,
    SensorNotFoundError,
    BagError,
)

# Catch a specific library exception:
try:
    sensor = collection.get_sensor("unknown_lidar")
except SensorNotFoundError as exc:
    print(f"Sensor not found: {exc}")

# Or catch all library exceptions at once:
try:
    bag.write("/lidar/points", timestamp, payload)
except SensorTranspositionError as exc:
    print(f"sensor_transposition error: {exc}")

# Existing KeyError / RuntimeError handlers also work unchanged:
try:
    sensor = collection.get_sensor("front_camera")
except KeyError:
    pass  # SensorNotFoundError is a KeyError
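
The dual inheritance that makes those pre-existing handlers keep working is plain Python multiple inheritance. A stripped-down replica of the pattern (class names mirror the library's, but the bodies here are illustrative):

```python
class SensorTranspositionError(Exception):
    """Base class for all library errors."""

class SensorNotFoundError(SensorTranspositionError, KeyError):
    """Unknown sensor name — catchable as either base."""

try:
    raise SensorNotFoundError("unknown_lidar")
except KeyError:                       # a pre-existing handler still fires
    caught_as_keyerror = True
except SensorTranspositionError:       # would also match, but KeyError wins here
    caught_as_keyerror = False
```

Because except clauses are tried in order and matched by isinstance, the same exception object satisfies both the library-specific and the standard-library handler.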

All four classes are exported from the top-level package:

import sensor_transposition as st
st.SensorNotFoundError
st.BagError
st.CalibrationError
st.SensorTranspositionError

Configuration Example

See examples/sensor_collection.yaml for a full vehicle rig including a front camera, front/rear LiDARs, a front radar, a GNSS antenna, and an IMU.

sensors:
  front_lidar:
    type: lidar
    coordinate_system: FLU
    extrinsics:
      translation: [1.84, 0.00, 1.91]
      rotation:
        quaternion: [1.0, 0.0, 0.0, 0.0]
      time_offset_sec: 0.0              # LiDAR is the reference clock

  front_camera:
    type: camera
    coordinate_system: RDF
    extrinsics:
      translation: [1.80, 0.00, 1.45]
      rotation:
        quaternion: [0.5, -0.5, 0.5, -0.5]
      time_offset_sec: 0.033            # camera triggers ~33 ms after reference
    intrinsics:
      fx: 1266.417
      fy: 1266.417
      cx: 816.0
      cy: 612.0
      width: 1632
      height: 1224
      distortion_coefficients: [-0.05, 0.08, 0.0, 0.0, -0.03]
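
For reference, the extrinsics above compose into a single 4×4 camera-to-vehicle transform. A pure-NumPy sketch, assuming the quaternion is stored in [w, x, y, z] order (an assumption about this file's convention, not something the YAML states):

```python
import numpy as np

def quat_to_matrix(w, x, y, z):
    """Rotation matrix from a unit quaternion (w, x, y, z)."""
    return np.array([
        [1 - 2*(y*y + z*z), 2*(x*y - w*z),     2*(x*z + w*y)],
        [2*(x*y + w*z),     1 - 2*(x*x + z*z), 2*(y*z - w*x)],
        [2*(x*z - w*y),     2*(y*z + w*x),     1 - 2*(x*x + y*y)],
    ])

# front_camera extrinsics from the YAML above
R = quat_to_matrix(0.5, -0.5, 0.5, -0.5)
t = np.array([1.80, 0.00, 1.45])

T = np.eye(4)            # homogeneous camera-to-vehicle transform
T[:3, :3] = R
T[:3, 3] = t

# The camera's optical axis (+Z in RDF) maps to the vehicle's forward +X (FLU):
forward = R @ np.array([0.0, 0.0, 1.0])
```

Under that convention, the quaternion [0.5, -0.5, 0.5, -0.5] is exactly the RDF-to-FLU axis permutation, which is why it appears for the camera while the LiDAR (already FLU) uses the identity [1, 0, 0, 0].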

Quick-Start: End-to-End SLAM Pipeline

examples/slam_pipeline.py demonstrates a complete offline SLAM pipeline using synthetic (randomly generated) data, so it runs immediately without any real sensor hardware:

python examples/slam_pipeline.py

The script walks through each stage of the pipeline:

  1. Generate synthetic LiDAR scans and IMU samples.
  2. Record data to a .sbag file with BagWriter.
  3. Replay the bag with BagReader and extract LiDAR scans.
  4. ICP odometry – align consecutive scans with icp_align.
  5. Loop closure – detect revisits with ScanContextDatabase.
  6. Pose graph optimisation – correct accumulated drift with optimize_pose_graph.
  7. Map accumulation – build a PointCloudMap from optimised poses and save it as a .pcd file.
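
Stage 5's revisit detection is based on Scan Context descriptors. A minimal sketch of the underlying idea — a polar ring × sector grid storing each bin's maximum point height — is shown below; the library's ScanContextDatabase additionally handles indexing and rotation-invariant matching, which this sketch omits:

```python
import numpy as np

def scan_context(points, num_rings=20, num_sectors=60, max_range=80.0):
    """Ring x sector descriptor: max z per polar bin of an (N, 3) scan."""
    x, y = points[:, 0], points[:, 1]
    r = np.hypot(x, y)
    keep = r < max_range
    r, z = r[keep], points[keep, 2]
    theta = np.arctan2(y[keep], x[keep]) + np.pi            # [0, 2*pi]
    ring = np.minimum((r / max_range * num_rings).astype(int), num_rings - 1)
    sector = np.minimum((theta / (2 * np.pi) * num_sectors).astype(int),
                        num_sectors - 1)
    desc = np.zeros((num_rings, num_sectors))
    np.maximum.at(desc, (ring, sector), z)                  # max height per bin
    return desc

rng = np.random.default_rng(1)
scan = rng.uniform(-40, 40, size=(5000, 3))
desc = scan_context(scan)    # (20, 60) descriptor; scans can then be compared
                             # by a distance between descriptors
```

The descriptor summarises the vertical structure around the sensor, so two scans taken at the same place produce similar grids even when the point clouds themselves differ.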

ROS Examples

Ready-to-use driver configurations for Velodyne and Ouster LiDARs are provided in the ros_examples/ directory.

ros_examples/
├── ros/
│   ├── velodyne.launch       # ROS 1 – VLP-16 / VLP-32C / HDL-32E / HDL-64E
│   └── ouster.launch         # ROS 1 – OS0 / OS1 / OS2
└── ros2/
    ├── velodyne_params.yaml  # ROS 2 – velodyne_driver + velodyne_pointcloud
    └── ouster_params.yaml    # ROS 2 – ouster_ros (ROS 2 branch)

ROS 1

# Velodyne VLP-16 (live sensor)
roslaunch ros_examples/ros/velodyne.launch device_ip:=192.168.1.201 model:=VLP16

# Ouster OS1
roslaunch ros_examples/ros/ouster.launch sensor_hostname:=os-<serial>.local

ROS 2

# Velodyne VLP-16
ros2 launch velodyne velodyne-all-nodes-VLP16-launch.py \
    params_file:=ros_examples/ros2/velodyne_params.yaml

# Ouster OS1
ros2 launch ouster_ros driver.launch.py \
    params_file:=ros_examples/ros2/ouster_params.yaml

Both ROS 2 YAML files use the ros__parameters namespace convention and can be passed directly via params_file:=.
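
The ros__parameters convention nests every parameter under the node's name. A schematic fragment of that layout (node and parameter names here are illustrative, not copied from the shipped files):

```yaml
velodyne_driver_node:
  ros__parameters:
    device_ip: 192.168.1.201
    model: VLP16
    rpm: 600.0
```

This is the structure ROS 2 expects when a YAML file is supplied via params_file:=.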


