
Estuary Python SDK

Python SDK for the Estuary real-time AI conversation platform. Supports text chat, streaming voice (WebSocket and LiveKit WebRTC), vision, and memory.

Installation

pip install estuary-sdk

Install with optional extras:

pip install estuary-sdk[audio]    # Microphone recording + speaker playback
pip install estuary-sdk[livekit]  # LiveKit WebRTC voice
pip install estuary-sdk[all]      # Everything

Or with PDM:

pdm install              # Core only
pdm install -G audio     # + audio
pdm install -G livekit   # + LiveKit
pdm install -G all       # Everything

Requires Python 3.11+.

Getting Your Credentials

To use the SDK you need an API key and a character ID from the Estuary Dashboard:

  1. Sign up or log in at app.estuary-ai.com
  2. Create a character — go to Characters and click Create Character. Configure your character's name, personality, and voice, then save.
  3. Copy the character ID — on the character's page, copy the UUID shown under the character name (or from the URL).
  4. Generate an API key — go to Settings → API Keys and click Create Key. Copy the key (it starts with est_).

Use these values for api_key and character_id in the examples below.
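Rather than hard-coding the key, you can read both values from the environment. The variable names below are illustrative choices, not a convention defined by the SDK:

```python
import os

def load_credentials() -> tuple[str, str]:
    """Read credentials from the environment (variable names are illustrative)."""
    api_key = os.environ.get("ESTUARY_API_KEY", "")
    character_id = os.environ.get("ESTUARY_CHARACTER_ID", "")
    if not api_key.startswith("est_"):
        raise ValueError("ESTUARY_API_KEY should be a dashboard key starting with est_")
    return api_key, character_id
```

Pass the returned values as api_key and character_id when building EstuaryConfig.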

Quick Start

import asyncio
from estuary_sdk import EstuaryClient, EstuaryConfig, BotResponse

async def main():
    config = EstuaryConfig(
        server_url="https://api.estuary-ai.com",
        api_key="est_...",
        character_id="your-character-uuid",
        player_id="player-1",
    )

    async with EstuaryClient(config) as client:
        client.on("bot_response", lambda r: print(r.text, end="" if not r.is_final else "\n"))

        await client.connect()
        client.send_text("Hello!")

        await asyncio.sleep(5)  # Wait for response

asyncio.run(main())
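The sleep(5) above is a blunt way to wait. A pattern that resolves as soon as the final token arrives uses an asyncio.Event; FakeResponse below is a hypothetical stand-in for the SDK's BotResponse so the sketch runs without a server:

```python
import asyncio
from dataclasses import dataclass

# Hypothetical stand-in for the SDK's BotResponse, so this sketch runs offline.
@dataclass
class FakeResponse:
    text: str
    is_final: bool

async def main() -> str:
    done = asyncio.Event()
    chunks: list[str] = []

    def on_bot_response(r: FakeResponse) -> None:
        if r.is_final:
            done.set()  # signal the waiter instead of sleeping a fixed interval
        else:
            chunks.append(r.text)

    # Simulate a streamed reply; in real code the SDK calls the handler.
    for part in (FakeResponse("Hel", False), FakeResponse("lo!", False), FakeResponse("Hello!", True)):
        on_bot_response(part)

    await asyncio.wait_for(done.wait(), timeout=5.0)
    return "".join(chunks)

print(asyncio.run(main()))  # → Hello!
```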

Voice

Continuous Mode

Audio streams continuously. The server uses VAD (voice activity detection) to detect turn boundaries.

from estuary_sdk import VoiceMode

await client.start_voice(VoiceMode.CONTINUOUS)

# Send raw PCM16 audio (16-bit signed, mono, 16kHz)
await client.send_audio(pcm_bytes)

await client.stop_voice()
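send_audio expects raw PCM16 bytes: 16-bit signed, mono, 16 kHz. One way to pull such bytes out of a WAV file with only the stdlib wave module (a sketch that validates the format rather than resampling; the helper name is ours, not the SDK's):

```python
import wave

def read_pcm16(path: str, expected_rate: int = 16000) -> bytes:
    """Return raw PCM16 frames from a WAV file, validating the format first."""
    with wave.open(path, "rb") as wf:
        if wf.getnchannels() != 1 or wf.getsampwidth() != 2 or wf.getframerate() != expected_rate:
            raise ValueError("expected 16-bit mono WAV at 16 kHz")
        return wf.readframes(wf.getnframes())
```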

Push-to-Talk

You control when audio is captured and when the turn ends.

await client.start_voice(VoiceMode.PUSH_TO_TALK)

await client.start_recording()
await client.send_audio(pcm_bytes)
await client.stop_recording()  # Triggers end-of-turn

await client.stop_voice()

With Microphone (requires audio extra)

from estuary_sdk.audio import AudioRecorder, AudioPlayer

recorder = AudioRecorder(on_audio=client.send_audio)
player = AudioPlayer()

client.on("bot_voice", player.enqueue)

await client.start_voice()
await recorder.start()

VoiceSession

VoiceSession is a high-level wrapper, used as an async context manager, that owns the AudioPlayer, the AudioRecorder, and the event wiring between them and the client:

from estuary_sdk import VoiceSession, VoiceMode

async with EstuaryClient(config) as client:
    await client.connect()

    async with VoiceSession(client, mode=VoiceMode.CONTINUOUS) as session:
        # Microphone and speaker are wired up, voice is active.
        await asyncio.Event().wait()

For push-to-talk, control recording within the session:

async with VoiceSession(client, mode=VoiceMode.PUSH_TO_TALK) as session:
    await session.start_recording()   # User presses button
    await asyncio.sleep(3)
    await session.stop_recording()    # User releases button

Requires the audio extra (pip install estuary-sdk[audio]).

Utilities

client.toggle_mute()                        # Toggle microphone mute
print(client.is_muted)                      # Check mute state
print(client.is_voice_active)               # Check if voice is running
client.interrupt()                          # Interrupt current bot response
client.notify_audio_playback_complete()     # Tell server playback finished

Vision

Send a camera image for multimodal VLM processing:

client.send_camera_image(image_base64, "image/jpeg")

# Respond to server-initiated capture requests
client.on("camera_capture_request", lambda req: client.send_camera_image(b64, "image/jpeg", request_id=req.request_id))
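send_camera_image takes a base64 string. A small helper (ours, not part of the SDK) to encode an image file with the stdlib base64 module:

```python
import base64

def encode_image(path: str) -> str:
    """Base64-encode an image file for send_camera_image."""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")
```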

Streaming Responses

Bot responses stream token-by-token. Use is_final to detect the complete message.

def on_bot_response(response: BotResponse):
    if response.is_final:
        print(f"[{response.message_id}] {response.text}")
    else:
        print(response.text, end="", flush=True)

client.on("bot_response", on_bot_response)

Send and Wait

For simple request-response flows, send_text_and_wait() sends a message and returns the final BotResponse directly — no manual event wiring needed:

response = await client.send_text_and_wait("What is the capital of France?")
print(response.text)  # "The capital of France is Paris."

# With options
response = await client.send_text_and_wait(
    "Summarize our conversation.",
    text_only=True,   # Suppress voice response
    timeout=30.0,     # Max seconds to wait (default: 20)
)

Raises asyncio.TimeoutError if no final response arrives within the timeout. The listener is cleaned up automatically whether the call succeeds or times out.
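The timeout behavior can be reproduced locally with asyncio.wait_for, which raises the same asyncio.TimeoutError; slow_reply below is a stub standing in for the SDK call:

```python
import asyncio

async def slow_reply(delay: float) -> str:
    # Stub standing in for client.send_text_and_wait(...).
    await asyncio.sleep(delay)
    return "The capital of France is Paris."

async def ask_with_timeout() -> str:
    try:
        return await asyncio.wait_for(slow_reply(0.01), timeout=1.0)
    except asyncio.TimeoutError:
        return "(no reply within timeout)"

print(asyncio.run(ask_with_timeout()))
```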

Memory

The REST memory API is available after connect() via client.memory.

# Search memories
results = await client.memory.search("favorite color")

# List with filters
memories = await client.memory.get_memories(memory_type="preference", limit=20)

# Other endpoints
stats = await client.memory.get_stats()
facts = await client.memory.get_core_facts()
timeline = await client.memory.get_timeline(group_by="week")
graph = await client.memory.get_graph()

# Delete all memories
result = await client.memory.delete_all(confirm=True)

Real-time memory extraction events:

from estuary_sdk import MemoryUpdatedEvent

def on_memory(event: MemoryUpdatedEvent):
    print(f"Extracted {event.memories_extracted} memories")

client.on("memory_updated", on_memory)

Characters

Manage characters via client.characters (available immediately — no connect() required):

char = await client.characters.create("My Character", personality="Friendly helper")
chars = await client.characters.list()
char = await client.characters.get("character-uuid")
char = await client.characters.update("character-uuid", name="New Name")
await client.characters.delete("character-uuid")

Players

Query player data via client.players (available after connect()):

stats = await client.players.get_stats()
players = await client.players.list(limit=20)
player = await client.players.get("player-123")
messages = await client.players.get_messages("player-123")
await client.players.delete("player-123")

Character Generation

Generate a character from a photo via client.generate (available immediately):

with open("photo.jpg", "rb") as f:
    character = await client.generate.image_to_character(f.read())

# Poll 3D model progress
status = await client.generate.get_model_status(character.id)
print(status.model_status, f"{status.progress}%")

Use wait_for_model() to poll until the 3D model is ready (or fails), with an optional progress callback:

from estuary_sdk import ModelStatus

status = await client.generate.wait_for_model(
    character.id,
    poll_interval=2.0,   # Seconds between polls (default: 2)
    timeout=300.0,       # Max seconds to wait (default: 300)
    on_progress=lambda s: print(f"{s.model_status} {s.progress}%"),
)
print("Final:", status.model_status, status.model_url)

Agent-to-Agent Conversations

Start a conversation between two AI characters and observe the exchange in real-time:

from estuary_sdk import AgentTurnText, AgentTurnComplete, AgentConversationComplete

client.on("agent_turn_text", lambda t: print(f"[Agent {t.agent_id}] {t.text}", end="" if not t.is_final else "\n"))
client.on("agent_turn_complete", lambda t: print(f"--- Turn {t.turn_number} complete ---"))
client.on("agent_conversation_complete", lambda c: print(f"Done after {c.total_turns} turns: {c.reason}"))

client.start_agent_conversation(
    agent_a_id="character-uuid-1",
    agent_b_id="character-uuid-2",
    conversation_context="Discuss the meaning of life.",
    max_turns=8,
    timeout_seconds=90,
)

# To stop early:
client.stop_agent_conversation()

CLI Tester

An interactive chat program is included for quick testing:

python examples/chat.py --api-key [API_KEY] --character-id [CHARACTER_ID]

Or via PDM:

pdm run chat --api-key [API_KEY] --character-id [CHARACTER_ID]

| Flag | Short | Default | Description |
|------|-------|---------|-------------|
| --api-key | -k | required | API key |
| --character-id | -c | required | Character UUID |
| --server-url | -s | https://api.estuary-ai.com | Server URL |
| --player-id | -p | python-sdk-tester | Player ID |
| --debug | -d | off | Verbose logging |
| --text-only | -t | off | Suppress voice responses |

In-chat commands: /quit, /voice, /stop, /memories, /help

Configuration

EstuaryConfig fields:

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| server_url | str | required | Server URL |
| api_key | str | required | API key |
| character_id | str | required | Character UUID |
| player_id | str | required | Player identifier |
| audio_sample_rate | int | 16000 | Audio sample rate (Hz) |
| auto_reconnect | bool | True | Auto-reconnect on disconnect |
| max_reconnect_attempts | int | 5 | Max reconnection attempts |
| reconnect_delay | float | 1.0 | Initial reconnect delay (seconds) |
| debug | bool | False | Enable debug logging |
| voice_transport | str | "websocket" | "websocket", "livekit", or "auto" |
| realtime_memory | bool | False | Enable realtime memory updates |
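reconnect_delay sets only the first wait. Assuming the common exponential-doubling schedule (an assumption about the SDK's behavior, not a documented guarantee), the per-attempt delays would be:

```python
def reconnect_delays(initial: float = 1.0, attempts: int = 5) -> list[float]:
    """Per-attempt delays, assuming exponential doubling (an assumption,
    not a documented guarantee of the SDK)."""
    return [initial * (2 ** n) for n in range(attempts)]

print(reconnect_delays())  # → [1.0, 2.0, 4.0, 8.0, 16.0]
```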

Events

| Event | Payload | Description |
|-------|---------|-------------|
| connected | SessionInfo | Connected and authenticated |
| disconnected | str | Disconnected (reason) |
| reconnecting | int | Reconnection attempt number |
| connection_state_changed | ConnectionState | State transition |
| bot_response | BotResponse | Streaming text response |
| bot_voice | BotVoice | Audio chunk (base64) |
| stt_response | SttResponse | Speech-to-text result |
| interrupt | InterruptData | Bot response interrupted |
| quota_exceeded | QuotaExceededData | Usage quota hit |
| camera_capture_request | CameraCaptureRequest | Server requests a camera image |
| voice_started | — | Voice session started |
| voice_stopped | — | Voice session stopped |
| voice_error | str | Voice session error message |
| audio_received | bytes | Raw PCM audio from LiveKit transport |
| memory_updated | MemoryUpdatedEvent | New memories extracted |
| agent_turn_text | AgentTurnText | Streaming text from an agent-to-agent turn |
| agent_turn_voice | AgentTurnVoice | Streaming voice audio from an agent-to-agent turn |
| agent_turn_complete | AgentTurnComplete | An agent finished its turn |
| agent_conversation_complete | AgentConversationComplete | Agent-to-agent conversation finished |
| livekit_connected | str | LiveKit room name |
| livekit_disconnected | — | LiveKit disconnected |
| error | Exception | Error occurred |
| auth_error | str | Authentication failed |

Register listeners with client.on(), client.once(), or client.off():

client.on("bot_response", handle_response)     # Persistent listener
client.once("connected", handle_first_connect)  # One-time listener
client.off("bot_response", handle_response)     # Remove listener

Both sync and async callbacks are supported.
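A minimal sketch of how an emitter can accept both kinds of callback, by awaiting any awaitable a handler returns; this mirrors the behavior described above, not the SDK's actual implementation:

```python
import asyncio
import inspect

# Toy emitter illustrating mixed sync/async dispatch -- not the SDK's code.
class MiniEmitter:
    def __init__(self) -> None:
        self._listeners: dict[str, list] = {}

    def on(self, event: str, cb) -> None:
        self._listeners.setdefault(event, []).append(cb)

    async def emit(self, event: str, payload) -> None:
        for cb in self._listeners.get(event, []):
            result = cb(payload)
            if inspect.isawaitable(result):  # async callbacks return a coroutine
                await result

async def main() -> list:
    seen: list = []
    em = MiniEmitter()
    em.on("bot_response", lambda r: seen.append(("sync", r)))

    async def async_handler(r):
        seen.append(("async", r))

    em.on("bot_response", async_handler)
    await em.emit("bot_response", "hi")
    return seen

print(asyncio.run(main()))  # → [('sync', 'hi'), ('async', 'hi')]
```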

Error Handling

from estuary_sdk import EstuaryError, ErrorCode

try:
    await client.connect()
except EstuaryError as e:
    print(e.code)     # ErrorCode enum
    print(e.details)  # Optional extra info

Error codes:

| Code | Description |
|------|-------------|
| CONNECTION_FAILED | Could not connect to server |
| AUTH_FAILED | Authentication rejected |
| CONNECTION_TIMEOUT | Connection timed out |
| QUOTA_EXCEEDED | Usage quota exceeded |
| VOICE_NOT_SUPPORTED | Voice not supported |
| VOICE_ALREADY_ACTIVE | Voice session already running |
| VOICE_NOT_ACTIVE | No active voice session |
| VOICE_START_FAILED | Voice session failed to start |
| LIVEKIT_UNAVAILABLE | LiveKit dependency not installed |
| NOT_CONNECTED | Not connected to server |
| REST_ERROR | REST API request failed |
| UNKNOWN | Unknown error |

Optional Dependencies

| Extra | Packages | Purpose |
|-------|----------|---------|
| audio | sounddevice, numpy | Microphone capture + speaker playback |
| livekit | livekit | LiveKit WebRTC voice transport |
| all | All of the above | Everything |
| dev | pytest, black, isort, flake8, mypy | Development tools |

Development

cd estuary-python-sdk
pdm install -G dev

pdm run test          # pytest
pdm run format        # black
pdm run lint          # flake8
pdm run sort-imports  # isort
pdm run typecheck     # mypy (strict)
