Production-grade Python SDK for RinggLabs STT over REST and WebSocket.
Project description
RinggLabs Python SDK
ringglabs is the official Python SDK for RinggLabs-compatible speech-to-text services.
It supports:
- Sync and async file transcription (REST/offline)
- Sync and async real-time transcription (streaming)
Installation
pip install ringglabs
Compatibility
- Python:
3.10+ - Windows:
10+ - macOS:
11+ - Ubuntu:
20.04+
Parameter Reference
Client Initialization
Client(...) and AsyncClient(...) share the same constructor parameters.
| Parameter | Type | Default | Description |
|---|---|---|---|
base_url |
str | None |
"prod-api.ringg.ai" |
host name. |
api_key |
str | None |
None |
Default API key for all requests; can be overridden per call. |
timeout |
TimeoutConfig | None |
TimeoutConfig() |
HTTP and WS timeout budgets. |
default_headers |
dict[str, str] | None |
None |
Extra headers attached to SDK requests. |
transcribe(...) Parameters
Client.transcribe(...) and AsyncClient.transcribe(...):
| Parameter | Type | Default | Description |
|---|---|---|---|
source |
str | Path | bytes | bytearray | BinaryIO |
required | Input audio source. |
language |
str |
"hi" |
Language hint sent to proxy. |
enable_cap_punc |
bool |
True |
Enable capitalization and punctuation in output. |
api_key |
str | None |
None |
Per-request API key override. |
filename |
str | None |
auto-detected, fallback "audio.wav" |
Multipart filename for upload. |
content_type |
str |
"audio/wav" |
Multipart content type. |
stream(...) Parameters
Client.stream(...) and AsyncClient.stream(...):
| Parameter | Type | Default | Description |
|---|---|---|---|
sample_rate |
int |
16000 |
Audio sample rate for stream start config. |
encoding |
str |
"int16" |
Audio encoding (int16, linear16, float32, int32). |
language |
str |
"hi" |
Language hint sent to proxy. |
mode |
str |
"stream" |
Streaming mode (stream or on_final). |
vad_tail_sil_ms |
int |
200 |
Server VAD tail silence in ms. |
vad_confidence |
float |
0.55 |
Server VAD confidence threshold. |
enable_cap_punc |
bool |
True |
Enable capitalization/punctuation processing. |
accept_client_vad_events |
bool |
False |
Enables start_speaking()/stop_speaking() signaling from client. |
api_key |
str | None |
None |
Per-stream API key override. |
Stream Session Controls
Sync session (StreamSession)
| Method | Purpose |
|---|---|
send_audio(bytes) |
Send raw audio bytes chunk. |
send_vad_event(state) |
Send explicit VAD event (user_start_speaking, user_stop_speaking). |
start_speaking() |
Convenience wrapper for user_start_speaking. |
stop_speaking() |
Convenience wrapper for user_stop_speaking. |
ping() |
Send ping frame/message. |
end(command="end") |
Request stream finalization/termination. |
recv_event() |
Receive one parsed server event. |
events() |
Iterator over incoming parsed events. |
close() |
Close websocket session. |
Async session (AsyncStreamSession)
Async equivalents:
await send_audio(...)await send_vad_event(...)await start_speaking()await stop_speaking()await ping()await end(...)await recv_event()async for event in session.events(): ...await close()
Timeout Configuration
TimeoutConfig fields:
| Field | Default | Description |
|---|---|---|
connect |
10.0 |
HTTP connect timeout (seconds). |
read |
10.0 |
HTTP read timeout (seconds). |
write |
10.0 |
HTTP write timeout (seconds). |
pool |
10.0 |
HTTP pool timeout (seconds). |
ws_open |
10.0 |
WebSocket open timeout (seconds). |
ws_recv |
30.0 |
WebSocket receive timeout (seconds). |
ws_close |
10.0 |
WebSocket close timeout (seconds). |
Response Models
RestTranscriptionResult
| Field | Type |
|---|---|
status |
str |
transcription |
str |
is_final |
bool |
language |
str |
duration_seconds |
float |
processing_time_seconds |
float |
request_id |
str |
raw |
dict |
WebSocket Event Types
readytranscriptackpongerror
All events include raw with original server payload.
transcript event fields may include:
transcriptionis_finallanguagerequest_idsegment_idxsegmentscompute_latency_msaudio_duration_sectranscribed_audio_duration_secprocessing_time_ms
Health Check
Sync:
from ringglabs.stt import Client
with Client(api_key="rk_live_xxx") as client:
print(client.health())
Async:
from ringglabs.stt import AsyncClient
health = await AsyncClient(api_key="rk_live_xxx").health()
print(health)
Examples (Stream, Offline)
Helper Functions
import wave
def load_wav_mono_int16_16k(path: str) -> tuple[int, bytes]:
with wave.open(path, "rb") as wf:
channels = wf.getnchannels()
sample_width = wf.getsampwidth()
sample_rate = wf.getframerate()
frames = wf.getnframes()
audio = wf.readframes(frames)
if channels != 1:
raise ValueError("Expected mono WAV (1 channel).")
if sample_width != 2:
raise ValueError("Expected 16-bit PCM WAV (sample width = 2).")
if sample_rate != 16000:
raise ValueError("Expected 16 kHz WAV for these full-audio stream examples.")
return sample_rate, audio
def iter_pcm_chunks(audio: bytes, sample_rate: int, chunk_ms: int = 20):
samples_per_chunk = max(1, int(sample_rate * (chunk_ms / 1000.0)))
bytes_per_chunk = samples_per_chunk * 2 # int16 mono
for i in range(0, len(audio), bytes_per_chunk):
yield audio[i : i + bytes_per_chunk]
1) Sync Stream (mode="stream")
from ringglabs.stt import Client, TimeoutError as SdkTimeoutError
def main() -> None:
sample_rate, audio = load_wav_mono_int16_16k("sample.wav")
transcripts: list[str] = []
with Client(api_key="rk_live_xxx").stream(
sample_rate=sample_rate,
encoding="int16",
language="en",
mode="stream",
enable_cap_punc=True,
accept_client_vad_events=False,
) as session:
for chunk in iter_pcm_chunks(audio, sample_rate, chunk_ms=20):
session.send_audio(chunk)
session.end()
try:
for event in session.events():
if event.type == "transcript" and event.transcription.strip():
transcripts.append(event.transcription.strip())
except SdkTimeoutError:
pass
print("segment transcripts:", transcripts)
if __name__ == "__main__":
main()
2) Async Stream (mode="stream")
import asyncio
from ringglabs.stt import AsyncClient, TimeoutError as SdkTimeoutError
async def main() -> None:
sample_rate, audio = load_wav_mono_int16_16k("sample.wav")
transcripts: list[str] = []
async with AsyncClient(api_key="rk_live_xxx").stream(
sample_rate=sample_rate,
encoding="int16",
language="en",
mode="stream",
enable_cap_punc=True,
accept_client_vad_events=False,
) as session:
for chunk in iter_pcm_chunks(audio, sample_rate, chunk_ms=20):
await session.send_audio(chunk)
await session.end()
try:
async for event in session.events():
if event.type == "transcript" and event.transcription.strip():
transcripts.append(event.transcription.strip())
except SdkTimeoutError:
pass
print("segment transcripts:", transcripts)
if __name__ == "__main__":
asyncio.run(main())
3) Sync Stream (mode="on_final")
from ringglabs.stt import Client, TimeoutError as SdkTimeoutError
def main() -> None:
sample_rate, audio = load_wav_mono_int16_16k("sample.wav")
partials: list[str] = []
finals: list[str] = []
with Client(api_key="rk_live_xxx").stream(
sample_rate=sample_rate,
encoding="int16",
language="en",
mode="on_final",
enable_cap_punc=True,
accept_client_vad_events=True,
) as session:
session.start_speaking()
for chunk in iter_pcm_chunks(audio, sample_rate, chunk_ms=20):
session.send_audio(chunk)
session.stop_speaking()
session.end()
try:
for event in session.events():
if event.type != "transcript":
continue
text = event.transcription.strip()
if not text:
continue
if event.is_final:
finals.append(text)
else:
partials.append(text)
except SdkTimeoutError:
pass
print("partials:", partials)
print("finals:", finals)
if finals:
print("final transcript:", finals[-1])
if __name__ == "__main__":
main()
4) Async Stream (mode="on_final")
import asyncio
from ringglabs.stt import AsyncClient, TimeoutError as SdkTimeoutError
async def main() -> None:
sample_rate, audio = load_wav_mono_int16_16k("sample.wav")
partials: list[str] = []
finals: list[str] = []
async with AsyncClient(api_key="rk_live_xxx").stream(
sample_rate=sample_rate,
encoding="int16",
language="en",
mode="on_final",
enable_cap_punc=True,
accept_client_vad_events=True,
) as session:
await session.start_speaking()
for chunk in iter_pcm_chunks(audio, sample_rate, chunk_ms=20):
await session.send_audio(chunk)
await session.stop_speaking()
await session.end()
try:
async for event in session.events():
if event.type != "transcript":
continue
text = event.transcription.strip()
if not text:
continue
if event.is_final:
finals.append(text)
else:
partials.append(text)
except SdkTimeoutError:
pass
print("partials:", partials)
print("finals:", finals)
if finals:
print("final transcript:", finals[-1])
if __name__ == "__main__":
asyncio.run(main())
5) Sync Transcribe (Offline)
from ringglabs.stt import Client
def main() -> None:
with Client(api_key="rk_live_xxx") as client:
result = client.transcribe(
"sample.wav",
language="en",
enable_cap_punc=True,
content_type="audio/wav",
)
print("request_id:", result.request_id)
print("transcription:", result.transcription)
if __name__ == "__main__":
main()
6) Async Transcribe (Offline)
import asyncio
from ringglabs.stt import AsyncClient
async def main() -> None:
async with AsyncClient(api_key="rk_live_xxx") as client:
result = await client.transcribe(
"sample.wav",
language="hi",
enable_cap_punc=True,
content_type="audio/wav",
)
print("request_id:", result.request_id)
print("transcription:", result.transcription)
if __name__ == "__main__":
asyncio.run(main())
7) Sync Transcribe from bytes and BinaryIO
from io import BytesIO
from pathlib import Path
from ringglabs.stt import Client
def main() -> None:
wav_bytes = Path("sample.wav").read_bytes()
with Client(api_key="rk_live_xxx") as client:
# bytes source
bytes_result = client.transcribe(
wav_bytes,
language="en",
enable_cap_punc=True,
filename="sample_bytes.wav",
content_type="audio/wav",
)
print("bytes transcription:", bytes_result.transcription)
# BinaryIO source
fileobj = BytesIO(wav_bytes)
fileobj_result = client.transcribe(
fileobj,
language="en",
enable_cap_punc=True,
filename="sample_fileobj.wav",
content_type="audio/wav",
)
print("fileobj transcription:", fileobj_result.transcription)
if __name__ == "__main__":
main()
8) Async Transcribe from bytes and BinaryIO
import asyncio
from io import BytesIO
from pathlib import Path
from ringglabs.stt import AsyncClient
async def main() -> None:
wav_bytes = Path("sample.wav").read_bytes()
async with AsyncClient(api_key="rk_live_xxx") as client:
# bytes source
bytes_result = await client.transcribe(
wav_bytes,
language="hi",
enable_cap_punc=True,
filename="sample_bytes.wav",
content_type="audio/wav",
)
print("bytes transcription:", bytes_result.transcription)
# BinaryIO source
fileobj = BytesIO(wav_bytes)
fileobj_result = await client.transcribe(
fileobj,
language="hi",
enable_cap_punc=True,
filename="sample_fileobj.wav",
content_type="audio/wav",
)
print("fileobj transcription:", fileobj_result.transcription)
if __name__ == "__main__":
asyncio.run(main())
Error Handling
Common SDK exceptions:
ApiError(includesstatus_code,code,payload)AuthenticationErrorTimeoutErrorTransportErrorProtocolError
from ringglabs.stt import Client, ApiError, TimeoutError, TransportError
try:
with Client(api_key="rk_live_xxx") as client:
result = client.transcribe("sample.wav")
print(result.transcription)
except TimeoutError:
print("request timed out")
except TransportError:
print("network/connection failure")
except ApiError as exc:
print("api error:", exc.status_code, exc.code, exc.message)
Retry Wrapper Examples (retry and async_retry)
Use bounded retries for transient transport/timeouts only.
Sync bounded retry
from ringglabs.stt import Client, retry, TimeoutError, TransportError
with Client(api_key="rk_live_xxx") as client:
def run_once():
return client.transcribe("sample.wav", language="en", enable_cap_punc=True)
result = retry(
run_once,
attempts=3,
initial_backoff_sec=0.25,
max_backoff_sec=1.0,
retry_on=(TimeoutError, TransportError),
)
print(result.transcription)
Async bounded retry
import asyncio
from ringglabs.stt import AsyncClient, async_retry, TimeoutError, TransportError
async def main() -> None:
async with AsyncClient(api_key="rk_live_xxx") as client:
async def run_once():
return await client.transcribe("sample.wav", language="en", enable_cap_punc=True)
result = await async_retry(
run_once,
attempts=3,
initial_backoff_sec=0.25,
max_backoff_sec=1.0,
retry_on=(TimeoutError, TransportError),
)
print(result.transcription)
if __name__ == "__main__":
asyncio.run(main())
Production Guidance
- Reuse client instances in long-running services.
- Set explicit timeout budgets with
TimeoutConfig. - Log
result.rawandevent.rawfor observability. - Use retries only for idempotent operations and transport failures.
- Keep sync and async execution models separate in production apps.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file ringglabs-0.1.0.tar.gz.
File metadata
- Download URL: ringglabs-0.1.0.tar.gz
- Upload date:
- Size: 21.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8168e119f615937232b05689f0933b1c0eba3a5d6f0d407d29573e3e7e5907e0
|
|
| MD5 |
934ed2006bb2f53954647caaeba79ee3
|
|
| BLAKE2b-256 |
32324da3ae722a4323defe67460cff44b8f7bda50c7cb85ef44cb992cbb198c4
|
File details
Details for the file ringglabs-0.1.0-py3-none-any.whl.
File metadata
- Download URL: ringglabs-0.1.0-py3-none-any.whl
- Upload date:
- Size: 20.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1a347d3d57aeea27d2901450c20abf4a48d04376eaea1f856c8a7df33b0f6799
|
|
| MD5 |
b25ee428d02bf2f558c7fb25547ba71b
|
|
| BLAKE2b-256 |
f20919d542c493a608551b69e5f109a6577394de2f67ed1c09c6af2c007c51a0
|