Skip to main content

Python bindings for Janus webrtc Media Server

Project description

Janus API - Python Client

A modern, async Python client library for the Janus WebRTC Gateway.

Features

  • โœจ Async/Await Support - Built on asyncio for modern Python applications
  • ๐Ÿ”Œ WebSocket Transport - Real-time communication with Janus server via WebSockets
  • ๐Ÿ“น VideoRoom Plugin - Full support for multi-party video conferencing with Publisher/Subscriber patterns
  • ๐ŸŽ™๏ธ AudioBridge Plugin - Audio conferencing capabilities
  • ๐Ÿ’ฌ TextRoom Plugin - Text chat rooms
  • ๐Ÿ“ž P2P Plugin - Peer-to-peer video calls
  • ๐Ÿ“ก Streaming Plugin - Media streaming support
  • โ˜Ž๏ธ SIP Plugin - SIP integration
  • ๐Ÿ”’ Type-Safe - Fully typed with Pydantic models
  • โšก Event-Driven - ReactiveX (RxPY) support for reactive programming
  • ๐Ÿ”„ Auto-Reconnection - Built-in reconnection logic with exponential backoff
  • ๐Ÿ’“ Keep-Alive Management - Automatic session keep-alive handling
  • ๐ŸŒ ASGI Integration - Built-in support for FastAPI/Starlette applications

Installation

# Using uv (recommended)
uv add janus-api

Using pip

pip install janus-api

Quick Start

Basic Session Setup

import asyncio
from janus_api import Janus


async def main():
    # Get the singleton session instance
    session = Janus.get_session()
    await session.create()

    # Your code here...

    # Clean up
    await session.destroy()


asyncio.run(main())

Simplified Setup/Teardown

from janus_api import Janus

Create session in background thread

Janus.setup()

Your application code...

Clean up when done

Janus.teardown()

Configuration

Configure the Janus server URL via environment variable:

# .env file
JANUS_SESSION_URL=ws://localhost:8188/janus

Or use default: ws://localhost:8188/janus

Plugin Usage

Attaching Plugins

The library uses a unified Plugin.attach() interface with automatic plugin discovery:

from janus_api import Plugin

Attach to VideoRoom as publisher

publisher = await Plugin.attach(
    type="videoroom",
    mode="publisher",
    room=1234,
    username="alice"
)

Attach to VideoRoom as subscriber

subscriber = await Plugin.attach(
    type="videoroom",
    mode="subscriber",
    room=1234
)

Other plugins

audiobridge = await Plugin.attach(type="audiobridge", room=5678)
textroom = await Plugin.attach(type="textroom", room=9012)
p2p = await Plugin.attach(type="p2p")
streaming = await Plugin.attach(type="streaming")
sip = await Plugin.attach(type="sip")

Available Plugins

Plugin Type Description
VideoRoom videoroom Multi-party video conferencing
AudioBridge audiobridge Audio conferencing rooms
TextRoom textroom Text chat rooms
P2P p2p Peer-to-peer video calls
Streaming streaming Media streaming
SIP sip SIP gateway integration

VideoRoom Plugin

Publisher Workflow

from janus_api import Plugin

Attach as publisher

publisher = await Plugin.attach(
    type="videoroom",
    mode="publisher",
    room=1234,
    username="alice"
)

Join room and configure media in one step

response = await publisher.join_and_configure(
    sdp="<your-sdp-offer>",
    sdp_type="offer",
    audio=True,
    video=True,
    data=True
)

Handle SDP answer

print(f"SDP Answer: {response.jsep.sdp}")

Send ICE candidates

from janus_api.models.request import TrickleCandidate

await publisher.trickle([
    TrickleCandidate(
        sdpMLineIndex=0,
        candidate="<ice-candidate-string>"
    )
])

Signal ICE complete

await publisher.complete_trickle()

Leave and detach when done

await publisher.leave()
await publisher.detach()

Subscriber Workflow

from janus_api import Plugin
from janus_api.models.videoroom.request import SubscriberStreams

Attach as subscriber

subscriber = await Plugin.attach(
    type="videoroom",
    mode="subscriber",
    room=1234
)

Subscribe to publisher's streams

streams = [
    SubscriberStreams(
        feed='<publisher-id>',
        mid='0',  # optional
        sub_mid='1'  # optional
    )
]

response = await subscriber.join(streams=streams)

Send SDP answer to start receiving media

await subscriber.watch(
    sdp="<your-sdp-answer>",
    sdp_type="answer"
)

Update subscriptions dynamically

await subscriber.update(
    add=[SubscriberStreams(feed='<new-publisher-id>')],
    drop=[{'feed': '<old-publisher-id>'}]
)

Pause/Resume media

await subscriber.pause()
await subscriber.resume()

Leave and detach

await subscriber.leave()
await subscriber.detach()

Room Management

Create a new room

await publisher.create(
    room=1234,
    description="My Video Room",
    publishers=10,
    audiocodec="opus",
    videocodec="vp8",
    record=False,
    permanent=False
)

Check if room exists

exists = await publisher.exists()

List participants

participants = await publisher.participants()
for p in participants:
    print(f"{p.display} - Publisher ID: {p.id}")

Kick a user

await publisher.kick(
    password="<room-password>",
    user_id="<user-id-to-kick>"
)

Moderate room

await publisher.moderate(
    room=1234,
    secret="<admin-secret>",
    moderator_id="<moderator-user-id>"
)

Destroy room

await publisher.destroy(
    secret="<admin-secret>",
    permanent=True
)

Manage allowed tokens

await publisher.allowed(
    passcode="<room-passcode>",
    action="add",
    tokens=["token1", "token2"]
)

Event Handling

Callback-Based Events

from janus_api.models.response import JanusResponse


def on_plugin_event(event: JanusResponse):
    """Handle plugin events"""
    if event.janus == "event":
        print(f"Event: {event.plugindata.data}")
    elif event.janus == "webrtcup":
        print("WebRTC connection is up!")
    elif event.janus == "hangup":
        print("Call ended")


plugin = await Plugin.attach(
    type="videoroom",
    mode="publisher",
    room=1234,
    on_event=on_plugin_event
)

ReactiveX (RxPY) Streams

The library uses ReactiveX for reactive event handling:

def on_rx_event(event):
    """Handle reactive events"""
    event_type = event.get('event')
    payload = event.get('payload')
    sender = event.get('from')
    
    if event_type == 'sdp':
        print(f"Received SDP from {sender}: {payload}")
    elif event_type == 'webrtc':
        print(f"WebRTC event from {sender}: {payload}")


plugin = await Plugin.attach(
    type="videoroom",
    mode="publisher",
    room=1234,
    on_rx_event=on_rx_event
)

# Reactive stream is automatically started
# Stop when needed (usually not required as it's handled internally)
plugin.stop()

ASGI Integration

FastAPI/Starlette Integration

from fastapi import FastAPI
from janus_api.servers.asgi import JanusASGILifespanWrapper

app = FastAPI()

# Wrap your app with Janus lifespan management
app = JanusASGILifespanWrapper(app)

# Janus session is now automatically managed
# It will be created on startup and destroyed on shutdown

@app.get("/")
async def root():
    from janus_api.servers.rpc import get_session
    session = get_session()
    return {"session_id": session.id}

The JanusASGILifespanWrapper handles:

  • โœ… Automatic session creation on application startup
  • โœ… Session keep-alive management
  • โœ… Graceful session destruction on shutdown
  • โœ… WebSocket connection management
  • โœ… ReactiveX stream lifecycle

Django Integration

For Django projects, use the global accessor:

from janus_api import Janus

# In your Django view/middleware
def my_view(request):
    session = Janus.get_session()
    if session:
        # Use the session
        pass

Set up in your Django app configuration:

from django.apps import AppConfig
from janus_api import Janus
from janus_api.session import WebsocketSession

class MyAppConfig(AppConfig):
    def ready(self):
        # Initialize Janus manager
        session = WebsocketSession()
        Janus.set_manager(session)
        
        # Start session
        Janus.setup()

Session Management

Manual Session Creation

from janus_api.session import WebsocketSession

session = WebsocketSession(session_id="custom-id")  # Optional custom ID
await session.create()

# Session automatically manages:
# - WebSocket connection
# - Keep-alive messages (every 15 seconds)
# - Plugin registry
# - Event dispatching

await session.destroy()

Session as Context Manager

from contextlib import asynccontextmanager
from janus_api.servers.rpc import get_session
from janus_api.plugins import Plugin

@asynccontextmanager
async def janus_context():
    session = get_session()
    try:
        await session.create()
        yield session
    finally:
        await session.destroy()

async def manage_session():
    async with janus_context() as session:
        plugin = await Plugin.attach(type="videoroom", mode="publisher", room=1234)
        # Use plugin...

Singleton Pattern

The session uses a singleton pattern via metaclass, ensuring only one session instance exists:

from janus_api.servers.rpc import get_session

session1 = get_session()
session2 = get_session()

assert session1 is session2  # True - same instance

Advanced Features

Plugin Registry

Plugins are automatically registered and discoverable:

from janus_api.plugins.base import PluginRegistry

# List all registered plugins
plugins = PluginRegistry.list_plugins()
print(plugins)  # ['videoroom', 'audiobridge', 'textroom', 'p2p', 'streaming', 'sip']

# Get plugin class by name
VideoRoomPlugin = PluginRegistry.get_plugin('videoroom')

Custom Plugin Development

Create custom plugins by extending PluginBase

from janus_api.plugins.base import PluginBase, PluginRegistry
from janus_api.models.request import JanusRequest
from janus_api.plugins import Plugin

@PluginRegistry.register(name="mycustom")
class MyCustomPlugin(PluginBase):
    name = "janus.plugin.mycustom"
    
    async def custom_method(self):
        request = JanusRequest(janus="create", transaction="...")
        response = await self.send(
            request
        )
        return response

# Use it
async def custom_attach():
    plugin = await Plugin.attach(type="mycustom")
    await plugin.custom_method()

Transport Layer

The WebSocket transport includes:

  • Automatic reconnection with exponential backoff
  • Transaction management for request/response pairing
  • JSEP extraction for SDP handling
  • ReactiveX streams for event dispatching
  • Ping/Pong keepalive (20 second intervals)
# Access transport directly (advanced usage)
from janus_api.servers.rpc import get_session

session = get_session()
transport = session.transport

# Transport metrics
print(transport._metrics)
# {
#     "received": 42,
#     "futures_resolved": 38,
#     "errors": 0,
#     "webrtc_events": 12
# }

Error Handling

from janus_api.core.exceptions import JanusException
from janus_api.plugins import Plugin

async def attach():
    try:
        plugin = await Plugin.attach(type="videoroom", mode="publisher", room=1234)
        await plugin.join()
    except JanusException as e:
        print(f"Janus error: {e}")
    except ConnectionError as e:
        print(f"Connection error: {e}")
    except TimeoutError as e:
        print(f"Request timeout: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

API Reference

Plugin Base Methods

All plugins inherit these methods:

Method Description
attach() Attach plugin to session
detach() Detach plugin from session
send(body, jsep) Send message to plugin
trickle(candidates) Send ICE candidates
complete_trickle() Signal ICE gathering complete
start() Start reactive stream (auto-called)
stop() Stop reactive stream

VideoRoom Publisher Methods

Method Parameters Description
join(**kwargs) Various room options Join room as publisher
join_and_configure(sdp, sdp_type, **kwargs) SDP + media options Join and configure in one step
publish(sdp, sdp_type, **kwargs) SDP + media options Publish media
configure(sdp, sdp_type, **kwargs) SDP + media options Reconfigure publisher
unpublish() None Stop publishing
leave() None Leave the room
create(**kwargs) Room configuration Create a new room
destroy(secret, permanent) Admin credentials Destroy a room
exists() None Check if room exists
participants() None List room participants
kick(password, user_id) Kick user(s) from room
moderate(**kwargs) Moderation options Moderate room settings
allowed(passcode, action, tokens) Token management Manage allowed tokens

VideoRoom Subscriber Methods

Method Parameters Description
join(streams) List of streams Join as subscriber
subscribe(streams) List of streams Subscribe to streams
watch(sdp, sdp_type) SDP answer Start watching streams
update(add, drop) Stream modifications Update subscriptions
unsubscribe(streams) List of streams Unsubscribe from streams
configure(streams) Stream configurations Configure subscriber streams
pause() None Pause receiving media
resume() None Resume receiving media
leave() None Leave the room

Session Methods

Method Description
create() Create session and connect
destroy() Destroy session and disconnect
attach(plugin) Attach plugin by name
detach(handle_id) Detach plugin by handle ID
send(data) Send request to Janus

Requirements

  • Python 3.10+
  • asyncio
  • websockets
  • pydantic
  • pyee (event emitters)
  • reactivex (reactive streams)
  • decouple (configuration)
  • asgiref (ASGI support)

Project Structure

janus-api/ โ”œโ”€โ”€ src/ โ”‚ โ””โ”€โ”€ janus_api/ โ”‚ โ”œโ”€โ”€ core/ # Core functionality โ”‚ โ”‚ โ”œโ”€โ”€ manager.py # Plugin manager โ”‚ โ”‚ โ”œโ”€โ”€ utils.py # Utilities โ”‚ โ”‚ โ””โ”€โ”€ exceptions.py # Custom exceptions โ”‚ โ”œโ”€โ”€ models/ # Pydantic models โ”‚ โ”‚ โ”œโ”€โ”€ base.py # Base models โ”‚ โ”‚ โ”œโ”€โ”€ request.py # Request models โ”‚ โ”‚ โ”œโ”€โ”€ response.py # Response models โ”‚ โ”‚ โ””โ”€โ”€ videoroom/ # VideoRoom-specific models โ”‚ โ”œโ”€โ”€ plugins/ # Plugin implementations โ”‚ โ”‚ โ”œโ”€โ”€ base.py # Base plugin class โ”‚ โ”‚ โ”œโ”€โ”€ videoroom.py # VideoRoom plugin โ”‚ โ”‚ โ”œโ”€โ”€ audiobridge.py # AudioBridge plugin โ”‚ โ”‚ โ”œโ”€โ”€ textroom.py # TextRoom plugin โ”‚ โ”‚ โ”œโ”€โ”€ p2p.py # P2P plugin โ”‚ โ”‚ โ”œโ”€โ”€ streaming.py # Streaming plugin โ”‚ โ”‚ โ””โ”€โ”€ sip.py # SIP plugin โ”‚ โ”œโ”€โ”€ session/ # Session management โ”‚ โ”‚ โ”œโ”€โ”€ base.py # Abstract base session โ”‚ โ”‚ โ””โ”€โ”€ websocket.py # WebSocket session โ”‚ โ”œโ”€โ”€ transport/ # Transport layer โ”‚ โ”‚ โ””โ”€โ”€ websocket.py # WebSocket client โ”‚ โ”œโ”€โ”€ servers/ # Server integrations โ”‚ โ”‚ โ””โ”€โ”€ asgi/ # ASGI support โ”‚ โ”‚ โ”œโ”€โ”€ asgi.py # Lifespan wrapper โ”‚ โ”‚ โ””โ”€โ”€ rpc.py # RPC helpers โ”‚ โ””โ”€โ”€ types/ # Type definitions โ”œโ”€โ”€ pyproject.toml โ””โ”€โ”€ README.md

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

[MIT]

Credits

Built for use with the Janus WebRTC Gateway.

Support

For issues and questions:

  • GitHub Issues: [https://github.com/Leydotpy/Janus-API/issues]
  • Documentation: [https://pypi.org/project/janus-api/]

Roadmap

  • HTTP/REST transport support
  • Additional plugin method implementations
  • Comprehensive test coverage
  • Enhanced documentation with more examples
  • Type stubs for better IDE support
  • Recording and playback support
  • E2E encryption helpers

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

janus_api-0.1.5.tar.gz (44.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

janus_api-0.1.5-py3-none-any.whl (48.0 kB view details)

Uploaded Python 3

File details

Details for the file janus_api-0.1.5.tar.gz.

File metadata

  • Download URL: janus_api-0.1.5.tar.gz
  • Upload date:
  • Size: 44.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.7

File hashes

Hashes for janus_api-0.1.5.tar.gz
Algorithm Hash digest
SHA256 826110d6d3f099c9d265a84f676115d254a8b9b1e4ca856962574a1f1b79952c
MD5 91eb5ffb306c5a59d2fa8c91fe196ff7
BLAKE2b-256 74eac3f4b0e22c22cab17df97b338e87e5c96d34046643817f509645f3115e2d

See more details on using hashes here.

File details

Details for the file janus_api-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: janus_api-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 48.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.7

File hashes

Hashes for janus_api-0.1.5-py3-none-any.whl
Algorithm Hash digest
SHA256 c804531b3be456ce5e91a0fdfbb6d3acc3a6f044493df9936dc57bfe27832b62
MD5 44165d2aa748014ebaeb270487df76d9
BLAKE2b-256 b40399827f601378045157c93716bfe07f4f091b834fe342c26a9a8cc1c2f70a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page