Typed MQTT client and topic/payload model for ISA-95-ish messaging.

FranzMQ

FranzMQ is a structured MQTT communication library for edge and cloud applications. It builds on paho-mqtt and adds typed payloads, hierarchical topics, priority-based callbacks, and a command/acknowledge pattern, with optional ISA-95 topic modeling and TLS auto-configuration.

Features

  • Typed payloads using Python dataclasses with automatic JSON encoding/decoding
  • Priority-based concurrent callbacks for message handling
  • Command/acknowledge pattern with two-phase handshake for request-response over MQTT
  • Class-based topic definitions for type-safe, hierarchical topic construction
  • ISA-95 topic modeling for enterprise-ready messaging structures
  • TLS support with environment-based auto-configuration
  • MQTT-based logging, configured with standard Python logging levels

Installation

pip install franzmq

Quick Start

from franzmq import Client, Topic, Metric

client = Client.autocreate_and_connect(client_id="my-client")

topic = Topic(payload_type=Metric, context=("sensor", "temperature"))
metric = Metric(value=22.5)

client.publish(topic, metric)

Topics

FranzMQ topics follow the structure {prefix}/{version}/{_PayloadType}/{context...}.

Basic Topic

from franzmq import Topic, Metric

topic = Topic(payload_type=Metric, context=("sensor", "temperature"))
# example/v1/_Metric/sensor/temperature
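
To make the topic layout concrete, here is a minimal standalone sketch that assembles and splits a topic string by hand. It does not use FranzMQ; build_topic and split_topic are hypothetical helpers written only for illustration, and the library's own Topic class may behave differently.

```python
from typing import Sequence, Tuple


def build_topic(prefix: str, version: str, payload_type: str,
                context: Sequence[str]) -> str:
    """Join the parts as {prefix}/{version}/{_PayloadType}/{context...}."""
    # The payload type segment is marked with a leading underscore.
    return "/".join([prefix, version, f"_{payload_type}", *context])


def split_topic(topic: str) -> Tuple[str, str, str, Tuple[str, ...]]:
    """Split a topic string back into (prefix, version, payload_type, context)."""
    prefix, version, marker, *context = topic.split("/")
    if not marker.startswith("_"):
        raise ValueError(f"expected a _PayloadType segment, got {marker!r}")
    return prefix, version, marker[1:], tuple(context)


topic_string = build_topic("example", "v1", "Metric", ("sensor", "temperature"))
print(topic_string)  # example/v1/_Metric/sensor/temperature
```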

ISA-95 Topic

For enterprise-level communication with ISA-95 hierarchy levels:

from franzmq import Topic, Metric, Isa95Topic, Isa95Fields

basic_topic = Topic(payload_type=Metric, context=("sensor", "temperature"))

isa95_fields = Isa95Fields(
    enterprise="ent1",
    site="s1",
    area="a1",
    production_line="pl1",
    work_cell="wc1",
    origin_id="origin1"
)
isa95_topic = Isa95Topic.from_topic(basic_topic, isa95_fields)
# example/v1-isa95/ent1/s1/a1/pl1/wc1/origin1/_Metric/sensor/temperature
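
The ISA-95 variant appears to insert the hierarchy levels between the version and the payload type, and to suffix the version with -isa95. The sketch below reproduces the example string above with plain string handling; Levels and build_isa95_topic are hypothetical names, not part of FranzMQ, and the layout is inferred only from the example comment.

```python
from dataclasses import astuple, dataclass
from typing import Sequence


@dataclass(frozen=True)
class Levels:
    """Hypothetical stand-in for the ISA-95 fields, in hierarchy order."""
    enterprise: str
    site: str
    area: str
    production_line: str
    work_cell: str
    origin_id: str


def build_isa95_topic(prefix: str, version: str, levels: Levels,
                      payload_type: str, context: Sequence[str]) -> str:
    # astuple() yields the field values in declaration order, which here
    # matches the enterprise -> origin_id order seen in the example topic.
    return "/".join([prefix, f"{version}-isa95", *astuple(levels),
                     f"_{payload_type}", *context])


isa95_string = build_isa95_topic(
    "example", "v1",
    Levels("ent1", "s1", "a1", "pl1", "wc1", "origin1"),
    "Metric", ("sensor", "temperature"),
)
print(isa95_string)
```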

Typed Payloads

All messages use structured dataclasses that encode/decode automatically to/from JSON. The following payload types are included:

| Payload        | Purpose                                               |
|----------------|-------------------------------------------------------|
| Metric         | Timestamped measurement values                        |
| Log            | Structured log entries (level, message, module, etc.) |
| ServiceDetails | Service registration with type and metadata           |
| Cmd            | Command with correlation ID and expiration            |
| Ack            | Acknowledgement with result code and message          |
Custom payloads extend the Payload base class:

from dataclasses import dataclass
from franzmq import Payload

@dataclass
class SensorReading(Payload):
    sensor_id: str
    value: float
    unit: str
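
For intuition about what automatic JSON encoding and decoding of a dataclass involves, here is a standalone round-trip sketch using only the standard library. SensorReadingSketch and its encode/decode methods are hypothetical; FranzMQ's Payload base class provides its own mechanism, and its wire format may include additional fields.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class SensorReadingSketch:
    sensor_id: str
    value: float
    unit: str

    def encode(self) -> bytes:
        # Dataclass -> dict -> JSON text -> UTF-8 bytes for the MQTT payload.
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def decode(cls, raw: bytes) -> "SensorReadingSketch":
        # Reverse direction; unknown keys raise TypeError in this sketch.
        return cls(**json.loads(raw.decode("utf-8")))


reading = SensorReadingSketch(sensor_id="s-1", value=22.5, unit="C")
restored = SensorReadingSketch.decode(reading.encode())
```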

Callback System

Subscribe to topics and register callbacks with an optional priority. Each callback receives a single argument, message (of type Message), which contains the decoded topic and payload.

from franzmq import Message

def on_metric(message: Message):
    print(f"Received: {message.payload.value} on {message.topic}")

client.subscribe(topic, qos=1, callback=on_metric, priority=10)

Callbacks are ordered by descending priority (higher numbers run first). Callbacks with the same priority are executed concurrently in separate threads.
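
The ordering rule can be sketched with a thread pool: group callbacks by priority, walk the groups from highest to lowest, and run each group's callbacks in parallel. This is an illustration only, not FranzMQ's dispatcher. It assumes that one priority group finishes before the next starts, which the description above does not state, and dispatch is a hypothetical function.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List, Tuple

Callback = Callable[[Any], None]


def dispatch(message: Any, callbacks: Iterable[Tuple[int, Callback]]) -> None:
    # Bucket the callbacks by their priority value.
    groups = defaultdict(list)
    for priority, callback in callbacks:
        groups[priority].append(callback)

    # Higher numbers run first; equal priorities share a thread pool.
    for priority in sorted(groups, reverse=True):
        group = groups[priority]
        with ThreadPoolExecutor(max_workers=len(group)) as pool:
            futures = [pool.submit(cb, message) for cb in group]
            for future in futures:
                future.result()  # re-raise any exception from the callback


calls: List[str] = []
dispatch("msg", [
    (1, lambda m: calls.append("low")),
    (10, lambda m: calls.append("high-a")),
    (10, lambda m: calls.append("high-b")),
])
```

The two priority-10 callbacks may finish in either order, but both complete before the priority-1 callback starts.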

Command/Acknowledge Pattern

FranzMQ supports request-response semantics over MQTT using a two-phase acknowledgement flow, so confirmed command execution does not require a separate API alongside MQTT.

Flow

Sender                          Receiver
  |                               |
  |-- Cmd (correlation_id) ------>|
  |                               | (check expiration)
  |<-- Ack (result_code=-1) ------| (handshake)
  |                               | (execute callback)
  |<-- Ack (result_code=200) -----| (final result)
  |                               |

The handshake ack (result_code=-1) confirms the receiver is alive and processing. If the handshake arrives before the command expires, the sender extends its wait up to max_command_duration.
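
One way to read the timing rule is as a deadline calculation on the sender side. The sketch below assumes that both durations are in seconds and measured from the moment the command was sent; the text does not confirm that reference point. wait_deadline is a hypothetical helper, not a FranzMQ function.

```python
from typing import Optional


def wait_deadline(sent_at: float, validity_duration: float,
                  max_command_duration: float,
                  handshake_at: Optional[float]) -> float:
    """Return the time until which the sender keeps waiting for the final Ack."""
    expires_at = sent_at + validity_duration
    # A handshake that arrives in time extends the wait; otherwise the
    # sender gives up when the command itself expires.
    if handshake_at is not None and handshake_at <= expires_at:
        return max(expires_at, sent_at + max_command_duration)
    return expires_at


# Command sent at t=0 with validity 30 s and a 60 s execution budget.
with_handshake = wait_deadline(0.0, 30.0, 60.0, handshake_at=5.0)
without_handshake = wait_deadline(0.0, 30.0, 60.0, handshake_at=None)
late_handshake = wait_deadline(0.0, 30.0, 60.0, handshake_at=31.0)
```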

Result codes

| Code | Meaning                                   |
|------|-------------------------------------------|
| -1   | Handshake (receiver acknowledged receipt) |
| 200  | Success                                   |
| 400  | Bad request                               |
| 500  | Internal error or timeout                 |
| 598  | Exception in command callback             |
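
In application code it can help to give these codes names. The enum below is a hypothetical convenience, not something FranzMQ is known to export; the values come from the result code table, while the member names and the two helper functions are assumptions.

```python
from enum import IntEnum


class ResultCode(IntEnum):
    HANDSHAKE = -1            # receiver acknowledged receipt
    SUCCESS = 200
    BAD_REQUEST = 400
    INTERNAL_ERROR = 500      # also used for timeouts
    CALLBACK_EXCEPTION = 598  # exception raised in the command callback


def is_final(code: int) -> bool:
    """Every code except the handshake marks the end of the exchange."""
    return code != ResultCode.HANDSHAKE


def is_failure(code: int) -> bool:
    """Treat 400 and above as failures (an assumption of this sketch)."""
    return code >= ResultCode.BAD_REQUEST
```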

Sending commands

publish_command subscribes to the ack topic, publishes the command, waits for the two-phase response, and returns the final Ack.

from franzmq import Client, Topic, Cmd, Ack

client = Client.autocreate_and_connect(client_id="sender")

cmd_topic = Topic(
    prefix="myproject",
    payload_type=Cmd,
    context=("device1", "settings")
)

ack = client.publish_command(
    topic=cmd_topic,
    command={"enabled": True, "interval_ms": 500},
    validity_duration=30.0,
    max_command_duration=60.0,
)

if ack.result_code >= 400:  # 400 (bad request) and 5xx codes both signal failure
    raise RuntimeError(f"Command failed ({ack.result_code}): {ack.message}")

Receiving commands

subscribe_to_command handles expiration checks, handshake acks, and final acks automatically. The callback receives a Message and returns a result code.

from franzmq import Client, Topic, Cmd, Message

client = Client.autocreate_and_connect(client_id="receiver")

cmd_topic = Topic(
    prefix="myproject",
    payload_type=Cmd,
    context=("device1", "settings")
)

def handle_settings(message: Message) -> int:
    settings = message.payload.command
    apply_settings(settings)  # apply_settings is your own application function
    return 200  # success

client.subscribe_to_command(
    topic=cmd_topic,
    callback=handle_settings,
    qos=1,
)

The callback can return:

  • None -- treated as 200 (success)
  • An int result code
  • A (int, str) tuple of (result_code, message)
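
The three return shapes can be normalized into a single (result_code, message) pair. The following standalone sketch shows one way to do that, including the 598 code from the result code table for a callback that raises. run_command_callback is a hypothetical helper, and using an empty string as the default message is an assumption.

```python
from typing import Any, Callable, Tuple, Union

CallbackResult = Union[None, int, Tuple[int, str]]


def run_command_callback(callback: Callable[[Any], CallbackResult],
                         message: Any) -> Tuple[int, str]:
    try:
        result = callback(message)
    except Exception as exc:
        # 598: exception in command callback.
        return 598, str(exc)
    if result is None:
        return 200, ""           # None is treated as success
    if isinstance(result, tuple):
        code, text = result      # (result_code, message)
        return int(code), str(text)
    return int(result), ""       # bare int result code


def failing_callback(message: Any) -> int:
    raise ValueError("bad settings")
```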

Commands for the same topic are executed sequentially via an internal queue.
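
Sequential execution per topic can be pictured as one queue and one worker thread per topic string. The sketch below is an assumption about the general shape of such a mechanism, not FranzMQ's internal queue; SequentialExecutor is a hypothetical class.

```python
import queue
import threading
from typing import Callable, Dict, List


class SequentialExecutor:
    """Runs jobs that share a key one at a time, in arrival order."""

    def __init__(self) -> None:
        self._queues: Dict[str, queue.Queue] = {}
        self._lock = threading.Lock()

    def submit(self, key: str, job: Callable[[], None]) -> None:
        with self._lock:
            if key not in self._queues:
                # First job for this key: create its queue and worker thread.
                jobs: queue.Queue = queue.Queue()
                self._queues[key] = jobs
                threading.Thread(target=self._worker, args=(jobs,),
                                 daemon=True).start()
        self._queues[key].put(job)

    @staticmethod
    def _worker(jobs: queue.Queue) -> None:
        while True:
            job = jobs.get()
            try:
                job()
            except Exception:
                pass  # a real implementation would report the failure
            finally:
                jobs.task_done()

    def join(self, key: str) -> None:
        """Block until every job submitted for this key has finished."""
        self._queues[key].join()


executor = SequentialExecutor()
results: List[int] = []
for i in range(3):
    executor.submit("myproject/v1/_Cmd/device1/settings",
                    lambda i=i: results.append(i))
executor.join("myproject/v1/_Cmd/device1/settings")
```

Jobs for different keys run on different worker threads, so only commands that share a topic are serialized.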

Custom command payloads

Extend Cmd for typed command payloads:

from dataclasses import dataclass, field
from franzmq import Cmd

@dataclass
class DeviceSettingsCmd(Cmd):
    command: dict = field(default_factory=dict)

Then use DeviceSettingsCmd as the topic's payload_type.

Class-Based Topic Definitions

For projects with many topics, use TopicBase and classproperty to define hierarchical topic trees:

from franzmq import TopicBase, classproperty, Metric
from franzmq.data_contracts.base import ServiceDetails

class DeviceTopic(TopicBase):
    prefix = "myproject"
    version = "v1"
    context = ("device1",)

    @classproperty
    def State(cls):
        return cls._topic(["state"], payload_type=ServiceDetails)

    @classproperty
    def Temperature(cls):
        return cls._topic(["temperature"], payload_type=Metric)

Access topics as class attributes:

DeviceTopic.State        # myproject/v1/_ServiceDetails/device1/state
DeviceTopic.Temperature  # myproject/v1/_Metric/device1/temperature

Nested hierarchies use _parent_class_name and _prefix to compose topic paths from parent classes.
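
For readers unfamiliar with the class-property pattern, here is a minimal descriptor that behaves like a property evaluated on the class rather than on an instance. It is a sketch of the general technique, not FranzMQ's classproperty or TopicBase. The names classproperty_sketch and DeviceTopicSketch are hypothetical, and _topic returns a plain string here rather than a topic object.

```python
class classproperty_sketch:
    """Descriptor that calls the wrapped function with the owning class."""

    def __init__(self, getter):
        self._getter = getter

    def __get__(self, instance, owner=None):
        if owner is None:
            owner = type(instance)
        return self._getter(owner)


class DeviceTopicSketch:
    prefix = "myproject"
    version = "v1"
    context = ("device1",)

    @classmethod
    def _topic(cls, extra, payload_type: str) -> str:
        # Class-level context first, then the topic-specific segments.
        return "/".join([cls.prefix, cls.version, f"_{payload_type}",
                         *cls.context, *extra])

    @classproperty_sketch
    def Temperature(cls):
        return cls._topic(["temperature"], payload_type="Metric")


class SecondDeviceTopicSketch(DeviceTopicSketch):
    context = ("device2",)  # subclasses only override the class attributes


print(DeviceTopicSketch.Temperature)
print(SecondDeviceTopicSketch.Temperature)
```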

Logging over MQTT

Enable MQTT-based logging by calling:

import logging

client.configure_mqtt_logger(level=logging.INFO)
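
Conceptually, logging over MQTT means attaching a logging handler that turns each record into a message and publishes it. The standalone sketch below shows that idea with a hypothetical PublishingHandler and a plain list standing in for the broker. How configure_mqtt_logger is implemented, and which topic it publishes to, is not covered here.

```python
import logging
from typing import Callable, Dict, List


class PublishingHandler(logging.Handler):
    """Forwards each log record to a publish callable as a small dict."""

    def __init__(self, publish: Callable[[Dict[str, str]], None],
                 level: int = logging.NOTSET) -> None:
        super().__init__(level)
        self._publish = publish

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self._publish({
                "level": record.levelname,
                "message": record.getMessage(),
                "module": record.module,
            })
        except Exception:
            self.handleError(record)


sent: List[Dict[str, str]] = []  # stands in for publishing to a broker
logger = logging.getLogger("franzmq_sketch")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(PublishingHandler(sent.append, level=logging.INFO))

logger.debug("not forwarded")  # below INFO, so it is dropped
logger.info("pump started")
```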

Auto Configuration via Environment Variables

FranzMQ reads its connection settings from environment variables using python-decouple.

| Variable      | Required | Default                   | Description             |
|---------------|----------|---------------------------|-------------------------|
| MQTT_IP       | No       | broker                    | Broker hostname         |
| MQTT_PORT     | No       | 8883 (TLS) / 1883 (plain) | Broker port             |
| MQTT_USERNAME | No       | franz                     | Auth username           |
| MQTT_PASSWORD | No       | franz                     | Auth password           |
| USE_MQTTS     | No       | True                      | Enable TLS              |
| CA_CERT_FILE  | If TLS   | --                        | CA certificate path     |
| TLS_CERT_FILE | If TLS   | --                        | Client certificate path |
| TLS_KEY_FILE  | If TLS   | --                        | Client private key path |
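
The defaults in the table can be summarized as a small resolution function. To stay self-contained, this sketch reads from os.environ (or a supplied mapping) instead of python-decouple. load_settings is hypothetical, and the accepted boolean spellings are an assumption rather than the library's actual parsing.

```python
import os
from typing import Mapping, Optional


def load_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    env = os.environ if env is None else env

    # TLS is on unless USE_MQTTS is explicitly set to a false-like value.
    use_tls = env.get("USE_MQTTS", "True").strip().lower() in ("1", "true", "yes", "on")
    default_port = "8883" if use_tls else "1883"

    settings = {
        "host": env.get("MQTT_IP", "broker"),
        "port": int(env.get("MQTT_PORT", default_port)),
        "username": env.get("MQTT_USERNAME", "franz"),
        "password": env.get("MQTT_PASSWORD", "franz"),
        "use_tls": use_tls,
    }
    if use_tls:
        # The certificate paths have no default and are required with TLS.
        for name in ("CA_CERT_FILE", "TLS_CERT_FILE", "TLS_KEY_FILE"):
            if name not in env:
                raise KeyError(f"{name} is required when USE_MQTTS is enabled")
            settings[name.lower()] = env[name]
    return settings


plain = load_settings({"USE_MQTTS": "False"})
secure = load_settings({
    "CA_CERT_FILE": "ca.pem",
    "TLS_CERT_FILE": "client.pem",
    "TLS_KEY_FILE": "client.key",
})
```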

License

MIT License
