deepagent-temporal

Temporal integration for Deep Agents — durable execution for AI agent workflows.

If your Deep Agent process crashes mid-task, all progress is lost. Sub-agents are ephemeral. Human-in-the-loop approval blocks a running process. deepagent-temporal solves these problems by running your Deep Agent as a Temporal Workflow:

  • Durable execution: survives process crashes, restarts, and deployments
  • Sub-agent dispatch: sub-agents run as independent Temporal Child Workflows
  • Worker affinity: sticky task queues keep file operations on the same machine, sidestepping the need for NFS or shared storage
  • Zero-resource HITL: workflow pauses consume no compute while waiting for approval

This project is experimental; use it at your own risk.

Installation

pip install deepagent-temporal

Requires Python 3.10+, langgraph-temporal >= 0.1.0, and a running Temporal server.
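
Because everything below depends on a reachable Temporal server, a quick pre-flight check can save some debugging. The following is a minimal sketch using only the standard library; the helper name temporal_reachable is hypothetical and not part of deepagent-temporal, and the default address localhost:7233 is simply the one used in the examples in this README. A successful TCP connection only shows that something is listening on the port, not that it is a healthy Temporal frontend.

```python
import socket


def temporal_reachable(host: str = "localhost", port: int = 7233,
                       timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`.

    Hypothetical helper for illustration; it does not speak the Temporal
    protocol, it only checks that the port accepts connections.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable host, and timeouts.
        return False
```

Calling temporal_reachable() before Client.connect(...) gives a clearer failure message than a connection error raised deep inside the SDK.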

Quick Start

Before: vanilla Deep Agent

from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage

agent = create_deep_agent(
    model=ChatAnthropic(model="claude-sonnet-4-20250514"),
    tools=[read_file, write_file, execute],
    system_prompt="You are a helpful coding assistant.",
    backend=FilesystemBackend(root_dir="/workspace"),
)

# No durability — if the process crashes, all progress is lost.
# Sub-agents run in-process. HITL blocks a live process.
result = await agent.ainvoke(
    {"messages": [HumanMessage(content="Fix the bug in main.py")]},
    config={"configurable": {"thread_id": "task-123"}},
)

After: Temporal-backed Deep Agent

from datetime import timedelta
from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage
from temporalio.client import Client

from deepagent_temporal import TemporalDeepAgent

# 1. Create your agent exactly as before
agent = create_deep_agent(
    model=ChatAnthropic(model="claude-sonnet-4-20250514"),
    tools=[read_file, write_file, execute],
    system_prompt="You are a helpful coding assistant.",
    backend=FilesystemBackend(root_dir="/workspace"),
)

# 2. Connect to Temporal and wrap the agent
client = await Client.connect("localhost:7233")
temporal_agent = TemporalDeepAgent(
    agent,
    client,
    task_queue="coding-agents",
    use_worker_affinity=True,  # automatic worker pinning
)

# 3. Same API — now with durable execution
result = await temporal_agent.ainvoke(
    {"messages": [HumanMessage(content="Fix the bug in main.py")]},
    config={"configurable": {"thread_id": "task-123"}},
)

The ainvoke, astream, get_state, and resume APIs are identical. Your existing code changes by three lines.

Running a Worker

The agent graph executes on a Temporal Worker. Run this in a separate process (or on a dedicated machine for filesystem affinity):

import asyncio
from deepagents import create_deep_agent
from temporalio.client import Client
from temporalio.worker import UnsandboxedWorkflowRunner

from deepagent_temporal import TemporalDeepAgent

async def main():
    agent = create_deep_agent(...)  # same setup as above

    client = await Client.connect("localhost:7233")
    temporal_agent = TemporalDeepAgent(
        agent, client,
        task_queue="coding-agents",
        use_worker_affinity=True,
    )

    # create_worker() returns a WorkerGroup with two internal workers:
    # one on the shared queue (Workflows + discovery) and one on a
    # unique queue (node Activities).
    worker = temporal_agent.create_worker(
        workflow_runner=UnsandboxedWorkflowRunner(),
    )
    async with worker:
        print("Worker running. Ctrl+C to stop.")
        await asyncio.Future()  # run forever

asyncio.run(main())
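
The worker above runs until the process is interrupted. If you prefer the worker to leave its async with block cleanly on SIGINT or SIGTERM (for example during a deployment), one option is to wait on an event instead of a bare Future. This is a sketch using only asyncio and signal; wait_for_shutdown is a hypothetical helper, and it assumes that exiting the async with worker block is what stops the workers.

```python
import asyncio
import signal
from typing import Optional


async def wait_for_shutdown(stop: Optional[asyncio.Event] = None) -> None:
    """Block until SIGINT/SIGTERM is received or `stop` is set by the caller."""
    stop = stop or asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, stop.set)
        except (NotImplementedError, RuntimeError, ValueError):
            # Signal handlers are not available on every platform or thread;
            # in that case only an explicit stop.set() ends the wait.
            pass
    await stop.wait()
```

Inside main(), await wait_for_shutdown() would take the place of await asyncio.Future().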

Worker Affinity via Worker-Specific Task Queues

Deep Agents often use FilesystemBackend — tools read and write files on the local disk. All Activities for an agent must run on the same worker to keep the filesystem consistent.

Set use_worker_affinity=True and the framework handles this automatically, following Temporal's worker-specific task queues pattern:

temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="coding-agents",
    use_worker_affinity=True,  # transparent to the client
)

How it works:

  1. create_worker() generates a unique queue name per worker process and starts two internal workers: one on the shared queue (Workflows + discovery Activity), one on its unique queue (node Activities)
  2. When a Workflow starts, it calls a get_available_task_queue Activity on the shared queue — whichever worker picks it up returns its unique queue
  3. All subsequent node Activities are dispatched to that discovered queue
  4. The discovered queue survives continue-as-new — the same worker stays pinned across workflow runs
  5. HITL waits have no timeout concern — the queue persists independently

The client never needs to know queue names. Workers self-register.
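
The steps above can be illustrated with a toy model that involves no Temporal at all. ToyWorker and ToyWorkflow are hypothetical stand-ins invented for this sketch; only the name get_available_task_queue comes from the description above, and the random choice merely imitates "whichever worker picks it up". The real implementation may differ in every detail.

```python
import random
import uuid
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class ToyWorker:
    """Stand-in for one worker process that owns a unique queue."""
    shared_queue: str
    unique_queue: str = field(
        default_factory=lambda: f"worker-{uuid.uuid4().hex[:8]}"
    )

    def get_available_task_queue(self) -> str:
        # Discovery step: the worker that answers returns its own queue name.
        return self.unique_queue


@dataclass
class ToyWorkflow:
    """Stand-in for a Workflow that pins node Activities to one worker."""
    pinned_queue: Optional[str] = None

    def start(self, workers: List[ToyWorker]) -> None:
        if self.pinned_queue is None:
            # Any worker polling the shared queue may take the discovery task.
            picked = random.choice(workers)
            self.pinned_queue = picked.get_available_task_queue()

    def dispatch_node(self, node: str) -> Tuple[str, str]:
        if self.pinned_queue is None:
            raise RuntimeError("call start() before dispatching nodes")
        # Every node Activity goes to the discovered queue.
        return (node, self.pinned_queue)

    def continue_as_new(self) -> "ToyWorkflow":
        # The discovered queue is carried into the next run.
        return ToyWorkflow(pinned_queue=self.pinned_queue)
```

In this model the client only ever names the shared queue; the unique queue names stay internal to the workers and the Workflow.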

Sub-Agents as Child Workflows

Deep Agents can spawn sub-agents via the task tool. With deepagent-temporal, each sub-agent runs as an independent Temporal Child Workflow with its own durability, timeout, and observability.

Setting up the middleware

TemporalSubAgentMiddleware intercepts task tool calls and dispatches them as Child Workflows instead of running them in-process. Inject it before graph compilation:

from deepagents import create_deep_agent

from deepagent_temporal import TemporalSubAgentMiddleware

middleware = TemporalSubAgentMiddleware(
    subagent_specs={
        "researcher": "subagent:researcher",
        "coder": "subagent:coder",
    },
)

agent = create_deep_agent(
    model=model,
    tools=tools,
    middleware=[middleware],  # inject before compilation
    # ... other params
)

Configuring sub-agent execution

from datetime import timedelta

temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="main-agents",
    subagent_task_queue="sub-agents",           # separate queue for sub-agents
    subagent_execution_timeout=timedelta(minutes=15),  # per sub-agent timeout
)

When the LLM invokes the task tool, the middleware stores a SubAgentRequest in a context variable. The Activity collects it, and the Workflow dispatches a Child Workflow. The result flows back as a ToolMessage to the parent agent — exactly matching the behavior of the in-process SubAgentMiddleware.
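
The hand-off through a context variable can be sketched with the standard contextvars module. This is an illustration of the general pattern only, not the package's code: the fields on SubAgentRequest, the task_tool function, and run_node_activity are all assumptions made up for the example.

```python
import contextvars
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class SubAgentRequest:
    # Hypothetical fields; the real SubAgentRequest may look different.
    subagent_type: str
    description: str


# Holds the requests recorded while one node runs; None outside an Activity.
_pending = contextvars.ContextVar("pending_subagent_requests", default=None)


def task_tool(subagent_type: str, description: str) -> str:
    """Toy `task` tool: records a request instead of running the sub-agent."""
    requests = _pending.get()
    if requests is None:
        raise RuntimeError("task tool called outside a node Activity")
    requests.append(SubAgentRequest(subagent_type, description))
    return "dispatched"


def run_node_activity(node: Callable[[], None]) -> List[SubAgentRequest]:
    """Toy Activity: runs one node and returns the requests it recorded."""
    token = _pending.set([])
    try:
        node()
        return list(_pending.get())
    finally:
        # Restore the previous value so requests never leak between nodes.
        _pending.reset(token)
```

In the flow described above, the Workflow would then start one Child Workflow per returned request and feed each result back to the parent as a ToolMessage.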

Human-in-the-Loop

Deep Agents' interrupt() works out of the box. The workflow pauses with zero resource consumption and resumes when you send a Signal:

from langchain_core.messages import HumanMessage

# Start the agent (non-blocking)
handle = await temporal_agent.astart(
    {"messages": [HumanMessage(content="Refactor auth module")]},
    config={"configurable": {"thread_id": "task-456"}},
)

# ... later, check if it's waiting for approval
state = await temporal_agent.get_state(
    {"configurable": {"thread_id": "task-456"}}
)
if state["status"] == "interrupted":
    print("Pending approval:", state["interrupts"])

    # Approve and resume
    await temporal_agent.resume(
        {"configurable": {"thread_id": "task-456"}},
        "approved",
    )
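
In practice the "later" check is often a polling loop. The helper below is a sketch built on the get_state call and the "interrupted" status shown above; wait_for_interrupt, its parameters, and the default intervals are hypothetical. Since no other status values are documented here, the loop stops only on "interrupted" or on its own timeout.

```python
import asyncio
import time
from typing import Any, Dict


async def wait_for_interrupt(agent: Any, config: Dict[str, Any],
                             poll_seconds: float = 2.0,
                             timeout: float = 300.0) -> Dict[str, Any]:
    """Poll agent.get_state(config) until the status is "interrupted".

    `agent` is anything with an async get_state(config) method that
    returns a mapping with a "status" key, as in the example above.
    """
    deadline = time.monotonic() + timeout
    while True:
        state = await agent.get_state(config)
        if state["status"] == "interrupted":
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError("no interrupt observed before the timeout")
        await asyncio.sleep(poll_seconds)
```

Once it returns, state["interrupts"] can be shown to a reviewer and the decision passed to temporal_agent.resume(...) as in the example above.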

Local Development

For testing without a Temporal server deployment:

temporal_agent = await TemporalDeepAgent.local(agent)
result = await temporal_agent.ainvoke({"messages": ["hello"]})

This starts an in-process Temporal test server automatically.

Testing

# Unit + integration tests (uses in-process Temporal test server)
make test

# Integration tests only
make test_integration

# Integration tests against Dockerized Temporal
make test_integration_docker

License

MIT
