RabbitMQ-based TaskMessageQueue implementation for MCP (Model Context Protocol)
Project description
rabbitmq-mcp-task-queue
RabbitMQ-based TaskMessageQueue implementation for the Model Context Protocol (MCP).
Features
- Persistent Message Storage: Uses RabbitMQ for reliable, distributed message queuing
- Task Isolation: Each task gets its own dedicated queue (
mcp:queue:{task_id}) - Async/Await Support: Built with modern Python async patterns
- Automatic Queue Management: Queues are created on-demand and cleaned up automatically
- Production Ready: Durable queues with persistent message delivery
Installation
pip install rabbitmq-mcp-task-queue
Requirements
- Python 3.11+
- RabbitMQ server (local or remote)
- MCP SDK 1.3.0+
Quick Start
from rabbitmq_mcp_task_queue import RabbitMQTaskMessageQueue
from mcp.server import Server
# Initialize the queue
queue = RabbitMQTaskMessageQueue(rabbitmq_url="amqp://localhost:5672")
# Use with MCP server
server = Server("my-server")
server.experimental.enable_tasks(queue=queue)
Configuration
RabbitMQ Connection
# Local RabbitMQ
queue = RabbitMQTaskMessageQueue()
# Remote RabbitMQ with credentials
queue = RabbitMQTaskMessageQueue(
rabbitmq_url="amqp://user:password@rabbitmq.example.com:5672/vhost"
)
Queue Naming
Queues are automatically named using the pattern: mcp:queue:{task_id}
Usage with MCP Server
from mcp.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from rabbitmq_mcp_task_queue import RabbitMQTaskMessageQueue
server = Server("task-server")
queue = RabbitMQTaskMessageQueue(rabbitmq_url="amqp://localhost:5672")
server.experimental.enable_tasks(queue=queue)
# Define your tools and handlers
@server.call_tool()
async def handle_tool(name: str, arguments: dict):
# Your tool implementation
pass
API Reference
RabbitMQTaskMessageQueue
Constructor
RabbitMQTaskMessageQueue(rabbitmq_url: str = "amqp://localhost:5672")
Parameters:
rabbitmq_url: AMQP connection URL for RabbitMQ
Methods
async enqueue(task_id: str, message: QueuedMessage) -> None: Add message to queueasync dequeue(task_id: str) -> QueuedMessage | None: Remove and return next messageasync peek(task_id: str) -> QueuedMessage | None: View next message without removingasync is_empty(task_id: str) -> bool: Check if queue is emptyasync clear(task_id: str) -> list[QueuedMessage]: Remove all messagesasync wait_for_message(task_id: str) -> None: Wait for message availabilityclose() -> None: Close RabbitMQ connection
Architecture
Message Persistence
- All queues are declared as durable
- Messages are published with delivery_mode=2 (persistent)
- Ensures messages survive RabbitMQ restarts
Connection Management
- Lazy connection initialization
- Automatic reconnection on connection loss
- Single channel per queue instance
Message Format
Messages are serialized as JSON with the following structure:
{
"type": "request|notification",
"message": {...},
"timestamp": "2024-01-01T00:00:00",
"original_request_id": "..."
}
Development
Setup
git clone https://github.com/yourusername/mcp-task-rmq-queue.git
cd mcp-task-rmq-queue
uv venv
source .venv/bin/activate # or `.venv\Scripts\activate` on Windows
uv pip install -e .
Running Tests
pytest tests/
Building
python -m build
License
MIT License - see LICENSE file for details
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
For issues and questions:
- GitHub Issues: https://github.com/yourusername/mcp-task-rmq-queue/issues
- MCP Documentation: https://modelcontextprotocol.io
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rabbitmq_mcp_task_queue-0.1.0.tar.gz.
File metadata
- Download URL: rabbitmq_mcp_task_queue-0.1.0.tar.gz
- Upload date:
- Size: 45.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
352942025ac2a3de893e9e2e9c939133d97125d7bed68db0896145d563409cc4
|
|
| MD5 |
4d5b90ceb93d2d6a8fe1073889a3b6ae
|
|
| BLAKE2b-256 |
3d6069dd9d94cf6c9b0809ccf958aae24f57bb8e5b0757f16f9a4c3dfdedb76d
|
File details
Details for the file rabbitmq_mcp_task_queue-0.1.0-py3-none-any.whl.
File metadata
- Download URL: rabbitmq_mcp_task_queue-0.1.0-py3-none-any.whl
- Upload date:
- Size: 9.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
06aa5bd3fe3f893e55d12ce32ef62c0af53bcf57ce295e6cda8f0288ea6c4f93
|
|
| MD5 |
1760be09dfc25c91f24c14779a21ca7e
|
|
| BLAKE2b-256 |
94a39e92d11ae9b6cbdfe4eaee35817f782a51ad180927e6e2f462060668807e
|