Skip to main content

Zapier for AI agents. Connect to any API on the fly.

Project description

Liquid

Connect your AI agent to anything.

APIs, databases, and other agents — discovered automatically, read and write, through one stable, token-efficient, self-healing interface. No per-service connector to write or maintain: Liquid learns each one, and re-learns it when it changes.

PyPI License Python


What an agent can reach through Liquid

One agent-facing API (fetch · query · write · sense) over everything an agent might need to touch — Liquid figures out how to talk to it so the agent doesn't have to. It's the agent's senses and hands: fetch/query probe, sense perceives a live event stream, write acts on the world.

  • Web APIs — REST/JSON, GraphQL, SOAP/WSDL, gRPC, WebSocket
  • Other agents & tools — any MCP server, A2A agents, ChatGPT-plugin manifests
  • Databases — Postgres (+ pgvector), MySQL/MariaDB, SQLite, DuckDB, SQL Server, Neo4j (graph), MongoDB (documents), Redis (key-value)

Point it at a https://… endpoint, a postgres://… / mongodb://… / redis://… DSN, a grpc://… target, or another MCP server — discovery identifies the interface, learns its shape, and hands your agent typed records. The same fetch/query/write works regardless of what's underneath. No per-service connector to hand-write; the integration maintains itself when the upstream changes.

# A web API it has never seen — no spec, no connector, no auth
adapter = await liquid.get_or_create(
    "https://api.openbrewerydb.org/v1/breweries",
    target_model={"name": "str", "city": "str", "country": "str"},
    auto_approve=True,
)
breweries = await liquid.fetch(adapter)            # typed records

# A database is just another interface — same API, and it writes too
db = await liquid.get_or_create("postgresql://reader@host/shop",
                                target_model={"id": "int", "email": "str"},
                                auto_approve=True)
orders = await liquid.fetch(db, "/public/orders")
await liquid.write(db, "/public/orders", op="insert",
                   values={"email": "a@b.com", "total_cents": 9900},
                   allow_write=True)               # opt-in; mutates the store

You hand-write no connector and no schema: an LLM learns the interface once at setup (databases introspect themselves and skip even that), and the integration repairs itself when the upstream drifts. The runtime is plain deterministic transport — predictable cost, reproducible behavior, nothing to babysit.

Built for the constraints real agents hit

Reaching everything is half of it. The other half is that agents pay for every token, get confused by inconsistent shapes, and can't parse error prose. Liquid answers each with a concrete primitive — all shipped, all on PyPI.

Context-budget control

# Search / aggregate server-side instead of fetch-then-filter — 10-100x fewer tokens
orders = await liquid.search(adapter, "/orders",
    where={"total_cents": {"$gt": 10000}, "status": "paid"}, limit=20)

stats = await liquid.aggregate(adapter, "/orders",
    group_by="status", agg={"total_cents": "sum", "id": "count"})

hits = await liquid.text_search(adapter, "/tickets", "shipping delay")  # BM25-lite

data = await liquid.fetch(adapter, "/orders", max_tokens=2000)      # budget cap
data = await liquid.fetch(adapter, "/customers", verbosity="terse") # id + 1-2 fields

Cross-source normalization

liquid = Liquid(..., normalize_output=True)
# Stripe {amount:1000,currency:"usd"} · PayPal {value:"10.00",currency_code:"USD"}
#   → Money(amount_cents=1000, currency="USD", amount_decimal=Decimal("10.00"))

Timestamps (Unix / ISO 8601 / RFC 2822) collapse to UTC datetime; pagination envelopes ({data:[…]} / {results:[…]} / Link headers) flatten; ID fields normalize across id / _id / uuid / *_id.

Canonical intents — one mental model across services

await liquid.execute_intent(adapter, "charge_customer",
    {"customer_id": "cus_xyz", "amount_cents": 9999, "currency": "USD"})
# Same intent on Stripe / Braintree / Square / Adyen — 71 canonical intents

Structured recovery — agents self-heal without parsing text

try:
    await liquid.fetch(adapter, "/orders")
except LiquidError as e:
    if e.recovery and e.recovery.next_action:
        await agent.call_tool(e.recovery.next_action.tool, e.recovery.next_action.args)

Every error carries a Recovery with next_action: ToolCall, retry_safe, and retry_after_seconds. 401 → store_credentials. 404/410 → repair_adapter. 429 → retry after the given delay. And when the upstream's schema drifts, adapters self-heal (repair_adapter) — the agent keeps working.

Predictable cost — know before you call

est = await liquid.estimate_fetch(adapter, "/orders")
# FetchEstimate(expected_items=250, expected_tokens=52_000, confidence="high", …)
if est.expected_tokens < my_budget:
    data = await liquid.fetch(adapter, "/orders")

Tools emitted by to_tools() carry a metadata block (cost_credits, typical_latency_ms, cached, idempotent, side_effects, related_tools) so the agent can reason about which tool to pick — and ambient tools (liquid_check_quota, liquid_list_adapters, …) let it ask about state instead of memorizing it.


Measured impact

Deterministic benchmarks on realistic agent tasks (500-order, 200-ticket fixtures, mocked HTTP) — reproducible via python -m benchmarks.run:

Task Metric Baseline With Liquid Delta
Find 10 orders over $100 tokens 75,482 1,519 −98%
Revenue by status (aggregate) tokens 75,482 115 −100%
Fetch customer (id+email only) tokens 424 12 −97%
Recover from 401 structured next_action no yes
Find the shipping ticket tokens 14,588 154 −99%
Stripe↔PayPal consistency field overlap 0.11 1.00 +9×
Skip wasted call via estimate tokens 14,943 0 −100%
max_tokens=2000 budget cap tokens 14,943 1,999 −87%

Full methodology + per-task breakdown: benchmarks/RESULTS.md.

Install

pip install liquid-api               # core + bundled MCP server (the `liquid-mcp` command)
pip install 'liquid-api[litellm]'    # any of 100+ LLM providers (or [gemini] / [anthropic])
pip install 'liquid-api[grpc]'       # gRPC transport (reflection)
pip install 'liquid-api[ws]'         # WebSocket transport
pip install 'liquid-api[pg]'         # Postgres / pgvector (asyncpg)
pip install 'liquid-api[mysql]'      # MySQL / MariaDB (aiomysql); SQLite needs no extra
pip install 'liquid-api[neo4j]'      # Neo4j graph (Bolt / Cypher)
pip install 'liquid-api[duckdb]'     # DuckDB (embedded analytics)
pip install 'liquid-api[mssql]'      # SQL Server (ODBC; needs a system ODBC driver)
pip install 'liquid-api[mongodb]'    # MongoDB (collections as endpoints)
pip install 'liquid-api[redis]'      # Redis (keyspace namespaces as endpoints)
# Framework integrations
pip install liquid-langchain   # LangChain / LangGraph
pip install liquid-crewai      # CrewAI

The core is dependency-free — every backend's library is an optional extra, imported only when used.

See it work — live, no pre-config

Point Liquid at an API it has never seen (no adapter, no OpenAPI spec, no auth) and get typed records back — you write no connector; discovery + mapping is the only place a model runs. Runnable end to end via examples/live_quickstart.py:

Connecting to an API Liquid has never seen:
  https://api.openbrewerydb.org/v1/breweries

  discovery method : rest_heuristic
  mapped fields    : ['name', 'city', 'state', 'country']
  LLM calls so far : 2  (discovery + mapping)

fetch() -> 50 typed records; first 3:
   {'name': '(405) Brewing Co', 'city': 'Norman', 'state': 'Oklahoma', 'country': 'United States'}
   {'name': '(512) Brewing Co', 'city': 'Austin', 'state': 'Texas', 'country': 'United States'}
   {'name': '1 of Us Brewing Company', 'city': 'Mount Pleasant', 'state': 'Wisconsin', 'country': 'United States'}

  LLM calls during fetch : 0
  LLM calls on 2nd fetch : 0

You wrote no connector, no schema, no auth glue — Liquid learned the interface for you, and will re-learn it if it changes. That's the point: integrations you don't build or babysit.

Run as an MCP server (open source, self-hosted)

Expose the engine to any MCP client (Claude Desktop, Cursor, Claude Code) — it runs in your own process, no cloud, no account, no lock-in:

Add to Cursor

One-click in Cursor (the button writes the server into your mcp.json; add your OPENAI_API_KEY in Cursor's MCP settings afterward). Or set it up manually:

pip install liquid-api
export OPENAI_API_KEY=sk-...        # or GEMINI_API_KEY / ANTHROPIC_API_KEY,
                                    # or OPENAI_BASE_URL=http://localhost:11434/v1 for local (Ollama/vLLM)
liquid-mcp                          # or: python -m liquid.mcp_server

Zero-install with uvx (the liquid-mcp package makes the command run by name) — Claude Code:

claude mcp add liquid --scope user -e OPENAI_API_KEY=sk-... -- uvx liquid-mcp

Claude Desktop / any MCP client:

{ "mcpServers": { "liquid": {
  "command": "uvx",
  "args": ["liquid-mcp"],
  "env": { "OPENAI_API_KEY": "sk-..." }
} } }

(Or after pip install liquid-api, drop uvx and use "command": "liquid-mcp" directly.)

One-click in Claude Desktop: install the .mcpb bundle — it prompts for your model key on install (stored in the OS keychain), with no JSON to edit. Requires uv on the machine.

Tools: liquid_connect (discover + map any interface), liquid_fetch, liquid_query (server-side search/aggregate), liquid_estimate (pre-flight cost/size, no call), liquid_list_adapters, liquid_discover. The surface is read-only by default; start the server with LIQUID_ALLOW_WRITES=1 to also expose liquid_execute (database insert/update/delete). Adapters and credentials persist under ~/.liquid. Backed by any LLM — OpenAI, Gemini, Anthropic, any OpenAI-compatible/local endpoint via base_url, 100+ providers via LiteLLM, or your own function through CallableBackend.

Quick start — LangGraph agent

from liquid import Liquid, InMemoryCache, RateLimiter
from liquid._defaults import InMemoryVault, InMemoryAdapterRegistry, CollectorSink
from liquid_langchain import LiquidToolkit
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI

liquid = Liquid(
    llm=my_llm, vault=InMemoryVault(), sink=CollectorSink(),
    registry=InMemoryAdapterRegistry(), cache=InMemoryCache(), rate_limiter=RateLimiter(),
    normalize_output=True,    # cross-source canonical shapes
    include_meta=True,        # _meta block on every response
)

adapter = await liquid.get_or_create(
    "https://api.shopify.com",
    target_model={"id": "str", "total_cents": "int", "customer_email": "str"},
    credentials={"access_token": "shpat_..."},
    auto_approve=True,
)

tools = LiquidToolkit(adapter, liquid).get_tools()
agent = create_react_agent(ChatOpenAI(model="gpt-4o-mini"), tools)
result = await agent.ainvoke(
    {"messages": [("user", "Find 5 recent orders over $100 from VIP customers")]}
)

The agent's tools come with rich descriptions (WHEN to use, NOT FOR what, return shape, cost), structured recovery on every error, and server-side search so it never pulls 500 orders to find 5.

Every interface, one API

Discovery identifies the target and tags each endpoint with a protocol; a pluggable transport driver runs it — but the agent-facing API (fetch, query, write, mapping, recovery, cache, rate limits) is identical across all of them.

Interface Runtime Write Install
REST / HTTP+JSON ✅ actions (POST/PUT/PATCH/DELETE)
GraphQL ✅ query + Relay pagination ✅ mutations
SOAP / WSDL ✅ stdlib XML
gRPC ✅ unary + server-streaming (reflection) liquid-api[grpc]
WebSocket ✅ bounded batch reads + subscribe liquid-api[ws]
MCP (agent) ✅ call tools / read resources ✅ tool calls
A2A (agent) ✅ JSON-RPC message/send to AgentCard skills
Postgres (+pgvector) ✅ tables/views, filters, pagination, vector search liquid-api[pg]
MySQL / MariaDB ✅ tables/views, filters, pagination liquid-api[mysql]
SQLite ✅ tables/views, filters, pagination — (stdlib)
DuckDB ✅ tables/views, filters, pagination liquid-api[duckdb]
SQL Server ✅ tables/views, OFFSET/FETCH pagination liquid-api[mssql]
Neo4j (graph) ✅ labels/relationship types, property filters ✅ node CRUD liquid-api[neo4j]
MongoDB (document) ✅ collections, field filters, pagination liquid-api[mongodb]
Redis (key-value) ✅ keyspace namespaces, typed values, SCAN paging ✅ SET/HSET/DEL liquid-api[redis]

Read and write. liquid.write(adapter, endpoint, op="insert", values={...}, allow_write=True) mutates any database (SQL INSERT/UPDATE/DELETE, Mongo insert/update/delete, Redis SET/HSET/DEL, Neo4j node CRUD); web/agent writes go through verified actions. Identifiers come from introspection and values are parameterized; update/delete require a where (no blanket mutations); writes are off until you opt in with allow_write=True.

Discovery is automatic — and identifies on the fly. Before the pipeline runs, a fingerprint step names the target: a bare host:port is normalized by well-known port (db:5432postgresql://db:5432), and liquid.identify(url) answers "what is this, and is its driver installed?" with an install hint when a backend is missing. (Identifying a protocol is feasible on the fly; speaking a new authenticated binary protocol isn't — so unknowns are named, not guessed at.)

Discovery Where it looks Cost
Databases catalog introspection (postgres://, mysql://, mongodb://, redis://, neo4j://, …) Low
gRPC / WebSocket server reflection / frame sampling Low
MCP / A2A / Plugin /mcp, /.well-known/agent-card.json, /.well-known/ai-plugin.json Low
OpenAPI / GraphQL / SOAP spec, introspection, or WSDL Low
REST heuristic common paths + LLM interpretation Medium
Browser Playwright capturing network High

Add a backend without writing code. For the SQL family the contract is declarative enough to be data: a dialect manifest (quoting, placeholder style, pagination, introspection SQL, error map, DBAPI2 module) registered via register_sql_manifest({...}) installs a working driver + discovery — so a new SQL / wire-compatible store (CockroachDB, ClickHouse, any DBAPI2 driver), even one fetched from the network as JSON, connects without a release. New protocols otherwise plug in via the liquid.transport.ProtocolDriver protocol; SQL backends share a dialect-aware core, so a new one is a ~80-line adapter.

2,500+ APIs are pre-discovered and pre-mapped in the global catalog — most popular services connect with zero discovery cost.

Architecture

URL / DSN                       Agent
   ↓                              ↑
 FINGERPRINT → DISCOVERY        FETCH · QUERY · WRITE · SEARCH · AGGREGATE
   ↓                              ↑
 one ProtocolDriver per          Deterministic per-protocol transport
 interface:                        • Query DSL (server-side filter)
   REST GraphQL SOAP gRPC WS       • Output normalization
   MCP A2A · SQL graph doc KV      • Verbosity / max_tokens / _meta
   ↓                              • Structured recovery + self-heal
 APISchema                        • Rate-limit-aware token bucket
   ↓                              • Response cache (Cache-Control aware)
 AI MAPPING (setup only)          • Empirical probing data (Cloud)
   ↓
 AdapterConfig

AI participates at setup only. Runtime is pure transport with transforms — no LLM per call, predictable cost, reproducible behavior (except search_nl, which caches its compilations).

Swappable components

Every cross-cutting concern is a Protocol you can replace:

from liquid.protocols import (
    Vault, LLMBackend, DataSink, KnowledgeStore, AdapterRegistry, CacheStore,
)

In-memory implementations ship for all of them; liquid-cloud provides PostgresVault, RedisCache, etc. for hosted deployments.

Framework support

adapter.to_tools(format="anthropic")   # Claude tool use
adapter.to_tools(format="openai")      # OpenAI function calling
adapter.to_tools(format="mcp")         # MCP (Claude Desktop, Cursor)
from liquid_crewai import LiquidCrewToolkit  # CrewAI

Ecosystem

Package Purpose
liquid-api Core library (this repo)
liquid-langchain LangChain / LangGraph integration
liquid-crewai CrewAI integration
liquid-cli liquid init quickstart
Liquid Cloud Hosted service + global catalog + empirical probing

Comparison

Feature Liquid Zapier LangChain tool DIY
Auto-discovers any interface (no curated connector) yes no no no
APIs + databases + agents in one layer yes partial no no
Read and write through one API yes yes partial no
Server-side search / aggregate yes no no partial
Cross-source output normalization yes partial no no
Structured recovery with next_action yes no no no
Self-healing on schema drift yes no no no
Pre-flight cost estimate yes no no no
MCP + A2A + LangChain + CrewAI native yes no partial no
Open source yes no yes n/a

Documentation

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

liquid_api-0.51.0.tar.gz (623.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

liquid_api-0.51.0-py3-none-any.whl (292.7 kB view details)

Uploaded Python 3

File details

Details for the file liquid_api-0.51.0.tar.gz.

File metadata

  • Download URL: liquid_api-0.51.0.tar.gz
  • Upload date:
  • Size: 623.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for liquid_api-0.51.0.tar.gz
Algorithm Hash digest
SHA256 dcc9b650b74498a65fc47b857e3acee36641784ac2b7a108ad5e523319b5e1c7
MD5 ee44a73fa03a135ef6a670510d042c0f
BLAKE2b-256 dafc6e589b64917d0de80810f3dc998f9a17bb8235d1cb242a806ca1c8e8287f

See more details on using hashes here.

File details

Details for the file liquid_api-0.51.0-py3-none-any.whl.

File metadata

  • Download URL: liquid_api-0.51.0-py3-none-any.whl
  • Upload date:
  • Size: 292.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for liquid_api-0.51.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d26c912c8ad9e2a756edd6e6f927cefae4156ded147b2362c9cab9f69b8c1a47
MD5 06817163f5029f3b98e67b7363ed7953
BLAKE2b-256 8e5a0b78636dfe7cf4c8795ed3b2a66d48d9c41f04710a89e1f43884b02955fd

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page