# kai

Unified multi-provider LLM API for the k agent framework.

kai provides a simple, provider-agnostic interface for streaming LLM completions with tool-calling support. It has two entry points: `stream()` for real-time events and `complete()` for one-shot responses.
## Installation

```bash
uv add kcastle-ai
```
## Quick Start

```python
import asyncio

from kai import OpenAIChatCompletions, Context, Message, complete


async def main():
    provider = OpenAIChatCompletions(model="gpt-4o")
    context = Context(
        system="You are a helpful assistant.",
        messages=[Message(role="user", content="Hello!")],
    )
    message = await complete(provider, context)
    print(message.extract_text())


asyncio.run(main())
```
## API Overview

### Layer Boundary

kai defines provider interfaces and concrete provider clients, but does not own provider factory/registry wiring. Provider instantiation policy (provider mapping and custom registries) belongs to the application layer (for this repo: kcastle), as in the sketch below.
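For illustration, application-layer wiring might look like the following sketch. The registry keys and the `make_provider` helper are hypothetical, not part of kai; it assumes all three provider classes are importable from the `kai` top level:

```python
# Hypothetical application-layer registry; kai itself ships no such wiring.
from kai import AnthropicMessages, OpenAIChatCompletions, OpenAIResponses

PROVIDERS = {
    "openai": OpenAIChatCompletions,
    "openai-responses": OpenAIResponses,
    "anthropic": AnthropicMessages,
}


def make_provider(name: str, model: str, **kwargs):
    """Look up a provider class by an app-chosen name and instantiate it."""
    if name not in PROVIDERS:
        raise ValueError(f"unknown provider: {name}")
    return PROVIDERS[name](model=model, **kwargs)
```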
### Two Entry Points

| Function | Returns | Use When |
|---|---|---|
| `complete(provider, context)` | `Message` | You need the full response at once |
| `stream(provider, context)` | `AsyncIterator[StreamEvent]` | You want real-time token-by-token output |
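For comparison with the Quick Start's `complete()` call, here is a minimal streaming sketch (it assumes the event types are importable from the `kai` top level):

```python
from kai import Context, Message, OpenAIChatCompletions, TextDeltaEvent, stream


async def demo():
    provider = OpenAIChatCompletions(model="gpt-4o")
    context = Context(messages=[Message(role="user", content="Hello!")])
    async for event in stream(provider, context):
        # Print each text fragment as it arrives; other events are ignored.
        if isinstance(event, TextDeltaEvent):
            print(event.delta, end="", flush=True)
```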
### Providers

| Provider | Class | Env Variable |
|---|---|---|
| OpenAI Chat Completions (+ compatible APIs) | `OpenAIChatCompletions(model=...)` | `OPENAI_API_KEY` |
| OpenAI Responses | `OpenAIResponses(model=...)` | `OPENAI_API_KEY` |
| Anthropic Messages | `AnthropicMessages(model=...)` | `ANTHROPIC_API_KEY` |
All provider classes accept `api_key` and `base_url` for explicit configuration.
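For example, passing credentials explicitly instead of via environment variables (the key value is a placeholder):

```python
provider = AnthropicMessages(
    model="claude-sonnet-4-20250514",
    api_key="sk-ant-...",  # placeholder; otherwise read from ANTHROPIC_API_KEY
)
```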
### Message Types

```python
# Simple text
Message(role="user", content="Hello")

# Multimodal (text + image)
Message(role="user", content=[
    TextPart(text="What's in this image?"),
    ImagePart(data=base64_data, mime_type="image/png"),
])

# Tool result
Message.tool_result(tool_call_id, "result text")
```
### Stream Events

When using `stream()`, events can be consumed with pattern matching:

```python
async for event in stream(provider, context):
    match event:
        case StartEvent():            ...  # Stream started
        case TextStartEvent():        ...  # New text block
        case TextDeltaEvent(delta):   ...  # Text fragment
        case TextEndEvent(text):      ...  # Text block complete
        case ThinkStartEvent():       ...  # Thinking started
        case ThinkDeltaEvent(delta):  ...  # Thinking fragment
        case ThinkEndEvent(text):     ...  # Thinking complete
        case ToolCallStartEvent():    ...  # Tool call started
        case ToolCallDeltaEvent():    ...  # Tool call args fragment
        case ToolCallEndEvent(tc):    ...  # Tool call complete
        case DoneEvent(message):      ...  # Stream finished
        case ErrorEvent(error):       ...  # Error occurred
```
Every event carries a `partial` field with the accumulated message so far.
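Because each event carries the accumulated message, a consumer can redraw full state instead of stitching deltas together. A minimal sketch (`render` is a hypothetical function of your own):

```python
async for event in stream(provider, context):
    # event.partial is the whole assistant Message accumulated so far.
    render(event.partial.extract_text())
```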
### Tool Calling

kai provides declarative tool definitions — no execution logic. Define tools as JSON Schema, execute them yourself:

```python
tool = Tool(
    name="get_weather",
    description="Get weather for a city.",
    parameters={
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
)

response = await complete(provider, Context(messages=messages, tools=[tool]))
if response.tool_calls:
    messages.append(response)  # echo the assistant turn back first
    for tc in response.tool_calls:
        result = your_execute_fn(tc.name, tc.arguments)
        messages.append(Message.tool_result(tc.id, result))
```
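To finish the round trip, send the tool results back and request a final answer; a sketch continuing the snippet above:

```python
# The follow-up call sees the assistant's tool calls plus your tool results.
followup = await complete(provider, Context(messages=messages, tools=[tool]))
print(followup.extract_text())
```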
### Custom / Compatible Providers

Use `OpenAIChatCompletions` with `base_url` for any OpenAI-compatible API:

```python
# DeepSeek
provider = OpenAIChatCompletions(model="deepseek-chat", base_url="https://api.deepseek.com")

# Local Ollama
provider = OpenAIChatCompletions(model="llama3", api_key="ollama", base_url="http://localhost:11434/v1")

# Together AI
provider = OpenAIChatCompletions(model="meta-llama/...", base_url="https://api.together.xyz/v1")
```
### Error Handling

```python
from kai.errors import StatusError, ConnectionError, TimeoutError, ProviderError

try:
    msg = await complete(provider, context)
except StatusError as e:
    print(f"HTTP {e.status_code}: {e}")
except ConnectionError:
    print("Connection failed")
except TimeoutError:
    print("Request timed out")
```
With `stream()`, errors arrive as `ErrorEvent` instead of exceptions.
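A minimal sketch of handling stream errors as events:

```python
async for event in stream(provider, context):
    match event:
        case ErrorEvent(error):
            print(f"stream failed: {error}")  # no exception is raised here
            break
        case DoneEvent(message):
            print(message.extract_text())
```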
### Extended Thinking (Anthropic)

```python
provider = AnthropicMessages(
    model="claude-sonnet-4-20250514",
    thinking={"type": "enabled", "budget_tokens": 5000},
)

async for event in stream(provider, context):
    match event:
        case ThinkDeltaEvent(delta=text):
            print(f"💭 {text}", end="")
        case TextDeltaEvent(delta=text):
            print(text, end="")
```
## Examples

See the `examples/` directory for runnable demos covering streaming, tool calling, multi-turn conversations, multimodal input, error handling, and more.
## Architecture

Dual-layer streaming: providers yield raw `Chunk` objects, and the `stream()` function accumulates them into rich `StreamEvent` objects, each carrying a `partial` message snapshot — so consumers always have the full state.
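As an illustrative sketch of that dual layer (the real accumulation lives in `_StreamState`, shown in the diagram below, and also handles tool calls, thinking, and usage; the `.delta` attribute on chunks is an assumption):

```python
async def text_events(chunks):
    # Layer 1: raw provider chunks in; layer 2: rich events out.
    text = ""
    async for chunk in chunks:
        text += chunk.delta  # assumes TextChunk exposes its fragment as .delta
        partial = Message(role="assistant", content=text)
        yield TextDeltaEvent(delta=chunk.delta, partial=partial)
```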
### complete() — One-shot Flow

`complete()` is a thin wrapper over `stream()`. It silently consumes all events and returns the final `Message`:

```mermaid
sequenceDiagram
    participant U as User Code
    participant K as kai complete()
    participant P as Provider
    U->>K: complete(provider, context)
    K->>P: stream_raw(context)
    loop SSE stream
        P-->>K: Chunk (TextChunk, UsageChunk, ...)
        Note right of K: Accumulate internally,<br/>discard intermediate events
    end
    K-->>U: Message(role="assistant", content="...", usage=...)
```
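In code, such a wrapper could look roughly like this sketch (not kai's actual source):

```python
async def complete_sketch(provider, context):
    # Drain the event stream and keep only the terminal result.
    async for event in stream(provider, context):
        match event:
            case DoneEvent(message):
                return message  # the only event a one-shot caller cares about
            case ErrorEvent(error):
                raise error     # surface stream errors as exceptions
    raise RuntimeError("stream ended without a DoneEvent")
```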
### stream() — Real-time Event Flow

`stream()` exposes every intermediate step. Each event carries a `partial` message snapshot reflecting state accumulated so far:

```mermaid
sequenceDiagram
    participant U as User Code
    participant S as kai stream()
    participant St as _StreamState
    participant P as Provider
    U->>S: stream(provider, context)
    S->>P: stream_raw(context)
    Note over P: Context → SDK wire format<br/>Message → {"role":"user",...}<br/>Tool → {"type":"function",...}
    S-->>U: StartEvent
    P-->>S: TextChunk("Hel")
    S->>St: process_chunk
    Note over St: _text = "Hel"
    St-->>S: [TextStartEvent, TextDeltaEvent]
    S-->>U: TextStartEvent(partial=Message)
    S-->>U: TextDeltaEvent(delta="Hel", partial=Message)
    P-->>S: TextChunk("lo!")
    S->>St: process_chunk
    Note over St: _text = "Hello!"
    St-->>S: [TextDeltaEvent]
    S-->>U: TextDeltaEvent(delta="lo!", partial=Message)
    P-->>S: ToolCallStart(id="call_1", name="get_weather")
    S->>St: process_chunk
    Note over St: flush _text → TextPart<br/>_tool_id = "call_1"
    St-->>S: [TextEndEvent, ToolCallStartEvent]
    S-->>U: TextEndEvent(text="Hello!", partial=Message)
    S-->>U: ToolCallStartEvent(partial=Message)
    P-->>S: ToolCallDelta(args='{"city":')
    S->>St: process_chunk
    Note over St: _tool_args = '{"city":'
    St-->>S: [ToolCallDeltaEvent]
    S-->>U: ToolCallDeltaEvent(partial=Message)
    P-->>S: ToolCallDelta(args='"Beijing"}')
    S->>St: process_chunk
    Note over St: _tool_args = '{"city":"Beijing"}'
    St-->>S: [ToolCallDeltaEvent]
    S-->>U: ToolCallDeltaEvent(partial=Message)
    P-->>S: UsageChunk(input=42, output=15)
    S->>St: process_chunk
    Note over St: usage = TokenUsage(42, 15)
    Note over S: Stream ended → flush_pending()
    S->>St: flush_pending
    Note over St: Finalize ToolCall → tool_calls[]
    St-->>S: [ToolCallEndEvent]
    S-->>U: ToolCallEndEvent(tool_call=ToolCall(...), partial=Message)
    S->>St: build_final(stop_reason="tool_use")
    S-->>U: DoneEvent(message=Message)
    Note over U: message.tool_calls[0].name == "get_weather"<br/>message.tool_calls[0].arguments == '{"city":"Beijing"}'<br/>message.usage.input_tokens == 42
```