
mcp-celery

Expose Celery tasks as async MCP tools — without writing the boilerplate.

Long-running work (reports, file processing, LLM pipelines) usually runs in Celery. Exposing those tasks to an MCP client means writing four tools per task by hand: one to start, one to poll status, one to fetch the result, one to cancel. mcp-celery does that generation for you. You register a Celery task once; four MCP tools (or a single MCP-native async operation) appear.

This README is written for a first-time reader. If you've never touched Celery or MCP before, start at What this package does and follow it top to bottom.


What this package does

The problem

Without mcp-celery, exposing one long-running Celery task to an MCP client takes roughly 100 lines of boilerplate — one MCP tool for each lifecycle step:

@mcp.tool(name="generate_report_start")
async def generate_report_start(user_id: str) -> str:
    result = generate_report.apply_async(kwargs={"user_id": user_id})
    return json.dumps({"task_id": result.id, "status": "PENDING"})

@mcp.tool(name="generate_report_status")
async def generate_report_status(task_id: str) -> str:
    result = AsyncResult(task_id, app=celery_app)
    status = CELERY_TO_STATUS.get(result.state, "PENDING")
    # ...state mapping, progress extraction, error handling...

@mcp.tool(name="generate_report_result")
async def generate_report_result(task_id: str) -> str: ...

@mcp.tool(name="generate_report_cancel")
async def generate_report_cancel(task_id: str) -> str: ...

Multiply that by every task and the surface area quickly becomes unmaintainable. The full baseline implementation is in examples/baseline/server.py.

The solution

from mcp_celery import AsyncToolServer

server = AsyncToolServer("my-server", celery_app=celery_app)
server.register_async_tool(generate_report, description="Generate a report")

That single register_async_tool call produces four MCP tools:

  • generate_report_start: dispatch the Celery task and return a token.
  • generate_report_status: poll the task with the token.
  • generate_report_result: fetch the final result once the task is complete.
  • generate_report_cancel: revoke a running task.

A token is simply the Celery task id — the single handle a client carries across calls.


Core concepts in one page

  • Celery task: a regular Python function decorated with @celery_app.task. It runs in a worker process, not in your MCP server.
  • MCP tool: a callable that an LLM / MCP client can invoke through the Model Context Protocol.
  • Async tool (mcp-celery): a Celery task that mcp-celery has auto-wrapped into a group of MCP tools (polling) or a single MCP task operation (Approach 3).
  • Token: a string identifier, in practice the Celery task id. Returned by _start, passed into every follow-up call.
  • Backend (AbstractBackend): the thing that actually runs a task. The only concrete implementation today is CeleryBackend, but the interface is narrow enough to swap in Dramatiq, ARQ, or a stub.
  • Exposure strategy (AbstractExposureStrategy): how a task is surfaced over MCP. PollingExposureStrategy generates the four tools; OperationResourceStrategy uses MCP's experimental Task + Resource API.
  • AsyncToolServer: the public entry point. Owns a FastMCP instance, a backend, a strategy, and a registry.
  • TaskStatus: the canonical lifecycle enum (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED, RETRYING). All backends map their native states into these values via lifecycle.py.

The layering is strict: server → exposure/backend → schema. The exposure strategy never imports a concrete backend, and the backend never imports a strategy.
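Because the backend seam is this narrow, a stub is enough to exercise an exposure strategy without Redis or a worker. The sketch below is an illustrative reconstruction, not the package's actual code: the method names follow the project-layout section (dispatch / get_status / get_result / cancel), but the exact signatures are assumptions.

```python
# Hypothetical sketch of the AbstractBackend seam; the real interface lives in
# src/mcp_celery/backend/base.py and may differ in detail.
from abc import ABC, abstractmethod
from typing import Any


class AbstractBackend(ABC):
    @abstractmethod
    def dispatch(self, task_ref: Any, kwargs: dict) -> str:
        """Start the task and return a token (the backend's task id)."""

    @abstractmethod
    def get_status(self, token: str) -> str:
        """Return the canonical status for the task."""

    @abstractmethod
    def get_result(self, token: str) -> Any:
        """Return the final result; only meaningful once COMPLETED."""

    @abstractmethod
    def cancel(self, token: str) -> None:
        """Best-effort cancellation."""


class StubBackend(AbstractBackend):
    """Runs tasks inline and synchronously, enough for strategy unit tests."""

    def __init__(self) -> None:
        self._results: dict[str, Any] = {}

    def dispatch(self, task_ref, kwargs):
        token = f"stub-{len(self._results)}"
        self._results[token] = task_ref(**kwargs)  # no broker, no worker
        return token

    def get_status(self, token):
        return "COMPLETED" if token in self._results else "PENDING"

    def get_result(self, token):
        return self._results[token]

    def cancel(self, token):
        self._results.pop(token, None)
```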


Quickstart — one command

Everything in examples/ can be run through a single script: examples/run.sh. It handles Redis, the virtualenv, and the Celery worker for you.

# From the repo root:
bash examples/run.sh                  # defaults to 'with_package'
bash examples/run.sh baseline         # manual implementation (no mcp-celery)
bash examples/run.sh with_package     # mcp-celery with polling (Approach 1)
bash examples/run.sh approach3        # MCP-native tasks + resource URI
bash examples/run.sh approach3_notify # Approach 3 with push notifications

What the script does on your behalf:

  1. Redis. If localhost:6379 isn't accepting connections, it starts a Docker container named redis-test. On subsequent runs it reuses the same container.
  2. Virtualenv. On the first run it invokes examples/setup.sh, which creates examples/.venv/ and installs mcp-celery in editable mode.
  3. Worker. It launches celery -A tasks worker for the chosen example in the background and waits until Celery prints ready.
  4. Client. It runs the example's client.py.
  5. Cleanup. On exit (or Ctrl+C) it stops the worker. The Redis container is kept running for speed; remove it with docker rm -f redis-test when you're done.

Requirements: python3, bash, and (first-time only, if Redis isn't already running) docker.


The three examples

Each example under examples/ runs the same Celery task (generate_report, which sleeps for 6 seconds and reports progress at 25% / 60% / 90%). Only the MCP wiring changes.

1. baseline/ — without the package

Hand-written MCP tools. About 100 lines for a single task. This is the "before" picture — everything mcp-celery is designed to eliminate.

2. with_package/ — polling strategy (Approach 1)

Uses the default PollingExposureStrategy. Four tools per task, token-based polling. The client calls _start, polls _status, then calls _result.

LLM → generate_report_start(user_id="u_1")     ← {"token": "abc", "status": "PENDING"}
LLM → generate_report_status(token="abc")      ← {"token": "abc", "status": "RUNNING", "progress": 0.6}
LLM → generate_report_result(token="abc")      ← {"token": "abc", "status": "COMPLETED", "result": {...}}
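On the client side, that flow reduces to a start-poll-fetch loop. In the sketch below, `call_tool` stands in for however your MCP client invokes a tool (the real examples use an MCP stdio session); the helper's name and signature are assumptions, not part of the package.

```python
import json
import time


def run_async_tool(call_tool, name, poll_interval=1.0, timeout=60.0, **kwargs):
    """Start an async tool, poll until a terminal status, then fetch the result.

    `call_tool(tool_name, **arguments)` is assumed to return the tool's JSON
    response as a string, mirroring the bodies shown in the flow above.
    """
    start = json.loads(call_tool(f"{name}_start", **kwargs))
    token = start["token"]
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = json.loads(call_tool(f"{name}_status", token=token))
        if status["status"] in ("COMPLETED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_interval)
    return json.loads(call_tool(f"{name}_result", token=token))
```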

3. approach3/ — MCP task operations + resource URI

Uses OperationResourceStrategy. Instead of four tools, each task becomes:

  • One MCP tool that returns a CreateTaskResult (an MCP Task object plus a resourceUri in _meta).
  • Standard MCP task endpoints (tasks/get, tasks/cancel), backed by a CeleryTaskStore that lazily syncs MCP Task state with Celery state on every read.
  • A resource template mcp://results/<tool>/<task_id> that returns the final result via resources/read.

The result is no longer conflated with the task lifecycle — progress lives on the task, payload lives in a resource.

The approach3_notify variant adds notifications=True. The server spawns a watcher coroutine per task that pushes TaskStatusNotification to the client whenever the status changes, so the client can await instead of polling. See server_notifications.py and client_notifications.py.

Expected output (Approach 1)

Registered tools: ['generate_report_start', 'generate_report_status',
                   'generate_report_result', 'generate_report_cancel']

--- Calling generate_report_start ---
Response: { "token": "b16a...", "status": "PENDING" }

--- Polling generate_report_status ---
  Poll 1: RUNNING  (60%)
  Poll 2: RUNNING  (90%)
  Poll 3: COMPLETED

--- Calling generate_report_result ---
Response: {
  "token": "b16a...",
  "status": "COMPLETED",
  "result": { "user_id": "u_demo_42", "revenue": 42000, "items": 17 }
}

The client spawns the MCP server as a subprocess over stdio — you only run run.sh.


How it works internally

Reading order: start at server.py, then follow the data flow below.

Registration

server.register_async_tool(celery_task)          # or @server.async_tool
  ├─ _schema_from_signature(celery_task.run)     # build JSON schema from Python type hints
  ├─ AsyncToolDef(name, description, schema, task_ref, …)
  ├─ ToolRegistry.register(tool_def)             # keep it around so we can list registrations
  ├─ strategy.generate_tools(tool_def, backend)  # → list[GeneratedTool]
  ├─ for tool in generated: mcp.tool(...)(tool.handler)
  └─ strategy.setup(mcp, backend)                # optional: resources, handler overrides, etc.

The strategy owns all decisions about what MCP surface to expose. Whether it's four tools or one tool + resources + notifications is entirely a strategy concern — neither the server nor the backend knows.
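The schema-from-signature step can be approximated in a few lines of introspection. This is an assumed sketch of what _schema_from_signature does, not the helper in server.py, which will handle more cases (Optional, nested types, docstrings):

```python
import inspect
from typing import get_type_hints

# Assumed mapping from Python annotations to JSON-schema types.
_JSON_TYPES = {str: "string", int: "integer", float: "number",
               bool: "boolean", dict: "object", list: "array"}


def schema_from_signature(fn):
    """Build a JSON schema for a function's parameters from its type hints."""
    hints = get_type_hints(fn)
    props, required = {}, []
    for name, param in inspect.signature(fn).parameters.items():
        props[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default => required argument
    return {"type": "object", "properties": props, "required": required}
```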

Polling strategy (Approach 1) — runtime

LLM → <name>_start(**kwargs)
  PollingExposureStrategy._make_start → _build_start_handler
  → backend.dispatch(task_ref, kwargs)             # CeleryBackend.apply_async
  → format_response(token, TaskStatus.PENDING)

LLM → <name>_status(token)
  → backend.get_status(token)                      # AsyncResult + map_celery_state
  → format_response(token, status, progress, metadata)

LLM → <name>_result(token)
  → backend.get_result(token)
  → if COMPLETED: format_response(..., result=...)
  → if FAILED:    format_response(..., error=..., is_error=True)
  → otherwise:    format_response(..., is_error=True, "not yet complete")

LLM → <name>_cancel(token)
  → backend.cancel(token)                          # result.revoke(terminate=True)
  → format_response(token, TaskStatus.CANCELLED)

format_response (schema.py) is the single source of truth for response shape. FastMCP serializes the returned dict into TextContent automatically — you never hand-build TextContent.
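A plausible shape for format_response, consistent with the JSON bodies in the example output above. This is an assumption about schema.py, not its actual code:

```python
def format_response(token, status, progress=None, result=None,
                    error=None, metadata=None, is_error=False):
    """Build the uniform response dict; FastMCP serializes it to TextContent."""
    body = {"token": token, "status": status}
    if progress is not None:
        body["progress"] = progress
    if result is not None:
        body["result"] = result
    if error is not None:
        body["error"] = error
        body["is_error"] = True
    elif is_error:
        body["is_error"] = True
    if metadata:
        body["metadata"] = metadata
    return body
```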

Operation + Resource strategy (Approach 3) — runtime

Setup installs three handler overrides on the low-level MCP server:

  • CallToolRequest — intercepted; for our tools it dispatches to Celery, creates an MCP Task in the CeleryTaskStore, and returns a CreateTaskResult with _meta.resourceUri pointing at the result resource.
  • CancelTaskRequest — cancels the MCP task (so the store state becomes cancelled) before revoking the Celery task (to avoid a race where the next get_task would see a terminal state and reject the cancellation).
  • GetTaskPayloadRequest — returns a pointer {"resourceUri": "mcp://results/<tool>/<task_id>"} instead of the payload itself; the client is expected to read the resource.

Plus a resource template per tool whose reader calls backend.get_result(task_id) and returns JSON.

CeleryTaskStore is an InMemoryTaskStore that overrides get_task to query Celery on every read and reconcile the MCP task status with what the worker actually reports. Terminal MCP states are never overwritten.

When notifications=True, _watch_and_notify runs per task, polling the store every poll_interval_ms and pushing TaskStatusNotification on any status change until a terminal state is reached.
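Structurally, that watcher is a short coroutine. The real _watch_and_notify talks to the CeleryTaskStore and the MCP session, so treat this as a sketch with assumed callables (`get_status` reads the current status, `notify` pushes it to the client):

```python
import asyncio

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}


async def watch_and_notify(get_status, notify, poll_interval_ms=200):
    """Poll for status changes and push each change until a terminal state."""
    last = None
    while True:
        status = get_status()
        if status != last:
            await notify(status)  # the real code sends TaskStatusNotification
            last = status
        if status in TERMINAL:
            return
        await asyncio.sleep(poll_interval_ms / 1000)
```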

State mapping

All backend-specific state strings are translated once, in lifecycle.py:

"PENDING"   TaskStatus.PENDING
"STARTED"   TaskStatus.RUNNING
"SUCCESS"   TaskStatus.COMPLETED
"FAILURE"   TaskStatus.FAILED
"REVOKED"   TaskStatus.CANCELLED
# unknown custom states → RUNNING (typically set via update_state)

Everything else in the code uses the TaskStatus enum.
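As a runnable sketch of that mapping (the "RETRY" row is an assumption based on the RETRYING enum member; the table above only lists five states plus the fallback):

```python
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    RETRYING = "RETRYING"


_CELERY_TO_STATUS = {
    "PENDING": TaskStatus.PENDING,
    "STARTED": TaskStatus.RUNNING,
    "SUCCESS": TaskStatus.COMPLETED,
    "FAILURE": TaskStatus.FAILED,
    "REVOKED": TaskStatus.CANCELLED,
    "RETRY": TaskStatus.RETRYING,  # assumed; not listed in the table above
}


def map_celery_state(state: str) -> TaskStatus:
    # Unknown custom states (typically set via update_state) count as RUNNING.
    return _CELERY_TO_STATUS.get(state, TaskStatus.RUNNING)
```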


Public API

from mcp_celery import (
    AsyncToolServer,             # main entry point
    TaskStatus,                  # canonical status enum
    TaskInfo, AsyncToolDef,      # dataclasses
    format_response,             # response builder

    AbstractBackend, CeleryBackend,
    AbstractExposureStrategy, PollingExposureStrategy, OperationResourceStrategy,
    GeneratedTool, ToolRegistry,
    CeleryTaskStore,
    map_celery_state,

    McpCeleryError, TaskNotReady, TaskNotFound, BackendError,
)

AsyncToolServer is the only class most users touch:

server = AsyncToolServer(
    name="my-server",
    celery_app=celery_app,
    strategy=PollingExposureStrategy(),      # or OperationResourceStrategy(...)
    default_cancel=True,
)

# Decorator form: register-as-you-define
@server.async_tool
def generate_report(user_id: str) -> dict:
    ...

# Imperative form: register an existing Celery task
server.register_async_tool(
    existing_celery_task,
    name="generate_report",
    description="Generate a report for a user",
    cancel=True,
    result_ttl=300,
)

server.run()   # delegates to FastMCP.run()

Project layout

mcp-celery/
├── pyproject.toml
├── LICENSE
├── CHANGELOG.md
├── README.md
├── src/
│   └── mcp_celery/                    # the importable package
│       ├── __init__.py                # public API (lazy-loads runtime deps)
│       ├── py.typed                   # advertises inline type hints
│       ├── schema.py                  # TaskStatus, AsyncToolDef, TaskInfo, format_response
│       ├── errors.py                  # McpCeleryError, TaskNotReady, TaskNotFound, BackendError
│       ├── lifecycle.py               # Celery state ↔ TaskStatus ↔ MCP status
│       ├── registry.py                # ToolRegistry (name → AsyncToolDef)
│       ├── server.py                  # AsyncToolServer + JSON-schema-from-signature helper
│       ├── backend/
│       │   ├── base.py                # AbstractBackend (dispatch / get_status / get_result / cancel)
│       │   └── celery.py              # CeleryBackend
│       ├── exposure/
│       │   ├── base.py                # AbstractExposureStrategy, GeneratedTool
│       │   ├── polling.py             # PollingExposureStrategy (Approach 1)
│       │   └── operation_resource.py  # OperationResourceStrategy (Approach 3)
│       └── stores/
│           └── celery_task_store.py   # Celery-synced MCP TaskStore (Approach 3)
├── tests/
│   ├── test_tools.py                  # unit tests — no Redis or Celery worker required
│   └── test_operation_resource.py
├── examples/
│   ├── setup.sh                       # one-time venv setup
│   ├── run.sh                         # one-command runner — see Quickstart
│   ├── baseline/                      # manual MCP tools (before)
│   ├── with_package/                  # mcp-celery polling strategy (after)
│   └── approach3/                     # OperationResource strategy + optional notifications
└── docs/
    ├── approach3-design.md            # Approach 3 design notes
    ├── transport-resumable.md         # transport-agnostic resumable requests notes
    └── quickstart-example.py          # stand-alone usage snippet

The src/ layout is deliberate — it prevents pytest from silently picking up the in-repo sources instead of the installed wheel, so every test run exercises what users will actually pip install.


Running the unit tests

No Redis, no Celery worker needed:

pip install -e ".[dev]"
pytest tests/test_tools.py -v

The tests use a mock backend (tests/mock_backend.py) and a manual run(coro) helper that creates a fresh event loop per call — there are no pytest-asyncio markers.
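That run helper amounts to the following (a sketch; the actual helper lives in the test suite):

```python
import asyncio


def run(coro):
    """Drive a coroutine on a fresh event loop; no pytest-asyncio needed."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
```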
