Skip to main content

Agent2Agent server utility and boilerplate code from google.

Project description

agent2agent-client

This repository is a wrapper for building google a2a client, without explicitly copying the code to the native development codebase. This repository support - a2a client core, - a2a client utils, - a2a client types under the package called commons. Infact this package is direct lift and shift from google's repository.

How to install

clone this repository

  • option-1

    • git clone https://github.com/pavanjava/agent2agent-client.git
    • pip install -e .
  • option-2

    • pip install agent2agent-client from pypi.org

How to use this?

# streaming_echo_client.py (Modifications based on echo_client.py)
import asyncio
import logging
from uuid import uuid4

from common_client.client import A2AClient
from common_client.types import Message, TextPart, TaskStatusUpdateEvent, TaskArtifactUpdateEvent

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

SEARCH_SERVER_URL = "http://localhost:8001/search-a2a"


async def main():
    client = A2AClient(url=SEARCH_SERVER_URL)

    task_id = f"search-task-{uuid4().hex}"
    user_text = "Impact of AI on on Supply Chain and Shipping Lines"

    user_message = Message(role="user", parts=[TextPart(text=user_text)])

    send_params = {
        "id": task_id,
        "message": user_message,
    }

    try:
        logger.info(f"Sending task {task_id} to {SEARCH_SERVER_URL}...")

        # Use the client's send_task_streaming method
        response = await client.send_task(payload=send_params, timeout=300)
        print(response)
        if response.error:
            # Errors might be sent as part of the stream in some implementations
            logger.error(f"Received error in stream for task {task_id}: {response.error.message}")

        elif response.result:
            event = response.result
            if isinstance(event, TaskStatusUpdateEvent):
                logger.info(f"Task {task_id} Status Update: {event.status.state}")
                if event.status.message and event.status.message.parts:
                    part = event.status.message.parts[0]
                    if isinstance(part, TextPart):
                        logger.info(f"  Agent Message: {part.text}")
                if event.final:
                    logger.info(f"Task {task_id} reached final state.")

            elif isinstance(event, TaskArtifactUpdateEvent):
                logger.info(f"Task {task_id} Artifact Update: {event.artifact.name}")
                # Process artifact parts...
            else:
                logger.warning(f"Received unknown event type : {type(event)}")
        else:
            logger.error(f"Received unexpected empty response for task {task_id}")


    except Exception as e:
        logger.error(f"An error occurred during task communication: {e}")


if __name__ == "__main__":
    asyncio.run(main())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agent2agent_client-0.0.2.tar.gz (13.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agent2agent_client-0.0.2-py3-none-any.whl (13.6 kB view details)

Uploaded Python 3

File details

Details for the file agent2agent_client-0.0.2.tar.gz.

File metadata

  • Download URL: agent2agent_client-0.0.2.tar.gz
  • Upload date:
  • Size: 13.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for agent2agent_client-0.0.2.tar.gz
Algorithm Hash digest
SHA256 c51db002788d1bdbf35818af40e440c2b2d2b36a8e00d9dbc00d482ac2778915
MD5 5d98d65ee71e9bee00ecdede9eef59f1
BLAKE2b-256 0ae8ea6581ddafa842c5efe1f6181a9ab138cbf83ef42b31acf2dfc39eeff13f

See more details on using hashes here.

File details

Details for the file agent2agent_client-0.0.2-py3-none-any.whl.

File metadata

File hashes

Hashes for agent2agent_client-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 560f5ca5b416ee762d60935c0be8c4b0c4557dc91062976f3f0589ef62b7baeb
MD5 23caeb8ab7c8445ea159507f3288b6ad
BLAKE2b-256 e35127c0ef57b19abeb7cf7fb0a590ea83d049a0c91890bb1afbdab70fc7122a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page