Skip to main content

Python SDK for building Health Universe A2A-compliant agents

Project description

Health Universe A2A SDK for Python

Test

A simple, batteries-included Python SDK for building A2A-compliant agents for the Health Universe platform.

Features

  • Simple API: Just implement 3 methods to create an agent
  • Document Operations: Built-in support for reading, writing, and searching files
  • Search: Full-text and semantic search across thread documents
  • Multi-Agent Support: Run multiple agents with inter-agent communication
  • Sub-Agents: Lightweight SubAgent class for fast inline agent-to-agent calls
  • Progress Updates: Built-in support for progress tracking and artifacts
  • Validation: Pre-validate messages before processing
  • Lifecycle Hooks: Customize behavior at key points
  • Local Development: Test agents against local files without the HU backend
  • Health Universe Integration: Works seamlessly with HU platform

Installation

uv pip install health-universe-a2a

For development:

git clone https://github.com/Health-Universe/healthuniverse-a2a-sdk-python.git
cd healthuniverse-a2a-sdk-python
uv pip install -e ".[dev]"

Quick Start

from health_universe_a2a import Agent, AgentContext

class SymptomClassifierAgent(Agent):
    def get_agent_name(self) -> str:
        return "Symptom Classifier"

    def get_agent_description(self) -> str:
        return "Classifies symptoms into medical categories"

    async def process_message(self, message: str, context: AgentContext) -> str:
        await context.update_progress("Analyzing symptoms...", 0.5)

        # Your classification logic here
        category = classify_symptoms(message)

        return f"Classification: {category}"

if __name__ == "__main__":
    SymptomClassifierAgent().serve()

Working with Documents

The SDK provides a DocumentClient for reading and writing files in the Health Universe platform:

from health_universe_a2a import Agent, AgentContext

class DocumentAnalyzerAgent(Agent):
    def get_agent_name(self) -> str:
        return "Document Analyzer"

    def get_agent_description(self) -> str:
        return "Analyzes clinical documents"

    async def process_message(self, message: str, context: AgentContext) -> str:
        # List all documents in the thread
        documents = await context.document_client.list_documents()

        # Filter documents by name
        protocols = await context.document_client.filter_by_name("protocol")

        # Download and read a document
        content = await context.document_client.download_text(documents[0].id)

        # Write results back
        await context.document_client.write(
            name="Analysis Results",
            content='{"result": "analysis complete"}',
            filename="analysis_results.json",
        )

        return f"Analyzed {len(documents)} documents"

Searching Documents

The SDK supports full-text and semantic (vector) search across thread documents:

# Full-text search
results = await context.document_client.search("blood pressure", limit=5)
for result in results:
    print(f"{result.document_name}: {result.content}")

# Semantic search (vector similarity)
results = await context.document_client.semantic_search(
    "patient vitals",
    max_results=5,
    similarity_threshold=0.4,
)
for result in results:
    print(f"{result.document_name} (score: {result.similarity}): {result.content}")

Document Processing Status

Wait for platform-side document extraction and embedding before searching:

# Wait for all documents to be ready (extracted + embedded)
statuses = await context.document_client.wait_for_ready(timeout=120.0)

# Check individual document status
status = await context.document_client.get_processing_status(doc.id)
if status.is_ready:
    text = await context.document_client.download_extracted(doc.id)

Core Concepts

Agent Context

Your process_message method receives an AgentContext with helper methods:

async def process_message(self, message: str, context: AgentContext) -> str:
    # Send progress updates
    await context.update_progress("Working...", 0.5)

    # Add artifacts (files generated by the agent)
    # Prefer markdown format - the platform has markdown WYSIWYG support
    await context.add_artifact(
        name="Results",
        content=markdown_report,
        data_type="text/markdown"
    )

    # Access metadata
    user_id = context.user_id
    thread_id = context.thread_id

    # Access documents API
    docs = await context.document_client.list_documents()

    return "Done!"

Note: Automatic Terminal Status

The SDK automatically sends a terminal status (completed or failed) when process_message() returns or raises an exception. This ensures the Navigator progress bar always completes properly.

Tip: Prefer Markdown for Artifacts

When generating artifacts, prefer text/markdown as the data type. The Health Universe platform includes a markdown WYSIWYG editor, so users can view and edit markdown artifacts directly in the browser.

Validation

Validate messages before processing:

from health_universe_a2a import ValidationAccepted, ValidationRejected

async def validate_message(self, message: str, metadata: dict) -> ValidationAccepted | ValidationRejected:
    if len(message) < 10:
        return ValidationRejected(reason="Message too short (min 10 chars)")

    return ValidationAccepted(estimated_duration_seconds=60)

Lifecycle Hooks

Customize behavior at key points:

async def on_startup(self) -> None:
    """Called when agent starts up"""
    self.model = await load_model()

async def on_shutdown(self) -> None:
    """Called when agent shuts down"""
    await self.model.unload()

async def on_task_start(self, message: str, context: AgentContext) -> None:
    """Called before processing"""
    self.logger.info(f"Starting task for {context.user_id}")

async def on_task_complete(self, message: str, result: str, context: AgentContext) -> None:
    """Called after successful processing"""
    await self.metrics.increment("tasks_completed")

async def on_task_error(self, message: str, error: Exception, context: AgentContext) -> str | None:
    """Called on error - return custom error message or None for default"""
    if isinstance(error, TimeoutError):
        return "Task timed out. Try a smaller request."
    return None

Configuration Methods

Customize agent behavior:

def get_agent_version(self) -> str:
    """Version string (default: "1.0.0")"""
    return "2.1.0"

def get_max_duration_seconds(self) -> int:
    """Max duration hint (default: 3600)"""
    return 7200  # 2 hours

def get_supported_input_formats(self) -> list[str]:
    """Supported input MIME types"""
    return ["text/plain", "application/json"]

def get_supported_output_formats(self) -> list[str]:
    """Supported output MIME types"""
    return ["text/plain", "application/json"]

Local Development

The SDK includes a LocalDocumentClient and create_local_context() helper so you can test agents against local files without the Health Universe backend.

How it works

Agent code uses context.document_client for all file operations. In production the SDK injects a NestJS/S3-backed DocumentClient; locally, create_local_context() injects a filesystem-backed LocalDocumentClient. Both implement DocumentClientBase, so agent code never branches on the environment.

from health_universe_a2a import Agent, AgentContext, create_local_context

class MyAgent(Agent):
    # ... get_agent_name, get_agent_description ...

    async def process_message(self, message: str, context: AgentContext) -> str:
        # Works identically in local and production modes
        docs = await context.document_client.list_documents(role="source")
        for doc in docs:
            content = await context.document_client.download_text(doc.id)
            # ... process content ...

        await context.document_client.write(
            "Results", '{"score": 0.95}', filename="results.json"
        )
        return "Done"

Directory layout

test_data/
    source/       # Input files (role="source", document_type="user_upload")
    artifact/     # Pre-seeded + agent-written outputs (role="artifact", document_type="agent_output")
  • source/ is created automatically if it doesn't exist
  • artifact/ is the default output_dir — files written via document_client.write() go here
  • You can override output_dir to write artifacts elsewhere

Running locally

import asyncio
from health_universe_a2a import create_local_context

async def main():
    # Point at a directory with source/ subdirectory
    context = create_local_context(data_dir="./test_data")

    agent = MyAgent()
    result = await agent.process_message("analyze", context)
    print(result)

asyncio.run(main())

Listing and filtering documents

# List only input files
source_docs = await context.document_client.list_documents(role="source")

# List only artifacts (pre-seeded + agent-written)
artifact_docs = await context.document_client.list_documents(role="artifact")

# List all (default)
all_docs = await context.document_client.list_documents()
  • Progress updates are logged to stdout instead of POSTed to the backend

See examples/local_dev_example.py for a complete working example.

Examples

See the examples/ directory for complete working examples:

Sub-Agents

For lightweight, fast agent-to-agent calls (under ~30 seconds) that don't need background job infrastructure, use SubAgent:

from health_universe_a2a import SubAgent, SubAgentContext

class SummarizerSubAgent(SubAgent):
    def get_agent_name(self) -> str:
        return "Summarizer"

    def get_agent_description(self) -> str:
        return "Summarizes text quickly"

    async def process_message(self, message: str, context: SubAgentContext) -> str:
        return summarize(message)

Unlike Agent, a SubAgent returns its result directly in the HTTP response with no SSE streaming or background job lifecycle. Use it for inline helper agents that are called by an orchestrator.

Inter-Agent Communication

Call other A2A-compliant agents from your agent:

from health_universe_a2a import Agent, AgentContext

class OrchestratorAgent(Agent):
    async def process_message(self, message: str, context: AgentContext) -> str:
        # Call with text message
        preprocessor_result = await self.call_agent(
            "/preprocessor",
            message,
            context,
        )

        # Call with structured data (dict or list)
        analysis_result = await self.call_agent(
            "/analyzer",
            {"data": preprocessor_result, "mode": "detailed"},
            context,
        )

        return analysis_result

Agent Identifier Formats:

  1. Local agent path: /agent-name - Uses LOCAL_AGENT_BASE_URL (default: http://localhost:8501)
  2. Direct URL: https://... - Calls directly with HTTPS
  3. Registry name: agent-name - Looks up in AGENT_REGISTRY environment variable

Running Your Agent

Built-in HTTP Server

if __name__ == "__main__":
    agent = MyAgent()
    agent.serve()  # Starts server on http://0.0.0.0:8000

The server automatically provides:

  • Agent card endpoint: GET /.well-known/agent-card.json
  • JSON-RPC endpoint: POST / (method: "message/send")
  • Health check: GET /health

Server Configuration

# Via environment variables
# HOST=0.0.0.0 PORT=8080 python my_agent.py

# Via method parameters
agent.serve(host="0.0.0.0", port=8080, reload=True)

Multi-Agent Server

Run multiple agents in a single server:

from health_universe_a2a import serve_multi_agents

serve_multi_agents({
    "/orchestrator": OrchestratorAgent(),
    "/analyzer": AnalyzerAgent(),
    "/reader": ReaderAgent(),
}, port=8501)

Development

Setup

git clone https://github.com/Health-Universe/healthuniverse-a2a-sdk-python
cd healthuniverse-a2a-sdk-python
uv pip install -e ".[dev]"

Testing

uv run pytest

Linting and Formatting

uv run ruff check src/
uv run ruff format src/
uv run mypy src/

Requirements

  • Python 3.10+
  • httpx >= 0.27.0
  • pydantic >= 2.0.0
  • a2a-sdk >= 0.1.0
  • fastapi >= 0.109.0
  • uvicorn >= 0.27.0
  • openai >= 1.0.0

Support

Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

health_universe_a2a-0.5.0.tar.gz (216.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

health_universe_a2a-0.5.0-py3-none-any.whl (71.2 kB view details)

Uploaded Python 3

File details

Details for the file health_universe_a2a-0.5.0.tar.gz.

File metadata

  • Download URL: health_universe_a2a-0.5.0.tar.gz
  • Upload date:
  • Size: 216.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.14

File hashes

Hashes for health_universe_a2a-0.5.0.tar.gz
Algorithm Hash digest
SHA256 a1559c23b989ca0ece0de64a55290eaf90d35b62bab012f5848db0a32846cfd7
MD5 86be97b5e98db1279914b8c661858122
BLAKE2b-256 8989ab940320061d38bfa880df0419636b534a97c69caf9327a4d0ccf33c640a

See more details on using hashes here.

File details

Details for the file health_universe_a2a-0.5.0-py3-none-any.whl.

File metadata

File hashes

Hashes for health_universe_a2a-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 13405485d7a38654e039a264f488defb85cabcee7bef29b91750af46d5c87355
MD5 f87fee5f4bccccef22994a92cb1c9f33
BLAKE2b-256 89600d89be4175fb5c416b79a9adc5b7573db5163f3808d45fe9c348f398642f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page