Operations Research Agentic Framework - A powerful framework for creating AI agent workflows with official MCP and A2A protocol SDKs
Project description
OR-AF (Operations Research Agentic Framework)
A powerful, production-ready framework for creating AI agent workflows with MCP (Model Context Protocol) server support, graph-based workflow definitions, A2A (Agent-to-Agent) protocol, and a TensorFlow-like API. Designed for operations research and complex multi-agent systems.
v0.4.0: Now uses the official MCP SDK and A2A SDK!
โจ Features
- ๐ Official MCP SDK: Uses mcp[cli] for standard protocol support
- ๐ค Official A2A SDK: Uses a2a-sdk for agent-to-agent communication
- ๐ TensorFlow-like API: Intuitive Sequential, Parallel, and custom workflow definitions
- ๐ฏ Conditional Routing: Dynamic workflow paths based on agent outputs
- ๐ Full Observability: Custom callbacks for monitoring every step
- ๐ Streaming Responses: Real-time streaming of agent responses
- ๐ Comprehensive Logging: Colored console logging and file logging
- โ Type Safety: Built with Pydantic for full type validation
๐ Installation
From Source (Development)
git clone https://github.com/iaakashRoy/or-af.git
cd or-af
pip install -e .
Using pip
pip install or-af
๐ Requirements
- Python 3.10+
- openai >= 1.0.0
- mcp[cli] >= 1.2.0 (Official MCP SDK)
- a2a-sdk[http-server] >= 0.3.0 (Official A2A SDK)
- pydantic >= 2.0.0
- httpx >= 0.23.0
๐ฏ Quick Start
1. Configure Environment
Create a .env file with your Azure OpenAI credentials:
endpoint = "https://your-endpoint.openai.azure.com/"
deployment = "your-deployment-name"
subscription_key = "your-api-key"
api_version = "2024-12-01-preview"
2. Create MCP Server with Tools (Using Official MCP SDK)
from or_af import create_mcp_server
# Create an MCP server (wraps official mcp.server.FastMCP)
server = create_mcp_server(name="math_tools", description="Mathematical tools")
# Add tools using decorator (official MCP SDK pattern)
@server.tool()
def add(x: float, y: float) -> float:
"""Add two numbers"""
return x + y
@server.tool()
def multiply(x: float, y: float) -> float:
"""Multiply two numbers"""
return x * y
# Add resources (official MCP SDK feature)
@server.resource("config://math")
def get_config() -> str:
"""Get math configuration"""
return '{"precision": 2}'
# Add prompts (official MCP SDK feature)
@server.prompt(title="Math Helper")
def math_prompt(expression: str) -> str:
"""Generate a math helper prompt"""
return f"Please help me calculate: {expression}"
3. Create Agent Connected to MCP Server
from or_af import Agent
# Agents access tools through MCP servers
agent = Agent(
name="math_agent",
system_prompt="You are a helpful math assistant.",
mcp_servers=[server], # Connect to MCP server(s)
stream=True,
verbose=True
)
4. Run Tasks
response = agent.run("What is 25 multiplied by 4?")
print(f"Success: {response.success}")
print(f"Response: {response.response}")
print(f"Iterations: {response.iteration_count}")
๐๏ธ Architecture
Key Concepts
- MCP Servers: Host and manage tools (tools can't be added directly to agents)
- Agents: Connect to MCP servers to access tools
- Workflow Graphs: Define agent pipelines with conditional routing
- A2A Protocol: Standard format for inter-agent communication
โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ
โ MCP Server 1โ โ MCP Server 2โ โ MCP Server 3โ
โ (Math) โ โ (Utility) โ โ (Processing)โ
โโโโโโโโฌโโโโโโโ โโโโโโโโฌโโโโโโโ โโโโโโโโฌโโโโโโโ
โ โ โ
โโโโโโโโโโโโโฌโโโโโโโโดโโโโโโโโโโโโฌโโโโโโโโ
โ โ
โโโโโโผโโโโโ โโโโโโผโโโโโ
โ Agent 1 โโโโA2Aโโโบโ Agent 2 โ
โโโโโโฌโโโโโ โโโโโโฌโโโโโ
โ โ
โโโโโโโโโโโฌโโโโโโโโโโ
โ
โโโโโโโโโโผโโโโโโโโโ
โ Workflow Graph โ
โ (Conditional โ
โ Routing) โ
โโโโโโโโโโโโโโโโโโโ
๐ Workflow Graphs (TensorFlow-like API)
Sequential Workflow
from or_af import Sequential
# Linear pipeline - agents execute in order
workflow = Sequential(
agents=[agent1, agent2, agent3],
name="my_pipeline"
)
result = workflow.run("Initial task")
Custom Graph Workflow
from or_af import WorkflowGraph, EdgeCondition
# Create custom workflow with conditional routing
wf = WorkflowGraph(name="research_pipeline")
# Add agents as nodes
researcher = wf.add_node(research_agent, name="researcher", is_entry=True)
reviewer = wf.add_node(review_agent, name="reviewer")
publisher = wf.add_node(publish_agent, name="publisher", is_exit=True)
# Add conditional edges
wf.add_edge(researcher, reviewer, condition=EdgeCondition.ON_SUCCESS)
wf.add_edge(researcher, publisher, condition=EdgeCondition.ON_FAILURE)
wf.add_edge(reviewer, publisher, condition=EdgeCondition.ON_SUCCESS)
# Compile and run
wf.compile()
result = wf.run("Research AI trends")
Parallel Workflow
from or_af import Parallel
# Parallel execution with merge
workflow = Parallel(
agents=[analyst1, analyst2, analyst3],
merge_agent=summarizer,
name="parallel_analysis"
)
result = workflow.run("Analyze data from multiple perspectives")
Using workflow() Helper (Most Flexible)
from or_af import workflow, EdgeCondition
wf = workflow("my_workflow")
# Add nodes
n1 = wf.add_node(agent1, name="step1", is_entry=True)
n2 = wf.add_node(agent2, name="step2")
n3 = wf.add_node(agent3, name="step3", is_exit=True)
# Chain nodes
wf.connect(n1, n2, n3, condition=EdgeCondition.ON_SUCCESS)
# Or create branches
wf.branch(n1, [
(n2, lambda x: x.success), # Go to n2 if success
(n3, lambda x: not x.success) # Go to n3 if failure
])
wf.compile()
result = wf.run("Task")
๐ A2A Protocol (Using Official A2A SDK)
Create A2A-compliant agents that can communicate with agents built using any framework:
from or_af import create_a2a_agent, SimpleA2AExecutor
# Define your agent's logic
async def handle_message(message: str) -> str:
"""Process incoming messages."""
return f"Processed: {message}"
# Create an A2A agent (wraps official a2a-sdk)
agent = create_a2a_agent(
name="My Agent",
description="A helpful agent",
skills=[
{
"id": "process",
"name": "Message Processor",
"description": "Processes incoming messages",
"tags": ["process", "message"]
}
],
input_modes=["text"],
output_modes=["text"]
)
# Set the executor
agent.set_executor(SimpleA2AExecutor(handle_message))
# Run the agent server (FastAPI-based)
# agent.run(port=9999) # Accessible at http://localhost:9999/
# Access agent card for discovery
print(f"Agent: {agent.card.name}")
print(f"Skills: {[s.name for s in agent.card.skills]}")
print(f"Streaming: {agent.card.capabilities.streaming}")
Custom A2A Executor
from or_af import BaseA2AExecutor, new_agent_text_message
class MyExecutor(BaseA2AExecutor):
async def execute(self, context, event_queue):
# Get the user's message
user_message = self.get_user_message(context)
# Process and respond
response = f"You said: {user_message}"
await event_queue.enqueue_event(new_agent_text_message(response))
async def cancel(self, context, event_queue):
raise Exception("Cancel not supported")
agent.set_executor(MyExecutor())
๐จ Custom Callbacks
from or_af import BaseCallback
from or_af.models import ToolCall, ToolResult
class MetricsCallback(BaseCallback):
def __init__(self):
self.tool_calls = 0
def on_tool_call_start(self, tool_call: ToolCall):
print(f"Starting: {tool_call.name}")
def on_tool_call_end(self, tool_result: ToolResult):
self.tool_calls += 1
print(f"Completed in {tool_result.execution_time:.3f}s")
# Use callback
agent = Agent(
system_prompt="...",
mcp_servers=[mcp],
callbacks=[MetricsCallback()]
)
๐ Complete Example
from or_af import (
MCPServer, Agent, WorkflowGraph,
Sequential, EdgeCondition
)
# 1. Create MCP servers with tools
math_mcp = MCPServer(name="math")
@math_mcp.tool()
def calculate(x: float, y: float, op: str) -> float:
"""Perform calculation"""
ops = {"add": x + y, "sub": x - y, "mul": x * y}
return ops.get(op, 0)
format_mcp = MCPServer(name="format")
format_mcp.add_tool("format_result", lambda x: f"Result: {x}")
# 2. Create agents connected to MCP servers
calculator = Agent(
name="calculator",
system_prompt="You are a calculator.",
mcp_servers=[math_mcp]
)
formatter = Agent(
name="formatter",
system_prompt="Format results nicely.",
mcp_servers=[format_mcp]
)
# 3. Create workflow graph
wf = WorkflowGraph(name="calc_pipeline")
calc_node = wf.add_node(calculator, is_entry=True)
format_node = wf.add_node(formatter, is_exit=True)
wf.add_edge(calc_node, format_node, condition=EdgeCondition.ON_SUCCESS)
wf.compile()
# 4. Run workflow
result = wf.run("Calculate 15 * 8 and format the result")
print(result)
๐งช Running Examples
# Simple example
python examples/simple_example.py
# Advanced example with all features
python examples/advanced_example.py
๐ Edge Conditions
| Condition | Description |
|---|---|
EdgeCondition.ALWAYS |
Always traverse this edge |
EdgeCondition.ON_SUCCESS |
Traverse only if previous node succeeded |
EdgeCondition.ON_FAILURE |
Traverse only if previous node failed |
EdgeCondition.ON_CONDITION |
Custom condition function |
๐ง Configuration
# MCP Server Config
mcp = MCPServer(
name="my_server",
host="localhost",
port=8000,
description="My tools server"
)
# Agent Config
agent = Agent(
name="my_agent",
system_prompt="Your prompt",
model_name="gpt-4",
temperature=1.0,
max_iterations=10,
stream=True,
verbose=True,
mcp_servers=[mcp1, mcp2],
callbacks=[MyCallback()]
)
๐ค Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
๐ License
This project is licensed under the MIT License - see the LICENSE file for details.
๐ Support
For issues, questions, or contributions, please visit our GitHub repository.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file or_af-0.4.0.tar.gz.
File metadata
- Download URL: or_af-0.4.0.tar.gz
- Upload date:
- Size: 45.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6e6ede7ae26187794cfd5f12c56d787ce2e237e93c8d71e4705543662f9341e6
|
|
| MD5 |
fd673ba4f734af00025ed4a9716e47fa
|
|
| BLAKE2b-256 |
9fab531bd90ef6ecdb3303805f7ae3a35938e1955ba96c228edd44caf57026d9
|
Provenance
The following attestation bundles were made for or_af-0.4.0.tar.gz:
Publisher:
publish.yml on iaakashRoy/or-af
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
or_af-0.4.0.tar.gz -
Subject digest:
6e6ede7ae26187794cfd5f12c56d787ce2e237e93c8d71e4705543662f9341e6 - Sigstore transparency entry: 884750382
- Sigstore integration time:
-
Permalink:
iaakashRoy/or-af@9896dc95f6c26c48d831ff9011f6d2c2f1664e63 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/iaakashRoy
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@9896dc95f6c26c48d831ff9011f6d2c2f1664e63 -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file or_af-0.4.0-py3-none-any.whl.
File metadata
- Download URL: or_af-0.4.0-py3-none-any.whl
- Upload date:
- Size: 43.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
da7ce8035e49f37a2a78c9c823147febcea5cd677b8c09942055690854be5ebd
|
|
| MD5 |
400ca85709a821cd7d144de146701f6d
|
|
| BLAKE2b-256 |
5dd2f13215e19682c3afc23dd130d20008002f7a0d095ed749cff3249eaf4746
|
Provenance
The following attestation bundles were made for or_af-0.4.0-py3-none-any.whl:
Publisher:
publish.yml on iaakashRoy/or-af
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
or_af-0.4.0-py3-none-any.whl -
Subject digest:
da7ce8035e49f37a2a78c9c823147febcea5cd677b8c09942055690854be5ebd - Sigstore transparency entry: 884750468
- Sigstore integration time:
-
Permalink:
iaakashRoy/or-af@9896dc95f6c26c48d831ff9011f6d2c2f1664e63 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/iaakashRoy
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@9896dc95f6c26c48d831ff9011f6d2c2f1664e63 -
Trigger Event:
workflow_dispatch
-
Statement type: