# Historian Query

Query regularized time series from raw historian data on Spark.

This Python library helps query regularized time series based on raw historian data with irregular time intervals (due to compression, deadbands, outages, etc.) on Spark. It is particularly suited to compressed manufacturing time series coming from a historian database, and it imitates the resampling functionality offered by some historians.
## Installation

`historian-query` can be installed with pip:

```
pip install historian-query
```
## Example usage

An input table / dataframe with columns `["tag_name", "ts", "value_double", "quality"]` is required. This is then loaded and resampled as follows:
```python
from historian_query import HistorianQuery

table_name = "ot_database.historian_timeseries"
tag_dict = {
    "PR03.229HIC0384_SPEED_PV": "belt_speed",
    "PR03.730KIC4985_FLOW_PV": "water_flow",
    "PR03.849ZMC3525_PRESS_PV": "pressure",
}
HQ = HistorianQuery(
    table_name,
    tag_list=list(tag_dict.keys()),
    sample_freq="10 seconds",
    ff_timeout="15 minutes",
    keep_quality=3,
)
df = HQ.resample("2023-08-01 00:00:00", "2023-08-02 00:00:00")
```
Note that instead of `table_name` and `tag_list` you can alternatively pass a Spark dataframe `df`, which gives more flexibility for your data source (see the reference).
The output is a resampled Spark dataframe in long format:

| tag_name | ts | value_double | quality | orig_ts |
|---|---|---|---|---|
| PR03.229HIC0384_SPEED_PV | 2023-08-01 00:00:00.000 | 3564.920410 | 3 | 2023-07-31 23:59:53.000 |
| PR03.730KIC4985_FLOW_PV | 2023-08-01 00:00:00.000 | 53.218196 | 3 | 2023-07-31 23:59:58.000 |
| PR03.849ZMC3525_PRESS_PV | 2023-08-01 00:00:00.000 | 7.485432 | 3 | 2023-07-31 23:59:58.000 |
| PR03.229HIC0384_SPEED_PV | 2023-08-01 00:00:10.000 | 3565.104004 | 3 | 2023-08-01 00:00:03.000 |
| PR03.730KIC4985_FLOW_PV | 2023-08-01 00:00:10.000 | 53.218196 | 3 | 2023-07-31 23:59:58.000 |
| PR03.849ZMC3525_PRESS_PV | 2023-08-01 00:00:10.000 | 8.028755 | 3 | 2023-08-01 00:00:08.000 |
| PR03.229HIC0384_SPEED_PV | 2023-08-01 00:00:20.000 | 3565.104004 | 3 | 2023-08-01 00:00:03.000 |
| PR03.730KIC4985_FLOW_PV | 2023-08-01 00:00:20.000 | 53.218196 | 3 | 2023-07-31 23:59:58.000 |
| PR03.849ZMC3525_PRESS_PV | 2023-08-01 00:00:20.000 | 7.371169 | 3 | 2023-08-01 00:00:18.000 |
| ... | | | | |
Each tag now has observations every 10 seconds. Note that the original time stamps `orig_ts` are slightly earlier, and some have been filled forward several times (no newer observation is available).
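The forward-fill-with-timeout idea can be sketched in plain Python. This is an illustration of the concept only, not the library's Spark implementation; the function name `ffill_with_timeout` and the data layout are made up for this example:

```python
from datetime import datetime, timedelta

def ffill_with_timeout(raw, grid, timeout):
    """For each grid timestamp, report the latest raw observation at or
    before it, unless that observation is older than `timeout` (stale)."""
    out = []
    for ts in grid:
        past = [(t, v) for t, v in raw if t <= ts]
        if not past or ts - max(past)[0] > timeout:
            out.append((ts, None, None))  # no usable value -> null
        else:
            orig_ts, val = max(past)  # latest observation so far
            out.append((ts, val, orig_ts))
    return out

# one raw observation at 00:00:03, resampled onto a 10-second grid
raw = [(datetime(2023, 8, 1, 0, 0, 3), 3565.104004)]
grid = [datetime(2023, 8, 1) + timedelta(seconds=10 * i) for i in range(1, 4)]
result = ffill_with_timeout(raw, grid, timeout=timedelta(minutes=15))
# each grid point carries the 00:00:03 value forward, with orig_ts reported
```

With a short timeout (e.g. 5 seconds), the later grid points would instead be null, which is what `ff_timeout` achieves at scale.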
We can now use standard Spark tools for aggregation and feature engineering, e.g. averaging over 5-minute windows:

```python
from itertools import chain

from pyspark.sql.functions import avg, col, create_map, lit, window

# aggregate over 5-minute windows
df = df.groupBy("tag_name", window("ts", "5 minutes").end.alias("ts")).agg(
    avg("value_double").alias("avg_val")
)

# map tag names to readable descriptions
tag_map = create_map([lit(x) for x in chain(*tag_dict.items())])
df = df.withColumn("tag_descr", tag_map[col("tag_name")])

# pivot to get tags as columns
df.toPandas().pivot(index="ts", columns="tag_descr", values="avg_val")
```
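As an aside, the `create_map([...])` line works because `chain(*tag_dict.items())` flattens the dict into an alternating key/value sequence, which is the layout `create_map` expects. In plain Python:

```python
from itertools import chain

tag_dict = {
    "PR03.229HIC0384_SPEED_PV": "belt_speed",
    "PR03.730KIC4985_FLOW_PV": "water_flow",
}
# flatten {k1: v1, k2: v2} into [k1, v1, k2, v2]
flattened = list(chain(*tag_dict.items()))
# -> ["PR03.229HIC0384_SPEED_PV", "belt_speed",
#     "PR03.730KIC4985_FLOW_PV", "water_flow"]
```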
The output is now in a suitable format for use in reports or ML models:

| ts | belt_speed | pressure | water_flow |
|---|---|---|---|
| 2023-08-01 00:05:00 | 3564.950122 | 7.774326 | 53.010570 |
| 2023-08-01 00:10:00 | 3565.053158 | 7.812656 | 53.042973 |
| 2023-08-01 00:15:00 | 3564.626245 | 7.869692 | 52.981631 |
| 2023-08-01 00:20:00 | 3564.934269 | 7.828228 | 52.985406 |
| ... | | | |
Note that we use the end-point timestamp for each aggregation interval, so that each record is only based on data that is available at the specified timestamp.
In summary, we recommend a two-step approach:
- resample to a fine time granularity,
- aggregate to the desired time windows.
Step 1 ensures that the potentially irregular observations in raw historian data do not bias the aggregation (by giving more weight to volatile periods) or lead to skipped/null intervals (due to gaps).
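A toy example (with made-up numbers) of the bias that step 1 prevents: averaging raw observations directly over-weights periods where the historian happened to record many points, while averaging the resampled grid gives a time-weighted mean:

```python
# Raw observations over a 60 s window: the value holds at 10.0, then a
# volatile burst near the end records 20.0 five times.
raw = {0: 10.0, 50: 20.0, 52: 20.0, 54: 20.0, 56: 20.0, 58: 20.0}

naive_mean = sum(raw.values()) / len(raw)  # ~18.33 - dominated by the burst

# Step 1: forward fill onto a 1 s grid; step 2: average the grid.
grid_vals, last = [], None
for t in range(60):
    last = raw.get(t, last)
    grid_vals.append(last)
time_weighted_mean = sum(grid_vals) / len(grid_vals)  # ~11.67 - time weighted
```

The value was 10.0 for five sixths of the window, so the time-weighted mean is far closer to the truth than the naive mean of the raw points.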
## How this works

The `HistorianQuery` class is a wrapper around the `TSDF` class in the Databricks Tempo library. It adds some functionality that is helpful for querying data from a historian in a manufacturing context:
- Returns records from start to end time (instead of just between first and last record for each tag).
- Interval timestamp is rounded up instead of down, reporting the last known value at the given time point.
- The original timestamp of the observation is also reported.
- Timeout functionality - forward fill only up to a specified time interval to avoid stale values.
- Filter by quality flag - historians often capture an indication of how reliable each observation is. This allows keeping only good-quality observations.
- Keep or ignore nulls - a null value recorded by a historian can be an indication that the last known value should no longer be forward filled.
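The round-up behavior from the list above can be sketched as follows (an illustration only; `ceil_to_grid` is a made-up helper, not part of the library):

```python
from datetime import datetime, timedelta

def ceil_to_grid(ts, freq):
    """Round a timestamp up to the next multiple of `freq` (a timedelta)."""
    remainder = (ts - datetime(1970, 1, 1)) % freq
    return ts if remainder == timedelta(0) else ts + freq - remainder

rounded = ceil_to_grid(datetime(2023, 8, 1, 0, 0, 3), timedelta(seconds=10))
# 00:00:03 rounds up to 00:00:10, so the value observed at 00:00:03 is
# reported as the last known value at the 00:00:10 grid point
```

Rounding down instead would pretend the 00:00:03 observation was already known at 00:00:00, i.e. it would leak a few seconds of future data into each interval.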
While Tempo offers other options, here the resampling configuration has been fixed to the following:

- Interpolation: `ffill` - forward fill
- Aggregation: `ceil` - returns the latest value by timestamp
Forward filling (along with rounding up interval timestamps) ensures that observations are only based on past data. This is important for e.g. Machine Learning use-cases, where the training samples must contain only information that is available at inference time during production.
Taking the last known value instead of the mean ensures that the value makes sense for any tag, e.g. also for 0/1 indicators. For custom aggregation, follow the approach described in the Example usage section.
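A small illustration (with made-up values) of why the last known value is the safer default: for a 0/1 on/off indicator, a mean produces a state that never occurred, whereas the latest value is always a valid state:

```python
vals = [0, 0, 1, 1, 1]            # raw 0/1 indicator observations in an interval
mean_val = sum(vals) / len(vals)  # 0.6 - not a valid indicator state
last_val = vals[-1]               # 1 - still a meaningful 0/1 value
```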