A dependency-isolated capability-composition runtime: heterogeneous tools run in their own environments behind a uniform HTTP/JSON boundary; the host composes them into workflows with resource-aware scheduling and threads provenance through a context graph.
cjm-substrate
Install
pip install cjm_substrate
Project Structure
nbs/
├── core/ (21)
│ ├── adapter.ipynb # The typed-task half of the capability-unit fracture (pass-2 Thread 3)
│ ├── adapter_manifest.ipynb # The ADAPTER unit's registration manifest + the surface-based compatibility matcher (CR-17 pt 2, stage 4). Pass-2 Thread 3: registration/discovery = per-unit manifests generated in-env and found by `discover_manifests()`; compatibility is DERIVED, not declared — the capability records only its structural surface, the adapter declares its protocol (recorded here as member names + parameter lists), and the substrate matches manifest-vs-manifest. Works against UNLOADED capabilities with zero protocol imports host-side.
│ ├── capability.ipynb # The tool-capability interface — the manage-the-tool half of the capability-unit fracture (pass-2 Thread 3)
│ ├── config.ipynb # Project-level configuration for paths, runtime settings, and environment management
│ ├── config_store.ipynb # Persistent storage for per-capability configuration (with enabled flag)
│ ├── diagnostics_store.ipynb # CR-14 (stage 7): the disposable diagnostic-narrative class. Worker-written structured log records (substrate handler stamps contextvars identity — authors never supply attribution) + the host-pumped raw stream chunks (the zero-cooperation death-rattle floor). Retention is a QUERY, not file mechanics. Design ledger: `claude-docs/stage-7-evidence.md`.
│ ├── empirical_store.ipynb # Persistent store for empirically-observed resource usage per (instance_id, config_hash) pair. CR-7's data foundation — `record_sample` is called from `CapabilityManager.execute_capability*` finally blocks; aggregates feed eviction-candidate selection + future UI hints + cost-aware retry decisions.
│ ├── errors.ipynb # Typed exception hierarchy + JobError dataclass + default classification of bare Python exceptions. The substrate's CR-5 implementation per the 2026-05-19 substrate audit.
│ ├── journal_store.ipynb # CR-14 (stage 7): the durable account-of-action. One substrate-derived, host-written, never-auto-deleted SQLite store of typed observability events — the operational half of the attempted-vs-happened asymmetry (the graph records what HAPPENED; the journal records what was ATTEMPTED, including everything the graph by design refuses to contain: failures, refusals, retries, admission decisions, worker lifecycle). Design ledger: `claude-docs/stage-7-evidence.md`.
│ ├── manager.ipynb # Capability discovery, loading, and lifecycle management system
│ ├── manifest_format.ipynb # Typed parser + writer for the nested v2.0 manifest layout per the 2026-05-19 substrate audit's CR-8. Substrate manifests transitioned from a flat top-level JSON object to a four-section nested layout: `install` (deployment-specific facts populated at install time), `code` (code-derived facts refreshed by `cjm-ctl regenerate-manifest`), `drift_tracking` (a config_schema hash that records the witness shape so live-vs-stored comparisons can detect drift), and `overrides` (an operator-supplied overlay placeholder).
│ ├── metadata.ipynb # Data structures for capability metadata
│ ├── platform.ipynb # Cross-platform utilities for process management, path handling, and system detection
│ ├── ports.ipynb # Capability compositions as DAGs of invocation nodes with typed input/output
│ ├── proxy.ipynb # Bridge between Host application and isolated Worker processes
│ ├── queue.ipynb # Resource-aware job queue for sequential capability execution with cancellation support
│ ├── scheduling.ipynb # Resource scheduling policies for capability execution
│ ├── secret_store.ipynb # CR-12: project-local secret storage for API-based capabilities (file-backed, 0600)
│ ├── telemetry.ipynb # Shared GPU/CPU attribution helpers used by both `JobQueue._sample_resource_snapshot` (CR-6 Stage 3) and `CapabilityManager._record_sample_safe` (CR-7).
│ ├── wire.ipynb # Typed data transfer at the worker boundary — the zero-copy `FileBackedDTO`
│ └── worker.ipynb # FastAPI server that runs inside isolated capability environments
├── utils/ (3)
│ ├── cache_paths.ipynb # Per-(input-content, config) deterministic cache directories for capability outputs
│ ├── hashing.ipynb # Shared cryptographic hashing primitives for content integrity verification
│ └── validation.ipynb # Validation helpers for capability configuration dataclasses
├── bootstrap.ipynb # One-call factory that assembles a CapabilityManager + JobQueue + capability bindings — closes the demo-app boilerplate duplication audited across 5 substrate consumers.
└── cli.ipynb # CLI tool for declarative capability management
Total: 26 notebooks across 2 directories
Module Dependencies
graph LR
bootstrap["bootstrap<br/>Bootstrap"]
cli["cli<br/>cli"]
core_adapter["core.adapter<br/>Task Adapter"]
core_adapter_manifest["core.adapter_manifest<br/>core.adapter_manifest"]
core_capability["core.capability<br/>Tool Capability"]
core_config["core.config<br/>Configuration"]
core_config_store["core.config_store<br/>Capability Config Store"]
core_diagnostics_store["core.diagnostics_store<br/>Diagnostics Store"]
core_empirical_store["core.empirical_store<br/>Empirical Resource Tracking"]
core_errors["core.errors<br/>Capability Error Taxonomy"]
core_journal_store["core.journal_store<br/>Journal Store"]
core_manager["core.manager<br/>Capability Manager"]
core_manifest_format["core.manifest_format<br/>Manifest Format (v2.0)"]
core_metadata["core.metadata<br/>Capability Metadata"]
core_platform["core.platform<br/>Platform Utilities"]
core_ports["core.ports<br/>Composition Ports"]
core_proxy["core.proxy<br/>Remote Capability Proxy"]
core_queue["core.queue<br/>Job Queue"]
core_scheduling["core.scheduling<br/>Scheduling"]
core_secret_store["core.secret_store<br/>Capability Secret Store"]
core__telemetry["core._telemetry<br/>Substrate Telemetry Helpers"]
core_wire["core.wire<br/>Typed Wire Layer"]
core_worker["core.worker<br/>Universal Worker"]
utils_cache_paths["utils.cache_paths<br/>Cache Paths"]
utils_hashing["utils.hashing<br/>Content Hashing Utilities"]
utils_validation["utils.validation<br/>Configuration Validation"]
bootstrap --> core_manager
bootstrap --> core_scheduling
bootstrap --> core_queue
cli --> core_config
cli --> core_platform
cli --> core_manifest_format
cli --> core_adapter_manifest
cli --> core_metadata
core_capability --> core_errors
core_diagnostics_store --> core_wire
core_empirical_store --> utils_hashing
core_manager --> core_diagnostics_store
core_manager --> core_config
core_manager --> core_errors
core_manager --> core_metadata
core_manager --> core_config_store
core_manager --> core_capability
core_manager --> core__telemetry
core_manager --> core_empirical_store
core_manager --> core_manifest_format
core_manager --> core_adapter_manifest
core_manager --> core_journal_store
core_manager --> utils_validation
core_manager --> core_scheduling
core_manager --> core_secret_store
core_manager --> core_proxy
core_manifest_format --> core_metadata
core_manifest_format --> utils_hashing
core_platform --> core_config
core_ports --> core_errors
core_proxy --> core_diagnostics_store
core_proxy --> core_config
core_proxy --> core_errors
core_proxy --> core_wire
core_proxy --> core_capability
core_proxy --> core_journal_store
core_proxy --> core_platform
core_queue --> core_diagnostics_store
core_queue --> core_ports
core_queue --> core_wire
core_queue --> core__telemetry
core_queue --> core_journal_store
core_queue --> core_errors
core_scheduling --> core_metadata
core_worker --> core_wire
core_worker --> core_platform
core_worker --> core_errors
core_worker --> core_diagnostics_store
core_worker --> core_journal_store
core_worker --> core_capability
utils_cache_paths --> core_empirical_store
utils_cache_paths --> utils_hashing
utils_validation --> core_errors
53 cross-module dependencies detected
CLI Reference
cjm-ctl Command
Usage: cjm-ctl [OPTIONS] COMMAND [ARGS]...
cjm-substrate CLI
╭─ Options ────────────────────────────────────────────────────────────────────╮
│ --cjm-config PATH Path to cjm.yaml configuration file │
│ --data-dir PATH Override data directory (manifests, logs) │
│ --conda-prefix PATH Override conda/mamba prefix path │
│ --conda-type TEXT Conda implementation: micromamba, │
│ miniforge, or conda │
│ --install-completion Install completion for the current shell. │
│ --show-completion Show completion for the current shell, to │
│ copy it or customize the installation. │
│ --help Show this message and exit. │
╰──────────────────────────────────────────────────────────────────────────────╯
╭─ Commands ───────────────────────────────────────────────────────────────────╮
│ setup-runtime Download and setup micromamba runtime for │
│ project-local mode. │
│ regenerate-manifest Re-run introspection for an installed capability │
│ and rewrite its manifest. │
│ generate-adapter-manifest CR-17 pt 2 (stage 4): introspect a task-adapter │
│ impl in-env and write its adapter manifest. │
│ install-all Install and register all capabilities defined in │
│ capabilities.yaml. │
│ setup-host Install interface libraries in the current Python │
│ environment. │
│ list List installed capabilities from manifest │
│ directory. │
│ logs Tail / follow the observability stores (CR-14). │
│ retention Apply the diagnostics retention policy now │
│ (CR-14). │
│ remove Remove a capability's manifest and conda │
│ environment. │
│ validate SG-6 + T23: validate a manifest / │
│ capabilities.yaml / capability source. │
│ set-secret Store a capability secret in the project-local │
│ SecretStore (CR-12). │
│ list-secrets List the secret KEY NAMES stored for a capability │
│ — never the values (CR-12). │
╰──────────────────────────────────────────────────────────────────────────────╯
For detailed help on any command, use cjm-ctl <command> --help.
Module Overview
Detailed documentation for each module in the project:
Task Adapter (adapter.ipynb)
The typed-task half of the capability-unit fracture (pass-2 Thread 3)
Import
from cjm_substrate.core.adapter import (
TaskAdapter
)
Classes
class TaskAdapter(ABC):
"""
Base for task adapters — the typed-task half of the capability-unit
fracture (pass-2 Thread 3).
Subclasses (one ABC per task, in `cjm-<task>-adapter-interface` libraries)
declare:
- the TYPED task method (the contract `execute(*args, **kwargs)` never
gave the task), abstract on the per-task ABC;
- `task_name`: the task this adapter serves (e.g. "transcription");
- `required_tool_protocol`: the structural contract required of a tool
capability (a `typing.Protocol`; provisional `None` until the
protocol is evidence-locked — Q5 posture: declare the slot, let
stage-4/8 tool-splitting evidence finalize the protocol bodies);
- the task's persistence helpers (storage classes), beside the task
method rather than on it.
Implementations run in-worker beside their tool capability. The base is
deliberately mechanism-light: registry/routing is CR-17 pt 2 (stage 4).
"""
core.adapter_manifest (adapter_manifest.ipynb)
The ADAPTER unit’s registration manifest + the surface-based compatibility matcher (CR-17 pt 2, stage 4). Pass-2 Thread 3: registration/discovery = per-unit manifests generated in-env and found by discover_manifests(); compatibility is DERIVED, not declared: the capability records only its structural surface, the adapter declares its protocol (recorded here as member names + parameter lists), and the substrate matches manifest-vs-manifest. Works against UNLOADED capabilities with zero protocol imports host-side.
Import
from cjm_substrate.core.adapter_manifest import (
AdapterManifest,
adapter_manifest_from_dict,
is_adapter_manifest,
match_protocol_against_surface
)
Functions
def adapter_manifest_from_dict(
d: Dict[str, Any], # On-disk JSON dict (the "class" key maps to class_name)
) -> AdapterManifest: # Typed adapter manifest
"Reconstruct an `AdapterManifest` from its on-disk JSON shape."
def is_adapter_manifest(
data: Any, # Raw JSON-decoded manifest content
) -> bool: # True when the payload declares the adapter unit kind
"""
Route a manifest file by the `unit` discriminator (capability manifests
carry no `unit` key; checked BEFORE `load_manifest` parsing).
"""
def match_protocol_against_surface(
protocol_members: Dict[str, Any], # {"methods": [...], "properties": [...]} from the adapter manifest
structural_surface: Optional[Dict[str, Any]], # Capability manifest code.structural_surface (None = pre-fracture)
) -> Dict[str, Any]: # {"compatible", "missing_methods", "missing_properties", "param_mismatches", "reason"}
"""
Surface-based compatibility (pass-2 Thread 3) — host-side, manifest-vs-
manifest, safe against UNLOADED capabilities.
Method rule: same name present + (when both sides record params) the
surface params must START WITH the protocol params (prefix rule).
Property rule: name present in the surface's properties.
No surface recorded -> NOT compatible, with the reason spelled out.
"""
Classes
@dataclass
class AdapterManifest:
"""
A discovered ADAPTER unit (CR-17 pt 2) — the registration record for one
task-adapter implementation installed in some tool's worker env.
Generated in-env by `cjm-ctl generate-adapter-manifest` (the protocol
members are introspected where the protocol is importable); discovered
host-side beside capability manifests via the `unit` discriminator.
"""
name: str # Unique unit name ("module.ClassName")
version: str # Interface-lib version at generation
task_name: str # The task this adapter serves (e.g. "graph-storage")
module: str # Impl module (importable in the tool's worker env)
class_name: str # Impl class name
required_tool_protocol: str # Protocol FQN (semantic contract; host never imports it)
protocol_members: Dict[str, Any] = field(...) # {"methods": [{"name","signature","params"}], "properties": [...]}
conda_env: str = '' # Env the manifest was generated from
generated_at: str = '' # ISO timestamp
unit: str = 'adapter' # Manifest-kind discriminator
def to_dict(self) -> Dict[str, Any]: # JSON-ready dict ("class" key on disk)
"""Serialize to the on-disk JSON shape."""
Bootstrap (bootstrap.ipynb)
One-call factory that assembles a CapabilityManager + JobQueue + capability bindings — closes the demo-app boilerplate duplication audited across 5 substrate consumers.
Import
from cjm_substrate.bootstrap import (
CapabilitySpec,
Pipeline,
create_pipeline
)
Functions
def _normalize_spec(
spec: CapabilitySpec # Raw spec from caller
) -> Tuple[str, Optional[Dict[str, Any]]]: # (capability_name, optional config)
"""
Normalize a capability spec into a `(name, config)` pair.
Accepts a bare string, a `(name, config)` tuple, or a mapping with a 'name'
key (config under 'config' or the mapping itself minus 'name').
"""
def create_pipeline(
capabilities: Optional[Iterable[CapabilitySpec]] = None, # Capabilities to discover + load
scheduler: Optional[ResourceScheduler] = None, # Resource policy (default: permissive)
system_monitor: Optional[str] = None, # Capability name to register as system monitor
search_paths: Optional[List[Path]] = None, # Custom manifest search paths
queue_kwargs: Optional[Dict[str, Any]] = None, # Extra kwargs forwarded to JobQueue
strict: bool = True, # SG-5 strict config validation on each load
) -> Pipeline: # Assembled stack ready to start
"""
Assemble a CapabilityManager + JobQueue + capability bindings in one call.
Steps performed:
1. Construct CapabilityManager with the given scheduler + search paths
2. discover_manifests()
3. For each spec in `capabilities`: load the capability and create a CapabilityBinding
4. If `system_monitor` is set, register that capability as the sys-mon
5. Construct JobQueue (NOT started — caller starts via context manager)
Capabilities that fail to load are logged but do not raise; their entries are
omitted from `Pipeline.bindings`. Use the returned `Pipeline.manager` to
inspect which loads succeeded.
"""
Classes
@dataclass
class Pipeline:
"Assembled substrate stack: manager + queue + capability bindings."
manager: CapabilityManager # Discovery + lifecycle
queue: JobQueue # Job submission + scheduling
bindings: Dict[str, CapabilityBinding] = field(...) # Capability name -> bound view
async def start(self) -> None:
"""Start the job queue's background processor."""
await self.queue.start()
async def stop(self) -> None:
"""Stop the job queue and unload all capabilities."""
await self.queue.stop()
self.manager.unload_all()
async def __aenter__(self) -> "Pipeline"
"Stop the job queue and unload all capabilities."
Cache Paths (cache_paths.ipynb)
Per-(input-content, config) deterministic cache directories for capability outputs
Import
from cjm_substrate.utils.cache_paths import (
cache_dir_for_config,
list_cache_entries,
prune_cache_for_input
)
Functions
def _sanitize_stem(
input_path: Union[str, Path], # Path whose stem to sanitize
) -> str: # Filesystem-portable stem string
"""
Return a filesystem-portable, length-capped version of input_path's stem.
Steps: take `Path(input_path).stem`, replace unsafe characters with `_`,
strip leading/trailing dots + whitespace (Windows path-component rule),
length-cap to `_MAX_STEM_LEN`. Empty stems (degenerate paths like `.`)
return `_` so callers always get a usable path component.
"""
def _get_stat_cache_path() -> Optional[Path]:
"""Locate the stat-cache SQLite path under the substrate's data dir.
Returns the configured `cfg.data_dir / "input_hash_cache.db"` when the
substrate config is available, falling back to `~/.cjm/input_hash_cache.db`
when get_config() raises (e.g., during early-init tests). Returns None
only when neither resolves — callers then skip caching entirely.
"""
def _ensure_stat_cache_schema(conn: sqlite3.Connection) -> None:
"Create the stat-cache table + indices if not present."
def _hash_input_with_stat_cache(
input_path: Path, # File whose content hash we need
cache_path: Optional[Path] = None, # Explicit cache DB path (default: substrate-resolved)
*,
skip_cache: bool = False, # If True, bypass cache entirely (compute always)
) -> str: # Hash string in "algo:hexdigest" format
"""
Return the SHA-256 content hash of `input_path`, with stat-cache memoization.
Fast path: stat the file, look up `(absolute_path, mtime_ns, size)` in the
SQLite cache; cache hit returns in microseconds. Cold path: `hash_file`
streams the file content, then writes the result to the cache.
`skip_cache=True` bypasses the cache entirely — useful for callers that
KNOW the file content changed (e.g., a capability just wrote it and wants
to record the canonical hash without polluting the cache with intermediate
states).
File not found / unreadable → propagates the underlying OSError. Cache
DB errors are LOG-and-FALLBACK to direct hashing — the cache is an
optimization, not a correctness invariant.
"""
def cache_dir_for_config(
capability_data_dir: Union[str, Path], # The capability's own data subdirectory (typically <cfg.capability_data_dir>/<capability_name>)
input_path: Union[str, Path], # The input file the capability operates on
action: str, # The capability action name (e.g., "segment_audio", "convert", "execute")
config_dict: Dict[str, Any], # The capability's effective config for this action
*,
input_hash_length: int = 6, # Truncation length for the input content hash in the directory name
config_hash_length: int = 12, # Truncation length for the config hash in the directory name
create: bool = True, # Auto-create the directory (parents=True, exist_ok=True)
hash_input_content: bool = True, # If False, hash str(input_path) instead (e.g., URL inputs)
skip_input_cache: bool = False, # If True, bypass the stat-cache (always recompute content hash)
) -> Path: # The deterministic cache directory path
"""
Return (and optionally create) a per-(input-content, config) cache directory.
Path layout::
<capability_data_dir>/<action>/<sanitized-stem>/<input_hash[:N]>_<config_hash[:M]>/
The same `(input_content, action, config_dict)` always resolves to the same
path; any change to input content OR config produces a different path. This
means:
1. Different configs go to different directories — no silent overwrite.
2. Stale-artifact accumulation is impossible — each unique
`(input_content, config)` tuple has its OWN directory.
3. For chained capability sequences, upstream config changes propagate through
content changes: if capability A's output content depends on A's config and
capability B reads that output, B's cache key automatically reflects A's
config indirectly.
`hash_input_content=False` switches to hashing the string form of
`input_path` instead of file content — for capabilities whose "input" is a URL,
a database row ID, or another non-file identifier. Sequence chaining via
content propagation only works for true file inputs.
`skip_input_cache=True` recomputes the input content hash even if the
stat-cache has a record. Useful for capabilities that just wrote the input file
and want to record its canonical hash without stale-cache risk.
Raises FileNotFoundError if `input_path` doesn't exist and
`hash_input_content=True`. Raises OSError on directory-create failure
when `create=True`.
"""
def list_cache_entries(
"""
Enumerate all per-config cache directories for a given (input, action).
Returns the paths of every `<input_hash>_<config_hash>` directory under
`<capability_data_dir>/<action>/<sanitized-stem>/`. Each entry corresponds to
a unique `(input_content, config)` tuple — operators can inspect their
contents, diff them, or pass selected ones to `prune_cache_for_input` to
keep them through a sweep.
Returns an empty list if the parent directory doesn't exist (capability never
ran this action for this input).
"""
def prune_cache_for_input(
"""
Delete per-config cache directories for `(input, action)`, optionally
preserving a `keep` set.
Pairs with `list_cache_entries` for inspect-then-prune workflows: list
candidates, choose which to keep, then call prune with the keep set.
`keep=None` deletes ALL entries.
`dry_run=True` returns the would-delete list without touching the
filesystem — useful for operator confirmation before destructive ops.
Returns the list of deleted (or would-delete) paths.
"""
Variables
_MAX_STEM_LEN = 100
_UNSAFE_CHARS
Tool Capability (capability.ipynb)
The tool-capability interface — the manage-the-tool half of the capability-unit fracture (pass-2 Thread 3)
Import
from cjm_substrate.core.capability import (
RELOAD_TRIGGER,
WORKER_ENV_TEMPLATE_PLACEHOLDERS,
ConfigOption,
FieldOptions,
EnvVarSpec,
expand_worker_env_template,
template_check_placeholders,
ToolCapability,
capability_action,
collect_capability_actions,
derive_structural_surface
)
Functions
def expand_worker_env_template(
template: str, # The raw EnvVarSpec.default value (may contain ${...} placeholders)
placeholders: Mapping[str, Optional[str]], # Resolved values keyed by placeholder name
*,
capability_name: str = "", # For error context ("template X on capability Y references ...")
var_name: str = "", # For error context ("on EnvVarSpec(name=Z)")
) -> str:
"""
Substitute `${VAR}` placeholders in `template` using `placeholders`.
Strict mode (no `safe_substitute`): unknown placeholders raise
`CapabilityConfigError` with descriptive context. Single-pass, non-recursive —
substituted values are taken verbatim, never re-scanned for further
placeholders. Templates without any `${...}` syntax pass through unchanged
(so plain static defaults work as before).
The allowed placeholder vocabulary is fixed via `WORKER_ENV_TEMPLATE_PLACEHOLDERS`.
A `${FOO}` whose name is in the vocabulary but whose RESOLVED value is None
(e.g. `CJM_MODELS_DIR` when the operator hasn't configured one) raises
`CapabilityConfigError` with the same shape — operators get a clear signal that
the capability needs a value they haven't provided, rather than a silent
substitution of empty string into a load-bearing path.
"""
def template_check_placeholders(
template: str, # The raw EnvVarSpec.default value
) -> Set[str]: # Placeholder names referenced (allowed-vocabulary-validated)
"""
Return the set of placeholder names referenced by a worker-env template.
Validates the vocabulary (unknown names raise CapabilityConfigError) without
requiring a placeholder-value mapping. Useful for `cjm-ctl validate`'s
dry-run check at install/release time — surface the bug BEFORE the capability
tries to spawn a worker with a malformed default.
Templates without `${...}` return an empty set.
"""
def _report_progress_threadsafe(
self,
progress: float, # 0.0 to 1.0, or -1.0 for indeterminate
message: str = "", # Descriptive status message
) -> None:
"""
Thread-safe report_progress — replaces ToolCapability.report_progress.
Writes `_progress` + `_status_message_base` + `_status_message` under the
capability's `_progress_lock` if one exists (lazy-init by `heartbeat()` before
spawning its thread). When no lock exists yet — the single-threaded common
case before any heartbeat() call — the writes happen without lock overhead.
The heartbeat thread reads `_status_message_base` (NOT `_status_message`)
so heartbeat-amended messages don't accumulate elapsed-time suffixes when
`report_progress` is called concurrently. The capability's call overwrites both
fields atomically; the next heartbeat tick reads the new base.
"""
@contextmanager
def _heartbeat(
self,
phase: str, # Phase label embedded in the heartbeat message
*,
interval: float = 0.5, # Seconds between heartbeats (substrate stalls at 60s default)
progress: Optional[float] = None, # Optional initial progress; None preserves current
) -> Iterator[None]:
"""
Heartbeat context manager — spawn a daemon thread that advances the
(progress, message) tuple every `interval` seconds, defeating the
substrate's prefetch stall detection during silent blocking calls.
Usage:
def prefetch(self):
with self.heartbeat("loading whisper model"):
self._load_model() # Blocking; HF Hub download / from_pretrained
Behavior:
- At entry: writes `(progress, phase)` to set the initial state. If
`progress` is None, preserves whatever `self._progress` already holds
(defaulting to 0.5 indeterminate if never set).
- During the block: a daemon thread emits an updated `_status_message =
"<base> ({elapsed:.1f}s)"` every `interval` seconds. The thread reads
`_status_message_base` (set by `report_progress`) for the base, so an
explicit `report_progress(0.3, "downloading weights")` from inside the
block makes the next heartbeat tick display "downloading weights (Xs)".
- At exit: signals the thread to stop, joins with timeout (the thread is
daemon so cleanup is best-effort but reliable). The capability's final
`_status_message` state is left as the last heartbeat-amended value;
callers wanting a clean "completed" state should call
`report_progress(1.0, "done")` after the with-block.
Thread safety: relies on the upgraded thread-safe `report_progress`
(loaded by this same cell). The lock is lazy-initialized HERE — before
spawning the heartbeat thread — so concurrent main-thread + heartbeat-
thread calls always see a lock.
Cancellation: the heartbeat thread checks `stop_event` between sleeps
and exits cleanly. If the with-block raises, the `finally` clause still
fires `stop_event.set()`, so the thread won't leak.
"""
def capability_action(
action_name: str # Public action name the decorated method handles
) -> Callable[[Callable], Callable]: # Decorator
"""
Marker decorator tagging a capability method as the handler for `action_name`.
Sets `func._capability_action = action_name`. Capability authors with dispatcher-style
`execute(action, **kwargs)` use `collect_capability_actions(cls)` to derive their
`supported_actions` set from these markers rather than maintaining a separate
list. The decorator does not change call semantics — the wrapped function is
returned unchanged.
"""
def collect_capability_actions(
cls: type # Class (or subclass) to scan for @capability_action-tagged methods
) -> Set[str]: # Set of action names handled by `cls` (including inherited)
"""
Collect action names from `@capability_action`-decorated methods on `cls`.
Walks the class's MRO so subclasses inherit action handlers from base
classes automatically. The returned set is suitable for
`supported_actions: ClassVar[Set[str]] = collect_capability_actions(MyCapability)`
once the capability class body has been defined.
"""
def _dispatch_to_action(
self,
action: str, # Action name to dispatch (matched against @capability_action tags)
**kwargs, # Forwarded verbatim to the resolved handler
) -> Any: # Whatever the handler returns
"""
T28: dispatch `action` to its `@capability_action`-tagged handler.
Walks the instance's MRO for a method tagged `_capability_action == action`
(the SAME markers `collect_capability_actions` / `supported_actions` are built
from) and calls it as `handler(self, **kwargs)`. Unknown actions raise the
typed `CapabilityInputError(fields_invalid=["action"])` (CR-5) — identical
behaviour to the hand-rolled dispatchers this replaces.
Dispatcher-style capabilities (MediaProcessing / Graph / Text) collapse their
`execute` to a one-liner instead of reimplementing the MRO walk in every
capability (SG-44 and this helper retire the five duplicated copies):
@capability_action("separate_vocals")
def _separate_vocals(self, **kwargs): ...
supported_actions = collect_capability_actions(MyCapability)
def execute(self, action="separate_vocals", **kwargs):
return self.dispatch_to_action(action, **kwargs)
"""
def derive_structural_surface(
cls: type, # The capability class to introspect
) -> Dict[str, Any]: # {"methods": [...], "properties": [...], "attributes": [...]}
"""
Record a capability class's structural surface by pure self-introspection.
The FULL public surface is recorded, inherited members included —
the surface is what adapter protocols match against, and protocol
members may name inherited methods. Deterministic (name-sorted) so
the canonical-JSON witness hash is stable across runs.
Classification: `property` → properties (names only); functions
(static/class methods unwrapped) → methods with `str(inspect.signature)`
+ the parameter NAME list `params` (self excluded — the CR-17 pt 2
surface matcher's input; stage 4);
everything else public → attributes with the value's type name
(config_class, supported_actions, WORKER_ENV, ...).
"""
Classes
@dataclass
class ConfigOption:
"CR-11: one live option for a dynamic config field, with optional metadata."
value: Any # option value (e.g. "gemini-2.5-flash")
label: str # display label (e.g. "Gemini 2.5 Flash")
metadata: Dict[str, Any] = dataclasses.field(default_factory=dict) # token limits, descriptions, ...
@dataclass
class FieldOptions:
"""
CR-11: the live option domain for one dynamic config field.
Kept SEPARATE from the static config_schema (which CR-8 hashes for drift
detection). The capability-config UI merges these live options on top of the
static schema; folding them into the schema would make every API capability
perpetually 'drift'.
"""
options: List[ConfigOption] # current valid options
constraints: Dict[str, Any] = dataclasses.field(default_factory=dict) # derived field-constraint overrides
@dataclass
class EnvVarSpec:
"""
CR-12: one entry of a capability's spawn-time worker-environment contract.
A capability declares the environment variables its worker subprocess reads at
startup via `WORKER_ENV: ClassVar[List[EnvVarSpec]]`. Worker env vars are
FIXED AT SPAWN — changing one requires a worker RESPAWN, so the substrate
routes such changes through `reload_capability`, never `reconfigure` (the env is
baked into the subprocess at `Popen` and can't be mutated in-process). This
is the lifecycle distinction from a normal config field (reconfigurable in
place via `reconfigure`/`_release_<trigger>`).
Two flavors share this one declaration:
- `secret=True` : value resolved from the `SecretStore` (masked; never
persisted in the config store, echoed in config_schema,
or logged). A secret never carries a `default` — a
baked-in secret is a leak.
- `secret=False` : visible value resolved from the override chain
(operator override > manifest `install.env_vars` >
this `default`); safe to display.
Both share one injection seam: the substrate composes the resolved
{name: value} overlay at load time and injects it into the worker env at
spawn (extending the existing CJM_CAPABILITY_DATA_DIR / CJM_MODELS_DIR injection).
This is "derive from behaviour, not metadata" applied to the spawn env: the
capability declares WHICH vars it consumes + whether each is secret/required;
the substrate owns resolution + injection.
`options` is a forward seam for visible vars with a finite domain (e.g. a
device selector enumerating GPUs); unused today but reserved so the
capability-config UI / a future `set-env` surface isn't blocked.
"""
name: str # The env var the worker reads, e.g. "GEMINI_API_KEY"
secret: bool = False # True -> value resolved from the SecretStore, masked
required: bool = False # Worker can't do useful work until this is satisfied
label: str = '' # Display label for CLI / GUI affordances
description: str = '' # Help text for CLI / GUI
default: Optional[str] = None # Visible vars only; secrets must leave this None
options: Optional[List[str]] = None # Forward seam: finite domain for a visible var (unused today)
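The resolution and injection described for `EnvVarSpec` can be sketched as follows. This is a minimal sketch under stated assumptions: the `EnvVarSpec` here is a trimmed mirror (label, description and options omitted), the secret store and override layers are plain mappings, and `resolve_env_overlay` / `build_spawn_env` are hypothetical names, not the substrate's API. The resulting dict would be passed as `subprocess.Popen(..., env=...)`, which is why a change needs a respawn.

```python
import os
from dataclasses import dataclass
from typing import Dict, List, Mapping, Optional


@dataclass
class EnvVarSpec:
    """Trimmed mirror of EnvVarSpec (label/description/options omitted)."""
    name: str
    secret: bool = False
    required: bool = False
    default: Optional[str] = None

    def __post_init__(self) -> None:
        # "A baked-in secret is a leak": refuse a default on a secret var.
        if self.secret and self.default is not None:
            raise ValueError(f"secret env var {self.name} must not carry a default")


def resolve_env_overlay(
    specs: List[EnvVarSpec],
    secrets: Mapping[str, str],       # stand-in for the SecretStore
    overrides: Mapping[str, str],     # operator overrides
    manifest_env: Mapping[str, str],  # manifest `install.env_vars`
) -> Dict[str, str]:
    """Hypothetical resolver: compose the {name: value} overlay for one worker spawn."""
    overlay: Dict[str, str] = {}
    for spec in specs:
        if spec.secret:
            value = secrets.get(spec.name)
        elif spec.name in overrides:        # operator override wins
            value = overrides[spec.name]
        elif spec.name in manifest_env:     # then manifest install.env_vars
            value = manifest_env[spec.name]
        else:                               # then the declared default
            value = spec.default
        if value is None:
            if spec.required:
                raise ValueError(f"required worker env var {spec.name} is unresolved")
            continue  # optional and unset: leave it out of the env entirely
        overlay[spec.name] = value
    return overlay


def build_spawn_env(overlay: Mapping[str, str], base: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Copy the parent env and layer the overlay on top (for Popen(env=...))."""
    env = dict(os.environ if base is None else base)
    env.update(overlay)
    return env
```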
class ToolCapability(ABC):
"""
Tool-capability interface: manage the tool/worker — identity, lifecycle,
config, cancellation, observability (pass-2 Thread 3 fracture).
The task channel is NOT part of this surface: `execute` left the abstract
set when the capability-unit fracture split tool capabilities from task
adapters. Typed task contracts live on adapters (`core.adapter` + the
per-task `cjm-<task>-adapter-interface` libraries). Fused-era capabilities (the
pre-Option-C 12) still define `execute` themselves and their domain ABCs
still declare it abstract; they kept working unchanged through the
class-identical `ToolCapability` alias until the Option C migration
cascade split them (the alias was later REMOVED at SG-48).
CR-4 extended this surface: it added the prefetch hook (SG-19), made cleanup
optional (SG-43), added the reconfigure split + RELOAD_TRIGGER declarative
helper (lifecycle hook split), and added cooperative-cancellation primitives
(SG-16: flag + callback + context manager).
Abstract methods: `name`, `version`, `initialize`, `get_config_schema`,
`get_current_config`. Concrete defaults (overridable): `execute_stream`
(transitional — see method), `cleanup`, `prefetch`, `reconfigure`,
`cancel`, `check_cancel`, `register_cancel_callback`, `cancel_signal_to`,
`report_progress`, `report_usage`, `fields_that_changed`,
`reconfigure_with_triggers`, `on_disable`, `on_enable`.
"""
def name(self) -> str: # Unique identifier for this capability
"""Unique capability identifier."""
...
@property
@abstractmethod
def version(self) -> str: # Semantic version string (e.g., "1.0.0")
"""Capability version."""
...
@abstractmethod
def initialize(
self,
config: Optional[Dict[str, Any]] = None # Configuration dictionary
) -> None
"""
Initialize or re-configure the capability.
CR-4: this is "first-time setup", called once after construction with
the initial config. The substrate uses `reconfigure(old, new)` for delta
updates afterwards. Capabilities predating CR-4 see no behavior change, since
the default `reconfigure()` body delegates to `reconfigure_with_triggers`,
which is a no-op unless the capability opts in via RELOAD_TRIGGER metadata.
"""
def execute_stream(
self,
*args,
**kwargs
) -> Generator[Any, None, None]: # Yields partial results
"""
Stream execution results chunk by chunk.
TRANSITIONAL(option-c-cascade): under the pass-2 fracture, streaming is
supplied by the substrate/composition layer (off both interfaces); the default
stays here only because fused-era capabilities rely on it (it calls the
capability's own `execute`, which a split tool capability does not have).
Relocates when CR-17 adapter routing lands (execution stage 4).
"""
def get_config_schema(self) -> Dict[str, Any]: # JSON Schema for configuration
"""Return JSON Schema describing the capability's configuration options."""
...
@abstractmethod
def get_current_config(self) -> Dict[str, Any]: # Current configuration values
"""Return the current configuration state as a dictionary."""
...
def get_config_options(self) -> Dict[str, "FieldOptions"]:
"""CR-11: live option domains for dynamic config fields, keyed by field name.
Optional. Default: {} (fully static capabilities). For fields whose valid
domain is determined at runtime (e.g. an API model list), return a
`FieldOptions` carrying current `ConfigOption` values + per-option
metadata (token limits, etc.) + optional derived constraints. Runs in
the worker subprocess (has the capability's deps + credentials).
Kept SEPARATE from get_config_schema(): the schema is static + hashed for
CR-8 drift detection; these options are the live, un-hashed companion the
capability-config UI merges on top. A fetch failure should raise a typed CR-5
error; the substrate's CapabilityManager.get_config_options accessor degrades
to {} so the UI can fall back to the static schema.
"""
return {}
def cleanup(self) -> None:
"""Clean up resources when capability is unloaded.
CR-4: made optional (SG-43 closure). Was `@abstractmethod` before; every
audited capability overrode it with a near-no-op, so the default is now a
no-op and capability authors override only when they have non-trivial
teardown (file handles, GPU memory, database connections). The
substrate's worker /cleanup endpoint calls this regardless.
"""
pass
def prefetch(self) -> None:
"""Acquire heavy resources eagerly without invoking execute().
CR-4 (SG-19): default no-op. Capability authors override when downstream
callers benefit from eager acquisition — typically transcription /
inference capabilities that lazy-download models on first execute. The
substrate's prefetch_capability(name) API + worker /prefetch endpoint
invoke this. Should be idempotent (safe to call multiple times) since
the substrate may pre-warm at load time AND on operator request.
"""
pass
def reconfigure(
self,
old_config: Optional[Dict[str, Any]], # Previous configuration values
new_config: Optional[Dict[str, Any]], # New configuration values to apply
) -> None
"""
Apply a configuration change without re-running full initialize().
CR-4 completion (2026-05-25): reconfigure is the substrate's canonical
delta path; `CapabilityManager.update_capability_config` routes here, NOT through
a bare `initialize(new_config)`. It fires `_release_<trigger>` releases for
changed RELOAD_TRIGGER fields, then re-applies the config.
Distinction from initialize(): initialize sets up persistent state once
after construction; reconfigure applies delta updates and is the
canonical entry point for hot-reload via the substrate's
update_capability_config path.
"""
def fields_that_changed(
self,
old: Dict[str, Any], # Previous config snapshot
new: Dict[str, Any], # Proposed new config snapshot
) -> Set[str]: # Field names whose values differ between old and new
"""
Return the set of field names whose values differ between old and new.
Includes fields present in only one dict (treated as a change to/from
the implicit None). Equality is structural via `!=`; nested dicts /
lists compare by value, not identity. Hashable-vs-unhashable values
compare correctly because we never put them in a set themselves.
"""
def reconfigure_with_triggers(
self,
old_config: Dict[str, Any], # Previous configuration values
new_config: Dict[str, Any], # New configuration values being applied
) -> None
"""
CR-4 helper: walk RELOAD_TRIGGER metadata + fire `_release_<trigger>` methods.
Resolution sequence:
1. Find the capability's `config_class` attribute (a dataclass). Absent
means the capability hasn't opted into the declarative pattern; the
helper returns silently.
2. Compute the diff between `old_config` and `new_config` via
`fields_that_changed`.
3. For each changed field, read its `RELOAD_TRIGGER` metadata key
(if any) and accumulate the trigger names into a set (de-duping
when multiple fields share a trigger).
4. For each trigger, call `self._release_<trigger>()` if it exists
on the instance.
Capability authors opt in by:
- Setting `config_class = MyConfigDataclass` as a class attribute.
- Annotating field metadata with `{RELOAD_TRIGGER: "model"}` etc.
- Implementing matching `_release_model(self)` instance methods.
Capabilities WITHOUT config_class or RELOAD_TRIGGER metadata land here as a
no-op, the safe default for the SG-T3tr migration window where the
cascade hasn't yet adopted the declarative pattern everywhere.
"""
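The four-step resolution sequence can be sketched as a self-contained class. This is a minimal sketch, not the substrate's code: `WhisperConfig` and `SketchCapability` are hypothetical, `fields_that_changed` is reimplemented with `dict.get` to match the "implicit None" rule, and firing triggers in sorted order is an assumption made for determinism.

```python
import dataclasses
from dataclasses import dataclass, field
from typing import Any, Dict, List, Set

RELOAD_TRIGGER = "reload_trigger"  # same metadata key the substrate exports


def fields_that_changed(old: Dict[str, Any], new: Dict[str, Any]) -> Set[str]:
    # A key missing from one side compares as None (the "implicit None").
    return {k for k in set(old) | set(new) if old.get(k) != new.get(k)}


@dataclass
class WhisperConfig:  # hypothetical config_class
    model: str = field(default="base", metadata={RELOAD_TRIGGER: "model"})
    device: str = field(default="cpu", metadata={RELOAD_TRIGGER: "model"})
    beam_size: int = 5  # no trigger: applied in place


class SketchCapability:
    config_class = WhisperConfig

    def __init__(self) -> None:
        self.released: List[str] = []

    def _release_model(self) -> None:
        self.released.append("model")  # e.g. drop the loaded weights

    def reconfigure_with_triggers(self, old_config: Dict[str, Any], new_config: Dict[str, Any]) -> None:
        config_class = getattr(self, "config_class", None)
        if config_class is None or not dataclasses.is_dataclass(config_class):
            return  # step 1: not opted in, silent no-op
        changed = fields_that_changed(old_config, new_config)  # step 2
        triggers: Set[str] = set()  # step 3: a set de-dupes shared triggers
        for f in dataclasses.fields(config_class):
            if f.name in changed and RELOAD_TRIGGER in f.metadata:
                triggers.add(f.metadata[RELOAD_TRIGGER])
        for trigger in sorted(triggers):  # step 4
            release = getattr(self, f"_release_{trigger}", None)
            if callable(release):
                release()
```

Changing both `model` and `device` fires `_release_model` once; changing only `beam_size` fires nothing.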
def cancel(self) -> None:
"""Request cooperative cancellation of the current execute() call.
CR-4: default sets the substrate-tracked `_cancel_requested` flag and
fires any callbacks registered via `register_cancel_callback`.
Capability authors who need extra teardown logic (signaling a subprocess,
closing a network connection) override `cancel()` and SHOULD call
`super().cancel()` to preserve the flag-setting + callback-fire
behavior. The capability's `execute()` polls via `check_cancel()` at safe
interruption points and unwinds when it raises `CapabilityCancelledError`.
"""
self._cancel_requested = True
for cb in list(getattr(self, "_cancel_callbacks", ())):
...  # fires each registered callback in registration order; one that raises is logged and skipped
def check_cancel(self) -> None:
"""Raise `CapabilityCancelledError` if cancellation has been requested.
CR-4 (SG-16 polling primitive): capability authors call this at safe
interruption points inside `execute()`. The substrate sets the flag via
`cancel()` (typically driven by an operator's "Cancel" button); the
next `check_cancel` call surfaces the cancellation as a typed exception
that unwinds execute() cleanly.
The substrate's worker /execute wrapper resets the flag before each
call so cancellation doesn't leak from one job into the next.
"""
if self._cancel_requested:
...  # raises CapabilityCancelledError
def register_cancel_callback(
self,
callback: Callable[[], None], # Called when cancel() fires
) -> None
"""
Register a callback that fires when cancel() is called.
CR-4 (SG-16 callback primitive): for capabilities that can't easily insert
polling at strategic points (e.g., a capability wrapping a blocking C
extension). Callbacks should be non-blocking and idempotent. Multiple
callbacks can be registered; all fire in registration order when
cancel() is invoked. A misbehaving callback that raises is logged
and skipped; the remaining callbacks still fire.
"""
def cancel_signal_to(
self,
callback: Callable[[], None], # Cancellation callback scoped to the with-block
) -> Generator[None, None, None]
"""
Context manager registering `callback` for the duration of the with-block.
Useful for binding cancellation to a finite-scope resource (a subprocess,
a network request, a temporary file handle) without needing to
deregister manually. Pairs with `register_cancel_callback` for cases
where lifetime is tied to a `try:`/`finally:` block.
Example:
    def execute(self, *args, **kwargs):
        proc = subprocess.Popen(...)
        with self.cancel_signal_to(lambda: proc.terminate()):
            return proc.wait()
"""
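The three cancellation primitives (flag, callbacks, scoped context manager) fit together as sketched below. This is a hypothetical standalone class written from the docstrings, not the substrate's `ToolCapability` code; `CapabilityCancelledError` is a local stand-in for the substrate's typed error, and the private attribute names mirror those visible in `cancel()`.

```python
import logging
from contextlib import contextmanager
from typing import Callable, Iterator, List

log = logging.getLogger(__name__)


class CapabilityCancelledError(Exception):
    """Stand-in for the substrate's typed cancellation error."""


class CancellableSketch:
    """Hypothetical class mirroring the flag + callback + context-manager primitives."""

    def __init__(self) -> None:
        self._cancel_requested = False  # the worker would reset this before each execute
        self._cancel_callbacks: List[Callable[[], None]] = []

    def register_cancel_callback(self, callback: Callable[[], None]) -> None:
        self._cancel_callbacks.append(callback)

    def cancel(self) -> None:
        self._cancel_requested = True
        for cb in list(self._cancel_callbacks):  # snapshot: the list may change during iteration
            try:
                cb()
            except Exception:
                log.exception("cancel callback failed; continuing with the rest")

    def check_cancel(self) -> None:
        # Called at safe interruption points inside execute().
        if self._cancel_requested:
            raise CapabilityCancelledError("cancellation requested")

    @contextmanager
    def cancel_signal_to(self, callback: Callable[[], None]) -> Iterator[None]:
        self.register_cancel_callback(callback)
        try:
            yield
        finally:
            self._cancel_callbacks.remove(callback)  # scoped: deregister on exit
```

A callback registered through `cancel_signal_to` is gone once its with-block exits, so a later `cancel()` only reaches the long-lived callbacks.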
def on_disable(self) -> None:
"""CR-2: signal that the substrate has marked this capability as disabled.
Worker stays alive; capability can release heavy resources here (e.g., free
GPU memory, close model files). The substrate fires this hook AFTER any
in-flight job for this capability finishes — see CapabilityManager.disable_capability
deferred-hook semantics. Default: no-op; capabilities opt in by overriding.
"""
pass
def on_enable(self) -> None:
"""CR-2: signal that the substrate has marked this capability as re-enabled.
Capability can eagerly re-acquire heavy resources here, or rely on lazy
re-acquisition via the next execute() call (substrate doesn't prefer
one strategy over the other). Default: no-op; capabilities opt in by overriding.
"""
pass
def report_progress(
self,
progress: float, # 0.0 to 1.0, or -1.0 for indeterminate
message: str = "" # Descriptive status message
) -> None
"Report execution progress. Call during execute() to update status."
def report_usage(
self,
usage: Dict[str, float], # Measured usage for this execute, keyed by capability-defined unit name
) -> None
"""
SG-54: report measured API/service usage for the current execute() call.
Unit-agnostic by design. The capability (which holds the API response)
supplies whatever unit names it measures: {"input_tokens": .., "output_tokens": ..}
for an LLM, {"pages": ..} for OCR, {"characters": ..} for TTS,
{"credits"/"requests"/"minutes": ..} for others. The substrate stores +
accumulates per unit name WITHOUT interpreting them (summed across runs in
the empirical store's api_usage_totals). Pricing is deliberately NOT here
(volatile, per-service, often not API-accessible); a consumer-side rate
table turns raw units into cost. "Derive from behaviour": the capability
MEASURES actual usage from the response; the substrate aggregates.
Stored on `self._last_api_usage`; the worker exposes it via /stats and the
substrate folds it into the post-execute ResourceSample. The worker resets
it before each execute so a failed/usage-less call can't inherit stale
usage. Default: store-only (parallel to report_progress).
"""
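The split between uninterpreted aggregation and consumer-side pricing can be sketched as two small functions. Both names are hypothetical, and the rates in the usage note are made-up illustration values, not real prices.

```python
from collections import defaultdict
from typing import Dict, Iterable


def accumulate_usage(samples: Iterable[Dict[str, float]]) -> Dict[str, float]:
    """Substrate side (hypothetical): sum per unit name without interpreting the units."""
    totals: Dict[str, float] = defaultdict(float)
    for usage in samples:
        for unit, amount in usage.items():
            totals[unit] += amount
    return dict(totals)


def estimate_cost(totals: Dict[str, float], rates: Dict[str, float]) -> float:
    """Consumer side (hypothetical rate table): units without a known rate contribute nothing."""
    return sum(amount * rates[unit] for unit, amount in totals.items() if unit in rates)
```

For example, totals of 1500 input tokens and 200 output tokens priced at illustrative rates of 0.001 and 0.002 per token give a cost of 1.9, while an unpriced `pages` total is simply carried along.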
class _CR4MinimalCapability(ToolCapability):
"Concrete capability satisfying abstracts; relies on CR-4 default cleanup()."
def name(self) -> str: return "cr4-minimal"
@property
def version(self) -> str: return "0.0.0"
def initialize(self, config=None): self._cfg = dict(config or {})
def execute(self, *args, **kwargs): return None
def get_config_schema(self) -> Dict[str, Any]: return {}
def get_current_config(self) -> Dict[str, Any]: return dict(getattr(self, "_cfg", {}))
Variables
RELOAD_TRIGGER = 'reload_trigger'
WORKER_ENV_TEMPLATE_PLACEHOLDERS: Set[str]
cli (cli.ipynb)
CLI tool for declarative capability management
Import
from cjm_substrate.cli import (
app,
main,
setup_runtime,
run_cmd,
regenerate_manifest,
generate_adapter_manifest,
install_all,
setup_host,
list_capabilities,
logs_command,
retention_command,
remove_capability,
validate_file,
set_secret,
list_secrets
)
Functions
def main(
ctx:typer.Context,
cjm_config:Annotated[Optional[Path], typer.Option(
"--cjm-config",
help="Path to cjm.yaml configuration file"
)]=None,
data_dir:Annotated[Optional[Path], typer.Option(
"--data-dir",
help="Override data directory (manifests, logs)"
)]=None,
conda_prefix:Annotated[Optional[Path], typer.Option(
"--conda-prefix",
help="Override conda/mamba prefix path"
)]=None,
conda_type:Annotated[Optional[str], typer.Option(
"--conda-type",
help="Conda implementation: micromamba, miniforge, or conda"
)]=None,
) -> None
"cjm-substrate CLI for managing isolated capability environments."
def setup_runtime(
force:bool=typer.Option(False, "--force", "-f", help="Re-download even if binary exists")
) -> None
"Download and setup micromamba runtime for project-local mode."
def _check_runtime_available() -> None:
"""Check if the configured conda runtime is available, exit with helpful message if not."""
cfg = get_config()
if not ensure_runtime_available(cfg):
...  # exits with a helpful message
def _get_conda_cmd_str() -> str
"Get the conda/micromamba command string for shell commands."
def _download_url_to_temp(
url: str, # URL to download
suffix: str = ".yml" # File suffix for temp file
) -> Optional[Path]: # Path to temp file or None if failed
"Download a URL to a temporary file. Returns None if download fails."
def _resolve_env_file(
env_file: str # Path or URL to environment file
) -> tuple[str, Optional[Path]]: # (resolved_path, temp_file_to_cleanup)
"""
Resolve env_file to a local path, downloading if it's a URL.
Returns (local_path, temp_file) where temp_file is set if we created
a temporary file that should be cleaned up later.
"""
def run_cmd(
cmd: str, # Shell command to execute
check: bool = True # Whether to raise on non-zero exit
) -> None
"""
Run a shell command and stream output.
Uses the platform's default shell (no hardcoded /bin/bash).
"""
def _generate_manifest(
env_name: str, # Name of the Conda environment
package_name: str, # Package source string (git URL or package name)
manifest_dir: Path # Directory to write manifest JSON files
) -> Optional[Path]: # Path to the manifest written, or None on failure
"""
Run introspection script inside the target env and write a v2.0 manifest.
Builds a `ManifestV2` from the introspection output + install-time markers,
computes `drift_tracking.config_schema_hash`, and writes via
`write_manifest` (nested layout, indent=2). Both `installed_at` and
`regenerated_at` are set to "now"; `regenerate_manifest` preserves the
original `installed_at` via post-write fix-up.
"""
def regenerate_manifest(
capability_name: str = typer.Argument(..., help="Capability name as it appears in the manifest"),
capabilities_path: Optional[str] = typer.Option(
None, "--capabilities",
help="Path to capabilities.yaml for package_source recovery (legacy manifests)",
),
package: Optional[str] = typer.Option(
None, "--package",
help="Package spec override (e.g., git URL or pip name); wins over manifest/capabilities.yaml lookups",
),
) -> None
"""
Re-run introspection for an installed capability and rewrite its manifest.
Reads the existing manifest via `load_manifest`, recovers `env_name` +
`package_source` from the install section, runs `_generate_manifest` to
refresh the code section, then post-writes to preserve the original
`installed_at` so the regenerate only updates `regenerated_at` semantically.
Always emits v2.0 layout.
"""
def _generate_adapter_manifest(
"""
Introspect a task-adapter impl in-env and write its adapter manifest (CR-17 pt 2).
The non-typer core shared by the `generate-adapter-manifest` command and
`install-all`'s per-capability `adapters:` entries (stage 6 J10 -- the I6/J8
install-pipeline gap: adapter installation + registration ride the SAME
pipeline as capability installation, never manual afterthoughts).
Raises ValueError on a malformed target and CalledProcessError when the
in-env introspection fails; callers decide the exit posture.
"""
def generate_adapter_manifest(
env_name: str = typer.Argument(..., help="Conda env containing the adapter impl (the tool's worker env)"),
target: str = typer.Argument(..., help="Adapter impl spec 'module:ClassName'"),
)
"""
CR-17 pt 2 (stage 4): introspect a task-adapter impl in-env and write its adapter manifest.
The adapter manifest is the REGISTRATION unit (pass-2 Thread 3): task_name
+ required_tool_protocol members (names + parameter lists + signatures)
recorded IN-ENV where the protocol is importable, so host-side
compatibility matching works against UNLOADED capabilities with zero
protocol imports host-side. Written to the same manifests dir capability
manifests live in; `discover_manifests()` routes by the `unit` key.
Thin typer wrapper over `_generate_adapter_manifest` (stage 6 J10:
install-all runs the same core for per-capability `adapters:` entries).
"""
def _conda_env_exists_configured(
env_name: str # Name of the conda environment
) -> bool: # True if environment exists
"Check if conda environment exists using configured conda command."
def install_all(
capabilities_path:Optional[str]=typer.Option(None, "--capabilities", help="Path to capabilities.yaml (default: cjm.yaml capabilities_config)"),
substrate_source:str=typer.Option("cjm-substrate", "--substrate-source", help="Substrate package spec installed into every worker env (default: published; pass a path or '-e <path>' for local-editable dev)"),
force:bool=typer.Option(False, help="Force recreation of environments")
) -> None
"""
Install and register all capabilities defined in capabilities.yaml.
Per-capability `adapters:` entries ride the same pipeline (stage 6 J10; closes
the I6/J8 manual-step gap): each entry's `lib` is pip-installed into the
worker env alongside the interface libs, and each `impl`
('module:ClassName') gets its adapter manifest generated right after the
capability manifest -- INSTALL puts code in envs, REGISTRATION is per-unit
manifests (pass-2 Thread 3), one command does both.
"""
def setup_host(
capabilities_path:str=typer.Option("capabilities.yaml", "--capabilities", help="Path to capabilities.yaml file"),
yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
"Install interface libraries in the current Python environment."
def _format_size(
size_bytes: int # Size in bytes
) -> str: # Human-readable size string
"Format bytes as human-readable string."
def _get_pypi_size(
package_spec: str # Package name or git URL
) -> tuple[int, str]: # (size_bytes, package_name)
"""
Query PyPI for package download size.
SG-37: PyPI normalizes package names to dash-form per PEP 503, so we try
the dash form first and fall back to the underscore form. Without this,
cjm-* packages (whose repo names contain dashes) silently 404 because
the old heuristic only tried the underscored form.
"""
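The dash-first lookup order follows from PEP 503 normalization, where runs of `-`, `_` and `.` collapse to a single `-` and the name is lowercased. A minimal sketch, assuming a simple spec parser (bare names, version specifiers, or a git URL whose last path segment is the repo name); `pypi_name_candidates` and `pypi_json_url` are hypothetical helpers, and the URL shape is PyPI's public JSON API. No network call is made here.

```python
import re
from typing import List


def pep503_normalize(name: str) -> str:
    """PEP 503 canonical form: runs of -, _ and . become one '-', lowercased."""
    return re.sub(r"[-_.]+", "-", name).lower()


def pypi_name_candidates(package_spec: str) -> List[str]:
    """Names to try against PyPI, dash form first (hypothetical helper)."""
    # Assumption: for a git URL, the repo name is the last path segment.
    name = package_spec.rstrip("/").rsplit("/", 1)[-1]
    # Drop version specifiers / extras / markers, e.g. "numpy>=1.26" -> "numpy".
    name = re.split(r"[<>=!~;\[\s]", name, maxsplit=1)[0]
    if name.endswith(".git"):
        name = name[: -len(".git")]  # strip before normalizing, or "." would become "-"
    dash = pep503_normalize(name)
    underscore = dash.replace("-", "_")
    return [dash] if underscore == dash else [dash, underscore]


def pypi_json_url(name: str) -> str:
    return f"https://pypi.org/pypi/{name}/json"
```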
def _estimate_conda_size(
env_file: str, # Path or URL to environment.yml
env_name: str # Target environment name
) -> tuple[int, int]: # (total_bytes, package_count)
"Estimate conda package sizes using dry-run."
def _estimate_pip_sizes(
packages: list[str] # List of pip package specs
) -> tuple[int, int, list[tuple[str, int]]]: # (total_bytes, found_count, [(name, size), ...])
"Estimate pip package sizes from PyPI."
def _get_conda_envs() -> set[str]: # Set of existing conda environment names
"""Get list of existing conda environment names using configured conda command."""
cfg = get_config()
cmd_parts = build_conda_command(cfg, "env", "list", "--json")
try:
...  # runs cmd_parts and parses the JSON environment list
def _get_installed_manifests(
manifest_dir:Optional[Path]=None # Directory to scan (uses config default if None)
) -> "list[ManifestV2]": # Typed capability manifests (adapter manifests skipped)
"""
Load installed capability manifests as typed `ManifestV2` objects.
Adapter manifests (routed by the `unit` discriminator) are skipped;
unreadable or unrecognized-format files are silently ignored.
"""
def _extract_env_from_python_path(
python_path:str # Path like /home/user/miniforge3/envs/my-env/bin/python
) -> str: # Extracted environment name or empty string
"Extract conda environment name from python_path."
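One way to pull the environment name out of a conda interpreter path is to take the path segment right after `envs`. This is a sketch under assumptions (the `env_name_from_python_path` name, treating any backslash as a Windows path, and returning `''` for the base env), not necessarily how `_extract_env_from_python_path` is implemented.

```python
from pathlib import PurePosixPath, PureWindowsPath


def env_name_from_python_path(python_path: str) -> str:
    """Return the segment after 'envs' in a conda python path, or '' if absent."""
    # Assumption: a backslash means a Windows layout (envs\<name>\python.exe).
    path_cls = PureWindowsPath if "\\" in python_path else PurePosixPath
    parts = path_cls(python_path).parts
    for i, part in enumerate(parts[:-1]):  # [:-1] guarantees parts[i + 1] exists
        if part == "envs":
            return parts[i + 1]
    return ""  # base env or non-conda interpreter
```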
def list_capabilities(
capabilities_path:Optional[str]=typer.Option(None, "--capabilities", help="Path to capabilities.yaml for cross-reference"),
show_envs:bool=typer.Option(False, "--envs", "-e", help="Show conda environment status")
) -> None
"List installed capabilities from manifest directory."
def _fmt_short(value: Optional[str], width: int = 8) -> str:
"""First `width` chars of an id, or '-' for None (display only)."""
return (value or "-")[:width]
def _compact_payload(payload: Dict[str, Any], max_len: int = 160) -> str:
"""One-line payload rendering; the bulky job_snapshot collapses to its status."""
d = dict(payload or {})
snap = d.get("job_snapshot")
if isinstance(snap, dict):
...  # collapses job_snapshot to its status before rendering one line
def logs_command(
job:Annotated[Optional[str], typer.Option("--job", help="Filter to one queue job id (exact)")]=None,
run:Annotated[Optional[str], typer.Option("--run", help="Filter to one host run id (implies --journal)")]=None,
session:Annotated[Optional[str], typer.Option("--session", help="Filter to one worker session id")]=None,
level:Annotated[Optional[str], typer.Option("--level", help="Diagnostics level filter (e.g. WARNING)")]=None,
journal:Annotated[bool, typer.Option("--journal", help="Show the journal (account-of-action) instead of diagnostics records")]=False,
chunks:Annotated[bool, typer.Option("--chunks", help="Show raw stream chunks (death-rattle floor)")]=False,
limit:Annotated[int, typer.Option("--limit", "-n", help="Most recent N entries (0 = all)")]=50,
follow:Annotated[bool, typer.Option("--follow", "-f", help="Poll for new entries (Ctrl-C to stop)")]=False,
) -> None
"""
Tail / follow the observability stores (CR-14).
Default view: structured diagnostics records (worker logger output,
EXACTLY job-stamped via the call envelope). `--chunks`: the raw stream
pump. `--journal`: the durable account-of-action (job lifecycle, worker
spawn/death, admission, config, runs, worker-reported accounts).
`--follow` polls the store's seq cursor — exact, no byte offsets.
"""
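A seq cursor gives `--follow` exact resumption: each poll asks only for rows with `seq` greater than the last one seen. The sketch below uses the standard `sqlite3` module against a hypothetical `records(seq, message)` table; the real store's schema is not shown here. It also covers `--limit` (SQLite treats a negative `LIMIT` as unbounded, which models `0 = all`), and bounds the poll count so it terminates, where a real follow loop would run until Ctrl-C.

```python
import sqlite3
import time
from typing import Iterator, List, Tuple

Row = Tuple[int, str]


def tail(conn: sqlite3.Connection, limit: int) -> List[Row]:
    """Most recent `limit` rows in ascending order; limit 0 means all."""
    rows = conn.execute(
        "SELECT seq, message FROM records ORDER BY seq DESC LIMIT ?",
        (limit if limit > 0 else -1,),  # negative LIMIT = no upper bound in SQLite
    ).fetchall()
    return rows[::-1]


def follow(conn: sqlite3.Connection, start: int, polls: int = 3, interval: float = 0.0) -> Iterator[Row]:
    """Yield rows strictly after the cursor, advancing it as rows arrive."""
    cursor = start
    for _ in range(polls):
        rows = conn.execute(
            "SELECT seq, message FROM records WHERE seq > ? ORDER BY seq", (cursor,)
        ).fetchall()
        for seq, message in rows:
            cursor = seq  # the next poll resumes exactly after this row
            yield seq, message
        time.sleep(interval)
```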
def retention_command(
max_age_days:Annotated[Optional[float], typer.Option(
"--max-age-days", help="Delete diagnostics rows older than this (overrides cjm.yaml)")]=None,
max_total_mb:Annotated[Optional[float], typer.Option(
"--max-total-mb", help="Delete oldest rows until diagnostics.db is under this budget")]=None,
) -> None
"""
Apply the diagnostics retention policy now (CR-14).
The explicit half of the invocation policy (CapabilityManager's startup
sweep is the automatic half). Defaults come from `cjm.yaml`'s
`substrate.diagnostics_retention_days` / `diagnostics_retention_max_mb`.
The JOURNAL is never touched — it has no retention surface by design.
"""
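Retention as a query can be sketched in two passes: an age cut, then deleting the oldest rows until a size budget is met. This is a hypothetical sketch over a `diagnostics(seq, ts, payload)` table; in particular it approximates "under budget" by summed payload length, whereas the real command budgets `diagnostics.db` itself. Only the diagnostics table is touched, mirroring the rule that the journal has no retention surface.

```python
import sqlite3
import time
from typing import Optional


def apply_retention(
    conn: sqlite3.Connection,
    max_age_days: Optional[float] = None,
    max_total_bytes: Optional[int] = None,
    now: Optional[float] = None,
) -> int:
    """Delete old diagnostics rows; returns the number of rows removed."""
    now = time.time() if now is None else now
    deleted = 0
    if max_age_days is not None:
        cutoff = now - max_age_days * 86400  # days -> seconds
        deleted += conn.execute("DELETE FROM diagnostics WHERE ts < ?", (cutoff,)).rowcount
    if max_total_bytes is not None:
        # Assumption: payload length stands in for on-disk size.
        while True:
            total = conn.execute("SELECT COALESCE(SUM(LENGTH(payload)), 0) FROM diagnostics").fetchone()[0]
            if total <= max_total_bytes:
                break
            oldest = conn.execute("SELECT seq FROM diagnostics ORDER BY seq LIMIT 1").fetchone()
            if oldest is None:
                break
            conn.execute("DELETE FROM diagnostics WHERE seq = ?", oldest)
            deleted += 1
    conn.commit()
    return deleted
```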
def remove_capability(
capability_name:str=typer.Argument(..., help="Name of the capability to remove"),
capabilities_path:Optional[str]=typer.Option(None, "--capabilities", help="Path to capabilities.yaml for env name lookup"),
keep_env:bool=typer.Option(False, "--keep-env", help="Keep the conda environment, only remove manifest"),
yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
"Remove a capability's manifest and conda environment."
def _validate_resources_block(
res: Any, # resources sub-dict (may be None or non-dict; we type-check here)
path_prefix: str, # Error message prefix
) -> List[str]
"Phase 5a: type-check the resources block. Shared between v1.0 and v2.0."
def _validate_manifest_v2_dict(
data: Dict[str, Any] # v2.0 nested manifest dict (caller already verified format_version)
) -> List[str]: # Empty list = valid
"""
CR-8: validate the nested v2.0 manifest layout.
Required sections: `install` + `code`. Optional sections: `drift_tracking`
(substrate emits it on every fresh write but legacy-via-load_manifest
upgrades leave it empty until the first regenerate); `overrides` (free-form
operator overlay).
Required `code.*` fields mirror v1.0's required top-level fields. Required
`install.python_path` mirrors v1.0's required top-level `python_path`.
"""
def _validate_manifest_dict(
data: Any # Loaded manifest JSON
) -> List[str]: # List of human-readable error messages (empty == valid)
"""
SG-6 + CR-8: structural validation, dispatching on `format_version`.
`format_version == "2.0"` validates the nested v2.0 layout. Any other
value (including a missing field — the legacy v1.0 flat shim was removed
at SG-48) rejects with a single error so unknown formats fail loud rather
than silently degrading.
"""
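The `format_version` dispatch and the v2.0 required sections described above can be sketched in plain Python. This is an illustration under assumptions, not the substrate's validator. `validate_manifest_sketch` is a hypothetical name, and `REQUIRED_CODE_FIELDS` is a placeholder, because the listing only says the required `code.*` fields mirror v1.0's top-level ones.

```python
from typing import Any, List

# Hypothetical placeholder: the real required `code.*` fields are not listed here.
REQUIRED_CODE_FIELDS = ("name",)

def validate_manifest_sketch(data: Any) -> List[str]:
    """Return human-readable errors; an empty list means valid."""
    if not isinstance(data, dict):
        return ["manifest: expected a JSON object"]
    version = data.get("format_version")
    if version != "2.0":
        # Unknown or missing formats fail loud with a single error.
        return [f"format_version: unsupported value {version!r} (expected '2.0')"]
    errors: List[str] = []
    for section in ("install", "code"):
        if not isinstance(data.get(section), dict):
            errors.append(f"{section}: required section missing or not an object")
    install = data.get("install")
    if isinstance(install, dict) and not install.get("python_path"):
        errors.append("install.python_path: required")
    code = data.get("code")
    if isinstance(code, dict):
        for field in REQUIRED_CODE_FIELDS:
            if field not in code:
                errors.append(f"code.{field}: required")
    return errors
```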
def _validate_capabilities_yaml_dict(
data: Any # Loaded capabilities.yaml content
) -> List[str]: # List of human-readable error messages (empty == valid)
"""
Structural validation of a capabilities.yaml file.
Each capability entry must have name + env_name + package, plus either env_file
or python_version (whichever is present defines how the conda env is created).
"""
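A sketch of the per-entry rule, assuming the YAML has already been parsed and the capability entries extracted into a list of dicts (the top-level layout of capabilities.yaml is not shown here). `validate_capability_entries` is a hypothetical helper.

```python
from typing import Any, List

REQUIRED_KEYS = ("name", "env_name", "package")

def validate_capability_entries(entries: List[Any]) -> List[str]:
    """Check each capability entry; an empty list means valid."""
    errors: List[str] = []
    for i, entry in enumerate(entries):
        if not isinstance(entry, dict):
            errors.append(f"entry[{i}]: expected a mapping")
            continue
        label = entry.get("name") or f"entry[{i}]"
        for key in REQUIRED_KEYS:
            if not entry.get(key):
                errors.append(f"{label}: missing required key '{key}'")
        # Either an env file or a bare Python version defines how the env is created.
        if not (entry.get("env_file") or entry.get("python_version")):
            errors.append(f"{label}: needs 'env_file' or 'python_version'")
    return errors
```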
def _collect_manifest_warnings(
data: Any # Loaded manifest JSON
) -> List[str]: # Human-readable warning strings (non-failing lints)
"""
T23: non-failing manifest lints (warnings, not errors).
- V4: a single-element `enum` in a config_schema property offers no operator
choice — the field should be dropped or its domain expanded.
- V12: quantitative resource fields (`min_gpu_vram_mb` / `recommended_gpu_vram_mb`
/ `min_system_ram_mb`) were dropped by the CR-7 reactive-resource reframe;
the substrate ignores them, so they are stale dead data.
Resolves the resources/config_schema location for both v2.0 (nested under
`code`) and legacy v1.0 (flat) layouts. The `validate` command prints these
without exiting non-zero (warnings alone don't fail validation).
"""
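A minimal sketch of the two lints, assuming `config_schema` is a JSON Schema object whose fields live under `properties`. `collect_warnings_sketch` is a hypothetical name, and the real warning wording is not shown in this listing.

```python
from typing import Any, Dict, List

# Quantitative fields retired by the CR-7 reactive-resource reframe (V12).
STALE_RESOURCE_FIELDS = ("min_gpu_vram_mb", "recommended_gpu_vram_mb", "min_system_ram_mb")

def collect_warnings_sketch(data: Dict[str, Any]) -> List[str]:
    """Non-failing lints for a manifest dict."""
    # v2.0 nests resources/config_schema under `code`; legacy v1.0 keeps them flat.
    body = data.get("code", data) if data.get("format_version") == "2.0" else data
    warnings: List[str] = []
    schema = body.get("config_schema") or {}
    for prop, spec in (schema.get("properties") or {}).items():
        enum = spec.get("enum") if isinstance(spec, dict) else None
        if isinstance(enum, list) and len(enum) == 1:
            warnings.append(f"V4: config_schema.{prop} has a single-element enum")
    resources = body.get("resources") or {}
    for field in STALE_RESOURCE_FIELDS:
        if field in resources:
            warnings.append(f"V12: resources.{field} is ignored (stale)")
    return warnings
```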
def _lint_capability_logging(
path: Path # A capability .py file or package directory to scan
) -> tuple: # (errors, warnings) — lists of human-readable findings
"""
T23 (CR-14): lint capability source for `logging.basicConfig` calls.
The substrate installs the worker's root handler
(`install_worker_diagnostics`) before capability code runs. A capability calling
`logging.basicConfig(force=True)` DESTROYS that handler (every
subsequent record silently bypasses the diagnostics store) -> ERROR.
A plain `basicConfig` call is a no-op once a handler exists — a fragile
pre-CR-14 idiom that suggests the capability expects to own process logging
-> WARNING. Directories scan their tree, skipping hidden dirs and
`tests_manual`/`_proc` (host-side scripts own their own logging).
"""
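One way to implement the `basicConfig` gate is a walk over the parsed AST, so comments and strings that merely mention `basicConfig` are never flagged. The sketch below (hypothetical `lint_basicconfig`, one source string, no directory walking) matches any call whose callee is named `basicConfig`, which is a deliberate simplification.

```python
import ast
from typing import List, Tuple

def lint_basicconfig(source: str, filename: str = "<capability>") -> Tuple[List[str], List[str]]:
    """Flag basicConfig calls: force=True -> error, plain call -> warning."""
    errors: List[str] = []
    warnings: List[str] = []
    for node in ast.walk(ast.parse(source, filename=filename)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Matches `x.basicConfig(...)` and bare `basicConfig(...)` alike (simplification).
        name = func.attr if isinstance(func, ast.Attribute) else getattr(func, "id", None)
        if name != "basicConfig":
            continue
        forced = any(
            kw.arg == "force" and isinstance(kw.value, ast.Constant) and kw.value.value is True
            for kw in node.keywords
        )
        where = f"{filename}:{node.lineno}"
        if forced:
            errors.append(f"{where}: basicConfig(force=True) replaces the diagnostics handler")
        else:
            warnings.append(f"{where}: basicConfig() is a no-op once the substrate handler exists")
    return errors, warnings
```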
def _detect_manifest_format(
path: Path # File to inspect
) -> Optional[str]: # 'manifest' | 'capabilities_yaml' | None
"Auto-detect format: extension for files; directories lint as source."
def validate_file(
path:Path=typer.Argument(..., help="Manifest JSON, capabilities.yaml, or capability source (.py / package dir) to validate"),
format:Optional[str]=typer.Option(
None, "--format", "-f",
help="Override format detection: 'manifest', 'capabilities_yaml', or 'source'",
),
) -> None
"""
SG-6 + T23: validate a manifest / capabilities.yaml / capability source.
Auto-detects format from the path (`.json` → manifest, `.yaml`/`.yml` →
capabilities.yaml, `.py` or a directory → source lint). The source lint is
the CR-14 `logging.basicConfig` gate: `force=True` is an ERROR (it
destroys the substrate diagnostics handler), a plain call is a WARNING.
Exits non-zero with a list of validation errors if any check fails.
"""
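The path-to-mode mapping from the docstring above, as a sketch. `detect_format_sketch` is hypothetical; it returns `"source"` for `.py` files and directories to mirror `validate_file`'s description, and case-insensitive suffix matching is an assumption.

```python
from pathlib import Path
from typing import Optional

def detect_format_sketch(path: Path) -> Optional[str]:
    """Map a path to a validation mode: manifest, capabilities_yaml, or source."""
    if path.is_dir():
        return "source"  # directories are linted as capability source
    suffix = path.suffix.lower()
    if suffix == ".json":
        return "manifest"
    if suffix in (".yaml", ".yml"):
        return "capabilities_yaml"
    if suffix == ".py":
        return "source"
    return None  # unknown: the caller can ask for an explicit --format
```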
def _open_secret_store():
"""Open the project-local LocalSecretStore at <data_dir>/secrets (CR-12)."""
from cjm_substrate.core.secret_store import LocalSecretStore
cfg = get_config()
data_dir = getattr(cfg, "data_dir", None)
secrets_dir = (data_dir / "secrets") if data_dir is not None else None
return LocalSecretStore(secrets_dir)
@app.command("set-secret")
def set_secret(
capability_name: str = typer.Argument(..., help="Capability name (manifest 'name', e.g. my-api-capability)"),
key: str = typer.Argument(..., help="Secret key = the env-var name the worker reads (e.g. MY_API_KEY)"),
value: Optional[str] = typer.Option(None, "--value", help="Secret value (omit to be prompted with hidden input)"),
scope: Optional[str] = typer.Option(None, "--scope", help="Reserved multi-user scope (default: single-user)"),
)
"""
Store a capability secret in the project-local SecretStore (CR-12).
The value is written to <data_dir>/secrets/secrets.json (0600) — never to
capabilities.yaml, manifests, or the config store. Capabilities read it from their
worker env at spawn. Omit --value to be prompted (hidden input) so the
secret stays out of shell history. After setting, reload the capability (or
restart the host) so its worker respawns with the new env — the GUI /
CapabilityManager.set_capability_secret do this automatically.
"""
def list_secrets(
capability_name: str = typer.Argument(..., help="Capability name to list secret KEY NAMES for"),
scope: Optional[str] = typer.Option(None, "--scope", help="Reserved multi-user scope"),
)
"List the secret KEY NAMES stored for a capability — never the values (CR-12)."
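A minimal sketch of the storage rules the two commands describe: a 0600 file, with key names listable but values never printed. The `{capability: {KEY: value}}` JSON layout and both helper names are assumptions; the real `LocalSecretStore` schema and its `scope` handling are not shown in this listing.

```python
import json
import os
from pathlib import Path
from typing import Dict, List

def set_secret_sketch(secrets_file: Path, capability: str, key: str, value: str) -> None:
    """Store one secret; the file stays owner read/write only (0600)."""
    data: Dict[str, Dict[str, str]] = {}
    if secrets_file.exists():
        data = json.loads(secrets_file.read_text())
    data.setdefault(capability, {})[key] = value
    secrets_file.parent.mkdir(parents=True, exist_ok=True)
    # Create with 0600 up front so the value is never world-readable, then re-assert the mode.
    fd = os.open(secrets_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        json.dump(data, fh)
    os.chmod(secrets_file, 0o600)

def list_secret_keys_sketch(secrets_file: Path, capability: str) -> List[str]:
    """Return KEY NAMES only; values never leave the store."""
    if not secrets_file.exists():
        return []
    return sorted(json.loads(secrets_file.read_text()).get(capability, {}))
```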
Variables
_PYPI_404_CACHE: set[str]
Configuration (config.ipynb)
Project-level configuration for paths, runtime settings, and environment management
Import
from cjm_substrate.core.config import (
RuntimeMode,
CondaType,
RuntimeConfig,
SubstrateConfig,
CJMConfig,
load_config,
get_config,
set_config,
reset_config
)
Functions
def _load_from_yaml(
yaml_path:Path # Path to cjm.yaml file
) -> CJMConfig: # Parsed configuration
"Load config from YAML file, resolving relative paths."
def load_config(
config_path:Optional[Path]=None, # CLI --cjm-config
data_dir:Optional[Path]=None, # CLI --data-dir
conda_prefix:Optional[Path]=None, # CLI --conda-prefix
conda_type:Optional[str]=None # CLI --conda-type
) -> CJMConfig: # Resolved configuration
"Load config with layered resolution (CLI > env vars > yaml > defaults)."
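The precedence rule can be expressed as a first-match lookup per setting. The environment variable and YAML key names used below (`CJM_DATA_DIR`, `data_dir`) are hypothetical placeholders; the listing does not name the real env vars, and `resolve_setting` is not part of the module.

```python
import os
from typing import Any, Mapping, Optional

def resolve_setting(
    cli_value: Optional[Any],       # value passed on the command line, or None
    env_var: str,                   # hypothetical env var name, e.g. "CJM_DATA_DIR"
    yaml_cfg: Mapping[str, Any],    # parsed cjm.yaml section
    yaml_key: str,                  # key inside that section
    default: Any,                   # built-in default
    env: Mapping[str, str] = os.environ,
) -> Any:
    """First source that provides a value wins: CLI > env vars > yaml > defaults."""
    if cli_value is not None:
        return cli_value
    if env.get(env_var):
        return env[env_var]
    if yaml_cfg.get(yaml_key) is not None:
        return yaml_cfg[yaml_key]
    return default
```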
def get_config() -> CJMConfig: # Current configuration
"""Get current config (loads defaults if not set)."""
global _current_config
...
def set_config(
config:CJMConfig # Configuration to set as current
) -> None
"Set current config (called by CLI callback)."
def reset_config() -> None
"Reset to unloaded state (for testing)."
Classes
class RuntimeMode(str, Enum):
"Runtime mode for the capability system."
class CondaType(str, Enum):
"Type of conda implementation to use."
@dataclass
class RuntimeConfig:
"Runtime environment configuration."
mode: RuntimeMode = RuntimeMode.SYSTEM # LOCAL for project-local, SYSTEM for global
conda_type: CondaType = CondaType.CONDA # Conda implementation to use
prefix: Optional[Path] # Path to runtime directory (LOCAL mode only)
binaries: Dict[str, Path] = field(...) # Platform-specific binary paths
@dataclass
class SubstrateConfig:
"""
Substrate behavior toggles.
Loaded from the `substrate:` section of `cjm.yaml`. Each flag gates a
substrate-wide behavior that hosts can disable when they don't want the
per-load or per-execute cost.
- `drift_detection` (CR-8): per-load `/config_schema` HTTP call + hash
comparison against the manifest's stored hash. CapabilityManager's load
path branches around `_check_config_schema_drift` when False.
- `empirical_tracking` (CR-7): per-execute resource sample recording into
`EmpiricalResourceStore`. CapabilityManager skips `record_sample` calls when
False; the store's lazy-init also short-circuits.
- `prefetch_stall_threshold_seconds` (CR-4 / Session A 2026-05-27): how long
proxy.prefetch waits with no observed progress (via `/progress` polling)
before declaring a stall. Replaces per-capability wall-clock timeouts —
operators no longer race network speed against an arbitrary value. Capabilities
defeat the stall counter by calling `self.report_progress(...)` periodically
during long lifecycle operations (model download / vLLM server startup).
Default 60 s; raise it for capabilities that don't report progress or when
false-positive stalls are noisy; lower it to catch genuine hangs sooner.
"""
drift_detection: bool = True # Run /config_schema hash compare on every load_capability
empirical_tracking: bool = True # Record ResourceSample after every execute_capability*
prefetch_stall_threshold_seconds: float = 60.0 # CR-4 / Session A: stall detection threshold for proxy.prefetch
diagnostics_retention_days: float = 30.0 # CR-14 follow-up: age-based diagnostics retention; <=0 disables the startup sweep
diagnostics_retention_max_mb: Optional[float] # CR-14 follow-up: diagnostics.db size budget (None = no size-based deletion)
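The `prefetch_stall_threshold_seconds` semantics (a stall means no observed progress for the whole window, not a wall-clock timeout) can be sketched as below. The callbacks, the injectable clock, and `PrefetchStalled` are hypothetical; this is not `proxy.prefetch` itself.

```python
import time
from typing import Callable

class PrefetchStalled(Exception):
    """Raised when no progress is observed for the whole threshold window."""

def wait_with_stall_detection(
    poll_progress: Callable[[], object],  # e.g. a snapshot from polling /progress
    is_done: Callable[[], bool],
    threshold_s: float = 60.0,
    interval_s: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    last_progress = poll_progress()
    last_change = clock()
    while not is_done():
        sleep(interval_s)
        current = poll_progress()
        if current != last_progress:
            # Any observed change resets the window, so slow-but-advancing work never trips it.
            last_progress, last_change = current, clock()
        elif clock() - last_change >= threshold_s:
            raise PrefetchStalled(f"no progress for {threshold_s:.0f}s")
```

Because only the time since the last progress change counts, a capability that calls `self.report_progress(...)` during a long download keeps resetting the window regardless of total duration.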
@dataclass
class CJMConfig:
"Main configuration for cjm-substrate."
runtime: RuntimeConfig = field(...) # Runtime environment settings
data_dir: Path = field(...) # Base directory for manifests, logs
capabilities_config: Path = field(...) # Path to capabilities.yaml file
models_dir: Optional[Path] # Directory for model downloads
substrate: SubstrateConfig = field(...) # CR-8 substrate behavior toggles
@property
def manifests_dir(self) -> Path: # Directory containing capability manifests
"""Directory containing capability manifests."""
return self.data_dir / "manifests"
@property
def capability_data_dir(self) -> Path: # Directory for capability runtime data
"""Directory for capability runtime data (databases, caches)."""
return self.data_dir / "data"
@property
def journal_db_path(self) -> Path: # Journal store (CR-14: durable account-of-action)
"""Journal store path — the precious, host-written observability record."""
return self.data_dir / "journal.db"
@property
def diagnostics_db_path(self) -> Path: # Diagnostics store (CR-14: disposable narrative)
"""Diagnostics store path — worker records + raw stream chunks; retention-managed."""
return self.data_dir / "diagnostics.db"
@property
def conda_binary_path(self) -> Optional[Path]: # Path to conda/micromamba binary or None
"""Get the configured binary path for the current platform."""
# Inline platform detection to avoid circular imports
system = platform_mod.system().lower()
machine = platform_mod.machine().lower()
...
Variables
_current_config: Optional[CJMConfig] = None
Capability Config Store (config_store.ipynb)
Persistent storage for per-capability configuration (with enabled flag)
Import
from cjm_substrate.core.config_store import (
CapabilityConfigRecord,
CapabilityConfigStore,
LocalCapabilityConfigStore
)
Functions
def _default_db_path() -> Path:
"""Default SQLite location: `~/.cjm/capability_configs.db`."""
return Path.home() / ".cjm" / "capability_configs.db"
@patch
@contextmanager
def _conn(self:LocalCapabilityConfigStore) -> Iterator[sqlite3.Connection]:
"""Open a connection, creating parent dirs + schema on demand."""
self.db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(self.db_path)
...
@patch
def get(
self:LocalCapabilityConfigStore,
capability_name: str # Capability to look up
) -> Optional[CapabilityConfigRecord]: # Persisted record or None if absent
"Fetch the record for a capability."
@patch
def set(
self:LocalCapabilityConfigStore,
capability_name: str, # Capability to write
record: CapabilityConfigRecord # New record (updated_at overwritten with current time)
) -> None
"Persist a record. Stamps `updated_at` to the current time."
@patch
def delete(
self:LocalCapabilityConfigStore,
capability_name: str # Capability to remove
) -> bool: # True if a row was deleted
"Remove the record for a capability."
@patch
def list_all(self:LocalCapabilityConfigStore) -> Dict[str, CapabilityConfigRecord]: # capability_name -> record
"""Return all stored records keyed by capability name."""
...
Classes
@dataclass
class CapabilityConfigRecord:
"Persisted state for a capability: config dict + enabled flag."
config: Dict[str, Any] = field(...) # Capability's current config values
enabled: bool = True # Whether the substrate should accept jobs for this capability
updated_at: float = 0.0 # Unix timestamp of the last write (server clock)
@runtime_checkable
class CapabilityConfigStore(Protocol):
"Protocol for persisting per-capability `CapabilityConfigRecord` across sessions."
def get(self, capability_name: str) -> Optional[CapabilityConfigRecord]:
"""Fetch the record for a capability, or None if no record exists yet."""
...
def set(self, capability_name: str, record: CapabilityConfigRecord) -> None:
"""Persist a record. Overwrites any prior record for the same capability.
Implementations stamp `record.updated_at` to the current time during
the write so callers don't have to manage timestamps.
"""
...
def delete(self, capability_name: str) -> bool:
"""Remove the record for a capability. Returns True if a record was deleted."""
...
def list_all(self) -> Dict[str, CapabilityConfigRecord]:
"""Return every stored record, keyed by capability name."""
...
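Because `CapabilityConfigStore` is a `runtime_checkable` Protocol, a host can plug in its own backend without inheriting from anything. A minimal in-memory sketch; the record and Protocol are re-declared in trimmed form so the block runs standalone, and `InMemoryConfigStore` is hypothetical.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Protocol, runtime_checkable

@dataclass
class CapabilityConfigRecord:
    config: Dict[str, Any] = field(default_factory=dict)
    enabled: bool = True
    updated_at: float = 0.0

@runtime_checkable
class CapabilityConfigStore(Protocol):
    def get(self, capability_name: str) -> Optional[CapabilityConfigRecord]: ...
    def set(self, capability_name: str, record: CapabilityConfigRecord) -> None: ...
    def delete(self, capability_name: str) -> bool: ...
    def list_all(self) -> Dict[str, CapabilityConfigRecord]: ...

class InMemoryConfigStore:
    """Hypothetical test double: satisfies the Protocol structurally."""

    def __init__(self) -> None:
        self._rows: Dict[str, CapabilityConfigRecord] = {}

    def get(self, capability_name: str) -> Optional[CapabilityConfigRecord]:
        return self._rows.get(capability_name)

    def set(self, capability_name: str, record: CapabilityConfigRecord) -> None:
        record.updated_at = time.time()  # the store stamps the write time, not the caller
        self._rows[capability_name] = record

    def delete(self, capability_name: str) -> bool:
        return self._rows.pop(capability_name, None) is not None

    def list_all(self) -> Dict[str, CapabilityConfigRecord]:
        return dict(self._rows)
```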
class LocalCapabilityConfigStore:
"""
SQLite-backed default implementation of `CapabilityConfigStore`.
The DB is created lazily on first write. Reads against a non-existent DB
return empty results rather than raising, so hosts can call `.get()` on
a fresh install without preparing the file first.
"""
def __init__(self, db_path: Optional[Path] = None)
"Initialize the store. `db_path=None` uses `~/.cjm/capability_configs.db`."
Variables
_SCHEMA = '\nCREATE TABLE IF NOT EXISTS capability_configs (\n capability_name TEXT PRIMARY KEY,\n config_json TEXT NOT NULL,\n enabled INTEGER NOT NULL DEFAULT 1,\n updated_at REAL NOT NULL\n)\n'
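How a store might use the `_SCHEMA` above: an upsert keyed on `capability_name` that stamps `updated_at`, plus a read that returns nothing, without creating the file, when the DB does not exist yet. This is a sketch against the shown schema, not the `LocalCapabilityConfigStore` source; JSON-encoding `config` is inferred from the `config_json` column, and the helper names are hypothetical. `ON CONFLICT ... DO UPDATE` needs SQLite 3.24 or newer.

```python
import json
import sqlite3
import time
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

SCHEMA = """
CREATE TABLE IF NOT EXISTS capability_configs (
    capability_name TEXT PRIMARY KEY,
    config_json TEXT NOT NULL,
    enabled INTEGER NOT NULL DEFAULT 1,
    updated_at REAL NOT NULL
)
"""

def put_config(db_path: Path, name: str, config: Dict[str, Any], enabled: bool = True) -> None:
    db_path.parent.mkdir(parents=True, exist_ok=True)  # lazy creation on first write
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(SCHEMA)
            conn.execute(
                "INSERT INTO capability_configs (capability_name, config_json, enabled, updated_at) "
                "VALUES (?, ?, ?, ?) ON CONFLICT(capability_name) DO UPDATE SET "
                "config_json = excluded.config_json, enabled = excluded.enabled, "
                "updated_at = excluded.updated_at",
                (name, json.dumps(config), int(enabled), time.time()),
            )
    finally:
        conn.close()

def get_config_row(db_path: Path, name: str) -> Optional[Tuple[Dict[str, Any], bool]]:
    if not db_path.exists():
        return None  # fresh install: read without creating the file
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute(
            "SELECT config_json, enabled FROM capability_configs WHERE capability_name = ?",
            (name,),
        ).fetchone()
    finally:
        conn.close()
    return (json.loads(row[0]), bool(row[1])) if row else None
```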
Diagnostics Store (diagnostics_store.ipynb)
CR-14 (stage 7): the disposable diagnostic-narrative class. Worker-written structured log records (the substrate handler stamps contextvars identity; authors never supply attribution) + the host-pumped raw stream chunks (the zero-cooperation death-rattle floor). Retention is a QUERY, not file mechanics. Design ledger: `claude-docs/stage-7-evidence.md`.
Import
from cjm_substrate.core.diagnostics_store import (
DiagnosticRecord,
StreamChunk,
DiagnosticsStore,
LocalDiagnosticsStore,
DiagnosticsLogHandler,
install_worker_diagnostics,
normalize_stream_line
)
Functions
@patch
@contextmanager
def _conn(self: LocalDiagnosticsStore) -> Iterator[sqlite3.Connection]:
"""Yield the persistent connection under the instance lock (lazy init:
parent dirs + WAL + schema on first use).
Same shape + rationale as `LocalJournalStore._conn` (stage-7 stress
catch: per-call close = WAL checkpoint = ~16 ms/append). Disposable
class on the WORKER hot path — `synchronous=NORMAL` is plenty.
"""
with self._lock:
    ...
@patch
def append_record(
self: LocalDiagnosticsStore,
record: DiagnosticRecord, # Structured record to persist
) -> int: # Store-assigned seq
"Persist one structured record."
@patch
def append_chunk(
self: LocalDiagnosticsStore,
chunk: StreamChunk, # Raw stream line to persist
) -> int: # Store-assigned seq
"Persist one raw stream line."
@patch
def query_records(
self: LocalDiagnosticsStore,
job_id: Optional[str] = None, # EXACT job correlation (stamped at write)
worker_session_id: Optional[str] = None, # Session scope
level: Optional[str] = None, # Level name filter
after_seq: Optional[int] = None, # Tail cursor
limit: Optional[int] = None, # Max rows
descending: bool = False, # True = newest first
) -> List[DiagnosticRecord]: # Matching records, seq-ordered
"Filtered structured-record read."
@patch
def query_chunks(
self: LocalDiagnosticsStore,
worker_session_id: Optional[str] = None, # Session scope
after_seq: Optional[int] = None, # Tail cursor
limit: Optional[int] = None, # Max rows
descending: bool = False, # True = newest first
) -> List[StreamChunk]: # Matching chunks, seq-ordered
"Raw stream read, session-scoped."
@patch
def apply_retention(
self: LocalDiagnosticsStore,
max_age_days: Optional[float] = None, # Delete rows older than this
max_total_mb: Optional[float] = None, # Delete oldest rows until DB under budget
) -> Dict[str, int]: # {'records_deleted': n, 'chunks_deleted': m}
"""
Retention as a QUERY (the CR-14 reframe's mechanical payoff).
Age first, then size: the oldest rows (across both tables, interleaved by ts)
are deleted in batches until the DB file is under budget. Safe against
concurrent writers (WAL; each batch is its own transaction).
"""
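A sketch of retention-as-a-query over the `records` and `stream_chunks` tables from `_DIAGNOSTICS_SCHEMA`, assuming `ts` is stored as UTC ISO-8601 text so string comparison orders correctly. Measuring the budget as non-free pages (`page_count - freelist_count`) is an assumption: deleted rows free pages without shrinking the file until a VACUUM, and the real method may measure differently. `apply_retention_sketch` is hypothetical.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

def _used_bytes(conn: sqlite3.Connection) -> int:
    # Pages actually holding data; freed pages stay in the file until VACUUM.
    page_count = conn.execute("PRAGMA page_count").fetchone()[0]
    free = conn.execute("PRAGMA freelist_count").fetchone()[0]
    page_size = conn.execute("PRAGMA page_size").fetchone()[0]
    return (page_count - free) * page_size

def apply_retention_sketch(
    conn: sqlite3.Connection,
    max_age_days: Optional[float] = None,
    max_total_mb: Optional[float] = None,
    batch: int = 500,
) -> Dict[str, int]:
    deleted = {"records_deleted": 0, "chunks_deleted": 0}
    tables = (("records", "records_deleted"), ("stream_chunks", "chunks_deleted"))
    if max_age_days is not None:  # age pass first
        cutoff = (datetime.now(timezone.utc) - timedelta(days=max_age_days)).isoformat()
        for table, key in tables:  # table names come from the fixed tuple above
            with conn:
                cur = conn.execute(f"DELETE FROM {table} WHERE ts < ?", (cutoff,))
            deleted[key] += cur.rowcount
    if max_total_mb is not None:  # then size: oldest rows across both tables
        budget = int(max_total_mb * 1024 * 1024)
        while _used_bytes(conn) > budget:
            rows = conn.execute(
                "SELECT 'records', seq, ts FROM records UNION ALL "
                "SELECT 'stream_chunks', seq, ts FROM stream_chunks ORDER BY ts LIMIT ?",
                (batch,),
            ).fetchall()
            if not rows:
                break  # nothing left to delete; schema pages alone exceed the budget
            with conn:  # each batch is its own transaction
                for table, seq, _ts in rows:
                    conn.execute(f"DELETE FROM {table} WHERE seq = ?", (seq,))
                    deleted["records_deleted" if table == "records" else "chunks_deleted"] += 1
    return deleted
```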
def install_worker_diagnostics() -> Optional[DiagnosticsLogHandler]:
"""Configure worker-process logging (replaces the old `basicConfig`).
Env contract (injected by the proxy at spawn):
- `CJM_DIAGNOSTICS_DB`: diagnostics store path -> install the handler.
- `CJM_WORKER_SESSION_ID`: spawn-scoped session id stamped on records.
- `CJM_LOG_LEVEL`: operator level control (default INFO) — the old
worker hardcoded INFO with no surface.
Without `CJM_DIAGNOSTICS_DB` (standalone/dev import) falls back to the
pre-CR-14 stdout `basicConfig` so nothing changes for direct runs.
Returns the installed handler (None on fallback).
"""
level_name = os.environ.get("CJM_LOG_LEVEL", "INFO").upper()
level = getattr(logging, level_name, logging.INFO)
db_path = os.environ.get("CJM_DIAGNOSTICS_DB")
if not db_path:
    ...
def normalize_stream_line(
raw: str, # One decoded line (may contain \r progress frames)
) -> Optional[str]: # Final frame, or None when nothing durable remains
"Collapse CR progress frames to the final frame; drop empty results."
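A sketch of the collapse rule: split on carriage returns, keep the last frame that is not blank, and return `None` when nothing durable remains. Treating whitespace-only frames as empty is an assumption, and `normalize_stream_line_sketch` is a hypothetical name.

```python
from typing import Optional

def normalize_stream_line_sketch(raw: str) -> Optional[str]:
    """Collapse carriage-return progress frames to the final one; None if nothing remains."""
    # tqdm-style bars redraw with a carriage return before each frame,
    # so only the last non-blank frame carries durable information.
    frames = [frame for frame in raw.rstrip("\r\n").split("\r") if frame.strip()]
    return frames[-1] if frames else None
```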
Classes
@dataclass
class DiagnosticRecord:
"One structured worker log record (CR-14 diagnostics class)."
message: str # record.getMessage() result
level: str = 'INFO' # Logging level name
logger_name: str = '' # Logger hierarchy name (restored — flat logs dropped it)
ts: datetime = field(...) # tz-aware UTC
worker_session_id: Optional[str] # Spawn-scoped session
job_id: Optional[str] # EXACT correlation via contextvars (None outside a call span)
exc_text: Optional[str] # Formatted traceback when the record carried exc_info
seq: Optional[int] # Store-assigned cursor
@dataclass
class StreamChunk:
"One raw stdout/stderr line the host pump captured (death-rattle floor)."
content: str # Decoded line content (tqdm CR-frames collapsed to final frame)
ts: datetime = field(...) # Capture time (host clock)
worker_session_id: Optional[str] # Session attribution (the honest unit)
stream: str = 'stdout' # Source stream (stderr merged into stdout today)
seq: Optional[int] # Store-assigned cursor
@runtime_checkable
class DiagnosticsStore(Protocol):
"Protocol for the disposable diagnostic-narrative store (CR-14)."
def append_record(self, record: DiagnosticRecord) -> int:
"""Persist one structured record; returns seq."""
...
def append_chunk(self, chunk: StreamChunk) -> int:
"""Persist one raw stream line; returns seq."""
...
def query_records(
self,
job_id: Optional[str] = None,
worker_session_id: Optional[str] = None,
level: Optional[str] = None,
after_seq: Optional[int] = None,
limit: Optional[int] = None,
descending: bool = False,
) -> List[DiagnosticRecord]
"Filtered structured-record read; `job_id` is EXACT (stamped, not sliced)."
def query_chunks(
self,
worker_session_id: Optional[str] = None,
after_seq: Optional[int] = None,
limit: Optional[int] = None,
descending: bool = False,
) -> List[StreamChunk]
"Raw stream read, session-scoped."
def apply_retention(
self,
max_age_days: Optional[float] = None,
max_total_mb: Optional[float] = None,
) -> Dict[str, int]
"Delete old rows by age and/or size budget; returns deleted counts."
class LocalDiagnosticsStore:
"""
SQLite-backed default `DiagnosticsStore` (CR-14).
Many concurrent writers (workers + the host pump) -> WAL +
busy_timeout + one persistent connection guarded by an instance lock
(safe from any thread). Disposable class: retention deletes are routine.
"""
def __init__(self, db_path: Optional[Path] = None):
"""`db_path=None` uses `~/.cjm/diagnostics.db`; workers receive the
host's path via the `CJM_DIAGNOSTICS_DB` env var at spawn."""
self.db_path = Path(db_path) if db_path is not None else Path.home() / ".cjm" / "diagnostics.db"
# Persistent lock-protected connection (stage-7 stress part-1 catch;
# see LocalJournalStore._conn): per-call open/close paid a WAL
...
class DiagnosticsLogHandler:
"""
Worker-side logging handler writing `DiagnosticRecord`s (CR-14).
Thread-safe via the store's own locking (the worker runs capability execute
in an executor thread; contextvars propagate via copy_context at the
endpoint). Never raises into application code.
"""
def __init__(
self,
store: DiagnosticsStore, # Sink (LocalDiagnosticsStore in-process)
worker_session_id: Optional[str] = None, # Spawn-scoped session id
)
def emit(self, record: logging.LogRecord) -> None:
"""Write one record; job identity from the call-envelope contextvar."""
...
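The identity-stamping pattern can be sketched with a standard `logging.Handler` and a `ContextVar`. `current_job_id` and `SinkLogHandler` are hypothetical names (the real call-envelope contextvar is not shown), and the sink here is a plain callable rather than a `DiagnosticsStore`.

```python
import logging
from contextvars import ContextVar
from typing import Any, Callable, Dict, Optional

# Hypothetical name: the real call-envelope contextvar is not shown in this listing.
current_job_id: ContextVar[Optional[str]] = ContextVar("current_job_id", default=None)

class SinkLogHandler(logging.Handler):
    """Stamps session + job identity onto every record; log authors never pass it."""

    def __init__(self, sink: Callable[[Dict[str, Any]], None], worker_session_id: Optional[str] = None):
        super().__init__()
        self.sink = sink
        self.worker_session_id = worker_session_id

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self.sink({
                "message": record.getMessage(),
                "level": record.levelname,
                "logger_name": record.name,
                "worker_session_id": self.worker_session_id,
                "job_id": current_job_id.get(),  # None outside a call span
                "exc_text": logging.Formatter().formatException(record.exc_info) if record.exc_info else None,
            })
        except Exception:
            self.handleError(record)  # report via logging's error path; never raise into app code
```

For the executor-thread case, submitting the call through a copied context (for example `ctx = contextvars.copy_context()` followed by `executor.submit(ctx.run, fn)`) carries the caller's `current_job_id` value into the worker thread.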
Variables
_DIAGNOSTICS_SCHEMA = "\nCREATE TABLE IF NOT EXISTS records (\n seq INTEGER PRIMARY KEY AUTOINCREMENT,\n ts TEXT NOT NULL,\n worker_session_id TEXT,\n job_id TEXT,\n level TEXT NOT NULL DEFAULT 'INFO',\n logger_name TEXT NOT NULL DEFAULT '',\n message TEXT NOT NULL,\n exc_text TEXT\n);\nCREATE INDEX IF NOT EXISTS idx_records_job ON records (job_id);\nCREATE INDEX IF NOT EXISTS idx_records_session ON records (worker_session_id);\nCREATE INDEX IF NOT EXISTS idx_records_ts ON records (ts);\nCREATE TABLE IF NOT EXISTS stream_chunks (\n seq INTEGER PRIMARY KEY AUTOINCREMENT,\n ts TEXT NOT NULL,\n worker_session_id TEXT,\n stream TEXT NOT NULL DEFAULT 'stdout',\n content TEXT NOT NULL\n);\nCREATE INDEX IF NOT EXISTS idx_chunks_session ON stream_chunks (worker_session_id);\nCREATE INDEX IF NOT EXISTS idx_chunks_ts ON stream_chunks (ts);\n"
Empirical Resource Tracking (empirical_store.ipynb)
Persistent store for empirically-observed resource usage per (instance_id, config_hash) pair. CR-7's data foundation: `record_sample` is called from `CapabilityManager.execute_capability*` finally blocks; aggregates feed eviction-candidate selection + future UI hints + cost-aware retry decisions.
Import
from cjm_substrate.core.empirical_store import (
compute_config_hash,
ResourceSample,
EmpiricalResourceRecord,
EmpiricalResourceStore,
LocalEmpiricalResourceStore
)
Functions
def compute_config_hash(...)
"""
CR-7: hash a capability instance's effective config for empirical-record keying.
Same canonicalization as CR-8's `compute_config_schema_hash` — sorted keys,
no whitespace, `"sha256:hex"` shape. None / empty configs hash deterministically
to the canonical-empty value so capabilities with no config still get a single
record per instance rather than scattering across hash-of-None edge cases.
"""
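A sketch of the canonicalization described above. That `None` and `{}` both hash the JSON text `{}` is an assumption about what the canonical-empty value is; `config_hash_sketch` is a hypothetical name.

```python
import hashlib
import json
from typing import Any, Mapping, Optional

def config_hash_sketch(config: Optional[Mapping[str, Any]]) -> str:
    """sha256 over canonical JSON (sorted keys, no whitespace); None and {} share one hash."""
    canonical = json.dumps(dict(config or {}), sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```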
def _default_db_path() -> Path:
"""Default SQLite location: `~/.cjm/empirical_resources.db`.
Hosts using per-project `data_dir` (the intended pattern per CR-8 cascade_manifests
docs) override this by passing `db_path=cfg.data_dir / "empirical_resources.db"`
when constructing the store. CapabilityManager's lazy-init does this automatically.
"""
return Path.home() / ".cjm" / "empirical_resources.db"
@patch
@contextmanager
def _conn(self:LocalEmpiricalResourceStore) -> Iterator[sqlite3.Connection]:
"""Open a connection, creating parent dirs + schema on demand."""
self.db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(self.db_path)
...
@patch
def record_sample(
self:LocalEmpiricalResourceStore,
instance_id: str, # CapabilityInstance.instance_id
capability_name: str, # CapabilityInstance.capability_name (denormalized for filtering)
config_hash: str, # compute_config_hash(inst.config)
sample: ResourceSample, # One observation
) -> None
"""
Fold a sample into the running aggregate. Creates a new row on first call.
Welford update for each mean. Max-of-peaks for memory metrics.
success_count incremented by 1 if sample.success else 0.
"""
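The aggregation step, reduced to two metrics for brevity. `Aggregate` is a hypothetical stand-in for one `empirical_resources` row; the incremental update is the running-mean part of Welford's algorithm, so no raw samples are kept. Returning 0.0 for the success rate of an empty aggregate is an assumption.

```python
from dataclasses import dataclass

@dataclass
class Aggregate:
    sample_count: int = 0
    success_count: int = 0
    duration_seconds_mean: float = 0.0
    memory_mb_peak_mean: float = 0.0
    memory_mb_peak_max: float = 0.0

    def fold(self, duration_seconds: float, memory_mb_peak: float, success: bool) -> None:
        self.sample_count += 1
        n = self.sample_count
        # Running mean: mean_n = mean_{n-1} + (x - mean_{n-1}) / n
        self.duration_seconds_mean += (duration_seconds - self.duration_seconds_mean) / n
        self.memory_mb_peak_mean += (memory_mb_peak - self.memory_mb_peak_mean) / n
        # Max-of-peaks: keep the worst observation.
        self.memory_mb_peak_max = max(self.memory_mb_peak_max, memory_mb_peak)
        self.success_count += 1 if success else 0

    @property
    def success_rate(self) -> float:
        # Computed at read time, as in the store's docstring.
        return self.success_count / self.sample_count if self.sample_count else 0.0
```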
@patch
def get_record(
self:LocalEmpiricalResourceStore,
instance_id: str,
config_hash: str,
) -> Optional[EmpiricalResourceRecord]
"Fetch the aggregated record for (instance_id, config_hash), or None."
@patch
def list_records(
self:LocalEmpiricalResourceStore,
capability_name: Optional[str] = None,
) -> List[EmpiricalResourceRecord]
"List all records, optionally filtered to a capability."
@patch
def delete_record(
self:LocalEmpiricalResourceStore,
instance_id: str,
config_hash: str,
) -> bool
"Remove a record. Returns True if a row was deleted."
Classes
class ResourceSample:
"""
Single observation captured after an execute call completes.
Frozen — substrate aggregates online via Welford's algorithm; no need to
keep raw samples around. `observed_at` is tz-aware per the CR-5 convention.
"""
@dataclass
class EmpiricalResourceRecord:
"Aggregated empirical resource profile for a (instance_id, config_hash) pair."
instance_id: str # CapabilityInstance.instance_id (CR-10 multi-instance aware)
capability_name: str # Convenience: CapabilityInstance.capability_name; derivable but cheap to denormalize
config_hash: str # compute_config_hash(inst.config) at sample time
sample_count: int # Number of ResourceSamples folded into this record
cpu_percent_mean: float # Welford running mean of cpu_percent
memory_mb_peak_max: float # max(sample.memory_mb_peak over all samples) — worst observed
memory_mb_peak_mean: float # Welford running mean of sample.memory_mb_peak
gpu_memory_mb_peak_max: float # max(sample.gpu_memory_mb_peak over all samples)
gpu_memory_mb_peak_mean: float # Welford running mean of sample.gpu_memory_mb_peak
duration_seconds_mean: float # Welford running mean of sample.duration_seconds
success_rate: float # success_count / sample_count
last_observed: datetime # tz-aware; tracks most recent ResourceSample.observed_at
api_usage_totals: Dict[str, float] = field(...) # SG-54: cumulative per-unit usage summed across runs (tokens/credits/pages/...); {} for compute-only capabilities
@runtime_checkable
class EmpiricalResourceStore(Protocol):
"""
Protocol for persisting empirically-observed resource usage.
Implementations aggregate online (Welford for means, max-of-peaks for memory).
No raw-sample retention required — v1 is one row per (instance_id, config_hash)
pair with running aggregates. A future implementation can add a samples table
if time-series queries become necessary.
"""
def record_sample(
self,
instance_id: str,
capability_name: str,
config_hash: str,
sample: ResourceSample,
) -> None
"Fold a sample into the running aggregate. Creates a new record on first call."
def get_record(
self,
instance_id: str,
config_hash: str,
) -> Optional[EmpiricalResourceRecord]
"Fetch the aggregated record for (instance_id, config_hash), or None."
def list_records(
self,
capability_name: Optional[str] = None,
) -> List[EmpiricalResourceRecord]
"List all records, optionally filtered to a single capability_name."
def delete_record(
self,
instance_id: str,
config_hash: str,
) -> bool
"Remove a record. Returns True if a row was deleted."
class LocalEmpiricalResourceStore:
"""
SQLite-backed default implementation of `EmpiricalResourceStore`.
Online Welford aggregation for means; max-of-peaks for memory metrics.
success_rate computed at read time from `success_count / sample_count`.
DB + schema created lazily on first write.
"""
def __init__(self, db_path: Optional[Path] = None):
"""Initialize the store. `db_path=None` uses `~/.cjm/empirical_resources.db`."""
self.db_path = Path(db_path) if db_path is not None else _default_db_path()
@staticmethod
def _row_to_record(row) -> EmpiricalResourceRecord
Variables
_SCHEMA = "\nCREATE TABLE IF NOT EXISTS empirical_resources (\n instance_id TEXT NOT NULL,\n capability_name TEXT NOT NULL,\n config_hash TEXT NOT NULL,\n sample_count INTEGER NOT NULL DEFAULT 0,\n success_count INTEGER NOT NULL DEFAULT 0,\n cpu_percent_mean REAL NOT NULL DEFAULT 0.0,\n memory_mb_peak_max REAL NOT NULL DEFAULT 0.0,\n memory_mb_peak_mean REAL NOT NULL DEFAULT 0.0,\n gpu_memory_mb_peak_max REAL NOT NULL DEFAULT 0.0,\n gpu_memory_mb_peak_mean REAL NOT NULL DEFAULT 0.0,\n duration_seconds_mean REAL NOT NULL DEFAULT 0.0,\n last_observed TEXT NOT NULL,\n api_usage_totals TEXT NOT NULL DEFAULT '{}',\n PRIMARY KEY (instance_id, config_hash)\n)\n"
Capability Error Taxonomy (errors.ipynb)
Typed exception hierarchy + JobError dataclass + default classification of bare Python exceptions. The substrate’s CR-5 implementation per the 2026-05-19 substrate audit.
Import
from cjm_substrate.core.errors import (
CapabilityError,
CapabilityInputError,
CapabilityTransientError,
CapabilityResourceError,
CapabilityFatalError,
CapabilityDisabledError,
CapabilityNotLoadedError,
CapabilityTimeoutError,
CapabilityCancelledError,
WorkerOOMError,
CapabilityConfigError,
ResourceShortfall,
TracebackPolicy,
JobError,
classify_exception,
map_bare_exception_to_job_error
)
Functions
def classify_exception(
exc: BaseException # The exception to classify
) -> "Literal['user_input', 'transient', 'resource', 'fatal']": # Category
"""
Return the substrate category for any exception.
CapabilityError subclasses report their own declared `category`. Bare Python
exceptions are mapped via `__mro__` walk against `_BARE_EXCEPTION_CATEGORY_MAP`;
the first ancestor in the table wins. Unrecognized exceptions classify as
`fatal` (don't auto-retry the unknown).
"""
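The MRO walk described above can be sketched with the standard library alone. The category table below is hypothetical (the real `_BARE_EXCEPTION_CATEGORY_MAP` contents are not listed in this reference), and `CapabilityError` here is a local stand-in:

```python
from typing import Dict, Literal

Category = Literal["user_input", "transient", "resource", "fatal"]

# Hypothetical table for illustration; not the package's actual mapping.
BARE_EXCEPTION_CATEGORY_MAP: Dict[type, Category] = {
    FileNotFoundError: "user_input",
    ValueError: "user_input",
    OSError: "transient",
    MemoryError: "resource",
}

class CapabilityError(Exception):
    """Local stand-in: subclasses declare their own category."""
    category: Category = "fatal"

def classify_exception(exc: BaseException) -> Category:
    if isinstance(exc, CapabilityError):
        return exc.category  # typed errors report their declared category
    for cls in type(exc).__mro__:  # most-derived class first
        if cls in BARE_EXCEPTION_CATEGORY_MAP:
            return BARE_EXCEPTION_CATEGORY_MAP[cls]  # first ancestor in the table wins
    return "fatal"  # unrecognized: don't auto-retry the unknown
```

Because the walk stops at the most-derived match, `FileNotFoundError` lands in `user_input` even though its ancestor `OSError` maps to `transient` in this table.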
def map_bare_exception_to_job_error(
exc: BaseException, # The raised exception
*,
capability_name: Optional[str] = None, # Name of the capability that raised
capability_instance_id: Optional[str] = None, # Per CR-10
traceback_policy: TracebackPolicy = TracebackPolicy.FULL, # How much detail to record
occurred_at: Optional[datetime] = None, # Override; defaults to datetime.now(timezone.utc)
) -> JobError
"""
Convert any exception into a structured `JobError`.
CapabilityError subclasses contribute their category-specific structured data
(`fields_invalid` for input errors, `resource_shortfall` for resource errors,
`retry_after_seconds` for transient errors). Bare exceptions get the
default category-based retriable flag and no structured side-channel.
"""
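A sketch of the conversion, assuming default retriable flags per category (transient and resource retriable, user_input and fatal not; the actual `_CATEGORY_RETRIABLE_DEFAULTS` values are not shown here) and using a boolean in place of `TracebackPolicy`. `JobErrorSketch` is a hypothetical, trimmed-down stand-in for `JobError`:

```python
import traceback
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional

# Assumed defaults for illustration only.
RETRIABLE_DEFAULTS = {"user_input": False, "transient": True, "resource": True, "fatal": False}

@dataclass
class JobErrorSketch:
    """Hypothetical, trimmed-down stand-in for JobError."""
    category: str
    message: str
    retriable: bool
    original_exc_repr: str
    traceback: Optional[str]
    retry_after_seconds: Optional[float]
    fields_invalid: Optional[List[str]]
    occurred_at: datetime

def to_job_error(exc: BaseException, category: str, *, include_traceback: bool = True) -> JobErrorSketch:
    tb = None
    if include_traceback:
        tb = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    return JobErrorSketch(
        category=category,
        message=str(exc),
        retriable=RETRIABLE_DEFAULTS[category],
        original_exc_repr=repr(exc),
        traceback=tb,
        # Typed errors contribute side-channel data; bare exceptions lack these attributes.
        retry_after_seconds=getattr(exc, "retry_after_seconds", None),
        fields_invalid=getattr(exc, "fields_invalid", None),
        occurred_at=datetime.now(timezone.utc),
    )
```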
Classes
class CapabilityError(Exception):
"""
Base for substrate-recognized capability exceptions.
Subclasses declare a `category` and `default_retriable` ClassVar so the
JobQueue + scheduler can route the failure without sniffing exception
text. Bare Python exceptions raised by capability code go through
`map_bare_exception_to_job_error` to acquire a default category.
"""
class CapabilityInputError:
def __init__(
self,
message: str, # Human-readable description
*,
fields_invalid: Optional[List[str]] = None, # Names of inputs that failed validation
)
"""
User-fixable error: bad config, invalid argument, missing file.
Like the other category bases (`CapabilityTransientError`,
`CapabilityResourceError`, `CapabilityFatalError`), it extends only
`CapabilityError`; the right reader intent is `except CapabilityInputError:`
(or the broader `except CapabilityError:`).
"""
class CapabilityTransientError:
def __init__(
self,
message: str, # Human-readable description
*,
retry_after_seconds: Optional[float] = None, # Hint for backoff strategies
)
"""
Temporary failure: timeout, network blip, brief resource contention.
Substrate / JobQueue may retry on its own initiative. Capability authors raise
this when they know the failure is recoverable.
"""
class CapabilityResourceError:
def __init__(
self,
message: str, # Human-readable description
*,
resource_shortfall: Optional["ResourceShortfall"] = None, # Quantitative gap
)
"""
Resource exhaustion: GPU VRAM, system RAM, disk full.
JobQueue's reactive-eviction flow (CR-7) routes resource errors to retry
after attempting to free the named resource. Capability authors set
`resource_shortfall` so the substrate knows what to evict.
"""
class CapabilityFatalError(CapabilityError):
"""
Bug / irrecoverable state. The capability cannot complete this job; retrying won't help.
Capability authors raise this when they know the failure is permanent for the
given inputs. The substrate does NOT retry fatal errors.
"""
class CapabilityDisabledError:
def __init__(self, capability_name: str)
"""
JobQueue / execute_capability rejected: the capability is currently disabled.
User-fixable (re-enable the capability). Raised by CR-2's enable/disable
wiring once that lands.
"""
class CapabilityNotLoadedError:
def __init__(self, capability_name: str)
"""
Caller submitted to a capability that was never loaded.
Fatal category because this is a programmer / orchestration bug, not a
user-fixable condition. The right reader intent is
`except CapabilityNotLoadedError:` (or the broader `except CapabilityError:`).
"""
class CapabilityTimeoutError:
def __init__(
self,
capability_name: str,
timeout_seconds: float,
*,
retry_after_seconds: Optional[float] = None,
)
"""
A per-job timeout fired before the capability finished.
Transient category — retry may succeed if the slow operation completes faster
next time. Carries `retry_after_seconds` from `CapabilityTransientError`.
Raised by SG-14's per-job timeout primitive when that lands.
"""
class CapabilityCancelledError:
def __init__(self, capability_name: str)
"""
Cooperative cancellation signal raised from `ToolCapability.check_cancel()`.
Anchors under `CapabilityTransientError` because cancellation is in-principle
re-runnable — a future attempt with the same inputs won't auto-fail if the
cancel flag isn't set. But `default_retriable` is False: cancellation was
a deliberate operator action, so the substrate should NOT auto-retry.
Job-monitor / JobQueue render cancelled jobs with their own state
(separate from "failed"); the JobError category remains `transient` so
consumers reading the typed taxonomy can group recoverable signals.
Capability authors raise this implicitly via `self.check_cancel()` inside
`execute()`; substrate sets the underlying `_cancel_requested` flag via
`cancel()`. See CR-4's cancellation primitives for the cooperative-cancel
protocol.
"""
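Cooperative cancellation means the capability polls a flag at safe points; nothing interrupts it mid-operation. A minimal sketch of the `cancel()` / `check_cancel()` pairing, assuming a `threading.Event` as the `_cancel_requested` flag; `SketchCapability` and `CancelledSketchError` are hypothetical stand-ins for `ToolCapability` and `CapabilityCancelledError`:

```python
import threading
from typing import List

class CancelledSketchError(Exception):
    """Hypothetical stand-in for CapabilityCancelledError."""
    def __init__(self, capability_name: str):
        super().__init__(f"{capability_name}: cancelled by request")
        self.capability_name = capability_name

class SketchCapability:
    """Hypothetical stand-in for a ToolCapability subclass."""
    name = "sketch"

    def __init__(self) -> None:
        self._cancel_requested = threading.Event()  # assumption: an Event as the flag

    def cancel(self) -> None:
        # Called by the substrate (possibly from another thread); only sets the flag.
        self._cancel_requested.set()

    def check_cancel(self) -> None:
        # Called by the capability at safe points; raises if a cancel was requested.
        if self._cancel_requested.is_set():
            raise CancelledSketchError(self.name)

    def execute(self, items: List[int]) -> List[int]:
        results = []
        for item in items:
            self.check_cancel()  # poll between units of work
            results.append(item * 2)
        return results
```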
class WorkerOOMError:
def __init__(
self,
capability_name: str,
*,
process_returncode: Optional[int] = None,
message: Optional[str] = None,
)
"""
The worker subprocess died with a kill-signal during an active execute call.
CR-7 Track A — substrate-side OOM detection: when an HTTP call to the worker
faults and the subprocess has died with `returncode == -signal.SIGKILL` (or
the platform equivalent), the substrate raises this. The kernel OOM-killer
is the most common cause of SIGKILL during normal execute paths, so the
substrate treats SIGKILL-during-call as "assume OOM" and surfaces a typed
resource error for the reactive retry path.
`resource_shortfall` is `None` for Track A — the substrate only saw "worker
died from kill-signal" and has no per-resource needed/available numbers.
Track B (per SG-47's sub-task: capability-side wrapping of `torch.cuda.OutOfMemoryError`
et al.) raises `CapabilityResourceError` directly with a populated
`ResourceShortfall` because the capability had the context. Both land at the
same `except CapabilityResourceError` site in CR-7's reactive retry loop.
`process_returncode` carries the observed exit code for debugging /
classification (e.g. operators can distinguish kernel-OOM SIGKILL from
other signals if they read it). Defaults to `None` for callers that don't
have it on hand.
"""
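Track A's detection rule reduces to one comparison on the subprocess exit status: on POSIX, `subprocess.Popen.returncode` is `-N` when the child was terminated by signal `N`. A sketch with a hypothetical `died_from_sigkill` helper and a demo that kills a child process itself, standing in for the kernel OOM-killer:

```python
import signal
import subprocess
import sys
from typing import Optional

def died_from_sigkill(returncode: Optional[int]) -> bool:
    """True when a POSIX child was terminated by SIGKILL (Popen reports -signum)."""
    sigkill = getattr(signal, "SIGKILL", None)  # not defined on Windows
    return sigkill is not None and returncode == -sigkill

def demo() -> Optional[int]:
    # Start a child that would sleep, then kill it the way the OOM-killer would.
    proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
    proc.kill()  # SIGKILL on POSIX; TerminateProcess on Windows
    return proc.wait()
```

A SIGKILL sent by an operator's `kill -9` looks identical, which is why the error keeps `process_returncode` and the substrate only *assumes* OOM.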
class CapabilityConfigError:
def __init__(
self,
message: str, # Human-readable description
*,
fields_invalid: Optional[List[str]] = None, # Canonical: list of bad config keys
config_class_name: str = "", # Dataclass / capability name for the schema
)
"""
Unknown / invalid keys in a config dict against a capability's config schema.
Reparented from `cjm_substrate.utils.validation` (Wave 2 / SG-8) under
CR-5. Inherits `CapabilityInputError`'s ValueError MRO automatically.
`config_class_name` is the dataclass / capability name whose schema was violated.
"""
@dataclass
class ResourceShortfall:
"Quantitative gap between what a capability needed and what was available."
resource: Literal['gpu_vram_mb', 'system_ram_mb', 'disk_mb'] # Which resource
needed: float # Amount the capability reported it needed
available: float # Amount actually available when the failure occurred
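A `ResourceShortfall` gives the reactive-eviction path a target size. One plausible way to use it, sketched under loud assumptions: a hypothetical greedy policy that evicts the largest idle instances (by recorded peak, e.g. from the empirical store) until the freed amount covers `needed - available`. The dataclass is copied locally so the block runs on its own; the package's actual eviction policy is not described in this reference.

```python
from dataclasses import dataclass
from typing import Dict, List, Literal

@dataclass
class ResourceShortfall:
    """Local copy of the dataclass above so this block runs standalone."""
    resource: Literal["gpu_vram_mb", "system_ram_mb", "disk_mb"]
    needed: float
    available: float

def pick_evictions(shortfall: ResourceShortfall, idle_peaks: Dict[str, float]) -> List[str]:
    """Hypothetical greedy policy: evict the largest idle instances until the gap closes.

    idle_peaks maps instance_id -> recorded peak of shortfall.resource.
    Returns [] when nothing needs evicting or when evicting everything is not enough.
    """
    gap = shortfall.needed - shortfall.available
    chosen: List[str] = []
    freed = 0.0
    for instance_id, peak in sorted(idle_peaks.items(), key=lambda kv: kv[1], reverse=True):
        if freed >= gap:
            break
        chosen.append(instance_id)
        freed += peak
    return chosen if freed >= gap else []
```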
class TracebackPolicy(str, Enum):
"How much exception detail the substrate records on a JobError."
@dataclass
class JobError:
"""
Structured failure summary recorded on a completed Job.
Populated by the JobQueue when a capability execution fails (CR-6 owns the
population logic; CR-5 owns the shape). Sufficient for UI to render a
failure card + retry affordance without re-running the capability.
"""
category: Literal['user_input', 'transient', 'resource', 'fatal']
message: str # Human-readable error message
retriable: bool # Whether the substrate considers this safe to auto-retry
original_exc_repr: str # repr(exc) of the original exception
traceback: Optional[str] # Full traceback per TracebackPolicy
retry_after_seconds: Optional[float] # Backoff hint from CapabilityTransientError
fields_invalid: Optional[List[str]] # From CapabilityInputError subclasses
resource_shortfall: Optional[ResourceShortfall] # From CapabilityResourceError
capability_name: Optional[str] # Name of the capability that raised
capability_instance_id: Optional[str] # Per CR-10 multi-instance support
occurred_at: Optional[datetime] # When the failure was recorded
Variables
_BARE_EXCEPTION_CATEGORY_MAP: "dict[type, Literal['user_input', 'transient', 'resource', 'fatal']]"
_CATEGORY_RETRIABLE_DEFAULTS: 'dict[str, bool]'
Content Hashing Utilities (hashing.ipynb)
Shared cryptographic hashing primitives for content integrity verification
Import
from cjm_substrate.utils.hashing import (
hash_bytes,
hash_file,
verify_hash,
hash_dict_canonical
)
Functions
def hash_bytes(
content: bytes, # Byte content to hash
algo: str = "sha256" # Hash algorithm name (e.g., "sha256", "sha3_256")
) -> str: # Hash string in "algo:hexdigest" format
"Compute a hash of byte content."
def hash_file(
path: Union[str, Path], # Path to file to hash
algo: str = "sha256", # Hash algorithm name
chunk_size: int = 8192 # Read chunk size in bytes
) -> str: # Hash string in "algo:hexdigest" format
"Stream-hash a file without loading it entirely into memory."
def verify_hash(
content: bytes, # Content to verify
expected: str # Expected hash in "algo:hexdigest" format
) -> bool: # True if content matches expected hash
"Verify content against an expected hash string."
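The three byte/file helpers above can be mirrored with `hashlib` alone. This is a sketch that follows the documented `"algo:hexdigest"` format and signatures, not the package's source; the handling of malformed `expected` strings is an assumption:

```python
import hashlib
from pathlib import Path
from typing import Union

def hash_bytes(content: bytes, algo: str = "sha256") -> str:
    return f"{algo}:{hashlib.new(algo, content).hexdigest()}"

def hash_file(path: Union[str, Path], algo: str = "sha256", chunk_size: int = 8192) -> str:
    h = hashlib.new(algo)
    with open(path, "rb") as f:
        # Read fixed-size chunks so large files never sit fully in memory.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return f"{algo}:{h.hexdigest()}"

def verify_hash(content: bytes, expected: str) -> bool:
    algo, sep, _ = expected.partition(":")
    if not sep:
        return False  # assumption: malformed strings simply fail verification
    try:
        return hash_bytes(content, algo) == expected
    except ValueError:  # hashlib.new rejects unknown algorithm names
        return False
```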
def hash_dict_canonical(
data: Optional[Dict[str, Any]], # Dict to hash (or None — treated as {})
algo: str = "sha256", # Hash algorithm name
) -> str: # Hash string in "algo:hexdigest" format
"""
Hash a dict via canonical JSON encoding.
Canonicalization: `json.dumps(data, sort_keys=True, separators=(",", ":"))`.
Sorted keys eliminate dict-insertion-order variance; minimal separators
eliminate whitespace variance. Result is deterministic across Python
versions and machines.
"""
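The canonicalization rule is fully specified above, so a stdlib sketch is short. Treating `None` as `{}` follows the parameter comment; UTF-8 encoding of the JSON text is an assumption (with `json.dumps`'s default `ensure_ascii=True` the text is pure ASCII anyway):

```python
import hashlib
import json
from typing import Any, Dict, Optional

def hash_dict_canonical(data: Optional[Dict[str, Any]], algo: str = "sha256") -> str:
    # Sorted keys remove insertion-order variance; minimal separators remove whitespace variance.
    canonical = json.dumps(data if data is not None else {}, sort_keys=True, separators=(",", ":"))
    return f"{algo}:{hashlib.new(algo, canonical.encode('utf-8')).hexdigest()}"
```

One consequence worth knowing: `{"x": 1}` and `{"x": 1.0}` hash differently, because `json.dumps` renders them as `1` and `1.0`.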
Journal Store (journal_store.ipynb)
CR-14 (stage 7): the durable account-of-action. One substrate-derived, host-written, never-auto-deleted SQLite store of typed observability events — the operational half of the attempted-vs-happened asymmetry (the graph records what HAPPENED; the journal records what was ATTEMPTED, including everything the graph by design refuses to contain: failures, refusals, retries, admission decisions, worker lifecycle). Design ledger: `claude-docs/stage-7-evidence.md`.
Import
from cjm_substrate.core.journal_store import (
LIVENESS_EVENT_TYPES,
SubstrateEventType,
JournalEvent,
JournalStore,
LocalJournalStore
)
Functions
@patch
@contextmanager
def _conn(self: LocalJournalStore) -> Iterator[sqlite3.Connection]:
"""Yield the persistent connection under the instance lock (lazy init:
parent dirs + WAL + schema on first use).
Stage-7 stress catch: the previous per-call connect/close shape paid a
WAL checkpoint on every close (~16 ms/append — 25x over the design's
latency claim). `synchronous=NORMAL` is the standard WAL pairing
(durable to process crash; an OS/power crash may lose only the most
recent commits — the wedge gate covers append FAILURES, which stay
loud). `check_same_thread=False` + the lock makes any-thread use safe.
"""
with self._lock
@patch
def append(
self: LocalJournalStore,
event: JournalEvent, # Event to persist
) -> int: # Store-assigned seq (cursor)
"""
Persist one event; sets and returns `event.seq`.
LOUD by contract: sqlite errors propagate (the audit trail never
degrades silently — ratified design #13). One tiny WAL INSERT;
synchronous on purpose (G4: the dispatch fast path must stay
predictable; at substrate event volume this is microseconds).
"""
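The `_conn` / `append` pairing above (one persistent connection guarded by a lock, lazily initialized with WAL and `synchronous=NORMAL`, appends that raise instead of degrading) can be sketched as follows. `SketchJournal` and its five-column schema are hypothetical simplifications of `LocalJournalStore`:

```python
import json
import sqlite3
import threading
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Iterator, Optional

# Hypothetical five-column subset of _JOURNAL_SCHEMA.
SCHEMA = """
CREATE TABLE IF NOT EXISTS journal (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    event_id TEXT NOT NULL UNIQUE,
    ts TEXT NOT NULL,
    event_type TEXT NOT NULL,
    payload TEXT NOT NULL DEFAULT '{}'
)
"""

class SketchJournal:
    def __init__(self, db_path: Path):
        self.db_path = Path(db_path)
        self._lock = threading.Lock()
        self._db: Optional[sqlite3.Connection] = None

    @contextmanager
    def _conn(self) -> Iterator[sqlite3.Connection]:
        with self._lock:
            if self._db is None:  # lazy init: parent dirs + WAL + schema on first use
                self.db_path.parent.mkdir(parents=True, exist_ok=True)
                db = sqlite3.connect(self.db_path, check_same_thread=False)
                db.execute("PRAGMA journal_mode=WAL")
                db.execute("PRAGMA synchronous=NORMAL")
                db.execute(SCHEMA)
                self._db = db
            yield self._db

    def append(self, event_id: str, ts: str, event_type: str, payload: Dict[str, Any]) -> int:
        with self._conn() as db:
            with db:  # commit on success; roll back and re-raise on error (stays loud)
                cur = db.execute(
                    "INSERT INTO journal (event_id, ts, event_type, payload) VALUES (?, ?, ?, ?)",
                    (event_id, ts, event_type, json.dumps(payload)),
                )
            return cur.lastrowid

    def close(self) -> None:
        with self._lock:
            if self._db is not None:
                self._db.close()
                self._db = None
```

Using the connection as a context manager (`with db:`) commits on success and rolls back on error before re-raising, so a failed append leaves the shared connection clean while still propagating the `sqlite3` error to the caller.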
@patch
def query(
self: LocalJournalStore,
job_id: Optional[str] = None, # Filter: job correlation
run_id: Optional[str] = None, # Filter: host-tier run
composition_id: Optional[str] = None, # Filter: composition
capability_instance_id: Optional[str] = None, # Filter: instance
worker_session_id: Optional[str] = None, # Filter: worker session
event_type: Optional[str] = None, # Filter: one vocabulary value
after_seq: Optional[int] = None, # Tail cursor: rows with seq > this
since_ts: Optional[datetime] = None, # Filter: ts >= (isoformat compare)
until_ts: Optional[datetime] = None, # Filter: ts <= (isoformat compare)
limit: Optional[int] = None, # Max rows
descending: bool = False, # True = newest first
) -> List[JournalEvent]: # Matching events, seq-ordered
"Filtered read; all filters AND-combined."
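All filters are AND-combined, `after_seq` is the tail cursor, and timestamps are compared as ISO-format strings. A sketch of how such a query could be assembled with bound parameters; `build_journal_query` is hypothetical and only mirrors the documented parameter list:

```python
from datetime import datetime
from typing import Any, List, Optional, Tuple

def build_journal_query(
    job_id: Optional[str] = None,
    run_id: Optional[str] = None,
    composition_id: Optional[str] = None,
    capability_instance_id: Optional[str] = None,
    worker_session_id: Optional[str] = None,
    event_type: Optional[str] = None,
    after_seq: Optional[int] = None,
    since_ts: Optional[datetime] = None,
    until_ts: Optional[datetime] = None,
    limit: Optional[int] = None,
    descending: bool = False,
) -> Tuple[str, List[Any]]:
    clauses: List[str] = []
    params: List[Any] = []
    equality = {
        "job_id": job_id, "run_id": run_id, "composition_id": composition_id,
        "capability_instance_id": capability_instance_id,
        "worker_session_id": worker_session_id, "event_type": event_type,
    }
    for column, value in equality.items():  # column names are fixed, never caller input
        if value is not None:
            clauses.append(f"{column} = ?")
            params.append(value)
    if after_seq is not None:
        clauses.append("seq > ?")  # tail cursor
        params.append(after_seq)
    if since_ts is not None:
        clauses.append("ts >= ?")
        params.append(since_ts.isoformat())
    if until_ts is not None:
        clauses.append("ts <= ?")
        params.append(until_ts.isoformat())
    sql = "SELECT * FROM journal"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += " ORDER BY seq DESC" if descending else " ORDER BY seq ASC"
    if limit is not None:
        sql += " LIMIT ?"
        params.append(int(limit))
    return sql, params
```

The ISO-string comparison only orders correctly when every stored `ts` uses the same format and UTC offset, which is consistent with `JournalEvent.ts` being substrate-stamped, tz-aware UTC.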
@patch
def count(
self: LocalJournalStore,
event_type: Optional[str] = None, # Optional per-type count
) -> int: # Row count
"Total journal rows (volume regression checks)."
@patch
def terminal_state_events(
self: LocalJournalStore,
limit: Optional[int] = None, # Most recent N (None = all)
) -> List[JournalEvent]: # Terminal STATE_TRANSITION rows, newest first
"""
The durable job history (`_history` migration rider): terminal
STATE_TRANSITION rows whose payload carries the job snapshot.
"""
Classes
class SubstrateEventType(str, Enum):
"""
Journal vocabulary beyond the job-scoped `JobEventType` set (CR-14).
Reserved up front (emission progressive). Job-scoped types stay in
`core.queue.JobEventType`; both serialize to plain strings in the
journal's `event_type` column — the journal is vocabulary-tolerant
by design (unknown types round-trip; the P5/P6 tolerant-unknown law).
"""
@dataclass
class JournalEvent:
"""
One durable observability record (CR-14).
The journal never duplicates what manifests / capability DBs / the graph
already record — graph-touching payloads carry REFERENCES (node ids +
content hashes, verifiable via the CR-19 machinery), never content.
`worker_reported=True` marks payloads that originated in-worker and rode
a wire envelope; the HOST still wrote the row (single-writer-class rule).
"""
event_type: str # JobEventType.value or SubstrateEventType.value (vocabulary-tolerant)
event_id: str = field(...) # Generated occurrence id (EventRef anchor)
ts: datetime = field(...) # Substrate-stamped, tz-aware UTC
run_id: Optional[str] # Host-tier run correlation (core run manifests)
job_id: Optional[str] # Queue job correlation
composition_id: Optional[str] # Stage-3 composition correlation
node_id: Optional[str] # Composition node correlation
capability_instance_id: Optional[str] # CR-10 instance correlation
capability_name: Optional[str] # Denormalized for filtering
config_hash: Optional[str] # Effective config at event time (CR-7 keying)
task_name: Optional[str] # Task-channel address (stage 4)
method: Optional[str] # Task-channel method (stage 4)
worker_session_id: Optional[str] # Spawn-scoped worker session (replaces ctime markers)
actor: Optional[str] # Who/what initiated (operator / agent / host id)
worker_reported: bool = False # Payload originated in-worker (rode the wire); host wrote the row
payload: Dict[str, Any] = field(...) # Per-event-type structured detail
seq: Optional[int] # Store-assigned cursor (rowid); None until appended
@runtime_checkable
class JournalStore(Protocol):
"""
Protocol for the durable account-of-action (CR-14).
Implementations MUST raise on append failure (loud, never silent —
the audit trail does not degrade quietly) and MUST NOT expose a
delete/retention surface (precious class).
"""
def append(self, event: JournalEvent) -> int:
"""Persist one event; returns the store-assigned seq (cursor)."""
...
def query(
self,
job_id: Optional[str] = None,
run_id: Optional[str] = None,
composition_id: Optional[str] = None,
capability_instance_id: Optional[str] = None,
worker_session_id: Optional[str] = None,
event_type: Optional[str] = None,
after_seq: Optional[int] = None,
since_ts: Optional[datetime] = None,
until_ts: Optional[datetime] = None,
limit: Optional[int] = None,
descending: bool = False,
) -> List[JournalEvent]
"Filtered read; all filters AND-combined; `after_seq` is the tail cursor."
def count(self, event_type: Optional[str] = None) -> int:
"""Total rows (optionally per type) — volume regression checks."""
...
def terminal_state_events(self, limit: Optional[int] = None) -> List[JournalEvent]
"STATE_TRANSITION rows whose payload `to` is terminal — the durable
job history (the `_history` migration rider)."
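Because `JournalStore` is a `runtime_checkable` `Protocol`, any class with the right method names passes `isinstance`. A sketch with a hypothetical in-memory test double; the event dicts, the `"state_transition"` type string, and the terminal state names are assumptions, not the package's vocabulary:

```python
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable

TERMINAL_STATES = {"completed", "failed", "cancelled"}  # hypothetical state names

@runtime_checkable
class JournalStoreLike(Protocol):
    def append(self, event: Dict[str, Any]) -> int: ...
    def query(self, **filters: Any) -> List[Dict[str, Any]]: ...
    def count(self, event_type: Optional[str] = None) -> int: ...
    def terminal_state_events(self, limit: Optional[int] = None) -> List[Dict[str, Any]]: ...

class InMemoryJournal:
    """Append-only test double; deliberately exposes no delete/retention method."""
    def __init__(self) -> None:
        self._rows: List[Dict[str, Any]] = []

    def append(self, event: Dict[str, Any]) -> int:
        self._rows.append(dict(event, seq=len(self._rows) + 1))
        return len(self._rows)  # store-assigned seq

    def query(self, **filters: Any) -> List[Dict[str, Any]]:
        return [r for r in self._rows if all(r.get(k) == v for k, v in filters.items())]

    def count(self, event_type: Optional[str] = None) -> int:
        return len(self.query(event_type=event_type)) if event_type else len(self._rows)

    def terminal_state_events(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        hits = [r for r in reversed(self._rows)  # newest first
                if r.get("event_type") == "state_transition"
                and r.get("payload", {}).get("to") in TERMINAL_STATES]
        return hits if limit is None else hits[:limit]
```

`isinstance` against a runtime-checkable protocol only checks that the methods exist. It cannot enforce the contract's behavioral rules (raise on append failure, no delete surface); those stay the implementer's responsibility.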
class LocalJournalStore:
"""
SQLite-backed default `JournalStore` (CR-14).
WAL + busy_timeout for multi-process host writers; one persistent,
lock-protected connection per instance (the stage-7 stress fix that
replaced the sibling stores' per-call connections). `append` raises on
failure (loud); callers never wrap it in a silent try/except.
"""
def __init__(self, db_path: Optional[Path] = None):
"""`db_path=None` uses `~/.cjm/journal.db`; CapabilityManager passes
`cfg.journal_db_path` (project-scoped) automatically."""
self.db_path = Path(db_path) if db_path is not None else Path.home() / ".cjm" / "journal.db"
# Persistent lock-protected connection (stage-7 stress part-1 catch)
Variables
LIVENESS_EVENT_TYPES: frozenset
_JOURNAL_SCHEMA = """
CREATE TABLE IF NOT EXISTS journal (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    event_id TEXT NOT NULL UNIQUE,
    ts TEXT NOT NULL,
    event_type TEXT NOT NULL,
    run_id TEXT,
    job_id TEXT,
    composition_id TEXT,
    node_id TEXT,
    capability_instance_id TEXT,
    capability_name TEXT,
    config_hash TEXT,
    task_name TEXT,
    method TEXT,
    worker_session_id TEXT,
    actor TEXT,
    worker_reported INTEGER NOT NULL DEFAULT 0,
    payload TEXT NOT NULL DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_journal_job ON journal (job_id);
CREATE INDEX IF NOT EXISTS idx_journal_run ON journal (run_id);
CREATE INDEX IF NOT EXISTS idx_journal_comp ON journal (composition_id);
CREATE INDEX IF NOT EXISTS idx_journal_type_ts ON journal (event_type, ts);
CREATE INDEX IF NOT EXISTS idx_journal_instance_ts ON journal (capability_instance_id, ts);
CREATE INDEX IF NOT EXISTS idx_journal_wsession ON journal (worker_session_id);
"""
Capability Manager (manager.ipynb)
Capability discovery, loading, and lifecycle management system
Import
from cjm_substrate.core.manager import (
CapabilityManager,
CapabilityBinding
)
Functions
def _start_diagnostics_retention_sweep(self) -> None:
"""CR-14 follow-up: host-startup diagnostics retention sweep.
The invocation half of the retention policy (`cjm-ctl retention` is the
other): fire-and-forget daemon thread so `__init__` stays fast (slow-init
discipline) and a large backlog never delays capability loading. Disabled
when `cfg.substrate.diagnostics_retention_days <= 0` and no size budget
is set. Best-effort: a sweep failure logs at WARNING — the diagnostics
class is disposable; the JOURNAL has no retention surface at all.
"""
try
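The sweep's invocation rules (off when retention days are `<= 0` and no size budget is set; fire-and-forget daemon thread; failures logged at WARNING, never raised) fit in a few lines. A sketch with hypothetical names; `size_budget_mb` in particular stands in for whatever the real budget setting is called:

```python
import logging
import threading
from typing import Callable, Optional

log = logging.getLogger("retention_sketch")

def start_retention_sweep(
    sweep: Callable[[], None],        # the actual deletion work (hypothetical callable)
    retention_days: float,            # <= 0 disables age-based retention
    size_budget_mb: Optional[float],  # None = no size budget (hypothetical setting name)
) -> Optional[threading.Thread]:
    if retention_days <= 0 and size_budget_mb is None:
        return None  # nothing to enforce: skip entirely

    def _run() -> None:
        try:
            sweep()
        except Exception:
            # Best-effort: diagnostics are disposable, so log and move on.
            log.warning("diagnostics retention sweep failed", exc_info=True)

    thread = threading.Thread(target=_run, name="diagnostics-retention-sweep", daemon=True)
    thread.start()  # fire-and-forget: the caller returns immediately
    return thread
```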
def register_system_monitor(
self,
capability_name:str # Name of the system monitor capability
) -> None
"Bind a loaded capability to act as the hardware system monitor."
def _resolve_system_monitor(
self,
) -> Optional[Any]: # The bound system-monitor proxy, or None
"""
Return the system monitor, lazily binding from the constructor's
`sysmon_capability_name` when `register_system_monitor` was never called.
Stage-3 G11: requiring a SEPARATE `register_system_monitor()` call after
load was a trap every core CLI fell into — GPU subtree ATTRIBUTION worked
(the JobQueue queries its own `sysmon_capability_name` directly) while the
stats path silently returned `{}`, so the scheduler quantity checks AND
the stage-3 admission ladder saw no telemetry and every GPU-profiled job
ran exclusive. The constructor parameter now expresses the full intent.
"""
def _get_global_stats(self) -> Dict[str, Any]: # Current system telemetry
"""Fetch real-time stats from the system monitor capability (sync).
CR-3: prefer typed `get_system_status()` over magic-string dispatcher.
Duck-types because the substrate references `system_monitor` as a
generic `ToolCapability` — CR-1's host-no-imports rule means substrate
does not import the monitor capability to type-narrow the reference.
Proxies after CR-3 expose `get_system_status` as a bound method that
POSTs to `/get_system_status` and returns `Optional[Dict[str, Any]]`.
"""
monitor = self._resolve_system_monitor()
if not monitor
async def _get_global_stats_async(self) -> Dict[str, Any]: # Current system telemetry
"""Fetch real-time stats from the system monitor capability (async).
Same CR-3 duck-type semantics as the sync variant. Async variant exists
because the substrate's `execute_capability_async` path (CR-2 + CR-10) needs
a non-blocking stats fetch when scheduling under an asyncio event loop.
"""
monitor = self._resolve_system_monitor()
if not monitor
async def get_global_stats(self) -> Dict[str, Any]: # Current system telemetry
"""
Public async system-telemetry accessor (stage 3 / CR-16).
The JobQueue's multi-lane admission consumes this through the
`JobQueueDependencies` protocol (defensively via getattr). Thin wrapper
over `_get_global_stats_async` — same CR-3 duck-type semantics.
"""
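The duck-typing described for `_get_global_stats` and its async variant can be sketched without importing any monitor class: look up `get_system_status` by name and fall back to `{}` when there is no monitor or it returns `None`. Tolerating both sync and async proxies in the async variant is an assumption of this sketch, and both function bodies are illustrative rather than the package's code:

```python
import asyncio
import inspect
from typing import Any, Dict, Optional

def get_global_stats(monitor: Optional[Any]) -> Dict[str, Any]:
    getter = getattr(monitor, "get_system_status", None) if monitor else None
    if not callable(getter):
        return {}  # no monitor bound: callers see empty telemetry
    return getter() or {}  # the proxy may return None

async def get_global_stats_async(monitor: Optional[Any]) -> Dict[str, Any]:
    getter = getattr(monitor, "get_system_status", None) if monitor else None
    if not callable(getter):
        return {}
    result = getter()
    if inspect.isawaitable(result):  # tolerate either sync or async proxies
        result = await result
    return result or {}
```

The empty-dict fallback is exactly what made the G11 trap silent: every caller saw `{}` and had no way to distinguish "no monitor bound" from "idle machine".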
def get_admission_profile(
self,
name_or_id:str # Capability name (default instance) or instance_id (multi-instance)
) -> Optional[Dict[str, Any]]: # {'gpu_memory_mb_peak_max','memory_mb_peak_max','sample_count'} or None
"""
Empirical resource profile for a loaded instance's CURRENT config
(stage 3 / CR-16 multi-lane admission).
Reads the CR-7 empirical store at the instance's live
`(instance_id, config_hash)` key — the SAME keying that records samples,
so the profile always describes the configuration actually being run
(a config change = a new hash = no record = the queue runs the job
EXCLUSIVE until its first measurement run graduates it).
None = no evidence (instance unknown / store disabled / no record for
this config). The manifest's `requires_gpu` is deliberately not part of
this surface — GPU use is an empirical fact, not a declaration (stage-3
ledger G2).
"""
def get_instance_concurrency_cap(
self,
name_or_id:str # Capability name (default instance) or instance_id (multi-instance)
) -> Optional[int]: # The instance's SG-33 max_concurrent_requests (None = unset)
"""
Per-instance concurrency cap for queue admission (stage 3 / CR-16).
Surfaces the SG-33 `max_concurrent_requests` setting. The queue treats
None as 1 (same-worker concurrency is OPT-IN per capability — e.g.
ffmpeg raises its cap because its sync endpoints run in a threadpool
and concurrent converts genuinely parallelize as subprocesses; model
workers stay serial-per-instance).
"""
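The two accessors above feed an admission decision. A hypothetical sketch of how the documented rules could combine: no profile (or no telemetry) means run exclusive, an unset cap means 1, and (assumed here) a job shares the GPU only when its recorded `gpu_memory_mb_peak_max` fits in current free VRAM. The lane names and the `free_vram_mb` input are illustrative:

```python
from typing import Any, Dict, Optional

def admission_decision(
    profile: Optional[Dict[str, Any]],  # get_admission_profile(...) result
    free_vram_mb: Optional[float],      # from system telemetry; None = no telemetry
    in_flight: int,                     # jobs already running on this instance
    cap: Optional[int],                 # get_instance_concurrency_cap(...) result
) -> str:
    effective_cap = 1 if cap is None else cap  # unset cap means serial-per-instance
    if in_flight >= effective_cap:
        return "wait"
    if profile is None or free_vram_mb is None:
        return "exclusive"  # no evidence or no telemetry: run alone
    if profile.get("gpu_memory_mb_peak_max", 0.0) <= free_vram_mb:
        return "shared"  # assumed rule: recorded peak fits in current free VRAM
    return "wait"
```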
def _parse_resources(
self,
manifest: Dict[str, Any] # Loaded manifest dict
) -> Optional[ResourceRequirements]
"Phase 5a: parse the manifest's resources block into a ResourceRequirements."
def discover_manifests(self) -> List[CapabilityMeta]: # List of discovered capability metadata
"""Discover capabilities via JSON manifests in search paths.
CR-8: reads each manifest via `load_manifest`, which parses the v2.0
nested layout into a typed `ManifestV2`.
`meta.manifest` is set to a flat-shaped dict view so existing consumers
(proxy, scheduling, execute path) continue working unchanged; the typed
`ManifestV2` is also attached as `meta.manifest_v2` so drift detection
+ future typed callers can read `drift_tracking.config_schema_hash`
without re-parsing.
"""
self.discovered = []
self.adapter_manifests = [] # CR-17 pt 2: adapter units discovered beside capabilities
def get_adapters_for_task(
self,
task_name: str, # Task name, e.g. "graph-storage"
) -> List[AdapterManifest]: # Discovered adapter units serving the task
"CR-17 pt 2: the adapter-registry view — discovered adapter manifests for a task."
def check_adapter_compatibility(
self,
adapter: Union[str, AdapterManifest], # Adapter unit name or manifest
capability_name: str, # Name of a discovered capability
) -> Dict[str, Any]: # Match verdict (see match_protocol_against_surface)
"""
CR-17 pt 2: surface-based compatibility verdict (host-side; works against
UNLOADED capabilities — manifest-vs-manifest, no protocol imports host-side).
Matches the adapter's recorded protocol members against the capability
manifest's recorded `structural_surface` (pass-2 Thread 3: the capability
records only itself; the adapter declares the protocol; the substrate
matches). A capability without a recorded surface (pre-fracture manifest)
is NOT compatible until its manifest regenerates — staleness stays visible
instead of silently mis-answering.
"""
def get_capabilities_compatible_with(
self,
adapter: Union[str, AdapterManifest], # Adapter unit name or manifest
) -> List[str]: # Discovered capability names whose surface satisfies the protocol
"CR-17 pt 2: the pass-2 compatibility query, manifest-surface-based."
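Compatibility is derived by matching manifest against manifest. A sketch under the assumption that both sides record members as `name -> [parameter names]` and that parameter lists must match exactly; the verdict keys and function names are hypothetical, not `match_protocol_against_surface`'s actual output:

```python
from typing import Any, Dict, List, Optional

Surface = Dict[str, List[str]]  # member name -> parameter names (assumed recording format)

def match_protocol(protocol: Surface, surface: Optional[Surface]) -> Dict[str, Any]:
    if surface is None:
        # Pre-fracture manifest with no recorded surface: report incompatible, not "unknown".
        return {"compatible": False, "missing": sorted(protocol), "mismatched": [],
                "reason": "no recorded structural_surface; regenerate the manifest"}
    missing = sorted(m for m in protocol if m not in surface)
    mismatched = sorted(m for m in protocol if m in surface and surface[m] != protocol[m])
    return {"compatible": not missing and not mismatched,
            "missing": missing, "mismatched": mismatched}

def compatible_capabilities(protocol: Surface, manifests: Dict[str, Optional[Surface]]) -> List[str]:
    # Works on recorded manifests only: nothing is imported or loaded.
    return sorted(name for name, surface in manifests.items()
                  if match_protocol(protocol, surface)["compatible"])
```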
def _resolve_adapter_specs(
self,
capability_meta, # CapabilityMeta of the capability being loaded
adapters=None, # Explicit adapter unit names (loud refusal on mismatch); None = auto-bind compatibles
) -> List[str]: # Worker specs "module:ClassName"
"""
CR-17 pt 2: resolve which adapter impls bind in-worker at spawn.
AUTO (adapters=None): every discovered adapter whose protocol members match
the capability's recorded surface binds silently — binding rides
`load_capability` with no separate manual call (the G11 lesson: a manual
registration step no CLI makes is silently inert).
EXPLICIT (adapters=[names]): each named unit is verified; an incompatible
pairing REFUSES LOUDLY with the missing members in the message (the CR-17
negative check).
"""
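The AUTO/EXPLICIT split can be sketched as follows, assuming a hypothetical registry that maps adapter names to a worker spec and a set of required member names (parameter lists are ignored here for brevity). `AdapterRefusedError` is likewise hypothetical:

```python
from typing import Dict, List, Optional, Set

class AdapterRefusedError(Exception):
    """Hypothetical error for an explicit but incompatible pairing."""

def resolve_adapter_specs(surface: Set[str], registry: Dict[str, Dict],
                          adapters: Optional[List[str]] = None) -> List[str]:
    """registry maps adapter name -> {"spec": "module:ClassName", "members": set of member names}."""
    if adapters is None:
        # AUTO: bind every compatible adapter, no manual registration step.
        return [entry["spec"] for _, entry in sorted(registry.items())
                if entry["members"] <= surface]
    specs: List[str] = []
    for name in adapters:
        if name not in registry:
            raise AdapterRefusedError(f"unknown adapter {name!r}")
        missing = sorted(registry[name]["members"] - surface)
        if missing:
            # EXPLICIT: refuse loudly and name what is absent.
            raise AdapterRefusedError(
                f"adapter {name!r} incompatible; capability lacks: {', '.join(missing)}")
        specs.append(registry[name]["spec"])
    return specs
```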
def get_capability_meta(
self,
capability_name:str # Name of the capability
) -> Optional[CapabilityMeta]: # Capability metadata or None
"Get metadata for a loaded capability by name."
def get_discovered_meta(
self,
capability_name:str # Name of the capability
) -> Optional[CapabilityMeta]: # Capability metadata or None
"Get metadata for a discovered (not necessarily loaded) capability by name."
def _extract_defaults_from_schema(
self,
config_schema:Optional[Dict[str, Any]] # JSON Schema with properties
) -> Dict[str, Any]: # Default values extracted from schema
"Extract default values from a JSON Schema's properties."
def _validate_config_against_schema(
"""
SG-5: validate a config dict against the manifest's `config_schema`
before forwarding to the capability's `initialize()`.
"""
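Default extraction and the unknown-key check both read the schema's `properties` map. A sketch assuming a plain JSON Schema dict; `UnknownConfigKeysError` is a hypothetical stand-in that carries `fields_invalid` and `config_class_name` the way `CapabilityConfigError` does, and skipping validation when no schema exists is an assumption:

```python
from typing import Any, Dict, List, Optional

class UnknownConfigKeysError(ValueError):
    """Hypothetical stand-in for CapabilityConfigError."""
    def __init__(self, message: str, *, fields_invalid: List[str], config_class_name: str = ""):
        super().__init__(message)
        self.fields_invalid = fields_invalid
        self.config_class_name = config_class_name

def extract_defaults(schema: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    props = (schema or {}).get("properties", {})
    return {key: spec["default"] for key, spec in props.items()
            if isinstance(spec, dict) and "default" in spec}

def validate_config_keys(config: Dict[str, Any], schema: Optional[Dict[str, Any]], name: str = "") -> None:
    if schema is None:
        return  # assumption: nothing to validate against
    props = schema.get("properties", {})
    unknown = sorted(set(config) - set(props))
    if unknown:
        raise UnknownConfigKeysError(
            f"unknown config keys for {name or 'capability'}: {unknown}",
            fields_invalid=unknown, config_class_name=name)
```

One way to build the effective config after validation is `{**extract_defaults(schema), **user_config}`, so explicit values win over schema defaults.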
def _check_config_schema_drift(
self,
proxy:Any, # RemoteCapabilityProxy with a live worker
capability_meta:CapabilityMeta, # Metadata to flag if drift is detected
) -> None
"""
SG-9 + CR-8: compare live worker `/config_schema` to the stored hash.
Reads the stored hash from `capability_meta.manifest_v2.drift_tracking.config_schema_hash`
(populated by `discover_manifests`). Computes the live hash with
`compute_config_schema_hash` and compares — drift = hashes differ.
Honors `cfg.substrate.drift_detection` opt-out from `cjm.yaml`: hosts
that don't want the per-load `/config_schema` HTTP call can disable
detection there. Default is on.
Test fixtures that stub `meta.manifest = {}` without going through
`discover_manifests` won't have a `manifest_v2` attribute; the
`getattr(..., None)` fallback yields `stored_hash=None`, which doesn't
match any live hash — those tests don't expose a real proxy so the
drift warning fires harmlessly.
"""
def _check_structural_surface_drift(
self,
proxy:Any, # RemoteCapabilityProxy with a live worker
capability_meta:CapabilityMeta, # Metadata to flag if drift is detected
) -> None
"""
Pass-2 Thread 3 (stage 2): compare the worker's live-derived structural
surface to the manifest's stored witness hash — third instance of the
CR-8 hashed-witness + live-companion idiom (after config_schema and the
compatibility-transport protocol membership it superseded).
Stage-4 adapter compatibility matches `required_tool_protocol` against
the RECORDED surface, so a stale recording silently mis-answers
compatibility queries — this check is what makes that visible.
Skips silently when: drift detection is opted out (same cjm.yaml switch
as config-schema drift); the manifest predates surface recording
(stored hash None — `regenerate-manifest` adds it); or the worker
predates the /structural_surface endpoint (proxy returns None).
"""
def _persist_config(
self,
capability_name: str # Capability to persist
) -> None
"""
CR-2: write current CapabilityMeta state + live worker config to the store.
Reads `meta.enabled` (the substrate-authoritative flag) and the worker's
current_config (when reachable). Failures are logged + swallowed —
persistence is a best-effort side-channel, not a correctness invariant.
"""
def _maybe_fire_disable_hook(
self,
name_or_id: str # instance_id (or legacy capability_name) whose in-flight job just finished
) -> None
"""
CR-2 + CR-10: fire deferred on_disable for `name_or_id` if pending.
Idempotent. Resolves via self.instances first; falls back to
self.capabilities[name].instance for legacy code paths.
"""
def _validate_instance_id(self, instance_id: str) -> None:
"""Reject malformed explicit instance_ids at load time.
Pattern: alphanumeric + underscore + hyphen, length 1..64. Raises
ValueError on invalid input so the caller sees the constraint failure
immediately rather than at first execute / unload.
"""
import re as _re
if not isinstance(instance_id, str)
"""
Reject malformed explicit instance_ids at load time.
Pattern: alphanumeric + underscore + hyphen, length 1..64. Raises
ValueError on invalid input so the caller sees the constraint failure
immediately rather than at first execute / unload.
"""
def _generate_instance_id(self, capability_name: str) -> str:
"""Generate a unique instance_id of form `{capability_name}-{6-char-hex}`.
Used when load_capability is called with new_instance=True and no explicit
instance_id. Retries up to 16 times if a collision occurs in self.instances.
"""
import secrets as _secrets
for _ in range(16)
"""
Generate a unique instance_id of form `{capability_name}-{6-char-hex}`.
Used when load_capability is called with new_instance=True and no explicit
instance_id. Retries up to 16 times if a collision occurs in self.instances.
"""
def get_instance(
self,
name_or_id: str # Capability name (default-loaded) or explicit instance_id
) -> Optional[CapabilityInstance]
"""
Return the CapabilityInstance for `name_or_id`, or None if not loaded.
Lookup is keyed by instance_id (which equals capability_name for default-
loaded capabilities). Multi-instance IDs only exist in self.instances.
"""
def list_instances(
"List all loaded instances, optionally filtered by underlying capability name."
def _worker_env_specs(
self,
capability_meta: CapabilityMeta # Capability whose WORKER_ENV contract to read
) -> List[Dict[str, Any]]: # List of EnvVarSpec-as-dict entries (possibly empty)
"""
Return a capability's WORKER_ENV contract as spec dicts (CR-12).
Prefers the typed manifest_v2 code section; falls back to the flat manifest
dict view. Empty list when the capability declares no worker-env contract.
"""
def _resolve_worker_env(
self,
capability_meta: CapabilityMeta, # Capability being loaded
scope: Optional[str] = None # SG-55 forward seam: per-principal scope (None = single-user)
) -> Dict[str, str]: # {ENV_NAME: value} overlay injected into the worker at spawn
"""
CR-12 + Q1-A: compose the resolved worker-env overlay for a load.
Secrets resolve from the SecretStore keyed by capability_name — so every
instance of a capability shares one credential (CR-10: two Gemini instances,
one GEMINI_API_KEY). A missing secret is OMITTED (the worker spawns without
it; the capability reports the gap at execute) rather than injected empty.
Visible vars resolve from their declared `default`, with Q1-A template
substitution applied: a default like ``"${CJM_MODELS_DIR}/huggingface"``
expands to an absolute path using the substrate's current `cfg.models_dir`
+ `cfg.capability_data_dir`. Static defaults (no `${...}` syntax) pass through
unchanged. A template-substitution failure — unknown placeholder (capability
author bug) OR unresolved value (operator hasn't configured
`cfg.models_dir`) — is WARN-and-OMIT: the worker still spawns, and the
capability can surface the gap via `missing_required_env()` if the field was
declared `required=True`. This matches secret omission behaviour
(operator-side concerns don't break load; the capability signals at execute).
Capability-author-bug-class errors (unknown placeholders) surface at
install/release time via `cjm-ctl validate` + `template_check_placeholders`,
not here. All values are fixed at spawn — a change requires `reload_capability`.
"""
def get_worker_env_status(
self,
name_or_meta: Any, # Capability name (loaded/discovered) or a CapabilityMeta
scope: Optional[str] = None # SG-55 forward seam
) -> List[Dict[str, Any]]: # Per-entry status dicts (secret values never returned)
"""
CR-12: per-entry satisfaction status of a capability's worker-env contract.
Each entry: {name, secret, required, satisfied, label, description}.
`satisfied` means a value is resolvable (secret present in the store, or a
visible var has a default/override). Secret VALUES are never returned — only
whether one is set. The capability-config UI uses this to gate config display on
required secrets being satisfied.
"""
def missing_required_env(
self,
name_or_meta: Any, # Capability name or CapabilityMeta
scope: Optional[str] = None # SG-55 forward seam
) -> List[str]: # Names of required worker-env entries with no resolvable value
"CR-12: names of required worker-env entries that are unsatisfied."
def set_capability_secret(
self,
name_or_id: str, # Capability name or instance_id whose secret to set
key: str, # Secret key (the env-var name, e.g. "GEMINI_API_KEY")
value: str, # Secret value (stored via the SecretStore, never config/logs)
*,
scope: Optional[str] = None, # SG-55 forward seam: per-principal scope
reload: bool = True # Respawn loaded worker(s) so the new env is injected
) -> bool: # True if the secret was stored
"""
CR-12: store a capability secret, then respawn its worker(s) to inject it.
Secrets are keyed by the underlying CAPABILITY name (not instance_id), so all
instances of a capability share one credential — set the Gemini key once and
every Gemini instance gets it at (re)spawn. Because worker env is fixed at
spawn, the new value only reaches a *running* worker via a RESPAWN, so this
reloads each loaded instance of the capability (unless `reload=False`, e.g. when
provisioning a secret before the capability is loaded). This is the
actuation seam both the CLI (`cjm-ctl set-secret`) and a future config UI
call. Reload failures are logged, not raised.
"""
def load_capability(
self,
capability_meta:CapabilityMeta, # Capability metadata (with manifest attached)
config:Optional[Dict[str, Any]]=None, # Initial configuration
strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
instance_id:Optional[str]=None, # CR-10: explicit instance_id; None defaults to capability_name
new_instance:bool=False, # CR-10: auto-generate `{name}-{hex}` instance_id (with instance_id=None)
max_concurrent_requests:Optional[int]=None, # SG-33 (CR-7): per-instance async concurrency cap; None = unbounded
adapters:Optional[List[str]]=None # CR-17 pt 2: explicit adapter unit names (loud refusal on mismatch); None = auto-bind discovered compatibles
) -> bool: # True if successfully loaded
"""
Load a capability by spawning a Worker subprocess.
CR-2: reads the persisted CapabilityConfigRecord from `self.config_store`
before launching the worker. If a persisted record exists and the
caller didn't pass an explicit config, the persisted config is used
as the effective input. The persisted `enabled` flag is applied to
`capability_meta.enabled` so disabled capabilities stay disabled across
process restarts.
CR-10: optional `instance_id` allows multi-instance loading.
- instance_id=None, new_instance=False (default): instance_id =
capability_meta.name. Populates self.capabilities[capability_name] + self.instances
[capability_name] together (single-instance backward compat).
- instance_id="custom": validated against `[A-Za-z0-9_-]{1,64}`. Populates
self.instances[custom]. Persistence is keyed by capability_name and only
applied to the default instance.
- instance_id=None, new_instance=True: auto-generates `{name}-{6-hex}`.
Idempotent: re-load against an existing instance_id returns True without
re-spawning.
CR-7: computes `config_hash` from the effective config (post-defaults +
post-validation) and stores it on the CapabilityInstance so execute_capability*
can key empirical samples by (instance_id, config_hash). SG-33 stores
`max_concurrent_requests` on the instance — the actual asyncio.Semaphore
is lazy-created in execute_capability_async via `_get_concurrent_limiter`.
"""
def load_all(
self,
configs:Optional[Dict[str, Dict[str, Any]]]=None # Capability name -> config mapping
) -> Dict[str, bool]: # Capability name -> success mapping
"Discover and load all available capabilities."
def unload_capability(
self,
name_or_id:str # Capability name (default-loaded) or instance_id (multi-instance)
) -> bool: # True if successfully unloaded
"""
Unload a capability instance and terminate its Worker process (CR-10).
If name_or_id resolves to the default instance (instance_id == capability_name)
and no other instances remain for the same capability, also removes the
CapabilityMeta from self.capabilities. Otherwise removes only the instance and
clears CapabilityMeta.instance if it pointed at the unloaded canonical.
"""
def unload_all(self) -> None:
"""Unload all capability instances and terminate all Worker processes (CR-10).
Iterates self.instances (CR-10 keying) rather than self.capabilities so all
multi-instance entries get torn down, not just the canonical instances.
"""
for inst_id in list(self.instances.keys())
"""
Unload all capability instances and terminate all Worker processes (CR-10).
Iterates self.instances (CR-10 keying) rather than self.capabilities so all
multi-instance entries get torn down, not just the canonical instances.
"""
def get_capability(
self,
name_or_id:str # Capability name (default-loaded) or instance_id (multi-instance)
) -> Optional[ToolCapability]: # Capability proxy instance or None
"""
Get a loaded capability's proxy by name or instance_id (CR-10).
Lookup order: self.instances first (covers both default capability_name and
multi-instance IDs), falling back to CapabilityMeta.instance for any
legacy code path that populated self.capabilities without self.instances
(defensive — shouldn't happen post-CR-10 since load_capability always
records the instance).
"""
def list_capabilities(self) -> List[CapabilityMeta]: # List of loaded capability metadata
"List all loaded capabilities."
def _get_sysmon_capability(self) -> Optional[Any]:
"""Resolve the configured monitor capability (CR-3) for GPU subtree attribution.
Returns the loaded capability instance keyed by `sysmon_capability_name`, or
None when no sysmon is configured / hasn't been loaded yet. Lazy
resolution against `self.capabilities` tolerates load-order: the manager
can be constructed before the sysmon capability is loaded; later
`_record_sample_safe` calls pick it up automatically.
"""
name = getattr(self, "_sysmon_capability_name", None)
if not name
"""
Resolve the configured monitor capability (CR-3) for GPU subtree attribution.
Returns the loaded capability instance keyed by `sysmon_capability_name`, or
None when no sysmon is configured / hasn't been loaded yet. Lazy
resolution against `self.capabilities` tolerates load-order: the manager
can be constructed before the sysmon capability is loaded; later
`_record_sample_safe` calls pick it up automatically.
"""
def _record_sample_safe(self, inst:CapabilityInstance, start_time:float, success:bool) -> None
"""
CR-7: best-effort empirical sample recording.
Captures worker stats at end-of-execute (proxy of peak), builds a
ResourceSample, and records it via the EmpiricalResourceStore. Failures
log + swallow — sample recording must never break the execute path
(matches CR-2's `_persist_config` best-effort discipline).
Stats fetch can fail naturally (e.g. worker died with WorkerOOMError —
the proxy is unreachable). The sample still records with zero stats +
success=False so we have a record of the failed attempt for the
success_rate aggregate.
GPU memory is attributed across the worker's process subtree via
`attribute_gpu_to_worker_subtree` (intersecting worker-reported
`subtree_pids` with sysmon's per-PID GPU enumeration). Pre-fix this
function read `worker_stats["gpu_memory_mb"]` — a key the worker `/stats`
endpoint NEVER emits — so EmpiricalResourceRecord.gpu_memory_mb_peak_max
was silently 0 for every capability since CR-7 shipped, not just for
subprocess-spawning ones. When no sysmon is configured, GPU memory
records as 0.0 (honest signal that we can't measure it).
"""
def _get_concurrent_limiter(self, instance_id:str) -> Optional[asyncio.Semaphore]:
"""SG-33 (CR-7): lazy-create the per-instance asyncio.Semaphore.
Returns None when the instance has no `max_concurrent_requests` set (the
default — unbounded). Otherwise creates the semaphore on first call and
caches it in `self._concurrent_limiters`. Semaphores are bound to the
event loop they were created in; lazy creation inside `execute_capability_async`
ensures we're inside the right loop at construction time (Python 3.10+
semaphore-loop-binding rules).
Defensive: returns None if the manager was constructed via __new__ without
`_concurrent_limiters` being populated (test-fixture pattern).
"""
limiters = getattr(self, '_concurrent_limiters', None)
if limiters is None
"""
SG-33 (CR-7): lazy-create the per-instance asyncio.Semaphore.
Returns None when the instance has no `max_concurrent_requests` set (the
default — unbounded). Otherwise creates the semaphore on first call and
caches it in `self._concurrent_limiters`. Semaphores are bound to the
event loop they were created in; lazy creation inside `execute_capability_async`
ensures we're inside the right loop at construction time (Python 3.10+
semaphore-loop-binding rules).
Defensive: returns None if the manager was constructed via __new__ without
`_concurrent_limiters` being populated (test-fixture pattern).
"""
def _reactive_evict_for(
self,
needed_meta:CapabilityMeta,
shortfall:Optional[Any]=None, # Optional ResourceShortfall from Track B; informational only
) -> bool
"""
CR-7: try to free resources after a CapabilityResourceError during execute.
Wraps `_evict_for_resources` with reactive-flow logging. `_evict_for_resources`
itself extends to multi-axis + cost-aware candidate selection (drops the
GPU-only filter, prefers evicting empirically-expensive idle capabilities).
`shortfall` is recorded for log context but doesn't currently steer
candidate selection beyond what _evict_for_resources already does via
its `needed_meta.resources` check. A future enhancement could pass it
through for axis-specific candidate filtering.
"""
def _evict_for_resources(self, needed_meta:CapabilityMeta) -> bool
"""
Attempt to free resources by unloading/releasing idle capabilities (LRU).
CR-7: extended from GPU-only LRU to multi-axis cost-aware eviction.
- Candidate set: any loaded capability that isn't the one we're allocating
for (drops the pre-CR-7 `requires_gpu` filter).
- Sort key: primary = idle (older last_executed first, classic LRU);
secondary = empirical cost when available (highest peak gets evicted
first among equally-idle candidates). Cost axis follows the needed
capability's `resources.requires_gpu` flag — GPU peak when we're freeing
for a GPU capability, system memory peak otherwise.
Without empirical data (no store / unmeasured capability), the secondary
key is 0.0 and pure LRU applies. Cost-aware selection is opt-in via
`empirical_tracking: true`.
"""
def execute_capability(
self,
name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
*args,
_task_name:Optional[str]=None, # CR-17 pt 2: route via the task channel (adapter task) instead of execute
_method:Optional[str]=None, # CR-17 pt 2: adapter method (set with _task_name)
**kwargs
) -> Any: # Capability result
"""
Execute a capability instance's main functionality (sync).
CR-10: resolves `name_or_id` via self.instances; per-instance enabled
flag gates execution. `_running_executions` tracks by instance_id so
concurrent multi-instance executes don't collide.
CR-2: raises CapabilityDisabledError (typed) when the instance is disabled.
CR-7: reactive retry on CapabilityResourceError — evicts other capabilities to
free resources, then ALWAYS reloads the failing capability's worker before
the retry attempt. Track A (WorkerOOMError — worker died from SIGKILL)
needs the reload because there's no live worker to retry on. Track B
(capability-raised CapabilityResourceError — worker still alive) ALSO reloads
because PyTorch's CUDA caching allocator can fragment post-OOM in ways
the capability can't clean up from within its own process; a fresh worker
is the only reliable reset. Bounded by `self.max_retries` (default 1).
Empirical sample recorded in the finally block — best-effort, doesn't
break execute on failure.
"""
async def execute_capability_async(
self,
name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
*args,
_task_name:Optional[str]=None, # CR-17 pt 2: route via the task channel (adapter task) instead of execute
_method:Optional[str]=None, # CR-17 pt 2: adapter method (set with _task_name)
**kwargs
) -> Any: # Capability result
"""
Execute a capability instance's main functionality (async).
CR-10 + CR-2: same semantics as execute_capability, async-flavored. Scheduler
allocation goes through allocate_async for non-blocking polling.
CR-7 + SG-33: reactive retry on CapabilityResourceError — always reloads
before retry (Track A + Track B converge on the same reload path; see
sync variant docstring for the rationale). Per-instance asyncio.Semaphore
enforces the `max_concurrent_requests` cap (None = unbounded). Empirical
sample recorded in the finally block.
"""
def execute_capability_task(
self,
name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
task_name:str, # Adapter task, e.g. "graph-storage"
method:str, # Adapter method, e.g. "query_nodes"
**kwargs
) -> Any: # Typed task result
"""
CR-17 pt 2: execute a typed task-adapter method (explicit task channel; sync).
Thin wrapper over `execute_capability` — the whole CR-7 retry / scheduler /
empirical-sampling machinery applies identically to task-channel calls.
"""
async def execute_capability_task_async(
self,
name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
task_name:str, # Adapter task, e.g. "graph-storage"
method:str, # Adapter method, e.g. "query_nodes"
**kwargs
) -> Any: # Typed task result
"""
CR-17 pt 2: execute a typed task-adapter method (explicit task channel; async).
Thin wrapper over `execute_capability_async` — CR-7 retry, SG-33 semaphore,
admission and empirical sampling apply identically; this is the method
the JobQueue's task-addressed jobs invoke.
"""
def enable_capability(
self,
name_or_id:str # Capability name (default instance) or instance_id (multi-instance)
) -> bool: # True if instance was enabled
"""
Enable a capability instance (CR-10 multi-instance aware).
CR-2: persists the new state via `config_store` (default-instance only;
persistence is per-capability, not per-instance) and fires the capability's
on_enable hook on state-change. Idempotent for already-enabled instances.
"""
def disable_capability(
self,
name_or_id:str # Capability name (default instance) or instance_id (multi-instance)
) -> bool: # True if instance was disabled
"""
Disable a capability instance without unloading it (CR-10 multi-instance aware).
CR-2: persists the new state (default-instance only) and fires the
capability's on_disable hook — but defers the hook until any in-flight job
for THIS instance finishes (the per-instance `_running_executions` key
is the instance_id, so a concurrent execute on a different instance of
the same capability doesn't gate this instance's hook).
"""
def get_capability_diagnostics(
"""
Render a capability's recent diagnostics as text (CR-14; replaces
the retired flat-log accessor — the flat `.cjm/logs/*.log` files no longer exist).
A convenience TEXT projection over the diagnostics store for operator /
UI display: structured records (level + logger name + exact job id when
stamped) merged with the raw stream chunks (prints / tqdm final frames /
death rattles) from this capability's worker sessions, ordered by time.
Programmatic consumers query the stores directly
(`manager.diagnostics_store` / `JobQueue.get_job_diagnostics`).
"""
def get_capability_config(
self,
capability_name: str # Name of the capability
) -> Optional[Dict[str, Any]]: # Current configuration or None
"Get the current configuration of a capability."
def get_capability_config_schema(
self,
capability_name: str # Name of the capability
) -> Optional[Dict[str, Any]]: # JSON Schema or None
"Get the configuration JSON Schema for a capability."
def get_config_options(
self,
name_or_id: str # Capability name (default instance) or instance_id (multi-instance)
) -> Dict[str, Any]: # CR-11: live config option domains, or {} if unavailable
"""
Get a capability instance's runtime config option providers (CR-11).
Forwards to the worker's get_config_options() - live enum domains +
per-option metadata for dynamic config fields (e.g. an API model list).
Kept separate from get_capability_config_schema (static, hashed for CR-8 drift);
these options are the live companion the capability-config UI merges on top.
Degrades to {} if the instance is missing or the worker call fails - the UI
then falls back to the static schema. Typed-error surfacing for the UI
consumer is deferred to the capability-config UI library (Path C Step 4).
"""
def get_all_capability_configs(self) -> Dict[str, Dict[str, Any]]: # Capability name -> config mapping
"""Get current configuration for all loaded capabilities."""
return {
name: capability.get_current_config()
"Get current configuration for all loaded capabilities."
def update_capability_config(
self,
name_or_id: str, # Capability name (default instance) or instance_id (multi-instance)
config: Dict[str, Any], # New configuration values
strict: bool = True # SG-5: reject unknown keys against manifest config_schema (default)
) -> bool: # True if successful
"""
Update a capability instance's configuration (CR-10 multi-instance aware).
CR-2: on successful reconfigure, persists the new config (default instance
only; multi-instance loads don't persist). Per-instance `inst.config` is
updated regardless.
SG-5: validates against the underlying capability's config_schema (per-capability,
not per-instance, so all instances share the same schema).
"""
def reload_capability(
self,
name_or_id: str, # Capability name (default instance) or instance_id (multi-instance)
config: Optional[Dict[str, Any]] = None # Optional new configuration
) -> bool: # True if successful
"Reload a capability instance by terminating and restarting its Worker (CR-10)."
def get_capability_stats(
self,
name_or_id: str # Capability name (default instance) or instance_id (multi-instance)
) -> Optional[Dict[str, Any]]: # Resource telemetry or None
"Get resource usage stats for a capability instance's Worker process (CR-10)."
async def execute_capability_stream(
self,
name_or_id: str, # Capability name (default instance) or instance_id (multi-instance)
*args,
**kwargs
) -> AsyncGenerator[Any, None]: # Async generator yielding results
"""
Execute a capability instance with streaming response (CR-10 multi-instance aware).
Same per-instance resolution as execute_capability_async; scheduler allocation
keys off the CapabilityMeta (capability-level), execution + bookkeeping key off
the CapabilityInstance (per-instance).
"""
async def load_capability_async(
self,
capability_meta: CapabilityMeta,
config: Optional[Dict[str, Any]] = None,
strict: bool = True,
instance_id: Optional[str] = None,
new_instance: bool = False,
) -> bool
"""
Async variant of `load_capability` (CR-10b).
Runs the existing sync `load_capability` via `asyncio.to_thread` so the
blocking proxy spawn + `_wait_for_ready` doesn't stall the event loop.
Backward compat: identical behavior to the sync method, just non-blocking.
"""
async def unload_capability_async(
self,
name_or_id: str,
) -> bool
"Async variant of `unload_capability` (CR-10b)."
def _spec_requested_key(spec: CapabilityLoadSpec, index: int) -> str:
"""Derive the dict key the load_capabilities_concurrent result uses for `spec`.
Resolution: explicit `instance_id` > `meta.name` + `#new[{index}]` suffix
for ambiguous new_instance=True specs > `meta.name`. The suffix prevents
key collision when multiple specs request a new instance of the same capability
without explicit instance_ids.
"""
if spec.instance_id is not None
"""
Derive the dict key the load_capabilities_concurrent result uses for `spec`.
Resolution: explicit `instance_id` > `meta.name` + `#new[{index}]` suffix
for ambiguous new_instance=True specs > `meta.name`. The suffix prevents
key collision when multiple specs request a new instance of the same capability
without explicit instance_ids.
"""
async def load_capabilities_concurrent(
self,
specs: List[CapabilityLoadSpec], # Per-capability load specifications
max_concurrency: Optional[int] = None, # Cap simultaneous loads; None = unbounded
fail_fast: bool = False, # Re-raise first exception (default: collect all results)
) -> Dict[str, Union[str, Exception]]: # requested_key → instance_id or Exception
"""
CR-10b: fan out capability loads concurrently via asyncio.gather.
Each spec is loaded via `load_capability_async` (`asyncio.to_thread` under the
hood). The total wall-clock drops from sum-of-spawns to max-of-spawns when
`max_concurrency=None`. Capped concurrency uses an asyncio.Semaphore.
Result keys come from `_spec_requested_key`: explicit `instance_id` if set,
`{capability_name}#new[{index}]` for ambiguous new_instance specs, else
`capability_name`. Successful entries map to the resolved instance_id (string).
With fail_fast=False (the default), failures are caught and map to the raised
exception; with fail_fast=True the first exception is re-raised instead.
"""
async def unload_capabilities_concurrent(
self,
name_or_ids: List[str], # Capability names or instance_ids to unload
max_concurrency: Optional[int] = None,
fail_fast: bool = False,
) -> Dict[str, Union[bool, Exception]]: # name_or_id → True or Exception
"""
CR-10b: fan out capability unloads concurrently via asyncio.gather.
Same concurrency + fail_fast semantics as load_capabilities_concurrent. Result
keys are the input `name_or_ids` (deduplication is the caller's
responsibility; duplicate inputs produce one dict entry per unique key).
"""
def bind(
self,
capability_name: str, # Name of the capability to pre-bind
default_config: Optional[Dict[str, Any]] = None # Default config used by binding.load()
) -> CapabilityBinding: # Bound view ready for instance-style use
"Create a CapabilityBinding pre-bound to this manager + capability_name."
def get_compatible_for_current_platform(self) -> List[CapabilityMeta]: # Capabilities compatible with current platform
"""Phase 5a: return discovered capabilities compatible with the host platform.
Filters by `resources.platforms`. Capabilities with an empty (or absent)
platforms list are considered universally compatible — that's the
introspection-time convention when a capability author didn't declare a
platform constraint. Capabilities lacking the entire `resources` block
(legacy / pre-Phase-5a manifests) also pass through as universal.
Does NOT filter on `requires_gpu` — substrate doesn't know whether a
GPU is present without invoking a system monitor capability. Callers gate
on GPU availability separately if needed.
"""
# Late import: platform module brings in subprocess + json; defer to call time.
"""
Phase 5a: return discovered capabilities compatible with the host platform.
Filters by `resources.platforms`. Capabilities with an empty (or absent)
platforms list are considered universally compatible — that's the
introspection-time convention when a capability author didn't declare a
platform constraint. Capabilities lacking the entire `resources` block
(legacy / pre-Phase-5a manifests) also pass through as universal.
Does NOT filter on `requires_gpu` — substrate doesn't know whether a
GPU is present without invoking a system monitor capability. Callers gate
on GPU availability separately if needed.
"""
Classes
class CapabilityManager:
"Manages capability discovery, loading, and lifecycle via process isolation."
def __init__(
self,
capability_interface:Type[ToolCapability]=ToolCapability, # Base interface for type checking
search_paths:Optional[List[Path]]=None, # Custom manifest search paths
scheduler:Optional[ResourceScheduler]=None, # Resource allocation policy
config_store:Optional[CapabilityConfigStore]=None, # CR-2: persistence backend; lazy LocalCapabilityConfigStore default per OQ-4
empirical_store:Optional[EmpiricalResourceStore]=None, # CR-7: resource-usage tracking backend; lazy LocalEmpiricalResourceStore when cfg.substrate.empirical_tracking
secret_store:Optional[SecretStore]=None, # CR-12: secret backend; lazy LocalSecretStore default (project-local <data_dir>/secrets)
max_retries:int=1, # CR-7: how many reactive retries to attempt on CapabilityResourceError (default 1 — one retry after eviction)
sysmon_capability_name:Optional[str]=None, # monitor capability (CR-3) name for GPU subtree attribution; default-None records skip GPU attribution (compute axis only)
journal_store:Optional[JournalStore]=None, # CR-14: durable account-of-action; lazy LocalJournalStore at <data_dir>/journal.db
diagnostics_store:Optional[DiagnosticsStore]=None # CR-14: disposable diagnostic narrative; lazy LocalDiagnosticsStore at <data_dir>/diagnostics.db
)
"Initialize the capability manager."
@dataclass
class CapabilityBinding:
    """
    Pre-bound view of a single capability through a shared CapabilityManager.
    Eliminates the wrapper-class duplication audited across 8 consumer services
    (SG-17). Methods forward to the manager with `capability_name` pre-supplied;
    `default_config` is the fallback used when `load()` is called without an
    explicit config (matches the manifest-default behavior in `load_capability`).
    """
    manager: 'CapabilityManager' # The shared CapabilityManager
    capability_name: str # Name of the capability this binding targets
    default_config: Dict[str, Any] = _field(default_factory=dict) # Used when load() called without config
    @property
    def meta(self) -> Optional[CapabilityMeta]:
        """The CapabilityMeta if the capability is loaded, else None."""
        return self.manager.get_capability_meta(self.capability_name)
    @property
    def is_loaded(self) -> bool:
        """True if the capability is loaded in the bound manager."""
        return self.manager.get_capability(self.capability_name) is not None
    @property
    def is_enabled(self) -> bool:
        """True if the capability is loaded AND not currently disabled."""
        m = self.meta
        return m is not None and m.enabled
    # --- Lifecycle ---
    def load(
        self,
        config: Optional[Dict[str, Any]] = None, # Override default_config when provided
        strict: bool = True # SG-5 strict validation
    ) -> bool: # True if loaded successfully
        "Load via the bound manager. Uses `default_config` if no `config` provided."
    def unload(self) -> bool: # True if unloaded
        """Unload the bound capability."""
        return self.manager.unload_capability(self.capability_name)
    def reload(
        self,
        config: Optional[Dict[str, Any]] = None # Optional new config; current config used if None
    ) -> bool:
        "Reload the bound capability (terminate + restart worker)."
    def enable(self) -> bool:
        """Enable the bound capability."""
        return self.manager.enable_capability(self.capability_name)
    def disable(self) -> bool:
        """Disable the bound capability (worker stays alive; jobs rejected)."""
        return self.manager.disable_capability(self.capability_name)
    # --- Execution ---
    def execute(self, *args, **kwargs) -> Any:
        """Execute via the bound manager (sync)."""
        return self.manager.execute_capability(self.capability_name, *args, **kwargs)
    async def execute_async(self, *args, **kwargs) -> Any:
        """Execute via the bound manager (async)."""
        return await self.manager.execute_capability_async(self.capability_name, *args, **kwargs)
    # --- Configuration ---
    def update_config(
        self,
        config: Dict[str, Any], # New config values
        strict: bool = True # SG-5 strict validation
    ) -> bool:
        "Hot-reload the bound capability's configuration."
    def get_config(self) -> Optional[Dict[str, Any]]:
        """Current configuration values (None if not loaded)."""
        return self.manager.get_capability_config(self.capability_name)
    def get_config_schema(self) -> Optional[Dict[str, Any]]:
        """JSON Schema describing this capability's configuration."""
        return self.manager.get_capability_config_schema(self.capability_name)
    def get_stats(self) -> Optional[Dict[str, Any]]:
        "Resource telemetry for the bound capability's worker process."
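The forwarding pattern `CapabilityBinding` describes can be sketched against an in-memory fake manager. `FakeManager` and `Binding` are hypothetical, simplified stand-ins with only a few methods. They show how pre-supplying `capability_name` and falling back to `default_config` removes the need for a per-service wrapper class.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


class FakeManager:
    """Hypothetical in-memory manager exposing name-keyed methods."""

    def __init__(self) -> None:
        self.loaded: Dict[str, Dict[str, Any]] = {}

    def load_capability(self, name: str, config: Dict[str, Any]) -> bool:
        self.loaded[name] = dict(config)
        return True

    def unload_capability(self, name: str) -> bool:
        return self.loaded.pop(name, None) is not None

    def get_capability_config(self, name: str) -> Optional[Dict[str, Any]]:
        return self.loaded.get(name)


@dataclass
class Binding:
    """Hypothetical binding: every call forwards with capability_name pre-supplied."""

    manager: FakeManager
    capability_name: str
    default_config: Dict[str, Any] = field(default_factory=dict)

    @property
    def is_loaded(self) -> bool:
        return self.manager.get_capability_config(self.capability_name) is not None

    def load(self, config: Optional[Dict[str, Any]] = None) -> bool:
        # Fall back to default_config only when no explicit config is given.
        cfg = self.default_config if config is None else config
        return self.manager.load_capability(self.capability_name, cfg)

    def unload(self) -> bool:
        return self.manager.unload_capability(self.capability_name)

    def get_config(self) -> Optional[Dict[str, Any]]:
        return self.manager.get_capability_config(self.capability_name)
```

Several services can share one manager, each holding its own binding, with no subclassing involved.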
class _CR10StubProxy:
    "Stand-in proxy tracking execute calls + hook fires for verification."
    def __init__(self, name="stub"):
        self._name = name
        self.execute_calls = []
        self.on_disable_calls = 0
        self.on_enable_calls = 0
    @property
    def name(self): return self._name
    @property
    def version(self): return "0.0.1"
    def initialize(self, config): self._config = dict(config or {})
    def execute(self, *args, **kwargs):
        self.execute_calls.append((args, kwargs))
        return {"who": self._name, "args": args, "kwargs": kwargs}
    def get_config_schema(self): return {}
    def get_current_config(self): return {}
    def cleanup(self): pass
    def on_disable(self): self.on_disable_calls += 1
    def on_enable(self): self.on_enable_calls += 1
Manifest Format (v2.0) (manifest_format.ipynb)
Typed parser + writer for the nested v2.0 manifest layout per the 2026-05-19 substrate audit’s CR-8. Substrate manifests transitioned from a flat top-level JSON object to a four-section nested layout:
`install` (deployment-specific facts populated at install time), `code` (code-derived facts refreshed by `cjm-ctl regenerate-manifest`), `drift_tracking` (witness hashes of the `config_schema` and, on newer manifests, the structural surface, so live-vs-stored comparisons can detect drift), and `overrides` (an operator-supplied overlay placeholder).
Import
from cjm_substrate.core.manifest_format import (
CURRENT_FORMAT_VERSION,
InstallSection,
CodeSection,
DriftTracking,
ManifestV2,
compute_config_schema_hash,
compute_structural_surface_hash,
load_manifest,
manifest_to_dict,
write_manifest
)
Functions
def compute_config_schema_hash(
schema: Optional[Dict[str, Any]], # JSON Schema or None
) -> str: # "sha256:hexdigest"
"""
Hash a JSON Schema with stable canonicalization.
None is treated as `{}` — the hash records "no schema declared" rather
than refusing. This way a capability that lost its config_schema between
install and load still gets a drift warning rather than a crash.
"""
def compute_structural_surface_hash(
surface: Optional[Dict[str, Any]], # derive_structural_surface output or None
) -> str: # "sha256:hexdigest"
"""
Hash a structural surface with stable canonicalization.
Same canonical-JSON + hash_bytes shape as `compute_config_schema_hash`
(the CR-8 idiom). None hashes as `{}` — but note the drift check skips
when the STORED hash is None (pre-surface-era manifest ≠ drift);
`_generate_manifest` only writes a hash when a surface was recorded.
"""
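Both hash helpers follow the same recipe: canonical JSON, then a `sha256:`-prefixed hex digest, with `None` hashed as `{}`. The minimal sketch below assumes that canonicalization means sorted keys plus compact separators; the real `hash_bytes` helper and exact encoding are not shown here. It also encodes the drift rule that a missing stored hash never counts as drift. `canonical_hash` and `has_drift` are hypothetical names.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def canonical_hash(obj: Optional[Dict[str, Any]]) -> str:
    """Sketch: sha256 over canonical JSON; None is treated as {}."""
    payload = json.dumps(obj or {}, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return "sha256:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()


def has_drift(stored_hash: Optional[str], live: Optional[Dict[str, Any]]) -> bool:
    """Sketch: a pre-surface-era manifest (stored hash None) never reports drift."""
    if stored_hash is None:
        return False
    return stored_hash != canonical_hash(live)
```

Because key order is canonicalized, two schemas that differ only in key order hash identically and do not trigger a drift warning.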
def _parse_resources_dict(d: Optional[Dict[str, Any]]) -> Optional[ResourceRequirements]:
    "Build a `ResourceRequirements` from its JSON sub-dict, or None."
def _from_v2_dict(
data: Dict[str, Any], # Parsed JSON dict with `format_version == "2.0"`
) -> ManifestV2
"Parse a v2.0 nested manifest dict into a typed `ManifestV2`."
def load_manifest(
path: Union[str, Path], # Path to manifest JSON file on disk
) -> ManifestV2: # Parsed manifest in v2.0 typed shape
"""
Load a manifest file and return a typed `ManifestV2`.
Format detection by top-level `format_version` key:
- `"2.0"` → nested layout, parse directly.
- anything else (including missing) → ValueError (fail loud).
"""
def _resources_to_dict(r: Optional[ResourceRequirements]) -> Optional[Dict[str, Any]]:
    "Serialize a `ResourceRequirements` back to its JSON sub-dict, or None."
def _code_section_to_dict(c: CodeSection) -> Dict[str, Any]:
    "Serialize a `CodeSection` to its JSON sub-dict, renaming `class_name` -> `class`."
def manifest_to_dict(
m: ManifestV2, # Manifest to serialize
) -> Dict[str, Any]: # v2.0 nested dict ready for `json.dumps`
"""
Serialize a `ManifestV2` to a v2.0 dict.
Always emits `format_version == CURRENT_FORMAT_VERSION`.
"""
def write_manifest(
path: Union[str, Path], # Output JSON file path
manifest: ManifestV2, # Manifest to serialize
) -> None
"Serialize a `ManifestV2` to disk in v2.0 nested layout (indent=2)."
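The load/serialize pair is strict about `format_version` and renames one key at the JSON boundary. The sketch below shows those two rules on plain dicts. `check_format_version` and `code_to_json` are hypothetical helpers, not the module's functions, and only a few `code` fields are included.

```python
from typing import Any, Dict

CURRENT_FORMAT_VERSION = "2.0"


def check_format_version(data: Dict[str, Any]) -> None:
    """Sketch of load_manifest's detection: anything but "2.0" (including missing) fails loudly."""
    version = data.get("format_version")
    if version != CURRENT_FORMAT_VERSION:
        raise ValueError(f"unsupported manifest format_version: {version!r}")


def code_to_json(name: str, module: str, class_name: str) -> Dict[str, Any]:
    """Sketch: `class_name` is stored under the JSON key "class" (a Python reserved word)."""
    return {"name": name, "module": module, "class": class_name}
```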
Classes
@dataclass
class InstallSection:
"""
Deployment-specific facts populated at install time.
These fields are written by `install_all` (paths, conda env, env vars)
plus `_generate_manifest`'s post-introspection step (installed_at,
installer_version, package_source). `regenerate-manifest` preserves
the install section across regeneration so paths survive code-side
refreshes.
"""
python_path: str = '' # Absolute path to the capability env's python interpreter
conda_env: str = '' # Conda environment name
db_path: str = '' # Capability's per-data SQLite path (if any)
env_vars: Dict[str, str] = field(...) # Per-capability env vars
installed_at: str = '' # ISO-8601 UTC timestamp of install/regen
installer_version: str = '' # "cjm-ctl <version>" that wrote this manifest
package_source: str = '' # Original install input (git URL or pip spec)
@dataclass
class CodeSection:
"""
Code-derived facts refreshed by `cjm-ctl regenerate-manifest`.
Everything in this section comes from running the introspection script
inside the capability's conda env: metadata + config_schema + binary
platform/hardware hard-facts. Drift detection hashes this section's
`config_schema` field as its witness shape.
`class_name` serializes as the JSON key `"class"` (Python reserved-word
workaround).
"""
name: str = '' # Capability's unique identifier
version: str = '' # Capability's version string
description: str = '' # Brief description (SG-6 required)
module: str = '' # Importable module path for the capability class
class_name: str = '' # Capability class name (JSON key: "class")
resources: Optional[ResourceRequirements] # Phase 5a hard-facts
config_schema: Optional[Dict[str, Any]] # JSON Schema for capability config
regenerated_at: Optional[str] # ISO-8601 UTC of last regen
worker_env: Optional[List[Dict[str, Any]]] # CR-12 spawn-env contract: asdict(EnvVarSpec) list
structural_surface: Optional[Dict[str, Any]] # Pass-2 Thread 3: public surface recorded in-env (methods/properties/attributes)
@dataclass
class DriftTracking:
"""
Witness hashes for drift detection.
`config_schema_hash` is computed at write time (regenerate-manifest /
install_all) from a canonical JSON encoding of the code section's
`config_schema`. The CapabilityManager's drift-check fetches the live
`/config_schema` from the worker, hashes it the same way, and compares;
a mismatch sets `CapabilityMeta.config_schema_drift = True` and logs a
warning.
"""
config_schema_hash: Optional[str] # "sha256:hexdigest" of canonical config_schema
structural_surface_hash: Optional[str] # Pass-2 Thread 3 witness: hash of code.structural_surface (None = pre-surface manifest)
@dataclass
class ManifestV2:
"""
Top-level v2.0 manifest with four named sections plus `format_version`.
Loaded from a v2.0 nested JSON file as-is; `format_version` is always
`CURRENT_FORMAT_VERSION`.
"""
install: InstallSection = field(...)
code: CodeSection = field(...)
drift_tracking: DriftTracking = field(...)
overrides: Dict[str, Any] = field(...)
format_version: str = CURRENT_FORMAT_VERSION
Variables
CURRENT_FORMAT_VERSION = '2.0' # Emitted on every freshly-written manifest
Capability Metadata (metadata.ipynb)
Data structures for capability metadata
Import
from cjm_substrate.core.metadata import (
ResourceRequirements,
CapabilityMeta,
CapabilityInstance,
CapabilityLoadSpec
)
Classes
@dataclass
class ResourceRequirements:
"""
Binary hard-facts about what a capability needs to run (Phase 5a).
Quantitative resource amounts (min_vram_mb, etc.) deliberately omitted
per CR-7's reactive resource management reframing — capability authors can't
reliably estimate model × dtype × quantization combinatorics, and Blender-
style variable-render capabilities can't estimate at all. The substrate uses
these binary hard-facts purely for discovery filtering; actual resource
contention is handled reactively by CR-7's eviction + retry flow.
- `requires_gpu`: True if the capability needs any GPU; the substrate gates
execution on a system monitor reporting one is present.
- `platforms`: e.g., ["linux-x64", "darwin-arm64"]. Empty list means no
platform constraint declared (assume universal compatibility).
- `accelerators`: e.g., ["cuda", "mps", "cpu"]. Informational; substrate
doesn't auto-select but consumers can filter on the values.
"""
requires_gpu: bool = False
platforms: List[str] = field(...)
accelerators: List[str] = field(...)
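The binary hard-facts above can be checked with two simple rules. In the sketch below, the platform check corresponds to discovery filtering, where an empty `platforms` list means no constraint. The GPU check mirrors the execution gate, with the system-monitor report reduced to a boolean argument. `Requirements` and `meets_hard_facts` are hypothetical mirrors, not the substrate's code.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Requirements:
    """Hypothetical mirror of ResourceRequirements' binary hard-facts."""
    requires_gpu: bool = False
    platforms: List[str] = field(default_factory=list)
    accelerators: List[str] = field(default_factory=list)


def meets_hard_facts(req: Requirements, current_platform: str, gpu_present: bool) -> bool:
    # Empty platforms list: no constraint declared, assume universal compatibility.
    if req.platforms and current_platform not in req.platforms:
        return False
    # requires_gpu gates on a monitor reporting a GPU; modeled here as a flag.
    return gpu_present or not req.requires_gpu
```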
@dataclass
class CapabilityMeta:
"Metadata about a capability."
name: str # Capability's unique identifier
version: str # Capability's version string
description: str = '' # Brief description of the capability's functionality
resources: Optional['ResourceRequirements']
config_schema: Optional[Dict[str, Any]] # JSON Schema for capability configuration
instance: Optional[Any] # Capability instance (ToolCapability subclass)
enabled: bool = True # Whether the capability is enabled
last_executed: float = 0.0 # Unix timestamp
config_schema_drift: bool = False
live_config_schema: Optional[Dict[str, Any]]
structural_surface_drift: bool = False
@dataclass
class CapabilityInstance:
"""
Per-instance runtime state for a loaded capability (CR-10 multi-instance).
Differs from CapabilityMeta in scope:
- CapabilityMeta is per-capability-name discovery + canonical-instance state.
- CapabilityInstance is per-load-call runtime state.
A capability loaded with no instance_id (default) gets `instance_id == capability_name`
and is the canonical instance referenced by CapabilityMeta.instance. Multi-instance
loads (instance_id != capability_name) add entries to CapabilityManager.instances
without changing the canonical reference.
"""
instance_id: str # Unique key in CapabilityManager.instances; default = capability_name
capability_name: str # The underlying discovered capability's name (CapabilityMeta.name)
config: Dict[str, Any] = field(...) # Effective config used at initialize()
proxy: Optional[Any]
enabled: bool = True # Per-instance enable flag; substrate's execute_capability checks this
last_executed: float = 0.0 # Unix timestamp of the most recent execute on this instance
created_at: datetime = field(...)
config_hash: str = ''
max_concurrent_requests: Optional[int]
@dataclass
class CapabilityLoadSpec:
"""
One entry in `CapabilityManager.load_capabilities_concurrent`'s batch input (CR-10).
Mirrors the positional arguments of `load_capability` so the concurrent helper
can fan out load calls without repeating the per-spec instance_id /
new_instance plumbing.
- `meta`: the discovered CapabilityMeta to load (must have a `.manifest` attached).
- `config`: initial configuration; falls through to persisted-or-schema-defaults
when None (default-instance only; multi-instance starts fresh).
- `instance_id`: explicit instance_id (validated against [A-Za-z0-9_-]{1,64}).
None defaults to capability_name (single-instance backward compat).
- `new_instance`: when True with instance_id=None, auto-generate
`{capability_name}-{6-hex}`.
"""
meta: Any # CapabilityMeta — typed as Any to avoid forward-reference quirk under nbdev's late binding
config: Optional[Dict[str, Any]]
instance_id: Optional[str]
new_instance: bool = False
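The `instance_id` / `new_instance` rules of `CapabilityLoadSpec` reduce to a small resolution step. The sketch below assumes that the 6-hex suffix comes from `secrets.token_hex(3)`; the actual generator is not shown. `resolve_instance_id` is a hypothetical helper.

```python
import re
import secrets
from typing import Optional

_INSTANCE_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


def resolve_instance_id(capability_name: str, instance_id: Optional[str] = None,
                        new_instance: bool = False) -> str:
    if instance_id is not None:
        # Explicit ids must match [A-Za-z0-9_-]{1,64} in full.
        if not _INSTANCE_ID_RE.fullmatch(instance_id):
            raise ValueError(f"invalid instance_id: {instance_id!r}")
        return instance_id
    if new_instance:
        # Auto-generated multi-instance id: {capability_name}-{6-hex}.
        return f"{capability_name}-{secrets.token_hex(3)}"
    # Default: the canonical instance shares the capability's name.
    return capability_name
```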
Platform Utilities (platform.ipynb)
Cross-platform utilities for process management, path handling, and system detection
Import
from cjm_substrate.core.platform import (
MICROMAMBA_URLS,
is_windows,
is_macos,
is_linux,
is_apple_silicon,
get_current_platform,
get_python_in_env,
get_popen_isolation_kwargs,
terminate_process,
terminate_self,
run_shell_command,
conda_env_exists,
get_micromamba_download_url,
download_micromamba,
get_conda_command,
build_conda_command,
get_micromamba_binary_path,
ensure_runtime_available
)
Functions
def is_windows() -> bool:
    """Check if running on Windows."""
    return platform.system() == "Windows"
def is_macos() -> bool:
    """Check if running on macOS."""
    return platform.system() == "Darwin"
def is_linux() -> bool:
    """Check if running on Linux."""
    return platform.system() == "Linux"
def is_apple_silicon() -> bool:
    "Check if running on Apple Silicon Mac (for MPS detection)."
def get_current_platform() -> str:
    """
    Get current platform string for manifest filtering.
    Returns strings like 'linux-x64', 'darwin-arm64', 'win-x64'.
    """
    system = platform.system().lower()
    machine = platform.machine().lower()
    # Normalize system names
    ...
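The docstring fixes only the output format. The sketch below shows one normalization that produces those strings from `platform.system()` and `platform.machine()` values. The alias tables are assumptions, not the library's actual mapping, and `platform_string` is a hypothetical name.

```python
import platform
from typing import Optional

# Assumed alias tables; the real normalization may cover more cases.
_SYSTEMS = {"linux": "linux", "darwin": "darwin", "windows": "win"}
_MACHINES = {"x86_64": "x64", "amd64": "x64", "arm64": "arm64", "aarch64": "arm64"}


def platform_string(system: Optional[str] = None, machine: Optional[str] = None) -> str:
    system = (system or platform.system()).lower()
    machine = (machine or platform.machine()).lower()
    # Unknown values pass through unchanged rather than failing.
    return f"{_SYSTEMS.get(system, system)}-{_MACHINES.get(machine, machine)}"
```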
def get_python_in_env(
env_path: Path # Path to conda environment root
) -> Path: # Path to Python executable
"""
Get the Python executable path for a conda environment.
On Windows: env_path/python.exe
On Unix: env_path/bin/python
"""
def get_popen_isolation_kwargs() -> Dict[str, Any]:
    """
    Return kwargs for process isolation in subprocess.Popen.
    On Unix: Returns {'start_new_session': True}
    On Windows: Returns {'creationflags': CREATE_NEW_PROCESS_GROUP}
    Usage:
        process = subprocess.Popen(cmd, **get_popen_isolation_kwargs(), ...)
    """
def terminate_process(
process: subprocess.Popen, # Process to terminate (must be a session/group leader for subtree kill)
timeout: float = 2.0 # Seconds to wait before force kill
) -> None
"""
Terminate a subprocess + its entire process subtree (grandchildren, etc.).
Session A 2026-05-27: enhanced from worker-only termination to FULL subtree
termination. Workers are spawned with `get_popen_isolation_kwargs()` which
sets `start_new_session=True` on Unix → the worker is its own session leader
and ALL of its descendants inherit the same process-group ID (unless they
setsid themselves, which is rare). `os.killpg(worker_pid, SIGTERM/SIGKILL)`
sends the signal to every process in that group atomically — closes the
orphan-grandchild bug surfaced by Voxtral-vLLM (vLLM api_server spawned its
own EngineCore subprocess; pre-fix, the worker terminated cleanly but vLLM
+ EngineCore kept running as orphans, eating GPU memory until manual kill).
Strategy on Unix:
1. SIGTERM the worker's process group via os.killpg (atomic).
2. Wait up to `timeout` for the worker to exit.
3. If anything still alive, SIGKILL the process group.
4. psutil-based safety sweep for any process that setsid-ed away from the
original group (rare but possible — e.g., a poorly-isolated subprocess).
Strategy on Windows:
1. process.terminate() + wait + kill (legacy path). True process-group
signaling on Windows requires Job Objects which the substrate doesn't
currently wire — Windows users are advised to avoid capabilities that
spawn subprocesses until that's added. (TODO: track as substrate gap.)
"""
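The Unix strategy can be implemented with the standard library alone. The sketch below assumes the worker was started with `subprocess.Popen(cmd, start_new_session=True)`, which is what the isolation kwargs return on Unix, so its PID is also its process-group ID. It omits the psutil sweep for processes that called `setsid` themselves (step 4). `terminate_group` is a hypothetical name.

```python
import os
import signal
import subprocess
import sys


def terminate_group(process: subprocess.Popen, timeout: float = 2.0) -> None:
    if sys.platform == "win32":
        # Legacy path: no process-group signaling without Job Objects.
        if process.poll() is None:
            process.terminate()
            try:
                process.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
        return
    pgid = process.pid  # session leader, so pid == pgid
    # 1. SIGTERM the whole group at once (worker + descendants that inherited the group).
    try:
        os.killpg(pgid, signal.SIGTERM)
    except ProcessLookupError:
        pass  # group already empty
    # 2. Give the worker up to `timeout` seconds to exit.
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        pass
    # 3. SIGKILL whatever is still in the group (e.g. grandchildren ignoring SIGTERM).
    try:
        os.killpg(pgid, signal.SIGKILL)
    except (ProcessLookupError, PermissionError):
        pass  # group gone or no longer signalable
    process.wait()
```

Signaling the group, rather than only `process.pid`, is what reaches descendants such as a spawned inference server.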
def terminate_self() -> None:
    """
    Terminate the current process (for worker suicide pact).
    On Unix: Sends SIGTERM to self for graceful shutdown
    On Windows: Calls os._exit() since Windows lacks SIGTERM
    """
def run_shell_command(
cmd: str, # Shell command to execute
check: bool = True, # Whether to raise on non-zero exit
capture_output: bool = False, # Whether to capture stdout/stderr
**kwargs # Additional kwargs passed to subprocess.run
) -> subprocess.CompletedProcess
"""
Run a shell command cross-platform.
Unlike using shell=True with executable='/bin/bash', this function
uses the platform's default shell:
- Linux/macOS: /bin/sh (or $SHELL)
- Windows: cmd.exe
"""
def conda_env_exists(
env_name: str, # Name of the conda environment
conda_cmd: str = "conda" # Conda command (conda, mamba, micromamba)
) -> bool
"""
Check if a conda environment exists (cross-platform).
Uses 'conda env list --json' instead of piping to grep,
which doesn't work on Windows.
"""
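`conda env list --json` prints a JSON object whose `envs` key lists environment prefixes, so the existence check reduces to comparing directory names. The sketch below covers only the parsing step; the JSON text would come from something like `subprocess.run([...,"env", "list", "--json"], capture_output=True, text=True).stdout`. `env_exists_in_listing` is hypothetical. It matches named environments under `<root>/envs/<name>`; the base environment, whose prefix is the install root, would need separate handling.

```python
import json
from pathlib import PureWindowsPath


def env_exists_in_listing(listing_json: str, env_name: str) -> bool:
    """Sketch: True if any listed prefix ends in a directory named env_name."""
    envs = json.loads(listing_json).get("envs", [])
    # PureWindowsPath accepts both "/" and "\\" separators, so one parser covers all platforms.
    return any(PureWindowsPath(prefix).name == env_name for prefix in envs)
```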
def get_micromamba_download_url(
platform_str: Optional[str] = None # Platform string (e.g., 'linux-x64'), uses current if None
) -> str: # Download URL for micromamba binary
"Get the micromamba download URL for the specified or current platform."
def download_micromamba(
dest_path: Path, # Destination path for the micromamba binary
platform_str: Optional[str] = None, # Platform string, uses current if None
show_progress: bool = True # Whether to print progress messages
) -> bool: # True if download succeeded
"Download and extract micromamba binary to the specified path."
def get_conda_command(
config: CJMConfig # Configuration object with runtime settings
) -> List[str]: # Base command with prefix args if needed
"Get the conda/mamba/micromamba base command with prefix args for local mode."
def build_conda_command(
config: CJMConfig, # Configuration object with runtime settings
*args: str # Additional command arguments
) -> List[str]: # Complete command ready for subprocess
"Build a complete conda/mamba/micromamba command."
def get_micromamba_binary_path(
config: CJMConfig # Configuration object with runtime settings
) -> Optional[Path]: # Path to micromamba binary or None
"Get the configured micromamba binary path for the current platform."
def ensure_runtime_available(
config: CJMConfig # Configuration object with runtime settings
) -> bool: # True if runtime is available
"Check if the configured conda/micromamba runtime is available."
Variables
MICROMAMBA_URLS: Dict[str, str]
Composition Ports (ports.ipynb)
Capability compositions as DAGs of invocation nodes with typed input/output
Import
from cjm_substrate.core.ports import (
NodeState,
TERMINAL_NODE_STATES,
OutputRef,
CompositionNode,
Composition,
CompositionValidationError,
CompositionBindingError,
validate_composition,
extract_output_field,
resolve_node_kwargs,
CompositionNodeRun,
CompositionRun,
new_composition_run
)
Functions
def _node_dependencies(
node: CompositionNode, # Node whose kwargs are scanned for OutputRef markers
) -> Set[str]: # Producer node ids this node depends on
"""
Derive a node's dependencies from the `OutputRef` markers in its kwargs.
Top-level kwarg values only (see `CompositionNode` docstring); duplicate
references to the same producer collapse into one dependency edge.
"""
def validate_composition(
comp: Composition, # Composition to validate
) -> Dict[str, Set[str]]: # node_id -> set of upstream node ids (the derived DAG)
"""
Validate a composition and return its derived dependency map.
Raises `CompositionValidationError` on duplicate node ids, `OutputRef`
targets that name no node in the composition, or dependency cycles.
An empty composition is valid (returns `{}`) — the queue completes it
at submit, mirroring the empty-sequence totality precedent.
"""
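The validation rules (unique ids, resolvable `OutputRef` targets, no cycles) can be sketched with simplified stand-ins. `Ref` and `Node` are hypothetical minimal mirrors of `OutputRef` and `CompositionNode`, and `derive_deps` is not the module's function. Cycle detection here uses Kahn's algorithm, which may differ from what the module actually does.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set


@dataclass(frozen=True)
class Ref:
    node_id: str
    field: Optional[str] = None


@dataclass
class Node:
    id: str
    kwargs: Dict[str, Any] = field(default_factory=dict)


def derive_deps(nodes: List[Node]) -> Dict[str, Set[str]]:
    ids = [n.id for n in nodes]
    if len(ids) != len(set(ids)):
        raise ValueError("duplicate node ids")
    known = set(ids)
    deps: Dict[str, Set[str]] = {}
    for n in nodes:
        # Top-level kwarg values only; duplicate refs collapse into one edge.
        upstream = {v.node_id for v in n.kwargs.values() if isinstance(v, Ref)}
        missing = upstream - known
        if missing:
            raise ValueError(f"{n.id}: unknown OutputRef targets {sorted(missing)}")
        deps[n.id] = upstream
    # Kahn's algorithm: repeatedly retire nodes whose dependencies are all retired.
    remaining = {k: set(v) for k, v in deps.items()}
    ready = [k for k, v in remaining.items() if not v]
    retired = 0
    while ready:
        done = ready.pop()
        retired += 1
        for k, v in remaining.items():
            if done in v:
                v.discard(done)
                if not v:
                    ready.append(k)
    if retired != len(deps):
        raise ValueError("dependency cycle")
    return deps  # an empty composition yields {}
```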
def extract_output_field(
"""
Extract a field from an upstream result for binding into a kwarg.
The single substrate-owned successor of the retired `field_of` helpers:
dicts resolve by KEY (intent for capability-side dict results), everything
else by ATTRIBUTE (typed wire DTOs). Missing fields raise
`CompositionBindingError` loudly — silent shape-shifting is what stage 2
retired (F12 fail-loudly posture).
"""
def resolve_node_kwargs(
"""
Materialize a node's kwargs by resolving its `OutputRef` markers.
Called by the executor at the moment a node becomes ready (all
dependencies completed) — this is where execution-time binding actually
happens. Static kwargs pass through untouched.
"""
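The binding rules and the resolution step can be sketched together. Dict results resolve by key and everything else by attribute; `field=None` binds the whole result; static kwargs pass through unchanged. `Ref`, `BindingError`, `extract`, and `resolve_kwargs` are hypothetical simplified stand-ins for `OutputRef`, `CompositionBindingError`, `extract_output_field`, and `resolve_node_kwargs`.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


class BindingError(RuntimeError):
    pass


@dataclass(frozen=True)
class Ref:
    node_id: str
    field: Optional[str] = None


_MISSING = object()


def extract(result: Any, field: Optional[str]) -> Any:
    if field is None:
        return result  # whole-result binding (fan-in folds)
    if isinstance(result, dict):
        if field not in result:
            raise BindingError(f"result has no key {field!r}")
        return result[field]
    value = getattr(result, field, _MISSING)
    if value is _MISSING:
        raise BindingError(f"result has no attribute {field!r}")
    return value


def resolve_kwargs(kwargs: Dict[str, Any], results: Dict[str, Any]) -> Dict[str, Any]:
    resolved = {}
    for name, value in kwargs.items():
        if isinstance(value, Ref):
            if value.node_id not in results:
                raise BindingError(f"no recorded result for {value.node_id!r}")
            value = extract(results[value.node_id], value.field)
        resolved[name] = value  # static values pass through untouched
    return resolved
```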
def new_composition_run(
comp: Composition, # Composition to run (validated here)
run_id: str, # Run UUID (assigned by the queue)
) -> CompositionRun: # Fresh run record with derived topology
"Validate a composition and build its run record."
def ready_nodes(
self: CompositionRun,
) -> List[str]: # Node ids that are pending with all dependencies completed
"""
Nodes whose member Jobs can be created right now.
Scan order follows the composition's node order (readability +
deterministic dispatch among equally-ready nodes).
"""
def record_started(
"Mark a node running and bind it to its member Job."
def record_result(
self: CompositionRun,
node_id: str, # Node whose member Job reached terminal status
state: NodeState, # completed / failed / cancelled
result: Any = None, # Member job result (if completed)
error: Optional[JobError] = None, # Structured failure (if failed/cancelled)
) -> None
"Record a member job's terminal outcome on its node."
def skip_dependents(
self: CompositionRun,
node_id: str, # Node whose failure/cancellation poisons its downstream
) -> List[str]: # Node ids newly marked skipped (transitive)
"""
Mark every still-pending transitive dependent of `node_id` skipped.
Skipped nodes never get a Job — their inputs can never exist. Runs
regardless of fail_fast (dependents are unrunnable either way; fail_fast
only governs INDEPENDENT pending members, which the executor cancels).
"""
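Advancement needs only the derived `deps` map and its reverse. The sketch below implements a ready scan and the transitive skip over plain dicts. The state strings and the names `ready` and `skip_dependents` are hypothetical simplifications of `NodeState` and the `CompositionRun` methods.

```python
from collections import deque
from typing import Dict, List, Set


def ready(order: List[str], deps: Dict[str, Set[str]], state: Dict[str, str]) -> List[str]:
    # Pending nodes whose every dependency completed, in composition order.
    return [n for n in order
            if state[n] == "pending" and all(state[d] == "completed" for d in deps[n])]


def skip_dependents(node_id: str, dependents: Dict[str, Set[str]], state: Dict[str, str]) -> List[str]:
    # Breadth-first walk over the reverse edges; only still-pending nodes flip to skipped.
    skipped: List[str] = []
    seen = {node_id}
    queue = deque(dependents.get(node_id, ()))
    while queue:
        n = queue.popleft()
        if n in seen:
            continue
        seen.add(n)
        if state[n] == "pending":
            state[n] = "skipped"
            skipped.append(n)
        queue.extend(dependents.get(n, ()))
    return skipped
```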
def all_terminal(
self: CompositionRun,
) -> bool: # True when every node is in a terminal state
"Whether the composition has nothing left to run or wait for."
def derive_terminal_status(
self: CompositionRun,
) -> NodeState: # cancelled / failed / completed
"""
Derive the composition-level terminal status from member outcomes.
Precedence:
1. USER cancel intent (`cancel_requested`) dominates everything.
2. A member failure under fail_fast lands the run `failed` — the
executor's housekeeping cancels of independent pending members do NOT
flip it to cancelled (that's what `cancel_requested` distinguishes).
3. A directly-cancelled member (job-level cancel, no failure, no
composition intent) lands the run `cancelled`.
4. Otherwise `completed` — including best-effort (fail_fast=False) runs
with failed members: "we attempted everything", matching the sequence
semantics this replaces. Per-node outcomes stay inspectable on
`node_runs` either way. (`skipped` never appears without a failed or
cancelled member upstream of it, so it needs no clause of its own.)
"""
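The sketch below is one literal reading of the precedence list over plain state strings. `derive_status` is a hypothetical simplification, not the module's method. Read literally, rule 3 requires that no member failed, so a best-effort run with both a failed and a directly-cancelled member falls through to `completed`.

```python
from typing import Iterable


def derive_status(member_states: Iterable[str], cancel_requested: bool, fail_fast: bool) -> str:
    states = set(member_states)
    if cancel_requested:
        return "cancelled"  # 1. user cancel intent dominates
    failed = "failed" in states
    if failed and fail_fast:
        return "failed"  # 2. housekeeping cancels do not flip this
    if "cancelled" in states and not failed:
        return "cancelled"  # 3. direct job-level cancel
    return "completed"  # 4. includes best-effort runs with failed members
```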
def results_by_node(
self: CompositionRun,
) -> Dict[str, Any]: # node_id -> result, for completed nodes only
"""
Completed members' results keyed by node id (what host folds consume,
and what `resolve_node_kwargs` reads at advancement time).
"""
Classes
class NodeState(str, Enum):
"""
State of one composition node (and, for the terminal subset, of a
whole composition run).
`skipped` is composition-specific: a node whose transitive dependencies
failed/cancelled can never run (its inputs will never exist) and is
recorded as skipped rather than getting a Job at all. Composition-level
status uses the running/completed/failed/cancelled subset.
"""
class OutputRef:
"""
Binding marker: this kwarg's value comes from an upstream node's result.
Placed directly in a `CompositionNode.kwargs` value position. `field=None`
binds the WHOLE result (fan-in folds); a field name extracts one field via
`extract_output_field` (dict key or typed-result attribute). Frozen so
markers are hashable + safely shareable across nodes.
"""
@dataclass
class CompositionNode:
"""
One capability invocation in a composition.
`kwargs` mixes static values with `OutputRef` markers; the markers are
scanned (top-level values only — nested containers are not searched, by
design: evidence needs single-position bindings, and a nested-marker
grammar is seam-admitted later) to derive the node's dependencies.
"""
id: str # Unique node id within the composition
capability_instance_id: str # Target capability instance
kwargs: Dict[str, Any] = field(...) # Static values + OutputRef markers
priority: int = 0 # Per-node priority override (0 = inherit composition priority)
task_name: Optional[str] # Task-channel address: adapter task (stage 4; None = execute channel)
method: Optional[str] # Task-channel address: adapter method (set with task_name)
control: Dict[str, Any] = field(...) # Per-call control flags (force/cache-bypass); threaded into the member Job's CallEnvelope.control
@dataclass
class Composition:
"""
A static DAG of capability-invocation nodes, submitted as one unit.
`fail_fast=True` (default, matching the audit-locked sequence default):
on a member failure, pending independent members are cancelled, in-flight
members run to completion, transitive dependents are skipped, and the
composition lands `failed`. `fail_fast=False` is best-effort: independent
members continue; only transitive dependents of the failure are skipped.
`run_id` / `actor` (CR-14 follow-up) are host-tier correlation tags
stamped onto every lazily-created member Job (the `submit(run_id=,
actor=)` analog for compositions) — NOT the composition run's own id,
which the queue assigns at submit.
"""
nodes: List[CompositionNode] # The invocation nodes (order = readability + ready-scan order)
fail_fast: bool = True # Halt independent pending members on first failure
priority: int = 0 # Composition-level priority (per-node override possible)
run_id: Optional[str] # Host-tier run correlation for member Jobs (CR-14 follow-up)
actor: Optional[str] # Who/what initiated the work (CR-14 follow-up)
class CompositionValidationError(ValueError):
"""
A composition failed submit-time validation (duplicate ids, unresolved
`OutputRef` targets, or a dependency cycle).
"""
class CompositionBindingError(RuntimeError):
"""
An `OutputRef` could not be resolved against the producer's recorded
result at execution time (missing producer result, missing key/attribute).
"""
@dataclass
class CompositionNodeRun:
"Live state of one node within a composition run."
node_id: str # The CompositionNode this tracks
state: NodeState = NodeState.pending # Current node state
job_id: Optional[str] # Member Job id (set when the node starts)
result: Any # Member job result (if completed)
error: Optional[JobError] # Structured failure summary (if failed/cancelled)
@dataclass
class CompositionRun:
"""
Tracks a submitted composition through execution (lives in
`JobQueue._compositions`).
Carries the validated dependency map (and its reverse) so advancement
decisions are O(edges) lookups for the rest of the run. Composition-level
`status` reuses the NodeState terminal subset: starts `running`,
transitions to completed / failed / cancelled via
`derive_terminal_status` once `all_terminal()`.
`cancel_requested` records USER cancel intent (`cancel_composition`),
distinguishing it from the executor's fail-fast HOUSEKEEPING cancels of
independent pending members after a failure — without the flag, a
failure-driven run would derive `cancelled` instead of `failed` because
its housekeeping cancels would dominate.
"""
id: str # Composition run UUID
composition: Composition # The submitted spec (immutable post-submit)
deps: Dict[str, Set[str]] # node_id -> upstream node ids (validated)
dependents: Dict[str, Set[str]] # node_id -> downstream node ids (reverse of deps)
nodes_by_id: Dict[str, CompositionNode] # Spec lookup for the executor
node_runs: Dict[str, CompositionNodeRun] # Per-node live state
status: NodeState = NodeState.running # Composition-level status
cancel_requested: bool = False # True once cancel_composition is called (user intent)
submitted_at: datetime = field(...)
completed_at: Optional[datetime] # Set when the run reaches terminal status
Variables
TERMINAL_NODE_STATES
Remote Capability Proxy (proxy.ipynb)
Bridge between Host application and isolated Worker processes
Import
from cjm_substrate.core.proxy import (
RemoteCapabilityProxy,
execute_async,
execute_stream_sync,
execute_stream,
execute_with_oom_check,
execute_async_with_oom_check,
execute_task,
execute_task_async,
get_stats,
is_alive,
get_structural_surface,
cancel,
cancel_async,
get_progress,
get_progress_async,
on_disable,
on_enable,
get_system_status,
get_system_status_async,
list_processes,
list_processes_async,
prefetch,
prefetch_async,
reconfigure,
reconfigure_async
)
Functions
@patch
def _bind_listen_socket(self:RemoteCapabilityProxy) -> Tuple[socket.socket, int]
"""
Bind a listening socket on a kernel-chosen ephemeral port.
The socket is kept open so its FD can be inherited by the worker
subprocess (Unix). Returns (socket, port).
"""
def _pump_stream(
"""
Pump a worker's raw output to the diagnostics store (CR-14).
The zero-cooperation death-rattle floor: captures everything the worker
process writes outside the structured handler — bare prints, native-lib
output, tqdm, argparse/startup failures BEFORE logging exists, and the
final traceback of a hard crash. Runs as a daemon thread; ends at EOF
(worker exit). Attribution is the worker SESSION, never a job — raw
streams cannot be job-attributed honestly under same-worker concurrency
(the stage-3 lesson that killed the timestamp-window heuristic).
tqdm CR-frames collapse to their final frame via `normalize_stream_line`
(liveness telemetry is not durable). Failures degrade to dropping chunks
(diagnostics are the disposable class) — never to breaking the worker.
"""
@patch
def _start_process(self:RemoteCapabilityProxy) -> None:
"""Launch the worker subprocess (CR-14: PIPE-captured + journaled).
Replaces the pre-CR-14 fd-inherited flat log file (`.cjm/logs/<name>.log`
+ ctime session markers): raw output goes through `_pump_stream` into the
diagnostics store; structured worker logging writes the diagnostics store
DIRECTLY via the env contract below; the spawn itself is a journal event.
"""
@patch
def _wait_for_ready(
self:RemoteCapabilityProxy,
timeout:float=30.0 # Max seconds to wait for worker startup
) -> None
"Wait for worker to become responsive."
@patch
def config_options(self:RemoteCapabilityProxy) -> Dict[str, Any]: # CR-11: live config option domains
"""Get the capability's runtime config option providers (CR-11).
Returns the worker's get_config_options() output (FieldOptions per
dynamic field, JSON-serialized to dicts). Empty dict when the capability
exposes no dynamic options.
"""
@patch
def cleanup(self:RemoteCapabilityProxy) -> None:
"""Clean up capability resources and terminate worker process."""
def _maybe_serialize_input(
self,
obj: Any # Object to potentially serialize
) -> Any: # Serialized form (path string or original object)
"Convert FileBackedDTO objects to file paths for zero-copy transfer."
def _prepare_payload(
self,
args: tuple, # Positional arguments
kwargs: dict # Keyword arguments
) -> Dict[str, Any]: # JSON-serializable payload
"""
Prepare arguments for HTTP transmission.
CR-14: attaches the current call envelope (set by the JobQueue around
each job's execution via the `wire` contextvar) as a TOP-LEVEL body key.
Never inside kwargs — capability signatures never see it; old workers ignore
unknown top-level keys. Envelope-less calls (direct proxy use) simply
produce unattributed worker records.
"""
def _harvest_worker_accounts(
"""
CR-14 follow-up: journal in-worker accounts off the response header.
The host-writes-the-row half of the account contract (`wire.record_account` / worker
`_accounts_headers`): workers RECORD accounts
during a call span; the proxy journals them ON RECEIPT with
`worker_reported=True` + the receiving-side identity (the proxy-side
call envelope + this spawn's worker session). Called on every unary
response BEFORE the status checks so a `_job_error` 500's accounts
(e.g. a save that succeeded before a later crash) are kept.
Header absent (old workers, account-less calls) = no-op. A failure here
logs at ERROR and never breaks the call, because the result is the contract;
a wedged journal store also fails the queue's own emission for the
same job, so the wedge gate still fires loudly.
"""
async def execute_async(
self,
*args,
**kwargs
) -> Any: # Capability result
"""
Execute the capability asynchronously.
CR-4: HTTP 409 from the worker is mapped to a typed `CapabilityCancelledError`.
Same 409/200/other semantics as the sync `execute()` variant.
"""
def _raise_from_job_error_chunk(
"""
SG-52: convert a `_job_error` JobError-shaped dict into the right typed exception.
Mapping rules:
- `original_exc_repr` starts with `"CapabilityCancelledError"` → raise CapabilityCancelledError
(preserves the non-retriable semantic that category alone doesn't capture).
- category == "user_input" → CapabilityInputError (with fields_invalid).
- category == "transient" → CapabilityTransientError (with retry_after_seconds).
- category == "resource" → CapabilityResourceError (with reconstructed ResourceShortfall).
- category == "fatal" → CapabilityFatalError.
- Unknown category → RuntimeError carrying the chunk for forensic inspection.
This is the streaming-side counterpart to /execute's 409 → CapabilityCancelledError
detection. Same intent: the typed exception survives the HTTP wire boundary
so substrate / JobQueue / consumer code can branch on category without
parsing string messages.
"""
def _raise_typed_execute_error(
"""
SG-52 parity for the unary execute path (stage-3 ledger G7).
If the worker's error body carries the `{"_job_error": <JobError dict>}`
sentinel (post-fix workers), raise the corresponding TYPED exception via
`_raise_from_job_error_chunk` — this is what lets the manager's CR-7
reactive-retry path see `CapabilityResourceError` from plain `/execute`
calls (the OOM-backstop stress test caught the unary channel collapsing
every failure to RuntimeError, leaving the retry blind). Pre-fix workers
return a bare-string detail → the legacy RuntimeError fallback (version-
skew tolerance, F12 posture).
"""
def execute_stream_sync(self, *args, **kwargs) -> Generator[Any, None, None]
"Synchronous wrapper for streaming (blocking)."
async def execute_stream(
self,
*args,
**kwargs
) -> AsyncGenerator[Any, None]: # Yields parsed JSON chunks
"""
Execute with streaming response (async generator).
SG-52: detects the terminal `{"_job_error": <JobError dict>}` chunk and
raises the corresponding typed exception client-side instead of yielding
it to downstream consumers. Mirrors /execute's HTTP 409 → typed-exception
behavior at the streaming wire boundary. Normal capability output chunks
pass through unchanged (they never carry the `_job_error` key).
"""
def _check_worker_death(self) -> None:
"""CR-7 Track A: inspect subprocess state after an httpx fault on /execute.
Raises `WorkerOOMError` if the worker died with SIGKILL (POSIX returncode -9 —
kernel OOM-killer is the dominant cause). Raises `CapabilityTransientError` if
it died with any other returncode. Returns None silently when the worker
is still alive (caller re-raises the original httpx error).
`signal.SIGKILL` doesn't exist on Windows, where worker OOM looks different
(STATUS_NO_MEMORY 0xC0000017, i.e. -1073741801 as a signed 32-bit integer). Cross-platform
OOM detection is a future enhancement when there's a concrete Windows substrate consumer.
"""
def execute_with_oom_check(self, *args, **kwargs) -> Any:
"""CR-7 Track A wrapper around the sync execute path.
Catches httpx connection / protocol faults, calls `_check_worker_death`,
and either raises a typed `WorkerOOMError` / `CapabilityTransientError` or
re-raises the original httpx error (worker still alive — caller treats
as a generic transient network issue).
"""
async def execute_async_with_oom_check(self, *args, **kwargs) -> Any:
"""CR-7 Track A wrapper around the async execute path. Same semantics."""
def _prepare_task_payload(
self,
task_name: str, # Adapter task address
method: str, # Adapter method name
kwargs: dict, # Task method kwargs
) -> Dict[str, Any]: # JSON-serializable /task body
"""
Build the /task body (CR-17 pt 2) + the CR-14 call envelope rider.
Same envelope semantics as `_prepare_payload`: top-level key, never
inside kwargs.
"""
def execute_task(self, task_name: str, method: str, **kwargs) -> Any:
"""Invoke a typed task-adapter method in-worker (CR-17 pt 2; sync).
The explicit task channel: `action=` stays the tool's native in-worker
dispatch; this addresses the TASK contract (adapter + method). Kwargs-only
by design. Built ONCE with the CR-7 Track-A worker-death check inside —
no later wrapper supersedes it (the G7a last-assignment-wins lesson).
"""
async def execute_task_async(self, task_name: str, method: str, **kwargs) -> Any:
"""Invoke a typed task-adapter method in-worker (CR-17 pt 2; async). Same semantics."""
def get_stats(self) -> Dict[str, Any]: # Process telemetry
"""Get worker process resource usage."""
def is_alive(self) -> bool: # True if worker is responsive
"""Check if the worker process is still running and responsive."""
def get_structural_surface(self) -> Optional[Dict[str, Any]]: # Live-derived surface, or None
"""Pass-2 Thread 3 live companion: fetch the worker's runtime-derived
structural surface (`GET /structural_surface`).
Returns None when the worker predates the endpoint (a pre-fracture
substrate in a snapshot env → 404) or on transport failure — callers
treat None as "skip the drift check", never as an empty surface.
"""
def cancel(self) -> bool: # True if cancel request was sent
"""Request cancellation of running execution."""
async def cancel_async(self) -> bool: # True if cancel request was sent
"""Request cancellation asynchronously."""
def get_progress(self) -> Dict[str, Any]: # {progress: float, message: str}
"""Get current execution progress from worker."""
async def get_progress_async(self) -> Dict[str, Any]: # {progress: float, message: str}
"""Get current execution progress asynchronously."""
def on_disable(self) -> bool: # True if hook signal accepted by worker
"""CR-2: forward the substrate's on_disable signal to the worker process.
Capability can opt in via ToolCapability.on_disable(); default implementation
is a no-op so silent-pass-through is the norm. Failures to reach the
worker (already terminated, network blip) are logged-and-swallowed —
the substrate-side enable/disable bookkeeping doesn't depend on the
hook actually firing.
"""
def on_enable(self) -> bool: # True if hook signal accepted by worker
"""CR-2: forward the substrate's on_enable signal to the worker process.
Same delivery semantics as on_disable: best-effort, errors logged-and-
swallowed. The capability's hook (default no-op) decides whether to eagerly
re-acquire resources or rely on lazy re-load at next execute().
"""
def get_system_status(self) -> Optional[Dict[str, Any]]: # SystemStats dict, or None on transport / config failure
"""CR-3: typed MonitorToolProtocol accessor. POSTs to worker's `/get_system_status`.
Status code semantics (worker side raises HTTPException with these codes):
- 200: SystemStats dict returned
- 404: capability is not a monitor — logged at ERROR (configuration error;
no amount of retry fixes it) and returns None. Loudly distinguished
from the substrate's WARN-level transient-failure degradation.
- 500: real capability failure; propagates as HTTPStatusError
- ConnectError: worker may have died; returns None silently (substrate
degrades to empty stats)
"""
async def get_system_status_async(self) -> Optional[Dict[str, Any]]: # SystemStats dict, or None on transport / config failure
"""Async variant of `get_system_status`. Same 200/404/500/ConnectError semantics."""
def list_processes(self) -> Optional[List[Dict[str, Any]]]: # ProcessStats dict list, or None on transport / config failure
"""CR-3: typed MonitorToolProtocol accessor. POSTs to worker's `/list_processes`.
Same 200/404/500/ConnectError semantics as `get_system_status`. Note that
`MonitorToolProtocol.list_processes()` defaults to returning `[]`, so monitors without
per-process visibility yield a 200 with an empty list.
"""
async def list_processes_async(self) -> Optional[List[Dict[str, Any]]]: # ProcessStats dict list, or None on transport / config failure
"""Async variant of `list_processes`. Same semantics."""
def prefetch(
self,
stall_threshold_seconds: Optional[float] = None, # Override SubstrateConfig.prefetch_stall_threshold_seconds; None = use config
poll_interval_seconds: float = 1.0, # How often to poll /progress for stall detection
) -> bool: # True if worker accepted the prefetch hook
"""
CR-4 / Session A 2026-05-27: forward the substrate's prefetch signal with
progress-based stall detection.
Replaces wall-clock-timeout-based startup waiting (operators racing arbitrary
timeouts vs. network speeds for model downloads). Approach:
1. POST /prefetch fires in a background thread with httpx timeout=None.
2. Main thread polls /progress every poll_interval_seconds.
3. Each (progress, message) change resets the stall counter.
4. If no change in stall_threshold_seconds AND POST still pending →
SIGTERM the worker subprocess + raise CapabilityTimeoutError.
Capabilities opt in to fine-grained stall defeat by calling
self.report_progress(...) periodically during long lifecycle operations
(model download, server startup, etc.). Capabilities that don't report progress
are fine as long as the threshold accommodates their slowest plausible
silent stretch.
Errors raised by the capability (worker 500) propagate as RuntimeError; an
unreachable worker makes `prefetch` return `False`; a stall raises CapabilityTimeoutError.
"""
async def prefetch_async(
self,
stall_threshold_seconds: Optional[float] = None,
poll_interval_seconds: float = 1.0,
) -> bool: # True if worker accepted the prefetch hook
"Async variant of `prefetch`. Same stall-detection semantics."
def _resolve_prefetch_stall_threshold() -> float:
"""Resolve the stall threshold from SubstrateConfig with a defensive fallback."""
def _run_prefetch_with_stall_detection(
proxy: 'RemoteCapabilityProxy',
stall_threshold_seconds: float,
poll_interval_seconds: float,
) -> bool
"""
Sync stall-detecting prefetch implementation.
Runs POST /prefetch in a daemon thread; main thread polls /progress for
a (progress, message) advance every poll_interval_seconds. If no advance
in stall_threshold_seconds AND the POST is still in-flight, SIGTERMs the
worker subprocess (so its capability.cleanup() can run via the worker's
shutdown handler — closes the orphan-subprocess-on-stall bug) and raises
CapabilityTimeoutError client-side.
"""
async def _run_prefetch_with_stall_detection_async(
proxy: 'RemoteCapabilityProxy',
stall_threshold_seconds: float,
poll_interval_seconds: float,
) -> bool
"""
Async stall-detecting prefetch implementation. Mirrors the sync variant
using asyncio.gather instead of a daemon thread.
"""
def reconfigure(
self,
old_config: Optional[Dict[str, Any]], # Previous config snapshot
new_config: Optional[Dict[str, Any]], # Config being applied
) -> bool: # True if worker accepted the reconfigure call
"""
CR-4: forward a reconfigure(old, new) call to the worker process.
The capability's default reconfigure() body delegates to
reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata on the
capability's config_class to fire `_release_<trigger>` methods for fields
whose values changed. Capabilities not opting into the declarative pattern
land in a silent no-op; the substrate's CapabilityManager.update_capability_config
then falls back to initialize(new_config) for the actual state change.
"""
async def reconfigure_async(
self,
old_config: Optional[Dict[str, Any]], # Previous config snapshot
new_config: Optional[Dict[str, Any]], # Config being applied
) -> bool: # True if worker accepted the reconfigure call
"Async variant of `reconfigure`. Same semantics."
def __enter__(self):
"""Enter context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context manager and cleanup."""
self.cleanup()
return False
async def __aenter__(self):
"""Enter async context manager."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb)
"Exit async context manager and cleanup."
Classes
class RemoteCapabilityProxy:
"Proxy that forwards capability calls to an isolated Worker subprocess."
def __init__(
self,
manifest:Dict[str, Any], # Capability manifest with python_path, module, class, etc.
extra_env:Optional[Dict[str, str]]=None, # CR-12: resolved worker-env overlay (secrets + visible overrides) injected at spawn
adapter_specs:Optional[List[str]]=None, # CR-17 pt 2: host-matched adapter impl specs ("module:ClassName") bound in-worker at spawn
journal:Optional[JournalStore]=None, # CR-14: journal sink for worker-lifecycle events; lazy LocalJournalStore at cfg.journal_db_path when None
diagnostics:Optional[DiagnosticsStore]=None # CR-14: diagnostics sink (raw-stream pump + worker env contract); lazy LocalDiagnosticsStore when None
)
"Initialize proxy and start the worker process."
@property
def name(self) -> str: # Capability name from manifest
"""Capability name."""
return self.manifest.get('name', 'unknown')
@property
def version(self) -> str: # Capability version from manifest
"""Capability version."""
return self.manifest.get('version', '0.0.0')
def _journal_event(
self,
event_type:str, # SubstrateEventType value
payload:Optional[Dict[str, Any]]=None # Per-event structured detail
) -> None
def initialize(
self,
config:Optional[Dict[str, Any]]=None # Configuration dictionary
) -> None
"Initialize or reconfigure the capability."
def execute(
self,
*args,
**kwargs
) -> Any: # Capability result
"Execute the capability synchronously.
CR-4: HTTP 409 from the worker is mapped to a typed
`CapabilityCancelledError` raised in the host process, so substrate /
JobQueue / consumer callers can distinguish cooperative cancellation
from a real capability failure (500 → RuntimeError as before)."
def get_config_schema(self) -> Dict[str, Any]: # JSON Schema
"""Get the capability's configuration schema."""
def get_current_config(self) -> Dict[str, Any]: # Current config values
"""Get the capability's current configuration."""
Job Queue (queue.ipynb)
Resource-aware job queue for sequential capability execution with cancellation support
Import
from cjm_substrate.core.queue import (
JobStatus,
JobEventType,
CancelPhase,
Job,
JobEvent,
QueueStats,
ResourceSnapshot,
JobQueueDependencies,
JobQueue
)
Functions
async def _enqueue_job(
self,
job: Job, # Pre-constructed Job (caller fills composition fields if needed)
) -> str: # job_id
"""
Internal: enqueue a pre-constructed Job.
Caller is responsible for validation (disabled-capability check, etc.).
Used by `submit` and by the composition advancement path (stage 3) —
the latter populates `composition_id` + `node_id` on the Job before
enqueueing so the member job appears in composition-tagged event streams
from its first STATE_TRANSITION.
"""
def _check_journal_wedge(self) -> None:
"""CR-14 wedge gate: refuse new work after a journal-append failure.
The loud half of the named-tension resolution — a wedged journal must
never silently drop the audit trail, and the refusal happens at the
operational boundary (new submissions) rather than mid-finalization
(which would leak lanes). Clears only by constructing a new queue /
fixing the journal and resetting `_journal_wedged` deliberately.
"""
async def submit(
self,
capability_instance_id: str, # Target capability instance (per CR-10)
*args,
priority: int = 0, # Higher = more urgent
task: Optional[str] = None, # Task-channel address: adapter task name (stage 4)
method: Optional[str] = None, # Task-channel address: adapter method (set with task)
run_id: Optional[str] = None, # Host-tier run correlation (CR-14 follow-up; reserved name, never a capability kwarg)
actor: Optional[str] = None, # Who/what initiated (CR-14 follow-up; reserved name)
control: Optional[Dict[str, Any]] = None, # Per-call control flags (force/cache-bypass); reserved name, never a capability kwarg
**kwargs
) -> str: # Returns job_id
"""
Submit a job to the queue.
CR-2: rejects jobs for disabled capabilities at submit time (typed
CapabilityDisabledError) so the failure surface matches
`CapabilityManager.execute_capability`'s disabled gate. Submitting to a disabled capability would
otherwise sit in the queue until execution, then raise — moving the
check earlier gives operators an actionable signal immediately.
CR-6: no STATE_TRANSITION event fires at submit because the job is
already in `pending` state at construction — there's no transition
to publish. The first STATE_TRANSITION fires when the processor loop
moves the job pending → running.
CR-14: refuses loudly when the journal is wedged (see
`_check_journal_wedge`). `run_id`/`actor` join `priority`/`task`/
`method` as reserved keyword names (they never reach capability kwargs):
cores pass their run-manifest id + initiating actor so every journal
row for this job carries the host-tier correlation.
"""
async def cancel(
self,
job_id: str # Job to cancel
) -> bool: # True if cancelled
"""
Cancel a pending or running job.
CR-6: publishes STATE_TRANSITION when a pending job moves directly to
cancelled (no transition through running). Running-job cancellation
publishes CANCEL_PHASE_CHANGED events from `_execute_with_cancellation`.
Stage 3: when the cancelled job is a composition member,
`_advance_composition` runs after the lock is released so the cancelled
status propagates to the composition run (dependents skip; the run
finalizes when all nodes are terminal). Lock release is required because
`_advance_composition` may need to enqueue downstream members in some
flows; asyncio.Lock is not re-entrant.
"""
def reorder(
self,
job_id: str, # Job to move
new_priority: int # New priority value
) -> bool: # True if reordered
"Change the priority of a pending job."
def get_job(
self,
job_id: str # Job to retrieve
) -> Optional[Job]: # Job or None
"Get a job by ID."
async def wait_for_job(
self,
job_id: str, # Job to wait for
timeout: Optional[float] = None # Max seconds to wait
) -> Job: # Completed/failed/cancelled job
"""
Wait for a job to complete.
Independent of the CR-6 event bus — uses a per-job `asyncio.Event` for
the simple block-until-done affordance. Streaming consumers should use
`events(job_id)` instead.
"""
def get_pending(self) -> List[Job]: # Pending jobs, priority-sorted
"Get pending jobs, priority-sorted (higher priority first, then FIFO)."
def get_running_jobs(self) -> List[Job]: # All currently-executing jobs
"All in-flight jobs (stage 3: the queue is multi-lane)."
def get_history(
self,
limit: Optional[int] = None, # Max jobs to return (most recent N); None = all
) -> List[Job]: # Completed/failed/cancelled jobs, most recent first
"""
Get completed jobs, most recent first.
If `limit` is provided, returns the most recent N. The internal history
list grows append-only up to `max_history`, so older jobs are evicted
in submission order (oldest first).
"""
def get_stats(self) -> QueueStats: # Aggregate counts
"Get aggregate queue stats — total counts by terminal status."
def get_job_diagnostics(
self,
job_id: str, # Job whose diagnostic records to read
limit: Optional[int] = 200, # Max records (None = all)
after_seq: Optional[int] = None, # Tail cursor for follow-style reads
) -> List[DiagnosticRecord]: # Job-stamped records, oldest first
"""
EXACT per-job diagnostics (CR-14; replaces `get_job_logs`).
Records were stamped with the job id IN THE WORKER via the call-envelope
contextvar — no timestamp windows, no over-fetch, correct under stage-3
same-worker concurrency and across multi-instance capabilities (both of which
the deleted `_slice_log_by_job_window` heuristic got wrong). Follow-style
consumers poll with `after_seq` (the LOG_APPENDED replacement).
Returns [] when no diagnostics store is configured.
"""
def get_history_from_journal(
self,
limit: Optional[int] = None, # Most recent N terminal jobs (None = all)
) -> List[Job]: # Rehydrated job records, most recent first
"""
Durable job history (the CR-14 `_history` migration rider).
Rehydrates Job records from terminal STATE_TRANSITION journal rows —
restart-surviving and unbounded, unlike the in-memory `get_history`
working set (`max_history` eviction). Rehydrated Jobs are RECORDS:
args/kwargs/result are not journaled (results live in capability DBs;
parameters in run manifests) — identity, timing, status, error,
composition/task fields are present.
Falls back to the in-memory history when no journal is configured.
"""
def _subscriber_keys_for(event: JobEvent) -> List[str]:
"""Return the subscriber keys an event should fan out to (CR-6 / stage 3).
Every event reaches "all" subscribers + the per-job subscribers.
Composition-tagged events additionally reach per-composition subscribers.
"""
def _journal_append_guarded(
self,
event: JournalEvent, # Pre-built journal event (caller fills identity/payload)
) -> None
"""
Append to the journal under the wedge-gate failure contract (CR-14).
The ONE place the named-tension resolution lives: an append failure logs
at ERROR and wedges the queue (new submissions refuse via
`_check_journal_wedge`) instead of raising into dispatch/finalization
paths — raising there would leak lanes and corrupt in-flight state,
while continuing silently would drop the audit trail. No-op without a
configured journal. Used by `_publish_event` (job events) and by the
direct substrate-event emissions (ADMISSION_DECIDED).
"""
def _publish_event(
self,
event: JobEvent, # Event to emit
) -> None:
"""
The SINGLE emission path (CR-14: journal-primary).
Class routing at the one place every event passes through:
- journal-class events (everything except `LIVENESS_EVENT_TYPES`)
become durable journal rows FIRST, then fan out to live subscribers —
emitting IS writing the record; the bus is a live tail of the journal.
- liveness-class events (PROGRESS_CHANGED / RESOURCE_SNAPSHOT) fan out
only; their final values ride the terminal STATE_TRANSITION row.
Journal failures follow the wedge-gate contract — see
`_journal_append_guarded`.
Fan-out: each subscriber has a bounded queue; when it is full
(`asyncio.QueueFull`) the event is dropped for that subscriber only, so a
slow consumer backpressures itself and the publisher never blocks. Each
subscriber's `dropped_count` records those drops so operators and future
telemetry can see the backpressure.
"""
async def _subscribe(
self,
key: str, # Subscriber key ("all" | "job:<id>" | "comp:<id>")
) -> AsyncIterator[JobEvent]:
"""
Internal: register a subscription; yield events until the consumer
exits the async generator.
Cleanup runs in `finally` so the subscriber is unregistered even if the
consumer raises or is cancelled. Empty key lists are deleted to avoid
memory accumulation across many short-lived subscriptions.
"""
async def events(
self,
job_id: str, # Filter to events for this job
) -> AsyncIterator[JobEvent]:
"""
Subscribe to events for a single job (async generator).
Yields events as they fire. Multiple concurrent subscribers to the same
job_id each get their own independent stream — useful for multi-tab UIs.
Late subscribers catch up exactly via the journal: query
`journal.query(job_id=..., after_seq=cursor)` then follow live — the
bus is a tail of the journal, not the record itself (CR-14).
"""
async def events_for_composition(
self,
composition_id: str, # Filter to events tagged with this composition
) -> AsyncIterator[JobEvent]:
"""
Subscribe to events for all member jobs of a composition (async
generator).
Yields the unified per-composition narrative: member-job lifecycle events
interleaved with `COMPOSITION_ADVANCED` aggregate signals (stage 3).
"""
async def all_events(self) -> AsyncIterator[JobEvent]:
"""Subscribe to all events (firehose; async generator).
Useful for global dashboards, audit logs, and telemetry sinks that need
the complete event stream rather than a filtered view.
"""
async def submit_composition(
self,
comp: Composition, # Composition to run (validated at submit)
) -> str: # composition run id
"""
Submit a composition — a DAG of capability-invocation nodes with
execution-time-bound inputs (stage 3; replaces `submit_sequence`).
Validates upfront: structural validation via `new_composition_run`
(duplicate ids / unknown refs / cycles → `CompositionValidationError`)
and the disabled-capability gate across all nodes (`CapabilityDisabledError`),
matching the sequence-era precedent. Member Jobs are created LAZILY —
only dependency-free nodes have Jobs at submit; downstream nodes get
their kwargs materialized from upstream results at advancement time.
CR-14: refuses loudly when the journal is wedged (the same gate as
`submit`).
Consumers wait via `wait_for_composition`, observe via
`events_for_composition`, inspect via `get_composition`.
"""
async def wait_for_composition(
self,
composition_id: str, # Composition to wait for
timeout: Optional[float] = None, # Max seconds to wait
) -> CompositionRun: # Terminal run record
"""
Block until a composition reaches terminal status (the
`wait_for_job` analog for compositions).
"""
async def cancel_composition(
self,
composition_id: str # Composition to cancel
) -> bool: # True if cancellation was recorded
"""
Cancel an in-flight composition (USER intent — the run lands
`cancelled`).
Records intent FIRST (`cancel_requested`) so member-cancel callbacks
racing through `_advance_composition` derive the right terminal status;
then marks never-started nodes cancelled and cancels every member whose
Job is still pending or running (in-flight members resolve through the
per-job cooperative-cancel machinery and finalize the run on the way
out). Returns False if the composition is unknown or already terminal.
"""
def get_composition(
self,
composition_id: str # Composition to retrieve
) -> Optional[CompositionRun]: # CompositionRun or None
"Get a composition run by id (read-only inspection)."
async def _start_ready_nodes(
self,
run: CompositionRun, # Composition being advanced
) -> List[str]: # Node ids whose member Jobs were created + enqueued
"""
Create + enqueue member Jobs for every currently-ready node (stage 3).
This is where execution-time binding happens: each ready node's kwargs
are materialized from upstream results via `resolve_node_kwargs`. A
binding failure is recorded as that NODE's failure (dependents skip,
fail-fast housekeeping applies) rather than raising to the caller — by
the time bindings resolve, the composition is mid-flight and the failure
must flow through the same path as a member-job failure.
CR-14 follow-up: member Jobs inherit the composition's `run_id`/`actor`
correlation tags so every member's journal rows link to the host run.
"""
async def _advance_composition(
self,
completed_job: Job # Member job that just reached terminal status
) -> None:
"""
Advance a composition after a member job completes (stage 3).
Records the member outcome; on success enqueues newly-ready downstream
nodes (emitting COMPOSITION_ADVANCED); on failure/cancellation skips
transitive dependents and (fail_fast) cancels independent pending
members. Finalizes the run when every node is terminal. Always called
OUTSIDE the queue's main lock — may take it via `_enqueue_job`.
"""
async def _cancel_pending_members(
self,
run: CompositionRun, # Composition whose pending members should stop
) -> None:
"""
Fail-fast housekeeping: stop members that have not actually started
executing (stage 3, ratified failure semantics).
IN-FLIGHT members run to completion — their results and caches are kept,
and force-killing a half-done GPU job buys nothing. Nodes that never got
a Job are marked cancelled directly; members whose Jobs still sit in the
pending heap go through the per-job cancel path (which re-enters
`_advance_composition` for each, transitioning them to terminal).
"""
def _maybe_finalize_composition(
self,
run: CompositionRun, # Composition to check for terminal state
) -> None:
"Finalize the run once every node is terminal (idempotent; stage 3)."
def _sample_resource_snapshot(
self,
job: Job # Job to sample resources for
) -> Optional[ResourceSnapshot]:
"""
Sample worker + sysmon stats for a job (CR-6 Stage 3 internal helper).
Returns None if the worker proxy doesn't support `get_stats` or the call
fails — substrate can't fabricate a snapshot. Sysmon enrichment is
best-effort: if the named capability isn't loaded / errors / lacks the CR-3
typed methods, GPU fields stay None and the worker-only snapshot is
returned.
Subprocess-spawning capabilities (e.g. Voxtral-vLLM's managed vLLM server)
spawn grandchild PIDs that hold GPU memory the worker itself doesn't.
GPU attribution delegates to `attribute_gpu_to_worker_subtree`, which
intersects the worker-reported `subtree_pids` set with sysmon's per-PID
GPU enumeration. The pre-fix path matched only `worker_pid` and reported
`gpu_memory_mb=None` for any subprocess-spawning capability.
"""
def get_resource_snapshot(
self,
job_id: str # Job to sample resources for
) -> Optional[ResourceSnapshot]:
"""
Get a point-in-time resource snapshot for a job (CR-6 Stage 3).
Returns None if the job is unknown or the worker proxy doesn't expose
`get_stats`. Composes worker stats with sysmon GPU stats when the queue
is configured with a `sysmon_capability_name`.
"""
async def start(self) -> None:
"""Start the queue processor.
CR-6 Stage 4: installs the substrate-side retry observer on `_deps`
(typically a CapabilityManager). When CR-7's reactive-retry path fires for
a running job, the observer updates `Job.retry_count` and publishes a
RETRY_STARTED event tagged with the in-flight job. Previous observer
value (if any) is saved + restored in stop() for cooperative coexistence.
"""
async def stop(self) -> None:
"""Stop the queue processor gracefully.
Stage 3: in-flight job tasks are detached lanes now — drain them with
the same 5s budget as the processor task; leftovers are cancelled.
CR-6 Stage 4: restores the previous `_on_retry` observer on deps to
leave the manager in the state we found it (cooperative with other
queue instances, tests, etc.).
"""
def _on_manager_retry(
"""
Substrate-side retry observer (CR-6 Stage 4; stage-3 multi-lane).
Invoked synchronously by CapabilityManager's CR-7 retry loop just before
each retry attempt. Updates `Job.retry_count` on the matching in-flight
job + emits RETRY_STARTED. Best-effort: synchronous callback; emission
failure shouldn't propagate back into the retry loop.
`attempt` semantics: CapabilityManager's loop iterates
`for attempt in range(max_retries + 1)`. The first iteration
(`attempt=0`) is the original try and never invokes this callback —
so the value PASSED here is already the 1-based retry number.
Match logic (stage 3): scan the multi-lane `_running` dict for a job on
the retrying instance. With the per-instance cap defaulting to 1, at
most one in-flight job matches; if an instance opts into same-worker
concurrency (SG-33 cap > 1), the first match is attributed — a known
blunt edge recorded in the stage-3 ledger.
"""
def _move_to_history(self, job: Job) -> None:
"""Move a job to history, maintaining max_history limit."""
def _signal_job_completed(self, job_id: str) -> None:
"""Signal that a job has completed."""
def _job_snapshot(job: Job) -> Dict[str, Any]:
"""Serialize a job's RECORD fields for the terminal journal row (CR-14).
Deliberately excludes args/kwargs/result: results live in capability DBs,
parameters in run manifests — the journal never duplicates what better
homes already record (the attempted-vs-happened rule). The error rides as
the JobError dict (failures are exactly what the journal exists to keep).
"""
def _job_from_snapshot(snap: Dict[str, Any]) -> Job:
"""Rehydrate a Job RECORD from a terminal-row snapshot (CR-14).
Tolerant on both directions: unknown snapshot keys are ignored; a
JobError dict that no longer matches the current JobError fields
degrades to None rather than failing the history read.
"""
def _emit_state_transition(
self,
job: Job,
prev_status: JobStatus,
) -> None:
"""
Emit a STATE_TRANSITION event for `job`'s most recent status change.
Centralized so every transition site (start, completed, failed, cancelled,
or future cancel-phase-driven transitions) carries identical tag context.
CR-14: TERMINAL transitions carry the job snapshot in the payload — the
durable journal row becomes the job's record of existence (the `_history`
migration rider: `get_history_from_journal` rehydrates from these).
"""
def _emit_cancel_phase(
self,
job: Job,
new_phase: CancelPhase,
) -> None:
"""
Emit a CANCEL_PHASE_CHANGED event (CR-6 Stage 4).
Updates `job.cancel_phase` to the new value and publishes an event with
the prior phase + new phase in the payload. Centralized so every phase
transition site in `_execute_with_cancellation` produces identically-shaped
events.
"""
def _emit_block_reason(
self,
job: Job,
new_reason: Optional[str],
) -> None:
"""
Emit a BLOCK_REASON_CHANGED event (CR-6 Stage 4 — reserved).
Updates `job.block_reason` and publishes an event carrying the prior and new
reason. Stage 4 ships the helper ahead of use: the queue's current scheduling
logic doesn't surface block reasons, so it stays reserved for the eventual
scheduler-coordination wiring.
"""
async def _fetch_admission_stats(self) -> Dict[str, Any]:
"""Fetch live system telemetry for admission decisions (stage 3).
Defensive-getattr: a deps implementation without `get_global_stats`
(older test doubles) yields `{}` — admission then runs GPU-profiled jobs
exclusive (no live headroom to verify against) and CPU-profiled jobs on
lanes + instance caps alone. Failures degrade the same way.
"""
def _pop_next_admissible(
self,
stats: Dict[str, Any], # Live telemetry from _fetch_admission_stats (possibly {})
) -> Optional[Job]: # The popped job, or None when nothing is dispatchable
"""
Pop the highest-priority ADMISSIBLE pending job (stage 3).
Scans pending jobs in priority order with SKIP-AHEAD: a blocked job
(insufficient GPU headroom, instance cap reached) does not stall
admissible jobs behind it. The admission ladder (stage-3 ledger,
ratified 2026-06-10):
1. **lanes** — at most `max_concurrent_lanes` in-flight jobs;
2. **exclusivity** — a job with NO empirical profile runs ALONE: its
first run IS its measurement run. The store's (instance_id,
config_hash) keying graduates it automatically after one run and
demotes it again whenever the config changes — staleness is dissolved
by the keying, not solved by invalidation;
3. **per-instance cap** — in-flight jobs per instance ≤ the instance's
SG-33 `max_concurrent_requests`, DEFAULT 1 when unset (same-worker
concurrency is opt-in per capability);
4. **resources** — the empirical `gpu_memory_mb_peak_max` is admitted
against BOTH a reservation ledger (sum of running GPU peaks ≤ total ×
`gpu_headroom_fraction`; covers admitted-but-not-yet-loaded models)
AND live free VRAM (covers resident idle models + external GPU
users); `memory_mb_peak_max` against live `memory_available_mb`.
Without sysmon stats, GPU-profiled jobs run exclusive.
The manifest's `requires_gpu` is deliberately NOT consumed — whether a
config uses the GPU is an empirical fact (`gpu_memory_mb_peak_max > 0`),
not a declaration (ledger G2: the pre-overhaul scheduler quantity checks
were dead code against v2 manifests). Worst case (no profiles, no
sysmon) every job runs exclusive = exact pre-stage-3 single-lane
behavior. CR-7 reactive retry is the documented backstop for admission
misses.
"""
async def _process_loop(self) -> None:
"""Main dispatch loop (stage 3: multi-lane ready-set dispatch).
`_job_available` means "dispatch state may have changed" — set on submit
AND on every job completion, cleared only after a scan that dispatched
nothing. Each pass: fetch admission telemetry (outside the lock), pop
the highest-priority admissible job under the lock, and launch it as an
independent task tracked in `_running_tasks` (awaited by `stop`). The
pre-stage-3 loop executed one job at a time inline.
CR-14 follow-up: each ADMIT is journaled as an ADMISSION_DECIDED row —
emitted AFTER the lock releases (sqlite I/O never rides the locked fast
path; the decision detail is recovered from the admission ledgers the
pop updated synchronously). Blocked jobs are NOT journaled per scan —
the scan loop re-evaluates them on every pass and would spam rows; the
reserved BLOCK_REASON_CHANGED transition channel is the place block
visibility lands when the scheduler-coordination wiring happens.
"""
async def _execute_job(self, job: Job) -> None:
"""Execute a single job (runs as an independent task per lane; stage 3)."""
async def _execute_with_cancellation(
self,
job: Job,
capability: Any
) -> Any:
"""
Execute job with cancellation monitoring.
CR-6 Stage 4 wires CANCEL_PHASE_CHANGED events for the substrate's
cooperative → force → reloading → completed state machine.
Cooperative-success path (capability acknowledges cancel within timeout):
COOPERATIVE → COMPLETED
Force-kill path (cooperative timeout):
COOPERATIVE → FORCE → RELOADING → COMPLETED
"""
async def _poll_progress(
self,
job: Job,
capability: Any
) -> None:
"""
Poll progress + sample resources from the capability during execution.
Emits PROGRESS_CHANGED events when progress or status_message changes
from the previous poll (avoids spamming the bus between meaningful
updates). CR-6 Stage 3: also emits RESOURCE_SNAPSHOT events every
`resource_snapshot_cadence_polls` iterations; snapshot is also stored
on `job.last_resource_snapshot` for synchronous inspection.
Liveness-class events (never journaled) still carry run_id/actor so
live-tail subscribers see the same tag shape as journal-class events.
"""
Classes
class JobStatus(str, Enum):
"Status of a job in the queue."
class JobEventType(str, Enum):
"""
Push-based job event types (CR-6; stage-3 composition rework; CR-14
journal-primary emission).
Emitted by JobQueue through the single emission path (`_publish_event`):
journal-class events become durable journal rows AND fan out to live
subscribers; liveness-class events (`LIVENESS_EVENT_TYPES` in
`core.journal_store`) fan out only — their final values ride the
terminal STATE_TRANSITION row. Consumers subscribe via
`queue.events(job_id)` / `queue.events_for_composition(comp_id)` /
`queue.all_events()` and receive `JobEvent` instances asynchronously.
COMPOSITION_ADVANCED replaced the retired SEQUENCE_ADVANCED at execution
stage 3 (CR-16: compositions replace sequences outright): it fires when a
member job's completion unlocks downstream composition nodes — payload
carries the completed node id + the newly enqueued node ids.
The reserved-never-emitted LOG_APPENDED was RETIRED at stage 7 (CR-14):
log-follow is a diagnostics-store cursor read (`get_job_diagnostics`),
not a push event — there are no log files or byte offsets anymore.
Non-job substrate events (worker lifecycle, config, runs) live in
`core.journal_store.SubstrateEventType`.
"""
class CancelPhase(str, Enum):
"""
Phase of a cancellation in progress (CR-6 + CR-4 pairing).
Surfaces the substrate's cancel state machine. Stage 4 wires the
transitions; Stage 1 reserves the enum so `Job.cancel_phase` can be
typed correctly without dangling forward references.
"""
@runtime_checkable
class JobQueueDependencies(Protocol):
"""
Substrate dependencies the JobQueue requires (CR-6 + stage 3).
CapabilityManager satisfies this structurally; the Protocol exists so JobQueue
can be tested in isolation (with a lightweight test double) and so a future
extraction into a separate library has no API constraint locked in.
The first 4 methods are the CR-6 execute-path surface (CR-14 retired the
original flat-log accessor — log retrieval is a diagnostics-store query now). The
stage-3 additions (CR-16 multi-lane admission) are consumed DEFENSIVELY
via getattr — a deps implementation without them yields no admission
evidence, so every job runs exclusive = exact pre-stage-3 single-lane
behavior. Older test doubles keep working unchanged. CR-14 also reads
`journal_store` / `diagnostics_store` ATTRIBUTES via getattr when the
queue isn't constructed with explicit stores — a deps without them
(test doubles) simply yields no journaling.
"""
def get_capability_meta(self, name_or_id: str) -> Optional[Any]: ...
def get_capability(self, name_or_id: str) -> Optional[Any]: ...
async def execute_capability_async(self, name_or_id: str, *args: Any, **kwargs: Any) -> Any: ...
def reload_capability(self, name_or_id: str) -> Any: ...
# Stage 3 (CR-16) admission surface
def get_admission_profile(self, name_or_id: str) -> Optional[Dict[str, Any]]: ...
def get_instance_concurrency_cap(self, name_or_id: str) -> Optional[int]: ...
async def get_global_stats(self) -> Dict[str, Any]: ...
# Stage 4 (CR-17 pt 2) task channel — invoked only for task-addressed jobs
# (Job.task_name set); execute-channel jobs never touch it, so older test
# doubles keep working unchanged.
async def execute_capability_task_async(self, name_or_id: str, task_name: str, method: str, **kwargs: Any) -> Any: ...
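Structural satisfaction plus defensive `getattr` is what lets older test doubles keep working. A sketch with a hypothetical two-method subset of the protocol (`MiniDeps`), a double that predates the stage-3 admission surface, and a hypothetical `admission_profile` accessor:

```python
from typing import Any, Dict, Optional, Protocol, runtime_checkable

@runtime_checkable
class MiniDeps(Protocol):
    """Hypothetical two-method subset of JobQueueDependencies."""
    def get_capability(self, name_or_id: str) -> Optional[Any]: ...
    async def execute_capability_async(self, name_or_id: str, *args: Any, **kwargs: Any) -> Any: ...

class OldTestDouble:
    """Satisfies the core surface structurally; lacks the stage-3 admission methods."""
    def get_capability(self, name_or_id: str) -> Optional[Any]:
        return object()

    async def execute_capability_async(self, name_or_id: str, *args: Any, **kwargs: Any) -> Any:
        return {"ok": True}

def admission_profile(deps: Any, name: str) -> Optional[Dict[str, Any]]:
    fn = getattr(deps, "get_admission_profile", None)
    if not callable(fn):
        return None  # no admission evidence: the job would run exclusive
    return fn(name)

double = OldTestDouble()
```

`isinstance` against a `runtime_checkable` protocol only checks that the methods exist, not their signatures, which is why the double needs no inheritance.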
@dataclass
class Job:
"""
A queued capability execution request (CR-6 reshape; stage-3 composition
rework renamed the sequence tags to composition tags).
`composition_id` / `node_id` are set when the job is a lazily-created
member of a composition (CR-16) — they ride every JobEvent so
`events_for_composition` subscribers see member lifecycle events.
`run_id` / `actor` (CR-14 follow-up) are host-tier correlation tags:
cores pass their run-manifest id + initiating actor at submit so every
journal row for this job links back to the run record (run manifest ↔
journal linkage) and carries who/what initiated it.
"""
id: str # Unique job identifier (UUID)
capability_instance_id: str # Target capability instance (per CR-10)
args: Tuple[Any, ...] # Positional arguments for execute()
kwargs: Dict[str, Any] # Keyword arguments for execute()
status: JobStatus = JobStatus.pending # Current job status
priority: int = 0 # Higher = more urgent
submitted_at: datetime = field(...) # When submitted
started_at: Optional[datetime] # When execution started
completed_at: Optional[datetime] # When execution finished
progress: float = 0.0 # 0.0 to 1.0, or -1.0 for indeterminate
status_message: str = '' # Descriptive status message
result: Any # Execution result (if completed)
error: Optional[JobError] # Structured failure summary (CR-5)
composition_id: Optional[str] # Set when part of a composition (stage 3)
node_id: Optional[str] # Composition node this job executes (stage 3)
task_name: Optional[str] # Task-channel address: adapter task (stage 4, CR-17 pt 2)
method: Optional[str] # Task-channel address: adapter method (stage 4)
run_id: Optional[str] # Host-tier run correlation (CR-14 follow-up; core run manifests)
actor: Optional[str] # Who/what initiated the work (CR-14 follow-up)
control: Dict[str, Any] = field(...) # Per-call control flags (force/cache-bypass — CR-15 cat 4); rides CallEnvelope.control
cancel_requested_at: Optional[datetime] # When cancel was requested (Stage 4)
cancel_phase: Optional[CancelPhase] # Active cancel phase (Stage 4)
block_reason: Optional[str] # Why the scheduler is blocking (Stage 4)
retry_count: int = 0 # Reactive retries attempted (CR-7 + Stage 4)
last_resource_snapshot: Optional[Any] # Stage 3 wires this (ResourceSnapshot)
@dataclass
class JobEvent:
"""
A push-based job event (CR-6; stage-3 composition tags).
Carries full tag context so a subscriber to `all_events()`, `events(job_id)`,
or `events_for_composition(comp_id)` receives identically-shaped instances.
`payload` is a per-event-type structured dict (e.g., STATE_TRANSITION carries
`{"from": "pending", "to": "running"}`; COMPOSITION_ADVANCED carries
`{"completed_node": ..., "enqueued_nodes": [...]}`).
`run_id` / `actor` (CR-14 follow-up) ride from the Job so journal rows
written by the single emission path carry the host-tier correlation.
"""
type: JobEventType
job_id: str
capability_instance_id: str
composition_id: Optional[str]
node_id: Optional[str]
run_id: Optional[str] # Host-tier run correlation (CR-14 follow-up)
actor: Optional[str] # Who/what initiated (CR-14 follow-up)
timestamp: datetime = field(...)
payload: Dict[str, Any] = field(...)
@dataclass
class QueueStats:
"Aggregate counts returned by `JobQueue.get_stats()` (CR-6)."
total_pending: int
total_completed: int
total_failed: int
total_cancelled: int
@dataclass
class _Subscription:
"""
Internal: per-subscriber bounded queue + drop counter (CR-6 event bus).
When the queue is full, `put_nowait` raises `asyncio.QueueFull` and the
event is dropped for this subscriber only; the publisher never blocks.
`dropped_count` counts those drops so operators and future telemetry can
see the backpressure.
"""
queue: 'asyncio.Queue[JobEvent]'
dropped_count: int = 0
@dataclass
class ResourceSnapshot:
"""
Point-in-time resource usage for one job (CR-6 Stage 3).
Worker stats (cpu_percent, memory_rss_mb) come from the capability proxy's
`get_stats()`. GPU fields come from the configured system-monitor capability
(when set on JobQueue) via CR-3's typed `list_processes()` (per-PID
matching) and `get_system_status()` (global GPU stats). All GPU fields
are Optional: None when no sysmon capability is configured or the worker
isn't running on a GPU.
Distinct from CR-7's `EmpiricalResourceRecord` (aggregated profile
across runs); this is "what's happening right now" for one job.
"""
timestamp: datetime # When the sample was taken
worker_pid: int = 0 # OS PID of the capability worker subprocess
cpu_percent: float = 0.0 # Worker process CPU%
memory_rss_mb: float = 0.0 # Worker process resident memory (MB)
gpu_index: Optional[int] # GPU index the worker is on (None if not GPU-bound)
gpu_memory_mb: Optional[float] # Worker's GPU memory usage (MB)
gpu_type: Optional[str] # GPU vendor (NVIDIA / AMD / Intel / None)
gpu_total_mb: Optional[float] # Total GPU memory available globally (MB)
gpu_load_percent: Optional[float] # GPU compute utilization (global)
class JobQueue:
"""
Resource-aware multi-lane job queue with journal-primary observability
(CR-6 + CR-14; stage-3 CR-16 rework: ready-set dispatch + resource-derived
admission + composition execution).
"""
def __init__(
"""
Initialize the job queue.
Stage 3 (CR-16): the queue dispatches multiple admissible jobs
concurrently (`max_concurrent_lanes` is the operator safety valve);
per-job admission derives from empirical resource records + live
sysmon telemetry; see `_pop_next_admissible`. Worst case (no
records, no sysmon) every job runs exclusive, which is exactly the
pre-stage-3 single-lane behavior.
CR-14 (stage 7): emission is journal-primary. `_publish_event`
writes journal-class events as durable rows before fanning out to
live subscribers. The stores default to the deps' (CapabilityManager's)
stores via getattr, so the cores gain journaling with zero host
changes; a deps without them (test doubles) yields no journaling.
"""
def set_run_context(
self,
run_id: Optional[str] = None, # Host-tier run correlation for subsequent submits
actor: Optional[str] = None, # Who/what initiated the run
) -> None:
"""
Set the queue-scoped default run context (CR-14 follow-up).
Every subsequent submit without explicit `run_id`/`actor` inherits
these: the one-queue-per-run CLI cores call this once after
generating their run-manifest id, and every journal row for the run
links back to it. Call again (or with None) to change/clear.
"""
Scheduling (scheduling.ipynb)
Resource scheduling policies for capability execution
Import
from cjm_substrate.core.scheduling import (
ResourceScheduler,
PermissiveScheduler,
SafetyScheduler,
QueueScheduler
)
Functions
@patch
def _check_resources(
self:QueueScheduler,
capability_meta: CapabilityMeta, # Capability metadata with manifest
stats: Dict[str, Any] # Current system stats
) -> bool: # True if resources available
"Check if system has sufficient resources for the capability."
@patch
def get_active_capabilities(self:QueueScheduler) -> Set[str]: # Set of currently executing capability names
"Get the set of capabilities with active executions."
Classes
class ResourceScheduler(ABC):
"Abstract base class for resource allocation policies."
def allocate(
self,
capability_meta: CapabilityMeta, # Metadata of the capability requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Function that returns fresh stats
) -> bool: # True if execution is allowed
"Decide if a capability can start based on its requirements and system state."
async def allocate_async(
self,
capability_meta: CapabilityMeta, # Metadata of the capability requesting resources
stats_provider: Callable[[], Awaitable[Dict[str, Any]]] # Async function returning stats
) -> bool: # True if execution is allowed
"Async allocation decision. Default delegates to sync allocate after fetching stats once."
def on_execution_start(
self,
capability_name: str # Name of the capability starting execution
) -> None:
"Notify scheduler that a task started (to reserve resources)."
def on_execution_finish(
self,
capability_name: str # Name of the capability finishing execution
) -> None:
"Notify scheduler that a task finished (to release resources)."
class PermissiveScheduler(ResourceScheduler):
"Scheduler that allows all executions (Default / Dev Mode)."
def allocate(
self,
capability_meta: CapabilityMeta, # Metadata of the capability requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Stats provider (ignored)
) -> bool: # Always returns True
"Allow all capability executions without checking resources."
def on_execution_start(
self,
capability_name: str # Name of the capability starting execution
) -> None
"No-op for permissive scheduler."
def on_execution_finish(
self,
capability_name: str # Name of the capability finishing execution
) -> None
"No-op for permissive scheduler."
class SafetyScheduler(ResourceScheduler):
"Scheduler that prevents execution if resources are insufficient."
def allocate(
self,
capability_meta: CapabilityMeta, # Metadata of the capability requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Function returning current stats
) -> bool: # True if resources are available
"Check resource requirements against system state."
def on_execution_start(
self,
capability_name: str # Name of the capability starting execution
) -> None
"Called when execution starts (for future resource reservation)."
def on_execution_finish(
self,
capability_name: str # Name of the capability finishing execution
) -> None
"Called when execution finishes (for future resource release)."
class QueueScheduler:
"Scheduler that waits for resources to become available."
def __init__(
self,
timeout: float = 300.0, # Max seconds to wait for resources
poll_interval: float = 2.0 # Seconds between resource checks
)
"Initialize queue scheduler with timeout and polling settings."
def allocate(
self,
capability_meta: CapabilityMeta, # Metadata of the capability requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Function returning current stats
) -> bool: # True if resources become available before timeout
"Wait for resources using blocking sleep."
async def allocate_async(
self,
capability_meta: CapabilityMeta, # Metadata of the capability requesting resources
stats_provider: Callable[[], Awaitable[Dict[str, Any]]] # Async stats function
) -> bool: # True if resources become available before timeout
"Wait for resources using non-blocking async sleep."
def on_execution_start(
self,
capability_name: str # Name of the capability starting execution
) -> None
"Track that a capability has started executing."
def on_execution_finish(
self,
capability_name: str # Name of the capability finishing execution
) -> None
"Track that a capability has finished executing."
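As a rough illustration of the queue behaviour described above (poll fresh stats until the policy passes or `timeout` expires, with a blocking `allocate` and a non-blocking `allocate_async`), here is a minimal sketch. `PollingScheduler` and its `has_room` callback are hypothetical; `has_room` merely stands in for the private `_check_resources` policy.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Set


class PollingScheduler:
    """Hypothetical sketch of a QueueScheduler-style wait loop (not the library class).

    `has_room(meta, stats)` stands in for the private `_check_resources` policy.
    """

    def __init__(self, has_room: Callable[[Any, Dict[str, Any]], bool],
                 timeout: float = 300.0, poll_interval: float = 2.0):
        self.has_room = has_room
        self.timeout = timeout
        self.poll_interval = poll_interval
        self.active: Set[str] = set()  # what get_active_capabilities would report

    def _remaining(self, deadline: float) -> float:
        return max(0.0, deadline - time.monotonic())

    def allocate(self, meta: Any, stats_provider: Callable[[], Dict[str, Any]]) -> bool:
        deadline = time.monotonic() + self.timeout
        while True:
            # Re-read stats on every poll: another job may have released resources.
            if self.has_room(meta, stats_provider()):
                return True
            if time.monotonic() >= deadline:
                return False
            time.sleep(min(self.poll_interval, self._remaining(deadline)))  # blocks the thread

    async def allocate_async(self, meta: Any,
                             stats_provider: Callable[[], Awaitable[Dict[str, Any]]]) -> bool:
        deadline = time.monotonic() + self.timeout
        while True:
            if self.has_room(meta, await stats_provider()):
                return True
            if time.monotonic() >= deadline:
                return False
            await asyncio.sleep(min(self.poll_interval, self._remaining(deadline)))  # yields to the loop

    def on_execution_start(self, capability_name: str) -> None:
        self.active.add(capability_name)

    def on_execution_finish(self, capability_name: str) -> None:
        self.active.discard(capability_name)
```

Capping each sleep at the remaining time keeps the final answer close to the configured timeout instead of overshooting by a full poll interval.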
Capability Secret Store (secret_store.ipynb)
CR-12: project-local secret storage for API-based capabilities (file-backed, 0600)
Import
from cjm_substrate.core.secret_store import (
SecretStore,
LocalSecretStore
)
Functions
def _default_secrets_dir() -> Path:
"""Default secrets directory: `~/.cjm/secrets` (bootstrap fallback)."""
return Path.home() / ".cjm" / "secrets"
@patch
def _load(self:LocalSecretStore) -> Dict[str, Dict[str, Dict[str, str]]]:
if not self.path.exists()
@patch
def _save(self:LocalSecretStore, data: Dict[str, Dict[str, Dict[str, str]]]) -> None:
self.secrets_dir.mkdir(parents=True, exist_ok=True)
try
@patch
def get_secret(
self:LocalSecretStore,
capability_name: str, # Capability the secret belongs to
key: str, # Secret key (typically the env-var name, e.g. GEMINI_API_KEY)
*,
scope: Optional[str] = None # Reserved multi-user seam; ignored by the local store
) -> Optional[str]: # The secret value, or None if absent
"Resolve a secret value."
@patch
def set_secret(
self:LocalSecretStore,
capability_name: str, # Capability the secret belongs to
key: str, # Secret key
value: str, # Secret value (stored plaintext at 0600)
*,
scope: Optional[str] = None # Reserved multi-user seam
) -> None
"Persist a secret value."
@patch
def delete_secret(
self:LocalSecretStore,
capability_name: str, # Capability the secret belongs to
key: str, # Secret key
*,
scope: Optional[str] = None # Reserved multi-user seam
) -> bool: # True if a secret was removed
"Remove a secret, pruning now-empty capability/scope containers."
@patch
def list_keys(
self:LocalSecretStore,
capability_name: str, # Capability to list secrets for
*,
scope: Optional[str] = None # Reserved multi-user seam
) -> List[str]: # Secret key NAMES (never values)
"Return the names of secrets stored for a capability (never the values)."
Classes
@runtime_checkable
class SecretStore(Protocol):
"Protocol for resolving per-capability secrets (API keys, tokens)."
def get_secret(self, capability_name: str, key: str, *, scope: Optional[str] = None) -> Optional[str]:
"""Return the secret value for (capability, key) under `scope`, or None."""
...
def set_secret(self, capability_name: str, key: str, value: str, *, scope: Optional[str] = None) -> None:
"""Persist a secret value for (capability, key) under `scope`."""
...
def delete_secret(self, capability_name: str, key: str, *, scope: Optional[str] = None) -> bool:
"""Remove (capability, key) under `scope`. Returns True if a secret was deleted."""
...
def list_keys(self, capability_name: str, *, scope: Optional[str] = None) -> List[str]:
"""Return the NAMES of secrets stored for a capability under `scope`, never the values."""
...
class LocalSecretStore:
"File-backed default `SecretStore` (0600 JSON under `secrets_dir`)."
def __init__(
self,
secrets_dir: Optional[Path] = None # Directory for secrets.json; None -> ~/.cjm/secrets
)
"Initialize the store. `secrets_dir=None` uses `~/.cjm/secrets`."
Variables
_SECRETS_FILENAME = 'secrets.json'
_DEFAULT_SCOPE = '__default__'
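A minimal sketch of a file-backed store in the spirit of `LocalSecretStore`, assuming the on-disk layout is capability -> scope -> key -> value (suggested by the `Dict[str, Dict[str, Dict[str, str]]]` type of `_load`, but not confirmed by the source). As documented for the local store, `scope` is accepted and ignored; everything lands under `_DEFAULT_SCOPE`. `SketchSecretStore` is hypothetical.

```python
import json
import os
from pathlib import Path
from typing import Dict, List, Optional

_SECRETS_FILENAME = "secrets.json"
_DEFAULT_SCOPE = "__default__"
Tree = Dict[str, Dict[str, Dict[str, str]]]  # assumed: capability -> scope -> key -> value


class SketchSecretStore:
    """Hypothetical mirror of LocalSecretStore; `scope` is accepted but ignored."""

    def __init__(self, secrets_dir: Path):
        self.secrets_dir = Path(secrets_dir)
        self.path = self.secrets_dir / _SECRETS_FILENAME

    def _load(self) -> Tree:
        if not self.path.exists():
            return {}
        return json.loads(self.path.read_text())

    def _save(self, data: Tree) -> None:
        self.secrets_dir.mkdir(parents=True, exist_ok=True)
        # Mode 0o600 applies when the file is created; chmod also tightens an existing file.
        fd = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
        os.chmod(self.path, 0o600)

    def _bucket(self, data: Tree, capability_name: str) -> Dict[str, str]:
        return data.get(capability_name, {}).get(_DEFAULT_SCOPE, {})

    def get_secret(self, capability_name: str, key: str, *, scope: Optional[str] = None) -> Optional[str]:
        return self._bucket(self._load(), capability_name).get(key)

    def set_secret(self, capability_name: str, key: str, value: str, *, scope: Optional[str] = None) -> None:
        data = self._load()
        data.setdefault(capability_name, {}).setdefault(_DEFAULT_SCOPE, {})[key] = value
        self._save(data)

    def delete_secret(self, capability_name: str, key: str, *, scope: Optional[str] = None) -> bool:
        data = self._load()
        bucket = self._bucket(data, capability_name)
        if key not in bucket:
            return False
        del bucket[key]
        # Prune containers that are now empty.
        if not bucket:
            del data[capability_name][_DEFAULT_SCOPE]
        if not data[capability_name]:
            del data[capability_name]
        self._save(data)
        return True

    def list_keys(self, capability_name: str, *, scope: Optional[str] = None) -> List[str]:
        return sorted(self._bucket(self._load(), capability_name))  # names only, never values
```

Keeping the scope level in the file even though it is ignored means a future multi-user store could read the same layout without a migration.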
Substrate Telemetry Helpers (telemetry.ipynb)
Shared GPU/CPU attribution helpers used by both JobQueue._sample_resource_snapshot (CR-6 Stage 3) and CapabilityManager._record_sample_safe (CR-7).
Import
# No corresponding Python module found for core.telemetry
Functions
def _proc_field(proc: Any, key: str, default: Any = None) -> Any:
"""Read a field from a sysmon process record, accepting dict or dataclass.
`MonitorToolProtocol.list_processes()` returns `ProcessStats` dataclasses (CR-3),
but proxy round-trips frequently coerce to dicts. Accept both so the
helper works against either form without the caller pre-normalizing.
"""
if isinstance(proc, dict)
def _worker_subtree_pids(stats: Dict[str, Any]) -> set:
"""Build the worker subtree PID set from a `/stats` dict.
Falls back to a single-pid set when `subtree_pids` is absent (pre-fix
workers, mock test fixtures). The worker pid itself is always included.
"""
tree: set = set()
def attribute_gpu_to_worker_subtree(
stats: Dict[str, Any], # Worker `/stats` payload (must include 'pid'; uses 'subtree_pids' if present)
sysmon: Any, # The configured monitor capability (or None)
) -> Optional[Dict[str, Any]]
"""
Attribute GPU memory across the worker's process subtree.
Returns `{'gpu_memory_mb': float, 'gpu_index': Optional[int]}` when sysmon
is reachable, or `None` when sysmon isn't configured / doesn't expose
`list_processes()` / errors out. Callers treat `None` as "sysmon
unavailable" and leave GPU snapshot fields as their defaults; a 0.0 sum
means sysmon worked but no subtree PID holds GPU memory (CPU-only capability
on a GPU box).
"""
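The three telemetry helpers compose into a simple rule: build the worker's PID subtree, then sum GPU memory over sysmon process records whose PID falls in it. The sketch below assumes process records expose `pid`, `gpu_memory_mb` and `gpu_index` (field names are assumptions), and that the reported `gpu_index` is the first subtree process actually holding GPU memory. All names here are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class ProcRecord:  # hypothetical stand-in for a sysmon ProcessStats record
    pid: int
    gpu_memory_mb: float = 0.0
    gpu_index: Optional[int] = None


def proc_field(proc: Any, key: str, default: Any = None) -> Any:
    # Accept both a dict (after a proxy round-trip) and a dataclass record.
    if isinstance(proc, dict):
        return proc.get(key, default)
    return getattr(proc, key, default)


def subtree_pids(stats: Dict[str, Any]) -> set:
    # Fall back to just the worker pid when 'subtree_pids' is absent.
    tree = set(stats.get("subtree_pids") or [])
    tree.add(stats["pid"])
    return tree


def attribute_gpu(stats: Dict[str, Any], sysmon: Any) -> Optional[Dict[str, Any]]:
    if sysmon is None or not hasattr(sysmon, "list_processes"):
        return None  # "sysmon unavailable": caller keeps default snapshot fields
    try:
        procs = sysmon.list_processes()
    except Exception:
        return None
    pids = subtree_pids(stats)
    total, gpu_index = 0.0, None
    for proc in procs:
        if proc_field(proc, "pid") not in pids:
            continue
        mb = float(proc_field(proc, "gpu_memory_mb", 0.0) or 0.0)
        total += mb
        if mb > 0 and gpu_index is None:
            gpu_index = proc_field(proc, "gpu_index")
    # 0.0 here means sysmon worked but nothing in the subtree holds GPU memory.
    return {"gpu_memory_mb": total, "gpu_index": gpu_index}
```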
Configuration Validation (validation.ipynb)
Validation helpers for capability configuration dataclasses
Import
from cjm_substrate.utils.validation import (
T,
SCHEMA_TITLE,
SCHEMA_DESC,
SCHEMA_MIN,
SCHEMA_MAX,
SCHEMA_ENUM,
SCHEMA_MIN_LEN,
SCHEMA_MAX_LEN,
SCHEMA_PATTERN,
SCHEMA_FORMAT,
validate_field_value,
validate_config,
config_to_dict,
dict_to_config,
extract_defaults,
dataclass_to_jsonschema
)
Functions
def validate_field_value(
value:Any, # Value to validate
metadata:Dict[str, Any], # Field metadata containing constraints
field_name:str="" # Field name for error messages
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
"Validate a value against field metadata constraints."
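To make the `(is_valid, error_message)` contract concrete, here is a hedged sketch of constraint checking keyed by the SCHEMA_* strings listed under Variables (`minimum`, `maximum`, `enum`, `minLength`, `maxLength`, `pattern`). `check_field` is hypothetical; the library's messages and exact rules may differ.

```python
import re
from typing import Any, Dict, Optional, Tuple


def check_field(value: Any, metadata: Dict[str, Any],
                field_name: str = "") -> Tuple[bool, Optional[str]]:
    """Hypothetical sketch of constraint checks keyed by the SCHEMA_* strings."""
    label = field_name or "value"
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        if "minimum" in metadata and value < metadata["minimum"]:
            return False, f"{label} must be >= {metadata['minimum']}"
        if "maximum" in metadata and value > metadata["maximum"]:
            return False, f"{label} must be <= {metadata['maximum']}"
    if "enum" in metadata and value not in metadata["enum"]:
        return False, f"{label} must be one of {metadata['enum']}"
    if isinstance(value, str):
        if "minLength" in metadata and len(value) < metadata["minLength"]:
            return False, f"{label} must be at least {metadata['minLength']} characters"
        if "maxLength" in metadata and len(value) > metadata["maxLength"]:
            return False, f"{label} must be at most {metadata['maxLength']} characters"
        # JSON Schema 'pattern' is an unanchored search, hence re.search.
        if "pattern" in metadata and re.search(metadata["pattern"], value) is None:
            return False, f"{label} does not match {metadata['pattern']!r}"
    return True, None
```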
def validate_config(
config:Any # Configuration dataclass instance to validate
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
"Validate all fields in a configuration dataclass against their metadata constraints."
def config_to_dict(
config:Any # Configuration dataclass instance
) -> Dict[str, Any]: # Dictionary representation of the configuration
"Convert a configuration dataclass instance to a dictionary."
def dict_to_config(
config_class:Type[T], # Configuration dataclass type
data:Optional[Dict[str, Any]]=None, # Dictionary with configuration values
validate:bool=False, # Whether to validate against metadata constraints
strict:bool=True # SG-8: reject unknown keys (default); set False to log+filter for forward-compat
) -> T: # Instance of the configuration dataclass
"""
Create a configuration dataclass instance from a dictionary.
SG-8: by default, unknown keys raise `CapabilityConfigError`. The previous
behavior (silently filter unknowns) is available via `strict=False`,
which logs a warning so the drift is still visible in operator logs.
"""
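The SG-8 behaviour (unknown keys raise by default; `strict=False` logs a warning and filters them) can be sketched as follows. `ConfigKeyError` is a hypothetical stand-in for `CapabilityConfigError`, `WhisperConfig` is an invented example, and the `validate` flag is left out.

```python
import logging
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional, Type, TypeVar

T = TypeVar("T")
log = logging.getLogger(__name__)


class ConfigKeyError(ValueError):
    """Hypothetical stand-in for CapabilityConfigError."""


def build_config(config_class: Type[T], data: Optional[Dict[str, Any]] = None,
                 strict: bool = True) -> T:
    data = dict(data or {})
    known = {f.name for f in fields(config_class)}
    unknown = sorted(set(data) - known)
    if unknown:
        if strict:
            raise ConfigKeyError(f"{config_class.__name__}: unknown keys {unknown}")
        # Forward-compat mode: keep the drift visible in logs, then drop the extras.
        log.warning("%s: ignoring unknown keys %s", config_class.__name__, unknown)
    return config_class(**{k: v for k, v in data.items() if k in known})


@dataclass
class WhisperConfig:  # hypothetical example config
    model: str = "base"
    beam_size: int = 5
```

Failing loudly by default catches typos such as `beam` for `beam_size`, which silent filtering would turn into a config that quietly uses the default.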
def extract_defaults(
config_class:Type # Configuration dataclass type
) -> Dict[str, Any]: # Default values from the dataclass
"Extract default values from a configuration dataclass type."
def _python_type_to_json_type(
python_type:type # Python type annotation to convert
) -> Dict[str, Any]: # JSON schema type definition
"Convert Python type to JSON schema type."
def dataclass_to_jsonschema(
cls:type # Dataclass with field metadata
) -> Dict[str, Any]: # JSON schema dictionary
"Convert a dataclass to a JSON schema for form generation."
Variables
T
SCHEMA_TITLE = 'title' # Display title for the field
SCHEMA_DESC = 'description' # Help text description
SCHEMA_MIN = 'minimum' # Minimum value for numbers
SCHEMA_MAX = 'maximum' # Maximum value for numbers
SCHEMA_ENUM = 'enum' # Allowed values for dropdowns
SCHEMA_MIN_LEN = 'minLength' # Minimum string length
SCHEMA_MAX_LEN = 'maxLength' # Maximum string length
SCHEMA_PATTERN = 'pattern' # Regex pattern for strings
SCHEMA_FORMAT = 'format' # String format (email, uri, date, etc.)
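The SCHEMA_* constants are plain JSON Schema keywords, which suggests field metadata can be merged directly into each property when building a form schema. The sketch below shows that idea with a small Python-to-JSON type map. `to_jsonschema`, `DemoConfig` and the output shape are assumptions, not the documented output of `dataclass_to_jsonschema`. It also assumes annotations are real types, not strings (i.e. no `from __future__ import annotations`).

```python
from dataclasses import MISSING, dataclass, field, fields
from typing import Any, Dict, List

_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def to_jsonschema(cls: type) -> Dict[str, Any]:
    """Hypothetical sketch: one property per field, with metadata merged in."""
    props: Dict[str, Any] = {}
    required: List[str] = []
    for f in fields(cls):
        prop = {"type": _JSON_TYPES.get(f.type, "string")}
        prop.update(f.metadata)  # e.g. title, description, minimum, enum
        if f.default is not MISSING:
            prop["default"] = f.default
        elif f.default_factory is MISSING:
            required.append(f.name)  # no default of any kind: the form must ask
        props[f.name] = prop
    schema: Dict[str, Any] = {"title": cls.__name__, "type": "object", "properties": props}
    if required:
        schema["required"] = required
    return schema


@dataclass
class DemoConfig:  # hypothetical
    model: str = field(metadata={"title": "Model"})
    beam_size: int = field(default=5, metadata={"title": "Beam size", "minimum": 1, "maximum": 10})
    language: str = field(default="en", metadata={"enum": ["en", "de"]})
```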
Typed Wire Layer (wire.ipynb)
Typed data transfer at the worker boundary: the zero-copy FileBackedDTO
Import
from cjm_substrate.core.wire import (
WIRE_KIND_KEY,
WIRE_DATA_KEY,
ENVELOPE_BODY_KEY,
ACCOUNTS_HEADER,
FileBackedDTO,
flat_from_dict,
wire_type,
wire_encode,
wire_decode,
CallEnvelope,
set_call_envelope,
reset_call_envelope,
get_call_envelope,
begin_account_capture,
record_account,
drain_accounts
)
Functions
def flat_from_dict(
"""
Default reconstruction for FLAT wire DTOs (no nested-DTO fields).
Filters the payload to the dataclass's declared fields (unknown extras
are dropped with a debug log — transport-terminus tolerance, see the
envelope design note) and lets the constructor enforce required fields
(a missing required field raises TypeError loudly). DTOs with nested
DTO fields (e.g. a result carrying a list of typed items) must define
their own `from_dict` classmethod; `@wire_type` only attaches this
default when the class has none.
"""
def wire_type(
kind: str # Stable wire discriminator, e.g. "transcription.result"
) -> Callable[[type], type]: # Class decorator
"""
Register a dataclass as a typed wire DTO under `kind`.
- The class must be a dataclass (encode falls back to
`dataclasses.asdict` when it defines no `to_dict`).
- If the class defines no `from_dict`, the flat default
(`flat_from_dict`) is attached; nested DTOs define their own.
- Re-registering the same LOGICAL class (qualname match; the module is
ignored because nbdev's literate workflow defines each class twice —
in-notebook `__main__` + the exported module) replaces the decode
entry; a DIFFERENT class claiming an already-registered kind raises
ValueError.
"""
def wire_encode(
obj: Any # A task result (any shape)
) -> Any: # Tagged envelope dict for registered DTOs; `obj` unchanged otherwise
"""
Wrap a registered wire DTO in its tagged envelope (worker side).
Exact-type lookup: subclasses are NOT encoded under the parent's kind
(they pass through unregistered, preserving today's behavior).
Payload preference: the DTO's own `to_dict()` when defined, else
`dataclasses.asdict` (recursive — nested dataclasses flatten).
"""
def wire_decode(
obj: Any # A JSON-decoded response body (any shape)
) -> Any: # The typed DTO for known kinds; `obj` unchanged otherwise
"""
Reconstruct a typed result from its tagged envelope (host side).
Known kind -> the registered class's `from_dict` (strict: a missing
required field raises). Unknown kind -> the dict passes through
UNCHANGED with the envelope intact (tolerant degradation for hosts
without the result's interface library). Untagged values pass through.
"""
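A compact sketch of the registry semantics described for `wire_type`, `wire_encode` and `wire_decode`: an exact-type kind lookup on encode, a strict `from_dict` for known kinds, and untouched pass-through for unknown kinds and untagged values. The envelope shape `{"__wire__": kind, "data": payload}` is inferred from `WIRE_KIND_KEY` and `WIRE_DATA_KEY` and is an assumption. `TranscriptionResult` and the local registry dicts are hypothetical.

```python
import dataclasses
from typing import Any, Callable, Dict

WIRE_KIND_KEY = "__wire__"
WIRE_DATA_KEY = "data"
_TYPES: Dict[str, type] = {}  # kind -> class (decode side)
_KINDS: Dict[type, str] = {}  # class -> kind (encode side, exact type)


def flat_from_dict(cls, payload: Dict[str, Any]):
    names = {f.name for f in dataclasses.fields(cls)}
    # Drop unknown extras; a missing required field still raises TypeError.
    return cls(**{k: v for k, v in payload.items() if k in names})


def wire_type(kind: str) -> Callable[[type], type]:
    def register(cls: type) -> type:
        if not dataclasses.is_dataclass(cls):
            raise TypeError(f"{cls.__name__} must be a dataclass")
        prior = _TYPES.get(kind)
        if prior is not None and prior.__qualname__ != cls.__qualname__:
            raise ValueError(f"kind {kind!r} already registered to {prior.__qualname__}")
        if not hasattr(cls, "from_dict"):
            cls.from_dict = classmethod(flat_from_dict)
        _TYPES[kind] = cls
        _KINDS[cls] = kind
        return cls
    return register


def wire_encode(obj: Any) -> Any:
    kind = _KINDS.get(type(obj))  # exact type: subclasses pass through untagged
    if kind is None:
        return obj
    payload = obj.to_dict() if hasattr(obj, "to_dict") else dataclasses.asdict(obj)
    return {WIRE_KIND_KEY: kind, WIRE_DATA_KEY: payload}


def wire_decode(obj: Any) -> Any:
    if not (isinstance(obj, dict) and WIRE_KIND_KEY in obj):
        return obj  # untagged values pass through
    cls = _TYPES.get(obj[WIRE_KIND_KEY])
    if cls is None:
        return obj  # unknown kind: envelope stays intact
    return cls.from_dict(obj[WIRE_DATA_KEY])


@wire_type("transcription.result")
@dataclasses.dataclass
class TranscriptionResult:  # hypothetical DTO
    text: str
    language: str = "en"
```

Because only the kind string crosses the boundary, a host without the interface library still receives a usable dict rather than an error.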
def set_call_envelope(env: Optional[CallEnvelope]) -> contextvars.Token:
"""Set the current call envelope; returns the token for `reset_call_envelope`."""
return _CALL_ENVELOPE.set(env)
def reset_call_envelope(token: contextvars.Token) -> None:
"""Restore the prior envelope (always pair with `set_call_envelope` in finally)."""
_CALL_ENVELOPE.reset(token)
def get_call_envelope() -> Optional[CallEnvelope]
"The current call envelope, or None outside any call span."
def begin_account_capture() -> None:
"""Start a fresh account list for the current call span (worker endpoint
entry; same no-reset semantics as the envelope — the ASGI request task's
context dies with the request)."""
_CALL_ACCOUNTS.set([])
def record_account(
event_type: str, # SubstrateEventType value (task_account / result_saved / cache_hit / ...)
payload: Optional[dict] = None, # Structured detail (references + hashes, never content)
) -> None
"""
Record one substrate-family account for the current call span.
Called by the worker itself and by interface-lib storage helpers.
Silent no-op outside a capture span (standalone runs, host imports) —
the envelope-less-call posture applied to accounts.
"""
def drain_accounts() -> list:
"""Return + clear the current span's recorded accounts ([] outside a span
or when nothing was recorded). The worker response path calls this once
when building the response headers."""
lst = _CALL_ACCOUNTS.get()
if not lst
Classes
@runtime_checkable
class FileBackedDTO(Protocol):
"Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer."
def to_temp_file(self) -> str: # Absolute path to the temporary file
"Save the data to a temporary file and return the absolute path."
@dataclass
class CallEnvelope:
"""
Substrate-owned per-call identity + control block (CR-14 / CR-15).
Travels as a top-level `"envelope"` key on every proxy→worker call body.
All fields optional — an envelope-less call (direct proxy use, old hosts)
simply yields unattributed records, never a failure.
"""
job_id: Optional[str] # Queue job identity
run_id: Optional[str] # Host-tier run correlation (core run manifests)
composition_id: Optional[str] # Stage-3 composition correlation
node_id: Optional[str] # Composition node correlation
actor: Optional[str] # Who/what initiated (operator / agent / host id)
control: dict = field(...) # Per-call flags (force/cache-bypass — the 4th CR-15 category)
def to_wire(self) -> dict:
"""Compact wire form: None fields dropped; empty control dropped."""
out = {}
for f in _dc_fields(self)
def from_wire(cls, d: dict) -> "CallEnvelope"
"Tolerant decode: unknown keys ignored (forward compat)."
Variables
WIRE_KIND_KEY = '__wire__'
WIRE_DATA_KEY = 'data'
_WIRE_TYPES: Dict[str, type]
_WIRE_KINDS: Dict[type, str]
ENVELOPE_BODY_KEY = 'envelope' # Top-level request-body key (never inside kwargs)
_CALL_ENVELOPE: contextvars.ContextVar[Optional[CallEnvelope]]
ACCOUNTS_HEADER = 'X-CJM-Accounts' # Response header carrying recorded accounts (ASCII JSON)
_CALL_ACCOUNTS: contextvars.ContextVar[Optional[list]]
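The account-capture functions form a small per-call collector: `begin_account_capture` opens a list, `record_account` appends (and is silently a no-op outside a span), and `drain_accounts` returns and clears it so the response path can emit `X-CJM-Accounts`. The hypothetical sketch below follows that shape. The record layout `{"event_type", "payload"}` is an assumption.

```python
import contextvars
import json
from typing import Dict, Optional

ACCOUNTS_HEADER = "X-CJM-Accounts"
_ACCOUNTS = contextvars.ContextVar("call_accounts", default=None)


def begin_capture() -> None:
    _ACCOUNTS.set([])  # fresh list for this call span


def record(event_type: str, payload: Optional[dict] = None) -> None:
    lst = _ACCOUNTS.get()
    if lst is None:
        return  # outside a capture span: silent no-op
    lst.append({"event_type": event_type, "payload": payload or {}})


def drain() -> list:
    lst = _ACCOUNTS.get()
    if not lst:
        return []
    out = list(lst)
    lst.clear()
    return out


def accounts_headers() -> Dict[str, str]:
    accounts = drain()
    if not accounts:
        return {}  # header absent, so account-less responses are unchanged
    # json.dumps escapes non-ASCII by default (ensure_ascii=True): latin-1 safe header.
    return {ACCOUNTS_HEADER: json.dumps(accounts)}
```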
Universal Worker (worker.ipynb)
FastAPI server that runs inside isolated capability environments
Import
from cjm_substrate.core.worker import (
EnhancedJSONEncoder,
parent_monitor,
create_app,
run_worker
)
Functions
def parent_monitor(
ppid: int # Parent process ID to monitor
) -> None
"""
Monitor parent process and terminate self if parent dies.
This implements the "Suicide Pact" pattern: if the Host process dies,
the Worker must terminate itself to prevent zombie processes.
"""
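One common way to implement this "Suicide Pact" on POSIX is to poll `os.getppid()`: when the original parent exits, the child is re-parented (to init or a subreaper), so the value stops matching. The sketch below assumes that approach; the source does not say how `parent_monitor` detects death. On Windows `getppid()` does not change after the parent exits, so this check is POSIX-only. `parent_alive` and `watch_parent` are hypothetical.

```python
import os
import threading
import time
from typing import Callable, Optional


def parent_alive(ppid: int) -> bool:
    # POSIX: after the parent exits, the child is re-parented and getppid() changes.
    return os.getppid() == ppid


def watch_parent(ppid: int, on_orphaned: Callable[[], None],
                 poll_interval: float = 1.0,
                 stop: Optional[threading.Event] = None) -> None:
    """Hypothetical sketch of the Suicide Pact loop; run it in a daemon thread."""
    while stop is None or not stop.is_set():
        if not parent_alive(ppid):
            on_orphaned()  # a real worker would terminate here, e.g. os._exit(1)
            return
        time.sleep(poll_interval)
```

In a worker this would typically be started as `threading.Thread(target=watch_parent, args=(os.getppid(), lambda: os._exit(1)), daemon=True).start()`, so the watchdog never keeps the process alive on its own.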
def _load_capability_instance(
module_name: str, # Python module path (e.g., "my_capability.capability")
class_name: str # Capability class name (e.g., "WhisperCapability")
): # Instantiated capability object
"""
Dynamically load + instantiate the capability class.
Runs synchronously before app construction so a load failure terminates
the worker process with exit code 1 (matches pre-lifespan behavior;
loading must succeed for the worker to be useful at all).
"""
def _make_lifespan(
capability_instance # The loaded capability object (closure-captured for shutdown cleanup)
): # FastAPI lifespan async context manager
"Build the FastAPI lifespan that invokes capability.cleanup() on shutdown."
def _register_identity_endpoints(
app, # FastAPI app under construction
capability_instance, # The loaded capability object
) -> None
"/health + /stats: worker identity + process-subtree telemetry."
def _register_lifecycle_endpoints(
app, # FastAPI app under construction
capability_instance, # The loaded capability object
) -> None
"""
/initialize /prefetch /reconfigure /on_disable /on_enable /cleanup:
the tool-capability lifecycle surface.
"""
def _register_config_endpoints(
app, # FastAPI app under construction
capability_instance, # The loaded capability object
) -> None
"/config_schema /config /config_options: the config surface."
def _load_adapters(
capability_instance, # The loaded tool-capability instance
adapter_specs, # List of "module:ClassName" impl specs (host-matched)
) -> Dict[str, Any]: # task_name -> bound adapter instance
"""
Instantiate task-adapter impls bound to this worker's tool (CR-17 pt 2).
Each spec was matched HOST-side (adapter-manifest protocol members vs the
capability's recorded structural surface) before reaching the worker, so a
spec failing HERE is an INSTALL gap (interface lib missing from this env),
not a compatibility miss — log loudly, skip, keep serving /execute.
Binding convention: `AdapterClass(capability_instance)`; keyed by the class's
`task_name` ClassVar.
"""
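A hedged sketch of the binding convention above: resolve each `"module:ClassName"` spec, instantiate `AdapterClass(capability_instance)`, key it by the class's `task_name`, and log and skip any spec that fails to load. `load_adapters` is a hypothetical re-implementation, not the library function.

```python
import importlib
import logging
from typing import Any, Dict, List

log = logging.getLogger(__name__)


def load_adapters(capability_instance: Any, adapter_specs: List[str]) -> Dict[str, Any]:
    """Hypothetical sketch: resolve 'module:ClassName' specs and bind each to the tool."""
    adapters: Dict[str, Any] = {}
    for spec in adapter_specs:
        module_name, _, class_name = spec.partition(":")
        try:
            cls = getattr(importlib.import_module(module_name), class_name)
            adapters[cls.task_name] = cls(capability_instance)  # binding convention
        except Exception:
            # The host already matched compatibility, so a failure here is an
            # install gap: log loudly, skip, and keep serving the other adapters.
            log.exception("adapter spec %r could not be loaded; skipping", spec)
    return adapters
```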
def _apply_call_envelope(
data: Dict[str, Any], # The decoded request body
) -> None
"""
CR-14: decode the wire envelope into the worker-side contextvar.
Set WITHOUT reset: ASGI handles each request in its own asyncio task, so
the context (and the var) dies with the request — and for
/execute_stream the response iteration runs in the SAME request task
AFTER the endpoint returns, which is exactly why a reset-before-return
would lose the identity (Starlette's iterate_in_threadpool copies the
request task's context per chunk). An absent envelope leaves the var
None — records stay honestly unattributed.
CR-14 follow-up: also begins a fresh account-capture span (same no-reset
semantics) so in-worker substrate-family accounts (TASK_ACCOUNT /
RESULT_SAVED / CACHE_HIT) accumulate per call and ride back on the
response header — see `_accounts_headers`.
"""
def _accounts_headers() -> Dict[str, str]:
"""Drain the call span's recorded accounts into the response header dict.
Empty dict when nothing was recorded (header absent — old hosts and
account-less calls see byte-identical responses). ASCII JSON
(`ensure_ascii` default) keeps the header latin-1-safe.
"""
accounts = drain_accounts()
if not accounts
def _register_task_endpoints(
app, # FastAPI app under construction
capability_instance, # The loaded capability object
adapters=None, # task_name -> bound adapter instance (CR-17 pt 2; stage 4)
) -> None
"""
/execute /execute_stream /cancel /progress /task: the task channel.
Stage 2 (typed wire layer): both result-serialization sites pass
through `wire_encode`, so results whose DTO classes are registered
via `@wire_type` cross the boundary in the tagged envelope and arrive
typed at the proxy; unregistered results serialize exactly as before.
CR-14 (stage 7): each call decodes the per-call envelope into the
contextvar and carries it into the executor thread via
`contextvars.copy_context()` (run_in_executor does NOT copy context by
itself) — the diagnostics handler stamps every capability log record with
exact job identity, replacing the timestamp-window heuristic.
CR-14 follow-up: the unary paths (/execute, /task) return recorded
accounts on the `X-CJM-Accounts` response header (success AND the
`_job_error` 500 — a failed call still reports the accounts recorded
before the failure). The host journals them with `worker_reported=True`.
"""
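The context-propagation detail above is easy to get wrong: `loop.run_in_executor` submits the callable to a thread pool without copying the caller's `contextvars` context, so a value set in the request task would not be visible in the pool thread. Wrapping the call in `contextvars.copy_context().run` carries it across. In this minimal sketch, `job_id_var`, `blocking_task` and `execute` are hypothetical.

```python
import asyncio
import contextvars
from typing import Optional

job_id_var = contextvars.ContextVar("job_id", default=None)


def blocking_task() -> Optional[str]:
    # Runs in a pool thread; a log handler here would read job_id_var to stamp records.
    return job_id_var.get()


async def execute(job_id: str) -> Optional[str]:
    job_id_var.set(job_id)  # set in the per-request task's context
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    # run_in_executor does not propagate context itself; ctx.run carries it over.
    return await loop.run_in_executor(None, ctx.run, blocking_task)
```

`asyncio.to_thread` (Python 3.9+) performs the same `copy_context` step internally, which is an alternative when a custom executor is not needed.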
def _register_monitor_endpoints(
app, # FastAPI app under construction
capability_instance, # The loaded capability object
class_name: str, # Capability class name (for 404 detail messages)
) -> None
"/get_system_status /list_processes: CR-3 typed MonitorToolProtocol accessors."
def create_app(
module_name: str, # Python module path (e.g., "my_capability.capability")
class_name: str, # Capability class name (e.g., "WhisperCapability")
adapter_specs=None # CR-17 pt 2: "module:ClassName" adapter impl specs to bind in-worker
) -> FastAPI: # Configured FastAPI application
"""
Create FastAPI app that hosts the specified capability.
NB-2 reshape (stage 2): a thin assembler — load the capability, build the
lifespan, register the endpoint groups. Endpoint behavior lives in the
module-level `_register_*` helpers above.
"""
def run_worker() -> None:
"""CLI entry point for running the worker."""
parser = argparse.ArgumentParser(description="Universal Capability Worker")
parser.add_argument("--module", required=True, help="Capability module path")
parser.add_argument("--class", dest="class_name", required=True, help="Capability class name")
parser.add_argument("--adapters", required=False, default="",
help="Comma-separated adapter impl specs 'module:ClassName' (CR-17 pt 2)")
# SG-4: parent-bound listening-socket FD inheritance closes the
Classes
class EnhancedJSONEncoder(JSONEncoder):
"""
JSON encoder that handles dataclasses and other common types.
SG-52: datetime support added so JobError.occurred_at serializes cleanly
when emitted via the typed `_job_error` terminal chunk in /execute_stream.
"""
def default(
self,
o: Any # Object to encode
) -> Any: # JSON-serializable representation
"Convert non-serializable objects to serializable form."
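A minimal sketch of an encoder covering the two cases the docstring names (dataclasses and, per SG-52, datetimes such as `JobError.occurred_at`). `SketchEncoder` and `JobErr` are hypothetical. Nested datetimes work because `dataclasses.asdict` leaves them as objects, and the encoder calls `default` again for each one.

```python
import dataclasses
import json
from datetime import datetime, timezone
from typing import Any


class SketchEncoder(json.JSONEncoder):
    """Hypothetical mirror of EnhancedJSONEncoder: dataclasses + datetimes."""

    def default(self, o: Any) -> Any:
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        if isinstance(o, datetime):
            return o.isoformat()
        return super().default(o)  # raises TypeError for anything else


@dataclasses.dataclass
class JobErr:  # hypothetical stand-in for JobError
    message: str
    occurred_at: datetime


payload = json.dumps(JobErr("boom", datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)),
                     cls=SketchEncoder)
```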
File details
Details for the file cjm_substrate-0.0.51.tar.gz.
File metadata
- Download URL: cjm_substrate-0.0.51.tar.gz
- Upload date:
- Size: 447.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 361d658fcd9c0507af5dbb1a231497f3cddd137180de52d4d7abfc3df0744558 |
| MD5 | 22a9db268dc83e1d7594e6601f9c2dca |
| BLAKE2b-256 | 23bc2f4e80733a5d90b68db352f8af9d1caef43256c11a659e3973707db7f558 |
File details
Details for the file cjm_substrate-0.0.51-py3-none-any.whl.
File metadata
- Download URL: cjm_substrate-0.0.51-py3-none-any.whl
- Upload date:
- Size: 294.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4c249a60643b982632519026ff7938a97f36b18eebbd26048bc7e770e5d27ac3 |
| MD5 | aa23fc80166e480a2aad705c8d03d885 |
| BLAKE2b-256 | 9bcdfe8e975d35b00e608dd3fb61a3993427c04c5c85d0d7fbfe213beba44b7a |