F9 Columnar

A lightweight Python library for processing ROOT and HDF5 event data in high energy physics.
This library is designed for efficient handling of large datasets. Built on PyTorch, Awkward Array, and Uproot, it utilizes PyTorch's DataLoader with an IterableDataset to enable parallel processing. It implements a columnar event loop, returning batches of events in a format compatible with standard PyTorch training loops over multiple epochs.
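The columnar event loop follows a standard PyTorch pattern. As a minimal, self-contained sketch (not the f9columnar implementation; the class name and chunking scheme here are illustrative), an IterableDataset can yield event chunks and split work across DataLoader workers like this:

```python
import torch
from torch.utils.data import DataLoader, IterableDataset, get_worker_info


class ColumnarChunks(IterableDataset):
    """Yields fixed-size chunks of events; stands in for a columnar file reader."""

    def __init__(self, n_events=1000, step_size=100):
        super().__init__()
        self.n_events = n_events
        self.step_size = step_size

    def __iter__(self):
        info = get_worker_info()
        # split chunks across workers so each processes a disjoint slice
        worker_id = info.id if info is not None else 0
        num_workers = info.num_workers if info is not None else 1
        chunk_starts = range(0, self.n_events, self.step_size)
        for chunk_idx, start in enumerate(chunk_starts):
            if chunk_idx % num_workers != worker_id:
                continue
            yield torch.arange(start, min(start + self.step_size, self.n_events))


# batch_size=None passes each yielded chunk through without extra collation
loader = DataLoader(ColumnarChunks(), batch_size=None, num_workers=0)
for batch in loader:
    pass  # each batch is one chunk of events
```

With `num_workers > 0`, PyTorch runs one copy of the dataset iterator per worker process, which is what enables parallel reading.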
Optimized for machine learning applications, the library provides RootDataLoader and Hdf5DataLoader classes for loading data from ROOT and HDF5 files. It also supports parallel data processing through a modular pipeline of processor classes, allowing users to chain operations for complex computations and histogramming.
See the PyHEP 2025 talk for a general overview of the library and its features.
Setup
Install with PyTorch GPU:

```shell
pip install f9columnar[torch]
```

Install with PyTorch CPU (recommended):

```shell
pip install f9columnar
pip install torch --index-url https://download.pytorch.org/whl/cpu
```

Install without PyTorch:

```shell
pip install f9columnar
```
Examples
- Aim of the library
- Getting started
- Using processors and histogramming
- Converting ROOT to HDF5
- Feature scaling
- HDF5 ML DataLoader
Aim of the library
The main goal of this library is to provide a simple and efficient way to process large ROOT and HDF5 datasets using PyTorch DataLoaders. The library is designed to be modular and extensible, allowing users to easily add new processors and functionality as needed. A common workflow is illustrated in the diagram below:
Getting started
The most basic usage of the library is to load data from ROOT files using the get_root_dataloader function. This function returns a PyTorch DataLoader that yields batches of events as Awkward Arrays.
```python
from f9columnar.root_dataloader import get_root_dataloader


def filter_branch(branch):
    # select only these two branches
    return branch == "tau_p4" or branch == "lephad_p4"


# root_dataloader is an instance of a torch DataLoader that uses an IterableDataset
root_dataloader, total = get_root_dataloader(
    name="data",  # name identifier
    files=files,  # ROOT files
    key="NOMINAL",  # ROOT file tree name
    step_size=10**5,  # number of events per array batch to read into memory
    num_workers=12,  # number of workers for parallel processing
    processors=None,  # arbitrary calculations on arrays
    filter_name=filter_branch,  # filter branches
)

# loop over batches of events from .root file(s); each batch is an Awkward Array
for events in root_dataloader:
    arrays, report = events
    # ... do something with the arrays
```
Using processors and histogramming
The following example demonstrates how to define variables, apply a cut, and create a histogram using processors.
Calculations on arrays within worker processes can be performed using a Processor. Multiple processors can be linked together in a ProcessorsGraph, forming a directed acyclic graph (DAG). These processors are applied to arrays in the sequence determined by the DAG’s topological order.
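The topological ordering that fixes the execution sequence can be illustrated with Python's standard library (the node names here are illustrative; f9columnar computes the ordering internally):

```python
# Toy sketch of topological ordering for a processor DAG.
from graphlib import TopologicalSorter

# map each processor to the set of processors it depends on
dag = {
    "vis_mass": {"input"},
    "vis_mass_cut": {"vis_mass"},
    "output": {"vis_mass_cut"},
}

# every processor runs only after all of its dependencies
order = list(TopologicalSorter(dag).static_order())
```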
Each worker executes the same processor graph on batches of event data and returns the results to the event loop once processing is complete. In the example above, 12 (num_workers) processor graphs would be running in parallel, each handling small batches of events. Below is an example demonstrating how to calculate the tau visible mass and apply a cut to this variable.
```python
from f9columnar.processors import ProcessorsGraph, CheckpointProcessor
from f9columnar.object_collections import Variable, VariableCollection, Cut, CutCollection
from f9columnar.histograms import HistogramProcessor


class VisibleMass(Variable):  # Variable is a Processor
    name = "vis_mass"  # processor name
    branch_name = "lephad_p4"  # name of the branch in the .root file

    def __init__(self):
        super().__init__()

    def run(self, arrays):  # each processor must implement a run method
        lephad_p4 = arrays[self.branch_name]  # branch_name is the name of the field in the ak array
        v = get_kinematics_vector(lephad_p4)  # use vector with px, py, pz and E
        arrays["tau_vis_mass"] = v.m  # add a new field to the arrays
        return {"arrays": arrays}  # return the arrays (can also return None if no changes are made)


class CutVisibleMass(Cut):  # Cut is a Processor
    name = "vis_mass_cut"
    branch_name = None  # not a branch in the ntuples but defined in the VisibleMass processor

    def __init__(self, cut_lower, cut_upper):  # arguments of the processor
        super().__init__()
        self.cut_lower = cut_lower
        self.cut_upper = cut_upper

    def run(self, arrays):
        mask = (arrays["tau_vis_mass"] > self.cut_lower) & (arrays["tau_vis_mass"] < self.cut_upper)
        arrays = arrays[mask]  # apply the cut
        return {"arrays": arrays}  # return must be a dict keyed by the argument name of the next processor


class Histograms(HistogramProcessor):  # HistogramProcessor is a Processor
    def __init__(self, name="histograms"):
        super().__init__(name)
        self.make_hist1d("tau_vis_mass", 20, 80.0, 110.0)  # histogram with 20 bins from 80 to 110 GeV

    def run(self, arrays):
        return super().run(arrays)  # auto fills histograms if array names match histogram names


var_collection = VariableCollection(VisibleMass, init=False)  # will initialize later
cut_collection = CutCollection(CutVisibleMass, init=False)

collection = var_collection + cut_collection  # add collections of objects together
branch_filter = collection.branch_name_filter  # defines the branches that the processors depend on

graph = ProcessorsGraph()  # graph has a fit method that gets called inside the root_dataloader

# add nodes to the graph
graph.add(
    CheckpointProcessor("input"),  # input node
    var_collection["vis_mass"](),  # initialize the processor
    cut_collection["vis_mass_cut"](cut_lower=90.0, cut_upper=100.0),
    CheckpointProcessor("output", save_arrays=True),  # saves final arrays
    Histograms(),
)

# build a processor graph
graph.connect(
    [
        ("input", "vis_mass"),
        ("vis_mass", "vis_mass_cut"),
        ("vis_mass_cut", "output"),
        ("output", "histograms"),
    ]
)

# plot the graph
graph.draw("graph.pdf")

# ... pass into the root_dataloader with the processors argument (e.g. processors=graph)
# in this case the dataloader will return a fitted graph
for processed_graph in dataloader:
    histograms = processed_graph["histograms"].hists
    arrays = processed_graph["output"].arrays
    # ... do something with the histograms and arrays
```
A higher level of abstraction is also available through the ColumnarEventLoop class. See the benchmark examples for more details.
Converting ROOT to HDF5
The library also includes a utility to convert ROOT files to HDF5 format.
Event shuffling is supported to randomize the order of events in the output HDF5 file. This is particularly useful for machine learning applications where data order can impact training. This is done using a 2-pass shuffling algorithm that is memory efficient and works well with large datasets:
```
-- writing step --
create empty pile datasets p[0], p[1], ..., p[n - 1]
for each step-size batch of events x from a ROOT file do
    for each chunk c of events in x do
        j ← random integer in [0, n - 1]
        append c to p[j]

-- reading step --
for j in [n1, n2] where 0 ≤ n1 < n2 ≤ n do
    read all events from p[j] into memory
    shuffle p[j] in memory
    for each batch of events x in p[j] do
        yield x to the DataLoader
```
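The two passes can be sketched in a few lines of plain Python, using in-memory lists in place of HDF5 pile datasets (pile count, chunk size, and batch size here are illustrative):

```python
import random


def write_piles(events, n_piles, chunk_size):
    # writing step: append fixed-size chunks to randomly chosen piles
    piles = [[] for _ in range(n_piles)]
    for i in range(0, len(events), chunk_size):
        j = random.randrange(n_piles)
        piles[j].extend(events[i : i + chunk_size])
    return piles


def read_shuffled(piles, batch_size):
    # reading step: shuffle each pile in memory, then yield batches
    for pile in piles:
        random.shuffle(pile)
        for i in range(0, len(pile), batch_size):
            yield pile[i : i + batch_size]


events = list(range(10_000))
piles = write_piles(events, n_piles=16, chunk_size=64)
shuffled = [e for batch in read_shuffled(piles, batch_size=128) for e in batch]
assert sorted(shuffled) == events  # every event appears exactly once
```

Only one pile has to fit in memory at a time, which is what makes the scheme scale to datasets much larger than RAM.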
Below is an example of how to use the writing utility. In the example, the dataloader loop is abstracted away using the ColumnarEventLoop class.
```python
from f9columnar.ml.hdf5_writer import Hdf5WriterPostprocessor

# we will make a linear chain of processors
analysis_graph = ProcessorsGraph()
analysis_graph.add(
    CheckpointProcessor("input"),
    *analysis_collection.as_list(),  # add a collection of processors that do some calculations
    CheckpointProcessor("output", save_arrays=True),  # save arrays at the end of the chain for the hdf5 writer
)
analysis_graph.chain()
analysis_graph.draw("hdf_analysis_graph.pdf")

# create the hdf5 writer postprocessor
# a postprocessor is a special processor that runs after the main processor graph and is executed in the main process
# ProcessorsGraph can be thought of as a map step and PostprocessorsGraph as a reduce step
hdf5_writer = Hdf5WriterPostprocessor(
    output_file,
    flat_column_names=["tau_vis_mass"],  # flat columns to save
    jagged_column_names=None,  # jagged columns to save, padded with a pad value if needed
    chunk_shape=512,  # shape of chunks to write to the hdf5 file
    n_piles=1024,  # number of piles to use for shuffling
    pile_assignment="random",  # how to assign events to piles, random or deque
    merge_piles=False,  # merge piles into a single hdf5 file at the end
    enforce_dtypes=None,  # enforce specific dtypes for columns
    save_node="output",  # name of the node in the processor graph to save arrays from
)

# create a linear chain of postprocessors
postprocessors_graph = PostprocessorsGraph()
postprocessors_graph.add(
    CheckpointPostprocessor("input"),
    hdf5_writer,
)
postprocessors_graph.chain()
postprocessors_graph.draw("hdf_post_analysis_graph.pdf")

# we can make a ROOT dataset for MC and data file lists
data_dataset = RootPhysicsDataset("Data", [str(f) for f in data_files], is_data=True)
mc_dataset = RootPhysicsDataset("MC", [str(f) for f in mc_files], is_data=False)

# get the branch filter from the analysis collection to avoid loading unnecessary branches
branch_filter = analysis_collection.branch_name_filter

# setup the data dataloader
data_dataset.setup_dataloader(**dataloader_config)
data_dataset.init_dataloader(processors=analysis_graph)

# setup the mc dataloader
mc_dataset.setup_dataloader(**dataloader_config)
mc_dataset.init_dataloader(processors=analysis_graph)

# run the DataLoader event loop over batches of events for both datasets
event_loop = ColumnarEventLoop(
    mc_datasets=[mc_dataset],  # supports multiple datasets
    data_datasets=[data_dataset],  # supports multiple datasets
    postprocessors_graph=postprocessors_graph,
    fit_postprocessors=True,
    cut_flow=False,
)
event_loop.run()  # iterates over batches of events in both datasets

postprocessors_graph["hdf5WriterPostprocessor"].close()  # close the file handles
```
Note that variables ending in *_type have a special meaning in the HDF5 writer. For example, a variable named label_type can be used to mark the type of an event (e.g., signal or background) and will be saved as a column in the HDF5 file, which is useful for classification tasks. Another special variable is weights, which stores event weights in the HDF5 file. These special variables help organize the data for training machine learning models.
Feature scaling
The library also includes a utility to scale the entire HDF5 dataset using sklearn scalers. This is particularly useful for machine learning applications where feature scaling can improve model performance.
```python
from f9columnar.ml.dataset_scaling import DatasetScaler

ds_scaler = DatasetScaler(
    files,  # list of hdf5 files
    scaler_type,  # minmax, maxabs, standard, robust, quantile, power, logit, standard_logit
    features,  # names of the features to scale
    scaler_save_path=save_path,
    n_max=None,  # maximum number of events used to fit the scaler if it does not support partial fitting
    extra_hash="",  # extra string to add to the hash of the scaler
    scaler_kwargs=scaler_kwargs,  # kwargs for the scaler
    dataloader_kwargs=dataloader_kwargs,  # kwargs for the dataloader
)
ds_scaler.feature_scale()
```
The result will be a pickle file with the fitted scaler object that can be used in the HDF5 ML DataLoader as:
```python
feature_scaling_kwargs = {
    "scaler_type": scaler_type,
    "scaler_path": save_path,  # where the scaler was saved
    "scalers_extra_hash": "",
}
dataset_kwargs = dataset_kwargs | feature_scaling_kwargs
```
Note that categorical features use a custom LabelEncoder that supports partial fitting and can be used in an online fashion.
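The partial-fitting behaviour can be sketched as follows (a simplified stand-in for illustration, not the f9columnar LabelEncoder):

```python
class PartialFitLabelEncoder:
    """Maps categories to integers; new categories can be added chunk by chunk."""

    def __init__(self):
        self.mapping = {}

    def partial_fit(self, values):
        # assign the next free integer to each previously unseen category
        for v in values:
            if v not in self.mapping:
                self.mapping[v] = len(self.mapping)
        return self

    def transform(self, values):
        return [self.mapping[v] for v in values]


enc = PartialFitLabelEncoder()
enc.partial_fit(["e", "mu"])    # first chunk of events
enc.partial_fit(["mu", "tau"])  # a later chunk only adds the new label
```

Because already-seen categories keep their codes, the encoder can be fitted online while streaming batches from a DataLoader.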
HDF5 ML DataLoader
We will use PyTorch Lightning to demonstrate how to use the Hdf5IterableDataset in a training loop.
```python
from typing import Any, Callable

import lightning as L
from torch.utils.data import DataLoader

from f9columnar.ml.hdf5_ml_dataloader import (
    WeightedBatch,
    WeightedBatchType,
    WeightedDatasetBatch,
    default_setup_func,
    get_ml_hdf5_dataloader,
)


class LightningHdf5DataModule(L.LightningDataModule):
    def __init__(
        self,
        name: str,
        files: str | list[str],
        column_names: list[str],
        stage_split_piles: dict[str, list[int] | int],
        shuffle: bool = False,
        collate_fn: Callable[[list[WeightedBatch]], WeightedBatchType] | None = None,
        dataset_kwargs: dict[str, Any] | None = None,
        dataloader_kwargs: dict[str, Any] | None = None,
    ) -> None:
        super().__init__()
        self.dl_name = name
        self.files = files
        self.column_names = column_names
        self.stage_split_piles = stage_split_piles
        self.shuffle = shuffle
        self.collate_fn = collate_fn
        self.dataset_kwargs = dataset_kwargs
        self.dl_kwargs = dataloader_kwargs

    def _get_dataloader(self, stage: str) -> DataLoader:
        # returns a DataLoader object for the given stage, selection and number of events
        dl, _, _ = get_ml_hdf5_dataloader(
            f"{stage} - {self.dl_name}",
            self.files,
            self.column_names,
            stage_split_piles=self.stage_split_piles,
            stage=stage,
            shuffle=self.shuffle,
            collate_fn=self.collate_fn,
            dataset_kwargs=self.dataset_kwargs,
            dataloader_kwargs=self.dl_kwargs,
        )
        return dl

    def train_dataloader(self) -> DataLoader:
        return self._get_dataloader("train")

    def val_dataloader(self) -> DataLoader:
        return self._get_dataloader("val")

    def test_dataloader(self) -> DataLoader:
        return self._get_dataloader("test")


def events_collate_fn(batch: tuple[WeightedDatasetBatch, dict[str, Any]]) -> WeightedBatchType:
    ds, reports = batch[0]["events"], batch[1]
    # these are the return values from the DataLoader
    return ds.X, ds.y, ds.w, ds.y_aux, reports


# can additionally pass any kwargs to the dataset; they will be forwarded to the reports dictionary
dataset_kwargs = {
    "batch_size": 128,  # number of events per batch
    "imbalanced_sampler": None,  # supports oversampling and undersampling
    "drop_last": True,  # drop last incomplete batch
}

# custom function to process the dataset inside the DataLoader workers
# the function should be: Callable[[StackedDatasets, MLHdf5Iterator], WeightedDatasetBatch | None]
dataset_kwargs["setup_func"] = default_setup_func

dataloader_kwargs = {
    "num_workers": -1,  # use all available cores
    "prefetch_factor": 2,  # number of batches to prefetch by each worker
}

dm = LightningHdf5DataModule(
    dm_name,  # name of the datamodule
    files,  # list of hdf5 files
    features,  # names of the features to load (column names in the hdf5 file)
    stage_split_piles={"train": 512, "test": 256, "val": 256},  # number of piles for each stage
    shuffle=True,  # shuffle events at each epoch
    collate_fn=events_collate_fn,  # collate function to handle batches
    dataset_kwargs=dataset_kwargs,  # kwargs for the dataset
    dataloader_kwargs=dataloader_kwargs,  # kwargs for the dataloader
)
```