
Asynchronous orchestration toolkit for Janus Streaming and VideoRoom pipelines, covering mountpoint/room provisioning, RTP forwarding, FFmpeg/GStreamer relays, OBS ingest control, queue workers, lifecycle cleanup, health reporting, and autoscaling signals.


streaming_utils

streaming_utils is an asynchronous orchestration library for Janus-based media systems. It sits above janus_api and coordinates complete broadcast lifecycles: creating and tearing down Janus Streaming mountpoints, ensuring Janus VideoRoom rooms, wiring rtp_forward from publishers, starting relay pipelines (FFmpeg or GStreamer), controlling OBS over WebSocket v5, and executing the same flows in queue-backed workers.

The package is built for service backends that need predictable, observable media operations across start, run, and stop phases. It includes cleanup stacks for reverse-order teardown, relay and connection health reporting, observer hooks for operational events/errors, and autoscaling signals based on queue backlog.

Core capabilities include:

  • Janus Streaming mountpoint provisioning and cleanup
  • Janus VideoRoom room creation/ensure and RTP forwarding
  • FFmpeg/GStreamer relay supervision with restart and health checks
  • OBS WebSocket v5 ingest configuration and control
  • Redis Streams and Kafka queue backends for worker-driven orchestration
  • Structured lifecycle management, health snapshots, and autoscale hooks

The orchestration interfaces are protocol-based, but this implementation is intentionally integrated with this repository's janus_api package:

  • streaming_utils.models imports janus_api.conf.settings
  • default Janus integration is implemented in streaming_utils.adapters
  • examples in this README use janus_api plugin handles directly

What the package is for

Use streaming_utils when you already have Janus plugin handles from janus_api and want one higher-level utility that can:

  • create a Janus Streaming RTP mountpoint
  • optionally ensure a VideoRoom exists
  • start rtp_forward from a VideoRoom publisher into the mountpoint's RTP ports
  • start an FFmpeg or GStreamer relay into Janus for RTMP/SRS or OBS-based flows
  • manage cleanup if the Janus resources are owned by your application
  • run the same work synchronously or through a queue worker

Typical use cases:

  • bridge a Janus VideoRoom publisher into a Janus Streaming mountpoint
  • relay an externally published RTMP feed into Janus Streaming
  • control OBS to publish to SRS, then relay that ingest into Janus
  • wrap the whole flow in a worker process with Redis Streams or Kafka

Package map

  • models.py: dataclass models for tracks, mountpoints, VideoRoom specs, source configs, and broadcast requests/results
  • types.py: TypedDict equivalents for service-layer keyword inputs
  • adapters.py: protocol definitions and Janus adapters that wrap janus_api plugin objects
  • orchestrator.py: main runtime coordinator
  • service.py: thin facade around the orchestrator
  • process.py: managed subprocess lifecycle with restart/backoff and health checks
  • lib/ffmpeg.py: FFmpeg relay wrapper
  • lib/gstreamer.py: GStreamer relay wrapper
  • lib/obs.py: OBS WebSocket client and bridge helpers
  • lib/srs.py: small SRS ingest spec model
  • queue.py: queue protocol plus Redis Streams and Kafka backends
  • observer.py: logging/event/error/metrics hooks
  • lifecycle.py: reverse-order async cleanup manager used by the orchestrator
  • cleanup.py: a second cleanup stack helper that is currently not wired into the orchestrator
  • autoscale.py: backlog-driven autoscaling helpers
  • exceptions.py: package-specific exception types

How it plugs into janus_api

Required Janus-side objects

The orchestrator needs two backend objects:

  • a Streaming backend
  • a VideoRoom backend

When you use janus_api, you normally pass:

  • a StreamingPlugin instance for Streaming mountpoint work
  • a Publisher instance for VideoRoom work

Those are adapted by streaming_utils.adapters.

The exact janus_api methods the adapters expect

JanusStreamingBackend wraps a janus_api Streaming plugin and drives mountpoint creation and teardown through it; the create-field translation it applies is described under "Adapter field mapping to janus_api" below.

JanusVideoRoomBackend wraps a janus_api VideoRoom publisher plugin and calls:

  • plugin.exists(room=...)
  • plugin.create(**kwargs)
  • plugin.rtp_forward(spec)
  • plugin.destroy(room=..., permanent=..., secret=...)

The VideoRoom backend therefore needs a Publisher-style plugin handle, not a Subscriber, because the adapter calls rtp_forward(); that method is implemented by Publisher in janus_api/lib/plugins/videoroom.py.
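
Because the orchestration interfaces are protocol-based, any object that exposes those four coroutines can stand in for a Publisher handle, which is convenient in tests. A minimal sketch; the method names come from the list above, while the return shapes are assumptions:

class FakeVideoRoomPlugin:
    """Test double satisfying the calls the VideoRoom adapter makes."""

    async def exists(self, room):
        # Return shape is an assumption; report the room as missing so
        # the orchestrator takes the create path.
        return False

    async def create(self, **kwargs):
        return {"videoroom": "created", "room": kwargs.get("room")}

    async def rtp_forward(self, spec):
        return {"videoroom": "rtp_forwarded", "spec": spec}

    async def destroy(self, room, permanent=False, secret=None):
        return {"videoroom": "destroyed", "room": room}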

Real janus_api integration example

import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.models import AudioTrack, BroadcastRequest, MountPoint, VideoTrack
from streaming_utils.orchestrator import BroadcastOrchestrator


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(
            session=session,
            admin_key="stream-admin-key",  # optional, but this is where Streaming admin_key belongs
        ).attach()

        videoroom = await Publisher(
            session=session,
            room=1234,
            username="stream-bridge",
        ).attach()

        orchestrator = BroadcastOrchestrator(
            streaming=streaming,
            videoroom=videoroom,
            description="VideoRoom publisher relay",
        )

        result = await orchestrator.start(
            BroadcastRequest(
                source_kind="videoroom",
                relay_kind="none",
                room_id=1234,
                publisher_id=5678,
                create_mountpoint=True,
                owned_by_app=False,
                host="127.0.0.1",
                mountpoint_spec=MountPoint(
                    id=9001,
                    media=[
                        VideoTrack(mid="v", port=5004, pt=96, codec="H264"),
                        AudioTrack(mid="a", port=5006, pt=111, codec="opus"),
                    ],
                ),
            )
        )

        print(result.broadcast_id)
        print(result.state)
        print(result.mountpoint_response)
        print(result.forward_response)
    finally:
        await session.destroy()


asyncio.run(main())

Important points in that example:

  • owned_by_app=False is safer when you are bridging into a room your application did not create itself
  • StreamingPlugin(admin_key=...) is the practical place to provide the Streaming admin key when using the adapter path
  • the orchestrator accepts raw janus_api plugin instances; you do not have to wrap them yourself

Adapter field mapping to janus_api

streaming_utils models use friendly Python field names, but Janus Streaming create requests still expect Janus-specific names. adapters.py translates the important differences for StreamingPlugin.create().

Key mappings (streaming_utils model field -> Janus / janus_api create key):

  • multicast or mcast -> mcast
  • bind_interface or iface -> iface
  • rtcp_port -> rtcpport
  • h264_sps -> h264sps
  • data_type -> datatype
  • buffer_latest_message -> databuffermsg

Most mountpoint-level fields already match the janus_api.models.streaming.request.CreateRequest model in janus_api/models/streaming/request.py.

For VideoRoom rtp_forward, the streaming_utils request model already matches the janus_api.models.videoroom.request.RTPForwardRequest shape in janus_api/models/videoroom/request.py, so the adapter mainly passes the spec through.
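
As an illustration of that translation, here is a hypothetical helper performing the rename pass; it is not the actual adapters.py code, which also handles nested media entries:

# Hypothetical sketch of the key translation adapters.py performs
# before calling StreamingPlugin.create().
FIELD_MAP = {
    "multicast": "mcast",
    "bind_interface": "iface",
    "rtcp_port": "rtcpport",
    "h264_sps": "h264sps",
    "data_type": "datatype",
    "buffer_latest_message": "databuffermsg",
}


def to_janus_keys(fields: dict) -> dict:
    return {FIELD_MAP.get(key, key): value for key, value in fields.items()}


print(to_janus_keys({"id": 9001, "data_type": "text"}))
# {'id': 9001, 'datatype': 'text'}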

Core models

Track models

models.py defines:

  • Track
  • AudioTrack
  • VideoTrack
  • DataTrack

These are used to populate MountPoint.media.

Example:

from streaming_utils.models import AudioTrack, MountPoint, VideoTrack

mountpoint = MountPoint(
    id=9001,
    description="camera feed",
    enabled=True,
    media=[
        VideoTrack(
            mid="v",
            port=5004,
            pt=96,
            codec="H264",
            fmtp="packetization-mode=1;profile-level-id=42e01f",
        ),
        AudioTrack(
            mid="a",
            port=5006,
            pt=111,
            codec="opus",
            fmtp="minptime=10;useinbandfec=1",
        ),
    ],
)

MountPoint

MountPoint is the Janus Streaming create-spec model. It supports:

  • type: "rtp", "live", "ondemand", or "rtsp"
  • mountpoint identity and access fields such as id, description, secret, pin, is_private
  • RTP/media settings through media
  • RTSP-specific fields such as url, rtsp_user, rtsp_pwd, rtsp_timeout, and related flags
  • extra for direct payload extension

RoomSpec and RTPForwardRequestSpec

These model the Janus VideoRoom side:

  • RoomSpec: room create/ensure payload
  • RTPForwardRequestSpec: rtp_forward request
  • RTPForwardStreamSpec: per-stream forward descriptor

RoomSpec.admin_key and RTPForwardRequestSpec.admin_key are part of the dataclasses and can be passed directly to the VideoRoom side.

SRSIngestSpec and RTSPCameraConfig

  • SRSIngestSpec builds publish_url as "{rtmp_url}/{app}/{stream_key}"
  • RTSPCameraConfig stores url, credentials, transport, and timeout settings

BroadcastRequest

BroadcastRequest is the top-level input model the orchestrator understands.

Important fields:

  • source_kind
  • relay_kind
  • room_id
  • publisher_id
  • create_mountpoint
  • owned_by_app
  • host
  • mountpoint_spec
  • input_url
  • gstreamer_pipeline
  • srs
  • room_spec
  • rtsp_camera

mountpoint_spec.media is the canonical Janus stream definition. Each item in that list carries the per-track port, payload type, codec, and optional mindex used to map multiple audio or video source streams to the correct Janus destination.

BroadcastRequest.to_payload() and .from_payload() exist for queue serialization, not for Janus plugin calls directly.
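
For example, a request can round-trip through its queue payload (field values are illustrative, and from_payload is assumed to accept exactly what to_payload emits):

from streaming_utils.models import BroadcastRequest

request = BroadcastRequest(
    source_kind="videoroom",
    relay_kind="none",
    room_id=4321,
    publisher_id=777,
)
payload = request.to_payload()  # JSON-serializable dict for the queue
restored = BroadcastRequest.from_payload(payload)
assert restored.room_id == request.room_id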

BroadcastResult

BroadcastResult is the orchestrator output. It holds:

  • the original request
  • broadcast_id
  • state
  • room_response
  • mountpoint_response
  • forward_response
  • relay_info
  • srs_info
  • queue_info
  • health
  • note

Backend responses are intentionally not normalized. When using janus_api, fields such as mountpoint_response, room_response, and forward_response may contain raw JanusResponse objects. In the "room already exists" path, room_response may instead be the synthetic dict {"exists": True, "room": <room_id>}.
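
Callers that inspect room_response should therefore tolerate both shapes; a defensive sketch:

def room_id_from_response(room_response):
    # The "room already exists" path returns the synthetic dict above.
    if isinstance(room_response, dict) and room_response.get("exists"):
        return room_response.get("room")
    # Otherwise this may be a raw janus_api JanusResponse; the attribute
    # name used here is an assumption.
    return getattr(room_response, "room", None)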

Supported source flows

The orchestrator does not treat all BroadcastRequest.source_kind values the same way.

1. source_kind="videoroom"

Behavior:

  • optionally create a Streaming mountpoint
  • ensure a VideoRoom exists
  • call rtp_forward from the specified publisher into the ports declared in mountpoint_spec.media

Required in practice:

  • room_id
  • publisher_id
  • mountpoint_spec with a non-empty media list
  • if create_mountpoint=False, every stream in mountpoint_spec.media must already include its Janus RTP port

Cleanup behavior:

  • if owned_by_app=True and room_id is set, the orchestrator registers a room-destroy callback
  • if create_mountpoint=True and owned_by_app=True, it also registers mountpoint cleanup

Use this flow when you want Janus-to-Janus bridging: VideoRoom publisher -> RTP forward -> Streaming mountpoint.

2. source_kind="obs"

Behavior:

  • requires srs
  • creates an OBS WebSocket client
  • configures OBS to publish to a custom RTMP target
  • starts the selected relay (ffmpeg or gstreamer)
  • starts OBS streaming
  • registers cleanup for OBS and the relay

Required in practice:

  • relay_kind must be "ffmpeg" or "gstreamer"
  • srs must be set
  • mountpoint_spec must be set
  • if you omit gstreamer_pipeline, the library auto-generates a Janus-targeted GStreamer pipeline from the resolved media list

3. source_kind="rtmp_external"

Behavior:

  • assumes an external publisher is already sending media to request.srs.publish_url
  • starts the selected relay (ffmpeg or gstreamer)
  • does not control OBS

Required in practice:

  • relay_kind must be "ffmpeg" or "gstreamer"
  • srs must be set
  • mountpoint_spec must be set
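
A minimal request for this flow might look like the following; hosts, ports, IDs, and the SRS target are placeholders:

from streaming_utils.lib.srs import SRSIngestSpec
from streaming_utils.models import AudioTrack, BroadcastRequest, MountPoint, VideoTrack

request = BroadcastRequest(
    source_kind="rtmp_external",
    relay_kind="ffmpeg",
    create_mountpoint=True,
    owned_by_app=True,
    host="127.0.0.1",
    mountpoint_spec=MountPoint(
        id=9200,
        media=[
            VideoTrack(mid="v", port=7004, pt=96, codec="H264"),
            AudioTrack(mid="a", port=7006, pt=111, codec="opus"),
        ],
    ),
    srs=SRSIngestSpec(rtmp_url="rtmp://127.0.0.1", app="live", stream_key="ext-9200"),
)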

4. source_kind="rtsp_camera"

Behavior:

  • expects rtsp_camera
  • expects relay_kind="ffmpeg" or relay_kind="gstreamer"
  • builds the relay input directly from rtsp_camera.url
  • injects RTSP-specific FFmpeg transport and timeout flags when the FFmpeg relay is selected
  • mountpoint_spec must be set
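
A hedged sketch of this flow, assuming RTSPCameraConfig is exported from streaming_utils.models alongside the other source configs and that only its documented url field is required:

from streaming_utils.models import BroadcastRequest, MountPoint, RTSPCameraConfig, VideoTrack

request = BroadcastRequest(
    source_kind="rtsp_camera",
    relay_kind="ffmpeg",
    create_mountpoint=True,
    owned_by_app=True,
    host="127.0.0.1",
    mountpoint_spec=MountPoint(
        id=9300,
        media=[VideoTrack(mid="v", port=8004, pt=96, codec="H264")],
    ),
    rtsp_camera=RTSPCameraConfig(url="rtsp://camera.local/stream1"),
)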

5. source_kind="frontend_owned"

Behavior:

  • no server-side relay is started
  • the result gets a note telling the caller to use VideoRoom or ingest-style browser paths elsewhere

This is effectively a placeholder mode.

Declared but not implemented

SourceKind includes "browser_webrtc" in models.py and types.py, but the orchestrator does not have a corresponding branch for it.

The orchestrator also still accepts the legacy string "external" at runtime, even though that value is not part of the declared SourceKind type.

Relay selection

RelayKind is declared as:

  • "none"
  • "ffmpeg"
  • "gstreamer"

Actual behavior:

  • videoroom can reasonably use "none"
  • frontend_owned does not use a relay
  • obs requires "ffmpeg" or "gstreamer"
  • rtmp_external requires "ffmpeg" or "gstreamer"
  • rtsp_camera accepts "ffmpeg" or "gstreamer" (RTSP-specific transport flags are injected only for the FFmpeg relay)

BroadcastOrchestrator._get_request_relay() only knows how to construct FFmpeg and GStreamer relays.

Orchestrator behavior

BroadcastOrchestrator is the main runtime object.

Construction:

from streaming_utils.orchestrator import BroadcastOrchestrator

orchestrator = BroadcastOrchestrator(
    streaming=streaming_plugin_or_backend,
    videoroom=videoroom_publisher_or_backend,
    observer=observer,   # optional
    queue=queue,         # optional
    description="broadcast feed",
)

What it does during start():

  1. generates a new broadcast_id
  2. creates a JanusLifecycleManager
  3. stores a BroadcastResult(state="starting")
  4. emits broadcast_starting
  5. optionally creates a Streaming mountpoint
  6. branches by source_kind
  7. stores running relay handles in _relays
  8. registers cleanup callbacks in _lifecycles
  9. marks the result running
  10. attaches a health snapshot from health()
  11. emits broadcast_running

State it keeps in memory:

  • _active: broadcast_id -> BroadcastResult
  • _relays: broadcast_id -> relay/process handle
  • _lifecycles: broadcast_id -> JanusLifecycleManager

Stop behavior:

  • stop(broadcast_id) stops one tracked broadcast
  • stop(None) stops all tracked broadcasts
  • cleanup runs in reverse registration order
  • stopped results remain in _active with terminal state
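
Per-broadcast stop in practice, assuming an orchestrator and request built as in the examples below:

result = await orchestrator.start(request)
await orchestrator.stop(result.broadcast_id)  # stop just this broadcast
await orchestrator.stop(None)                 # stop everything still tracked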

Context manager behavior:

async with BroadcastOrchestrator(streaming=streaming, videoroom=videoroom) as orchestrator:
    ...

Exiting the context stops all tracked broadcasts.

Service facade

service.Broadcast is a thin wrapper around the orchestrator.

Methods:

  • start(**kwargs): builds BroadcastRequest(**kwargs) and calls orchestrator.start()
  • stop(): stops all broadcasts through orchestrator.stop()
  • enqueue(**kwargs): serializes a request and queues it through orchestrator.enqueue_start()
  • health(): delegates to orchestrator.health()
  • listen(...): delegates to orchestrator.worker_loop()
  • autoscale(...): delegates to orchestrator.autoscale_tick()

Use the service if you want a keyword-argument interface around the dataclass models.

Use the orchestrator directly if you need:

  • full BroadcastResult objects
  • per-broadcast stop control
  • tighter control over request/response objects

Queues, workers, and autoscaling

Queue backends

queue.py provides:

  • OrchestrationQueue protocol
  • RedisStreamsQueue
  • KafkaTopicQueue
  • JobEnvelope

RedisStreamsQueue:

  • stores jobs in a Redis stream
  • uses consumer groups
  • requeues failed jobs by appending a new entry with attempts + 1
  • reports real lag through XINFO GROUPS

KafkaTopicQueue:

  • publishes JSON jobs to a topic
  • commits offsets only on ack() or nack()
  • currently returns 0 from lag()

Worker loop

The worker loop:

  • blocks on queue.claim()
  • dispatches start_broadcast or stop_broadcast
  • ack()s on success
  • nack(requeue=True) on failure
  • emits observer.emit_error(..., phase="worker_job", ...) on per-job failure

Autoscaling

Autoscaling is provided by autoscale.py:

  • AutoscalePolicy
  • RelayAutoscaler
  • WorkerPool

The orchestrator's autoscale_tick():

  • reads queue lag
  • estimates current worker count as len(_relays) or 1
  • computes a desired count
  • emits a scale_request observer event through _scale_workers()

It does not spawn or kill worker processes itself.
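
The desired count is plausibly a backlog-per-worker calculation; a hypothetical reconstruction of that arithmetic using the AutoscalePolicy fields that appear later in this README (the real rule lives in autoscale.py):

import math


def desired_workers(lag, *, min_workers, max_workers, target_lag_per_worker):
    # Hypothetical sizing rule: give each worker at most
    # target_lag_per_worker queued jobs, clamped to the policy bounds.
    wanted = math.ceil(lag / target_lag_per_worker) if lag else min_workers
    return max(min_workers, min(max_workers, wanted))


print(desired_workers(7, min_workers=1, max_workers=5, target_lag_per_worker=2))  # -> 4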

Observer, lifecycle, and process management

OpsObserver

observer.py provides OpsObserver, which can be given:

  • logger
  • metrics
  • on_state_change
  • on_event
  • on_error

The orchestrator, relay wrappers, OBS client, and cleanup helpers all use it.
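
A hedged construction example using the hook names above; the handler signatures are assumptions, so check observer.py before relying on them:

import logging

from streaming_utils.observer import OpsObserver

observer = OpsObserver(
    logger=logging.getLogger("streaming"),
    on_event=lambda name, **data: print("event:", name, data),      # signature assumed
    on_error=lambda exc, **context: print("error:", exc, context),  # signature assumed
)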

JanusLifecycleManager

lifecycle.py is the cleanup stack used by the orchestrator.

It:

  • stores async closers
  • runs them in reverse order
  • emits janus_resource_closed on success
  • emits on_error(..., phase="janus_cleanup", ...) on failure
  • raises LifecycleError if any cleanup step fails
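
A standalone usage sketch; both method names below (add and close_all) are hypothetical placeholders for "register a closer" and "run the closers", so check lifecycle.py for the real API:

from streaming_utils.lifecycle import JanusLifecycleManager

lifecycle = JanusLifecycleManager()  # constructor arguments assumed


async def destroy_room():
    print("room destroyed")


async def destroy_mountpoint():
    print("mountpoint destroyed")


lifecycle.add(destroy_room)        # hypothetical registration method
lifecycle.add(destroy_mountpoint)
await lifecycle.close_all()        # hypothetical; runs destroy_mountpoint first (reverse order)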

AsyncManagedProcess

process.py supervises long-lived subprocesses.

It provides:

  • async start/stop/restart
  • health check loop every 5 seconds
  • process-group termination on POSIX
  • restart with exponential backoff and jitter
  • optional restart budget

This is the base class used by:

  • FFmpeg
  • GStreamer

FFmpeg, GStreamer, OBS, and SRS helpers

FFmpeg relay

lib/ffmpeg.py defines:

  • Endpoint
  • FFmpeg

FFmpeg.build_command():

  • uses ffmpeg -re -i <input_url>
  • maps audio-like codecs to 0:a:0
  • maps all other destinations to 0:v:0
  • sends RTP to rtp://host:port?pkt_size=1200
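
Given those rules, the generated command for one H.264 video destination and one Opus audio destination would look roughly like this (codec and payload-type flags elided; hosts, ports, and the input URL are illustrative):

ffmpeg -re -i rtmp://127.0.0.1/live/cam-01 \
  -map 0:v:0 ... -f rtp rtp://127.0.0.1:5004?pkt_size=1200 \
  -map 0:a:0 ... -f rtp rtp://127.0.0.1:5006?pkt_size=1200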

GStreamer relay

lib/gstreamer.py defines GStreamer, a gst-launch-1.0 managed wrapper around a caller-supplied pipeline string.

OBS helper

lib/obs.py defines:

  • ObsConnectionConfig
  • OBSWebSocketV5Adapter
  • ObsBridge

OBSWebSocketV5Adapter handles:

  • OBS v5 Hello/Identify handshake
  • optional password auth
  • request/response correlation
  • event dispatch
  • reconnect helpers
  • health checks through GetStreamStatus

ObsBridge adds simple flows:

  • configure_custom_rtmp()
  • connect_and_start()
  • stop()
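
A hedged end-to-end sketch built from those flow methods; only the three method names are documented, while the constructor and argument shapes below are assumptions based on the obs_host/obs_port/obs_password request fields shown earlier:

from streaming_utils.lib.obs import ObsBridge, ObsConnectionConfig

# Field and argument names below are assumptions; check lib/obs.py.
config = ObsConnectionConfig(host="127.0.0.1", port=4455, password="secret")
bridge = ObsBridge(config)

await bridge.configure_custom_rtmp("rtmp://127.0.0.1/live", "obs-9100")
await bridge.connect_and_start()
await bridge.stop()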

SRS ingest helper

lib/srs.py defines SRSIngestSpec.

Example:

from streaming_utils.lib.srs import SRSIngestSpec

srs = SRSIngestSpec(
    rtmp_url="rtmp://127.0.0.1",
    app="live",
    stream_key="cam-01",
)

print(srs.publish_url)
# rtmp://127.0.0.1/live/cam-01

Example: VideoRoom publisher -> Streaming mountpoint

This is the cleanest janus_api integration path in the current implementation.

import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.models import AudioTrack, BroadcastRequest, MountPoint, VideoTrack
from streaming_utils.orchestrator import BroadcastOrchestrator


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(session=session, admin_key="stream-admin").attach()
        publisher = await Publisher(session=session, room=4321, username="forwarder").attach()

        orchestrator = BroadcastOrchestrator(
            streaming=streaming,
            videoroom=publisher,
            description="publisher 777 mirrored to Streaming",
        )

        result = await orchestrator.start(
            BroadcastRequest(
                source_kind="videoroom",
                relay_kind="none",
                room_id=4321,
                publisher_id=777,
                create_mountpoint=True,
                owned_by_app=False,
                host="127.0.0.1",
                mountpoint_spec=MountPoint(
                    id=9901,
                    media=[
                        VideoTrack(mid="v", port=5004, pt=96, codec="H264"),
                        AudioTrack(mid="a", port=5006, pt=111, codec="opus"),
                    ],
                ),
            )
        )

        print(result.mountpoint_response)
        print(result.forward_response)
    finally:
        await session.destroy()


asyncio.run(main())

What that does:

  • creates a Janus Streaming RTP mountpoint from the mountpoint_spec.media list
  • ensures VideoRoom 4321 exists
  • starts rtp_forward from publisher 777 to those media ports

Example: OBS -> SRS -> FFmpeg -> Janus Streaming

import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.lib.srs import SRSIngestSpec
from streaming_utils.models import AudioTrack, BroadcastRequest, MountPoint, VideoTrack
from streaming_utils.orchestrator import BroadcastOrchestrator


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(session=session, admin_key="stream-admin").attach()
        videoroom = await Publisher(session=session, room=1234, username="obs-bridge").attach()

        orchestrator = BroadcastOrchestrator(streaming=streaming, videoroom=videoroom)

        result = await orchestrator.start(
            BroadcastRequest(
                source_kind="obs",
                relay_kind="ffmpeg",
                create_mountpoint=True,
                owned_by_app=True,
                host="127.0.0.1",
                mountpoint_spec=MountPoint(
                    id=9100,
                    media=[
                        VideoTrack(mid="v", port=6004, pt=96, codec="H264"),
                        AudioTrack(mid="a", port=6006, pt=111, codec="opus"),
                    ],
                ),
                srs=SRSIngestSpec(
                    rtmp_url="rtmp://127.0.0.1",
                    app="live",
                    stream_key="obs-9100",
                ),
                obs_host="127.0.0.1",
                obs_port=4455,
                obs_password="secret",
                obs_secure=False,
            )
        )

        print(result.srs_info)
        print(result.relay_info)
    finally:
        await session.destroy()


asyncio.run(main())

Example: queue-backed service wrapper

import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.autoscale import AutoscalePolicy
from streaming_utils.orchestrator import BroadcastOrchestrator
from streaming_utils.queue import RedisStreamsQueue
from streaming_utils.service import Broadcast


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(session=session, admin_key="stream-admin").attach()
        videoroom = await Publisher(session=session, room=4321, username="worker").attach()

        queue = RedisStreamsQueue(redis_url="redis://127.0.0.1:6379/0")
        orchestrator = BroadcastOrchestrator(streaming=streaming, videoroom=videoroom, queue=queue)
        service = Broadcast(orchestrator=orchestrator, queue_kind="redis")

        await service.enqueue(
            source_kind="videoroom",
            relay_kind="none",
            room_id=4321,
            publisher_id=777,
            create_mountpoint=True,
            owned_by_app=False,
            host="127.0.0.1",
            mountpoint_spec={
                "id": 9901,
                "media": [
                    {"mid": "v", "type": "video", "port": 5004, "pt": 96, "codec": "H264"},
                    {"mid": "a", "type": "audio", "port": 5006, "pt": 111, "codec": "opus"},
                ],
            },
        )

        stop_event = asyncio.Event()
        worker = asyncio.create_task(
            service.listen(
                consumer="broadcast-worker-1",
                block_ms=1000,
                stop_event=stop_event,
            )
        )

        desired = await service.autoscale(
            policy=AutoscalePolicy(min_workers=1, max_workers=5, target_lag_per_worker=2)
        )
        print(desired)

        await asyncio.sleep(2)
        stop_event.set()
        await worker
    finally:
        await session.destroy()


asyncio.run(main())

Operational caveats and current implementation limits

These points are important if you are using streaming_utils as production glue around janus_api.

  • streaming_utils.__init__ currently exports only Broadcast. Import from submodules directly if you need the lower-level models and orchestrator types.
  • BroadcastRequest.source_kind declares "browser_webrtc", but the orchestrator does not implement that branch.
  • The orchestrator still accepts a legacy "external" string even though it is not part of the declared SourceKind.
  • Automatic cleanup for mountpoints and rooms does not pass secrets back into destroy_mountpoint() or destroy_room(). If you create secret-protected resources, automatic teardown may fail unless your Janus setup allows destruction without those secrets.
  • Automatic room cleanup is controlled only by owned_by_app and room_id. If you point the orchestrator at an existing room and leave owned_by_app=True, stop/cleanup will still try to destroy that room.
  • BroadcastResult.health is a global orchestrator health snapshot, not a per-broadcast-only snapshot.
  • BroadcastResult response fields are raw backend results, not a normalized schema.
  • service.Broadcast.stop() has no broadcast_id parameter and stops every tracked broadcast.
  • queue_kind is stored in enqueued payloads but is not otherwise interpreted by the orchestrator.
  • KafkaTopicQueue.lag() currently returns 0, so autoscaling decisions are only meaningfully backlog-aware with RedisStreamsQueue unless you extend the Kafka backend.
  • The orchestrator tracks relay health through _relays; it does not keep a separate OBS bridge object in that map.
  • BroadcastRequest.obs_secure defaults to janus_api.conf.settings.DEBUG. Set it explicitly if you do not want environment-dependent OBS transport behavior.

When to use what

Use BroadcastOrchestrator if you want:

  • full control over lifecycle
  • per-broadcast stop
  • direct access to dataclass request/result objects
  • easiest integration with raw janus_api plugin instances

Use Broadcast if you want:

  • a thinner interface for RPC or HTTP handlers
  • keyword-argument inputs shaped like BroadcastRequestType
  • queue worker entrypoints

Use the lower-level helpers directly if you only need one part:

  • FFmpeg or GStreamer for process supervision
  • OBSWebSocketV5Adapter or ObsBridge for OBS control
  • RedisStreamsQueue or KafkaTopicQueue for background dispatch
  • JanusLifecycleManager if you only want ordered async cleanup

Repository references

Useful janus_api references when reading this package:

  • janus_api/lib/plugins/streaming.py: the StreamingPlugin handle the Streaming adapter wraps
  • janus_api/lib/plugins/videoroom.py: the Publisher class that implements rtp_forward()
  • janus_api/models/streaming/request.py: the CreateRequest model that mountpoint fields map onto
  • janus_api/models/videoroom/request.py: the RTPForwardRequest shape the forward spec matches

Those files are the closest code-level reference for how streaming_utils is meant to be used as a utility layer on top of janus_api.
