Skip to main content

PyAgenity is a Python framework for building, orchestrating, and managing multi-agent systems. Designed for flexibility and scalability, PyAgenity enables developers to create intelligent agents that collaborate, communicate, and solve complex tasks together.

Project description

PyAgenity

PyPI License Python Coverage

PyAgenity is a lightweight Python framework for building intelligent agents and orchestrating multi-agent workflows, can be used with any LLM provider like OpenAI, Google Gemini, Anthropic Claude, etc.

PyAgenity is a lightweight Python framework for building intelligent agents and orchestrating multi-agent workflows on top of the LiteLLM unified LLM interface.


Features

  • Unified Agent abstraction (no raw LiteLLM objects leaked)
  • Structured responses with content, optional thinking, and usage
  • Streaming support with incremental chunks (Delta = True)
  • Final message hooks for persistence/logging
  • LangGraph-inspired graph engine: nodes, conditional edges, pause/resume (human-in-loop)
  • In-memory session state store (pluggable in the future)

Installation

Basic installation with uv (recommended):

uv pip install pyagenity

Or with pip:

pip install pyagenity

Optional Dependencies:

PyAgenity supports optional dependencies for specific functionality:

# PostgreSQL + Redis checkpointing
pip install pyagenity[pg_checkpoint]

# MCP (Model Context Protocol) support
pip install pyagenity[mcp]

# Composio tools (adapter)
pip install pyagenity[composio]

# LangChain tools (registry-based adapter)
 pip install pyagenity[langchain]

# Individual publishers
pip install pyagenity[redis]     # Redis publisher
pip install pyagenity[kafka]     # Kafka publisher
pip install pyagenity[rabbitmq]  # RabbitMQ publisher

# Multiple extras
pip install pyagenity[pg_checkpoint,mcp,composio,langchain]

Development vs. Library Usage

Library consumers:

  • Only pyproject.toml is needed. It contains runtime dependencies and build metadata.
  • Install with pip or uv as shown above.

Development contributors:

  • Use pyproject.dev.toml for all development tools, linters, test runners, and optional extras.
  • Install dev dependencies with:
# Option 1: pip (from requirements-dev.txt)
python -m venv .venv
source .venv/bin/activate
pip install -r requirements-dev.txt

# Option 2: pip (editable install with extras, if supported)
pip install -e .[dev]

# Option 3: uv (if you use uv)
uv pip install -r requirements-dev.txt

Note:

  • pyproject.dev.toml contains all dev/test/docs/mail extras and tool configs (ruff, isort, mypy, pytest, bandit, etc.).
  • pyproject.toml is minimal and safe for use as a library dependency in other projects.

Set provider API keys (example for OpenAI):

export OPENAI_API_KEY=sk-...  # required for gpt-* models

If you have a .env file, it will be auto-loaded (via python-dotenv).


See example/graph_demo.py for a runnable example.

Using LangChain tools

The LangChain adapter is registry-based. You can register any LangChain tools (BaseTool/StructuredTool or duck-typed run/_run objects), and PyAgenity will expose them to the LLM via a uniform function-calling schema.

from pyagenity.adapters.tools.langchain_adapter import LangChainAdapter
from pyagenity.graph import ToolNode

adapter = LangChainAdapter()  # autoloads a couple common tools if none registered
# Optionally register your own tools:
# from langchain_community.tools import DuckDuckGoSearchRun
# adapter.register_tool(DuckDuckGoSearchRun())

tool_node = ToolNode([], langchain_adapter=adapter)
tools = tool_node.all_tools_sync()  # pass these to your LLM as tools

Disable autoload and register explicitly:

adapter = LangChainAdapter(autoload_default_tools=False)
adapter.register_tools([my_tool, another_tool])

Example: React Weather Agent

This repository includes a comprehensive example agent that demonstrates tool injection, conditional edges, and streaming support. The example is in examples/react/react_weather_agent.py and uses an in-memory checkpointer. It's intended as a runnable demo showing how to register ToolNodes, conditional edges, and invoke the compiled graph.

Key points:

  • Demonstrates an injectable tool signature that receives tool_call_id and state.
  • Shows how to add ToolNode and conditional edges into a StateGraph.
  • Uses litellm.acompletion to call an LLM with async support.
  • Includes proper error handling and tool call routing.

Complete example from examples/react/react_weather_agent.py:

from dotenv import load_dotenv
from litellm import acompletion

from pyagenity.checkpointer import InMemoryCheckpointer
from pyagenity.graph import StateGraph, ToolNode
from pyagenity.state.agent_state import AgentState
from pyagenity.utils import Message
from pyagenity.utils.constants import END
from pyagenity.utils.converter import convert_messages

load_dotenv()

checkpointer = InMemoryCheckpointer()

def get_weather(
    location: str,
    tool_call_id: str | None = None,
    state: AgentState | None = None,
) -> Message:
    """
    Get the current weather for a specific location.
    This demo shows injectable parameters: tool_call_id and state are automatically injected.
    """
    # You can access injected parameters here
    if tool_call_id:
        print(f"Tool call ID: {tool_call_id}")
    if state and hasattr(state, "context"):
        print(f"Number of messages in context: {len(state.context)}")  # type: ignore

    res = f"The weather in {location} is sunny"
    return Message.tool_message(
        content=res,
        tool_call_id=tool_call_id,  # type: ignore
    )

tool_node = ToolNode([get_weather])

async def main_agent(state: AgentState):
    prompts = """
        You are a helpful assistant.
        Your task is to assist the user in finding information and answering questions.
    """

    messages = convert_messages(
        system_prompts=[
            {
                "role": "system",
                "content": prompts,
                "cache_control": {
                    "type": "ephemeral",
                    "ttl": "3600s",  # 👈 Cache for 1 hour
                },
            },
            {"role": "user", "content": "Today Date is 2024-06-15"},
        ],
        state=state,
    )

    mcp_tools = []

    # Check if the last message is a tool result - if so, make final response without tools
    if (
        state.context
        and len(state.context) > 0
        and state.context[-1].role == "tool"
        and state.context[-1].tool_call_id is not None
    ):
        # Make final response without tools since we just got tool results
        response = await acompletion(
            model="gemini/gemini-2.5-flash",
            messages=messages,
        )
    else:
        # Regular response with tools available
        tools = await tool_node.all_tools()
        response = await acompletion(
            model="gemini/gemini-2.5-flash",
            messages=messages,
            tools=tools + mcp_tools,
        )

    return response

def should_use_tools(state: AgentState) -> str:
    """Determine if we should use tools or end the conversation."""
    if not state.context or len(state.context) == 0:
        return "TOOL"  # No context, might need tools

    last_message = state.context[-1]

    # If the last message is from assistant and has tool calls, go to TOOL
    if (
        hasattr(last_message, "tools_calls")
        and last_message.tools_calls
        and len(last_message.tools_calls) > 0
        and last_message.role == "assistant"
    ):
        return "TOOL"

    # If last message is a tool result, we should be done (AI will make final response)
    if last_message.role == "tool" and last_message.tool_call_id is not None:
        return END

    # Default to END for other cases
    return END

graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_node("TOOL", tool_node)

# Add conditional edges from MAIN
graph.add_conditional_edges(
    "MAIN",
    should_use_tools,
    {"TOOL": "TOOL", END: END},
)

# Always go back to MAIN after TOOL execution
graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")

app = graph.compile(
    checkpointer=checkpointer,
)

# Run the agent
inp = {"messages": [Message.from_text("Please call the get_weather function for New York City")]}
config = {"thread_id": "12345", "recursion_limit": 10}

res = app.invoke(inp, config=config)

for i in res["messages"]:
    print("**********************")
    print("Message Type: ", i.role)
    print(i)
    print("**********************")
    print("\n\n")

How to run the example locally

  1. Install dependencies (recommended in a virtualenv):
pip install -r requirements.txt
# or if you use uv
uv pip install -r requirements.txt
  1. Set your LLM provider API key (for example OpenAI):
export OPENAI_API_KEY="sk-..."
# or create a .env with the key and the script will load it automatically
  1. Run the example script:
python examples/react/react_weather_agent.py

Notes

  • The example uses litellm's acompletion function — set model to a provider/model available in your environment (for example gemini/gemini-2.5-flash or other supported model strings).
  • InMemoryCheckpointer is for demo/testing only. Replace with a persistent checkpointer for production.

Example: MCP Integration

PyAgenity supports integration with Model Context Protocol (MCP) servers, allowing you to connect external tools and services. The example in examples/react-mcp/ demonstrates how to integrate MCP tools with your agent.

First, create an MCP server (see examples/react-mcp/server.py):

from fastmcp import FastMCP

mcp = FastMCP("My MCP Server")

@mcp.tool(
    description="Get the weather for a specific location",
)
def get_weather(location: str) -> dict:
    return {
        "location": location,
        "temperature": "22°C",
        "description": "Sunny",
    }

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

Then, integrate MCP tools into your agent (from examples/react-mcp/react-mcp.py):

from typing import Any

from dotenv import load_dotenv
from fastmcp import Client
from litellm import acompletion

from pyagenity.checkpointer import InMemoryCheckpointer
from pyagenity.graph import StateGraph, ToolNode
from pyagenity.state.agent_state import AgentState
from pyagenity.utils import Message
from pyagenity.utils.constants import END
from pyagenity.utils.converter import convert_messages

load_dotenv()

checkpointer = InMemoryCheckpointer()

config = {
    "mcpServers": {
        "weather": {
            "url": "http://127.0.0.1:8000/mcp",
            "transport": "streamable-http",
        },
        "github": {
            "url": "http://127.0.0.1:8000/mcp",
            "transport": "streamable-http",
        },
    }
}

client_http = Client(config)

# Initialize ToolNode with MCP client
tool_node = ToolNode(functions=[], client=client_http)

async def main_agent(
    state: AgentState,
    config: dict[str, Any],
    checkpointer: Any | None = None,
    store: Any | None = None,
):
    prompts = """
        You are a helpful assistant.
        Your task is to assist the user in finding information and answering questions.
    """

    messages = convert_messages(
        system_prompts=[{"role": "system", "content": prompts}],
        state=state,
    )

    # Get all available tools (including MCP tools)
    tools = await tool_node.all_tools()
    print("**** List of tools", len(tools), tools)

    response = await acompletion(
        model="gemini/gemini-2.0-flash",
        messages=messages,
        tools=tools,
    )
    return response

def should_use_tools(state: AgentState) -> str:
    """Determine if we should use tools or end the conversation."""
    if not state.context or len(state.context) == 0:
        return "TOOL"  # No context, might need tools

    last_message = state.context[-1]

    # If the last message is from assistant and has tool calls, go to TOOL
    if (
        hasattr(last_message, "tools_calls")
        and last_message.tools_calls
        and len(last_message.tools_calls) > 0
        and last_message.role == "assistant"
    ):
        return "TOOL"

    # If last message is a tool result, we should be done (AI will make final response)
    if last_message.role == "tool" and last_message.tool_call_id is not None:
        return END

    # Default to END for other cases
    return END

graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_node("TOOL", tool_node)

# Add conditional edges from MAIN
graph.add_conditional_edges(
    "MAIN",
    should_use_tools,
    {"TOOL": "TOOL", END: END},
)

# Always go back to MAIN after TOOL execution
graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")

app = graph.compile(
    checkpointer=checkpointer,
)

# Run the agent
inp = {"messages": [Message.from_text("Please call the get_weather function for New York City")]}
config = {"thread_id": "12345", "recursion_limit": 10}

res = app.invoke(inp, config=config)

for i in res["messages"]:
    print(i)
    print("\n")

How to run the MCP example:

  1. Install MCP dependencies:
pip install pyagenity[mcp]
# or
uv pip install pyagenity[mcp]
  1. Start the MCP server in one terminal:
cd examples/react-mcp
python server.py
  1. Run the MCP-integrated agent in another terminal:
python examples/react-mcp/react-mcp.py

Example: Streaming Agent

PyAgenity supports streaming responses for real-time interaction. The example in examples/react_stream/stream_react_agent.py demonstrates different streaming modes and configurations.

import asyncio
import logging
from typing import Any

from dotenv import load_dotenv
from litellm import acompletion

from pyagenity.checkpointer import InMemoryCheckpointer
from pyagenity.graph import StateGraph, ToolNode
from pyagenity.state.agent_state import AgentState
from pyagenity.utils import Message, ResponseGranularity
from pyagenity.utils.constants import END
from pyagenity.utils.converter import convert_messages

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

load_dotenv()
checkpointer = InMemoryCheckpointer()

def get_weather(
    location: str,
    tool_call_id: str,
    state: AgentState,
) -> Message:
    """Get weather with injectable parameters."""
    if tool_call_id:
        logging.debug("[tool] Tool call ID: %s", tool_call_id)
    if state and hasattr(state, "context"):
        logging.debug("[tool] Context messages: %s", len(state.context))

    res = f"The weather in {location} is sunny."
    return Message.tool_message(
        content=res,
        tool_call_id=tool_call_id,
    )

tool_node = ToolNode([get_weather])

async def main_agent(
    state: AgentState,
    config: dict[str, Any],
    checkpointer: Any | None = None,
    store: Any | None = None,
):
    prompts = """
        You are a helpful assistant.
        Answer conversationally. Use tools when needed.
    """

    messages = convert_messages(
        system_prompts=[{"role": "system", "content": prompts}],
        state=state,
    )

    is_stream = config.get("is_stream", False)

    if (
        state.context
        and len(state.context) > 0
        and state.context[-1].role == "tool"
        and state.context[-1].tool_call_id is not None
    ):
        response = await acompletion(
            model="gemini/gemini-2.5-flash",
            messages=messages,
            stream=is_stream,
        )
    else:
        tools = await tool_node.all_tools()
        # Avoid streaming when tools are enabled to ensure tool-calls are parsed properly
        response = await acompletion(
            model="gemini/gemini-2.5-flash",
            messages=messages,
            tools=tools,
            stream=is_stream,
        )

    return response

def should_use_tools(state: AgentState) -> str:
    if not state.context or len(state.context) == 0:
        return "TOOL"

    last_message = state.context[-1]

    if (
        hasattr(last_message, "tools_calls")
        and last_message.tools_calls
        and len(last_message.tools_calls) > 0
        and last_message.role == "assistant"
    ):
        return "TOOL"

    if last_message.role == "tool" and last_message.tool_call_id is not None:
        return END

    return END

graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_node("TOOL", tool_node)

graph.add_conditional_edges(
    "MAIN",
    should_use_tools,
    {"TOOL": "TOOL", END: END},
)

graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")

app = graph.compile(checkpointer=checkpointer)

async def run_stream_test() -> None:
    inp = {"messages": [Message.from_text("Call get_weather for Tokyo, then reply.")]}
    config = {"thread_id": "stream-1", "recursion_limit": 10}

    logging.info("--- streaming start ---")
    stream_gen = app.astream(
        inp,
        config=config,
        response_granularity=ResponseGranularity.LOW,
    )
    async for chunk in stream_gen:
        print(chunk.model_dump(), end="\n", flush=True)

if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        test_type = sys.argv[1]
        if test_type == "sync":
            asyncio.run(run_sync_test())
        elif test_type == "non-stream":
            asyncio.run(run_non_stream_test())
        elif test_type == "sync-stream":
            asyncio.run(run_sync_stream_test())
        else:
            logging.info("Usage: python stream_react_agent.py [sync|non-stream|sync-stream]")
            logging.info("Running default streaming test...")
            asyncio.run(run_stream_test())
    else:
        asyncio.run(run_stream_test())

Run the streaming example:

python examples/react_stream/stream_react_agent.py

Or run specific test modes:

python examples/react_stream/stream_react_agent.py sync
python examples/react_stream/stream_react_agent.py non-stream
python examples/react_stream/stream_react_agent.py sync-stream

Stopping a running graph

You can cooperatively stop an in-flight execution (invoke or stream) from your UI/frontend by setting a stop flag in the checkpointer’s thread store. Handlers poll this flag and gracefully interrupt the run.

Contract:

  • Provide a stable thread_id in the config for both the running graph and the stop request
  • Call app.stop(config) (sync) or await app.astop(config) (async)
  • The stop method returns a small status dict with ok, running, and thread info

Minimal example:

from pyagenity.checkpointer import InMemoryCheckpointer
from pyagenity.graph import StateGraph
from pyagenity.state.agent_state import AgentState
from pyagenity.utils import Message
from pyagenity.utils.constants import END

checkpointer = InMemoryCheckpointer()

async def main_agent(state: AgentState, config: dict | None = None):
    # Produce chunks (streaming) or any long-running work
    import asyncio
    for i in range(10):
        await asyncio.sleep(0.3)
        yield Message.text_message(f"Chunk {i+1}")

def router(state: AgentState) -> str:
    return END

graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_conditional_edges("MAIN", router, {END: END})
graph.set_entry_point("MAIN")
app = graph.compile(checkpointer=checkpointer)

config = {"thread_id": "demo-thread", "is_stream": True}
inp = {"messages": [Message.text_message("Start, then stop soon")]}

# Start streaming in a background thread/task and then request stop:
import threading, time
def reader():
    for chunk in app.stream(inp, config=config):
        print("STREAM:", getattr(chunk, "content", chunk))

t = threading.Thread(target=reader, daemon=True)
t.start()
time.sleep(1.0)
print("Stop status:", app.stop(config))
t.join(timeout=5)

See runnable examples in examples/react_stream/stop_stream.py and examples/react_stream/stop_stream_litellm.py.


Roadmap

  • Persistent state backend (Redis, SQL, etc.)
  • Parallel / branching strategies and selection policies
  • Tool invocation nodes & function calling wrappers
  • Tracing / telemetry integration

TODO

  • Stop Current Execution: Allow stopping graph execution from frontend/UI
  • Remote Node Support: Enable running nodes on remote machines for distributed processing
  • Extend Node Support: Allow users to extend and customize Node and ToolNode classes

License

MIT


Project Links


Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyagenity-0.3.2.tar.gz (114.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyagenity-0.3.2-py3-none-any.whl (136.5 kB view details)

Uploaded Python 3

File details

Details for the file pyagenity-0.3.2.tar.gz.

File metadata

  • Download URL: pyagenity-0.3.2.tar.gz
  • Upload date:
  • Size: 114.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for pyagenity-0.3.2.tar.gz
Algorithm Hash digest
SHA256 10a0d8959657a9562441e5f978cded348ab54870f6b92147c42f4bd27bc5382f
MD5 faeafd7ddc5348a5900ee046a26fa494
BLAKE2b-256 2d4d7af58fd6fbcabe2858ff46bcbd3ee622c46ec11a80fb9892fb33fc542b51

See more details on using hashes here.

File details

Details for the file pyagenity-0.3.2-py3-none-any.whl.

File metadata

  • Download URL: pyagenity-0.3.2-py3-none-any.whl
  • Upload date:
  • Size: 136.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for pyagenity-0.3.2-py3-none-any.whl
Algorithm Hash digest
SHA256 6f3a514866d9137feed98c44c28eed51c7e0f99f393e5b61afe06f85e99434ad
MD5 ee1480af62448fc69f5f6c06eaac9d71
BLAKE2b-256 3497899116e0d0583fd3cf1b13c07571ea5d3910c2920e558e9b1d28663b96e8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page