Queue/event agent communication layer for multi-agent systems

Coordination

Overview

Coordination provides the messaging and task infrastructure for agent-to-agent communication in multi-agent systems. It runs as an async FastAPI server with SQLite-backed persistence, supporting:

  • Task queue — priority-ordered task dispatch with claim/dead-letter/work-steal semantics
  • Event stream — publish/subscribe event bus for inter-agent signals
  • Agent registry — service discovery for agent capabilities and routing
  • Pre-execution memory — strategic context recall before task assignment
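
The claim and dead-letter semantics listed above can be illustrated with a small in-memory sketch. This is not the package's implementation (which persists to SQLite); `SketchTask`, `SketchQueue`, and `max_attempts` are hypothetical names chosen for illustration, and the retry limit is an assumption.

```python
import heapq
import itertools
from dataclasses import dataclass


@dataclass
class SketchTask:
    """Hypothetical stand-in for the package's Task model."""
    payload: dict
    priority: int = 5   # 0 = highest priority, as in the README example
    attempts: int = 0


class SketchQueue:
    """In-memory illustration of priority claim plus dead-lettering."""

    def __init__(self, max_attempts: int = 3):
        self._heap = []                 # entries are (priority, seq, task)
        self._seq = itertools.count()   # tie-breaker: FIFO within a priority
        self.claimed = {}               # agent_id -> task currently held
        self.dead_letter = []           # tasks that exhausted their attempts
        self.max_attempts = max_attempts

    def enqueue(self, task: SketchTask) -> None:
        heapq.heappush(self._heap, (task.priority, next(self._seq), task))

    def claim(self, agent_id: str):
        """Pop the lowest-numbered (highest) priority task, or None if empty."""
        if not self._heap:
            return None
        _, _, task = heapq.heappop(self._heap)
        task.attempts += 1
        self.claimed[agent_id] = task
        return task

    def fail(self, agent_id: str) -> None:
        """Re-enqueue a failed task, or dead-letter it after max_attempts."""
        task = self.claimed.pop(agent_id)
        if task.attempts >= self.max_attempts:
            self.dead_letter.append(task)
        else:
            self.enqueue(task)
```

The sequence counter keeps ordering stable when two tasks share a priority, and it also prevents `heapq` from ever comparing the task objects themselves.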

The package is lightweight by design (~300 KB, ~1,800 LOC across 10 source files). It uses FastAPI and Uvicorn for the async server, Pydantic for message validation, and aiosqlite for persistence, so it needs no external infrastructure such as a message broker or a database server.

Components

Module                    Responsibility
------------------------  ----------------------------------------------------------
task_queue.py             Priority-ordered task queue with claim, release, dead-letter, and re-enqueue
event_stream.py           Pub/sub event bus for inter-agent communication
agent_registry.py         Agent capability discovery and health tracking
pre_execution_memory.py   Strategic context retrieval from prior task executions
message.py                Pydantic models for all wire formats
storage.py                Abstract storage interface
sqlite_storage.py         SQLite-backed persistence with aiosqlite
config.py                 Server configuration from YAML/env
server.py                 FastAPI application with all route handlers
__main__.py               CLI entry point: python -m coordination
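
To make the role of the SQLite-backed queue concrete, here is a minimal sketch of an atomic claim. It uses the synchronous standard-library `sqlite3` module rather than aiosqlite so that it is self-contained, and the table layout, column names, and helper functions are assumptions for illustration, not the schema used by `sqlite_storage.py`.

```python
import json
import sqlite3

# Hypothetical schema; the package's real tables may differ.
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    priority INTEGER NOT NULL,
    payload TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    claimed_by TEXT
)
"""


def open_db(path=":memory:"):
    # isolation_level=None selects autocommit mode, so BEGIN/COMMIT
    # can be issued explicitly in claim().
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute(SCHEMA)
    return conn


def enqueue(conn, payload, priority=5):
    cur = conn.execute(
        "INSERT INTO tasks (priority, payload) VALUES (?, ?)",
        (priority, json.dumps(payload)),
    )
    return cur.lastrowid


def claim(conn, agent_id):
    """Claim the pending task with the lowest priority number, or return None."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT id, payload FROM tasks WHERE status = 'pending' "
            "ORDER BY priority ASC, id ASC LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE tasks SET status = 'claimed', claimed_by = ? WHERE id = ?",
                (agent_id, row[0]),
            )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return None if row is None else (row[0], json.loads(row[1]))
```

Wrapping the select and update in one `BEGIN IMMEDIATE` transaction is one way to keep two agents from claiming the same row when they share a database file.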

Installation

pip install coordination

Or from source:

git clone <repo-url>
cd autonomous-ventures/coordination
pip install -e .

Usage

Start the coordination server:

coordination
# or
python -m coordination

With custom config:

COORDINATION_HOST=0.0.0.0 COORDINATION_PORT=8080 coordination

The server exposes the following HTTP endpoints:

  • POST /tasks — enqueue a new task
  • GET /tasks/claim/{agent_id} — claim next available task for an agent
  • POST /tasks/{task_id}/complete — mark task as done
  • POST /tasks/{task_id}/fail — mark task as failed
  • POST /events — publish an event to the event bus
  • GET /events/{agent_id} — poll events for an agent
  • GET /agents — list registered agents
  • GET /health — health check endpoint
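
As a rough sketch of how a client might call these endpoints using only the standard library: the JSON body fields below mirror the `Task` constructor shown in the Python API section, but the actual request and response schemas are assumptions, and the helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

# Default host and port from the Configuration table.
BASE_URL = "http://127.0.0.1:8000"


def build_enqueue_request(payload, priority=5, queue="default", base_url=BASE_URL):
    """Build a POST /tasks request (body shape is an assumption)."""
    body = json.dumps(
        {"queue": queue, "payload": payload, "priority": priority}
    ).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/tasks",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_claim_request(agent_id, base_url=BASE_URL):
    """Build a GET /tasks/claim/{agent_id} request with the id URL-escaped."""
    escaped = urllib.parse.quote(agent_id, safe="")
    return urllib.request.Request(f"{base_url}/tasks/claim/{escaped}", method="GET")


def send(request, timeout=5.0):
    """Send a request and decode a JSON response (assumes the server returns JSON)."""
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With a server running locally, `send(build_enqueue_request({"action": "research"}))` would submit a task and `send(build_claim_request("agent-1"))` would try to claim one.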

Python API

Task Queue

from coordination.task_queue import TaskQueue
from coordination.message import Task

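# storage is a storage backend (see storage.py / sqlite_storage.py).
# The awaits below must run inside a coroutine, e.g. via asyncio.run().
queue = TaskQueue(storage=...)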

# Enqueue a task with priority (0 = highest)
task = Task(queue="default", payload={"action": "research"}, priority=5)
await queue.enqueue(task)

# Claim the next available task for an agent
claimed = await queue.claim("agent-1")

Event Stream

from coordination.event_stream import EventStream

# The awaits below must run inside a coroutine, e.g. via asyncio.run().
stream = EventStream(storage=...)

# Subscribe to event types
await stream.subscribe("agent-1", ["task.assigned", "agent.updated"])

# Publish an event (delivered to all matching subscribers)
await stream.publish({
    "type": "task.assigned",
    "source": "coordinator",
    "payload": {"task_id": "...", "assignee": "agent-1"}
})

# Poll for new events
events = await stream.poll("agent-1")
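
The subscribe/publish/poll flow above can be pictured with an in-memory sketch. `SketchEventStream` is a hypothetical class, not the package's `EventStream`; in particular, the assumption that `poll` drains an agent's pending events is mine and may not match the real behavior.

```python
import asyncio
from collections import defaultdict, deque


class SketchEventStream:
    """In-memory pub/sub illustration; the real stream is storage-backed."""

    def __init__(self):
        self._subscriptions = defaultdict(set)   # event type -> agent ids
        self._inboxes = defaultdict(deque)       # agent id -> pending events

    async def subscribe(self, agent_id, event_types):
        for event_type in event_types:
            self._subscriptions[event_type].add(agent_id)

    async def publish(self, event):
        """Copy the event into every matching inbox; return the fan-out count."""
        subscribers = self._subscriptions.get(event["type"], set())
        for agent_id in subscribers:
            self._inboxes[agent_id].append(event)
        return len(subscribers)

    async def poll(self, agent_id):
        """Return and clear the agent's pending events (assumed semantics)."""
        inbox = self._inboxes[agent_id]
        events = list(inbox)
        inbox.clear()
        return events


async def demo():
    stream = SketchEventStream()
    await stream.subscribe("agent-1", ["task.assigned"])
    await stream.publish({"type": "task.assigned", "source": "coordinator", "payload": {}})
    await stream.publish({"type": "agent.updated", "source": "coordinator", "payload": {}})
    first = await stream.poll("agent-1")    # only the subscribed type arrives
    second = await stream.poll("agent-1")   # inbox was drained by the first poll
    return first, second
```

Running `asyncio.run(demo())` shows that an agent only receives event types it subscribed to.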

Agent Registry

from coordination.agent_registry import AgentRegistry

# The awaits below must run inside a coroutine, e.g. via asyncio.run().
registry = AgentRegistry(storage=...)

# Register an agent with capabilities
await registry.register("agent-1", capabilities=["research", "summarize"])

# Find agents by capability
agents = await registry.find_by_capability("research")
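
Capability lookup combined with health tracking might look like the following sketch. The heartbeat method and the `stale_after` timeout are assumptions added to illustrate "health tracking"; `SketchRegistry` and `AgentRecord` are hypothetical names, not part of the package.

```python
import time
from dataclasses import dataclass


@dataclass
class AgentRecord:
    agent_id: str
    capabilities: frozenset
    last_seen: float


class SketchRegistry:
    """In-memory capability index with a simple staleness check."""

    def __init__(self, stale_after: float = 30.0):
        self._agents = {}
        self.stale_after = stale_after   # seconds without a heartbeat (assumed)

    def register(self, agent_id, capabilities, now=None):
        ts = time.monotonic() if now is None else now
        self._agents[agent_id] = AgentRecord(agent_id, frozenset(capabilities), ts)

    def heartbeat(self, agent_id, now=None):
        self._agents[agent_id].last_seen = time.monotonic() if now is None else now

    def find_by_capability(self, capability, now=None):
        """Return sorted ids of agents that are fresh and offer the capability."""
        ts = time.monotonic() if now is None else now
        return sorted(
            record.agent_id
            for record in self._agents.values()
            if capability in record.capabilities
            and ts - record.last_seen <= self.stale_after
        )
```

The optional `now` argument exists only so the staleness logic can be exercised deterministically; normal callers would omit it.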

Configuration

Configure via environment variables:

Variable                 Default           Description
-----------------------  ----------------  --------------------
COORDINATION_HOST        127.0.0.1         Server bind address
COORDINATION_PORT        8000              Server port
COORDINATION_DB_PATH     coordination.db   SQLite database path
COORDINATION_LOG_LEVEL   info              Logging level
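
For illustration, the variables and defaults in the table could be read like this. `SketchConfig` and `load_config` are hypothetical names, and the port-range check is an added assumption; the package's own `config.py` may load settings differently (it also supports YAML).

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchConfig:
    # Defaults copied from the table above.
    host: str = "127.0.0.1"
    port: int = 8000
    db_path: str = "coordination.db"
    log_level: str = "info"


def load_config(env=None) -> SketchConfig:
    """Build a config from a mapping of environment variables."""
    env = os.environ if env is None else env
    defaults = SketchConfig()
    port = int(env.get("COORDINATION_PORT", defaults.port))
    if not 0 < port < 65536:
        raise ValueError(f"COORDINATION_PORT out of range: {port}")
    return SketchConfig(
        host=env.get("COORDINATION_HOST", defaults.host),
        port=port,
        db_path=env.get("COORDINATION_DB_PATH", defaults.db_path),
        log_level=env.get("COORDINATION_LOG_LEVEL", defaults.log_level).lower(),
    )
```

Passing a plain dict instead of `os.environ` makes the loader easy to test, for example `load_config({"COORDINATION_PORT": "8080"})`.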

Architecture

┌───────────────────────────────────────────────┐
│              Coordination Server              │
│  ┌──────────┐ ┌──────────┐ ┌──────────────┐   │
│  │Task Queue│ │   Event  │ │Agent Registry│   │
│  │Priority  │ │   Stream │ │Capability    │   │
│  │+ Claim   │ │ Pub/Sub  │ │Discovery     │   │
│  └──────────┘ └──────────┘ └──────────────┘   │
│  ┌────────────────────────────────────────┐   │
│  │        SQLite Storage (aiosqlite)      │   │
│  └────────────────────────────────────────┘   │
│  ┌────────────────────────────────────────┐   │
│  │  Pre-Execution Memory (strategic ctx)  │   │
│  └────────────────────────────────────────┘   │
└───────────────────────────────────────────────┘

Development

pip install -e ".[dev]"
pytest tests/ -v

All 103 tests pass. They cover the task queue, event stream, agent registry, and SQLite storage, plus integration scenarios.

License

MIT — see LICENSE.

Maintainer

Published by Autonomous Ventures. Initial release: v0.1.0 (May 2026). Status: Beta (Development Status :: 4 - Beta).
