Neural training observability SDK for Quark.

Quark SDK Usage

This guide gives developers and researchers copy-paste examples for instrumenting training jobs, persisting run data, and analyzing results from Python.

Install and Import

From PyPI:

python -m pip install abiotic-quark

Telemetry support is included in the default install. Add the hooks extra only when you need PyTorch hook instrumentation, or the all extra to install every optional integration:

python -m pip install "abiotic-quark[hooks]"
python -m pip install "abiotic-quark[all]"

For local SDK development from the repository root:

python -m pip install -e .

The PyPI distribution is named abiotic-quark, while the Python import package is quark:

from quark import MetricsLogger, TrainingSession

Runtime dependencies:

  • psutil for richer CPU and RAM telemetry
  • nvidia-ml-py for GPU utilization and memory telemetry through the pynvml module
  • torch for HookEngine (optional; installed with the hooks extra)

The lower-level storage and persistence modules are compatibility and service-client helpers. Use them only when you are building SDK tooling, migrations, or tests around the persistence service.

Included runnable examples:

  • examples/session_lifecycle.py: start, tag, log, and end a run.
  • examples/analyzer_diagnostics.py: read persisted analysis outputs and layer summaries.
  • smoke_tests.py: run grouped service and SDK smoke checks.
  • tests/live_service/run_transformer_persistence.py: live-service PyTorch integration test.

Storage Model

Choose a storage_dir for your project or experiment family.

  • The SDK sends the resolved storage_dir to the persistence service with the X-Quark-Storage-Dir header.
  • The persistence service owns the actual metadata and artifact writes under that storage root.
  • Each run still gets a run directory on the persistence side. You can access the returned path with session.run_path.
  • Before starting runs, point the SDK at the service with QUARK_PERSISTENCE_URL and provide a user key with QUARK_USER_KEY.
  • If storage_dir is omitted, the SDK uses the current working directory.

How The SDK Fits Together

The SDK centers on a shared run context created by TrainingSession.start_run(...). Attach modules such as MetricsLogger, EventLogger, SystemTelemetry, HookEngine, ArtifactStore, or DataDiagnostics to that session and they will all write against the same run.

In practice:

  • MetricsLogger is the flexible scalar-metric path.
  • SystemTelemetry is the built-in runtime-telemetry path.
  • EventLogger stores structured lifecycle, training, and log events.
  • TrainingSession.add_log(...) writes immediate run logs with Info, Warning, or Error identifiers.
  • HookEngine stores layer summaries from instrumented model internals.
  • Context managers are the safest pattern because buffered writers flush automatically when the session ends, and uncaught runtime errors or interruptions are logged before lifecycle state is updated.

The persistence path is service-backed:

  • Set QUARK_PERSISTENCE_URL to the running persistence service.
  • Set QUARK_USER_KEY to a valid user key created through the service auth flow.
  • The SDK sends X-Quark-Storage-Dir on requests so the service can resolve the storage root for the run.

Public API Map

Everything below is exported by quark unless noted otherwise.

| Area | Start or write | Read or analyze | Config and result types |
| --- | --- | --- | --- |
| Sessions | TrainingSession.start_run(...), start_run(...), TrainingSession.resume_run(...), resume_run(...), session.end_run(...), end_run(...), session.add_tag(...), add_tag(...), session.add_log(...), add_log(...) | session.reload(), session.to_dict() | RunContext, RunRecord, ProjectRecord |
| Scalar metrics | MetricsLogger(session).log(...) | metrics.read(...) | MetricRecord |
| Structured events | EventLogger(session).log(...), events.add_log(...) | events.read(...) | EventRecord, LogIdentifier |
| Runtime telemetry | SystemTelemetry(session).start_step(...), telemetry.record_step(...), telemetry.sample(...) | telemetry.read(...) | SystemTelemetryConfig, SystemTelemetryRecord |
| PyTorch hooks | HookEngine(session, model, optimizer).step(...), begin_step(...), end_step(...) | hooks.read(...) | HookEngineConfig, HookScheduleConfig, LayerFilterConfig, LayerSummaryRecord |
| Artifacts | ArtifactStore(session).store(...), log_histogram(...), log_snapshot(...) | artifacts.read(...), artifacts.load(...) | ArtifactRecord, ARTIFACT_STORAGE_LAYOUT |
| Data diagnostics | DataDiagnostics(session).log_batch(...) | diagnostics.read(...), summarize(...), summarize_exposure(...), find_hard_batches(...) | DataDiagnosticsConfig, BatchMetadata, BatchMetadataAdapter, MappingBatchMetadataAdapter, summary/finding records |
| Diagnostics | AnalyzerEngine(session).analyze(...) | analyzer.read_alerts(...), analyzer.read_health_scores(...) | AnalyzerEngineConfig, AnalyzerThresholds, AnalyzerResult, AlertRecord, HealthScoreRecord, EvidenceRecord |
| Phases | PhaseAnalyzer(session).analyze(...) | phase_analyzer.read_phases(...) | PhaseAnalyzerConfig, PhaseAnalyzerThresholds, PhaseAnalysisResult, PhaseRecord |
| Comparison | ComparisonEngine(session).compare(...), compare_to_baseline(...), ComparisonEngine.compare_runs(...) | returned RunComparisonResult | ComparisonEngineConfig, MetricTargetSpec, comparison diff records |
| Schemas | migrate_store(...) | validate_record(...), dump_record(...) | CURRENT_SCHEMA_VERSION, SchemaValidationError, SchemaMigrationError |
| Errors | raised by service-backed storage calls | catch in application code | SessionStoreError, RunNotFoundError, InvalidRunStateError |

Shared rules:

  • Writer modules accept a TrainingSession or a RunContext.
  • Writer modules return immutable dataclass records. Use .to_dict() on records and result objects when you want plain JSON-like dictionaries.
  • step, epoch, range filters, and sequence counters are non-negative integers.
  • metadata, event payloads, config snapshots, and environment snapshots must be JSON-serializable.
  • flush() writes buffered records and returns the persisted records.
  • Most read(...) methods flush first when the writer is still open.
  • close() flushes buffered writers and marks them closed.
  • Context managers are preferred because they flush writers and close hooks/samplers even when the training loop exits early.
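To make the buffering rules above concrete, here is a minimal in-memory sketch. BufferedWriterSketch and RecordSketch are hypothetical names invented for illustration: a plain list stands in for the persistence service, and the automatic flush on a full buffer is an assumption. Quark's real writers may differ in detail.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class RecordSketch:
    """Hypothetical immutable record standing in for a persisted row."""

    sequence: int
    key: str
    value: float


class BufferedWriterSketch:
    """Hypothetical buffered writer; a list stands in for the persistence service."""

    def __init__(self, batch_size: int = 50) -> None:
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.batch_size = batch_size
        self.closed = False
        self._pending: List[Tuple[str, float]] = []
        self.persisted: List[RecordSketch] = []

    def log(self, key: str, value: float) -> None:
        if self.closed:
            raise RuntimeError("writer is closed")
        self._pending.append((key, value))
        # Assumption: a full buffer triggers an automatic write.
        if len(self._pending) >= self.batch_size:
            self.flush()

    def flush(self) -> List[RecordSketch]:
        # Persist buffered rows in order and return only the newly written records.
        start = len(self.persisted)
        written = [
            RecordSketch(sequence=start + offset, key=key, value=value)
            for offset, (key, value) in enumerate(self._pending)
        ]
        self.persisted.extend(written)
        self._pending.clear()
        return written

    def read(self) -> List[RecordSketch]:
        # Flush first while open so that reads also see buffered rows.
        if not self.closed:
            self.flush()
        return list(self.persisted)

    def close(self) -> None:
        # Flush remaining rows once, then refuse further writes.
        if not self.closed:
            self.flush()
            self.closed = True

    def __enter__(self) -> "BufferedWriterSketch":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Runs on normal exit and on exceptions; returning False re-raises errors.
        self.close()
        return False


with BufferedWriterSketch(batch_size=2) as writer:
    writer.log("train/loss", 1.42)
    writer.log("train/loss", 1.31)  # second row fills the buffer and flushes
    writer.log("train/loss", 1.27)  # stays buffered until the block exits
```

Because __exit__ always calls close(), the third row is written even if the loop body raises. This is the same reason the shared rules above recommend context managers.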

Custom Scalar Metrics

MetricsLogger is not limited to built-in keys such as train/loss or optimizer/lr. You can log arbitrary scalar metrics as long as they follow Quark's naming and value rules.

from pathlib import Path

from quark import MetricsLogger, TrainingSession

storage_dir = Path("quark-runs").resolve()

with (
    TrainingSession.start_run(
        project="custom-metrics-demo",
        storage_dir=storage_dir,
    ) as session,
    MetricsLogger(session, batch_size=50) as metrics,
):
    metrics.log(
        "my_eval/f1_macro",
        0.812,
        step=120,
        epoch=2,
        description="Macro F1 on the internal holdout split.",
        metadata={
            "split": "holdout_internal",
            "threshold": 0.5,
        },
    )
    metrics.log(
        "data/duplicate_ratio",
        0.034,
        step=120,
        epoch=2,
        metadata={"window": "epoch"},
    )
    metrics.log(
        "latency_ms",
        18.4,
        epoch=2,
        namespace="serving/shadow",
        metadata={"unit": "milliseconds"},
    )

Rules for custom scalar metrics:

  • Every metric must include at least step or epoch.
  • Values must be finite numeric scalars.
  • You can pass a full metric key such as my_eval/f1_macro, or pass metric="f1_macro" with namespace="my_eval".
  • Bare metric names such as loss are accepted, but namespaced keys are preferred for analyzers and UI grouping.
  • Each namespace or metric-name segment must start with a letter or digit and may contain letters, digits, ., _, or -.
  • description is optional human-readable context stored with the metric.
  • metadata is optional and must be JSON-serializable.
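You can pre-check a metric against these rules before logging it. The following sketch uses only the standard library. validate_metric and build_metric_key are hypothetical helpers, not part of quark, and the regular expression (ASCII letters only) is one reading of the segment rule rather than Quark's actual validator.

```python
import math
import re
from numbers import Real
from typing import Optional

# One namespace or metric-name segment: an ASCII letter or digit first (assumption),
# then any mix of letters, digits, ".", "_", or "-".
SEGMENT = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


def build_metric_key(metric: str, namespace: Optional[str] = None) -> str:
    """Join an optional namespace and a metric name into a full metric key."""
    return f"{namespace}/{metric}" if namespace else metric


def validate_metric(
    key: str,
    value: object,
    step: Optional[int] = None,
    epoch: Optional[int] = None,
) -> None:
    """Raise ValueError if a metric would break the documented rules."""
    if step is None and epoch is None:
        raise ValueError("a metric needs at least step or epoch")
    for label, counter in (("step", step), ("epoch", epoch)):
        if counter is None:
            continue
        if isinstance(counter, bool) or not isinstance(counter, int) or counter < 0:
            raise ValueError(f"{label} must be a non-negative integer")
    if not all(SEGMENT.fullmatch(segment) for segment in key.split("/")):
        raise ValueError(f"invalid metric key: {key!r}")
    # Assumption: booleans are rejected even though Python treats bool as an int.
    if isinstance(value, bool) or not isinstance(value, Real) or not math.isfinite(value):
        raise ValueError(f"value must be a finite numeric scalar, got {value!r}")


validate_metric(build_metric_key("f1_macro", "my_eval"), 0.812, step=120, epoch=2)
```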

Choose between custom metrics and telemetry like this:

  • Use MetricsLogger when you need a new scalar signal with your own name.
  • Use SystemTelemetry when you want the built-in runtime schema for timings, CPU, RAM, GPU, overflow counts, and skipped-step counts.
  • SystemTelemetry can also mirror selected built-in runtime fields into scalar metrics such as system/runtime/step_time_ms.
  • If you need extra context on a telemetry record, put it in metadata. If you need a brand-new scalar series, log it through MetricsLogger.

Built-in analyzers use conventional metric keys by default:

  • AnalyzerEngine looks for runtime timing metrics such as system/runtime/step_time_ms, system/runtime/data_time_ms, and system/runtime/compute_time_ms.
  • PhaseAnalyzer looks for training and validation loss or accuracy keys such as train/loss, val/loss, train/accuracy, and val/accuracy.

If your project uses different names, configure the analyzers explicitly:

from quark import AnalyzerEngine, AnalyzerEngineConfig, PhaseAnalyzer, PhaseAnalyzerConfig

analyzer = AnalyzerEngine(
    session,
    config=AnalyzerEngineConfig(
        step_time_metric_keys=("perf/step_ms",),
        data_time_metric_keys=("perf/data_ms",),
        compute_time_metric_keys=("perf/compute_ms",),
    ),
)

phase_analyzer = PhaseAnalyzer(
    session,
    config=PhaseAnalyzerConfig(
        train_loss_metric_keys=("objective/train_ce",),
        val_loss_metric_keys=("objective/val_ce",),
        train_accuracy_metric_keys=("objective/train_top1",),
        val_accuracy_metric_keys=("objective/val_top1",),
    ),
)

If you stay with the conventional names, the default analyzer configuration works out of the box.

Read Metrics Back

MetricsLogger.read(...) flushes pending metrics and returns MetricRecord objects ordered by persistence sequence:

losses = metrics.read(metric_key="train/loss", step_min=100, step_max=500)
latest_eval = metrics.read(namespace="eval", limit=20)

for record in losses:
    print(record.step, record.metric_key, record.value, record.to_dict())

Useful filters are metric_key, namespace, metric_name, step, step_min, step_max, epoch, epoch_min, epoch_max, and limit.

1. Minimal Run Lifecycle

Use this pattern when you want run metadata, scalar metrics, and structured logs with very little setup.

from pathlib import Path

from quark import EventLogger, MetricsLogger, TrainingSession

storage_dir = Path("quark-runs").resolve()

with (
    TrainingSession.start_run(
        project="image-classifier",
        config={
            "model": "resnet18",
            "optimizer": "adamw",
            "lr": 3e-4,
            "epochs": 5,
        },
        tags=["baseline", "dev"],
        storage_dir=storage_dir,
    ) as session,
    MetricsLogger(session, batch_size=50) as metrics,
    EventLogger(session, batch_size=20) as events,
):
    metrics.log("train/loss", 1.42, step=0, epoch=0)
    metrics.log("optimizer/lr", 3e-4, step=0, epoch=0)

    events.log(
        "checkpoint_saved",
        step=0,
        epoch=0,
        payload={"path": "checkpoints/step-0.pt"},
    )

    session.add_log("Warmup completed without instability.", identifier="Info")

print(session.run_id)
print(session.run_path)

Notes:

  • TrainingSession records lifecycle events such as run_started, run_resumed, and run_ended.
  • TrainingSession.start_run(...) accepts project, optional config, optional tags, optional parent_run_id, optional storage_dir, and strict_mode.
  • The equivalent top-level helper is start_run(...).
  • session.add_log(...) creates a persisted user_log event with an Info, Warning, or Error identifier.
  • session.add_tag(...) persists an additional run tag and returns the refreshed RunContext.
  • session.reload() reloads the run from the persistence service and returns the refreshed RunContext.
  • session.to_dict() returns the full context payload, including project and run records.
  • The TrainingSession context manager marks the run as completed on success, failed on exceptions, and interrupted on KeyboardInterrupt or SystemExit.
  • Runtime exceptions inside the session context are persisted as runtime_error logs with the Error identifier. Interruptions are persisted as runtime_interruption logs with the Warning identifier.
  • Supported terminal statuses are completed, failed, and interrupted.

Structured Logs And Runtime Errors

Use session.add_log(...) for immediate run-level messages and EventLogger.add_log(...) when you want batched log writes alongside other events.

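from pathlib import Path

from quark import EventLogger, TrainingSession

storage_dir = Path("quark-runs").resolve()

with TrainingSession.start_run(project="classifier", storage_dir=storage_dir) as session:
    # Immediate, unbatched run-level log.
    session.add_log("Loaded checkpoint from step 1200.", identifier="Info")

    with EventLogger(session, batch_size=20) as events:
        # Batched log, written alongside other buffered events.
        events.add_log(
            "Validation loss increased for three checks.",
            identifier="Warning",
            step=1280,
            payload={"window": 3},
        )

        train_one_epoch()  # your own training function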

If train_one_epoch() raises, the session context writes a runtime_error event with identifier: "Error" and then ends the run as failed. If a KeyboardInterrupt or SystemExit leaves the context, it writes a runtime_interruption event with identifier: "Warning" and ends the run as interrupted.

EventLogger.log(...) accepts custom event types. If severity is omitted, common event types receive sensible defaults. Supported severity values are:

  • debug
  • info
  • warning
  • error
  • critical

Event types must start with a lowercase letter and may contain lowercase letters, digits, _, ., /, or -. add_log(...) identifiers are Info, Warning, or Error; log payloads may not define reserved keys named message or identifier.
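These constraints translate directly into a pair of pre-flight checks. The helpers below are hypothetical and written against the stated rules; they are not code shipped with quark. Treating identifiers as case-sensitive is an assumption.

```python
import re
from typing import Any, Mapping, Optional

# A lowercase letter first, then lowercase letters, digits, "_", ".", "/", or "-".
EVENT_TYPE = re.compile(r"[a-z][a-z0-9_./-]*")
# Assumption: identifiers are matched case-sensitively.
LOG_IDENTIFIERS = frozenset({"Info", "Warning", "Error"})
RESERVED_LOG_KEYS = frozenset({"message", "identifier"})


def check_event_type(event_type: str) -> bool:
    """Return True if event_type follows the documented naming rule."""
    return EVENT_TYPE.fullmatch(event_type) is not None


def check_log(identifier: str, payload: Optional[Mapping[str, Any]] = None) -> None:
    """Raise ValueError for an identifier or payload that add_log(...) would reject."""
    if identifier not in LOG_IDENTIFIERS:
        raise ValueError(f"identifier must be one of {sorted(LOG_IDENTIFIERS)}")
    clashes = RESERVED_LOG_KEYS.intersection(payload or {})
    if clashes:
        raise ValueError(f"payload may not define reserved keys: {sorted(clashes)}")


check_log("Warning", {"window": 3})
```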

EventLogger.read(...) flushes pending events and accepts event_type, event_types, severity, step, step_min, step_max, epoch, epoch_min, epoch_max, and limit.

2. Full PyTorch Training Loop Instrumentation

Use this pattern when you want scalar metrics, structured events, runtime telemetry, and layer summaries from the same training loop.

Assumptions:

  • model, optimizer, criterion, train_loader, device, and num_epochs already exist.
  • train_loader supports len(train_loader). If it does not, replace len(train_loader) with your own step counter logic.
from pathlib import Path

from quark import (
    EventLogger,
    HookEngine,
    HookEngineConfig,
    HookScheduleConfig,
    LayerFilterConfig,
    MetricsLogger,
    SystemTelemetry,
    SystemTelemetryConfig,
    TrainingSession,
)

storage_dir = Path("quark-runs").resolve()

hook_config = HookEngineConfig(
    schedule=HookScheduleConfig(every_n_steps=25),
    filters=LayerFilterConfig(
        include_types=("Linear", "Conv2d", "LayerNorm", "ReLU"),
    ),
)

with (
    TrainingSession.start_run(
        project="image-classifier",
        config={"model": "resnet18", "epochs": num_epochs},
        tags=["experiment", "pytorch"],
        storage_dir=storage_dir,
    ) as session,
    MetricsLogger(session, batch_size=100) as metrics,
    EventLogger(session, batch_size=50) as events,
    SystemTelemetry(
        session,
        config=SystemTelemetryConfig(sample_every_n_steps=10),
    ) as telemetry,
    HookEngine(session, model, optimizer, config=hook_config) as hooks,
):
    for epoch in range(num_epochs):
        train_iter = iter(train_loader)

        for step_in_epoch in range(len(train_loader)):
            global_step = (epoch * len(train_loader)) + step_in_epoch

            with telemetry.start_step(
                global_step,
                epoch=epoch,
                metadata={"split": "train"},
            ) as step_scope:
                inputs, targets = next(train_iter)
                inputs = inputs.to(device)
                targets = targets.to(device)
                step_scope.mark_data_ready()

                optimizer.zero_grad(set_to_none=True)

                with hooks.step(global_step, epoch=epoch) as probe_active:
                    logits = model(inputs)
                    loss = criterion(logits, targets)
                    loss.backward()

                optimizer.step()

            metrics.log("train/loss", float(loss.item()), step=global_step, epoch=epoch)
            metrics.log(
                "optimizer/lr",
                float(optimizer.param_groups[0]["lr"]),
                step=global_step,
                epoch=epoch,
            )

            if global_step % 100 == 0:
                events.log(
                    "checkpoint_saved",
                    step=global_step,
                    epoch=epoch,
                    payload={"path": f"checkpoints/step-{global_step}.pt"},
                )

            if probe_active:
                events.log(
                    "probe_completed",
                    step=global_step,
                    epoch=epoch,
                    payload={
                        "instrumented_layers": hooks.instrumented_module_names,
                    },
                )

Notes:

  • HookEngine only probes steps allowed by HookScheduleConfig.
  • SystemTelemetry stores telemetry records and also mirrors timing values into scalar metrics such as system/runtime/step_time_ms.
  • MetricsLogger can log both conventional keys and project-specific custom metrics in the same run.
  • LayerFilterConfig lets you keep instrumentation focused on the layers you actually care about.

Hook Configuration Details

HookScheduleConfig controls when probes are active:

  • every_n_steps: probe cadence. Set to None to disable cadence-based probing.
  • start_step: first eligible cadence step.
  • stop_step: last eligible cadence step.
  • explicit_steps: exact steps that should always be probed.
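One way to read these four fields is as a per-step predicate. The sketch below is hypothetical: ScheduleSketch is not HookScheduleConfig, and two details are assumptions rather than documented behavior, namely that explicit_steps bypass the start/stop bounds and that the cadence is counted from start_step.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class ScheduleSketch:
    """Hypothetical stand-in mirroring HookScheduleConfig's documented fields."""

    every_n_steps: Optional[int] = 25
    start_step: int = 0
    stop_step: Optional[int] = None
    explicit_steps: FrozenSet[int] = field(default_factory=frozenset)

    def should_probe(self, step: int) -> bool:
        # Explicit steps are always probed (assumed to ignore start/stop bounds).
        if step in self.explicit_steps:
            return True
        if self.every_n_steps is None:
            return False  # cadence-based probing disabled
        if step < self.start_step:
            return False
        if self.stop_step is not None and step > self.stop_step:
            return False
        # Assumption: the cadence is counted from start_step.
        return (step - self.start_step) % self.every_n_steps == 0


schedule = ScheduleSketch(every_n_steps=10, start_step=5, stop_step=30, explicit_steps=frozenset({3}))
probed = [step for step in range(40) if schedule.should_probe(step)]
```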

LayerFilterConfig controls which modules receive hooks:

  • include_names and exclude_names match module names from model.named_modules().
  • include_types and exclude_types match either a class name such as Linear or a fully qualified type name.
  • include_name_patterns and exclude_name_patterns are regular expressions matched against module names.
  • leaf_modules_only=True skips container modules and instruments only leaves.
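The filter options above can be modeled as a function over module descriptions. In this hypothetical sketch, each module is a (name, class name, qualified type name, is_leaf) tuple of the kind you might derive from model.named_modules(). It assumes that an empty include set selects everything, that excludes win over includes, and that patterns are applied with re.search. Quark's actual precedence rules may differ.

```python
import re
from typing import Iterable, List, Sequence, Tuple

ModuleInfo = Tuple[str, str, str, bool]  # (name, class name, qualified type name, is_leaf)


def select_modules(
    modules: Iterable[ModuleInfo],
    include_names: Sequence[str] = (),
    exclude_names: Sequence[str] = (),
    include_types: Sequence[str] = (),
    exclude_types: Sequence[str] = (),
    include_name_patterns: Sequence[str] = (),
    exclude_name_patterns: Sequence[str] = (),
    leaf_modules_only: bool = False,
) -> List[str]:
    """Return the names of modules that would receive hooks under these filters."""
    includes_given = bool(include_names or include_types or include_name_patterns)
    selected: List[str] = []
    for name, class_name, qualified_name, is_leaf in modules:
        if leaf_modules_only and not is_leaf:
            continue  # skip containers
        # A type filter may use either the short class name or the qualified name.
        type_names = {class_name, qualified_name}
        included = not includes_given or (
            name in include_names
            or bool(type_names & set(include_types))
            or any(re.search(p, name) for p in include_name_patterns)
        )
        excluded = (
            name in exclude_names
            or bool(type_names & set(exclude_types))
            or any(re.search(p, name) for p in exclude_name_patterns)
        )
        if included and not excluded:
            selected.append(name)
    return selected


modules = [
    ("", "Net", "mymodel.Net", False),
    ("encoder", "Sequential", "torch.nn.modules.container.Sequential", False),
    ("encoder.0", "Linear", "torch.nn.modules.linear.Linear", True),
    ("encoder.1", "ReLU", "torch.nn.modules.activation.ReLU", True),
    ("head", "Linear", "torch.nn.modules.linear.Linear", True),
]
linear_layers = select_modules(modules, include_types=("Linear",))
```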

HookEngineConfig controls probe contents and failure behavior:

  • capture_forward records activation summaries.
  • capture_backward records gradient summaries.
  • batch_size controls buffered layer-summary writes.
  • raise_on_error overrides session strict_mode for hook failures.
  • dead_unit_epsilon controls how close to zero an activation must be to count as dead.
  • saturation_threshold controls saturation-ratio detection. Set it to None to disable saturation ratios.
  • saturation_module_types defaults to sigmoid/tanh-like activation modules.

Layer summaries include module_name, module_type, summary_type, hook_direction, call_index, element_count, optional tensor_shape, mean, std, norm, variance, dead_unit_ratio, saturation_ratio, and optional metadata.
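The tensor statistics in a layer summary can be approximated in plain Python over a flattened tensor. summarize_tensor is a hypothetical helper. Population variance, the |x| <= dead_unit_epsilon rule for dead units, and the |x| >= saturation_threshold rule for saturation (which fits tanh-style outputs) are assumptions, not Quark's documented formulas.

```python
import math
from typing import Dict, Optional, Sequence


def summarize_tensor(
    values: Sequence[float],
    dead_unit_epsilon: float = 1e-6,
    saturation_threshold: Optional[float] = 0.99,
) -> Dict[str, Optional[float]]:
    """Compute layer-summary style statistics over a flattened tensor."""
    count = len(values)
    if count == 0:
        raise ValueError("cannot summarize an empty tensor")
    mean = sum(values) / count
    variance = sum((v - mean) ** 2 for v in values) / count  # population variance
    dead = sum(1 for v in values if abs(v) <= dead_unit_epsilon)
    saturation_ratio = None  # None when saturation detection is disabled
    if saturation_threshold is not None:
        saturated = sum(1 for v in values if abs(v) >= saturation_threshold)
        saturation_ratio = saturated / count
    return {
        "element_count": count,
        "mean": mean,
        "std": math.sqrt(variance),
        "variance": variance,
        "norm": math.sqrt(sum(v * v for v in values)),  # L2 norm
        "dead_unit_ratio": dead / count,
        "saturation_ratio": saturation_ratio,
    }


summary = summarize_tensor([0.0, 1.0, -1.0, 0.0])
```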

hooks.read(...) flushes pending summaries and accepts module_name, module_type, summary_type, hook_direction, step filters, epoch filters, and limit.

System Telemetry Details

SystemTelemetryConfig fields:

  • sample_every_n_steps: cadence for record_step(...) and start_step(...) output.
  • emit_scalar_metrics: mirror supported runtime fields into scalar metrics.
  • capture_cpu: enable CPU/RAM/process memory sampling.
  • capture_gpu: enable GPU sampling.
  • gpu_index: choose a GPU index when GPU sampling is available.

start_step(...) is the easiest timing API. Call mark_data_ready() after the batch has moved to the device; exiting the scope calls finish() automatically.

with telemetry.start_step(step, epoch=epoch) as scope:
    batch = next(train_iter)
    inputs = batch["inputs"].to(device)
    targets = batch["targets"].to(device)
    scope.mark_data_ready()

    loss = train_one_step(inputs, targets)

# Alternatively, record a step directly with explicit timings and overflow/skip flags:
telemetry.record_step(
    step,
    epoch=epoch,
    step_time_ms=37.5,
    overflow_occurred=found_inf,
    skipped_step=skipped_optimizer_step,
    metadata={"split": "train"},
)
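The split between data time and compute time that a start_step(...) scope produces can be sketched with time.perf_counter. StepTimerSketch is hypothetical. It assumes that step time runs from scope entry to exit, that data time ends at mark_data_ready(), and that compute time is the remainder; if mark_data_ready() is never called, the sketch attributes the whole step to compute.

```python
import time
from typing import Dict, Optional


class StepTimerSketch:
    """Hypothetical timer that splits one training step into data and compute time."""

    def __init__(self) -> None:
        self.timings: Dict[str, float] = {}
        self._start = 0.0
        self._data_ready: Optional[float] = None

    def __enter__(self) -> "StepTimerSketch":
        self._start = time.perf_counter()
        self._data_ready = None
        return self

    def mark_data_ready(self) -> None:
        # Call once the batch has been loaded and moved to the device.
        self._data_ready = time.perf_counter()

    def __exit__(self, exc_type, exc, tb) -> bool:
        end = time.perf_counter()
        step_ms = (end - self._start) * 1000.0
        # Without a mark, the whole step counts as compute (assumption).
        data_ready = self._data_ready if self._data_ready is not None else self._start
        data_ms = (data_ready - self._start) * 1000.0
        self.timings = {
            "step_time_ms": step_ms,
            "data_time_ms": data_ms,
            "compute_time_ms": step_ms - data_ms,
        }
        return False  # never swallow exceptions


with StepTimerSketch() as scope:
    batch = [float(i) for i in range(100_000)]  # stand-in for loading a batch
    scope.mark_data_ready()
    checksum = sum(x * x for x in batch)  # stand-in for forward/backward work

print({name: round(ms, 3) for name, ms in scope.timings.items()})
```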

telemetry.sample(...) captures point-in-time CPU/GPU/RAM state. If you provide step, the record boundary is step; otherwise it is time.

Mirrored scalar metric keys currently include:

  • system/runtime/step_time_ms
  • system/runtime/data_time_ms
  • system/runtime/compute_time_ms
  • system/cpu/usage_percent
  • system/ram/usage_percent
  • system/ram/used_bytes
  • system/process/rss_bytes
  • system/gpu/utilization_percent
  • system/gpu/memory_used_bytes
  • system/gpu/memory_usage_percent
  • system/runtime/overflow_count
  • system/runtime/skipped_step_count

telemetry.read(...) flushes pending telemetry and accepts boundary, step filters, epoch filters, and limit.

3. Capture Artifacts

Use ArtifactStore when you want to persist non-scalar outputs such as histograms, checkpoint metadata, or custom JSON payloads.

from pathlib import Path

from quark import ArtifactStore, TrainingSession

storage_dir = Path("quark-runs").resolve()

with (
    TrainingSession.start_run(
        project="artifact-demo",
        storage_dir=storage_dir,
    ) as session,
    ArtifactStore(session) as artifacts,
):
    histogram = artifacts.log_histogram(
        "activation",
        bins=[-1.0, 0.0, 1.0],
        counts=[12, 4],
        step=120,
        epoch=2,
        namespace="layers/encoder",
        metadata={"module": "encoder.0"},
    )

    snapshot = artifacts.log_snapshot(
        "warmup",
        {
            "checkpoint_path": "checkpoints/step-120.pt",
            "checkpoint_size_bytes": 8192,
        },
        snapshot_type="checkpoint_metadata",
        step=120,
        epoch=2,
        namespace="trainer",
        metadata={"phase": "warmup"},
    )

    print(histogram.payload_relative_path)
    print(snapshot.metadata_relative_path)
    print(artifacts.load(snapshot))

Useful artifact types include:

  • histogram
  • tensor_summary
  • embedding_snapshot
  • batch_diagnostics_snapshot
  • checkpoint_metadata
  • similarity_analysis_output
  • alert_evidence_bundle

Artifact payload formats:

  • json for JSON-serializable Python values
  • text for Python strings
  • bytes for bytes-like payloads

If payload_format is omitted, ArtifactStore.store(...) infers text for strings, bytes for bytes-like objects, and json otherwise.
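The inference rule is small enough to spell out. infer_payload_format is a hypothetical helper. Treating bytes, bytearray, and memoryview as the bytes-like types, and checking JSON-serializability eagerly with json.dumps, are assumptions about how such a rule might be implemented.

```python
import json
from typing import Any


def infer_payload_format(payload: Any) -> str:
    """Return "text" for str, "bytes" for bytes-like objects, and "json" otherwise."""
    if isinstance(payload, str):
        return "text"
    if isinstance(payload, (bytes, bytearray, memoryview)):
        return "bytes"
    # A json payload must be JSON-serializable; json.dumps raises TypeError otherwise.
    json.dumps(payload)
    return "json"
```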

Artifact names follow the same namespacing style as metrics. Each segment must start with a letter or digit and may contain letters, digits, ., _, or -. You can pass name="layers/encoder/activation" or name="activation", namespace="layers/encoder".

Generic artifact API:

record = artifacts.store(
    "tensor_summary",
    "layers/encoder/weights",
    {"mean": 0.02, "std": 0.31},
    step=120,
    payload_format="json",
)

records = artifacts.read(
    artifact_type="tensor_summary",
    namespace="layers/encoder",
    step_min=100,
)
payload = artifacts.load(record)
same_payload = artifacts.load(record.id)

artifacts.read(...) accepts artifact_id, artifact_type, artifact_types, namespace, artifact_name, artifact_key, payload_format, step filters, epoch filters, and limit.

Retention hooks can observe each stored artifact:

def report_checkpoint_metadata(store, record):
    # Called for each artifact this store persists; this hook only observes.
    if record.artifact_type == "checkpoint_metadata":
        print(record.artifact_key, record.size_bytes)

with ArtifactStore(
    session,
    retention_hooks=(report_checkpoint_metadata,),
    raise_on_retention_error=False,
) as artifacts:
    artifacts.log_snapshot(
        "latest",
        {"checkpoint": "step-120.pt"},
        snapshot_type="checkpoint_metadata",
    )

If raise_on_retention_error is omitted, it follows the run's strict_mode.

4. Batch and Data Diagnostics

Use DataDiagnostics when you want batch-level loss tracking, class exposure summaries, sample traces, and hard-batch detection without storing raw samples.

If your dataloader yields mapping-like batches, MappingBatchMetadataAdapter is the easiest setup:

Assume train_loader yields mapping-like batches and train_step(batch) returns the scalar batch loss for that batch.

from pathlib import Path

from quark import (
    DataDiagnostics,
    DataDiagnosticsConfig,
    MappingBatchMetadataAdapter,
    TrainingSession,
)

storage_dir = Path("quark-runs").resolve()

adapter = MappingBatchMetadataAdapter(
    sample_id_key="sample_ids",
    label_key="labels",
    sequence_length_key="lengths",
    metadata_keys=("loader",),
)

with (
    TrainingSession.start_run(
        project="data-diagnostics-demo",
        storage_dir=storage_dir,
    ) as session,
    DataDiagnostics(
        session,
        config=DataDiagnosticsConfig(
            batch_size=32,
            adapter=adapter,
        ),
    ) as diagnostics,
):
    for step, batch in enumerate(train_loader):
        loss = train_step(batch)
        diagnostics.log_batch(
            float(loss),
            step=step,
            epoch=0,
            batch=batch,
        )

    summary = diagnostics.summarize(hard_batch_limit=5)
    exposure = diagnostics.summarize_exposure()
    hard_batches = diagnostics.find_hard_batches(limit=5)

    print(summary.loss_summary.to_dict() if summary.loss_summary else None)
    print(exposure.to_dict())
    print([finding.to_dict() for finding in hard_batches])

If your batch object is not mapping-like, implement BatchMetadataAdapter and return a BatchMetadata instance.

from collections import Counter

from quark import BatchMetadata, BatchMetadataAdapter

class TupleBatchAdapter(BatchMetadataAdapter):
    def extract_batch_metadata(self, batch):
        _inputs, targets, sample_ids = batch
        return BatchMetadata(
            batch_size=len(sample_ids),
            sample_ids=sample_ids,
            # Count examples per class instead of passing the raw label tensor.
            class_counts=dict(Counter(int(label) for label in targets)),
            metadata={"source": "tuple_loader"},
        )

You can also skip adapters and pass fields directly:

diagnostics.log_batch(
    loss=float(loss),
    step=step,
    epoch=epoch,
    batch_size=len(targets),
    labels=targets,
    sample_ids=batch_ids,
    sequence_lengths=token_lengths,
    metadata={"augmentation": "mixup"},
)

DataDiagnosticsConfig fields:

  • batch_size: buffered write size.
  • adapter: optional BatchMetadataAdapter.
  • outlier_z_score_threshold: z-score threshold for loss and sequence-length outliers.
  • outlier_iqr_multiplier: IQR multiplier for upper-fence outliers.
  • hard_batch_percentile: percentile used as the high-loss hard-batch threshold.
  • min_batches_for_outlier_detection: minimum records before outlier rules activate.
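To get a feel for how these fields interact, here is a standard-library sketch over a list of batch losses. flag_loss_outliers takes parameters named after the DataDiagnosticsConfig fields, but its default values, the two-sided z-score test, the linear-interpolation percentile, and the >= comparison for hard batches are all assumptions for illustration. DataDiagnostics may compute these differently.

```python
import statistics
from typing import Dict, List, Sequence


def percentile(values: Sequence[float], q: float) -> float:
    """Linear-interpolation percentile for q in [0, 100]."""
    ordered = sorted(values)
    if not ordered:
        raise ValueError("percentile of an empty sequence")
    position = (len(ordered) - 1) * q / 100.0
    lower = int(position)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (position - lower)


def flag_loss_outliers(
    losses: Sequence[float],
    outlier_z_score_threshold: float = 3.0,
    outlier_iqr_multiplier: float = 1.5,
    hard_batch_percentile: float = 90.0,
    min_batches_for_outlier_detection: int = 8,
) -> Dict[str, List[int]]:
    """Return the batch indices flagged by each rule (hypothetical defaults)."""
    if len(losses) < min_batches_for_outlier_detection:
        # Too little history: no rule fires yet.
        return {"z_score": [], "iqr_upper_fence": [], "hard_batch": []}
    mean = statistics.fmean(losses)
    spread = statistics.pstdev(losses)
    q1, q3 = percentile(losses, 25), percentile(losses, 75)
    upper_fence = q3 + outlier_iqr_multiplier * (q3 - q1)
    hard_cutoff = percentile(losses, hard_batch_percentile)
    return {
        "z_score": [
            i for i, x in enumerate(losses)
            if spread > 0 and abs(x - mean) / spread > outlier_z_score_threshold
        ],
        "iqr_upper_fence": [i for i, x in enumerate(losses) if x > upper_fence],
        "hard_batch": [i for i, x in enumerate(losses) if x >= hard_cutoff],
    }


flags = flag_loss_outliers([1.0] * 9 + [10.0], outlier_z_score_threshold=2.5)
```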

DataDiagnostics.read(...) accepts sample_id, step filters, epoch filters, and limit. summarize(...), summarize_exposure(...), and find_hard_batches(...) accept step and epoch range filters.

Diagnostics records do not store raw samples. They store loss, optional batch size, optional sample IDs, class counts, sequence lengths, and JSON metadata.

5. Analyze a Run

AnalyzerEngine turns persisted metrics and layer summaries into alerts and health scores.

PhaseAnalyzer segments a run into training regimes such as rapid learning, stabilization, stagnation, overfitting onset, and divergence.

Run this after the current session has persisted metrics and, ideally, layer summaries.

from quark import AnalyzerEngine, PhaseAnalyzer

analyzer = AnalyzerEngine(session)
analysis = analyzer.analyze()

phase_analyzer = PhaseAnalyzer(session)
phase_result = phase_analyzer.analyze()

print([alert.to_dict() for alert in analysis.alerts])
print([score.to_dict() for score in analysis.health_scores])
print([phase.to_dict() for phase in phase_result.phases])

runtime_alerts = analyzer.read_alerts(category="runtime")
overfitting_phases = phase_analyzer.read_phases(phase_name="overfitting_onset")

Notes:

  • AnalyzerEngine can operate on metrics alone, but some alerts become much more useful when layer summaries are available.
  • PhaseAnalyzer expects either training and validation loss metrics or training and validation accuracy metrics.
  • analyze(replace_existing=False) appends outputs instead of replacing prior analyzer or phase outputs.
  • AnalyzerEngineConfig.replace_existing_outputs and PhaseAnalyzerConfig.replace_existing_outputs set the default replacement behavior.

Analyzer alert rule IDs currently include:

  • exploding_gradients
  • vanishing_gradients
  • stagnant_layers
  • activation_collapse
  • overfitting_onset
  • runtime_bottleneck_risk

Analyzer categories are optimization, representation, generalization, and runtime. Severities are info, warning, error, and critical.

Health scores are produced for optimization, representation, generalization, runtime, and overall. Scores are 0 to 100 with statuses healthy, watch, degraded, or critical.

Thresholds are configurable:

from quark import AnalyzerEngine, AnalyzerEngineConfig, AnalyzerThresholds

analyzer = AnalyzerEngine(
    session,
    config=AnalyzerEngineConfig(
        thresholds=AnalyzerThresholds(
            exploding_gradient_warning_norm=20.0,
            runtime_warning_step_time_ms=400.0,
            overfitting_warning_gap=0.15,
        ),
        replace_existing_outputs=True,
    ),
)

Phase analyzer outputs use these phase names:

  • initialization_adaptation
  • rapid_learning
  • stabilization
  • stagnation
  • overfitting_onset
  • divergence

Phase thresholds are also configurable:

from quark import PhaseAnalyzer, PhaseAnalyzerConfig, PhaseAnalyzerThresholds

phase_analyzer = PhaseAnalyzer(
    session,
    config=PhaseAnalyzerConfig(
        thresholds=PhaseAnalyzerThresholds(
            initialization_points=3,
            rapid_learning_min_relative_change=0.12,
            stagnation_max_relative_change=0.005,
        ),
    ),
)
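The two relative-change thresholds in the configuration example can be illustrated on a single training-loss series. classify_loss_points is a hypothetical, deliberately simplified rule set. It labels only initialization_adaptation, rapid_learning, stabilization, and stagnation, and treats everything between the two thresholds as stabilization. PhaseAnalyzer itself works from both training and validation metrics and also reports overfitting_onset and divergence.

```python
from typing import List, Sequence


def classify_loss_points(
    losses: Sequence[float],
    initialization_points: int = 3,
    rapid_learning_min_relative_change: float = 0.12,
    stagnation_max_relative_change: float = 0.005,
) -> List[str]:
    """Label each point of a training-loss series with a coarse regime."""
    labels: List[str] = []
    # The first point has no predecessor, so it always counts as initialization.
    warmup = max(initialization_points, 1)
    for index, loss in enumerate(losses):
        if index < warmup:
            labels.append("initialization_adaptation")
            continue
        previous = losses[index - 1]
        # Relative improvement versus the previous point; positive means loss fell.
        change = (previous - loss) / abs(previous) if previous != 0 else 0.0
        if change >= rapid_learning_min_relative_change:
            labels.append("rapid_learning")
        elif abs(change) <= stagnation_max_relative_change:
            labels.append("stagnation")
        else:
            labels.append("stabilization")
    return labels


labels = classify_loss_points([2.0, 1.9, 1.8, 1.5, 1.3, 1.29, 1.2899])
```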

6. Compare Runs and Baselines

Use ComparisonEngine when you want structured diffs across metrics, alerts, phases, and layer summaries.

Assume candidate_session and baseline_session already refer to runs with persisted telemetry and analysis outputs.

from quark import (
    ComparisonEngine,
    ComparisonEngineConfig,
    MetricTargetSpec,
)

comparison = ComparisonEngine(
    candidate_session,
    config=ComparisonEngineConfig(
        metric_targets=(
            MetricTargetSpec(
                metric_key="val/accuracy",
                target_value=0.80,
                direction="at_least",
            ),
        ),
    ),
).compare_to_baseline(baseline_session.run_id)

print(comparison.mode)
print(comparison.primary.to_dict())
print(comparison.reference.to_dict())
print([item.to_dict() for item in comparison.metric_diffs])
print(comparison.alert_diff.to_dict())
print([item.to_dict() for item in comparison.phase_diffs])
print([item.to_dict() for item in comparison.layer_summary_diffs])

For direct pairwise comparisons, use:

comparison = ComparisonEngine(primary_session).compare(reference_session)

You can also compare in one call:

comparison = ComparisonEngine.compare_runs(
    primary_session,
    baseline_session,
    baseline=True,
    config=ComparisonEngineConfig(
        metric_targets=(
            MetricTargetSpec("val/loss", 0.5, "at_most"),
        ),
    ),
)

Comparison targets can be a TrainingSession, a RunContext, or a run ID string. When you pass a run ID string, the comparison engine loads it from the primary run's storage_dir.

RunComparisonResult.to_dict() includes:

  • schema_version
  • generated_at
  • mode: pairwise or baseline
  • primary and reference run metadata
  • metric_diffs
  • alert_diff
  • phase_diffs
  • layer_summary_diffs

MetricTargetSpec.direction is either at_least or at_most. Each target enables a time_to_target_step diff for its metric.
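A time-to-target diff compares the first step at which each run reaches a target. time_to_target_step is a hypothetical helper over (step, value) pairs that shows how at_least and at_most change the comparison. It is not Quark's implementation.

```python
from typing import Iterable, Optional, Tuple


def time_to_target_step(
    points: Iterable[Tuple[int, float]],
    target_value: float,
    direction: str,
) -> Optional[int]:
    """Return the first step whose value meets the target, or None if never reached."""
    if direction not in ("at_least", "at_most"):
        raise ValueError("direction must be 'at_least' or 'at_most'")
    for step, value in sorted(points):
        reached = value >= target_value if direction == "at_least" else value <= target_value
        if reached:
            return step
    return None


candidate = [(100, 0.71), (200, 0.79), (300, 0.82)]
baseline = [(100, 0.65), (200, 0.74), (300, 0.78), (400, 0.81)]
candidate_step = time_to_target_step(candidate, 0.80, "at_least")
baseline_step = time_to_target_step(baseline, 0.80, "at_least")
```

Here the candidate reaches val/accuracy 0.80 at step 300 and the baseline at step 400, so the candidate gets there 100 steps sooner.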

7. Resume Interrupted Runs

Use resume_run when you are intentionally continuing a previously interrupted run.

from pathlib import Path

from quark import TrainingSession

storage_dir = Path("quark-runs").resolve()

session = TrainingSession.start_run(
    project="resume-demo",
    storage_dir=storage_dir,
)
session.end_run(status="interrupted")

resumed = TrainingSession.resume_run(
    run_id=session.run_id,
    storage_dir=storage_dir,
)
resumed.add_tag("recovered")
resumed.add_log("Recovered after a scheduled maintenance window.", identifier="Info")
resumed.end_run(status="completed")

Important:

  • resume_run(...) changes lifecycle state and emits a run_resumed event.
  • Use it only when you intend to continue the run; it is not a read-only way to inspect a run.
  • TrainingSession.resume_run(...) and top-level resume_run(...) accept run_id and optional storage_dir.
  • RunNotFoundError is raised when the service cannot find the requested run.
  • InvalidRunStateError is raised when the requested lifecycle transition is not allowed.

8. Records, Schemas, and Validation

The SDK returns dataclass records for persisted data. Each record has to_dict() for JSON-friendly output.

Core records:

Record | Purpose | Important fields
--- | --- | ---
ProjectRecord | Project identity | id, name, root_path, timestamps
RunRecord | Run lifecycle and lineage | id, project_id, project_name, status, parent_run_id, lineage_root_run_id, resume_count, run_path, git_commit, strict_mode, tags
RunContext | Shared session context | storage_dir, store_root, db_path, project, run; properties mirror project/run IDs and status
MetricRecord | Scalar metric sample | metric_key, namespace, metric_name, value, step, epoch, description, metadata
EventRecord | Structured event/log | event_type, severity, step, epoch, status, payload
SystemTelemetryRecord | Runtime telemetry | timing fields, CPU/RAM fields, GPU fields, overflow/skipped-step counters, metadata
LayerSummaryRecord | Hook summary | module identity, activation/gradient direction, tensor statistics, dead/saturation ratios
BatchDiagnosticRecord | Batch metadata | loss, batch_size, sample_ids, class_counts, sequence_lengths, metadata
ArtifactRecord | Artifact metadata | artifact identity, payload format, paths, size, SHA-256, step/epoch
EvidenceRecord | Analyzer evidence | kind, source, summary, observed/threshold values, step range
AlertRecord | Analyzer alert | rule_id, category, severity, title, summary, fact, inference, evidence
HealthScoreRecord | Category score | score_key, category, score, status, evidence, contributing alert IDs
PhaseRecord | Training phase segment | phase_name, display_name, summary, step_start, step_end, evidence

Supported literal values:

  • Run statuses: running, completed, failed, interrupted
  • Event severities: debug, info, warning, error, critical
  • Alert severities: info, warning, error, critical
  • Alert categories: optimization, representation, generalization, runtime
  • Health score keys: overall, optimization, representation, generalization, runtime
  • Telemetry boundaries: step, time
  • Layer summary types: activation, gradient
  • Hook directions: forward, backward
  • Artifact payload formats: json, text, bytes
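The literal sets above lend themselves to a simple membership check. The sketch below is not validate_record(...); it is a standard-library-only illustration of checking individual string fields against the documented values before handing data to custom tooling. The SUPPORTED_LITERALS mapping, its key names, and the check_literal function are hypothetical.

```python
from typing import Dict, FrozenSet

# Hypothetical mapping; the values are copied from the supported literal values listed above.
SUPPORTED_LITERALS: Dict[str, FrozenSet[str]] = {
    "run_status": frozenset({"running", "completed", "failed", "interrupted"}),
    "event_severity": frozenset({"debug", "info", "warning", "error", "critical"}),
    "alert_severity": frozenset({"info", "warning", "error", "critical"}),
    "alert_category": frozenset({"optimization", "representation", "generalization", "runtime"}),
    "health_score_key": frozenset(
        {"overall", "optimization", "representation", "generalization", "runtime"}
    ),
    "telemetry_boundary": frozenset({"step", "time"}),
    "layer_summary_type": frozenset({"activation", "gradient"}),
    "hook_direction": frozenset({"forward", "backward"}),
    "artifact_payload_format": frozenset({"json", "text", "bytes"}),
}


def check_literal(kind: str, value: str) -> str:
    """Return value unchanged if it is allowed for kind, else raise."""
    allowed = SUPPORTED_LITERALS.get(kind)
    if allowed is None:
        raise KeyError(f"unknown literal kind: {kind!r}")
    if value not in allowed:
        raise ValueError(f"{value!r} is not a valid {kind}; expected one of {sorted(allowed)}")
    return value
```

Note that debug is a valid event severity but not a valid alert severity, so check_literal("alert_severity", "debug") raises ValueError.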

Schema helpers exported by quark:

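from quark import CURRENT_SCHEMA_VERSION, dump_record, migrate_store, validate_record

# metric_record is any SDK record dataclass, such as the MetricRecord returned by metrics.log(...)
validate_record(metric_record)
payload = dump_record(metric_record)  # validated dictionary payload

# Only for a legacy local .quark store under storage_dir
version = migrate_store(storage_dir, generated_at="2026-05-07T00:00:00Z")
print(CURRENT_SCHEMA_VERSION, version, payload)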

Use validate_record(...) to check SDK record dataclasses before sending them to custom tooling. Use dump_record(...) when you need a validated dictionary payload. Use migrate_store(...) only for legacy local .quark stores that predate the current schema; it returns the current schema version or raises SchemaMigrationError.

Schema exceptions:

  • SchemaValidationError: a record or snapshot does not match the current schema.
  • SchemaMigrationError: a local store cannot be migrated safely.

9. Persistence Service

The SDK now expects an external Quark persistence service. It no longer starts an embedded persistence process automatically.

Start the persistence service separately using that service's own deployment or startup flow. Then create a user key with the auth flow described in src/persistence/API.md and point the SDK at the running service. For example, in PowerShell:

$env:QUARK_PERSISTENCE_URL = "http://127.0.0.1:8765"
$env:QUARK_USER_KEY = "qk_..."
$env:QUARK_PERSISTENCE_TIMEOUT_SECONDS = "30"
python train.py

Notes:

  • QUARK_PERSISTENCE_URL defaults to http://127.0.0.1:8765.
  • QUARK_USER_KEY is required for protected SDK requests unless you pass user_key directly to the low-level LocalSessionStore.
  • The SDK sends X-Quark-API-Key by default on protected requests.
  • Set QUARK_PERSISTENCE_AUTH_MODE=bearer if you want the SDK to send Authorization: Bearer ... instead.
  • QUARK_PERSISTENCE_TIMEOUT_SECONDS defaults to 15.0.
  • The SDK always sends X-Quark-Storage-Dir so the service knows which project store to read or write.
  • The service may return run status active; the SDK maps that to public status running.
  • Buffered sequence counters are seeded from service sequence reads. This assumes one SDK writer process per run.
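The environment variables and headers described in these notes can be summarized as a small resolver. This is a hypothetical sketch of the documented defaults, not the SDK's own code: ServiceSettings, resolve_settings, request_headers, and public_status are illustrative names. It assumes the auth mode values are header (the default, matching auth_mode="header" in the low-level client example) and bearer.

```python
import os
from dataclasses import dataclass
from typing import Dict, Mapping, Optional

DEFAULT_URL = "http://127.0.0.1:8765"
DEFAULT_TIMEOUT_SECONDS = 15.0


@dataclass(frozen=True)
class ServiceSettings:
    base_url: str
    user_key: Optional[str]
    auth_mode: str
    timeout_seconds: float


def resolve_settings(env: Mapping[str, str] = os.environ) -> ServiceSettings:
    """Read the QUARK_* variables, falling back to the documented defaults."""
    auth_mode = env.get("QUARK_PERSISTENCE_AUTH_MODE", "header").lower()
    if auth_mode not in ("header", "bearer"):
        raise ValueError(f"unsupported auth mode: {auth_mode!r}")
    return ServiceSettings(
        base_url=env.get("QUARK_PERSISTENCE_URL", DEFAULT_URL),
        user_key=env.get("QUARK_USER_KEY"),
        auth_mode=auth_mode,
        timeout_seconds=float(
            env.get("QUARK_PERSISTENCE_TIMEOUT_SECONDS", DEFAULT_TIMEOUT_SECONDS)
        ),
    )


def request_headers(settings: ServiceSettings, storage_dir: str) -> Dict[str, str]:
    """Build headers for a protected request (hypothetical helper)."""
    if settings.user_key is None:
        raise RuntimeError("QUARK_USER_KEY is required for protected requests")
    # The storage root travels with every request.
    headers = {"X-Quark-Storage-Dir": storage_dir}
    if settings.auth_mode == "bearer":
        headers["Authorization"] = f"Bearer {settings.user_key}"
    else:
        headers["X-Quark-API-Key"] = settings.user_key
    return headers


def public_status(service_status: str) -> str:
    """Map the service's 'active' status to the public 'running' status."""
    return "running" if service_status == "active" else service_status
```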

Low-level service client:

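from pathlib import Path

from storage import LocalSessionStore

storage_dir = Path("quark-runs").resolve()
# run_id is the ID of an existing run, such as session.run_id from an earlier session

store = LocalSessionStore(
    storage_dir,
    base_url="http://127.0.0.1:8765",
    user_key="qk_...",  # omit to rely on QUARK_USER_KEY
    auth_mode="header",  # sends X-Quark-API-Key
    timeout_seconds=30,
)
context = store.load_run(run_id)
metrics = store.read_scalar_metrics(run_id, metric_key="train/loss")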

The lower-level client methods mirror the persistence service routes:

  • Run lifecycle: create_run, load_run, resume_run, end_run, add_tag, add_log
  • Sequence allocation: allocate_event_sequence, allocate_metric_sequence, allocate_system_telemetry_sequence, get_next_layer_summary_sequence, get_next_batch_diagnostic_sequence, get_next_artifact_sequence
  • Events: write_events, read_events
  • Metrics: write_scalar_metrics, read_scalar_metrics
  • Telemetry: write_system_telemetry, read_system_telemetry
  • Layers: write_layer_summaries, read_layer_summaries
  • Batch diagnostics: write_batch_diagnostics, read_batch_diagnostics
  • Artifacts: persist_artifact, read_artifacts, load_artifact_payload
  • Analysis: write_analysis_outputs, read_alerts, read_health_scores
  • Phases: write_phases, read_phases

Catch service errors like this:

from quark import InvalidRunStateError, RunNotFoundError, SessionStoreError, TrainingSession

try:
    session = TrainingSession.resume_run(run_id=run_id, storage_dir=storage_dir)
except RunNotFoundError:
    raise SystemExit(f"Run does not exist: {run_id}")
except InvalidRunStateError as exc:
    raise SystemExit(f"Cannot resume this run: {exc}")
except SessionStoreError as exc:
    raise SystemExit(f"Persistence service error: {exc}")

10. Constructor and Method Reference

Session entrypoints:

TrainingSession.start_run(
    *,
    project: str,
    config: dict | None = None,
    tags: Iterable[str] | None = None,
    parent_run_id: str | None = None,
    storage_dir: str | Path | None = None,
    strict_mode: bool = False,
) -> TrainingSession

TrainingSession.resume_run(*, run_id: str, storage_dir: str | Path | None = None) -> TrainingSession
session.reload() -> RunContext
session.add_tag(tag: str) -> RunContext
session.add_log(message: str, *, identifier: str | None = "Info", step: int | None = None, epoch: int | None = None, status: str | None = None, payload: Mapping | None = None, timestamp: str | None = None, event_type: str = "user_log") -> EventRecord
session.end_run(status: str = "completed") -> RunContext
session.to_dict() -> dict

Writer and reader entrypoints:

MetricsLogger(session, *, batch_size: int = 100)
metrics.log(metric: str, value: float, *, step: int | None = None, epoch: int | None = None, namespace: str | None = None, description: str | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> MetricRecord
metrics.flush() -> tuple[MetricRecord, ...]
metrics.read(...filters...) -> tuple[MetricRecord, ...]
metrics.close() -> None

EventLogger(session, *, batch_size: int = 100)
events.log(event_type: str, *, severity: str | None = None, step: int | None = None, epoch: int | None = None, status: str | None = None, payload: Mapping | None = None, timestamp: str | None = None) -> EventRecord
events.add_log(message: str, *, identifier: str | None = "Info", step: int | None = None, epoch: int | None = None, status: str | None = None, payload: Mapping | None = None, timestamp: str | None = None, event_type: str = "user_log") -> EventRecord
events.flush() -> tuple[EventRecord, ...]
events.read(...filters...) -> tuple[EventRecord, ...]
events.close() -> None

SystemTelemetry(session, *, config: SystemTelemetryConfig | None = None, batch_size: int = 20)
telemetry.start_step(step: int, *, epoch: int | None = None, metadata: Mapping | None = None) -> StepTelemetryScope
telemetry.sample(*, step: int | None = None, epoch: int | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> SystemTelemetryRecord
telemetry.record_step(step: int, *, epoch: int | None = None, step_time_ms: float | None = None, data_time_ms: float | None = None, compute_time_ms: float | None = None, overflow_occurred: bool = False, skipped_step: bool = False, metadata: Mapping | None = None, timestamp: str | None = None) -> SystemTelemetryRecord | None
telemetry.flush() -> tuple[SystemTelemetryRecord, ...]
telemetry.read(...filters...) -> tuple[SystemTelemetryRecord, ...]
telemetry.close() -> None
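record_step(...) accepts step_time_ms, data_time_ms, and compute_time_ms. Below is a minimal standard-library sketch of how a training loop might measure those values with time.perf_counter(). The StepTimer class and the way it splits a step into a "data" phase (loading the batch) and a "compute" phase (forward, backward, optimizer) are illustrative assumptions, not part of the SDK.

```python
import time
from typing import Dict, Optional


class StepTimer:
    """Hypothetical helper that times the data and compute phases of one step."""

    def __init__(self) -> None:
        self._step_start: Optional[float] = None
        self._data_done: Optional[float] = None

    def start(self) -> None:
        # Call right before fetching the next batch.
        self._step_start = time.perf_counter()
        self._data_done = None

    def data_ready(self) -> None:
        # Call once the batch is loaded and compute is about to begin.
        self._data_done = time.perf_counter()

    def finish(self) -> Dict[str, float]:
        # Call after the optimizer step; returns timings in milliseconds.
        if self._step_start is None or self._data_done is None:
            raise RuntimeError("call start() and data_ready() before finish()")
        end = time.perf_counter()
        data_ms = (self._data_done - self._step_start) * 1000.0
        compute_ms = (end - self._data_done) * 1000.0
        return {
            "step_time_ms": data_ms + compute_ms,
            "data_time_ms": data_ms,
            "compute_time_ms": compute_ms,
        }
```

Because the dictionary keys match the documented keyword arguments, a live loop could pass them along as telemetry.record_step(step, **timer.finish()).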

HookEngine(session, model, optimizer=None, *, config: HookEngineConfig | None = None)
hooks.should_probe(step: int) -> bool
hooks.begin_step(step: int, *, epoch: int | None = None) -> bool
hooks.end_step() -> None
hooks.step(step: int, *, epoch: int | None = None) -> context manager yielding bool
hooks.flush() -> tuple[LayerSummaryRecord, ...]
hooks.read(...filters...) -> tuple[LayerSummaryRecord, ...]
hooks.close() -> None
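hooks.should_probe(step) is driven by HookScheduleConfig, whose defaults in the configuration table are every_n_steps=1, start_step=0, stop_step=None, and explicit_steps=(). The sketch below shows one plausible reading of those fields. Whether stop_step is inclusive and how explicit_steps combines with the periodic schedule are assumptions, not documented behavior, and ScheduleSketch is a hypothetical stand-in for the real config class.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class ScheduleSketch:
    """Hypothetical mirror of the HookScheduleConfig fields and defaults."""

    every_n_steps: int = 1
    start_step: int = 0
    stop_step: Optional[int] = None
    explicit_steps: Tuple[int, ...] = ()

    def __post_init__(self) -> None:
        if self.every_n_steps < 1:
            raise ValueError("every_n_steps must be at least 1")


def should_probe(schedule: ScheduleSketch, step: int) -> bool:
    # Assumption: explicitly listed steps are always probed.
    if step in schedule.explicit_steps:
        return True
    if step < schedule.start_step:
        return False
    # Assumption: stop_step is inclusive.
    if schedule.stop_step is not None and step > schedule.stop_step:
        return False
    # Count the period from start_step so probing begins on start_step itself.
    return (step - schedule.start_step) % schedule.every_n_steps == 0
```

With every_n_steps=10, start_step=5, stop_step=45, and explicit_steps=(2,), this sketch probes steps 2, 5, 15, 25, 35, and 45.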

ArtifactStore(session, *, retention_hooks: Sequence[Callable] | None = None, raise_on_retention_error: bool | None = None)
artifacts.store(artifact_type: str, name: str, payload: Any, *, step: int | None = None, epoch: int | None = None, namespace: str | None = None, payload_format: str | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> ArtifactRecord
artifacts.log_histogram(name: str, *, bins: Sequence[float], counts: Sequence[float], step: int | None = None, epoch: int | None = None, namespace: str | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> ArtifactRecord
artifacts.log_snapshot(name: str, payload: Any, *, snapshot_type: str = "batch_diagnostics_snapshot", step: int | None = None, epoch: int | None = None, namespace: str | None = None, payload_format: str | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> ArtifactRecord
artifacts.read(...filters...) -> tuple[ArtifactRecord, ...]
artifacts.load(artifact: str | ArtifactRecord) -> Any
artifacts.close() -> None

DataDiagnostics(session, *, config: DataDiagnosticsConfig | None = None)
diagnostics.log_batch(loss: float, *, step: int | None = None, epoch: int | None = None, batch: Any | None = None, batch_size: int | None = None, labels: Any | None = None, sample_ids: Any | None = None, sequence_lengths: Any | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> BatchDiagnosticRecord
diagnostics.flush() -> tuple[BatchDiagnosticRecord, ...]
diagnostics.read(...filters...) -> tuple[BatchDiagnosticRecord, ...]
diagnostics.summarize_exposure(...range filters...) -> ExposureSummary
diagnostics.find_hard_batches(...range filters..., limit: int | None = 10) -> tuple[HardBatchFinding, ...]
diagnostics.summarize(...range filters..., hard_batch_limit: int | None = 10) -> DataDiagnosticsSummary
diagnostics.close() -> None
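DataDiagnosticsConfig exposes outlier_z_score_threshold=2.5, outlier_iqr_multiplier=1.5, hard_batch_percentile=0.9, and min_batches_for_outlier_detection=5. As a rough illustration of what such settings usually control, this standard-library sketch flags high-loss batches by z-score or by the Tukey IQR fence, and selects batches at or above a loss percentile. It is not the SDK's find_hard_batches(...) implementation. The outlier_indices and hard_batch_indices helpers, the rule that either test alone flags a batch, and the nearest-rank percentile are all assumptions.

```python
import math
import statistics
from typing import List, Sequence


def outlier_indices(
    losses: Sequence[float],
    z_threshold: float = 2.5,
    iqr_multiplier: float = 1.5,
    min_batches: int = 5,
) -> List[int]:
    """Indices of batches whose loss is unusually high (hypothetical rule)."""
    # Quartiles need at least two points; the config default asks for five.
    if len(losses) < max(min_batches, 2):
        return []
    mean = statistics.fmean(losses)
    stdev = statistics.pstdev(losses)
    q1, _, q3 = statistics.quantiles(losses, n=4)
    upper_fence = q3 + iqr_multiplier * (q3 - q1)
    flagged = []
    for index, loss in enumerate(losses):
        z_score = (loss - mean) / stdev if stdev > 0 else 0.0
        # Assumption: either rule alone is enough to flag a batch.
        if z_score >= z_threshold or loss > upper_fence:
            flagged.append(index)
    return flagged


def hard_batch_indices(
    losses: Sequence[float],
    percentile: float = 0.9,
    limit: int = 10,
) -> List[int]:
    """Indices of batches at or above a loss percentile, hardest first."""
    if not losses:
        return []
    ordered = sorted(losses)
    # Nearest-rank percentile; the SDK may interpolate differently.
    rank = math.ceil(percentile * len(ordered)) - 1
    cutoff = ordered[min(max(rank, 0), len(ordered) - 1)]
    hard = [index for index, loss in enumerate(losses) if loss >= cutoff]
    hard.sort(key=lambda index: losses[index], reverse=True)
    return hard[:limit]
```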

Analyzer and comparison entrypoints:

AnalyzerEngine(session, *, config: AnalyzerEngineConfig | None = None)
analyzer.analyze(*, replace_existing: bool | None = None) -> AnalyzerResult
analyzer.read_alerts(*, rule_id: str | None = None, category: str | None = None, severity: str | None = None, step: int | None = None, step_min: int | None = None, step_max: int | None = None, limit: int | None = None)
analyzer.read_health_scores(*, score_key: str | None = None, category: str | None = None, limit: int | None = None)

PhaseAnalyzer(session, *, config: PhaseAnalyzerConfig | None = None)
phase_analyzer.analyze(*, replace_existing: bool | None = None) -> PhaseAnalysisResult
phase_analyzer.read_phases(*, phase_name: str | None = None, step: int | None = None, step_min: int | None = None, step_max: int | None = None, limit: int | None = None)

ComparisonEngine(session, *, config: ComparisonEngineConfig | None = None)
comparison.compare(other, *, baseline: bool = False) -> RunComparisonResult
comparison.compare_to_baseline(baseline_run) -> RunComparisonResult
ComparisonEngine.compare_runs(primary, reference, *, baseline: bool = False, config: ComparisonEngineConfig | None = None) -> RunComparisonResult

Configuration defaults:

Config | Fields
--- | ---
SystemTelemetryConfig | sample_every_n_steps=1, emit_scalar_metrics=True, capture_cpu=True, capture_gpu=True, gpu_index=None
HookScheduleConfig | every_n_steps=1, start_step=0, stop_step=None, explicit_steps=()
LayerFilterConfig | include_names=(), exclude_names=(), include_types=(), exclude_types=(), include_name_patterns=(), exclude_name_patterns=(), leaf_modules_only=True
HookEngineConfig | schedule=HookScheduleConfig(), filters=LayerFilterConfig(), capture_forward=True, capture_backward=True, batch_size=100, raise_on_error=None, dead_unit_epsilon=0.0, saturation_threshold=0.95, saturation_module_types=("Sigmoid", "Tanh", "Hardsigmoid", "Hardtanh", "ReLU6")
DataDiagnosticsConfig | batch_size=100, adapter=None, outlier_z_score_threshold=2.5, outlier_iqr_multiplier=1.5, hard_batch_percentile=0.9, min_batches_for_outlier_detection=5
AnalyzerEngineConfig | thresholds=AnalyzerThresholds(), replace_existing_outputs=True, timing metric key tuples for step/data/compute time
PhaseAnalyzerConfig | thresholds=PhaseAnalyzerThresholds(), replace_existing_outputs=True, metric key tuples for train/validation loss and accuracy
ComparisonEngineConfig | metric_targets=()

Default analyzer metric keys:

  • Step time: system/runtime/step_time_ms, system/step_time_ms
  • Data time: system/runtime/data_time_ms, system/data_time_ms
  • Compute time: system/runtime/compute_time_ms, system/compute_time_ms

Default phase metric keys:

  • Train loss: train/loss, training/loss
  • Validation loss: val/loss, validation/loss
  • Train accuracy: train/accuracy, training/accuracy, train/acc
  • Validation accuracy: val/accuracy, validation/accuracy, val/acc
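Each default above is a tuple of candidate metric keys. One reasonable reading is that the analyzers use the first candidate that the run actually logged; the sketch below illustrates that lookup against a plain dictionary of logged series. The resolve_metric_key helper, the first-match rule, and the dictionary input are assumptions rather than documented SDK behavior.

```python
from typing import Mapping, Optional, Sequence, Tuple

# Candidate tuples copied from the default metric keys listed above.
TRAIN_LOSS_KEYS: Tuple[str, ...] = ("train/loss", "training/loss")
VAL_LOSS_KEYS: Tuple[str, ...] = ("val/loss", "validation/loss")
STEP_TIME_KEYS: Tuple[str, ...] = ("system/runtime/step_time_ms", "system/step_time_ms")


def resolve_metric_key(
    candidates: Sequence[str],
    logged: Mapping[str, Sequence[float]],
) -> Optional[str]:
    """Return the first candidate key that has at least one logged value."""
    for key in candidates:
        if logged.get(key):
            return key
    return None
```

Under this reading, logging under one of the listed names (for example train/loss and val/loss) is the simplest way to match the defaults without extra configuration.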

Default AnalyzerThresholds values:

AnalyzerThresholds(
    exploding_gradient_warning_norm=10.0,
    exploding_gradient_error_norm=25.0,
    exploding_gradient_critical_norm=50.0,
    vanishing_gradient_norm=1e-4,
    vanishing_gradient_warning_ratio=0.5,
    vanishing_gradient_error_ratio=0.75,
    vanishing_gradient_critical_ratio=0.9,
    vanishing_gradient_min_samples=4,
    stagnant_layer_gradient_norm=1e-4,
    stagnant_layer_warning_probes=2,
    stagnant_layer_error_probes=4,
    stagnant_layer_critical_probes=6,
    activation_collapse_warning_dead_unit_ratio=0.8,
    activation_collapse_error_dead_unit_ratio=0.92,
    activation_collapse_critical_dead_unit_ratio=0.98,
    activation_collapse_warning_saturation_ratio=0.9,
    activation_collapse_error_saturation_ratio=0.97,
    activation_collapse_critical_saturation_ratio=0.995,
    activation_collapse_warning_std=0.05,
    activation_collapse_error_std=0.01,
    activation_collapse_critical_std=0.001,
    overfitting_warning_gap=0.2,
    overfitting_error_gap=0.4,
    overfitting_critical_gap=0.7,
    overfitting_min_points=3,
    runtime_warning_step_time_ms=250.0,
    runtime_error_step_time_ms=500.0,
    runtime_critical_step_time_ms=1000.0,
    runtime_warning_data_fraction=0.4,
    runtime_error_data_fraction=0.6,
    runtime_critical_data_fraction=0.75,
    runtime_warning_jitter_ratio=0.25,
    runtime_error_jitter_ratio=0.5,
    runtime_critical_jitter_ratio=0.75,
)
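Most AnalyzerThresholds fields come in warning/error/critical triples. The sketch below shows how such a triple can map an observed value to a severity, using the default exploding-gradient norms, runtime data-fraction thresholds, and activation std thresholds (where a smaller value is worse). Treating each threshold as an inclusive bound, computing the data fraction as data time divided by step time, and the severity_for helper itself are assumptions for illustration.

```python
from typing import Optional


def severity_for(
    value: float,
    warning: float,
    error: float,
    critical: float,
    *,
    higher_is_worse: bool = True,
) -> Optional[str]:
    """Map a value to 'critical', 'error', 'warning', or None (hypothetical)."""
    # Check the most severe tier first so the worst breached tier wins.
    for severity, threshold in (("critical", critical), ("error", error), ("warning", warning)):
        breached = value >= threshold if higher_is_worse else value <= threshold
        if breached:
            return severity
    return None


# Default exploding-gradient norms: warning 10.0, error 25.0, critical 50.0.
def gradient_norm_severity(norm: float) -> Optional[str]:
    return severity_for(norm, 10.0, 25.0, 50.0)


# Default data-fraction thresholds: warning 0.4, error 0.6, critical 0.75.
def data_fraction_severity(data_time_ms: float, step_time_ms: float) -> Optional[str]:
    if step_time_ms <= 0:
        return None
    return severity_for(data_time_ms / step_time_ms, 0.4, 0.6, 0.75)


# Default activation std thresholds: warning 0.05, error 0.01, critical 0.001.
def activation_std_severity(std: float) -> Optional[str]:
    return severity_for(std, 0.05, 0.01, 0.001, higher_is_worse=False)
```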

Default PhaseAnalyzerThresholds values:

PhaseAnalyzerThresholds(
    initialization_points=2,
    rapid_learning_min_relative_change=0.18,
    stagnation_max_relative_change=0.01,
    overfitting_min_gap=0.12,
    overfitting_min_train_relative_change=0.015,
    overfitting_min_validation_regression=0.015,
    divergence_min_relative_regression=0.15,
    divergence_from_best_ratio=0.25,
)
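Several PhaseAnalyzerThresholds fields are expressed as relative changes. The sketch below uses two of them, rapid_learning_min_relative_change=0.18 and stagnation_max_relative_change=0.01, to label a window of training-loss values. The relative-change formula (improvement from the first to the last value, divided by the first value), the label strings, and the label_loss_window helper are assumptions; the overfitting and divergence thresholds are not modeled here.

```python
from typing import Optional, Sequence


def relative_change(start: float, end: float) -> float:
    """Relative decrease from start to end; positive when the loss improves."""
    if start == 0:
        raise ValueError("start value must be non-zero")
    return (start - end) / abs(start)


def label_loss_window(
    losses: Sequence[float],
    rapid_min: float = 0.18,
    stagnation_max: float = 0.01,
) -> Optional[str]:
    """Label a window of training losses (hypothetical labels)."""
    if len(losses) < 2:
        return None
    change = relative_change(losses[0], losses[-1])
    if change >= rapid_min:
        return "rapid_learning"
    if abs(change) <= stagnation_max:
        return "stagnation"
    return None
```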

11. Practical Notes

  • All major SDK modules accept a TrainingSession or a RunContext created by the Quark SDK.
  • MetricsLogger, EventLogger, SystemTelemetry, HookEngine, DataDiagnostics, and ArtifactStore all support context-manager usage.
  • Buffered modules flush automatically on close() and before most read(...) calls, but explicit context management is the safest pattern for long-running jobs.
  • strict_mode=True on TrainingSession.start_run(...) is useful when you want instrumentation failures to fail fast instead of degrading gracefully.
  • AnalyzerEngine persists alerts and health scores for UI surfaces such as top alerts; TrainingSession persists lifecycle activity.
  • session.run_id, session.run_path, session.project_name, and session.status are the main lifecycle properties most projects need.
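The buffering behavior described in these notes (a batch_size, flushing on close(), flushing before reads, and context-manager support) follows a common pattern. The sketch below is a self-contained, in-memory illustration of that pattern, not the SDK's MetricsLogger. BufferedWriter and its sink callable are hypothetical stand-ins for the persistence service, and read() returns the sketch's own in-memory copy rather than querying a service.

```python
from typing import Any, Callable, List, Sequence, Tuple


class BufferedWriter:
    """Hypothetical buffered writer: queues records and flushes them in batches."""

    def __init__(self, sink: Callable[[Sequence[Any]], None], *, batch_size: int = 100) -> None:
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self._sink = sink
        self._batch_size = batch_size
        self._pending: List[Any] = []
        self._written: List[Any] = []
        self._closed = False

    def log(self, record: Any) -> Any:
        if self._closed:
            raise RuntimeError("writer is closed")
        self._pending.append(record)
        # Flush automatically once a full batch has accumulated.
        if len(self._pending) >= self._batch_size:
            self.flush()
        return record

    def flush(self) -> Tuple[Any, ...]:
        # Send everything queued so far as one batch.
        batch = tuple(self._pending)
        if batch:
            self._sink(batch)
            self._written.extend(batch)
            self._pending.clear()
        return batch

    def read(self) -> Tuple[Any, ...]:
        # Flush first so reads include records that were still buffered.
        self.flush()
        return tuple(self._written)

    def close(self) -> None:
        if not self._closed:
            self.flush()
            self._closed = True

    def __enter__(self) -> "BufferedWriter":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()
```

With batch_size=3, logging seven records inside a with block sends batches of three, three, and one; the final partial batch is only sent when the block exits, which is why explicit context management matters for long-running jobs.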

Download files

Source Distribution

abiotic_quark-0.1.0.tar.gz (136.4 kB)

Uploaded Source

Built Distribution

abiotic_quark-0.1.0-py3-none-any.whl (102.8 kB)

Uploaded Python 3

File details

Details for the file abiotic_quark-0.1.0.tar.gz.

File metadata

  • Download URL: abiotic_quark-0.1.0.tar.gz
  • Upload date:
  • Size: 136.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.1

File hashes

Hashes for abiotic_quark-0.1.0.tar.gz
Algorithm Hash digest
SHA256 a821367d41599c7e48e90406cd553bb2b3678d588a7cf300ea000ee1dcbf4e59
MD5 8d90afbbbe762c370996a350cb9b0380
BLAKE2b-256 b4a63368258d188f9c5db08d7e2be09b960e55eafe186fe194ef09e1b6547384

File details

Details for the file abiotic_quark-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: abiotic_quark-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 102.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.1

File hashes

Hashes for abiotic_quark-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 853c123d5ce264176db11a7e560f047315c7cdabe93223671e35d954d5ab6458
MD5 a781c500acaa294107c1701d6b519b2a
BLAKE2b-256 c4435b50220525fa23ad0af382d7f72bb78477ea84e45fb8e110e03d27721f78
