Python SDK for executing Noukai flows from external applications
Project description
noukai-sdk
Python SDK for executing Noukai flows.
Sync and async clients, fully typed (py.typed), Pydantic models for results and events, supports Python 3.10+.
- Install
- Quick start
- Authentication
- Client configuration
- Executing flows
- Streaming steps and events
- Async (queue-backed) jobs
- Tool calls
- Replay and session grouping (experimental)
- Flow versions
- Run traces
- Errors
- Timeouts, retries
- Logging
- Resource management
Install
pip install noukai-sdk
# or
uv add noukai-sdk
# or
poetry add noukai-sdk
Requires Python 3.10+. Dependencies: httpx, pydantic v2.
Quick start
Synchronous
from noukai_sdk import Noukai
with Noukai() as client: # reads NOUKAI_API_KEY
result = client.flow("acme/spelling/grade-3").execute(
message="The cat sat on the mat.",
)
print(result.result)
Asynchronous
import asyncio
from noukai_sdk import AsyncNoukai
async def main() -> None:
async with AsyncNoukai() as client:
result = await client.flow("acme/spelling/grade-3").execute(
message="The cat sat on the mat.",
)
print(result.result)
asyncio.run(main())
The context manager auto-releases the underlying HTTP pool. If you can't use one, call client.close() / await client.aclose() manually.
Authentication
API keys start with nk_. Provide one of:
NOUKAI_API_KEYenvironment variable — recommended.api_keyconstructor argument — overrides the env var.
client = Noukai(api_key="nk_...")
If no key is found, the constructor raises AuthenticationError immediately.
Client configuration
Both Noukai and AsyncNoukai accept the same arguments:
client = Noukai(
api_key="nk_...", # overrides NOUKAI_API_KEY
env="production", # or "dev" — default "production"
org="acme", # default org for short-form slugs
project="spelling", # default project for short-form slugs
timeout=300.0, # default per-request timeout (seconds)
max_retries=1, # retries on retryable 5xx
log_handler=my_logger, # structured logging hook
log_payloads=False, # include request/response bodies in logs
)
env shortcut
| Value | Base URL |
|---|---|
"production" (default) |
https://api.noukai.xyz/api/v1 |
"dev" |
http://localhost:8080/api/v1 |
Falls back to the NOUKAI_ENV env var. The SDK does not accept an arbitrary base URL — all requests target Noukai's hosted endpoints.
Default org / project
When you set both, you can address flows by their short slug:
client = Noukai(org="acme", project="spelling")
client.flow("grade-3") # → acme/spelling/grade-3
client.flow("other-org/x/grade-3") # fully qualified — wins
client.flow(org="acme", project="spelling", slug="grade-3") # explicit
org and project must be passed together (or not at all). Half-defaults raise ValueError.
Executing flows
Three identifier forms
client.flow("grade-3") # uses client defaults
client.flow("acme/spelling/grade-3") # fully qualified
client.flow(org="acme", project="spelling", slug="grade-3")
execute() — synchronous, in-process
Blocks until the flow completes (or pauses for tools).
result = client.flow("acme/spelling/grade-3").execute(
message="Input text",
parameters={"difficulty": "hard"}, # extra initial inputs
block_overrides={"step-id": {"temperature": 0.5}},
attachments=[{"url": "https://...", "mime_type": "image/png"}],
trace=False, # capture full I/O for trace
version="draft", # or an int published version
timeout=60.0, # override client default
)
if result.status == "tool_calls_required":
# PausedResult — see "Tool calls" below
...
else:
print(result.result) # ExecuteResult
Returns ExecuteResult | PausedResult. The same surface exists on AsyncFlow.execute — just await it.
Streaming steps and events
For long-running flows, stream output as it arrives. The sync client returns Iterator; the async client returns AsyncIterator. Both honour early termination (break, return).
steps() — one event per finished step
flow = client.flow("acme/spelling/grade-3")
for step in flow.steps(message="..."):
print(step.name, step.output, step.duration_ms, step.tokens)
Async:
async for step in flow.steps(message="..."):
...
Yields StepCompleted events only. Intermediate signals are filtered out.
events() — every typed SSE event
for event in flow.events(message="..."):
match event.type:
case "run_started": # RunStarted
...
case "step_started": # StepStarted
...
case "step_input": # StepInput
...
case "step_output": # StepOutput
...
case "step_completed": # StepCompleted
...
case "step_error": # StepFailed
...
case "step_paused": # StepPaused
...
case "tool_calls_required": # ToolCallsRequired (has .resume())
...
case "flow_completed": # FlowCompleted
...
Pass run_remaining=True to have the server stream every remaining step in a single SSE connection instead of pausing between steps.
Async (queue-backed) jobs
For long executions where you don't want to hold an HTTP connection open, submit to the server's job queue and poll for the result.
Naming note.
execute_async()refers to server-side queue-backed execution, not Pythonasync. It exists on bothFlow(sync) andAsyncFlow(async).AsyncFlow.execute_async()means "await the submission of a server-side-async job" — both layers are real and useful.
job = client.flow("acme/spelling/grade-3").execute_async(
message="Long input that takes minutes...",
trace=True,
)
print(job.execution_id) # persist this if you want to resume later
# Block until done — polls under the hood (default: 2s interval, 5min timeout).
status = job.wait(timeout=600.0, poll_interval=5.0)
print(status.result)
# Or one-shot:
snapshot = job.poll()
if snapshot.status == "completed":
...
Async equivalent:
job = await client.flow("acme/spelling/grade-3").execute_async(message="...")
status = await job.wait(timeout=600.0)
Tool calls are not supported on this path (server-side limitation).
Tool calls
Flows can pause to request tool execution from your code. The SDK has two modes.
Auto-resume (recommended)
Pass tool_handler; the SDK runs your handler against each pending call and resumes automatically until the flow finishes or max_tool_rounds is hit.
def my_tools(tool_calls: list[dict]) -> list[dict]:
return [
{"tool_call_id": call["id"],
"output": get_weather(call["function"]["arguments"])}
for call in tool_calls
]
result = flow.execute(
message="What's the weather in Paris?",
tools=[{"type": "function", "function": {"name": "get_weather", ...}}],
tool_handler=my_tools,
max_tool_rounds=10, # default — raises ToolCallLimitError if exceeded
)
AsyncFlow.execute accepts both sync and async handlers; Flow.execute accepts sync only and raises TypeError at call time if given a coroutine function.
Manual resume
Omit tool_handler and drive the loop yourself (async only):
result = await flow.execute(message="...", tools=[...])
while result.status == "tool_calls_required":
tool_results = await run_tools(result.tool_calls)
result = await result.resume(tool_results=tool_results)
print(result.result)
During streaming, ToolCallsRequired events expose the same .resume(tool_results=...) method.
Replay and session grouping (experimental)
The @noukai_sdk.trace decorator groups every Noukai SDK call made inside your
route (or any callable) under one session id. In capture mode (always-on
when the decorator is present) the SDK tags each outbound request with
X-Session-Id so the backend can record a replayable snapshot. In replay
mode (gated by NOUKAI_REPLAY_ENABLED=true) a single header lets you replay
that recorded session without making live LLM calls.
Capture (quick start)
import asyncio
import noukai_sdk
from noukai_sdk import AsyncNoukai
noukai = AsyncNoukai(api_key="nk_...", org="acme", project="spelling")
@noukai_sdk.trace
async def handle(message: str) -> dict:
result = await noukai.flow("grade-3").execute(message=message)
return {"output": result.result, "session_id": result.session_id}
asyncio.run(handle("The cat sat on the mat."))
When used with the FastAPI or Flask adapters the response also carries
an X-Noukai-Session header so callers can grab the session id without any
code changes:
from fastapi import FastAPI
from noukai_sdk.adapters.fastapi import NoukaiTraceMiddleware
app = FastAPI()
app.add_middleware(NoukaiTraceMiddleware, client=noukai)
@app.post("/grade")
@noukai_sdk.trace
async def grade(req: Request) -> dict:
body = await req.json()
result = await noukai.flow("grade-3").execute(message=body["text"])
return {"output": result.result}
Flask equivalent:
from flask import Flask
from noukai_sdk.adapters.flask import init_noukai_trace
app = Flask(__name__)
init_noukai_trace(app, client=noukai)
Replay (debugging against your own API)
The goal of replay is debuggability — re-running a recorded execution through your live API code without hitting any LLMs. Useful for stepping a debugger through a failed production request, or re-running a known scenario after editing your handler.
End-to-end flow:
-
Capture the session id during normal traffic. Capture runs automatically whenever a request enters a
@trace-wrapped route. The FastAPI/Flask adapter setsX-Noukai-Session: <session_id>on the response — log it or surface it through your error reporter so you have the handle to replay later. -
Start your API in dev with the replay gate on:
NOUKAI_REPLAY_ENABLED=true uvicorn app:main
-
Call your own API the same way a real client would, just add the replay header pointing at the captured session id. Replace the URL below with your app's dev URL and the route you wrote in step 1 (here
/gradeon the default uvicorn port):curl -H 'X-Noukai-Replay: abc-123-def' \ -H 'Content-Type: application/json' \ -d '{"text": "anything — the body is not matched against the cassette"}' \ http://localhost:8000/grade
The adapter middleware detects the header, fetches the recorded session via
GET /seq/sessions/{id}(idempotent, retried by the transport), and serves everyFlow.execute()/steps()/events()call inside the route from the cassette. No LLM calls, no charges, deterministic output.
Your handler code runs for real — only the Noukai SDK calls are served from the cassette. That means you can edit the handler between replays and the same session id keeps working, as long as your code still makes the same sequence of Noukai calls. This is the fast path for verifying a bug fix or iterating on post-processing logic.
Non-HTTP contexts. For background workers, CLI tools, tests, or notebooks where there is no inbound request to carry the header, open the scope programmatically:
from noukai_sdk import trace_scope async with trace_scope( transport=noukai._transport, replay_session_id="abc-123-def", ): await noukai.flow("grade-3").execute(message="any input")Use
trace_scope_syncfor sync code. The sameNOUKAI_REPLAY_ENABLEDgate applies.
Capture vs replay
| Feature | Capture mode | Replay mode |
|---|---|---|
| Trigger | @trace decorator present |
NOUKAI_REPLAY_ENABLED=true + X-Noukai-Replay header |
| LLM calls | Live | None (served from cassette) |
X-Session-Id outbound |
Yes (set by SDK) | Yes (same header, replay session id) |
X-Noukai-Session response |
Yes (set by adapter) | Yes (same header) |
| Non-determinism | Normal | Eliminated |
| Production safe | Yes | Gated by env var — a stray header in prod is silently ignored |
Explicit session_id kwarg inside a scope (caution)
Warning: passing an explicit
session_id=toFlow.execute()(or any flow method) inside an active replay scope bypasses the scope's cassette. The SDK performs a one-shot fetch of the explicit session and replays from that session instead of the scope session. This is intentional — it lets you mix two sessions in one request — but it is easy to trigger accidentally if you have old code that forwards a session id parameter.Outside a scope, an explicit
session_id=kwarg simply tags the outbound request withX-Session-Id(capture only — no replay fetch is performed without the env var guard).
Production safety
Replay is gated behind NOUKAI_REPLAY_ENABLED=true. Without this env var an
X-Noukai-Replay header in production is silently ignored; the scope opens in
capture mode as normal. This means:
- Deploying the
@tracedecorator to production is safe. - Replay cannot be enabled by a client-supplied header alone.
Errors
| Class | When raised |
|---|---|
ReplayMissError |
No cassette entry matched the (slug, position) key. |
ReplayLeftoverError |
Scope exited with unconsumed cassette entries. |
ReplayForbiddenError |
Session belongs to a different org/project. |
ReplaySessionNotFoundError |
Session id does not exist on the backend. |
ReplaySessionExpiredError |
Session TTL has elapsed; snapshots gone. |
ReplayInvalidSessionError |
Session payload is structurally malformed. |
ReplayNoSnapshotsError |
Session exists but trace_capture_mode was off. |
ReplayDisabledError |
Replay requested but env var not set. |
All extend ReplayError which extends NoukaiError.
Caveats
- Capture requires
trace_capture_modeon the flow or org to befullorredacted. If the mode isoff, no snapshot is written and replay fails withReplayNoSnapshotsError. - Concurrent same-slug calls within one scope are undefined in v1 — the SDK
emits a
UserWarningat runtime if this is detected. execute_async()(server-side queue jobs) is captured normally but replay is not supported in v1 — aReplayMissErroris raised.- The idempotent session fetch (
GET /seq/sessions/{id}) is retried by the transport's default retry policy. Cache of fetched sessions across scopes is deferred to v1.1.
Framework adapter install extras
pip install "noukai-sdk[fastapi]" # pulls starlette>=0.36
pip install "noukai-sdk[flask]" # pulls flask>=3.0
Flow versions
version |
Behaviour |
|---|---|
"draft" (default) |
Latest unpublished draft (what you see in the editor). |
<int> |
A specific published version (e.g. version=3). |
"production" |
Not yet supported — raises NotImplementedError at call site. |
Pin a version when calling from production code; use "draft" only in test and preview environments.
Run traces
Every execution has an execution_id. Use it to fetch trace data after the fact.
run = flow.run("exec_abc123")
# Whole-run trace (one attempt per step)
trace = run.trace()
print(trace.summary, trace.steps)
# Single step, optionally a specific attempt
step_trace = run.step_trace("step-1", attempt="latest")
all_attempts = run.step_trace("step-1", attempt="all")
# Live trace stream (replays from DB, then tails Redis)
for event in run.live_trace():
print(event)
Async equivalent uses await run.trace(), await run.step_trace(...), and async for event in run.live_trace().
Errors
All errors extend NoukaiError and carry status_code, code, execution_id, request_id, and response_body attributes for diagnostics.
| Class | HTTP | When |
|---|---|---|
AuthenticationError |
401 | Missing or invalid API key. .www_authenticate set when server provides it. |
PermissionDeniedError |
403 | Key lacks access to the requested resource. |
FlowNotFoundError |
404 | Slug or execution ID not found. |
InsufficientCreditsError |
402 | Org balance is insufficient or exhausted. |
RateLimitError |
429 | Rate limited — .retry_after (seconds) when present. |
FlowExecutionError |
5xx | Server-side execution failure. Branch on .code. |
APIConnectionError |
n/a | Network / DNS / TLS failure before any response. |
APITimeoutError |
n/a | Request or job-wait exceeded its timeout. |
ToolCallLimitError |
n/a | Client-side: max_tool_rounds exhausted. |
from noukai_sdk import FlowExecutionError
try:
flow.execute(message="...")
except FlowExecutionError as err:
if err.code == "BYOK_KEY_REJECTED":
...
raise
Common .code values: FLOW_NOT_FOUND, INSUFFICIENT_CREDITS, CREDITS_EXHAUSTED, TOOL_ITERATION_LIMIT, BYOK_KEY_REJECTED, INVALID_TREE, NO_STEPS, MISSING_MESSAGE. Full list on the server contract.
Timeouts, retries
Timeouts
- Client default:
timeout=300.0(seconds) on the client constructor. - Per-call override: pass
timeout=60.0to any method. - Triggers
APITimeoutError.
Retries
- Default:
max_retries=1— one retry on retryable 5xx with exponential backoff. - Non-retryable status codes are surfaced immediately.
Logging
def log_handler(event: dict) -> None:
# {"phase": "request" | "response" | "retry",
# "method": ..., "path": ..., "attempt": ...,
# "status_code": ..., "request_id": ...,
# "request_body": ..., "response_body": ...}
logger.info(event)
client = Noukai(log_handler=log_handler, log_payloads=True)
The hook fires on every request, response, and retry attempt. request_body and response_body are omitted unless log_payloads=True — off by default to protect PII and credentials.
Resource management
The client holds an HTTP connection pool. Release it explicitly when you're done:
# Recommended — context manager:
with Noukai() as client:
...
async with AsyncNoukai() as client:
...
# Or manually:
client = Noukai()
try:
...
finally:
client.close() # AsyncNoukai: await client.aclose()
close() / aclose() are safe to call multiple times.
Documentation
Full guides, API reference, and examples: https://noukai.xyz/docs/sdk/python/
License
MIT — see LICENSE.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file noukai_sdk-0.2.0.tar.gz.
File metadata
- Download URL: noukai_sdk-0.2.0.tar.gz
- Upload date:
- Size: 176.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bdf6f66cba1a6be78ad5d8bd84b0d8244823ad070b705a1fe7e45c5acbfc93ff
|
|
| MD5 |
dc14a77e19a2556f4797414058f462d0
|
|
| BLAKE2b-256 |
52070a8e8e02b585623229b0d9084aa3cc521d390ad9d784b8ebcebacb243f5f
|
Provenance
The following attestation bundles were made for noukai_sdk-0.2.0.tar.gz:
Publisher:
release.yml on abigcircle/noukai-python-sdk
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
noukai_sdk-0.2.0.tar.gz -
Subject digest:
bdf6f66cba1a6be78ad5d8bd84b0d8244823ad070b705a1fe7e45c5acbfc93ff - Sigstore transparency entry: 1935426243
- Sigstore integration time:
-
Permalink:
abigcircle/noukai-python-sdk@d8600357569831ca767c2a56da5761b34601c49a -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/abigcircle
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@d8600357569831ca767c2a56da5761b34601c49a -
Trigger Event:
push
-
Statement type:
File details
Details for the file noukai_sdk-0.2.0-py3-none-any.whl.
File metadata
- Download URL: noukai_sdk-0.2.0-py3-none-any.whl
- Upload date:
- Size: 72.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b79932588a0cc73c984d3fbeddfaab9b217c47e7855f41c4936d3234cc289a2f
|
|
| MD5 |
ea6fff52abc920aaccde009630ce5a07
|
|
| BLAKE2b-256 |
e4467b36ee71a89507515589ea6ab2b5bdd6fcafcddc8b68cbdbce84c6ce9bd4
|
Provenance
The following attestation bundles were made for noukai_sdk-0.2.0-py3-none-any.whl:
Publisher:
release.yml on abigcircle/noukai-python-sdk
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
noukai_sdk-0.2.0-py3-none-any.whl -
Subject digest:
b79932588a0cc73c984d3fbeddfaab9b217c47e7855f41c4936d3234cc289a2f - Sigstore transparency entry: 1935426267
- Sigstore integration time:
-
Permalink:
abigcircle/noukai-python-sdk@d8600357569831ca767c2a56da5761b34601c49a -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/abigcircle
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@d8600357569831ca767c2a56da5761b34601c49a -
Trigger Event:
push
-
Statement type: