Provider-agnostic voice call runtime for telephony projects

voice_runtime

Provider-agnostic voice call runtime for telephony projects. Manages audio queues, mark synchronization, STT/TTS providers, and transport protocols — so consumers focus on conversation logic, not plumbing.

Quick Example

Make a call, say something, listen for a response via the on_committed callback, then hang up:

import asyncio
import threading
import time
import uvicorn
from fastapi import FastAPI

from voice_runtime.session import VoiceSession
from voice_runtime.transports.twilio_ws import register_voice_websocket
from voice_runtime.transports.twilio_call import initiate_outbound_call
from voice_runtime.tts import create_tts
from voice_runtime.stt import create_stt

# 1. Create session, wire STT, and start WebSocket server.
#    STT must be configured before the call connects: the transport calls
#    stt_factory() on stream start, then on_stt_ready(stt) before stt.start().
session = VoiceSession()
transcript_event = threading.Event()
heard = []

def on_committed(text: str):
    heard.append(text)
    transcript_event.set()

def wire_callbacks(stt):
    stt.on_committed = on_committed

session.stt_factory = lambda: create_stt()  # default: ElevenLabs
session.on_stt_ready = wire_callbacks

app = FastAPI()
register_voice_websocket(app, session)

def run_server():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    session.set_loop(loop)
    loop.run_until_complete(uvicorn.Server(
        uvicorn.Config(app, host="0.0.0.0", port=8080, log_level="warning")
    ).serve())

threading.Thread(target=run_server, daemon=True).start()
time.sleep(1)  # crude wait for server startup; fine for a demo

# 2. Initiate call (Twilio calls back to our /voice WebSocket)
call_sid = initiate_outbound_call("+358401234567")
session.call_sid = call_sid
session.wait_for_ws_connect(timeout=30)

# 3. Speak
tts = create_tts()  # default: ElevenLabs
tts.speak("Hello! How are you today?", session)
session.send_mark_and_wait("after-greeting")  # block until playback done

# 4. Listen: on_committed fires for each utterance
if transcript_event.wait(timeout=30):
    print(f"Caller said: {heard}")
else:
    print("No speech heard within 30 seconds")

# 5. Hang up
session.request_disconnect()

What happens under the hood

Consumer thread              voice_runtime                 Transport (Twilio)
─────────────────────────────────────────────────────────────────────────────
initiate_outbound_call()  →  Twilio REST calls.create()  → Twilio dials phone
                             build_stream_twiml()           with <Connect><Stream>
wait_for_ws_connect()     ←  signal_ws_connected()       ← Twilio opens /voice WS
                                                            stt_factory() → stt.start()
tts.speak(text, session)  →  ffmpeg MP3→μ-law             → send_audio task
                             put_outbound_sync()             sends base64 frames
send_mark_and_wait()      →  get_pending_mark()           → WS sends mark JSON
                          ←  signal_mark_received()       ← WS receives mark echo
on_committed(text)        ←  stt.on_committed()           ← WS receives media frames
                                                             decodes base64 → STT
request_disconnect()      →  _disconnect_requested.set()  → watch_disconnect task
                                                             stt.stop(), closes WS
                                                             Twilio ends call

Architecture

┌──────────────────────────────────────────────┐
│ Consumer (outcaller, ninchat_voice)          │
│  - Subclass VoiceSession (e.g. TelcoSession) │
│  - Call speak(), listen(), hang up           │
├──────────────────────────────────────────────┤
│ voice_runtime                                │
│  - VoiceSession: queues, marks, intents      │
│  - Factories: create_stt / create_tts        │
│  - Providers: ElevenLabs, Azure (STT + TTS)  │
│  - SttTee: dual-provider fan-out             │
│  - Audio: G.711 μ-law codec + mixer          │
├──────────────────────────────────────────────┤
│ Transport (protocol-specific)                │
│  - twilio_ws: Media Streams WebSocket        │
│  - twilio_call: REST call initiation + TwiML │
└──────────────────────────────────────────────┘

Key invariant: VoiceSession has zero transport or provider imports. Consumers never import Twilio or ElevenLabs directly — they use factories and the intent API.

Factories

Provider-agnostic factories mirror the yamlgraph create_llm() pattern:

from voice_runtime.stt import create_stt, get_stt_class
from voice_runtime.tts import create_tts
from voice_runtime.transport import create_transport

stt = create_stt(provider="elevenlabs")     # or "azure"
tts = create_tts(provider="elevenlabs")     # or "azure"
transport = create_transport(provider="twilio")

# get_stt_class returns the class without instantiating (for factory arguments)
SttClass = get_stt_class(provider="elevenlabs")
session.stt_factory = lambda: SttClass(language_code="en")

SttProvider Protocol

All STT providers implement this structural protocol (defined in providers/__init__.py):

class SttProvider(Protocol):
    on_committed: Callable[[str], None] | None    # final transcript for utterance
    on_recognizing: Callable[[str], None] | None  # interim hypothesis (NC-199)
    on_error: Callable[[str], None] | None        # fatal error after reconnect exhausted (NC-258)

    def set_speaking(self, speaking: bool) -> None: ...
    async def start(self, inbound_queue: asyncio.Queue[bytes | None]) -> None: ...
    async def stop(self) -> None: ...

| Callback | When it fires | Typical consumer action |
|---|---|---|
| on_committed | Final transcript past echo discard window | Route to LLM / FSM |
| on_recognizing | Interim hypothesis (may change) | Show live transcription UI |
| on_error | Reconnect attempts exhausted (fatal) | Transition FSM to error state |

Transport starts/stops the provider; the consumer decides routing (queue, dispatch, ignore).
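
As a rough illustration of consumer-side routing, the sketch below wires on_committed to a thread-safe queue. FakeStt is a hypothetical stand-in that only mimics the shape of the protocol above; it is not one of the library's providers, and treating each frame as a finished utterance is purely for demonstration.

```python
import asyncio
import queue
from typing import Callable, Optional


class FakeStt:
    """Hypothetical stand-in with the same shape as SttProvider."""

    def __init__(self) -> None:
        self.on_committed: Optional[Callable[[str], None]] = None
        self.on_recognizing: Optional[Callable[[str], None]] = None
        self.on_error: Optional[Callable[[str], None]] = None
        self.speaking = False

    def set_speaking(self, speaking: bool) -> None:
        self.speaking = speaking

    async def start(self, inbound_queue: asyncio.Queue) -> None:
        # Pretend each frame is an already-recognized utterance; None ends the stream.
        while (frame := await inbound_queue.get()) is not None:
            if self.on_committed is not None:
                self.on_committed(frame.decode())

    async def stop(self) -> None:
        pass


async def demo() -> list:
    transcripts: queue.Queue = queue.Queue()
    stt = FakeStt()
    stt.on_committed = transcripts.put  # the consumer's routing decision

    inbound: asyncio.Queue = asyncio.Queue()
    for item in (b"hello", b"goodbye", None):
        inbound.put_nowait(item)
    await stt.start(inbound)
    await stt.stop()
    return [transcripts.get_nowait() for _ in range(transcripts.qsize())]


routed = asyncio.run(demo())
```

A sync tool thread could then block on transcripts.get(timeout=...) instead of polling.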

TtsProvider Protocol

All TTS providers implement this structural protocol (NC-260 Gap A):

class TtsProvider(Protocol):
    on_error: Callable[[str], None] | None  # synthesis failure (NC-260 Gap A)

    def speak(
        self,
        text: str,
        session: VoiceSession,
        stop_event: threading.Event | None = ...,
    ) -> dict[str, Any]: ...

speak() returns a dict with the key last_spoken (str) and, optionally, call_disconnected (bool) or interrupted (bool). on_error fires on synthesis failures so the FSM doesn't hang in a speaking state.
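
One way a consumer might branch on that result dict is sketched below. The function name and the action labels are hypothetical; only the three dict keys come from the description above.

```python
from typing import Any


def next_action(result: dict[str, Any]) -> str:
    """Map a speak() result dict to a (hypothetical) consumer action label."""
    if result.get("call_disconnected"):
        return "end_call"        # caller hung up during playback
    if result.get("interrupted"):
        return "listen_now"      # barge-in: the caller is already talking
    return "wait_for_mark"       # normal case: wait for playback to finish


actions = [
    next_action({"last_spoken": "Hello"}),
    next_action({"last_spoken": "Hello", "interrupted": True}),
    next_action({"last_spoken": "Hello", "call_disconnected": True}),
]
```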

VoiceSession

Central coordinator between sync tool threads, async transport, and STT/TTS providers.

Audio I/O

| Method | Thread safety | Purpose |
|---|---|---|
| put_inbound(data) | Any → async | Enqueue caller audio (transport calls this) |
| get_outbound() | async only | Dequeue agent audio (transport reads this) |
| put_outbound_sync(data) | Sync → async | Enqueue agent audio (TTS provider calls this) |
| clear_inbound() | Any | Drain stale audio frames |

All sync→async bridging uses asyncio.run_coroutine_threadsafe().
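
The bridging pattern can be sketched with the standard library alone. The put_outbound_sync below is a hypothetical free function written for this example, not the session method itself; it shows a worker thread handing frames to an asyncio.Queue owned by the event loop.

```python
import asyncio


def put_outbound_sync(loop: asyncio.AbstractEventLoop,
                      outbound: asyncio.Queue, data: bytes) -> None:
    # Schedule the put on the loop thread, then block this thread until it ran.
    asyncio.run_coroutine_threadsafe(outbound.put(data), loop).result(timeout=5)


async def consume(outbound: asyncio.Queue, count: int) -> list:
    return [await outbound.get() for _ in range(count)]


async def main() -> list:
    loop = asyncio.get_running_loop()
    outbound: asyncio.Queue = asyncio.Queue()
    frames = [b"\xff" * 160, b"\x7f" * 160]

    def produce() -> None:
        # Runs in a worker thread, like a sync TTS provider would.
        for frame in frames:
            put_outbound_sync(loop, outbound, frame)

    received, _ = await asyncio.gather(
        consume(outbound, len(frames)),
        asyncio.to_thread(produce),
    )
    return received


received = asyncio.run(main())
```

Note that the sync side must never run on the event loop thread itself: blocking on .result() there would stall the loop that has to execute the put.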

Mark Synchronization

Marks let sync tool code block until the transport confirms audio playback reached a point. This is how you know a TTS utterance finished playing before you start listening.

tts.speak("What is your name?", session)
session.send_mark_and_wait("after-question", timeout=10.0)
# Now safe to start listening — caller heard the full question

session.clear_inbound()
transcript = stt.listen(session, timeout=30)

| Method | Purpose |
|---|---|
| send_mark_and_wait(name, timeout) | Block sync thread until mark echoed |
| signal_mark_received(name) | Called by transport when mark arrives |
| get_pending_mark() | Async: transport reads next mark to send |
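
The blocking behaviour can be pictured as one threading.Event per mark name. MarkSync below is a hypothetical sketch written for this illustration, not the library's implementation, and its boolean return value is an assumption.

```python
import threading


class MarkSync:
    """Hypothetical sketch of name-keyed mark waiting."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: dict[str, threading.Event] = {}

    def _event(self, name: str) -> threading.Event:
        with self._lock:
            return self._events.setdefault(name, threading.Event())

    def send_mark_and_wait(self, name: str, timeout: float) -> bool:
        # A real transport would also queue the mark message for sending here.
        return self._event(name).wait(timeout)

    def signal_mark_received(self, name: str) -> None:
        self._event(name).set()


marks = MarkSync()
# Simulate the transport echoing the mark 50 ms later from another thread.
threading.Timer(0.05, marks.signal_mark_received, args=("after-question",)).start()
confirmed = marks.send_mark_and_wait("after-question", timeout=2.0)
timed_out = not marks.send_mark_and_wait("never-echoed", timeout=0.05)
```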

Transport Intent (NC-154)

Consumers signal what they want; the transport decides how.

session.request_disconnect()     # transport closes connection, call ends
session.request_clear_buffer()   # transport discards buffered audio (barge-in)

Both are thread-safe. The transport watches _disconnect_requested (asyncio.Event) and _clear_queue (asyncio.Queue) and acts in its own protocol's terms — e.g. Twilio closes the WebSocket, which ends the call; SIP would send BYE.
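
Because asyncio.Event is not itself thread-safe, a sync caller has to hand the set() call to the loop thread. The sketch below shows one way to do that with loop.call_soon_threadsafe; whether voice_runtime uses this exact mechanism is an assumption, and the function names are only borrowed for readability.

```python
import asyncio
import threading


async def watch_disconnect(requested: asyncio.Event, log: list) -> None:
    await requested.wait()
    log.append("closing connection")  # a real transport would close its socket


async def main() -> list:
    loop = asyncio.get_running_loop()
    requested = asyncio.Event()
    log: list = []

    def request_disconnect() -> None:
        # Called from a non-loop thread: schedule set() on the loop thread.
        loop.call_soon_threadsafe(requested.set)

    threading.Timer(0.05, request_disconnect).start()
    await asyncio.wait_for(watch_disconnect(requested, log), timeout=2.0)
    return log


log = asyncio.run(main())
```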

STT Factory

Attach an STT factory and the transport manages its lifecycle automatically:

from voice_runtime.stt import create_stt

session.stt_factory = lambda: create_stt(provider="elevenlabs")
# Transport calls stt_factory() on stream start, stt.stop() on disconnect

Optional secondary STT for parallel logging/comparison (via SttTee):

session.stt_secondary_factory = lambda: create_stt(provider="azure")
# Transport wraps both in SttTee — primary drives on_committed, secondary logs only

STT Ready Hook (NC-260 Gap E)

Wire callbacks after the transport creates the STT instance but before start():

def wire_callbacks(stt: SttProvider):
    stt.on_committed = handle_transcript
    stt.on_recognizing = handle_interim
    stt.on_error = handle_stt_death

session.on_stt_ready = wire_callbacks

This replaces the old pattern of wiring callbacks before attaching the factory. The transport calls on_stt_ready(stt) after construction, guaranteeing callbacks are set before start() fires.
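
The ordering guarantee (construct, then hook, then start) can be checked with a small simulation. RecordingStt and transport_stream_start are hypothetical names invented for this sketch; they stand in for a provider and for the transport's stream-start logic.

```python
import asyncio

events = []


class RecordingStt:
    """Hypothetical provider that records when it is built and started."""

    def __init__(self) -> None:
        self.on_committed = None
        events.append("constructed")

    async def start(self, inbound_queue: asyncio.Queue) -> None:
        wired = self.on_committed is not None
        events.append("started with callback" if wired else "started without callback")


async def transport_stream_start(stt_factory, on_stt_ready) -> None:
    stt = stt_factory()               # 1. construct
    if on_stt_ready is not None:
        on_stt_ready(stt)             # 2. let the consumer wire callbacks
    await stt.start(asyncio.Queue())  # 3. only now start recognition


def wire_callbacks(stt) -> None:
    stt.on_committed = lambda text: None
    events.append("wired")


asyncio.run(transport_stream_start(RecordingStt, wire_callbacks))
```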

Lifecycle

| Method | Purpose |
|---|---|
| signal_ws_connected(stream_sid) | Transport calls when connection established |
| wait_for_ws_connect(timeout) | Consumer blocks until connected; raises CallNotAnsweredError |
| signal_disconnected() | Transport calls on hangup |
| is_disconnected | Property: check if call ended |
| reset() | Clear all state for session reuse (multi-call servers) |

Audio Monitoring

Optional two-channel mixer for real-time call monitoring (requires ffplay) and WAV recording (NC-235):

from pathlib import Path
from voice_runtime.audio import AudioMixer

# Monitor only (plays mixed audio through ffplay)
mixer = AudioMixer()

# Monitor + record to WAV file (8kHz mono mulaw)
mixer = AudioMixer(record_path=Path("recordings/call_001.wav"))

mixer.start()
session.set_mixer(mixer)
# session.tap_caller() / session.tap_agent() now feed audio to ffplay + WAV

Exceptions

| Exception | When |
|---|---|
| MissingStreamUrlError | VOICE_STREAM_URL env var not set |
| CallNotAnsweredError(timeout) | WebSocket didn't connect within timeout |
| CallHangupError | Call hung up during a listen operation |

Transport: Twilio

WebSocket Handler

Registers a /voice endpoint on a FastAPI app implementing Twilio Media Streams:

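from fastapi import FastAPI

from voice_runtime.session import VoiceSession
from voice_runtime.transports.twilio_ws import register_voice_websocket

session = VoiceSession()
app = FastAPI()
register_voice_websocket(app, session)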

Runs up to five async tasks on stream start: send_audio, send_marks, watch_disconnect, send_clears, and stt (only if an STT factory is provided).

Call Initiation

from voice_runtime.transports.twilio_call import (
    initiate_outbound_call,
    build_stream_twiml,  # alias: build_stream_xml
)

# Outbound: dial phone, Twilio connects back to /voice WebSocket
call_sid = initiate_outbound_call("+358401234567")

# Inbound webhook: return TwiML that tells Twilio to stream audio to /voice
xml = build_stream_twiml("wss://example.ngrok.io")
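
For orientation, the sketch below builds the kind of <Connect><Stream> TwiML document that Twilio expects for Media Streams, using only the standard library. The stream_twiml helper is hypothetical, and the exact output of build_stream_twiml (for example whether it appends /voice to the URL) is not shown here and may differ.

```python
import xml.etree.ElementTree as ET


def stream_twiml(ws_url: str) -> str:
    """Hypothetical helper: TwiML telling Twilio to stream call audio to ws_url."""
    response = ET.Element("Response")
    connect = ET.SubElement(response, "Connect")
    ET.SubElement(connect, "Stream", url=ws_url)
    return ET.tostring(response, encoding="unicode")


# Assumption: the WebSocket endpoint lives at /voice on the public host.
twiml = stream_twiml("wss://example.ngrok.io/voice")
parsed = ET.fromstring(twiml)
stream_url = parsed.find("Connect/Stream").get("url")
```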

Providers

ElevenLabs TTS

Streams text → ElevenLabs API → ffmpeg (MP3 → μ-law 8kHz) → session outbound queue.

from voice_runtime.tts import create_tts

tts = create_tts(provider="elevenlabs")
result = tts.speak("Hello", session, stop_event=barge_in_event)
# result: {"last_spoken": "Hello"} or {"last_spoken": "Hello", "call_disconnected": True}

Supports barge-in interrupt: pass a threading.Event as stop_event; set it from another thread to cut TTS mid-stream. Result may include {"interrupted": True}.
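
The stop_event mechanism can be sketched as a streaming loop that checks the event before sending each chunk. speak_chunks is a hypothetical function for illustration; in particular, reporting only the chunks actually sent as last_spoken is an assumption about what an interrupted result contains.

```python
import threading
from typing import Any, Callable, Optional


def speak_chunks(chunks: list, send: Callable[[str], None],
                 stop_event: Optional[threading.Event] = None) -> dict[str, Any]:
    spoken = []
    for chunk in chunks:
        if stop_event is not None and stop_event.is_set():
            # Barge-in: stop before sending any further audio.
            return {"last_spoken": " ".join(spoken), "interrupted": True}
        send(chunk)
        spoken.append(chunk)
    return {"last_spoken": " ".join(spoken)}


barge_in = threading.Event()
sent = []


def send(chunk: str) -> None:
    sent.append(chunk)
    if len(sent) == 2:
        barge_in.set()  # simulate the caller starting to speak mid-utterance


result = speak_chunks(["Hello", "there,", "how", "are", "you?"], send, barge_in)
```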

ElevenLabs STT

Persistent Scribe WebSocket per call lifetime. Barge-in detection, echo discard, reconnect on fatal errors.

from voice_runtime.stt import create_stt

stt = create_stt(provider="elevenlabs")
stt.on_committed = lambda text: print(f"Heard: {text}")
await stt.start(session.inbound)
# ... later
await stt.stop()

Typically managed by the transport via session.stt_factory rather than started manually.

Azure TTS

Streams text → Azure Speech SDK → μ-law 8kHz → session outbound queue.

from voice_runtime.tts import create_tts

tts = create_tts(provider="azure")
result = tts.speak("Hello", session, stop_event=barge_in_event)

Same speak() interface as ElevenLabs. Uses AZURE_SPEECH_KEY, AZURE_SPEECH_REGION, AZURE_TTS_VOICE env vars.

Azure STT

Persistent Azure Speech SDK recognizer with continuous recognition.

from voice_runtime.stt import create_stt

stt = create_stt(provider="azure", language_code="fi-FI", silence_timeout_ms=1500)
stt.on_committed = lambda text: print(f"Heard: {text}")
await stt.start(session.inbound)

Same SttProvider protocol as ElevenLabs. Echo discard window (400ms) after TTS ends.

SttTee (Dual STT)

Fan-out adapter running two STT providers on the same audio stream:

from voice_runtime.stt_tee import SttTee

tee = SttTee(primary=elevenlabs_stt, secondary=azure_stt)
tee.on_committed = handler  # proxied to primary only
await tee.start(session.inbound)

Primary drives production (on_committed). Secondary receives same frames for logging/comparison only — its errors never propagate. Typically wired automatically via session.stt_secondary_factory.
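
The fan-out idea, including the rule that secondary failures stay contained, can be sketched with plain asyncio queues. Every name below is hypothetical; this is not SttTee's implementation, only an illustration of the pattern it describes.

```python
import asyncio


async def fan_out(inbound: asyncio.Queue, primary_q: asyncio.Queue,
                  secondary_q: asyncio.Queue) -> None:
    # Copy every frame (and the None end-of-stream sentinel) to both queues.
    while True:
        frame = await inbound.get()
        primary_q.put_nowait(frame)
        secondary_q.put_nowait(frame)
        if frame is None:
            return


async def primary(q: asyncio.Queue, heard: list) -> None:
    while (frame := await q.get()) is not None:
        heard.append(frame)


async def broken_secondary(q: asyncio.Queue) -> None:
    await q.get()
    raise RuntimeError("secondary provider failed")


async def guarded(coro, errors: list) -> None:
    # Secondary errors are logged, never propagated to the primary path.
    try:
        await coro
    except Exception as exc:
        errors.append(str(exc))


async def main():
    inbound, pq, sq = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    for item in (b"a", b"b", None):
        inbound.put_nowait(item)
    heard, errors = [], []
    await asyncio.gather(
        fan_out(inbound, pq, sq),
        primary(pq, heard),
        guarded(broken_secondary(sq), errors),
    )
    return heard, errors


heard, errors = asyncio.run(main())
```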

Audio Codec

G.711 μ-law at 8 kHz, Twilio's native format. One 20 ms frame is 160 bytes (8000 samples per second at one byte per sample).

from voice_runtime.audio import mix_frames

mixed = mix_frames(caller_chunk, agent_chunk)  # mix two 160-byte frames
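
The frame size follows directly from the codec parameters: 8000 samples per second, one byte per sample, 20 ms per frame. The sketch below works through that arithmetic and splits a buffer into frames; split_frames is a hypothetical helper, not part of voice_runtime.audio.

```python
SAMPLE_RATE_HZ = 8000
BYTES_PER_SAMPLE = 1   # G.711 μ-law stores each sample in 8 bits
FRAME_MS = 20

frame_bytes = SAMPLE_RATE_HZ * BYTES_PER_SAMPLE * FRAME_MS // 1000


def split_frames(audio: bytes, size: int = frame_bytes) -> list[bytes]:
    """Cut a μ-law buffer into fixed-size frames (the last one may be short)."""
    return [audio[i:i + size] for i in range(0, len(audio), size)]


one_second = bytes([0xFF]) * SAMPLE_RATE_HZ  # 0xFF encodes silence in μ-law
frames = split_frames(one_second)
frames_per_second = len(frames)
```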

Environment Variables

| Variable | Purpose | Required |
|---|---|---|
| VOICE_STREAM_URL | Public WebSocket URL for transport callback | Yes |
| VOICE_SERVER_PORT | Uvicorn listen port | No (default: 8080) |
| TWILIO_ACCOUNT_SID | Twilio credentials (call initiation) | For outbound |
| TWILIO_AUTH_TOKEN | Twilio credentials | For outbound |
| TWILIO_PHONE_NUMBER | Outbound caller ID | For outbound |
| ELEVENLABS_API_KEY | ElevenLabs authentication | For ElevenLabs |
| ELEVENLABS_VOICE_ID | TTS voice | No (default: Rachel) |
| ELEVENLABS_MODEL | TTS model | No (default: eleven_multilingual_v2) |
| STT_MODEL_ID | ElevenLabs STT model | No (default: scribe_v2_realtime) |
| STT_LANGUAGE_CODE | ElevenLabs STT language | No (default: fi) |
| AZURE_SPEECH_KEY | Azure Speech SDK authentication | For Azure |
| AZURE_SPEECH_REGION | Azure Speech SDK region | No (default: westeurope) |
| AZURE_TTS_VOICE | Azure TTS voice name | No (default: fi-FI-NooraNeural) |
| VOICE_MONITOR | Enable AudioMixer monitoring | No (default: off) |
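
A consumer may want to fail fast on missing configuration before placing a call. The sketch below is a hypothetical pre-flight check for an outbound ElevenLabs setup; the helper names are invented here, and the library performs its own validation independently (for example via MissingStreamUrlError).

```python
import os

# Defaults copied from the table above (a subset, for illustration).
DEFAULTS = {
    "VOICE_SERVER_PORT": "8080",
    "AZURE_SPEECH_REGION": "westeurope",
    "STT_LANGUAGE_CODE": "fi",
}

OUTBOUND_ELEVENLABS = [
    "VOICE_STREAM_URL",
    "TWILIO_ACCOUNT_SID",
    "TWILIO_AUTH_TOKEN",
    "TWILIO_PHONE_NUMBER",
    "ELEVENLABS_API_KEY",
]


def missing_vars(required, env=os.environ) -> list:
    """Return the required variable names that are unset or empty."""
    return [name for name in required if not env.get(name)]


def setting(name: str, env=os.environ) -> str:
    return env.get(name) or DEFAULTS[name]


# Placeholder values only; pass nothing to check the real environment.
example_env = {"VOICE_STREAM_URL": "wss://example.ngrok.io",
               "ELEVENLABS_API_KEY": "placeholder"}
missing = missing_vars(OUTBOUND_ELEVENLABS, example_env)
port = int(setting("VOICE_SERVER_PORT", example_env))
```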

Transport: SMS (NC-193)

Send SMS via Twilio REST API without importing the Twilio SDK in consumers:

from voice_runtime.transport import get_sms_transport

sms = get_sms_transport()  # default: twilio
result = sms.send_sms(to="+358401234567", body="Your appointment is confirmed.")
# result: {"message_sid": "SM...", "status": "queued", "to": "+358..."}

Requires TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, and TWILIO_PHONE_NUMBER env vars.

Consumer Pattern

A typical consumer subclasses VoiceSession and adds server lifecycle management:

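import threading
from dataclasses import dataclass

from fastapi import FastAPI

from voice_runtime.session import VoiceSession
from voice_runtime.transports.twilio_ws import register_voice_websocket

@dataclass
class TelcoSession(VoiceSession):
    def start(self):
        app = FastAPI()
        register_voice_websocket(app, self)
        # Run uvicorn in a daemon thread; _run_loop is defined by the consumer
        # (see run_server() in the Quick Example for one way to write it)
        threading.Thread(target=self._run_loop, daemon=True).start()

    def shutdown(self):
        # Signal event loop to stop, join thread
        ...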

Tool nodes then use the session for audio I/O, mark sync, and transport intents — without knowing anything about Twilio, WebSockets, or ElevenLabs.

Known Consumers

Any project that subclasses VoiceSession and registers a FastAPI WebSocket handler via register_voice_websocket is a consumer. See the Consumer Pattern section above for the canonical implementation.

Multi-Call Session Reuse

When servers handle multiple sequential calls on the same VoiceSession instance, reset() clears all state between calls:

  • Stops active STT via asyncio.run_coroutine_threadsafe(stt.stop(), loop) before clearing the reference (prevents orphaned WebSocket connections)
  • Drains inbound and outbound queues
  • Resets mark synchronization and transport intent events

The STT start() method also drains the inbound queue as defense-in-depth against sentinel values from prior call cleanup.
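
Draining a queue between calls can be sketched as a non-blocking loop over get_nowait(). The drain helper below is hypothetical; it only shows how stale frames and a leftover None sentinel from a previous call could be discarded.

```python
import asyncio


def drain(q: asyncio.Queue) -> int:
    """Remove everything currently queued and return how many items were dropped."""
    dropped = 0
    while True:
        try:
            q.get_nowait()
        except asyncio.QueueEmpty:
            return dropped
        dropped += 1


async def main():
    inbound: asyncio.Queue = asyncio.Queue()
    # Leftovers from a previous call: two frames and the end-of-stream sentinel.
    for item in (b"\xff" * 160, b"\xff" * 160, None):
        inbound.put_nowait(item)
    dropped = drain(inbound)
    return dropped, inbound.empty()


dropped, is_empty = asyncio.run(main())
```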

STT Reconnect on Fatal Errors

PersistentSttSession detects fatal WebSocket errors (connection closed, protocol errors) and automatically reconnects:

  • _on_error() schedules _reconnect_after_error() for errors in _FATAL_ERRORS
  • Reconnect drains stale frames, creates a new WebSocket, and resumes the feed task
  • _feed_audio() wraps send() in try/except for dead socket resilience
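
The reconnect behaviour can be pictured as a bounded retry loop that reports through on_error once attempts run out. Everything in this sketch is hypothetical: the attempt limit, the fixed delay, and the contents of FATAL_ERRORS are assumptions, not values taken from PersistentSttSession.

```python
import asyncio

FATAL_ERRORS = (ConnectionError,)  # assumption: stands in for _FATAL_ERRORS


async def run_with_reconnect(connect, on_error, max_attempts: int = 3,
                             delay: float = 0.01):
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect(attempt)
        except FATAL_ERRORS as exc:
            if attempt == max_attempts:
                on_error(f"reconnect exhausted: {exc}")  # surface fatal failure
                return None
            await asyncio.sleep(delay)  # brief pause before the next attempt


async def flaky(attempt: int) -> str:
    if attempt < 3:
        raise ConnectionError("socket closed")
    return f"connected on attempt {attempt}"


async def dead(attempt: int) -> str:
    raise ConnectionError("socket closed")


errors = []
recovered = asyncio.run(run_with_reconnect(flaky, errors.append))
gave_up = asyncio.run(run_with_reconnect(dead, errors.append))
```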
