Python SDK for Telcoflow
Telcoflow Server Python SDK
A Python SDK for connecting to the Telcoflow Server, handling call events, managing media connections, and processing audio streams.
Features
- Dual Authentication: Support for API Key and mTLS certificates
- Automatic Reconnection: Automatically reconnects when the connection is lost
- Frame Demultiplexing: Automatic routing of Text vs Binary WebSocket frames
- Async/Await Support: Built with asyncio for high-performance concurrent calls
- Event Handling: Decorator-based events for discrete events, async iterators for streams
- Explicit Session Readiness: Register handlers, then call session.ready(). No events are delivered until you're ready, so nothing is missed
- Flow Control & Buffering: Internal byte buffer for outgoing audio with pull-based flow control
- Interruption Handling: Ability to clear outgoing buffers instantly when an AI model is interrupted
- Distributed Handling: Serialize a Call with to_json() / from_json() to hand media processing to another node
Logging
The SDK uses Python's standard logging module for all log messages. The SDK follows Python logging best practices for shared libraries:
- Module-level loggers: Each module uses logging.getLogger(__name__) for hierarchical logging
- No handlers in library: The SDK does not configure handlers, allowing applications to control logging configuration
- NullHandler by default: Prevents "No handler found" warnings if the application doesn't configure logging
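The library-side pattern listed above can be sketched with the standard library alone. The package name mylib is a placeholder chosen for this sketch; it is not the SDK's module layout.

```python
import logging

# Library side: attach a NullHandler to the package's top-level logger
# and never call basicConfig() or add real handlers.
# "mylib" is a placeholder package name used only for this sketch.
lib_logger = logging.getLogger("mylib")
lib_logger.addHandler(logging.NullHandler())

# A module inside the package would call logging.getLogger(__name__),
# which yields a child logger such as "mylib.connection".
module_logger = logging.getLogger("mylib.connection")

# Application side: the application decides the level per logger.
lib_logger.setLevel(logging.DEBUG)

# Children inherit the effective level from the parent logger.
inherits_debug = module_logger.isEnabledFor(logging.DEBUG)
```

Because child loggers inherit from their parent, setting the level on the top-level logger is usually enough; per-module overrides are only needed to quiet or amplify one module.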
Configuring Logging
To enable logging in your application, configure the logging system before using the SDK:
import logging
# Configure logging for the SDK
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Or configure specific loggers
logging.getLogger('telcoflow').setLevel(logging.DEBUG)
logging.getLogger('telcoflow.session_manager_connection').setLevel(logging.INFO)
logging.getLogger('telcoflow.voice_session').setLevel(logging.INFO)
Log Levels
- DEBUG: Detailed diagnostic information (e.g., sent commands, received message types)
- INFO: General informational messages (e.g., connection established, call events)
- WARNING: Warning messages (e.g., reconnection attempts, missing data)
- ERROR: Error messages (e.g., connection failures, parsing errors)
Security Note
The SDK does not log sensitive information such as:
- API keys
- Authentication tokens
- Certificate contents
- Private keys
Only connection URLs and call IDs are logged for debugging purposes.
Installation
pip install telcoflow
Quick Start
Build a config with SessionManagerConfig.create(), run inside async with SessionManager(config) as sm, and keep the process alive with await sm.run_forever().
The session flow is notify → open → ready:
- Register @sm.on_session_notification to receive a SessionNotification for each new inbound session.
- Call await sm.open_session(noti.session_id) to claim it and get a Session.
- Register the session's handlers (@session.on_incoming_call, …).
- Call await session.ready(). Only then does the server start delivering events. Registering before ready() guarantees no event is missed.
By default the SDK connects to the production endpoint. Pass base_url="ws://localhost:8080" to target a local server.
import asyncio
import logging
import os

from telcoflow import SessionManager, SessionManagerConfig, SessionNotification, Call, CallAudioConfig

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


async def main():
    config = SessionManagerConfig.create(
        api_key=os.getenv("WSS_API_KEY"),
        connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
        base_url="ws://localhost:8080",  # omit for the production endpoint
        call_audio=CallAudioConfig(sample_rate=24000),
    )

    try:
        async with SessionManager(config) as sm:
            logger.info("Connected. Waiting for sessions...")

            @sm.on_session_notification
            async def handle_session(noti: SessionNotification):
                session = await sm.open_session(noti.session_id)

                @session.on_incoming_call
                async def on_call(call: Call):
                    logger.info("Incoming call %s from %s", call.id, call.caller_number)

                    @call.on_terminated
                    def on_terminated():
                        logger.info("Call %s terminated", call.id)

                    result = await call.answer()
                    if not result:
                        logger.error("Answer failed: %s (%s)", result.error_message, result.error_code)
                        return

                    async for audio_chunk in call.audio_stream():
                        await call.send_audio(audio_chunk)  # echo back

                    await call.close()

                await session.ready()  # start event delivery (register handlers first!)

            await sm.run_forever()
    except asyncio.CancelledError:
        logger.info("Interrupted.")


if __name__ == "__main__":
    asyncio.run(main())
For concurrent receive/send (e.g. separate tasks for reading from audio_stream() and writing with send_audio()), use asyncio.TaskGroup and a queue.
Authentication
SessionManagerConfig.create() accepts either API key credentials or mTLS certificates. The default endpoint is production; pass base_url= to override (e.g. for local development).
# API key auth
config = SessionManagerConfig.create(
    api_key=os.getenv("WSS_API_KEY"),
    connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
    call_audio=CallAudioConfig(sample_rate=24000),
)

# mTLS auth
config = SessionManagerConfig.create(
    cert_path="/etc/certs/client.pem",
    key_path="/etc/certs/client.key",
    call_audio=CallAudioConfig(sample_rate=24000),
)
Audio settings (sample rate, audio mode, buffer size) live in an optional CallAudioConfig passed via call_audio=; omit it for the defaults (16000 Hz / MIXED / 1 MB).
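To get a feel for what the default buffer size means in playback time, the arithmetic can be sketched as below. Two assumptions are made here that the text above does not state: audio is uncompressed 16-bit mono PCM (2 bytes per sample), and 1 MB means 1,048,576 bytes. The helper buffer_seconds is hypothetical and not part of the SDK.

```python
def buffer_seconds(sample_rate_hz, buffer_bytes=1024 * 1024,
                   bytes_per_sample=2, channels=1):
    """Seconds of uncompressed PCM audio that fit in the buffer.

    Assumes 16-bit mono PCM by default; adjust the parameters if the
    actual sample format differs.
    """
    bytes_per_second = sample_rate_hz * bytes_per_sample * channels
    return buffer_bytes / bytes_per_second


default_capacity = buffer_seconds(16000)  # default sample rate
capacity_24k = buffer_seconds(24000)      # rate used in the examples
```

Under these assumptions the default buffer holds about 32.8 seconds of audio at 16000 Hz and about 21.8 seconds at 24000 Hz.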
Architecture
The SDK follows a dual-connection architecture:
- Session Manager Connection: Persistent WebSocket connection for session/call events
- Voice Session: Ephemeral WebSocket connection per call for audio streaming
When a new inbound session arrives:
- The server broadcasts a session.notify, delivered to your @sm.on_session_notification handler as a SessionNotification (session_id, remote_address, local_address, channel, activity, created_at).
- You call await sm.open_session(noti.session_id) to claim it, register the session's handlers, then call await session.ready().
- For CALL sessions, a Call is delivered to your @session.on_incoming_call handler. Answering opens the voice session for audio I/O.
To process media on a different node, serialize the Call with call.to_json(), forward it over your own transport, and reconstruct it with Call.from_json(data) on the other side.
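The round trip can be pictured with a small stand-in. HandoffRecord is hypothetical: the real payload produced by Call.to_json() is not described here, so the fields below simply reuse the documented Call properties for illustration.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class HandoffRecord:
    """Hypothetical stand-in for the data a handoff might carry."""
    id: str
    caller_number: str
    callee_number: str

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, data: str) -> "HandoffRecord":
        return cls(**json.loads(data))


original = HandoffRecord(
    id="call-123",
    caller_number="+15550100",
    callee_number="+15550101",
)
wire = original.to_json()                 # what node A would publish
restored = HandoffRecord.from_json(wire)  # what node B would rebuild
```

Because the serialized form is a plain string, any transport that can carry text (a message queue, an HTTP body) is sufficient for the handoff.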
Common Call Flows
Achieve different interaction patterns using the SDK's core commands. Command methods return a CommandResult (truthy on success) — check it rather than catching exceptions for operational failures.
1. Basic AI Agent (Answer)
The most common flow: answer, listen to the stream, and respond.
@session.on_incoming_call
async def handle_call(call: Call):
    await call.answer()
    async for chunk in call.audio_stream():
        response = await ai_model.generate(chunk)
        await call.send_audio(response)
    await call.close()
2. AI Assistant (Connect & Whisper)
The agent connects the caller to another party (3-way conference) and can "whisper" private audio to the callee.
@session.on_incoming_call
async def assistant_flow(call: Call):
    await call.answer()
    await call.connect(ring_time_seconds=30)
    await call.whisper()
    await call.send_audio(private_guidance_pcm)
    await call.close()
3. Distributed Call Handling
Forward the call to another server for media processing.
@session.on_incoming_call
async def forward_call(call: Call):
    # Send handoff JSON to another server via your message queue / HTTP
    await message_queue.publish(call.to_json())

# On the other server:
call = Call.from_json(received_message)
await call.answer()
Event Handling
Decorator-based Events (Discrete Events)
Use decorators for discrete, one-time events.
@session.on_incoming_call
async def handle_call(call: Call):
    await call.answer()

    @call.on_terminated
    def on_terminated():
        print("Call ended")

    # Server-side call errors (e.g. TTS failures)
    @call.on_call_event(CallEvent.CALL_ERROR)
    def on_error(data):
        print(f"Call error: {data['error_code']} - {data.get('error_message')}")
Available CallErrorCode values:
| Code | Meaning |
|---|---|
| CallErrorCode.TTS_SESSION_ERROR | TTS voice cloning session encountered an error |
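How a decorator-based registry dispatches discrete events can be sketched as follows. ToyEmitter and ToyEvent are hypothetical and written only for this illustration; they are not the SDK's implementation, and the string form of the error code is an assumption.

```python
from enum import Enum


class ToyEvent(Enum):
    TERMINATED = "terminated"
    ERROR = "error"


class ToyEmitter:
    """Minimal decorator-based registry; not the SDK's implementation."""

    def __init__(self):
        self._handlers = {}

    def on(self, event):
        # Decorator factory: @emitter.on(ToyEvent.ERROR)
        def register(func):
            self._handlers.setdefault(event, []).append(func)
            return func
        return register

    def emit(self, event, data=None):
        for handler in self._handlers.get(event, []):
            handler(data)


emitter = ToyEmitter()
log = []


@emitter.on(ToyEvent.ERROR)
def on_error(data):
    # .get() avoids a KeyError when the optional message is absent.
    log.append((data["error_code"], data.get("error_message")))


emitter.emit(ToyEvent.ERROR, {"error_code": "TTS_SESSION_ERROR"})
emitter.emit(ToyEvent.TERMINATED)  # no handler registered, so nothing happens
```

Reading the message with .get() mirrors the handler shown earlier: the error code is treated as always present, the message as optional.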
Async Iterators (Continuous Streams)
Use async iterators for continuous audio streams:
async for audio_chunk in call.audio_stream():
    # Process each audio chunk
    processed = await process_audio(audio_chunk)
    await call.send_audio(processed)
API Reference
SessionManager
Main entry point for managing connections.
Methods:
| Method | Description |
|---|---|
| on_session_notification | Decorator: receives a SessionNotification for every new inbound session. |
| await open_session(session_id) | Claim a session by id. Returns a Session. Idempotent. |
| await create_session(channel, remote_address) | Start an SDK-initiated (outbound) session. Returns a Session. |
| await list_sessions(channel=None, remote_address=None) | List OPEN sessions for this connector. |
| await setup_call_flow(config) | Push a custom call/message flow (CallFlowConfig) to the server. Send-once; re-send after a reconnect if needed. |
| await run_forever() | Run until Ctrl+C or context-manager exit. Use inside async with SessionManager(config) as sm: after registering handlers. |
SessionNotification
Delivered to @sm.on_session_notification handlers.
Fields:
- session_id: Session identifier to pass to open_session()
- remote_address: Remote party address (caller for inbound calls)
- local_address: Local party address (callee for inbound calls)
- channel: Channel type (e.g. TELCO, WA)
- activity: A SessionActivityType value (NEW_INCOMING_CALL / NEW_INCOMING_MESSAGE / RESUME)
- created_at: Notify emit time, unix epoch seconds
Session
Represents a claimed session. Register handlers, then call ready().
Methods:
| Method | Description |
|---|---|
| on_incoming_call(func) | Decorator: receives the Call for a CALL session. |
| on_incoming_message(func) | Decorator: receives an IncomingMessage (WA channel). |
| on_closed(func) | Decorator: invoked when the session closes. |
| await ready() | Signal readiness; the server then starts delivering events. Idempotent; no-op on a closed session. |
| await send_message(...) | Send an outbound message (WA channel). |
| await wait_closed() | Wait until the session is closed. |
| await close(...) | Close the session and release resources. |
Properties:
- is_closed: Whether the session has been closed
- channel: Channel type
Call
Represents a voice call with a media connection. Command methods (answer, connect, whisper, barge, spy, disconnect, close) return a CommandResult (truthy on success) instead of raising on operational failures; lost connections and programmer errors still raise.
Methods:
| Method | Description |
|---|---|
| await answer() | Answer the call |
| await connect(ring_time_seconds=60) | Initiate a 3-way conference between the caller, callee, and the agent |
| await whisper() | Switch to whisper mode. Only available after connect(). Agent audio is heard by the number subscriber only (the callee for incoming calls, or the caller for outgoing calls). The other party cannot hear the agent. |
| await barge() | Switch to barge mode. Only available after connect(). Agent joins the conversation and can be heard by both the caller and the callee. |
| await spy() | Switch to spy mode. Only available after connect(). Agent listens to both parties but neither can hear the agent (silent monitoring). |
| await disconnect() | End the call for all parties (server ends the call) |
| await close() | Close the call and release resources. After connect(): the agent leaves the call; the caller and callee remain connected. After answer() without connect(): the call ends for both the agent and the caller. |
| await send_audio(audio_data) | Queue binary audio data for sending |
| await clear_send_audio_buffer() | Clear all queued audio from the outgoing buffer |
| await get_send_audio_buffer_size() | Get current size of the outgoing buffer |
| audio_stream(channel_id=0) | Async iterator for incoming audio chunks |
| on_terminated(func) | Decorator sugar for CallEvent.CALL_TERMINATED |
| on_call_event(event) | Decorator for call event handlers (e.g. CallEvent.CALL_ERROR) |
| to_json() / from_json(data) | Serialize / reconstruct a Call for cross-node handoff |
Properties:
- id: Unique call identifier
- caller_number: Caller phone number
- callee_number: Callee phone number
- state: Current CallState
- audio_config: The CallAudioConfig for this call
CommandResult
Returned by Call command methods.
- success: True on success (the object is truthy, so if await call.answer(): ... works)
- error_code: Server error code on failure, or "TIMEOUT" if the command timed out
- error_message: Human-readable error detail
- payload: Server response payload, when present
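The truthiness convention can be illustrated with a stand-in class. ToyCommandResult mirrors the documented fields but is hypothetical, as is the describe() helper; neither is the SDK's own code.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ToyCommandResult:
    """Stand-in mirroring the documented fields; not the SDK class itself."""
    success: bool
    error_code: Optional[str] = None
    error_message: Optional[str] = None
    payload: Any = None

    def __bool__(self) -> bool:
        # Makes "if result:" equivalent to "if result.success:".
        return self.success


def describe(result):
    """Branch on the result instead of catching exceptions."""
    if result:
        return "ok"
    if result.error_code == "TIMEOUT":
        return "timed out"
    return f"failed: {result.error_code}"


ok = ToyCommandResult(success=True)
timeout = ToyCommandResult(success=False, error_code="TIMEOUT")
unanswered = ToyCommandResult(success=False, error_code="CALL_UNANSWERED")
```

Checking the result first and the error code second keeps the success path short and makes each failure branch explicit.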
Error Handling
The SDK provides a hierarchy of custom exceptions. Catch WSSError to handle any SDK-raised error, or catch specific subclasses for more granular control. Note that operational command failures (server success=false, timeouts) are returned as a CommandResult, not raised.
- WSSError - Base exception for all SDK errors
- WSSConnectionError - Connection-related errors (session manager or media)
- WSSAuthenticationError - Authentication failures (API key, tokens, or mTLS)
- WSSCallError - Base class for call-related errors:
  - WSSCallClosedError - Attempted operation on a closed call
  - WSSCallCommandError - Command failure carrying error_code and error_message
  - WSSCallCommandTimeoutError - Client timed out waiting for a command response
- WSSSessionError - Voice session connection errors
- BufferFullError - Outgoing audio buffer is full
- BufferClosedError - Write attempted on a closed outgoing audio buffer (e.g. after call ended)
Handling CALL_UNANSWERED for connect()
When using call.connect(), the callee may not answer within the ring time. The server returns error code CALL_UNANSWERED, surfaced on the returned CommandResult. The call remains active — you can retry, try another number, or end the call.
@session.on_incoming_call
async def assistant_flow(call: Call):
    await call.answer()
    result = await call.connect(ring_time_seconds=30)
    if not result:
        if result.error_code == "CALL_UNANSWERED":
            await call.disconnect()
        else:
            logger.error("Connect failed: %s (%s)", result.error_message, result.error_code)
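Since the call stays active after CALL_UNANSWERED, retrying is one option. The sketch below shows a bounded retry loop; FakeCall, FakeResult, and connect_with_retry are hypothetical names introduced here so the example runs without a server, and the retry policy itself is an assumption rather than SDK behavior.

```python
import asyncio


class FakeResult:
    """Stand-in for a command result that is truthy on success."""

    def __init__(self, success, error_code=None):
        self.success = success
        self.error_code = error_code

    def __bool__(self):
        return self.success


class FakeCall:
    """Stand-in whose connect() goes unanswered a fixed number of times."""

    def __init__(self, unanswered_times):
        self._remaining = unanswered_times
        self.attempts = 0

    async def connect(self, ring_time_seconds=60):
        self.attempts += 1
        if self._remaining > 0:
            self._remaining -= 1
            return FakeResult(False, "CALL_UNANSWERED")
        return FakeResult(True)


async def connect_with_retry(call, attempts=3, ring_time_seconds=30):
    """Retry connect() only while the failure is CALL_UNANSWERED."""
    result = None
    for _ in range(attempts):
        result = await call.connect(ring_time_seconds=ring_time_seconds)
        # Stop on success, or on any failure that a retry will not fix.
        if result or result.error_code != "CALL_UNANSWERED":
            break
    return result


call = FakeCall(unanswered_times=2)
result = asyncio.run(connect_with_retry(call))

stubborn = FakeCall(unanswered_times=5)
gave_up = asyncio.run(connect_with_retry(stubborn, attempts=2))
```

Bounding the number of attempts matters because the caller is waiting on the line during every ring period.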
Audio Buffering & Interruption Handling
The SDK uses a pull-based flow control mechanism. When you call send_audio(), the data is placed in an internal ConcurrentByteBuffer. The SDK then sends this data to the server only when the server requests it.
Why use a buffer?
- Smooth Playback: Prevents audio jitter by maintaining a steady supply of data for the server.
- Flow Control: Automatically handles the rate at which audio should be sent.
- Interruption Handling: If your AI model gets interrupted (e.g., via a Gemini Live interruption event), you can instantly clear the buffer to stop any pending audio from being played.
Handling Interruption
# When the AI model detects an interruption
await call.clear_send_audio_buffer()
# Now you can start sending new audio without old audio playing first
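The pull-based idea and the effect of clearing can be shown with a toy buffer. ToyPullBuffer is a hypothetical, single-threaded illustration; it is not the SDK's ConcurrentByteBuffer and omits the locking a concurrent buffer would need.

```python
class ToyPullBuffer:
    """Toy outgoing buffer: the producer writes, the consumer pulls on demand."""

    def __init__(self, max_bytes=1024 * 1024):
        self._data = bytearray()
        self._max_bytes = max_bytes

    def write(self, chunk: bytes) -> None:
        if len(self._data) + len(chunk) > self._max_bytes:
            raise OverflowError("buffer full")
        self._data.extend(chunk)

    def pull(self, n: int) -> bytes:
        # Called when the consumer asks for up to n bytes.
        out = bytes(self._data[:n])
        del self._data[:n]
        return out

    def clear(self) -> None:
        self._data.clear()

    def __len__(self) -> int:
        return len(self._data)


buf = ToyPullBuffer()
buf.write(b"old-audio-old-audio")
first = buf.pull(9)    # consumer requests 9 bytes
buf.clear()            # interruption: drop what is still queued
buf.write(b"new")
second = buf.pull(9)   # only the new audio remains
```

Because data leaves the buffer only when it is requested, anything still queued at the moment of an interruption can be discarded before it is ever played.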
Integration with AI Models
This example demonstrates bidirectional audio streaming and real-time interruption handling using the Google GenAI SDK.
import asyncio
import os

from google import genai
from google.genai import types
from telcoflow import SessionManager, SessionManagerConfig, SessionNotification, Call, CallAudioConfig

gemini_client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
MODEL = "gemini-2.5-flash-native-audio-preview-12-2025"


async def start_gemini_session(call: Call):
    await call.answer()

    async with gemini_client.aio.live.connect(model=MODEL) as session:

        async def stream_to_gemini():
            async for chunk in call.audio_stream():
                await session.send_realtime_input(
                    audio=types.Blob(data=chunk, mime_type="audio/pcm;rate=24000")
                )

        async def receive_from_gemini():
            async for response in session.receive():
                if content := response.server_content:
                    if content.interrupted:
                        await call.clear_send_audio_buffer()
                    elif content.model_turn:
                        for part in content.model_turn.parts:
                            if part.inline_data:
                                await call.send_audio(part.inline_data.data)

        await asyncio.gather(stream_to_gemini(), receive_from_gemini())


async def main():
    config = SessionManagerConfig.create(
        api_key=os.getenv("WSS_API_KEY"),
        connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
        call_audio=CallAudioConfig(sample_rate=24000),
    )

    async with SessionManager(config) as sm:

        @sm.on_session_notification
        async def handle_session(noti: SessionNotification):
            session = await sm.open_session(noti.session_id)

            @session.on_incoming_call
            async def on_call(call: Call):
                await start_gemini_session(call)

            await session.ready()

        await sm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())
Integration with Google ADK (Agent Development Kit)
For more complex AI agent behavior, including multi-agent orchestration, long-term memory, and structured tools, you can integrate with the Google Agent Development Kit (ADK).
The following example shows how to bridge Telcoflow's bidirectional audio stream with ADK's Runner and LiveRequestQueue.
import asyncio
import os

from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.agents.live_request_queue import LiveRequestQueue
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.genai import types
from telcoflow import SessionManager, SessionManagerConfig, SessionNotification, Call, CallAudioConfig

# Configure ADK Agent
agent = Agent(
    name="telcoflow_agent",
    model="gemini-2.5-flash-native-audio-preview-12-2025",
    instruction="You are a helpful AI assistant talking over a phone call.",
)
session_service = InMemorySessionService()
runner = Runner(app_name="telcoflow_app", agent=agent, session_service=session_service)


async def start_adk_session(call: Call):
    """Bridges audio with ADK Live Runner."""
    await call.answer()

    # ADK uses a queue to receive real-time inputs (audio chunks)
    live_request_queue = LiveRequestQueue()

    # Task 1: Stream audio to ADK
    async def stream_to_adk():
        async for audio_chunk in call.audio_stream():
            audio_blob = types.Blob(
                data=audio_chunk,
                mime_type="audio/pcm;rate=24000"
            )
            live_request_queue.send_realtime(audio_blob)

    # Task 2: Receive events and audio from ADK and send back to caller
    async def receive_from_adk():
        run_config = RunConfig(
            streaming_mode=StreamingMode.BIDI,
            response_modalities=["AUDIO"]
        )
        async for event in runner.run_live(
            user_id="default_user",
            session_id=call.id,
            live_request_queue=live_request_queue,
            run_config=run_config,
        ):
            # Instant interruption handling
            if event.interrupted:
                await call.clear_send_audio_buffer()

            # Forward model audio to the caller
            if event.content and event.content.parts[0].inline_data:
                await call.send_audio(event.content.parts[0].inline_data.data)

    # Run both directions concurrently
    await asyncio.gather(stream_to_adk(), receive_from_adk())


async def main():
    config = SessionManagerConfig.create(
        api_key=os.getenv("WSS_API_KEY"),
        connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
        call_audio=CallAudioConfig(sample_rate=24000),
    )

    async with SessionManager(config) as sm:

        @sm.on_session_notification
        async def handle_session(noti: SessionNotification):
            session = await sm.open_session(noti.session_id)

            @session.on_incoming_call
            async def handle_call(call: Call):
                await start_adk_session(call)

            await session.ready()

        await sm.run_forever()


if __name__ == "__main__":
    asyncio.run(main())
Thread Safety & Concurrency
- Each call.new event is processed in its own dedicated background task, allowing multiple calls to be handled concurrently
- Each Call runs independently in its own asyncio task
- Control connection heartbeat runs in a separate task
- Reconnection logic runs in a separate task
- All operations are non-blocking
- The SDK automatically manages task lifecycle and cleanup
Requirements
- Python 3.11+
- websockets>=15.0
- typing-extensions>=4.8.0
- abxbus>=2.4