Columnar analysis utilities for high-energy physics data.

Project description

F9 Columnar


A PyTorch-based library for processing ROOT and HDF5 event data in high energy physics.

F9Columnar is a Python library designed to streamline the processing of physics datasets from ROOT and HDF5 files for machine learning applications. Built on top of PyTorch, Uproot and Awkward Array, it provides a modular framework for efficient data loading, transformation, and analysis in high-energy physics research.

The library bridges the gap between traditional high-energy physics data formats (ROOT) and modern machine learning workflows. It enables physicists and data scientists to:

  • Process ROOT and HDF5 datasets using PyTorch DataLoaders
  • Apply event selections, calculate variables, and create histograms
  • Convert ROOT data to HDF5 format for ML training

Check out the PyHEP 2025 talk for a good overview of what the library can do.

Setup

The library does not declare PyTorch as a dependency by default, to allow flexible installations. You can add PyTorch with or without GPU support depending on your needs.

Install with PyTorch for GPU

pip install f9columnar
pip install torch

Install with PyTorch for CPU

pip install f9columnar
pip install torch --index-url https://download.pytorch.org/whl/cpu

Library overview

Aim of the library

The main goal of this library is to provide a simple and efficient way to process large ROOT and HDF5 datasets using PyTorch DataLoaders. The library is designed to be modular and extensible, allowing users to easily add new processors and functionality as needed. A common workflow is illustrated in the diagram below:

(F9Columnar workflow diagram)

Getting started

To illustrate the basic usage of the library, we need some ROOT files. We can create artificial ROOT files using the AwkwardEventGenerator utility. This utility allows us to define flat and jagged branches with different distributions. Let's create some artificial ROOT files with 100,000 events, containing electrons and muons with transverse momentum (pt), pseudorapidity (eta), and azimuthal angle (phi) distributions, and a flat weight distribution.

from f9columnar.utils.ak_events import AwkwardEventGenerator

gen = AwkwardEventGenerator(n_events=100000, min_particles=0, max_particles=5)

flat_spec = {"weight": {"dist_type": "normal", "params": {"mean": 1.0, "stddev": 0.1}}}

jagged_spec = {
    "el": {
        "pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 3.0e5, "n": 5.0}},
        "eta": {"dist_type": "eta", "params": {}},
        "phi": {"dist_type": "phi", "params": {}},
    },
    "mu": {
        "pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 5.0e5, "n": 5.0}},
        "eta": {"dist_type": "eta", "params": {}},
        "phi": {"dist_type": "phi", "params": {}},
    },
}

arrays = gen.from_spec_to_dict(flat_spec=flat_spec, jagged_spec=jagged_spec)
gen.to_root(arrays, file_path="data/simpleNTuple.root", tree_name="physics", n_splits=5)

This will create 5 ROOT files named simpleNTuple_part<idx>.root in the data/ directory, each containing 20,000 artificial events. We can now use the library to load and process this test data.

Basic data loading

The most basic usage of the library is to load data from ROOT files using the get_root_dataloader function. This function returns a PyTorch DataLoader that yields batches of events as Awkward arrays.

import glob

from f9columnar.root_dataloader import get_root_dataloader

def filter_branch(branch: str) -> bool:
    # select only these two branches
    return branch == "el_pt" or branch == "mu_pt"

# root_dataloader is an instance of a torch DataLoader that uses an IterableDataset
root_dataloader, total, _ = get_root_dataloader(
    name="awkwardEvents",  # name identifier
    files=glob.glob("data/simpleNTuple_part*.root"),  # root files
    key="physics",  # root file tree name
    step_size=1024,  # number of events per array batch to read into memory
    num_workers=0,  # number of workers for parallel processing (single threaded if 0)
    processors=None,  # arbitrary calculations on arrays
    filter_name=filter_branch,  # filter branches
)

# loop over batches of events from .root file(s), each batch is an awkward array
for events in root_dataloader:
    arrays, reports = events  # reports includes useful information about the batch
    el_pt, mu_pt = arrays["el_pt"], arrays["mu_pt"]  # awkward arrays of electron and muon pT
    # ... do something with the arrays

Using processors and histogramming

The following example demonstrates how to define variables, apply a cut, and create a histogram using processors.

Calculations on arrays within worker processes can be performed using a Processor. Multiple processors can be linked together in a ProcessorsGraph, forming a directed acyclic graph (DAG). These processors are applied to arrays in the sequence determined by the DAG's topological order.

Each worker executes the same processor graph on batches of event data and returns the results to the event loop once processing is complete.
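The DAG execution model can be illustrated in plain Python: each node is a callable that transforms the arrays, and nodes run in topological order. This is a toy sketch using the standard-library graphlib, not the library's implementation; the names cut and leading are made up for illustration:

```python
from graphlib import TopologicalSorter  # stdlib topological sort

# toy processors: each maps a dict of arrays to an updated dict
def cut(arrays):  # keep values above a threshold
    return {k: [v for v in vals if v > 25.0] for k, vals in arrays.items()}

def leading(arrays):  # keep only the highest value
    return {k: [max(vals)] if vals else [] for k, vals in arrays.items()}

processors = {"cut": cut, "leading": leading}
dag = {"leading": {"cut"}}  # "leading" depends on "cut"

arrays = {"el_pt": [10.0, 30.0, 50.0]}
for node in TopologicalSorter(dag).static_order():
    arrays = processors[node](arrays)

print(arrays)  # → {'el_pt': [50.0]}
```

For a linear chain the topological order reduces to the chain itself; the DAG form only matters once a graph has branches.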

Let's make a simple analysis that applies a pT cut on leptons, selects the leading lepton (highest pT), and creates a histogram of the leading lepton pT.

First, we define the processors:

import awkward as ak

from f9columnar.processors import Processor
from f9columnar.processors_collection import decorate_branches

@decorate_branches("el_pt", "el_eta", "el_phi", "mu_pt", "mu_eta", "mu_phi", "weight")
class CutLeptons(Processor):
    def __init__(self, name: str, pt_cut: float) -> None:
        super().__init__(name)
        self.pt_cut = pt_cut

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # apply pt cut particle mask on electrons and muons
        mask_el, mask_mu = arrays["el_pt"] > self.pt_cut, arrays["mu_pt"] > self.pt_cut

        # mask arrays to keep only leptons passing the pt cut
        for field in ["el_pt", "el_eta", "el_phi"]:
            arrays[field] = arrays[field][mask_el]

        for field in ["mu_pt", "mu_eta", "mu_phi"]:
            arrays[field] = arrays[field][mask_mu]

        # return must be a dictionary with key name for the argument of the next processor
        return {"arrays": arrays}

class LeadingLeptons(Processor):
    def __init__(self, name: str) -> None:
        super().__init__(name)

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # get leading electron (highest pt)
        el_pt_argmax = ak.argmax(arrays["el_pt"], axis=1, keepdims=True)
        for field in ["el_pt", "el_eta", "el_phi"]:
            arrays[field] = arrays[field][el_pt_argmax]
            arrays[field] = ak.drop_none(arrays[field])

        # get leading muon (highest pt)
        mu_pt_argmax = ak.argmax(arrays["mu_pt"], axis=1, keepdims=True)
        for field in ["mu_pt", "mu_eta", "mu_phi"]:
            arrays[field] = arrays[field][mu_pt_argmax]
            arrays[field] = ak.drop_none(arrays[field])

        return {"arrays": arrays}

The decorate_branches decorator is used to specify the branches that the processor depends on. This allows the dataloader to only load the necessary branches from the ROOT files.
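One plausible way such a decorator can work is to record the required branches on the processor class, so that a branch filter can be derived from all registered processors. The sketch below uses hypothetical names (requires_branches, ToyCut, make_branch_filter), not the library's API:

```python
def requires_branches(*branches):
    """Record the branches a processor needs (hypothetical decorator)."""
    def wrapper(cls):
        cls.required_branches = set(branches)
        return cls
    return wrapper

@requires_branches("el_pt", "mu_pt")
class ToyCut:
    pass

def make_branch_filter(processor_classes):
    # union of all required branches across the given processors
    needed = set().union(*(cls.required_branches for cls in processor_classes))
    return lambda branch: branch in needed

f = make_branch_filter([ToyCut])
print(f("el_pt"), f("weight"))  # → True False
```

The resulting callable has the same shape as the filter_branch function used with get_root_dataloader above.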

Making a histogram processor is also straightforward:

from f9columnar.histograms import HistogramProcessor

class LeadingLeptonHistogram(HistogramProcessor):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        self.make_hist1d("leading_el_pt", 50, 20.0, 300.0)  # num. bins, min, max
        self.make_hist1d("leading_mu_pt", 50, 20.0, 500.0)  # num. bins, min, max

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # fill histogram with leading lepton pT in GeV
        leading_el_pt, leading_mu_pt = arrays["el_pt"] / 1e3, arrays["mu_pt"] / 1e3

        non_empty_el = ak.num(leading_el_pt) > 0
        leading_el_pt = ak.firsts(leading_el_pt[non_empty_el])
        el_weight = arrays["weight"][non_empty_el]

        # fill electron histogram with pt and weights
        self.fill_hist1d("leading_el_pt", leading_el_pt, el_weight)

        non_empty_mu = ak.num(leading_mu_pt) > 0
        leading_mu_pt = ak.firsts(leading_mu_pt[non_empty_mu])
        mu_weight = arrays["weight"][non_empty_mu]

        # fill muon histogram with pt and weights
        self.fill_hist1d("leading_mu_pt", leading_mu_pt, mu_weight)

        return {"arrays": arrays}

We can now build the processor graph from the defined processors:

from f9columnar.processors import CheckpointProcessor, ProcessorsGraph
from f9columnar.processors_collection import ProcessorsCollection

# define a collection of processors for bookkeeping
analysis_collection = ProcessorsCollection("leptonCollection")

# add processors to the collection
analysis_collection += CutLeptons(name="cutLeptons", pt_cut=25.0)  # pt cut at 25 GeV
analysis_collection += LeadingLeptons(name="leadingLeptons")
analysis_collection += LeadingLeptonHistogram(name="leadingLeptonHistogram")

# define the processor graph
analysis_graph = ProcessorsGraph()

# build a linear chain of processors
analysis_graph.add(
    CheckpointProcessor("input"),
    *analysis_collection.as_list(),
    CheckpointProcessor("output"),
)

# if the graph is a linear chain we can use chain() to connect the nodes
# for more complex graphs use connect() method that defines edges explicitly
analysis_graph.chain()

# draw the graph with pygraphviz
analysis_graph.draw("analysis_graph.pdf")

We now have a processor graph that can be passed into the DataLoader to process events. The graph will be executed in each worker process for each batch of events. Instead of using a simple get_root_dataloader function, we can create a RootPhysicsDataset and initialize the dataloader with the processor graph:

from f9columnar.dataset_builder import RootPhysicsDataset

# make one or more root datasets as a list of RootPhysicsDataset objects
root_datasets = [
    RootPhysicsDataset(
        name="awkwardEvents",
        root_files=glob.glob("data/simpleNTuple_part*.root"),
        is_data=False,
    )
]

# analysis collection provides the branch name filter automatically
branch_name_filter = analysis_collection.get_branch_name_filter()

# setup dataloader configuration
dataloader_config = {
    "step_size": 1024,
    "key": "physics",
    "num_workers": 0,
    "filter_name": branch_name_filter,
}

# setup and initialize the dataloader for the root dataset
root_datasets[0].setup_dataloader(**dataloader_config)
# runs get_root_dataloader() internally
root_datasets[0].init_dataloader(analysis_graph)

We have built a processor graph (the actual calculations) and a dataloader that uses the graph to process events. We can now run the event loop over batches of events:

from f9columnar.run import ColumnarEventLoop

# define the event loop over the root datasets
event_loop = ColumnarEventLoop(
    mc_datasets=root_datasets,
    data_datasets=None,
    cut_flow=False,
    save_path="results/",
)
# run it!
event_loop.run(mc_only=True)

That's it! After running the event loop, the histograms will be saved in the results/ directory as awkwardEvents_mc_0_hists.p.

Converting ROOT to HDF5

The library also includes a utility to convert ROOT files to HDF5 format.

Event shuffling is supported to randomize the order of events in the output HDF5 file. This is particularly useful for machine learning applications where data order can impact training. This is done using a 2-pass shuffling algorithm that is memory efficient and works well with large datasets:

-- writing step --
create n empty datasets (piles) p[0], p[1], ..., p[n - 1]
for each step of events x read from a root file do
    for each chunk c of events x do
        j ← random integer in [0, n - 1]
        append c to p[j]

-- reading step --
for j in [n1, n2) where 0 ≤ n1 < n2 ≤ n do
    read all events from p[j] into memory
    shuffle p[j] in memory
    for each batch x of events in p[j] do
        yield x to DataLoader
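The two-pass algorithm can be sketched in pure Python, with in-memory lists standing in for the on-disk piles (a toy sketch; the real writer streams chunks to HDF5):

```python
import random

def two_pass_shuffle(events, n_piles=4, chunk_size=3, seed=0):
    rng = random.Random(seed)

    # writing step: scatter chunks of events onto randomly chosen piles
    piles = [[] for _ in range(n_piles)]
    for i in range(0, len(events), chunk_size):
        j = rng.randrange(n_piles)
        piles[j].extend(events[i : i + chunk_size])

    # reading step: shuffle each pile in memory, then yield its events
    for pile in piles:
        rng.shuffle(pile)
        yield from pile

shuffled = list(two_pass_shuffle(list(range(20))))
print(sorted(shuffled) == list(range(20)))  # same events, new order
```

The memory cost is bounded by the largest pile rather than the full dataset, which is what makes the scheme suitable for large files.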

Below is an example of how to use the writing utility. In the example, the dataloader loop is abstracted away using the ColumnarEventLoop class.

from f9columnar.ml.hdf5_writer import Hdf5WriterPostprocessor

# we will make a linear chain of processors
analysis_graph = ProcessorsGraph()
analysis_graph.add(
    CheckpointProcessor("input"),
    *analysis_collection.as_list(), # add a collection of processors that do some calculations
    CheckpointProcessor("output", save_arrays=True), # save arrays at the end of the chain for the hdf5 writer
)
analysis_graph.chain()
analysis_graph.draw("hdf_analysis_graph.pdf")

# create the hdf5 writer postprocessor
# a postprocessor is a special processor that runs after the main processor graph and is executed in the main process
# ProcessorsGraph can be thought of as a map step and PostprocessorsGraph as a reduce step
hdf5_writer = Hdf5WriterPostprocessor(
    output_file, # path of the output hdf5 file
    flat_column_names=["tau_vis_mass"], # flat columns to save
    jagged_column_names=None, # jagged columns to save, pads extra values with a pad value if needed
    chunk_shape=512, # shape of chunks to write to hdf5 file
    n_piles=1024, # number of piles to use for shuffling
    pile_assignment="random", # how to assign events to piles, random or deque
    merge_piles=False, # merge piles into a single hdf5 file at the end
    enforce_dtypes=None, # enforce specific dtypes for columns
    save_node="output", # name of the node in the processor graph to save arrays from
)

# create a linear chain of postprocessors
postprocessors_graph = PostprocessorsGraph()
postprocessors_graph.add(
    CheckpointPostprocessor("input"),
    hdf5_writer,
)
postprocessors_graph.chain()
postprocessors_graph.draw("hdf_post_analysis_graph.pdf")

# we can make a root dataset for MC and data files lists
data_dataset = RootPhysicsDataset(f"Data", [str(f) for f in data_files], is_data=True)
mc_dataset = RootPhysicsDataset(f"MC", [str(f) for f in mc_files], is_data=False)

# use the branch filter from the analysis collection to avoid loading unnecessary branches
dataloader_config["filter_name"] = analysis_collection.branch_name_filter

# setup the data dataloader
data_dataset.setup_dataloader(**dataloader_config)
data_dataset.init_dataloader(processors=analysis_graph)

# setup the mc dataloader
mc_dataset.setup_dataloader(**dataloader_config)
mc_dataset.init_dataloader(processors=analysis_graph)

# run the DataLoader event loop over batches of events for both datasets
event_loop = ColumnarEventLoop(
    mc_datasets=[mc_dataset], # supports multiple datasets
    data_datasets=[data_dataset], # supports multiple datasets
    postprocessors_graph=postprocessors_graph,
    fit_postprocessors=True,
    cut_flow=False,
)
event_loop.run() # iterates over batches of events in both datasets

postprocessors_graph["hdf5WriterPostprocessor"].close() # close the file handles

Note that variables ending in _type have a special meaning in the HDF5 writer. For example, a variable named label_type can mark the type of an event (e.g., signal or background) and will be saved as a column in the HDF5 file, which is useful for classification tasks. Another special variable is weights, which stores the event weights in the HDF5 file.
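The naming convention can be illustrated with a toy column router that separates features, *_type labels, and weights (a sketch of the convention only, not the writer's internals):

```python
def route_columns(columns):
    """Split columns into features, type labels (*_type) and weights (toy sketch)."""
    features, labels, weights = {}, {}, {}
    for name, values in columns.items():
        if name.endswith("_type"):
            labels[name] = values
        elif name == "weights":
            weights[name] = values
        else:
            features[name] = values
    return features, labels, weights

cols = {"tau_vis_mass": [1.2, 3.4], "label_type": [0, 1], "weights": [1.0, 0.9]}
features, labels, weights = route_columns(cols)
print(sorted(features), sorted(labels), sorted(weights))
```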

Feature scaling

The library also includes a utility to scale the entire HDF5 dataset using sklearn scalers. This is particularly useful for machine learning applications where feature scaling can improve model performance.

from f9columnar.ml.dataset_scaling import DatasetScaler

ds_scaler = DatasetScaler(
    files, # list of hdf5 files
    scaler_type, # minmax, maxabs, standard, robust, quantile, power, logit, standard_logit
    features, # names of the features to scale
    scaler_save_path=save_path,
    n_max=None, # maximum number of events to use for fitting the scaler if the scaler does not have partial fit
    extra_hash="", # extra string to add to the hash of the scaler
    scaler_kwargs=scaler_kwargs, # kwargs for the scaler
    dataloader_kwargs=dataloader_kwargs, # kwargs for the dataloader
)
ds_scaler.feature_scale()
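For scalers that support partial fitting, the dataset only needs to be scanned in chunks. A pure-Python sketch of chunked standardization, using a Welford-style running mean and variance in the spirit of sklearn's StandardScaler.partial_fit (a toy sketch, not the library's code):

```python
import math

class ChunkedStandardScaler:
    """Fit a standard scaler over chunks without loading the full dataset (toy sketch)."""
    def __init__(self):
        self.n, self.mean, self.m2 = 0, 0.0, 0.0

    def partial_fit(self, chunk):
        for x in chunk:  # Welford's online update of mean and sum of squares
            self.n += 1
            delta = x - self.mean
            self.mean += delta / self.n
            self.m2 += delta * (x - self.mean)

    def transform(self, chunk):
        std = math.sqrt(self.m2 / self.n) if self.n > 1 else 1.0
        return [(x - self.mean) / std for x in chunk]

scaler = ChunkedStandardScaler()
for chunk in ([1.0, 2.0], [3.0, 4.0]):  # stream chunks as an hdf5 reader would
    scaler.partial_fit(chunk)
print(scaler.mean)  # → 2.5
```

Scalers without partial fitting (e.g. quantile-based ones) are the reason for the n_max option above: they must be fit on a bounded sample held in memory.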

The result will be a pickle file with the fitted scaler object that can be used in the HDF5 ML DataLoader as:

feature_scaling_kwargs = {
    "scaler_type": scaler_type,
    "scaler_path": save_path, # where the scaler was saved
    "scalers_extra_hash": "",
}
dataset_kwargs = dataset_kwargs | feature_scaling_kwargs

Note that categorical features use a custom LabelEncoder that supports partial fitting and can be used in an online fashion.
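The idea of a label encoder with partial fitting can be sketched as follows (a toy sketch of the concept, not the library's LabelEncoder):

```python
class PartialFitLabelEncoder:
    """Assigns stable integer codes to categories seen across chunks (toy sketch)."""
    def __init__(self):
        self.classes = {}

    def partial_fit(self, values):
        for v in values:
            if v not in self.classes:  # first-seen order defines the code
                self.classes[v] = len(self.classes)
        return self

    def transform(self, values):
        return [self.classes[v] for v in values]

enc = PartialFitLabelEncoder()
enc.partial_fit(["mu", "el"]).partial_fit(["el", "tau"])
print(enc.transform(["el", "mu", "tau"]))  # → [1, 0, 2]
```

Because codes only grow as new categories appear, the encoder can be updated online without re-encoding previously seen data.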

HDF5 ML DataLoader

We will use PyTorch Lightning to demonstrate how to use the Hdf5IterableDataset in a training loop.

from typing import Any, Callable

import lightning as L
from torch.utils.data import DataLoader

from f9columnar.ml.hdf5_ml_dataloader import (
    WeightedBatch,
    WeightedBatchType,
    WeightedDatasetBatch,
    default_setup_func,
    get_ml_hdf5_dataloader,
)

class LightningHdf5DataModule(L.LightningDataModule):
    def __init__(
        self,
        name: str,
        files: str | list[str],
        column_names: list[str],
        stage_split_piles: dict[str, list[int] | int],
        shuffle: bool = False,
        collate_fn: Callable[[list[WeightedBatch]], WeightedBatchType] | None = None,
        dataset_kwargs: dict[str, Any] | None = None,
        dataloader_kwargs: dict[str, Any] | None = None,
    ) -> None:
        super().__init__()
        self.dl_name = name
        self.files = files
        self.column_names = column_names
        self.stage_split_piles = stage_split_piles
        self.shuffle = shuffle
        self.collate_fn = collate_fn
        self.dataset_kwargs = dataset_kwargs
        self.dl_kwargs = dataloader_kwargs

    def _get_dataloader(self, stage: str) -> DataLoader:
        # returns DataLoader object for the given stage, selection and number of events
        dl, _, _ = get_ml_hdf5_dataloader(
            f"{stage} - {self.dl_name}",
            self.files,
            self.column_names,
            stage_split_piles=self.stage_split_piles,
            stage=stage,
            shuffle=self.shuffle,
            collate_fn=self.collate_fn,
            dataset_kwargs=self.dataset_kwargs,
            dataloader_kwargs=self.dl_kwargs,
        )
        return dl

    def train_dataloader(self) -> DataLoader:
        return self._get_dataloader("train")

    def val_dataloader(self) -> DataLoader:
        return self._get_dataloader("val")

    def test_dataloader(self) -> DataLoader:
        return self._get_dataloader("test")

def events_collate_fn(batch: tuple[WeightedDatasetBatch, dict[str, Any]]) -> WeightedBatchType:
    ds, reports = batch[0]["events"], batch[1]
    # these are the return values from the DataLoader
    return ds.X, ds.y, ds.w, ds.y_aux, reports

# can additionally pass any kwargs to the dataset and it will be forwarded to the reports dictionary
dataset_kwargs = {
    "batch_size": 128, # number of events per batch
    "imbalanced_sampler": None, # supports oversampling and undersampling
    "drop_last": True, # drop last incomplete batch
}

# custom function to process the dataset inside the DataLoader workers
# the function should be: Callable[[StackedDatasets, MLHdf5Iterator], WeightedDatasetBatch | None]
dataset_kwargs["setup_func"] = default_setup_func

dataloader_kwargs = {
    "num_workers": -1, # use all available cores
    "prefetch_factor": 2, # number of batches to prefetch by each worker
}

dm = LightningHdf5DataModule(
    dm_name, # name of the datamodule
    files, # list of hdf5 files
    features, # names of the features to load (column names in the hdf5 file)
    stage_split_piles={"train": 512, "test": 256, "val": 256}, # number of piles for each stage
    shuffle=True, # shuffle events at each epoch
    collate_fn=events_collate_fn, # collate function to handle batches
    dataset_kwargs=dataset_kwargs, # kwargs for the dataset
    dataloader_kwargs=dataloader_kwargs, # kwargs for the dataloader
)
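One way to picture stage_split_piles is as a partition of the pile indices into disjoint per-stage groups. The sketch below assumes contiguous assignment purely for illustration; how piles are actually selected is up to the library:

```python
def split_piles(stage_counts):
    """Partition pile indices into contiguous per-stage ranges (toy sketch)."""
    splits, start = {}, 0
    for stage, count in stage_counts.items():
        splits[stage] = list(range(start, start + count))
        start += count
    return splits

# 1024 piles total, matching n_piles used when writing the hdf5 files
splits = split_piles({"train": 512, "test": 256, "val": 256})
print(len(splits["train"]), splits["test"][0], splits["val"][-1])  # → 512 512 1023
```

Since shuffling happens at the pile level, keeping the stage groups disjoint guarantees that train, validation, and test events never mix.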
