
Async Utilities for the AVEVA PI Web API

piwebx is a collection of utilities for efficiently retrieving data from the PI System via the PI Web API.

Key Features

  • Timestamp-aligned retrieval of interpolated and recorded time series data
  • Iterator-based API with chunked requests, allowing unbounded time ranges
  • Support for Channels
  • Returns timezone-aware data in a user-defined or local timezone
  • Correctly handles timezone-aware input data
  • Built on HTTPX, allowing rich support for different authentication methods (see the sketch after this list)
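
Because piwebx operates on a plain HTTPX AsyncClient, any authentication scheme HTTPX supports (or that an HTTPX-compatible auth plugin provides) can be used. A minimal sketch, assuming Basic authentication and a placeholder endpoint URL:

from httpx import AsyncClient, BasicAuth


async def main():
    # Placeholder base URL and credentials; substitute your own PI Web API
    # endpoint and whatever auth scheme your deployment requires
    async with AsyncClient(
        base_url="https://myserver/piwebapi",
        auth=BasicAuth("username", "password"),
    ) as client:
        ...  # pass client to any piwebx function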

Interpolated Data

The PI Web API supports retrieving time series in an interpolated format. piwebx makes it easy to get interpolated data for many streams...

import csv
from datetime import datetime, timedelta

from httpx import AsyncClient
from piwebx import get_interpolated


web_ids = ["web_id1", ...]

async def main():
    start = datetime.now() - timedelta(minutes=30)
    with open("interpolate_example.csv", "w", newline="") as fh:
        writer = csv.writer(fh)
        async with AsyncClient(base_url=...) as client:
            async for timestamp, data in get_interpolated(client, web_ids, start_time=start):
                writer.writerow((timestamp.isoformat(), *data))
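
Each example in this README defines an async main function but does not run it; to execute one, hand it to the event loop with the standard library's asyncio:

import asyncio

asyncio.run(main())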

Join On Interpolated

The PI System usually holds a mixture of analog and discrete points. Analog data is compressed and, with the right compression settings, can be accurately represented by linear interpolation. Discrete points, on the other hand, are normally not compressed, and linear interpolation between their values is not appropriate. piwebx provides a way to align interpolated and discrete data on a common index. In the example below, the recorded stream is wrapped in locf so that discrete values are carried forward onto each timestamp of the interpolated index.

import csv
from datetime import datetime, timedelta

from httpx import AsyncClient
from piwebx import get_interpolated, get_recorded, join_on_interpolated, locf


analog_data_points = ["web_id1", ...]
discrete_data_points = ["web_id1", ...]

async def main():
    start = datetime.now() - timedelta(minutes=30)
    with open("joined_example.csv", "w", newline="") as fh:
        writer = csv.writer(fh)
        async with AsyncClient(base_url=...) as client:
            interpolated_stream = get_interpolated(client, analog_data_points, start_time=start)
            recorded_stream = locf(get_recorded(client, discrete_data_points, start_time=start))
            async for timestamp, data in join_on_interpolated(interpolated_stream, recorded_stream):
                writer.writerow((timestamp.isoformat(), *data))

Recorded Data

Recorded data, also known as compressed data, is the actual time series data stored in the PI archive. piwebx makes it easy to get recorded data for many streams...

import csv
from datetime import datetime, timedelta

from httpx import AsyncClient
from piwebx import get_recorded


web_ids = ["web_id1", ...]

async def main():
    start = datetime.now() - timedelta(minutes=30)
    with open("recorded_example.csv", "w", newline="") as fh:
        writer = csv.writer(fh)
        async with AsyncClient(base_url=...) as client:
            async for timestamp, data in get_recorded(client, web_ids, start_time=start):
                writer.writerow((timestamp.isoformat(), *data))

Last Observation Carried Forward

By default, get_recorded returns a value for every stream in every row. Streams which don't have a value at a given timestamp are assigned None. One method for filling these gaps is LOCF (last observation carried forward), which can be used to fill gaps in recorded data streams.
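
To illustrate the technique independent of piwebx, here is a minimal sketch of carrying the last observation forward over rows of column-aligned values (illustrative only, not the library's implementation):

def locf_rows(rows):
    """Fill None values with the last observed value for each column."""
    last = {}
    for row in rows:
        filled = []
        for i, value in enumerate(row):
            if value is None:
                # Carry the last observed value forward; remains None if
                # the column has no observation yet
                value = last.get(i)
            else:
                last[i] = value
            filled.append(value)
        yield tuple(filled)

With piwebx, the same filling is applied by wrapping the get_recorded stream in locf: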

import csv
from datetime import datetime, timedelta

from httpx import AsyncClient
from piwebx import get_recorded, locf


web_ids = ["web_id1", ...]

async def main():
    start = datetime.now() - timedelta(minutes=30)
    with open("locf_example.csv", "w", newline="") as fh:
        writer = csv.writer(fh)
        async with AsyncClient(base_url=...) as client:
            async for timestamp, data in locf(get_recorded(client, web_ids, start_time=start)):
                writer.writerow((timestamp.isoformat(), *data))

Channels

A channel is a way to receive continuous updates about a stream. piwebx has first-class support for channels through an easy-to-use API. open_channel_group opens and manages all connections required to receive real-time updates from any number of streams.

from httpx import AsyncClient
from piwebx import open_channel_group, LabeledTimeseriesValue


web_ids = ["web_id1", ...]

def process_timeseries_value(val: LabeledTimeseriesValue) -> None:
    ...

async def main():
    async with AsyncClient(base_url=...) as client:
        # Upon exiting the context, all connections in the channel group are closed
        async with open_channel_group(client, web_ids) as cg:
            async for val in cg:
                process_timeseries_value(val)

WebID Search

Resources in the PI Web API are addressed by WebIDs: persistent, URL-safe identifiers that encode the GUIDs and/or paths of objects in the PI System. There are multiple ways to search for resources in the PI Web API. piwebx is geared towards time series data retrieval, so rather than covering all of the Web API's search semantics, it provides basic methods to find the WebIDs of points and attributes, each of which uniquely identifies a time series stream.

Points

Search for points by name.

from httpx import AsyncClient
from piwebx import find_points_web_id


points = [
    "point1",
    "point2",
    "point3",
]

async def main():
    async with AsyncClient(base_url=...) as client:
        found, not_found = await find_points_web_id(client, points)
    if not_found:
        for point in not_found:
            print(f"{point} was not found")
    
    for point, web_id in found:
        print(f"The WebID for {point} is {web_id}")
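
Because the search helpers return WebIDs, they compose directly with the data retrieval functions. A sketch, reusing only functions shown in the examples above, that resolves point names and then streams recorded data for the points that were found:

from datetime import datetime, timedelta

from httpx import AsyncClient
from piwebx import find_points_web_id, get_recorded


points = ["point1", "point2"]

async def main():
    start = datetime.now() - timedelta(minutes=30)
    async with AsyncClient(base_url=...) as client:
        found, not_found = await find_points_web_id(client, points)
        # Keep only the WebIDs of the points that resolved
        web_ids = [web_id for _, web_id in found]
        async for timestamp, data in get_recorded(client, web_ids, start_time=start):
            print(timestamp.isoformat(), *data)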

Attributes

Search for attributes by their fully qualified path.

from httpx import AsyncClient
from piwebx import find_attributes_web_id


attributes = [
    "\\\\server\\database\\element|attribute1",
    "\\\\server\\database\\element|attribute2",
    "\\\\server\\database\\element|attribute3",
]

async def main():
    async with AsyncClient(base_url=...) as client:
        found, not_found = await find_attributes_web_id(client, attributes)
    if not_found:
        for attribute in not_found:
            print(f"{attribute} was not found")
    
    for attribute, web_id in found:
        print(f"The WebID for {attribute} is {web_id}")
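
Note that the doubled backslashes above are ordinary Python string escapes; each \\\\ in source yields two literal backslashes. Raw strings express the same paths more readably:

attributes = [
    r"\\server\database\element|attribute1",
    r"\\server\database\element|attribute2",
    r"\\server\database\element|attribute3",
]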
