Skip to main content

amsdal_langgraph plugin for AMSDAL Framework

Project description

AMSDAL Workflow

Python Version License

A LangGraph checkpoint persistence plugin for the AMSDAL Framework. This plugin enables persistent and recoverable LangGraph workflows by storing checkpoint state in AMSDAL-managed databases.

Features

  • Persistent Workflow State: Store LangGraph checkpoints in any AMSDAL-supported database (SQLite, PostgreSQL, etc.)
  • Dual Mode Support: Both synchronous and asynchronous operations
  • Thread-based Organization: Manage multiple workflow threads with checkpoint namespacing
  • Drop-in Replacement: Compatible with LangGraph's BaseCheckpointSaver interface
  • Production Ready: Built on the robust AMSDAL ORM with comprehensive testing

Installation

Install via pip:

pip install amsdal-workflow

Or with optional dependencies:

# With OpenAI support
pip install amsdal-workflow[openai]

Quick Start

Basic Usage

from langgraph.graph import StateGraph
from amsdal_langgraph.checkpoint import AmsdalCheckpointSaver

# Initialize the checkpoint saver
checkpointer = AmsdalCheckpointSaver()

# Create your LangGraph workflow
workflow = StateGraph(...)
# ... define your workflow nodes and edges ...

# Compile with checkpoint support
app = workflow.compile(checkpointer=checkpointer)

# Run with persistence
config = {'configurable': {'thread_id': 'user-123'}}
result = app.invoke(input_data, config=config)

Async Usage

from amsdal_langgraph.checkpoint import AmsdalCheckpointSaver

# Same checkpointer works for async
checkpointer = AmsdalCheckpointSaver()

# Compile async workflow
app = workflow.compile(checkpointer=checkpointer)

# Run async with persistence
config = {'configurable': {'thread_id': 'user-123'}}
result = await app.ainvoke(input_data, config=config)

Advanced Configuration

from langchain_core.runnables import RunnableConfig
from amsdal_langgraph.checkpoint import AmsdalCheckpointSaver

checkpointer = AmsdalCheckpointSaver()

# Configuration with checkpoint namespace
config: RunnableConfig = {
    'configurable': {
        'thread_id': 'conversation-456',
        'checkpoint_ns': 'production',  # Optional namespace
    }
}

# Run workflow
result = app.invoke(input_data, config=config)

# Resume from checkpoint
checkpoint_tuple = checkpointer.get_tuple(config)
if checkpoint_tuple:
    # Continue from last checkpoint
    result = app.invoke(input_data, config=config)

Architecture

Core Components

  • AmsdalCheckpointSaver: Main class implementing LangGraph's BaseCheckpointSaver protocol
  • Checkpoint Model: Stores checkpoint snapshots with metadata
  • CheckpointWrites Model: Stores pending write operations for each checkpoint

Data Models

Checkpoint

Stores workflow state snapshots:

  • thread_id: Workflow thread identifier
  • checkpoint_ns: Optional namespace for organization
  • checkpoint_id: Unique checkpoint identifier
  • parent_checkpoint_id: Reference to parent checkpoint
  • checkpoint: Serialized checkpoint data
  • meta: Checkpoint metadata

CheckpointWrites

Stores pending writes associated with checkpoints:

  • thread_id, checkpoint_ns, checkpoint_id: Links to checkpoint
  • task_id: Task identifier
  • idx: Write operation index
  • channel: Channel name
  • value: Serialized write value

API Reference

AmsdalCheckpointSaver

Methods

Synchronous Methods
  • get_tuple(config: RunnableConfig) -> CheckpointTuple | None

    • Retrieve a checkpoint tuple by configuration
  • list(config: RunnableConfig | None, *, filter: dict | None = None, before: RunnableConfig | None = None, limit: int | None = None) -> Iterator[CheckpointTuple]

    • List checkpoints with optional filtering
  • put(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: ChannelVersions) -> RunnableConfig

    • Store a new checkpoint
  • put_writes(config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str) -> None

    • Store pending writes for a checkpoint
  • delete_thread(thread_id: str) -> None

    • Delete all checkpoints and writes for a thread
Asynchronous Methods

All synchronous methods have async equivalents prefixed with a:

  • aget_tuple(...)
  • alist(...)
  • aput(...)
  • aput_writes(...)
  • adelete_thread(...)

Configuration

Database Setup

AMSDAL Workflow uses your existing AMSDAL configuration. Ensure you have configured your database connection:

from amsdal.manager import AmsdalManager

# Initialize AMSDAL
manager = AmsdalManager()
manager.setup()

# Now use AmsdalCheckpointSaver
checkpointer = AmsdalCheckpointSaver()

Migration

The plugin includes migration files for creating the required tables. Run migrations before first use:

amsdal migrate

Development

Setup Development Environment

# Clone the repository
git clone https://github.com/amsdal/amsdal-workflow.git
cd amsdal-workflow

# Install dependencies
hatch run sync

Running Tests

# Run all tests
hatch run test

# Run with coverage
hatch run cov

# Run specific test file
hatch run test tests/test_checkpoint.py

# Run with verbose output
hatch run test -v

Code Quality

# Format code
hatch run fmt

# Check code style
hatch run style

# Run type checking
hatch run typing

# Run all checks
hatch run all

Project Structure

amsdal_langgraph/
├── amsdal_langgraph/           # Main package
│   ├── __init__.py
│   ├── checkpoint.py          # AmsdalCheckpointSaver implementation
│   ├── utils.py               # Utility functions
│   ├── models/                # Data models
│   │   ├── checkpoint.py      # Checkpoint model
│   │   └── checkpoint_writes.py  # CheckpointWrites model
│   └── migrations/            # Database migrations
├── tests/                     # Test suite
│   ├── conftest.py            # Test fixtures
│   └── test_checkpoint.py     # Checkpoint tests
├── pyproject.toml             # Project configuration
├── README.md                  # This file

Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Run tests and code quality checks
  5. Commit your changes (git commit -m 'Add amazing feature')
  6. Push to the branch (git push origin feature/amazing-feature)
  7. Open a Pull Request

Code Standards

  • Python 3.11+ required
  • Follow PEP 8 style guide (enforced by Ruff)
  • Add type hints to all functions
  • Write tests for new features
  • Maintain test coverage above 90%

License

This project is licensed under the AMSDAL End User License Agreement - see the LICENSE.txt file for details.

Acknowledgments

Support

Changelog

See CHANGELOG.md for a list of changes in each release.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amsdal_langgraph-0.2.1.tar.gz (259.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

amsdal_langgraph-0.2.1-py3-none-any.whl (30.1 kB view details)

Uploaded Python 3

File details

Details for the file amsdal_langgraph-0.2.1.tar.gz.

File metadata

  • Download URL: amsdal_langgraph-0.2.1.tar.gz
  • Upload date:
  • Size: 259.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: python-httpx/0.28.1

File hashes

Hashes for amsdal_langgraph-0.2.1.tar.gz
Algorithm Hash digest
SHA256 0200a9046faf87a6e42b5b1e4344f58755e46feece91f8ab6754fc7abbd23c55
MD5 ff923ee5737b465c52e5f17422b27d5c
BLAKE2b-256 b6831e7ac81b510d26ee144349cc33092c9c27aba257be054b1f7d95d1b9a823

See more details on using hashes here.

File details

Details for the file amsdal_langgraph-0.2.1-py3-none-any.whl.

File metadata

File hashes

Hashes for amsdal_langgraph-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 d55d3ab18511b3081a79244a17d0520f640832b0c605fb4c259d01df9f8fbcec
MD5 5b9ab05754585b3f1f1cff3256222d60
BLAKE2b-256 005bb30fb469a1701569786d31f46fcfc2e4f175cf83ed2c25b6b89dcc248cf6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page