Skip to main content

Defines the standardized interface and data structures for Context Graph plugins, enabling the semantic linking, decomposition, and enrichment of multi-modal content.

Project description

cjm-graph-plugin-system

Install

pip install cjm_graph_plugin_system

Project Structure

nbs/
├── core.ipynb             # DTOs for Context Graph operations with FileBackedDTO support for zero-copy transfer
└── plugin_interface.ipynb # Domain-specific plugin interface for Context Graphs

Total: 2 notebooks

Module Dependencies

graph LR
    core[core<br/>Core Data Structures]
    plugin_interface[plugin_interface<br/>Graph Plugin Interface]

    plugin_interface --> core

1 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Core Data Structures (core.ipynb)

DTOs for Context Graph operations with FileBackedDTO support for zero-copy transfer

Import

from cjm_graph_plugin_system.core import (
    SourceRef,
    GraphNode,
    GraphEdge,
    GraphContext,
    GraphQuery
)

Classes

@dataclass
class SourceRef:
    "A pointer to external data in another plugin's domain."
    
    plugin_name: str  # e.g., "cjm-transcription-plugin-voxtral-hf"
    table_name: str  # e.g., "transcriptions"
    row_id: str  # e.g., "b0ceddd3-..." (typically a job_id)
    segment_slice: Optional[str]  # Optional slice: "char:0-500" or "timestamp:00:10-00:20"
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
        "Convert to dictionary."
@dataclass
class GraphNode:
    "Represents an entity in the Context Graph."
    
    id: str  # UUID
    label: str  # e.g., "Person", "Concept", "Correction"
    properties: Dict[str, Any] = field(...)  # Arbitrary metadata
    sources: List[SourceRef] = field(...)  # Links to external plugins
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
            """Convert to dictionary with nested sources."""
            return {
                "id": self.id,
        "Convert to dictionary with nested sources."
@dataclass
class GraphEdge:
    "Represents a relationship between two nodes."
    
    id: str  # UUID
    source_id: str  # Origin node UUID
    target_id: str  # Destination node UUID
    relation_type: str  # e.g., "MENTIONS", "CORRECTS", "AUTHORED_BY"
    properties: Dict[str, Any] = field(...)  # Arbitrary metadata
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
        "Convert to dictionary."
@dataclass
class GraphContext:
    "Container for graph query results (a subgraph)."
    
    nodes: List[GraphNode]  # Nodes in the subgraph
    edges: List[GraphEdge]  # Edges in the subgraph
    metadata: Dict[str, Any] = field(...)  # Query metadata, stats, etc.
    
    def to_temp_file(self) -> str:  # Absolute path to temporary JSON file
            """Save graph data to a temp file for zero-copy transfer."""
            tmp = tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode='w')
            
            data = {
                "nodes": [n.to_dict() for n in self.nodes],
        "Save graph data to a temp file for zero-copy transfer."
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
            """Convert to dictionary."""
            return {
                "nodes": [n.to_dict() for n in self.nodes],
        "Convert to dictionary."
    
    def from_file(
            cls,
            filepath: str  # Path to JSON file
        ) -> "GraphContext":  # Reconstructed GraphContext
        "Load graph context from a JSON file."
    
    def from_dict(
            cls,
            data: Dict[str, Any]  # Dictionary with nodes, edges, metadata
        ) -> "GraphContext":  # Reconstructed GraphContext
        "Load graph context from a dictionary."
@dataclass
class GraphQuery:
    "A standardized query object for graph operations."
    
    query: str  # Raw query string (SQL, Cypher, etc.)
    parameters: Dict[str, Any] = field(...)  # Query parameters
    limit: int = 100  # Max results to return
    depth: int = 1  # Traversal depth for neighborhood queries
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
        "Convert to dictionary."

Graph Plugin Interface (plugin_interface.ipynb)

Domain-specific plugin interface for Context Graphs

Import

from cjm_graph_plugin_system.plugin_interface import (
    GraphPlugin
)

Classes

class GraphPlugin(PluginInterface):
    "Abstract base class for all Context Graph plugins."
    
    def execute(
            self,
            action: str = "get_schema",  # Action to perform (see docstring for available actions)
            **kwargs
        ) -> Dict[str, Any]:  # JSON-serializable result
        "Execute a graph operation. This is the main entry point for RemotePluginProxy.

Dispatches to the appropriate method based on `action` parameter.
All return values are JSON-serializable dictionaries for HTTP transport."
    
    def add_nodes(
            self,
            nodes: List[GraphNode]  # Nodes to create
        ) -> List[str]:  # Created node IDs
        "Bulk create nodes."
    
    def add_edges(
            self,
            edges: List[GraphEdge]  # Edges to create
        ) -> List[str]:  # Created edge IDs
        "Bulk create edges."
    
    def get_node(
            self,
            node_id: str  # UUID of node to retrieve
        ) -> Optional[GraphNode]:  # Node or None if not found
        "Get a single node by ID."
    
    def get_edge(
            self,
            edge_id: str  # UUID of edge to retrieve
        ) -> Optional[GraphEdge]:  # Edge or None if not found
        "Get a single edge by ID."
    
    def get_context(
            self,
            node_id: str,  # Starting node UUID
            depth: int = 1,  # Traversal depth (1 = immediate neighbors)
            filter_labels: Optional[List[str]] = None  # Only include nodes with these labels
        ) -> GraphContext:  # Subgraph containing node and its neighborhood
        "Get the neighborhood of a specific node."
    
    def find_nodes_by_source(
            self,
            source_ref: SourceRef  # External resource reference
        ) -> List[GraphNode]:  # Nodes attached to this source
        "Find all nodes linked to a specific external resource."
    
    def find_nodes_by_label(
            self,
            label: str,  # Node label to search for
            limit: int = 100  # Max results
        ) -> List[GraphNode]:  # Matching nodes
        "Find nodes by label."
    
    def update_node(
            self,
            node_id: str,  # UUID of node to update
            properties: Dict[str, Any]  # Properties to merge/update
        ) -> bool:  # True if successful
        "Partial update of node properties."
    
    def update_edge(
            self,
            edge_id: str,  # UUID of edge to update
            properties: Dict[str, Any]  # Properties to merge/update
        ) -> bool:  # True if successful
        "Partial update of edge properties."
    
    def delete_nodes(
            self,
            node_ids: List[str],  # UUIDs of nodes to delete
            cascade: bool = True  # Also delete connected edges
        ) -> int:  # Number of nodes deleted
        "Delete nodes (and optionally connected edges)."
    
    def delete_edges(
            self,
            edge_ids: List[str]  # UUIDs of edges to delete
        ) -> int:  # Number of edges deleted
        "Delete edges."
    
    def get_schema(self) -> Dict[str, Any]:  # Graph schema/ontology
            """Return the current ontology/schema of the graph."""
            ...
    
        @abstractmethod
        def import_graph(
            self,
            graph_data: GraphContext,  # Data to import
            merge_strategy: str = "overwrite"  # "overwrite", "skip", or "merge"
        ) -> Dict[str, int]:  # Import statistics {nodes_created, edges_created, ...}
        "Return the current ontology/schema of the graph."
    
    def import_graph(
            self,
            graph_data: GraphContext,  # Data to import
            merge_strategy: str = "overwrite"  # "overwrite", "skip", or "merge"
        ) -> Dict[str, int]:  # Import statistics {nodes_created, edges_created, ...}
        "Bulk import a GraphContext (e.g., from backup or another plugin)."
    
    def export_graph(
            self,
            filter_query: Optional[GraphQuery] = None  # Optional filter
        ) -> GraphContext:  # Exported subgraph or full graph
        "Export the entire graph or a filtered subset."

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cjm_graph_plugin_system-0.0.2.tar.gz (13.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

cjm_graph_plugin_system-0.0.2-py3-none-any.whl (13.2 kB view details)

Uploaded Python 3

File details

Details for the file cjm_graph_plugin_system-0.0.2.tar.gz.

File metadata

  • Download URL: cjm_graph_plugin_system-0.0.2.tar.gz
  • Upload date:
  • Size: 13.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for cjm_graph_plugin_system-0.0.2.tar.gz
Algorithm Hash digest
SHA256 6f25a0b94681fc7e2281ebfb0b1ca3de9f84880060c976746ab2478eedc8000d
MD5 550a9b2351b6e6b927fb9286584a4128
BLAKE2b-256 c636a6542d004722487cdf1aeaa0dfc09067dd3ba5cd049f09ccb75e819ee088

See more details on using hashes here.

File details

Details for the file cjm_graph_plugin_system-0.0.2-py3-none-any.whl.

File metadata

File hashes

Hashes for cjm_graph_plugin_system-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 791134791808144400064c1267608b8aeae6efdddea90427e36678a8030b2df8
MD5 e48cd60a9b53b1a4163761a372d698ee
BLAKE2b-256 a19229b1d1b530844aa257ebf9f4b999994f789d48ce34c19b7860eaa13b3eec

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page