Agent2Agent server utilities and boilerplate code from Google.
Project description
agent2agent-server
This repository is a wrapper for building Google A2A servers without copying the code into your own development codebase. It provides the following under a package called commons:
- a2a server core
- a2a server utils
- a2a server types
In fact, this package is a direct lift and shift from Google's repository.
How to install
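Clone this repository or install the package from pypi.org:
- option-1: clone the repository and install it in editable mode
  git clone https://github.com/pavanjava/agent2agent-server.git
  cd agent2agent-server
  pip install -e .
- option-2: install from pypi.org
  pip install agent2agent-server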
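After either option, one quick way to check the install is to see whether Python can locate the `common_server` package, which is the import name used in the usage examples. This is a minimal sketch using only the standard library; the helper name `is_installed` is made up for illustration and is not part of the package.

```python
import importlib.util


def is_installed(module_name: str = "common_server") -> bool:
    """Return True if a top-level module can be located, without importing it."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # Assumption: the package is importable as `common_server`.
    print("common_server installed:", is_installed())
```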
How to use this?
# --- a2a_agent_cards.py: Agent Card Definition ---
from common_server.types import AgentCapabilities, AgentSkill, AgentCard

SEARCH_AGENT_CARD = AgentCard(
    name="Search Agent",
    description="A simple Agno A2A agent that searches the internet for user messages.",
    url="http://localhost:8001/agno-a2a",  # Where this server will run
    version="0.1.0",
    capabilities=AgentCapabilities(
        streaming=False,  # This simple agent won't stream
        pushNotifications=False,
        stateTransitionHistory=False
    ),
    authentication=None,  # No auth for this simple example
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    skills=[
        AgentSkill(
            id="search-agent",
            name="Search Message",
            description="Receives a text message and searches the internet for it.",
            inputModes=["text"],
            outputModes=["text"]
        )
    ]
)
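The card's `url` is the address clients will use, so its host, port, and path need to agree with the values given to `A2AServer`. One way to keep them in sync is to derive the server arguments from the URL itself. The sketch below uses only the standard library; `server_args_from_card_url` is a hypothetical helper, not something the package provides.

```python
from urllib.parse import urlsplit


def server_args_from_card_url(url: str) -> dict:
    """Split an agent card URL into host, port, and endpoint path."""
    parts = urlsplit(url)
    if parts.hostname is None or parts.port is None:
        raise ValueError(f"URL must include an explicit host and port: {url!r}")
    return {
        "host": parts.hostname,
        "port": parts.port,
        "endpoint": parts.path or "/",
    }


if __name__ == "__main__":
    print(server_args_from_card_url("http://localhost:8001/agno-a2a"))
```

The resulting dictionary could then be unpacked into the server constructor alongside the card and task manager, assuming the constructor accepts `host`, `port`, and `endpoint` keyword arguments as in the server example.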
# --- a2a_task_manager.py ---
import asyncio
from typing import Union, AsyncIterable
import logging

from common_server.server import TaskManager
from common_server.types import (
    TaskResubscriptionRequest, SendTaskResponse, JSONRPCResponse,
    GetTaskPushNotificationRequest, GetTaskPushNotificationResponse,
    SetTaskPushNotificationRequest, SetTaskPushNotificationResponse,
    SendTaskStreamingRequest, SendTaskStreamingResponse, SendTaskRequest,
    CancelTaskRequest, CancelTaskResponse, GetTaskRequest, GetTaskResponse,
    Task, JSONRPCError, TextPart, TaskStatus, TaskState, Message,
)
from search_agent import search_agent_team  # your own agent module

# Configure basic logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SearchAgentTaskManager(TaskManager):
    """Skeleton task manager: fill in each handler for your agent."""

    def __init__(self):
        pass

    async def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse:
        pass

    async def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse:
        pass

    async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        # call your agent here.
        pass

    async def on_send_task_subscribe(
        self, request: SendTaskStreamingRequest
    ) -> Union[AsyncIterable[SendTaskStreamingResponse], JSONRPCResponse]:
        pass

    async def on_set_task_push_notification(
        self, request: SetTaskPushNotificationRequest
    ) -> SetTaskPushNotificationResponse:
        pass

    async def on_get_task_push_notification(
        self, request: GetTaskPushNotificationRequest
    ) -> GetTaskPushNotificationResponse:
        pass

    async def on_resubscribe_to_task(
        self, request: TaskResubscriptionRequest
    ) -> Union[AsyncIterable[SendTaskResponse], JSONRPCResponse]:
        pass
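The handlers in the task manager are left as `pass`, so what goes inside them is up to you. As a rough illustration of the bookkeeping a non-streaming `on_send_task`, `on_get_task`, and `on_cancel_task` might delegate to, here is a self-contained sketch of an in-memory task store. Everything in it is an assumption for illustration: `TaskRecord` and `InMemoryTaskStore` are stand-ins built with the standard library, not the package's `Task` or `TaskStatus` models, and the state strings are only illustrative.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict, List, Optional

TERMINAL_STATES = ("completed", "failed", "canceled")


@dataclass
class TaskRecord:
    """Hypothetical stand-in for a task model (not common_server.types.Task)."""
    id: str
    state: str = "submitted"
    history: List[str] = field(default_factory=list)
    result: str = ""


class InMemoryTaskStore:
    """Tracks tasks in a dict and runs an async agent callable for each message."""

    def __init__(self, agent: Callable[[str], Awaitable[str]]):
        self._agent = agent
        self._tasks: Dict[str, TaskRecord] = {}

    async def send(self, task_id: str, text: str) -> TaskRecord:
        # Create the task on first use, record the message, mark it as working.
        task = self._tasks.setdefault(task_id, TaskRecord(id=task_id))
        task.history.append(text)
        task.state = "working"
        try:
            answer = await self._agent(text)
        except Exception as exc:  # surface agent errors as a failed task
            task.state = "failed"
            task.result = str(exc)
            return task
        if task.state != "canceled":  # do not overwrite a cancellation
            task.state = "completed"
            task.result = answer
        return task

    async def get(self, task_id: str) -> Optional[TaskRecord]:
        return self._tasks.get(task_id)

    async def cancel(self, task_id: str) -> bool:
        task = self._tasks.get(task_id)
        if task is None or task.state in TERMINAL_STATES:
            return False
        task.state = "canceled"
        return True


async def echo_agent(text: str) -> str:
    """Placeholder agent; a real one would call your search agent here."""
    return f"results for: {text}"


if __name__ == "__main__":
    async def main() -> None:
        store = InMemoryTaskStore(echo_agent)
        task = await store.send("task-1", "a2a protocol")
        print(task.state, task.result)

    asyncio.run(main())
```

In a real task manager, each handler would translate between the request and response types imported from `common_server.types` and a store like this one; that mapping is not shown because it depends on the package's models.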
# --- agent_server.py ---
import logging

from common_server.server import A2AServer
from a2a_agent_cards import SEARCH_AGENT_CARD
from a2a_task_manager import SearchAgentTaskManager
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

# Configure basic logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

if __name__ == "__main__":
    task_manager = SearchAgentTaskManager()
    server = A2AServer(
        host="localhost",
        port=8001,
        endpoint="/agno-a2a",  # Must match the AgentCard URL path
        agent_card=SEARCH_AGENT_CARD,
        task_manager=task_manager
    )
    print("Starting A2A Server on http://localhost:8001")
    # Use server.start() which calls uvicorn.run
    # Note: For production, use a proper ASGI server like uvicorn or hypercorn directly
    server.start()
    # Alternatively, run directly with uvicorn (requires `import uvicorn`):
    # uvicorn.run(server.app, host="localhost", port=8001)
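Once the server is running, a client talks to it with JSON-RPC 2.0 requests. The sketch below builds a request body for sending a text message as a task and includes a small helper to POST it. It assumes the `tasks/send` method name and the message shape (`role`, `parts` with `type` and `text`) used by the early Google A2A sample code that this package is lifted from; check them against the installed `common_server.types` before relying on them. `build_send_task_request` and `post_json` are hypothetical helper names.

```python
import json
import uuid
from typing import Optional
from urllib import request as urlrequest


def build_send_task_request(text: str, task_id: Optional[str] = None) -> dict:
    """Build a JSON-RPC 2.0 body for sending one text message (assumed shape)."""
    return {
        "jsonrpc": "2.0",
        "id": uuid.uuid4().hex,  # JSON-RPC request id
        "method": "tasks/send",  # assumption: method name from the A2A samples
        "params": {
            "id": task_id or uuid.uuid4().hex,  # task id
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],
            },
        },
    }


def post_json(url: str, payload: dict, timeout: float = 10.0) -> dict:
    """POST a JSON payload and decode the JSON response."""
    data = json.dumps(payload).encode("utf-8")
    req = urlrequest.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urlrequest.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


if __name__ == "__main__":
    payload = build_send_task_request("latest news on agent protocols")
    print(json.dumps(payload, indent=2))
    # With the server running, replace <endpoint> with the endpoint passed to A2AServer:
    # print(post_json("http://localhost:8001<endpoint>", payload))
```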
File details
Details for the file agent2agent_server-0.0.7.tar.gz.
File metadata
- Download URL: agent2agent_server-0.0.7.tar.gz
- Upload date:
- Size: 16.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d5e62e83d746b024f89be4c7633d0d1848f8c3ac210955eeb3e36d07f45eaf0f |
| MD5 | 5fd2a85092c81e4fcdeb0d71b1ddd736 |
| BLAKE2b-256 | 6720e065911becd54d503a2de2a69fc88716c34ac72e19e3332d8ddb0ab8ee84 |
File details
Details for the file agent2agent_server-0.0.7-py3-none-any.whl.
File metadata
- Download URL: agent2agent_server-0.0.7-py3-none-any.whl
- Upload date:
- Size: 16.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 773d235858f4bb4ff043b49135ff3deccf7988e5d2dc9893b008d3d87fecbb8f |
| MD5 | aef2c6315e9920e589366aa938be3a26 |
| BLAKE2b-256 | 51f1184e5d43a4b120ed4be2b63df3a25c6db269f90f49f4ef1d3cbd2e288bbc |