Skip to main content

Python 3 API wrapper for FireServiceRota/BrandweerRooster

Project description

Python: FireServiceRota / BrandweerRooster

Python 3 API wrapper for FireServiceRota and BrandweerRooster

About

This package allows you to get notified about emergency incidents from FireServiceRota.co.uk and BrandweerRooster.nl. Those are services used by firefighters.

See https://fireservicerota.co.uk and https://brandweerrooster.nl for more details.

NOTE: You need a subscription and login account to be able to use it.

Installation

pip3 install pyfireservicerota

Usage

from pyfireservicerota import FireServiceRotaOAuth, FireServiceRotaOauthError, FireServiceRotaIncidentsListener
import logging
import sys
import json
import time
import threading

_LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

oauth = FireServiceRotaOAuth(
        "https://www.brandweerrooster.nl/oauth/token",
        "",
        [USERNAME],
        [PASSWORD],
    )

try:
    token_info = oauth.get_access_token()
except FireServiceRotaOauthError:
    token_info = None

if not token_info:
    _LOGGER.error("Failed to get access token")

wsurl = f"wss://www.brandweerrooster.nl/cable?access_token={token_info['access_token']}"


class FireService():

    def __init__(self):

        self._data = None
        self.listener = None
        self.thread = threading.Thread(target=self.incidents_listener)
        self.thread.daemon = True
        self.thread.start()

    def on_incident(self, data):
        _LOGGER.debug("INCIDENT: %s", data)
        self._data = data

    @property
    def data(self):
        """Return the current data stored in the provider."""
        return self._data

    def incidents_listener(self):
        """Spawn a new Listener and links it to self.on_incident."""

        _LOGGER.debug("Starting incidents listener")
        self.listener = FireServiceRotaIncidentsListener(url=wsurl, on_incident=self.on_incident)

        while True:
            try:
                self.listener.run_forever()
            except:
                pass


ws = FireService()

while True:
    time.sleep(1)

Don't store user credentuals, just the token_info and use below code to refresh it, it will only fetch new token when it has expired. If you want to force a token refresh add True as param to oauth.refresh_acess_token()

oauth = FireServiceRotaOAuth(
        "https://www.brandweerrooster.nl/oauth/token",
        "",
        "",
        "",
    )

try:
    token_info = oauth.refresh_access_token(current_token_info)
except FireServiceRotaOauthError:
    token_info = None

if not token_info:
    _LOGGER.error("Failed to get access token")

if token_info != current_token_info:
    _LOGGER.error("Got new token, store it")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyfireservicerota-0.0.11.tar.gz (4.2 kB view details)

Uploaded Source

File details

Details for the file pyfireservicerota-0.0.11.tar.gz.

File metadata

  • Download URL: pyfireservicerota-0.0.11.tar.gz
  • Upload date:
  • Size: 4.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.7.3

File hashes

Hashes for pyfireservicerota-0.0.11.tar.gz
Algorithm Hash digest
SHA256 7ddaaa4d2e4d23ea71ae4df7dbe83aa9fbc68c9e963a252b94da04bc4bbe6d63
MD5 21458289c0ba0d2a2b1177278250d395
BLAKE2b-256 e90fab6f97c1ad877aa1c8ee21927c4b3603300b182a0426702f48579dfae4ef

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page