Zapier for AI agents. Connect to any API on the fly.

Liquid

The agent-native API fabric.

Liquid is the transformation layer between AI agents and any HTTP API — actively optimizing for the constraints real agents hit: token budgets, context windows, cross-API cognitive load, recovery from failures, and predictable cost.


Why agents need more than a tool wrapper

Shipping an agent against real APIs surfaces problems most HTTP clients ignore:

  • A single list_orders response eats 50k tokens of context
  • Stripe, Shopify, and Square represent "money" in three different shapes
  • A 401 comes back as an unstructured string, so the agent has to guess how to recover
  • Rate limits trip without warning, and one agent run burns through another run's budget
  • The agent has no way to ask "how much will this call cost me?" before making it

Liquid addresses each of these with a concrete primitive. Everything below is shipped and on PyPI.

What Liquid gives your agent

Context-budget control

# Search server-side instead of fetch-then-filter — 10-100x token savings
orders = await liquid.search(
    adapter, "/orders",
    where={"total_cents": {"$gt": 10000}, "status": "paid"},
    limit=20,
)

# Aggregate without ever seeing records
stats = await liquid.aggregate(
    adapter, "/orders",
    group_by="status",
    agg={"total_cents": "sum", "id": "count"},
)

# Full-text search across records (BM25-lite, ranked)
hits = await liquid.text_search(adapter, "/tickets", "shipping delay")

# Fetch only what fits in your budget
data = await liquid.fetch(adapter, "/orders", max_tokens=2000)
# -> _meta.truncated=True, _meta.truncated_at="item_42"

# Identity-plus-two-fields mode for context-constrained runs
data = await liquid.fetch(adapter, "/customers", verbosity="terse")

# Walk pages until a predicate matches, then stop
result = await liquid.fetch_until(
    adapter, "/orders",
    predicate={"customer_email": {"$eq": "vip@co.com"}},
    max_pages=20,
)
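
The where and predicate arguments above use a Mongo-style operator syntax. As a rough mental model only (Liquid's own evaluator is not shown in this README), the sketch below applies such a filter to plain dicts. The matches helper is hypothetical, and every operator other than $gt and $eq is an assumption.

```python
import operator
from typing import Any

# "$gt" and "$eq" appear in the README; the other operators are assumed.
_OPS = {
    "$eq": operator.eq,
    "$ne": operator.ne,
    "$gt": operator.gt,
    "$gte": operator.ge,
    "$lt": operator.lt,
    "$lte": operator.le,
}


def matches(record: dict[str, Any], where: dict[str, Any]) -> bool:
    """Return True if every clause in `where` holds for `record`."""
    for field, condition in where.items():
        value = record.get(field)
        if isinstance(condition, dict):
            # {"$gt": 10000} style: every listed operator must hold.
            # In this sketch a missing field never matches.
            for op_name, operand in condition.items():
                if value is None or not _OPS[op_name](value, operand):
                    return False
        elif value != condition:
            # A bare value is shorthand for equality, e.g. "status": "paid".
            return False
    return True


orders = [
    {"id": 1, "total_cents": 25000, "status": "paid"},
    {"id": 2, "total_cents": 5000, "status": "paid"},
    {"id": 3, "total_cents": 30000, "status": "refunded"},
]
where = {"total_cents": {"$gt": 10000}, "status": "paid"}
selected = [o["id"] for o in orders if matches(o, where)]
print(selected)  # [1]
```

Only order 1 is both paid and above 10000 cents, which is the kind of reduction that keeps the other records out of the agent's context.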

Cross-API normalization

liquid = Liquid(..., normalize_output=True)

# Stripe: {amount: 1000, currency: "usd"}
# PayPal: {value: "10.00", currency_code: "USD"}
# Square: {amount: 1000, currency: "USD"}
# All three normalize to:
Money(amount_cents=1000, currency="USD", amount_decimal=Decimal("10.00"))
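
To illustrate what this normalization involves, here is a minimal sketch that maps the three shapes above onto one dataclass. It is not Liquid's implementation: the normalize_money function is hypothetical, and it assumes two-decimal currencies (a zero-decimal currency such as JPY would need a per-currency exponent).

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class Money:
    amount_cents: int
    currency: str
    amount_decimal: Decimal


def normalize_money(raw: dict) -> Money:
    """Map a provider-specific money dict to one canonical shape."""
    if "value" in raw:
        # PayPal style: decimal string in major units.
        decimal_amount = Decimal(raw["value"])
        cents = int(decimal_amount * 100)
        currency = raw["currency_code"]
    else:
        # Stripe / Square style: integer in minor units.
        cents = int(raw["amount"])
        decimal_amount = Decimal(cents) / 100
        currency = raw["currency"]
    return Money(cents, currency.upper(), decimal_amount.quantize(Decimal("0.01")))


stripe = normalize_money({"amount": 1000, "currency": "usd"})
paypal = normalize_money({"value": "10.00", "currency_code": "USD"})
square = normalize_money({"amount": 1000, "currency": "USD"})
print(stripe == paypal == square)  # True
```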

Unix timestamps, ISO 8601, and RFC 2822 dates all collapse to datetime in UTC. Pagination envelopes ({data: [...]} / {results: [...]} / {items: [...]} / Link headers) flatten to a single PaginationEnvelope. ID fields normalize across id / _id / uid / uuid / *_id conventions.
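
The date handling can be approximated with the standard library alone. The sketch below is an assumption about the behavior, not Liquid's code: to_utc is a hypothetical helper, and it treats timestamps without a timezone as already being in UTC.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def to_utc(value) -> datetime:
    """Coerce a Unix timestamp, ISO 8601 string, or RFC 2822 string to UTC."""
    if isinstance(value, (int, float)):
        return datetime.fromtimestamp(value, tz=timezone.utc)
    # fromisoformat() only accepts a trailing "Z" on Python 3.11+.
    iso = value[:-1] + "+00:00" if value.endswith("Z") else value
    try:
        parsed = datetime.fromisoformat(iso)
    except ValueError:
        # Not ISO 8601, so try the RFC 2822 (email-style) format.
        parsed = parsedate_to_datetime(value)
    if parsed.tzinfo is None:
        # Assumption: naive timestamps are already UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


print(to_utc(0))
print(to_utc("2026-04-20T10:00:00Z"))
print(to_utc("Mon, 20 Apr 2026 12:00:00 +0200"))
```

The last two inputs denote the same instant, so both come back as 10:00 UTC on 2026-04-20.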

Intent layer — canonical operations across APIs

# Same intent, any supported API
await liquid.execute_intent("charge_customer", {
    "customer_id": "cus_xyz",
    "amount_cents": 9999,
    "currency": "USD",
})
# Works on Stripe, Braintree, Square, Adyen — one agent mental model

Ten canonical intents ship today: charge_customer, refund_charge, create_customer, update_customer, list_orders, cancel_order, send_email, post_message, create_ticket, close_ticket.
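
One way to picture an intent layer is a lookup table from (provider, intent) to a request builder. The sketch below is hypothetical and not how Liquid is implemented: the INTENTS table and plan_intent function are invented for illustration, and the request bodies are simplified and not verified against the providers' current APIs.

```python
from typing import Callable


def _stripe_charge(p: dict) -> dict:
    # Simplified illustration of a Stripe-style charge request.
    return {
        "method": "POST",
        "path": "/v1/charges",
        "body": {
            "customer": p["customer_id"],
            "amount": p["amount_cents"],
            "currency": p["currency"].lower(),
        },
    }


def _square_charge(p: dict) -> dict:
    # Simplified illustration of a Square-style payment request.
    return {
        "method": "POST",
        "path": "/v2/payments",
        "body": {
            "customer_id": p["customer_id"],
            "amount_money": {"amount": p["amount_cents"], "currency": p["currency"]},
        },
    }


# Hypothetical registry: one canonical intent, several provider translations.
INTENTS: dict[tuple[str, str], Callable[[dict], dict]] = {
    ("stripe", "charge_customer"): _stripe_charge,
    ("square", "charge_customer"): _square_charge,
}


def plan_intent(provider: str, intent: str, params: dict) -> dict:
    """Translate a canonical intent into a provider-specific request plan."""
    try:
        translate = INTENTS[(provider, intent)]
    except KeyError:
        raise LookupError(f"{intent!r} is not mapped for {provider!r}") from None
    return translate(params)


params = {"customer_id": "cus_xyz", "amount_cents": 9999, "currency": "USD"}
print(plan_intent("stripe", "charge_customer", params))
print(plan_intent("square", "charge_customer", params))
```

The agent only ever sees the canonical parameter names; the per-provider differences stay inside the translators.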

Structured recovery — agents self-heal without parsing text

try:
    await liquid.fetch(adapter, "/orders")
except LiquidError as e:
    if e.recovery and e.recovery.next_action:
        # Agent dispatches the action directly — zero text parsing
        await agent.call_tool(
            e.recovery.next_action.tool,
            e.recovery.next_action.args,
        )

Every Fetcher / Executor error carries a Recovery with next_action: ToolCall, retry_safe: bool, and retry_after_seconds where applicable. 401 → store_credentials. 404/410 → repair_adapter. 429 → retry with retry_after_seconds.
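
The status-to-action mapping described above can be sketched as a small pure function. The dataclasses here are stand-ins that mirror the field names in the text; the recovery_for function, the shape of args, and the retry_safe values are assumptions, not Liquid's actual behavior.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToolCall:
    tool: str
    args: dict = field(default_factory=dict)


@dataclass
class Recovery:
    next_action: Optional[ToolCall] = None
    retry_safe: bool = False
    retry_after_seconds: Optional[float] = None


def recovery_for(status: int, adapter: str,
                 retry_after: Optional[float] = None) -> Recovery:
    """Map an HTTP status to a machine-readable recovery hint."""
    if status == 401:
        # Credentials missing or expired: ask the agent to store new ones.
        return Recovery(next_action=ToolCall("store_credentials", {"adapter": adapter}))
    if status in (404, 410):
        # Endpoint moved or removed: the adapter mapping needs repair.
        return Recovery(next_action=ToolCall("repair_adapter", {"adapter": adapter}))
    if status == 429:
        # Rate limited: retrying is safe once the server's delay has passed.
        return Recovery(retry_safe=True, retry_after_seconds=retry_after)
    return Recovery()  # no structured hint available


print(recovery_for(401, "shopify").next_action.tool)
print(recovery_for(429, "shopify", retry_after=30).retry_after_seconds)
```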

Predictable cost — know before you call

est = await liquid.estimate_fetch(adapter, "/orders")
# FetchEstimate(
#   expected_items=250, expected_tokens=52_000, expected_cost_credits=1,
#   expected_latency_ms=800, confidence="high", source="empirical"
# )

if est.expected_tokens < my_budget:
    data = await liquid.fetch(adapter, "/orders")
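
Instead of skipping an over-budget call outright, an agent could cap it. The sketch below is a hypothetical helper, not part of Liquid: plan_fetch is invented, FetchEstimate is a stand-in carrying only the one field used here, and the only assumption about the library is the max_tokens argument shown earlier in this README.

```python
from dataclasses import dataclass


@dataclass
class FetchEstimate:
    # Stand-in with only the field this sketch needs.
    expected_tokens: int


def plan_fetch(est: FetchEstimate, budget_tokens: int) -> dict:
    """Return extra keyword arguments for fetch() given a token budget."""
    if est.expected_tokens <= budget_tokens:
        return {}  # fits as-is, no cap needed
    return {"max_tokens": budget_tokens}  # cap the response to the budget


print(plan_fetch(FetchEstimate(expected_tokens=52_000), budget_tokens=2_000))
print(plan_fetch(FetchEstimate(expected_tokens=500), budget_tokens=2_000))
# Intended use (not executed here):
#   data = await liquid.fetch(adapter, "/orders", **plan_fetch(est, my_budget))
```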

Every tool emitted by to_tools() also carries a metadata block with cost_credits, typical_latency_ms, cached, cache_ttl_seconds, idempotent, side_effects, expected_result_size, and related_tools so agents can reason about which tool to pick.

Ambient state — no memorization needed

tools = await liquid.to_tools(format="anthropic")
# Auto-includes: liquid_check_quota, liquid_list_adapters, liquid_health_check,
# liquid_check_rate_limit, liquid_get_adapter_info, liquid_estimate_fetch,
# liquid_aggregate, liquid_text_search, liquid_search_nl, liquid_fetch_until,
# liquid_fetch_changes_since

The agent asks "how much budget do I have left?" by calling a tool instead of remembering state in its working memory (where it's unreliable).

Response _meta — provenance and truncation signals

liquid = Liquid(..., include_meta=True)
data = await liquid.fetch(adapter, "/orders")
# {
#   "data": [...],
#   "_meta": {
#     "source": "cache", "age_seconds": 180, "fresh": True,
#     "truncated": False, "total_count": 523, "next_cursor": "...",
#     "adapter": "shopify", "endpoint": "/orders",
#     "fetched_at": "2026-04-20T10:00:00Z", "confidence": 0.93
#   }
# }
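
An agent or wrapper can branch on these fields without inspecting the payload. The two helpers below are hypothetical and only read the _meta keys shown above; the 300-second staleness threshold is an arbitrary assumption.

```python
def should_refetch(meta: dict, max_age_seconds: int = 300) -> bool:
    """Decide whether a cached response is too stale to act on."""
    if meta.get("source") != "cache":
        return False  # came straight from the API
    return not meta.get("fresh", False) or meta.get("age_seconds", 0) > max_age_seconds


def has_more(meta: dict) -> bool:
    """True if the response was cut short or another page exists."""
    return bool(meta.get("truncated")) or meta.get("next_cursor") is not None


meta = {
    "source": "cache", "age_seconds": 180, "fresh": True,
    "truncated": False, "total_count": 523, "next_cursor": "...",
}
print(should_refetch(meta))  # False: cached, but fresh and only 180 s old
print(has_more(meta))        # True: a next_cursor is present
```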

Measured impact

Deterministic benchmarks on realistic agent tasks (500-order, 200-ticket fixtures, mocked HTTP) — reproducible via python -m benchmarks.run:

| Task | Metric | Baseline | With Liquid | Delta |
|---|---|---|---|---|
| Find 10 orders over $100 | tokens | 75,482 | 1,519 | −98% |
| Revenue by status (aggregate) | tokens | 75,482 | 115 | −100% |
| Fetch customer (id+email only) | tokens | 424 | 12 | −97% |
| Recover from 401 | structured next_action | no | yes | |
| Find the shipping ticket | tokens | 14,588 | 154 | −99% |
| Stripe↔PayPal consistency | field overlap | 0.11 | 1.00 | +9× |
| Skip wasted call via estimate | tokens | 14,943 | 0 | −100% |
| max_tokens=2000 budget cap | tokens | 14,943 | 1,999 | −87% |

Full methodology + per-task breakdown: benchmarks/RESULTS.md.
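
The Delta column for the token rows can be rechecked from the Baseline and With Liquid figures. This snippet only redoes the arithmetic on the numbers published above; it does not run the benchmarks. Values are rounded to whole percent, which is why 75,482 to 115 tokens (about −99.8%) shows as −100%.

```python
def pct_change(baseline: int, with_liquid: int) -> int:
    """Percent change relative to baseline, rounded to a whole percent."""
    return round((with_liquid - baseline) / baseline * 100)


# (baseline tokens, tokens with Liquid), copied from the table above.
token_rows = {
    "Find 10 orders over $100": (75_482, 1_519),
    "Revenue by status (aggregate)": (75_482, 115),
    "Fetch customer (id+email only)": (424, 12),
    "Find the shipping ticket": (14_588, 154),
    "Skip wasted call via estimate": (14_943, 0),
    "max_tokens=2000 budget cap": (14_943, 1_999),
}

deltas = {task: pct_change(*pair) for task, pair in token_rows.items()}
for task, delta in deltas.items():
    print(f"{task}: {delta}%")
```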

Install

pip install liquid-api
# Framework integrations
pip install liquid-langchain   # LangChain / LangGraph
pip install liquid-crewai      # CrewAI

See it work — live, no pre-config

Point Liquid at an API it has never seen (no adapter, no OpenAPI spec, no auth) and get typed records back. AI is used once for discovery + mapping; every fetch after is pure HTTP. Runnable end to end — examples/live_quickstart.py:

liquid = Liquid(llm=my_llm, vault=InMemoryVault(), sink=CollectorSink(),
                registry=InMemoryAdapterRegistry())

adapter = await liquid.get_or_create(
    url="https://api.openbrewerydb.org/v1/breweries",
    target_model={"name": "str", "city": "str", "state": "str", "country": "str"},
    auto_approve=True,
)
data = await liquid.fetch(adapter)

Real output (Gemini as the LLM backend):

Connecting to an API Liquid has never seen:
  https://api.openbrewerydb.org/v1/breweries

  discovery method : rest_heuristic
  mapped fields    : ['name', 'city', 'state', 'country']
  LLM calls so far : 2  (discovery + mapping)

fetch() -> 50 typed records; first 3:
   {'name': '(405) Brewing Co', 'city': 'Norman', 'state': 'Oklahoma', 'country': 'United States'}
   {'name': '(512) Brewing Co', 'city': 'Austin', 'state': 'Texas', 'country': 'United States'}
   {'name': '1 of Us Brewing Company', 'city': 'Mount Pleasant', 'state': 'Wisconsin', 'country': 'United States'}

  LLM calls during fetch : 0
  LLM calls on 2nd fetch : 0

Two model calls to learn the API, then zero forever. That's the whole pitch.

Run as an MCP server (open source, self-hosted)

Expose the engine to any MCP client (Claude Desktop, Cursor, Claude Code). It runs the Liquid engine in your own process — no cloud, no account, no lock-in:

pip install 'liquid-api[mcp]'
export OPENAI_API_KEY=sk-...        # or GEMINI_API_KEY / ANTHROPIC_API_KEY,
                                    # or OPENAI_BASE_URL=http://localhost:11434/v1 for local (Ollama/vLLM)
liquid-mcp                          # or: python -m liquid.mcp_server

Claude Code:

claude mcp add liquid --scope user -e OPENAI_API_KEY=sk-... -- liquid-mcp

Claude Desktop / any MCP client:

{ "mcpServers": { "liquid": { "command": "liquid-mcp", "env": { "OPENAI_API_KEY": "sk-..." } } } }

Tools: liquid_connect (discover + map any API), liquid_fetch, liquid_query (server-side search/aggregate), liquid_list_adapters, liquid_discover. Adapters and credentials persist under ~/.liquid. Backed by your model — OpenAI, Gemini, Anthropic, or any OpenAI-compatible/local endpoint via base_url.

Real run — connecting to an API it had never seen, fully local:

liquid_connect → {"status":"connected","service":"Openbrewerydb","mapped_fields":["name","city","country"]}
liquid_fetch   → 50 typed records, e.g. {"name":"(405) Brewing Co","city":"Norman","country":"United States"}

Quick start — LangGraph agent with Shopify

from liquid import Liquid, InMemoryCache, RateLimiter
from liquid._defaults import InMemoryVault, InMemoryAdapterRegistry, CollectorSink
from liquid_langchain import LiquidToolkit
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI

liquid = Liquid(
    llm=my_llm,               # any LLMBackend implementation (construction not shown)
    vault=InMemoryVault(),
    sink=CollectorSink(),
    registry=InMemoryAdapterRegistry(),
    cache=InMemoryCache(),
    rate_limiter=RateLimiter(),
    normalize_output=True,    # cross-API canonical shapes
    include_meta=True,        # _meta block on every response
)

adapter = await liquid.get_or_create(
    "https://api.shopify.com",
    target_model={"id": "str", "total_cents": "int", "customer_email": "str"},
    credentials={"access_token": "shpat_..."},
    auto_approve=True,
)

tools = LiquidToolkit(adapter, liquid).get_tools()

agent = create_react_agent(ChatOpenAI(model="gpt-4o-mini"), tools)
result = await agent.ainvoke({
    "messages": [("user", "Find 5 recent orders over $100 from VIP customers")],
})

The agent's tools come with rich descriptions (WHEN to use, NOT FOR what, return shape, cost), structured recovery on every error, and server-side search so it never pulls 500 orders to find 5.

Framework support

# Anthropic tool use
tools = adapter.to_tools(format="anthropic")

# OpenAI function calling
tools = adapter.to_tools(format="openai")

# MCP (Claude Desktop, Cursor)
tools = adapter.to_tools(format="mcp")

# CrewAI
from liquid_crewai import LiquidCrewToolkit
tools = LiquidCrewToolkit(adapter, liquid).get_tools()

# Opt out of metadata block on tools
tools = adapter.to_tools(format="openai", include_metadata=False)

Architecture

URL                           Agent
 ↓                              ↑
 DISCOVERY                   FETCH / EXECUTE / SEARCH / AGGREGATE
 ↓                              ↑
 MCP → OpenAPI → GraphQL     Deterministic HTTP + transforms
 → REST heuristic → Browser     • Query DSL (server-side filter)
          ↓                     • Output normalization
       APISchema                • Verbosity / max_tokens / _meta
          ↓                     • Structured recovery
 AI MAPPING (setup only)        • Rate-limit-aware token bucket
          ↓                     • Response cache (Cache-Control aware)
       AdapterConfig            • Empirical probing data (Cloud)

AI participates at setup only. Runtime is pure HTTP with transforms — no LLM per call, predictable cost, reproducible behavior. The agent UX layer on top doesn't call an LLM either (except search_nl, which caches compilations).
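
For readers unfamiliar with the token bucket named in the diagram, here is a generic, minimal version of the technique. It is a textbook sketch, not Liquid's RateLimiter: the class name, its methods, and the injectable clock are all assumptions made for illustration.

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate_per_sec`."""

    def __init__(self, rate_per_sec: float, capacity: float, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity  # start full
        self._last = clock()

    def _refill(self) -> None:
        now = self._clock()
        elapsed = now - self._last
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._last = now

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Take `cost` tokens if available; never blocks."""
        self._refill()
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False

    def wait_time(self, cost: float = 1.0) -> float:
        """Seconds until `cost` tokens will be available."""
        self._refill()
        return max(0.0, (cost - self._tokens) / self.rate)


# Deterministic demo with a fake clock: 2 requests/second, burst of 2.
now = [0.0]
bucket = TokenBucket(rate_per_sec=2.0, capacity=2.0, clock=lambda: now[0])
print(bucket.try_acquire(), bucket.try_acquire(), bucket.try_acquire())
print(bucket.wait_time())
now[0] = 0.5  # half a second later, one token has been refilled
print(bucket.try_acquire())
```

Being "rate-limit-aware" would additionally mean adjusting the rate from response headers; that part is not sketched here.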

Discovery pipeline

| Method | Where it looks | Cost |
|---|---|---|
| MCP | /mcp | Low (native protocol) |
| OpenAPI | /openapi.json, /swagger.json, /v3/api-docs | Low |
| GraphQL | /graphql (introspection) | Low |
| REST heuristic | common paths + LLM interpretation | Medium |
| Browser | Playwright capturing network | High |
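
The cheapest-first ordering in the table amounts to a fallback chain. The sketch below shows that control flow with stand-in probes; the discover function and the probe signature are hypothetical, and no network calls are made.

```python
from typing import Callable, Optional

# A probe takes a base URL and returns a schema dict, or None if nothing was found.
Probe = Callable[[str], Optional[dict]]


def discover(base_url: str, probes: list[tuple[str, Probe]]) -> tuple[str, dict]:
    """Try probes in order (cheapest first) and return the first schema found."""
    for method, probe in probes:
        schema = probe(base_url)
        if schema is not None:
            return method, schema
    raise LookupError(f"no discovery method succeeded for {base_url}")


# Stand-in probes: the first three find nothing, so the REST heuristic wins
# and the expensive browser probe is never invoked.
probes: list[tuple[str, Probe]] = [
    ("mcp", lambda url: None),
    ("openapi", lambda url: None),
    ("graphql", lambda url: None),
    ("rest_heuristic", lambda url: {"endpoints": ["/v1/breweries"]}),
    ("browser", lambda url: {"endpoints": []}),
]

method, schema = discover("https://api.openbrewerydb.org", probes)
print(method, schema)
```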

2,500+ APIs are pre-discovered and pre-mapped in the global catalog — most popular services connect with zero discovery cost.

Protocols

Every component is a swappable Protocol:

from liquid.protocols import (
    Vault, LLMBackend, DataSink, KnowledgeStore,
    AdapterRegistry, CacheStore,
)

In-memory implementations ship for all of them. liquid-cloud provides PostgresVault, RedisCache, etc. for hosted deployments.
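
Because these are Protocols, a replacement only has to match the method signatures; it does not need to inherit from anything. The example below demonstrates that mechanism with a hypothetical SecretStore protocol. The real method names on Vault and the other Liquid protocols are not listed in this README, so the methods here are assumptions.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class SecretStore(Protocol):
    """Hypothetical protocol, standing in for something like Vault."""

    def get(self, key: str) -> Optional[str]: ...
    def set(self, key: str, value: str) -> None: ...


class DictSecretStore:
    """Satisfies SecretStore structurally, with no inheritance."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def set(self, key: str, value: str) -> None:
        self._data[key] = value


store = DictSecretStore()
store.set("shopify/access_token", "shpat_example")
print(isinstance(store, SecretStore))  # True
print(store.get("shopify/access_token"))
```

Note that isinstance with runtime_checkable only verifies that the methods exist, not that their signatures match; a static type checker is needed for that.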

Ecosystem

| Package | Purpose |
|---|---|
| liquid-api | Core library (this repo) |
| liquid-langchain | LangChain / LangGraph integration |
| liquid-crewai | CrewAI integration |
| liquid-cli | liquid init quickstart |
| Liquid Cloud | Hosted service + global catalog + empirical probing |

Comparison

| Feature | Liquid | Zapier | LangChain tool | DIY |
|---|---|---|---|---|
| API discovery | yes | no | no | no |
| Server-side search / aggregate | yes | no | no | partial |
| Cross-API output normalization | yes | partial | no | no |
| Structured recovery with next_action | yes | no | no | no |
| Intent layer (canonical operations) | yes | partial | no | no |
| Pre-flight cost estimate | yes | no | no | no |
| Self-healing on schema drift | yes | no | no | no |
| MCP + A2A + LangChain + CrewAI native | yes | no | partial | no |
| Open source | yes | no | yes | n/a |

License

AGPL-3.0. Commercial license available for closed-source deployments — contact hello@ertad.com.
