mcp-celery
Expose Celery tasks as async MCP tools — without writing the boilerplate.
Long-running work (reports, file processing, LLM pipelines) usually runs in Celery. Exposing those tasks to an MCP client means writing four tools per task by hand: one to start, one to poll status, one to fetch the result, one to cancel. mcp-celery does that generation for you. You register a Celery task once; four MCP tools (or a single MCP-native async operation) appear.
This README is written for a first-time reader. If you've never touched Celery or MCP before, start at What this package does and follow it top to bottom.
Table of contents
- What this package does
- Core concepts in one page
- Quickstart — one command
- The three examples
- How it works internally
- Public API
- Project layout
- Running the unit tests
What this package does
The problem
Without mcp-celery, exposing one long-running Celery task to an MCP client takes roughly 100 lines of boilerplate — one MCP tool for each lifecycle step:
@mcp.tool(name="generate_report_start")
async def generate_report_start(user_id: str) -> str:
result = generate_report.apply_async(kwargs={"user_id": user_id})
return json.dumps({"task_id": result.id, "status": "PENDING"})
@mcp.tool(name="generate_report_status")
async def generate_report_status(task_id: str) -> str:
result = AsyncResult(task_id, app=celery_app)
status = CELERY_TO_STATUS.get(result.state, "PENDING")
# ...state mapping, progress extraction, error handling...
@mcp.tool(name="generate_report_result")
async def generate_report_result(task_id: str) -> str: ...
@mcp.tool(name="generate_report_cancel")
async def generate_report_cancel(task_id: str) -> str: ...
Multiply that by every task and the surface area quickly becomes unmaintainable. The full baseline implementation is in examples/baseline/server.py.
The solution
from mcp_celery import AsyncToolServer
server = AsyncToolServer("my-server", celery_app=celery_app)
server.register_async_tool(generate_report, description="Generate a report")
That single register_async_tool call produces four MCP tools:
| Tool name | Purpose |
|---|---|
| generate_report_start | Dispatch the Celery task, return a token. |
| generate_report_status | Poll the task with the token. |
| generate_report_result | Fetch the final result once the task is complete. |
| generate_report_cancel | Revoke a running task. |
A token is simply the Celery task id — the single handle a client carries across calls.
Core concepts in one page
| Concept | One-line explanation |
|---|---|
| Celery task | A regular Python function that has been decorated with @celery_app.task. It runs in a worker process, not in your MCP server. |
| MCP tool | A callable function that an LLM / MCP client can invoke through the Model Context Protocol. |
| Async tool (mcp-celery) | A Celery task that mcp-celery has auto-wrapped into a group of MCP tools (polling) or a single MCP task operation (Approach 3). |
| Token | A string identifier — in practice, the Celery task id. Returned by _start, passed into every follow-up call. |
| Backend (AbstractBackend) | The thing that actually runs a task. The only concrete implementation today is CeleryBackend, but the interface is narrow so you could swap in Dramatiq, ARQ, or a stub. |
| Exposure strategy (AbstractExposureStrategy) | How a task is surfaced over MCP. PollingExposureStrategy generates the four tools; OperationResourceStrategy uses MCP's experimental Task + Resource API. |
| AsyncToolServer | Public entry point. Owns a FastMCP instance, a backend, a strategy, and a registry. |
| TaskStatus | Canonical lifecycle enum (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED, RETRYING). All backends map their native states into these values via lifecycle.py. |
The layering is strict: server → exposure/backend → schema. The exposure strategy never imports a concrete backend, and the backend never imports a strategy.
Quickstart — one command
Everything in examples/ can be run through a single script: examples/run.sh. It handles Redis, the virtualenv, and the Celery worker for you.
# From the repo root:
bash examples/run.sh # defaults to 'with_package'
bash examples/run.sh baseline # manual implementation (no mcp-celery)
bash examples/run.sh with_package # mcp-celery with polling (Approach 1)
bash examples/run.sh approach3 # MCP-native tasks + resource URI
bash examples/run.sh approach3_notify # Approach 3 with push notifications
What the script does on your behalf:
- Redis. If localhost:6379 isn't accepting connections, it starts a Docker container named redis-test. On subsequent runs it reuses the same container.
- Virtualenv. On the first run it invokes examples/setup.sh, which creates examples/.venv/ and installs mcp-celery in editable mode.
- Worker. It launches celery -A tasks worker for the chosen example in the background and waits until Celery prints ready.
- Client. It runs the example's client.py.
- Cleanup. On exit (or Ctrl+C) it stops the worker. The Redis container is kept running for speed; remove it with docker rm -f redis-test when you're done.
Requirements: python3, bash, and (first-time only, if Redis isn't already running) docker.
The three examples
Each example under examples/ runs the same Celery task (generate_report, which sleeps for 6 seconds and reports progress at 25% / 60% / 90%). Only the MCP wiring changes.
1. baseline/ — without the package
Hand-written MCP tools. About 100 lines for a single task. This is the "before" picture — everything mcp-celery is designed to eliminate.
2. with_package/ — polling strategy (Approach 1)
Uses the default PollingExposureStrategy. Four tools per task, token-based polling. The client calls _start, polls _status, then calls _result.
LLM → generate_report_start(user_id="u_1") ← {"token": "abc", "status": "PENDING"}
LLM → generate_report_status(token="abc") ← {"token": "abc", "status": "RUNNING", "progress": 0.6}
LLM → generate_report_result(token="abc") ← {"token": "abc", "status": "COMPLETED", "result": {...}}
3. approach3/ — MCP task operations + resource URI
Uses OperationResourceStrategy. Instead of four tools, each task becomes:
- One MCP tool that returns a CreateTaskResult (an MCP Task object plus a resourceUri in _meta).
- Standard MCP task endpoints — tasks/get, tasks/cancel — backed by a CeleryTaskStore that lazily syncs MCP Task state with Celery state on every read.
- A resource template mcp://results/<tool>/<task_id> that returns the final result via resources/read.
The result is no longer conflated with the task lifecycle — progress lives on the task, payload lives in a resource.
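The URI template is easy to work with on either side of the wire. The two helpers below are an illustrative sketch — not part of the package's public API — of building and parsing a mcp://results/<tool>/<task_id> URI:

```python
def result_resource_uri(tool: str, task_id: str) -> str:
    """Build the per-task result URI following the mcp://results/<tool>/<task_id> template."""
    return f"mcp://results/{tool}/{task_id}"

def parse_result_uri(uri: str) -> tuple[str, str]:
    """Split a result URI back into (tool, task_id); raise on anything else."""
    prefix = "mcp://results/"
    if not uri.startswith(prefix):
        raise ValueError(f"not a result resource URI: {uri}")
    tool, _, task_id = uri[len(prefix):].partition("/")
    return tool, task_id
```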
The approach3_notify variant adds notifications=True. The server spawns a watcher coroutine per task that pushes TaskStatusNotification to the client whenever the status changes, so the client can await instead of polling. See server_notifications.py and client_notifications.py.
Expected output (Approach 1)
Registered tools: ['generate_report_start', 'generate_report_status',
'generate_report_result', 'generate_report_cancel']
--- Calling generate_report_start ---
Response: { "token": "b16a...", "status": "PENDING" }
--- Polling generate_report_status ---
Poll 1: RUNNING (60%)
Poll 2: RUNNING (90%)
Poll 3: COMPLETED
--- Calling generate_report_result ---
Response: {
"token": "b16a...",
"status": "COMPLETED",
"result": { "user_id": "u_demo_42", "revenue": 42000, "items": 17 }
}
The client spawns the MCP server as a subprocess over stdio — you only run run.sh.
How it works internally
Reading order: start at server.py, then follow the data flow below.
Registration
server.register_async_tool(celery_task) # or @server.async_tool
├─ _schema_from_signature(celery_task.run) # build JSON schema from Python type hints
├─ AsyncToolDef(name, description, schema, task_ref, …)
├─ ToolRegistry.register(tool_def) # keep it around so we can list registrations
├─ strategy.generate_tools(tool_def, backend) # → list[GeneratedTool]
├─ for tool in generated: mcp.tool(...)(tool.handler)
└─ strategy.setup(mcp, backend) # optional: resources, handler overrides, etc.
The strategy owns all decisions about what MCP surface to expose. Whether it's four tools or one tool + resources + notifications is entirely a strategy concern — neither the server nor the backend knows.
Polling strategy (Approach 1) — runtime
LLM → <name>_start(**kwargs)
PollingExposureStrategy._make_start → _build_start_handler
→ backend.dispatch(task_ref, kwargs) # CeleryBackend.apply_async
→ format_response(token, TaskStatus.PENDING)
LLM → <name>_status(token)
→ backend.get_status(token) # AsyncResult + map_celery_state
→ format_response(token, status, progress, metadata)
LLM → <name>_result(token)
→ backend.get_result(token)
→ if COMPLETED: format_response(..., result=...)
→ if FAILED: format_response(..., error=..., is_error=True)
→ otherwise: format_response(..., is_error=True, "not yet complete")
LLM → <name>_cancel(token)
→ backend.cancel(token) # result.revoke(terminate=True)
→ format_response(token, TaskStatus.CANCELLED)
format_response (schema.py) is the single source of truth for response shape. FastMCP serializes the returned dict into TextContent automatically — you never hand-build TextContent.
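A sketch of what such a response builder might look like — the signature here is illustrative, the real format_response lives in schema.py — is simply a dict assembler that drops unset keys:

```python
def format_response(token, status, *, progress=None, result=None,
                    error=None, metadata=None) -> dict:
    """Assemble the uniform response dict the generated tools return.

    Omits keys whose value is None so clients see a compact payload."""
    payload = {"token": token,
               "status": status.value if hasattr(status, "value") else str(status)}
    for key, value in (("progress", progress), ("result", result),
                       ("error", error), ("metadata", metadata)):
        if value is not None:
            payload[key] = value
    return payload
```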
Operation + Resource strategy (Approach 3) — runtime
Setup installs three handler overrides on the low-level MCP server:
- CallToolRequest — intercepted; for our tools it dispatches to Celery, creates an MCP Task in the CeleryTaskStore, and returns a CreateTaskResult with _meta.resourceUri pointing at the result resource.
- CancelTaskRequest — cancels the MCP task (so the store state becomes cancelled) before revoking the Celery task (to avoid a race where the next get_task would see a terminal state and reject the cancellation).
- GetTaskPayloadRequest — returns a pointer {"resourceUri": "mcp://results/<tool>/<task_id>"} instead of the payload itself; the client is expected to read the resource.
Plus a resource template per tool whose reader calls backend.get_result(task_id) and returns JSON.
CeleryTaskStore is an InMemoryTaskStore that overrides get_task to query Celery on every read and reconcile the MCP task status with what the worker actually reports. Terminal MCP states are never overwritten.
When notifications=True, _watch_and_notify runs per task, polling the store every poll_interval_ms and pushing TaskStatusNotification on any status change until a terminal state is reached.
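The watcher's core loop is a poll-and-diff over the status. The sketch below abstracts the store and the MCP session into two callables (get_status and notify are stand-ins, not the package's real signatures), but the change-detection and terminal-state exit match the description above:

```python
import asyncio

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}

async def watch_and_notify(get_status, notify, poll_interval_ms: int = 500):
    """Poll a task's status and push a notification on every change.

    get_status: async callable returning the current status string
    notify:     async callable invoked with each newly observed status
    Returns once a terminal status has been reported."""
    last = None
    while True:
        status = await get_status()
        if status != last:          # only push on an actual change
            await notify(status)
            last = status
        if status in TERMINAL:      # terminal state: stop watching
            return
        await asyncio.sleep(poll_interval_ms / 1000)
```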
State mapping
All backend-specific state strings are translated once, in lifecycle.py:
"PENDING" → TaskStatus.PENDING
"STARTED" → TaskStatus.RUNNING
"SUCCESS" → TaskStatus.COMPLETED
"FAILURE" → TaskStatus.FAILED
"REVOKED" → TaskStatus.CANCELLED
# unknown custom states → RUNNING (typically set via update_state)
Everything else in the code uses the TaskStatus enum.
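The table above can be sketched as a plain dict lookup with a RUNNING fallback. This is an illustrative reconstruction (the RETRY → RETRYING row is an assumption — the table lists only the five mappings, but TaskStatus includes RETRYING):

```python
from enum import Enum

class TaskStatus(str, Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    RETRYING = "RETRYING"

_CELERY_TO_STATUS = {
    "PENDING": TaskStatus.PENDING,
    "STARTED": TaskStatus.RUNNING,
    "SUCCESS": TaskStatus.COMPLETED,
    "FAILURE": TaskStatus.FAILED,
    "REVOKED": TaskStatus.CANCELLED,
    "RETRY": TaskStatus.RETRYING,   # assumed mapping, see lead-in
}

def map_celery_state(state: str) -> TaskStatus:
    # Unknown custom states (typically set via update_state) count as RUNNING.
    return _CELERY_TO_STATUS.get(state, TaskStatus.RUNNING)
```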
Public API
from mcp_celery import (
AsyncToolServer, # main entry point
TaskStatus, # canonical status enum
TaskInfo, AsyncToolDef, # dataclasses
format_response, # response builder
AbstractBackend, CeleryBackend,
AbstractExposureStrategy, PollingExposureStrategy, OperationResourceStrategy,
GeneratedTool, ToolRegistry,
CeleryTaskStore,
map_celery_state,
McpCeleryError, TaskNotReady, TaskNotFound, BackendError,
)
AsyncToolServer is the only class most users touch:
server = AsyncToolServer(
name="my-server",
celery_app=celery_app,
strategy=PollingExposureStrategy(), # or OperationResourceStrategy(...)
default_cancel=True,
)
# Decorator form: register-as-you-define
@server.async_tool
def generate_report(user_id: str) -> dict:
...
# Imperative form: register an existing Celery task
server.register_async_tool(
existing_celery_task,
name="generate_report",
description="Generate a report for a user",
cancel=True,
result_ttl=300,
)
server.run() # delegates to FastMCP.run()
Project layout
mcp-celery/
├── pyproject.toml
├── LICENSE
├── CHANGELOG.md
├── README.md
├── src/
│ └── mcp_celery/ # the importable package
│ ├── __init__.py # public API (lazy-loads runtime deps)
│ ├── py.typed # advertises inline type hints
│ ├── schema.py # TaskStatus, AsyncToolDef, TaskInfo, format_response
│ ├── errors.py # McpCeleryError, TaskNotReady, TaskNotFound, BackendError
│ ├── lifecycle.py # Celery state ↔ TaskStatus ↔ MCP status
│ ├── registry.py # ToolRegistry (name → AsyncToolDef)
│ ├── server.py # AsyncToolServer + JSON-schema-from-signature helper
│ ├── backend/
│ │ ├── base.py # AbstractBackend (dispatch / get_status / get_result / cancel)
│ │ └── celery.py # CeleryBackend
│ ├── exposure/
│ │ ├── base.py # AbstractExposureStrategy, GeneratedTool
│ │ ├── polling.py # PollingExposureStrategy (Approach 1)
│ │ └── operation_resource.py # OperationResourceStrategy (Approach 3)
│ └── stores/
│ └── celery_task_store.py # Celery-synced MCP TaskStore (Approach 3)
├── tests/
│ ├── test_tools.py # unit tests — no Redis or Celery worker required
│ └── test_operation_resource.py
├── examples/
│ ├── setup.sh # one-time venv setup
│ ├── run.sh # one-command runner — see Quickstart
│ ├── baseline/ # manual MCP tools (before)
│ ├── with_package/ # mcp-celery polling strategy (after)
│ └── approach3/ # OperationResource strategy + optional notifications
└── docs/
├── approach3-design.md # Approach 3 design notes
├── transport-resumable.md # transport-agnostic resumable requests notes
└── quickstart-example.py # stand-alone usage snippet
The src/ layout is deliberate — it prevents pytest from silently picking up the in-repo sources instead of the installed wheel, so every test run exercises what users will actually pip install.
Running the unit tests
No Redis, no Celery worker needed:
pip install -e ".[dev]"
pytest tests/test_tools.py -v
The tests use a mock backend (tests/mock_backend.py) and a manual run(coro) helper that creates a fresh event loop per call — there are no pytest-asyncio markers.
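Such a run helper is only a few lines — the version below is a sketch of the idea (fresh loop per call, always closed), not necessarily the exact code in the test suite:

```python
import asyncio

def run(coro):
    """Drive a coroutine to completion on a fresh event loop.

    A tiny substitute for pytest-asyncio: each call gets its own loop,
    so tests stay isolated and need no markers or fixtures."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
```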