
galileo-a2a


Galileo observability for A2A (Agent-to-Agent) protocol interactions. Automatic tracing of agent-to-agent calls, task lifecycle, and cross-agent distributed trace correlation.

How It Works

┌──────────────────────────────────────────────────────────────────────┐
│                        Single Distributed Trace                      │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────────────────┐    A2A Protocol     ┌────────────────────┐  │
│  │  Orchestrator Agent │    (JSON-RPC)       │  Researcher Agent  │  │
│  │                     │                     │                    │  │
│  │  plan               │  send_message ──>   │  on_message_send   │  │
│  │  delegate ──────────┼──────────────────>  │    invoke LLM      │  │
│  │  synthesize         │  <── stream events  │    call tools      │  │
│  └──────────┬──────────┘                     └─────────┬──────────┘  │
│             │                                          │             │
│             │          OpenTelemetry Spans             │             │
│             └─────────────────┬────────────────────────┘             │
│                               │                                      │
│                    ┌──────────▼──────────┐                           │
│                    │      Galileo        │                           │
│                    │  (Trace Explorer)   │                           │
│                    └─────────────────────┘                           │
└──────────────────────────────────────────────────────────────────────┘

galileo-a2a instruments both the client (outbound calls) and server (inbound requests) sides of the A2A protocol. Trace context is propagated through A2A message metadata so all agents appear in a single distributed trace in Galileo.

Installation

pip install galileo-a2a

Requirements: Python 3.10+, a Galileo API key, and a2a-sdk 0.3+.

Quick Start

from galileo.otel import GalileoSpanProcessor, add_galileo_span_processor
from galileo_a2a import A2AInstrumentor
from opentelemetry.sdk.trace import TracerProvider

provider = TracerProvider()
add_galileo_span_processor(provider, GalileoSpanProcessor())
A2AInstrumentor().instrument(tracer_provider=provider, agent_name="orchestrator")

Once instrumented, all a2a-sdk client and server interactions produce OTel spans automatically.

Configuration

| Parameter | Description |
| --- | --- |
| `tracer_provider` | OTel `TracerProvider` instance. Falls back to the global provider if not specified. |
| `agent_name` | Name of this agent, set on spans as `gen_ai.agent.name`. |
| `capture_content` | Set to `False` to disable capturing message content (e.g. for PII compliance). |
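For example, a privacy-conscious setup might disable content capture while keeping span structure and timing. This is a configuration sketch using only the parameters listed above and the same imports as the Quick Start:

```python
from galileo.otel import GalileoSpanProcessor, add_galileo_span_processor
from galileo_a2a import A2AInstrumentor
from opentelemetry.sdk.trace import TracerProvider

provider = TracerProvider()
add_galileo_span_processor(provider, GalileoSpanProcessor())

# Record span structure and timing, but not message content
# (e.g. for PII compliance).
A2AInstrumentor().instrument(
    tracer_provider=provider,
    agent_name="orchestrator",
    capture_content=False,
)
```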

Environment variables for the Galileo exporter:

| Environment Variable | Description |
| --- | --- |
| `GALILEO_API_KEY` | Galileo API key (required) |
| `GALILEO_PROJECT` | Project name (alternative to `GalileoSpanProcessor(project=...)`) |
| `GALILEO_LOG_STREAM` | Log stream name (alternative to `GalileoSpanProcessor(logstream=...)`) |
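For example (the values below are placeholders; substitute your own key and names):

```shell
export GALILEO_API_KEY="your-api-key"       # required
export GALILEO_PROJECT="travel-agents"      # alternative to GalileoSpanProcessor(project=...)
export GALILEO_LOG_STREAM="dev"             # alternative to GalileoSpanProcessor(logstream=...)
```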

Features

Client & Server Instrumentation

The instrumentor patches both sides of the A2A protocol:

Client-side (outbound calls): send_message, get_task, cancel_task, get_card

Server-side (inbound requests): on_message_send, on_message_send_stream

Cross-Agent Distributed Tracing

When Agent A calls Agent B, trace context is propagated through A2A message metadata. The receiving agent joins the caller's trace, so both agents appear in a single distributed trace in Galileo.

Session Tracking

A2A's context_id is mapped to session.id, grouping all interactions within the same conversation into a Galileo session.

Disabling Instrumentation

instrumentor = A2AInstrumentor()
instrumentor.instrument(tracer_provider=provider, agent_name="my-agent")

# Restore original a2a-sdk behavior
instrumentor.uninstrument()

Multi-Agent Example

Add distributed tracing to your multi-agent A2A workflow with just 4 lines of code. The rest is your standard agent logic — no changes needed.

import asyncio
import uuid

import httpx
import uvicorn
from a2a.client import ClientConfig, ClientFactory
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps.jsonrpc.starlette_app import A2AStarletteApplication
from a2a.server.events import EventQueue, InMemoryQueueManager
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities, AgentCard, AgentSkill, Message, Role,
    TaskState, TaskStatus, TaskStatusUpdateEvent, TextPart,
)
from galileo.otel import GalileoSpanProcessor, add_galileo_span_processor
from galileo_a2a import A2AInstrumentor
from langchain.agents import create_agent
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from starlette.applications import Starlette
from typing_extensions import TypedDict

# ---- Only 4 lines needed for full distributed tracing ----
provider = TracerProvider()
add_galileo_span_processor(provider, GalileoSpanProcessor())
A2AInstrumentor().instrument(tracer_provider=provider, agent_name="orchestrator")
LangchainInstrumentor().instrument(tracer_provider=provider)


# ---- Everything below is your standard agent code ----

llm = ChatOpenAI(model="gpt-4o-mini")

# Researcher agent — standard LangChain agent served over A2A

@tool
def search_kb(query: str) -> str:
    """Search the travel knowledge base."""
    if "paris" in query.lower():
        return "Eiffel Tower 330m, Louvre 9.6M visitors/yr, 20 arrondissements."
    return f"No results for: {query}"

researcher = create_agent(
    llm, [search_kb],
    system_prompt="Use search_kb to find facts, then summarize for a traveler.",
)

CARD = AgentCard(
    name="researcher", description="Travel researcher", url="http://localhost:9867",
    version="1.0.0", capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text/plain"], default_output_modes=["text/plain"],
    skills=[AgentSkill(id="qa", name="Q&A", description="Answer questions", tags=[])],
)

class ResearcherExecutor(AgentExecutor):
    async def execute(self, ctx: RequestContext, queue: EventQueue) -> None:
        result = await researcher.ainvoke({"messages": [("user", ctx.get_user_input())]})
        await queue.enqueue_event(TaskStatusUpdateEvent(
            task_id=ctx.task_id, context_id=ctx.context_id, final=True,
            status=TaskStatus(state=TaskState.completed, message=Message(
                message_id=str(uuid.uuid4()), role=Role.agent,
                parts=[TextPart(text=result["messages"][-1].content or "")],
            )),
        ))

    async def cancel(self, ctx: RequestContext, queue: EventQueue) -> None:
        await queue.enqueue_event(TaskStatusUpdateEvent(
            task_id=ctx.task_id, context_id=ctx.context_id, final=True,
            status=TaskStatus(state=TaskState.canceled),
        ))

# Orchestrator — standard LangGraph StateGraph

class State(TypedDict):
    user_query: str
    skills: list[str]
    research_query: str
    response: str
    plan: str

def build_orchestrator(client):
    async def discover(state: State) -> dict:
        card = await client.get_card()
        return {"skills": [s.name for s in card.skills]}

    async def plan(state: State) -> dict:
        result = await create_agent(llm,
            system_prompt="Formulate a travel research question. Reply with ONLY the question.",
        ).ainvoke({"messages": [("user", state["user_query"])]})
        return {"research_query": result["messages"][-1].content}

    async def delegate(state: State) -> dict:
        msg = Message(message_id=str(uuid.uuid4()), role=Role.user,
            parts=[TextPart(text=state["research_query"])], context_id="session-1")
        async for event in client.send_message(msg):
            if isinstance(event, tuple):
                task = event[0]
                if task.status and task.status.state == TaskState.completed and task.status.message:
                    return {"response": getattr(task.status.message.parts[0].root, "text", "")}
        return {"response": ""}

    async def synthesize(state: State) -> dict:
        result = await create_agent(llm,
            system_prompt="Create a brief 3-day itinerary from the research.",
        ).ainvoke({"messages": [("user", f"Research:\n{state['response']}\n\nCreate itinerary.")]})
        return {"plan": result["messages"][-1].content}

    graph = StateGraph(State)
    graph.add_node("discover", discover)
    graph.add_node("plan", plan)
    graph.add_node("delegate", delegate)
    graph.add_node("synthesize", synthesize)
    graph.add_edge(START, "discover")
    graph.add_edge("discover", "plan")
    graph.add_edge("plan", "delegate")
    graph.add_edge("delegate", "synthesize")
    graph.add_edge("synthesize", END)
    return graph.compile()

# Run both agents

async def main():
    app = Starlette()
    A2AStarletteApplication(
        agent_card=CARD,
        http_handler=DefaultRequestHandler(
            agent_executor=ResearcherExecutor(),
            task_store=InMemoryTaskStore(), queue_manager=InMemoryQueueManager(),
        ),
    ).add_routes_to_app(app)
    server = uvicorn.Server(uvicorn.Config(app, port=9867, log_level="warning"))
    server_task = asyncio.create_task(server.serve())
    await asyncio.sleep(1)

    client = ClientFactory(
        config=ClientConfig(streaming=True, httpx_client=httpx.AsyncClient(timeout=httpx.Timeout(120))),
    ).create(CARD)
    result = await build_orchestrator(client).ainvoke(
        {"user_query": "Plan a 3-day trip to Paris", "skills": [], "research_query": "", "response": "", "plan": ""},
    )
    print(result["plan"])

    server.should_exit = True
    await server_task
    provider.shutdown()

if __name__ == "__main__":
    # Set environment variables: GALILEO_API_KEY, OPENAI_API_KEY
    asyncio.run(main())


License

Apache-2.0
