Python asyncio implementation of the MoQT protocol

aiomoqt - Media over QUIC Transport (MoQT)

aiomoqt is an implementation of MoQT for asyncio, layered on aiopquic.

Overview

aiomoqt implements the MoQT protocol with draft-14 / draft-16 / draft-18 (beta) support, both publish and subscribe roles, H3/WebTransport and raw QUIC transports, and ALPN-based draft negotiation. The architecture extends aiopquic.asyncio.QuicConnectionProtocol. The package ships publisher / subscriber example clients, a benchmark suite, a relay version probe, and a moq-interop-runner-compatible test client.

Features

  • H3/WebTransport and raw QUIC transports
  • Draft-14 / draft-16 / draft-18 negotiation (ALPN moq-00 / moqt-16; d18 via ALPN / WT-Protocol)
  • Draft-16 wire format: delta-encoded param keys, track extensions, unified request/response
  • Draft-18 (beta): vi64 varints, uni-stream control pair, per-request bidi streams, Request-ID-less replies — over both raw QUIC and WebTransport
  • SubgroupHeader / ObjectDatagram flag encoding, delta-encoded object IDs
  • Version-independent control: MOQTRequestError exception across drafts
  • Async context manager for session lifecycle
  • Sync / async response handling on every control message via wait_response
  • High-level publisher: PublishedTrack — stream setup, subgroup writing, pacing
  • High-level subscriber: SubscribedTrack — object reassembly, FETCH / JOIN handling
  • Pluggable message handlers via register_handler()
  • Data publishing via SubgroupHeader streams or ObjectDatagrams
  • Data reception via on_object_received callback
  • Low-level message serialization / deserialization for custom protocol work
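
The ALPN-based negotiation listed above can be pictured as a lookup between draft numbers and ALPN tokens. The following is a minimal sketch, assuming the feature list pairs moq-00 with draft-14 and moqt-16 with draft-16; the draft-18 token is not given here, so it is left out. The names ALPN_BY_DRAFT, offered_alpns, and draft_for_alpn are hypothetical and are not part of aiomoqt.

```python
# Hypothetical helper names; only the two ALPN tokens come from the README.
ALPN_BY_DRAFT = {14: "moq-00", 16: "moqt-16"}
DRAFT_BY_ALPN = {alpn: draft for draft, alpn in ALPN_BY_DRAFT.items()}


def offered_alpns(preferred_drafts):
    """Return the ALPN tokens to offer, in the caller's preference order.

    Drafts without a known token (draft-18 in this sketch) are skipped.
    """
    return [ALPN_BY_DRAFT[d] for d in preferred_drafts if d in ALPN_BY_DRAFT]


def draft_for_alpn(selected_alpn):
    """Map the ALPN token chosen during the handshake back to a draft number."""
    try:
        return DRAFT_BY_ALPN[selected_alpn]
    except KeyError:
        raise ValueError(f"unknown ALPN token: {selected_alpn!r}") from None


if __name__ == "__main__":
    print(offered_alpns((18, 16, 14)))
    print(draft_for_alpn("moq-00"))
```

Listing the preferred drafts newest-first means the newest draft with a known token is offered first.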

Installation

Pure Python, requires Python 3.12+ (tested on 3.12, 3.13, 3.14):

uv pip install aiomoqt    # or: pip install aiomoqt

aiopquic (the QUIC transport) installs as a binary wheel automatically. Only Linux (glibc 2.34+, RHEL 9 / Ubuntu 22.04+) and macOS arm64 have prebuilt wheels; other systems pull aiopquic via sdist and need a C build toolchain — see aiopquic install notes.

A ./bootstrap_python.sh script is provided for a uv-managed .venv if you want a clean dev environment.

Reporting issues

Include the full version report in any issue. It captures aiomoqt, aiopquic, and the picoquic + picotls submodule SHAs aiopquic was built from — useful for diagnosing version-pair mismatches across the stack:

python -m aiomoqt.versions   # or the console script: aiomoqt-versions

Sample output:

aiomoqt:   0.9.7.dev7+g0fa185338 (~/src/aiomoqt/aiomoqt) [2026-06-11 11:57]
aiopquic:  0.3.7.dev12+g6eef9caf6 (~/src/aiopquic/src/aiopquic) [2026-06-11 11:58]
  - picoquic:  1.1.49.2 (d6c5653d) [2026-06-05]
  - picotls:   master (bfa67875) [2026-04-20]

Quick Start

Verify install + relay liveness

Confirm the stack is wired up before writing any code:

# Versions of aiomoqt + aiopquic + picoquic + picotls
python -m aiomoqt.versions   # or: aiomoqt-versions

# Liveness + supported drafts against a single relay (one line per probed transport).
# Exit 0 if the relay answered SERVER_SETUP for at least one draft.
python -m aiomoqt.examples.relay_probe --url moqt://moqx-main.ci.openmoq.org:4433
# → moqx-main.ci.openmoq.org:4433  quic   draft-14,draft-16  ✓ (405ms)

python -m aiomoqt.examples.relay_probe --url https://moqx-main.ci.openmoq.org:4433/moq-relay
# → moqx-main.ci.openmoq.org:4433  h3/wt  draft-14,draft-16  ✓ (315ms)

Subscriber

import asyncio
from aiomoqt.client import MOQTClient

def on_object(msg, size, recv_time_ms, group_id=None, subgroup_id=None):
    print(f"g={group_id} obj={msg.object_id} {size}B payload={msg.payload}")

async def main():
    client = MOQTClient('relay.example.com', 443, path='moq',
                         use_quic=True, draft_version=16)
    async with client.connect() as session:
        await session.client_session_init()
        session.on_object_received = on_object
        await session.subscribe('ns', 'track', wait_response=True)
        await session.async_closed()

asyncio.run(main())
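
A callback like on_object is convenient for printing, but application code often prefers to consume objects from a coroutine. One way to bridge the two is an asyncio.Queue. The sketch below reuses the callback signature shown above and feeds it stand-in objects, so it runs without a relay. It assumes the callback is invoked on the event-loop thread (otherwise loop.call_soon_threadsafe would be needed); make_queue_callback and consume are hypothetical names, not aiomoqt APIs.

```python
import asyncio
from types import SimpleNamespace


def make_queue_callback(queue):
    """Build a callback with the same signature as on_object above."""
    def on_object(msg, size, recv_time_ms, group_id=None, subgroup_id=None):
        # put_nowait never blocks, and an unbounded queue cannot raise QueueFull.
        queue.put_nowait((group_id, msg.object_id, msg.payload))
    return on_object


async def consume(queue, count):
    """Pull `count` objects off the queue and return them in arrival order."""
    return [await queue.get() for _ in range(count)]


async def demo():
    queue = asyncio.Queue()
    on_object = make_queue_callback(queue)
    # Stand-in objects; a real session would invoke the callback itself.
    for object_id in range(3):
        fake = SimpleNamespace(object_id=object_id, payload=b"x" * object_id)
        on_object(fake, size=object_id, recv_time_ms=0, group_id=7)
    return await consume(queue, 3)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real session, the callback returned by make_queue_callback would be assigned to session.on_object_received in place of on_object.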

Publisher

import asyncio
from aiomoqt.client import MOQTClient
from aiomoqt.types import MOQTMessageType
from aiomoqt.messages import SubgroupHeader

async def main():
    client = MOQTClient('relay.example.com', 443, path='moq',
                         use_quic=True, draft_version=16)
    client.register_handler(MOQTMessageType.SUBSCRIBE, on_subscribe)
    async with client.connect() as session:
        await session.client_session_init()
        await session.publish_namespace('ns', wait_response=True)
        await session.async_closed()  # serve until closed

async def on_subscribe(session, msg):
    """Called when a subscriber requests a track."""
    ok = session.subscribe_ok(request_msg=msg)
    stream_id = session.open_uni_stream()
    hdr = SubgroupHeader(track_alias=ok.track_alias, group_id=0, subgroup_id=0, publisher_priority=0)
    session.stream_write(stream_id, hdr.serialize().data)
    session.stream_write(stream_id, hdr.next_object(payload=b"hello").data)
    session.transmit()

asyncio.run(main())

The work done by hand in on_subscribe above (stream setup, header serialization, object writing) is wrapped by the higher-level PublishedTrack class in aiomoqt.track; SubscribedTrack is its subscriber-side counterpart. See the *_bench.py examples for typical usage.
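
The feature list mentions pacing as one of the things the high-level publisher handles. The PublishedTrack API is not shown in this README, so the sketch below is only a generic illustration of deadline-based pacing in asyncio, not aiomoqt's implementation. paced_send is a hypothetical name, and the send argument stands in for whatever writes one object (for example a small wrapper around the stream_write calls in on_subscribe).

```python
import asyncio


async def paced_send(payloads, rate_hz, send):
    """Call send(payload) for each payload at roughly rate_hz per second."""
    loop = asyncio.get_running_loop()
    interval = 1.0 / rate_hz
    next_deadline = loop.time()
    for payload in payloads:
        delay = next_deadline - loop.time()
        if delay > 0:
            await asyncio.sleep(delay)
        send(payload)
        # Advance along the ideal timeline so sleep jitter does not accumulate.
        next_deadline += interval


async def demo():
    sent = []
    loop = asyncio.get_running_loop()
    start = loop.time()
    await paced_send([b"a", b"b", b"c"], rate_hz=100, send=sent.append)
    return sent, loop.time() - start


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Scheduling against absolute deadlines rather than sleeping a fixed interval keeps the long-run rate close to the target even when individual wake-ups are late.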

Control Message API

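Every control-message call accepts wait_response. With wait_response=True the call waits for the peer's reply and returns it; without it the call returns the request without waiting, and the reply is delivered to the registered handler:

# wait_response=True: waits for the reply and returns the response message
resp = await session.subscribe('ns', 'track', wait_response=True)

# Default: returns the request; the response arrives via the handler
req = await session.subscribe('ns', 'track')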
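
The two modes above follow a common asyncio pattern: a request either returns immediately or parks on a future that the receive path resolves when the matching reply arrives. The sketch below illustrates that pattern in isolation. It is not aiomoqt's internal code, and PendingRequests, send, and on_response are hypothetical names.

```python
import asyncio
import itertools


class PendingRequests:
    """Track in-flight requests so a reply can resolve the matching waiter."""

    def __init__(self):
        self._next_id = itertools.count()
        self._waiters = {}

    async def send(self, request, wait_response=False):
        request_id = next(self._next_id)
        # A real client would serialize and transmit the request here.
        if not wait_response:
            return request  # the reply is left to a registered handler
        future = asyncio.get_running_loop().create_future()
        self._waiters[request_id] = future
        try:
            return await future
        finally:
            self._waiters.pop(request_id, None)

    def on_response(self, request_id, response):
        """Called by the receive path when a reply arrives."""
        future = self._waiters.get(request_id)
        if future is not None and not future.done():
            future.set_result(response)


async def demo():
    pending = PendingRequests()
    loop = asyncio.get_running_loop()
    # Simulate the peer replying to request 0 shortly after it is sent.
    loop.call_later(0.01, pending.on_response, 0, "SUBSCRIBE_OK")
    return await pending.send("SUBSCRIBE ns/track", wait_response=True)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```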

Examples

Publisher / Subscriber

# Publish (SubgroupHeader streams)
python -m aiomoqt.examples.pub_example -h relay.ex.com -q

# Publish (ObjectDatagrams)
python -m aiomoqt.examples.pub_example -h relay.ex.com -q --datagram

# Subscribe
python -m aiomoqt.examples.sub_example -h relay.ex.com -q

# Subscribe + FETCH (join mid-stream)
python -m aiomoqt.examples.join_example -h relay.ex.com -q

Common options: --namespace, --trackname, --path, --debug, --keylogfile. Every tool prints its full option set with -? / --help.

Benchmarks

Bench tools take a positional relay URL — moqt://host[:port] for raw QUIC, https://host[:port]/[endpoint] for H3/WebTransport — except loopback_bench, which needs no relay at all:

# Local loopback (canonical stack benchmark, no relay)
python -m aiomoqt.examples.loopback_bench -s 4096 -P 4 -t 20

# Publisher / subscriber through a relay
python -m aiomoqt.examples.pub_bench moqt://relay.ex.com -s 4096 -P 4 -r 120 -t 60
python -m aiomoqt.examples.sub_bench moqt://relay.ex.com
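
The two URL forms above can be told apart by scheme alone. The sketch below shows one way to do that with the standard library. classify_relay_url is a hypothetical helper, not part of aiomoqt; the transport labels are borrowed from the relay_probe output earlier in this document, and no default port is assumed when the URL omits one.

```python
from urllib.parse import urlsplit


def classify_relay_url(url):
    """Return (transport, host, port, path) for a relay URL.

    port is None when the URL does not name one; no default is assumed here.
    """
    parts = urlsplit(url)
    if parts.scheme == "moqt":
        transport = "quic"
    elif parts.scheme == "https":
        transport = "h3/wt"
    else:
        raise ValueError(f"unsupported relay URL scheme: {parts.scheme!r}")
    if not parts.hostname:
        raise ValueError(f"relay URL has no host: {url!r}")
    return transport, parts.hostname, parts.port, parts.path


if __name__ == "__main__":
    print(classify_relay_url("moqt://relay.ex.com"))
    print(classify_relay_url("https://relay.ex.com:4433/moq-relay"))
```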

The full tool matrix (two-process, fanout, adaptive ramp), all options, latency methodology (paced vs unpaced, TX budgets), and observed numbers live in PERFORMANCE.md.

Interop Testing

# All tests (draft-14, auto-detected)
python -m aiomoqt.examples.moq_interop_client -r "moqt://relay.ex.com:4433"

# All tests (draft-16)
python -m aiomoqt.examples.moq_interop_client -r "moqt://relay.ex.com:4433" --draft 16

# Single test case
python -m aiomoqt.examples.moq_interop_client -r "moqt://relay.ex.com:4433" -t subscribe-error

# List test cases
python -m aiomoqt.examples.moq_interop_client -l

Relay Probe

Batch liveness + draft-version check. Reads a relay list, does a real CLIENT_SETUP / SERVER_SETUP handshake per (endpoint × draft) — no bare-ALPN tricks — and writes a JSON status report.

Accepts CLI flags, environment variables, or both (CLI overrides env):

# CLI form (typical interactive use)
python -m aiomoqt.examples.relay_probe -f relays.json -o status.json --once

# Env form (typical container/daemon deployment)
RELAYS_FILE=relays.json OUTPUT_FILE=status.json PROBE_ONCE=1 \
  python -m aiomoqt.examples.relay_probe

# Long-running monitor (re-probe every --interval seconds)
python -m aiomoqt.examples.relay_probe -f relays.json -o status.json

CLI flag           | Env var        | Default                   | Meaning
-f / --relays-file | RELAYS_FILE    | /app/relays.json          | input relay list
-o / --output-file | OUTPUT_FILE    | /output/relay-status.json | status report destination
--timeout          | PROBE_TIMEOUT  | 8                         | per-probe handshake timeout (s)
--interval         | PROBE_INTERVAL | 300                       | re-probe cadence in monitor mode (s)
--once             | PROBE_ONCE=1   | unset                     | probe once and exit
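
The "CLI overrides env" rule is commonly implemented by using the environment value as the argparse default, so an explicit flag always wins. The sketch below applies that idea to the flags, variables, and defaults from the table above. It is an illustration only, not relay_probe's actual code; build_parser is a hypothetical name, and treating the timeout and interval as floats is an assumption.

```python
import argparse
import os


def build_parser(env=None):
    """Build a parser whose defaults come from the environment, then the table."""
    env = os.environ if env is None else env
    parser = argparse.ArgumentParser(prog="relay_probe_sketch")
    parser.add_argument("-f", "--relays-file",
                        default=env.get("RELAYS_FILE", "/app/relays.json"))
    parser.add_argument("-o", "--output-file",
                        default=env.get("OUTPUT_FILE", "/output/relay-status.json"))
    parser.add_argument("--timeout", type=float,
                        default=float(env.get("PROBE_TIMEOUT", "8")))
    parser.add_argument("--interval", type=float,
                        default=float(env.get("PROBE_INTERVAL", "300")))
    # Assumption: only the literal value "1" turns on single-shot mode.
    parser.add_argument("--once", action="store_true",
                        default=env.get("PROBE_ONCE") == "1")
    return parser


if __name__ == "__main__":
    fake_env = {"RELAYS_FILE": "env.json", "PROBE_ONCE": "1"}
    print(build_parser(fake_env).parse_args(["-f", "cli.json"]))
```

Passing the environment in as a dictionary keeps the precedence logic easy to test without touching os.environ.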

WebTransport Server

python -m aiomoqt.examples.server_example \
    --certificate cert.pem --private-key key.pem --port 443

Example Reference

Example               | Description
pub_example.py        | Publisher: SubgroupHeader streams or ObjectDatagrams
sub_example.py        | Subscriber: receives data from a relay
join_example.py       | SUBSCRIBE + FETCH (join mid-stream)
pub_bench.py          | Publisher benchmark, configurable parameters
sub_bench.py          | Subscriber with latency/jitter/loss stats
relay_bench.py        | Combined pub/sub in one process
multi_sub_bench.py    | 1 publisher, N subscribers in one process
loopback_bench.py     | Local loopback (no relay)
adaptive_bench.py     | Ramps rate until buffer growth; loopback (--mp-loopback for proc-isolated pub/sub) or relay
server_example.py     | WebTransport server (origin)
relay_probe.py        | Relay version probe (draft-14/16)
moq_interop_client.py | Interop test client (TAP v14 out; 6 standard + fetch/join)

Interop

Validated against live public relays — OpenMoQ moqx, Meta moxygen, Cloudflare moq-rs, Quicr libquicr, Meetecho imquic, OzU moqtail — across draft-14/draft-16 and both transports, using the moq-interop-runner cases plus a multi-subscriber pub-sub bench. The full point-in-time matrix lives in PERFORMANCE.md; the relay catalog with per-endpoint notes is tests/relays.json. Red5 Pro was interop-tested in earlier cycles and is currently untested/unverified.

Performance

aiomoqt sits on aiopquic, which sits on picoquic + the kernel UDP path; throughput at this layer is bounded by the layers below. Observed on commodity hardware over loopback: multi-Gbps sustained throughput at ~4 KiB objects with bounded memory under stream churn, and sub-millisecond to low-millisecond latency when paced below saturation. Numbers vary substantially by platform — methodology, observed figures, the paced-vs-unpaced distinction, and TX budget tuning are in PERFORMANCE.md.
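
To put "multi-Gbps at ~4 KiB objects" in per-object terms, the arithmetic below converts a payload throughput into an object rate and an average gap between objects. The throughput values are illustrative inputs, not measured results, and the calculation counts payload bytes only, ignoring QUIC and MoQT framing overhead. Both function names are hypothetical.

```python
def objects_per_second(throughput_bps, object_bytes):
    """Objects/s needed to carry throughput_bps of payload at a fixed object size."""
    return throughput_bps / (object_bytes * 8)


def inter_object_gap_us(throughput_bps, object_bytes):
    """Average spacing between objects, in microseconds, at that rate."""
    return 1e6 / objects_per_second(throughput_bps, object_bytes)


if __name__ == "__main__":
    for gbps in (1, 2, 4):
        rate = objects_per_second(gbps * 1e9, 4096)
        gap = inter_object_gap_us(gbps * 1e9, 4096)
        print(f"{gbps} Gbps at 4 KiB: {rate:,.0f} objects/s, {gap:.1f} us apart")
```

At 1 Gbps of payload, 4096-byte objects work out to roughly 30,500 objects per second, or one object about every 33 microseconds.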

Development

git clone https://github.com/gmarzot/aiomoqt.git
cd aiomoqt
python3 -m venv .venv && source .venv/bin/activate
uv pip install -e ".[test]"    # or: pip install -e ".[test]"

# Self-signed cert for the loopback server (loopback_bench, pub_server,
# and the test_loopback_* suites; skipped if certs/ is absent). One time:
mkdir -p certs && openssl req -x509 -newkey rsa:2048 -nodes -days 3650 \
  -keyout certs/key.pem -out certs/cert.pem -subj "/CN=localhost" \
  -addext "subjectAltName=DNS:localhost,IP:127.0.0.1"

pytest aiomoqt/tests/

Known Limitations

  • draft-18 is beta. Negotiated by default ((18, 16, 14), newest-first) over raw QUIC and WebTransport, with control + SUBGROUP object delivery validated end to end. Not yet complete: SUBSCRIBE / PUBLISH subscription-filter and largest-location still use the d16 nested form; d18 FETCH is pending; Track-Properties extensions encode as RFC9000 varints (correct for the small values in use). draft-14 / draft-16 are unaffected and remain the stable path.
  • WebTransport fetch / join routing: four [wt] test variants of FETCH and JOINING_SUBSCRIBE return empty results when the underlying transport is WebTransport. The raw-QUIC variants of the same tests pass and cover the MoQT-level invariant. Tracked separately; this affects only consumers that use fetch/join over WebTransport.
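
For readers unfamiliar with the RFC 9000 varints mentioned in the draft-18 note, the sketch below encodes and decodes them as defined in RFC 9000, Section 16: the top two bits of the first byte select a 1, 2, 4, or 8 byte encoding. Values below 64 fit in a single byte equal to the value itself. This README does not describe the vi64 format, so no comparison with it is attempted here. encode_varint and decode_varint are hypothetical names, not aiomoqt APIs.

```python
def encode_varint(value):
    """Encode an integer as an RFC 9000 variable-length integer."""
    if value < 0:
        raise ValueError("varints are unsigned")
    if value < 1 << 6:
        return value.to_bytes(1, "big")
    if value < 1 << 14:
        return (value | (0b01 << 14)).to_bytes(2, "big")
    if value < 1 << 30:
        return (value | (0b10 << 30)).to_bytes(4, "big")
    if value < 1 << 62:
        return (value | (0b11 << 62)).to_bytes(8, "big")
    raise ValueError("value does not fit in 62 bits")


def decode_varint(data):
    """Decode one varint from the start of data; return (value, bytes_consumed)."""
    if not data:
        raise ValueError("empty buffer")
    length = 1 << (data[0] >> 6)          # top two bits select 1, 2, 4, or 8 bytes
    if len(data) < length:
        raise ValueError("truncated varint")
    value = int.from_bytes(data[:length], "big")
    value &= (1 << (8 * length - 2)) - 1  # clear the two length bits
    return value, length


if __name__ == "__main__":
    for n in (37, 15293, 494878333):
        encoded = encode_varint(n)
        print(n, encoded.hex(), decode_varint(encoded))
```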

TODO

  • Fix WebTransport fetch / join routing (the 4 [wt] tests currently skipped)
  • Track data modules:
    • File transfer (or MOQT File Format?)
    • Interactive chat
    • MSF/LOC media packaging
    • CMSF media packaging
  • Simple relay implementation

Contributing

Contributions are welcome! Please fork the repository, create a branch, and submit a pull request. For major changes, open an issue first.

Author

Giovanni Marzot — gmarzot@marzresearch.net | moqarean.marzresearch.net

Acknowledgements

This project takes inspiration from, and has benefited from, the great work of the OpenMoQ/moxygen team and the continued efforts of the IETF MOQ WG.
