A lightweight graph approach to LLM workflows.
Overview
primeGraph is a Python library for building and executing workflows using graphs, ranging from simple sequential processes to complex parallel execution patterns. While originally optimized for AI applications, its flexible architecture makes it suitable for any workflow orchestration needs.
Key principles:
- Flexibility First: Design your nodes and execution patterns with complete freedom.
- Zero Lock-in: Deploy and run workflows however you want, with no vendor dependencies.
- Opinionated Yet Adaptable: Structured foundations with room for customization.
Note from the author: This project came to life through my experience of creating AI applications. I want to acknowledge langgraph as the main inspiration for this project. As an individual developer, I wanted to gain experience creating my own workflow engine to implement more of my own ideas and learnings. At the same time, I also wanted to create a framework that is flexible enough for others to deploy their apps however they want, as this is an open source project. So feel free to use it, modify it, and contribute to it.
Features
- Flexible Graph Construction: Build multiple workflows with sequential and parallel execution paths.
- State Management: Built-in state management with different buffer types (History, LastValue, Incremental) that coordinate how state is updated during workflow execution.
- Type Safety: Built-in type safety for your nodes' shared state using Pydantic.
- Router Nodes: Dynamic path selection based on node outputs.
- Repeatable Nodes: Execute nodes multiple times in parallel or sequence.
- Subgraphs: Compose graphs from subgraphs to build more complex workflows.
- Persistence: Save and resume workflow execution using stored states (currently supports memory and Postgres).
- Async Support: Full async/await support for non-blocking execution.
- Acyclic and Cyclic Graphs: Build both acyclic and cyclic graphs with ease.
- Flow Control: Support execution flow control for human-in-the-loop interactions.
- Visualization: Generate visual representations of your workflows with zero effort.
- Web Integration: Built-in FastAPI integration with WebSocket support.
- (Coming Soon) Streaming: Stream outputs from your nodes as they are generated.
Installation
```shell
pip install primeGraph
```
[Optional] Install Graphviz for visualization
For the graph.visualize() method to work, you need the Graphviz binary installed in addition to the primeGraph package. Download instructions: https://graphviz.org/download/
Core Features
The Basics
```python
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue, Incremental

# primeGraph uses the return values of the nodes to update the state (state is a Pydantic model)
class DocumentProcessingState(GraphState):
    processed_files: History[str]  # History: stores all returned values as a list
    current_status: LastValue[str]  # LastValue: keeps only the last returned value
    number_of_executed_steps: Incremental[int]  # Incremental: increments the current value by the returned value

# Initialize state
state = DocumentProcessingState(
    processed_files=[],
    current_status="initializing",
    number_of_executed_steps=0
)

# Create graph
graph = Graph(state=state)

# Add nodes to the graph
@graph.node()
def load_documents(state):
    # Simulate loading documents
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

@graph.node()
def validate_documents(state):
    # Validate loaded documents
    return {
        "current_status": "validating",
        "number_of_executed_steps": 1
    }

@graph.node()
def process_documents(state):
    # Process documents
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")
graph.add_edge("process_documents", END)

# Compile and execute
graph.compile()
graph.start()

# state after execution
print(state)
# DocumentProcessingState(version='random_uuid',
#   processed_files=['document1.txt'],
#   current_status='completed',
#   number_of_executed_steps=3)

graph.visualize()
```
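Under the hood, the three buffer types behave like simple reducers over successive node return values. The following standalone sketch (plain Python, no primeGraph imports; the `apply_update` helper is hypothetical) only mirrors the update semantics documented above:

```python
# Hypothetical stand-in for primeGraph's buffer semantics:
# History appends, LastValue overwrites, Incremental adds.

def apply_update(buffers, state, update):
    for key, value in update.items():
        kind = buffers[key]
        if kind == "History":
            state[key].append(value)
        elif kind == "LastValue":
            state[key] = value
        elif kind == "Incremental":
            state[key] += value
    return state

buffers = {
    "processed_files": "History",
    "current_status": "LastValue",
    "number_of_executed_steps": "Incremental",
}
state = {"processed_files": [], "current_status": "initializing",
         "number_of_executed_steps": 0}

# replay the three node returns from the example above
for update in (
    {"processed_files": "document1.txt", "current_status": "loading",
     "number_of_executed_steps": 1},
    {"current_status": "validating", "number_of_executed_steps": 1},
    {"current_status": "completed", "number_of_executed_steps": 1},
):
    apply_update(buffers, state, update)

print(state)
# {'processed_files': ['document1.txt'], 'current_status': 'completed',
#  'number_of_executed_steps': 3}
```

This is why each node only returns the keys it touched: the buffer type, not the node, decides how the value merges into the state.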
Router Nodes
```python
# ...continuing from the Basic Usage example above

@graph.node()
def load_documents(state):
    # Simulate loading documents
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

@graph.node()
def validate_documents(state):
    # Validate loaded documents
    return {
        "current_status": "validating",
        "number_of_executed_steps": 1
    }

@graph.node()
def process_documents(state):
    # Process documents
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

@graph.node()
def route_documents(state):
    # Route based on the current status; a router node returns the name of the next node
    if "invoice" in state.current_status:
        return "process_invoice"
    return "cancel_invoice"

@graph.node()
def process_invoice(state):
    return {"current_status": "invoice_processed"}

@graph.node()
def cancel_invoice(state):
    return {"current_status": "invoice_cancelled"}

# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")

# Add router edges
graph.add_router_edge("process_documents", "route_documents")
graph.add_edge("process_invoice", END)
graph.add_edge("cancel_invoice", END)

# Compile and execute
graph.compile()
graph.start()

# state after execution
print(state)
# DocumentProcessingState(version='random_uuid',
#   processed_files=['document1.txt'],
#   current_status='invoice_cancelled',
#   number_of_executed_steps=4)

graph.visualize()
```
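The router mechanic boils down to dispatch-by-name: the router node's return value selects which node runs next. A primeGraph-free sketch of the idea (the `nodes` dict here is illustrative, not the library's internals):

```python
def route_documents(status):
    # mirrors a router node: returns the *name* of the next node
    return "process_invoice" if "invoice" in status else "cancel_invoice"

# illustrative node table standing in for the compiled graph
nodes = {
    "process_invoice": lambda: "invoice_processed",
    "cancel_invoice": lambda: "invoice_cancelled",
}

next_node = route_documents("completed")  # no "invoice" in the status
result = nodes[next_node]()
print(result)  # invoice_cancelled
```

This also explains the expected output in the example above: since `current_status` is "completed" when the router runs, the workflow takes the `cancel_invoice` branch.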
Repeatable Nodes
```python
# ...continuing from the Basic Usage example above

@graph.node()
def repeating_process_batch(state):
    return {
        "processed_files": f"batch_{state.number_of_executed_steps}",
        "number_of_executed_steps": 1
    }

@graph.node()
def conclude_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")

# Add repeating edge to process multiple batches
graph.add_repeating_edge(
    "process_documents",
    "repeating_process_batch",
    "conclude_documents",
    repeat=3,
    parallel=True
)
graph.add_edge("conclude_documents", END)

# Compile and execute
graph.compile()
graph.start()

# state after execution
print(state)
# DocumentProcessingState(version='random_uuid',
#   processed_files=['document1.txt', 'batch_3', 'batch_3', 'batch_5'],
#   current_status='completed',
#   number_of_executed_steps=7)

graph.visualize()
```
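The repeat semantics can be pictured without primeGraph: the same node function runs `repeat` times, and each return value flows through the buffers (a plain list stands in for the `History` buffer here; the helper names are illustrative):

```python
def repeating_process_batch(step):
    # stand-in for the repeated node; a real node would read the graph state
    return f"batch_{step}"

processed_files = ["document1.txt"]  # History buffer stand-in
for step in range(3):                # repeat=3, run sequentially here
    processed_files.append(repeating_process_batch(step))

print(processed_files)  # ['document1.txt', 'batch_0', 'batch_1', 'batch_2']
```

With `parallel=True`, the repetitions run concurrently, so counters read from shared state can interleave; that is why the sample output above shows duplicated batch numbers rather than a clean 0, 1, 2 sequence.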
Subgraphs
```python
# ...continuing from the Basic Usage example above

# Create graph
main_graph = Graph(state=state)

@main_graph.node()
def load_documents(state):
    # Simulate loading documents
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

# the subgraph decorator expects the decorated function (which becomes a new node)
# to return a subgraph; you can either declare the subgraph inside the function
# or reference an existing one
@main_graph.subgraph()
def validation_subgraph():
    subgraph = Graph(state=state)

    @subgraph.node()
    def check_format(state):
        return {"current_status": "checking_format"}

    @subgraph.node()
    def verify_content(state):
        return {"current_status": "verifying_content"}

    subgraph.add_edge(START, "check_format")
    subgraph.add_edge("check_format", "verify_content")
    subgraph.add_edge("verify_content", END)
    return subgraph

@main_graph.node()
def pre_process_documents(state):
    # Process documents
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

@main_graph.node()
def conclude_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
main_graph.add_edge(START, "load_documents")
main_graph.add_edge("load_documents", "validation_subgraph")  # subgraph added as a normal node
main_graph.add_edge("load_documents", "pre_process_documents")
main_graph.add_edge("validation_subgraph", "conclude_documents")
main_graph.add_edge("pre_process_documents", "conclude_documents")
main_graph.add_edge("conclude_documents", END)

# Compile and execute
main_graph.compile()
main_graph.start()

# state after execution
print(state)
# DocumentProcessingState(version='random_uuid',
#   processed_files=['document1.txt'],
#   current_status='completed',
#   number_of_executed_steps=3)

main_graph.visualize()
```
Flow Control
```python
# ...continuing from the Basic Usage example above

# Create graph
graph = Graph(state=state)

@graph.node()
def load_documents(state):
    # Simulate loading documents
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

# interrupt="before" pauses execution before this node runs
# interrupt="after" pauses execution after this node runs
@graph.node(interrupt="before")
def review_documents(state):
    # Validate loaded documents
    return {
        "current_status": "validating",
        "number_of_executed_steps": 1
    }

@graph.node()
def process_documents(state):
    # Process documents
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "review_documents")
graph.add_edge("review_documents", "process_documents")
graph.add_edge("process_documents", END)

# Compile and execute
graph.compile()
graph.start()

# state until interrupted
print(state)
# DocumentProcessingState(version='random_uuid',
#   processed_files=['document1.txt'],
#   current_status='loading',
#   number_of_executed_steps=1)

graph.resume()

# state after finishing
print(state)
# DocumentProcessingState(version='random_uuid',
#   processed_files=['document1.txt'],
#   current_status='completed',
#   number_of_executed_steps=3)

graph.visualize()
```
Persistence
```python
from primeGraph.checkpoint.postgresql import PostgreSQLStorage

# Configure storage
storage = PostgreSQLStorage.from_config(
    host="localhost",
    database="documents_db",
    user="user",
    password="password"
)

# Create graph with checkpoint storage
graph = Graph(state=state, checkpoint_storage=storage)

@graph.node(interrupt="before")
def validate_documents(state):
    return {"current_status": "needs_review"}

# Start execution
chain_id = graph.start()

# Later, resume from checkpoint
graph.load_from_checkpoint(chain_id)
graph.resume()
```
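Conceptually, checkpointing serializes the graph state at an interrupt point so a later process can rehydrate it and resume. A rough, primeGraph-free sketch of that lifecycle using plain JSON (the `InMemoryStorage` class and its methods are illustrative, not primeGraph's API):

```python
import json
import uuid

class InMemoryStorage:
    """Illustrative checkpoint store; primeGraph ships memory and Postgres backends."""
    def __init__(self):
        self._checkpoints = {}

    def save(self, chain_id, state):
        # serialize the state snapshot under an execution id
        self._checkpoints[chain_id] = json.dumps(state)

    def load(self, chain_id):
        return json.loads(self._checkpoints[chain_id])

storage = InMemoryStorage()
chain_id = uuid.uuid4().hex

# execution pauses at an interrupt; persist the state
storage.save(chain_id, {"current_status": "needs_review"})

# later (possibly in another process), rehydrate and resume
restored = storage.load(chain_id)
print(restored)  # {'current_status': 'needs_review'}
```

The `chain_id` returned by `graph.start()` plays the same role as the key here: it is the handle you keep around to find the right checkpoint later.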
Async Support
```python
import asyncio

@graph.node()
async def async_document_process(state):
    await asyncio.sleep(1)  # Simulate async processing
    return {
        "processed_files": "async_processed",
        "current_status": "async_complete"
    }

# Execute async graph
await graph.start_async()

# Resume async graph
await graph.resume_async()
```
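The bare `await` calls above assume you are already inside an async context (a notebook, or an async web handler). In a plain script, wrap them in a coroutine and hand it to the event loop; a minimal sketch with a stand-in coroutine in place of a real graph:

```python
import asyncio

async def async_document_process():
    # stand-in for an async node; a real node would receive the graph state
    await asyncio.sleep(0.01)
    return {"current_status": "async_complete"}

async def main():
    # in a real script this line would be: await graph.start_async()
    return await async_document_process()

result = asyncio.run(main())
print(result)  # {'current_status': 'async_complete'}
```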
Web Integration
```python
import logging
from typing import List

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from primeGraph import Graph, END, START
from primeGraph.buffer import History
from primeGraph.checkpoint import LocalStorage
from primeGraph.models import GraphState
from primeGraph.web import create_graph_service, wrap_graph_with_websocket

logging.basicConfig(level=logging.DEBUG)

# Explicitly set logging levels for key loggers
logging.getLogger("uvicorn").setLevel(logging.DEBUG)
logging.getLogger("fastapi").setLevel(logging.DEBUG)
logging.getLogger("websockets").setLevel(logging.DEBUG)
logging.getLogger("primeGraph").setLevel(logging.DEBUG)

# Create FastAPI app (debug mode enabled)
app = FastAPI(debug=True)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, replace with specific origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Your existing routes
@app.get("/hello")
async def hello():
    return {"message": "Hello World"}

# Create multiple graphs if needed
graphs: List[Graph] = []

# Define state model
class SimpleGraphState(GraphState):
    messages: History[str]

# Create state instance
state = SimpleGraphState(messages=[])

# Create graph with checkpoint storage
storage = LocalStorage()
graph1 = Graph(state=state, checkpoint_storage=storage)

@graph1.node()
def add_hello(state: GraphState):
    logging.debug("add_hello")
    return {"messages": "Hello"}

@graph1.node()
def add_world(state: GraphState):
    logging.debug("add_world")
    return {"messages": "World"}

@graph1.node()
def add_exclamation(state: GraphState):
    logging.debug("add_exclamation")
    return {"messages": "!"}

# Add edges
graph1.add_edge(START, "add_hello")
graph1.add_edge("add_hello", "add_world")
graph1.add_edge("add_world", "add_exclamation")
graph1.add_edge("add_exclamation", END)

graph1.compile()

# Create graph service
service = create_graph_service(graph1, storage, path_prefix="/graphs/workflow1")

# Include the router in your app
app.include_router(service.router, tags=["workflow1"])

# access your graph at http://localhost:8000/graphs/workflow1/

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
Basic Usage examples
Find examples in the examples folder.
Chatbot (yep, one more chatbot example)
```python
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue
from pydantic import BaseModel, Field
from openai import OpenAI
import instructor

# create an instructor-patched OpenAI client (assumes OPENAI_API_KEY is set)
client = instructor.from_openai(OpenAI())

class ChatbotState(GraphState):
    chat_history: History[dict[str, str]]
    user_wants_to_exit: LastValue[bool] = Field(default=False)

class ChatbotResponse(BaseModel):
    chat_message: str
    user_requested_to_quit: bool = Field(description="returns true if user is requesting to quit the chat")

chatbot_state = ChatbotState(chat_history=[], user_wants_to_exit=False)
chatbot_graph = Graph(state=chatbot_state, verbose=False)

@chatbot_graph.node(interrupt="before")
def chat_with_user(state):
    # user input is inserted directly into chat_history on the state
    # Extract structured data from natural language
    res = client.chat.completions.create(
        model="gpt-4o-mini",
        response_model=ChatbotResponse,
        messages=state.chat_history,
    )
    print(res.chat_message)
    return {"chat_history": {"role": "assistant", "content": res.chat_message},
            "user_wants_to_exit": res.user_requested_to_quit}

@chatbot_graph.node()
def assess_next_step(state):
    if state.user_wants_to_exit:
        return END
    return "chat_with_user"

chatbot_graph.add_edge(START, "chat_with_user")
chatbot_graph.add_router_edge("chat_with_user", "assess_next_step")
chatbot_graph.compile()
chatbot_graph.visualize()

# Running the chatbot in a loop
chatbot_graph.start()

def add_user_message(message: str):
    chatbot_state.chat_history.append({"role": "user", "content": message})

while not chatbot_state.user_wants_to_exit:
    user_input = input("Your message: ")
    print(f"You: {user_input}")
    add_user_message(user_input)
    chatbot_graph.resume()

print("Bye")
```
Async workflow
```python
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue
from pydantic import BaseModel
from openai import AsyncOpenAI
import instructor
from IPython.display import Image
from typing import Tuple
from dotenv import load_dotenv

# assumes you have a local .env file with OPENAI_API_KEY set
load_dotenv()

# load the OpenAI client
client = instructor.from_openai(AsyncOpenAI())

class Character(GraphState):
    character_name: LastValue[str]
    character_items: History[Tuple[str, str]]
    character_summary: LastValue[str]

class CharacterName(BaseModel):
    character_name: str

class CharacterSummary(BaseModel):
    character_summary: str

class CharacterItem(BaseModel):
    item_name: str
    item_description: str

character_state = Character(character_name="", character_items=[], character_summary="")
character_graph = Graph(state=character_state, verbose=False)

@character_graph.node()
async def pick_character_name(state):
    res = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Pick me a character from Lord of the Rings"}],
        response_model=CharacterName,
    )
    return {"character_name": res.character_name}

@character_graph.node()
async def pick_character_profession(state):
    res = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Pick me a profession for the character"}],
        response_model=CharacterItem,
    )
    return {"character_items": (res.item_name, res.item_description)}

@character_graph.node()
async def pick_character_apparel(state):
    res = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Pick clothing for the character"}],
        response_model=CharacterItem,
    )
    return {"character_items": (res.item_name, res.item_description)}

@character_graph.node()
async def pick_character_partner(state):
    res = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Pick me a partner for the character"}],
        response_model=CharacterItem,
    )
    return {"character_items": (res.item_name, res.item_description)}

@character_graph.node()
async def create_character_summary(state):
    ch_items = "\n".join([f"{item[0]}: {item[1]}" for item in state.character_items])
    res = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Name: {state.character_name}\nItems: {ch_items}"}],
        response_model=CharacterSummary,
    )
    return {"character_summary": res.character_summary}

character_graph.add_edge(START, "pick_character_name")

# setting tasks to run in parallel
character_graph.add_edge("pick_character_name", "pick_character_profession")
character_graph.add_edge("pick_character_name", "pick_character_apparel")
character_graph.add_edge("pick_character_name", "pick_character_partner")
character_graph.add_edge("pick_character_profession", "create_character_summary")
character_graph.add_edge("pick_character_apparel", "create_character_summary")
character_graph.add_edge("pick_character_partner", "create_character_summary")
character_graph.add_edge("create_character_summary", END)

character_graph.compile()
Image(character_graph.visualize(transparent=False).pipe(format='png'))

from rich import print as rprint

await character_graph.start_async()
rprint(character_graph.state)
# Character(
#     version='a35efff8c805417e13d4b950e6d7281c',
#     character_name='Frodo Baggins',
#     character_items=[
#         ('Mysterious Stranger', "A hooded figure who appears at unexpected moments,
#          offering cryptic advice and insight into the character's quest."),
#         ('Mystic Robe', "A flowing robe made from shimmering fabric that glimmers with
#          magical energy. It is adorned with ancient runes and has a hood that conceals
#          the wearer's face. Perfect for wizards and sorcerers."),
#         ('Adventurer', 'A brave explorer who embarks on quests, seeks treasure, and
#          faces challenges in the great unknown.')
#     ],
#     character_summary='Frodo Baggins is a brave adventurer on a quest, known for
#      exploring the unknown and seeking treasure. He is accompanied by a Mysterious
#      Stranger, a hooded figure who offers cryptic advice and insight during his
#      journey. Frodo wears a Mystic Robe, a magical garment adorned with ancient
#      runes, which enhances his mystical abilities and conceals his identity.'
# )
```
Roadmap
- Add streaming support
- Create documentation
- Add tools for agentic workflows
- Add inter-node ephemeral state for short-term interactions
- Add persistence support for other databases
Project details
File details
Details for the file primegraph-0.2.4.tar.gz.
File metadata
- Download URL: primegraph-0.2.4.tar.gz
- Upload date:
- Size: 628.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4643b4bea19e5a7162b48a362f013988ca9e3256ccf8bd3e789add8668977d97 |
| MD5 | 71a12ae18db94c427e009b167153564e |
| BLAKE2b-256 | ba5c2114e6213109498b79734e10bcef17a999f8ba0e9ab56f08bd7e3a3e806d |
File details
Details for the file primegraph-0.2.4-py3-none-any.whl.
File metadata
- Download URL: primegraph-0.2.4-py3-none-any.whl
- Upload date:
- Size: 43.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 434f4e9171e011dd56e43b45f51537a44f3c5fd8e6a3e8a9d213a7b2295f5ae4 |
| MD5 | 127003f5a126c019aaa1172df17de6d6 |
| BLAKE2b-256 | 8f6490f6b42c19f6434137773d326892257c9e74fe01ea9175b1d2bf4e5339b2 |