A streaming hub. Sort of.
Project description
They say there are only 2 kinds of work: you either move information from one place to another, or you move mass from one place to another.
Señor Octopus is an application that moves data around. It reads a YAML configuration file that describes how to connect nodes. For example, you might want to measure your internet speed every hour and store it in a database:
speedtest:
plugin: source.speedtest
flow: -> db
schedule: @hourly
db:
plugin: sink.db.postgresql
flow: speedtest ->
user: alice
password: XXX
host: localhost
port: 5432
dbname: default
Nodes are connected by the flow attribute. The speedtest node is connected to the db node because it points to it:
speedtest:
flow: -> db
The db node, on the other hand, listens to events from the speedtest node:
db:
flow: speedtest ->
We can also use * as a wildcard, if we want a node to connect to all other nodes, or specify a list of nodes:
speedtest:
flow: -> db, log
db:
flow: "* ->"
Note that in YAML we need to quote attributes that start with an asterisk.
Running Señor Octopus
You can save the configuration above to a file called speedtest.yaml and run:
$ pip install senor-octopus
$ srocto speedtest.yaml
Every hour the speedtest source node will run, and the results will be sent to the db sink node, which writes them to a Postgres database.
How to these results look like?
Events
Señor Octopus uses a very simple but flexible data model to move data around. We have nodes called sources that create a stream of events, each one like this:
class Event(TypedDict):
timestamp: datetime
name: str
value: Any
An event has a timestamp associated with it, a name, and a value. Note that the value can be anything!
A source will produce a stream of events. In the example above, once per hour the speedtest source will produce events like these:
[
{
'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 812083, tzinfo=datetime.timezone.utc),
'name': 'hub.speedtest.download',
'value': 16568200.018792046,
},
{
'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 812966, tzinfo=datetime.timezone.utc),
'name': 'hub.speedtest.upload',
'value': 5449607.159468643,
},
{
'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 820369, tzinfo=datetime.timezone.utc),
'name': 'hub.speedtest.client',
'value': {
'ip': '173.211.12.32',
'lat': '37.751',
'lon': '-97.822',
'isp': 'Colocation America Corporation',
'isprating': '3.7',
'rating': '0',
'ispdlavg': '0',
'ispulavg': '0',
'loggedin': '0',
'country': 'US',
}
},
...
]
The events are sent to sinks, which consume the stream. In this example, the db sink will receive the events and store them in a Postgres database.
Event-driven sources
In the previous example we configured the speedtest source to run hourly. Not all sources need to be scheduled, though. We can have a source that listens to a given topic in MQTT, eg:
mqtt:
plugin: source.mqtt
flow: -> db
topics:
- "srocto/feeds/#"
host: localhost
port: 1883
username: bob
password: XXX
message_is_json: true
The source above will immediately send an event to the db node every time a new message shows up in the topic wildcard srocto/feeds/#, so it can be written to the database — a super easy way of persisting a message queue to disk!
Batching events
The example above is not super efficient, since it writes to the database every time an event arrives. Instead, we can easily batch the events so that they’re accumulated in a queue and processed every, say, 5 minutes:
db:
plugin: sink.db.postgresl
flow: speedtest, mqtt ->
batch: 5 minutes
user: alice
password: XXX
host: localhost
port: 5432
dbname: default
With the batch parameter any incoming events are stored in a queue for the configured time, and processed by the sink together. Any pending events in the queue will still be processed if srocto terminates gracefully (eg, with ctrl+C).
Filtering events
Much of the flexibility of Señor Octopus comes from a third type of node, the filter. Filters can be used to not only filter data, but also format it. For example, let’s say we want to turn on some lights at sunset. The sun source will send events with a value of “sunset” or “sunrise” every time one occurs:
{
'timestamp': ...,
'name': 'hub.sun',
'value': 'sunset',
}
The tuya sink can be used to control a smart switch, but in order to turn it on it expects an event that looks like this:
{
'timestamp': ...,
'name': ...,
'value': 'on',
}
We can use the jinja filter to ignore “sunrise” events, and to convert the “sunset” value into “on”:
sun:
plugin: source.sun
flow: -> sunset
latitude: 38.3
longitude: -123.0
sunset:
plugin: filter.jinja
flow: sun -> lights
template: >
{% if event['value'] == 'sunset' %}
on
{% endif %}
lights:
plugin: sink.tuya
flow: sunset ->
device: "Porch lights"
email: charlie@example.com
password: XXX
country: "1"
application: smart_life
With this configuration the sunset filter will drop any events that don’t have a value of “sunset”. And for those events that have, the value will be replaced by the string “on” so it can activate the lights in the lights node.
Throttling events
Sometimes we want to limit the number of events being consumed by a sink. For example, imagine that we want to use Señor Octopus to monitor air quality using an Awair Element, sending us an SMS when the score is below a given threshold. We would like the SMS to be sent at most once every 30 minutes, and only between 8am and 10pm.
Here’s how we can do that:
awair:
plugin: source.awair
flow: -> bad_air
schedule: 0/10 * * * *
access_token: XXX
device_type: awair-element
device_id: 12345
bad_air:
plugin: filter.jinja
flow: awair -> sms
template: >
{% if
event['timestamp'].astimezone().hour >= 8 and
event['timestamp'].astimezone().hour <= 21 and
event['name'] == 'hub.awair.score' and
event['value'] < 80
%}
Air quality score is low: {{ event['value'] }}
{% endif %}
sms:
plugin: sink.sms
flow: bad_air ->
throttle: 30 minutes
account_sid: XXX
auth_token: XXX
from: "+18002738255"
to: "+15558675309"
In the example above, the awair source will fetch air quality data every 10 minutes, and send it to bad_air. The filter checks for the hour, to prevent sending an SMS from 10pm to 8am, and checks the air quality score — if it’s lower than 80 it will reformat the value of the event to a nice message, eg:
“Air quality score is low: 70”
This is then sent to the sms sink, which has a throttle of 30 minutes. The throttle configuration will prevent the sink from running more than once every 30 minutes, to avoid spamming us with messages in case the score remains low.
Plugins
Señor Octopus supports an increasing list of plugins, and it’s straightforward to add new ones. Each plugin is simply a function that produces, processes, or consumes a stream.
Here’s the random source, which produces random numbers:
async def rand(events: int = 10, prefix: str = "hub.random") -> Stream:
for _ in range(events):
yield {
"timestamp": datetime.now(timezone.utc),
"name": prefix,
"value": random.random(),
}
This is the full source code for the jinja filter:
async def jinja(stream: Stream, template: str) -> Stream:
_logger.debug("Applying template to events")
tmpl = Template(template)
async for event in stream:
value = tmpl.render(event=event)
if value:
yield {
"timestamp": event["timestamp"],
"name": event["name"],
"value": value,
}
And this is the sms sink:
async def sms(
stream: Stream, account_sid: str, auth_token: str, to: str, **kwargs: str
) -> None:
from_ = kwargs["from"]
client = Client(account_sid, auth_token)
async for event in stream:
_logger.debug(event)
_logger.info("Sending SMS")
client.messages.create(body=str(event["value"]).strip(), from_=from_, to=to)
As you can see, a source is an async generator that yields events. A filter receives the stream with additional configuration parameters, and also returns a stream. And a sink receives a stream with additional parameters, and returns nothing.
Sources
The current plugins for sources are:
source.awair: Fetch air quality data from Awair Element monitor.
source.crypto: Fetch price of cryptocurrencies from cryptocompare.com.
source.mqtt: Subscribe to messages on one or more MQTT topics.
source.rand: Generate random numbers between 0 and 1.
source.speed: Measure internet speed.
source.sqla: Read data from database.
source.static: Generate static events.
source.stock: Fetch stock price form Yahoo! Finance.
source.sun: Send events on sunrise and sunset.
source.udp: Listens to UDP messages on a given port.
source.weatherapi: Fetch weather forecast data from weatherapi.com.
source.whistle: Fetch device information and location for a Whistle pet tracker.
Filters
The existing filters are very similar, the main difference being how you configure them:
filter.format: Format an event stream based using Python string formatting.
filter.jinja: Apply a Jinja2 template to events.
filter.jsonpath: Filter event stream based on a JSON path.
Sinks
These are the current sinks:
sink.log: Send events to a logger.
sink.mqtt: Send events as messages to an MQTT topic.
sink.pushover: Send events to the Pushover mobile app.
sink.slack: Send messages to a Slack channel.
sink.sms: Send SMS via Twilio.
sink.tuya: Send commands to a Tuya/Smart Life device.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file senor-octopus-0.2.0.tar.gz
.
File metadata
- Download URL: senor-octopus-0.2.0.tar.gz
- Upload date:
- Size: 65.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 1d4648d138cb2939b21886b57eb5ee7472a1a908dba99fc618fdbac429b851bf |
|
MD5 | eff474b6bdfbef39ca9ac98dc1ea5ad1 |
|
BLAKE2b-256 | bb86b824939331a03634dddf003ebf83f5ec1ad2107187defb885426f479628e |
File details
Details for the file senor_octopus-0.2.0-py2.py3-none-any.whl
.
File metadata
- Download URL: senor_octopus-0.2.0-py2.py3-none-any.whl
- Upload date:
- Size: 37.3 kB
- Tags: Python 2, Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c2595ae467e14dd34fc9e4b5cb253a1e5651a89ad1492488cba76ea19301957f |
|
MD5 | e4f185ff955a1211652e0204a74bf48e |
|
BLAKE2b-256 | cda5831ee65bc32006a894ea72bef94a1fae38d73dd010814da6aafaba486cdc |