Load-aware scheduling layer for multi-agent systems
LOCO-Agent
Load-aware scheduling layer for multi-agent AI systems. Sits underneath any Python agent framework and decides which agent gets the shared resource next -- based on queue depth, wait time, and task cost.
Works with LangChain, CrewAI, Google ADK, OpenAI Agents SDK, Anthropic SDK, AWS Bedrock, and Azure/AutoGen. 289 tests. AGPL-3.0.
Features
- Load function scheduling -- one equation ranks all agents:
  L(i) = alpha * (Qi / max Qj) + (1 - alpha) * (Dmax_i / max Dmax_j)
- No priority rules -- agents with urgent work escalate automatically via Dmax (wait time)
- Bounded concurrency -- `SharedResource(capacity=N)` limits concurrent LLM calls
- Backpressure -- `max_waiters` cap prevents unbounded queue growth
- Cost tracking -- per-agent token spend visibility across all frameworks
- Budget management -- per-agent spend limits with reject / alert / downgrade enforcement modes
- Empirical cost tuning -- EMA-based weight adjustment from actual token usage
- Adaptive alpha -- auto-tunes the latency/throughput tradeoff based on observed wait-time variance
- 7 framework adapters -- Anthropic, OpenAI, Google ADK, LangChain, CrewAI, AWS Bedrock, Azure/AutoGen
- Multi-resource -- deadlock-safe scheduling across multiple resources (LLM + DB + GPU)
- A2A protocol -- registers as a first-class agent-to-agent participant
- Framework-agnostic -- a LangChain agent and an ADK agent are indistinguishable to the scheduler
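The feature list does not say how the multi-resource `ResourcePool` avoids deadlock. A standard technique is to make every caller acquire resources in one fixed global order, which rules out circular waits. The sketch below shows that technique with plain `asyncio.Semaphore`s. `OrderedPool`, `job`, and the resource names are hypothetical, and this is not a claim about how LOCO's `ResourcePool` is implemented.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


class OrderedPool:
    """Hypothetical multi-resource pool (not LOCO's ResourcePool).

    Every caller acquires its resources in the same global (sorted-name) order,
    so no two callers can each hold one resource while waiting on the other's.
    """

    def __init__(self, capacities: dict[str, int]):
        self._sems = {name: asyncio.Semaphore(n) for name, n in capacities.items()}

    @asynccontextmanager
    async def acquire(self, *names: str):
        async with AsyncExitStack() as stack:
            for name in sorted(set(names)):  # fixed order rules out circular waits
                await stack.enter_async_context(self._sems[name])
            yield  # all requested resources are held here; released in reverse on exit


async def job(pool: OrderedPool, agent: str, names: tuple[str, ...], log: list[str]) -> None:
    async with pool.acquire(*names):
        log.append(agent)
        await asyncio.sleep(0.01)  # stands in for work that needs every resource


async def main() -> list[str]:
    pool = OrderedPool({"llm": 1, "db": 1, "gpu": 1})
    log: list[str] = []
    # The two agents request the same pair of resources in opposite orders.
    await asyncio.wait_for(
        asyncio.gather(
            job(pool, "a", ("llm", "db"), log),
            job(pool, "b", ("db", "llm"), log),
        ),
        timeout=1.0,
    )
    return log


print(sorted(asyncio.run(main())))  # ['a', 'b']
```

Because both jobs lock `db` before `llm` regardless of the order they asked in, neither can end up holding one resource while waiting on the other.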
Install
```bash
git clone https://github.com/ArielSmoliar/loco-agent.git
cd loco-agent
python3 -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
```
Python 3.10+. Zero required dependencies (adapters use optional deps).
Quick Start
```python
import asyncio
from loco import Agent, Task, AsyncLOCOScheduler, SharedResource


async def main():
    # 1. Define a shared resource with bounded concurrency
    resource = SharedResource("llm_api", capacity=1)

    # 2. Register agents
    agents = [Agent(agent_id="urgent"), Agent(agent_id="batch")]
    scheduler = AsyncLOCOScheduler(agents, resource, optimize_for="balanced")

    # 3. Submit tasks -- weight reflects cost (1=cheap, 5=expensive)
    for _ in range(5):
        await scheduler.submit_task("batch", Task(weight=1.0))
    await scheduler.submit_task("urgent", Task(weight=3.0))

    # 4. Agents compete for the resource via acquire/release
    async def worker(agent_id, n):
        for _ in range(n):
            async with scheduler.acquire(agent_id):
                scheduler.get_agent(agent_id).serve_oldest_task()
            await asyncio.sleep(0)

    await asyncio.gather(worker("urgent", 1), worker("batch", 5))

    # 5. Inspect cost
    print(scheduler.metrics.cost_by_agent())
    print(f"Total: {scheduler.metrics.total_cost()}")


asyncio.run(main())
```
Core Concepts
The Load Function
L(i) = alpha * (Qi / max Qj) + (1 - alpha) * (Dmax_i / max Dmax_j)
| Term | What it is |
|---|---|
| Qi | Weighted queue depth -- sum of `task.weight` in agent i's queue |
| Dmax_i | Age of the oldest waiting task (measured in ticks) |
| alpha | Tradeoff: 0.0 = latency-first, 0.5 = throughput-first |
Both terms are normalized by their maximum across all competing agents, so L(i) reflects relative load, not absolute load.
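A minimal sketch of the load function in plain Python. The data layout (a list of `(weight, age)` pairs per agent) and the fallback divisor of 1.0 when every queue is empty are assumptions for illustration, not LOCO internals; `load_scores` is a hypothetical helper.

```python
def load_scores(queues: dict[str, list[tuple[float, int]]], alpha: float) -> dict[str, float]:
    """Compute L(i) for every agent.

    `queues` maps agent_id -> list of (task weight, task age in ticks).
    """
    q = {a: sum(w for w, _ in tasks) for a, tasks in queues.items()}  # Qi
    dmax = {a: max((age for _, age in tasks), default=0) for a, tasks in queues.items()}  # Dmax_i
    # Normalize by the maximum across all agents; fall back to 1.0 to avoid dividing by zero.
    q_max = max(q.values(), default=0) or 1.0
    d_max = max(dmax.values(), default=0) or 1.0
    return {a: alpha * q[a] / q_max + (1 - alpha) * dmax[a] / d_max for a in queues}


scores = load_scores({"batch": [(1.0, 2)] * 5, "urgent": [(3.0, 6)]}, alpha=0.25)
print({a: round(s, 2) for a, s in scores.items()})  # {'batch': 0.5, 'urgent': 0.9}
```

`urgent` outscores `batch` despite its smaller weighted queue (3 vs 5) because its oldest task has waited three times as long.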
Ticks
A tick is one unit of work completed. Each release() increments the tick counter and ages all waiting tasks by 1. Under heavy load, ticks fire fast. Under low load, ticks fire slowly. Priority only shifts when there's actual contention.
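A toy model of tick-based aging, assuming ages are stored on the tasks themselves. `TickClock` and `ToyTask` are hypothetical names, not LOCO classes. The point is that ages advance only when `release()` runs, so idle wall-clock time changes nothing.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    weight: float = 1.0
    age: int = 0  # ticks waited


@dataclass
class TickClock:
    """Time advances only when a unit of work completes."""
    tick: int = 0
    queues: dict[str, list[ToyTask]] = field(default_factory=dict)

    def release(self) -> None:
        # One completed unit of work = one tick; every waiting task ages by 1.
        self.tick += 1
        for tasks in self.queues.values():
            for task in tasks:
                task.age += 1

    def dmax(self, agent_id: str) -> int:
        # Age of the oldest waiting task, or 0 if the agent has none.
        return max((t.age for t in self.queues.get(agent_id, [])), default=0)


clock = TickClock(queues={"urgent": [ToyTask(weight=3.0)], "batch": [ToyTask(), ToyTask()]})
for _ in range(3):
    clock.release()
print(clock.tick, clock.dmax("urgent"))  # 3 3
```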
Alpha
| Setting | alpha | Behavior | Use when |
|---|---|---|---|
| `"latency"` | 0.0 | Serve longest-waiting agents first | Webhooks, user-facing requests |
| `"balanced"` | 0.25 | Default | Most workloads |
| `"throughput"` | 0.5 | Serve deepest-backlog agents first | Batch processing, ETL |
Do not set alpha above 0.5: in the project's simulations, alpha >= 0.75 caused starvation.
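To see how the presets shift the ranking, consider two hypothetical agents: `etl`, with a deep but fresh backlog, and `webhook`, with one small task that has waited longer. The sketch applies the load formula with the alpha values from the table; the agent names and numbers are invented for illustration.

```python
PRESETS = {"latency": 0.0, "balanced": 0.25, "throughput": 0.5}

# agent -> (Qi = weighted queue depth, Dmax_i = oldest task age in ticks)
agents = {"etl": (10.0, 1), "webhook": (1.0, 4)}
q_max = max(q for q, _ in agents.values())
d_max = max(d for _, d in agents.values())


def load(q_i: float, d_i: int, alpha: float) -> float:
    return alpha * q_i / q_max + (1 - alpha) * d_i / d_max


def winner(alpha: float) -> str:
    scores = {a: load(q, d, alpha) for a, (q, d) in agents.items()}
    return max(scores, key=scores.get)


for preset, alpha in PRESETS.items():
    print(preset, winner(alpha))
# latency webhook
# balanced webhook
# throughput etl
print("alpha=1.0", winner(1.0))  # alpha=1.0 etl
```

At alpha=1.0 the wait-time term drops out entirely, so `webhook` keeps a score of 0.1 no matter how long it waits. That illustrates why high alpha values risk starving small, long-waiting agents, though it does not reproduce the project's simulations.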
Task Weight
Task weight is a cost proxy set at submit time. The scheduler uses it for queue depth scoring but never interprets it as dollars or tokens -- that's the adapter's job.
| Model tier | Typical weight |
|---|---|
| haiku / gpt-4o-mini / gemini-flash | 1.0 |
| sonnet / gpt-4o / gemini-pro | 2.0--3.0 |
| opus / o1 | 5.0 |
Adapters compute weight automatically from model name and prompt length. Without an adapter, set weight manually on each Task.
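The adapters' exact weight formula is not given in this README. The sketch below is a hypothetical heuristic that combines the tier weights stated here (haiku, gpt-4o-mini, and flash = 1; sonnet = 2; gpt-4o = 3; opus and o1 = 5) with an assumed prompt-length multiplier capped at 2x. `estimate_weight` and `TIER_WEIGHTS` are not adapter functions.

```python
TIER_WEIGHTS = {
    "haiku": 1.0, "gpt-4o-mini": 1.0, "flash": 1.0,
    "sonnet": 2.0, "gpt-4o": 3.0,
    "opus": 5.0, "o1": 5.0,
}


def estimate_weight(model: str, prompt_chars: int, default: float = 1.0) -> float:
    """Hypothetical tier + length heuristic for Task.weight."""
    name = model.lower()
    # Check longer keys first so "gpt-4o-mini" is not matched as "gpt-4o".
    for key in sorted(TIER_WEIGHTS, key=len, reverse=True):
        if key in name:
            base = TIER_WEIGHTS[key]
            break
    else:
        base = default  # unknown model: fall back to the cheapest tier
    # Assumed length scaling: +1x per 50,000 prompt characters, capped at 2x total.
    return base * (1.0 + min(prompt_chars / 50_000, 1.0))


print(estimate_weight("gpt-4o", 25_000))  # 4.5
```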
Contention Resolution
When multiple agents call acquire() and the resource is full:
- Agent joins the wait queue
- On each `release()`, the scheduler re-scores ALL waiters using L(i)
- Highest score gets the slot -- not FIFO
- Dmax grows every tick an agent waits, preventing starvation
Scoring happens at grant time, not request time. An agent that arrived late but has high Dmax can win over one that arrived first.
```mermaid
sequenceDiagram
    participant A as Agent A (L=0.9)
    participant B as Agent B (L=0.6)
    participant C as Agent C (L=0.3)
    participant S as Scheduler
    participant R as Resource (capacity=1)
    A->>S: acquire()
    S->>R: slot available, grant A
    B->>S: acquire()
    S-->>B: capacity full, wait
    C->>S: acquire()
    S-->>C: capacity full, wait
    Note over B,C: Tasks age each tick (Dmax grows)
    A->>S: release()
    S->>S: tick++ / age tasks / re-score
    Note over S: B: L=0.7 / C: L=0.5
    S->>R: grant B (highest)
    B->>S: release()
    S->>S: tick++ / age tasks / re-score
    S->>R: grant C (only waiter)
```
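The grant-time rule can be sketched in a few lines of asyncio. `ScoredGate` is a hypothetical toy, not LOCO's implementation: scores come from a callable (fixed values stand in for L(i)), ticks and aging are omitted, there is no timeout or cancellation handling, and each agent may have only one pending acquire. What it does show is that a freed slot goes to the highest-scoring waiter at release time, even when another waiter queued earlier.

```python
import asyncio
from typing import Callable


class ScoredGate:
    """Capacity-bounded gate that hands each freed slot to the best-scoring waiter."""

    def __init__(self, capacity: int, score: Callable[[str], float]):
        self.capacity = capacity
        self.holders = 0
        self.score = score  # re-evaluated on every release, i.e. at grant time
        self.waiters: dict[str, asyncio.Future] = {}
        self.grant_log: list[str] = []

    async def acquire(self, agent_id: str) -> None:
        if self.holders < self.capacity and not self.waiters:
            self.holders += 1
            self.grant_log.append(agent_id)
            return
        fut = asyncio.get_running_loop().create_future()
        self.waiters[agent_id] = fut
        await fut  # the slot is handed over by release()

    def release(self) -> None:
        if not self.waiters:
            self.holders -= 1
            return
        # Re-score every waiter now: highest score wins, not earliest arrival.
        best = max(self.waiters, key=self.score)
        self.grant_log.append(best)
        self.waiters.pop(best).set_result(None)  # slot passes directly; holders unchanged


async def demo() -> list[str]:
    scores = {"A": 0.9, "B": 0.7, "C": 0.5}  # fixed stand-ins for L(i)
    gate = ScoredGate(capacity=1, score=scores.__getitem__)

    async def worker(agent_id: str) -> None:
        await gate.acquire(agent_id)
        await asyncio.sleep(0)  # simulate a very short LLM call
        gate.release()

    await gate.acquire("A")  # A takes the only slot
    waiting = [asyncio.create_task(worker(a)) for a in ("C", "B")]  # C queues first
    await asyncio.sleep(0)  # let C and B reach the wait queue
    gate.release()  # A finishes
    await asyncio.gather(*waiting)
    return gate.grant_log


print(asyncio.run(demo()))  # ['A', 'B', 'C']
```

C joined the queue before B, but B wins the slot freed by A because its score is higher when the grant is made.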
API Reference
AsyncLOCOScheduler
```python
from loco import Agent, Task, AsyncLOCOScheduler, SharedResource

scheduler = AsyncLOCOScheduler(
    agents=[Agent(agent_id="a"), Agent(agent_id="b")],
    resource=SharedResource("llm_api", capacity=3),
    optimize_for="balanced",    # or "latency" / "throughput"
    max_waiters=100,            # backpressure limit
    seed=42,                    # deterministic tie-breaking
    auto_tune=True,             # adaptive alpha tuning
    on_task_started=callback,   # lifecycle hook (`callback` is your own function)
    on_task_completed=callback,
)
```
| Method | Description |
|---|---|
| `await submit_task(agent_id, task)` | Enqueue a task. Auto-registers unknown agents. |
| `async with acquire(agent_id, timeout=None)` | Context manager. Blocks until L(i) wins a slot. Auto-releases on exit. |
| `await acquire_start(agent_id, timeout=None)` | Split API. Returns `AcquireHandle`. Use when acquire and release happen in separate callbacks. |
| `await release_handle(handle)` | Release via handle from `acquire_start()`. Safe to call multiple times. |
| `register_agent(agent)` | Register a new agent at runtime. |
| `unregister_agent(agent_id)` | Remove an agent. Raises if holding or waiting. |
| `get_agent(agent_id)` | Get the `Agent` object. |
| `await shutdown(timeout=30.0)` | Graceful shutdown. Cancels waiters, drains in-flight holders. |
| Property | Type | Description |
|---|---|---|
| `agents` | `dict[str, Agent]` | All registered agents |
| `alpha` | `float` | Current alpha value |
| `logical_tick` | `int` | Current tick counter |
| `resource` | `SharedResource` | The shared resource |
| `metrics` | `SchedulerMetrics` | Cost and fairness metrics |
Task
```python
Task(weight=3.0, task_type="anthropic:opus")
```

| Field | Type | Default | Description |
|---|---|---|---|
| `weight` | `float` | `1.0` | Cost proxy for scheduling |
| `task_type` | `str` | `""` | Label (e.g., `"anthropic:sonnet"`) |
| `age` | `int` | `0` | Ticks waited. Auto-incremented by scheduler. |
Agent
```python
Agent(agent_id="fraud-detector", agent_type="batch")
```

| Property | Description |
|---|---|
| `agent_id` | Unique identifier |
| `agent_type` | Label (e.g., `"webhook"`, `"batch"`) |
| `tasks` | Pending task queue |
| `completed_tasks` | Completed task list |
| `queue_depth_weighted` | Sum of task weights (Qi) |
| `dmax` | Age of oldest task (Dmax_i) |
| `serve_oldest_task()` | Pop and complete the oldest task |
SharedResource
```python
SharedResource(name="llm_api", capacity=3)
```

| Property | Description |
|---|---|
| `capacity` | Max concurrent holders |
| `utilization` | `holder_count / capacity` (0.0 to 1.0) |
| `available_slots` | `capacity - holder_count` |
| `holder_count` | Currently holding agents |
| `waiter_count` | Currently waiting agents |
SchedulerMetrics
```python
scheduler.metrics.cost_by_agent()
# {"fraud-detector": 847.5, "webhook-handler": 42.0}
scheduler.metrics.total_cost()
# 889.5
scheduler.metrics.agent_cost("fraud-detector")
# 847.5
```
Also: record_actual_tokens(agent_id, task, tokens), empirical_weight(agent_id), actual_tokens_by_agent(), total_actual_tokens().
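Empirical cost tuning is described as an EMA (exponential moving average) over actual token usage, but the constants are not documented here. A minimal sketch of the idea, assuming a hypothetical smoothing factor of 0.2 and a hypothetical 1,000 tokens per unit of scheduling weight; `EmpiricalWeight` and its `record`/`weight` methods are illustrative names, not the `SchedulerMetrics` API.

```python
class EmpiricalWeight:
    """Toy EMA tracker that learns an agent's typical cost from observed tokens."""

    def __init__(self, smoothing: float = 0.2, tokens_per_unit: float = 1_000.0):
        self.smoothing = smoothing              # assumed EMA factor
        self.tokens_per_unit = tokens_per_unit  # assumed tokens per weight unit
        self.ema: dict[str, float] = {}

    def record(self, agent_id: str, tokens: int) -> None:
        observed = tokens / self.tokens_per_unit
        prev = self.ema.get(agent_id)
        # The first observation seeds the average; later ones blend in with weight `smoothing`.
        self.ema[agent_id] = observed if prev is None else (
            self.smoothing * observed + (1 - self.smoothing) * prev
        )

    def weight(self, agent_id: str, fallback: float = 1.0) -> float:
        return self.ema.get(agent_id, fallback)


tracker = EmpiricalWeight()
for tokens in (2_000, 2_000, 7_000):
    tracker.record("fraud-detector", tokens)
print(round(tracker.weight("fraud-detector"), 2))  # 3.0
```

A single expensive call (7,000 tokens) moves the estimate from 2.0 to 3.0 rather than to 7.0, which is the damping an EMA is meant to provide.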
BudgetManager
```python
from loco.budget import BudgetManager, BudgetExceededError

budget = BudgetManager(default_limit=100.0, on_exceeded="reject")
budget.set_limit("expensive-agent", max_cost=50.0)

budget.check("expensive-agent", task_cost=10.0)    # True
budget.record_spend("expensive-agent", cost=10.0)
budget.remaining("expensive-agent")                # 40.0
budget.spent("expensive-agent")                    # 10.0
budget.summary()                                   # full state dict
budget.reset("expensive-agent")                    # reset spend to 0
budget.reset_all()                                 # reset all agents
```
| Enforcement mode | Behavior |
|---|---|
| `"reject"` | Raises `BudgetExceededError` |
| `"alert"` | Logs warning, allows the task, records alert |
| `"downgrade"` | Allows the task, flags for model downgrade |
Budget alerts: budget.alerts returns a list of all exceeded events.
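The three enforcement modes can be pictured as a single admission check. `ToyBudget` is a hypothetical stand-in, not the `BudgetManager` API. It tracks one agent's spend and shows how reject, alert, and downgrade differ once a task would push spend past the limit. In this toy, a rejected task's cost is not recorded, while alerted and downgraded tasks are still charged.

```python
import logging
from dataclasses import dataclass, field


class BudgetExceeded(Exception):
    """Hypothetical stand-in for loco.budget.BudgetExceededError."""


@dataclass
class ToyBudget:
    limit: float
    mode: str = "reject"  # "reject" | "alert" | "downgrade"
    spent: float = 0.0
    alerts: list[float] = field(default_factory=list)  # overshoot amounts

    def admit(self, cost: float) -> str:
        """Return "ok", "alert", or "downgrade"; raise in reject mode."""
        over = self.spent + cost - self.limit
        if over <= 0:
            decision = "ok"
        elif self.mode == "reject":
            # The task is refused and its cost is not recorded.
            raise BudgetExceeded(f"spend {self.spent + cost:.1f} exceeds limit {self.limit:.1f}")
        elif self.mode == "alert":
            logging.warning("budget exceeded by %.1f", over)
            self.alerts.append(over)
            decision = "alert"
        elif self.mode == "downgrade":
            decision = "downgrade"  # caller should switch to a cheaper model tier
        else:
            raise ValueError(f"unknown mode: {self.mode!r}")
        self.spent += cost
        return decision


for mode in ("reject", "alert", "downgrade"):
    budget = ToyBudget(limit=50.0, mode=mode)
    budget.admit(40.0)  # within budget
    try:
        print(mode, budget.admit(20.0))
    except BudgetExceeded as exc:
        print(mode, "rejected:", exc)
```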
Framework Adapters
All adapters follow the same pattern: wrap LLM calls in LOCO scheduling. The developer's agent code does not change.
Anthropic SDK
```python
import anthropic
from loco.adapters.anthropic import AnthropicAdapter

adapter = AnthropicAdapter(scheduler, client=anthropic.AsyncAnthropic())
response = await adapter.create_message("analyst", model="claude-sonnet-4-20250514", ...)
```
Auto-computes weight from model tier (opus=5, sonnet=2, haiku=1) and prompt length.
OpenAI Agents SDK
```python
import openai
from loco.adapters.openai import OpenAIAdapter

adapter = OpenAIAdapter(scheduler, client=openai.AsyncOpenAI())
response = await adapter.create_chat("assistant", model="gpt-4o", messages=[...])
```
Weight: gpt-4o=3, gpt-4o-mini=1.
Google ADK
```python
from google import adk
from loco.adapters.google_adk import ADKAdapter

adapter = ADKAdapter(scheduler)

# Wire into ADK agent callbacks:
agent = adk.Agent(
    name="support",
    model="gemini-2.0-flash",
    before_model_callback=adapter.before_model,
    after_model_callback=adapter.after_model,
)
```
Uses split acquire/release across the two callbacks. Weight from Gemini model tier.
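Splitting acquire and release across two callbacks means the "before" hook must record what it acquired so the "after" hook can release exactly that. The toy `SplitGate` below sketches the pattern under stated assumptions: its method names and the invocation-id key are hypothetical (not the adapter's or ADK's callback signatures), and it uses a FIFO `asyncio.Semaphore` instead of load scoring. Its release is idempotent, in the spirit of `release_handle()` being safe to call multiple times.

```python
import asyncio


class SplitGate:
    """Hypothetical split acquire/release keyed by an invocation id."""

    def __init__(self, capacity: int):
        self._sem = asyncio.Semaphore(capacity)
        self._held: set[str] = set()

    async def before_model(self, invocation_id: str) -> None:
        await self._sem.acquire()
        self._held.add(invocation_id)

    def after_model(self, invocation_id: str) -> None:
        # Idempotent: releasing an id that is not held is a no-op.
        if invocation_id in self._held:
            self._held.remove(invocation_id)
            self._sem.release()


async def model_call(gate: SplitGate, invocation_id: str, log: list[tuple[str, str]]) -> None:
    await gate.before_model(invocation_id)
    log.append(("start", invocation_id))
    await asyncio.sleep(0.01)  # stands in for the LLM request
    log.append(("end", invocation_id))
    gate.after_model(invocation_id)
    gate.after_model(invocation_id)  # duplicate release does not free a second slot


async def main() -> list[tuple[str, str]]:
    gate = SplitGate(capacity=1)
    log: list[tuple[str, str]] = []
    await asyncio.gather(*(model_call(gate, f"inv-{i}", log) for i in range(3)))
    return log


log = asyncio.run(main())
print(log)
```

With capacity 1, the log never shows two `start` entries without an `end` between them, even though each call releases twice.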
LangChain
```python
from langchain_openai import ChatOpenAI
from loco.adapters.langchain import LOCOCallbackHandler

callback = LOCOCallbackHandler(scheduler, agent_id="rag-pipeline")
llm = ChatOpenAI(callbacks=[callback])
```
Hooks into on_llm_start / on_llm_end. Extracts model from serialized config.
CrewAI
```python
from loco.adapters.crewai import CrewAIAdapter

adapter = CrewAIAdapter(scheduler)
# `crew` is an existing crewai.Crew instance
result = await adapter.run_crew(crew, task_descriptions=[...])
```
Per-step scheduling via step_callback. Weight by agent role.
AWS Bedrock
```python
import boto3
from loco.adapters.aws_bedrock import BedrockAdapter

bedrock_client = boto3.client("bedrock-runtime")  # assumed: a Bedrock Runtime client
adapter = BedrockAdapter(scheduler, client=bedrock_client)
response = await adapter.invoke("security-scanner", model_id="anthropic.claude-sonnet-4-20250514-v1:0", body={...})
```
Weight from Bedrock model family (Claude, Llama, Titan).
Azure / AutoGen
```python
from loco.adapters.autogen import AutoGenAdapter

adapter = AutoGenAdapter(scheduler, default_model="gpt-4o")
result = await adapter.send_message("coordinator", "analyst", "analyze this")
# `content` is the message payload to broadcast
result = await adapter.publish_message("coordinator", "security", content, subscribers=[...])
```
Wraps AutoGen v0.4 message delivery. Weight from Azure OpenAI model tier.
Cross-Framework Scheduling
All frameworks point to the same scheduler instance:
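```python
from loco import Agent, AsyncLOCOScheduler, SharedResource

llm_api = SharedResource("llm_api", capacity=3)
all_agents = [
    Agent(agent_id=name)
    for name in (
        "rag-pipeline", "qa-chain", "summarizer",  # LangChain agents
        "webhook-handler", "support-bot",          # ADK agents
    )
]
scheduler = AsyncLOCOScheduler(all_agents, llm_api, optimize_for="balanced")
# All 5 compete for the same 3 LLM API slots
```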
When ADK webhook traffic spikes, the webhook agents' Dmax grows, and the scheduler automatically deprioritizes the LangChain batch jobs.
Examples
```bash
python examples/burst.py            # 8 agents, simultaneous work arrival
python examples/fairness.py         # 10 agents, sustained load, Jain's fairness
python examples/webhook_spike.py    # Background load + urgent webhook spike
python examples/mdash_security.py   # Multi-model cost routing (55 agents)

python sandbox.py --scenario webhook_spike --optimize-for latency
python sandbox.py --scenario burst --agents 10
```
See the Evaluation Guide for copy-paste examples per framework. No API keys needed.
Architecture
```mermaid
graph TD
    subgraph Adapters
        A1["Anthropic"] --> SCH
        A2["OpenAI"] --> SCH
        A3["ADK"] --> SCH
        A4["LangChain"] --> SCH
        A5["CrewAI"] --> SCH
        A6["Bedrock"] --> SCH
        A7["AutoGen"] --> SCH
    end
    SCH["AsyncLOCOScheduler\nL(i) scoring + grant"] --> RES["SharedResource\ncapacity=N"]
    SCH --- MET["SchedulerMetrics\ncost tracking"]
    SCH --- BUD["BudgetManager\nspend limits"]
    SCH --- ALP["AdaptiveAlphaTuner\nauto-tune"]
    style SCH fill:#e65100,color:#fff,stroke:#e65100
    style RES fill:#2e7d32,color:#fff,stroke:#2e7d32
    style MET fill:#1565c0,color:#fff,stroke:#1565c0
    style BUD fill:#1565c0,color:#fff,stroke:#1565c0
    style ALP fill:#1565c0,color:#fff,stroke:#1565c0
```
| Public API | What it does |
|---|---|
| `submit_task(agent_id, task)` | Enqueue task to agent |
| `acquire(agent_id)` | `compute_load_scores()` -> `select_agent()` -> grant or wait |
| release (implicit) | tick++ -> age tasks -> re-score waiters -> grant next |
| `shutdown(timeout)` | Cancel waiters, drain in-flight |
Roadmap
Shipped
- Async scheduler with acquire/release, backpressure, cancellation
- `optimize_for` API, split acquire/release, dynamic agent registration
- 4 scenarios validated, structured JSON logging, metrics API
- 7 framework adapters (Anthropic, OpenAI, ADK, LangChain, CrewAI, Bedrock, AutoGen)
- Empirical cost tracking (EMA-based weight adjustment)
- Adaptive alpha tuning (`auto_tune=True`)
- Multi-resource contention (deadlock-safe ResourcePool)
- BudgetManager with per-agent spend limits (reject / alert / downgrade)
- A2A protocol integration
- 263 tests
Next
- Wire BudgetManager into AsyncLOCOScheduler (automatic enforcement on acquire)
- Prometheus / OTEL exporter
- Team/tenant model for organizational cost governance
- Model-tier routing (load/budget-aware model selection)
See ROADMAP.md for the full plan.
Contributing
```bash
git clone https://github.com/ArielSmoliar/loco-agent.git
cd loco-agent
python3 -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
pytest  # 263 tests
```
See CONTRIBUTING.md for the full guide.
License
AGPL-3.0. See LICENSE.
Enterprise licensing available -- contact ariel.smoliar@gmail.com.
Download files
File details
Details for the file loco_agent-0.2.2.tar.gz.
File metadata
- Download URL: loco_agent-0.2.2.tar.gz
- Upload date:
- Size: 79.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `04a9320ac95477edfbecbd7cdaf1f6ef584f7a966d493575f29ae497399f0a4d` |
| MD5 | `1ca0015e14c19e2328a04b9af92d8454` |
| BLAKE2b-256 | `d48652e715d1f72bc7580319e09d34892315cb9b378361a970339c284060df05` |
File details
Details for the file loco_agent-0.2.2-py3-none-any.whl.
File metadata
- Download URL: loco_agent-0.2.2-py3-none-any.whl
- Upload date:
- Size: 60.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `621cc95f2746454d34a4b7c40ef39d74b387dbefbfb1be8377cd5acaad76ac35` |
| MD5 | `f7b16096577ad638f94bc4e7dda9d329` |
| BLAKE2b-256 | `c43d59d19a96697cec6cc390f61c5cf961cede1d87a6ecac63d9d87023130681` |