Fast Data Pipeline

A Python package for processing MF4 (Measurement Data Format) and ROS2 bag files, converting them to HDF5 format, and merging synchronized data streams.

Features

  • MF4 Ingestion: Convert MF4 files to HDF5 format based on YAML layout specifications
  • ROS2 Bag Ingestion: Extract and convert ROS2 bag data to HDF5 format
  • Data Merging: Synchronize and merge multiple HDF5 data sources based on timestamps
  • Flexible Configuration: YAML-based layout specifications for customizable data mapping
  • State Management: Track processed files to avoid reprocessing
  • Validation: Built-in data validation and completeness checks
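
The StateManager's internals are not documented here; purely as an illustration of the "track processed files to avoid reprocessing" idea, per-file state tracking can be sketched with a pickled set (all names in this sketch are hypothetical, not the package's API):

```python
import os
import pickle
import tempfile
from pathlib import Path

class SimpleState:
    """Toy processed-file tracker (NOT the package's actual StateManager)."""

    def __init__(self, state_path):
        self.state_path = Path(state_path)
        self.done = (
            pickle.loads(self.state_path.read_bytes())
            if self.state_path.exists()
            else set()
        )

    def is_processed(self, name):
        return name in self.done

    def mark_processed(self, name):
        self.done.add(name)
        # Persist after every file so an interrupted run can resume.
        self.state_path.write_bytes(pickle.dumps(self.done))

# Usage: files already recorded in the state file are skipped on later runs.
state = SimpleState(os.path.join(tempfile.mkdtemp(), "demo_state.pkl"))
for f in ["a.mf4", "b.mf4", "a.mf4"]:
    if not state.is_processed(f):
        state.mark_processed(f)
```

The real StateManager is constructed from an output_folder and state_filename, as shown in the Quick Start examples below.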

Installation

pip install fast_data_pipeline

Quick Start

MF4 to HDF5 Conversion

from data_pipeline.ingestion.mf4_ingestor import MF4Ingester
from data_pipeline.common.state_manager import StateManager

# Initialize state manager
state_manager = StateManager(
    output_folder="/path/to/output",
    state_filename="mf4_processing_state.pkl"
)

# Create MF4 ingester
ingester = MF4Ingester(
    input_folder="/path/to/mf4/files",
    output_folder="/path/to/output",
    state_manager=state_manager,
    file_pattern="*.mf4",
    layout_yaml_path="path/to/layout_spec.yaml"
)

# Process files
ingester.run()

ROS2 Bag to HDF5 Conversion

from data_pipeline.ingestion.rosbag_ingestor import RosbagIngester
from data_pipeline.common.state_manager import StateManager

# Initialize state manager
state_manager = StateManager(
    output_folder="/path/to/output",
    state_filename="rosbag_processing_state.pkl"
)

# Create Rosbag ingester
ingester = RosbagIngester(
    input_folder="/path/to/rosbag/files",
    output_folder="/path/to/output",
    state_manager=state_manager,
    file_pattern="*.db3",
    layout_yaml_path="path/to/layout_spec.yaml"
)

# Process files
ingester.run()

Merging HDF5 Files

from data_pipeline.processing.h5_merger import run as h5_merger_run

# Define metadata function (optional)
def add_metadata(h5file, rec_file, rosbag_file):
    h5file.attrs['source_rec'] = rec_file
    h5file.attrs['source_rosbag'] = rosbag_file
    return h5file

# Run merger
h5_merger_run(
    rec_folder="/path/to/mf4-h5",
    rosbag_folder="/path/to/rosbag-h5",
    output_folder="/path/to/merged",
    rec_timestamp_spec="hi5/vehicle_data/timestamp_s::value",
    rosbag_timestamp_spec="hi5/perception/camera/timestamp_s|hi5/perception/camera/timestamp_ns",
    rec_global_pattern="rec*.h5",
    rosbag_global_pattern="rosbag*.h5",
    logging_file_name="sync_log.pkl",
    metadata_func=add_metadata
)
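
The exact semantics of the timestamp spec strings are defined by h5_merger; assuming the `seconds|nanoseconds` pair combines element-wise into float seconds, nearest-timestamp alignment between two streams can be sketched in pure Python (all names here are hypothetical):

```python
import bisect

def combine_sec_ns(sec, ns):
    # Assumed semantics of a "timestamp_s|timestamp_ns" spec:
    # element-wise combination into float seconds.
    return [s + n * 1e-9 for s, n in zip(sec, ns)]

def nearest_index(sorted_ts, t):
    # Index of the timestamp in sorted_ts closest to t.
    i = bisect.bisect_left(sorted_ts, t)
    candidates = [j for j in (i - 1, i) if 0 <= j < len(sorted_ts)]
    return min(candidates, key=lambda j: abs(sorted_ts[j] - t))

# Align a rosbag stream (sec/ns pair) against rec timestamps (float seconds).
rosbag_ts = combine_sec_ns([10, 10, 11], [0, 500_000_000, 0])
rec_ts = [10.1, 10.6]
matches = [nearest_index(rosbag_ts, t) for t in rec_ts]
```

Here each rec sample is matched to its closest rosbag sample, so `matches` pairs `rec_ts[0]` with the 10.0 s sample and `rec_ts[1]` with the 10.5 s sample.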

Layout Specification

The package uses YAML layout files to define which source channels and topics are extracted and where each one is placed in the HDF5 output.

Example Layout YAML

mapping:
  # MF4 source
  - source: mf4
    original_name: "Model Root/recorder/hi5/velocity_x_mps"
    target_name: /hi5/vehicle_data/velocity_x_mps
    units: "m/s"

  # ROS2 bag source
  - source: ros2bag
    original_name: /camera/image
    target_name: /hi5/perception/camera/image
    units: "-"
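
Parsed with a YAML loader such as yaml.safe_load, the example layout above becomes a list of entries under the mapping key. A small sketch that groups entries by source and checks for the fields shown (the required-key set is an assumption based on this example, not a documented schema):

```python
# Structure yaml.safe_load would produce for the example layout above.
layout = {
    "mapping": [
        {
            "source": "mf4",
            "original_name": "Model Root/recorder/hi5/velocity_x_mps",
            "target_name": "/hi5/vehicle_data/velocity_x_mps",
            "units": "m/s",
        },
        {
            "source": "ros2bag",
            "original_name": "/camera/image",
            "target_name": "/hi5/perception/camera/image",
            "units": "-",
        },
    ]
}

REQUIRED_KEYS = {"source", "original_name", "target_name", "units"}  # assumed

def check_layout(layout):
    """Group mapping entries by source, raising if a field is missing."""
    by_source = {}
    for entry in layout["mapping"]:
        missing = REQUIRED_KEYS - entry.keys()
        if missing:
            raise ValueError(f"{entry.get('target_name')}: missing {missing}")
        by_source.setdefault(entry["source"], []).append(entry)
    return by_source

grouped = check_layout(layout)
```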

Use with Apache Airflow

This package is designed to work with Apache Airflow for automated data processing pipelines.

Example with PythonVirtualenvOperator

from airflow.operators.python import PythonVirtualenvOperator

def process_mf4_data(input_dir, output_dir, layout_path):
    from data_pipeline.ingestion.mf4_ingestor import MF4Ingester
    from data_pipeline.common.state_manager import StateManager

    import os
    os.makedirs(output_dir, exist_ok=True)

    state_manager = StateManager(
        output_folder=output_dir,
        state_filename="mf4_state.pkl"
    )

    ingester = MF4Ingester(
        input_folder=input_dir,
        output_folder=output_dir,
        state_manager=state_manager,
        file_pattern="*.mf4",
        layout_yaml_path=layout_path
    )

    ingester.run()

task = PythonVirtualenvOperator(
    task_id='process_mf4',
    python_callable=process_mf4_data,
    requirements=["fast_data_pipeline==0.1.2"],
    op_kwargs={
        'input_dir': '/data/mf4',
        'output_dir': '/data/mf4-h5',
        'layout_path': 'layout.yaml'
    }
)

Requirements

  • Python >= 3.8
  • asammdf >= 8.6.0 (for MF4 processing)
  • rosbags >= 0.10.0 (for ROS2 bag processing)
  • tables >= 3.10.0 (for HDF5 operations)
  • pandas >= 2.3.0
  • numpy >= 2.0.0
  • PyYAML >= 6.0.0

See pyproject.toml for complete dependency list.

Project Structure

data_pipeline/
├── ingestion/          # Data ingestion modules
│   ├── mf4_ingestor.py
│   ├── rosbag_ingestor.py
│   └── base_ingestor.py
├── processing/         # Data processing modules
│   ├── h5_merger.py
│   └── metadata_functions.py
├── common/             # Common utilities
│   └── state_manager.py
└── validation/         # Data validation

Development

Installing for Development

git clone <repository-url>
cd data-pipeline
pip install -e ".[dev]"

Running Tests

pytest

Building the Package

python -m build

License

MIT License - see LICENSE file for details

Author

Bora Pilav (bbpilav@gmail.com)

Changelog

0.1.2 (2025-01-18)

  • Added support for multi-dimensional array data in MF4 files
  • Improved HDF5 structure for array channels
  • Enhanced logging and error handling
  • Added comprehensive dependencies in pyproject.toml

0.1.0

  • Initial release
  • MF4 to HDF5 conversion
  • ROS2 bag to HDF5 conversion
  • HDF5 merging functionality
