Columnar analysis utils for high-energy physics data analysis.
Project description
F9 Columnar
A PyTorch-based library for processing ROOT and HDF5 event data in high energy physics.
Project description
F9Columnar is a Python library designed to streamline the processing of physics datasets from ROOT and HDF5 files for machine learning applications. Built on top of PyTorch, Uproot and Awkward Array, it provides a modular framework for efficient data loading, transformation, and analysis in high-energy physics research.
The library bridges the gap between traditional high-energy physics data formats (ROOT) and modern machine learning workflows. It enables physicists and data scientists to:
- Process ROOT and HDF5 datasets using PyTorch DataLoaders
- Apply event selections, calculate variables, and create histograms
- Convert ROOT data to HDF5 format for ML training
Check out the PyHEP 2025 talk for a good overview of what the library can do.
Setup
The library comes without any PyTorch dependencies by default to allow flexible installations. You can install it with or without PyTorch GPU support depending on your needs.
Install with PyTorch for GPU
pip install f9columnar
pip install torch
Install with PyTorch for CPU
pip install f9columnar
pip install torch --index-url https://download.pytorch.org/whl/cpu
Library overview
- Aim of the library
- Getting started
- Basic data loading
- Using processors and histogramming
- Converting ROOT to HDF5
- Feature scaling
- ML DataLoaders
- Data augmentation
- Inference server
Aim of the library
The main goal of this library is to provide a simple and efficient way to process large ROOT and HDF5 datasets using PyTorch DataLoaders. The library is designed to be modular and extensible, allowing users to easily add new processors and functionality as needed. A common workflow is illustrated in the diagram below:
Getting started
To illustrate the basic usage of the library, we need some ROOT files. We can create artificial ROOT files using the AwkwardEventGenerator utility. This utility allows us to define flat and jagged branches with different distributions.
Let's create some artificial ROOT files with 100,000 events, containing electrons and muons with transverse momentum (pt), pseudorapidity (eta), and azimuthal angle (phi) distributions, and a flat weight distribution.
from f9columnar.utils.ak_events import AwkwardEventGenerator
gen = AwkwardEventGenerator(n_events=100000, min_particles=0, max_particles=5)
flat_spec = {"weight": {"dist_type": "normal", "params": {"mean": 1.0, "stddev": 0.1}}}
jagged_spec = {
"el": {
"pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 3.0e5, "n": 5.0}},
"eta": {"dist_type": "eta", "params": {}},
"phi": {"dist_type": "phi", "params": {}},
},
"mu": {
"pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 5.0e5, "n": 5.0}},
"eta": {"dist_type": "eta", "params": {}},
"phi": {"dist_type": "phi", "params": {}},
},
}
arrays = gen.from_spec_to_dict(flat_spec=flat_spec, jagged_spec=jagged_spec)
gen.to_root(arrays, file_path="data/simpleNTuple.root", tree_name="physics", n_splits=5)
This will create 5 ROOT files named simpleNTuple_part<idx>.root each having 20,000 events in the data/ directory with artificial event data. We can now use the library to load and process this test data.
Basic data loading
The most basic usage of the library is to load data from ROOT files using the get_root_dataloader function. This function returns a PyTorch DataLoader that yields batches of events as Awkward arrays.
import glob
from f9columnar.root_dataloader import get_root_dataloader
def filter_branch(branch: str) -> bool:
# select only these two branches
return branch == "el_pt" or branch == "mu_pt"
# root_dataloader is an instance of a torch DataLoader that uses an IterableDataset
root_dataloader, total, _ = get_root_dataloader(
name="awkwardEvents", # name identifier
files=glob.glob("data/simpleNTuple_part*.root"), # root files
key="physics", # root file tree name
step_size=1024, # number of events per array batch to read into memory
num_workers=0, # number of workers for parallel processing (single threaded if 0)
processors=None, # arbitrary calculations on arrays
filter_name=filter_branch, # filter branches
)
# loop over batches of events from .root file(s), each batch is an awkward array
for events in root_dataloader:
arrays, reports = events # reports includes useful information about the batch
el_pt, mu_pt = arrays["el_pt"], arrays["mu_pt"] # awkward arrays of electron and muon pT
# ... do something with the arrays
Using processors and histogramming
The following example demonstrates how to define variables, apply a cut, and create a histogram using processors.
Calculations on arrays within worker processes can be performed using a Processor. Multiple processors can be linked together in a ProcessorsGraph, forming a directed acyclic graph (DAG). These processors are applied to arrays in the sequence determined by the DAG's topological order.
Each worker executes the same processor graph on batches of event data and returns the results to the event loop once processing is complete.
Lets make a simple analysis that performs a pT cut on leptons, selects a leading lepton (highest pT), and creates a histogram of the leading lepton pT.
First, we define the processors:
import awkward as ak
from f9columnar.processors import CheckpointProcessor, Processor, ProcessorsGraph
from f9columnar.processors_collection import decorate_branches
@decorate_branches("el_pt", "el_eta", "el_phi", "mu_pt", "mu_eta", "mu_phi", "weight")
class CutLeptons(Processor):
def __init__(self, name: str, pt_cut: float) -> None:
super().__init__(name)
self.pt_cut = pt_cut
def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
# apply pt cut particle mask on electrons and muons
mask_el, mask_mu = arrays["el_pt"] > self.pt_cut, arrays["mu_pt"] > self.pt_cut
# mask arrays to keep only leptons passing the pt cut
for field in ["el_pt", "el_eta", "el_phi"]:
arrays[field] = arrays[field][mask_el]
for field in ["mu_pt", "mu_eta", "mu_phi"]:
arrays[field] = arrays[field][mask_mu]
# return must be a dictionary with key name for the argument of the next processor
return {"arrays": arrays}
class LeadingLeptons(Processor):
def __init__(self, name: str) -> None:
super().__init__(name)
def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
# get leading electron (highest pt)
el_pt_argmax = ak.argmax(arrays["el_pt"], axis=1, keepdims=True)
for field in ["el_pt", "el_eta", "el_phi"]:
arrays[field] = arrays[field][el_pt_argmax]
arrays[field] = ak.drop_none(arrays[field])
# get leading muon (highest pt)
mu_pt_argmax = ak.argmax(arrays["mu_pt"], axis=1, keepdims=True)
for field in ["mu_pt", "mu_eta", "mu_phi"]:
arrays[field] = arrays[field][mu_pt_argmax]
arrays[field] = ak.drop_none(arrays[field])
return {"arrays": arrays}
The decorate_branches decorator is used to specify the branches that the processor depends on. This allows the dataloader to only load the necessary branches from the ROOT files.
Making a histogram processor is also straightforward:
from f9columnar.histograms import HistogramProcessor
class LeadingLeptonHistogram(HistogramProcessor):
def __init__(self, name: str) -> None:
super().__init__(name)
self.make_hist1d("leading_el_pt", 50, 20.0, 300.0) # num. bins, min, max
self.make_hist1d("leading_mu_pt", 50, 20.0, 500.0) # num. bins, min, max
def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
# fill histogram with leading lepton pT in GeV
leading_el_pt, leading_mu_pt = arrays["el_pt"] / 1e3, arrays["mu_pt"] / 1e3
non_empty_el = ak.num(leading_el_pt) > 0
leading_el_pt = ak.firsts(leading_el_pt[non_empty_el])
el_weight = arrays["weight"][non_empty_el]
# fill electron histogram with pt and weights
self.fill_hist1d("leading_el_pt", leading_el_pt, el_weight)
non_empty_mu = ak.num(leading_mu_pt) > 0
leading_mu_pt = ak.firsts(leading_mu_pt[non_empty_mu])
mu_weight = arrays["weight"][non_empty_mu]
# fill muon histogram with pt and weights
self.fill_hist1d("leading_mu_pt", leading_mu_pt, mu_weight)
return {"arrays": arrays}
We can now build the processor graph from the defined processors:
from f9columnar.processors import CheckpointProcessor, ProcessorsGraph
from f9columnar.processors_collection import ProcessorsCollection
# define a collection of processors for bookkeeping
analysis_collection = ProcessorsCollection("leptonCollection")
# add processors to the collection
analysis_collection += CutLeptons(name="cutLeptons", pt_cut=25.0) # pt cut at 25 GeV
analysis_collection += LeadingLeptons(name="leadingLeptons")
analysis_collection += LeadingLeptonHistogram(name="leadingLeptonHistogram")
# define the processor graph
analysis_graph = ProcessorsGraph()
# build a linear chain of processors
analysis_graph.add(
CheckpointProcessor("input"),
*analysis_collection.as_list(),
CheckpointProcessor("output"),
)
# if the graph is a linear chain we can use chain() to connect the nodes
# for more complex graphs use connect() method that defines edges explicitly
analysis_graph.chain()
# draw the graph with pygraphviz
analysis_graph.draw("analysis_graph.pdf")
We now have a processor graph that can be passed into the DataLoader to process events. The graph will be executed in each worker process for each batch of events. Instead of using a simple get_root_dataloader function, we can create a RootPhysicsDataset and initialize the dataloader with the processor graph:
from f9columnar.dataset_builder import RootPhysicsDataset
# make one or more root datasets as a list of RootPhysicsDataset objects
root_datasets = [
RootPhysicsDataset(
name="awkwardEvents",
root_files=glob.glob("data/simpleNTuple_part*.root"),
is_data=False,
)
]
# analysis collection provides the branch name filter automatically
branch_name_filter = analysis_collection.get_branch_name_filter()
# setup dataloader configuration
dataloader_config = {
"step_size": 1024,
"key": "physics",
"num_workers": 0,
"filter_name": branch_name_filter,
}
# setup and initialize the dataloader for the root dataset
root_datasets[0].setup_dataloader(**dataloader_config)
# runs get_root_dataloader() internally
root_datasets[0].init_dataloader(analysis_graph)
We have build a processor graph (the actual calculations) and a dataloader that uses the graph to process events. We can now run the event loop over batches of events:
from f9columnar.run import ColumnarEventLoop
# define the event loop over the root datasets
event_loop = ColumnarEventLoop(
mc_datasets=root_datasets,
data_datasets=None,
cut_flow=False,
save_path="results/",
)
# run it!
event_loop.run(mc_only=True)
That's it! After running the event loop, the histograms will be saved in the results/ directory as awkwardEvents_mc_0_hists.p.
Converting ROOT to HDF5
The library includes Hdf5WriterProcessor - a Processor that converts ROOT ntuples into HDF5 files suitable for ML training. It supports both flat (event-level) and jagged (variable-length, e.g. per-jet) arrays, with two storage modes for jagged data.
Storage modes
Padded mode (varlen=False): jagged arrays are stored as fixed-size 2D arrays (n_events, max_length), with short sequences filled using a configurable pad_value.
Variable-length mode (varlen=True): jagged arrays are flattened into 1D packed arrays, with boundaries tracked by a cumulative-lengths dataset (culens). For example, an object group jets with culens = [0, 3, 5] means event 0 owns jets at indices 0-2 and event 1 owns jets at indices 3-4. This avoids wasting space when sequence lengths vary significantly.
Pile-based shuffling
Output is split across multiple HDF5 "pile" files (p0.hdf5, p1.hdf5, ..., p{n-1}.hdf5). During writing, each chunk of events is assigned to a pile either randomly (pile_assignment="random") or in round-robin order (pile_assignment="deque"). During reading, piles are loaded independently - the ML dataloader shuffles within each pile, giving efficient pseudo-random access without loading the full dataset into memory.
-- writing step --
for batch of events from ROOT file do
for chunk of size chunk_shape in batch do
j <- pile index (random or round-robin)
append chunk to pile p[j]
-- reading step --
for pile p[j] in selected range do
load p[j], shuffle in memory
yield batches to DataLoader
Valid masks
The writer can produce a boolean valid column inside each jagged object struct. When valid=True, slots holding real data are marked True and padding slots are False. Passing a dict (e.g. valid={"jets": ("jet_type", [2])}) additionally filters by a type field - only slots where jet_type == 2 are marked valid.
Example usage
Hdf5WriterProcessor is a regular Processor that runs inside the processor graph (alongside any custom transformation processors) rather than as a separate postprocessor:
from f9columnar.dataset_builder import RootPhysicsDataset
from f9columnar.hdf5_writer import Hdf5WriterProcessor
from f9columnar.processors import CheckpointProcessor, ProcessorsGraph
from f9columnar.run import ColumnarEventLoop
# build the processor graph -- the writer sits in the chain like any other processor
analysis_graph = ProcessorsGraph()
analysis_graph.add(
CheckpointProcessor("input"),
*analysis_collection.as_list(), # custom processors (weights, labels, scaling, ...)
Hdf5WriterProcessor(
output_files_path="output/hdf5/",
flat_column_names=["ptl1", "ptl2", "mll", "weights", "label_type"],
jagged_column_names={
"jets": ["jet_e", "jet_pt", "jet_eta", "jet_phi"],
},
default_chunk_shape=32,
max_lengths={"jets": 15},
pad_values={"jets": 999.0},
sort_by_variable={"jets": "jet_pt"}, # descending sort before truncation
n_piles=256,
pile_assignment="random",
enforce_dtypes={"nbjets": "int64"},
varlen=True, # variable-length storage
enforce_varlen_max_length=False,
valid={"jets": ("jet_type", [2])}, # per-slot valid mask filtered by jet_type
extra_metadata={"scale": {"jet_e": 1.0e-3}},
append=False,
compression="lz4", # forwarded to h5py.create_dataset
),
CheckpointProcessor("output", save_arrays=True),
)
analysis_graph.chain()
# create ROOT datasets
mc_dataset = RootPhysicsDataset("MC", [str(f) for f in mc_files], is_data=False)
mc_dataset.setup_dataloader(**dataloader_config)
mc_dataset.init_dataloader(processors=analysis_graph)
# run the event loop
event_loop = ColumnarEventLoop(
mc_datasets=[mc_dataset],
data_datasets=None,
cut_flow=False,
)
event_loop.run(mc_only=True)
Output HDF5 structure
Each pile file (p0.hdf5, ...) contains:
/events # flat event-level structured array (n_events,)
fields: ptl1, ptl2, mll, weights, label_type, ...
/jets # varlen: packed 1D (total_jets,) | padded: 2D (n_events, max_length)
fields: jet_e, jet_pt, jet_eta, jet_phi, valid
/jets_culens # varlen only: (n_events+1,) cumulative offsets, dtype int64
# jets[culens[i]:culens[i+1]] = jets for event i
/metadata # JSON string - pad_values, max_lengths, sort_by_variable,
# varlen, enforce_varlen_max_length, extra_metadata, ...
Feature scaling
The library fits sklearn-based scalers over HDF5 (or ROOT) datasets so that scaling works on data that doesn't fit in memory.
Numerical scalers
The following numerical scaler types are supported (passed as numer_scaler_type):
| Type | sklearn class | Supports partial fit | NN scaler |
|---|---|---|---|
"minmax" |
MinMaxScaler |
yes | NNMinMaxScaler |
"maxabs" |
MaxAbsScaler |
yes | - |
"standard" |
StandardScaler |
yes | NNStandardScaler |
"robust" |
RobustScaler |
no | - |
"quantile" |
QuantileTransformer |
no | - |
"power" |
PowerTransformer |
no | - |
"logit" |
MinMaxScaler -> LogitTransform |
yes | NNLogitScaler |
"standard_logit" |
MinMaxScaler -> LogitTransform -> StandardScaler |
yes | NNStandardLogitScaler |
When a scaler does not support partial fit, all events (up to n_max) are accumulated before fitting. Scalers that support partial fit are updated incrementally per batch.
Categorical scaler
Setting categ_scaler_type="categorical" enables a custom CategoricalFeatureScaler (a LabelEncoder variant that supports partial fitting). It maps each unique category value to a consecutive integer per column, and can be updated incrementally as new batches arrive.
Either scaler can be disabled by passing None for its type. At least one must be enabled.
Single-loader usage
from f9columnar.ak_collate_fn import CollateConfig
from f9columnar.hdf5_dataloader import get_hdf5_dataloader
from f9columnar.ml.dataset_scaling import DatasetScaler
collate_config = CollateConfig(
modalities={
"events": ["mll", "met", "metphi"],
"jets": ["jet_pt", "jet_eta", "jet_phi"],
},
modalities_metadata={"events": ["eventNumber", "weights"]},
layout="varlen",
)
loader, _ = get_hdf5_dataloader(
files=hdf5_pile_files,
collate_config=collate_config,
step_size=100_000,
num_workers=4,
batch_size=4096,
)
ds_scaler = DatasetScaler(
dataloader=loader,
modalities=collate_config.modalities,
modalities_metadata=collate_config.modalities_metadata,
numer_scaler_type="standard", # numerical scaler (or None to disable)
categ_scaler_type="categorical", # categorical scaler (or None to disable)
scaler_save_path="scalers/", # output directory
n_max=1_000_000, # cap for non-partial-fit scalers (None = all)
extra_hash="v2", # optional suffix on scaler filenames
scaler_kwargs={"partial_fit": True, "copy": True},
)
ds_scaler.feature_scale()
The driver writes one pickle per (modality, scaler kind). For varlen/padded modalities the padding mask is applied automatically - padded slots are not included in the fit.
Multi-source usage
When you train with several HDF5/ROOT sources (LightningCombinedSourcesDataModule), use MultiSourceDatasetScaler to fit one scaler set per source from the same YAML config used at training time:
from f9columnar.ml.dataset_scaling import MultiSourceDatasetScaler
ms_scaler = MultiSourceDatasetScaler(
config=sources_yaml, # path/dict/DictConfig - same schema as the DataModule
extra_hash="v2",
extra_processors=[CustomProcessor], # ROOT-source processor classes (optional)
)
ms_scaler.feature_scale()
Each source's feature_scaling block specifies the scaler types and save path:
sources:
- name: 2L_Run2
feature_scaling:
numer_scaler_type: standard
categ_scaler_type: null
save_path: scalers/Run2/
n_max: null
scaler_params: { partial_fit: true, copy: true }
MultiSourceDatasetScaler internally runs a deterministic, augmentation-free pass over each source.
NN scalers (torch nn.Module wrappers)
The fitted sklearn scalers are saved as pickle files, but at inference time (and during training) you typically want pure PyTorch operations that live inside the model graph. nn_scalers.py provides nn.Module wrappers that extract the fitted statistics from sklearn scalers and register them as non-trainable buffers, so they move with the model across devices and are included in state_dict / ONNX exports.
Each scaler exposes both forward() (scale) and inverse() (unscale).
The composite NNDatasetScaler wraps all per-modality scalers into a single module. It takes a fitted scaler utility plus a ColumnSelection describing which columns are numerical vs categorical and applies the correct scaler to each feature column:
from f9columnar.ml.nn_scalers import NNDatasetScaler
nn_scaler = NNDatasetScaler(scaler_util, selection)
# forward: raw features -> scaled features
X_events_scaled, Xs_scaled = nn_scaler(X_events, [X_jets, X_electrons])
# inverse: scaled features -> raw features
X_events_raw, Xs_raw = nn_scaler.inverse(X_events_scaled, Xs_scaled)
Padded rows are preserved - the scaler computes a padding mask before applying transforms and restores the pad value afterwards.
ML DataLoaders
The library exposes three PyTorch Lightning DataModules built on top of the same awkward-array to tensor collate pipeline:
| DataModule | Source | Use case |
|---|---|---|
LightningHdf5DataModule |
one HDF5 dataset (pile files) | single-source training on preconverted data |
LightningRootDataModule |
one ROOT dataset | training directly from ROOT files, no HDF5 conversion |
LightningCombinedSourcesDataModule |
many HDF5 / ROOT sources mixed | multi-source training (regions, runs, MC campaigns) from a single YAML |
All three share a common collation model (CollateConfig) and produce per-modality ModalityBatch tensors organised inside a SourceBatch.
Collate configuration
CollateConfig describes how an awkward batch is converted into model-ready tensors. It is the single object that drives field selection, layout, sorting, padding, augmentation, and (for ROOT) automatic branch filtering.
from f9columnar.ak_collate_fn import CollateConfig
collate_config = CollateConfig(
# per-modality field selection - modality names must match the HDF5
# object groups (or are arbitrary group names when loading from ROOT)
modalities={
"events": ["mll", "met", "metphi", "nbjets"],
"jets": ["jet_pt", "jet_eta", "jet_phi", "jet_e"],
"electrons": ["el_pt", "el_eta", "el_phi"],
},
# extra fields that travel with each batch but are NOT fed to the model
# (labels, weights, eventNumber for stage splitting, ...), available via
# `ModalityBatch.metadata`
modalities_metadata={"events": ["eventWeight", "eventNumber", "y", "w"]},
layout="varlen", # "varlen" (packed + cu_seqlens) or "padding" (2D + valid)
max_lengths={"jets": 12}, # only used when layout="padding"
pad_values={"jets": 0.0}, # only used when layout="padding"
sort_by_variable={"jets": "jet_pt"}, # optional descending sort per modality
combine_varlen_objects=True, # attach cross-modality reorder perm in metadata
augmentations={...}, # see "Data augmentation" below
)
Layouts. With layout="varlen" each jagged modality returns a packed 1D tensor of shape (total_objects,) plus a cu_seqlens index (B+1,), flat fields are (B,) tensors. With layout="padding" jagged modalities return (B, max_length) tensors plus a boolean valid mask of the same shape.
The same CollateConfig is used by every DataModule below - LightningRootDataModule additionally uses config.make_filter_fn() to skip ROOT branches that the model never asks for.
HDF5 DataModule
LightningHdf5DataModule consumes pile files produced by Hdf5WriterProcessor. Stages are split by pile index - each stage gets a disjoint subset of piles, so the same event is never seen across train/val/test.
import lightning as L
from f9columnar.ak_collate_fn import CollateConfig
from f9columnar.ml.lightning_data_module import LightningHdf5DataModule
dm = LightningHdf5DataModule(
name="ttHcc_2L",
files="output/hdf5/p*.hdf5", # glob or explicit list
collate_config=collate_config,
stage_split_piles={"train": 512, "val": 128, "test": 128}, # counts or [indices]
step_size=100_000, # entries loaded into memory per step
batch_size=4096,
num_workers=-1, # -1 = all available cores
shuffle=True, # train-only; non-train stages force False
shuffle_files=True,
shuffle_batch=True,
drop_last=True,
dataloader_kwargs={
"val_batch_size": 8192, # per-stage batch_size override
"test_batch_size": 8192,
"pin_memory": True,
"prefetch_factor": 2,
},
)
trainer = L.Trainer(...)
trainer.fit(model, datamodule=dm)
stage_split_piles accepts either explicit pile indices ({"train": [0, 1, 2, 3], "val": [4, 5]}) or counts assigned in order ({"train": 512, "val": 128}). Shuffling is automatically disabled for non-training stages.
Per-source class re-balancing can be enabled by passing an ImbalancedSampler (RandomUnderSampler, RandomOverSampler, or RandomResampleToTarget).
Direct ROOT DataModule
LightningRootDataModule trains directly from ROOT files via uproot, with no HDF5 conversion step. Stage splitting is performed inside the per-stage processor graphs (typically via a StageSplitProcessor that routes events by eventNumber % divisor), and per-class balancing is provided by PartitionedRootConfig - partitions are matched by regex over filenames and combined according to partition_mode (min_size, max_size, or max_size_cycle).
from f9columnar.ak_collate_fn import CollateConfig
from f9columnar.ml.lightning_data_module import LightningRootDataModule
from f9columnar.root_dataloader import PartitionedRootConfig
# one processor graph per stage -- usually only `stage` differs between them
def build_graph(stage: str):
graph = ProcessorsGraph()
graph.add(
CheckpointProcessor("input"),
StageSplitProcessor(branch="eventNumber", divisor=10,
splits={"train": [2,3,4,5,6,7,8,9], "val": [0], "test": [1]},
stage=stage),
LabelProcessor(labels=CLASSES),
WeightsProcessor(weights_field="eventWeight"),
ScaleBranchProcessor(scale_dct={"jet_pt": 1e-3, "jet_e": 1e-3}),
)
graph.chain()
return graph
dm = LightningRootDataModule(
name="phys_root",
files=glob.glob("data/*.Nominal.root"),
key="NOSYS/physics", # TTree path (REQUIRED for ROOT)
step_size=512, # uproot.iterate chunk size
num_workers=4,
shuffle=True,
stage_processors={ # one graph per stage
"train": build_graph("train"),
"val": build_graph("val"),
"test": build_graph("test"),
},
partitioned_config=PartitionedRootConfig(
# regex per partition -- one partition per class label
partitions={c: [f"^{c}\\.Nominal\\.root$"] for c in CLASSES},
partition_mode="max_size_cycle", # cycle short partitions until longest done
shuffle_partitions=True,
shuffle_batch_partitions=True,
partition_batch_size=64, # sub-batch size after concatenating partitions
partition_drop_last_batch=True,
),
collate_config=collate_config, # same CollateConfig as HDF5
dataloader_kwargs={"val_batch_size": 128, "test_batch_size": 128},
)
For non-training stages the module forces partition_mode="max_size", shuffle_partitions=False, and shuffle_batch_partitions=False so every event in val/test is seen exactly once.
When the partitioned config is not needed, you can also build a plain DataLoader with get_root_dataloader(name, files, key, step_size, num_workers, processors=..., collate_config=..., partitioned_config=...) directly - LightningRootDataModule is a thin wrapper around it.
Multi-source training (Combined Sources)
LightningCombinedSourcesDataModule mixes any number of HDF5 and/or ROOT sources, each with its own collate, scaler, augmentation, and per-source processors. The whole configuration is driven by a single YAML schema (also documented in sources.py):
defaults: &source_defaults
shuffle: true
drop_last: true
num_workers: -1
step_size: 8192
sources:
- name: 2L_Run2
id: 0
data:
<<: *source_defaults
reader: hdf5
paths: [/path/to/2L_Run2/*.h5]
pin_memory: true
collate:
modalities:
events: [run_number, mc_event_weight, label_type, weights]
jets: [jet_pt, jet_eta, jet_phi]
layout: varlen
sampling: { weight: 1.0, batch_size: 64 }
imbalanced_sampler:
sampler_name: RandomResampleToTarget
sampler_kwargs:
sampling_strategy: { ttH_cc: 4096, ttH_bb: 2048 }
class_labels: { ttH_cc: 0, ttH_bb: 1 }
label_field: y
- name: 2L_Run3
id: 1
data:
<<: *source_defaults
reader: root
paths: [/path/to/2L_Run3/*.root]
key: NOSYS/physics
partitioned:
partitions:
mc: ['^mc_.*.root$']
data: ['^data_.*.root$']
partition_mode: max_size_cycle
partition_batch_size: 64
collate:
modalities:
events: [run_number, mc_event_weight]
jets: [jet_pt, jet_eta]
layout: varlen
processors: # ROOT-only: per-source processor graph
start_graph:
chain: [CheckpointProcessor, JetSelection]
JetSelection: { pt_cut: 20.0, sel_jet: true }
sampling: { weight: 1.5, batch_size: 64 }
mixing:
combined_mode: max_size_cycle # min_size | max_size | max_size_cycle | sequential
total_batch_size: 256
weight_strategy: proportional_to_weight
Then in Python:
from f9columnar.ml.lightning_data_module import LightningCombinedSourcesDataModule
dm = LightningCombinedSourcesDataModule(
name="ttHcc_combined",
config="config/sources.yaml", # path, dict, DictConfig, or SourcesConfig
extra_processors=[CustomProcessor], # Processor classes referenced by name in YAML
)
Per-source batch sizes are derived from mixing.weight_strategy:
user_specified- each source'ssampling.batch_sizeis used as-is (total_batch_sizeignored).uniform-total_batch_sizeis split equally across sources.proportional_to_weight- split in proportion tosampling.weight.proportional_to_size- split in proportion to each source's event count.
The DataModule wraps the per-source loaders in a Lightning CombinedLoader, so each training step yields a dict {source_name: SourceBatch}. combined_mode controls when an epoch ends (shortest source, longest, or cycle the others). Non-train stages disable shuffles and augmentations automatically.
Data augmentation
f9columnar.ml.augmentation provides physics-aware awkward-array augmentations that run inside the collate step, before tensor conversion. Available transforms:
| Spec | Effect |
|---|---|
ConstituentDropoutSpec(p) |
drop jagged constituents independently with probability p |
PhiRotationSpec(fields=...) |
global per-event phi rotation (synchronised across all modalities to preserve angular correlations) |
PtSmearingSpec(fields=..., a, b) |
multiplicative pT smearing |
AngularSmearingSpec(fields=..., sigma) |
additive Gaussian noise on eta and phi |
FlipSignSpec(fields=..., prob) |
per-event sign flip with probability prob |
Augmentations are declared per modality (under CollateConfig.augmentations or the YAML collate.augmentations) and combined by ModalityAugmentationSpec:
augmentations = {
"jets": {
"constituent_dropout": 0.1,
"phi_rotation": {"fields": ["jet_phi"]},
"pt_smearing": {"fields": ["jet_pt"], "a": 0.05, "b": 0.01},
"angular_smearing": {"fields": ["jet_eta", "jet_phi"], "sigma": 0.005},
},
}
collate_config = CollateConfig(..., augmentations=augmentations, augmentation_strength=1.0)
Inference server
f9columnar.ml.inference_server.InferenceServer is a dedicated GPU process that hosts one or more trained models and serves inference requests sent from PyTorch DataLoader workers via multiprocessing queues. This sidesteps the usual CUDA / fork-safety issues when workers try to run CUDA themselves and allows MLScoreProcessor-style processors to call a model inside the columnar pipeline.
import torch
from f9columnar.ml.inference_server import InferenceServer
model = torch.load("checkpoints/model.pt").eval()
server = InferenceServer(
models={"classifier": model},
device="cuda:0",
num_workers=4, # must match DataLoader num_workers
multiprocessing_context="forkserver",
to_numpy=True, # convert outputs to numpy on return
)
server.start()
# inside a processor that runs in a DataLoader worker:
scores = self.inference_server.infer(X, *Xs, model_key="classifier")
# tear down at the end of the run
server.shutdown()
Inputs may be tensors or arbitrarily nested lists / tuples / dicts of tensors. The server batches them, runs model.forward (optionally under torch.autocast for bf16/fp16), and returns the result to the originating worker.
Projects using F9Columnar
- SeeSawML - multi-source, multi-modal and multi-task ML training framework, built directly on the ROOT/HDF5 dataloaders, scalers, augmentations, and inference server described above.
Citation
If you found F9Columnar useful, please consider citing its Zenodo record 10.5281/zenodo.19312298.
You can find an example bibtex entry below.
@software{F9Columnar,
author = {Gavranovic, Jan},
title = {F9Columnar: A PyTorch-Based Library for High Energy Physics Data Processing and Analysis},
publisher = {Zenodo},
doi = {10.5281/zenodo.19312298},
url = {https://doi.org/10.5281/zenodo.19312298},
}
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file f9columnar-0.5.0.tar.gz.
File metadata
- Download URL: f9columnar-0.5.0.tar.gz
- Upload date:
- Size: 256.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.14 {"installer":{"name":"uv","version":"0.11.14","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a00fc3f8581daf38c5682c5684b905c4b85eeb6d2c2637e6d5396a8b858e4d6c
|
|
| MD5 |
cc6838f4a7dc6b41a81b603dca105634
|
|
| BLAKE2b-256 |
97832d7df2d2f17e29b62b4639b5d0ed1597955dfa65342f4b3a1933adce9226
|
File details
Details for the file f9columnar-0.5.0-py3-none-any.whl.
File metadata
- Download URL: f9columnar-0.5.0-py3-none-any.whl
- Upload date:
- Size: 277.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.14 {"installer":{"name":"uv","version":"0.11.14","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
37b6b2dc7f9f7c3a2a04c9dc2392c14c939b3067f0a9fff9a5dcc71cb6b2d006
|
|
| MD5 |
29adfe3fc874ee2329206b3c90f48b06
|
|
| BLAKE2b-256 |
df5f245a22db3ce1214cd97d162ce254ea15c54162eb9b7587193b60b167be1c
|