The volttron-testing library contains classes and utilities for interacting with a VOLTTRON instance.
Project description
volttron-testing
The volttron-testing library contains classes and utilities for interacting with a VOLTTRON instance.
Prerequisites
- Python >= 3.10
Installation
Create a virtual environment
python -m venv env
Activate the environment
source env/bin/activate
Install volttron-testing
# Installs volttron and volttron-testing
pip install volttron-testing
Testing Agent Workflows
The volttron-testing library provides multiple approaches for testing VOLTTRON agents, from simple unit tests to complex integration tests with full pubsub communication.
Quick Start: Testing an Agent
Here's the simplest way to test an agent with mock infrastructure:
from volttrontesting.server_mock import TestServer
from volttrontesting.mock_core_builder import MockCoreBuilder
from volttrontesting.pubsub_interceptor import intercept_agent_pubsub
from volttron.client import Agent
from volttron.client.vip.agent import Core, PubSub
from volttron.types.auth.auth_credentials import Credentials
def test_agent_pubsub():
"""Test agent pubsub communication with full message routing."""
# 1. Create test server
server = TestServer()
# 2. Create agents with mock core
publisher = Agent(credentials=Credentials(identity="publisher"), name="mock")
subscriber = Agent(credentials=Credentials(identity="subscriber"), name="mock")
# 3. Connect agents to server
server.connect_agent(publisher)
server.connect_agent(subscriber)
# 4. Intercept pubsub to route through test server
pub_interceptor = intercept_agent_pubsub(publisher, TestServer.__server_pubsub__)
sub_interceptor = intercept_agent_pubsub(subscriber, TestServer.__server_pubsub__)
# 5. Set up subscription
messages_received = []
def on_message(peer, sender, bus, topic, headers, message):
messages_received.append((topic, message))
subscriber.vip.pubsub.subscribe("pubsub", "test/topic", on_message)
# 6. Publish message
publisher.vip.pubsub.publish("pubsub", "test/topic", message="Hello World!")
# 7. Verify message was received
import gevent
gevent.sleep(0.1) # Allow message propagation
assert len(messages_received) == 1
assert messages_received[0] == ("test/topic", "Hello World!")
Testing Approaches
1. Unit Testing with Direct Method Calls
For simple unit tests, you can test agent methods directly:
class MyAgent(Agent):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.data_points = []
def process_data(self, value):
"""Process incoming data."""
processed = value * 2
self.data_points.append(processed)
return processed
def test_data_processing():
"""Test agent's data processing logic."""
agent = MyAgent(credentials=Credentials(identity="test"), name="mock")
result = agent.process_data(5)
assert result == 10
assert agent.data_points == [10]
2. Testing Lifecycle Events
Test agent lifecycle methods (onsetup, onstart, onstop):
class LifecycleAgent(Agent):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.setup_complete = False
self.started = False
@Core.receiver('onsetup')
def onsetup(self, sender, **kwargs):
self.setup_complete = True
@Core.receiver('onstart')
def onstart(self, sender, **kwargs):
self.started = True
def test_lifecycle():
"""Test agent lifecycle events."""
server = TestServer()
agent = LifecycleAgent(credentials=Credentials(identity="test"), name="mock")
server.connect_agent(agent)
# Trigger lifecycle events
response = server.trigger_setup_event(agent, sender="test")
assert agent.setup_complete
response = server.trigger_start_event(agent, sender="test")
assert agent.started
3. Testing with Decorator-Based Subscriptions
Test agents that use @PubSub.subscribe decorators:
class SubscriberAgent(Agent):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.received_messages = []
@PubSub.subscribe('pubsub', 'devices/+/+/all')
def on_device_data(self, peer, sender, bus, topic, headers, message):
"""Handle device data."""
self.received_messages.append({
'topic': topic,
'message': message,
'sender': sender
})
def test_decorated_subscriptions():
"""Test agent with decorator-based subscriptions."""
server = TestServer()
# Create and connect agent
agent = SubscriberAgent(credentials=Credentials(identity="subscriber"), name="mock")
server.connect_agent(agent)
# Intercept pubsub for message routing
interceptor = intercept_agent_pubsub(agent, TestServer.__server_pubsub__)
# Publish matching message through server
server.publish("devices/campus1/building1/all",
message={"temperature": 72.5})
# Verify message received
gevent.sleep(0.1)
assert len(agent.received_messages) == 1
assert agent.received_messages[0]['topic'] == "devices/campus1/building1/all"
Advanced Testing with PubSub Interceptor
The pubsub_interceptor module enables full integration testing by intercepting agent pubsub at the instance level:
from volttrontesting.pubsub_interceptor import PubSubInterceptor
def test_multi_agent_communication():
"""Test complex multi-agent workflows."""
server = TestServer()
# Create multiple agents
coordinator = Agent(credentials=Credentials(identity="coordinator"), name="mock")
worker1 = Agent(credentials=Credentials(identity="worker1"), name="mock")
worker2 = Agent(credentials=Credentials(identity="worker2"), name="mock")
# Connect all agents
for agent in [coordinator, worker1, worker2]:
server.connect_agent(agent)
intercept_agent_pubsub(agent, TestServer.__server_pubsub__)
# Set up subscriptions
worker_responses = []
def on_task(peer, sender, bus, topic, headers, message):
# Workers respond to tasks
worker_id = headers.get('target')
if worker_id == 'worker1':
coordinator.vip.pubsub.publish('pubsub', 'response/worker1',
message={'result': 'done'})
worker1.vip.pubsub.subscribe('pubsub', 'task/+', on_task)
def on_response(peer, sender, bus, topic, headers, message):
worker_responses.append(message)
coordinator.vip.pubsub.subscribe('pubsub', 'response/+', on_response)
# Coordinator sends task
coordinator.vip.pubsub.publish('pubsub', 'task/process',
headers={'target': 'worker1'},
message={'action': 'process'})
# Verify workflow completed
gevent.sleep(0.1)
assert len(worker_responses) == 1
assert worker_responses[0]['result'] == 'done'
Testing Patterns and Best Practices
- Always use mock core for testing: Pass
name="mock"when creating agents - Use interceptors for pubsub testing: This preserves decorator-based subscriptions
- Allow time for message propagation: Use
gevent.sleep(0.1)after publishing - Clean up interceptors: Call
interceptor.restore()when done - Use TestServer for visibility: Access
server.get_published_messages()to inspect all messages
Complete Example: Testing a Realistic Agent
from volttrontesting.server_mock import TestServer
from volttrontesting.pubsub_interceptor import intercept_agent_pubsub
from volttron.client import Agent
from volttron.client.vip.agent import Core, PubSub, RPC
from volttron.types.auth.auth_credentials import Credentials
import gevent
class DataCollectorAgent(Agent):
"""Example agent that collects and aggregates data."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.data_buffer = []
self.aggregated_data = None
@Core.receiver('onstart')
def onstart(self, sender, **kwargs):
"""Start collecting data on agent start."""
self.core.periodic(self.publish_aggregated, 5)
@PubSub.subscribe('pubsub', 'devices/+/+/all')
def on_new_data(self, peer, sender, bus, topic, headers, message):
"""Collect incoming data."""
self.data_buffer.append(message)
if len(self.data_buffer) >= 10:
self.aggregate_data()
def aggregate_data(self):
"""Aggregate collected data."""
if self.data_buffer:
# Simple average for numeric values
values = [d.get('value', 0) for d in self.data_buffer if isinstance(d, dict)]
self.aggregated_data = sum(values) / len(values) if values else 0
self.data_buffer = []
def publish_aggregated(self):
"""Publish aggregated results."""
if self.aggregated_data is not None:
self.vip.pubsub.publish('pubsub', 'analysis/aggregated',
message={'average': self.aggregated_data})
@RPC.export
def get_buffer_size(self):
"""RPC method to check buffer size."""
return len(self.data_buffer)
def test_data_collector_agent():
"""Test the complete data collector workflow."""
# Setup
server = TestServer()
agent = DataCollectorAgent(credentials=Credentials(identity="collector"), name="mock")
server.connect_agent(agent)
interceptor = intercept_agent_pubsub(agent, TestServer.__server_pubsub__)
# Trigger agent start
server.trigger_start_event(agent, sender="test")
# Simulate incoming data
for i in range(10):
server.publish(f"devices/campus/building/all",
message={'value': i * 10, 'timestamp': f'2024-01-01T00:0{i}:00'})
gevent.sleep(0.1) # Allow message processing
# Verify aggregation occurred
assert agent.aggregated_data == 45.0 # Average of 0,10,20,...90
assert len(agent.data_buffer) == 0 # Buffer should be cleared
# Test RPC method
assert agent.get_buffer_size() == 0
# Cleanup
interceptor.restore()
TestServer API Reference
The TestServer class provides these key methods for testing:
connect_agent(agent): Connect an agent to the test serverpublish(topic, headers, message): Publish a message through the serversubscribe(pattern, callback): Subscribe to messages with a patterntrigger_setup_event(agent): Trigger agent's onsetup lifecycle eventtrigger_start_event(agent): Trigger agent's onstart lifecycle eventtrigger_stop_event(agent): Trigger agent's onstop lifecycle eventget_published_messages(): Get all messages published through the serverget_server_log(): Get server log messages
Development
Please see the following for contributing guidelines contributing.
Please see the following helpful guide about developing modular VOLTTRON agents
Disclaimer Notice
This material was prepared as an account of work sponsored by an agency of the United States Government. Neither the United States Government nor the United States Department of Energy, nor Battelle, nor any of their employees, nor any jurisdiction or organization that has cooperated in the development of these materials, makes any warranty, express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness or any information, apparatus, product, software, or process disclosed, or represents that its use would not infringe privately owned rights.
Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or otherwise does not necessarily constitute or imply its endorsement, recommendation, or favoring by the United States Government or any agency thereof, or Battelle Memorial Institute. The views and opinions of authors expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file volttron_testing-0.5.1rc4.tar.gz.
File metadata
- Download URL: volttron_testing-0.5.1rc4.tar.gz
- Upload date:
- Size: 67.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.1 CPython/3.10.18 Linux/6.8.0-1031-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0db74459abab92cdcdb50a63847a4f1ba888b55e31e1404004a9b218d1b40165
|
|
| MD5 |
aa3c07197d7486fe93d96dbc42204e77
|
|
| BLAKE2b-256 |
5ab82183a5821b5142f19d613296483540961eeb4aa8ea4720d2490d5bb72411
|
File details
Details for the file volttron_testing-0.5.1rc4-py3-none-any.whl.
File metadata
- Download URL: volttron_testing-0.5.1rc4-py3-none-any.whl
- Upload date:
- Size: 84.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.1 CPython/3.10.18 Linux/6.8.0-1031-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7f5b80f1f6839085f55ed616e284709e6470d8b954a6487d294edef1dd1e97e0
|
|
| MD5 |
696cb0079f6471184b353d6718302229
|
|
| BLAKE2b-256 |
944247c6e3330ae3effeb086124163c1b2e3691b55cafdf589219c3bd08c7d0e
|