A streaming hub. Sort of.

They say there are only 2 kinds of work: you either move information from one place to another, or you move mass from one place to another.

Señor Octopus is an application that moves data around. It reads a YAML configuration file that describes how to connect nodes. For example, you might want to measure your internet speed every hour and store it in a database:

speedtest:
  plugin: source.speedtest
  flow: -> db
  schedule: "@hourly"

db:
  plugin: sink.db.postgresql
  flow: speedtest ->
  user: alice
  password: XXX
  host: localhost
  port: 5432
  dbname: default

Nodes are connected by the flow attribute. The speedtest node is connected to the db node because it points to it:

speedtest:
  flow: -> db

The db node, on the other hand, listens to events from the speedtest node:

db:
  flow: speedtest ->

We can also use * as a wildcard if we want a node to connect to all other nodes, or specify an explicit list of nodes:

speedtest:
  flow: -> db, log

db:
  flow: "* ->"

Note that in YAML we need to quote values that start with an asterisk (the same goes for the @hourly schedule above).

Running Señor Octopus

You can save the configuration above to a file called speedtest.yaml and run:

$ pip install senor-octopus
$ srocto speedtest.yaml

Every hour the speedtest source node will run, and the results will be sent to the db sink node, which writes them to a Postgres database.

What do these results look like?

Events

Señor Octopus uses a very simple but flexible data model to move data around. We have nodes called sources that create a stream of events, each one like this:

class Event(TypedDict):
    timestamp: datetime
    name: str
    value: Any

An event has a timestamp associated with it, a name, and a value. Note that the value can be anything!

A source will produce a stream of events. In the example above, once per hour the speedtest source will produce events like these:

[
    {
        'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 812083, tzinfo=datetime.timezone.utc),
        'name': 'hub.speedtest.download',
        'value': 16568200.018792046,
    },
    {
        'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 812966, tzinfo=datetime.timezone.utc),
        'name': 'hub.speedtest.upload',
        'value': 5449607.159468643,
    },
    {
        'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 820369, tzinfo=datetime.timezone.utc),
        'name': 'hub.speedtest.client',
        'value': {
            'ip': '173.211.12.32',
            'lat': '37.751',
            'lon': '-97.822',
            'isp': 'Colocation America Corporation',
            'isprating': '3.7',
            'rating': '0',
            'ispdlavg': '0',
            'ispulavg': '0',
            'loggedin': '0',
            'country': 'US',
        }
    },
    ...
]

The events are sent to sinks, which consume the stream. In this example, the db sink will receive the events and store them in a Postgres database.

Event-driven sources

In the previous example we configured the speedtest source to run hourly. Not all sources need to be scheduled, though. We can have a source that listens to a given MQTT topic, e.g.:

mqtt:
  plugin: source.mqtt
  flow: -> db
  topics:
    - "srocto/feeds/#"
  host: localhost
  port: 1883
  username: bob
  password: XXX
  message_is_json: true

The source above will immediately send an event to the db node every time a new message shows up in the topic wildcard srocto/feeds/#, so it can be written to the database — a super easy way of persisting a message queue to disk!
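
To see the flow in action you can publish a test message to one of the topics the mqtt source subscribes to. Here is a hedged sketch using the third-party paho-mqtt client (not part of Señor Octopus); any MQTT publisher works, and the topic and payload are made up for illustration:

# Publish a JSON payload to a topic matched by the srocto/feeds/# wildcard.
# Requires `pip install paho-mqtt`; credentials match the config above.
import json

import paho.mqtt.publish as publish

publish.single(
    topic="srocto/feeds/temperature",
    payload=json.dumps({"celsius": 23.5}),  # message_is_json: true expects JSON
    hostname="localhost",
    port=1883,
    auth={"username": "bob", "password": "XXX"},
)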

Batching events

The example above is not super efficient, since it writes to the database every time an event arrives. Instead, we can easily batch the events so that they’re accumulated in a queue and processed every, say, 5 minutes:

db:
  plugin: sink.db.postgresql
  flow: speedtest, mqtt ->
  batch: 5 minutes
  user: alice
  password: XXX
  host: localhost
  port: 5432
  dbname: default

With the batch parameter, incoming events are stored in a queue for the configured time and then processed by the sink together. Any events still pending in the queue will be processed if srocto terminates gracefully (e.g., with Ctrl+C).
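
Conceptually, batching amounts to letting events pile up in a queue and handing them to the sink together at each interval. The sketch below only illustrates the idea and is not Señor Octopus’s actual implementation:

# Illustration only: drain a queue of accumulated events every `interval`
# seconds and hand them to the sink as a single batch.
import asyncio
from typing import Any, List


async def flush_periodically(queue: "asyncio.Queue[Any]", interval: float) -> None:
    while True:
        await asyncio.sleep(interval)  # e.g. "5 minutes" -> 300 seconds
        batch: List[Any] = []
        while not queue.empty():
            batch.append(queue.get_nowait())
        if batch:
            print(f"Writing {len(batch)} events in one go")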

Filtering events

Much of the flexibility of Señor Octopus comes from a third type of node, the filter. Filters can be used not only to filter data, but also to format it. For example, let’s say we want to turn on some lights at sunset. The sun source sends an event with a value of “sunset” or “sunrise” every time one occurs:

{
    'timestamp': ...,
    'name': 'hub.sun',
    'value': 'sunset',
}

The tuya sink can be used to control a smart switch, but in order to turn it on it expects an event that looks like this:

{
    'timestamp': ...,
    'name': ...,
    'value': 'on',
}

We can use the jinja filter to ignore “sunrise” events, and to convert the “sunset” value into “on”:

sun:
  plugin: source.sun
  flow: -> sunset
  latitude: 38.3
  longitude: -123.0

sunset:
  plugin: filter.jinja
  flow: sun -> lights
  template: >
    {% if event['value'] == 'sunset' %}
      on
    {% endif %}

lights:
  plugin: sink.tuya
  flow: sunset ->
  device: "Porch lights"
  email: charlie@example.com
  password: XXX
  country: "1"
  application: smart_life

With this configuration the sunset filter drops any events whose value is not “sunset”. For those that match, the value is replaced by the string “on”, which activates the lights in the lights node.
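
To see why this works, you can render the same template by hand with Jinja2, the library behind the jinja filter. In this small illustration the template is collapsed onto a single line:

# A "sunset" event renders to "on"; a "sunrise" event renders to an empty
# string, so the filter drops it.
from jinja2 import Template

tmpl = Template("{% if event['value'] == 'sunset' %}on{% endif %}")

print(tmpl.render(event={"value": "sunset"}))   # -> on
print(tmpl.render(event={"value": "sunrise"}))  # -> (empty, event is dropped)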

Throttling events

Sometimes we want to limit the number of events being consumed by a sink. For example, imagine that we want to use Señor Octopus to monitor air quality using an Awair Element, sending us an SMS when the score is below a given threshold. We would like the SMS to be sent at most once every 30 minutes, and only between 8am and 10pm.

Here’s how we can do that:

awair:
  plugin: source.awair
  flow: -> bad_air
  schedule: 0/10 * * * *
  access_token: XXX
  device_type: awair-element
  device_id: 12345

bad_air:
  plugin: filter.jinja
  flow: awair -> sms
  template: >
    {% if
       event['timestamp'].astimezone().hour >= 8 and
       event['timestamp'].astimezone().hour <= 21 and
       event['name'] == 'hub.awair.score' and
       event['value'] < 80
    %}
      Air quality score is low: {{ event['value'] }}
    {% endif %}

sms:
  plugin: sink.sms
  flow: bad_air ->
  throttle: 30 minutes
  account_sid: XXX
  auth_token: XXX
  from: "+18002738255"
  to: "+15558675309"

In the example above, the awair source fetches air quality data every 10 minutes and sends it to bad_air. The filter checks the hour, to avoid sending an SMS between 10pm and 8am, and checks the air quality score; if it is lower than 80 it reformats the event value into a friendly message, e.g.:

“Air quality score is low: 70”

This is then sent to the sms sink, which has a throttle of 30 minutes. The throttle configuration will prevent the sink from running more than once every 30 minutes, to avoid spamming us with messages in case the score remains low.
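
Conceptually, throttling boils down to remembering when the sink last ran and skipping anything that arrives before the window has elapsed. The sketch below only illustrates the idea and is not Señor Octopus’s actual implementation:

# Illustration only: allow the sink to run at most once per throttle window.
from datetime import datetime, timedelta, timezone
from typing import Optional

THROTTLE = timedelta(minutes=30)
last_run: Optional[datetime] = None


def should_run() -> bool:
    global last_run
    now = datetime.now(timezone.utc)
    if last_run is not None and now - last_run < THROTTLE:
        return False  # still inside the throttle window, skip this batch
    last_run = now
    return True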

Plugins

Señor Octopus supports a growing list of plugins, and it’s straightforward to add new ones. Each plugin is simply a function that produces, processes, or consumes a stream.

Here’s the random source, which produces random numbers:

async def rand(events: int = 10, prefix: str = "hub.random") -> Stream:
    for _ in range(events):
        yield {
            "timestamp": datetime.now(timezone.utc),
            "name": prefix,
            "value": random.random(),
        }
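
Assuming this function is registered under the plugin name source.random (an assumption; the registration step isn’t shown here), it could be wired up like any other node, with the function’s keyword arguments coming straight from the YAML configuration:

random:
  plugin: source.random
  flow: -> db
  schedule: "@hourly"
  events: 10
  prefix: hub.random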

This is the full source code for the jinja filter:

async def jinja(stream: Stream, template: str) -> Stream:
    _logger.debug("Applying template to events")
    tmpl = Template(template)
    async for event in stream:
        value = tmpl.render(event=event)
        if value:
            yield {
                "timestamp": event["timestamp"],
                "name": event["name"],
                "value": value,
            }

And this is the sms sink:

async def sms(
    stream: Stream, account_sid: str, auth_token: str, to: str, **kwargs: str
) -> None:
    from_ = kwargs["from"]
    client = Client(account_sid, auth_token)
    async for event in stream:
        _logger.debug(event)
        _logger.info("Sending SMS")
        client.messages.create(body=str(event["value"]).strip(), from_=from_, to=to)

As you can see, a source is an async generator that yields events. A filter receives the stream with additional configuration parameters, and also returns a stream. And a sink receives a stream with additional parameters, and returns nothing.
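
Writing a new plugin therefore boils down to writing one of these functions. As a sketch (not a plugin that ships with the project), a custom sink that simply prints every event could look like this:

# Hypothetical custom sink: print every event it receives.
# "Stream" is a local stand-in for the async iterator of events described
# above; the real type is provided by Señor Octopus.
from typing import Any, AsyncIterator, Dict

Stream = AsyncIterator[Dict[str, Any]]


async def printer(stream: Stream, prefix: str = "event:") -> None:
    async for event in stream:
        print(prefix, event["timestamp"], event["name"], event["value"])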

Sources

The current plugins for sources are:

Filters

The existing filters are very similar, the main difference being how you configure them:

Sinks

These are the current sinks:
