Typed media-processing task-adapter interface — the multi-method MediaProcessingAdapter ABC + GenericMediaProcessingAdapter (cache/persist bookends around a pure-compute tool: convert, segment_audio, extract_audio, plus an uncached get_info probe) + the MediaProcessingToolProtocol. Result DTOs live in cjm-capability-primitives.
cjm-media-processing-adapter-interface
Install
pip install cjm_media_processing_adapter_interface
Project Structure
nbs/
├── adapter.ipynb # The typed media-processing task contract — the multi-method `MediaProcessingAdapter` ABC + the `MediaProcessingToolProtocol` structural contract (capability-unit Option C, pass-2 Thread 3). ONE multi-method adapter for the media-processing tool's whole repertoire (the graph-storage precedent), NOT per-action adapters.
├── generic.ipynb # The generic (tool-agnostic) multi-method media-processing adapter — cache-bookended artifact ops (`convert` / `segment_audio` / `extract_audio`) + an uncached `get_info` probe. Reused across every tool capability satisfying `MediaProcessingToolProtocol` (ffmpeg today).
└── storage.ipynb # Standardized SQLite storage for media-processing results (produced-artifact pointers) with content hashing — born-final, action-aware, no `job_id`.
Total: 3 notebooks
Module Dependencies
graph LR
adapter["adapter<br/>Media Processing Adapter"]
generic["generic<br/>Generic Media Processing Adapter"]
storage["storage<br/>Media Processing Storage"]
generic --> adapter
generic --> storage
2 cross-module dependencies detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
Media Processing Adapter (adapter.ipynb)
The typed media-processing task contract: the multi-method `MediaProcessingAdapter` ABC + the `MediaProcessingToolProtocol` structural contract (capability-unit Option C, pass-2 Thread 3). ONE multi-method adapter for the media-processing tool's whole repertoire (the graph-storage precedent), NOT per-action adapters.
Import
from cjm_media_processing_adapter_interface.adapter import (
MediaProcessingToolProtocol,
MediaProcessingAdapter
)
Classes
@runtime_checkable
class MediaProcessingToolProtocol(Protocol):
"""
Structural contract for media-processing tool capabilities (ffmpeg today;
born-final at stage 8 — derived from the native tool surface).
Artifact-producing ops (`convert`, `segment_audio`, `extract_audio`) are pure
compute that WRITE audio file(s) into the adapter-chosen `output_dir` (the
artifact-write seam) and return a typed pointer. `get_info` is a pure-query
probe returning inline `MediaMetadata` (no artifact). `get_current_config`
supplies the effective config the generic adapter hashes (together with the
per-call request params) for its cache key.
The `output_dir` parameter on the artifact ops is the adapter telling the
tool WHERE to persist (the `db_path`-off-the-tool rule); encoding the audio
stays tool-side compute. Per-call request params (`output_format`,
`sample_rate`, `channels`, `boundaries`, `filename_template`) define WHAT
artifact to produce and are hashed into the cache key.
"""
def convert(self, input_path: Union[str, Path], output_dir: str,
output_format: str, sample_rate: Optional[int] = None,
channels: Optional[int] = None, **kwargs) -> MediaArtifactResult: ...
def segment_audio(self, input_path: Union[str, Path], output_dir: str,
boundaries: List[Dict[str, float]], output_format: Optional[str] = None,
filename_template: str = "segment_{index:03d}", **kwargs) -> MediaSegmentationResult: ...
def extract_audio(self, input_path: Union[str, Path], output_dir: str,
output_format: Optional[str] = None, **kwargs) -> MediaArtifactResult: ...
def get_info(self, file_path: Union[str, Path]) -> MediaMetadata: ...
def get_current_config(self) -> Dict[str, Any]: ...
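Because the protocol is `@runtime_checkable`, a tool satisfies it by shape rather than by inheritance. The following is a minimal sketch of that structural check, not the package's own code: `ToolLike` is a hypothetical, trimmed stand-in for `MediaProcessingToolProtocol` (two methods only, returning plain dicts instead of the real DTOs), and `FakeTool` / `NotATool` are illustrative classes.

```python
from pathlib import Path
from typing import Any, Dict, Protocol, Union, runtime_checkable


@runtime_checkable
class ToolLike(Protocol):
    """Hypothetical, trimmed stand-in for MediaProcessingToolProtocol."""

    def get_info(self, file_path: Union[str, Path]) -> Dict[str, Any]: ...
    def get_current_config(self) -> Dict[str, Any]: ...


class FakeTool:
    """Illustrative tool: no inheritance, only matching method names."""

    def get_info(self, file_path: Union[str, Path]) -> Dict[str, Any]:
        return {"path": str(file_path)}

    def get_current_config(self) -> Dict[str, Any]:
        return {"codec": "pcm_s16le"}


class NotATool:
    """Illustrative class that lacks get_current_config."""

    def get_info(self, file_path: Union[str, Path]) -> Dict[str, Any]:
        return {}


is_tool = isinstance(FakeTool(), ToolLike)      # all protocol methods present
is_not_tool = isinstance(NotATool(), ToolLike)  # one protocol method missing
```

Note that a runtime `isinstance` check against a protocol only confirms that the named methods exist; it does not validate their signatures or return types, so a static type checker is still needed for that.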
class MediaProcessingAdapter:
def __init__(
self,
tool: MediaProcessingToolProtocol, # The bound tool capability instance (worker-side binding)
)
"""
Typed media-processing task adapter — ONE multi-method adapter for the
media-processing tool's whole repertoire (the graph-storage multi-method
precedent, NOT per-action adapters: the `task_name` key answers "which tools
do media-processing", and there is one such tool — ffmpeg — that will gain
MORE ops over time, e.g. building video from images + audio, so per-action
would only multiply task families + libs without aiding discovery).
Native-surface model (stage 8 / PILLAR 1c): the TOOL is pure compute; the
ADAPTER owns the cache + persistence bookends (see
`GenericMediaProcessingAdapter`) + the per-call `force` control. The
artifact ops write audio file(s) under the substrate-injected
`PLUGIN_DATA_DIR`; the cache row maps (action, input, config) -> output(s).
The adapter chooses the output location and passes it to the tool; `db_path`
is not on the tool protocol. `get_info` is an UNCACHED pure-query
pass-through (a probe; no artifact).
Input contract: `convert`/`segment_audio` receive a source (or an upstream
artifact, e.g. a vocals stem) path; the produced WAVs ARE the model-ready
audio downstream consumers (VAD/transcription) use. `extract_audio` pulls an
audio stream from a video container (the non-minimal video-input path).
Implementations run in-worker beside their tool, constructed
`AdapterClass(tool)` (mirrors `GraphStorageAdapter`). Result DTOs are
wire-registered (`media_processing.{artifact,segmentation,metadata}`) so
returned values cross the worker boundary typed.
"""
def __init__(
self,
tool: MediaProcessingToolProtocol, # The bound tool capability instance (worker-side binding)
)
def convert(
self,
input_path: Union[str, Path], # Source (or upstream artifact) audio/video to convert
output_format: str, # Target audio format (e.g. 'wav', 'mp3')
sample_rate: Optional[int] = None, # Target sample rate (e.g. 16000 for model input); None = source rate
channels: Optional[int] = None, # Target channel count (e.g. 1 = mono); None = source channels
**kwargs, # Provenance + tool options
) -> MediaArtifactResult: # The produced converted-audio artifact (cached)
"Convert media to target/model-ready audio (cached artifact)."
def segment_audio(
self,
input_path: Union[str, Path], # Source audio to cut
boundaries: List[Dict[str, float]], # [{start, end}, ...] cut points (seconds)
output_format: Optional[str] = None, # Output format; None = same as input
filename_template: str = "segment_{index:03d}", # Per-segment filename pattern
**kwargs, # Provenance + tool options
) -> MediaSegmentationResult: # The produced batch of segment files (cached at batch level)
"Cut a source into a batch of segment files at the given boundaries (cached batch)."
def extract_audio(
self,
input_path: Union[str, Path], # Source video container
output_format: Optional[str] = None, # Audio format; None = codec-derived
**kwargs, # Provenance + tool options
) -> MediaArtifactResult: # The produced extracted-audio artifact (cached)
"Extract the audio stream from a video container (cached artifact)."
def get_info(
self,
file_path: Union[str, Path], # Media file to probe
) -> MediaMetadata: # Probed metadata (uncached)
"Probe media metadata (uncached pure-query)."
Generic Media Processing Adapter (generic.ipynb)
The generic (tool-agnostic) multi-method media-processing adapter: cache-bookended artifact ops (`convert` / `segment_audio` / `extract_audio`) + an uncached `get_info` probe. Reused across every tool capability satisfying `MediaProcessingToolProtocol` (ffmpeg today).
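The cache bookend around an artifact op can be sketched roughly as follows. This is an illustration under stated assumptions, not the adapter's actual implementation: `config_hash` and `cached_artifact` are hypothetical helpers, a plain dict stands in for the SQLite storage, `force` is passed as an ordinary argument rather than via `CallEnvelope.control`, and the input content hash is left out for brevity.

```python
import hashlib
import json
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple


def config_hash(config: Dict[str, Any], request_params: Dict[str, Any]) -> str:
    """Hash the effective config together with the per-call request params."""
    payload = json.dumps({"config": config, "request": request_params}, sort_keys=True)
    return "sha256:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()


def cached_artifact(
    cache: Dict[Tuple[str, str, str], str],
    action: str,
    input_path: str,
    key: str,
    output_dir: Path,
    produce: Callable[[Path], str],
    force: bool = False,
) -> str:
    """Cache-check + artifact-existence recheck -> output_dir -> tool -> persist."""
    cache_key = (action, input_path, key)
    hit = cache.get(cache_key)
    # Serve the cached pointer only if the artifact still exists on disk.
    if hit is not None and not force and Path(hit).exists():
        return hit
    output_dir.mkdir(parents=True, exist_ok=True)  # adapter-chosen location
    output_path = produce(output_dir)              # pure-compute tool call
    cache[cache_key] = output_path                 # persist the pointer
    return output_path


calls: List[int] = []


def fake_convert(output_dir: Path) -> str:
    """Illustrative stand-in for a tool's convert(): writes one file."""
    calls.append(1)
    out = output_dir / "out.wav"
    out.write_bytes(b"RIFF")
    return str(out)


with tempfile.TemporaryDirectory() as tmp:
    cache: Dict[Tuple[str, str, str], str] = {}
    params = {"output_format": "wav", "sample_rate": 16000, "channels": 1}
    key = config_hash({"codec": "pcm_s16le"}, params)
    other_key = config_hash({"codec": "pcm_s16le"}, {**params, "sample_rate": 8000})
    args = (cache, "convert", "in.mp4", key, Path(tmp) / "artifacts", fake_convert)

    first = cached_artifact(*args)
    second = cached_artifact(*args)       # cache hit, tool not called again
    calls_after_hit = len(calls)

    Path(second).unlink()                 # artifact vanished outside the cache
    cached_artifact(*args)                # existence recheck forces a recompute
    calls_after_missing = len(calls)

    cached_artifact(*args, force=True)    # force bypasses a valid cache entry
    calls_after_force = len(calls)
```

Hashing the request params into the key means that changing, say, `sample_rate` yields a different key and therefore a separate cached artifact rather than a stale hit.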
Import
from cjm_media_processing_adapter_interface.generic import (
GenericMediaProcessingAdapter
)
Classes
class GenericMediaProcessingAdapter(MediaProcessingAdapter):
"""
Generic multi-method media-processing adapter, reused across any tool
satisfying `MediaProcessingToolProtocol`.
Artifact ops (`convert`/`extract_audio`/`segment_audio`) share the
source-separation bookend (cache-check + artifact-existence recheck ->
adapter-chosen `output_dir` -> tool -> persist). `get_info` is an uncached
pass-through. The cache key hashes `get_current_config()` together with the
per-call request params, and `force` rides `CallEnvelope.control` (keeping
the task methods pure). Storage + artifacts live under the substrate-injected
`PLUGIN_DATA_DIR` (`media_processing.db` + `cache_dir_for_config` dirs).
"""
def convert(self, input_path, output_format, sample_rate=None, channels=None, **kwargs) -> MediaArtifactResult:
"""Convert media to target/model-ready audio (cached artifact)."""
request_params = {"output_format": output_format, "sample_rate": sample_rate, "channels": channels}
def extract_audio(self, input_path, output_format=None, **kwargs) -> MediaArtifactResult:
"""Extract the audio stream from a video container (cached artifact)."""
request_params = {"output_format": output_format}
def segment_audio(self, input_path, boundaries, output_format=None,
filename_template="segment_{index:03d}", **kwargs) -> MediaSegmentationResult
"Cut a source into a batch of segment files (cached at BATCH level).
One cache row per (segment_audio, input, config): output_path = the batch
dir, metadata = the whole typed result (segments + batch_key + counts), so
a hit reconstructs the `MediaSegmentationResult` — but only if EVERY
produced segment file still exists (the batch analog of the single-artifact
existence recheck). The per-segment resume-skip the fused tool did is
intentionally NOT ported (born-final simplicity); watch the journal for
batch re-compute cost before reinstating it."
def get_info(self, file_path) -> MediaMetadata
"Probe media metadata — UNCACHED pure-query pass-through (no artifact)."
Media Processing Storage (storage.ipynb)
Standardized SQLite storage for media-processing results (produced-artifact pointers) with content hashing: born-final, action-aware, no `job_id`.
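The storage API in this module documents two behaviors worth seeing side by side: `save` upserts by action + input_path + config_hash, while `get_cached` additionally matches input_hash. The sketch below reproduces that pairing with the standard `sqlite3` module. The table name and column names follow `MediaProcessingRow`; the `UNIQUE` constraint, the `INSERT OR REPLACE` statement, and the free functions are assumptions made for illustration, not the package's actual schema or code.

```python
import json
import sqlite3
import time
from typing import Any, Dict, Optional, Tuple

# Assumed schema: columns mirror MediaProcessingRow; the UNIQUE key is a guess
# that matches the documented upsert key (action + input_path + config_hash).
SCHEMA = """
CREATE TABLE IF NOT EXISTS media_processing_results (
    action TEXT NOT NULL,
    input_path TEXT NOT NULL,
    input_hash TEXT NOT NULL,
    config_hash TEXT NOT NULL,
    output_path TEXT NOT NULL,
    output_hash TEXT,
    metadata TEXT,
    created_at REAL,
    UNIQUE (action, input_path, config_hash)
)
"""


def save(conn: sqlite3.Connection, action: str, input_path: str, input_hash: str,
         config_hash: str, output_path: str, output_hash: Optional[str] = None,
         metadata: Optional[Dict[str, Any]] = None) -> None:
    """Insert a row, replacing any row with the same upsert key."""
    conn.execute(
        "INSERT OR REPLACE INTO media_processing_results VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
        (action, input_path, input_hash, config_hash, output_path, output_hash,
         json.dumps(metadata) if metadata is not None else None, time.time()),
    )
    conn.commit()


def get_cached(conn: sqlite3.Connection, action: str, input_path: str,
               input_hash: str, config_hash: str) -> Optional[Tuple[str]]:
    """Return the output_path row only when the input content hash also matches."""
    return conn.execute(
        "SELECT output_path FROM media_processing_results "
        "WHERE action = ? AND input_path = ? AND input_hash = ? AND config_hash = ?",
        (action, input_path, input_hash, config_hash),
    ).fetchone()


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)

save(conn, "convert", "/data/in.mp4", "sha256:aaa", "sha256:cfg", "/data/out_v1.wav")
hit = get_cached(conn, "convert", "/data/in.mp4", "sha256:aaa", "sha256:cfg")

# The input file changed: same path and config, new content hash, so a miss.
stale = get_cached(conn, "convert", "/data/in.mp4", "sha256:bbb", "sha256:cfg")

# The next save replaces the stale row instead of adding a second one.
save(conn, "convert", "/data/in.mp4", "sha256:bbb", "sha256:cfg", "/data/out_v2.wav")
row_count = conn.execute("SELECT COUNT(*) FROM media_processing_results").fetchone()[0]
fresh = get_cached(conn, "convert", "/data/in.mp4", "sha256:bbb", "sha256:cfg")
```

Even on a hit, the caller would still need to confirm that the file at `output_path` exists, since artifacts live outside the database.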
Import
from cjm_media_processing_adapter_interface.storage import (
MediaProcessingRow,
MediaProcessingStorage
)
Classes
@dataclass
class MediaProcessingRow:
"A single row from the media_processing_results table."
action: str # The media-processing op: 'convert', 'segment_audio', 'extract_audio'
input_path: str # Path to the source media file
input_hash: str # Hash of the input file in "algo:hexdigest" format
config_hash: str # Hash of the effective config + per-call request params
output_path: str # Produced artifact path (a file for convert/extract_audio; the batch dir for segment_audio)
output_hash: Optional[str] # Hash of the produced artifact (None for a batch)
metadata: Optional[Dict[str, Any]] # Processing metadata (for segment_audio: the full typed batch result)
created_at: Optional[float] # Unix timestamp
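The `input_hash` and `output_hash` fields use an "algo:hexdigest" string. One way such a value could be produced is sketched below; `hash_file` is a hypothetical helper, and SHA-256 is an assumption, since the algorithm the package actually uses is not stated here.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Union


def hash_file(path: Union[str, Path], algo: str = "sha256",
              chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks and return it in "algo:hexdigest" form."""
    hasher = hashlib.new(algo)
    with open(path, "rb") as handle:
        # Read fixed-size chunks so large media files are never fully in memory.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            hasher.update(chunk)
    return f"{algo}:{hasher.hexdigest()}"


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.bin"
    sample.write_bytes(b"abc")
    digest = hash_file(sample)
    algo_name, hex_part = digest.split(":", 1)
```

Carrying the algorithm name in the string lets a reader of the row tell which hash produced it without consulting the code that wrote it.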
class MediaProcessingStorage:
def __init__(
self,
db_path: str # Absolute path to the SQLite database file
)
"Standardized SQLite storage for media-processing results (artifact pointers), born-final."
def __init__(
self,
db_path: str # Absolute path to the SQLite database file
)
"Initialize storage and create table if needed."
def save(
self,
action: str, # Media-processing op: 'convert', 'segment_audio', 'extract_audio'
input_path: str, # Path to the source media file
input_hash: str, # Hash of the input file in "algo:hexdigest" format
config_hash: str, # Hash of the effective config + per-call request params
output_path: str, # Produced artifact path (file, or batch dir for segment_audio)
output_hash: Optional[str] = None, # Hash of the produced artifact (None for a batch)
metadata: Optional[Dict[str, Any]] = None # Processing metadata (batch result for segment_audio)
) -> None
"Save or replace a media-processing result (upsert by action + input_path + config_hash)."
def save_with_logging(
self,
*,
action: str, # Media-processing op
input_path: str, # Path to the source media file
input_hash: str, # Hash of the input file in "algo:hexdigest" format
config_hash: str, # Hash of the effective config + per-call request params
output_path: str, # Produced artifact path
output_hash: Optional[str] = None, # Hash of the produced artifact
metadata: Optional[Dict[str, Any]] = None, # Processing metadata
logger: Optional[logging.Logger] = None # Optional logger for success/failure messages
) -> bool: # True if saved; False if the save failed (error logged, not raised)
"Save a result, logging success/failure. Failures are logged and swallowed (returns False).
CR-14 follow-up: records a RESULT_SAVED account either way (ok flag +
action/input/output/config references — the journal never carries
content) so saves AND swallowed save-failures become auditable journal rows."
def get_cached(
self,
action: str, # Media-processing op
input_path: str, # Path to the source media file
input_hash: str, # Content hash of the input (cache miss if the file changed)
config_hash: str # Config hash to match
) -> Optional[MediaProcessingRow]: # Cached row or None
"Retrieve a content-correct cached media-processing result.
Matches on action + input_path + input_hash + config_hash, so a changed
input (new input_hash) misses even though a stale row may still exist at
the same (action, input_path, config_hash) — the next save() replaces it.
The CALLER must still confirm the artifact(s) at `output_path` exist
before serving them (they live outside the DB).
CR-14 follow-up: a hit records a CACHE_HIT account (the cache-serving
decision is an account-of-action)."
def list_jobs(
self,
limit: int = 100 # Maximum number of rows to return
) -> List[MediaProcessingRow]: # List of media-processing rows
"List media-processing results ordered by creation time (newest first)."
def verify_input(
self,
action: str, # Media-processing op
input_path: str, # Path to the source media file
config_hash: str # Config hash to look up
) -> Optional[bool]: # True if input matches, False if changed, None if not found
"Verify the source file still matches the hash stored for (action, input_path, config_hash)."
File details
Details for the file cjm_media_processing_adapter_interface-0.0.4.tar.gz.
File metadata
- Download URL: cjm_media_processing_adapter_interface-0.0.4.tar.gz
- Upload date:
- Size: 20.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8c2d14aa4f35e2d85bafc3bf7f4623fa8723b09ac65e4cad8949b5432957ffaa |
| MD5 | fde338220f078cea320ba105ce677601 |
| BLAKE2b-256 | cafe8002613a2f5ae7b1c8505b8f3f6a969bdc79ed065d77173ebb71eb21b658 |
File details
Details for the file cjm_media_processing_adapter_interface-0.0.4-py3-none-any.whl.
File metadata
- Download URL: cjm_media_processing_adapter_interface-0.0.4-py3-none-any.whl
- Upload date:
- Size: 20.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2197ba05a09de579cc429e16c39f7203dba13863343e46a204030f631026272b |
| MD5 | 534ed5b5f0c119b4b0ea16f093c02951 |
| BLAKE2b-256 | 4088e4d255ec1a9a6417378382c56d053fce0dc9cb1298d12663aceb3af757f0 |