Skip to main content

fastapi-mqtt is extension for MQTT protocol

Project description

fastapi-mqtt

MQTT is a lightweight publish/subscribe messaging protocol designed for M2M (machine to machine) telemetry in low bandwidth environments. Fastapi-mqtt is the client for working with MQTT.

For more information about MQTT, please refer to here: MQTT

Fastapi-mqtt wraps around gmqtt module. Gmqtt Python async client for MQTT client implementation. Module has support of MQTT version 5.0 protocol

MIT licensed GitHub stars GitHub forks GitHub issues Downloads


Documentation: FastApi-MQTT

The key feature are:

MQTT specification avaliable with help decarator methods using callbacks:

  • on_connect()

  • on_disconnect()

  • on_subscribe()

  • on_message()

  • subscribe(topic)

  • MQTT Settings available with pydantic class

  • Authentication to broker with credentials

  • unsubscribe certain topics and publish to certain topics

🔨 Installation

pip install fastapi-mqtt

🕹 Guide

from typing import Any

from fastapi import FastAPI
from gmqtt import Client as MQTTClient

from fastapi_mqtt import FastMQTT, MQTTConfig

mqtt_config = MQTTConfig()

fast_mqtt = FastMQTT(config=mqtt_config)

app = FastAPI()
fast_mqtt.init_app(app)


@fast_mqtt.on_connect()
def connect(client: MQTTClient, flags: int, rc: int, properties: Any):
    client.subscribe("/mqtt")  # subscribing mqtt topic
    print("Connected: ", client, flags, rc, properties)

@fast_mqtt.subscribe("mqtt/+/temperature", "mqtt/+/humidity", qos=1)
async def home_message(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    print("temperature/humidity: ", topic, payload.decode(), qos, properties)

@fast_mqtt.on_message()
async def message(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    print("Received message: ", topic, payload.decode(), qos, properties)

@fast_mqtt.subscribe("my/mqtt/topic/#", qos=2)
async def message_to_topic_with_high_qos(
    client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any
):
    print(
        "Received message to specific topic and QoS=2: ", topic, payload.decode(), qos, properties
    )

@fast_mqtt.on_disconnect()
def disconnect(client: MQTTClient, packet, exc=None):
    print("Disconnected")

@fast_mqtt.on_subscribe()
def subscribe(client: MQTTClient, mid: int, qos: int, properties: Any):
    print("subscribed", client, mid, qos, properties)

@app.get("/test")
async def func():
    fast_mqtt.publish("/mqtt", "Hello from Fastapi")  # publishing mqtt topic
    return {"result": True, "message": "Published"}

Publish method:

async def func():
    fast_mqtt.publish("/mqtt", "Hello from Fastapi")  # publishing mqtt topic
    return {"result": True, "message": "Published"}

Subscribe method:

@fast_mqtt.on_connect()
def connect(client, flags, rc, properties):
    client.subscribe("/mqtt")  # subscribing mqtt topic
    print("Connected: ", client, flags, rc, properties)

Changing connection params

mqtt_config = MQTTConfig(
    host="mqtt.mosquito.org",
    port=1883,
    keepalive=60,
    username="username",
    password="strong_password",
)
fast_mqtt = FastMQTT(config=mqtt_config)

✅ Testing

  • Clone the repository and install it with poetry.
  • Run tests with pytest, using an external MQTT broker to connect (defaults to 'test.mosquitto.org').
  • Explore the fastapi app examples and run them with uvicorn
# (opc) Run a local mosquitto MQTT broker with docker
docker run -d --name mosquitto -p 9001:9001 -p 1883:1883 eclipse-mosquitto:1.6.15
# Set host for test broker when running pytest
TEST_BROKER_HOST=localhost pytest
# Run the example apps against local broker, with uvicorn
TEST_BROKER_HOST=localhost uvicorn examples.app:app --port 8000 --reload
TEST_BROKER_HOST=localhost uvicorn examples.ws_app.app:application --port 8000 --reload

Contributing

Fell free to open issue and send pull request.

Thanks To Contributors. Contributions of any kind are welcome!

Before you start please read CONTRIBUTING

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastapi_mqtt-2.1.1.tar.gz (9.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fastapi_mqtt-2.1.1-py3-none-any.whl (9.1 kB view details)

Uploaded Python 3

File details

Details for the file fastapi_mqtt-2.1.1.tar.gz.

File metadata

  • Download URL: fastapi_mqtt-2.1.1.tar.gz
  • Upload date:
  • Size: 9.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.11.8 Linux/6.5.0-1015-azure

File hashes

Hashes for fastapi_mqtt-2.1.1.tar.gz
Algorithm Hash digest
SHA256 9bcf0b616135d89839d4d02f604a2294230af2debc87f83c1174b6b44c8dc454
MD5 989db9abf7753754a5dee74c2e3c6a51
BLAKE2b-256 7096d2c139e0e2b0ff1b3f3b8d815a20d113cc14240687d1fabcca3184317465

See more details on using hashes here.

File details

Details for the file fastapi_mqtt-2.1.1-py3-none-any.whl.

File metadata

  • Download URL: fastapi_mqtt-2.1.1-py3-none-any.whl
  • Upload date:
  • Size: 9.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.11.8 Linux/6.5.0-1015-azure

File hashes

Hashes for fastapi_mqtt-2.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 3f17c77bc158636605953d86e69da1a8a5f767ce837412fef2b6cbea499e26fb
MD5 78c634608d46bde3b4e117101e569b18
BLAKE2b-256 794abfa56efc5430977d3bf1716439bc60423106a34d3e0f564ce0ea5565cdb1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page