LangGraph checkpoint saver for Azure Cosmos DB

LangGraph Cosmos DB Checkpoint

A LangGraph checkpoint saver implementation for Azure Cosmos DB, providing durable persistence for LangGraph workflow state with both synchronous and asynchronous support.

Note: This implementation is based on langgraph-checkpoint-sqlite, adapted for Azure Cosmos DB.

Features

  • Synchronous & Asynchronous APIs: Full support for both sync (CosmosDBSaver) and async (AsyncCosmosDBSaver) operations
  • Tip Document Optimization: O(1) access to the latest checkpoint without expensive queries
  • Transactional Consistency: Atomic batch operations ensure checkpoint and metadata are always in sync
  • Efficient Partitioning: Separate partitions for checkpoints and writes optimize read/write performance
  • SQL Injection Prevention: All queries use parameterized inputs for security
  • Azure Identity Support: Works with connection strings or DefaultAzureCredential for keyless authentication
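
To make the tip-document idea concrete, here is a minimal in-memory sketch. It is not the package's storage code. It assumes each thread keeps one extra document under a fixed, predictable id that is overwritten on every save, so fetching the latest checkpoint is a key lookup rather than a sorted query. The class name, the id formats, and the dict standing in for a container are all hypothetical.

```python
from typing import Optional


class TipStoreSketch:
    """In-memory stand-in for a container, keyed by document id (hypothetical)."""

    def __init__(self) -> None:
        self._docs: dict[str, dict] = {}

    @staticmethod
    def _tip_id(thread_id: str, checkpoint_ns: str) -> str:
        # Assumed id scheme: one predictable id per (thread, namespace).
        return f"tip:{thread_id}:{checkpoint_ns}"

    def put(self, thread_id: str, checkpoint_ns: str, checkpoint: dict) -> None:
        # Store the checkpoint under its own id to keep the full history ...
        doc_id = f"checkpoint:{thread_id}:{checkpoint_ns}:{checkpoint['id']}"
        self._docs[doc_id] = checkpoint
        # ... and overwrite the tip document so it always mirrors the newest one.
        self._docs[self._tip_id(thread_id, checkpoint_ns)] = checkpoint

    def get_latest(self, thread_id: str, checkpoint_ns: str = "") -> Optional[dict]:
        # A single key lookup: no scan over history, no ORDER BY.
        return self._docs.get(self._tip_id(thread_id, checkpoint_ns))


store = TipStoreSketch()
store.put("1", "", {"id": "a", "v": 1})
store.put("1", "", {"id": "b", "v": 1})
latest = store.get_latest("1")  # the checkpoint saved last
```

The cost of this design is one extra write per save in exchange for a cheap read of the latest state, which is the common case when a graph resumes a thread.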

Installation

pip install langgraph-checkpoint-cosmos

Requirements

  • Python >= 3.11
  • Azure Cosmos DB account with a database and container
  • Container must have partition key set to /partition_key

Usage

Synchronous

from langgraph_checkpoint_cosmos import CosmosDBSaver

write_config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
read_config = {"configurable": {"thread_id": "1"}}

# Alternatively, pass a pre-configured container directly: CosmosDBSaver(container)
with CosmosDBSaver.from_conn_info(
    endpoint="<your-endpoint>",
    credential="<your-key>",  # Or use DefaultAzureCredential()
    database_name="<your-db>",
    container_name="<your-container>"
) as checkpointer:
    checkpoint = {
        "v": 1,
        "ts": "2024-07-31T20:14:19.804150+00:00",
        "id": "1ef4f797-8335-6428-8001-8a1503f9b875",
        "channel_values": {
            "my_key": "meow",
            "node": "node"
        },
        "channel_versions": {
            "__start__": 2,
            "my_key": 3,
            "start:node": 3,
            "node": 3
        },
        "versions_seen": {
            "__input__": {},
            "__start__": {
                "__start__": 1
            },
            "node": {
                "start:node": 2
            }
        },
        "pending_sends": [], 
    }

    # store checkpoint
    checkpointer.put(write_config, checkpoint, {}, {})

    # load checkpoint
    checkpointer.get(read_config)

    # list checkpoints
    list(checkpointer.list(read_config))

Async

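from langgraph_checkpoint_cosmos.aio import AsyncCosmosDBSaver

write_config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
read_config = {"configurable": {"thread_id": "1"}}

# Run the block below inside a coroutine (for example, via asyncio.run).
# Alternatively, pass a pre-configured container directly: AsyncCosmosDBSaver(container)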
async with AsyncCosmosDBSaver.from_conn_info(
    endpoint="<your-endpoint>",
    credential="<your-key>",  # Or use DefaultAzureCredential()
    database_name="<your-db>",
    container_name="<your-container>"
) as checkpointer:
    checkpoint = {
        "v": 1,
        "ts": "2024-07-31T20:14:19.804150+00:00",
        "id": "1ef4f797-8335-6428-8001-8a1503f9b875",
        "channel_values": {
            "my_key": "meow",
            "node": "node"
        },
        "channel_versions": {
            "__start__": 2,
            "my_key": 3,
            "start:node": 3,
            "node": 3
        },
        "versions_seen": {
            "__input__": {},
            "__start__": {
                "__start__": 1
            },
            "node": {
                "start:node": 2
            }
        },
        "pending_sends": [],
    }

    # store checkpoint
    await checkpointer.aput(write_config, checkpoint, {}, {})

    # load checkpoint
    await checkpointer.aget(read_config)

    # list checkpoints
    [c async for c in checkpointer.alist(read_config)]

Using with LangGraph StateGraph

The primary use case is as a checkpointer for LangGraph workflows:

from langgraph.graph import StateGraph
from langgraph_checkpoint_cosmos import CosmosDBSaver

# Define your graph
builder = StateGraph(int)
builder.add_node("add_one", lambda x: x + 1)
builder.set_entry_point("add_one")
builder.set_finish_point("add_one")

# Create checkpointer
with CosmosDBSaver.from_conn_info(
    endpoint="https://your-account.documents.azure.com:443/",
    credential="your-key",
    database_name="langgraph",
    container_name="checkpoints"
) as checkpointer:
    # Compile graph with checkpointer
    graph = builder.compile(checkpointer=checkpointer)
    
    # Run with thread_id for persistence
    config = {"configurable": {"thread_id": "user-123"}}
    result = graph.invoke(1, config)
    
    # Later, read back the latest saved state for the same thread
    state = graph.get_state(config)
    print(state.values)  # Output: 2

Azure Cosmos DB Setup

  1. Create an Azure Cosmos DB account (NoSQL API)
  2. Create a database (e.g., langgraph)
  3. Create a container with:
    • Partition key: /partition_key
    • (Optional) Enable "Delete All Items By Partition Key" for efficient thread deletion
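
The setup steps above can also be scripted. The following is a minimal sketch using the azure-cosmos SDK, assuming key-based authentication and an account that already exists; the function name and its default arguments are illustrative and not part of this package. It does not cover the optional "Delete All Items By Partition Key" setting.

```python
def ensure_checkpoint_container(
    endpoint: str,
    key: str,
    database_name: str = "langgraph",
    container_name: str = "checkpoints",
):
    """Create the database and container if they do not exist yet."""
    # Imported lazily so this module can be loaded without the SDK installed.
    from azure.cosmos import CosmosClient, PartitionKey

    client = CosmosClient(endpoint, credential=key)
    database = client.create_database_if_not_exists(id=database_name)
    # The saver requires this exact partition key path.
    return database.create_container_if_not_exists(
        id=container_name,
        partition_key=PartitionKey(path="/partition_key"),
    )
```

The returned container object is the kind of pre-configured container that can be passed to the CosmosDBSaver(container) constructor mentioned under Usage.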

Using DefaultAzureCredential (Recommended for Production)

from azure.identity import DefaultAzureCredential
from langgraph_checkpoint_cosmos import CosmosDBSaver

with CosmosDBSaver.from_conn_info(
    endpoint="https://your-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),  # Keyless authentication
    database_name="langgraph",
    container_name="checkpoints"
) as checkpointer:
    ...

Environment Variables

For testing, set the following environment variables:

export COSMOS_DB_ENDPOINT="https://your-account.documents.azure.com:443/"
export COSMOS_DB_KEY="your-primary-key"
export COSMOS_DB_NAME="langgraph"
export COSMOS_DB_CONTAINER="checkpoints"
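
One way to consume these variables in your own scripts is a small loader that maps them onto the from_conn_info keyword arguments and fails early when one is missing. This is a sketch under that assumption; the helper name is hypothetical, and the test suite may read the variables differently.

```python
import os
from collections.abc import Mapping

# Maps each environment variable to the from_conn_info keyword it feeds.
_ENV_TO_KWARG = {
    "COSMOS_DB_ENDPOINT": "endpoint",
    "COSMOS_DB_KEY": "credential",
    "COSMOS_DB_NAME": "database_name",
    "COSMOS_DB_CONTAINER": "container_name",
}


def load_cosmos_settings(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Return from_conn_info keyword arguments, failing fast on missing variables."""
    missing = [name for name in _ENV_TO_KWARG if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {kwarg: env[name] for name, kwarg in _ENV_TO_KWARG.items()}
```

With the variables exported, the result can be unpacked directly: CosmosDBSaver.from_conn_info(**load_cosmos_settings()).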

API Reference

CosmosDBSaver (Synchronous)

| Method | Description |
| --- | --- |
| get(config) | Get checkpoint values (convenience method) |
| get_tuple(config) | Get checkpoint with full metadata |
| put(config, checkpoint, metadata, new_versions) | Save a checkpoint |
| list(config, *, filter, before, limit) | List/search checkpoints |
| put_writes(config, writes, task_id) | Store pending writes |
| delete_thread(thread_id) | Delete all data for a thread |

AsyncCosmosDBSaver (Asynchronous)

| Method | Description |
| --- | --- |
| aget(config) | Get checkpoint values (async, convenience) |
| aget_tuple(config) | Get checkpoint with full metadata (async) |
| aput(config, checkpoint, metadata, new_versions) | Save a checkpoint (async) |
| alist(config, *, filter, before, limit) | List/search checkpoints (async generator) |
| aput_writes(config, writes, task_id) | Store pending writes (async) |
| adelete_thread(thread_id) | Delete all data for a thread (async) |

The async saver also provides synchronous bridge methods (put, get_tuple, list) that can be called from synchronous code when needed.
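
The Features list states that all queries use parameterized inputs. As an illustration of what that could look like for a list call with a before cursor, the sketch below builds the query text and the parameter list separately, so caller-supplied values never become part of the SQL string. The document field names (thread_id, checkpoint_ns, checkpoint_id) and the function itself are assumptions, not the package's actual schema or code.

```python
from typing import Any, Optional


def build_list_query(
    thread_id: str,
    checkpoint_ns: str = "",
    before_id: Optional[str] = None,
) -> tuple[str, list[dict[str, Any]]]:
    """Build query text plus a parameter list; values never enter the SQL string."""
    clauses = ["c.thread_id = @thread_id", "c.checkpoint_ns = @checkpoint_ns"]
    parameters: list[dict[str, Any]] = [
        {"name": "@thread_id", "value": thread_id},
        {"name": "@checkpoint_ns", "value": checkpoint_ns},
    ]
    if before_id is not None:
        # Only checkpoints older than the cursor (hypothetical field name).
        clauses.append("c.checkpoint_id < @before_id")
        parameters.append({"name": "@before_id", "value": before_id})
    query = (
        "SELECT * FROM c WHERE "
        + " AND ".join(clauses)
        + " ORDER BY c.checkpoint_id DESC"
    )
    return query, parameters


# A hostile-looking thread id stays in the parameter list, not in the query text.
query, parameters = build_list_query(
    "1'; DROP", before_id="1ef4f797-8335-6428-8001-8a1503f9b875"
)
```

The resulting pair matches the shape accepted by the azure-cosmos SDK's container.query_items(query=..., parameters=...).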

Testing

# Install dev dependencies
pip install -e ".[dev]"

# Run tests (requires Cosmos DB connection)
pytest tests/ -v

Acknowledgments

This project is based on langgraph-checkpoint-sqlite from the LangChain team. The core architecture, serialization patterns, and checkpoint management logic were adapted for Azure Cosmos DB.

License

MIT
