
Typed graph-storage task adapter interface: the repository-style contract (CRUD, typed query execution, introspection) between graph-storage tool capabilities and the substrate.

Project description

cjm-graph-storage-adapter-interface

Install

pip install cjm_graph_storage_adapter_interface

Project Structure

nbs/
├── adapter.ipynb # The graph-storage task contract — `GraphStorageToolProtocol` (BORN REAL) + the `GraphStorageAdapter` ABC. Pass-2 Thread 5: graph storage is a tool capability (sqlite-graph / future postgres-graph) plus **ONE multi-method, repository-style adapter** (CRUD + typed query + introspection) — the canonical multi-op adapter case, NOT 14 separate adapters. The adapter is **domain-neutral** (stores generic `GraphNode`/`GraphEdge`; `Document`/`Segment`/`Correction` mapping stays consumer-side, lock 4) and its implementations run **in-worker beside their tool** (Thread 3).
└── generic.ipynb # `GenericGraphStorageAdapter` — the generic (backend-agnostic) implementation of the graph-storage task contract. Thread-3 packaging: the GENERIC adapter impl is the dominant case and lives in the dep-light interface lib, reused across all compatible backend tools. Its job is **wire-input normalization + forwarding**: accept wire dicts or typed objects at the task boundary, hand the tool purely-typed arguments, pass typed results through (they wire-encode at the worker boundary).

Total: 2 notebooks

Module Dependencies

graph LR
    adapter["adapter<br/>adapter"]
    generic["generic<br/>generic"]

    generic --> adapter

1 cross-module dependency detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

adapter (adapter.ipynb)

The graph-storage task contract — GraphStorageToolProtocol (BORN REAL) + the GraphStorageAdapter ABC. Pass-2 Thread 5: graph storage is a tool capability (sqlite-graph / future postgres-graph) plus ONE multi-method, repository-style adapter (CRUD + typed query + introspection) — the canonical multi-op adapter case, NOT 14 separate adapters. The adapter is domain-neutral (stores generic GraphNode/GraphEdge; Document/Segment/Correction mapping stays consumer-side, lock 4) and its implementations run in-worker beside their tool (Thread 3).
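Because the adapter is domain-neutral, turning a domain object into graph nouns is the consumer's job. The sketch below shows that consumer-side mapping under stated assumptions: `StubNode` is a hypothetical stand-in with `id`/`label`/`properties` fields (the real `GraphNode` fields are not documented on this page), and `Document` / `document_to_node` are hypothetical consumer code, not part of this package.

```python
from dataclasses import dataclass, field
from typing import Any, Dict
import uuid


@dataclass
class StubNode:
    """Hypothetical stand-in for a generic graph node (NOT the package's GraphNode)."""
    id: str
    label: str
    properties: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Document:
    """Hypothetical consumer-side domain record."""
    title: str
    media_type: str


def document_to_node(doc: Document) -> StubNode:
    # The consumer owns the domain -> generic mapping; the storage adapter
    # only ever sees a label plus a property bag.
    return StubNode(
        id=str(uuid.uuid4()),
        label="Document",
        properties={"title": doc.title, "media_type": doc.media_type},
    )


node = document_to_node(Document(title="Lecture 1", media_type="audio"))
print(node.label, node.properties)
```

The resulting generic nodes are what a consumer would hand to `add_nodes`; nothing `Document`-specific leaks into the storage contract.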

Import

from cjm_graph_storage_adapter_interface.adapter import (
    GraphStorageToolProtocol,
    GraphStorageAdapter
)

Classes

@runtime_checkable
class GraphStorageToolProtocol(Protocol):
    """
    Structural contract for graph-storage tool capabilities (BORN REAL —
    derived from the stage-4 typed surface, not a fused-era mirror).
    
    Each backend tool owns its per-backend translation of the typed query
    expressions and its raw escape (refusing `RawQuery.backend` mismatches).
    The surface is purely typed: graph nouns + query expressions in, typed
    results out.
    """
    
    def add_nodes(self, nodes: List[GraphNode]) -> List[str]: ...
    def add_edges(self, edges: List[GraphEdge]) -> List[str]: ...

    # -- point reads + neighborhood + reverse provenance index
    def get_node(self, node_id: str) -> Optional[GraphNode]: ...
    def get_edge(self, edge_id: str) -> Optional[GraphEdge]: ...
    def get_context(self, node_id: str, depth: int = 1,
                    filter_labels: Optional[List[str]] = None) -> GraphContext: ...
    def find_nodes_by_source(self, source_ref: SourceRef) -> List[GraphNode]: ...
    def find_nodes_by_label(self, label: str, limit: int = 100) -> List[GraphNode]: ...

    # -- the typed query surface (stage 4; scale-shaped, portable)
    def query_nodes(self, query: NodeQuery) -> NodeQueryResult: ...
    def query_edges(self, query: EdgeQuery) -> EdgeQueryResult: ...
    def raw_query(self, query: RawQuery) -> RawQueryResult: ...

    # -- update / delete
    def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool: ...
    def update_edge(self, edge_id: str, properties: Dict[str, Any]) -> bool: ...
    def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int: ...
    def delete_edges(self, edge_ids: List[str]) -> int: ...

    # -- introspection / interchange
    def get_schema(self) -> Dict[str, Any]: ...
    def integrity_check(self) -> Dict[str, Any]: ...
    def import_graph(self, graph_data: GraphContext,
                     merge_strategy: str = "overwrite") -> Dict[str, int]: ...
    def export_graph(self, filter_query: Optional[NodeQuery] = None) -> GraphContext: ...
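Because `GraphStorageToolProtocol` is a `@runtime_checkable` `Protocol`, a backend tool satisfies it structurally, without inheriting from anything. The sketch below demonstrates that mechanism on a hypothetical two-method protocol (`MiniGraphStore`) and a hypothetical dict-backed `InMemoryStore`; neither is part of this package, and node dicts stand in for the typed graph nouns.

```python
from typing import Dict, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class MiniGraphStore(Protocol):
    """Hypothetical trimmed-down protocol (two methods only), not the real one."""
    def add_nodes(self, nodes: List[dict]) -> List[str]: ...
    def get_node(self, node_id: str) -> Optional[dict]: ...


class InMemoryStore:
    """Hypothetical backend keyed by node id. Note: no base class."""

    def __init__(self) -> None:
        self._nodes: Dict[str, dict] = {}

    def add_nodes(self, nodes: List[dict]) -> List[str]:
        for n in nodes:
            self._nodes[n["id"]] = n
        return [n["id"] for n in nodes]

    def get_node(self, node_id: str) -> Optional[dict]:
        return self._nodes.get(node_id)


store = InMemoryStore()
print(isinstance(store, MiniGraphStore))     # True: structural match
print(isinstance(object(), MiniGraphStore))  # False: methods missing
```

One caveat worth knowing: a runtime `isinstance` check against a protocol only verifies that the named methods exist, not their signatures or return types, so static type checking is still what enforces the full typed surface.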

class GraphStorageAdapter(ABC):
    """
    The graph-storage task adapter — ONE multi-method, repository-style
    typed contract (pass-2 Thread 5 lock 1).
    
    Domain-neutral by lock 4: it stores generic `GraphNode`/`GraphEdge`;
    domain node construction (`Document`/`Segment`/`Correction`) stays in the
    consumer (or CR-18 sugar later). A domain-specific op such as
    `verify_spine` is deliberately OFF this surface — verification composes
    from the neutral query aggregates.
    
    Implementations run in-worker beside their tool capability and are
    constructed with the bound tool instance: `AdapterClass(tool)`. The
    adapter is the typed boundary — methods accept wire dicts or typed
    objects for DTO/expression arguments and normalize before touching the
    tool (whose protocol surface is purely typed).
    
    `integrity_check` is the typed introspection op institutionalizing the
    G3 corruption find: loop-backs assert storage health cheaply
    (sqlite -> `PRAGMA quick_check`).
    """
    
    def __init__(
            self,
            tool: GraphStorageToolProtocol,  # The bound tool capability instance (worker-side binding)
        ):
        self.tool = tool
    
    def add_nodes(
            self,
            nodes: List[Any],  # GraphNodes or their wire dicts
        ) -> List[str]:  # Created node ids
        "Bulk-create nodes."
    
    def add_edges(
            self,
            edges: List[Any],  # GraphEdges or their wire dicts
        ) -> List[str]:  # Created edge ids
        "Bulk-create edges."
    
    def get_node(
            self,
            node_id: str,  # Node UUID
        ) -> Optional[GraphNode]:  # The node, or None
        "Fetch a single node by id."
    
    def get_edge(
            self,
            edge_id: str,  # Edge UUID
        ) -> Optional[GraphEdge]:  # The edge, or None
        "Fetch a single edge by id."
    
    def get_context(
            self,
            node_id: str,  # Center node UUID
            depth: int = 1,  # Traversal depth (whole-neighborhood reads; scale-shaped reads use query_*)
            filter_labels: Optional[List[str]] = None,  # Restrict returned nodes to these labels
        ) -> GraphContext:  # The neighborhood subgraph
        "Fetch a node's neighborhood subgraph."
    
    def find_nodes_by_source(
            self,
            source_ref: Any,  # SourceRef or its wire dict
        ) -> List[GraphNode]:  # Nodes whose sources match (content-hash-primary)
        "Reverse provenance lookup (content-hash-primary, CR-19)."
    
    def find_nodes_by_label(
            self,
            label: str,  # Node label
            limit: int = 100,  # Max nodes returned
        ) -> List[GraphNode]:  # Matching nodes
        "Fetch nodes by label."
    
    def query_nodes(
            self,
            query: Any,  # NodeQuery or its tagged wire dict
        ) -> NodeQueryResult:  # Typed result (nodes / rows / count per query mode)
        "Execute a typed node query (server-side filter/order/page/count)."
    
    def query_edges(
            self,
            query: Any,  # EdgeQuery or its tagged wire dict
        ) -> EdgeQueryResult:  # Typed result (edges / rows / count per query mode)
        "Execute a typed edge query (server-side filter/order/page/count)."
    
    def raw_query(
            self,
            query: Any,  # RawQuery or its tagged wire dict (backend REQUIRED)
        ) -> RawQueryResult:  # Tabular backend-shaped result
        "Execute the marked, backend-coupled raw escape (the promotion forcing function)."
    
    def update_node(
            self,
            node_id: str,  # Node UUID
            properties: Dict[str, Any],  # Properties to merge
        ) -> bool:  # True if the node existed
        "Merge properties into a node."
    
    def update_edge(
            self,
            edge_id: str,  # Edge UUID
            properties: Dict[str, Any],  # Properties to merge
        ) -> bool:  # True if the edge existed
        "Merge properties into an edge."
    
    def delete_nodes(
            self,
            node_ids: List[str],  # Node UUIDs
            cascade: bool = True,  # Also delete connected edges
        ) -> int:  # Number of nodes deleted
        "Bulk-delete nodes."
    
    def delete_edges(
            self,
            edge_ids: List[str],  # Edge UUIDs
        ) -> int:  # Number of edges deleted
        "Bulk-delete edges."
    
    def get_schema(self) -> Dict[str, Any]:  # Backend-reported schema/ontology summary
        "Report the stored graph's schema (labels, relation types, counts)."

    def integrity_check(self) -> Dict[str, Any]:  # {"ok": bool, "errors": List[str], "backend": str}
        "Backend self-check (G3 institutionalized; sqlite -> PRAGMA quick_check)."

    def import_graph(
            self,
            graph_data: Any,  # GraphContext or its wire dict
            merge_strategy: str = "overwrite",  # skip | overwrite | merge
        ) -> Dict[str, int]:  # Import counts per entity kind
        "Bulk-import a subgraph."
    
    def export_graph(
            self,
            filter_query: Optional[Any] = None,  # NodeQuery (or wire dict) selecting nodes; None = whole graph
        ) -> GraphContext:  # The exported subgraph (matching nodes + edges among them)
        "Export the graph (optionally filtered by a typed node query)."
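For the sqlite case, the `integrity_check` docstring points at `PRAGMA quick_check`. A minimal sketch of how a sqlite-backed tool might map that pragma onto the documented `{"ok", "errors", "backend"}` return shape; the function name `sqlite_integrity_check` and the `"sqlite"` backend string are assumptions for illustration, not the sqlite-graph tool's actual code.

```python
import sqlite3
from typing import Any, Dict


def sqlite_integrity_check(conn: sqlite3.Connection) -> Dict[str, Any]:
    """Hypothetical sqlite-backed integrity_check returning the documented shape."""
    # PRAGMA quick_check returns a single row containing "ok" when no problems
    # are found; otherwise it returns one row per problem.
    rows = [r[0] for r in conn.execute("PRAGMA quick_check").fetchall()]
    ok = rows == ["ok"]
    return {"ok": ok, "errors": [] if ok else rows, "backend": "sqlite"}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE nodes (id TEXT PRIMARY KEY, label TEXT)")
print(sqlite_integrity_check(conn))  # {'ok': True, 'errors': [], 'backend': 'sqlite'}
```

`quick_check` is the cheaper variant of `integrity_check` (it skips some index-consistency verification), which matches the docstring's goal of letting loop-backs assert storage health cheaply.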

generic (generic.ipynb)

GenericGraphStorageAdapter — the generic (backend-agnostic) implementation of the graph-storage task contract. Thread-3 packaging: the GENERIC adapter impl is the dominant case and lives in the dep-light interface lib, reused across all compatible backend tools. Its job is wire-input normalization + forwarding: accept wire dicts or typed objects at the task boundary, hand the tool purely-typed arguments, pass typed results through (they wire-encode at the worker boundary).

Import

from cjm_graph_storage_adapter_interface.generic import (
    ensure_node,
    ensure_edge,
    ensure_context,
    ensure_source_ref,
    ensure_node_query,
    ensure_edge_query,
    ensure_raw_query,
    GenericGraphStorageAdapter
)

Functions

def ensure_node(
    value: Any,  # GraphNode or its wire dict
) -> GraphNode:  # Typed node
    "Normalize a wire dict to a `GraphNode` (typed values pass through)."

def ensure_edge(
    value: Any,  # GraphEdge or its wire dict
) -> GraphEdge:  # Typed edge
    "Normalize a wire dict to a `GraphEdge` (typed values pass through)."

def ensure_context(
    value: Any,  # GraphContext or its wire dict
) -> GraphContext:  # Typed context
    "Normalize a wire dict to a `GraphContext` (typed values pass through)."

def ensure_source_ref(
    value: Any,  # SourceRef or its wire dict
) -> SourceRef:  # Typed source ref
    "Normalize a wire dict to a `SourceRef` (typed values pass through)."

def ensure_node_query(
    value: Any,  # NodeQuery or its tagged wire dict
) -> NodeQuery:  # Typed query
    "Normalize a tagged wire dict to a `NodeQuery` (typed values pass through)."

def ensure_edge_query(
    value: Any,  # EdgeQuery or its tagged wire dict
) -> EdgeQuery:  # Typed query
    "Normalize a tagged wire dict to an `EdgeQuery` (typed values pass through)."

def ensure_raw_query(
    value: Any,  # RawQuery or its tagged wire dict
) -> RawQuery:  # Typed query
    "Normalize a tagged wire dict to a `RawQuery` (typed values pass through)."
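Each `ensure_*` helper has the same contract: a typed value passes through unchanged, and a wire dict is rebuilt into the typed object. A minimal sketch of that contract, assuming a hypothetical `StubNode` dataclass in place of `GraphNode` and a plain `**kwargs` rebuild; the package's real decoding (especially of the *tagged* query dicts) may work differently and is not reproduced here.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class StubNode:
    """Hypothetical stand-in for GraphNode (field names are assumptions)."""
    id: str
    label: str
    properties: Dict[str, Any] = field(default_factory=dict)


def ensure_stub_node(value: Any) -> StubNode:
    """Analogue of ensure_node: typed values pass through, wire dicts are rebuilt."""
    if isinstance(value, StubNode):
        return value  # already typed: return the same object, no copy
    if isinstance(value, dict):
        return StubNode(**value)  # wire dict -> typed dataclass
    raise TypeError(f"expected StubNode or dict, got {type(value).__name__}")


typed = StubNode(id="n1", label="Segment")
wire = asdict(typed)  # {'id': 'n1', 'label': 'Segment', 'properties': {}}
assert ensure_stub_node(typed) is typed   # pass-through
assert ensure_stub_node(wire) == typed    # wire dict round-trips to an equal object
```

Returning the same object for typed input keeps the in-process path free of copies, while callers at the wire boundary can still send plain dicts.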

Classes

class GenericGraphStorageAdapter(GraphStorageAdapter):
    """
    Generic graph-storage adapter: normalize wire inputs, forward to the
    bound tool, pass typed results through.
    
    Works against ANY tool satisfying `GraphStorageToolProtocol` — the
    backend owns translation, so nothing here is backend-specific.
    """
    
    def add_nodes(self, nodes: List[Any]) -> List[str]:
        """Bulk-create nodes."""
        return self.tool.add_nodes([ensure_node(n) for n in nodes])

    def add_edges(self, edges: List[Any]) -> List[str]:
        """Bulk-create edges."""
        return self.tool.add_edges([ensure_edge(e) for e in edges])

    def get_node(self, node_id: str) -> Optional[GraphNode]:
        """Fetch a single node by id."""
        return self.tool.get_node(node_id)

    def get_edge(self, edge_id: str) -> Optional[GraphEdge]:
        """Fetch a single edge by id."""
        return self.tool.get_edge(edge_id)

    def get_context(self, node_id: str, depth: int = 1,
                    filter_labels: Optional[List[str]] = None) -> GraphContext:
        """Fetch a node's neighborhood subgraph."""
        ...

    def find_nodes_by_source(self, source_ref: Any) -> List[GraphNode]:
        """Reverse provenance lookup (content-hash-primary)."""
        return self.tool.find_nodes_by_source(ensure_source_ref(source_ref))

    def find_nodes_by_label(self, label: str, limit: int = 100) -> List[GraphNode]:
        """Fetch nodes by label."""
        return self.tool.find_nodes_by_label(label, limit=limit)

    def query_nodes(self, query: Any) -> NodeQueryResult:
        """Execute a typed node query."""
        return self.tool.query_nodes(ensure_node_query(query))

    def query_edges(self, query: Any) -> EdgeQueryResult:
        """Execute a typed edge query."""
        return self.tool.query_edges(ensure_edge_query(query))

    def raw_query(self, query: Any) -> RawQueryResult:
        """Execute the marked, backend-coupled raw escape."""
        return self.tool.raw_query(ensure_raw_query(query))

    def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool:
        """Merge properties into a node."""
        return self.tool.update_node(node_id, properties)

    def update_edge(self, edge_id: str, properties: Dict[str, Any]) -> bool:
        """Merge properties into an edge."""
        return self.tool.update_edge(edge_id, properties)

    def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int:
        """Bulk-delete nodes."""
        return self.tool.delete_nodes(node_ids, cascade=cascade)

    def delete_edges(self, edge_ids: List[str]) -> int:
        """Bulk-delete edges."""
        return self.tool.delete_edges(edge_ids)

    def get_schema(self) -> Dict[str, Any]:
        """Report the stored graph's schema."""
        return self.tool.get_schema()

    def integrity_check(self) -> Dict[str, Any]:
        """Backend self-check (G3 institutionalized)."""
        return self.tool.integrity_check()

    def import_graph(self, graph_data: Any, merge_strategy: str = "overwrite") -> Dict[str, int]:
        """Bulk-import a subgraph."""
        return self.tool.import_graph(ensure_context(graph_data), merge_strategy=merge_strategy)

    def export_graph(self, filter_query: Optional[Any] = None) -> GraphContext:
        """Export the graph (optionally filtered by a typed node query)."""
        ...
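The normalize-then-forward pattern above can be exercised end to end with a toy tool. This is a sketch under stated assumptions: `StubNode`, `ensure_stub_node`, `InMemoryTool`, and `MiniGenericAdapter` are hypothetical stand-ins reduced to two methods, not the package's classes. It shows the key property: the adapter accepts both typed objects and wire dicts, while the tool only ever receives typed objects.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StubNode:
    """Hypothetical stand-in for GraphNode (field names are assumptions)."""
    id: str
    label: str
    properties: Dict[str, Any] = field(default_factory=dict)


def ensure_stub_node(value: Any) -> StubNode:
    """Typed values pass through; wire dicts are rebuilt into StubNode."""
    return value if isinstance(value, StubNode) else StubNode(**value)


class InMemoryTool:
    """Hypothetical purely-typed tool: it only accepts StubNode objects."""

    def __init__(self) -> None:
        self.nodes: Dict[str, StubNode] = {}

    def add_nodes(self, nodes: List[StubNode]) -> List[str]:
        for n in nodes:
            if not isinstance(n, StubNode):
                raise TypeError("tool surface is purely typed")
            self.nodes[n.id] = n
        return [n.id for n in nodes]

    def get_node(self, node_id: str) -> Optional[StubNode]:
        return self.nodes.get(node_id)


class MiniGenericAdapter:
    """Normalize-then-forward, reduced to two methods."""

    def __init__(self, tool: InMemoryTool) -> None:
        self.tool = tool  # the bound tool instance, as in AdapterClass(tool)

    def add_nodes(self, nodes: List[Any]) -> List[str]:
        return self.tool.add_nodes([ensure_stub_node(n) for n in nodes])

    def get_node(self, node_id: str) -> Optional[StubNode]:
        return self.tool.get_node(node_id)  # typed result passes through untouched


adapter = MiniGenericAdapter(InMemoryTool())
ids = adapter.add_nodes([
    StubNode(id="a", label="Document"),                 # typed input
    {"id": "b", "label": "Segment", "properties": {}},  # wire-dict input
])
print(ids)                          # ['a', 'b']
print(adapter.get_node("b").label)  # Segment
```

Keeping normalization in the adapter means every backend tool can stay purely typed, which is why one generic adapter can be reused across all compatible tools.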

Download files

Source Distribution

cjm_graph_storage_adapter_interface-0.0.2.tar.gz (17.1 kB)

Built Distribution

cjm_graph_storage_adapter_interface-0.0.2-py3-none-any.whl

File details

Details for the file cjm_graph_storage_adapter_interface-0.0.2.tar.gz.

File hashes

Hashes for cjm_graph_storage_adapter_interface-0.0.2.tar.gz
Algorithm Hash digest
SHA256 31e5eec63ffdbecc59403adc653854c83ec7ae1f8535d1d1bb3aba4d4d3eb7fd
MD5 30e1dfe82de8f8c1f1e4d109dd5d86e7
BLAKE2b-256 f16528e7f173aa55e48274084a952cc3ad4070c06741b6e1e97983dfa9771227
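To check a downloaded file against the SHA256 digests listed here, stream it through `hashlib` and compare hex digests. A minimal sketch; the helper name `sha256_of` is hypothetical, and the guarded check assumes the sdist was saved into the current working directory. (pip can also enforce this itself via hash-pinned requirements and `--require-hashes`.)

```python
import hashlib
import os


def sha256_of(path: str, chunk_size: int = 1 << 16) -> str:
    """Stream a file through SHA-256 and return the lowercase hex digest."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        # Read in fixed-size chunks so large files never load fully into memory.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()


EXPECTED = "31e5eec63ffdbecc59403adc653854c83ec7ae1f8535d1d1bb3aba4d4d3eb7fd"
SDIST = "cjm_graph_storage_adapter_interface-0.0.2.tar.gz"

if os.path.exists(SDIST):  # assumption: the sdist was downloaded to the working dir
    print("match" if sha256_of(SDIST) == EXPECTED else "MISMATCH")
```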

File details

Details for the file cjm_graph_storage_adapter_interface-0.0.2-py3-none-any.whl.

File hashes

Hashes for cjm_graph_storage_adapter_interface-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 9f14ec03e8635a854a07fd0ecfe9e17ef4f33b0a1c004ddebb142a6ec2d60d76
MD5 4c725761ce190b2b88b0630e0eadb7b1
BLAKE2b-256 228435c271d78512b9f88df58ea90bd50afde24d6b31b181e018006da99cd223
