rao_agent
Base framework for building RAO (Retrieval-Augmented Orchestrator) agents on the PARAG platform.
rao_agent provides the abstract base classes, data models, and shared utilities that every custom RAO agent needs. It is intentionally minimal, containing no business logic, so you pull in only the pieces relevant to your agent.
rao_agent belongs to Progress Agentic RAG (PARAG). In this README, product naming follows Progress/PARAG branding, while code-level Python package and module names (for example, nuclia_arag, nuclia, and nucliadb_*) are preserved as the current technical namespaces.
See the PARAG documentation for product and ecosystem context: https://docs.rag.progress.cloud/
Table of Contents
- Installation
- Core Concepts
- Quick Start
- Implementing the Abstractions
- Agent Registration
- PARAG NUA Primitives
- Testing with Built-in Fixtures
- OpenTelemetry Tracing
- License
Installation
```shell
pip install rao_agent
```
Requires Python 3.10 or later.
Optional: full Progress Agentic RAG integration
The package works standalone, but installing nuclia_arag unlocks centralized agent/driver registration used by the Progress Agentic RAG server:
```shell
pip install rao_agent nuclia_arag
```
When nuclia_arag is not installed, a lightweight fallback implementation is used automatically (with a warning logged at import time).
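This optional-dependency behavior follows the standard try/except import pattern; a sketch (the warning text and the `HAS_ARAG` flag are illustrative, not rao_agent internals):

```python
import logging

# Sketch of the optional-dependency pattern described above: prefer
# nuclia_arag when installed, otherwise fall back and log a warning.
# The warning text is illustrative, not rao_agent's exact message.
try:
    import nuclia_arag  # noqa: F401
    HAS_ARAG = True
except ImportError:
    logging.getLogger("rao_agent").warning(
        "nuclia_arag not installed; using lightweight fallback registration"
    )
    HAS_ARAG = False
```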
Core Concepts
| Class / Module | File | Purpose |
|---|---|---|
| `Agent` | `agent.py` | Abstract base for all agent types. Implement `__call__` to execute your agent logic. |
| `ContextAgent` | `context/agent.py` | Specialised `Agent` that fetches data from an external source, validates the context against the user's question, and chains to fallback/next agents. Implement `_get_question_context`. |
| `Manager` | `manager.py` | Facade over the PARAG NUA client (implemented via existing `nuclia` namespaces). Provides LLM calls (`execute`, `execute_json`, `execute_raw`), reranking (`rerank`), token counting, and REMI evaluation. |
| `QuestionMemory` | `memory.py` | Conversation state bag. Holds the question, accumulated context, steps, answers, and chat history for one interaction turn. Must be subclassed. |
| `Driver` | `driver.py` | Adaptor for an external data source (database, API, …). Implement `init()` to establish a connection from a `DriverConfig`. |
| pub/sub messages | `pubsub.py` | Pydantic wire-protocol models used between the ARAG API server and agent worker processes (`StartInteraction`, `AgentAnswer`, `AgentDone`, …). |
| `AragAnswer` | `interaction.py` | Streaming response envelope sent from an agent to the client (answer text, citations, steps, data visualisations, OAuth requests, feedback requests, …). |
Quick Start
Scaffold a new agent
rao_agent ships with a CLI tool that generates the boilerplate for a new agent:
```shell
rao_generate MyAgent
```
This creates a directory myagent/ with a ready-to-edit pyproject.toml, src/rao_agent_myagent/ package, and a tests/ directory.
Minimal custom agent
```python
from rao_agent.agent import Agent, AgentConfig
from rao_agent.manager import Manager
from rao_agent.memory import QuestionMemory


class GreetingAgentConfig(AgentConfig):
    greeting: str = "Hello"


class GreetingAgent(Agent[GreetingAgentConfig]):
    async def inner_from_config(self, config: GreetingAgentConfig, agent_id=None):
        pass  # no async setup needed

    async def __call__(self, memory: QuestionMemory, manager: Manager):
        answer = f"{self.config.greeting}, you asked: {memory.original_question}"
        await memory.add_answer(answer, module="greeting", agent_path="/greeting")
        await memory.add_final_answer()
        await memory.send_final_answer()
```
Minimal context agent
```python
from rao_agent.context.agent import ContextAgent
from rao_agent.context.config import ContextAgentConfig
from rao_agent.manager import Manager
from rao_agent.memory import Chunk, Context, QuestionMemory


class MyContextAgentConfig(ContextAgentConfig):
    model_config = {"title": "My Data Source"}

    my_param: str = "default"


class MyContextAgent(ContextAgent):
    agent_description = "Fetches data from My Data Source to answer questions."

    async def _get_question_context(
        self,
        memory: QuestionMemory,
        manager: Manager,
        question_uuid: str,
        question: str,
        flow_id: str,
        extra_context=None,
    ):
        # Fetch data from your source and build a Context object
        chunks = [Chunk(chunk_id="c1", text="Relevant data here...")]
        context = Context(
            original_question_uuid=memory.original_question_uuid,
            actual_question_uuid=question_uuid,
            question=question,
            chunks=chunks,
            source="my_data_source",
            agent=self.agent_id,
        )
        missing = await self.save_ctx_and_return_missing(
            memory=memory,
            manager=manager,
            question=question,
            context=context,
            flow_id=flow_id,
        )
        # Return a list of (uuid, question) for any missing information
        return [missing] if missing else []
```
Implementing the Abstractions
Agent
Agent[T_Config] is a generic base class. T_Config must be a subclass of AgentConfig.
```python
class AgentConfig(BaseModel):
    id: Optional[str]           # auto-generated UUID if not set
    title: str                  # display name shown in step traces
    rules: Optional[List[str]]  # optional rules forwarded to the LLM
    max_retries: int            # default 1
    module: Any                 # agent type identifier (set by the framework)
```
Required methods to implement:
| Method | Signature | Description |
|---|---|---|
| `inner_from_config` | `async (config, agent_id) -> None` | Async initialisation (e.g. connect to services). Called by `from_config`. |
| `__call__` | `async (memory, manager) -> None` | Main execution entry point. Write context to memory and call `memory.add_final_answer()` / `memory.send_final_answer()`. |
Utility method:
```python
self.step_title("Fetching data")
# → "My Agent: Fetching data" (uses config title)
```
ContextAgent
ContextAgent extends Agent with context retrieval, validation, and chaining.
Required method to implement:
| Method | Signature | Description |
|---|---|---|
| `_get_question_context` | `async (memory, manager, question_uuid, question, flow_id, extra_context) -> list[tuple[str, str]]` | Fetch data, build a `Context`, call `save_ctx_and_return_missing`, return any missing sub-questions as `(uuid, question)` tuples. |
Optional class attributes:
```python
class MyContextAgent(ContextAgent):
    agent_description = "One-sentence description used by the rephrase LLM."
    exposed_functions: Optional[List[str]] = None  # function names to expose to other agents
```
Configuration fields (ContextAgentConfig):
| Field | Default | Description |
|---|---|---|
| `context_validation_model` | `"chatgpt-azure-4o-mini"` | LLM used to validate context and attempt an answer |
| `rephrase_model` | `"chatgpt-azure-4o-mini"` | LLM used to rephrase questions when chaining agents |
| `context_aware_rephrasing_prompt` | `None` | Custom system prompt for the rephrase step |
| `prune_context` | `True` | Remove non-cited chunks after context validation |
| `fallback` | `None` | `ContextAgentConfig` for a fallback agent |
| `next_agent` | `None` | `ContextAgentConfig` for the next agent in a chain |
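To make the chaining fields concrete, here is a plain-Python sketch of how `fallback` configs nest. Dicts stand in for `ContextAgentConfig` instances so the snippet runs without rao_agent installed; the `fallback_depth` helper and the `web_fallback` module name are ours, for illustration only:

```python
# Dicts stand in for ContextAgentConfig; field names mirror the table above.
primary = {
    "module": "my_agent",
    "prune_context": True,
    # Tried when the primary agent cannot build useful context:
    "fallback": {"module": "web_fallback", "fallback": None},
    # Would run after the primary agent in a chain:
    "next_agent": None,
}


def fallback_depth(cfg):
    """Count how many fallback hops a config defines."""
    depth = 0
    while cfg and cfg.get("fallback"):
        depth += 1
        cfg = cfg["fallback"]
    return depth


print(fallback_depth(primary))  # → 1
```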
QuestionMemory
QuestionMemory holds all state for a single interaction turn. It is abstract — you must subclass it and implement all @abc.abstractmethod methods.
Required abstract methods:
| Method | Return type | Description |
|---|---|---|
| `context_user_info()` | `str` | String describing the current user (for LLM personalisation) |
| `get_session_id()` | `str` | Unique identifier for the conversation session |
| `get_agent_id()` | `str` | Identifier of the active agent |
| `get_workflow_id()` | `str` | Identifier of the active workflow |
| `get_rules()` | `list[Rule \| str]` | Rules to inject into LLM prompts |
| `search_in_questions()` | `KnowledgeboxFindResults` | Semantic search over conversation history |
| `user_info()` | `Dict[str, str]` | Arbitrary user metadata |
| `set_session_source()` | — | Persist a Source for the session |
| `get_session_source()` | `Optional[Source]` | Retrieve a persisted Source |
| `context_history()` | `tuple[str, int]` | Formatted history string + token count |
| `get_chat_history()` | `list[Message]` | Full chat history for the LLM |
| `save_context()` | — | Persist a Context object |
| `save_image_urls()` | — | Persist image URLs |
| `get_agent_contexts()` | `list[Context]` | Retrieve saved contexts for an agent |
| `get_agent_answer_summaries()` | `list[str]` | Retrieve answer summaries for an agent |
| `list_contexts_markdown()` | `list[str]` | Contexts as markdown strings |
| `list_chunks_markdown()` | `list[str]` | Individual chunks as markdown strings |
| `contexts_markdown()` | `str` | Concatenated contexts as markdown |
| `list_contexts_minimal()` | `list[str]` | Minimal (summary-preferred) context list |
| `contexts_minimal()` | `str` | Concatenated minimal contexts |
| `get_prompt_texts()` | `list[str]` | Rendered prompt texts for all contexts |
| `add_generated_text()` | — | Store raw LLM output |
| `add_step()` | — | Record an intermediate reasoning step |
| `set_actual_question()` | — | Update the current working question |
| `add_future_questions()` | — | Store follow-up questions for the user |
| `add_context_questions()` | — | Store sub-questions generated during retrieval |
| `get_questions()` | `list[tuple[str, str]]` | Return context sub-questions (or the original) |
| `add_answer()` | — | Record a candidate answer |
| `add_final_answer()` | — | Finalise the answer |
| `send_final_answer()` | — | Dispatch the final answer to the user |
| `send_oauth()` | — | Initiate an OAuth flow |
| `recv_oauth_callback()` | `Optional[str]` | Receive OAuth credentials |
| `send_feedback()` | `Optional[UserToAgentInteraction]` | Send a feedback/confirmation prompt to the user |
| `save()` | — | Persist memory to durable storage |
A complete mock implementation is available in rao_agent.fixtures.QuestionMemory — useful as a starting point for tests or development.
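For intuition about the state this class carries, here is a toy synchronous stub (not the real abstract base: the real methods are async, far more numerous, and backed by durable storage; all names below beyond the table's method names are ours):

```python
class StubMemory:
    """Toy state bag mirroring a few QuestionMemory methods (sync, simplified)."""

    def __init__(self, question: str, session_id: str):
        self.original_question = question
        self._session_id = session_id
        self._contexts: list = []
        self.final_answer = None

    def get_session_id(self) -> str:
        return self._session_id

    def save_context(self, ctx) -> None:
        # The real method persists a Context object; here we just append.
        self._contexts.append(ctx)

    def get_agent_contexts(self) -> list:
        return list(self._contexts)

    def add_final_answer(self, answer: str) -> None:
        self.final_answer = answer


mem = StubMemory("What is PARAG?", "sess-1")
mem.save_context({"source": "docs", "text": "..."})
mem.add_final_answer("An answer grounded in the saved context.")
```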
Driver
A Driver connects to an external data source. Subclass Driver and implement init():
```python
from typing import Any, Self

from rao_agent.driver import Driver, DriverConfig, EncryptedPayload


class MyDBConfig(EncryptedPayload):
    connection_string: str


class MyDBDriver(Driver):
    name: str = "my_db"
    provider: str = "my_db"
    connection: Any = None  # your actual connection object

    @classmethod
    async def init(cls, driver: DriverConfig) -> Self:
        instance = cls(name=driver.name, provider=driver.provider)
        # connect_to_db is a placeholder for your own connection helper
        instance.connection = await connect_to_db(driver.config.connection_string)
        return instance
```
Register the driver so the Manager can discover it (see Agent Registration).
Agent Registration
The @agent and @driver decorators from rao_agent.configure register your classes with the framework:
```python
from rao_agent.configure import agent, driver


@agent(id="my_agent", config_schema=MyContextAgentConfig)
class MyContextAgent(ContextAgent):
    ...


@driver(id="my_db", config_schema=MyDBConfig)
class MyDBDriver(Driver):
    ...
```
When nuclia_arag is installed, registration is global and used by the PARAG ARAG server to instantiate agents from stored configuration. Without it, the lightweight fallback stores registrations in module-level dicts (AGENTS, DRIVERS).
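The fallback path amounts to a decorator writing into a module-level dict. A minimal sketch of that pattern (the real implementation also records config schemas and performs validation; only the `AGENTS` name and decorator shape come from the text above):

```python
# Minimal sketch of decorator-based registration into a module-level dict.
AGENTS: dict = {}


def agent(id: str, config_schema=None):
    def register(cls):
        # Store the class (and its config schema) under the given id
        AGENTS[id] = (cls, config_schema)
        return cls
    return register


@agent(id="my_agent")
class MyAgent:
    pass


print("my_agent" in AGENTS)  # → True
```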
PARAG NUA Primitives
rao_agent uses PARAG NUA primitives through Manager. These types are part of the execution contract between your agent code and PARAG's generation/evaluation APIs.
| Primitive | Source | Used by |
|---|---|---|
| `ChatModel` | `nuclia.lib.nua_responses` | `Manager.execute_raw()` and low-level generation payloads |
| `UserPrompt` | `nuclia.lib.nua_responses` | `Manager.execute()`, `execute_json()`, `execute_from_context()` |
| `Message` | `nuclia.lib.nua_responses` | Chat history in `Manager.execute()` |
| `Image` | `nuclia.lib.nua_responses` | Vision/context images in execute methods |
| `Tokens` | `nuclia.lib.nua_responses` | `Manager.tokens_predict()` |
| `RerankModel`, `RerankResponse` | `nuclia.lib.nua_responses` | `Manager.rerank()` |
| `RemiRequest`, `RemiResponse` | `nuclia_models.predict.remi` | `Manager.remi()` |
Minimal flow from an agent to NUA:
```python
answer, input_tokens, output_tokens, code = await manager.execute(
    prompt="Answer the question using available context.",
    user_id=memory.get_session_id(),
    model="chatgpt-azure-4o-mini",
    chat_history=await memory.get_chat_history(),
)
```
Compatibility note:
- These primitives come from upstream PARAG runtime libraries, currently published under `nuclia`-prefixed package names (`nuclia`, `nuclia-models`).
- Changes in those libraries can affect payload formats or validation behavior.
- Keep `rao_agent` and the PARAG runtime dependency versions aligned in production deployments.
Testing with Built-in Fixtures
rao_agent.fixtures provides pytest fixtures for testing your agents without a live PARAG account:
```python
# conftest.py
from rao_agent.fixtures import question_memory, manager  # re-export fixtures


# test_my_agent.py
import pytest

from my_agent import MyContextAgent, MyContextAgentConfig


@pytest.mark.asyncio
async def test_my_agent(question_memory):
    question_memory.original_question = "What is Progress Agentic RAG?"
    config = MyContextAgentConfig(
        module="my_agent",
        context_validation_model="chatgpt-azure-4o-mini",
    )
    agent = await MyContextAgent.from_config(config)

    # The manager fixture requires a NUA_KEY env var and a driver config.
    # For unit tests, mock the manager instead:
    from unittest.mock import AsyncMock

    mock_manager = AsyncMock()
    mock_manager.execute_json.return_value = (
        {
            "answer": "Progress Agentic RAG is a retrieval-augmented platform.",
            "useful": "yes",
            "missing_info_query": "",
            "reason": "context matches",
            "citations": [],
        },
        10.0,
        5.0,
    )

    await agent(question_memory, mock_manager)
    assert question_memory.final_answer is not None
```
The manager fixture (parameterised) requires a NUA_KEY environment variable and a driver config tuple:
```python
@pytest.mark.asyncio
@pytest.mark.parametrize("manager", [("my_db", {"connection_string": "..."})], indirect=True)
async def test_with_real_manager(question_memory, manager):
    ...
```
OpenTelemetry Tracing
Wrap any agent method with @trace_agent to get automatic span creation and metrics:
```python
from rao_agent.trace import trace_agent


class MyContextAgent(ContextAgent):
    @trace_agent
    async def _get_question_context(self, ...):
        ...
```
The tracer reads RAO_SERVICE_NAME from the environment (default: "nuclia_arag_server") and integrates with nucliadb_telemetry.
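That lookup is an environment read with a default; a sketch (the `resolve_service_name` helper is ours, for illustration — rao_agent performs this read internally):

```python
import os


def resolve_service_name(env=None) -> str:
    # Read RAO_SERVICE_NAME, falling back to the documented default.
    env = os.environ if env is None else env
    return env.get("RAO_SERVICE_NAME", "nuclia_arag_server")


print(resolve_service_name({}))                                # → nuclia_arag_server
print(resolve_service_name({"RAO_SERVICE_NAME": "my-agent"}))  # → my-agent
```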
License
MIT License. See LICENSE for details.