
Typed media-processing task-adapter interface — the multi-method MediaProcessingAdapter ABC + GenericMediaProcessingAdapter (cache/persist bookends around a pure-compute tool: convert, segment_audio, extract_audio, plus an uncached get_info probe) + the MediaProcessingToolProtocol. Result DTOs live in cjm-capability-primitives.

cjm-media-processing-adapter-interface

Install

pip install cjm_media_processing_adapter_interface

Project Structure

nbs/
├── adapter.ipynb # The typed media-processing task contract — the multi-method `MediaProcessingAdapter` ABC + the `MediaProcessingToolProtocol` structural contract (capability-unit Option C, pass-2 Thread 3). ONE multi-method adapter for the media-processing tool's whole repertoire (the graph-storage precedent), NOT per-action adapters.
├── generic.ipynb # The generic (tool-agnostic) multi-method media-processing adapter — cache-bookended artifact ops (`convert` / `segment_audio` / `extract_audio`) + an uncached `get_info` probe. Reused across every tool capability satisfying `MediaProcessingToolProtocol` (ffmpeg today).
└── storage.ipynb # Standardized SQLite storage for media-processing results (produced-artifact pointers) with content hashing — born-final, action-aware, no `job_id`.

Total: 3 notebooks

Module Dependencies

graph LR
    adapter["adapter<br/>Media Processing Adapter"]
    generic["generic<br/>Generic Media Processing Adapter"]
    storage["storage<br/>Media Processing Storage"]

    generic --> adapter
    generic --> storage

2 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Media Processing Adapter (adapter.ipynb)

The typed media-processing task contract — the multi-method MediaProcessingAdapter ABC + the MediaProcessingToolProtocol structural contract (capability-unit Option C, pass-2 Thread 3). ONE multi-method adapter for the media-processing tool’s whole repertoire (the graph-storage precedent), NOT per-action adapters.

Import

from cjm_media_processing_adapter_interface.adapter import (
    MediaProcessingToolProtocol,
    MediaProcessingAdapter
)

Classes

@runtime_checkable
class MediaProcessingToolProtocol(Protocol):
    """
    Structural contract for media-processing tool capabilities (ffmpeg today;
    born-final at stage 8 — derived from the native tool surface).
    
    Artifact-producing ops (`convert`, `segment_audio`, `extract_audio`) are pure
    compute that WRITE audio file(s) into the adapter-chosen `output_dir` (the
    artifact-write seam) and return a typed pointer. `get_info` is a pure-query
    probe returning inline `MediaMetadata` (no artifact). `get_current_config`
    supplies the effective config the generic adapter hashes (together with the
    per-call request params) for its cache key.
    
    The `output_dir` parameter on the artifact ops is the adapter telling the
    tool WHERE to persist (the `db_path`-off-the-tool rule); encoding the audio
    stays tool-side compute. Per-call request params (`output_format`,
    `sample_rate`, `channels`, `boundaries`, `filename_template`) define WHAT
    artifact to produce and are hashed into the cache key.
    """
    
    def convert(self, input_path: Union[str, Path], output_dir: str,
                    output_format: str, sample_rate: Optional[int] = None,
                    channels: Optional[int] = None, **kwargs) -> MediaArtifactResult: ...
    
    def segment_audio(self, input_path: Union[str, Path], output_dir: str,
                          boundaries: List[Dict[str, float]], output_format: Optional[str] = None,
                          filename_template: str = "segment_{index:03d}", **kwargs) -> MediaSegmentationResult: ...
    
    def extract_audio(self, input_path: Union[str, Path], output_dir: str,
                          output_format: Optional[str] = None, **kwargs) -> MediaArtifactResult: ...
    
    def get_info(self, file_path: Union[str, Path]) -> MediaMetadata: ...
    
    def get_current_config(self) -> Dict[str, Any]: ...

class MediaProcessingAdapter:
    """
    Typed media-processing task adapter — ONE multi-method adapter for the
    media-processing tool's whole repertoire (the graph-storage multi-method
    precedent, NOT per-action adapters: the `task_name` key answers "which tools
    do media-processing", and there is one such tool — ffmpeg — that will gain
    MORE ops over time, e.g. building video from images + audio, so per-action
    would only multiply task families + libs without aiding discovery).
    
    Native-surface model (stage 8 / PILLAR 1c): the TOOL is pure compute; the
    ADAPTER owns the cache + persistence bookends (see
    `GenericMediaProcessingAdapter`) + the per-call `force` control. The
    artifact ops write audio file(s) under the substrate-injected
    `PLUGIN_DATA_DIR`; the cache row maps (action, input, config) -> output(s).
    The adapter chooses the output location and passes it to the tool; `db_path`
    is not on the tool protocol. `get_info` is an UNCACHED pure-query
    pass-through (a probe; no artifact).
    
    Input contract: `convert`/`segment_audio` receive a source (or an upstream
    artifact, e.g. a vocals stem) path; the produced WAVs ARE the model-ready
    audio downstream consumers (VAD/transcription) use. `extract_audio` pulls an
    audio stream from a video container (the non-minimal video-input path).
    Implementations run in-worker beside their tool, constructed
    `AdapterClass(tool)` (mirrors `GraphStorageAdapter`). Result DTOs are
    wire-registered (`media_processing.{artifact,segmentation,metadata}`) so
    returned values cross the worker boundary typed.
    """
    
    def __init__(
            self,
            tool: MediaProcessingToolProtocol,  # The bound tool capability instance (worker-side binding)
        )
    
    def convert(
            self,
            input_path: Union[str, Path],     # Source (or upstream artifact) audio/video to convert
            output_format: str,               # Target audio format (e.g. 'wav', 'mp3')
            sample_rate: Optional[int] = None,  # Target sample rate (e.g. 16000 for model input); None = source rate
            channels: Optional[int] = None,     # Target channel count (e.g. 1 = mono); None = source channels
            **kwargs,                         # Provenance + tool options
        ) -> MediaArtifactResult:  # The produced converted-audio artifact (cached)
        "Convert media to target/model-ready audio (cached artifact)."
    
    def segment_audio(
            self,
            input_path: Union[str, Path],         # Source audio to cut
            boundaries: List[Dict[str, float]],   # [{start, end}, ...] cut points (seconds)
            output_format: Optional[str] = None,  # Output format; None = same as input
            filename_template: str = "segment_{index:03d}",  # Per-segment filename pattern
            **kwargs,                             # Provenance + tool options
        ) -> MediaSegmentationResult:  # The produced batch of segment files (cached at batch level)
        "Cut a source into a batch of segment files at the given boundaries (cached batch)."
    
    def extract_audio(
            self,
            input_path: Union[str, Path],         # Source video container
            output_format: Optional[str] = None,  # Audio format; None = codec-derived
            **kwargs,                             # Provenance + tool options
        ) -> MediaArtifactResult:  # The produced extracted-audio artifact (cached)
        "Extract the audio stream from a video container (cached artifact)."
    
    def get_info(
            self,
            file_path: Union[str, Path],  # Media file to probe
        ) -> MediaMetadata:  # Probed metadata (uncached)
        "Probe media metadata (uncached pure-query)."

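As a rough illustration of the structural contract above, the sketch below defines a cut-down stand-in protocol and a fake tool, then checks conformance with `isinstance`. `ToolLike`, `FakeTool`, the dict return value, the zero-based segment index, and the way the file extension is appended are all assumptions made for the example; the real protocol returns DTOs from cjm-capability-primitives, and a real tool would encode audio (ffmpeg today).

```python
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Protocol, Union, runtime_checkable


@runtime_checkable
class ToolLike(Protocol):
    """Hypothetical, cut-down stand-in for MediaProcessingToolProtocol."""

    def segment_audio(self, input_path: Union[str, Path], output_dir: str,
                      boundaries: List[Dict[str, float]],
                      output_format: Optional[str] = None,
                      filename_template: str = "segment_{index:03d}",
                      **kwargs) -> Any: ...

    def get_current_config(self) -> Dict[str, Any]: ...


class FakeTool:
    """Hypothetical tool: writes empty placeholder files instead of encoding audio."""

    def segment_audio(self, input_path, output_dir, boundaries,
                      output_format=None,
                      filename_template="segment_{index:03d}", **kwargs):
        # None means "same as input", so fall back to the source extension.
        ext = output_format or Path(input_path).suffix.lstrip(".")
        out = Path(output_dir)
        out.mkdir(parents=True, exist_ok=True)
        paths = []
        for index, _bounds in enumerate(boundaries):
            # Assumption: indices start at 0 and the extension is appended here.
            target = out / f"{filename_template.format(index=index)}.{ext}"
            target.touch()  # placeholder for the encoded segment
            paths.append(str(target))
        return {"segments": paths, "segment_count": len(paths)}

    def get_current_config(self):
        return {"codec": "pcm_s16le"}


tool = FakeTool()
# runtime_checkable protocols only verify that the method names exist.
conforms = isinstance(tool, ToolLike)

with tempfile.TemporaryDirectory() as tmp:
    result = tool.segment_audio(
        "talk.wav", tmp,
        boundaries=[{"start": 0.0, "end": 1.5}, {"start": 1.5, "end": 3.0}],
    )
    names = [Path(p).name for p in result["segments"]]
```

Because the check is structural, the fake tool never imports or subclasses the protocol. Note that `isinstance` against a runtime-checkable protocol does not validate signatures or return types, so a type checker is still needed for that.
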
Generic Media Processing Adapter (generic.ipynb)

The generic (tool-agnostic) multi-method media-processing adapter — cache-bookended artifact ops (convert / segment_audio / extract_audio) + an uncached get_info probe. Reused across every tool capability satisfying MediaProcessingToolProtocol (ffmpeg today).

Import

from cjm_media_processing_adapter_interface.generic import (
    GenericMediaProcessingAdapter
)

Classes

class GenericMediaProcessingAdapter(MediaProcessingAdapter):
    """
    Generic multi-method media-processing adapter, reused across any tool
    satisfying `MediaProcessingToolProtocol`.
    
    Artifact ops (`convert`/`extract_audio`/`segment_audio`) share the
    source-separation bookend (cache-check + artifact-existence recheck ->
    adapter-chosen `output_dir` -> tool -> persist). `get_info` is an uncached
    pass-through. The cache key hashes `get_current_config()` together with the
    per-call request params, and `force` rides `CallEnvelope.control` (keeping
    the task methods pure). Storage + artifacts live under the substrate-injected
    `PLUGIN_DATA_DIR` (`media_processing.db` + `cache_dir_for_config` dirs).
    """
    
    def convert(self, input_path, output_format, sample_rate=None, channels=None, **kwargs) -> MediaArtifactResult:
        "Convert media to target/model-ready audio (cached artifact)."
        # Request params hashed into the cache key: output_format, sample_rate, channels
    
    def extract_audio(self, input_path, output_format=None, **kwargs) -> MediaArtifactResult:
        "Extract the audio stream from a video container (cached artifact)."
        # Request params hashed into the cache key: output_format
    
    def segment_audio(self, input_path, boundaries, output_format=None,
                          filename_template="segment_{index:03d}", **kwargs) -> MediaSegmentationResult:
        """Cut a source into a batch of segment files (cached at BATCH level).

        One cache row per (segment_audio, input, config): output_path = the batch
        dir, metadata = the whole typed result (segments + batch_key + counts), so
        a hit reconstructs the `MediaSegmentationResult`, but only if EVERY
        produced segment file still exists (the batch analog of the single-artifact
        existence recheck). The per-segment resume-skip the fused tool did is
        intentionally NOT ported (born-final simplicity); watch the journal for
        batch re-compute cost before reinstating it.
        """
    
    def get_info(self, file_path) -> MediaMetadata
        "Probe media metadata — UNCACHED pure-query pass-through (no artifact)."

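The bookend described in the class docstring (cache check plus artifact-existence recheck, adapter-chosen `output_dir`, tool call, persist) can be sketched roughly as follows. This is a minimal illustration, not the package's implementation: `config_hash`, `cached_artifact`, `fake_convert`, the in-memory dict standing in for the SQLite store, and the directory layout are all hypothetical, and the input-content hash that the real cache also matches on is left out for brevity.

```python
import hashlib
import json
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, Optional


def config_hash(config: Dict[str, Any], request_params: Dict[str, Any]) -> str:
    """Hash the effective config together with the per-call request params."""
    payload = json.dumps({"config": config, "request": request_params}, sort_keys=True)
    return "sha256:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()


def cached_artifact(cache: Dict[tuple, str], action: str, input_path: str,
                    key: str, data_dir: Path,
                    compute: Callable[[str, str], str], force: bool = False) -> str:
    """Cache check + existence recheck -> output_dir -> tool -> persist."""
    row_key = (action, input_path, key)
    cached: Optional[str] = cache.get(row_key)
    # Serve a hit only if the artifact is still on disk and force is off.
    if cached is not None and not force and Path(cached).exists():
        return cached
    # The adapter, not the tool, picks where the artifact is written.
    output_dir = data_dir / action / key.split(":", 1)[1][:16]
    output_dir.mkdir(parents=True, exist_ok=True)
    produced = compute(input_path, str(output_dir))
    cache[row_key] = produced  # persist the pointer, not the audio
    return produced


calls = []


def fake_convert(input_path: str, output_dir: str) -> str:
    """Hypothetical tool call: writes a placeholder instead of encoding audio."""
    calls.append(input_path)
    target = Path(output_dir) / (Path(input_path).stem + ".wav")
    target.write_bytes(b"")
    return str(target)


with tempfile.TemporaryDirectory() as tmp:
    cache: Dict[tuple, str] = {}
    key = config_hash({"codec": "pcm_s16le"},
                      {"output_format": "wav", "sample_rate": 16000, "channels": 1})
    first = cached_artifact(cache, "convert", "talk.mp4", key, Path(tmp), fake_convert)
    second = cached_artifact(cache, "convert", "talk.mp4", key, Path(tmp), fake_convert)  # cache hit
    Path(first).unlink()  # artifact removed behind the cache's back
    third = cached_artifact(cache, "convert", "talk.mp4", key, Path(tmp), fake_convert)  # recompute
    call_count = len(calls)
    same_path = first == second == third
```

The tool runs twice here: once on the initial miss and once after the artifact disappears, while the second call is served from the cache. For `segment_audio` the same recheck would need to hold for every segment file in the batch, for example `all(Path(p).exists() for p in segment_paths)`.
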
Media Processing Storage (storage.ipynb)

Standardized SQLite storage for media-processing results (produced-artifact pointers) with content hashing — born-final, action-aware, no job_id.
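
To make the "algo:hexdigest" hash format used by the storage rows concrete, here is a small sketch of a content hash over a file. The helper name `hash_file`, the choice of SHA-256, and the chunk size are assumptions for the example; the source only specifies the string format.

```python
import hashlib
import tempfile
from pathlib import Path


def hash_file(path: Path, algo: str = "sha256", chunk_size: int = 1 << 20) -> str:
    """Return the file's content hash as 'algo:hexdigest' (algorithm is an assumption)."""
    digest = hashlib.new(algo)
    with path.open("rb") as handle:
        # Read in chunks so large media files are not loaded into memory at once.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return f"{algo}:{digest.hexdigest()}"


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "clip.wav"
    source.write_bytes(b"abc")
    before = hash_file(source)
    source.write_bytes(b"abcd")  # simulate the source changing on disk
    after = hash_file(source)
```

Keeping the algorithm name in the stored string means a later change of algorithm cannot be mistaken for a change of file content.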

Import

from cjm_media_processing_adapter_interface.storage import (
    MediaProcessingRow,
    MediaProcessingStorage
)

Classes

@dataclass
class MediaProcessingRow:
    "A single row from the media_processing_results table."
    
    action: str  # The media-processing op: 'convert', 'segment_audio', 'extract_audio'
    input_path: str  # Path to the source media file
    input_hash: str  # Hash of the input file in "algo:hexdigest" format
    config_hash: str  # Hash of the effective config + per-call request params
    output_path: str  # Produced artifact path (a file for convert/extract_audio; the batch dir for segment_audio)
    output_hash: Optional[str]  # Hash of the produced artifact (None for a batch)
    metadata: Optional[Dict[str, Any]]  # Processing metadata (for segment_audio: the full typed batch result)
    created_at: Optional[float]  # Unix timestamp

class MediaProcessingStorage:
    "Standardized SQLite storage for media-processing results (artifact pointers), born-final."
    
    def __init__(
            self,
            db_path: str  # Absolute path to the SQLite database file
        )
        "Initialize storage and create table if needed."
    
    def save(
            self,
            action: str,        # Media-processing op: 'convert', 'segment_audio', 'extract_audio'
            input_path: str,    # Path to the source media file
            input_hash: str,    # Hash of the input file in "algo:hexdigest" format
            config_hash: str,   # Hash of the effective config + per-call request params
            output_path: str,   # Produced artifact path (file, or batch dir for segment_audio)
            output_hash: Optional[str] = None,        # Hash of the produced artifact (None for a batch)
            metadata: Optional[Dict[str, Any]] = None  # Processing metadata (batch result for segment_audio)
        ) -> None
        "Save or replace a media-processing result (upsert by action + input_path + config_hash)."
    
    def save_with_logging(
            self,
            *,
            action: str,        # Media-processing op
            input_path: str,    # Path to the source media file
            input_hash: str,    # Hash of the input file in "algo:hexdigest" format
            config_hash: str,   # Hash of the effective config + per-call request params
            output_path: str,   # Produced artifact path
            output_hash: Optional[str] = None,         # Hash of the produced artifact
            metadata: Optional[Dict[str, Any]] = None,  # Processing metadata
            logger: Optional[logging.Logger] = None      # Optional logger for success/failure messages
        ) -> bool:  # True if saved; False if the save failed (error logged, not raised)
        """Save a result, logging success/failure. Failures are logged and swallowed (returns False).

        CR-14 follow-up: records a RESULT_SAVED account either way (ok flag +
        action/input/output/config references; the journal never carries
        content) so saves AND swallowed save-failures become auditable journal rows.
        """
    
    def get_cached(
            self,
            action: str,       # Media-processing op
            input_path: str,   # Path to the source media file
            input_hash: str,   # Content hash of the input (cache miss if the file changed)
            config_hash: str   # Config hash to match
        ) -> Optional[MediaProcessingRow]:  # Cached row or None
        """Retrieve a content-correct cached media-processing result.

        Matches on action + input_path + input_hash + config_hash, so a changed
        input (new input_hash) misses even though a stale row may still exist at
        the same (action, input_path, config_hash); the next save() replaces it.
        The CALLER must still confirm the artifact(s) at `output_path` exist
        before serving them (they live outside the DB).

        CR-14 follow-up: a hit records a CACHE_HIT account (the cache-serving
        decision is an account-of-action).
        """
    
    def list_jobs(
            self,
            limit: int = 100  # Maximum number of rows to return
        ) -> List[MediaProcessingRow]:  # List of media-processing rows
        "List media-processing results ordered by creation time (newest first)."
    
    def verify_input(
            self,
            action: str,      # Media-processing op
            input_path: str,  # Path to the source media file
            config_hash: str  # Config hash to look up
        ) -> Optional[bool]:  # True if input matches, False if changed, None if not found
        "Verify the source file still matches the hash stored for (action, input_path, config_hash)."

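The interplay between `save` (upsert by action + input_path + config_hash) and `get_cached` (which additionally matches input_hash) can be illustrated with plain `sqlite3`. This is a sketch under assumptions: the table name and column names come from `MediaProcessingRow`, but the DDL, the unique constraint, and the free functions below are hypothetical and are not the package's actual schema or API.

```python
import json
import sqlite3
import time

# Hypothetical schema inferred from the row fields and the save() docstring.
SCHEMA = """
CREATE TABLE IF NOT EXISTS media_processing_results (
    action TEXT NOT NULL,
    input_path TEXT NOT NULL,
    input_hash TEXT NOT NULL,
    config_hash TEXT NOT NULL,
    output_path TEXT NOT NULL,
    output_hash TEXT,
    metadata TEXT,
    created_at REAL,
    UNIQUE (action, input_path, config_hash)
)
"""


def save(conn, action, input_path, input_hash, config_hash, output_path,
         output_hash=None, metadata=None):
    """Upsert by (action, input_path, config_hash)."""
    conn.execute(
        "INSERT OR REPLACE INTO media_processing_results VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
        (action, input_path, input_hash, config_hash, output_path, output_hash,
         json.dumps(metadata) if metadata is not None else None, time.time()),
    )
    conn.commit()


def get_cached(conn, action, input_path, input_hash, config_hash):
    """Return output_path only when the stored input hash also matches."""
    row = conn.execute(
        "SELECT output_path FROM media_processing_results "
        "WHERE action = ? AND input_path = ? AND input_hash = ? AND config_hash = ?",
        (action, input_path, input_hash, config_hash),
    ).fetchone()
    return row[0] if row else None


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
save(conn, "convert", "/media/talk.mp4", "sha256:aaa", "sha256:cfg", "/data/talk.wav")
hit = get_cached(conn, "convert", "/media/talk.mp4", "sha256:aaa", "sha256:cfg")
# Same path and config, but the file content changed: the stale row must not match.
miss = get_cached(conn, "convert", "/media/talk.mp4", "sha256:bbb", "sha256:cfg")
# Saving the recomputed result replaces the stale row instead of adding a second one.
save(conn, "convert", "/media/talk.mp4", "sha256:bbb", "sha256:cfg", "/data/talk_v2.wav")
row_count = conn.execute("SELECT COUNT(*) FROM media_processing_results").fetchone()[0]
fresh = get_cached(conn, "convert", "/media/talk.mp4", "sha256:bbb", "sha256:cfg")
```

Even on a hit, the caller still has to confirm that the file or batch directory at the returned path exists, since the artifacts live outside the database.
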