Coordinated Producer Consumer for MQTT
Project description
## MQTT Coordinated consumer.
Kafka Motivated Coordinated Consumer for MQTT
pip install mqtt-coordinated
## MQTT Consumer CoordinatorManager
CoordinatorManager class is manager class for mqtt consumer. It lets you connect to a MQTT server and subscribe to multiple topics. It gives you on_message callback for actions after receiving new message.
` >>> from mqtt import CoordinatorManager >>> >>> manager = CoordinatorManager('my-manager', 'iot.eclipse.org') >>> manager.start() >>> >>> consumer = manager.coordinated_consumer >>> consumer.on_message = on_message # Pass callback name here. >>> consumer.subscribe("house/bulb") >>> consumer.poll(100) # Batch message reading construct for streaming purpose `
` # Disconnect and stop consuming >>> consumer.disconnect() >>> manager.stop() `
There are 2 methods for consuming events, - registering for on_message - Reading messages in batches. Batches are internally stored in memory, and not stored on persistent disc for now.
## MQTT Producer CoordinatedProducer
CoordinatedProducer class is MQTT producer which will create number of partitions on MQTT topic. you can pass partition number or partition_key to this producer. Messages with same partition_key are garuanteed to be produced on same partition.
` >>> from mqtt import CoordinatedProducer >>> producer = CoordinatedProducer('iot.eclipse.org') >>> producer.publish_on_partition("house/bulb", "on") # Message will be published on random partition >>> producer.publish_on_partition("house/bulb", "on", partition=5) # Message will be published on 5th partition >>> producer.publish_on_partition("house/bulb", "on", partition_key='message_key') # All messages with partition_key will be published on same partition. `
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for mqtt_coordinated-0.0.2-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 76382fdfab9309c2f2e9d4e185f59acf77feba7bed2e7a45756202a5843235e9 |
|
MD5 | 999b4511e9492fa9aa26bc7e5aa158ae |
|
BLAKE2b-256 | e24994b859d4a03b4158efd4d145b3e0bbf80e8050ab06919fa389a2612ef0fd |