# Fast Data Pipeline
A Python package for processing MF4 (ASAM Measurement Data Format, version 4) files and ROS2 bag files, converting them to HDF5 format, and merging synchronized data streams.
## Features
- MF4 Ingestion: Convert MF4 files to HDF5 format based on YAML layout specifications
- ROS2 Bag Ingestion: Extract and convert ROS2 bag data to HDF5 format
- Data Merging: Synchronize and merge multiple HDF5 data sources based on timestamps
- Flexible Configuration: YAML-based layout specifications for customizable data mapping
- State Management: Track processed files to avoid reprocessing
- Validation: Built-in data validation and completeness checks
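The timestamp-based merging can be pictured with a toy example: for each sample in the reference stream, take the most recent sample from the other stream. A minimal sketch of the idea using `pandas.merge_asof`, with illustrative data only (not the package's actual implementation, which operates on HDF5 files):

```python
import pandas as pd

# Two streams sampled at different rates (illustrative values)
rec = pd.DataFrame({"timestamp_s": [0.00, 0.10, 0.20, 0.30],
                    "velocity_x_mps": [0.0, 1.2, 2.5, 3.1]})
rosbag = pd.DataFrame({"timestamp_s": [0.02, 0.11, 0.24],
                       "detections": [1, 2, 4]})

# For each rec sample, take the latest rosbag sample at or before it;
# the first rec sample has no earlier rosbag sample and stays NaN.
merged = pd.merge_asof(rec, rosbag, on="timestamp_s", direction="backward")
```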
## Installation

```bash
pip install fast_data_pipeline
```
## Quick Start

### MF4 to HDF5 Conversion
```python
from data_pipeline.ingestion.mf4_ingestor import MF4Ingester
from data_pipeline.common.state_manager import StateManager

# Initialize state manager
state_manager = StateManager(
    output_folder="/path/to/output",
    state_filename="mf4_processing_state.pkl"
)

# Create MF4 ingester
ingester = MF4Ingester(
    input_folder="/path/to/mf4/files",
    output_folder="/path/to/output",
    state_manager=state_manager,
    file_pattern="*.mf4",
    layout_yaml_path="path/to/layout_spec.yaml"
)

# Process files
ingester.run()
```
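The `StateManager` keeps the pipeline idempotent by remembering which files were already converted. A rough sketch of that idea as a pickle-backed set (a hypothetical `SimpleStateManager`, not the package's actual class):

```python
import pickle
from pathlib import Path

class SimpleStateManager:
    """Pickle-backed record of processed files. Hypothetical sketch of
    the state-tracking idea, not the package's StateManager."""

    def __init__(self, output_folder, state_filename):
        self.state_path = Path(output_folder) / state_filename
        if self.state_path.exists():
            # Resume from a previous run
            self.processed = pickle.loads(self.state_path.read_bytes())
        else:
            self.processed = set()

    def is_processed(self, filename):
        return filename in self.processed

    def mark_processed(self, filename):
        self.processed.add(filename)
        # Persist after every file so a crash loses at most one entry
        self.state_path.write_bytes(pickle.dumps(self.processed))
```

An ingester would call `is_processed()` before converting each file and `mark_processed()` after a successful write.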
### ROS2 Bag to HDF5 Conversion
```python
from data_pipeline.ingestion.rosbag_ingestor import RosbagIngester
from data_pipeline.common.state_manager import StateManager

# Initialize state manager
state_manager = StateManager(
    output_folder="/path/to/output",
    state_filename="rosbag_processing_state.pkl"
)

# Create Rosbag ingester
ingester = RosbagIngester(
    input_folder="/path/to/rosbag/files",
    output_folder="/path/to/output",
    state_manager=state_manager,
    file_pattern="*.db3",
    layout_yaml_path="path/to/layout_spec.yaml"
)

# Process files
ingester.run()
```
### Merging HDF5 Files
```python
from data_pipeline.processing.h5_merger import run as h5_merger_run

# Define metadata function (optional)
def add_metadata(h5file, rec_file, rosbag_file):
    h5file.attrs['source_rec'] = rec_file
    h5file.attrs['source_rosbag'] = rosbag_file
    return h5file

# Run merger
h5_merger_run(
    rec_folder="/path/to/mf4-h5",
    rosbag_folder="/path/to/rosbag-h5",
    output_folder="/path/to/merged",
    rec_timestamp_spec="hi5/vehicle_data/timestamp_s::value",
    rosbag_timestamp_spec="hi5/perception/camera/timestamp_s|hi5/perception/camera/timestamp_ns",
    rec_global_pattern="rec*.h5",
    rosbag_global_pattern="rosbag*.h5",
    logging_file_name="sync_log.pkl",
    metadata_func=add_metadata
)
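The timestamp specs above appear in two forms: `path::column`, naming a dataset plus a column within it, and `seconds_path|nanoseconds_path`, naming separate seconds and nanoseconds datasets that presumably get combined into one time axis. The exact semantics are internal to the package; the following is a hedged guess at the parsing and the seconds/nanoseconds combination, not documented behavior:

```python
# Assumed spec formats, inferred from the README examples only:
#   "path::column"        -> one dataset, one column
#   "path_s|path_ns"      -> seconds dataset + nanoseconds dataset
def parse_timestamp_spec(spec):
    if "|" in spec:
        s_path, ns_path = spec.split("|", 1)
        return {"kind": "split", "seconds": s_path, "nanoseconds": ns_path}
    path, _, column = spec.partition("::")
    return {"kind": "single", "path": path, "column": column or None}

def combine_s_ns(seconds, nanoseconds):
    # Fold integer seconds and nanoseconds into float seconds
    return [s + ns * 1e-9 for s, ns in zip(seconds, nanoseconds)]
```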
## Layout Specification
The package uses YAML files to define how data should be extracted and structured in HDF5 format.
### Example Layout YAML

```yaml
mapping:
  # MF4 source
  - source: mf4
    original_name: "Model Root/recorder/hi5/velocity_x_mps"
    target_name: /hi5/vehicle_data/velocity_x_mps
    units: "m/s"

  # ROS2 bag source
  - source: ros2bag
    original_name: /camera/image
    target_name: /hi5/perception/camera/image
    units: "-"
```
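A layout file like the one above is straightforward to consume. A small sketch (PyYAML assumed) that builds a rename map from source channel names to target HDF5 paths; the package's actual loader may differ:

```python
import yaml  # PyYAML

# Hypothetical standalone example: parse a layout spec and build a
# rename map from source channel names to target HDF5 paths.
layout_text = """
mapping:
  - source: mf4
    original_name: "Model Root/recorder/hi5/velocity_x_mps"
    target_name: /hi5/vehicle_data/velocity_x_mps
    units: "m/s"
  - source: ros2bag
    original_name: /camera/image
    target_name: /hi5/perception/camera/image
    units: "-"
"""

spec = yaml.safe_load(layout_text)

# Only the MF4 entries, keyed by their original channel name
renames = {entry["original_name"]: entry["target_name"]
           for entry in spec["mapping"] if entry["source"] == "mf4"}
```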
## Use with Apache Airflow
This package is designed to work seamlessly with Apache Airflow for automated data processing pipelines.
### Example with PythonVirtualenvOperator
```python
from airflow.operators.python import PythonVirtualenvOperator

def process_mf4_data(input_dir, output_dir, layout_path):
    from data_pipeline.ingestion.mf4_ingestor import MF4Ingester
    from data_pipeline.common.state_manager import StateManager
    import os

    os.makedirs(output_dir, exist_ok=True)

    state_manager = StateManager(
        output_folder=output_dir,
        state_filename="mf4_state.pkl"
    )
    ingester = MF4Ingester(
        input_folder=input_dir,
        output_folder=output_dir,
        state_manager=state_manager,
        file_pattern="*.mf4",
        layout_yaml_path=layout_path
    )
    ingester.run()

task = PythonVirtualenvOperator(
    task_id='process_mf4',
    python_callable=process_mf4_data,
    requirements=["fast_data_pipeline==0.1.2"],
    op_kwargs={
        'input_dir': '/data/mf4',
        'output_dir': '/data/mf4-h5',
        'layout_path': 'layout.yaml'
    }
)
```
## Requirements
- Python >= 3.8
- asammdf >= 8.6.0 (for MF4 processing)
- rosbags >= 0.10.0 (for ROS2 bag processing)
- tables >= 3.10.0 (for HDF5 operations)
- pandas >= 2.3.0
- numpy >= 2.0.0
- PyYAML >= 6.0.0
See pyproject.toml for complete dependency list.
## Project Structure
```
data_pipeline/
├── ingestion/            # Data ingestion modules
│   ├── mf4_ingestor.py
│   ├── rosbag_ingestor.py
│   └── base_ingestor.py
├── processing/           # Data processing modules
│   ├── h5_merger.py
│   └── metadata_functions.py
├── common/               # Common utilities
│   └── state_manager.py
└── validation/           # Data validation
```
## Development

### Installing for Development

```bash
git clone <repository-url>
cd data-pipeline
pip install -e ".[dev]"
```

### Running Tests

```bash
pytest
```

### Building the Package

```bash
python -m build
```
## License

MIT License - see LICENSE file for details.

## Author

Bora Pilav (bbpilav@gmail.com)
## Changelog

### 0.1.2 (2025-01-18)
- Added support for multi-dimensional array data in MF4 files
- Improved HDF5 structure for array channels
- Enhanced logging and error handling
- Added comprehensive dependencies in pyproject.toml
### 0.1.0
- Initial release
- MF4 to HDF5 conversion
- ROS2 bag to HDF5 conversion
- HDF5 merging functionality
## File details

### Source distribution: fast_data_pipeline-0.2.0.tar.gz

- Size: 41.2 kB
- Uploaded via: twine/6.1.0 on CPython/3.12.2
- Trusted Publishing: No

| Algorithm | Hash digest |
|---|---|
| SHA256 | `134f39086116f89ac7365af64c5b0d6c55b80ab2af77a5791514bda6f76c2771` |
| MD5 | `825ce49ea6cecdf1cc33d276ec56671b` |
| BLAKE2b-256 | `0261e57103d38e7d9791276b140a6aa58ecd2b42e6150fff783bb90691c0e4ce` |

### Built distribution: fast_data_pipeline-0.2.0-py3-none-any.whl

- Size: 43.9 kB
- Uploaded via: twine/6.1.0 on CPython/3.12.2
- Trusted Publishing: No

| Algorithm | Hash digest |
|---|---|
| SHA256 | `957fe26aa7ffd0d03aeba460471e17029a63f0880c4a88286ce412be0fc8d0b6` |
| MD5 | `ca0aec806e94e302cc0b4a4d58ee14ee` |
| BLAKE2b-256 | `a1a15558575bfb02534321eaf95f721ee01642c7b2eb96a4465a24f115596782` |