Pull 'n' Push

PyPI version Build Status Docker: hub License: MIT

Pulls data from sources and pushes it to sinks.

1. Installation
2. Getting started
3. Runner
4. Building Blocks
4.1. Pull
4.2. Push
4.3. Selector
4.4. Dependencies
4.5. Envelope (>= 0.7.1)
4.6. Payload unwrapping
4.7. Engines (>= 0.10.0)
4.7.1. pnp.engines.sequential.SequentialEngine
4.7.2. pnp.engines.thread.ThreadEngine
4.7.3. pnp.engines.process.ProcessEngine
5. Useful hints
5.1. Configuration checking
5.2. Logging (>= 0.11.0)
5.3. dictmentor (>= 0.11.0)
5.4. Advanced selector expressions (>= 0.12.0)
5.5. Docker images
6. Plugins
6.1. pnp.plugins.pull.fitbit.Current
6.2. pnp.plugins.pull.fitbit.Devices
6.3. pnp.plugins.pull.fitbit.Goal
6.4. pnp.plugins.pull.fs.FileSystemWatcher
6.5. pnp.plugins.pull.gpio.Watcher
6.6. pnp.plugins.pull.hass.State
6.7. pnp.plugins.pull.http.Server
6.8. pnp.plugins.pull.monitor.Stats
6.9. pnp.plugins.pull.mqtt.Subscribe
6.10. pnp.plugins.pull.sensor.DHT
6.11. pnp.plugins.pull.sensor.OpenWeather
6.12. pnp.plugins.pull.simple.Count
6.13. pnp.plugins.pull.simple.Repeat
6.14. pnp.plugins.pull.zway.ZwayPoll
6.15. pnp.plugins.pull.zway.ZwayReceiver
6.16. pnp.plugins.push.fs.FileDump
6.17. pnp.plugins.push.http.Call
6.19. pnp.plugins.push.mqtt.Discovery
6.20. pnp.plugins.push.mqtt.Publish
6.21. pnp.plugins.push.notify.Pushbullet
6.22. pnp.plugins.push.simple.Echo
6.23. pnp.plugins.push.simple.Execute
6.25. pnp.plugins.push.timedb.InfluxPush
6.26. pnp.plugins.udf.hass.State
6.27. pnp.plugins.udf.simple.Counter
6.28. pnp.plugins.udf.simple.Memory
7. Changelog

1. Installation

pip install pnp

Optional extras

  • dht: Enables pnp.plugins.pull.sensor.DHT (temperature and humidity sensor). Only works on ARM-based systems (like raspberry, arduino)
  • fswatcher: Enables pnp.plugins.pull.fs.FileSystemWatcher (Watch file system for created, modified, deleted, moved files)
  • faceR: Enables (Screen image files for known faces)

Installation with extras:

pip install pnp[fswatcher,faceR]
# In case of extra 'dht' you have to enable the option --process-dependency-links ...
# ... cause the required adafruit package is not on pypi.
pip install --process-dependency-links pnp[dht]

2. Getting started

Define pulls to fetch data from source systems. For each pull, define one or more pushes to transfer the pulled data anywhere else (you only need a plugin that knows how to handle the target). You can define your configurations in yaml or json. It is up to you. I prefer yaml...

- name: hello-world
  pull:
    plugin: pnp.plugins.pull.simple.Repeat
    args:
      wait: 1
      repeat: "Hello World"
  push:
    - plugin: pnp.plugins.push.simple.Echo

Copy this configuration and create the file helloworld.yaml. Run it:

pnp helloworld.yaml

This example yields the string 'Hello World' every second.

Tip: You can validate your config without actually executing it with

   pnp --check helloworld.yaml

3. Runner

> pnp --help
Pull 'n' Push

  pnp [(-c | --check)] [(-v | --verbose)] [--log=<log_conf>] <configuration>
  pnp (-h | --help)
  pnp --version

  -c --check        Only check configuration and do not run it.
  -v --verbose      Switches log level to debug.
  --log=<log_conf>  Specify logging configuration to load.
  -h --help         Show this screen.
  --version         Show version.

4. Building Blocks

Below, the basic building blocks of Pull 'n' Push are explained in more detail.

4.1. Pull

As stated before, pulls fetch data from various source systems and/or apis. Please see the section Plugins for already implemented pulls. To instantiate a pull by configuration file you only have to provide its fully qualified name and the arguments that should be passed.

- name: example
  pull:
    plugin: pnp.plugins.pull.mqtt.Subscribe
    args:
      host: localhost
      port: 1883
      topic: test/#

The above snippet creates a pull that listens on the topic test/# of an MQTT broker. The output of the pull is a dictionary that contains the topic, the levels and the actual payload.

# When the message 'Here i am' arrives on the topic 'test/foo/bar' then the output will look like that:
{'topic': 'test/foo/bar', 'levels': ['test', 'foo', 'bar'], 'payload': 'Here i am'}
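
As a rough illustration of how such an output can be assembled, the sketch below splits the topic at each slash to obtain the levels. This is not the plugin's actual code; the helper name `build_message` is made up for this example.

```python
def build_message(topic, payload):
    """Build a dictionary shaped like the pull output shown above (hypothetical helper)."""
    return {
        'topic': topic,
        'levels': topic.split('/'),  # 'test/foo/bar' -> ['test', 'foo', 'bar']
        'payload': payload,
    }


message = build_message('test/foo/bar', 'Here i am')
print(message)
```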

4.2. Push

A pull passes its data to multiple pushes to transfer/transform the data. For example a push might save sensor data to influx or dump a file to the file system.

- name: example
  pull:
    plugin: pnp.plugins.pull.mqtt.Subscribe
    args:
      host: localhost
      port: 1883
      topic: test/#
  push:
    - plugin: pnp.plugins.push.fs.FileDump
      args:
        directory: "/tmp"
        binary_mode: False
    - plugin: pnp.plugins.push.simple.Echo

The above snippet adds two pushes to the already known pull. The first push takes the incoming data and dumps it into the specified directory as a textfile. The second push just prints out the incoming data.

4.3. Selector

Sometimes the output of a pull needs to be transformed before the specified push can handle it. Selectors to the rescue. Given our input we decide to just dump the payload and print out the first level of the topic.

- name: example
  pull:
    plugin: pnp.plugins.pull.mqtt.Subscribe
    args:
      host: localhost
      port: 1883
      topic: test/#
  push:
    - plugin: pnp.plugins.push.fs.FileDump
      selector: data.payload
      args:
        directory: "/tmp"
        binary_mode: False
    - plugin: pnp.plugins.push.simple.Echo
      selector: data.levels[0]

Easy as that. We can reference our incoming data via data or payload.
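
To make the idea concrete, here is a minimal Python sketch of how a selector string could be evaluated with the incoming data bound to both names, data and payload. It is an assumption for illustration, not pnp's implementation: the `AttrDict` class and the `apply_selector` function are made up, and `eval` is used only for brevity.

```python
class AttrDict(dict):
    """Dictionary whose keys can also be read as attributes (hypothetical helper)."""

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)


def apply_selector(expression, incoming):
    """Evaluate a selector string; `data` and `payload` both refer to the incoming data."""
    names = {'data': incoming, 'payload': incoming}
    # eval() keeps the sketch short; never feed it untrusted input.
    return eval(expression, {'__builtins__': {}}, names)


incoming = AttrDict(topic='test/foo/bar', levels=['test', 'foo', 'bar'], payload='Here i am')
print(apply_selector('data.payload', incoming))
print(apply_selector('data.levels[0]', incoming))
```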

4.4. Dependencies

By default all pushes execute in parallel (not completely true) when new incoming data is available. But it would be nice if we could chain pushes together, so that the output of one push becomes the input of the next push. The good thing is: Yes we can.

Back to our example let's assume we want to print out the path to the created file dump after the dump is created.

- name: example
  pull:
    plugin: pnp.plugins.pull.mqtt.Subscribe
    args:
      host: localhost
      port: 1883
      topic: test/#
  push:
    - plugin: pnp.plugins.push.fs.FileDump
      selector: data.payload
      args:
        directory: "/tmp"
        binary_mode: False
      deps:
        - plugin: pnp.plugins.push.simple.Echo
    - plugin: pnp.plugins.push.simple.Echo
      selector: data.levels[0]

As you can see, we just add a dependent push to the previous one.
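
The chaining can be pictured with a few lines of Python. This is only a sketch of the concept under stated assumptions: the dictionary layout, the key name `dependents` and the function `run_push` are invented for the example and do not mirror pnp internals.

```python
def run_push(push, payload, log):
    """Run one push, then hand its result to every dependent push."""
    result = push['action'](payload)
    log.append((push['name'], result))
    for dependent in push.get('dependents', []):
        run_push(dependent, result, log)


echo = {'name': 'echo', 'action': lambda data: data}
dump = {
    'name': 'dump',
    # Pretend the payload was written to disk and return the resulting path
    'action': lambda data: '/tmp/dump.txt',
    'dependents': [echo],
}

log = []
run_push(dump, 'Here i am', log)
print(log)
```

The dependent echo receives the path returned by the dump, not the original payload.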

4.5. Envelope (>= 0.7.1)

Using envelopes it is possible to change the behaviour of pushes at runtime. The best examples are the pnp.plugins.push.fs.FileDump and pnp.plugins.push.mqtt.MQTTPush plugins, where you can override or set the file_name and extension of the file to dump, or the topic the message should be published to, respectively.

Given the example ...

- name: envelope
  pull:
    plugin: pnp.plugins.pull.simple.Count
    args:
      wait: 1
  push:
    plugin: pnp.plugins.push.fs.FileDump
    selector:
      file_name: "lambda data: str(data)"
      extension: ".cnt"
      data: "lambda data: data"
    args:
      directory: "/tmp/counter"
      file_name: "counter"  # Overridden by envelope
      extension: ".txt"  #  Overridden by envelope
      binary_mode: False  # text mode
... this push dumps multiple files (0.cnt, 1.cnt, 2.cnt, ...), one for each pulled counter value, instead of dumping a single file 'counter.txt' which is overwritten each time a new counter is emitted.

How does this work: If the emitted or transformed payload (via selector) contains the key data or payload it is assumed that the actual payload is the data stored in this key and all other keys represent the so called envelope.
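
The rule can be sketched in Python as follows. The function name `split_envelope` is hypothetical, and checking the key data before payload when both are present is an assumption of this sketch, not documented behaviour.

```python
def split_envelope(payload):
    """Return (envelope, actual_payload) following the rule described above."""
    if isinstance(payload, dict):
        for key in ('data', 'payload'):  # assumption: 'data' wins if both exist
            if key in payload:
                envelope = {k: v for k, v in payload.items() if k != key}
                return envelope, payload[key]
    # Anything else is passed through with an empty envelope
    return {}, payload


envelope, actual = split_envelope({'file_name': '0', 'extension': '.cnt', 'data': 0})
print(envelope, actual)
```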

Remark: This feature might actually break your existing configurations if you use the plugin pnp.plugins.pull.mqtt.MQTTPull which will now emit an enveloped payload.

Prior to version 0.7.2, the snippet below echoed a dictionary with the keys 'topic', 'levels' and 'payload'. It now differentiates between the actual payload (key 'payload' or 'data') and the envelope (all other keys).

- name: subscriber
  pull:
    plugin: pnp.plugins.pull.mqtt.MQTTPull
    args:
      host: localhost
      topic: test/counter
  push:
    plugin: pnp.plugins.push.simple.Echo

If you want to "restore" the previous behaviour, you only have to wrap the whole payload into a dictionary inside the 'payload' or 'data' key via selector.

- name: subscriber
  pull:
    plugin: pnp.plugins.pull.mqtt.MQTTPull
    args:
      host: localhost
      topic: test/counter
  push:
    plugin: pnp.plugins.push.simple.Echo
    selector:
      data: "lambda data: data"

4.6. Payload unwrapping

By default any payload that is provided to a push will be passed "as-is". If the payload is an iterable, it is possible to unwrap each individual item of the iterable and provide that item to the push instead of the whole list. Yes, now you can perform for-each loops for pushes.

- name: unwrapping
  pull:
    plugin: pnp.plugins.pull.simple.Repeat
    args:
      wait: 1
      repeat:
        - 1
        - 2
        - 3
  push:
    - plugin: pnp.plugins.push.simple.Echo
      unwrap: True

Hint: Selector expressions are applied after unwrapping. So the selector is applied to each individual item. If you need the selector to augment your list, use a push.simple.Nop with unwrap = False and a dependent push.

- name: unwrapping
  pull:
    plugin: pnp.plugins.pull.simple.Repeat
    args:
      wait: 1
      repeat:
        - 1
        - 2
        - 3
  push:
    - plugin: pnp.plugins.push.simple.Nop
      selector: "data + [4, 5, 6]"
      unwrap: False  # Which is the default
      deps:
        - plugin: pnp.plugins.push.simple.Echo
          unwrap: True
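
The order of operations (unwrap first, then apply the selector to each item) can be sketched in Python. The function `dispatch` is a made-up helper, and limiting unwrapping to lists and tuples is an assumption of this sketch.

```python
def dispatch(payload, push, selector=None, unwrap=False):
    """Send the payload to `push`, either as a whole or item by item."""
    if unwrap and isinstance(payload, (list, tuple)):
        items = payload
    else:
        items = [payload]
    for item in items:
        # The selector sees each individual item when unwrapping is active
        push(selector(item) if selector is not None else item)


unwrapped = []
dispatch([1, 2, 3], unwrapped.append, selector=lambda data: data * 10, unwrap=True)
print(unwrapped)

whole = []
dispatch([1, 2, 3], whole.append, selector=lambda data: data + [4, 5, 6])
print(whole)
```

With unwrap enabled the push is called three times; without it the selector receives the whole list and the push is called once.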

4.7. Engines (>= 0.10.0)

If you do not specify any engine the ThreadEngine is chosen by default accompanied by the AdvancedRetryHandler. This keeps maximum backwards compatibility.

4.7.1. pnp.engines.sequential.SequentialEngine

By using the SequentialEngine you can run your configs as scripts. Given the example below, the "script" will end when it has finished counting to 3. Make sure to use the NoRetryHandler to actually end the runner when the pull has finished instead of retrying the "failed" pull. You can only run a single task, not multiple ones. If you want to run multiple tasks concurrently you have to use the ThreadEngine or the ProcessEngine.

#### Simple sequential handler
#### Counts from 1 to 3 and then terminates
engine: !engine
  type: pnp.engines.sequential.SequentialEngine
  retry_handler: !retry
    type: pnp.engines.NoRetryHandler  # Is the key to termination after counting has finished
tasks:
  - name: sequential
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        wait: 1
        from_cnt: 1
        to_cnt: 4
    push:
      - plugin: pnp.plugins.push.simple.Echo

4.7.2. pnp.engines.thread.ThreadEngine

#### Will use threads to accomplish concurrency
#### Drawback: If a plugin does not stop gracefully the termination will hang...
engine: !engine
  type: pnp.engines.thread.ThreadEngine
  queue_worker: 1
  retry_handler: !retry
    type: pnp.engines.SimpleRetryHandler
tasks:
  - name: threading
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        wait: 1
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

4.7.3. pnp.engines.process.ProcessEngine

#### Will use multiprocessing to accomplish concurrency
#### Drawback: Some plugins might not work or need to be aware of
engine: !engine
  type: pnp.engines.process.ProcessEngine
  queue_worker: 1
  retry_handler: !retry
    type: pnp.engines.SimpleRetryHandler
tasks:
  - name: process
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        wait: 1
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

5. Useful hints

5.1. Configuration checking

You can check your pnp configuration file by starting pnp with the -c | --check flag set. This will only run the initializer but not execute the configuration.

pnp --check <pnp_configuration>

5.2. Logging (>= 0.11.0)

You can use different logging configurations in two ways:

# Specify when starting pnp
pnp --log=<logging_configuration> <pnp_configuration>
# Specify by environment variable
export PNP_LOG_CONF=<logging_configuration>
pnp <pnp_configuration>

A simple logging configuration that will log severe errors to a separate rotating log file looks like this:

version: 1
disable_existing_loggers: False

formatters:
    simple:
        format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

handlers:
    console:
        class: logging.StreamHandler
        level: DEBUG
        formatter: simple
        stream: ext://sys.stdout

    error_file_handler:
        class: logging.handlers.RotatingFileHandler
        level: ERROR
        formatter: simple
        filename: errors.log
        maxBytes: 10485760 # 10MB
        backupCount: 20
        encoding: utf8

root:
    level: INFO
    handlers: [console, error_file_handler]
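
The keys used above (version, handlers, ext://sys.stdout, ...) follow the dictionary schema of Python's standard logging.config module. As a minimal sketch, assuming the configuration has already been loaded into a dictionary, the console part can be applied like this (how pnp itself loads the file is not shown here; the rotating file handler is left out so that the example creates no files):

```python
import logging
import logging.config

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'simple': {'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'},
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'DEBUG',
            'formatter': 'simple',
            'stream': 'ext://sys.stdout',
        },
    },
    'root': {'level': 'INFO', 'handlers': ['console']},
}

# Apply the configuration to the logging system of the current process
logging.config.dictConfig(LOGGING)
logging.getLogger('demo').info('logging is configured')
```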

5.3. dictmentor (>= 0.11.0)

You can augment the configuration by extensions from the dictmentor package. Please see DictMentor for further reference.

The DictMentor instance will be instantiated with the following code and thus the following extensions:

from dictmentor import DictMentor, ext
return DictMentor(


# Uses the dictmentor package to augment the configuration by dictmentor extensions.
# Make sure to export the environment variable to echo:
# export MESSAGE="Hello World"

- name: dictmentor
  pull:
    plugin: pnp.plugins.pull.simple.Repeat
    args:
      wait: 1
      repeat: "{{env::MESSAGE}}"
  push:
    - external: echo.pull
    - external: nop.pull
# Contents of echo.pull
plugin: pnp.plugins.push.simple.Echo
# Contents of nop.pull
plugin: pnp.plugins.push.simple.Nop

5.4. Advanced selector expressions (>= 0.12.0)

Instead of string-only selector expressions, you may now use complex dictionary and/or list constructs in your yaml to define a selector expression. If you use a dictionary or a list make sure to provide "real" selectors as a lambda expression, so the evaluator can decide if this is a string literal or an expression to evaluate.

The configuration below will repeat {'hello': 'Hello', 'words': ['World', 'Moon', 'Mars']}.

- name: selector
  pull:
    plugin: pnp.plugins.pull.simple.Repeat
    args:
      wait: 1
      repeat: "Hello World Moon Mars"
  push:
    - plugin: pnp.plugins.push.simple.Echo
      selector:
        hello: "lambda payload: payload.split(' ')[0]"
        words:
          - "lambda payload: payload.split(' ')[1]"
          - "lambda payload: payload.split(' ')[2]"
          - "lambda payload: payload.split(' ')[3]"

Before the advanced selector feature your expressions would probably have looked similar to this: dict(hello=payload.split(' ')[0], words=[payload.split(' ')[1], payload.split(' ')[2], payload.split(' ')[3]]). The first one is more readable, isn't it?
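
How such a nested selector could be evaluated is sketched below. This is an illustration under stated assumptions, not pnp's evaluator: the function `evaluate` is made up, a string counts as an expression only if it starts with lambda, and `eval` is used for brevity.

```python
def evaluate(selector, payload):
    """Recursively evaluate a selector structure against the payload."""
    if isinstance(selector, dict):
        # Keys and values may both be lambdas
        return {evaluate(k, payload): evaluate(v, payload) for k, v in selector.items()}
    if isinstance(selector, list):
        return [evaluate(item, payload) for item in selector]
    if isinstance(selector, str) and selector.strip().startswith('lambda'):
        # eval() keeps the sketch short; never feed it untrusted input.
        return eval(selector)(payload)
    return selector  # Plain literal


selector = {
    'hello': "lambda payload: payload.split(' ')[0]",
    'words': [
        "lambda payload: payload.split(' ')[1]",
        "lambda payload: payload.split(' ')[2]",
        "lambda payload: payload.split(' ')[3]",
    ],
}
print(evaluate(selector, 'Hello World Moon Mars'))
```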

Additional example:

- name: selector
  pull:
    plugin: pnp.plugins.pull.simple.Repeat
    args:
      wait: 1
      repeat: "Hello World"
  push:
    - plugin: pnp.plugins.push.simple.Echo
      # Returns: 'Hello'
      selector: "str(payload.split(' ')[0])"  # no complex structure. Evaluator assumes that this is an expression -> you do not need a lambda
    - plugin: pnp.plugins.push.simple.Echo
      selector:  # Returns {'header': 'this is a header', 'data': 'World', 'Hello': 'World'}
        header: this is a header  # Just string literals
        data: "lambda data: data.split(' ')[1]"  # Value is lambda and therefore evaluated
        "lambda data: str(data.split(' ')[0])": "lambda data: data.split(' ')[1]"  # Both are lambdas and therefore evaluated
    - plugin: pnp.plugins.push.simple.Echo
      selector:  # Returns ['foo', 'bar', 'Hello', 'World']
        - foo  # String literal
        - bar  # String literal
        - "lambda d: d.split(' ')[0]"  # Lambda -> evaluate the expression
        - "lambda d: d.split(' ')[1]"  # Lambda -> evaluate the expression

5.5. Docker images

# Mount the task and logging configuration when starting up the container
docker run --rm \
    -v /path/to/pnp/config/01_hello_world.yaml:/config/config.yaml \
    -v /path/to/logging/config/file.logging:/config/logging.yaml \

6. Plugins

6.1. pnp.plugins.pull.fitbit.Current

Requests various latest / current metrics (steps, calories, distance, ...) from the fitbit api.

Requires extra fitbit.


config (str): The configuration file that keeps your initial and refreshed authentication tokens (see below for detailed information).
system (str, optional): The metric system to use based on your localisation (de_DE, en_US, ...). Default is your configured metric system in your fitbit account
resources (str or list[str]): The resources to request (see below for detailed information)

Available resources are:

  • activities/calories
  • activities/caloriesBMR
  • activities/steps
  • activities/distance
  • activities/floors
  • activities/elevation
  • activities/minutesSedentary
  • activities/minutesLightlyActive
  • activities/minutesFairlyActive
  • activities/minutesVeryActive
  • activities/activityCalories
  • body/bmi
  • body/fat
  • body/weight
  • foods/log/caloriesIn
  • foods/log/water
  • sleep/awakeningsCount
  • sleep/efficiency
  • sleep/minutesAfterWakeup
  • sleep/minutesAsleep
  • sleep/minutesAwake
  • sleep/minutesToFallAsleep
  • sleep/startTime
  • sleep/timeInBed


Emits a map that contains the requested resources and their associated values:

{
	'activities/calories': 1216,
	'activities/caloriesBMR': 781,
	'activities/steps': 4048,
	'activities/distance': 3.02385,
	'activities/floors': 4,
	'activities/elevation': 12,
	'activities/minutes_sedentary': 127,
	'activities/minutes_lightly_active': 61,
	'activities/minutes_fairly_active': 8,
	'activities/minutes_very_active': 24,
	'activities/activity_calories': 484,
	'body/bmi': 23.086421966552734,
	'body/fat': 0.0,
	'body/weight': 74.8,
	'foods/log/calories_in': 0,
	'foods/log/water': 0.0,
	'sleep/awakenings_count': 0,
	'sleep/efficiency': 84,
	'sleep/minutes_after_wakeup': 0,
	'sleep/minutes_asleep': 369,
	'sleep/minutes_awake': 69,
	'sleep/minutes_to_fall_asleep': 0,
	'sleep/start_time': '21:50',
	'sleep/time_in_bed': 438
}


To request data from the fitbit account it is necessary to create an app. Go to Under Manage go to Register an App. For the application website and organization website, name it anything starting with http:// or https://. Secondly, make sure the OAuth 2.0 Application Type is Personal. Lastly, make sure the Callback URL is in order to get our Fitbit API to connect properly. After that, click on the agreement box and submit. You will be redirected to a page that contains your Client ID and your Client Secret.

Next we need to acquire your initial access- and refresh-token.

git clone
cd python-fitbit
python3 -m venv venv
source venv/bin/activate
pip install -r dev.txt
./ <client_id> <client_secret>

You will be redirected to your browser and asked to login to your fitbit account. Next you can restrict the app to certain data. If everything is fine, your console window should print your access- and refresh-token and also expires_at.

Put your client_id, client_secret, access_token, refresh_token and expires_at to a yaml file and use this file-path as the config argument of this plugin. Please see the example below:

access_token: <access_token>
client_id: <client_id>
client_secret: <client_secret>
expires_at: <expires_at>
refresh_token: <refresh_token>

That's it. If your token expires it will be refreshed automatically by the plugin.


### Please point your environment variable `FITBIT_AUTH` to your authentication configuration

- name: fitbit_current
  pull:
    plugin: pnp.plugins.pull.fitbit.Current
    args:
      config: "{{env::FITBIT_AUTH}}"
      instant_run: True
      interval: 5m
      resources:
        - 'activities/calories'
        - 'activities/caloriesBMR'
        - 'activities/steps'
        - 'activities/distance'
        - 'activities/floors'
        - 'activities/elevation'
        - 'activities/minutesSedentary'
        - 'activities/minutesLightlyActive'
        - 'activities/minutesFairlyActive'
        - 'activities/minutesVeryActive'
        - 'activities/activityCalories'
        - 'body/bmi'
        - 'body/fat'
        - 'body/weight'
        - 'foods/log/caloriesIn'
        - 'foods/log/water'
        - 'sleep/awakeningsCount'
        - 'sleep/efficiency'
        - 'sleep/minutesAfterWakeup'
        - 'sleep/minutesAsleep'
        - 'sleep/minutesAwake'
        - 'sleep/minutesToFallAsleep'
        - 'sleep/startTime'
        - 'sleep/timeInBed'
  push:
    - plugin: pnp.plugins.push.simple.Echo

6.2. pnp.plugins.pull.fitbit.Devices

Requests details (battery, model, ...) about your fitbit devices / trackers associated to your account.

Requires extra fitbit.


config (str): The configuration file that keeps your initial and refreshed authentication tokens (see below for detailed information).
system (str, optional): The metric system to use based on your localisation (de_DE, en_US, ...). Default is your configured metric system in your fitbit account


Emits a list that contains your available trackers and/or devices and their associated details:

[{
	'battery': 'Empty',
	'battery_level': 10,
	'device_version': 'Charge 2',
	'features': [],
	'id': 'abc',
	'last_sync_time': '2018-12-23T10:47:40.000',
	'type': 'TRACKER'
}, {
	'battery': 'High',
	'battery_level': 95,
	'device_version': 'Blaze',
	'features': [],
	'id': 'xyz',
	'last_sync_time': '2019-01-02T10:48:39.000',
	'type': 'TRACKER'
}]

To request data from the fitbit account it is necessary to create an app. Go to Under Manage go to Register an App. For the application website and organization website, name it anything starting with http:// or https://. Secondly, make sure the OAuth 2.0 Application Type is Personal. Lastly, make sure the Callback URL is in order to get our Fitbit API to connect properly. After that, click on the agreement box and submit. You will be redirected to a page that contains your Client ID and your Client Secret.

Next we need to acquire your initial access- and refresh-token.

git clone
cd python-fitbit
python3 -m venv venv
source venv/bin/activate
pip install -r dev.txt
./ <client_id> <client_secret>

You will be redirected to your browser and asked to login to your fitbit account. Next you can restrict the app to certain data. If everything is fine, your console window should print your access- and refresh-token and also expires_at.

Put your client_id, client_secret, access_token, refresh_token and expires_at to a yaml file and use this file-path as the config argument of this plugin. Please see the example below:

access_token: <access_token>
client_id: <client_id>
client_secret: <client_secret>
expires_at: <expires_at>
refresh_token: <refresh_token>

That's it. If your token expires it will be refreshed automatically by the plugin.


### Please point your environment variable `FITBIT_AUTH` to your authentication configuration

- name: fitbit_devices
  pull:
    plugin: pnp.plugins.pull.fitbit.Devices
    args:
      config: "{{env::FITBIT_AUTH}}"
      instant_run: True
      interval: 5m
  push:
    - plugin: pnp.plugins.push.simple.Echo

6.3. pnp.plugins.pull.fitbit.Goal

Requests your goals (water, steps, ...) from the fitbit api.

Requires extra fitbit.


config (str): The configuration file that keeps your initial and refreshed authentication tokens (see below for detailed information).
system (str, optional): The metric system to use based on your localisation (de_DE, en_US, ...). Default is your configured metric system in your fitbit account
goals (str or list[str]): The goals to request (see below for detailed information)

Available goals are:

  • body/fat
  • body/weight
  • activities/daily/activeMinutes
  • activities/daily/caloriesOut
  • activities/daily/distance
  • activities/daily/floors
  • activities/daily/steps
  • activities/weekly/distance
  • activities/weekly/floors
  • activities/weekly/steps
  • foods/calories
  • foods/water


Emits a dictionary structure that consists of the requested goals:

{
	'body/fat': 15.0,
	'body/weight': 70.0,
	'activities/daily/active_minutes': 30,
	'activities/daily/calories_out': 2100,
	'activities/daily/distance': 5.0,
	'activities/daily/floors': 10,
	'activities/daily/steps': 6000,
	'activities/weekly/distance': 5.0,
	'activities/weekly/floors': 10.0,
	'activities/weekly/steps': 6000.0,
	'foods/calories': 2220,
	'foods/water': 1893
}


To request data from the fitbit account it is necessary to create an app. Go to Under Manage go to Register an App. For the application website and organization website, name it anything starting with http:// or https://. Secondly, make sure the OAuth 2.0 Application Type is Personal. Lastly, make sure the Callback URL is in order to get our Fitbit API to connect properly. After that, click on the agreement box and submit. You will be redirected to a page that contains your Client ID and your Client Secret.

Next we need to acquire your initial access- and refresh-token.

git clone
cd python-fitbit
python3 -m venv venv
source venv/bin/activate
pip install -r dev.txt
./ <client_id> <client_secret>

You will be redirected to your browser and asked to login to your fitbit account. Next you can restrict the app to certain data. If everything is fine, your console window should print your access- and refresh-token and also expires_at.

Put your client_id, client_secret, access_token, refresh_token and expires_at to a yaml file and use this file-path as the config argument of this plugin. Please see the example below:

access_token: <access_token>
client_id: <client_id>
client_secret: <client_secret>
expires_at: <expires_at>
refresh_token: <refresh_token>

That's it. If your token expires it will be refreshed automatically by the plugin.


### Please point your environment variable `FITBIT_AUTH` to your authentication configuration

- name: fitbit_goal
  pull:
    plugin: pnp.plugins.pull.fitbit.Goal
    args:
      config: "{{env::FITBIT_AUTH}}"
      instant_run: True
      interval: 5m
      goals:
        - body/fat
        - body/weight
        - activities/daily/activeMinutes
        - activities/daily/caloriesOut
        - activities/daily/distance
        - activities/daily/floors
        - activities/daily/steps
        - activities/weekly/distance
        - activities/weekly/floors
        - activities/weekly/steps
        - foods/calories
        - foods/water
  push:
    - plugin: pnp.plugins.push.simple.Echo

6.4. pnp.plugins.pull.fs.FileSystemWatcher

Watches the given directory for changes like created, moved, modified and deleted files. If ignore_directories is set to False, then directories will be reported as well.

By default, the plugin recursively reports any file that is touched, changed or deleted in the given path. The directory itself and its subdirectories are subject to reporting too if ignore_directories is set to False.

Requires extra fswatcher.


path (str): The path to track for file / directory changes.
recursive (bool, optional): If set to True, any subfolders of the given path will be tracked too. Default is True.
patterns (str or list, optional): Any file pattern (e.g. *.txt or [*.txt, *.md]). If set to None no filter is applied. Default is None.
ignore_patterns (str or list, optional): Any patterns to ignore (specify like argument patterns). If set to None, nothing will be ignored. Default is None.
ignore_directories (bool, optional): If set to True, events for directories are not reported. Default is False.
case_sensitive (bool, optional): If set to True, any pattern is case_sensitive, otherwise it is case insensitive. Default is False.
events (str or list, optional): The events to track. One or multiple of 'moved', 'deleted', 'created' and/or 'modified'. If set to None all events will be reported. Default is None.
load_file (bool, optional): If set to True the file contents will be loaded into the result. Default is False.
mode (str, optional): Open mode of the file (only necessary when load_file is True). Can be text, binary or auto (guessing). Default is auto.
base64 (bool, optional): If set to True the loaded file contents will be converted to base64 (only applicable when load_file is True). Argument mode will be automatically set to 'binary'. Default is False.
defer_modified (float, optional): If set greater than 0, it will defer the sending of modified events for that amount of time (seconds). There might be multiple flushes of a file before it is written completely to disk. Without defer_modified each flush will raise a modified event. Default is 0.5.


Example of an emitted message

{
    'operation': 'modified',
    'source': '/tmp/abc.txt',
    'is_directory': False,
    'destination': None,  # Only non-None when operation = 'moved'
    'file': {  # Only present when load_file is True
        'file_name': 'abc.txt',
        'content': 'foo and bar',
        'read_mode': 'text',
        'base64': False
    }
}


- name: file_watcher
  pull:
    plugin: pnp.plugins.pull.fs.FileSystemWatcher
    args:
      path: "/tmp"
      ignore_directories: True
      events: [created, deleted, modified]
      load_file: False
  push:
    plugin: pnp.plugins.push.simple.Echo

6.5. pnp.plugins.pull.gpio.Watcher

Listens for low/high state changes on the configured gpio pins.

In more detail the plugin can raise events when one of the following situations occur:

  • rising (high) of a gpio pin - multiple events may occur in a short period of time
  • falling (low) of a gpio pin - multiple events may occur in a short period of time
  • switch of gpio pin - will suppress multiple events a defined period of time (bounce time)
  • motion of gpio pin - will raise the event motion_on if the pin rises and set a timer with a configurable amount of time. Any other gpio rising events will reset the timer. When the timer expires the motion_off event is raised.

Requires extra gpio.


pins (list): The gpio pins to observe for state changes. Please see the examples section on how to configure it.
default (one of [rising, falling, switch, motion]): The default edge that is applied when not configured. Please see the examples section for further details.


{
    "gpio_pin": 17,  # The gpio pin whose state has changed
    "event": "rising"  # One of [rising, falling, switch, motion_on, motion_off]
}


- name: gpio
  pull:
    plugin: pnp.plugins.pull.gpio.Watcher
    args:
      default: rising
      pins:
        - 2               # No mode specified: Default mode (in this case 'rising')
        - 2               # Duplicates will get ignored
        - 3:rising        # Equal to '3' (without explicit mode)
        - 3:falling       # Get the falling event for gpio pin 3 as well
        - 4:switch        # Uses some debouncing magic and emits only one rising event
        - 5:switch(1000)  # Specify debounce in milliseconds (default is 500ms)
        - 5:switch(500)   # Duplicates - even when they have other arguments - will get ignored
        - 7:motion        # Uses some delay magic to emit only one motion on and one motion off event
        - 9:motion(1m)    # Specify delay (default is 30 seconds)
  push:
    - plugin: pnp.plugins.push.simple.Echo
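
The pin notation used in the example (pin, optional mode, optional argument in parentheses) can be parsed with a small regular expression. The sketch below is an assumption about how such parsing might look, not the plugin's code; `parse_pin` and `unique_pins` are hypothetical names, and keeping the first occurrence of a duplicate is taken from the comments in the example.

```python
import re

# <pin>[:<mode>[(<argument>)]], e.g. '2', '3:falling', '5:switch(1000)'
PIN_SPEC = re.compile(r'^(?P<pin>\d+)(?::(?P<mode>[a-z]+)(?:\((?P<arg>[^)]*)\))?)?$')


def parse_pin(spec, default='rising'):
    """Split a pin specification into (pin, mode, argument)."""
    match = PIN_SPEC.match(str(spec))
    if match is None:
        raise ValueError('Invalid pin specification: {!r}'.format(spec))
    return int(match.group('pin')), match.group('mode') or default, match.group('arg')


def unique_pins(specs, default='rising'):
    """Drop duplicates of the same pin and mode; the first occurrence wins."""
    seen = {}
    for spec in specs:
        pin, mode, arg = parse_pin(spec, default)
        seen.setdefault((pin, mode), arg)
    return seen


print(unique_pins([2, 2, '3:rising', '3:falling', '5:switch(1000)', '5:switch(500)']))
```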

6.6. pnp.plugins.pull.hass.State

Connects to the home assistant websocket api and listens for state changes. If no include or exclude is defined it will report all state changes. If include is defined only entities that match one of the specified patterns will be emitted. If exclude is defined, entities that match at least one of the specified patterns will be ignored. exclude patterns override include patterns.


host (str): Url to your home assistant instance (e.g. http://my-hass:8123)
token (str): Your long lived access token to access the websocket api. See below for further instructions
include (str or list[str]): Patterns of entity state changes to include. All state changes that do not match the defined patterns will be ignored
exclude (str or list[str]): Patterns of entity state changes to exclude. All state changes that do match the defined patterns will be ignored


  • include and exclude support wildcards (e.g * and ?)
  • exclude overrides include. So you can include everything from a domain (sensor.*) but exclude individual entities.
  • Create a long lived access token: Home Assistant documentation
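
The interplay of include and exclude can be sketched with Python's fnmatch module, which supports the * and ? wildcards. This illustrates the rules stated above; the function name `is_reported` is made up and the plugin may match patterns differently.

```python
from fnmatch import fnmatchcase


def is_reported(entity_id, include=None, exclude=None):
    """Decide whether a state change of `entity_id` would be emitted."""
    # exclude overrides include
    if exclude and any(fnmatchcase(entity_id, pattern) for pattern in exclude):
        return False
    if include:
        return any(fnmatchcase(entity_id, pattern) for pattern in include)
    # Neither include nor exclude matched: report everything
    return True


print(is_reported('light.desk', include=['light.*'], exclude=['light.lamp']))
print(is_reported('light.lamp', include=['light.*'], exclude=['light.lamp']))
```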


The emitted result always contains the entity_id, new_state and old_state:

{
	'entity_id': 'light.bedroom_lamp',
	'old_state': {
		'state': 'off',
		'attributes': {},
		'last_changed': '2019-01-08T18:24:42.087195+00:00',
		'last_updated': '2019-01-08T18:40:40.011459+00:00'
	},
	'new_state': {
		'state': 'on',
		'attributes': {},
		'last_changed': '2019-01-08T18:41:06.329699+00:00',
		'last_updated': '2019-01-08T18:41:06.329699+00:00'
	}
}


- name: hass_state
  pull:
    plugin: pnp.plugins.pull.hass.State
    args:
      url: http://localhost:8123
      token: "{{env::HA_TOKEN}}"
      exclude:
        - light.lamp
      include:
        - light.*
  push:
    - plugin: pnp.plugins.push.simple.Echo

6.7. pnp.plugins.pull.http.Server

Listens on the specified port for requests to any endpoint. The plugin tries to parse any data passed to the endpoint as json into a dictionary. If this is not possible, the data is passed on as is. See the result payloads below for specifics and examples.

Remark: You will not be able to make requests to the endpoint DELETE /_shutdown because it is used internally.

Requires extra http-server.


port (int, optional): The port the rest server should listen to for requests. Default is 5000.
allowed_methods (str or list, optional): List of http methods that are allowed. Default is 'GET'.
server_impl (str, optional): Choose the implementation of the WSGI-Server (wraps the flask-app). Possible values are: [flask, gevent]. flask uses the internal Flask Development server. Not recommended for production use. gevent uses gevent. Default is gevent.


curl -X GET 'http://localhost:5000/resource/endpoint?foo=bar&bar=baz' --data '{"baz": "bar"}'
{
    'endpoint': 'resource/endpoint',
    'method': 'GET',
    'query': {'foo': 'bar', 'bar': 'baz'},
    'data': {'baz': 'bar'},
    'is_json': True
}
curl -X GET 'http://localhost:5000/resource/endpoint' --data 'no json obviously'
{
    'endpoint': 'resource/endpoint',
    'method': 'GET',
    'query': {},
    'data': b'no json obviously',
    'is_json': False
}
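
How such a payload could be assembled is sketched below. This is an illustration only: build_payload is a hypothetical helper, and treating every body that parses as json as is_json = True is an assumption.

```python
import json


def build_payload(endpoint, method, query, raw_body):
    """Assemble a result dictionary from the parts of a request (hypothetical helper)."""
    try:
        # Try to interpret the request body as json
        data, is_json = json.loads(raw_body), True
    except ValueError:
        # Not json: pass the raw body on as is
        data, is_json = raw_body, False
    return {
        'endpoint': endpoint,
        'method': method,
        'query': query,
        'data': data,
        'is_json': is_json,
    }


print(build_payload('resource/endpoint', 'GET', {'foo': 'bar'}, b'{"baz": "bar"}'))
print(build_payload('resource/endpoint', 'GET', {}, b'no json obviously'))
```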


- name: rest
  pull:
    plugin: pnp.plugins.pull.http.Server
    args:
      port: 5000
      allowed_methods: [GET, POST]
  push:
    plugin: pnp.plugins.push.simple.Echo

6.8. pnp.plugins.pull.monitor.Stats

Periodically (every interval) emits various metrics / statistics about the host system. See the result below for the available metrics.


{
	'cpu_count': 4,
	'cpu_freq': 700,  # in MHz
	'cpu_use': 6.6,  # in %
	'cpu_temp': 52.6,  # in °C (might not be available on all systems, e.g. MacOS)
	'memory_use': 56.0,  # in %
	'swap_use': 23.2,  # in %
	'disk_use': 69.8,  # in %  (of your root)
	'load_1m': 1.81591796875,  # CPU queue length last minute
	'load_5m': 2.06689453125,  # CPU queue length last 5 minutes
	'load_15m': 2.15478515625  # CPU queue length last 15 minutes
}


- name: stats
  pull:
    plugin: pnp.plugins.pull.monitor.Stats
    args:
      interval: 10s
      instant_run: True
  push:
    plugin: pnp.plugins.push.simple.Echo

6.9. pnp.plugins.pull.mqtt.Subscribe

Pulls messages from the specified topic from the given mosquitto mqtt broker (identified by host and port).


host (str): Host where the mosquitto broker is running.
port (int): Port where the mosquitto broker is listening.
topic (str): Topic to pull messages from. You can listen to multiple topics by using the #-wildcard (e.g. test/# will listen to all topics underneath test).

All arguments can be automatically injected via environment variables with MQTT prefix (e.g. MQTT_HOST).


The emitted message will look like this:

{
    'topic': 'test/device/device1',
    'levels': ['test', 'device', 'device1'],
    'payload': 'The actual event message'
}
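
The following sketch shows how the levels relate to the topic and how the #-wildcard selects topics. Both helper names are hypothetical and the code does not involve a real broker; the matching follows the general mqtt rule that # covers all remaining levels.

```python
def to_message(topic, payload):
    """Build the emitted message for a received topic / payload (hypothetical helper)."""
    return {'topic': topic, 'levels': topic.split('/'), 'payload': payload}


def matches(subscription, topic):
    """Check a topic against a subscription that may end with the #-wildcard."""
    sub_levels = subscription.split('/')
    topic_levels = topic.split('/')
    for index, sub_level in enumerate(sub_levels):
        if sub_level == '#':
            return True  # '#' matches all remaining levels
        if index >= len(topic_levels) or sub_level != topic_levels[index]:
            return False
    return len(sub_levels) == len(topic_levels)


print(matches('test/#', 'test/device/device1'))
print(to_message('test/device/device1', 'The actual event message'))
```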


- name: mqtt
  pull:
    plugin: pnp.plugins.pull.mqtt.Subscribe
    args:
      host: localhost
      port: 1883
      topic: test/#
  push:
    plugin: pnp.plugins.push.simple.Echo

6.10. pnp.plugins.pull.sensor.DHT

Periodically polls a dht11 or dht22 (aka am2302) for temperature and humidity readings. Polling interval is controlled by interval.

Requires extra dht.


device (str, optional): The device to poll (one of dht22, dht11, am2302). Default is 'dht22'.
data_gpio (int, optional): The gpio port the data line of the device is connected to. Default is 17.
humidity_offset (float, optional): Positive/Negative offset for humidity. Default is 0.0.
temp_offset (float, optional): Positive/Negative offset for temperature. Default is 0.0.


{
    "humidity": 65.4,  # in %
    "temperature": 23.7  # in celsius
}


- name: dht
  pull:
    plugin: pnp.plugins.pull.sensor.DHT
    args:
      device: dht22  # Connect to a dht22
      data_gpio: 17  # DHT is connected to gpio port 17
      interval: 5m  # Polls the readings every 5 minutes
      humidity_offset: -5.0  # Subtracts 5% from the humidity reading
      temp_offset: 1.0  # Adds 1 °C to the temperature reading
      instant_run: True
  push:
    - plugin: pnp.plugins.push.simple.Echo
      selector: payload.temperature  # Temperature reading
    - plugin: pnp.plugins.push.simple.Echo
      selector: payload.humidity  # Humidity reading

6.11. pnp.plugins.pull.sensor.OpenWeather

Periodically polls weather data from the OpenWeatherMap api.


api_key (str): The api_key you got from OpenWeatherMap after registration.
lat (float): Latitude. If you pass lat, you have to pass lon as well.
lon (float): Longitude. If you pass lon, you have to pass lat as well.
city_name (str): The name of your city. To minimize ambiguity use lat/lon or your country as a suffix, e.g. London,GB.
units (str, one of (metric, imperial, kelvin)): Specify units for temperature and speed. imperial = fahrenheit + miles/hour, metric = celsius + m/secs, kelvin = kelvin + m/secs. Default is metric.
tz (str, optional): Time zone to use for current time and last updated time. Default is your local timezone.

Remark: You have to pass either city_name or lat/lon.


{
	"temperature": 13.03,
	"pressure": 1021,
	"humidity": 62,
	"cloudiness": 40,
	"wind": {
		"speed": 9.3,
		"deg": 300
	},
	"poll_dts": "2018-10-03T15:41:32.156930+02:00",
	"last_updated_dts": "2018-10-03T15:20:00+02:00",
	"raw": {
		"coord": {
			"lon": 10,
			"lat": 53.55
		},
		"weather": [{
			"id": 521,
			"main": "Rain",
			"description": "shower rain",
			"icon": "09d"
		}],
		"base": "stations",
		"main": {
			"temp": 13.03,
			"pressure": 1021,
			"humidity": 62,
			"temp_min": 12,
			"temp_max": 14
		},
		"visibility": 10000,
		"wind": {
			"speed": 9.3,
			"deg": 300
		},
		"clouds": {
			"all": 40
		},
		"dt": 1538572800,
		"sys": {
			"type": 1,
			"id": 4883,
			"message": 0.0202,
			"country": "DE",
			"sunrise": 1538544356,
			"sunset": 1538585449
		},
		"id": 2911298,
		"name": "Hamburg",
		"cod": 200
	}
}

You can consult the specification for the meaning of the individual fields.


### Make sure you export your api key with: `export OPENWEATHER_API_KEY=<your_api_key>`

- name: openweather
  pull:
    plugin: pnp.plugins.pull.sensor.OpenWeather
    args:
      city_name: "Hamburg,DE"  # Alternative: pass lat and lon
      # lon: 10
      # lat: 53.55
      units: metric  # imperial (fahrenheit + miles/hour), metric (celsius + m/secs), kelvin (kelvin + m/secs)
      instant_run: True
      # tz: GMT
  push:
    plugin: pnp.plugins.push.simple.Echo

6.12. pnp.plugins.pull.simple.Count

Emits a counter value every wait seconds. The counter runs from from_cnt to to_cnt. If to_cnt is None, the counter counts to infinity.


wait (int): Wait the amount of seconds before emitting the next counter.
from_cnt (int): Starting value of the counter.
to_cnt (int, optional): End value of the counter. If not passed set to "infinity" (precise: int.max).


Counter value (int).


- name: count
  pull:
    plugin: pnp.plugins.pull.simple.Count
    args:
      wait: 1
      from_cnt: 1
      to_cnt: 10
  push:
    plugin: pnp.plugins.push.simple.Echo

6.13. pnp.plugins.pull.simple.Repeat

Emits every wait seconds the same repeat.


wait (int): Wait the amount of seconds before emitting the next repeat.
repeat (any): The object to emit.


Emits the repeat-object as it is.


- name: repeat
  pull:
    plugin: pnp.plugins.pull.simple.Repeat
    args:
      repeat: "Hello World"  # Repeats 'Hello World'
      wait: 1  # Every second
  push:
    plugin: pnp.plugins.push.simple.Echo

6.14. pnp.plugins.pull.zway.ZwayPoll

Pulls the specified json content from the zway rest api. The content is specified by the url, e.g. http://<host>:8083/ZWaveAPI/Run/devices will pull all devices and serve the result as a json.

Specify the polling interval by setting the argument interval. User / password combination is required when your api is protected against guest access (by default it is).

Use multiple pushes and the related selectors to extract the required content like temperature readings (see the examples section for guidance).


url (str): The url to poll periodically.
user (str): Authentication user name.
password (str): Authentication password.
interval (polling literal, optional): Polling interval (default: 1m).

All arguments (url, user and password) can be automatically injected via environment variables with ZWAY prefix (e.g. ZWAY_USER).



Emits the content of the fetched url as it is.


### Please make sure to adjust url and device ids
### Username and Password are injected from environment variables:
###     export ZWAY_USER=admin
###     export ZWAY_PASSWORD=secret_one
- name: zway
  pull:
    plugin: pnp.plugins.pull.zway.ZwayPoll
    args:
      url: "http://smarthome:8083/ZWaveAPI/Run/devices"
      interval: 5s
  push:
    - plugin: pnp.plugins.push.simple.Echo
      # Temperature of fibaro motion sensor
      # You can access the returned json like you would inquire the zway-api
      selector: payload[19].instances[0].commandClasses[49].data[1].val.value
    - plugin: pnp.plugins.push.simple.Echo
      # Luminescence of fibaro motion sensor
      selector: payload[19].instances[0].commandClasses[49].data[3].val.value


Below are some common selector examples to fetch various metrics from various devices

Fibaro Motion Sensor

  • Temperature payload[deviceid].instances[0].commandClasses[49].data[1].val.value
  • Luminescence payload[deviceid].instances[0].commandClasses[49].data[3].val.value

Fibaro Wallplug

  • Meter payload[deviceid].instances[0].commandClasses[50].data[0].val.value

Thermostat (Danfoss / other should work as well)

  • Setpoint payload[deviceid].instances[0].commandClasses[67].data[1].val.value

Battery operated devices

  • Battery level payload[deviceid].instances[0].commandClasses[128].data.last.value

6.15. pnp.plugins.pull.zway.ZwayReceiver

Sets up an http server to process incoming GET requests from the Zway-App HttpGet.


url_format (str): The url_format that is configured in your HttpGet App. If you configured http://<ip>:<port>/set?device=%DEVICE%&state=%VALUE% (default of the App), you basically have to copy the path component set?device=%DEVICE%&state=%VALUE% to be your url_format.
mode ([mapping, auto, both]): If set to mapping (default) you should provide the device_mapping to manually map your virtual devices. If set to auto the plugin will try to determine the device_id, command class, mode and the type on its own. If set to both the plugin will first try the device_mapping and then perform the auto-magic.
device_mapping (Or(Dict[Str, Str], Dict[Str, Dict]), optional): A mapping to map the somewhat cryptic virtual device names to human readable ones. Default is None, which means that no mapping will be performed. Two ways are possible:

  1. Ordinary mapping from virtual device name -> alias.
  2. Enhanced mapping from virtual device name to dictionary with additional properties. One property has to be alias.

ignore_unknown_devices (bool, optional): If set to True all incoming requests that are associated with a device that is not part of the mapping or - when mode = [auto, both] - cannot be auto mapped will be ignored. Default is False.

Additionally the component will accept any arguments that pnp.plugins.pull.http.Server would accept.


Given the url_format set?%DEVICE%&value=%VALUE%, the url http://<ip>:<port>/set?vdevice1&value=5.5 and the device_mapping vdevice1 -> alias of vdevice1 the emitted message will look like this:

{
    'device_name': 'alias of vdevice1',
    'raw_device': 'vdevice1',
    'value': '5.5',
    'props': {}
}

When mode is auto or both the plugin will try to determine the device id and the type of the virtual device on its own. Given the virtual device name ZWayVDev_zway_7-0-48-1 and the value on, the plugin will produce the following:

{
    'device_name': '7',
    'raw_device': 'ZWayVDev_zway_7-0-48-1',
    'value': 'on',
    'props': {
        'command_class': '48',
        'mode': '1',
        'type': 'motion'
    }
}
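
A minimal sketch of how the auto mode could split a virtual device name into its parts. The regular expression, the helper name parse_vdevice and the reading of the second number as an instance are assumptions. How the plugin derives the type (e.g. motion) is not documented here and is therefore left out.

```python
import re

# Assumed layout: ZWayVDev_zway_<device id>-<instance>-<command class>[-<mode>]
VDEV_PATTERN = re.compile(
    r"^ZWayVDev_zway_(?P<device_id>\d+)-(?P<instance>\d+)"
    r"-(?P<command_class>\d+)(?:-(?P<mode>\d+))?$"
)


def parse_vdevice(raw_device, value):
    """Split a virtual device name into its parts (hypothetical helper)."""
    match = VDEV_PATTERN.match(raw_device)
    if match is None:
        return None  # The name does not follow the assumed layout
    return {
        'device_name': match.group('device_id'),
        'raw_device': raw_device,
        'value': value,
        'props': {
            'command_class': match.group('command_class'),
            'mode': match.group('mode'),
        },
    }


print(parse_vdevice('ZWayVDev_zway_7-0-48-1', 'on'))
```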


- name: zway_receiver
  pull:
    plugin: pnp.plugins.pull.zway.ZwayReceiver
    args:
      port: 5000
      mode: mapping  # mapping, auto or both
      device_mapping:
        vdevice1:  # Props = {type: motion}
          alias: dev1
          type: motion
        vdevice2:  # Props = {type: switch, other_prop: foo}
          alias: dev2
          type: switch
          other_prop: foo
        vdevice3: dev3  # props == {}
      url_format: "%DEVICE%?value=%VALUE%"
      ignore_unknown_devices: False
  push:
    - plugin: pnp.plugins.push.simple.Echo
      selector: "'Got value {} from device {} ({}) with props {}'.format(data.value, data.device_name, data.raw_device, data.props)"

6.16. pnp.plugins.push.fs.FileDump

This push dumps the given payload to a file in the specified directory. If the argument file_name is None, a name is generated based on the current datetime (%Y%m%d-%H%M%S). In that case you should pass extension to specify the extension of the generated file name. The argument binary_mode controls whether the dump is binary (mode=wb) or text (mode=w).


directory (str, optional): The target directory to store the dumps. Default is '.' (current directory).
file_name (str, optional): The name of the file to dump. If set to None a file name will be automatically generated. You can specify the file_name via the envelope, too. Envelope will override init file name. Default is None.
extension (str, optional): The extension to use when the file name is automatically generated. Can be overridden by envelope. Default is '.dump'.
binary_mode (bool, optional): If set to True the file will be written in binary mode ('wb'); otherwise in text mode ('w'). Default is False.


Will return an absolute path to the file created.
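
The naming rules can be sketched as follows. resolve_file_path is a hypothetical helper, not part of the plugin; appending the extension to an explicitly given file name is inferred from the envelope example (hello-world + .hello).

```python
import os
from datetime import datetime


def resolve_file_path(directory='.', file_name=None, extension='.dump', now=None):
    """Compute the absolute path of the dump file (hypothetical helper)."""
    if file_name is None:
        # Generate the name from the current datetime
        now = now or datetime.now()
        file_name = now.strftime('%Y%m%d-%H%M%S')
    return os.path.abspath(os.path.join(directory, file_name + extension))


print(resolve_file_path('/tmp', extension='.txt'))
print(resolve_file_path('/tmp', file_name='hello-world', extension='.hello'))
```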


- name: file_dump
  pull:
    plugin: pnp.plugins.pull.simple.Repeat
    args:
      repeat: "Hello World"
  push:
    plugin: pnp.plugins.push.fs.FileDump
    args:
      directory: "/tmp"
      file_name: null  # Auto-generated file (timestamp)
      extension: ".txt"  # Extension of auto-generated file
      binary_mode: False  # text mode
    deps:
      - plugin: pnp.plugins.push.simple.Echo

- name: file_dump
  pull:
    plugin: pnp.plugins.pull.simple.Repeat
    args:
      repeat: "Hello World"
  push:
    plugin: pnp.plugins.push.fs.FileDump
    # Override `file_name` and `extension` via envelope.
    # Instead of an auto generated file, the file '/tmp/hello-world.hello' will be dumped.
    selector:
      data: "lambda data: data"
      file_name: hello-world
      extension: .hello
    args:
      directory: "/tmp"
      file_name: null  # Auto-generated file (timestamp)
      extension: ".txt"  # Extension of auto-generated file
      binary_mode: False  # text mode
    deps:
      - plugin: pnp.plugins.push.simple.Echo

6.17. pnp.plugins.push.http.Call

Makes a request to a http resource.


url (str): Request url. Can be overridden via envelope.
method (str, optional): The http method to use for the request. Must be a valid http method (GET, POST, ...). Default is 'GET'. Can be overridden via envelope.
fail_on_error (bool, optional): If True the push will fail on a http status code other than 2xx. This leads to an error message recorded in the logs and no further execution of any dependencies. Default is False. Can be overridden by the envelope.
provide_response (bool, optional): If True the push will not return the payload as it is, but instead provide the response status_code, fetched url content and a flag if the url content is a json response. This is useful for other push instances in the dependency chain. Default is False.


Will return the payload as it is for easy chaining of dependencies. If provide_response is True the push will return a dictionary that looks like this:

{
    "status_code": 200,
    "data": "fetched url content",
    "is_json": False
}

Please note that this structure will be interpreted as an envelope with the keys status_code and is_json along with the payload 'fetched url content' by other push instances in the dependency chain.
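
To make the envelope interpretation concrete, here is a small sketch. split_envelope is a hypothetical helper and only mirrors the behaviour described above: the data key carries the payload and all other keys form the envelope.

```python
def split_envelope(result):
    """Split a push result into (envelope, payload) (hypothetical helper)."""
    if isinstance(result, dict) and 'data' in result:
        # Everything except 'data' is treated as envelope
        envelope = {key: value for key, value in result.items() if key != 'data'}
        return envelope, result['data']
    # No 'data' key: the whole object is the payload
    return {}, result


response = {"status_code": 200, "data": "fetched url content", "is_json": False}
envelope, payload = split_envelope(response)
print(envelope)
print(payload)
```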


### Simple example calling the built-in rest server
### Alternates between the http methods GET and POST, depending on whether the counter is even or odd.
- name: http_call
  pull:
    plugin: pnp.plugins.pull.simple.Count
    args:
      wait: 5
  push:
    plugin: pnp.plugins.push.http.Call
    selector:
      data:
        counter: "lambda data: data"
      method: "lambda data: 'POST' if int(data) % 2 == 0 else 'GET'"
    args:
      url: http://localhost:5000/
- name: rest_server
  pull:
    plugin: pnp.plugins.pull.http.Server
    args:
      port: 5000
      allowed_methods:
        - GET
        - POST
  push:
    plugin: pnp.plugins.push.simple.Echo

### Demonstrates the use of `provide_response` set to True.
### Call will return a response object to dependent push instances.
- name: http_call
  pull:
    plugin: pnp.plugins.pull.simple.Count
    args:
      wait: 5
  push:
    plugin: pnp.plugins.push.http.Call
    args:
      url: http://localhost:5000/
      provide_response: True
    deps:
      plugin: pnp.plugins.push.simple.Echo
- name: rest_server
  pull:
    plugin: pnp.plugins.pull.http.Server
    args:
      port: 5000
      allowed_methods:
        - GET
  push:
    plugin: pnp.plugins.push.simple.Nop


FaceR (short for face recognition) tags known faces in images. The output is the image with all faces tagged either with the known name or with an unknown_label. Default for unknown ones is 'Unknown'.

Known faces can be ingested either by a directory of known faces (known_faces_dir) or by mapping of known_faces (dictionary: name -> [list of face files]).

The payload passed to the push method is expected to be a valid byte array that represents an image in memory.


known_faces (dict<str, file_path as str>, optional): Mapping of a person's name to a list of images that contain the person's face. Default is None.
known_faces_dir (str, optional): A directory containing images with known persons (file_name -> person's name). Default is None.
unknown_label (str, optional): Tag label of unknown faces. Default is 'Unknown'.

You have to specify either known_faces or known_faces_dir. If both are unsupplied the push will fail.


Will return a dictionary that contains the bytes of the tagged image (key tagged_image) and metadata (no_of_faces, known_faces)

{
    'tagged_image': <bytes of tagged image>,
    'no_of_faces': 2,
    'known_faces': ['obama']
}


- name: faceR
    plugin: pnp.plugins.pull.fs.FileSystemWatcher
      path: "/tmp/camera"
      recursive: True
      patterns: "*.jpg"
      ignore_directories: True
      case_sensitive: False
      events: [created]
      load_file: True
      mode: binary
      base64: False
      known_faces_dir: "/tmp/faces"
      unknown_label: "don't know him"

6.19. pnp.plugins.push.mqtt.Discovery





For chaining of pushes the payload is simply returned as is.


### Please point your environment variable `FITBIT_AUTH` to your authentication configuration

- name: fitbit_steps
    plugin: pnp.plugins.pull.fitbit.Current
      config: "{{env::FITBIT_AUTH}}"
      instant_run: True
      interval: 5m
        - activities/steps
    - plugin: pnp.plugins.push.mqtt.Discovery
      selector: "data.get('activities/steps')"
        host: localhost
        discovery_prefix: homeassistant
        component: sensor
        object_id: fitbit_steps
          name: "{{var::object_id}}"
          icon: "mdi:soccer"

- name: fitbit_devices_battery
    plugin: pnp.plugins.pull.fitbit.Devices
      config: "{{env::FITBIT_AUTH}}"
      instant_run: True
      interval: 5m
    - plugin: pnp.plugins.push.mqtt.Discovery
        data: "lambda data: data.get('battery_level')"
        object_id: "lambda data: 'fb_{}_battery'.format(data.get('device_version', '').replace(' ', '_').lower())"
      unwrap: True
        host: localhost
        discovery_prefix: homeassistant
        component: sensor
          name: "{{var::object_id}}"
          device_class: "battery"
          unit_of_measurement: "%"
    - plugin: pnp.plugins.push.mqtt.Discovery
        data: "lambda data: data.get('last_sync_time')"
        object_id: "lambda data: 'fb_{}_lastsync'.format(data.get('device_version', '').replace(' ', '_').lower())"
      unwrap: True
        host: localhost
        discovery_prefix: homeassistant
        component: sensor
          name: "{{var::object_id}}"

6.20. pnp.plugins.push.mqtt.Publish

Will push the given payload to a mqtt broker (in this case mosquitto). The broker is specified by host and port. In addition a topic needs to be specified where the payload is pushed to (e.g. home/living/thermostat).

The payload will be pushed as it is. No transformation is applied. If you need some transformation, use the selector.


host (str): The host where the mosquitto broker is running.
port (int, optional): The port where the mosquitto broker is listening. Default is 1883.
topic (str, optional): The topic to publish to. If set to None the envelope of the payload has to contain a 'topic' key or the push will fail (default is None). If both exist the topic from the envelope will overrule the init one.
retain (bool, optional): If set to True will mark the message as retained. Default is False. See the mosquitto man page for further guidance.
multi (bool, optional): If set to True the payload is expected to be a dictionary. Each item of that dictionary will be sent individually to the broker. The key of the item will be appended to the configured topic. The value of the item is the actual payload. Default is False.


For chaining of pushes the payload is simply returned as is.
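
What multi does to a dictionary payload can be sketched as follows. expand_multi is a hypothetical helper that only computes the (topic, payload) pairs; it does not talk to a broker, and the handling of a trailing slash in the topic is an assumption.

```python
def expand_multi(topic, payload):
    """Turn a dictionary payload into one (topic, payload) pair per item (hypothetical helper)."""
    if not isinstance(payload, dict):
        raise TypeError("multi expects the payload to be a dictionary")
    base = topic.rstrip('/')  # Avoid a double slash when the topic ends with '/'
    return [("{}/{}".format(base, key), value) for key, value in payload.items()]


for item_topic, item_payload in expand_multi('devices/localhost/', {'cpu_count': 4, 'cpu_use': 6.6}):
    print(item_topic, item_payload)
```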


- name: mqtt
  pull:
    plugin: pnp.plugins.pull.simple.Count
  push:
    # Will push the counter to the 'home/counter/state' topic
    plugin: pnp.plugins.push.mqtt.Publish
    args:
      host: localhost
      topic: home/counter/state
      port: 1883
      retain: True

- name: mqtt
  pull:
    plugin: pnp.plugins.pull.simple.Count
    args:
      wait: 1
  push:
    plugin: pnp.plugins.push.mqtt.Publish
    # Let's override the topic via the envelope mechanism
    # Will publish even counts on topic 'test/even' and odd counts on 'test/uneven'
    selector:
      data: "lambda data: data"
      topic: "lambda data: 'test/even' if int(data) % 2 == 0 else 'test/uneven'"
    args:
      host: localhost
      port: 1883

- name: mqtt
  pull:
    # Periodically gets metrics about your system
    plugin: pnp.plugins.pull.monitor.Stats
    args:
      instant_run: True
      interval: 10s
  push:
    # Push them to the mqtt
    plugin: pnp.plugins.push.mqtt.Publish
    args:
      host: localhost
      topic: devices/localhost/
      port: 1883
      retain: True
      # Each item of the payload-dict (cpu_count, cpu_use, ...) will be pushed to the broker individually.
      # The key of the item will be appended to the topic, e.g. `devices/localhost/cpu_count`.
      # The value of the item is the actual payload.
      multi: True

6.21. pnp.plugins.push.notify.Pushbullet

Sends a message to the Pushbullet service. The type of the message will be guessed:

  • push_link for a single http link
  • push_file if the link is directed to a file (mimetype will be guessed)
  • push_note for everything else (converted to str)

Requires extra pushbullet.


api_key (str): The api key to your pushbullet account.
title (str, optional): The title to use for your messages. Defaults to pnp


Will return the payload as it is for easy chaining of dependencies.


### Make sure that you provided PUSHBULLET_API_KEY as an environment variable

- name: pushbullet
  pull:
    plugin: pnp.plugins.pull.fs.FileSystemWatcher
    args:
      path: "/tmp"
      ignore_directories: True
      events:
        - created
      load_file: False
  push:
    plugin: pnp.plugins.push.notify.Pushbullet
    args:
      title: "Watcher"
    selector: "'New file: {}'.format(data.source)"

6.22. pnp.plugins.push.simple.Echo

Simply log the passed payload to the default logging instance.




Will return the payload as it is for easy chaining of dependencies.


- name: count
  pull:
    plugin: pnp.plugins.pull.simple.Count
    args:
      wait: 1
      from_cnt: 1
      to_cnt: 10
  push:
    plugin: pnp.plugins.push.simple.Echo

6.23. pnp.plugins.push.simple.Execute

Executes a command with given arguments in a shell of the operating system.

Will return the exit code of the command and optionally the output from stdout and stderr.


command (str): The command to execute.
args (str or iterable, optional): The arguments to pass to the command. Default is no arguments.
cwd (str, optional): Specifies where to execute the command (working directory). Default is current working directory.
timeout (duration literal, optional): Specifies how long the worker should wait for the command to finish.
capture (bool, optional): If True stdout and stderr output is captured, otherwise not.


Returns a dictionary that contains the return_code and, if capture is set, the output from stdout and stderr. The output is a list of lines.

{
    'return_code': 0,
    'stdout': ["hello", "dude!"],
    'stderr': []
}
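
A rough sketch of how such a result could be produced with the standard library. It is not the plugin's code: the function execute is hypothetical, it starts the command directly instead of through a shell, and timeout is given in plain seconds instead of a duration literal.

```python
import subprocess
import sys


def execute(command, args=(), cwd=None, timeout=None, capture=False):
    """Run a command and report its exit code and optionally its output (hypothetical helper)."""
    completed = subprocess.run(
        [command, *args],
        cwd=cwd,
        timeout=timeout,  # In seconds (assumption)
        stdout=subprocess.PIPE if capture else None,
        stderr=subprocess.PIPE if capture else None,
        universal_newlines=True,  # Decode the output to str
    )
    result = {'return_code': completed.returncode}
    if capture:
        # The output is reported as a list of lines
        result['stdout'] = completed.stdout.splitlines()
        result['stderr'] = completed.stderr.splitlines()
    return result


print(execute(sys.executable, ['-c', 'print("hello"); print("dude!")'], capture=True))
```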


- name: execute
  pull:
    plugin: pnp.plugins.pull.simple.Count
    args:
      wait: 1
      from_cnt: 1
      to_cnt: 10
  push:
    plugin: pnp.plugins.push.simple.Execute
    args:
      command: date  # The command to execute
      args:  # Argument passed to the command
        - "-v"
        - "-1d"
        - "+%Y-%m-%d"
      timeout: 2s
      cwd:  # None -> current directory
      capture: True  # Capture stdout and stderr
    deps:
      - plugin: pnp.plugins.push.simple.Echo


Uploads provided file to the specified dropbox account.


api_key (str): The api key to your dropbox account/app.
target_file_name (str, optional): The file path on the server where to upload the file to. If not specified you have to specify this argument during push time by setting it in the envelope.
create_shared_link (bool, optional): If set to True, the push will create a publicly available link to your uploaded file. Default is True.

Requires extra dropbox.


Returns a dictionary that contains metadata information about your uploaded file. If you uploaded a file named 42.txt, your result will be similar to the one below:

{
    "name": "42.txt",
    "id": "HkdashdasdOOOOOadss",
    "content_hash": "aljdhfjdahfafuhu489",
    "size": 42,
    "path": "/42.txt",
    "shared_link": "http://someserver/tosomestuff/asdasd?dl=1",
    "raw_link": "http://someserver/tosomestuff/asdasd?raw=1"
}

shared_link is the one that is publicly available (if you know the link). Same for raw_link, but this link will return the raw file (without the dropbox overhead). Both are None if create_shared_link is set to False.


### Make sure that you provided DROPBOX_API_KEY as an environment variable

- name: dropbox
    plugin: pnp.plugins.pull.fs.FileSystemWatcher
      path: "/tmp"
      ignore_directories: True
        - created
        - modified
      load_file: False
    - plugin:
        create_shared_link: True  # Create a publicly available link
        data: "lambda data: data.source"  # Absolute path to file
        target_file_name: "lambda data: basename(data.source)"  # File name only

6.25. pnp.plugins.push.timedb.InfluxPush

Pushes the given payload to an influx database using the line protocol. You have to specify host, port, user, password and the database.

The protocol is basically a string that will be augmented at push-time with data from the payload. E.g. {payload.metric},room={payload.location} value={payload.value} assumes that payload contains metric, location and value. See


host (str): The host where the influxdb is running.
port (int): The port where the influxdb service is listening on.
user (str): Username to use for authentication.
password (str): Related password.
database (str): The database to write to.
protocol (str): Line protocol template (augmented with payload-data).

All arguments can be automatically injected via environment variables with INFLUX prefix (e.g. INFLUX_HOST).


For the ability to chain multiple pushes together the payload is simply returned as is.
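
The protocol template can be tried out with plain Python string formatting. That the plugin uses str.format internally is an assumption based on the template syntax; render_line is a hypothetical helper and SimpleNamespace stands in for the dot-accessible payload.

```python
from types import SimpleNamespace


def render_line(protocol, payload):
    """Fill the line protocol template with data from the payload (hypothetical helper)."""
    return protocol.format(payload=payload)


# Stand-in for a message emitted by pnp.plugins.pull.mqtt.Subscribe
message = SimpleNamespace(
    topic='home/living/thermostat/temperature',
    levels=['home', 'living', 'thermostat', 'temperature'],
    payload='21.5',
)
protocol = "{payload.levels[2]},room={payload.levels[1]} {payload.levels[3]}={payload.payload}"
print(render_line(protocol, message))  # thermostat,room=living temperature=21.5
```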


- name: mqtt_pull
  pull:
    plugin: pnp.plugins.pull.mqtt.Subscribe
    args:
      host: mqtt
      topic: home/#
  push:
    plugin: pnp.plugins.push.timedb.InfluxPush
    selector:
      data: "lambda data: data"
    args:
      host: influxdb
      port: 8086
      user: root
      password: secret
      database: home
      protocol: "{payload.levels[2]},room={payload.levels[1]} {payload.levels[3]}={payload.payload}"

6.26. pnp.plugins.udf.hass.State

Fetches the state of an entity from home assistant by a rest-api request.


url (str): The url to your home assistant instance (e.g. http://hass:8123)
token (str): The long-lived access token to get access to home assistant
timeout (Optional[int]): Tell the request to stop waiting for a response after the given number of seconds. Default is 5 seconds.

Call Arguments

entity_id (str): The entity whose state to fetch
attribute (Optional[str]): Optionally you can fetch the state of one of the entity's attributes. Default is None (which means to fetch the state of the entity)


Returns the current state of the entity or one of its attributes. If the entity is not known to home assistant an exception is raised. If an attribute does not exist, None is returned instead to signal its absence.


udfs:
  # Defines the udf. name is the actual alias you can call in selector expressions.
  - name: hass_state
    plugin: pnp.plugins.udf.hass.State
    args:
      url: http://localhost:8123
      token: "{{env::HA_TOKEN}}"
tasks:
  - name: hass_state
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        repeat: "Hello World"  # Repeats 'Hello World'
        wait: 1  # Every second
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # Will only print the data when attribute azimuth of the sun component is above 200
        selector: "'azimuth is greater than 200' if hass_state('sun.sun', attribute='azimuth') > 200.0 else SUPPRESS"
      - plugin: pnp.plugins.push.simple.Echo
        # Will only print the data when the state of the sun component is 'above_horizon'
        selector: "'above_horizon' if hass_state('sun.sun') == 'above_horizon' else SUPPRESS"

6.27. pnp.plugins.udf.simple.Counter

Memorizes a counter value which is incremented every time you call the udf.


init (Optional[int]): The initialization value of the counter. Default is 0.


Returns the current counter.
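
The idea behind the udf can be sketched as a small callable class. This is an illustration, not the plugin's code; whether the value is returned before or after incrementing is an assumption (here: before).

```python
class Counter:
    """Callable that memorizes a counter value (illustrative sketch)."""

    def __init__(self, init=0):
        self._value = init

    def __call__(self):
        current = self._value
        self._value += 1  # Incremented on every call
        return current


counter = Counter(init=1)
print(counter(), counter(), counter())
```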


udfs:
  # Defines the udf. name is the actual alias you can call in selector expressions.
  - name: counter
    plugin: pnp.plugins.udf.simple.Counter
    args:
      init: 1
tasks:
  - name: countme
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        repeat: "Hello World"  # Repeats 'Hello World'
        wait: 1  # Every second
    push:
      - plugin: pnp.plugins.push.simple.Echo
        selector:
          data: "lambda data: data"
          count: "lambda data: counter()"  # Calls the udf

6.28. pnp.plugins.udf.simple.Memory

Returns a previously memorized value when called.


init (any, optional): The initial memory of the plugin. Default is None.

Call Arguments

new_memory (any, optional): After emitting the current memorized value the current memory is overwritten by this value. Will only be overwritten if the parameter is specified.


Returns the memorized value.
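
A minimal sketch of the described behaviour: the call returns the current memory and only overwrites it when new_memory is passed. The class and the sentinel used to detect a missing argument are assumptions, not the plugin's implementation.

```python
_UNSET = object()  # Sentinel to tell "not passed" apart from None


class Memory:
    """Callable that returns a previously memorized value (illustrative sketch)."""

    def __init__(self, init=None):
        self._memory = init

    def __call__(self, new_memory=_UNSET):
        current = self._memory
        if new_memory is not _UNSET:
            # Overwrite the memory after the current value has been read
            self._memory = new_memory
        return current


mem = Memory(init=1)
# Same expression as in the selector of the example: memorize every odd count
emitted = [mem() if data % 2 == 0 else mem(new_memory=data) for data in range(1, 6)]
print(emitted)  # [1, 1, 1, 3, 3]
```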


udfs:
  - name: mem
    plugin: pnp.plugins.udf.simple.Memory
    args:
      init: 1
tasks:
  - name: countme
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        from_cnt: 1
        wait: 1  # Every second
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # Will memorize every odd count
        selector: "mem() if data % 2 == 0 else mem(new_memory=data)"

7. Changelog

We cannot guarantee that interfaces / behaviour stay free of breaking changes. They might be introduced with any commit, whether intended or by accident. Nevertheless, we try to list the breaking changes we are aware of in the changelog. You are encouraged to pin the version explicitly in your dependency tools, e.g.:

pip install pnp==0.10.0


  • Adds UDF (user defined functions)
  • Adds UDF udf.hass.State to request the current state of an entity (or one of its attributes) from home assistant
  • Makes selector expressions in complex structures (dicts / lists) more explicit using lambda expressions with mandatory payload argument. This will probably break configs that use complex expressions containing lists and/or dictionaries
  • Adds pull.hass.State to listen to state changes in home assistant
  • Fixes bug in pull.fitbit.Goal when fetching weekly goals (so far daily goals were fetched too)
  • Adds UDF udf.simple.Memory to memorize values to access them later


  • Adds pull.fitbit.Current, pull.fitbit.Devices, pull.fitbit.Goal plugins to request data from the Fitbit API
  • Adds push.mqtt.Discovery to create MQTT-discovery-enabled devices for Home Assistant
  • Adds unwrapping-feature to pushes


  • Adds additional argument multi (default False) to push.mqtt.MQTTPush to send multiple messages to the broker if the payload is a dictionary (see plugin docs for reference)
  • Adds plugin pull.monitor.Stats to periodically emit stats about the host system
  • Adds plugin push.notify.Pushbullet to send messages via the Pushbullet service
  • Adds plugin to upload files to a Dropbox account/app
  • Adds feature to use complex lists and/or dictionary constructs in selector expressions
  • Adds plugin pull.gpio.Watcher (extra gpio) to watch GPIO pins for state changes. Only works on Raspberry Pi
  • Adds plugin push.simple.Execute to run commands in a shell
  • Adds extra http-server to optionally install flask and gevent when needed
  • Adds utility method to check for installed extras
  • Adds -v | --verbose flag to the pnp runner to switch the logging level to DEBUG, regardless of any other setting


  • Adds auto-mapping magic to the pull.zway.ZwayReceiver.
  • Adds humidity and temperature offset to dht


  • Fixes error catching of run_pending in Polling base class


  • Fixes resolution of logging configuration on startup


  • Introduces the pull.zway.ZwayReceiver and pull.sensor.OpenWeather components
  • Introduces logging configurations. Integrates dictmentor package to augment configuration


  • Introduces engines. You are not required to use one explicitly, and backward compatibility with legacy configs is preserved (the example configs work as they did before the change), so there should not be any breaking change.

