Skip to main content

10xScale Agentflow is a Python framework for building, orchestrating, and managing multi-agent systems. Designed for flexibility and scalability, 10xScale Agentflow enables developers to create intelligent agents that collaborate, communicate, and solve complex tasks together.

Project description

10xScale Agentflow

PyPI License Python Coverage

10xScale Agentflow is a lightweight Python framework for building intelligent agents and orchestrating multi-agent workflows. It's an LLM-agnostic orchestration tool that works with native SDKs from OpenAI, Google Gemini, Anthropic Claude, or any other provider. You choose your LLM library; 10xScale Agentflow provides the workflow orchestration.


โœจ Key Features

  • โšก Agent Class - Build complete agents in 10-30 lines of code (new in v0.5.3!)
  • ๐ŸŽฏ LLM-Agnostic Orchestration - Works with any LLM provider (OpenAI, Gemini, Claude, native SDKs)
  • ๐Ÿค– Multi-Agent Workflows - Build complex agent systems with your choice of orchestration patterns
  • ๐Ÿ“Š Structured Responses - Get content, optional thinking, and usage in a standardized format
  • ๐ŸŒŠ Streaming Support - Real-time incremental responses with delta updates
  • ๐Ÿ”ง Tool Integration - Native support for function calling, MCP, Composio, and LangChain tools with parallel execution
  • ๐Ÿ”€ LangGraph-Inspired Engine - Flexible graph orchestration with nodes, conditional edges, and control flow
  • ๐Ÿ’พ State Management - Built-in persistence with in-memory and PostgreSQL+Redis checkpointers
  • ๐Ÿ”„ Human-in-the-Loop - Pause/resume execution for approval workflows and debugging
  • ๐Ÿš€ Production-Ready - Event publishing (Console, Redis, Kafka, RabbitMQ), metrics, and observability
  • ๐Ÿงฉ Dependency Injection - Clean parameter injection for tools and nodes
  • ๐Ÿ“ฆ Prebuilt Patterns - React, RAG, Swarm, Router, MapReduce, SupervisorTeam, and more

๐ŸŒŸ What Makes Agentflow Unique

Agentflow stands out with powerful features designed for production-grade AI applications:

๐Ÿ—๏ธ Architecture & Scalability

  1. ๐Ÿ’พ Checkpointer with Caching Design Intelligent state persistence with built-in caching layer to scale efficiently. PostgreSQL + Redis implementation ensures high performance in production environments.

  2. ๐Ÿง  3-Layer Memory System

    • Short-term memory: Current conversation context
    • Conversational memory: Session-based chat history
    • Long-term memory: Persistent knowledge across sessions

๐Ÿ”ง Advanced Tooling Ecosystem

  1. ๐Ÿ”Œ Remote Tool Calls Execute tools remotely using our TypeScript SDK for distributed agent architectures.

  2. ๐Ÿ› ๏ธ Comprehensive Tool Integration

    • Local tools (Python functions)
    • Remote tools (via TypeScript SDK)
    • Agent handoff tools (multi-agent collaboration)
    • MCP (Model Context Protocol)
    • LangChain tools
    • Composio tools

๐ŸŽฏ Intelligent Context Management

  1. ๐Ÿ“ Dedicated Context Manager
    • Automatically controls context size to prevent token overflow
    • Called at iteration end to avoid mid-execution context loss
    • Fully extensible with custom implementations

โš™๏ธ Dependency Injection & Control

  1. ๐Ÿ’‰ First-Class Dependency Injection Powered by InjectQ library for clean, testable, and maintainable code patterns.

  2. ๐ŸŽ›๏ธ Custom ID Generation Control Choose between string, int, or bigint IDs. Smaller IDs save significant space in databases and indexes compared to standard 128-bit UUIDs.

๐Ÿ“Š Observability & Events

  1. ๐Ÿ“ก Internal Event Publishing Emit execution events to any publisher:
    • Kafka
    • RabbitMQ
    • Redis Pub/Sub
    • OpenTelemetry (planned)
    • Custom publishers

๐Ÿ”„ Advanced Execution Features

  1. โฐ Background Task Manager Built-in manager for running tasks asynchronously:

    • Prefetching data
    • Memory persistence
    • Cleanup operations
    • Custom background jobs
  2. ๐Ÿšฆ Human-in-the-Loop with Interrupts Pause execution at any point for human approval, then seamlessly resume with full state preservation.

  3. ๐Ÿงญ Flexible Agent Navigation

    • Condition-based routing between agents
    • Command-based jumps to specific agents
    • Agent handoff tools for smooth transitions

๐Ÿ›ก๏ธ Security & Validation

  1. ๐ŸŽฃ Comprehensive Callback System Hook into various execution stages for:
    • Logging and monitoring
    • Custom behavior injection
    • Prompt injection attack prevention
    • Input/output validation

๐Ÿ“ฆ Ready-to-Use Components

  1. ๐Ÿค– Prebuilt Agent Patterns Production-ready implementations:
    • React agents
    • RAG (Retrieval-Augmented Generation)
    • Swarm architectures
    • Router agents
    • MapReduce patterns
    • Supervisor teams

๐Ÿ“ Developer Experience

  1. ๐Ÿ“‹ Pydantic-First Design All core classes (State, Message, ToolCalls) are Pydantic models:
    • Automatic JSON serialization
    • Type safety
    • Easy debugging and logging
    • Seamless database storage

Installation

Basic installation with uv (recommended):

uv pip install 10xscale-agentflow

Or with pip:

pip install 10xscale-agentflow

Optional Dependencies:

10xScale Agentflow supports optional dependencies for specific functionality:

# PostgreSQL + Redis checkpointing
pip install 10xscale-agentflow[pg_checkpoint]

# MCP (Model Context Protocol) support
pip install 10xscale-agentflow[mcp]

# Google GenAI adapter (google-genai SDK)
pip install 10xscale-agentflow[google-genai]

# Composio tools (adapter)
pip install 10xscale-agentflow[composio]

# LangChain tools (registry-based adapter)
pip install 10xscale-agentflow[langchain]

# Individual publishers
pip install 10xscale-agentflow[redis]     # Redis publisher
pip install 10xscale-agentflow[kafka]     # Kafka publisher
pip install 10xscale-agentflow[rabbitmq]  # RabbitMQ publisher

# Multiple extras
pip install 10xscale-agentflow[pg_checkpoint,mcp,google-genai,composio,langchain]

Environment Setup

Set your LLM provider API key:

export OPENAI_API_KEY=sk-...  # for OpenAI models
# or
export GEMINI_API_KEY=...     # for Google Gemini
# or
export ANTHROPIC_API_KEY=...  # for Anthropic Claude

If you have a .env file, it will be auto-loaded (via python-dotenv).


๐ŸŽฏ Two Ways to Build Agents

10xScale Agentflow offers two approachesโ€”choose based on your needs:

Approach Best For Lines of Code
Agent Class โญ Most use cases, rapid development 10-30 lines
Custom Functions Complex custom logic, custom SDK integrations 50-150 lines

Recommendation: Start with the Agent class. It handles 90% of use cases with minimal code.


๐Ÿ’ก Simple Example with Agent Class

Here's a complete tool-calling agent in under 30 lines:

from agentflow.graph import Agent, StateGraph, ToolNode
from agentflow.state import AgentState, Message
from agentflow.utils.constants import END


# 1. Define your tool
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"The weather in {location} is sunny, 72ยฐF"


# 2. Build the graph with Agent class
graph = StateGraph()
graph.add_node("MAIN", Agent(
    model="gemini/gemini-2.5-flash",
    system_prompt=[{"role": "system", "content": "You are a helpful assistant."}],
    tool_node_name="TOOL"
))
graph.add_node("TOOL", ToolNode([get_weather]))


# 3. Define routing
def route(state: AgentState) -> str:
    if state.context and state.context[-1].tools_calls:
        return "TOOL"
    return END


graph.add_conditional_edges("MAIN", route, {"TOOL": "TOOL", END: END})
graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")

# 4. Run it!
app = graph.compile()
result = app.invoke({
    "messages": [Message.text_message("What's the weather in NYC?")]
}, config={"thread_id": "1"})

for msg in result["messages"]:
    print(f"{msg.role}: {msg.content}")

That's it! The Agent class handles message conversion, LLM calls, and tool integration automatically.


๐Ÿ”ง Advanced: Custom Functions Approach

For maximum control, use custom functions instead of the Agent class:

from dotenv import load_dotenv
from openai import AsyncOpenAI

from agentflow.checkpointer import InMemoryCheckpointer
from agentflow.graph import StateGraph, ToolNode
from agentflow.state.agent_state import AgentState
from agentflow.utils import Message
from agentflow.utils.constants import END
from agentflow.utils.converter import convert_messages

load_dotenv()
client = AsyncOpenAI()


# Define a tool with dependency injection
def get_weather(
        location: str,
        tool_call_id: str | None = None,
        state: AgentState | None = None,
) -> Message:
    """Get the current weather for a specific location."""
    res = f"The weather in {location} is sunny"
    return Message.tool_message(
        content=res,
        tool_call_id=tool_call_id,
    )


# Create tool node
tool_node = ToolNode([get_weather])


# Define main agent node (manual message handling)
async def main_agent(state: AgentState):
    prompts = "You are a helpful assistant. Use tools when needed."

    messages = convert_messages(
        system_prompts=[{"role": "system", "content": prompts}],
        state=state,
    )

    # Check if we need tools
    if (
            state.context
            and len(state.context) > 0
            and state.context[-1].role == "tool"
    ):
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
        )
    else:
        tools = await tool_node.all_tools()
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            tools=tools,
        )

    return response


# Define routing logic
def should_use_tools(state: AgentState) -> str:
    """Determine if we should use tools or end."""
    if not state.context or len(state.context) == 0:
        return "TOOL"

    last_message = state.context[-1]

    if (
            hasattr(last_message, "tools_calls")
            and last_message.tools_calls
            and len(last_message.tools_calls) > 0
    ):
        return "TOOL"

    return END


# Build the graph
graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_node("TOOL", tool_node)

graph.add_conditional_edges(
    "MAIN",
    should_use_tools,
    {"TOOL": "TOOL", END: END},
)

graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")

# Compile and run
app = graph.compile(checkpointer=InMemoryCheckpointer())

inp = {"messages": [Message.from_text("What's the weather in New York?")]}
config = {"thread_id": "12345", "recursion_limit": 10}

res = app.invoke(inp, config=config)

for msg in res["messages"]:
    print(msg)

How to run the example locally

  1. Install dependencies (recommended in a virtualenv):
pip install -r requirements.txt
# or if you use uv
uv pip install -r requirements.txt
  1. Set your LLM provider API key (for example OpenAI):
export OPENAI_API_KEY="sk-..."
# or create a .env with the key and the script will load it automatically
  1. Run the example script:
python examples/react/react_weather_agent.py

Notes:

  • The example uses the OpenAI async client. Set OPENAI_API_KEY and choose a model available in your account.
  • InMemoryCheckpointer is for demo/testing only. Replace with a persistent checkpointer for production.

Example: MCP Integration

10xScale Agentflow supports integration with Model Context Protocol (MCP) servers, allowing you to connect external tools and services. The example in examples/react-mcp/ demonstrates how to integrate MCP tools with your agent.

First, create an MCP server (see examples/react-mcp/server.py):

from fastmcp import FastMCP

mcp = FastMCP("My MCP Server")

@mcp.tool(
    description="Get the weather for a specific location",
)
def get_weather(location: str) -> dict:
    return {
        "location": location,
        "temperature": "22ยฐC",
        "description": "Sunny",
    }

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

Then, integrate MCP tools into your agent (from examples/react-mcp/react-mcp.py):

from typing import Any

from dotenv import load_dotenv
from fastmcp import Client
from openai import AsyncOpenAI

from agentflow.checkpointer import InMemoryCheckpointer
from agentflow.graph import StateGraph, ToolNode
from agentflow.state.agent_state import AgentState
from agentflow.utils import Message
from agentflow.utils.constants import END
from agentflow.utils.converter import convert_messages

load_dotenv()
client = AsyncOpenAI()

checkpointer = InMemoryCheckpointer()

config = {
    "mcpServers": {
        "weather": {
            "url": "http://127.0.0.1:8000/mcp",
            "transport": "streamable-http",
        },
    }
}

client_http = Client(config)

# Initialize ToolNode with MCP client
tool_node = ToolNode(functions=[], client=client_http)


async def main_agent(state: AgentState):
    prompts = "You are a helpful assistant."

    messages = convert_messages(
        system_prompts=[{"role": "system", "content": prompts}],
        state=state,
    )

    # Get all available tools (including MCP tools)
    tools = await tool_node.all_tools()

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        tools=tools,
    )
    return response


def should_use_tools(state: AgentState) -> str:
    """Determine if we should use tools or end the conversation."""
    if not state.context or len(state.context) == 0:
        return "TOOL"

    last_message = state.context[-1]

    if (
            hasattr(last_message, "tools_calls")
            and last_message.tools_calls
            and len(last_message.tools_calls) > 0
    ):
        return "TOOL"

    if last_message.role == "tool" and last_message.tool_call_id is not None:
        return END

    return END


graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_node("TOOL", tool_node)

graph.add_conditional_edges(
    "MAIN",
    should_use_tools,
    {"TOOL": "TOOL", END: END},
)

graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")

app = graph.compile(checkpointer=checkpointer)

# Run the agent
inp = {"messages": [Message.from_text("Please call the get_weather function for New York City")]}
config = {"thread_id": "12345", "recursion_limit": 10}

res = app.invoke(inp, config=config)

for i in res["messages"]:
    print(i)

How to run the MCP example:

  1. Install MCP dependencies:
pip install 10xscale-agentflow[mcp]
# or
uv pip install 10xscale-agentflow[mcp]
  1. Start the MCP server in one terminal:
cd examples/react-mcp
python server.py
  1. Run the MCP-integrated agent in another terminal:
python examples/react-mcp/react-mcp.py

Example: Streaming Agent

10xScale Agentflow supports streaming responses for real-time interaction. The example in examples/react_stream/stream_react_agent.py demonstrates different streaming modes and configurations.

import asyncio
import logging

from dotenv import load_dotenv
from openai import AsyncOpenAI

from agentflow.checkpointer import InMemoryCheckpointer
from agentflow.graph import StateGraph, ToolNode
from agentflow.state.agent_state import AgentState
from agentflow.utils import Message, ResponseGranularity
from agentflow.utils.constants import END
from agentflow.utils.converter import convert_messages

load_dotenv()
client = AsyncOpenAI()
checkpointer = InMemoryCheckpointer()


def get_weather(
        location: str,
        tool_call_id: str,
        state: AgentState,
) -> Message:
    """Get weather with injectable parameters."""
    res = f"The weather in {location} is sunny."
    return Message.tool_message(
        content=res,
        tool_call_id=tool_call_id,
    )


tool_node = ToolNode([get_weather])


async def main_agent(state: AgentState, config: dict):
    prompts = "You are a helpful assistant. Answer conversationally. Use tools when needed."

    messages = convert_messages(
        system_prompts=[{"role": "system", "content": prompts}],
        state=state,
    )

    is_stream = config.get("is_stream", False)

    if (
            state.context
            and len(state.context) > 0
            and state.context[-1].role == "tool"
    ):
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            stream=is_stream,
        )
    else:
        tools = await tool_node.all_tools()
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            tools=tools,
            stream=is_stream,
        )

    return response


def should_use_tools(state: AgentState) -> str:
    if not state.context or len(state.context) == 0:
        return "TOOL"

    last_message = state.context[-1]

    if (
            hasattr(last_message, "tools_calls")
            and last_message.tools_calls
            and len(last_message.tools_calls) > 0
    ):
        return "TOOL"

    if last_message.role == "tool" and last_message.tool_call_id is not None:
        return END

    return END


graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_node("TOOL", tool_node)

graph.add_conditional_edges(
    "MAIN",
    should_use_tools,
    {"TOOL": "TOOL", END: END},
)

graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")

app = graph.compile(checkpointer=checkpointer)


async def run_stream_test():
    inp = {"messages": [Message.from_text("Call get_weather for Tokyo, then reply.")]}
    config = {"thread_id": "stream-1", "recursion_limit": 10}

    logging.info("--- streaming start ---")
    stream_gen = app.astream(
        inp,
        config=config,
        response_granularity=ResponseGranularity.LOW,
    )
    async for chunk in stream_gen:
        print(chunk.model_dump(), end="\n", flush=True)


if __name__ == "__main__":
    asyncio.run(run_stream_test())

Run the streaming example:

python examples/react_stream/stream_react_agent.py

โšก Parallel Tool Execution

10xScale Agentflow automatically executes multiple tool calls in parallel when an LLM requests multiple tools simultaneously. This dramatically improves performance for I/O-bound operations.

Benefits

  • Faster Response Times: Multiple API calls execute concurrently
  • Better Resource Utilization: Don't wait for one tool to finish before starting the next
  • Seamless Integration: Works automatically with existing code - no changes needed

Example Performance

# LLM requests 3 tools simultaneously:
# - get_weather("NYC")    # Takes 1.0s
# - get_news("tech")      # Takes 1.5s
# - get_stock("AAPL")     # Takes 0.8s

# Sequential execution: 1.0 + 1.5 + 0.8 = 3.3 seconds
# Parallel execution:   max(1.0, 1.5, 0.8) = 1.5 seconds โšก

See the [parallel tool execution documentation](https://10xhub.github.io/10xScale Agentflow/Concept/graph/tools/#parallel-tool-execution) for more details.


๐ŸŽฏ Use Cases & Patterns

10xScale Agentflow includes prebuilt agent patterns for common scenarios:

๐Ÿค– Agent Types

  • React Agent - Reasoning and acting with tool calls
  • RAG Agent - Retrieval-augmented generation
  • Guarded Agent - Input/output validation and safety
  • Plan-Act-Reflect - Multi-step reasoning

๐Ÿ”€ Orchestration Patterns

  • Router Agent - Route queries to specialized agents
  • Swarm - Dynamic multi-agent collaboration
  • SupervisorTeam - Hierarchical agent coordination
  • MapReduce - Parallel processing and aggregation
  • Sequential - Linear workflow chains
  • Branch-Join - Parallel branches with synchronization

๐Ÿ”ฌ Advanced Patterns

  • Deep Research - Multi-level research and synthesis
  • Network - Complex agent networks

See the documentation for complete examples.


๐Ÿ”ง Development

For Library Users

Install 10xScale Agentflow as shown above. The pyproject.toml contains all runtime dependencies.

For Contributors

# Clone the repository
git clone https://github.com/10xhub/10xScale Agentflow.git
cd 10xScale Agentflow

# Create virtual environment
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install dev dependencies
pip install -r requirements-dev.txt
# or
uv pip install -r requirements-dev.txt

# Run tests
make test
# or
pytest -q

# Build docs
make docs-serve  # Serves at http://127.0.0.1:8000

# Run examples
cd examples/react
python react_sync.py

Development Tools

The project uses:

  • pytest for testing (with async support)
  • ruff for linting and formatting
  • mypy for type checking
  • mkdocs with Material theme for documentation
  • coverage for test coverage reports

See pyproject.dev.toml for complete tool configurations.


๐Ÿ—บ๏ธ Roadmap

  • โœ… Core graph engine with nodes and edges
  • โœ… State management and checkpointing
  • โœ… Tool integration (MCP, Composio, LangChain)
  • โœ… Parallel tool execution for improved performance
  • โœ… Streaming and event publishing
  • โœ… Human-in-the-loop support
  • โœ… Prebuilt agent patterns
  • ๐Ÿšง Agent-to-Agent (A2A) communication protocols
  • ๐Ÿšง Remote node execution for distributed processing
  • ๐Ÿšง Enhanced observability and tracing
  • ๐Ÿšง More persistence backends (Redis, DynamoDB)
  • ๐Ÿšง Parallel/branching strategies
  • ๐Ÿšง Visual graph editor

๐Ÿ“„ License

MIT License - see [LICENSE](https://github.com/10xhub/10xScale Agentflow/blob/main/LICENSE) for details.


๐Ÿ”— Links & Resources


๐Ÿ™ Contributing

Contributions are welcome! Please see our [GitHub repository](https://github.com/10xhub/10xScale Agentflow) for:

  • Issue reporting and feature requests
  • Pull request guidelines
  • Development setup instructions
  • Code style and testing requirements

๐Ÿ’ฌ Support


Ready to build intelligent agents? Check out the [documentation](https://10xhub.github.io/10xScale Agentflow/) to get started!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

10xscale_agentflow-0.6.7.tar.gz (322.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

10xscale_agentflow-0.6.7-py3-none-any.whl (370.5 kB view details)

Uploaded Python 3

File details

Details for the file 10xscale_agentflow-0.6.7.tar.gz.

File metadata

  • Download URL: 10xscale_agentflow-0.6.7.tar.gz
  • Upload date:
  • Size: 322.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for 10xscale_agentflow-0.6.7.tar.gz
Algorithm Hash digest
SHA256 60b6f251fa500bf2bd2875719b30eb42d7f3c56e0c0cd025dfe3f303ae9c79f0
MD5 4f780dcd7401ba94f6ccfd871358c90f
BLAKE2b-256 e6bbbd8b2fb51bc9bab5d88b9d58829265251d03d36cf949cf72959041df0ad2

See more details on using hashes here.

File details

Details for the file 10xscale_agentflow-0.6.7-py3-none-any.whl.

File metadata

File hashes

Hashes for 10xscale_agentflow-0.6.7-py3-none-any.whl
Algorithm Hash digest
SHA256 55f5512fb39c6eb30655aa2f5b5cdf0316cf702a43e34ebd54cc71c13ee28532
MD5 3890acf92b88578165adb8be8f16de20
BLAKE2b-256 a99bfa9b1c8de05860a878c69b99b8fbda86a6c996f143ca605f0e73c47946db

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page