Skip to main content

RabbitMQ-based TaskMessageQueue implementation for MCP (Model Context Protocol)

Project description

rabbitmq-mcp-task-queue

RabbitMQ-based TaskMessageQueue implementation for the Model Context Protocol (MCP).

Features

  • Persistent Message Storage: Uses RabbitMQ for reliable, distributed message queuing
  • Task Isolation: Each task gets its own dedicated queue (mcp:queue:{task_id})
  • Async/Await Support: Built with modern Python async patterns
  • Automatic Queue Management: Queues are created on-demand and cleaned up automatically
  • Production Ready: Durable queues with persistent message delivery

Installation

pip install rabbitmq-mcp-task-queue

Requirements

  • Python 3.11+
  • RabbitMQ server (local or remote)
  • MCP SDK 1.3.0+

Quick Start

from rabbitmq_mcp_task_queue import RabbitMQTaskMessageQueue
from mcp.server import Server

# Initialize the queue
queue = RabbitMQTaskMessageQueue(rabbitmq_url="amqp://localhost:5672")

# Use with MCP server
server = Server("my-server")
server.experimental.enable_tasks(queue=queue)

Configuration

RabbitMQ Connection

# Local RabbitMQ
queue = RabbitMQTaskMessageQueue()

# Remote RabbitMQ with credentials
queue = RabbitMQTaskMessageQueue(
    rabbitmq_url="amqp://user:password@rabbitmq.example.com:5672/vhost"
)

Queue Naming

Queues are automatically named using the pattern: mcp:queue:{task_id}

Usage with MCP Server

from mcp.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from rabbitmq_mcp_task_queue import RabbitMQTaskMessageQueue

server = Server("task-server")
queue = RabbitMQTaskMessageQueue(rabbitmq_url="amqp://localhost:5672")

server.experimental.enable_tasks(queue=queue)

# Define your tools and handlers
@server.call_tool()
async def handle_tool(name: str, arguments: dict):
    # Your tool implementation
    pass

API Reference

RabbitMQTaskMessageQueue

Constructor

RabbitMQTaskMessageQueue(rabbitmq_url: str = "amqp://localhost:5672")

Parameters:

  • rabbitmq_url: AMQP connection URL for RabbitMQ

Methods

  • async enqueue(task_id: str, message: QueuedMessage) -> None: Add message to queue
  • async dequeue(task_id: str) -> QueuedMessage | None: Remove and return next message
  • async peek(task_id: str) -> QueuedMessage | None: View next message without removing
  • async is_empty(task_id: str) -> bool: Check if queue is empty
  • async clear(task_id: str) -> list[QueuedMessage]: Remove all messages
  • async wait_for_message(task_id: str) -> None: Wait for message availability
  • close() -> None: Close RabbitMQ connection

Architecture

Message Persistence

  • All queues are declared as durable
  • Messages are published with delivery_mode=2 (persistent)
  • Ensures messages survive RabbitMQ restarts

Connection Management

  • Lazy connection initialization
  • Automatic reconnection on connection loss
  • Single channel per queue instance

Message Format

Messages are serialized as JSON with the following structure:

{
  "type": "request|notification",
  "message": {...},
  "timestamp": "2024-01-01T00:00:00",
  "original_request_id": "..."
}

Development

Setup

git clone https://github.com/yourusername/mcp-task-rmq-queue.git
cd mcp-task-rmq-queue
uv venv
source .venv/bin/activate  # or `.venv\Scripts\activate` on Windows
uv pip install -e .

Running Tests

pytest tests/

Building

python -m build

License

MIT License - see LICENSE file for details

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Support

For issues and questions:

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rabbitmq_mcp_task_queue-0.1.0.tar.gz (45.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rabbitmq_mcp_task_queue-0.1.0-py3-none-any.whl (9.1 kB view details)

Uploaded Python 3

File details

Details for the file rabbitmq_mcp_task_queue-0.1.0.tar.gz.

File metadata

  • Download URL: rabbitmq_mcp_task_queue-0.1.0.tar.gz
  • Upload date:
  • Size: 45.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.12

File hashes

Hashes for rabbitmq_mcp_task_queue-0.1.0.tar.gz
Algorithm Hash digest
SHA256 352942025ac2a3de893e9e2e9c939133d97125d7bed68db0896145d563409cc4
MD5 4d5b90ceb93d2d6a8fe1073889a3b6ae
BLAKE2b-256 3d6069dd9d94cf6c9b0809ccf958aae24f57bb8e5b0757f16f9a4c3dfdedb76d

See more details on using hashes here.

File details

Details for the file rabbitmq_mcp_task_queue-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for rabbitmq_mcp_task_queue-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 06aa5bd3fe3f893e55d12ce32ef62c0af53bcf57ce295e6cda8f0288ea6c4f93
MD5 1760be09dfc25c91f24c14779a21ca7e
BLAKE2b-256 94a39e92d11ae9b6cbdfe4eaee35817f782a51ad180927e6e2f462060668807e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page