Skip to main content

The volttron-testing library contains classes and utilities for interacting with a VOLTTRON instance.

Project description

volttron-testing

Eclipse VOLTTRON™ Python 3.10 Python 3.11 Run Pytests pypi version

The volttron-testing library contains classes and utilities for interacting with a VOLTTRON instance.

Prerequisites

  • Python >= 3.10

Installation

Create a virtual environment

python -m venv env

Activate the environment

source env/bin/activate

Install volttron-testing

# Installs volttron and volttron-testing
pip install volttron-testing

Testing Agent Workflows

The volttron-testing library provides multiple approaches for testing VOLTTRON agents, from simple unit tests to complex integration tests with full pubsub communication.

Quick Start: Testing an Agent

Here's the simplest way to test an agent with mock infrastructure:

from volttrontesting.server_mock import TestServer
from volttrontesting.mock_core_builder import MockCoreBuilder
from volttrontesting.pubsub_interceptor import intercept_agent_pubsub
from volttron.client import Agent
from volttron.client.vip.agent import Core, PubSub
from volttron.types.auth.auth_credentials import Credentials

def test_agent_pubsub():
    """Test agent pubsub communication with full message routing."""
    
    # 1. Create test server
    server = TestServer()
    
    # 2. Create agents with mock core
    publisher = Agent(credentials=Credentials(identity="publisher"), name="mock")
    subscriber = Agent(credentials=Credentials(identity="subscriber"), name="mock")
    
    # 3. Connect agents to server
    server.connect_agent(publisher)
    server.connect_agent(subscriber)
    
    # 4. Intercept pubsub to route through test server
    pub_interceptor = intercept_agent_pubsub(publisher, TestServer.__server_pubsub__)
    sub_interceptor = intercept_agent_pubsub(subscriber, TestServer.__server_pubsub__)
    
    # 5. Set up subscription
    messages_received = []
    def on_message(peer, sender, bus, topic, headers, message):
        messages_received.append((topic, message))
    
    subscriber.vip.pubsub.subscribe("pubsub", "test/topic", on_message)
    
    # 6. Publish message
    publisher.vip.pubsub.publish("pubsub", "test/topic", message="Hello World!")
    
    # 7. Verify message was received
    import gevent
    gevent.sleep(0.1)  # Allow message propagation
    
    assert len(messages_received) == 1
    assert messages_received[0] == ("test/topic", "Hello World!")

Testing Approaches

1. Unit Testing with Direct Method Calls

For simple unit tests, you can test agent methods directly:

class MyAgent(Agent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data_points = []
    
    def process_data(self, value):
        """Process incoming data."""
        processed = value * 2
        self.data_points.append(processed)
        return processed

def test_data_processing():
    """Test agent's data processing logic."""
    agent = MyAgent(credentials=Credentials(identity="test"), name="mock")
    
    result = agent.process_data(5)
    assert result == 10
    assert agent.data_points == [10]

2. Testing Lifecycle Events

Test agent lifecycle methods (onsetup, onstart, onstop):

class LifecycleAgent(Agent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.setup_complete = False
        self.started = False
    
    @Core.receiver('onsetup')
    def onsetup(self, sender, **kwargs):
        self.setup_complete = True
    
    @Core.receiver('onstart')  
    def onstart(self, sender, **kwargs):
        self.started = True

def test_lifecycle():
    """Test agent lifecycle events."""
    server = TestServer()
    agent = LifecycleAgent(credentials=Credentials(identity="test"), name="mock")
    server.connect_agent(agent)
    
    # Trigger lifecycle events
    response = server.trigger_setup_event(agent, sender="test")
    assert agent.setup_complete
    
    response = server.trigger_start_event(agent, sender="test")
    assert agent.started

3. Testing with Decorator-Based Subscriptions

Test agents that use @PubSub.subscribe decorators:

class SubscriberAgent(Agent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.received_messages = []
    
    @PubSub.subscribe('pubsub', 'devices/+/+/all')
    def on_device_data(self, peer, sender, bus, topic, headers, message):
        """Handle device data."""
        self.received_messages.append({
            'topic': topic,
            'message': message,
            'sender': sender
        })

def test_decorated_subscriptions():
    """Test agent with decorator-based subscriptions."""
    server = TestServer()
    
    # Create and connect agent
    agent = SubscriberAgent(credentials=Credentials(identity="subscriber"), name="mock")
    server.connect_agent(agent)
    
    # Intercept pubsub for message routing
    interceptor = intercept_agent_pubsub(agent, TestServer.__server_pubsub__)
    
    # Publish matching message through server
    server.publish("devices/campus1/building1/all", 
                   message={"temperature": 72.5})
    
    # Verify message received
    gevent.sleep(0.1)
    assert len(agent.received_messages) == 1
    assert agent.received_messages[0]['topic'] == "devices/campus1/building1/all"

Advanced Testing with PubSub Interceptor

The pubsub_interceptor module enables full integration testing by intercepting agent pubsub at the instance level:

from volttrontesting.pubsub_interceptor import PubSubInterceptor

def test_multi_agent_communication():
    """Test complex multi-agent workflows."""
    server = TestServer()
    
    # Create multiple agents
    coordinator = Agent(credentials=Credentials(identity="coordinator"), name="mock")
    worker1 = Agent(credentials=Credentials(identity="worker1"), name="mock")
    worker2 = Agent(credentials=Credentials(identity="worker2"), name="mock")
    
    # Connect all agents
    for agent in [coordinator, worker1, worker2]:
        server.connect_agent(agent)
        intercept_agent_pubsub(agent, TestServer.__server_pubsub__)
    
    # Set up subscriptions
    worker_responses = []
    
    def on_task(peer, sender, bus, topic, headers, message):
        # Workers respond to tasks
        worker_id = headers.get('target')
        if worker_id == 'worker1':
            coordinator.vip.pubsub.publish('pubsub', 'response/worker1', 
                                          message={'result': 'done'})
    
    worker1.vip.pubsub.subscribe('pubsub', 'task/+', on_task)
    
    def on_response(peer, sender, bus, topic, headers, message):
        worker_responses.append(message)
    
    coordinator.vip.pubsub.subscribe('pubsub', 'response/+', on_response)
    
    # Coordinator sends task
    coordinator.vip.pubsub.publish('pubsub', 'task/process', 
                                   headers={'target': 'worker1'},
                                   message={'action': 'process'})
    
    # Verify workflow completed
    gevent.sleep(0.1)
    assert len(worker_responses) == 1
    assert worker_responses[0]['result'] == 'done'

Testing Patterns and Best Practices

  1. Always use mock core for testing: Pass name="mock" when creating agents
  2. Use interceptors for pubsub testing: This preserves decorator-based subscriptions
  3. Allow time for message propagation: Use gevent.sleep(0.1) after publishing
  4. Clean up interceptors: Call interceptor.restore() when done
  5. Use TestServer for visibility: Access server.get_published_messages() to inspect all messages

Complete Example: Testing a Realistic Agent

from volttrontesting.server_mock import TestServer
from volttrontesting.pubsub_interceptor import intercept_agent_pubsub
from volttron.client import Agent
from volttron.client.vip.agent import Core, PubSub, RPC
from volttron.types.auth.auth_credentials import Credentials
import gevent

class DataCollectorAgent(Agent):
    """Example agent that collects and aggregates data."""
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data_buffer = []
        self.aggregated_data = None
    
    @Core.receiver('onstart')
    def onstart(self, sender, **kwargs):
        """Start collecting data on agent start."""
        self.core.periodic(self.publish_aggregated, 5)
    
    @PubSub.subscribe('pubsub', 'devices/+/+/all')
    def on_new_data(self, peer, sender, bus, topic, headers, message):
        """Collect incoming data."""
        self.data_buffer.append(message)
        if len(self.data_buffer) >= 10:
            self.aggregate_data()
    
    def aggregate_data(self):
        """Aggregate collected data."""
        if self.data_buffer:
            # Simple average for numeric values
            values = [d.get('value', 0) for d in self.data_buffer if isinstance(d, dict)]
            self.aggregated_data = sum(values) / len(values) if values else 0
            self.data_buffer = []
    
    def publish_aggregated(self):
        """Publish aggregated results."""
        if self.aggregated_data is not None:
            self.vip.pubsub.publish('pubsub', 'analysis/aggregated',
                                   message={'average': self.aggregated_data})
    
    @RPC.export
    def get_buffer_size(self):
        """RPC method to check buffer size."""
        return len(self.data_buffer)

def test_data_collector_agent():
    """Test the complete data collector workflow."""
    
    # Setup
    server = TestServer()
    agent = DataCollectorAgent(credentials=Credentials(identity="collector"), name="mock")
    server.connect_agent(agent)
    interceptor = intercept_agent_pubsub(agent, TestServer.__server_pubsub__)
    
    # Trigger agent start
    server.trigger_start_event(agent, sender="test")
    
    # Simulate incoming data
    for i in range(10):
        server.publish(f"devices/campus/building/all", 
                      message={'value': i * 10, 'timestamp': f'2024-01-01T00:0{i}:00'})
    
    gevent.sleep(0.1)  # Allow message processing
    
    # Verify aggregation occurred
    assert agent.aggregated_data == 45.0  # Average of 0,10,20,...90
    assert len(agent.data_buffer) == 0  # Buffer should be cleared
    
    # Test RPC method
    assert agent.get_buffer_size() == 0
    
    # Cleanup
    interceptor.restore()

TestServer API Reference

The TestServer class provides these key methods for testing:

  • connect_agent(agent): Connect an agent to the test server
  • publish(topic, headers, message): Publish a message through the server
  • subscribe(pattern, callback): Subscribe to messages with a pattern
  • trigger_setup_event(agent): Trigger agent's onsetup lifecycle event
  • trigger_start_event(agent): Trigger agent's onstart lifecycle event
  • trigger_stop_event(agent): Trigger agent's onstop lifecycle event
  • get_published_messages(): Get all messages published through the server
  • get_server_log(): Get server log messages

Development

Please see the following for contributing guidelines contributing.

Please see the following helpful guide about developing modular VOLTTRON agents

Disclaimer Notice

This material was prepared as an account of work sponsored by an agency of the United States Government. Neither the United States Government nor the United States Department of Energy, nor Battelle, nor any of their employees, nor any jurisdiction or organization that has cooperated in the development of these materials, makes any warranty, express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness or any information, apparatus, product, software, or process disclosed, or represents that its use would not infringe privately owned rights.

Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or otherwise does not necessarily constitute or imply its endorsement, recommendation, or favoring by the United States Government or any agency thereof, or Battelle Memorial Institute. The views and opinions of authors expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof.

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

volttron_testing-0.5.1rc4.tar.gz (67.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

volttron_testing-0.5.1rc4-py3-none-any.whl (84.5 kB view details)

Uploaded Python 3

File details

Details for the file volttron_testing-0.5.1rc4.tar.gz.

File metadata

  • Download URL: volttron_testing-0.5.1rc4.tar.gz
  • Upload date:
  • Size: 67.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.10.18 Linux/6.8.0-1031-azure

File hashes

Hashes for volttron_testing-0.5.1rc4.tar.gz
Algorithm Hash digest
SHA256 0db74459abab92cdcdb50a63847a4f1ba888b55e31e1404004a9b218d1b40165
MD5 aa3c07197d7486fe93d96dbc42204e77
BLAKE2b-256 5ab82183a5821b5142f19d613296483540961eeb4aa8ea4720d2490d5bb72411

See more details on using hashes here.

File details

Details for the file volttron_testing-0.5.1rc4-py3-none-any.whl.

File metadata

  • Download URL: volttron_testing-0.5.1rc4-py3-none-any.whl
  • Upload date:
  • Size: 84.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.10.18 Linux/6.8.0-1031-azure

File hashes

Hashes for volttron_testing-0.5.1rc4-py3-none-any.whl
Algorithm Hash digest
SHA256 7f5b80f1f6839085f55ed616e284709e6470d8b954a6487d294edef1dd1e97e0
MD5 696cb0079f6471184b353d6718302229
BLAKE2b-256 944247c6e3330ae3effeb086124163c1b2e3691b55cafdf589219c3bd08c7d0e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page