Skip to main content

Defines the standardized interface and data structures for Context Graph plugins, enabling the semantic linking, decomposition, and enrichment of multi-modal content.

Project description

cjm-graph-plugin-system

Install

pip install cjm_graph_plugin_system

Project Structure

nbs/
├── utils/ (2)
│   ├── mermaid.ipynb  # Convert GraphContext objects to Mermaid.js diagram strings for visualization
│   └── slices.ipynb   # Typed slice dataclasses for specifying referenced content regions in SourceRef
├── core.ipynb              # DTOs for Context Graph operations with FileBackedDTO support for zero-copy transfer
└── plugin_interface.ipynb  # Domain-specific plugin interface for Context Graphs

Total: 4 notebooks across 1 directory

Module Dependencies

graph LR
    core[core<br/>Core Data Structures]
    plugin_interface[plugin_interface<br/>Graph Plugin Interface]
    utils_mermaid[utils.mermaid<br/>Mermaid Diagram Generation]
    utils_slices[utils.slices<br/>Segment Slice Specifications]

    plugin_interface --> core
    utils_mermaid --> core
    utils_slices --> core

3 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Core Data Structures (core.ipynb)

DTOs for Context Graph operations with FileBackedDTO support for zero-copy transfer

Import

from cjm_graph_plugin_system.core import (
    SourceRef,
    GraphNode,
    GraphEdge,
    GraphContext,
    GraphQuery
)

Classes

@dataclass
class SourceRef:
    "A pointer to external data in another plugin's domain."
    
    plugin_name: str  # e.g., "cjm-transcription-plugin-voxtral-hf"
    table_name: str  # e.g., "transcriptions"
    row_id: str  # e.g., "b0ceddd3-..." (typically a job_id)
    content_hash: str  # Hash of consumed content in "algo:hexdigest" format
    segment_slice: Optional[str]  # Optional slice: "char:0-500" or "timestamp:00:10-00:20"
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
            """Convert to dictionary."""
            return asdict(self)
    
        def verify(
            self,
            current_content: bytes  # Current content bytes to verify against stored hash
        ) -> bool:  # True if content matches the stored hash
        "Convert to dictionary."
    
    def verify(
            self,
            current_content: bytes  # Current content bytes to verify against stored hash
        ) -> bool:  # True if content matches the stored hash
        "Check if referenced content still matches the stored hash."
    
    def compute_hash(
            content: bytes,  # Content to hash
            algo: str = "sha256"  # Hash algorithm name
        ) -> str:  # Hash string in "algo:hexdigest" format
        "Compute a content hash string for use in SourceRef."
@dataclass
class GraphNode:
    "Represents an entity in the Context Graph."
    
    id: str  # UUID
    label: str  # e.g., "Person", "Concept", "Correction"
    properties: Dict[str, Any] = field(...)  # Arbitrary metadata
    sources: List[SourceRef] = field(...)  # Links to external plugins
    created_at: Optional[float]  # Unix timestamp when created
    updated_at: Optional[float]  # Unix timestamp when last updated
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
            """Convert to dictionary with nested sources."""
            return {
                "id": self.id,
        "Convert to dictionary with nested sources."
@dataclass
class GraphEdge:
    "Represents a relationship between two nodes."
    
    id: str  # UUID
    source_id: str  # Origin node UUID
    target_id: str  # Destination node UUID
    relation_type: str  # e.g., "MENTIONS", "CORRECTS", "AUTHORED_BY"
    properties: Dict[str, Any] = field(...)  # Arbitrary metadata
    created_at: Optional[float]  # Unix timestamp when created
    updated_at: Optional[float]  # Unix timestamp when last updated
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
        "Convert to dictionary."
@dataclass
class GraphContext:
    "Container for graph query results (a subgraph)."
    
    nodes: List[GraphNode]  # Nodes in the subgraph
    edges: List[GraphEdge]  # Edges in the subgraph
    metadata: Dict[str, Any] = field(...)  # Query metadata, stats, etc.
    
    def to_temp_file(self) -> str:  # Absolute path to temporary JSON file
            """Save graph data to a temp file for zero-copy transfer."""
            tmp = tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode='w')
            
            data = {
                "nodes": [n.to_dict() for n in self.nodes],
        "Save graph data to a temp file for zero-copy transfer."
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
            """Convert to dictionary."""
            return {
                "nodes": [n.to_dict() for n in self.nodes],
        "Convert to dictionary."
    
    def from_file(
            cls,
            filepath: str  # Path to JSON file
        ) -> "GraphContext":  # Reconstructed GraphContext
        "Load graph context from a JSON file."
    
    def from_dict(
            cls,
            data: Dict[str, Any]  # Dictionary with nodes, edges, metadata
        ) -> "GraphContext":  # Reconstructed GraphContext
        "Load graph context from a dictionary."
@dataclass
class GraphQuery:
    "A standardized query object for graph operations."
    
    query: str  # Raw query string (SQL, Cypher, etc.)
    parameters: Dict[str, Any] = field(...)  # Query parameters
    limit: int = 100  # Max results to return
    depth: int = 1  # Traversal depth for neighborhood queries
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
        "Convert to dictionary."

Mermaid Diagram Generation (mermaid.ipynb)

Convert GraphContext objects to Mermaid.js diagram strings for visualization

Import

from cjm_graph_plugin_system.utils.mermaid import (
    context_to_mermaid
)

Functions

def context_to_mermaid(
    ctx: GraphContext,  # The GraphContext to visualize
    direction: str = "TD",  # Diagram direction: "TD" (top-down) or "LR" (left-right)
    node_color_map: Optional[Dict[str, str]] = None  # Map of node labels to CSS colors
) -> str:  # Mermaid.js diagram string
    "Convert a GraphContext into a Mermaid.js diagram string."

Graph Plugin Interface (plugin_interface.ipynb)

Domain-specific plugin interface for Context Graphs

Import

from cjm_graph_plugin_system.plugin_interface import (
    GraphPlugin
)

Classes

class GraphPlugin(PluginInterface):
    "Abstract base class for all Context Graph plugins."
    
    def execute(
            self,
            action: str = "get_schema",  # Action to perform (see docstring for available actions)
            **kwargs
        ) -> Dict[str, Any]:  # JSON-serializable result
        "Execute a graph operation. This is the main entry point for RemotePluginProxy.

Dispatches to the appropriate method based on `action` parameter.
All return values are JSON-serializable dictionaries for HTTP transport."
    
    def add_nodes(
            self,
            nodes: List[GraphNode]  # Nodes to create
        ) -> List[str]:  # Created node IDs
        "Bulk create nodes."
    
    def add_edges(
            self,
            edges: List[GraphEdge]  # Edges to create
        ) -> List[str]:  # Created edge IDs
        "Bulk create edges."
    
    def get_node(
            self,
            node_id: str  # UUID of node to retrieve
        ) -> Optional[GraphNode]:  # Node or None if not found
        "Get a single node by ID."
    
    def get_edge(
            self,
            edge_id: str  # UUID of edge to retrieve
        ) -> Optional[GraphEdge]:  # Edge or None if not found
        "Get a single edge by ID."
    
    def get_context(
            self,
            node_id: str,  # Starting node UUID
            depth: int = 1,  # Traversal depth (1 = immediate neighbors)
            filter_labels: Optional[List[str]] = None  # Only include nodes with these labels
        ) -> GraphContext:  # Subgraph containing node and its neighborhood
        "Get the neighborhood of a specific node."
    
    def find_nodes_by_source(
            self,
            source_ref: SourceRef  # External resource reference
        ) -> List[GraphNode]:  # Nodes attached to this source
        "Find all nodes linked to a specific external resource."
    
    def find_nodes_by_label(
            self,
            label: str,  # Node label to search for
            limit: int = 100  # Max results
        ) -> List[GraphNode]:  # Matching nodes
        "Find nodes by label."
    
    def update_node(
            self,
            node_id: str,  # UUID of node to update
            properties: Dict[str, Any]  # Properties to merge/update
        ) -> bool:  # True if successful
        "Partial update of node properties."
    
    def update_edge(
            self,
            edge_id: str,  # UUID of edge to update
            properties: Dict[str, Any]  # Properties to merge/update
        ) -> bool:  # True if successful
        "Partial update of edge properties."
    
    def delete_nodes(
            self,
            node_ids: List[str],  # UUIDs of nodes to delete
            cascade: bool = True  # Also delete connected edges
        ) -> int:  # Number of nodes deleted
        "Delete nodes (and optionally connected edges)."
    
    def delete_edges(
            self,
            edge_ids: List[str]  # UUIDs of edges to delete
        ) -> int:  # Number of edges deleted
        "Delete edges."
    
    def get_schema(self) -> Dict[str, Any]:  # Graph schema/ontology
            """Return the current ontology/schema of the graph."""
            ...
    
        @abstractmethod
        def import_graph(
            self,
            graph_data: GraphContext,  # Data to import
            merge_strategy: str = "overwrite"  # "overwrite", "skip", or "merge"
        ) -> Dict[str, int]:  # Import statistics {nodes_created, edges_created, ...}
        "Return the current ontology/schema of the graph."
    
    def import_graph(
            self,
            graph_data: GraphContext,  # Data to import
            merge_strategy: str = "overwrite"  # "overwrite", "skip", or "merge"
        ) -> Dict[str, int]:  # Import statistics {nodes_created, edges_created, ...}
        "Bulk import a GraphContext (e.g., from backup or another plugin)."
    
    def export_graph(
            self,
            filter_query: Optional[GraphQuery] = None  # Optional filter
        ) -> GraphContext:  # Exported subgraph or full graph
        "Export the entire graph or a filtered subset."

Segment Slice Specifications (slices.ipynb)

Typed slice dataclasses for specifying referenced content regions in SourceRef

Import

from cjm_graph_plugin_system.utils.slices import (
    SliceSpec,
    CharSlice,
    TimestampSlice,
    FrameSlice,
    PageSlice,
    LineSlice,
    FullContent,
    parse_slice
)

Functions

def parse_slice(
    s: str  # Slice string to parse (e.g., "char:0-500", "timestamp:10.5-30.0")
) -> SliceSpec:  # Parsed slice specification
    "Parse a slice string into a typed SliceSpec."

Classes

@runtime_checkable
class SliceSpec(Protocol):
    "Protocol for typed segment slice specifications."
    
    def to_slice_string(self) -> str:  # Serialized slice string for SourceRef.segment_slice
        "Serialize to a slice string."
@dataclass
class CharSlice:
    "Character-range slice for text content."
    
    start: int  # Start character index (0-indexed)
    end: int  # End character index (exclusive)
    
    def to_slice_string(self) -> str:  # e.g., "char:0-500"
        "Serialize to slice string."
@dataclass
class TimestampSlice:
    "Temporal slice for audio/video content."
    
    start: float  # Start time in seconds
    end: float  # End time in seconds
    
    def to_slice_string(self) -> str:  # e.g., "timestamp:10.5-30.0"
        "Serialize to slice string."
@dataclass
class FrameSlice:
    "Frame-range slice for video content."
    
    start: int  # Start frame number
    end: int  # End frame number
    
    def to_slice_string(self) -> str:  # e.g., "frame:0-120"
        "Serialize to slice string."
@dataclass
class PageSlice:
    "Page slice for document content (PDFs, EPUBs)."
    
    page: int  # Page number (1-indexed)
    bbox: Optional[str]  # Optional bounding box "x1,y1,x2,y2"
    
    def to_slice_string(self) -> str:  # e.g., "page:3" or "page:3:bbox:10,20,300,400"
            """Serialize to slice string."""
            if self.bbox
        "Serialize to slice string."
@dataclass
class LineSlice:
    "Line-range slice for code or structured text."
    
    start: int  # Start line number (0-indexed)
    end: int  # End line number (exclusive)
    
    def to_slice_string(self) -> str:  # e.g., "line:10-25"
        "Serialize to slice string."
@dataclass
class FullContent:
    "Reference to complete content (no slicing)."
    
    content_type: str = 'text'  # Content type: "text", "audio", "image", etc.
    
    def to_slice_string(self) -> str:  # e.g., "full_text", "full_audio"
        "Serialize to slice string."

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cjm_graph_plugin_system-0.0.5.tar.gz (18.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

cjm_graph_plugin_system-0.0.5-py3-none-any.whl (17.7 kB view details)

Uploaded Python 3

File details

Details for the file cjm_graph_plugin_system-0.0.5.tar.gz.

File metadata

  • Download URL: cjm_graph_plugin_system-0.0.5.tar.gz
  • Upload date:
  • Size: 18.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for cjm_graph_plugin_system-0.0.5.tar.gz
Algorithm Hash digest
SHA256 e6f68076baac691e18e78da2376164013f74b1861292961ae309f2963cab3e4a
MD5 83d463b8100d2c9eb52aadcb41c5ccba
BLAKE2b-256 03712d75a6ef65bf56433a99c85ad755376bd76f7dcdfb0e127535bed7d00ad3

See more details on using hashes here.

File details

Details for the file cjm_graph_plugin_system-0.0.5-py3-none-any.whl.

File metadata

File hashes

Hashes for cjm_graph_plugin_system-0.0.5-py3-none-any.whl
Algorithm Hash digest
SHA256 dad4a3529c3767a4d3ce1b11663742d707b114fb75713551799743d79cec1d24
MD5 f4ddd904c39c80038a617e3dafc74dc0
BLAKE2b-256 24f8e350fc12beb7c641cd3bab6e01c2e58ce3a2ff2e7dc1bf7fc73d9f245823

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page