Python asyncio implementation of the MoQT protocol
aiomoqt - Media over QUIC Transport (MoQT)
aiomoqt is an implementation of MoQT for asyncio, layered on aiopquic.
Overview
aiomoqt implements the MoQT protocol with dual draft-14 / draft-16 support, both publish and subscribe roles, H3/WebTransport and raw QUIC transports, and ALPN-based draft negotiation. The architecture extends aiopquic.asyncio.QuicConnectionProtocol. The package ships publisher / subscriber example clients, a benchmark suite, a relay version probe, and a moq-interop-runner-compatible test client.
Features
- H3/WebTransport and raw QUIC transports
- Draft-14 / draft-16 ALPN negotiation (moq-00/moqt-16)
- Draft-16 wire format: delta-encoded param keys, track extensions, unified request/response
- SubgroupHeader / ObjectDatagram flag encoding, delta-encoded object IDs
- Version-independent control: MOQTRequestError exception across drafts
- Async context manager for session lifecycle
- Sync / async response handling on every control message via wait_response
- High-level publisher: PublishedTrack (stream setup, subgroup writing, pacing)
- High-level subscriber: SubscribedTrack (object reassembly, FETCH / JOIN handling)
- Pluggable message handlers via register_handler()
- Data publishing via SubgroupHeader streams or ObjectDatagrams
- Data reception via on_object_received callback
- Low-level message serialization / deserialization for custom protocol work
Installation
Pure Python, requires Python 3.12+ (tested on 3.12, 3.13, 3.14):
uv pip install aiomoqt # or: pip install aiomoqt
aiopquic (the QUIC transport) installs as a binary wheel automatically. Only Linux (glibc 2.34+, RHEL 9 / Ubuntu 22.04+) and macOS arm64 have prebuilt wheels; other systems pull aiopquic via sdist and need a C build toolchain — see aiopquic install notes.
A ./bootstrap_python.sh script is provided for a uv-managed .venv if you want a clean dev environment.
Quick Start
Subscriber
import asyncio
from aiomoqt.client import MOQTClient

def on_object(msg, size, recv_time_ms, group_id=None, subgroup_id=None):
    print(f"g={group_id} obj={msg.object_id} {size}B payload={msg.payload}")

async def main():
    client = MOQTClient('relay.example.com', 443, path='moq',
                        use_quic=True, draft_version=16)
    async with client.connect() as session:
        await session.client_session_init()
        session.on_object_received = on_object
        await session.subscribe('ns', 'track', wait_response=True)
        await session.async_closed()

asyncio.run(main())
Publisher
import asyncio
from aiomoqt.client import MOQTClient
from aiomoqt.types import MOQTMessageType
from aiomoqt.messages import SubgroupHeader

async def main():
    client = MOQTClient('relay.example.com', 443, path='moq',
                        use_quic=True, draft_version=16)
    client.register_handler(MOQTMessageType.SUBSCRIBE, on_subscribe)
    async with client.connect() as session:
        await session.client_session_init()
        await session.publish_namespace('ns', wait_response=True)
        await session.async_closed()  # serve until closed

async def on_subscribe(session, msg):
    """Called when a subscriber requests a track."""
    ok = session.subscribe_ok(request_msg=msg)
    stream_id = session.open_uni_stream()
    hdr = SubgroupHeader(track_alias=ok.track_alias, group_id=0, subgroup_id=0, publisher_priority=0)
    session.stream_write(stream_id, hdr.serialize().data)
    session.stream_write(stream_id, hdr.next_object(payload=b"hello").data)
    session.transmit()

asyncio.run(main())
The on_subscribe handler above — stream setup, header serialization, object writing — is wrapped by the higher-level PublishedTrack / SubscribedTrack classes in aiomoqt.track. See *_bench.py examples for typical usage.
Control Message API
Control messages support both sync and async patterns via wait_response:
# Blocking — awaits and returns the response message
resp = await session.subscribe('ns', 'track', wait_response=True)
# Non-blocking — returns request, response arrives via handler
req = await session.subscribe('ns', 'track')
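The two calling styles can be illustrated with a toy model. The sketch below does not use aiomoqt and makes no claim about how the library correlates requests and responses internally; `ToySession`, its request IDs, and the simulated peer reply are all assumptions made for illustration. It only shows the general idea: a waiting caller parks on a future, while a non-waiting caller gets the request back and the response is routed to a handler path.

```python
import asyncio


class ToySession:
    """Toy stand-in for a session; it does not use aiomoqt."""

    def __init__(self):
        self._next_id = 0
        self._pending = {}   # request id -> Future for callers that wait
        self.handled = []    # responses routed to the handler path

    async def subscribe(self, namespace, track, wait_response=False):
        loop = asyncio.get_running_loop()
        request = {"id": self._next_id, "ns": namespace, "track": track}
        self._next_id += 1
        future = None
        if wait_response:
            future = loop.create_future()
            self._pending[request["id"]] = future
        # Simulate the peer answering on a later event-loop iteration.
        loop.call_soon(self._on_response, {"id": request["id"], "ok": True})
        if future is not None:
            return await future   # blocking style: return the response
        return request            # non-blocking style: return the request

    def _on_response(self, response):
        future = self._pending.pop(response["id"], None)
        if future is not None:
            future.set_result(response)    # wake the waiting caller
        else:
            self.handled.append(response)  # nobody waiting: handler path


async def demo():
    session = ToySession()
    resp = await session.subscribe("ns", "track", wait_response=True)
    req = await session.subscribe("ns", "track")
    await asyncio.sleep(0)  # give the simulated response a chance to arrive
    return resp, req, session.handled
```

Running `asyncio.run(demo())` returns the response for the first call, the request for the second, and a one-element `handled` list holding the second call's response.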
Examples
Publisher / Subscriber
# Publish (SubgroupHeader streams)
python -m aiomoqt.examples.pub_example --host relay.ex.com --use-quic
# Publish (ObjectDatagrams)
python -m aiomoqt.examples.pub_example --host relay.ex.com --use-quic --datagram
# Subscribe
python -m aiomoqt.examples.sub_example --host relay.ex.com --use-quic
# Subscribe + FETCH (join mid-stream)
python -m aiomoqt.examples.join_example --host relay.ex.com --use-quic
Common options: --namespace, --trackname, --path, --debug, --keylogfile
Benchmarks
Bench tools take a positional relay URL:
moqt://host[:port] Raw QUIC (default port 443)
https://host[:port]/[endpoint] H3/WebTransport (default port 443)
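As a rough illustration of the two URL forms, the following sketch splits a relay URL into connection settings using only the standard library. `parse_relay_url` and the returned dictionary keys are hypothetical names, not part of aiomoqt, and stripping the leading slash from the endpoint is an assumption; the bench tools may parse their argument differently.

```python
from urllib.parse import urlsplit

DEFAULT_PORT = 443  # both URL forms default to port 443


def parse_relay_url(url, force_quic=False):
    """Hypothetical helper: split a relay URL into connection settings."""
    parts = urlsplit(url)
    if parts.scheme not in ("moqt", "https"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    if not parts.hostname:
        raise ValueError("relay URL needs a host")
    # moqt:// means raw QUIC; https:// means H3/WebTransport unless the
    # caller forces raw QUIC (mirroring the idea of -Q / --force-quic).
    use_quic = parts.scheme == "moqt" or force_quic
    return {
        "host": parts.hostname,
        "port": parts.port or DEFAULT_PORT,
        "path": parts.path.lstrip("/"),
        "use_quic": use_quic,
    }


print(parse_relay_url("moqt://relay.ex.com"))
print(parse_relay_url("https://relay.ex.com:4433/moq"))
```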
# Publisher — configurable size, rate, parallelism
python -m aiomoqt.examples.pub_bench moqt://relay.ex.com -s 4096 -P 4 -r 120 -t 60
# Subscriber — latency/jitter/loss stats (omit -t; subscriber exits on PUBLISH_DONE)
python -m aiomoqt.examples.sub_bench moqt://relay.ex.com
# Combined pub/sub in one process
python -m aiomoqt.examples.relay_bench moqt://relay.ex.com -s 1024 -g 10000 -t 30
# 1 publisher, N subscribers in one process (fanout capacity)
python -m aiomoqt.examples.multi_sub_bench moqt://relay.ex.com -n 100 --video 720p -t 60
# Local loopback (no relay needed)
python -m aiomoqt.examples.loopback_bench -s 4096 -P 4 -t 20
Publisher flow selection (for relays that require a specific message pattern):
--pub-ns # send only PUBLISH_NAMESPACE (wait for SUBSCRIBE)
--pub-both # send PUBLISH_NAMESPACE and PUBLISH (required by Cloudflare d14)
# default: send only PUBLISH
| Option | Description | Default |
|---|---|---|
| -s, --object-size | Payload size (bytes) | 1024 |
| -g, --group-size | Objects per group | 10000 |
| -P, --streams | Parallel subgroup streams | 1 |
| -r, --rate | Objects/sec per stream (0=max) | 0 |
| -t, --duration | Duration (seconds) | 30 |
| -i, --interval | Report interval (seconds) | 5.0 |
| -D, --datagram | Use datagrams instead of streams | off |
| -Q, --force-quic | Force raw QUIC for https:// URLs | off |
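To make the meaning of the rate option concrete, here is a minimal sketch of one way a per-stream object rate could be turned into send deadlines. It is an assumption about how pacing might work, not a description of the bench tools' actual scheduler; `send_deadlines` and `planned_objects` are hypothetical helpers.

```python
def send_deadlines(rate, count, start=0.0):
    """Target send time (seconds) for each of `count` objects on one stream.

    `rate` is objects/sec for a single stream; 0 means "send as fast as
    possible", modelled here as every object being due immediately.
    """
    if rate < 0:
        raise ValueError("rate must be >= 0")
    if rate == 0:
        return [start] * count
    return [start + i / rate for i in range(count)]


def planned_objects(rate, duration, streams):
    """Objects a paced run would aim to send across all streams."""
    return rate * duration * streams


# Values taken from the pub_bench example: -P 4 -r 120 -t 60
print(send_deadlines(120, 3))
print(planned_objects(120, 60, 4))
```

Computing deadlines from the start time, rather than sleeping a fixed interval after each send, keeps per-object scheduling error from accumulating over a long run.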
Interop Testing
# All tests (draft-14, auto-detected)
python -m aiomoqt.examples.moq_interop_client -r "moqt://relay.ex.com:4433"
# All tests (draft-16)
python -m aiomoqt.examples.moq_interop_client -r "moqt://relay.ex.com:4433" --draft 16
# Single test case
python -m aiomoqt.examples.moq_interop_client -r "moqt://relay.ex.com:4433" -t subscribe-error
# List test cases
python -m aiomoqt.examples.moq_interop_client -l
Relay Probe
Batch liveness + draft-version check. Reads a relay list, does a real CLIENT_SETUP / SERVER_SETUP handshake per (endpoint × draft) — no bare-ALPN tricks — and writes a JSON status report.
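The probing loop described above can be sketched roughly as follows. This is not the relay_probe source: `probe_all`, `fake_handshake`, the `.example` endpoints, and the report fields are all hypothetical, and the placeholder handshake stands in for a real CLIENT_SETUP / SERVER_SETUP exchange. The sketch probes pairs one after another for simplicity.

```python
import asyncio
import itertools
import json


async def probe_all(endpoints, drafts, handshake, timeout=8.0):
    """Run handshake(endpoint, draft) for every pair, bounded by timeout."""
    report = []
    for endpoint, draft in itertools.product(endpoints, drafts):
        entry = {"endpoint": endpoint, "draft": draft}
        try:
            await asyncio.wait_for(handshake(endpoint, draft), timeout)
            entry["status"] = "ok"
        except asyncio.TimeoutError:
            entry["status"] = "timeout"
        except Exception as exc:  # record the failure instead of raising
            entry["status"] = f"error: {exc}"
        report.append(entry)
    return report


async def fake_handshake(endpoint, draft):
    """Placeholder for a real setup exchange (illustration only)."""
    if endpoint.endswith("slow.example"):
        await asyncio.sleep(1.0)  # longer than the demo timeout below
    if draft == 16 and endpoint.endswith("d14only.example"):
        raise ConnectionError("no common draft")


def run_demo():
    endpoints = ["moqt://ok.example", "moqt://slow.example",
                 "moqt://d14only.example"]
    report = asyncio.run(
        probe_all(endpoints, [14, 16], fake_handshake, timeout=0.05))
    return json.dumps(report, indent=2)
```

`run_demo()` returns a JSON string with one entry per (endpoint, draft) pair; a real report would be written to the configured output file instead.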
Accepts CLI flags, environment variables, or both (CLI overrides env):
# CLI form (typical interactive use)
python -m aiomoqt.examples.relay_probe -f relays.json -o status.json --once
# Env form (typical container/daemon deployment)
RELAYS_FILE=relays.json OUTPUT_FILE=status.json PROBE_ONCE=1 \
python -m aiomoqt.examples.relay_probe
# Long-running monitor (re-probe every --interval seconds)
python -m aiomoqt.examples.relay_probe -f relays.json -o status.json
| CLI flag | Env var | Default | Meaning |
|---|---|---|---|
| -f / --relays-file | RELAYS_FILE | /app/relays.json | input relay list |
| -o / --output-file | OUTPUT_FILE | /output/relay-status.json | status report destination |
| --timeout | PROBE_TIMEOUT | 8 | per-probe handshake timeout (s) |
| --interval | PROBE_INTERVAL | 300 | re-probe cadence in monitor mode (s) |
| --once | PROBE_ONCE=1 | unset | probe once and exit |
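One common way to get "CLI overrides env" behaviour is to use the environment value as each argparse default, so an explicit flag always wins. The sketch below reuses the flag names, variable names, and defaults from the table, but it is only an assumed implementation; `build_parser` is a hypothetical helper and the real relay_probe may resolve its settings differently.

```python
import argparse
import os


def build_parser(env=None):
    """Parser whose defaults come from env vars, then built-in defaults."""
    env = os.environ if env is None else env
    parser = argparse.ArgumentParser(prog="relay_probe_sketch")
    parser.add_argument("-f", "--relays-file",
                        default=env.get("RELAYS_FILE", "/app/relays.json"))
    parser.add_argument("-o", "--output-file",
                        default=env.get("OUTPUT_FILE",
                                        "/output/relay-status.json"))
    parser.add_argument("--timeout", type=float,
                        default=float(env.get("PROBE_TIMEOUT", 8)))
    parser.add_argument("--interval", type=float,
                        default=float(env.get("PROBE_INTERVAL", 300)))
    # PROBE_ONCE=1 turns the flag on by default; --once turns it on explicitly.
    parser.add_argument("--once", action="store_true",
                        default=env.get("PROBE_ONCE") == "1")
    return parser


# Env supplies RELAYS_FILE and PROBE_ONCE; the CLI overrides the file path.
args = build_parser({"RELAYS_FILE": "env.json", "PROBE_ONCE": "1"}).parse_args(
    ["-f", "cli.json"])
print(args.relays_file, args.once, args.timeout)
```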
WebTransport Server
python -m aiomoqt.examples.server_example \
--certificate cert.pem --private-key key.pem --port 443
Example Reference
| Example | Description |
|---|---|
| pub_example.py | Publisher: SubgroupHeader streams or ObjectDatagrams |
| sub_example.py | Subscriber: receives data from a relay |
| join_example.py | SUBSCRIBE + FETCH (join mid-stream) |
| pub_bench.py | Publisher benchmark, configurable parameters |
| sub_bench.py | Subscriber with latency/jitter/loss stats |
| relay_bench.py | Combined pub/sub in one process |
| multi_sub_bench.py | 1 publisher, N subscribers in one process |
| loopback_bench.py | Local loopback (no relay) |
| adaptive_bench.py | Ramps rate until buffer growth; loopback (--mp-loopback for proc-isolated pub/sub) or relay |
| server_example.py | WebTransport server (origin) |
| relay_probe.py | Relay version probe (draft-14/16) |
| moq_interop_client.py | Interop test client (TAP v14 out; 6 standard + fetch/join) |
Interop Test Results
Results against live public relays, as probed by
tests/release_regression_test.py --test-tier interop in v0.8.1.
Tests use the 6 moq-interop-runner
cases plus relay-pub-sub (3-subscriber multi-sub bench).
Error codes are validated to spec-conformant values
(e.g. subscribe-error requires TRACK_DOES_NOT_EXIST, not INTERNAL_ERROR).
| Relay | Draft | Transport | ctrl-msg | pub-sub |
|---|---|---|---|---|
| OpenMoQ moqx | d14 | QUIC | 6/6 | 3/3 |
| OpenMoQ moqx | d14 | H3/WT | 6/6 | 3/3 |
| OpenMoQ moqx | d16 | QUIC | 6/6 | 3/3 |
| OpenMoQ moqx | d16 | H3/WT | 6/6 | 3/3 |
| Meta moxygen | d14 | QUIC | 6/6 | 3/3 |
| Meta moxygen | d14 | H3/WT | 6/6 | 3/3 |
| Meta moxygen | d16 | QUIC | 6/6 | 3/3 |
| Meta moxygen | d16 | H3/WT | 6/6 | 3/3 |
| Cloudflare moq-rs | d14 | QUIC | 5/6 | 3/3 |
| Cloudflare moq-rs (d16 interop branch) | d16 | QUIC | 6/6 | unverified |
| Red5 Pro | d14 | QUIC | unreachable | unreachable |
| Red5 Pro | d14 | H3/WT | 6/6 | unverified |
| Red5 Pro | d16 | QUIC | unreachable | unreachable |
| Red5 Pro | d16 | H3/WT | 6/6 | unverified |
| Quicr libquicr | d14 | QUIC | 5/6 | 3/3 |
| Quicr libquicr | d14 | H3/WT | 5/6 | 3/3 |
| Quicr libquicr | d16 | QUIC | unverified | unverified |
| Quicr libquicr | d16 | H3/WT | 5/6 | 3/3 |
| Meetecho imquic | d16 | QUIC | 6/6 | unverified |
| Meetecho imquic | d16 | H3/WT | 5/6 | unverified |
| OzU moqtail | d14 | H3/WT | 6/6 | unverified |
- unverified: suite did not complete end-to-end.
- unreachable: no response to QUIC Initial.
- See tests/relays.json for the full catalog, per-endpoint notes, and relays disabled by default.
Test cases: setup-only, announce-only, publish-namespace-done,
subscribe-error, announce-subscribe, subscribe-before-announce,
plus fetch and join probes (not in default catalog matrix —
most relays do not implement these yet).
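The interop client is described as emitting TAP v14, and the results table reports counts such as 5/6. As a hedged illustration of how per-case outcomes map onto both, the sketch below formats a result dictionary as a minimal TAP version 14 document. `tap14_report` and `summary` are hypothetical helpers, and the failing case in the example is arbitrary; the table does not say which case fails on any given relay.

```python
TEST_CASES = [
    "setup-only", "announce-only", "publish-namespace-done",
    "subscribe-error", "announce-subscribe", "subscribe-before-announce",
]


def tap14_report(results):
    """Format {test name: passed} as a minimal TAP version 14 document."""
    lines = ["TAP version 14", f"1..{len(results)}"]
    for number, (name, passed) in enumerate(results.items(), start=1):
        status = "ok" if passed else "not ok"
        lines.append(f"{status} {number} - {name}")
    return "\n".join(lines) + "\n"


def summary(results):
    """Passed/total string in the style of the results table."""
    passed = sum(1 for ok in results.values() if ok)
    return f"{passed}/{len(results)}"


# Arbitrary example: pretend one case failed.
example = {name: name != "subscribe-error" for name in TEST_CASES}
print(tap14_report(example), end="")
print(summary(example))
```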
Performance
Single-host loopback on AMD Ryzen 7 PRO 7840U (Zen 4), Linux, Python 3.14, against aiopquic 0.2.5. Single-publisher single-subscriber MoQT loopback (in-process, raw QUIC):
| object size | obj/s | throughput | notes |
|---|---|---|---|
| 8 KiB | 16,084 | 1,055 Mbps | b6 subgroup-churn microbench |
| 1 KiB | 99,000 | 810 Mbps | adaptive_bench --mp-loopback, P=4, tx-side |
aiopquic (the underlying transport) sustains 2.4–2.7 Gbps single-stream and peaks at 5.5 Gbps with P=64 × 16 KiB concurrent streams — see aiopquic perf table. The aiomoqt-level gap to the transport ceiling is per-object framer + asyncio orchestration cost; closing it is on the 0.9.x roadmap (framer batching).
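The throughput column can be cross-checked from the object size and object rate. The short calculation below assumes that Mbps means 10^6 bits per second and counts payload bytes only (both assumptions, since the table does not say how throughput was measured); under those assumptions the computed values land within about 0.1% of the reported figures.

```python
def payload_mbps(object_bytes, objects_per_sec):
    """Payload-only throughput in Mbps (10**6 bits per second)."""
    return object_bytes * 8 * objects_per_sec / 1e6


# (label, object size in bytes, objects/sec, throughput reported in the table)
rows = [
    ("8 KiB", 8 * 1024, 16_084, 1_055),
    ("1 KiB", 1 * 1024, 99_000, 810),
]

for label, size, rate, reported in rows:
    computed = payload_mbps(size, rate)
    print(f"{label}: computed {computed:.1f} Mbps, table {reported} Mbps")
```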
Development
git clone https://github.com/gmarzot/aiomoqt.git
cd aiomoqt
python3 -m venv .venv && source .venv/bin/activate
uv pip install -e ".[test]" # or: pip install -e ".[test]"
pytest aiomoqt/tests/
Known Limitations
- WebTransport fetch / join routing -- four [wt] test variants of FETCH and JOINING_SUBSCRIBE return empty results when the underlying transport is WebTransport. Raw-QUIC variants of the same tests pass and cover the MoQT-level invariant. Tracked separately; affects WT-only consumers of fetch/join.
- Subscriber framer desync at high tx rates -- under sustained tx > ~400 Mbps the subscriber's data-stream parser occasionally rejects a stream with a framer desync error. Stream-level reject (not session-fatal); the publisher continues. Pre-existing; surfaced more visibly by --mp-loopback headroom. Tracked for 0.9.x.
TODO
- Diagnose and fix the subscriber framer desync at high tx rates
- Close the aiomoqt-level perf gap to the aiopquic transport ceiling (framer batching)
- Fix WebTransport fetch / join routing (the 4 [wt] tests currently skipped)
- Track data modules:
- File transfer (or MOQT File Format?)
- Interactive chat
- MSF/LOC media packaging
- CMSF media packaging
- Simple relay implementation
Contributing
Contributions are welcome! Please fork the repository, create a branch, and submit a pull request. For major changes, open an issue first.
Resources
Author
Giovanni Marzot — gmarzot@marzresearch.net | moqarean.marzresearch.net
Acknowledgements
This project takes inspiration from, and has benefited from, the great work of the OpenMoQ/moxygen team and the continued efforts of the MOQ IETF WG.