Implicit state machine middleware for LangChain v1 agents. Ordered task pipelines with per-task tool scoping, prompt injection, and composable validation.
langchain-task-steering
Implicit state-machine middleware for LangChain v1 agents. Define ordered task pipelines with per-task tool scoping, dynamic prompt injection, and composable validation — all as a drop-in AgentMiddleware.
PENDING ──> IN_PROGRESS ──> COMPLETE
The model drives its own transitions by calling update_task_status. The middleware enforces ordering, scopes tools, injects the active task's instruction into the system prompt, and gates completion via pluggable validators.
When to use this
| Scenario | task-steering | LangGraph explicit workflows |
|---|---|---|
| Linear task pipeline (A then B then C) | Best fit | Verbose — one node + edges per task |
| Per-task tool scoping | Built-in | Manual — separate tool lists per node |
| Dynamic tasks from config / DB | Easy — tasks are data | Hard — graph is compiled at build time |
| Branching / parallel execution | Not supported | Built-in — edges + Send() |
| Per-task human-in-the-loop interrupts | Not supported | Built-in — interrupt() per node |
| Complex orchestration with retries / cycles | Not supported | Built-in — conditional edges |
| Composition with other middleware | Native — it's an AgentMiddleware | N/A — different abstraction |
| Debuggability in LangGraph Studio | Opaque — single agent node | Clear — each node visible in traces |
Rule of thumb: If your tasks are sequential and tool-scoped, use task-steering. If you need branching, parallelism, or per-task interrupts, use explicit LangGraph workflows.
Install
pip install langchain-task-steering
For development:
git clone https://github.com/edvinhallvaxhiu/langchain-task-steering
cd langchain-task-steering
pip install -e ".[dev]"
Requirements
- Python >= 3.10
- langchain >= 1.0.0
- langgraph >= 0.4.0
Quick start
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_task_steering import TaskSteeringMiddleware, Task


@tool
def add_items(items: list[str]) -> str:
    """Add items to the inventory."""
    return f"Added {len(items)} items."


@tool
def categorize(categories: dict[str, list[str]]) -> str:
    """Assign items to categories."""
    return f"Categorized into {len(categories)} groups."


# Tasks run in definition order; each one exposes only its own tools.
pipeline = TaskSteeringMiddleware(
    tasks=[
        Task(
            name="collect",
            instruction="Collect all relevant items from the user's input.",
            tools=[add_items],
        ),
        Task(
            name="categorize",
            instruction="Organize the collected items into categories.",
            tools=[categorize],
        ),
    ],
)

agent = create_agent(
    model="anthropic:claude-sonnet-4-6",
    middleware=[pipeline],
    system_prompt="You are an inventory assistant.",
)

result = agent.invoke(
    {"messages": [{"role": "user", "content": "I have apples, bolts, and milk."}]}
)
The agent automatically receives an update_task_status tool and sees a task pipeline block in its system prompt. It must complete collect before starting categorize.
How it works
What the model sees
On every model call, the middleware appends a status block to the system prompt:
<task_pipeline>
[x] collect (complete)
[>] categorize (in_progress)
<current_task name="categorize">
Organize the collected items into categories.
</current_task>
<rules>
Required order: collect -> categorize
Use update_task_status to advance. Do not skip tasks.
</rules>
</task_pipeline>
Only the active task's tools (plus globals and update_task_status) are visible to the model.
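To make the layout of that block concrete, here is a minimal, standalone sketch of how such a status block could be rendered from an ordered task list. It is not the library's implementation: `Status`, `MARKERS`, and `render_pipeline` are hypothetical names, and the `[ ]` marker for pending tasks is an assumption, since the block above only shows `[x]` and `[>]`.

```python
from enum import Enum


class Status(str, Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETE = "complete"


# "[x]" and "[>]" appear in the block above; "[ ]" for pending is an assumption.
MARKERS = {
    Status.PENDING: "[ ]",
    Status.IN_PROGRESS: "[>]",
    Status.COMPLETE: "[x]",
}


def render_pipeline(tasks, statuses):
    """Render a status block from ordered (name, instruction) pairs."""
    lines = ["<task_pipeline>"]
    # One status line per task, in definition order.
    for name, _ in tasks:
        status = statuses[name]
        lines.append(f"{MARKERS[status]} {name} ({status.value})")
    # Only the in-progress task contributes its instruction.
    for name, instruction in tasks:
        if statuses[name] is Status.IN_PROGRESS:
            lines += [f'<current_task name="{name}">', instruction, "</current_task>"]
    order = " -> ".join(name for name, _ in tasks)
    lines += [
        "<rules>",
        f"Required order: {order}",
        "Use update_task_status to advance. Do not skip tasks.",
        "</rules>",
        "</task_pipeline>",
    ]
    return "\n".join(lines)


tasks = [
    ("collect", "Collect all relevant items from the user's input."),
    ("categorize", "Organize the collected items into categories."),
]
block = render_pipeline(
    tasks, {"collect": Status.COMPLETE, "categorize": Status.IN_PROGRESS}
)
print(block)
```

Because the block is rebuilt from state on each call, the model always sees the current status rather than a stale snapshot from an earlier turn.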
Middleware hooks
| Hook | Behavior |
|---|---|
| before_agent | Initializes task_statuses in state on first invocation. |
| wrap_model_call | Appends task status board + active task instruction to system prompt. Filters tools to only the active task's tools + globals + update_task_status. Delegates to task-scoped middleware if present. |
| wrap_tool_call | Intercepts update_task_status and runs validate_completion on the task's scoped middleware before allowing completion. Rejects out-of-scope tool calls. Delegates other tool calls to the active task's scoped middleware. |
| tools | Auto-registers all task tools + globals + update_task_status with the agent. |
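The tool filtering described for wrap_model_call can be illustrated with a small standalone sketch. It works on plain tool names rather than real tool objects, and `visible_tools` is a hypothetical helper, not part of the package's API.

```python
UPDATE_TOOL = "update_task_status"


def visible_tools(task_tools, statuses, global_tools=()):
    """Return the tool names a model call may see, in a stable order.

    task_tools: mapping of task name -> list of tool names
    statuses:   mapping of task name -> "pending" | "in_progress" | "complete"
    """
    visible = []
    # Only the task that is currently in progress contributes its tools.
    for task, status in statuses.items():
        if status == "in_progress":
            visible.extend(task_tools.get(task, []))
    # Globals and the status tool are always available.
    visible.extend(global_tools)
    visible.append(UPDATE_TOOL)
    # Drop duplicates while preserving order.
    return list(dict.fromkeys(visible))


scoped = visible_tools(
    {"collect": ["add_items"], "categorize": ["categorize"]},
    {"collect": "complete", "categorize": "in_progress"},
    global_tools=["search"],
)
print(scoped)
```

When no task is in progress, the same function returns only the globals plus update_task_status, which is what lets the model start the next task.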
Task lifecycle
PENDING ──> IN_PROGRESS ──> COMPLETE
- The agent drives transitions by calling update_task_status(task, status).
- Transitions are enforced: pending -> in_progress -> complete only.
- When enforce_order=True, a task cannot start until all preceding tasks are complete.
- On complete, the task's middleware.validate_completion(state) runs first; a rejection returns an error to the agent without completing the transition.
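The transition rules above can be sketched as a small pure function. This is an illustration under stated assumptions, not the middleware's code: `check_transition` and its error messages are hypothetical, and statuses are modelled as plain strings.

```python
# The only legal moves in the lifecycle.
ALLOWED = {("pending", "in_progress"), ("in_progress", "complete")}


def check_transition(order, statuses, task, new_status, enforce_order=True):
    """Return an error message if the transition is illegal, else None."""
    if task not in statuses:
        return f"Unknown task: {task}"
    current = statuses[task]
    if (current, new_status) not in ALLOWED:
        return f"Invalid transition for {task}: {current} -> {new_status}"
    if enforce_order and new_status == "in_progress":
        # Every task defined earlier must already be complete.
        earlier = order[: order.index(task)]
        blocking = [t for t in earlier if statuses[t] != "complete"]
        if blocking:
            return f"Cannot start {task}: complete {', '.join(blocking)} first"
    return None


order = ["collect", "categorize"]
statuses = {"collect": "pending", "categorize": "pending"}

print(check_transition(order, statuses, "categorize", "in_progress"))
print(check_transition(order, statuses, "collect", "in_progress"))
```

Returning a message instead of raising mirrors the behavior described above, where a rejected transition comes back to the agent as an error it can read and act on.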
Task-scoped middleware
Each task can have a TaskMiddleware that activates only when the task is IN_PROGRESS. This enables mid-task enforcement, not just completion gating.
from langchain.messages import ToolMessage
from langchain_task_steering import Task, TaskMiddleware, TaskSteeringMiddleware


class ThreatsMiddleware(TaskMiddleware):
    """Block gap_analysis until enough threats exist."""

    def __init__(self, min_threats: int = 25):
        super().__init__()
        self.min_threats = min_threats

    def validate_completion(self, state) -> str | None:
        # Returning a string rejects completion; None allows it.
        threats = state.get("threats", [])
        if len(threats) < self.min_threats:
            return f"Only {len(threats)} threats — need at least {self.min_threats}."
        return None

    def wrap_tool_call(self, request, handler):
        # Gate gap_analysis mid-task; all other tool calls pass through.
        if request.tool_call["name"] == "gap_analysis":
            threats = request.state.get("threats", [])
            if len(threats) < self.min_threats:
                return ToolMessage(
                    content=f"Cannot run gap_analysis: {len(threats)}/{self.min_threats} threats.",
                    tool_call_id=request.tool_call["id"],
                )
        return handler(request)


pipeline = TaskSteeringMiddleware(
    tasks=[
        Task(name="assets", instruction="...", tools=[create_assets]),
        Task(
            name="threats",
            instruction="Identify STRIDE threats for each asset.",
            tools=[create_threats, gap_analysis],
            middleware=ThreatsMiddleware(min_threats=25),
        ),
    ],
)
TaskMiddleware hooks
| Method | When it runs | Purpose |
|---|---|---|
| validate_completion(state) | Before complete transition | Return error string to reject, None to allow |
| on_start(state) | After successful in_progress transition | Side effects (logging, state init) |
| on_complete(state) | After successful complete transition | Side effects (trail capture, cleanup) |
| wrap_tool_call(request, handler) | On every tool call during this task | Mid-task tool gating / modification |
| wrap_model_call(request, handler) | On every model call during this task | Extra prompt injection / request modification |
| state_schema | At middleware init | Merge custom state fields into the agent's state |
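The ordering in the table (validate first, then transition, then on_complete) can be sketched without the library. `StubTaskMiddleware` and `complete_task` below are hypothetical stand-ins written for illustration; the real TaskMiddleware base class and the way the package applies state updates may differ.

```python
class StubTaskMiddleware:
    """Hypothetical stand-in exposing two of the hooks from the table."""

    def __init__(self, min_items):
        self.min_items = min_items
        self.events = []

    def validate_completion(self, state):
        count = len(state.get("items", []))
        if count < self.min_items:
            return f"Only {count} items; need at least {self.min_items}."
        return None

    def on_complete(self, state):
        # Side effect only; runs after the status has changed.
        self.events.append("on_complete")


def complete_task(task, state, middleware):
    """Gate a complete transition on the task middleware's validator."""
    error = middleware.validate_completion(state)
    if error is not None:
        return error  # status is left untouched
    state["task_statuses"][task] = "complete"
    middleware.on_complete(state)
    return f"Task '{task}' marked complete."


state = {"items": ["apples"], "task_statuses": {"collect": "in_progress"}}
mw = StubTaskMiddleware(min_items=2)

rejected = complete_task("collect", state, mw)
status_after_reject = state["task_statuses"]["collect"]

state["items"].append("bolts")
accepted = complete_task("collect", state, mw)
```

The first call returns the validator's message and leaves the task in progress; the second succeeds and fires on_complete exactly once.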
Persistent state for task middleware
Task middleware can declare a state_schema to persist custom fields across interrupts:
from langchain.agents import AgentState
from typing_extensions import NotRequired


class ThreatsState(AgentState):
    gap_analysis_uses: NotRequired[int]


class ThreatsMiddleware(TaskMiddleware):
    state_schema = ThreatsState
    # ...
TaskSteeringMiddleware automatically merges all task middleware schemas into its own state_schema, so the fields survive checkpointing and interrupts.
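One way to picture the schema merge is as a union of TypedDict fields. The sketch below uses only the standard library; `merge_schemas`, `BaseState`, and `ThreatsFields` are hypothetical names, and the package's actual merging logic is not shown in this document and may work differently.

```python
from typing import TypedDict, get_type_hints


class BaseState(TypedDict, total=False):
    task_statuses: dict[str, str]


class ThreatsFields(TypedDict, total=False):
    gap_analysis_uses: int


def merge_schemas(name, *schemas):
    """Build a new TypedDict holding the fields of every given schema."""
    fields = {}
    for schema in schemas:
        # Later schemas win if two declare the same field name.
        fields.update(get_type_hints(schema))
    return TypedDict(name, fields, total=False)


merged = merge_schemas("MergedState", BaseState, ThreatsFields)
print(sorted(get_type_hints(merged)))
```

Declaring the fields on a single merged schema is what allows a checkpointer to treat them as part of the agent's state rather than as untracked extras.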
Configuration
| Parameter | Default | Description |
|---|---|---|
| tasks | (required) | Ordered list of Task definitions. |
| global_tools | [] | Tools available in every task. |
| enforce_order | True | Require tasks to be completed in definition order. |
Task fields
| Field | Required | Description |
|---|---|---|
| name | yes | Unique identifier (used in prompts and state). |
| instruction | yes | Injected into system prompt when this task is active. |
| tools | yes | Tools visible when this task is IN_PROGRESS. |
| middleware | no | Scoped TaskMiddleware, active only during this task. |
Composability
TaskSteeringMiddleware is a standard AgentMiddleware. It composes with other LangChain v1 middleware:
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware

# `pipeline` is the TaskSteeringMiddleware instance defined earlier.
agent = create_agent(
    model="anthropic:claude-sonnet-4-6",
    middleware=[
        SummarizationMiddleware(
            model="anthropic:claude-haiku-4-5-20251001",
            # The Python API takes (kind, value) tuples here, not a dict.
            trigger=("tokens", 8000),
        ),
        pipeline,
    ],
)
Development
# Install with dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run tests with coverage
pytest --cov=langchain_task_steering
Project structure
langchain-task-steering/
  src/langchain_task_steering/
    __init__.py          # Public exports
    types.py             # Task, TaskMiddleware, TaskStatus, TaskSteeringState
    middleware.py        # TaskSteeringMiddleware implementation
  tests/
    conftest.py          # Fixtures and mock objects
    test_middleware.py   # Test suite
  examples/
    simple_agent.py      # End-to-end example with Bedrock
  pyproject.toml
License
MIT — see LICENSE.