The official Python SDK for HydraDB (hydradb.com)

HydraDB Python SDK

The official Python SDK for HydraDB, memory and retrieval infrastructure for AI applications.

The SDK exposes synchronous and asynchronous clients for tenant management, knowledge and memory ingestion, retrieval (query), source inspection, deletion, graph relations, and webhook management.

Documentation: docs.hydradb.com


Installation

Install the package from PyPI:

pip install hydradb-sdk

Import the SDK using the Python package name:

from hydra_db import HydraDB, AsyncHydraDB

Note: The package name is hydradb-sdk, but the import name is hydra_db.


Client setup

import os
from hydra_db import HydraDB

client = HydraDB(token=os.environ["HYDRA_DB_API_KEY"])

The default API base URL is:

https://api.hydradb.com

The client targets API version 2 by default. You can override it via api_version.

For local development or a custom host, pass base_url directly:

from hydra_db import HydraDB

client = HydraDB(
    token="YOUR_API_KEY",
    base_url="http://localhost:8080",
)

Async client:

import os
from hydra_db import AsyncHydraDB

async_client = AsyncHydraDB(token=os.environ["HYDRA_DB_API_KEY"])

Constructor options

Parameter         Type                     Default                       Description
token             str | Callable[[], str]  required                      API token, or a callable returning one.
base_url          str                      None                          Override the API base URL.
environment       HydraDBEnvironment       HydraDBEnvironment.HYDRAD_DB  Predefined environment (base URL).
api_version       str                      "2"                           API version.
headers           dict                     None                          Extra headers sent with every request.
timeout           float                    60                            Request timeout in seconds.
follow_redirects  bool                     True                          Whether the default httpx client follows redirects.
httpx_client      httpx.Client             None                          Custom httpx client.
logging           LogConfig | Logger       None                          SDK logging configuration.

AsyncHydraDB additionally accepts async_token, an async callable returning a bearer token, and an httpx.AsyncClient for httpx_client.
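
Because token accepts a callable, a key can be read lazily instead of being fixed at construction time. The sketch below is one way to do that, assuming the key lives in an environment variable; env_token_provider is a hypothetical helper and not part of the SDK, and how often the SDK invokes the callable is not stated here.

```python
import os
from typing import Callable


def env_token_provider(var_name: str = "HYDRA_DB_API_KEY") -> Callable[[], str]:
    """Return a zero-argument callable that reads the token on every call.

    Hypothetical helper (not part of the SDK): reading lazily means a rotated
    key is picked up without rebuilding the client.
    """

    def provider() -> str:
        token = os.environ.get(var_name, "").strip()
        if not token:
            raise RuntimeError(f"{var_name} is not set or is empty")
        return token

    return provider


# Usage (assumes hydradb-sdk is installed):
#   from hydra_db import HydraDB
#   client = HydraDB(token=env_token_provider())
```

Failing early on an empty variable keeps a misconfigured deployment from surfacing later as an authentication error.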


Client structure

All functionality is reached through one top-level method and three sub-clients:

Accessor           Purpose
client.query(...)  Retrieve knowledge and/or memory in a single call.
client.context     Ingest, inspect, list, delete sources, and fetch graph relations.
client.tenants     Create, list, delete, and inspect tenants.
client.webhooks    Register and manage webhooks and inspect deliveries.

Important tenant and sub-tenant rule

Most methods accept both tenant_id and sub_tenant_id.

If you ingest with a sub_tenant_id, you should also query, inspect, list, and delete with the same sub_tenant_id.

TENANT_ID = "my-company"
SUB_TENANT_ID = "my-sub-tenant"

Omitting sub_tenant_id means HydraDB uses the default sub-tenant created during tenant setup.
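
One way to keep ingestion, query, and deletion on the same sub-tenant is to define the pair once and spread it into every call. This is a minimal sketch; Scope is a hypothetical helper, not an SDK type.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class Scope:
    """Hypothetical helper: one tenant/sub-tenant pair reused across calls."""

    tenant_id: str
    sub_tenant_id: Optional[str] = None

    def kwargs(self) -> Dict[str, str]:
        """Keyword arguments to spread into SDK calls.

        sub_tenant_id is left out when unset, so HydraDB falls back to the
        default sub-tenant.
        """
        out = {"tenant_id": self.tenant_id}
        if self.sub_tenant_id is not None:
            out["sub_tenant_id"] = self.sub_tenant_id
        return out


scope = Scope(tenant_id="my-company", sub_tenant_id="my-sub-tenant")

# Usage (assumes an existing client):
#   client.context.ingest(**scope.kwargs(), type="memory", memories=...)
#   client.query(**scope.kwargs(), query="user preferences")
```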


Tenant management

Create a standard tenant

response = client.tenants.create(tenant_id="my-company")
print(response)

Create a tenant for raw embeddings

Use this when you want to bring your own embeddings. Provide embeddings_dimension.

response = client.tenants.create(
    tenant_id="my-embeddings-tenant",
    is_embeddings_tenant=True,
    embeddings_dimension=1536,
)

Create a tenant with a metadata schema

tenant_metadata_schema is a list of property definitions. Each field can enable filtering (enable_match), semantic search (enable_dense_embedding), and/or keyword search (enable_sparse_embedding). Fields with embeddings enabled must be VARCHAR.

response = client.tenants.create(
    tenant_id="my-company",
    tenant_metadata_schema=[
        {"name": "department", "type": "VARCHAR", "enable_match": True},
        {"name": "region", "type": "VARCHAR", "enable_match": True},
    ],
)
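
The VARCHAR rule for embedding-enabled fields can be checked locally before the tenant is created. The sketch below is a hypothetical client-side check, not an SDK function; the API remains the source of truth for what a schema may contain.

```python
from typing import Any, Dict, List

EMBEDDING_FLAGS = ("enable_dense_embedding", "enable_sparse_embedding")


def check_metadata_schema(schema: List[Dict[str, Any]]) -> List[str]:
    """Return a list of problems found in a tenant_metadata_schema list.

    Only the documented rule is checked: a field that enables dense or
    sparse embeddings must have type VARCHAR.
    """
    problems = []
    for field in schema:
        name = field.get("name", "<unnamed>")
        uses_embeddings = any(field.get(flag) for flag in EMBEDDING_FLAGS)
        if uses_embeddings and field.get("type") != "VARCHAR":
            problems.append(f"{name}: embeddings require type VARCHAR")
    return problems
```

An empty list means the schema passed this check; anything else can be reported before calling client.tenants.create.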

List tenants

tenants = client.tenants.list()
print(tenants)

List sub-tenants

sub_tenants = client.tenants.sub_tenants(tenant_id="my-company")
print(sub_tenants)

Check infrastructure status

status = client.tenants.status(tenant_id="my-company")
print(status)

Tenant stats

stats = client.tenants.stats(tenant_id="my-company")
print(stats)

Delete a tenant

This permanently deletes the tenant and its data.

client.tenants.delete(tenant_id="my-company")

Ingest knowledge and memory

client.context.ingest(...) handles both knowledge files and memory, selected via the type parameter ("knowledge" or "memory").

Upload knowledge files

documents accepts a list of file objects. Each entry can be a file-like object, bytes, or a (filename, fileobj, content_type) tuple.

from hydra_db import HydraDB

client = HydraDB(token="YOUR_API_KEY")

TENANT_ID = "my-company"
SUB_TENANT_ID = "my-sub-tenant"

with open("report.pdf", "rb") as f:
    upload = client.context.ingest(
        tenant_id=TENANT_ID,
        sub_tenant_id=SUB_TENANT_ID,
        type="knowledge",
        documents=[("report.pdf", f, "application/pdf")],
        upsert=True,
    )

print(upload)

The initial response typically reports a queued status. That means the file was accepted into the ingestion queue, not that ingestion has finished. Use client.context.status(...) to track progress.

Upload multiple files

with open("a.pdf", "rb") as f1, open("b.pdf", "rb") as f2:
    upload = client.context.ingest(
        tenant_id=TENANT_ID,
        sub_tenant_id=SUB_TENANT_ID,
        type="knowledge",
        documents=[
            ("a.pdf", f1, "application/pdf"),
            ("b.pdf", f2, "application/pdf"),
        ],
        upsert=True,
    )

print(upload)
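
When the file list is not known in advance, the (filename, fileobj, content_type) tuples can be built from paths. The sketch below guesses the content type from the extension with the standard library; as_document is a hypothetical helper, and the fallback type is an assumption rather than something the API is known to require.

```python
import mimetypes
from pathlib import Path
from typing import BinaryIO, Tuple


def as_document(path: str, fileobj: BinaryIO) -> Tuple[str, BinaryIO, str]:
    """Build a (filename, fileobj, content_type) tuple for `documents`.

    Hypothetical helper: the content type is guessed from the file extension,
    falling back to application/octet-stream when it is unknown.
    """
    content_type, _ = mimetypes.guess_type(path)
    return (Path(path).name, fileobj, content_type or "application/octet-stream")


# Usage (assumes an existing client and the files on disk):
#   from contextlib import ExitStack
#   paths = ["a.pdf", "b.pdf"]
#   with ExitStack() as stack:
#       docs = [as_document(p, stack.enter_context(open(p, "rb"))) for p in paths]
#       client.context.ingest(tenant_id=TENANT_ID, type="knowledge", documents=docs)
```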

Upload files with per-file metadata

document_metadata is a JSON string — a JSON array of per-file metadata objects (knowledge only). Each object may include id, metadata, additional_metadata, infer, and relations.

import json

document_metadata = json.dumps([
    {
        "id": "doc_a",
        "metadata": {"department": "sales", "region": "us"},
        "additional_metadata": {"author": "Alice", "title": "Sales Report"},
    },
    {
        "id": "doc_b",
        "metadata": {"department": "marketing", "region": "us"},
        "additional_metadata": {"author": "Bob", "title": "Marketing Report"},
        "relations": {
            "cortex_source_ids": ["doc_a"],
            "properties": {"relation": "same_upload_batch"},
        },
    },
])

with open("a.pdf", "rb") as f1, open("b.pdf", "rb") as f2:
    upload = client.context.ingest(
        tenant_id=TENANT_ID,
        sub_tenant_id=SUB_TENANT_ID,
        type="knowledge",
        documents=[
            ("a.pdf", f1, "application/pdf"),
            ("b.pdf", f2, "application/pdf"),
        ],
        document_metadata=document_metadata,
        upsert=True,
    )

print(upload)
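
Since document_metadata is passed as a JSON string, a misspelled key is easy to miss. The sketch below serializes the entries and rejects keys outside the set listed above; dump_document_metadata is a hypothetical helper, not part of the SDK.

```python
import json
from typing import Any, Dict, List

ALLOWED_KEYS = {"id", "metadata", "additional_metadata", "infer", "relations"}


def dump_document_metadata(entries: List[Dict[str, Any]]) -> str:
    """Serialize per-file metadata to the JSON string that ingest expects.

    Hypothetical helper: keys outside the documented set raise ValueError,
    so a typo fails locally before the upload.
    """
    for index, entry in enumerate(entries):
        unknown = set(entry) - ALLOWED_KEYS
        if unknown:
            raise ValueError(f"entry {index}: unknown keys {sorted(unknown)}")
    return json.dumps(entries)
```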

Ingest app-generated knowledge without files

app_knowledge is a JSON string containing a single source object or an array of source objects (knowledge only).

import json

app_knowledge = json.dumps([
    {
        "id": "app-source-001",
        "tenant_id": TENANT_ID,
        "sub_tenant_id": SUB_TENANT_ID,
        "title": "Internal onboarding note",
        "type": "document",
        "description": "Short internal note for onboarding",
        "content": {
            "text": "New users should be added to the onboarding workspace first."
        },
        "tenant_metadata": {"department": "engineering"},
        "document_metadata": {"source": "internal_app"},
    }
])

upload = client.context.ingest(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    type="knowledge",
    app_knowledge=app_knowledge,
    upsert=True,
)

print(upload)

Add memories

memories is a JSON string holding a JSON array of memory items (memory only). Each item can include text, title, infer, and metadata; the example below also sets source_id.

import json

memories = json.dumps([
    {
        "source_id": "memory-001",
        "title": "User preference",
        "text": "User prefers detailed explanations and dark mode.",
        "infer": True,
        "metadata": {"category": "preference"},
    }
])

memory = client.context.ingest(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    type="memory",
    memories=memories,
    upsert=True,
)

print(memory)

Check ingestion status

client.context.status(...) reports indexing status for one or more source IDs. Pass a single ID via id or a list via ids.

status = client.context.status(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    ids=["doc_a", "doc_b"],
)

print(status)

A single source ID:

status = client.context.status(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    id="doc_a",
)

Poll until ingestion finishes

import time

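source_ids = ["doc_a", "doc_b"]
deadline = time.monotonic() + 600  # give up after 10 minutes (arbitrary cap)

while time.monotonic() < deadline: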
    batch = client.context.status(
        tenant_id=TENANT_ID,
        sub_tenant_id=SUB_TENANT_ID,
        ids=source_ids,
    )

    print(batch)

    # Inspect the returned status fields and break once everything has
    # completed or errored. Field names follow the SourceStatusApiResponse
    # schema returned by the API.
    time.sleep(5)
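
The polling pattern above can be wrapped in a reusable function with a timeout. This is a sketch under stated assumptions: wait_for_ingestion is a hypothetical helper, and the completion test is supplied by the caller because the status field names come from the API's response schema and are not assumed here.

```python
import time
from typing import Any, Callable


def wait_for_ingestion(
    fetch_status: Callable[[], Any],
    is_finished: Callable[[Any], bool],
    timeout: float = 600.0,
    interval: float = 5.0,
) -> Any:
    """Poll fetch_status() until is_finished(result) is true or time runs out.

    Hypothetical helper; raises TimeoutError when the next sleep would pass
    the deadline.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = fetch_status()
        if is_finished(result):
            return result
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"ingestion not finished after {timeout} seconds")
        time.sleep(interval)


# Usage (assumes an existing client; all_done is yours to write):
#   batch = wait_for_ingestion(
#       lambda: client.context.status(tenant_id=TENANT_ID, ids=["doc_a", "doc_b"]),
#       is_finished=all_done,
#   )
```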

Query (retrieval)

client.query(...) is the single retrieval entry point. It searches knowledge, memory, or both, with optional graph context.

results = client.query(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    query="What did the report say about revenue?",
    max_results=10,
    mode="fast",
    alpha=0.8,
    recency_bias=0.0,
    graph_context=True,
)

print(results.data.chunks)
print(results.data.sources)

The response is a QueryApiResponse with success, data, error, and meta. The retrieval payload is on results.data, which carries chunks, sources, graph_context, and additional_context.

Search knowledge, memory, or both

Use the type parameter to choose what to search:

# Knowledge only
client.query(tenant_id=TENANT_ID, query="quarterly revenue", type="knowledge")

# Memory only
client.query(tenant_id=TENANT_ID, query="user preferences", type="memory")

# Both, merged
client.query(tenant_id=TENANT_ID, query="what does the user prefer", type="all")

Hybrid vs. keyword search

alpha controls the hybrid balance and can be a float in 0.0–1.0 or the string "auto":

  • 1.0: more semantic/vector weighted
  • 0.0: more keyword/BM25 weighted
  • "auto": backend chooses the balance
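
If alpha comes from user input or configuration, it can be checked before the request is sent. The sketch below is a hypothetical client-side check, not SDK behavior; treating the bounds as inclusive is an assumption.

```python
from typing import Union


def check_alpha(alpha: Union[str, float]) -> Union[str, float]:
    """Return alpha unchanged if it is "auto" or a number in [0.0, 1.0].

    Hypothetical client-side check; inclusive bounds are an assumption.
    """
    if alpha == "auto":
        return alpha
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(alpha, bool) or not isinstance(alpha, (int, float)):
        raise ValueError(f'alpha must be a number or "auto", got {alpha!r}')
    if not 0.0 <= alpha <= 1.0:
        raise ValueError(f"alpha must be within 0.0 and 1.0, got {alpha!r}")
    return alpha
```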

For pure keyword/BM25 search, set query_by="text" and choose an operator:

results = client.query(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    query="dark mode",
    query_by="text",
    operator="phrase",   # "or" | "and" | "phrase"
    type="memory",
)

query_by accepts "hybrid" (vector + BM25, default) or "text" (BM25 only). operator applies only when query_by="text".

Metadata filters

Top-level keys match tenant_metadata (and must correspond to fields defined in tenant_metadata_schema). To filter on document-level fields you sent as additional_metadata at ingestion, nest them under additional_metadata (the key document_metadata is also accepted as a legacy alias).

results = client.query(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    query="revenue forecast",
    max_results=10,
    metadata_filters={
        "department": "sales",
        "additional_metadata": {"author": "Alice"},
    },
)

All query parameters

Parameter                 Type         Description
tenant_id                 str          Tenant identifier (required).
query                     str          Search terms (required).
sub_tenant_id             str          Optional sub-tenant identifier.
max_results               int          Maximum number of results.
mode                      str          Retrieval mode: "fast" or "thinking".
alpha                     str | float  Hybrid balance (0.0–1.0 or "auto").
recency_bias              float        Preference for newer content (0.0–1.0).
graph_context             bool         Enable graph context (on by default in v2; set False to omit).
query_forceful_relations  bool         Search forceful relations in thinking mode.
additional_context        str          Extra context to guide retrieval.
query_apps                bool         Run a parallel app-aware retrieval lane and fuse results.
metadata_filters          dict         Key-value metadata filters.
type                      str          What to search: "knowledge", "memory", or "all".
query_by                  str          "hybrid" or "text".
operator                  str          "or", "and", or "phrase" (only with query_by="text").

Formatting results for an LLM

build_string formats a query result (or its .data) into a plain string ready for prompt injection. It is available from hydra_db.helpers.

from hydra_db import HydraDB
from hydra_db.helpers import build_string

client = HydraDB(token="YOUR_API_KEY")
TENANT_ID = "my-company"

response = client.query(query="What does the user prefer?", tenant_id=TENANT_ID)

# Pass the full envelope or just the data — build_string handles both.
prompt = build_string(response)
print(prompt)

prompt = build_string(response.data)
print(prompt)

The output renders chunks under a === CONTEXT === header, with graph relations and synthesis context appended when present. If there is nothing to render, it returns "No relevant context found.".
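
The string returned by build_string still has to be combined with the user's question. The sketch below shows one possible prompt layout; make_prompt and its wording are hypothetical, and only the "No relevant context found." sentinel comes from the behavior described above.

```python
NO_CONTEXT = "No relevant context found."


def make_prompt(context: str, question: str) -> str:
    """Combine build_string output with a user question.

    Hypothetical helper: when build_string reports that nothing was
    retrieved, the prompt says so instead of embedding the sentinel.
    """
    if context.strip() == NO_CONTEXT:
        return f"Answer without retrieved context.\n\nQuestion: {question}"
    return f"Use the context below to answer.\n\n{context}\n\nQuestion: {question}"


# Usage (assumes an existing query response):
#   prompt = make_prompt(build_string(response), "What does the user prefer?")
```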


Inspect and list sources

List sources

sources = client.context.list(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    type="knowledge",
    page=1,
    page_size=50,
)

print(sources)

List memories with type="memory".
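
To walk every page rather than a single one, the page and page_size parameters can be driven from a loop. This is a sketch under stated assumptions: iter_pages is a hypothetical helper, pages are taken to start at 1 as in the example above, and the caller supplies the function that pulls items out of a response because the list response shape is not assumed here.

```python
from typing import Any, Callable, Iterator, List


def iter_pages(
    fetch_page: Callable[[int], Any],
    extract_items: Callable[[Any], List[Any]],
    page_size: int = 50,
    max_pages: int = 1000,
) -> Iterator[Any]:
    """Yield items page by page until a page comes back short or empty.

    Hypothetical helper; page_size must match the value fetch_page sends,
    and max_pages is a safety cap against endless loops.
    """
    for page in range(1, max_pages + 1):
        items = extract_items(fetch_page(page))
        yield from items
        if len(items) < page_size:
            return


# Usage (assumes an existing client; get_items is yours to write):
#   for source in iter_pages(
#       lambda page: client.context.list(
#           tenant_id=TENANT_ID, type="knowledge", page=page, page_size=50
#       ),
#       extract_items=get_items,
#   ):
#       print(source)
```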

Fetch specific source IDs

sources = client.context.list(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    type="knowledge",
    ids=["doc_a", "doc_b"],
)

Filter list results

filtered = client.context.list(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    type="knowledge",
    filters={
        "tenant_metadata": {"department": "sales"},
        "document_metadata": {"author": "Alice"},
    },
)

Include only selected fields

include_fields reduces response size on knowledge list calls.

sources = client.context.list(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    type="knowledge",
    include_fields=["title", "document_metadata", "tenant_metadata"],
)

Allowed field names:

attachments, content, description, document_metadata, note, relations,
tenant_metadata, timestamp, title, type, url

Inspect a single source

source = client.context.inspect(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    id="doc_a",
    mode="content",
)

print(source)

Supported modes:

content | url | both

For presigned URLs, set the expiry in seconds:

source = client.context.inspect(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    id="doc_a",
    mode="url",
    expiry_seconds=3600,
)

Fetch graph relations

relations = client.context.relations(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    id="doc_a",
    type="knowledge",
    limit=10,
)

print(relations)

To fetch relations across the whole sub-tenant, omit id:

relations = client.context.relations(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    limit=10,
)

Use cursor for pagination.


Delete data

Delete one or more source IDs from a tenant/sub-tenant. Use type to target knowledge or memory.

client.context.delete(
    tenant_id=TENANT_ID,
    sub_tenant_id=SUB_TENANT_ID,
    ids=["doc_a", "doc_b"],
    type="knowledge",
)

Webhooks

HydraDB can notify your application about events such as indexing status changes.

Register a webhook

hook = client.webhooks.register(
    url="https://example.com/hydra-webhook",
    event_types=["indexing.status_changed"],
    signing_secret="a-secret-at-least-16-chars",
)

print(hook)

When signing_secret is set, every delivery includes an X-HydraDB-Signature: sha256=<hmac> header computed as HMAC-SHA256(key=signing_secret, msg=raw_body). The secret must be at least 16 characters; omit it to disable signing.
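
On the receiving side, the signature can be recomputed from the raw request body and compared with the header. The sketch below follows the HMAC-SHA256 description above; verify_signature is a hypothetical helper, and it assumes the digest in the header is hex-encoded, which is not stated here.

```python
import hashlib
import hmac


def verify_signature(signing_secret: str, raw_body: bytes, header_value: str) -> bool:
    """Check an X-HydraDB-Signature header against the raw request body.

    Assumes the digest after "sha256=" is hex-encoded.
    """
    prefix = "sha256="
    if not header_value.startswith(prefix):
        return False
    expected = hmac.new(
        signing_secret.encode("utf-8"), raw_body, hashlib.sha256
    ).hexdigest()
    received = header_value[len(prefix):]
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(expected.encode("utf-8"), received.encode("utf-8"))
```

Verify against the bytes exactly as received; re-serializing parsed JSON can change whitespace or key order and break the comparison.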

Get the current webhook

print(client.webhooks.get())

Delete the webhook

client.webhooks.delete()

Send a test event

client.webhooks.test()

List and inspect deliveries

deliveries = client.webhooks.list_deliveries(
    limit=20,
    status="failed",   # pending | failed | delivered | permanently_failed
)
print(deliveries)

one = client.webhooks.get_delivery("delivery_id")
print(one)

Use cursor to paginate deliveries.

Retry a delivery

client.webhooks.retry_delivery("delivery_id")

Async usage

Every method on HydraDB has an awaitable counterpart on AsyncHydraDB.

import asyncio
import os
from hydra_db import AsyncHydraDB

TENANT_ID = "my-company"
SUB_TENANT_ID = "my-sub-tenant"

async def main():
    client = AsyncHydraDB(token=os.environ["HYDRA_DB_API_KEY"])

    results = await client.query(
        tenant_id=TENANT_ID,
        sub_tenant_id=SUB_TENANT_ID,
        query="Which mode does the user prefer?",
        max_results=10,
        alpha="auto",
    )

    print(results.data.chunks)

asyncio.run(main())

Async ingestion:

import asyncio
import os
from hydra_db import AsyncHydraDB

TENANT_ID = "my-company"
SUB_TENANT_ID = "my-sub-tenant"

async def main():
    client = AsyncHydraDB(token=os.environ["HYDRA_DB_API_KEY"])

    with open("report.pdf", "rb") as f:
        upload = await client.context.ingest(
            tenant_id=TENANT_ID,
            sub_tenant_id=SUB_TENANT_ID,
            type="knowledge",
            documents=[("report.pdf", f, "application/pdf")],
            upsert=True,
        )

    print(upload)

asyncio.run(main())
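
The async client is most useful when several requests run at once. The sketch below issues a batch of queries concurrently with a cap on how many are in flight; query_many is a hypothetical helper, and the cap of 5 is an arbitrary choice rather than a documented limit.

```python
import asyncio
from typing import Any, List, Sequence


async def query_many(
    client: Any, tenant_id: str, queries: Sequence[str], limit: int = 5
) -> List[Any]:
    """Run several queries concurrently, at most `limit` in flight.

    Hypothetical helper; `client` is expected to be an AsyncHydraDB instance
    (or anything with a compatible awaitable query method).
    """
    semaphore = asyncio.Semaphore(limit)

    async def one(text: str) -> Any:
        async with semaphore:
            return await client.query(tenant_id=tenant_id, query=text)

    # gather returns results in the same order as the input queries.
    return await asyncio.gather(*(one(q) for q in queries))


# Usage (assumes an AsyncHydraDB client inside a coroutine):
#   results = await query_many(client, TENANT_ID, ["revenue", "churn"])
```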

Accessing raw responses

Every client and sub-client exposes with_raw_response for callers that need status codes and headers.

raw = client.with_raw_response.query(tenant_id=TENANT_ID, query="revenue")
raw_context = client.context.with_raw_response.list(tenant_id=TENANT_ID)

Error handling

The SDK raises typed errors for common API failures.

from hydra_db import HydraDB
from hydra_db.errors import BadRequestError, UnauthorizedError, UnprocessableEntityError

client = HydraDB(token="YOUR_API_KEY")

TENANT_ID = "my-company"
SUB_TENANT_ID = "my-sub-tenant"

try:
    upload = client.context.ingest(
        tenant_id=TENANT_ID,
        sub_tenant_id=SUB_TENANT_ID,
        type="knowledge",
        documents=[],
    )
except UnauthorizedError:
    print("Invalid or missing API key")
except BadRequestError as error:
    print("Bad request", error.body)
except UnprocessableEntityError as error:
    print("Validation error", error.body)

Common HTTP errors exposed by the SDK:

BadRequestError
UnauthorizedError
ForbiddenError
NotFoundError
UnprocessableEntityError
InternalServerError
ServiceUnavailableError
TooManyRequestsError
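
Rate-limit and availability errors are often transient, so callers may want to retry them. The sketch below is a generic exponential-backoff wrapper; with_retries is a hypothetical helper, the delays are arbitrary, and which errors are safe to retry is left to the caller. The import path in the usage comment assumes these classes live in hydra_db.errors alongside the ones imported above.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retries(
    call: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 4,
    base_delay: float = 0.5,
) -> T:
    """Call `call`, retrying with exponential backoff on the given errors.

    Hypothetical helper: the last failure is re-raised once attempts run out.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return call()
        except retry_on:
            if attempt == attempts - 1:
                raise
            # Delay doubles each time: base_delay, 2x, 4x, ...
            time.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")


# Usage (assumes hydradb-sdk is installed and a client exists):
#   from hydra_db.errors import ServiceUnavailableError, TooManyRequestsError
#   results = with_retries(
#       lambda: client.query(tenant_id=TENANT_ID, query="revenue"),
#       retry_on=(TooManyRequestsError, ServiceUnavailableError),
#   )
```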

SDK method reference

Group     Method                           Description
Client    client.query                     Retrieve knowledge and/or memory in a single call.
Context   client.context.ingest            Ingest knowledge files, app-generated knowledge, or memories.
Context   client.context.status            Check ingestion/indexing status for source IDs.
Context   client.context.inspect           Fetch a single source's content or URL.
Context   client.context.list              List knowledge sources or memories.
Context   client.context.delete            Delete one or more source IDs.
Context   client.context.relations         Fetch graph relations for a source or sub-tenant.
Tenants   client.tenants.create            Create a standard or raw-embeddings tenant.
Tenants   client.tenants.list              List tenants.
Tenants   client.tenants.delete            Delete a tenant.
Tenants   client.tenants.status            Check tenant infrastructure status.
Tenants   client.tenants.sub_tenants       List sub-tenants for a tenant.
Tenants   client.tenants.stats             Get tenant stats.
Webhooks  client.webhooks.register         Register a webhook.
Webhooks  client.webhooks.get              Get the current webhook.
Webhooks  client.webhooks.delete           Delete the webhook.
Webhooks  client.webhooks.test             Send a test event.
Webhooks  client.webhooks.list_deliveries  List webhook deliveries.
Webhooks  client.webhooks.get_delivery     Get a single delivery.
Webhooks  client.webhooks.retry_delivery   Retry a delivery.
Helpers   hydra_db.helpers.build_string    Format a query result into an LLM-ready string.

A complete, per-parameter reference is in src/hydra_db/reference.md.


Notes for contributors

This SDK is generated from the HydraDB API definition with Fern. The generated clients live under src/hydra_db/client.py, src/hydra_db/context/client.py, src/hydra_db/tenants/client.py, and src/hydra_db/webhooks/client.py. If method signatures change in the generated code, update this README to match.

Before publishing, verify these stay consistent:

  • Package name and version in pyproject.toml (hydradb-sdk)
  • Installation command in this README
  • Import name hydra_db
  • The ingest payload shapes (document_metadata, app_knowledge, and memories are JSON strings)

Support

If you have any questions or need help, reach out at founders@hydradb.com.
