Skip to main content

LangGraph Checkpoint implementation for Cloudflare D1

Project description

Usage

This package provides both synchronous and asynchronous interfaces for saving and retrieving LangGraph checkpoints in Cloudflare D1.

Synchronous

from langgraph_checkpoint_cloudflare_d1 import CloudflareD1Saver

# Cloudflare credentials (placeholders: substitute your own values)
account_id = "your-cloudflare-account-id"
database_id = "your-d1-database-id"
api_token = "your-cloudflare-api-token"

# Configs for checkpoint operations: the write config also sets
# "checkpoint_ns", the read config only names the thread
write_config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
read_config = {"configurable": {"thread_id": "1"}}

# Sample checkpoint data, built up front so the `with` body below
# contains nothing but saver calls
checkpoint = {
    "v": 2,
    "ts": "2024-07-31T20:14:19.804150+00:00",
    "id": "1ef4f797-8335-6428-8001-8a1503f9b875",
    "channel_values": {"my_key": "meow", "node": "node"},
    "channel_versions": {
        "__start__": 2,
        "my_key": 3,
        "start:node": 3,
        "node": 3,
    },
    "versions_seen": {
        "__input__": {},
        "__start__": {"__start__": 1},
        "node": {"start:node": 2},
    },
    "pending_sends": [],
}

# Initialize the saver with the credentials above
with CloudflareD1Saver(
    account_id=account_id, database_id=database_id, api_token=api_token
) as checkpointer:
    # Set up the database tables (idempotent operation)
    checkpointer.setup()

    # Store the checkpoint, then read it back and list what is stored
    checkpointer.put(write_config, checkpoint, {}, {})
    loaded_checkpoint = checkpointer.get_tuple(read_config)
    checkpoints = list(checkpointer.list(read_config))

You can also use a connection string to initialize the saver:

from langgraph_checkpoint_cloudflare_d1 import CloudflareD1Saver

# Connection string format: "account_id:database_id:api_token"
# (three colon-separated fields; replace each placeholder with a real value)
conn_string = "your-account-id:your-database-id:your-api-token" 

# Build the saver from the connection string and use it as a context manager
with CloudflareD1Saver.from_conn_string(conn_string) as checkpointer:
    # Set up the database tables, as in the credentials-based example
    checkpointer.setup()
    # ... your code here ...

Asynchronous

from langgraph_checkpoint_cloudflare_d1 import AsyncCloudflareD1Saver
import asyncio

async def main():
    """Store, load and list one sample checkpoint with AsyncCloudflareD1Saver."""
    # Cloudflare credentials (placeholders: substitute your own values)
    account_id = "your-cloudflare-account-id"
    database_id = "your-d1-database-id"
    api_token = "your-cloudflare-api-token"

    # Configs for checkpoint operations: the write config also sets
    # "checkpoint_ns", the read config only names the thread
    write_config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
    read_config = {"configurable": {"thread_id": "1"}}

    # Sample checkpoint data, built before the saver is opened
    checkpoint = {
        "v": 2,
        "ts": "2024-07-31T20:14:19.804150+00:00",
        "id": "1ef4f797-8335-6428-8001-8a1503f9b875",
        "channel_values": {"my_key": "meow", "node": "node"},
        "channel_versions": {
            "__start__": 2,
            "my_key": 3,
            "start:node": 3,
            "node": 3,
        },
        "versions_seen": {
            "__input__": {},
            "__start__": {"__start__": 1},
            "node": {"start:node": 2},
        },
        "pending_sends": [],
    }

    # Initialize the async saver with the credentials above
    async with AsyncCloudflareD1Saver(
        account_id=account_id, database_id=database_id, api_token=api_token
    ) as checkpointer:
        # Setup happens automatically but can be called explicitly
        await checkpointer.setup()

        # Store the checkpoint, then read it back
        await checkpointer.put(write_config, checkpoint, {}, {})
        loaded_checkpoint = await checkpointer.get_tuple(read_config)

        # Collect every listed checkpoint from the async iterator
        checkpoints = []
        async for cp in checkpointer.list(read_config):
            checkpoints.append(cp)


# For local execution
if __name__ == "__main__":
    asyncio.run(main())

You can also use a connection string to initialize the async saver:

from langgraph_checkpoint_cloudflare_d1 import AsyncCloudflareD1Saver
import asyncio

async def main():
    """Open an AsyncCloudflareD1Saver from a connection string and set it up."""
    # Connection string format: "account_id:database_id:api_token"
    conn_string = "your-account-id:your-database-id:your-api-token"

    # Use the async saver with a connection string
    async with await AsyncCloudflareD1Saver.from_conn_string(conn_string) as checkpointer:
        # Setup is automatic when using the context manager; the explicit
        # call mirrors the synchronous connection-string example and gives
        # the block a real statement (a body made only of comments is a
        # syntax error in Python).
        await checkpointer.setup()
        # Your code here
        # ...


# For local execution
if __name__ == "__main__":
    asyncio.run(main())

Integration with LangGraph

To use this checkpoint saver with LangGraph, you can pass it when compiling your graph:

from langgraph.graph import StateGraph
from langgraph_checkpoint_cloudflare_d1 import CloudflareD1Saver

# Create a simple graph: a single node that adds one to an int state
builder = StateGraph(int)
builder.add_node("add_one", lambda x: x + 1)
builder.set_entry_point("add_one")
builder.set_finish_point("add_one")

# Create the checkpoint saver as a context manager, consistent with the
# other examples on this page, so the saver is exited when the block ends.
# NOTE(review): presumably __exit__ releases the saver's resources —
# confirm against the package.
with CloudflareD1Saver(
    account_id="your-account-id",
    database_id="your-database-id",
    api_token="your-api-token",
) as checkpointer:
    checkpointer.setup()  # Create necessary tables

    # Compile the graph with the checkpointer
    graph = builder.compile(checkpointer=checkpointer)

    # Use the graph with checkpointing; the thread_id identifies the run
    config = {"configurable": {"thread_id": "my-thread-1"}}
    result = graph.invoke(3, config)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

langgraph_checkpoint_cloudflare_d1-0.1.0.tar.gz (14.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

File details

Details for the file langgraph_checkpoint_cloudflare_d1-0.1.0.tar.gz.

File metadata

File hashes

Hashes for langgraph_checkpoint_cloudflare_d1-0.1.0.tar.gz
Algorithm Hash digest
SHA256 2bb738cae271027d24777ae7618925425d7310257c4afc5f1f98bdaef553abf8
MD5 6e14439c8e415a99ff5bc4791c3e0b6d
BLAKE2b-256 35ba43e739f371c2d91c4ce41f5bbebd779e2ced64f38719c296e19b6939d6c8

See more details on using hashes here.

File details

Details for the file langgraph_checkpoint_cloudflare_d1-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for langgraph_checkpoint_cloudflare_d1-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6b69858275d52bc8c58e37c6d63fd81efb6d0725ea4470920b89e820fdabbb35
MD5 e915754b7b6a3e9ecd3e5a49e6571681
BLAKE2b-256 86b230ee2bd4e80875e50ab8ad526cd2833d9ebb0aab75abcc6d93de8a7e4da8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page