MCP Callback Protocol extension for enabling asynchronous tool callbacks and streaming updates from MCP clients

MCPC - Model Context Protocol Callback

An extension to the Model Context Protocol (MCP) that enables asynchronous real-time callbacks and streaming updates from MCP tools.

Quick Start

Prerequisites

MCPC extends the MCP protocol, so you need to have MCP installed first.

Installation

uv is the preferred package manager for installing MCPC because of its speed and reliability, but any other package manager (pip, poetry, conda, etc.) works as well.

uv add mcpc

For projects using traditional pip:

pip install mcpc

Client Usage

from contextlib import AsyncExitStack

from mcpc import MCPCHandler, MCPCMessage
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Define your event listener function
async def my_mcpc_listener(mcpc_message: MCPCMessage) -> None:
    print(f"Received MCPC message: {mcpc_message}")
    # Handle the message based on its type and event
    if mcpc_message.type == "task" and mcpc_message.event == "complete":
        print(f"Task {mcpc_message.task_id} completed with result: {mcpc_message.result}")

# Initialize the MCPC handler
mcpc_handler = MCPCHandler("my-provider")

# Add your event listener for MCPCMessage
mcpc_handler.add_event_listener(my_mcpc_listener)

# In your connection logic:
async def connect_to_mcp(parameters: StdioServerParameters, exit_stack: AsyncExitStack) -> ClientSession:
    # Connect to the MCP provider. stdio_client is an async context manager,
    # so enter it on the caller's exit stack to keep the streams open after returning.
    transport = await exit_stack.enter_async_context(stdio_client(parameters))

    # Wrap the transport with MCPC event listeners
    wrapped_transport = await mcpc_handler.wrap_streams(*transport)

    # Create a ClientSession with the wrapped transport (also an async context manager)
    session = await exit_stack.enter_async_context(ClientSession(*wrapped_transport))

    # Initialize the session
    await session.initialize()

    # Check if MCPC is supported
    mcpc_supported = await mcpc_handler.check_mcpc_support(session)
    if mcpc_supported:
        print(f"MCPC protocol v{mcpc_handler.protocol_version} supported")

    return session

# When calling tools, add MCPC metadata
async def run_tool(session, tool_name, tool_args, session_id):
    # Add MCPC metadata if supported
    enhanced_args = mcpc_handler.add_metadata(tool_args, session_id)

    # Call the tool with enhanced arguments
    return await session.call_tool(tool_name, enhanced_args)

Why MCPC Exists

I created MCPC to solve a critical limitation in LLM tool interactions: maintaining conversational flow while running background tasks.

The standard MCP protocol follows a synchronous request-response pattern, which blocks the conversation until a tool completes. This creates poor UX when:

  1. You want to chat with an LLM while a long-running task executes
  2. You need real-time progress updates from background operations
  3. You're running tasks that may continue indefinitely (like monitoring)

MCPC addresses these limitations by enabling:

  • Continuous conversation with LLMs during tool execution
  • Real-time updates from background processes
  • Asynchronous notifications when operations complete
  • Support for indefinitely running tasks with streaming updates
  • Event-driven reactions, where LLMs respond to events and take action (e.g., "Database migration finished, let me verify the tables" or "File arrived, I'll start processing it")

For example, you might start a data processing task, continue discussing the expected results with the LLM, receive progress updates throughout, and get notified when processing completes, all without interrupting the conversation flow.
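
The flow described above can be imitated with plain `asyncio`, independent of MCPC. The sketch below is only an analogy: `long_running_task`, `conversation`, and the tuple-based events are made up for illustration, and the queue stands in for whatever channel delivers callbacks.

```python
import asyncio


async def long_running_task(updates: asyncio.Queue) -> None:
    """Simulated background tool: reports progress, then completion."""
    for step in range(1, 4):
        await asyncio.sleep(0.01)  # stand-in for real work
        await updates.put(("update", f"step {step}/3"))
    await updates.put(("complete", "done"))


async def conversation() -> list[tuple[str, str]]:
    """Keep 'chatting' while the task runs, collecting events as they arrive."""
    updates: asyncio.Queue = asyncio.Queue()
    log: list[tuple[str, str]] = []
    task = asyncio.create_task(long_running_task(updates))

    # The conversation is not blocked: this runs before the task finishes.
    log.append(("chat", "user keeps talking"))

    while True:
        event, payload = await updates.get()
        log.append((event, payload))
        if event == "complete":
            break
    await task
    return log


events = asyncio.run(conversation())
print(events)
```

The chat entry is logged first because creating the task only schedules it; the updates arrive afterwards, one per step, followed by the completion event.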

MCPC also enables powerful interactive patterns that weren't possible before in MCP:

  • Modifying running tasks: You can adjust parameters or change the behavior of a task while it's running (e.g., "focus on this subset of data instead" or "I see that you're misunderstanding some relations, can you please parse the PDF first?")
  • Tool-initiated prompts: A tool can ask for clarification when it encounters ambiguity or needs additional input (e.g., "I found multiple matches, which one did you mean?" or "I need additional authorization to proceed")
  • Conversation branching: Start multiple background tasks and selectively respond to their updates while maintaining conversational context
  • Proactive AI Actions: Your MCP server can notify the LLM of events, allowing it to take action (e.g., "Database migration completed" → LLM runs verification query → "Table missing" → LLM starts targeted migration)

These capabilities create a much more natural interaction model where tools feel like collaborative participants in the conversation rather than black-box functions.

How MCPC Works

MCPC extends MCP by:

  1. Adding metadata to tool calls: Session and task identifiers
  2. Defining a message structure: Standardized format for callbacks
  3. Providing stream interception: Monitors I/O streams for MCPC messages
  4. Implementing task management: Handles background tasks and messaging

The protocol is fully backward compatible with MCP, allowing MCPC-enabled clients to work with standard MCP servers, and vice versa.
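
To make the stream-interception step more concrete, here is a minimal sketch of how incoming lines could be separated into MCPC callbacks and ordinary MCP traffic. It assumes that an MCPC message is a JSON object whose `protocol` field equals `"mcpc"` (as in the message structure documented below); `split_stream` is a hypothetical helper, and the real handler may detect messages differently.

```python
import json
from typing import Any


def split_stream(lines: list[str]) -> tuple[list[dict[str, Any]], list[str]]:
    """Separate MCPC callbacks from everything else on a line-based stream.

    Assumption: an MCPC message is a JSON object with "protocol" == "mcpc";
    anything else is passed through untouched for normal MCP handling.
    """
    mcpc_messages: list[dict[str, Any]] = []
    passthrough: list[str] = []
    for line in lines:
        try:
            payload = json.loads(line)
        except json.JSONDecodeError:
            passthrough.append(line)  # not JSON, leave it for the MCP layer
            continue
        if isinstance(payload, dict) and payload.get("protocol") == "mcpc":
            mcpc_messages.append(payload)
        else:
            passthrough.append(line)
    return mcpc_messages, passthrough


incoming = [
    '{"jsonrpc": "2.0", "id": 1, "result": {}}',
    '{"protocol": "mcpc", "type": "task", "event": "update", '
    '"session_id": "s1", "task_id": "t1", "tool_name": "process_data", "result": "50%"}',
    "not json at all",
]
callbacks, regular = split_stream(incoming)
print(len(callbacks), len(regular))
```

Passing unrecognized lines through unchanged is what would let a client of this kind keep working against a server that never emits MCPC messages.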

Server Implementation

For implementing MCPC in your MCP servers, use the MCPCHelper class to handle message creation, background tasks, and progress updates.

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from mcpc import MCPCHelper
import asyncio
import uuid

# Initialize MCPC helper
PROVIDER_NAME = "my-processor"
mcpc = MCPCHelper(PROVIDER_NAME)

async def serve():
    """Run the MCP server with MCPC support."""
    server = Server(PROVIDER_NAME)

    @server.list_tools()
    async def list_tools():
        return [
            Tool(
                name="process_data",
                description="Process data with real-time progress updates.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "data_id": {"type": "string"},
                        "process_type": {"type": "string"}
                    },
                    "required": ["data_id"]
                }
            )
        ]

    @server.call_tool()
    async def call_tool(name, arguments):
        # Extract MCPC metadata
        metadata = arguments.pop("_metadata", {})
        session_id = metadata.get("mcpc_session_id", "default")
        task_id = metadata.get("mcpc_task_id", str(uuid.uuid4()))

        # Handle MCPC protocol info request
        if name == "is_mcpc_enabled":
            info = mcpc.get_protocol_info()
            return [TextContent(type="text", text=info.model_dump_json())]

        # Handle the tool call
        if name == "process_data":
            data_id = arguments.get("data_id")

            # Define the background task that will provide real-time updates
            async def process_data_task():
                try:
                    # Send initial update
                    await mcpc.send_direct(mcpc.create_message(
                        type="task",
                        event="update",
                        tool_name="process_data",
                        session_id=session_id,
                        task_id=task_id,
                        result="Starting data processing"
                    ).model_dump_json())

                    # Simulate work with progress updates
                    total_steps = 5
                    for step in range(1, total_steps + 1):
                        # Send progress update
                        await mcpc.send_direct(mcpc.create_message(
                            type="task",
                            event="update",
                            tool_name="process_data",
                            session_id=session_id,
                            task_id=task_id,
                            result={
                                "status": f"Processing step {step}/{total_steps}",
                                "progress": step / total_steps * 100
                            }
                        ).model_dump_json())

                        # Simulate work
                        await asyncio.sleep(1)

                    # Send completion message
                    await mcpc.send_direct(mcpc.create_message(
                        type="task",
                        event="complete",
                        tool_name="process_data",
                        session_id=session_id,
                        task_id=task_id,
                        result={
                            "status": "Complete",
                            "data_id": data_id,
                            "summary": "Processing completed successfully"
                        }
                    ).model_dump_json())

                except Exception as e:
                    # Send error message
                    await mcpc.send_direct(mcpc.create_message(
                        type="task",
                        event="failed",
                        tool_name="process_data",
                        session_id=session_id,
                        task_id=task_id,
                        result=f"Error: {str(e)}"
                    ).model_dump_json())

                finally:
                    # Clean up task
                    mcpc.cleanup_task(task_id)

            # Start the background task
            mcpc.start_task(task_id, process_data_task)

            # Return immediate response
            response = mcpc.create_message(
                type="task",
                event="created",
                tool_name="process_data",
                session_id=session_id,
                task_id=task_id,
                result=f"Started processing data_id={data_id}. Updates will stream in real-time."
            )

            return [TextContent(type="text", text=response.model_dump_json())]

        # Reject tool names this server does not implement
        raise ValueError(f"Unknown tool: {name}")

    # Start the server
    options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, options)

if __name__ == "__main__":
    asyncio.run(serve())

Advanced Server Features

The MCPCHelper class provides additional features for complex server implementations:

  1. Task Management

    • start_task(): Run a background task with automatic thread management
    • check_task(): Get the status of a running task
    • stop_task(): Request a task to stop gracefully
    • cleanup_task(): Remove a completed task from tracking

  2. Message Creation

    • create_message(): Create standardized MCPC protocol messages
    • send_direct(): Send messages directly to clients over stdout

  3. Protocol Information

    • get_protocol_info(): Return MCPC protocol compatibility information
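
As a rough mental model for the task-management methods, the following sketch implements a small registry with the same four method names on top of `asyncio` tasks. It is hypothetical: `TaskRegistrySketch` and its status strings are invented here, and `MCPCHelper` itself is described as managing threads, so its internals will differ.

```python
import asyncio
from typing import Awaitable, Callable


class TaskRegistrySketch:
    """Hypothetical asyncio-based registry; not the MCPCHelper implementation."""

    def __init__(self) -> None:
        self._tasks: dict[str, asyncio.Task] = {}

    def start_task(self, task_id: str, func: Callable[[], Awaitable[None]]) -> None:
        # Must be called from inside a running event loop.
        self._tasks[task_id] = asyncio.create_task(func())

    def check_task(self, task_id: str) -> str:
        task = self._tasks.get(task_id)
        if task is None:
            return "unknown"
        if task.cancelled():
            return "stopped"
        return "done" if task.done() else "running"

    def stop_task(self, task_id: str) -> None:
        task = self._tasks.get(task_id)
        if task is not None and not task.done():
            task.cancel()  # request cancellation; it takes effect asynchronously

    def cleanup_task(self, task_id: str) -> None:
        self._tasks.pop(task_id, None)


async def demo() -> list[str]:
    registry = TaskRegistrySketch()

    async def monitor() -> None:
        while True:  # an indefinitely running task
            await asyncio.sleep(0.01)

    states = []
    registry.start_task("t1", monitor)
    states.append(registry.check_task("t1"))
    registry.stop_task("t1")
    while registry.check_task("t1") == "running":
        await asyncio.sleep(0)  # let the event loop process the cancellation
    states.append(registry.check_task("t1"))
    registry.cleanup_task("t1")
    states.append(registry.check_task("t1"))
    return states


states = asyncio.run(demo())
print(states)
```

Keeping stop and cleanup as separate steps lets a caller inspect the final status of a task before it is removed from tracking.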

MCPC Message Structure

MCPC messages have the following structure:

from typing import Any, Literal

class MCPCMessage:
    session_id: str                                 # Unique session identifier
    task_id: str | None                             # Unique task identifier (required for task messages)
    tool_name: str | None                           # Name of the tool being called (required for task messages)
    result: Any = None                              # Result or update data
    event: str                                      # Event type (restricted for task messages)
    type: Literal["task", "server_event"] = "task"  # Type of message
    protocol: str = "mcpc"                          # Protocol identifier

Example Server Event Message

# Server-initiated Kafka notification
server_event = mcpc.create_server_event(
    session_id="session123",
    result={
        "topic": "user_updates",
        "event": "user_created",
        "user_id": "user456",
        "timestamp": "2024-03-20T10:00:00Z"
    },
    event="notification"  # Event must be explicitly specified
)

Message Types and Events

MCPC defines two types of messages with different event restrictions:

Task Messages

  • Type: task
  • Events:
    • created: Initial acknowledgment when task begins
    • update: Progress updates during task execution
    • complete: Final result when task completes successfully
    • failed: Error information when task fails

Server Event Messages

  • Type: server_event
  • Events: Any string is allowed, as they are not tied to a specific task lifecycle
  • Common examples include: notification, alert, update, error, etc.
  • Proactive AI Responses: Server events could trigger LLM actions:
    • System events ("Database migration finished" → LLM verifies tables)
    • File events ("PDF arrived" → LLM starts processing)
    • Task results ("Analysis complete" → LLM reviews findings)
    • State changes ("API updated" → LLM tests new endpoints)
    • Any event that might require AI attention or action
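
The event restrictions above can be expressed as a small check. This is a sketch under the rules stated in this section only; `TASK_EVENTS` and `is_valid_event` are hypothetical names and are not part of the MCPC API.

```python
# Events permitted for task messages, as listed above.
TASK_EVENTS = frozenset({"created", "update", "complete", "failed"})


def is_valid_event(message_type: str, event: str) -> bool:
    """Check an event name against the rules for its message type."""
    if message_type == "task":
        return event in TASK_EVENTS
    if message_type == "server_event":
        return isinstance(event, str)  # any string is allowed
    return False  # unknown message type


checks = {
    ("task", "update"): is_valid_event("task", "update"),
    ("task", "notification"): is_valid_event("task", "notification"),
    ("server_event", "notification"): is_valid_event("server_event", "notification"),
    ("other", "update"): is_valid_event("other", "update"),
}
print(checks)
```

Note that `update` is valid for both types, so a receiver should look at the message type before interpreting the event.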

Use Cases

MCPC is ideal for:

  • Interactive AI Agents: Chat with LLMs while tasks run in the background
  • Data Processing: Stream progress updates during large file processing
  • Content Generation: Receive partial results as they're generated
  • Long-Running Operations: Support for tasks that run indefinitely
  • Distributed Systems: Coordinate asynchronous operations across services
  • Proactive AI: Let LLMs respond to events and take action automatically
  • Automated Workflows: Create self-managing systems that adapt to events
  • Intelligent Monitoring: AI agents that actively respond to system changes

Compatibility

MCPC is designed to be fully backward compatible with the MCP protocol:

  • MCPC-enabled clients can communicate with standard MCP servers
  • MCPC-enabled servers can respond to standard MCP clients
  • The protocol negotiation ensures graceful fallback to standard MCP when needed
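
One way a client could implement the graceful fallback is to attach MCPC metadata only after support has been confirmed, and otherwise send the arguments untouched. The sketch below assumes a boolean such as the result of `check_mcpc_support` shown in the client example; `prepare_tool_args` is a hypothetical helper, and the `_metadata` key is taken from the server example.

```python
from typing import Any


def prepare_tool_args(tool_args: dict[str, Any], session_id: str,
                      mcpc_supported: bool) -> dict[str, Any]:
    """Attach MCPC metadata only when the server is known to support it."""
    if not mcpc_supported:
        # Standard MCP server: send exactly what the caller provided.
        return dict(tool_args)
    return {**tool_args, "_metadata": {"mcpc_session_id": session_id}}


plain = prepare_tool_args({"data_id": "abc"}, "session123", mcpc_supported=False)
enhanced = prepare_tool_args({"data_id": "abc"}, "session123", mcpc_supported=True)
print(plain)
print(enhanced)
```

Sending unmodified arguments to a standard server avoids schema validation errors for tools that do not declare a `_metadata` property.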

License

MIT
