Pydantic-AI based Multi-Agent Framework with YAML-based Agents, Teams, Workflows & Extended ACP / AGUI integration
LLMling-Agent
Read the documentation!
🚀 Getting Started
LLMling Agent is a framework for creating and managing LLM-powered agents. It integrates with LLMling's resource system and provides structured interactions with language models.
✨ Features
- 🔄 Modern Python, written from the ground up for Python 3.13
- 🤝 Integrate multiple external ACP agents (Claude Code, Codex, Goose, etc.) and AG-UI agents, as well as native Pydantic-AI based agents, into a single pool where they can cooperate, interact, and delegate.
- 🛡️ Complete (multi-)agent pool setup via YAML files, including an extensive JSON schema to help with creating configurations.
- 🔌 Extensive MCP support (elicitation, sampling, progress reporting, multi-modality), including bridging to the ACP / AG-UI protocols.
- 🛜 Completely UPath-backed: all file operations (and code execution) by agents are abstracted so that agents can operate directly on remote sources without having to install anything on the remote.
- 🎤 Streaming TTS support for all agents
- 📚 Improved aider-based RepoMap implementation for code exploration
- 📂 Composable virtual filesystems for agents
- 📝 CodeMode support
Quick Start
Agent Client Protocol (ACP)
llmling-agent supports the Agent Client Protocol for seamless integration with desktop applications and IDEs. Run your agents as ACP servers to enable bidirectional communication, session management, and file operations through JSON-RPC 2.0 over stdio. This is the fastest way to start chatting with an AI:
The recommended client is the Zed IDE (and soon Toad, a Python client based on Textual).
```bash
# Install with the desired extras, then start the ACP server
pip install "llmling-agent[default,coding]"
llmling-agent serve-acp [path/to/config.yml]
```
Compatible with ACP-enabled Clients like Zed. See the ACP Integration documentation for setup instructions.
Run /help in the chat to see what commands are at your disposal.
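Under the hood, ACP exchanges JSON-RPC 2.0 messages over stdio, one JSON object per line. A minimal sketch of framing such a request follows; the method name and params are illustrative, not the exact ACP schema:

```python
import json

def make_request(request_id: int, method: str, params: dict) -> str:
    """Build a JSON-RPC 2.0 request line, as an ACP client would send it over stdio."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": method,
        "params": params,
    })

line = make_request(1, "session/prompt", {"text": "hello"})
# The server parses the line back into a structured request
request = json.loads(line)
```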
YAML configuration
While you can define agents with three lines of YAML (or completely programmatically, or via the CLI), you can also configure agents together with their connections, agent tasks, storage providers, and much more via YAML. Here is an extended example:
```yaml
# agents.yml
agents:
  analyzer:
    type: native
    name: "Code Analyzer"     # Display name
    inherits: "base_agent"    # Optional parent config to inherit from
    description: "Code analysis specialist"
    debug: false
    retries: 1                # Number of retries for failed operations
    model:                    # Model configuration
      type: "fallback"        # Lots of special "meta-models" included out of the box!
      models:                 # Try models in sequence
        - "openai:gpt-5"
        - "openai:gpt-5-nano"
        - "anthropic:claude-sonnet-4-0"

    # Structured output
    output_type:
      type: "inline"          # or "import" for Python types
      fields:
        severity:
          type: "str"
          description: "Issue severity"
        issues:
          type: "list[str]"
          description: "Found issues"

    # Core behavior
    system_prompts:
      - "You analyze code for potential issues and improvements."

    # Session & History
    session:
      name: "analysis_session"
      since: "1h"                     # Only load messages from the last hour
      roles: ["user", "assistant"]    # Only specific message types

    # Toolsets (available tool groups)
    toolsets:
      - type: agent_management        # Enables delegation
      - type: resource_access         # Enables resource loading

    # Knowledge sources
    knowledge:
      paths: ["docs/**/*.md"]         # Glob patterns for files
      resources:
        - type: "repository"
          url: "https://github.com/user/repo"
      prompts:
        - type: "file"
          path: "prompts/analysis.txt"

    # MCP server integration
    mcp_servers:
      - type: "stdio"
        command: "python"
        args: ["-m", "mcp_server"]
        env:
          DEBUG: "1"
      - "python -m other_server"      # shorthand syntax

    # Worker agents (specialists)
    workers:
      - type: agent
        name: "formatter"
        reset_history_on_run: true
        pass_message_history: false
      - "linter"                      # shorthand syntax

    # Message forwarding
    connections:
      - type: node
        name: "reporter"
        connection_type: "run"        # "run" | "context" | "forward"
        priority: 1
        queued: true
        queue_strategy: "latest"
        transform: "my_module.transform_func"
        wait_for_completion: true
        filter_condition:             # When to forward messages
          type: "word_match"
          words: ["error", "warning"]
          case_sensitive: false
        stop_condition:               # When to disconnect
          type: "message_count"
          max_messages: 100
          count_mode: "total"         # or "per_agent"
        exit_condition:               # When to exit the application
          type: "cost_limit"
          max_cost: 10.0

    # Event triggers
    triggers:
      - type: "file"
        name: "code_change"
        paths: ["src/**/*.py"]
        extensions: [".py"]
        debounce: 1000                # ms

teams:
  # Complex workflows via YAML
  full_pipeline:
    mode: sequential
    members:
      - analyzer
      - planner
    connections:
      - type: node
        name: final_reviewer
        wait_for_completion: true
      - type: file
        path: "reports/{date}_workflow.txt"

# Response type definitions
responses:
  AnalysisResult:
    response_schema:
      type: "inline"
      description: "Code analysis result format"
      fields:
        severity: {type: "str"}
        issues: {type: "list[str]"}
  ComplexResult:
    type: "import"
    import_path: "myapp.types.ComplexResult"

# Storage configuration
storage:
  providers:
    - type: "sql"
      url: "sqlite:///history.db"
      pool_size: 5
    - type: "text_file"
      path: "logs/chat.log"
      format: "chronological"
  log_messages: true
  log_conversations: true
  log_commands: true

# Pre-defined jobs
jobs:
  analyze_code:
    name: "Code Analysis"
    description: "Analyze code quality"
    prompt: "Analyze this code: {code}"
    required_return_type: "AnalysisResult"
    knowledge:
      paths: ["src/**/*.py"]
    tools: ["analyze_complexity", "run_linter"]
```
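The `fallback` meta-model above tries models in sequence until one succeeds. Conceptually, it behaves like this pure-Python sketch; the function names and the simulated failure are hypothetical, not the library's API:

```python
def run_with_fallback(models: list[str], call) -> str:
    """Try each model in order; return the first successful answer."""
    last_error: Exception | None = None
    for model in models:
        try:
            return call(model)
        except RuntimeError as err:  # e.g. provider outage or rate limit
            last_error = err
    raise last_error  # every model failed

# Simulate the config above: the primary model fails, the fallback answers.
def fake_call(model: str) -> str:
    if model == "openai:gpt-5":
        raise RuntimeError("rate limited")
    return f"answer from {model}"

result = run_with_fallback(["openai:gpt-5", "openai:gpt-5-nano"], fake_call)
# result == "answer from openai:gpt-5-nano"
```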
You can use an agents manifest in multiple ways:

- Run it using the CLI:

  ```bash
  llmling-agent run --config agents.yml my_agent "Some prompt"
  ```

- Start watch mode and only react to triggers:

  ```bash
  llmling-agent watch --config agents.yml
  ```
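The `debounce: 1000` setting on file triggers means that bursts of change events within one second collapse into a single trigger. A rough sketch of that behavior (not the library's actual watcher implementation):

```python
class Debouncer:
    """Fire at most once per interval; later events inside the window are swallowed."""

    def __init__(self, interval_s: float):
        self.interval_s = interval_s
        self.last_fired = float("-inf")

    def should_fire(self, now: float) -> bool:
        if now - self.last_fired >= self.interval_s:
            self.last_fired = now
            return True
        return False

debouncer = Debouncer(1.0)  # 1000 ms
# Four file-change events at t = 0.0, 0.2, 0.5 and 1.5 seconds
fired = [debouncer.should_fire(t) for t in (0.0, 0.2, 0.5, 1.5)]
# fired == [True, False, False, True]
```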
Agent Pool: Multi-Agent Coordination
The AgentPool allows multiple agents to work together on tasks, including external ACP-enabled agents like Claude Code, Codex, or Goose. Here's a practical example of parallel file downloading:
```yaml
# agents.yml
agents:
  file_getter:
    type: native
    model: openai:gpt-5-mini
    toolsets:
      - type: file_access  # includes download_file, read_file, list_directory
    system_prompts:
      - |
        You are a download specialist. Just use the download_file tool
        and report its results. No explanations needed.

  overseer:
    type: native
    toolsets:
      - type: agent_management  # Enables delegation and agent discovery tools
    model: openai:gpt-5-mini
    system_prompts:
      - |
        You coordinate downloads using available agents.
        1. Check out the available agents and assign each of them the download task
        2. Report the results.
```
Programmatic Usage:
```python
import asyncio

from llmling_agent.delegation import AgentPool

async def main():
    async with AgentPool("agents.yml") as pool:
        # First we create two agents based on the file_getter template
        file_getter_1 = pool.get_agent("file_getter")
        file_getter_2 = pool.get_agent("file_getter")
        # Then we form a team and execute the task
        team = file_getter_1 & file_getter_2
        responses = await team.run_parallel("Download https://example.com/file.zip")

        # Or let the overseer orchestrate using its capabilities
        overseer = pool.get_agent("overseer")
        result = await overseer.run(
            "Download https://example.com/file.zip by delegating to all workers available!"
        )

asyncio.run(main())
```
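`team.run_parallel` fans the same prompt out to every team member at once. Stripped of the framework, that is essentially an `asyncio.gather` over the members; the worker coroutines here are stand-ins, not real agents:

```python
import asyncio

async def run_parallel(members, prompt: str) -> list[str]:
    """Send the same prompt to every member concurrently; results keep member order."""
    return await asyncio.gather(*(member(prompt) for member in members))

async def worker_a(prompt: str) -> str:
    return f"worker_a handled: {prompt}"

async def worker_b(prompt: str) -> str:
    return f"worker_b handled: {prompt}"

responses = asyncio.run(run_parallel([worker_a, worker_b], "Download file.zip"))
# responses == ["worker_a handled: Download file.zip", "worker_b handled: Download file.zip"]
```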
External ACP Agents
You can also integrate external ACP-enabled agents into your pool via YAML configuration:
```yaml
# agents.yml
agents:
  claude:
    type: acp
    provider: claude
    display_name: "Claude Code"
    description: "Claude Code through ACP"

  goose:
    type: acp
    provider: goose
    display_name: "Goose"
    description: "Block's Goose agent through ACP"

  coordinator:
    type: native
    model: openai:gpt-5-mini
    toolsets:
      - type: agent_management  # Enables delegation to ACP agents
```
```python
async with AgentPool("agents.yml") as pool:
    # Access external ACP agents just like regular agents
    claude = pool.get_agent("claude")
    result = await claude.run("Refactor this code")
```
See the ACP Integration documentation for supported agents and configuration options.
The framework provides three types of message nodes:
- Agents: Individual LLM-powered actors

  ```python
  # Single agent processing
  analyzer = pool.get_agent("analyzer")
  result = await analyzer.run("analyze this")
  ```

- Teams: Groups for parallel execution

  ```python
  # Create a team using the & operator
  team = analyzer & planner & executor
  results = await team.run("handle this task")
  ```

- TeamRuns: Sequential execution chains

  ```python
  # Create a chain using the | operator
  chain = analyzer | planner | executor
  results = await chain.run("process in sequence")
  ```
The beauty of this system is that these nodes are completely composable:
```python
def process_text(text: str) -> str:
    return text.upper()

# Nested structures work naturally
team_1 = analyzer & planner             # Team
team_2 = validator & reporter           # Another team
chain = team_1 | process_text | team_2  # Teams and callables in a chain

# Complex workflows become intuitive
(analyzer & planner) | validator  # Team followed by validator
team_1 | (team_2 & agent_3)       # Chain with parallel components

# Every node has the same core interface
async for message in node.run_iter("prompt"):
    print(message.content)

# Monitoring works the same for all types
print(f"Messages: {node.stats.message_count}")
print(f"Cost: ${node.stats.total_cost:.2f}")
```
(Note: the operator overloading is just syntactic sugar. In general, teams should be created using `pool.create_team()` / `pool.create_team_run()` or `agent.connect_to()` / `team.connect_to()`.)
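For illustration, the `&` / `|` sugar can be reproduced with Python's operator protocol. This toy class only tracks the composed structure and does no execution; it is not the framework's implementation:

```python
class Node:
    """Toy node: & builds a parallel team, | builds a sequential chain."""

    def __init__(self, name: str):
        self.name = name

    def __and__(self, other: "Node") -> "Node":
        return Node(f"({self.name} & {other.name})")

    def __or__(self, other: "Node") -> "Node":
        return Node(f"({self.name} | {other.name})")

analyzer, planner, validator = Node("analyzer"), Node("planner"), Node("validator")
combo = (analyzer & planner) | validator
# combo.name == "((analyzer & planner) | validator)"
```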
All message nodes support the same execution patterns:
```python
# Single execution
result = await node.run("prompt")

# Streaming
async for event in node.run_stream("prompt"):
    print(event)

# Nested teams work naturally
team_1 = analyzer & planner    # First team
team_2 = validator & reporter  # Second team
parallel_team = Team([team_1, agent_3, team_2])  # Team containing teams!

# This means you can create sophisticated structures:
result = await parallel_team.run("analyze this")  # Will execute:
# - team_1 (analyzer & planner) in parallel
# - agent_3 in parallel
# - team_2 (validator & reporter) in parallel

# And still use all the standard patterns:
async for msg in parallel_team.run_iter("prompt"):
    print(msg.content)

# With full monitoring functionality:
print(f"Total cost: ${parallel_team.stats.total_cost:.2f}")
```
This unified system makes it easy to:
- Build complex workflows
- Monitor message flow
- Compose nodes in any combination
- Use consistent patterns across all node types
Each message in the system carries content, metadata, and execution information, providing a consistent interface across all types of interactions. See Message System for details.
Advanced Connection Features
Connections between agents are highly configurable and support various patterns:
```python
# Basic connection in shorthand form
connection = agent_a >> agent_b  # Forward all messages

# Extended setup: queued connection (manual processing)
connection = agent_a.connect_to(
    agent_b,
    queued=True,
    queue_strategy="latest",  # or "concat", "buffer"
)
# Messages can queue up now
await connection.trigger(optional_additional_prompt)  # Process queued messages sequentially

# Filtered connection (example: forward only messages containing a keyword)
connection = agent_a.connect_to(
    agent_b,
    filter_condition=lambda ctx: "keyword" in ctx.message.content,
)

# Conditional disconnection (example: disconnect after a cost limit)
connection = agent_a.connect_to(
    agent_b,
    stop_condition=lambda ctx: ctx.stats.total_cost > 1.0,
)

# Message transformations
async def transform_message(message: str) -> str:
    return f"Transformed: {message}"

connection = agent_a.connect_to(agent_b, transform=transform_message)

# Connection statistics
print(f"Messages processed: {connection.stats.message_count}")
print(f"Total tokens: {connection.stats.token_count}")
print(f"Total cost: ${connection.stats.total_cost:.2f}")
```
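The three `queue_strategy` values differ in what reaches the target when the queue is processed. Sketched in plain Python under assumed semantics ("latest" keeps only the newest message, "concat" joins them into one, "buffer" delivers all of them):

```python
def apply_queue_strategy(queued: list[str], strategy: str) -> list[str]:
    """Reduce the queued messages according to the chosen strategy (assumed semantics)."""
    if strategy == "latest":
        return queued[-1:]          # only the newest message
    if strategy == "concat":
        return ["\n".join(queued)]  # one combined message
    if strategy == "buffer":
        return list(queued)         # every message, in order
    raise ValueError(f"unknown strategy: {strategy}")

messages = ["first", "second", "third"]
latest = apply_queue_strategy(messages, "latest")  # ["third"]
concat = apply_queue_strategy(messages, "concat")  # ["first\nsecond\nthird"]
buffer = apply_queue_strategy(messages, "buffer")  # ["first", "second", "third"]
```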
The two basic programmatic patterns of this library are:
- Tree-like workflows (hierarchical):
  ```python
  # Can be modeled purely with teams/chains using & and |
  team_a = agent1 & agent2  # Parallel branch 1
  team_b = agent3 & agent4  # Parallel branch 2
  chain = preprocessor | team_a | postprocessor  # Sequential with team
  nested = Team([chain, team_b])  # Hierarchical nesting
  ```
- DAG (Directed Acyclic Graph) workflows:
  ```python
  # Needs explicit connections for non-tree patterns
  analyzer = Agent("analyzer")
  planner = Agent("planner")
  executor = Agent("executor")
  validator = Agent("validator")

  # Can't model this with just teams - need explicit connections
  analyzer.connect_to(planner)
  analyzer.connect_to(executor)   # Same source to multiple targets
  planner.connect_to(validator)
  executor.connect_to(validator)  # Multiple sources to same target
  validator.connect_to(executor)  # Cyclic connections
  ```
Both connection styles can be set up intuitively for both teams and agents in the YAML file.
You can also use LLMling-models for more sophisticated human-in-the-loop integration:
- Remote human operators via network
- Hybrid human-AI workflows
- Input streaming support
- Custom UI integration
Multi-Modal Support
Handle images and PDFs alongside text (depending on provider / model support):
```python
import pathlib

from llmling_agent import Agent

async with Agent(...) as agent:
    result = await agent.run("What's in this image?", pathlib.Path("image.jpg"))
    result = await agent.run("What's in this PDF?", pathlib.Path("document.pdf"))
```
Command System
Extensive slash commands available when used via ACP:
```text
/list-tools             # Show available tools
/enable-tool tool_name  # Enable specific tool
/connect other_agent    # Forward results
/model gpt-5            # Switch models
/history search "query" # Search conversation
/stats                  # Show usage statistics
```
Storage & Analytics
All interactions are tracked via (multiple) configurable storage providers, and the stored information can be queried via the CLI.
```bash
# View recent conversations
llmling-agent history show
llmling-agent history show --period 24h        # Last 24 hours
llmling-agent history show --query "database"  # Search content

# View usage statistics
llmling-agent history stats                    # Basic stats
llmling-agent history stats --group-by model   # Model usage
llmling-agent history stats --group-by day     # Daily breakdown
```
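`--group-by` style statistics boil down to aggregating stored message records by a key. A sketch with hand-made records; the record fields here are invented for illustration, not the storage provider's actual schema:

```python
from collections import Counter

# Invented message records, roughly as a storage provider might return them
records = [
    {"model": "openai:gpt-5", "tokens": 120},
    {"model": "openai:gpt-5-nano", "tokens": 40},
    {"model": "openai:gpt-5", "tokens": 60},
]

# Equivalent of grouping by model: sum token usage per model name
tokens_by_model = Counter()
for record in records:
    tokens_by_model[record["model"]] += record["tokens"]
# tokens_by_model["openai:gpt-5"] == 180
```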