Skip to main content

Generic plugin system with discovery, configuration validation, and runtime management for extensible Python applications.

Project description

cjm-plugin-system

Install

pip install cjm_plugin_system

Project Structure

nbs/
├── core/ (5)
│   ├── interface.ipynb  # Abstract base class defining the generic plugin interface
│   ├── manager.ipynb    # Plugin discovery, loading, and lifecycle management system
│   ├── metadata.ipynb   # Data structures for plugin metadata
│   ├── proxy.ipynb      # Bridge between Host application and isolated Worker processes
│   └── worker.ipynb     # FastAPI server that runs inside isolated plugin environments
└── utils/ (1)
    └── validation.ipynb  # Validation helpers for plugin configuration dataclasses

Total: 6 notebooks across 2 directories

Module Dependencies

graph LR
    core_interface[core.interface<br/>Plugin Interface]
    core_manager[core.manager<br/>Plugin Manager]
    core_metadata[core.metadata<br/>Plugin Metadata]
    core_proxy[core.proxy<br/>Remote Plugin Proxy]
    core_worker[core.worker<br/>Universal Worker]
    utils_validation[utils.validation<br/>Configuration Validation]

    core_manager --> core_metadata
    core_manager --> core_proxy
    core_manager --> core_interface
    core_proxy --> core_interface

4 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Plugin Interface (interface.ipynb)

Abstract base class defining the generic plugin interface

Import

from cjm_plugin_system.core.interface import (
    FileBackedDTO,
    PluginInterface
)

Classes

@runtime_checkable
class FileBackedDTO(Protocol):
    "Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer."
    
    def to_temp_file(self) -> str: # Absolute path to the temporary file
        "Save the data to a temporary file and return the absolute path."
class PluginInterface(ABC):
    "Abstract base class for all plugins (both local workers and remote proxies)."
    
    def name(self) -> str: # Unique identifier for this plugin
            """Unique plugin identifier."""
            ...
    
        @property
        @abstractmethod
        def version(self) -> str: # Semantic version string (e.g., "1.0.0")
        "Unique plugin identifier."
    
    def version(self) -> str: # Semantic version string (e.g., "1.0.0")
            """Plugin version."""
            ...
    
        @abstractmethod
        def initialize(
            self,
            config: Optional[Dict[str, Any]] = None # Configuration dictionary
        ) -> None
        "Plugin version."
    
    def initialize(
            self,
            config: Optional[Dict[str, Any]] = None # Configuration dictionary
        ) -> None
        "Initialize or re-configure the plugin."
    
    def execute(
            self,
            *args,
            **kwargs
        ) -> Any: # Plugin-specific output
        "Execute the plugin's main functionality."
    
    def execute_stream(
            self,
            *args,
            **kwargs
        ) -> Generator[Any, None, None]: # Yields partial results
        "Stream execution results chunk by chunk."
    
    def get_config_schema(self) -> Dict[str, Any]: # JSON Schema for configuration
            """Return JSON Schema describing the plugin's configuration options."""
            ...
    
        @abstractmethod
        def get_current_config(self) -> Dict[str, Any]: # Current configuration values
        "Return JSON Schema describing the plugin's configuration options."
    
    def get_current_config(self) -> Dict[str, Any]: # Current configuration values
            """Return the current configuration state as a dictionary."""
            ...
    
        @abstractmethod
        def cleanup(self) -> None
        "Return the current configuration state as a dictionary."
    
    def cleanup(self) -> None
        "Clean up resources when plugin is unloaded."

Plugin Manager (manager.ipynb)

Plugin discovery, loading, and lifecycle management system

Import

from cjm_plugin_system.core.manager import (
    PluginManager,
    get_plugin_config,
    get_plugin_config_schema,
    get_all_plugin_configs,
    update_plugin_config,
    reload_plugin,
    get_plugin_stats,
    execute_plugin_stream
)

Functions

def get_plugin_config(
    self,
    plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # Current configuration or None
    "Get the current configuration of a plugin."
def get_plugin_config_schema(
    self,
    plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # JSON Schema or None
    "Get the configuration JSON Schema for a plugin."
def get_all_plugin_configs(self) -> Dict[str, Dict[str, Any]]: # Plugin name -> config mapping
    """Get current configuration for all loaded plugins."""
    return {
        name: plugin.get_current_config()
    "Get current configuration for all loaded plugins."
def update_plugin_config(
    self,
    plugin_name: str, # Name of the plugin
    config: Dict[str, Any] # New configuration values
) -> bool: # True if successful
    "Update a plugin's configuration (hot-reload without restart)."
def reload_plugin(
    self,
    plugin_name: str, # Name of the plugin
    config: Optional[Dict[str, Any]] = None # Optional new configuration
) -> bool: # True if successful
    "Reload a plugin by terminating and restarting its Worker."
def get_plugin_stats(
    self,
    plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # Resource telemetry or None
    "Get resource usage stats for a plugin's Worker process."
async def execute_plugin_stream(
    self,
    plugin_name: str, # Name of the plugin
    *args,
    **kwargs
) -> AsyncGenerator[Any, None]: # Async generator yielding results
    "Execute a plugin with streaming response."

Classes

class PluginManager:
    def __init__(
        self,
        plugin_interface: Type[PluginInterface] = PluginInterface, # Base interface for type checking
        search_paths: Optional[List[Path]] = None # Custom manifest search paths
    )
    "Manages plugin discovery, loading, and lifecycle via process isolation."
    
    def __init__(
            self,
            plugin_interface: Type[PluginInterface] = PluginInterface, # Base interface for type checking
            search_paths: Optional[List[Path]] = None # Custom manifest search paths
        )
        "Initialize the plugin manager."
    
    def discover_manifests(self) -> List[PluginMeta]: # List of discovered plugin metadata
            """Discover plugins via JSON manifests in search paths."""
            self.discovered = []
            seen_plugins = set()
    
            for base_path in self.search_paths
        "Discover plugins via JSON manifests in search paths."
    
    def load_plugin(
            self,
            plugin_meta: PluginMeta, # Plugin metadata (with manifest attached)
            config: Optional[Dict[str, Any]] = None # Initial configuration
        ) -> bool: # True if successfully loaded
        "Load a plugin by spawning a Worker subprocess."
    
    def load_all(
            self,
            configs: Optional[Dict[str, Dict[str, Any]]] = None # Plugin name -> config mapping
        ) -> Dict[str, bool]: # Plugin name -> success mapping
        "Discover and load all available plugins."
    
    def unload_plugin(
            self,
            plugin_name: str # Name of the plugin to unload
        ) -> bool: # True if successfully unloaded
        "Unload a plugin and terminate its Worker process."
    
    def unload_all(self) -> None:
            """Unload all plugins and terminate all Worker processes."""
            for name in list(self.plugins.keys())
        "Unload all plugins and terminate all Worker processes."
    
    def get_plugin(
            self,
            plugin_name: str # Name of the plugin
        ) -> Optional[PluginInterface]: # Plugin proxy instance or None
        "Get a loaded plugin instance by name."
    
    def list_plugins(self) -> List[PluginMeta]: # List of loaded plugin metadata
            """List all loaded plugins."""
            return list(self.plugins.values())
    
        def execute_plugin(
            self,
            plugin_name: str, # Name of the plugin
            *args,
            **kwargs
        ) -> Any: # Plugin result
        "List all loaded plugins."
    
    def execute_plugin(
            self,
            plugin_name: str, # Name of the plugin
            *args,
            **kwargs
        ) -> Any: # Plugin result
        "Execute a plugin's main functionality (sync)."
    
    async def execute_plugin_async(
            self,
            plugin_name: str, # Name of the plugin
            *args,
            **kwargs
        ) -> Any: # Plugin result
        "Execute a plugin's main functionality (async)."
    
    def enable_plugin(
            self,
            plugin_name: str # Name of the plugin
        ) -> bool: # True if plugin was enabled
        "Enable a plugin."
    
    def disable_plugin(
            self,
            plugin_name: str # Name of the plugin
        ) -> bool: # True if plugin was disabled
        "Disable a plugin without unloading it."

Plugin Metadata (metadata.ipynb)

Data structures for plugin metadata

Import

from cjm_plugin_system.core.metadata import (
    PluginMeta
)

Classes

@dataclass
class PluginMeta:
    "Metadata about a plugin."
    
    name: str  # Plugin's unique identifier
    version: str  # Plugin's version string
    description: str = ''  # Brief description of the plugin's functionality
    author: str = ''  # Plugin author's name or organization
    package_name: str = ''  # Python package name containing the plugin
    instance: Optional[Any]  # Plugin instance (PluginInterface subclass)
    enabled: bool = True  # Whether the plugin is enabled

Remote Plugin Proxy (proxy.ipynb)

Bridge between Host application and isolated Worker processes

Import

from cjm_plugin_system.core.proxy import (
    RemotePluginProxy,
    execute_async,
    execute_stream_sync,
    execute_stream,
    get_stats,
    is_alive
)

Functions

def _maybe_serialize_input(
    self,
    obj: Any # Object to potentially serialize
) -> Any: # Serialized form (path string or original object)
    "Convert FileBackedDTO objects to file paths for zero-copy transfer."
def _prepare_payload(
    self,
    args: tuple, # Positional arguments
    kwargs: dict # Keyword arguments
) -> Dict[str, Any]: # JSON-serializable payload
    "Prepare arguments for HTTP transmission."
async def execute_async(
    self,
    *args,
    **kwargs
) -> Any: # Plugin result
    "Execute the plugin asynchronously."
def execute_stream_sync(self, *args, **kwargs) -> Generator[Any, None, None]
    "Synchronous wrapper for streaming (blocking)."
async def execute_stream(
    self,
    *args,
    **kwargs
) -> AsyncGenerator[Any, None]: # Yields parsed JSON chunks
    "Execute with streaming response (async generator)."
def get_stats(self) -> Dict[str, Any]: # Process telemetry
    """Get worker process resource usage."""
    with httpx.Client() as client
    "Get worker process resource usage."
def is_alive(self) -> bool: # True if worker is responsive
    """Check if the worker process is still running and responsive."""
    if not self.process or self.process.poll() is not None
    "Check if the worker process is still running and responsive."
def __enter__(self):
    """Enter context manager."""
    return self

def __exit__(self, exc_type, exc_val, exc_tb)
    "Enter context manager."
def __exit__(self, exc_type, exc_val, exc_tb):
    """Exit context manager and cleanup."""
    self.cleanup()
    return False

async def __aenter__(self)
    "Exit context manager and cleanup."
async def __aenter__(self):
    """Enter async context manager."""
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb)
    "Enter async context manager."
async def __aexit__(self, exc_type, exc_val, exc_tb)
    "Exit async context manager and cleanup."

Classes

class RemotePluginProxy:
    def __init__(
        self,
        manifest: Dict[str, Any] # Plugin manifest with python_path, module, class, etc.
    )
    "Proxy that forwards plugin calls to an isolated Worker subprocess."
    
    def __init__(
            self,
            manifest: Dict[str, Any] # Plugin manifest with python_path, module, class, etc.
        )
        "Initialize proxy and start the worker process."
    
    def name(self) -> str: # Plugin name from manifest
            """Plugin name."""
            return self.manifest.get('name', 'unknown')
        
        @property
        def version(self) -> str: # Plugin version from manifest
        "Plugin name."
    
    def version(self) -> str: # Plugin version from manifest
            """Plugin version."""
            return self.manifest.get('version', '0.0.0')
    
        def _get_free_port(self) -> int
        "Plugin version."
    
    def initialize(
            self,
            config: Optional[Dict[str, Any]] = None # Configuration dictionary
        ) -> None
        "Initialize or reconfigure the plugin."
    
    def execute(
            self,
            *args,
            **kwargs
        ) -> Any: # Plugin result
        "Execute the plugin synchronously."
    
    def get_config_schema(self) -> Dict[str, Any]: # JSON Schema
            """Get the plugin's configuration schema."""
            with httpx.Client() as client
        "Get the plugin's configuration schema."
    
    def get_current_config(self) -> Dict[str, Any]: # Current config values
            """Get the plugin's current configuration."""
            with httpx.Client() as client
        "Get the plugin's current configuration."
    
    def cleanup(self) -> None:
            """Clean up plugin resources and terminate worker process."""
            # Send cleanup request to worker
            try
        "Clean up plugin resources and terminate worker process."

Configuration Validation (validation.ipynb)

Validation helpers for plugin configuration dataclasses

Import

from cjm_plugin_system.utils.validation import (
    T,
    SCHEMA_TITLE,
    SCHEMA_DESC,
    SCHEMA_MIN,
    SCHEMA_MAX,
    SCHEMA_ENUM,
    SCHEMA_MIN_LEN,
    SCHEMA_MAX_LEN,
    SCHEMA_PATTERN,
    SCHEMA_FORMAT,
    validate_field_value,
    validate_config,
    config_to_dict,
    dict_to_config,
    extract_defaults,
    dataclass_to_jsonschema
)

Functions

def validate_field_value(
    value:Any, # Value to validate
    metadata:Dict[str, Any], # Field metadata containing constraints
    field_name:str="" # Field name for error messages
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
    "Validate a value against field metadata constraints."
def validate_config(
    config:Any # Configuration dataclass instance to validate
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
    "Validate all fields in a configuration dataclass against their metadata constraints."
def config_to_dict(
    config:Any # Configuration dataclass instance
) -> Dict[str, Any]: # Dictionary representation of the configuration
    "Convert a configuration dataclass instance to a dictionary."
def dict_to_config(
    config_class:Type[T], # Configuration dataclass type
    data:Optional[Dict[str, Any]]=None, # Dictionary with configuration values
    validate:bool=False # Whether to validate against metadata constraints
) -> T: # Instance of the configuration dataclass
    "Create a configuration dataclass instance from a dictionary."
def extract_defaults(
    config_class:Type # Configuration dataclass type
) -> Dict[str, Any]: # Default values from the dataclass
    "Extract default values from a configuration dataclass type."
def _python_type_to_json_type(
    python_type:type # Python type annotation to convert
) -> Dict[str, Any]: # JSON schema type definition
    "Convert Python type to JSON schema type."
def dataclass_to_jsonschema(
    cls:type # Dataclass with field metadata
) -> Dict[str, Any]: # JSON schema dictionary
    "Convert a dataclass to a JSON schema for form generation."

Variables

T
SCHEMA_TITLE = 'title'  # Display title for the field
SCHEMA_DESC = 'description'  # Help text description
SCHEMA_MIN = 'minimum'  # Minimum value for numbers
SCHEMA_MAX = 'maximum'  # Maximum value for numbers
SCHEMA_ENUM = 'enum'  # Allowed values for dropdowns
SCHEMA_MIN_LEN = 'minLength'  # Minimum string length
SCHEMA_MAX_LEN = 'maxLength'  # Maximum string length
SCHEMA_PATTERN = 'pattern'  # Regex pattern for strings
SCHEMA_FORMAT = 'format'  # String format (email, uri, date, etc.)

Universal Worker (worker.ipynb)

FastAPI server that runs inside isolated plugin environments

Import

from cjm_plugin_system.core.worker import (
    EnhancedJSONEncoder,
    parent_monitor,
    create_app,
    run_worker
)

Functions

def parent_monitor(
    ppid: int # Parent process ID to monitor
) -> None
    "Monitor parent process and terminate self if parent dies."
def create_app(
    module_name: str, # Python module path (e.g., "my_plugin.plugin")
    class_name: str   # Plugin class name (e.g., "WhisperPlugin")
) -> FastAPI: # Configured FastAPI application
    "Create FastAPI app that hosts the specified plugin."
def run_worker() -> None:
    """CLI entry point for running the worker."""
    parser = argparse.ArgumentParser(description="Universal Plugin Worker")
    parser.add_argument("--module", required=True, help="Plugin module path")
    parser.add_argument("--class", dest="class_name", required=True, help="Plugin class name")
    parser.add_argument("--port", type=int, required=True, help="Port to listen on")
    parser.add_argument("--ppid", type=int, required=False, help="Parent PID to monitor")
    args = parser.parse_args()

    # Start watchdog if parent PID provided
    if args.ppid
    "CLI entry point for running the worker."

Classes

class EnhancedJSONEncoder(JSONEncoder):
    "JSON encoder that handles dataclasses and other common types."
    
    def default(
            self,
            o: Any # Object to encode
        ) -> Any: # JSON-serializable representation
        "Convert non-serializable objects to serializable form."

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cjm_plugin_system-0.0.11.tar.gz (29.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

cjm_plugin_system-0.0.11-py3-none-any.whl (28.3 kB view details)

Uploaded Python 3

File details

Details for the file cjm_plugin_system-0.0.11.tar.gz.

File metadata

  • Download URL: cjm_plugin_system-0.0.11.tar.gz
  • Upload date:
  • Size: 29.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for cjm_plugin_system-0.0.11.tar.gz
Algorithm Hash digest
SHA256 ee949bc5f5db3386ebc9745cdf6e9b0628e34907b580a2346e4a86891c2589af
MD5 91deda603f1281ec186c46a84343b7b7
BLAKE2b-256 a7d464793016c797194204fab9fca7b5c21d88bcdd38a4d8e664461411967a26

See more details on using hashes here.

File details

Details for the file cjm_plugin_system-0.0.11-py3-none-any.whl.

File metadata

File hashes

Hashes for cjm_plugin_system-0.0.11-py3-none-any.whl
Algorithm Hash digest
SHA256 b0a2980cd0c86c5d48504039d93e5e4c5ea10dce649804576a5bf530fdf94f75
MD5 3da62a8fd95e2c2a12ea4dd2f169f1d2
BLAKE2b-256 594ea016cafcfd0e84f93486c95de10d7a8d105c6e2096983065bf31f5ea6249

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page