Skip to main content

SDK for XRTC API - the next generation TCP streaming protocol

Project description

XRTC API

xrtc

XRTC is the next generation ultra-low latency TCP streaming protocol. It is 30-50x faster than LL-HLS and RTMP, and it is 2-5x faster than WebRTC. Unlike UDP-based WebRTC, XRTC uses a pure TCP/HTTP for ease of cross-firewall deployment and security.

This is an SDK for XRTC API in Python. The SDK implements the following convenience features:

  • non-async context manager with requests package, error management
  • async context manager with asyncio/aiohttp for handling parallel HTTP requests, error management
  • login and connection configurations loading from .env file or from the environment
  • configurations, serialized and deserialized request bodies and response data models and parser with Pydantic

To start using XRTC, please obtain your free API token at XRTC web site

This project is maintained by Delta Cygni Labs Ltd with the headquarters in Tampere, Finland.

Installation

Installation from Pypi:

pip install xrtc

Update from Pypi if you have already installed the package:

pip install xrtc --upgrade

Installation from source (advanced users only):

pip install .

Installation from source if you want the package to be editable (advanced users only):

pip install . -e

Login credentials and connection URLs

Login credentials you can provide as arguments to the contexts or specify in a dotenv file (by default xrtc.env) placed to the work directory.

Example of xrtc.env file content:

# XRTC credentials
ACCOUNT_ID=AC0987654321012345
API_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

Usage examples

See more on GitHub, examples directory.

Simple set and get:

from xrtc import XRTC

# Get your free account and API key from https://xrtc.org
with XRTC(account_id="AC0987654321012345", api_key="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") as xrtc:
    # Upload an item
    xrtc.set_item(items=[{"portalid": "exampleportal", "payload": "examplepayload"}])

    # Download items and iterate through them
    for item in xrtc.get_item(portals=[{"portalid": "exampleportal"}]):
        print(item.dict())

The same example with the async context manager:

from xrtc import AXRTC

async def main():
    """Async function that enables the use of async context manager."""

    # Get your free account and API key from https://xrtc.org
    async with AXRTC(account_id="AC0987654321012345", api_key="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") as xrtc:
        # Upload an item
        await xrtc.set_item(items=[{"portalid": "exampleportal", "payload": "examplepayload"}])

        # Download items and iterate through them
        async for item in xrtc.get_item(portals=[{"portalid": "exampleportal"}]):
            print(item.dict())


AXRTC.run(main())

A more sophisticated example for continuous setting and getting with XRTC and async context manager. Measures end-to-end latency in ms. Note different get item modes (watch, probe) as well as cutoff parameter to discard the items from previous runs. Two set of credentials are used for setting and getting: the accountid is the same, but the apikey are different (request them twice from XRTC web site), and the credentials are loaded from the respective dotenv files.

import asyncio
from time import time
import json

from xrtc import AXRTC


class LatencyTest:
    def __init__(self):
        self.test_running = True

    async def setting(self):
        """Set time co-routine."""
        async with AXRTC(env_file_credentials="xrtc_set.env") as xrtc:
            # Keep uploading items
            for counter in range(0, 100):
                payload = json.dumps({"time": str(time())})
                await xrtc.set_item(items=[{"portalid": "latency", "payload": payload}])
                await asyncio.sleep(0.1)

        # Uploading finished, sleep to let all items arrive
        await asyncio.sleep(1)
        self.test_running = False

    async def getting(self):
        """Get time co-routine."""
        mean = 0
        iteration = 0
        async with AXRTC(env_file_credentials="xrtc_get.env") as xrtc:
            # Keep polling for items
            while self.test_running:
                # mode="watch" means wait until there is fresh item. Compare to mode="probe"
                # cutoff=500 discards the items older than 500ms, e.g. from the previous run
                # iterate through the items (a single request may bring several items)
                async for item in xrtc.get_item(
                    portals=[{"portalid": "latency"}], mode="stream", cutoff=500
                ):
                    received_time = json.loads(item.payload)["time"]
                    latency = round((time() - float(received_time)) * 1000, 1)

                    # Recurring sample mean
                    mean = round(1 / (iteration + 1) * (mean * iteration + latency), 1)
                    iteration += 1

                    print(f"{iteration = }: {latency = } ms, {mean = } ms")

    async def execute(self):
        """Launch parallel setting and getting tasks."""
        await asyncio.gather(self.setting(), self.getting())


latency_test = LatencyTest()
asyncio.run(latency_test.execute())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

xrtc-0.1.5.tar.gz (10.0 kB view hashes)

Uploaded Source

Built Distribution

xrtc-0.1.5-py3-none-any.whl (10.5 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page