Synqed - A wrapper around A2A for simplified multi-agent systems interaction and communication
Synqed - Universal Agent Collaboration
Agents that actually talk to each other. Your OpenAI agent delegates to a LangChain agent. A research agent consults specialists. A design agent brainstorms with analysis agents. All automatically. All seamlessly.
Synqed enables true agent-to-agent collaboration across any vendor, any framework, anywhere.
Why Synqed?
- True Agent-to-Agent Collaboration - Agents delegate, consult, brainstorm, and coordinate autonomously to solve complex problems together
- Universal Cross-Vendor Interoperability - OpenAI, Anthropic, LangChain, CrewAI, custom frameworks - they all speak the same language
- Production-Ready in 5 Minutes - Zero protocol knowledge required. Just build agents that collaborate
- Intelligent Orchestration - LLM-powered routing automatically selects the right agents for each task
Installation
pip install synqed
Optional Dependencies
# For gRPC support
pip install synqed[grpc]
# For SQL task store
pip install synqed[sql]
# Everything
pip install synqed[all]
LLM Provider Dependencies
Synqed's Orchestrator works with multiple LLM providers. Install your preferred provider:
# OpenAI
pip install openai
# Anthropic
pip install anthropic
# Google
pip install google-generativeai
Quick Start
Step 1: Create Your First Agent
Create a file my_agent.py:
import asyncio
import os

import synqed
from openai import AsyncOpenAI

async def agent_logic(context):
    """Defines the agent's behavior: which LLM to use and the agent's reasoning logic."""
    # Get the user's message
    user_message = context.get_user_input()

    # Call the LLM that powers this agent
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful customer support assistant."},
            {"role": "user", "content": user_message}
        ]
    )
    return response.choices[0].message.content

async def main():
    # Create your agent
    agent = synqed.Agent(
        name="Customer Support Assistant",
        description="Automated support agent that handles customer inquiries and service requests efficiently",
        skills=["customer_support", "ticket_routing", "inquiry_handling"],
        executor=agent_logic,  # The function that defines the agent's behavior
    )

    # Create a server to host your agent
    server = synqed.AgentServer(agent, port=8000)

    # Start the server
    print(f"Agent running at {agent.url}")
    await server.start()

if __name__ == "__main__":
    asyncio.run(main())
Step 2: Connect a Client
Create a file client.py:
import asyncio

import synqed

async def main():
    # Connect to the AI agent
    async with synqed.Client("http://localhost:8000") as client:
        # Prompt the AI agent
        response = await client.ask("What are 3 best practices for customer support?")
        print(f"Response:\n{response}\n")

if __name__ == "__main__":
    asyncio.run(main())
Step 3: Run It
# Terminal 1 - Start your agent
python my_agent.py
# Terminal 2 - Connect your client
python client.py
Congratulations! You just built, deployed and prompted your first AI agent.
Core Concepts
The Three Pillars of Synqed
┌─────────────┬──────────────┬─────────────────┐
│ Agents │ Client │ Orchestrator │
│ │ │ │
│ The brains │ The bridge │ The director │
└─────────────┴──────────────┴─────────────────┘
- Agent - An autonomous AI agent with specific skills
- Client - Connect to and communicate with agents
- Orchestrator - Intelligently routes tasks to the right agents
Building Agents
Basic Agent
from synqed import Agent
agent = Agent(
name="WeatherAgent",
description="Provides weather forecasts and alerts",
skills=["weather_forecast", "weather_alerts"],
executor=agent_logic
)
Agent with Detailed Skills
agent = Agent(
name="RecipeAgent",
description="Find and recommend recipes",
skills=[
{
"skill_id": "recipe_search",
"name": "Recipe Search",
"description": "Search for recipes by ingredients or cuisine type",
"tags": ["cooking", "recipes", "food", "search"]
},
{
"skill_id": "nutrition_info",
"name": "Nutrition Information",
"description": "Get nutritional information for recipes",
"tags": ["nutrition", "health", "calories"]
}
],
executor=agent_logic
)
Agent Executor Function
The executor function is where your agent's logic/capability lives:
async def agent_logic(context):
    """
    Args:
        context: RequestContext object with methods:
            - get_user_input() → str: The user's message
            - get_task() → Task: Full task object
            - get_message() → Message: Full message object

    Returns:
        str or Message: Your agent's response
    """
    user_message = context.get_user_input()

    # Your custom logic here:
    # - Call LLMs (OpenAI, Anthropic, Google, etc.)
    # - Query databases
    # - Call external APIs
    # - Process data
    # - Whatever your agent needs to do!

    return "Agent response"
Agent Capabilities
agent = Agent(
name="MyAgent",
description="Does amazing things",
skills=["skill1"],
executor=logic,
capabilities={
"streaming": True, # Support real-time streaming
"push_notifications": False, # Enable webhook notifications
"state_transition_history": False # Track state changes
}
)
Hosting Your Agent
from synqed import AgentServer
# Create server
server = AgentServer(agent, host="0.0.0.0", port=8000)
# Option 1: Start in foreground (blocking)
await server.start()
# Option 2: Start in background
await server.start_background()
# ... do other things ...
await server.stop()
💬 Using the Client
Two Ways to Get Responses
1. Complete Response (ask)
Wait for the full response before continuing.
from synqed import Client

async with Client("http://localhost:8000") as client:
    response = await client.ask("What's 2+2?")
    print(response)  # "4"
Use ask() when:
- You need the complete answer before proceeding
- Response time is reasonable (< 30 seconds)
- You want simpler code without iteration
2. Streaming Response (stream)
Get the response piece by piece as it's generated (like ChatGPT).
async with Client("http://localhost:8000") as client:
    async for chunk in client.stream("Tell me a story"):
        print(chunk, end="", flush=True)  # Creates typing effect
Use stream() when:
- You want to show progress to users
- The response might be long
- You want to process data as it arrives
Pro tip: Use end="" to prevent newlines between chunks and flush=True to display output immediately.
Task Management
async with Client("http://localhost:8000") as client:
    # Submit a task
    task_id = await client.submit_task("Long running operation")

    # Check task status
    task = await client.get_task(task_id)
    print(f"Status: {task.state}")

    # Cancel if needed
    await client.cancel_task(task_id)
Advanced Client Features
# Custom timeout
client = Client(
agent_url="http://localhost:8000",
timeout=120.0 # 2 minutes
)
# Disable streaming
client = Client(
agent_url="http://localhost:8000",
streaming=False
)
Orchestration (Intelligent Routing)
The Orchestrator uses an LLM to analyze tasks and automatically select the best agent(s) to handle them.
Basic Orchestration
import synqed

# Initialize with your LLM of choice
orchestrator = synqed.Orchestrator(
    provider=synqed.LLMProvider.OPENAI,  # or ANTHROPIC, GOOGLE
    api_key="your-api-key",
    model="gpt-4o"
)
# Tell the orchestrator what agents can be used
orchestrator.register_agent(recipe_agent.card, recipe_agent.url)
orchestrator.register_agent(shopping_agent.card, shopping_agent.url)
orchestrator.register_agent(weather_agent.card, weather_agent.url)
# Orchestrator delegates task
result = await orchestrator.orchestrate(
"I want to cook pasta tonight but need to know what ingredients to buy"
)
# View the results
print(f"Selected Agent: {result.selected_agents[0].agent_name}")
print(f"Confidence: {result.selected_agents[0].confidence:.0%}")
print(f"Reasoning: {result.selected_agents[0].reasoning}")
print(f"Plan: {result.execution_plan}")
Orchestration Result
@dataclass
class OrchestrationResult:
    task: str                                 # The original task
    selected_agents: list[AgentSelection]     # Best agent(s)
    execution_plan: str                       # How to execute
    alternative_agents: list[AgentSelection]  # Backup options
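These fields make it easy to add a guardrail before executing a routing decision. Below is a minimal sketch of such a guardrail; the two dataclasses are stand-ins that mirror the fields documented above (they are not the real synqed classes), and the 0.6 threshold is an illustrative choice.

```python
from dataclasses import dataclass, field

# Stand-ins mirroring the documented fields — for illustration only.
@dataclass
class AgentSelection:
    agent_name: str
    confidence: float
    reasoning: str = ""

@dataclass
class OrchestrationResult:
    task: str
    selected_agents: list
    execution_plan: str = ""
    alternative_agents: list = field(default_factory=list)

def choose_agent(result, min_confidence=0.6):
    """Return the primary selection, falling back to the most confident
    alternative when the primary pick is below the threshold."""
    primary = result.selected_agents[0]
    if primary.confidence >= min_confidence:
        return primary
    candidates = [primary, *result.alternative_agents]
    return max(candidates, key=lambda s: s.confidence)
```

With the real `OrchestrationResult`, the same check lets you route low-confidence picks to a backup agent or to human review instead of executing blindly.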
Supported LLM Providers
# OpenAI
orchestrator = Orchestrator(
provider=LLMProvider.OPENAI,
api_key="sk-...",
model="..."
)
# Anthropic
orchestrator = Orchestrator(
provider=LLMProvider.ANTHROPIC,
api_key="sk-ant-...",
model="..."
)
# Google
orchestrator = Orchestrator(
provider=LLMProvider.GOOGLE,
api_key="...",
model="..."
)
Fine-tune Orchestration
orchestrator = Orchestrator(
provider=LLMProvider.OPENAI,
api_key="sk-...",
model="gpt-4o",
temperature=0.7, # Creativity (0.0 - 1.0)
max_tokens=2000 # Response length limit
)
Multi-Agent Delegation
The TaskDelegator coordinates multiple agents working together on complex tasks.
Basic Delegation
from synqed import TaskDelegator
# Create delegator
delegator = TaskDelegator()
# Register agents
delegator.register_agent(agent=recipe_agent)
delegator.register_agent(agent=shopping_agent)
delegator.register_agent(agent=weather_agent)
# Submit a task - automatically routed to the right agent
result = await delegator.submit_task(
"Find me a recipe and create a shopping list"
)
Delegation with Orchestrator
For intelligent routing, combine TaskDelegator with Orchestrator:
# Create orchestrator for intelligent routing
orchestrator = Orchestrator(
provider=LLMProvider.OPENAI,
api_key="your-key",
model="gpt-4o"
)
# Create delegator with orchestrator
delegator = TaskDelegator(orchestrator=orchestrator)
# Register agents
delegator.register_agent(agent=recipe_agent)
delegator.register_agent(agent=shopping_agent)
# Now tasks are intelligently routed using LLM analysis
result = await delegator.submit_task(
"Plan dinner for a cold rainy evening"
)
Remote Agent Registration
# Register a remote agent by URL
delegator.register_agent(
agent_url="https://recipe-service.example.com",
agent_card=recipe_agent_card # Optional pre-loaded card
)
Complete Examples
Example 1: Simple Customer Support Agent
import asyncio
import os

from synqed import Agent, AgentServer
from openai import AsyncOpenAI

async def support_logic(context):
    """Customer support agent logic."""
    user_message = context.get_user_input()

    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "You are a helpful customer support agent. "
                           "Be polite, professional, and solve problems efficiently."
            },
            {"role": "user", "content": user_message}
        ]
    )
    return response.choices[0].message.content

async def main():
    agent = Agent(
        name="SupportAgent",
        description="Customer support assistant",
        skills=["customer_support", "ticket_routing", "faq"],
        executor=support_logic
    )

    server = AgentServer(agent, port=8000)
    print(f"✅ Support agent running at {agent.url}")
    await server.start()

if __name__ == "__main__":
    asyncio.run(main())
Example 2: Multi-Agent System with Orchestration
import asyncio
import os

from synqed import Agent, AgentServer, Orchestrator, LLMProvider
from openai import AsyncOpenAI

# ============================================================================
# Agent 1: Recipe Agent
# ============================================================================

async def recipe_logic(context):
    user_message = context.get_user_input()
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "You are a recipe expert. Suggest recipes based on "
                           "ingredients, cuisine type, or dietary restrictions."
            },
            {"role": "user", "content": user_message}
        ]
    )
    return response.choices[0].message.content

# ============================================================================
# Agent 2: Shopping Agent
# ============================================================================

async def shopping_logic(context):
    user_message = context.get_user_input()
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "You are a shopping assistant. Create shopping lists, "
                           "compare prices, and suggest where to buy items."
            },
            {"role": "user", "content": user_message}
        ]
    )
    return response.choices[0].message.content

# ============================================================================
# Agent 3: Nutrition Agent
# ============================================================================

async def nutrition_logic(context):
    user_message = context.get_user_input()
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "You are a nutrition expert. Provide nutritional "
                           "information, calculate calories, and give healthy eating advice."
            },
            {"role": "user", "content": user_message}
        ]
    )
    return response.choices[0].message.content

# ============================================================================
# Main System
# ============================================================================

async def main():
    # Create agents
    recipe_agent = Agent(
        name="RecipeAgent",
        description="Find and recommend recipes",
        skills=[
            {
                "skill_id": "recipe_search",
                "name": "Recipe Search",
                "description": "Find recipes by ingredient or cuisine",
                "tags": ["cooking", "recipes", "food"]
            }
        ],
        executor=recipe_logic
    )

    shopping_agent = Agent(
        name="ShoppingAgent",
        description="Create shopping lists and find products",
        skills=[
            {
                "skill_id": "shopping_list",
                "name": "Shopping List",
                "description": "Create and manage shopping lists",
                "tags": ["shopping", "grocery", "list"]
            }
        ],
        executor=shopping_logic
    )

    nutrition_agent = Agent(
        name="NutritionAgent",
        description="Provide nutrition information and advice",
        skills=[
            {
                "skill_id": "nutrition_info",
                "name": "Nutrition Info",
                "description": "Calculate calories and provide nutrition facts",
                "tags": ["nutrition", "health", "calories"]
            }
        ],
        executor=nutrition_logic
    )

    # Start agents on different ports
    recipe_server = AgentServer(recipe_agent, port=8001)
    shopping_server = AgentServer(shopping_agent, port=8002)
    nutrition_server = AgentServer(nutrition_agent, port=8003)

    await recipe_server.start_background()
    await shopping_server.start_background()
    await nutrition_server.start_background()

    print("✅ All agents running")
    print(f"   - Recipe Agent: {recipe_agent.url}")
    print(f"   - Shopping Agent: {shopping_agent.url}")
    print(f"   - Nutrition Agent: {nutrition_agent.url}")

    # Create orchestrator
    orchestrator = Orchestrator(
        provider=LLMProvider.OPENAI,
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o"
    )

    # Register agents
    orchestrator.register_agent(recipe_agent.card, recipe_agent.url)
    orchestrator.register_agent(shopping_agent.card, shopping_agent.url)
    orchestrator.register_agent(nutrition_agent.card, nutrition_agent.url)

    print("\n✅ Orchestrator configured with 3 agents\n")

    # Test orchestration
    tasks = [
        "Find me a healthy pasta recipe",
        "Create a shopping list for a stir fry dinner",
        "How many calories are in a pepperoni pizza?"
    ]

    for task in tasks:
        print(f"📋 Task: {task}")
        result = await orchestrator.orchestrate(task)
        print(f"   🎯 Selected: {result.selected_agents[0].agent_name}")
        print(f"   📊 Confidence: {result.selected_agents[0].confidence:.0%}")
        print(f"   💡 Reasoning: {result.selected_agents[0].reasoning}\n")

    # Keep servers running
    print("Press Ctrl+C to stop...")
    try:
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        print("\n\n🛑 Shutting down...")
        await recipe_server.stop()
        await shopping_server.stop()
        await nutrition_server.stop()

if __name__ == "__main__":
    asyncio.run(main())
Best Practices
1. Agent Design
DO:
- ✅ Give agents focused, specific skills
- ✅ Write clear, descriptive agent descriptions
- ✅ Use detailed skill definitions with tags
- ✅ Include proper error handling in executor functions
DON'T:
- ❌ Create "do everything" agents
- ❌ Use vague descriptions like "General agent"
- ❌ Skip skill tags (they help with routing)
- ❌ Let exceptions crash your executor
2. Orchestration
DO:
- ✅ Use descriptive agent and skill names
- ✅ Review confidence scores before execution
- ✅ Check alternative agents for complex tasks
- ✅ Tune temperature based on your use case
DON'T:
- ❌ Ignore low confidence scores (< 0.6)
- ❌ Use orchestration for single-agent systems
- ❌ Over-rely on default settings
- ❌ Skip testing with various task types
3. Production Deployment
DO:
- ✅ Use environment variables for API keys
- ✅ Implement comprehensive logging
- ✅ Add health check endpoints
- ✅ Set reasonable timeouts
- ✅ Use async context managers (async with)
DON'T:
- ❌ Hard-code credentials
- ❌ Run without error monitoring
- ❌ Use default ports in production
- ❌ Forget to clean up resources
- ❌ Skip authentication
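Several of the points above can be covered at once by a small settings loader that reads configuration from the environment and fails fast when a required secret is missing. This is a sketch, not a synqed API; the variable names are illustrative.

```python
import os

def load_settings():
    """Read deployment settings from the environment, failing fast
    when required secrets are missing. Names are illustrative."""
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        # Fail at startup, not on the first request.
        raise RuntimeError("OPENAI_API_KEY is not set")
    return {
        "api_key": api_key,
        "host": os.environ.get("AGENT_HOST", "0.0.0.0"),
        "port": int(os.environ.get("AGENT_PORT", "8000")),
        "timeout": float(os.environ.get("CLIENT_TIMEOUT", "60")),
    }
```

Calling this once at startup keeps credentials out of source code and makes ports and timeouts configurable per environment.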
4. Error Handling
async def robust_executor(context):
    try:
        user_message = context.get_user_input()
        # Your logic here
        result = await do_something(user_message)
        return result
    except ValueError as e:
        # Handle expected errors gracefully
        return f"I couldn't process that: {e}"
    except Exception as e:
        # Log unexpected errors
        logger.error(f"Executor error: {e}", exc_info=True)
        return "I encountered an unexpected error. Please try again."
5. Resource Management
# Good: Use a context manager
async with Client("http://localhost:8000") as client:
    response = await client.ask("Hello")

# Good: Manual cleanup
client = Client("http://localhost:8000")
try:
    response = await client.ask("Hello")
finally:
    await client.close()

# Bad: No cleanup
client = Client("http://localhost:8000")
response = await client.ask("Hello")
# Resources leak!
🔒 Security Considerations
Environment Variables
Never hard-code credentials:
# ❌ BAD
orchestrator = Orchestrator(
provider=LLMProvider.OPENAI,
api_key="sk-proj-abc123...", # DON'T DO THIS!
model="gpt-4o"
)
# ✅ GOOD
import os
orchestrator = Orchestrator(
provider=LLMProvider.OPENAI,
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
Authentication
agent = Agent(
name="SecureAgent",
description="Requires authentication",
skills=["secure_skill"],
executor=logic,
security_schemes={
"api_key": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key"
}
}
)
🐛 Troubleshooting
Common Issues
1. "Connection refused"
Problem: Client can't connect to agent
Solution:
# Make sure agent is running
# Check the port matches
# Try: curl http://localhost:8000/agent-card
2. "Module not found: openai"
Problem: Missing LLM provider package
Solution:
pip install openai # or anthropic, google-generativeai
3. Streaming not working
Problem: No streaming support or agent doesn't stream
Solution:
# Enable streaming in agent
agent = Agent(
name="MyAgent",
description="...",
skills=["..."],
executor=logic,
capabilities={"streaming": True} # Add this
)
# Enable in client
client = Client("http://localhost:8000", streaming=True)
4. Low orchestration confidence
Problem: Orchestrator is unsure which agent to select
Solution:
- Improve agent descriptions
- Add more detailed skills with tags
- Make agent names more descriptive
- Use a more powerful LLM (e.g., GPT-4o vs GPT-4o-mini)
5. "Task timeout"
Problem: Agent takes too long to respond
Solution:
# Increase client timeout
client = Client("http://localhost:8000", timeout=120.0)
# Or implement progress updates in your executor
📊 Performance Tips
1. Use Background Servers for Multiple Agents
# Start all agents in background
await recipe_server.start_background()
await shopping_server.start_background()
await weather_server.start_background()
# Now they all run concurrently
2. Reuse Clients
# ❌ BAD: Create a new client for each request
for task in tasks:
    async with Client(url) as client:
        await client.ask(task)

# ✅ GOOD: Reuse the client
async with Client(url) as client:
    for task in tasks:
        await client.ask(task)
3. Use Streaming for Long Responses
# Streaming shows progress and feels faster
async for chunk in client.stream("Long task..."):
print(chunk, end="", flush=True)
4. Parallel Task Submission
# Submit multiple tasks in parallel
tasks = [
client1.ask("Task 1"),
client2.ask("Task 2"),
client3.ask("Task 3")
]
results = await asyncio.gather(*tasks)
🆘 Getting Help
Documentation & Resources
- Examples: Check the examples/ directory for complete working code
- Tests: Review tests/ for usage patterns and edge cases
- License: See the LICENSE file for terms and conditions
Common Questions
Q: Do I need to understand A2A protocol?
A: No! That's the whole point of Synqed - it abstracts the protocol away.
Q: Can I use any LLM provider?
A: In your agent executor, yes! For orchestration, we support OpenAI, Anthropic, and Google.
Q: How many agents can I run?
A: As many as your system resources allow. Each agent runs on its own port.
Q: Can agents call other agents?
A: Yes! Use the Client within your executor function to call other agents.
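That pattern can be sketched as an executor that consults a second, specialist agent before answering. The specialist's URL below is hypothetical, and synqed is imported lazily inside the function so the sketch stays self-contained.

```python
import asyncio

async def research_logic(context):
    """Executor that delegates part of its work to another agent.
    The specialist endpoint (port 8001) is a hypothetical example."""
    import synqed  # imported lazily; requires synqed at runtime

    user_message = context.get_user_input()
    # Consult a specialist agent for supporting facts, then build on them.
    async with synqed.Client("http://localhost:8001") as specialist:
        facts = await specialist.ask(f"Give background facts on: {user_message}")
    return f"Based on the specialist's notes:\n{facts}"
```

Because the Client is just another async call, an executor can fan out to several agents with asyncio.gather as well.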
Q: Is Synqed production-ready?
A: Yes, with proper error handling, logging, and monitoring in place.
Q: What's the difference between Orchestrator and TaskDelegator?
A: Orchestrator routes tasks intelligently using an LLM. TaskDelegator executes the routing decision and manages the actual delegation.
📄 License
This software is proprietary and confidential. See LICENSE file for full terms.
Copyright © 2025 Synq Team. All rights reserved.
🚀 Next Steps
Ready to build something amazing?
- Install: pip install synqed
- Try examples: Explore the examples/ directory
- Build your first agent: Start with the Quick Start above
- Scale up: Add orchestration and delegation
- Deploy: Take it to production
Happy building! 🎉
File details
Details for the file synqed-1.0.2.tar.gz.
File metadata
- Download URL: synqed-1.0.2.tar.gz
- Upload date:
- Size: 88.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 44275e6e4e594d6eb466beaf977dac958efc8ae45b7291d2246123a701dbee68 |
| MD5 | 4b0384ac552ee6ec25a92f702897dfa2 |
| BLAKE2b-256 | 9ac3540808f5a14ae91a9da7cfc01431a9c7a830fb6c14cec32a0e3bc0c87434 |
File details
Details for the file synqed-1.0.2-py3-none-any.whl.
File metadata
- Download URL: synqed-1.0.2-py3-none-any.whl
- Upload date:
- Size: 27.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1db383d50ad1f6c8687d3102d4bdd8fd55f160abb86233b2328224125ec0ddfd |
| MD5 | ce61898865076a7d9ce41f0f28d1e1d8 |
| BLAKE2b-256 | cade880fb66cd089b5aa5816803072df05895ec3c5c456c7719f6d40462dcdbf |