Typed media-processing task-adapter interface — the multi-method MediaProcessingAdapter ABC + GenericMediaProcessingAdapter (cache/persist bookends around a pure-compute tool: convert, segment_audio, extract_audio, plus an uncached get_info probe) + the MediaProcessingToolProtocol. Result DTOs live in cjm-capability-primitives.

cjm-media-processing-adapter-interface

Install

pip install cjm_media_processing_adapter_interface

Project Structure

nbs/
├── adapter.ipynb # The typed media-processing task contract — the multi-method `MediaProcessingAdapter` ABC + the `MediaProcessingToolProtocol` structural contract (capability-unit Option C, pass-2 Thread 3). ONE multi-method adapter for the media-processing tool's whole repertoire (the graph-storage precedent), NOT per-action adapters.
├── generic.ipynb # The generic (tool-agnostic) multi-method media-processing adapter — cache-bookended artifact ops (`convert` / `segment_audio` / `extract_audio`) + an uncached `get_info` probe. Reused across every tool capability satisfying `MediaProcessingToolProtocol` (ffmpeg today).
└── storage.ipynb # Standardized SQLite storage for media-processing results (produced-artifact pointers) with content hashing — born-final, action-aware, no `job_id`.

Total: 3 notebooks

Module Dependencies

graph LR
    adapter["adapter<br/>Media Processing Adapter"]
    generic["generic<br/>Generic Media Processing Adapter"]
    storage["storage<br/>Media Processing Storage"]

    generic --> adapter
    generic --> storage

2 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Media Processing Adapter (adapter.ipynb)

The typed media-processing task contract — the multi-method MediaProcessingAdapter ABC + the MediaProcessingToolProtocol structural contract (capability-unit Option C, pass-2 Thread 3). ONE multi-method adapter for the media-processing tool’s whole repertoire (the graph-storage precedent), NOT per-action adapters.

Import

from cjm_media_processing_adapter_interface.adapter import (
    MediaProcessingToolProtocol,
    MediaProcessingAdapter
)

Classes

@runtime_checkable
class MediaProcessingToolProtocol(Protocol):
    """
    Structural contract for media-processing tool capabilities (ffmpeg today;
    born-final at stage 8 — derived from the native tool surface).
    
    Artifact-producing ops (`convert`, `segment_audio`, `extract_audio`) are pure
    compute that WRITE audio file(s) into the adapter-chosen `output_dir` (the
    artifact-write seam) and return a typed pointer. `get_info` is a pure-query
    probe returning inline `MediaMetadata` (no artifact). `get_current_config`
    supplies the effective config the generic adapter hashes (together with the
    per-call request params) for its cache key.
    
    The `output_dir` parameter on the artifact ops is the adapter telling the
    tool WHERE to persist (the `db_path`-off-the-tool rule); encoding the audio
    stays tool-side compute. Per-call request params (`output_format`,
    `sample_rate`, `channels`, `boundaries`, `filename_template`) define WHAT
    artifact to produce and are hashed into the cache key.
    """
    
    def convert(self, input_path: Union[str, Path], output_dir: str,
                    output_format: str, sample_rate: Optional[int] = None,
                    channels: Optional[int] = None, **kwargs) -> MediaArtifactResult: ...
    
    def segment_audio(self, input_path: Union[str, Path], output_dir: str,
                          boundaries: List[Dict[str, float]], output_format: Optional[str] = None,
                          filename_template: str = "segment_{index:03d}", **kwargs) -> MediaSegmentationResult: ...
    
    def extract_audio(self, input_path: Union[str, Path], output_dir: str,
                          output_format: Optional[str] = None, **kwargs) -> MediaArtifactResult: ...
    
    def get_info(self, file_path: Union[str, Path]) -> MediaMetadata: ...
    
    def get_current_config(self) -> Dict[str, Any]: ...

class MediaProcessingAdapter:
    def __init__(
        self,
        tool: MediaProcessingToolProtocol,  # The bound tool capability instance (worker-side binding)
    )
    """
    Typed media-processing task adapter — ONE multi-method adapter for the
    media-processing tool's whole repertoire (the graph-storage multi-method
    precedent, NOT per-action adapters: the `task_name` key answers "which tools
    do media-processing", and there is one such tool — ffmpeg — that will gain
    MORE ops over time, e.g. building video from images + audio, so per-action
    would only multiply task families + libs without aiding discovery).
    
    Native-surface model (stage 8 / PILLAR 1c): the TOOL is pure compute; the
    ADAPTER owns the cache + persistence bookends (see
    `GenericMediaProcessingAdapter`) + the per-call `force` control. The
    artifact ops write audio file(s) under the substrate-injected
    `PLUGIN_DATA_DIR`; the cache row maps (action, input, config) -> output(s).
    The adapter chooses the output location and passes it to the tool; `db_path`
    is not on the tool protocol. `get_info` is an UNCACHED pure-query
    pass-through (a probe; no artifact).
    
    Input contract: `convert`/`segment_audio` receive a source (or an upstream
    artifact, e.g. a vocals stem) path; the produced WAVs ARE the model-ready
    audio downstream consumers (VAD/transcription) use. `extract_audio` pulls an
    audio stream from a video container (the non-minimal video-input path).
    Implementations run in-worker beside their tool, constructed
    `AdapterClass(tool)` (mirrors `GraphStorageAdapter`). Result DTOs are
    wire-registered (`media_processing.{artifact,segmentation,metadata}`) so
    returned values cross the worker boundary typed.
    """
    
    def __init__(
            self,
            tool: MediaProcessingToolProtocol,  # The bound tool capability instance (worker-side binding)
        )
    
    def convert(
            self,
            input_path: Union[str, Path],     # Source (or upstream artifact) audio/video to convert
            output_format: str,               # Target audio format (e.g. 'wav', 'mp3')
            sample_rate: Optional[int] = None,  # Target sample rate (e.g. 16000 for model input); None = source rate
            channels: Optional[int] = None,     # Target channel count (e.g. 1 = mono); None = source channels
            **kwargs,                         # Provenance + tool options
        ) -> MediaArtifactResult:  # The produced converted-audio artifact (cached)
        "Convert media to target/model-ready audio (cached artifact)."
    
    def segment_audio(
            self,
            input_path: Union[str, Path],         # Source audio to cut
            boundaries: List[Dict[str, float]],   # [{start, end}, ...] cut points (seconds)
            output_format: Optional[str] = None,  # Output format; None = same as input
            filename_template: str = "segment_{index:03d}",  # Per-segment filename pattern
            **kwargs,                             # Provenance + tool options
        ) -> MediaSegmentationResult:  # The produced batch of segment files (cached at batch level)
        "Cut a source into a batch of segment files at the given boundaries (cached batch)."
    
    def extract_audio(
            self,
            input_path: Union[str, Path],         # Source video container
            output_format: Optional[str] = None,  # Audio format; None = codec-derived
            **kwargs,                             # Provenance + tool options
        ) -> MediaArtifactResult:  # The produced extracted-audio artifact (cached)
        "Extract the audio stream from a video container (cached artifact)."
    
    def get_info(
            self,
            file_path: Union[str, Path],  # Media file to probe
        ) -> MediaMetadata:  # Probed metadata (uncached)
        "Probe media metadata (uncached pure-query)."
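
The structural contract above can be illustrated with a small, self-contained sketch. It does not import this package: `ToolProtocolSketch`, `ArtifactStandIn`, `FakeTool`, and `NotATool` are hypothetical names, the protocol is trimmed to one artifact op plus `get_current_config`, and the single `output_path` field on the result is an assumption (the real DTO fields live in cjm-capability-primitives and are not listed here). Note that `runtime_checkable` only checks that the methods exist, not their signatures.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Protocol, Union, runtime_checkable


@dataclass
class ArtifactStandIn:
    """Hypothetical stand-in for MediaArtifactResult (real fields are assumed)."""
    output_path: str


@runtime_checkable
class ToolProtocolSketch(Protocol):
    """Trimmed mirror of the tool protocol: one artifact op plus the config probe."""

    def convert(self, input_path: Union[str, Path], output_dir: str,
                output_format: str, sample_rate: Optional[int] = None,
                channels: Optional[int] = None, **kwargs) -> ArtifactStandIn: ...

    def get_current_config(self) -> Dict[str, Any]: ...


class FakeTool:
    """Pure-compute fake: writes where the caller says, owns no cache and no DB."""

    def convert(self, input_path, output_dir, output_format,
                sample_rate=None, channels=None, **kwargs):
        out = Path(output_dir) / f"{Path(input_path).stem}.{output_format}"
        out.parent.mkdir(parents=True, exist_ok=True)
        out.write_bytes(b"")  # placeholder for the encoded audio
        return ArtifactStandIn(output_path=str(out))

    def get_current_config(self):
        return {"codec": "pcm_s16le"}  # illustrative value only


class NotATool:
    """Has convert() but no get_current_config(), so it fails the structural check."""

    def convert(self, input_path, output_dir, output_format): ...


# The caller (the adapter, in the real design) picks the output directory.
with tempfile.TemporaryDirectory() as tmp:
    result = FakeTool().convert("talk.mp4", tmp, "wav", sample_rate=16000, channels=1)
    produced_name = Path(result.output_path).name
    produced_exists = Path(result.output_path).exists()

conforms = isinstance(FakeTool(), ToolProtocolSketch)
rejects = isinstance(NotATool(), ToolProtocolSketch)
```

Because the check is structural, `FakeTool` needs no inheritance from the protocol; anything exposing the required methods passes.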

Generic Media Processing Adapter (generic.ipynb)

The generic (tool-agnostic) multi-method media-processing adapter — cache-bookended artifact ops (convert / segment_audio / extract_audio) + an uncached get_info probe. Reused across every tool capability satisfying MediaProcessingToolProtocol (ffmpeg today).

Import

from cjm_media_processing_adapter_interface.generic import (
    GenericMediaProcessingAdapter
)

Classes

class GenericMediaProcessingAdapter(MediaProcessingAdapter):
    """
    Generic multi-method media-processing adapter, reused across any tool
    satisfying `MediaProcessingToolProtocol`.
    
    Artifact ops (`convert`/`extract_audio`/`segment_audio`) share the
    source-separation bookend (cache-check + artifact-existence recheck ->
    adapter-chosen `output_dir` -> tool -> persist). `get_info` is an uncached
    pass-through. The cache key hashes `get_current_config()` together with the
    per-call request params, and `force` rides `CallEnvelope.control` (keeping
    the task methods pure). Storage + artifacts live under the substrate-injected
    `PLUGIN_DATA_DIR` (`media_processing.db` + `cache_dir_for_config` dirs).
    """
    
    def convert(self, input_path, output_format, sample_rate=None, channels=None, **kwargs) -> MediaArtifactResult
        "Convert media to target/model-ready audio (cached artifact)."
        # Request params hashed into the cache key: output_format, sample_rate, channels
    
    def extract_audio(self, input_path, output_format=None, **kwargs) -> MediaArtifactResult
        "Extract the audio stream from a video container (cached artifact)."
        # Request params hashed into the cache key: output_format
    
    def segment_audio(self, input_path, boundaries, output_format=None,
                          filename_template="segment_{index:03d}", **kwargs) -> MediaSegmentationResult
        "Cut a source into a batch of segment files (cached at BATCH level).

One cache row per (segment_audio, input, config): output_path = the batch
dir, metadata = the whole typed result (segments + batch_key + counts), so
a hit reconstructs the `MediaSegmentationResult`, but only if EVERY
produced segment file still exists (the batch analog of the single-artifact
existence recheck). The per-segment resume-skip the fused tool did is
intentionally NOT ported (born-final simplicity); watch the journal for
batch re-compute cost before reinstating it."
    
    def get_info(self, file_path) -> MediaMetadata
        "Probe media metadata — UNCACHED pure-query pass-through (no artifact)."
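
The bookend flow described above (cache check, artifact-existence recheck, adapter-chosen output directory, tool call, persist) could look roughly like the sketch below. This is not the package's implementation: `BookendSketch`, `config_hash`, `file_hash`, and `fake_convert` are hypothetical names, an in-memory dict stands in for `MediaProcessingStorage`, the directory layout is assumed, and `force` is passed as a plain argument here for brevity, whereas the real adapter reads it from `CallEnvelope.control`.

```python
import hashlib
import json
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, Tuple


def config_hash(config: Dict[str, Any], request_params: Dict[str, Any]) -> str:
    """Hash the effective config together with the per-call request params."""
    payload = json.dumps({"config": config, "request": request_params}, sort_keys=True)
    return "sha256:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()


def file_hash(path: Path) -> str:
    """Content hash of the input, so an edited file misses the cache."""
    return "sha256:" + hashlib.sha256(path.read_bytes()).hexdigest()


class BookendSketch:
    def __init__(self, data_dir: Path, compute: Callable[[Path, Path], Path]):
        self.data_dir = data_dir
        self.compute = compute  # pure compute: (input_path, output_dir) -> produced file
        self.rows: Dict[Tuple[str, str, str, str], str] = {}  # stand-in for the DB
        self.compute_calls = 0

    def run(self, action: str, input_path: Path, config: Dict[str, Any],
            request_params: Dict[str, Any], force: bool = False) -> Path:
        cfg = config_hash(config, request_params)
        key = (action, str(input_path), file_hash(input_path), cfg)
        cached = self.rows.get(key)
        # Serve a hit only if the artifact still exists on disk (it lives outside the DB).
        if not force and cached is not None and Path(cached).exists():
            return Path(cached)
        # The adapter, not the tool, chooses where the artifact is written.
        output_dir = self.data_dir / action / cfg.split(":", 1)[1][:16]
        output_dir.mkdir(parents=True, exist_ok=True)
        self.compute_calls += 1
        produced = self.compute(input_path, output_dir)
        self.rows[key] = str(produced)  # persist the pointer
        return produced


def fake_convert(input_path: Path, output_dir: Path) -> Path:
    """Stand-in for the tool: copies bytes instead of encoding audio."""
    out = output_dir / (input_path.stem + ".wav")
    out.write_bytes(input_path.read_bytes())
    return out


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    source = root / "talk.mp3"
    source.write_bytes(b"fake audio bytes")
    adapter = BookendSketch(root / "data", fake_convert)
    cfg, params = {"codec": "pcm_s16le"}, {"output_format": "wav", "sample_rate": 16000}

    first = adapter.run("convert", source, cfg, params)    # miss: computes
    second = adapter.run("convert", source, cfg, params)   # hit: served from cache
    same_path = first == second
    calls_after_hit = adapter.compute_calls

    first.unlink()                                          # artifact deleted behind the cache
    adapter.run("convert", source, cfg, params)             # recheck fails: recomputes
    calls_after_recheck = adapter.compute_calls

    adapter.run("convert", source, cfg, params, force=True)  # force bypasses the cache
    calls_after_force = adapter.compute_calls

params_change_key = config_hash(cfg, params) != config_hash(cfg, {"output_format": "mp3"})
```

Keeping the compute callable free of cache and path decisions is what lets one adapter be reused across any tool that satisfies the protocol.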

Media Processing Storage (storage.ipynb)

Standardized SQLite storage for media-processing results (produced-artifact pointers) with content hashing — born-final, action-aware, no job_id.
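
As an illustration of the "algo:hexdigest" hash format used for `input_hash` and `output_hash`, here is a minimal sketch of a chunked file hasher. The function name `hash_file`, the default algorithm, and the chunk size are assumptions; the package's own hashing helper is not shown in this README.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Union


def hash_file(path: Union[str, Path], algo: str = "sha256", chunk_size: int = 1 << 20) -> str:
    """Return the file's content hash as 'algo:hexdigest', reading in chunks."""
    digest = hashlib.new(algo)
    with open(path, "rb") as handle:
        # iter() with a sentinel stops when read() returns empty bytes (end of file).
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return f"{algo}:{digest.hexdigest()}"


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.bin"
    sample.write_bytes(b"hello")
    sample_digest = hash_file(sample)
    algo_name, _, hex_part = sample_digest.partition(":")
```

Prefixing the algorithm name keeps stored hashes self-describing if the algorithm is ever changed.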

Import

from cjm_media_processing_adapter_interface.storage import (
    MediaProcessingRow,
    MediaProcessingStorage
)

Classes

@dataclass
class MediaProcessingRow:
    "A single row from the media_processing_results table."
    
    action: str  # The media-processing op: 'convert', 'segment_audio', 'extract_audio'
    input_path: str  # Path to the source media file
    input_hash: str  # Hash of the input file in "algo:hexdigest" format
    config_hash: str  # Hash of the effective config + per-call request params
    output_path: str  # Produced artifact path (a file for convert/extract_audio; the batch dir for segment_audio)
    output_hash: Optional[str]  # Hash of the produced artifact (None for a batch)
    metadata: Optional[Dict[str, Any]]  # Processing metadata (for segment_audio: the full typed batch result)
    created_at: Optional[float]  # Unix timestamp
class MediaProcessingStorage:
    def __init__(
        self,
        db_path: str  # Absolute path to the SQLite database file
    )
    "Standardized SQLite storage for media-processing results (artifact pointers), born-final."
    
    def __init__(
            self,
            db_path: str  # Absolute path to the SQLite database file
        )
        "Initialize storage and create table if needed."
    
    def save(
            self,
            action: str,        # Media-processing op: 'convert', 'segment_audio', 'extract_audio'
            input_path: str,    # Path to the source media file
            input_hash: str,    # Hash of the input file in "algo:hexdigest" format
            config_hash: str,   # Hash of the effective config + per-call request params
            output_path: str,   # Produced artifact path (file, or batch dir for segment_audio)
            output_hash: Optional[str] = None,        # Hash of the produced artifact (None for a batch)
            metadata: Optional[Dict[str, Any]] = None  # Processing metadata (batch result for segment_audio)
        ) -> None
        "Save or replace a media-processing result (upsert by action + input_path + config_hash)."
    
    def save_with_logging(
            self,
            *,
            action: str,        # Media-processing op
            input_path: str,    # Path to the source media file
            input_hash: str,    # Hash of the input file in "algo:hexdigest" format
            config_hash: str,   # Hash of the effective config + per-call request params
            output_path: str,   # Produced artifact path
            output_hash: Optional[str] = None,         # Hash of the produced artifact
            metadata: Optional[Dict[str, Any]] = None,  # Processing metadata
            logger: Optional[logging.Logger] = None      # Optional logger for success/failure messages
        ) -> bool:  # True if saved; False if the save failed (error logged, not raised)
        "Save a result, logging success/failure. Failures are logged and swallowed (returns False).

CR-14 follow-up: records a RESULT_SAVED account either way (ok flag +
action/input/output/config references; the journal never carries
content) so saves AND swallowed save-failures become auditable journal rows."
    
    def get_cached(
            self,
            action: str,       # Media-processing op
            input_path: str,   # Path to the source media file
            input_hash: str,   # Content hash of the input (cache miss if the file changed)
            config_hash: str   # Config hash to match
        ) -> Optional[MediaProcessingRow]:  # Cached row or None
        "Retrieve a content-correct cached media-processing result.

Matches on action + input_path + input_hash + config_hash, so a changed
input (new input_hash) misses even though a stale row may still exist at
the same (action, input_path, config_hash); the next save() replaces it.
The CALLER must still confirm the artifact(s) at `output_path` exist
before serving them (they live outside the DB).

CR-14 follow-up: a hit records a CACHE_HIT account (the cache-serving
decision is an account-of-action)."
    
    def list_jobs(
            self,
            limit: int = 100  # Maximum number of rows to return
        ) -> List[MediaProcessingRow]:  # List of media-processing rows
        "List media-processing results ordered by creation time (newest first)."
    
    def verify_input(
            self,
            action: str,      # Media-processing op
            input_path: str,  # Path to the source media file
            config_hash: str  # Config hash to look up
        ) -> Optional[bool]:  # True if input matches, False if changed, None if not found
        "Verify the source file still matches the hash stored for (action, input_path, config_hash)."
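
The interplay between `save` (upsert by action + input_path + config_hash) and `get_cached` (which also matches input_hash) can be sketched with plain `sqlite3`. The schema below is an assumption reduced to the key columns plus `output_path`; only the table name `media_processing_results` comes from the docstring above, and the free functions `save` and `get_cached` are simplified stand-ins for the class methods.

```python
import sqlite3
from typing import Optional

# Assumed schema: the uniqueness constraint mirrors the documented upsert key.
SCHEMA = """
CREATE TABLE IF NOT EXISTS media_processing_results (
    action      TEXT NOT NULL,
    input_path  TEXT NOT NULL,
    input_hash  TEXT NOT NULL,
    config_hash TEXT NOT NULL,
    output_path TEXT NOT NULL,
    UNIQUE (action, input_path, config_hash)
)
"""


def save(conn: sqlite3.Connection, action: str, input_path: str,
         input_hash: str, config_hash: str, output_path: str) -> None:
    """Insert a row, replacing any existing row with the same upsert key."""
    conn.execute(
        "INSERT OR REPLACE INTO media_processing_results VALUES (?, ?, ?, ?, ?)",
        (action, input_path, input_hash, config_hash, output_path),
    )
    conn.commit()


def get_cached(conn: sqlite3.Connection, action: str, input_path: str,
               input_hash: str, config_hash: str) -> Optional[str]:
    """Return output_path only when the stored input_hash also matches."""
    row = conn.execute(
        "SELECT output_path FROM media_processing_results "
        "WHERE action = ? AND input_path = ? AND input_hash = ? AND config_hash = ?",
        (action, input_path, input_hash, config_hash),
    ).fetchone()
    return row[0] if row else None


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)

save(conn, "convert", "/media/talk.mp4", "sha256:aaa", "sha256:cfg", "/data/out/talk.wav")
hit = get_cached(conn, "convert", "/media/talk.mp4", "sha256:aaa", "sha256:cfg")

# The source file changed: same path and config, new content hash, so this misses.
stale = get_cached(conn, "convert", "/media/talk.mp4", "sha256:bbb", "sha256:cfg")

# The next save replaces the stale row rather than adding a second one.
save(conn, "convert", "/media/talk.mp4", "sha256:bbb", "sha256:cfg", "/data/out/talk_v2.wav")
row_count = conn.execute("SELECT COUNT(*) FROM media_processing_results").fetchone()[0]
fresh = get_cached(conn, "convert", "/media/talk.mp4", "sha256:bbb", "sha256:cfg")
```

Even on a hit, the caller still has to confirm that the file at `output_path` exists, since the artifact lives outside the database.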

Download files

Source Distribution

cjm_media_processing_adapter_interface-0.0.3.tar.gz (20.2 kB view details)

Uploaded Source

Built Distribution

File details

Details for the file cjm_media_processing_adapter_interface-0.0.3.tar.gz.

File hashes

Hashes for cjm_media_processing_adapter_interface-0.0.3.tar.gz
Algorithm Hash digest
SHA256 07749f90fcf2626bedd4e254b18662fd4757f189151f07c1d7e567ffdc923147
MD5 b26eebeeaf8014a65dd6b63df78ca6f7
BLAKE2b-256 4dae28907cc6838eb1b86976c3b42a94bccd5d4bfef9bdce13541c008b2a3f23

File details

Details for the file cjm_media_processing_adapter_interface-0.0.3-py3-none-any.whl.

File hashes

Hashes for cjm_media_processing_adapter_interface-0.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 88e7d24372bd9e463e1e4f4ceb59a34b710e61c32372c1b2d654dfd88c924ae7
MD5 4e68879fae1b4449d5d90285965489bf
BLAKE2b-256 4a972a3310856b613139570ddf3d9b9a61edf4361bd3a64371af34668eb596f4
