LangGraph checkpointer for Cassandra

LangGraph Checkpoint Cassandra

A LangGraph CheckpointSaver implementation backed by Apache Cassandra.

Installation

pip install langgraph-checkpoint-cassandra

Usage

Important Note

When using the Cassandra checkpointer for the first time, call .setup() to create the required tables.

Synchronous Usage

from cassandra.cluster import Cluster
from langgraph_checkpoint_cassandra import CassandraSaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.messages import HumanMessage, AIMessage

# Connect to Cassandra
cluster = Cluster(['localhost'])
session = cluster.connect()

# Create checkpointer and setup schema
checkpointer = CassandraSaver(session, keyspace='my_checkpoints')
checkpointer.setup()  # Call this the first time to create tables

# Simple echo function
def echo_bot(state: MessagesState):
    # Get the last message and echo it back
    user_message = state["messages"][-1]
    return {"messages": [AIMessage(content=user_message.content)]}

# Build your graph
graph = StateGraph(MessagesState)
graph.add_node("chat", echo_bot)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)

# Compile with checkpointer
app = graph.compile(checkpointer=checkpointer)

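# Each thread_id keeps its own persisted conversation history
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke({"messages": [HumanMessage(content="Hello!")]}, config=config)
print(result["messages"][-1].content)  # the bot's echo of "Hello!"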

# Cleanup
cluster.shutdown()

Asynchronous Usage

from cassandra_asyncio.cluster import Cluster
from langgraph_checkpoint_cassandra import CassandraSaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.messages import HumanMessage, AIMessage

async def main():
    # Connect to Cassandra using async driver
    cluster = Cluster(['localhost'])
    session = cluster.connect()

    # Create checkpointer with async session
    # CassandraSaver automatically detects async support
    checkpointer = CassandraSaver(session, keyspace='my_checkpoints')
    checkpointer.setup()  # Setup is still sync

    # Build your graph
    def echo_bot(state: MessagesState):
        user_message = state["messages"][-1]
        return {"messages": [AIMessage(content=user_message.content)]}

    graph = StateGraph(MessagesState)
    graph.add_node("chat", echo_bot)
    graph.add_edge(START, "chat")
    graph.add_edge("chat", END)

    # Compile with checkpointer
    app = graph.compile(checkpointer=checkpointer)

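    # Use async methods with LangGraph; each thread_id has its own history
    config = {"configurable": {"thread_id": "user-456"}}
    result = await app.ainvoke({"messages": [HumanMessage(content="Hello async!")]}, config=config)
    print(result["messages"][-1].content)  # the bot's echo of "Hello async!"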

    # Cleanup
    cluster.shutdown()

# Run async code
import asyncio
asyncio.run(main())

Schema

The checkpointer creates two tables in your Cassandra keyspace:

checkpoints table

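Stores checkpoint data. The schema below uses the default column types (thread_id TEXT, checkpoint_id UUID); see Thread ID and Checkpoint ID Types for the alternatives: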

CREATE TABLE checkpoints (
    thread_id TEXT,
    checkpoint_ns TEXT,
    checkpoint_id UUID,         -- Always UUIDv6
    parent_checkpoint_id UUID,  -- Always UUIDv6
    type TEXT,
    checkpoint BLOB,
    metadata BLOB,
    -- Metadata indexing columns for efficient filtering
    metadata_text map<text, text>,
    metadata_int map<text, bigint>,
    metadata_double map<text, double>,
    metadata_bool map<text, boolean>,
    metadata_null set<text>,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
) WITH CLUSTERING ORDER BY (checkpoint_ns ASC, checkpoint_id DESC);

The table includes SAI (Storage Attached Index) indexes on the metadata collections. SAI is available in Apache Cassandra 5.0 and later:

CREATE CUSTOM INDEX checkpoints_metadata_text_idx ON checkpoints (ENTRIES(metadata_text)) USING 'StorageAttachedIndex';
CREATE CUSTOM INDEX checkpoints_metadata_int_idx ON checkpoints (ENTRIES(metadata_int)) USING 'StorageAttachedIndex';
CREATE CUSTOM INDEX checkpoints_metadata_double_idx ON checkpoints (ENTRIES(metadata_double)) USING 'StorageAttachedIndex';
CREATE CUSTOM INDEX checkpoints_metadata_bool_idx ON checkpoints (ENTRIES(metadata_bool)) USING 'StorageAttachedIndex';
CREATE CUSTOM INDEX checkpoints_metadata_null_idx ON checkpoints (VALUES(metadata_null)) USING 'StorageAttachedIndex';
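
These indexes suggest how a metadata filter can be pushed down to Cassandra. The sketch below is a hypothetical helper (build_metadata_where and cql_text_literal are not part of langgraph-checkpoint-cassandra) that assumes the filter keys are already flattened to dotted paths. It routes each value to the map column of matching type and emits the predicate shapes these indexes can serve: map['key'] = ? for the ENTRIES indexes and CONTAINS for the VALUES index on metadata_null. The package's real queries may be built differently.

```python
from typing import Any


def cql_text_literal(text: str) -> str:
    # CQL string literals are single-quoted; an embedded quote is doubled.
    return "'" + text.replace("'", "''") + "'"


def build_metadata_where(flat_filter: dict[str, Any]) -> tuple[str, list[Any]]:
    """Turn a flattened metadata filter into CQL predicates plus bind values.

    Hypothetical helper: each value is routed to the typed column whose SAI
    index can serve it.
    """
    clauses: list[str] = []
    values: list[Any] = []
    for key, value in flat_filter.items():
        if value is None:
            # metadata_null is a set<text> indexed on VALUES: test membership.
            clauses.append("metadata_null CONTAINS ?")
            values.append(key)
            continue
        if isinstance(value, bool):  # check bool first: bool subclasses int
            column = "metadata_bool"
        elif isinstance(value, int):
            column = "metadata_int"
        elif isinstance(value, float):
            column = "metadata_double"
        elif isinstance(value, str):
            column = "metadata_text"
        else:
            raise TypeError(f"no typed column for {key!r}={value!r}")
        # An ENTRIES() index answers equality on a map entry: map['key'] = value.
        clauses.append(f"{column}[{cql_text_literal(key)}] = ?")
        values.append(value)
    return " AND ".join(clauses), values


where, values = build_metadata_where({"user.name": "alice", "user.age": 30})
print(f"SELECT * FROM checkpoints WHERE thread_id = ? AND {where}")
```

Keys are inlined as escaped literals and only the values are bound, which keeps the sketch independent of whether a driver accepts bind markers inside map subscripts.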

checkpoint_writes table

Stores pending writes for checkpoints:

CREATE TABLE checkpoint_writes (
    thread_id TEXT,
    checkpoint_ns TEXT,
    checkpoint_id UUID,         -- Always UUIDv6
    task_id TEXT,
    task_path TEXT,
    idx INT,
    channel TEXT,
    type TEXT,
    value BLOB,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
) WITH CLUSTERING ORDER BY (checkpoint_ns ASC, checkpoint_id DESC, task_id ASC, idx ASC);

Advanced Features

Metadata Filtering (Server-Side)

By default, every metadata field is queryable through server-side filtering backed by SAI (Storage Attached Index) indexes. The checkpointer flattens metadata into typed collections (metadata_text, metadata_int, metadata_double, metadata_bool, metadata_null), indexed with ENTRIES for the maps and VALUES for the metadata_null set, so filters are evaluated inside Cassandra.

Key features:

  • Automatic filtering: No need to pre-declare queryable fields
  • Nested metadata: Use dot notation to filter on nested fields
  • Literal dots: Escape literal dots with backslash (\.)
  • Type support: Text, integers, floats, booleans, and null values

For example:

checkpointer = CassandraSaver(
    session,
    keyspace='my_checkpoints'
)
checkpointer.setup()

config = {"configurable": {"thread_id": "my-thread"}}

# Filter by top-level field
user_checkpoints = list(checkpointer.list(
    config,
    filter={"user_id": "user-123"}
))

# Filter by nested field using dot notation
specific_checkpoints = list(checkpointer.list(
    config,
    filter={"user.name": "alice", "user.age": 30}
))

# Filter by multiple fields (AND logic)
filtered = list(checkpointer.list(
    config,
    filter={"source": "loop", "step": 5}
))

# Filter on keys with literal dots (use backslash escape)
# For metadata {"file.txt": "content"}
file_checkpoints = list(checkpointer.list(
    config,
    filter={"file\\.txt": "content"}
))

# For nested metadata {"config": {"file.txt": "content"}}
# Navigation dot is unescaped, literal dot is escaped
config_checkpoints = list(checkpointer.list(
    config,
    filter={"config.file\\.txt": "content"}
))

Supported types for metadata filtering:

  • str - Text values (stored in metadata_text map)
  • int - Integer values (stored in metadata_int map)
  • float - Floating point values (stored in metadata_double map)
  • bool - Boolean values (stored in metadata_bool map)
  • None - Null values (stored in metadata_null set)
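
To see how a metadata dict might land in these five columns, here is a sketch that flattens nested dicts into dotted keys (escaping literal dots so they match the filter syntax above) and routes each scalar by type. flatten_metadata is a hypothetical helper; the package's own flattening may differ in detail, for example in how it treats lists, which this sketch simply skips.

```python
from typing import Any


def escape_segment(segment: str) -> str:
    # A literal dot in a key is escaped so it is not read as nesting.
    return segment.replace(".", "\\.")


def flatten_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
    """Route scalar metadata values into typed maps keyed by dotted paths."""
    out: dict[str, Any] = {
        "metadata_text": {},
        "metadata_int": {},
        "metadata_double": {},
        "metadata_bool": {},
        "metadata_null": set(),
    }

    def walk(node: dict[str, Any], path: str) -> None:
        for key, value in node.items():
            full = f"{path}.{escape_segment(key)}" if path else escape_segment(key)
            if isinstance(value, dict):
                walk(value, full)  # nested dicts become dotted paths
            elif value is None:
                out["metadata_null"].add(full)
            elif isinstance(value, bool):  # before int: bool subclasses int
                out["metadata_bool"][full] = value
            elif isinstance(value, int):
                out["metadata_int"][full] = value
            elif isinstance(value, float):
                out["metadata_double"][full] = value
            elif isinstance(value, str):
                out["metadata_text"][full] = value
            # lists, sets and other types are not indexed in this sketch

    walk(metadata, "")
    return out


flat = flatten_metadata({"source": "loop", "user": {"name": "alice", "age": 30}})
print(flat["metadata_text"], flat["metadata_int"])
```

The bool check comes before the int check on purpose: isinstance(True, int) is true in Python, so the other order would file booleans under metadata_int.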

Performance Optimization with Include/Exclude Patterns

For large-scale applications, you can control which metadata fields are written to the indexed columns, reducing storage and write overhead, with the metadata_includes and metadata_excludes parameters (both default to None):

# Only index user-related fields and step counter
checkpointer = CassandraSaver(
    session,
    keyspace='my_checkpoints',
    metadata_includes=["user.*", "step"]  # Only these fields will be indexed
)
checkpointer.setup()

# Alternatively, exclude sensitive fields from indexing
checkpointer = CassandraSaver(
    session,
    keyspace='my_checkpoints',
    metadata_excludes=["*.password", "*.secret", "*.token"]  # These won't be indexed
)
checkpointer.setup()

# Combine both: include user fields but exclude passwords
checkpointer = CassandraSaver(
    session,
    keyspace='my_checkpoints',
    metadata_includes=["user.*", "session.*"],
    metadata_excludes=["*.password", "*.token"]
)
checkpointer.setup()

Pattern matching:

  • Patterns use Unix shell-style wildcards (fnmatch)
  • * matches any sequence of characters, including dots: "user.*" matches user.name and user.age, and also nested paths such as user.address.city
  • ? matches a single character
  • [seq] matches any character in seq
  • Exact matches: "step" matches only the field step

Priority rules:

  1. If metadata_includes is specified, only fields matching at least one pattern are indexed
  2. Then, metadata_excludes removes matching fields (even if they matched an include pattern)
  3. If both are None (default), all fields are indexed
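
The priority rules can be written as a single predicate. This sketch assumes patterns are applied to the flattened, dotted field name; should_index is a hypothetical helper, and it uses fnmatch.fnmatchcase (case-sensitive on every OS) where the package may use fnmatch.fnmatch, which normalizes case on Windows.

```python
from fnmatch import fnmatchcase
from typing import Optional, Sequence


def should_index(
    field: str,
    includes: Optional[Sequence[str]] = None,
    excludes: Optional[Sequence[str]] = None,
) -> bool:
    """Apply the include/exclude priority rules to one flattened field name."""
    # Rule 1: with includes set, the field must match at least one include.
    if includes is not None and not any(fnmatchcase(field, p) for p in includes):
        return False
    # Rule 2: excludes win, even over a matching include pattern.
    if excludes is not None and any(fnmatchcase(field, p) for p in excludes):
        return False
    # Rule 3: with neither set, every field is indexed.
    return True


inc = ["user.*", "session.*"]
exc = ["*.password", "*.token"]
print(should_index("user.name", inc, exc), should_index("user.password", inc, exc))
```

Two consequences of fnmatch semantics are worth checking against your own patterns: "*.password" requires a dot, so a top-level password field is not excluded by it, and "user.*" reaches arbitrarily deep nested paths.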

Important - Server-Side vs Client-Side Filtering:

Fields are automatically split into two categories when filtering:

  • Indexed fields (server-side): Filtered efficiently in Cassandra using SAI indexes

    • All fields by default, or only those matching metadata_includes patterns
    • Excluded if they match metadata_excludes patterns
    • Evaluated inside Cassandra, so only matching rows are returned; scales to large datasets
  • Non-indexed fields (client-side): Filtered in Python after fetching from database

    • Fields excluded by metadata_excludes patterns
    • Fields not matching metadata_includes patterns
    • Complex types (list, dict, set) - always client-side
    • Still works correctly, but less efficient for large result sets
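
The split described above can be sketched as a partition of the filter followed by an in-Python check on fetched rows. split_filter and matches_client_side are hypothetical helpers; the sketch assumes flattened keys, uses is_indexed as a stand-in for the include/exclude check, and compares values with plain equality.

```python
from typing import Any, Callable


def split_filter(
    flat_filter: dict[str, Any], is_indexed: Callable[[str], bool]
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Partition a filter into server-side (indexed scalar) and client-side parts."""
    scalar = (str, int, float, bool, type(None))
    server: dict[str, Any] = {}
    client: dict[str, Any] = {}
    for key, value in flat_filter.items():
        if isinstance(value, scalar) and is_indexed(key):
            server[key] = value  # can be pushed down to an SAI index
        else:
            client[key] = value  # non-indexed field or complex value
    return server, client


def matches_client_side(flat_metadata: dict[str, Any], client: dict[str, Any]) -> bool:
    # Every remaining condition must hold (AND logic), checked after fetching.
    return all(k in flat_metadata and flat_metadata[k] == v for k, v in client.items())


server, client = split_filter(
    {"user_id": "u1", "step": 5, "tags": ["a", "b"], "secret": "x"},
    is_indexed=lambda key: key != "secret",
)
print(server, client)
```

Only the server part narrows what Cassandra returns; every row that passes it is still transferred before the client part is applied, which is why non-indexed filters cost more on large result sets.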

Performance implications:

  • Server-side filters are applied first, minimizing data transfer
  • Client-side filters are applied to results after fetching
  • Use metadata_includes/metadata_excludes to control which fields are indexed
  • Indexing fewer fields reduces storage overhead and write cost, which matters most for high-cardinality metadata

TTL (Time To Live)

Automatically expire old checkpoints:

# Checkpoints will be automatically deleted after 30 days
checkpointer = CassandraSaver(
    session,
    keyspace='my_checkpoints',
    ttl_seconds=2592000  # 30 days
)
checkpointer.setup()
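
Rather than hard-coding 2592000, the value for ttl_seconds can be derived from a timedelta. ttl_from is a hypothetical helper that only produces the integer passed to ttl_seconds; it also checks Cassandra's upper bound of 20 years (630,720,000 seconds) and, as a choice of this sketch, rejects 0.

```python
from datetime import timedelta

# Cassandra caps a TTL at 20 years (20 * 365 days), expressed in seconds.
MAX_CASSANDRA_TTL = 20 * 365 * 24 * 60 * 60


def ttl_from(delta: timedelta) -> int:
    """Convert a retention period into whole seconds for ttl_seconds."""
    seconds = int(delta.total_seconds())
    if not 0 < seconds <= MAX_CASSANDRA_TTL:
        raise ValueError(f"TTL must be between 1 and {MAX_CASSANDRA_TTL} seconds")
    return seconds


print(ttl_from(timedelta(days=30)))  # 2592000
```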

Consistency Levels

Configure read and write consistency for your use case:

from cassandra.query import ConsistencyLevel

# Production: Strong consistency
checkpointer = CassandraSaver(
    session,
    keyspace='my_checkpoints',
    read_consistency=ConsistencyLevel.QUORUM,      # Majority of replicas for reads
    write_consistency=ConsistencyLevel.QUORUM      # Majority of replicas for writes
)

# Default: Balanced consistency (LOCAL_QUORUM)
checkpointer = CassandraSaver(session, keyspace='my_checkpoints')

# Use session default (set read_consistency=None, write_consistency=None)
session.default_consistency_level = ConsistencyLevel.ALL
checkpointer = CassandraSaver(
    session,
    keyspace='my_checkpoints',
    read_consistency=None,   # Use session default
    write_consistency=None   # Use session default
)
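
The "majority of replicas" comments above come down to a little arithmetic. A quorum is floor(RF / 2) + 1 replicas; QUORUM uses the sum of replication factors across all datacenters, while LOCAL_QUORUM uses only the local datacenter's. Reads are guaranteed to see the latest acknowledged write when the read and write replica sets must overlap, i.e. R + W > RF. The helper names below are illustrative only.

```python
def quorum(replication_factor: int) -> int:
    """Replicas that must respond at QUORUM: a strict majority."""
    return replication_factor // 2 + 1


def is_strongly_consistent(read_replicas: int, write_replicas: int, replication_factor: int) -> bool:
    # Overlapping read and write sets mean a read always reaches a replica
    # that acknowledged the latest write.
    return read_replicas + write_replicas > replication_factor


rf = 3
print(quorum(rf))                                          # 2
print(is_strongly_consistent(quorum(rf), quorum(rf), rf))  # True
print(is_strongly_consistent(1, 1, rf))                    # ONE/ONE: False
```

With the default LOCAL_QUORUM for both reads and writes, the same overlap holds within a single datacenter.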

Thread ID and Checkpoint ID Types

Choose the data type for thread identifiers:

# Use TEXT (default, most flexible)
checkpointer = CassandraSaver(session, thread_id_type="text")

# Use UUID (enforces UUID format)
checkpointer = CassandraSaver(session, thread_id_type="uuid")
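
With thread_id_type="uuid", application identifiers such as "user-123" from the examples above are not valid UUIDs. One option, sketched here as an assumption rather than package behavior, is to derive a stable UUIDv5 from the external id. thread_uuid and THREAD_NAMESPACE are hypothetical, and whether the saver expects a str or a uuid.UUID in config is also an assumption; this passes the canonical string form.

```python
import uuid

# Hypothetical application namespace: any fixed UUID works if it never changes.
THREAD_NAMESPACE = uuid.UUID("6f1c3c1e-6a43-4c1b-9f47-1f0f2b8d1a10")


def thread_uuid(external_id: str) -> str:
    """Return the id unchanged if it is already a UUID, else a stable UUIDv5."""
    try:
        return str(uuid.UUID(external_id))
    except ValueError:
        # Same input and namespace always produce the same UUID.
        return str(uuid.uuid5(THREAD_NAMESPACE, external_id))


config = {"configurable": {"thread_id": thread_uuid("user-123")}}
print(config)
```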

Choose the data type for checkpoint identifiers:

# Use UUID (default, more efficient storage and queries)
checkpointer = CassandraSaver(session, checkpoint_id_type="uuid")

# Use TEXT (stores UUIDv6 as text, useful for compatibility)
checkpointer = CassandraSaver(session, checkpoint_id_type="text")

Note: Checkpoint IDs are always generated as UUIDv6. The checkpoint_id_type parameter only affects the storage format in Cassandra (native UUID vs TEXT column).
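
UUIDv6 stores its 60-bit timestamp most-significant-first, so comparing two ids as integers or as canonical strings follows their creation time. The sketch below builds UUIDv6 values by hand to show that property; make_uuid6 is a hypothetical helper for illustration, not how LangGraph or this package generates checkpoint IDs. Because the string form keeps the same order, storing the ids as TEXT (checkpoint_id_type="text") still sorts chronologically under plain string comparison.

```python
import uuid


def make_uuid6(timestamp: int, clock_seq: int, node: int) -> uuid.UUID:
    """Build a UUIDv6 from a 60-bit timestamp (100 ns ticks since 1582-10-15)."""
    time_high_mid = timestamp >> 12   # most significant 48 bits come first
    time_low = timestamp & 0xFFF      # least significant 12 bits follow the version
    value = (time_high_mid << 80) | (0x6 << 76) | (time_low << 64)
    # RFC variant bits (0b10), then a 14-bit clock sequence and a 48-bit node.
    value |= (0b10 << 62) | ((clock_seq & 0x3FFF) << 48) | (node & 0xFFFFFFFFFFFF)
    return uuid.UUID(int=value)


earlier = make_uuid6(0x1EC9414C232AB00, clock_seq=0x3FFF, node=0xFFFFFFFFFFFF)
later = make_uuid6(0x1EC9414C232AB01, clock_seq=0, node=0)
# The later timestamp wins even though its clock_seq and node bits are smaller.
print(earlier < later, str(earlier) < str(later))  # True True
```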

Development

See DEVELOPMENT.md for information on setting up a development environment, running tests, and contributing.

License

MIT

Download files

Source Distribution

langgraph_checkpoint_cassandra-0.1.1.tar.gz (20.5 kB)

Uploaded Source

Built Distribution

langgraph_checkpoint_cassandra-0.1.1-py3-none-any.whl (21.8 kB)

Uploaded Python 3

File details

Details for the file langgraph_checkpoint_cassandra-0.1.1.tar.gz.

File hashes

Hashes for langgraph_checkpoint_cassandra-0.1.1.tar.gz
Algorithm Hash digest
SHA256 ef768d7f4ac0aff77890c242b323d4695165550e1db07769125263f6c0b4ecbd
MD5 53aedc58f829e4ca8339e83d33728899
BLAKE2b-256 487ff0f8331984378af0119ea0d2a6e9e740a56f5702aa834f11e04778ad3c81

File details

Details for the file langgraph_checkpoint_cassandra-0.1.1-py3-none-any.whl.

File hashes

Hashes for langgraph_checkpoint_cassandra-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 220ae0e1dd159b4e88460bab1d9f5c4e4cf0faa29518555ee8198ef2b995ac76
MD5 12e192d813563958ace470de8a3de637
BLAKE2b-256 8862446c6ba65cdad3981ce4e28e11aa6f56681f35623173c33546867873056b
