LangGraph Unity Catalog Checkpoint
Unity Catalog-backed persistence for LangChain and LangGraph
Production-ready Unity Catalog persistence for LangChain and LangGraph applications using Databricks as the storage backend.
Following the LangGraph checkpoint-postgres pattern for consistency with the LangGraph ecosystem.
Overview
This package provides enterprise-grade implementations of LangGraph's persistence interfaces backed by Databricks Unity Catalog:
- UnityCatalogStore / AsyncUnityCatalogStore: implements langgraph.store.base.BaseStore for key-value storage
- UnityCatalogCheckpointSaver / AsyncUnityCatalogCheckpointSaver: implements BaseCheckpointSaver for graph state persistence
All implementations use Databricks Unity Catalog Delta tables via the WorkspaceClient SQL API, providing:
- Enterprise-grade reliability with ACID transactions
- Scalability with Delta Lake optimization
- Governance with built-in access control and audit trails
- Time-travel for debugging and recovery
- Seamless Databricks integration for production ML workflows
- Performance optimized with batch operations (2-10x faster)
Installation
Prerequisites
- Python 3.10+
- Databricks workspace with Unity Catalog enabled
- SQL warehouse with appropriate permissions
Install from PyPI
The easiest way to install is from PyPI:
pip install langgraph-unity-catalog-checkpoint
This will automatically install all required dependencies including:
- databricks-sdk
- langchain
- langgraph
- loguru
Install from Source
For development or to get the latest unreleased features:
# Clone the repository
git clone https://github.com/natefleming/langgraph_unity_catalog_checkpoint.git
cd langgraph_unity_catalog_checkpoint
# Install in editable mode
pip install -e .
# Or with development dependencies
pip install -e ".[dev]"
Quick Start
1. Configure Databricks Authentication
Set up environment variables:
export DATABRICKS_HOST="https://your-workspace.databricks.com"
export DATABRICKS_TOKEN="your-access-token"
export DATABRICKS_WAREHOUSE_ID="your-warehouse-id"
export UC_CATALOG="your_catalog"
export UC_SCHEMA="your_schema"
Or use ~/.databrickscfg:
[DEFAULT]
host = https://your-workspace.databricks.com
token = your-access-token
2. Using the Store for Key-Value Storage
from databricks.sdk import WorkspaceClient
from langgraph_unity_catalog_checkpoint import UnityCatalogStore
# Initialize the store
workspace_client = WorkspaceClient()
store = UnityCatalogStore(
workspace_client=workspace_client,
catalog="main",
schema="langgraph",
table="my_store", # Default: "store"
warehouse_id="your-warehouse-id", # Optional
)
# Store values with namespaced keys
store.put(("users", "123"), "preferences", {"theme": "dark", "language": "en"})
# Retrieve values
prefs = store.get(("users", "123"), "preferences")
print(prefs) # {"theme": "dark", "language": "en"}
# Search within a namespace
items = store.search(("users",), limit=10)
for item in items:
print(f"Key: {item.key}, Namespace: {item.namespace}")
# Delete a key
store.delete(("users", "123"), "preferences")
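Conceptually, each stored item is addressed by a namespace tuple plus a key. A minimal sketch of how such namespaced keys can map to flat table rows (the actual column encoding used by UnityCatalogStore may differ; this is purely illustrative):

```python
import json

# Hypothetical illustration: flatten a namespace tuple into a single
# string column so (namespace, key) pairs can index a Delta table row.
def make_row(namespace: tuple[str, ...], key: str, value: dict) -> dict:
    return {
        "namespace": "/".join(namespace),  # e.g. "users/123"
        "key": key,
        "value": json.dumps(value),        # stored as JSON text in this sketch
    }

row = make_row(("users", "123"), "preferences", {"theme": "dark"})
print(row["namespace"])  # users/123
```

Flattening the tuple makes prefix searches like `store.search(("users",))` a simple string-prefix match in SQL.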
3. Using the Checkpointer for Graph Persistence
from databricks.sdk import WorkspaceClient
from databricks_langchain import ChatDatabricks
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, BaseMessage
from typing_extensions import TypedDict
from typing import Annotated
from langgraph_unity_catalog_checkpoint import UnityCatalogCheckpointSaver
# Define your graph state
class State(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
# Create a simple chatbot node
llm = ChatDatabricks(endpoint="databricks-meta-llama-3-3-70b-instruct")
def chatbot(state: State) -> dict:
response = llm.invoke(state["messages"])
return {"messages": [response]}
# Create the checkpointer
workspace_client = WorkspaceClient()
checkpointer = UnityCatalogCheckpointSaver(
workspace_client=workspace_client,
catalog="main",
schema="langgraph",
# Default tables: "checkpoints", "checkpoint_blobs", "checkpoint_writes"
warehouse_id="your-warehouse-id", # Optional
)
# Build the graph
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
# Compile with checkpointer for persistence
graph = graph_builder.compile(checkpointer=checkpointer)
# Run conversation with persistence
config = {"configurable": {"thread_id": "conversation_1"}}
# First interaction
result = graph.invoke(
{"messages": [HumanMessage(content="Hello! What's the weather like?")]},
config=config
)
# Second interaction - conversation history is maintained!
result = graph.invoke(
{"messages": [HumanMessage(content="What did I just ask you?")]},
config=config
)
# The bot remembers the previous question!
4. Async Usage for High Performance
from langgraph_unity_catalog_checkpoint import AsyncUnityCatalogCheckpointSaver
import asyncio
# Create async checkpointer
async_checkpointer = AsyncUnityCatalogCheckpointSaver(
workspace_client=workspace_client,
catalog="main",
schema="langgraph",
warehouse_id="your-warehouse-id",
)
# Async chatbot node
async def async_chatbot(state: State) -> dict:
response = await llm.ainvoke(state["messages"])
return {"messages": [response]}
# Build and compile with async checkpointer
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", async_chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
graph = graph_builder.compile(checkpointer=async_checkpointer)
# Run asynchronously (await must happen inside a coroutine)
config = {"configurable": {"thread_id": "async_conversation_1"}}

async def main() -> None:
    result = await graph.ainvoke(
        {"messages": [HumanMessage(content="Hello async world!")]},
        config=config,
    )

asyncio.run(main())
Use Cases
1. Conversational AI with Memory
Maintain conversation history across multiple interactions:
# Each user gets their own conversation thread
config = {"configurable": {"thread_id": f"user_{user_id}"}}
graph.invoke({"messages": [HumanMessage(content=user_input)]}, config)
2. Human-in-the-Loop Workflows
Pause execution for human review and resume seamlessly:
# Interrupt before critical nodes
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["approval_node"]
)
# Execute and pause at approval
result = graph.invoke(input_data, config)
# Human reviews and approves...
# Resume from checkpoint
result = graph.invoke(None, config) # Continues from where it left off
3. Long-Term Memory with LangMem
Integrate with LangMem for user preferences and memories:
from langchain.agents import create_agent
from langmem.tools import get_langmem_tools
# Create store for LangMem
store = UnityCatalogStore(
workspace_client=workspace_client,
catalog="main",
schema="langgraph",
)
# Get LangMem tools
langmem_tools = get_langmem_tools(store=store)
# Create agent with memory (`tools` is your existing list of agent tools)
agent = create_agent(llm, tools + langmem_tools)
# Use with user context
config = {
"configurable": {
"langgraph_user_id": "user_123" # Isolates memories per user
}
}
agent.invoke({"messages": [HumanMessage(content="I prefer dark mode")]}, config)
4. Production ML Pipelines
Reliable state management for complex workflows:
# Automatic recovery from failures
# Time-travel debugging with Delta Lake
# Full audit trail via Unity Catalog
# Multi-agent coordination with isolated states
Performance Optimizations
Batch Write Operations (2-10x Faster)
The implementation uses batched SQL operations to minimize round trips to Unity Catalog:
# Instead of N+1 SQL statements:
# - 1 per blob
# - 1 per write
# - 1 checkpoint
# We use just 3 SQL statements:
# - 1 batch for all blobs
# - 1 batch for all writes
# - 1 for checkpoint
# For a checkpoint with 5 blobs and 3 writes:
# Before: 9 SQL statements
# After: 3 SQL statements
# Speedup: 3x faster!
See docs/CHECKPOINT_BATCH_WRITE_OPTIMIZATION.md for details.
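The arithmetic behind that claim can be sketched directly. This is an illustration of the statement counting, not the package's internal code:

```python
def naive_statement_count(num_blobs: int, num_writes: int) -> int:
    # One SQL statement per blob, one per write, plus one for the checkpoint.
    return num_blobs + num_writes + 1

def batched_statement_count(num_blobs: int, num_writes: int) -> int:
    # One batched statement for all blobs, one for all writes, one checkpoint.
    # (A batch is skipped entirely when it would be empty.)
    return (1 if num_blobs else 0) + (1 if num_writes else 0) + 1

# The example from above: 5 blobs and 3 writes.
before = naive_statement_count(5, 3)   # 9
after = batched_statement_count(5, 3)  # 3
print(f"{before} -> {after}: {before // after}x fewer round trips")
```

The speedup grows with the number of blobs and writes per checkpoint, which is where the 2-10x range comes from.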
Architecture
┌────────────────────────────────────────┐
│   LangChain/LangGraph Application      │
└────────────────────────────────────────┘
                   │
┌────────────────────────────────────────┐
│   BaseStore / BaseCheckpointSaver      │
│        (LangGraph Interfaces)          │
└────────────────────────────────────────┘
                   │
┌────────────────────────────────────────┐
│   Unity Catalog Implementation         │
│   - UnityCatalogStore                  │
│   - UnityCatalogCheckpointSaver        │
│   - Async variants                     │
└────────────────────────────────────────┘
                   │
┌────────────────────────────────────────┐
│   Databricks WorkspaceClient           │
│   (SQL Statement Execution API)        │
└────────────────────────────────────────┘
                   │
┌────────────────────────────────────────┐
│   Unity Catalog Delta Tables           │
│   - ACID transactions                  │
│   - Time-travel                        │
│   - Change Data Feed                   │
└────────────────────────────────────────┘
Data Storage
- Serialization: Checkpoints and values are serialized using LangGraph's JsonPlusSerializer
- Binary Storage: BINARY columns for efficient blob storage (base64 encoded)
- JSON Metadata: Structured metadata for filtering and querying
- Delta Lake: ACID transactions, time-travel, and optimization
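As a rough sketch of the binary-storage path described above (the package actually uses LangGraph's JsonPlusSerializer; plain json stands in for it here), a blob is base64-encoded before being embedded in a SQL statement and decoded back on read:

```python
import base64
import json

def encode_blob(value: object) -> str:
    # Serialize to bytes, then base64-encode for safe embedding in SQL text.
    raw = json.dumps(value).encode("utf-8")
    return base64.b64encode(raw).decode("ascii")

def decode_blob(encoded: str) -> object:
    # Reverse the encoding: base64-decode, then deserialize.
    raw = base64.b64decode(encoded.encode("ascii"))
    return json.loads(raw.decode("utf-8"))

checkpoint = {"messages": ["Hello!"], "step": 1}
restored = decode_blob(encode_blob(checkpoint))
print(restored == checkpoint)  # True
```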
Default Table Names
| Component | Default Tables |
|---|---|
| Store | store |
| Checkpointer | checkpoints, checkpoint_blobs, checkpoint_writes |
Tables are automatically created on first use with optimized schemas.
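The defaults above combine with the configured catalog and schema into fully qualified three-part names. A small sketch of that composition (the helper name is illustrative, not part of the package API):

```python
def qualified_table_name(catalog: str, schema: str, table: str = "checkpoints") -> str:
    # Unity Catalog addresses tables as catalog.schema.table.
    return f"{catalog}.{schema}.{table}"

print(qualified_table_name("main", "langgraph"))
# main.langgraph.checkpoints
print(qualified_table_name("main", "langgraph", "checkpoint_blobs"))
# main.langgraph.checkpoint_blobs
```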
Examples
Complete Jupyter Notebooks
Explore the notebooks/ directory for interactive examples:
- store_example.ipynb - Store operations and LangMem integration
- checkpointer_example.ipynb - Synchronous graph checkpointing
- async_checkpointer_example.ipynb - Async graph execution
Run in Databricks
- Upload a notebook to your Databricks workspace
- Attach to a cluster with Unity Catalog access
- Set the required configuration (catalog, schema, warehouse_id)
- Run all cells
Configuration
Environment Variables
| Variable | Description | Required |
|---|---|---|
| DATABRICKS_HOST | Workspace URL | Yes |
| DATABRICKS_TOKEN | Access token | Yes |
| DATABRICKS_WAREHOUSE_ID | SQL warehouse ID | No |
| UC_CATALOG | Default catalog name | Recommended |
| UC_SCHEMA | Default schema name | Recommended |
Configuration Precedence
Configuration values are resolved in this order:
- Environment variables (highest priority)
- Databricks widgets (for notebooks)
- Constructor parameters (explicit values)
See docs/CONFIGURATION_PRECEDENCE.md for details.
Warehouse ID
The warehouse_id parameter is optional and defaults to None. If not provided:
- Uses the default warehouse for the workspace
- Can be overridden per-operation if needed
Permissions Required
Ensure your Databricks principal has:
- USE CATALOG on the target catalog
- USE SCHEMA on the target schema
- CREATE TABLE on the target schema (for initialization)
- SELECT, INSERT, UPDATE, DELETE, MODIFY on the tables
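These grants can be scripted. A hedged sketch that just builds the corresponding GRANT statements as strings; executing them (for example through the WorkspaceClient SQL API) is left to your setup, and note that in Unity Catalog the MODIFY privilege covers INSERT, UPDATE, and DELETE:

```python
def grant_statements(principal: str, catalog: str, schema: str,
                     tables: list[str]) -> list[str]:
    # Build GRANT statements matching the permissions listed above.
    stmts = [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO `{principal}`",
        f"GRANT USE SCHEMA ON SCHEMA {catalog}.{schema} TO `{principal}`",
        f"GRANT CREATE TABLE ON SCHEMA {catalog}.{schema} TO `{principal}`",
    ]
    for table in tables:
        # MODIFY subsumes INSERT/UPDATE/DELETE on Unity Catalog tables.
        stmts.append(
            f"GRANT SELECT, MODIFY ON TABLE {catalog}.{schema}.{table} TO `{principal}`"
        )
    return stmts

for stmt in grant_statements("my-service-principal", "main", "langgraph",
                             ["checkpoints", "checkpoint_blobs", "checkpoint_writes"]):
    print(stmt)
```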
Testing
Run Unit Tests
# Run all tests
make test
# Run specific test file
uv run pytest tests/test_unity_catalog_store.py -v
# Run with coverage
uv run pytest --cov=src --cov-report=html
Run Integration Tests
Integration tests require a live Databricks connection:
# Set required environment variables
export DATABRICKS_HOST="..."
export DATABRICKS_TOKEN="..."
export DATABRICKS_WAREHOUSE_ID="..."
# Run integration tests
uv run pytest tests/test_integration.py -v
Linting and Formatting
# Format code
make format
# Run linting
make lint
# Type checking
make type-check
Documentation
Core Documentation
- Usage Guide - Comprehensive usage examples
- Implementation Summary - Technical architecture
- Environment Setup - Development environment
- Quick Start - Getting started guide
- Install Guide - Installation instructions
Technical Details
- Checkpoint Batch Write Optimization - Performance optimization details
- Configuration Precedence - Configuration resolution
- Default Table Names - Table naming conventions
- MLflow Autolog Setup - Observability with MLflow
- Logging - Logging configuration
Features
UnityCatalogStore
- Implements langgraph.store.base.BaseStore interface
- Batch operations (batch, abatch) for performance
- Namespaced key-value storage
- Search with filtering and pagination
- Automatic table initialization
- Sync and async implementations
- Compatible with LangMem for long-term memory
UnityCatalogCheckpointSaver
- Implements BaseCheckpointSaver interface
- Full LangGraph checkpoint persistence
- Support for human-in-the-loop workflows
- Multi-turn conversation memory
- State recovery and time-travel
- Pending writes management
- Checkpoint listing and filtering
- Sync and async implementations
- Optimized batch writes (2-10x faster)
- Automatic table creation and schema management
Development
Setup Development Environment
# Clone the repository
git clone https://github.com/natefleming/langgraph_unity_catalog_checkpoint.git
cd langgraph_unity_catalog_checkpoint
# Create virtual environment
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install with development dependencies
pip install -e ".[dev]"
# Install pre-commit hooks
pre-commit install
Project Structure
langgraph_unity_catalog_checkpoint/
├── src/
│   └── langgraph_unity_catalog_checkpoint/
│       ├── store/                  # Store implementations
│       │   ├── unity_catalog.py    # Sync store
│       │   ├── aio.py              # Async store
│       │   └── base.py             # Base store class
│       ├── checkpoint/             # Checkpointer implementations
│       │   ├── unity_catalog.py    # Sync checkpointer
│       │   ├── aio.py              # Async checkpointer
│       │   └── base.py             # Base checkpointer class
│       └── __init__.py             # Public API exports
├── tests/                          # Test suite
├── notebooks/                      # Example notebooks
├── docs/                           # Documentation
├── pyproject.toml                  # Project configuration
└── README.md                       # This file
Contributing
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Make your changes with tests
- Run the test suite (make test)
- Format and lint (make format lint)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
Built with:
- LangChain - Framework for LLM applications
- LangGraph - Graph-based agent framework
- LangMem - Long-term memory for agents
- Databricks SDK - Databricks API client
- Unity Catalog - Data governance platform
Support
For issues and questions:
- GitHub Issues: Open an issue
- Documentation: Check the docs/ directory
- Examples: Review the notebooks/ directory
Roadmap
Planned enhancements:
- Connection pooling for improved performance
- Configurable TTL for automatic checkpoint cleanup
- Metrics and monitoring integration
- Query optimization hints and caching
- Support for alternative serialization formats
- Bulk import/export utilities
- Multi-region replication support
Quick Links
- Quick Start Guide - Get started in 5 minutes
- Usage Examples - Detailed usage patterns
- Notebooks - Interactive examples
- API Reference - Technical details
Made with ❤️ for the LangChain community
Download files
File details
Details for the file langgraph_unity_catalog_checkpoint-0.0.3.tar.gz.
File metadata
- Download URL: langgraph_unity_catalog_checkpoint-0.0.3.tar.gz
- Upload date:
- Size: 49.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2aaee1c6f58927862cbabfd3a92f34b1b9b49d26f88286ad0c5d27f66806134c |
| MD5 | 5131c78561b17df32b4bb50724704db7 |
| BLAKE2b-256 | 2db6605cf616bbec3a17ff56d381985e1e98fe28c331863f36d58a4262062769 |
File details
Details for the file langgraph_unity_catalog_checkpoint-0.0.3-py3-none-any.whl.
File metadata
- Download URL: langgraph_unity_catalog_checkpoint-0.0.3-py3-none-any.whl
- Upload date:
- Size: 40.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e68d401ba158df95b04c470a77f29fbe9efd1fac753809da681ea9e53ac5cf39 |
| MD5 | 2ab32fb92da46a7521fd16b4ec935fdf |
| BLAKE2b-256 | 37caa5c06cc648f770262fcb9bf899c2c6fff56597cb8b41c4d0d03521e0e82b |