
LangGraph checkpointer for Cassandra


LangGraph Checkpoint Cassandra

An implementation of the LangGraph CheckpointSaver interface backed by Apache Cassandra.

Installation

pip install langgraph-checkpoint-cassandra

For async operations, also install the async Cassandra driver:

pip install cassandra-asyncio-driver

Usage

Important Note

When using the Cassandra checkpointer for the first time, call .setup() to create the required tables.

Synchronous Usage

from cassandra.cluster import Cluster
from langgraph_checkpoint_cassandra import CassandraSaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.messages import HumanMessage, AIMessage

# Connect to Cassandra
cluster = Cluster(['localhost'])
session = cluster.connect()

# Create checkpointer and setup schema
checkpointer = CassandraSaver(session, keyspace='my_checkpoints')
checkpointer.setup()  # Call this the first time to create tables

# Simple echo function
def echo_bot(state: MessagesState):
    # Get the last message and echo it back
    user_message = state["messages"][-1]
    return {"messages": [AIMessage(content=user_message.content)]}

# Build your graph
graph = StateGraph(MessagesState)
graph.add_node("chat", echo_bot)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)

# Compile with checkpointer
app = graph.compile(checkpointer=checkpointer)

# Use with different threads
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke({"messages": [HumanMessage(content="Hello!")]}, config=config)

# Cleanup
cluster.shutdown()

Asynchronous Usage

For high-concurrency workloads such as web servers, pass CassandraSaver a session created with the async driver:

from cassandra_asyncio.cluster import Cluster
from langgraph_checkpoint_cassandra import CassandraSaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.messages import HumanMessage, AIMessage

async def main():
    # Connect to Cassandra using async driver
    cluster = Cluster(['localhost'])
    session = cluster.connect()

    # Create checkpointer with async session
    # CassandraSaver automatically detects async support
    checkpointer = CassandraSaver(session, keyspace='my_checkpoints')
    checkpointer.setup()  # Setup is still sync

    # Build your graph
    def echo_bot(state: MessagesState):
        user_message = state["messages"][-1]
        return {"messages": [AIMessage(content=user_message.content)]}

    graph = StateGraph(MessagesState)
    graph.add_node("chat", echo_bot)
    graph.add_edge(START, "chat")
    graph.add_edge("chat", END)

    # Compile with checkpointer
    app = graph.compile(checkpointer=checkpointer)

    # Use async methods with LangGraph
    config = {"configurable": {"thread_id": "user-456"}}
    result = await app.ainvoke({"messages": [HumanMessage(content="Hello async!")]}, config=config)

    # Cleanup
    cluster.shutdown()

# Run the async example
import asyncio

if __name__ == "__main__":
    asyncio.run(main())

Note: Async operations require the cassandra-asyncio-driver package. Install with:

pip install cassandra-asyncio-driver

Schema

The checkpointer creates two tables in your Cassandra keyspace:

checkpoints table

Stores checkpoint data. The schema below reflects the default identifier types (thread_id_type="text", checkpoint_id_type="uuid"):

CREATE TABLE checkpoints (
    thread_id TEXT,
    checkpoint_ns TEXT,
    checkpoint_id UUID,         -- Always UUIDv6
    parent_checkpoint_id UUID,  -- Always UUIDv6
    type TEXT,
    checkpoint BLOB,
    metadata BLOB,
    PRIMARY KEY ((thread_id, checkpoint_ns), checkpoint_id)
) WITH CLUSTERING ORDER BY (checkpoint_id DESC);
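Clustering by checkpoint_id in descending order returns the newest checkpoint first within each (thread_id, checkpoint_ns) partition. This works because a UUIDv6 stores its timestamp in the most significant bits. The sketch below builds a UUIDv6 by hand following the RFC 9562 bit layout (uuid6_from_unix_ns is a hypothetical helper, not necessarily the generator LangGraph or this package uses) and checks that integer, byte, and text ordering all follow creation time, assuming the column compares version-6 UUIDs by their raw bytes.

```python
import uuid

# 100 ns ticks between the Gregorian epoch (1582-10-15) and the Unix epoch (1970-01-01).
GREGORIAN_TO_UNIX_TICKS = 0x01B21DD213814000


def uuid6_from_unix_ns(unix_ns: int, clock_seq: int = 0, node: int = 0) -> uuid.UUID:
    """Build a UUIDv6 (RFC 9562 layout) for a Unix time given in nanoseconds."""
    ticks = (unix_ns // 100 + GREGORIAN_TO_UNIX_TICKS) & ((1 << 60) - 1)
    time_high = ticks >> 28             # most significant 32 bits of the 60-bit timestamp
    time_mid = (ticks >> 12) & 0xFFFF   # next 16 bits
    time_low = ticks & 0x0FFF           # least significant 12 bits
    value = (
        (time_high << 96)
        | (time_mid << 80)
        | (0x6 << 76)                   # version 6
        | (time_low << 64)
        | (0b10 << 62)                  # RFC variant bits
        | ((clock_seq & 0x3FFF) << 48)
        | (node & 0xFFFFFFFFFFFF)
    )
    return uuid.UUID(int=value)


# Three IDs created 100 ns apart (hypothetical timestamps).
base = 1_700_000_000_000_000_000
ids = [uuid6_from_unix_ns(base + step * 100) for step in range(3)]

# Descending byte order, like CLUSTERING ORDER BY (checkpoint_id DESC): newest first.
newest_first = sorted(ids, key=lambda u: u.bytes, reverse=True)
print(newest_first[0] == ids[-1])                             # True
print(sorted(str(u) for u in ids) == [str(u) for u in ids])   # True: text order matches too
```

The last check is why checkpoint_id_type="text" keeps the same ordering: the canonical lowercase hex form sorts exactly like the underlying 128-bit value.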

checkpoint_writes table

Stores pending writes for checkpoints (again shown with the default identifier types):

CREATE TABLE checkpoint_writes (
    thread_id TEXT,
    checkpoint_ns TEXT,
    checkpoint_id UUID,         -- Always UUIDv6
    task_id TEXT,
    task_path TEXT,
    idx INT,
    channel TEXT,
    type TEXT,
    value BLOB,
    PRIMARY KEY ((thread_id, checkpoint_ns, checkpoint_id), task_id, idx)
);
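Because the partition key of checkpoint_writes is (thread_id, checkpoint_ns, checkpoint_id), all pending writes for one checkpoint sit in a single partition, sorted by the clustering columns (task_id, idx). The toy model below mimics that layout with plain dictionaries to show what a single-partition read returns. WritesTableModel and PendingWrite are hypothetical names, task_path is omitted, and the "cp-1" strings stand in for real UUIDv6 checkpoint IDs.

```python
from collections import defaultdict
from typing import NamedTuple

PartitionKey = tuple[str, str, str]   # (thread_id, checkpoint_ns, checkpoint_id)
ClusteringKey = tuple[str, int]       # (task_id, idx)


class PendingWrite(NamedTuple):
    task_id: str
    idx: int
    channel: str
    value: bytes


class WritesTableModel:
    """Hypothetical in-memory stand-in for the checkpoint_writes table."""

    def __init__(self) -> None:
        # partition key -> {clustering key -> row}
        self._partitions: dict[PartitionKey, dict[ClusteringKey, PendingWrite]] = defaultdict(dict)

    def put(self, thread_id: str, checkpoint_ns: str, checkpoint_id: str, write: PendingWrite) -> None:
        partition = self._partitions[(thread_id, checkpoint_ns, checkpoint_id)]
        # Same (task_id, idx) replaces the existing row, like a Cassandra upsert.
        partition[(write.task_id, write.idx)] = write

    def read_partition(self, thread_id: str, checkpoint_ns: str, checkpoint_id: str) -> list[PendingWrite]:
        partition = self._partitions.get((thread_id, checkpoint_ns, checkpoint_id), {})
        # Rows come back sorted by the clustering columns (task_id ASC, idx ASC).
        return [partition[key] for key in sorted(partition)]


table = WritesTableModel()
table.put("user-123", "", "cp-1", PendingWrite("task-b", 0, "messages", b"..."))
table.put("user-123", "", "cp-1", PendingWrite("task-a", 1, "messages", b"..."))
table.put("user-123", "", "cp-1", PendingWrite("task-a", 0, "messages", b"..."))
table.put("user-123", "", "cp-2", PendingWrite("task-a", 0, "messages", b"..."))

rows = table.read_partition("user-123", "", "cp-1")
print([(r.task_id, r.idx) for r in rows])  # [('task-a', 0), ('task-a', 1), ('task-b', 0)]
```

Writes for cp-2 never appear in the cp-1 read, mirroring how the real table keeps each checkpoint's writes in its own partition.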

Advanced Features

Queryable Metadata (Server-Side Filtering)

For efficient filtering on specific metadata fields, declare them as "queryable" when creating the checkpointer. Each queryable field is stored in a dedicated column so Cassandra can filter on it server-side. By default each of these columns also gets an SAI (Storage-Attached Indexing) index; indexed_metadata restricts indexing to a subset of fields.

# Configure queryable metadata fields
checkpointer = CassandraSaver(
    session,
    keyspace='my_checkpoints',
    queryable_metadata={
        "user_id": str,               # Text field for user IDs
        "step": int,                  # Integer field for step numbers
        "source": str,                # Text field for source tracking
        "tags": list[str],            # List of string tags
        "attributes": dict[str, str], # Key-value attributes
    },
    indexed_metadata=["user_id", "source"]  # Only index these fields (optional)
)
checkpointer.setup()

# Now you can filter efficiently on these fields
config = {"configurable": {"thread_id": "my-thread"}}

# Filter by user_id (server-side with SAI index, very fast)
user_checkpoints = list(checkpointer.list(
    config,
    filter={"user_id": "user-123"}
))

# Filter by multiple fields (server-side)
specific_checkpoints = list(checkpointer.list(
    config,
    filter={"user_id": "user-123", "source": "input"}
))

# Filter on list field with CONTAINS (checks if value is in list)
tagged_checkpoints = list(checkpointer.list(
    config,
    filter={"tags": "python"}  # Matches checkpoints where "python" is in tags list
))

# Filter on dict field (checks if key-value pair exists)
prod_checkpoints = list(checkpointer.list(
    config,
    filter={"attributes": {"env": "prod"}}  # Matches where attributes["env"] = "prod"
))

# Filter on dict field by value only (checks if value exists in any key)
us_checkpoints = list(checkpointer.list(
    config,
    filter={"attributes": "us-east"}  # Matches where any value = "us-east"
))

# Mix queryable and non-queryable filters
# Queryable fields use server-side filtering (fast)
# Non-queryable fields use client-side filtering (slower)
mixed = list(checkpointer.list(
    config,
    filter={
        "user_id": "user-123",        # Server-side (queryable with index)
        "step": 5,                    # Server-side (queryable without index, uses ALLOW FILTERING)
        "custom_field": "value"       # Client-side (not queryable)
    }
))
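The comments above describe a split between server-side and client-side filtering. The sketch below shows one way such a split could work: queryable fields become CQL conditions with ? bind markers (= for scalars, CONTAINS for list and set elements and for map values, map[key] = value for dict entries), and all other fields are checked in Python after the rows come back. split_filter and matches_client_side are hypothetical names, and the CQL text is an assumption for illustration, not the package's actual query builder.

```python
from typing import Any, get_origin


def split_filter(
    filter_: dict[str, Any], queryable: dict[str, Any]
) -> tuple[list[str], list[Any], dict[str, Any]]:
    """Return (CQL conditions, bound parameters, client-side filter)."""
    conditions: list[str] = []
    params: list[Any] = []
    client_side: dict[str, Any] = {}
    for field, wanted in filter_.items():
        if field not in queryable:
            client_side[field] = wanted               # not queryable: checked in Python
            continue
        origin = get_origin(queryable[field]) or queryable[field]
        if origin is dict and isinstance(wanted, dict):
            for key, value in wanted.items():         # key-value match, e.g. attributes['env'] = 'prod'
                conditions.append(f"{field}[?] = ?")
                params.extend([key, value])
        elif origin in (list, set, dict):
            conditions.append(f"{field} CONTAINS ?")  # list/set element or map value match
            params.append(wanted)
        else:
            conditions.append(f"{field} = ?")         # scalar equality
            params.append(wanted)
    return conditions, params, client_side


def matches_client_side(metadata: dict[str, Any], client_side: dict[str, Any]) -> bool:
    """Exact-match check for the fields Cassandra could not filter."""
    return all(metadata.get(k) == v for k, v in client_side.items())


queryable = {"user_id": str, "step": int, "tags": list[str], "attributes": dict[str, str]}
conds, params, rest = split_filter(
    {"user_id": "user-123", "attributes": {"env": "prod"}, "tags": "python", "custom_field": "value"},
    queryable,
)
print(" AND ".join(conds))  # user_id = ? AND attributes[?] = ? AND tags CONTAINS ?
print(params)               # ['user-123', 'env', 'prod', 'python']
print(rest)                 # {'custom_field': 'value'}
```

Only rows that survive the server-side conditions would need the client-side check, which is why mixing in a non-queryable field costs less than filtering every field in Python.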

Supported types for queryable metadata:

  • str - Text values
  • int - Integer values
  • float - Floating point values
  • bool - Boolean values
  • dict[K, V] - Dictionary/map values (supports key-value and value-only filtering)
    • Examples: dict[str, int], dict[str, str], dict[str, bool]
  • list[T] - List values (supports CONTAINS operator)
    • Examples: list[int], list[str], list[float]
  • set[T] - Set values (supports CONTAINS operator)
    • Examples: set[str], set[int], set[float]
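Each queryable field needs a CQL column type. A minimal sketch of how the supported Python annotations could be translated, using typing.get_origin and typing.get_args; cql_type is a hypothetical helper, and the scalar choices (bigint for int, double for float) are assumptions that may differ from the columns setup() actually creates.

```python
from typing import Any, get_args, get_origin

# Assumed scalar mapping; the package may choose different CQL types.
SCALARS = {str: "text", int: "bigint", float: "double", bool: "boolean"}


def cql_type(annotation: Any) -> str:
    """Translate a supported queryable_metadata annotation into a CQL type string."""
    if annotation in SCALARS:
        return SCALARS[annotation]
    origin, args = get_origin(annotation), get_args(annotation)
    if origin is dict and len(args) == 2:
        return f"map<{cql_type(args[0])}, {cql_type(args[1])}>"
    if origin in (list, set) and len(args) == 1:
        return f"{origin.__name__}<{cql_type(args[0])}>"
    raise TypeError(f"unsupported queryable metadata type: {annotation!r}")


print(cql_type(dict[str, int]))  # map<text, bigint>
print(cql_type(set[float]))      # set<double>
```

Rejecting anything outside the list above (tuples, nested collections) early keeps schema creation from failing halfway through setup().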

Index management with indexed_metadata:

By default, all queryable metadata fields get SAI indexes for maximum query performance:

checkpointer = CassandraSaver(
    session,
    queryable_metadata={
        "user_id": str,
        "source": str,
        "step": int,
    }
    # All three fields will be indexed (default behavior)
)

To reduce storage overhead, use indexed_metadata to index only frequently-queried fields:

checkpointer = CassandraSaver(
    session,
    queryable_metadata={
        "user_id": str,      # Will be indexed (in indexed_metadata)
        "source": str,       # Will be indexed (in indexed_metadata)
        "step": int,         # NOT indexed (queryable but slower)
        "debug_info": str,   # NOT indexed (queryable but slower)
    },
    indexed_metadata=["user_id", "source"]  # Only index these two
)

  • Indexed fields: fast queries served by the SAI index.
  • Non-indexed queryable fields: still filtered server-side, but the query needs ALLOW FILTERING, which is slower.
  • This lets many fields be queryable while only the most frequently queried ones carry an index.
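The trade-off in the list above can be made concrete. The sketch below generates SAI index statements only for fields listed in indexed_metadata, and appends ALLOW FILTERING to a single-partition query whenever it filters on a queryable field that has no index. The function names, the table name, and the {table}_{field}_idx naming scheme are hypothetical; the CREATE CUSTOM INDEX ... USING 'StorageAttachedIndex' form is standard CQL for SAI, but the exact DDL the package emits is not documented here.

```python
def sai_index_statements(keyspace: str, table: str, indexed: list[str]) -> list[str]:
    """One SAI index per field in indexed_metadata (hypothetical naming scheme)."""
    return [
        f"CREATE CUSTOM INDEX IF NOT EXISTS {table}_{field}_idx "
        f"ON {keyspace}.{table} ({field}) USING 'StorageAttachedIndex'"
        for field in indexed
    ]


def select_checkpoints(table: str, server_fields: list[str], indexed: list[str]) -> str:
    """Build a single-partition SELECT; any unindexed filter forces ALLOW FILTERING."""
    where = ["thread_id = ?", "checkpoint_ns = ?"] + [f"{field} = ?" for field in server_fields]
    query = f"SELECT * FROM {table} WHERE " + " AND ".join(where)
    if any(field not in indexed for field in server_fields):
        query += " ALLOW FILTERING"
    return query


indexed = ["user_id", "source"]
print(sai_index_statements("my_checkpoints", "checkpoints", indexed)[0])
# CREATE CUSTOM INDEX IF NOT EXISTS checkpoints_user_id_idx ON my_checkpoints.checkpoints (user_id) USING 'StorageAttachedIndex'
print(select_checkpoints("checkpoints", ["user_id"], indexed))
# SELECT * FROM checkpoints WHERE thread_id = ? AND checkpoint_ns = ? AND user_id = ?
print(select_checkpoints("checkpoints", ["user_id", "step"], indexed))
# SELECT * FROM checkpoints WHERE thread_id = ? AND checkpoint_ns = ? AND user_id = ? AND step = ? ALLOW FILTERING
```

Restricting by thread_id and checkpoint_ns mirrors the list(config, ...) calls above, which always pass a thread_id.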

TTL (Time To Live)

Automatically expire old checkpoints:

# Checkpoints will be automatically deleted after 30 days
checkpointer = CassandraSaver(
    session,
    keyspace='my_checkpoints',
    ttl_seconds=2592000  # 30 days
)
checkpointer.setup()
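The ttl_seconds value is the retention period expressed in seconds: 30 × 86,400 = 2,592,000. In CQL a TTL is attached to each write (INSERT ... USING TTL), so each checkpoint row presumably expires that long after it was written. The helper below derives the value from a datetime.timedelta and rejects values above Cassandra's 20-year TTL ceiling (630,720,000 seconds); ttl_seconds as a function and the INSERT text are illustrative assumptions, not the package's code.

```python
from datetime import timedelta

# Cassandra rejects TTLs above 20 years (20 * 365 days).
MAX_CASSANDRA_TTL = 20 * 365 * 24 * 60 * 60  # 630,720,000 seconds


def ttl_seconds(retention: timedelta) -> int:
    """Convert a retention period into a whole-second TTL for the ttl_seconds argument."""
    seconds = int(retention.total_seconds())
    if not 0 < seconds <= MAX_CASSANDRA_TTL:
        raise ValueError(f"TTL must be between 1 and {MAX_CASSANDRA_TTL} seconds, got {seconds}")
    return seconds


ttl = ttl_seconds(timedelta(days=30))
print(ttl)  # 2592000

# Illustrative shape of a TTL'd write (columns abbreviated, values bound with ?).
insert = (
    "INSERT INTO checkpoints (thread_id, checkpoint_ns, checkpoint_id, checkpoint) "
    f"VALUES (?, ?, ?, ?) USING TTL {ttl}"
)
```

The result can be passed directly as ttl_seconds=ttl_seconds(timedelta(days=30)) in the CassandraSaver call above instead of a hand-computed literal.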

Consistency Levels

Configure read and write consistency for your use case:

from cassandra.query import ConsistencyLevel

# Production: Strong consistency
checkpointer = CassandraSaver(
    session,
    keyspace='my_checkpoints',
    read_consistency=ConsistencyLevel.QUORUM,      # Majority of replicas for reads
    write_consistency=ConsistencyLevel.QUORUM      # Majority of replicas for writes
)

# Default: Balanced consistency (LOCAL_QUORUM)
checkpointer = CassandraSaver(session, keyspace='my_checkpoints')

# Use session default (set read_consistency=None, write_consistency=None)
session.default_consistency_level = ConsistencyLevel.ALL
checkpointer = CassandraSaver(
    session,
    keyspace='my_checkpoints',
    read_consistency=None,   # Use session default
    write_consistency=None   # Use session default
)
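Why QUORUM reads plus QUORUM writes count as strong consistency comes down to replica arithmetic: a quorum is floor(RF / 2) + 1 replicas, and a read is guaranteed to overlap the latest acknowledged write when R + W > RF. LOCAL_QUORUM applies the same formula to the replicas in the local datacenter only, so the guarantee holds within that datacenter. A short check of the rule in plain Python; the replication factor of 3 is an example value, not something this package sets.

```python
def quorum(replication_factor: int) -> int:
    """Replicas that must respond to a QUORUM (or, per datacenter, LOCAL_QUORUM) request."""
    return replication_factor // 2 + 1


def reads_see_latest_write(read_replicas: int, write_replicas: int, replication_factor: int) -> bool:
    """True when every read replica set overlaps every write replica set (R + W > RF)."""
    return read_replicas + write_replicas > replication_factor


rf = 3
q = quorum(rf)
print(q)                                   # 2
print(reads_see_latest_write(q, q, rf))    # True: QUORUM / QUORUM
print(reads_see_latest_write(1, 1, rf))    # False: ONE / ONE
print(reads_see_latest_write(1, rf, rf))   # True: ONE reads after ALL writes
```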

Thread ID and Checkpoint ID Types

Choose the data type for thread identifiers:

# Use TEXT (default, most flexible)
checkpointer = CassandraSaver(session, thread_id_type="text")

# Use UUID (enforces UUID format)
checkpointer = CassandraSaver(session, thread_id_type="uuid")
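With thread_id_type="uuid", a thread ID like the "user-123" used in the earlier examples would no longer be accepted. A small sketch that builds the LangGraph config with either a validated or a freshly generated UUID thread ID; make_thread_config is a hypothetical helper, and whether the saver expects a uuid.UUID object or its canonical string form is an assumption to verify against the package.

```python
import uuid
from typing import Optional


def make_thread_config(thread_id: Optional[str] = None) -> dict:
    """Return a LangGraph config whose thread_id is a canonical UUID string."""
    if thread_id is None:
        thread_id = str(uuid.uuid4())            # fresh random thread ID
    else:
        thread_id = str(uuid.UUID(thread_id))    # raises ValueError if not a UUID
    return {"configurable": {"thread_id": thread_id}}


config = make_thread_config("6F9619FF-8B86-D011-B42D-00C04FC964FF")
print(config["configurable"]["thread_id"])  # 6f9619ff-8b86-d011-b42d-00c04fc964ff
```

Normalizing to the lowercase canonical form means the same thread always maps to the same partition, regardless of how the caller formatted the ID.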

Choose the data type for checkpoint identifiers:

# Use UUID (default, more efficient storage and queries)
checkpointer = CassandraSaver(session, checkpoint_id_type="uuid")

# Use TEXT (stores UUIDv6 as text, useful for compatibility)
checkpointer = CassandraSaver(session, checkpoint_id_type="text")

Note: Checkpoint IDs are always generated as UUIDv6. The checkpoint_id_type parameter only affects the storage format in Cassandra (native UUID vs TEXT column).

Development

See DEVELOPMENT.md for information on setting up a development environment, running tests, and contributing.

License

MIT


Download files


Source Distribution

langgraph_checkpoint_cassandra-0.1.0.tar.gz (17.2 kB)

Built Distribution


langgraph_checkpoint_cassandra-0.1.0-py3-none-any.whl (19.3 kB, Python 3)

File details

Hashes for langgraph_checkpoint_cassandra-0.1.0.tar.gz:

  • SHA256: 9167a09f6349178d30ab780d0764fe9b27a0fdc5d2d6a811c47d360e15f1a182
  • MD5: 9cb5502d27be78d8d9785dc42261695c
  • BLAKE2b-256: 7aec25605415ccc0156e696d1556ce8fe7d9aeeee5f128749af288b2a5374d56

Hashes for langgraph_checkpoint_cassandra-0.1.0-py3-none-any.whl:

  • SHA256: f51877bc0ba5af3b99677897420c1aabcf56c47e3a91bdbeb6d2837ef682c31c
  • MD5: d8f316c3ca3c251da716dc38452a0c19
  • BLAKE2b-256: 4258b78e071ae7c0960e2bab12a266cc193b007bd24a9a597efa6a83818d0875
