F9 Columnar
A PyTorch-based library for processing ROOT and HDF5 event data in high energy physics.
F9Columnar is a Python library designed to streamline the processing of physics datasets from ROOT and HDF5 files for machine learning applications. Built on top of PyTorch, Uproot and Awkward Array, it provides a modular framework for efficient data loading, transformation, and analysis in high-energy physics research.
The library bridges the gap between traditional high-energy physics data formats (ROOT) and modern machine learning workflows. It enables physicists and data scientists to:
- Process ROOT and HDF5 datasets using PyTorch DataLoaders
- Apply event selections, calculate variables, and create histograms
- Convert ROOT data to HDF5 format for ML training
Check out the PyHEP 2025 talk for a good overview of what the library can do.
Setup
The library comes without any PyTorch dependencies by default to allow flexible installations. You can install it with or without PyTorch GPU support depending on your needs.
Install with PyTorch for GPU
pip install f9columnar
pip install torch
Install with PyTorch for CPU
pip install f9columnar
pip install torch --index-url https://download.pytorch.org/whl/cpu
Library overview
- Aim of the library
- Getting started
- Basic data loading
- Using processors and histogramming
- Converting ROOT to HDF5
- Feature scaling
- HDF5 ML DataLoader
Aim of the library
The main goal of this library is to provide a simple and efficient way to process large ROOT and HDF5 datasets using PyTorch DataLoaders. The library is designed to be modular and extensible, allowing users to easily add new processors and functionality as needed. A common workflow is illustrated in the diagram below:
Getting started
To illustrate the basic usage of the library, we need some ROOT files. We can create artificial ROOT files using the AwkwardEventGenerator utility. This utility allows us to define flat and jagged branches with different distributions.
Let's create some artificial ROOT files with 100,000 events, containing electrons and muons with transverse momentum (pt), pseudorapidity (eta), and azimuthal angle (phi) distributions, and a flat weight distribution.
from f9columnar.utils.ak_events import AwkwardEventGenerator
gen = AwkwardEventGenerator(n_events=100000, min_particles=0, max_particles=5)
flat_spec = {"weight": {"dist_type": "normal", "params": {"mean": 1.0, "stddev": 0.1}}}
jagged_spec = {
    "el": {
        "pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 3.0e5, "n": 5.0}},
        "eta": {"dist_type": "eta", "params": {}},
        "phi": {"dist_type": "phi", "params": {}},
    },
    "mu": {
        "pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 5.0e5, "n": 5.0}},
        "eta": {"dist_type": "eta", "params": {}},
        "phi": {"dist_type": "phi", "params": {}},
    },
}
arrays = gen.from_spec_to_dict(flat_spec=flat_spec, jagged_spec=jagged_spec)
gen.to_root(arrays, file_path="data/simpleNTuple.root", tree_name="physics", n_splits=5)
This will create 5 ROOT files named simpleNTuple_part<idx>.root in the data/ directory, each containing 20,000 events of artificial event data. We can now use the library to load and process this test data.
Basic data loading
The most basic usage of the library is to load data from ROOT files using the get_root_dataloader function. This function returns a PyTorch DataLoader that yields batches of events as Awkward arrays.
import glob
from f9columnar.root_dataloader import get_root_dataloader
def filter_branch(branch: str) -> bool:
    # select only these two branches
    return branch == "el_pt" or branch == "mu_pt"
# root_dataloader is an instance of a torch DataLoader that uses an IterableDataset
root_dataloader, total, _ = get_root_dataloader(
    name="awkwardEvents",  # name identifier
    files=glob.glob("data/simpleNTuple_part*.root"),  # root files
    key="physics",  # root file tree name
    step_size=1024,  # number of events per array batch to read into memory
    num_workers=0,  # number of workers for parallel processing (single threaded if 0)
    processors=None,  # arbitrary calculations on arrays
    filter_name=filter_branch,  # filter branches
)
# loop over batches of events from .root file(s), each batch is an awkward array
for events in root_dataloader:
    arrays, reports = events  # reports include useful information about the batch
    el_pt, mu_pt = arrays["el_pt"], arrays["mu_pt"]  # awkward arrays of electron and muon pT
    # ... do something with the arrays
Using processors and histogramming
The following example demonstrates how to define variables, apply a cut, and create a histogram using processors.
Calculations on arrays within worker processes can be performed using a Processor. Multiple processors can be linked together in a ProcessorsGraph, forming a directed acyclic graph (DAG). These processors are applied to arrays in the sequence determined by the DAG's topological order.
Each worker executes the same processor graph on batches of event data and returns the results to the event loop once processing is complete.
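As a plain-Python illustration of topological execution (a conceptual sketch only; the function and node names below are made up and are not the library's API), a graph of processors can be run in dependency order using the standard library's graphlib:

```python
from graphlib import TopologicalSorter

# hypothetical toy processors: each takes the running payload and returns it updated
def double(payload):
    payload["x"] = [v * 2 for v in payload["x"]]
    return payload

def keep_positive(payload):
    payload["x"] = [v for v in payload["x"] if v > 0]
    return payload

# edges: node -> set of predecessors, as graphlib expects
graph = {"double": {"input"}, "keep_positive": {"double"}}
processors = {"double": double, "keep_positive": keep_positive}

payload = {"x": [-1, 2, 3]}
for node in TopologicalSorter(graph).static_order():
    if node in processors:  # "input" is just a checkpoint with no computation
        payload = processors[node](payload)

print(payload["x"])  # [4, 6]
```

Each worker would run such an ordering independently on its own batch of events.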
Let's make a simple analysis that performs a pT cut on leptons, selects the leading lepton (highest pT), and creates a histogram of the leading lepton pT.
First, we define the processors:
import awkward as ak
from f9columnar.processors import CheckpointProcessor, Processor, ProcessorsGraph
from f9columnar.processors_collection import decorate_branches
@decorate_branches("el_pt", "el_eta", "el_phi", "mu_pt", "mu_eta", "mu_phi", "weight")
class CutLeptons(Processor):
    def __init__(self, name: str, pt_cut: float) -> None:
        super().__init__(name)
        self.pt_cut = pt_cut

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # apply pt cut particle mask on electrons and muons
        mask_el, mask_mu = arrays["el_pt"] > self.pt_cut, arrays["mu_pt"] > self.pt_cut
        # mask arrays to keep only leptons passing the pt cut
        for field in ["el_pt", "el_eta", "el_phi"]:
            arrays[field] = arrays[field][mask_el]
        for field in ["mu_pt", "mu_eta", "mu_phi"]:
            arrays[field] = arrays[field][mask_mu]
        # return must be a dictionary with key name for the argument of the next processor
        return {"arrays": arrays}
class LeadingLeptons(Processor):
    def __init__(self, name: str) -> None:
        super().__init__(name)

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # get leading electron (highest pt)
        el_pt_argmax = ak.argmax(arrays["el_pt"], axis=1, keepdims=True)
        for field in ["el_pt", "el_eta", "el_phi"]:
            arrays[field] = arrays[field][el_pt_argmax]
            arrays[field] = ak.drop_none(arrays[field])
        # get leading muon (highest pt)
        mu_pt_argmax = ak.argmax(arrays["mu_pt"], axis=1, keepdims=True)
        for field in ["mu_pt", "mu_eta", "mu_phi"]:
            arrays[field] = arrays[field][mu_pt_argmax]
            arrays[field] = ak.drop_none(arrays[field])
        return {"arrays": arrays}
The decorate_branches decorator is used to specify the branches that the processor depends on. This allows the dataloader to only load the necessary branches from the ROOT files.
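To illustrate the idea behind such a decorator (a toy sketch, not f9columnar's implementation; declare_branches and the classes here are hypothetical), required branches can be recorded as class metadata and then unioned into a filter:

```python
# record the declared branch names on the class itself
def declare_branches(*branches):
    def wrap(cls):
        cls.branches = set(branches)
        return cls
    return wrap

@declare_branches("el_pt", "mu_pt")
class PtCut:
    pass

@declare_branches("el_pt", "el_eta")
class ElSelector:
    pass

# union of all declared branches -> only these need to be read from the ROOT files
needed = PtCut.branches | ElSelector.branches
print(sorted(needed))  # ['el_eta', 'el_pt', 'mu_pt']
```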
Making a histogram processor is also straightforward:
from f9columnar.histograms import HistogramProcessor
class LeadingLeptonHistogram(HistogramProcessor):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        self.make_hist1d("leading_el_pt", 50, 20.0, 300.0)  # num. bins, min, max
        self.make_hist1d("leading_mu_pt", 50, 20.0, 500.0)  # num. bins, min, max

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # fill histogram with leading lepton pT in GeV
        leading_el_pt, leading_mu_pt = arrays["el_pt"] / 1e3, arrays["mu_pt"] / 1e3
        non_empty_el = ak.num(leading_el_pt) > 0
        leading_el_pt = ak.firsts(leading_el_pt[non_empty_el])
        el_weight = arrays["weight"][non_empty_el]
        # fill electron histogram with pt and weights
        self.fill_hist1d("leading_el_pt", leading_el_pt, el_weight)
        non_empty_mu = ak.num(leading_mu_pt) > 0
        leading_mu_pt = ak.firsts(leading_mu_pt[non_empty_mu])
        mu_weight = arrays["weight"][non_empty_mu]
        # fill muon histogram with pt and weights
        self.fill_hist1d("leading_mu_pt", leading_mu_pt, mu_weight)
        return {"arrays": arrays}
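Conceptually, each fill accumulates weighted counts per bin, batch by batch; a numpy sketch of that accumulation (illustrative values only, not the library's internals):

```python
import numpy as np

edges = np.linspace(20.0, 300.0, 51)  # 50 bins between 20 and 300 GeV
counts = np.zeros(50)

# fill in two batches, as the event loop would do per batch of events
for pt, w in [(np.array([25.0, 100.0]), np.array([1.0, 0.9])),
              (np.array([100.0, 400.0]), np.array([1.1, 1.0]))]:
    batch_counts, _ = np.histogram(pt, bins=edges, weights=w)
    counts += batch_counts  # accumulate weighted counts across batches

print(counts.sum())  # ≈ 3.0 total weight; the 400 GeV entry falls outside the range
```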
We can now build the processor graph from the defined processors:
from f9columnar.processors import CheckpointProcessor, ProcessorsGraph
from f9columnar.processors_collection import ProcessorsCollection
# define a collection of processors for bookkeeping
analysis_collection = ProcessorsCollection("leptonCollection")
# add processors to the collection
analysis_collection += CutLeptons(name="cutLeptons", pt_cut=25.0) # pt cut at 25 GeV
analysis_collection += LeadingLeptons(name="leadingLeptons")
analysis_collection += LeadingLeptonHistogram(name="leadingLeptonHistogram")
# define the processor graph
analysis_graph = ProcessorsGraph()
# build a linear chain of processors
analysis_graph.add(
    CheckpointProcessor("input"),
    *analysis_collection.as_list(),
    CheckpointProcessor("output"),
)
# if the graph is a linear chain we can use chain() to connect the nodes
# for more complex graphs use connect() method that defines edges explicitly
analysis_graph.chain()
# draw the graph with pygraphviz
analysis_graph.draw("analysis_graph.pdf")
We now have a processor graph that can be passed into the DataLoader to process events. The graph will be executed in each worker process for each batch of events. Instead of using a simple get_root_dataloader function, we can create a RootPhysicsDataset and initialize the dataloader with the processor graph:
from f9columnar.dataset_builder import RootPhysicsDataset
# make one or more root datasets as a list of RootPhysicsDataset objects
root_datasets = [
    RootPhysicsDataset(
        name="awkwardEvents",
        root_files=glob.glob("data/simpleNTuple_part*.root"),
        is_data=False,
    )
]
# analysis collection provides the branch name filter automatically
branch_name_filter = analysis_collection.get_branch_name_filter()
# setup dataloader configuration
dataloader_config = {
    "step_size": 1024,
    "key": "physics",
    "num_workers": 0,
    "filter_name": branch_name_filter,
}
# setup and initialize the dataloader for the root dataset
root_datasets[0].setup_dataloader(**dataloader_config)
# runs get_root_dataloader() internally
root_datasets[0].init_dataloader(analysis_graph)
We have built a processor graph (the actual calculations) and a dataloader that uses the graph to process events. We can now run the event loop over batches of events:
from f9columnar.run import ColumnarEventLoop
# define the event loop over the root datasets
event_loop = ColumnarEventLoop(
    mc_datasets=root_datasets,
    data_datasets=None,
    cut_flow=False,
    save_path="results/",
)
# run it!
event_loop.run(mc_only=True)
That's it! After running the event loop, the histograms will be saved in the results/ directory as awkwardEvents_mc_0_hists.p.
Converting ROOT to HDF5
The library also includes a utility to convert ROOT files to HDF5 format.
Event shuffling is supported to randomize the order of events in the output HDF5 file. This is particularly useful for machine learning applications where data order can impact training. This is done using a 2-pass shuffling algorithm that is memory efficient and works well with large datasets:
-- writing step --
create empty datasets called piles p[0], p[1], ..., p[n - 1]
for step size of events x from a root file i do
    for chunk size c of events x do
        j ← random integer in [0, n - 1]
        append c to p[j]

-- reading step --
for j in [n1, n2] where 0 ≤ n1 < n2 ≤ n do
    read all events from p[j] into memory
    shuffle p[j] in memory
    for batch size of events x in p[j] do
        yield x to DataLoader
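The two steps can be sketched in plain Python (a self-contained illustration of the algorithm, not the library's implementation):

```python
import random

def write_piles(events, n_piles, chunk_size):
    # writing step: scatter fixed-size chunks of events into randomly chosen piles
    piles = [[] for _ in range(n_piles)]
    for start in range(0, len(events), chunk_size):
        chunk = events[start:start + chunk_size]
        piles[random.randrange(n_piles)].extend(chunk)
    return piles

def read_shuffled(piles, batch_size):
    # reading step: shuffle each pile in memory, then yield batches from it
    for pile in piles:
        random.shuffle(pile)
        for start in range(0, len(pile), batch_size):
            yield pile[start:start + batch_size]

random.seed(0)
events = list(range(10_000))
piles = write_piles(events, n_piles=8, chunk_size=64)
shuffled = [e for batch in read_shuffled(piles, batch_size=256) for e in batch]

assert sorted(shuffled) == events  # every event appears exactly once
assert shuffled != events  # order is randomized
```

Only one pile ever has to be fully in memory, which is what makes the scheme scale to datasets far larger than RAM.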
Below is an example of how to use the writing utility. In the example, the dataloader loop is abstracted away using the ColumnarEventLoop class.
from f9columnar.ml.hdf5_writer import Hdf5WriterPostprocessor
# PostprocessorsGraph and CheckpointPostprocessor (used below) are assumed importable from f9columnar.processors
from f9columnar.processors import CheckpointPostprocessor, PostprocessorsGraph
# we will make a linear chain of processors
analysis_graph = ProcessorsGraph()
analysis_graph.add(
    CheckpointProcessor("input"),
    *analysis_collection.as_list(),  # add a collection of processors that do some calculations
    CheckpointProcessor("output", save_arrays=True),  # save arrays at the end of the chain for the hdf5 writer
)
analysis_graph.chain()
analysis_graph.draw("hdf_analysis_graph.pdf")
# create the hdf5 writer postprocessor
# a postprocessor is a special processor that runs after the main processor graph and is executed in the main process
# ProcessorsGraph can be thought of as a map step and PostprocessorsGraph as a reduce step
hdf5_writer = Hdf5WriterPostprocessor(
    output_file,
    flat_column_names=["tau_vis_mass"],  # flat columns to save
    jagged_column_names=None,  # jagged columns to save, padded with a pad value if needed
    chunk_shape=512,  # shape of chunks to write to the hdf5 file
    n_piles=1024,  # number of piles to use for shuffling
    pile_assignment="random",  # how to assign events to piles, random or deque
    merge_piles=False,  # merge piles into a single hdf5 file at the end
    enforce_dtypes=None,  # enforce specific dtypes for columns
    save_node="output",  # name of the node in the processor graph to save arrays from
)
# create a linear chain of postprocessors
postprocessors_graph = PostprocessorsGraph()
postprocessors_graph.add(
    CheckpointPostprocessor("input"),
    hdf5_writer,
)
postprocessors_graph.chain()
postprocessors_graph.draw("hdf_post_analysis_graph.pdf")
# we can make a root dataset for MC and data files lists
data_dataset = RootPhysicsDataset("Data", [str(f) for f in data_files], is_data=True)
mc_dataset = RootPhysicsDataset("MC", [str(f) for f in mc_files], is_data=False)
# get the branch filter from the analysis collection to not load unnecessary branches
branch_filter = analysis_collection.branch_name_filter
# setup the data dataloader
data_dataset.setup_dataloader(**dataloader_config)
data_dataset.init_dataloader(processors=analysis_graph)
# setup the mc dataloader
mc_dataset.setup_dataloader(**dataloader_config)
mc_dataset.init_dataloader(processors=analysis_graph)
# run the DataLoader event loop over batches of events for both datasets
event_loop = ColumnarEventLoop(
    mc_datasets=[mc_dataset],  # supports multiple datasets
    data_datasets=[data_dataset],  # supports multiple datasets
    postprocessors_graph=postprocessors_graph,
    fit_postprocessors=True,
    cut_flow=False,
)
event_loop.run() # iterates over batches of events in both datasets
postprocessors_graph["hdf5WriterPostprocessor"].close() # close the file handles
Note that variables ending in *_type have a special meaning in the HDF5 writer. For example, a variable named label_type can be used to encode the type of the event (e.g., signal or background) and will be saved as a column in the HDF5 file, which is useful for classification tasks. Another special variable is weights, which stores event weights in the HDF5 file. These special variables help organize the data for training machine learning models.
Feature scaling
The library also includes a utility to scale the entire HDF5 dataset using sklearn scalers. This is particularly useful for machine learning applications where feature scaling can improve model performance.
from f9columnar.ml.dataset_scaling import DatasetScaler
ds_scaler = DatasetScaler(
    files,  # list of hdf5 files
    scaler_type,  # minmax, maxabs, standard, robust, quantile, power, logit, standard_logit
    features,  # names of the features to scale
    scaler_save_path=save_path,
    n_max=None,  # maximum number of events used to fit the scaler if it does not support partial fitting
    extra_hash="",  # extra string added to the hash of the scaler
    scaler_kwargs=scaler_kwargs,  # kwargs for the scaler
    dataloader_kwargs=dataloader_kwargs,  # kwargs for the dataloader
)
ds_scaler.feature_scale()
The result will be a pickle file with the fitted scaler object that can be used in the HDF5 ML DataLoader as:
feature_scaling_kwargs = {
    "scaler_type": scaler_type,
    "scaler_path": save_path,  # where the scaler was saved
    "scalers_extra_hash": "",
}
dataset_kwargs = dataset_kwargs | feature_scaling_kwargs
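For scalers that support partial fitting, the fit can stream over the dataset chunk by chunk instead of loading everything at once; a numpy sketch of incremental standardization (illustrative only, not DatasetScaler's internals):

```python
import numpy as np

rng = np.random.default_rng(0)
data = rng.normal(loc=5.0, scale=2.0, size=(100_000, 3))  # stand-in for a large hdf5 dataset

# accumulate sufficient statistics chunk by chunk (never hold the full dataset for fitting)
n, s, ss = 0, np.zeros(3), np.zeros(3)
for start in range(0, len(data), 4096):
    chunk = data[start:start + 4096]
    n += len(chunk)
    s += chunk.sum(axis=0)
    ss += (chunk ** 2).sum(axis=0)

mean = s / n
std = np.sqrt(ss / n - mean ** 2)

scaled = (data - mean) / std  # apply the fitted "standard" scaling
assert np.allclose(scaled.mean(axis=0), 0.0, atol=1e-8)
assert np.allclose(scaled.std(axis=0), 1.0, atol=1e-6)
```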
Note that categorical features use a custom LabelEncoder that supports partial fitting and can be used in an online fashion.
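A minimal sketch of the partial-fit idea (the class and the category names are hypothetical, not the library's LabelEncoder):

```python
class OnlineLabelEncoder:
    """Maps categorical values to stable integer ids, fit incrementally."""

    def __init__(self):
        self.mapping = {}

    def partial_fit(self, values):
        # assign the next free id to each previously unseen category
        for v in values:
            if v not in self.mapping:
                self.mapping[v] = len(self.mapping)
        return self

    def transform(self, values):
        return [self.mapping[v] for v in values]

enc = OnlineLabelEncoder()
enc.partial_fit(["ggH", "VBF"])    # first chunk
enc.partial_fit(["VBF", "ttbar"])  # a later chunk only adds new categories
print(enc.transform(["ggH", "VBF", "ttbar"]))  # [0, 1, 2]
```

Because already-seen categories keep their ids, the encoder can be updated online as new chunks arrive.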
HDF5 ML DataLoader
We will use PyTorch Lightning to demonstrate how to use the Hdf5IterableDataset in a training loop.
from typing import Any, Callable
import lightning as L
from torch.utils.data import DataLoader
from f9columnar.ml.hdf5_ml_dataloader import (
    WeightedBatch,
    WeightedBatchType,
    WeightedDatasetBatch,
    default_setup_func,
    get_ml_hdf5_dataloader,
)
class LightningHdf5DataModule(L.LightningDataModule):
    def __init__(
        self,
        name: str,
        files: str | list[str],
        column_names: list[str],
        stage_split_piles: dict[str, list[int] | int],
        shuffle: bool = False,
        collate_fn: Callable[[list[WeightedBatch]], WeightedBatchType] | None = None,
        dataset_kwargs: dict[str, Any] | None = None,
        dataloader_kwargs: dict[str, Any] | None = None,
    ) -> None:
        super().__init__()
        self.dl_name = name
        self.files = files
        self.column_names = column_names
        self.stage_split_piles = stage_split_piles
        self.shuffle = shuffle
        self.collate_fn = collate_fn
        self.dataset_kwargs = dataset_kwargs
        self.dl_kwargs = dataloader_kwargs

    def _get_dataloader(self, stage: str) -> DataLoader:
        # returns a DataLoader object for the given stage, selection and number of events
        dl, _, _ = get_ml_hdf5_dataloader(
            f"{stage} - {self.dl_name}",
            self.files,
            self.column_names,
            stage_split_piles=self.stage_split_piles,
            stage=stage,
            shuffle=self.shuffle,
            collate_fn=self.collate_fn,
            dataset_kwargs=self.dataset_kwargs,
            dataloader_kwargs=self.dl_kwargs,
        )
        return dl

    def train_dataloader(self) -> DataLoader:
        return self._get_dataloader("train")

    def val_dataloader(self) -> DataLoader:
        return self._get_dataloader("val")

    def test_dataloader(self) -> DataLoader:
        return self._get_dataloader("test")
def events_collate_fn(batch: tuple[WeightedDatasetBatch, dict[str, Any]]) -> WeightedBatchType:
    ds, reports = batch[0]["events"], batch[1]
    # these are the return values from the DataLoader
    return ds.X, ds.y, ds.w, ds.y_aux, reports
# can additionally pass any kwargs to the dataset and they will be forwarded to the reports dictionary
dataset_kwargs = {
    "batch_size": 128,  # number of events per batch
    "imbalanced_sampler": None,  # supports oversampling and undersampling
    "drop_last": True,  # drop last incomplete batch
}
# custom function to process the dataset inside the DataLoader workers
# the function should be: Callable[[StackedDatasets, MLHdf5Iterator], WeightedDatasetBatch | None]
dataset_kwargs["setup_func"] = default_setup_func
dataloader_kwargs = {
    "num_workers": -1,  # use all available cores
    "prefetch_factor": 2,  # number of batches to prefetch by each worker
}
dm = LightningHdf5DataModule(
    dm_name,  # name of the datamodule
    files,  # list of hdf5 files
    features,  # names of the features to load (column names in the hdf5 file)
    stage_split_piles={"train": 512, "test": 256, "val": 256},  # number of piles for each stage
    shuffle=True,  # shuffle events at each epoch
    collate_fn=events_collate_fn,  # collate function to handle batches
    dataset_kwargs=dataset_kwargs,  # kwargs for the dataset
    dataloader_kwargs=dataloader_kwargs,  # kwargs for the dataloader
)