Python asyncio implementation of the MoQT protocol
Project description
aiomoqt - Media over QUIC Transport (MoQT)
aiomoqt is an implementation of the MoQT protocol, based on asyncio and qh3.
Overview
This package implements the MoQT Specification with dual draft-14 and draft-16 support. It is designed for general use as an MoQT client and server library, supporting both 'publish' and 'subscribe' roles.
The architecture follows the asyncio.Protocol design pattern, extending the qh3 QuicConnectionProtocol. It supports both H3/WebTransport and raw QUIC transports, with ALPN-based draft version negotiation, and has been interop tested against 6 relay implementations across draft-14 and draft-16.
The package includes publisher and subscriber example clients, a benchmark suite for throughput/latency measurement, a relay version probe tool, and an interop test client compatible with the moq-interop-runner framework.
Features
- H3/WebTransport and raw QUIC transports
- Async context manager for session lifecycle
- High-level control message API with sync/async response handling
- Low-level message serialization/deserialization
- Version-independent API:
MOQTRequestErrorexception across drafts - Pluggable message handlers via
register_handler() - Data publishing via SubgroupHeader streams or ObjectDatagrams
- Data reception via
on_object_receivedcallback - Draft-14/16: ALPN negotiation (
moq-00/moqt-16) - Draft-16: delta-encoded parameter keys, track extensions, unified REQUEST_OK/REQUEST_ERROR
- Wire format: SubgroupHeader/ObjectDatagram flag encoding, delta-encoded object IDs
Installation
Requires Python 3.12+ (tested on 3.12, 3.13, and 3.14).
pip install aiomoqt
# or
uv pip install aiomoqt
Quick Start
Subscriber
import asyncio
from aiomoqt.client import MOQTClient
def on_object(msg, size, recv_time_ms, group_id=None, subgroup_id=None):
print(f"g={group_id} obj={msg.object_id} {size}B payload={msg.payload}")
async def main():
client = MOQTClient(
'relay.example.com', 4433, endpoint='moq',
use_quic=True, draft_version=16, debug=True,
)
async with client.connect() as session:
await session.client_session_init()
session.on_object_received = on_object
await session.subscribe('ns', 'track', wait_response=True)
await session.async_closed()
asyncio.run(main())
Publisher
import asyncio
from aiomoqt.client import MOQTClient
from aiomoqt.types import MOQTMessageType
from aiomoqt.messages import SubgroupHeader
async def main():
client = MOQTClient(
'relay.example.com', 4433,
endpoint='moq', use_quic=True, draft_version=16,
)
client.register_handler(MOQTMessageType.SUBSCRIBE, on_subscribe)
async with client.connect() as session:
await session.client_session_init()
await session.publish_namespace('ns', wait_response=True)
await session.async_closed() # serve until closed
async def on_subscribe(session, msg):
"""Called when a subscriber requests a track."""
ok = session.subscribe_ok(request_msg=msg)
stream_id = session.open_uni_stream()
hdr = SubgroupHeader(
track_alias=ok.track_alias,
group_id=0, subgroup_id=0, publisher_priority=0,
)
session.stream_write(stream_id, hdr.serialize().data)
session.stream_write(stream_id, hdr.next_object(payload=b"hello").data)
session.transmit()
asyncio.run(main())
The on_subscribe handler above — stream setup, header serialization,
object writing — is the pattern that track data modules will encapsulate
into a higher-level TrackWriter / TrackReader API.
Control Message API
Control messages support both sync and async patterns via wait_response:
# Blocking — awaits and returns the response message
resp = await session.subscribe('ns', 'track', wait_response=True)
# Non-blocking — returns request, response arrives via handler
req = await session.subscribe('ns', 'track')
Relay URL Formats
Examples and benchmarks accept relay URLs in several forms:
moqt://host:port Raw QUIC (default port 4443)
https://host:port/endpoint H3/WebTransport (default port 4433)
host:port H3/WebTransport
host H3/WebTransport, port 4433, endpoint /moq
Examples
Publisher / Subscriber
# Publish (SubgroupHeader streams)
python -m aiomoqt.examples.pub_example --host relay.ex.com --port 4433 --use-quic
# Publish (ObjectDatagrams)
python -m aiomoqt.examples.pub_example --host relay.ex.com --port 4433 --use-quic --datagram
# Subscribe
python -m aiomoqt.examples.sub_example --host relay.ex.com --port 4433 --use-quic
# Subscribe + FETCH (join mid-stream)
python -m aiomoqt.examples.join_example --host relay.ex.com --port 4433 --use-quic
Common options: --namespace, --trackname, --endpoint, --debug, --keylogfile
Benchmarks
# Publisher — configurable size, rate, parallelism
python -m aiomoqt.examples.bench_pub moqt://relay.ex.com:4443 -s 4096 -P 4 -r 120 -t 60
# Subscriber — latency/jitter/loss stats
python -m aiomoqt.examples.bench_sub moqt://relay.ex.com:4443 -t 60
# Combined pub/sub
python -m aiomoqt.examples.bench_relay moqt://relay.ex.com:4443 -s 1024 -g 10000 -t 30
# Local loopback (no relay needed)
python -m aiomoqt.examples.bench_loopback -s 4096 -P 4 -t 20
| Option | Description | Default |
|---|---|---|
-s, --object-size |
Payload size (bytes) | 1024 |
-g, --group-size |
Objects per group | 10000 |
-P, --streams |
Parallel subgroup streams | 1 |
-r, --rate |
Objects/sec per stream (0=max) | 0 |
-t, --duration |
Duration (seconds) | 30 |
-i, --interval |
Report interval (seconds) | 5.0 |
-D, --datagram |
Use datagrams instead of streams | off |
-Q, --force-quic |
Force raw QUIC for https:// URLs | off |
Interop Testing
# All tests (draft-14, auto-detected)
python -m aiomoqt.examples.moq_interop_client -r "moqt://moqx-000.ci.openmoq.org:4433"
# All tests (draft-16)
python -m aiomoqt.examples.moq_interop_client -r "moqt://moqx-000.ci.openmoq.org:4433" --draft 16
# Single test case
python -m aiomoqt.examples.moq_interop_client -r "moqt://relay:4433" -t subscribe-error
# List test cases
python -m aiomoqt.examples.moq_interop_client -l
Relay Probe
python -m aiomoqt.examples.relay_probe
Environment variables: RELAYS_FILE, OUTPUT_FILE,
PROBE_TIMEOUT, PROBE_INTERVAL, PROBE_ONCE
WebTransport Server
python -m aiomoqt.examples.server_example \
--certificate cert.pem --private-key key.pem --port 4433
Example Reference
| Example | Description |
|---|---|
pub_example.py |
Publisher — SubgroupHeader streams or ObjectDatagrams |
sub_example.py |
Subscriber — receives data from a relay |
join_example.py |
SUBSCRIBE + FETCH (join mid-stream) |
bench_pub.py |
Publisher benchmark, configurable parameters |
bench_sub.py |
Subscriber with latency/jitter/loss stats |
bench_relay.py |
Combined pub/sub benchmark |
bench_loopback.py |
Local loopback benchmark (no relay) |
server_example.py |
WebTransport server (origin) |
relay_probe.py |
Relay version probe (draft-14/16) |
moq_interop_client.py |
Interop test client (6 test cases, TAP14) |
Interop Test Results
All 6 moq-interop-runner test cases pass against multiple relays on draft-14 and draft-16:
| Relay | Draft | Transport | Tests | Result |
|---|---|---|---|---|
| OpenMoQ moqx | draft-16 | Raw QUIC | 6/6 | PASS |
| OpenMoQ moqx | draft-14 | Raw QUIC | 6/6 | PASS |
| Meta moxygen | draft-16 | Raw QUIC | 6/6 | PASS |
| Meta moxygen | draft-14 | Raw QUIC | 6/6 | PASS |
| Cloudflare moq-rs | draft-14 | Raw QUIC | 6/6 | PASS |
| Red5 Pro | draft-14 | Raw QUIC | 6/6 | PASS |
| Red5 Pro | draft-14 | WebTransport | 6/6 | PASS |
| Quicr libquicr | draft-14 | Raw QUIC | 5/6 | PASS |
Test cases: setup-only, announce-only, publish-namespace-done,
subscribe-error, announce-subscribe, subscribe-before-announce
Development
git clone https://github.com/gmarzot/aiomoqt.git
cd aiomoqt
python3 -m venv .venv && source .venv/bin/activate
pip install -e ".[test]"
pytest aiomoqt/tests/
Optionally, ./bootstrap_python.sh sets up a full uv-managed environment
with a specific Python version and Cython.
TODO
- Transition from qh3 to aiopquic transport for performance
- Track data modules:
- File transfer (or MOQT File Format?)
- Interactive chat
- MSF/LOC media packaging
- CMSF media packaging
- Dockerize interop test client for moq-interop-runner registration
- Simple relay implementation
Contributing
Contributions are welcome! Please fork the repository, create a branch, and submit a pull request. For major changes, open an issue first.
Resources
Author
Giovanni Marzot — gmarzot@marzresearch.net | moqarean.marzresearch.net
Acknowledgements
This project takes inspiration from, and has benefited from the great work done by the OpenMoQ/moxygen team, and the continued efforts of the MOQ IETF WG.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file aiomoqt-0.5.15.tar.gz.
File metadata
- Download URL: aiomoqt-0.5.15.tar.gz
- Upload date:
- Size: 77.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
34441d49971041724a03cf0467a2fb39a58c21d78d854eff7bdffda14bd4975b
|
|
| MD5 |
e67fcd4a11df5184530df0c15edfff85
|
|
| BLAKE2b-256 |
0f41773c8ecbbeb868cadfffe0898cd5ca0a8897c019863be238722cac9b7b3f
|
File details
Details for the file aiomoqt-0.5.15-py3-none-any.whl.
File metadata
- Download URL: aiomoqt-0.5.15-py3-none-any.whl
- Upload date:
- Size: 86.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
750057a98e8a64b24569c6d4e549a33764076e1e4f181f3aa6dd6ab5e6a7dd6f
|
|
| MD5 |
5f91be1c81f300f5ef999b3cb245b1a0
|
|
| BLAKE2b-256 |
bfd9c95216edf0b0e6c6b3c100b2c977b8eb18807cb65c27c4ecd56d758c0ed8
|