LangGraph checkpointer for Cassandra
Project description
LangGraph Checkpoint Cassandra
Implementation of LangGraph CheckpointSaver that uses Apache Cassandra.
Installation
pip install langgraph-checkpoint-cassandra
Usage
Important Note
When using the Cassandra checkpointer for the first time, call .setup() to create the required tables.
Synchronous Usage
from cassandra.cluster import Cluster
from langgraph_checkpoint_cassandra import CassandraSaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.messages import HumanMessage, AIMessage
# Connect to Cassandra
cluster = Cluster(['localhost'])
session = cluster.connect()
# Create checkpointer and setup schema
checkpointer = CassandraSaver(session, keyspace='my_checkpoints')
checkpointer.setup() # Call this the first time to create tables
# Simple echo function
def echo_bot(state: MessagesState):
# Get the last message and echo it back
user_message = state["messages"][-1]
return {"messages": [AIMessage(content=user_message.content)]}
# Build your graph
graph = StateGraph(MessagesState)
graph.add_node("chat", echo_bot)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)
# Compile with checkpointer
app = graph.compile(checkpointer=checkpointer)
# Use with different threads
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke({"messages": [HumanMessage(content="Hello!")]}, config=config)
# Cleanup
cluster.shutdown()
Asynchronous Usage
from cassandra_asyncio.cluster import Cluster
from langgraph_checkpoint_cassandra import CassandraSaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.messages import HumanMessage, AIMessage
async def main():
# Connect to Cassandra using async driver
cluster = Cluster(['localhost'])
session = cluster.connect()
# Create checkpointer with async session
# CassandraSaver automatically detects async support
checkpointer = CassandraSaver(session, keyspace='my_checkpoints')
checkpointer.setup() # Setup is still sync
# Build your graph
def echo_bot(state: MessagesState):
user_message = state["messages"][-1]
return {"messages": [AIMessage(content=user_message.content)]}
graph = StateGraph(MessagesState)
graph.add_node("chat", echo_bot)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)
# Compile with checkpointer
app = graph.compile(checkpointer=checkpointer)
# Use async methods with LangGraph
config = {"configurable": {"thread_id": "user-456"}}
result = await app.ainvoke({"messages": [HumanMessage(content="Hello async!")]}, config=config)
# Cleanup
cluster.shutdown()
# Run async code
import asyncio
asyncio.run(main())
Schema
The checkpointer creates two tables in your Cassandra keyspace:
checkpoints table
Stores checkpoint data with the following schema:
CREATE TABLE checkpoints (
thread_id TEXT,
checkpoint_ns TEXT,
checkpoint_id UUID, -- Always UUIDv6
parent_checkpoint_id UUID, -- Always UUIDv6
type TEXT,
checkpoint BLOB,
metadata BLOB,
-- Metadata indexing columns for efficient filtering
metadata_text map<text, text>,
metadata_int map<text, bigint>,
metadata_double map<text, double>,
metadata_bool map<text, boolean>,
metadata_null set<text>,
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
) WITH CLUSTERING ORDER BY (checkpoint_ns ASC, checkpoint_id DESC)
The table includes SAI (Storage Attached Index) indexes on the metadata collections:
CREATE CUSTOM INDEX checkpoints_metadata_text_idx ON checkpoints (ENTRIES(metadata_text)) USING 'StorageAttachedIndex';
CREATE CUSTOM INDEX checkpoints_metadata_int_idx ON checkpoints (ENTRIES(metadata_int)) USING 'StorageAttachedIndex';
CREATE CUSTOM INDEX checkpoints_metadata_double_idx ON checkpoints (ENTRIES(metadata_double)) USING 'StorageAttachedIndex';
CREATE CUSTOM INDEX checkpoints_metadata_bool_idx ON checkpoints (ENTRIES(metadata_bool)) USING 'StorageAttachedIndex';
CREATE CUSTOM INDEX checkpoints_metadata_null_idx ON checkpoints (VALUES(metadata_null)) USING 'StorageAttachedIndex';
checkpoint_writes table
Stores pending writes for checkpoints:
CREATE TABLE checkpoint_writes (
thread_id TEXT,
checkpoint_ns TEXT,
checkpoint_id UUID, -- Always UUIDv6
task_id TEXT,
task_path TEXT,
idx INT,
channel TEXT,
type TEXT,
value BLOB,
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
) WITH CLUSTERING ORDER BY (checkpoint_ns ASC, checkpoint_id DESC, task_id ASC, idx ASC)
Advanced Features
Metadata Filtering (Server-Side)
All metadata fields are automatically queryable using server-side filtering with SAI (Storage Attached Index) indexes. The checkpointer stores metadata in flattened typed maps (metadata_text, metadata_int, metadata_double, metadata_bool, metadata_null) with ENTRIES indexes for efficient filtering.
Key features:
- Automatic filtering: No need to pre-declare queryable fields
- Nested metadata: Use dot notation to filter on nested fields
- Literal dots: Escape literal dots with backslash (
\.) - Type support: Text, integers, floats, booleans, and null values
checkpointer = CassandraSaver(
session,
keyspace='my_checkpoints'
)
checkpointer.setup()
config = {"configurable": {"thread_id": "my-thread"}}
# Filter by top-level field
user_checkpoints = list(checkpointer.list(
config,
filter={"user_id": "user-123"}
))
# Filter by nested field using dot notation
specific_checkpoints = list(checkpointer.list(
config,
filter={"user.name": "alice", "user.age": 30}
))
# Filter by multiple fields (AND logic)
filtered = list(checkpointer.list(
config,
filter={"source": "loop", "step": 5}
))
# Filter on keys with literal dots (use backslash escape)
# For metadata {"file.txt": "content"}
file_checkpoints = list(checkpointer.list(
config,
filter={"file\\.txt": "content"}
))
# For nested metadata {"config": {"file.txt": "content"}}
# Navigation dot is unescaped, literal dot is escaped
config_checkpoints = list(checkpointer.list(
config,
filter={"config.file\\.txt": "content"}
))
Supported types for metadata filtering:
str- Text values (stored inmetadata_textmap)int- Integer values (stored inmetadata_intmap)float- Floating point values (stored inmetadata_doublemap)bool- Boolean values (stored inmetadata_boolmap)None- Null values (stored inmetadata_nullset)
Performance Optimization with Include/Exclude Patterns
For large-scale applications, you may want to control which metadata fields are stored in the indexed columns to optimize storage and write performance. Use metadata_includes and metadata_excludes parameters:
# Only index user-related fields and step counter
checkpointer = CassandraSaver(
session,
keyspace='my_checkpoints',
metadata_includes=["user.*", "step"] # Only these fields will be indexed
)
checkpointer.setup()
# Alternatively, exclude sensitive fields from indexing
checkpointer = CassandraSaver(
session,
keyspace='my_checkpoints',
metadata_excludes=["*.password", "*.secret", "*.token"] # These won't be indexed
)
checkpointer.setup()
# Combine both: include user fields but exclude passwords
checkpointer = CassandraSaver(
session,
keyspace='my_checkpoints',
metadata_includes=["user.*", "session.*"],
metadata_excludes=["*.password", "*.token"]
)
checkpointer.setup()
Pattern matching:
- Patterns use Unix shell-style wildcards (
fnmatch) *matches any sequence of characters:"user.*"matchesuser.name,user.age, etc.?matches a single character[seq]matches any character in seq- Exact matches:
"step"matches only the fieldstep
Priority rules:
- If
metadata_includesis specified, only fields matching at least one pattern are indexed - Then,
metadata_excludesremoves matching fields (even if they matched an include pattern) - If both are
None(default), all fields are indexed
Important - Server-Side vs Client-Side Filtering:
Fields are automatically split into two categories when filtering:
-
Indexed fields (server-side): Filtered efficiently in Cassandra using SAI indexes
- All fields by default, or only those matching
metadata_includespatterns - Excluded if they match
metadata_excludespatterns - Very fast, scales to large datasets
- All fields by default, or only those matching
-
Non-indexed fields (client-side): Filtered in Python after fetching from database
- Fields excluded by
metadata_excludespatterns - Fields not matching
metadata_includespatterns - Complex types (list, dict, set) - always client-side
- Still works correctly, but less efficient for large result sets
- Fields excluded by
Performance implications:
- Server-side filters are applied first, minimizing data transfer
- Client-side filters are applied to results after fetching
- Use
metadata_includes/metadata_excludesto control which fields are indexed - This optimizes storage overhead and write performance for high-cardinality metadata
TTL (Time To Live)
Automatically expire old checkpoints:
# Checkpoints will be automatically deleted after 30 days
checkpointer = CassandraSaver(
session,
keyspace='my_checkpoints',
ttl_seconds=2592000 # 30 days
)
checkpointer.setup()
Consistency Levels
Configure read and write consistency for your use case:
from cassandra.query import ConsistencyLevel
# Production: Strong consistency
checkpointer = CassandraSaver(
session,
keyspace='my_checkpoints',
read_consistency=ConsistencyLevel.QUORUM, # Majority of replicas for reads
write_consistency=ConsistencyLevel.QUORUM # Majority of replicas for writes
)
# Default: Balanced consistency (LOCAL_QUORUM)
checkpointer = CassandraSaver(session, keyspace='my_checkpoints')
# Use session default (set read_consistency=None, write_consistency=None)
session.default_consistency_level = ConsistencyLevel.ALL
checkpointer = CassandraSaver(
session,
keyspace='my_checkpoints',
read_consistency=None, # Use session default
write_consistency=None # Use session default
)
Thread ID and Checkpoint ID Types
Choose the data type for thread identifiers:
# Use TEXT (default, most flexible)
checkpointer = CassandraSaver(session, thread_id_type="text")
# Use UUID (enforces UUID format)
checkpointer = CassandraSaver(session, thread_id_type="uuid")
Choose the data type for checkpoint identifiers:
# Use UUID (default, more efficient storage and queries)
checkpointer = CassandraSaver(session, checkpoint_id_type="uuid")
# Use TEXT (stores UUIDv6 as text, useful for compatibility)
checkpointer = CassandraSaver(session, checkpoint_id_type="text")
Note: Checkpoint IDs are always generated as UUIDv6. The checkpoint_id_type parameter only affects the storage format in Cassandra (native UUID vs TEXT column).
Development
See DEVELOPMENT.md for information on setting up a development environment, running tests, and contributing.
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file langgraph_checkpoint_cassandra-0.1.1.tar.gz.
File metadata
- Download URL: langgraph_checkpoint_cassandra-0.1.1.tar.gz
- Upload date:
- Size: 20.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ef768d7f4ac0aff77890c242b323d4695165550e1db07769125263f6c0b4ecbd
|
|
| MD5 |
53aedc58f829e4ca8339e83d33728899
|
|
| BLAKE2b-256 |
487ff0f8331984378af0119ea0d2a6e9e740a56f5702aa834f11e04778ad3c81
|
File details
Details for the file langgraph_checkpoint_cassandra-0.1.1-py3-none-any.whl.
File metadata
- Download URL: langgraph_checkpoint_cassandra-0.1.1-py3-none-any.whl
- Upload date:
- Size: 21.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
220ae0e1dd159b4e88460bab1d9f5c4e4cf0faa29518555ee8198ef2b995ac76
|
|
| MD5 |
12e192d813563958ace470de8a3de637
|
|
| BLAKE2b-256 |
8862446c6ba65cdad3981ce4e28e11aa6f56681f35623173c33546867873056b
|