A lightweight graph approach to LLM workflows.
Overview
primeGraph is a Python library for building and executing workflows using graphs, ranging from simple sequential processes to complex parallel execution patterns. While originally optimized for AI applications, its flexible architecture makes it suitable for any workflow orchestration needs.
Key principles:
- Flexibility First: Design your nodes and execution patterns with complete freedom.
- Zero Lock-in: Deploy and run workflows however you want, with no vendor dependencies.
- Opinionated Yet Adaptable: Structured foundations with room for customization.
Note from the author: This project came to life through my experience creating AI applications. I want to acknowledge langgraph as the main inspiration for this project. As an individual developer, I wanted to gain experience creating my own workflow engine to implement more of my own ideas and learnings. At the same time, I also wanted to create a framework that is flexible enough for others to deploy their apps however they want, as this is an open source project. So feel free to use it, modify it, and contribute to it.
Features
- Flexible Graph Construction: Build multiple workflows with sequential and parallel execution paths.
- State Management: Built-in state management with various buffer types to coordinate state updates during execution.
- Type Safety: Uses Pydantic for enforcing shared state types across nodes.
- Router Nodes: Dynamically choose execution paths based on node outputs.
- Repeatable Nodes: Execute nodes multiple times either in parallel or sequence.
- Subgraphs: Compose graphs of subgraphs to design complex workflows.
- Persistence: Save and resume execution using checkpoint storage (supports memory and Postgres).
- Async Support: Uses async/await for non-blocking execution via the engine methods execute() and resume().
- Flow Control: Supports human-in-the-loop interactions by pausing and resuming workflows.
- Visualization: Generate visual representations of your workflows with minimal effort.
- Web Integration: Integrate with FastAPI and WebSockets for interactive workflows.
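The History, LastValue, and Incremental buffer types listed above determine how each node's return value is merged into shared state. The sketch below illustrates the merge semantics conceptually using only the standard library; it is not primeGraph's internal implementation, and the merge_update helper is hypothetical:

```python
# Conceptual sketch of buffer merge semantics (not primeGraph internals):
# History appends each value, LastValue overwrites, Incremental adds.

def merge_update(state: dict, update: dict, buffer_types: dict) -> dict:
    for key, value in update.items():
        kind = buffer_types[key]
        if kind == "history":
            state[key].append(value)      # keep every returned value
        elif kind == "last_value":
            state[key] = value            # keep only the latest value
        elif kind == "incremental":
            state[key] += value           # accumulate numeric values
    return state

state = {"processed_files": [], "current_status": "init", "steps": 0}
buffers = {"processed_files": "history", "current_status": "last_value", "steps": "incremental"}

merge_update(state, {"processed_files": "a.txt", "current_status": "loading", "steps": 1}, buffers)
merge_update(state, {"processed_files": "b.txt", "current_status": "done", "steps": 1}, buffers)
# state is now {"processed_files": ["a.txt", "b.txt"], "current_status": "done", "steps": 2}
```

This is why the example nodes below can all return `"number_of_executed_steps": 1`: the Incremental buffer sums the contributions rather than overwriting them.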
Installation
pip install primeGraph
[Optional] Install Graphviz for visualization
For the graph.visualize() method to work, install the Graphviz binary:
https://graphviz.org/download/
Core Features
The Basics
import asyncio
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue, Incremental

# Define your state with appropriate buffer types
class DocumentProcessingState(GraphState):
    processed_files: History[str]  # Stores all returned file names
    current_status: LastValue[str]  # Keeps the last status value
    number_of_executed_steps: Incremental[int]  # Increments with each step

# Initialize state
state = DocumentProcessingState(
    processed_files=[],
    current_status="initializing",
    number_of_executed_steps=0
)

# Create a graph with the state
graph = Graph(state=state)

# Add dummy nodes to the graph to simulate the workflow
@graph.node()
def load_documents(state):
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

@graph.node()
def validate_documents(state):
    return {
        "current_status": "validating",
        "number_of_executed_steps": 1
    }

@graph.node()
def process_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")
graph.add_edge("process_documents", END)

# Compile the graph
graph.compile()

# Execute the graph asynchronously using the engine methods
async def run_graph():
    await graph.execute()
    print(state)
    graph.visualize()

asyncio.run(run_graph())
Router Nodes
import asyncio
from primeGraph import Graph, START, END

# Reuse the DocumentProcessingState defined in The Basics example
state = DocumentProcessingState(
    processed_files=[],
    current_status="initializing",
    number_of_executed_steps=0
)
graph = Graph(state=state)

@graph.node()
def load_documents(state):
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

@graph.node()
def validate_documents(state):
    return {
        "current_status": "validating",
        "number_of_executed_steps": 1
    }

@graph.node()
def process_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

@graph.node()
def route_documents(state):
    # A router node returns the name of the next node to execute
    if "invoice" in state.current_status:
        return "process_invoice"
    return "cancel_invoice"

@graph.node()
def process_invoice(state):
    return {"current_status": "invoice_processed"}

@graph.node()
def cancel_invoice(state):
    return {"current_status": "invoice_cancelled"}

# Connect nodes and define router edges
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")

# The router node is connected via a router edge from process_documents
graph.add_router_edge("process_documents", "route_documents")
graph.add_edge("process_invoice", END)
graph.add_edge("cancel_invoice", END)

# Compile and execute
graph.compile()

async def run_router():
    await graph.execute()
    print(state)
    graph.visualize()

asyncio.run(run_router())
Repeatable Nodes
import asyncio
from primeGraph import Graph, START, END

# Reuse the DocumentProcessingState defined in The Basics example
state = DocumentProcessingState(
    processed_files=[],
    current_status="initializing",
    number_of_executed_steps=0
)
graph = Graph(state=state)

@graph.node()
def load_documents(state):
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

@graph.node()
def validate_documents(state):
    return {
        "current_status": "validating",
        "number_of_executed_steps": 1
    }

@graph.node()
def process_documents(state):
    return {
        "current_status": "processing",
        "number_of_executed_steps": 1
    }

@graph.node()
def repeating_process_batch(state):
    return {
        "processed_files": f"batch_{state.number_of_executed_steps}",
        "number_of_executed_steps": 1
    }

@graph.node()
def conclude_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")

# Add a repeating edge for processing multiple batches
graph.add_repeating_edge(
    "process_documents",
    "repeating_process_batch",
    "conclude_documents",
    repeat=3,
    parallel=True
)

graph.add_edge("conclude_documents", END)
graph.compile()

async def run_repeatable():
    await graph.execute()
    print(state)
    graph.visualize()

asyncio.run(run_repeatable())
Subgraphs
import asyncio
from primeGraph import Graph, START, END

state = ...  # initialize your state appropriately
main_graph = Graph(state=state)

@main_graph.node()
def load_documents(state):
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

@main_graph.subgraph()
def validation_subgraph():
    subgraph = Graph(state=state)

    @subgraph.node()
    def check_format(state):
        return {"current_status": "checking_format"}

    @subgraph.node()
    def verify_content(state):
        return {"current_status": "verifying_content"}

    subgraph.add_edge(START, "check_format")
    subgraph.add_edge("check_format", "verify_content")
    subgraph.add_edge("verify_content", END)
    return subgraph

@main_graph.node()
def pre_process_documents(state):
    return {
        "current_status": "pre_processed",
        "number_of_executed_steps": 1
    }

@main_graph.node()
def conclude_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
main_graph.add_edge(START, "load_documents")
main_graph.add_edge("load_documents", "validation_subgraph")
main_graph.add_edge("load_documents", "pre_process_documents")
main_graph.add_edge("validation_subgraph", "conclude_documents")
main_graph.add_edge("pre_process_documents", "conclude_documents")
main_graph.add_edge("conclude_documents", END)
main_graph.compile()

async def run_subgraph():
    await main_graph.execute()
    print(state)
    main_graph.visualize()

asyncio.run(run_subgraph())
Flow Control
import asyncio
from primeGraph import Graph, START, END

state = ...  # initialize state
graph = Graph(state=state)

@graph.node()
def load_documents(state):
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

# This node will interrupt execution before running
@graph.node(interrupt="before")
def review_documents(state):
    return {
        "current_status": "validating",
        "number_of_executed_steps": 1
    }

@graph.node()
def process_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "review_documents")
graph.add_edge("review_documents", "process_documents")
graph.add_edge("process_documents", END)
graph.compile()

async def run_flow():
    # Start execution; this will pause before 'review_documents'
    await graph.execute()
    print("State after interruption:", state)

    # Once ready, resume the execution
    await graph.resume()
    print("State after resuming:", state)
    graph.visualize()

asyncio.run(run_flow())
Persistence
import asyncio
from primeGraph import Graph
from primeGraph.checkpoint.postgresql import PostgreSQLStorage

# Configure storage
storage = PostgreSQLStorage.from_config(
    host="localhost",
    database="documents_db",
    user="user",
    password="password"
)

state = ...  # initialize your state appropriately

# Create graph with checkpoint storage
graph = Graph(state=state, checkpoint_storage=storage)

@graph.node(interrupt="before")
def validate_documents(state):
    return {"current_status": "needs_review"}

# Execute graph and save checkpoint
async def run_with_checkpoint():
    chain_id = await graph.execute()

    # Later, load and resume from checkpoint
    graph.load_from_checkpoint(chain_id)
    await graph.resume()

asyncio.run(run_with_checkpoint())
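The pattern above (execution returns a chain id, which later restores state) can be sketched with a standard-library stand-in. The InMemoryCheckpointStore class below is hypothetical and not part of primeGraph's API; it only illustrates the save/load contract a checkpoint backend fulfils:

```python
# Conceptual sketch of a checkpoint store (not the primeGraph API):
# serialize state under a generated chain id, restore it later by id.
import json
import uuid

class InMemoryCheckpointStore:
    def __init__(self):
        self._checkpoints: dict[str, str] = {}

    def save(self, state: dict) -> str:
        # Persist a snapshot and hand back an id for later resumption
        chain_id = str(uuid.uuid4())
        self._checkpoints[chain_id] = json.dumps(state)
        return chain_id

    def load(self, chain_id: str) -> dict:
        # Rehydrate the exact state that was saved under this id
        return json.loads(self._checkpoints[chain_id])

store = InMemoryCheckpointStore()
chain_id = store.save({"current_status": "needs_review"})
restored = store.load(chain_id)
# restored == {"current_status": "needs_review"}
```

A real backend such as PostgreSQLStorage additionally survives process restarts, which is what makes long-lived human-in-the-loop pauses practical.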
Async Support
import asyncio

@graph.node()
async def async_document_process(state):
    await asyncio.sleep(1)  # Simulate async processing
    return {
        "processed_files": "async_processed",
        "current_status": "async_complete"
    }

# Execute the async graph
async def run_async():
    await graph.execute()
    # Resume if execution was paused
    await graph.resume()

asyncio.run(run_async())
Web Integration
import logging
from typing import List

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from primeGraph import Graph, START, END
from primeGraph.buffer import History
from primeGraph.checkpoint import LocalStorage
from primeGraph.models import GraphState
from primeGraph.web import create_graph_service, wrap_graph_with_websocket

logging.basicConfig(level=logging.DEBUG)

app = FastAPI(debug=True)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/hello")
async def hello():
    return {"message": "Hello World"}

graphs: List[Graph] = []

# Define a simple state model
class SimpleGraphState(GraphState):
    messages: History[str]

state = SimpleGraphState(messages=[])
storage = LocalStorage()
graph1 = Graph(state=state, checkpoint_storage=storage)

@graph1.node()
def add_hello(state: GraphState):
    logging.debug("add_hello")
    return {"messages": "Hello"}

@graph1.node()
def add_world(state: GraphState):
    logging.debug("add_world")
    return {"messages": "World"}

@graph1.node()
def add_exclamation(state: GraphState):
    logging.debug("add_exclamation")
    return {"messages": "!"}

graph1.add_edge(START, "add_hello")
graph1.add_edge("add_hello", "add_world")
graph1.add_edge("add_world", "add_exclamation")
graph1.add_edge("add_exclamation", END)
graph1.compile()

service = create_graph_service(graph1, storage, path_prefix="/graphs/workflow1")
app.include_router(service.router, tags=["workflow1"])

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Basic Usage Examples
Chatbot Example
import asyncio
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue
from pydantic import BaseModel, Field

class ChatbotState(GraphState):
    chat_history: History[dict[str, str]]
    user_wants_to_exit: LastValue[bool] = Field(default=False)

class ChatbotResponse(BaseModel):
    chat_message: str
    user_requested_to_quit: bool = Field(description="True if user wants to quit")

chatbot_state = ChatbotState(chat_history=[], user_wants_to_exit=False)
chatbot_graph = Graph(state=chatbot_state, verbose=False)

@chatbot_graph.node(interrupt="before")
def chat_with_user(state):
    # Simulate calling an AI service; replace with an actual call
    response = ChatbotResponse(chat_message="Hello, how can I assist?", user_requested_to_quit=False)
    print(response.chat_message)
    return {
        "chat_history": {"role": "assistant", "content": response.chat_message},
        "user_wants_to_exit": response.user_requested_to_quit
    }

@chatbot_graph.node()
def assess_next_step(state):
    if state.user_wants_to_exit:
        return END
    return "chat_with_user"

chatbot_graph.add_edge(START, "chat_with_user")
chatbot_graph.add_router_edge("chat_with_user", "assess_next_step")
chatbot_graph.compile()
chatbot_graph.visualize()

async def run_chatbot():
    await chatbot_graph.execute()

    def add_user_message(message: str):
        chatbot_graph.update_state_and_checkpoint(
            {"chat_history": {"role": "user", "content": message}}
        )

    while not chatbot_state.user_wants_to_exit:
        user_input = input("Your message: ")
        print(f"You: {user_input}")
        add_user_message(user_input)
        await chatbot_graph.resume()
    print("Bye")

asyncio.run(run_chatbot())
Async Workflow
import asyncio
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue
from pydantic import BaseModel

# Define models for the async workflow
class Character(GraphState):
    character_name: LastValue[str]
    character_items: History[tuple[str, str]]
    character_summary: LastValue[str]

class CharacterName(BaseModel):
    character_name: str

class CharacterSummary(BaseModel):
    character_summary: str

class CharacterItem(BaseModel):
    item_name: str
    item_description: str

character_state = Character(character_name="", character_items=[], character_summary="")
character_graph = Graph(state=character_state, verbose=False)

@character_graph.node()
async def pick_character_name(state):
    # Simulate an async call
    return {"character_name": "Frodo Baggins"}

@character_graph.node()
async def pick_character_profession(state):
    return {"character_items": ("Adventurer", "Embarks on quests")}

@character_graph.node()
async def pick_character_apparel(state):
    return {"character_items": ("Mystic Robe", "Adorned with ancient runes")}

@character_graph.node()
async def pick_character_partner(state):
    return {"character_items": ("Samwise Gamgee", "Loyal companion")}

@character_graph.node()
async def create_character_summary(state):
    ch_items = "\n".join(f"{item[0]}: {item[1]}" for item in state.character_items)
    return {"character_summary": f"{state.character_name} is accompanied by:\n{ch_items}"}

# Fan out from pick_character_name, then fan back in to the summary node
character_graph.add_edge(START, "pick_character_name")
character_graph.add_edge("pick_character_name", "pick_character_profession")
character_graph.add_edge("pick_character_name", "pick_character_apparel")
character_graph.add_edge("pick_character_name", "pick_character_partner")
character_graph.add_edge("pick_character_profession", "create_character_summary")
character_graph.add_edge("pick_character_apparel", "create_character_summary")
character_graph.add_edge("pick_character_partner", "create_character_summary")
character_graph.add_edge("create_character_summary", END)
character_graph.compile()

async def run_async_workflow():
    await character_graph.execute()
    print(character_graph.state)

asyncio.run(run_async_workflow())
Note: All examples use the asynchronous engine methods execute() and resume(). In scripts, wrap these calls with asyncio.run(...); inside an existing event loop, await them directly.
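The two calling conventions in the note can be sketched with a stand-in coroutine (fake_execute below is a placeholder for graph.execute(), not a primeGraph API):

```python
# Sketch: invoking async engine methods from sync vs. async code.
import asyncio

async def fake_execute() -> str:
    await asyncio.sleep(0)  # placeholder for real async work
    return "done"

# From synchronous top-level code (a script), wrap with asyncio.run(...)
result = asyncio.run(fake_execute())

# From an already-async context (e.g. a FastAPI handler), await directly;
# calling asyncio.run() there would fail because a loop is already running.
async def handler() -> str:
    return await fake_execute()

result2 = asyncio.run(handler())
```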
Roadmap
- Add streaming support
- Create documentation
- Add tools for agentic workflows
- Add inter-node ephemeral state for short-term interactions
- Add persistence support for other databases