
rao_agent


Base framework for building RAO (Retrieval-Augmented Orchestrator) agents on the PARAG platform.

rao_agent provides the abstract base classes, data models, and shared utilities that every custom RAO agent needs. It is intentionally minimal, containing no business logic, so you only pull in the pieces relevant to your agent.

rao_agent belongs to Progress Agentic RAG (PARAG). In this README, product naming follows Progress/PARAG branding, while code-level Python package and module names (for example, nuclia_arag, nuclia, and nucliadb_*) are preserved as the current technical namespaces.

See the PARAG documentation for product and ecosystem context: https://docs.rag.progress.cloud/

Installation

pip install rao_agent

Requires Python 3.10 or later.

Optional: full Progress Agentic RAG integration

The package works standalone, but installing nuclia_arag unlocks centralized agent/driver registration used by the Progress Agentic RAG server:

pip install rao_agent nuclia_arag

When nuclia_arag is not installed, a lightweight fallback implementation is used automatically (with a warning logged at import time).
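This optional-dependency behaviour can be sketched with a standard import guard. The snippet below is a stdlib-only illustration of the pattern, not the actual rao_agent implementation; the names HAVE_NUCLIA_ARAG, AGENTS, and DRIVERS are illustrative here:

```python
import importlib.util
import logging

logger = logging.getLogger("rao_agent")

# Illustration of the optional-dependency pattern described above:
# detect nuclia_arag and fall back to module-level registries when it
# is absent, logging a warning at import time.
HAVE_NUCLIA_ARAG = importlib.util.find_spec("nuclia_arag") is not None

if not HAVE_NUCLIA_ARAG:
    AGENTS: dict = {}   # lightweight fallback registries
    DRIVERS: dict = {}
    logger.warning("nuclia_arag not installed; using fallback registries")
```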


Core Concepts

| Class / Module | File | Purpose |
|---|---|---|
| Agent | agent.py | Abstract base for all agent types. Implement __call__ to execute your agent logic. |
| ContextAgent | context/agent.py | Specialised Agent that fetches data from an external source, validates the context against the user's question, and chains to fallback/next agents. Implement _get_question_context. |
| Manager | manager.py | Facade over the PARAG NUA client (implemented via existing nuclia namespaces). Provides LLM calls (execute, execute_json, execute_raw), reranking (rerank), token counting, and REMI evaluation. |
| QuestionMemory | memory.py | Conversation state bag. Holds the question, accumulated context, steps, answers, and chat history for one interaction turn. Must be subclassed. |
| Driver | driver.py | Adaptor for an external data source (database, API, …). Implement init() to establish a connection from a DriverConfig. |
| pub/sub messages | pubsub.py | Pydantic wire-protocol models used between the ARAG API server and agent worker processes (StartInteraction, AgentAnswer, AgentDone, …). |
| AragAnswer | interaction.py | Streaming response envelope sent from an agent to the client (answer text, citations, steps, data visualisations, OAuth requests, feedback requests, …). |

Quick Start

Scaffold a new agent

rao_agent ships with a CLI tool that generates the boilerplate for a new agent:

rao_generate MyAgent

This creates a directory myagent/ with a ready-to-edit pyproject.toml, src/rao_agent_myagent/ package, and a tests/ directory.
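The generated layout looks roughly like this (the exact files produced may vary by version):

```
myagent/
├── pyproject.toml
├── src/
│   └── rao_agent_myagent/
│       └── __init__.py
└── tests/
```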

Minimal custom agent

from rao_agent.agent import Agent, AgentConfig
from rao_agent.manager import Manager
from rao_agent.memory import QuestionMemory


class GreetingAgentConfig(AgentConfig):
    greeting: str = "Hello"


class GreetingAgent(Agent[GreetingAgentConfig]):
    async def inner_from_config(self, config: GreetingAgentConfig, agent_id=None):
        pass  # no async setup needed

    async def __call__(self, memory: QuestionMemory, manager: Manager):
        answer = f"{self.config.greeting}, you asked: {memory.original_question}"
        await memory.add_answer(answer, module="greeting", agent_path="/greeting")
        await memory.add_final_answer()
        await memory.send_final_answer()

Minimal context agent

from typing import Optional
from rao_agent.context.agent import ContextAgent
from rao_agent.context.config import ContextAgentConfig
from rao_agent.manager import Manager
from rao_agent.memory import Chunk, Context, QuestionMemory


class MyContextAgentConfig(ContextAgentConfig):
    model_config = {"title": "My Data Source"}
    my_param: str = "default"


class MyContextAgent(ContextAgent):
    agent_description = "Fetches data from My Data Source to answer questions."

    async def _get_question_context(
        self,
        memory: QuestionMemory,
        manager: Manager,
        question_uuid: str,
        question: str,
        flow_id: str,
        extra_context=None,
    ):
        # Fetch data from your source and build a Context object
        chunks = [Chunk(chunk_id="c1", text="Relevant data here...")]
        context = Context(
            original_question_uuid=memory.original_question_uuid,
            actual_question_uuid=question_uuid,
            question=question,
            chunks=chunks,
            source="my_data_source",
            agent=self.agent_id,
        )
        missing = await self.save_ctx_and_return_missing(
            memory=memory,
            manager=manager,
            question=question,
            context=context,
            flow_id=flow_id,
        )
        # Return a list of (uuid, question) for any missing information
        return [missing] if missing else []

Implementing the Abstractions

Agent

Agent[T_Config] is a generic base class. T_Config must be a subclass of AgentConfig.

class AgentConfig(BaseModel):
    id: Optional[str]          # auto-generated UUID if not set
    title: str                 # display name shown in step traces
    rules: Optional[List[str]] # optional rules forwarded to the LLM
    max_retries: int           # default 1
    module: Any                # agent type identifier (set by the framework)

Required methods to implement:

| Method | Signature | Description |
|---|---|---|
| inner_from_config | async (config, agent_id) -> None | Async initialisation (e.g. connect to services). Called by from_config. |
| __call__ | async (memory, manager) -> None | Main execution entry point. Write context to memory and call memory.add_final_answer() / memory.send_final_answer(). |

Utility method:

self.step_title("Fetching data")
# → "My Agent: Fetching data"  (uses config title)

ContextAgent

ContextAgent extends Agent with context retrieval, validation, and chaining.

Required method to implement:

| Method | Signature | Description |
|---|---|---|
| _get_question_context | async (memory, manager, question_uuid, question, flow_id, extra_context) -> list[tuple[str, str]] | Fetch data, build a Context, call save_ctx_and_return_missing, and return any missing sub-questions as (uuid, question) tuples. |

Optional class attributes:

class MyContextAgent(ContextAgent):
    agent_description = "One-sentence description used by the rephrase LLM."
    exposed_functions: Optional[List[str]] = None  # function names to expose to other agents

Configuration fields (ContextAgentConfig):

| Field | Default | Description |
|---|---|---|
| context_validation_model | "chatgpt-azure-4o-mini" | LLM used to validate context and attempt an answer |
| rephrase_model | "chatgpt-azure-4o-mini" | LLM used to rephrase questions when chaining agents |
| context_aware_rephrasing_prompt | None | Custom system prompt for the rephrase step |
| prune_context | True | Remove non-cited chunks after context validation |
| fallback | None | ContextAgentConfig for a fallback agent |
| next_agent | None | ContextAgentConfig for the next agent in a chain |
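Since fallback and next_agent are themselves ContextAgentConfig instances, chains are expressed by nesting configs. A sketch, assuming the MyContextAgentConfig class from the Quick Start above and that these fields compose as ordinary Pydantic model fields (field names come from the table; the titles and values are illustrative):

```python
# Hedged sketch: a primary agent that falls back to a second data source
# when context validation fails. Not a verified end-to-end configuration.
config = MyContextAgentConfig(
    module="my_agent",
    title="Primary source",
    prune_context=True,
    fallback=MyContextAgentConfig(
        module="my_agent",
        title="Fallback source",
        my_param="secondary",
    ),
)
```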

QuestionMemory

QuestionMemory holds all state for a single interaction turn. It is abstract: you must subclass it and implement all @abc.abstractmethod methods.

Required abstract methods:

| Method | Return type | Description |
|---|---|---|
| context_user_info() | str | String describing the current user (for LLM personalisation) |
| get_session_id() | str | Unique identifier for the conversation session |
| get_agent_id() | str | Identifier of the active agent |
| get_workflow_id() | str | Identifier of the active workflow |
| get_rules() | list[Rule \| str] | Rules to inject into LLM prompts |
| search_in_questions() | KnowledgeboxFindResults | Semantic search over conversation history |
| user_info() | Dict[str, str] | Arbitrary user metadata |
| set_session_source() | | Persist a Source for the session |
| get_session_source() | Optional[Source] | Retrieve a persisted Source |
| context_history() | tuple[str, int] | Formatted history string + token count |
| get_chat_history() | list[Message] | Full chat history for the LLM |
| save_context() | | Persist a Context object |
| save_image_urls() | | Persist image URLs |
| get_agent_contexts() | list[Context] | Retrieve saved contexts for an agent |
| get_agent_answer_summaries() | list[str] | Retrieve answer summaries for an agent |
| list_contexts_markdown() | list[str] | Contexts as markdown strings |
| list_chunks_markdown() | list[str] | Individual chunks as markdown strings |
| contexts_markdown() | str | Concatenated contexts as markdown |
| list_contexts_minimal() | list[str] | Minimal (summary-preferred) context list |
| contexts_minimal() | str | Concatenated minimal contexts |
| get_prompt_texts() | list[str] | Rendered prompt texts for all contexts |
| add_generated_text() | | Store raw LLM output |
| add_step() | | Record an intermediate reasoning step |
| set_actual_question() | | Update the current working question |
| add_future_questions() | | Store follow-up questions for the user |
| add_context_questions() | | Store sub-questions generated during retrieval |
| get_questions() | list[tuple[str, str]] | Return context sub-questions (or the original) |
| add_answer() | | Record a candidate answer |
| add_final_answer() | | Finalise the answer |
| send_final_answer() | | Dispatch the final answer to the user |
| send_oauth() | | Initiate an OAuth flow |
| recv_oauth_callback() | Optional[str] | Receive OAuth credentials |
| send_feedback() | Optional[UserToAgentInteraction] | Send a feedback/confirmation prompt to the user |
| save() | | Persist memory to durable storage |

A complete mock implementation is available in rao_agent.fixtures.QuestionMemory — useful as a starting point for tests or development.

Driver

A Driver connects to an external data source. Subclass Driver and implement init():

from typing import Any, Self
from rao_agent.driver import Driver, DriverConfig, EncryptedPayload
from pydantic import BaseModel


class MyDBConfig(EncryptedPayload):
    connection_string: str


class MyDBDriver(Driver):
    name: str = "my_db"
    provider: str = "my_db"
    connection: Any = None  # your actual connection object

    @classmethod
    async def init(cls, driver: DriverConfig) -> Self:
        instance = cls(name=driver.name, provider=driver.provider)
        instance.connection = await connect_to_db(driver.config.connection_string)
        return instance

Register the driver so the Manager can discover it (see Agent Registration).


Agent Registration

The @agent and @driver decorators from rao_agent.configure register your classes with the framework:

from rao_agent.configure import agent, driver

@agent(id="my_agent", config_schema=MyContextAgentConfig)
class MyContextAgent(ContextAgent):
    ...

@driver(id="my_db", config_schema=MyDBConfig)
class MyDBDriver(Driver):
    ...

When nuclia_arag is installed, registration is global and used by the PARAG ARAG server to instantiate agents from stored configuration. Without it, the lightweight fallback stores registrations in module-level dicts (AGENTS, DRIVERS).
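The fallback registration path can be sketched in a few lines of plain Python. This illustrates the behaviour described above (decorators mapping an id to a class and its config schema in module-level dicts); it is not the actual rao_agent.configure implementation:

```python
# Stdlib-only sketch of the fallback registries: each decorator records
# (class, config_schema) under the given id in a module-level dict.
AGENTS: dict = {}
DRIVERS: dict = {}

def agent(id: str, config_schema: type):
    def wrap(cls):
        AGENTS[id] = (cls, config_schema)
        return cls
    return wrap

def driver(id: str, config_schema: type):
    def wrap(cls):
        DRIVERS[id] = (cls, config_schema)
        return cls
    return wrap

@agent(id="my_agent", config_schema=dict)  # dict stands in for a real schema
class MyContextAgent:
    pass

print("my_agent" in AGENTS)  # → True
```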


PARAG NUA Primitives

rao_agent uses PARAG NUA primitives through Manager. These types are part of the execution contract between your agent code and PARAG's generation/evaluation APIs.

| Primitive | Source | Used by |
|---|---|---|
| ChatModel | nuclia.lib.nua_responses | Manager.execute_raw() and low-level generation payloads |
| UserPrompt | nuclia.lib.nua_responses | Manager.execute(), execute_json(), execute_from_context() |
| Message | nuclia.lib.nua_responses | Chat history in Manager.execute() |
| Image | nuclia.lib.nua_responses | Vision/context images in execute methods |
| Tokens | nuclia.lib.nua_responses | Manager.tokens_predict() |
| RerankModel, RerankResponse | nuclia.lib.nua_responses | Manager.rerank() |
| RemiRequest, RemiResponse | nuclia_models.predict.remi | Manager.remi() |

Minimal flow from an agent to NUA:

answer, input_tokens, output_tokens, code = await manager.execute(
    prompt="Answer the question using available context.",
    user_id=memory.get_session_id(),
    model="chatgpt-azure-4o-mini",
    chat_history=await memory.get_chat_history(),
)

Compatibility note:

- These primitives come from upstream PARAG runtime libraries, currently published under nuclia-prefixed package names (nuclia, nuclia-models).
- Changes in those libraries can affect payload formats or validation behavior.
- Keep rao_agent and PARAG runtime dependency versions aligned in production deployments.

Testing with Built-in Fixtures

rao_agent.fixtures provides pytest fixtures for testing your agents without a live PARAG account:

# conftest.py
from rao_agent.fixtures import question_memory, manager  # re-export fixtures

# test_my_agent.py
import pytest
from my_agent import MyContextAgent, MyContextAgentConfig


@pytest.mark.asyncio
async def test_my_agent(question_memory):
    question_memory.original_question = "What is Progress Agentic RAG?"

    config = MyContextAgentConfig(
        module="my_agent",
        context_validation_model="chatgpt-azure-4o-mini",
    )
    agent = await MyContextAgent.from_config(config)

    # manager fixture requires NUA_KEY env var and a driver config
    # For unit tests, mock the manager instead:
    from unittest.mock import AsyncMock
    mock_manager = AsyncMock()
    mock_manager.execute_json.return_value = (
        {"answer": "Progress Agentic RAG is a retrieval-augmented platform.", "useful": "yes",
         "missing_info_query": "", "reason": "context matches", "citations": []},
        10.0, 5.0,
    )

    await agent(question_memory, mock_manager)
    assert question_memory.final_answer is not None

The manager fixture (parameterised) requires a NUA_KEY environment variable and a driver config tuple:

@pytest.mark.asyncio
@pytest.mark.parametrize("manager", [("my_db", {"connection_string": "..."})], indirect=True)
async def test_with_real_manager(question_memory, manager):
    ...

OpenTelemetry Tracing

Wrap any agent method with @trace_agent to get automatic span creation and metrics:

from rao_agent.trace import trace_agent

class MyContextAgent(ContextAgent):
    @trace_agent
    async def _get_question_context(self, ...):
        ...

The tracer reads RAO_SERVICE_NAME from the environment (default: "nuclia_arag_server") and integrates with nucliadb_telemetry.
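To override the default service name, set the variable before the tracer is initialised. A minimal sketch (the service name value is illustrative):

```python
import os

# Set RAO_SERVICE_NAME before importing rao_agent.trace so the tracer
# picks it up at initialisation time.
os.environ["RAO_SERVICE_NAME"] = "my-agent-service"
print(os.environ["RAO_SERVICE_NAME"])  # → my-agent-service
```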


License

MIT License. See LICENSE for details.
