Lightweight Wire.Band client — semantic data middleware for any domain (IoT, AI/ML, DeFi, legal, geospatial, supply chain, and more)
Project description
wire-band-edge
Lightweight Wire.Band client — semantic data middleware for any domain. Connects any data source to a Wire.Band backend: IoT sensors, AI/ML pipelines, DeFi activity, legal records, geospatial streams, supply chain events, and more.
Runs anywhere Python runs: subscribe to an MQTT broker, ingest directly from code, or bridge any structured data source. Classifies each message into a typed semantic frame, buffers locally during connectivity loss, and forwards event batches to a Wire.Band backend over HTTP.
Features
- Zero private dependencies — installs anywhere with
pip install wire-band-edge - Universal domain coverage — 650+ symbol types across IoT, AI/ML, DeFi, legal, geographic, temporal, security, storage, networking, and more
- MQTT bridge — subscribes to topic patterns, classifies + frames each message
- Local ring buffer — survives network interruptions; events are retried with exponential backoff
- Delta filtering — suppress readings that haven't meaningfully changed (configurable threshold)
- Semantic classification — maps topic paths and payload keys to typed symbols across all supported domains
- Async throughout — built on
asyncio+httpx; non-blocking on constrained hardware
Installation
# Core client (HTTP flush only)
pip install wire-band-edge
# With MQTT broker support
pip install "wire-band-edge[mqtt]"
Python 3.10+ required.
Quick Start
CLI (after install)
# Basic — bridge all topics to a local backend
wire-band-edge --broker mqtt://localhost:1883 --backend http://localhost:8000
# With device ID and delta filtering
wire-band-edge \
--broker mqtt://broker.example.com:1883 \
--topics "sensors/#" "machines/#" \
--device-id factory-rpi4 \
--delta-threshold 0.02 \
--backend http://my-wireband-server:8000
# Or with python -m
python -m wire_band_edge --help
Full MQTT gateway
import asyncio
from wire_band_edge import WireBandEdgeClient, MQTTConnector
async def main():
client = WireBandEdgeClient(
backend_url="http://your-wireband-server:8000",
device_id="factory-floor-rpi4",
delta_threshold=0.02, # suppress readings with <2% change
)
connector = MQTTConnector("mqtt://localhost:1883")
await client.run_mqtt(connector, topics=["sensors/#", "machines/#"])
asyncio.run(main())
Programmatic ingest (serial drivers, custom sensors)
import asyncio
from wire_band_edge import WireBandEdgeClient
async def main():
async with WireBandEdgeClient(
backend_url="http://your-wireband-server:8000",
device_id="serial-node-01",
) as client:
await client.ingest({"temp": 72.3, "humidity": 45.1}, topic="env/zone-a")
await client.ingest({"rpm": 1420, "current": 3.2}, topic="motor/spindle")
asyncio.run(main())
TLS broker with authentication
connector = MQTTConnector(
broker_url="mqtts://broker.example.com:8883",
username="gateway",
password="secret",
delta_threshold=0.01,
)
Configuration
WireBandEdgeClient
| Parameter | Default | Description |
|---|---|---|
backend_url |
http://localhost:8000 |
Wire.Band backend URL |
device_id |
edge-node |
Unique gateway identifier |
api_key |
None |
Bearer token for backend auth |
buffer_size |
50000 |
Local ring buffer capacity (events) |
flush_interval |
1.0 |
Flush attempt interval (seconds) |
flush_batch |
200 |
Max events per HTTP request |
max_retries |
3 |
Retry attempts before backoff |
backoff_base |
2.0 |
Exponential backoff base (seconds) |
delta_threshold |
0.0 |
Numeric delta filter threshold (0 = forward all) |
MQTTConnector
| Parameter | Default | Description |
|---|---|---|
broker_url |
mqtt://localhost:1883 |
Broker URL (mqtt:// or mqtts://) |
client_id |
wire-band-connector |
MQTT client identifier |
username |
None |
Broker username |
password |
None |
Broker password |
keepalive |
60 |
MQTT keepalive interval (seconds) |
max_queue_size |
10000 |
Max buffered events before drop |
delta_threshold |
0.0 |
Numeric delta filter threshold |
Semantic Classification
Each MQTT message is automatically classified to a typed symbol based on the topic path and payload keys. Classifications are prioritised in order:
- Exact topic overrides (
register_topic_symbol) - Topic path keyword matching (rightmost segment first)
- Payload key heuristics
- Fallback: generic sensor poll
Topic keyword examples:
| Topic contains | Symbol type |
|---|---|
temp, temperature, humidity, pressure |
Hardware Sensor |
motor, servo, relay, led, buzzer |
Hardware Actuator |
gpio, pin, pwm, adc, dac |
Hardware GPIO |
ota, update, watchdog, twin, shadow |
Edge Lifecycle |
metrics, telemetry, counter, event |
Metrics |
gps, location, geo, geofence, nav |
Geographic |
ai, ml, embed, classify, vision |
AI / ML |
security, threat, audit, encrypt |
Security |
blockchain, ethereum, swap, defi |
DeFi / Protocol |
schedule, cron, timer, sla |
Temporal |
dns, tls, http, grpc, p2p |
Network |
work, task, swarm, consensus |
Agent / System |
Register custom mappings at runtime:
connector.register_topic_symbol("factory/line-1/emergency-stop", 0xFC6D) # SENSOR_ALERT
connector.register_keyword_symbol("spindle", 0xFC90) # ACTUATOR_MOTOR_START
Stats
Both WireBandEdgeClient and MQTTConnector expose a .stats() method:
print(client.stats())
# {
# "events_ingested": 14823,
# "events_flushed": 14800,
# "events_dropped": 0,
# "flush_errors": 0,
# "bytes_sent": 187432,
# "buffer_depth": 23,
# "buffer_capacity": 50000,
# "uptime_seconds": 3612.4,
# "device_id": "factory-floor-rpi4",
# "backend_url": "http://your-wireband-server:8000"
# }
print(connector.stats())
# {
# "messages_received": 14823,
# "messages_compressed": 14823,
# "messages_delta_filtered": 412,
# "bytes_in": 1923441,
# "bytes_out": 187432,
# "compression_ratio": 10.27,
# "connected": true,
# "uptime_seconds": 3612.4
# }
WebSocket Streaming (v0.4.0+)
For high-frequency sensors or compliance-critical streams where the 1-second HTTP batch flush interval adds too much latency, use WireBandStreamClient to deliver each event immediately over a persistent WebSocket connection.
pip install "wire-band-edge[stream]"
import asyncio
from wire_band_edge import WireBandStreamClient
async def main():
async with WireBandStreamClient(
backend_url="http://your-wireband-server:8000",
device_id="compliance-node-01",
) as client:
# Each send() delivers immediately — no buffering, no batch interval
await client.send({"value": 72.3, "unit": "C"}, topic="sensors/temp")
await client.send({"event": "access", "user": "admin"}, topic="audit/access")
print(client.stats())
# {
# "events_sent": 2,
# "send_errors": 0,
# "reconnects": 0,
# "connected": true,
# "uptime_seconds": 0.1,
# "device_id": "compliance-node-01"
# }
asyncio.run(main())
WireBandStreamClient automatically reconnects with exponential backoff if the connection drops.
Choosing between HTTP batch and WebSocket streaming
WireBandEdgeClient (HTTP) |
WireBandStreamClient (WS) |
|
|---|---|---|
| Latency | Up to flush_interval (default 1s) |
Sub-millisecond |
| Throughput | High — up to 200 events/request | Per-event |
| Resilience | Local ring buffer survives network loss | Reconnects on disconnect |
| Best for | IoT telemetry, bulk ingest | Compliance audit, high-frequency signals |
Links
- Homepage: https://wire.band
- Repository: https://github.com/maco144/wireband
- Issues: https://github.com/maco144/wireband/issues
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file wire_band_edge-0.4.0.tar.gz.
File metadata
- Download URL: wire_band_edge-0.4.0.tar.gz
- Upload date:
- Size: 45.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
74951683d8cca2e095120a3f1a2cd94477f51794c04af30baba19111f8df70e6
|
|
| MD5 |
13412c94f158a6b0b452d26f183038e3
|
|
| BLAKE2b-256 |
65f9142c4435108d91b78952e9de7e3d98aeeee6b641a348114a135f4938ee1a
|
File details
Details for the file wire_band_edge-0.4.0-py3-none-any.whl.
File metadata
- Download URL: wire_band_edge-0.4.0-py3-none-any.whl
- Upload date:
- Size: 49.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4ab614b7078ba7aad8396959ee9c0fdadd68f684813c13dac61e0c950f99d001
|
|
| MD5 |
eb2bf2cd43065fd2abebad0c7f058769
|
|
| BLAKE2b-256 |
dbb510b0cbb0c4b41461ef8a3720cc3a8c4facc5405ad71071c49e2bd0d36f65
|