kai

Unified multi-provider LLM API for the k agent framework.

kai provides a simple, provider-agnostic interface for streaming LLM completions with tool calling support. It has two entry points — stream() for real-time events and complete() for one-shot responses.

Installation

uv add kcastle-ai

Quick Start

import asyncio
from kai import OpenAIChatCompletions, Context, Message, complete

async def main():
    provider = OpenAIChatCompletions(model="gpt-4o")
    context = Context(
        system="You are a helpful assistant.",
        messages=[Message(role="user", content="Hello!")],
    )
    message = await complete(provider, context)
    print(message.extract_text())

asyncio.run(main())

API Overview

Layer Boundary

kai defines provider interfaces and concrete provider clients, but does not own provider factory/registry wiring. Provider instantiation policy (provider mapping and custom registries) belongs to the application layer (for this repo: kcastle).
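For illustration, application-layer wiring can be as small as the following sketch (the PROVIDERS mapping and make_provider helper are hypothetical application code, not part of kai):

from kai import AnthropicMessages, OpenAIChatCompletions

# Hypothetical app-layer registry; kai deliberately ships nothing like this.
PROVIDERS = {
    "openai": OpenAIChatCompletions,
    "anthropic": AnthropicMessages,
}

def make_provider(name: str, model: str):
    try:
        return PROVIDERS[name](model=model)
    except KeyError:
        raise ValueError(f"unknown provider: {name}") from None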

Two Entry Points

Function                     Returns                     Use when
complete(provider, context)  Message                     You need the full response at once
stream(provider, context)    AsyncIterator[StreamEvent]  You want real-time token-by-token output
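As a sketch of the second row, the streaming counterpart of the Quick Start looks roughly like this (TextDeltaEvent and its delta field are described under Stream Events below):

async for event in stream(provider, context):
    if isinstance(event, TextDeltaEvent):
        print(event.delta, end="", flush=True)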

Providers

Provider                                     Class                             Env variable
OpenAI Chat Completions (+ compatible APIs)  OpenAIChatCompletions(model=...)  OPENAI_API_KEY
OpenAI Responses                             OpenAIResponses(model=...)        OPENAI_API_KEY
Anthropic Messages                           AnthropicMessages(model=...)      ANTHROPIC_API_KEY

All three provider classes accept api_key and base_url for explicit configuration.
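For example, with placeholder values:

provider = OpenAIChatCompletions(
    model="gpt-4o",
    api_key="sk-...",                           # otherwise read from OPENAI_API_KEY
    base_url="https://gateway.example.com/v1",  # e.g. a proxy or gateway (hypothetical URL)
)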

Message Types

# Simple text
Message(role="user", content="Hello")

# Multimodal (text + image)
Message(role="user", content=[
    TextPart(text="What's in this image?"),
    ImagePart(data=base64_data, mime_type="image/png"),
])

# Tool result
Message.tool_result(tool_call_id, "result text")
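Sending one of these follows the same shape as the Quick Start. A sketch (photo.png is a placeholder path, and it assumes TextPart and ImagePart are importable from kai like the other names):

import base64

from kai import Context, ImagePart, Message, TextPart, complete

with open("photo.png", "rb") as f:
    base64_data = base64.b64encode(f.read()).decode()

context = Context(messages=[
    Message(role="user", content=[
        TextPart(text="What's in this image?"),
        ImagePart(data=base64_data, mime_type="image/png"),
    ]),
])
message = await complete(provider, context)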

Stream Events

When using stream(), events arrive on an async iterator and are most naturally handled with structural pattern matching:

async for event in stream(provider, context):
    match event:
        case StartEvent():           ...  # Stream started
        case TextStartEvent():       ...  # New text block
        case TextDeltaEvent(delta):  ...  # Text fragment
        case TextEndEvent(text):     ...  # Text block complete
        case ThinkStartEvent():      ...  # Thinking started
        case ThinkDeltaEvent(delta): ...  # Thinking fragment
        case ThinkEndEvent(text):    ...  # Thinking complete
        case ToolCallStartEvent():   ...  # Tool call started
        case ToolCallDeltaEvent():   ...  # Tool call args fragment
        case ToolCallEndEvent(tc):   ...  # Tool call complete
        case DoneEvent(message):     ...  # Stream finished
        case ErrorEvent(error):      ...  # Error occurred

Every event carries a partial field with the accumulated message so far.
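That means a consumer can re-render the whole response on every event instead of stitching deltas together itself. A sketch, reusing extract_text() from the Quick Start (assumes a single-line response, since \r only rewrites one line):

async for event in stream(provider, context):
    # Redraw the accumulated text so far on each event.
    print(f"\r{event.partial.extract_text()}", end="", flush=True)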

Tool Calling

kai provides declarative tool definitions — no execution logic. Define tools as JSON Schema, execute them yourself:

tool = Tool(
    name="get_weather",
    description="Get weather for a city.",
    parameters={
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
)

response = await complete(provider, Context(messages=messages, tools=[tool]))
if response.tool_calls:
    messages.append(response)  # record the assistant turn before the tool results
    for tc in response.tool_calls:
        result = your_execute_fn(tc.name, tc.arguments)
        messages.append(Message.tool_result(tc.id, result))
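A full agent turn keeps calling the model until no more tools are requested. A sketch built from the pieces above (your_execute_fn is still your own dispatcher):

while True:
    response = await complete(provider, Context(messages=messages, tools=[tool]))
    if not response.tool_calls:
        break
    messages.append(response)  # assistant turn containing the tool calls
    for tc in response.tool_calls:
        result = your_execute_fn(tc.name, tc.arguments)
        messages.append(Message.tool_result(tc.id, result))

print(response.extract_text())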

Custom / Compatible Providers

Use OpenAIChatCompletions with base_url for any OpenAI-compatible API:

# DeepSeek
provider = OpenAIChatCompletions(model="deepseek-chat", base_url="https://api.deepseek.com")

# Local Ollama
provider = OpenAIChatCompletions(model="llama3", api_key="ollama", base_url="http://localhost:11434/v1")

# Together AI
provider = OpenAIChatCompletions(model="meta-llama/...", base_url="https://api.together.xyz/v1")

Error Handling

from kai.errors import StatusError, ConnectionError, TimeoutError, ProviderError

try:
    msg = await complete(provider, context)
except StatusError as e:
    print(f"HTTP {e.status_code}: {e}")
except ConnectionError:
    print("Connection failed")
except TimeoutError:
    print("Request timed out")

With stream(), errors arrive as ErrorEvent instead of exceptions.
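So a streaming consumer handles failures inside the event loop rather than with try/except:

async for event in stream(provider, context):
    match event:
        case TextDeltaEvent(delta=text):
            print(text, end="")
        case ErrorEvent(error=error):
            print(f"stream failed: {error}")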

Extended Thinking (Anthropic)

provider = AnthropicMessages(
    model="claude-sonnet-4-20250514",
    thinking={"type": "enabled", "budget_tokens": 5000},
)

async for event in stream(provider, context):
    match event:
        case ThinkDeltaEvent(delta=text):
            print(f"💭 {text}", end="")
        case TextDeltaEvent(delta=text):
            print(text, end="")

Examples

See the examples/ directory for runnable demos covering streaming, tool calling, multi-turn conversations, multimodal input, error handling, and more.

Architecture

Dual-layer streaming: Providers yield raw Chunk objects. The stream() function accumulates them into rich StreamEvent objects, each carrying a partial message snapshot — so consumers always have the full state.
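The sketch below shows the pattern in miniature; it is an illustration of the design, not kai's actual _StreamState code:

# Illustrative reducer, not kai internals: fold raw text chunks into
# events, each paired with the accumulated snapshot so far.
def fold_text_chunks(chunks):
    text = ""
    for delta in chunks:
        if not text:
            yield ("text_start",)          # first delta opens a text block
        text += delta
        yield ("text_delta", delta, text)  # delta plus running snapshot
    yield ("text_end", text)

# list(fold_text_chunks(["Hel", "lo!"])) yields text_start, two deltas
# with snapshots "Hel" and "Hello!", then text_end.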

complete() — One-shot Flow

complete() is a thin wrapper over stream(). It silently consumes all events and returns the final Message:

sequenceDiagram
    participant U as User Code
    participant K as kai complete()
    participant P as Provider

    U->>K: complete(provider, context)
    K->>P: stream_raw(context)

    loop SSE stream
        P-->>K: Chunk (TextChunk, UsageChunk, ...)
        Note right of K: Accumulate internally,<br/>discard intermediate events
    end

    K-->>U: Message(role="assistant", content="...", usage=...)
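In code, the wrapper amounts to roughly this (a sketch of the idea, not kai's actual source):

async def complete_sketch(provider, context):
    # Drain the stream, surfacing errors and keeping only the final message.
    async for event in stream(provider, context):
        match event:
            case ErrorEvent(error=error):
                raise error
            case DoneEvent(message=message):
                return message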

stream() — Real-time Event Flow

stream() exposes every intermediate step. Each event carries a partial message snapshot reflecting state accumulated so far:

sequenceDiagram
    participant U as User Code
    participant S as kai stream()
    participant St as _StreamState
    participant P as Provider

    U->>S: stream(provider, context)
    S->>P: stream_raw(context)
    Note over P: Context → SDK wire format<br/>Message → {"role":"user",...}<br/>Tool → {"type":"function",...}

    S-->>U: StartEvent

    P-->>S: TextChunk("Hel")
    S->>St: process_chunk
    Note over St: _text = "Hel"
    St-->>S: [TextStartEvent, TextDeltaEvent]
    S-->>U: TextStartEvent(partial=Message)
    S-->>U: TextDeltaEvent(delta="Hel", partial=Message)

    P-->>S: TextChunk("lo!")
    S->>St: process_chunk
    Note over St: _text = "Hello!"
    St-->>S: [TextDeltaEvent]
    S-->>U: TextDeltaEvent(delta="lo!", partial=Message)

    P-->>S: ToolCallStart(id="call_1", name="get_weather")
    S->>St: process_chunk
    Note over St: flush _text → TextPart<br/>_tool_id = "call_1"
    St-->>S: [TextEndEvent, ToolCallStartEvent]
    S-->>U: TextEndEvent(text="Hello!", partial=Message)
    S-->>U: ToolCallStartEvent(partial=Message)

    P-->>S: ToolCallDelta(args='{"city":')
    S->>St: process_chunk
    Note over St: _tool_args = '{"city":'
    St-->>S: [ToolCallDeltaEvent]
    S-->>U: ToolCallDeltaEvent(partial=Message)

    P-->>S: ToolCallDelta(args='"Beijing"}')
    S->>St: process_chunk
    Note over St: _tool_args = '{"city":"Beijing"}'
    St-->>S: [ToolCallDeltaEvent]
    S-->>U: ToolCallDeltaEvent(partial=Message)

    P-->>S: UsageChunk(input=42, output=15)
    S->>St: process_chunk
    Note over St: usage = TokenUsage(42, 15)

    Note over S: Stream ended → flush_pending()
    S->>St: flush_pending
    Note over St: Finalize ToolCall → tool_calls[]
    St-->>S: [ToolCallEndEvent]
    S-->>U: ToolCallEndEvent(tool_call=ToolCall(...), partial=Message)

    S->>St: build_final(stop_reason="tool_use")
    S-->>U: DoneEvent(message=Message)

    Note over U: message.tool_calls[0].name == "get_weather"<br/>message.tool_calls[0].arguments == '{"city":"Beijing"}'<br/>message.usage.input_tokens == 42
