sensor_transposition
Suite of tools for sensor transposition and camera intrinsic/extrinsic calibration.
A Python toolkit for multi-sensor calibration, data parsing, and coordinate-frame management in autonomous-vehicle and robotics pipelines.
Contents
- Features
- Installation
- Coordinate Systems
- Modules
- SensorCollection
- Sensor Synchroniser
- Camera Intrinsics
- Fisheye / Omnidirectional Camera
- Transform
- LiDAR–Camera Fusion
- LiDAR Parsers
- LiDAR Scan Matching
- KISS-ICP Odometry
- Ground Plane Segmentation
- LiDAR Motion Distortion Correction
- GPS / GNSS
- GPS Coordinate-Frame Converter
- GPS Fusion
- RTK GPS & NTRIP
- RTCM 3.x Parser
- GNSS Outage Handling
- IMU
- IMU Error-State EKF
- IMU Pre-integration
- Radar
- Radar Odometry
- Visual Odometry
- Feature Detection & Matching
- Stereo Camera
- Wheel Odometry
- Loop Closure
- Pose Graph
- Sliding-Window Smoother
- Submap Manager
- Occupancy Grid
- Voxel Map (TSDF)
- Point-Cloud Map
- Visualisation
- Bag Recorder / Player
- Camera–LiDAR Extrinsic Calibration
- SLAM Session (Pipeline Orchestration)
- Localization Against a Pre-Built Map
- Multi-Session SLAM and Map Merging
- Error Handling
- Configuration Example
- Quick-Start: End-to-End SLAM Pipeline
- ROS Examples
Features
- Sensor collection – YAML-driven multi-sensor configuration storing extrinsics (translation + quaternion), temporal extrinsic calibration (time offset per sensor), and per-sensor intrinsics/parameters for cameras, LiDARs, radars, GPS, and IMUs.
- Pinhole camera model – focal-length derivation from FOV or physical sensor geometry, camera matrix construction, point projection / unprojection, and Brown–Conrady lens distortion/undistortion.
- Fisheye / omnidirectional camera model – Kannala-Brandt equidistant projection supporting fields of view up to 360°, with focal-length derivation, point projection / unprojection, and Kannala-Brandt distortion/undistortion (compatible with OpenCV's cv2.fisheye module).
- Homogeneous transforms – composable 4×4 Transform objects with helpers for building from quaternions or rotation matrices, inverting, and applying to point clouds.
- LiDAR–camera fusion – project 3-D LiDAR point clouds onto a camera image plane and colour the cloud by sampling pixel values.
- LiDAR parsers – binary readers for Velodyne (KITTI .bin), Ouster (4-column and 8-column .bin), and Livox (LVX / LVX2) file formats.
- LiDAR scan matching – point-to-point ICP (Iterative Closest Point) with the Kabsch SVD algorithm and a KD-tree nearest-neighbour search; point-to-plane ICP with per-point surface normal estimation via PCA for faster convergence on structured scenes (walls, floors, roads); supports a maximum correspondence-distance filter, an optional initial transform, and a configurable convergence tolerance.
- KISS-ICP odometry – minimal-parameter LiDAR odometry with adaptive correspondence-distance threshold, voxel-hashed local map, and point-to-point ICP; pure NumPy/SciPy — no additional dependencies required. An optional kiss_icp extra pulls in the upstream reference implementation.
- Ground plane segmentation – three complementary methods for separating ground points from obstacles: height-threshold (fast, flat terrain), RANSAC plane fitting (robust to outliers and unknown sensor height), and normal-based filtering (handles uneven/hilly terrain via per-point PCA normals); all pure NumPy/SciPy.
- LiDAR motion distortion correction – IMU-based scan deskewing that corrects per-point timestamps across a spinning LiDAR sweep.
- GPS / GNSS – NMEA 0183 parser supporting GGA and RMC sentence types, plus a coordinate-frame converter for ECEF ↔ ENU and geodetic ↔ UTM conversions.
- GPS fusion – GpsFuser converts GPS fixes to local ENU and integrates them into an ImuEkf state or a FramePoseSequence; hdop_to_noise converts HDOP to a 3×3 position noise covariance; GNSS outage detection via max_fix_age_sec and an on_outage callback automatically skips stale fixes and falls back to IMU/wheel-odometry dead-reckoning.
- RTK GPS – RTCM 3.x correction-stream parser supporting MT1005 (base-station ECEF position) and MSM4/MSM7 pseudorange/carrier-phase messages for GPS, GLONASS, Galileo, and BeiDou; pure Python, no additional dependencies. See docs/rtk_gps_setup.md for NTRIP client setup and centimetre-level sigma configuration.
- IMU – binary parser for 32-byte (accel + gyro) and 48-byte (accel + gyro + quaternion) records.
- IMU Error-State EKF – 15-state error-state extended Kalman filter fusing IMU, GPS, and pose observations.
- IMU pre-integration – accumulates raw IMU measurements between keyframes into compact (ΔR, Δv, Δp) increments for tight IMU–LiDAR coupling.
- Radar – binary parser for 5-field (range, azimuth, elevation, velocity, SNR) detection records with spherical → Cartesian conversion.
- Radar odometry – Doppler-based ego-velocity estimation and scan-to-scan ICP radar odometry.
- Visual odometry – essential-matrix estimation (normalised 8-point + RANSAC), pose recovery, and Perspective-n-Point (PnP) solver.
- Feature detection & matching – Harris corner detector, normalised intensity-patch descriptors, and brute-force L2 matching with Lowe's ratio test; all pure NumPy/SciPy, enabling a complete visual odometry pipeline without OpenCV. An optional opencv extra is available for users who need production-quality ORB/SIFT features.
- Stereo camera – stereo rectification (Bouguet algorithm), dense block-matching disparity (pure NumPy SAD), and stereo triangulation for metric 3-D depth recovery; recovers absolute scale unavailable in monocular VO.
- Wheel odometry – differential-drive and Ackermann (bicycle-model) dead-reckoning with midpoint integration.
- Loop closure – Scan Context and M2DP LiDAR place-recognition descriptors with ScanContextDatabase for efficient loop-closure candidate retrieval; plus compute_image_descriptor (HOG-like) and ImageLoopClosureDatabase for camera-based visual loop closure — all pure NumPy/SciPy.
- Pose graph – pose graph data structure and Gauss-Newton optimisation back-end for graph-SLAM; supports standard 6-DOF odometry and loop-closure edges plus ImuFactor edges that wrap IMU pre-integration results for tightly-coupled IMU–LiDAR optimisation.
- Sliding-window smoother – fixed-lag online SLAM smoother that bounds per-step cost to O(window_size³).
- Submap manager – keyframe selection and submap division for large-scale long-duration SLAM.
- Occupancy grid – 2-D probabilistic occupancy grid with log-odds ray-casting; exports to ROS nav_msgs/OccupancyGrid int8 format. SparseOccupancyGrid stores only observed cells in a hash map, reducing memory by up to 14× for large outdoor environments.
- Voxel map (TSDF) – Truncated Signed-Distance Function volumetric map for dense 3-D reconstruction. SparseTSDFVolume stores only observed voxels, reducing memory by up to 64× for large outdoor volumes.
- Point-cloud map – accumulated coloured point-cloud map with voxel-grid downsampling, PCD / PLY I/O, and dynamic-object filtering: filter_dynamic_points (Doppler velocity threshold) and consistency_filter (cross-scan support check) remove moving-object "ghost" trails from the map.
- Visualisation – BEV rendering, trajectory overlay, LiDAR-on-image overlay, Open3D and RViz export helpers.
- Bag recorder / player – lightweight multi-topic binary bag format (.sbag) with streaming write and indexed playback; no external dependencies. sbag_to_rosbag() converts to MCAP for use with ros2 bag (requires pip install ".[mcap]").
- SLAM session – SLAMSession orchestration class that wires ICP odometry, Scan Context loop closure, pose-graph optimisation, and point-cloud map accumulation into a single object with sensible defaults and per-topic callbacks. Supports scan-to-local-submap odometry via use_local_map=True to reduce frame-to-frame drift. A LocalMap helper class maintains a downsampled sliding window of the last N keyframe scans. Pass imu_topic to automatically accumulate IMU measurements between keyframes, pre-integrate them, and add an ImuFactor edge to the pose graph for tightly-coupled IMU–LiDAR optimisation.
- Camera–LiDAR extrinsic calibration – target-based extrinsic calibration using plane correspondences (fit_plane, ransac_plane, calibrate_lidar_camera).
- ROS examples – ready-to-use launch / parameter files for Velodyne and Ouster LiDARs in both ROS 1 and ROS 2.
Installation
pip install .
Development dependencies (pytest):
pip install ".[dev]"
Requires Python ≥ 3.9, plus numpy, pyyaml, and scipy.
Optional extras
Install optional extras to unlock additional integrations:
| Extra | Package installed | Enables |
|---|---|---|
| open3d | open3d>=0.17 | export_point_cloud_open3d() and Open3D visualisation |
| rerun | rerun-sdk>=0.14 | rerun.io visualisation logging |
| mcap | mcap>=1.0 | sbag_to_rosbag() MCAP conversion |
| kiss_icp | kiss-icp>=1.0 | Upstream reference KISS-ICP implementation |
| opencv | opencv-python>=4.0 | Production-quality ORB/SIFT features via cv2 |
pip install ".[open3d]" # Open3D support
pip install ".[rerun]" # rerun.io support
pip install ".[mcap]" # MCAP / ROS 2 bag conversion
pip install ".[kiss_icp]" # Upstream KISS-ICP reference implementation
pip install ".[opencv]" # OpenCV for ORB/SIFT feature detection
Coordinate Systems
All sensors declare their native coordinate frame convention. The ego (vehicle) frame is FLU – x forward, y left, z up.
| Convention | Axes | Typical use |
|---|---|---|
| FLU | Forward, Left, Up | Ego frame; ROS; Velodyne / Ouster |
| RDF | Right, Down, Forward | Camera optical frame |
| FRD | Forward, Right, Down | NED-like robotics frames |
| ENU | East, North, Up | GPS / local tangent plane |
| NED | North, East, Down | Aviation / navigation |
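Converting points between conventions is a fixed axis permutation. A minimal NumPy sketch (not a library API) that re-expresses FLU ego-frame points in a camera RDF frame:
import numpy as np
# RDF basis vectors written in FLU coordinates:
# Right = -y_flu, Down = -z_flu, Forward = +x_flu
R_flu_to_rdf = np.array([
    [0.0, -1.0, 0.0],
    [0.0, 0.0, -1.0],
    [1.0, 0.0, 0.0],
])
p_flu = np.array([5.0, 1.0, 0.3])  # 5 m ahead, 1 m left, 0.3 m up
p_rdf = R_flu_to_rdf @ p_flu       # → [-1.0, -0.3, 5.0]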
Modules
SensorCollection
Loads a YAML file describing a rig of sensors and provides transform queries between any two sensors.
from sensor_transposition import SensorCollection
col = SensorCollection.from_yaml("examples/sensor_collection.yaml")
# 4×4 transform: front_lidar → front_camera
T = col.transform_between("front_lidar", "front_camera")
# Modify and save
col.to_yaml("my_rig.yaml")
YAML sensor types: camera, lidar, radar, gps, imu.
Each entry specifies coordinate_system, extrinsics (translation + quaternion + optional time_offset_sec), and optional type-specific parameter blocks.
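An illustrative single-sensor entry showing those fields (the exact nesting here is an assumption; examples/sensor_collection.yaml is the canonical schema):
front_camera:
  type: camera
  coordinate_system: RDF
  extrinsics:
    translation: [1.84, 0.0, 1.91]
    quaternion: [1.0, 0.0, 0.0, 0.0]   # [w, x, y, z]
    time_offset_sec: 0.033             # optional
  intrinsics:
    fx: 1266.4
    fy: 1266.4
    cx: 816.0
    cy: 612.0
    width: 1632
    height: 1224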
from_yaml() validation – when loading from YAML, SensorCollection.validate() is automatically called to catch common mistakes (e.g. a camera sensor missing its intrinsics block, or non-positive fx/fy/width/height values) before they cause cryptic errors downstream:
# Raises ValueError immediately with a clear message:
# "Camera sensor 'front_camera' is missing a required 'intrinsics' block."
col = SensorCollection.from_yaml("bad_config.yaml")
# You can also call validate() explicitly on a programmatically-built collection:
col.validate()
Temporal extrinsic calibration – each sensor can declare a time_offset_sec in its extrinsics block that captures the delay between the sensor's hardware clock and the reference/ego clock. Use SensorCollection.time_offset_between(source, target) to compute the time difference between any two sensors:
# How many seconds ahead is the camera clock relative to the LiDAR clock?
dt = col.time_offset_between("front_lidar", "front_camera") # e.g. +0.033
# Synchronise a LiDAR timestamp to the camera's timebase
t_camera_equiv = t_lidar + dt
Sensor Synchroniser
Resample multiple sensor streams onto a common reference timeline.
from sensor_transposition.sync import (
SensorSynchroniser,
SensorSynchronizer, # American-spelling alias
)
import numpy as np
sync = SensorSynchroniser()
sync.add_stream("lidar", lidar_times, lidar_data,
time_offset_sec=col.get_sensor("front_lidar").time_offset_sec)
sync.add_stream("imu", imu_times, imu_accel,
time_offset_sec=col.get_sensor("imu").time_offset_sec)
# Check that the streams actually overlap before resampling:
overlap = sync.temporal_overlap()
if overlap is None:
raise RuntimeError("Streams do not overlap — check time offsets.")
t_start, t_end = overlap
# Resample all streams at LiDAR reference times (synchronise / synchronize both work):
aligned = sync.synchronise(lidar_times)
imu_at_lidar_times = aligned["imu"]
| Method | Description |
|---|---|
| add_stream(name, times, data) | Register a named sensor stream |
| temporal_overlap() | Return (start, end) of the common overlap, or None |
| stream_start_time(name) / stream_end_time(name) | Per-stream time bounds |
| synchronise(ref_times) / synchronize(ref_times) | Resample all streams |
| interpolate(name, query_times) | Resample a single stream |
Camera Intrinsics
Pinhole model utilities.
from sensor_transposition.camera_intrinsics import (
focal_length_from_fov,
focal_length_from_sensor,
fov_from_focal_length,
camera_matrix,
project_point,
unproject_pixel,
distort_point,
undistort_point,
)
# Focal length from horizontal FOV
fx = focal_length_from_fov(image_size=1920, fov_deg=90.0)
# Focal length from physical sensor geometry
fx = focal_length_from_sensor(image_size_px=1920, sensor_size_mm=6.4, focal_length_mm=4.0)
# Build 3×3 K matrix
K = camera_matrix(fx=1266.4, fy=1266.4, cx=816.0, cy=612.0)
# Project a 3-D point to pixel coordinates
u, v = project_point(K, [1.0, 0.5, 5.0])
# Unproject a pixel at a known depth back to 3-D
xyz = unproject_pixel(K, (u, v), depth=5.0)
# Apply / remove Brown–Conrady distortion
coeffs = (-0.05, 0.08, 0.0, 0.0, -0.03) # (k1, k2, p1, p2, k3)
distorted = distort_point([x_n, y_n], coeffs)
undistorted = undistort_point(distorted, coeffs)
Fisheye / Omnidirectional Camera
Kannala-Brandt equidistant projection for wide-FOV and omnidirectional
cameras. Supports fields of view up to 360° and is compatible with
coefficients from OpenCV's cv2.fisheye.calibrate.
from sensor_transposition.camera_intrinsics import (
fisheye_focal_length_from_fov,
fisheye_project_point,
fisheye_unproject_pixel,
fisheye_distort_point,
fisheye_undistort_point,
camera_matrix,
)
import numpy as np
# Focal length for a 190° horizontal FOV on a 1920-pixel-wide sensor
fx = fisheye_focal_length_from_fov(image_size=1920, fov_deg=190.0)
fy = fisheye_focal_length_from_fov(image_size=1080, fov_deg=130.0)
K = camera_matrix(fx=fx, fy=fy, cx=960.0, cy=540.0)
# Distortion coefficients (k1, k2, k3, k4) from cv2.fisheye.calibrate
dist_coeffs = (0.05, -0.02, 0.003, -0.001)
# Project a 3-D point (including points with large off-axis angles)
point_3d = np.array([0.8, 0.4, 3.0])
u, v = fisheye_project_point(K, point_3d, dist_coeffs=dist_coeffs)
# Unproject: depth is the Euclidean distance ‖[X, Y, Z]‖, not Z
depth = float(np.linalg.norm(point_3d))
recovered = fisheye_unproject_pixel(K, (u, v), depth=depth,
dist_coeffs=dist_coeffs)
# Apply / remove Kannala-Brandt distortion on normalised coordinates
x_n, y_n = point_3d[0] / point_3d[2], point_3d[1] / point_3d[2]
distorted = fisheye_distort_point([x_n, y_n], dist_coeffs)
undistorted = fisheye_undistort_point(distorted, dist_coeffs)
See docs/fisheye_camera_intrinsics_guide.md
for a full guide including how to calibrate with OpenCV and how to handle
omnidirectional cameras with FOV > 180°.
Transform
Composable 4×4 homogeneous transformation matrices.
from sensor_transposition.transform import Transform, sensor_to_sensor
import numpy as np
# Build from a quaternion [w, x, y, z] and translation [x, y, z]
T = Transform.from_quaternion([1.0, 0.0, 0.0, 0.0], translation=[1.84, 0.0, 1.91])
# Compose transforms
T_combined = T1 @ T2
# Apply to a single point or an (N, 3) cloud
pt_ego = T.apply_to_point([0.0, 0.0, 0.0])
cloud_ego = T.apply_to_points(lidar_cloud_xyz)
# Invert
T_ego_to_sensor = T.inverse()
# Compute sensor-to-sensor transform from two sensor-to-ego matrices
T_src_to_tgt = sensor_to_sensor(T_src_to_ego, T_tgt_to_ego)
LiDAR–Camera Fusion
Project a 3-D LiDAR point cloud onto a camera image plane, or colour the cloud from an image.
from sensor_transposition.lidar_camera import project_lidar_to_image, color_lidar_from_image
# pixel_coords: (N, 2) [u, v]; valid_mask: (N,) bool
pixel_coords, valid_mask = project_lidar_to_image(
points=lidar_xyz, # (N, 3) float array
lidar_to_camera=T, # 4×4 extrinsic matrix
camera_matrix=K, # 3×3 intrinsic matrix
image_width=1632,
image_height=1224,
)
# colours: (N, C); valid_mask: (N,) bool
colours, valid_mask = color_lidar_from_image(lidar_xyz, T, K, image)
Both methods are also exposed through SensorCollection:
pixel_coords, valid = col.project_lidar_to_image("front_lidar", "front_camera", xyz)
colours, valid = col.color_lidar_from_image("front_lidar", "front_camera", xyz, image)
LiDAR Parsers
Velodyne (KITTI .bin)
from sensor_transposition.lidar.velodyne import VelodyneParser, load_velodyne_bin
parser = VelodyneParser("frame0000.bin")
cloud = parser.read() # structured array: x, y, z, intensity
xyz = parser.xyz() # (N, 3) float32
xyzI = parser.xyz_intensity() # (N, 4) float32
# Or as a one-liner
cloud = load_velodyne_bin("frame0000.bin")
Supported models: HDL-32E, HDL-64E, VLP-16, VLP-32C (all share the same KITTI binary format).
Ouster (.bin)
from sensor_transposition.lidar.ouster import OusterParser, load_ouster_bin
parser = OusterParser("ouster_frame.bin")
cloud = parser.read() # 4-column (x,y,z,intensity) or 8-column variant auto-detected
xyz = parser.xyz()
The 8-column extended format additionally provides t, reflectivity, ring, and ambient fields.
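The extended fields can be read straight off the structured array returned by read(); a sketch (assuming the field names listed above):
cloud = parser.read()
if "ring" in (cloud.dtype.names or ()):     # 8-column variant detected
    per_point_time = cloud["t"]             # per-point timestamp
    ring = cloud["ring"]                    # laser ring index
    reflectivity = cloud["reflectivity"]    # calibrated reflectivity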
Livox (LVX / LVX2)
from sensor_transposition.lidar.livox import LivoxParser, load_livox_lvx
parser = LivoxParser("recording.lvx2")
cloud = parser.read() # structured array: x, y, z, intensity (coordinates in metres)
xyz = parser.xyz()
Supported data types: Cartesian float32 (type 0), Spherical float32 (type 1), Cartesian int32 mm-precision (type 2, LVX2).
LiDAR Scan Matching
Point-to-point and point-to-plane ICP (Iterative Closest Point) scan matching for LiDAR frame-to-frame odometry and map-to-scan localisation.
Point-to-point ICP
from sensor_transposition.lidar.scan_matching import icp_align
import numpy as np
# Align source cloud to target cloud
result = icp_align(source_xyz, target_xyz, max_iterations=50)
if result.converged:
print("Transform:\n", result.transform) # 4×4 homogeneous matrix
print("MSE:", result.mean_squared_error) # mean squared point distance
# Apply the recovered transform to the source points
R = result.transform[:3, :3]
t = result.transform[:3, 3]
aligned = (R @ source_xyz.T).T + t
Parameters:
| Parameter | Default | Description |
|---|---|---|
| max_iterations | 50 | Maximum ICP iterations |
| tolerance | 1e-6 | Convergence threshold on MSE change per iteration |
| max_correspondence_dist | inf | Reject source–target pairs farther apart than this (metres) |
| initial_transform | None | Optional 4×4 initial guess applied before the first iteration |
| callback | None | Optional callable (iteration, mse) for progress monitoring |
# Monitor ICP progress with a callback:
result = icp_align(
source_xyz, target_xyz,
callback=lambda i, mse: print(f"iter {i}: mse={mse:.6f}"),
)
The returned IcpResult contains:
- transform – 4×4 homogeneous matrix mapping source → target frame
- converged – True if the tolerance condition was met
- num_iterations – iterations actually performed
- mean_squared_error – final mean squared point-to-point distance (inliers only)
Point-to-plane ICP
Point-to-plane ICP minimises the sum of squared distances from each source point to the tangent plane of its nearest target point. It converges in roughly half as many iterations as point-to-point ICP on structured indoor/outdoor scenes and is the standard variant used in production LiDAR SLAM systems (Cartographer, LOAM, LIO-SAM).
from sensor_transposition.lidar.scan_matching import (
icp_align_point_to_plane,
point_cloud_normals,
)
# Point-to-plane ICP (normals estimated automatically)
result = icp_align_point_to_plane(source_xyz, target_xyz, max_iterations=50)
if result.converged:
R = result.transform[:3, :3]
t = result.transform[:3, 3]
aligned = (R @ source_xyz.T).T + t
# Pre-compute normals once for repeated use with the same target
target_normals = point_cloud_normals(target_xyz, k=20)
result = icp_align_point_to_plane(
source_xyz, target_xyz,
target_normals=target_normals,
)
Additional parameters for icp_align_point_to_plane:
| Parameter | Default | Description |
|---|---|---|
| target_normals | None | Pre-computed (M, 3) unit normals for the target cloud; auto-estimated if not supplied |
| normals_k | 20 | Neighbours used for automatic normal estimation |
Per-point surface normals
from sensor_transposition.lidar.scan_matching import point_cloud_normals
# Estimate surface normals via PCA on k-nearest neighbours
normals = point_cloud_normals(cloud, k=20) # (N, 3) unit vectors
point_cloud_normals returns one unit-length surface normal per point,
computed via PCA on the k-nearest neighbours of each point using a KD-tree.
Normals are consistently oriented toward the cloud centroid.
KISS-ICP Odometry
KISS-ICP (Keep It Small and Simple ICP) is a minimal-parameter LiDAR odometry algorithm that uses an adaptive correspondence-distance threshold and a voxel-hashed local map to outperform vanilla ICP with essentially no tuning. The implementation here is pure NumPy/SciPy — no additional dependencies required.
from sensor_transposition.lidar.kiss_icp_odometry import KissIcpOdometry
# Create the odometry estimator
odometry = KissIcpOdometry(voxel_size=0.5)
# Register each LiDAR scan; returns the current 4×4 ego pose (world ← sensor)
poses = []
for scan in lidar_scans:
pose = odometry.register_frame(scan)
poses.append(pose)
Constructor parameters:
| Parameter | Default | Description |
|---|---|---|
| voxel_size | 0.5 | Voxel edge length for input downsampling and local map (metres) |
| max_iterations | 50 | Maximum ICP iterations per frame |
| tolerance | 1e-4 | ICP convergence tolerance |
| initial_threshold | 2.0 | Initial adaptive-threshold σ̂ (metres) |
| min_threshold | 0.1 | Minimum correspondence threshold (metres) |
| max_threshold | 10.0 | Maximum correspondence threshold (metres) |
| local_map_max_voxels | 100 000 | Maximum voxels retained in the local map |
Properties and methods:
odometry.pose # Current 4×4 ego pose (copy)
odometry.local_map # VoxelHashMap instance
odometry.adaptive_threshold # AdaptiveThreshold instance
odometry.reset() # Reset pose, local map, and threshold
Installing the upstream reference implementation (optional):
pip install ".[kiss_icp]" # installs kiss-icp>=1.0
Users who want the full, C++-accelerated upstream KISS-ICP library can
install the kiss_icp optional extra and use it directly alongside or
instead of the pure-Python implementation provided here.
Ground Plane Segmentation
Separate ground points from obstacles in a LiDAR point cloud using one of
three methods. All methods assume the cloud is in a frame where the ground
normal points toward +Z (e.g. the FLU ego frame). See
docs/ground_plane_identification.md
for a detailed guide with worked examples.
from sensor_transposition.ground_plane import (
height_threshold_segment,
ransac_ground_plane,
normal_based_segment,
)
import numpy as np
xyz = np.load("frame.npy") # (N, 3) float array in the ego frame
# ── Method 1: Height threshold (fast, flat terrain) ──────────────────────
ground_mask, non_ground_mask = height_threshold_segment(xyz, threshold=0.3)
ground_pts = xyz[ground_mask]
obstacles = xyz[non_ground_mask]
# ── Method 2: RANSAC plane fitting (robust, works on tilted ground) ───────
ground_mask, plane = ransac_ground_plane(
xyz,
distance_threshold=0.2, # inlier tolerance in metres
max_iterations=1000,
normal_threshold=0.9, # min cos(angle) between plane normal and +Z
)
a, b, c, d = plane # ax + by + cz + d = 0 (unit normal oriented to +Z)
# ── Method 3: Normal-based (best for uneven / hilly terrain) ─────────────
ground_mask, normals = normal_based_segment(
xyz,
k=20, # k-nearest neighbours for PCA
verticality_threshold=0.85, # min dot product with +Z
)
Method comparison:
| Method | Speed | Terrain | Requires |
|---|---|---|---|
| height_threshold_segment | ★★★ | Flat | Known sensor height |
| ransac_ground_plane | ★★ | Flat / mildly sloped | Nothing |
| normal_based_segment | ★ | Hilly / uneven | SciPy |
For best results on structured environments, combine a coarse height pre-filter with RANSAC refinement — see the docs guide for the pattern.
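A sketch of that combined pattern using only the two functions shown above (thresholds are illustrative):
coarse_mask, _ = height_threshold_segment(xyz, threshold=0.6)  # loose height cut
candidates = xyz[coarse_mask]
# RANSAC refinement on the pre-filtered candidates only
refined_mask, plane = ransac_ground_plane(candidates, distance_threshold=0.2)
ground_pts = candidates[refined_mask]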
LiDAR Motion Distortion Correction
Correct per-point motion distortion in a spinning LiDAR scan using IMU data.
from sensor_transposition.lidar.motion_distortion import deskew_scan
from sensor_transposition.imu.imu import ImuParser
imu_data = ImuParser("imu.bin").read()
imu_times = imu_data["timestamp"].astype(float)
accel = imu_data[["ax", "ay", "az"]].view(float).reshape(-1, 3).astype(float)
gyro = imu_data[["wx", "wy", "wz"]].view(float).reshape(-1, 3).astype(float)
# per_point_timestamps: (N,) float array in the same clock as the IMU
# ref_timestamp: the scan's reference time (start or end of sweep)
deskewed = deskew_scan(
points=lidar_scan,
per_point_timestamps=per_point_timestamps,
imu_timestamps=imu_times,
imu_accel=accel,
imu_gyro=gyro,
ref_timestamp=sweep_end_time,
)
Both per_point_timestamps and ref_timestamp must be in the same reference
clock as the IMU (use apply_time_offset from sync.py first if needed).
ref_timestamp is typically the end of the sweep so that the corrected
cloud represents the sensor pose at the time the last point was captured.
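For example, to shift LiDAR per-point timestamps into the IMU timebase first (a sketch: col is a SensorCollection as in the earlier section, and apply_time_offset is assumed to take a time array plus an offset in seconds):
from sensor_transposition.sync import apply_time_offset
# Offset that converts LiDAR-clock times into the IMU timebase
dt = col.time_offset_between("front_lidar", "imu")
per_point_timestamps = apply_time_offset(per_point_timestamps, dt)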
GPS / GNSS
Parse NMEA 0183 log files (GGA and RMC sentences).
from sensor_transposition.gps.nmea import NmeaParser, GgaFix, RmcFix, load_nmea
parser = NmeaParser("gps_log.nmea")
for record in parser.records():
if isinstance(record, GgaFix):
print(record.latitude, record.longitude, record.altitude)
elif isinstance(record, RmcFix) and record.is_valid:
print(record.speed_knots, record.course)
# Filtered helpers
gga_fixes = parser.gga_fixes()
rmc_fixes = parser.rmc_fixes()
# One-liner
records = load_nmea("gps_log.nmea")
GPS Coordinate-Frame Converter
Convert raw GPS latitude / longitude / altitude into local Cartesian frames needed for SLAM initialisation, global constraints, or sensor-fusion.
ECEF ↔ Geodetic
from sensor_transposition.gps.converter import geodetic_to_ecef, ecef_to_geodetic
# Geodetic (lat/lon/alt) → ECEF Cartesian
X, Y, Z = geodetic_to_ecef(lat_deg=51.5074, lon_deg=-0.1278, alt_m=11.0)
# ECEF → Geodetic
lat, lon, alt = ecef_to_geodetic(X, Y, Z)
ECEF / Geodetic ↔ ENU (local tangent plane)
from sensor_transposition.gps.converter import (
ecef_to_enu,
enu_to_ecef,
geodetic_to_enu,
)
# Define a local ENU origin (e.g. the first GPS fix)
lat0, lon0, alt0 = 51.5074, -0.1278, 11.0
# Geodetic → ENU (convenience wrapper)
east, north, up = geodetic_to_enu(
lat_deg=51.5075, lon_deg=-0.1279, alt_m=11.0,
lat0_deg=lat0, lon0_deg=lon0, alt0_m=alt0,
)
# ECEF → ENU
east, north, up = ecef_to_enu(X, Y, Z, lat0, lon0, alt0)
# ENU → ECEF (inversion)
X2, Y2, Z2 = enu_to_ecef(east, north, up, lat0, lon0, alt0)
Geodetic ↔ UTM
from sensor_transposition.gps.converter import geodetic_to_utm, utm_to_geodetic
# Geodetic → UTM
easting, northing, zone_number, zone_letter = geodetic_to_utm(51.5074, -0.1278)
# → (699_330.6, 5_710_155.4, 30, 'U')
# UTM → Geodetic
lat, lon = utm_to_geodetic(easting, northing, zone_number, zone_letter)
GPS Fusion
Integrate GPS fixes into the local ENU map frame used by the SLAM pipeline.
from sensor_transposition.gps.fusion import GpsFuser, hdop_to_noise
from sensor_transposition.gps.nmea import NmeaParser
from sensor_transposition.imu.ekf import ImuEkf, EkfState
from sensor_transposition.frame_pose import FramePoseSequence
# Load GPS fixes.
fixes = NmeaParser("gps_log.nmea").gga_fixes()
# 1. Anchor the local ENU map frame to the first valid fix.
origin = fixes[0]
fuser = GpsFuser(
ref_lat=origin.latitude,
ref_lon=origin.longitude,
ref_alt=origin.altitude,
)
# 2. Fuse all fixes into a FramePoseSequence (GPS-only trajectory).
seq = FramePoseSequence()
for i, fix in enumerate(fixes):
fuser.fuse_into_sequence(seq, timestamp=float(i) * 0.1, fix=fix)
# 3. Fuse GPS into a running EKF state (after IMU prediction steps).
ekf = ImuEkf()
state = EkfState()
for fix in fixes:
noise = hdop_to_noise(fix.hdop) # 3×3 ENU position covariance
state = fuser.fuse_into_ekf(ekf, state, fix, noise)
hdop_to_noise(hdop) converts the HDOP field from a GGA sentence into a 3×3
diagonal ENU position noise covariance matrix, ready to pass to
ImuEkf.position_update.
FramePoseSequence convenience accessors:
# Get trajectory as numpy arrays for analysis or export:
positions = seq.positions # (N, 3) float64 array of [x, y, z]
quaternions = seq.quaternions # (N, 4) float64 array of [w, x, y, z]
# Export to / load from CSV (timestamp, x, y, z, qw, qx, qy, qz):
seq.to_csv("trajectory.csv")
seq2 = FramePoseSequence.from_csv("trajectory.csv")
RTK GPS & NTRIP
Real-Time Kinematic (RTK) GPS uses carrier-phase corrections from a reference
station to achieve 1–5 cm horizontal accuracy, compared with the 2–5 m
accuracy of standard GPS. See docs/rtk_gps_setup.md
for a full setup guide covering:
- RTK base-station hardware requirements
- NTRIP caster connection (using str2str from RTKLIB, or a pure-Python socket client)
- Setting the correct base_sigma_m in hdop_to_noise for RTK-fixed (≈ 2 cm) vs RTK-float (≈ 30 cm) solutions
from sensor_transposition.gps.fusion import hdop_to_noise
# Standard GPS: σ_H ≈ 3 m × HDOP
noise_standard = hdop_to_noise(fix.hdop)
# RTK fixed solution: σ_H ≈ 2 cm × HDOP
noise_rtk = hdop_to_noise(fix.hdop, base_sigma_m=0.02, vertical_sigma_m=0.05)
RTCM 3.x Parser
Parse RTCM 3.x binary correction streams — e.g. from an NTRIP caster, UHF radio, or serial port:
from sensor_transposition.gps.rtcm import parse_rtcm_file, Rtcm1005, RtcmMsm
# Parse from a file
messages = parse_rtcm_file("corrections.rtcm3")
for msg in messages:
if isinstance(msg, Rtcm1005):
# MT1005: base-station antenna reference-point position in ECEF
print(f"Base ARP: X={msg.x_m:.4f} m Y={msg.y_m:.4f} m Z={msg.z_m:.4f} m")
elif isinstance(msg, RtcmMsm):
# MSM4 / MSM7: pseudorange & carrier-phase observations
print(f"MSM{msg.msm_type} {msg.constellation} "
f"epoch={msg.epoch_ms} ms "
f"sats=0x{msg.satellite_mask:016x} "
f"n_pr={len(msg.pseudoranges_m)}")
# Stream from a binary socket or serial port
import io
from sensor_transposition.gps.rtcm import RtcmParser
with open("stream.rtcm3", "rb") as fh:
for msg in RtcmParser(fh).messages():
...
Supported message types:
| Message | Description |
|---|---|
| MT1005 | Stationary reference station ECEF ARP position |
| MT1074 / 1077 | GPS MSM4 / MSM7 observations |
| MT1084 / 1087 | GLONASS MSM4 / MSM7 observations |
| MT1094 / 1097 | Galileo MSM4 / MSM7 observations |
| MT1124 / 1127 | BeiDou MSM4 / MSM7 observations |
GNSS Outage Handling
Configure GpsFuser with a max_fix_age_sec threshold and an optional
on_outage callback to detect GNSS outages and automatically skip EKF
updates when fixes are stale:
from sensor_transposition.gps.fusion import GpsFuser, hdop_to_noise
from sensor_transposition.imu.ekf import ImuEkf, EkfState
def on_outage(age_sec: float) -> None:
print(f"GNSS outage — last fix {age_sec:.1f} s ago, "
f"switching to LiDAR/wheel odometry")
fuser = GpsFuser(
ref_lat=51.5080, ref_lon=-0.1281,
max_fix_age_sec=2.0, # skip updates when fix is > 2 s old
on_outage=on_outage,
)
ekf = ImuEkf()
state = EkfState()
t = 0.0
for fix, accel, gyro, dt in sensor_stream:
state = ekf.predict(state, accel, gyro, dt)
t += dt
noise = hdop_to_noise(fix.hdop)
# Pass current_timestamp so the fuser can track fix age
state = fuser.fuse_into_ekf(ekf, state, fix, noise, current_timestamp=t)
# Query fix age at any time:
age = fuser.fix_age(t) # None if no fix has been fused yet
When age > max_fix_age_sec the EKF update is skipped and on_outage is
called. When a fresh fix eventually arrives, fusion resumes automatically.
See docs/gps_fusion.md for a detailed treatment.
IMU
Parse binary IMU data files.
from sensor_transposition.imu.imu import ImuParser, load_imu_bin
parser = ImuParser("imu_data.bin")
data = parser.read() # structured array: timestamp, ax, ay, az, wx, wy, wz [, qw, qx, qy, qz]
accel = parser.linear_acceleration() # (N, 3) float32 – m/s²
gyro = parser.angular_velocity() # (N, 3) float32 – rad/s
times = parser.timestamps() # (N,) float64 – UNIX seconds
Two record formats are auto-detected by file size:
- 32 bytes/record – timestamp (f64) + accelerometer (3×f32) + gyroscope (3×f32)
- 48 bytes/record – above + orientation quaternion (4×f32)
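As a sketch, the 32-byte layout maps onto a NumPy structured dtype as follows (field names chosen to match the ImuParser output above):
import numpy as np
dtype_32 = np.dtype([
    ("timestamp", "<f8"),                         # 8 bytes, UNIX seconds
    ("ax", "<f4"), ("ay", "<f4"), ("az", "<f4"),  # 12 bytes, m/s²
    ("wx", "<f4"), ("wy", "<f4"), ("wz", "<f4"),  # 12 bytes, rad/s
])
assert dtype_32.itemsize == 32
records = np.fromfile("imu_data.bin", dtype=dtype_32)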
IMU Error-State EKF
15-state Error-State Extended Kalman Filter (ES-EKF) for fusing IMU with GPS and pose observations.
from sensor_transposition.imu.ekf import ImuEkf, EkfState
import numpy as np
ekf = ImuEkf(gyro_noise=1e-4, accel_noise=1e-3)
state = EkfState() # starts at origin, zero velocity, identity orientation
# Predict with IMU sample
accel = np.array([0.0, 0.0, 9.81]) # m/s²
gyro = np.array([0.0, 0.0, 0.01]) # rad/s
dt = 0.01 # seconds
state = ekf.predict(state, accel, gyro, dt)
# Update with a GPS position observation
z_pos = np.array([1.0, 0.0, 0.0]) # ENU position (metres)
R_noise = np.eye(3) * 0.25 # 3×3 position noise covariance
state = ekf.position_update(state, z_pos, R_noise)
The 15-D error state is [δp (3), δv (3), δθ (3), δba (3), δbg (3)].
See docs/state_estimation.md for a full derivation.
IMU Pre-integration
Accumulate raw IMU measurements between two keyframe timestamps into compact
(ΔR, Δv, Δp) relative-motion increments.
from sensor_transposition.imu.preintegration import ImuPreintegrator
import numpy as np
integrator = ImuPreintegrator()
for accel, gyro, dt in imu_samples:
integrator.integrate(accel, gyro, dt)
result = integrator.get_result()
print("ΔR:\n", result.delta_rotation) # (3, 3) rotation matrix
print("Δv:", result.delta_velocity) # (3,) m/s
print("Δp:", result.delta_position) # (3,) metres
Radar
Parse binary radar detection files.
from sensor_transposition.radar.radar import RadarParser, load_radar_bin
parser = RadarParser("radar_frame.bin")
detections = parser.read() # structured array: range, azimuth, elevation, velocity, snr
xyz = parser.xyz() # (N, 3) Cartesian float32 converted from spherical coords
Each 20-byte record contains: range (m), azimuth (°), elevation (°), velocity (m/s, negative = approaching), snr (dB).
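The spherical → Cartesian conversion behind xyz() follows the usual range/azimuth/elevation geometry; a sketch (the x-forward axis convention here is an assumption):
import numpy as np
det = parser.read()
az = np.deg2rad(det["azimuth"])
el = np.deg2rad(det["elevation"])
r = det["range"]
x = r * np.cos(el) * np.cos(az)  # forward
y = r * np.cos(el) * np.sin(az)  # lateral, sign follows azimuth
z = r * np.sin(el)               # vertical
xyz = np.column_stack([x, y, z]).astype(np.float32)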
Radar Odometry
Estimate ego-velocity from Doppler measurements, or run scan-to-scan ICP radar odometry.
from sensor_transposition.radar.radar import RadarParser
from sensor_transposition.radar.radar_odometry import (
estimate_ego_velocity,
RadarOdometer,
)
# Doppler-based ego-velocity (single frame)
detections = RadarParser("frame.bin").read()
result = estimate_ego_velocity(detections, min_snr=10.0)
if result.valid:
print("Ego velocity (m/s):", result.velocity) # (3,) m/s
# Scan-to-scan odometry over a sequence
odom = RadarOdometer()
for path in sorted(frame_paths):
xyz = RadarParser(path).xyz()
step = odom.update(xyz)
if step is not None:
print("Relative transform:\n", step.transform)
Visual Odometry
Essential-matrix estimation, pose recovery, and Perspective-n-Point (PnP) solver for monocular or stereo visual odometry.
from sensor_transposition.visual_odometry import (
estimate_essential_matrix,
recover_pose_from_essential,
solve_pnp,
)
# 1. Estimate E from matched pixel pairs (same camera)
result = estimate_essential_matrix(pts1, pts2, K)
E, mask = result.essential_matrix, result.inlier_mask
# 2. Recover relative camera pose
R, t = recover_pose_from_essential(E, pts1[mask], pts2[mask], K)
# 3. Estimate absolute pose from 3-D / 2-D correspondences
pnp = solve_pnp(points_3d, pixels_2d, K)
if pnp.success:
print("R:\n", pnp.rotation)
print("t:", pnp.translation)
Feature Detection & Matching
Pure NumPy/SciPy Harris corner detection, intensity-patch descriptors, and brute-force feature matching — enabling a complete visual odometry pipeline without OpenCV or any additional dependencies.
from sensor_transposition.feature_detection import (
detect_harris_corners,
compute_patch_descriptor,
match_features,
)
from sensor_transposition.visual_odometry import (
estimate_essential_matrix,
recover_pose_from_essential,
)
import numpy as np
# Camera intrinsics
K = np.array([[718.856, 0, 607.193],
[0, 718.856, 185.216],
[0, 0, 1.0]])
# 1. Detect Harris corners in both frames
kp1 = detect_harris_corners(gray1, threshold=0.01, max_corners=500)
kp2 = detect_harris_corners(gray2, threshold=0.01, max_corners=500)
# 2. Compute normalised intensity-patch descriptors
desc1 = compute_patch_descriptor(gray1, kp1, patch_size=11)
desc2 = compute_patch_descriptor(gray2, kp2, patch_size=11)
# 3. Match with Lowe's ratio test
matches = match_features(desc1, desc2, ratio_threshold=0.75)
pts1 = kp1[matches[:, 0], ::-1].astype(float) # (row, col) → (u, v)
pts2 = kp2[matches[:, 1], ::-1].astype(float)
# 4. Estimate essential matrix and recover relative pose
if len(pts1) >= 8:
result = estimate_essential_matrix(pts1, pts2, K)
E, mask = result.essential_matrix, result.inlier_mask
R, t = recover_pose_from_essential(E, pts1[mask], pts2[mask], K)
print("Rotation:\n", R)
print("Translation (unit):", t)
| Function | Description |
|---|---|
| detect_harris_corners(image, ...) | Harris corner detection with NMS; returns (M, 2) (row, col) array |
| compute_patch_descriptor(image, keypoints, ...) | Normalised flat intensity-patch descriptors |
| match_features(desc1, desc2, ratio_threshold=0.75) | Brute-force L2 + Lowe's ratio test; returns (K, 2) match index array |
For production-quality ORB or SIFT features, install the optional opencv
extra and use cv2 directly in your own code:
pip install ".[opencv]"
Stereo Camera
Stereo rectification, dense block-matching disparity, and stereo triangulation for metric 3-D depth recovery. Unlike monocular visual odometry (which recovers pose only up to a scale factor), stereo VO recovers the metric scale directly from the known baseline.
from sensor_transposition.stereo import (
stereo_rectify,
compute_disparity_sgbm,
triangulate_stereo,
)
import numpy as np
# Camera calibration (from stereo calibration procedure)
K = np.array([[718.856, 0, 607.193],
[0, 718.856, 185.216],
[0, 0, 1.0]])
baseline = 0.537 # metres (KITTI stereo baseline)
R_stereo = np.eye(3)
t_stereo = np.array([-baseline, 0.0, 0.0])
# 1. Compute rectification transforms
R1, R2, P1, P2 = stereo_rectify(K, (), K, (), R_stereo, t_stereo,  # () = no distortion coeffs
                                image_size=(375, 1242))
# 2. (Apply R1/R2 with your preferred image remapping, e.g. cv2.remap)
# 3. Compute dense disparity from the rectified stereo pair
disp = compute_disparity_sgbm(img_left_rect, img_right_rect,
block_size=11, num_disparities=64)
# 4a. Recover 3-D point cloud from the disparity map
pts3d = triangulate_stereo(K=K, baseline=baseline, disp=disp)
print(f"Reconstructed {len(pts3d)} 3-D points")
# 4b. Or use matched pixel pairs from sparse feature matching
pts3d_sparse = triangulate_stereo(pts_left, pts_right, K=K, baseline=baseline)
| Function | Description |
|---|---|
| stereo_rectify(K1, D1, K2, D2, R, t, image_size) | Bouguet rectification → (R1, R2, P1, P2) |
| compute_disparity_sgbm(img_left, img_right, ...) | Block-matching (SAD) disparity map in pure NumPy |
| triangulate_stereo(pts_left, pts_right, K=K, baseline=b) | Pixel pairs → metric XYZ |
| triangulate_stereo(K=K, baseline=b, disp=disp) | Disparity map → metric point cloud |
Wheel Odometry
Dead-reckoning pose estimation for differential-drive and Ackermann vehicles.
from sensor_transposition.wheel_odometry import (
DifferentialDriveOdometer,
AckermannOdometer,
)
# Differential drive (encoder ticks)
odom = DifferentialDriveOdometer(wheel_base=0.54, wheel_radius=0.1)
result = odom.integrate(timestamps, left_ticks, right_ticks,
ticks_per_revolution=360)
print(result.x, result.y, result.theta) # SE(2) pose
# Ackermann / bicycle model
odom = AckermannOdometer(wheel_base=2.7)
result = odom.integrate(timestamps, speeds, steering_angles)
print(result.x, result.y, result.theta)
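Under the hood the differential-drive update is a midpoint integration; a self-contained sketch of one step (illustrative, not the library's internal code):
import numpy as np
def diff_drive_step(x, y, theta, d_left, d_right, wheel_base):
    # d_left / d_right: per-wheel travel distances from encoder ticks
    d = 0.5 * (d_left + d_right)               # distance of the axle midpoint
    d_theta = (d_right - d_left) / wheel_base  # heading change over the step
    theta_mid = theta + 0.5 * d_theta          # evaluate heading at the midpoint
    return (x + d * np.cos(theta_mid),
            y + d * np.sin(theta_mid),
            theta + d_theta)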
Loop Closure
Place recognition using Scan Context and M2DP LiDAR descriptors, plus a HOG-like image descriptor for visual loop closure.
LiDAR loop closure (Scan Context / M2DP)
from sensor_transposition.loop_closure import (
ScanContextDatabase,
compute_m2dp,
)
db = ScanContextDatabase(num_rings=20, num_sectors=60, max_range=80.0)
for frame_id, cloud in enumerate(lidar_frames):
# compute_descriptor() uses the database's own parameters — no duplication:
desc = db.compute_descriptor(cloud)
candidates = db.query(desc, top_k=1)
db.add(desc, frame_id=frame_id)
if candidates and candidates[0].distance < 0.15:
loop_from = candidates[0].match_frame_id
print(f"Loop closure: {frame_id} ↔ {loop_from}, "
f"d={candidates[0].distance:.3f}")
# M2DP as a viewpoint-insensitive alternative
from sensor_transposition.loop_closure import m2dp_distance
desc_a = compute_m2dp(cloud_a)
desc_b = compute_m2dp(cloud_b)
print("M2DP distance:", m2dp_distance(desc_a, desc_b))
Visual loop closure (camera images)
For camera-heavy platforms or environments where the LiDAR loop-closure
database is sparse, use the HOG-like image descriptor and
ImageLoopClosureDatabase:
from sensor_transposition.loop_closure import (
compute_image_descriptor,
image_descriptor_distance,
ImageLoopClosureDatabase,
)
# Standalone descriptor comparison
desc_a = compute_image_descriptor(gray_frame_a, grid=(4, 4), bins=8)
desc_b = compute_image_descriptor(gray_frame_b, grid=(4, 4), bins=8)
dist = image_descriptor_distance(desc_a, desc_b)
print(f"Visual similarity: {dist:.4f} (0=identical, 1=maximally different)")
# Incremental database with exclusion window (mirrors ScanContextDatabase API)
db = ImageLoopClosureDatabase(grid=(4, 4), bins=8, exclusion_window=20)
for frame_id, gray_img in enumerate(camera_frames):
desc = db.compute_descriptor(gray_img)
candidates = db.query(desc, top_k=1)
db.add(desc, frame_id=frame_id)
if candidates and candidates[0].distance < 0.20:
loop_from = candidates[0].match_frame_id
print(f"Visual loop closure: {frame_id} ↔ {loop_from}, "
f"d={candidates[0].distance:.3f}")
Pose Graph
Pose graph construction and Gauss-Newton optimisation for graph-SLAM.
from sensor_transposition.pose_graph import PoseGraph, optimize_pose_graph
import numpy as np
graph = PoseGraph()
# Add keyframe nodes
for i, (t, q) in enumerate(zip(translations, quaternions)):
graph.add_node(i, translation=t, quaternion=q)
# Add odometry edges from ICP
for i in range(len(translations) - 1):
result = icp_align(clouds[i], clouds[i + 1])
graph.add_edge(from_id=i, to_id=i + 1,
transform=result.transform,
information=np.eye(6) * 100.0)
# Add a loop-closure edge
graph.add_edge(from_id=loop_from, to_id=loop_to,
transform=loop_transform,
information=np.eye(6) * 50.0)
opt = optimize_pose_graph(graph)
if opt.success:
for node_id, pose in opt.optimized_poses.items():
print(node_id, pose["translation"])
# Optional progress callback (iteration number + current cost):
opt = optimize_pose_graph(
graph,
callback=lambda i, c: print(f"iter {i}: cost={c:.4f}"),
)
IMU Factor edges
ImuFactor edges encode tightly-coupled IMU constraints by wrapping a
PreintegrationResult (ΔR, Δv, Δp) from sensor_transposition.imu.preintegration
as a 6-DOF relative-pose constraint alongside ICP odometry edges.
This improves robustness when LiDAR scans are sparse or degenerate.
from sensor_transposition.imu.preintegration import ImuPreintegrator
from sensor_transposition.pose_graph import PoseGraph, optimize_pose_graph
import numpy as np
graph = PoseGraph()
graph.add_node(0, translation=[0.0, 0.0, 0.0])
graph.add_node(1, translation=[0.5, 0.0, 0.0])
# ICP odometry edge.
graph.add_edge(from_id=0, to_id=1,
transform=icp_result.transform,
information=np.eye(6) * 100.0)
# IMU pre-integration factor edge.
integrator = ImuPreintegrator()
preint = integrator.integrate(timestamps, accel, gyro)
factor = graph.add_imu_factor(
from_id=0,
to_id=1,
preint_result=preint,
information=np.eye(6) * 10.0, # lower confidence than ICP
)
# All ImuFactor objects are accessible via graph.imu_factors.
print(len(graph.imu_factors)) # 1
opt = optimize_pose_graph(graph)
Sliding-Window Smoother
Fixed-lag online SLAM smoother that keeps only the most recent window_size
keyframes in the active optimisation window.
from sensor_transposition.sliding_window import SlidingWindowSmoother
import numpy as np
smoother = SlidingWindowSmoother(window_size=5)
for i, (trans, rel_tf) in enumerate(keyframe_stream):
smoother.add_node(i, translation=trans)
if i > 0:
smoother.add_edge(i - 1, i, transform=rel_tf,
information=np.eye(6) * 200.0)
result = smoother.optimize()
if result.success:
print(f"Node {i}: {result.optimized_poses[i]['translation']}")
# Progress callback is supported:
result = smoother.optimize(callback=lambda i, c: print(f"iter {i}: cost={c:.4f}"))
Submap Manager
Keyframe selection and submap division for large-scale or long-duration SLAM.
from sensor_transposition.submap_manager import KeyframeSelector, SubmapManager
import numpy as np
selector = KeyframeSelector(translation_threshold=1.0,
rotation_threshold_deg=10.0)
manager = SubmapManager(max_keyframes_per_submap=20, overlap=2)
for frame_id, (pose, scan) in enumerate(zip(frame_poses, lidar_scans)):
if selector.check_and_accept(pose.transform):
manager.add_keyframe(frame_id, pose.transform, scan)
submaps = manager.get_all_submaps()
print(f"Created {len(submaps)} submaps.")
Occupancy Grid
2-D probabilistic occupancy grid built from LiDAR scans using log-odds ray-casting. Two implementations are provided:
- OccupancyGrid – dense (height × width) NumPy array; constant memory allocation; best for small-to-medium environments.
- SparseOccupancyGrid – identical API but stores only observed cells in a Python dict; no up-front memory allocation; best for large outdoor environments where most of the grid is never visited. For a 1 km × 1 km grid at 1 m resolution, a 1 % observation density yields a ~14× memory reduction.
from sensor_transposition.occupancy_grid import OccupancyGrid, SparseOccupancyGrid
import numpy as np
# Dense variant (small environments):
grid = OccupancyGrid(
resolution=0.10, # 10 cm/cell
width=200, height=200, # 20 m × 20 m
origin=np.array([-10.0, -10.0]), # world coords of cell (0, 0)
)
# Sparse variant (large outdoor environments — drop-in replacement):
grid = SparseOccupancyGrid(
resolution=0.10,
width=10000, height=10000, # 1 km × 1 km — no memory reserved
origin=np.array([-500.0, -500.0]),
)
for frame_pose, lidar_scan in zip(trajectory, scans):
sensor_origin = frame_pose.transform[:3, 3]
grid.insert_scan(lidar_scan, frame_pose.transform, sensor_origin)
occupancy = grid.get_grid() # (height, width) int8 — ROS convention
probs = grid.to_probability() # (height, width) float64 in [0, 1]
# ROS nav_msgs/OccupancyGrid compatibility:
# to_ros_int8() is an explicit alias for get_grid() that maps cell states to
# the ROS nav_msgs/OccupancyGrid data convention:
# -1 → unknown (cell not yet observed)
# 0 → free
# 100 → occupied
# Flatten with .ravel() to assign to the message's `data` field:
ros_grid = grid.to_ros_int8() # (height, width) int8
ros_msg_data = ros_grid.ravel() # flat int8 array for nav_msgs/OccupancyGrid.data
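Filling the standard ROS 2 message from these arrays (a sketch; the nav_msgs/OccupancyGrid fields are the standard ROS ones, publisher setup omitted):
from nav_msgs.msg import OccupancyGrid as OccupancyGridMsg
msg = OccupancyGridMsg()
msg.info.resolution = 0.10
msg.info.height = int(ros_grid.shape[0])
msg.info.width = int(ros_grid.shape[1])
msg.data = ros_grid.ravel().tolist()  # row-major int8 cell values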
Voxel Map (TSDF)
Truncated Signed-Distance Function (TSDF) volumetric map for dense 3-D reconstruction. Two implementations are provided:
- TSDFVolume – dense (nx × ny × nz) NumPy arrays; constant memory allocation; best for bounded indoor scenes.
- SparseTSDFVolume – identical API but stores only voxels that have received at least one observation in a Python dict; no up-front memory; best for large outdoor volumes where the vast majority of voxels are never near a surface. For a 200 × 200 × 50 voxel volume with 5 000 surface voxels updated, memory drops from ~32 MB to ~500 KB — a ~64× reduction.
import numpy as np
from sensor_transposition.voxel_map import TSDFVolume, SparseTSDFVolume
# Dense variant (bounded indoor scenes):
volume = TSDFVolume(
voxel_size=0.10,
origin=np.array([-10.0, -10.0, -0.5]),
dims=(200, 200, 50), # 200×200×50 voxels
)
# Sparse variant (large outdoor volumes — drop-in replacement):
volume = SparseTSDFVolume(
voxel_size=0.10,
origin=np.array([-500.0, -500.0, -2.0]),
dims=(10000, 10000, 50), # no memory reserved up front
)
for frame_pose, lidar_scan in zip(trajectory, scans):
volume.integrate(lidar_scan, frame_pose.transform)
surface_pts = volume.extract_surface_points(threshold=0.1)
tsdf_array = volume.get_tsdf() # (nx, ny, nz) float64; NaN = unseen
weight_array = volume.get_weights() # (nx, ny, nz) float64
Point-Cloud Map
Accumulated coloured point-cloud map assembled from successive LiDAR scans,
with voxel-grid downsampling, PCD / PLY file I/O, and dynamic-object
filtering utilities. See docs/point_cloud_map.md
for the full guide.
from sensor_transposition.point_cloud_map import PointCloudMap
pcd_map = PointCloudMap()
for frame_pose, lidar_scan in zip(trajectory, scans):
pcd_map.add_scan(lidar_scan, frame_pose.transform)
# Downsample to 10-cm voxels
pcd_map.voxel_downsample(voxel_size=0.10)
world_points = pcd_map.get_points() # (N, 3) float64
world_colors = pcd_map.get_colors() # (N, 3) uint8, or None
# Save / load
pcd_map.save_pcd("map.pcd")
pcd_map.save_ply("map.ply")
loaded = PointCloudMap.from_pcd("map.pcd")
Dynamic Object Filtering
Moving objects leave "ghost" trails in an accumulated map. Two filters are provided to remove dynamic points before they enter the map:
from sensor_transposition.point_cloud_map import (
filter_dynamic_points,
consistency_filter,
)
# Remove points whose Doppler speed exceeds 0.5 m/s (requires co-registered radar).
static_mask = filter_dynamic_points(lidar_cloud, doppler_velocities, doppler_threshold=0.5)
# Remove points with no support within 0.5 m in a reference scan.
static_mask = consistency_filter(current_scan, reference_scan, threshold_m=0.5)
pcd_map.add_scan(lidar_cloud[static_mask], ego_to_world)
Visualisation
BEV rendering, trajectory overlay, LiDAR-on-image overlay, and export helpers
for Open3D and RViz. All renderers return plain NumPy uint8 RGB arrays.
from sensor_transposition.visualisation import (
render_birdseye_view,
render_trajectory_birdseye,
overlay_lidar_on_image,
export_point_cloud_open3d,
colour_by_height,
color_by_height, # American-spelling alias
SensorFrameVisualiser,
)
# Bird's-eye view of the accumulated map
bev = render_birdseye_view(map_points, resolution=0.10)
# Colour a point cloud by height (both spellings accepted)
colours = colour_by_height(points[:, 2]) # (N, 3) uint8
colours = color_by_height(points[:, 2]) # identical result
# Overlay depth-coded LiDAR on a camera image
from sensor_transposition.lidar_camera import project_lidar_to_image
pixel_coords, valid = project_lidar_to_image(lidar_scan, T, K, W, H)
depth = lidar_scan[:, 0]
overlay = overlay_lidar_on_image(camera_image, pixel_coords, valid, depth)
# Per-frame container — populate field-by-field:
vis = SensorFrameVisualiser()
vis.set_point_cloud(lidar_scan)
vis.set_camera_image(camera_image)
bev_frame = vis.render_birdseye(resolution=0.10)
# Or construct from a bag message payload in one step:
vis = SensorFrameVisualiser.from_dict({
"point_cloud": lidar_scan,
"camera_image": camera_image,
"trajectory": trajectory_xy,
"radar_scan": radar_points,
})
bev_frame = vis.render_birdseye(resolution=0.10)
# Export for Open3D
o3d_dict = export_point_cloud_open3d(map_points)
Bag Recorder / Player
Lightweight multi-topic binary bag format (.sbag) for recording and replaying
multi-sensor data. No external dependencies — pure Python standard library.
from sensor_transposition.rosbag import BagWriter, BagReader
import numpy as np
# Record — numpy arrays are automatically converted (no .tolist() needed):
with BagWriter("session.sbag") as bag:
bag.write("/lidar/points", timestamp, {"xyz": lidar_pts}) # numpy OK
bag.write("/imu/data", timestamp, {"accel": accel_arr, "gyro": gyro_arr})
# Replay
with BagReader("session.sbag") as bag:
print("Topics:", bag.topics)
for msg in bag.read_messages(topics=["/lidar/points"]):
xyz = msg.data["xyz"] # plain Python list after round-trip
The .sbag extension stands for sensor bag. Each message record stores a
UTF-8 JSON payload, so files can be inspected with any text editor after
stripping the binary header. See docs/rosbag.md for the full format
specification.
Converting to MCAP (ROS 2 compatible)
Use sbag_to_rosbag to convert a .sbag file to
MCAP format, which can be opened with ros2 bag,
the MCAP CLI, or Foxglove Studio (requires pip install ".[mcap]"):
from sensor_transposition.rosbag import sbag_to_rosbag
sbag_to_rosbag("session.sbag", "session.mcap")
# Inspect with: ros2 bag info session.mcap
Camera–LiDAR Extrinsic Calibration
Target-based extrinsic calibration using plane correspondences observed from both the camera and the LiDAR.
from sensor_transposition.calibration import (
fit_plane,
ransac_plane,
calibrate_lidar_camera,
)
# Also importable directly from sensor_transposition:
from sensor_transposition import fit_plane, ransac_plane, calibrate_lidar_camera
import numpy as np
# For each pose of a planar calibration target:
lidar_normals = []
lidar_distances = []
camera_normals = []
camera_distances = []
# LiDAR side — fit a plane to the board region
normal, dist, inliers = ransac_plane(board_lidar_pts, distance_threshold=0.02)
lidar_normals.append(normal)
lidar_distances.append(dist)
# Camera side — use a PnP solver (e.g. cv2.solvePnP) to obtain the
# board normal and signed distance in the camera frame, then append.
# Solve for the 4×4 LiDAR → camera transform (need ≥ 3 observations)
T_lidar_to_cam = calibrate_lidar_camera(
np.array(lidar_normals), np.array(lidar_distances),
np.array(camera_normals), np.array(camera_distances),
)
See docs/camera_lidar_extrinsic_calibration.md
for a complete worked example including the camera-side PnP procedure.
SLAM Session (Pipeline Orchestration)
SLAMSession wires together the core SLAM modules — ICP scan matching,
Scan Context loop closure, pose-graph optimisation, and point-cloud map
accumulation — into a single object with sensible defaults.
from sensor_transposition.rosbag import BagReader
from sensor_transposition.slam_session import SLAMSession
# Run the pipeline
with BagReader("session.sbag") as bag:
session = SLAMSession(
icp_max_iterations=50,
loop_closure_threshold=0.15,
)
session.run(bag, lidar_topic="/lidar/points")
# Optimise pose graph and rebuild map with corrected poses
session.optimize()
# Save outputs
session.point_cloud_map.voxel_downsample(voxel_size=0.10)
session.point_cloud_map.save_pcd("map.pcd")
session.trajectory.to_csv("trajectory.csv")
Scan-to-local-submap odometry
By default each incoming scan is matched against the immediately preceding
scan. Pass use_local_map=True to match against a sliding-window
accumulation of the last N keyframes (a LocalMap) instead. This
provides more ICP constraints simultaneously and significantly reduces
frame-to-frame odometry drift before it enters the pose graph.
from sensor_transposition.slam_session import SLAMSession, LocalMap
with BagReader("session.sbag") as bag:
session = SLAMSession()
session.run(
bag,
use_local_map=True,
local_map_size=10, # keep last 10 keyframes
local_map_voxel_size=0.2, # downsample each keyframe to 20-cm voxels
)
You can also use LocalMap directly outside of SLAMSession:
local_map = LocalMap(window_size=10, voxel_size=0.2)
for scan in lidar_scans:
local_map.add_keyframe(scan)
submap = local_map.get_submap() # (M, 3) float array
result = icp_align(scan, submap)
Tightly-coupled IMU integration
Pass imu_topic to run() to automatically accumulate IMU readings
between consecutive LiDAR keyframes and add an
ImuFactor edge (from sensor_transposition.pose_graph) to the pose graph for
each inter-keyframe window. The factor encodes the pre-integrated (ΔR, Δv, Δp)
increment as a 6-DOF relative-pose constraint alongside the ICP odometry edge,
providing tighter coupling between IMU and LiDAR in the pose-graph back-end.
from sensor_transposition.slam_session import SLAMSession
from sensor_transposition.rosbag import BagReader
with BagReader("session.sbag") as bag:
session = SLAMSession()
session.run(
bag,
lidar_topic="/lidar/points",
imu_topic="/imu/data", # activates IMU factor integration
imu_information=10.0, # confidence scalar for ImuFactor edges
)
session.optimize()
IMU messages are expected to carry "accel" and "gyro" keys in their
payload dict. Use imu_accel_key and imu_gyro_key to customise the
key names if your bag uses different field names. A factor is only added
when at least two IMU samples arrive between consecutive LiDAR keyframes.
The last sample of each window is carried forward to ensure continuity across
keyframe boundaries.
Register callbacks for other sensor topics:
session = SLAMSession()
@session.on_topic("/imu/data")
def handle_imu(msg):
print(f"IMU @ {msg.timestamp:.3f}s: accel={msg.data['accel']}")
with BagReader("session.sbag") as bag:
session.run(bag)
All internal components are accessible as properties for advanced use:
session.pose_graph, session.loop_db, session.trajectory,
session.point_cloud_map.
Localization Against a Pre-Built Map
SLAMSession supports a localization-only mode where a previously built map is loaded from a .pcd or .ply file and each incoming LiDAR scan is matched against it via ICP. In this mode the map is never modified; pose-graph construction and loop-closure detection are skipped. The resulting ego poses are accumulated in SLAMSession.trajectory.
Loading a map and running localization
from sensor_transposition.rosbag import BagReader
from sensor_transposition.slam_session import SLAMSession
# Load a previously saved PCD map to activate localization-only mode.
session = SLAMSession(icp_max_iterations=30)
session.load_map("map.pcd") # also works with .ply
with BagReader("live.sbag") as bag:
    session.run(bag, lidar_topic="/lidar/points")
# Trajectory contains ego poses in the pre-built map frame.
session.trajectory.to_csv("localization_trajectory.csv")
LocalizationSession convenience subclass
LocalizationSession combines the load_map call with the constructor so you never forget to activate localization mode:
from sensor_transposition.rosbag import BagReader
from sensor_transposition.slam_session import LocalizationSession
with BagReader("live.sbag") as bag:
    session = LocalizationSession("map.pcd", icp_max_iterations=30)
    session.run(bag, lidar_topic="/lidar/points")
session.trajectory.to_csv("localization_trajectory.csv")
Workflow summary
- Build a map with a SLAMSession mapping run and save it:
    session.optimize()
    session.point_cloud_map.voxel_downsample(voxel_size=0.10)
    session.point_cloud_map.save_pcd("map.pcd")
- Localize against the saved map on subsequent runs using LocalizationSession (or SLAMSession.load_map).

The SLAMSession.optimize() method is a no-op in localization-only mode because no pose graph is built.
Multi-Session SLAM and Map Merging
SLAMSession supports two complementary workflows that go beyond a single
mapping run:
- Saving and reloading a completed session so that its pose graph, trajectory, point-cloud map, and scan-context database can be inspected or extended in a future process.
- Merging two independently built sessions when a loop-closure between them is detected (e.g. from two survey robots or two separate survey runs of overlapping areas).
Saving and loading a session
After a mapping run, call SLAMSession.save(path) to write a .slam ZIP
archive containing all session state. The archive holds the following entries:
| Entry | Contents |
|---|---|
| metadata.json | Session construction parameters |
| graph.json | Pose-graph nodes and edges |
| trajectory.json | Ego-pose trajectory |
| scan_context_db.npz | Scan Context descriptor matrices and frame IDs |
| scans.npz | Raw sensor-frame scans (for map rebuilding after re-optimisation) |
| points.pcd | Accumulated world-frame point-cloud map |
from sensor_transposition.slam_session import SLAMSession
# Build the map.
session = SLAMSession()
# ... run session with a BagReader ...
session.optimize()
# Persist to disk.
session.save("run1.slam")
# Load back in a later process.
restored = SLAMSession.load("run1.slam")
restored.point_cloud_map.save_pcd("map.pcd")
Merging two sessions
merge_sessions(session_a, session_b, loop_edge) creates a new
SLAMSession that contains the combined pose graph, trajectory, point-cloud
map, and scan-context database from both input sessions.
Node ID remapping: Nodes from session_a keep their original IDs. Nodes
from session_b are shifted by max(session_a node IDs) + 1 to avoid
collisions. The to_id of loop_edge must use session_b's original
node numbering — the offset is applied automatically.
import numpy as np
from sensor_transposition.slam_session import SLAMSession, merge_sessions
from sensor_transposition.pose_graph import PoseGraphEdge
session_a = SLAMSession.load("area_a.slam")
session_b = SLAMSession.load("area_b.slam")
# A loop closure was detected between node 42 in session_a and node 0 in
# session_b. Provide the measured relative SE(3) transform.
loop_transform = np.eye(4)
loop_transform[:3, 3] = [2.0, 0.5, 0.0] # approximate translation
loop_edge = PoseGraphEdge(
    from_id=42, to_id=0,
    transform=loop_transform,
    information=np.eye(6) * 50.0,
)
merged = merge_sessions(session_a, session_b, loop_edge)
merged.optimize() # joint optimisation over both maps
merged.point_cloud_map.save_pcd("merged.pcd")
merged.save("merged_session.slam")
Typical multi-session workflow
- Independent mapping runs — each robot or survey run produces its own .slam file via SLAMSession.save().
- Loop-closure detection — use ScanContextDatabase or ImageLoopClosureDatabase to find matching frames across sessions. Verify the candidate geometrically with icp_align to obtain the inter-session transform (see the sketch after this list).
- Merge — call merge_sessions() with the verified loop edge.
- Joint optimisation — call merged.optimize() to jointly optimise all node poses across both sessions using the loop edge as a global constraint.
- Export — save the merged map with merged.point_cloud_map.save_pcd("merged.pcd") or persist the entire merged session with merged.save("merged_session.slam") for future runs.
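The geometric-verification step can replace the hand-written loop_transform in the merge example above. A minimal sketch, assuming the two candidate scans are already in hand; scan_a / scan_b are placeholders, and the ICP result's .transformation attribute is an assumed field name:
import numpy as np
from sensor_transposition.pose_graph import PoseGraphEdge

# scan_a: the scan stored at node 42 of session_a; scan_b: the scan stored at
# node 0 of session_b (both (N, 3) arrays). icp_align is the scan-matching
# entry point used throughout this README.
result = icp_align(scan_b, scan_a)        # align B's frame onto A's
loop_edge = PoseGraphEdge(
    from_id=42,                           # session_a numbering
    to_id=0,                              # session_b's original numbering
    transform=result.transformation,      # assumed result field name
    information=np.eye(6) * 50.0,
)
# Then proceed with merge_sessions(session_a, session_b, loop_edge) as above.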
Error Handling
sensor_transposition defines a small hierarchy of library-specific exceptions
in sensor_transposition.exceptions. Every exception inherits from the base
class SensorTranspositionError so a single except clause can catch any
library error. Each subclass also inherits from the appropriate standard
Python exception so existing except KeyError / RuntimeError / ValueError
handlers continue to work without change.
| Exception | Inherits from | Raised when |
|---|---|---|
| SensorTranspositionError | Exception | Base class for all library errors |
| SensorNotFoundError | SensorTranspositionError, KeyError | SensorCollection.get_sensor() — sensor name not found |
| BagError | SensorTranspositionError, RuntimeError | BagWriter.write() / BagReader.read_messages() — writer or reader is closed |
| CalibrationError | SensorTranspositionError, ValueError | Calibration operations that fail with invalid input |
from sensor_transposition.exceptions import (
    SensorTranspositionError,
    SensorNotFoundError,
    BagError,
)
# Catch a specific library exception:
try:
    sensor = collection.get_sensor("unknown_lidar")
except SensorNotFoundError as exc:
    print(f"Sensor not found: {exc}")

# Or catch all library exceptions at once:
try:
    bag.write("/lidar/points", timestamp, payload)
except SensorTranspositionError as exc:
    print(f"sensor_transposition error: {exc}")

# Existing KeyError / RuntimeError handlers also work unchanged:
try:
    sensor = collection.get_sensor("front_camera")
except KeyError:
    pass  # SensorNotFoundError is a KeyError
All four classes are exported from the top-level package:
import sensor_transposition as st
st.SensorNotFoundError
st.BagError
st.CalibrationError
st.SensorTranspositionError
Configuration Example
See examples/sensor_collection.yaml for a full vehicle rig including a front camera, front/rear LiDARs, a front radar, a GNSS antenna, and an IMU.
sensors:
  front_lidar:
    type: lidar
    coordinate_system: FLU
    extrinsics:
      translation: [1.84, 0.00, 1.91]
      rotation:
        quaternion: [1.0, 0.0, 0.0, 0.0]
    time_offset_sec: 0.0  # LiDAR is the reference clock
  front_camera:
    type: camera
    coordinate_system: RDF
    extrinsics:
      translation: [1.80, 0.00, 1.45]
      rotation:
        quaternion: [0.5, -0.5, 0.5, -0.5]
    time_offset_sec: 0.033  # camera triggers ~33 ms after reference
    intrinsics:
      fx: 1266.417
      fy: 1266.417
      cx: 816.0
      cy: 612.0
      width: 1632
      height: 1224
      distortion_coefficients: [-0.05, 0.08, 0.0, 0.0, -0.03]
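To consume such a file in code, something like the following should work. The from_yaml constructor name and the top-level SensorCollection export are assumptions to verify against the SensorCollection docs; get_sensor() is the accessor shown under Error Handling:
import sensor_transposition as st

# Hypothetical loader name; get_sensor() is documented above.
collection = st.SensorCollection.from_yaml("examples/sensor_collection.yaml")
camera = collection.get_sensor("front_camera")
print(camera.extrinsics.translation)   # attribute layout assumed from the YAML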
Quick-Start: End-to-End SLAM Pipeline
examples/slam_pipeline.py demonstrates a
complete offline SLAM pipeline using synthetic (randomly generated) data,
so it runs immediately without any real sensor hardware:
python examples/slam_pipeline.py
The script walks through each stage of the pipeline:
- Generate synthetic LiDAR scans and IMU samples.
- Record data to a .sbag file with BagWriter.
- Replay the bag with BagReader and extract LiDAR scans.
- ICP odometry – align consecutive scans with icp_align.
- Loop closure – detect revisits with ScanContextDatabase.
- Pose graph optimisation – correct accumulated drift with optimize_pose_graph.
- Map accumulation – build a PointCloudMap from optimised poses and save it as a .pcd file.
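The one detail the list glosses over is how the per-pair ICP results become global poses: each relative transform is chained onto the previous absolute pose. A minimal sketch, assuming icp_align's result exposes the aligning 4×4 matrix as a .transformation attribute (an assumed field name):
import numpy as np

poses = [np.eye(4)]                    # the first keyframe defines the world frame
for prev, curr in zip(scans, scans[1:]):
    result = icp_align(curr, prev)     # relative transform aligning curr onto prev
    poses.append(poses[-1] @ result.transformation)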
ROS Examples
Ready-to-use driver configurations for Velodyne and Ouster LiDARs are provided in the ros_examples/ directory.
ros_examples/
├── ros/
│ ├── velodyne.launch # ROS 1 – VLP-16 / VLP-32C / HDL-32E / HDL-64E
│ └── ouster.launch # ROS 1 – OS0 / OS1 / OS2
└── ros2/
├── velodyne_params.yaml # ROS 2 – velodyne_driver + velodyne_pointcloud
└── ouster_params.yaml # ROS 2 – ouster_ros (ROS 2 branch)
ROS 1
# Velodyne VLP-16 (live sensor)
roslaunch ros_examples/ros/velodyne.launch device_ip:=192.168.1.201 model:=VLP16
# Ouster OS1
roslaunch ros_examples/ros/ouster.launch sensor_hostname:=os-<serial>.local
ROS 2
# Velodyne VLP-16
ros2 launch velodyne velodyne-all-nodes-VLP16-launch.py \
params_file:=ros_examples/ros2/velodyne_params.yaml
# Ouster OS1
ros2 launch ouster_ros driver.launch.py \
params_file:=ros_examples/ros2/ouster_params.yaml
Both ROS 2 YAML files use the ros__parameters namespace convention and can be passed directly via params_file:=.
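For reference, the ros__parameters layout in those files follows the standard ROS 2 convention. An abridged, illustrative excerpt (the node name and values must match your driver setup):
velodyne_driver_node:
  ros__parameters:
    device_ip: 192.168.1.201
    model: VLP16
    rpm: 600.0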