MAGPIE is a lightweight, modular messaging engine providing high-performance streaming and RPC over pluggable transports.
Project description
MAGPIE
Message Abstraction & General-Purpose Integration Engine
MAGPIE is a transport-agnostic messaging and RPC framework for developers and AI agents.
Whether the wire is ZeroMQ, MQTT, WebRTC, or something entirely custom, the application layer never changes. Services built with MAGPIE are natively consumable by both code and AI tools via built-in MCP support — making it a natural integration engine for distributed systems, edge devices, and the next generation of AI-driven pipelines.
Table of Contents
Features
- One API, any transport —
StreamWriter,StreamReader,RpcRequester,RpcResponderwork identically over ZMQ, MQTT, and WebRTC; swap transports with one constructor change - Topic-based streaming — high-throughput pub/sub via typed frames; publishers and subscribers are completely decoupled
- Request / Response RPC — synchronous request/reply with ACK, timeout, and per-call demux over any transport
- Schema-based RPC — JSON-RPC 2.0 dispatch via
JsonRpcSchema; define your API once, call methods by name with the proxy interface (client.add(a=3, b=4)) - MCP support out of the box —
McpSchematurns any MAGPIE RPC responder into a fully compliant MCP tool server (initialize,tools/list,tools/call);McpTransportlets any FastMCPClientcall those tools over ZMQ, MQTT, or WebRTC - MQTT transport — full streaming and RPC over MQTT; shared connection; supports
mqtt://,mqtts://,ws://,wss://, TLS, auth, LWT, and auto-reconnect - WebRTC transport — P2P streaming, video/audio, and RPC over WebRTC; MQTT or ZMQ used only for the initial signaling handshake; STUN + optional TURN for NAT traversal
- Typed frames —
ImageFrameJpeg,ImageFrameCV,AudioFrameRaw,AudioFrameFlac, and more; automatic serialization/deserialization across all transports - Node helpers —
SourceNode,SinkNode,ProcessNode,ServerNodeadd lifecycle and thread management on top of the raw transport primitives - Network discovery — mDNS/Zeroconf node advertisement and scanning via
ZconfDiscovery - CLI tools —
magpie-write,magpie-read,magpie-requestand MQTT/WebRTC equivalents; video/audio capture and playback tools - Lightweight core — ZeroMQ is the only core dependency; all media and protocol extras are opt-in
Installation
Core (ZMQ streaming + RPC)
pip install luxai-magpie
Optional extras
| Extra | What it adds |
|---|---|
pip install "luxai-magpie[mqtt]" |
MQTT transport + MQTT CLI tools |
pip install "luxai-magpie[webrtc]" |
WebRTC transport — P2P streaming, video/audio, RPC over internet |
pip install "luxai-magpie[mcp]" |
MCP adapter — McpTransport for FastMCP Client |
pip install "luxai-magpie[audio]" |
Audio frames + capture/player CLI tools |
pip install "luxai-magpie[video]" |
Image frames + capture/viewer CLI tools |
pip install "luxai-magpie[discovery]" |
magpie-discovery CLI tool |
pip install "luxai-magpie[full]" |
All of the above |
Quick Start
ZMQ Streaming
Writer:
import time
from luxai.magpie.transport import ZmqStreamWriter
writer = ZmqStreamWriter("tcp://*:5555")
i = 0
while True:
try:
writer.write({'id': i, 'value': 'hello'}, topic='/mytopic')
i += 1
time.sleep(1)
except KeyboardInterrupt:
writer.close()
break
Reader:
from luxai.magpie.transport import ZmqStreamReader
reader = ZmqStreamReader("tcp://127.0.0.1:5555", topic=['/mytopic'], bind=False)
while True:
try:
data, topic = reader.read()
print(f"{topic}: {data}")
except KeyboardInterrupt:
reader.close()
break
ZMQ Request / Response RPC
Responder:
from luxai.magpie.transport import ZMQRpcResponder
def handle(request):
return {'status': 'ok', 'echo': request}
server = ZMQRpcResponder("tcp://*:5556")
while True:
try:
server.handle_once(handler=handle, timeout=1.0)
except TimeoutError:
pass
except KeyboardInterrupt:
server.close()
break
Requester:
from luxai.magpie.transport import ZMQRpcRequester
client = ZMQRpcRequester("tcp://127.0.0.1:5556")
try:
response = client.call({'action': 'greet', 'name': 'Bob'}, timeout=3.0)
print("Response:", response)
except TimeoutError:
print("Request timed out")
finally:
client.close()
MQTT Streaming
MQTT uses a shared connection — create it once, pass it to any number of writers, readers, and RPC components.
Writer:
from luxai.magpie.transport import MqttConnection, MqttStreamWriter
conn = MqttConnection("mqtt://broker.hivemq.com:1883")
conn.connect()
writer = MqttStreamWriter(conn)
writer.write({"sensor": "temp", "value": 22.5}, topic="sensors/temperature")
writer.close()
conn.disconnect()
Reader:
from luxai.magpie.transport import MqttConnection, MqttStreamReader
conn = MqttConnection("mqtt://broker.hivemq.com:1883")
conn.connect()
reader = MqttStreamReader(conn, topic="sensors/temperature")
while True:
try:
data, topic = reader.read(timeout=5.0)
print(f"{topic}: {data}")
except KeyboardInterrupt:
reader.close()
break
conn.disconnect()
MQTT Request / Response RPC
Responder:
from luxai.magpie.transport import MqttConnection, MqttRpcResponder
conn = MqttConnection("mqtt://broker.hivemq.com:1883")
conn.connect()
def handle(request):
return {"status": "ok", "echo": request}
server = MqttRpcResponder(conn, service_name="myservice/actions")
while True:
try:
server.handle_once(handler=handle, timeout=1.0)
except TimeoutError:
pass
except KeyboardInterrupt:
server.close()
break
conn.disconnect()
Requester:
from luxai.magpie.transport import MqttConnection, MqttRpcRequester
conn = MqttConnection("mqtt://broker.hivemq.com:1883")
conn.connect()
client = MqttRpcRequester(conn, service_name="myservice/actions")
try:
response = client.call({"action": "move", "x": 1.0}, timeout=5.0)
print("Response:", response)
except TimeoutError:
print("Request timed out")
finally:
client.close()
conn.disconnect()
MQTT Advanced Options
from luxai.magpie.transport import (
MqttConnection, MqttOptions,
MqttAuthOptions, MqttTlsOptions, MqttWillOptions, MqttDefaultsOptions,
)
conn = MqttConnection(
"wss://broker.example.com:8884/mqtt",
client_id="node-01",
options=MqttOptions(
auth=MqttAuthOptions(mode="username_password", username="node", password="secret"),
tls=MqttTlsOptions(ca_file="/etc/ssl/certs/ca.pem", verify_peer=True),
will=MqttWillOptions(enabled=True, topic="nodes/node-01/status",
payload="offline", qos=1, retain=True),
defaults=MqttDefaultsOptions(publish_qos=1, subscribe_qos=1),
),
)
conn.connect()
WebRTC Streaming
WebRTC enables P2P communication over the internet — no broker in the data path after the initial signaling handshake. Signaling is exchanged via MQTT (internet) or ZMQ (LAN).
Video and audio frames are carried over native WebRTC RTP media tracks when topics are declared in WebRTCOptions; all other data flows over the data channel.
Writer (MQTT signaling):
from luxai.magpie.transport.webrtc import WebRTCConnection, WebRtcStreamWriter, WebRTCOptions
conn = WebRTCConnection.with_mqtt(
"mqtt://broker.hivemq.com:1883", session_id="my-node",
options=WebRTCOptions(video_topics=["/camera/color/image"]),
)
conn.connect()
writer = WebRtcStreamWriter(conn)
writer.write({"telemetry": [0.1, 0.2, 0.3]}, topic="service/state") # → data channel
writer.write(ImageFrameRaw(...), topic="/camera/color/image") # → RTP video track
writer.close()
conn.disconnect()
Reader:
from luxai.magpie.transport.webrtc import WebRTCConnection, WebRtcStreamReader, WebRTCOptions
conn = WebRTCConnection.with_mqtt(
"mqtt://broker.hivemq.com:1883", session_id="my-node",
options=WebRTCOptions(video_topics=["/camera/color/image"]),
)
conn.connect()
reader = WebRtcStreamReader(conn, topic="service/state")
vreader = WebRtcStreamReader(conn, topic="/camera/color/image")
data, _ = reader.read(timeout=5.0)
frame, _ = vreader.read(timeout=5.0) # ImageFrameRaw
reader.close()
vreader.close()
conn.disconnect()
LAN / localhost: replace
with_mqtt(...)withwith_zmq("tcp://127.0.0.1:5555", ..., bind=True/False)— no broker needed.
WebRTC Request / Response RPC
No broker in the hot path — the data channel is bidirectional P2P, so no reply_to topic is needed.
Responder:
from luxai.magpie.transport.webrtc import WebRTCConnection, WebRTCRpcResponder
conn = WebRTCConnection.with_mqtt("mqtt://broker.hivemq.com:1883", session_id="my-node-rpc")
conn.connect()
def handle(request):
return {"status": "ok", "echo": request}
server = WebRTCRpcResponder(conn, service_name="service/actions")
while True:
try:
server.handle_once(handler=handle, timeout=1.0)
except TimeoutError:
pass
except KeyboardInterrupt:
server.close()
break
conn.disconnect()
Requester:
from luxai.magpie.transport.webrtc import WebRTCConnection, WebRTCRpcRequester
conn = WebRTCConnection.with_mqtt("mqtt://broker.hivemq.com:1883", session_id="my-node-rpc")
conn.connect()
client = WebRTCRpcRequester(conn, service_name="service/actions")
try:
response = client.call({"action": "move", "x": 1.0}, timeout=5.0)
print("Response:", response)
except TimeoutError:
print("Request timed out")
finally:
client.close()
conn.disconnect()
WebRTC Advanced Options
from luxai.magpie.transport.webrtc import WebRTCConnection, WebRTCOptions, WebRTCTurnServer
opts = WebRTCOptions(
stun_servers=["stun:stun.l.google.com:19302"],
turn_servers=[WebRTCTurnServer(url="turn:myturn.server:3478", username="u", credential="p")],
ice_transport_policy="all", # "all" or "relay" (force TURN only)
video_codec="H264", # "H264", "VP8", "VP9"
video_bitrate=2000, # kbps
video_topics=["/camera/color/image"],
audio_topics=["/mic/audio/stream"],
use_media_channels=True,
)
conn = WebRTCConnection.with_mqtt("mqtt://broker.hivemq.com:1883", "my-node", options=opts)
conn.connect()
Auto-reconnect:
conn = WebRTCConnection.with_mqtt("mqtt://broker.hivemq.com:1883",
session_id="my-node", reconnect=True)
Schema-based RPC
JsonRpcSchema adds JSON-RPC 2.0 dispatch on top of any MAGPIE transport. Define your API once — shape, description, and types — then attach handlers and call methods by name. The same schema object works on both sides.
Responder — three ways to define methods:
from luxai.magpie.transport import ZMQRpcResponder
from luxai.magpie.schema import JsonRpcSchema
schema = JsonRpcSchema()
# Way A: decorator — shape and handler together (infers schema from type hints)
@schema.method()
def add(a: float, b: float) -> float:
"""Add two numbers."""
return a + b
# Way B: load from a standard JSON Schema file, attach handlers separately
schema2 = JsonRpcSchema.from_json_file("api.json")
@schema2.handler("convert")
def handle_convert(value, from_unit, to_unit):
return {"result": value, "unit": to_unit}
# Way C: explicit register — full control over schema
schema.register(
name="scale",
func=lambda value, factor: value * factor,
description="Scale a value",
input_schema={
"type": "object",
"properties": {
"value": {"type": "number"},
"factor": {"type": "number"},
},
"required": ["value", "factor"],
},
)
server = ZMQRpcResponder("tcp://*:5556", schema=schema)
while True:
try:
server.handle_once(timeout=1.0)
except TimeoutError:
pass
except KeyboardInterrupt:
server.close()
break
The JSON file (api.json) uses standard MCP/JSON Schema format. description and outputSchema are optional; inputSchema defaults to {} if omitted. outputSchema must describe an object ("type": "object") — scalar return types are expressed only in content[0].text, not as structured output.
[
{
"name": "convert",
"description": "Convert a value from one unit to another",
"inputSchema": {
"type": "object",
"properties": {
"value": {"type": "number"},
"from_unit": {"type": "string"},
"to_unit": {"type": "string"}
},
"required": ["value", "from_unit", "to_unit"]
}
},
{
"name": "get_status",
"description": "Return the current service status",
"inputSchema": {
"type": "object",
"properties": {
"service": {"type": "string"}
},
"required": ["service"]
},
"outputSchema": {
"type": "object",
"properties": {
"status": {"type": "string"},
"uptime": {"type": "number"}
}
}
}
]
Requester — proxy interface:
from luxai.magpie.transport import ZMQRpcRequester
from luxai.magpie.schema import JsonRpcSchema, JsonRpcError
# Define shape on the requester side (stub bodies are fine)
schema = JsonRpcSchema()
@schema.method()
def add(a: float, b: float) -> float: ...
client = ZMQRpcRequester("tcp://127.0.0.1:5556", schema=schema)
# Proxy style — method name as attribute
result = client.add(a=3, b=4) # → 7
# Base call style
result = client.call("add", a=3, b=4) # → 7
# With explicit transport timeout
result = client.call("add", a=3, b=4, _timeout=5.0)
try:
client.call("nonexistent")
except JsonRpcError as e:
print(e.code, e.message) # -32601 Method not found
client.close()
MCP Integration
MAGPIE has native MCP support on both sides of the connection — no separate MCP server process required.
Server side — McpSchema extends JsonRpcSchema with the full MCP handshake. Any registered method is automatically exposed as an MCP tool.
Agent / cloud side — McpTransport is a FastMCP ClientTransport that wraps any MAGPIE RpcRequester. The caller creates and owns the requester; McpTransport borrows it.
The key value proposition: a service behind NAT connects outbound to a broker; an LLM agent on the cloud connects to the same broker. No port forwarding, no VPN.
pip install "luxai-magpie[mcp]" # MCP + FastMCP (ZMQ always available)
pip install "luxai-magpie[mqtt,mcp]" # add MQTT transport
pip install "luxai-magpie[webrtc,mcp]" # add WebRTC transport
Server side — serve tools over any transport
from luxai.magpie.schema import McpSchema
schema = McpSchema(name="my-service", version="1.0.0")
@schema.method()
def translate(text: str, target_lang: str) -> dict:
"""Translate text into the target language."""
return {"translated": f"[{target_lang}] {text}", "lang": target_lang}
@schema.method()
def summarize(text: str, max_length: int) -> dict:
"""Summarize text to at most max_length characters."""
return {"summary": text[:max_length]}
Attach to any responder:
# ZMQ — no broker needed
from luxai.magpie.transport import ZMQRpcResponder
server = ZMQRpcResponder("tcp://*:5556", schema=schema)
# MQTT — service behind NAT
from luxai.magpie.transport.mqtt import MqttConnection
from luxai.magpie.transport import MqttRpcResponder
conn = MqttConnection("mqtt://broker.hivemq.com:1883")
conn.connect()
server = MqttRpcResponder(conn, service_name="node-01", schema=schema)
# WebRTC — P2P, lowest latency
from luxai.magpie.transport.webrtc import WebRTCConnection, WebRTCRpcResponder
conn = WebRTCConnection.with_mqtt("mqtt://broker.hivemq.com:1883", session_id="node-01")
conn.connect()
server = WebRTCRpcResponder(conn, service_name="node-01", schema=schema)
Serve loop is the same for all:
while True:
try:
server.handle_once(timeout=1.0)
except TimeoutError:
pass
except KeyboardInterrupt:
server.close()
break
Agent / cloud side — call tools with FastMCP Client
import asyncio
from fastmcp import Client
from fastmcp.exceptions import ToolError
from luxai.magpie.adapters.mcp import McpTransport
# ZMQ
from luxai.magpie.transport import ZMQRpcRequester
async def main():
req = ZMQRpcRequester("tcp://127.0.0.1:5556")
async with Client(McpTransport(req)) as client:
tools = await client.list_tools()
for tool in tools:
print(f" {tool.name}: {tool.description}")
result = await client.call_tool("translate", {"text": "Hello", "target_lang": "fr"})
print(result.content[0].text)
try:
await client.call_tool("translate", {"text": "Hello"}) # missing target_lang
except ToolError as e:
print(f"tool error: {e}")
req.close()
asyncio.run(main())
For MQTT or WebRTC, just swap the requester — McpTransport is identical:
# MQTT
from luxai.magpie.transport.mqtt import MqttConnection
from luxai.magpie.transport import MqttRpcRequester
conn = MqttConnection("mqtt://broker.hivemq.com:1883")
conn.connect()
req = MqttRpcRequester(conn, service_name="node-01")
async with Client(McpTransport(req)) as client:
result = await client.call_tool("translate", {"text": "Hello", "target_lang": "fr"})
req.close()
conn.disconnect()
Loading tools from an MCP tool-list file
from luxai.magpie.schema import McpSchema
schema = McpSchema.from_json_file("tools.json") # MCP native format
@schema.handler("translate")
def handle_translate(text: str, target_lang: str) -> dict:
return {"translated": f"[{target_lang}] {text}", "lang": target_lang}
Network Discovery
from luxai.magpie.discovery import ZconfDiscovery
# Advertise a node
with ZconfDiscovery() as disc:
disc.advertise_node("my-node", port=5555, payload={"role": "service"})
input("Press Enter to stop advertising...")
# Discover nodes
with ZconfDiscovery() as disc:
info = disc.resolve_node("my-node", timeout=5.0)
if info:
ip = disc.pick_best_ip(info)
print(f"Found at tcp://{ip}:{info.port}")
CLI Tools
ZMQ Tools
pip install luxai-magpie
magpie-write — publish to a topic:
magpie-write tcp://127.0.0.1:5555 /mytopic '{"name": "Bob", "value": 42}'
magpie-write tcp://127.0.0.1:5555 /mytopic '{"x": 1}' --rate 10 --loop
magpie-write tcp://*:5555 /mytopic @payload.json --bind
magpie-read — subscribe and print:
magpie-read tcp://127.0.0.1:5555 /mytopic --pretty
magpie-read tcp://127.0.0.1:5555 /topic1 /topic2
magpie-request — send an RPC request:
magpie-request tcp://127.0.0.1:5556 '{"action": "greet", "name": "Bob"}' --pretty
magpie-request tcp://127.0.0.1:5556 '{"jsonrpc":"2.0","method":"add","params":{"a":3,"b":4},"id":1}' --pretty
magpie-discovery — mDNS node discovery:
magpie-discovery # scan continuously
magpie-discovery --advertise --port 5555 --id MY_NODE # advertise
Audio / Video tools (require [audio] / [video] extras):
magpie-video-capture tcp://*:5555 /camera --encoder jpeg
magpie-video-viewer tcp://127.0.0.1:5555 /camera
magpie-audio-capture tcp://*:5556 /audio --samplerate 16000
magpie-audio-player tcp://127.0.0.1:5556 /audio
MQTT Tools
pip install "luxai-magpie[mqtt]"
magpie-write-mqtt:
magpie-write-mqtt mqtt://broker.hivemq.com:1883 /magpie/test '{"x": 1}' --rate 5 --loop
magpie-write-mqtt mqtt://broker.hivemq.com:1883 /magpie/status '{"state": "ready"}' --retain
magpie-read-mqtt:
magpie-read-mqtt mqtt://broker.hivemq.com:1883 /magpie/test --pretty
magpie-read-mqtt mqtt://broker.hivemq.com:1883 "/magpie/+" --hz # wildcard + frequency
magpie-request-mqtt:
magpie-request-mqtt mqtt://broker.hivemq.com:1883 myservice/actions '{"action": "run"}' --pretty
magpie-request-mqtt mqtt://broker.hivemq.com:1883 myservice/actions @req.json --timeout 10
Advanced broker options (auth, TLS, QoS, LWT) can be passed via
--mqtt-params @params.json.
WebRTC Tools
pip install "luxai-magpie[webrtc,mqtt]"
Both peers use the same session_id. Signaling via --signaling mqtt://... (internet) or --signaling tcp://... (LAN, add --bind on one side).
magpie-write-webrtc / magpie-read-webrtc:
magpie-write-webrtc my-node /service/state '{"x": 1.0}' --signaling mqtt://broker.hivemq.com:1883
magpie-read-webrtc my-node /service/state --signaling mqtt://broker.hivemq.com:1883 --pretty
magpie-request-webrtc:
magpie-request-webrtc my-node service/actions '{"action": "run"}' \
--signaling mqtt://broker.hivemq.com:1883 --pretty
Video / Audio over WebRTC:
magpie-video-capture-webrtc my-node /camera --signaling mqtt://broker.hivemq.com:1883
magpie-video-viewer-webrtc my-node /camera --signaling mqtt://broker.hivemq.com:1883
magpie-audio-capture-webrtc my-node /audio --signaling mqtt://broker.hivemq.com:1883
magpie-audio-player-webrtc my-node /audio --signaling mqtt://broker.hivemq.com:1883
Architecture
MAGPIE is built around four abstract base classes — StreamWriter, StreamReader, RpcRequester, RpcResponder — that absorb all threading, queuing, and lifecycle complexity. Transport implementations fill in two or three pure transport methods; everything else is handled by the base classes. This makes adding a new transport a matter of minutes, not days, and keeps user code completely transport-agnostic.
For the full architecture diagram, layer-by-layer breakdown, schema and MCP adapter design, and guides for adding new transports, serializers, and frame types, see ARCHITECTURE.md.
Related Projects
| Project | Language | Repository |
|---|---|---|
| MAGPIE | Python | this repo |
| MAGPIE C++ | C++ (libmagpie, libmagpie-mqtt) |
luxai-qtrobot/magpie-cpp |
| MAGPIE.js | TypeScript/JavaScript | luxai-qtrobot/magpie-js |
License
Licensed under the GNU General Public License v3 (GPLv3).
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file luxai_magpie-0.9.3.tar.gz.
File metadata
- Download URL: luxai_magpie-0.9.3.tar.gz
- Upload date:
- Size: 433.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
359aa40d488e654cefce8d0a1b56d070b897a1cfd362aa692dcdb0fcd6ae8564
|
|
| MD5 |
ad6cc33a4a6e6d98e8f5e99256ed8921
|
|
| BLAKE2b-256 |
113b02bed621e4d591b2142909af7d6d9642d428318e9e8cdf4fa11c99ddc8ae
|
File details
Details for the file luxai_magpie-0.9.3-py3-none-any.whl.
File metadata
- Download URL: luxai_magpie-0.9.3-py3-none-any.whl
- Upload date:
- Size: 426.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
039c511463578c34aacd25a68c8a260e92fb8634a9a698789eab2f1fa98c2efc
|
|
| MD5 |
f1f7f534a77e7daa76f15bedbb462259
|
|
| BLAKE2b-256 |
622c13fea105d348974de81ee5b726f3a3539ba3766ac44fb06a67e1376cb2de
|