autoplay-sdk

Real-time event streaming client for Autoplay connectors.

Receive live UI actions and session summaries from the Autoplay event connector via SSE (Server-Sent Events) — ideal as a real-time data source for RAG pipelines, analytics systems, or custom automation.


Installation

pip install autoplay-sdk

Requirements: Python 3.10+


Quickstart

from autoplay_sdk import ConnectorClient, ActionsPayload

STREAM_URL = "https://your-connector.onrender.com/stream/YOUR_PRODUCT_ID"
API_TOKEN  = "uk_live_..."

def on_actions(payload: ActionsPayload):
    print(payload.to_text())

ConnectorClient(url=STREAM_URL, token=API_TOKEN) \
    .on_actions(on_actions) \
    .run()

run() blocks and reconnects automatically on any network failure. Press Ctrl-C to stop.


Typed payloads

All callbacks receive typed dataclass instances — not raw dicts. Your IDE will autocomplete fields and you get a .to_text() method on every payload that returns an embedding-ready string.

from autoplay_sdk import ActionsPayload, SummaryPayload

def on_actions(payload: ActionsPayload):
    print(payload.session_id)       # str | None
    print(payload.email)            # str | None
    print(payload.count)            # int
    for action in payload.actions:
        print(action.title)         # str
        print(action.description)   # str
        print(action.canonical_url) # str

    # embedding-ready text — one call, no formatting logic needed
    text = payload.to_text()
    # "Session ps_abc123 — 3 actions\n1. Viewed Dashboard — https://...\n..."

def on_summary(payload: SummaryPayload):
    print(payload.session_id)
    print(payload.replaces)         # number of actions this summary replaces
    text = payload.to_text()        # the prose summary string directly

Background usage (non-blocking)

client = ConnectorClient(url=STREAM_URL, token=API_TOKEN)
client.on_actions(on_actions)
client.run_in_background()

# your application continues here
import time
time.sleep(60)

client.stop()

Context manager

with ConnectorClient(url=STREAM_URL, token=API_TOKEN) as client:
    client.on_actions(on_actions).run_in_background()
    do_other_work()
# stop() is called automatically on exit

Async RAG pipeline (recommended)

Use AsyncConnectorClient when your RAG pipeline is built on asyncio — LangChain, LlamaIndex, FastAPI, or any framework where embedding/vector calls are already async.

import asyncio
import openai
from autoplay_sdk import AsyncConnectorClient, ActionsPayload, SummaryPayload

openai_client = openai.AsyncOpenAI()

async def on_actions(payload: ActionsPayload):
    """Embed the batch and upsert into your vector store."""
    text = payload.to_text()  # embedding-ready string, no formatting needed
    response = await openai_client.embeddings.create(
        input=text, model="text-embedding-3-small"
    )
    embedding = response.data[0].embedding

    # upsert into Pinecone, Weaviate, Chroma, pgvector, etc.
    await your_vector_store.upsert(
        id=payload.session_id or "unknown",
        vector=embedding,
        metadata={
            "session_id": payload.session_id,
            "email":      payload.email,
            "count":      payload.count,
        },
    )

async def on_summary(payload: SummaryPayload):
    """Replace the raw action history with a compact prose summary."""
    text = payload.to_text()  # the prose summary string directly
    print(f"[summary] session={payload.session_id}: {text}")
    # store in your RAG context window / vector store as well

async def main():
    async with AsyncConnectorClient(url=STREAM_URL, token=API_TOKEN) as client:
        client.on_actions(on_actions).on_summary(on_summary)
        await client.run()

asyncio.run(main())

Non-blocking inside an existing event loop

# Run this inside a coroutine (async def), where await is available.
client = AsyncConnectorClient(url=STREAM_URL, token=API_TOKEN)
client.on_actions(on_actions)
task = client.run_in_background()  # asyncio.Task — returns immediately

await do_other_async_work()

client.stop()
await task
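
The start, stop, await sequence above follows a common asyncio pattern. The sketch below shows that pattern in isolation using only the standard library. The Poller class is a hypothetical stand-in and says nothing about how AsyncConnectorClient is implemented internally.

```python
import asyncio


class Poller:
    """Hypothetical stand-in for a client with run / run_in_background / stop."""

    def __init__(self) -> None:
        self._stop = asyncio.Event()
        self.ticks = 0

    async def run(self) -> None:
        # Loop until stop() is called, yielding to the event loop each pass.
        while not self._stop.is_set():
            self.ticks += 1
            await asyncio.sleep(0)

    def run_in_background(self) -> asyncio.Task:
        # Must be called while an event loop is running.
        return asyncio.create_task(self.run())

    def stop(self) -> None:
        self._stop.set()


async def demo() -> tuple[int, bool]:
    poller = Poller()
    task = poller.run_in_background()  # returns immediately
    await asyncio.sleep(0.01)          # other async work happens here
    poller.stop()
    await task                         # wait for the loop to exit cleanly
    return poller.ticks, task.done()


ticks, done = asyncio.run(demo())
print(f"ticks={ticks} done={done}")
```

Awaiting the task after stop() makes sure the background loop has actually exited before the surrounding coroutine moves on.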

Sync RAG pipeline

If your pipeline is synchronous (no async/await), use ConnectorClient. Slow callbacks are safe — the SSE reader and the callback executor run on separate threads.

import openai
from autoplay_sdk import ConnectorClient, ActionsPayload

openai_client = openai.OpenAI()

def on_actions(payload: ActionsPayload):
    text = payload.to_text()
    embedding = openai_client.embeddings.create(
        input=text, model="text-embedding-3-small"
    ).data[0].embedding
    your_vector_store.upsert(id=payload.session_id or "unknown", vector=embedding)

ConnectorClient(url=STREAM_URL, token=API_TOKEN) \
    .on_actions(on_actions) \
    .run()
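
To illustrate why a slow callback need not stall the stream reader, here is a minimal standard-library sketch of the reader/worker split described above. The names start_worker and slow_callback are hypothetical, and the SDK's real threading code may differ.

```python
import queue
import threading


def start_worker(events: queue.Queue, callback) -> threading.Thread:
    """Run callback(item) for each queued item until a None sentinel arrives."""

    def loop() -> None:
        while True:
            item = events.get()
            try:
                if item is None:
                    return
                callback(item)
            finally:
                events.task_done()

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


release = threading.Event()
processed = []


def slow_callback(event) -> None:
    release.wait()           # stands in for a slow embedding or database call
    processed.append(event)


events: queue.Queue = queue.Queue()
worker = start_worker(events, slow_callback)

# Reader side: put() on an unbounded queue returns immediately,
# even though the callback is still blocked.
for n in range(3):
    events.put(n)
enqueued_while_blocked = len(processed) == 0

release.set()      # let the callback proceed
events.put(None)   # sentinel: ask the worker to exit
events.join()      # wait until every queued item has been handled
worker.join()
print(processed)
```

The reader only pays the cost of a queue insert, while the callback runs at its own pace on the worker thread.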

Drop handling (high-volume)

If your callback is slower than the incoming event rate, events are queued internally. When the queue is full, events are dropped. Register a handler to be notified:

def on_drop(payload, total_dropped):
    print(f"WARNING: dropped event (type={payload['type']}, total={total_dropped})")
    # alert your on-call rotation, increment a metric, etc.

client = ConnectorClient(url=STREAM_URL, token=API_TOKEN, max_queue_size=1000)
client.on_actions(on_actions).on_drop(on_drop)
client.run_in_background()  # non-blocking, so the counters below can be read

# Check the running drop count at any time:
print(client.dropped_count)   # events dropped so far
print(client.queue_size)      # events waiting in the queue right now
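
The queue-full behaviour described above can be pictured with a bounded queue that counts rejected events. This is a sketch under assumptions: DroppingBuffer and offer are hypothetical names, and dropping the newest event (rather than the oldest) is a choice made here, not something the SDK documents.

```python
import queue
from typing import Callable, Optional


class DroppingBuffer:
    """Bounded buffer that drops new events once it is full (illustrative only)."""

    def __init__(
        self,
        max_queue_size: int = 500,
        on_drop: Optional[Callable[[dict, int], None]] = None,
    ) -> None:
        self._queue: queue.Queue = queue.Queue(maxsize=max_queue_size)
        self._on_drop = on_drop
        self.dropped_count = 0

    @property
    def queue_size(self) -> int:
        return self._queue.qsize()

    def offer(self, event: dict) -> bool:
        """Enqueue without blocking; return False if the event was dropped."""
        try:
            self._queue.put_nowait(event)
            return True
        except queue.Full:
            self.dropped_count += 1
            if self._on_drop is not None:
                self._on_drop(event, self.dropped_count)
            return False


drops = []
buffer = DroppingBuffer(
    max_queue_size=2,
    on_drop=lambda event, total: drops.append((event["n"], total)),
)
results = [buffer.offer({"type": "actions", "n": n}) for n in range(3)]
print(results, buffer.dropped_count, buffer.queue_size)
```

With nothing consuming the buffer, the third event exceeds the capacity of two and triggers the drop handler.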

Payload schemas

actions event

{
  "type":       "actions",
  "session_id": "ps_abc123",
  "ts":         "2024-01-15T10:30:00Z",
  "actions": [
    {
      "event_type":     "pageview",
      "title":          "Dashboard",
      "canonical_url":  "https://app.example.com/dashboard",
      "ts":             "2024-01-15T10:30:00Z",
      "properties": {
        "referrer": "https://app.example.com/login"
      }
    },
    {
      "event_type":     "click",
      "title":          "Export CSV button",
      "canonical_url":  "https://app.example.com/dashboard",
      "ts":             "2024-01-15T10:30:05Z",
      "properties": {}
    }
  ]
}

| Field | Type | Description |
| --- | --- | --- |
| type | string | Always "actions" |
| session_id | string | Unique user session identifier |
| ts | string | ISO-8601 timestamp of the batch |
| actions | array | Ordered list of user actions in this batch |
| actions[].event_type | string | pageview, click, input, … |
| actions[].title | string | Human-readable page or element title |
| actions[].canonical_url | string | Page URL where the action occurred |
| actions[].ts | string | ISO-8601 timestamp of the individual action |
| actions[].properties | object | Additional event properties (may be empty) |
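
As a rough illustration of how the JSON above could map onto typed objects, the sketch below parses an actions event into dataclasses and renders a plain-text string. The class names Action and ActionsBatch and the render() format are hypothetical stand-ins. They are not the SDK's SlimAction, ActionsPayload, or to_text() implementation.

```python
import json
from dataclasses import dataclass, field


@dataclass
class Action:
    event_type: str
    title: str
    canonical_url: str
    ts: str
    properties: dict = field(default_factory=dict)


@dataclass
class ActionsBatch:
    session_id: str
    ts: str
    actions: list[Action]

    @classmethod
    def from_json(cls, raw: str) -> "ActionsBatch":
        data = json.loads(raw)
        if data.get("type") != "actions":
            raise ValueError(f"expected an actions event, got {data.get('type')!r}")
        return cls(
            session_id=data["session_id"],
            ts=data["ts"],
            # Assumes each action carries exactly the documented fields.
            actions=[Action(**a) for a in data["actions"]],
        )

    def render(self) -> str:
        """Illustrative text rendering; the format is made up for this sketch."""
        lines = [f"Session {self.session_id}: {len(self.actions)} actions"]
        for i, a in enumerate(self.actions, start=1):
            lines.append(f"{i}. [{a.event_type}] {a.title} ({a.canonical_url})")
        return "\n".join(lines)


RAW = """
{
  "type": "actions",
  "session_id": "ps_abc123",
  "ts": "2024-01-15T10:30:00Z",
  "actions": [
    {"event_type": "pageview", "title": "Dashboard",
     "canonical_url": "https://app.example.com/dashboard",
     "ts": "2024-01-15T10:30:00Z",
     "properties": {"referrer": "https://app.example.com/login"}},
    {"event_type": "click", "title": "Export CSV button",
     "canonical_url": "https://app.example.com/dashboard",
     "ts": "2024-01-15T10:30:05Z", "properties": {}}
  ]
}
"""

batch = ActionsBatch.from_json(RAW)
print(batch.render())
```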

summary event

{
  "type":       "summary",
  "session_id": "ps_abc123",
  "ts":         "2024-01-15T10:35:00Z",
  "content":    "The user navigated to the Dashboard, exported a CSV report, then opened account settings to update their billing plan."
}

| Field | Type | Description |
| --- | --- | --- |
| type | string | Always "summary" |
| session_id | string | Unique user session identifier |
| ts | string | ISO-8601 timestamp when the summary was generated |
| content | string | Prose summary of the session up to this point |
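
For readers consuming the stream without the SDK, the sketch below shows one way the two event shapes could be read from SSE lines and routed on their type field. It follows the standard SSE framing (data: lines terminated by a blank line). The assumption that the connector sends one JSON object per event, and the helper names iter_sse_data and dispatch, are illustrative only.

```python
import json
from typing import Callable, Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the joined data payload of each SSE event."""
    buf: list[str] = []
    for line in lines:
        line = line.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if buf:
                yield "\n".join(buf)
                buf = []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            buf.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) are ignored in this sketch.


def dispatch(lines: Iterable[str], handlers: dict[str, Callable[[dict], None]]) -> int:
    """Decode each event as JSON and call the handler registered for its type."""
    handled = 0
    for data in iter_sse_data(lines):
        event = json.loads(data)
        handler = handlers.get(event.get("type"))
        if handler is not None:
            handler(event)
            handled += 1
    return handled


stream = [
    ": keep-alive\n",
    'data: {"type": "actions", "session_id": "ps_abc123", "actions": []}\n',
    "\n",
    'data: {"type": "summary", "session_id": "ps_abc123", "content": "Exported a CSV."}\n',
    "\n",
]

seen = []
handled = dispatch(
    stream,
    {
        "actions": lambda e: seen.append(("actions", e["session_id"])),
        "summary": lambda e: seen.append(("summary", e["content"])),
    },
)
print(handled, seen)
```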

API reference

Typed models

| Class | Description |
| --- | --- |
| SlimAction | One UI action: title, description, canonical_url + .to_text() |
| ActionsPayload | A batch of actions for a session + .to_text() for embedding |
| SummaryPayload | LLM prose summary for a session + .to_text() for embedding |

ConnectorClient(url, token="", max_queue_size=500)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| url | str | (required) | Full URL to GET /stream/{product_id} on the connector |
| token | str | "" | Unkey API key (uk_live_...) |
| max_queue_size | int | 500 | Max events buffered before drops start occurring |

Methods

| Method | Returns | Description |
| --- | --- | --- |
| .on_actions(fn) | ConnectorClient | Register callback; fn receives ActionsPayload |
| .on_summary(fn) | ConnectorClient | Register callback; fn receives SummaryPayload |
| .on_drop(fn) | ConnectorClient | Register callback when events are dropped (queue full) |
| .run() | None (blocks) | Connect and process events; reconnects automatically |
| .run_in_background() | threading.Thread | Start the client on a daemon thread; returns immediately |
| .stop() | None | Signal the client to stop cleanly |

Properties

| Property | Type | Description |
| --- | --- | --- |
| dropped_count | int | Running total of events dropped due to a full queue |
| queue_size | int | Number of events currently waiting in the queue |

AsyncConnectorClient(url, token="")

Async-native client for asyncio environments. Callbacks are async def coroutines.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| url | str | (required) | Full URL to GET /stream/{product_id} on the connector |
| token | str | "" | Unkey API key (uk_live_...) |

Methods

| Method | Returns | Description |
| --- | --- | --- |
| .on_actions(fn) | AsyncConnectorClient | Register async callback; fn receives ActionsPayload |
| .on_summary(fn) | AsyncConnectorClient | Register async callback; fn receives SummaryPayload |
| .run() | Coroutine | Connect and process events (await this) |
| .run_in_background() | asyncio.Task | Schedule run() as a background task; returns immediately |
| .stop() | None | Signal the client to stop cleanly |

Supports async with for automatic stop() on exit.


Reconnection behaviour

The client reconnects automatically on any network failure or non-fatal HTTP error. It uses exponential backoff starting at 1 s and capping at 30 s.

Fatal HTTP errors (401 Unauthorized, 403 Forbidden, 404 Not Found) are not retried and will raise immediately — check your url and token.
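
The retry policy above can be sketched as a delay generator plus a status check. The doubling factor is an assumption (the text only says the backoff is exponential, starts at 1 s, and caps at 30 s), and the names backoff_delays and should_retry are hypothetical rather than part of the SDK.

```python
from itertools import islice
from typing import Iterator

# Status codes the text lists as fatal (never retried).
FATAL_STATUSES = frozenset({401, 403, 404})


def backoff_delays(
    base: float = 1.0, cap: float = 30.0, factor: float = 2.0
) -> Iterator[float]:
    """Yield reconnect delays in seconds: base, base*factor, ... capped at cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


def should_retry(status: int) -> bool:
    """Return False for the fatal statuses, True for any other HTTP error."""
    return status not in FATAL_STATUSES


print(list(islice(backoff_delays(), 7)))
# [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Under these assumptions the delay reaches the 30 s cap on the sixth attempt and stays there.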


Logging

The SDK logs to the standard Python logging hierarchy under the logger name autoplay_sdk. Enable it in your application:

import logging
logging.basicConfig(level=logging.INFO)

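Or, to raise verbosity for the SDK logger alone (a handler must still be configured, for example via logging.basicConfig as above):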

logging.getLogger("autoplay_sdk").setLevel(logging.DEBUG)
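
If the goal is to see SDK records and nothing else, one option is to give the autoplay_sdk logger its own handler and stop its records from propagating to the root logger. The sketch below uses only the standard logging module. The helper name enable_sdk_logging is hypothetical, and the demo record is emitted by the example itself, not by the SDK.

```python
import io
import logging


def enable_sdk_logging(stream=None, level: int = logging.DEBUG) -> logging.Handler:
    """Attach a dedicated handler to the autoplay_sdk logger."""
    logger = logging.getLogger("autoplay_sdk")
    handler = logging.StreamHandler(stream)  # writes to sys.stderr when stream is None
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    logger.addHandler(handler)
    logger.setLevel(level)
    logger.propagate = False  # keep SDK records out of root-logger handlers
    return handler


# Demo: capture output in memory and log through a child of the SDK logger.
buffer = io.StringIO()
enable_sdk_logging(stream=buffer)
logging.getLogger("autoplay_sdk.demo").debug("connected")
print(buffer.getvalue(), end="")
```

Child loggers such as autoplay_sdk.demo inherit the level and pass their records up to the handler attached here.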

License

MIT
