Drop-in replacements for OpenAI, Anthropic, Google, xAI, and Meta SDKs with built-in AI governance. One import change adds permit-first policy enforcement, budget controls, audit trails, and usage reporting to every AI call.

keel-sdk

Python SDK for the Keel AI governance API.

Keel lets you issue permits before AI calls, enforce policies, track usage, and audit decisions — across any provider.

⚠️ Keel is currently in private beta. You'll need a Keel account and API key to use this SDK. Sign up for early access.

Install

pip install keel-sdk

Quick Start — One-Line Migration

Add Keel governance to your existing AI code with a single import change. No other code modifications needed.

OpenAI:

# BEFORE:
from openai import OpenAI

# AFTER:
from keel_sdk.providers.openai import OpenAI

client = OpenAI()  # Uses KEEL_BASE_URL, KEEL_API_KEY, KEEL_PROJECT_ID env vars
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    max_tokens=100,
)
print(response["choices"][0]["message"]["content"])

Anthropic:

# BEFORE:
from anthropic import Anthropic

# AFTER:
from keel_sdk.providers.anthropic import Anthropic

client = Anthropic()  # Uses KEEL_BASE_URL, KEEL_API_KEY, KEEL_PROJECT_ID env vars
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    messages=[{"role": "user", "content": "Hello!"}],
    max_tokens=1024,
)
print(response["content"][0]["text"])

Google (Gemini):

# BEFORE:
from google.generativeai import GenerativeModel

# AFTER:
from keel_sdk.providers.google import GenerativeModel

model = GenerativeModel("gemini-2.0-flash")
response = model.generate_content("Hello!")
print(response["candidates"][0]["content"]["parts"][0]["text"])

xAI (Grok):

# BEFORE:
from openai import OpenAI
client = OpenAI(base_url="https://api.x.ai/v1", api_key="xai-...")

# AFTER:
from keel_sdk.providers.xai import Grok

client = Grok()
response = client.chat.completions.create(
    model="grok-2",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response["choices"][0]["message"]["content"])

Meta (Llama):

# BEFORE:
from openai import OpenAI
client = OpenAI(base_url="https://api.llama-api.com", api_key="...")

# AFTER:
from keel_sdk.providers.meta import Llama

client = Llama()
response = client.chat.completions.create(
    model="llama-3.3-70b",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response["choices"][0]["message"]["content"])

Every call transparently: requests a permit (policy + budget check), executes via Keel's proxy only if a permit is granted, reports actual token usage, and records audit evidence.

Request Lifecycle

Every request processed by Keel follows a consistent high-level flow:

  • Evaluate: identity, policy, and budget constraints are checked
  • Decide: a permit decision is issued — allow, deny, or constrain
  • Execute: the provider call occurs only if permitted
  • Record: usage, cost, and governance events are captured

Requests are only executed if explicitly permitted.
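
The four stages can be pictured with a small in-memory model. This is only a sketch: ToyGovernor, Permit, and the rule that "constrain" lowers the output-token ceiling to the remaining budget are illustrative assumptions, not Keel's actual policy engine or data model.

```python
from dataclasses import dataclass, field


@dataclass
class Permit:
    decision: str           # "allow", "deny", or "constrain"
    max_output_tokens: int  # ceiling attached to the decision (assumed meaning)


@dataclass
class ToyGovernor:
    """In-memory stand-in for a permit service (hypothetical, illustration only)."""
    budget_tokens: int
    allowed_models: set
    events: list = field(default_factory=list)

    def evaluate(self, model, requested_tokens):
        # Evaluate + Decide: policy check first, then budget check.
        if model not in self.allowed_models or self.budget_tokens <= 0:
            return Permit("deny", 0)
        if requested_tokens > self.budget_tokens:
            return Permit("constrain", self.budget_tokens)
        return Permit("allow", requested_tokens)

    def run(self, model, requested_tokens, call_provider):
        permit = self.evaluate(model, requested_tokens)
        if permit.decision == "deny":
            # Record the denial; the provider is never called.
            self.events.append(("deny", model, 0))
            return None
        # Execute: reached only when a permit was granted.
        used = call_provider(model, permit.max_output_tokens)
        # Record: deduct actual usage and keep an audit event.
        self.budget_tokens -= used
        self.events.append((permit.decision, model, used))
        return used


def fake_provider(model, limit):
    """Pretend provider that consumes every token it is allowed."""
    return limit


gov = ToyGovernor(budget_tokens=150, allowed_models={"gpt-4o"})
first = gov.run("gpt-4o", 100, fake_provider)   # fits the budget
second = gov.run("gpt-4o", 100, fake_provider)  # clamped to the remaining 50
third = gov.run("gpt-4o", 100, fake_provider)   # budget exhausted
```

In this toy run the three calls end as allow, constrain, and deny, and the provider function is invoked only for the first two.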

Provider Wrappers

The provider wrappers in keel_sdk.providers give you drop-in replacements for the official OpenAI, Anthropic, Google (Gemini), xAI (Grok), and Meta (Llama) SDKs. They do not depend on or import the provider packages. Instead, they talk directly to Keel's proxy endpoints.

Configuration

Set the environment variables or pass the values explicitly:

from keel_sdk.providers.openai import OpenAI

# Via env vars (recommended):
#   KEEL_BASE_URL=https://api.keelapi.com
#   KEEL_API_KEY=keel_sk_...
#   KEEL_PROJECT_ID=proj_...
client = OpenAI()

# Or pass explicitly:
client = OpenAI(
    keel_base_url="https://api.keelapi.com",
    keel_api_key="keel_sk_...",
    keel_project_id="proj_...",
    keel_subject={"type": "user", "id": "usr_42"},  # optional, defaults to service/default
)

The api_key parameter (for OpenAI compatibility) is accepted but ignored — Keel manages provider keys.

Governance Flow

On every create() call, the wrapper:

  1. Permit: POST /v1/permits with provider, model, and estimated tokens
  2. Deny check: if the permit decision is not "allow", raises KeelError; workflow cap denials raise typed workflow subclasses
  3. Proxy: POST /v1/proxy/openai (or /anthropic, /google, /xai, /meta) with the provider-shaped request body
  4. Usage report: POST /v1/permits/{permit_id}/usage with actual tokens from the response
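
The sequence of calls can be sketched against a stub transport. Everything here is a simplification under stated assumptions: governed_create, KeelDenied, and fake_post are hypothetical names, the flat permit body is shortened for readability, and reading usage.prompt_tokens and usage.completion_tokens assumes an OpenAI-shaped response. The real wrapper's internals may differ.

```python
class KeelDenied(Exception):
    """Hypothetical stand-in for the SDK's KeelError."""


def governed_create(post, provider, body, estimated_tokens):
    """Run permit -> deny check -> proxy -> usage report via post(path, json)."""
    # 1. Permit (simplified body; field names are assumptions)
    permit = post("/v1/permits", {
        "provider": provider,
        "model": body["model"],
        "estimated_tokens": estimated_tokens,
    })
    # 2. Deny check: anything other than "allow" stops here.
    if permit["decision"] != "allow":
        raise KeelDenied(permit["decision"])
    # 3. Proxy: forward the provider-shaped body unchanged.
    response = post(f"/v1/proxy/{provider}", body)
    # 4. Usage report: send the actual token counts back.
    usage = response.get("usage", {})
    post(f"/v1/permits/{permit['permit_id']}/usage", {
        "actual_input_tokens": usage.get("prompt_tokens", 0),
        "actual_output_tokens": usage.get("completion_tokens", 0),
    })
    return response


calls = []


def fake_post(path, json):
    """Stub transport that records each path and returns canned replies."""
    calls.append(path)
    if path == "/v1/permits":
        return {"decision": "allow", "permit_id": "permit_1"}
    if path.startswith("/v1/proxy/"):
        return {"choices": [], "usage": {"prompt_tokens": 3, "completion_tokens": 5}}
    return {}


result = governed_create(fake_post, "openai", {"model": "gpt-4o", "messages": []}, 100)
```

After the run, calls holds the three paths in order: the permit request, the proxy call, and the usage report for permit_1.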

Async Support

from keel_sdk.providers.openai import AsyncOpenAI
from keel_sdk.providers.anthropic import AsyncAnthropic
from keel_sdk.providers.google import AsyncGenerativeModel
from keel_sdk.providers.xai import AsyncGrok
from keel_sdk.providers.meta import AsyncLlama

client = AsyncOpenAI()
response = await client.chat.completions.create(model="gpt-4o", messages=[...])

client = AsyncAnthropic()
response = await client.messages.create(model="claude-sonnet-4-20250514", messages=[...], max_tokens=1024)

model = AsyncGenerativeModel("gemini-2.0-flash")
response = await model.generate_content_async("Hello!")

client = AsyncGrok()
response = await client.chat.completions.create(model="grok-2", messages=[...])

client = AsyncLlama()
response = await client.chat.completions.create(model="llama-3.3-70b", messages=[...])

Streaming

Pass stream=True to get an iterator of chunks:

for chunk in client.chat.completions.create(model="gpt-4o", messages=[...], stream=True):
    print(chunk)

Error Handling

from keel_sdk import KeelError
from keel_sdk.providers.openai import OpenAI

client = OpenAI()
try:
    response = client.chat.completions.create(model="gpt-4o", messages=[...])
except KeelError as e:
    if e.code == "permit_denied":
        print(f"Blocked by policy: {e.message}")
    else:
        print(f"API error {e.status}: {e.message}")

Setup

from keel_sdk import KeelClient

client = KeelClient(
    base_url="https://api.keelapi.com",
    api_key="keel_sk_...",
)

Permits

Request a permit before making an AI call:

import uuid
from keel_sdk import KeelClient

client = KeelClient(base_url="https://api.keelapi.com", api_key="keel_sk_...")

permit = client.permits.create({
    "project_id": "proj_123",
    "idempotency_key": str(uuid.uuid4()),
    "subject": {"type": "user", "id": "usr_123"},
    "action": {"name": "ai.generate"},
    "resource": {
        "type": "request",
        "id": "req_123",
        "attributes": {
            "provider": "openai",
            "model": "gpt-4o-mini",
            "estimated_input_tokens": 200,
            "estimated_output_tokens": 500,
        },
    },
})

if permit["decision"] == "allow":
    # proceed with AI call
    pass
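
Because the request body is deeply nested, a small builder can keep call sites short. The helper below is a hypothetical convenience, not part of the SDK; it only reproduces the dict shape shown above. It lets the caller supply the idempotency key, on the assumption that the key is meant to stay the same when the same logical request is retried.

```python
import uuid


def build_permit_request(project_id, subject, provider, model,
                         est_input_tokens, est_output_tokens,
                         request_id, idempotency_key=None):
    """Assemble a permit request dict with the same shape as the example above."""
    return {
        "project_id": project_id,
        # A fresh key is generated only when the caller does not supply one.
        "idempotency_key": idempotency_key or str(uuid.uuid4()),
        "subject": subject,
        "action": {"name": "ai.generate"},
        "resource": {
            "type": "request",
            "id": request_id,
            "attributes": {
                "provider": provider,
                "model": model,
                "estimated_input_tokens": est_input_tokens,
                "estimated_output_tokens": est_output_tokens,
            },
        },
    }


permit_request = build_permit_request(
    "proj_123", {"type": "user", "id": "usr_123"},
    "openai", "gpt-4o-mini", 200, 500, request_id="req_123",
)
```

The resulting permit_request can be passed to client.permits.create() in place of the literal dict.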

Async variant:

permit = await client.permits.create_async({...})

Dry run

result = client.permits.dry_run(permit_request)

List and get

permits = client.permits.list(project_id="proj_123", limit=50)
permit = client.permits.get("permit_id")

Report usage

client.permits.report_usage("permit_id", {
    "actual_input_tokens": 180,
    "actual_output_tokens": 420,
})

Attestation, evidence, lineage

client.permits.attest("permit_id", {"outcome": "success"})
client.permits.add_evidence("permit_id", {"label": "response_hash", "value": "abc123"})
evidence = client.permits.list_evidence("permit_id")
lineage = client.permits.lineage("permit_id")
bundle = client.permits.bundle("permit_id")

Workflows

Declare a multi-call workflow before it runs, then use the workflow context manager to thread X-Keel-Workflow-Id through every SDK call made inside the block.

from keel_sdk import KeelClient, WorkflowMaxCallsExceededError

client = KeelClient(base_url="https://api.keelapi.com", api_key="keel_sk_...")

decl = client.workflows.declare(
    "invoice-batch-2026-05-13",
    {
        "expected_calls": 10000,
        "max_calls": 12000,
        "expected_model": "gpt-5-mini",
        "expected_input_tokens_per_call": 4000,
        "expected_output_tokens_per_call": 500,
        "max_duration_seconds": 86400,
    },
)

try:
    with client.workflow(decl["workflow_id"]):
        permit = client.permits.create({
            "project_id": "proj_123",
            "idempotency_key": "invoice-001",
            "subject": {"type": "service_account", "id": "billing-agent"},
            "action": {"name": "ai.generate"},
            "resource": {
                "type": "request",
                "id": "invoice-001",
                "attributes": {
                    "provider": "openai",
                    "model": "gpt-5-mini",
                    "estimated_input_tokens": 4000,
                    "estimated_output_tokens": 500,
                },
            },
        })
except WorkflowMaxCallsExceededError:
    # The workflow reached max_calls; amend or stop the run.
    raise

client.workflows.amend(
    decl["workflow_id"],
    if_match_version=decl["version"],
    new_max_calls=20000,
    reason_provided="ticket volume higher than forecast",
)
client.workflows.complete(decl["workflow_id"])

Async calls use the same context variable:

async with client.workflow_async("invoice-batch-2026-05-13"):
    permit = await client.permits.create_async({...})

Workflow API errors are typed subclasses of KeelError, including WorkflowAmendmentVersionConflictError, WorkflowMaxCallsExceededError, and PlanUpgradeRequiredError.

A fuller runnable sample lives in examples/workflows/declare_and_run.py.
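
The if_match_version argument and the WorkflowAmendmentVersionConflictError name suggest optimistic concurrency on amendments. The toy model below illustrates that pattern under that assumption; ToyWorkflow and VersionConflict are hypothetical and say nothing about how the Keel server actually stores versions.

```python
class VersionConflict(Exception):
    """Hypothetical stand-in for WorkflowAmendmentVersionConflictError."""


class ToyWorkflow:
    """In-memory workflow declaration with a version counter (illustration only)."""

    def __init__(self, max_calls):
        self.version = 1
        self.max_calls = max_calls

    def amend(self, if_match_version, new_max_calls):
        # Reject the write if the declaration changed since the caller read it.
        if if_match_version != self.version:
            raise VersionConflict(
                f"expected version {self.version}, got {if_match_version}"
            )
        self.max_calls = new_max_calls
        self.version += 1
        return self.version


wf = ToyWorkflow(max_calls=12000)
seen_version = wf.version
new_version = wf.amend(if_match_version=seen_version, new_max_calls=20000)

try:
    # A second amendment with the stale version is refused.
    wf.amend(if_match_version=seen_version, new_max_calls=30000)
    conflict = False
except VersionConflict:
    conflict = True
```

Under this pattern, a caller that hits the conflict would re-read the declaration, pick up the new version, and retry the amendment.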

Executions

Run a model synchronously:

result = client.executions.create({
    "provider": "openai",
    "model": "gpt-4o-mini",
    "messages": [{"role": "user", "content": "Summarize this document."}],
    "permit_id": permit["permit_id"],
})

Stream tokens as they arrive:

for event in client.executions.stream({
    "provider": "openai",
    "model": "gpt-4o-mini",
    "messages": [{"role": "user", "content": "Write a poem."}],
    "permit_id": permit["permit_id"],
}):
    if event["event_type"] == "content_delta":
        print(event["data"], end="", flush=True)
    if event["event_type"] == "done":
        print()
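
When the full text is needed rather than incremental printing, the events can be accumulated. This sketch assumes that the data field of a content_delta event is a plain string, as the print call above implies; collect_stream and the sample events are illustrative only.

```python
def collect_stream(events):
    """Join content_delta payloads until a done event arrives."""
    parts = []
    for event in events:
        if event["event_type"] == "content_delta":
            parts.append(event["data"])
        elif event["event_type"] == "done":
            break  # stop reading once the stream signals completion
    return "".join(parts)


fake_events = [
    {"event_type": "content_delta", "data": "Roses "},
    {"event_type": "content_delta", "data": "are red"},
    {"event_type": "done", "data": None},
    {"event_type": "content_delta", "data": "ignored"},
]
poem = collect_stream(fake_events)
```

The same function should work on the iterator returned by client.executions.stream({...}), since it only relies on the event_type and data keys.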

Async streaming:

async for event in client.executions.stream_async({...}):
    ...

Execute (unified)

result = client.execute.run({
    "model": "gpt-4o-mini",
    "input": "Translate to Spanish: Hello world",
    "provider": "openai",
})

Proxy

Pass requests through to providers with Keel governance applied:

response = client.proxy.openai({
    "model": "gpt-4o-mini",
    "messages": [{"role": "user", "content": "Hello"}],
})

# Also: client.proxy.anthropic(), .google(), .xai(), .meta()

Jobs

Submit async jobs and poll for results:

job = client.jobs.create({
    "provider": "openai",
    "model": "gpt-4o-mini",
    "messages": [{"role": "user", "content": "Analyze this dataset."}],
})

status = client.jobs.get(job["job_id"])
# status["status"]: "pending" | "running" | "completed" | "failed"
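
A polling loop might look like the following sketch. wait_for_job is a hypothetical helper, not an SDK function; it takes the getter as a parameter so it can be exercised here with a fake, and it treats "completed" and "failed" as the terminal statuses listed above.

```python
import time


def wait_for_job(get_job, job_id, interval=1.0, timeout=60.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_job(job_id) until the job is terminal or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        job = get_job(job_id)
        if job["status"] in ("completed", "failed"):
            return job
        if clock() >= deadline:
            raise TimeoutError(
                f"job {job_id} still {job['status']} after {timeout}s"
            )
        sleep(interval)


# Fake getter that walks through the statuses in order.
_statuses = iter(["pending", "running", "completed"])


def fake_get(job_id):
    return {"job_id": job_id, "status": next(_statuses)}


polled = wait_for_job(fake_get, "job_1", sleep=lambda seconds: None)
```

With a real client, the call would presumably be wait_for_job(client.jobs.get, job["job_id"]).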

API Keys

key = client.api_keys.create()
keys = client.api_keys.list()
client.api_keys.revoke(key["id"])

Request Timeline

timeline = client.requests.timeline("request_id")

Error Handling

from keel_sdk import KeelClient, KeelError, ThrottledError

try:
    client.permits.create(request)
except ThrottledError as e:
    # HTTP 429 — rate-limit throttle (subclass of KeelError)
    print(e.retry_after_seconds)  # seconds to wait before retrying
    print(e.permit_id)            # permit ID from the throttled response
    print(e.reason_code)          # e.g. "budget.rate_limit_throttled"
except KeelError as e:
    print(e.status)        # HTTP status code
    print(e.code)          # e.g. "permit_denied"
    print(e.message)       # human-readable message
    print(e.field)         # field that caused the error, if any
    print(e.is_retryable)  # True for 408, 429, 500, 502, 503, 504
    print(e.retry_after)   # seconds from Retry-After header, or None

Throttle (HTTP 429)

When the API rate-limits a request, it returns HTTP 429 with a Retry-After header and a permit body containing decision: "throttled". The SDK:

  1. Parses the Retry-After header (falls back to outcome_detail.retry_after_seconds in the body).
  2. Auto-retries if retry_config is set (429 is in the default retryable codes).
  3. Raises ThrottledError after retries are exhausted (or immediately if retries are disabled).

ThrottledError is a subclass of KeelError, so existing except KeelError handlers continue to work. Catch ThrottledError first if you need throttle-specific fields like permit_id or reason_code.
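
The header-then-body fallback in step 1 can be sketched as a pure function. This is an approximation under stated assumptions: retry_after_seconds is a hypothetical helper, headers is treated as a plain dict with the exact key "Retry-After", and only the numeric-seconds form of the header is handled (an HTTP-date value falls through to the body).

```python
def retry_after_seconds(headers, body):
    """Return the wait in seconds from the header, else the body, else None."""
    raw = headers.get("Retry-After")
    if raw is not None:
        try:
            return max(0.0, float(raw))
        except ValueError:
            pass  # HTTP-date form is not parsed in this sketch
    detail = (body or {}).get("outcome_detail") or {}
    value = detail.get("retry_after_seconds")
    return float(value) if value is not None else None


from_header = retry_after_seconds({"Retry-After": "7"}, {})
from_body = retry_after_seconds(
    {}, {"decision": "throttled", "outcome_detail": {"retry_after_seconds": 3}}
)
missing = retry_after_seconds({}, {})
```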

Reason Codes

Denied and throttled permits carry a dot-namespaced reason_code string. The SDK provides constants for budget, policy, and workflow intent reason codes:

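from keel_sdk import BUDGET_RATE_LIMIT_THROTTLED, POLICY_MODEL_NOT_ALLOWED, ThrottledError

try:
    client.permits.create(request)
except ThrottledError as e:
    if e.reason_code == BUDGET_RATE_LIMIT_THROTTLED:
        ...  # e.g. wait e.retry_after_seconds, then retry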

Automatic Retries

The SDK can automatically retry failed requests that return transient HTTP errors (408, 429, 500, 502, 503, 504) using exponential backoff with jitter.

from keel_sdk import KeelClient, RetryConfig

# Use default retry settings (3 retries, 0.5s initial delay, 2x backoff)
client = KeelClient(
    base_url="https://api.keelapi.com",
    api_key="keel_sk_...",
    retry_config=RetryConfig(),
)

# Customize retry behavior
client = KeelClient(
    base_url="https://api.keelapi.com",
    api_key="keel_sk_...",
    retry_config=RetryConfig(
        max_retries=5,
        initial_delay=1.0,
        max_delay=60.0,
        backoff_multiplier=3.0,
    ),
)

To disable retries (the default), omit retry_config or pass None:

client = KeelClient(
    base_url="https://api.keelapi.com",
    api_key="keel_sk_...",
    retry_config=None,  # no automatic retries
)

When a response includes a Retry-After header, the SDK respects that value instead of the computed backoff delay. Streaming calls are never retried.
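
To make the delay schedule concrete, the sketch below computes exponential backoff with a cap and optional jitter. It is not the SDK's implementation: backoff_delays is a hypothetical helper, the 30-second default for max_delay is an assumption, and "full jitter" (a uniform draw between zero and the capped delay) is one common strategy that may differ from what the SDK uses.

```python
import random


def backoff_delays(max_retries=3, initial_delay=0.5, backoff_multiplier=2.0,
                   max_delay=30.0, jitter=True, rng=random.random):
    """Yield the wait before each retry: exponential growth, capped, optional jitter."""
    delay = initial_delay
    for _ in range(max_retries):
        capped = min(delay, max_delay)
        # Full jitter spreads clients out so they do not retry in lockstep.
        yield capped * rng() if jitter else capped
        delay *= backoff_multiplier


# Without jitter the schedule is deterministic.
default_schedule = list(backoff_delays(jitter=False))
custom_schedule = list(backoff_delays(
    max_retries=5, initial_delay=1.0, backoff_multiplier=3.0,
    max_delay=60.0, jitter=False,
))
```

With the default settings quoted above, the un-jittered waits are 0.5 s, 1 s, and 2 s. With the customized settings, the fifth wait would be 81 s but is capped at max_delay.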

Per-Request Timeout

Every method that makes an HTTP call accepts an optional timeout parameter that overrides the client-level timeout for that single request:

# Client-level timeout is 30 s (the default)
client = KeelClient(base_url="https://api.keelapi.com", api_key="keel_sk_...", timeout=30.0)

# This specific call uses a 5 s timeout instead
permit = client.request("POST", "/v1/permits", json=body, timeout=5.0)

# _get and streaming helpers also accept timeout
timeline = client._get("/v1/requests/req_1/timeline", timeout=10.0)

Pass timeout=None (or omit it) to use the client-level default.

Context Manager

with KeelClient(base_url="...", api_key="...") as client:
    permit = client.permits.create({...})

# Async
async with KeelClient(base_url="...", api_key="...") as client:
    permit = await client.permits.create_async({...})

Freshness Headers

For replay-protected endpoints:

client = KeelClient(
    base_url="https://api.keelapi.com",
    api_key="keel_sk_...",
    request_freshness=True,
)

This adds X-Keel-Timestamp and X-Keel-Nonce to every request.
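
For intuition, a replay-protection header pair might be generated as in the sketch below. The value formats are assumptions: the README does not say how the SDK encodes them, so Unix seconds for the timestamp and a random UUID hex string for the nonce are illustrative guesses, and freshness_headers is a hypothetical helper.

```python
import time
import uuid


def freshness_headers(now=None):
    """Build a timestamp/nonce header pair (formats assumed, see the note above)."""
    timestamp = int(time.time() if now is None else now)
    return {
        "X-Keel-Timestamp": str(timestamp),  # lets the server reject stale requests
        "X-Keel-Nonce": uuid.uuid4().hex,    # unique per request, so replays are detectable
    }


headers_a = freshness_headers(now=1700000000)
headers_b = freshness_headers(now=1700000000)
```

Two requests built at the same instant share a timestamp but carry different nonces, which is what allows a server to tell a replay from a new request.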

License

MIT
