Skip to main content

Comprehensive performance monitoring and metrics collection for Coffea-based High Energy Physics analysis workflows

Project description

roastcoffea

Performance monitoring and metrics collection for Coffea-based HEP analysis workflows.

Features

  • Automatic worker tracking - Dask worker counts, memory, CPU utilization
  • Comprehensive metrics - Throughput, event rates, efficiency, resource usage
  • I/O analysis - File compression ratios, branch access patterns, data read percentages
  • Chunk-level monitoring - Per-chunk timing, memory, runtime distributions
  • Rich visualizations - 17 built-in Matplotlib plots for performance analysis
  • Terminal output - Formatted tables with color-coded metrics
  • Measurement persistence - Save and load benchmarks for comparison
  • Simple API - Context manager for clean integration

Documentation

Complete documentation is available at roastcoffea.readthedocs.io:

Installation

# Clone and install in development mode
git clone https://github.com/MoAly98/roastcoffea.git
cd roastcoffea
pip install -e .

Quick Start

Complete Example

from coffea import processor
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from dask.distributed import Client, LocalCluster
from roastcoffea import MetricsCollector


# Define a simple processor
class MyProcessor(processor.ProcessorABC):
    def process(self, events):
        # Example: select jets with pT > 30 GeV
        jets = events.Jet[events.Jet.pt > 30]

        return {
            "njets": len(jets),
            "jet_pt": jets.pt.flatten().sum(),
        }

    def postprocess(self, accumulator):
        return accumulator


# Define your fileset
fileset = {
    "TTbar": {
        "files": {
            "root://xrootd.example.com//data/TTbar_1.root": "Events",
            "root://xrootd.example.com//data/TTbar_2.root": "Events",
        }
    },
}

# Setup Dask cluster and run with metrics collection
with LocalCluster(n_workers=4, threads_per_worker=2) as cluster:
    with Client(cluster) as client:
        # Create MetricsCollector
        with MetricsCollector(client, track_workers=True) as collector:
            # Run your Coffea workflow
            executor = processor.DaskExecutor(client=client)
            runner = processor.Runner(
                executor=executor,
                savemetrics=True,
                schema=NanoAODSchema,
            )

            output, report = runner(
                fileset,
                treename="Events",
                processor_instance=MyProcessor(),
            )

            # Provide the report to collector
            collector.set_coffea_report(report)

        # After context exit, metrics are aggregated
        # Print Rich tables with all metrics
        collector.print_summary()

        # Access specific metrics
        metrics = collector.get_metrics()
        print(f"\nThroughput: {metrics['overall_rate_gbps']:.2f} Gbps")
        print(f"Event rate: {metrics['event_rate_wall_khz']:.1f} kHz")

        # Fine metrics (automatic with Dask Spans)
        if metrics.get("cpu_percentage") is not None:
            print(f"CPU %: {metrics['cpu_percentage']:.1f}%")
            print(f"I/O %: {metrics['io_percentage']:.1f}%")
            print(f"Compression ratio: {metrics.get('compression_ratio', 'N/A')}")

        # Save measurement for later analysis
        collector.save_measurement(
            output_dir="benchmarks/", measurement_name="ttbar_analysis"
        )

Minimal Example

from dask.distributed import Client
from roastcoffea import MetricsCollector

client = Client()

with MetricsCollector(client) as collector:
    # Your Coffea workflow here
    output, report = runner(fileset, processor_instance=my_processor)
    collector.set_coffea_report(report)

# Print summary
collector.print_summary()

Metrics Reference

Workflow Metrics (from Coffea Report)

Metric Source Description
wall_time Coffea Report Real elapsed time for the workflow
total_cpu_time Coffea Report Sum of all task durations across workers
num_chunks Coffea Report Number of data chunks processed
avg_cpu_time_per_chunk Coffea Report Average CPU time per chunk
total_events Coffea Report Total number of events processed
total_bytes_compressed Coffea Report Compressed bytes read from files
total_bytes_uncompressed Dask Spans (v0.2+) Actual uncompressed bytes read (stub: returns None until Spans implemented)
compression_ratio Dask Spans (v0.2+) Uncompressed / compressed bytes ratio (stub: returns None until Spans implemented)
overall_rate_gbps Derived Data processing rate in Gbps
overall_rate_mbps Derived Data processing rate in MB/s
event_rate_wall_khz Derived Events/sec from wall time (kHz)
event_rate_agg_khz Derived Events/sec from aggregated CPU time (kHz)

Worker Metrics (from Scheduler Tracking)

Metric Source Description
avg_workers Scheduler Tracking Time-weighted average worker count
peak_workers Scheduler Tracking Maximum number of workers observed
cores_per_worker Scheduler Tracking Average cores per worker
total_cores Scheduler Tracking Total cores across all workers
peak_memory_bytes Scheduler Tracking Peak memory usage across all workers
avg_memory_per_worker_bytes Scheduler Tracking Time-averaged memory per worker

Efficiency Metrics (Derived)

Metric Source Description
core_efficiency Derived Fraction of available cores actually used (0-1)
speedup_factor Derived Parallel speedup achieved vs single core
event_rate_core_hz Derived Events/sec/core (Hz per core)

Per-Worker Time-Series Data (from Scheduler Tracking)

Raw tracking data available in metrics["tracking_data"] for visualization:

Field Description
worker_counts Worker count over time
worker_memory Process memory usage per worker over time
worker_memory_limit Memory limit per worker over time
worker_cores Cores per worker over time
worker_active_tasks Tasks assigned (processing + queued) per worker
worker_executing Tasks actually running per worker
worker_nbytes Data stored on worker (vs process overhead)
worker_occupancy Worker saturation metric (0.0 = idle, higher = saturated)
worker_last_seen Last heartbeat timestamp (for detecting dead workers)

Advanced Usage

Custom Per-Dataset Metrics

with MetricsCollector(client) as collector:
    output, report = runner(fileset, processor_instance=my_processor)

    # Provide custom per-dataset breakdown
    custom_metrics = {
        "TTbar": {
            "entries": 1_000_000,
            "duration": 45.2,
            "performance_counters": {"num_requested_bytes": 5_000_000_000},
        },
        "WJets": {
            "entries": 500_000,
            "duration": 23.1,
            "performance_counters": {"num_requested_bytes": 2_500_000_000},
        },
    }

    collector.set_coffea_report(report, custom_metrics=custom_metrics)

Fine-Grained Metrics (Dask Spans)

Automatic collection of CPU/I/O breakdown and real compression ratios.

with MetricsCollector(client) as collector:
    output, report = runner(fileset, processor_instance=my_processor)
    collector.set_coffea_report(report)

# Print summary includes fine metrics table when available
collector.print_summary()

# Access fine metrics directly
metrics = collector.get_metrics()
print(f"CPU %: {metrics['cpu_percentage']:.1f}%")
print(f"I/O %: {metrics['io_percentage']:.1f}%")
print(f"Compression ratio: {metrics.get('compression_ratio', 'N/A')}")
print(f"Compression overhead: {metrics['total_compression_overhead_seconds']:.1f}s")

Fine metrics table output example:

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Metric                         ┃ Value       ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ CPU Time                       │ 125.3s      │
│ I/O Time                       │ 48.7s       │
│ CPU %                          │ 72.0%       │
│ I/O %                          │ 28.0%       │
│ Disk Read                      │ 12.50 GB    │
│ Compression Overhead           │ 5.2s        │
│   • Decompress                 │ 4.8s        │
│   • Compress                   │ 0.4s        │
│ Serialization Overhead         │ 3.1s        │
│   • Deserialize                │ 2.1s        │
│   • Serialize                  │ 1.0s        │
└────────────────────────────────┴─────────────┘

Visualization

roastcoffea provides comprehensive visualization capabilities for all collected metrics.

Worker Timeline Plots

from roastcoffea import (
    plot_worker_count_timeline,
    plot_memory_utilization_mean_timeline,
    plot_occupancy_timeline,
    plot_executing_tasks_timeline,
    plot_worker_activity_timeline,
    plot_total_active_tasks_timeline,
)

# After collecting metrics with track_workers=True
tracking_data = collector.get_metrics()["tracking_data"]

# Worker count over time
plot_worker_count_timeline(tracking_data=tracking_data, output_path="worker_count.png")

# Memory utilization percentage over time (mean with min-max band)
plot_memory_utilization_mean_timeline(
    tracking_data=tracking_data, output_path="memory_util.png"
)

# Worker occupancy (task saturation) over time
plot_occupancy_timeline(tracking_data=tracking_data, output_path="occupancy.png")

# Executing tasks per worker over time
plot_executing_tasks_timeline(
    tracking_data=tracking_data, output_path="executing_tasks.png"
)

# Active tasks per worker over time
plot_worker_activity_timeline(
    tracking_data=tracking_data, output_path="worker_activity.png"
)

# Total active tasks across all workers
plot_total_active_tasks_timeline(
    tracking_data=tracking_data, output_path="total_activity.png"
)

Efficiency & Scaling Summary Plots

from roastcoffea import (
    plot_efficiency_summary,
    plot_resource_utilization,
)

metrics = collector.get_metrics()

# Efficiency metrics bar chart
plot_efficiency_summary(metrics=metrics, output_path="efficiency.png")

# Resource utilization summary
plot_resource_utilization(metrics=metrics, output_path="resources.png")

Per-Task Fine Metrics (Dask Spans)

from roastcoffea import (
    plot_per_task_cpu_io,
    plot_per_task_bytes_read,
    plot_per_task_overhead,
)

# Get span metrics from collector
span_metrics = collector.span_metrics

# CPU vs I/O time per task
plot_per_task_cpu_io(span_metrics=span_metrics, output_path="per_task_cpu_io.png")

# Bytes read per task (if disk-read available)
plot_per_task_bytes_read(span_metrics=span_metrics, output_path="per_task_bytes.png")

# Compression & serialization overhead per task
plot_per_task_overhead(span_metrics=span_metrics, output_path="per_task_overhead.png")

Loading Saved Measurements

from roastcoffea import load_measurement

# Load a previously saved measurement
metrics, t0, t1 = load_measurement("benchmarks/my_run")

# Use metrics for any of the above plots
tracking_data = metrics.get("tracking_data")
if tracking_data:
    plot_worker_count_timeline(tracking_data, output_path="worker_count.png")

Disable Worker Tracking

# Skip worker tracking if you only want workflow-level metrics
with MetricsCollector(client, track_workers=False) as collector:
    output, report = runner(fileset, processor_instance=my_processor)
    collector.set_coffea_report(report)

Adjust Tracking Interval

# Sample worker metrics every 0.5 seconds instead of default 1.0s
with MetricsCollector(client, worker_tracking_interval=0.5) as collector:
    output, report = runner(fileset, processor_instance=my_processor)
    collector.set_coffea_report(report)

License

BSD-3-Clause - see LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

roastcoffea-0.1.0.tar.gz (2.8 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

roastcoffea-0.1.0-py3-none-any.whl (55.8 kB view details)

Uploaded Python 3

File details

Details for the file roastcoffea-0.1.0.tar.gz.

File metadata

  • Download URL: roastcoffea-0.1.0.tar.gz
  • Upload date:
  • Size: 2.8 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for roastcoffea-0.1.0.tar.gz
Algorithm Hash digest
SHA256 918f2323f0811cf0139bb1f850a0d5b78bd6869b61ba29f8f6d053b253a678d6
MD5 157f432868a481cde5ef21660dcb8bbe
BLAKE2b-256 64981b6da76511c8d2bb063f81f771a530b32ea1dd39e36cf8689c7e2a2d2bac

See more details on using hashes here.

File details

Details for the file roastcoffea-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: roastcoffea-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 55.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for roastcoffea-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 90751ef87bf987fcbfeb357a3dcff23fdd81e0fb59c4f9bb06eb33929a958abd
MD5 3aee39115be195560ad685d8823819e0
BLAKE2b-256 ad73ed5dbf971d829f0f0aaaafaf0f9d0f2f5971b3a8b3969ada47cf09c07a4b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page