Skip to main content

IoT core connection methods and utilities in Python

Project description

IoTCoreAPI

Library to interact with IoTCoreAPI in Python

Table Of Contents

  1. Installation
  2. Use
  3. Explanation
  4. Reference

How to install

This library requieres Python 3.8 or higher. IoTCore API can be installed with pip. Dependencies will be installed along with the library.

pip install iotcore-api

Use

In this section we will cover basic usage of the methods.

First, import IoTCoreAPI class from module

from iotcoreapi import IoTCoreAPI

To keep it simple, start by initializing IoTCoreAPI class

API_Host = '[base-url]'
API_Port = 56000
token = 'xxxxxxxxxxxxxxxxx'
version = '3.0'
logger = [logging.Logger object. Can be None or ignored]

iot_api = IoTCoreAPI(API_Host, API_Port, token, version, logger)

Ask about base endpoint url and token to your provider. API Port will be (almost) always 56000. Logger support is enabled in this class, so if a logger object is provided, will print out information to the loggerfile.

Output format can be specified for most of catalogue and reading methods.

Basic usages of this library cover three types of methods:

  • Catalogue: methods related to schema information in the IoTCore (tag info, documents, alarms...)
  • Reading operations: read real time or historic data from tags. Obtain alarm status
  • Write operations: insert data into real time or historic. Also edit alarm information
  • Operation: write directly into PLC tags

Once the class is created methods will be accesible from it. Let's start reading catalogue info. We will ask for all the tags available in the token

tags = iot_api.catalogue_tags()

Information will be retrieved in dataframe format. If json is prefered, can be specified in the "output_format" parameter. Let's read again tags in the token, but this time, we will filter the result by tag names and driver and specify json format.

driver = ['Test']
names = 'api_test'
tags_filtered = iot_api.catalogue_tags(drivers=drivers, tags=names, output_format='json')

One of the most basic usages of the library is to retrieve data from historic. For example, to read a day data from a tagview:

import datetime

UID_TAGVIEW = 'xxxxxxxxxxxx'

end_ts = datetime.now()
start_ts = end_ts - datetime.timedelta(days=1)


data = iotcore_api.read_tagview_historic(UID_TAGVIEW, start_ts, end_ts)

It is also possible filter data by tag uid or even use text filters by using corresponding methods:

import datetime

UID_TAGVIEW = 'xxxxxxxxxxxx'
filters_txt = ['Random_Int1', 'Random_Int2']

end_ts = datetime.now()
start_ts = end_ts - datetime.timedelta(days=1)


data = iotcore_api.read_tagview_historic_text_filters(UID_TAGVIEW, start_ts, end_ts, filters_txt)

To write data into the IoT Core use the corresponding writing methods. Tags must exist before trying to insert data.

To create a tag with writing permissions, use this method:

tags_to_create = ['api_test', 'api_test20', 'api_test33', 'api_test']
iotcore_api.write_tags_insert(tags_to_create)

For writing data operations, a dataframe must be passed. Dataframe must have the following columns:

  • timeStamp: time data
  • name: name of the tag
  • value: value (int or float)
import pandas as pd

test_df = pd.DataFrame([{'timeStamp': time.time(), 'name': 'api_test', 'value': 1},
                        {'timeStamp': time.time(), 'name': 'api_test_20', 'value': 1}])
data = iotcore_api.write_tags_historic_insert(test_df)

Some recommendations to use reading methods:

  • Time data can be passed in datetime or unix format
  • Usually uid tagview is required. This can be read by using catalogue methods
  • Tag filtering by uid is faster than text filters. Text filters methods call uid methods before to retrieve tag name data.

See Reference for more information

Explanation

This library was created to simplify the use of the available GET and POST methods available in the IoTCore API. To understand better the intention behind this library, ask about API reference to your provider.

Instead of dealing with complex and repetitive requests, all functions are written inside IoTCoreAPI class, allowing easier use and avoiding code repetition.

For example, to set up a basic request to get all tags available in the token, you should:

import requests

#1. Configure specific endpoint for this request
endpoint = 'iotcoreurl:PORT/api/Tags'
#2. Provide token and version in the headers
headers = {'token': 'xxxxxx', 'version': '3.0'}
#3. Parameter data
parameters = {'IncludeAttributes': True}

# Set up request using requests library
response = requests.get(endpoint, params=parameters, headers=headers)

# Deal with request format
data = response.json()

This is required for each one of the endpoints listed in the API. Instead, you could use this library as follows:

API_Host = '[base-url]'
API_Port = 56000
token = 'xxxxxxxxxxxxxxxxx'
version = 'v3'

iot_api = IoTCoreAPI(API_Host, API_Port, token, version)

tags = iot_api.catalogue_tags(include_attributes=True)

See Reference for more information about covered endpoints.

Reference

Table of Contents

iotcoreapi

iotcoreapi Class definition

IoTCoreAPI Objects

class IoTCoreAPI()

__init__

def __init__(ip: str = "localhost",
             port: int = 56000,
             token: str = "",
             version: typing.Union[str, int] = "3.0",
             logger: logging.Logger = None)

Init method for iotcoreapi. Needs API configuration parameters

Arguments:

  • ip - IoT Core base endpoint
  • port - API Port. Defaults to 56000
  • token - API token
  • version - 1.0, 2.0 or 3.0. Defaults to 3.0
  • logger - Optional. Logger object to output log messages. If not provided, logger messages will be printed to console

catalogue_tags

def catalogue_tags(
        include_attributes: bool = True,
        output_format: str = 'dataframe') -> typing.Union[dict, pd.DataFrame]

Return all tags available for the token

Arguments:

  • include_attributes optional - if version >3.0, bool to return attributes or not
  • output_format - Result given in 'dataframe' or 'json'or dataframe_table. Defaults to 'dataframe'

Returns:

response in json or dataframe

catalogue_tags_filtered

def catalogue_tags_filtered(
        installations: typing.Union[list, str] = None,
        drivers: typing.Union[list, str] = None,
        tags: typing.Union[list, str] = None,
        attributes: typing.Union[list, str] = None,
        output_format: str = 'dataframe') -> typing.Union[dict, pd.DataFrame]

Searching for tags that comply with a certain criteria can be achieved with the filtered route. If fields are empty, all tags are returned.

Arguments:

  • installations - name of the installations
  • drivers - name of drivers
  • tags - name of tags
  • attributes - not implemented yet
  • output_format - Result given in 'dataframe' or 'json' or 'dataframe_table'. Defaults to 'dataframe'

Returns:

response in json or dataframe

catalogue_tags_attributes

def catalogue_tags_attributes(
        output_format: str = 'dataframe') -> typing.Union[dict, pd.DataFrame]

Obtaining the list of possible attributes within the system and, when limited to a set of values, the list of possible values

Arguments:

  • output_format - Result given in 'dataframe' or 'json' or dataframe_table. Defaults to 'dataframe'

Returns:

response in json or dataframe

catalogue_tags_writable

def catalogue_tags_writable(
        output_format: str = 'dataframe') -> typing.Union[dict, pd.DataFrame]

Return tags available for writing. If version is under 3.0, returned array does not have attribute information

Arguments:

  • output_format - Result given in 'dataframe' or 'json'. Defaults to 'dataframe'

Returns:

response in json

catalogue_documents

def catalogue_documents(
        output_format: str = 'dataframe') -> typing.Union[dict, pd.DataFrame]

Returns all tagviews shared in the token

Arguments:

  • output_format - Result given in 'dataframe' or 'json' or dataframe_table. Defaults to 'dataframe'

Returns:

response in json

catalogue_tagview_detail

def catalogue_tagview_detail(
        uid: str,
        output_format: str = 'dataframe') -> typing.Union[dict, pd.DataFrame]

Return all variables from a given tagview

Arguments:

  • uid - uid of the tagview
  • output_format - Result given in 'dataframe' or 'json' or dataframe_table. Defaults to 'dataframe'

Returns:

response in json

catalogue_alarms

def catalogue_alarms(
    group_uid: str = None,
    output_format: str = 'dataframe'
) -> typing.Union[typing.List[dict], pd.DataFrame]

Returns information of the alarms in the token

Arguments:

group_uid : Optional. Uid of the group to list. If the group uid is indicated, the list only contains the alarms that belong directly to the group (no digging down in the hierarchy) output_format : Result given in 'dataframe' or 'json' or dataframe_table. Defaults to 'dataframe'

Returns:

response in json or dataframe

catalogue_alarm_groups

def catalogue_alarm_groups(
    output_format: str = 'dataframe'
) -> typing.Union[typing.List[dict], pd.DataFrame]

Returns information of the alarm groups in the token

Arguments:

output_format : Result given in 'dataframe' or 'json' or dataframe_table. Defaults to 'dataframe'

Returns:

response in json

read_tags_realtime

def read_tags_realtime(
        tags_uids: typing.List[str],
        output_format: str = 'dataframe',
        time_format='datetime',
        nan_method: str = None
) -> typing.Union[pd.DataFrame, typing.List[dict]]

Reads real time value of the tags provided in the array tags_uids

Arguments:

tags_uids : list with uids of the tags output_format : Result given in 'dataframe' or 'json' or dataframe_table. Defaults to 'dataframe' time_format : 'datetime' or 'unix' if output_format is dataframe. Defaults to datetime

  • nan_method - method used to drop NaNs. None or 'interpolate', 'bfill', 'ffill', 'mean', 'zerofill'. Only valid for 'dataframe' output_format

Returns:

response in json or dataframe

read_tagview_realtime

def read_tagview_realtime(
        uid: str,
        uids_tags: typing.List[str] = None,
        output_format: str = 'dataframe',
        time_format='datetime',
        nan_method: str = None
) -> typing.Union[pd.DataFrame, typing.List[dict]]

Returns real time value for the uids variables provided in a given tagview

Arguments:

uid : uid of the tagview uids_tags : list of uids output_format : Result given in 'dataframe' or 'json'or dataframe_table. Defaults to 'dataframe' time_format : 'datetime' or 'unix' if output_format is dataframe. Defaults to datetime

  • nan_method - method used to drop NaNs. None or 'interpolate', 'bfill', 'ffill', 'mean', 'zerofill'. Only valid for 'dataframe' output_format

Returns:

response in json or dataframe

read_tags_historic

def read_tags_historic(
        uids: typing.List[str],
        start_ts: typing.Union[int, float],
        end_ts: typing.Union[int, float],
        data_source: typing.Union[str, int] = 'RAW',
        resolution: typing.Union[str, int] = 'RES_1_HOUR',
        agg_operation: typing.Union[str, int] = "LAST_VALUE",
        output_format: str = 'dataframe',
        time_format='datetime',
        nan_method: str = None
) -> typing.Union[pd.DataFrame, typing.List[dict]]

Obtain historic data of the specified tags

Arguments:

  • uids - list of unique identifiers of the tags whose values must be obtained.
  • start_ts - start time in unix time or datetime
  • end_ts - end time in unix time or datetime
  • data_source - RAW, STATS_PER_HOUR, STATS_PER_DAY o STATS_PER_MONTH. This parameter indicates the historian section to get the information from, being "RAW" the finest data storage available.
  • resolution - RES_10_SEC, RES_30_SEC, RES_1_MIN, RES_5_MIN, RES_15_MIN, RES_1_HOUR, RES_1_DAY, RES_1_MONTH o RES_1_YEAR, this parameter only applies if the datasource is RAW.
  • agg_operation - MIN, MAX, AVG, LAST_VALUE, SUM. The operation to be applied to obtain the resolution required. Not mandatory, can be null or empty, then applies LAST_VALUE by default. output_format : Result given in 'dataframe' or 'json' or dataframe_table. Defaults to 'dataframe' time_format : 'datetime' or 'unix' if output_format is dataframe. Defaults to datetime
  • nan_method - method used to drop NaNs. None or 'interpolate', 'bfill', 'ffill', 'mean', 'zerofill'. Only valid for 'dataframe' output_format

Returns:

response in json or dataframe

read_tags_rawhistoric

def read_tags_rawhistoric(
        uids,
        start_ts,
        end_ts,
        output_format: str = 'dataframe',
        time_format='datetime',
        nan_method: str = None
) -> typing.Union[pd.DataFrame, typing.List[dict]]

To obtain raw data with no aggregation or normalization applied

Arguments:

  • uids - list of unique identifiers of the tags whose values must be obtained.
  • start_ts - start time in unix time or datetime
  • end_ts - end time in unix time or datetime output_format : Result given in 'dataframe' or 'json'or dataframe_table. Defaults to 'dataframe' time_format : 'datetime' or 'unix' if output_format is dataframe. Defaults to datetime
  • nan_method - method used to drop NaNs. None or 'interpolate', 'bfill', 'ffill', 'mean', 'zerofill'. Only valid for 'dataframe' output_format

Returns:

response in json or dataframe

read_tags_transient

def read_tags_transient(
        uids: typing.List[str],
        start_ts: typing.Union[int, float],
        end_ts: typing.Union[int, float],
        data_source: typing.Union[str, int] = None,
        resolution: typing.Union[str, int] = 'RES_1_SEC',
        output_format: str = 'dataframe',
        time_format='datetime',
        nan_method: str = None
) -> typing.Union[pd.DataFrame, typing.List[dict]]

This method works like "Tags in historical mode", but forces the dataSource to be the transient space. Be aware that the maximum period (endTs - startTs) than can be asked for in transient mode is 15 min. Also please note that resolutions should be according to the span of time (max 15 mins) so there are new options not available for historic.

Arguments:

  • uids - list of unique identifiers of the tags whose values must be obtained. start_ts:
  • start_ts - time in unix time or datetime
  • end_ts - end time in unix time or datetime. Timespan must be smaller than 15 mins
  • data_source - Can be set to null or empty. Not needed
  • resolution - RES_1_SEC, RES_200_MIL, RES_500_MIL, any other option makes no sense with the transient data pool. output_format : Result given in 'dataframe' or 'json'or dataframe_table. Defaults to 'dataframe' time_format : 'datetime' or 'unix' if output_format is dataframe. Defaults to datetime
  • nan_method - method used to drop NaNs. None or 'interpolate', 'bfill', 'ffill', 'mean', 'zerofill'. Only valid for 'dataframe' output_format

Returns:

response in json or dataframe

read_tagview_historic

def read_tagview_historic(
    uid: str,
    start_ts: typing.Union[datetime.datetime, float, int],
    end_ts: typing.Union[datetime.datetime, float, int],
    tags_uids: typing.List[str] = None,
    data_source='RAW',
    resolution='RES_1_HOUR',
    output_format: str = 'dataframe',
    time_format='datetime',
    nan_method: str = None,
    agg_operation: typing.Union[str, int] = "LAST_VALUE"
) -> typing.Union[pd.DataFrame, typing.List[dict]]

Read dataview historic data. It is recommended to use read_dataview_history_text_filters instead.

Arguments:

  • uid - uid of the tagview
  • start_ts - start time in unix or datetime
  • end_ts - end time in unix or datetime
  • tags_uids optional - list of unique identifier of the tags whose values must be obtained. If None, will take all tags in tagview
  • data_source - RAW, STATS_PER_HOUR, STATS_PER_DAY o STATS_PER_MONTH. This parameter indicates the historian section to get the information from, being "RAW" the finest data storage available.
  • resolution - RES_10_SEC, RES_30_SEC, RES_1_MIN, RES_5_MIN, RES_15_MIN, RES_1_HOUR, RES_1_DAY, RES_1_MONTH o RES_1_YEAR, this parameter only applies if the datasource is RAW. output_format : Result given in 'dataframe' or 'json'or dataframe_table. Defaults to 'dataframe' time_format : 'datetime' or 'unix' if output_format is dataframe. Defaults to datetime
  • nan_method - method used to drop NaNs. None or 'interpolate', 'bfill', 'ffill', 'mean', 'zerofill'. Only valid for 'dataframe' output_format
  • agg_operation - MIN, MAX, AVG, LAST_VALUE, SUM. The operation to be applied to obtain the resolution required. Not mandatory, can be null or empty, then applies LAST_VALUE by default.

Returns:

A list of objects or dataframe providing information for the requested tags. Every element in the array corresponds to one of the requested tags associated with one timestamp between the startTs and the endTs.

read_tagview_historic_text_filters

def read_tagview_historic_text_filters(uid_tagview: str, start_ts: typing.Union[datetime.datetime, float, int], end_ts: typing.Union[datetime.datetime, float, int], filter_txt: typing.Union[str, typing.List[str]] = None, data_source: str = 'RAW', resolution: str = 'RES_1_HOUR', output_format: str = 'dataframe', time_format: str = 'datetime', nan_method: str = None, agg_operation: typing.Union[str, int] = "LAST_VALUE") -> typing.Union[
        pd.DataFrame, \
                typing.List[dict]]

Read dataview historic data but use text filters instead of uids. Also returns data in dataframe format

Arguments:

  • uid_tagview - uid of the tagview
  • start_ts - start time in unix or datetime
  • end_ts - end time in unix or datetime
  • filter_txt - text filters to search tags in tagviews. If None, will take all tags in tagview
  • data_source - RAW, STATS_PER_HOUR, STATS_PER_DAY o STATS_PER_MONTH. This parameter indicates the historian section to get the information from, being "RAW" the finest data storage available.
  • resolution - RES_10_SEC, RES_30_SEC, RES_1_MIN, RES_5_MIN, RES_15_MIN, RES_1_HOUR, RES_1_DAY, RES_1_MONTH o RES_1_YEAR, this parameter only applies if the datasource is RAW. output_format : Result given in 'dataframe' or 'json'or dataframe_table. Defaults to 'dataframe' time_format : Optional. 'datetime' or 'unix'. Defaults to datetime
  • nan_method - method used to drop NaNs. None or 'interpolate', 'bfill', 'ffill', 'mean', 'zerofill'. Only valid for 'dataframe' output_format
  • agg_operation - MIN, MAX, AVG, LAST_VALUE, SUM. The operation to be applied to obtain the resolution required. Not mandatory, can be null or empty, then applies LAST_VALUE by default.

Returns:

filtered_hist (dataframe): columns:

  • name - name of tag
  • value - value of tag
  • timeStamp - timeStamp in datatetime or unix time

read_tagview_realtime_text_filters

def read_tagview_realtime_text_filters(
        uid_tagview: str,
        filter_txt: typing.Union[str, typing.List[str]] = None,
        output_format: str = 'dataframe',
        time_format: str = 'datetime',
        nan_method: str = None
) -> typing.Union[pd.DataFrame, typing.List[dict]]

Read dataview realtime data but use text filters instead of uids. Also returns data in dataframe format

Arguments:

  • uid_tagview - uid of the tagview
  • filter_txt - text filters to search tags in tagviews. If None, will take all tags in tagview output_format : Result given in 'dataframe' or 'json'or dataframe_table. Defaults to 'dataframe' time_format : Optional. 'datetime' or 'unix'. Defaults to datetime
  • nan_method - method used to drop NaNs. None or 'interpolate', 'bfill', 'ffill', 'mean', 'zerofill'. Only valid for 'dataframe' output_format

Returns:

filtered_hist (dataframe): columns:

  • name - name of tag
  • value - value of tag
  • timeStamp - timeStamp in datatetime or unix time

read_tags_historic_text_filters

def read_tags_historic_text_filters(
        uids: typing.List[str],
        start_ts: typing.Union[datetime.datetime, int, float],
        end_ts: typing.Union[datetime.datetime, int, float],
        filter_txt: typing.Union[str, typing.List[str]] = None,
        data_source: typing.Union[str, int] = 'RAW',
        resolution: typing.Union[str, int] = 'RES_1_HOUR',
        agg_operation: typing.Union[str, int] = "LAST_VALUE",
        output_format: str = 'dataframe',
        time_format: str = 'datetime',
        nan_method: str = None
) -> typing.Union[pd.DataFrame, typing.List[dict]]

Obtain historic data of the specified tags by name

Arguments:

  • uids - list of unique identifiers of the tags whose values must be obtained.
  • start_ts - start time in unix or datetime
  • end_ts - end time in unix or datetime
  • filter_txt - text filters to search tags in tagviews. If None, will take all tags in tagview
  • data_source - RAW, STATS_PER_HOUR, STATS_PER_DAY o STATS_PER_MONTH. This parameter indicates the historian section to get the information from, being "RAW" the finest data storage available.
  • resolution - RES_10_SEC, RES_30_SEC, RES_1_MIN, RES_5_MIN, RES_15_MIN, RES_1_HOUR, RES_1_DAY, RES_1_MONTH o RES_1_YEAR, this parameter only applies if the datasource is RAW.
  • agg_operation - MIN, MAX, AVG, LAST_VALUE, SUM. The operation to be applied to obtain the resolution required. Not mandatory, can be null or empty, then applies LAST_VALUE by default. output_format : Result given in 'dataframe' or 'json'or dataframe_table. Defaults to 'dataframe' time_format : Optional. 'datetime' or 'unix'. Defaults to datetime
  • nan_method - method used to drop NaNs. None or 'interpolate', 'bfill', 'ffill', 'mean', 'zerofill'. Only valid for 'dataframe' output_format

Returns:

response in json

read_tags_realtime_text_filters

def read_tags_realtime_text_filters(
        filter_txt: typing.Union[str, typing.List[str]] = None,
        output_format: str = 'dataframe',
        time_format: str = 'datetime',
        nan_method: str = None
) -> typing.Union[pd.DataFrame, typing.List[dict]]

Read tags realtime data but use text filters instead of uids. Also returns data in dataframe format

Arguments:

  • filter_txt - text filters to search tags in tagviews. If None, will take all tags in tagview output_format : Result given in 'dataframe' or 'json'or dataframe_table. Defaults to 'dataframe' time_format : Optional. 'datetime' or 'unix'. Defaults to datetime
  • nan_method - method used to drop NaNs. None or 'interpolate', 'bfill', 'ffill', 'mean', 'zerofill'. Only valid for 'dataframe' output_format

Returns:

dataframe or json: columns:

  • name - name of tag
  • value - value of tag
  • timeStamp - timeStamp in datatetime or unix time

read_alarm_status

def read_alarm_status(alarm_guid: str) -> dict

Reads alarm status for a given alarm

Arguments:

alarm_guid : guid of the alarm

Returns:

Dictionary with following data: {

  • "name" - "BasicAlarm1",
  • "uid" - "b926bfb0-3f2f-49df-a2eb-138452296903",
  • "status" - "ARE",
  • "alarmAREDate" - "2022-07-12T12:55:28.9274145+02:00",
  • "alarmLastUpdate" - "2022-07-12T09:58:39.3102729+02:00",
  • "alarmCurrentValue" - true,
  • "resultTimestampActivation" - "2022-07-12T09:58:42.3931339+02:00",
  • "resultTimestampDeactivation" - "2022-07-12T09:55:34.6931883+02:00",
  • "lastNotificationTS" - "1900-01-01T00:00:00",
  • "signalValue" - 95.84623491198114,
  • "dataComparisonType" - ">",
  • "dataComparisonValue" - 0,
  • "signalValueOnLastHisteresis" - 80.27092576039533,
  • "lastEvent" - "New Event: Alarm supervised by the API" }

write_tags_insert

def write_tags_insert(
        tags: typing.Union[str, typing.List[str]]) -> typing.List[dict]

Check if provided tag names exist, then create them if not

Arguments:

  • tags - tags name or names to be created

Returns:

response object in json format: [ { "Uid" : "unique tag identifier", "Name" : "name of the tag", "Installation" : "name of the installation", "Driver" : "name of the driver",

  • "Attributes" - [ { "AttributeName":"name of the attribute", "Value":"value of the attribute for this tag" }, ... ] }, ... ]

write_tag_insert_or_update

def write_tag_insert_or_update(tagname, **attributes) -> typing.List[dict]

This method updates a tag with the complete model that may include attributes or modifies the existing tags changing their attributes to the ones indicated in the query.

Arguments:

  • tagname - name of the new tag
  • **attributes - dictionary of attributes and their values

Returns:

response in json format

Examples:

Call the function with a tag name and any number of attributes response = write_tag_insert_or_update(tagname="mytag", attribute1="value1", attribute2="value2", attribute3="value3")

write_tags_insert_or_update_by_json

def write_tags_insert_or_update_by_json(
        tags_and_attributes: typing.List[dict])

This method creates the tags with the complete model that may include attributes or modifies the existing tags changing their attributes to the ones indicated in the query.

Arguments:

  • tags_and_attributes - json list containing info for each tag: [ {
  • "Name" - "name of the new tag", "Attributes": [ {
  • "AttributeName" - "NameOfAttribute1",
  • "Value" - "ValueOfAttribute1" } ] } ], ...

Returns:

response in json format

write_tags_historic_insert

@_no_row_limit_decorator
def write_tags_historic_insert(df: pd.DataFrame,
                               skip_errors: bool = True) -> typing.List[dict]

Update historical data for tags. Tags need to be created with write_tags_insert first.

Arguments:

  • df - dataframe columns:
  • name - name of the tag value : value of the tag
  • timeStamp - timeStamp in unix
  • skip_errors - True: If true, not created tags will be dropped from dataframe

Returns:

response in json format

write_tags_realtime_insert

@_no_row_limit_decorator
def write_tags_realtime_insert(df: pd.DataFrame, skip_errors: bool = True)

Update realtime data for tags. Tags need to be created with write_tags_insert first.

Arguments:

  • df - dataframe columns:
  • name - name of the tag value : value of the tag
  • timeStamp optional - timeStamp in unix. If not provided, will take current time
  • skip_errors - True: If true, not created tags will be dropped from dataframe

Returns:

response text (None if OK)

write_tag_realtime_insert

def write_tag_realtime_insert(name: str,
                              value: typing.Union[float, int],
                              timeStamp=None)

Update realtime data for a single tag. Tag needs to be created with write_tags_insert first.

Arguments:

  • name - tag name
  • value - value of the tag
  • timeStamp optional - time in unix time. If None, will take current time

Returns:

response text (None if OK)

write_tags_transient_insert

@_no_row_limit_decorator
def write_tags_transient_insert(df: pd.DataFrame,
                                skip_errors: bool = True) -> typing.List[dict]

Update transient data for tags. Tags need to be created with write_tags_insert first.

Arguments:

  • df - dataframe columns:
  • name - name of the tag value : value of the tag
  • timeStamp - timeStamp in unix skip_errors = True: If true, not created tags will be dropped from dataframe

Returns:

response in json format

write_alarm_acknowledge

def write_alarm_acknowledge(guid: str, status: str) -> str

Used to change the status of an alarm from ANR or ENR to ARE o EXR.

Arguments:

  • guid - guid of the alarm
  • status - 'ARE' or 'EXR', 'ANR' or 'ENR'

Returns:

response text (None if OK)

write_alarm_event

def write_alarm_event(guid: str, msg: str) -> str

Used to insert an event with a message in the history of the alarm. The alarma must be active and enabled.

Arguments:

  • guid - guid of the alarm
  • msg - text of the message

operate_tags

def operate_tags(df: pd.DataFrame)

If the token has access to operate against a Conector associated with a PLC, this method can be used to write values to the actual Plc's tags.

Arguments:

  • df - dataframe columns:
  • uid - tag uid
  • value - value to write

operate_tag_single

def operate_tag_single(tag_uid: str, value: typing.Union[int, float])

If the token has access to operate against a Conector associated with a PLC, this method can be used to write values to the actual Plc's tags.

Arguments:

  • tag_uid - nombre de la variable a escribir en el PLC
  • value - valor de la variable a escribir

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

iotcore_api-1.4.1-py3-none-any.whl (18.0 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page