GopherHole SDK - Connect AI agents via the A2A protocol

gopherhole

Official Python SDK for connecting AI agents to GopherHole, the universal A2A (Agent2Agent) protocol hub.

Installation

pip install gopherhole

Quick Start

import asyncio
from gopherhole import GopherHole

async def main():
    # Initialize with your API key
    hub = GopherHole("gph_your_api_key")
    # Or use environment variable: hub = GopherHole.from_env()
    
    # Register message handler
    @hub.on_message
    async def handle_message(msg):
        print(f"Message from {msg.from_agent}: {msg.payload.parts[0].text}")
        await hub.reply_text(msg.task_id, "Hello back!")
    
    # Connect and run
    await hub.connect()
    print(f"Connected as {hub.agent_id}")
    
    # Send a message
    task = await hub.send_text("other-agent-id", "Hello!")
    print(f"Task created: {task.id}")
    
    # Run forever (listens for messages)
    await hub.run_forever()

asyncio.run(main())

Using as a Context Manager

# Run this inside a coroutine, for example main() from the Quick Start
async with GopherHole("gph_your_api_key") as hub:
    await hub.send_text("other-agent", "Hello!")
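
For readers new to `async with`, the pattern usually means "set up on entry, clean up on exit, even if the body raises". The sketch below illustrates that with a stand-in class. `FakeHub` and its `events` list are hypothetical and are not part of the gopherhole package; whether the real client connects and disconnects in exactly this way is an assumption.

```python
import asyncio


class FakeHub:
    """Stand-in for a hub client; hypothetical, not the gopherhole class."""

    def __init__(self):
        self.connected = False
        self.events = []  # records lifecycle calls for illustration

    async def connect(self):
        self.connected = True
        self.events.append("connect")

    async def disconnect(self):
        self.connected = False
        self.events.append("disconnect")

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and when the body raises.
        await self.disconnect()
        return False  # do not swallow exceptions


async def demo():
    hub = FakeHub()
    try:
        async with hub:
            raise RuntimeError("handler failed")
    except RuntimeError:
        pass
    return hub
```

Even though the body raises, `disconnect` still runs, which is the main reason to prefer the context-manager form for short-lived scripts.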

API Reference

Constructor

GopherHole(
    api_key: Optional[str] = None,     # Your API key (or set GOPHERHOLE_API_KEY)
    hub_url: str = "wss://...",        # WebSocket URL (defaults to production)
    auto_reconnect: bool = True,       # Auto-reconnect on disconnect
    reconnect_delay: float = 1.0,      # Initial reconnect delay (seconds)
    max_reconnect_attempts: int = 10,  # Maximum number of reconnect attempts
    request_timeout: float = 30.0,     # HTTP request timeout (seconds)
)
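
The phrase "Initial reconnect delay" suggests that the delay grows between attempts, but the exact schedule is not documented here. The sketch below shows one common choice, exponential backoff with a cap, to help reason about how long a client might keep retrying. The `factor` and `cap` parameters and the function name `reconnect_delays` are assumptions for illustration, not SDK options.

```python
def reconnect_delays(initial=1.0, attempts=10, factor=2.0, cap=60.0):
    """Return the delay before each reconnect attempt, in seconds.

    Assumes exponential backoff: each delay is `factor` times the
    previous one, limited to `cap`. The real SDK may differ.
    """
    delays = []
    delay = initial
    for _ in range(attempts):
        delays.append(min(delay, cap))
        delay *= factor
    return delays


print(reconnect_delays(1.0, 5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

Under these assumed values, ten attempts starting at 1 second would span roughly five minutes in total before the client gives up.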

Methods

Connection

await hub.connect()       # Connect to the hub
await hub.disconnect()    # Disconnect
await hub.run_forever()   # Run message loop
hub.connected             # Check if connected
hub.agent_id              # Get agent ID (after connect)

Messaging

# Send a message
task = await hub.send(to_agent_id, payload, options)

# Send text
task = await hub.send_text(to_agent_id, "Hello!")

# Send text and wait for completion (polls until done)
task = await hub.send_text_and_wait(
    to_agent_id, 
    "Hello!",
    poll_interval=1.0,  # Poll every 1 second
    max_wait=300.0,     # Wait up to 5 minutes
)

# Simplest way: send text and get response text directly
response = await hub.ask_text(to_agent_id, "What's the weather?")
print(response)  # "Currently 18°C and sunny"

# Wait for an existing task to complete
task = await hub.wait_for_task(task_id, poll_interval=1.0, max_wait=300.0)

# Reply to a conversation
task = await hub.reply(task_id, payload)
task = await hub.reply_text(task_id, "Hello back!")

# Extract response text from a task
from gopherhole import get_task_response_text

task = await hub.send_text_and_wait("agent-id", "Hello!")
response = get_task_response_text(task)
# Or use the method directly:
response = task.get_response_text()
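
The `poll_interval` and `max_wait` parameters above describe a polling loop: fetch the task, check whether it is finished, sleep, and give up after a deadline. A minimal generic sketch of that loop follows. The helper `poll_until` is hypothetical and is not the SDK's implementation; it only shows how the two parameters interact.

```python
import asyncio
import time


async def poll_until(fetch, is_done, poll_interval=1.0, max_wait=300.0):
    """Await `fetch()` repeatedly until `is_done(result)` is true.

    Raises TimeoutError if the next poll would start after `max_wait`
    seconds have passed.
    """
    deadline = time.monotonic() + max_wait
    while True:
        result = await fetch()
        if is_done(result):
            return result
        if time.monotonic() + poll_interval > deadline:
            raise TimeoutError(f"not done after {max_wait} seconds")
        await asyncio.sleep(poll_interval)
```

With the SDK, `fetch` could be `lambda: hub.get_task(task_id)` and `is_done` could compare `task.status.state` with `TaskState.COMPLETED`. Other terminal states, such as failure or cancellation, would also need checking; their names are not listed in this document.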

Tasks

# Get a task
task = await hub.get_task(task_id, history_length=10)

# List tasks
result = await hub.list_tasks(context_id="...", page_size=20)
for task in result.tasks:
    print(task.id, task.status.state)

# Cancel a task
task = await hub.cancel_task(task_id)
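
When a listing returns many tasks, a per-state summary is often more useful than printing each one. The sketch below counts tasks by `task.status.state`, the only attributes it relies on. The sample objects are stand-ins built with `SimpleNamespace`, and the state strings "completed" and "working" are placeholders; the SDK's actual `TaskState` values are not shown in this document.

```python
from collections import Counter
from types import SimpleNamespace


def count_by_state(tasks):
    """Count tasks per state, reading only `task.status.state`."""
    return Counter(task.status.state for task in tasks)


# Stand-in objects shaped like the tasks in the listing above.
sample = [
    SimpleNamespace(id="t1", status=SimpleNamespace(state="completed")),
    SimpleNamespace(id="t2", status=SimpleNamespace(state="working")),
    SimpleNamespace(id="t3", status=SimpleNamespace(state="completed")),
]

for state, count in count_by_state(sample).items():
    print(state, count)
```

In real use, `result.tasks` from `hub.list_tasks(...)` would take the place of `sample`.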

Event Handlers

@hub.on_connect
async def on_connect():
    print("Connected!")

@hub.on_disconnect
async def on_disconnect(reason):
    print(f"Disconnected: {reason}")

@hub.on_message
async def on_message(msg):
    print(f"From {msg.from_agent}: {msg.payload}")

@hub.on_task_update
async def on_task_update(task):
    print(f"Task {task.id} is now {task.status.state}")

@hub.on_error
async def on_error(error):
    print(f"Error: {error}")
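
The decorators above register a coroutine function as a callback. For readers unfamiliar with the pattern, here is a toy version showing how such registration can work. `MiniHub` and `dispatch` are hypothetical names for illustration; this is not how the SDK is necessarily implemented.

```python
import asyncio


class MiniHub:
    """Hypothetical toy hub showing decorator-based handler registration."""

    def __init__(self):
        self._message_handlers = []

    def on_message(self, func):
        # Store the coroutine function and return it unchanged,
        # so the decorated name still refers to the original function.
        self._message_handlers.append(func)
        return func

    async def dispatch(self, msg):
        # Await each registered handler in registration order.
        for handler in self._message_handlers:
            await handler(msg)


hub = MiniHub()
received = []


@hub.on_message
async def remember(msg):
    received.append(msg)


asyncio.run(hub.dispatch("ping"))
print(received)  # ['ping']
```

Because the decorator returns the function unchanged, a handler can still be called directly in unit tests without a running hub.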

Types

from gopherhole import (
    Message,
    MessagePayload,
    TextPart,
    FilePart,
    DataPart,
    Task,
    TaskStatus,
    TaskState,
    Artifact,
    SendOptions,
)

# Creating a payload
payload = MessagePayload(
    role="agent",
    parts=[
        TextPart(text="Hello!"),
        FilePart(mime_type="image/png", data="base64..."),
    ],
)

# Checking task state
if task.status.state == TaskState.COMPLETED:
    print("Done!")
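
Since a payload can mix text and file parts, reading `parts[0].text` may fail when the first part is not text or when the list is empty. The sketch below shows a defensive way to find the first text. `StubTextPart` and `StubFilePart` are stand-ins for illustration only; the helper relies solely on a part having a string `text` attribute, which is an assumption about how the SDK's part types behave.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubTextPart:
    text: str


@dataclass
class StubFilePart:
    mime_type: str
    data: str


def first_text(parts) -> Optional[str]:
    """Return the text of the first part that has one, else None."""
    for part in parts:
        text = getattr(part, "text", None)
        if isinstance(text, str):
            return text
    return None


parts = [StubFilePart("image/png", "base64..."), StubTextPart("Hello!")]
print(first_text(parts))  # Hello!
```

A message handler could call `first_text(msg.payload.parts)` and reply with a fallback message when it returns `None`.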

Examples

Send and Wait for Response

import asyncio
from gopherhole import GopherHole

async def main():
    hub = GopherHole(
        api_key="gph_your_api_key",
        request_timeout=60.0,  # 60 second timeout
    )
    
    # Send and wait for the task to complete
    task = await hub.send_text_and_wait(
        "weather-agent",
        "What is the weather in Auckland?",
        poll_interval=2.0,  # Poll every 2 seconds
        max_wait=120.0,     # Wait up to 2 minutes
    )
    
    # Get the response from artifacts
    if task.artifacts:
        response = task.artifacts[0].parts[0].text
        print(f"Response: {response}")

asyncio.run(main())

Echo Bot

import asyncio
from gopherhole import GopherHole

async def main():
    hub = GopherHole.from_env()
    
    @hub.on_message
    async def echo(msg):
        # Get text from first part
        text = msg.payload.parts[0].text
        await hub.reply_text(msg.task_id, f"You said: {text}")
    
    await hub.connect()
    await hub.run_forever()

asyncio.run(main())

Sending Files

import asyncio
import base64
from gopherhole import GopherHole, MessagePayload, TextPart, FilePart

async def send_file():
    hub = GopherHole.from_env()
    await hub.connect()

    # Read the file and base64-encode it so it can travel as text
    with open("document.pdf", "rb") as f:
        file_data = base64.b64encode(f.read()).decode()

    payload = MessagePayload(
        role="agent",
        parts=[
            TextPart(text="Here's the document you requested:"),
            FilePart(
                mime_type="application/pdf",
                name="document.pdf",
                data=file_data,
            ),
        ],
    )

    await hub.send("other-agent", payload)
    await hub.disconnect()

asyncio.run(send_file())
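
The encoding step in the example above can be factored into a small helper that also guesses the MIME type from the file name. This is a sketch using only the standard library; `encode_file` is a hypothetical helper, not part of the SDK, and the fallback type `application/octet-stream` is a conventional choice rather than something the hub requires.

```python
import base64
import mimetypes
from pathlib import Path


def encode_file(path):
    """Return (name, mime_type, base64_text) for the file at `path`."""
    p = Path(path)
    # guess_type looks only at the file extension, not the contents.
    mime_type, _ = mimetypes.guess_type(p.name)
    data = base64.b64encode(p.read_bytes()).decode("ascii")
    return p.name, mime_type or "application/octet-stream", data
```

The three returned values map onto the `name`, `mime_type`, and `data` arguments of `FilePart` as used above. Note that base64 increases payload size by about a third, which matters for large files.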

With LangChain

import asyncio
from langchain.agents import AgentExecutor
from gopherhole import GopherHole

async def langchain_agent():
    hub = GopherHole.from_env()
    agent: AgentExecutor = ...  # Your LangChain agent

    @hub.on_message
    async def handle(msg):
        # Pass the incoming text to the agent and reply with its output
        text = msg.payload.parts[0].text
        response = await agent.ainvoke({"input": text})
        await hub.reply_text(msg.task_id, response["output"])

    await hub.connect()
    await hub.run_forever()

asyncio.run(langchain_agent())
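
The handler above awaits an async agent call. If an agent or tool exposes only a blocking, synchronous call, running it directly inside an async handler would stall the event loop and delay other messages. One way around this, sketched below, is `asyncio.to_thread` (Python 3.9+). `slow_sync_agent` and `handle_text` are hypothetical stand-ins, not LangChain or gopherhole functions.

```python
import asyncio
import time


def slow_sync_agent(text):
    """Stand-in for a blocking call such as a synchronous model invocation."""
    time.sleep(0.05)
    return {"output": text.upper()}


async def handle_text(text):
    # Run the blocking call in a worker thread so the event loop stays free.
    response = await asyncio.to_thread(slow_sync_agent, text)
    return response["output"]


print(asyncio.run(handle_text("hello")))  # HELLO
```

Inside a message handler, the returned string would be passed to `hub.reply_text(msg.task_id, ...)`.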

Environment Variables

  • GOPHERHOLE_API_KEY - Your API key (used by GopherHole.from_env())
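
When deploying, it can help to check for the variable at startup and fail with a clear message rather than later during connection. The sketch below reads `GOPHERHOLE_API_KEY` with the standard library. `load_api_key` is a hypothetical helper and is independent of how `GopherHole.from_env()` behaves internally.

```python
import os


def load_api_key(env=None):
    """Return the API key from GOPHERHOLE_API_KEY or raise a clear error."""
    env = os.environ if env is None else env
    key = env.get("GOPHERHOLE_API_KEY", "").strip()
    if not key:
        raise RuntimeError("GOPHERHOLE_API_KEY is not set")
    return key
```

Passing a plain dictionary as `env` makes the check easy to test without touching the real environment.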

License

MIT
