Library for Piscada Foresight; Access knowledge-graph and timeseries data.
Piscada Foresight
Access knowledge graph and timeseries data.
Overview
This library provides access to the knowledge graph and timeseries data in the Foresight platform. It implements a transport using HTTPX for use with the GQL GraphQL client. It provides the following modules:
- Data Module (read timeseries values as Pandas DataFrames or Series)
  - Functions:
    - get_value(time): Get the latest value before a given time.
    - get_values(start_time, end_time): Get values between a start and end time.
    - get_all_values(graph_query_response): Extract all values from a graph query response.
- Domains Module (access domain definitions and trait hierarchies)
  - Classes:
    - Domain:
      - get_trait_by_id(trait_id): Get a trait by its ID.
  - Functions:
    - get_domains(): Retrieve a list of all available domains.
    - get_trait_by_id(trait_id): Get a trait from any domain by its ID string.
    - get_parent_traits(trait_id): Get all parent traits for a given trait.
- HTTP Module (OAuth2 authenticated transport)
  - Classes:
    - ForesightHTTPXTransport:
      - connect(): Establish an authenticated connection.
Installation
pip install piscada-foresight
You will need access to a Piscada Foresight instance. The library uses the OAuth2 Authorization Code Flow with Proof Key for Code Exchange (PKCE) to act on behalf of your user. After an initial interactive login, the library persists the session state in $HOME/.<client_id>_state and will use that to authenticate non-interactively the next time.
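As a rough illustration of the PKCE part of that flow, the sketch below generates a code verifier and its S256 code challenge with the standard library, and builds the state file path described above. The function names `make_pkce_pair` and `state_file_path` are hypothetical and are not part of the library; the library performs these steps internally and may do so differently.

```python
import base64
import hashlib
import secrets
from pathlib import Path


def make_pkce_pair() -> tuple[str, str]:
    """Return a (code_verifier, code_challenge) pair using the S256 method."""
    # 32 random bytes give a 43-character URL-safe verifier
    # (RFC 7636 allows verifiers of 43 to 128 characters).
    code_verifier = secrets.token_urlsafe(32)
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    # The challenge is the base64url-encoded SHA-256 digest without '=' padding.
    code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
    return code_verifier, code_challenge


def state_file_path(client_id: str) -> Path:
    """Hypothetical helper: path following the $HOME/.<client_id>_state pattern."""
    return Path.home() / f".{client_id}_state"


verifier, challenge = make_pkce_pair()
print(len(verifier), len(challenge))
print(state_file_path("foresight-lib-py"))
```

The verifier stays on the client, and only the challenge is sent with the authorization request. The server later checks the verifier against it when the code is exchanged for tokens.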
Usage
To be able to fetch data from the Foresight platform, you first need to build a QueryManager object.
The QueryManager object is responsible for handling the GraphQL queries and the transport mechanism.
from datetime import datetime, timedelta, timezone
from piscada_foresight.data import get_values
from piscada_foresight.queries_templates.query_manager import QueryManager
domain = "foresight.piscada.cloud"
query_manager = QueryManager(domain=domain)
get_values(
    query_manager,
    entity_ids=["ENTITY_ID", "ENTITY_ID2"],
    start=datetime.now(tz=timezone.utc) - timedelta(hours=8),
)
QueryManager Documentation
Overview
QueryManager is a class that combines three responsibilities:
- GraphQL client: Creates and manages the client and its transport.
- QueryHandler: Loads and renders query templates from files and directories.
- Query execution: Sends the query to the client.
It implements a singleton pattern so that only one instance of the client is active, which keeps query execution and management consistent.
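One common way to get singleton behaviour in Python is to override `__new__`. The sketch below uses a hypothetical class, `SingletonManager`, to show the idea; it is not taken from the library, and QueryManager may implement the pattern differently.

```python
class SingletonManager:
    """Hypothetical stand-in for a manager that should exist only once."""

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Create the instance on first use and hand back the same object afterwards.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, domain: str):
        # __init__ still runs on every call, so a later call overwrites the domain.
        self.domain = domain


first = SingletonManager(domain="foresight.piscada.cloud")
second = SingletonManager(domain="foresight.piscada.cloud")
print(first is second)
```

With this variant, constructing the class again with different arguments changes the state seen through every existing reference, which is worth keeping in mind when sharing a manager across modules.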
GraphQL Client
The client is responsible for executing queries against the Foresight platform.
During the initialization of the QueryManager object, a GraphQL client is created. The client needs a transport, so a ForesightHTTPXTransport instance is created and passed to it during creation.
The ForesightHTTPXTransport class is responsible for handling OAuth2 authentication and token refreshing.
There are three different transport mechanisms available:
- Interactive (used for local development): The user will be prompted to log in, which in turn runs the OAuth2 flow to get the access/refresh token. The refresh token is then automatically updated and stored in the user's home directory.
- Non-interactive: The client_id and client_secret are passed directly.
- Production: The authorization headers should be provided to the transport mechanism.
Interactive
The interactive mode should be used for local development. It uses the OAuth2 Authorization Code Flow to authenticate the user.
from datetime import datetime, timedelta, timezone
from piscada_foresight.data import get_values
from piscada_foresight.queries_templates.query_manager import QueryManager
domain = "foresight.piscada.cloud"
# client_id defaults to "foresight-lib-py"; to override it, use
# QueryManager(domain=domain, client_id="<client_id>")
query_manager = QueryManager(domain=domain)
# Retrieve timeseries values for two specific entities:
get_values(
    query_manager,
    entity_ids=["ENTITY_ID", "ENTITY_ID2"],
    start=datetime.now(tz=timezone.utc) - timedelta(hours=8),
)
# Retrieve aggregated timeseries values for two specific entities:
get_values(
    query_manager,
    entity_ids=["ENTITY_ID", "ENTITY_ID2"],
    start=datetime.now(tz=timezone.utc) - timedelta(hours=8),
    interval="1h",
    aggregation_functions=["min", "max", "avg", "count", "last"],
)
Note: Not all interval values are accepted (for example, 30m works but 45m does not).
Non-interactive
Specify the client_id and the client_secret in the QueryManager creation:
from piscada_foresight.queries_templates.query_manager import QueryManager
domain = "foresight.piscada.cloud"
query_manager = QueryManager(domain=domain, client_id="<client_id>", client_secret="<client_secret>")
Production
To handle authorization in production, read the headers from the incoming request and pass them to the QueryManager transport.
The code below provides an example of an implementation using FastAPI:
from fastapi import Request
from starlette.datastructures import Headers
from piscada_foresight.queries_templates.query_manager import QueryManager
...
domain = "foresight.piscada.cloud"
# Build your QueryManager object
query_manager = QueryManager(domain=domain)
# Catch the request
# (count_and_duration_app is assumed to be a FastAPI app or router defined elsewhere)
@count_and_duration_app.api_route("/graphql", methods=["GET", "POST"])
async def graphql_server(request: Request):
    # Get the headers
    headers: Headers = request.headers
    dict_headers = dict(headers)
    # Sort/filter the headers that you need to keep
    keys_to_keep = [
        "x-auth-request-email",
        "X-Auth-Request-Email",
        "X-Auth-Request-User",
        "x-auth-request-user",
        "X-Auth-Request-Groups",
        "x-auth-request-groups",
        "X-Forwarded-Host",
        "x-forwarded-host",
    ]
    filtered_headers = {key: dict_headers[key] for key in keys_to_keep if key in dict_headers}
    # Pass the filtered headers to the transport
    query_manager.update_transport(headers=filtered_headers)
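HTTP header names are case-insensitive, so the filter can also compare names in lowercase instead of listing each header in two spellings. The sketch below shows one way to do this with plain dictionaries. The helper `filter_headers` and the sample header values are hypothetical and only illustrate the idea.

```python
from typing import Iterable, Mapping


def filter_headers(headers: Mapping[str, str], keep: Iterable[str]) -> dict[str, str]:
    """Keep only the headers whose names match `keep`, ignoring case."""
    wanted = {name.lower() for name in keep}
    return {name: value for name, value in headers.items() if name.lower() in wanted}


# Made-up request headers for illustration only.
incoming = {
    "x-auth-request-email": "user@example.com",
    "X-Forwarded-Host": "foresight.piscada.cloud",
    "cookie": "session=abc",
}
filtered = filter_headers(incoming, ["X-Auth-Request-Email", "x-forwarded-host"])
print(filtered)
```

The resulting dictionary keeps the original spelling of each header name and drops everything that is not on the allow list, such as the cookie header here.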
Update Transport
The transport can be updated dynamically by calling the update_transport method on the QueryManager object.
Any arguments passed to the update_transport method will be used to update the transport configuration. Those arguments are passed to the client.
Example to pass headers to the client:
query_manager.update_transport(headers=filtered_headers)
Internally, update_transport calls the _create_transport method to create a new transport object with the updated parameters, then attaches this new transport to the client:
self.transport = self._create_transport()
self.client.transport = self.transport
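To make the replace-and-rebind step concrete, here is a toy model of that behaviour. All three classes (`ToyTransport`, `ToyClient`, `ToyManager`) are hypothetical stand-ins, and merging the new keyword arguments into the previous configuration is an assumption; the library may handle its configuration differently.

```python
class ToyTransport:
    """Hypothetical transport that only remembers its configuration."""

    def __init__(self, **config):
        self.config = config


class ToyClient:
    """Hypothetical client that holds a reference to its transport."""

    def __init__(self, transport):
        self.transport = transport


class ToyManager:
    def __init__(self, **transport_config):
        self._transport_config = dict(transport_config)
        self.transport = self._create_transport()
        self.client = ToyClient(self.transport)

    def _create_transport(self):
        # Build a fresh transport from the current configuration.
        return ToyTransport(**self._transport_config)

    def update_transport(self, **kwargs):
        # Assumption: new arguments are merged into the existing configuration.
        self._transport_config.update(kwargs)
        self.transport = self._create_transport()
        self.client.transport = self.transport


manager = ToyManager(domain="foresight.piscada.cloud")
old_transport = manager.transport
manager.update_transport(headers={"x-auth-request-user": "alice"})
print(manager.client.transport.config)
```

Because the client is rebound to a new transport object, any code that kept a reference to the old transport would keep using the old configuration.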
QueryHandler
Queries are handled by the QueryHandler class.
The QueryManager object creates a QueryHandler instance that is responsible for loading and rendering query templates from files and directories.
The QueryHandler initialization:
- Finds the base directory of the default queries
- Walks through this directory to find all queries (.j2 and .graphql files)
- Builds a query dictionary (key: <query_file_name>, value: <query_path>)
Example:
self.query_dict["<query_file_name>"] = "<query_path>"
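The discovery step described above can be sketched with the standard library as follows. The function `build_query_dict` is hypothetical, and using the file name without its extension as the key is an assumption based on the `get_raw_values` example later in this document.

```python
import tempfile
from pathlib import Path

QUERY_SUFFIXES = {".j2", ".graphql"}


def build_query_dict(base_dir: Path) -> dict[str, str]:
    """Map each query file name (without extension) to its path."""
    query_dict = {}
    # rglob("*") walks the directory tree recursively.
    for path in sorted(base_dir.rglob("*")):
        if path.is_file() and path.suffix in QUERY_SUFFIXES:
            query_dict[path.stem] = str(path)
    return query_dict


# Build a throwaway directory tree to demonstrate the walk.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "nested").mkdir()
    (root / "get_raw_values.j2").write_text("# placeholder")
    (root / "nested" / "get_domains.graphql").write_text("# placeholder")
    (root / "notes.txt").write_text("ignored")
    found = build_query_dict(root)

print(sorted(found))
```

In this sketch, two files with the same name in different subdirectories would overwrite each other in the dictionary, so query file names need to be unique.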
Adding Queries
There are two ways to add queries to the QueryHandler:
- add_query: Add a single query to the query dictionary.
- add_queries_from_directory: Add queries from a specified directory.
add_query
The add_query method is used to add a single query to the query dictionary.
query_manager = QueryManager(domain=domain)
query_manager.add_query("<query_name>", "<query_path>")
add_queries_from_directory
If additional queries need to be added, the add_queries_from_directory method can be used to add queries from a specified directory.
import os
query_manager = QueryManager(domain=domain)
# Resolve the query directory relative to the current file
base = os.path.dirname(__file__)
relative_query_path = "../query_manager_queries"
path = os.path.join(base, relative_query_path)
query_manager.add_queries_from_directory(path)
The advantage is that the queries are then available anywhere in the code via the query_manager.query_handler.query_dict dictionary.
load_query
The load_query method is responsible for loading and rendering a query.
- If the query is static (.graphql), the load_query method will load the query as is.
- If the query is dynamic (.j2), the load_query method will render the query with the provided variables.
Example: To load and render the get_raw_values query:
get_raw_values2.j2
{% raw %}
query value(
{% endraw %}
{% for variable_name in variable_names -%}
  ${{variable_name}}: ID!
{% endfor -%}
{% raw %}
  $startEventTime: DateTime!
  $endEventTime: DateTime!
) {
{% endraw %}
{% for variable_name in variable_names -%}
  {{variable_name}}: entity(id: ${{variable_name}}) {
    name
    trait(id: "foresight:Datapoint") {
      quantity(key: "Value") {
        values(startEventTime: $startEventTime, endEventTime: $endEventTime) {
          eventTime
          value
        }
      }
    }
  }
{% endfor %}
{% raw %}
}
{% endraw %}
Run the load_query method:
qm = QueryManager(...)
entity_ids = [0, 1]
entity_variables = {
    f"entityId_{i}": entity_id
    for i, entity_id in enumerate(entity_ids)
}
jinja_dict = {"variable_names": list(entity_variables.keys())}
rendered_query = qm.load_query("get_raw_values", jinja_dict)
rendered_query:
query value(
  $entityId_0: ID!
  $entityId_1: ID!
  $startEventTime: DateTime!
  $endEventTime: DateTime!
) {
  entityId_0: entity(id: $entityId_0) {
    name
    trait(id: "foresight:Datapoint") {
      quantity(key: "Value") {
        values(startEventTime: $startEventTime, endEventTime: $endEventTime) {
          eventTime
          value
        }
      }
    }
  }
  entityId_1: entity(id: $entityId_1) {
    name
    trait(id: "foresight:Datapoint") {
      quantity(key: "Value") {
        values(startEventTime: $startEventTime, endEventTime: $endEventTime) {
          eventTime
          value
        }
      }
    }
  }
}
This allows the creation of complex queries with multiple entities and aliases.
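For readers less familiar with Jinja, the sketch below assembles the same aliased query text in plain Python. It is an illustrative equivalent only and not how the library renders templates; the function name `build_values_query` is hypothetical.

```python
def build_values_query(variable_names: list[str]) -> str:
    """Assemble the aliased query text without a template engine."""
    # One "$name: ID!" declaration per entity variable.
    declarations = "\n".join(f"  ${name}: ID!" for name in variable_names)
    # One aliased entity selection per variable, so results can be told apart.
    selections = "\n".join(
        f"  {name}: entity(id: ${name}) {{\n"
        "    name\n"
        '    trait(id: "foresight:Datapoint") {\n'
        '      quantity(key: "Value") {\n'
        "        values(startEventTime: $startEventTime, endEventTime: $endEventTime) {\n"
        "          eventTime\n"
        "          value\n"
        "        }\n"
        "      }\n"
        "    }\n"
        "  }"
        for name in variable_names
    )
    return (
        "query value(\n"
        f"{declarations}\n"
        "  $startEventTime: DateTime!\n"
        "  $endEventTime: DateTime!\n"
        ") {\n"
        f"{selections}\n"
        "}"
    )


query = build_values_query(["entityId_0", "entityId_1"])
print(query)
```

Each alias doubles as the variable name, which is what lets a single request fetch values for any number of entities.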
Execute Query
In the QueryManager, there are two ways to execute a query:
- execute_query: Execute a GraphQL query with the provided query string and variables. Use this method for simple queries.
- run_query: Load and execute a query in a single call.
execute_query
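query_variables = {
    "entityId_0": "<entityId_0>",
    "entityId_1": "<entityId_1>",
    # The rendered query above also declares $startEventTime and $endEventTime,
    # so their values need to be supplied here as well.
}
response = query_manager.execute_query(query=rendered_query, variables=query_variables)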
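Once a response comes back, the values can be pulled out per entity. The sketch below assumes the response is a dictionary keyed by the query aliases and shaped like the selection set of the get_raw_values query. Both that shape and the sample data are assumptions made for illustration, and `extract_values` is a hypothetical helper rather than the library's get_all_values.

```python
def extract_values(response: dict) -> dict[str, list[tuple[str, float]]]:
    """Map each entity name to its list of (eventTime, value) pairs."""
    series = {}
    for entity in response.values():
        values = entity["trait"]["quantity"]["values"]
        series[entity["name"]] = [(v["eventTime"], v["value"]) for v in values]
    return series


# Made-up response, following the structure of the rendered query.
sample_response = {
    "entityId_0": {
        "name": "Outdoor temperature",
        "trait": {"quantity": {"values": [
            {"eventTime": "2024-01-01T00:00:00Z", "value": 1.5},
            {"eventTime": "2024-01-01T01:00:00Z", "value": 2.0},
        ]}},
    },
    "entityId_1": {
        "name": "Indoor temperature",
        "trait": {"quantity": {"values": []}},
    },
}
series = extract_values(sample_response)
print(series)
```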
run_query
entity_ids = ["<entityId_0>", "<entityId_1>"]
entity_variables = {
    f"entityId_{i}": entity_id for i, entity_id in enumerate(entity_ids)
}
query_variables = {
    **entity_variables,
    # $startEventTime and $endEventTime also need values here.
}
# The get_raw_values template shown earlier iterates over "variable_names".
jinja_dict = {
    "variable_names": list(entity_variables.keys()),
}
response = query_manager.run_query(
    query_name="get_raw_values",
    jinja_variables=jinja_dict,
    query_variables=query_variables,
)
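The two dictionaries that run_query needs can be built together. The sketch below also fills in the time window, since the get_raw_values template shown earlier declares $startEventTime and $endEventTime. The helper `build_query_inputs` is hypothetical, and sending the timestamps as ISO 8601 strings is an assumption about what the DateTime scalar accepts.

```python
from datetime import datetime, timedelta, timezone


def build_query_inputs(entity_ids, start, end):
    """Return (jinja_variables, query_variables) for an aliased values query."""
    entity_variables = {
        f"entityId_{i}": entity_id for i, entity_id in enumerate(entity_ids)
    }
    # The template iterates over the variable names, not the entity IDs.
    jinja_variables = {"variable_names": list(entity_variables)}
    query_variables = {
        **entity_variables,
        # Assumption: timestamps are passed as ISO 8601 strings.
        "startEventTime": start.isoformat(),
        "endEventTime": end.isoformat(),
    }
    return jinja_variables, query_variables


end = datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc)
start = end - timedelta(hours=8)
jinja_variables, query_variables = build_query_inputs(
    ["ENTITY_ID", "ENTITY_ID2"], start, end
)
print(jinja_variables)
print(query_variables)
```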
Contributing
Contributions are welcome! You can contact us at foresight@piscada.com.
Support
If you have any questions, issues, or suggestions, please contact us at foresight@piscada.com.
Copyright
© Piscada AS 2024