amsdal_langgraph plugin for AMSDAL Framework
Project description
AMSDAL Workflow
A LangGraph checkpoint persistence plugin for the AMSDAL Framework. This plugin enables persistent and recoverable LangGraph workflows by storing checkpoint state in AMSDAL-managed databases.
Features
- Persistent Workflow State: Store LangGraph checkpoints in any AMSDAL-supported database (SQLite, PostgreSQL, etc.)
- Dual Mode Support: Both synchronous and asynchronous operations
- Thread-based Organization: Manage multiple workflow threads with checkpoint namespacing
- Drop-in Replacement: Compatible with LangGraph's
BaseCheckpointSaverinterface - Production Ready: Built on the robust AMSDAL ORM with comprehensive testing
Installation
Install via pip:
pip install amsdal-workflow
Or with optional dependencies:
# With OpenAI support
pip install amsdal-workflow[openai]
Quick Start
Basic Usage
from langgraph.graph import StateGraph
from amsdal_langgraph.checkpoint import AmsdalCheckpointSaver
# Initialize the checkpoint saver
checkpointer = AmsdalCheckpointSaver()
# Create your LangGraph workflow
workflow = StateGraph(...)
# ... define your workflow nodes and edges ...
# Compile with checkpoint support
app = workflow.compile(checkpointer=checkpointer)
# Run with persistence
config = {'configurable': {'thread_id': 'user-123'}}
result = app.invoke(input_data, config=config)
Async Usage
from amsdal_langgraph.checkpoint import AmsdalCheckpointSaver
# Same checkpointer works for async
checkpointer = AmsdalCheckpointSaver()
# Compile async workflow
app = workflow.compile(checkpointer=checkpointer)
# Run async with persistence
config = {'configurable': {'thread_id': 'user-123'}}
result = await app.ainvoke(input_data, config=config)
Advanced Configuration
from langchain_core.runnables import RunnableConfig
from amsdal_langgraph.checkpoint import AmsdalCheckpointSaver
checkpointer = AmsdalCheckpointSaver()
# Configuration with checkpoint namespace
config: RunnableConfig = {
'configurable': {
'thread_id': 'conversation-456',
'checkpoint_ns': 'production', # Optional namespace
}
}
# Run workflow
result = app.invoke(input_data, config=config)
# Resume from checkpoint
checkpoint_tuple = checkpointer.get_tuple(config)
if checkpoint_tuple:
# Continue from last checkpoint
result = app.invoke(input_data, config=config)
Architecture
Core Components
- AmsdalCheckpointSaver: Main class implementing LangGraph's
BaseCheckpointSaverprotocol - Checkpoint Model: Stores checkpoint snapshots with metadata
- CheckpointWrites Model: Stores pending write operations for each checkpoint
Data Models
Checkpoint
Stores workflow state snapshots:
thread_id: Workflow thread identifiercheckpoint_ns: Optional namespace for organizationcheckpoint_id: Unique checkpoint identifierparent_checkpoint_id: Reference to parent checkpointcheckpoint: Serialized checkpoint datameta: Checkpoint metadata
CheckpointWrites
Stores pending writes associated with checkpoints:
thread_id,checkpoint_ns,checkpoint_id: Links to checkpointtask_id: Task identifieridx: Write operation indexchannel: Channel namevalue: Serialized write value
API Reference
AmsdalCheckpointSaver
Methods
Synchronous Methods
-
get_tuple(config: RunnableConfig) -> CheckpointTuple | None- Retrieve a checkpoint tuple by configuration
-
list(config: RunnableConfig | None, *, filter: dict | None = None, before: RunnableConfig | None = None, limit: int | None = None) -> Iterator[CheckpointTuple]- List checkpoints with optional filtering
-
put(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: ChannelVersions) -> RunnableConfig- Store a new checkpoint
-
put_writes(config: RunnableConfig, writes: Sequence[tuple[str, Any]], task_id: str) -> None- Store pending writes for a checkpoint
-
delete_thread(thread_id: str) -> None- Delete all checkpoints and writes for a thread
Asynchronous Methods
All synchronous methods have async equivalents prefixed with a:
aget_tuple(...)alist(...)aput(...)aput_writes(...)adelete_thread(...)
Configuration
Database Setup
AMSDAL Workflow uses your existing AMSDAL configuration. Ensure you have configured your database connection:
from amsdal.manager import AmsdalManager
# Initialize AMSDAL
manager = AmsdalManager()
manager.setup()
# Now use AmsdalCheckpointSaver
checkpointer = AmsdalCheckpointSaver()
Migration
The plugin includes migration files for creating the required tables. Run migrations before first use:
amsdal migrate
Development
Setup Development Environment
# Clone the repository
git clone https://github.com/amsdal/amsdal-workflow.git
cd amsdal-workflow
# Install dependencies
hatch run sync
Running Tests
# Run all tests
hatch run test
# Run with coverage
hatch run cov
# Run specific test file
hatch run test tests/test_checkpoint.py
# Run with verbose output
hatch run test -v
Code Quality
# Format code
hatch run fmt
# Check code style
hatch run style
# Run type checking
hatch run typing
# Run all checks
hatch run all
Project Structure
amsdal_langgraph/
├── amsdal_langgraph/ # Main package
│ ├── __init__.py
│ ├── checkpoint.py # AmsdalCheckpointSaver implementation
│ ├── utils.py # Utility functions
│ ├── models/ # Data models
│ │ ├── checkpoint.py # Checkpoint model
│ │ └── checkpoint_writes.py # CheckpointWrites model
│ └── migrations/ # Database migrations
├── tests/ # Test suite
│ ├── conftest.py # Test fixtures
│ └── test_checkpoint.py # Checkpoint tests
├── pyproject.toml # Project configuration
├── README.md # This file
Contributing
Contributions are welcome! Please follow these steps:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Make your changes
- Run tests and code quality checks
- Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
Code Standards
- Python 3.11+ required
- Follow PEP 8 style guide (enforced by Ruff)
- Add type hints to all functions
- Write tests for new features
- Maintain test coverage above 90%
License
This project is licensed under the AMSDAL End User License Agreement - see the LICENSE.txt file for details.
Acknowledgments
- Built on LangGraph by LangChain
- Powered by AMSDAL Framework
Support
- Documentation: AMSDAL Docs
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Changelog
See CHANGELOG.md for a list of changes in each release.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file amsdal_langgraph-0.2.2.tar.gz.
File metadata
- Download URL: amsdal_langgraph-0.2.2.tar.gz
- Upload date:
- Size: 259.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: python-httpx/0.28.1
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b711ff43ba76333894690b0da416c68aad8c9679b76f180b18e06f10ecbf13c6
|
|
| MD5 |
1061aa5ab65534d010721d682653654a
|
|
| BLAKE2b-256 |
625994dba381f0a408b0cddf390e9f7b128643b206ae7f6b9ef1d946bbbaef8b
|
File details
Details for the file amsdal_langgraph-0.2.2-py3-none-any.whl.
File metadata
- Download URL: amsdal_langgraph-0.2.2-py3-none-any.whl
- Upload date:
- Size: 30.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: python-httpx/0.28.1
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c409a57b0e668ee450c8477987dd2df22b82681c8968e82d3fdaffcf67141c5a
|
|
| MD5 |
6697d17d9481ebfc3e0fc94fb577691a
|
|
| BLAKE2b-256 |
921eb745a7504f59e0a93d06150b62c9ed18e64c50ca7a76a61845f748966675
|