Agent2Agent server utility and boilerplate code from google.
Project description
agent2agent-client
This repository is a wrapper for building google a2a client, without explicitly copying the code to the native development codebase. This repository support - a2a client core, - a2a client utils, - a2a client types under the package called commons. Infact this package is direct lift and shift from google's repository.
How to install
clone this repository
-
option-1
git clone https://github.com/pavanjava/agent2agent-client.gitpip install -e .
-
option-2
pip install agent2agent-clientfrom pypi.org
How to use this?
# streaming_echo_client.py (Modifications based on echo_client.py)
import asyncio
import logging
from uuid import uuid4
from common_client.client import A2AClient
from common_client.types import Message, TextPart, TaskStatusUpdateEvent, TaskArtifactUpdateEvent
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
SEARCH_SERVER_URL = "http://localhost:8001/search-a2a"
async def main():
client = A2AClient(url=SEARCH_SERVER_URL)
task_id = f"search-task-{uuid4().hex}"
user_text = "Impact of AI on on Supply Chain and Shipping Lines"
user_message = Message(role="user", parts=[TextPart(text=user_text)])
send_params = {
"id": task_id,
"message": user_message,
}
try:
logger.info(f"Sending task {task_id} to {SEARCH_SERVER_URL}...")
# Use the client's send_task_streaming method
response = await client.send_task(payload=send_params, timeout=300)
print(response)
if response.error:
# Errors might be sent as part of the stream in some implementations
logger.error(f"Received error in stream for task {task_id}: {response.error.message}")
elif response.result:
event = response.result
if isinstance(event, TaskStatusUpdateEvent):
logger.info(f"Task {task_id} Status Update: {event.status.state}")
if event.status.message and event.status.message.parts:
part = event.status.message.parts[0]
if isinstance(part, TextPart):
logger.info(f" Agent Message: {part.text}")
if event.final:
logger.info(f"Task {task_id} reached final state.")
elif isinstance(event, TaskArtifactUpdateEvent):
logger.info(f"Task {task_id} Artifact Update: {event.artifact.name}")
# Process artifact parts...
else:
logger.warning(f"Received unknown event type : {type(event)}")
else:
logger.error(f"Received unexpected empty response for task {task_id}")
except Exception as e:
logger.error(f"An error occurred during task communication: {e}")
if __name__ == "__main__":
asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file agent2agent_client-0.0.2.tar.gz.
File metadata
- Download URL: agent2agent_client-0.0.2.tar.gz
- Upload date:
- Size: 13.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c51db002788d1bdbf35818af40e440c2b2d2b36a8e00d9dbc00d482ac2778915
|
|
| MD5 |
5d98d65ee71e9bee00ecdede9eef59f1
|
|
| BLAKE2b-256 |
0ae8ea6581ddafa842c5efe1f6181a9ab138cbf83ef42b31acf2dfc39eeff13f
|
File details
Details for the file agent2agent_client-0.0.2-py3-none-any.whl.
File metadata
- Download URL: agent2agent_client-0.0.2-py3-none-any.whl
- Upload date:
- Size: 13.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
560f5ca5b416ee762d60935c0be8c4b0c4557dc91062976f3f0589ef62b7baeb
|
|
| MD5 |
23caeb8ab7c8445ea159507f3288b6ad
|
|
| BLAKE2b-256 |
e35127c0ef57b19abeb7cf7fb0a590ea83d049a0c91890bb1afbdab70fc7122a
|