Neural training observability SDK for Quark.
Quark SDK Usage
This guide gives developers and researchers copy-paste examples for instrumenting training jobs, persisting run data, and analyzing results from Python.
Install and Import
From PyPI:
```bash
python -m pip install abiotic-quark
```

Telemetry support is included in the default install. Add the PyTorch integration only when you need hook or parameter instrumentation. You can skip this extra if torch>=2.0 is already installed:

```bash
python -m pip install "abiotic-quark[hooks]"
```

To install every optional extra instead:

```bash
python -m pip install "abiotic-quark[all]"
```
The PyPI distribution is named abiotic-quark, but the public Python package is imported as quark:

```python
from quark import MetricsLogger, TrainingSession
```
Runtime dependencies:
- psutil for richer CPU and RAM telemetry
- nvidia-ml-py for GPU utilization and memory telemetry through the pynvml module
- optional: torch for HookEngine and ParameterDiagnostics
Most users should import from quark. The lower-level storage and persistence modules are for SDK tooling, migrations, and tests.
Included runnable examples:
- examples/session_lifecycle.py: start, tag, log, and end a run.
- examples/analyzer_diagnostics.py: read persisted analysis outputs and layer summaries.
- smoke_tests.py: run grouped service and SDK smoke checks.
- tests/live_service/run_transformer_persistence.py: live-service PyTorch integration test.
Storage Model
Choose a storage_dir for your project or experiment family.
- The SDK uses the resolved storage_dir as the project or experiment storage root.
- The persistence service owns the actual metadata and artifact writes under that storage root.
- Each run still gets a run directory on the persistence side. You can access the returned path with session.run_path.
- Before starting runs, point the SDK at the hosted persistence service with QUARK_PERSISTENCE_URL and provide your user key with QUARK_USER_KEY.
- If storage_dir is omitted, the SDK uses the current working directory.
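A missing environment variable is easier to diagnose before a run starts than during one. The sketch below checks only the two variables named above and resolves the storage root the way this section describes, falling back to the current working directory when storage_dir is omitted. The helper name resolve_quark_settings is hypothetical and is not part of the quark package.

```python
import os
from pathlib import Path
from typing import Mapping, Optional, Tuple


def resolve_quark_settings(
    storage_dir: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> Tuple[str, str, Path]:
    """Hypothetical pre-flight check; not part of the quark package."""
    missing = [
        name
        for name in ("QUARK_PERSISTENCE_URL", "QUARK_USER_KEY")
        if not env.get(name)
    ]
    if missing:
        raise RuntimeError(f"Set {', '.join(missing)} before starting Quark runs.")
    # Mirror the documented default: no storage_dir means the current working directory.
    root = Path(storage_dir).resolve() if storage_dir else Path.cwd().resolve()
    return env["QUARK_PERSISTENCE_URL"], env["QUARK_USER_KEY"], root
```

Passing env explicitly keeps the check testable without touching the real process environment.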
How The SDK Fits Together
The SDK centers on a shared run context created by TrainingSession.start_run(...).
Attach modules such as MetricsLogger, EventLogger, SystemTelemetry, HookEngine, ParameterDiagnostics, ArtifactStore, or DataDiagnostics to that session and they will all write against the same run.
In practice:
- MetricsLogger is the flexible scalar-metric path.
- SystemTelemetry is the built-in runtime-telemetry path.
- EventLogger stores structured lifecycle, training, and log events.
- TrainingSession.add_log(...) writes immediate run logs with Info, Warning, or Error identifiers.
- HookEngine stores layer summaries from instrumented model internals.
- ParameterDiagnostics stores model weight, bias, gradient, and optimizer-update summaries, with optional histogram distributions.
- Context managers are the safest pattern because buffered writers flush automatically when the session ends, and uncaught runtime errors or interruptions are logged before lifecycle state is updated.
The persistence path is service-backed:
- Set QUARK_PERSISTENCE_URL to the hosted persistence service URL.
- Set QUARK_USER_KEY to your Quark user key.
- The service URL is provided in the deployment or project documentation.
- The configured storage_dir tells the service which project store to read or write.
Public API Map
Everything below is exported by quark unless noted otherwise.
| Area | Start or write | Read or analyze | Config and result types |
|---|---|---|---|
| Sessions | TrainingSession.start_run(...), start_run(...), TrainingSession.resume_run(...), resume_run(...), session.end_run(...), end_run(...), session.add_tag(...), add_tag(...), session.add_log(...), add_log(...) | session.reload(), session.to_dict() | RunContext, RunRecord, ProjectRecord |
| Scalar metrics | MetricsLogger(session).log(...) | metrics.read(...) | MetricRecord |
| Structured events | EventLogger(session).log(...), events.add_log(...) | events.read(...) | EventRecord, LogIdentifier |
| Runtime telemetry | SystemTelemetry(session).start_step(...), telemetry.record_step(...), telemetry.sample(...) | telemetry.read(...) | SystemTelemetryConfig, SystemTelemetryRecord |
| PyTorch hooks | HookEngine(session, model, optimizer).step(...), begin_step(...), end_step(...) | hooks.read(...) | HookEngineConfig, HookScheduleConfig, LayerFilterConfig, LayerSummaryRecord |
| Parameter diagnostics | ParameterDiagnostics(session, model, optimizer).snapshot(...) | parameters.read_summaries(...), read_distributions(...), read(...) | ParameterDiagnosticsConfig, ParameterSummaryRecord, ParameterDistributionRecord |
| Artifacts | ArtifactStore(session).store(...), log_histogram(...), log_snapshot(...) | artifacts.read(...), artifacts.load(...) | ArtifactRecord, ARTIFACT_STORAGE_LAYOUT |
| Data diagnostics | DataDiagnostics(session).log_batch(...) | diagnostics.read(...), summarize(...), summarize_exposure(...), find_hard_batches(...) | DataDiagnosticsConfig, BatchMetadata, BatchMetadataAdapter, MappingBatchMetadataAdapter, summary/finding records |
| Diagnostics | AnalyzerEngine(session).analyze(...) | analyzer.read_alerts(...), analyzer.read_health_scores(...) | AnalyzerEngineConfig, AnalyzerThresholds, AnalyzerResult, AlertRecord, HealthScoreRecord, EvidenceRecord |
| Phases | PhaseAnalyzer(session).analyze(...) | phase_analyzer.read_phases(...) | PhaseAnalyzerConfig, PhaseAnalyzerThresholds, PhaseAnalysisResult, PhaseRecord |
| Comparison | ComparisonEngine(session).compare(...), compare_to_baseline(...), ComparisonEngine.compare_runs(...) | returned RunComparisonResult | ComparisonEngineConfig, MetricTargetSpec, comparison diff records |
| Schemas | migrate_store(...) | validate_record(...), dump_record(...) | CURRENT_SCHEMA_VERSION, SchemaValidationError, SchemaMigrationError |
| Errors | raised by service-backed storage calls | catch in application code | SessionStoreError, RunNotFoundError, InvalidRunStateError |
Shared rules:
- Writer modules accept a TrainingSession or a RunContext.
- Writer modules return immutable dataclass records. Use .to_dict() on records and result objects when you want plain JSON-like dictionaries.
- step, epoch, range filters, and sequence counters are non-negative integers.
- metadata, event payloads, config snapshots, and environment snapshots must be JSON-serializable.
- flush() writes buffered records and returns the persisted records.
- Most read(...) methods flush first when the writer is still open.
- close() flushes buffered writers and marks them closed.
- Context managers are preferred because they flush writers and close hooks/samplers even when the training loop exits early.
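Because metadata, event payloads, and snapshots must be JSON-serializable, it can help to check a dictionary before passing it to a writer. This sketch uses only the standard json module. Passing allow_nan=False is an assumption: the rules above do not say whether Quark accepts NaN or infinity inside metadata, but strict JSON cannot represent either value. The helper name is hypothetical.

```python
import json
from typing import Any, Mapping


def ensure_json_metadata(metadata: Mapping[str, Any]) -> dict:
    """Hypothetical helper: return a plain dict copy or raise if not JSON-serializable."""
    try:
        # allow_nan=False rejects NaN/inf, which strict JSON cannot represent.
        encoded = json.dumps(dict(metadata), allow_nan=False)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"metadata is not JSON-serializable: {exc}") from exc
    # Round-trip so tuples become lists, matching what a JSON store would return.
    return json.loads(encoded)
```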
Custom Scalar Metrics
MetricsLogger is not limited to built-in keys such as train/loss or optimizer/lr.
You can log arbitrary scalar metrics as long as they follow Quark's naming and value rules.
```python
from pathlib import Path

from quark import MetricsLogger, TrainingSession

storage_dir = Path("quark-runs").resolve()

# Parenthesized multi-item `with` statements require Python 3.10 or newer.
with (
    TrainingSession.start_run(
        project="custom-metrics-demo",
        storage_dir=storage_dir,
    ) as session,
    MetricsLogger(session, batch_size=50) as metrics,
):
    # Full metric key: namespace "my_eval", metric name "f1_macro".
    metrics.log(
        "my_eval/f1_macro",
        0.812,
        step=120,
        epoch=2,
        description="Macro F1 on the internal holdout split.",
        metadata={
            "split": "holdout_internal",
            "threshold": 0.5,
        },
    )
    metrics.log(
        "data/duplicate_ratio",
        0.034,
        step=120,
        epoch=2,
        metadata={"window": "epoch"},
    )
    # Bare name plus an explicit namespace; epoch alone satisfies the step-or-epoch rule.
    metrics.log(
        "latency_ms",
        18.4,
        epoch=2,
        namespace="serving/shadow",
        metadata={"unit": "milliseconds"},
    )
```
Rules for custom scalar metrics:
- Every metric must include at least step or epoch.
- Values must be finite numeric scalars.
- You can pass a full metric key such as my_eval/f1_macro, or pass metric="f1_macro" with namespace="my_eval".
- Bare metric names such as loss are accepted, but namespaced keys are preferred for analyzers and UI grouping.
- Each namespace or metric-name segment must start with a letter or digit and may contain letters, digits, ., _, or -.
- description is optional human-readable context stored with the metric.
- metadata is optional and must be JSON-serializable.
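To catch naming mistakes before a long run, you can mirror these rules in a small pre-check. The sketch below treats "/" as the segment separator and assumes "letters" means ASCII letters; Quark may accept a wider set. build_metric_key and check_metric_sample are hypothetical helpers, not SDK functions, and the SDK performs its own validation regardless.

```python
import math
import re
from numbers import Real
from typing import Optional

# Assumption: "letters" means ASCII letters; Quark may accept a wider set.
_SEGMENT = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


def build_metric_key(metric: str, namespace: Optional[str] = None) -> str:
    """Hypothetical pre-check that joins namespace/metric and validates each segment."""
    key = f"{namespace}/{metric}" if namespace else metric
    for segment in key.split("/"):
        # Empty segments (from "a//b" or a leading "/") also fail this match.
        if not _SEGMENT.fullmatch(segment):
            raise ValueError(f"invalid metric key segment {segment!r} in {key!r}")
    return key


def check_metric_sample(
    value: object, step: Optional[int] = None, epoch: Optional[int] = None
) -> None:
    """Hypothetical check for the step-or-epoch and finite-scalar rules."""
    if step is None and epoch is None:
        raise ValueError("every metric needs step or epoch")
    for name, counter in (("step", step), ("epoch", epoch)):
        if counter is not None and (not isinstance(counter, int) or counter < 0):
            raise ValueError(f"{name} must be a non-negative integer")
    if not isinstance(value, Real) or not math.isfinite(value):
        raise ValueError("metric values must be finite numeric scalars")
```

For example, build_metric_key("f1_macro", namespace="my_eval") returns "my_eval/f1_macro", the same key as the first call in the example above.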
Choose between custom metrics and telemetry like this:
- Use MetricsLogger when you need a new scalar signal with your own name.
- Use SystemTelemetry when you want the built-in runtime schema for timings, CPU, RAM, GPU, overflow counts, and skipped-step counts.
- SystemTelemetry can also mirror selected built-in runtime fields into scalar metrics such as system/runtime/step_time_ms.
- If you need extra context on a telemetry record, put it in metadata. If you need a brand-new scalar series, log it through MetricsLogger.
Built-in analyzers use conventional metric keys by default:
- AnalyzerEngine looks for runtime timing metrics such as system/runtime/step_time_ms, system/runtime/data_time_ms, and system/runtime/compute_time_ms.
- PhaseAnalyzer looks for training and validation loss or accuracy keys such as train/loss, val/loss, train/accuracy, and val/accuracy.
If your project uses different names, configure the analyzers explicitly:
```python
from quark import AnalyzerEngine, AnalyzerEngineConfig, PhaseAnalyzer, PhaseAnalyzerConfig

# `session` is an active TrainingSession whose metrics use project-specific names.
analyzer = AnalyzerEngine(
    session,
    config=AnalyzerEngineConfig(
        step_time_metric_keys=("perf/step_ms",),
        data_time_metric_keys=("perf/data_ms",),
        compute_time_metric_keys=("perf/compute_ms",),
    ),
)
phase_analyzer = PhaseAnalyzer(
    session,
    config=PhaseAnalyzerConfig(
        train_loss_metric_keys=("objective/train_ce",),
        val_loss_metric_keys=("objective/val_ce",),
        train_accuracy_metric_keys=("objective/train_top1",),
        val_accuracy_metric_keys=("objective/val_top1",),
    ),
)
```
If you stay with the conventional names, the default analyzer configuration works out of the box.
Read Metrics Back
MetricsLogger.read(...) flushes pending metrics and returns MetricRecord objects ordered by persistence sequence:
```python
# `metrics` is an open MetricsLogger.
losses = metrics.read(metric_key="train/loss", step_min=100, step_max=500)
latest_eval = metrics.read(namespace="eval", limit=20)

for record in losses:
    print(record.step, record.metric_key, record.value, record.to_dict())
```
Useful filters are metric_key, namespace, metric_name, step, step_min, step_max, epoch, epoch_min, epoch_max, and limit.
1. Minimal Run Lifecycle
Use this pattern when you want run metadata, scalar metrics, and structured logs with very little setup.
```python
from pathlib import Path

from quark import EventLogger, MetricsLogger, TrainingSession

storage_dir = Path("quark-runs").resolve()

with (
    TrainingSession.start_run(
        project="image-classifier",
        config={
            "model": "resnet18",
            "optimizer": "adamw",
            "lr": 3e-4,
            "epochs": 5,
        },
        tags=["baseline", "dev"],
        storage_dir=storage_dir,
    ) as session,
    MetricsLogger(session, batch_size=50) as metrics,
    EventLogger(session, batch_size=20) as events,
):
    metrics.log("train/loss", 1.42, step=0, epoch=0)
    metrics.log("optimizer/lr", 3e-4, step=0, epoch=0)
    events.log(
        "checkpoint_saved",
        step=0,
        epoch=0,
        payload={"path": "checkpoints/step-0.pt"},
    )
    session.add_log("Warmup completed without instability.", identifier="Info")
    print(session.run_id)
    print(session.run_path)
```
Notes:
- TrainingSession records lifecycle events such as run_started, run_resumed, and run_ended.
- TrainingSession.start_run(...) accepts project, optional config, optional tags, optional parent_run_id, optional storage_dir, and strict_mode.
- The equivalent top-level helper is start_run(...).
- session.add_log(...) creates a persisted user_log event with an Info, Warning, or Error identifier.
- session.add_tag(...) persists an additional run tag and returns the refreshed RunContext.
- session.reload() reloads the run from the persistence service and returns the refreshed RunContext.
- session.to_dict() returns the full context payload, including project and run records.
- The TrainingSession context manager marks the run as completed on success, failed on exceptions, and interrupted on keyboard or system exits.
- Runtime exceptions inside the session context are persisted as runtime_error logs with the Error identifier. Interruptions are persisted as runtime_interruption logs with the Warning identifier.
- Supported terminal statuses are completed, failed, and interrupted.
Structured Logs And Runtime Errors
Use session.add_log(...) for immediate run-level messages and EventLogger.add_log(...) when you want batched log writes alongside other events.
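```python
from pathlib import Path

from quark import EventLogger, TrainingSession

storage_dir = Path("quark-runs").resolve()

with TrainingSession.start_run(project="classifier", storage_dir=storage_dir) as session:
    session.add_log("Loaded checkpoint from step 1200.", identifier="Info")
    with EventLogger(session, batch_size=20) as events:
        events.add_log(
            "Validation loss increased for three checks.",
            identifier="Warning",
            step=1280,
            payload={"window": 3},
        )
    # train_one_epoch() stands in for your own training code.
    train_one_epoch()
```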
If train_one_epoch() raises, the session context writes a runtime_error event with identifier: "Error" and then ends the run as failed. If a KeyboardInterrupt or SystemExit leaves the context, it writes a runtime_interruption event with identifier: "Warning" and ends the run as interrupted.
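This mapping depends on Python's exception hierarchy: KeyboardInterrupt and SystemExit derive from BaseException rather than Exception, so a context manager can tell them apart from ordinary runtime errors. The toy context manager below reproduces the documented mapping with a plain list standing in for persisted events. ToyRun is hypothetical and is not how TrainingSession is implemented.

```python
from typing import List, Tuple


class ToyRun:
    """Hypothetical stand-in that mimics the documented status mapping."""

    def __init__(self) -> None:
        self.log: List[Tuple[str, str]] = []  # (event_type, identifier) pairs
        self.status = "running"

    def __enter__(self) -> "ToyRun":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            self.status = "completed"
        elif issubclass(exc_type, (KeyboardInterrupt, SystemExit)):
            self.log.append(("runtime_interruption", "Warning"))
            self.status = "interrupted"
        else:
            self.log.append(("runtime_error", "Error"))
            self.status = "failed"
        return False  # never swallow the exception; it still reaches the caller


run = ToyRun()
try:
    with run:
        raise ValueError("bad batch")
except ValueError:
    pass
print(run.status, run.log)  # failed [('runtime_error', 'Error')]
```

Returning False from __exit__ matters: the run is recorded as failed or interrupted, but your own code still sees the original exception.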
EventLogger.log(...) accepts custom event types. If severity is omitted, common event types receive sensible defaults. Supported severity values are:
- debug
- info
- warning
- error
- critical
Event types must start with a lowercase letter and may contain lowercase letters, digits, _, ., /, or -.
add_log(...) identifiers are Info, Warning, or Error; log payloads may not define reserved keys named message or identifier.
EventLogger.read(...) flushes pending events and accepts event_type, event_types, severity, step, step_min, step_max, epoch, epoch_min, epoch_max, and limit.
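The event-type and add_log(...) rules above fit in a few lines of pre-checking, which can be handy when event names are generated dynamically. This is a hypothetical sketch built only from the stated rules. The regular expression encodes "starts with a lowercase letter, then lowercase letters, digits, _, ., /, or -", and the function names are not SDK functions.

```python
import re
from typing import Any, Mapping, Optional

# Documented rule: a lowercase letter first, then lowercase letters, digits, _, ., /, or -.
_EVENT_TYPE = re.compile(r"[a-z][a-z0-9_./-]*")
_LOG_IDENTIFIERS = {"Info", "Warning", "Error"}
_RESERVED_LOG_KEYS = {"message", "identifier"}


def check_event_type(event_type: str) -> str:
    """Hypothetical pre-check for custom event type names."""
    if not _EVENT_TYPE.fullmatch(event_type):
        raise ValueError(f"invalid event type {event_type!r}")
    return event_type


def check_log_call(identifier: str, payload: Optional[Mapping[str, Any]] = None) -> None:
    """Hypothetical pre-check for add_log(...) identifiers and payload keys."""
    if identifier not in _LOG_IDENTIFIERS:
        raise ValueError(f"identifier must be one of {sorted(_LOG_IDENTIFIERS)}")
    clashes = _RESERVED_LOG_KEYS.intersection(payload or {})
    if clashes:
        raise ValueError(f"payload uses reserved keys: {sorted(clashes)}")
```

Note that identifiers are case-sensitive: "Warning" is valid for add_log(...), while the lowercase "warning" is an EventLogger.log(...) severity.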
2. Full PyTorch Training Loop Instrumentation
Use this pattern when you want scalar metrics, structured events, runtime telemetry, and layer summaries from the same training loop.
Assumptions:
- model, optimizer, criterion, train_loader, device, and num_epochs already exist.
- train_loader supports len(train_loader). If it does not, replace len(train_loader) with your own step counter logic.
```python
from pathlib import Path

from quark import (
    EventLogger,
    HookEngine,
    HookEngineConfig,
    HookScheduleConfig,
    LayerFilterConfig,
    MetricsLogger,
    ParameterDiagnostics,
    ParameterDiagnosticsConfig,
    SystemTelemetry,
    SystemTelemetryConfig,
    TrainingSession,
)

storage_dir = Path("quark-runs").resolve()

# Probe layer internals every 25 steps, only on the listed module types.
hook_config = HookEngineConfig(
    schedule=HookScheduleConfig(every_n_steps=25),
    filters=LayerFilterConfig(
        include_types=("Linear", "Conv2d", "LayerNorm", "ReLU"),
    ),
)
# Snapshot parameters every 50 steps, with value/gradient/update histograms.
parameter_config = ParameterDiagnosticsConfig(
    schedule=HookScheduleConfig(every_n_steps=50),
    capture_distributions=True,
    distribution_types=("value", "gradient", "update"),
)

with (
    TrainingSession.start_run(
        project="image-classifier",
        config={"model": "resnet18", "epochs": num_epochs},
        tags=["experiment", "pytorch"],
        storage_dir=storage_dir,
    ) as session,
    MetricsLogger(session, batch_size=100) as metrics,
    EventLogger(session, batch_size=50) as events,
    SystemTelemetry(
        session,
        config=SystemTelemetryConfig(sample_every_n_steps=10),
    ) as telemetry,
    HookEngine(session, model, optimizer, config=hook_config) as hooks,
    ParameterDiagnostics(session, model, optimizer, config=parameter_config) as parameters,
):
    for epoch in range(num_epochs):
        train_iter = iter(train_loader)
        for step_in_epoch in range(len(train_loader)):
            global_step = (epoch * len(train_loader)) + step_in_epoch

            with telemetry.start_step(
                global_step,
                epoch=epoch,
                metadata={"split": "train"},
            ) as step_scope:
                # Fetch the batch inside the scope so data-loading time is measured.
                inputs, targets = next(train_iter)
                inputs = inputs.to(device)
                targets = targets.to(device)
                step_scope.mark_data_ready()

                optimizer.zero_grad(set_to_none=True)
                # probe_active is truthy only on steps selected by the hook schedule.
                with hooks.step(global_step, epoch=epoch) as probe_active:
                    logits = model(inputs)
                    loss = criterion(logits, targets)
                    loss.backward()
                    optimizer.step()
                parameters.snapshot(global_step, epoch=epoch, metadata={"split": "train"})

            metrics.log("train/loss", float(loss.item()), step=global_step, epoch=epoch)
            metrics.log(
                "optimizer/lr",
                float(optimizer.param_groups[0]["lr"]),
                step=global_step,
                epoch=epoch,
            )

            if global_step % 100 == 0:
                events.log(
                    "checkpoint_saved",
                    step=global_step,
                    epoch=epoch,
                    payload={"path": f"checkpoints/step-{global_step}.pt"},
                )

            if probe_active:
                events.log(
                    "probe_completed",
                    step=global_step,
                    epoch=epoch,
                    payload={
                        "instrumented_layers": hooks.instrumented_module_names,
                    },
                )
```
Notes:
- HookEngine only probes steps allowed by HookScheduleConfig.
- SystemTelemetry stores telemetry records and also mirrors timing values into scalar metrics such as system/runtime/step_time_ms.
- MetricsLogger can log both conventional keys and project-specific custom metrics in the same run.
- LayerFilterConfig lets you keep instrumentation focused on the layers you actually care about.
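The assumptions at the start of this example require len(train_loader). If your loader does not define it (for example, an iterable dataset that streams records), one option is a global step counter that keeps counting across epochs. The generator below is a plain-Python sketch of that idea. iterate_with_global_step is a hypothetical helper, not part of quark.

```python
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def iterate_with_global_step(
    loader: Iterable[T], num_epochs: int
) -> Iterator[Tuple[int, int, T]]:
    """Yield (epoch, global_step, batch) without ever calling len(loader)."""
    global_step = 0
    for epoch in range(num_epochs):
        for batch in loader:  # re-iterates the loader once per epoch
            yield epoch, global_step, batch
            global_step += 1
```

One trade-off: with this pattern the batch is fetched before telemetry.start_step(...) opens, so data-loading time falls outside the timed scope. If data time matters to you, keep an explicit iterator and call next(...) inside the scope, as the main example does.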
Hook Configuration Details
HookScheduleConfig controls when probes are active:
- every_n_steps: probe cadence. Set to None to disable cadence-based probing.
- start_step: first eligible cadence step.
- stop_step: last eligible cadence step.
- explicit_steps: exact steps that should always be probed.
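The list above does not spell out every interaction between these fields, so here is one plausible reading written as a standalone predicate: explicit_steps are always probed, every_n_steps=None turns cadence off, and start_step/stop_step bound the cadence window. Two details are assumptions: the bounds are inclusive, and the cadence is aligned to step 0 rather than to start_step. is_probe_step is hypothetical and may not match HookEngine exactly.

```python
from typing import Iterable, Optional


def is_probe_step(
    step: int,
    every_n_steps: Optional[int] = None,
    start_step: Optional[int] = None,
    stop_step: Optional[int] = None,
    explicit_steps: Iterable[int] = (),
) -> bool:
    """Toy model of HookScheduleConfig; Quark's exact rules may differ."""
    if step in set(explicit_steps):
        return True  # explicit steps are always probed
    if every_n_steps is None:
        return False  # cadence-based probing disabled
    if start_step is not None and step < start_step:
        return False
    if stop_step is not None and step > stop_step:
        return False
    # Assumption: cadence aligned to step 0, not to start_step.
    return step % every_n_steps == 0
```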
LayerFilterConfig controls which modules receive hooks:
- include_names and exclude_names match module names from model.named_modules().
- include_types and exclude_types match either a class name such as Linear or a fully qualified type name.
- include_name_patterns and exclude_name_patterns are regular expressions matched against module names.
- leaf_modules_only=True skips container modules and instruments only leaves.
HookEngineConfig controls probe contents and failure behavior:
- capture_forward records activation summaries.
- capture_backward records gradient summaries.
- batch_size controls buffered layer-summary writes.
- raise_on_error overrides the session's strict_mode for hook failures.
- dead_unit_epsilon controls how close to zero an activation must be to count as dead.
- saturation_threshold controls saturation-ratio detection. Set it to None to disable saturation ratios.
- saturation_module_types defaults to sigmoid/tanh-like activation modules.
Layer summaries include module_name, module_type, summary_type, hook_direction, call_index, element_count, optional tensor_shape, mean, std, norm, variance, dead_unit_ratio, saturation_ratio, and optional metadata.
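To build intuition for these statistics, the sketch below computes them over a flat list of activations. It is a toy model, not HookEngine's code, and it makes several assumptions. A unit counts as dead when |x| <= dead_unit_epsilon. A unit counts as saturated when |x| >= saturation_threshold, which fits a tanh-like activation bounded in [-1, 1]. Variance is the population variance. The default values and the name summarize_activations are hypothetical.

```python
import math
from typing import Dict, Optional, Sequence


def summarize_activations(
    values: Sequence[float],
    dead_unit_epsilon: float = 1e-6,  # illustrative default, not Quark's
    saturation_threshold: Optional[float] = 0.99,  # illustrative default, not Quark's
) -> Dict[str, Optional[float]]:
    """Toy layer summary over a flat list of activations (tanh-like range assumed)."""
    n = len(values)
    if n == 0:
        raise ValueError("need at least one activation")
    mean = sum(values) / n
    variance = sum((v - mean) ** 2 for v in values) / n  # population variance
    dead = sum(1 for v in values if abs(v) <= dead_unit_epsilon)
    saturation_ratio = None  # None disables saturation ratios, as documented
    if saturation_threshold is not None:
        saturated = sum(1 for v in values if abs(v) >= saturation_threshold)
        saturation_ratio = saturated / n
    return {
        "mean": mean,
        "variance": variance,
        "std": math.sqrt(variance),
        "norm": math.sqrt(sum(v * v for v in values)),  # L2 norm
        "dead_unit_ratio": dead / n,
        "saturation_ratio": saturation_ratio,
    }
```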
hooks.read(...) flushes pending summaries and accepts module_name, module_type, summary_type, hook_direction, step filters, epoch filters, and limit.
Parameter Diagnostics Details
ParameterDiagnostics samples model parameters separately from hook outputs. It writes ParameterSummaryRecord entries for weights, biases, gradients, and optimizer update deltas, and can also write ParameterDistributionRecord histogram bins without storing raw tensors.
ParameterDiagnosticsConfig controls sampling and histogram cost:
- schedule: reuse HookScheduleConfig cadence fields for parameter snapshots.
- capture_summaries: enable first-class parameter summary records.
- capture_distributions: enable optional histogram records.
- distribution_types: choose value, gradient, and/or update histograms.
- histogram_bins and max_histogram_values: control histogram resolution and sampling cost.
- include_parameter_patterns, exclude_parameter_patterns, and include_parameter_roles: keep snapshots focused.
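The trade-off between histogram_bins and max_histogram_values becomes clearer with a concrete sketch. The function below caps cost by subsampling at most max_histogram_values entries, then bins them into equal-width buckets, and it returns edges, counts, and sample/total counts instead of raw values. Uniform random subsampling and equal-width bins are assumptions; ParameterDiagnostics may use a different strategy. sampled_histogram is hypothetical.

```python
import random
from typing import List, Sequence, Tuple


def sampled_histogram(
    values: Sequence[float],
    histogram_bins: int = 8,
    max_histogram_values: int = 10_000,
    seed: int = 0,
) -> Tuple[List[float], List[int], int, int]:
    """Return (bin_edges, counts, sample_count, total_count) without keeping raw values."""
    total = len(values)
    if total == 0 or histogram_bins < 1:
        raise ValueError("need values and at least one bin")
    sample = list(values)
    if total > max_histogram_values:
        # Assumed strategy: a seeded uniform subsample bounds the cost.
        sample = random.Random(seed).sample(sample, max_histogram_values)
    lo, hi = min(sample), max(sample)
    width = (hi - lo) / histogram_bins or 1.0  # avoid zero width when all values match
    edges = [lo + i * width for i in range(histogram_bins + 1)]
    counts = [0] * histogram_bins
    for v in sample:
        # The maximum value lands exactly on the last edge; keep it in the last bin.
        index = min(int((v - lo) / width), histogram_bins - 1)
        counts[index] += 1
    return edges, counts, len(sample), total
```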
parameters.read_summaries(...) and parameters.read_distributions(...) flush pending records and accept module, parameter, step, epoch, latest_only, and limit filters. Distribution reads also accept distribution_type.
System Telemetry Details
SystemTelemetryConfig fields:
- sample_every_n_steps: cadence for record_step(...) and start_step(...) output.
- emit_scalar_metrics: mirror supported runtime fields into scalar metrics.
- capture_cpu: enable CPU/RAM/process memory sampling.
- capture_gpu: enable GPU sampling.
- gpu_index: choose a GPU index when GPU sampling is available.
start_step(...) is the easiest timing API. Call mark_data_ready() after the batch has moved to the device; exiting the scope calls finish() automatically.
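```python
with telemetry.start_step(step, epoch=epoch) as scope:
    batch = next(train_iter)
    inputs = batch["inputs"].to(device)
    targets = batch["targets"].to(device)
    scope.mark_data_ready()  # the batch is on the device; data loading is done
    loss = train_one_step(inputs, targets)
# Leaving the scope calls finish() automatically.

# Alternatively, record a step explicitly with optional flags.
# found_inf and skipped_optimizer_step come from your own training loop.
telemetry.record_step(
    step,
    epoch=epoch,
    step_time_ms=37.5,
    overflow_occurred=found_inf,
    skipped_step=skipped_optimizer_step,
    metadata={"split": "train"},
)
```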
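To see what the data/compute split means, here is a toy timer with the same shape as start_step(...). Time from entering the scope to mark_data_ready() counts as data time, the remainder counts as compute time, and leaving the scope finishes the step. StepTimer is hypothetical: it is not SystemTelemetry, it records nothing to Quark, and the split shown is an assumption about what the mirrored data_time_ms and compute_time_ms metrics represent.

```python
import time
from typing import Dict, Optional


class StepTimer:
    """Hypothetical toy timer with a start_step-like shape; not SystemTelemetry."""

    def __init__(self) -> None:
        self._start = 0.0
        self._data_ready: Optional[float] = None
        self.result: Dict[str, Optional[float]] = {}

    def __enter__(self) -> "StepTimer":
        self._start = time.perf_counter()
        return self

    def mark_data_ready(self) -> None:
        self._data_ready = time.perf_counter()

    def __exit__(self, exc_type, exc, tb) -> bool:
        step_ms = (time.perf_counter() - self._start) * 1000.0
        data_ms = None
        if self._data_ready is not None:
            data_ms = (self._data_ready - self._start) * 1000.0
        self.result = {
            "step_time_ms": step_ms,
            "data_time_ms": data_ms,
            "compute_time_ms": None if data_ms is None else step_ms - data_ms,
        }
        return False  # do not suppress exceptions from the step
```

On GPUs, kernel launches are asynchronous, so wall-clock compute time can look shorter than the real work unless something in the step synchronizes (for example, calling loss.item()).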
telemetry.sample(...) captures point-in-time CPU/GPU/RAM state. If you provide step, the record boundary is step; otherwise it is time.
Mirrored scalar metric keys currently include:
- system/runtime/step_time_ms
- system/runtime/data_time_ms
- system/runtime/compute_time_ms
- system/cpu/usage_percent
- system/ram/usage_percent
- system/ram/used_bytes
- system/process/rss_bytes
- system/gpu/utilization_percent
- system/gpu/memory_used_bytes
- system/gpu/memory_usage_percent
- system/runtime/overflow_count
- system/runtime/skipped_step_count
telemetry.read(...) flushes pending telemetry and accepts boundary, step filters, epoch filters, and limit.
3. Capture Artifacts
Use ArtifactStore when you want to persist non-scalar outputs such as histograms, checkpoint metadata, or custom JSON payloads.
```python
from pathlib import Path

from quark import ArtifactStore, TrainingSession

storage_dir = Path("quark-runs").resolve()

with (
    TrainingSession.start_run(
        project="artifact-demo",
        storage_dir=storage_dir,
    ) as session,
    ArtifactStore(session) as artifacts,
):
    # Three bin edges define two bins, so counts has two entries.
    histogram = artifacts.log_histogram(
        "activation",
        bins=[-1.0, 0.0, 1.0],
        counts=[12, 4],
        step=120,
        epoch=2,
        namespace="layers/encoder",
        metadata={"module": "encoder.0"},
    )
    snapshot = artifacts.log_snapshot(
        "warmup",
        {
            "checkpoint_path": "checkpoints/step-120.pt",
            "checkpoint_size_bytes": 8192,
        },
        snapshot_type="checkpoint_metadata",
        step=120,
        epoch=2,
        namespace="trainer",
        metadata={"phase": "warmup"},
    )
    print(histogram.payload_relative_path)
    print(snapshot.metadata_relative_path)
    print(artifacts.load(snapshot))
```
Useful artifact types include:
- histogram
- tensor_summary
- embedding_snapshot
- batch_diagnostics_snapshot
- checkpoint_metadata
- similarity_analysis_output
- alert_evidence_bundle
Artifact payload formats:
- json for JSON-serializable Python values
- text for Python strings
- bytes for bytes-like payloads
If payload_format is omitted, ArtifactStore.store(...) infers text for strings, bytes for bytes-like objects, and json otherwise.
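The inference rule is short enough to state as code. This sketch mirrors the sentence above. It assumes "bytes-like" means bytes, bytearray, or memoryview, which matches Python's common buffer types but is not confirmed for ArtifactStore. infer_payload_format is hypothetical.

```python
from typing import Any


def infer_payload_format(payload: Any) -> str:
    """Mirror the documented default: text for str, bytes for bytes-like, json otherwise."""
    if isinstance(payload, str):
        return "text"
    if isinstance(payload, (bytes, bytearray, memoryview)):  # assumed bytes-like set
        return "bytes"
    return "json"
```

A list of strings is still json under this rule. If you want a plain string stored as JSON rather than text, pass payload_format explicitly.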
Artifact names follow the same namespacing style as metrics. Each segment must start with a letter or digit and may contain letters, digits, ., _, or -. You can pass name="layers/encoder/activation" or name="activation", namespace="layers/encoder".
Generic artifact API:
```python
# `artifacts` is an open ArtifactStore.
record = artifacts.store(
    "tensor_summary",
    "layers/encoder/weights",
    {"mean": 0.02, "std": 0.31},
    step=120,
    payload_format="json",
)
records = artifacts.read(
    artifact_type="tensor_summary",
    namespace="layers/encoder",
    step_min=100,
)
payload = artifacts.load(record)  # load by record...
same_payload = artifacts.load(record.id)  # ...or by artifact ID
```
artifacts.read(...) accepts artifact_id, artifact_type, artifact_types, namespace, artifact_name, artifact_key, payload_format, step filters, epoch filters, and limit.
Retention hooks can observe each stored artifact:
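```python
from quark import ArtifactStore


def report_checkpoint_metadata(store, record):
    # Called for each stored artifact; this hook only reports checkpoint metadata.
    if record.artifact_type == "checkpoint_metadata":
        print(record.artifact_key, record.size_bytes)


# `session` is an active TrainingSession.
with ArtifactStore(
    session,
    retention_hooks=(report_checkpoint_metadata,),
    raise_on_retention_error=False,
) as artifacts:
    artifacts.log_snapshot("latest", {"checkpoint": "step-120.pt"})
```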
If raise_on_retention_error is omitted, it follows the run's strict_mode.
4. Batch and Data Diagnostics
Use DataDiagnostics when you want batch-level loss tracking, class exposure summaries, sample traces, and hard-batch detection without storing raw samples.
If your dataloader yields mapping-like batches, MappingBatchMetadataAdapter is the easiest setup:
Assume train_loader yields mapping-like batches and train_step(batch) returns the scalar batch loss for that batch.
```python
from pathlib import Path

from quark import (
    DataDiagnostics,
    DataDiagnosticsConfig,
    MappingBatchMetadataAdapter,
    TrainingSession,
)

storage_dir = Path("quark-runs").resolve()

# Map keys in each batch mapping to the fields DataDiagnostics records.
adapter = MappingBatchMetadataAdapter(
    sample_id_key="sample_ids",
    label_key="labels",
    sequence_length_key="lengths",
    metadata_keys=("loader",),
)

with (
    TrainingSession.start_run(
        project="data-diagnostics-demo",
        storage_dir=storage_dir,
    ) as session,
    DataDiagnostics(
        session,
        config=DataDiagnosticsConfig(
            batch_size=32,  # buffered write size, not the data batch size
            adapter=adapter,
        ),
    ) as diagnostics,
):
    for step, batch in enumerate(train_loader):
        loss = train_step(batch)
        diagnostics.log_batch(
            float(loss),
            step=step,
            epoch=0,
            batch=batch,
        )

    summary = diagnostics.summarize(hard_batch_limit=5)
    exposure = diagnostics.summarize_exposure()
    hard_batches = diagnostics.find_hard_batches(limit=5)

print(summary.loss_summary.to_dict() if summary.loss_summary else None)
print(exposure.to_dict())
print([finding.to_dict() for finding in hard_batches])
```
If your batch object is not mapping-like, implement BatchMetadataAdapter and return a BatchMetadata instance.
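```python
from collections import Counter

from quark import BatchMetadata, BatchMetadataAdapter


class TupleBatchAdapter(BatchMetadataAdapter):
    def extract_batch_metadata(self, batch):
        inputs, targets, sample_ids = batch
        # class_counts maps each label to its number of occurrences in this batch.
        class_counts = Counter(int(label) for label in targets)
        return BatchMetadata(
            batch_size=len(sample_ids),
            sample_ids=sample_ids,
            class_counts=dict(class_counts),
            metadata={"source": "tuple_loader"},
        )
```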
You can also skip adapters and pass fields directly:
```python
diagnostics.log_batch(
    loss=float(loss),
    step=step,
    epoch=epoch,
    batch_size=len(targets),
    labels=targets,
    sample_ids=batch_ids,
    sequence_lengths=token_lengths,
    metadata={"augmentation": "mixup"},
)
```
DataDiagnosticsConfig fields:
- batch_size: buffered write size.
- adapter: optional BatchMetadataAdapter.
- outlier_z_score_threshold: z-score threshold for loss and sequence-length outliers.
- outlier_iqr_multiplier: IQR multiplier for upper-fence outliers.
- hard_batch_percentile: percentile used as the high-loss hard-batch threshold.
- min_batches_for_outlier_detection: minimum records before outlier rules activate.
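The three thresholds correspond to standard statistics: a z-score cutoff, an IQR upper fence (Q3 + multiplier × IQR), and a high percentile. The sketch below shows one common way to compute them over a list of batch losses using only the statistics module (Python 3.8+). The function name and default values are hypothetical. Quark's exact formulas may differ, for example sample versus population standard deviation, quantile interpolation, or whether z-scores are one-sided.

```python
import statistics
from typing import Dict, List, Sequence


def flag_loss_outliers(
    losses: Sequence[float],
    outlier_z_score_threshold: float = 3.0,
    outlier_iqr_multiplier: float = 1.5,
    hard_batch_percentile: int = 95,
    min_batches_for_outlier_detection: int = 8,
) -> Dict[str, List[int]]:
    """Hypothetical sketch: indices of z-score, IQR, and hard-batch outliers."""
    if not 1 <= hard_batch_percentile <= 99:
        raise ValueError("this sketch supports percentiles from 1 to 99")
    flags: Dict[str, List[int]] = {"z_score": [], "iqr": [], "hard": []}
    if len(losses) < min_batches_for_outlier_detection:
        return flags  # too few batches: outlier rules stay inactive
    mean = statistics.fmean(losses)
    stdev = statistics.pstdev(losses)  # population std (assumption)
    q1, _, q3 = statistics.quantiles(losses, n=4)
    upper_fence = q3 + outlier_iqr_multiplier * (q3 - q1)
    # quantiles(n=100) returns the 99 cut points for the 1st..99th percentiles.
    hard_cutoff = statistics.quantiles(losses, n=100)[hard_batch_percentile - 1]
    for index, loss in enumerate(losses):
        # Only unusually high losses are flagged (one-sided, an assumption).
        if stdev > 0 and (loss - mean) / stdev > outlier_z_score_threshold:
            flags["z_score"].append(index)
        if loss > upper_fence:
            flags["iqr"].append(index)
        if loss >= hard_cutoff:
            flags["hard"].append(index)
    return flags
```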
DataDiagnostics.read(...) accepts sample_id, step filters, epoch filters, and limit.
summarize(...), summarize_exposure(...), and find_hard_batches(...) accept step and epoch range filters.
Diagnostics records do not store raw samples. They store loss, optional batch size, optional sample IDs, class counts, sequence lengths, and JSON metadata.
5. Analyze a Run
AnalyzerEngine turns persisted metrics and layer summaries into alerts and health scores.
PhaseAnalyzer segments a run into training regimes such as rapid learning, stabilization, stagnation, overfitting onset, and divergence.
Run this after the current session already has persisted metrics, and preferably layer summaries as well.
```python
from quark import AnalyzerEngine, PhaseAnalyzer

# `session` is a run that already has persisted metrics (and ideally layer summaries).
analyzer = AnalyzerEngine(session)
analysis = analyzer.analyze()

phase_analyzer = PhaseAnalyzer(session)
phase_result = phase_analyzer.analyze()

print([alert.to_dict() for alert in analysis.alerts])
print([score.to_dict() for score in analysis.health_scores])
print([phase.to_dict() for phase in phase_result.phases])

# Read persisted outputs back with filters.
runtime_alerts = analyzer.read_alerts(category="runtime")
overfitting_phases = phase_analyzer.read_phases(phase_name="overfitting_onset")
```
Notes:
- AnalyzerEngine can operate on metrics alone, but some alerts become much more useful when layer summaries are available.
- PhaseAnalyzer expects either training and validation loss metrics or training and validation accuracy metrics.
- analyze(replace_existing=False) appends outputs instead of replacing prior analyzer or phase outputs.
- AnalyzerEngineConfig.replace_existing_outputs and PhaseAnalyzerConfig.replace_existing_outputs set the default replacement behavior.
Analyzer alert rule IDs currently include:
- exploding_gradients
- vanishing_gradients
- stagnant_layers
- activation_collapse
- overfitting_onset
- runtime_bottleneck_risk
Analyzer categories are optimization, representation, generalization, and runtime. Severities are info, warning, error, and critical.
Health scores are produced for optimization, representation, generalization, runtime, and overall. Scores are 0 to 100 with statuses healthy, watch, degraded, or critical.
Thresholds are configurable:
```python
from quark import AnalyzerEngine, AnalyzerEngineConfig, AnalyzerThresholds

analyzer = AnalyzerEngine(
    session,
    config=AnalyzerEngineConfig(
        thresholds=AnalyzerThresholds(
            exploding_gradient_warning_norm=20.0,
            runtime_warning_step_time_ms=400.0,
            overfitting_warning_gap=0.15,
        ),
        replace_existing_outputs=True,
    ),
)
```
Phase analyzer outputs use these phase names:
- initialization_adaptation
- rapid_learning
- stabilization
- stagnation
- overfitting_onset
- divergence
Phase thresholds are also configurable:
```python
from quark import PhaseAnalyzer, PhaseAnalyzerConfig, PhaseAnalyzerThresholds

phase_analyzer = PhaseAnalyzer(
    session,
    config=PhaseAnalyzerConfig(
        thresholds=PhaseAnalyzerThresholds(
            initialization_points=3,
            rapid_learning_min_relative_change=0.12,
            stagnation_max_relative_change=0.005,
        ),
    ),
)
```
6. Compare Runs and Baselines
Use ComparisonEngine when you want structured diffs across metrics, alerts, phases, and layer summaries.
Assume candidate_session and baseline_session already refer to runs with persisted telemetry and analysis outputs.
```python
from quark import (
    ComparisonEngine,
    ComparisonEngineConfig,
    MetricTargetSpec,
)

comparison = ComparisonEngine(
    candidate_session,
    config=ComparisonEngineConfig(
        metric_targets=(
            MetricTargetSpec(
                metric_key="val/accuracy",
                target_value=0.80,
                direction="at_least",
            ),
        ),
    ),
).compare_to_baseline(baseline_session.run_id)

print(comparison.mode)
print(comparison.primary.to_dict())
print(comparison.reference.to_dict())
print([item.to_dict() for item in comparison.metric_diffs])
print(comparison.alert_diff.to_dict())
print([item.to_dict() for item in comparison.phase_diffs])
print([item.to_dict() for item in comparison.layer_summary_diffs])
```
For direct pairwise comparisons, use:
```python
comparison = ComparisonEngine(primary_session).compare(reference_session)
```
You can also compare in one call:
```python
comparison = ComparisonEngine.compare_runs(
    primary_session,
    baseline_session,
    baseline=True,
    config=ComparisonEngineConfig(
        metric_targets=(
            # Positional form: metric_key, target_value, direction.
            MetricTargetSpec("val/loss", 0.5, "at_most"),
        ),
    ),
)
```
Comparison targets can be a TrainingSession, a RunContext, or a run ID string. When you pass a run ID string, the comparison engine loads it from the primary run's storage_dir.
RunComparisonResult.to_dict() includes:
- schema_version
- generated_at
- mode: pairwise or baseline
- primary and reference run metadata
- metric_diffs
- alert_diff
- phase_diffs
- layer_summary_diffs
MetricTargetSpec.direction is either at_least or at_most, and enables time_to_target_step diffs for that metric.
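A time_to_target_step diff compares how quickly each run first reaches a target. The helper below is a hypothetical sketch of that idea over (step, value) pairs. It assumes "reaches" means the first recorded step at or past the target in the given direction, which the text above does not spell out. Comparing the result for two runs gives the kind of difference such a diff would report.

```python
from typing import Iterable, Optional, Tuple


def time_to_target_step(
    points: Iterable[Tuple[int, float]],
    target_value: float,
    direction: str,
) -> Optional[int]:
    """First step whose value reaches the target, or None if it never does."""
    if direction not in ("at_least", "at_most"):
        raise ValueError("direction must be 'at_least' or 'at_most'")
    for step, value in sorted(points):  # scan in step order
        if direction == "at_least":
            reached = value >= target_value
        else:
            reached = value <= target_value
        if reached:
            return step
    return None
```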
7. Resume Interrupted Runs
Use resume_run when you are intentionally continuing a previously interrupted run.
```python
from pathlib import Path

from quark import TrainingSession

storage_dir = Path("quark-runs").resolve()

# Simulate a run that stopped early.
session = TrainingSession.start_run(
    project="resume-demo",
    storage_dir=storage_dir,
)
session.end_run(status="interrupted")

# Continue the same run ID and finish it.
resumed = TrainingSession.resume_run(
    run_id=session.run_id,
    storage_dir=storage_dir,
)
resumed.add_tag("recovered")
resumed.add_log("Recovered after a scheduled maintenance window.", identifier="Info")
resumed.end_run(status="completed")
```
Important:
- resume_run(...) changes lifecycle state and emits a run_resumed event.
- Use it for continuation, not just read-only inspection.
- TrainingSession.resume_run(...) and top-level resume_run(...) accept run_id and optional storage_dir.
- RunNotFoundError is raised when the service cannot find the requested run.
- InvalidRunStateError is raised when the requested lifecycle transition is not allowed.
8. Records, Schemas, and Validation
The SDK returns dataclass records for persisted data. Each record has to_dict() for JSON-friendly output.
Core records:
| Record | Purpose | Important fields |
|---|---|---|
| ProjectRecord | Project identity | id, name, root_path, timestamps |
| RunRecord | Run lifecycle and lineage | id, project_id, project_name, status, parent_run_id, lineage_root_run_id, resume_count, run_path, git_commit, strict_mode, tags |
| RunContext | Shared session context | storage_dir, store_root, db_path, project, run; properties mirror project/run IDs and status |
| MetricRecord | Scalar metric sample | metric_key, namespace, metric_name, value, step, epoch, description, metadata |
| EventRecord | Structured event/log | event_type, severity, step, epoch, status, payload |
| SystemTelemetryRecord | Runtime telemetry | timing fields, CPU/RAM fields, GPU fields, overflow/skipped-step counters, metadata |
| LayerSummaryRecord | Hook summary | module identity, activation/gradient direction, tensor statistics, dead/saturation ratios |
| ParameterSummaryRecord | Parameter summary | module and parameter identity, shape/dtype, value/gradient/update statistics, optimizer group |
| ParameterDistributionRecord | Parameter histogram | module and parameter identity, distribution type, bin edges, counts, sample/total counts |
| BatchDiagnosticRecord | Batch metadata | loss, batch_size, sample_ids, class_counts, sequence_lengths, metadata |
| ArtifactRecord | Artifact metadata | artifact identity, payload format, paths, size, SHA-256, step/epoch |
| EvidenceRecord | Analyzer evidence | kind, source, summary, observed/threshold values, step range |
| AlertRecord | Analyzer alert | rule_id, category, severity, title, summary, fact, inference, evidence |
| HealthScoreRecord | Category score | score_key, category, score, status, evidence, contributing alert IDs |
| PhaseRecord | Training phase segment | phase_name, display_name, summary, step_start, step_end, evidence |
Supported literal values:
- Run statuses: `running`, `completed`, `failed`, `interrupted`
- Event severities: `debug`, `info`, `warning`, `error`, `critical`
- Alert severities: `info`, `warning`, `error`, `critical`
- Alert categories: `optimization`, `representation`, `generalization`, `runtime`
- Health score keys: `overall`, `optimization`, `representation`, `generalization`, `runtime`
- Telemetry boundaries: `step`, `time`
- Layer summary types: `activation`, `gradient`
- Hook directions: `forward`, `backward`
- Parameter roles: `weight`, `bias`, `other`
- Parameter distribution types: `value`, `gradient`, `update`
- Artifact payload formats: `json`, `text`, `bytes`
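If you build statuses or severities dynamically (for example from a CLI flag), a small local pre-check can catch typos before they reach the SDK. This is a hypothetical helper that simply mirrors the lists above; the SDK performs its own validation, and the dictionary keys such as `run_status` are illustrative names, not SDK identifiers.

```python
from typing import FrozenSet, Mapping

# Allowed values copied from the supported literal values listed above.
ALLOWED_VALUES: Mapping[str, FrozenSet[str]] = {
    "run_status": frozenset({"running", "completed", "failed", "interrupted"}),
    "event_severity": frozenset({"debug", "info", "warning", "error", "critical"}),
    "alert_severity": frozenset({"info", "warning", "error", "critical"}),
    "alert_category": frozenset({"optimization", "representation", "generalization", "runtime"}),
    "health_score_key": frozenset(
        {"overall", "optimization", "representation", "generalization", "runtime"}
    ),
    "telemetry_boundary": frozenset({"step", "time"}),
    "layer_summary_type": frozenset({"activation", "gradient"}),
    "hook_direction": frozenset({"forward", "backward"}),
    "parameter_role": frozenset({"weight", "bias", "other"}),
    "parameter_distribution_type": frozenset({"value", "gradient", "update"}),
    "payload_format": frozenset({"json", "text", "bytes"}),
}


def check_literal(kind: str, value: str) -> str:
    """Raise ValueError early if `value` is not an allowed literal for `kind`."""
    allowed = ALLOWED_VALUES[kind]
    if value not in allowed:
        raise ValueError(f"{value!r} is not a valid {kind}; expected one of {sorted(allowed)}")
    return value


# For example, check a status before passing it to session.end_run(status=...).
final_status = check_literal("run_status", "completed")
```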
Schema helpers exported by quark:
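```python
from pathlib import Path
from quark import CURRENT_SCHEMA_VERSION, MetricsLogger, TrainingSession, dump_record, migrate_store, validate_record

storage_dir = Path("quark-runs").resolve()
session = TrainingSession.start_run(project="schema-demo", storage_dir=storage_dir)
with MetricsLogger(session) as metrics:
    metric_record = metrics.log("train/loss", 0.42, step=1)  # returns a MetricRecord

validate_record(metric_record)  # raises SchemaValidationError if the record does not match the schema
payload = dump_record(metric_record)  # validated, JSON-friendly dictionary
version = migrate_store(storage_dir, generated_at="2026-05-07T00:00:00Z")  # legacy .quark stores only
print(CURRENT_SCHEMA_VERSION, version, payload)
session.end_run(status="completed")
```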
Use validate_record(...) for SDK record dataclasses before sending data to custom tooling.
Use dump_record(...) when you need a validated dictionary payload.
Use migrate_store(...) only for legacy local .quark stores that predate the current schema. It returns the current schema version or raises SchemaMigrationError.
Schema exceptions:

- `SchemaValidationError`: a record or snapshot does not match the current schema.
- `SchemaMigrationError`: a local store cannot be migrated safely.
9. Persistence Service
Quark run data is written through a hosted persistence service.
Before running a training job, set QUARK_PERSISTENCE_URL to the service URL
provided by your Quark deployment or project documentation, and set
QUARK_USER_KEY to your Quark user key:
```powershell
$env:QUARK_PERSISTENCE_URL = "https://quark-persistence.example.com"
$env:QUARK_USER_KEY = "qk_..."
$env:QUARK_PERSISTENCE_TIMEOUT_SECONDS = "30"
python train.py
```
Notes:

- `QUARK_PERSISTENCE_URL` should be set explicitly for normal use.
- `QUARK_USER_KEY` is required for authenticated access to the service.
- `QUARK_PERSISTENCE_TIMEOUT_SECONDS` defaults to `15.0`.
- If `QUARK_PERSISTENCE_URL` is missing, the user key is invalid, or the service is unreachable, SDK write and read calls raise `SessionStoreError`.
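Because a missing variable only surfaces as `SessionStoreError` once the SDK makes its first call, it can be worth checking the environment at the top of `train.py`. This is a minimal sketch: `load_persistence_settings` and `PersistenceSettings` are hypothetical names, and the check does not verify that the key is valid or that the service is reachable.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class PersistenceSettings:
    url: str
    user_key: str
    timeout_seconds: float


def load_persistence_settings(env: Optional[Mapping[str, str]] = None) -> PersistenceSettings:
    """Read the Quark persistence variables and fail before training starts.

    Hypothetical preflight helper; it only inspects environment variables and
    does not contact the service.
    """
    env = os.environ if env is None else env
    missing = [name for name in ("QUARK_PERSISTENCE_URL", "QUARK_USER_KEY") if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    # Mirror the documented 15.0 second default when the timeout is unset.
    timeout = float(env.get("QUARK_PERSISTENCE_TIMEOUT_SECONDS", "15.0"))
    if timeout <= 0:
        raise RuntimeError("QUARK_PERSISTENCE_TIMEOUT_SECONDS must be positive")
    return PersistenceSettings(env["QUARK_PERSISTENCE_URL"], env["QUARK_USER_KEY"], timeout)


settings = load_persistence_settings(
    {"QUARK_PERSISTENCE_URL": "https://quark-persistence.example.com", "QUARK_USER_KEY": "qk_test"}
)
print(settings.timeout_seconds)  # 15.0
```

In a real job you would call `load_persistence_settings()` with no argument so it reads `os.environ`.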
Catch service errors like this:
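```python
import sys
from pathlib import Path

from quark import InvalidRunStateError, RunNotFoundError, SessionStoreError, TrainingSession

run_id = sys.argv[1]  # ID of the run to resume, passed on the command line
storage_dir = Path("quark-runs").resolve()

try:
    session = TrainingSession.resume_run(run_id=run_id, storage_dir=storage_dir)
except RunNotFoundError:
    raise SystemExit(f"Run does not exist: {run_id}")
except InvalidRunStateError as exc:
    raise SystemExit(f"Cannot resume this run: {exc}")
except SessionStoreError as exc:
    # Missing service URL, invalid user key, or unreachable service.
    raise SystemExit(f"Persistence service error: {exc}")
```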
10. Constructor and Method Reference
Session entrypoints:
```
TrainingSession.start_run(
    *,
    project: str,
    config: dict | None = None,
    tags: Iterable[str] | None = None,
    parent_run_id: str | None = None,
    storage_dir: str | Path | None = None,
    strict_mode: bool = False,
) -> TrainingSession
TrainingSession.resume_run(*, run_id: str, storage_dir: str | Path | None = None)
session.reload() -> RunContext
session.add_tag(tag: str) -> RunContext
session.add_log(message: str, *, identifier: str | None = "Info", step: int | None = None, epoch: int | None = None, status: str | None = None, payload: Mapping | None = None, timestamp: str | None = None, event_type: str = "user_log") -> EventRecord
session.end_run(status: str = "completed") -> RunContext
session.to_dict() -> dict
```
Writer and reader entrypoints:
```
MetricsLogger(session, *, batch_size: int = 100)
metrics.log(metric: str, value: float, *, step: int | None = None, epoch: int | None = None, namespace: str | None = None, description: str | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> MetricRecord
metrics.flush() -> tuple[MetricRecord, ...]
metrics.read(...filters...) -> tuple[MetricRecord, ...]
metrics.close() -> None

EventLogger(session, *, batch_size: int = 100)
events.log(event_type: str, *, severity: str | None = None, step: int | None = None, epoch: int | None = None, status: str | None = None, payload: Mapping | None = None, timestamp: str | None = None) -> EventRecord
events.add_log(message: str, *, identifier: str | None = "Info", step: int | None = None, epoch: int | None = None, status: str | None = None, payload: Mapping | None = None, timestamp: str | None = None, event_type: str = "user_log") -> EventRecord
events.flush() -> tuple[EventRecord, ...]
events.read(...filters...) -> tuple[EventRecord, ...]
events.close() -> None

SystemTelemetry(session, *, config: SystemTelemetryConfig | None = None, batch_size: int = 20)
telemetry.start_step(step: int, *, epoch: int | None = None, metadata: Mapping | None = None) -> StepTelemetryScope
telemetry.sample(*, step: int | None = None, epoch: int | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> SystemTelemetryRecord
telemetry.record_step(step: int, *, epoch: int | None = None, step_time_ms: float | None = None, data_time_ms: float | None = None, compute_time_ms: float | None = None, overflow_occurred: bool = False, skipped_step: bool = False, metadata: Mapping | None = None, timestamp: str | None = None) -> SystemTelemetryRecord | None
telemetry.flush() -> tuple[SystemTelemetryRecord, ...]
telemetry.read(...filters...) -> tuple[SystemTelemetryRecord, ...]
telemetry.close() -> None

HookEngine(session, model, optimizer=None, *, config: HookEngineConfig | None = None)
hooks.should_probe(step: int) -> bool
hooks.begin_step(step: int, *, epoch: int | None = None) -> bool
hooks.end_step() -> None
hooks.step(step: int, *, epoch: int | None = None) -> context manager yielding bool
hooks.flush() -> tuple[LayerSummaryRecord, ...]
hooks.read(...filters...) -> tuple[LayerSummaryRecord, ...]
hooks.close() -> None

ParameterDiagnostics(session, model, optimizer=None, *, config: ParameterDiagnosticsConfig | None = None)
parameters.snapshot(step: int, *, epoch: int | None = None, metadata: Mapping | None = None) -> tuple[tuple[ParameterSummaryRecord, ...], tuple[ParameterDistributionRecord, ...]]
parameters.flush() -> tuple[tuple[ParameterSummaryRecord, ...], tuple[ParameterDistributionRecord, ...]]
parameters.read_summaries(...filters...) -> tuple[ParameterSummaryRecord, ...]
parameters.read_distributions(...filters...) -> tuple[ParameterDistributionRecord, ...]
parameters.read() -> tuple[tuple[ParameterSummaryRecord, ...], tuple[ParameterDistributionRecord, ...]]
parameters.close() -> None

ArtifactStore(session, *, retention_hooks: Sequence[Callable] | None = None, raise_on_retention_error: bool | None = None)
artifacts.store(artifact_type: str, name: str, payload: Any, *, step: int | None = None, epoch: int | None = None, namespace: str | None = None, payload_format: str | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> ArtifactRecord
artifacts.log_histogram(name: str, *, bins: Sequence[float], counts: Sequence[float], step: int | None = None, epoch: int | None = None, namespace: str | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> ArtifactRecord
artifacts.log_snapshot(name: str, payload: Any, *, snapshot_type: str = "batch_diagnostics_snapshot", step: int | None = None, epoch: int | None = None, namespace: str | None = None, payload_format: str | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> ArtifactRecord
artifacts.read(...filters...) -> tuple[ArtifactRecord, ...]
artifacts.load(artifact: str | ArtifactRecord) -> Any
artifacts.close() -> None

DataDiagnostics(session, *, config: DataDiagnosticsConfig | None = None)
diagnostics.log_batch(loss: float, *, step: int | None = None, epoch: int | None = None, batch: Any | None = None, batch_size: int | None = None, labels: Any | None = None, sample_ids: Any | None = None, sequence_lengths: Any | None = None, metadata: Mapping | None = None, timestamp: str | None = None) -> BatchDiagnosticRecord
diagnostics.flush() -> tuple[BatchDiagnosticRecord, ...]
diagnostics.read(...filters...) -> tuple[BatchDiagnosticRecord, ...]
diagnostics.summarize_exposure(...range filters...) -> ExposureSummary
diagnostics.find_hard_batches(...range filters..., limit: int | None = 10) -> tuple[HardBatchFinding, ...]
diagnostics.summarize(...range filters..., hard_batch_limit: int | None = 10) -> DataDiagnosticsSummary
diagnostics.close() -> None
```
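Outlier and hard-batch detection in `DataDiagnostics` is tuned by `DataDiagnosticsConfig` (defaults: `outlier_z_score_threshold=2.5`, `outlier_iqr_multiplier=1.5`, `hard_batch_percentile=0.9`, `min_batches_for_outlier_detection=5`). The SDK's exact rules are not documented here, so the following is only a plausible sketch, assuming a batch is an outlier if its loss fails either a z-score test or an IQR fence, and is "hard" if its loss is at or above the configured percentile. `loss_outliers` and `hard_batches` are hypothetical helpers.

```python
import statistics
from typing import List, Sequence


def loss_outliers(
    losses: Sequence[float],
    *,
    z_threshold: float = 2.5,
    iqr_multiplier: float = 1.5,
    min_batches: int = 5,
) -> List[int]:
    """Return indices of batches whose loss is an outlier by z-score or IQR fence."""
    if len(losses) < min_batches:
        return []  # too few batches for stable statistics
    mean = statistics.fmean(losses)
    std = statistics.pstdev(losses)
    q1, _, q3 = statistics.quantiles(losses, n=4)  # quartiles
    spread = q3 - q1
    low, high = q1 - iqr_multiplier * spread, q3 + iqr_multiplier * spread
    flagged = []
    for index, loss in enumerate(losses):
        z = (loss - mean) / std if std > 0 else 0.0
        if abs(z) > z_threshold or not (low <= loss <= high):
            flagged.append(index)
    return flagged


def hard_batches(losses: Sequence[float], *, percentile: float = 0.9) -> List[int]:
    """Return indices of batches at or above the loss percentile, hardest first."""
    ranked = sorted(losses)
    cutoff = ranked[min(len(ranked) - 1, int(percentile * len(ranked)))]
    candidates = [i for i, loss in enumerate(losses) if loss >= cutoff]
    return sorted(candidates, key=lambda i: -losses[i])


losses = [1.0, 1.1, 0.9, 1.05, 0.95, 1.0, 4.0, 1.02, 0.98, 1.03]
print(loss_outliers(losses))  # [6]
print(hard_batches(losses))   # [6]
```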
Analyzer and comparison entrypoints:
```
AnalyzerEngine(session, *, config: AnalyzerEngineConfig | None = None)
analyzer.analyze(*, replace_existing: bool | None = None) -> AnalyzerResult
analyzer.read_alerts(*, rule_id: str | None = None, category: str | None = None, severity: str | None = None, step: int | None = None, step_min: int | None = None, step_max: int | None = None, limit: int | None = None)
analyzer.read_health_scores(*, score_key: str | None = None, category: str | None = None, limit: int | None = None)

PhaseAnalyzer(session, *, config: PhaseAnalyzerConfig | None = None)
phase_analyzer.analyze(*, replace_existing: bool | None = None) -> PhaseAnalysisResult
phase_analyzer.read_phases(*, phase_name: str | None = None, step: int | None = None, step_min: int | None = None, step_max: int | None = None, limit: int | None = None)

ComparisonEngine(session, *, config: ComparisonEngineConfig | None = None)
comparison.compare(other, *, baseline: bool = False) -> RunComparisonResult
comparison.compare_to_baseline(baseline_run) -> RunComparisonResult
ComparisonEngine.compare_runs(primary, reference, *, baseline: bool = False, config: ComparisonEngineConfig | None = None) -> RunComparisonResult
```
Configuration defaults:
| Config | Fields |
|---|---|
| `SystemTelemetryConfig` | sample_every_n_steps=1, emit_scalar_metrics=True, capture_cpu=True, capture_gpu=True, gpu_index=None |
| `HookScheduleConfig` | every_n_steps=1, start_step=0, stop_step=None, explicit_steps=() |
| `LayerFilterConfig` | include_names=(), exclude_names=(), include_types=(), exclude_types=(), include_name_patterns=(), exclude_name_patterns=(), leaf_modules_only=True |
| `HookEngineConfig` | schedule=HookScheduleConfig(), filters=LayerFilterConfig(), capture_forward=True, capture_backward=True, batch_size=100, raise_on_error=None, dead_unit_epsilon=0.0, saturation_threshold=0.95, saturation_module_types=("Sigmoid", "Tanh", "Hardsigmoid", "Hardtanh", "ReLU6") |
| `ParameterDiagnosticsConfig` | schedule=HookScheduleConfig(), batch_size=100, capture_summaries=True, capture_distributions=False, distribution_types=("value",), histogram_bins=32, near_zero_epsilon=1e-8, max_histogram_values=100000, parameter include/exclude filters |
| `DataDiagnosticsConfig` | batch_size=100, adapter=None, outlier_z_score_threshold=2.5, outlier_iqr_multiplier=1.5, hard_batch_percentile=0.9, min_batches_for_outlier_detection=5 |
| `AnalyzerEngineConfig` | thresholds=AnalyzerThresholds(), replace_existing_outputs=True, timing metric key tuples for step/data/compute time |
| `PhaseAnalyzerConfig` | thresholds=PhaseAnalyzerThresholds(), replace_existing_outputs=True, metric key tuples for train/validation loss and accuracy |
| `ComparisonEngineConfig` | metric_targets=() |
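With the default `HookScheduleConfig`, hooks fire on every step, which can be expensive on large models. `HookEngine.should_probe(step)` reports whether a step will be captured; the sketch below is a guess at how the schedule fields combine. Whether `stop_step` is inclusive, and whether `explicit_steps` bypass the start/stop range, are assumptions, and `Schedule` is a hypothetical stand-in rather than the SDK class.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Schedule:
    """Hypothetical stand-in mirroring the HookScheduleConfig fields and defaults."""

    every_n_steps: int = 1
    start_step: int = 0
    stop_step: Optional[int] = None
    explicit_steps: Tuple[int, ...] = ()


def should_probe(schedule: Schedule, step: int) -> bool:
    """Assumed rule: explicit steps always probe; otherwise probe every N steps in range."""
    if step in schedule.explicit_steps:
        return True
    if step < schedule.start_step:
        return False
    if schedule.stop_step is not None and step > schedule.stop_step:  # assumes inclusive stop
        return False
    return (step - schedule.start_step) % schedule.every_n_steps == 0


sparse = Schedule(every_n_steps=100, start_step=0, stop_step=1000, explicit_steps=(1, 2))
print([s for s in range(0, 401) if should_probe(sparse, s)])  # [0, 1, 2, 100, 200, 300, 400]
```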
Default analyzer metric keys:

- Step time: `system/runtime/step_time_ms`, `system/step_time_ms`
- Data time: `system/runtime/data_time_ms`, `system/data_time_ms`
- Compute time: `system/runtime/compute_time_ms`, `system/compute_time_ms`
Default phase metric keys:

- Train loss: `train/loss`, `training/loss`
- Validation loss: `val/loss`, `validation/loss`
- Train accuracy: `train/accuracy`, `training/accuracy`, `train/acc`
- Validation accuracy: `val/accuracy`, `validation/accuracy`, `val/acc`
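Each key tuple reads as an ordered list of fallbacks. Assuming the analyzers use the first key in each tuple that your run actually logged (the precedence rule is an assumption), the lookup works like the sketch below. In practice, logging under one of these names, such as `train/loss` and `val/loss`, should let the default configuration find your curves. `resolve_metric_key` and the role names are hypothetical.

```python
from typing import Optional, Sequence

# Fallback key tuples copied from the default phase metric keys above.
PHASE_METRIC_KEYS = {
    "train_loss": ("train/loss", "training/loss"),
    "validation_loss": ("val/loss", "validation/loss"),
    "train_accuracy": ("train/accuracy", "training/accuracy", "train/acc"),
    "validation_accuracy": ("val/accuracy", "validation/accuracy", "val/acc"),
}


def resolve_metric_key(candidates: Sequence[str], logged_keys: Sequence[str]) -> Optional[str]:
    """Return the first candidate key that was actually logged, or None."""
    logged = set(logged_keys)
    return next((key for key in candidates if key in logged), None)


logged = ["training/loss", "val/loss", "val/acc"]
resolved = {role: resolve_metric_key(keys, logged) for role, keys in PHASE_METRIC_KEYS.items()}
print(resolved)
# {'train_loss': 'training/loss', 'validation_loss': 'val/loss', 'train_accuracy': None, 'validation_accuracy': 'val/acc'}
```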
Default AnalyzerThresholds values:
```python
AnalyzerThresholds(
    exploding_gradient_warning_norm=10.0,
    exploding_gradient_error_norm=25.0,
    exploding_gradient_critical_norm=50.0,
    vanishing_gradient_norm=1e-4,
    vanishing_gradient_warning_ratio=0.5,
    vanishing_gradient_error_ratio=0.75,
    vanishing_gradient_critical_ratio=0.9,
    vanishing_gradient_min_samples=4,
    stagnant_layer_gradient_norm=1e-4,
    stagnant_layer_warning_probes=2,
    stagnant_layer_error_probes=4,
    stagnant_layer_critical_probes=6,
    activation_collapse_warning_dead_unit_ratio=0.8,
    activation_collapse_error_dead_unit_ratio=0.92,
    activation_collapse_critical_dead_unit_ratio=0.98,
    activation_collapse_warning_saturation_ratio=0.9,
    activation_collapse_error_saturation_ratio=0.97,
    activation_collapse_critical_saturation_ratio=0.995,
    activation_collapse_warning_std=0.05,
    activation_collapse_error_std=0.01,
    activation_collapse_critical_std=0.001,
    overfitting_warning_gap=0.2,
    overfitting_error_gap=0.4,
    overfitting_critical_gap=0.7,
    overfitting_min_points=3,
    runtime_warning_step_time_ms=250.0,
    runtime_error_step_time_ms=500.0,
    runtime_critical_step_time_ms=1000.0,
    runtime_warning_data_fraction=0.4,
    runtime_error_data_fraction=0.6,
    runtime_critical_data_fraction=0.75,
    runtime_warning_jitter_ratio=0.25,
    runtime_error_jitter_ratio=0.5,
    runtime_critical_jitter_ratio=0.75,
)
```
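Most rules in `AnalyzerThresholds` come in warning/error/critical triples. As a rough mental model, assuming each triple is compared inclusively from the most severe tier down, a reading maps to a severity as in the sketch below. The SDK's exact comparisons, and the reading of `runtime_*_data_fraction` as data time divided by step time, are assumptions; `tiered_severity` is a hypothetical helper.

```python
from typing import Optional


def tiered_severity(
    value: float, warning: float, error: float, critical: float, *, higher_is_worse: bool = True
) -> Optional[str]:
    """Map a value onto warning/error/critical tiers, or None if below all tiers.

    Hypothetical helper; assumes thresholds are inclusive and checked from the
    most severe tier down.
    """
    if not higher_is_worse:
        # Flip signs so that smaller values (for example activation std) rank as worse.
        value, warning, error, critical = -value, -warning, -error, -critical
    for severity, threshold in (("critical", critical), ("error", error), ("warning", warning)):
        if value >= threshold:
            return severity
    return None


# Exploding gradients: 10.0 / 25.0 / 50.0 on the gradient norm.
print(tiered_severity(30.0, 10.0, 25.0, 50.0))  # error
# Data-loading share of step time (assumed meaning): 0.4 / 0.6 / 0.75.
print(tiered_severity(120.0 / 200.0, 0.4, 0.6, 0.75))  # error
# Activation collapse by std, where lower is worse: 0.05 / 0.01 / 0.001.
print(tiered_severity(0.02, 0.05, 0.01, 0.001, higher_is_worse=False))  # warning
```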
Default PhaseAnalyzerThresholds values:
```python
PhaseAnalyzerThresholds(
    initialization_points=2,
    rapid_learning_min_relative_change=0.18,
    stagnation_max_relative_change=0.01,
    overfitting_min_gap=0.12,
    overfitting_min_train_relative_change=0.015,
    overfitting_min_validation_regression=0.015,
    divergence_min_relative_regression=0.15,
    divergence_from_best_ratio=0.25,
)
```
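The phase thresholds are expressed as relative changes rather than absolute loss values, so they apply across loss scales. The sketch below assumes relative change means `(current - previous) / abs(previous)` over a window of train loss and uses the `rapid_learning_min_relative_change` and `stagnation_max_relative_change` defaults. The helpers are hypothetical, and the returned labels are illustrative rather than the SDK's `phase_name` values.

```python
from typing import Sequence


def relative_change(previous: float, current: float) -> float:
    """Signed relative change; negative means the loss went down."""
    return (current - previous) / abs(previous) if previous else 0.0


def label_window(losses: Sequence[float], *, rapid: float = 0.18, stagnant: float = 0.01) -> str:
    """Label a loss window by its overall relative change (hypothetical rule)."""
    change = relative_change(losses[0], losses[-1])
    if change <= -rapid:
        return "rapid_learning"  # loss fell by at least 18%
    if abs(change) <= stagnant:
        return "stagnation"  # loss moved by at most 1% either way
    return "other"


print(label_window([2.0, 1.6, 1.3]))       # rapid_learning
print(label_window([0.80, 0.799, 0.797]))  # stagnation
print(label_window([1.0, 0.9]))            # other
```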
11. Practical Notes
- All major SDK modules accept a `TrainingSession` or a `RunContext` created by the Quark SDK.
- `MetricsLogger`, `EventLogger`, `SystemTelemetry`, `HookEngine`, `DataDiagnostics`, and `ArtifactStore` all support context-manager usage.
- Buffered modules flush automatically on `close()` and before most `read(...)` calls, but explicit context management is the safest pattern for long-running jobs.
- `strict_mode=True` on `TrainingSession.start_run(...)` is useful when you want instrumentation failures to fail fast instead of degrading gracefully.
- `AnalyzerEngine` persists alerts and health scores for UI surfaces such as top alerts; `TrainingSession` persists lifecycle activity.
- `session.run_id`, `session.run_path`, `session.project_name`, and `session.status` are the main lifecycle properties most projects need.
Download files
File details
Details for the file abiotic_quark-0.2.1.tar.gz.
File metadata
- Download URL: abiotic_quark-0.2.1.tar.gz
- Size: 137.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.1
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `4572dfdc54e1b0514e9653354c5747d91183ce8905cedecf5aed831e8b686481` |
| MD5 | `0160aee821b9ea1a51f4fb4c3d8bfb9d` |
| BLAKE2b-256 | `62ff604a2686f3d23e970947ca8d0350395d7463d96e885c354d3ea363f2cc98` |
File details
Details for the file abiotic_quark-0.2.1-py3-none-any.whl.
File metadata
- Download URL: abiotic_quark-0.2.1-py3-none-any.whl
- Size: 103.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.1
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `91ad14e5fde0dd32662a3fc8cd5fe992834608aa0fca5a83d01e3a16c33986d1` |
| MD5 | `e4cda70789f60448105569e42d2c59d7` |
| BLAKE2b-256 | `de7bc36b1571b19240f725c381e2c6517ebfc0acdd7eff95d35335e56e13ea24` |