Add your description here
Project description
LangGraph Wave Orchestrator
A parallel task execution framework built on LangGraph that distributes AI-powered tasks across multiple worker nodes in organized execution waves. Efficiently coordinates complex, multi-step AI workflows while maximizing parallelization and maintaining proper task dependencies.
Features
- Parallel Wave Execution: Organizes tasks into execution waves for optimal parallel processing
- Dynamic State Management: Creates flexible worker state handling using Pydantic models
- Worker Node Management: Manages worker node lifecycle and task distribution
- Intelligent Task Planning: LLM-powered task decomposition and worker assignment
Installation
Install from PyPI
pip install Lanngraph-Wave-Orchestrator
Install from TestPyPI (for testing)
pip install -i https://test.pypi.org/simple/ Lanngraph-Wave-Orchestrator
Install from Source
git clone https://github.com/benrben/Lanngraph-Wave-Orchestrator.git
cd Lanngraph-Wave-Orchestrator
pip install -e .
Dependencies
The package automatically installs these dependencies:
pydantic>=2.0.0- For data validation and settings managementlangchain-core>=0.1.0- Core LangChain functionalitylanggraph>=0.1.0- Graph-based LLM application frameworkpython-dotenv>=1.0.0- Environment variable management
Optional Dependencies
For OpenAI integration (recommended):
pip install langchain-openai
Usage
from langgraph_wave_orchestrator import WaveOrchestrator, WorkerNode
from langchain_openai import ChatOpenAI
# Create LLM and orchestrator
llm = ChatOpenAI(model="gpt-4")
wave_orchestrator = WaveOrchestrator(llm)
# Add worker nodes
wave_orchestrator.add_node(search_node)
wave_orchestrator.add_node(financial_node)
# Compile and use
graph = wave_orchestrator.compile()
result = graph.invoke({"messages": [{"content": "Your query here"}]})
Creating Worker Nodes
1. Define State Model
from pydantic import BaseModel
from typing import List, Annotated
from langchain_core.messages import BaseMessage, add_messages
class SearchModel(BaseModel):
messages: Annotated[List[BaseMessage], add_messages] = []
2. Create Worker Function
from langchain_core.messages import AIMessage
def search_worker(state):
# Access the task from state
task = state.search_state.messages[-1].content
# Process the task (your custom logic here)
result = f"Search results for: {task}"
# Return updated state
return {"search_state": {"messages": [AIMessage(content=result)]}}
3. Build and Add Node
search_node = WorkerNode(
function=search_worker,
model=SearchModel,
state_placeholder="search_state",
description="search the web for information and current data",
name="search"
)
wave_orchestrator.add_node(search_node)
Development
Setup
uv sync
pytest tests/
Contributing
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
Follow PEP 8 style guidelines and include type hints.
License
MIT License - see LICENSE file for details.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file lanngraph_wave_orchestrator-1.1.8.tar.gz.
File metadata
- Download URL: lanngraph_wave_orchestrator-1.1.8.tar.gz
- Upload date:
- Size: 78.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
297a98636beb69cca59f46ca48e36be6dfaeff9ad612cc63ef92dcaf37164a78
|
|
| MD5 |
3b8117065e581faee4580223418f1d92
|
|
| BLAKE2b-256 |
7b1a7e68507475f6321441bbcaa607eaf4799fbed0ec70e50bdae49947320413
|
File details
Details for the file lanngraph_wave_orchestrator-1.1.8-py3-none-any.whl.
File metadata
- Download URL: lanngraph_wave_orchestrator-1.1.8-py3-none-any.whl
- Upload date:
- Size: 9.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
caed3201802483bcb5b544bec8e5d0bd86f5f47c4de059e19b9e407ad8c66f5c
|
|
| MD5 |
5a01f68e4fbfe5bd432f1f82f903ec1b
|
|
| BLAKE2b-256 |
ae09942e8e399dd2313753c8563c9fdf2b57db34a240b433ad2b597620f84b89
|