Process-isolated plugin system with resource-aware scheduling, configuration validation, and lifecycle management for extensible Python applications.
cjm-plugin-system
Install
pip install cjm_plugin_system
Project Structure
nbs/
├── core/ (14)
│ ├── config.ipynb # Project-level configuration for paths, runtime settings, and environment management
│ ├── config_store.ipynb # Persistent storage for per-plugin configuration (with enabled flag)
│ ├── empirical_store.ipynb # Persistent store for empirically-observed resource usage per (instance_id, config_hash) pair. CR-7's data foundation — `record_sample` is called from `PluginManager.execute_plugin*` finally blocks; aggregates feed eviction-candidate selection + future UI hints + cost-aware retry decisions.
│ ├── errors.ipynb # Typed exception hierarchy + JobError dataclass + default classification of bare Python exceptions. The substrate's CR-5 implementation per the 2026-05-19 substrate audit.
│ ├── interface.ipynb # Abstract base class defining the generic plugin interface
│ ├── manager.ipynb # Plugin discovery, loading, and lifecycle management system
│ ├── manifest_format.ipynb # Typed parser + writer for the nested v2.0 manifest layout per the 2026-05-19 substrate audit's CR-8. Substrate manifests transitioned from a flat top-level JSON object to a four-section nested layout: `install` (deployment-specific facts populated at install time), `code` (code-derived facts refreshed by `cjm-ctl regenerate-manifest`), `drift_tracking` (a config_schema hash that records the witness shape so live-vs-stored comparisons can detect drift), and `overrides` (an operator-supplied overlay placeholder).
│ ├── metadata.ipynb # Data structures for plugin metadata
│ ├── platform.ipynb # Cross-platform utilities for process management, path handling, and system detection
│ ├── proxy.ipynb # Bridge between Host application and isolated Worker processes
│ ├── queue.ipynb # Resource-aware job queue for sequential plugin execution with cancellation support
│ ├── scheduling.ipynb # Resource scheduling policies for plugin execution
│ ├── secret_store.ipynb # CR-12: project-local secret storage for API-based plugins (file-backed, 0600)
│ └── worker.ipynb # FastAPI server that runs inside isolated plugin environments
├── utils/ (2)
│ ├── hashing.ipynb # Shared cryptographic hashing primitives for content integrity verification
│ └── validation.ipynb # Validation helpers for plugin configuration dataclasses
├── bootstrap.ipynb # One-call factory that assembles a PluginManager + JobQueue + plugin bindings — closes the demo-app boilerplate duplication audited across 5 substrate consumers.
└── cli.ipynb # CLI tool for declarative plugin management
Total: 18 notebooks across 2 directories
Module Dependencies
graph LR
bootstrap["bootstrap<br/>Bootstrap"]
cli["cli<br/>cli"]
core_config["core.config<br/>Configuration"]
core_config_store["core.config_store<br/>Plugin Config Store"]
core_empirical_store["core.empirical_store<br/>Empirical Resource Tracking"]
core_errors["core.errors<br/>Plugin Error Taxonomy"]
core_interface["core.interface<br/>Plugin Interface"]
core_manager["core.manager<br/>Plugin Manager"]
core_manifest_format["core.manifest_format<br/>Manifest Format (v2.0)"]
core_metadata["core.metadata<br/>Plugin Metadata"]
core_platform["core.platform<br/>Platform Utilities"]
core_proxy["core.proxy<br/>Remote Plugin Proxy"]
core_queue["core.queue<br/>Job Queue"]
core_scheduling["core.scheduling<br/>Scheduling"]
core_secret_store["core.secret_store<br/>Plugin Secret Store"]
core_worker["core.worker<br/>Universal Worker"]
utils_hashing["utils.hashing<br/>Content Hashing Utilities"]
utils_validation["utils.validation<br/>Configuration Validation"]
bootstrap --> core_manager
bootstrap --> core_scheduling
bootstrap --> core_queue
cli --> core_metadata
cli --> core_config
cli --> core_manifest_format
cli --> core_platform
core_empirical_store --> utils_hashing
core_interface --> core_errors
core_manager --> core_errors
core_manager --> core_metadata
core_manager --> core_empirical_store
core_manager --> core_config
core_manager --> core_manifest_format
core_manager --> utils_validation
core_manager --> core_scheduling
core_manager --> core_proxy
core_manager --> core_secret_store
core_manager --> core_config_store
core_manager --> core_interface
core_manifest_format --> utils_hashing
core_manifest_format --> core_metadata
core_platform --> core_config
core_proxy --> core_errors
core_proxy --> core_config
core_proxy --> core_platform
core_proxy --> core_interface
core_queue --> core_errors
core_scheduling --> core_metadata
core_worker --> core_platform
core_worker --> core_errors
utils_validation --> core_errors
32 cross-module dependencies detected
CLI Reference
No standalone CLI reference was generated for this project; the `cjm-ctl` commands are documented in the cli module section below.
Module Overview
Detailed documentation for each module in the project:
Bootstrap (bootstrap.ipynb)
One-call factory that assembles a PluginManager + JobQueue + plugin bindings — closes the demo-app boilerplate duplication audited across 5 substrate consumers.
Import
from cjm_plugin_system.bootstrap import (
PluginSpec,
Pipeline,
create_pipeline
)
Functions
def _normalize_spec(
spec: PluginSpec # Raw spec from caller
) -> Tuple[str, Optional[Dict[str, Any]]]: # (plugin_name, optional config)
"""
Normalize a plugin spec into a `(name, config)` pair.
Accepts a bare string, a `(name, config)` tuple, or a mapping with a 'name'
key (config under 'config' or the mapping itself minus 'name').
"""
def create_pipeline(
plugins: Optional[Iterable[PluginSpec]] = None, # Plugins to discover + load
scheduler: Optional[ResourceScheduler] = None, # Resource policy (default: permissive)
system_monitor: Optional[str] = None, # Plugin name to register as system monitor
search_paths: Optional[List[Path]] = None, # Custom manifest search paths
queue_kwargs: Optional[Dict[str, Any]] = None, # Extra kwargs forwarded to JobQueue
strict: bool = True, # SG-5 strict config validation on each load
) -> Pipeline: # Assembled stack ready to start
"""
Assemble a PluginManager + JobQueue + plugin bindings in one call.
Steps performed:
1. Construct PluginManager with the given scheduler + search paths
2. discover_manifests()
3. For each spec in `plugins`: load the plugin and create a PluginBinding
4. If `system_monitor` is set, register that plugin as the sys-mon
5. Construct JobQueue (NOT started — caller starts via context manager)
Plugins that fail to load are logged but do not raise; their entries are
omitted from `Pipeline.bindings`. Use the returned `Pipeline.manager` to
inspect which loads succeeded.
"""
Classes
@dataclass
class Pipeline:
"Assembled substrate stack: manager + queue + plugin bindings."
manager: PluginManager # Discovery + lifecycle
queue: JobQueue # Job submission + scheduling
bindings: Dict[str, PluginBinding] = field(...) # Plugin name -> bound view
async def start(self) -> None:
"""Start the job queue's background processor."""
await self.queue.start()
async def stop(self) -> None:
"""Stop the job queue and unload all plugins."""
await self.queue.stop()
self.manager.unload_all()
async def __aenter__(self) -> "Pipeline"
cli (cli.ipynb)
CLI tool for declarative plugin management
Import
from cjm_plugin_system.cli import (
app,
main,
setup_runtime,
run_cmd,
regenerate_manifest,
install_all,
setup_host,
estimate_size,
list_plugins,
remove_plugin,
validate_file,
set_secret,
list_secrets
)
Functions
def main(
ctx:typer.Context,
cjm_config:Annotated[Optional[Path], typer.Option(
"--cjm-config",
help="Path to cjm.yaml configuration file"
)]=None,
data_dir:Annotated[Optional[Path], typer.Option(
"--data-dir",
help="Override data directory (manifests, logs)"
)]=None,
conda_prefix:Annotated[Optional[Path], typer.Option(
"--conda-prefix",
help="Override conda/mamba prefix path"
)]=None,
conda_type:Annotated[Optional[str], typer.Option(
"--conda-type",
help="Conda implementation: micromamba, miniforge, or conda"
)]=None,
) -> None
"CJM Plugin System CLI for managing isolated plugin environments."
def setup_runtime(
force:bool=typer.Option(False, "--force", "-f", help="Re-download even if binary exists")
) -> None
"Download and setup micromamba runtime for project-local mode."
def _check_runtime_available() -> None:
"""Check if the configured conda runtime is available, exit with helpful message if not."""
cfg = get_config()
...
def _get_conda_cmd_str() -> str
"Get the conda/micromamba command string for shell commands."
def _download_url_to_temp(
url: str, # URL to download
suffix: str = ".yml" # File suffix for temp file
) -> Optional[Path]: # Path to temp file or None if failed
"Download a URL to a temporary file. Returns None if download fails."
def _resolve_env_file(
env_file: str # Path or URL to environment file
) -> tuple[str, Optional[Path]]: # (resolved_path, temp_file_to_cleanup)
"""
Resolve env_file to a local path, downloading if it's a URL.
Returns (local_path, temp_file) where temp_file is set if we created
a temporary file that should be cleaned up later.
"""
def run_cmd(
cmd: str, # Shell command to execute
check: bool = True # Whether to raise on non-zero exit
) -> None
"""
Run a shell command and stream output.
Uses the platform's default shell (no hardcoded /bin/bash).
"""
def _generate_manifest(
env_name: str, # Name of the Conda environment
package_name: str, # Package source string (git URL or package name)
manifest_dir: Path # Directory to write manifest JSON files
) -> Optional[Path]: # Path to the manifest written, or None on failure
"""
Run introspection script inside the target env and write a v2.0 manifest.
Builds a `ManifestV2` from the introspection output + install-time markers,
computes `drift_tracking.config_schema_hash`, and writes via
`write_manifest` (nested layout, indent=2). Both `installed_at` and
`regenerated_at` are set to "now"; `regenerate_manifest` preserves the
original `installed_at` via post-write fix-up.
"""
def regenerate_manifest(
plugin_name: str = typer.Argument(..., help="Plugin name as it appears in the manifest"),
plugins_path: Optional[str] = typer.Option(
None, "--plugins",
help="Path to plugins.yaml for package_source recovery (legacy manifests)",
),
package: Optional[str] = typer.Option(
None, "--package",
help="Package spec override (e.g., git URL or pip name); wins over manifest/plugins.yaml lookups",
),
) -> None
"""
Re-run introspection for an installed plugin and rewrite its manifest.
Reads the existing manifest via `load_manifest` (handles both v2.0 nested
+ legacy v1.0 flat layouts), recovers `env_name` + `package_source` from
the install section, runs `_generate_manifest` to refresh the code section,
then post-writes to preserve the original `installed_at` so the regenerate
only updates `regenerated_at` semantically. Always emits v2.0 layout —
regenerating a v1.0 manifest transparently upgrades it.
"""
def _conda_env_exists_configured(
env_name: str # Name of the conda environment
) -> bool: # True if environment exists
"Check if conda environment exists using configured conda command."
def install_all(
plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
force:bool=typer.Option(False, help="Force recreation of environments")
) -> None
"Install and register all plugins defined in plugins.yaml."
def setup_host(
plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
"Install interface libraries in the current Python environment."
def _format_size(
size_bytes: int # Size in bytes
) -> str: # Human-readable size string
"Format bytes as human-readable string."
def _get_pypi_size(
package_spec: str # Package name or git URL
) -> tuple[int, str]: # (size_bytes, package_name)
"""
Query PyPI for package download size.
SG-37: PyPI normalizes package names to dash-form per PEP 503, so we try
the dash form first and fall back to the underscore form. Without this,
cjm-* packages (whose repo names contain dashes) silently 404 because
the old heuristic only tried the underscored form.
"""
def _estimate_conda_size(
env_file: str, # Path or URL to environment.yml
env_name: str # Target environment name
) -> tuple[int, int]: # (total_bytes, package_count)
"Estimate conda package sizes using dry-run."
def _estimate_pip_sizes(
packages: list[str] # List of pip package specs
) -> tuple[int, int, list[tuple[str, int]]]: # (total_bytes, found_count, [(name, size), ...])
"Estimate pip package sizes from PyPI."
def estimate_size(
plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
plugin_name:Optional[str]=typer.Option(None, "--plugin", "-p", help="Estimate for a single plugin"),
verbose:bool=typer.Option(False, "--verbose", "-v", help="Show per-package breakdown")
) -> None
"Estimate disk space required for plugin environments."
def _get_conda_envs() -> set[str]: # Set of existing conda environment names
"""Get list of existing conda environment names using configured conda command."""
cfg = get_config()
cmd_parts = build_conda_command(cfg, "env", "list", "--json")
...
def _v2_to_legacy_flat_view(
raw: Dict[str, Any] # Manifest JSON dict as read from disk
) -> Dict[str, Any]: # Flat-shaped dict (install + code merged at top level)
"""
REMOVE-AFTER-OVERHAUL: produce a legacy flat-shaped view of a manifest.
Consumer code in `list_plugins` + `remove_plugin` still reads manifests as
flat dicts (`manifest.get('python_path')`, etc.). When the on-disk manifest
is v2.0 nested, we flatten install + code sections to the top level so
those consumers don't need migration in the same PR. The shim retires when
consumers migrate to `load_manifest` for typed access.
v1.0 manifests pass through unchanged.
"""
def _get_installed_manifests(
manifest_dir:Optional[Path]=None # Directory to scan (uses config default if None)
) -> list[dict]: # List of manifest dictionaries
"""
Load all manifest JSON files from the manifest directory.
Returns flat-shaped dicts regardless of on-disk format (v2.0 manifests are
flattened via `_v2_to_legacy_flat_view`). Consumers that want typed access
should use `load_manifest` from `cjm_plugin_system.core.manifest_format`
instead.
"""
def _extract_env_from_python_path(
python_path:str # Path like /home/user/miniforge3/envs/my-env/bin/python
) -> str: # Extracted environment name or empty string
"Extract conda environment name from python_path."
def list_plugins(
plugins_path:Optional[str]=typer.Option(None, "--plugins", help="Path to plugins.yaml for cross-reference"),
show_envs:bool=typer.Option(False, "--envs", "-e", help="Show conda environment status")
) -> None
"List installed plugins from manifest directory."
def remove_plugin(
plugin_name:str=typer.Argument(..., help="Name of the plugin to remove"),
plugins_path:Optional[str]=typer.Option(None, "--plugins", help="Path to plugins.yaml for env name lookup"),
keep_env:bool=typer.Option(False, "--keep-env", help="Keep the conda environment, only remove manifest"),
yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
"Remove a plugin's manifest and conda environment."
def _validate_taxonomy_block(
tax: Any, # taxonomy sub-dict (may be None or non-dict; we type-check here)
top_level_interface: str, # `interface` field for cross-check
path_prefix: str, # Error message prefix (e.g., "manifest" or "manifest: code")
) -> List[str]
"""
Type-check the taxonomy block + cross-check interface_fqcn vs top-level interface.
Shared between v1.0 (flat) and v2.0 (nested) validators. `path_prefix` is
used to disambiguate error messages between layouts.
"""
def _validate_resources_block(
res: Any, # resources sub-dict (may be None or non-dict; we type-check here)
path_prefix: str, # Error message prefix
) -> List[str]
"Phase 5a: type-check the resources block. Shared between v1.0 and v2.0."
def _validate_manifest_v2_dict(
data: Dict[str, Any] # v2.0 nested manifest dict (caller already verified format_version)
) -> List[str]: # Empty list = valid
"""
CR-8: validate the nested v2.0 manifest layout.
Required sections: `install` + `code`. Optional sections: `drift_tracking`
(substrate emits it on every fresh write but legacy-via-load_manifest
upgrades leave it empty until the first regenerate); `overrides` (free-form
operator overlay).
Required `code.*` fields mirror v1.0's required top-level fields. Required
`install.python_path` mirrors v1.0's required top-level `python_path`.
"""
def _validate_manifest_v1_dict(
data: Dict[str, Any] # Legacy flat manifest dict (no format_version)
) -> List[str]: # Empty list = valid
"""
REMOVE-AFTER-OVERHAUL: validate the legacy v1.0 flat manifest layout.
Retires when `cascade_manifests.py` rewrites every production manifest to
v2.0. Keeps the SG-6 validator working against pre-CR-8 manifests during
the transition window.
"""
def _validate_manifest_dict(
data: Any # Loaded manifest JSON
) -> List[str]: # List of human-readable error messages (empty == valid)
"""
SG-6 + CR-8: structural validation, dispatching on `format_version`.
`format_version == "2.0"` validates the nested v2.0 layout.
Absent `format_version` validates the legacy flat v1.0 layout
(REMOVE-AFTER-OVERHAUL — retires after cascade_manifests.py).
Any other value rejects with a single error so unknown future formats
fail loud rather than silently degrading.
"""
def _validate_plugins_yaml_dict(
data: Any # Loaded plugins.yaml content
) -> List[str]: # List of human-readable error messages (empty == valid)
"""
Structural validation of a plugins.yaml file.
Each plugin entry must have name + env_name + package, plus either env_file
or python_version (one defines how the conda env is created).
"""
def _detect_manifest_format(
path: Path # File to inspect
) -> Optional[str]: # 'manifest' | 'plugins_yaml' | None
"Auto-detect file format from extension."
def validate_file(
path:Path=typer.Argument(..., help="Manifest JSON or plugins.yaml to validate"),
format:Optional[str]=typer.Option(
None, "--format", "-f",
help="Override format detection: 'manifest' or 'plugins_yaml'",
),
) -> None
"""
SG-6: validate a manifest JSON or plugins.yaml file's structure.
Auto-detects format from the file extension (`.json` → manifest,
`.yaml`/`.yml` → plugins.yaml). Exits non-zero with a list of validation
errors if any check fails.
"""
def _open_secret_store():
"""Open the project-local LocalSecretStore at <data_dir>/secrets (CR-12)."""
from cjm_plugin_system.core.secret_store import LocalSecretStore
cfg = get_config()
data_dir = getattr(cfg, "data_dir", None)
secrets_dir = (data_dir / "secrets") if data_dir is not None else None
return LocalSecretStore(secrets_dir)
@app.command("set-secret")
def set_secret(
plugin_name: str = typer.Argument(..., help="Plugin name (manifest 'name', e.g. cjm-transcription-plugin-gemini)"),
key: str = typer.Argument(..., help="Secret key = the env-var name the worker reads (e.g. GEMINI_API_KEY)"),
value: Optional[str] = typer.Option(None, "--value", help="Secret value (omit to be prompted with hidden input)"),
scope: Optional[str] = typer.Option(None, "--scope", help="Reserved multi-user scope (default: single-user)"),
)
"""
Store a plugin secret in the project-local SecretStore (CR-12).
The value is written to <data_dir>/secrets/secrets.json (0600) — never to
plugins.yaml, manifests, or the config store. Plugins read it from their
worker env at spawn. Omit --value to be prompted (hidden input) so the
secret stays out of shell history. After setting, reload the plugin (or
restart the host) so its worker respawns with the new env — the GUI /
PluginManager.set_plugin_secret do this automatically.
"""
def list_secrets(
plugin_name: str = typer.Argument(..., help="Plugin name to list secret KEY NAMES for"),
scope: Optional[str] = typer.Option(None, "--scope", help="Reserved multi-user scope"),
)
"List the secret KEY NAMES stored for a plugin — never the values (CR-12)."
Variables
_PYPI_404_CACHE: set[str]
Configuration (config.ipynb)
Project-level configuration for paths, runtime settings, and environment management
Import
from cjm_plugin_system.core.config import (
RuntimeMode,
CondaType,
RuntimeConfig,
SubstrateConfig,
CJMConfig,
load_config,
get_config,
set_config,
reset_config
)
Functions
def _load_from_yaml(
yaml_path:Path # Path to cjm.yaml file
) -> CJMConfig: # Parsed configuration
"Load config from YAML file, resolving relative paths."
def load_config(
config_path:Optional[Path]=None, # CLI --cjm-config
data_dir:Optional[Path]=None, # CLI --data-dir
conda_prefix:Optional[Path]=None, # CLI --conda-prefix
conda_type:Optional[str]=None # CLI --conda-type
) -> CJMConfig: # Resolved configuration
"Load config with layered resolution (CLI > env vars > yaml > defaults)."
def get_config() -> CJMConfig: # Current configuration
"""Get current config (loads defaults if not set)."""
global _current_config
...
def set_config(
config:CJMConfig # Configuration to set as current
) -> None
"Set current config (called by CLI callback)."
def reset_config() -> None
"Reset to unloaded state (for testing)."
Classes
class RuntimeMode(str, Enum):
"Runtime mode for the plugin system."
class CondaType(str, Enum):
"Type of conda implementation to use."
@dataclass
class RuntimeConfig:
"Runtime environment configuration."
mode: RuntimeMode = RuntimeMode.SYSTEM # LOCAL for project-local, SYSTEM for global
conda_type: CondaType = CondaType.CONDA # Conda implementation to use
prefix: Optional[Path] # Path to runtime directory (LOCAL mode only)
binaries: Dict[str, Path] = field(...) # Platform-specific binary paths
@dataclass
class SubstrateConfig:
"""
Substrate behavior toggles.
Loaded from the `substrate:` section of `cjm.yaml`. Each flag gates a
substrate-wide behavior that hosts can disable when they don't want the
per-load or per-execute cost.
- `drift_detection` (CR-8): per-load `/config_schema` HTTP call + hash
comparison against the manifest's stored hash. PluginManager's load
path branches around `_check_config_schema_drift` when False.
- `empirical_tracking` (CR-7): per-execute resource sample recording into
`EmpiricalResourceStore`. PluginManager skips `record_sample` calls when
False; the store's lazy-init also short-circuits.
"""
drift_detection: bool = True # Run /config_schema hash compare on every load_plugin
empirical_tracking: bool = True # Record ResourceSample after every execute_plugin*
@dataclass
class CJMConfig:
"Main configuration for cjm-plugin-system."
runtime: RuntimeConfig = field(...) # Runtime environment settings
data_dir: Path = field(...) # Base directory for manifests, logs
plugins_config: Path = field(...) # Path to plugins.yaml file
models_dir: Optional[Path] # Directory for model downloads
substrate: SubstrateConfig = field(...) # CR-8 substrate behavior toggles
@property
def manifests_dir(self) -> Path: # Directory containing plugin manifests
"""Directory containing plugin manifests."""
return self.data_dir / "manifests"
@property
def plugin_data_dir(self) -> Path: # Directory for plugin runtime data
"""Directory for plugin runtime data (databases, caches)."""
return self.data_dir / "data"
@property
def logs_dir(self) -> Path: # Directory containing plugin logs
"""Directory containing plugin logs."""
return self.data_dir / "logs"
@property
def conda_binary_path(self) -> Optional[Path]: # Path to conda/micromamba binary or None
"""Get the configured binary path for the current platform."""
# Inline platform detection to avoid circular imports
system = platform_mod.system().lower()
machine = platform_mod.machine().lower()
...
Variables
_current_config: Optional[CJMConfig] = None
Plugin Config Store (config_store.ipynb)
Persistent storage for per-plugin configuration (with enabled flag)
Import
from cjm_plugin_system.core.config_store import (
PluginConfigRecord,
PluginConfigStore,
LocalPluginConfigStore
)
Functions
def _default_db_path() -> Path:
"""Default SQLite location: `~/.cjm/plugin_configs.db`."""
return Path.home() / ".cjm" / "plugin_configs.db"
Classes
@dataclass
class PluginConfigRecord:
"Persisted state for a plugin: config dict + enabled flag."
config: Dict[str, Any] = field(...) # Plugin's current config values
enabled: bool = True # Whether the substrate should accept jobs for this plugin
updated_at: float = 0.0 # Unix timestamp of the last write (server clock)
@runtime_checkable
class PluginConfigStore(Protocol):
"Protocol for persisting per-plugin `PluginConfigRecord` across sessions."
def get(self, plugin_name: str) -> Optional[PluginConfigRecord]:
"""Fetch the record for a plugin, or None if no record exists yet."""
...
def set(self, plugin_name: str, record: PluginConfigRecord) -> None:
"""Persist a record. Overwrites any prior record for the same plugin.
Implementations stamp `record.updated_at` to the current time during
the write so callers don't have to manage timestamps.
"""
...
def delete(self, plugin_name: str) -> bool:
"""Remove the record for a plugin. Returns True if a record was deleted."""
...
def list_all(self) -> Dict[str, PluginConfigRecord]
"Return every stored record, keyed by plugin name."
class LocalPluginConfigStore:
"""
SQLite-backed default implementation of `PluginConfigStore`.
The DB is created lazily on first write. Reads against a non-existent DB
return empty results rather than raising, so hosts can call `.get()` on
a fresh install without preparing the file first.
"""
def __init__(self, db_path: Optional[Path] = None):
"""Initialize the store. `db_path=None` uses `~/.cjm/plugin_configs.db`."""
self.db_path = Path(db_path) if db_path is not None else _default_db_path()
@contextmanager
def _conn(self) -> Iterator[sqlite3.Connection]
def get(
self,
plugin_name: str # Plugin to look up
) -> Optional[PluginConfigRecord]: # Persisted record or None if absent
"Fetch the record for a plugin."
def set(
self,
plugin_name: str, # Plugin to write
record: PluginConfigRecord # New record (updated_at overwritten with current time)
) -> None
"Persist a record. Stamps `updated_at` to the current time."
def delete(
self,
plugin_name: str # Plugin to remove
) -> bool: # True if a row was deleted
"Remove the record for a plugin."
def list_all(self) -> Dict[str, PluginConfigRecord]: # plugin_name -> record
"""Return all stored records keyed by plugin name."""
...
Variables
_SCHEMA = '\nCREATE TABLE IF NOT EXISTS plugin_configs (\n plugin_name TEXT PRIMARY KEY,\n config_json TEXT NOT NULL,\n enabled INTEGER NOT NULL DEFAULT 1,\n updated_at REAL NOT NULL\n)\n'
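The `_SCHEMA` table can back an upsert-style store with the standard `sqlite3` module. This is a sketch using an in-memory database and hypothetical helpers `set_record` and `get_record`. Unlike the documented store, its read path creates the table instead of returning empty results for a missing DB file.

```python
import json
import sqlite3
import time
from typing import Any, Dict, Optional, Tuple

SCHEMA = """
CREATE TABLE IF NOT EXISTS plugin_configs (
    plugin_name TEXT PRIMARY KEY,
    config_json TEXT NOT NULL,
    enabled INTEGER NOT NULL DEFAULT 1,
    updated_at REAL NOT NULL
)
"""


def set_record(conn: sqlite3.Connection, name: str, config: Dict[str, Any],
               enabled: bool = True) -> float:
    """Upsert one plugin's record, stamping updated_at with the current time."""
    updated_at = time.time()
    conn.execute(SCHEMA)
    conn.execute(
        "INSERT OR REPLACE INTO plugin_configs "
        "(plugin_name, config_json, enabled, updated_at) VALUES (?, ?, ?, ?)",
        (name, json.dumps(config), int(enabled), updated_at),
    )
    conn.commit()
    return updated_at


def get_record(conn: sqlite3.Connection,
               name: str) -> Optional[Tuple[Dict[str, Any], bool, float]]:
    """Return (config, enabled, updated_at) or None if the plugin has no row."""
    conn.execute(SCHEMA)
    row = conn.execute(
        "SELECT config_json, enabled, updated_at FROM plugin_configs WHERE plugin_name = ?",
        (name,),
    ).fetchone()
    if row is None:
        return None
    return json.loads(row[0]), bool(row[1]), row[2]
```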
Empirical Resource Tracking (empirical_store.ipynb)
Persistent store for empirically-observed resource usage per (instance_id, config_hash) pair. CR-7's data foundation: `record_sample` is called from `PluginManager.execute_plugin*` finally blocks; aggregates feed eviction-candidate selection + future UI hints + cost-aware retry decisions.
Import
from cjm_plugin_system.core.empirical_store import (
compute_config_hash,
ResourceSample,
EmpiricalResourceRecord,
EmpiricalResourceStore,
LocalEmpiricalResourceStore
)
Functions
def compute_config_hash(
"""
CR-7: hash a plugin instance's effective config for empirical-record keying.
Same canonicalization as CR-8's `compute_config_schema_hash` — sorted keys,
no whitespace, `"sha256:hex"` shape. None / empty configs hash deterministically
to the canonical-empty value so plugins with no config still get a single
record per instance rather than scattering across hash-of-None edge cases.
"""
def _default_db_path() -> Path:
"""Default SQLite location: `~/.cjm/empirical_resources.db`.
Hosts using per-project `data_dir` (the intended pattern per CR-8 cascade_manifests
docs) override this by passing `db_path=cfg.data_dir / "empirical_resources.db"`
when constructing the store. PluginManager's lazy-init does this automatically.
"""
return Path.home() / ".cjm" / "empirical_resources.db"
Classes
class ResourceSample:
"""
Single observation captured after an execute call completes.
Frozen — substrate aggregates online via Welford's algorithm; no need to
keep raw samples around. `observed_at` is tz-aware per the CR-5 convention.
"""
@dataclass
class EmpiricalResourceRecord:
"Aggregated empirical resource profile for a (instance_id, config_hash) pair."
instance_id: str # PluginInstance.instance_id (CR-10 multi-instance aware)
plugin_name: str # Convenience: PluginInstance.plugin_name; derivable but cheap to denormalize
config_hash: str # compute_config_hash(inst.config) at sample time
sample_count: int # Number of ResourceSamples folded into this record
cpu_percent_mean: float # Welford running mean of cpu_percent
memory_mb_peak_max: float # max(sample.memory_mb_peak over all samples) — worst observed
memory_mb_peak_mean: float # Welford running mean of sample.memory_mb_peak
gpu_memory_mb_peak_max: float # max(sample.gpu_memory_mb_peak over all samples)
gpu_memory_mb_peak_mean: float # Welford running mean of sample.gpu_memory_mb_peak
duration_seconds_mean: float # Welford running mean of sample.duration_seconds
success_rate: float # success_count / sample_count
last_observed: datetime # tz-aware; tracks most recent ResourceSample.observed_at
api_usage_totals: Dict[str, float] = field(...) # SG-54: cumulative per-unit usage summed across runs (tokens/credits/pages/...); {} for compute-only plugins
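The online aggregation behind these fields can be sketched for a subset of them. The sketch uses the incremental mean update `mean += (x - mean) / n` (the mean step of Welford's algorithm), a running max for peaks, and `success_rate` derived at read time. `RunningAggregate` is a hypothetical class, not `EmpiricalResourceRecord`.

```python
from dataclasses import dataclass


@dataclass
class RunningAggregate:
    """Hypothetical aggregate folded one sample at a time; no raw samples kept."""
    sample_count: int = 0
    success_count: int = 0
    duration_seconds_mean: float = 0.0
    memory_mb_peak_mean: float = 0.0
    memory_mb_peak_max: float = 0.0

    def fold(self, duration_seconds: float, memory_mb_peak: float, success: bool) -> None:
        self.sample_count += 1
        n = self.sample_count
        # Incremental (Welford) mean update: mean += (x - mean) / n
        self.duration_seconds_mean += (duration_seconds - self.duration_seconds_mean) / n
        self.memory_mb_peak_mean += (memory_mb_peak - self.memory_mb_peak_mean) / n
        # Peaks keep the worst observation rather than an average.
        self.memory_mb_peak_max = max(self.memory_mb_peak_max, memory_mb_peak)
        self.success_count += int(success)

    @property
    def success_rate(self) -> float:
        """Computed at read time: success_count / sample_count."""
        return self.success_count / self.sample_count if self.sample_count else 0.0
```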
@runtime_checkable
class EmpiricalResourceStore(Protocol):
"""
Protocol for persisting empirically-observed resource usage.
Implementations aggregate online (Welford for means, max-of-peaks for memory).
No raw-sample retention required — v1 is one row per (instance_id, config_hash)
pair with running aggregates. A future implementation can add a samples table
if time-series queries become necessary.
"""
def record_sample(
self,
instance_id: str,
plugin_name: str,
config_hash: str,
sample: ResourceSample,
) -> None
"Fold a sample into the running aggregate. Creates a new record on first call."
def get_record(
self,
instance_id: str,
config_hash: str,
) -> Optional[EmpiricalResourceRecord]
"Fetch the aggregated record for (instance_id, config_hash), or None."
def list_records(
self,
plugin_name: Optional[str] = None,
) -> List[EmpiricalResourceRecord]
"List all records, optionally filtered to a single plugin_name."
def delete_record(
self,
instance_id: str,
config_hash: str,
) -> bool
"Remove a record. Returns True if a row was deleted."
class LocalEmpiricalResourceStore:
"""
SQLite-backed default implementation of `EmpiricalResourceStore`.
Online Welford aggregation for means; max-of-peaks for memory metrics.
success_rate computed at read time from `success_count / sample_count`.
DB + schema created lazily on first write.
"""
def __init__(self, db_path: Optional[Path] = None):
"""Initialize the store. `db_path=None` uses `~/.cjm/empirical_resources.db`."""
self.db_path = Path(db_path) if db_path is not None else _default_db_path()
@contextmanager
def _conn(self) -> Iterator[sqlite3.Connection]
def record_sample(
self,
instance_id: str, # PluginInstance.instance_id
plugin_name: str, # PluginInstance.plugin_name (denormalized for filtering)
config_hash: str, # compute_config_hash(inst.config)
sample: ResourceSample, # One observation
) -> None
"Fold a sample into the running aggregate. Creates a new row on first call.
Welford update for each mean. Max-of-peaks for memory metrics.
success_count incremented by 1 if sample.success else 0."
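The fold described above is small enough to show directly. The sketch below is a hypothetical in-memory stand-in for one row (`RunningAggregate` and `fold_sample` are illustrative names, and only a subset of the columns is tracked). It assumes the store keeps running means only, so the Welford step reduces to the incremental-mean update `mean += (x - mean) / n`; memory peaks use a plain running max.

```python
from dataclasses import dataclass


@dataclass
class RunningAggregate:
    """Hypothetical in-memory stand-in for one empirical_resources row (subset of columns)."""
    sample_count: int = 0
    success_count: int = 0
    memory_mb_peak_max: float = 0.0
    memory_mb_peak_mean: float = 0.0
    duration_seconds_mean: float = 0.0

    @property
    def success_rate(self) -> float:
        # Derived at read time, mirroring success_count / sample_count.
        return self.success_count / self.sample_count if self.sample_count else 0.0


def fold_sample(agg: RunningAggregate, memory_mb_peak: float,
                duration_seconds: float, success: bool) -> RunningAggregate:
    """Fold one observation into the aggregate without keeping the raw sample."""
    n = agg.sample_count + 1
    agg.sample_count = n
    agg.success_count += 1 if success else 0
    # Welford-style incremental mean: new_mean = old_mean + (x - old_mean) / n
    agg.memory_mb_peak_mean += (memory_mb_peak - agg.memory_mb_peak_mean) / n
    agg.duration_seconds_mean += (duration_seconds - agg.duration_seconds_mean) / n
    # Worst observed peak is a plain running max.
    agg.memory_mb_peak_max = max(agg.memory_mb_peak_max, memory_mb_peak)
    return agg


agg = RunningAggregate()
for mem, dur, ok in [(1000.0, 2.0, True), (3000.0, 4.0, False), (2000.0, 6.0, True)]:
    fold_sample(agg, mem, dur, ok)
print(agg.memory_mb_peak_mean, agg.memory_mb_peak_max, agg.duration_seconds_mean)  # 2000.0 3000.0 4.0
```

Because each update needs only the previous mean and the new count, a row never has to store the samples themselves.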
def get_record(
self,
instance_id: str,
config_hash: str,
) -> Optional[EmpiricalResourceRecord]
"Fetch the aggregated record for (instance_id, config_hash), or None."
def list_records(
self,
plugin_name: Optional[str] = None,
) -> List[EmpiricalResourceRecord]
"List all records, optionally filtered to a plugin."
def delete_record(
self,
instance_id: str,
config_hash: str,
) -> bool
"Remove a record. Returns True if a row was deleted."
Variables
_SCHEMA = """
CREATE TABLE IF NOT EXISTS empirical_resources (
    instance_id TEXT NOT NULL,
    plugin_name TEXT NOT NULL,
    config_hash TEXT NOT NULL,
    sample_count INTEGER NOT NULL DEFAULT 0,
    success_count INTEGER NOT NULL DEFAULT 0,
    cpu_percent_mean REAL NOT NULL DEFAULT 0.0,
    memory_mb_peak_max REAL NOT NULL DEFAULT 0.0,
    memory_mb_peak_mean REAL NOT NULL DEFAULT 0.0,
    gpu_memory_mb_peak_max REAL NOT NULL DEFAULT 0.0,
    gpu_memory_mb_peak_mean REAL NOT NULL DEFAULT 0.0,
    duration_seconds_mean REAL NOT NULL DEFAULT 0.0,
    last_observed TEXT NOT NULL,
    api_usage_totals TEXT NOT NULL DEFAULT '{}',
    PRIMARY KEY (instance_id, config_hash)
)
"""
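To see how the composite primary key supports one row per `(instance_id, config_hash)` pair, the sketch below creates the table in an in-memory SQLite database and counts samples with an upsert. The `bump` and `success_rate` helpers, the upsert statement, and the example identifiers are illustrative assumptions rather than the store's real SQL; `ON CONFLICT ... DO UPDATE` needs SQLite 3.24 or newer.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

# Same DDL as _SCHEMA above.
SCHEMA = """
CREATE TABLE IF NOT EXISTS empirical_resources (
    instance_id TEXT NOT NULL,
    plugin_name TEXT NOT NULL,
    config_hash TEXT NOT NULL,
    sample_count INTEGER NOT NULL DEFAULT 0,
    success_count INTEGER NOT NULL DEFAULT 0,
    cpu_percent_mean REAL NOT NULL DEFAULT 0.0,
    memory_mb_peak_max REAL NOT NULL DEFAULT 0.0,
    memory_mb_peak_mean REAL NOT NULL DEFAULT 0.0,
    gpu_memory_mb_peak_max REAL NOT NULL DEFAULT 0.0,
    gpu_memory_mb_peak_mean REAL NOT NULL DEFAULT 0.0,
    duration_seconds_mean REAL NOT NULL DEFAULT 0.0,
    last_observed TEXT NOT NULL,
    api_usage_totals TEXT NOT NULL DEFAULT '{}',
    PRIMARY KEY (instance_id, config_hash)
)
"""


def bump(conn: sqlite3.Connection, instance_id: str, plugin_name: str,
         config_hash: str, success: bool) -> None:
    """Hypothetical helper: count one sample, creating the row on first call."""
    now = datetime.now(timezone.utc).isoformat()  # tz-aware timestamp
    conn.execute(
        """
        INSERT INTO empirical_resources
            (instance_id, plugin_name, config_hash, sample_count, success_count, last_observed)
        VALUES (?, ?, ?, 1, ?, ?)
        ON CONFLICT(instance_id, config_hash) DO UPDATE SET
            sample_count = sample_count + 1,
            success_count = success_count + excluded.success_count,
            last_observed = excluded.last_observed
        """,
        (instance_id, plugin_name, config_hash, int(success), now),
    )


def success_rate(conn: sqlite3.Connection, instance_id: str, config_hash: str) -> Optional[float]:
    """Derive success_rate at read time; None when no row exists."""
    row = conn.execute(
        "SELECT success_count, sample_count FROM empirical_resources "
        "WHERE instance_id = ? AND config_hash = ?",
        (instance_id, config_hash),
    ).fetchone()
    if row is None or row[1] == 0:
        return None
    return row[0] / row[1]


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
for ok in (True, True, False, True):
    bump(conn, "whisper-1", "whisper", "sha256:abc", ok)
print(success_rate(conn, "whisper-1", "sha256:abc"))  # 0.75
```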
Plugin Error Taxonomy (errors.ipynb)
Typed exception hierarchy + JobError dataclass + default classification of bare Python exceptions. The substrate’s CR-5 implementation per the 2026-05-19 substrate audit.
Import
from cjm_plugin_system.core.errors import (
PluginError,
PluginInputError,
PluginTransientError,
PluginResourceError,
PluginFatalError,
PluginDisabledError,
PluginNotLoadedError,
PluginTimeoutError,
PluginCancelledError,
WorkerOOMError,
PluginConfigError,
ResourceShortfall,
TracebackPolicy,
JobError,
classify_exception,
map_bare_exception_to_job_error
)
Functions
def classify_exception(
exc: BaseException # The exception to classify
) -> "Literal['user_input', 'transient', 'resource', 'fatal']": # Category
"""
Return the substrate category for any exception.
PluginError subclasses report their own declared `category`. Bare Python
exceptions are mapped via `__mro__` walk against `_BARE_EXCEPTION_CATEGORY_MAP`;
the first ancestor in the table wins. Unrecognized exceptions classify as
`fatal` (don't auto-retry the unknown).
"""
def map_bare_exception_to_job_error(
exc: BaseException, # The raised exception
*,
plugin_name: Optional[str] = None, # Name of the plugin that raised
plugin_instance_id: Optional[str] = None, # Per CR-10
traceback_policy: TracebackPolicy = TracebackPolicy.FULL, # How much detail to record
occurred_at: Optional[datetime] = None, # Override; defaults to datetime.now(timezone.utc)
) -> JobError
"""
Convert any exception into a structured `JobError`.
PluginError subclasses contribute their category-specific structured data
(`fields_invalid` for input errors, `resource_shortfall` for resource errors,
`retry_after_seconds` for transient errors). Bare exceptions get the
default category-based retriable flag and no structured side-channel.
"""
Classes
class PluginError(Exception):
"""
Base for substrate-recognized plugin exceptions.
Subclasses declare a `category` and `default_retriable` ClassVar so the
JobQueue + scheduler can route the failure without sniffing exception
text. Bare Python exceptions raised by plugin code go through
`map_bare_exception_to_job_error` to acquire a default category.
"""
class PluginInputError(PluginError, ValueError):
"""
User-fixable error: bad config, invalid argument, missing file.
Multi-inherits `ValueError` so SG-8-era `except ValueError:` catch sites that
legitimately want input errors keep working through the SG-47 migration
window. The MRO is `PluginInputError → PluginError → ValueError → Exception`;
other category bases (`PluginTransientError`, `PluginResourceError`,
`PluginFatalError`) deliberately do NOT extend `ValueError` because their
failure modes are not semantically value errors.
"""
def __init__(
self,
message: str, # Human-readable description
*,
fields_invalid: Optional[List[str]] = None, # Names of inputs that failed validation
)
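The multi-inheritance arrangement is easy to check in isolation. The classes below are local stand-ins that copy only the base-class layout described in the docstring; they are not imported from `cjm_plugin_system`, and `legacy_caller` is a hypothetical SG-8-era catch site.

```python
from typing import List, Optional


class DemoPluginError(Exception):
    """Stand-in for PluginError."""


class DemoInputError(DemoPluginError, ValueError):
    """Stand-in for PluginInputError: also a ValueError for legacy catch sites."""

    def __init__(self, message: str, *, fields_invalid: Optional[List[str]] = None):
        super().__init__(message)
        self.fields_invalid = fields_invalid or []


class DemoTransientError(DemoPluginError):
    """Stand-in for PluginTransientError: deliberately NOT a ValueError."""


def legacy_caller(exc: Exception) -> str:
    # A catch site that only knows about ValueError.
    try:
        raise exc
    except ValueError:
        return "handled as input error"
    except Exception:
        return "propagated"


print([c.__name__ for c in DemoInputError.__mro__])
# ['DemoInputError', 'DemoPluginError', 'ValueError', 'Exception', 'BaseException', 'object']
```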
class PluginTransientError(PluginError):
"""
Temporary failure: timeout, network blip, brief resource contention.
The substrate / JobQueue may retry on its own initiative. Plugin authors raise
this when they know the failure is recoverable.
"""
def __init__(
self,
message: str, # Human-readable description
*,
retry_after_seconds: Optional[float] = None, # Hint for backoff strategies
)
class PluginResourceError(PluginError):
"""
Resource exhaustion: GPU VRAM, system RAM, disk full.
JobQueue's reactive-eviction flow (CR-7) routes resource errors to retry
after attempting to free the named resource. Plugin authors set
`resource_shortfall` so the substrate knows what to evict.
"""
def __init__(
self,
message: str, # Human-readable description
*,
resource_shortfall: Optional["ResourceShortfall"] = None, # Quantitative gap
)
class PluginFatalError(PluginError):
"""
Bug / irrecoverable state. The plugin cannot complete this job; retrying won't help.
Plugin authors raise this when they know the failure is permanent for the
given inputs. The substrate does NOT retry fatal errors.
"""
class PluginDisabledError(PluginInputError):
"""
Raised when JobQueue / execute_plugin rejects a job because the plugin is currently disabled.
User-fixable (re-enable the plugin). Inherits `PluginInputError`'s ValueError
MRO so existing `except ValueError:` callers see it as an input error.
Raised by CR-2's enable/disable wiring once that lands.
"""
def __init__(self, plugin_name: str):
super().__init__(f"Plugin {plugin_name!r} is disabled")
self.plugin_name = plugin_name
class PluginNotLoadedError(PluginFatalError):
"""
Caller submitted to a plugin that was never loaded.
Fatal category because this is a programmer / orchestration bug, not a
user-fixable condition. NOT a ValueError: the right reader intent is
`except PluginNotLoadedError:` (or the broader `except PluginError:`), not
a blanket `except ValueError:`.
"""
def __init__(self, plugin_name: str):
super().__init__(f"Plugin {plugin_name!r} is not loaded")
self.plugin_name = plugin_name
class PluginTimeoutError(PluginTransientError):
"""
A per-job timeout fired before the plugin finished.
Transient category: a retry may succeed if the slow operation completes faster
next time. Carries `retry_after_seconds` from `PluginTransientError`.
Raised by SG-14's per-job timeout primitive when that lands.
"""
def __init__(
self,
plugin_name: str,
timeout_seconds: float,
*,
retry_after_seconds: Optional[float] = None,
)
class PluginCancelledError(PluginTransientError):
"""
Cooperative cancellation signal raised from `PluginInterface.check_cancel()`.
Anchors under `PluginTransientError` because cancellation is in principle
re-runnable: a future attempt with the same inputs won't auto-fail if the
cancel flag isn't set. But `default_retriable` is False: cancellation was
a deliberate operator action, so the substrate should NOT auto-retry.
Job-monitor / JobQueue render cancelled jobs with their own state
(separate from "failed"); the JobError category remains `transient` so
consumers reading the typed taxonomy can group recoverable signals.
Plugin authors raise this implicitly via `self.check_cancel()` inside
`execute()`; the substrate sets the underlying `_cancel_requested` flag via
`cancel()`. See CR-4's cancellation primitives for the cooperative-cancel
protocol.
"""
def __init__(self, plugin_name: str):
super().__init__(f"Plugin {plugin_name!r} cancelled by operator")
self.plugin_name = plugin_name
class WorkerOOMError(PluginResourceError):
"""
The worker subprocess died with a kill-signal during an active execute call.
CR-7 Track A (substrate-side OOM detection): when an HTTP call to the worker
faults and the subprocess has died with `returncode == -signal.SIGKILL` (or
the platform equivalent), the substrate raises this. The kernel OOM-killer
is the most common cause of SIGKILL during normal execute paths, so the
substrate treats SIGKILL-during-call as "assume OOM" and surfaces a typed
resource error for the reactive retry path.
`resource_shortfall` is `None` for Track A: the substrate only saw "worker
died from kill-signal" and has no per-resource needed/available numbers.
Track B (per SG-47's sub-task: plugin-side wrapping of `torch.cuda.OutOfMemoryError`
et al.) raises `PluginResourceError` directly with a populated
`ResourceShortfall` because the plugin had the context. Both land at the
same `except PluginResourceError` site in CR-7's reactive retry loop.
`process_returncode` carries the observed exit code for debugging /
classification (e.g. operators can distinguish kernel-OOM SIGKILL from
other signals if they read it). Defaults to `None` for callers that don't
have it on hand.
"""
def __init__(
self,
plugin_name: str,
*,
process_returncode: Optional[int] = None,
message: Optional[str] = None,
)
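Track A's detection rule reduces to a check on the subprocess exit code: on POSIX, `subprocess.Popen.returncode` is `-N` when the child was terminated by signal `N`. The helper name `died_from_sigkill` is hypothetical, and this sketch does not model the "platform equivalent" case mentioned above (Windows has no `signal.SIGKILL`, so the helper returns `False` there).

```python
import signal
import subprocess
import sys
from typing import Optional


def died_from_sigkill(returncode: Optional[int]) -> bool:
    """True if a Popen returncode indicates termination by SIGKILL (POSIX only)."""
    sigkill = getattr(signal, "SIGKILL", None)  # Absent on Windows.
    return sigkill is not None and returncode == -sigkill


if __name__ == "__main__":
    # Spawn a sleeping stand-in "worker", kill it, and inspect the exit code.
    proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
    proc.kill()  # Sends SIGKILL on POSIX.
    proc.wait()
    print(proc.returncode, died_from_sigkill(proc.returncode))
```

A `True` result only says the process was SIGKILLed; attributing that to the OOM-killer is the heuristic described above, which is why `resource_shortfall` stays `None` on this path.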
class PluginConfigError(PluginInputError):
"""
Unknown / invalid keys in a config dict against a plugin's config schema.
Reparented from `cjm_plugin_system.utils.validation` (Wave 2 / SG-8) under
CR-5. Inherits `PluginInputError`'s ValueError MRO automatically.
`config_class_name` is the dataclass / plugin name whose schema was violated.
"""
def __init__(
self,
message: str, # Human-readable description
*,
fields_invalid: Optional[List[str]] = None, # Canonical: list of bad config keys
config_class_name: str = "", # Dataclass / plugin name for the schema
# REMOVE-AFTER-OVERHAUL: drop unknown_keys kwarg after SG-47 cascade completes
unknown_keys: Optional[List[str]] = None,
)
@property
def unknown_keys(self) -> List[str]
"Deprecated alias for `fields_invalid` (SG-8-era attribute name).
Accessing this property does NOT emit a deprecation warning; only
constructor-time misuse triggers the warning. Read-only by design;
nothing in the ecosystem mutates exception attributes after construction."
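The deprecation pattern (the constructor warns, the property stays silent and read-only) can be sketched as follows. `DemoConfigError` is a stand-in; the `DeprecationWarning` category, the warning text, and the rule that an explicit `fields_invalid` wins over `unknown_keys` are assumptions of this sketch, not confirmed behavior of `PluginConfigError`.

```python
import warnings
from typing import List, Optional


class DemoConfigError(ValueError):
    def __init__(
        self,
        message: str,
        *,
        fields_invalid: Optional[List[str]] = None,
        config_class_name: str = "",
        unknown_keys: Optional[List[str]] = None,  # Deprecated spelling
    ):
        super().__init__(message)
        if unknown_keys is not None:
            # Warn only when the deprecated kwarg is actually passed.
            warnings.warn(
                "unknown_keys= is deprecated; pass fields_invalid= instead",
                DeprecationWarning,
                stacklevel=2,
            )
            if fields_invalid is None:
                fields_invalid = unknown_keys
        self.fields_invalid = list(fields_invalid or [])
        self.config_class_name = config_class_name

    @property
    def unknown_keys(self) -> List[str]:
        # Read-only alias: no setter, and no warning on access.
        return self.fields_invalid
```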
@dataclass
class ResourceShortfall:
"Quantitative gap between what a plugin needed and what was available."
resource: Literal['gpu_vram_mb', 'system_ram_mb', 'disk_mb'] # Which resource
needed: float # Amount the plugin reported it needed
available: float # Amount actually available when the failure occurred
class TracebackPolicy(str, Enum):
"How much exception detail the substrate records on a JobError."
@dataclass
class JobError:
"""
Structured failure summary recorded on a completed Job.
Populated by the JobQueue when a plugin execution fails (CR-6 owns the
population logic; CR-5 owns the shape). Sufficient for UI to render a
failure card + retry affordance without re-running the plugin.
"""
category: Literal['user_input', 'transient', 'resource', 'fatal']
message: str # Human-readable error message
retriable: bool # Whether the substrate considers this safe to auto-retry
original_exc_repr: str # repr(exc) of the original exception
traceback: Optional[str] # Full traceback per TracebackPolicy
retry_after_seconds: Optional[float] # Backoff hint from PluginTransientError
fields_invalid: Optional[List[str]] # From PluginInputError subclasses
resource_shortfall: Optional[ResourceShortfall] # From PluginResourceError
plugin_name: Optional[str] # Name of the plugin that raised
plugin_instance_id: Optional[str] # Per CR-10 multi-instance support
occurred_at: Optional[datetime] # When the failure was recorded
Variables
_BARE_EXCEPTION_CATEGORY_MAP: "dict[type, Literal['user_input', 'transient', 'resource', 'fatal']]"
_CATEGORY_RETRIABLE_DEFAULTS: 'dict[str, bool]'
Content Hashing Utilities (hashing.ipynb)
Shared cryptographic hashing primitives for content integrity verification
Import
from cjm_plugin_system.utils.hashing import (
hash_bytes,
hash_file,
verify_hash,
hash_dict_canonical
)
Functions
def hash_bytes(
content: bytes, # Byte content to hash
algo: str = "sha256" # Hash algorithm name (e.g., "sha256", "sha3_256")
) -> str: # Hash string in "algo:hexdigest" format
"Compute a hash of byte content."
def hash_file(
path: Union[str, Path], # Path to file to hash
algo: str = "sha256", # Hash algorithm name
chunk_size: int = 8192 # Read chunk size in bytes
) -> str: # Hash string in "algo:hexdigest" format
"Stream-hash a file without loading it entirely into memory."
def verify_hash(
content: bytes, # Content to verify
expected: str # Expected hash in "algo:hexdigest" format
) -> bool: # True if content matches expected hash
"Verify content against an expected hash string."
def hash_dict_canonical(
data: Optional[Dict[str, Any]], # Dict to hash (or None — treated as {})
algo: str = "sha256", # Hash algorithm name
) -> str: # Hash string in "algo:hexdigest" format
"""
Hash a dict via canonical JSON encoding.
Canonicalization: `json.dumps(data, sort_keys=True, separators=(",", ":"))`.
Sorted keys eliminate dict-insertion-order variance; minimal separators
eliminate whitespace variance. Result is deterministic across Python
versions and machines.
"""
Plugin Interface (interface.ipynb)
Abstract base class defining the generic plugin interface
Import
from cjm_plugin_system.core.interface import (
RELOAD_TRIGGER,
FileBackedDTO,
ConfigOption,
FieldOptions,
EnvVarSpec,
PluginInterface,
plugin_action,
collect_plugin_actions
)
Functions
def plugin_action(
action_name: str # Public action name the decorated method handles
) -> Callable[[Callable], Callable]: # Decorator
"""
Marker decorator tagging a plugin method as the handler for `action_name`.
Sets `func._plugin_action = action_name`. Plugin authors with dispatcher-style
`execute(action, **kwargs)` use `collect_plugin_actions(cls)` to derive their
`supported_actions` set from these markers rather than maintaining a separate
list. The decorator does not change call semantics — the wrapped function is
returned unchanged.
"""
def collect_plugin_actions(
cls: type # Class (or subclass) to scan for @plugin_action-tagged methods
) -> Set[str]: # Set of action names handled by `cls` (including inherited)
"""
Collect action names from `@plugin_action`-decorated methods on `cls`.
Walks the class's MRO so subclasses inherit action handlers from base
classes automatically. The returned set is suitable for
`supported_actions: ClassVar[Set[str]] = collect_plugin_actions(MyPlugin)`
once the plugin class body has been defined.
"""
Classes
@runtime_checkable
class FileBackedDTO(Protocol):
"Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer."
def to_temp_file(self) -> str: # Absolute path to the temporary file
"Save the data to a temporary file and return the absolute path."
@dataclass
class ConfigOption:
"CR-11: one live option for a dynamic config field, with optional metadata."
value: Any # option value (e.g. "gemini-2.5-flash")
label: str # display label (e.g. "Gemini 2.5 Flash")
metadata: Dict[str, Any] = dataclasses.field(default_factory=dict) # token limits, descriptions, ...
@dataclass
class FieldOptions:
"""
CR-11: the live option domain for one dynamic config field.
Kept SEPARATE from the static config_schema (which CR-8 hashes for drift
detection). The plugin-config UI merges these live options on top of the
static schema; folding them into the schema would make every API plugin
perpetually 'drift'.
"""
options: List[ConfigOption] # current valid options
constraints: Dict[str, Any] = dataclasses.field(default_factory=dict) # derived field-constraint overrides
@dataclass
class EnvVarSpec:
"""
CR-12: one entry of a plugin's spawn-time worker-environment contract.
A plugin declares the environment variables its worker subprocess reads at
startup via `WORKER_ENV: ClassVar[List[EnvVarSpec]]`. Worker env vars are
FIXED AT SPAWN — changing one requires a worker RESPAWN, so the substrate
routes such changes through `reload_plugin`, never `reconfigure` (the env is
baked into the subprocess at `Popen` and can't be mutated in-process). This
is the lifecycle distinction from a normal config field (reconfigurable in
place via `reconfigure`/`_release_<trigger>`).
Two flavors share this one declaration:
- `secret=True` : value resolved from the `SecretStore` (masked; never
persisted in the config store, echoed in config_schema,
or logged). A secret never carries a `default` — a
baked-in secret is a leak.
- `secret=False` : visible value resolved from the override chain
(operator override > manifest `install.env_vars` >
this `default`); safe to display.
Both share one injection seam: the substrate composes the resolved
{name: value} overlay at load time and injects it into the worker env at
spawn (extending the existing CJM_DATA_DIR / CJM_MODELS_DIR injection).
This is "derive from behaviour, not metadata" applied to the spawn env: the
plugin declares WHICH vars it consumes + whether each is secret/required;
the substrate owns resolution + injection.
`options` is a forward seam for visible vars with a finite domain (e.g. a
device selector enumerating GPUs); unused today but reserved so the
plugin-config UI / a future `set-env` surface isn't blocked.
"""
name: str # The env var the worker reads, e.g. "GEMINI_API_KEY"
secret: bool = False # True -> value resolved from the SecretStore, masked
required: bool = False # Worker can't do useful work until this is satisfied
label: str = '' # Display label for CLI / GUI affordances
description: str = '' # Help text for CLI / GUI
default: Optional[str] = None # Visible vars only; secrets must leave this None
options: Optional[List[str]] = None # Forward seam: finite domain for a visible var (unused today)
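The resolution order described for `EnvVarSpec` can be sketched as a pure function. Everything here is illustrative: `resolve_worker_env` and `spawn_env` are hypothetical names, plain dicts stand in for the `SecretStore`, operator overrides, and manifest `install.env_vars`, and reporting missing required vars (rather than raising) is an assumption. `CJM_DEVICE` and `CJM_BATCH` in the test are made-up variable names.

```python
import os
from dataclasses import dataclass
from typing import Dict, List, Mapping, Optional, Tuple


@dataclass
class EnvVarSpec:
    # Local copy of the fields listed above (label/description/options omitted).
    name: str
    secret: bool = False
    required: bool = False
    default: Optional[str] = None


def resolve_worker_env(
    specs: List[EnvVarSpec],
    secrets: Mapping[str, str],       # stand-in for the SecretStore
    overrides: Mapping[str, str],     # operator overrides
    manifest_env: Mapping[str, str],  # manifest install.env_vars
) -> Tuple[Dict[str, str], List[str]]:
    overlay: Dict[str, str] = {}
    missing: List[str] = []
    for spec in specs:
        if spec.secret:
            value = secrets.get(spec.name)  # Secrets never fall back to a default.
        else:
            # Visible vars: operator override > manifest > declared default.
            value = overrides.get(spec.name, manifest_env.get(spec.name, spec.default))
        if value is None:
            if spec.required:
                missing.append(spec.name)
            continue
        overlay[spec.name] = value
    return overlay, missing


def spawn_env(overlay: Dict[str, str]) -> Dict[str, str]:
    # Fixed at spawn: pass as Popen(..., env=spawn_env(overlay)).
    return {**os.environ, **overlay}
```

Because the overlay is only read when the subprocess is created, changing any of these values requires a respawn, which is why such changes route through `reload_plugin` rather than `reconfigure`.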
class PluginInterface(ABC):
"""
Abstract base class for all plugins (both local workers and remote proxies).
CR-4 extended this surface with: prefetch hook (SG-19), made cleanup optional
(SG-43), reconfigure split + RELOAD_TRIGGER declarative-helper (lifecycle
hook split), and cooperative-cancellation primitives (SG-16 — flag + callback
+ context manager).
Abstract methods: `name`, `version`, `initialize`, `execute`, `get_config_schema`,
`get_current_config`. Concrete defaults (overridable): `execute_stream`,
`cleanup`, `prefetch`, `reconfigure`, `cancel`, `check_cancel`,
`register_cancel_callback`, `cancel_signal_to`, `report_progress`,
`fields_that_changed`, `reconfigure_with_triggers`, `on_disable`, `on_enable`.
"""
def name(self) -> str: # Unique identifier for this plugin
"""Unique plugin identifier."""
...
@property
@abstractmethod
def version(self) -> str: # Semantic version string (e.g., "1.0.0")
"Unique plugin identifier."
def version(self) -> str: # Semantic version string (e.g., "1.0.0")
"""Plugin version."""
...
@abstractmethod
def initialize(
self,
config: Optional[Dict[str, Any]] = None # Configuration dictionary
) -> None
"Plugin version."
def initialize(
self,
config: Optional[Dict[str, Any]] = None # Configuration dictionary
) -> None
"Initialize or re-configure the plugin.
CR-4: this is "first-time setup" — called once after construction with
the initial config. Substrate uses `reconfigure(old, new)` for delta
updates afterwards. Plugins predating CR-4 see no behavior change since
the default `reconfigure()` body delegates to `reconfigure_with_triggers`
which is a no-op unless the plugin opts in via RELOAD_TRIGGER metadata."
def execute(
self,
*args,
**kwargs
) -> Any: # Plugin-specific output
"Execute the plugin's main functionality."
def execute_stream(
self,
*args,
**kwargs
) -> Generator[Any, None, None]: # Yields partial results
"Stream execution results chunk by chunk."
def get_config_schema(self) -> Dict[str, Any]: # JSON Schema for configuration
"""Return JSON Schema describing the plugin's configuration options."""
...
@abstractmethod
def get_current_config(self) -> Dict[str, Any]: # Current configuration values
"Return JSON Schema describing the plugin's configuration options."
def get_current_config(self) -> Dict[str, Any]: # Current configuration values
"""Return the current configuration state as a dictionary."""
...
def get_config_options(self) -> Dict[str, "FieldOptions"]:
"""CR-11: live option domains for dynamic config fields, keyed by field name.
Optional. Default: {} (fully static plugins). For fields whose valid
domain is determined at runtime (e.g. an API model list), return a
`FieldOptions` carrying current `ConfigOption` values + per-option
metadata (token limits, etc.) + optional derived constraints. Runs in
the worker subprocess (has the plugin's deps + credentials).
Kept SEPARATE from get_config_schema(): the schema is static + hashed for
CR-8 drift detection; these options are the live, un-hashed companion the
plugin-config UI merges on top. A fetch failure should raise a typed CR-5
error; the substrate's PluginManager.get_config_options accessor degrades
to {} so the UI can fall back to the static schema.
"""
return {}
def cleanup(self) -> None:
"""Clean up resources when plugin is unloaded.
CR-4: made optional (SG-43 closure). Was `@abstractmethod` before; every
audited plugin overrode it with a near-no-op, so the default is now a
no-op and plugin authors override only when they have non-trivial
teardown (file handles, GPU memory, database connections). The
substrate's worker /cleanup endpoint calls this regardless.
"""
pass
def prefetch(self) -> None:
"""Acquire heavy resources eagerly without invoking execute().
CR-4 (SG-19): default no-op. Plugin authors override when downstream
callers benefit from eager acquisition — typically transcription /
inference plugins that lazy-download models on first execute. The
substrate's prefetch_plugin(name) API + worker /prefetch endpoint
invoke this. Should be idempotent (safe to call multiple times) since
the substrate may pre-warm at load time AND on operator request.
"""
pass
def reconfigure(
self,
old_config: Optional[Dict[str, Any]], # Previous configuration values
new_config: Optional[Dict[str, Any]], # New configuration values to apply
) -> None
"Apply a configuration change without re-running full initialize().
CR-4 completion (2026-05-25): reconfigure is the substrate's canonical
delta path - `PluginManager.update_plugin_config` routes here, NOT through
a bare `initialize(new_config)`. It fires `_release_<trigger>` releases for
changed RELOAD_TRIGGER fields, then re-applies config (see body below).
Distinction from initialize(): initialize sets up persistent state once
after construction; reconfigure applies delta updates and is the
canonical entry point for hot-reload via the substrate's
update_plugin_config path."
def fields_that_changed(
self,
old: Dict[str, Any], # Previous config snapshot
new: Dict[str, Any], # Proposed new config snapshot
) -> Set[str]: # Field names whose values differ between old and new
"Return the set of field names whose values differ between old and new.
Includes fields present in only one dict (treated as a change to/from
the implicit None). Equality is structural via `!=`; nested dicts /
lists compare by value, not identity. Hashable-vs-unhashable values
compare correctly because we never put them in a set themselves."
def reconfigure_with_triggers(
self,
old_config: Dict[str, Any], # Previous configuration values
new_config: Dict[str, Any], # New configuration values being applied
) -> None
"CR-4 helper: walk RELOAD_TRIGGER metadata + fire `_release_<trigger>` methods.
Resolution sequence:
1. Find the plugin's `config_class` attribute (a dataclass). Absent
means the plugin hasn't opted into the declarative pattern; the
helper returns silently.
2. Compute the diff between `old_config` and `new_config` via
`fields_that_changed`.
3. For each changed field, read its `RELOAD_TRIGGER` metadata key
(if any) and accumulate the trigger names into a set (de-duping
when multiple fields share a trigger).
4. For each trigger, call `self._release_<trigger>()` if it exists
on the instance.
Plugin authors opt in by:
- Setting `config_class = MyConfigDataclass` as a class attribute.
- Annotating field metadata with `{RELOAD_TRIGGER: "model"}` etc.
- Implementing matching `_release_model(self)` instance methods.
Plugins WITHOUT config_class or RELOAD_TRIGGER metadata land here as a
no-op — safe default for the SG-T3tr migration window where the
cascade hasn't yet adopted the declarative pattern everywhere."
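The resolution sequence above can be sketched with a standard `dataclasses` field-metadata walk. `DemoConfig` and `DemoPlugin` are hypothetical, and `RELOAD_TRIGGER` is redefined locally as a placeholder string because the real constant's value is not shown in this README. Triggers are sorted only to make the firing order deterministic; the description above does not specify an order.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List, Set

RELOAD_TRIGGER = "reload_trigger"  # Placeholder; the real constant's value may differ.


def fields_that_changed(old: Dict[str, Any], new: Dict[str, Any]) -> Set[str]:
    # A key present in only one dict always counts as changed.
    return {
        k for k in old.keys() | new.keys()
        if k not in old or k not in new or old[k] != new[k]
    }


@dataclass
class DemoConfig:
    model_id: str = field(default="base", metadata={RELOAD_TRIGGER: "model"})
    compute_type: str = field(default="float16", metadata={RELOAD_TRIGGER: "model"})
    beam_size: int = 5  # No trigger: applied in place.


class DemoPlugin:
    config_class = DemoConfig

    def __init__(self) -> None:
        self.released: List[str] = []

    def _release_model(self) -> None:
        self.released.append("model")  # e.g. drop loaded weights

    def reconfigure_with_triggers(self, old_config: Dict[str, Any], new_config: Dict[str, Any]) -> None:
        config_class = getattr(self, "config_class", None)
        if config_class is None:
            return  # Plugin has not opted in.
        changed = fields_that_changed(old_config, new_config)
        # Accumulate trigger names into a set so shared triggers fire once.
        triggers = {
            f.metadata[RELOAD_TRIGGER]
            for f in fields(config_class)
            if f.name in changed and RELOAD_TRIGGER in f.metadata
        }
        for trigger in sorted(triggers):
            release = getattr(self, f"_release_{trigger}", None)
            if callable(release):
                release()
```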
def cancel(self) -> None:
"""Request cooperative cancellation of the current execute() call.
CR-4: default sets the substrate-tracked `_cancel_requested` flag and
fires any callbacks registered via `register_cancel_callback`.
Plugin authors who need extra teardown logic (signaling a subprocess,
closing a network connection) override `cancel()` and SHOULD call
`super().cancel()` to preserve the flag-setting + callback-fire
behavior. The plugin's `execute()` polls via `check_cancel()` at safe
interruption points and unwinds when it raises `PluginCancelledError`.
"""
self._cancel_requested = True
for cb in list(getattr(self, "_cancel_callbacks", ()))
"Request cooperative cancellation of the current execute() call.
CR-4: default sets the substrate-tracked `_cancel_requested` flag and
fires any callbacks registered via `register_cancel_callback`.
Plugin authors who need extra teardown logic (signaling a subprocess,
closing a network connection) override `cancel()` and SHOULD call
`super().cancel()` to preserve the flag-setting + callback-fire
behavior. The plugin's `execute()` polls via `check_cancel()` at safe
interruption points and unwinds when it raises `PluginCancelledError`."
def check_cancel(self) -> None:
"""Raise `PluginCancelledError` if cancellation has been requested.
CR-4 (SG-16 polling primitive): plugin authors call this at safe
interruption points inside `execute()`. The substrate sets the flag via
`cancel()` (typically driven by an operator's "Cancel" button); the
next `check_cancel` call surfaces the cancellation as a typed exception
that unwinds execute() cleanly.
The substrate's worker /execute wrapper resets the flag before each
call so cancellation doesn't leak from one job into the next.
"""
if self._cancel_requested
"Raise `PluginCancelledError` if cancellation has been requested.
CR-4 (SG-16 polling primitive): plugin authors call this at safe
interruption points inside `execute()`. The substrate sets the flag via
`cancel()` (typically driven by an operator's "Cancel" button); the
next `check_cancel` call surfaces the cancellation as a typed exception
that unwinds execute() cleanly.
The substrate's worker /execute wrapper resets the flag before each
call so cancellation doesn't leak from one job into the next."
def register_cancel_callback(
self,
callback: Callable[[], None], # Called when cancel() fires
) -> None
"Register a callback that fires when cancel() is called.
CR-4 (SG-16 callback primitive): for plugins that can't easily insert
polling at strategic points (e.g., a plugin wrapping a blocking C
extension). Callbacks should be non-blocking and idempotent. Multiple
callbacks can be registered; all fire in registration order when
cancel() is invoked. A misbehaving callback that raises is logged
and skipped — the remaining callbacks still fire."
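The sketch below shows one way the flag, the ordered callback list, and the polling check could fit together. It is minimal and self-contained. `CancellableSketch` and the local `PluginCancelledError` are hypothetical stand-ins written for illustration, not the library's own classes. The real `PluginInterface` manages this state for plugin authors.

```python
import logging
from typing import Callable, List

log = logging.getLogger(__name__)

class PluginCancelledError(Exception):
    """Hypothetical stand-in for the substrate's typed cancellation error."""

class CancellableSketch:
    """Hypothetical plugin base illustrating cooperative cancellation."""

    def __init__(self) -> None:
        self._cancel_requested = False
        self._cancel_callbacks: List[Callable[[], None]] = []

    def register_cancel_callback(self, callback: Callable[[], None]) -> None:
        self._cancel_callbacks.append(callback)

    def cancel(self) -> None:
        # Set the flag first so check_cancel() sees it, then fire callbacks in order.
        self._cancel_requested = True
        for cb in list(self._cancel_callbacks):
            try:
                cb()
            except Exception:
                # A failing callback is logged and skipped; the rest still fire.
                log.exception("cancel callback failed")

    def check_cancel(self) -> None:
        if self._cancel_requested:
            raise PluginCancelledError("cancellation requested")

    def execute(self, items: List[int]) -> int:
        total = 0
        for item in items:
            self.check_cancel()  # safe interruption point between units of work
            total += item
        return total
```

A plugin's `execute()` only notices cancellation at its `check_cancel()` calls. The spacing of those calls therefore bounds how quickly a job unwinds.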
def cancel_signal_to(
self,
callback: Callable[[], None], # Cancellation callback scoped to the with-block
) -> Generator[None, None, None]
"Context manager registering `callback` for the duration of the with-block.
Useful for binding cancellation to a finite-scope resource (a subprocess,
a network request, a temporary file handle) without needing to
deregister manually. Pairs with `register_cancel_callback` for cases
where lifetime is tied to a `try:`/`finally:` block.
Example:
    def execute(self, *args, **kwargs):
        proc = subprocess.Popen(...)
        with self.cancel_signal_to(lambda: proc.terminate()):
            return proc.wait()"
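One plausible way to build a with-block-scoped registration uses `contextlib.contextmanager`. Deregistration happens in a `finally` clause, so the callback is removed even when the body raises. `ScopedCancelSketch` is hypothetical, and the library's actual `cancel_signal_to` may be implemented differently.

```python
import contextlib
from typing import Callable, Iterator, List

class ScopedCancelSketch:
    """Hypothetical holder showing with-block-scoped cancel callbacks."""

    def __init__(self) -> None:
        self._cancel_requested = False
        self._cancel_callbacks: List[Callable[[], None]] = []

    def cancel(self) -> None:
        self._cancel_requested = True
        for cb in list(self._cancel_callbacks):
            cb()

    @contextlib.contextmanager
    def cancel_signal_to(self, callback: Callable[[], None]) -> Iterator[None]:
        # Register on entry...
        self._cancel_callbacks.append(callback)
        try:
            yield
        finally:
            # ...and always deregister on exit, even if the body raised.
            self._cancel_callbacks.remove(callback)
```

In a real plugin, the callback would typically be something like `proc.terminate`, as in the docstring example.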
def on_disable(self) -> None
"CR-2: signal that the substrate has marked this plugin as disabled.
Worker stays alive; plugin can release heavy resources here (e.g., free
GPU memory, close model files). The substrate fires this hook AFTER any
in-flight job for this plugin finishes (see PluginManager.disable_plugin
deferred-hook semantics). Default: no-op; plugins opt in by overriding."
def on_enable(self) -> None
"CR-2: signal that the substrate has marked this plugin as re-enabled.
Plugin can eagerly re-acquire heavy resources here, or rely on lazy
re-acquisition via the next execute() call (the substrate doesn't prefer
one strategy over the other). Default: no-op; plugins opt in by overriding."
def report_progress(
self,
progress: float, # 0.0 to 1.0, or -1.0 for indeterminate
message: str = "" # Descriptive status message
) -> None
"Report execution progress. Call during execute() to update status."
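The two lifecycle hooks pair naturally with lazy loading. The sketch below assumes a hypothetical plugin with three behaviors:

- It frees its heavy resource in `on_disable`.
- It leaves `on_enable` as a no-op.
- It re-acquires the resource lazily on the next `execute()`.

It also stores progress reports the way a store-only `report_progress` would. `LazyModelPlugin` and `_load_model` are invented for illustration.

```python
from typing import List, Optional, Tuple

class LazyModelPlugin:
    """Hypothetical plugin: releases a heavy resource on disable, reloads lazily."""

    def __init__(self) -> None:
        self._model: Optional[List[int]] = None   # stand-in for GPU weights
        self.loads = 0
        self.progress_log: List[Tuple[float, str]] = []

    def _load_model(self) -> List[int]:
        self.loads += 1
        return list(range(10))                    # pretend this is expensive

    def report_progress(self, progress: float, message: str = "") -> None:
        # Store-only; 0.0..1.0, or -1.0 for indeterminate.
        self.progress_log.append((progress, message))

    def on_disable(self) -> None:
        self._model = None                        # free the resource; worker stays alive

    def on_enable(self) -> None:
        pass                                      # rely on lazy re-acquisition in execute()

    def execute(self, n: int) -> int:
        if self._model is None:
            self.report_progress(-1.0, "loading model")
            self._model = self._load_model()
        self.report_progress(1.0, "done")
        return sum(self._model[:n])
```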
def report_usage(
self,
usage: Dict[str, float], # Measured usage for this execute, keyed by plugin-defined unit name
) -> None
"SG-54: report measured API/service usage for the current execute() call.
Unit-agnostic by design — the plugin (which holds the API response)
supplies whatever unit names it measures: {"input_tokens": .., "output_tokens": ..}
for an LLM, {"pages": ..} for OCR, {"characters": ..} for TTS,
{"credits"/"requests"/"minutes": ..} for others. The substrate stores +
accumulates per unit name WITHOUT interpreting them (summed across runs in
the empirical store's api_usage_totals). Pricing is deliberately NOT here
(volatile, per-service, often not API-accessible) — a consumer-side rate
table turns raw units into cost. "Derive from behaviour": the plugin
MEASURES actual usage from the response; the substrate aggregates.
Stored on `self._last_api_usage`; the worker exposes it via /stats and the
substrate folds it into the post-execute ResourceSample. The worker resets
it before each execute so a failed/usage-less call can't inherit stale
usage. Default: store-only (parallel to report_progress)."
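The sketch below separates measuring from pricing. It accumulates usage per unit, which is the role `api_usage_totals` plays, and applies a consumer-side rate table. `accumulate_usage`, `estimate_cost`, and the rate values are hypothetical. The numbers are made up and do not reflect any service's pricing.

```python
from collections import defaultdict
from typing import Dict, Iterable

def accumulate_usage(samples: Iterable[Dict[str, float]]) -> Dict[str, float]:
    """Sum per-unit usage across runs without interpreting the unit names."""
    totals: Dict[str, float] = defaultdict(float)
    for usage in samples:
        for unit, amount in usage.items():
            totals[unit] += amount
    return dict(totals)

def estimate_cost(totals: Dict[str, float], rates: Dict[str, float]) -> float:
    """Consumer-side pricing: units without a known rate contribute nothing."""
    return sum(amount * rates.get(unit, 0.0) for unit, amount in totals.items())

# Per-execute reports as an LLM plugin might emit them via report_usage().
runs = [
    {"input_tokens": 1200, "output_tokens": 300},
    {"input_tokens": 800, "output_tokens": 200},
]
totals = accumulate_usage(runs)

# Illustrative per-unit rates (made-up numbers).
rates = {"input_tokens": 0.001, "output_tokens": 0.002}
cost = estimate_cost(totals, rates)
```

Because the substrate never interprets unit names, a price change only touches the consumer's rate table, not stored samples.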
class _CR4MinimalPlugin(PluginInterface):
    "Concrete plugin satisfying the abstract methods; relies on CR-4 default cleanup()."
    @property
    def name(self) -> str: return "cr4-minimal"
    @property
    def version(self) -> str: return "0.0.0"
    def initialize(self, config=None): self._cfg = dict(config or {})
    def execute(self, *args, **kwargs): return None
    def get_config_schema(self) -> Dict[str, Any]: return {}
    def get_current_config(self) -> Dict[str, Any]: return dict(getattr(self, "_cfg", {}))
Variables
RELOAD_TRIGGER = 'reload_trigger'
Plugin Manager (manager.ipynb)
Plugin discovery, loading, and lifecycle management system
Import
from cjm_plugin_system.core.manager import (
PluginManager,
PluginBinding
)
Functions
def register_system_monitor(
self,
plugin_name:str # Name of the system monitor plugin
) -> None
"Bind a loaded plugin to act as the hardware system monitor."
def _get_global_stats(self) -> Dict[str, Any]: # Current system telemetry
"""
Fetch real-time stats from the system monitor plugin (sync).
CR-3: prefer typed `get_system_status()` over magic-string dispatcher.
Duck-types because the substrate references `system_monitor` as a
generic `PluginInterface` — CR-1's host-no-imports rule means substrate
does not import `cjm-infra-plugin-system` to type-narrow the reference.
Proxies after CR-3 expose `get_system_status` as a bound method that
POSTs to `/get_system_status` and returns `Optional[Dict[str, Any]]`.
"""
async def _get_global_stats_async(self) -> Dict[str, Any]: # Current system telemetry
"""
Fetch real-time stats from the system monitor plugin (async).
Same CR-3 duck-type semantics as the sync variant. Async variant exists
because the substrate's `execute_plugin_async` path (CR-2 + CR-10) needs
a non-blocking stats fetch when scheduling under an asyncio event loop.
"""
def _check_interface_fqn(
self,
iface_fqn:str, # Interface FQN string from the manifest
plugin_name:str # For error messages
) -> bool: # True if FQN passes the format check
"""
SG-7: sanity-check the interface FQN format at discovery time.
Substrate cannot actually import every interface library (the host
env doesn't carry them all), so this is a format-only check. The
authoritative import check happens at install time in
_generate_manifest's introspection script.
"""
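A format-only check can be done without importing anything. This sketch makes two assumptions:

- An FQN is a dotted path of at least two segments (module path plus class name).
- Each segment must be a valid, non-keyword Python identifier.

The substrate's actual rule may be stricter or looser. `looks_like_interface_fqn` is a hypothetical name.

```python
import keyword

def looks_like_interface_fqn(fqn: object) -> bool:
    """Format-only check: dotted path of >= 2 valid, non-keyword identifiers.

    Assumed rule for illustration; nothing is imported.
    """
    if not isinstance(fqn, str):
        return False
    parts = fqn.split(".")
    if len(parts) < 2:
        return False
    return all(p.isidentifier() and not keyword.iskeyword(p) for p in parts)
```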
def _parse_taxonomy(
self,
manifest: Dict[str, Any] # Loaded manifest dict
) -> Optional[PluginTaxonomy]
"""
CR-1: parse the manifest's taxonomy block into a PluginTaxonomy.
Returns None when the manifest predates CR-1 (no taxonomy block).
Used for callers that have a flat dict (post-CR-8, discover_manifests
uses load_manifest's typed PluginTaxonomy directly and bypasses this).
"""
def _parse_resources(
self,
manifest: Dict[str, Any] # Loaded manifest dict
) -> Optional[ResourceRequirements]
"Phase 5a: parse the manifest's resources block into a ResourceRequirements."
def _derive_category(
self,
manifest: Dict[str, Any], # Loaded manifest dict (flat-shaped)
taxonomy: Optional[PluginTaxonomy] # Parsed taxonomy (or None for legacy manifests)
) -> str
"""
CR-1: derive the human-readable category label.
Resolution: category_override > Title-Case of taxonomy.domain (with
underscores → spaces) > legacy `category` field > empty.
`category_override` lives at top level on v1.0 flat manifests only
(v2.0 layout has no slot — CR-1 closed decision deferred a custom
label channel until a concrete consumer asks for one).
"""
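Here is the resolution order above as a plain function. It simplifies the inputs to a flat manifest dict plus the taxonomy's domain string, which stands in for a parsed `PluginTaxonomy`. `derive_category` is a hypothetical name, and `str.title()` is assumed to be an acceptable Title-Case.

```python
from typing import Any, Dict, Optional

def derive_category(manifest: Dict[str, Any], taxonomy_domain: Optional[str]) -> str:
    """Resolution: category_override > Title-Case(domain) > legacy category > ""."""
    override = manifest.get("category_override")
    if override:
        return override
    if taxonomy_domain:
        # Underscores become spaces, then each word is capitalized.
        return taxonomy_domain.replace("_", " ").title()
    return manifest.get("category") or ""
```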
def discover_manifests(self) -> List[PluginMeta]: # List of discovered plugin metadata
"""
Discover plugins via JSON manifests in search paths.
CR-8: reads each manifest via `load_manifest`, which transparently parses
both v2.0 nested + legacy v1.0 flat layouts into a typed `ManifestV2`.
`meta.manifest` is set to a flat-shaped dict view so existing consumers
(proxy, scheduling, execute path) continue working unchanged; the typed
`ManifestV2` is also attached as `meta.manifest_v2` so drift detection
+ future typed callers can read `drift_tracking.config_schema_hash`
without re-parsing.
"""
def get_discovered_by_category(
self,
category:str # Category to filter by (e.g., "transcription")
) -> List[PluginMeta]: # List of matching discovered plugins
"Get discovered plugins filtered by category."
def get_plugins_by_category(
self,
category:str # Category to filter by (e.g., "transcription")
) -> List[PluginMeta]: # List of matching loaded plugins
"Get loaded plugins filtered by category."
def get_discovered_categories(self) -> List[str]: # List of unique categories
"Get all unique categories among discovered plugins."
def get_loaded_categories(self) -> List[str]: # List of unique categories
"Get all unique categories among loaded plugins."
def get_plugin_meta(
self,
plugin_name:str # Name of the plugin
) -> Optional[PluginMeta]: # Plugin metadata or None
"Get metadata for a loaded plugin by name."
def get_discovered_meta(
self,
plugin_name:str # Name of the plugin
) -> Optional[PluginMeta]: # Plugin metadata or None
"Get metadata for a discovered (not necessarily loaded) plugin by name."
def _extract_defaults_from_schema(
self,
config_schema:Optional[Dict[str, Any]] # JSON Schema with properties
) -> Dict[str, Any]: # Default values extracted from schema
"Extract default values from a JSON Schema's properties."
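The sketch below extracts defaults from a schema. It assumes only top-level `properties` entries with an explicit `default` key contribute; nested object schemas are not walked. `extract_defaults` is a hypothetical name.

```python
from typing import Any, Dict, Optional

def extract_defaults(config_schema: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Collect each property's "default"; properties without one are skipped."""
    if not config_schema:
        return {}
    props = config_schema.get("properties", {})
    return {name: spec["default"] for name, spec in props.items()
            if isinstance(spec, dict) and "default" in spec}

schema = {
    "type": "object",
    "properties": {
        "model": {"type": "string", "default": "base"},
        "beam_size": {"type": "integer", "default": 5},
        "language": {"type": "string"},  # no default, so omitted
    },
}
```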
def _validate_config_against_schema(
"""
SG-5: validate a config dict against the manifest's `config_schema`
before forwarding to the plugin's `initialize()`.
"""
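The strict-mode behavior can be illustrated with an unknown-key check. This covers only the key-rejection part of SG-5. A full validator would also check types and ranges, for example via the `jsonschema` package. `check_config` and the use of `ValueError` are assumptions standing in for the substrate's own validation and error types.

```python
from typing import Any, Dict, List

def unknown_config_keys(config: Dict[str, Any], config_schema: Dict[str, Any]) -> List[str]:
    """Keys present in config but not declared under the schema's properties."""
    declared = set(config_schema.get("properties", {}))
    return sorted(k for k in config if k not in declared)

def check_config(config: Dict[str, Any], config_schema: Dict[str, Any],
                 strict: bool = True) -> None:
    """Reject undeclared keys when strict; otherwise let them through."""
    unknown = unknown_config_keys(config, config_schema)
    if unknown and strict:
        raise ValueError(f"unknown config keys: {unknown}")
```

Strict mode catches typos such as `modle` before they reach the plugin's `initialize()`, where they would otherwise be silently ignored.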
def _check_config_schema_drift(
self,
proxy:Any, # RemotePluginProxy with a live worker
plugin_meta:PluginMeta, # Metadata to flag if drift is detected
) -> None
"""
SG-9 + CR-8: compare live worker `/config_schema` to the stored hash.
Reads the stored hash from `plugin_meta.manifest_v2.drift_tracking.config_schema_hash`
(populated by `discover_manifests`). Computes the live hash with
`compute_config_schema_hash` and compares — drift = hashes differ.
Honors `cfg.substrate.drift_detection` opt-out from `cjm.yaml`: hosts
that don't want the per-load `/config_schema` HTTP call can disable
detection there. Default is on.
Test fixtures that stub `meta.manifest = {}` without going through
`discover_manifests` won't have a `manifest_v2` attribute; the
`getattr(..., None)` fallback yields `stored_hash=None`, which doesn't
match any live hash — those tests don't expose a real proxy so the
drift warning fires harmlessly.
"""
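Drift detection only needs a stable hash of the schema. This sketch assumes SHA-256 over a canonical JSON rendering (sorted keys, compact separators), so reordering keys does not register as drift. The real `compute_config_schema_hash` may canonicalize or hash differently. `schema_hash` and `has_drifted` are hypothetical names.

```python
import hashlib
import json
from typing import Any, Dict, Optional

def schema_hash(config_schema: Dict[str, Any]) -> str:
    """Hash a canonical JSON rendering so key order does not affect the result."""
    canonical = json.dumps(config_schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def has_drifted(stored_hash: Optional[str], live_schema: Dict[str, Any]) -> bool:
    # A missing stored hash never equals a live hash, so it reports drift.
    return stored_hash != schema_hash(live_schema)
```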
def _persist_config(
self,
plugin_name: str # Plugin to persist
) -> None
"""
CR-2: write current PluginMeta state + live worker config to the store.
Reads `meta.enabled` (the substrate-authoritative flag) and the worker's
current_config (when reachable). Failures are logged + swallowed —
persistence is a best-effort side-channel, not a correctness invariant.
"""
def _maybe_fire_disable_hook(
self,
name_or_id: str # instance_id (or legacy plugin_name) whose in-flight job just finished
) -> None
"""
CR-2 + CR-10: fire deferred on_disable for `name_or_id` if pending.
Idempotent. Resolves via self.instances first; falls back to
self.plugins[name].instance for legacy code paths.
"""
def _validate_instance_id(self, instance_id: str) -> None
"""
Reject malformed explicit instance_ids at load time.
Pattern: alphanumeric + underscore + hyphen, length 1..64. Raises
ValueError on invalid input so the caller sees the constraint failure
immediately rather than at first execute / unload.
"""
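The documented `[A-Za-z0-9_-]{1,64}` constraint maps directly onto a regular expression. Using `re.fullmatch` matters here. An anchored `re.match` with a `^...$` pattern would accept a trailing newline, because `$` also matches just before a final `\n`. `validate_instance_id` is a hypothetical standalone version of the method.

```python
import re

_INSTANCE_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")

def validate_instance_id(instance_id: object) -> None:
    """Raise ValueError unless instance_id is 1..64 chars of [A-Za-z0-9_-]."""
    if not isinstance(instance_id, str) or not _INSTANCE_ID_RE.fullmatch(instance_id):
        raise ValueError(f"invalid instance_id: {instance_id!r}")
```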
def _generate_instance_id(self, plugin_name: str) -> str
"""
Generate a unique instance_id of form `{plugin_name}-{6-char-hex}`.
Used when load_plugin is called with new_instance=True and no explicit
instance_id. Retries up to 16 times if a collision occurs in self.instances.
"""
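`secrets.token_hex(3)` returns 6 lowercase hex characters (3 random bytes), which fits the documented `{plugin_name}-{6-char-hex}` form. The text above does not state what happens after 16 collisions. Raising `RuntimeError` here is an assumption, and `generate_instance_id` is a hypothetical standalone version.

```python
import secrets
from typing import Container

def generate_instance_id(plugin_name: str, existing: Container[str], attempts: int = 16) -> str:
    """Return f"{plugin_name}-{6 hex chars}" that is not already in `existing`."""
    for _ in range(attempts):
        candidate = f"{plugin_name}-{secrets.token_hex(3)}"  # 3 bytes -> 6 hex chars
        if candidate not in existing:
            return candidate
    # Assumed failure mode once every attempt collided.
    raise RuntimeError(f"no free instance_id for {plugin_name!r} after {attempts} tries")
```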
def get_instance(
self,
name_or_id: str # Plugin name (default-loaded) or explicit instance_id
) -> Optional[PluginInstance]
"""
Return the PluginInstance for `name_or_id`, or None if not loaded.
Lookup is keyed by instance_id (which equals plugin_name for default-
loaded plugins). Multi-instance IDs only exist in self.instances.
"""
def list_instances(
"List all loaded instances, optionally filtered by underlying plugin name."
def _worker_env_specs(
self,
plugin_meta: PluginMeta # Plugin whose WORKER_ENV contract to read
) -> List[Dict[str, Any]]: # List of EnvVarSpec-as-dict entries (possibly empty)
"""
Return a plugin's WORKER_ENV contract as spec dicts (CR-12).
Prefers the typed manifest_v2 code section; falls back to the flat manifest
dict view. Empty list when the plugin declares no worker-env contract.
"""
def _resolve_worker_env(
self,
plugin_meta: PluginMeta, # Plugin being loaded
scope: Optional[str] = None # SG-55 forward seam: per-principal scope (None = single-user)
) -> Dict[str, str]: # {ENV_NAME: value} overlay injected into the worker at spawn
"""
CR-12: compose the resolved worker-env overlay for a load.
Secrets resolve from the SecretStore keyed by plugin_name — so every
instance of a plugin shares one credential (CR-10: two Gemini instances,
one GEMINI_API_KEY). A missing secret is OMITTED (the worker spawns without
it; the plugin reports the gap at execute) rather than injected empty.
Visible vars resolve from their declared `default` (the operator-override
store is a deferred source; the manifest's static `install.env_vars` already
injects fixed visible defaults via the proxy, so this only adds WORKER_ENV
defaults not already covered there). All values are fixed at spawn — a
change requires `reload_plugin`.
"""
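The sketch below composes the overlay under these assumptions:

- Contract entries are dicts with `name`, `secret`, and optional `default` keys.
- The secret store is a mapping keyed by `(plugin_name, key)`.
- The operator-override source is ignored, because it is deferred.

`resolve_worker_env` is a hypothetical name. Keying by plugin name is what lets every instance share one credential.

```python
from typing import Any, Dict, List, Mapping, Tuple

def resolve_worker_env(
    plugin_name: str,
    specs: List[Dict[str, Any]],                  # WORKER_ENV entries (hypothetical shape)
    secret_store: Mapping[Tuple[str, str], str],  # (plugin_name, key) -> value
    static_env: Mapping[str, str],                # install.env_vars, injected by the proxy
) -> Dict[str, str]:
    overlay: Dict[str, str] = {}
    for spec in specs:
        name = spec["name"]
        if spec.get("secret"):
            value = secret_store.get((plugin_name, name))
            if value is not None:          # missing secret: omit, never inject ""
                overlay[name] = value
        elif name not in static_env and spec.get("default") is not None:
            overlay[name] = str(spec["default"])
    return overlay
```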
def get_worker_env_status(
self,
name_or_meta: Any, # Plugin name (loaded/discovered) or a PluginMeta
scope: Optional[str] = None # SG-55 forward seam
) -> List[Dict[str, Any]]: # Per-entry status dicts (secret values never returned)
"""
CR-12: per-entry satisfaction status of a plugin's worker-env contract.
Each entry: {name, secret, required, satisfied, label, description}.
`satisfied` means a value is resolvable (secret present in the store, or a
visible var has a default/override). Secret VALUES are never returned — only
whether one is set. The plugin-config UI uses this to gate config display on
required secrets being satisfied.
"""
def missing_required_env(
self,
name_or_meta: Any, # Plugin name or PluginMeta
scope: Optional[str] = None # SG-55 forward seam
) -> List[str]: # Names of required worker-env entries with no resolvable value
"CR-12: names of required worker-env entries that are unsatisfied."
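Status and missing-required reporting can be derived from the same contract. In this sketch, the status function receives only the names of secrets that are set, so secret values never reach it. The entry shape follows the fields listed above. `worker_env_status` and `missing_required` are hypothetical names.

```python
from typing import Any, Dict, List, Set

def worker_env_status(specs: List[Dict[str, Any]], secrets_set: Set[str]) -> List[Dict[str, Any]]:
    """Per-entry status; reports whether a secret is set, never its value."""
    status = []
    for spec in specs:
        if spec.get("secret"):
            satisfied = spec["name"] in secrets_set
        else:
            satisfied = spec.get("default") is not None  # overrides omitted (deferred source)
        status.append({
            "name": spec["name"],
            "secret": bool(spec.get("secret")),
            "required": bool(spec.get("required")),
            "satisfied": satisfied,
            "label": spec.get("label", spec["name"]),
            "description": spec.get("description", ""),
        })
    return status

def missing_required(status: List[Dict[str, Any]]) -> List[str]:
    """Names of required entries with no resolvable value."""
    return [s["name"] for s in status if s["required"] and not s["satisfied"]]
```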
def set_plugin_secret(
self,
name_or_id: str, # Plugin name or instance_id whose secret to set
key: str, # Secret key (the env-var name, e.g. "GEMINI_API_KEY")
value: str, # Secret value (stored via the SecretStore, never config/logs)
*,
scope: Optional[str] = None, # SG-55 forward seam: per-principal scope
reload: bool = True # Respawn loaded worker(s) so the new env is injected
) -> bool: # True if the secret was stored
"""
CR-12: store a plugin secret, then respawn its worker(s) to inject it.
Secrets are keyed by the underlying PLUGIN name (not instance_id), so all
instances of a plugin share one credential — set the Gemini key once and
every Gemini instance gets it at (re)spawn. Because worker env is fixed at
spawn, the new value only reaches a *running* worker via a RESPAWN, so this
reloads each loaded instance of the plugin (unless `reload=False`, e.g. when
provisioning a secret before the plugin is loaded). This is the
actuation seam both the CLI (`cjm-ctl set-secret`) and a future config UI
call. Reload failures are logged, not raised.
"""
def load_plugin(
self,
plugin_meta:PluginMeta, # Plugin metadata (with manifest attached)
config:Optional[Dict[str, Any]]=None, # Initial configuration
strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
instance_id:Optional[str]=None, # CR-10: explicit instance_id; None defaults to plugin_name
new_instance:bool=False, # CR-10: auto-generate `{name}-{hex}` instance_id (with instance_id=None)
max_concurrent_requests:Optional[int]=None # SG-33 (CR-7): per-instance async concurrency cap; None = unbounded
) -> bool: # True if successfully loaded
"""
Load a plugin by spawning a Worker subprocess.
CR-2: reads the persisted PluginConfigRecord from `self.config_store`
before launching the worker. If a persisted record exists and the
caller didn't pass an explicit config, the persisted config is used
as the effective input. The persisted `enabled` flag is applied to
`plugin_meta.enabled` so disabled plugins stay disabled across
process restarts.
CR-10: optional `instance_id` allows multi-instance loading.
- instance_id=None, new_instance=False (default): instance_id =
plugin_meta.name. Populates self.plugins[plugin_name] + self.instances
[plugin_name] together (single-instance backward compat).
- instance_id="custom": validated against `[A-Za-z0-9_-]{1,64}`. Populates
self.instances[custom]. Persistence is keyed by plugin_name and only
applied to the default instance.
- instance_id=None, new_instance=True: auto-generates `{name}-{6-hex}`.
Idempotent: re-load against an existing instance_id returns True without
re-spawning.
CR-7: computes `config_hash` from the effective config (post-defaults +
post-validation) and stores it on the PluginInstance so execute_plugin*
can key empirical samples by (instance_id, config_hash). SG-33 stores
`max_concurrent_requests` on the instance — the actual asyncio.Semaphore
is lazy-created in execute_plugin_async via `_get_concurrent_limiter`.
"""
def load_all(
self,
configs:Optional[Dict[str, Dict[str, Any]]]=None # Plugin name -> config mapping
) -> Dict[str, bool]: # Plugin name -> success mapping
"Discover and load all available plugins."
def unload_plugin(
self,
name_or_id:str # Plugin name (default-loaded) or instance_id (multi-instance)
) -> bool: # True if successfully unloaded
"""
Unload a plugin instance and terminate its Worker process (CR-10).
If name_or_id resolves to the default instance (instance_id == plugin_name)
and no other instances remain for the same plugin, also removes the
PluginMeta from self.plugins. Otherwise removes only the instance and
clears PluginMeta.instance if it pointed at the unloaded canonical.
"""
def unload_all(self) -> None
"""
Unload all plugin instances and terminate all Worker processes (CR-10).
Iterates self.instances (CR-10 keying) rather than self.plugins so all
multi-instance entries get torn down, not just the canonical instances.
"""
def get_plugin(
self,
name_or_id:str # Plugin name (default-loaded) or instance_id (multi-instance)
) -> Optional[PluginInterface]: # Plugin proxy instance or None
"""
Get a loaded plugin's proxy by name or instance_id (CR-10).
Lookup order: self.instances first (covers both default plugin_name and
multi-instance IDs), falling back to PluginMeta.instance for any
legacy code path that populated self.plugins without self.instances
(defensive — shouldn't happen post-CR-10 since load_plugin always
records the instance).
"""
def list_plugins(self) -> List[PluginMeta]: # List of loaded plugin metadata
"List all loaded plugins."
def _record_sample_safe(self, inst:PluginInstance, start_time:float, success:bool) -> None
"""
CR-7: best-effort empirical sample recording.
Captures worker stats at end-of-execute (proxy of peak), builds a
ResourceSample, and records it via the EmpiricalResourceStore. Failures
log + swallow — sample recording must never break the execute path
(matches CR-2's `_persist_config` best-effort discipline).
Stats fetch can fail naturally (e.g. worker died with WorkerOOMError —
the proxy is unreachable). The sample still records with zero stats +
success=False so we have a record of the failed attempt for the
success_rate aggregate.
"""
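The sketch below follows the best-effort discipline. Both the stats fetch and the store write are guarded. A failed fetch still records a sample, with empty stats, and nothing propagates to the caller. `SampleSketch`, `record_sample_safe`, and the list used as a store are hypothetical stand-ins for `ResourceSample` and `EmpiricalResourceStore`.

```python
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

log = logging.getLogger(__name__)

@dataclass
class SampleSketch:
    """Hypothetical stand-in for a ResourceSample."""
    instance_id: str
    duration_s: float
    success: bool
    stats: Dict[str, Any]

def record_sample_safe(
    instance_id: str,
    start_time: float,                        # time.monotonic() captured before execute
    success: bool,
    fetch_stats: Callable[[], Dict[str, Any]],
    store: List[SampleSketch],                # hypothetical stand-in for the empirical store
) -> None:
    try:
        try:
            stats = fetch_stats()             # can fail if the worker died (e.g. OOM)
        except Exception:
            stats = {}                        # still record the attempt, with empty stats
        duration = time.monotonic() - start_time
        store.append(SampleSketch(instance_id, duration, success, stats))
    except Exception:
        # Best effort: recording must never break the execute path.
        log.warning("empirical sample recording failed", exc_info=True)
```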
def _get_concurrent_limiter(self, instance_id:str) -> Optional[asyncio.Semaphore]
"""
SG-33 (CR-7): lazy-create the per-instance asyncio.Semaphore.
Returns None when the instance has no `max_concurrent_requests` set (the
default — unbounded). Otherwise creates the semaphore on first call and
caches it in `self._concurrent_limiters`. Semaphores are bound to the
event loop they were created in; lazy creation inside `execute_plugin_async`
ensures we're inside the right loop at construction time (Python 3.10+
semaphore-loop-binding rules).
Defensive: returns None if the manager was constructed via __new__ without
`_concurrent_limiters` being populated (test-fixture pattern).
"""
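The sketch below shows lazy, per-instance semaphore creation and its effect on concurrency. Each `asyncio.Semaphore` is created inside the coroutine that first needs it, which keeps construction within the running loop. `LimiterCache` and the `demo` harness are hypothetical.

```python
import asyncio
from typing import Dict, List, Optional

class LimiterCache:
    """Hypothetical per-instance semaphore cache, created lazily on first use."""

    def __init__(self, caps: Dict[str, Optional[int]]) -> None:
        self._caps = caps                                   # instance_id -> max_concurrent_requests
        self._limiters: Dict[str, asyncio.Semaphore] = {}

    def get(self, instance_id: str) -> Optional[asyncio.Semaphore]:
        cap = self._caps.get(instance_id)
        if cap is None:
            return None                                     # unbounded
        if instance_id not in self._limiters:
            self._limiters[instance_id] = asyncio.Semaphore(cap)  # built inside the running loop
        return self._limiters[instance_id]

async def _work(active: List[int], peak: List[int]) -> None:
    active[0] += 1
    peak[0] = max(peak[0], active[0])
    await asyncio.sleep(0.01)                               # simulated request
    active[0] -= 1

async def run_job(cache: LimiterCache, instance_id: str,
                  active: List[int], peak: List[int]) -> None:
    limiter = cache.get(instance_id)
    if limiter is None:
        await _work(active, peak)
    else:
        async with limiter:
            await _work(active, peak)

async def demo(instance_id: str, caps: Dict[str, Optional[int]], jobs: int = 6) -> int:
    """Run `jobs` concurrent requests and return the peak number in flight."""
    cache = LimiterCache(caps)
    active, peak = [0], [0]
    await asyncio.gather(*(run_job(cache, instance_id, active, peak) for _ in range(jobs)))
    return peak[0]
```

Each `asyncio.run(...)` call creates a fresh loop and a fresh cache, so a semaphore is never shared across loops.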
def _reactive_evict_for(
self,
needed_meta:PluginMeta,
shortfall:Optional[Any]=None, # Optional ResourceShortfall from Track B; informational only
) -> bool
"""
CR-7: try to free resources after a PluginResourceError during execute.
Wraps `_evict_for_resources` with reactive-flow logging. `_evict_for_resources`
itself extends to multi-axis + cost-aware candidate selection (drops the
GPU-only filter, prefers evicting empirically-expensive idle plugins).
`shortfall` is recorded for log context but doesn't currently steer
candidate selection beyond what _evict_for_resources already does via
its `needed_meta.resources` check. A future enhancement could pass it
through for axis-specific candidate filtering.
"""
def _evict_for_resources(self, needed_meta:PluginMeta) -> bool
"""
Attempt to free resources by unloading/releasing idle plugins (LRU).
CR-7: extended from GPU-only LRU to multi-axis cost-aware eviction.
- Candidate set: any loaded plugin that isn't the one we're allocating
for (drops the pre-CR-7 `requires_gpu` filter).
- Sort key: primary = idle (older last_executed first, classic LRU);
secondary = empirical cost when available (highest peak gets evicted
first among equally-idle candidates). Cost axis follows the needed
plugin's `resources.requires_gpu` flag — GPU peak when we're freeing
for a GPU plugin, system memory peak otherwise.
Without empirical data (no store / unmeasured plugin), the secondary
key is 0.0 and pure LRU applies. Cost-aware selection is opt-in via
`empirical_tracking: true`.
"""
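The two-level sort key can be expressed as a tuple: oldest `last_executed` first, then the highest empirical peak on the relevant axis. Unmeasured plugins score 0.0, which reduces to pure LRU. `Candidate` and `eviction_order` are hypothetical. The sketch also assumes ties on `last_executed` are exact; the real implementation may group idle times differently.

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Candidate:
    """Hypothetical view of a loaded plugin considered for eviction."""
    instance_id: str
    last_executed: float          # smaller = idle longer
    gpu_peak_mb: Optional[float]  # empirical peaks; None when unmeasured
    mem_peak_mb: Optional[float]

def eviction_order(cands: List[Candidate], needed_id: str, needs_gpu: bool) -> List[str]:
    def cost(c: Candidate) -> float:
        peak = c.gpu_peak_mb if needs_gpu else c.mem_peak_mb
        return peak or 0.0        # unmeasured -> 0.0, so pure LRU applies
    pool = [c for c in cands if c.instance_id != needed_id]
    # Oldest last_executed first; among ties, highest empirical peak first.
    pool.sort(key=lambda c: (c.last_executed, -cost(c)))
    return [c.instance_id for c in pool]
```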
def execute_plugin(
self,
name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
*args,
**kwargs
) -> Any: # Plugin result
"""
Execute a plugin instance's main functionality (sync).
CR-10: resolves `name_or_id` via self.instances; per-instance enabled
flag gates execution. `_running_executions` tracks by instance_id so
concurrent multi-instance executes don't collide.
CR-2: raises PluginDisabledError (typed) when the instance is disabled.
CR-7: reactive retry on PluginResourceError — evicts other plugins to
free resources, then ALWAYS reloads the failing plugin's worker before
the retry attempt. Track A (WorkerOOMError — worker died from SIGKILL)
needs the reload because there's no live worker to retry on. Track B
(plugin-raised PluginResourceError — worker still alive) ALSO reloads
because PyTorch's CUDA caching allocator can fragment post-OOM in ways
the plugin can't clean up from within its own process; a fresh worker
is the only reliable reset. Bounded by `self.max_retries` (default 1).
Empirical sample recorded in the finally block — best-effort, doesn't
break execute on failure.
"""
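The bounded evict, reload, retry loop can be sketched with callables standing in for the manager's internals. `execute_with_retry` and the local `PluginResourceError` are hypothetical. Sample recording in the `finally` block is omitted.

```python
from typing import Any, Callable

class PluginResourceError(Exception):
    """Hypothetical stand-in for the substrate's resource error."""

def execute_with_retry(run: Callable[[], Any], evict: Callable[[], bool],
                       reload: Callable[[], None], max_retries: int = 1) -> Any:
    attempt = 0
    while True:
        try:
            return run()
        except PluginResourceError:
            if attempt >= max_retries:
                raise                 # retry budget exhausted
            attempt += 1
            evict()                   # free resources held by other plugins
            reload()                  # always respawn the failing worker before retrying
```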
async def execute_plugin_async(
self,
name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
*args,
**kwargs
) -> Any: # Plugin result
"""
Execute a plugin instance's main functionality (async).
CR-10 + CR-2: same semantics as execute_plugin, async-flavored. Scheduler
allocation goes through allocate_async for non-blocking polling.
CR-7 + SG-33: reactive retry on PluginResourceError — always reloads
before retry (Track A + Track B converge on the same reload path; see
sync variant docstring for the rationale). Per-instance asyncio.Semaphore
enforces the `max_concurrent_requests` cap (None = unbounded). Empirical
sample recorded in the finally block.
"""
def enable_plugin(
self,
name_or_id:str # Plugin name (default instance) or instance_id (multi-instance)
) -> bool: # True if instance was enabled
"""
Enable a plugin instance (CR-10 multi-instance aware).
CR-2: persists the new state via `config_store` (default-instance only;
persistence is per-plugin, not per-instance) and fires the plugin's
on_enable hook on state-change. Idempotent for already-enabled instances.
"""
def disable_plugin(
self,
name_or_id:str # Plugin name (default instance) or instance_id (multi-instance)
) -> bool: # True if instance was disabled
"""
Disable a plugin instance without unloading it (CR-10 multi-instance aware).
CR-2: persists the new state (default-instance only) and fires the
plugin's on_disable hook — but defers the hook until any in-flight job
for THIS instance finishes (the per-instance `_running_executions` key
is the instance_id, so a concurrent execute on a different instance of
the same plugin doesn't gate this instance's hook).
"""
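Deferred firing can be sketched with a per-instance in-flight counter. Disabling an instance that has a job running records a pending hook, which fires when that instance's last job finishes. Jobs on other instances of the same plugin do not gate it. `DisableGate` is hypothetical; in the manager, this role is split between `disable_plugin` and `_maybe_fire_disable_hook`.

```python
from collections import Counter
from typing import Callable, Dict, Set

class DisableGate:
    """Hypothetical sketch: defer on_disable until the instance's jobs finish."""

    def __init__(self, hooks: Dict[str, Callable[[], None]]) -> None:
        self.hooks = hooks                     # instance_id -> on_disable
        self.running: Counter = Counter()      # instance_id -> in-flight job count
        self.pending: Set[str] = set()         # disabled while busy; hook still owed

    def start(self, instance_id: str) -> None:
        self.running[instance_id] += 1

    def disable(self, instance_id: str) -> None:
        if self.running[instance_id] > 0:
            self.pending.add(instance_id)      # fire later
        else:
            self.hooks[instance_id]()          # idle: fire now

    def finish(self, instance_id: str) -> None:
        self.running[instance_id] -= 1
        if self.running[instance_id] <= 0:
            del self.running[instance_id]
            if instance_id in self.pending:    # fires at most once per disable
                self.pending.discard(instance_id)
                self.hooks[instance_id]()
```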
def get_plugin_logs(
self,
plugin_name:str, # Name of the plugin
lines:int=50 # Number of lines to return
) -> str: # Log content
"Read the last N lines of the plugin's log file."
def get_plugin_config(
self,
plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # Current configuration or None
"Get the current configuration of a plugin."
def get_plugin_config_schema(
self,
plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # JSON Schema or None
"Get the configuration JSON Schema for a plugin."
def get_config_options(
self,
name_or_id: str # Plugin name (default instance) or instance_id (multi-instance)
) -> Dict[str, Any]: # CR-11: live config option domains, or {} if unavailable
"""
Get a plugin instance's runtime config option providers (CR-11).
Forwards to the worker's get_config_options() - live enum domains +
per-option metadata for dynamic config fields (e.g. an API model list).
Kept separate from get_plugin_config_schema (static, hashed for CR-8 drift);
these options are the live companion the plugin-config UI merges on top.
Degrades to {} if the instance is missing or the worker call fails - the UI
then falls back to the static schema. Typed-error surfacing for the UI
consumer is deferred to the plugin-config UI library (Path C Step 4).
"""
def get_all_plugin_configs(self) -> Dict[str, Dict[str, Any]]: # Plugin name -> config mapping
"Get current configuration for all loaded plugins."
def update_plugin_config(
self,
name_or_id: str, # Plugin name (default instance) or instance_id (multi-instance)
config: Dict[str, Any], # New configuration values
strict: bool = True # SG-5: reject unknown keys against manifest config_schema (default)
) -> bool: # True if successful
"""
Update a plugin instance's configuration (CR-10 multi-instance aware).
CR-2: on successful reconfigure, persists the new config (default instance
only; multi-instance loads don't persist). Per-instance `inst.config` is
updated regardless.
SG-5: validates against the underlying plugin's config_schema (per-plugin,
not per-instance, so all instances share the same schema).
"""
def reload_plugin(
self,
name_or_id: str, # Plugin name (default instance) or instance_id (multi-instance)
config: Optional[Dict[str, Any]] = None # Optional new configuration
) -> bool: # True if successful
"Reload a plugin instance by terminating and restarting its Worker (CR-10)."
def get_plugin_stats(
self,
name_or_id: str # Plugin name (default instance) or instance_id (multi-instance)
) -> Optional[Dict[str, Any]]: # Resource telemetry or None
"Get resource usage stats for a plugin instance's Worker process (CR-10)."
async def execute_plugin_stream(
self,
name_or_id: str, # Plugin name (default instance) or instance_id (multi-instance)
*args,
**kwargs
) -> AsyncGenerator[Any, None]: # Async generator yielding results
"""
Execute a plugin instance with streaming response (CR-10 multi-instance aware).
Same per-instance resolution as execute_plugin_async; scheduler allocation
keys off the PluginMeta (plugin-level), execution + bookkeeping key off
the PluginInstance (per-instance).
"""
async def load_plugin_async(
self,
plugin_meta: PluginMeta,
config: Optional[Dict[str, Any]] = None,
strict: bool = True,
instance_id: Optional[str] = None,
new_instance: bool = False,
) -> bool:
"""
Async variant of `load_plugin` (CR-10b).
Runs the existing sync `load_plugin` via `asyncio.to_thread` so the
blocking proxy spawn + `_wait_for_ready` doesn't stall the event loop.
Backward compat: identical behavior to the sync method, just non-blocking.
"""
async def unload_plugin_async(
self,
name_or_id: str,
) -> bool:
"Async variant of `unload_plugin` (CR-10b)."
def _spec_requested_key(spec: PluginLoadSpec, index: int) -> str:
"""Derive the dict key the load_plugins_concurrent result uses for `spec`.
Resolution: explicit `instance_id` > `meta.name` + `#new[{index}]` suffix
for ambiguous new_instance=True specs > `meta.name`. The suffix prevents
key collision when multiple specs request a new instance of the same plugin
without explicit instance_ids.
"""
async def load_plugins_concurrent(
self,
specs: List[PluginLoadSpec], # Per-plugin load specifications
max_concurrency: Optional[int] = None, # Cap simultaneous loads; None = unbounded
fail_fast: bool = False, # Re-raise first exception (default: collect all results)
) -> Dict[str, Union[str, Exception]]: # requested_key → instance_id or Exception
"""
CR-10b: fan out plugin loads concurrently via asyncio.gather.
Each spec is loaded via `load_plugin_async` (`asyncio.to_thread` under the
hood). Total wall-clock time drops from the sum of all spawn times to roughly
the longest single spawn when `max_concurrency=None`. Capped concurrency uses
an asyncio.Semaphore.
Result keys come from `_spec_requested_key`: explicit `instance_id` if set,
`{plugin_name}#new[{index}]` for ambiguous new_instance specs, else
`plugin_name`. Successful entries map to the resolved instance_id (string).
With `fail_fast=False`, failures map to the raised exception; with
`fail_fast=True`, the first exception is re-raised instead.
"""
async def unload_plugins_concurrent(
self,
name_or_ids: List[str], # Plugin names or instance_ids to unload
max_concurrency: Optional[int] = None,
fail_fast: bool = False,
) -> Dict[str, Union[bool, Exception]]: # name_or_id → True or Exception
"""
CR-10b: fan out plugin unloads concurrently via asyncio.gather.
Same concurrency + fail_fast semantics as load_plugins_concurrent. Result
keys are the input `name_or_ids` (deduplication is the caller's
responsibility; duplicate inputs produce one dict entry per unique key).
"""
def bind(
self,
plugin_name: str, # Name of the plugin to pre-bind
default_config: Optional[Dict[str, Any]] = None # Default config used by binding.load()
) -> PluginBinding: # Bound view ready for instance-style use
"Create a PluginBinding pre-bound to this manager + plugin_name."
def get_by_role(
self,
role: str # Interface class name segment of the FQCN (e.g., "TranscriptionPlugin")
) -> List[PluginMeta]: # Discovered plugins matching the role
"CR-1: return discovered plugins implementing the given interface role."
def get_by_domain(
self,
domain: str # Domain segment of the taxonomy (e.g., "transcription")
) -> List[PluginMeta]: # Discovered plugins in the domain
"CR-1: return discovered plugins in the given domain."
def get_canonical(
self,
role: str # Interface class name to look up
) -> Optional[PluginMeta]: # The unique matching plugin or None
"""
CR-1: return the single canonical plugin for a role.
Returns None if zero or multiple plugins implement the role — useful for
substrate-internal use cases (e.g., the graph storage plugin) where the
expectation is exactly one implementation. Callers that want
multi-implementation handling use `get_by_role()` directly.
Multi-match is logged at WARNING level because it's a substrate-visible
configuration-time surprise — without the warning, the caller's None-handling
branch can't distinguish "no plugins installed for this role" from
"multiple plugins competing for an exactly-one role." Zero-match is silent
because absence-of-optional-plugin is a normal probe outcome.
"""
def get_compatible_for_current_platform(self) -> List[PluginMeta]: # Plugins compatible with current platform
"""Phase 5a: return discovered plugins compatible with the host platform.
Filters by `resources.platforms`. Plugins with an empty (or absent)
platforms list are considered universally compatible — that's the
introspection-time convention when a plugin author didn't declare a
platform constraint. Plugins lacking the entire `resources` block
(legacy / pre-Phase-5a manifests) also pass through as universal.
Does NOT filter on `requires_gpu` — substrate doesn't know whether a
GPU is present without invoking a system monitor plugin. Callers gate
on GPU availability separately if needed.
"""
Classes
class PluginManager:
"Manages plugin discovery, loading, and lifecycle via process isolation."
def __init__(
self,
plugin_interface:Type[PluginInterface]=PluginInterface, # Base interface for type checking
search_paths:Optional[List[Path]]=None, # Custom manifest search paths
scheduler:Optional[ResourceScheduler]=None, # Resource allocation policy
config_store:Optional[PluginConfigStore]=None, # CR-2: persistence backend; lazy LocalPluginConfigStore default per OQ-4
empirical_store:Optional[EmpiricalResourceStore]=None, # CR-7: resource-usage tracking backend; lazy LocalEmpiricalResourceStore when cfg.substrate.empirical_tracking is enabled
secret_store:Optional[SecretStore]=None, # CR-12: secret backend; lazy LocalSecretStore default (project-local <data_dir>/secrets)
max_retries:int=1 # CR-7: how many reactive retries to attempt on PluginResourceError (default 1: one retry after eviction)
):
"Initialize the plugin manager."
@dataclass
class PluginBinding:
"""
Pre-bound view of a single plugin through a shared PluginManager.
Eliminates the wrapper-class duplication audited across 8 consumer services
(SG-17). Methods forward to the manager with `plugin_name` pre-supplied;
`default_config` is the fallback used when `load()` is called without an
explicit config (matches the manifest-default behavior in `load_plugin`).
"""
manager: 'PluginManager' # The shared PluginManager
plugin_name: str # Name of the plugin this binding targets
default_config: Dict[str, Any] = _field(default_factory=dict) # Used when load() called without config
@property
def meta(self) -> Optional[PluginMeta]:
"""The PluginMeta if the plugin is loaded, else None."""
return self.manager.get_plugin_meta(self.plugin_name)
@property
def is_loaded(self) -> bool:
"""True if the plugin is loaded in the bound manager."""
return self.manager.get_plugin(self.plugin_name) is not None
@property
def is_enabled(self) -> bool:
"""True if the plugin is loaded AND not currently disabled."""
m = self.meta
return m is not None and m.enabled
# --- Lifecycle ---
def load(
self,
config: Optional[Dict[str, Any]] = None, # Override default_config when provided
strict: bool = True # SG-5 strict validation
) -> bool: # True if loaded successfully
"Load via the bound manager. Uses `default_config` if no `config` provided."
def unload(self) -> bool: # True if unloaded
"""Unload the bound plugin."""
return self.manager.unload_plugin(self.plugin_name)
def reload(
self,
config: Optional[Dict[str, Any]] = None # Optional new config; current config used if None
) -> bool:
"Reload the bound plugin (terminate + restart worker)."
def enable(self) -> bool:
"""Enable the bound plugin."""
return self.manager.enable_plugin(self.plugin_name)
def disable(self) -> bool:
"""Disable the bound plugin (worker stays alive; jobs rejected)."""
return self.manager.disable_plugin(self.plugin_name)
# --- Execution ---
def execute(self, *args, **kwargs) -> Any:
"""Execute via the bound manager (sync)."""
return self.manager.execute_plugin(self.plugin_name, *args, **kwargs)
async def execute_async(self, *args, **kwargs) -> Any:
"""Execute via the bound manager (async)."""
return await self.manager.execute_plugin_async(self.plugin_name, *args, **kwargs)
# --- Configuration ---
def update_config(
self,
config: Dict[str, Any], # New config values
strict: bool = True # SG-5 strict validation
) -> bool:
"Hot-reload the bound plugin's configuration."
def get_config(self) -> Optional[Dict[str, Any]]:
"""Current configuration values (None if not loaded)."""
return self.manager.get_plugin_config(self.plugin_name)
def get_config_schema(self) -> Optional[Dict[str, Any]]:
"""JSON Schema describing this plugin's configuration."""
return self.manager.get_plugin_config_schema(self.plugin_name)
def get_stats(self) -> Optional[Dict[str, Any]]:
"""Resource telemetry for the bound plugin's worker process."""
return self.manager.get_plugin_stats(self.plugin_name)
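The forwarding pattern can be illustrated with a cut-down binding wired to a fake manager. `FakeManager` and its method bodies are hypothetical: `load_plugin_by_name` in particular is invented for the sketch and does not match the real `load_plugin` signature. The `default_config` fallback follows the `load()` docstring, and the plugin name is made up.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

class FakeManager:
    """Hypothetical in-memory manager exposing a few forwarded calls."""
    def __init__(self) -> None:
        self.configs: Dict[str, Dict[str, Any]] = {}
    def load_plugin_by_name(self, name: str, config: Dict[str, Any]) -> bool:  # Hypothetical
        self.configs[name] = dict(config)
        return True
    def get_plugin_config(self, name: str) -> Optional[Dict[str, Any]]:
        return self.configs.get(name)
    def execute_plugin(self, name: str, *args, **kwargs) -> Any:
        return {"plugin": name, "args": args, "config": self.configs[name]}

@dataclass
class Binding:
    manager: FakeManager
    plugin_name: str
    default_config: Dict[str, Any] = field(default_factory=dict)

    def load(self, config: Optional[Dict[str, Any]] = None) -> bool:
        # Fall back to default_config only when no explicit config is given
        cfg = self.default_config if config is None else config
        return self.manager.load_plugin_by_name(self.plugin_name, cfg)
    def get_config(self) -> Optional[Dict[str, Any]]:
        return self.manager.get_plugin_config(self.plugin_name)
    def execute(self, *args, **kwargs) -> Any:
        return self.manager.execute_plugin(self.plugin_name, *args, **kwargs)

mgr = FakeManager()
asr = Binding(mgr, "whisper", default_config={"model": "base"})
asr.load()
print(asr.execute("clip.wav")["config"])  # {'model': 'base'}
```

Using `config is None`, rather than a truthiness test, means an explicit empty dict is respected instead of silently replaced by the defaults.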
class _CR10StubProxy:
"Stand-in proxy tracking execute calls + hook fires for verification."
def __init__(self, name="stub"):
self._name = name
self.execute_calls = []
self.on_disable_calls = 0
self.on_enable_calls = 0
@property
def name(self): return self._name
@property
def version(self): return "0.0.1"
def initialize(self, config): self._config = dict(config or {})
def execute(self, *args, **kwargs):
self.execute_calls.append((args, kwargs))
return {"who": self._name, "args": args, "kwargs": kwargs}
def get_config_schema(self): return {}
def get_current_config(self): return {}
def cleanup(self): pass
def on_disable(self): self.on_disable_calls += 1
def on_enable(self): self.on_enable_calls += 1
Manifest Format (v2.0) (manifest_format.ipynb)
Typed parser + writer for the nested v2.0 manifest layout per the 2026-05-19 substrate audit’s CR-8. Substrate manifests transitioned from a flat top-level JSON object to a four-section nested layout:
`install` (deployment-specific facts populated at install time), `code` (code-derived facts refreshed by `cjm-ctl regenerate-manifest`), `drift_tracking` (a config_schema hash that records the witness shape so live-vs-stored comparisons can detect drift), and `overrides` (an operator-supplied overlay placeholder).
Import
from cjm_plugin_system.core.manifest_format import (
CURRENT_FORMAT_VERSION,
InstallSection,
CodeSection,
DriftTracking,
ManifestV2,
compute_config_schema_hash,
load_manifest,
manifest_to_dict,
write_manifest
)
Functions
def compute_config_schema_hash(
schema: Optional[Dict[str, Any]], # JSON Schema or None
) -> str: # "sha256:hexdigest"
"""
Hash a JSON Schema with stable canonicalization.
None is treated as `{}` — the hash records "no schema declared" rather
than refusing. This way a plugin that lost its config_schema between
install and load still gets a drift warning rather than a crash.
"""
def _parse_taxonomy_dict(d: Optional[Dict[str, Any]]) -> Optional[PluginTaxonomy]:
"""Build a `PluginTaxonomy` from its JSON sub-dict, or None."""
def _parse_resources_dict(d: Optional[Dict[str, Any]]) -> Optional[ResourceRequirements]:
"""Build a `ResourceRequirements` from its JSON sub-dict, or None."""
def _from_v2_dict(
data: Dict[str, Any], # Parsed JSON dict with `format_version == "2.0"`
) -> ManifestV2:
"Parse a v2.0 nested manifest dict into a typed `ManifestV2`."
def _from_v1_flat_dict(
data: Dict[str, Any], # Legacy flat manifest dict (no `format_version`)
) -> ManifestV2:
"""
REMOVE-AFTER-OVERHAUL: legacy flat-manifest reader shim.
Maps top-level fields into the nested ManifestV2 shape so substrate code
paths only see v2.0 after this point. Returns a ManifestV2 with
`format_version == "1.0"` and `drift_tracking.config_schema_hash == None`
so callers can tell a legacy load apart from a fresh v2.0 write.
Retires after `cascade_manifests.py` rewrites every production manifest
to v2.0; SG-48 sweep removes this function.
"""
def load_manifest(
path: Union[str, Path], # Path to manifest JSON file on disk
) -> ManifestV2: # Parsed manifest in v2.0 typed shape
"""
Load a manifest file and return a typed `ManifestV2`.
Format detection by top-level `format_version` key:
- `"2.0"` → nested layout, parse directly.
- missing → legacy flat layout, pass through v1.0 shim.
- anything else → ValueError.
"""
def _taxonomy_to_dict(t: Optional[PluginTaxonomy]) -> Optional[Dict[str, str]]:
"""Serialize a `PluginTaxonomy` back to its JSON sub-dict, or None."""
def _resources_to_dict(r: Optional[ResourceRequirements]) -> Optional[Dict[str, Any]]:
"""Serialize a `ResourceRequirements` back to its JSON sub-dict, or None."""
def _code_section_to_dict(c: CodeSection) -> Dict[str, Any]:
"""Serialize a `CodeSection` to its JSON sub-dict, renaming `class_name` -> `class`."""
def manifest_to_dict(
m: ManifestV2, # Manifest to serialize
) -> Dict[str, Any]: # v2.0 nested dict ready for `json.dumps`
"""
Serialize a `ManifestV2` to a v2.0 dict.
Always emits `format_version == CURRENT_FORMAT_VERSION` — even if the
manifest was loaded from a legacy v1.0 file. This is the upgrade seam:
load-then-write transparently rewrites flat manifests as nested.
"""
def write_manifest(
path: Union[str, Path], # Output JSON file path
manifest: ManifestV2, # Manifest to serialize
) -> None:
"Serialize a `ManifestV2` to disk in v2.0 nested layout (indent=2)."
Classes
@dataclass
class InstallSection:
"""
Deployment-specific facts populated at install time.
These fields are written by `install_all` (paths, conda env, env vars)
plus `_generate_manifest`'s post-introspection step (installed_at,
installer_version, package_source). `regenerate-manifest` preserves
the install section across regeneration so paths survive code-side
refreshes.
"""
python_path: str = '' # Absolute path to the plugin env's python interpreter
conda_env: str = '' # Conda environment name
db_path: str = '' # Path to the plugin's SQLite data file (if any)
env_vars: Dict[str, str] = field(...) # Per-plugin env vars
installed_at: str = '' # ISO-8601 UTC timestamp of install/regen
installer_version: str = '' # "cjm-ctl <version>" that wrote this manifest
package_source: str = '' # Original install input (git URL or pip spec)
@dataclass
class CodeSection:
"""
Code-derived facts refreshed by `cjm-ctl regenerate-manifest`.
Everything in this section comes from running the introspection script
inside the plugin's conda env: metadata + interface + config_schema +
derived taxonomy + binary platform/hardware hard-facts. Drift detection
hashes this section's `config_schema` field as its witness shape.
`class_name` serializes as the JSON key `"class"` (Python reserved-word
workaround).
"""
name: str = '' # Plugin's unique identifier
version: str = '' # Plugin's version string
description: str = '' # Brief description (SG-6 required)
module: str = '' # Importable module path for the plugin class
class_name: str = '' # Plugin class name (JSON key: "class")
interface: str = '' # Fully qualified interface class name
taxonomy: Optional[PluginTaxonomy] # CR-1 domain/role/FQCN triple
resources: Optional[ResourceRequirements] # Phase 5a hard-facts
config_schema: Optional[Dict[str, Any]] # JSON Schema for plugin config
regenerated_at: Optional[str] # ISO-8601 UTC of last regen
worker_env: Optional[List[Dict[str, Any]]] # CR-12 spawn-env contract: asdict(EnvVarSpec) list
@dataclass
class DriftTracking:
"""
Witness hashes for drift detection.
`config_schema_hash` is computed at write time (regenerate-manifest /
install_all) from a canonical JSON encoding of the code section's
`config_schema`. The PluginManager's drift-check fetches the live
`/config_schema` from the worker, hashes it the same way, and compares;
a mismatch sets `PluginMeta.config_schema_drift = True` and logs a
warning.
"""
config_schema_hash: Optional[str] # "sha256:hexdigest" of canonical config_schema
@dataclass
class ManifestV2:
"""
Top-level v2.0 manifest with four named sections plus `format_version`.
Loaded from a v2.0 nested JSON file as-is, or from a v1.0 flat file via
`_from_v1_flat_dict` (REMOVE-AFTER-OVERHAUL shim). When the v1.0 shim
fires, `format_version` is set to `"1.0"` so substrate code can distinguish
legacy loads from fresh writes; otherwise `format_version` is always
`CURRENT_FORMAT_VERSION`.
"""
install: InstallSection = field(...)
code: CodeSection = field(...)
drift_tracking: DriftTracking = field(...)
overrides: Dict[str, Any] = field(...)
format_version: str = CURRENT_FORMAT_VERSION
Variables
CURRENT_FORMAT_VERSION = '2.0' # Emitted on every freshly-written manifest
Plugin Metadata (metadata.ipynb)
Data structures for plugin metadata
Import
from cjm_plugin_system.core.metadata import (
PluginTaxonomy,
ResourceRequirements,
PluginMeta,
PluginInstance,
PluginLoadSpec
)
Classes
@dataclass
class PluginTaxonomy:
"""
Derived classification of a plugin via its interface FQN.
Populated at install time by `_generate_manifest`'s introspection script
(running in the plugin's own conda env, where the interface library is
installed). Substrate stores strings only — no host-side imports of
interface libraries needed for taxonomy queries.
- `domain`: the substrate area, derived from the interface library's
naming convention `cjm-<domain>-plugin-system` (e.g., "transcription",
"graph", "media", "text").
- `role`: the interface class name segment of the FQN (e.g.,
"TranscriptionPlugin", "GraphPlugin", "ForcedAlignmentPlugin"). Multiple
plugins can share a role within a domain.
- `interface_fqcn`: the full dotted path, kept verbatim for queries and
reverse-lookup.
"""
domain: str # e.g., "transcription", "graph", "media"
role: str # e.g., "TranscriptionPlugin", "GraphPlugin"
interface_fqcn: str # Full dotted interface class path
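Here is a sketch of deriving the three fields from an interface FQCN. It assumes that the interface module's top-level package is the underscore form of the `cjm-<domain>-plugin-system` distribution name (for example `cjm_transcription_plugin_system`). The FQCNs below are illustrative, and the real introspection script may treat non-conforming names differently.

```python
import re
from dataclasses import dataclass
from typing import Optional

@dataclass
class Taxonomy:                  # Mirrors PluginTaxonomy's three string fields
    domain: str
    role: str
    interface_fqcn: str

# Assumed convention: top-level package cjm_<domain>_plugin_system
_PKG = re.compile(r"^cjm_(?P<domain>[a-z0-9_]+?)_plugin_system$")

def derive_taxonomy(fqcn: str) -> Optional[Taxonomy]:
    top_pkg, _, rest = fqcn.partition(".")
    role = fqcn.rsplit(".", 1)[-1]          # Last dotted segment is the interface class name
    m = _PKG.match(top_pkg)
    if m is None or not rest:
        return None                          # Does not follow the naming convention
    return Taxonomy(domain=m.group("domain"), role=role, interface_fqcn=fqcn)

print(derive_taxonomy("cjm_transcription_plugin_system.plugin_interface.TranscriptionPlugin"))
```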
@dataclass
class ResourceRequirements:
"""
Binary hard-facts about what a plugin needs to run (Phase 5a).
Quantitative resource amounts (min_vram_mb, etc.) deliberately omitted
per CR-7's reactive resource management reframing — plugin authors can't
reliably estimate model × dtype × quantization combinatorics, and Blender-
style variable-render plugins can't estimate at all. The substrate uses
these binary hard-facts purely for discovery filtering; actual resource
contention is handled reactively by CR-7's eviction + retry flow.
- `requires_gpu`: True if the plugin needs any GPU; the substrate gates
execution on a system monitor reporting one is present.
- `platforms`: e.g., ["linux-x64", "darwin-arm64"]. Empty list means no
platform constraint declared (assume universal compatibility).
- `accelerators`: e.g., ["cuda", "mps", "cpu"]. Informational; substrate
doesn't auto-select but consumers can filter on the values.
"""
requires_gpu: bool = False
platforms: List[str] = field(...)
accelerators: List[str] = field(...)
@dataclass
class PluginMeta:
"Metadata about a plugin."
name: str # Plugin's unique identifier
version: str # Plugin's version string
description: str = '' # Brief description of the plugin's functionality
category: str = '' # Display label — derived from taxonomy.domain (CR-1) or read from legacy manifests
interface: str = '' # Fully qualified interface class name (kept verbatim for SG-7 format check)
taxonomy: Optional['PluginTaxonomy']
resources: Optional['ResourceRequirements']
config_schema: Optional[Dict[str, Any]] # JSON Schema for plugin configuration
instance: Optional[Any] # Plugin instance (PluginInterface subclass)
enabled: bool = True # Whether the plugin is enabled
last_executed: float = 0.0 # Unix timestamp
config_schema_drift: bool = False
live_config_schema: Optional[Dict[str, Any]]
@dataclass
class PluginInstance:
"""
Per-instance runtime state for a loaded plugin (CR-10 multi-instance).
Differs from PluginMeta in scope:
- PluginMeta is per-plugin-name discovery + canonical-instance state.
- PluginInstance is per-load-call runtime state.
A plugin loaded with no instance_id (default) gets `instance_id == plugin_name`
and is the canonical instance referenced by PluginMeta.instance. Multi-instance
loads (instance_id != plugin_name) add entries to PluginManager.instances
without changing the canonical reference.
"""
instance_id: str # Unique key in PluginManager.instances; default = plugin_name
plugin_name: str # The underlying discovered plugin's name (PluginMeta.name)
config: Dict[str, Any] = field(...) # Effective config used at initialize()
proxy: Optional[Any]
enabled: bool = True # Per-instance enable flag; substrate's execute_plugin checks this
last_executed: float = 0.0 # Unix timestamp of the most recent execute on this instance
created_at: datetime = field(...)
config_hash: str = ''
max_concurrent_requests: Optional[int]
@dataclass
class PluginLoadSpec:
"""
One entry in `PluginManager.load_plugins_concurrent`'s batch input (CR-10).
Mirrors the positional arguments of `load_plugin` so the concurrent helper
can fan out load calls without repeating the per-spec instance_id /
new_instance plumbing.
- `meta`: the discovered PluginMeta to load (must have a `.manifest` attached).
- `config`: initial configuration; falls through to persisted-or-schema-defaults
when None (default-instance only; multi-instance starts fresh).
- `instance_id`: explicit instance_id (validated against [A-Za-z0-9_-]{1,64}).
None defaults to plugin_name (single-instance backward compat).
- `new_instance`: when True with instance_id=None, auto-generate
`{plugin_name}-{6-hex}`.
"""
meta: Any # PluginMeta — typed as Any to avoid forward-reference quirk under nbdev's late binding
config: Optional[Dict[str, Any]]
instance_id: Optional[str]
new_instance: bool = False
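The instance-id rules in `PluginLoadSpec` can be sketched as shown below. The `[A-Za-z0-9_-]{1,64}` pattern and the `{plugin_name}-{6-hex}` shape come from the docstring. `resolve_instance_id` is a hypothetical helper, and using `secrets.token_hex(3)` for the six hex characters is an assumption.

```python
import re
import secrets
from typing import Optional

_INSTANCE_ID = re.compile(r"[A-Za-z0-9_-]{1,64}")

def resolve_instance_id(plugin_name: str, instance_id: Optional[str] = None,
                        new_instance: bool = False) -> str:
    if instance_id is not None:
        if not _INSTANCE_ID.fullmatch(instance_id):
            raise ValueError(f"Invalid instance_id: {instance_id!r}")
        return instance_id
    if new_instance:
        return f"{plugin_name}-{secrets.token_hex(3)}"   # 3 random bytes -> 6 hex chars
    return plugin_name                                  # Default (canonical) instance

print(resolve_instance_id("whisper"))                       # whisper
print(resolve_instance_id("whisper", instance_id="gpu_0"))  # gpu_0
print(resolve_instance_id("whisper", new_instance=True))    # Random suffix, e.g. whisper-3fa2c1
```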
Platform Utilities (platform.ipynb)
Cross-platform utilities for process management, path handling, and system detection
Import
from cjm_plugin_system.core.platform import (
MICROMAMBA_URLS,
is_windows,
is_macos,
is_linux,
is_apple_silicon,
get_current_platform,
get_python_in_env,
get_popen_isolation_kwargs,
terminate_process,
terminate_self,
run_shell_command,
conda_env_exists,
get_micromamba_download_url,
download_micromamba,
get_conda_command,
build_conda_command,
get_micromamba_binary_path,
ensure_runtime_available
)
Functions
def is_windows() -> bool:
"""Check if running on Windows."""
return platform.system() == "Windows"
def is_macos() -> bool:
"""Check if running on macOS."""
return platform.system() == "Darwin"
def is_linux() -> bool:
"""Check if running on Linux."""
return platform.system() == "Linux"
def is_apple_silicon() -> bool:
"Check if running on Apple Silicon Mac (for MPS detection)."
def get_current_platform() -> str:
"""Get current platform string for manifest filtering.
Returns strings like 'linux-x64', 'darwin-arm64', 'win-x64'.
"""
def get_python_in_env(
env_path: Path # Path to conda environment root
) -> Path: # Path to Python executable
"""
Get the Python executable path for a conda environment.
On Windows: env_path/python.exe
On Unix: env_path/bin/python
"""
def get_popen_isolation_kwargs() -> Dict[str, Any]:
"""Return kwargs for process isolation in subprocess.Popen.
On Unix: Returns {'start_new_session': True}
On Windows: Returns {'creationflags': CREATE_NEW_PROCESS_GROUP}
Usage:
process = subprocess.Popen(cmd, **get_popen_isolation_kwargs(), ...)
"""
def terminate_process(
process: subprocess.Popen, # Process to terminate
timeout: float = 2.0 # Seconds to wait before force kill
) -> None:
"""
Terminate a subprocess gracefully, with fallback to force kill.
On all platforms:
1. Calls process.terminate() (SIGTERM on Unix, TerminateProcess on Windows)
2. Waits up to `timeout` seconds for the process to exit
3. If still running, calls process.kill() (SIGKILL on Unix, TerminateProcess on Windows)
"""
def terminate_self() -> None:
"""Terminate the current process (for worker suicide pact).
On Unix: Sends SIGTERM to self for graceful shutdown
On Windows: Calls os._exit() since Windows lacks SIGTERM
"""
def run_shell_command(
cmd: str, # Shell command to execute
check: bool = True, # Whether to raise on non-zero exit
capture_output: bool = False, # Whether to capture stdout/stderr
**kwargs # Additional kwargs passed to subprocess.run
) -> subprocess.CompletedProcess:
"""
Run a shell command cross-platform.
Unlike using shell=True with executable='/bin/bash', this function
uses the platform's default shell:
- Linux/macOS: /bin/sh (or $SHELL)
- Windows: cmd.exe
"""
def conda_env_exists(
env_name: str, # Name of the conda environment
conda_cmd: str = "conda" # Conda command (conda, mamba, micromamba)
) -> bool:
"""
Check if a conda environment exists (cross-platform).
Uses 'conda env list --json' instead of piping to grep,
which doesn't work on Windows.
"""
def get_micromamba_download_url(
platform_str: Optional[str] = None # Platform string (e.g., 'linux-x64'), uses current if None
) -> str: # Download URL for micromamba binary
"Get the micromamba download URL for the specified or current platform."
def download_micromamba(
dest_path: Path, # Destination path for the micromamba binary
platform_str: Optional[str] = None, # Platform string, uses current if None
show_progress: bool = True # Whether to print progress messages
) -> bool: # True if download succeeded
"Download and extract micromamba binary to the specified path."
def get_conda_command(
config: CJMConfig # Configuration object with runtime settings
) -> List[str]: # Base command with prefix args if needed
"Get the conda/mamba/micromamba base command with prefix args for local mode."
def build_conda_command(
config: CJMConfig, # Configuration object with runtime settings
*args: str # Additional command arguments
) -> List[str]: # Complete command ready for subprocess
"Build a complete conda/mamba/micromamba command."
def get_micromamba_binary_path(
config: CJMConfig # Configuration object with runtime settings
) -> Optional[Path]: # Path to micromamba binary or None
"Get the configured micromamba binary path for the current platform."
def ensure_runtime_available(
config: CJMConfig # Configuration object with runtime settings
) -> bool: # True if runtime is available
"Check if the configured conda/micromamba runtime is available."
Variables
MICROMAMBA_URLS: Dict[str, str]
Remote Plugin Proxy (proxy.ipynb)
Bridge between Host application and isolated Worker processes
Import
from cjm_plugin_system.core.proxy import (
RemotePluginProxy,
execute_async,
execute_stream_sync,
execute_stream,
execute_with_oom_check,
execute_async_with_oom_check,
get_stats,
is_alive,
cancel,
cancel_async,
get_progress,
get_progress_async,
on_disable,
on_enable,
get_system_status,
get_system_status_async,
list_processes,
list_processes_async,
prefetch,
prefetch_async,
reconfigure,
reconfigure_async
)
Functions
def _maybe_serialize_input(
self,
obj: Any # Object to potentially serialize
) -> Any: # Serialized form (path string or original object)
"Convert FileBackedDTO objects to file paths for zero-copy transfer."
def _prepare_payload(
self,
args: tuple, # Positional arguments
kwargs: dict # Keyword arguments
) -> Dict[str, Any]: # JSON-serializable payload
"Prepare arguments for HTTP transmission."
async def execute_async(
self,
*args,
**kwargs
) -> Any: # Plugin result
"""
Execute the plugin asynchronously.
CR-4: HTTP 409 from the worker is mapped to a typed `PluginCancelledError`.
Same 409/200/other semantics as the sync `execute()` variant.
"""
def _raise_from_job_error_chunk(
"""
SG-52: convert a `_job_error` JobError-shaped dict into the right typed exception.
Mapping rules:
- `original_exc_repr` starts with `"PluginCancelledError"` → raise PluginCancelledError
(preserves the non-retriable semantic that category alone doesn't capture).
- category == "user_input" → PluginInputError (with fields_invalid).
- category == "transient" → PluginTransientError (with retry_after_seconds).
- category == "resource" → PluginResourceError (with reconstructed ResourceShortfall).
- category == "fatal" → PluginFatalError.
- Unknown category → RuntimeError carrying the chunk for forensic inspection.
This is the streaming-side counterpart to /execute's 409 → PluginCancelledError
detection. Same intent: the typed exception survives the HTTP wire boundary
so substrate / JobQueue / consumer code can branch on category without
parsing string messages.
"""
def execute_stream_sync(self, *args, **kwargs) -> Generator[Any, None, None]
"Synchronous wrapper for streaming (blocking)."
async def execute_stream(
self,
*args,
**kwargs
) -> AsyncGenerator[Any, None]: # Yields parsed JSON chunks
"""
Execute with streaming response (async generator).
SG-52: detects the terminal `{"_job_error": <JobError dict>}` chunk and
raises the corresponding typed exception client-side instead of yielding
it to downstream consumers. Mirrors /execute's HTTP 409 → typed-exception
behavior at the streaming wire boundary. Normal plugin output chunks
pass through unchanged (they never carry the `_job_error` key).
"""
def _check_worker_death(self) -> None:
"""CR-7 Track A: inspect subprocess state after an httpx fault on /execute.
Raises `WorkerOOMError` if the worker died with SIGKILL (POSIX returncode -9 —
kernel OOM-killer is the dominant cause). Raises `PluginTransientError` if
it died with any other returncode. Returns None silently when the worker
is still alive (caller re-raises the original httpx error).
`signal.SIGKILL` doesn't exist on Windows, where worker-OOM looks different
(STATUS_NO_MEMORY 0xC0000017 = -1073741799). Cross-platform OOM detection is
a future enhancement when there's a concrete Windows substrate consumer.
"""
def execute_with_oom_check(self, *args, **kwargs) -> Any:
"""CR-7 Track A wrapper around the sync execute path.
Catches httpx connection / protocol faults, calls `_check_worker_death`,
and either raises a typed `WorkerOOMError` / `PluginTransientError` or
re-raises the original httpx error (worker still alive — caller treats
as a generic transient network issue).
"""
async def execute_async_with_oom_check(self, *args, **kwargs) -> Any:
"""CR-7 Track A wrapper around the async execute path. Same semantics."""
def get_stats(self) -> Dict[str, Any]: # Process telemetry
"""Get worker process resource usage."""
def is_alive(self) -> bool: # True if worker is responsive
"""Check if the worker process is still running and responsive."""
def cancel(self) -> bool: # True if cancel request was sent
"""Request cancellation of running execution."""
async def cancel_async(self) -> bool: # True if cancel request was sent
"""Request cancellation asynchronously."""
def get_progress(self) -> Dict[str, Any]: # {progress: float, message: str}
"""Get current execution progress from worker."""
async def get_progress_async(self) -> Dict[str, Any]: # {progress: float, message: str}
"""Get current execution progress asynchronously."""
def on_disable(self) -> bool: # True if hook signal accepted by worker
"""CR-2: forward the substrate's on_disable signal to the worker process.
Plugin can opt in via PluginInterface.on_disable(); default implementation
is a no-op so silent-pass-through is the norm. Failures to reach the
worker (already terminated, network blip) are logged-and-swallowed —
the substrate-side enable/disable bookkeeping doesn't depend on the
hook actually firing.
"""
def on_enable(self) -> bool: # True if hook signal accepted by worker
"""CR-2: forward the substrate's on_enable signal to the worker process.
Same delivery semantics as on_disable: best-effort, errors logged-and-
swallowed. The plugin's hook (default no-op) decides whether to eagerly
re-acquire resources or rely on lazy re-load at next execute().
"""
def get_system_status(self) -> Optional[Dict[str, Any]]: # SystemStats dict, or None on transport / config failure
"""CR-3: typed MonitorPlugin accessor. POSTs to worker's `/get_system_status`.
Status code semantics (worker side raises HTTPException with these codes):
- 200: SystemStats dict returned
- 404: plugin is not a MonitorPlugin — logged at ERROR (configuration error;
no amount of retry fixes it) and returns None. Loudly distinguished
from the substrate's WARN-level transient-failure degradation.
- 501: legacy monitor predating CR-3; REMOVE-AFTER-OVERHAUL fallback to
`/execute("get_system_status")` returns a dict in the pre-CR-3 wire format
- 500: real plugin failure; propagates as HTTPStatusError
- ConnectError: worker may have died; returns None silently (substrate
degrades to empty stats)
"""
async def get_system_status_async(self) -> Optional[Dict[str, Any]]: # SystemStats dict, or None on transport / config failure
"""Async variant of `get_system_status`. Same 200/404/501/500/ConnectError semantics."""
def list_processes(self) -> Optional[List[Dict[str, Any]]]: # ProcessStats dict list, or None on transport / config failure
"""CR-3: typed MonitorPlugin accessor. POSTs to worker's `/list_processes`.
Same 200/404/501/500/ConnectError semantics as `get_system_status`. Note that
`MonitorPlugin.list_processes()` defaults to returning `[]`, so monitors without
per-process visibility yield a 200 with an empty list rather than 501.
"""
async def list_processes_async(self) -> Optional[List[Dict[str, Any]]]: # ProcessStats dict list, or None on transport / config failure
"""Async variant of `list_processes`. Same semantics."""
def prefetch(self) -> bool: # True if worker accepted the prefetch hook
"""
CR-4: forward the substrate's prefetch signal to the worker process.
Plugin can opt in via PluginInterface.prefetch() to eagerly download
models / warm caches without invoking execute(). Default implementation
is a no-op so silent-pass-through is normal. Errors raised by the
plugin (worker 500) propagate as RuntimeError so callers can distinguish
"plugin can't acquire" from "worker unreachable" (False).
"""
async def prefetch_async(self) -> bool: # True if worker accepted the prefetch hook
"""Async variant of `prefetch`. Same semantics."""
def reconfigure(
self,
old_config: Optional[Dict[str, Any]], # Previous config snapshot
new_config: Optional[Dict[str, Any]], # Config being applied
) -> bool: # True if worker accepted the reconfigure call
"""
CR-4: forward a reconfigure(old, new) call to the worker process.
The plugin's default reconfigure() body delegates to
reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata on the
plugin's config_class to fire `_release_<trigger>` methods for fields
whose values changed. Plugins not opting into the declarative pattern
land in a silent no-op; the substrate's PluginManager.update_plugin_config
then falls back to initialize(new_config) for the actual state change.
"""
async def reconfigure_async(
self,
old_config: Optional[Dict[str, Any]], # Previous config snapshot
new_config: Optional[Dict[str, Any]], # Config being applied
) -> bool: # True if worker accepted the reconfigure call
"Async variant of `reconfigure`. Same semantics."
def __enter__(self)
"Enter context manager."
def __exit__(self, exc_type, exc_val, exc_tb)
"Exit context manager, call `cleanup()`, and return False so exceptions propagate."
async def __aenter__(self)
"Enter async context manager."
async def __aexit__(self, exc_type, exc_val, exc_tb)
"Exit async context manager and cleanup."
Classes
class RemotePluginProxy:
def __init__(
self,
manifest:Dict[str, Any], # Plugin manifest with python_path, module, class, etc.
extra_env:Optional[Dict[str, str]]=None # CR-12: resolved worker-env overlay (secrets + visible overrides) injected at spawn
)
"Proxy that forwards plugin calls to an isolated Worker subprocess."
def __init__(
self,
manifest:Dict[str, Any], # Plugin manifest with python_path, module, class, etc.
extra_env:Optional[Dict[str, str]]=None # CR-12: resolved worker-env overlay (secrets + visible overrides) injected at spawn
)
"Initialize proxy and start the worker process."
def name(self) -> str: # Plugin name from manifest
"Plugin name (falls back to 'unknown' when the manifest has no `name`)."
def version(self) -> str: # Plugin version from manifest
"Plugin version (falls back to '0.0.0' when the manifest has no `version`)."
def _bind_listen_socket(self) -> Tuple[socket.socket, int]
def initialize(
self,
config:Optional[Dict[str, Any]]=None # Configuration dictionary
) -> None
"Initialize or reconfigure the plugin."
def execute(
self,
*args,
**kwargs
) -> Any: # Plugin result
"Execute the plugin synchronously.
CR-4: HTTP 409 from the worker is mapped to a typed
`PluginCancelledError` raised in the host process, so substrate /
JobQueue / consumer callers can distinguish cooperative cancellation
from a real plugin failure (500 → RuntimeError as before)."
def get_config_schema(self) -> Dict[str, Any]: # JSON Schema
"""Get the plugin's configuration schema."""
def get_current_config(self) -> Dict[str, Any]: # Current config values
"""Get the plugin's current configuration."""
def config_options(self) -> Dict[str, Any]: # CR-11: live config option domains
"""Get the plugin's runtime config option providers (CR-11).
Returns the worker's get_config_options() output (FieldOptions per
dynamic field, JSON-serialized to dicts). Empty dict when the plugin
exposes no dynamic options.
"""
def cleanup(self) -> None:
"""Clean up plugin resources and terminate worker process."""
Job Queue (queue.ipynb)
Resource-aware job queue for sequential plugin execution with cancellation support
Import
from cjm_plugin_system.core.queue import (
JobStatus,
JobEventType,
CancelPhase,
Job,
JobEvent,
QueueStats,
SequenceStep,
StepResult,
JobSequence,
ResourceSnapshot,
JobQueueDependencies,
JobQueue
)
Functions
async def _enqueue_job(
self,
job: Job, # Pre-constructed Job (caller fills sequence fields if needed)
) -> str: # job_id
"""
Internal: enqueue a pre-constructed Job.
Caller is responsible for validation (disabled-plugin check, etc.).
Used by `submit` (Stage 1) and `submit_sequence` (Stage 2) — the latter
needs to populate `sequence_id` + `sequence_index` on the Job before
enqueueing so the running job appears in sequence-tagged event streams
from its first STATE_TRANSITION.
"""
async def submit(
self,
plugin_instance_id: str, # Target plugin instance (per CR-10)
*args,
priority: int = 0, # Higher = more urgent
**kwargs
) -> str: # Returns job_id
"""
Submit a job to the queue.
CR-2: rejects jobs for disabled plugins at submit time (typed
`PluginDisabledError`) so the failure surface matches the disabled gate in
`PluginManager.execute_plugin`. A job submitted to a disabled plugin would
otherwise sit in the queue until execution and only then raise; checking
earlier gives operators an actionable signal immediately.
CR-6: no STATE_TRANSITION event fires at submit because the job is
already in `pending` state at construction — there's no transition
to publish. The first STATE_TRANSITION fires when the processor loop
moves the job pending → running.
"""
async def cancel(
self,
job_id: str # Job to cancel
) -> bool: # True if cancelled
"""
Cancel a pending or running job.
CR-6: publishes STATE_TRANSITION when a pending job moves directly to
cancelled (no transition through running). Running-job cancellation
publishes CANCEL_PHASE_CHANGED events from `_execute_with_cancellation`
when Stage 4 wires the phase transitions; Stage 1 just records the
cancel-request marker.
Stage 2: when the cancelled job is a sequence member, `_advance_sequence`
runs after the lock is released so the cancelled status propagates to
the sequence registry (marks sequence cancelled, no further submission).
Lock release is required because `_advance_sequence` may need to enqueue
a next member in some flows; asyncio.Lock is not re-entrant.
"""
def reorder(
self,
job_id: str, # Job to move
new_priority: int # New priority value
) -> bool: # True if reordered
"Change the priority of a pending job."
def get_job(
self,
job_id: str # Job to retrieve
) -> Optional[Job]: # Job or None
"Get a job by ID."
async def wait_for_job(
self,
job_id: str, # Job to wait for
timeout: Optional[float] = None # Max seconds to wait
) -> Job: # Completed/failed/cancelled job
"""
Wait for a job to complete.
Independent of the CR-6 event bus — uses a per-job `asyncio.Event` for
the simple block-until-done affordance. Streaming consumers should use
`events(job_id)` instead.
"""
def get_pending(self) -> List[Job]: # Pending jobs, priority-sorted
"Get pending jobs, priority-sorted (higher priority first, then FIFO)."
def get_running(self) -> Optional[Job]: # Running job or None
"Get the currently-executing job, or None if idle."
def get_history(
self,
limit: Optional[int] = None, # Max jobs to return (most recent N); None = all
) -> List[Job]: # Completed/failed/cancelled jobs, most recent first
"""
Get completed jobs, most recent first.
If `limit` is provided, returns the most recent N. The internal history
list grows append-only up to `max_history`, so older jobs are evicted
in submission order (oldest first).
"""
def get_stats(self) -> QueueStats: # Aggregate counts
"Get aggregate queue stats: the pending count plus completed, failed, and cancelled counts over the retained history (at most `max_history` jobs)."
def _parse_log_timestamp(line: str) -> Optional[datetime]:
"""Parse the leading timestamp from a worker log line.
Returns naive datetime (no tzinfo) in local time — caller is responsible
for tz alignment with the job's UTC timestamps. Returns None for
continuation lines (no parseable leading timestamp), blank lines, etc.
"""
def _slice_log_by_job_window(
raw: str, # Full log content
started_at: datetime, # Job's UTC start time
completed_at: Optional[datetime], # Job's UTC end time (None if still running)
max_lines: int, # Max lines to return
) -> str: # Sliced log content
"""
Slice log content by a job's execution window.
Worker logs are local-time naive; job timestamps are UTC. We convert
the job's UTC times to local-time-naive for comparison. Continuation
lines (no parseable timestamp) are associated with the most recent
timestamped line — included if that timestamp was in the window.
Best-effort: if `started_at` is None (job never ran) or no lines have
parseable timestamps, returns the raw tail unchanged.
"""
def get_job_logs(
self,
job_id: str, # Job to get logs for
lines: int = 100 # Max lines to return
) -> str: # Log content scoped to this job's execution window
"""
Get logs for a job, scoped to its (started_at, completed_at) window.
CR-6 Stage 3: replaces the legacy whole-plugin-log behavior. The
substrate over-fetches the plugin log (lines * 5) and slices by the
job's execution window — the job-monitor library's `--- Starting` marker
heuristic in `_filter_current_session` becomes obsolete in cascade.
Falls back gracefully when timestamps are unparseable or the job's
window isn't known: returns the raw tail.
"""
def _subscriber_keys_for(event: JobEvent) -> List[str]:
"""Return the subscriber keys an event should fan out to (CR-6).
Every event reaches "all" subscribers + the per-job subscribers.
Sequence-tagged events additionally reach per-sequence subscribers.
"""
def _publish_event(
self,
event: JobEvent, # Event to fan out
) -> None
"""
Fan out an event to all matching subscribers (CR-6).
Slow subscribers backpressure themselves via `asyncio.QueueFull` drop —
publisher never blocks. Each subscriber tracks `dropped_count` so
operators / future telemetry can surface backpressure visibility.
"""
async def _subscribe(
self,
key: str, # Subscriber key ("all" | "job:<id>" | "seq:<id>")
) -> AsyncIterator[JobEvent]
"""
Internal: register a subscription; yield events until the consumer
exits the async generator.
Cleanup runs in `finally` so the subscriber is unregistered even if the
consumer raises or is cancelled. Empty key lists are deleted to avoid
memory accumulation across many short-lived subscriptions.
"""
async def events(
self,
job_id: str, # Filter to events for this job
) -> AsyncIterator[JobEvent]
"""
Subscribe to events for a single job (async generator).
Yields events as they fire. Multiple concurrent subscribers to the same
job_id each get their own independent stream — useful for multi-tab UIs.
"""
async def events_for_sequence(
self,
sequence_id: str, # Filter to events tagged with this sequence
) -> AsyncIterator[JobEvent]
"""
Subscribe to events for all jobs in a sequence (async generator).
Yields the unified per-sequence narrative: member-job lifecycle events
interleaved with `SEQUENCE_ADVANCED` aggregate signals (wired in Stage 2).
"""
async def all_events(self) -> AsyncIterator[JobEvent]:
"""Subscribe to all events (firehose; async generator).
Useful for global dashboards, audit logs, and telemetry sinks that need
the complete event stream rather than a filtered view.
"""
async def submit_sequence(
self,
steps: List[SequenceStep], # Sequence steps to run in order
priority: int = 0, # Sequence-level priority (per-step override possible)
fail_fast: bool = True, # Halt on first failure (audit-locked default)
) -> str: # sequence_id
"""
Submit a multi-step job sequence (CR-6 Stage 2).
Validates all step plugins upfront (`PluginDisabledError` if any are
disabled) so operators get an immediate failure signal rather than
discovering the problem mid-sequence. The first member-job is tagged
with sequence fields BEFORE enqueue, so its first STATE_TRANSITION
already routes through sequence-tagged subscribers.
Returns the sequence_id; consumers subscribe via
`queue.events_for_sequence(sequence_id)` or query state via
`queue.get_sequence(sequence_id)`.
"""
async def submit_uniform_sequence(
self,
plugin_instance_id: str, # Plugin every step targets
args_list: List[Tuple[Any, ...]], # Per-step positional args
kwargs_list: Optional[List[Dict[str, Any]]] = None, # Per-step keyword args (default: empty)
priority: int = 0, # Sequence-level priority
fail_fast: bool = True, # Halt on first failure (audit-locked default)
) -> str: # sequence_id
"""
Submit a sequence where every step targets the same plugin (CR-6 Stage 2).
Sugar over `submit_sequence` for the common "same plugin, many arg sets"
case — multi-source transcription / batch processing / parameter sweeps.
"""
async def cancel_sequence(
self,
sequence_id: str # Sequence to cancel
) -> bool: # True if cancellation was recorded
"""
Cancel an in-flight sequence (CR-6 Stage 2).
Marks the sequence as cancelled (no further advancement) and cancels
the current member job. Returns False if the sequence is unknown or
already terminal; True if cancellation was recorded.
Sets `seq.status = JobStatus.cancelled` BEFORE issuing the member-job
cancel. This prevents a race where the member completes between the
intent record and the cancel call — `_advance_sequence` checks
`seq.status == cancelled` first and halts regardless of member outcome.
"""
def get_sequence(
self,
sequence_id: str # Sequence to retrieve
) -> Optional[JobSequence]: # JobSequence or None
"Get a job sequence by ID (read-only inspection)."
async def _advance_sequence(
self,
completed_job: Job # The member job that just reached terminal status
) -> None
"""
Advance the sequence after a member job completes (CR-6 Stage 2).
Records the member's StepResult, decides whether to advance / halt /
finalize, and submits the next member if applicable. Always called
OUTSIDE the queue's main lock — may take it via `_enqueue_job`.
"""
def _sample_resource_snapshot(
self,
job: Job # Job to sample resources for
) -> Optional[ResourceSnapshot]
"""
Sample worker + sysmon stats for a job (CR-6 Stage 3 internal helper).
Returns None if the worker proxy doesn't support `get_stats` or the call
fails — substrate can't fabricate a snapshot. Sysmon enrichment is
best-effort: if the named plugin isn't loaded / errors / lacks the CR-3
typed methods, GPU fields stay None and the worker-only snapshot is
returned.
"""
def get_resource_snapshot(
self,
job_id: str # Job to sample resources for
) -> Optional[ResourceSnapshot]
"""
Get a point-in-time resource snapshot for a job (CR-6 Stage 3).
Returns None if the job is unknown or the worker proxy doesn't expose
`get_stats`. Composes worker stats with sysmon GPU stats when the queue
is configured with a `sysmon_plugin_name`.
"""
async def start(self) -> None:
"""Start the queue processor.
CR-6 Stage 4: installs the substrate-side retry observer on `_deps`
(typically a PluginManager). When CR-7's reactive-retry path fires for
a running job, the observer updates `Job.retry_count` and publishes a
RETRY_STARTED event tagged with the in-flight job. Previous observer
value (if any) is saved + restored in stop() for cooperative coexistence.
"""
async def stop(self) -> None:
"""Stop the queue processor gracefully.
CR-6 Stage 4: restores the previous `_on_retry` observer on deps to
leave the manager in the state we found it (cooperative with other
queue instances, tests, etc.).
"""
def _on_manager_retry(
"""
Substrate-side retry observer (CR-6 Stage 4).
Invoked synchronously by PluginManager's CR-7 retry loop just before
each retry attempt. Updates `Job.retry_count` on the matching in-flight
job + emits RETRY_STARTED. Best-effort: synchronous callback; emission
failure shouldn't propagate back into the retry loop.
`attempt` semantics: PluginManager's loop iterates
`for attempt in range(max_retries + 1)`. The first iteration
(`attempt=0`) is the original try and never invokes this callback —
it only fires when `last_resource_error is not None`, which means
the prior iteration raised. So the value PASSED here is already the
1-based retry number: `attempt=1` is the first retry, `attempt=2` is
the second retry, etc.
Match logic: find the running job whose plugin_instance_id matches.
Multi-instance / concurrent execution complicates this — the single
`self._running` field tracks one job at a time, matching the queue's
current single-worker-execution model. Future concurrent-execution
support would extend the match strategy.
"""
def _move_to_history(self, job: Job) -> None:
"""Move a job to history, maintaining max_history limit."""
def _signal_job_completed(self, job_id: str) -> None:
"""Signal that a job has completed."""
def _emit_state_transition(
self,
job: Job,
prev_status: JobStatus,
) -> None
"""
Emit a STATE_TRANSITION event for `job`'s most recent status change.
Centralized so every transition site (start, completed, failed, cancelled,
or future cancel-phase-driven transitions) carries identical tag context.
"""
def _emit_cancel_phase(
self,
job: Job,
new_phase: CancelPhase,
) -> None
"""
Emit a CANCEL_PHASE_CHANGED event (CR-6 Stage 4).
Updates `job.cancel_phase` to the new value and publishes an event with
the prior phase + new phase in the payload. Centralized so every phase
transition site in `_execute_with_cancellation` produces identically-shaped
events.
"""
def _emit_block_reason(
self,
job: Job,
new_reason: Optional[str],
) -> None
"""
Emit a BLOCK_REASON_CHANGED event (CR-6 Stage 4 — reserved).
Updates `job.block_reason` and publishes an event with the prior + new
reason. Stage 4 ships this helper for future scheduler-integration use;
the queue's current scheduling logic doesn't surface block reasons, so
the helper is reserved for the eventual scheduler-coordination wiring.
"""
async def _process_loop(self) -> None:
"""Main processing loop."""
async def _execute_job(self, job: Job) -> None:
"""Execute a single job."""
async def _execute_with_cancellation(
self,
job: Job,
plugin: Any
) -> Any
"""
Execute job with cancellation monitoring.
CR-6 Stage 4 wires CANCEL_PHASE_CHANGED events for the substrate's
cooperative → force → reloading → completed state machine.
Cooperative-success path (plugin acknowledges cancel within timeout):
COOPERATIVE → COMPLETED
Force-kill path (cooperative timeout):
COOPERATIVE → FORCE → RELOADING → COMPLETED
"""
async def _poll_progress(
self,
job: Job,
plugin: Any
) -> None
"""
Poll progress + sample resources from the plugin during execution.
Emits PROGRESS_CHANGED events when progress or status_message changes
from the previous poll (avoids spamming the bus between meaningful
updates). CR-6 Stage 3: also emits RESOURCE_SNAPSHOT events every
`resource_snapshot_cadence_polls` iterations; snapshot is also stored
on `job.last_resource_snapshot` for synchronous inspection.
"""
def get_state(self) -> Dict[str, Any]: # Queue state for UI (legacy shape)
"""Get the current queue state in the pre-CR-6 dict shape.
REMOVE-AFTER-OVERHAUL: kept so existing UI consumers (notably
`cjm-fasthtml-job-monitor`) continue to work until they migrate to the
typed `get_pending` / `get_running` / `get_history` / `get_stats`
accessors. Reads from the new accessors and reshapes:
- Legacy keys (`plugin_name`, `created_at`) read from the renamed fields
via the Job dataclass's backward-compat `@property` aliases.
- Legacy `error` was a bare string; new is `JobError`. Shim returns
`error.message` to preserve the string shape.
- Legacy timestamps were unix floats; new are `datetime`. Shim returns
`.timestamp()` to preserve float shape.
"""
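The legacy reshaping can be illustrated with minimal stand-ins for `Job` and `JobError`. Only the three conversions named in the docstring (the `@property` aliases, `JobError` to `error.message`, and `datetime` to `.timestamp()`) come from the source; `to_legacy_dict` and the `id` / `status` / `completed_at` keys are assumptions.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class JobError:
    message: str  # only the field the shim needs (stand-in)


@dataclass
class Job:
    id: str
    plugin_instance_id: str
    status: str = "pending"
    submitted_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    completed_at: Optional[datetime] = None
    error: Optional[JobError] = None

    @property
    def plugin_name(self) -> str:   # legacy alias for plugin_instance_id
        return self.plugin_instance_id

    @property
    def created_at(self) -> float:  # legacy alias: unix float instead of datetime
        return self.submitted_at.timestamp()


def to_legacy_dict(job: Job) -> Dict[str, Any]:
    """Reshape a typed Job into a pre-CR-6 style dict (key set assumed)."""
    return {
        "id": job.id,
        "plugin_name": job.plugin_name,
        "status": job.status,
        "created_at": job.created_at,
        "completed_at": job.completed_at.timestamp() if job.completed_at else None,
        "error": job.error.message if job.error else None,  # JobError -> bare string
    }
```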
Classes
class JobStatus(str, Enum):
"Status of a job in the queue."
class JobEventType(str, Enum):
"""
Push-based job event types (CR-6).
Emitted by JobQueue on a multi-subscriber event bus. Consumers subscribe
via `queue.events(job_id)` / `queue.events_for_sequence(seq_id)` /
`queue.all_events()` and receive `JobEvent` instances asynchronously.
Stage 1 wires STATE_TRANSITION + PROGRESS_CHANGED at the existing
execute-path lifecycle points. The remaining types are reserved for
Stages 2-4 (sequences / resources + logs / cancel-phase + retry +
block-reason). Reserving the enum values up front keeps later stages
additive — new event sources publish without changing the enum.
"""
class CancelPhase(str, Enum):
"""
Phase of a cancellation in progress (CR-6 + CR-4 pairing).
Surfaces the substrate's cancel state machine. Stage 4 wires the
transitions; Stage 1 reserves the enum so `Job.cancel_phase` can be
typed correctly without dangling forward references.
"""
@runtime_checkable
class JobQueueDependencies(Protocol):
"""
Substrate dependencies the JobQueue requires (CR-6).
PluginManager satisfies this structurally; the Protocol exists so JobQueue
can be tested in isolation (with a lightweight test double) and so a future
extraction into a separate library has no API constraint locked in.
Method surface chosen empirically — these are the exact 5 methods the
queue's execute path calls today. Adding new dependencies to the Protocol
(e.g., a typed system-monitor reader when Stage 3 wires resource snapshots)
is an additive change.
"""
def get_plugin_meta(self, name_or_id: str) -> Optional[Any]: ...
def get_plugin(self, name_or_id: str) -> Optional[Any]: ...
async def execute_plugin_async(self, name_or_id: str, *args: Any, **kwargs: Any) -> Any: ...
def reload_plugin(self, name_or_id: str) -> Any: ...
def get_plugin_logs(self, plugin_name: str, lines: int = 50) -> str: ...
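Because the Protocol is `@runtime_checkable`, any object with the five methods passes an `isinstance` check without inheriting from anything. A hypothetical test double (`FakeDeps`) follows; note that `runtime_checkable` only verifies that the methods exist, not their signatures.

```python
from typing import Any, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class JobQueueDependencies(Protocol):
    # Same five-method surface as listed above.
    def get_plugin_meta(self, name_or_id: str) -> Optional[Any]: ...
    def get_plugin(self, name_or_id: str) -> Optional[Any]: ...
    async def execute_plugin_async(self, name_or_id: str, *args: Any, **kwargs: Any) -> Any: ...
    def reload_plugin(self, name_or_id: str) -> Any: ...
    def get_plugin_logs(self, plugin_name: str, lines: int = 50) -> str: ...


class FakeDeps:
    """Hypothetical test double: no base class, just matching method names."""

    def __init__(self) -> None:
        self.calls: List[Any] = []  # records every execute call for later assertions

    def get_plugin_meta(self, name_or_id: str) -> Optional[Any]:
        return {"name": name_or_id}

    def get_plugin(self, name_or_id: str) -> Optional[Any]:
        return object()

    async def execute_plugin_async(self, name_or_id: str, *args: Any, **kwargs: Any) -> Any:
        self.calls.append((name_or_id, args, kwargs))
        return f"ran {name_or_id}"

    def reload_plugin(self, name_or_id: str) -> Any:
        return None

    def get_plugin_logs(self, plugin_name: str, lines: int = 50) -> str:
        return ""
```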
@dataclass
class Job:
"""
A queued plugin execution request (CR-6 reshape).
Stage 1 lands the full field set so subsequent stages are purely additive
on the population paths. Fields tagged "Stage N" are typed but unpopulated
by Stage 1's execute path; their default values keep the dataclass safely
constructible from existing call sites.
"""
id: str # Unique job identifier (UUID)
plugin_instance_id: str # Target plugin instance (per CR-10)
args: Tuple[Any, ...] # Positional arguments for execute()
kwargs: Dict[str, Any] # Keyword arguments for execute()
status: JobStatus = JobStatus.pending # Current job status
priority: int = 0 # Higher = more urgent
submitted_at: datetime = field(...) # When submitted
started_at: Optional[datetime] # When execution started
completed_at: Optional[datetime] # When execution finished
progress: float = 0.0 # 0.0 to 1.0, or -1.0 for indeterminate
status_message: str = '' # Descriptive status message
result: Any # Execution result (if completed)
error: Optional[JobError] # Structured failure summary (CR-5)
sequence_id: Optional[str] # Set when part of a sequence (Stage 2)
sequence_index: Optional[int] # 0-based position in sequence (Stage 2)
cancel_requested_at: Optional[datetime] # When cancel was requested (Stage 4)
cancel_phase: Optional[CancelPhase] # Active cancel phase (Stage 4)
block_reason: Optional[str] # Why the scheduler is blocking (Stage 4)
retry_count: int = 0 # Reactive retries attempted (CR-7 + Stage 4)
last_resource_snapshot: Optional[Any] # Stage 3 wires this (ResourceSnapshot)
@property
def plugin_name(self) -> str:
return self.plugin_instance_id
@property
def created_at(self) -> float:
return self.submitted_at.timestamp()
@dataclass
class JobEvent:
"""
A push-based job event (CR-6).
Carries full tag context so a subscriber to `all_events()`, `events(job_id)`,
or `events_for_sequence(seq_id)` receives identically-shaped instances.
`payload` is a per-event-type structured dict (e.g., STATE_TRANSITION carries
`{"from": "pending", "to": "running"}`; PROGRESS_CHANGED carries
`{"progress": 0.42, "status_message": "..."}`).
"""
type: JobEventType
job_id: str
plugin_instance_id: str
sequence_id: Optional[str]
sequence_index: Optional[int]
timestamp: datetime = field(...)
payload: Dict[str, Any] = field(...)
@dataclass
class QueueStats:
"Aggregate counts returned by `JobQueue.get_stats()` (CR-6)."
total_pending: int
total_completed: int
total_failed: int
total_cancelled: int
@dataclass
class _Subscription:
"""
Internal: per-subscriber bounded queue + drop counter (CR-6 event bus).
Slow subscribers backpressure themselves via `asyncio.QueueFull` drop;
the publisher never blocks. `dropped_count` surfaces visibility for
operators / future telemetry.
"""
queue: 'asyncio.Queue[JobEvent]'
dropped_count: int = 0
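The drop-on-full rule can be sketched with `asyncio.Queue.put_nowait`, which raises `asyncio.QueueFull` instead of waiting. `EventBus`, `subscribe`, `publish`, and the default `maxsize` are hypothetical; only the `queue` + `dropped_count` shape mirrors `_Subscription`.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Subscription:
    queue: "asyncio.Queue[Any]"
    dropped_count: int = 0


class EventBus:
    """Hypothetical fan-out bus: publish() never awaits, so it never blocks."""

    def __init__(self, maxsize: int = 100) -> None:
        self._maxsize = maxsize
        self._subs: List[Subscription] = []

    def subscribe(self) -> Subscription:
        sub = Subscription(asyncio.Queue(maxsize=self._maxsize))
        self._subs.append(sub)
        return sub

    def publish(self, event: Any) -> None:
        for sub in self._subs:
            try:
                sub.queue.put_nowait(event)   # non-blocking enqueue
            except asyncio.QueueFull:
                sub.dropped_count += 1        # only the slow subscriber loses the event
```

Each subscriber gets its own bounded queue, so one stalled consumer raises its own `dropped_count` without delaying the publisher or other subscribers.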
@dataclass
class SequenceStep:
"""
A single step in a job sequence (CR-6 Stage 2).
User-facing — supplied to `JobQueue.submit_sequence`. The substrate
constructs the actual `Job` from this at advance time, tagging the
job with `sequence_id` + `sequence_index` so it appears in
`events_for_sequence` subscriptions.
"""
plugin_instance_id: str # Target plugin instance for this step
args: Tuple[Any, ...] = () # Positional arguments
kwargs: Dict[str, Any] = field(...) # Keyword arguments
priority: int = 0 # Per-step priority override (0 = inherit sequence priority)
@dataclass
class StepResult:
"""
Result of one step in a job sequence (CR-6 Stage 2).
`success=True` means the member-job completed with `JobStatus.completed`.
Failure modes (failed / cancelled) capture the `JobError` in `error`.
For best-effort sequences (`fail_fast=False`), failure rows are recorded
in `JobSequence.results` and the sequence continues.
"""
job_id: str # Member job's UUID (empty string if sequence aborted before submit)
success: bool # True if the step's job completed successfully
result: Any # Job result (if success)
error: Optional[JobError] # Structured failure summary (if not success)
@dataclass
class JobSequence:
"""
Internal: tracks a multi-step job sequence (CR-6 Stage 2).
Lives in `JobQueue._sequences`. State machine: starts `running`,
transitions to `completed` (all steps attempted) / `failed` (fail_fast
halted on a failed member) / `cancelled` (cancel_sequence called or
a member was cancelled). Once terminal, `completed_at` is set and
`current_job_id` is cleared.
"""
id: str # Sequence UUID
steps: List[SequenceStep] # The full step list (immutable post-submit)
fail_fast: bool = True # Halt on first failure (audit-locked default)
priority: int = 0 # Sequence-level priority (per-step override possible)
current_index: int = 0 # 0-based index of the active member step
current_job_id: Optional[str] # The active member-job's ID, if any
results: List[StepResult] = field(...) # Per-step outcomes
status: JobStatus = JobStatus.running # Sequence-level status (reuses JobStatus)
submitted_at: datetime = field(...)
completed_at: Optional[datetime] # Set when sequence reaches terminal status
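The fail-fast versus best-effort behavior can be sketched synchronously. `run_sequence` and the `run_step` callback are hypothetical, `StepResult.error` is simplified to a string instead of `JobError`, and the `cancelled` terminal state is left out; the priority rule (0 inherits the sequence priority) follows the `SequenceStep.priority` comment.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple


@dataclass
class SequenceStep:
    plugin_instance_id: str
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)
    priority: int = 0  # 0 = inherit the sequence priority


@dataclass
class StepResult:
    job_id: str
    success: bool
    result: Any = None
    error: Optional[str] = None  # simplified stand-in for JobError


def run_sequence(
    steps: List[SequenceStep],
    # hypothetical executor: (step, effective_priority) -> (job_id, ok, result, error)
    run_step: Callable[[SequenceStep, int], Tuple[str, bool, Any, Optional[str]]],
    fail_fast: bool = True,
    priority: int = 0,
) -> Tuple[str, List[StepResult]]:
    """Run steps in order and return (terminal_status, per-step results)."""
    results: List[StepResult] = []
    for step in steps:
        effective = step.priority or priority  # per-step override, else inherit
        job_id, ok, result, error = run_step(step, effective)
        results.append(StepResult(job_id, ok, result if ok else None, None if ok else error))
        if not ok and fail_fast:
            return "failed", results  # halt on the first failed member
    return "completed", results        # every step was attempted
```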
@dataclass
class ResourceSnapshot:
"""
Point-in-time resource usage for one job (CR-6 Stage 3).
Worker stats (cpu_percent, memory_rss_mb) come from the plugin proxy's
`get_stats()`. GPU fields come from the configured system-monitor plugin
(when set on JobQueue) via CR-3's typed `list_processes()` (per-PID
matching) and `get_system_status()` (global GPU stats). All GPU fields
are Optional — None if no sysmon configured or the worker isn't running
on a GPU.
Distinct from CR-7's `EmpiricalResourceRecord` (aggregated profile
across runs); this is "what's happening right now" for one job.
"""
timestamp: datetime # When the sample was taken
worker_pid: int = 0 # OS PID of the plugin worker subprocess
cpu_percent: float = 0.0 # Worker process CPU%
memory_rss_mb: float = 0.0 # Worker process resident memory (MB)
gpu_index: Optional[int] # GPU index the worker is on (None if not GPU-bound)
gpu_memory_mb: Optional[float] # Worker's GPU memory usage (MB)
gpu_type: Optional[str] # GPU vendor (NVIDIA / AMD / Intel / None)
gpu_total_mb: Optional[float] # Total GPU memory available globally (MB)
gpu_load_percent: Optional[float] # GPU compute utilization (global)
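Per-PID GPU matching for a snapshot might look like the following. The input dict keys (`pid`, `cpu_percent`, `memory_rss_mb`, `gpu_index`, `gpu_memory_mb`) are assumptions about what `get_stats()` and `list_processes()` return, `build_snapshot` is hypothetical, and the global fields (`gpu_type`, `gpu_total_mb`, `gpu_load_percent`) are omitted for brevity.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


@dataclass
class ResourceSnapshot:
    timestamp: datetime
    worker_pid: int = 0
    cpu_percent: float = 0.0
    memory_rss_mb: float = 0.0
    gpu_index: Optional[int] = None
    gpu_memory_mb: Optional[float] = None


def build_snapshot(
    worker_stats: Dict[str, Any],                   # assumed keys: pid, cpu_percent, memory_rss_mb
    gpu_processes: Optional[List[Dict[str, Any]]],  # assumed keys: pid, gpu_index, gpu_memory_mb; None = no sysmon
) -> ResourceSnapshot:
    snap = ResourceSnapshot(
        timestamp=datetime.now(timezone.utc),
        worker_pid=worker_stats.get("pid", 0),
        cpu_percent=worker_stats.get("cpu_percent", 0.0),
        memory_rss_mb=worker_stats.get("memory_rss_mb", 0.0),
    )
    for proc in gpu_processes or []:
        if proc.get("pid") == snap.worker_pid:  # match the worker by OS PID
            snap.gpu_index = proc.get("gpu_index")
            snap.gpu_memory_mb = proc.get("gpu_memory_mb")
            break
    return snap  # GPU fields stay None when there is no sysmon or no match
```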
class JobQueue:
def __init__(
self,
deps: JobQueueDependencies, # Substrate dependencies (PluginManager satisfies structurally)
max_history: int = 100, # Max completed jobs to retain
cancel_timeout: float = 3.0, # Seconds to wait for cooperative cancel
progress_poll_interval: float = 1.0, # Seconds between progress polls
sysmon_plugin_name: Optional[str] = None, # CR-3 MonitorPlugin instance for GPU stats (None = no GPU info)
resource_snapshot_cadence_polls: int = 4, # Sample resources every Nth progress poll
)
"Resource-aware job queue with push-based observability (CR-6)."
def __init__(
self,
deps: JobQueueDependencies, # Substrate dependencies (PluginManager satisfies structurally)
max_history: int = 100, # Max completed jobs to retain
cancel_timeout: float = 3.0, # Seconds to wait for cooperative cancel
progress_poll_interval: float = 1.0, # Seconds between progress polls
sysmon_plugin_name: Optional[str] = None, # CR-3 MonitorPlugin instance for GPU stats (None = no GPU info)
resource_snapshot_cadence_polls: int = 4, # Sample resources every Nth progress poll
)
"""
Initialize the job queue.
CR-6 Stage 3 adds `sysmon_plugin_name` and `resource_snapshot_cadence_polls`.
Sysmon integration is optional: when set, the named plugin must satisfy
CR-3's typed `MonitorPlugin` shape (`get_system_status` + `list_processes`).
When unset, ResourceSnapshot still carries worker stats but all GPU
fields stay None.
"""
def manager(self) -> JobQueueDependencies
Variables
_LOG_TS_PATTERN
Scheduling (scheduling.ipynb)
Resource scheduling policies for plugin execution
Import
from cjm_plugin_system.core.scheduling import (
ResourceScheduler,
PermissiveScheduler,
SafetyScheduler,
QueueScheduler
)
Classes
class ResourceScheduler(ABC):
"Abstract base class for resource allocation policies."
def allocate(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Function that returns fresh stats
) -> bool: # True if execution is allowed
"Decide if a plugin can start based on its requirements and system state."
async def allocate_async(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Awaitable[Dict[str, Any]]] # Async function returning stats
) -> bool: # True if execution is allowed
"Async allocation decision. Default delegates to sync allocate after fetching stats once."
def on_execution_start(
self,
plugin_name: str # Name of the plugin starting execution
) -> None
"Notify scheduler that a task started (to reserve resources)."
def on_execution_finish(
self,
plugin_name: str # Name of the plugin finishing execution
) -> None
"Notify scheduler that a task finished (to release resources)."
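The documented default for `allocate_async` (fetch stats once, then delegate to the sync `allocate`) can be sketched as below. `MinFreeMemoryScheduler`, the plain-dict `plugin_meta`, and the `min_free_mb` / `free_mb` keys are hypothetical; the real classes take a `PluginMeta`.

```python
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable, Dict


class ResourceScheduler(ABC):
    @abstractmethod
    def allocate(self, plugin_meta: Any, stats_provider: Callable[[], Dict[str, Any]]) -> bool:
        """Decide synchronously whether the plugin may start."""

    async def allocate_async(
        self,
        plugin_meta: Any,
        stats_provider: Callable[[], Awaitable[Dict[str, Any]]],
    ) -> bool:
        stats = await stats_provider()                     # fetch fresh stats exactly once
        return self.allocate(plugin_meta, lambda: stats)  # reuse the sync decision logic

    def on_execution_start(self, plugin_name: str) -> None:
        """Hook for reserving resources (no-op here)."""

    def on_execution_finish(self, plugin_name: str) -> None:
        """Hook for releasing resources (no-op here)."""


class MinFreeMemoryScheduler(ResourceScheduler):
    """Hypothetical policy: run only if free memory covers the plugin's stated need."""

    def allocate(self, plugin_meta: Any, stats_provider: Callable[[], Dict[str, Any]]) -> bool:
        required = plugin_meta.get("min_free_mb", 0)           # assumed metadata key
        return stats_provider().get("free_mb", 0) >= required  # assumed stats key
```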
class PermissiveScheduler(ResourceScheduler):
"Scheduler that allows all executions (Default / Dev Mode)."
def allocate(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Stats provider (ignored)
) -> bool: # Always returns True
"Allow all plugin executions without checking resources."
def on_execution_start(
self,
plugin_name: str # Name of the plugin starting execution
) -> None
"No-op for permissive scheduler."
def on_execution_finish(
self,
plugin_name: str # Name of the plugin finishing execution
) -> None
"No-op for permissive scheduler."
class SafetyScheduler(ResourceScheduler):
"Scheduler that prevents execution if resources are insufficient."
def allocate(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Function returning current stats
) -> bool: # True if resources are available
"Check resource requirements against system state."
def on_execution_start(
self,
plugin_name: str # Name of the plugin starting execution
) -> None
"Called when execution starts (for future resource reservation)."
def on_execution_finish(
self,
plugin_name: str # Name of the plugin finishing execution
) -> None
"Called when execution finishes (for future resource release)."
class QueueScheduler:
def __init__(
self,
timeout: float = 300.0, # Max seconds to wait for resources
poll_interval: float = 2.0 # Seconds between resource checks
)
"Scheduler that waits for resources to become available."
def __init__(
self,
timeout: float = 300.0, # Max seconds to wait for resources
poll_interval: float = 2.0 # Seconds between resource checks
)
"Initialize queue scheduler with timeout and polling settings."
def allocate(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Function returning current stats
) -> bool: # True if resources become available before timeout
"Wait for resources using blocking sleep."
async def allocate_async(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Awaitable[Dict[str, Any]]] # Async stats function
) -> bool: # True if resources become available before timeout
"Wait for resources using non-blocking async sleep."
def on_execution_start(
self,
plugin_name: str # Name of the plugin starting execution
) -> None
"Track that a plugin has started executing."
def on_execution_finish(
self,
plugin_name: str # Name of the plugin finishing execution
) -> None
"Track that a plugin has finished executing."
def get_active_plugins(self) -> Set[str]: # Set of currently executing plugin names
"Get the set of plugins with active executions."
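`QueueScheduler.allocate_async` waits by re-checking resources until they suffice or `timeout` expires. A sketch of that loop, with a hypothetical `WaitingScheduler` class and `check` policy callback standing in for the real resource comparison; the sync `allocate` would run the same loop with `time.sleep`.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Set


class WaitingScheduler:
    """Hypothetical stand-in for QueueScheduler's polling wait."""

    def __init__(
        self,
        check: Callable[[Any, Dict[str, Any]], bool],  # assumed policy: do these stats suffice?
        timeout: float = 300.0,
        poll_interval: float = 2.0,
    ) -> None:
        self.check = check
        self.timeout = timeout
        self.poll_interval = poll_interval
        self._active: Set[str] = set()

    async def allocate_async(
        self, plugin_meta: Any, stats_provider: Callable[[], Awaitable[Dict[str, Any]]]
    ) -> bool:
        deadline = time.monotonic() + self.timeout
        while True:
            if self.check(plugin_meta, await stats_provider()):
                return True
            if time.monotonic() >= deadline:
                return False
            await asyncio.sleep(self.poll_interval)  # yield to the event loop between checks

    def on_execution_start(self, plugin_name: str) -> None:
        self._active.add(plugin_name)

    def on_execution_finish(self, plugin_name: str) -> None:
        self._active.discard(plugin_name)

    def get_active_plugins(self) -> Set[str]:
        return set(self._active)  # copy, so callers cannot mutate internal state
```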
Plugin Secret Store (secret_store.ipynb)
CR-12: project-local secret storage for API-based plugins (file-backed, 0600)
Import
from cjm_plugin_system.core.secret_store import (
SecretStore,
LocalSecretStore
)
Functions
def _default_secrets_dir() -> Path:
"""Default secrets directory: `~/.cjm/secrets` (bootstrap fallback)."""
return Path.home() / ".cjm" / "secrets"
Classes
@runtime_checkable
class SecretStore(Protocol):
"Protocol for resolving per-plugin secrets (API keys, tokens)."
def get_secret(self, plugin_name: str, key: str, *, scope: Optional[str] = None) -> Optional[str]
"Return the secret value for (plugin, key) under `scope`, or None."
def set_secret(self, plugin_name: str, key: str, value: str, *, scope: Optional[str] = None) -> None
"Persist a secret value for (plugin, key) under `scope`."
def delete_secret(self, plugin_name: str, key: str, *, scope: Optional[str] = None) -> bool
"Remove (plugin, key) under `scope`. Returns True if a secret was deleted."
def list_keys(self, plugin_name: str, *, scope: Optional[str] = None) -> List[str]
"Return the NAMES of secrets stored for a plugin under `scope`, never the values."
class LocalSecretStore:
def __init__(
self,
secrets_dir: Optional[Path] = None # Directory for secrets.json; None -> ~/.cjm/secrets
)
"File-backed default `SecretStore` (0600 JSON under `secrets_dir`)."
def __init__(
self,
secrets_dir: Optional[Path] = None # Directory for secrets.json; None -> ~/.cjm/secrets
)
"Initialize the store. `secrets_dir=None` uses `~/.cjm/secrets`."
def get_secret(
self,
plugin_name: str, # Plugin the secret belongs to
key: str, # Secret key (typically the env-var name, e.g. GEMINI_API_KEY)
*,
scope: Optional[str] = None # Reserved multi-user seam; ignored by the local store
) -> Optional[str]: # The secret value, or None if absent
"Resolve a secret value."
def set_secret(
self,
plugin_name: str, # Plugin the secret belongs to
key: str, # Secret key
value: str, # Secret value (stored plaintext at 0600)
*,
scope: Optional[str] = None # Reserved multi-user seam
) -> None
"Persist a secret value."
def delete_secret(
self,
plugin_name: str, # Plugin the secret belongs to
key: str, # Secret key
*,
scope: Optional[str] = None # Reserved multi-user seam
) -> bool: # True if a secret was removed
"Remove a secret, pruning now-empty plugin/scope containers."
def list_keys(
self,
plugin_name: str, # Plugin to list secrets for
*,
scope: Optional[str] = None # Reserved multi-user seam
) -> List[str]: # Secret key NAMES (never values)
"Return the names of secrets stored for a plugin (never the values)."
Variables
_SECRETS_FILENAME = 'secrets.json'
_DEFAULT_SCOPE = '__default__'
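A minimal file-backed store in the spirit of `LocalSecretStore`, assuming a `{scope: {plugin: {key: value}}}` JSON layout (the real layout is not documented here). Following the parameter comment, `scope` is accepted but ignored, so everything lives under `_DEFAULT_SCOPE`; `TinySecretStore` is a hypothetical name.

```python
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

_SECRETS_FILENAME = "secrets.json"
_DEFAULT_SCOPE = "__default__"


class TinySecretStore:
    """Hypothetical 0600 JSON secret store; the nested layout is assumed."""

    def __init__(self, secrets_dir: Path) -> None:
        self.path = Path(secrets_dir) / _SECRETS_FILENAME

    def _load(self) -> Dict[str, Any]:
        return json.loads(self.path.read_text()) if self.path.exists() else {}

    def _save(self, data: Dict[str, Any]) -> None:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # Create with owner-only permissions so secrets are never world-readable.
        fd = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
        os.chmod(self.path, 0o600)  # also tighten a pre-existing file with looser bits

    def get_secret(self, plugin_name: str, key: str, *, scope: Optional[str] = None) -> Optional[str]:
        # `scope` is ignored: the local store always uses _DEFAULT_SCOPE.
        return self._load().get(_DEFAULT_SCOPE, {}).get(plugin_name, {}).get(key)

    def set_secret(self, plugin_name: str, key: str, value: str, *, scope: Optional[str] = None) -> None:
        data = self._load()
        data.setdefault(_DEFAULT_SCOPE, {}).setdefault(plugin_name, {})[key] = value
        self._save(data)

    def delete_secret(self, plugin_name: str, key: str, *, scope: Optional[str] = None) -> bool:
        data = self._load()
        plugins = data.get(_DEFAULT_SCOPE, {})
        if key not in plugins.get(plugin_name, {}):
            return False
        del plugins[plugin_name][key]
        if not plugins[plugin_name]:  # prune the now-empty plugin container
            del plugins[plugin_name]
        if not plugins:               # prune the now-empty scope container
            del data[_DEFAULT_SCOPE]
        self._save(data)
        return True

    def list_keys(self, plugin_name: str, *, scope: Optional[str] = None) -> List[str]:
        # Names only; values never leave the store through this method.
        return sorted(self._load().get(_DEFAULT_SCOPE, {}).get(plugin_name, {}))
```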
Configuration Validation (validation.ipynb)
Validation helpers for plugin configuration dataclasses
Import
from cjm_plugin_system.utils.validation import (
T,
SCHEMA_TITLE,
SCHEMA_DESC,
SCHEMA_MIN,
SCHEMA_MAX,
SCHEMA_ENUM,
SCHEMA_MIN_LEN,
SCHEMA_MAX_LEN,
SCHEMA_PATTERN,
SCHEMA_FORMAT,
validate_field_value,
validate_config,
config_to_dict,
dict_to_config,
extract_defaults,
dataclass_to_jsonschema
)
Functions
def validate_field_value(
value:Any, # Value to validate
metadata:Dict[str, Any], # Field metadata containing constraints
field_name:str="" # Field name for error messages
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
"Validate a value against field metadata constraints."
def validate_config(
config:Any # Configuration dataclass instance to validate
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
"Validate all fields in a configuration dataclass against their metadata constraints."
def config_to_dict(
config:Any # Configuration dataclass instance
) -> Dict[str, Any]: # Dictionary representation of the configuration
"Convert a configuration dataclass instance to a dictionary."
def dict_to_config(
config_class:Type[T], # Configuration dataclass type
data:Optional[Dict[str, Any]]=None, # Dictionary with configuration values
validate:bool=False, # Whether to validate against metadata constraints
strict:bool=True # SG-8: reject unknown keys (default); set False to log+filter for forward-compat
) -> T: # Instance of the configuration dataclass
"""
Create a configuration dataclass instance from a dictionary.
SG-8: by default, unknown keys raise `PluginConfigError`. The previous
behavior (silently filter unknowns) is available via `strict=False`,
which logs a warning so the drift is still visible in operator logs.
"""
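The SG-8 strict/lenient split can be sketched with `dataclasses.fields`. `dict_to_config_sketch` is hypothetical, `PluginConfigError` is redefined locally as a stand-in for the library's exception, and the `validate` flag is omitted.

```python
import logging
from dataclasses import fields
from typing import Any, Dict, Optional, Type, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


class PluginConfigError(Exception):
    """Local stand-in for the library's PluginConfigError."""


def dict_to_config_sketch(
    config_class: Type[T],
    data: Optional[Dict[str, Any]] = None,
    strict: bool = True,
) -> T:
    data = dict(data or {})
    known = {f.name for f in fields(config_class)}
    unknown = sorted(set(data) - known)
    if unknown:
        if strict:
            # Default: unknown keys are an error, so config drift surfaces immediately.
            raise PluginConfigError(f"{config_class.__name__}: unknown config keys {unknown}")
        # Forward-compat mode: log the drift, then drop the unknown keys.
        logger.warning("Ignoring unknown config keys for %s: %s", config_class.__name__, unknown)
        data = {k: v for k, v in data.items() if k in known}
    return config_class(**data)
```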
def extract_defaults(
config_class:Type # Configuration dataclass type
) -> Dict[str, Any]: # Default values from the dataclass
"Extract default values from a configuration dataclass type."
def _python_type_to_json_type(
python_type:type # Python type annotation to convert
) -> Dict[str, Any]: # JSON schema type definition
"Convert Python type to JSON schema type."
def dataclass_to_jsonschema(
cls:type # Dataclass with field metadata
) -> Dict[str, Any]: # JSON schema dictionary
"Convert a dataclass to a JSON schema for form generation."
Variables
T
SCHEMA_TITLE = 'title' # Display title for the field
SCHEMA_DESC = 'description' # Help text description
SCHEMA_MIN = 'minimum' # Minimum value for numbers
SCHEMA_MAX = 'maximum' # Maximum value for numbers
SCHEMA_ENUM = 'enum' # Allowed values for dropdowns
SCHEMA_MIN_LEN = 'minLength' # Minimum string length
SCHEMA_MAX_LEN = 'maxLength' # Maximum string length
SCHEMA_PATTERN = 'pattern' # Regex pattern for strings
SCHEMA_FORMAT = 'format' # String format (email, uri, date, etc.)
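The `SCHEMA_*` keys are meant to live in `dataclasses.field(metadata=...)`. A sketch of per-field checking in the spirit of `validate_field_value` / `validate_config`, with hypothetical names `check_value`, `check_config`, and `TranscriberConfig`; the library's exact rules and messages may differ.

```python
import re
from dataclasses import dataclass, field, fields
from typing import Any, Dict, Optional, Tuple

# Same key strings as the SCHEMA_* constants listed above.
SCHEMA_MIN = "minimum"
SCHEMA_MAX = "maximum"
SCHEMA_ENUM = "enum"
SCHEMA_MIN_LEN = "minLength"
SCHEMA_MAX_LEN = "maxLength"
SCHEMA_PATTERN = "pattern"


def check_value(value: Any, metadata: Dict[str, Any], field_name: str = "") -> Tuple[bool, Optional[str]]:
    """Return (is_valid, error_message) for one value against its field metadata."""
    if SCHEMA_ENUM in metadata and value not in metadata[SCHEMA_ENUM]:
        return False, f"{field_name}: {value!r} not in {metadata[SCHEMA_ENUM]}"
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        if SCHEMA_MIN in metadata and value < metadata[SCHEMA_MIN]:
            return False, f"{field_name}: {value} is below minimum {metadata[SCHEMA_MIN]}"
        if SCHEMA_MAX in metadata and value > metadata[SCHEMA_MAX]:
            return False, f"{field_name}: {value} is above maximum {metadata[SCHEMA_MAX]}"
    if isinstance(value, str):
        if SCHEMA_MIN_LEN in metadata and len(value) < metadata[SCHEMA_MIN_LEN]:
            return False, f"{field_name}: shorter than {metadata[SCHEMA_MIN_LEN]} characters"
        if SCHEMA_MAX_LEN in metadata and len(value) > metadata[SCHEMA_MAX_LEN]:
            return False, f"{field_name}: longer than {metadata[SCHEMA_MAX_LEN]} characters"
        # JSON Schema "pattern" is unanchored, hence re.search rather than re.fullmatch.
        if SCHEMA_PATTERN in metadata and not re.search(metadata[SCHEMA_PATTERN], value):
            return False, f"{field_name}: does not match {metadata[SCHEMA_PATTERN]!r}"
    return True, None


@dataclass
class TranscriberConfig:  # hypothetical plugin config
    model: str = field(default="base", metadata={SCHEMA_ENUM: ["tiny", "base", "large"]})
    beam_size: int = field(default=5, metadata={SCHEMA_MIN: 1, SCHEMA_MAX: 10})


def check_config(config: Any) -> Tuple[bool, Optional[str]]:
    """Validate every field, stopping at the first violation."""
    for f in fields(config):
        ok, err = check_value(getattr(config, f.name), dict(f.metadata), f.name)
        if not ok:
            return False, err
    return True, None
```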
Universal Worker (worker.ipynb)
FastAPI server that runs inside isolated plugin environments
Import
from cjm_plugin_system.core.worker import (
EnhancedJSONEncoder,
parent_monitor,
create_app,
run_worker
)
Functions
def parent_monitor(
ppid: int # Parent process ID to monitor
) -> None
"""
Monitor parent process and terminate self if parent dies.
This implements the "Suicide Pact" pattern: if the Host process dies,
the Worker must terminate itself to prevent zombie processes.
"""
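One way to implement the suicide pact on POSIX is to poll `os.getppid()` from a daemon thread: once the parent dies, the orphaned worker is re-parented and the value changes. `start_parent_monitor` and its `on_parent_death` callback are hypothetical, and this check does not carry over to Windows, which does not re-parent processes and would need a different liveness test.

```python
import os
import threading
import time
from typing import Callable


def start_parent_monitor(
    ppid: int,
    on_parent_death: Callable[[], None] = lambda: os._exit(1),  # hypothetical default: hard-exit
    interval: float = 1.0,
) -> threading.Thread:
    """Watch the parent PID in a daemon thread and fire the callback once it is gone."""

    def _watch() -> None:
        while True:
            # POSIX: an orphan is re-parented, so getppid() stops matching.
            if os.getppid() != ppid:
                on_parent_death()
                return
            time.sleep(interval)

    thread = threading.Thread(target=_watch, daemon=True)  # never blocks interpreter exit
    thread.start()
    return thread
```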
def create_app(
module_name: str, # Python module path (e.g., "my_plugin.plugin")
class_name: str # Plugin class name (e.g., "WhisperPlugin")
) -> FastAPI: # Configured FastAPI application
"Create FastAPI app that hosts the specified plugin."
def run_worker() -> None:
"""CLI entry point for running the worker."""
parser = argparse.ArgumentParser(description="Universal Plugin Worker")
parser.add_argument("--module", required=True, help="Plugin module path")
parser.add_argument("--class", dest="class_name", required=True, help="Plugin class name")
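The `--module` / `--class` flags presumably feed `create_app(module_name, class_name)`. A sketch of the parse-then-import step using `importlib`, where `parse_worker_args` and `load_plugin_class` are hypothetical helpers and the import mechanism is an assumption.

```python
import argparse
import importlib
from typing import List, Optional


def parse_worker_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Universal Plugin Worker")
    parser.add_argument("--module", required=True, help="Plugin module path")
    # `class` is a Python keyword, so the value is stored as `class_name`.
    parser.add_argument("--class", dest="class_name", required=True, help="Plugin class name")
    return parser.parse_args(argv)


def load_plugin_class(module_name: str, class_name: str) -> type:
    """Import `module_name` and return its `class_name` attribute."""
    module = importlib.import_module(module_name)
    cls = getattr(module, class_name)  # AttributeError if the class is missing
    if not isinstance(cls, type):
        raise TypeError(f"{module_name}.{class_name} is not a class")
    return cls
```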
Classes
class EnhancedJSONEncoder(JSONEncoder):
"""
JSON encoder that handles dataclasses and other common types.
SG-52: datetime support added so JobError.occurred_at serializes cleanly
when emitted via the typed `_job_error` terminal chunk in /execute_stream.
"""
def default(
self,
o: Any # Object to encode
) -> Any: # JSON-serializable representation
"Convert non-serializable objects to serializable form."
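A sketch of an encoder with the documented behavior (dataclasses and `datetime` values). Converting dataclasses via `dataclasses.asdict` and datetimes via `isoformat()` are assumptions about the wire format, not confirmed details of `EnhancedJSONEncoder`.

```python
import dataclasses
import json
from datetime import datetime
from typing import Any


class EnhancedJSONEncoder(json.JSONEncoder):
    """Sketch: serialize dataclasses and datetimes that json cannot encode natively."""

    def default(self, o: Any) -> Any:
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)  # nested dataclasses become plain dicts
        if isinstance(o, datetime):
            return o.isoformat()          # e.g. a JobError-style occurred_at field
        return super().default(o)         # raises TypeError for anything else
```

Because `json` calls `default` again for every nested value it cannot encode, a `datetime` left inside the dict produced by `asdict` is still converted on the second pass.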
File details
Details for the file cjm_plugin_system-0.0.34.tar.gz.
File metadata
- Download URL: cjm_plugin_system-0.0.34.tar.gz
- Upload date:
- Size: 300.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e9ffb283fd408ca22afa9d808249341a0b1e6d9f3c9ac036206ee72bc74001d9 |
| MD5 | 0a08dac3f06241cc6d980d53b216c2a8 |
| BLAKE2b-256 | 37c13b2a83b4db10e3bbbecf5120a6cfc0f5a1319b8aab3c041d5aea6b194134 |
File details
Details for the file cjm_plugin_system-0.0.34-py3-none-any.whl.
File metadata
- Download URL: cjm_plugin_system-0.0.34-py3-none-any.whl
- Upload date:
- Size: 192.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e295c121a953d81740cb88fc9ebbac8f076532df0817e95d1ba5e783f5b69a0c |
| MD5 | e1f6d0d8678fbee6190861852d689881 |
| BLAKE2b-256 | 893a15860a1ca29490bcce2201ab80b24c24a39a98d22e33bdd9b51723873b3b |