LangGraph checkpoint saver for Azure Cosmos DB
LangGraph Cosmos DB Checkpoint
A LangGraph checkpoint saver implementation for Azure Cosmos DB, providing durable persistence for LangGraph workflow state with both synchronous and asynchronous support.
Note: This implementation is based on langgraph-checkpoint-sqlite, adapted for Azure Cosmos DB.
Features
- Synchronous & Asynchronous APIs: Full support for both sync (CosmosDBSaver) and async (AsyncCosmosDBSaver) operations
- Tip Document Optimization: O(1) access to the latest checkpoint without expensive queries
- Transactional Consistency: Atomic batch operations ensure checkpoint and metadata are always in sync
- Efficient Partitioning: Separate partitions for checkpoints and writes optimize read/write performance
- SQL Injection Prevention: All queries use parameterized inputs for security
- Azure Identity Support: Works with connection strings or DefaultAzureCredential for keyless authentication
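To illustrate the parameterized-query feature listed above, here is a minimal sketch of how such a query can be built. It assumes documents carry `thread_id` and `checkpoint_ns` fields, which are hypothetical names chosen for illustration; the package's actual document schema and internal queries may differ. The helper `build_checkpoint_query` is also illustrative and not part of the package.

```python
from typing import Any


def build_checkpoint_query(thread_id: str, checkpoint_ns: str = "") -> dict[str, Any]:
    """Build a parameterized Cosmos DB SQL query (hypothetical field names)."""
    # User-supplied values never appear in the query text; they are bound
    # through @-prefixed parameters instead.
    query = (
        "SELECT * FROM c "
        "WHERE c.thread_id = @thread_id AND c.checkpoint_ns = @checkpoint_ns"
    )
    parameters = [
        {"name": "@thread_id", "value": thread_id},
        {"name": "@checkpoint_ns", "value": checkpoint_ns},
    ]
    return {"query": query, "parameters": parameters}


spec = build_checkpoint_query("user-123")
print(spec["query"])
print(spec["parameters"])
```

With a real container from the azure-cosmos SDK, the result could be passed as `container.query_items(query=spec["query"], parameters=spec["parameters"])`.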
Installation
pip install langgraph-checkpoint-cosmos
Requirements
- Python >= 3.11
- Azure Cosmos DB account with a database and container
- Container must have partition key set to /partition_key
Usage
Synchronous
from langgraph_checkpoint_cosmos import CosmosDBSaver

write_config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
read_config = {"configurable": {"thread_id": "1"}}

# You can also use CosmosDBSaver(container) constructor with a pre-configured container
with CosmosDBSaver.from_conn_info(
    endpoint="<your-endpoint>",
    credential="<your-key>",  # Or use DefaultAzureCredential()
    database_name="<your-db>",
    container_name="<your-container>",
) as checkpointer:
    checkpoint = {
        "v": 1,
        "ts": "2024-07-31T20:14:19.804150+00:00",
        "id": "1ef4f797-8335-6428-8001-8a1503f9b875",
        "channel_values": {
            "my_key": "meow",
            "node": "node",
        },
        "channel_versions": {
            "__start__": 2,
            "my_key": 3,
            "start:node": 3,
            "node": 3,
        },
        "versions_seen": {
            "__input__": {},
            "__start__": {
                "__start__": 1,
            },
            "node": {
                "start:node": 2,
            },
        },
        "pending_sends": [],
    }
    # store checkpoint
    checkpointer.put(write_config, checkpoint, {}, {})
    # load checkpoint
    checkpointer.get(read_config)
    # list checkpoints
    list(checkpointer.list(read_config))
Async
import asyncio

from langgraph_checkpoint_cosmos.aio import AsyncCosmosDBSaver

write_config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
read_config = {"configurable": {"thread_id": "1"}}


async def main():
    # You can also use explicit constructor AsyncCosmosDBSaver(container)
    async with AsyncCosmosDBSaver.from_conn_info(
        endpoint="<your-endpoint>",
        credential="<your-key>",  # Or use DefaultAzureCredential()
        database_name="<your-db>",
        container_name="<your-container>",
    ) as checkpointer:
        checkpoint = {
            "v": 1,
            "ts": "2024-07-31T20:14:19.804150+00:00",
            "id": "1ef4f797-8335-6428-8001-8a1503f9b875",
            "channel_values": {
                "my_key": "meow",
                "node": "node",
            },
            "channel_versions": {
                "__start__": 2,
                "my_key": 3,
                "start:node": 3,
                "node": 3,
            },
            "versions_seen": {
                "__input__": {},
                "__start__": {
                    "__start__": 1,
                },
                "node": {
                    "start:node": 2,
                },
            },
            "pending_sends": [],
        }
        # store checkpoint
        await checkpointer.aput(write_config, checkpoint, {}, {})
        # load checkpoint
        await checkpointer.aget(read_config)
        # list checkpoints
        [c async for c in checkpointer.alist(read_config)]


# "async with" and "await" are only valid inside a coroutine
asyncio.run(main())
Using with LangGraph StateGraph
The primary use case is as a checkpointer for LangGraph workflows:
from langgraph.graph import StateGraph
from langgraph_checkpoint_cosmos import CosmosDBSaver

# Define your graph
builder = StateGraph(int)
builder.add_node("add_one", lambda x: x + 1)
builder.set_entry_point("add_one")
builder.set_finish_point("add_one")

# Create checkpointer
with CosmosDBSaver.from_conn_info(
    endpoint="https://your-account.documents.azure.com:443/",
    credential="your-key",
    database_name="langgraph",
    container_name="checkpoints",
) as checkpointer:
    # Compile graph with checkpointer
    graph = builder.compile(checkpointer=checkpointer)

    # Run with thread_id for persistence
    config = {"configurable": {"thread_id": "user-123"}}
    result = graph.invoke(1, config)

    # Later, resume from the same thread
    state = graph.get_state(config)
    print(state.values)  # Output: 2
Azure Cosmos DB Setup
- Create an Azure Cosmos DB account (NoSQL API)
- Create a database (e.g., langgraph)
- Create a container with:
  - Partition key: /partition_key
  - (Optional) Enable "Delete All Items By Partition Key" for efficient thread deletion
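The setup steps above can also be scripted. The following is a minimal sketch using the azure-cosmos SDK, assuming the example database and container names used in this README (langgraph, checkpoints). The helper names `container_settings` and `ensure_container` are illustrative and not part of this package, and the optional "Delete All Items By Partition Key" setting is not covered here.

```python
from typing import Any

# Partition key path required by this package (see Requirements).
PARTITION_KEY_PATH = "/partition_key"


def container_settings(container_name: str = "checkpoints") -> dict[str, Any]:
    """Return the container id and partition key path used below."""
    return {"id": container_name, "partition_key_path": PARTITION_KEY_PATH}


def ensure_container(
    endpoint: str,
    credential: Any,
    database_name: str = "langgraph",
    container_name: str = "checkpoints",
):
    """Create the database and container if they do not exist yet."""
    # Imported lazily so the settings helper works without the SDK installed.
    from azure.cosmos import CosmosClient, PartitionKey

    settings = container_settings(container_name)
    client = CosmosClient(endpoint, credential=credential)
    database = client.create_database_if_not_exists(id=database_name)
    return database.create_container_if_not_exists(
        id=settings["id"],
        partition_key=PartitionKey(path=settings["partition_key_path"]),
    )
```

Calling `ensure_container("<your-endpoint>", "<your-key>")` once before the first run should leave a container that the saver can use; the Azure portal or CLI works equally well for this step.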
Using DefaultAzureCredential (Recommended for Production)
from azure.identity import DefaultAzureCredential
from langgraph_checkpoint_cosmos import CosmosDBSaver

with CosmosDBSaver.from_conn_info(
    endpoint="https://your-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),  # Keyless authentication
    database_name="langgraph",
    container_name="checkpoints",
) as checkpointer:
    ...
Environment Variables
For testing, set the following environment variables:
export COSMOS_DB_ENDPOINT="https://your-account.documents.azure.com:443/"
export COSMOS_DB_KEY="your-primary-key"
export COSMOS_DB_NAME="langgraph"
export COSMOS_DB_CONTAINER="checkpoints"
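As a convenience, these variables can be mapped onto the keyword arguments of from_conn_info. The sketch below is one possible way to do that; the helper `load_cosmos_settings` is hypothetical and not provided by the package, and it assumes key-based authentication via COSMOS_DB_KEY.

```python
import os
from collections.abc import Mapping

# Keyword argument of from_conn_info -> environment variable name.
REQUIRED_VARS = {
    "endpoint": "COSMOS_DB_ENDPOINT",
    "credential": "COSMOS_DB_KEY",
    "database_name": "COSMOS_DB_NAME",
    "container_name": "COSMOS_DB_CONTAINER",
}


def load_cosmos_settings(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Read the connection settings, failing early if any variable is unset."""
    missing = [var for var in REQUIRED_VARS.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {kwarg: env[var] for kwarg, var in REQUIRED_VARS.items()}
```

A test fixture could then open the saver with `CosmosDBSaver.from_conn_info(**load_cosmos_settings())`.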
API Reference
CosmosDBSaver (Synchronous)
| Method | Description |
|---|---|
| get(config) | Get checkpoint values (convenience method) |
| get_tuple(config) | Get checkpoint with full metadata |
| put(config, checkpoint, metadata, new_versions) | Save a checkpoint |
| list(config, *, filter, before, limit) | List/search checkpoints |
| put_writes(config, writes, task_id) | Store pending writes |
| delete_thread(thread_id) | Delete all data for a thread |
AsyncCosmosDBSaver (Asynchronous)
| Method | Description |
|---|---|
| aget(config) | Get checkpoint values (async, convenience) |
| aget_tuple(config) | Get checkpoint with full metadata (async) |
| aput(config, checkpoint, metadata, new_versions) | Save a checkpoint (async) |
| alist(config, *, filter, before, limit) | List/search checkpoints (async generator) |
| aput_writes(config, writes, task_id) | Store pending writes (async) |
| adelete_thread(thread_id) | Delete all data for a thread (async) |
The async saver also provides synchronous bridge methods (put, get_tuple, list) that can be called from synchronous code when needed.
Testing
# Install dev dependencies
pip install -e ".[dev]"
# Run tests (requires Cosmos DB connection)
pytest tests/ -v
Acknowledgments
This project is based on langgraph-checkpoint-sqlite from the LangChain team. The core architecture, serialization patterns, and checkpoint management logic were adapted for Azure Cosmos DB.
License
MIT