A lightweight graph approach to LLM workflows.
Project description
Overview
primeGraph is a Python library for building and executing workflows using graphs, ranging from simple sequential processes to complex parallel execution patterns. While originally optimized for AI applications, its flexible architecture makes it suitable for any workflow orchestration needs.
Key principles:
- Flexibility First: Design your nodes and execution patterns with complete freedom.
- Zero Lock-in: Deploy and run workflows however you want, with no vendor dependencies.
- Opinionated Yet Adaptable: Structured foundations with room for customization.
Note from the author: This project came to life through my experience creating AI applications. I want to acknowledge langgraph as the main inspiration for this project. As an individual developer, I wanted to gain experience creating my own workflow engine to implement more of my own ideas and learnings. At the same time, I also wanted to create a framework that is flexible enough for others to deploy their apps however they want, as this is an open source project. So feel free to use it, modify it, and contribute to it.
Features
- Flexible Graph Construction: Build multiple workflows with sequential and parallel execution paths.
- State Management: Built-in state management with various buffer types to coordinate state updates during execution.
- Type Safety: Uses Pydantic for enforcing shared state types across nodes.
- Router Nodes: Dynamically choose execution paths based on node outputs.
- Repeatable Nodes: Execute nodes multiple times either in parallel or sequence.
- Subgraphs: Compose graphs of subgraphs to design complex workflows.
- LLM Tool Integration: Specialized nodes for LLM interaction with tool/function calling capabilities.
- Tool Pause/Resume: Ability to pause execution before specific tool calls for human review and approval.
- Persistence: Save and resume execution using checkpoint storage (supports memory and Postgres).
- Async Support: Uses async/await for non-blocking execution with engine methods
execute()andresume(). - Flow Control: Supports human-in-the-loop interactions by pausing and resuming workflows.
- Visualization: Generate visual representations of your workflows with minimal effort.
- Web Integration: Integrate with FastAPI and WebSockets for interactive workflows.
Installation
pip install primeGraph
[Optional] Install Graphviz for visualization
To have the graph.visualize() method work, install the Graphviz binary:
https://graphviz.org/download/
Core Features
The Basics
import asyncio
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue, Incremental
# Define your state with appropriate buffer types
class DocumentProcessingState(GraphState):
processed_files: History[str] # Stores all returned file names
current_status: LastValue[str] # Keeps the last status value
number_of_executed_steps: Incremental[int] # Increments with each step
# Initialize state
state = DocumentProcessingState(
processed_files=[],
current_status="initializing",
number_of_executed_steps=0
)
# Create a graph with the state
graph = Graph(state=state)
# Adding nodes to the graph
'to simulate the workflow
@graph.node()
def load_documents(state):
return {
"processed_files": "document1.txt",
"current_status": "loading",
"number_of_executed_steps": 1
}
@graph.node()
def validate_documents(state):
return {
"current_status": "validating",
"number_of_executed_steps": 1
}
@graph.node()
def process_documents(state):
return {
"current_status": "completed",
"number_of_executed_steps": 1
}
# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")
graph.add_edge("process_documents", END)
# Compile the graph
graph.compile()
# Execute the graph asynchronously using the new engine methods
async def run_graph():
await graph.execute()
print(state)
graph.visualize()
asyncio.run(run_graph())
Router Nodes
import asyncio
from primeGraph import Graph, START, END
graph = Graph()
@graph.node()
def load_documents(state):
return {
"processed_files": "document1.txt",
"current_status": "loading",
"number_of_executed_steps": 1
}
@graph.node()
def validate_documents(state):
return {
"current_status": "validating",
"number_of_executed_steps": 1
}
@graph.node()
def process_documents(state):
return {
"current_status": "completed",
"number_of_executed_steps": 1
}
@graph.node()
def route_documents(state):
if "invoice" in state.current_status:
return "process_invoice"
return "cancel_invoice"
@graph.node()
def process_invoice(state):
return {"current_status": "invoice_processed"}
@graph.node()
def cancel_invoice(state):
return {"current_status": "invoice_cancelled"}
# Connect nodes and define router edges
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")
# Router node is connected as edge from process_documents
graph.add_router_edge("process_documents", "route_documents")
graph.add_edge("process_invoice", END)
graph.add_edge("cancel_invoice", END)
# Compile and execute
graph.compile()
async def run_router():
await graph.execute()
print(state)
graph.visualize()
import asyncio
asyncio.run(run_router())
Repeatable Nodes
import asyncio
from primeGraph import Graph, START, END
graph = Graph()
@graph.node()
def load_documents(state):
return {
"processed_files": "document1.txt",
"current_status": "loading",
"number_of_executed_steps": 1
}
@graph.node()
def validate_documents(state):
return {
"current_status": "validating",
"number_of_executed_steps": 1
}
@graph.node()
def process_documents(state):
return {
"current_status": "processing",
"number_of_executed_steps": 1
}
@graph.node()
def repeating_process_batch(state):
return {
"processed_files": f"batch_{state.number_of_executed_steps}",
"number_of_executed_steps": 1
}
@graph.node()
def conclude_documents(state):
return {
"current_status": "completed",
"number_of_executed_steps": 1
}
# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")
# Add repeating edge for processing multiple batches
graph.add_repeating_edge(
"process_documents",
"repeating_process_batch",
"conclude_documents",
repeat=3,
parallel=True
)
graph.add_edge("conclude_documents", END)
graph.compile()
async def run_repeatable():
await graph.execute()
print(state)
graph.visualize()
import asyncio
asyncio.run(run_repeatable())
Subgraphs
import asyncio
from primeGraph import Graph, START, END
state = ... # initialize your state appropriately
main_graph = Graph(state=state)
@main_graph.node()
def load_documents(state):
return {
"processed_files": "document1.txt",
"current_status": "loading",
"number_of_executed_steps": 1
}
@main_graph.subgraph()
def validation_subgraph():
subgraph = Graph(state=state)
@subgraph.node()
def check_format(state):
return {"current_status": "checking_format"}
@subgraph.node()
def verify_content(state):
return {"current_status": "verifying_content"}
subgraph.add_edge(START, "check_format")
subgraph.add_edge("check_format", "verify_content")
subgraph.add_edge("verify_content", END)
return subgraph
@main_graph.node()
def pre_process_documents(state):
return {
"current_status": "pre_processed",
"number_of_executed_steps": 1
}
@main_graph.node()
def conclude_documents(state):
return {
"current_status": "completed",
"number_of_executed_steps": 1
}
# Connect nodes
main_graph.add_edge(START, "load_documents")
main_graph.add_edge("load_documents", "validation_subgraph")
main_graph.add_edge("load_documents", "pre_process_documents")
main_graph.add_edge("validation_subgraph", "conclude_documents")
main_graph.add_edge("pre_process_documents", "conclude_documents")
main_graph.add_edge("conclude_documents", END)
main_graph.compile()
async def run_subgraph():
await main_graph.execute()
print(state)
main_graph.visualize()
import asyncio
asyncio.run(run_subgraph())
Flow Control
import asyncio
from primeGraph import Graph, START, END
state = ... # initialize state
graph = Graph(state=state)
@graph.node()
def load_documents(state):
return {
"processed_files": "document1.txt",
"current_status": "loading",
"number_of_executed_steps": 1
}
# This node will interrupt execution before running
@graph.node(interrupt="before")
def review_documents(state):
return {
"current_status": "validating",
"number_of_executed_steps": 1
}
@graph.node()
def process_documents(state):
return {
"current_status": "completed",
"number_of_executed_steps": 1
}
# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "review_documents")
graph.add_edge("review_documents", "process_documents")
graph.add_edge("process_documents", END)
graph.compile()
async def run_flow():
# Start execution; this will pause before 'review_documents'
await graph.execute()
print("State after interruption:", state)
# Once ready, resume the execution
await graph.resume()
print("State after resuming:", state)
graph.visualize()
import asyncio
asyncio.run(run_flow())
Persistence
from primeGraph.checkpoint.postgresql import PostgreSQLStorage
# Configure storage
storage = PostgreSQLStorage.from_config(
host="localhost",
database="documents_db",
user="user",
password="password"
)
# Create graph with checkpoint storage
graph = Graph(state=state, checkpoint_storage=storage)
@graph.node(interrupt="before")
def validate_documents(state):
return {"current_status": "needs_review"}
# Execute graph and save checkpoint
async def run_with_checkpoint():
chain_id = await graph.execute()
# Later, load and resume from checkpoint
graph.load_from_checkpoint(chain_id)
await graph.resume()
import asyncio
asyncio.run(run_with_checkpoint())
Async Support
import asyncio
@graph.node()
async def async_document_process(state):
await asyncio.sleep(1) # Simulate async processing
return {
"processed_files": "async_processed",
"current_status": "async_complete"
}
# Execute the async graph
async def run_async():
await graph.execute()
# Resume if execution was paused
await graph.resume()
asyncio.run(run_async())
Web Integration
import os
import logging
from typing import List
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from primeGraph.buffer import History
from primeGraph.checkpoint import LocalStorage
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.web import create_graph_service, wrap_graph_with_websocket
logging.basicConfig(level=logging.DEBUG)
app = FastAPI(debug=True)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/hello")
async def hello():
return {"message": "Hello World"}
graphs: List[Graph] = []
# Define a simple state model
class SimpleGraphState(GraphState):
messages: History[str]
state = SimpleGraphState(messages=[])
storage = LocalStorage()
graph1 = Graph(state=state, checkpoint_storage=storage)
@graph1.node()
def add_hello(state: GraphState):
logging.debug("add_hello")
return {"messages": "Hello"}
@graph1.node()
def add_world(state: GraphState):
logging.debug("add_world")
return {"messages": "World"}
@graph1.node()
def add_exclamation(state: GraphState):
logging.debug("add_exclamation")
return {"messages": "!"}
graph1.add_edge(START, "add_hello")
graph1.add_edge("add_hello", "add_world")
graph1.add_edge("add_world", "add_exclamation")
graph1.add_edge("add_exclamation", END)
graph1.compile()
service = create_graph_service(graph1, storage, path_prefix="/graphs/workflow1")
app.include_router(service.router, tags=["workflow1"])
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Basic Usage Examples
Chatbot Example
import asyncio
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue
from pydantic import BaseModel, Field
import logging
class ChatbotState(GraphState):
chat_history: History[dict[str, str]]
user_wants_to_exit: LastValue[bool] = Field(default=False)
class ChatbotResponse(BaseModel):
chat_message: str
user_requested_to_quit: bool = Field(description="True if user wants to quit")
chatbot_state = ChatbotState(chat_history=[], user_wants_to_exit=False)
chatbot_graph = Graph(state=chatbot_state, verbose=False)
@chatbot_graph.node(interrupt="before")
def chat_with_user(state):
# Simulate calling an AI service
try:
# Replace with actual call
response = ChatbotResponse(chat_message="Hello, how can I assist?", user_requested_to_quit=False)
print(response.chat_message)
return {"chat_history": {"role": "assistant", "content": response.chat_message},
"user_wants_to_exit": response.user_requested_to_quit}
except Exception as e:
raise e
@chatbot_graph.node()
def assess_next_step(state):
if state.user_wants_to_exit:
return END
return "chat_with_user"
chatbot_graph.add_edge(START, "chat_with_user")
chatbot_graph.add_router_edge("chat_with_user", "assess_next_step")
chatbot_graph.compile()
chatbot_graph.visualize()
async def run_chatbot():
await chatbot_graph.execute()
def add_user_message(message: str):
chatbot_graph.update_state_and_checkpoint({"chat_history": {"role": "user", "content": message}})
while not chatbot_state.user_wants_to_exit:
user_input = input("Your message: ")
print(f"You: {user_input}")
add_user_message(user_input)
await chatbot_graph.resume()
print("Bye")
import asyncio
asyncio.run(run_chatbot())
Async Workflow
import asyncio
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue
from pydantic import BaseModel
# Define models for async workflow
class Character(GraphState):
character_name: LastValue[str]
character_items: History[tuple[str,str]]
character_summary: LastValue[str]
class CharacterName(BaseModel):
character_name: str
class CharacterSummary(BaseModel):
character_summary: str
class CharacterItem(BaseModel):
item_name: str
item_description: str
character_state = Character(character_name="", character_items=[], character_summary="")
character_graph = Graph(state=character_state, verbose=False)
@character_graph.node()
async def pick_character_name(state):
# Simulate an async call
return {"character_name": "Frodo Baggins"}
@character_graph.node()
async def pick_character_profession(state):
return {"character_items": ("Adventurer", "Embarks on quests")}
@character_graph.node()
async def pick_character_apparel(state):
return {"character_items": ("Mystic Robe", "Adorned with ancient runes")}
@character_graph.node()
async def pick_character_partner(state):
return {"character_items": ("Samwise Gamgee", "Loyal companion")}
@character_graph.node()
async def create_charater_summary(state):
ch_items = "\n".join([f"{item[0]}: {item[1]}" for item in state.character_items])
return {"character_summary": f"{state.character_name} is accompanied by:\n{ch_items}"}
character_graph.add_edge(START, "pick_character_name")
character_graph.add_edge("pick_character_name", "pick_character_profession")
character_graph.add_edge("pick_character_name", "pick_character_apparel")
character_graph.add_edge("pick_character_name", "pick_character_partner")
character_graph.add_edge("pick_character_profession", "create_charater_summary")
character_graph.add_edge("pick_character_apparel", "create_charater_summary")
character_graph.add_edge("pick_character_partner", "create_charater_summary")
character_graph.add_edge("create_charater_summary", END)
character_graph.compile()
async def run_async_workflow():
await character_graph.execute()
print(character_graph.state)
import asyncio
asyncio.run(run_async_workflow())
Note: All examples now use the new asynchronous engine methods execute() and resume(). For scripts, wrap these calls with asyncio.run(...) or use an async context as needed.
LLM Tool Nodes
import asyncio
from typing import Dict, List, Any
from primeGraph import START, END
from primeGraph.graph.llm_tools import (
tool, ToolNode, ToolGraph, ToolEngine, ToolState,
ToolLoopOptions, LLMMessage
)
from primeGraph.graph.llm_clients import OpenAIClient, AnthropicClient
# Define state for your tool-based workflow
class ResearchState(ToolState):
search_results: List[Dict[str, Any]] = []
final_summary: str = None
# Define tools with the @tool decorator
@tool("Search the web for information")
async def search_web(query: str, num_results: int = 5) -> Dict[str, Any]:
"""Search the web for information on a topic"""
# Implementation would call a real search API
return {
"results": [
{"title": "Example result 1", "content": "Example content..."},
{"title": "Example result 2", "content": "More example content..."}
]
}
@tool("Summarize information")
async def summarize(text: str) -> Dict[str, Any]:
"""Summarize text into a concise summary"""
# Implementation would use an LLM or summarization service
return {"summary": f"Summarized version of: {text[:30]}..."}
# Create a graph for tool-based workflow
graph = ToolGraph("research_workflow", state_class=ResearchState)
# Initialize an LLM client (OpenAI or Anthropic)
llm_client = OpenAIClient(api_key="your-api-key-here")
# Add a tool node to the graph
node = graph.add_tool_node(
name="researcher",
tools=[search_web, summarize],
llm_client=llm_client,
options=ToolLoopOptions(max_iterations=5)
)
# Connect nodes
graph.add_edge(START, node.name)
graph.add_edge(node.name, END)
# Execute the graph
async def run_research():
# Initialize the engine
engine = ToolEngine(graph)
# Create initial state with user query
initial_state = ResearchState()
initial_state.messages = [
LLMMessage(role="system", content="You are a helpful research assistant."),
LLMMessage(role="user", content="Research quantum computing advancements in 2023")
]
# Execute the graph
result = await engine.execute(initial_state=initial_state)
# Access final state
final_state = result.state
print(f"Tool calls: {len(final_state.tool_calls)}")
print(f"Final output: {final_state.final_output}")
asyncio.run(run_research())
Tool Pause and Resume
# Define a tool that will pause for human review
@tool("Process payment", pause_before_execution=True)
async def process_payment(order_id: str, amount: float) -> Dict[str, Any]:
"""Process a payment for an order, pausing for verification"""
return {
"order_id": order_id,
"amount": amount,
"status": "processed",
"transaction_id": f"TX-{order_id}-{int(time.time())}"
}
# Add to graph
payment_node = graph.add_tool_node(
name="payment_processor",
tools=[process_payment, get_order_details],
llm_client=llm_client
)
# Execute the graph - will pause at process_payment
result = await engine.execute(initial_state)
paused_state = result.state
# Verify the execution paused at the payment tool
if paused_state.is_paused and paused_state.paused_tool_name == "process_payment":
# User can review the payment details
print(f"Review payment: {paused_state.paused_tool_arguments}")
# If approved, resume execution with the tool
resumed_result = await engine.resume_from_pause(paused_state, execute_tool=True)
# Or skip the payment if not approved
# resumed_result = await engine.resume_from_pause(paused_state, execute_tool=False)
Roadmap
- Add streaming support
- Create documentation
- Add tools for agentic workflows
- Add inter node ephemeral state for short term interactions
- Add persistence support for other databases
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file primegraph-1.2.4.tar.gz.
File metadata
- Download URL: primegraph-1.2.4.tar.gz
- Upload date:
- Size: 667.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1165812e65d2ec626878ab32dd28cf8c72cde9e697c8b9bd5f9a4c62b1f8ea46
|
|
| MD5 |
952e89f9a818e5eba077f755f5c51136
|
|
| BLAKE2b-256 |
733c9267eca656aa04cd3a70d8f317192d964d6ff7c417012a53b2b1630806c6
|
File details
Details for the file primegraph-1.2.4-py3-none-any.whl.
File metadata
- Download URL: primegraph-1.2.4-py3-none-any.whl
- Upload date:
- Size: 61.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ac1b675a91baadb3b625562534dbb41f7c6ea04fa1c70af07986ff75fd9e7162
|
|
| MD5 |
40ac3213d8cd35ca15909a1ecec74419
|
|
| BLAKE2b-256 |
cd955d323f3d97cedf57968db1cb510003b793c47b1dd0a2fada804b0404409d
|