autoplay-sdk
Real-time event streaming client for Autoplay connectors.
Receive live UI actions and session summaries from the Autoplay event connector via SSE (Server-Sent Events) — ideal as a real-time data source for RAG pipelines, analytics systems, or custom automation.
Installation
```bash
pip install autoplay-sdk
```
Requirements: Python 3.10+
Quickstart
```python
from autoplay_sdk import ConnectorClient, ActionsPayload

STREAM_URL = "https://your-connector.onrender.com/stream/YOUR_PRODUCT_ID"
API_TOKEN = "uk_live_..."


def on_actions(payload: ActionsPayload):
    print(payload.to_text())


ConnectorClient(url=STREAM_URL, token=API_TOKEN) \
    .on_actions(on_actions) \
    .run()
```
run() blocks and reconnects automatically on any network failure.
Press Ctrl-C to stop.
Typed payloads
All callbacks receive typed dataclass instances — not raw dicts. Your IDE
will autocomplete fields and you get a .to_text() method on every payload
that returns an embedding-ready string.
```python
from autoplay_sdk import ActionsPayload, SummaryPayload


def on_actions(payload: ActionsPayload):
    print(payload.session_id)  # str | None
    print(payload.email)       # str | None
    print(payload.count)       # int
    for action in payload.actions:
        print(action.title)          # str
        print(action.description)    # str
        print(action.canonical_url)  # str

    # embedding-ready text: one call, no formatting logic needed
    text = payload.to_text()
    # "Session ps_abc123 — 3 actions\n1. Viewed Dashboard — https://...\n..."


def on_summary(payload: SummaryPayload):
    print(payload.session_id)
    print(payload.replaces)   # number of actions this summary replaces
    text = payload.to_text()  # the prose summary string directly
```
Background usage (non-blocking)
```python
import time

client = ConnectorClient(url=STREAM_URL, token=API_TOKEN)
client.on_actions(on_actions)
client.run_in_background()

# your application continues here
time.sleep(60)
client.stop()
```
Context manager
```python
with ConnectorClient(url=STREAM_URL, token=API_TOKEN) as client:
    client.on_actions(on_actions).run_in_background()
    do_other_work()
# stop() is called automatically on exit
```
Async RAG pipeline (recommended)
Use AsyncConnectorClient when your RAG pipeline is built on asyncio —
LangChain, LlamaIndex, FastAPI, or any framework where embedding/vector calls
are already async.
```python
import asyncio

import openai
from autoplay_sdk import AsyncConnectorClient, ActionsPayload, SummaryPayload

openai_client = openai.AsyncOpenAI()


async def on_actions(payload: ActionsPayload):
    """Embed the batch and upsert into your vector store."""
    text = payload.to_text()  # embedding-ready string, no formatting needed
    response = await openai_client.embeddings.create(
        input=text, model="text-embedding-3-small"
    )
    embedding = response.data[0].embedding

    # upsert into Pinecone, Weaviate, Chroma, pgvector, etc.
    await your_vector_store.upsert(
        id=payload.session_id or "unknown",
        vector=embedding,
        metadata={
            "session_id": payload.session_id,
            "email": payload.email,
            "count": payload.count,
        },
    )


async def on_summary(payload: SummaryPayload):
    """Replace the raw action history with a compact prose summary."""
    text = payload.to_text()  # the prose summary string directly
    print(f"[summary] session={payload.session_id}: {text}")
    # store in your RAG context window / vector store as well


async def main():
    async with AsyncConnectorClient(url=STREAM_URL, token=API_TOKEN) as client:
        client.on_actions(on_actions).on_summary(on_summary)
        await client.run()


asyncio.run(main())
```
Non-blocking inside an existing event loop
```python
async def main():
    # must run inside a coroutine on an already-running event loop
    client = AsyncConnectorClient(url=STREAM_URL, token=API_TOKEN)
    client.on_actions(on_actions)
    task = client.run_in_background()  # asyncio.Task; returns immediately

    await do_other_async_work()

    client.stop()
    await task
```
Sync RAG pipeline
If your pipeline is synchronous (no async/await), use ConnectorClient.
Slow callbacks are safe — the SSE reader and the callback executor run on
separate threads.
```python
import openai
from autoplay_sdk import ConnectorClient, ActionsPayload

openai_client = openai.OpenAI()


def on_actions(payload: ActionsPayload):
    text = payload.to_text()
    embedding = openai_client.embeddings.create(
        input=text, model="text-embedding-3-small"
    ).data[0].embedding
    your_vector_store.upsert(id=payload.session_id or "unknown", vector=embedding)


ConnectorClient(url=STREAM_URL, token=API_TOKEN) \
    .on_actions(on_actions) \
    .run()
```
Drop handling (high-volume)
If your callback is slower than the incoming event rate, events are queued internally. When the queue is full, events are dropped. Register a handler to be notified:
```python
def on_drop(payload, total_dropped):
    print(f"WARNING: dropped event (type={payload['type']}, total={total_dropped})")
    # alert your on-call rotation, increment a metric, etc.


# Keep a reference to the client so its counters can be inspected later.
client = (
    ConnectorClient(url=STREAM_URL, token=API_TOKEN, max_queue_size=1000)
    .on_actions(on_actions)
    .on_drop(on_drop)
)
client.run_in_background()  # run() would block, so the checks below would never execute

# Check the running drop count at any time:
print(client.dropped_count)  # events dropped so far
print(client.queue_size)     # events waiting in the queue right now
```
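The queue-then-drop behaviour described above can be pictured with a small standard-library sketch. This is not the SDK's internal code: the `DroppingQueue` class and its `offer` method are hypothetical names, and dropping the newest event (rather than the oldest) is an assumption, since the README does not say which one is discarded.

```python
import queue
from typing import Any, Callable, Optional


class DroppingQueue:
    """Bounded buffer that drops incoming events when full (illustrative only)."""

    def __init__(
        self,
        max_size: int,
        on_drop: Optional[Callable[[Any, int], None]] = None,
    ) -> None:
        self._queue: "queue.Queue[Any]" = queue.Queue(maxsize=max_size)
        self._on_drop = on_drop
        self.dropped_count = 0

    def offer(self, event: Any) -> bool:
        """Enqueue without blocking; return False if the event was dropped."""
        try:
            self._queue.put_nowait(event)
            return True
        except queue.Full:
            # Assumption: the newest event is the one discarded.
            self.dropped_count += 1
            if self._on_drop is not None:
                self._on_drop(event, self.dropped_count)
            return False

    @property
    def queue_size(self) -> int:
        return self._queue.qsize()


# Offer three events to a buffer that holds two; the third one is dropped.
dropped = []
buffer = DroppingQueue(
    max_size=2,
    on_drop=lambda event, total: dropped.append((event["type"], total)),
)
for seq in range(3):
    buffer.offer({"type": "actions", "seq": seq})
```

A reader thread that never blocks on a slow consumer is the usual reason for this design: the stream keeps up with the server, at the cost of losing events once the buffer is full.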
Payload schemas
actions event
```json
{
  "type": "actions",
  "session_id": "ps_abc123",
  "ts": "2024-01-15T10:30:00Z",
  "actions": [
    {
      "event_type": "pageview",
      "title": "Dashboard",
      "canonical_url": "https://app.example.com/dashboard",
      "ts": "2024-01-15T10:30:00Z",
      "properties": {
        "referrer": "https://app.example.com/login"
      }
    },
    {
      "event_type": "click",
      "title": "Export CSV button",
      "canonical_url": "https://app.example.com/dashboard",
      "ts": "2024-01-15T10:30:05Z",
      "properties": {}
    }
  ]
}
```
| Field | Type | Description |
|---|---|---|
| `type` | string | Always `"actions"` |
| `session_id` | string | Unique user session identifier |
| `ts` | string | ISO-8601 timestamp of the batch |
| `actions` | array | Ordered list of user actions in this batch |
| `actions[].event_type` | string | `pageview`, `click`, `input`, … |
| `actions[].title` | string | Human-readable page or element title |
| `actions[].canonical_url` | string | Page URL where the action occurred |
| `actions[].ts` | string | ISO-8601 timestamp of the individual action |
| `actions[].properties` | object | Additional event properties (may be empty) |
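If you consume the raw JSON instead of the typed payloads, the schema above is enough to build your own text for embedding. The sketch below is only an illustration: `actions_to_text` is a hypothetical helper, and its output format is an assumption that does not necessarily match what the SDK's `.to_text()` returns.

```python
import json

# A shortened copy of the sample event shown above.
RAW_EVENT = """
{
  "type": "actions",
  "session_id": "ps_abc123",
  "ts": "2024-01-15T10:30:00Z",
  "actions": [
    {"event_type": "pageview", "title": "Dashboard",
     "canonical_url": "https://app.example.com/dashboard",
     "ts": "2024-01-15T10:30:00Z", "properties": {}},
    {"event_type": "click", "title": "Export CSV button",
     "canonical_url": "https://app.example.com/dashboard",
     "ts": "2024-01-15T10:30:05Z", "properties": {}}
  ]
}
"""


def actions_to_text(event: dict) -> str:
    """Render an actions event as one numbered line per action."""
    actions = event.get("actions", [])
    header = f"Session {event.get('session_id', 'unknown')}: {len(actions)} actions"
    lines = [
        f"{i}. [{a.get('event_type', '?')}] {a.get('title', '')} ({a.get('canonical_url', '')})"
        for i, a in enumerate(actions, start=1)
    ]
    return "\n".join([header, *lines])


event = json.loads(RAW_EVENT)
text = actions_to_text(event)
print(text)
```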
summary event
```json
{
  "type": "summary",
  "session_id": "ps_abc123",
  "ts": "2024-01-15T10:35:00Z",
  "content": "The user navigated to the Dashboard, exported a CSV report, then opened account settings to update their billing plan."
}
```
| Field | Type | Description |
|---|---|---|
| `type` | string | Always `"summary"` |
| `session_id` | string | Unique user session identifier |
| `ts` | string | ISO-8601 timestamp when the summary was generated |
| `content` | string | Prose summary of the session up to this point |
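Because a summary covers the session up to that point, one way to keep per-session context compact is to discard the buffered action text whenever a summary arrives. The sketch below shows that idea with an in-memory store. `SessionContext` and its methods are hypothetical names, and treating a summary as replacing every buffered action is an assumption; a real pipeline might use the `replaces` count instead.

```python
from collections import defaultdict
from typing import Dict, List


class SessionContext:
    """Keeps the latest summary plus any action text received after it."""

    def __init__(self) -> None:
        self._summary: Dict[str, str] = {}
        self._pending: Dict[str, List[str]] = defaultdict(list)

    def add_actions(self, session_id: str, text: str) -> None:
        """Buffer the text of one actions batch."""
        self._pending[session_id].append(text)

    def apply_summary(self, session_id: str, content: str) -> None:
        """Store the summary and clear the action text it supersedes."""
        self._summary[session_id] = content
        self._pending[session_id].clear()

    def context_for(self, session_id: str) -> str:
        """Return the summary (if any) followed by newer action text."""
        parts: List[str] = []
        if session_id in self._summary:
            parts.append(self._summary[session_id])
        parts.extend(self._pending.get(session_id, []))
        return "\n".join(parts)


store = SessionContext()
store.add_actions("ps_abc123", "Viewed Dashboard")
store.add_actions("ps_abc123", "Clicked Export CSV button")
store.apply_summary("ps_abc123", "The user exported a CSV report from the Dashboard.")
store.add_actions("ps_abc123", "Opened account settings")
context = store.context_for("ps_abc123")
```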
API reference
Typed models
| Class | Description |
|---|---|
| `SlimAction` | One UI action: `title`, `description`, `canonical_url` + `.to_text()` |
| `ActionsPayload` | A batch of actions for a session + `.to_text()` for embedding |
| `SummaryPayload` | LLM prose summary for a session + `.to_text()` for embedding |
ConnectorClient(url, token="", max_queue_size=500)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `url` | str | required | Full URL to `GET /stream/{product_id}` on the connector |
| `token` | str | `""` | Unkey API key (`uk_live_...`) |
| `max_queue_size` | int | `500` | Max events buffered before drops start occurring |
Methods
| Method | Returns | Description |
|---|---|---|
| `.on_actions(fn)` | `ConnectorClient` | Register callback; `fn` receives `ActionsPayload` |
| `.on_summary(fn)` | `ConnectorClient` | Register callback; `fn` receives `SummaryPayload` |
| `.on_drop(fn)` | `ConnectorClient` | Register callback invoked when events are dropped (queue full) |
| `.run()` | `None` (blocks) | Connect and process events; reconnects automatically |
| `.run_in_background()` | `threading.Thread` | Start the client on a daemon thread; returns immediately |
| `.stop()` | `None` | Signal the client to stop cleanly |
Properties
| Property | Type | Description |
|---|---|---|
| `dropped_count` | int | Running total of events dropped due to a full queue |
| `queue_size` | int | Number of events currently waiting in the queue |
AsyncConnectorClient(url, token="")
Async-native client for asyncio environments. Callbacks are async def coroutines.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `url` | str | required | Full URL to `GET /stream/{product_id}` on the connector |
| `token` | str | `""` | Unkey API key (`uk_live_...`) |
Methods
| Method | Returns | Description |
|---|---|---|
| `.on_actions(fn)` | `AsyncConnectorClient` | Register async callback; `fn` receives `ActionsPayload` |
| `.on_summary(fn)` | `AsyncConnectorClient` | Register async callback; `fn` receives `SummaryPayload` |
| `.run()` | Coroutine | Connect and process events (await this) |
| `.run_in_background()` | `asyncio.Task` | Schedule `run()` as a background task; returns immediately |
| `.stop()` | `None` | Signal the client to stop cleanly |
Supports async with for automatic stop() on exit.
Reconnection behaviour
The client reconnects automatically on any network failure or non-fatal HTTP error. It uses exponential backoff starting at 1 s and capping at 30 s.
Fatal HTTP errors (401 Unauthorized, 403 Forbidden, 404 Not Found) are
not retried and will raise immediately — check your url and token.
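The retry policy above can be sketched in a few lines. This is an illustration rather than the SDK's implementation: the doubling factor and the absence of jitter are assumptions (the README states only the 1 s start and the 30 s cap), and `backoff_delays` and `should_retry` are hypothetical helper names.

```python
from itertools import islice
from typing import Iterator

# Status codes the README lists as fatal (not retried).
FATAL_STATUS = frozenset({401, 403, 404})


def backoff_delays(
    initial: float = 1.0, cap: float = 30.0, factor: float = 2.0
) -> Iterator[float]:
    """Yield reconnect delays in seconds, growing by `factor` up to `cap`."""
    delay = initial
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


def should_retry(status: int) -> bool:
    """Return True for HTTP errors worth retrying, False for fatal ones."""
    return status not in FATAL_STATUS


first_delays = list(islice(backoff_delays(), 7))
print(first_delays)
```

Under these assumptions the delay reaches the 30 s cap on the sixth attempt and stays there.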
Logging
The SDK logs to the standard Python logging hierarchy under the logger name
autoplay_sdk. Enable it in your application:
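```python
import logging

logging.basicConfig(level=logging.INFO)
```
Or, to raise verbosity for the SDK logger only (a handler must still be configured, for example via `basicConfig`):
```python
logging.getLogger("autoplay_sdk").setLevel(logging.DEBUG)
```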
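If you want SDK records on their own handler, separate from the rest of your application's logging, the standard `logging` module supports that directly. The sketch below uses only the standard library. `enable_sdk_logging` is a hypothetical helper, and the only thing taken from the SDK is the logger name `autoplay_sdk`.

```python
import logging
import sys
from typing import TextIO


def enable_sdk_logging(
    level: int = logging.DEBUG, stream: TextIO = sys.stderr
) -> logging.Handler:
    """Attach a dedicated stream handler to the autoplay_sdk logger."""
    logger = logging.getLogger("autoplay_sdk")
    logger.setLevel(level)

    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))
    logger.addHandler(handler)

    # Stop records from also reaching handlers configured on the root logger.
    logger.propagate = False
    return handler


handler = enable_sdk_logging()
```

Setting `propagate = False` is a design choice: it avoids duplicate lines when the application also calls `logging.basicConfig`, but it also hides SDK records from any root-level handlers you may rely on.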
License
MIT