Async data client for the AVEVA PI Web API

Async Utilities for the AVEVA PI Web API

piwebx is a collection of utilities for efficiently retrieving data from the PI System via the PI Web API.

Key Features

  • Timestamp-aligned retrieval of interpolated and recorded time series data
  • Iterator-based API with chunked requests, allowing unbounded time ranges
  • Support for Channels
  • Returns timezone-aware data in a user-defined timezone or the local timezone
  • Correctly handles timezone-aware input data
  • Built on HTTPX, allowing rich support for different authentication methods
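Because piwebx handles timezone-aware input, aware start and end times can be built entirely with the standard library before being passed to its retrieval functions. Nothing below is piwebx-specific (zoneinfo is in the standard library from Python 3.9):

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

# An aware datetime carries its UTC offset, so client and server agree
# on the exact instant being requested regardless of local settings.
start = datetime.now(tz=ZoneInfo("UTC")) - timedelta(minutes=30)
# start.tzinfo is not None, i.e. the datetime is timezone-aware
```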

Interpolated Data

The PI Web API supports retrieving time series in an interpolated format. piwebx makes it easy to get interpolated data for many streams...

import csv
from datetime import datetime, timedelta

from httpx import AsyncClient
from piwebx import get_interpolated


web_ids = ["web_id1", ...]

async def main():
    start = datetime.now() - timedelta(minutes=30)
    with open("interpolate_example.csv", "w", newline="") as fh:
        writer = csv.writer(fh)
        async with AsyncClient(base_url=...) as client:
            async for timestamp, data in get_interpolated(client, web_ids, start_time=start):
                writer.writerow((timestamp.isoformat(), *data))
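The interpolation itself happens server-side in the PI Web API. To picture what an interpolated value is, here is a small pure-Python sketch of linear interpolation between recorded samples; the function is illustrative only and is not part of piwebx:

```python
from datetime import datetime, timedelta

def linear_interpolate(samples, at):
    """Linearly interpolate sorted (timestamp, value) pairs at time ``at``.

    ``samples`` must be sorted by timestamp and must bracket ``at``.
    """
    for (t0, v0), (t1, v1) in zip(samples, samples[1:]):
        if t0 <= at <= t1:
            # Fractional position of ``at`` between the bracketing samples
            frac = (at - t0) / (t1 - t0)
            return v0 + (v1 - v0) * frac
    raise ValueError("'at' is outside the sampled range")

t = datetime(2024, 1, 1)
samples = [(t, 10.0), (t + timedelta(minutes=10), 20.0)]
print(linear_interpolate(samples, t + timedelta(minutes=5)))  # 15.0
```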

Join On Interpolated

The PI System usually has a mixture of analog data and discrete points. Analog data is compressed and, with the right compression settings, can be accurately represented by linear interpolation. On the other hand, discrete points are normally not compressed and linear interpolation is not appropriate between values. piwebx provides a way to align interpolated and discrete data on a common index.

import csv
from datetime import datetime, timedelta

from httpx import AsyncClient
from piwebx import get_interpolated, get_recorded, join_on_interpolated, locf


analog_data_points = ["web_id1", ...]
discrete_data_points = ["web_id1", ...]

async def main():
    start = datetime.now() - timedelta(minutes=30)
    with open("joined_example.csv", "w", newline="") as fh:
        writer = csv.writer(fh)
        async with AsyncClient(base_url=...) as client:
            interpolated_stream = get_interpolated(client, analog_data_points, start_time=start)
            recorded_stream = locf(get_recorded(client, discrete_data_points, start_time=start))
            async for timestamp, data in join_on_interpolated(interpolated_stream, recorded_stream):
                writer.writerow((timestamp.isoformat(), *data))
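Conceptually, the join pairs each interpolated row with the most recent discrete values at or before its timestamp. The sketch below illustrates that idea for a single discrete column over pre-materialized rows; it is a simplification, not the piwebx implementation:

```python
def join_on_index(interpolated_rows, recorded_rows):
    """Align recorded rows onto the interpolated timestamps.

    Both inputs are lists of (timestamp, values) tuples sorted by
    timestamp. For each interpolated timestamp, the most recent recorded
    values at or before it are appended (None before the first observation).
    """
    result = []
    i = 0
    last = None
    for ts, values in interpolated_rows:
        # Advance through recorded rows up to the current timestamp
        while i < len(recorded_rows) and recorded_rows[i][0] <= ts:
            last = recorded_rows[i][1]
            i += 1
        result.append((ts, values + (last if last is not None else (None,))))
    return result

interp = [(1, (10.0,)), (2, (12.0,)), (3, (14.0,))]
rec = [(2, ("ON",))]
print(join_on_index(interp, rec))
# [(1, (10.0, None)), (2, (12.0, 'ON')), (3, (14.0, 'ON'))]
```

Note how the discrete value is stepped forward rather than interpolated, which is the appropriate treatment for uncompressed, discrete points.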

Recorded Data

Recorded data, also known as compressed data, is the actual time series data stored in the PI archive. piwebx makes it easy to get recorded data for many streams...

import csv
from datetime import datetime, timedelta

from httpx import AsyncClient
from piwebx import get_recorded


web_ids = ["web_id1", ...]

async def main():
    start = datetime.now() - timedelta(minutes=30)
    with open("recorded_example.csv", "w", newline="") as fh:
        writer = csv.writer(fh)
        async with AsyncClient(base_url=...) as client:
            async for timestamp, data in get_recorded(client, web_ids, start_time=start):
                writer.writerow((timestamp.isoformat(), *data))

Last Observation Carried Forward

By default, get_recorded returns a value for every stream in every row. Streams which don't have a value at a given timestamp are assigned None. One method for filling these gaps is LOCF (last observation carried forward), which propagates the most recent observed value forward through a recorded data stream.

import csv
from datetime import datetime, timedelta

from httpx import AsyncClient
from piwebx import get_recorded, locf


web_ids = ["web_id1", ...]

async def main():
    start = datetime.now() - timedelta(minutes=30)
    with open("locf_example.csv", "w", newline="") as fh:
        writer = csv.writer(fh)
        async with AsyncClient(base_url=...) as client:
            async for timestamp, data in locf(get_recorded(client, web_ids, start_time=start)):
                writer.writerow((timestamp.isoformat(), *data))
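The LOCF filling rule itself is simple. A minimal pure-Python sketch over rows of values (illustrative only, not the piwebx implementation):

```python
def locf_rows(rows):
    """Fill None values column-wise with the last observed value (LOCF).

    ``rows`` is a list of (timestamp, values) tuples; values before the
    first observation in a column remain None.
    """
    last = [None] * (len(rows[0][1]) if rows else 0)
    filled = []
    for ts, values in rows:
        row = []
        for i, v in enumerate(values):
            if v is not None:
                last[i] = v  # new observation replaces the carried value
            row.append(last[i])
        filled.append((ts, tuple(row)))
    return filled

rows = [(1, (5.0, None)), (2, (None, 7.0)), (3, (None, None))]
print(locf_rows(rows))
# [(1, (5.0, None)), (2, (5.0, 7.0)), (3, (5.0, 7.0))]
```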

Channels

A channel is a way to receive continuous updates about a stream. piwebx has first class support for channels in an easy to use API. open_channel_group opens and manages all connections required to receive real-time updates from any number of streams.

from httpx import AsyncClient
from piwebx import open_channel_group, LabeledTimeseriesValue


web_ids = ["web_id1", ...]

def process_timeseries_value(val: LabeledTimeseriesValue) -> None:
    ...

async def main():
    async with AsyncClient(base_url=...) as client:
        # Upon exiting the context, all connections in the channel group are closed
        async with open_channel_group(client, web_ids) as cg:
            async for val in cg:
                process_timeseries_value(val)

WebID Search

Resources in the PI Web API are addressed by WebIDs: persistent, URL-safe identifiers that encode the GUIDs and/or paths of objects in the PI System. The PI Web API offers multiple ways to search for resources. Because piwebx is geared towards time series data retrieval, it does not cover all of the Web API's search semantics; instead it provides basic methods to find the WebID for points and attributes, each of which uniquely identifies a time series stream.

Points

Search for points by name.

from httpx import AsyncClient
from piwebx import find_points_web_id


points = [
    "point1",
    "point2",
    "point3",
]

async def main():
    async with AsyncClient(base_url=...) as client:
        found, not_found = await find_points_web_id(client, points)
    if not_found:
        for point in not_found:
            print(f"{point} was not found")
    
    for point, web_id in found:
        print(f"The WebID for {point} is {web_id}")

Attributes

Search for attributes by their fully qualified path.

from httpx import AsyncClient
from piwebx import find_attributes_web_id


attributes = [
    "\\\\server\\database\\element|attribute1",
    "\\\\server\\database\\element|attribute2",
    "\\\\server\\database\\element|attribute3",
]

async def main():
    async with AsyncClient(base_url=...) as client:
        found, not_found = await find_attributes_web_id(client, attributes)
    if not_found:
        for attribute in not_found:
            print(f"{attribute} was not found")
    
    for attribute, web_id in found:
        print(f"The WebID for {attribute} is {web_id}")
