Skip to main content

Add your description here

Project description

LangGraph Wave Orchestrator

A parallel task execution framework built on LangGraph that distributes AI-powered tasks across multiple worker nodes in organized execution waves. Efficiently coordinates complex, multi-step AI workflows while maximizing parallelization and maintaining proper task dependencies.

Features

  • Parallel Wave Execution: Organizes tasks into execution waves for optimal parallel processing
  • Dynamic State Management: Creates flexible worker state handling using Pydantic models
  • Worker Node Management: Manages worker node lifecycle and task distribution
  • Intelligent Task Planning: LLM-powered task decomposition and worker assignment

Installation

Install from PyPI

pip install Lanngraph-Wave-Orchestrator

Install from TestPyPI (for testing)

pip install -i https://test.pypi.org/simple/ Lanngraph-Wave-Orchestrator

Install from Source

git clone https://github.com/benrben/Lanngraph-Wave-Orchestrator.git
cd Lanngraph-Wave-Orchestrator
pip install -e .

Dependencies

The package automatically installs these dependencies:

  • pydantic>=2.0.0 - For data validation and settings management
  • langchain-core>=0.1.0 - Core LangChain functionality
  • langgraph>=0.1.0 - Graph-based LLM application framework
  • python-dotenv>=1.0.0 - Environment variable management

Optional Dependencies

For OpenAI integration (recommended):

pip install langchain-openai

Usage

from langgraph_wave_orchestrator import WaveOrchestrator, WorkerNode
from langchain_openai import ChatOpenAI

# Create LLM and orchestrator
llm = ChatOpenAI(model="gpt-4")
wave_orchestrator = WaveOrchestrator(llm)

# Add worker nodes
wave_orchestrator.add_node(search_node)
wave_orchestrator.add_node(financial_node)

# Compile and use
graph = wave_orchestrator.compile()
result = graph.invoke({"messages": [{"content": "Your query here"}]})

Creating Worker Nodes

1. Define State Model

from pydantic import BaseModel
from typing import List, Annotated
from langchain_core.messages import BaseMessage, add_messages

class SearchModel(BaseModel):
    messages: Annotated[List[BaseMessage], add_messages] = []

2. Create Worker Function

from langchain_core.messages import AIMessage

def search_worker(state):
    # Access the task from state
    task = state.search_state.messages[-1].content
    
    # Process the task (your custom logic here)
    result = f"Search results for: {task}"
    
    # Return updated state
    return {"search_state": {"messages": [AIMessage(content=result)]}}

3. Build and Add Node

search_node = WorkerNode(
    function=search_worker,
    model=SearchModel, 
    state_placeholder="search_state",
    description="search the web for information and current data",
    name="search"
)

wave_orchestrator.add_node(search_node)

Development

Setup

uv sync
pytest tests/

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Submit a pull request

Follow PEP 8 style guidelines and include type hints.

License

MIT License - see LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lanngraph_wave_orchestrator-1.1.7.tar.gz (61.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

lanngraph_wave_orchestrator-1.1.7-py3-none-any.whl (8.7 kB view details)

Uploaded Python 3

File details

Details for the file lanngraph_wave_orchestrator-1.1.7.tar.gz.

File metadata

File hashes

Hashes for lanngraph_wave_orchestrator-1.1.7.tar.gz
Algorithm Hash digest
SHA256 89d6986b66284aaa3a71242fcf7e5dbcf466563de18299b1f9b5befc178eabaf
MD5 4903d31d74fe5993fa45867ac221ec62
BLAKE2b-256 d59f5406d27a7e8f8d8ee0b446d18d1ffdd331acf86f381edc5d21c9b9d716fd

See more details on using hashes here.

File details

Details for the file lanngraph_wave_orchestrator-1.1.7-py3-none-any.whl.

File metadata

File hashes

Hashes for lanngraph_wave_orchestrator-1.1.7-py3-none-any.whl
Algorithm Hash digest
SHA256 14bd57ab3cf2a2289fa0e6f236eec104ca5571c3dc5570adfac07f06594dd0a8
MD5 11da0e63d3aa1e1f153918b34018e1d9
BLAKE2b-256 a2cef7e151fa99190494b8a409d9fb4d5b14484d39413aeb7695a1a855eeaef5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page