Skip to main content

Connect your AI agent to anything — APIs, databases, and agents; read, write, and sense.

Project description

Liquid

Connect your AI agent to anything.

APIs, databases, and other agents — discovered automatically, read and write, through one stable, token-efficient, self-healing interface. No per-service connector to write or maintain: Liquid learns each one, and re-learns it when it changes.

PyPI License Python


What an agent can reach through Liquid

One agent-facing API (fetch · query · write · sense) over everything an agent might need to touch — Liquid figures out how to talk to it so the agent doesn't have to. It's the agent's senses and hands: fetch/query probe, sense perceives a live event stream, write acts on the world.

  • Web APIs — REST/JSON, GraphQL, SOAP/WSDL, gRPC, WebSocket, SSE/NDJSON streams
  • Other agents & tools — any MCP server, A2A agents, ChatGPT-plugin manifests
  • Databases — Postgres (+ pgvector), MySQL/MariaDB, SQLite, DuckDB, SQL Server, Neo4j (graph), MongoDB (documents), Redis (key-value)
  • People — a human as a node via connectors (Telegram today): perceive their messages with sense, answer them with send

Point it at a https://… endpoint, a postgres://… / mongodb://… / redis://… DSN, a grpc://… target, or another MCP server — discovery identifies the interface, learns its shape, and hands your agent typed records. The same fetch/query/write works regardless of what's underneath. No per-service connector to hand-write; the integration maintains itself when the upstream changes.

# A web API it has never seen — no spec, no connector, no auth
adapter = await liquid.get_or_create(
    "https://api.openbrewerydb.org/v1/breweries",
    target_model={"name": "str", "city": "str", "country": "str"},
    auto_approve=True,
)
breweries = await liquid.fetch(adapter)            # typed records

# A database is just another interface — same API, and it writes too
db = await liquid.get_or_create("postgresql://reader@host/shop",
                                target_model={"id": "int", "email": "str"},
                                auto_approve=True)
orders = await liquid.fetch(db, "/public/orders")
await liquid.write(db, "/public/orders", op="insert",
                   values={"email": "a@b.com", "total_cents": 9900},
                   allow_write=True)               # opt-in; mutates the store

You hand-write no connector and no schema: an LLM learns the interface once at setup (databases introspect themselves and skip even that), and the integration repairs itself when the upstream drifts. The runtime is plain deterministic transport — predictable cost, reproducible behavior, nothing to babysit.

Built for the constraints real agents hit

Reaching everything is half of it. The other half is that agents pay for every token, get confused by inconsistent shapes, and can't parse error prose. Liquid answers each with a concrete primitive — all shipped, all on PyPI.

Context-budget control

# Search / aggregate server-side instead of fetch-then-filter — 10-100x fewer tokens
orders = await liquid.search(adapter, "/orders",
    where={"total_cents": {"$gt": 10000}, "status": "paid"}, limit=20)

stats = await liquid.aggregate(adapter, "/orders",
    group_by="status", agg={"total_cents": "sum", "id": "count"})

hits = await liquid.text_search(adapter, "/tickets", "shipping delay")  # BM25-lite

data = await liquid.fetch(adapter, "/orders", max_tokens=2000)      # budget cap
data = await liquid.fetch(adapter, "/customers", verbosity="terse") # id + 1-2 fields

Cross-source normalization

liquid = Liquid(..., normalize_output=True)
# Stripe {amount:1000,currency:"usd"} · PayPal {value:"10.00",currency_code:"USD"}
#   → Money(amount_cents=1000, currency="USD", amount_decimal=Decimal("10.00"))

Timestamps (Unix / ISO 8601 / RFC 2822) collapse to UTC datetime; pagination envelopes ({data:[…]} / {results:[…]} / Link headers) flatten; ID fields normalize across id / _id / uuid / *_id.

Canonical intents — one mental model across services

await liquid.execute_intent(adapter, "charge_customer",
    {"customer_id": "cus_xyz", "amount_cents": 9999, "currency": "USD"})
# Same intent on Stripe / Braintree / Square / Adyen — 71 canonical intents

Structured recovery — agents self-heal without parsing text

try:
    await liquid.fetch(adapter, "/orders")
except LiquidError as e:
    if e.recovery and e.recovery.next_action:
        await agent.call_tool(e.recovery.next_action.tool, e.recovery.next_action.args)

Every error carries a Recovery with next_action: ToolCall, retry_safe, and retry_after_seconds. 401 → store_credentials. 404/410 → repair_adapter. 429 → retry after the given delay. And when the upstream's schema drifts, adapters self-heal (repair_adapter) — the agent keeps working.

Predictable cost — know before you call

est = await liquid.estimate_fetch(adapter, "/orders")
# FetchEstimate(expected_items=250, expected_tokens=52_000, confidence="high", …)
if est.expected_tokens < my_budget:
    data = await liquid.fetch(adapter, "/orders")

Tools emitted by to_tools() carry a metadata block (cost_credits, typical_latency_ms, cached, idempotent, side_effects, related_tools) so the agent can reason about which tool to pick — and ambient tools (liquid_check_quota, liquid_list_adapters, …) let it ask about state instead of memorizing it.


Measured impact

Deterministic benchmarks on realistic agent tasks (500-order, 200-ticket fixtures, mocked HTTP) — reproducible via python -m benchmarks.run:

Task Metric Baseline With Liquid Delta
Find 10 orders over $100 tokens 75,482 1,519 −98%
Revenue by status (aggregate) tokens 75,482 115 −100%
Fetch customer (id+email only) tokens 424 12 −97%
Recover from 401 structured next_action no yes
Find the shipping ticket tokens 14,588 154 −99%
Stripe↔PayPal consistency field overlap 0.11 1.00 +9×
Skip wasted call via estimate tokens 14,943 0 −100%
max_tokens=2000 budget cap tokens 14,943 1,999 −87%

Full methodology + per-task breakdown: benchmarks/RESULTS.md.

Install

pip install liquid-api                 # core + bundled MCP server (the `liquid-mcp` command)
pip install 'liquid-api[discovery]'    # + an LLM for discovering spec-less REST APIs & field mapping

Do you need an LLM extra? Self-describing interfaces — OpenAPI, GraphQL, gRPC, MCP, A2A, WSDL — and all databases (introspection) discover with no LLM, and the whole runtime (fetch/query/write/sense) never calls a model. You only need an LLM backend to discover a REST API that has no machine-readable spec (heuristic + LLM) and to map its fields. [discovery] pulls LiteLLM, which reaches OpenAI / Gemini / Anthropic / local / 100+ providers; or pick one directly:

pip install 'liquid-api[gemini]'     # Google Gemini   (or [anthropic]; OpenAI/local work with no extra via base_url)
pip install 'liquid-api[grpc]'       # gRPC transport (reflection)
pip install 'liquid-api[ws]'         # WebSocket transport
pip install 'liquid-api[pg]'         # Postgres / pgvector (asyncpg)
pip install 'liquid-api[mysql]'      # MySQL / MariaDB (aiomysql); SQLite needs no extra
pip install 'liquid-api[neo4j]'      # Neo4j graph (Bolt / Cypher)
pip install 'liquid-api[duckdb]'     # DuckDB (embedded analytics)
pip install 'liquid-api[mssql]'      # SQL Server (ODBC; needs a system ODBC driver)
pip install 'liquid-api[mongodb]'    # MongoDB (collections as endpoints)
pip install 'liquid-api[redis]'      # Redis (keyspace namespaces as endpoints)
# Framework integrations
pip install liquid-langchain   # LangChain / LangGraph
pip install liquid-crewai      # CrewAI

The core is dependency-free — every backend's library is an optional extra, imported only when used.

See it work — live, no pre-config

Point Liquid at an API it has never seen (no adapter, no OpenAPI spec, no auth) and get typed records back — you write no connector; discovery + mapping is the only place a model runs. Runnable end to end via examples/live_quickstart.py:

Connecting to an API Liquid has never seen:
  https://api.openbrewerydb.org/v1/breweries

  discovery method : rest_heuristic
  mapped fields    : ['name', 'city', 'state', 'country']
  LLM calls so far : 2  (discovery + mapping)

fetch() -> 50 typed records; first 3:
   {'name': '(405) Brewing Co', 'city': 'Norman', 'state': 'Oklahoma', 'country': 'United States'}
   {'name': '(512) Brewing Co', 'city': 'Austin', 'state': 'Texas', 'country': 'United States'}
   {'name': '1 of Us Brewing Company', 'city': 'Mount Pleasant', 'state': 'Wisconsin', 'country': 'United States'}

  LLM calls during fetch : 0
  LLM calls on 2nd fetch : 0

You wrote no connector, no schema, no auth glue — Liquid learned the interface for you, and will re-learn it if it changes. That's the point: integrations you don't build or babysit.

Run as an MCP server (open source, self-hosted)

Expose the engine to any MCP client (Claude Desktop, Cursor, Claude Code) — it runs in your own process, no cloud, no account, no lock-in:

Add to Cursor

One-click in Cursor (the button writes the server into your mcp.json; add your OPENAI_API_KEY in Cursor's MCP settings afterward). Or set it up manually:

pip install liquid-api
export OPENAI_API_KEY=sk-...        # or GEMINI_API_KEY / ANTHROPIC_API_KEY,
                                    # or OPENAI_BASE_URL=http://localhost:11434/v1 for local (Ollama/vLLM)
liquid-mcp                          # or: python -m liquid.mcp_server

Zero-install with uvx (the liquid-mcp package makes the command run by name) — Claude Code:

claude mcp add liquid --scope user -e OPENAI_API_KEY=sk-... -- uvx liquid-mcp

Claude Desktop / any MCP client:

{ "mcpServers": { "liquid": {
  "command": "uvx",
  "args": ["liquid-mcp"],
  "env": { "OPENAI_API_KEY": "sk-..." }
} } }

(Or after pip install liquid-api, drop uvx and use "command": "liquid-mcp" directly.)

One-click in Claude Desktop: install the .mcpb bundle — it prompts for your model key on install (stored in the OS keychain), with no JSON to edit. Requires uv on the machine.

Tools: liquid_connect (discover + map any interface), liquid_fetch, liquid_query (server-side search/aggregate), liquid_estimate (pre-flight cost/size, no call), liquid_list_adapters, liquid_discover. The surface is read-only by default; start the server with LIQUID_ALLOW_WRITES=1 to also expose liquid_execute (database insert/update/delete). Adapters and credentials persist under ~/.liquid. Backed by any LLM — OpenAI, Gemini, Anthropic, any OpenAI-compatible/local endpoint via base_url, 100+ providers via LiteLLM, or your own function through CallableBackend.

Quick start — LangGraph agent

from liquid import Liquid, InMemoryCache, RateLimiter
from liquid._defaults import InMemoryVault, InMemoryAdapterRegistry, CollectorSink
from liquid_langchain import LiquidToolkit
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI

liquid = Liquid(
    llm=my_llm, vault=InMemoryVault(), sink=CollectorSink(),
    registry=InMemoryAdapterRegistry(), cache=InMemoryCache(), rate_limiter=RateLimiter(),
    normalize_output=True,    # cross-source canonical shapes
    include_meta=True,        # _meta block on every response
)

adapter = await liquid.get_or_create(
    "https://api.shopify.com",
    target_model={"id": "str", "total_cents": "int", "customer_email": "str"},
    credentials={"access_token": "shpat_..."},
    auto_approve=True,
)

tools = LiquidToolkit(adapter, liquid).get_tools()
agent = create_react_agent(ChatOpenAI(model="gpt-4o-mini"), tools)
result = await agent.ainvoke(
    {"messages": [("user", "Find 5 recent orders over $100 from VIP customers")]}
)

The agent's tools come with rich descriptions (WHEN to use, NOT FOR what, return shape, cost), structured recovery on every error, and server-side search so it never pulls 500 orders to find 5.

Every interface, one API

Discovery identifies the target and tags each endpoint with a protocol; a pluggable transport driver runs it — but the agent-facing API (fetch, query, write, mapping, recovery, cache, rate limits) is identical across all of them.

Interface Runtime Write Install
REST / HTTP+JSON ✅ actions (POST/PUT/PATCH/DELETE)
GraphQL ✅ query + Relay pagination ✅ mutations
SOAP / WSDL ✅ stdlib XML
gRPC ✅ unary + server-streaming (reflection) liquid-api[grpc]
WebSocket ✅ bounded batch reads + subscribe + live sense liquid-api[ws]
SSE / NDJSON (HTTP server-push) ✅ bounded batch reads + live sense
MCP (agent) ✅ call tools / read resources + notification sense ✅ tool calls
A2A (agent) ✅ JSON-RPC message/send to AgentCard skills
Postgres (+pgvector) ✅ tables/views, filters, pagination, vector search liquid-api[pg]
MySQL / MariaDB ✅ tables/views, filters, pagination liquid-api[mysql]
SQLite ✅ tables/views, filters, pagination — (stdlib)
DuckDB ✅ tables/views, filters, pagination liquid-api[duckdb]
SQL Server ✅ tables/views, OFFSET/FETCH pagination liquid-api[mssql]
Neo4j (graph) ✅ labels/relationship types, property filters ✅ node CRUD liquid-api[neo4j]
MongoDB (document) ✅ collections, field filters, pagination liquid-api[mongodb]
Redis (key-value) ✅ keyspace namespaces, typed values, SCAN paging ✅ SET/HSET/DEL liquid-api[redis]

Read and write. liquid.write(adapter, endpoint, op="insert", values={...}, allow_write=True) mutates any database (SQL INSERT/UPDATE/DELETE, Mongo insert/update/delete, Redis SET/HSET/DEL, Neo4j node CRUD); web/agent writes go through verified actions. Identifiers come from introspection and values are parameterized; update/delete require a where (no blanket mutations); writes are off until you opt in with allow_write=True.

Sense — the afferent organ. liquid.sense(adapter, endpoint) perceives a live event stream wherever one exists: SQL row deltas (and Postgres LISTEN/NOTIFY), Redis pub/sub, WebSocket frames, HTTP server-push (SSE/NDJSON), and MCP notifications — each yielded as a modality-agnostic event. Pointed inward, liquid.sense_webhook(port=…, verifier=…) hosts an inbound endpoint so a service (or a human, via a webhook) POSTing to the agent becomes a perceivable signal too. All bounded by max_events / max_seconds, so an agent can drain-by-pull.

The sensorimotor loop. react(stream, handler) drives a handler for each perceived event — with error isolation and bounded concurrency — so a host can perceive → wake the agent → act. merge_senses(*streams) fans several senses into one loop, so one agent can watch a database, a queue, and a webhook at once:

events = merge_senses(
    await liquid.sense(orders, "/orders"),
    await liquid.sense_webhook(port=8088, verifier=stripe_verifier),
)
await react(events, on_event, max_concurrency=4)

Discovery is automatic — and identifies on the fly. Before the pipeline runs, a fingerprint step names the target: a bare host:port is normalized by well-known port (db:5432postgresql://db:5432), and liquid.identify(url) answers "what is this, and is its driver installed?" with an install hint when a backend is missing. (Identifying a protocol is feasible on the fly; speaking a new authenticated binary protocol isn't — so unknowns are named, not guessed at.)

Discovery Where it looks Cost
Databases catalog introspection (postgres://, mysql://, mongodb://, redis://, neo4j://, …) Low
gRPC / WebSocket / SSE server reflection / frame sampling / content-type sniff Low
MCP / A2A / Plugin /mcp, /.well-known/agent-card.json, /.well-known/ai-plugin.json Low
OpenAPI / GraphQL / SOAP spec, introspection, or WSDL Low
REST heuristic common paths + LLM interpretation Medium
Browser Playwright capturing network High

Add a backend without writing code. For the SQL family the contract is declarative enough to be data: a dialect manifest (quoting, placeholder style, pagination, introspection SQL, error map, DBAPI2 module) registered via register_sql_manifest({...}) installs a working driver + discovery — so a new SQL / wire-compatible store (CockroachDB, ClickHouse, any DBAPI2 driver), even one fetched from the network as JSON, connects without a release. New protocols otherwise plug in via the liquid.transport.ProtocolDriver protocol; SQL backends share a dialect-aware core, so a new one is a ~80-line adapter.

2,500+ APIs are pre-discovered and pre-mapped in the global catalog — most popular services connect with zero discovery cost.

Architecture

URL / DSN                       Agent
   ↓                              ↑
 FINGERPRINT → DISCOVERY        FETCH · QUERY · WRITE · SEARCH · AGGREGATE
   ↓                              ↑
 one ProtocolDriver per          Deterministic per-protocol transport
 interface:                        • Query DSL (server-side filter)
   REST GraphQL SOAP gRPC WS       • Output normalization
   MCP A2A · SQL graph doc KV      • Verbosity / max_tokens / _meta
   ↓                              • Structured recovery + self-heal
 APISchema                        • Rate-limit-aware token bucket
   ↓                              • Response cache (Cache-Control aware)
 AI MAPPING (setup only)          • Empirical probing data (Cloud)
   ↓
 AdapterConfig

AI participates at setup only. Runtime is pure transport with transforms — no LLM per call, predictable cost, reproducible behavior (except search_nl, which caches its compilations).

Swappable components

Every cross-cutting concern is a Protocol you can replace:

from liquid.protocols import (
    Vault, LLMBackend, DataSink, KnowledgeStore, AdapterRegistry, CacheStore,
)

In-memory implementations ship for all of them; liquid-cloud provides PostgresVault, RedisCache, etc. for hosted deployments.

Framework support

adapter.to_tools(format="anthropic")   # Claude tool use
adapter.to_tools(format="openai")      # OpenAI function calling
adapter.to_tools(format="mcp")         # MCP (Claude Desktop, Cursor)
from liquid_crewai import LiquidCrewToolkit  # CrewAI

Ecosystem

Package Purpose
liquid-api Core library (this repo)
liquid-langchain LangChain / LangGraph integration
liquid-crewai CrewAI integration
liquid-cli liquid init quickstart
Liquid Cloud Hosted service + global catalog + empirical probing

Comparison

Feature Liquid Zapier LangChain tool DIY
Auto-discovers any interface (no curated connector) yes no no no
APIs + databases + agents in one layer yes partial no no
Read and write through one API yes yes partial no
Server-side search / aggregate yes no no partial
Cross-source output normalization yes partial no no
Structured recovery with next_action yes no no no
Self-healing on schema drift yes no no no
Pre-flight cost estimate yes no no no
MCP + A2A + LangChain + CrewAI native yes no partial no
Open source yes no yes n/a

Documentation

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

liquid_api-0.59.0.tar.gz (657.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

liquid_api-0.59.0-py3-none-any.whl (318.2 kB view details)

Uploaded Python 3

File details

Details for the file liquid_api-0.59.0.tar.gz.

File metadata

  • Download URL: liquid_api-0.59.0.tar.gz
  • Upload date:
  • Size: 657.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for liquid_api-0.59.0.tar.gz
Algorithm Hash digest
SHA256 9aff95b64352ef8df015e5c21b1ee7127ae72fc20d6f4f9cccead55fbd4be937
MD5 01df5999d9fc5219fbba13834a7c0e77
BLAKE2b-256 f19237841405d357ad4c68e85c0396473435a027f5f191286bcd835e3c148a32

See more details on using hashes here.

File details

Details for the file liquid_api-0.59.0-py3-none-any.whl.

File metadata

  • Download URL: liquid_api-0.59.0-py3-none-any.whl
  • Upload date:
  • Size: 318.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for liquid_api-0.59.0-py3-none-any.whl
Algorithm Hash digest
SHA256 dcce3f3795d08a2b765c74d9a7e4048d40ee19a115e00078a3ad8bd33b97522f
MD5 7892ef9e21b32b6890831a5b35d266a6
BLAKE2b-256 53603cf07b324e79aba2aefbb5489f0a76a7c4711a602af2b5d5977b2413cd61

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page