LangGraph checkpointer for Cassandra
Project description
LangGraph Checkpoint Cassandra
Implementation of LangGraph CheckpointSaver that uses Apache Cassandra.
Installation
pip install langgraph-checkpoint-cassandra
For async operations, also install the async Cassandra driver:
pip install cassandra-asyncio-driver
Usage
Important Note
When using the Cassandra checkpointer for the first time, call .setup() to create the required tables.
Synchronous Usage
from cassandra.cluster import Cluster
from langgraph_checkpoint_cassandra import CassandraSaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.messages import HumanMessage, AIMessage
# Connect to Cassandra
cluster = Cluster(['localhost'])
session = cluster.connect()
# Create checkpointer and setup schema
checkpointer = CassandraSaver(session, keyspace='my_checkpoints')
checkpointer.setup() # Call this the first time to create tables
# Simple echo function
def echo_bot(state: MessagesState):
# Get the last message and echo it back
user_message = state["messages"][-1]
return {"messages": [AIMessage(content=user_message.content)]}
# Build your graph
graph = StateGraph(MessagesState)
graph.add_node("chat", echo_bot)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)
# Compile with checkpointer
app = graph.compile(checkpointer=checkpointer)
# Use with different threads
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke({"messages": [HumanMessage(content="Hello!")]}, config=config)
# Cleanup
cluster.shutdown()
Asynchronous Usage
For high-concurrency scenarios (web servers, concurrent operations), use CassandraSaver with an async session:
from cassandra_asyncio.cluster import Cluster
from langgraph_checkpoint_cassandra import CassandraSaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.messages import HumanMessage, AIMessage
async def main():
# Connect to Cassandra using async driver
cluster = Cluster(['localhost'])
session = cluster.connect()
# Create checkpointer with async session
# CassandraSaver automatically detects async support
checkpointer = CassandraSaver(session, keyspace='my_checkpoints')
checkpointer.setup() # Setup is still sync
# Build your graph
def echo_bot(state: MessagesState):
user_message = state["messages"][-1]
return {"messages": [AIMessage(content=user_message.content)]}
graph = StateGraph(MessagesState)
graph.add_node("chat", echo_bot)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)
# Compile with checkpointer
app = graph.compile(checkpointer=checkpointer)
# Use async methods with LangGraph
config = {"configurable": {"thread_id": "user-456"}}
result = await app.ainvoke({"messages": [HumanMessage(content="Hello async!")]}, config=config)
# Cleanup
cluster.shutdown()
# Run async code
import asyncio
asyncio.run(main())
Note: Async operations require the cassandra-asyncio-driver package. Install with:
pip install cassandra-asyncio-driver
Schema
The checkpointer creates two tables in your Cassandra keyspace:
checkpoints table
Stores checkpoint data with the following schema:
CREATE TABLE checkpoints (
thread_id TEXT,
checkpoint_ns TEXT,
checkpoint_id UUID, -- Always UUIDv6
parent_checkpoint_id UUID, -- Always UUIDv6
type TEXT,
checkpoint BLOB,
metadata BLOB,
PRIMARY KEY ((thread_id, checkpoint_ns), checkpoint_id)
) WITH CLUSTERING ORDER BY (checkpoint_id DESC);
checkpoint_writes table
Stores pending writes for checkpoints:
CREATE TABLE checkpoint_writes (
thread_id TEXT,
checkpoint_ns TEXT,
checkpoint_id UUID, -- Always UUIDv6
task_id TEXT,
task_path TEXT,
idx INT,
channel TEXT,
type TEXT,
value BLOB,
PRIMARY KEY ((thread_id, checkpoint_ns, checkpoint_id), task_id, idx)
);
Advanced Features
Queryable Metadata (Server-Side Filtering)
For efficient filtering on specific metadata fields, you can designate fields as "queryable" when creating the checkpointer. This creates dedicated columns for fast filtering, with optional SAI (Storage Attached Index) indexes for maximum performance.
# Configure queryable metadata fields
checkpointer = CassandraSaver(
session,
keyspace='my_checkpoints',
queryable_metadata={
"user_id": str, # Text field for user IDs
"step": int, # Integer field for step numbers
"source": str, # Text field for source tracking
"tags": list[str], # List of string tags
"attributes": dict[str, str], # Key-value attributes
},
indexed_metadata=["user_id", "source"] # Only index these fields (optional)
)
checkpointer.setup()
# Now you can filter efficiently on these fields
config = {"configurable": {"thread_id": "my-thread"}}
# Filter by user_id (server-side with SAI index, very fast)
user_checkpoints = list(checkpointer.list(
config,
filter={"user_id": "user-123"}
))
# Filter by multiple fields (server-side)
specific_checkpoints = list(checkpointer.list(
config,
filter={"user_id": "user-123", "source": "input"}
))
# Filter on list field with CONTAINS (checks if value is in list)
tagged_checkpoints = list(checkpointer.list(
config,
filter={"tags": "python"} # Matches checkpoints where "python" is in tags list
))
# Filter on dict field (checks if key-value pair exists)
prod_checkpoints = list(checkpointer.list(
config,
filter={"attributes": {"env": "prod"}} # Matches where attributes["env"] = "prod"
))
# Filter on dict field by value only (checks if value exists in any key)
us_checkpoints = list(checkpointer.list(
config,
filter={"attributes": "us-east"} # Matches where any value = "us-east"
))
# Mix queryable and non-queryable filters
# Queryable fields use server-side filtering (fast)
# Non-queryable fields use client-side filtering (slower)
mixed = list(checkpointer.list(
config,
filter={
"user_id": "user-123", # Server-side (queryable with index)
"step": 5, # Server-side (queryable without index, uses ALLOW FILTERING)
"custom_field": "value" # Client-side (not queryable)
}
))
Supported types for queryable metadata:
str- Text valuesint- Integer valuesfloat- Floating point valuesbool- Boolean valuesdict[K, V]- Dictionary/map values (supports key-value and value-only filtering)- Examples:
dict[str, int],dict[str, str],dict[str, bool]
- Examples:
list[T]- List values (supports CONTAINS operator)- Examples:
list[int],list[str],list[float]
- Examples:
set[T]- Set values (supports CONTAINS operator)- Examples:
set[str],set[int],set[float]
- Examples:
Index management with indexed_metadata:
By default, all queryable metadata fields get SAI indexes for maximum query performance:
checkpointer = CassandraSaver(
session,
queryable_metadata={
"user_id": str,
"source": str,
"step": int,
}
# All three fields will be indexed (default behavior)
)
To reduce storage overhead, use indexed_metadata to index only frequently-queried fields:
checkpointer = CassandraSaver(
session,
queryable_metadata={
"user_id": str, # Will be indexed (in indexed_metadata)
"source": str, # Will be indexed (in indexed_metadata)
"step": int, # NOT indexed (queryable but slower)
"debug_info": str, # NOT indexed (queryable but slower)
},
indexed_metadata=["user_id", "source"] # Only index these two
)
- Indexed fields: Fast queries using SAI index
- Non-indexed queryable fields: Still filterable server-side, but uses
ALLOW FILTERING(slower) - This allows many fields to be queryable while only indexing the most important ones
TTL (Time To Live)
Automatically expire old checkpoints:
# Checkpoints will be automatically deleted after 30 days
checkpointer = CassandraSaver(
session,
keyspace='my_checkpoints',
ttl_seconds=2592000 # 30 days
)
checkpointer.setup()
Consistency Levels
Configure read and write consistency for your use case:
from cassandra.query import ConsistencyLevel
# Production: Strong consistency
checkpointer = CassandraSaver(
session,
keyspace='my_checkpoints',
read_consistency=ConsistencyLevel.QUORUM, # Majority of replicas for reads
write_consistency=ConsistencyLevel.QUORUM # Majority of replicas for writes
)
# Default: Balanced consistency (LOCAL_QUORUM)
checkpointer = CassandraSaver(session, keyspace='my_checkpoints')
# Use session default (set read_consistency=None, write_consistency=None)
session.default_consistency_level = ConsistencyLevel.ALL
checkpointer = CassandraSaver(
session,
keyspace='my_checkpoints',
read_consistency=None, # Use session default
write_consistency=None # Use session default
)
Thread ID and Checkpoint ID Types
Choose the data type for thread identifiers:
# Use TEXT (default, most flexible)
checkpointer = CassandraSaver(session, thread_id_type="text")
# Use UUID (enforces UUID format)
checkpointer = CassandraSaver(session, thread_id_type="uuid")
Choose the data type for checkpoint identifiers:
# Use UUID (default, more efficient storage and queries)
checkpointer = CassandraSaver(session, checkpoint_id_type="uuid")
# Use TEXT (stores UUIDv6 as text, useful for compatibility)
checkpointer = CassandraSaver(session, checkpoint_id_type="text")
Note: Checkpoint IDs are always generated as UUIDv6. The checkpoint_id_type parameter only affects the storage format in Cassandra (native UUID vs TEXT column).
Development
See DEVELOPMENT.md for information on setting up a development environment, running tests, and contributing.
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file langgraph_checkpoint_cassandra-0.1.0.tar.gz.
File metadata
- Download URL: langgraph_checkpoint_cassandra-0.1.0.tar.gz
- Upload date:
- Size: 17.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9167a09f6349178d30ab780d0764fe9b27a0fdc5d2d6a811c47d360e15f1a182
|
|
| MD5 |
9cb5502d27be78d8d9785dc42261695c
|
|
| BLAKE2b-256 |
7aec25605415ccc0156e696d1556ce8fe7d9aeeee5f128749af288b2a5374d56
|
File details
Details for the file langgraph_checkpoint_cassandra-0.1.0-py3-none-any.whl.
File metadata
- Download URL: langgraph_checkpoint_cassandra-0.1.0-py3-none-any.whl
- Upload date:
- Size: 19.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f51877bc0ba5af3b99677897420c1aabcf56c47e3a91bdbeb6d2837ef682c31c
|
|
| MD5 |
d8f316c3ca3c251da716dc38452a0c19
|
|
| BLAKE2b-256 |
4258b78e071ae7c0960e2bab12a266cc193b007bd24a9a597efa6a83818d0875
|