Skip to main content

A lightweight graph approach to LLM workflows.

Project description

primeGraph Logo

Python 3.11+ License Package Version


Overview

primeGraph is a Python library for building and executing workflows using graphs, ranging from simple sequential processes to complex parallel execution patterns. While originally optimized for AI applications, its flexible architecture makes it suitable for any workflow orchestration needs.

Key principles:

  • Flexibility First: Design your nodes and execution patterns with complete freedom.
  • Zero Lock-in: Deploy and run workflows however you want, with no vendor dependencies.
  • Opinionated Yet Adaptable: Structured foundations with room for customization.

Note from the author: This project came to life through my experience creating AI applications. I want to acknowledge langgraph as the main inspiration for this project. As an individual developer, I wanted to gain experience creating my own workflow engine to implement more of my own ideas and learnings. At the same time, I also wanted to create a framework that is flexible enough for others to deploy their apps however they want, as this is an open source project. So feel free to use it, modify it, and contribute to it.

Features

  • Flexible Graph Construction: Build multiple workflows with sequential and parallel execution paths.
  • State Management: Built-in state management with various buffer types to coordinate state updates during execution.
  • Type Safety: Uses Pydantic for enforcing shared state types across nodes.
  • Router Nodes: Dynamically choose execution paths based on node outputs.
  • Repeatable Nodes: Execute nodes multiple times either in parallel or sequence.
  • Subgraphs: Compose graphs of subgraphs to design complex workflows.
  • Persistence: Save and resume execution using checkpoint storage (supports memory and Postgres).
  • Async Support: Uses async/await for non-blocking execution with engine methods execute() and resume().
  • Flow Control: Supports human-in-the-loop interactions by pausing and resuming workflows.
  • Visualization: Generate visual representations of your workflows with minimal effort.
  • Web Integration: Integrate with FastAPI and WebSockets for interactive workflows.

Installation

pip install primeGraph

[Optional] Install Graphviz for visualization

To have the graph.visualize() method work, install the Graphviz binary:

https://graphviz.org/download/

Core Features

The Basics

import asyncio
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue, Incremental

# Define your state with appropriate buffer types
class DocumentProcessingState(GraphState):
    processed_files: History[str]      # Stores all returned file names
    current_status: LastValue[str]       # Keeps the last status value
    number_of_executed_steps: Incremental[int]  # Increments with each step

# Initialize state
state = DocumentProcessingState(
    processed_files=[],
    current_status="initializing",
    number_of_executed_steps=0
)

# Create a graph with the state
graph = Graph(state=state)

# Adding nodes to the graph
'to simulate the workflow
@graph.node()
def load_documents(state):
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

@graph.node()
def validate_documents(state):
    return {
        "current_status": "validating",
        "number_of_executed_steps": 1
    }

@graph.node()
def process_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")
graph.add_edge("process_documents", END)

# Compile the graph
graph.compile()

# Execute the graph asynchronously using the new engine methods
async def run_graph():
    await graph.execute()
    print(state)
    graph.visualize()

asyncio.run(run_graph())

Basic Usage Graph Visualization

Router Nodes

import asyncio
from primeGraph import Graph, START, END

graph = Graph()

@graph.node()
def load_documents(state):
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

@graph.node()
def validate_documents(state):
    return {
        "current_status": "validating",
        "number_of_executed_steps": 1
    }

@graph.node()
def process_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

@graph.node()
def route_documents(state):
    if "invoice" in state.current_status:
        return "process_invoice"
    return "cancel_invoice"

@graph.node()
def process_invoice(state):
    return {"current_status": "invoice_processed"}

@graph.node()
def cancel_invoice(state):
    return {"current_status": "invoice_cancelled"}

# Connect nodes and define router edges
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")

# Router node is connected as edge from process_documents
graph.add_router_edge("process_documents", "route_documents")
graph.add_edge("process_invoice", END)
graph.add_edge("cancel_invoice", END)

# Compile and execute
graph.compile()

async def run_router():
    await graph.execute()
    print(state)
    graph.visualize()

import asyncio
asyncio.run(run_router())

Router Nodes visualization

Repeatable Nodes

import asyncio
from primeGraph import Graph, START, END

graph = Graph()

@graph.node()
def load_documents(state):
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

@graph.node()
def validate_documents(state):
    return {
        "current_status": "validating",
        "number_of_executed_steps": 1
    }

@graph.node()
def process_documents(state):
    return {
        "current_status": "processing",
        "number_of_executed_steps": 1
    }

@graph.node()
def repeating_process_batch(state):
    return {
        "processed_files": f"batch_{state.number_of_executed_steps}",
        "number_of_executed_steps": 1
    }

@graph.node()
def conclude_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "validate_documents")
graph.add_edge("validate_documents", "process_documents")

# Add repeating edge for processing multiple batches
graph.add_repeating_edge(
    "process_documents",
    "repeating_process_batch",
    "conclude_documents",
    repeat=3,
    parallel=True
)
graph.add_edge("conclude_documents", END)

graph.compile()

async def run_repeatable():
    await graph.execute()
    print(state)
    graph.visualize()

import asyncio
asyncio.run(run_repeatable())

Repeatable Nodes visualization

Subgraphs

import asyncio
from primeGraph import Graph, START, END

state = ... # initialize your state appropriately
main_graph = Graph(state=state)

@main_graph.node()
def load_documents(state):
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

@main_graph.subgraph()
def validation_subgraph():
    subgraph = Graph(state=state)

    @subgraph.node()
    def check_format(state):
        return {"current_status": "checking_format"}

    @subgraph.node()
    def verify_content(state):
        return {"current_status": "verifying_content"}

    subgraph.add_edge(START, "check_format")
    subgraph.add_edge("check_format", "verify_content")
    subgraph.add_edge("verify_content", END)

    return subgraph

@main_graph.node()
def pre_process_documents(state):
    return {
        "current_status": "pre_processed",
        "number_of_executed_steps": 1
    }

@main_graph.node()
def conclude_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
main_graph.add_edge(START, "load_documents")
main_graph.add_edge("load_documents", "validation_subgraph")
main_graph.add_edge("load_documents", "pre_process_documents")
main_graph.add_edge("validation_subgraph", "conclude_documents")
main_graph.add_edge("pre_process_documents", "conclude_documents")
main_graph.add_edge("conclude_documents", END)

main_graph.compile()

async def run_subgraph():
    await main_graph.execute()
    print(state)
    main_graph.visualize()

import asyncio
asyncio.run(run_subgraph())

Subgraphs visualization

Flow Control

import asyncio
from primeGraph import Graph, START, END

state = ...  # initialize state

graph = Graph(state=state)

@graph.node()
def load_documents(state):
    return {
        "processed_files": "document1.txt",
        "current_status": "loading",
        "number_of_executed_steps": 1
    }

# This node will interrupt execution before running
@graph.node(interrupt="before")
def review_documents(state):
    return {
        "current_status": "validating",
        "number_of_executed_steps": 1
    }

@graph.node()
def process_documents(state):
    return {
        "current_status": "completed",
        "number_of_executed_steps": 1
    }

# Connect nodes
graph.add_edge(START, "load_documents")
graph.add_edge("load_documents", "review_documents")
graph.add_edge("review_documents", "process_documents")
graph.add_edge("process_documents", END)

graph.compile()

async def run_flow():
    # Start execution; this will pause before 'review_documents'
    await graph.execute()
    print("State after interruption:", state)

    # Once ready, resume the execution
    await graph.resume()
    print("State after resuming:", state)
    graph.visualize()

import asyncio
asyncio.run(run_flow())

Flow Control visualization

Persistence

from primeGraph.checkpoint.postgresql import PostgreSQLStorage

# Configure storage
storage = PostgreSQLStorage.from_config(
    host="localhost",
    database="documents_db",
    user="user",
    password="password"
)

# Create graph with checkpoint storage
graph = Graph(state=state, checkpoint_storage=storage)

@graph.node(interrupt="before")
def validate_documents(state):
    return {"current_status": "needs_review"}

# Execute graph and save checkpoint
async def run_with_checkpoint():
    chain_id = await graph.execute()
    # Later, load and resume from checkpoint
    graph.load_from_checkpoint(chain_id)
    await graph.resume()

import asyncio
asyncio.run(run_with_checkpoint())

Async Support

import asyncio

@graph.node()
async def async_document_process(state):
    await asyncio.sleep(1)  # Simulate async processing
    return {
        "processed_files": "async_processed",
        "current_status": "async_complete"
    }

# Execute the async graph
async def run_async():
    await graph.execute()
    # Resume if execution was paused
    await graph.resume()

asyncio.run(run_async())

Web Integration

import os
import logging
from typing import List

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from primeGraph.buffer import History
from primeGraph.checkpoint import LocalStorage
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.web import create_graph_service, wrap_graph_with_websocket

logging.basicConfig(level=logging.DEBUG)

app = FastAPI(debug=True)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/hello")
async def hello():
    return {"message": "Hello World"}

graphs: List[Graph] = []

# Define a simple state model
class SimpleGraphState(GraphState):
    messages: History[str]

state = SimpleGraphState(messages=[])

storage = LocalStorage()
graph1 = Graph(state=state, checkpoint_storage=storage)

@graph1.node()
def add_hello(state: GraphState):
    logging.debug("add_hello")
    return {"messages": "Hello"}

@graph1.node()
def add_world(state: GraphState):
    logging.debug("add_world")
    return {"messages": "World"}

@graph1.node()
def add_exclamation(state: GraphState):
    logging.debug("add_exclamation")
    return {"messages": "!"}

graph1.add_edge(START, "add_hello")
graph1.add_edge("add_hello", "add_world")
graph1.add_edge("add_world", "add_exclamation")
graph1.add_edge("add_exclamation", END)

graph1.compile()

service = create_graph_service(graph1, storage, path_prefix="/graphs/workflow1")
app.include_router(service.router, tags=["workflow1"])

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Basic Usage Examples

Chatbot Example

import asyncio
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue
from pydantic import BaseModel, Field
import logging

class ChatbotState(GraphState):
    chat_history: History[dict[str, str]]
    user_wants_to_exit: LastValue[bool] = Field(default=False)

class ChatbotResponse(BaseModel):
    chat_message: str
    user_requested_to_quit: bool = Field(description="True if user wants to quit")

chatbot_state = ChatbotState(chat_history=[], user_wants_to_exit=False)
chatbot_graph = Graph(state=chatbot_state, verbose=False)

@chatbot_graph.node(interrupt="before")
def chat_with_user(state):
    # Simulate calling an AI service
    try:
        # Replace with actual call
        response = ChatbotResponse(chat_message="Hello, how can I assist?", user_requested_to_quit=False)
        print(response.chat_message)
        return {"chat_history": {"role": "assistant", "content": response.chat_message},
                "user_wants_to_exit": response.user_requested_to_quit}
    except Exception as e:
        raise e

@chatbot_graph.node()
def assess_next_step(state):
    if state.user_wants_to_exit:
        return END
    return "chat_with_user"

chatbot_graph.add_edge(START, "chat_with_user")
chatbot_graph.add_router_edge("chat_with_user", "assess_next_step")
chatbot_graph.compile()
chatbot_graph.visualize()

async def run_chatbot():
    await chatbot_graph.execute()

    def add_user_message(message: str):
        chatbot_graph.update_state_and_checkpoint({"chat_history": {"role": "user", "content": message}})

    while not chatbot_state.user_wants_to_exit:
        user_input = input("Your message: ")
        print(f"You: {user_input}")
        add_user_message(user_input)
        await chatbot_graph.resume()

    print("Bye")

import asyncio
asyncio.run(run_chatbot())

Async Workflow

import asyncio
from primeGraph import Graph, START, END
from primeGraph.models import GraphState
from primeGraph.buffer import History, LastValue
from pydantic import BaseModel

# Define models for async workflow
class Character(GraphState):
    character_name: LastValue[str]
    character_items: History[tuple[str,str]]
    character_summary: LastValue[str]

class CharacterName(BaseModel):
    character_name: str

class CharacterSummary(BaseModel):
    character_summary: str

class CharacterItem(BaseModel):
    item_name: str
    item_description: str

character_state = Character(character_name="", character_items=[], character_summary="")
character_graph = Graph(state=character_state, verbose=False)

@character_graph.node()
async def pick_character_name(state):
    # Simulate an async call
    return {"character_name": "Frodo Baggins"}

@character_graph.node()
async def pick_character_profession(state):
    return {"character_items": ("Adventurer", "Embarks on quests")}

@character_graph.node()
async def pick_character_apparel(state):
    return {"character_items": ("Mystic Robe", "Adorned with ancient runes")}

@character_graph.node()
async def pick_character_partner(state):
    return {"character_items": ("Samwise Gamgee", "Loyal companion")}

@character_graph.node()
async def create_charater_summary(state):
    ch_items = "\n".join([f"{item[0]}: {item[1]}" for item in state.character_items])
    return {"character_summary": f"{state.character_name} is accompanied by:\n{ch_items}"}

character_graph.add_edge(START, "pick_character_name")
character_graph.add_edge("pick_character_name", "pick_character_profession")
character_graph.add_edge("pick_character_name", "pick_character_apparel")
character_graph.add_edge("pick_character_name", "pick_character_partner")
character_graph.add_edge("pick_character_profession", "create_charater_summary")
character_graph.add_edge("pick_character_apparel", "create_charater_summary")
character_graph.add_edge("pick_character_partner", "create_charater_summary")
character_graph.add_edge("create_charater_summary", END)

character_graph.compile()

async def run_async_workflow():
    await character_graph.execute()
    print(character_graph.state)

import asyncio
asyncio.run(run_async_workflow())

Note: All examples now use the new asynchronous engine methods execute() and resume(). For scripts, wrap these calls with asyncio.run(...) or use an async context as needed.

Roadmap

  • Add streaming support
  • Create documentation
  • Add tools for agentic workflows
  • Add inter node epheral state for short term interactions
  • Add persistence support for other databases

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

primegraph-1.1.0.tar.gz (633.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

primegraph-1.1.0-py3-none-any.whl (45.3 kB view details)

Uploaded Python 3

File details

Details for the file primegraph-1.1.0.tar.gz.

File metadata

  • Download URL: primegraph-1.1.0.tar.gz
  • Upload date:
  • Size: 633.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.12

File hashes

Hashes for primegraph-1.1.0.tar.gz
Algorithm Hash digest
SHA256 87bb7e689cc9e96f6cb561f9f1a822f430e940e69b401d6680d1e4226a868bf8
MD5 efc311cf26ef6d092f4df2226e5f1f42
BLAKE2b-256 049f47a37f1194a59237a58e0ebccea2e03463b94cc04df2f8966e021768f0dc

See more details on using hashes here.

File details

Details for the file primegraph-1.1.0-py3-none-any.whl.

File metadata

  • Download URL: primegraph-1.1.0-py3-none-any.whl
  • Upload date:
  • Size: 45.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.12

File hashes

Hashes for primegraph-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ae202bdd7f9accd54aac6d67f69c3d1cc9498c43826205067825b29c6fda85cb
MD5 7286baeb569c115871ed025cfcd56940
BLAKE2b-256 a580c7c4584b2f6071e41366bf53a71ca642e1c93ec219feb7274cf7b5376119

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page