10xScale Agentflow is a Python framework for building, orchestrating, and managing multi-agent systems. Designed for flexibility and scalability, 10xScale Agentflow enables developers to create intelligent agents that collaborate, communicate, and solve complex tasks together.
Project description
10xScale Agentflow
10xScale Agentflow is a lightweight Python framework for building intelligent agents and orchestrating multi-agent workflows. It's an LLM-agnostic orchestration tool that works with native SDKs from OpenAI, Google Gemini, Anthropic Claude, or any other provider. You choose your LLM library; 10xScale Agentflow provides the workflow orchestration.
โจ Key Features
- โก Agent Class - Build complete agents in 10-30 lines of code (new in v0.5.3!)
- ๐ฏ LLM-Agnostic Orchestration - Works with any LLM provider (OpenAI, Gemini, Claude, native SDKs)
- ๐ค Multi-Agent Workflows - Build complex agent systems with your choice of orchestration patterns
- ๐ Structured Responses - Get
content, optionalthinking, andusagein a standardized format - ๐ Streaming Support - Real-time incremental responses with delta updates
- ๐ง Tool Integration - Native support for function calling, MCP, Composio, and LangChain tools with parallel execution
- ๐ LangGraph-Inspired Engine - Flexible graph orchestration with nodes, conditional edges, and control flow
- ๐พ State Management - Built-in persistence with in-memory and PostgreSQL+Redis checkpointers
- ๐ Human-in-the-Loop - Pause/resume execution for approval workflows and debugging
- ๐ Production-Ready - Event publishing (Console, Redis, Kafka, RabbitMQ), metrics, and observability
- ๐งฉ Dependency Injection - Clean parameter injection for tools and nodes
- ๐ฆ Prebuilt Patterns - React, RAG, Swarm, Router, MapReduce, SupervisorTeam, and more
๐ What Makes Agentflow Unique
Agentflow stands out with powerful features designed for production-grade AI applications:
๐๏ธ Architecture & Scalability
-
๐พ Checkpointer with Caching Design Intelligent state persistence with built-in caching layer to scale efficiently. PostgreSQL + Redis implementation ensures high performance in production environments.
-
๐ง 3-Layer Memory System
- Short-term memory: Current conversation context
- Conversational memory: Session-based chat history
- Long-term memory: Persistent knowledge across sessions
๐ง Advanced Tooling Ecosystem
-
๐ Remote Tool Calls Execute tools remotely using our TypeScript SDK for distributed agent architectures.
-
๐ ๏ธ Comprehensive Tool Integration
- Local tools (Python functions)
- Remote tools (via TypeScript SDK)
- Agent handoff tools (multi-agent collaboration)
- MCP (Model Context Protocol)
- LangChain tools
- Composio tools
๐ฏ Intelligent Context Management
- ๐ Dedicated Context Manager
- Automatically controls context size to prevent token overflow
- Called at iteration end to avoid mid-execution context loss
- Fully extensible with custom implementations
โ๏ธ Dependency Injection & Control
-
๐ First-Class Dependency Injection Powered by InjectQ library for clean, testable, and maintainable code patterns.
-
๐๏ธ Custom ID Generation Control Choose between string, int, or bigint IDs. Smaller IDs save significant space in databases and indexes compared to standard 128-bit UUIDs.
๐ Observability & Events
- ๐ก Internal Event Publishing
Emit execution events to any publisher:
- Kafka
- RabbitMQ
- Redis Pub/Sub
- OpenTelemetry (planned)
- Custom publishers
๐ Advanced Execution Features
-
โฐ Background Task Manager Built-in manager for running tasks asynchronously:
- Prefetching data
- Memory persistence
- Cleanup operations
- Custom background jobs
-
๐ฆ Human-in-the-Loop with Interrupts Pause execution at any point for human approval, then seamlessly resume with full state preservation.
-
๐งญ Flexible Agent Navigation
- Condition-based routing between agents
- Command-based jumps to specific agents
- Agent handoff tools for smooth transitions
๐ก๏ธ Security & Validation
- ๐ฃ Comprehensive Callback System
Hook into various execution stages for:
- Logging and monitoring
- Custom behavior injection
- Prompt injection attack prevention
- Input/output validation
๐ฆ Ready-to-Use Components
- ๐ค Prebuilt Agent Patterns
Production-ready implementations:
- React agents
- RAG (Retrieval-Augmented Generation)
- Swarm architectures
- Router agents
- MapReduce patterns
- Supervisor teams
๐ Developer Experience
- ๐ Pydantic-First Design
All core classes (State, Message, ToolCalls) are Pydantic models:
- Automatic JSON serialization
- Type safety
- Easy debugging and logging
- Seamless database storage
Installation
Basic installation with uv (recommended):
uv pip install 10xscale-agentflow
Or with pip:
pip install 10xscale-agentflow
Optional Dependencies:
10xScale Agentflow supports optional dependencies for specific functionality:
# PostgreSQL + Redis checkpointing
pip install 10xscale-agentflow[pg_checkpoint]
# MCP (Model Context Protocol) support
pip install 10xscale-agentflow[mcp]
# Google GenAI adapter (google-genai SDK)
pip install 10xscale-agentflow[google-genai]
# Composio tools (adapter)
pip install 10xscale-agentflow[composio]
# LangChain tools (registry-based adapter)
pip install 10xscale-agentflow[langchain]
# Individual publishers
pip install 10xscale-agentflow[redis] # Redis publisher
pip install 10xscale-agentflow[kafka] # Kafka publisher
pip install 10xscale-agentflow[rabbitmq] # RabbitMQ publisher
# Multiple extras
pip install 10xscale-agentflow[pg_checkpoint,mcp,google-genai,composio,langchain]
Environment Setup
Set your LLM provider API key:
export OPENAI_API_KEY=sk-... # for OpenAI models
# or
export GEMINI_API_KEY=... # for Google Gemini
# or
export ANTHROPIC_API_KEY=... # for Anthropic Claude
If you have a .env file, it will be auto-loaded (via python-dotenv).
๐ฏ Two Ways to Build Agents
10xScale Agentflow offers two approachesโchoose based on your needs:
| Approach | Best For | Lines of Code |
|---|---|---|
| Agent Class โญ | Most use cases, rapid development | 10-30 lines |
| Custom Functions | Complex custom logic, custom SDK integrations | 50-150 lines |
Recommendation: Start with the Agent class. It handles 90% of use cases with minimal code.
๐ก Simple Example with Agent Class
Here's a complete tool-calling agent in under 30 lines:
from agentflow.graph import Agent, StateGraph, ToolNode
from agentflow.state import AgentState, Message
from agentflow.utils.constants import END
# 1. Define your tool
def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"The weather in {location} is sunny, 72ยฐF"
# 2. Build the graph with Agent class
graph = StateGraph()
graph.add_node("MAIN", Agent(
model="gemini/gemini-2.5-flash",
system_prompt=[{"role": "system", "content": "You are a helpful assistant."}],
tool_node_name="TOOL"
))
graph.add_node("TOOL", ToolNode([get_weather]))
# 3. Define routing
def route(state: AgentState) -> str:
if state.context and state.context[-1].tools_calls:
return "TOOL"
return END
graph.add_conditional_edges("MAIN", route, {"TOOL": "TOOL", END: END})
graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")
# 4. Run it!
app = graph.compile()
result = app.invoke({
"messages": [Message.text_message("What's the weather in NYC?")]
}, config={"thread_id": "1"})
for msg in result["messages"]:
print(f"{msg.role}: {msg.content}")
That's it! The Agent class handles message conversion, LLM calls, and tool integration automatically.
๐ง Advanced: Custom Functions Approach
For maximum control, use custom functions instead of the Agent class:
from dotenv import load_dotenv
from openai import AsyncOpenAI
from agentflow.checkpointer import InMemoryCheckpointer
from agentflow.graph import StateGraph, ToolNode
from agentflow.state.agent_state import AgentState
from agentflow.utils import Message
from agentflow.utils.constants import END
from agentflow.utils.converter import convert_messages
load_dotenv()
client = AsyncOpenAI()
# Define a tool with dependency injection
def get_weather(
location: str,
tool_call_id: str | None = None,
state: AgentState | None = None,
) -> Message:
"""Get the current weather for a specific location."""
res = f"The weather in {location} is sunny"
return Message.tool_message(
content=res,
tool_call_id=tool_call_id,
)
# Create tool node
tool_node = ToolNode([get_weather])
# Define main agent node (manual message handling)
async def main_agent(state: AgentState):
prompts = "You are a helpful assistant. Use tools when needed."
messages = convert_messages(
system_prompts=[{"role": "system", "content": prompts}],
state=state,
)
# Check if we need tools
if (
state.context
and len(state.context) > 0
and state.context[-1].role == "tool"
):
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
)
else:
tools = await tool_node.all_tools()
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
tools=tools,
)
return response
# Define routing logic
def should_use_tools(state: AgentState) -> str:
"""Determine if we should use tools or end."""
if not state.context or len(state.context) == 0:
return "TOOL"
last_message = state.context[-1]
if (
hasattr(last_message, "tools_calls")
and last_message.tools_calls
and len(last_message.tools_calls) > 0
):
return "TOOL"
return END
# Build the graph
graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_node("TOOL", tool_node)
graph.add_conditional_edges(
"MAIN",
should_use_tools,
{"TOOL": "TOOL", END: END},
)
graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")
# Compile and run
app = graph.compile(checkpointer=InMemoryCheckpointer())
inp = {"messages": [Message.from_text("What's the weather in New York?")]}
config = {"thread_id": "12345", "recursion_limit": 10}
res = app.invoke(inp, config=config)
for msg in res["messages"]:
print(msg)
How to run the example locally
- Install dependencies (recommended in a virtualenv):
pip install -r requirements.txt
# or if you use uv
uv pip install -r requirements.txt
- Set your LLM provider API key (for example OpenAI):
export OPENAI_API_KEY="sk-..."
# or create a .env with the key and the script will load it automatically
- Run the example script:
python examples/react/react_weather_agent.py
Notes:
- The example uses the OpenAI async client. Set
OPENAI_API_KEYand choose a model available in your account. InMemoryCheckpointeris for demo/testing only. Replace with a persistent checkpointer for production.
Example: MCP Integration
10xScale Agentflow supports integration with Model Context Protocol (MCP) servers, allowing you to connect external tools and services. The example in examples/react-mcp/ demonstrates how to integrate MCP tools with your agent.
First, create an MCP server (see examples/react-mcp/server.py):
from fastmcp import FastMCP
mcp = FastMCP("My MCP Server")
@mcp.tool(
description="Get the weather for a specific location",
)
def get_weather(location: str) -> dict:
return {
"location": location,
"temperature": "22ยฐC",
"description": "Sunny",
}
if __name__ == "__main__":
mcp.run(transport="streamable-http")
Then, integrate MCP tools into your agent (from examples/react-mcp/react-mcp.py):
from typing import Any
from dotenv import load_dotenv
from fastmcp import Client
from openai import AsyncOpenAI
from agentflow.checkpointer import InMemoryCheckpointer
from agentflow.graph import StateGraph, ToolNode
from agentflow.state.agent_state import AgentState
from agentflow.utils import Message
from agentflow.utils.constants import END
from agentflow.utils.converter import convert_messages
load_dotenv()
client = AsyncOpenAI()
checkpointer = InMemoryCheckpointer()
config = {
"mcpServers": {
"weather": {
"url": "http://127.0.0.1:8000/mcp",
"transport": "streamable-http",
},
}
}
client_http = Client(config)
# Initialize ToolNode with MCP client
tool_node = ToolNode(functions=[], client=client_http)
async def main_agent(state: AgentState):
prompts = "You are a helpful assistant."
messages = convert_messages(
system_prompts=[{"role": "system", "content": prompts}],
state=state,
)
# Get all available tools (including MCP tools)
tools = await tool_node.all_tools()
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
tools=tools,
)
return response
def should_use_tools(state: AgentState) -> str:
"""Determine if we should use tools or end the conversation."""
if not state.context or len(state.context) == 0:
return "TOOL"
last_message = state.context[-1]
if (
hasattr(last_message, "tools_calls")
and last_message.tools_calls
and len(last_message.tools_calls) > 0
):
return "TOOL"
if last_message.role == "tool" and last_message.tool_call_id is not None:
return END
return END
graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_node("TOOL", tool_node)
graph.add_conditional_edges(
"MAIN",
should_use_tools,
{"TOOL": "TOOL", END: END},
)
graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")
app = graph.compile(checkpointer=checkpointer)
# Run the agent
inp = {"messages": [Message.from_text("Please call the get_weather function for New York City")]}
config = {"thread_id": "12345", "recursion_limit": 10}
res = app.invoke(inp, config=config)
for i in res["messages"]:
print(i)
How to run the MCP example:
- Install MCP dependencies:
pip install 10xscale-agentflow[mcp]
# or
uv pip install 10xscale-agentflow[mcp]
- Start the MCP server in one terminal:
cd examples/react-mcp
python server.py
- Run the MCP-integrated agent in another terminal:
python examples/react-mcp/react-mcp.py
Example: Streaming Agent
10xScale Agentflow supports streaming responses for real-time interaction. The example in examples/react_stream/stream_react_agent.py demonstrates different streaming modes and configurations.
import asyncio
import logging
from dotenv import load_dotenv
from openai import AsyncOpenAI
from agentflow.checkpointer import InMemoryCheckpointer
from agentflow.graph import StateGraph, ToolNode
from agentflow.state.agent_state import AgentState
from agentflow.utils import Message, ResponseGranularity
from agentflow.utils.constants import END
from agentflow.utils.converter import convert_messages
load_dotenv()
client = AsyncOpenAI()
checkpointer = InMemoryCheckpointer()
def get_weather(
location: str,
tool_call_id: str,
state: AgentState,
) -> Message:
"""Get weather with injectable parameters."""
res = f"The weather in {location} is sunny."
return Message.tool_message(
content=res,
tool_call_id=tool_call_id,
)
tool_node = ToolNode([get_weather])
async def main_agent(state: AgentState, config: dict):
prompts = "You are a helpful assistant. Answer conversationally. Use tools when needed."
messages = convert_messages(
system_prompts=[{"role": "system", "content": prompts}],
state=state,
)
is_stream = config.get("is_stream", False)
if (
state.context
and len(state.context) > 0
and state.context[-1].role == "tool"
):
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=is_stream,
)
else:
tools = await tool_node.all_tools()
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
tools=tools,
stream=is_stream,
)
return response
def should_use_tools(state: AgentState) -> str:
if not state.context or len(state.context) == 0:
return "TOOL"
last_message = state.context[-1]
if (
hasattr(last_message, "tools_calls")
and last_message.tools_calls
and len(last_message.tools_calls) > 0
):
return "TOOL"
if last_message.role == "tool" and last_message.tool_call_id is not None:
return END
return END
graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_node("TOOL", tool_node)
graph.add_conditional_edges(
"MAIN",
should_use_tools,
{"TOOL": "TOOL", END: END},
)
graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")
app = graph.compile(checkpointer=checkpointer)
async def run_stream_test():
inp = {"messages": [Message.from_text("Call get_weather for Tokyo, then reply.")]}
config = {"thread_id": "stream-1", "recursion_limit": 10}
logging.info("--- streaming start ---")
stream_gen = app.astream(
inp,
config=config,
response_granularity=ResponseGranularity.LOW,
)
async for chunk in stream_gen:
print(chunk.model_dump(), end="\n", flush=True)
if __name__ == "__main__":
asyncio.run(run_stream_test())
Run the streaming example:
python examples/react_stream/stream_react_agent.py
โก Parallel Tool Execution
10xScale Agentflow automatically executes multiple tool calls in parallel when an LLM requests multiple tools simultaneously. This dramatically improves performance for I/O-bound operations.
Benefits
- Faster Response Times: Multiple API calls execute concurrently
- Better Resource Utilization: Don't wait for one tool to finish before starting the next
- Seamless Integration: Works automatically with existing code - no changes needed
Example Performance
# LLM requests 3 tools simultaneously:
# - get_weather("NYC") # Takes 1.0s
# - get_news("tech") # Takes 1.5s
# - get_stock("AAPL") # Takes 0.8s
# Sequential execution: 1.0 + 1.5 + 0.8 = 3.3 seconds
# Parallel execution: max(1.0, 1.5, 0.8) = 1.5 seconds โก
See the [parallel tool execution documentation](https://10xhub.github.io/10xScale Agentflow/Concept/graph/tools/#parallel-tool-execution) for more details.
๐ฏ Use Cases & Patterns
10xScale Agentflow includes prebuilt agent patterns for common scenarios:
๐ค Agent Types
- React Agent - Reasoning and acting with tool calls
- RAG Agent - Retrieval-augmented generation
- Guarded Agent - Input/output validation and safety
- Plan-Act-Reflect - Multi-step reasoning
๐ Orchestration Patterns
- Router Agent - Route queries to specialized agents
- Swarm - Dynamic multi-agent collaboration
- SupervisorTeam - Hierarchical agent coordination
- MapReduce - Parallel processing and aggregation
- Sequential - Linear workflow chains
- Branch-Join - Parallel branches with synchronization
๐ฌ Advanced Patterns
- Deep Research - Multi-level research and synthesis
- Network - Complex agent networks
See the documentation for complete examples.
๐ง Development
For Library Users
Install 10xScale Agentflow as shown above. The pyproject.toml contains all runtime dependencies.
For Contributors
# Clone the repository
git clone https://github.com/10xhub/10xScale Agentflow.git
cd 10xScale Agentflow
# Create virtual environment
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install dev dependencies
pip install -r requirements-dev.txt
# or
uv pip install -r requirements-dev.txt
# Run tests
make test
# or
pytest -q
# Build docs
make docs-serve # Serves at http://127.0.0.1:8000
# Run examples
cd examples/react
python react_sync.py
Development Tools
The project uses:
- pytest for testing (with async support)
- ruff for linting and formatting
- mypy for type checking
- mkdocs with Material theme for documentation
- coverage for test coverage reports
See pyproject.dev.toml for complete tool configurations.
๐บ๏ธ Roadmap
- โ Core graph engine with nodes and edges
- โ State management and checkpointing
- โ Tool integration (MCP, Composio, LangChain)
- โ Parallel tool execution for improved performance
- โ Streaming and event publishing
- โ Human-in-the-loop support
- โ Prebuilt agent patterns
- ๐ง Agent-to-Agent (A2A) communication protocols
- ๐ง Remote node execution for distributed processing
- ๐ง Enhanced observability and tracing
- ๐ง More persistence backends (Redis, DynamoDB)
- ๐ง Parallel/branching strategies
- ๐ง Visual graph editor
๐ License
MIT License - see [LICENSE](https://github.com/10xhub/10xScale Agentflow/blob/main/LICENSE) for details.
๐ Links & Resources
- [Documentation](https://10xhub.github.io/10xScale Agentflow/) - Full documentation with tutorials and API reference
- [GitHub Repository](https://github.com/10xhub/10xScale Agentflow) - Source code and issues
- PyPI Project - Package releases
- [Examples Directory](https://github.com/10xhub/10xScale Agentflow/tree/main/examples) - Runnable code samples
๐ Contributing
Contributions are welcome! Please see our [GitHub repository](https://github.com/10xhub/10xScale Agentflow) for:
- Issue reporting and feature requests
- Pull request guidelines
- Development setup instructions
- Code style and testing requirements
๐ฌ Support
- Documentation: [https://10xhub.github.io/10xScale Agentflow/](https://10xhub.github.io/10xScale Agentflow/)
- Examples: Check the [examples directory](https://github.com/10xhub/10xScale Agentflow/tree/main/examples)
- Issues: Report bugs on [GitHub Issues](https://github.com/10xhub/10xScale Agentflow/issues)
- Discussions: Ask questions in [GitHub Discussions](https://github.com/10xhub/10xScale Agentflow/discussions)
Ready to build intelligent agents? Check out the [documentation](https://10xhub.github.io/10xScale Agentflow/) to get started!
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file 10xscale_agentflow-0.6.7.tar.gz.
File metadata
- Download URL: 10xscale_agentflow-0.6.7.tar.gz
- Upload date:
- Size: 322.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
60b6f251fa500bf2bd2875719b30eb42d7f3c56e0c0cd025dfe3f303ae9c79f0
|
|
| MD5 |
4f780dcd7401ba94f6ccfd871358c90f
|
|
| BLAKE2b-256 |
e6bbbd8b2fb51bc9bab5d88b9d58829265251d03d36cf949cf72959041df0ad2
|
File details
Details for the file 10xscale_agentflow-0.6.7-py3-none-any.whl.
File metadata
- Download URL: 10xscale_agentflow-0.6.7-py3-none-any.whl
- Upload date:
- Size: 370.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
55f5512fb39c6eb30655aa2f5b5cdf0316cf702a43e34ebd54cc71c13ee28532
|
|
| MD5 |
3890acf92b88578165adb8be8f16de20
|
|
| BLAKE2b-256 |
a99bfa9b1c8de05860a878c69b99b8fbda86a6c996f143ca605f0e73c47946db
|