Historian Query

Query regularized time series from raw historian data on Spark

This Python library queries regularized time series on Spark from raw historian data with irregular time intervals (caused by compression, deadbands, outages, etc.). It is particularly suited to compressed manufacturing time series coming from a historian database, and it imitates the resampling functionality offered by some historians.

Installation

historian-query can be installed from PyPI using pip:

pip install historian-query

Example usage

An input table / dataframe with columns ["tag_name", "ts", "value_double", "quality"] is required. This is then loaded and resampled as follows:

from historian_query import HistorianQuery

table_name = "ot_database.historian_timeseries"
tag_dict = {
    "PR03.229HIC0384_SPEED_PV": "belt_speed",
    "PR03.730KIC4985_FLOW_PV": "water_flow",
    "PR03.849ZMC3525_PRESS_PV": "pressure",
}
HQ = HistorianQuery(
    table_name,
    tag_list=list(tag_dict.keys()),
    sample_freq="10 seconds",  # regular grid of the resampled output
    ff_timeout="15 minutes",   # forward fill at most this long, to avoid stale values
    keep_quality=3,            # quality flag filter (keep good-quality records)
)
df = HQ.resample("2023-08-01 00:00:00", "2023-08-02 00:00:00")

Note that instead of table_name and tag_list you can alternatively pass a Spark dataframe df, which gives more flexibility for your data source (see the reference).
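For illustration, a dataframe-based call might look like the following minimal sketch. The toy rows are hypothetical, and df is the constructor keyword named above; check the reference for the exact signature.

from datetime import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical raw records with the required columns
df_raw = spark.createDataFrame(
    [
        ("PR03.229HIC0384_SPEED_PV", datetime(2023, 7, 31, 23, 59, 53), 3564.920410, 3),
        ("PR03.229HIC0384_SPEED_PV", datetime(2023, 8, 1, 0, 0, 3), 3565.104004, 3),
    ],
    ["tag_name", "ts", "value_double", "quality"],
)
HQ = HistorianQuery(
    df=df_raw,
    sample_freq="10 seconds",
    ff_timeout="15 minutes",
    keep_quality=3,
)

Either way, the output of resample() is a resampled Spark dataframe in long format: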

tag_name                  ts                       value_double  quality  orig_ts
PR03.229HIC0384_SPEED_PV  2023-08-01 00:00:00.000  3564.920410   3        2023-07-31 23:59:53.000
PR03.730KIC4985_FLOW_PV   2023-08-01 00:00:00.000  53.218196     3        2023-07-31 23:59:58.000
PR03.849ZMC3525_PRESS_PV  2023-08-01 00:00:00.000  7.485432      3        2023-07-31 23:59:58.000
PR03.229HIC0384_SPEED_PV  2023-08-01 00:00:10.000  3565.104004   3        2023-08-01 00:00:03.000
PR03.730KIC4985_FLOW_PV   2023-08-01 00:00:10.000  53.218196     3        2023-07-31 23:59:58.000
PR03.849ZMC3525_PRESS_PV  2023-08-01 00:00:10.000  8.028755      3        2023-08-01 00:00:08.000
PR03.229HIC0384_SPEED_PV  2023-08-01 00:00:20.000  3565.104004   3        2023-08-01 00:00:03.000
PR03.730KIC4985_FLOW_PV   2023-08-01 00:00:20.000  53.218196     3        2023-07-31 23:59:58.000
PR03.849ZMC3525_PRESS_PV  2023-08-01 00:00:20.000  7.371169      3        2023-08-01 00:00:18.000
...

Each tag now has an observation every 10 seconds. Note that the original timestamps orig_ts are slightly earlier, and some values have been forward filled across several intervals (where no newer observation was available).
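Because both ts and orig_ts are returned, you can check how stale each forward-filled value is with plain Spark functions (a quick sanity check, not part of the library API):

from pyspark.sql import functions as F

# seconds between each grid point and the raw observation behind it
df_check = df.withColumn(
    "staleness_seconds",
    F.col("ts").cast("long") - F.col("orig_ts").cast("long"),
)
df_check.groupBy("tag_name").agg(F.max("staleness_seconds").alias("max_staleness")).show()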

We can now use standard Spark tools for aggregation and feature engineering, e.g. averaging over 5-minute windows:

from itertools import chain
from pyspark.sql.functions import window, avg, create_map, lit, col

# aggregate over 5-minute windows, stamped with the window end time
df = df.groupBy("tag_name", window("ts", "5 minutes").end.alias("ts")).agg(
    avg("value_double").alias("avg_val")
)
# map tag names to readable descriptions
tag_map = create_map([lit(x) for x in chain(*tag_dict.items())])
df = df.withColumn("tag_descr", tag_map[col("tag_name")])
# pivot to get tags as columns (returns a pandas dataframe)
pdf = df.toPandas().pivot(index="ts", columns="tag_descr", values="avg_val")

The output is now in a suitable format for using in reports or ML models:

ts                   belt_speed   pressure  water_flow
2023-08-01 00:05:00  3564.950122  7.774326  53.010570
2023-08-01 00:10:00  3565.053158  7.812656  53.042973
2023-08-01 00:15:00  3564.626245  7.869692  52.981631
2023-08-01 00:20:00  3564.934269  7.828228  52.985406
...

Note that we use the end-point timestamp for each aggregation interval, so that each record is only based on data that is available at the specified timestamp.

In summary, we recommend a two-step approach:

  1. resample to a fine time granularity,
  2. aggregate to the desired time windows.

Step 1 ensures that the potentially irregular observations in raw historian data do not bias the aggregation (raw records cluster in volatile periods, which would otherwise get more weight) or lead to skipped/null intervals (due to gaps).
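As a hypothetical illustration of the bias: suppose a tag sits at 10.0 for the first 50 seconds of a minute and then jumps to 20.0, and the historian emits one record at the start plus ten records during the volatile final seconds:

# naive mean over the 11 raw records overweights the volatile period
raw_values = [10.0] + [20.0] * 10
print(sum(raw_values) / len(raw_values))    # ~19.1

# mean over a regular 10-second grid reflects time spent in each state
grid_values = [10.0] * 5 + [20.0]           # grid points at 0, 10, ..., 50 s
print(sum(grid_values) / len(grid_values))  # ~11.7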

How this works

The HistorianQuery class is a wrapper around the TSDF class in the Databricks Tempo library. It adds functionality that is helpful for querying data from a historian in a manufacturing context:

  • Returns records from start to end time (instead of just between the first and last record of each tag).
  • Interval timestamps are rounded up instead of down, reporting the last known value at each time point.
  • The original timestamp of each observation is also reported.
  • Timeout functionality - forward fill only up to a specified time interval, to avoid stale values.
  • Filter by quality flag - historians often capture an indication of how reliable each observation is, and this reminds users to keep only good-quality observations.
  • Keep or ignore nulls - a null value recorded by a historian can indicate that the last known value should no longer be forward filled (see the sketch below).
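
If timed-out or null-terminated grid points come back as nulls, they can be dropped explicitly downstream. This is a sketch; whether such points are returned as nulls or omitted may depend on configuration, so consult the reference.

from pyspark.sql import functions as F

# keep only grid points that still carry a (non-stale) value
df_valid = df.filter(F.col("value_double").isNotNull())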

While Tempo offers other options, here the resampling configuration has been fixed to the following:

  • Interpolation: ffill - forward fill.
  • Aggregation: ceil - returns the latest value by timestamp.

Forward filling (along with rounding interval timestamps up) ensures that observations are based only on past data. This is important e.g. for machine learning use cases, where training samples must contain only information that would be available at inference time in production.

Taking the last known value instead of the mean ensures that the value makes sense for any tag, including 0/1 indicators. For custom aggregation, follow the approach described in the Example usage section.
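For instance, when rolling a 0/1 indicator up to 5-minute windows in step 2, the last value (or the max, for a "was it ever on" flag) is usually more meaningful than the mean. A sketch using the same window pattern as above (max_by requires Spark >= 3.3):

from pyspark.sql import functions as F

df_ind = df.groupBy("tag_name", F.window("ts", "5 minutes").end.alias("ts")).agg(
    F.max_by("value_double", "ts").alias("last_val"),  # value at the latest ts in the window
    F.max("value_double").alias("ever_on"),            # 1 if the indicator was ever on
)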
