SDK for XRTC API - the next generation TCP streaming protocol
XRTC API
This is a Python SDK for the XRTC API. It implements the following features:
- synchronous (non-async) context manager built on the requests package, with error management
- async context manager built on asyncio/aiohttp for handling parallel HTTP requests, with error management
- loading of login and connection configurations from a .env file or from the environment
- Pydantic data models and parsers for configurations, request bodies and response data, including serialization and deserialization
To start using XRTC, please obtain your free API token at the XRTC website (https://xrtc.org).
This project is sponsored by Delta Cygni Labs Ltd, headquartered in Tampere, Finland.
Installation
Installation from PyPI (add --upgrade to update):
pip install xrtc
From source (add -e to make it editable):
pip install .
Login credentials and connection URLs
Login credentials are taken from the environment or from a .env file (e.g. xrtc.env) placed in the working directory. Here is the format of a sample file:
# XRTC credentials
ACCOUNT_ID=123
API_KEY=xxx
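To illustrate the file format above, here is a minimal sketch of how such a file could be read using only the standard library. The helper name load_env_file is hypothetical and is not part of the xrtc package, which does its own loading; the sketch handles only KEY=VALUE lines and # comments, and it assumes that values already set in the environment take precedence.

```python
import os
import tempfile


def load_env_file(path):
    """Hypothetical helper (not part of xrtc): parse KEY=VALUE lines from a .env file."""
    values = {}
    with open(path, encoding="utf-8") as handle:
        for raw_line in handle:
            line = raw_line.strip()
            # Skip blank lines and comments
            if not line or line.startswith("#"):
                continue
            key, sep, value = line.partition("=")
            if not sep:
                continue  # Not a KEY=VALUE line
            values[key.strip()] = value.strip()
    return values


# Write the sample file shown above to a temporary location and read it back
with tempfile.TemporaryDirectory() as tmp_dir:
    env_path = os.path.join(tmp_dir, "xrtc.env")
    with open(env_path, "w", encoding="utf-8") as handle:
        handle.write("# XRTC credentials\nACCOUNT_ID=123\nAPI_KEY=xxx\n")
    credentials = load_env_file(env_path)

# Assumption of this sketch: existing environment variables are not overwritten
for key, value in credentials.items():
    os.environ.setdefault(key, value)
```

In normal use none of this is needed: placing the file in the working directory or exporting the variables is enough.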
Usage examples
See more in the examples directory on GitHub.
Simple set and get:
import os

from xrtc import XRTC

# Connection credentials from environment variables
# Get your free account and API key from https://xrtc.org
os.environ["ACCOUNT_ID"] = "123456789"
os.environ["API_KEY"] = "****"

with XRTC() as xrtc:
    # Upload data
    xrtc.set_item(items=[{"portalid": "send", "payload": "sample"}])

    # Download data and parse it
    print((xrtc.get_item(portals=[{"portalid": "send"}])).dict())
The same example with the async context manager:
import os

from xrtc import AXRTC


async def main():
    # Connection credentials from environment variables
    # Get your free account and API key from https://xrtc.org
    os.environ["ACCOUNT_ID"] = "123456789"
    os.environ["API_KEY"] = "****"

    # Use async context manager
    async with AXRTC() as xrtc:
        # Upload data
        await xrtc.set_item(items=[{"portalid": "send", "payload": "sample"}])

        # Download data and parse it
        print((await xrtc.get_item(portals=[{"portalid": "send"}])).dict())


AXRTC.run(main())
A more sophisticated example: continuous setting and getting with two async context managers running in parallel. It measures the end-to-end latency of each item in milliseconds.
import asyncio
import json
from time import time

from xrtc import AXRTC


class LatencyTest:
    def __init__(self):
        self.test_running = True
        asyncio.run(self.execute())

    async def setting(self):
        """Set time co-routine."""
        async with AXRTC(env_file_credentials="xrtc_set.env") as xrtc:
            # Keep uploading items
            for counter in range(1, 100):
                payload = json.dumps({"counter": counter, "time": str(time())})
                await xrtc.set_item(items=[{"portalid": "latency", "payload": payload}])
                await asyncio.sleep(0.1)

            # Uploading finished, sleep to let all items arrive
            await asyncio.sleep(1)
            # Only then signal the getting co-routine to stop polling
            self.test_running = False

    async def getting(self):
        """Get time co-routine."""
        async with AXRTC(env_file_credentials="xrtc_get.env") as xrtc:
            # Keep polling for items
            while self.test_running:
                response = await xrtc.get_item(portals=[{"portalid": "latency"}], polling=1)
                if response.items is not None:
                    for item in response.items:
                        payload = json.loads(item.payload)
                        # Print the item counter and its latency in ms
                        print(payload["counter"], (time() - float(payload["time"])) * 1000)
                # Yield control to the event loop
                await asyncio.sleep(0)

    async def execute(self):
        """Launch parallel setting and getting tasks."""
        await asyncio.gather(self.setting(), self.getting())


LatencyTest()
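The latency test above prints one value per item. If a summary is more useful, the values could be collected and aggregated, for instance as in the following sketch. The helpers latency_ms and summarize are hypothetical and not part of the xrtc package; the payloads are simulated locally with known send times, so no XRTC connection is needed to run it.

```python
import json
from statistics import mean, median
from time import time


def latency_ms(payload_json, received_at):
    """Return the latency in ms for a payload built as in the setting co-routine."""
    payload = json.loads(payload_json)
    return (received_at - float(payload["time"])) * 1000


def summarize(latencies):
    """Hypothetical helper: basic statistics over collected latencies (ms)."""
    return {
        "count": len(latencies),
        "min": min(latencies),
        "median": median(latencies),
        "mean": mean(latencies),
        "max": max(latencies),
    }


# Simulated payloads sent 10, 20 and 30 ms before "now"
now = time()
samples = [
    json.dumps({"counter": counter, "time": str(now - delay)})
    for counter, delay in enumerate([0.010, 0.020, 0.030], start=1)
]
latencies = [latency_ms(sample, now) for sample in samples]
print(summarize(latencies))
```

In the getting co-routine, the print call could be replaced by appending latency_ms(item.payload, time()) to a list that is summarized once the test finishes. Note that the measured value also includes any clock difference if the setter and getter run on different machines.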