Python SDK for Telcoflow

Telcoflow Server Python SDK

A Python SDK for connecting to the Telcoflow Server, handling call events, managing media connections, and processing audio streams.

Features

  • Dual Authentication: Support for API Key and mTLS certificates
  • Automatic Reconnection: Reconnects automatically when the connection is lost
  • Frame Demultiplexing: Automatic routing of Text vs Binary WebSocket frames
  • Async/Await Support: Built with asyncio for high-performance concurrent calls
  • Event Handling: Decorator-based events for discrete events, async iterators for streams
  • Explicit Session Readiness: Register handlers, then call session.ready() — no events are delivered until you're ready, so nothing is missed
  • Flow Control & Buffering: Internal byte buffer for outgoing audio with pull-based flow control
  • Interruption Handling: Ability to clear outgoing buffers instantly when an AI model is interrupted
  • Distributed Handling: Serialize a Call with to_json() / from_json() to hand media processing to another node

Logging

The SDK uses Python's standard logging module for all log messages. The SDK follows Python logging best practices for shared libraries:

  • Module-level loggers: Each module uses logging.getLogger(__name__) for hierarchical logging
  • No output handlers in the library: The SDK does not attach stream or file handlers, allowing applications to control logging configuration
  • NullHandler by default: Prevents "No handler found" warnings if the application doesn't configure logging

Configuring Logging

To enable logging in your application, configure the logging system before using the SDK:

import logging

# Configure the root logger (applies to the SDK and to your application)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Or configure specific loggers
logging.getLogger('telcoflow').setLevel(logging.DEBUG)
logging.getLogger('telcoflow.session_manager_connection').setLevel(logging.INFO)
logging.getLogger('telcoflow.voice_session').setLevel(logging.INFO)
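
As a sketch of the per-logger approach, the helper below attaches a handler only to the telcoflow logger hierarchy, so SDK output can be routed separately from the rest of the application. The helper name configure_sdk_logging is hypothetical and not part of the SDK; only the logger names come from the example above.

```python
import io
import logging


def configure_sdk_logging(stream, level=logging.DEBUG):
    """Attach a dedicated handler to the 'telcoflow' logger hierarchy.

    Hypothetical helper for illustration; not part of the SDK.
    """
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))
    sdk_logger = logging.getLogger("telcoflow")
    sdk_logger.setLevel(level)
    sdk_logger.addHandler(handler)
    # Keep SDK records away from the root logger's handlers to avoid duplicates.
    sdk_logger.propagate = False
    return handler


# Usage: child loggers such as 'telcoflow.voice_session' inherit the handler.
buffer = io.StringIO()
configure_sdk_logging(buffer)
logging.getLogger("telcoflow.voice_session").debug("example record")
```

Because the handler sits on the parent logger, any module-level logger under telcoflow reaches it without further configuration.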

Log Levels

  • DEBUG: Detailed diagnostic information (e.g., sent commands, received message types)
  • INFO: General informational messages (e.g., connection established, call events)
  • WARNING: Warning messages (e.g., reconnection attempts, missing data)
  • ERROR: Error messages (e.g., connection failures, parsing errors)

Security Note

The SDK does not log sensitive information such as:

  • API keys
  • Authentication tokens
  • Certificate contents
  • Private keys

Only connection URLs and call IDs are logged for debugging purposes.

Installation

pip install telcoflow

Quick Start

Build a config with SessionManagerConfig.create(), run inside async with SessionManager(config) as sm, and keep the process alive with await sm.run_forever().

The session flow is notify → open → ready:

  1. Register @sm.on_session_notification to receive a SessionNotification for each new inbound session.
  2. Call await sm.open_session(noti.session_id) to claim it and get a Session.
  3. Register the session's handlers (@session.on_incoming_call, …).
  4. Call await session.ready() — only then does the server start delivering events. Registering before ready() guarantees no event is missed.

By default the SDK connects to the production endpoint. Pass base_url="ws://localhost:8080" to target a local server.

import asyncio
import logging
import os
from telcoflow import SessionManager, SessionManagerConfig, SessionNotification, Call, CallAudioConfig

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

async def main():
    config = SessionManagerConfig.create(
        api_key=os.getenv("WSS_API_KEY"),
        connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
        base_url="ws://localhost:8080",                 # omit for the production endpoint
        call_audio=CallAudioConfig(sample_rate=24000),
    )
    try:
        async with SessionManager(config) as sm:
            logger.info("Connected. Waiting for sessions...")

            @sm.on_session_notification
            async def handle_session(noti: SessionNotification):
                session = await sm.open_session(noti.session_id)

                @session.on_incoming_call
                async def on_call(call: Call):
                    logger.info("Incoming call %s from %s", call.id, call.caller_number)

                    @call.on_terminated
                    def on_terminated():
                        logger.info("Call %s terminated", call.id)

                    result = await call.answer()
                    if not result:
                        logger.error("Answer failed: %s (%s)", result.error_message, result.error_code)
                        return

                    async for audio_chunk in call.audio_stream():
                        await call.send_audio(audio_chunk)  # echo back
                    await call.close()

                await session.ready()      # start event delivery (register handlers first!)

            await sm.run_forever()
    except asyncio.CancelledError:
        logger.info("Interrupted.")

if __name__ == "__main__":
    asyncio.run(main())

For concurrent receive/send (e.g. separate tasks for reading from audio_stream() and writing with send_audio()), use asyncio.TaskGroup and a queue.

Authentication

SessionManagerConfig.create() accepts either API key credentials or mTLS certificates. The default endpoint is production; pass base_url= to override (e.g. for local development).

# API key auth
config = SessionManagerConfig.create(
    api_key=os.getenv("WSS_API_KEY"),
    connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
    call_audio=CallAudioConfig(sample_rate=24000),
)

# mTLS auth
config = SessionManagerConfig.create(
    cert_path="/etc/certs/client.pem",
    key_path="/etc/certs/client.key",
    call_audio=CallAudioConfig(sample_rate=24000),
)

Audio settings (sample rate, audio mode, buffer size) live in an optional CallAudioConfig passed via call_audio=; omit it for the defaults (16000 Hz / MIXED / 1 MB).
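
To get a feel for these defaults, the arithmetic below estimates how much audio fits in the outgoing buffer. It assumes 16-bit mono PCM and that 1 MB means 1,048,576 bytes; neither is stated above, so treat the numbers as illustrative only.

```python
def pcm_duration_seconds(num_bytes, sample_rate, sample_width_bytes=2, channels=1):
    """Seconds of raw PCM audio represented by `num_bytes`.

    The 16-bit mono defaults are an assumption, not a documented SDK fact.
    """
    bytes_per_second = sample_rate * sample_width_bytes * channels
    return num_bytes / bytes_per_second


ONE_MB = 1024 * 1024
print(round(pcm_duration_seconds(ONE_MB, 16000), 1))  # 32.8 (default sample rate)
print(round(pcm_duration_seconds(ONE_MB, 24000), 1))  # 21.8 (rate used in the examples)
```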

Architecture

The SDK follows a dual-connection architecture:

  1. Session Manager Connection: Persistent WebSocket connection for session/call events
  2. Voice Session: Ephemeral WebSocket connection per call for audio streaming

When a new inbound session arrives:

  1. The server broadcasts a session.notify, delivered to your @sm.on_session_notification handler as a SessionNotification (session_id, remote_address, local_address, channel, activity, created_at).
  2. You call await sm.open_session(noti.session_id) to claim it, register the session's handlers, then call await session.ready().
  3. For CALL sessions, a Call is delivered to your @session.on_incoming_call handler. Answering opens the voice session for audio I/O.

To process media on a different node, serialize the Call with call.to_json(), forward it over your own transport, and reconstruct it with Call.from_json(data) on the other side.

Common Call Flows

Achieve different interaction patterns using the SDK's core commands. Command methods return a CommandResult (truthy on success) — check it rather than catching exceptions for operational failures.

1. Basic AI Agent (Answer)

The most common flow: answer, listen to the stream, and respond.

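@session.on_incoming_call
async def handle_call(call: Call):
    if not await call.answer():
        return  # answer failed; there is no audio to stream
    # ai_model is a placeholder for your own model client
    async for chunk in call.audio_stream():
        response = await ai_model.generate(chunk)
        await call.send_audio(response)
    await call.close()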

2. AI Assistant (Connect & Whisper)

The agent connects the caller to another party (3-way conference) and can "whisper" private audio to the callee.

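@session.on_incoming_call
async def assistant_flow(call: Call):
    if not await call.answer():
        return
    # whisper() is only available after a successful connect()
    if not await call.connect(ring_time_seconds=30):
        await call.disconnect()
        return
    await call.whisper()
    # private_guidance_pcm is a placeholder for your own PCM audio bytes
    await call.send_audio(private_guidance_pcm)
    await call.close()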

3. Distributed Call Handling

Forward the call to another server for media processing.

@session.on_incoming_call
async def forward_call(call: Call):
    # Send handoff JSON to another server via your message queue / HTTP
    await message_queue.publish(call.to_json())

# On the other server:
call = Call.from_json(received_message)
await call.answer()

Event Handling

Decorator-based Events (Discrete Events)

Use decorators for discrete, one-time events.

@session.on_incoming_call
async def handle_call(call: Call):
    await call.answer()

    @call.on_terminated
    def on_terminated():
        print("Call ended")

    # Server-side call errors (e.g. TTS failures)
    @call.on_call_event(CallEvent.CALL_ERROR)
    def on_error(data):
        print(f"Call error: {data['error_code']}: {data.get('error_message')}")

Available CallErrorCode values:

| Code | Meaning |
| --- | --- |
| CallErrorCode.TTS_SESSION_ERROR | TTS voice cloning session encountered an error |

Async Iterators (Continuous Streams)

Use async iterators for continuous audio streams:

async for audio_chunk in call.audio_stream():
    # Process each audio chunk
    processed = await process_audio(audio_chunk)
    await call.send_audio(processed)

API Reference

SessionManager

Main entry point for managing connections.

Methods:

| Method | Description |
| --- | --- |
| on_session_notification | Decorator — receives a SessionNotification for every new inbound session. |
| await open_session(session_id) | Claim a session by id. Returns a Session. Idempotent. |
| await create_session(channel, remote_address) | Start an SDK-initiated (outbound) session. Returns a Session. |
| await list_sessions(channel=None, remote_address=None) | List OPEN sessions for this connector. |
| await setup_call_flow(config) | Push a custom call/message flow (CallFlowConfig) to the server. Send-once; re-send after a reconnect if needed. |
| await run_forever() | Run until Ctrl+C or context-manager exit. Use inside async with SessionManager(config) as sm: after registering handlers. |

SessionNotification

Delivered to @sm.on_session_notification handlers.

Fields:

  • session_id — Session identifier to pass to open_session()
  • remote_address — Remote party address (caller for inbound calls)
  • local_address — Local party address (callee for inbound calls)
  • channel — Channel type (e.g. TELCO, WA)
  • activity — A SessionActivityType value (NEW_INCOMING_CALL / NEW_INCOMING_MESSAGE / RESUME)
  • created_at — Notify emit time, unix epoch seconds

Session

Represents a claimed session. Register handlers, then call ready().

Methods:

| Method | Description |
| --- | --- |
| on_incoming_call(func) | Decorator — receives the Call for a CALL session. |
| on_incoming_message(func) | Decorator — receives an IncomingMessage (WA channel). |
| on_closed(func) | Decorator — invoked when the session closes. |
| await ready() | Signal readiness; the server then starts delivering events. Idempotent; no-op on a closed session. |
| await send_message(...) | Send an outbound message (WA channel). |
| await wait_closed() | Await until the session is closed. |
| await close(...) | Close the session and release resources. |

Properties:

  • is_closed — Whether the session has been closed
  • channel — Channel type

Call

Represents a voice call with a media connection. Command methods (answer, connect, whisper, barge, spy, disconnect, close) return a CommandResult (truthy on success) instead of raising on operational failures; a lost connection or a programmer error still raises an exception.

Methods:

| Method | Description |
| --- | --- |
| await answer() | Answer the call |
| await connect(ring_time_seconds=60) | Initiate a 3-way conference between the caller, callee, and the agent |
| await whisper() | Switch to whisper mode. Only available after connect(). Agent audio is heard by the number subscriber only (the callee for incoming calls, or the caller for outgoing calls). The other party cannot hear the agent. |
| await barge() | Switch to barge mode. Only available after connect(). Agent joins the conversation and can be heard by both the caller and the callee. |
| await spy() | Switch to spy mode. Only available after connect(). Agent listens to both parties but neither can hear the agent (silent monitoring). |
| await disconnect() | End the call for all parties (server ends the call) |
| await close() | Close the call and release resources. After connect(): the agent leaves the call; the caller and callee remain connected. After answer() without connect(): the call ends for both the agent and the caller. |
| await send_audio(audio_data) | Queue binary audio data for sending |
| await clear_send_audio_buffer() | Clear all queued audio from the outgoing buffer |
| await get_send_audio_buffer_size() | Get current size of the outgoing buffer |
| audio_stream(channel_id=0) | Async iterator for incoming audio chunks |
| on_terminated(func) | Decorator sugar for CallEvent.CALL_TERMINATED |
| on_call_event(event) | Decorator for call event handlers (e.g. CallEvent.CALL_ERROR) |
| to_json() / from_json(data) | Serialize / reconstruct a Call for cross-node handoff |

Properties:

  • id — Unique call identifier
  • caller_number — Caller phone number
  • callee_number — Callee phone number
  • state — Current CallState
  • audio_config — The CallAudioConfig for this call

CommandResult

Returned by Call command methods.

  • success — True on success (the object is truthy, so if await call.answer(): ... works)
  • error_code — Server error code on failure, or "TIMEOUT" if the command timed out
  • error_message — Human-readable error detail
  • payload — Server response payload, when present
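
The truthiness convention can be pictured with a small stand-in. ResultSketch and describe below are hypothetical and only mirror the fields listed above; the SDK's actual CommandResult implementation may differ.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ResultSketch:
    """Stand-in mirroring the fields listed above; not the SDK class."""

    success: bool
    error_code: Optional[str] = None
    error_message: Optional[str] = None
    payload: Any = None

    def __bool__(self):
        # Truthiness follows `success`, so `if result:` reads naturally.
        return self.success


def describe(result):
    """Turn a result into a log-friendly string."""
    if result:
        return "ok"
    if result.error_code == "TIMEOUT":
        return "timed out waiting for the server"
    return f"failed: {result.error_message} ({result.error_code})"
```

Branching on error_code keeps timeouts distinguishable from server-side rejections without resorting to exception handling.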

Error Handling

The SDK provides a hierarchy of custom exceptions. Catch WSSError to handle any SDK-raised error, or catch specific subclasses for more granular control. Note that operational command failures (server success=false, timeouts) are returned as a CommandResult, not raised.

  • WSSError - Base exception for all SDK errors
  • WSSConnectionError - Connection-related errors (session manager or media)
  • WSSAuthenticationError - Authentication failures (API key, tokens, or mTLS)
  • WSSCallError - Base class for call-related errors:
    • WSSCallClosedError - Attempted operation on a closed call
    • WSSCallCommandError - Command failure carrying error_code and error_message
    • WSSCallCommandTimeoutError - Client timed out waiting for a command response
  • WSSSessionError - Voice session connection errors
  • BufferFullError - Outgoing audio buffer is full
  • BufferClosedError - Write attempted on a closed outgoing audio buffer (e.g. after call ended)

Handling CALL_UNANSWERED for connect()

When using call.connect(), the callee may not answer within the ring time. The server returns error code CALL_UNANSWERED, surfaced on the returned CommandResult. The call remains active — you can retry, try another number, or end the call.

@session.on_incoming_call
async def assistant_flow(call: Call):
    await call.answer()
    result = await call.connect(ring_time_seconds=30)
    if not result:
        if result.error_code == "CALL_UNANSWERED":
            await call.disconnect()
        else:
            logger.error("Connect failed: %s (%s)", result.error_message, result.error_code)

Audio Buffering & Interruption Handling

The SDK uses a pull-based flow control mechanism. When you call send_audio(), the data is placed in an internal ConcurrentByteBuffer. The SDK then sends this data to the server only when the server requests it.

Why use a buffer?

  • Smooth Playback: Prevents audio jitter by maintaining a steady supply of data for the server.
  • Flow Control: Automatically handles the rate at which audio should be sent.
  • Interruption Handling: If your AI model gets interrupted (e.g., via a Gemini Live interruption event), you can instantly clear the buffer to stop any pending audio from being played.

Handling Interruption

# When the AI model detects an interruption
await call.clear_send_audio_buffer()
# Now you can start sending new audio without old audio playing first
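
To make the pull-based idea concrete, here is a toy buffer with the same three operations: write, pull on demand, and clear. PullBuffer is a hypothetical illustration and not the SDK's ConcurrentByteBuffer, whose internals are not described here.

```python
import threading


class PullBuffer:
    """Toy byte buffer: the producer writes, the consumer pulls on demand."""

    def __init__(self, max_bytes=1024 * 1024):
        self._data = bytearray()
        self._max_bytes = max_bytes
        self._lock = threading.Lock()

    def write(self, chunk: bytes) -> None:
        """Queue audio; refuse the chunk if it would exceed the capacity."""
        with self._lock:
            if len(self._data) + len(chunk) > self._max_bytes:
                raise BufferError("buffer full")
            self._data.extend(chunk)

    def pull(self, n: int) -> bytes:
        """Return up to n bytes, as if the server had requested them."""
        with self._lock:
            out = bytes(self._data[:n])
            del self._data[:n]
            return out

    def clear(self) -> int:
        """Drop everything queued (the interruption case); return bytes dropped."""
        with self._lock:
            dropped = len(self._data)
            self._data.clear()
            return dropped
```

After clear(), the next pull() returns nothing until new audio is written, which is why stale speech does not play after an interruption.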

Integration with AI Models

This example demonstrates bidirectional audio streaming and real-time interruption handling using the Google GenAI SDK.

import asyncio
import os
from google import genai
from google.genai import types
from telcoflow import SessionManager, SessionManagerConfig, SessionNotification, Call, CallAudioConfig

gemini_client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
MODEL = "gemini-2.5-flash-native-audio-preview-12-2025"

async def start_gemini_session(call: Call):
    await call.answer()
    async with gemini_client.aio.live.connect(model=MODEL) as session:
        async def stream_to_gemini():
            async for chunk in call.audio_stream():
                await session.send_realtime_input(
                    audio=types.Blob(data=chunk, mime_type="audio/pcm;rate=24000")
                )
        async def receive_from_gemini():
            async for response in session.receive():
                if content := response.server_content:
                    if content.interrupted:
                        await call.clear_send_audio_buffer()
                    elif content.model_turn:
                        for part in content.model_turn.parts:
                            if part.inline_data:
                                await call.send_audio(part.inline_data.data)
        await asyncio.gather(stream_to_gemini(), receive_from_gemini())

async def main():
    config = SessionManagerConfig.create(
        api_key=os.getenv("WSS_API_KEY"),
        connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
        call_audio=CallAudioConfig(sample_rate=24000),
    )
    async with SessionManager(config) as sm:
        @sm.on_session_notification
        async def handle_session(noti: SessionNotification):
            session = await sm.open_session(noti.session_id)

            @session.on_incoming_call
            async def on_call(call: Call):
                await start_gemini_session(call)

            await session.ready()

        await sm.run_forever()

if __name__ == "__main__":
    asyncio.run(main())

Integration with Google ADK (Agent Development Kit)

For more complex AI agent behavior, including multi-agent orchestration, long-term memory, and structured tools, you can integrate with the Google Agent Development Kit (ADK).

The following example shows how to bridge Telcoflow's bidirectional audio stream with ADK's Runner and LiveRequestQueue.

import asyncio
import os
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.agents.live_request_queue import LiveRequestQueue
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.genai import types

from telcoflow import SessionManager, SessionManagerConfig, SessionNotification, Call, CallAudioConfig

# Configure ADK Agent
agent = Agent(
    name="telcoflow_agent",
    model="gemini-2.5-flash-native-audio-preview-12-2025",
    instruction="You are a helpful AI assistant talking over a phone call.",
)
session_service = InMemorySessionService()
runner = Runner(app_name="telcoflow_app", agent=agent, session_service=session_service)

async def start_adk_session(call: Call):
    """Bridges audio with ADK Live Runner."""
    await call.answer()

    # ADK uses a queue to receive real-time inputs (audio chunks)
    live_request_queue = LiveRequestQueue()

    # Task 1: Stream audio to ADK
    async def stream_to_adk():
        async for audio_chunk in call.audio_stream():
            audio_blob = types.Blob(
                data=audio_chunk,
                mime_type="audio/pcm;rate=24000"
            )
            live_request_queue.send_realtime(audio_blob)

    # Task 2: Receive events and audio from ADK and send back to caller
    async def receive_from_adk():
        run_config = RunConfig(
            streaming_mode=StreamingMode.BIDI,
            response_modalities=["AUDIO"]
        )
        async for event in runner.run_live(
            user_id="default_user",
            session_id=call.id,
            live_request_queue=live_request_queue,
            run_config=run_config,
        ):
            # Instant interruption handling
            if event.interrupted:
                await call.clear_send_audio_buffer()

            # Forward model audio to the caller (an event may carry several parts, or none)
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if part.inline_data:
                        await call.send_audio(part.inline_data.data)

    # Run both directions concurrently
    await asyncio.gather(stream_to_adk(), receive_from_adk())

async def main():
    config = SessionManagerConfig.create(
        api_key=os.getenv("WSS_API_KEY"),
        connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
        call_audio=CallAudioConfig(sample_rate=24000),
    )
    async with SessionManager(config) as sm:
        @sm.on_session_notification
        async def handle_session(noti: SessionNotification):
            session = await sm.open_session(noti.session_id)

            @session.on_incoming_call
            async def handle_call(call: Call):
                await start_adk_session(call)

            await session.ready()

        await sm.run_forever()

if __name__ == "__main__":
    asyncio.run(main())

Thread Safety & Concurrency

  • Each call.new event is processed in its own dedicated background task, allowing multiple calls to be handled concurrently
  • Each Call runs independently in its own asyncio task
  • Control connection heartbeat runs in a separate task
  • Reconnection logic runs in a separate task
  • All operations are non-blocking
  • The SDK automatically manages task lifecycle and cleanup
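
The task-per-event model can be sketched in plain asyncio. TaskDispatcher and demo are hypothetical names used for illustration; this shows the general pattern, not the SDK's internal code.

```python
import asyncio


class TaskDispatcher:
    """Run each incoming event's handler in its own background task."""

    def __init__(self, handler):
        self._handler = handler
        self._tasks = set()

    def dispatch(self, event):
        task = asyncio.create_task(self._handler(event))
        # Hold a strong reference until the task finishes, then drop it.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def drain(self):
        """Wait for all in-flight handlers (for example, during shutdown)."""
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo():
    """Dispatch a slow and a fast event; return their completion order."""
    order = []

    async def handler(event):
        name, delay = event
        await asyncio.sleep(delay)
        order.append(name)

    dispatcher = TaskDispatcher(handler)
    dispatcher.dispatch(("slow", 0.05))
    dispatcher.dispatch(("fast", 0.0))
    await dispatcher.drain()
    return order
```

Running asyncio.run(demo()) shows the fast handler finishing before the slow one, since neither blocks the other.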

Requirements

  • Python 3.11+
  • websockets>=15.0
  • typing-extensions>=4.8.0
  • abxbus>=2.4
