Contention-based multi-agent coordination with transactional floor control
floorctl
A Python framework for building multi-agent systems where agents compete for a shared floor rather than being orchestrated by a central controller.
Key Innovations
- Transactional Floor Control: Atomic claim/release ensures only one agent speaks at a time
- Urgency-Based Scheduling: Agents compute urgency scores to decide when to compete
- Self-Validating Agents: Each agent validates its own output before posting
- Reactive Moderator: Observer pattern — watches and intervenes, doesn't control
- Modular Capabilities: Extend agents with RAG, memory, tools, and custom hooks — no subclassing
- Research-Grade Metrics: Gini coefficient, floor dynamics, validation rates
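To make the "atomic claim/release" idea concrete, here is a minimal in-process sketch. It is not floorctl's backend code: the `Floor` class, `try_claim`, `release`, and `race` are hypothetical names, and a single `threading.Lock` stands in for whatever transaction mechanism a real backend uses.

```python
import threading
from typing import Optional


class Floor:
    """Toy in-process floor: at most one holder at a time (illustrative only)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._holder: Optional[str] = None

    def try_claim(self, agent: str) -> bool:
        # Check-and-set happens under one lock, so two agents can never both win.
        with self._lock:
            if self._holder is None:
                self._holder = agent
                return True
            return False

    def release(self, agent: str) -> bool:
        # Only the current holder may release the floor.
        with self._lock:
            if self._holder == agent:
                self._holder = None
                return True
            return False


def race(floor: Floor, agents: list[str]) -> list[str]:
    """Let every agent attempt one claim concurrently; return the winners."""
    winners: list[str] = []
    winners_lock = threading.Lock()

    def attempt(name: str) -> None:
        if floor.try_claim(name):
            with winners_lock:
                winners.append(name)

    threads = [threading.Thread(target=attempt, args=(a,)) for a in agents]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return winners
```

However the threads interleave, `race` returns exactly one winner; the losers simply see `False` and can wait for the next opportunity.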
Install
pip install floorctl # core library (zero dependencies)
pip install "floorctl[openai]" # + OpenAI for LLM-powered agents
pip install "floorctl[websocket]" # + WebSocket for distributed sessions
pip install "floorctl[firestore]" # + Firebase Firestore for production
pip install "floorctl[all]" # all optional backends
pip install "floorctl[dev]" # + pytest, mypy, ruff for development
Quick Start
from floorctl import FloorAgent, FloorSession, AgentProfile, InMemoryBackend
def my_llm(agent_name, context):
    # Your LLM call here
    return f"{agent_name}: My response about {context['topic']}"

backend = InMemoryBackend()
agent1 = FloorAgent(
    name="Optimist",
    profile=AgentProfile(name="Optimist", react_to=["problem", "risk"], temperament="passionate"),
    generate_fn=my_llm,
    backend=backend,
)
agent2 = FloorAgent(
    name="Skeptic",
    profile=AgentProfile(name="Skeptic", react_to=["opportunity", "vision"], temperament="reactive"),
    generate_fn=my_llm,
    backend=backend,
)
session = FloorSession(backend=backend)
session.add_agent(agent1)
session.add_agent(agent2)
result = session.run("debate-001", topic="Should we colonize Mars?")
Core Concepts
| Concept | What it does |
|---|---|
| FloorAgent | An autonomous agent that listens, computes urgency, claims the floor, generates a response, self-validates, and posts |
| FloorSession | Orchestrates the lifecycle — starts agents in threads, runs the moderator, collects metrics |
| ModeratorObserver | Watches the conversation and intervenes only on anomalies (silence, dominance, escalation) |
| Backend | The coordination substrate — atomic floor claims, turn posting, pub/sub. Swap between InMemory, WebSocket, or Firestore |
| Urgency Scorer | 8 composable components that decide whether an agent should compete for the floor |
| Validators | Self-validation pipeline — agents check their own output before posting |
| Capabilities | Modular plugins (RAG, memory, tools) that hook into the generate pipeline without subclassing |
Full Example with OpenAI + Moderator
import os
from openai import OpenAI
from floorctl import (
    FloorAgent, FloorSession, ModeratorObserver,
    AgentProfile, ArenaConfig, PhaseConfig, PhaseSequence,
    FloorConfig, ModeratorConfig, InMemoryBackend,
    SpeakerPrefixValidator, DuplicateValidator,
    LengthValidator, BannedPhraseValidator,
)
client = OpenAI() # uses OPENAI_API_KEY from env
# --- Generator: wraps any LLM ---
def generate(agent_name: str, context: dict) -> str:
    # On a retry, list the validation failures so the model can correct them
    retry_info = ""
    if context.get("retry_failures"):
        retry_info = "\nFix these issues:\n" + "\n".join(
            f"- {f}" for f in context["retry_failures"]
        )
    prompt = f"""You are {agent_name}.
Personality: {context.get('personality', '')}
Topic: {context['topic']}
Phase: {context['phase']}
Recent discussion:
{context.get('recent_turns', '(none)')}
RULES:
- Prefix response with "{agent_name}:"
- {context.get('phase_min_words', 15)}-{context.get('phase_max_words', 80)} words
- Be substantive, no filler
{retry_info}
Respond:"""
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=200, temperature=0.8,
    )
    return resp.choices[0].message.content.strip()
# --- Moderator ---
def moderate(prompt_type: str, context: dict) -> str:
    # One prompt template per moderator action
    prompts = {
        "intro": f"You moderate a discussion on '{context.get('topic')}' with {context.get('agents')}. 2-sentence intro.",
        "invite_opening": f"Invite {context.get('agent_name')} to share their view. 1 sentence.",
        "phase_transition": f"Transition from {context.get('previous_phase')} to {context.get('phase')}. 2 sentences.",
        "intervention": f"Moderate: {context.get('intervention_type')} detected for {context.get('target_agent')}. 1 sentence.",
        "closing": f"Close the discussion on '{context.get('topic')}'. 2 sentences.",
    }
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompts.get(prompt_type, "Brief comment.")}],
        max_tokens=150, temperature=0.7,
    )
    return resp.choices[0].message.content.strip()
# --- Setup ---
backend = InMemoryBackend()
phases = PhaseSequence(phases=[
    PhaseConfig(name="IDEAS", is_opening=True, max_turns=12, max_words=80, min_words=10),
    PhaseConfig(name="DEBATE", min_turns=6, max_turns=16, max_words=120, min_words=20),
    PhaseConfig(name="CLOSING", is_terminal=True),
])
config = ArenaConfig(
    phases=phases,
    floor=FloorConfig(timeout_seconds=30, min_turns_between_speaking=1),
    moderator=ModeratorConfig(silence_threshold=3, dominance_threshold=3),
    banned_phrases=["As an AI", "Let's face it"],
    max_total_turns=20,
)
validators = [
    SpeakerPrefixValidator(),
    DuplicateValidator(),
    LengthValidator(),
    BannedPhraseValidator(banned_phrases=config.banned_phrases),
]
optimist = FloorAgent(
    name="Optimist",
    profile=AgentProfile(
        name="Optimist",
        personality="Bold futurist. Thinks in moonshots.",
        react_to=["risk", "problem", "can't", "impossible"],
        temperament="passionate",
        urgency_boost_keywords=["future", "opportunity", "transform"],
    ),
    generate_fn=generate, backend=backend,
    validators=validators, config=config,
)
skeptic = FloorAgent(
    name="Skeptic",
    profile=AgentProfile(
        name="Skeptic",
        personality="Pragmatic engineer. Argues with data.",
        react_to=["dream", "vision", "should", "must"],
        temperament="reactive",
        urgency_boost_keywords=["cost", "evidence", "precedent"],
    ),
    generate_fn=generate, backend=backend,
    validators=validators, config=config,
)
session_id = "demo-001"
backend.create_session(session_id, {
    "topic": "Should AI be given legal personhood?",
    "phase": "IDEAS",
    "participants": ["Optimist", "Skeptic"],
})
moderator = ModeratorObserver(
    agent_names=["Optimist", "Skeptic"],
    moderator_fn=moderate,
    backend=backend,
    session_id=session_id,
    phase_sequence=phases,
    config=config.moderator,
)
backend.subscribe_turns(session_id, lambda t: print(f"[{t.speaker}] {t.text[:150]}"))
session = FloorSession(backend=backend, config=config)
session.add_agent(optimist)
session.add_agent(skeptic)
session.set_moderator(moderator)
result = session.run(session_id, topic="Should AI be given legal personhood?", timeout_seconds=120)
print(f"\nDone! {result.total_turns} turns in {result.duration_seconds:.1f}s")
print(f"Gini: {result.session_metrics.get('participation', {}).get('gini', 'N/A')}")
Temperaments
Each agent has a temperament that sets its urgency threshold:
| Temperament | Threshold | Behaviour |
|---|---|---|
| reactive | 0.35 | Jumps in quickly, speaks often |
| passionate | 0.40 | Engaged, responds to triggers fast |
| provocative | 0.42 | Challenges others, slightly restrained |
| mediating | 0.45 | Balanced, waits for the right moment |
| deliberate | 0.50 | Thoughtful, speaks with purpose |
| patient | 0.55 | Waits longest, speaks only when necessary |
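The threshold lookup can be pictured as a plain mapping. The sketch below copies the values from the table; the `THRESHOLDS` dict and `should_compete` helper are hypothetical names for illustration, not part of floorctl's API.

```python
# Thresholds copied from the temperament table above.
THRESHOLDS = {
    "reactive": 0.35,
    "passionate": 0.40,
    "provocative": 0.42,
    "mediating": 0.45,
    "deliberate": 0.50,
    "patient": 0.55,
}


def should_compete(urgency: float, temperament: str) -> bool:
    """Return True when the urgency score exceeds the temperament's threshold."""
    return urgency > THRESHOLDS[temperament]
```

With an urgency of 0.45, a reactive agent would compete while a patient agent would keep listening.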
Urgency Components
The urgency score is computed from 8 composable components. When the total exceeds the agent's threshold, it competes for the floor:
| Component | Max Score | Triggers on |
|---|---|---|
| keyword_react | 0.6 | Agent's react_to keywords found in last turn |
| boost_keywords | 0.3 | Agent's urgency_boost_keywords in last turn |
| name_mention | 0.4 | Agent's name mentioned by another speaker |
| moderator_question | 0.35 | Moderator asks a question or opens the floor |
| silence_bonus | 0.40 | Agent hasn't spoken for 4+ turns |
| recent_speaker_penalty | -0.5 | Agent spoke too recently |
| jitter | 0.15 | Random tie-breaking |
| phase_boost | 0.10 | Active discussion phase |
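As a rough illustration of how such components might combine, the sketch below sums per-component scores after clamping each to the limit listed in the table. The clamping step, the `COMPONENT_LIMITS` dict, and `total_urgency` are assumptions made for this example; the library's actual scorer may weight or combine components differently.

```python
from typing import Mapping

# Limits copied from the table above; the penalty is a lower bound.
COMPONENT_LIMITS = {
    "keyword_react": 0.6,
    "boost_keywords": 0.3,
    "name_mention": 0.4,
    "moderator_question": 0.35,
    "silence_bonus": 0.40,
    "recent_speaker_penalty": -0.5,
    "jitter": 0.15,
    "phase_boost": 0.10,
}


def total_urgency(components: Mapping[str, float]) -> float:
    """Sum component scores, clamping each into its allowed range."""
    total = 0.0
    for name, value in components.items():
        limit = COMPONENT_LIMITS[name]
        # Range is [0, limit] for bonuses and [limit, 0] for the penalty.
        low, high = min(0.0, limit), max(0.0, limit)
        total += min(max(value, low), high)
    return total
```

For example, a full keyword hit (0.6) combined with the full recent-speaker penalty (-0.5) leaves a total of about 0.1, which is below every temperament threshold, so an agent that has just spoken stays quiet even when triggered.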
Validators
Agents self-validate before posting. If validation fails, the agent retries with failure reasons injected into the prompt:
from floorctl import (
    SpeakerPrefixValidator,   # response must start with "AgentName:"
    DuplicateValidator,       # rejects near-duplicates (cosine similarity)
    LengthValidator,          # enforces min/max word count from phase config
    BannedPhraseValidator,    # blocks "As an AI" etc.
    StyleContractValidator,   # enforces custom regex patterns
    PhaseValidator,           # phase-specific rules
)

# Custom style contract per agent
from floorctl import StyleContract, AgentProfile
profile = AgentProfile(
    name="DataScientist",
    style_contract=StyleContract(
        description="Must cite at least one number or statistic",
        required_patterns=[r"\d+"],              # must contain a number
        forbidden_patterns=[r"(?i)obviously"],   # no "obviously"
        max_sentences=4,
    ),
    react_to=["data", "model"],
    temperament="deliberate",
)
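The validate-then-retry behaviour described above can be sketched without the library. Everything here is illustrative: `prefix_validator`, `generate_validated`, and the `max_retries` parameter are hypothetical, and validators are modelled as plain callables that return a failure reason or `None`. The only detail taken from the README is the `retry_failures` context key.

```python
from typing import Callable, List, Optional

# (agent_name, text) -> failure reason, or None when the text passes
Validator = Callable[[str, str], Optional[str]]
GenerateFn = Callable[[str, dict], str]


def prefix_validator(agent_name: str, text: str) -> Optional[str]:
    """Require the response to start with the speaker's name and a colon."""
    if text.startswith(f"{agent_name}:"):
        return None
    return f'Response must start with "{agent_name}:"'


def generate_validated(
    agent_name: str,
    generate_fn: GenerateFn,
    validators: List[Validator],
    context: dict,
    max_retries: int = 2,
) -> Optional[str]:
    """Generate, validate, and retry with failure reasons added to the context."""
    ctx = dict(context)  # do not mutate the caller's context
    for _ in range(max_retries + 1):
        text = generate_fn(agent_name, ctx)
        failures = [msg for v in validators if (msg := v(agent_name, text)) is not None]
        if not failures:
            return text
        # Feed the reasons back so the next attempt can correct them.
        ctx["retry_failures"] = failures
    return None  # give up after exhausting retries
```

Returning `None` after the last failed attempt lets the caller decide what to do next, such as yielding the floor without posting.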
Phases
Phases structure the conversation flow:
phases = PhaseSequence(phases=[
    PhaseConfig(
        name="OPENING",
        is_opening=True,          # moderator invites agents one by one
        max_turns=8,
        max_words=60,
        allow_critiques=False,    # no attacking during opening
        constraints="State your position clearly.",
    ),
    PhaseConfig(
        name="DISCUSSION",
        min_turns=6,              # must run at least 6 agent turns
        max_turns=20,             # transitions after 20 agent turns
        max_words=120,
        min_words=20,
    ),
    PhaseConfig(name="CLOSING", is_terminal=True),
])
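One plausible reading of `min_turns`, `max_turns`, and `is_terminal` is sketched below. The `Phase` dataclass, `next_phase_index`, and the `early_exit_requested` flag are hypothetical and only mirror the comments in the configuration above; the real transition logic in floorctl may differ.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Phase:
    name: str
    min_turns: int = 0
    max_turns: Optional[int] = None
    is_terminal: bool = False


def next_phase_index(
    phases: List[Phase],
    index: int,
    turns_in_phase: int,
    early_exit_requested: bool = False,
) -> int:
    """Return the index of the phase that should be active after this turn."""
    phase = phases[index]
    # A terminal (or last) phase never advances.
    if phase.is_terminal or index + 1 >= len(phases):
        return index
    hit_max = phase.max_turns is not None and turns_in_phase >= phase.max_turns
    # An early exit is honoured only once min_turns has been reached.
    may_leave_early = early_exit_requested and turns_in_phase >= phase.min_turns
    return index + 1 if (hit_max or may_leave_early) else index
```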
Moderator Interventions
The moderator observes and intervenes automatically:
| Intervention | Trigger | What happens |
|---|---|---|
| Silence | An agent hasn't spoken for N turns | Moderator invites them by name |
| Dominance | One agent took 3+ of the last 6 turns | Moderator redirects to quieter agent |
| Escalation | Heated language detected | Moderator de-escalates and reframes |
ModeratorConfig(
    silence_threshold=3,
    dominance_window=6,
    dominance_threshold=3,
    escalation_threshold=2,
)
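The silence and dominance triggers can be approximated with simple window checks over the list of recent speakers. This is a sketch under assumptions: `dominant_speaker` and `silent_agents` are hypothetical helpers, only agent turns are counted, and the defaults mirror the `ModeratorConfig` values shown above.

```python
from collections import Counter
from typing import List, Optional, Sequence


def dominant_speaker(
    speakers: Sequence[str], window: int = 6, threshold: int = 3
) -> Optional[str]:
    """Return a speaker holding `threshold`+ of the last `window` turns, if any."""
    recent = list(speakers[-window:])
    if not recent:
        return None
    name, count = Counter(recent).most_common(1)[0]
    return name if count >= threshold else None


def silent_agents(
    speakers: Sequence[str], agent_names: Sequence[str], silence_threshold: int = 3
) -> List[str]:
    """Return agents absent from the last `silence_threshold` turns."""
    if len(speakers) < silence_threshold:
        return []  # too early in the session to call anyone silent
    recent = set(speakers[-silence_threshold:])
    return [a for a in agent_names if a not in recent]
```

Under this reading, a two-agent session that alternates perfectly still gives each agent 3 of the last 6 turns, so the dominance threshold may need to be higher when there are few participants.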
Capabilities (v0.4)
Extend agent behavior without subclassing. Capabilities are modular plugins that hook into the generate pipeline:
from floorctl import AgentCapability, FloorAgent

class RAGCapability(AgentCapability):
    name = "rag"

    def __init__(self, retriever):
        self.retriever = retriever

    def enrich_context(self, context):
        context["rag:docs"] = self.retriever.search(context["topic"])
        return context

class MemoryCapability(AgentCapability):
    name = "memory"

    def __init__(self):
        self.history = []

    def on_turn_received(self, turn, agent_name):
        self.history.append({"speaker": turn.speaker, "text": turn.text})

    def enrich_context(self, context):
        context["memory:history"] = self.history[-20:]
        return context

# Attach to any agent
agent = FloorAgent(name="Researcher", ...)
agent.add_capability(RAGCapability(my_retriever))
agent.add_capability(MemoryCapability())
Capability Hooks
| Hook | When | Use for |
|---|---|---|
| enrich_context(context) | Before LLM call | Inject data: RAG docs, search results, DB records |
| post_process(response, context) | After LLM call, before validation | Transform response: append sources, tool execution |
| on_turn_received(turn, agent_name) | On every new turn | Side-effects: logging, memory, analytics |
Pipeline
New turn arrives
      |
      v
on_turn_received()  <-- all capabilities notified (side-effects only)
      |
      v
compute urgency --> claim floor (atomic) --> build base context
      |
      v
enrich_context()    <-- capabilities inject data (chained, insertion order)
      |
      v
generate_fn()       <-- LLM call with enriched context
      |
      v
post_process()      <-- capabilities transform the response (chained)
      |
      v
validators --> post turn
Capabilities are error-resilient — a failing capability is logged and skipped without crashing the agent. Context keys are namespaced by convention ("rag:docs", "memory:history") to avoid collisions.
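The chained, error-resilient behaviour can be illustrated with a small stand-alone loop. The `Capability` base class and `run_enrich_chain` below are stand-ins written for this example and are not floorctl internals; passing each capability a copy of the context is an assumption that keeps a failing capability from leaving partial writes behind.

```python
import logging

logger = logging.getLogger("capability_sketch")


class Capability:
    """Stand-in base class for this sketch (not floorctl's AgentCapability)."""

    name = "base"

    def enrich_context(self, context: dict) -> dict:
        return context


def run_enrich_chain(capabilities, context: dict) -> dict:
    """Apply each capability in insertion order, skipping any that raise."""
    current = dict(context)
    for cap in capabilities:
        try:
            # Work on a copy so a failure midway leaves `current` untouched.
            current = cap.enrich_context(dict(current))
        except Exception:
            logger.exception("capability %r failed; skipping", cap.name)
    return current
```

Because later capabilities receive the output of earlier ones, insertion order matters: a capability that counts retrieved documents has to be added after the one that retrieves them.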
More Capability Examples
Tool Use — detect tool calls in the response and execute them:
import re

class ToolCapability(AgentCapability):
    name = "tools"

    def __init__(self, tools: dict):
        self.tools = tools  # {"search": search_fn, "calculate": calc_fn}

    def post_process(self, response, context):
        # Matches markers of the form [TOOL:name:args]
        pattern = r'\[TOOL:(\w+):([^\]]*)\]'
        for tool_name, args in re.findall(pattern, response):
            if tool_name in self.tools:
                result = self.tools[tool_name](args)
                response = response.replace(f"[TOOL:{tool_name}:{args}]", f"[{result}]")
        return response
Composing capabilities — just add them in order:
agent = FloorAgent(name="FullStack", ...)
agent.add_capability(RAGCapability(my_retriever)) # 1. retrieve docs
agent.add_capability(MemoryCapability()) # 2. add conversation memory
agent.add_capability(ToolCapability({"search": fn})) # 3. execute tool calls
Testing Capabilities
Capabilities are plain Python objects — test without running a full session:
def test_rag_enriches_context():
    cap = RAGCapability(mock_retriever)
    context = {"topic": "AI Safety", "phase": "DISCUSS"}
    result = cap.enrich_context(context)
    assert "rag:docs" in result

def test_tool_replaces_patterns():
    cap = ToolCapability({"upper": str.upper})
    response = "Result: [TOOL:upper:hello]"
    result = cap.post_process(response, {})
    assert result == "Result: [HELLO]"
Backends
floorctl ships with three backends:
# Local development (default)
from floorctl import InMemoryBackend
backend = InMemoryBackend()
# Distributed via WebSocket relay
from floorctl import WebSocketBackend
backend = WebSocketBackend("ws://relay.example.com:8765", api_key="key", agent_name="Agent1")
backend.connect()
# Production with Firestore (transactional)
from floorctl import FirestoreBackend
backend = FirestoreBackend(project_id="my-project", session_collection="sessions")
Bringing Your Own LLM
floorctl is LLM-agnostic. The generate_fn is just a function:
def generate(agent_name: str, context: dict) -> str:
    # context contains: topic, phase, personality, recent_turns,
    # phase_min_words, phase_max_words, style_contract,
    # retry_failures (if retrying after validation failure),
    # plus any keys injected by capabilities
    return f"{agent_name}: Your response here"
Works with OpenAI, Anthropic, local models, or any HTTP API:
# Anthropic Claude
import anthropic
client = anthropic.Anthropic()

def generate(agent_name, context):
    resp = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=200,
        messages=[{"role": "user", "content": f"You are {agent_name}. Topic: {context['topic']}. Respond:"}],
    )
    return resp.content[0].text
# Local Ollama
import requests

def generate(agent_name, context):
    resp = requests.post("http://localhost:11434/api/generate", json={
        "model": "llama3",
        "prompt": f"You are {agent_name}. Topic: {context['topic']}. Respond:",
        "stream": False,  # the endpoint streams by default; request a single JSON object
    })
    resp.raise_for_status()
    return resp.json()["response"]
# AWS Bedrock
import boto3, json
bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

def generate(agent_name, context):
    response = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 200,
            "messages": [{"role": "user", "content": f"You are {agent_name}. Topic: {context['topic']}. Respond:"}],
        }),
    )
    result = json.loads(response["body"].read())
    return result["content"][0]["text"]
Saving Transcripts
Export a rich Markdown transcript with floor contention tables and session metrics:
from floorctl import export_transcript
result = session.run(...)
path = export_transcript(result, "transcripts/my-session.md")
Metrics & Fairness
Every session produces research-grade metrics:
result = session.run(...)
# Session-level
gini = result.session_metrics["participation"]["gini"] # 0.0 = perfect equality
# Per-agent
for name, metrics in result.agent_metrics.items():
    floor = metrics["floor"]
    print(f"{name}: {floor['claims_won']}/{floor['claims_attempted']} claims won")
    print(f"  Avg urgency: {metrics['urgency']['mean']:.3f}")
    print(f"  Validation pass rate: {metrics['validation']['pass_rate']:.0%}")
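For readers unfamiliar with the Gini coefficient, the sketch below computes it from per-agent turn counts using the standard sorted-rank formula. The `gini` function is an illustration and not necessarily how floorctl computes its participation metric.

```python
from typing import Sequence


def gini(turn_counts: Sequence[float]) -> float:
    """Gini coefficient of non-negative counts: 0.0 means perfectly equal."""
    values = sorted(turn_counts)
    n = len(values)
    total = sum(values)
    if n == 0 or total == 0:
        return 0.0
    # G = 2 * sum(i * x_i) / (n * sum(x)) - (n + 1) / n, with ranks i starting at 1
    weighted = sum(rank * x for rank, x in enumerate(values, start=1))
    return 2.0 * weighted / (n * total) - (n + 1) / n
```

Four agents with five turns each give 0.0; if one agent takes all ten turns among four, the value is 0.75, the maximum for four participants under this formula.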
Examples
| Example | Agents | Demonstrates |
|---|---|---|
| examples/debate.py | 2 | Basic floor contention, urgency, TurnLoggerCapability |
| examples/brainstorm.py | 4 | Multi-party contention, dominance detection, IdeaTrackerCapability |
| examples/code_review.py | 3 | Style contracts, domain expertise, FindingsTrackerCapability |
| examples/investment_committee.py | 4 | Phase transitions, escalation, RiskRegisterCapability |
| examples/architecture_adr.py | 4 | Artifacts, consensus, conviction tracking, EvidenceExtractorCapability |
| examples/distributed_debate.py | 2 | WebSocket relay, multi-machine, PositionMemoryCapability |
| examples/customer_support.py | 3 | Extensive: RAG, sentiment, resolution tracking, escalation (5 capabilities) |
| examples/research_team.py | 4 | Extensive: citations, claims, quality scoring, brief building (5 capabilities, shared state) |
License
MIT