Skip to main content

Defines standardized interfaces and data structures for media analysis (VAD, Scene Detection) and processing (FFmpeg, Conversion) plugins within the cjm-plugin-system ecosystem.

Project description

cjm-media-plugin-system

Install

pip install cjm_media_plugin_system

Project Structure

nbs/
├── analysis_interface.ipynb   # Domain-specific plugin interface for media analysis (read-only / signal extraction)
├── core.ipynb                 # DTOs for media analysis and processing with FileBackedDTO support for zero-copy transfer
├── processing_interface.ipynb # Domain-specific plugin interface for media processing (write / file manipulation)
└── storage.ipynb              # Standardized SQLite storage for media analysis and processing results with content hashing

Total: 4 notebooks

Module Dependencies

graph LR
    analysis_interface["analysis_interface<br/>Media Analysis Plugin Interface"]
    core["core<br/>Core Data Structures"]
    processing_interface["processing_interface<br/>Media Processing Plugin Interface"]
    storage["storage<br/>Media Storage"]

    analysis_interface --> core
    processing_interface --> core

2 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Media Analysis Plugin Interface (analysis_interface.ipynb)

Domain-specific plugin interface for media analysis (read-only / signal extraction)

Import

from cjm_media_plugin_system.analysis_interface import (
    MediaAnalysisPlugin
)

Classes

class MediaAnalysisPlugin(PluginInterface):
    """
    Abstract base class for plugins that analyze media files.
    
    Analysis plugins perform read-only operations that extract temporal segments
    from media files (VAD, scene detection, beat detection, etc.).
    """
    
    def execute(
            self,
            media_path: Union[str, Path],  # Path to media file to analyze
            **kwargs
        ) -> MediaAnalysisResult:  # Analysis result with detected TimeRanges
        "Analyze the media file and return detected temporal segments."

Core Data Structures (core.ipynb)

DTOs for media analysis and processing with FileBackedDTO support for zero-copy transfer

Import

from cjm_media_plugin_system.core import (
    TimeRange,
    MediaMetadata,
    MediaAnalysisResult
)

Classes

@dataclass
class TimeRange:
    "Represents a temporal segment within a media file."
    
    start: float  # Start time in seconds
    end: float  # End time in seconds
    label: str = 'segment'  # Segment type (e.g., 'speech', 'silence', 'scene')
    confidence: Optional[float]  # Detection confidence (0.0 to 1.0)
    payload: Dict[str, Any] = field(...)  # Extra data (e.g., speaker embedding)
    
    def to_dict(self) -> Dict[str, Any]:  # Serialized representation
        "Convert to dictionary for JSON serialization."
@dataclass
class MediaMetadata:
    "Container for media file metadata."
    
    path: str  # File path
    duration: float  # Duration in seconds
    format: str  # Container format (e.g., 'mp4', 'mkv')
    size_bytes: int  # File size in bytes
    video_streams: List[Dict[str, Any]] = field(...)  # Video stream info
    audio_streams: List[Dict[str, Any]] = field(...)  # Audio stream info
    
    def to_dict(self) -> Dict[str, Any]:  # Serialized representation
        "Convert to dictionary for JSON serialization."
@dataclass
class MediaAnalysisResult:
    "Standard output for media analysis plugins."
    
    ranges: List[TimeRange]  # Detected temporal segments
    metadata: Dict[str, Any] = field(...)  # Global analysis stats
    
    def to_temp_file(self) -> str:  # Absolute path to temporary JSON file
            """Save results to a temp JSON file for zero-copy transfer."""
            tmp = tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode='w')
            
            data = {
                "ranges": [r.to_dict() for r in self.ranges],
        "Save results to a temp JSON file for zero-copy transfer."
    
    def from_file(
            cls,
            filepath: str  # Path to JSON file
        ) -> "MediaAnalysisResult":  # Loaded result instance
        "Load results from a JSON file."

Media Processing Plugin Interface (processing_interface.ipynb)

Domain-specific plugin interface for media processing (write / file manipulation)

Import

from cjm_media_plugin_system.processing_interface import (
    MediaProcessingPlugin
)

Classes

class MediaProcessingPlugin(PluginInterface):
    """
    Abstract base class for plugins that modify, convert, or extract media.
    
    Processing plugins perform write operations that produce new files
    (format conversion, segment extraction, source separation, speech
    enhancement, etc.). They are **dispatcher-style** (SG-44): callers invoke
    ``execute(action=..., **kwargs)`` and introspect ``supported_actions`` to
    discover which operations a given plugin implements. Only ``execute`` and
    the universal ``get_info`` read are mandated by this interface; each plugin
    declares its own write actions with the substrate's ``@plugin_action``
    decorator and sets ``supported_actions = collect_plugin_actions(cls)``.
    """
    
    def execute(
            self,
            action: str = "get_info",  # Operation: 'get_info' plus plugin-specific actions
            **kwargs
        ) -> Dict[str, Any]:  # JSON-serializable result (usually containing 'output_path')
        "Execute a media processing operation by dispatching on `action`."
    
    def get_info(
            self,
            file_path: Union[str, Path]  # Path to media file
        ) -> MediaMetadata:  # File metadata (duration, codec, streams)
        "Get metadata for a media file. Universal across media-processing plugins."

Media Storage (storage.ipynb)

Standardized SQLite storage for media analysis and processing results with content hashing

Import

from cjm_media_plugin_system.storage import (
    MediaAnalysisRow,
    MediaAnalysisStorage,
    MediaProcessingRow,
    MediaProcessingStorage
)

Classes

@dataclass
class MediaAnalysisRow:
    "A single row from the analysis_jobs table."
    
    file_path: str  # Path to the analyzed media file
    file_hash: str  # Hash of source file in "algo:hexdigest" format
    config_hash: str  # Hash of the analysis config used
    ranges: Optional[List[Dict[str, Any]]]  # Detected temporal segments
    metadata: Optional[Dict[str, Any]]  # Analysis metadata
    created_at: Optional[float]  # Unix timestamp
class MediaAnalysisStorage:
    def __init__(
        self,
        db_path: str  # Absolute path to the SQLite database file
    )
    "Standardized SQLite storage for media analysis results."
    
    def __init__(
            self,
            db_path: str  # Absolute path to the SQLite database file
        )
        "Initialize storage and create table if needed."
    
    def save(
            self,
            file_path: str,     # Path to the analyzed media file
            file_hash: str,     # Hash of source file in "algo:hexdigest" format
            config_hash: str,   # Hash of the analysis config
            ranges: Optional[List[Dict[str, Any]]] = None,  # Detected temporal segments
            metadata: Optional[Dict[str, Any]] = None        # Analysis metadata
        ) -> None
        "Save or replace an analysis result (upsert by file_path + config_hash)."
    
    def save_with_logging(
            self,
            *,
            file_path: str,     # Path to the analyzed media file
            file_hash: str,     # Hash of source file in "algo:hexdigest" format
            config_hash: str,   # Hash of the analysis config
            ranges: Optional[List[Dict[str, Any]]] = None,  # Detected temporal segments
            metadata: Optional[Dict[str, Any]] = None,       # Analysis metadata
            logger: Optional[logging.Logger] = None           # Optional logger for success/failure messages
        ) -> bool:  # True if saved; False if the save failed (error logged, not raised)
        "Save a result, logging success/failure. Failures are logged and swallowed (returns False).

Centralizes the try/save/log/except block media-analysis plugins reimplement.
Returns True on success so callers can gate post-save side effects on the result."
    
    def get_cached(
            self,
            file_path: str,   # Path to the media file
            file_hash: str,   # Content hash of the file (cache miss if the file changed)
            config_hash: str  # Config hash to match
        ) -> Optional[MediaAnalysisRow]:  # Cached row or None
        "Retrieve a content-correct cached analysis result.

Matches on file_path + file_hash + config_hash, so a changed file (new
file_hash) misses the cache even though a stale row may still exist at the
same (file_path, config_hash)  the next save() replaces it."
    
    def list_jobs(
            self,
            limit: int = 100  # Maximum number of rows to return
        ) -> List[MediaAnalysisRow]:  # List of analysis rows
        "List analysis jobs ordered by creation time (newest first)."
    
    def verify_file(
            self,
            file_path: str,   # Path to the media file
            config_hash: str  # Config hash to look up
        ) -> Optional[bool]:  # True if file matches, False if changed, None if not found
        "Verify the source media file still matches the hash stored for (file_path, config_hash)."
@dataclass
class MediaProcessingRow:
    "A single row from the processing_jobs table."
    
    job_id: str  # Unique job identifier
    action: str  # Operation performed: 'convert', 'extract_segment', etc.
    input_path: str  # Path to the source media file
    input_hash: str  # Hash of source file in "algo:hexdigest" format
    config_hash: str  # Hash of the action parameters / effective config used
    output_path: str  # Path to the produced output file
    output_hash: str  # Hash of output file in "algo:hexdigest" format
    parameters: Optional[Dict[str, Any]]  # Action-specific parameters
    metadata: Optional[Dict[str, Any]]  # Processing metadata
    created_at: Optional[float]  # Unix timestamp
class MediaProcessingStorage:
    def __init__(
        self,
        db_path: str  # Absolute path to the SQLite database file
    )
    "Standardized SQLite storage for media processing results."
    
    def __init__(
            self,
            db_path: str  # Absolute path to the SQLite database file
        )
        "Initialize storage, create table, run migrations, and build indexes."
    
    def save(
            self,
            job_id: str,        # Unique job identifier
            action: str,        # Operation performed: 'convert', 'extract_segment', etc.
            input_path: str,    # Path to the source media file
            input_hash: str,    # Hash of source file in "algo:hexdigest" format
            config_hash: str,   # Hash of the action parameters / effective config
            output_path: str,   # Path to the produced output file
            output_hash: str,   # Hash of output file in "algo:hexdigest" format
            parameters: Optional[Dict[str, Any]] = None,  # Action-specific parameters
            metadata: Optional[Dict[str, Any]] = None       # Processing metadata
        ) -> None
        "Save or replace a processing result (upsert by action + input_path + config_hash)."
    
    def save_with_logging(
            self,
            *,
            job_id: str,        # Unique job identifier
            action: str,        # Operation performed
            input_path: str,    # Path to the source media file
            input_hash: str,    # Hash of source file in "algo:hexdigest" format
            config_hash: str,   # Hash of the action parameters / effective config
            output_path: str,   # Path to the produced output file
            output_hash: str,   # Hash of output file in "algo:hexdigest" format
            parameters: Optional[Dict[str, Any]] = None,  # Action-specific parameters
            metadata: Optional[Dict[str, Any]] = None,      # Processing metadata
            logger: Optional[logging.Logger] = None          # Optional logger for success/failure messages
        ) -> bool:  # True if saved; False if the save failed (error logged, not raised)
        "Save a result, logging success/failure. Failures are logged and swallowed (returns False).

Centralizes the try/save/log/except block media-processing plugins reimplement.
Returns True on success so callers can gate post-save side effects on the result."
    
    def get_cached(
            self,
            action: str,       # Operation performed
            input_path: str,   # Path to the source media file
            input_hash: str,   # Content hash of the input (cache miss if the file changed)
            config_hash: str   # Hash of the action parameters / effective config
        ) -> Optional[MediaProcessingRow]:  # Cached row or None
        "Retrieve a content-correct cached processing result.

Matches on action + input_path + input_hash + config_hash. A changed input
file (new input_hash) misses even if a stale row exists at the same
(action, input_path, config_hash)  the next save() replaces it."
    
    def get_by_job_id(
            self,
            job_id: str  # Job identifier to look up
        ) -> Optional[MediaProcessingRow]:  # Row or None if not found
        "Retrieve a processing result by job ID."
    
    def list_jobs(
            self,
            limit: int = 100  # Maximum number of rows to return
        ) -> List[MediaProcessingRow]:  # List of processing rows
        "List processing jobs ordered by creation time (newest first)."
    
    def verify_input(
            self,
            job_id: str  # Job identifier to verify
        ) -> Optional[bool]:  # True if input matches, False if changed, None if not found
        "Verify the source media file still matches its stored hash."
    
    def verify_output(
            self,
            job_id: str  # Job identifier to verify
        ) -> Optional[bool]:  # True if output matches, False if changed, None if not found
        "Verify the output media file still matches its stored hash."

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cjm_media_plugin_system-0.0.13.tar.gz (17.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

cjm_media_plugin_system-0.0.13-py3-none-any.whl (18.1 kB view details)

Uploaded Python 3

File details

Details for the file cjm_media_plugin_system-0.0.13.tar.gz.

File metadata

  • Download URL: cjm_media_plugin_system-0.0.13.tar.gz
  • Upload date:
  • Size: 17.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for cjm_media_plugin_system-0.0.13.tar.gz
Algorithm Hash digest
SHA256 30d861e15723efee0dc4655344e6b6de844b794c045753aaea5d73c920f352d7
MD5 c355930b4bc6e6aeca29a8adf68a220e
BLAKE2b-256 46e17172611b3f52636de31b9bbab9d40b34192a5a24e89a81cd60ea3ec8e87b

See more details on using hashes here.

File details

Details for the file cjm_media_plugin_system-0.0.13-py3-none-any.whl.

File metadata

File hashes

Hashes for cjm_media_plugin_system-0.0.13-py3-none-any.whl
Algorithm Hash digest
SHA256 39054867e2abe600a90dfab4ba16978fae8a15719fc41355b6b0d1f6e33531cb
MD5 f93986dd2f2445323ad8fe73b8215c87
BLAKE2b-256 18d72e5fdfca96528269a0b01bc67fa99f076cc7dd4e02e0aacd1609bf6114e5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page