Defines standardized interfaces and data structures for media analysis (VAD, Scene Detection) and processing (FFmpeg, Conversion) plugins within the cjm-plugin-system ecosystem.
cjm-media-plugin-system
Install
pip install cjm_media_plugin_system
Project Structure
nbs/
├── analysis_interface.ipynb # Domain-specific plugin interface for media analysis (read-only / signal extraction)
├── core.ipynb # DTOs for media analysis and processing with FileBackedDTO support for zero-copy transfer
├── processing_interface.ipynb # Domain-specific plugin interface for media processing (write / file manipulation)
└── storage.ipynb # Standardized SQLite storage for media analysis and processing results with content hashing
Total: 4 notebooks
Module Dependencies
graph LR
analysis_interface["analysis_interface<br/>Media Analysis Plugin Interface"]
core["core<br/>Core Data Structures"]
processing_interface["processing_interface<br/>Media Processing Plugin Interface"]
storage["storage<br/>Media Storage"]
analysis_interface --> core
processing_interface --> core
2 cross-module dependencies detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
Media Analysis Plugin Interface (analysis_interface.ipynb)
Domain-specific plugin interface for media analysis (read-only / signal extraction)
Import
from cjm_media_plugin_system.analysis_interface import (
MediaAnalysisPlugin
)
Classes
class MediaAnalysisPlugin(PluginInterface):
"""
Abstract base class for plugins that analyze media files.
Analysis plugins perform read-only operations that extract temporal segments
from media files (VAD, scene detection, beat detection, etc.).
"""
def execute(
self,
media_path: Union[str, Path], # Path to media file to analyze
**kwargs
) -> MediaAnalysisResult: # Analysis result with detected TimeRanges
"Analyze the media file and return detected temporal segments."
Core Data Structures (core.ipynb)
DTOs for media analysis and processing with FileBackedDTO support for zero-copy transfer
Import
from cjm_media_plugin_system.core import (
TimeRange,
MediaMetadata,
MediaAnalysisResult
)
Classes
@dataclass
class TimeRange:
"Represents a temporal segment within a media file."
start: float # Start time in seconds
end: float # End time in seconds
label: str = 'segment' # Segment type (e.g., 'speech', 'silence', 'scene')
confidence: Optional[float] = None # Detection confidence (0.0 to 1.0)
payload: Dict[str, Any] = field(...) # Extra data (e.g., speaker embedding)
def to_dict(self) -> Dict[str, Any]: # Serialized representation
"Convert to dictionary for JSON serialization."
@dataclass
class MediaMetadata:
"Container for media file metadata."
path: str # File path
duration: float # Duration in seconds
format: str # Container format (e.g., 'mp4', 'mkv')
size_bytes: int # File size in bytes
video_streams: List[Dict[str, Any]] = field(...) # Video stream info
audio_streams: List[Dict[str, Any]] = field(...) # Audio stream info
def to_dict(self) -> Dict[str, Any]: # Serialized representation
"Convert to dictionary for JSON serialization."
@dataclass
class MediaAnalysisResult:
"Standard output for media analysis plugins."
ranges: List[TimeRange] # Detected temporal segments
metadata: Dict[str, Any] = field(...) # Global analysis stats
def to_temp_file(self) -> str: # Absolute path to temporary JSON file
"Save results to a temp JSON file for zero-copy transfer."
@classmethod
def from_file(
cls,
filepath: str # Path to JSON file
) -> "MediaAnalysisResult": # Loaded result instance
"Load results from a JSON file."
Media Processing Plugin Interface (processing_interface.ipynb)
Domain-specific plugin interface for media processing (write / file manipulation)
Import
from cjm_media_plugin_system.processing_interface import (
MediaProcessingPlugin
)
Classes
class MediaProcessingPlugin(PluginInterface):
"""
Abstract base class for plugins that modify, convert, or extract media.
Processing plugins perform write operations that produce new files
(format conversion, segment extraction, source separation, speech
enhancement, etc.). They are **dispatcher-style** (SG-44): callers invoke
``execute(action=..., **kwargs)`` and introspect ``supported_actions`` to
discover which operations a given plugin implements. Only ``execute`` and
the universal ``get_info`` read are mandated by this interface; each plugin
declares its own write actions with the substrate's ``@plugin_action``
decorator and sets ``supported_actions = collect_plugin_actions(cls)``.
"""
def execute(
self,
action: str = "get_info", # Operation: 'get_info' plus plugin-specific actions
**kwargs
) -> Dict[str, Any]: # JSON-serializable result (usually containing 'output_path')
"Execute a media processing operation by dispatching on `action`."
def get_info(
self,
file_path: Union[str, Path] # Path to media file
) -> MediaMetadata: # File metadata (duration, codec, streams)
"Get metadata for a media file. Universal across media-processing plugins."
Media Storage (storage.ipynb)
Standardized SQLite storage for media analysis and processing results with content hashing
Import
from cjm_media_plugin_system.storage import (
MediaAnalysisRow,
MediaAnalysisStorage,
MediaProcessingRow,
MediaProcessingStorage
)
Classes
@dataclass
class MediaAnalysisRow:
"A single row from the analysis_jobs table."
file_path: str # Path to the analyzed media file
file_hash: str # Hash of source file in "algo:hexdigest" format
config_hash: str # Hash of the analysis config used
ranges: Optional[List[Dict[str, Any]]] # Detected temporal segments
metadata: Optional[Dict[str, Any]] # Analysis metadata
created_at: Optional[float] # Unix timestamp
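The `file_hash` field uses an "algo:hexdigest" string. The package's own hashing routine is not shown in this document; the sketch below is one way to produce such a value with the standard library. The helper name `hash_file`, the SHA-256 default, and the chunk size are assumptions.

```python
import hashlib
from pathlib import Path
from typing import Union


def hash_file(path: Union[str, Path], algo: str = "sha256", chunk_size: int = 1 << 20) -> str:
    """Hypothetical helper: hash a file in chunks and return 'algo:hexdigest'."""
    digest = hashlib.new(algo)
    with open(path, "rb") as f:
        # Chunked reads keep memory use flat even for large media files.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return f"{algo}:{digest.hexdigest()}"
```

Keeping the algorithm name in the stored string means a later change of algorithm can be detected from the value itself rather than guessed from its length.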
class MediaAnalysisStorage:
"Standardized SQLite storage for media analysis results."
def __init__(
self,
db_path: str # Absolute path to the SQLite database file
)
"Initialize storage and create table if needed."
def save(
self,
file_path: str, # Path to the analyzed media file
file_hash: str, # Hash of source file in "algo:hexdigest" format
config_hash: str, # Hash of the analysis config
ranges: Optional[List[Dict[str, Any]]] = None, # Detected temporal segments
metadata: Optional[Dict[str, Any]] = None # Analysis metadata
) -> None
"Save or replace an analysis result (upsert by file_path + config_hash)."
def save_with_logging(
self,
*,
file_path: str, # Path to the analyzed media file
file_hash: str, # Hash of source file in "algo:hexdigest" format
config_hash: str, # Hash of the analysis config
ranges: Optional[List[Dict[str, Any]]] = None, # Detected temporal segments
metadata: Optional[Dict[str, Any]] = None, # Analysis metadata
logger: Optional[logging.Logger] = None # Optional logger for success/failure messages
) -> bool: # True if saved; False if the save failed (error logged, not raised)
"""
Save a result, logging success/failure. Failures are logged and swallowed (returns False).
Centralizes the try/save/log/except block that media-analysis plugins would otherwise reimplement.
Returns True on success so callers can gate post-save side effects on the result.
"""
def get_cached(
self,
file_path: str, # Path to the media file
file_hash: str, # Content hash of the file (cache miss if the file changed)
config_hash: str # Config hash to match
) -> Optional[MediaAnalysisRow]: # Cached row or None
"""
Retrieve a content-correct cached analysis result.
Matches on file_path + file_hash + config_hash, so a changed file (new
file_hash) misses the cache even though a stale row may still exist at the
same (file_path, config_hash); the next save() replaces it.
"""
def list_jobs(
self,
limit: int = 100 # Maximum number of rows to return
) -> List[MediaAnalysisRow]: # List of analysis rows
"List analysis jobs ordered by creation time (newest first)."
def verify_file(
self,
file_path: str, # Path to the media file
config_hash: str # Config hash to look up
) -> Optional[bool]: # True if file matches, False if changed, None if not found
"Verify the source media file still matches the hash stored for (file_path, config_hash)."
@dataclass
class MediaProcessingRow:
"A single row from the processing_jobs table."
job_id: str # Unique job identifier
action: str # Operation performed: 'convert', 'extract_segment', etc.
input_path: str # Path to the source media file
input_hash: str # Hash of source file in "algo:hexdigest" format
config_hash: str # Hash of the action parameters / effective config used
output_path: str # Path to the produced output file
output_hash: str # Hash of output file in "algo:hexdigest" format
parameters: Optional[Dict[str, Any]] # Action-specific parameters
metadata: Optional[Dict[str, Any]] # Processing metadata
created_at: Optional[float] # Unix timestamp
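`config_hash` is described as a hash of the action parameters or effective config. How the package computes it is not shown in this document; a common approach, sketched here as an assumption, is to hash a canonical JSON encoding so that key order does not affect the result. The helper name `hash_config` and the "algo:hexdigest" prefix on the config hash are hypothetical.

```python
import hashlib
import json
from typing import Any, Dict


def hash_config(params: Dict[str, Any], algo: str = "sha256") -> str:
    """Hypothetical helper: stable 'algo:hexdigest' hash of a parameter dict."""
    # sort_keys and fixed separators make the JSON text independent of
    # dict insertion order and whitespace, so equal configs hash equally.
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    return f"{algo}:{hashlib.new(algo, canonical.encode('utf-8')).hexdigest()}"


a = hash_config({"format": "wav", "sample_rate": 16000})
b = hash_config({"sample_rate": 16000, "format": "wav"})  # same values, different order
c = hash_config({"format": "wav", "sample_rate": 44100})  # different value
```

Here `a` and `b` are identical while `c` differs, so a lookup keyed on the hash treats reordered parameters as the same job and any changed value as a new one.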
class MediaProcessingStorage:
"Standardized SQLite storage for media processing results."
def __init__(
self,
db_path: str # Absolute path to the SQLite database file
)
"Initialize storage, create table, run migrations, and build indexes."
def save(
self,
job_id: str, # Unique job identifier
action: str, # Operation performed: 'convert', 'extract_segment', etc.
input_path: str, # Path to the source media file
input_hash: str, # Hash of source file in "algo:hexdigest" format
config_hash: str, # Hash of the action parameters / effective config
output_path: str, # Path to the produced output file
output_hash: str, # Hash of output file in "algo:hexdigest" format
parameters: Optional[Dict[str, Any]] = None, # Action-specific parameters
metadata: Optional[Dict[str, Any]] = None # Processing metadata
) -> None
"Save or replace a processing result (upsert by action + input_path + config_hash)."
def save_with_logging(
self,
*,
job_id: str, # Unique job identifier
action: str, # Operation performed
input_path: str, # Path to the source media file
input_hash: str, # Hash of source file in "algo:hexdigest" format
config_hash: str, # Hash of the action parameters / effective config
output_path: str, # Path to the produced output file
output_hash: str, # Hash of output file in "algo:hexdigest" format
parameters: Optional[Dict[str, Any]] = None, # Action-specific parameters
metadata: Optional[Dict[str, Any]] = None, # Processing metadata
logger: Optional[logging.Logger] = None # Optional logger for success/failure messages
) -> bool: # True if saved; False if the save failed (error logged, not raised)
"""
Save a result, logging success/failure. Failures are logged and swallowed (returns False).
Centralizes the try/save/log/except block that media-processing plugins would otherwise reimplement.
Returns True on success so callers can gate post-save side effects on the result.
"""
def get_cached(
self,
action: str, # Operation performed
input_path: str, # Path to the source media file
input_hash: str, # Content hash of the input (cache miss if the file changed)
config_hash: str # Hash of the action parameters / effective config
) -> Optional[MediaProcessingRow]: # Cached row or None
"""
Retrieve a content-correct cached processing result.
Matches on action + input_path + input_hash + config_hash. A changed input
file (new input_hash) misses even if a stale row exists at the same
(action, input_path, config_hash); the next save() replaces it.
"""
def get_by_job_id(
self,
job_id: str # Job identifier to look up
) -> Optional[MediaProcessingRow]: # Row or None if not found
"Retrieve a processing result by job ID."
def list_jobs(
self,
limit: int = 100 # Maximum number of rows to return
) -> List[MediaProcessingRow]: # List of processing rows
"List processing jobs ordered by creation time (newest first)."
def verify_input(
self,
job_id: str # Job identifier to verify
) -> Optional[bool]: # True if input matches, False if changed, None if not found
"Verify the source media file still matches its stored hash."
def verify_output(
self,
job_id: str # Job identifier to verify
) -> Optional[bool]: # True if output matches, False if changed, None if not found
"Verify the output media file still matches its stored hash."
File details
Details for the file cjm_media_plugin_system-0.0.14.tar.gz.
File metadata
- Download URL: cjm_media_plugin_system-0.0.14.tar.gz
- Upload date:
- Size: 17.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | afccce1fb6b8cece43cbf4180c8ad3b9bb427e6a55b5dc60939fdc105ac4e72d |
| MD5 | b9a9044f71c013c09985c26c4900c884 |
| BLAKE2b-256 | e3d96eaa75efb7d76910c5dbfdd0333bfe51ef6fd166360db3c4bce8e2e8d139 |
File details
Details for the file cjm_media_plugin_system-0.0.14-py3-none-any.whl.
File metadata
- Download URL: cjm_media_plugin_system-0.0.14-py3-none-any.whl
- Upload date:
- Size: 18.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a6f2f3d682647a53c473ede8e5bb83e0f4cee43a2469f36fedbcf22044bb1713 |
| MD5 | 7862e20d82eef42e437f891e717fc82e |
| BLAKE2b-256 | 7fae8630aff82ec4f484aebfdcd524dc890b8633790b7d5947111bd3e9b70bc0 |