Cardiac Shared

Shared utilities for cardiac imaging analysis projects.
Version: 0.7.0 | PyPI: https://pypi.org/project/cardiac-shared/
Installation
# Install from PyPI (recommended)
pip install cardiac-shared
# Install with optional dependencies
pip install cardiac-shared[all] # All optional deps
pip install cardiac-shared[dicom] # DICOM support
pip install cardiac-shared[nifti] # NIfTI support
pip install cardiac-shared[gpu] # GPU/PyTorch support
Modules
IO Module
| Function |
Description |
read_dicom_series(path) |
Read DICOM series from directory |
get_dicom_metadata(ds) |
Extract metadata from DICOM dataset |
load_nifti(path) |
Load NIfTI file with metadata |
save_nifti(volume, path) |
Save numpy array as NIfTI |
extract_zip(path) |
Context manager for ZIP extraction |
find_dicom_root(path) |
Find DICOM directory in extracted ZIP |
AsyncNiftiPreloader |
Background NIfTI preloading (v0.5.1) |
preload_nifti_batch() |
Batch preloading convenience function (v0.5.1) |
Hardware Module
| Function |
Description |
detect_hardware() |
Detect complete hardware info (GPU/CPU/RAM) |
HardwareInfo |
Dataclass with GPU, CPU, RAM, environment info |
print_hardware_summary(hw) |
Print formatted hardware summary |
get_optimal_config(hw) |
Get optimal inference configuration |
CPUOptimizer |
CPU optimization for hospital deployments |
get_recommended_gpu_stabilization_time() |
Dynamic GPU wait time (v0.5.1) |
get_gpu_performance_tier() |
GPU tier classification (v0.5.1) |
Environment Module
| Function |
Description |
detect_runtime() |
Detect runtime environment |
RuntimeEnvironment |
Dataclass with environment info |
detect_colab() |
Check if running in Google Colab |
detect_wsl() |
Check if running in WSL |
Parallel Module (v0.3.0)
| Class/Function |
Description |
ParallelProcessor |
Unified parallel processing framework |
parallel_map() |
Quick parallel map without checkpoint |
parallel_map_with_checkpoint() |
Parallel map with resume support |
ProcessingResult |
Result dataclass for each processed item |
Checkpoint |
Checkpoint data for resume capability |
Progress Module (v0.3.0)
| Class/Function |
Description |
ProgressTracker |
Multi-level progress visualization |
create_tracker() |
Create and start a progress tracker |
ProgressLevel |
Progress tracking for a single level |
Cache Module (v0.3.0)
| Class |
Description |
CacheManager |
Multi-level caching with resume capability |
Batch Module (v0.3.0)
| Class |
Description |
BatchProcessor |
Generic batch processing framework |
BatchConfig |
Batch processing configuration |
Config Module (v0.3.0)
| Class/Function |
Description |
ConfigManager |
YAML/JSON configuration management |
load_config() |
Load configuration with defaults |
Data Module (v0.4.0)
| Class/Function |
Description |
IntermediateResultsRegistry |
Cross-project data discovery and sharing |
RegistryEntry |
Dataclass for registry entries |
get_registry() |
Get singleton registry instance |
Data Sources Module (v0.5.0)
| Class/Function |
Description |
DataSourceManager |
Multi-source data management (ZAL/CHD/Normal/Custom) |
DataSource |
Configuration for a single data source |
DataSourceStatus |
Status check result for a data source |
get_source() |
Get data source from default manager |
list_sources() |
List all configured data sources |
Vertebra Module (v0.5.0)
| Class/Function |
Description |
VertebraDetector |
Detect and analyze vertebrae from TotalSegmentator output |
VertebraInfo |
Vertebra metadata (center slice, volume, etc.) |
VertebraROI |
Region of interest around a vertebra |
parse_vertebrae() |
Parse vertebra names from labels directory |
sort_vertebrae() |
Sort vertebrae cranial to caudal |
Tissue Module (v0.5.0)
| Class/Function |
Description |
TissueClassifier |
Tissue-specific HU filtering (Alberta Protocol 2024) |
TissueMetrics |
Metrics for a tissue type (area, HU, quality) |
FilterStats |
Statistics from HU filtering |
filter_tissue() |
Filter tissue mask by HU range |
get_tissue_hu_range() |
Get HU range for tissue type |
TISSUE_HU_RANGES |
Standard HU ranges for all tissue types |
Batch Management Module (v0.6.0)
| Class/Function |
Description |
BatchManager |
Create and manage batch manifests |
BatchManifest |
Track patients, status, and consumers |
PatientEntry |
Individual patient processing record |
ConsumerRecord |
Module that consumed a batch |
create_nifti_batch() |
Create NIfTI batch manifest |
load_batch() |
Load existing batch manifest |
Batch Discovery Module (v0.6.4)
| Class/Function |
Description |
BatchDiscovery |
Discover and select from multiple processing batches |
BatchInfo |
Information about a discovered batch |
PatientBatchRecord |
Patient record within a batch |
discover_batches() |
Convenience function to discover batches |
list_batches() |
List all discovered batch IDs |
find_patient() |
Find a patient across all batches |
select_latest_batch() |
Select the latest batch matching criteria |
get_patient_coverage() |
Check coverage of patients across batches |
Dataset Registry Module (v0.7.0)
| Class/Function |
Description |
DatasetRegistry |
Unified registry for dataset definitions |
Dataset |
Dataset definition with patient count and metadata |
DatasetStatus |
Processing status (PLANNED, COMPLETED, VALIDATED) |
DatasetCategory |
Category (INTERNAL, EXTERNAL, FUTURE) |
get_dataset_registry() |
Get singleton registry instance |
get_dataset(id) |
Get dataset by ID (e.g., 'internal.chd') |
get_patient_count(id) |
Get patient count for a dataset |
list_datasets(category) |
List datasets, optionally by category |
print_dataset_summary() |
Print formatted dataset summary |
Authoritative Patient Counts:
- Internal: CHD 489 + Normal 277 = 766 patients
- NLST: Batch1-4 (108+92+402+255) = 857 patients
- COCA: Gated 444 + Nongated 213 = 657 patients
- TotalSegmentator: 1,228 patients (planned)
Preprocessing Module (v0.6.0)
| Class/Function |
Description |
DicomConverter |
Unified DICOM to NIfTI conversion |
ConversionResult |
Conversion result details |
convert_dicom_to_nifti() |
Simple conversion function |
SharedPreprocessingPipeline |
Multi-module preprocessing |
PreprocessingConfig |
Pipeline configuration |
PreprocessingResult |
Preprocessing result details |
create_pipeline() |
Create configured pipeline |
Usage Examples
Hardware Detection
from cardiac_shared import detect_hardware, detect_runtime
hw = detect_hardware()
print(f"GPU: {hw.gpu.device_name if hw.gpu.available else 'None'}")
print(f"CPU Cores: {hw.cpu.physical_cores}")
print(f"RAM: {hw.ram.total_gb:.1f} GB")
env = detect_runtime()
print(f"Runtime: {env.runtime_type}") # wsl, linux, windows, colab
Parallel Processing with Checkpoint
from cardiac_shared.parallel import ParallelProcessor
def process_patient(patient_id):
# Your processing logic
return {"id": patient_id, "status": "done"}
processor = ParallelProcessor(
max_workers=4,
checkpoint_file="results/checkpoint.json"
)
results = processor.map_with_checkpoint(
process_patient,
patient_list,
desc="Processing patients"
)
processor.print_summary(results)
Progress Tracking
from cardiac_shared.progress import ProgressTracker
tracker = ProgressTracker()
tracker.start_overall("Processing Pipeline", total=100)
for i, item in enumerate(items):
tracker.start_step(f"Step {i+1}", total=3)
tracker.update_substep("Loading data")
# ... load data
tracker.update_step_progress()
tracker.update_substep("Processing")
# ... process
tracker.update_step_progress()
tracker.complete_step()
tracker.update_overall(i + 1)
tracker.finish()
Cache Management
from cardiac_shared.cache import CacheManager
cache = CacheManager("results/cache.json")
for patient_id in patient_list:
if cache.is_completed(patient_id):
continue # Skip already processed
result = process_patient(patient_id)
cache.mark_completed(patient_id, result)
Configuration Management
from cardiac_shared.config import ConfigManager
config = ConfigManager("config/settings.yaml")
db_host = config.get("database.host", default="localhost")
config.set("processing.batch_size", 32)
config.save()
Intermediate Results Registry (v0.4.0)
from cardiac_shared.data import get_registry
# Get singleton registry instance
registry = get_registry()
# Check if TotalSegmentator results are available
if registry.exists('segmentation.totalsegmentator_organs.chd_v2'):
organs_path = registry.get_path('segmentation.totalsegmentator_organs.chd_v2')
heart_mask = organs_path / patient_id / 'heart.nii.gz'
# Get metadata
meta = registry.get_metadata('body_composition.vbca_stage1_labels.zal_v3.2')
print(f"Patient count: {meta.get('patient_count')}")
# List available results
available = registry.list_available('segmentation')
# Get usage suggestion for a project
suggestion = registry.suggest_input('pcfa', 'heart_masks', 'chd')
Data Source Management (v0.5.0)
from cardiac_shared.data_sources import DataSourceManager
# Load from project config
manager = DataSourceManager('/path/to/data_sources.yaml')
# Or use project auto-discovery
manager = DataSourceManager.from_project('vbca')
# Get data source
source = manager.get_source('zal')
print(f"Input: {source.input_dir}")
print(f"Files: {source.file_count()}")
# Get input files
for file in source.get_files(limit=10):
process(file)
# Check all sources status
manager.print_status()
Vertebra Detection (v0.5.0)
from cardiac_shared.vertebra import VertebraDetector, parse_vertebrae
import numpy as np
# Simple parsing
vertebrae = parse_vertebrae('/path/to/labels')
print(f"Found: {vertebrae}") # ['T10', 'T11', 'T12', 'L1']
# Full analysis
detector = VertebraDetector()
vertebrae_info = detector.find_vertebrae('/path/to/labels')
for v in vertebrae_info:
print(f"{v.name}: center slice {v.center_slice}")
# Get center slice from mask
mask = np.load('vertebrae_T12.npy')
center = detector.get_center_slice(mask)
Tissue Classification (v0.5.0)
from cardiac_shared.tissue import TissueClassifier, filter_tissue, TISSUE_HU_RANGES
import numpy as np
# Check HU ranges
print(TISSUE_HU_RANGES['skeletal_muscle']['range']) # (-29, 150)
# Filter tissue by HU
filtered_mask, stats = filter_tissue(ct_array, mask, 'skeletal_muscle')
print(f"Retention: {stats.retention_pct:.1f}%")
# Full metrics calculation
classifier = TissueClassifier()
metrics = classifier.calculate_metrics(
ct_array, mask, 'skeletal_muscle',
spacing=(1.0, 0.5, 0.5),
slice_idx=50 # Single slice
)
print(f"Area: {metrics.area_cm2:.1f} cm^2")
print(f"Mean HU: {metrics.mean_hu:.1f}")
print(f"Quality: {metrics.quality_grade}")
Batch Management (v0.6.0)
from cardiac_shared.data import BatchManager, create_nifti_batch
# Create batch manager
manager = BatchManager(output_dir="/data/nifti")
# Create batch for NIfTI conversion
manifest = manager.create_batch(
dataset_id="internal_chd_v1",
source_path="/data/dicom/chd",
provider="Dr. Chen",
)
# Check for existing conversion (deduplication)
existing = manager.find_existing_nifti("10022887", "internal_chd_v1")
if not existing:
# Process and register
manager.register_patient(
dataset_id="internal_chd_v1",
patient_id="10022887",
status="success",
output_file="10022887.nii.gz",
dimensions=[512, 512, 256]
)
# Track consumer modules
manager.register_consumer("internal_chd_v1", "pcfa", "pcfa_run_20260103")
Batch Discovery (v0.6.4)
from cardiac_shared.data import BatchDiscovery
# Discover all TotalSegmentator batches
discovery = BatchDiscovery("/data/totalsegmentator")
# List all available batches
for batch_id in discovery.list_batches(prefix="organs_chd"):
info = discovery.get_batch_info(batch_id)
print(f"{batch_id}: {info['total_patients']} patients, created {info['created_at']}")
# Select the latest batch for CHD
batch = discovery.select_latest_batch(prefix="organs_chd", require_success_count=100)
print(f"Selected: {batch.batch_id}")
# Find a patient across all batches
records = discovery.find_patient("10022887")
if records:
latest = records[0] # Most recent
heart_mask = latest.patient_path / "heart.nii.gz"
print(f"Found in {latest.batch_id}: {heart_mask}")
# Check coverage for a patient list
coverage = discovery.get_patient_coverage(patient_ids, batch_prefix="organs_chd")
print(f"Coverage: {coverage['coverage_rate']:.1f}% ({coverage['covered']}/{coverage['total']})")
Preprocessing Pipeline (v0.6.0)
from cardiac_shared.preprocessing import SharedPreprocessingPipeline, create_pipeline
# Create pipeline
pipeline = create_pipeline(
nifti_root="/data/nifti",
segmentation_root="/data/totalsegmentator",
totalsegmentator_fast=True,
)
# Ensure NIfTI exists (converts if needed)
result = pipeline.ensure_nifti("10022887", "internal_chd_v1", dicom_path)
print(f"NIfTI: {result.output_path}")
# Ensure TotalSegmentator results exist
result = pipeline.ensure_totalsegmentator("10022887", "internal_chd_v1")
print(f"Segmentation: {result.output_path}")
# Get masks for specific module (PCFA needs heart)
masks = pipeline.get_module_masks("10022887", "internal_chd_v1", "pcfa")
heart_mask = masks["heart"]
# Validate masks for a module
valid, missing = pipeline.validate_for_module("10022887", "internal_chd_v1", "pvat")
if not valid:
print(f"Missing masks: {missing}")
Projects Using This Package
- vbca - Vertebral Body Composition Analysis
- cardiac-ml-research - Main research project
- pcfa - Pericardial Fat Analysis
- ai-cac-research - CAC scoring research
Changelog
See CHANGELOG.md for full version history.
v0.6.4 (2026-01-04)
- NEW:
BatchDiscovery module for dynamic batch discovery and selection
BatchDiscovery: Scan directories and discover all batches with manifest.json
list_batches(): List all discovered batch IDs with filtering
find_patient(): Find a patient across all batches
select_latest_batch(): Select the latest batch matching criteria
get_patient_coverage(): Check how many patients are covered by existing batches
add_batch_directory(): Manually add batch even without manifest
- Use case: When running PCFA after VBCA, select specific batch version to use
v0.6.3 (2026-01-04)
- NEW: Registry-based auto-discovery in SharedPreprocessingPipeline
find_existing_segmentation(): Search registry/fallback for existing TotalSeg outputs
get_reuse_summary(): Analyze reuse potential for batch processing
ensure_totalsegmentator(): Now auto-checks registry before running TotalSegmentator
- New config options:
use_registry, registry_config_path, fallback_segmentation_paths
- Impact: PCFA can now automatically reuse VBCA's TotalSegmentator outputs
- Time savings: ~80s per patient when reusing existing segmentation
v0.6.2 (2026-01-04)
- Added
totalsegmentator_roi_subset parameter to PreprocessingConfig
- Enables TotalSegmentator
--roi_subset for single-organ segmentation
- Performance: 1.5-2x speedup for single-organ tasks (68s -> 43s on RTX 2060)
- PCFA results consistent (<0.5% difference vs full segmentation)
- 40-case validation test: 97.5% success rate
v0.6.1 (2026-01-03)
- Fix: Auto-detect TotalSegmentator executable path
- SharedPreprocessingPipeline now finds TotalSegmentator in Python env or PATH
- Added
totalsegmentator_path config option for custom paths
v0.6.0 (2026-01-03)
- Added
data/batch_manager.py (BatchManager, BatchManifest for batch tracking)
- Added
preprocessing/dicom_converter.py (DicomConverter for unified DICOM->NIfTI)
- Added
preprocessing/pipeline.py (SharedPreprocessingPipeline for multi-module preprocessing)
- Manifest-based batch tracking with consumer lineage
- Automatic deduplication for NIfTI conversion
- Module-specific mask requirements (PCFA, PVAT, VBCA, Chamber)
- 40 new unit tests (100% pass)
v0.5.1 (2026-01-03)
- Added
hardware/gpu_utils.py (GPU stabilization time optimization)
- Added
io/preloader.py (AsyncNiftiPreloader for background preloading)
- ~5-10% speedup for TotalSegmentator pipelines
- 38 new unit tests
v0.5.0 (2026-01-03)
- Added
data_sources module (DataSourceManager for ZAL/CHD/Normal/Custom)
- Added
vertebra module (VertebraDetector, ROI calculation)
- Added
tissue module (TissueClassifier, Alberta Protocol 2024 HU ranges)
- 44 new unit tests (100% pass)
v0.4.0 (2026-01-02)
- Added
data module (IntermediateResultsRegistry)
- Cross-project intermediate results discovery and sharing
- Automatic Windows/WSL path conversion
- Usage pattern suggestions per project
v0.3.0 (2026-01-02)
- Added
parallel module (ParallelProcessor, checkpoint/resume)
- Added
progress module (ProgressTracker, multi-level)
- Added
cache module (CacheManager)
- Added
batch module (BatchProcessor)
- Added
config module (ConfigManager)
- Published to PyPI
v0.2.0 (2026-01-02)
- Added
hardware module (detector, cpu_optimizer)
- Added
environment module (runtime_detector)
v0.1.0 (2025-12-01)
- Initial release with IO modules
License
MIT License - see LICENSE for details.