floorctl

Contention-based multi-agent coordination with transactional floor control.

A Python framework for building multi-agent systems where agents compete for a shared floor rather than being orchestrated by a central controller.

Key Innovations

  • Transactional Floor Control: Atomic claim/release ensures only one agent speaks at a time
  • Urgency-Based Scheduling: Agents compute urgency scores to decide when to compete
  • Self-Validating Agents: Each agent validates its own output before posting
  • Reactive Moderator: Observer pattern — watches and intervenes, doesn't control
  • Modular Capabilities: Extend agents with RAG, memory, tools, and custom hooks — no subclassing
  • Research-Grade Metrics: Gini coefficient, floor dynamics, validation rates
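The first bullet, atomic claim/release, can be illustrated with a minimal sketch. This is not floorctl's implementation: the `Floor` class, its `try_claim`/`release` methods, and the `race` helper are hypothetical names, and a `threading.Lock` stands in for whatever transaction the real backend uses.

```python
import threading
from typing import List, Optional


class Floor:
    """Toy floor with at most one holder at a time (illustrative only)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._holder: Optional[str] = None

    def try_claim(self, agent: str) -> bool:
        # The check and the write happen under one lock,
        # so two concurrent claims can never both succeed.
        with self._lock:
            if self._holder is None:
                self._holder = agent
                return True
            return False

    def release(self, agent: str) -> bool:
        # Only the current holder may release the floor.
        with self._lock:
            if self._holder == agent:
                self._holder = None
                return True
            return False


def race(floor: Floor, agents: List[str]) -> List[str]:
    """Start one thread per agent and return the agents whose claim succeeded."""
    winners: List[str] = []
    winners_lock = threading.Lock()

    def attempt(name: str) -> None:
        if floor.try_claim(name):
            with winners_lock:
                winners.append(name)

    threads = [threading.Thread(target=attempt, args=(a,)) for a in agents]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return winners
```

However the threads interleave, `race` returns exactly one winner; the losers simply see `False` and can try again after the holder releases.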

Install

pip install floorctl               # core library (zero dependencies)
pip install "floorctl[openai]"     # + OpenAI for LLM-powered agents
pip install "floorctl[websocket]"  # + WebSocket for distributed sessions
pip install "floorctl[firestore]"  # + Firebase Firestore for production
pip install "floorctl[all]"        # all optional backends
pip install "floorctl[dev]"        # + pytest, mypy, ruff for development

Quick Start

from floorctl import FloorAgent, FloorSession, AgentProfile, InMemoryBackend

def my_llm(agent_name, context):
    # Your LLM call here
    return f"{agent_name}: My response about {context['topic']}"

backend = InMemoryBackend()

agent1 = FloorAgent(
    name="Optimist",
    profile=AgentProfile(name="Optimist", react_to=["problem", "risk"], temperament="passionate"),
    generate_fn=my_llm,
    backend=backend,
)
agent2 = FloorAgent(
    name="Skeptic",
    profile=AgentProfile(name="Skeptic", react_to=["opportunity", "vision"], temperament="reactive"),
    generate_fn=my_llm,
    backend=backend,
)

session = FloorSession(backend=backend)
session.add_agent(agent1)
session.add_agent(agent2)
result = session.run("debate-001", topic="Should we colonize Mars?")

Core Concepts

| Concept | What it does |
| --- | --- |
| FloorAgent | An autonomous agent that listens, computes urgency, claims the floor, generates a response, self-validates, and posts |
| FloorSession | Orchestrates the lifecycle: starts agents in threads, runs the moderator, collects metrics |
| ModeratorObserver | Watches the conversation and intervenes only on anomalies (silence, dominance, escalation) |
| Backend | The coordination substrate: atomic floor claims, turn posting, pub/sub. Swap between InMemory, WebSocket, or Firestore |
| Urgency Scorer | 8 composable components that decide whether an agent should compete for the floor |
| Validators | Self-validation pipeline: agents check their own output before posting |
| Capabilities | Modular plugins (RAG, memory, tools) that hook into the generate pipeline without subclassing |

Full Example with OpenAI + Moderator

from openai import OpenAI
from floorctl import (
    FloorAgent, FloorSession, ModeratorObserver,
    AgentProfile, ArenaConfig, PhaseConfig, PhaseSequence,
    FloorConfig, ModeratorConfig, InMemoryBackend,
    SpeakerPrefixValidator, DuplicateValidator,
    LengthValidator, BannedPhraseValidator,
)

client = OpenAI()  # uses OPENAI_API_KEY from env

# --- Generator: wraps any LLM ---
def generate(agent_name: str, context: dict) -> str:
    retry_info = ""
    if context.get("retry_failures"):
        retry_info = "\nFix these issues:\n" + "\n".join(
            f"- {f}" for f in context["retry_failures"]
        )

    prompt = f"""You are {agent_name}.
Personality: {context.get('personality', '')}
Topic: {context['topic']}
Phase: {context['phase']}

Recent discussion:
{context.get('recent_turns', '(none)')}

RULES:
- Prefix response with "{agent_name}:"
- {context.get('phase_min_words', 15)}-{context.get('phase_max_words', 80)} words
- Be substantive, no filler
{retry_info}

Respond:"""

    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=200, temperature=0.8,
    )
    return (resp.choices[0].message.content or "").strip()  # content may be None


# --- Moderator ---
def moderate(prompt_type: str, context: dict) -> str:
    prompts = {
        "intro": f"You moderate a discussion on '{context.get('topic')}' with {context.get('agents')}. 2-sentence intro.",
        "invite_opening": f"Invite {context.get('agent_name')} to share their view. 1 sentence.",
        "phase_transition": f"Transition from {context.get('previous_phase')} to {context.get('phase')}. 2 sentences.",
        "intervention": f"Moderate: {context.get('intervention_type')} detected for {context.get('target_agent')}. 1 sentence.",
        "closing": f"Close the discussion on '{context.get('topic')}'. 2 sentences.",
    }
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompts.get(prompt_type, "Brief comment.")}],
        max_tokens=150, temperature=0.7,
    )
    return (resp.choices[0].message.content or "").strip()  # content may be None


# --- Setup ---
backend = InMemoryBackend()

phases = PhaseSequence(phases=[
    PhaseConfig(name="IDEAS", is_opening=True, max_turns=12, max_words=80, min_words=10),
    PhaseConfig(name="DEBATE", min_turns=6, max_turns=16, max_words=120, min_words=20),
    PhaseConfig(name="CLOSING", is_terminal=True),
])

config = ArenaConfig(
    phases=phases,
    floor=FloorConfig(timeout_seconds=30, min_turns_between_speaking=1),
    moderator=ModeratorConfig(silence_threshold=3, dominance_threshold=3),
    banned_phrases=["As an AI", "Let's face it"],
    max_total_turns=20,
)

validators = [
    SpeakerPrefixValidator(),
    DuplicateValidator(),
    LengthValidator(),
    BannedPhraseValidator(banned_phrases=config.banned_phrases),
]

optimist = FloorAgent(
    name="Optimist",
    profile=AgentProfile(
        name="Optimist",
        personality="Bold futurist. Thinks in moonshots.",
        react_to=["risk", "problem", "can't", "impossible"],
        temperament="passionate",
        urgency_boost_keywords=["future", "opportunity", "transform"],
    ),
    generate_fn=generate, backend=backend,
    validators=validators, config=config,
)

skeptic = FloorAgent(
    name="Skeptic",
    profile=AgentProfile(
        name="Skeptic",
        personality="Pragmatic engineer. Argues with data.",
        react_to=["dream", "vision", "should", "must"],
        temperament="reactive",
        urgency_boost_keywords=["cost", "evidence", "precedent"],
    ),
    generate_fn=generate, backend=backend,
    validators=validators, config=config,
)

session_id = "demo-001"
backend.create_session(session_id, {
    "topic": "Should AI be given legal personhood?",
    "phase": "IDEAS",
    "participants": ["Optimist", "Skeptic"],
})

moderator = ModeratorObserver(
    agent_names=["Optimist", "Skeptic"],
    moderator_fn=moderate,
    backend=backend,
    session_id=session_id,
    phase_sequence=phases,
    config=config.moderator,
)

backend.subscribe_turns(session_id, lambda t: print(f"[{t.speaker}] {t.text[:150]}"))

session = FloorSession(backend=backend, config=config)
session.add_agent(optimist)
session.add_agent(skeptic)
session.set_moderator(moderator)

result = session.run(session_id, topic="Should AI be given legal personhood?", timeout_seconds=120)

print(f"\nDone! {result.total_turns} turns in {result.duration_seconds:.1f}s")
print(f"Gini: {result.session_metrics.get('participation', {}).get('gini', 'N/A')}")

Temperaments

Each agent has a temperament that sets its urgency threshold:

| Temperament | Threshold | Behaviour |
| --- | --- | --- |
| reactive | 0.35 | Jumps in quickly, speaks often |
| passionate | 0.40 | Engaged, responds to triggers fast |
| provocative | 0.42 | Challenges others, slightly restrained |
| mediating | 0.45 | Balanced, waits for the right moment |
| deliberate | 0.50 | Thoughtful, speaks with purpose |
| patient | 0.55 | Waits longest, speaks only when necessary |

Urgency Components

The urgency score is the sum of 8 composable components. When the total exceeds the agent's temperament threshold, the agent competes for the floor:

| Component | Max score | Triggers on |
| --- | --- | --- |
| keyword_react | 0.60 | Agent's react_to keywords found in last turn |
| boost_keywords | 0.30 | Agent's urgency_boost_keywords in last turn |
| name_mention | 0.40 | Agent's name mentioned by another speaker |
| moderator_question | 0.35 | Moderator asks a question or opens the floor |
| silence_bonus | 0.40 | Agent hasn't spoken for 4+ turns |
| recent_speaker_penalty | -0.50 | Agent spoke too recently |
| jitter | 0.15 | Random tie-breaking |
| phase_boost | 0.10 | Active discussion phase |
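To make the scoring concrete, here is a minimal sketch of how such components might add up against a temperament threshold. It is not floorctl's scorer. The function names, the dict-based profile, and the rule that each triggered component contributes its full maximum are assumptions, and `moderator_question` and `phase_boost` are left out for brevity.

```python
# Thresholds copied from the Temperaments table.
THRESHOLDS = {
    "reactive": 0.35, "passionate": 0.40, "provocative": 0.42,
    "mediating": 0.45, "deliberate": 0.50, "patient": 0.55,
}


def urgency_score(profile: dict, last_turn: dict,
                  turns_since_spoke: int, jitter: float = 0.0) -> float:
    """Sum a subset of the components; each adds its table maximum when triggered.

    turns_since_spoke == 0 means the agent spoke the most recent turn
    (an assumed reading of "spoke too recently").
    """
    text = last_turn["text"].lower()
    score = 0.0
    if any(k.lower() in text for k in profile.get("react_to", [])):
        score += 0.60                      # keyword_react
    if any(k.lower() in text for k in profile.get("urgency_boost_keywords", [])):
        score += 0.30                      # boost_keywords
    if last_turn["speaker"] != profile["name"] and profile["name"].lower() in text:
        score += 0.40                      # name_mention (by another speaker)
    if turns_since_spoke >= 4:
        score += 0.40                      # silence_bonus
    if turns_since_spoke == 0:
        score -= 0.50                      # recent_speaker_penalty
    return score + jitter                  # caller supplies e.g. random.uniform(0, 0.15)


def should_compete(profile: dict, last_turn: dict,
                   turns_since_spoke: int, jitter: float = 0.0) -> bool:
    """Compete only when the total exceeds the temperament threshold."""
    score = urgency_score(profile, last_turn, turns_since_spoke, jitter)
    return score > THRESHOLDS[profile["temperament"]]
```

Passing the jitter in as an argument keeps the function deterministic, which makes it easy to test; a live agent would draw it at random on each evaluation.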

Validators

Agents self-validate before posting. If validation fails, the agent retries with failure reasons injected into the prompt:

from floorctl import (
    SpeakerPrefixValidator,   # response must start with "AgentName:"
    DuplicateValidator,       # rejects near-duplicates (cosine similarity)
    LengthValidator,          # enforces min/max word count from phase config
    BannedPhraseValidator,    # blocks "As an AI" etc.
    StyleContractValidator,   # enforces custom regex patterns
    PhaseValidator,           # phase-specific rules
)

# Custom style contract per agent
from floorctl import StyleContract, AgentProfile

profile = AgentProfile(
    name="DataScientist",
    style_contract=StyleContract(
        description="Must cite at least one number or statistic",
        required_patterns=[r"\d+"],           # must contain a number
        forbidden_patterns=[r"(?i)obviously"], # no "obviously"
        max_sentences=4,
    ),
    react_to=["data", "model"],
    temperament="deliberate",
)
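The retry behaviour described at the top of this section can be sketched as a small loop. This is an illustration under assumptions, not floorctl's pipeline: validators are modelled as plain functions that return a failure reason or `None`, and `generate_validated` and `max_attempts` are hypothetical names. Only the `retry_failures` context key comes from this README.

```python
import re
from typing import Callable, List, Optional

# (agent_name, text) -> failure reason, or None when the check passes
Validator = Callable[[str, str], Optional[str]]


def prefix_validator(agent_name: str, text: str) -> Optional[str]:
    if text.startswith(f"{agent_name}:"):
        return None
    return f'Response must start with "{agent_name}:"'


def number_validator(agent_name: str, text: str) -> Optional[str]:
    # Same idea as required_patterns=[r"\d+"] in the style contract above.
    return None if re.search(r"\d+", text) else "Must cite at least one number"


def generate_validated(agent_name: str, generate_fn, validators: List[Validator],
                       context: dict, max_attempts: int = 3) -> Optional[str]:
    """Generate, validate, and retry with the failure reasons injected."""
    ctx = dict(context)  # do not mutate the caller's context
    for _ in range(max_attempts):
        text = generate_fn(agent_name, ctx)
        failures = [reason for v in validators
                    if (reason := v(agent_name, text)) is not None]
        if not failures:
            return text
        ctx["retry_failures"] = failures  # the generator can read these on retry
    return None  # give up after max_attempts
```

A `generate_fn` like the one in the full example reads `context["retry_failures"]` and appends the reasons to its prompt, which is what lets the second attempt correct the first.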

Phases

Phases structure the conversation flow:

phases = PhaseSequence(phases=[
    PhaseConfig(
        name="OPENING",
        is_opening=True,         # moderator invites agents one by one
        max_turns=8,
        max_words=60,
        allow_critiques=False,   # no attacking during opening
        constraints="State your position clearly.",
    ),
    PhaseConfig(
        name="DISCUSSION",
        min_turns=6,             # must run at least 6 agent turns
        max_turns=20,            # transitions after 20 agent turns
        max_words=120,
        min_words=20,
    ),
    PhaseConfig(name="CLOSING", is_terminal=True),
])
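One plausible reading of `min_turns` and `max_turns` is sketched below. The `Phase` dataclass, `next_phase_index`, and the `wants_to_advance` flag are hypothetical; the README does not say what triggers a transition between `min_turns` and `max_turns`, so the flag stands in for that decision.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Phase:
    name: str
    min_turns: int = 0
    max_turns: Optional[int] = None
    is_terminal: bool = False


def next_phase_index(phases: List[Phase], current: int, turns_in_phase: int,
                     wants_to_advance: bool = False) -> int:
    """Return the index of the phase that should be active next."""
    phase = phases[current]
    if phase.is_terminal or current == len(phases) - 1:
        return current                      # nothing comes after a terminal phase
    if phase.max_turns is not None and turns_in_phase >= phase.max_turns:
        return current + 1                  # hard cap reached: always advance
    if wants_to_advance and turns_in_phase >= phase.min_turns:
        return current + 1                  # early advance only after min_turns
    return current


phases = [
    Phase("OPENING", max_turns=8),
    Phase("DISCUSSION", min_turns=6, max_turns=20),
    Phase("CLOSING", is_terminal=True),
]
```

With these phases, a request to leave DISCUSSION after 3 turns is ignored, a request after 6 turns succeeds, and turn 20 forces the move to CLOSING regardless.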

Moderator Interventions

The moderator observes and intervenes automatically:

| Intervention | Trigger | What happens |
| --- | --- | --- |
| Silence | An agent hasn't spoken for N turns | Moderator invites them by name |
| Dominance | One agent took 3+ of the last 6 turns | Moderator redirects to quieter agent |
| Escalation | Heated language detected | Moderator de-escalates and reframes |

ModeratorConfig(
    silence_threshold=3,
    dominance_window=6,
    dominance_threshold=3,
    escalation_threshold=2,
)
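The silence and dominance triggers can be approximated with a few lines over the list of recent speakers. This is a sketch, not the moderator's actual logic: `dominant_speaker` and `silent_agents` are hypothetical helpers whose parameters mirror the `ModeratorConfig` fields above, and escalation detection is omitted because the README does not describe how heated language is scored.

```python
from collections import Counter
from typing import List, Optional


def dominant_speaker(speakers: List[str], window: int = 6,
                     threshold: int = 3) -> Optional[str]:
    """Return the agent who took `threshold`+ of the last `window` turns, if any."""
    recent = speakers[-window:]
    if not recent:
        return None
    name, count = Counter(recent).most_common(1)[0]
    return name if count >= threshold else None


def silent_agents(speakers: List[str], agent_names: List[str],
                  silence_threshold: int = 3) -> List[str]:
    """Return agents with at least `silence_threshold` turns since they last spoke."""
    silent = []
    for name in agent_names:
        if name in speakers:
            turns_since = speakers[::-1].index(name)  # 0 = spoke the last turn
        else:
            turns_since = len(speakers)               # never spoke at all
        if turns_since >= silence_threshold:
            silent.append(name)
    return silent
```

One consequence worth checking against your own settings: with only two agents alternating perfectly, each takes 3 of the last 6 turns, so a `dominance_threshold` of 3 over a window of 6 would flag normal turn-taking under this reading.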

Capabilities (v0.4)

Extend agent behavior without subclassing. Capabilities are modular plugins that hook into the generate pipeline:

from floorctl import AgentCapability, FloorAgent

class RAGCapability(AgentCapability):
    name = "rag"

    def __init__(self, retriever):
        self.retriever = retriever

    def enrich_context(self, context):
        context["rag:docs"] = self.retriever.search(context["topic"])
        return context

class MemoryCapability(AgentCapability):
    name = "memory"

    def __init__(self):
        self.history = []

    def on_turn_received(self, turn, agent_name):
        self.history.append({"speaker": turn.speaker, "text": turn.text})

    def enrich_context(self, context):
        context["memory:history"] = self.history[-20:]
        return context

# Attach to any agent
agent = FloorAgent(name="Researcher", ...)
agent.add_capability(RAGCapability(my_retriever))
agent.add_capability(MemoryCapability())

Capability Hooks

| Hook | When | Use for |
| --- | --- | --- |
| enrich_context(context) | Before LLM call | Inject data: RAG docs, search results, DB records |
| post_process(response, context) | After LLM call, before validation | Transform response: append sources, tool execution |
| on_turn_received(turn, agent_name) | On every new turn | Side-effects: logging, memory, analytics |

Pipeline

New turn arrives
  |
  v
on_turn_received()        <-- all capabilities notified (side-effects only)
  |
  v
compute urgency  -->  claim floor (atomic)  -->  build base context
  |
  v
enrich_context()          <-- capabilities inject data (chained, insertion order)
  |
  v
generate_fn()             <-- LLM call with enriched context
  |
  v
post_process()            <-- capabilities transform the response (chained)
  |
  v
validators  -->  post turn

Capabilities are error-resilient — a failing capability is logged and skipped without crashing the agent. Context keys are namespaced by convention ("rag:docs", "memory:history") to avoid collisions.
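A minimal sketch of what "chained, insertion order" and "logged and skipped" could look like for `enrich_context` follows. The `run_enrich_chain` helper and the three toy capability classes are hypothetical and do not subclass floorctl's `AgentCapability`; handing each capability a copy of the context is a design choice of this sketch, not documented library behaviour.

```python
import logging

logger = logging.getLogger("capability_sketch")


def run_enrich_chain(capabilities, context: dict) -> dict:
    """Apply each capability's enrich_context in order, skipping failures."""
    for cap in capabilities:
        try:
            # Hand over a shallow copy so a capability that fails halfway
            # cannot leave partial edits in the shared context.
            result = cap.enrich_context(dict(context))
            if isinstance(result, dict):
                context = result
        except Exception:
            logger.exception("capability %r failed; skipping",
                             getattr(cap, "name", cap))
    return context


class AddsDocs:
    name = "rag"

    def enrich_context(self, context):
        context["rag:docs"] = ["doc-1"]
        return context


class Broken:
    name = "broken"

    def enrich_context(self, context):
        context["broken:partial"] = True
        raise RuntimeError("retriever offline")


class AddsHistory:
    name = "memory"

    def enrich_context(self, context):
        context["memory:history"] = []
        return context


result = run_enrich_chain([AddsDocs(), Broken(), AddsHistory()], {"topic": "AI Safety"})
```

Here `result` carries the `rag:docs` and `memory:history` keys, while the failing capability's partial write is discarded and its error is logged.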

More Capability Examples

Tool Use — detect tool calls in the response and execute them:

import re

class ToolCapability(AgentCapability):
    name = "tools"

    def __init__(self, tools: dict):
        self.tools = tools  # {"search": search_fn, "calculate": calc_fn}

    def post_process(self, response, context):
        pattern = r'\[TOOL:(\w+):([^\]]*)\]'
        for tool_name, args in re.findall(pattern, response):
            if tool_name in self.tools:
                result = self.tools[tool_name](args)
                response = response.replace(f"[TOOL:{tool_name}:{args}]", f"[{result}]")
        return response

Composing capabilities — just add them in order:

agent = FloorAgent(name="FullStack", ...)
agent.add_capability(RAGCapability(my_retriever))     # 1. retrieve docs
agent.add_capability(MemoryCapability())               # 2. add conversation memory
agent.add_capability(ToolCapability({"search": fn}))   # 3. execute tool calls

Testing Capabilities

Capabilities are plain Python objects — test without running a full session:

from unittest.mock import Mock

def test_rag_enriches_context():
    mock_retriever = Mock()  # any object with a .search() method works
    cap = RAGCapability(mock_retriever)
    context = {"topic": "AI Safety", "phase": "DISCUSS"}
    result = cap.enrich_context(context)
    assert "rag:docs" in result

def test_tool_replaces_patterns():
    cap = ToolCapability({"upper": str.upper})
    response = "Result: [TOOL:upper:hello]"
    result = cap.post_process(response, {})
    assert result == "Result: [HELLO]"

Backends

floorctl ships with three backends:

# Local development (default)
from floorctl import InMemoryBackend
backend = InMemoryBackend()

# Distributed via WebSocket relay
from floorctl import WebSocketBackend
backend = WebSocketBackend("ws://relay.example.com:8765", api_key="key", agent_name="Agent1")
backend.connect()

# Production with Firestore (transactional)
from floorctl import FirestoreBackend
backend = FirestoreBackend(project_id="my-project", session_collection="sessions")

Bringing Your Own LLM

floorctl is LLM-agnostic. The generate_fn is just a function:

def generate(agent_name: str, context: dict) -> str:
    # context contains: topic, phase, personality, recent_turns,
    #                   phase_min_words, phase_max_words, style_contract,
    #                   retry_failures (if retrying after validation failure),
    #                   plus any keys injected by capabilities
    return f"{agent_name}: Your response here"
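Because the contract is just `(agent_name, context) -> str`, a deterministic stand-in can replace the LLM during tests or offline runs. The sketch below is one way to write it. `make_stub_generate` and the `canned` mapping are hypothetical; only the `topic` and `retry_failures` context keys are taken from the list above.

```python
from typing import Callable, Dict


def make_stub_generate(canned: Dict[str, str]) -> Callable[[str, dict], str]:
    """Build a generate_fn that returns fixed lines instead of calling an LLM."""

    def generate(agent_name: str, context: dict) -> str:
        failures = context.get("retry_failures") or []
        # Surface retries in the text so they are visible in a transcript.
        note = f" (retry, fixing {len(failures)} issue(s))" if failures else ""
        line = canned.get(agent_name, "I have nothing to add")
        return f"{agent_name}: {line} on {context['topic']}{note}"

    return generate


stub = make_stub_generate({"Optimist": "I see only upside"})
```

The returned `stub` can be passed wherever a `generate_fn` is expected, which keeps test runs fast, free, and repeatable.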

Works with OpenAI, Anthropic, local models, or any HTTP API:

# Anthropic Claude
import anthropic
client = anthropic.Anthropic()

def generate(agent_name, context):
    resp = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=200,
        messages=[{"role": "user", "content": f"You are {agent_name}. Topic: {context['topic']}. Respond:"}],
    )
    return resp.content[0].text

# Local Ollama
import requests

def generate(agent_name, context):
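    resp = requests.post("http://localhost:11434/api/generate", json={
        "model": "llama3",
        "prompt": f"You are {agent_name}. Topic: {context['topic']}. Respond:",
        "stream": False,  # return one JSON object instead of a stream of chunks
    }, timeout=60)
    resp.raise_for_status()
    return resp.json()["response"]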

# AWS Bedrock
import json

import boto3

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

def generate(agent_name, context):
    response = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 200,
            "messages": [{"role": "user", "content": f"You are {agent_name}. Topic: {context['topic']}. Respond:"}],
        }),
    )
    result = json.loads(response["body"].read())
    return result["content"][0]["text"]

Saving Transcripts

Export a rich Markdown transcript with floor contention tables and session metrics:

from floorctl import export_transcript

result = session.run(...)
path = export_transcript(result, "transcripts/my-session.md")

Metrics & Fairness

Every session produces research-grade metrics:

result = session.run(...)

# Session-level
gini = result.session_metrics["participation"]["gini"]  # 0.0 = perfect equality

# Per-agent
for name, metrics in result.agent_metrics.items():
    floor = metrics["floor"]
    print(f"{name}: {floor['claims_won']}/{floor['claims_attempted']} claims won")
    print(f"  Avg urgency: {metrics['urgency']['mean']:.3f}")
    print(f"  Validation pass rate: {metrics['validation']['pass_rate']:.0%}")
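For readers unfamiliar with the Gini coefficient, the sketch below computes it from per-agent turn counts using the standard mean-absolute-difference formula, G = Σᵢ Σⱼ |xᵢ − xⱼ| / (2 · n · Σ x). How floorctl computes its own `gini` value is not shown in this README, so treat this as a reference calculation rather than the library's code; the function name is an assumption.

```python
from typing import Sequence


def gini(counts: Sequence[float]) -> float:
    """Gini coefficient of a list of non-negative counts (0.0 = perfect equality)."""
    n = len(counts)
    total = sum(counts)
    if n == 0 or total == 0:
        return 0.0  # no data or no turns: treat as perfectly equal
    # Sum of absolute differences over all ordered pairs (i, j).
    diff_sum = sum(abs(a - b) for a in counts for b in counts)
    return diff_sum / (2 * n * total)
```

Two agents with 5 turns each give 0.0, while a 10-to-0 split gives 0.5. With n agents the maximum under this formula is (n − 1) / n, so values from sessions of different sizes are not directly comparable.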

Examples

| Example | Agents | Demonstrates |
| --- | --- | --- |
| examples/debate.py | 2 | Basic floor contention, urgency, TurnLoggerCapability |
| examples/brainstorm.py | 4 | Multi-party contention, dominance detection, IdeaTrackerCapability |
| examples/code_review.py | 3 | Style contracts, domain expertise, FindingsTrackerCapability |
| examples/investment_committee.py | 4 | Phase transitions, escalation, RiskRegisterCapability |
| examples/architecture_adr.py | 4 | Artifacts, consensus, conviction tracking, EvidenceExtractorCapability |
| examples/distributed_debate.py | 2 | WebSocket relay, multi-machine, PositionMemoryCapability |
| examples/customer_support.py | 3 | Extensive: RAG, sentiment, resolution tracking, escalation (5 capabilities) |
| examples/research_team.py | 4 | Extensive: citations, claims, quality scoring, brief building (5 capabilities, shared state) |

License

MIT
