Typed graph-storage task adapter interface: the repository-style contract (CRUD, typed query execution, introspection) between graph-storage tool capabilities and the substrate.
cjm-graph-storage-adapter-interface
Install
pip install cjm_graph_storage_adapter_interface
Project Structure
nbs/
├── adapter.ipynb # The graph-storage task contract — `GraphStorageToolProtocol` (BORN REAL) + the `GraphStorageAdapter` ABC. Pass-2 Thread 5: graph storage is a tool capability (sqlite-graph / future postgres-graph) plus **ONE multi-method, repository-style adapter** (CRUD + typed query + introspection) — the canonical multi-op adapter case, NOT 14 separate adapters. The adapter is **domain-neutral** (stores generic `GraphNode`/`GraphEdge`; `Document`/`Segment`/`Correction` mapping stays consumer-side, lock 4) and its implementations run **in-worker beside their tool** (Thread 3).
└── generic.ipynb # `GenericGraphStorageAdapter` — the generic (backend-agnostic) implementation of the graph-storage task contract. Thread-3 packaging: the GENERIC adapter impl is the dominant case and lives in the dep-light interface lib, reused across all compatible backend tools. Its job is **wire-input normalization + forwarding**: accept wire dicts or typed objects at the task boundary, hand the tool purely-typed arguments, pass typed results through (they wire-encode at the worker boundary).
Total: 2 notebooks
Module Dependencies
graph LR
adapter["adapter<br/>adapter"]
generic["generic<br/>generic"]
generic --> adapter
1 cross-module dependency detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
adapter (adapter.ipynb)
The graph-storage task contract: `GraphStorageToolProtocol` (BORN REAL) plus the `GraphStorageAdapter` ABC. Pass-2 Thread 5: graph storage is a tool capability (sqlite-graph / future postgres-graph) plus ONE multi-method, repository-style adapter (CRUD + typed query + introspection). This is the canonical multi-op adapter case, NOT 14 separate adapters. The adapter is domain-neutral (it stores generic `GraphNode`/`GraphEdge`; `Document`/`Segment`/`Correction` mapping stays consumer-side, lock 4) and its implementations run in-worker beside their tool (Thread 3).
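To make the domain-neutral point concrete: the adapter only ever sees generic nodes and edges, so the consumer is the one that turns a `Document` or `Segment` into them. Below is a minimal sketch of such a consumer-side mapping. The dataclasses (`Node`, `Edge`, `Document`, `Segment`), their fields, the readable id scheme, and the `HAS_SEGMENT` relation type are all hypothetical stand-ins; the package's real `GraphNode`/`GraphEdge` types may look different.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class Node:  # hypothetical stand-in for GraphNode
    id: str
    label: str
    properties: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Edge:  # hypothetical stand-in for GraphEdge
    id: str
    source_id: str
    target_id: str
    relation_type: str


@dataclass
class Segment:  # hypothetical consumer-side domain type
    index: int
    text: str


@dataclass
class Document:  # hypothetical consumer-side domain type
    doc_id: str
    title: str
    segments: List[Segment]


def document_to_graph(doc: Document) -> Tuple[List[Node], List[Edge]]:
    """Map one domain Document onto generic nodes and edges (consumer-side)."""
    nodes = [Node(id=doc.doc_id, label="Document", properties={"title": doc.title})]
    edges: List[Edge] = []
    for seg in doc.segments:
        # Readable ids keep the sketch easy to follow; real node ids are UUIDs.
        seg_id = f"{doc.doc_id}:seg:{seg.index}"
        nodes.append(Node(id=seg_id, label="Segment",
                          properties={"index": seg.index, "text": seg.text}))
        edges.append(Edge(id=f"{doc.doc_id}->{seg_id}", source_id=doc.doc_id,
                          target_id=seg_id, relation_type="HAS_SEGMENT"))
    return nodes, edges


doc = Document("doc-1", "Notes", [Segment(0, "hello"), Segment(1, "world")])
nodes, edges = document_to_graph(doc)
```

The resulting lists are what a consumer would hand to `add_nodes` and `add_edges`; nothing on the storage side needs to know what a `Document` is.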
Import
from cjm_graph_storage_adapter_interface.adapter import (
    GraphStorageToolProtocol,
    GraphStorageAdapter
)
Classes
@runtime_checkable
class GraphStorageToolProtocol(Protocol):
    """
    Structural contract for graph-storage tool capabilities (BORN REAL:
    derived from the stage-4 typed surface, not a fused-era mirror).

    Each backend tool owns its per-backend translation of the typed query
    expressions and its raw escape (refusing `RawQuery.backend` mismatches).

    The surface is purely typed: graph nouns + query expressions in, typed
    results out.
    """
    def add_nodes(self, nodes: List[GraphNode]) -> List[str]: ...
    def add_edges(self, edges: List[GraphEdge]) -> List[str]: ...

    # -- point reads + neighborhood + reverse provenance index
    def get_node(self, node_id: str) -> Optional[GraphNode]: ...
    def get_edge(self, edge_id: str) -> Optional[GraphEdge]: ...
    def get_context(self, node_id: str, depth: int = 1,
                    filter_labels: Optional[List[str]] = None) -> GraphContext: ...
    def find_nodes_by_source(self, source_ref: SourceRef) -> List[GraphNode]: ...
    def find_nodes_by_label(self, label: str, limit: int = 100) -> List[GraphNode]: ...

    # -- the typed query surface (stage 4; scale-shaped, portable)
    def query_nodes(self, query: NodeQuery) -> NodeQueryResult: ...
    def query_edges(self, query: EdgeQuery) -> EdgeQueryResult: ...
    def raw_query(self, query: RawQuery) -> RawQueryResult: ...

    # -- update / delete
    def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool: ...
    def update_edge(self, edge_id: str, properties: Dict[str, Any]) -> bool: ...
    def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int: ...
    def delete_edges(self, edge_ids: List[str]) -> int: ...

    # -- introspection / interchange
    def get_schema(self) -> Dict[str, Any]: ...
    def integrity_check(self) -> Dict[str, Any]: ...
    def import_graph(self, graph_data: GraphContext,
                     merge_strategy: str = "overwrite") -> Dict[str, int]: ...
    def export_graph(self, filter_query: Optional[NodeQuery] = None) -> GraphContext: ...
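Because the protocol is decorated with `@runtime_checkable`, conformance is structural: a backend tool does not subclass anything, it only has to expose the right method names. The sketch below illustrates this with a deliberately reduced, hypothetical protocol (`MiniToolProtocol`, covering only the two introspection methods) and a hypothetical SQLite-backed tool built on the standard-library `sqlite3` module. The table layout, the `{"labels": ...}` schema shape, and the `"sqlite"` backend string are assumptions made for illustration; only `PRAGMA quick_check` and the `{"ok", "errors", "backend"}` result keys come from the package's own documentation. Note that `isinstance` against a runtime-checkable protocol verifies that the methods exist, not their signatures or return types.

```python
import sqlite3
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class MiniToolProtocol(Protocol):
    """Hypothetical two-method slice of the full protocol (introspection only)."""
    def get_schema(self) -> Dict[str, Any]: ...
    def integrity_check(self) -> Dict[str, Any]: ...


class SqliteMiniTool:
    """Hypothetical SQLite-backed tool; it never subclasses the protocol."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS nodes (id TEXT PRIMARY KEY, label TEXT)"
        )

    def get_schema(self) -> Dict[str, Any]:
        # Assumed summary shape: node count per label.
        rows = self.conn.execute(
            "SELECT label, COUNT(*) FROM nodes GROUP BY label"
        ).fetchall()
        return {"labels": {label: count for label, count in rows}}

    def integrity_check(self) -> Dict[str, Any]:
        # A healthy database answers quick_check with the single row ("ok",).
        messages = [str(row[0]) for row in self.conn.execute("PRAGMA quick_check")]
        ok = messages == ["ok"]
        return {"ok": ok, "errors": [] if ok else messages, "backend": "sqlite"}


class NotATool:
    """Lacks integrity_check, so it does not satisfy the protocol."""
    def get_schema(self) -> Dict[str, Any]:
        return {}


tool = SqliteMiniTool()
tool.conn.execute("INSERT INTO nodes VALUES ('n1', 'Document')")
report = tool.integrity_check()
```

Here `isinstance(tool, MiniToolProtocol)` is true while `isinstance(NotATool(), MiniToolProtocol)` is false, which is the property that lets an adapter accept any conforming backend without a shared base class.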
class GraphStorageAdapter(ABC):
    """
    The graph-storage task adapter: ONE multi-method, repository-style
    typed contract (pass-2 Thread 5 lock 1).

    Domain-neutral by lock 4: it stores generic `GraphNode`/`GraphEdge`;
    domain node construction (`Document`/`Segment`/`Correction`) stays in the
    consumer (or CR-18 sugar later). A domain-specific op such as
    `verify_spine` is deliberately OFF this surface; verification composes
    from the neutral query aggregates.

    Implementations run in-worker beside their tool capability and are
    constructed with the bound tool instance: `AdapterClass(tool)`. The
    adapter is the typed boundary: methods accept wire dicts or typed
    objects for DTO/expression arguments and normalize before touching the
    tool (whose protocol surface is purely typed).

    `integrity_check` is the typed introspection op institutionalizing the
    G3 corruption find: loop-backs assert storage health cheaply
    (sqlite -> `PRAGMA quick_check`).
    """

    def __init__(
        self,
        tool: GraphStorageToolProtocol,  # The bound tool capability instance (worker-side binding)
    )

    def add_nodes(
        self,
        nodes: List[Any],  # GraphNodes or their wire dicts
    ) -> List[str]:  # Created node ids
        "Bulk-create nodes."

    def add_edges(
        self,
        edges: List[Any],  # GraphEdges or their wire dicts
    ) -> List[str]:  # Created edge ids
        "Bulk-create edges."

    def get_node(
        self,
        node_id: str,  # Node UUID
    ) -> Optional[GraphNode]:  # The node, or None
        "Fetch a single node by id."

    def get_edge(
        self,
        edge_id: str,  # Edge UUID
    ) -> Optional[GraphEdge]:  # The edge, or None
        "Fetch a single edge by id."

    def get_context(
        self,
        node_id: str,  # Center node UUID
        depth: int = 1,  # Traversal depth (whole-neighborhood reads; scale-shaped reads use query_*)
        filter_labels: Optional[List[str]] = None,  # Restrict returned nodes to these labels
    ) -> GraphContext:  # The neighborhood subgraph
        "Fetch a node's neighborhood subgraph."

    def find_nodes_by_source(
        self,
        source_ref: Any,  # SourceRef or its wire dict
    ) -> List[GraphNode]:  # Nodes whose sources match (content-hash-primary)
        "Reverse provenance lookup (content-hash-primary, CR-19)."

    def find_nodes_by_label(
        self,
        label: str,  # Node label
        limit: int = 100,  # Max nodes returned
    ) -> List[GraphNode]:  # Matching nodes
        "Fetch nodes by label."

    def query_nodes(
        self,
        query: Any,  # NodeQuery or its tagged wire dict
    ) -> NodeQueryResult:  # Typed result (nodes / rows / count per query mode)
        "Execute a typed node query (server-side filter/order/page/count)."

    def query_edges(
        self,
        query: Any,  # EdgeQuery or its tagged wire dict
    ) -> EdgeQueryResult:  # Typed result (edges / rows / count per query mode)
        "Execute a typed edge query (server-side filter/order/page/count)."

    def raw_query(
        self,
        query: Any,  # RawQuery or its tagged wire dict (backend REQUIRED)
    ) -> RawQueryResult:  # Tabular backend-shaped result
        "Execute the marked, backend-coupled raw escape (the promotion forcing function)."

    def update_node(
        self,
        node_id: str,  # Node UUID
        properties: Dict[str, Any],  # Properties to merge
    ) -> bool:  # True if the node existed
        "Merge properties into a node."

    def update_edge(
        self,
        edge_id: str,  # Edge UUID
        properties: Dict[str, Any],  # Properties to merge
    ) -> bool:  # True if the edge existed
        "Merge properties into an edge."

    def delete_nodes(
        self,
        node_ids: List[str],  # Node UUIDs
        cascade: bool = True,  # Also delete connected edges
    ) -> int:  # Number of nodes deleted
        "Bulk-delete nodes."

    def delete_edges(
        self,
        edge_ids: List[str],  # Edge UUIDs
    ) -> int:  # Number of edges deleted
        "Bulk-delete edges."

    def get_schema(self) -> Dict[str, Any]:  # Backend-reported schema/ontology summary
        "Report the stored graph's schema (labels, relation types, counts)."

    def integrity_check(self) -> Dict[str, Any]:  # {"ok": bool, "errors": List[str], "backend": str}
        "Backend self-check (G3 institutionalized; sqlite -> PRAGMA quick_check)."

    def import_graph(
        self,
        graph_data: Any,  # GraphContext or its wire dict
        merge_strategy: str = "overwrite",  # skip | overwrite | merge
    ) -> Dict[str, int]:  # Import counts per entity kind
        "Bulk-import a subgraph."

    def export_graph(
        self,
        filter_query: Optional[Any] = None,  # NodeQuery (or wire dict) selecting nodes; None = whole graph
    ) -> GraphContext:  # The exported subgraph (matching nodes + edges among them)
        "Export the graph (optionally filtered by a typed node query)."
generic (generic.ipynb)
`GenericGraphStorageAdapter`: the generic (backend-agnostic) implementation of the graph-storage task contract. Thread-3 packaging: the GENERIC adapter impl is the dominant case and lives in the dep-light interface lib, reused across all compatible backend tools. Its job is wire-input normalization + forwarding: accept wire dicts or typed objects at the task boundary, hand the tool purely-typed arguments, and pass typed results through (they wire-encode at the worker boundary).
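The normalization-plus-forwarding job can be seen end to end in a reduced, self-contained sketch. Everything named here (`Node`, `RecordingTool`, `MiniAdapter`, and the keyword-argument conversion from dict to dataclass) is a hypothetical stand-in, not the package's classes or its wire format. The point is only that the tool receives typed arguments whether the caller passed wire dicts or typed objects.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Node:  # hypothetical stand-in for GraphNode
    id: str
    label: str
    properties: Dict[str, Any] = field(default_factory=dict)


def ensure_node(value: Any) -> Node:
    """Typed values pass through; wire dicts are converted (assumed dict shape)."""
    if isinstance(value, Node):
        return value
    if isinstance(value, dict):
        return Node(**value)
    raise TypeError(f"expected Node or dict, got {type(value).__name__}")


class RecordingTool:
    """Hypothetical in-memory tool whose surface is purely typed."""

    def __init__(self) -> None:
        self.nodes: Dict[str, Node] = {}

    def add_nodes(self, nodes: List[Node]) -> List[str]:
        if not all(isinstance(n, Node) for n in nodes):
            raise TypeError("tool surface is purely typed")
        for n in nodes:
            self.nodes[n.id] = n
        return [n.id for n in nodes]

    def get_node(self, node_id: str) -> Optional[Node]:
        return self.nodes.get(node_id)

    def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int:
        # No edges exist in this sketch, so `cascade` has nothing extra to remove.
        return sum(1 for i in node_ids if self.nodes.pop(i, None) is not None)


class MiniAdapter:
    """Normalize wire inputs, forward to the bound tool, pass results through."""

    def __init__(self, tool: RecordingTool) -> None:
        self.tool = tool

    def add_nodes(self, nodes: List[Any]) -> List[str]:
        return self.tool.add_nodes([ensure_node(n) for n in nodes])

    def get_node(self, node_id: str) -> Optional[Node]:
        return self.tool.get_node(node_id)

    def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int:
        return self.tool.delete_nodes(node_ids, cascade=cascade)


adapter = MiniAdapter(RecordingTool())
# One wire dict and one typed object in the same call.
ids = adapter.add_nodes([{"id": "n1", "label": "Document"},
                         Node(id="n2", label="Segment")])
```

Keeping the conversion in the adapter means a backend tool never has to handle dict inputs, which is what allows one generic adapter to be reused across backends.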
Import
from cjm_graph_storage_adapter_interface.generic import (
    ensure_node,
    ensure_edge,
    ensure_context,
    ensure_source_ref,
    ensure_node_query,
    ensure_edge_query,
    ensure_raw_query,
    GenericGraphStorageAdapter
)
Functions
def ensure_node(
    value: Any,  # GraphNode or its wire dict
) -> GraphNode:  # Typed node
    "Normalize a wire dict to a `GraphNode` (typed values pass through)."

def ensure_edge(
    value: Any,  # GraphEdge or its wire dict
) -> GraphEdge:  # Typed edge
    "Normalize a wire dict to a `GraphEdge` (typed values pass through)."

def ensure_context(
    value: Any,  # GraphContext or its wire dict
) -> GraphContext:  # Typed context
    "Normalize a wire dict to a `GraphContext` (typed values pass through)."

def ensure_source_ref(
    value: Any,  # SourceRef or its wire dict
) -> SourceRef:  # Typed source ref
    "Normalize a wire dict to a `SourceRef` (typed values pass through)."

def ensure_node_query(
    value: Any,  # NodeQuery or its tagged wire dict
) -> NodeQuery:  # Typed query
    "Normalize a tagged wire dict to a `NodeQuery` (typed values pass through)."

def ensure_edge_query(
    value: Any,  # EdgeQuery or its tagged wire dict
) -> EdgeQuery:  # Typed query
    "Normalize a tagged wire dict to an `EdgeQuery` (typed values pass through)."

def ensure_raw_query(
    value: Any,  # RawQuery or its tagged wire dict
) -> RawQuery:  # Typed query
    "Normalize a tagged wire dict to a `RawQuery` (typed values pass through)."
Classes
class GenericGraphStorageAdapter(GraphStorageAdapter):
    """
    Generic graph-storage adapter: normalize wire inputs, forward to the
    bound tool, pass typed results through.

    Works against ANY tool satisfying `GraphStorageToolProtocol`; the
    backend owns translation, so nothing here is backend-specific.
    """

    def add_nodes(self, nodes: List[Any]) -> List[str]:
        """Bulk-create nodes."""
        return self.tool.add_nodes([ensure_node(n) for n in nodes])

    def add_edges(self, edges: List[Any]) -> List[str]:
        """Bulk-create edges."""
        return self.tool.add_edges([ensure_edge(e) for e in edges])

    def get_node(self, node_id: str) -> Optional[GraphNode]:
        """Fetch a single node by id."""
        return self.tool.get_node(node_id)

    def get_edge(self, edge_id: str) -> Optional[GraphEdge]:
        """Fetch a single edge by id."""
        return self.tool.get_edge(edge_id)

    def get_context(self, node_id: str, depth: int = 1,
                    filter_labels: Optional[List[str]] = None) -> GraphContext
        "Fetch a node's neighborhood subgraph."

    def find_nodes_by_source(self, source_ref: Any) -> List[GraphNode]:
        """Reverse provenance lookup (content-hash-primary)."""
        return self.tool.find_nodes_by_source(ensure_source_ref(source_ref))

    def find_nodes_by_label(self, label: str, limit: int = 100) -> List[GraphNode]:
        """Fetch nodes by label."""
        return self.tool.find_nodes_by_label(label, limit=limit)

    def query_nodes(self, query: Any) -> NodeQueryResult:
        """Execute a typed node query."""
        return self.tool.query_nodes(ensure_node_query(query))

    def query_edges(self, query: Any) -> EdgeQueryResult:
        """Execute a typed edge query."""
        return self.tool.query_edges(ensure_edge_query(query))

    def raw_query(self, query: Any) -> RawQueryResult:
        """Execute the marked, backend-coupled raw escape."""
        return self.tool.raw_query(ensure_raw_query(query))

    def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool:
        """Merge properties into a node."""
        return self.tool.update_node(node_id, properties)

    def update_edge(self, edge_id: str, properties: Dict[str, Any]) -> bool:
        """Merge properties into an edge."""
        return self.tool.update_edge(edge_id, properties)

    def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int:
        """Bulk-delete nodes."""
        return self.tool.delete_nodes(node_ids, cascade=cascade)

    def delete_edges(self, edge_ids: List[str]) -> int:
        """Bulk-delete edges."""
        return self.tool.delete_edges(edge_ids)

    def get_schema(self) -> Dict[str, Any]:
        """Report the stored graph's schema."""
        return self.tool.get_schema()

    def integrity_check(self) -> Dict[str, Any]:
        """Backend self-check (G3 institutionalized)."""
        return self.tool.integrity_check()

    def import_graph(self, graph_data: Any, merge_strategy: str = "overwrite") -> Dict[str, int]:
        """Bulk-import a subgraph."""
        return self.tool.import_graph(ensure_context(graph_data), merge_strategy=merge_strategy)

    def export_graph(self, filter_query: Optional[Any] = None) -> GraphContext
        "Export the graph (optionally filtered by a typed node query)."
File details
Details for the file cjm_graph_storage_adapter_interface-0.0.2.tar.gz.
File metadata
- Download URL: cjm_graph_storage_adapter_interface-0.0.2.tar.gz
- Upload date:
- Size: 17.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 31e5eec63ffdbecc59403adc653854c83ec7ae1f8535d1d1bb3aba4d4d3eb7fd |
| MD5 | 30e1dfe82de8f8c1f1e4d109dd5d86e7 |
| BLAKE2b-256 | f16528e7f173aa55e48274084a952cc3ad4070c06741b6e1e97983dfa9771227 |
File details
Details for the file cjm_graph_storage_adapter_interface-0.0.2-py3-none-any.whl.
File metadata
- Download URL: cjm_graph_storage_adapter_interface-0.0.2-py3-none-any.whl
- Upload date:
- Size: 17.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9f14ec03e8635a854a07fd0ecfe9e17ef4f33b0a1c004ddebb142a6ec2d60d76 |
| MD5 | 4c725761ce190b2b88b0630e0eadb7b1 |
| BLAKE2b-256 | 228435c271d78512b9f88df58ea90bd50afde24d6b31b181e018006da99cd223 |