Relay / sender library for various bus / consumer types
Project description
This library’s main purpose is to establish a common framework for relaying messages from one bus to another, or to some message consumer (such as Carbon / GraphiteDB). It currently includes support for MQTT, AMQP, and for using a Slack channel as a poor man’s message bus.
Basic Architecture
You can configure this library pretty much any way you want. Each Sender or Relay class expects a configuration dict with relevant details for connecting. Relays also expect a Sender object, which allows you to pair any receiver type (implemented in the Relay) to any type of Sender. Since each class only expects a dict for configuration, it’s possible to bridge two buses of the same protocol together using different configuration dicts, or subsections of the same larger configuration.
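The pairing described above can be sketched with two small stand-in classes. This is only an illustration of the pattern, not mulay's implementation: `ListSender`, `IterableRelay`, and the `messages`, `upstream`, and `downstream` keys are all hypothetical names invented for this sketch. The `start`, `stop`, and `send_raw` method names mirror the calls used in the examples in this document.

```python
from typing import Dict, List


class ListSender:
    """Hypothetical stand-in for a Sender: stores messages in a list."""

    def __init__(self, config: Dict[str, object]) -> None:
        self.config = config
        self.sent: List[str] = []
        self.running = False

    def start(self) -> None:
        self.running = True

    def stop(self) -> None:
        self.running = False

    def send_raw(self, message: str) -> None:
        if not self.running:
            raise RuntimeError("sender has not been started")
        self.sent.append(message)


class IterableRelay:
    """Hypothetical stand-in for a Relay: its 'bus' is a list in the config."""

    def __init__(self, config: Dict[str, object], sender: ListSender) -> None:
        self.config = config
        self.sender = sender

    def start(self) -> None:
        # A real relay would block on a network connection here;
        # this one simply forwards every configured message once.
        for message in self.config["messages"]:
            self.sender.send_raw(message)

    def stop(self) -> None:
        pass  # nothing to disconnect in this sketch


# One larger configuration dict; each class receives only its own subsection.
config = {
    "upstream": {"messages": ["my.metric 1 1700000000", "my.metric 2 1700000030"]},
    "downstream": {"host": "localhost", "port": 2023},
}

sender = ListSender(config["downstream"])
relay = IterableRelay(config["upstream"], sender)

sender.start()
relay.start()
relay.stop()
sender.stop()

print(sender.sent)
```

Because the relay only depends on the sender's interface, swapping either side for a different protocol would leave the rest of the wiring unchanged.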
Example: Relay MQTT to Carbon (GraphiteDB)
We can use the MQTT Relay class with the Carbon Sender class to relay between a public MQTT server and a GraphiteDB server without exposing the GraphiteDB server to the internet. First, the configuration, which we could conveniently store in a YAML file:
    carbon:
      host: my.graphitedb.home.net
      port: 2023
    mqtt:
      host: m16.cloudmqtt.com
      port: 20996
      user: <some-user>
      password: <some-password>
Then, in our code we start a sender for Carbon, and a relay for MQTT with the sender as a parameter:
    from mulay.carbon import PlaintextSender
    from mulay.mqtt import Relay

    # config is assumed to be a dict loaded from the YAML above
    # (for example with yaml.safe_load)
    sender = PlaintextSender(config['carbon'])
    relay = Relay(config['mqtt'], sender)

    try:
        sender.start()
        # Relay will loop forever
        relay.start()
    except KeyboardInterrupt:
        relay.stop()
        sender.stop()
Example: Relay AMQP to Carbon (GraphiteDB)
Or, we could opt to use an AMQP service, by specifying the AMQP Relay class with the Carbon Sender class. Again, this will not expose the GraphiteDB server to the internet. First, the configuration, which we could conveniently store in a YAML file:
    carbon:
      host: my.graphitedb.home.net
      port: 2023
    amqp:
      url: amqp://<amqp-user>:<amqp-password>@wombat.rmq.cloudamqp.com/<amqp-instance>
      queue: my-metrics

Then, in our code we start a sender for Carbon, and a relay for AMQP with the sender as a parameter:
    from mulay.carbon import PlaintextSender
    from mulay.amqp import Relay

    # config: dict loaded from the YAML above
    sender = PlaintextSender(config['carbon'])
    relay = Relay(config['amqp'], sender)

    try:
        sender.start()
        # Relay will loop forever
        relay.start()
    except KeyboardInterrupt:
        relay.stop()
        sender.stop()
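The `url` field in the AMQP configuration packs the credentials, the host, and the instance name into one string. As a rough illustration of what that string contains, the sketch below splits such a URL with the standard library. The helper name `describe_amqp_url` and the sample credentials are made up for this example, and nothing here is meant to describe how mulay itself handles the URL. In the AMQP URI scheme, the trailing path segment is the virtual host, which corresponds to the `<amqp-instance>` placeholder.

```python
from urllib.parse import unquote, urlsplit


def describe_amqp_url(url: str) -> dict:
    """Split an amqp:// URL into its parts (illustrative helper only)."""
    parts = urlsplit(url)
    return {
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,  # None when the URL does not specify a port
        "vhost": unquote(parts.path.lstrip("/")),
    }


# Sample values only; substitute your own credentials and instance name.
info = describe_amqp_url(
    "amqp://alice:s3cret@wombat.rmq.cloudamqp.com/alice-instance"
)
print(info)
```

A check like this can be handy for confirming that a URL copied from a provider's dashboard has all of its parts before handing it to a relay.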
Example: Relay Slack to Carbon (GraphiteDB)
We can even use Slack in much the same way as MQTT or AMQP, with the Carbon Sender class. First, the configuration, which we could conveniently store in a YAML file:
    carbon:
      host: my.graphitedb.home.net
      port: 2023
    slack:
      token: <your-token-here>
      channel: my-metrics
Then, in our code we start a sender for Carbon, and a relay for Slack with the sender as a parameter:
    from mulay.carbon import PlaintextSender
    from mulay.slack import Relay

    # config: dict loaded from the YAML above
    sender = PlaintextSender(config['carbon'])
    relay = Relay(config['slack'], sender)

    try:
        sender.start()
        # Relay will loop forever
        relay.start()
    except KeyboardInterrupt:
        relay.stop()
        sender.stop()
Example: Publishing to MQTT
As a convenience, and to make the relay functions truly flexible, mulay provides a Sender class with all of the protocol implementations. This is to allow bridging from any protocol to any other protocol that’s supported by the library, but it’s also useful for publishing into a public bus some data that will eventually get routed into your internal consumer server (such as GraphiteDB). This only requires configuration for the public bus, which makes it simpler than a relay configuration:
    host: m16.cloudmqtt.com
    port: 20996
    user: <some-user>
    password: <some-password>

Then, in our code, we set up a loop to take measurements and publish them:
    import time

    import speedtest as st
    from mulay.mqtt import Sender

    # config: dict loaded from the YAML above
    sender = Sender(config)
    sender.start()

    test = st.Speedtest()
    test.get_best_server()

    try:
        while True:
            test.download()
            test.upload()
            result = test.results.dict()
            now = int(time.time())
            sender.send_raw(f"my.speedtest.download {result['download']} {now}")
            sender.send_raw(f"my.speedtest.upload {result['upload']} {now}")
            time.sleep(30)  # check this at most every 30 seconds
    except KeyboardInterrupt:
        sender.stop()
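The strings passed to `send_raw` above follow the shape of Carbon's plaintext format: a metric path, a value, and a Unix timestamp separated by single spaces. A small helper can keep that formatting in one place. The function name `carbon_line` is hypothetical and not part of mulay. The sketch returns the line without a trailing newline, matching the strings in the example above.

```python
import time
from typing import Optional, Union


def carbon_line(
    path: str,
    value: Union[int, float],
    timestamp: Optional[int] = None,
) -> str:
    """Format one metric as '<path> <value> <timestamp>' (illustrative helper)."""
    if not path or any(ch.isspace() for ch in path):
        # Whitespace in the path would be read as a field separator.
        raise ValueError("metric path must be non-empty and contain no whitespace")
    if timestamp is None:
        timestamp = int(time.time())
    return f"{path} {value} {timestamp}"


line = carbon_line("my.speedtest.download", 12.5, 1700000000)
print(line)
```

With such a helper, each publishing call in the loop could become `sender.send_raw(carbon_line("my.speedtest.download", result["download"], now))`.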