Skip to main content

Lightweight Wire.Band client — semantic data middleware for any domain (IoT, AI/ML, DeFi, legal, geospatial, supply chain, and more)

Project description

wire-band-edge

Lightweight Wire.Band client — semantic data middleware for any domain. Connects any data source to a Wire.Band backend: IoT sensors, AI/ML pipelines, DeFi activity, legal records, geospatial streams, supply chain events, and more.

Runs anywhere Python runs: subscribe to an MQTT broker, ingest directly from code, or bridge any structured data source. Classifies each message into a typed semantic frame, buffers locally during connectivity loss, and forwards event batches to a Wire.Band backend over HTTP.

Features

  • Zero private dependencies — installs anywhere with pip install wire-band-edge
  • Universal domain coverage — 650+ symbol types across IoT, AI/ML, DeFi, legal, geographic, temporal, security, storage, networking, and more
  • MQTT bridge — subscribes to topic patterns, classifies + frames each message
  • Local ring buffer — survives network interruptions; events are retried with exponential backoff
  • Delta filtering — suppress readings that haven't meaningfully changed (configurable threshold)
  • Semantic classification — maps topic paths and payload keys to typed symbols across all supported domains
  • Async throughout — built on asyncio + httpx; non-blocking on constrained hardware

Installation

# Core client (HTTP flush only)
pip install wire-band-edge

# With MQTT broker support
pip install "wire-band-edge[mqtt]"

Python 3.10+ required.

Quick Start

CLI (after install)

# Basic — bridge all topics to a local backend
wire-band-edge --broker mqtt://localhost:1883 --backend http://localhost:8000

# With device ID and delta filtering
wire-band-edge \
    --broker mqtt://broker.example.com:1883 \
    --topics "sensors/#" "machines/#" \
    --device-id factory-rpi4 \
    --delta-threshold 0.02 \
    --backend http://my-wireband-server:8000

# Or with python -m
python -m wire_band_edge --help

Full MQTT gateway

import asyncio
from wire_band_edge import WireBandEdgeClient, MQTTConnector

async def main():
    client = WireBandEdgeClient(
        backend_url="http://your-wireband-server:8000",
        device_id="factory-floor-rpi4",
        delta_threshold=0.02,   # suppress readings with <2% change
    )
    connector = MQTTConnector("mqtt://localhost:1883")
    await client.run_mqtt(connector, topics=["sensors/#", "machines/#"])

asyncio.run(main())

Programmatic ingest (serial drivers, custom sensors)

import asyncio
from wire_band_edge import WireBandEdgeClient

async def main():
    async with WireBandEdgeClient(
        backend_url="http://your-wireband-server:8000",
        device_id="serial-node-01",
    ) as client:
        await client.ingest({"temp": 72.3, "humidity": 45.1}, topic="env/zone-a")
        await client.ingest({"rpm": 1420, "current": 3.2},    topic="motor/spindle")

asyncio.run(main())

TLS broker with authentication

connector = MQTTConnector(
    broker_url="mqtts://broker.example.com:8883",
    username="gateway",
    password="secret",
    delta_threshold=0.01,
)

Configuration

WireBandEdgeClient

Parameter Default Description
backend_url http://localhost:8000 Wire.Band backend URL
device_id edge-node Unique gateway identifier
api_key None Bearer token for backend auth
buffer_size 50000 Local ring buffer capacity (events)
flush_interval 1.0 Flush attempt interval (seconds)
flush_batch 200 Max events per HTTP request
max_retries 3 Retry attempts before backoff
backoff_base 2.0 Exponential backoff base (seconds)
delta_threshold 0.0 Numeric delta filter threshold (0 = forward all)

MQTTConnector

Parameter Default Description
broker_url mqtt://localhost:1883 Broker URL (mqtt:// or mqtts://)
client_id wire-band-connector MQTT client identifier
username None Broker username
password None Broker password
keepalive 60 MQTT keepalive interval (seconds)
max_queue_size 10000 Max buffered events before drop
delta_threshold 0.0 Numeric delta filter threshold

Semantic Classification

Each MQTT message is automatically classified to a typed symbol based on the topic path and payload keys. Classifications are prioritised in order:

  1. Exact topic overrides (register_topic_symbol)
  2. Topic path keyword matching (rightmost segment first)
  3. Payload key heuristics
  4. Fallback: generic sensor poll

Topic keyword examples:

Topic contains Symbol type
temp, temperature, humidity, pressure Hardware Sensor
motor, servo, relay, led, buzzer Hardware Actuator
gpio, pin, pwm, adc, dac Hardware GPIO
ota, update, watchdog, twin, shadow Edge Lifecycle
metrics, telemetry, counter, event Metrics
gps, location, geo, geofence, nav Geographic
ai, ml, embed, classify, vision AI / ML
security, threat, audit, encrypt Security
blockchain, ethereum, swap, defi DeFi / Protocol
schedule, cron, timer, sla Temporal
dns, tls, http, grpc, p2p Network
work, task, swarm, consensus Agent / System

Register custom mappings at runtime:

connector.register_topic_symbol("factory/line-1/emergency-stop", 0xFC6D)  # SENSOR_ALERT
connector.register_keyword_symbol("spindle", 0xFC90)                       # ACTUATOR_MOTOR_START

Stats

Both WireBandEdgeClient and MQTTConnector expose a .stats() method:

print(client.stats())
# {
#   "events_ingested": 14823,
#   "events_flushed": 14800,
#   "events_dropped": 0,
#   "flush_errors": 0,
#   "bytes_sent": 187432,
#   "buffer_depth": 23,
#   "buffer_capacity": 50000,
#   "uptime_seconds": 3612.4,
#   "device_id": "factory-floor-rpi4",
#   "backend_url": "http://your-wireband-server:8000"
# }

print(connector.stats())
# {
#   "messages_received": 14823,
#   "messages_compressed": 14823,
#   "messages_delta_filtered": 412,
#   "bytes_in": 1923441,
#   "bytes_out": 187432,
#   "compression_ratio": 10.27,
#   "connected": true,
#   "uptime_seconds": 3612.4
# }

WebSocket Streaming (v0.4.0+)

For high-frequency sensors or compliance-critical streams where the 1-second HTTP batch flush interval adds too much latency, use WireBandStreamClient to deliver each event immediately over a persistent WebSocket connection.

pip install "wire-band-edge[stream]"
import asyncio
from wire_band_edge import WireBandStreamClient

async def main():
    async with WireBandStreamClient(
        backend_url="http://your-wireband-server:8000",
        device_id="compliance-node-01",
    ) as client:
        # Each send() delivers immediately — no buffering, no batch interval
        await client.send({"value": 72.3, "unit": "C"}, topic="sensors/temp")
        await client.send({"event": "access", "user": "admin"}, topic="audit/access")

        print(client.stats())
        # {
        #   "events_sent": 2,
        #   "send_errors": 0,
        #   "reconnects": 0,
        #   "connected": true,
        #   "uptime_seconds": 0.1,
        #   "device_id": "compliance-node-01"
        # }

asyncio.run(main())

WireBandStreamClient automatically reconnects with exponential backoff if the connection drops.

Choosing between HTTP batch and WebSocket streaming

WireBandEdgeClient (HTTP) WireBandStreamClient (WS)
Latency Up to flush_interval (default 1s) Sub-millisecond
Throughput High — up to 200 events/request Per-event
Resilience Local ring buffer survives network loss Reconnects on disconnect
Best for IoT telemetry, bulk ingest Compliance audit, high-frequency signals

Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wire_band_edge-0.4.0.tar.gz (45.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

wire_band_edge-0.4.0-py3-none-any.whl (49.7 kB view details)

Uploaded Python 3

File details

Details for the file wire_band_edge-0.4.0.tar.gz.

File metadata

  • Download URL: wire_band_edge-0.4.0.tar.gz
  • Upload date:
  • Size: 45.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for wire_band_edge-0.4.0.tar.gz
Algorithm Hash digest
SHA256 74951683d8cca2e095120a3f1a2cd94477f51794c04af30baba19111f8df70e6
MD5 13412c94f158a6b0b452d26f183038e3
BLAKE2b-256 65f9142c4435108d91b78952e9de7e3d98aeeee6b641a348114a135f4938ee1a

See more details on using hashes here.

File details

Details for the file wire_band_edge-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: wire_band_edge-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 49.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for wire_band_edge-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4ab614b7078ba7aad8396959ee9c0fdadd68f684813c13dac61e0c950f99d001
MD5 eb2bf2cd43065fd2abebad0c7f058769
BLAKE2b-256 dbb510b0cbb0c4b41461ef8a3720cc3a8c4facc5405ad71071c49e2bd0d36f65

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page