Skip to main content

Library for Piscada Foresight; Access knowledge-graph and timeseries data.

Project description

Piscada Foresight

Access knowledge graph and timeseries data.

Overview

This library provides access to the knowledge graph and timeseries data in the Foresight platform. It implements a transport using HTTPX for use with the GQL GraphQL client. It provides the following modules:

  • Data Module (Read timeseries values as Pandas DataFrames or Series.)

    • Functions:
      • get_value(time): Get the latest value before a given time.
      • get_values(start_time, end_time): Get values between a start and end time.
      • get_all_values(graph_query_response): Extract all values from a graph query response.
  • Domains Module (Access domain definitions and trait hierarchies)

    • Classes:
      • Domain:
        • get_trait_by_id(trait_id): Get a trait by its ID.
    • Functions:
      • get_domains(): Retrieve a list of all available domains.
      • get_trait_by_id(trait_id): Get a trait from any domain by its ID string.
      • get_parent_traits(trait_id): Get all parent traits for a given trait.
  • HTTP Module (OAuth2 authenticated transport)

    • Classes:
      • ForesightHTTPXTransport:
        • connect(): Establish an authenticated connection.

Installation

pip install piscada-foresight

You will need access to a Piscada Foresight instance. The library uses the OAuth2 Authorization Code Flow with Proof Key for Code Exchange (PKCE) to act on behalf of your user. After an initial interactive login, the library persists the session state in $HOME/.<client_id>_state and will use that to authenticate non-interactively the next time.

Usage

To be able to fetch data from the Foresight platform, you first need to build a QueryManager object.

The QueryManager object is responsible for handling the GraphQL queries and the transport mechanism.

from piscada_foresight.queries_templates.query_manager import QueryManager
from datetime import datetime, timedelta, timezone
from piscada_foresight.data import get_values

domain = "foresight.piscada.cloud"
query_manager = QueryManager(domain=domain)

get_values(
  query_manager,
  entity_ids=["ENTITY_ID", "ENTITY_ID2"],
  start=datetime.now(tz=timezone.utc) - timedelta(hours=8),
)

QueryManager Documentation

Overview

QueryManager is a class that provides a way to:

  • GraphQL client: Handle the creation and management of the client and transport.
  • QueryHandler: Load and render query templates from files and directories.
  • Execute query: Send the query to the client.

It implements a singleton pattern to ensure that only one instance of the client is active, thereby maintaining consistent access to query execution and management.

GraphQL Client

The client is responsible for executing queries against the Foresight platform.

During the initialization of the QueryManager object, a GraphQL client is created. This client object needs a transport mechanism to handle OAuth2 authentication and token refreshing.

A ForesightHTTPXTransport instance is created and passed during the creation of the client. The ForesightHTTPXTransport class is responsible for handling OAuth2 authentication and token refreshing.

There are three different transport mechanisms available:

  • Interactive (used for local development):
    The user will be prompted to log in, which in turn runs the OAuth2 flow to get the access/refresh token. The refresh token is then automatically updated and stored in the user's home directory.
  • Non-interactive:
    The client_id and client_secret are passed directly.
  • Production:
    The authorization headers should be provided to the transport mechanism.

Interactive

The interactive mode should be used for local development. It uses the OAuth2 Authorization Code Flow to authenticate the user.

from datetime import datetime, timedelta, timezone

from piscada_foresight.data import get_values
from piscada_foresight.queries_templates.query_manager import QueryManager

domain = "foresight.piscada.cloud"
query_manager = QueryManager(domain=domain)  # client_id default is "foresight-lib-py" or use query_manager = QueryManager(domain=domain, client_id="<client_id>")

# Retrieve timeseries values for two specific entities:
get_values(
  query_manager,
  entity_ids=["ENTITY_ID", "ENTITY_ID2"],
  start=datetime.now(tz=timezone.utc) - timedelta(hours=8),
)

# Retrieve aggregated timeseries values for two specific entities:
get_values(
  query_manager,
  entity_ids=["ENTITY_ID", "ENTITY_ID2"],
  start=datetime.now(tz=timezone.utc) - timedelta(hours=8),
  interval="1h",
  aggregation_functions=["min", "max", "avg", "count", "last"],
)

Note: Not all interval values are accepted (e.g., 30m works but 45m won't work).

Non-interactive

Specify the client_id and the client_secret in the QueryManager creation:

from piscada_foresight.queries_templates.query_manager import QueryManager

domain = "foresight.piscada.cloud"
query_manager = QueryManager(domain=domain, client_id="<client_id>", client_secret="<client_secret>")

Production

To handle authorization, catch the headers from the request and pass them to the QueryManager transport.

The code below provides an example of an implementation using FastAPI:

from piscada_foresight.queries_templates.query_manager import QueryManager
from fastapi import Request
from starlette.datastructures import Headers
...

domain = "foresight.piscada.cloud"

# Build your QueryManager object
query_manager = QueryManager(domain=domain)

# Catch the request
@count_and_duration_app.api_route("/graphql", methods=["GET", "POST"])
async def graphql_server(request: Request):
    # Get the headers       
    headers: Headers = request.headers
    dict_headers = dict(headers)
    
    # Sort/filter the headers that you need to keep 
    keys_to_keep = ["x-auth-request-email",
                    "X-Auth-Request-Email",
                    "X-Auth-Request-User",
                    "x-auth-request-user",
                    "X-Auth-Request-Groups",
                    "x-auth-request-groups",
                    "X-Forwarded-Host",
                    "x-forwarded-host",
                    ] 
    filtered_headers = {key: dict_headers[key] for key in keys_to_keep if key in dict_headers}
    query_manager.update_transport(headers=filtered_headers)

Update Transport

The transport can be updated dynamically by calling the update_transport method on the QueryManager object.

Any arguments passed to the update_transport method will be used to update the transport configuration. Those arguments are passed to the client.

Example to pass headers to the client:

query_manager.update_transport(headers=filtered_headers)

Internally, update_transport calls the create_transport method to create a new transport object with the updated parameters, then connects this new transport to the client:

self.transport = self._create_transport()
self.client.transport = self.transport

QueryHandler

Queries are handled by the QueryHandler class.

The QueryManager object creates a QueryHandler instance that is responsible for loading and rendering query templates from files and directories.

The QueryHandler initialization:

  • Finds the base repository of the default queries
  • Walks through this repository to find all queries (.j2 and .graphql files)
  • Builds a query dictionary (key: <query_file_name>, value: <query_path>)

Example:

self.query_dict["<query_file_name>"] = "<query_path>"

Adding Queries

There are two ways to add queries to the QueryHandler:

  • add_query: Add a single query to the query dictionary.
  • add_queries_from_directory: Add queries from a specified directory.

add_query

The add_query method is used to add a single query to the query dictionary.

query_manager = QueryManager(domain=domain)
query_manager.add_query(<query_name>, <query_path>)

add_queries_from_directory

If additional queries need to be added, the add_queries_from_directory method can be used to add queries from a specified directory.

query_manager = QueryManager(domain=domain)
base = os.path.dirname(__file__)
relative_query_path = "../query_manager_queries"
path = os.path.join(base, relative_query_path)
query_manager.add_queries_from_directory(path)

The advantage is that the queries are then available anywhere in the code via the query_manager.query_handler.query_dict dictionary.

load_query

The load_query method is responsible for loading and rendering a query.

  • If the query is static (.graphql), the load_query method will load the query as is.
  • If the query is dynamic (.j2), the load_query method will render the query with the provided variables.

Example: To load and render the get_raw_values query:

get_raw_values2.j2

{% raw %}
query value(
{% endraw %}
{% for variable_name in variable_names -%}
    ${{variable_name}}: ID!
{% endfor -%}
{% raw %}
    $startEventTime: DateTime!
    $endEventTime: DateTime!
) {
{% endraw %}
{% for variable_name in variable_names -%}
    {{variable_name}}: entity(id: ${{variable_name}}) {
        name
        trait(id: "foresight:Datapoint") {
            quantity(key: "Value") {
                values(startEventTime: $startEventTime, endEventTime: $endEventTime) {
                    eventTime
                    value
                }
            }
        }
    }
{% endfor %}
{% raw %}
}
{% endraw %}

Run the load_query method:

qm = QueryManager(...)
entity_ids = [0, 1]
entity_variables = {
    f"entityId_{i}": entity_id
    for i, entity_id in enumerate(entity_ids)
}
jinja_dict = {"variable_names": list(entity_variables.keys())}
rendered_query = qm.load_query("get_raw_values", jinja_dict)

rendered_query:

query value(
$entityId_0: ID!
$entityId_1: ID!
$startEventTime: DateTime!
$endEventTime: DateTime!
) {
entityId_0: entity(id: $entityId_0) {
        name
        trait(id: "foresight:Datapoint") {
            quantity(key: "Value") {
                values(startEventTime: $startEventTime, endEventTime: $endEventTime) {
                    eventTime
                    value
                }
            }
        }
    }
entityId_1: entity(id: $entityId_1) {
        name
        trait(id: "foresight:Datapoint") {
            quantity(key: "Value") {
                values(startEventTime: $startEventTime, endEventTime: $endEventTime) {
                    eventTime
                    value
                }
            }
        }
    }
}

This allows the creation of complex queries with multiple entities and aliases.

Execute Query

In the QueryManager, there are two ways to execute a query:

  • execute_query: Execute a GraphQL query with the provided query string and variables. Use this method for simple queries.
  • run_query: Load and execute a query in a single call.

execute_query

query_variables = {
    "entityId_0": <entityId_0>,
    "entityId_1": <entityId_1>
}
response = query_manager.execute_query(query=rendered_query, variables=query_variables)

run_query

entity_ids = [<entityId_0>, <entityId_1>]
entity_variables = {
    f"entityId_{i}": entity_id for i, entity_id in enumerate(entity_ids)
}

query_variables = {
    **entity_variables,
}

jinja_dict = {
    "entity_ids": list(entity_variables.keys()),
}

if entity_ids is not None:
    response_relationships = query_manager.query_handler.load_query
    run_query(
            query_name="get_raw_values",
            jinja_variables=jinja_dict,
            query_variables=query_variables
            )

Contributing

Contributions are welcome! You can contact us at foresight@piscada.com.

Support

If you have any questions, issues, or suggestions, please contact us at foresight@piscada.com.

Copyright

© Piscada AS 2024

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

piscada_foresight-0.6.5.tar.gz (130.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

piscada_foresight-0.6.5-py3-none-any.whl (22.7 kB view details)

Uploaded Python 3

File details

Details for the file piscada_foresight-0.6.5.tar.gz.

File metadata

  • Download URL: piscada_foresight-0.6.5.tar.gz
  • Upload date:
  • Size: 130.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.4.30

File hashes

Hashes for piscada_foresight-0.6.5.tar.gz
Algorithm Hash digest
SHA256 b73ba3bf1f571479a2d8a0e8b85307b063eab696f6b68eb7e8b3feb77fbb5694
MD5 932ee59880c264d0a2ce1590770ed663
BLAKE2b-256 8abc0d7be95fba48a249a3b54dc16e3ec4d3e3a491b3b4fe3eaac5acd69605fb

See more details on using hashes here.

File details

Details for the file piscada_foresight-0.6.5-py3-none-any.whl.

File metadata

File hashes

Hashes for piscada_foresight-0.6.5-py3-none-any.whl
Algorithm Hash digest
SHA256 9a2b2bd9126ea294a9f30b4c22aff8f8f68c6f4bf420414f05f3b850ad1a18ce
MD5 90bcfbea782900c9bdd9a3614931dead
BLAKE2b-256 332ff31d6f6b49f8d517ae15f79eef80d97de4157a6dc25b7e06781d5b2695e1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page