Zapier for AI agents. Connect to any API on the fly.

Liquid

Connect your AI agent to anything.

APIs, databases, and other agents — discovered automatically, read and write, through one stable, token-efficient, self-healing interface. No per-service connector to write or maintain: Liquid learns each one, and re-learns it when it changes.


What an agent can reach through Liquid

One agent-facing API (fetch · query · write) over everything an agent might need to touch — Liquid figures out how to talk to it so the agent doesn't have to:

  • Web APIs — REST/JSON, GraphQL, SOAP/WSDL, gRPC, WebSocket
  • Other agents & tools — any MCP server, A2A agents, ChatGPT-plugin manifests
  • Databases — Postgres (+ pgvector), MySQL/MariaDB, SQLite, DuckDB, SQL Server, Neo4j (graph), MongoDB (documents), Redis (key-value)

Point it at an https://… endpoint, a postgres://… / mongodb://… / redis://… DSN, a grpc://… target, or another MCP server. Discovery identifies the interface, learns its shape, and hands your agent typed records. The same fetch/query/write calls work regardless of what's underneath. There is no per-service connector to hand-write, and the integration maintains itself when the upstream changes.

# A web API it has never seen — no spec, no connector, no auth
adapter = await liquid.get_or_create(
    "https://api.openbrewerydb.org/v1/breweries",
    target_model={"name": "str", "city": "str", "country": "str"},
    auto_approve=True,
)
breweries = await liquid.fetch(adapter)            # typed records

# A database is just another interface — same API, and it writes too
db = await liquid.get_or_create("postgresql://reader@host/shop",
                                target_model={"id": "int", "email": "str"},
                                auto_approve=True)
orders = await liquid.fetch(db, "/public/orders")
await liquid.write(db, "/public/orders", op="insert",
                   values={"email": "a@b.com", "total_cents": 9900},
                   allow_write=True)               # opt-in; mutates the store

You hand-write no connector and no schema: an LLM learns the interface once at setup (databases introspect themselves and skip even that), and the integration repairs itself when the upstream drifts. The runtime is plain deterministic transport — predictable cost, reproducible behavior, nothing to babysit.

Built for the constraints real agents hit

Reaching everything is half of it. The other half is that agents pay for every token, get confused by inconsistent shapes, and can't parse error prose. Liquid answers each with a concrete primitive — all shipped, all on PyPI.

Context-budget control

# Search / aggregate server-side instead of fetch-then-filter — 10-100x fewer tokens
orders = await liquid.search(adapter, "/orders",
    where={"total_cents": {"$gt": 10000}, "status": "paid"}, limit=20)

stats = await liquid.aggregate(adapter, "/orders",
    group_by="status", agg={"total_cents": "sum", "id": "count"})

hits = await liquid.text_search(adapter, "/tickets", "shipping delay")  # BM25-lite

data = await liquid.fetch(adapter, "/orders", max_tokens=2000)      # budget cap
data = await liquid.fetch(adapter, "/customers", verbosity="terse") # id + 1-2 fields
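
To make the where semantics above concrete, here is a minimal local sketch of how a Mongo-style filter could be evaluated against records. It is an illustration only, not Liquid's implementation (Liquid pushes the filter to the server). The helper names matches and search_records are hypothetical, and every operator other than $gt is an assumption.

```python
from typing import Any, Optional

# Hypothetical operator table. Only "$gt" appears in the README; the rest are assumed.
OPERATORS = {
    "$gt": lambda value, operand: value > operand,
    "$gte": lambda value, operand: value >= operand,
    "$lt": lambda value, operand: value < operand,
    "$lte": lambda value, operand: value <= operand,
    "$ne": lambda value, operand: value != operand,
}


def matches(record: dict[str, Any], where: dict[str, Any]) -> bool:
    """Return True when the record satisfies every condition in `where`."""
    for field, condition in where.items():
        if field not in record:
            return False
        value = record[field]
        if isinstance(condition, dict):
            # Operator form, e.g. {"$gt": 10000}; all operators must hold.
            if not all(OPERATORS[op](value, operand) for op, operand in condition.items()):
                return False
        elif value != condition:
            # A bare value means equality, e.g. {"status": "paid"}.
            return False
    return True


def search_records(records, where, limit: Optional[int] = None):
    """Filter first, then cap the result, so only matching rows are returned."""
    hits = [r for r in records if matches(r, where)]
    return hits if limit is None else hits[:limit]


orders = [
    {"id": 1, "total_cents": 9900, "status": "paid"},
    {"id": 2, "total_cents": 15000, "status": "paid"},
    {"id": 3, "total_cents": 25000, "status": "refunded"},
    {"id": 4, "total_cents": 12000, "status": "paid"},
]
big_paid = search_records(orders, {"total_cents": {"$gt": 10000}, "status": "paid"}, limit=20)
```

Conditions on different fields are combined with AND, which is the usual reading of this style of filter and is assumed here.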

Cross-source normalization

liquid = Liquid(..., normalize_output=True)
# Stripe {amount:1000,currency:"usd"} · PayPal {value:"10.00",currency_code:"USD"}
#   → Money(amount_cents=1000, currency="USD", amount_decimal=Decimal("10.00"))

Timestamps (Unix / ISO 8601 / RFC 2822) collapse to UTC datetime; pagination envelopes ({data:[…]} / {results:[…]} / Link headers) flatten; ID fields normalize across id / _id / uuid / *_id.
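
The normalization described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not Liquid's code: the Money dataclass mirrors the shape shown in the comment, the function names are hypothetical, the currency is assumed to have two decimal places, and naive timestamps are assumed to be UTC.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from email.utils import parsedate_to_datetime

CENT = Decimal("0.01")


@dataclass(frozen=True)
class Money:
    amount_cents: int
    currency: str
    amount_decimal: Decimal


def money_from_stripe(obj: dict) -> Money:
    # Stripe-style payload: integer amount in minor units, lowercase currency.
    cents = int(obj["amount"])
    return Money(cents, obj["currency"].upper(), (Decimal(cents) / 100).quantize(CENT))


def money_from_paypal(obj: dict) -> Money:
    # PayPal-style payload: decimal string in major units, uppercase currency code.
    amount = Decimal(obj["value"]).quantize(CENT)
    return Money(int(amount * 100), obj["currency_code"].upper(), amount)


def to_utc(value) -> datetime:
    """Accept a Unix timestamp, an ISO 8601 string, or an RFC 2822 string."""
    if isinstance(value, (int, float)):
        return datetime.fromtimestamp(value, tz=timezone.utc)
    text = value[:-1] + "+00:00" if value.endswith("Z") else value
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        parsed = parsedate_to_datetime(value)  # RFC 2822, e.g. email Date headers
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return parsed.astimezone(timezone.utc)


stripe = money_from_stripe({"amount": 1000, "currency": "usd"})
paypal = money_from_paypal({"value": "10.00", "currency_code": "USD"})
```

Both payloads collapse to the same Money value, which is what lets an agent compare amounts across providers without per-service logic.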

Canonical intents — one mental model across services

await liquid.execute_intent(adapter, "charge_customer",
    {"customer_id": "cus_xyz", "amount_cents": 9999, "currency": "USD"})
# Same intent on Stripe / Braintree / Square / Adyen — 71 canonical intents

Structured recovery — agents self-heal without parsing text

try:
    await liquid.fetch(adapter, "/orders")
except LiquidError as e:
    if e.recovery and e.recovery.next_action:
        await agent.call_tool(e.recovery.next_action.tool, e.recovery.next_action.args)

Every error carries a Recovery with next_action: ToolCall, retry_safe, and retry_after_seconds. 401 → store_credentials. 404/410 → repair_adapter. 429 → retry after the given delay. And when the upstream's schema drifts, adapters self-heal (repair_adapter) — the agent keeps working.
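
As a rough sketch of the status-to-recovery mapping just described, the following uses local stand-in dataclasses. The field names next_action, retry_safe, and retry_after_seconds come from the text above; the function recovery_for, the adapter_id argument, the retry_safe values, and the one-second fallback delay are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ToolCall:
    tool: str
    args: dict[str, Any] = field(default_factory=dict)


@dataclass
class Recovery:
    next_action: Optional[ToolCall]
    retry_safe: bool
    retry_after_seconds: Optional[float] = None


def recovery_for(status: int, headers: Optional[dict[str, str]] = None,
                 adapter_id: str = "example-adapter") -> Recovery:
    """Map an HTTP status to a machine-readable recovery hint (hypothetical helper)."""
    headers = headers or {}
    if status == 401:
        # Credentials are missing or expired: ask the agent to store new ones.
        return Recovery(ToolCall("store_credentials", {"adapter_id": adapter_id}), retry_safe=True)
    if status in (404, 410):
        # The endpoint moved or vanished: the adapter likely needs repair.
        return Recovery(ToolCall("repair_adapter", {"adapter_id": adapter_id}), retry_safe=True)
    if status == 429:
        # Only the delta-seconds form of Retry-After is handled in this sketch.
        raw = headers.get("Retry-After", "")
        delay = float(raw) if raw.isdigit() else 1.0
        return Recovery(None, retry_safe=True, retry_after_seconds=delay)
    return Recovery(None, retry_safe=False)


throttled = recovery_for(429, {"Retry-After": "30"})
unauthorized = recovery_for(401)
```

The agent never has to read an error message: it either calls the tool named in next_action or waits retry_after_seconds and tries again.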

Predictable cost — know before you call

est = await liquid.estimate_fetch(adapter, "/orders")
# FetchEstimate(expected_items=250, expected_tokens=52_000, confidence="high", …)
if est.expected_tokens < my_budget:
    data = await liquid.fetch(adapter, "/orders")

Tools emitted by to_tools() carry a metadata block (cost_credits, typical_latency_ms, cached, idempotent, side_effects, related_tools) so the agent can reason about which tool to pick — and ambient tools (liquid_check_quota, liquid_list_adapters, …) let it ask about state instead of memorizing it.


Measured impact

Deterministic benchmarks on realistic agent tasks (500-order, 200-ticket fixtures, mocked HTTP) — reproducible via python -m benchmarks.run:

| Task | Metric | Baseline | With Liquid | Delta |
|---|---|---|---|---|
| Find 10 orders over $100 | tokens | 75,482 | 1,519 | −98% |
| Revenue by status (aggregate) | tokens | 75,482 | 115 | −100% |
| Fetch customer (id+email only) | tokens | 424 | 12 | −97% |
| Recover from 401 | structured next_action | no | yes | |
| Find the shipping ticket | tokens | 14,588 | 154 | −99% |
| Stripe↔PayPal consistency | field overlap | 0.11 | 1.00 | +9× |
| Skip wasted call via estimate | tokens | 14,943 | 0 | −100% |
| max_tokens=2000 budget cap | tokens | 14,943 | 1,999 | −87% |

Full methodology + per-task breakdown: benchmarks/RESULTS.md.
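
The Delta column can be re-derived from the Baseline and With Liquid columns. The short check below assumes Delta is the relative change rounded to the nearest whole percent, which matches every token row in the table; the helper name pct_change is hypothetical.

```python
def pct_change(baseline: float, with_liquid: float) -> float:
    """Relative change from baseline, in percent (negative means fewer tokens)."""
    return (with_liquid - baseline) / baseline * 100


# (baseline tokens, tokens with Liquid), copied from the table above
token_rows = {
    "Find 10 orders over $100": (75_482, 1_519),
    "Revenue by status (aggregate)": (75_482, 115),
    "Fetch customer (id+email only)": (424, 12),
    "Find the shipping ticket": (14_588, 154),
    "Skip wasted call via estimate": (14_943, 0),
    "max_tokens=2000 budget cap": (14_943, 1_999),
}

deltas = {task: round(pct_change(b, w)) for task, (b, w) in token_rows.items()}

# Field overlap is reported as a ratio rather than a percentage.
overlap_ratio = round(1.00 / 0.11)
```

Note that the aggregate row's −100% is a rounded figure: 115 tokens against 75,482 is a reduction of about 99.8%, not a drop to zero.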

Install

pip install liquid-api               # core + bundled MCP server (the `liquid-mcp` command)
pip install 'liquid-api[litellm]'    # any of 100+ LLM providers (or [gemini] / [anthropic])
pip install 'liquid-api[grpc]'       # gRPC transport (reflection)
pip install 'liquid-api[ws]'         # WebSocket transport
pip install 'liquid-api[pg]'         # Postgres / pgvector (asyncpg)
pip install 'liquid-api[mysql]'      # MySQL / MariaDB (aiomysql); SQLite needs no extra
pip install 'liquid-api[neo4j]'      # Neo4j graph (Bolt / Cypher)
pip install 'liquid-api[duckdb]'     # DuckDB (embedded analytics)
pip install 'liquid-api[mssql]'      # SQL Server (ODBC; needs a system ODBC driver)
pip install 'liquid-api[mongodb]'    # MongoDB (collections as endpoints)
pip install 'liquid-api[redis]'      # Redis (keyspace namespaces as endpoints)
# Framework integrations
pip install liquid-langchain   # LangChain / LangGraph
pip install liquid-crewai      # CrewAI

The core is dependency-free — every backend's library is an optional extra, imported only when used.

See it work — live, no pre-config

Point Liquid at an API it has never seen (no adapter, no OpenAPI spec, no auth) and get typed records back. You write no connector; discovery and mapping are the only steps where a model runs. Runnable end to end via examples/live_quickstart.py:

Connecting to an API Liquid has never seen:
  https://api.openbrewerydb.org/v1/breweries

  discovery method : rest_heuristic
  mapped fields    : ['name', 'city', 'state', 'country']
  LLM calls so far : 2  (discovery + mapping)

fetch() -> 50 typed records; first 3:
   {'name': '(405) Brewing Co', 'city': 'Norman', 'state': 'Oklahoma', 'country': 'United States'}
   {'name': '(512) Brewing Co', 'city': 'Austin', 'state': 'Texas', 'country': 'United States'}
   {'name': '1 of Us Brewing Company', 'city': 'Mount Pleasant', 'state': 'Wisconsin', 'country': 'United States'}

  LLM calls during fetch : 0
  LLM calls on 2nd fetch : 0

You wrote no connector, no schema, no auth glue — Liquid learned the interface for you, and will re-learn it if it changes. That's the point: integrations you don't build or babysit.

Run as an MCP server (open source, self-hosted)

Expose the engine to any MCP client (Claude Desktop, Cursor, Claude Code) — it runs in your own process, no cloud, no account, no lock-in:

pip install liquid-api
export OPENAI_API_KEY=sk-...        # or GEMINI_API_KEY / ANTHROPIC_API_KEY,
                                    # or OPENAI_BASE_URL=http://localhost:11434/v1 for local (Ollama/vLLM)
liquid-mcp                          # or: python -m liquid.mcp_server

Zero-install with uvx (the liquid-mcp package makes the command run by name) — Claude Code:

claude mcp add liquid --scope user -e OPENAI_API_KEY=sk-... -- uvx liquid-mcp

Claude Desktop / any MCP client:

{ "mcpServers": { "liquid": {
  "command": "uvx",
  "args": ["liquid-mcp"],
  "env": { "OPENAI_API_KEY": "sk-..." }
} } }

(Or after pip install liquid-api, drop uvx and use "command": "liquid-mcp" directly.)

Tools: liquid_connect (discover + map any interface), liquid_fetch, liquid_query (server-side search/aggregate), liquid_estimate (pre-flight cost/size, no call), liquid_list_adapters, liquid_discover. The surface is read-only by default; start the server with LIQUID_ALLOW_WRITES=1 to also expose liquid_execute (database insert/update/delete). Adapters and credentials persist under ~/.liquid. Backed by any LLM — OpenAI, Gemini, Anthropic, any OpenAI-compatible/local endpoint via base_url, 100+ providers via LiteLLM, or your own function through CallableBackend.

Quick start — LangGraph agent

from liquid import Liquid, InMemoryCache, RateLimiter
from liquid._defaults import InMemoryVault, InMemoryAdapterRegistry, CollectorSink
from liquid_langchain import LiquidToolkit
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI

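# Assumes `my_llm` is an LLM backend you have already constructed, and that this
# snippet runs inside an async function (it uses `await`).
liquid = Liquid(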
    llm=my_llm, vault=InMemoryVault(), sink=CollectorSink(),
    registry=InMemoryAdapterRegistry(), cache=InMemoryCache(), rate_limiter=RateLimiter(),
    normalize_output=True,    # cross-source canonical shapes
    include_meta=True,        # _meta block on every response
)

adapter = await liquid.get_or_create(
    "https://api.shopify.com",
    target_model={"id": "str", "total_cents": "int", "customer_email": "str"},
    credentials={"access_token": "shpat_..."},
    auto_approve=True,
)

tools = LiquidToolkit(adapter, liquid).get_tools()
agent = create_react_agent(ChatOpenAI(model="gpt-4o-mini"), tools)
result = await agent.ainvoke(
    {"messages": [("user", "Find 5 recent orders over $100 from VIP customers")]}
)

The agent's tools come with rich descriptions (WHEN to use, NOT FOR what, return shape, cost), structured recovery on every error, and server-side search so it never pulls 500 orders to find 5.

Every interface, one API

Discovery identifies the target and tags each endpoint with a protocol; a pluggable transport driver runs it — but the agent-facing API (fetch, query, write, mapping, recovery, cache, rate limits) is identical across all of them.

Interface Runtime Write Install
REST / HTTP+JSON ✅ actions (POST/PUT/PATCH/DELETE)
GraphQL ✅ query + Relay pagination ✅ mutations
SOAP / WSDL ✅ stdlib XML
gRPC ✅ unary + server-streaming (reflection) liquid-api[grpc]
WebSocket ✅ bounded batch reads + subscribe liquid-api[ws]
MCP (agent) ✅ call tools / read resources ✅ tool calls
A2A (agent) ✅ JSON-RPC message/send to AgentCard skills
Postgres (+pgvector) ✅ tables/views, filters, pagination, vector search liquid-api[pg]
MySQL / MariaDB ✅ tables/views, filters, pagination liquid-api[mysql]
SQLite ✅ tables/views, filters, pagination — (stdlib)
DuckDB ✅ tables/views, filters, pagination liquid-api[duckdb]
SQL Server ✅ tables/views, OFFSET/FETCH pagination liquid-api[mssql]
Neo4j (graph) ✅ labels/relationship types, property filters ✅ node CRUD liquid-api[neo4j]
MongoDB (document) ✅ collections, field filters, pagination liquid-api[mongodb]
Redis (key-value) ✅ keyspace namespaces, typed values, SCAN paging ✅ SET/HSET/DEL liquid-api[redis]

Read and write. liquid.write(adapter, endpoint, op="insert", values={...}, allow_write=True) mutates any database (SQL INSERT/UPDATE/DELETE, Mongo insert/update/delete, Redis SET/HSET/DEL, Neo4j node CRUD); web/agent writes go through verified actions. Identifiers come from introspection and values are parameterized; update/delete require a where (no blanket mutations); writes are off until you opt in with allow_write=True.
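
The write guardrails listed above (identifiers from introspection, parameterized values, a mandatory where for update/delete, and explicit opt-in) can be sketched against SQLite with the standard library. This is an independent illustration, not Liquid's driver; safe_write, table_columns, and quote_ident are hypothetical names.

```python
import sqlite3


def quote_ident(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'


def table_columns(conn: sqlite3.Connection, table: str) -> list[str]:
    """Introspect the catalog; the table name is only used if the catalog knows it."""
    row = conn.execute(
        "SELECT name FROM sqlite_master WHERE type IN ('table', 'view') AND name = ?",
        (table,),
    ).fetchone()
    if row is None:
        raise ValueError(f"unknown table: {table!r}")
    return [r[1] for r in conn.execute(f"PRAGMA table_info({quote_ident(row[0])})")]


def safe_write(conn, table, op, values=None, where=None, allow_write=False) -> int:
    if not allow_write:
        raise PermissionError("writes are off until allow_write=True")
    values, where = values or {}, where or {}
    known = table_columns(conn, table)
    for column in list(values) + list(where):
        if column not in known:
            raise ValueError(f"unknown column: {column!r}")
    if op in ("update", "delete") and not where:
        raise ValueError(f"{op} requires a where clause (no blanket mutations)")
    where_sql = " AND ".join(f"{quote_ident(c)} = ?" for c in where)
    if op == "insert" and values:
        cols = ", ".join(quote_ident(c) for c in values)
        marks = ", ".join("?" for _ in values)
        sql, params = f"INSERT INTO {quote_ident(table)} ({cols}) VALUES ({marks})", list(values.values())
    elif op == "update" and values:
        sets = ", ".join(f"{quote_ident(c)} = ?" for c in values)
        sql = f"UPDATE {quote_ident(table)} SET {sets} WHERE {where_sql}"
        params = list(values.values()) + list(where.values())
    elif op == "delete":
        sql, params = f"DELETE FROM {quote_ident(table)} WHERE {where_sql}", list(where.values())
    else:
        raise ValueError(f"unsupported or empty operation: {op!r}")
    cursor = conn.execute(sql, params)  # values travel as bound parameters
    conn.commit()
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, email TEXT, total_cents INTEGER)")
inserted = safe_write(conn, "orders", "insert",
                      values={"email": "a@b.com", "total_cents": 9900}, allow_write=True)
```

Column and table names never come from the caller unchecked: they must already exist in the catalog, so only values, which are bound as parameters, carry user input.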

Discovery is automatic, and it identifies targets on the fly. Before the pipeline runs, a fingerprint step names the target: a bare host:port is normalized by well-known port (db:5432 → postgresql://db:5432), and liquid.identify(url) answers "what is this, and is its driver installed?" with an install hint when a backend is missing. (Identifying a protocol is feasible on the fly; speaking a new authenticated binary protocol isn't, so unknowns are named, not guessed at.)
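
A minimal sketch of the well-known-port normalization, assuming a simple lookup table. Only the 5432 to postgresql pairing is taken from the text above; the other entries use each store's standard default port with the DSN scheme names listed in the discovery table, and the function name normalize_target is hypothetical. IPv6 literals are not handled.

```python
# Default ports for a few stores. Only 5432 -> postgresql is stated in the README.
WELL_KNOWN_PORTS = {
    5432: "postgresql",
    3306: "mysql",
    27017: "mongodb",
    6379: "redis",
    7687: "neo4j",
}


def normalize_target(target: str) -> str:
    """Turn a bare host:port into a DSN-style URL; leave full URLs untouched."""
    if "://" in target:
        return target
    host, sep, port = target.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected host:port or a URL, got {target!r}")
    scheme = WELL_KNOWN_PORTS.get(int(port))
    if scheme is None:
        # Unknown ports are reported, not guessed at.
        raise ValueError(f"port {port} is not a well-known port; give an explicit scheme")
    return f"{scheme}://{host}:{port}"


dsn = normalize_target("db:5432")
```

Raising on an unrecognized port keeps the sketch in line with the stated policy of naming unknowns instead of guessing.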

| Discovery | Where it looks | Cost |
|---|---|---|
| Databases | catalog introspection (postgres://, mysql://, mongodb://, redis://, neo4j://, …) | Low |
| gRPC / WebSocket | server reflection / frame sampling | Low |
| MCP / A2A / Plugin | /mcp, /.well-known/agent-card.json, /.well-known/ai-plugin.json | Low |
| OpenAPI / GraphQL / SOAP | spec, introspection, or WSDL | Low |
| REST heuristic | common paths + LLM interpretation | Medium |
| Browser | Playwright capturing network | High |

Add a backend without writing code. For the SQL family the contract is declarative enough to be data: a dialect manifest (quoting, placeholder style, pagination, introspection SQL, error map, DBAPI2 module) registered via register_sql_manifest({...}) installs a working driver + discovery — so a new SQL / wire-compatible store (CockroachDB, ClickHouse, any DBAPI2 driver), even one fetched from the network as JSON, connects without a release. New protocols otherwise plug in via the liquid.transport.ProtocolDriver protocol; SQL backends share a dialect-aware core, so a new one is a ~80-line adapter.
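
To illustrate why a SQL dialect can be expressed as data, the sketch below builds a paginated SELECT from a small manifest dictionary. The manifest keys (quote_open, quote_close, placeholder, pagination) and the function build_select are invented for this example; they are not the schema that register_sql_manifest expects, which this README does not spell out.

```python
import sqlite3

# Hypothetical manifests: the key names are assumptions, not Liquid's schema.
SQLITE_LIKE = {
    "quote_open": '"', "quote_close": '"', "placeholder": "?",
    "pagination": "LIMIT {limit} OFFSET {offset}",
}
MSSQL_LIKE = {
    "quote_open": "[", "quote_close": "]", "placeholder": "?",
    "pagination": "ORDER BY (SELECT NULL) OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY",
}


def quote_ident(manifest: dict, name: str) -> str:
    close = manifest["quote_close"]
    return manifest["quote_open"] + name.replace(close, close * 2) + close


def build_select(manifest, table, columns, where=None, limit=50, offset=0):
    """Return (sql, params) for a filtered, paginated read in the given dialect."""
    cols = ", ".join(quote_ident(manifest, c) for c in columns)
    sql = f"SELECT {cols} FROM {quote_ident(manifest, table)}"
    params = []
    if where:
        clauses = []
        for column, value in where.items():
            clauses.append(f"{quote_ident(manifest, column)} = {manifest['placeholder']}")
            params.append(value)
        sql += " WHERE " + " AND ".join(clauses)
    sql += " " + manifest["pagination"].format(limit=int(limit), offset=int(offset))
    return sql, params


sqlite_sql, sqlite_params = build_select(
    SQLITE_LIKE, "orders", ["id", "email"], where={"status": "paid"}, limit=2)
mssql_sql, _ = build_select(MSSQL_LIKE, "orders", ["id"], limit=5, offset=10)

# The SQLite-flavoured statement runs as-is against an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, email TEXT, status TEXT)")
conn.executemany("INSERT INTO orders (email, status) VALUES (?, ?)",
                 [("a@b.com", "paid"), ("c@d.com", "open"), ("e@f.com", "paid")])
rows = conn.execute(sqlite_sql, sqlite_params).fetchall()
```

The same build_select function serves both dialects; only the dictionary changes, which is the property that lets a new backend arrive as configuration.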

2,500+ APIs are pre-discovered and pre-mapped in the global catalog — most popular services connect with zero discovery cost.

Architecture

URL / DSN                       Agent
   ↓                              ↑
 FINGERPRINT → DISCOVERY        FETCH · QUERY · WRITE · SEARCH · AGGREGATE
   ↓                              ↑
 one ProtocolDriver per          Deterministic per-protocol transport
 interface:                        • Query DSL (server-side filter)
   REST GraphQL SOAP gRPC WS       • Output normalization
   MCP A2A · SQL graph doc KV      • Verbosity / max_tokens / _meta
   ↓                              • Structured recovery + self-heal
 APISchema                        • Rate-limit-aware token bucket
   ↓                              • Response cache (Cache-Control aware)
 AI MAPPING (setup only)          • Empirical probing data (Cloud)
   ↓
 AdapterConfig

AI participates at setup only. Runtime is pure transport with transforms — no LLM per call, predictable cost, reproducible behavior (except search_nl, which caches its compilations).
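
The setup/runtime split can be pictured as follows: once a field mapping has been learned, applying it is ordinary dictionary work with no model involved. The mapping format (target field to dotted source path) and the helper names are assumptions for illustration; the README does not show what an AdapterConfig actually stores.

```python
from typing import Any, Callable

# Type names as they appear in target_model; the cast table itself is an assumption.
CASTS: dict[str, Callable[[Any], Any]] = {"str": str, "int": int, "float": float}


def get_path(obj: Any, path: str) -> Any:
    """Follow a dotted path such as "customer.email" through nested dicts."""
    for key in path.split("."):
        obj = obj[key]
    return obj


def apply_mapping(raw: dict, mapping: dict[str, str], target_model: dict[str, str]) -> dict:
    """Deterministically project a raw upstream record onto the target model."""
    return {
        field: CASTS[target_model[field]](get_path(raw, path))
        for field, path in mapping.items()
    }


target_model = {"id": "str", "total_cents": "int", "customer_email": "str"}

# Hypothetical result of the one-time setup step (the part a model would produce).
mapping = {
    "id": "order_id",
    "total_cents": "totals.amount_cents",
    "customer_email": "customer.email",
}

raw = {"order_id": 1001, "totals": {"amount_cents": "15000"}, "customer": {"email": "a@b.com"}}
record = apply_mapping(raw, mapping, target_model)
```

Because the mapping is plain data, the same input always yields the same output, and repairing an adapter after upstream drift amounts to producing a new mapping.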

Swappable components

Every cross-cutting concern is a Protocol you can replace:

from liquid.protocols import (
    Vault, LLMBackend, DataSink, KnowledgeStore, AdapterRegistry, CacheStore,
)

In-memory implementations ship for all of them; liquid-cloud provides PostgresVault, RedisCache, etc. for hosted deployments.

Framework support

adapter.to_tools(format="anthropic")   # Claude tool use
adapter.to_tools(format="openai")      # OpenAI function calling
adapter.to_tools(format="mcp")         # MCP (Claude Desktop, Cursor)
from liquid_crewai import LiquidCrewToolkit  # CrewAI

Ecosystem

| Package | Purpose |
|---|---|
| liquid-api | Core library (this repo) |
| liquid-langchain | LangChain / LangGraph integration |
| liquid-crewai | CrewAI integration |
| liquid-cli | liquid init quickstart |
| Liquid Cloud | Hosted service + global catalog + empirical probing |

Comparison

| Feature | Liquid | Zapier | LangChain tool | DIY |
|---|---|---|---|---|
| Auto-discovers any interface (no curated connector) | yes | no | no | no |
| APIs + databases + agents in one layer | yes | partial | no | no |
| Read and write through one API | yes | yes | partial | no |
| Server-side search / aggregate | yes | no | no | partial |
| Cross-source output normalization | yes | partial | no | no |
| Structured recovery with next_action | yes | no | no | no |
| Self-healing on schema drift | yes | no | no | no |
| Pre-flight cost estimate | yes | no | no | no |
| MCP + A2A + LangChain + CrewAI native | yes | no | partial | no |
| Open source | yes | no | yes | n/a |
