Skip to main content

Columnar analysis utils for high-energy physics data analysis.

Project description

F9 Columnar

Ruff python pytorch License pytorch DOI

A PyTorch-based library for processing ROOT and HDF5 event data in high energy physics.

Project description

F9Columnar is a Python library designed to streamline the processing of physics datasets from ROOT and HDF5 files for machine learning applications. Built on top of PyTorch, Uproot and Awkward Array, it provides a modular framework for efficient data loading, transformation, and analysis in high-energy physics research.

The library bridges the gap between traditional high-energy physics data formats (ROOT) and modern machine learning workflows. It enables physicists and data scientists to:

  • Process ROOT and HDF5 datasets using PyTorch DataLoaders
  • Apply event selections, calculate variables, and create histograms
  • Convert ROOT data to HDF5 format for ML training

Check out the PyHEP 2025 talk for a good overview of what the library can do.

Setup

The library comes without any PyTorch dependencies by default to allow flexible installations. You can install it with or without PyTorch GPU support depending on your needs.

Install with PyTorch for GPU

pip install f9columnar
pip install torch

Install with PyTorch for CPU

pip install f9columnar
pip install torch --index-url https://download.pytorch.org/whl/cpu

Library overview

Aim of the library

The main goal of this library is to provide a simple and efficient way to process large ROOT and HDF5 datasets using PyTorch DataLoaders. The library is designed to be modular and extensible, allowing users to easily add new processors and functionality as needed. A common workflow is illustrated in the diagram below:

F9Columnar

Getting started

To illustrate the basic usage of the library, we need some ROOT files. We can create artificial ROOT files using the AwkwardEventGenerator utility. This utility allows us to define flat and jagged branches with different distributions. Let's create some artificial ROOT files with 100,000 events, containing electrons and muons with transverse momentum (pt), pseudorapidity (eta), and azimuthal angle (phi) distributions, and a flat weight distribution.

from f9columnar.utils.ak_events import AwkwardEventGenerator

gen = AwkwardEventGenerator(n_events=100000, min_particles=0, max_particles=5)

flat_spec = {"weight": {"dist_type": "normal", "params": {"mean": 1.0, "stddev": 0.1}}}

jagged_spec = {
    "el": {
        "pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 3.0e5, "n": 5.0}},
        "eta": {"dist_type": "eta", "params": {}},
        "phi": {"dist_type": "phi", "params": {}},
    },
    "mu": {
        "pt": {"dist_type": "pt", "params": {"pt_min": 2.0e4, "pt_max": 5.0e5, "n": 5.0}},
        "eta": {"dist_type": "eta", "params": {}},
        "phi": {"dist_type": "phi", "params": {}},
    },
}

arrays = gen.from_spec_to_dict(flat_spec=flat_spec, jagged_spec=jagged_spec)
gen.to_root(arrays, file_path="data/simpleNTuple.root", tree_name="physics", n_splits=5)

This will create 5 ROOT files named simpleNTuple_part<idx>.root each having 20,000 events in the data/ directory with artificial event data. We can now use the library to load and process this test data.

Basic data loading

The most basic usage of the library is to load data from ROOT files using the get_root_dataloader function. This function returns a PyTorch DataLoader that yields batches of events as Awkward arrays.

import glob

from f9columnar.root_dataloader import get_root_dataloader

def filter_branch(branch: str) -> bool:
    # select only these two branches
    return branch == "el_pt" or branch == "mu_pt"

# root_dataloader is an instance of a torch DataLoader that uses an IterableDataset
root_dataloader, total, _ = get_root_dataloader(
    name="awkwardEvents",  # name identifier
    files=glob.glob("data/simpleNTuple_part*.root"),  # root files
    key="physics",  # root file tree name
    step_size=1024,  # number of events per array batch to read into memory
    num_workers=0,  # number of workers for parallel processing (single threaded if 0)
    processors=None,  # arbitrary calculations on arrays
    filter_name=filter_branch,  # filter branches
)

# loop over batches of events from .root file(s), each batch is an awkward array
for events in root_dataloader:
    arrays, reports = events  # reports includes useful information about the batch
    el_pt, mu_pt = arrays["el_pt"], arrays["mu_pt"]  # awkward arrays of electron and muon pT
    # ... do something with the arrays

Using processors and histogramming

The following example demonstrates how to define variables, apply a cut, and create a histogram using processors.

Calculations on arrays within worker processes can be performed using a Processor. Multiple processors can be linked together in a ProcessorsGraph, forming a directed acyclic graph (DAG). These processors are applied to arrays in the sequence determined by the DAG's topological order.

Each worker executes the same processor graph on batches of event data and returns the results to the event loop once processing is complete.

Lets make a simple analysis that performs a pT cut on leptons, selects a leading lepton (highest pT), and creates a histogram of the leading lepton pT.

First, we define the processors:

import awkward as ak

from f9columnar.processors import CheckpointProcessor, Processor, ProcessorsGraph
from f9columnar.processors_collection import decorate_branches

@decorate_branches("el_pt", "el_eta", "el_phi", "mu_pt", "mu_eta", "mu_phi", "weight")
class CutLeptons(Processor):
    def __init__(self, name: str, pt_cut: float) -> None:
        super().__init__(name)
        self.pt_cut = pt_cut

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # apply pt cut particle mask on electrons and muons
        mask_el, mask_mu = arrays["el_pt"] > self.pt_cut, arrays["mu_pt"] > self.pt_cut

        # mask arrays to keep only leptons passing the pt cut
        for field in ["el_pt", "el_eta", "el_phi"]:
            arrays[field] = arrays[field][mask_el]

        for field in ["mu_pt", "mu_eta", "mu_phi"]:
            arrays[field] = arrays[field][mask_mu]

        # return must be a dictionary with key name for the argument of the next processor
        return {"arrays": arrays}

class LeadingLeptons(Processor):
    def __init__(self, name: str) -> None:
        super().__init__(name)

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # get leading electron (highest pt)
        el_pt_argmax = ak.argmax(arrays["el_pt"], axis=1, keepdims=True)
        for field in ["el_pt", "el_eta", "el_phi"]:
            arrays[field] = arrays[field][el_pt_argmax]
            arrays[field] = ak.drop_none(arrays[field])

        # get leading muon (highest pt)
        mu_pt_argmax = ak.argmax(arrays["mu_pt"], axis=1, keepdims=True)
        for field in ["mu_pt", "mu_eta", "mu_phi"]:
            arrays[field] = arrays[field][mu_pt_argmax]
            arrays[field] = ak.drop_none(arrays[field])

        return {"arrays": arrays}

The decorate_branches decorator is used to specify the branches that the processor depends on. This allows the dataloader to only load the necessary branches from the ROOT files.

Making a histogram processor is also straightforward:

from f9columnar.histograms import HistogramProcessor

class LeadingLeptonHistogram(HistogramProcessor):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        self.make_hist1d("leading_el_pt", 50, 20.0, 300.0)  # num. bins, min, max
        self.make_hist1d("leading_mu_pt", 50, 20.0, 500.0)  # num. bins, min, max

    def run(self, arrays: ak.Array) -> dict[str, ak.Array]:
        # fill histogram with leading lepton pT in GeV
        leading_el_pt, leading_mu_pt = arrays["el_pt"] / 1e3, arrays["mu_pt"] / 1e3

        non_empty_el = ak.num(leading_el_pt) > 0
        leading_el_pt = ak.firsts(leading_el_pt[non_empty_el])
        el_weight = arrays["weight"][non_empty_el]

        # fill electron histogram with pt and weights
        self.fill_hist1d("leading_el_pt", leading_el_pt, el_weight)

        non_empty_mu = ak.num(leading_mu_pt) > 0
        leading_mu_pt = ak.firsts(leading_mu_pt[non_empty_mu])
        mu_weight = arrays["weight"][non_empty_mu]

        # fill muon histogram with pt and weights
        self.fill_hist1d("leading_mu_pt", leading_mu_pt, mu_weight)

        return {"arrays": arrays}

We can now build the processor graph from the defined processors:

from f9columnar.processors import CheckpointProcessor, ProcessorsGraph
from f9columnar.processors_collection import ProcessorsCollection

# define a collection of processors for bookkeeping
analysis_collection = ProcessorsCollection("leptonCollection")

# add processors to the collection
analysis_collection += CutLeptons(name="cutLeptons", pt_cut=25.0)  # pt cut at 25 GeV
analysis_collection += LeadingLeptons(name="leadingLeptons")
analysis_collection += LeadingLeptonHistogram(name="leadingLeptonHistogram")

# define the processor graph
analysis_graph = ProcessorsGraph()

# build a linear chain of processors
analysis_graph.add(
    CheckpointProcessor("input"),
    *analysis_collection.as_list(),
    CheckpointProcessor("output"),
)

# if the graph is a linear chain we can use chain() to connect the nodes
# for more complex graphs use connect() method that defines edges explicitly
analysis_graph.chain()

# draw the graph with pygraphviz
analysis_graph.draw("analysis_graph.pdf")

We now have a processor graph that can be passed into the DataLoader to process events. The graph will be executed in each worker process for each batch of events. Instead of using a simple get_root_dataloader function, we can create a RootPhysicsDataset and initialize the dataloader with the processor graph:

from f9columnar.dataset_builder import RootPhysicsDataset

# make one or more root datasets as a list of RootPhysicsDataset objects
root_datasets = [
    RootPhysicsDataset(
        name="awkwardEvents",
        root_files=glob.glob("data/simpleNTuple_part*.root"),
        is_data=False,
    )
]

# analysis collection provides the branch name filter automatically
branch_name_filter = analysis_collection.get_branch_name_filter()

# setup dataloader configuration
dataloader_config = {
    "step_size": 1024,
    "key": "physics",
    "num_workers": 0,
    "filter_name": branch_name_filter,
}

# setup and initialize the dataloader for the root dataset
root_datasets[0].setup_dataloader(**dataloader_config)
# runs get_root_dataloader() internally
root_datasets[0].init_dataloader(analysis_graph)

We have build a processor graph (the actual calculations) and a dataloader that uses the graph to process events. We can now run the event loop over batches of events:

from f9columnar.run import ColumnarEventLoop

# define the event loop over the root datasets
event_loop = ColumnarEventLoop(
    mc_datasets=root_datasets,
    data_datasets=None,
    cut_flow=False,
    save_path="results/",
)
# run it!
event_loop.run(mc_only=True)

That's it! After running the event loop, the histograms will be saved in the results/ directory as awkwardEvents_mc_0_hists.p.

Converting ROOT to HDF5

The library includes Hdf5WriterProcessor - a Processor that converts ROOT ntuples into HDF5 files suitable for ML training. It supports both flat (event-level) and jagged (variable-length, e.g. per-jet) arrays, with two storage modes for jagged data.

Storage modes

Padded mode (varlen=False): jagged arrays are stored as fixed-size 2D arrays (n_events, max_length), with short sequences filled using a configurable pad_value.

Variable-length mode (varlen=True): jagged arrays are flattened into 1D packed arrays, with boundaries tracked by a cumulative-lengths dataset (culens). For example, an object group jets with culens = [0, 3, 5] means event 0 owns jets at indices 0-2 and event 1 owns jets at indices 3-4. This avoids wasting space when sequence lengths vary significantly.

Pile-based shuffling

Output is split across multiple HDF5 "pile" files (p0.hdf5, p1.hdf5, ..., p{n-1}.hdf5). During writing, each chunk of events is assigned to a pile either randomly (pile_assignment="random") or in round-robin order (pile_assignment="deque"). During reading, piles are loaded independently - the ML dataloader shuffles within each pile, giving efficient pseudo-random access without loading the full dataset into memory.

-- writing step --
for batch of events from ROOT file do
    for chunk of size chunk_shape in batch do
        j <- pile index (random or round-robin)
        append chunk to pile p[j]

-- reading step --
for pile p[j] in selected range do
    load p[j], shuffle in memory
    yield batches to DataLoader

Valid masks

The writer can produce a boolean valid column inside each jagged object struct. When valid=True, slots holding real data are marked True and padding slots are False. Passing a dict (e.g. valid={"jets": ("jet_type", [2])}) additionally filters by a type field - only slots where jet_type == 2 are marked valid.

Example usage

Hdf5WriterProcessor is a regular Processor that runs inside the processor graph (alongside any custom transformation processors) rather than as a separate postprocessor:

from f9columnar.dataset_builder import RootPhysicsDataset
from f9columnar.hdf5_writer import Hdf5WriterProcessor
from f9columnar.processors import CheckpointProcessor, ProcessorsGraph
from f9columnar.run import ColumnarEventLoop

# build the processor graph -- the writer sits in the chain like any other processor
analysis_graph = ProcessorsGraph()
analysis_graph.add(
    CheckpointProcessor("input"),
    *analysis_collection.as_list(),  # custom processors (weights, labels, scaling, ...)
    Hdf5WriterProcessor(
        output_files_path="output/hdf5/",
        flat_column_names=["ptl1", "ptl2", "mll", "weights", "label_type"],
        jagged_column_names={
            "jets": ["jet_e", "jet_pt", "jet_eta", "jet_phi"],
        },
        default_chunk_shape=32,
        max_lengths={"jets": 15},
        pad_values={"jets": 999.0},
        sort_by_variable={"jets": "jet_pt"},     # descending sort before truncation
        n_piles=256,
        pile_assignment="random",
        enforce_dtypes={"nbjets": "int64"},
        varlen=True,                              # variable-length storage
        enforce_varlen_max_length=False,
        valid={"jets": ("jet_type", [2])},        # per-slot valid mask filtered by jet_type
        extra_metadata={"scale": {"jet_e": 1.0e-3}},
        append=False,
        compression="lz4",                       # forwarded to h5py.create_dataset
    ),
    CheckpointProcessor("output", save_arrays=True),
)
analysis_graph.chain()

# create ROOT datasets
mc_dataset = RootPhysicsDataset("MC", [str(f) for f in mc_files], is_data=False)
mc_dataset.setup_dataloader(**dataloader_config)
mc_dataset.init_dataloader(processors=analysis_graph)

# run the event loop
event_loop = ColumnarEventLoop(
    mc_datasets=[mc_dataset],
    data_datasets=None,
    cut_flow=False,
)
event_loop.run(mc_only=True)

Output HDF5 structure

Each pile file (p0.hdf5, ...) contains:

/events                  # flat event-level structured array (n_events,)
    fields: ptl1, ptl2, mll, weights, label_type, ...

/jets                    # varlen: packed 1D (total_jets,) | padded: 2D (n_events, max_length)
    fields: jet_e, jet_pt, jet_eta, jet_phi, valid

/jets_culens             # varlen only: (n_events+1,) cumulative offsets, dtype int64
                         # jets[culens[i]:culens[i+1]] = jets for event i

/metadata                # JSON string - pad_values, max_lengths, sort_by_variable,
                         # varlen, enforce_varlen_max_length, extra_metadata, ...

Feature scaling

The library fits sklearn-based scalers over HDF5 (or ROOT) datasets so that scaling works on data that doesn't fit in memory.

Numerical scalers

The following numerical scaler types are supported (passed as numer_scaler_type):

Type sklearn class Supports partial fit NN scaler
"minmax" MinMaxScaler yes NNMinMaxScaler
"maxabs" MaxAbsScaler yes -
"standard" StandardScaler yes NNStandardScaler
"robust" RobustScaler no -
"quantile" QuantileTransformer no -
"power" PowerTransformer no -
"logit" MinMaxScaler -> LogitTransform yes NNLogitScaler
"standard_logit" MinMaxScaler -> LogitTransform -> StandardScaler yes NNStandardLogitScaler

When a scaler does not support partial fit, all events (up to n_max) are accumulated before fitting. Scalers that support partial fit are updated incrementally per batch.

Categorical scaler

Setting categ_scaler_type="categorical" enables a custom CategoricalFeatureScaler (a LabelEncoder variant that supports partial fitting). It maps each unique category value to a consecutive integer per column, and can be updated incrementally as new batches arrive.

Either scaler can be disabled by passing None for its type. At least one must be enabled.

Single-loader usage

from f9columnar.ak_collate_fn import CollateConfig
from f9columnar.hdf5_dataloader import get_hdf5_dataloader
from f9columnar.ml.dataset_scaling import DatasetScaler

collate_config = CollateConfig(
    modalities={
        "events": ["mll", "met", "metphi"],
        "jets":   ["jet_pt", "jet_eta", "jet_phi"],
    },
    modalities_metadata={"events": ["eventNumber", "weights"]},
    layout="varlen",
)

loader, _ = get_hdf5_dataloader(
    files=hdf5_pile_files,
    collate_config=collate_config,
    step_size=100_000,
    num_workers=4,
    batch_size=4096,
)

ds_scaler = DatasetScaler(
    dataloader=loader,
    modalities=collate_config.modalities,
    modalities_metadata=collate_config.modalities_metadata,
    numer_scaler_type="standard",        # numerical scaler (or None to disable)
    categ_scaler_type="categorical",     # categorical scaler (or None to disable)
    scaler_save_path="scalers/",         # output directory
    n_max=1_000_000,                     # cap for non-partial-fit scalers (None = all)
    extra_hash="v2",                     # optional suffix on scaler filenames
    scaler_kwargs={"partial_fit": True, "copy": True},
)
ds_scaler.feature_scale()

The driver writes one pickle per (modality, scaler kind). For varlen/padded modalities the padding mask is applied automatically - padded slots are not included in the fit.

Multi-source usage

When you train with several HDF5/ROOT sources (LightningCombinedSourcesDataModule), use MultiSourceDatasetScaler to fit one scaler set per source from the same YAML config used at training time:

from f9columnar.ml.dataset_scaling import MultiSourceDatasetScaler

ms_scaler = MultiSourceDatasetScaler(
    config=sources_yaml,                 # path/dict/DictConfig - same schema as the DataModule
    extra_hash="v2",
    extra_processors=[CustomProcessor],  # ROOT-source processor classes (optional)
)
ms_scaler.feature_scale()

Each source's feature_scaling block specifies the scaler types and save path:

sources:
  - name: 2L_Run2
    feature_scaling:
      numer_scaler_type: standard
      categ_scaler_type: null
      save_path: scalers/Run2/
      n_max: null
      scaler_params: { partial_fit: true, copy: true }

MultiSourceDatasetScaler internally runs a deterministic, augmentation-free pass over each source.

NN scalers (torch nn.Module wrappers)

The fitted sklearn scalers are saved as pickle files, but at inference time (and during training) you typically want pure PyTorch operations that live inside the model graph. nn_scalers.py provides nn.Module wrappers that extract the fitted statistics from sklearn scalers and register them as non-trainable buffers, so they move with the model across devices and are included in state_dict / ONNX exports.

Each scaler exposes both forward() (scale) and inverse() (unscale).

The composite NNDatasetScaler wraps all per-modality scalers into a single module. It takes a fitted scaler utility plus a ColumnSelection describing which columns are numerical vs categorical and applies the correct scaler to each feature column:

from f9columnar.ml.nn_scalers import NNDatasetScaler

nn_scaler = NNDatasetScaler(scaler_util, selection)

# forward: raw features -> scaled features
X_events_scaled, Xs_scaled = nn_scaler(X_events, [X_jets, X_electrons])

# inverse: scaled features -> raw features
X_events_raw, Xs_raw = nn_scaler.inverse(X_events_scaled, Xs_scaled)

Padded rows are preserved - the scaler computes a padding mask before applying transforms and restores the pad value afterwards.

ML DataLoaders

The library exposes three PyTorch Lightning DataModules built on top of the same awkward-array to tensor collate pipeline:

DataModule Source Use case
LightningHdf5DataModule one HDF5 dataset (pile files) single-source training on preconverted data
LightningRootDataModule one ROOT dataset training directly from ROOT files, no HDF5 conversion
LightningCombinedSourcesDataModule many HDF5 / ROOT sources mixed multi-source training (regions, runs, MC campaigns) from a single YAML

All three share a common collation model (CollateConfig) and produce per-modality ModalityBatch tensors organised inside a SourceBatch.

Collate configuration

CollateConfig describes how an awkward batch is converted into model-ready tensors. It is the single object that drives field selection, layout, sorting, padding, augmentation, and (for ROOT) automatic branch filtering.

from f9columnar.ak_collate_fn import CollateConfig

collate_config = CollateConfig(
    # per-modality field selection - modality names must match the HDF5
    # object groups (or are arbitrary group names when loading from ROOT)
    modalities={
        "events":    ["mll", "met", "metphi", "nbjets"],
        "jets":      ["jet_pt", "jet_eta", "jet_phi", "jet_e"],
        "electrons": ["el_pt", "el_eta", "el_phi"],
    },
    # extra fields that travel with each batch but are NOT fed to the model
    # (labels, weights, eventNumber for stage splitting, ...), available via
    # `ModalityBatch.metadata`
    modalities_metadata={"events": ["eventWeight", "eventNumber", "y", "w"]},
    layout="varlen",                      # "varlen" (packed + cu_seqlens) or "padding" (2D + valid)
    max_lengths={"jets": 12},             # only used when layout="padding"
    pad_values={"jets": 0.0},             # only used when layout="padding"
    sort_by_variable={"jets": "jet_pt"},  # optional descending sort per modality
    combine_varlen_objects=True,          # attach cross-modality reorder perm in metadata
    augmentations={...},                  # see "Data augmentation" below
)

Layouts. With layout="varlen" each jagged modality returns a packed 1D tensor of shape (total_objects,) plus a cu_seqlens index (B+1,), flat fields are (B,) tensors. With layout="padding" jagged modalities return (B, max_length) tensors plus a boolean valid mask of the same shape.

The same CollateConfig is used by every DataModule below - LightningRootDataModule additionally uses config.make_filter_fn() to skip ROOT branches that the model never asks for.

HDF5 DataModule

LightningHdf5DataModule consumes pile files produced by Hdf5WriterProcessor. Stages are split by pile index - each stage gets a disjoint subset of piles, so the same event is never seen across train/val/test.

import lightning as L
from f9columnar.ak_collate_fn import CollateConfig
from f9columnar.ml.lightning_data_module import LightningHdf5DataModule

dm = LightningHdf5DataModule(
    name="ttHcc_2L",
    files="output/hdf5/p*.hdf5",                   # glob or explicit list
    collate_config=collate_config,
    stage_split_piles={"train": 512, "val": 128, "test": 128},  # counts or [indices]
    step_size=100_000,                              # entries loaded into memory per step
    batch_size=4096,
    num_workers=-1,                                 # -1 = all available cores
    shuffle=True,                                   # train-only; non-train stages force False
    shuffle_files=True,
    shuffle_batch=True,
    drop_last=True,
    dataloader_kwargs={
        "val_batch_size": 8192,                     # per-stage batch_size override
        "test_batch_size": 8192,
        "pin_memory": True,
        "prefetch_factor": 2,
    },
)

trainer = L.Trainer(...)
trainer.fit(model, datamodule=dm)

stage_split_piles accepts either explicit pile indices ({"train": [0, 1, 2, 3], "val": [4, 5]}) or counts assigned in order ({"train": 512, "val": 128}). Shuffling is automatically disabled for non-training stages.

Per-source class re-balancing can be enabled by passing an ImbalancedSampler (RandomUnderSampler, RandomOverSampler, or RandomResampleToTarget).

Direct ROOT DataModule

LightningRootDataModule trains directly from ROOT files via uproot, with no HDF5 conversion step. Stage splitting is performed inside the per-stage processor graphs (typically via a StageSplitProcessor that routes events by eventNumber % divisor), and per-class balancing is provided by PartitionedRootConfig - partitions are matched by regex over filenames and combined according to partition_mode (min_size, max_size, or max_size_cycle).

from f9columnar.ak_collate_fn import CollateConfig
from f9columnar.ml.lightning_data_module import LightningRootDataModule
from f9columnar.root_dataloader import PartitionedRootConfig

# one processor graph per stage -- usually only `stage` differs between them
def build_graph(stage: str):
    graph = ProcessorsGraph()
    graph.add(
        CheckpointProcessor("input"),
        StageSplitProcessor(branch="eventNumber", divisor=10,
                            splits={"train": [2,3,4,5,6,7,8,9], "val": [0], "test": [1]},
                            stage=stage),
        LabelProcessor(labels=CLASSES),
        WeightsProcessor(weights_field="eventWeight"),
        ScaleBranchProcessor(scale_dct={"jet_pt": 1e-3, "jet_e": 1e-3}),
    )
    graph.chain()
    return graph

dm = LightningRootDataModule(
    name="phys_root",
    files=glob.glob("data/*.Nominal.root"),
    key="NOSYS/physics",                            # TTree path (REQUIRED for ROOT)
    step_size=512,                                  # uproot.iterate chunk size
    num_workers=4,
    shuffle=True,
    stage_processors={                              # one graph per stage
        "train": build_graph("train"),
        "val":   build_graph("val"),
        "test":  build_graph("test"),
    },
    partitioned_config=PartitionedRootConfig(
        # regex per partition -- one partition per class label
        partitions={c: [f"^{c}\\.Nominal\\.root$"] for c in CLASSES},
        partition_mode="max_size_cycle",            # cycle short partitions until longest done
        shuffle_partitions=True,
        shuffle_batch_partitions=True,
        partition_batch_size=64,                    # sub-batch size after concatenating partitions
        partition_drop_last_batch=True,
    ),
    collate_config=collate_config,                  # same CollateConfig as HDF5
    dataloader_kwargs={"val_batch_size": 128, "test_batch_size": 128},
)

For non-training stages the module forces partition_mode="max_size", shuffle_partitions=False, and shuffle_batch_partitions=False so every event in val/test is seen exactly once.

When the partitioned config is not needed, you can also build a plain DataLoader with get_root_dataloader(name, files, key, step_size, num_workers, processors=..., collate_config=..., partitioned_config=...) directly - LightningRootDataModule is a thin wrapper around it.

Multi-source training (Combined Sources)

LightningCombinedSourcesDataModule mixes any number of HDF5 and/or ROOT sources, each with its own collate, scaler, augmentation, and per-source processors. The whole configuration is driven by a single YAML schema (also documented in sources.py):

defaults: &source_defaults
  shuffle: true
  drop_last: true
  num_workers: -1
  step_size: 8192

sources:
  - name: 2L_Run2
    id: 0
    data:
      <<: *source_defaults
      reader: hdf5
      paths: [/path/to/2L_Run2/*.h5]
      pin_memory: true
    collate:
      modalities:
        events: [run_number, mc_event_weight, label_type, weights]
        jets:   [jet_pt, jet_eta, jet_phi]
      layout: varlen
    sampling: { weight: 1.0, batch_size: 64 }
    imbalanced_sampler:
      sampler_name: RandomResampleToTarget
      sampler_kwargs:
        sampling_strategy: { ttH_cc: 4096, ttH_bb: 2048 }
      class_labels: { ttH_cc: 0, ttH_bb: 1 }
      label_field: y

  - name: 2L_Run3
    id: 1
    data:
      <<: *source_defaults
      reader: root
      paths: [/path/to/2L_Run3/*.root]
      key: NOSYS/physics
    partitioned:
      partitions:
        mc:   ['^mc_.*.root$']
        data: ['^data_.*.root$']
      partition_mode: max_size_cycle
      partition_batch_size: 64
    collate:
      modalities:
        events: [run_number, mc_event_weight]
        jets:   [jet_pt, jet_eta]
      layout: varlen
    processors:                # ROOT-only: per-source processor graph
      start_graph:
        chain: [CheckpointProcessor, JetSelection]
        JetSelection: { pt_cut: 20.0, sel_jet: true }
    sampling: { weight: 1.5, batch_size: 64 }

mixing:
  combined_mode: max_size_cycle   # min_size | max_size | max_size_cycle | sequential
  total_batch_size: 256
  weight_strategy: proportional_to_weight

Then in Python:

from f9columnar.ml.lightning_data_module import LightningCombinedSourcesDataModule

dm = LightningCombinedSourcesDataModule(
    name="ttHcc_combined",
    config="config/sources.yaml",          # path, dict, DictConfig, or SourcesConfig
    extra_processors=[CustomProcessor],    # Processor classes referenced by name in YAML
)

Per-source batch sizes are derived from mixing.weight_strategy:

  • user_specified - each source's sampling.batch_size is used as-is (total_batch_size ignored).
  • uniform - total_batch_size is split equally across sources.
  • proportional_to_weight - split in proportion to sampling.weight.
  • proportional_to_size - split in proportion to each source's event count.

The DataModule wraps the per-source loaders in a Lightning CombinedLoader, so each training step yields a dict {source_name: SourceBatch}. combined_mode controls when an epoch ends (shortest source, longest, or cycle the others). Non-train stages disable shuffles and augmentations automatically.

Data augmentation

f9columnar.ml.augmentation provides physics-aware awkward-array augmentations that run inside the collate step, before tensor conversion. Available transforms:

Spec Effect
ConstituentDropoutSpec(p) drop jagged constituents independently with probability p
PhiRotationSpec(fields=...) global per-event phi rotation (synchronised across all modalities to preserve angular correlations)
PtSmearingSpec(fields=..., a, b) multiplicative pT smearing
AngularSmearingSpec(fields=..., sigma) additive Gaussian noise on eta and phi
FlipSignSpec(fields=..., prob) per-event sign flip with probability prob

Augmentations are declared per modality (under CollateConfig.augmentations or the YAML collate.augmentations) and combined by ModalityAugmentationSpec:

augmentations = {
    "jets": {
        "constituent_dropout": 0.1,
        "phi_rotation":   {"fields": ["jet_phi"]},
        "pt_smearing":    {"fields": ["jet_pt"], "a": 0.05, "b": 0.01},
        "angular_smearing": {"fields": ["jet_eta", "jet_phi"], "sigma": 0.005},
    },
}
collate_config = CollateConfig(..., augmentations=augmentations, augmentation_strength=1.0)

Inference server

f9columnar.ml.inference_server.InferenceServer is a dedicated GPU process that hosts one or more trained models and serves inference requests sent from PyTorch DataLoader workers via multiprocessing queues. This sidesteps the usual CUDA / fork-safety issues when workers try to run CUDA themselves and allows MLScoreProcessor-style processors to call a model inside the columnar pipeline.

import torch
from f9columnar.ml.inference_server import InferenceServer

model = torch.load("checkpoints/model.pt").eval()

server = InferenceServer(
    models={"classifier": model},
    device="cuda:0",
    num_workers=4,                    # must match DataLoader num_workers
    multiprocessing_context="forkserver",
    to_numpy=True,                    # convert outputs to numpy on return
)
server.start()

# inside a processor that runs in a DataLoader worker:
scores = self.inference_server.infer(X, *Xs, model_key="classifier")

# tear down at the end of the run
server.shutdown()

Inputs may be tensors or arbitrarily nested lists / tuples / dicts of tensors. The server batches them, runs model.forward (optionally under torch.autocast for bf16/fp16), and returns the result to the originating worker.

Projects using F9Columnar

  • SeeSawML - multi-source, multi-modal and multi-task ML training framework, built directly on the ROOT/HDF5 dataloaders, scalers, augmentations, and inference server described above.

Citation

If you found F9Columnar useful, please consider citing its Zenodo record 10.5281/zenodo.19312298.

You can find an example bibtex entry below.

@software{F9Columnar,
  author       = {Gavranovic, Jan},
  title        = {F9Columnar: A PyTorch-Based Library for High Energy Physics Data Processing and Analysis},
  publisher    = {Zenodo},
  doi          = {10.5281/zenodo.19312298},
  url          = {https://doi.org/10.5281/zenodo.19312298},
}

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

f9columnar-0.6.0.tar.gz (267.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

f9columnar-0.6.0-py3-none-any.whl (288.8 kB view details)

Uploaded Python 3

File details

Details for the file f9columnar-0.6.0.tar.gz.

File metadata

  • Download URL: f9columnar-0.6.0.tar.gz
  • Upload date:
  • Size: 267.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.20 {"installer":{"name":"uv","version":"0.11.20","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for f9columnar-0.6.0.tar.gz
Algorithm Hash digest
SHA256 b6f560ec9f838e14353756a2241ece335715e6e5a557086a3e8d022d906ce20c
MD5 cf4ddc8b71786b0b11913f0a6941d7e2
BLAKE2b-256 b25a555d52b365e3777b17f20bf415cd8b6afb664f377ba72ea1282f5b3488ef

See more details on using hashes here.

File details

Details for the file f9columnar-0.6.0-py3-none-any.whl.

File metadata

  • Download URL: f9columnar-0.6.0-py3-none-any.whl
  • Upload date:
  • Size: 288.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.20 {"installer":{"name":"uv","version":"0.11.20","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for f9columnar-0.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 15839aa057c837c326305d80937c4fa28db523368a3e1f9cc6b1c19b84f3f8d6
MD5 ccf4c924237059ac68560ff504fe6931
BLAKE2b-256 bb6b4186fcc777b80ae3c928b0e414275cda09aeef70eba6ca774cb395563366

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page