GetStream.io transport support for the Pipecat agents framework.
pipecat-getstream
A GetStream.io WebRTC transport for Pipecat — the open-source framework for building conversational AI agents.
This plugin enables bidirectional audio and video streaming between Pipecat pipelines and GetStream video calls, allowing you to build voice and multimodal AI agents that interact with users through GetStream's real-time communication infrastructure.
Maintained by GetStream.
Features
- Bidirectional audio and video — send and receive audio/video between your Pipecat agent and call participants
- Participant lifecycle events — react to participants joining, leaving, and subscribing/unsubscribing tracks
- Interruption handling — immediate audio buffer flushing when a user interrupts the agent
- REST helper — manage users, calls, and authentication tokens via the GetStream API
- Custom events — send and receive structured JSON events between the agent and call participants (up to 5KB per event)
- Clock-based audio pacing — drift-free real-time audio output
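The clock-based pacing in the last bullet can be illustrated with a short sketch. This is not the plugin's implementation: `chunk_deadline`, `paced_send`, the `send` callable, and the 20 ms chunk duration are all assumptions made for illustration. The idea is that each chunk's send time is derived from one fixed start time instead of sleeping a fixed interval after the previous chunk, so small scheduling delays do not accumulate into drift.

```python
import asyncio
import time


def chunk_deadline(start: float, index: int, chunk_seconds: float) -> float:
    """Absolute clock time at which chunk `index` (0-based) should be sent."""
    return start + index * chunk_seconds


async def paced_send(chunks, send, chunk_seconds=0.02, clock=time.monotonic):
    """Send chunks at a steady rate anchored to a single start time.

    `send` is a hypothetical synchronous callable that writes one chunk.
    """
    start = clock()
    for index, chunk in enumerate(chunks):
        # Sleep only for the time remaining until this chunk's deadline.
        delay = chunk_deadline(start, index, chunk_seconds) - clock()
        if delay > 0:
            await asyncio.sleep(delay)
        send(chunk)
```

If one iteration wakes up late, the next computed delay is shorter by the same amount, which keeps the long-run output rate tied to the clock.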
Installation
pip install pipecat-getstream
Quickstart
import asyncio

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat_getstream import GetstreamTransport
from pipecat_getstream.transport import GetstreamParams


async def main():
    transport = GetstreamTransport(
        api_key="your-api-key",
        api_secret="your-api-secret",
        call_type="default",
        call_id="my-call-id",
        user_id="pipecat-bot",
        params=GetstreamParams(
            audio_out_enabled=True,
            audio_in_enabled=True,
        ),
    )

    # stt, user_aggregator, llm, tts, and assistant_aggregator are placeholders:
    # construct them with the Pipecat services of your choice before this point.
    pipeline = Pipeline([
        transport.input(),
        stt,                # your STT service
        user_aggregator,    # LLM context aggregator
        llm,                # your LLM service
        tts,                # your TTS service
        transport.output(),
        assistant_aggregator,
    ])

    runner = PipelineRunner()
    task = PipelineTask(pipeline, params=PipelineParams(enable_metrics=True))
    await runner.run(task)


asyncio.run(main())
Configuration
GetstreamTransport
| Parameter | Type | Description |
|---|---|---|
| api_key | str | GetStream API key |
| api_secret | str | GetStream API secret |
| call_type | str | Call type (e.g. "default") |
| call_id | str | Unique call identifier |
| user_id | str | Bot/agent user ID |
| params | GetstreamParams | Transport parameters (optional) |
GetstreamParams
Extends Pipecat's TransportParams. Commonly used options:
| Parameter | Type | Default | Description |
|---|---|---|---|
| audio_in_enabled | bool | True | Receive audio from participants |
| audio_out_enabled | bool | True | Send agent audio |
| audio_in_sample_rate | int | 16000 | Input audio sample rate |
| audio_out_sample_rate | int | 16000 | Output audio sample rate |
| video_in_enabled | bool | False | Receive video from participants |
| video_out_enabled | bool | False | Send agent video |
| video_out_framerate | int | 30 | Output video frame rate |
| video_out_is_live | bool | False | Live video mode |
REST Helper
GetstreamRESTHelper provides convenience methods for managing calls and users via the GetStream API.
from pipecat_getstream.utils import GetstreamRESTHelper
helper = GetstreamRESTHelper(api_key="your-api-key", api_secret="your-api-secret")
# Create or update a user
await helper.create_user(user_id="demo-user", name="Demo User")
# Create or get a call
await helper.create_call(call_type="default", call_id="my-call", created_by_id="demo-user")
# Generate a JWT token for a user
token = helper.create_token(user_id="demo-user", expiration=3600)
# Delete a call
await helper.delete_call(call_type="default", call_id="my-call")
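To make the token step less opaque, here is a sketch of what a user token of this kind typically looks like. It assumes the token is a standard HS256 JWT signed with the API secret, carrying `user_id`, `iat`, and `exp` claims, and that `expiration` is a lifetime in seconds; this README does not confirm any of these details, and `sketch_user_token` is a hypothetical function, not part of the helper. Use `helper.create_token` in real code.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sketch_user_token(
    api_secret: str, user_id: str, expiration: int, now: Optional[int] = None
) -> str:
    """Build an HS256 JWT with assumed claims: user_id, iat, exp."""
    issued_at = int(time.time()) if now is None else now
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {"user_id": user_id, "iat": issued_at, "exp": issued_at + expiration}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(
        api_secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{_b64url(signature)}"
```

Because the signature depends on the API secret, tokens should be generated on a server you control and handed to clients, never built in client code.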
Custom Events
Send structured events to everyone watching the call:
await transport.send_custom_event({"type": "agent_state", "state": "thinking"})
Receive events from participants by registering on_stream_custom_event:
@transport.event_handler("on_stream_custom_event")
async def on_stream_custom_event(transport, event):
    print(f"Got custom event: {event}")
Payloads are limited to 5KB. Events are delivered only to clients that are currently watching the call.
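Since oversized payloads are not allowed, it can help to check the size before sending. The sketch below is one way to do that. It assumes that "5KB" means 5 * 1024 bytes and that the limit applies to the UTF-8 encoded JSON, neither of which is stated here; `encoded_event_size` and `ensure_event_fits` are hypothetical helpers, not part of the plugin.

```python
import json

# Assumption: "5KB" is 5 * 1024 bytes of UTF-8 encoded JSON.
MAX_EVENT_BYTES = 5 * 1024


def encoded_event_size(event: dict) -> int:
    """Size in bytes of the event when serialized as compact UTF-8 JSON."""
    text = json.dumps(event, separators=(",", ":"), ensure_ascii=False)
    return len(text.encode("utf-8"))


def ensure_event_fits(event: dict, limit: int = MAX_EVENT_BYTES) -> dict:
    """Return the event unchanged, or raise ValueError if it exceeds the limit."""
    size = encoded_event_size(event)
    if size > limit:
        raise ValueError(f"custom event is {size} bytes; limit is {limit}")
    return event
```

A call site would then look like `await transport.send_custom_event(ensure_event_fits({"type": "agent_state", "state": "thinking"}))`.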
Event Handlers
Register event handlers on the transport to respond to call lifecycle events:
@transport.event_handler("on_connected")
async def on_connected(*args):
    print("Bot connected to the call")

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant_id):
    print(f"First participant joined: {participant_id}")

@transport.event_handler("on_stream_custom_event")
async def on_stream_custom_event(transport, payload):
    print(f"Received custom event: {payload}")
Available Events
| Event | Arguments | Description |
|---|---|---|
| on_connected | — | Bot has connected to the call |
| on_disconnected | — | Bot has disconnected from the call |
| on_before_disconnect | — | Called before the bot disconnects |
| on_participant_connected | participant_id | A participant has joined the call |
| on_participant_disconnected | participant_id | A participant has left the call |
| on_participant_left | participant_id | A participant has left the call |
| on_first_participant_joined | participant_id | The first participant has joined the call |
| on_audio_track_subscribed | participant_id | Audio from a participant is now being received |
| on_audio_track_unsubscribed | participant_id | Audio from a participant is no longer being received |
| on_video_track_subscribed | participant_id | Video from a participant is now being received |
| on_video_track_unsubscribed | participant_id | Video from a participant is no longer being received |
| on_stream_custom_event | event | Custom event from a client watching the call |
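The handler examples in this README take the transport as their first parameter, followed by the arguments listed in the table. The toy dispatcher below mimics that calling convention so handlers can be exercised without a live call. `ToyEmitter` is a hypothetical stand-in written for this sketch; it is not part of pipecat-getstream or Pipecat, and the real transport may dispatch differently.

```python
import asyncio
from collections import defaultdict


class ToyEmitter:
    """Hypothetical stand-in for a transport, for local experiments only."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def event_handler(self, name):
        """Decorator that registers an async handler under an event name."""
        def decorator(func):
            self._handlers[name].append(func)
            return func
        return decorator

    async def emit(self, name, *args):
        """Call every handler for `name` with (self, *args)."""
        for handler in self._handlers[name]:
            await handler(self, *args)


async def demo():
    emitter = ToyEmitter()
    seen = []

    @emitter.event_handler("on_participant_connected")
    async def on_participant_connected(transport, participant_id):
        seen.append(("connected", participant_id))

    @emitter.event_handler("on_participant_left")
    async def on_participant_left(transport, participant_id):
        seen.append(("left", participant_id))

    await emitter.emit("on_participant_connected", "user-1")
    await emitter.emit("on_participant_left", "user-1")
    return seen
```

Giving each handler function a distinct name, as done here, avoids one definition silently shadowing another in the same module.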
Running the Example
The included example.py demonstrates a complete voice agent using Deepgram STT/TTS and Google LLM.
1. Set environment variables
export STREAM_BASE_URL="https://your-stream-app-url"
export STREAM_API_KEY="your-api-key"
export STREAM_API_SECRET="your-api-secret"
export DEEPGRAM_API_KEY="your-deepgram-key"
export GOOGLE_API_KEY="your-google-key"
# Optional
export STREAM_CALL_TYPE="default"
export STREAM_CALL_ID="my-call-id"
export STREAM_USER_ID="pipecat-bot"
2. Run
uv run example.py
The bot will join the call and automatically open a browser window so you can join as a participant.
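The environment variables from step 1 can be collected in one place before building the transport. The following is a sketch of one way to do that, not a copy of what example.py does. `ExampleSettings` and `load_settings` are hypothetical names, treating the first five variables as required is an assumption based on the "# Optional" comment, and the fallback values for the optional variables are simply the sample values shown above.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED = (
    "STREAM_BASE_URL",
    "STREAM_API_KEY",
    "STREAM_API_SECRET",
    "DEEPGRAM_API_KEY",
    "GOOGLE_API_KEY",
)


@dataclass(frozen=True)
class ExampleSettings:
    base_url: str
    api_key: str
    api_secret: str
    deepgram_api_key: str
    google_api_key: str
    call_type: str
    call_id: str
    user_id: str


def load_settings(env: Mapping[str, str] = os.environ) -> ExampleSettings:
    """Read settings from `env`, failing early if a required one is missing."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return ExampleSettings(
        base_url=env["STREAM_BASE_URL"],
        api_key=env["STREAM_API_KEY"],
        api_secret=env["STREAM_API_SECRET"],
        deepgram_api_key=env["DEEPGRAM_API_KEY"],
        google_api_key=env["GOOGLE_API_KEY"],
        # Fallbacks below are assumptions taken from the sample values above.
        call_type=env.get("STREAM_CALL_TYPE", "default"),
        call_id=env.get("STREAM_CALL_ID", "my-call-id"),
        user_id=env.get("STREAM_USER_ID", "pipecat-bot"),
    )
```

Failing at startup with the full list of missing names is usually easier to debug than an authentication error raised later inside the pipeline.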
Compatibility
- Python: >= 3.10, < 3.14
- Pipecat: >= 0.0.108
- GetStream SDK: >= 3.3.0, < 4
License
Apache 2.0 — see LICENSE for details.