Process-isolated plugin system with resource-aware scheduling, configuration validation, and lifecycle management for extensible Python applications.

cjm-plugin-system

Install

pip install cjm_plugin_system

Project Structure

nbs/
├── core/ (15)
│   ├── config.ipynb           # Project-level configuration for paths, runtime settings, and environment management
│   ├── config_store.ipynb     # Persistent storage for per-plugin configuration (with enabled flag)
│   ├── empirical_store.ipynb  # Persistent store for empirically-observed resource usage per (instance_id, config_hash) pair. CR-7's data foundation — `record_sample` is called from `PluginManager.execute_plugin*` finally blocks; aggregates feed eviction-candidate selection + future UI hints + cost-aware retry decisions.
│   ├── errors.ipynb           # Typed exception hierarchy + JobError dataclass + default classification of bare Python exceptions. The substrate's CR-5 implementation per the 2026-05-19 substrate audit.
│   ├── interface.ipynb        # Abstract base class defining the generic plugin interface
│   ├── manager.ipynb          # Plugin discovery, loading, and lifecycle management system
│   ├── manifest_format.ipynb  # Typed parser + writer for the nested v2.0 manifest layout per the 2026-05-19 substrate audit's CR-8. Substrate manifests transitioned from a flat top-level JSON object to a four-section nested layout: `install` (deployment-specific facts populated at install time), `code` (code-derived facts refreshed by `cjm-ctl regenerate-manifest`), `drift_tracking` (a config_schema hash that records the witness shape so live-vs-stored comparisons can detect drift), and `overrides` (an operator-supplied overlay placeholder).
│   ├── metadata.ipynb         # Data structures for plugin metadata
│   ├── platform.ipynb         # Cross-platform utilities for process management, path handling, and system detection
│   ├── proxy.ipynb            # Bridge between Host application and isolated Worker processes
│   ├── queue.ipynb            # Resource-aware job queue for sequential plugin execution with cancellation support
│   ├── scheduling.ipynb       # Resource scheduling policies for plugin execution
│   ├── secret_store.ipynb     # CR-12: project-local secret storage for API-based plugins (file-backed, 0600)
│   ├── telemetry.ipynb        # Shared GPU/CPU attribution helpers used by both `JobQueue._sample_resource_snapshot` (CR-6 Stage 3) and `PluginManager._record_sample_safe` (CR-7).
│   └── worker.ipynb           # FastAPI server that runs inside isolated plugin environments
├── utils/ (2)
│   ├── hashing.ipynb     # Shared cryptographic hashing primitives for content integrity verification
│   └── validation.ipynb  # Validation helpers for plugin configuration dataclasses
├── bootstrap.ipynb  # One-call factory that assembles a PluginManager + JobQueue + plugin bindings — closes the demo-app boilerplate duplication audited across 5 substrate consumers.
└── cli.ipynb        # CLI tool for declarative plugin management

Total: 19 notebooks across 2 directories

Module Dependencies

graph LR
    bootstrap["bootstrap<br/>Bootstrap"]
    cli["cli<br/>cli"]
    core_config["core.config<br/>Configuration"]
    core_config_store["core.config_store<br/>Plugin Config Store"]
    core_empirical_store["core.empirical_store<br/>Empirical Resource Tracking"]
    core_errors["core.errors<br/>Plugin Error Taxonomy"]
    core_interface["core.interface<br/>Plugin Interface"]
    core_manager["core.manager<br/>Plugin Manager"]
    core_manifest_format["core.manifest_format<br/>Manifest Format (v2.0)"]
    core_metadata["core.metadata<br/>Plugin Metadata"]
    core_platform["core.platform<br/>Platform Utilities"]
    core_proxy["core.proxy<br/>Remote Plugin Proxy"]
    core_queue["core.queue<br/>Job Queue"]
    core_scheduling["core.scheduling<br/>Scheduling"]
    core_secret_store["core.secret_store<br/>Plugin Secret Store"]
    core__telemetry["core._telemetry<br/>Substrate Telemetry Helpers"]
    core_worker["core.worker<br/>Universal Worker"]
    utils_hashing["utils.hashing<br/>Content Hashing Utilities"]
    utils_validation["utils.validation<br/>Configuration Validation"]

    bootstrap --> core_manager
    bootstrap --> core_scheduling
    bootstrap --> core_queue
    cli --> core_platform
    cli --> core_config
    cli --> core_manifest_format
    cli --> core_metadata
    core_empirical_store --> utils_hashing
    core_interface --> core_errors
    core_manager --> core_empirical_store
    core_manager --> core_errors
    core_manager --> core_scheduling
    core_manager --> core_metadata
    core_manager --> core_proxy
    core_manager --> core_manifest_format
    core_manager --> core_config
    core_manager --> core_interface
    core_manager --> core_secret_store
    core_manager --> core_config_store
    core_manager --> core__telemetry
    core_manager --> utils_validation
    core_manifest_format --> core_metadata
    core_manifest_format --> utils_hashing
    core_platform --> core_config
    core_proxy --> core_errors
    core_proxy --> core_platform
    core_proxy --> core_config
    core_proxy --> core_interface
    core_queue --> core_errors
    core_queue --> core__telemetry
    core_scheduling --> core_metadata
    core_worker --> core_errors
    core_worker --> core_platform
    utils_validation --> core_errors

34 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Bootstrap (bootstrap.ipynb)

One-call factory that assembles a PluginManager + JobQueue + plugin bindings — closes the demo-app boilerplate duplication audited across 5 substrate consumers.

Import

from cjm_plugin_system.bootstrap import (
    PluginSpec,
    Pipeline,
    create_pipeline
)

Functions

def _normalize_spec(
    spec: PluginSpec  # Raw spec from caller
) -> Tuple[str, Optional[Dict[str, Any]]]:  # (plugin_name, optional config)
    """
    Normalize a plugin spec into a `(name, config)` pair.
    
    Accepts a bare string, a `(name, config)` tuple, or a mapping with a 'name'
    key (config under 'config' or the mapping itself minus 'name').
    """
def create_pipeline(
    plugins: Optional[Iterable[PluginSpec]] = None,  # Plugins to discover + load
    scheduler: Optional[ResourceScheduler] = None,  # Resource policy (default: permissive)
    system_monitor: Optional[str] = None,  # Plugin name to register as system monitor
    search_paths: Optional[List[Path]] = None,  # Custom manifest search paths
    queue_kwargs: Optional[Dict[str, Any]] = None,  # Extra kwargs forwarded to JobQueue
    strict: bool = True,  # SG-5 strict config validation on each load
) -> Pipeline:  # Assembled stack ready to start
    """
    Assemble a PluginManager + JobQueue + plugin bindings in one call.
    
    Steps performed:
      1. Construct PluginManager with the given scheduler + search paths
      2. discover_manifests()
      3. For each spec in `plugins`: load the plugin and create a PluginBinding
      4. If `system_monitor` is set, register that plugin as the sys-mon
      5. Construct JobQueue (NOT started — caller starts via context manager)
    
    Plugins that fail to load are logged but do not raise; their entries are
    omitted from `Pipeline.bindings`. Use the returned `Pipeline.manager` to
    inspect which loads succeeded.
    """

Classes

@dataclass
class Pipeline:
    "Assembled substrate stack: manager + queue + plugin bindings."
    
    manager: PluginManager  # Discovery + lifecycle
    queue: JobQueue  # Job submission + scheduling
    bindings: Dict[str, PluginBinding] = field(...)  # Plugin name -> bound view
    
    async def start(self) -> None
        "Start the job queue's background processor."
    
    async def stop(self) -> None
        "Stop the job queue and unload all plugins."

cli (cli.ipynb)

CLI tool for declarative plugin management

Import

from cjm_plugin_system.cli import (
    app,
    main,
    setup_runtime,
    run_cmd,
    regenerate_manifest,
    install_all,
    setup_host,
    estimate_size,
    list_plugins,
    remove_plugin,
    validate_file,
    set_secret,
    list_secrets
)

Functions

def main(
    ctx:typer.Context,
    cjm_config:Annotated[Optional[Path], typer.Option(
        "--cjm-config",
        help="Path to cjm.yaml configuration file"
    )]=None,
    data_dir:Annotated[Optional[Path], typer.Option(
        "--data-dir",
        help="Override data directory (manifests, logs)"
    )]=None,
    conda_prefix:Annotated[Optional[Path], typer.Option(
        "--conda-prefix",
        help="Override conda/mamba prefix path"
    )]=None,
    conda_type:Annotated[Optional[str], typer.Option(
        "--conda-type",
        help="Conda implementation: micromamba, miniforge, or conda"
    )]=None,
) -> None
    "CJM Plugin System CLI for managing isolated plugin environments."
def setup_runtime(
    force:bool=typer.Option(False, "--force", "-f", help="Re-download even if binary exists")
) -> None
    "Download and setup micromamba runtime for project-local mode."
def _check_runtime_available() -> None
    "Check if the configured conda runtime is available, exit with helpful message if not."
def _get_conda_cmd_str() -> str
    "Get the conda/micromamba command string for shell commands."
def _download_url_to_temp(
    url: str,  # URL to download
    suffix: str = ".yml"  # File suffix for temp file
) -> Optional[Path]:  # Path to temp file or None if failed
    "Download a URL to a temporary file. Returns None if download fails."
def _resolve_env_file(
    env_file: str  # Path or URL to environment file
) -> tuple[str, Optional[Path]]:  # (resolved_path, temp_file_to_cleanup)
    """
    Resolve env_file to a local path, downloading if it's a URL.
    
    Returns (local_path, temp_file) where temp_file is set if we created
    a temporary file that should be cleaned up later.
    """
def run_cmd(
    cmd: str,  # Shell command to execute
    check: bool = True  # Whether to raise on non-zero exit
) -> None
    """
    Run a shell command and stream output.
    
    Uses the platform's default shell (no hardcoded /bin/bash).
    """
def _generate_manifest(
    env_name: str,  # Name of the Conda environment
    package_name: str,  # Package source string (git URL or package name)
    manifest_dir: Path  # Directory to write manifest JSON files
) -> Optional[Path]:  # Path to the manifest written, or None on failure
    """
    Run introspection script inside the target env and write a v2.0 manifest.
    
    Builds a `ManifestV2` from the introspection output + install-time markers,
    computes `drift_tracking.config_schema_hash`, and writes via
    `write_manifest` (nested layout, indent=2). Both `installed_at` and
    `regenerated_at` are set to "now"; `regenerate_manifest` preserves the
    original `installed_at` via post-write fix-up.
    """
def regenerate_manifest(
    plugin_name: str = typer.Argument(..., help="Plugin name as it appears in the manifest"),
    plugins_path: Optional[str] = typer.Option(
        None, "--plugins",
        help="Path to plugins.yaml for package_source recovery (legacy manifests)",
    ),
    package: Optional[str] = typer.Option(
        None, "--package",
        help="Package spec override (e.g., git URL or pip name); wins over manifest/plugins.yaml lookups",
    ),
) -> None
    """
    Re-run introspection for an installed plugin and rewrite its manifest.
    
    Reads the existing manifest via `load_manifest` (handles both v2.0 nested
    + legacy v1.0 flat layouts), recovers `env_name` + `package_source` from
    the install section, runs `_generate_manifest` to refresh the code section,
    then post-writes to preserve the original `installed_at` so the regenerate
    only updates `regenerated_at` semantically. Always emits v2.0 layout —
    regenerating a v1.0 manifest transparently upgrades it.
    """
def _conda_env_exists_configured(
    env_name: str  # Name of the conda environment
) -> bool:  # True if environment exists
    "Check if conda environment exists using configured conda command."
def install_all(
    plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
    force:bool=typer.Option(False, help="Force recreation of environments")
) -> None
    "Install and register all plugins defined in plugins.yaml."
def setup_host(
    plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
    yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
    "Install interface libraries in the current Python environment."
def _format_size(
    size_bytes: int  # Size in bytes
) -> str:  # Human-readable size string
    "Format bytes as human-readable string."
def _get_pypi_size(
    package_spec: str  # Package name or git URL
) -> tuple[int, str]:  # (size_bytes, package_name)
    """
    Query PyPI for package download size.
    
    SG-37: PyPI normalizes package names to dash-form per PEP 503, so we try
    the dash form first and fall back to the underscore form. Without this,
    cjm-* packages (whose repo names contain dashes) silently 404 because
    the old heuristic only tried the underscored form.
    """
def _estimate_conda_size(
    env_file: str,  # Path or URL to environment.yml
    env_name: str  # Target environment name
) -> tuple[int, int]:  # (total_bytes, package_count)
    "Estimate conda package sizes using dry-run."
def _estimate_pip_sizes(
    packages: list[str]  # List of pip package specs
) -> tuple[int, int, list[tuple[str, int]]]:  # (total_bytes, found_count, [(name, size), ...])
    "Estimate pip package sizes from PyPI."
def estimate_size(
    plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
    plugin_name:Optional[str]=typer.Option(None, "--plugin", "-p", help="Estimate for a single plugin"),
    verbose:bool=typer.Option(False, "--verbose", "-v", help="Show per-package breakdown")
) -> None
    "Estimate disk space required for plugin environments."
def _get_conda_envs() -> set[str]: # Set of existing conda environment names
    "Get list of existing conda environment names using configured conda command."
def _v2_to_legacy_flat_view(
    raw: Dict[str, Any]  # Manifest JSON dict as read from disk
) -> Dict[str, Any]:  # Flat-shaped dict (install + code merged at top level)
    """
    REMOVE-AFTER-OVERHAUL: produce a legacy flat-shaped view of a manifest.
    
    Consumer code in `list_plugins` + `remove_plugin` still reads manifests as
    flat dicts (`manifest.get('python_path')`, etc.). When the on-disk manifest
    is v2.0 nested, we flatten install + code sections to the top level so
    those consumers don't need migration in the same PR. The shim retires when
    consumers migrate to `load_manifest` for typed access.
    
    v1.0 manifests pass through unchanged.
    """
def _get_installed_manifests(
    manifest_dir:Optional[Path]=None # Directory to scan (uses config default if None)
) -> list[dict]: # List of manifest dictionaries
    """
    Load all manifest JSON files from the manifest directory.
    
    Returns flat-shaped dicts regardless of on-disk format (v2.0 manifests are
    flattened via `_v2_to_legacy_flat_view`). Consumers that want typed access
    should use `load_manifest` from `cjm_plugin_system.core.manifest_format`
    instead.
    """
def _extract_env_from_python_path(
    python_path:str # Path like /home/user/miniforge3/envs/my-env/bin/python
) -> str: # Extracted environment name or empty string
    "Extract conda environment name from python_path."
def list_plugins(
    plugins_path:Optional[str]=typer.Option(None, "--plugins", help="Path to plugins.yaml for cross-reference"),
    show_envs:bool=typer.Option(False, "--envs", "-e", help="Show conda environment status")
) -> None
    "List installed plugins from manifest directory."
def remove_plugin(
    plugin_name:str=typer.Argument(..., help="Name of the plugin to remove"),
    plugins_path:Optional[str]=typer.Option(None, "--plugins", help="Path to plugins.yaml for env name lookup"),
    keep_env:bool=typer.Option(False, "--keep-env", help="Keep the conda environment, only remove manifest"),
    yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
    "Remove a plugin's manifest and conda environment."
def _validate_taxonomy_block(
    tax: Any,  # taxonomy sub-dict (may be None or non-dict; we type-check here)
    top_level_interface: str,  # `interface` field for cross-check
    path_prefix: str,  # Error message prefix (e.g., "manifest" or "manifest: code")
) -> List[str]
    """
    Type-check the taxonomy block + cross-check interface_fqcn vs top-level interface.
    
    Shared between v1.0 (flat) and v2.0 (nested) validators. `path_prefix` is
    used to disambiguate error messages between layouts.
    """
def _validate_resources_block(
    res: Any,  # resources sub-dict (may be None or non-dict; we type-check here)
    path_prefix: str,  # Error message prefix
) -> List[str]
    "Phase 5a: type-check the resources block. Shared between v1.0 and v2.0."
def _validate_manifest_v2_dict(
    data: Dict[str, Any]  # v2.0 nested manifest dict (caller already verified format_version)
) -> List[str]:  # Empty list = valid
    """
    CR-8: validate the nested v2.0 manifest layout.
    
    Required sections: `install` + `code`. Optional sections: `drift_tracking`
    (substrate emits it on every fresh write but legacy-via-load_manifest
    upgrades leave it empty until the first regenerate); `overrides` (free-form
    operator overlay).
    
    Required `code.*` fields mirror v1.0's required top-level fields. Required
    `install.python_path` mirrors v1.0's required top-level `python_path`.
    """
def _validate_manifest_v1_dict(
    data: Dict[str, Any]  # Legacy flat manifest dict (no format_version)
) -> List[str]:  # Empty list = valid
    """
    REMOVE-AFTER-OVERHAUL: validate the legacy v1.0 flat manifest layout.
    
    Retires when `cascade_manifests.py` rewrites every production manifest to
    v2.0. Keeps the SG-6 validator working against pre-CR-8 manifests during
    the transition window.
    """
def _validate_manifest_dict(
    data: Any  # Loaded manifest JSON
) -> List[str]:  # List of human-readable error messages (empty == valid)
    """
    SG-6 + CR-8: structural validation, dispatching on `format_version`.
    
    `format_version == "2.0"` validates the nested v2.0 layout.
    Absent `format_version` validates the legacy flat v1.0 layout
    (REMOVE-AFTER-OVERHAUL — retires after cascade_manifests.py).
    Any other value rejects with a single error so unknown future formats
    fail loud rather than silently degrading.
    """
def _validate_plugins_yaml_dict(
    data: Any  # Loaded plugins.yaml content
) -> List[str]:  # List of human-readable error messages (empty == valid)
    """
    Structural validation of a plugins.yaml file.
    
    Each plugin entry must have name + env_name + package, plus either env_file
    or python_version (one defines how the conda env is created).
    """
def _detect_manifest_format(
    path: Path  # File to inspect
) -> Optional[str]:  # 'manifest' | 'plugins_yaml' | None
    "Auto-detect file format from extension."
def validate_file(
    path:Path=typer.Argument(..., help="Manifest JSON or plugins.yaml to validate"),
    format:Optional[str]=typer.Option(
        None, "--format", "-f",
        help="Override format detection: 'manifest' or 'plugins_yaml'",
    ),
) -> None
    """
    SG-6: validate a manifest JSON or plugins.yaml file's structure.
    
    Auto-detects format from the file extension (`.json` → manifest,
    `.yaml`/`.yml` → plugins.yaml). Exits non-zero with a list of validation
    errors if any check fails.
    """
def _open_secret_store()
    "Open the project-local LocalSecretStore at <data_dir>/secrets (CR-12)."
def set_secret(
    plugin_name: str = typer.Argument(..., help="Plugin name (manifest 'name', e.g. cjm-transcription-plugin-gemini)"),
    key: str = typer.Argument(..., help="Secret key = the env-var name the worker reads (e.g. GEMINI_API_KEY)"),
    value: Optional[str] = typer.Option(None, "--value", help="Secret value (omit to be prompted with hidden input)"),
    scope: Optional[str] = typer.Option(None, "--scope", help="Reserved multi-user scope (default: single-user)"),
)
    """
    Store a plugin secret in the project-local SecretStore (CR-12).
    
    The value is written to <data_dir>/secrets/secrets.json (0600) — never to
    plugins.yaml, manifests, or the config store. Plugins read it from their
    worker env at spawn. Omit --value to be prompted (hidden input) so the
    secret stays out of shell history. After setting, reload the plugin (or
    restart the host) so its worker respawns with the new env — the GUI /
    PluginManager.set_plugin_secret do this automatically.
    """
def list_secrets(
    plugin_name: str = typer.Argument(..., help="Plugin name to list secret KEY NAMES for"),
    scope: Optional[str] = typer.Option(None, "--scope", help="Reserved multi-user scope"),
)
    "List the secret KEY NAMES stored for a plugin — never the values (CR-12)."

Variables

_PYPI_404_CACHE: set[str]

Configuration (config.ipynb)

Project-level configuration for paths, runtime settings, and environment management

Import

from cjm_plugin_system.core.config import (
    RuntimeMode,
    CondaType,
    RuntimeConfig,
    SubstrateConfig,
    CJMConfig,
    load_config,
    get_config,
    set_config,
    reset_config
)

Functions

def _load_from_yaml(
    yaml_path:Path # Path to cjm.yaml file
) -> CJMConfig: # Parsed configuration
    "Load config from YAML file, resolving relative paths."
def load_config(
    config_path:Optional[Path]=None, # CLI --cjm-config
    data_dir:Optional[Path]=None, # CLI --data-dir
    conda_prefix:Optional[Path]=None, # CLI --conda-prefix
    conda_type:Optional[str]=None # CLI --conda-type
) -> CJMConfig: # Resolved configuration
    "Load config with layered resolution (CLI > env vars > yaml > defaults)."
def get_config() -> CJMConfig: # Current configuration
    "Get current config (loads defaults if not set)."
def set_config(
    config:CJMConfig # Configuration to set as current
) -> None
    "Set current config (called by CLI callback)."
def reset_config() -> None
    "Reset to unloaded state (for testing)."

Classes

class RuntimeMode(str, Enum):
    "Runtime mode for the plugin system."
class CondaType(str, Enum):
    "Type of conda implementation to use."
@dataclass
class RuntimeConfig:
    "Runtime environment configuration."
    
    mode: RuntimeMode = RuntimeMode.SYSTEM  # LOCAL for project-local, SYSTEM for global
    conda_type: CondaType = CondaType.CONDA  # Conda implementation to use
    prefix: Optional[Path]  # Path to runtime directory (LOCAL mode only)
    binaries: Dict[str, Path] = field(...)  # Platform-specific binary paths
@dataclass
class SubstrateConfig:
    """
    Substrate behavior toggles.
    
    Loaded from the `substrate:` section of `cjm.yaml`. Each flag gates a
    substrate-wide behavior that hosts can disable when they don't want the
    per-load or per-execute cost.
    
    - `drift_detection` (CR-8): per-load `/config_schema` HTTP call + hash
      comparison against the manifest's stored hash. PluginManager's load
      path branches around `_check_config_schema_drift` when False.
    - `empirical_tracking` (CR-7): per-execute resource sample recording into
      `EmpiricalResourceStore`. PluginManager skips `record_sample` calls when
      False; the store's lazy-init also short-circuits.
    - `prefetch_stall_threshold_seconds` (CR-4 / Session A 2026-05-27): how long
      proxy.prefetch waits with no observed progress (via `/progress` polling)
      before declaring a stall. Replaces per-plugin wall-clock timeouts —
      operators no longer race network speed against an arbitrary value. Plugins
      defeat the stall counter by calling `self.report_progress(...)` periodically
      during long lifecycle operations (model download / vLLM server startup).
      Default 60 s; bump higher for plugins that don't report progress, or lower
      if false-positive stalls are noisy.
    """
    
    drift_detection: bool = True  # Run /config_schema hash compare on every load_plugin
    empirical_tracking: bool = True  # Record ResourceSample after every execute_plugin*
    prefetch_stall_threshold_seconds: float = 60.0  # CR-4 / Session A: stall detection threshold for proxy.prefetch
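The difference between a stall threshold and a wall-clock timeout can be sketched with a small detector that resets its window whenever the polled progress value changes. The `StallDetector` class and its `observe` method are hypothetical; how `proxy.prefetch` actually tracks progress is not shown in the source.

```python
import time
from typing import Callable, Optional


class StallDetector:
    """Report a stall when no progress change is seen for threshold_seconds."""

    def __init__(
        self,
        threshold_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.threshold_seconds = threshold_seconds
        self._clock = clock
        self._last_value: Optional[float] = None
        self._last_change = clock()

    def observe(self, progress: Optional[float]) -> bool:
        """Record a polled progress value; return True if the operation looks stalled."""
        now = self._clock()
        if progress != self._last_value:
            self._last_value = progress
            self._last_change = now  # progress moved, restart the window
        return (now - self._last_change) >= self.threshold_seconds
```

A slow but steadily progressing download never trips the detector, whereas a fixed wall-clock timeout would cut it off.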
@dataclass
class CJMConfig:
    "Main configuration for cjm-plugin-system."
    
    runtime: RuntimeConfig = field(...)  # Runtime environment settings
    data_dir: Path = field(...)  # Base directory for manifests, logs
    plugins_config: Path = field(...)  # Path to plugins.yaml file
    models_dir: Optional[Path]  # Directory for model downloads
    substrate: SubstrateConfig = field(...)  # CR-8 substrate behavior toggles
    
    def manifests_dir(self) -> Path: # Directory containing plugin manifests
        "Directory containing plugin manifests."
    
    def plugin_data_dir(self) -> Path: # Directory for plugin runtime data
        "Directory for plugin runtime data (databases, caches)."
    
    def logs_dir(self) -> Path: # Directory containing plugin logs
        "Directory containing plugin logs."
    
    def conda_binary_path(self) -> Optional[Path]: # Path to conda/micromamba binary or None
        "Get the configured binary path for the current platform."

Variables

_current_config: Optional[CJMConfig] = None

Plugin Config Store (config_store.ipynb)

Persistent storage for per-plugin configuration (with enabled flag)

Import

from cjm_plugin_system.core.config_store import (
    PluginConfigRecord,
    PluginConfigStore,
    LocalPluginConfigStore
)

Functions

def _default_db_path() -> Path
    "Default SQLite location: `~/.cjm/plugin_configs.db`."

Classes

@dataclass
class PluginConfigRecord:
    "Persisted state for a plugin: config dict + enabled flag."
    
    config: Dict[str, Any] = field(...)  # Plugin's current config values
    enabled: bool = True  # Whether the substrate should accept jobs for this plugin
    updated_at: float = 0.0  # Unix timestamp of the last write (server clock)
@runtime_checkable
class PluginConfigStore(Protocol):
    "Protocol for persisting per-plugin `PluginConfigRecord` across sessions."
    
    def get(self, plugin_name: str) -> Optional[PluginConfigRecord]
        "Fetch the record for a plugin, or None if no record exists yet."
    
    def set(self, plugin_name: str, record: PluginConfigRecord) -> None
        """
        Persist a record. Overwrites any prior record for the same plugin.
        
        Implementations stamp `record.updated_at` to the current time during
        the write so callers don't have to manage timestamps.
        """
    
    def delete(self, plugin_name: str) -> bool
        "Remove the record for a plugin. Returns True if a record was deleted."
    
    def list_all(self) -> Dict[str, PluginConfigRecord]
        "Return every stored record, keyed by plugin name."
class LocalPluginConfigStore:
    """
    SQLite-backed default implementation of `PluginConfigStore`.
    
    The DB is created lazily on first write. Reads against a non-existent DB
    return empty results rather than raising, so hosts can call `.get()` on
    a fresh install without preparing the file first.
    """
    
    def __init__(self, db_path: Optional[Path] = None)
        "Initialize the store. `db_path=None` uses `~/.cjm/plugin_configs.db`."
    
    def get(
            self,
            plugin_name: str  # Plugin to look up
        ) -> Optional[PluginConfigRecord]:  # Persisted record or None if absent
        "Fetch the record for a plugin."
    
    def set(
            self,
            plugin_name: str,  # Plugin to write
            record: PluginConfigRecord  # New record (updated_at overwritten with current time)
        ) -> None
        "Persist a record. Stamps `updated_at` to the current time."
    
    def delete(
            self,
            plugin_name: str  # Plugin to remove
        ) -> bool:  # True if a row was deleted
        "Remove the record for a plugin."
    
    def list_all(self) -> Dict[str, PluginConfigRecord]:  # plugin_name -> record
        "Return all stored records keyed by plugin name."

Variables

_SCHEMA = '\nCREATE TABLE IF NOT EXISTS plugin_configs (\n    plugin_name TEXT PRIMARY KEY,\n    config_json TEXT NOT NULL,\n    enabled INTEGER NOT NULL DEFAULT 1,\n    updated_at REAL NOT NULL\n)\n'
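The lazy-creation behaviour described for `LocalPluginConfigStore` (reads on a missing DB return nothing, writes create the file and table) can be illustrated with a small standalone sketch. `SketchConfigStore` and its dict-shaped return value are hypothetical stand-ins written for this example, not the package's implementation; only the table DDL is taken from the `_SCHEMA` string above.

```python
import json
import sqlite3
import tempfile
import time
from pathlib import Path
from typing import Any, Dict, Optional

# Same DDL as the `_SCHEMA` string listed above.
SCHEMA = """
CREATE TABLE IF NOT EXISTS plugin_configs (
    plugin_name TEXT PRIMARY KEY,
    config_json TEXT NOT NULL,
    enabled INTEGER NOT NULL DEFAULT 1,
    updated_at REAL NOT NULL
)
"""


class SketchConfigStore:
    """Hypothetical stand-in for illustration; not the package's class."""

    def __init__(self, db_path: Path):
        self.db_path = Path(db_path)

    def get(self, plugin_name: str) -> Optional[Dict[str, Any]]:
        # Reads never create the file: a missing DB simply means "no record".
        if not self.db_path.exists():
            return None
        conn = sqlite3.connect(str(self.db_path))
        try:
            row = conn.execute(
                "SELECT config_json, enabled, updated_at "
                "FROM plugin_configs WHERE plugin_name = ?",
                (plugin_name,),
            ).fetchone()
        finally:
            conn.close()
        if row is None:
            return None
        return {"config": json.loads(row[0]), "enabled": bool(row[1]), "updated_at": row[2]}

    def set(self, plugin_name: str, config: Dict[str, Any], enabled: bool = True) -> None:
        # Writes create the parent directory, the DB file, and the table on demand,
        # and stamp `updated_at` so callers do not manage timestamps.
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(self.db_path))
        try:
            conn.execute(SCHEMA)
            conn.execute(
                "INSERT OR REPLACE INTO plugin_configs VALUES (?, ?, ?, ?)",
                (plugin_name, json.dumps(config), int(enabled), time.time()),
            )
            conn.commit()
        finally:
            conn.close()


store = SketchConfigStore(Path(tempfile.mkdtemp()) / "nested" / "plugin_configs.db")
fresh = store.get("demo")                      # no file yet, so no record
existed_after_read = store.db_path.exists()    # the read did not create the file
store.set("demo", {"device": "cpu"})
saved = store.get("demo")
```

The early return in `get` is what lets a host query a fresh install without preparing the file first.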

Empirical Resource Tracking (empirical_store.ipynb)

Persistent store for empirically-observed resource usage per (instance_id, config_hash) pair. CR-7’s data foundation — record_sample is called from PluginManager.execute_plugin* finally blocks; aggregates feed eviction-candidate selection + future UI hints + cost-aware retry decisions.

Import

from cjm_plugin_system.core.empirical_store import (
    compute_config_hash,
    ResourceSample,
    EmpiricalResourceRecord,
    EmpiricalResourceStore,
    LocalEmpiricalResourceStore
)

Functions

def compute_config_hash(
    """
    CR-7: hash a plugin instance's effective config for empirical-record keying.
    
    Same canonicalization as CR-8's `compute_config_schema_hash` — sorted keys,
    no whitespace, `"sha256:hex"` shape. None / empty configs hash deterministically
    to the canonical-empty value so plugins with no config still get a single
    record per instance rather than scattering across hash-of-None edge cases.
    """
def _default_db_path() -> Path
    """
    Default SQLite location: `~/.cjm/empirical_resources.db`.
    
    Hosts using per-project `data_dir` (the intended pattern per CR-8 cascade_manifests
    docs) override this by passing `db_path=cfg.data_dir / "empirical_resources.db"`
    when constructing the store. PluginManager's lazy-init does this automatically.
    """

Classes

class ResourceSample:
    """
    Single observation captured after an execute call completes.
    
    Frozen — substrate aggregates online via Welford's algorithm; no need to
    keep raw samples around. `observed_at` is tz-aware per the CR-5 convention.
    """
@dataclass
class EmpiricalResourceRecord:
    "Aggregated empirical resource profile for a (instance_id, config_hash) pair."
    
    instance_id: str  # PluginInstance.instance_id (CR-10 multi-instance aware)
    plugin_name: str  # Convenience: PluginInstance.plugin_name; derivable but cheap to denormalize
    config_hash: str  # compute_config_hash(inst.config) at sample time
    sample_count: int  # Number of ResourceSamples folded into this record
    cpu_percent_mean: float  # Welford running mean of cpu_percent
    memory_mb_peak_max: float  # max(sample.memory_mb_peak over all samples) — worst observed
    memory_mb_peak_mean: float  # Welford running mean of sample.memory_mb_peak
    gpu_memory_mb_peak_max: float  # max(sample.gpu_memory_mb_peak over all samples)
    gpu_memory_mb_peak_mean: float  # Welford running mean of sample.gpu_memory_mb_peak
    duration_seconds_mean: float  # Welford running mean of sample.duration_seconds
    success_rate: float  # success_count / sample_count
    last_observed: datetime  # tz-aware; tracks most recent ResourceSample.observed_at
    api_usage_totals: Dict[str, float] = field(...)  # SG-54: cumulative per-unit usage summed across runs (tokens/credits/pages/...); {} for compute-only plugins
@runtime_checkable
class EmpiricalResourceStore(Protocol):
    """
    Protocol for persisting empirically-observed resource usage.
    
    Implementations aggregate online (Welford for means, max-of-peaks for memory).
    No raw-sample retention required — v1 is one row per (instance_id, config_hash)
    pair with running aggregates. A future implementation can add a samples table
    if time-series queries become necessary.
    """
    
    def record_sample(
            self,
            instance_id: str,
            plugin_name: str,
            config_hash: str,
            sample: ResourceSample,
        ) -> None
        "Fold a sample into the running aggregate. Creates a new record on first call."
    
    def get_record(
            self,
            instance_id: str,
            config_hash: str,
        ) -> Optional[EmpiricalResourceRecord]
        "Fetch the aggregated record for (instance_id, config_hash), or None."
    
    def list_records(
            self,
            plugin_name: Optional[str] = None,
        ) -> List[EmpiricalResourceRecord]
        "List all records, optionally filtered to a single plugin_name."
    
    def delete_record(
            self,
            instance_id: str,
            config_hash: str,
        ) -> bool
        "Remove a record. Returns True if a row was deleted."
class LocalEmpiricalResourceStore:
    """
    SQLite-backed default implementation of `EmpiricalResourceStore`.
    
    Online Welford aggregation for means; max-of-peaks for memory metrics.
    success_rate computed at read time from `success_count / sample_count`.
    DB + schema created lazily on first write.
    """
    
    def __init__(self, db_path: Optional[Path] = None):
        """Initialize the store. `db_path=None` uses `~/.cjm/empirical_resources.db`."""
        self.db_path = Path(db_path) if db_path is not None else _default_db_path()
    
    def record_sample(
            self,
            instance_id: str,  # PluginInstance.instance_id
            plugin_name: str,  # PluginInstance.plugin_name (denormalized for filtering)
            config_hash: str,  # compute_config_hash(inst.config)
            sample: ResourceSample,  # One observation
        ) -> None
        """
        Fold a sample into the running aggregate. Creates a new row on first call.
        
        Welford update for each mean. Max-of-peaks for memory metrics.
        success_count incremented by 1 if sample.success else 0.
        """
    
    def get_record(
            self,
            instance_id: str,
            config_hash: str,
        ) -> Optional[EmpiricalResourceRecord]
        "Fetch the aggregated record for (instance_id, config_hash), or None."
    
    def list_records(
            self,
            plugin_name: Optional[str] = None,
        ) -> List[EmpiricalResourceRecord]
        "List all records, optionally filtered to a plugin."
    
    def delete_record(
            self,
            instance_id: str,
            config_hash: str,
        ) -> bool
        "Remove a record. Returns True if a row was deleted."

Variables

_SCHEMA = "\nCREATE TABLE IF NOT EXISTS empirical_resources (\n    instance_id TEXT NOT NULL,\n    plugin_name TEXT NOT NULL,\n    config_hash TEXT NOT NULL,\n    sample_count INTEGER NOT NULL DEFAULT 0,\n    success_count INTEGER NOT NULL DEFAULT 0,\n    cpu_percent_mean REAL NOT NULL DEFAULT 0.0,\n    memory_mb_peak_max REAL NOT NULL DEFAULT 0.0,\n    memory_mb_peak_mean REAL NOT NULL DEFAULT 0.0,\n    gpu_memory_mb_peak_max REAL NOT NULL DEFAULT 0.0,\n    gpu_memory_mb_peak_mean REAL NOT NULL DEFAULT 0.0,\n    duration_seconds_mean REAL NOT NULL DEFAULT 0.0,\n    last_observed TEXT NOT NULL,\n    api_usage_totals TEXT NOT NULL DEFAULT '{}',\n    PRIMARY KEY (instance_id, config_hash)\n)\n"
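The online aggregation that the store docstrings describe (a Welford-style running mean, max-of-peaks, and a success rate derived at read time) fits in a few lines. The sketch below is a hypothetical in-memory aggregate for a single metric. Its field names echo the columns above, but the class and its `fold` method are assumptions made for illustration, not the package's code.

```python
from dataclasses import dataclass


@dataclass
class RunningAggregate:
    """Hypothetical one-metric aggregate; mirrors a subset of the columns above."""

    sample_count: int = 0
    success_count: int = 0
    memory_mb_peak_mean: float = 0.0
    memory_mb_peak_max: float = 0.0

    def fold(self, memory_mb_peak: float, success: bool) -> None:
        self.sample_count += 1
        # Running-mean update: mean_n = mean_(n-1) + (x - mean_(n-1)) / n.
        # No raw samples need to be kept.
        self.memory_mb_peak_mean += (
            memory_mb_peak - self.memory_mb_peak_mean
        ) / self.sample_count
        # Max-of-peaks keeps the worst observed value.
        self.memory_mb_peak_max = max(self.memory_mb_peak_max, memory_mb_peak)
        self.success_count += 1 if success else 0

    @property
    def success_rate(self) -> float:
        # Derived at read time rather than stored.
        return self.success_count / self.sample_count if self.sample_count else 0.0


agg = RunningAggregate()
for peak, ok in [(100.0, True), (300.0, False), (200.0, True)]:
    agg.fold(peak, ok)
```

After the three samples the mean is 200.0, the max is 300.0, and two of three runs succeeded. Storing `success_count` rather than the rate keeps each update a simple increment.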

Plugin Error Taxonomy (errors.ipynb)

Typed exception hierarchy + JobError dataclass + default classification of bare Python exceptions. The substrate’s CR-5 implementation per the 2026-05-19 substrate audit.

Import

from cjm_plugin_system.core.errors import (
    PluginError,
    PluginInputError,
    PluginTransientError,
    PluginResourceError,
    PluginFatalError,
    PluginDisabledError,
    PluginNotLoadedError,
    PluginTimeoutError,
    PluginCancelledError,
    WorkerOOMError,
    PluginConfigError,
    ResourceShortfall,
    TracebackPolicy,
    JobError,
    classify_exception,
    map_bare_exception_to_job_error
)

Functions

def classify_exception(
    exc: BaseException  # The exception to classify
) -> "Literal['user_input', 'transient', 'resource', 'fatal']":  # Category
    """
    Return the substrate category for any exception.
    
    PluginError subclasses report their own declared `category`. Bare Python
    exceptions are mapped via `__mro__` walk against `_BARE_EXCEPTION_CATEGORY_MAP`;
    the first ancestor in the table wins. Unrecognized exceptions classify as
    `fatal` (don't auto-retry the unknown).
    """
def map_bare_exception_to_job_error(
    exc: BaseException,  # The raised exception
    *,
    plugin_name: Optional[str] = None,  # Name of the plugin that raised
    plugin_instance_id: Optional[str] = None,  # Per CR-10
    traceback_policy: TracebackPolicy = TracebackPolicy.FULL,  # How much detail to record
    occurred_at: Optional[datetime] = None,  # Override; defaults to datetime.now(timezone.utc)
) -> JobError
    """
    Convert any exception into a structured `JobError`.
    
    PluginError subclasses contribute their category-specific structured data
    (`fields_invalid` for input errors, `resource_shortfall` for resource errors,
    `retry_after_seconds` for transient errors). Bare exceptions get the
    default category-based retriable flag and no structured side-channel.
    """

Classes

class PluginError(Exception):
    """
    Base for substrate-recognized plugin exceptions.
    
    Subclasses declare a `category` and `default_retriable` ClassVar so the
    JobQueue + scheduler can route the failure without sniffing exception
    text. Bare Python exceptions raised by plugin code go through
    `map_bare_exception_to_job_error` to acquire a default category.
    """
class PluginInputError(PluginError, ValueError):
    """
    User-fixable error: bad config, invalid argument, missing file.
    
    Multi-inherits `ValueError` so SG-8-era `except ValueError:` catch sites that
    legitimately want input errors keep working through the SG-47 migration
    window. The MRO is `PluginInputError → PluginError → ValueError → Exception`;
    other category bases (`PluginTransientError`, `PluginResourceError`,
    `PluginFatalError`) deliberately do NOT extend `ValueError` because their
    failure modes are not semantically value errors.
    """
    
    def __init__(
            self,
            message: str,  # Human-readable description
            *,
            fields_invalid: Optional[List[str]] = None,  # Names of inputs that failed validation
        )
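The MRO stated in the `PluginInputError` docstring is what keeps older `except ValueError:` call sites working. The sketch below rebuilds that shape with stand-in classes defined locally (they are not imported from the package, and the constructor body is an assumption) to show how the lookup order falls out of the base-class order.

```python
class PluginError(Exception):
    """Stand-in for the package's base class."""


class PluginInputError(PluginError, ValueError):
    """Stand-in using the base order the docstring describes."""

    def __init__(self, message, *, fields_invalid=None):
        super().__init__(message)
        self.fields_invalid = list(fields_invalid or [])


# Method resolution order, most specific first.
mro_names = [cls.__name__ for cls in PluginInputError.__mro__]


def legacy_caller():
    """A pre-migration call site that only knows about ValueError."""
    try:
        raise PluginInputError("bad device", fields_invalid=["device"])
    except ValueError as exc:
        return exc.fields_invalid


caught_fields = legacy_caller()
```

Because `PluginError` is listed before `ValueError`, a handler for `PluginError` and a handler for `ValueError` both match, while the structured `fields_invalid` data stays available on the caught exception.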
class PluginTransientError(PluginError):
    """
    Temporary failure: timeout, network blip, brief resource contention.
    
    Substrate / JobQueue may retry on its own initiative. Plugin authors raise
    this when they know the failure is recoverable.
    """
    
    def __init__(
            self,
            message: str,  # Human-readable description
            *,
            retry_after_seconds: Optional[float] = None,  # Hint for backoff strategies
        )
class PluginResourceError(PluginError):
    """
    Resource exhaustion: GPU VRAM, system RAM, disk full.
    
    JobQueue's reactive-eviction flow (CR-7) routes resource errors to retry
    after attempting to free the named resource. Plugin authors set
    `resource_shortfall` so the substrate knows what to evict.
    """
    
    def __init__(
            self,
            message: str,  # Human-readable description
            *,
            resource_shortfall: Optional["ResourceShortfall"] = None,  # Quantitative gap
        )
class PluginFatalError(PluginError):
    """
    Bug / irrecoverable state. The plugin cannot complete this job; retrying won't help.
    
    Plugin authors raise this when they know the failure is permanent for the
    given inputs. The substrate does NOT retry fatal errors.
    """
class PluginDisabledError(PluginInputError):
    """
    JobQueue / execute_plugin rejected: the plugin is currently disabled.
    
    User-fixable (re-enable the plugin). Inherits `PluginInputError`'s ValueError
    MRO so existing `except ValueError:` callers see it as an input error.
    Raised by CR-2's enable/disable wiring once that lands.
    """
    
    def __init__(self, plugin_name: str):
        super().__init__(f"Plugin {plugin_name!r} is disabled")
        self.plugin_name = plugin_name
class PluginNotLoadedError(PluginFatalError):
    """
    Caller submitted to a plugin that was never loaded.
    
    Fatal category because this is a programmer / orchestration bug, not a
    user-fixable condition. NOT a ValueError: callers should catch it with
    `except PluginNotLoadedError:` (or the broader `except PluginError:`), not
    a blanket `except ValueError:`.
    """
    
    def __init__(self, plugin_name: str):
        super().__init__(f"Plugin {plugin_name!r} is not loaded")
        self.plugin_name = plugin_name
class PluginTimeoutError(PluginTransientError):
    """
    A per-job timeout fired before the plugin finished.
    
    Transient category: a retry may succeed if the slow operation completes faster
    next time. Carries `retry_after_seconds` from `PluginTransientError`.
    Raised by SG-14's per-job timeout primitive when that lands.
    """
    
    def __init__(
            self,
            plugin_name: str,
            timeout_seconds: float,
            *,
            retry_after_seconds: Optional[float] = None,
        )
class PluginCancelledError(PluginTransientError):
    """
    Cooperative cancellation signal raised from `PluginInterface.check_cancel()`.
    
    Anchors under `PluginTransientError` because cancellation is in-principle
    re-runnable: a future attempt with the same inputs won't auto-fail if the
    cancel flag isn't set. But `default_retriable` is False: cancellation was
    a deliberate operator action, so the substrate should NOT auto-retry.
    Job-monitor / JobQueue render cancelled jobs with their own state
    (separate from "failed"); the JobError category remains `transient` so
    consumers reading the typed taxonomy can group recoverable signals.
    
    Plugin authors raise this implicitly via `self.check_cancel()` inside
    `execute()`; substrate sets the underlying `_cancel_requested` flag via
    `cancel()`. See CR-4's cancellation primitives for the cooperative-cancel
    protocol.
    """
    
    def __init__(self, plugin_name: str):
        super().__init__(f"Plugin {plugin_name!r} cancelled by operator")
        self.plugin_name = plugin_name
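The cooperative-cancel flow described for `PluginCancelledError` (a flag set by `cancel()`, polled by `check_cancel()` inside `execute()`) can be sketched with a toy worker. Everything below is a hypothetical stand-in: `CancelledSketchError`, `CancellableSketch`, and the `cancel_after` test hook are invented for illustration, and the real primitives may differ (for example in thread-safety or callback support).

```python
class CancelledSketchError(Exception):
    """Hypothetical stand-in for PluginCancelledError."""


class CancellableSketch:
    """Toy worker that polls a cancel flag between units of work."""

    def __init__(self):
        self._cancel_requested = False

    def cancel(self) -> None:
        # Called by the host side; only flips the flag.
        self._cancel_requested = True

    def check_cancel(self) -> None:
        # Called by plugin code at safe points; raises if cancellation was requested.
        if self._cancel_requested:
            raise CancelledSketchError("cancelled by operator")

    def execute(self, items, cancel_after=None):
        processed = []
        try:
            for index, item in enumerate(items):
                self.check_cancel()
                processed.append(item * 2)
                if index == cancel_after:
                    self.cancel()  # test hook: simulate an operator cancel mid-run
        except CancelledSketchError:
            return processed, "cancelled"
        return processed, "completed"


partial = CancellableSketch().execute([1, 2, 3, 4], cancel_after=1)
full = CancellableSketch().execute([1, 2, 3, 4])
```

The worker only stops at a `check_cancel()` call, so work that is already underway for the current item finishes before the cancellation takes effect.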
class WorkerOOMError(PluginResourceError):
    """
    The worker subprocess died with a kill-signal during an active execute call.
    
    CR-7 Track A (substrate-side OOM detection): when an HTTP call to the worker
    faults and the subprocess has died with `returncode == -signal.SIGKILL` (or
    the platform equivalent), the substrate raises this. The kernel OOM-killer
    is the most common cause of SIGKILL during normal execute paths, so the
    substrate treats SIGKILL-during-call as "assume OOM" and surfaces a typed
    resource error for the reactive retry path.
    
    `resource_shortfall` is `None` for Track A, because the substrate only saw
    "worker died from kill-signal" and has no per-resource needed/available numbers.
    Track B (per SG-47's sub-task: plugin-side wrapping of `torch.cuda.OutOfMemoryError`
    et al.) raises `PluginResourceError` directly with a populated
    `ResourceShortfall` because the plugin had the context. Both land at the
    same `except PluginResourceError` site in CR-7's reactive retry loop.
    
    `process_returncode` carries the observed exit code for debugging /
    classification (e.g. operators can distinguish kernel-OOM SIGKILL from
    other signals if they read it). Defaults to `None` for callers that don't
    have it on hand.
    """
    
    def __init__(
            self,
            plugin_name: str,
            *,
            process_returncode: Optional[int] = None,
            message: Optional[str] = None,
        )
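The Track A check in the `WorkerOOMError` docstring relies on the `subprocess` convention that a child killed by signal N reports `returncode == -N` on POSIX. A minimal sketch of that test follows. The helper name `died_from_sigkill` is hypothetical, and how the substrate handles platforms without `SIGKILL` is not shown in this document, so the fallback here is an assumption.

```python
import signal
from typing import Optional


def died_from_sigkill(returncode: Optional[int]) -> bool:
    """True when a Popen-style returncode indicates the child was killed by SIGKILL."""
    sigkill = getattr(signal, "SIGKILL", None)  # not defined on Windows
    if returncode is None or sigkill is None:
        # Still running, or no SIGKILL concept on this platform (assumed fallback).
        return False
    return returncode == -int(sigkill)


checks = {
    "still_running": died_from_sigkill(None),
    "clean_exit": died_from_sigkill(0),
    "ordinary_failure": died_from_sigkill(1),
}
```

A positive exit code such as 1 means the worker exited on its own, which is why only the negative kill-signal value is treated as a likely OOM kill.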
class PluginConfigError(PluginInputError):
    """
    Unknown / invalid keys in a config dict against a plugin's config schema.
    
    Reparented from `cjm_plugin_system.utils.validation` (Wave 2 / SG-8) under
    CR-5. Inherits `PluginInputError`'s ValueError MRO automatically.
    `config_class_name` is the dataclass / plugin name whose schema was violated.
    """
    
    def __init__(
            self,
            message: str,  # Human-readable description
            *,
            fields_invalid: Optional[List[str]] = None,  # Canonical: list of bad config keys
            config_class_name: str = "",  # Dataclass / plugin name for the schema
            # REMOVE-AFTER-OVERHAUL: drop unknown_keys kwarg after SG-47 cascade completes
            unknown_keys: Optional[List[str]] = None,
        )
    
    def unknown_keys(self) -> List[str]
        """
        Deprecated alias for `fields_invalid` (SG-8-era attribute name).
        
        Accessing this property does NOT emit a deprecation warning; only
        constructor-time misuse triggers the warning. Read-only by design;
        nothing in the ecosystem mutates exception attributes after construction.
        """
@dataclass
class ResourceShortfall:
    "Quantitative gap between what a plugin needed and what was available."
    
    resource: Literal['gpu_vram_mb', 'system_ram_mb', 'disk_mb']  # Which resource
    needed: float  # Amount the plugin reported it needed
    available: float  # Amount actually available when the failure occurred
class TracebackPolicy(str, Enum):
    "How much exception detail the substrate records on a JobError."
@dataclass
class JobError:
    """
    Structured failure summary recorded on a completed Job.
    
    Populated by the JobQueue when a plugin execution fails (CR-6 owns the
    population logic; CR-5 owns the shape). Sufficient for UI to render a
    failure card + retry affordance without re-running the plugin.
    """
    
    category: Literal['user_input', 'transient', 'resource', 'fatal']
    message: str  # Human-readable error message
    retriable: bool  # Whether the substrate considers this safe to auto-retry
    original_exc_repr: str  # repr(exc) of the original exception
    traceback: Optional[str]  # Full traceback per TracebackPolicy
    retry_after_seconds: Optional[float]  # Backoff hint from PluginTransientError
    fields_invalid: Optional[List[str]]  # From PluginInputError subclasses
    resource_shortfall: Optional[ResourceShortfall]  # From PluginResourceError
    plugin_name: Optional[str]  # Name of the plugin that raised
    plugin_instance_id: Optional[str]  # Per CR-10 multi-instance support
    occurred_at: Optional[datetime]  # When the failure was recorded

Variables

_BARE_EXCEPTION_CATEGORY_MAP: "dict[type, Literal['user_input', 'transient', 'resource', 'fatal']]"
_CATEGORY_RETRIABLE_DEFAULTS: 'dict[str, bool]'
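The `__mro__` walk that `classify_exception` is documented to perform for bare Python exceptions can be sketched as follows. The table contents are hypothetical (the real `_BARE_EXCEPTION_CATEGORY_MAP` entries are not listed in this document), and the sketch skips the branch where a `PluginError` subclass reports its own `category`.

```python
from typing import Dict

# Hypothetical table for illustration; the package's actual entries are not shown here.
SKETCH_CATEGORY_MAP: Dict[type, str] = {
    FileNotFoundError: "user_input",
    ValueError: "user_input",
    TimeoutError: "transient",
    OSError: "transient",
    MemoryError: "resource",
}


def sketch_classify(exc: BaseException) -> str:
    """Return the category of the first ancestor found in the table, else 'fatal'."""
    for ancestor in type(exc).__mro__:
        if ancestor in SKETCH_CATEGORY_MAP:
            return SKETCH_CATEGORY_MAP[ancestor]
    # Unknown exception types are not auto-retried.
    return "fatal"


class BadShapeError(ValueError):
    """Example of a third-party exception that only matches via its ancestor."""


results = {
    "missing_file": sketch_classify(FileNotFoundError("model.bin")),
    "connection_reset": sketch_classify(ConnectionResetError()),
    "subclass_of_value_error": sketch_classify(BadShapeError("bad shape")),
    "unknown": sketch_classify(KeyError("k")),
}
```

Walking the MRO from most to least specific means `FileNotFoundError` resolves through its own entry before the broader `OSError` entry is considered.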

Content Hashing Utilities (hashing.ipynb)

Shared cryptographic hashing primitives for content integrity verification

Import

from cjm_plugin_system.utils.hashing import (
    hash_bytes,
    hash_file,
    verify_hash,
    hash_dict_canonical
)

Functions

def hash_bytes(
    content: bytes,  # Byte content to hash
    algo: str = "sha256"  # Hash algorithm name (e.g., "sha256", "sha3_256")
) -> str:  # Hash string in "algo:hexdigest" format
    "Compute a hash of byte content."
def hash_file(
    path: Union[str, Path],  # Path to file to hash
    algo: str = "sha256",  # Hash algorithm name
    chunk_size: int = 8192  # Read chunk size in bytes
) -> str:  # Hash string in "algo:hexdigest" format
    "Stream-hash a file without loading it entirely into memory."
def verify_hash(
    content: bytes,  # Content to verify
    expected: str  # Expected hash in "algo:hexdigest" format
) -> bool:  # True if content matches expected hash
    "Verify content against an expected hash string."
def hash_dict_canonical(
    data: Optional[Dict[str, Any]],  # Dict to hash (or None — treated as {})
    algo: str = "sha256",  # Hash algorithm name
) -> str:  # Hash string in "algo:hexdigest" format
    """
    Hash a dict via canonical JSON encoding.
    
    Canonicalization: `json.dumps(data, sort_keys=True, separators=(",", ":"))`.
    Sorted keys eliminate dict-insertion-order variance; minimal separators
    eliminate whitespace variance. Result is deterministic across Python
    versions and machines.
    """

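The hashing helpers above share one output shape, `"algo:hexdigest"`, and the canonical-JSON rule is spelled out in the `hash_dict_canonical` docstring. The sketch below re-implements them with `hashlib` and `json` from the standard library to show how the pieces fit together. The `sketch_` functions are written from the docstrings, not copied from the package; UTF-8 encoding of the canonical JSON and the plain equality check in the verify helper are assumptions.

```python
import hashlib
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional, Union


def sketch_hash_bytes(content: bytes, algo: str = "sha256") -> str:
    return f"{algo}:{hashlib.new(algo, content).hexdigest()}"


def sketch_hash_file(path: Union[str, Path], algo: str = "sha256", chunk_size: int = 8192) -> str:
    digest = hashlib.new(algo)
    with open(path, "rb") as handle:
        # Read fixed-size chunks until read() returns b"" at end of file.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return f"{algo}:{digest.hexdigest()}"


def sketch_verify_hash(content: bytes, expected: str) -> bool:
    # The algorithm name is recovered from the "algo:" prefix of the expected hash.
    algo = expected.split(":", 1)[0]
    return sketch_hash_bytes(content, algo) == expected


def sketch_hash_dict_canonical(data: Optional[Dict[str, Any]], algo: str = "sha256") -> str:
    # Sorted keys and minimal separators remove ordering and whitespace variance.
    canonical = json.dumps(data or {}, sort_keys=True, separators=(",", ":"))
    return sketch_hash_bytes(canonical.encode("utf-8"), algo)


payload = b"x" * 20000  # larger than one chunk, so several reads are needed
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = Path(tmp_dir) / "sample.bin"
    sample_path.write_bytes(payload)
    file_hash = sketch_hash_file(sample_path)

hash_a = sketch_hash_dict_canonical({"model": "base", "device": "cpu"})
hash_b = sketch_hash_dict_canonical({"device": "cpu", "model": "base"})
```

Streaming and one-shot hashing agree on the same bytes, and the two dicts hash identically despite different insertion order, which is the property that config-hash keying depends on.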
Plugin Interface (interface.ipynb)

Abstract base class defining the generic plugin interface

Import

from cjm_plugin_system.core.interface import (
    RELOAD_TRIGGER,
    FileBackedDTO,
    ConfigOption,
    FieldOptions,
    EnvVarSpec,
    PluginInterface,
    plugin_action,
    collect_plugin_actions
)

Functions

def plugin_action(
    action_name: str  # Public action name the decorated method handles
) -> Callable[[Callable], Callable]:  # Decorator
    """
    Marker decorator tagging a plugin method as the handler for `action_name`.
    
    Sets `func._plugin_action = action_name`. Plugin authors with dispatcher-style
    `execute(action, **kwargs)` use `collect_plugin_actions(cls)` to derive their
    `supported_actions` set from these markers rather than maintaining a separate
    list. The decorator does not change call semantics — the wrapped function is
    returned unchanged.
    """
def collect_plugin_actions(
    cls: type  # Class (or subclass) to scan for @plugin_action-tagged methods
) -> Set[str]:  # Set of action names handled by `cls` (including inherited)
    """
    Collect action names from `@plugin_action`-decorated methods on `cls`.
    
    Walks the class's MRO so subclasses inherit action handlers from base
    classes automatically. The returned set is suitable for
    `supported_actions: ClassVar[Set[str]] = collect_plugin_actions(MyPlugin)`
    once the plugin class body has been defined.
    """

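The marker-and-collect pattern described for `plugin_action` and `collect_plugin_actions` can be sketched in a few lines. The `sketch_` functions below are re-implementations written from the docstrings above, not the package source, and `BasePlugin` / `TextPlugin` are hypothetical example classes.

```python
from typing import Callable, Set


def sketch_plugin_action(action_name: str) -> Callable[[Callable], Callable]:
    def mark(func: Callable) -> Callable:
        # Marker only: tag the function and return it unchanged.
        func._plugin_action = action_name
        return func
    return mark


def sketch_collect_plugin_actions(cls: type) -> Set[str]:
    found: Set[str] = set()
    # Walk the MRO so handlers defined on base classes are included.
    for klass in cls.__mro__:
        for attr in vars(klass).values():
            name = getattr(attr, "_plugin_action", None)
            if isinstance(name, str):
                found.add(name)
    return found


class BasePlugin:
    @sketch_plugin_action("ping")
    def _ping(self) -> str:
        return "pong"


class TextPlugin(BasePlugin):
    @sketch_plugin_action("summarize")
    def _summarize(self, text: str) -> str:
        return text[:10]


# Assigned after the class body exists, as the docstring above suggests.
TextPlugin.supported_actions = sketch_collect_plugin_actions(TextPlugin)
```

Deriving `supported_actions` from the markers avoids keeping a second, hand-maintained list in sync with the handler methods.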
Classes

@runtime_checkable
class FileBackedDTO(Protocol):
    "Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer."
    
    def to_temp_file(self) -> str: # Absolute path to the temporary file
        "Save the data to a temporary file and return the absolute path."
@dataclass
class ConfigOption:
    "CR-11: one live option for a dynamic config field, with optional metadata."
    
    value: Any  # option value (e.g. "gemini-2.5-flash")
    label: str  # display label (e.g. "Gemini 2.5 Flash")
    metadata: Dict[str, Any] = dataclasses.field(default_factory=dict)  # token limits, descriptions, ...
@dataclass
class FieldOptions:
    """
    CR-11: the live option domain for one dynamic config field.
    
    Kept SEPARATE from the static config_schema (which CR-8 hashes for drift
    detection). The plugin-config UI merges these live options on top of the
    static schema; folding them into the schema would make every API plugin
    perpetually 'drift'.
    """
    
    options: List[ConfigOption]  # current valid options
    constraints: Dict[str, Any] = dataclasses.field(default_factory=dict)  # derived field-constraint overrides
@dataclass
class EnvVarSpec:
    """
    CR-12: one entry of a plugin's spawn-time worker-environment contract.
    
    A plugin declares the environment variables its worker subprocess reads at
    startup via `WORKER_ENV: ClassVar[List[EnvVarSpec]]`. Worker env vars are
    FIXED AT SPAWN — changing one requires a worker RESPAWN, so the substrate
    routes such changes through `reload_plugin`, never `reconfigure` (the env is
    baked into the subprocess at `Popen` and can't be mutated in-process). This
    is the lifecycle distinction from a normal config field (reconfigurable in
    place via `reconfigure`/`_release_<trigger>`).
    
    Two flavors share this one declaration:
    
      - `secret=True`  : value resolved from the `SecretStore` (masked; never
                         persisted in the config store, echoed in config_schema,
                         or logged). A secret never carries a `default` — a
                         baked-in secret is a leak.
      - `secret=False` : visible value resolved from the override chain
                         (operator override > manifest `install.env_vars` >
                         this `default`); safe to display.
    
    Both share one injection seam: the substrate composes the resolved
    {name: value} overlay at load time and injects it into the worker env at
    spawn (extending the existing CJM_DATA_DIR / CJM_MODELS_DIR injection).
    This is "derive from behaviour, not metadata" applied to the spawn env: the
    plugin declares WHICH vars it consumes + whether each is secret/required;
    the substrate owns resolution + injection.
    
    `options` is a forward seam for visible vars with a finite domain (e.g. a
    device selector enumerating GPUs); unused today but reserved so the
    plugin-config UI / a future `set-env` surface isn't blocked.
    """
    
    name: str  # The env var the worker reads, e.g. "GEMINI_API_KEY"
    secret: bool = False  # True -> value resolved from the SecretStore, masked
    required: bool = False  # Worker can't do useful work until this is satisfied
    label: str = ''  # Display label for CLI / GUI affordances
    description: str = ''  # Help text for CLI / GUI
    default: Optional[str]  # Visible vars only; secrets must leave this None
    options: Optional[List[str]]  # Forward seam: finite domain for a visible var (unused today)
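The resolution order described for `EnvVarSpec` (secrets from the `SecretStore`; visible vars from operator override, then manifest `install.env_vars`, then the declared `default`) can be sketched as a small function. Everything here is a hypothetical illustration: `EnvVarSpecSketch` keeps only four of the fields, plain dicts stand in for the `SecretStore` and the manifest, the `DEMO_*` variable names are invented, and returning the list of unmet required vars is an assumption about how a caller might report them.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class EnvVarSpecSketch:
    """Trimmed, hypothetical version of the declaration above."""

    name: str
    secret: bool = False
    required: bool = False
    default: Optional[str] = None


def resolve_worker_env(
    specs: List[EnvVarSpecSketch],
    *,
    operator_overrides: Dict[str, str],
    manifest_env_vars: Dict[str, str],
    secrets: Dict[str, str],  # plain dict standing in for the SecretStore
) -> Tuple[Dict[str, str], List[str]]:
    overlay: Dict[str, str] = {}
    missing_required: List[str] = []
    for spec in specs:
        if spec.secret:
            # Secrets never fall back to a default.
            value = secrets.get(spec.name)
        else:
            # operator override > manifest install.env_vars > declared default
            value = operator_overrides.get(
                spec.name, manifest_env_vars.get(spec.name, spec.default)
            )
        if value is None:
            if spec.required:
                missing_required.append(spec.name)
            continue
        overlay[spec.name] = value
    return overlay, missing_required


specs = [
    EnvVarSpecSketch("GEMINI_API_KEY", secret=True, required=True),
    EnvVarSpecSketch("DEMO_DEVICE", default="cpu"),
    EnvVarSpecSketch("DEMO_LOG_LEVEL", default="INFO"),
    EnvVarSpecSketch("DEMO_CACHE", default="off"),
    EnvVarSpecSketch("DEMO_TOKEN", secret=True, required=True),
]
overlay, missing_required = resolve_worker_env(
    specs,
    operator_overrides={"DEMO_DEVICE": "cuda:1"},
    manifest_env_vars={"DEMO_DEVICE": "cuda:0", "DEMO_LOG_LEVEL": "DEBUG"},
    secrets={"GEMINI_API_KEY": "sk-demo"},
)
```

Since the overlay is computed before the worker starts, changing any of these inputs only takes effect on the next spawn, which matches the respawn requirement stated above.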
class PluginInterface(ABC):
    """
    Abstract base class for all plugins (both local workers and remote proxies).
    
    CR-4 extended this surface with: prefetch hook (SG-19), made cleanup optional
    (SG-43), reconfigure split + RELOAD_TRIGGER declarative-helper (lifecycle
    hook split), and cooperative-cancellation primitives (SG-16 — flag + callback
    + context manager).
    
    Abstract methods: `name`, `version`, `initialize`, `execute`, `get_config_schema`,
    `get_current_config`. Concrete defaults (overridable): `execute_stream`,
    `cleanup`, `prefetch`, `reconfigure`, `cancel`, `check_cancel`,
    `register_cancel_callback`, `cancel_signal_to`, `report_progress`,
    `fields_that_changed`, `reconfigure_with_triggers`, `on_disable`, `on_enable`.
    """
    
    def name(self) -> str: # Unique identifier for this plugin
        "Unique plugin identifier."
    
    def version(self) -> str: # Semantic version string (e.g., "1.0.0")
        "Plugin version."
    
    def initialize(
            self,
            config: Optional[Dict[str, Any]] = None # Configuration dictionary
        ) -> None
        """
        Initialize or re-configure the plugin.
        
        CR-4: this is "first-time setup", called once after construction with
        the initial config. Substrate uses `reconfigure(old, new)` for delta
        updates afterwards. Plugins predating CR-4 see no behavior change since
        the default `reconfigure()` body delegates to `reconfigure_with_triggers`
        which is a no-op unless the plugin opts in via RELOAD_TRIGGER metadata.
        """
    
    def execute(
            self,
            *args,
            **kwargs
        ) -> Any: # Plugin-specific output
        "Execute the plugin's main functionality."
    
    def execute_stream(
            self,
            *args,
            **kwargs
        ) -> Generator[Any, None, None]: # Yields partial results
        "Stream execution results chunk by chunk."
    
    def get_config_schema(self) -> Dict[str, Any]: # JSON Schema for configuration
        "Return JSON Schema describing the plugin's configuration options."
    
    def get_current_config(self) -> Dict[str, Any]: # Current configuration values
        "Return the current configuration state as a dictionary."
    
    def get_config_options(self) -> Dict[str, "FieldOptions"]:
            """CR-11: live option domains for dynamic config fields, keyed by field name.
    
            Optional. Default: {} (fully static plugins). For fields whose valid
            domain is determined at runtime (e.g. an API model list), return a
            `FieldOptions` carrying current `ConfigOption` values + per-option
            metadata (token limits, etc.) + optional derived constraints. Runs in
            the worker subprocess (has the plugin's deps + credentials).
    
            Kept SEPARATE from get_config_schema(): the schema is static + hashed for
            CR-8 drift detection; these options are the live, un-hashed companion the
            plugin-config UI merges on top. A fetch failure should raise a typed CR-5
            error; the substrate's PluginManager.get_config_options accessor degrades
            to {} so the UI can fall back to the static schema.
            """
            return {}
    
    def cleanup(self) -> None:
            """Clean up resources when plugin is unloaded.
            
            CR-4: made optional (SG-43 closure). Was `@abstractmethod` before; every
            audited plugin overrode it with a near-no-op, so the default is now a
            no-op and plugin authors override only when they have non-trivial
            teardown (file handles, GPU memory, database connections). The
            substrate's worker /cleanup endpoint calls this regardless.
            """
            pass
    
    def prefetch(self) -> None:
            """Acquire heavy resources eagerly without invoking execute().
            
            CR-4 (SG-19): default no-op. Plugin authors override when downstream
            callers benefit from eager acquisition — typically transcription /
            inference plugins that lazy-download models on first execute. The
            substrate's prefetch_plugin(name) API + worker /prefetch endpoint
            invoke this. Should be idempotent (safe to call multiple times) since
            the substrate may pre-warm at load time AND on operator request.
            """
            pass
    
    def reconfigure(
            self,
            old_config: Optional[Dict[str, Any]],  # Previous configuration values
            new_config: Optional[Dict[str, Any]],  # New configuration values to apply
        ) -> None
        "Apply a configuration change without re-running full initialize().

CR-4 completion (2026-05-25): reconfigure is the substrate's canonical
delta path - `PluginManager.update_plugin_config` routes here, NOT through
a bare `initialize(new_config)`. It fires `_release_<trigger>` releases for
changed RELOAD_TRIGGER fields, then re-applies config (see body below).

Distinction from initialize(): initialize sets up persistent state once
after construction; reconfigure applies delta updates and is the
canonical entry point for hot-reload via the substrate's
update_plugin_config path."
    
    def fields_that_changed(
            self,
            old: Dict[str, Any],  # Previous config snapshot
            new: Dict[str, Any],  # Proposed new config snapshot
        ) -> Set[str]:  # Field names whose values differ between old and new
        "Return the set of field names whose values differ between old and new.

Includes fields present in only one dict (treated as a change to/from
the implicit None). Equality is structural via `!=`; nested dicts /
lists compare by value, not identity. Hashable-vs-unhashable values
compare correctly because we never put them in a set themselves."
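The diff semantics described above can be sketched as a free function. This is a minimal illustration reconstructed from the docstring, not the library's source; in particular, treating a missing key as `None` via `dict.get` is an assumption about how "implicit None" is handled.

```python
from typing import Any, Dict, Set


def fields_that_changed(old: Dict[str, Any], new: Dict[str, Any]) -> Set[str]:
    """Return field names whose values differ; a missing key counts as None."""
    changed: Set[str] = set()
    # Only the (hashable) key names go into sets; values are compared with `!=`,
    # so unhashable values such as dicts and lists are handled by value.
    for key in set(old) | set(new):
        if old.get(key) != new.get(key):
            changed.add(key)
    return changed


old = {"model": "small", "opts": {"beam": 5}, "device": "cpu"}
new = {"model": "large", "opts": {"beam": 5}, "device": "cpu", "language": "en"}
print(sorted(fields_that_changed(old, new)))  # ['language', 'model']
```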
    
    def reconfigure_with_triggers(
            self,
            old_config: Dict[str, Any],  # Previous configuration values
            new_config: Dict[str, Any],  # New configuration values being applied
        ) -> None
        "CR-4 helper: walk RELOAD_TRIGGER metadata + fire `_release_<trigger>` methods.

Resolution sequence:

  1. Find the plugin's `config_class` attribute (a dataclass). Absent
     means the plugin hasn't opted into the declarative pattern; the
     helper returns silently.
  2. Compute the diff between `old_config` and `new_config` via
     `fields_that_changed`.
  3. For each changed field, read its `RELOAD_TRIGGER` metadata key
     (if any) and accumulate the trigger names into a set (de-duping
     when multiple fields share a trigger).
  4. For each trigger, call `self._release_<trigger>()` if it exists
     on the instance.

Plugin authors opt in by:

  - Setting `config_class = MyConfigDataclass` as a class attribute.
  - Annotating field metadata with `{RELOAD_TRIGGER: "model"}` etc.
  - Implementing matching `_release_model(self)` instance methods.

Plugins WITHOUT config_class or RELOAD_TRIGGER metadata land here as a
no-op: a safe default for the SG-T3tr migration window where the
cascade hasn't yet adopted the declarative pattern everywhere."
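The four-step resolution sequence and the opt-in pattern above might look roughly like the following. `DemoConfig` and `DemoPlugin` are hypothetical names, and the method body is a sketch written from the docstring rather than the actual implementation; only `RELOAD_TRIGGER`, `config_class`, and the `_release_<trigger>` naming come from the documentation.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List

RELOAD_TRIGGER = "reload_trigger"  # same value as the documented module constant


@dataclass
class DemoConfig:  # hypothetical config dataclass
    model: str = field(default="small", metadata={RELOAD_TRIGGER: "model"})
    device: str = field(default="cpu", metadata={RELOAD_TRIGGER: "model"})
    language: str = "en"  # no trigger: a change here releases nothing


class DemoPlugin:  # hypothetical stand-in, not a real PluginInterface subclass
    config_class = DemoConfig

    def __init__(self) -> None:
        self.released: List[str] = []

    def _release_model(self) -> None:
        self.released.append("model")

    def reconfigure_with_triggers(
        self, old_config: Dict[str, Any], new_config: Dict[str, Any]
    ) -> None:
        config_class = getattr(self, "config_class", None)
        if config_class is None:
            return  # step 1: plugin has not opted in
        # Step 2: diff the two snapshots (missing keys compare as None).
        changed = {
            key
            for key in set(old_config) | set(new_config)
            if old_config.get(key) != new_config.get(key)
        }
        # Step 3: collect trigger names into a set, which de-dupes shared triggers.
        triggers = {
            f.metadata[RELOAD_TRIGGER]
            for f in fields(config_class)
            if f.name in changed and RELOAD_TRIGGER in f.metadata
        }
        # Step 4: call the matching release method when the instance defines it.
        for trigger in sorted(triggers):
            release = getattr(self, f"_release_{trigger}", None)
            if callable(release):
                release()


plugin = DemoPlugin()
plugin.reconfigure_with_triggers(
    {"model": "small", "device": "cpu", "language": "en"},
    {"model": "large", "device": "cuda", "language": "fr"},
)
print(plugin.released)  # ['model'] (two changed fields, one shared trigger)
```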
    
    def cancel(self) -> None:
            """Request cooperative cancellation of the current execute() call.
            
            CR-4: default sets the substrate-tracked `_cancel_requested` flag and
            fires any callbacks registered via `register_cancel_callback`.
            
            Plugin authors who need extra teardown logic (signaling a subprocess,
            closing a network connection) override `cancel()` and SHOULD call
            `super().cancel()` to preserve the flag-setting + callback-fire
            behavior. The plugin's `execute()` polls via `check_cancel()` at safe
            interruption points and unwinds when it raises `PluginCancelledError`.
            """
            self._cancel_requested = True
            for cb in list(getattr(self, "_cancel_callbacks", ()))
    
    def check_cancel(self) -> None:
            """Raise `PluginCancelledError` if cancellation has been requested.
            
            CR-4 (SG-16 polling primitive): plugin authors call this at safe
            interruption points inside `execute()`. The substrate sets the flag via
            `cancel()` (typically driven by an operator's "Cancel" button); the
            next `check_cancel` call surfaces the cancellation as a typed exception
            that unwinds execute() cleanly.
            
            The substrate's worker /execute wrapper resets the flag before each
            call so cancellation doesn't leak from one job into the next.
            """
            if self._cancel_requested
    
    def register_cancel_callback(
            self,
            callback: Callable[[], None],  # Called when cancel() fires
        ) -> None
        "Register a callback that fires when cancel() is called.

CR-4 (SG-16 callback primitive): for plugins that can't easily insert
polling at strategic points (e.g., a plugin wrapping a blocking C
extension). Callbacks should be non-blocking and idempotent. Multiple
callbacks can be registered; all fire in registration order when
cancel() is invoked. A misbehaving callback that raises is logged
and skipped; the remaining callbacks still fire."
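To make the polling and callback primitives concrete, here is a small self-contained sketch. `CancellableDemo` is a hypothetical class and `PluginCancelledError` is redefined locally as a stand-in for the library's typed error; the log-and-skip handling follows the docstring above, but the exact logging call is an assumption.

```python
import logging
from typing import Callable, List

logger = logging.getLogger(__name__)


class PluginCancelledError(Exception):
    """Local stand-in for the library's typed cancellation error."""


class CancellableDemo:  # hypothetical, mirrors the documented behaviour only
    def __init__(self) -> None:
        self._cancel_requested = False
        self._cancel_callbacks: List[Callable[[], None]] = []

    def register_cancel_callback(self, callback: Callable[[], None]) -> None:
        self._cancel_callbacks.append(callback)

    def cancel(self) -> None:
        self._cancel_requested = True
        # Fire in registration order; a raising callback is logged and skipped.
        for cb in list(self._cancel_callbacks):
            try:
                cb()
            except Exception:
                logger.exception("cancel callback failed; continuing")

    def check_cancel(self) -> None:
        if self._cancel_requested:
            raise PluginCancelledError("cancellation requested")


def failing_callback() -> None:
    raise RuntimeError("boom")


demo = CancellableDemo()
fired: List[str] = []
demo.register_cancel_callback(lambda: fired.append("first"))
demo.register_cancel_callback(failing_callback)
demo.register_cancel_callback(lambda: fired.append("third"))
demo.cancel()
print(fired)  # ['first', 'third']

try:
    demo.check_cancel()  # what execute() would call at a safe interruption point
except PluginCancelledError:
    print("execute() would unwind here")
```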
    
    def cancel_signal_to(
            self,
            callback: Callable[[], None],  # Cancellation callback scoped to the with-block
        ) -> Generator[None, None, None]
        "Context manager registering `callback` for the duration of the with-block.

Useful for binding cancellation to a finite-scope resource (a subprocess,
a network request, a temporary file handle) without needing to
deregister manually. Pairs with `register_cancel_callback` for cases
where lifetime is tied to a `try:`/`finally:` block.

Example:

    def execute(self, *args, **kwargs):
        proc = subprocess.Popen(...)
        with self.cancel_signal_to(lambda: proc.terminate()):
            return proc.wait()"
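One plausible way to implement such a scoped registration is with `contextlib.contextmanager`, as sketched below. `ScopedCancelDemo` is hypothetical, and deregistering in a `finally` block is an assumption about the real method, chosen so the callback is removed even when the with-block raises.

```python
from contextlib import contextmanager
from typing import Callable, Generator, List


class ScopedCancelDemo:  # hypothetical stand-in for a plugin instance
    def __init__(self) -> None:
        self._cancel_callbacks: List[Callable[[], None]] = []

    def cancel(self) -> None:
        for cb in list(self._cancel_callbacks):
            cb()

    @contextmanager
    def cancel_signal_to(
        self, callback: Callable[[], None]
    ) -> Generator[None, None, None]:
        self._cancel_callbacks.append(callback)
        try:
            yield
        finally:
            # Deregister on exit, whether the block finished or raised.
            if callback in self._cancel_callbacks:
                self._cancel_callbacks.remove(callback)


demo = ScopedCancelDemo()
events: List[str] = []
with demo.cancel_signal_to(lambda: events.append("terminated")):
    demo.cancel()  # inside the block: the callback fires
demo.cancel()      # outside the block: nothing is registered any more
print(events)  # ['terminated']
```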
    
    def on_disable(self) -> None:
            """CR-2: signal that the substrate has marked this plugin as disabled.
            
            Worker stays alive; plugin can release heavy resources here (e.g., free
            GPU memory, close model files). The substrate fires this hook AFTER any
            in-flight job for this plugin finishes — see PluginManager.disable_plugin
            deferred-hook semantics. Default: no-op; plugins opt in by overriding.
            """
            pass
    
    def on_enable(self) -> None:
            """CR-2: signal that the substrate has marked this plugin as re-enabled.
            
            Plugin can eagerly re-acquire heavy resources here, or rely on lazy
            re-acquisition via the next execute() call (substrate doesn't prefer
            one strategy over the other). Default: no-op; plugins opt in by overriding.
            """
            pass
    
    def report_progress(
            self,
            progress: float, # 0.0 to 1.0, or -1.0 for indeterminate
            message: str = "" # Descriptive status message
        ) -> None
        "Report execution progress. Call during execute() to update status."
    
    def report_usage(
            self,
            usage: Dict[str, float],  # Measured usage for this execute, keyed by plugin-defined unit name
        ) -> None
        "SG-54: report measured API/service usage for the current execute() call.

Unit-agnostic by design: the plugin (which holds the API response)
supplies whatever unit names it measures, e.g. {"input_tokens": .., "output_tokens": ..}
for an LLM, {"pages": ..} for OCR, {"characters": ..} for TTS,
{"credits"/"requests"/"minutes": ..} for others. The substrate stores +
accumulates per unit name WITHOUT interpreting them (summed across runs in
the empirical store's api_usage_totals). Pricing is deliberately NOT here
(volatile, per-service, often not API-accessible); a consumer-side rate
table turns raw units into cost. "Derive from behaviour": the plugin
MEASURES actual usage from the response; the substrate aggregates.

Stored on `self._last_api_usage`; the worker exposes it via /stats and the
substrate folds it into the post-execute ResourceSample. The worker resets
it before each execute so a failed/usage-less call can't inherit stale
usage. Default: store-only (parallel to report_progress)."
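The split between unit-agnostic accumulation and consumer-side pricing could be illustrated as follows. `UsageTotals`, `estimate_cost`, and the rate values are all hypothetical; the real totals live in the empirical store's `api_usage_totals`, whose storage format is not shown here.

```python
from collections import defaultdict
from typing import Dict


class UsageTotals:  # hypothetical stand-in for per-unit accumulation
    def __init__(self) -> None:
        self.totals: Dict[str, float] = defaultdict(float)

    def add(self, usage: Dict[str, float]) -> None:
        # Sum per unit name without interpreting what the unit means.
        for unit, amount in usage.items():
            self.totals[unit] += amount


def estimate_cost(totals: Dict[str, float], rates: Dict[str, float]) -> float:
    """Consumer-side pricing: units without a known rate contribute nothing."""
    return sum(amount * rates.get(unit, 0.0) for unit, amount in totals.items())


store = UsageTotals()
store.add({"input_tokens": 1200, "output_tokens": 300})  # one LLM call
store.add({"input_tokens": 800, "output_tokens": 200})   # another LLM call
store.add({"pages": 3})                                   # an OCR call

print(dict(store.totals))
# {'input_tokens': 2000.0, 'output_tokens': 500.0, 'pages': 3.0}

# Made-up rates in arbitrary price units per token, for illustration only.
rates = {"input_tokens": 2.0, "output_tokens": 4.0}
print(estimate_cost(store.totals, rates))  # 6000.0
```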
class _CR4MinimalPlugin(PluginInterface):
    "Concrete plugin satisfying abstracts; relies on CR-4 default cleanup()."
    
    def name(self) -> str: return "cr4-minimal"
    
    @property
    def version(self) -> str: return "0.0.0"
    
    def initialize(self, config=None): self._cfg = dict(config or {})
    
    def execute(self, *args, **kwargs): return None
    
    def get_config_schema(self) -> Dict[str, Any]: return {}
    
    def get_current_config(self) -> Dict[str, Any]: return dict(getattr(self, "_cfg", {}))

Variables

RELOAD_TRIGGER = 'reload_trigger'

Plugin Manager (manager.ipynb)

Plugin discovery, loading, and lifecycle management system

Import

from cjm_plugin_system.core.manager import (
    PluginManager,
    PluginBinding
)

Functions

def register_system_monitor(
    self,
    plugin_name:str # Name of the system monitor plugin
) -> None
    "Bind a loaded plugin to act as the hardware system monitor."
def _get_global_stats(self) -> Dict[str, Any]: # Current system telemetry
    """Fetch real-time stats from the system monitor plugin (sync).
    
    CR-3: prefer typed `get_system_status()` over magic-string dispatcher.
    Duck-types because the substrate references `system_monitor` as a
    generic `PluginInterface` — CR-1's host-no-imports rule means substrate
    does not import `cjm-infra-plugin-system` to type-narrow the reference.
    Proxies after CR-3 expose `get_system_status` as a bound method that
    POSTs to `/get_system_status` and returns `Optional[Dict[str, Any]]`.
    """
    if not self.system_monitor
async def _get_global_stats_async(self) -> Dict[str, Any]: # Current system telemetry
    """Fetch real-time stats from the system monitor plugin (async).
    
    Same CR-3 duck-type semantics as the sync variant. Async variant exists
    because the substrate's `execute_plugin_async` path (CR-2 + CR-10) needs
    a non-blocking stats fetch when scheduling under an asyncio event loop.
    """
    if not self.system_monitor
def _check_interface_fqn(
    self,
    iface_fqn:str, # Interface FQN string from the manifest
    plugin_name:str # For error messages
) -> bool: # True if FQN passes the format check
    """
    SG-7: sanity-check the interface FQN format at discovery time.
    
    Substrate cannot actually import every interface library (the host
    env doesn't carry them all), so this is a format-only check. The
    authoritative import check happens at install time in
    _generate_manifest's introspection script.
    """
def _parse_taxonomy(
    self,
    manifest: Dict[str, Any]  # Loaded manifest dict
) -> Optional[PluginTaxonomy]
    """
    CR-1: parse the manifest's taxonomy block into a PluginTaxonomy.
    
    Returns None when the manifest predates CR-1 (no taxonomy block).
    Used for callers that have a flat dict (post-CR-8, discover_manifests
    uses load_manifest's typed PluginTaxonomy directly and bypasses this).
    """
def _parse_resources(
    self,
    manifest: Dict[str, Any]  # Loaded manifest dict
) -> Optional[ResourceRequirements]
    "Phase 5a: parse the manifest's resources block into a ResourceRequirements."
def _derive_category(
    self,
    manifest: Dict[str, Any],  # Loaded manifest dict (flat-shaped)
    taxonomy: Optional[PluginTaxonomy]  # Parsed taxonomy (or None for legacy manifests)
) -> str
    """
    CR-1: derive the human-readable category label.
    
    Resolution: category_override > Title-Case of taxonomy.domain (with
    underscores → spaces) > legacy `category` field > empty.
    `category_override` lives at top level on v1.0 flat manifests only
    (v2.0 layout has no slot — CR-1 closed decision deferred a custom
    label channel until a concrete consumer asks for one).
    """
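The resolution order in this docstring can be sketched as a plain function. This is an illustration only: the real method receives a `PluginTaxonomy`, whereas the sketch takes the domain as an optional string, and the function name `derive_category` is hypothetical.

```python
from typing import Any, Dict, Optional


def derive_category(manifest: Dict[str, Any], domain: Optional[str]) -> str:
    """category_override > Title-Cased domain > legacy `category` > empty string."""
    override = manifest.get("category_override")
    if override:
        return override
    if domain:
        # Underscores become spaces, then each word is capitalised.
        return domain.replace("_", " ").title()
    return manifest.get("category") or ""


print(derive_category({"category_override": "Speech"}, "speech_to_text"))  # Speech
print(derive_category({}, "speech_to_text"))                               # Speech To Text
print(derive_category({"category": "transcription"}, None))                # transcription
print(repr(derive_category({}, None)))                                     # ''
```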
def discover_manifests(self) -> List[PluginMeta]: # List of discovered plugin metadata
    """Discover plugins via JSON manifests in search paths.
    
    CR-8: reads each manifest via `load_manifest`, which transparently parses
    both v2.0 nested + legacy v1.0 flat layouts into a typed `ManifestV2`.
    `meta.manifest` is set to a flat-shaped dict view so existing consumers
    (proxy, scheduling, execute path) continue working unchanged; the typed
    `ManifestV2` is also attached as `meta.manifest_v2` so drift detection
    + future typed callers can read `drift_tracking.config_schema_hash`
    without re-parsing.
    """
    self.discovered = []
    seen_plugins = set()

    for base_path in self.search_paths
def get_discovered_by_category(
    self,
    category:str # Category to filter by (e.g., "transcription")
) -> List[PluginMeta]: # List of matching discovered plugins
    "Get discovered plugins filtered by category."
def get_plugins_by_category(
    self,
    category:str # Category to filter by (e.g., "transcription")
) -> List[PluginMeta]: # List of matching loaded plugins
    "Get loaded plugins filtered by category."
def get_discovered_categories(self) -> List[str]: # List of unique categories
    "Get all unique categories among discovered plugins."
def get_loaded_categories(self) -> List[str]: # List of unique categories
    "Get all unique categories among loaded plugins."
def get_plugin_meta(
    self,
    plugin_name:str # Name of the plugin
) -> Optional[PluginMeta]: # Plugin metadata or None
    "Get metadata for a loaded plugin by name."
def get_discovered_meta(
    self,
    plugin_name:str # Name of the plugin
) -> Optional[PluginMeta]: # Plugin metadata or None
    "Get metadata for a discovered (not necessarily loaded) plugin by name."
def _extract_defaults_from_schema(
    self,
    config_schema:Optional[Dict[str, Any]] # JSON Schema with properties
) -> Dict[str, Any]: # Default values extracted from schema
    "Extract default values from a JSON Schema's properties."
def _validate_config_against_schema(
    """
    SG-5: validate a config dict against the manifest's `config_schema`
    before forwarding to the plugin's `initialize()`.
    """
def _check_config_schema_drift(
    self,
    proxy:Any, # RemotePluginProxy with a live worker
    plugin_meta:PluginMeta, # Metadata to flag if drift is detected
) -> None
    """
    SG-9 + CR-8: compare live worker `/config_schema` to the stored hash.
    
    Reads the stored hash from `plugin_meta.manifest_v2.drift_tracking.config_schema_hash`
    (populated by `discover_manifests`). Computes the live hash with
    `compute_config_schema_hash` and compares — drift = hashes differ.
    
    Honors `cfg.substrate.drift_detection` opt-out from `cjm.yaml`: hosts
    that don't want the per-load `/config_schema` HTTP call can disable
    detection there. Default is on.
    
    Test fixtures that stub `meta.manifest = {}` without going through
    `discover_manifests` won't have a `manifest_v2` attribute; the
    `getattr(..., None)` fallback yields `stored_hash=None`, which doesn't
    match any live hash — those tests don't expose a real proxy so the
    drift warning fires harmlessly.
    """
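The hash comparison might be pictured like this. `schema_hash` is a hypothetical stand-in for `compute_config_schema_hash` (the real hashing scheme is not documented in this section; SHA-256 over key-sorted JSON is an assumption), and the `enabled` flag stands in for the `cfg.substrate.drift_detection` opt-out.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def schema_hash(schema: Dict[str, Any]) -> str:
    """Assumed scheme: SHA-256 over canonical (key-sorted, compact) JSON."""
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def has_drifted(
    live_schema: Dict[str, Any], stored_hash: Optional[str], enabled: bool = True
) -> bool:
    if not enabled:
        return False  # detection switched off by the host
    # A stored hash of None never matches, so it reports drift.
    return schema_hash(live_schema) != stored_hash


stored = schema_hash({"type": "object", "properties": {"model": {"type": "string"}}})
same = {"properties": {"model": {"type": "string"}}, "type": "object"}
grown = {
    "type": "object",
    "properties": {"model": {"type": "string"}, "device": {"type": "string"}},
}

print(has_drifted(same, stored))                  # False (key order is irrelevant)
print(has_drifted(grown, stored))                 # True
print(has_drifted(same, None))                    # True
print(has_drifted(grown, stored, enabled=False))  # False
```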
def _persist_config(
    self,
    plugin_name: str  # Plugin to persist
) -> None
    """
    CR-2: write current PluginMeta state + live worker config to the store.
    
    Reads `meta.enabled` (the substrate-authoritative flag) and the worker's
    current_config (when reachable). Failures are logged + swallowed —
    persistence is a best-effort side-channel, not a correctness invariant.
    """
def _maybe_fire_disable_hook(
    self,
    name_or_id: str  # instance_id (or legacy plugin_name) whose in-flight job just finished
) -> None
    """
    CR-2 + CR-10: fire deferred on_disable for `name_or_id` if pending.
    
    Idempotent. Resolves via self.instances first; falls back to
    self.plugins[name].instance for legacy code paths.
    """
def _validate_instance_id(self, instance_id: str) -> None:
    """Reject malformed explicit instance_ids at load time.
    
    Pattern: alphanumeric + underscore + hyphen, length 1..64. Raises
    ValueError on invalid input so the caller sees the constraint failure
    immediately rather than at first execute / unload.
    """
    import re as _re
    if not isinstance(instance_id, str)
def _generate_instance_id(self, plugin_name: str) -> str:
    """Generate a unique instance_id of form `{plugin_name}-{6-char-hex}`.
    
    Used when load_plugin is called with new_instance=True and no explicit
    instance_id. Retries up to 16 times if a collision occurs in self.instances.
    """
    import secrets as _secrets
    for _ in range(16)
def get_instance(
    self,
    name_or_id: str  # Plugin name (default-loaded) or explicit instance_id
) -> Optional[PluginInstance]
    """
    Return the PluginInstance for `name_or_id`, or None if not loaded.
    
    Lookup is keyed by instance_id (which equals plugin_name for default-
    loaded plugins). Multi-instance IDs only exist in self.instances.
    """
def list_instances(
    "List all loaded instances, optionally filtered by underlying plugin name."
def _worker_env_specs(
    self,
    plugin_meta: PluginMeta  # Plugin whose WORKER_ENV contract to read
) -> List[Dict[str, Any]]:  # List of EnvVarSpec-as-dict entries (possibly empty)
    """
    Return a plugin's WORKER_ENV contract as spec dicts (CR-12).
    
    Prefers the typed manifest_v2 code section; falls back to the flat manifest
    dict view. Empty list when the plugin declares no worker-env contract.
    """
def _resolve_worker_env(
    self,
    plugin_meta: PluginMeta,        # Plugin being loaded
    scope: Optional[str] = None     # SG-55 forward seam: per-principal scope (None = single-user)
) -> Dict[str, str]:  # {ENV_NAME: value} overlay injected into the worker at spawn
    """
    CR-12: compose the resolved worker-env overlay for a load.
    
    Secrets resolve from the SecretStore keyed by plugin_name — so every
    instance of a plugin shares one credential (CR-10: two Gemini instances,
    one GEMINI_API_KEY). A missing secret is OMITTED (the worker spawns without
    it; the plugin reports the gap at execute) rather than injected empty.
    
    Visible vars resolve from their declared `default` (the operator-override
    store is a deferred source; the manifest's static `install.env_vars` already
    injects fixed visible defaults via the proxy, so this only adds WORKER_ENV
    defaults not already covered there). All values are fixed at spawn — a
    change requires `reload_plugin`.
    """
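A rough sketch of the overlay composition follows. The function name, the plain `secrets` dict (standing in for one plugin's entries in the SecretStore), the `static_env` argument (standing in for `install.env_vars`), and the example variable names other than `GEMINI_API_KEY` are all assumptions made for illustration.

```python
from typing import Any, Dict, List


def resolve_worker_env(
    specs: List[Dict[str, Any]],  # WORKER_ENV spec dicts
    secrets: Dict[str, str],      # stand-in for the plugin's stored secrets
    static_env: Dict[str, str],   # stand-in for the manifest's install.env_vars
) -> Dict[str, str]:
    overlay: Dict[str, str] = {}
    for spec in specs:
        name = spec["name"]
        if spec.get("secret"):
            value = secrets.get(name)
            if value:  # a missing secret is omitted, never injected empty
                overlay[name] = value
        elif name not in static_env and spec.get("default") is not None:
            # Only add visible defaults the static env does not already cover.
            overlay[name] = str(spec["default"])
    return overlay


specs = [
    {"name": "GEMINI_API_KEY", "secret": True, "required": True},
    {"name": "DEMO_CACHE_DIR", "secret": False, "default": "/tmp/demo"},
    {"name": "DEMO_LOG_LEVEL", "secret": False, "default": "INFO"},
]
static_env = {"DEMO_LOG_LEVEL": "DEBUG"}

print(resolve_worker_env(specs, {}, static_env))
# {'DEMO_CACHE_DIR': '/tmp/demo'}
print(resolve_worker_env(specs, {"GEMINI_API_KEY": "k-123"}, static_env))
# {'GEMINI_API_KEY': 'k-123', 'DEMO_CACHE_DIR': '/tmp/demo'}
```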
def get_worker_env_status(
    self,
    name_or_meta: Any,              # Plugin name (loaded/discovered) or a PluginMeta
    scope: Optional[str] = None     # SG-55 forward seam
) -> List[Dict[str, Any]]:  # Per-entry status dicts (secret values never returned)
    """
    CR-12: per-entry satisfaction status of a plugin's worker-env contract.
    
    Each entry: {name, secret, required, satisfied, label, description}.
    `satisfied` means a value is resolvable (secret present in the store, or a
    visible var has a default/override). Secret VALUES are never returned — only
    whether one is set. The plugin-config UI uses this to gate config display on
    required secrets being satisfied.
    """
def missing_required_env(
    self,
    name_or_meta: Any,              # Plugin name or PluginMeta
    scope: Optional[str] = None     # SG-55 forward seam
) -> List[str]:  # Names of required worker-env entries with no resolvable value
    "CR-12: names of required worker-env entries that are unsatisfied."
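The status report and the missing-entries helper could relate to each other as in this sketch. Both functions take plain dicts instead of a plugin name or `PluginMeta`, which is a simplification; the entry keys follow the documented `{name, secret, required, satisfied, label, description}` shape, and override handling is left out.

```python
from typing import Any, Dict, List


def worker_env_status(
    specs: List[Dict[str, Any]], secrets: Dict[str, str]
) -> List[Dict[str, Any]]:
    status = []
    for spec in specs:
        if spec.get("secret"):
            satisfied = bool(secrets.get(spec["name"]))  # presence only
        else:
            satisfied = spec.get("default") is not None
        status.append({
            "name": spec["name"],
            "secret": bool(spec.get("secret")),
            "required": bool(spec.get("required")),
            "satisfied": satisfied,  # the secret value itself is never included
            "label": spec.get("label", ""),
            "description": spec.get("description", ""),
        })
    return status


def missing_required_env(
    specs: List[Dict[str, Any]], secrets: Dict[str, str]
) -> List[str]:
    return [
        entry["name"]
        for entry in worker_env_status(specs, secrets)
        if entry["required"] and not entry["satisfied"]
    ]


specs = [
    {"name": "GEMINI_API_KEY", "secret": True, "required": True},
    {"name": "DEMO_CACHE_DIR", "secret": False, "required": False, "default": "/tmp/demo"},
]
print(missing_required_env(specs, {}))                           # ['GEMINI_API_KEY']
print(missing_required_env(specs, {"GEMINI_API_KEY": "k-123"}))  # []
```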
def set_plugin_secret(
    self,
    name_or_id: str,             # Plugin name or instance_id whose secret to set
    key: str,                    # Secret key (the env-var name, e.g. "GEMINI_API_KEY")
    value: str,                  # Secret value (stored via the SecretStore, never config/logs)
    *,
    scope: Optional[str] = None, # SG-55 forward seam: per-principal scope
    reload: bool = True          # Respawn loaded worker(s) so the new env is injected
) -> bool:  # True if the secret was stored
    """
    CR-12: store a plugin secret, then respawn its worker(s) to inject it.
    
    Secrets are keyed by the underlying PLUGIN name (not instance_id), so all
    instances of a plugin share one credential — set the Gemini key once and
    every Gemini instance gets it at (re)spawn. Because worker env is fixed at
    spawn, the new value only reaches a *running* worker via a RESPAWN, so this
    reloads each loaded instance of the plugin (unless `reload=False`, e.g. when
    provisioning a secret before the plugin is loaded). This is the
    actuation seam both the CLI (`cjm-ctl set-secret`) and a future config UI
    call. Reload failures are logged, not raised.
    """
def load_plugin(
    self,
    plugin_meta:PluginMeta, # Plugin metadata (with manifest attached)
    config:Optional[Dict[str, Any]]=None, # Initial configuration
    strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
    instance_id:Optional[str]=None, # CR-10: explicit instance_id; None defaults to plugin_name
    new_instance:bool=False, # CR-10: auto-generate `{name}-{hex}` instance_id (with instance_id=None)
    max_concurrent_requests:Optional[int]=None # SG-33 (CR-7): per-instance async concurrency cap; None = unbounded
) -> bool: # True if successfully loaded
    """
    Load a plugin by spawning a Worker subprocess.
    
    CR-2: reads the persisted PluginConfigRecord from `self.config_store`
    before launching the worker. If a persisted record exists and the
    caller didn't pass an explicit config, the persisted config is used
    as the effective input. The persisted `enabled` flag is applied to
    `plugin_meta.enabled` so disabled plugins stay disabled across
    process restarts.
    
    CR-10: optional `instance_id` allows multi-instance loading.
    - instance_id=None, new_instance=False (default): instance_id =
      plugin_meta.name. Populates self.plugins[plugin_name] + self.instances
      [plugin_name] together (single-instance backward compat).
    - instance_id="custom": validated against `[A-Za-z0-9_-]{1,64}`. Populates
      self.instances[custom]. Persistence is keyed by plugin_name and only
      applied to the default instance.
    - instance_id=None, new_instance=True: auto-generates `{name}-{6-hex}`.
    Idempotent: re-load against an existing instance_id returns True without
    re-spawning.
    
    CR-7: computes `config_hash` from the effective config (post-defaults +
    post-validation) and stores it on the PluginInstance so execute_plugin*
    can key empirical samples by (instance_id, config_hash). SG-33 stores
    `max_concurrent_requests` on the instance — the actual asyncio.Semaphore
    is lazy-created in execute_plugin_async via `_get_concurrent_limiter`.
    """
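The three `instance_id` modes can be sketched with standalone helpers. The function names and the `existing` set (standing in for the keys of `self.instances`) are hypothetical, and raising `RuntimeError` after 16 collisions is an assumption, since the documentation does not say what happens when retries are exhausted.

```python
import re
import secrets
from typing import Optional, Set

_INSTANCE_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


def validate_instance_id(instance_id: object) -> None:
    if not isinstance(instance_id, str) or not _INSTANCE_ID_RE.fullmatch(instance_id):
        raise ValueError(f"invalid instance_id: {instance_id!r}")


def generate_instance_id(plugin_name: str, existing: Set[str]) -> str:
    for _ in range(16):
        candidate = f"{plugin_name}-{secrets.token_hex(3)}"  # 3 bytes = 6 hex chars
        if candidate not in existing:
            return candidate
    raise RuntimeError("could not generate a unique instance_id")  # assumed behaviour


def resolve_instance_id(
    plugin_name: str,
    existing: Set[str],
    instance_id: Optional[str] = None,
    new_instance: bool = False,
) -> str:
    if instance_id is not None:
        validate_instance_id(instance_id)  # explicit id: validated, then used as-is
        return instance_id
    if new_instance:
        return generate_instance_id(plugin_name, existing)
    return plugin_name  # default: the instance_id equals the plugin name


print(resolve_instance_id("demo", set()))                        # demo
print(resolve_instance_id("demo", set(), instance_id="gpu_0"))   # gpu_0
print(resolve_instance_id("demo", {"demo"}, new_instance=True))  # e.g. demo-3fa91c
```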
def load_all(
    self,
    configs:Optional[Dict[str, Dict[str, Any]]]=None # Plugin name -> config mapping
) -> Dict[str, bool]: # Plugin name -> success mapping
    "Discover and load all available plugins."
def unload_plugin(
    self,
    name_or_id:str # Plugin name (default-loaded) or instance_id (multi-instance)
) -> bool: # True if successfully unloaded
    """
    Unload a plugin instance and terminate its Worker process (CR-10).
    
    If name_or_id resolves to the default instance (instance_id == plugin_name)
    and no other instances remain for the same plugin, also removes the
    PluginMeta from self.plugins. Otherwise removes only the instance and
    clears PluginMeta.instance if it pointed at the unloaded canonical.
    """
def unload_all(self) -> None:
    """Unload all plugin instances and terminate all Worker processes (CR-10).
    
    Iterates self.instances (CR-10 keying) rather than self.plugins so all
    multi-instance entries get torn down, not just the canonical instances.
    """
    for inst_id in list(self.instances.keys())
def get_plugin(
    self,
    name_or_id:str # Plugin name (default-loaded) or instance_id (multi-instance)
) -> Optional[PluginInterface]: # Plugin proxy instance or None
    """
    Get a loaded plugin's proxy by name or instance_id (CR-10).
    
    Lookup order: self.instances first (covers both default plugin_name and
    multi-instance IDs), falling back to PluginMeta.instance for any
    legacy code path that populated self.plugins without self.instances
    (defensive — shouldn't happen post-CR-10 since load_plugin always
    records the instance).
    """
def list_plugins(self) -> List[PluginMeta]: # List of loaded plugin metadata
    "List all loaded plugins."
def _get_sysmon_plugin(self) -> Optional[Any]:
    """Resolve the configured MonitorPlugin (CR-3) for GPU subtree attribution.

    Returns the loaded plugin instance keyed by `sysmon_plugin_name`, or
    None when no sysmon is configured / hasn't been loaded yet. Lazy
    resolution against `self.plugins` tolerates load-order: the manager
    can be constructed before the sysmon plugin is loaded; later
    `_record_sample_safe` calls pick it up automatically.
    """
def _record_sample_safe(self, inst:PluginInstance, start_time:float, success:bool) -> None:
    """
    CR-7: best-effort empirical sample recording.
    
    Captures worker stats at end-of-execute (proxy of peak), builds a
    ResourceSample, and records it via the EmpiricalResourceStore. Failures
    log + swallow — sample recording must never break the execute path
    (matches CR-2's `_persist_config` best-effort discipline).
    
    Stats fetch can fail naturally (e.g. worker died with WorkerOOMError —
    the proxy is unreachable). The sample still records with zero stats +
    success=False so we have a record of the failed attempt for the
    success_rate aggregate.
    
    GPU memory is attributed across the worker's process subtree via
    `attribute_gpu_to_worker_subtree` (intersecting worker-reported
    `subtree_pids` with sysmon's per-PID GPU enumeration). Pre-fix this
    function read `worker_stats["gpu_memory_mb"]` — a key the worker `/stats`
    endpoint NEVER emits — so EmpiricalResourceRecord.gpu_memory_mb_peak_max
    was silently 0 for every plugin since CR-7 shipped, not just for
    subprocess-spawning ones. When no sysmon is configured, GPU memory
    records as 0.0 (honest signal that we can't measure it).
    """
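The "log + swallow" discipline described above can be illustrated with a minimal, self-contained sketch. It is not the library's implementation: `SampleSketch`, `record_sample_best_effort`, the `memory_mb` stats key, and the list-based sink are hypothetical stand-ins for `ResourceSample`, `_record_sample_safe`, the worker stats payload, and the `EmpiricalResourceStore`.

```python
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict

logger = logging.getLogger("sample_recording_sketch")


@dataclass
class SampleSketch:
    """Hypothetical stand-in for ResourceSample; field names are illustrative."""
    duration_s: float
    memory_mb: float
    gpu_memory_mb: float
    success: bool


def record_sample_best_effort(
    fetch_stats: Callable[[], Dict[str, Any]],  # May raise if the worker is gone
    sink: Any,                                  # Anything with .append(); stands in for the store
    start_time: float,                          # time.monotonic() taken before execute
    success: bool,                              # Whether the execute call succeeded
) -> None:
    """Record one sample; never let a recording failure escape to the caller."""
    try:
        try:
            stats = fetch_stats()
        except Exception:
            # Worker unreachable (e.g. it died): keep the attempt, with zero stats.
            stats = {}
        sink.append(SampleSketch(
            duration_s=max(0.0, time.monotonic() - start_time),
            memory_mb=float(stats.get("memory_mb", 0.0)),
            gpu_memory_mb=0.0,  # Assumption: no sysmon configured in this sketch
            success=success,
        ))
    except Exception:
        # Best-effort: log and swallow so the execute path is unaffected.
        logger.exception("sample recording failed; continuing")
```

Calling such a helper from a `finally` block means a failed execute still leaves a `success=False` record behind, which is what keeps a success-rate aggregate honest.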
def _get_concurrent_limiter(self, instance_id:str) -> Optional[asyncio.Semaphore]:
    """SG-33 (CR-7): lazy-create the per-instance asyncio.Semaphore.
    
    Returns None when the instance has no `max_concurrent_requests` set (the
    default — unbounded). Otherwise creates the semaphore on first call and
    caches it in `self._concurrent_limiters`. Semaphores are bound to the
    event loop they were created in; lazy creation inside `execute_plugin_async`
    ensures we're inside the right loop at construction time (Python 3.10+
    semaphore-loop-binding rules).
    
    Defensive: returns None if the manager was constructed via __new__ without
    `_concurrent_limiters` being populated (test-fixture pattern).
    """
def _reactive_evict_for(
    self,
    needed_meta:PluginMeta,
    shortfall:Optional[Any]=None,  # Optional ResourceShortfall from Track B; informational only
) -> bool:
    """
    CR-7: try to free resources after a PluginResourceError during execute.
    
    Wraps `_evict_for_resources` with reactive-flow logging. `_evict_for_resources`
    itself extends to multi-axis + cost-aware candidate selection (drops the
    GPU-only filter, prefers evicting empirically-expensive idle plugins).
    
    `shortfall` is recorded for log context but doesn't currently steer
    candidate selection beyond what _evict_for_resources already does via
    its `needed_meta.resources` check. A future enhancement could pass it
    through for axis-specific candidate filtering.
    """
def _evict_for_resources(self, needed_meta:PluginMeta) -> bool:
    """
    Attempt to free resources by unloading/releasing idle plugins (LRU).
    
    CR-7: extended from GPU-only LRU to multi-axis cost-aware eviction.
    - Candidate set: any loaded plugin that isn't the one we're allocating
      for (drops the pre-CR-7 `requires_gpu` filter).
    - Sort key: primary = idle (older last_executed first, classic LRU);
      secondary = empirical cost when available (highest peak gets evicted
      first among equally-idle candidates). Cost axis follows the needed
      plugin's `resources.requires_gpu` flag — GPU peak when we're freeing
      for a GPU plugin, system memory peak otherwise.
    
    Without empirical data (no store / unmeasured plugin), the secondary
    key is 0.0 and pure LRU applies. Cost-aware selection is opt-in via
    `empirical_tracking: true`.
    """
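The two-level sort key described above can be sketched as follows. This illustrates the ordering only, under stated assumptions: `Candidate`, `eviction_order`, and the `peak_cost_mb` mapping are hypothetical names, and the real method also performs the unload/release step, which is omitted here.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Candidate:
    """Hypothetical view of a loaded plugin for eviction purposes."""
    name: str
    last_executed: float  # Timestamp of the last execute; smaller means idle longer


def eviction_order(
    candidates: List[Candidate],
    needed_name: str,                # Plugin we are allocating for; never evicted
    peak_cost_mb: Dict[str, float],  # Empirical peak per plugin; missing means unmeasured
) -> List[str]:
    """Return candidate names in the order they would be considered for eviction."""
    pool = [c for c in candidates if c.name != needed_name]
    ordered = sorted(
        pool,
        # Primary: oldest last_executed first (classic LRU).
        # Secondary: highest empirical peak first among equally idle candidates.
        key=lambda c: (c.last_executed, -peak_cost_mb.get(c.name, 0.0)),
    )
    return [c.name for c in ordered]
```

With an empty `peak_cost_mb` mapping every secondary key is `0.0`, so the ordering degrades to pure LRU, matching the fallback the docstring describes.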
def execute_plugin(
    self,
    name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
    *args,
    **kwargs
) -> Any: # Plugin result
    """
    Execute a plugin instance's main functionality (sync).
    
    CR-10: resolves `name_or_id` via self.instances; per-instance enabled
    flag gates execution. `_running_executions` tracks by instance_id so
    concurrent multi-instance executes don't collide.
    
    CR-2: raises PluginDisabledError (typed) when the instance is disabled.
    
    CR-7: reactive retry on PluginResourceError — evicts other plugins to
    free resources, then ALWAYS reloads the failing plugin's worker before
    the retry attempt. Track A (WorkerOOMError — worker died from SIGKILL)
    needs the reload because there's no live worker to retry on. Track B
    (plugin-raised PluginResourceError — worker still alive) ALSO reloads
    because PyTorch's CUDA caching allocator can fragment post-OOM in ways
    the plugin can't clean up from within its own process; a fresh worker
    is the only reliable reset. Bounded by `self.max_retries` (default 1).
    Empirical sample recorded in the finally block — best-effort, doesn't
    break execute on failure.
    """
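A minimal sketch of the bounded evict, reload, retry flow described above, assuming the three steps can be modelled as plain callables. `ResourceErrorSketch` and `execute_with_reactive_retry` are hypothetical names; the real method additionally handles scheduling, the enabled check, and sample recording, and whether it reloads when eviction frees nothing is not stated here.

```python
from typing import Any, Callable


class ResourceErrorSketch(Exception):
    """Hypothetical stand-in for PluginResourceError."""


def execute_with_reactive_retry(
    run: Callable[[], Any],      # One execute attempt against the worker
    evict: Callable[[], Any],    # Frees resources held by other plugins
    reload: Callable[[], Any],   # Restarts the failing plugin's worker
    max_retries: int = 1,        # Upper bound on retries after the first attempt
) -> Any:
    """Run once; on a resource error, evict + reload, then retry up to max_retries."""
    attempt = 0
    while True:
        try:
            return run()
        except ResourceErrorSketch:
            if attempt >= max_retries:
                raise  # Retry budget exhausted: surface the error to the caller
            attempt += 1
            evict()
            # Assumption in this sketch: reload unconditionally before retrying,
            # mirroring the "always reloads" wording above.
            reload()
```

With `max_retries=1` the worker is attempted at most twice, so a persistent resource failure still reaches the caller rather than looping.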
async def execute_plugin_async(
    self,
    name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
    *args,
    **kwargs
) -> Any: # Plugin result
    """
    Execute a plugin instance's main functionality (async).
    
    CR-10 + CR-2: same semantics as execute_plugin, async-flavored. Scheduler
    allocation goes through allocate_async for non-blocking polling.
    
    CR-7 + SG-33: reactive retry on PluginResourceError — always reloads
    before retry (Track A + Track B converge on the same reload path; see
    sync variant docstring for the rationale). Per-instance asyncio.Semaphore
    enforces the `max_concurrent_requests` cap (None = unbounded). Empirical
    sample recorded in the finally block.
    """
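The per-instance concurrency cap can be sketched with a lazily created `asyncio.Semaphore`. `LimiterRegistry`, `run_limited`, and `peak_concurrency` are hypothetical names for illustration; the manager's own bookkeeping (`_concurrent_limiters`, `_get_concurrent_limiter`) is only described in prose above, so this is an approximation of the idea rather than its code.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional


class LimiterRegistry:
    """Hypothetical holder of per-instance caps and lazily created semaphores."""

    def __init__(self, caps: Dict[str, Optional[int]]):
        self._caps = caps
        self._limiters: Dict[str, asyncio.Semaphore] = {}

    def get(self, instance_id: str) -> Optional[asyncio.Semaphore]:
        cap = self._caps.get(instance_id)
        if cap is None:
            return None  # No cap configured: unbounded
        if instance_id not in self._limiters:
            # Created on first use, i.e. from inside the running event loop.
            self._limiters[instance_id] = asyncio.Semaphore(cap)
        return self._limiters[instance_id]


async def run_limited(
    registry: LimiterRegistry,
    instance_id: str,
    work: Callable[[], Awaitable[Any]],
) -> Any:
    """Run `work` under the instance's semaphore, if it has one."""
    limiter = registry.get(instance_id)
    if limiter is None:
        return await work()
    async with limiter:
        return await work()


async def peak_concurrency(cap: Optional[int], n_calls: int) -> int:
    """Launch n_calls overlapping jobs and report the highest observed overlap."""
    registry = LimiterRegistry({"demo": cap})
    active = 0
    peak = 0

    async def work() -> None:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1

    await asyncio.gather(*(run_limited(registry, "demo", work) for _ in range(n_calls)))
    return peak
```

`asyncio.run(peak_concurrency(2, 6))` exercises the capped path and `asyncio.run(peak_concurrency(None, 6))` the unbounded one.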
def enable_plugin(
    self,
    name_or_id:str # Plugin name (default instance) or instance_id (multi-instance)
) -> bool: # True if instance was enabled
    """
    Enable a plugin instance (CR-10 multi-instance aware).
    
    CR-2: persists the new state via `config_store` (default-instance only;
    persistence is per-plugin, not per-instance) and fires the plugin's
    on_enable hook on state-change. Idempotent for already-enabled instances.
    """
def disable_plugin(
    self,
    name_or_id:str # Plugin name (default instance) or instance_id (multi-instance)
) -> bool: # True if instance was disabled
    """
    Disable a plugin instance without unloading it (CR-10 multi-instance aware).
    
    CR-2: persists the new state (default-instance only) and fires the
    plugin's on_disable hook — but defers the hook until any in-flight job
    for THIS instance finishes (the per-instance `_running_executions` key
    is the instance_id, so a concurrent execute on a different instance of
    the same plugin doesn't gate this instance's hook).
    """
def get_plugin_logs(
    self,
    plugin_name:str, # Name of the plugin
    lines:int=50 # Number of lines to return
) -> str: # Log content
    "Read the last N lines of the plugin's log file."
def get_plugin_config(
    self,
    plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # Current configuration or None
    "Get the current configuration of a plugin."
def get_plugin_config_schema(
    self,
    plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # JSON Schema or None
    "Get the configuration JSON Schema for a plugin."
def get_config_options(
    self,
    name_or_id: str # Plugin name (default instance) or instance_id (multi-instance)
) -> Dict[str, Any]: # CR-11: live config option domains, or {} if unavailable
    """
    Get a plugin instance's runtime config option providers (CR-11).
    
    Forwards to the worker's get_config_options() - live enum domains +
    per-option metadata for dynamic config fields (e.g. an API model list).
    Kept separate from get_plugin_config_schema (static, hashed for CR-8 drift);
    these options are the live companion the plugin-config UI merges on top.
    
    Degrades to {} if the instance is missing or the worker call fails - the UI
    then falls back to the static schema. Typed-error surfacing for the UI
    consumer is deferred to the plugin-config UI library (Path C Step 4).
    """
def get_all_plugin_configs(self) -> Dict[str, Dict[str, Any]]: # Plugin name -> config mapping
    """Get current configuration for all loaded plugins."""
def update_plugin_config(
    self,
    name_or_id: str, # Plugin name (default instance) or instance_id (multi-instance)
    config: Dict[str, Any], # New configuration values
    strict: bool = True # SG-5: reject unknown keys against manifest config_schema (default)
) -> bool: # True if successful
    """
    Update a plugin instance's configuration (CR-10 multi-instance aware).
    
    CR-2: on successful reconfigure, persists the new config (default instance
    only; multi-instance loads don't persist). Per-instance `inst.config` is
    updated regardless.
    SG-5: validates against the underlying plugin's config_schema (per-plugin,
    not per-instance, so all instances share the same schema).
    """
def reload_plugin(
    self,
    name_or_id: str, # Plugin name (default instance) or instance_id (multi-instance)
    config: Optional[Dict[str, Any]] = None # Optional new configuration
) -> bool: # True if successful
    "Reload a plugin instance by terminating and restarting its Worker (CR-10)."
def get_plugin_stats(
    self,
    name_or_id: str # Plugin name (default instance) or instance_id (multi-instance)
) -> Optional[Dict[str, Any]]: # Resource telemetry or None
    "Get resource usage stats for a plugin instance's Worker process (CR-10)."
async def execute_plugin_stream(
    self,
    name_or_id: str,  # Plugin name (default instance) or instance_id (multi-instance)
    *args,
    **kwargs
) -> AsyncGenerator[Any, None]:  # Async generator yielding results
    """
    Execute a plugin instance with streaming response (CR-10 multi-instance aware).
    
    Same per-instance resolution as execute_plugin_async; scheduler allocation
    keys off the PluginMeta (plugin-level), execution + bookkeeping key off
    the PluginInstance (per-instance).
    """
async def load_plugin_async(
    self,
    plugin_meta: PluginMeta,
    config: Optional[Dict[str, Any]] = None,
    strict: bool = True,
    instance_id: Optional[str] = None,
    new_instance: bool = False,
) -> bool:
    """
    Async variant of `load_plugin` (CR-10b).
    
    Runs the existing sync `load_plugin` via `asyncio.to_thread` so the
    blocking proxy spawn + `_wait_for_ready` doesn't stall the event loop.
    Backward compat: identical behavior to the sync method, just non-blocking.
    """
async def unload_plugin_async(
    self,
    name_or_id: str,
) -> bool:
    "Async variant of `unload_plugin` (CR-10b)."
def _spec_requested_key(spec: PluginLoadSpec, index: int) -> str:
    """Derive the dict key the load_plugins_concurrent result uses for `spec`.
    
    Resolution: explicit `instance_id` > `meta.name` + `#new[{index}]` suffix
    for ambiguous new_instance=True specs > `meta.name`. The suffix prevents
    key collision when multiple specs request a new instance of the same plugin
    without explicit instance_ids.
    """
async def load_plugins_concurrent(
    self,
    specs: List[PluginLoadSpec],  # Per-plugin load specifications
    max_concurrency: Optional[int] = None,  # Cap simultaneous loads; None = unbounded
    fail_fast: bool = False,  # Re-raise first exception (default: collect all results)
) -> Dict[str, Union[str, Exception]]:  # requested_key → instance_id or Exception
    """
    CR-10b: fan out plugin loads concurrently via asyncio.gather.
    
    Each spec is loaded via `load_plugin_async` (`asyncio.to_thread` under the
    hood). The total wall-clock drops from sum-of-spawns to max-of-spawns when
    `max_concurrency=None`. Capped concurrency uses an asyncio.Semaphore.
    
    Result keys come from `_spec_requested_key`: explicit `instance_id` if set,
    `{plugin_name}#new[{index}]` for ambiguous new_instance specs, else
    `plugin_name`. Successful entries map to the resolved instance_id (string);
    failures map to the raised exception when fail_fast=False. With
    fail_fast=True the first exception is re-raised instead of being collected.
    """
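The fan-out pattern described above (blocking work pushed to threads, gathered concurrently, optionally capped by a semaphore) can be sketched generically. `fan_out` and its `jobs` mapping are hypothetical; the real method takes `PluginLoadSpec` objects and derives result keys via `_spec_requested_key`.

```python
import asyncio
from typing import Callable, Dict, Optional, Union


async def fan_out(
    jobs: Dict[str, Callable[[], str]],     # key -> blocking callable returning an id
    max_concurrency: Optional[int] = None,  # None = unbounded
    fail_fast: bool = False,                # True = let the first exception propagate
) -> Dict[str, Union[str, BaseException]]:
    """Run blocking jobs in worker threads and collect per-key results."""
    limiter = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None

    async def run_one(fn: Callable[[], str]) -> str:
        if limiter is None:
            return await asyncio.to_thread(fn)
        async with limiter:
            return await asyncio.to_thread(fn)

    keys = list(jobs)
    results = await asyncio.gather(
        *(run_one(jobs[k]) for k in keys),
        # Collect exceptions as values unless the caller asked to fail fast.
        return_exceptions=not fail_fast,
    )
    return dict(zip(keys, results))
```

Because `asyncio.gather` preserves argument order, zipping the results back onto the keys is safe even when jobs finish out of order.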
async def unload_plugins_concurrent(
    self,
    name_or_ids: List[str],  # Plugin names or instance_ids to unload
    max_concurrency: Optional[int] = None,
    fail_fast: bool = False,
) -> Dict[str, Union[bool, Exception]]:  # name_or_id → True or Exception
    """
    CR-10b: fan out plugin unloads concurrently via asyncio.gather.
    
    Same concurrency + fail_fast semantics as load_plugins_concurrent. Result
    keys are the input `name_or_ids` (deduplication is the caller's
    responsibility; duplicate inputs produce one dict entry per unique key).
    """
def bind(
    self,
    plugin_name: str,  # Name of the plugin to pre-bind
    default_config: Optional[Dict[str, Any]] = None  # Default config used by binding.load()
) -> PluginBinding:  # Bound view ready for instance-style use
    "Create a PluginBinding pre-bound to this manager + plugin_name."
def get_by_role(
    self,
    role: str  # Interface class name segment of the FQCN (e.g., "TranscriptionPlugin")
) -> List[PluginMeta]:  # Discovered plugins matching the role
    "CR-1: return discovered plugins implementing the given interface role."
def get_by_domain(
    self,
    domain: str  # Domain segment of the taxonomy (e.g., "transcription")
) -> List[PluginMeta]:  # Discovered plugins in the domain
    "CR-1: return discovered plugins in the given domain."
def get_canonical(
    self,
    role: str  # Interface class name to look up
) -> Optional[PluginMeta]:  # The unique matching plugin or None
    """
    CR-1: return the single canonical plugin for a role.
    
    Returns None if zero or multiple plugins implement the role — useful for
    substrate-internal use cases (e.g., the graph storage plugin) where the
    expectation is exactly one implementation. Callers that want
    multi-implementation handling use `get_by_role()` directly.
    
    Multi-match is logged at WARNING level because it's a substrate-visible
    configuration-time surprise — without the warning, the caller's None-handling
    branch can't distinguish "no plugins installed for this role" from
    "multiple plugins competing for an exactly-one role." Zero-match is silent
    because absence-of-optional-plugin is a normal probe outcome.
    """
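The exactly-one contract above, with its silent zero-match and warned multi-match, can be sketched in a few lines. `pick_canonical` is a hypothetical helper operating on an already-filtered list of matches, not the manager method itself.

```python
import logging
from typing import List, Optional, TypeVar

T = TypeVar("T")
logger = logging.getLogger("canonical_sketch")


def pick_canonical(matches: List[T], role: str) -> Optional[T]:
    """Return the sole match, or None for zero or multiple matches."""
    if len(matches) == 1:
        return matches[0]
    if len(matches) > 1:
        # Multi-match is a configuration surprise, so it is made visible.
        logger.warning("%d plugins implement role %r; expected exactly one", len(matches), role)
    # Zero matches: absence of an optional plugin is a normal probe outcome.
    return None
```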
def get_compatible_for_current_platform(self) -> List[PluginMeta]:  # Plugins compatible with current platform
    """Phase 5a: return discovered plugins compatible with the host platform.
    
    Filters by `resources.platforms`. Plugins with an empty (or absent)
    platforms list are considered universally compatible — that's the
    introspection-time convention when a plugin author didn't declare a
    platform constraint. Plugins lacking the entire `resources` block
    (legacy / pre-Phase-5a manifests) also pass through as universal.
    
    Does NOT filter on `requires_gpu` — substrate doesn't know whether a
    GPU is present without invoking a system monitor plugin. Callers gate
    on GPU availability separately if needed.
    """

Classes

class PluginManager:
    "Manages plugin discovery, loading, and lifecycle via process isolation."
    
    def __init__(
        self,
        plugin_interface:Type[PluginInterface]=PluginInterface, # Base interface for type checking
        search_paths:Optional[List[Path]]=None, # Custom manifest search paths
        scheduler:Optional[ResourceScheduler]=None, # Resource allocation policy
        config_store:Optional[PluginConfigStore]=None, # CR-2: persistence backend; lazy LocalPluginConfigStore default per OQ-4
        empirical_store:Optional[EmpiricalResourceStore]=None, # CR-7: resource-usage tracking backend; lazy LocalEmpiricalResourceStore when cfg.substrate.empirical_tracking
        secret_store:Optional[SecretStore]=None, # CR-12: secret backend; lazy LocalSecretStore default (project-local <data_dir>/secrets)
        max_retries:int=1, # CR-7: how many reactive retries to attempt on PluginResourceError (default 1 — one retry after eviction)
        sysmon_plugin_name:Optional[str]=None # MonitorPlugin (CR-3) name for GPU subtree attribution; default-None records skip GPU attribution (compute axis only)
    ):
        "Initialize the plugin manager."
@dataclass
class PluginBinding:
    """
    Pre-bound view of a single plugin through a shared PluginManager.
    
    Eliminates the wrapper-class duplication audited across 8 consumer services
    (SG-17). Methods forward to the manager with `plugin_name` pre-supplied;
    `default_config` is the fallback used when `load()` is called without an
    explicit config (matches the manifest-default behavior in `load_plugin`).
    """
    
    manager: 'PluginManager'  # The shared PluginManager
    plugin_name: str  # Name of the plugin this binding targets
    default_config: Dict[str, Any] = _field(default_factory=dict)  # Used when load() called without config
    
    @property
    def meta(self) -> Optional[PluginMeta]:
        "The PluginMeta if the plugin is loaded, else None."
        return self.manager.get_plugin_meta(self.plugin_name)
    
    @property
    def is_loaded(self) -> bool:
        "True if the plugin is loaded in the bound manager."
        return self.manager.get_plugin(self.plugin_name) is not None
    
    @property
    def is_enabled(self) -> bool:
        "True if the plugin is loaded AND not currently disabled."
        m = self.meta
        return m is not None and m.enabled
    
    # --- Lifecycle ---
    
    def load(
        self,
        config: Optional[Dict[str, Any]] = None,  # Override default_config when provided
        strict: bool = True  # SG-5 strict validation
    ) -> bool:  # True if loaded successfully
        "Load via the bound manager. Uses `default_config` if no `config` provided."
    
    def unload(self) -> bool:  # True if unloaded
        "Unload the bound plugin."
        return self.manager.unload_plugin(self.plugin_name)
    
    def reload(
        self,
        config: Optional[Dict[str, Any]] = None  # Optional new config; current config used if None
    ) -> bool:
        "Reload the bound plugin (terminate + restart worker)."
    
    def enable(self) -> bool:
        "Enable the bound plugin."
        return self.manager.enable_plugin(self.plugin_name)
    
    def disable(self) -> bool:
        "Disable the bound plugin (worker stays alive; jobs rejected)."
        return self.manager.disable_plugin(self.plugin_name)
    
    # --- Execution ---
    
    def execute(self, *args, **kwargs) -> Any:
        "Execute via the bound manager (sync)."
        return self.manager.execute_plugin(self.plugin_name, *args, **kwargs)
    
    async def execute_async(self, *args, **kwargs) -> Any:
        "Execute via the bound manager (async)."
        return await self.manager.execute_plugin_async(self.plugin_name, *args, **kwargs)
    
    # --- Configuration ---
    
    def update_config(
        self,
        config: Dict[str, Any],  # New config values
        strict: bool = True  # SG-5 strict validation
    ) -> bool:
        "Hot-reload the bound plugin's configuration."
    
    def get_config(self) -> Optional[Dict[str, Any]]:
        "Current configuration values (None if not loaded)."
        return self.manager.get_plugin_config(self.plugin_name)
    
    def get_config_schema(self) -> Optional[Dict[str, Any]]:
        "JSON Schema describing this plugin's configuration."
        return self.manager.get_plugin_config_schema(self.plugin_name)
    
    def get_stats(self) -> Optional[Dict[str, Any]]:
        "Resource telemetry for the bound plugin's worker process."
class _CR10StubProxy:
    "Stand-in proxy tracking execute calls + hook fires for verification."
    
    def __init__(self, name="stub"):
        self._name = name
        self.execute_calls = []
        self.on_disable_calls = 0
        self.on_enable_calls = 0
    
    @property
    def name(self): return self._name
    
    @property
    def version(self): return "0.0.1"
    
    def initialize(self, config): self._config = dict(config or {})
    
    def execute(self, *args, **kwargs):
        self.execute_calls.append((args, kwargs))
        return {"who": self._name, "args": args, "kwargs": kwargs}
    
    def get_config_schema(self): return {}
    
    def get_current_config(self): return {}
    
    def cleanup(self): pass
    
    def on_disable(self): self.on_disable_calls += 1
    
    def on_enable(self): self.on_enable_calls += 1

Manifest Format (v2.0) (manifest_format.ipynb)

Typed parser + writer for the nested v2.0 manifest layout per the 2026-05-19 substrate audit’s CR-8. Substrate manifests transitioned from a flat top-level JSON object to a four-section nested layout: install (deployment-specific facts populated at install time), code (code-derived facts refreshed by cjm-ctl regenerate-manifest), drift_tracking (a config_schema hash that records the witness shape so live-vs-stored comparisons can detect drift), and overrides (an operator-supplied overlay placeholder).

Import

from cjm_plugin_system.core.manifest_format import (
    CURRENT_FORMAT_VERSION,
    InstallSection,
    CodeSection,
    DriftTracking,
    ManifestV2,
    compute_config_schema_hash,
    load_manifest,
    manifest_to_dict,
    write_manifest
)

Functions

def compute_config_schema_hash(
    schema: Optional[Dict[str, Any]],  # JSON Schema or None
) -> str:                              # "sha256:hexdigest"
    """
    Hash a JSON Schema with stable canonicalization.
    
    None is treated as `{}` — the hash records "no schema declared" rather
    than refusing. This way a plugin that lost its config_schema between
    install and load still gets a drift warning rather than a crash.
    """
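One common way to get a stable schema hash is to serialize with sorted keys and fixed separators before hashing. The sketch below assumes that canonicalization; the exact scheme used by `compute_config_schema_hash` is not spelled out here, so `schema_hash_sketch` is a hypothetical illustration and its digests should not be expected to match the library's.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def schema_hash_sketch(schema: Optional[Dict[str, Any]]) -> str:
    """Hash a JSON-serializable schema; key order must not affect the result."""
    payload = {} if schema is None else schema  # None hashes as "no schema declared"
    # Assumption: sorted keys + compact separators as the canonical form.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Comparing the stored hash against a hash of the live schema is then a plain string equality check, which is what makes drift detection cheap.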
def _parse_taxonomy_dict(d: Optional[Dict[str, Any]]) -> Optional[PluginTaxonomy]:
    """Build a `PluginTaxonomy` from its JSON sub-dict, or None."""
def _parse_resources_dict(d: Optional[Dict[str, Any]]) -> Optional[ResourceRequirements]:
    """Build a `ResourceRequirements` from its JSON sub-dict, or None."""
def _from_v2_dict(
    data: Dict[str, Any],  # Parsed JSON dict with `format_version == "2.0"`
) -> ManifestV2:
    "Parse a v2.0 nested manifest dict into a typed `ManifestV2`."
def _from_v1_flat_dict(
    data: Dict[str, Any],  # Legacy flat manifest dict (no `format_version`)
) -> ManifestV2:
    """
    REMOVE-AFTER-OVERHAUL: legacy flat-manifest reader shim.
    
    Maps top-level fields into the nested ManifestV2 shape so substrate code
    paths only see v2.0 after this point. Returns a ManifestV2 with
    `format_version == "1.0"` and `drift_tracking.config_schema_hash == None`
    so callers can tell a legacy load apart from a fresh v2.0 write.
    
    Retires after `cascade_manifests.py` rewrites every production manifest
    to v2.0; SG-48 sweep removes this function.
    """
def load_manifest(
    path: Union[str, Path],  # Path to manifest JSON file on disk
) -> ManifestV2:             # Parsed manifest in v2.0 typed shape
    """
    Load a manifest file and return a typed `ManifestV2`.
    
    Format detection by top-level `format_version` key:
    - `"2.0"` → nested layout, parse directly.
    - missing → legacy flat layout, pass through v1.0 shim.
    - anything else → ValueError.
    """
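The three-way format detection listed above can be sketched as a small dispatcher over the parsed JSON dict. `detect_layout` and its return labels are hypothetical; the real `load_manifest` goes on to build a typed `ManifestV2` rather than returning a label.

```python
from typing import Any, Dict


def detect_layout(data: Dict[str, Any]) -> str:
    """Classify a parsed manifest dict by its top-level `format_version` key."""
    if "format_version" not in data:
        return "legacy-flat"  # Missing key: route through the v1.0 shim
    version = data["format_version"]
    if version == "2.0":
        return "nested-v2"    # Nested layout: parse directly
    # Anything else is rejected rather than guessed at.
    raise ValueError(f"Unsupported manifest format_version: {version!r}")
```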
def _taxonomy_to_dict(t: Optional[PluginTaxonomy]) -> Optional[Dict[str, str]]:
    """Serialize a `PluginTaxonomy` back to its JSON sub-dict, or None."""
def _resources_to_dict(r: Optional[ResourceRequirements]) -> Optional[Dict[str, Any]]:
    """Serialize a `ResourceRequirements` back to its JSON sub-dict, or None."""
def _code_section_to_dict(c: CodeSection) -> Dict[str, Any]:
    """Serialize a `CodeSection` to its JSON sub-dict, renaming `class_name` -> `class`."""
def manifest_to_dict(
    m: ManifestV2,  # Manifest to serialize
) -> Dict[str, Any]:  # v2.0 nested dict ready for `json.dumps`
    """
    Serialize a `ManifestV2` to a v2.0 dict.
    
    Always emits `format_version == CURRENT_FORMAT_VERSION` — even if the
    manifest was loaded from a legacy v1.0 file. This is the upgrade seam:
    load-then-write transparently rewrites flat manifests as nested.
    """
def write_manifest(
    path: Union[str, Path],  # Output JSON file path
    manifest: ManifestV2,    # Manifest to serialize
) -> None:
    "Serialize a `ManifestV2` to disk in v2.0 nested layout (indent=2)."

Classes

@dataclass
class InstallSection:
    """
    Deployment-specific facts populated at install time.
    
    These fields are written by `install_all` (paths, conda env, env vars)
    plus `_generate_manifest`'s post-introspection step (installed_at,
    installer_version, package_source). `regenerate-manifest` preserves
    the install section across regeneration so paths survive code-side
    refreshes.
    """
    
    python_path: str = ''  # Absolute path to the plugin env's python interpreter
    conda_env: str = ''  # Conda environment name
    db_path: str = ''  # Plugin's per-data SQLite path (if any)
    env_vars: Dict[str, str] = field(...)  # Per-plugin env vars
    installed_at: str = ''  # ISO-8601 UTC timestamp of install/regen
    installer_version: str = ''  # "cjm-ctl <version>" that wrote this manifest
    package_source: str = ''  # Original install input (git URL or pip spec)
@dataclass
class CodeSection:
    """
    Code-derived facts refreshed by `cjm-ctl regenerate-manifest`.
    
    Everything in this section comes from running the introspection script
    inside the plugin's conda env: metadata + interface + config_schema +
    derived taxonomy + binary platform/hardware hard-facts. Drift detection
    hashes this section's `config_schema` field as its witness shape.
    
    `class_name` serializes as the JSON key `"class"` (Python reserved-word
    workaround).
    """
    
    name: str = ''  # Plugin's unique identifier
    version: str = ''  # Plugin's version string
    description: str = ''  # Brief description (SG-6 required)
    module: str = ''  # Importable module path for the plugin class
    class_name: str = ''  # Plugin class name (JSON key: "class")
    interface: str = ''  # Fully qualified interface class name
    taxonomy: Optional[PluginTaxonomy]  # CR-1 domain/role/FQCN triple
    resources: Optional[ResourceRequirements]  # Phase 5a hard-facts
    config_schema: Optional[Dict[str, Any]]  # JSON Schema for plugin config
    regenerated_at: Optional[str]  # ISO-8601 UTC of last regen
    worker_env: Optional[List[Dict[str, Any]]]  # CR-12 spawn-env contract: asdict(EnvVarSpec) list
@dataclass
class DriftTracking:
    """
    Witness hashes for drift detection.
    
    `config_schema_hash` is computed at write time (regenerate-manifest /
    install_all) from a canonical JSON encoding of the code section's
    `config_schema`. The PluginManager's drift-check fetches the live
    `/config_schema` from the worker, hashes it the same way, and compares;
    a mismatch raises `PluginMeta.config_schema_drift = True` plus a
    warning log.
    """
    
    config_schema_hash: Optional[str]  # "sha256:hexdigest" of canonical config_schema
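A minimal sketch of how a `"sha256:hexdigest"` witness hash could be computed and compared. The canonical encoding used here (sorted keys, compact separators, UTF-8) is an assumption, since the docstring does not spell it out, and `hash_config_schema` is a hypothetical helper name.

```python
import hashlib
import json
from typing import Any, Dict


def hash_config_schema(schema: Dict[str, Any]) -> str:
    # Assumed canonical form: sorted keys, compact separators, UTF-8 bytes.
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


stored = hash_config_schema(
    {"type": "object", "properties": {"model": {"type": "string"}}}
)
live_same = hash_config_schema(
    {"properties": {"model": {"type": "string"}}, "type": "object"}
)
live_changed = hash_config_schema(
    {"type": "object", "properties": {"model": {"type": "integer"}}}
)

print(stored == live_same)     # True: key order does not affect the hash
print(stored == live_changed)  # False: a changed field type counts as drift
```

Hashing a canonical encoding rather than the raw file bytes keeps the comparison insensitive to key order and whitespace, so only a real schema change is reported as drift.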
@dataclass
class ManifestV2:
    """
    Top-level v2.0 manifest with four named sections plus `format_version`.
    
    Loaded from a v2.0 nested JSON file as-is, or from a v1.0 flat file via
    `_from_v1_flat_dict` (REMOVE-AFTER-OVERHAUL shim). When the v1.0 shim
    fires, `format_version` is set to `"1.0"` so substrate code can distinguish
    legacy loads from fresh writes; otherwise `format_version` is always
    `CURRENT_FORMAT_VERSION`.
    """
    
    install: InstallSection = field(...)
    code: CodeSection = field(...)
    drift_tracking: DriftTracking = field(...)
    overrides: Dict[str, Any] = field(...)
    format_version: str = CURRENT_FORMAT_VERSION

Variables

CURRENT_FORMAT_VERSION = '2.0'  # Emitted on every freshly-written manifest

Plugin Metadata (metadata.ipynb)

Data structures for plugin metadata

Import

from cjm_plugin_system.core.metadata import (
    PluginTaxonomy,
    ResourceRequirements,
    PluginMeta,
    PluginInstance,
    PluginLoadSpec
)

Classes

@dataclass
class PluginTaxonomy:
    """
    Derived classification of a plugin via its interface FQN.
    
    Populated at install time by `_generate_manifest`'s introspection script
    (running in the plugin's own conda env, where the interface library is
    installed). Substrate stores strings only — no host-side imports of
    interface libraries needed for taxonomy queries.
    
    - `domain`: the substrate area, derived from the interface library's
      naming convention `cjm-<domain>-plugin-system` (e.g., "transcription",
      "graph", "media", "text").
    - `role`: the interface class name segment of the FQN (e.g.,
      "TranscriptionPlugin", "GraphPlugin", "ForcedAlignmentPlugin"). Multiple
      plugins can share a role within a domain.
    - `interface_fqcn`: the full dotted path, kept verbatim for queries and
      reverse-lookup.
    """
    
    domain: str  # e.g., "transcription", "graph", "media"
    role: str  # e.g., "TranscriptionPlugin", "GraphPlugin"
    interface_fqcn: str  # Full dotted interface class path
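The derivation described in the docstring can be sketched as follows. This assumes the interface library's distribution name is available as a string; the helper `derive_taxonomy` and the example FQCN are hypothetical, and the real introspection script may obtain these values differently.

```python
import re
from typing import Dict

# Naming convention from the docstring: cjm-<domain>-plugin-system
_LIBRARY_PATTERN = re.compile(r"^cjm-(?P<domain>[a-z0-9-]+)-plugin-system$")


def derive_taxonomy(library_name: str, interface_fqcn: str) -> Dict[str, str]:
    match = _LIBRARY_PATTERN.match(library_name)
    if match is None:
        raise ValueError(f"Not an interface library name: {library_name!r}")
    return {
        "domain": match.group("domain"),
        # The role is the final segment of the dotted path (the class name).
        "role": interface_fqcn.rsplit(".", 1)[-1],
        # The full path is kept verbatim for queries and reverse lookup.
        "interface_fqcn": interface_fqcn,
    }


taxonomy = derive_taxonomy(
    "cjm-transcription-plugin-system",
    "cjm_transcription_plugin_system.plugin_interface.TranscriptionPlugin",  # hypothetical path
)
print(taxonomy["domain"])  # transcription
print(taxonomy["role"])    # TranscriptionPlugin
```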
@dataclass
class ResourceRequirements:
    """
    Binary hard-facts about what a plugin needs to run (Phase 5a).
    
    Quantitative resource amounts (min_vram_mb, etc.) deliberately omitted
    per CR-7's reactive resource management reframing — plugin authors can't
    reliably estimate model × dtype × quantization combinatorics, and Blender-
    style variable-render plugins can't estimate at all. The substrate uses
    these binary hard-facts purely for discovery filtering; actual resource
    contention is handled reactively by CR-7's eviction + retry flow.
    
    - `requires_gpu`: True if the plugin needs any GPU; the substrate gates
      execution on a system monitor reporting one is present.
    - `platforms`: e.g., ["linux-x64", "darwin-arm64"]. Empty list means no
      platform constraint declared (assume universal compatibility).
    - `accelerators`: e.g., ["cuda", "mps", "cpu"]. Informational; substrate
      doesn't auto-select but consumers can filter on the values.
    """
    
    requires_gpu: bool = False
    platforms: List[str] = field(...)
    accelerators: List[str] = field(...)
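A small sketch of the filtering rules stated above: an empty `platforms` list means no constraint, and `requires_gpu` gates on whether a GPU is reported. The function `passes_discovery_filter` and its parameters are illustrative assumptions, not the substrate's actual filter.

```python
from typing import List


def passes_discovery_filter(
    requires_gpu: bool,
    platforms: List[str],
    current_platform: str,
    gpu_present: bool,
) -> bool:
    if requires_gpu and not gpu_present:
        return False  # the plugin needs a GPU and none was reported
    # An empty list declares no platform constraint.
    return not platforms or current_platform in platforms


print(passes_discovery_filter(False, [], "linux-x64", False))               # True
print(passes_discovery_filter(True, ["linux-x64"], "linux-x64", False))     # False
print(passes_discovery_filter(False, ["darwin-arm64"], "linux-x64", True))  # False
```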
@dataclass
class PluginMeta:
    "Metadata about a plugin."
    
    name: str  # Plugin's unique identifier
    version: str  # Plugin's version string
    description: str = ''  # Brief description of the plugin's functionality
    category: str = ''  # Display label — derived from taxonomy.domain (CR-1) or read from legacy manifests
    interface: str = ''  # Fully qualified interface class name (kept verbatim for SG-7 format check)
    taxonomy: Optional['PluginTaxonomy']
    resources: Optional['ResourceRequirements']
    config_schema: Optional[Dict[str, Any]]  # JSON Schema for plugin configuration
    instance: Optional[Any]  # Plugin instance (PluginInterface subclass)
    enabled: bool = True  # Whether the plugin is enabled
    last_executed: float = 0.0  # Unix timestamp
    config_schema_drift: bool = False
    live_config_schema: Optional[Dict[str, Any]]
@dataclass
class PluginInstance:
    """
    Per-instance runtime state for a loaded plugin (CR-10 multi-instance).
    
    Differs from PluginMeta in scope:
    - PluginMeta is per-plugin-name discovery + canonical-instance state.
    - PluginInstance is per-load-call runtime state.
    
    A plugin loaded with no instance_id (default) gets `instance_id == plugin_name`
    and is the canonical instance referenced by PluginMeta.instance. Multi-instance
    loads (instance_id != plugin_name) add entries to PluginManager.instances
    without changing the canonical reference.
    """
    
    instance_id: str  # Unique key in PluginManager.instances; default = plugin_name
    plugin_name: str  # The underlying discovered plugin's name (PluginMeta.name)
    config: Dict[str, Any] = field(...)  # Effective config used at initialize()
    proxy: Optional[Any]
    enabled: bool = True  # Per-instance enable flag; substrate's execute_plugin checks this
    last_executed: float = 0.0  # Unix timestamp of the most recent execute on this instance
    created_at: datetime = field(...)
    config_hash: str = ''
    max_concurrent_requests: Optional[int]
@dataclass
class PluginLoadSpec:
    """
    One entry in `PluginManager.load_plugins_concurrent`'s batch input (CR-10).
    
    Mirrors the positional arguments of `load_plugin` so the concurrent helper
    can fan out load calls without repeating the per-spec instance_id /
    new_instance plumbing.
    
    - `meta`: the discovered PluginMeta to load (must have a `.manifest` attached).
    - `config`: initial configuration; falls through to persisted-or-schema-defaults
      when None (default-instance only; multi-instance starts fresh).
    - `instance_id`: explicit instance_id (validated against [A-Za-z0-9_-]{1,64}).
      None defaults to plugin_name (single-instance backward compat).
    - `new_instance`: when True with instance_id=None, auto-generate
      `{plugin_name}-{6-hex}`.
    """
    
    meta: Any  # PluginMeta — typed as Any to avoid forward-reference quirk under nbdev's late binding
    config: Optional[Dict[str, Any]]
    instance_id: Optional[str]
    new_instance: bool = False
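The `instance_id` rules in the docstring can be sketched as a single resolution function. `resolve_instance_id` is a hypothetical name, and using `secrets.token_hex` for the six hex characters is an assumption about how the suffix is generated.

```python
import re
import secrets
from typing import Optional

_INSTANCE_ID = re.compile(r"[A-Za-z0-9_-]{1,64}")


def resolve_instance_id(
    plugin_name: str,
    instance_id: Optional[str] = None,
    new_instance: bool = False,
) -> str:
    if instance_id is not None:
        # Explicit ids must match the documented character set and length.
        if _INSTANCE_ID.fullmatch(instance_id) is None:
            raise ValueError(f"Invalid instance_id: {instance_id!r}")
        return instance_id
    if new_instance:
        return f"{plugin_name}-{secrets.token_hex(3)}"  # 3 bytes -> 6 hex chars
    return plugin_name  # single-instance default


print(resolve_instance_id("whisper"))                      # whisper
print(resolve_instance_id("whisper", instance_id="gpu_1"))  # gpu_1
print(resolve_instance_id("whisper", new_instance=True))    # whisper- plus six hex chars
```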

Platform Utilities (platform.ipynb)

Cross-platform utilities for process management, path handling, and system detection

Import

from cjm_plugin_system.core.platform import (
    MICROMAMBA_URLS,
    is_windows,
    is_macos,
    is_linux,
    is_apple_silicon,
    get_current_platform,
    get_python_in_env,
    get_popen_isolation_kwargs,
    terminate_process,
    terminate_self,
    run_shell_command,
    conda_env_exists,
    get_micromamba_download_url,
    download_micromamba,
    get_conda_command,
    build_conda_command,
    get_micromamba_binary_path,
    ensure_runtime_available
)

Functions

def is_windows() -> bool
    "Check if running on Windows."
def is_macos() -> bool
    "Check if running on macOS."
def is_linux() -> bool
    "Check if running on Linux."
def is_apple_silicon() -> bool
    "Check if running on Apple Silicon Mac (for MPS detection)."
def get_current_platform() -> str
    """
    Get current platform string for manifest filtering.
    
    Returns strings like 'linux-x64', 'darwin-arm64', 'win-x64'.
    """
def get_python_in_env(
    env_path: Path  # Path to conda environment root
) -> Path:  # Path to Python executable
    """
    Get the Python executable path for a conda environment.
    
    On Windows: env_path/python.exe
    On Unix: env_path/bin/python
    """
def get_popen_isolation_kwargs() -> Dict[str, Any]
    """
    Return kwargs for process isolation in subprocess.Popen.
    
    On Unix: Returns {'start_new_session': True}
    On Windows: Returns {'creationflags': CREATE_NEW_PROCESS_GROUP}
    
    Usage:
        process = subprocess.Popen(cmd, **get_popen_isolation_kwargs(), ...)
    """
def terminate_process(
    process: subprocess.Popen,  # Process to terminate
    timeout: float = 2.0  # Seconds to wait before force kill
) -> None
    """
    Terminate a subprocess gracefully, with fallback to force kill.
    
    On all platforms:
    1. Calls process.terminate() (SIGTERM on Unix, TerminateProcess on Windows)
    2. Waits for timeout seconds
    3. If still running, calls process.kill() (SIGKILL on Unix, TerminateProcess on Windows)
    """
def terminate_self() -> None
    """
    Terminate the current process (for worker suicide pact).
    
    On Unix: Sends SIGTERM to self for graceful shutdown
    On Windows: Calls os._exit() since Windows lacks SIGTERM
    """
def run_shell_command(
    cmd: str,  # Shell command to execute
    check: bool = True,  # Whether to raise on non-zero exit
    capture_output: bool = False,  # Whether to capture stdout/stderr
    **kwargs  # Additional kwargs passed to subprocess.run
) -> subprocess.CompletedProcess
    """
    Run a shell command cross-platform.
    
    Unlike using shell=True with executable='/bin/bash', this function
    uses the platform's default shell:
    - Linux/macOS: /bin/sh (or $SHELL)
    - Windows: cmd.exe
    """
def conda_env_exists(
    env_name: str,  # Name of the conda environment
    conda_cmd: str = "conda"  # Conda command (conda, mamba, micromamba)
) -> bool
    """
    Check if a conda environment exists (cross-platform).
    
    Uses 'conda env list --json' instead of piping to grep,
    which doesn't work on Windows.
    """
def get_micromamba_download_url(
    platform_str: Optional[str] = None  # Platform string (e.g., 'linux-x64'), uses current if None
) -> str:  # Download URL for micromamba binary
    "Get the micromamba download URL for the specified or current platform."
def download_micromamba(
    dest_path: Path,  # Destination path for the micromamba binary
    platform_str: Optional[str] = None,  # Platform string, uses current if None
    show_progress: bool = True  # Whether to print progress messages
) -> bool:  # True if download succeeded
    "Download and extract micromamba binary to the specified path."
def get_conda_command(
    config: CJMConfig  # Configuration object with runtime settings
) -> List[str]:  # Base command with prefix args if needed
    "Get the conda/mamba/micromamba base command with prefix args for local mode."
def build_conda_command(
    config: CJMConfig,  # Configuration object with runtime settings
    *args: str  # Additional command arguments
) -> List[str]:  # Complete command ready for subprocess
    "Build a complete conda/mamba/micromamba command."
def get_micromamba_binary_path(
    config: CJMConfig  # Configuration object with runtime settings
) -> Optional[Path]:  # Path to micromamba binary or None
    "Get the configured micromamba binary path for the current platform."
def ensure_runtime_available(
    config: CJMConfig  # Configuration object with runtime settings
) -> bool:  # True if runtime is available
    "Check if the configured conda/micromamba runtime is available."

Variables

MICROMAMBA_URLS: Dict[str, str]

Remote Plugin Proxy (proxy.ipynb)

Bridge between Host application and isolated Worker processes

Import

from cjm_plugin_system.core.proxy import (
    RemotePluginProxy,
    execute_async,
    execute_stream_sync,
    execute_stream,
    execute_with_oom_check,
    execute_async_with_oom_check,
    get_stats,
    is_alive,
    cancel,
    cancel_async,
    get_progress,
    get_progress_async,
    on_disable,
    on_enable,
    get_system_status,
    get_system_status_async,
    list_processes,
    list_processes_async,
    prefetch,
    prefetch_async,
    reconfigure,
    reconfigure_async
)

Functions

def _maybe_serialize_input(
    self,
    obj: Any # Object to potentially serialize
) -> Any: # Serialized form (path string or original object)
    "Convert FileBackedDTO objects to file paths for zero-copy transfer."
def _prepare_payload(
    self,
    args: tuple, # Positional arguments
    kwargs: dict # Keyword arguments
) -> Dict[str, Any]: # JSON-serializable payload
    "Prepare arguments for HTTP transmission."
async def execute_async(
    self,
    *args,
    **kwargs
) -> Any: # Plugin result
    """
    Execute the plugin asynchronously.
    
    CR-4: HTTP 409 from the worker is mapped to a typed `PluginCancelledError`.
    Same 409/200/other semantics as the sync `execute()` variant.
    """
def _raise_from_job_error_chunk(
    """
    SG-52: convert a `_job_error` JobError-shaped dict into the right typed exception.
    
    Mapping rules:
    - `original_exc_repr` starts with `"PluginCancelledError"` → raise PluginCancelledError
      (preserves the non-retriable semantic that category alone doesn't capture).
    - category == "user_input" → PluginInputError (with fields_invalid).
    - category == "transient" → PluginTransientError (with retry_after_seconds).
    - category == "resource" → PluginResourceError (with reconstructed ResourceShortfall).
    - category == "fatal" → PluginFatalError.
    - Unknown category → RuntimeError carrying the chunk for forensic inspection.
    
    This is the streaming-side counterpart to /execute's 409 → PluginCancelledError
    detection. Same intent: the typed exception survives the HTTP wire boundary
    so substrate / JobQueue / consumer code can branch on category without
    parsing string messages.
    """
def execute_stream_sync(self, *args, **kwargs) -> Generator[Any, None, None]
    "Synchronous wrapper for streaming (blocking)."
async def execute_stream(
    self,
    *args,
    **kwargs
) -> AsyncGenerator[Any, None]: # Yields parsed JSON chunks
    """
    Execute with streaming response (async generator).
    
    SG-52: detects the terminal `{"_job_error": <JobError dict>}` chunk and
    raises the corresponding typed exception client-side instead of yielding
    it to downstream consumers. Mirrors /execute's HTTP 409 → typed-exception
    behavior at the streaming wire boundary. Normal plugin output chunks
    pass through unchanged (they never carry the `_job_error` key).
    """
def _check_worker_death(self) -> None
    """
    CR-7 Track A: inspect subprocess state after an httpx fault on /execute.
    
    Raises `WorkerOOMError` if the worker died with SIGKILL (POSIX returncode -9;
    the kernel OOM-killer is the dominant cause). Raises `PluginTransientError` if
    it died with any other returncode. Returns None silently when the worker
    is still alive (caller re-raises the original httpx error).
    
    `signal.SIGKILL` doesn't exist on Windows, where worker-OOM looks different
    (STATUS_NO_MEMORY 0xC0000017 = -1073741801). Cross-platform OOM detection is
    a future enhancement when there's a concrete Windows substrate consumer.
    """
def execute_with_oom_check(self, *args, **kwargs) -> Any
    """
    CR-7 Track A wrapper around the sync execute path.
    
    Catches httpx connection / protocol faults, calls `_check_worker_death`,
    and either raises a typed `WorkerOOMError` / `PluginTransientError` or
    re-raises the original httpx error (worker still alive, so the caller
    treats it as a generic transient network issue).
    """
async def execute_async_with_oom_check(self, *args, **kwargs) -> Any
    "CR-7 Track A wrapper around the async execute path. Same semantics."
def get_stats(self) -> Dict[str, Any]: # Process telemetry
    "Get worker process resource usage."
def is_alive(self) -> bool: # True if worker is responsive
    "Check if the worker process is still running and responsive."
def cancel(self) -> bool: # True if cancel request was sent
    "Request cancellation of running execution."
async def cancel_async(self) -> bool: # True if cancel request was sent
    "Request cancellation asynchronously."
def get_progress(self) -> Dict[str, Any]: # {progress: float, message: str}
    "Get current execution progress from worker."
async def get_progress_async(self) -> Dict[str, Any]: # {progress: float, message: str}
    "Get current execution progress asynchronously."
def on_disable(self) -> bool:  # True if hook signal accepted by worker
    """
    CR-2: forward the substrate's on_disable signal to the worker process.
    
    Plugin can opt in via PluginInterface.on_disable(); default implementation
    is a no-op so silent-pass-through is the norm. Failures to reach the
    worker (already terminated, network blip) are logged-and-swallowed;
    the substrate-side enable/disable bookkeeping doesn't depend on the
    hook actually firing.
    """
def on_enable(self) -> bool:  # True if hook signal accepted by worker
    """
    CR-2: forward the substrate's on_enable signal to the worker process.
    
    Same delivery semantics as on_disable: best-effort, errors logged-and-
    swallowed. The plugin's hook (default no-op) decides whether to eagerly
    re-acquire resources or rely on lazy re-load at next execute().
    """
def get_system_status(self) -> Optional[Dict[str, Any]]:  # SystemStats dict, or None on transport / config failure
    """
    CR-3: typed MonitorPlugin accessor. POSTs to worker's `/get_system_status`.
    
    Status code semantics (worker side raises HTTPException with these codes):
    - 200: SystemStats dict returned
    - 404: plugin is not a MonitorPlugin; logged at ERROR (configuration error;
          no amount of retry fixes it) and returns None. Loudly distinguished
          from the substrate's WARN-level transient-failure degradation.
    - 501: legacy monitor predating CR-3; REMOVE-AFTER-OVERHAUL fallback to
          `/execute("get_system_status")` returns a dict in the pre-CR-3 wire format
    - 500: real plugin failure; propagates as HTTPStatusError
    - ConnectError: worker may have died; returns None silently (substrate
          degrades to empty stats)
    """
async def get_system_status_async(self) -> Optional[Dict[str, Any]]:  # SystemStats dict, or None on transport / config failure
    "Async variant of `get_system_status`. Same 200/404/501/500/ConnectError semantics."
def list_processes(self) -> Optional[List[Dict[str, Any]]]:  # ProcessStats dict list, or None on transport / config failure
    """
    CR-3: typed MonitorPlugin accessor. POSTs to worker's `/list_processes`.
    
    Same 200/404/501/500/ConnectError semantics as `get_system_status`. Note that
    `MonitorPlugin.list_processes()` defaults to returning `[]`, so monitors without
    per-process visibility yield a 200 with an empty list rather than 501.
    """
async def list_processes_async(self) -> Optional[List[Dict[str, Any]]]:  # ProcessStats dict list, or None on transport / config failure
    "Async variant of `list_processes`. Same semantics."
def prefetch(
    self,
    stall_threshold_seconds: Optional[float] = None,  # Override SubstrateConfig.prefetch_stall_threshold_seconds; None = use config
    poll_interval_seconds: float = 1.0,               # How often to poll /progress for stall detection
) -> bool:  # True if worker accepted the prefetch hook
    """
    CR-4 / Session A 2026-05-27: forward the substrate's prefetch signal with
    progress-based stall detection.
    
    Replaces wall-clock-timeout-based startup waiting (operators racing arbitrary
    timeouts vs. network speeds for model downloads). Approach:
    
      1. POST /prefetch fires in a background thread with httpx timeout=None.
      2. Main thread polls /progress every poll_interval_seconds.
      3. Each (progress, message) change resets the stall counter.
      4. If no change in stall_threshold_seconds AND POST still pending →
         SIGTERM the worker subprocess + raise PluginTimeoutError.
    
    Plugins opt in to fine-grained stall defeat by calling
    self.report_progress(...) periodically during long lifecycle operations
    (model download, server startup, etc.). Plugins that don't report progress
    are fine as long as the threshold accommodates their slowest plausible
    silent stretch.
    
    Errors raised by the plugin (worker 500) propagate as RuntimeError; an
    unreachable worker yields a `False` return; a stall raises PluginTimeoutError.
    """
async def prefetch_async(
    self,
    stall_threshold_seconds: Optional[float] = None,
    poll_interval_seconds: float = 1.0,
) -> bool:  # True if worker accepted the prefetch hook
    "Async variant of `prefetch`. Same stall-detection semantics."
def _resolve_prefetch_stall_threshold() -> float
    "Resolve the stall threshold from SubstrateConfig with a defensive fallback."
def _run_prefetch_with_stall_detection(
    proxy: 'RemotePluginProxy',
    stall_threshold_seconds: float,
    poll_interval_seconds: float,
) -> bool
    """
    Sync stall-detecting prefetch implementation.
    
    Runs POST /prefetch in a daemon thread; main thread polls /progress for
    a (progress, message) advance every poll_interval_seconds. If no advance
    in stall_threshold_seconds AND the POST is still in-flight, SIGTERMs the
    worker subprocess (so its plugin.cleanup() can run via the worker's
    shutdown handler — closes the orphan-subprocess-on-stall bug) and raises
    PluginTimeoutError client-side.
    """
async def _run_prefetch_with_stall_detection_async(
    proxy: 'RemotePluginProxy',
    stall_threshold_seconds: float,
    poll_interval_seconds: float,
) -> bool
    """
    Async stall-detecting prefetch implementation. Mirrors the sync variant
    using asyncio.gather instead of a daemon thread.
    """
def reconfigure(
    self,
    old_config: Optional[Dict[str, Any]],  # Previous config snapshot
    new_config: Optional[Dict[str, Any]],  # Config being applied
) -> bool:  # True if worker accepted the reconfigure call
    """
    CR-4: forward a reconfigure(old, new) call to the worker process.
    
    The plugin's default reconfigure() body delegates to
    reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata on the
    plugin's config_class to fire `_release_<trigger>` methods for fields
    whose values changed. Plugins not opting into the declarative pattern
    land in a silent no-op; the substrate's PluginManager.update_plugin_config
    then falls back to initialize(new_config) for the actual state change.
    """
async def reconfigure_async(
    self,
    old_config: Optional[Dict[str, Any]],  # Previous config snapshot
    new_config: Optional[Dict[str, Any]],  # Config being applied
) -> bool:  # True if worker accepted the reconfigure call
    "Async variant of `reconfigure`. Same semantics."
def __enter__(self)
    "Enter context manager."
def __exit__(self, exc_type, exc_val, exc_tb)
    "Exit context manager and cleanup."
async def __aenter__(self)
    "Enter async context manager."
async def __aexit__(self, exc_type, exc_val, exc_tb)
    "Exit async context manager and cleanup."

Classes

class RemotePluginProxy:
    def __init__(
        self,
        manifest:Dict[str, Any], # Plugin manifest with python_path, module, class, etc.
        extra_env:Optional[Dict[str, str]]=None # CR-12: resolved worker-env overlay (secrets + visible overrides) injected at spawn
    )
    "Proxy that forwards plugin calls to an isolated Worker subprocess."
    
    def __init__(
            self,
            manifest:Dict[str, Any], # Plugin manifest with python_path, module, class, etc.
            extra_env:Optional[Dict[str, str]]=None # CR-12: resolved worker-env overlay (secrets + visible overrides) injected at spawn
        )
        "Initialize proxy and start the worker process."
    
    def name(self) -> str: # Plugin name from manifest
        "Plugin name."
    
    def version(self) -> str: # Plugin version from manifest
        "Plugin version."
    
    def initialize(
            self,
            config:Optional[Dict[str, Any]]=None # Configuration dictionary
        ) -> None
        "Initialize or reconfigure the plugin."
    
    def execute(
            self,
            *args,
            **kwargs
        ) -> Any: # Plugin result
        """
        Execute the plugin synchronously.
        
        CR-4: HTTP 409 from the worker is mapped to a typed
        `PluginCancelledError` raised in the host process, so substrate /
        JobQueue / consumer callers can distinguish cooperative cancellation
        from a real plugin failure (500 → RuntimeError, as before).
        """
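
The status mapping described for `execute` can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's code: `PluginCancelledError` is redefined locally as a stand-in for the typed error, and `raise_for_worker_status` and `classify` are hypothetical helper names.

```python
class PluginCancelledError(Exception):
    """Local stand-in for the library's typed cancellation error."""


def raise_for_worker_status(status_code: int, detail: str = "") -> None:
    """Translate a worker HTTP status into a host-side exception (hypothetical helper)."""
    if status_code == 409:
        # Cooperative cancellation: callers can catch this separately from real failures.
        raise PluginCancelledError(detail or "plugin execution was cancelled")
    if status_code == 500:
        # A genuine plugin failure keeps the untyped RuntimeError behavior.
        raise RuntimeError(detail or "plugin execution failed")
    # Handling of any other status code is outside this sketch.


def classify(status_code: int) -> str:
    """Return a label showing which branch a status code takes."""
    try:
        raise_for_worker_status(status_code)
    except PluginCancelledError:
        return "cancelled"
    except RuntimeError:
        return "failed"
    return "ok"
```

A caller such as a job queue can then treat "cancelled" as an expected outcome and reserve error handling for "failed".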
    
    def get_config_schema(self) -> Dict[str, Any]: # JSON Schema
        "Get the plugin's configuration schema."
    
    def get_current_config(self) -> Dict[str, Any]: # Current config values
        "Get the plugin's current configuration."
    
    def config_options(self) -> Dict[str, Any]: # CR-11: live config option domains
        """
        Get the plugin's runtime config option providers (CR-11).
        
        Returns the worker's get_config_options() output (FieldOptions per
        dynamic field, JSON-serialized to dicts). Empty dict when the plugin
        exposes no dynamic options.
        """
    
    def cleanup(self) -> None:
        "Clean up plugin resources and terminate worker process."

Job Queue (queue.ipynb)

Resource-aware job queue for sequential plugin execution with cancellation support

Import

from cjm_plugin_system.core.queue import (
    JobStatus,
    JobEventType,
    CancelPhase,
    Job,
    JobEvent,
    QueueStats,
    SequenceStep,
    StepResult,
    JobSequence,
    ResourceSnapshot,
    JobQueueDependencies,
    JobQueue
)

Functions

async def _enqueue_job(
    self,
    job: Job,  # Pre-constructed Job (caller fills sequence fields if needed)
) -> str:  # job_id
    """
    Internal: enqueue a pre-constructed Job.
    
    Caller is responsible for validation (disabled-plugin check, etc.).
    Used by `submit` (Stage 1) and `submit_sequence` (Stage 2) — the latter
    needs to populate `sequence_id` + `sequence_index` on the Job before
    enqueueing so the running job appears in sequence-tagged event streams
    from its first STATE_TRANSITION.
    """
async def submit(
    self,
    plugin_instance_id: str,  # Target plugin instance (per CR-10)
    *args,
    priority: int = 0,  # Higher = more urgent
    **kwargs
) -> str:  # Returns job_id
    """
    Submit a job to the queue.
    
    CR-2: rejects jobs for disabled plugins at submit time (typed
    PluginDisabledError) so the failure surface matches the disabled gate in
    `PluginManager.execute_plugin`. A job submitted to a disabled plugin would
    otherwise sit in the queue until execution and only then raise; moving the
    check earlier gives operators an actionable signal immediately.
    
    CR-6: no STATE_TRANSITION event fires at submit because the job is
    already in `pending` state at construction — there's no transition
    to publish. The first STATE_TRANSITION fires when the processor loop
    moves the job pending → running.
    """
async def cancel(
    self,
    job_id: str  # Job to cancel
) -> bool:  # True if cancelled
    """
    Cancel a pending or running job.
    
    CR-6: publishes STATE_TRANSITION when a pending job moves directly to
    cancelled (no transition through running). Running-job cancellation
    publishes CANCEL_PHASE_CHANGED events from `_execute_with_cancellation`
    when Stage 4 wires the phase transitions; Stage 1 just records the
    cancel-request marker.
    
    Stage 2: when the cancelled job is a sequence member, `_advance_sequence`
    runs after the lock is released so the cancelled status propagates to
    the sequence registry (marks sequence cancelled, no further submission).
    Lock release is required because `_advance_sequence` may need to enqueue
    a next member in some flows; asyncio.Lock is not re-entrant.
    """
def reorder(
    self,
    job_id: str,  # Job to move
    new_priority: int  # New priority value
) -> bool:  # True if reordered
    "Change the priority of a pending job."
def get_job(
    self,
    job_id: str  # Job to retrieve
) -> Optional[Job]:  # Job or None
    "Get a job by ID."
async def wait_for_job(
    self,
    job_id: str,  # Job to wait for
    timeout: Optional[float] = None  # Max seconds to wait
) -> Job:  # Completed/failed/cancelled job
    """
    Wait for a job to complete.
    
    Independent of the CR-6 event bus — uses a per-job `asyncio.Event` for
    the simple block-until-done affordance. Streaming consumers should use
    `events(job_id)` instead.
    """
def get_pending(self) -> List[Job]:  # Pending jobs, priority-sorted
    """Get pending jobs, priority-sorted (higher priority first, then FIFO)."""
    return sorted(self._pending)
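
Calling `sorted(self._pending)` implies that pending jobs are orderable. One way to obtain "higher priority first, then FIFO" is a sort key built from the negated priority and a submission counter. How the library actually defines the ordering is not shown here, so `PendingJob`, `submitted_seq`, and `sort_pending` are hypothetical names used only for this sketch.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PendingJob:
    id: str
    priority: int       # higher = more urgent
    submitted_seq: int  # assumed monotonically increasing submission counter


def sort_pending(jobs: List[PendingJob]) -> List[PendingJob]:
    """Order jobs by descending priority, breaking ties by submission order."""
    return sorted(jobs, key=lambda j: (-j.priority, j.submitted_seq))


jobs = [
    PendingJob("a", priority=0, submitted_seq=1),
    PendingJob("b", priority=5, submitted_seq=2),
    PendingJob("c", priority=5, submitted_seq=3),
    PendingJob("d", priority=1, submitted_seq=4),
]
ordered_ids = [j.id for j in sort_pending(jobs)]
```

Jobs `b` and `c` share the top priority, so the earlier submission (`b`) runs first.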

def get_running(self) -> Optional[Job]:  # Running job or None
    """Get the currently-executing job, or None if idle."""
    return self._running

def get_history(
    self,
    limit: Optional[int] = None,  # Max jobs to return (most recent N); None = all
) -> List[Job]:  # Completed/failed/cancelled jobs, most recent first
    """
    Get completed jobs, most recent first.
    
    If `limit` is provided, returns the most recent N. The internal history
    list grows append-only up to `max_history`, so older jobs are evicted
    in submission order (oldest first).
    """
def get_stats(self) -> QueueStats:  # Aggregate counts
    """Get aggregate queue stats — total counts by terminal status."""
    return QueueStats(
        total_pending=len(self._pending),
        total_completed=sum(1 for j in self._history if j.status == JobStatus.completed),
        total_failed=sum(1 for j in self._history if j.status == JobStatus.failed),
        total_cancelled=sum(1 for j in self._history if j.status == JobStatus.cancelled),
    )
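
The log-slicing helpers documented next (`_parse_log_timestamp` and `_slice_log_by_job_window`) compare local-time naive log timestamps against a job's UTC window. A minimal sketch of that idea follows. The timestamp format, the regular expression, and the names `parse_ts`, `to_local_naive`, and `slice_by_window` are assumptions for illustration, because the worker's real log format is not shown in this listing.

```python
import re
from datetime import datetime, timezone
from typing import Optional

# Assumed line prefix, e.g. "2024-01-01 12:00:01 INFO ..."
_TS = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")


def parse_ts(line: str) -> Optional[datetime]:
    """Return the leading timestamp as a naive local datetime, or None."""
    m = _TS.match(line)
    return datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S") if m else None


def to_local_naive(dt_utc: datetime) -> datetime:
    """Convert an aware UTC datetime to naive local time for comparison."""
    return dt_utc.astimezone().replace(tzinfo=None)


def slice_by_window(raw: str, started_at: Optional[datetime],
                    completed_at: Optional[datetime], max_lines: int) -> str:
    lines = raw.splitlines()
    # Best-effort fallback: no window or no parseable timestamps -> raw tail.
    if started_at is None or not any(parse_ts(l) for l in lines):
        return "\n".join(lines[-max_lines:])
    start = to_local_naive(started_at)
    end = to_local_naive(completed_at) if completed_at else None
    kept, include = [], False
    for line in lines:
        ts = parse_ts(line)
        if ts is not None:
            include = ts >= start and (end is None or ts <= end)
        # Continuation lines inherit the decision of the last timestamped line.
        if include:
            kept.append(line)
    return "\n".join(kept[-max_lines:])


raw_log = "\n".join([
    "2024-01-01 11:59:59 INFO before the job",
    "2024-01-01 12:00:01 INFO job started",
    "    continuation of the previous record",
    "2024-01-01 12:00:05 INFO after the job",
])
# Build UTC window bounds from local wall-clock times so the demo is timezone-independent.
started = datetime(2024, 1, 1, 12, 0, 0).astimezone(timezone.utc)
completed = datetime(2024, 1, 1, 12, 0, 3).astimezone(timezone.utc)
sliced = slice_by_window(raw_log, started, completed, max_lines=100)
```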

def _parse_log_timestamp(line: str) -> Optional[datetime]:
    """Parse the leading timestamp from a worker log line.

    Returns naive datetime (no tzinfo) in local time — caller is responsible
    for tz alignment with the job's UTC timestamps. Returns None for
    continuation lines (no parseable leading timestamp), blank lines, etc.
    """
def _slice_log_by_job_window(
    raw: str,  # Full log content
    started_at: datetime,  # Job's UTC start time
    completed_at: Optional[datetime],  # Job's UTC end time (None if still running)
    max_lines: int,  # Max lines to return
) -> str:  # Sliced log content
    """
    Slice log content by a job's execution window.
    
    Worker logs are local-time naive; job timestamps are UTC. We convert
    the job's UTC times to local-time-naive for comparison. Continuation
    lines (no parseable timestamp) are associated with the most recent
    timestamped line — included if that timestamp was in the window.
    
    Best-effort: if `started_at` is None (job never ran) or no lines have
    parseable timestamps, returns the raw tail unchanged.
    """
def get_job_logs(
    self,
    job_id: str,  # Job to get logs for
    lines: int = 100  # Max lines to return
) -> str:  # Log content scoped to this job's execution window
    """
    Get logs for a job, scoped to its (started_at, completed_at) window.
    
    CR-6 Stage 3: replaces the legacy whole-plugin-log behavior. The
    substrate over-fetches the plugin log (lines * 5) and slices by the
    job's execution window — the job-monitor library's `--- Starting` marker
    heuristic in `_filter_current_session` becomes obsolete in cascade.
    
    Falls back gracefully when timestamps are unparseable or the job's
    window isn't known: returns the raw tail.
    """
def _subscriber_keys_for(event: JobEvent) -> List[str]:
    """Return the subscriber keys an event should fan out to (CR-6).

    Every event reaches "all" subscribers + the per-job subscribers.
    Sequence-tagged events additionally reach per-sequence subscribers.
    """
def _publish_event(
    self,
    event: JobEvent,  # Event to fan out
) -> None
    """
    Fan out an event to all matching subscribers (CR-6).
    
    Slow subscribers backpressure themselves via `asyncio.QueueFull` drop —
    publisher never blocks. Each subscriber tracks `dropped_count` so
    operators / future telemetry can surface backpressure visibility.
    """
async def _subscribe(
    self,
    key: str,  # Subscriber key ("all" | "job:<id>" | "seq:<id>")
) -> AsyncIterator[JobEvent]
    """
    Internal: register a subscription; yield events until the consumer
    exits the async generator.
    
    Cleanup runs in `finally` so the subscriber is unregistered even if the
    consumer raises or is cancelled. Empty key lists are deleted to avoid
    memory accumulation across many short-lived subscriptions.
    """
async def events(
    self,
    job_id: str,  # Filter to events for this job
) -> AsyncIterator[JobEvent]
    """
    Subscribe to events for a single job (async generator).
    
    Yields events as they fire. Multiple concurrent subscribers to the same
    job_id each get their own independent stream — useful for multi-tab UIs.
    """
async def events_for_sequence(
    self,
    sequence_id: str,  # Filter to events tagged with this sequence
) -> AsyncIterator[JobEvent]
    """
    Subscribe to events for all jobs in a sequence (async generator).
    
    Yields the unified per-sequence narrative: member-job lifecycle events
    interleaved with `SEQUENCE_ADVANCED` aggregate signals (wired in Stage 2).
    """
async def all_events(self) -> AsyncIterator[JobEvent]:
    """Subscribe to all events (firehose; async generator).

    Useful for global dashboards, audit logs, and telemetry sinks that need
    the complete event stream rather than a filtered view.
    """
async def submit_sequence(
    self,
    steps: List[SequenceStep],  # Sequence steps to run in order
    priority: int = 0,          # Sequence-level priority (per-step override possible)
    fail_fast: bool = True,     # Halt on first failure (audit-locked default)
) -> str:  # sequence_id
    """
    Submit a multi-step job sequence (CR-6 Stage 2).
    
    Validates all step plugins upfront (`PluginDisabledError` if any are
    disabled) so operators get an immediate failure signal rather than
    discovering the problem mid-sequence. The first member-job is tagged
    with sequence fields BEFORE enqueue, so its first STATE_TRANSITION
    already routes through sequence-tagged subscribers.
    
    Returns the sequence_id; consumers subscribe via
    `queue.events_for_sequence(sequence_id)` or query state via
    `queue.get_sequence(sequence_id)`.
    """
async def submit_uniform_sequence(
    self,
    plugin_instance_id: str,  # Plugin every step targets
    args_list: List[Tuple[Any, ...]],  # Per-step positional args
    kwargs_list: Optional[List[Dict[str, Any]]] = None,  # Per-step keyword args (default: empty)
    priority: int = 0,        # Sequence-level priority
    fail_fast: bool = True,   # Halt on first failure (audit-locked default)
) -> str:  # sequence_id
    """
    Submit a sequence where every step targets the same plugin (CR-6 Stage 2).
    
    Sugar over `submit_sequence` for the common "same plugin, many arg sets"
    case — multi-source transcription / batch processing / parameter sweeps.
    """
async def cancel_sequence(
    self,
    sequence_id: str  # Sequence to cancel
) -> bool:  # True if cancellation was recorded
    """
    Cancel an in-flight sequence (CR-6 Stage 2).
    
    Marks the sequence as cancelled (no further advancement) and cancels
    the current member job. Returns False if the sequence is unknown or
    already terminal; True if cancellation was recorded.
    
    Sets `seq.status = JobStatus.cancelled` BEFORE issuing the member-job
    cancel. This prevents a race where the member completes between the
    intent record and the cancel call — `_advance_sequence` checks
    `seq.status == cancelled` first and halts regardless of member outcome.
    """
def get_sequence(
    self,
    sequence_id: str  # Sequence to retrieve
) -> Optional[JobSequence]:  # JobSequence or None
    "Get a job sequence by ID (read-only inspection)."
async def _advance_sequence(
    self,
    completed_job: Job  # The member job that just reached terminal status
) -> None
    """
    Advance the sequence after a member job completes (CR-6 Stage 2).
    
    Records the member's StepResult, decides whether to advance / halt /
    finalize, and submits the next member if applicable. Always called
    OUTSIDE the queue's main lock — may take it via `_enqueue_job`.
    """
def _sample_resource_snapshot(
    self,
    job: Job  # Job to sample resources for
) -> Optional[ResourceSnapshot]
    """
    Sample worker + sysmon stats for a job (CR-6 Stage 3 internal helper).
    
    Returns None if the worker proxy doesn't support `get_stats` or the call
    fails — substrate can't fabricate a snapshot. Sysmon enrichment is
    best-effort: if the named plugin isn't loaded / errors / lacks the CR-3
    typed methods, GPU fields stay None and the worker-only snapshot is
    returned.
    
    Subprocess-spawning plugins (e.g. Voxtral-vLLM's managed vLLM server)
    spawn grandchild PIDs that hold GPU memory the worker itself doesn't.
    GPU attribution delegates to `attribute_gpu_to_worker_subtree`, which
    intersects the worker-reported `subtree_pids` set with sysmon's per-PID
    GPU enumeration. The pre-fix path matched only `worker_pid` and reported
    `gpu_memory_mb=None` for any subprocess-spawning plugin.
    """
def get_resource_snapshot(
    self,
    job_id: str  # Job to sample resources for
) -> Optional[ResourceSnapshot]
    """
    Get a point-in-time resource snapshot for a job (CR-6 Stage 3).
    
    Returns None if the job is unknown or the worker proxy doesn't expose
    `get_stats`. Composes worker stats with sysmon GPU stats when the queue
    is configured with a `sysmon_plugin_name`.
    """
async def start(self) -> None:
    """Start the queue processor.

    CR-6 Stage 4: installs the substrate-side retry observer on `_deps`
    (typically a PluginManager). When CR-7's reactive-retry path fires for
    a running job, the observer updates `Job.retry_count` and publishes a
    RETRY_STARTED event tagged with the in-flight job. Previous observer
    value (if any) is saved + restored in stop() for cooperative coexistence.
    """
async def stop(self) -> None:
    """Stop the queue processor gracefully.

    CR-6 Stage 4: restores the previous `_on_retry` observer on deps to
    leave the manager in the state we found it (cooperative with other
    queue instances, tests, etc.).
    """
def _on_manager_retry(
    """
    Substrate-side retry observer (CR-6 Stage 4).
    
    Invoked synchronously by PluginManager's CR-7 retry loop just before
    each retry attempt. Updates `Job.retry_count` on the matching in-flight
    job + emits RETRY_STARTED. Best-effort: synchronous callback; emission
    failure shouldn't propagate back into the retry loop.
    
    `attempt` semantics: PluginManager's loop iterates
    `for attempt in range(max_retries + 1)`. The first iteration
    (`attempt=0`) is the original try and never invokes this callback —
    it only fires when `last_resource_error is not None`, which means
    the prior iteration raised. So the value PASSED here is already the
    1-based retry number: `attempt=1` is the first retry, `attempt=2` is
    the second retry, etc.
    
    Match logic: find the running job whose plugin_instance_id matches.
    Multi-instance / concurrent execution complicates this — the single
    `self._running` field tracks one job at a time, matching the queue's
    current single-worker-execution model. Future concurrent-execution
    support would extend the match strategy.
    """
def _move_to_history(self, job: Job) -> None:
    """Move a job to history, maintaining max_history limit."""
def _signal_job_completed(self, job_id: str) -> None:
    """Signal that a job has completed."""
def _emit_state_transition(
    self,
    job: Job,
    prev_status: JobStatus,
) -> None
    """
    Emit a STATE_TRANSITION event for `job`'s most recent status change.
    
    Centralized so every transition site (start, completed, failed, cancelled,
    or future cancel-phase-driven transitions) carries identical tag context.
    """
def _emit_cancel_phase(
    self,
    job: Job,
    new_phase: CancelPhase,
) -> None
    """
    Emit a CANCEL_PHASE_CHANGED event (CR-6 Stage 4).
    
    Updates `job.cancel_phase` to the new value and publishes an event with
    the prior phase + new phase in the payload. Centralized so every phase
    transition site in `_execute_with_cancellation` produces identically-shaped
    events.
    """
def _emit_block_reason(
    self,
    job: Job,
    new_reason: Optional[str],
) -> None
    """
    Emit a BLOCK_REASON_CHANGED event (CR-6 Stage 4 — reserved).
    
    Updates `job.block_reason` and publishes an event with the prior + new
    reason. Stage 4 ships this helper for future scheduler-integration use;
    the queue's current scheduling logic doesn't surface block reasons, so
    the helper is reserved for the eventual scheduler-coordination wiring.
    """
async def _process_loop(self) -> None:
    """Main processing loop."""
async def _execute_job(self, job: Job) -> None:
    """Execute a single job."""
async def _execute_with_cancellation(
    self,
    job: Job,
    plugin: Any
) -> Any
    """
    Execute job with cancellation monitoring.
    
    CR-6 Stage 4 wires CANCEL_PHASE_CHANGED events for the substrate's
    cooperative → force → reloading → completed state machine.
    
    Cooperative-success path (plugin acknowledges cancel within timeout):
        COOPERATIVE → COMPLETED
    Force-kill path (cooperative timeout):
        COOPERATIVE → FORCE → RELOADING → COMPLETED
    """
async def _poll_progress(
    self,
    job: Job,
    plugin: Any
) -> None
    """
    Poll progress + sample resources from the plugin during execution.
    
    Emits PROGRESS_CHANGED events when progress or status_message changes
    from the previous poll (avoids spamming the bus between meaningful
    updates). CR-6 Stage 3: also emits RESOURCE_SNAPSHOT events every
    `resource_snapshot_cadence_polls` iterations; snapshot is also stored
    on `job.last_resource_snapshot` for synchronous inspection.
    """
def get_state(self) -> Dict[str, Any]:  # Queue state for UI (legacy shape)
    """Get the current queue state in the pre-CR-6 dict shape.

    REMOVE-AFTER-OVERHAUL: kept so existing UI consumers (notably
    `cjm-fasthtml-job-monitor`) continue to work until they migrate to the
    typed `get_pending` / `get_running` / `get_history` / `get_stats`
    accessors. Reads from the new accessors and reshapes:

    - Legacy keys (`plugin_name`, `created_at`) read from the renamed fields
      via the Job dataclass's backward-compat `@property` aliases.
    - Legacy `error` was a bare string; new is `JobError`. Shim returns
      `error.message` to preserve the string shape.
    - Legacy timestamps were unix floats; new are `datetime`. Shim returns
      `.timestamp()` to preserve float shape.
    """

Classes

class JobStatus(str, Enum):
    "Status of a job in the queue."
class JobEventType(str, Enum):
    """
    Push-based job event types (CR-6).
    
    Emitted by JobQueue on a multi-subscriber event bus. Consumers subscribe
    via `queue.events(job_id)` / `queue.events_for_sequence(seq_id)` /
    `queue.all_events()` and receive `JobEvent` instances asynchronously.
    
    Stage 1 wires STATE_TRANSITION + PROGRESS_CHANGED at the existing
    execute-path lifecycle points. The remaining types are reserved for
    Stages 2-4 (sequences / resources + logs / cancel-phase + retry +
    block-reason). Reserving the enum values up front keeps later stages
    additive — new event sources publish without changing the enum.
    """
class CancelPhase(str, Enum):
    """
    Phase of a cancellation in progress (CR-6 + CR-4 pairing).
    
    Surfaces the substrate's cancel state machine. Stage 4 wires the
    transitions; Stage 1 reserves the enum so `Job.cancel_phase` can be
    typed correctly without dangling forward references.
    """
@runtime_checkable
class JobQueueDependencies(Protocol):
    """
    Substrate dependencies the JobQueue requires (CR-6).
    
    PluginManager satisfies this structurally; the Protocol exists so JobQueue
    can be tested in isolation (with a lightweight test double) and so a future
    extraction into a separate library has no API constraint locked in.
    
    Method surface chosen empirically — these are the exact 5 methods the
    queue's execute path calls today. Adding new dependencies to the Protocol
    (e.g., a typed system-monitor reader when Stage 3 wires resource snapshots)
    is an additive change.
    """
    
    def get_plugin_meta(self, name_or_id: str) -> Optional[Any]: ...
    
    def get_plugin(self, name_or_id: str) -> Optional[Any]: ...
    
    async def execute_plugin_async(self, name_or_id: str, *args: Any, **kwargs: Any) -> Any: ...
    
    def reload_plugin(self, name_or_id: str) -> Any: ...
    
    def get_plugin_logs(self, plugin_name: str, lines: int = 50) -> str: ...
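
Because the protocol is `@runtime_checkable` and satisfied structurally, a lightweight test double only needs to expose the same five method names. The sketch below re-declares the protocol locally under the hypothetical name `QueueDeps` so that it runs on its own; `FakeDeps` and `Incomplete` are likewise illustrative. Note that `isinstance` against a runtime-checkable protocol verifies only that the methods exist, not their signatures.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class QueueDeps(Protocol):
    """Local mirror of the five documented dependency methods."""

    def get_plugin_meta(self, name_or_id: str) -> Optional[Any]: ...
    def get_plugin(self, name_or_id: str) -> Optional[Any]: ...
    async def execute_plugin_async(self, name_or_id: str, *args: Any, **kwargs: Any) -> Any: ...
    def reload_plugin(self, name_or_id: str) -> Any: ...
    def get_plugin_logs(self, plugin_name: str, lines: int = 50) -> str: ...


class FakeDeps:
    """Test double: no inheritance needed, only matching method names."""

    def get_plugin_meta(self, name_or_id):
        return {"name": name_or_id}

    def get_plugin(self, name_or_id):
        return object()

    async def execute_plugin_async(self, name_or_id, *args, **kwargs):
        return {"plugin": name_or_id, "args": args, "kwargs": kwargs}

    def reload_plugin(self, name_or_id):
        return True

    def get_plugin_logs(self, plugin_name, lines=50):
        return ""


class Incomplete:
    """Missing four of the five methods, so the structural check fails."""

    def get_plugin(self, name_or_id):
        return None


fake_ok = isinstance(FakeDeps(), QueueDeps)
incomplete_ok = isinstance(Incomplete(), QueueDeps)
```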
@dataclass
class Job:
    """
    A queued plugin execution request (CR-6 reshape).
    
    Stage 1 lands the full field set so subsequent stages are purely additive
    on the population paths. Fields tagged "Stage N" are typed but unpopulated
    by Stage 1's execute path; their default values keep the dataclass safely
    constructible from existing call sites.
    """
    
    id: str  # Unique job identifier (UUID)
    plugin_instance_id: str  # Target plugin instance (per CR-10)
    args: Tuple[Any, ...]  # Positional arguments for execute()
    kwargs: Dict[str, Any]  # Keyword arguments for execute()
    status: JobStatus = JobStatus.pending  # Current job status
    priority: int = 0  # Higher = more urgent
    submitted_at: datetime = field(...)  # When submitted
    started_at: Optional[datetime]  # When execution started
    completed_at: Optional[datetime]  # When execution finished
    progress: float = 0.0  # 0.0 to 1.0, or -1.0 for indeterminate
    status_message: str = ''  # Descriptive status message
    result: Any  # Execution result (if completed)
    error: Optional[JobError]  # Structured failure summary (CR-5)
    sequence_id: Optional[str]  # Set when part of a sequence (Stage 2)
    sequence_index: Optional[int]  # 0-based position in sequence (Stage 2)
    cancel_requested_at: Optional[datetime]  # When cancel was requested (Stage 4)
    cancel_phase: Optional[CancelPhase]  # Active cancel phase (Stage 4)
    block_reason: Optional[str]  # Why the scheduler is blocking (Stage 4)
    retry_count: int = 0  # Reactive retries attempted (CR-7 + Stage 4)
    last_resource_snapshot: Optional[Any]  # Stage 3 wires this (ResourceSnapshot)
    
    @property
    def plugin_name(self) -> str:
        return self.plugin_instance_id
    
    @property
    def created_at(self) -> float:
        return self.submitted_at.timestamp()
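
`Job.retry_count` is driven by the retry observer documented under `_on_manager_retry`, whose `attempt` argument is already the 1-based retry number. The loop below is a sketch of that semantics only: `ResourceError`, `run_with_retries`, and `flaky` are hypothetical names, and the real retry loop lives in `PluginManager`, which is not reproduced here.

```python
from typing import Callable, List, Optional


class ResourceError(Exception):
    """Stand-in for a retryable resource failure (hypothetical)."""


def run_with_retries(operation: Callable[[], str], max_retries: int,
                     on_retry: Callable[[int], None]) -> str:
    last_resource_error: Optional[ResourceError] = None
    for attempt in range(max_retries + 1):
        # attempt=0 is the original try, so the observer is skipped.
        if last_resource_error is not None:
            on_retry(attempt)  # attempt=1 is the first retry
        try:
            return operation()
        except ResourceError as exc:
            last_resource_error = exc
    assert last_resource_error is not None
    raise last_resource_error


calls = {"n": 0}


def flaky() -> str:
    """Fail twice with a resource error, then succeed."""
    calls["n"] += 1
    if calls["n"] <= 2:
        raise ResourceError("out of memory")
    return "ok"


seen_attempts: List[int] = []
result = run_with_retries(flaky, max_retries=3, on_retry=seen_attempts.append)
```

The observer sees `1` and `2`, never `0`, which is why the value can be stored directly as a retry count.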
@dataclass
class JobEvent:
    """
    A push-based job event (CR-6).
    
    Carries full tag context so a subscriber to `all_events()`, `events(job_id)`,
    or `events_for_sequence(seq_id)` receives identically-shaped instances.
    `payload` is a per-event-type structured dict (e.g., STATE_TRANSITION carries
    `{"from": "pending", "to": "running"}`; PROGRESS_CHANGED carries
    `{"progress": 0.42, "status_message": "..."}`).
    """
    
    type: JobEventType
    job_id: str
    plugin_instance_id: str
    sequence_id: Optional[str]
    sequence_index: Optional[int]
    timestamp: datetime = field(...)
    payload: Dict[str, Any] = field(...)
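
The tag fields on an event decide which subscriber groups receive it. Assuming the key format given for `_subscribe` ("all", "job:<id>", "seq:<id>"), the routing could look like the sketch below. `Event` is a trimmed local stand-in for `JobEvent`, and the `seq:` branch is an assumption, since the body of `_subscriber_keys_for` is not shown in full.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Event:
    """Trimmed stand-in for JobEvent: only the fields used for routing."""
    type: str
    job_id: str
    sequence_id: Optional[str] = None
    payload: Dict[str, Any] = field(default_factory=dict)


def subscriber_keys_for(event: Event) -> List[str]:
    """Every event reaches "all" and its job; sequence members also reach their sequence."""
    keys = ["all", f"job:{event.job_id}"]
    if event.sequence_id is not None:
        keys.append(f"seq:{event.sequence_id}")
    return keys


plain = Event("STATE_TRANSITION", "j1", payload={"from": "pending", "to": "running"})
tagged = Event("PROGRESS_CHANGED", "j2", sequence_id="s1", payload={"progress": 0.42})
plain_keys = subscriber_keys_for(plain)
tagged_keys = subscriber_keys_for(tagged)
```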
@dataclass
class QueueStats:
    "Aggregate counts returned by `JobQueue.get_stats()` (CR-6)."
    
    total_pending: int
    total_completed: int
    total_failed: int
    total_cancelled: int
@dataclass
class _Subscription:
    """
    Internal: per-subscriber bounded queue + drop counter (CR-6 event bus).
    
    Slow subscribers backpressure themselves via `asyncio.QueueFull` drop;
    the publisher never blocks. `dropped_count` surfaces visibility for
    operators / future telemetry.
    """
    
    queue: 'asyncio.Queue[JobEvent]'
    dropped_count: int = 0
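
The drop-on-full behavior and the cleanup-in-`finally` behavior can be sketched with a small stand-alone bus. This is an illustration under assumptions, not the queue's implementation: `Subscription`, `EventBus`, the `maxsize` value, and `demo` are hypothetical, and events are plain integers to keep the example short.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, List


@dataclass(eq=False)
class Subscription:
    queue: "asyncio.Queue[Any]"
    dropped_count: int = 0


class EventBus:
    def __init__(self, maxsize: int = 2) -> None:
        self.maxsize = maxsize
        self.subscribers: Dict[str, List[Subscription]] = {}

    def publish(self, key: str, event: Any) -> None:
        """Never blocks: a full subscriber queue drops the event and counts it."""
        for sub in self.subscribers.get(key, []):
            try:
                sub.queue.put_nowait(event)
            except asyncio.QueueFull:
                sub.dropped_count += 1

    async def subscribe(self, key: str) -> AsyncIterator[Any]:
        sub = Subscription(queue=asyncio.Queue(maxsize=self.maxsize))
        self.subscribers.setdefault(key, []).append(sub)
        try:
            while True:
                yield await sub.queue.get()
        finally:
            # Runs when the consumer stops iterating, raises, or is cancelled.
            self.subscribers[key].remove(sub)
            if not self.subscribers[key]:
                del self.subscribers[key]  # avoid accumulating empty key lists


async def demo():
    bus = EventBus(maxsize=2)
    stream = bus.subscribe("job:1")
    pending = asyncio.ensure_future(stream.__anext__())
    await asyncio.sleep(0)  # let the generator register its subscription
    for n in range(4):
        bus.publish("job:1", n)  # 0 and 1 fit; 2 and 3 are dropped
    first = await pending
    second = await stream.__anext__()
    dropped = bus.subscribers["job:1"][0].dropped_count
    await stream.aclose()  # triggers the finally block
    return first, second, dropped, dict(bus.subscribers)
```

Running `asyncio.run(demo())` shows that a slow consumer loses events rather than stalling the publisher, and that closing the stream leaves no registry entry behind.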
@dataclass
class SequenceStep:
    """
    A single step in a job sequence (CR-6 Stage 2).
    
    User-facing — supplied to `JobQueue.submit_sequence`. The substrate
    constructs the actual `Job` from this at advance time, tagging the
    job with `sequence_id` + `sequence_index` so it appears in
    `events_for_sequence` subscriptions.
    """
    
    plugin_instance_id: str  # Target plugin instance for this step
    args: Tuple[Any, ...] = ()  # Positional arguments
    kwargs: Dict[str, Any] = field(...)  # Keyword arguments
    priority: int = 0  # Per-step priority override (0 = inherit sequence priority)
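
For the "same plugin, many argument sets" case that `submit_uniform_sequence` covers, the step list can be derived mechanically from the per-step arguments. The sketch below mirrors the documented `SequenceStep` fields in a local dataclass. `build_uniform_steps` is a hypothetical helper, the `default_factory=dict` default is an assumption (the listing elides it), and rejecting mismatched list lengths is a choice made for this example rather than documented behavior.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class SequenceStep:
    """Local mirror of the documented fields."""
    plugin_instance_id: str
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)
    priority: int = 0


def build_uniform_steps(
    plugin_instance_id: str,
    args_list: List[Tuple[Any, ...]],
    kwargs_list: Optional[List[Dict[str, Any]]] = None,
) -> List[SequenceStep]:
    """Expand per-step args/kwargs into steps that all target one plugin."""
    if kwargs_list is None:
        kwargs_list = [{} for _ in args_list]
    if len(kwargs_list) != len(args_list):
        raise ValueError("args_list and kwargs_list must have the same length")
    return [
        SequenceStep(plugin_instance_id, tuple(args), dict(kwargs))
        for args, kwargs in zip(args_list, kwargs_list)
    ]


# "transcriber" is a made-up plugin instance id.
steps = build_uniform_steps("transcriber", [("a.wav",), ("b.wav",)])
```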
@dataclass
class StepResult:
    """
    Result of one step in a job sequence (CR-6 Stage 2).
    
    `success=True` means the member-job completed with `JobStatus.completed`.
    Failure modes (failed / cancelled) capture the `JobError` in `error`.
    For best-effort sequences (`fail_fast=False`), failure rows are recorded
    in `JobSequence.results` and the sequence continues.
    """
    
    job_id: str  # Member job's UUID (empty string if sequence aborted before submit)
    success: bool  # True if the step's job completed successfully
    result: Any  # Job result (if success)
    error: Optional[JobError]  # Structured failure summary (if not success)
@dataclass
class JobSequence:
    """
    Internal: tracks a multi-step job sequence (CR-6 Stage 2).
    
    Lives in `JobQueue._sequences`. State machine: starts `running`,
    transitions to `completed` (all steps attempted) / `failed` (fail_fast
    halted on a failed member) / `cancelled` (cancel_sequence called or
    a member was cancelled). Once terminal, `completed_at` is set and
    `current_job_id` is cleared.
    """
    
    id: str  # Sequence UUID
    steps: List[SequenceStep]  # The full step list (immutable post-submit)
    fail_fast: bool = True  # Halt on first failure (audit-locked default)
    priority: int = 0  # Sequence-level priority (per-step override possible)
    current_index: int = 0  # 0-based index of the active member step
    current_job_id: Optional[str]  # The active member-job's ID, if any
    results: List[StepResult] = field(...)  # Per-step outcomes
    status: JobStatus = JobStatus.running  # Sequence-level status (reuses JobStatus)
    submitted_at: datetime = field(...)
    completed_at: Optional[datetime]  # Set when sequence reaches terminal status
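
The state machine described above can be condensed into a single decision function. The sketch assumes the ordering of checks implied by the documentation (a recorded cancel wins over any member outcome, then member cancellation, then `fail_fast`); `Status` and `next_sequence_status` are hypothetical names, and the real logic in `_advance_sequence` also records results and enqueues the next job.

```python
from enum import Enum


class Status(str, Enum):
    running = "running"
    completed = "completed"
    failed = "failed"
    cancelled = "cancelled"


def next_sequence_status(seq_status: Status, member_status: Status,
                         fail_fast: bool, current_index: int,
                         total_steps: int) -> Status:
    """Decide the sequence status after the member at current_index finishes."""
    if seq_status is Status.cancelled:
        return Status.cancelled  # cancel intent was recorded first; halt regardless
    if member_status is Status.cancelled:
        return Status.cancelled  # a cancelled member cancels the sequence
    if member_status is Status.failed and fail_fast:
        return Status.failed     # halt on first failure
    if current_index + 1 >= total_steps:
        return Status.completed  # all steps attempted
    return Status.running        # advance to the next member
```

With `fail_fast=False`, a failed member is recorded and the sequence keeps running until every step has been attempted.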
@dataclass
class ResourceSnapshot:
    """
    Point-in-time resource usage for one job (CR-6 Stage 3).
    
    Worker stats (cpu_percent, memory_rss_mb) come from the plugin proxy's
    `get_stats()`. GPU fields come from the configured system-monitor plugin
    (when set on JobQueue) via CR-3's typed `list_processes()` (per-PID
    matching) and `get_system_status()` (global GPU stats). All GPU fields
    are Optional — None if no sysmon configured or the worker isn't running
    on a GPU.
    
    Distinct from CR-7's `EmpiricalResourceRecord` (aggregated profile
    across runs); this is "what's happening right now" for one job.
    """
    
    timestamp: datetime  # When the sample was taken
    worker_pid: int = 0  # OS PID of the plugin worker subprocess
    cpu_percent: float = 0.0  # Worker process CPU%
    memory_rss_mb: float = 0.0  # Worker process resident memory (MB)
    gpu_index: Optional[int]  # GPU index the worker is on (None if not GPU-bound)
    gpu_memory_mb: Optional[float]  # Worker's GPU memory usage (MB)
    gpu_type: Optional[str]  # GPU vendor (NVIDIA / AMD / Intel / None)
    gpu_total_mb: Optional[float]  # Total GPU memory available globally (MB)
    gpu_load_percent: Optional[float]  # GPU compute utilization (global)
class JobQueue:
    def __init__(
        self,
        deps: JobQueueDependencies,        # Substrate dependencies (PluginManager satisfies structurally)
        max_history: int = 100,            # Max completed jobs to retain
        cancel_timeout: float = 3.0,       # Seconds to wait for cooperative cancel
        progress_poll_interval: float = 1.0,  # Seconds between progress polls
        sysmon_plugin_name: Optional[str] = None,  # CR-3 MonitorPlugin instance for GPU stats (None = no GPU info)
        resource_snapshot_cadence_polls: int = 4,  # Sample resources every Nth progress poll
    )
    "Resource-aware job queue with push-based observability (CR-6)."
    
    def __init__(
            self,
            deps: JobQueueDependencies,        # Substrate dependencies (PluginManager satisfies structurally)
            max_history: int = 100,            # Max completed jobs to retain
            cancel_timeout: float = 3.0,       # Seconds to wait for cooperative cancel
            progress_poll_interval: float = 1.0,  # Seconds between progress polls
            sysmon_plugin_name: Optional[str] = None,  # CR-3 MonitorPlugin instance for GPU stats (None = no GPU info)
            resource_snapshot_cadence_polls: int = 4,  # Sample resources every Nth progress poll
        )
        """
        Initialize the job queue.
        
        CR-6 Stage 3 adds `sysmon_plugin_name` and `resource_snapshot_cadence_polls`.
        Sysmon integration is optional: when set, the named plugin must satisfy
        CR-3's typed `MonitorPlugin` shape (`get_system_status` + `list_processes`).
        When unset, ResourceSnapshot still carries worker stats but all GPU
        fields stay None.
        """
    
    def manager(self) -> JobQueueDependencies
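
To illustrate how `progress_poll_interval` and `resource_snapshot_cadence_polls` interact, here is a minimal sketch. It assumes a snapshot is taken whenever a 1-based poll counter is a multiple of the cadence; the real queue may count polls differently. The helper `snapshot_poll_numbers` is hypothetical and not part of the package.

```python
from typing import List


def snapshot_poll_numbers(total_polls: int, cadence_polls: int = 4) -> List[int]:
    """Return the 1-based poll numbers on which a snapshot would be sampled.

    Assumption: sampling happens when the poll counter is a multiple of
    `cadence_polls`. This only mirrors the "every Nth progress poll" wording.
    """
    if cadence_polls < 1:
        raise ValueError("cadence_polls must be >= 1")
    return [n for n in range(1, total_polls + 1) if n % cadence_polls == 0]


# With the documented defaults (1.0 s poll interval, cadence of 4), snapshots
# would land roughly 4 seconds apart under this assumption.
progress_poll_interval = 1.0
cadence = 4
seconds_between_snapshots = progress_poll_interval * cadence
sampled = snapshot_poll_numbers(10, cadence)
```

Under these assumptions, raising the cadence lowers sampling overhead at the cost of coarser resource timelines.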

Variables

_LOG_TS_PATTERN
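
The `JobSequence` state machine documented in this module can be sketched as follows. This is a simplified model, not the package's implementation: `SketchSequence` and `run_sequence` are hypothetical names, steps are plain callables that return True on success, status values are plain strings, and cancellation is left out.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class SketchSequence:
    """Hypothetical stand-in for JobSequence with only the fields the sketch needs."""
    steps: List[Callable[[], bool]]  # Each step returns True on success
    fail_fast: bool = True
    current_index: int = 0
    results: List[bool] = field(default_factory=list)
    status: str = "running"


def run_sequence(seq: SketchSequence) -> SketchSequence:
    """Run steps in order and apply the documented terminal states."""
    for index, step in enumerate(seq.steps):
        seq.current_index = index
        ok = step()
        seq.results.append(ok)
        if not ok and seq.fail_fast:
            seq.status = "failed"  # fail_fast halted on a failed member
            return seq
    seq.status = "completed"  # all steps attempted, even if some failed
    return seq


steps = [lambda: True, lambda: False, lambda: True]
halted = run_sequence(SketchSequence(steps=list(steps)))
tolerant = run_sequence(SketchSequence(steps=list(steps), fail_fast=False))
```

With `fail_fast=True` the third step never runs; with `fail_fast=False` every step is attempted and the sequence still ends as `completed`, matching the "all steps attempted" wording.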

Scheduling (scheduling.ipynb)

Resource scheduling policies for plugin execution

Import

from cjm_plugin_system.core.scheduling import (
    ResourceScheduler,
    PermissiveScheduler,
    SafetyScheduler,
    QueueScheduler
)

Classes

class ResourceScheduler(ABC):
    "Abstract base class for resource allocation policies."
    
    def allocate(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Function that returns fresh stats
        ) -> bool:  # True if execution is allowed
        "Decide if a plugin can start based on its requirements and system state."
    
    async def allocate_async(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Awaitable[Dict[str, Any]]]  # Async function returning stats
        ) -> bool:  # True if execution is allowed
        "Async allocation decision. Default delegates to sync allocate after fetching stats once."
    
    def on_execution_start(
            self,
            plugin_name: str  # Name of the plugin starting execution
        ) -> None
        "Notify scheduler that a task started (to reserve resources)."
    
    def on_execution_finish(
            self,
            plugin_name: str  # Name of the plugin finishing execution
        ) -> None
        "Notify scheduler that a task finished (to release resources)."
class PermissiveScheduler(ResourceScheduler):
    "Scheduler that allows all executions (Default / Dev Mode)."
    
    def allocate(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Stats provider (ignored)
        ) -> bool:  # Always returns True
        "Allow all plugin executions without checking resources."
    
    def on_execution_start(
            self,
            plugin_name: str  # Name of the plugin starting execution
        ) -> None
        "No-op for permissive scheduler."
    
    def on_execution_finish(
            self,
            plugin_name: str  # Name of the plugin finishing execution
        ) -> None
        "No-op for permissive scheduler."
class SafetyScheduler(ResourceScheduler):
    "Scheduler that prevents execution if resources are insufficient."
    
    def allocate(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Function returning current stats
        ) -> bool:  # True if resources are available
        "Check resource requirements against system state."
    
    def on_execution_start(
            self,
            plugin_name: str  # Name of the plugin starting execution
        ) -> None
        "Called when execution starts (for future resource reservation)."
    
    def on_execution_finish(
            self,
            plugin_name: str  # Name of the plugin finishing execution
        ) -> None
        "Called when execution finishes (for future resource release)."
class QueueScheduler:
    def __init__(
        self,
        timeout: float = 300.0,  # Max seconds to wait for resources
        poll_interval: float = 2.0  # Seconds between resource checks
    )
    "Scheduler that waits for resources to become available."
    
    def __init__(
            self,
            timeout: float = 300.0,  # Max seconds to wait for resources
            poll_interval: float = 2.0  # Seconds between resource checks
        )
        "Initialize queue scheduler with timeout and polling settings."
    
    def allocate(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Function returning current stats
        ) -> bool:  # True if resources become available before timeout
        "Wait for resources using blocking sleep."
    
    async def allocate_async(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Awaitable[Dict[str, Any]]]  # Async stats function
        ) -> bool:  # True if resources become available before timeout
        "Wait for resources using non-blocking async sleep."
    
    def on_execution_start(
            self,
            plugin_name: str  # Name of the plugin starting execution
        ) -> None
        "Track that a plugin has started executing."
    
    def on_execution_finish(
            self,
            plugin_name: str  # Name of the plugin finishing execution
        ) -> None
        "Track that a plugin has finished executing."
    
    def get_active_plugins(self) -> Set[str]:  # Set of currently executing plugin names
        "Get the set of plugins with active executions."
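
The difference between a safety-style check and a queue-style wait can be sketched as below. This is an illustration under stated assumptions, not the schedulers' actual logic: the `memory_available_mb` stats key and both function names are hypothetical, and the real classes take a `PluginMeta` rather than a bare number.

```python
import time
from typing import Any, Callable, Dict


def has_enough_memory(required_mb: float, stats: Dict[str, Any]) -> bool:
    """Safety-style check: compare one requirement against fresh stats.

    The `memory_available_mb` key is an assumption made for this sketch.
    """
    return stats.get("memory_available_mb", 0.0) >= required_mb


def wait_for_resources(
    required_mb: float,
    stats_provider: Callable[[], Dict[str, Any]],
    timeout: float = 300.0,
    poll_interval: float = 2.0,
) -> bool:
    """Queue-style allocation: re-check until resources fit or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        if has_enough_memory(required_mb, stats_provider()):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)  # blocking sleep, as in the sync `allocate`


# Simulated stats provider: enough memory frees up on the third reading.
readings = iter([512.0, 1024.0, 4096.0])
granted = wait_for_resources(
    2048.0, lambda: {"memory_available_mb": next(readings)}, timeout=1.0, poll_interval=0.01
)
denied = wait_for_resources(
    2048.0, lambda: {"memory_available_mb": 0.0}, timeout=0.05, poll_interval=0.01
)
```

Calling the stats provider inside the loop, rather than once up front, is what lets a waiting scheduler observe resources being released by other plugins.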

Plugin Secret Store (secret_store.ipynb)

CR-12: project-local secret storage for API-based plugins (file-backed, 0600)

Import

from cjm_plugin_system.core.secret_store import (
    SecretStore,
    LocalSecretStore
)

Functions

def _default_secrets_dir() -> Path:
    """Default secrets directory: `~/.cjm/secrets` (bootstrap fallback)."""
    return Path.home() / ".cjm" / "secrets"

Classes

@runtime_checkable
class SecretStore(Protocol):
    "Protocol for resolving per-plugin secrets (API keys, tokens)."
    
    def get_secret(self, plugin_name: str, key: str, *, scope: Optional[str] = None) -> Optional[str]
        "Return the secret value for (plugin, key) under `scope`, or None."
    
    def set_secret(self, plugin_name: str, key: str, value: str, *, scope: Optional[str] = None) -> None
        "Persist a secret value for (plugin, key) under `scope`."
    
    def delete_secret(self, plugin_name: str, key: str, *, scope: Optional[str] = None) -> bool
        "Remove (plugin, key) under `scope`. Returns True if a secret was deleted."
    
    def list_keys(self, plugin_name: str, *, scope: Optional[str] = None) -> List[str]
        "Return the NAMES of secrets stored for a plugin under `scope` — never values."
class LocalSecretStore:
    def __init__(
        self,
        secrets_dir: Optional[Path] = None  # Directory for secrets.json; None -> ~/.cjm/secrets
    )
    "File-backed default `SecretStore` (0600 JSON under `secrets_dir`)."
    
    def __init__(
            self,
            secrets_dir: Optional[Path] = None  # Directory for secrets.json; None -> ~/.cjm/secrets
        )
        "Initialize the store. `secrets_dir=None` uses `~/.cjm/secrets`."
    
    def get_secret(
            self,
            plugin_name: str,  # Plugin the secret belongs to
            key: str,          # Secret key (typically the env-var name, e.g. GEMINI_API_KEY)
            *,
            scope: Optional[str] = None  # Reserved multi-user seam; ignored by the local store
        ) -> Optional[str]:  # The secret value, or None if absent
        "Resolve a secret value."
    
    def set_secret(
            self,
            plugin_name: str,  # Plugin the secret belongs to
            key: str,          # Secret key
            value: str,        # Secret value (stored plaintext at 0600)
            *,
            scope: Optional[str] = None  # Reserved multi-user seam
        ) -> None
        "Persist a secret value."
    
    def delete_secret(
            self,
            plugin_name: str,  # Plugin the secret belongs to
            key: str,          # Secret key
            *,
            scope: Optional[str] = None  # Reserved multi-user seam
        ) -> bool:  # True if a secret was removed
        "Remove a secret, pruning now-empty plugin/scope containers."
    
    def list_keys(
            self,
            plugin_name: str,  # Plugin to list secrets for
            *,
            scope: Optional[str] = None  # Reserved multi-user seam
        ) -> List[str]:  # Secret key NAMES (never values)
        "Return the names of secrets stored for a plugin (never the values)."
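
A file-backed store with owner-only permissions can be sketched as follows. This is a minimal illustration, not `LocalSecretStore` itself: the class name `SketchSecretStore` is hypothetical, the flat `{plugin: {key: value}}` JSON layout is an assumption, and the `scope` parameter is omitted for brevity.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Dict, List, Optional


class SketchSecretStore:
    """Hypothetical file-backed store; the on-disk layout is an assumption."""

    def __init__(self, secrets_dir: Path):
        self._path = Path(secrets_dir) / "secrets.json"

    def _load(self) -> Dict[str, Dict[str, str]]:
        if not self._path.exists():
            return {}
        return json.loads(self._path.read_text())

    def _save(self, data: Dict[str, Dict[str, str]]) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)
        # Create with owner-only permissions, then re-apply them in case the
        # file already existed with a looser mode.
        fd = os.open(self._path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as fh:
            json.dump(data, fh)
        os.chmod(self._path, 0o600)

    def get_secret(self, plugin_name: str, key: str) -> Optional[str]:
        return self._load().get(plugin_name, {}).get(key)

    def set_secret(self, plugin_name: str, key: str, value: str) -> None:
        data = self._load()
        data.setdefault(plugin_name, {})[key] = value
        self._save(data)

    def delete_secret(self, plugin_name: str, key: str) -> bool:
        data = self._load()
        plugin_secrets = data.get(plugin_name, {})
        if key not in plugin_secrets:
            return False
        del plugin_secrets[key]
        if not plugin_secrets:
            data.pop(plugin_name, None)  # prune the now-empty plugin container
        self._save(data)
        return True

    def list_keys(self, plugin_name: str) -> List[str]:
        # Names only; values never leave the store through this call.
        return sorted(self._load().get(plugin_name, {}))


store = SketchSecretStore(Path(tempfile.mkdtemp()))
store.set_secret("example-plugin", "GEMINI_API_KEY", "not-a-real-key")
```

The 0600 mode restricts the file to its owner on POSIX systems; the values are still stored in plaintext, as the parameter comment on `set_secret` notes.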

Variables

_SECRETS_FILENAME = 'secrets.json'
_DEFAULT_SCOPE = '__default__'

Substrate Telemetry Helpers (telemetry.ipynb)

Shared GPU/CPU attribution helpers used by both JobQueue._sample_resource_snapshot (CR-6 Stage 3) and PluginManager._record_sample_safe (CR-7).

Import

# No corresponding Python module found for core.telemetry

Functions

def _proc_field(proc: Any, key: str, default: Any = None) -> Any:
    """
    Read a field from a sysmon process record, accepting dict or dataclass.
    
    `MonitorPlugin.list_processes()` returns `ProcessStats` dataclasses (CR-3),
    but proxy round-trips frequently coerce to dicts. Accept both so the
    helper works against either form without the caller pre-normalizing.
    """
def _worker_subtree_pids(stats: Dict[str, Any]) -> set:
    """
    Build the worker subtree PID set from a `/stats` dict.
    
    Falls back to a single-pid set when `subtree_pids` is absent (pre-fix
    workers, mock test fixtures). The worker pid itself is always included.
    """
def attribute_gpu_to_worker_subtree(
    stats: Dict[str, Any],  # Worker `/stats` payload (must include 'pid'; uses 'subtree_pids' if present)
    sysmon: Any,            # The configured MonitorPlugin (or None)
) -> Optional[Dict[str, Any]]
    """
    Attribute GPU memory across the worker's process subtree.
    
    Returns `{'gpu_memory_mb': float, 'gpu_index': Optional[int]}` when sysmon
    is reachable, or `None` when sysmon isn't configured / doesn't expose
    `list_processes()` / errors out. Callers treat `None` as "sysmon
    unavailable" and leave GPU snapshot fields as their defaults; a 0.0 sum
    means sysmon worked but no subtree PID holds GPU memory (CPU-only plugin
    on a GPU box).
    """
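
The attribution described above can be sketched like this. It is an illustration under stated assumptions rather than the package's code: the record fields `gpu_memory_mb` and `gpu_index` on process entries, the `SketchProcess` class, and all function names are hypothetical, and the sketch takes a ready-made process list where the real helper takes the sysmon plugin.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Optional, Set


@dataclass
class SketchProcess:
    """Hypothetical process record; the real `ProcessStats` fields may differ."""
    pid: int
    gpu_memory_mb: float
    gpu_index: Optional[int] = None


def proc_field(proc: Any, key: str, default: Any = None) -> Any:
    """Read `key` from either a dict or an attribute-style record."""
    if isinstance(proc, dict):
        return proc.get(key, default)
    return getattr(proc, key, default)


def subtree_pids(stats: Dict[str, Any]) -> Set[int]:
    """Worker pid plus any reported `subtree_pids`."""
    tree = {stats["pid"]}
    tree.update(stats.get("subtree_pids") or [])
    return tree


def attribute_gpu(
    stats: Dict[str, Any], processes: Optional[Iterable[Any]]
) -> Optional[Dict[str, Any]]:
    """Sum GPU memory over the worker subtree; None means no sysmon data."""
    if processes is None:
        return None
    pids = subtree_pids(stats)
    total, index = 0.0, None
    for proc in processes:
        if proc_field(proc, "pid") in pids:
            total += float(proc_field(proc, "gpu_memory_mb", 0.0) or 0.0)
            if index is None:
                index = proc_field(proc, "gpu_index")
    return {"gpu_memory_mb": total, "gpu_index": index}


processes = [
    SketchProcess(pid=100, gpu_memory_mb=512.0, gpu_index=0),
    {"pid": 101, "gpu_memory_mb": 256.0, "gpu_index": 0},
    {"pid": 999, "gpu_memory_mb": 4096.0, "gpu_index": 1},
]
result = attribute_gpu({"pid": 100, "subtree_pids": [100, 101]}, processes)
cpu_only = attribute_gpu({"pid": 7}, processes)
unavailable = attribute_gpu({"pid": 100}, None)
```

Keeping `None` distinct from a 0.0 sum preserves the documented difference between "sysmon unavailable" and "sysmon worked but the subtree holds no GPU memory".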

Configuration Validation (validation.ipynb)

Validation helpers for plugin configuration dataclasses

Import

from cjm_plugin_system.utils.validation import (
    T,
    SCHEMA_TITLE,
    SCHEMA_DESC,
    SCHEMA_MIN,
    SCHEMA_MAX,
    SCHEMA_ENUM,
    SCHEMA_MIN_LEN,
    SCHEMA_MAX_LEN,
    SCHEMA_PATTERN,
    SCHEMA_FORMAT,
    validate_field_value,
    validate_config,
    config_to_dict,
    dict_to_config,
    extract_defaults,
    dataclass_to_jsonschema
)

Functions

def validate_field_value(
    value:Any, # Value to validate
    metadata:Dict[str, Any], # Field metadata containing constraints
    field_name:str="" # Field name for error messages
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
    "Validate a value against field metadata constraints."
def validate_config(
    config:Any # Configuration dataclass instance to validate
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
    "Validate all fields in a configuration dataclass against their metadata constraints."
def config_to_dict(
    config:Any # Configuration dataclass instance
) -> Dict[str, Any]: # Dictionary representation of the configuration
    "Convert a configuration dataclass instance to a dictionary."
def dict_to_config(
    config_class:Type[T], # Configuration dataclass type
    data:Optional[Dict[str, Any]]=None, # Dictionary with configuration values
    validate:bool=False, # Whether to validate against metadata constraints
    strict:bool=True # SG-8: reject unknown keys (default); set False to log+filter for forward-compat
) -> T: # Instance of the configuration dataclass
    """
    Create a configuration dataclass instance from a dictionary.
    
    SG-8: by default, unknown keys raise `PluginConfigError`. The previous
    behavior (silently filter unknowns) is available via `strict=False`,
    which logs a warning so the drift is still visible in operator logs.
    """
def extract_defaults(
    config_class:Type # Configuration dataclass type
) -> Dict[str, Any]: # Default values from the dataclass
    "Extract default values from a configuration dataclass type."
def _python_type_to_json_type(
    python_type:type # Python type annotation to convert
) -> Dict[str, Any]: # JSON schema type definition
    "Convert Python type to JSON schema type."
def dataclass_to_jsonschema(
    cls:type # Dataclass with field metadata
) -> Dict[str, Any]: # JSON schema dictionary
    "Convert a dataclass to a JSON schema for form generation."
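
The strict versus lenient handling of unknown keys described for `dict_to_config` can be sketched as follows. This is a simplified stand-in, not the library function: `dict_to_config_sketch`, `SketchConfigError` (in place of `PluginConfigError`), and `ExampleConfig` are hypothetical names, and metadata validation is left out.

```python
import logging
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional, Type, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


class SketchConfigError(ValueError):
    """Hypothetical stand-in for the package's `PluginConfigError`."""


def dict_to_config_sketch(
    config_class: Type[T],
    data: Optional[Dict[str, Any]] = None,
    strict: bool = True,
) -> T:
    """Build a dataclass instance, rejecting or filtering unknown keys."""
    data = dict(data or {})
    known = {f.name for f in fields(config_class)}
    unknown = sorted(set(data) - known)
    if unknown:
        if strict:
            raise SketchConfigError(f"Unknown config keys: {unknown}")
        # Lenient mode: keep the drift visible in logs, then drop the keys.
        logger.warning("Ignoring unknown config keys: %s", unknown)
    return config_class(**{k: v for k, v in data.items() if k in known})


@dataclass
class ExampleConfig:
    model: str = "base"
    beam_size: int = 5


lenient = dict_to_config_sketch(
    ExampleConfig, {"model": "large", "legacy_flag": True}, strict=False
)

try:
    dict_to_config_sketch(ExampleConfig, {"legacy_flag": True})
    strict_rejected = False
except SketchConfigError:
    strict_rejected = True
```

Defaulting to strict mode surfaces stale or misspelled keys immediately, while `strict=False` helps when stored configs may contain keys from a newer or older plugin version.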

Variables

T
SCHEMA_TITLE = 'title'  # Display title for the field
SCHEMA_DESC = 'description'  # Help text description
SCHEMA_MIN = 'minimum'  # Minimum value for numbers
SCHEMA_MAX = 'maximum'  # Maximum value for numbers
SCHEMA_ENUM = 'enum'  # Allowed values for dropdowns
SCHEMA_MIN_LEN = 'minLength'  # Minimum string length
SCHEMA_MAX_LEN = 'maxLength'  # Maximum string length
SCHEMA_PATTERN = 'pattern'  # Regex pattern for strings
SCHEMA_FORMAT = 'format'  # String format (email, uri, date, etc.)
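
These keys are meant to be attached to dataclass fields as metadata. The sketch below shows one way that could look, using locally defined copies of three of the constants and simplified checkers. `check_value` and `check_config` are hypothetical reimplementations for illustration; the package's `validate_field_value` and `validate_config` may handle more constraints and report errors differently.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Mapping, Optional, Tuple

# Local copies of three documented keys so the sketch is self-contained.
SCHEMA_MIN, SCHEMA_MAX, SCHEMA_ENUM = "minimum", "maximum", "enum"


def check_value(
    value: Any, metadata: Mapping[str, Any], field_name: str = ""
) -> Tuple[bool, Optional[str]]:
    """Check one value against enum and numeric range constraints."""
    if SCHEMA_ENUM in metadata and value not in metadata[SCHEMA_ENUM]:
        return False, f"{field_name}: {value!r} is not one of {metadata[SCHEMA_ENUM]}"
    if SCHEMA_MIN in metadata and value < metadata[SCHEMA_MIN]:
        return False, f"{field_name}: {value!r} is below {metadata[SCHEMA_MIN]}"
    if SCHEMA_MAX in metadata and value > metadata[SCHEMA_MAX]:
        return False, f"{field_name}: {value!r} is above {metadata[SCHEMA_MAX]}"
    return True, None


def check_config(config: Any) -> Tuple[bool, Optional[str]]:
    """Return the first constraint violation found, or (True, None)."""
    for f in fields(config):
        ok, error = check_value(getattr(config, f.name), f.metadata, f.name)
        if not ok:
            return ok, error
    return True, None


@dataclass
class ExampleConfig:
    temperature: float = field(default=0.2, metadata={SCHEMA_MIN: 0.0, SCHEMA_MAX: 1.0})
    device: str = field(default="cpu", metadata={SCHEMA_ENUM: ["cpu", "cuda"]})


valid = check_config(ExampleConfig())
too_hot = check_config(ExampleConfig(temperature=1.5))
bad_device = check_config(ExampleConfig(device="tpu"))
```

Because the key names match JSON Schema keywords, the same metadata can plausibly drive both validation and schema generation for forms.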

Universal Worker (worker.ipynb)

FastAPI server that runs inside isolated plugin environments

Import

from cjm_plugin_system.core.worker import (
    EnhancedJSONEncoder,
    parent_monitor,
    create_app,
    run_worker
)

Functions

def parent_monitor(
    ppid: int # Parent process ID to monitor
) -> None
    """
    Monitor parent process and terminate self if parent dies.
    
    This implements the "Suicide Pact" pattern: if the Host process dies,
    the Worker must terminate itself to prevent zombie processes.
    """
def create_app(
    module_name: str, # Python module path (e.g., "my_plugin.plugin")
    class_name: str   # Plugin class name (e.g., "WhisperPlugin")
) -> FastAPI: # Configured FastAPI application
    "Create FastAPI app that hosts the specified plugin."
def run_worker() -> None:
    """CLI entry point for running the worker."""
    parser = argparse.ArgumentParser(description="Universal Plugin Worker")
    parser.add_argument("--module", required=True, help="Plugin module path")
    parser.add_argument("--class", dest="class_name", required=True, help="Plugin class name")
    # SG-4: parent-bound listening-socket FD inheritance closes the
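
The parent-monitoring idea behind `parent_monitor` can be sketched as a small watcher thread. This is an assumption-laden illustration, not the worker's implementation: `parent_is_alive` and `watch_parent` are hypothetical names, the check relies on POSIX re-parenting (it is not reliable on Windows), and a callback stands in for actually terminating the process.

```python
import os
import threading
import time
from typing import Callable


def parent_is_alive(ppid: int) -> bool:
    """True while this process is still a child of `ppid`.

    On POSIX, a process whose parent exits is re-parented, so
    `os.getppid()` stops matching the original parent PID.
    """
    return os.getppid() == ppid


def watch_parent(
    ppid: int, on_orphaned: Callable[[], None], poll_interval: float = 1.0
) -> threading.Thread:
    """Poll in a daemon thread and invoke `on_orphaned` once the parent is gone."""

    def loop() -> None:
        while parent_is_alive(ppid):
            time.sleep(poll_interval)
        on_orphaned()

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


# Demonstration: -1 never matches the real parent PID, so the callback
# fires on the first check. A real worker would exit here instead.
orphaned = threading.Event()
watch_parent(-1, orphaned.set, poll_interval=0.01)
```

Running the watcher as a daemon thread keeps it from blocking interpreter shutdown when the worker exits normally.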

Classes

class EnhancedJSONEncoder(JSONEncoder):
    """
    JSON encoder that handles dataclasses and other common types.
    
    SG-52: datetime support added so JobError.occurred_at serializes cleanly
    when emitted via the typed `_job_error` terminal chunk in /execute_stream.
    """
    
    def default(
            self,
            o: Any # Object to encode
        ) -> Any: # JSON-serializable representation
        "Convert non-serializable objects to serializable form."
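
An encoder of this kind can be sketched as below. It is a minimal illustration, not `EnhancedJSONEncoder` itself: `SketchJSONEncoder` and `ExampleError` are hypothetical names, and emitting datetimes as ISO 8601 strings is an assumption about the output format.

```python
import dataclasses
import json
from datetime import datetime
from typing import Any


class SketchJSONEncoder(json.JSONEncoder):
    """Hypothetical encoder handling dataclass instances and datetimes."""

    def default(self, o: Any) -> Any:
        # Dataclass instances (not dataclass types) become plain dicts.
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        # Assumption: datetimes are emitted as ISO 8601 strings.
        if isinstance(o, datetime):
            return o.isoformat()
        return super().default(o)


@dataclasses.dataclass
class ExampleError:
    message: str
    occurred_at: datetime


payload = json.dumps(
    ExampleError("boom", datetime(2026, 1, 2, 3, 4, 5)), cls=SketchJSONEncoder
)
```

`dataclasses.asdict` leaves the nested `datetime` in place, so the encoder's `default` is called a second time for that value; this is why both branches are needed.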
