
Anam AI Python SDK

Official Python SDK for Anam AI - Real-time AI avatar streaming.


Installation

# Using uv (recommended)
uv add anam

# With optional display utilities (for testing)
uv add anam --extra display

# Using pip
pip install anam

# With optional display utilities (for testing)
pip install anam[display]

Quick Start

import asyncio
from anam import AnamClient
from av.video.frame import VideoFrame
from av.audio.frame import AudioFrame

async def main():
    # Create client with your API key and persona
    client = AnamClient(
        api_key="your-api-key",
        persona_id="your-persona-id",
    )

    # Connect and stream
    async with client.connect() as session:
        print(f"Connected! Session: {session.session_id}")
        
        # Consume video and audio frames concurrently
        async def consume_video():
            async for frame in session.video_frames():
                img = frame.to_ndarray(format="rgb24")  # numpy array (H, W, 3) in RGB format - use "bgr24" for OpenCV
                print(f"Video: {frame.width}x{frame.height}")
        
        async def consume_audio():
            async for frame in session.audio_frames():
                samples = frame.to_ndarray()  # int16 samples (1D array, interleaved for stereo)
                # Determine mono/stereo from frame layout
                channel_type = "mono" if frame.layout.nb_channels == 1 else "stereo"
                print(f"Audio: {samples.size} samples ({channel_type}) @ {frame.sample_rate}Hz")
        
        # Run both streams concurrently until session closes
        await asyncio.gather(
            consume_video(),
            consume_audio(),
        )

asyncio.run(main())

Features

  • 🎥 Real-time Audio/Video streaming - Receive synchronized audio/video frames from the avatar (as PyAV AudioFrame/VideoFrame objects)
  • 💬 Two-way communication - Send text messages (like transcribed user speech) and receive generated responses
  • 📝 Real-time transcriptions - Receive incremental message stream events for user and persona text as it's generated
  • 📚 Message history tracking - Automatic conversation history with incremental updates
  • 🤖 Audio passthrough - Send TTS-generated audio and receive synchronized audio/video of the avatar rendered in sync with it
  • 🗣️ Direct text-to-speech - Send text directly to TTS for immediate speech output (bypasses LLM processing)
  • 🎤 Real-time user audio input - Send raw audio samples (e.g. from a microphone) to Anam for processing (turnkey pipeline: STT → LLM → TTS → Face)
  • 📡 Async iterator API - Clean, Pythonic async/await patterns for continuous stream of audio/video frames
  • 🎯 Event-driven API - Simple decorator-based event handlers for discrete events
  • 📝 Fully typed - Complete type hints for IDE support
  • 🔒 Server-side ready - Designed for server-side Python applications (e.g. for backend pipelines)

API Reference

AnamClient

The main client class for connecting to Anam AI.

from anam import AnamClient, PersonaConfig, ClientOptions

# Simple initialization
client = AnamClient(
    api_key="your-api-key",
    persona_id="your-persona-id",
)

# Advanced initialization with full persona config
client = AnamClient(
    api_key="your-api-key",
    persona_config=PersonaConfig(
        persona_id="your-persona-id",
        name="My Assistant",
        system_prompt="You are a helpful assistant...",
        voice_id="emma",
        language_code="en",
    ),
)

Video and Audio Frames

Frames are PyAV objects (VideoFrame/AudioFrame) containing synchronized decoded audio (PCM) and video (RGB) samples from the avatar, delivered over WebRTC and extracted by aiortc. All PyAV frame attributes are accessible (samples, format, layout, etc.). Access the frames via async iterators and run both iterators concurrently, e.g. using asyncio.gather():

async with client.connect() as session:
    async def process_video():
        async for frame in session.video_frames():
            img = frame.to_ndarray(format="rgb24")  # RGB numpy array
            # Process frame...
    
    async def process_audio():
        async for frame in session.audio_frames():
            samples = frame.to_ndarray()  # int16 samples
            # Process frame...
    
    # Both streams run concurrently
    await asyncio.gather(process_video(), process_audio())
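For stereo layouts, the int16 samples arrive interleaved (left, right, left, right, …). Splitting them into channels is plain slicing; a minimal sketch on a stand-in Python list (the same striding works on the numpy array returned by frame.to_ndarray()):

```python
# Interleaved stereo int16 samples: [L0, R0, L1, R1, ...]
# Stand-in data; in practice this comes from frame.to_ndarray()
interleaved = [100, -100, 200, -200, 300, -300]

# De-interleave by striding: even indices are the left channel,
# odd indices are the right channel
left = interleaved[0::2]
right = interleaved[1::2]

print(left)   # left-channel samples
print(right)  # right-channel samples
```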

User Audio Input

User audio input is real-time audio such as microphone capture. It must be 16-bit PCM samples, mono or stereo, at any sample rate; the sample rate must be provided so the audio can be processed correctly. The audio is forwarded in real time as a WebRTC audio track. To reduce latency, any audio provided before the WebRTC audio track is created is dropped.
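Microphone pipelines often produce float samples in [-1.0, 1.0] rather than the 16-bit PCM this section requires. A minimal stdlib sketch of the conversion (the function name is ours, not part of the SDK; how the resulting bytes are sent is per the SDK's user-audio API):

```python
import array

def float_to_pcm16(samples: list[float]) -> bytes:
    """Convert float samples in [-1.0, 1.0] to 16-bit PCM bytes."""
    ints = array.array("h")  # signed 16-bit integers
    for s in samples:
        # Clip to the valid range, then scale to the int16 range
        s = max(-1.0, min(1.0, s))
        ints.append(int(s * 32767))
    return ints.tobytes()

pcm = float_to_pcm16([0.0, 0.5, -0.5, 1.0])
print(len(pcm))  # 2 bytes per sample
```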

TTS audio (Audio Passthrough)

TTS audio is generated by an external TTS engine and should be provided in chunks through the send_audio_chunk method. Chunks can be byte arrays or base64-encoded strings (the SDK converts raw bytes to base64). The audio is ingested by the backend at maximum bandwidth. The sample rate and channel count must be provided through the AgentAudioInputConfig object.

For best performance, we suggest 24kHz mono audio. The provided audio is returned in sync with the avatar without any resampling. Sample rates lower than 24kHz will result in poor avatar performance; sample rates higher than 24kHz may increase latency without a noticeable improvement in audio quality.
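At 24 kHz mono, 16-bit PCM works out to sample_rate × 2 bytes per second, which makes chunk sizing straightforward. A sketch of slicing a raw buffer into fixed-duration, base64-encoded chunks suitable for send_audio_chunk (the 20 ms chunk duration is an illustrative choice, not an SDK requirement):

```python
import base64

SAMPLE_RATE = 24_000   # recommended: 24 kHz mono
BYTES_PER_SAMPLE = 2   # 16-bit PCM
CHUNK_MS = 20          # illustrative chunk duration

# Bytes per chunk: samples/sec * bytes/sample * seconds
chunk_bytes = SAMPLE_RATE * BYTES_PER_SAMPLE * CHUNK_MS // 1000  # 960

def iter_chunks(pcm: bytes):
    """Yield base64-encoded fixed-size chunks of a raw PCM buffer."""
    for i in range(0, len(pcm), chunk_bytes):
        yield base64.b64encode(pcm[i : i + chunk_bytes]).decode("ascii")

# One second of silence -> fifty 20 ms chunks
chunks = list(iter_chunks(b"\x00" * (SAMPLE_RATE * BYTES_PER_SAMPLE)))
print(len(chunks))
```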

Events

Register callbacks for connection and message events using the @client.on() decorator:

from anam import AnamEvent, Message, MessageRole, MessageStreamEvent

@client.on(AnamEvent.CONNECTION_ESTABLISHED)
async def on_connected():
    """Called when the connection is established."""
    print("✅ Connected!")

@client.on(AnamEvent.CONNECTION_CLOSED)
async def on_closed(code: str, reason: str | None):
    """Called when the connection is closed."""
    print(f"Connection closed: {code} - {reason or 'No reason'}")

@client.on(AnamEvent.MESSAGE_STREAM_EVENT_RECEIVED)
async def on_message_stream_event(event: MessageStreamEvent):
    """Called for each incremental chunk of transcribed text or persona response.
    
    This event fires for both user transcriptions and persona responses as they stream in.
    This can be used for real-time captions or transcriptions.
    """
    if event.role == MessageRole.USER:
        # User transcription (from their speech)
        if event.content_index == 0:
            print(f"👤 User: ", end="", flush=True)
        print(event.content, end="", flush=True)
        if event.end_of_speech:
            print()  # New line when transcription completes
    else:
        # Persona response
        if event.content_index == 0:
            print(f"🤖 Persona: ", end="", flush=True)
        print(event.content, end="", flush=True)
        if event.end_of_speech:
            status = "✓" if not event.interrupted else "✗ INTERRUPTED"
            print(f" {status}")

@client.on(AnamEvent.MESSAGE_RECEIVED)
async def on_message(message: Message):
    """Called when a complete message is received (after end_of_speech).
    
    This is fired after MESSAGE_STREAM_EVENT_RECEIVED with end_of_speech=True.
    Useful for backward compatibility or when you only need complete messages.
    """
    print(f"{message.role}: {message.content}")

@client.on(AnamEvent.MESSAGE_HISTORY_UPDATED)
async def on_message_history_updated(messages: list[Message]):
    """Called when the message history is updated (after a message completes).
    
    The messages list contains the complete conversation history.
    """
    print(f"📝 Conversation history: {len(messages)} messages")
    for msg in messages:
        print(f"  {msg.role}: {msg.content[:50]}...")

Session

The Session object is returned by client.connect() and provides methods for interacting with the avatar:

async with client.connect() as session:
    # Send a text message (simulates user speech)
    await session.send_message("Hello, how are you?")
    
    # Send text directly to TTS (bypasses LLM)
    await session.talk("This will be spoken immediately")
    
    # Stream text to TTS incrementally (for streaming scenarios)
    await session.send_talk_stream(
        content="Hello",
        start_of_speech=True,
        end_of_speech=False,
    )
    await session.send_talk_stream(
        content=" world!",
        start_of_speech=False,
        end_of_speech=True,
    )
    
    # Interrupt the avatar if speaking
    await session.interrupt()
    
    # Get message history
    history = client.get_message_history()
    for msg in history:
        print(f"{msg.role}: {msg.content}")
    
    # Wait until the session ends
    await session.wait_until_closed()
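When relaying streamed LLM output, each send_talk_stream call needs the right start_of_speech/end_of_speech flags. A small helper of our own (not part of the SDK) that tags a finite token sequence with the calling convention shown above:

```python
def tag_talk_stream(tokens: list[str]) -> list[tuple[str, bool, bool]]:
    """Map tokens to (content, start_of_speech, end_of_speech) triples:
    the first token starts the utterance, the last one ends it."""
    triples = []
    for i, token in enumerate(tokens):
        triples.append((token, i == 0, i == len(tokens) - 1))
    return triples

for content, start, end in tag_talk_stream(["Hello", " world", "!"]):
    # In a real session:
    # await session.send_talk_stream(
    #     content=content, start_of_speech=start, end_of_speech=end)
    print(content, start, end)
```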

Examples

Save Video and Audio

import cv2
import wave
import asyncio
from anam import AnamClient

client = AnamClient(api_key="...", persona_id="...")

video_writer = cv2.VideoWriter("output.mp4", ...)
audio_writer = wave.open("output.wav", "wb")

async def save_video(session):
    async for frame in session.video_frames():
        # Read frame as BGR for OpenCV VideoWriter
        bgr_frame = frame.to_ndarray(format="bgr24")
        video_writer.write(bgr_frame)

async def save_audio(session):
    async for frame in session.audio_frames():
        # Initialize writer on first frame
        if audio_writer.getnframes() == 0:
            audio_writer.setnchannels(frame.layout.nb_channels)
            audio_writer.setsampwidth(2)  # 16-bit
            audio_writer.setframerate(frame.sample_rate)
        # Write audio data (convert to int16 and get bytes)
        audio_writer.writeframes(frame.to_ndarray().tobytes())

async def main():
    async with client.connect() as session:
        # Record for 30 seconds, then stop
        try:
            await asyncio.wait_for(
                asyncio.gather(save_video(session), save_audio(session)),
                timeout=30.0,
            )
        except asyncio.TimeoutError:
            pass
    video_writer.release()
    audio_writer.close()

asyncio.run(main())

Display Video with OpenCV

import cv2
import asyncio
from anam import AnamClient

client = AnamClient(api_key="...", persona_id="...")
latest_frame = None

async def update_frame(session):
    global latest_frame
    async for frame in session.video_frames():
        # Read frame as BGR for OpenCV display
        latest_frame = frame.to_ndarray(format="bgr24")

async def main():
    async with client.connect() as session:
        # Start frame consumer
        frame_task = asyncio.create_task(update_frame(session))
        
        # Display loop (yield to the event loop so frames keep arriving)
        while True:
            if latest_frame is not None:
                cv2.imshow("Avatar", latest_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            await asyncio.sleep(0.01)
        frame_task.cancel()
        cv2.destroyAllWindows()

asyncio.run(main())

Configuration

Environment Variables

export ANAM_API_KEY="your-api-key"
export ANAM_PERSONA_ID="your-persona-id"
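Whether AnamClient reads these variables automatically is not stated here, so a conservative sketch passes them explicitly after checking they are set:

```python
import os

# Read credentials from the environment, with empty-string fallbacks
api_key = os.environ.get("ANAM_API_KEY", "")
persona_id = os.environ.get("ANAM_PERSONA_ID", "")

missing = [name for name, value in
           [("ANAM_API_KEY", api_key), ("ANAM_PERSONA_ID", persona_id)]
           if not value]
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
else:
    # client = AnamClient(api_key=api_key, persona_id=persona_id)
    print("Credentials loaded")
```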

Client Options

from anam import ClientOptions

options = ClientOptions(
    api_base_url="https://api.anam.ai",  # API base URL
    api_version="v1",                     # API version
    ice_servers=None,                     # Custom ICE servers
)

Persona Configuration

from anam import PersonaConfig

persona = PersonaConfig(
    persona_id="your-persona-id",    # Required
    name="Assistant",                 # Display name
    avatar_id="anna_v2",             # Avatar to use
    voice_id="emma",                 # Voice to use
    system_prompt="You are...",      # Custom system prompt
    language_code="en",              # Language code
    llm_id="gpt-4",                  # LLM model
    max_session_length_seconds=300,  # Max session duration
)

Error Handling

from anam import AnamError, AuthenticationError, SessionError

try:
    async with client.connect() as session:
        await session.wait_until_closed()
except AuthenticationError as e:
    print(f"Invalid API key: {e}")
except SessionError as e:
    print(f"Session error: {e}")
except AnamError as e:
    print(f"Anam error [{e.code}]: {e.message}")

Requirements

  • Python 3.10+
  • Dependencies are installed automatically:
    • aiortc - WebRTC implementation
    • aiohttp - HTTP client
    • websockets - WebSocket client
    • numpy - Array handling
    • av (PyAV) - Video and audio frame handling

Optional for display utilities:

  • opencv-python - Video display
  • sounddevice - Audio playback

License

MIT License - see LICENSE for details.

