Water
The production-ready agent harness framework for Python.
Overview
Water is an agent harness framework — it provides the infrastructure around your AI agents, not the agents themselves: orchestration, resilience, observability, guardrails, approval gates, sandboxing, and deployment tooling, so you can focus on what your agents actually do.
Works with any agent framework: LangChain, CrewAI, Agno, OpenAI, Anthropic, or your own custom agents.
Installation
pip install water-ai
Quick Start
import asyncio
from water import Flow, create_task
from pydantic import BaseModel
class NumberInput(BaseModel):
    value: int

class NumberOutput(BaseModel):
    result: int

def add_five(params, context):
    return {"result": params["input_data"]["value"] + 5}
task = create_task(
    id="add",
    description="Add five",
    input_schema=NumberInput,
    output_schema=NumberOutput,
    execute=add_five,
)
flow = Flow(id="math", description="Math flow").then(task).register()
async def main():
    result = await flow.run({"value": 10})
    print(result)  # {"result": 15}

asyncio.run(main())
Flow Patterns
Water supports composable flow patterns that chain together with a fluent API:
flow = Flow(id="pipeline", description="Example pipeline")
# Sequential — tasks run one after another
flow.then(task_a).then(task_b).then(task_c)
# Parallel — tasks run concurrently, results are merged
flow.parallel([task_a, task_b, task_c])
# Conditional branching — route to different tasks based on data
flow.branch([
    (lambda data: data["type"] == "email", email_task),
    (lambda data: data["type"] == "sms", sms_task),
])
# Loop — repeat a task while a condition holds
flow.loop(lambda data: data["retries"] < 3, retry_task, max_iterations=5)
# Map — run a task for each item in a list (parallel)
flow.map(process_task, over="items")
# DAG — define tasks with explicit dependencies
flow.dag(
    [task_a, task_b, task_c],
    dependencies={"task_c": ["task_a", "task_b"]},
)
# SubFlow composition — nest flows with input/output mapping
from water import SubFlow, compose_flows
sub = SubFlow(inner_flow, input_mapping={"text": "raw_input"}, output_mapping={"clean": "text"})
flow.then(sub.as_task())
# Compose multiple flows sequentially
pipeline = compose_flows(flow_a, flow_b, flow_c, id="full_pipeline")
# Try-catch-finally — structured error handling
flow.try_catch(
    try_tasks=[risky_task, process_task],
    catch_task=error_handler,
    finally_task=cleanup_task,
)
# Conditional execution & fallbacks
flow.then(task, when=lambda data: data["enabled"])
flow.then(task, fallback=fallback_task)
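To make the composition semantics above concrete, here is a minimal sketch (not Water's implementation) of how sequential chaining and parallel merging compose when each task is an async callable that takes and returns a dict:

```python
import asyncio

async def run_sequential(tasks, data):
    # Sequential: each task receives the previous task's output.
    for task in tasks:
        data = await task(data)
    return data

async def run_parallel(tasks, data):
    # Parallel: all tasks receive the same input; outputs merge left-to-right.
    results = await asyncio.gather(*(task(data) for task in tasks))
    merged = {}
    for result in results:
        merged.update(result)
    return merged

async def add_one(d):
    return {"n": d["n"] + 1}

async def double(d):
    return {"doubled": d["n"] * 2}

async def negate(d):
    return {"negated": -d["n"]}

out_seq = asyncio.run(run_sequential([add_one, add_one], {"n": 1}))  # {"n": 3}
out_par = asyncio.run(run_parallel([double, negate], {"n": 3}))      # {"doubled": 6, "negated": -3}
```

The same two primitives, nested, are enough to express most of the patterns listed above (branching and looping just wrap them in control flow).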
Agent Harness
Water provides infrastructure around your AI agents — not the agents themselves.
LLM Tasks
Use any LLM provider through a unified interface:
from water.agents import create_agent_task, OpenAIProvider, AnthropicProvider
agent = create_agent_task(
    id="writer",
    description="Write copy",
    prompt_template="Write about: {topic}",
    provider_instance=OpenAIProvider(model="gpt-4o"),
    system_prompt="You are a copywriter.",
)
Streaming LLM Agents
Stream responses token-by-token with real-time callbacks:
from water.agents import create_streaming_agent_task, OpenAIStreamProvider
agent = create_streaming_agent_task(
    id="stream_writer",
    prompt_template="Write about: {topic}",
    provider_instance=OpenAIStreamProvider(model="gpt-4o"),
    on_chunk=lambda chunk: print(chunk.delta, end="", flush=True),
)
Multi-Agent Orchestration
Coordinate multiple agents with shared context:
from water.agents import create_agent_team, AgentRole
team = create_agent_team(
    team_id="research_team",
    roles=[
        AgentRole(id="researcher", provider=OpenAIProvider(model="gpt-4o"),
                  system_prompt="Research the topic thoroughly."),
        AgentRole(id="writer", provider=AnthropicProvider(model="claude-sonnet-4-20250514"),
                  system_prompt="Write a clear article."),
    ],
    strategy="sequential",  # or "round_robin", "dynamic"
)
Tool Use
Give agents the ability to call tools:
from water.agents import Tool, Toolkit, ToolExecutor
search = Tool(name="search", description="Search the web",
              input_schema={"type": "object", "properties": {"query": {"type": "string"}}},
              execute=lambda args: {"results": search_web(args["query"])})
executor = ToolExecutor(provider=OpenAIProvider(model="gpt-4o"),
                        tools=Toolkit(name="tools", tools=[search]), max_rounds=5)
result = await executor.run(messages=[{"role": "user", "content": "Search for AI news"}])
Prompt Templates
Reusable templates with variable interpolation and composition:
from water.agents import PromptTemplate, PromptLibrary
template = PromptTemplate("You are a {{role}}. {{action}}: {{content}}", defaults={"role": "assistant"})
result = template.render(action="Summarize", content="...")
library = PromptLibrary()
library.register("system", "You are a {{role}}.")
library.register("task", "{{action}}: {{input}}")
combined = library.compose("system", "task", separator="\n\n")
Fallback Chains
Automatically failover between LLM providers:
from water.agents import FallbackChain
chain = FallbackChain(
    providers=[OpenAIProvider(model="gpt-4o"), AnthropicProvider(model="claude-sonnet-4-20250514")],
    strategy="first_success",  # also: "round_robin", "lowest_latency"
)
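The `first_success` strategy is conceptually a try-in-order loop. A minimal sketch (providers here are plain async callables, not Water's provider classes):

```python
import asyncio

async def first_success(providers, prompt):
    errors = []
    for provider in providers:
        try:
            return await provider(prompt)
        except Exception as exc:  # real code would narrow the exception types
            errors.append(exc)
    raise RuntimeError(f"all providers failed: {errors}")

async def flaky(prompt):
    raise TimeoutError("primary down")

async def backup(prompt):
    return f"backup says: {prompt}"

answer = asyncio.run(first_success([flaky, backup], "hi"))  # "backup says: hi"
```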
Batch Processing
Process many inputs concurrently with controlled parallelism:
from water.agents import BatchProcessor
processor = BatchProcessor(max_concurrency=5, retry_failed=True, max_retries=2)
result = await processor.run_batch(task=summarize_task, inputs=[{"text": doc} for doc in docs])
print(f"Success rate: {result.success_rate:.0%}")
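Bounded concurrency with a success rate is the core of any batch processor. A sketch of the idea using an `asyncio.Semaphore` (not Water's `BatchProcessor` implementation):

```python
import asyncio

async def run_batch(task, inputs, max_concurrency=5):
    sem = asyncio.Semaphore(max_concurrency)

    async def guarded(item):
        async with sem:  # at most max_concurrency tasks run at once
            try:
                return ("ok", await task(item))
            except Exception as exc:
                return ("err", exc)

    results = await asyncio.gather(*(guarded(i) for i in inputs))
    successes = [value for status, value in results if status == "ok"]
    return successes, len(successes) / len(results)

async def length(item):
    return len(item["text"])

oks, rate = asyncio.run(run_batch(length, [{"text": "ab"}, {"text": "abcd"}]))
# oks == [2, 4], rate == 1.0
```

`asyncio.gather` preserves input order, so results line up with their inputs even though execution interleaves.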
Dynamic Planning
Let an LLM decompose goals into steps and execute them:
from water.agents import PlannerAgent, TaskRegistry
registry = TaskRegistry()
registry.register("search", search_task, "Search the web")
registry.register("summarize", summarize_task, "Summarize text")
planner = PlannerAgent(provider=OpenAIProvider(model="gpt-4o"), task_registry=registry)
result = await planner.plan_and_execute("Find and summarize AI news")
Context Management
Manage conversation context windows with automatic truncation:
from water.agents import ContextManager, TruncationStrategy
ctx = ContextManager(max_tokens=4096, strategy=TruncationStrategy.SLIDING_WINDOW, reserve_tokens=500)
trimmed = ctx.prepare_messages(long_conversation)
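A sliding-window truncation strategy keeps the system message plus as many recent messages as fit the budget. A sketch of the idea; token counting here is a crude word count, not a real tokenizer:

```python
def sliding_window(messages, max_tokens):
    count = lambda m: len(m["content"].split())  # stand-in for a tokenizer
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    budget = max_tokens - sum(count(m) for m in system)
    kept = []
    for msg in reversed(rest):  # walk newest -> oldest, keep what fits
        if count(msg) > budget:
            break
        kept.insert(0, msg)
        budget -= count(msg)
    return system + kept

convo = [{"role": "system", "content": "be brief"},
         {"role": "user", "content": "one two three four"},
         {"role": "user", "content": "five six"}]
trimmed = sliding_window(convo, max_tokens=5)
# keeps the system message and only the most recent user message
```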
Approval Gates
Add human approval checkpoints for high-risk operations:
from water.agents import create_approval_task, ApprovalGate, ApprovalPolicy, RiskLevel
gate = ApprovalGate(policy=ApprovalPolicy(auto_approve_below=RiskLevel.MEDIUM, timeout=300.0))
approval = create_approval_task(id="prod_gate", action_description="Deploy to production",
                                risk_level=RiskLevel.CRITICAL, gate=gate)
Sandboxed Execution
Run untrusted code in isolated environments:
from water.agents import create_sandboxed_task, SandboxConfig, SubprocessSandbox
sandboxed = create_sandboxed_task(
    id="run_code",
    sandbox=SubprocessSandbox(),  # also: InMemorySandbox(), DockerSandbox()
    config=SandboxConfig(timeout=10.0, max_memory_mb=128),
)
Agentic Loop (ReAct)
The model controls the loop. create_agentic_task runs a Think-Act-Observe-Repeat cycle where the LLM decides which tools to call and when to stop:
from water.agents import create_agentic_task, Tool
search = Tool(name="search", description="Search the web",
              input_schema={"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
              execute=lambda args: f"Results for {args['query']}")
agent = create_agentic_task(
    id="researcher",
    provider=OpenAIProvider(model="gpt-4o"),
    tools=[search],
    system_prompt="You are a research assistant.",
    max_iterations=10,
    stop_tool=True,  # Inject __done__ tool for explicit stop signaling
    on_step=lambda i, step: print(f"Step {i}: {step['think'][:50]}"),
    on_tool_call=lambda name, args: name != "dangerous",  # return False to block a call
    stop_condition=lambda steps, history: len(history) >= 5,
)
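Stripped of LLM calls, the loop itself is small. A minimal think-act-observe sketch (not Water's implementation) with a scripted "model" standing in for the LLM:

```python
def react_loop(model, tools, goal, max_iterations=10):
    history = [("goal", goal)]
    for _ in range(max_iterations):
        decision = model(history)          # the model controls the loop
        if decision["action"] == "finish":
            return decision["answer"]
        observation = tools[decision["action"]](**decision["args"])
        history.append((decision["action"], observation))  # observe
    raise RuntimeError("hit max_iterations without finishing")

tools = {"search": lambda query: f"Results for {query!r}"}

def scripted_model(history):
    if len(history) == 1:  # only the goal so far: act
        return {"action": "search", "args": {"query": "AI news"}}
    # otherwise: stop, answering with the last observation
    return {"action": "finish", "answer": history[-1][1]}

answer = react_loop(scripted_model, tools, "find AI news")
```

Everything else in `create_agentic_task` (stop tool, step callbacks, tool-call vetoes) hangs decision points off this skeleton.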
Sub-Agent Isolation
Create child agents that run their own isolated ReAct loops with separate context windows:
from water.agents import SubAgentConfig, create_sub_agent_tool
researcher = create_sub_agent_tool(SubAgentConfig(
    id="researcher",
    provider=OpenAIProvider(model="gpt-4o"),
    tools=[search_tool, read_file_tool],
    system_prompt="You are a research specialist.",
    max_iterations=5,
))

# Parent agent uses the sub-agent as a regular tool
parent = create_agentic_task(
    id="orchestrator", provider=provider,
    tools=[researcher, write_tool, test_tool],
    system_prompt="Delegate research to your researcher.",
)
Layered Memory
Priority-ordered memory (ORG > PROJECT > USER > SESSION > AUTO_LEARNED) with automatic resolution:
from water.agents import MemoryManager, MemoryLayer, create_memory_tools
memory = MemoryManager()
await memory.add("timeout", "30s", MemoryLayer.ORG)
await memory.add("timeout", "5s", MemoryLayer.SESSION)
entry = await memory.get("timeout") # "30s" — ORG wins
# Give agents tools to manage their own memory
memory_tools = create_memory_tools(memory) # memory_store, memory_recall, memory_list
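The resolution rule is simple: search layers from highest to lowest priority and return the first hit. A sketch of that lookup order (not Water's `MemoryManager`):

```python
# Priority order, high -> low, mirroring ORG > PROJECT > USER > SESSION > AUTO_LEARNED
LAYERS = ["ORG", "PROJECT", "USER", "SESSION", "AUTO_LEARNED"]

class LayeredMemory:
    def __init__(self):
        self.store = {layer: {} for layer in LAYERS}

    def add(self, key, value, layer):
        self.store[layer][key] = value

    def get(self, key):
        for layer in LAYERS:  # first hit = highest-priority layer wins
            if key in self.store[layer]:
                return self.store[layer][key]
        return None

memory = LayeredMemory()
memory.add("timeout", "5s", "SESSION")
memory.add("timeout", "30s", "ORG")
resolved = memory.get("timeout")  # "30s" — ORG outranks SESSION
```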
Semantic Tool Search
TF-IDF based tool selection for large toolkits — no external dependencies:
from water.agents import create_tool_selector
selector = create_tool_selector(tools=all_tools, top_k=5, always_include=["bash"])
agent = create_agentic_task(
id="smart-agent", provider=provider, tools=all_tools,
tool_selector=selector, # Narrows tools per iteration automatically
)
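TF-IDF selection needs nothing beyond the standard library, which is why no external dependencies are required. A sketch of scoring tool descriptions against a query and keeping the top k:

```python
import math
from collections import Counter

def top_k_tools(tools, query, k=2):
    # tools: list of (name, description) pairs
    docs = {name: desc.lower().split() for name, desc in tools}
    n = len(docs)
    df = Counter(word for doc in docs.values() for word in set(doc))
    idf = {w: math.log(n / df[w]) + 1.0 for w in df}  # rare words score higher
    query_words = query.lower().split()

    def score(doc):
        tf = Counter(doc)
        return sum(tf[w] * idf.get(w, 0.0) for w in query_words)

    return sorted(docs, key=lambda name: score(docs[name]), reverse=True)[:k]

tools = [("search", "search the web for pages"),
         ("bash", "run shell commands"),
         ("read_file", "read a file from disk")]
selected = top_k_tools(tools, "find web pages about AI", k=1)  # ["search"]
```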
Guardrails
Validate, filter, and constrain agent outputs:
from water.guardrails import GuardrailChain, ContentFilter, SchemaGuardrail, CostGuardrail, TopicGuardrail
chain = GuardrailChain()
chain.add(ContentFilter(block_pii=True, block_injection=True))
chain.add(SchemaGuardrail(schema=OutputModel))
chain.add(CostGuardrail(max_tokens=4000, max_cost_usd=0.50))
chain.add(TopicGuardrail(allowed_topics=["python", "data science"]))
results = chain.check(output_data)
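A guardrail chain is a pipeline of independent checks that collects every violation rather than stopping at the first. A minimal sketch of that pattern (the check functions here are hypothetical, not Water's guardrail classes):

```python
def check_chain(checks, output):
    violations = []
    for check in checks:
        passed, message = check(output)
        if not passed:
            violations.append(message)  # accumulate, don't short-circuit
    return violations

def max_length(limit):
    return lambda out: (len(out["text"]) <= limit, f"output over {limit} chars")

def no_banned_words(words):
    return lambda out: (not any(w in out["text"] for w in words), "banned word found")

violations = check_chain([max_length(10), no_banned_words(["secret"])],
                         {"text": "a long secret answer"})
# both checks fail -> two violations reported
```

Collecting all violations matters for the retry-with-feedback pattern: the model gets the full list of problems in one shot.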
Retry with Feedback
Automatically retry failed guardrails with LLM feedback:
from water.guardrails import RetryWithFeedback
retry = RetryWithFeedback(max_retries=3,
                          feedback_template="Failed: {violations}. Fix and retry.")
result = await retry.execute_with_retry(execute_fn, check_fn, params, context)
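The key move is folding the violation message back into the next attempt's input. A sketch of that loop with a fake executor that succeeds once it has seen feedback (not Water's `RetryWithFeedback` internals):

```python
import asyncio

async def retry_with_feedback(execute, check, params, max_retries=3):
    for attempt in range(max_retries + 1):
        result = await execute(params)
        ok, violation = check(result)
        if ok:
            return result, attempt
        # Feed the violation back so the next attempt can self-correct.
        params = {**params, "feedback": f"Failed: {violation}. Fix and retry."}
    raise RuntimeError("exhausted retries")

async def fake_model(params):
    # Stand-in for an LLM call: corrects itself once feedback is present.
    return "short" if "feedback" in params else "way too long output"

check = lambda out: (len(out) <= 10, "too long")

result, attempts = asyncio.run(retry_with_feedback(fake_model, check, {"q": "hi"}))
# result == "short" after 1 retry
```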
Evaluation
Test and benchmark agent flows:
from water.eval import EvalSuite, EvalCase, ExactMatch, LLMJudge
suite = EvalSuite(
    flow=math_flow,
    evaluators=[ExactMatch(key="answer"), LLMJudge(provider=provider, rubric="Is this correct?")],
    cases=[EvalCase(input={"q": "2+2"}, expected={"answer": "4"})],
)
report = await suite.run()
# CLI-based evaluation
water eval run eval_config.yaml
water eval compare run_001.json run_002.json
Resilience
Built-in patterns for production reliability:
from water.resilience import CircuitBreaker, RateLimiter, InMemoryCheckpoint, InMemoryDLQ
from water.resilience import FlowCache, InMemoryFlowCache, ProviderRateLimiter, ProviderLimits
# Circuit breaker — stop calling failing services
cb = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
# Rate limiter — control throughput
limiter = RateLimiter(max_calls=100, period=60)
# Provider rate limiting — per-model RPM/TPM controls
provider_limiter = ProviderRateLimiter(limits={
    "gpt-4o": {"rpm": 60, "tpm": 150_000},
    "claude-sonnet-4-20250514": {"rpm": 40, "tpm": 100_000},
})
wait = await provider_limiter.acquire("gpt-4o", estimated_tokens=500)
# Flow caching — cache entire flow results
cache = FlowCache(backend=InMemoryFlowCache(), ttl=600)
# Checkpoint — resume flows after crashes
flow.checkpoint = InMemoryCheckpoint()
# Dead-letter queue — capture failed tasks
flow.dlq = InMemoryDLQ()
Tasks also support retry and timeout out of the box:
task = create_task(
    id="flaky", execute=call_api,
    retry_count=3, retry_delay=1.0, retry_backoff=2.0, timeout=30.0,
)
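To illustrate the circuit-breaker state machine above, here is a minimal sketch (not Water's `CircuitBreaker`): after a threshold of consecutive failures the circuit opens and calls are rejected without touching the service; after the recovery timeout one probe call is allowed through (half-open):

```python
import time

class CircuitBreakerSketch:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None  # None = closed

    def call(self, fn):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open")  # reject fast, fn not called
            self.opened_at = None  # half-open: allow one probe call
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()  # trip the breaker
            raise
        self.failures = 0  # success resets the failure count
        return result

cb = CircuitBreakerSketch(failure_threshold=2, recovery_timeout=30.0)

def boom():
    raise ValueError("service down")

for _ in range(2):  # two failures trip the breaker
    try:
        cb.call(boom)
    except ValueError:
        pass

try:
    cb.call(lambda: "ok")
    state = "closed"
except RuntimeError:
    state = "open"  # rejected without calling the function
```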
Integrations
MCP (Model Context Protocol)
Expose flows as MCP tools or call external MCP servers:
from water.integrations import MCPServer, MCPClient, create_mcp_task
server = MCPServer(flows=[my_flow])
client = MCPClient(server_url="http://localhost:3000")
mcp_task = create_mcp_task(id="search", client=client, tool_name="web_search")
A2A (Agent-to-Agent Protocol)
Expose flows as discoverable A2A agents or call remote agents:
from water.integrations import A2AServer, A2AClient, AgentSkill, create_a2a_task
# Serve
server = A2AServer(flow=my_flow, name="Research Agent",
                   skills=[AgentSkill(id="research", name="Research", description="Research topics")])
server.add_routes(app)  # serves /.well-known/agent.json + /a2a
# Consume
client = A2AClient(agent_url="https://remote-agent.example.com")
task = await client.send_task(input_data={"topic": "quantum computing"})
Chat Adapters
Connect flows to Slack, Discord, or Telegram:
from water.integrations import ChatBot, SlackAdapter
bot = ChatBot(adapter=SlackAdapter(token="xoxb-..."), flows=[support_flow])
SSE Streaming & Event Triggers
from water.integrations import StreamingFlow, StreamManager, add_streaming_routes
from water.triggers import WebhookTrigger, CronTrigger, QueueTrigger, TriggerRegistry
# Stream flow events via SSE
streaming = StreamingFlow(flow, StreamManager())
# Trigger flows from webhooks (with HMAC verification), cron, or queues
webhook = WebhookTrigger("my_flow", path="/hooks", secret="shared-secret")
cron = CronTrigger("report_flow", schedule="0 9 * * 1-5", input_data={"type": "daily"})
queue = QueueTrigger("process_flow", max_size=1000)
registry = TriggerRegistry()
registry.register(webhook)
registry.register(cron)
await registry.start_all()
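The HMAC verification a webhook trigger performs before accepting a payload can be sketched with the standard library alone:

```python
import hashlib
import hmac

def sign(secret: bytes, body: bytes) -> str:
    return hmac.new(secret, body, hashlib.sha256).hexdigest()

def verify(secret: bytes, body: bytes, signature: str) -> bool:
    # compare_digest avoids leaking match position via timing
    return hmac.compare_digest(sign(secret, body), signature)

secret = b"shared-secret"
body = b'{"event": "push"}'
good = verify(secret, body, sign(secret, body))                    # True
bad = verify(secret, b'{"event": "tampered"}', sign(secret, body))  # False
```

Exact header names and signature encodings vary by webhook source, but the check itself is always this: recompute the MAC over the raw body and compare in constant time.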
Observability
from water.observability import (TelemetryManager, FlowDashboard, CostTracker, TokenUsage,
                                 StructuredLogger, auto_instrument)
# OpenTelemetry integration
telemetry = TelemetryManager(service_name="my-service")
# Built-in dashboard (served at /dashboard)
dashboard = FlowDashboard(storage=my_storage)
# Cost tracking with budget enforcement
tracker = CostTracker(budget_limit=10.0, on_budget_exceeded="warn")
flow.use(tracker)
summary = tracker.get_summary()
# Structured JSON logging with context
logger = StructuredLogger(level="info", format="json", redact_fields=["api_key"])
logger.set_context(flow_id="my_flow", execution_id="exec_001")
logger.info("Processing started", step="validation")
# Auto-instrumentation — zero-code tracing
instrumentor = auto_instrument(service_name="my-service", capture_input=True, capture_output=True)
flow.use(instrumentor)
# Execution replay — reproduce and debug past runs
from water.core.replay import ReplayEngine, ReplayConfig
engine = ReplayEngine(storage=my_storage)
result = await engine.replay(flow, session_id="exec_abc123",
                             config=ReplayConfig(from_task="transform", override_inputs={"transform": {"mode": "v2"}}))
Middleware, Hooks & Events
from water.middleware import HookManager, EventEmitter
# Hooks — register callbacks for lifecycle events
hooks = HookManager()
hooks.on("on_task_start", lambda task_id, **kw: print(f"Starting: {task_id}"))
hooks.on("on_task_error", lambda task_id, error, **kw: alert(error))
# Events — subscribe to real-time flow events
emitter = EventEmitter()
flow.events = emitter
subscription = emitter.subscribe()
async for event in subscription:
    print(f"[{event.event_type}] {event.task_id}")
Plugins
Extend Water with custom storage, providers, middleware, guardrails, and integrations:
from water.plugins import PluginRegistry, WaterPlugin, PluginType
class MyPlugin(WaterPlugin):
    name = "my_plugin"
    plugin_type = PluginType.STORAGE

    def register(self, app):
        app.register_storage("custom", MyStorage())
registry = PluginRegistry()
registry.register(MyPlugin())
# Or auto-discover via entry points: registry.discover()
Flow Versioning
Track schema changes with compatibility checking and data migration:
from water import SchemaRegistry, snapshot_flow_schemas
registry = SchemaRegistry()
registry.register_version("my_flow", "1.0.0", snapshot_flow_schemas(flow_v1))
registry.register_version("my_flow", "2.0.0", snapshot_flow_schemas(flow_v2))
changes = registry.check_compatibility("my_flow", "1.0.0", "2.0.0")
migrated = registry.migrate_data("my_flow", old_data, "1.0.0", "2.0.0")
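A compatibility check between two schema versions reduces to a field-by-field diff: removals and type changes are breaking, additions are backward-compatible. A sketch over plain field-to-type dicts (not Water's `SchemaRegistry` format):

```python
def check_compatibility(old: dict, new: dict):
    breaking, additive = [], []
    for field, ftype in old.items():
        if field not in new:
            breaking.append(f"removed: {field}")       # consumers will miss it
        elif new[field] != ftype:
            breaking.append(f"type changed: {field}")  # consumers may mis-parse it
    for field in new:
        if field not in old:
            additive.append(f"added: {field}")         # safe for old consumers
    return breaking, additive

v1 = {"email": "str", "age": "int"}
v2 = {"email": "str", "age": "str", "name": "str"}
breaking, additive = check_compatibility(v1, v2)
# breaking == ["type changed: age"], additive == ["added: name"]
```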
Server
Serve your flows as a REST API with one line:
from water.server import FlowServer
server = FlowServer(flows=[flow_a, flow_b])
app = server.get_app()
# Routes:
# GET /flows — list all flows
# GET /flows/{id} — flow details
# POST /flows/{id}/run — execute a flow
# GET /health — health check
# GET /dashboard — observability UI
uvicorn app:app --reload
CLI
# Run a flow
water run cookbook.core.sequential_flow:registration_flow --input '{"email": "a@b.com", "password": "secret", "first_name": "Water"}'
# Visualize as Mermaid diagram
water visualize cookbook.core.dag_flow:pipeline_flow
# Validate without executing
water dry-run cookbook.core.sequential_flow:registration_flow --input '{"email": "a@b.com"}'
# List all flows in a module
water list cookbook.core.sequential_flow
# Run evaluations
water eval run eval_config.yaml
water eval compare run_001.json run_002.json
water eval list ./evals/
# Deploy to Render
water flow prod:render --app playground
Architecture
water/
├── core/ # Flow, Task, ExecutionEngine, Context, SubFlow, Replay, Versioning
├── agents/ # LLM tasks, streaming, multi-agent, tools, context, prompts,
│ # fallback, batch, planner, approval, human-in-the-loop, sandbox,
│ # agentic loop (ReAct), sub-agents, layered memory, tool search
├── guardrails/ # Content filter, schema, cost, topic guardrails, retry-with-feedback
├── eval/ # EvalSuite, evaluators, CLI, YAML/JSON config
├── storage/ # InMemory, SQLite, Redis, Postgres backends
├── resilience/ # Circuit breaker, rate limiter, cache, checkpoint, DLQ,
│ # flow cache, provider rate limiter
├── middleware/ # Middleware, hooks, events
├── integrations/ # MCP, A2A protocol, chat adapters, SSE streaming
├── triggers/ # Webhook, cron, queue triggers with registry
├── observability/ # Telemetry, dashboard, cost tracking, structured logging,
│ # auto-instrumentation
├── plugins/ # Plugin registry with entry-point discovery
├── server/ # FlowServer (FastAPI)
├── tasks/ # Built-in task library (HTTP, JSON transform, file I/O, etc.)
└── utils/ # Testing, scheduler, declarative loader, secrets, CLI
Cookbook
The cookbook/ directory has 73 runnable examples organized by category:
| Category | Examples |
|---|---|
| core/ | Sequential, parallel, branching, loops, map, DAG, subflow, try-catch, replay, versioning, validation, contracts |
| agents/ | LLM tasks, streaming, multi-agent, tools, fallback chains, prompts, batch, planner, approval, human-in-the-loop, sandbox, agentic loop, sub-agents, memory, tool search |
| real_world/ | Claude Code-style coding agent |
| resilience/ | Circuit breaker, rate limiting, provider rate limits, flow cache, checkpointing, DLQ, caching, retry/timeout |
| observability/ | Cost tracking, auto-instrumentation, structured logging, tracing, telemetry, dashboard |
| integrations/ | MCP, A2A protocol, chat bots, SSE streaming, triggers |
| server/ | REST server, playground, deployment |
| utils/ | Testing, secrets, plugins, declarative flows, scheduler |
| storage/ | Storage backends |
| middleware/ | Hooks, events, middleware |
Contributing
We welcome contributions — bug reports, feature requests, code, docs, and testing.
License
Apache License 2.0. See LICENSE for details.