Framework for Kelvin application development
Project description
Kelvin Python SDK
A framework to build Kelvin applications in Python. This package helps you connect and interact with the Kelvin platform.
It provides tools for building, publishing, receiving, and filtering Kelvin messages (the primary communication method with Kelvin) and easier access to the resources (Assets) that the application is deployed to.
Quickstart
Here's a simple example receiving a data Message and publishing another.
from kelvin.application import KelvinApp
from kelvin.message import Number
from kelvin.krn import KRNAssetDataStream
app = KelvinApp()
@app.stream()
async def handle_input(msg):
"""Process all incoming messages"""
print(f"Received: {msg.payload} from {msg.resource}")
# Publish a response
await app.publish(
Number(
resource=KRNAssetDataStream(msg.resource.asset, "output"),
payload=msg.payload * 2
)
)
# Run the application
app.run()
Table of Contents
- Installation
- Core Concepts
- Application Deployment Context
- Message Handling
- Background Tasks
- Data Windows
- Publishing Messages
- Testing with KelvinPublisher
- Complete Example
Installation
pip install kelvin-python-sdk
For AI/Data Science features (pandas support):
pip install kelvin-python-sdk[ai]
For development and testing with the publisher tool:
pip install kelvin-python-sdk[publisher]
Run kelvin-publisher --help to see available testing options.
Core Concepts
Kelvin Messages
Messages are the primary interface for exchanging data with the Kelvin platform. Applications send and receive messages to communicate with assets and other components.
Message Types
The SDK provides pre-built message types for common data:
from kelvin.message import Number, String, Boolean
from kelvin.krn import KRNAssetDataStream
# Create primitive data messages
number_msg = Number(
resource=KRNAssetDataStream("asset-name", "datastream-name"),
payload=42.5
)
string_msg = String(
resource=KRNAssetDataStream("asset-name", "datastream-name"),
payload="Hello Kelvin"
)
boolean_msg = Boolean(
resource=KRNAssetDataStream("asset-name", "datastream-name"),
payload=True
)
Message Builders
For more complex messages, use the provided MessageBuilder helpers:
from kelvin.message import (
ControlChange,
ControlAck,
Recommendation,
DataTag,
AssetParameter,
AssetParameters,
StateEnum
)
from kelvin.krn import KRNAsset, KRNAssetDataStream, KRNAssetParameter
from datetime import datetime, timedelta
# Control Change - request a change in asset behavior
control_change = ControlChange(
resource=KRNAssetDataStream("asset-name", "setpoint"),
payload=75.0,
expiration_date=timedelta(minutes=10),
timeout=60,
retries=3
)
# Control Acknowledgement - respond to control changes
ack = ControlAck(
resource=KRNAssetDataStream("asset-name", "setpoint"),
state=StateEnum.applied,
message="Control change successfully applied"
)
# Recommendation - suggest multiple control changes
recommendation = Recommendation(
resource=KRNAsset("asset-name"),
type="optimization",
control_changes=[control_change],
expiration_date=timedelta(hours=1),
auto_accepted=False
)
# Data Tag - add metadata to data
data_tag = DataTag(
resource=KRNAsset("asset-name"),
start_date=datetime.now(),
tag_name="anomaly_detected",
description="Temperature spike detected"
)
# Asset Parameters - update asset configuration
param = AssetParameter(
resource=KRNAssetParameter("asset-name", "threshold"),
value=100
)
params = AssetParameters(parameters=[param])
Evidences
Evidences provide context to recommendations by including supporting data and visual information. They help users understand the reasoning behind a recommendation, presenting analysis results, charts, and explanations that justify the suggested actions.
from kelvin.message import Recommendation
from kelvin.message.evidences import DataExplorer, DataExplorerSelector, Markdown
from kelvin.krn import KRNAsset, KRNAssetDataStream
from datetime import datetime, timedelta
# Markdown evidence - provide textual explanations
markdown_evidence = Markdown(
title="Analysis Summary",
markdown="""
## Temperature Anomaly Detected
The system detected unusually high temperature readings:
- Average temperature: 85°C
- Normal range: 60-75°C
- Duration: 2 hours
**Recommended action**: Reduce speed to allow cooling.
"""
)
# Data Explorer evidence - visualize time-series data
now = datetime.now()
data_explorer = DataExplorer(
title="Temperature Trend Analysis",
start_time=now - timedelta(hours=6),
end_time=now,
selectors=[
DataExplorerSelector(
resource=KRNAssetDataStream("asset-name", "temperature")
),
DataExplorerSelector(
resource=KRNAssetDataStream("asset-name", "pressure"),
agg="mean",
time_bucket="5m"
)
]
)
# Include evidences in recommendation
recommendation = Recommendation(
resource=KRNAsset("asset-name"),
type="temperature_optimization",
evidences=[markdown_evidence, data_explorer]
)
The SDK supports multiple evidence types including Markdown, DataExplorer, LineChart, BarChart, Image, and IFrame. For a complete list of available evidence types and their configurations, see kelvin.message.evidences.
Kelvin Resource Names (KRN)
KRNs are used to uniquely identify Kelvin resources. They follow a structured format to reference assets, datastreams, parameters, and other entities.
from kelvin.krn import KRN, KRNAsset, KRNAssetDataStream, KRNAssetParameter
# Generic KRN (parse from string)
generic_krn = KRN.from_string("krn:asset:my-asset")
# Asset KRN
asset_krn = KRNAsset("my-asset")
# Asset DataStream KRN
datastream_krn = KRNAssetDataStream("my-asset", "temperature")
Application Deployment Context
When your application connects to Kelvin, it receives runtime information about its deployment environment. The KelvinApp instance provides access to:
app.app_configuration: Application-specific configuration settingsapp.inputs: Input datastreams configured for your applicationapp.outputs: Output datastreams configured for your applicationapp.assets: The assets your application is deployed to (most important)
Working with Assets
The app.assets dictionary contains all assets available to your application. Each asset provides access to its properties, parameters, and datastreams:
from kelvin.application import KelvinApp
app = KelvinApp()
async def on_connect():
# Access all available assets
for asset_name, asset in app.assets.items():
print(f"Asset: {asset_name}")
# Access asset properties
print(f" Properties: {asset.properties}")
# Access asset parameters
for param_name, param_value in asset.parameters.items():
print(f" Parameter {param_name}: {param_value}")
# Access asset datastreams
for ds_name, datastream in asset.datastreams.items():
print(f" Datastream {ds_name}: {datastream}")
app.on_connect = on_connect
app.run()
Assets are the core resources in Kelvin, representing physical or logical entities (machines, sensors, systems, etc.). Your application interacts with assets by reading from their datastreams and publishing data or control changes back to them.
Message Handling
Stream Decorators
Stream decorators are the recommended way to handle incoming messages. They allow you to process messages based on specific criteria:
Use decorators to process messages based on specific criteria:
from kelvin.application import KelvinApp
from kelvin.message.typing import AssetDataMessage
app = KelvinApp()
# Process all messages
@app.stream()
async def handle_all(msg: AssetDataMessage):
print(f"All messages: {msg.payload}")
# Filter by asset
@app.stream(assets=["asset-1", "asset-2"])
async def handle_specific_assets(msg: AssetDataMessage):
print(f"From specific assets: {msg.payload}")
# Filter by input datastream
@app.stream(inputs=["temperature", "pressure"])
async def handle_specific_inputs(msg: AssetDataMessage):
print(f"From specific inputs: {msg.payload}")
# Can also register functions directly
def my_handler(msg: AssetDataMessage):
print(f"Handler: {msg.payload}")
app.stream(my_handler, inputs=["humidity"])
app.run()
Filters
Filters allow you to selectively process messages using queues or async streams:
from kelvin.application import KelvinApp, filters
app = KelvinApp()
async def main():
await app.connect()
# Using a queue with filters
queue = app.filter(filters.input_equals("temperature"))
while True:
msg = await queue.get()
print(f"Temperature: {msg.payload}")
# Using async stream with filters
stream = app.stream_filter(filters.asset_equals("asset-1"))
async for msg in stream:
print(f"Asset 1 data: {msg.payload}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Built-in Filters
from kelvin.application import filters
# Filter by input datastream name(s)
filters.input_equals("temperature")
filters.input_equals(["temperature", "pressure"])
# Filter by asset name(s)
filters.asset_equals("my-asset")
filters.asset_equals(["asset-1", "asset-2"])
# Filter by resource KRN
filters.resource_equals(krn_instance)
filters.resource_equals([krn1, krn2])
# Filter by message type
filters.is_data_message(msg)
filters.is_asset_data_message(msg)
filters.is_control_status_message(msg)
filters.is_custom_action(msg)
filters.is_data_quality_message(msg)
Custom Filters
You can create custom filter functions:
def custom_filter(msg: Message) -> bool:
"""Filter for high-value readings"""
return msg.payload > 100
queue = app.filter(custom_filter)
Callbacks
For advanced scenarios, you can define callbacks for specific lifecycle events. However, stream decorators are generally preferred for message processing:
from kelvin.application import KelvinApp, AssetInfo
from kelvin.message.typing import AssetDataMessage
from typing import Optional
app = KelvinApp()
async def on_connect():
"""Called when the app connects to Kelvin"""
print("Connected to Kelvin platform")
print(f"Configuration: {app.app_configuration}")
print(f"Assets: {app.assets}")
async def on_asset_input(msg: AssetDataMessage):
"""Called for data messages from asset inputs"""
print(f"Data from {msg.resource}: {msg.payload}")
async def on_control_change(msg: AssetDataMessage):
"""Called when control changes are received"""
print(f"Control change for {msg.resource}: {msg.payload}")
async def on_asset_change(new_asset: Optional[AssetInfo], old_asset: Optional[AssetInfo]):
"""Called when assets are added, removed, or modified"""
if new_asset is None:
print(f"Asset removed: {old_asset.name}")
else:
print(f"Asset changed: {new_asset.name}")
async def on_app_configuration(config: dict):
"""Called when app configuration changes"""
print(f"New configuration: {config}")
# Assign callbacks
app.on_connect = on_connect
app.on_asset_input = on_asset_input
app.on_control_change = on_control_change
app.on_asset_change = on_asset_change
app.on_app_configuration = on_app_configuration
app.run()
Background Tasks
Tasks
Tasks are functions that run in the background. They're started when the application connects:
from kelvin.application import KelvinApp
import asyncio
app = KelvinApp()
@app.task
async def background_task():
"""Runs once when the app starts"""
print("Task started")
# Perform initialization or one-time operations
@app.task
async def continuous_task():
"""Runs continuously in the background"""
while True:
print("Processing...")
await asyncio.sleep(10)
# Can also register functions directly
async def another_task():
print("Another task")
app.task(another_task, name="my_task")
app.run()
Timers
Timers execute functions at regular intervals:
from kelvin.application import KelvinApp
app = KelvinApp()
@app.timer(interval=30) # seconds
async def periodic_check():
"""Runs every 30 seconds"""
print("Periodic check executed")
@app.timer(interval=60)
async def publish_metrics():
"""Runs every 60 seconds"""
# Publish periodic metrics
await app.publish(...)
# Register timers directly
def sync_timer():
print("Sync timer")
app.timer(sync_timer, interval=10, name="sync_timer")
app.run()
Data Windows
Data windows aggregate incoming data over time or count, returning pandas DataFrames for analysis. All window operations require the ai optional dependency.
Tumbling Window
Non-overlapping, fixed-size time windows:
import asyncio
from datetime import datetime, timedelta
from kelvin.application import KelvinApp
app = KelvinApp()
async def main():
await app.connect()
# Process data in 10-second non-overlapping windows
window_start = datetime.now()
async for asset_name, df in app.tumbling_window(
window_size=timedelta(seconds=10)
).stream(window_start):
print(f"Asset: {asset_name}")
print(df) # pandas DataFrame with all data in window
if __name__ == "__main__":
asyncio.run(main())
Use case: Aggregate data every N seconds for batch processing (e.g., computing averages every 10 seconds).
Hopping Window
Overlapping, fixed-size windows with a configurable hop interval:
import asyncio
from datetime import datetime, timedelta
from kelvin.application import KelvinApp
app = KelvinApp()
async def main():
await app.connect()
# 10-second windows, moving forward by 5 seconds each time
window_start = datetime.now()
async for asset_name, df in app.hopping_window(
window_size=timedelta(seconds=10),
hop_size=timedelta(seconds=5)
).stream(window_start=window_start):
print(f"Asset: {asset_name}")
print(df) # pandas DataFrame with overlapping data
if __name__ == "__main__":
asyncio.run(main())
Use case: Sliding window analysis where you want overlapping data (e.g., moving averages with 50% overlap).
Rolling Window
Count-based windows that slide with each new message:
import asyncio
from kelvin.application import KelvinApp
app = KelvinApp()
async def main():
await app.connect()
# Window of last 5 messages, slides by 2 messages
async for asset_name, df in app.rolling_window(
count_size=5,
slide=2
).stream():
print(f"Asset: {asset_name}")
print(df) # pandas DataFrame with last 5 messages
if __name__ == "__main__":
asyncio.run(main())
Use case: Process the last N data points (e.g., calculate trend over the last 10 readings).
Publishing Messages
Use app.publish() to send messages to the Kelvin platform:
from kelvin.application import KelvinApp
from kelvin.message import Number, ControlChange, Recommendation
from kelvin.krn import KRNAssetDataStream, KRNAsset
from datetime import timedelta
app = KelvinApp()
@app.timer(interval=10)
async def publish_data():
# Publish data
await app.publish(
Number(
resource=KRNAssetDataStream("asset-1", "output"),
payload=42.0
)
)
# Publish control change
await app.publish(
ControlChange(
resource=KRNAssetDataStream("asset-1", "setpoint"),
payload=75.0,
expiration_date=timedelta(minutes=5)
)
)
# Publish recommendation
await app.publish(
Recommendation(
resource=KRNAsset("asset-1"),
type="optimization",
control_changes=[...],
expiration_date=timedelta(hours=1)
)
)
app.run()
Testing with KelvinPublisher
kelvin-publisher is a CLI tool for testing applications during development. It's not meant to be imported in your code, but used as a standalone testing utility.
Installation
pip install kelvin-python-sdk[publisher]
Run kelvin-publisher --help to see all available commands and options.
Usage
The publisher has three modes for simulating data:
1. Simulator - Random Data
Generate random data to your application's inputs:
kelvin-publisher simulator --help
kelvin-publisher simulator
This automatically discovers your application's inputs and publishes random data.
2. CSV - File-based Data
Publish data from a CSV file:
kelvin-publisher csv --help
kelvin-publisher csv --csv data.csv
Replays test data from CSV files.
3. Generator - Custom Data
Use a custom Python class to generate data:
kelvin-publisher generator --help
kelvin-publisher generator --entrypoint action_generator.py:CustomActionGenerator
Example generator:
import asyncio
from typing import AsyncGenerator
from kelvin.message import Number
from kelvin.krn import KRNAssetDataStream
from kelvin.publisher import DataGenerator
class OtherGenerator(DataGenerator):
def __init__(self) -> None:
print("Hello from OtherGenerator")
async def run(self) -> AsyncGenerator[Number, None]:
print("Running OtherGenerator")
for i in range(20, 30):
yield Number(
resource=KRNAssetDataStream("test-asset-1", "input-number"),
payload=i,
)
await asyncio.sleep(1)
This mode allows you to implement sophisticated test scenarios with custom logic.
Complete Example
Here's a complete example that demonstrates multiple features working together:
import asyncio
from datetime import timedelta
from kelvin.application import KelvinApp, filters
from kelvin.message import Number, ControlChange, ControlAck, StateEnum
from kelvin.krn import KRNAssetDataStream
app = KelvinApp()
# Lifecycle callbacks
async def on_connect():
print("Connected to Kelvin platform")
print(f"Configuration: {app.app_configuration}")
print(f"Available assets: {list(app.assets.keys())}")
async def on_control_change(msg):
print(f"Control change received for {msg.resource}")
# Acknowledge the control change
await app.publish(
ControlAck(
resource=msg.resource,
state=StateEnum.applied,
message="Control change successfully applied"
)
)
app.on_connect = on_connect
app.on_control_change = on_control_change
# Stream decorators for processing inputs
@app.stream(inputs=["temperature"])
async def process_temperature(msg):
"""Process temperature readings"""
temp = msg.payload
print(f"Temperature from {msg.resource.asset}: {temp}°C")
# Publish processed data
await app.publish(
Number(
resource=KRNAssetDataStream(msg.resource.asset, "temp_fahrenheit"),
payload=temp * 9/5 + 32
)
)
@app.stream(inputs=["pressure"])
async def process_pressure(msg):
"""Process pressure readings"""
print(f"Pressure from {msg.resource.asset}: {msg.payload} Pa")
# Background task
@app.task
async def monitor_system():
"""Continuous monitoring task"""
await asyncio.sleep(5) # Wait for initialization
while True:
print("Monitoring system health...")
# Add your monitoring logic here
await asyncio.sleep(30)
# Periodic timer
@app.timer(interval=60)
async def publish_heartbeat():
"""Publish heartbeat every minute"""
for asset_name in app.assets.keys():
await app.publish(
Number(
resource=KRNAssetDataStream(asset_name, "heartbeat"),
payload=1
)
)
# Run the application
if __name__ == "__main__":
app.run()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file kelvin_python_sdk-0.4.3-py3-none-any.whl.
File metadata
- Download URL: kelvin_python_sdk-0.4.3-py3-none-any.whl
- Upload date:
- Size: 70.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2d1a0ca1b7933822df664f4b350deeff852043e35806cfb5ba26730dfc75f0f5
|
|
| MD5 |
a5b8d8a40412d7402f4b939ebc984a8b
|
|
| BLAKE2b-256 |
9cdbdec12a47509e51070ac4b6413bfdcfffca76d5ceed2ca9c06a6aee1dfe1f
|