
Unity Catalog-backed persistence for LangChain and LangGraph


LangGraph Unity Catalog Checkpoint


Production-ready Unity Catalog persistence for LangChain and LangGraph applications using Databricks as the storage backend.

The package follows the LangGraph checkpoint-postgres pattern for consistency with the rest of the LangGraph ecosystem.


🚀 Overview

This package provides enterprise-grade implementations of LangGraph's persistence interfaces (a key-value store and a checkpoint saver, each with sync and async variants) backed by Databricks Unity Catalog.

All implementations use Unity Catalog Delta tables via the Databricks WorkspaceClient SQL API, providing:

  • ✅ Enterprise-grade reliability with ACID transactions
  • ✅ Scalability with Delta Lake optimization
  • ✅ Governance with built-in access control and audit trails
  • ✅ Time-travel for debugging and recovery
  • ✅ Seamless Databricks integration for production ML workflows
  • ✅ Performance optimized with batch operations (2-10x faster)

📦 Installation

Prerequisites

  • Python 3.10+
  • Databricks workspace with Unity Catalog enabled
  • SQL warehouse with appropriate permissions

Install from PyPI

The easiest way to install is from PyPI:

pip install langgraph-unity-catalog-checkpoint

This will automatically install all required dependencies including:

  • databricks-sdk
  • langchain
  • langgraph
  • loguru

Install from Source

For development or to get the latest unreleased features:

# Clone the repository
git clone https://github.com/natefleming/langgraph_unity_catalog_checkpoint.git
cd langgraph_unity_catalog_checkpoint

# Install in editable mode
pip install -e .

# Or with development dependencies
pip install -e ".[dev]"

⚡ Quick Start

1. Configure Databricks Authentication

Set up environment variables:

export DATABRICKS_HOST="https://your-workspace.databricks.com"
export DATABRICKS_TOKEN="your-access-token"
export DATABRICKS_WAREHOUSE_ID="your-warehouse-id"
export UC_CATALOG="your_catalog"
export UC_SCHEMA="your_schema"

Or use ~/.databrickscfg:

[DEFAULT]
host = https://your-workspace.databricks.com
token = your-access-token

2. Using the Store for Key-Value Storage

from databricks.sdk import WorkspaceClient
from langgraph_unity_catalog_checkpoint import UnityCatalogStore

# Initialize the store
workspace_client = WorkspaceClient()
store = UnityCatalogStore(
    workspace_client=workspace_client,
    catalog="main",
    schema="langgraph",
    table="my_store",  # Default: "store"
    warehouse_id="your-warehouse-id",  # Optional
)

# Store values with namespaced keys
store.put(("users", "123"), "preferences", {"theme": "dark", "language": "en"})

# Retrieve values
prefs = store.get(("users", "123"), "preferences")
print(prefs.value)  # {"theme": "dark", "language": "en"}

# Search within a namespace
items = store.search(("users",), limit=10)
for item in items:
    print(f"Key: {item.key}, Namespace: {item.namespace}")

# Delete a key
store.delete(("users", "123"), "preferences")

3. Using the Checkpointer for Graph Persistence

from databricks.sdk import WorkspaceClient
from databricks_langchain import ChatDatabricks
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, BaseMessage
from typing_extensions import TypedDict
from typing import Annotated
from langgraph_unity_catalog_checkpoint import UnityCatalogCheckpointSaver

# Define your graph state
class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

# Create a simple chatbot node
llm = ChatDatabricks(endpoint="databricks-meta-llama-3-3-70b-instruct")

def chatbot(state: State) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# Create the checkpointer
workspace_client = WorkspaceClient()
checkpointer = UnityCatalogCheckpointSaver(
    workspace_client=workspace_client,
    catalog="main",
    schema="langgraph",
    # Default tables: "checkpoints", "checkpoint_blobs", "checkpoint_writes"
    warehouse_id="your-warehouse-id",  # Optional
)

# Build the graph
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

# Compile with checkpointer for persistence
graph = graph_builder.compile(checkpointer=checkpointer)

# Run conversation with persistence
config = {"configurable": {"thread_id": "conversation_1"}}

# First interaction
result = graph.invoke(
    {"messages": [HumanMessage(content="Hello! What's the weather like?")]},
    config=config
)

# Second interaction - conversation history is maintained!
result = graph.invoke(
    {"messages": [HumanMessage(content="What did I just ask you?")]},
    config=config
)
# The bot remembers the previous question! 🎉

4. Async Usage for High Performance

from langgraph_unity_catalog_checkpoint import AsyncUnityCatalogCheckpointSaver
import asyncio

# Create async checkpointer
async_checkpointer = AsyncUnityCatalogCheckpointSaver(
    workspace_client=workspace_client,
    catalog="main",
    schema="langgraph",
    warehouse_id="your-warehouse-id",
)

# Async chatbot node
async def async_chatbot(state: State) -> dict:
    response = await llm.ainvoke(state["messages"])
    return {"messages": [response]}

# Build and compile with async checkpointer
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", async_chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
graph = graph_builder.compile(checkpointer=async_checkpointer)

# Run asynchronously (from within an async function, or a notebook that
# supports top-level await)
config = {"configurable": {"thread_id": "async_conversation_1"}}
result = await graph.ainvoke(
    {"messages": [HumanMessage(content="Hello async world!")]},
    config=config
)

🎯 Use Cases

1. Conversational AI with Memory

Maintain conversation history across multiple interactions:

# Each user gets their own conversation thread
config = {"configurable": {"thread_id": f"user_{user_id}"}}
graph.invoke({"messages": [HumanMessage(content=user_input)]}, config)

2. Human-in-the-Loop Workflows

Pause execution for human review and resume seamlessly:

# Interrupt before critical nodes
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["approval_node"]
)

# Execute and pause at approval
result = graph.invoke(input_data, config)

# Human reviews and approves...

# Resume from checkpoint
result = graph.invoke(None, config)  # Continues from where it left off

3. Long-Term Memory with LangMem

Integrate with LangMem for user preferences and memories:

from langgraph.prebuilt import create_react_agent
from langmem import create_manage_memory_tool, create_search_memory_tool

# Create store for LangMem
store = UnityCatalogStore(
    workspace_client=workspace_client,
    catalog="main",
    schema="langgraph",
)

# LangMem tools that read and write memories through the store
langmem_tools = [
    create_manage_memory_tool(namespace=("memories", "{langgraph_user_id}")),
    create_search_memory_tool(namespace=("memories", "{langgraph_user_id}")),
]

# Create agent with memory (the store is provided at construction time)
agent = create_react_agent(llm, tools=langmem_tools, store=store)

# Use with user context
config = {
    "configurable": {
        "langgraph_user_id": "user_123"  # Isolates memories per user
    }
}
agent.invoke({"messages": [HumanMessage(content="I prefer dark mode")]}, config)

4. Production ML Pipelines

Reliable state management for complex workflows:

# Automatic recovery from failures
# Time-travel debugging with Delta Lake
# Full audit trail via Unity Catalog
# Multi-agent coordination with isolated states

📊 Performance Optimizations

Batch Write Operations (2-10x Faster)

The implementation uses batched SQL operations to minimize round trips to Unity Catalog:

# Instead of N+1 SQL statements:
# - 1 per blob
# - 1 per write
# - 1 checkpoint

# We use just 3 SQL statements:
# - 1 batch for all blobs
# - 1 batch for all writes  
# - 1 for checkpoint

# For a checkpoint with 5 blobs and 3 writes:
# Before: 9 SQL statements
# After: 3 SQL statements
# Speedup: 3x faster! ⚡

See docs/CHECKPOINT_BATCH_WRITE_OPTIMIZATION.md for details.
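
The batching idea can be sketched in a few lines. This is an illustrative helper, not the package's actual code; the table and column names are placeholders.

```python
# Illustrative sketch of the batching idea (not the package's implementation):
# fold every row into a single multi-row INSERT instead of issuing one
# statement per row. Table and column names are placeholders.

def build_batch_insert(table: str, rows: list[tuple[str, str]]) -> str:
    """Build one multi-row INSERT statement from (key, value) pairs."""
    values = ", ".join(f"('{key}', '{value}')" for key, value in rows)
    return f"INSERT INTO {table} (key, value) VALUES {values}"

blobs = [("channel_a", "v1"), ("channel_b", "v2"), ("channel_c", "v3")]
stmt = build_batch_insert("main.langgraph.checkpoint_blobs", blobs)
print(stmt)
# One SQL round trip for all three rows instead of three separate statements
```

Real code would bind parameters rather than interpolate values into the string; the sketch only illustrates where the round-trip savings come from.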


๐Ÿ—๏ธ Architecture

┌──────────────────────────────────────┐
│  LangChain/LangGraph Application     │
└──────────────────────────────────────┘
                 ↓
┌──────────────────────────────────────┐
│  BaseStore / BaseCheckpointSaver     │
│  (LangGraph Interfaces)              │
└──────────────────────────────────────┘
                 ↓
┌──────────────────────────────────────┐
│  Unity Catalog Implementation        │
│  - UnityCatalogStore                 │
│  - UnityCatalogCheckpointSaver       │
│  - Async variants                    │
└──────────────────────────────────────┘
                 ↓
┌──────────────────────────────────────┐
│  Databricks WorkspaceClient          │
│  (SQL Statement Execution API)       │
└──────────────────────────────────────┘
                 ↓
┌──────────────────────────────────────┐
│  Unity Catalog Delta Tables          │
│  - ACID transactions                 │
│  - Time-travel                       │
│  - Change Data Feed                  │
└──────────────────────────────────────┘

Data Storage

  • Serialization: Checkpoints and values are serialized using LangGraph's JsonPlusSerializer
  • Binary Storage: BINARY columns for efficient blob storage (base64 encoded)
  • JSON Metadata: Structured metadata for filtering and querying
  • Delta Lake: ACID transactions, time-travel, and optimization
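
The storage path above can be illustrated with a round trip. The sketch below substitutes plain JSON for LangGraph's JsonPlusSerializer to stay self-contained; the actual wire format is the package's concern.

```python
import base64
import json

# Sketch of the encoding path described above, simplified to plain JSON in
# place of JsonPlusSerializer: serialize to bytes, base64-encode for the
# BINARY column, and reverse the steps on read.

checkpoint = {"v": 1, "channel_values": {"messages": ["hi"]}}

raw = json.dumps(checkpoint).encode("utf-8")      # serializer output (bytes)
encoded = base64.b64encode(raw).decode("ascii")   # what travels in the SQL statement

decoded = json.loads(base64.b64decode(encoded))   # read path
print(decoded == checkpoint)  # True
```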

Default Table Names

Component      Default tables
Store          store
Checkpointer   checkpoints, checkpoint_blobs, checkpoint_writes

Tables are automatically created on first use with optimized schemas.


📚 Examples

Complete Jupyter Notebooks

Explore the notebooks/ directory for interactive examples:

Run in Databricks

  1. Upload a notebook to your Databricks workspace
  2. Attach to a cluster with Unity Catalog access
  3. Set the required configuration (catalog, schema, warehouse_id)
  4. Run all cells

🔧 Configuration

Environment Variables

Variable                  Description           Required
DATABRICKS_HOST           Workspace URL         Yes
DATABRICKS_TOKEN          Access token          Yes
DATABRICKS_WAREHOUSE_ID   SQL warehouse ID      No
UC_CATALOG                Default catalog name  Recommended
UC_SCHEMA                 Default schema name   Recommended

Configuration Precedence

Configuration values are resolved in this order:

  1. Environment variables (highest priority)
  2. Databricks widgets (for notebooks)
  3. Constructor parameters (explicit values)

See docs/CONFIGURATION_PRECEDENCE.md for details.

Warehouse ID

The warehouse_id parameter is optional and defaults to None. If not provided:

  • Uses the default warehouse for the workspace
  • Can be overridden per-operation if needed

🔒 Permissions Required

Ensure your Databricks principal has:

  • USE CATALOG on the target catalog
  • USE SCHEMA on the target schema
  • CREATE TABLE on the target schema (for initialization)
  • SELECT and MODIFY on the tables (in Unity Catalog, MODIFY covers INSERT, UPDATE, and DELETE)
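
For reference, the sketch below prints the GRANT statements implied by the list above for a placeholder principal; the principal name is an assumption, and the table names are the package defaults.

```python
# Sketch: generate the GRANT statements implied by the permission list above.
# "my-service-principal" is a placeholder; substitute your own user, group,
# or service principal, and run the statements in a SQL editor.

def grant_statements(catalog: str, schema: str, principal: str) -> list[str]:
    stmts = [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO `{principal}`",
        f"GRANT USE SCHEMA ON SCHEMA {catalog}.{schema} TO `{principal}`",
        f"GRANT CREATE TABLE ON SCHEMA {catalog}.{schema} TO `{principal}`",
    ]
    # Default tables used by the store and checkpointer
    for table in ("store", "checkpoints", "checkpoint_blobs", "checkpoint_writes"):
        stmts.append(
            f"GRANT SELECT, MODIFY ON TABLE {catalog}.{schema}.{table} TO `{principal}`"
        )
    return stmts

for stmt in grant_statements("main", "langgraph", "my-service-principal"):
    print(stmt)
```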

🧪 Testing

Run Unit Tests

# Run all tests
make test

# Run specific test file
uv run pytest tests/test_unity_catalog_store.py -v

# Run with coverage
uv run pytest --cov=src --cov-report=html

Run Integration Tests

Integration tests require a live Databricks connection:

# Set required environment variables
export DATABRICKS_HOST="..."
export DATABRICKS_TOKEN="..."
export DATABRICKS_WAREHOUSE_ID="..."

# Run integration tests
uv run pytest tests/test_integration.py -v

Linting and Formatting

# Format code
make format

# Run linting
make lint

# Type checking
make type-check

📖 Documentation

Core Documentation

Technical Details

Session Summaries


🚀 Features

UnityCatalogStore

  • ✅ Implements langgraph.store.base.BaseStore interface
  • ✅ Batch operations (batch, abatch) for performance
  • ✅ Namespaced key-value storage
  • ✅ Search with filtering and pagination
  • ✅ Automatic table initialization
  • ✅ Sync and async implementations
  • ✅ Compatible with LangMem for long-term memory

UnityCatalogCheckpointSaver

  • ✅ Implements BaseCheckpointSaver interface
  • ✅ Full LangGraph checkpoint persistence
  • ✅ Support for human-in-the-loop workflows
  • ✅ Multi-turn conversation memory
  • ✅ State recovery and time-travel
  • ✅ Pending writes management
  • ✅ Checkpoint listing and filtering
  • ✅ Sync and async implementations
  • ✅ Optimized batch writes (2-10x faster)
  • ✅ Automatic table creation and schema management

๐Ÿ› ๏ธ Development

Setup Development Environment

# Clone the repository
git clone https://github.com/natefleming/langgraph_unity_catalog_checkpoint.git
cd langgraph_unity_catalog_checkpoint

# Create virtual environment
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install with development dependencies
pip install -e ".[dev]"

# Install pre-commit hooks
pre-commit install

Project Structure

langgraph_unity_catalog_checkpoint/
├── src/
│   └── langgraph_unity_catalog_checkpoint/
│       ├── store/              # Store implementations
│       │   ├── unity_catalog.py    # Sync store
│       │   ├── aio.py              # Async store
│       │   └── base.py             # Base store class
│       ├── checkpoint/         # Checkpointer implementations
│       │   ├── unity_catalog.py    # Sync checkpointer
│       │   ├── aio.py              # Async checkpointer
│       │   └── base.py             # Base checkpointer class
│       └── __init__.py         # Public API exports
├── tests/                      # Test suite
├── notebooks/                  # Example notebooks
├── docs/                       # Documentation
├── pyproject.toml              # Project configuration
└── README.md                   # This file

๐Ÿค Contributing

Contributions are welcome! Please:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes with tests
  4. Run the test suite (make test)
  5. Format and lint (make format lint)
  6. Commit your changes (git commit -m 'Add amazing feature')
  7. Push to the branch (git push origin feature/amazing-feature)
  8. Open a Pull Request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.


๐Ÿ™ Acknowledgments

Built with:


📞 Support

For issues and questions:


🗺️ Roadmap

Planned enhancements:

  • Connection pooling for improved performance
  • Configurable TTL for automatic checkpoint cleanup
  • Metrics and monitoring integration
  • Query optimization hints and caching
  • Support for alternative serialization formats
  • Bulk import/export utilities
  • Multi-region replication support

⚡ Quick Links


Made with ❤️ for the LangChain community
