
The Modular Autonomous Discovery for Science (MADSci) Node Module Helper Classes.


MADSci Node Module

Framework for creating laboratory instrument nodes that integrate with MADSci workcells via REST APIs.

Features

  • REST API server: Automatic FastAPI server generation with comprehensive OpenAPI documentation
  • Action system: Declarative action definitions with flexible return types and automatic validation
  • Advanced return types: Support for Pydantic models, files, JSON data, and datapoint IDs
  • File handling: Seamless file upload/download with automatic path management
  • State management: Periodic state polling and reporting
  • Event integration: Built-in logging to MADSci Event Manager
  • Resource integration: Access to MADSci Resource and Data Managers
  • Lifecycle management: Startup, shutdown, and error handling
  • Configuration: YAML-based node configuration and deployment

Installation

See the main README for installation options. This package is available as:

Quick Start

1. Create a Node Class

from madsci.node_module.rest_node_module import RestNode
from madsci.node_module.helpers import action
from madsci.common.types.node_types import RestNodeConfig
from typing import Any
from pathlib import Path
from pydantic import BaseModel

class MyInstrumentConfig(RestNodeConfig):
    """Configuration for your instrument."""
    device_port: str = "/dev/ttyUSB0"
    timeout: int = 30

class MyInstrumentNode(RestNode):
    """Node for controlling my laboratory instrument."""

    config: MyInstrumentConfig = MyInstrumentConfig()
    config_model = MyInstrumentConfig

    def startup_handler(self) -> None:
        """Initialize device connection."""
        # Connect to your instrument (MyDeviceInterface is a placeholder for your device driver class)
        self.device = MyDeviceInterface(port=self.config.device_port)
        self.logger.log("Instrument initialized!")

    def shutdown_handler(self) -> None:
        """Clean up device connection."""
        if hasattr(self, 'device'):
            self.device.disconnect()

    def state_handler(self) -> None:
        """Report current instrument state."""
        if hasattr(self, 'device'):
            self.node_state = {
                "temperature": self.device.get_temperature(),
                "status": self.device.get_status()
            }

    @action
    def measure_sample(self, sample_id: str, duration: int = 60) -> dict[str, float]:
        """Measure a sample and return results directly as JSON."""
        # Your instrument control logic here
        result = self.device.measure(sample_id, duration)
        return {"temperature": result.temp, "absorbance": result.abs}

    @action
    def run_protocol(self, protocol_file: Path) -> str:
        """Execute a protocol file and return datapoint ID."""
        self.device.load_protocol(protocol_file)
        results = self.device.run()

        # Upload results and return datapoint ID
        return self.create_and_upload_value_datapoint(
            value=results,
            label="protocol_results"
        )

if __name__ == "__main__":
    node = MyInstrumentNode()
    node.start_node()  # Starts REST server

2. Create Node Definition

Create a YAML file (e.g., my_instrument.node.yaml):

node_name: my_instrument_1
node_id: 01JYKZDPANTNRYXF5TQKRJS0F2  # Generate with ulid
node_description: My laboratory instrument for sample analysis
node_type: device
module_name: my_instrument
module_version: 1.0.0
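
The node_id above is a ULID. If no ULID library is installed, one can be produced with the standard library alone. The sketch below assumes the usual ULID layout (a 48-bit millisecond timestamp followed by 80 random bits, encoded as 26 Crockford base32 characters). The name new_ulid is a hypothetical helper, not part of MADSci.

```python
import os
import time

# Crockford base32 alphabet (no I, L, O, or U)
CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"


def new_ulid() -> str:
    """Hypothetical helper: build a 26-character ULID string."""
    timestamp_ms = int(time.time() * 1000)               # 48-bit timestamp
    randomness = int.from_bytes(os.urandom(10), "big")   # 80 random bits
    value = (timestamp_ms << 80) | randomness            # 128 bits in total
    chars = []
    for _ in range(26):
        chars.append(CROCKFORD[value & 0x1F])  # take the lowest 5 bits
        value >>= 5
    return "".join(reversed(chars))


print(new_ulid())
```

Paste the printed value into the node_id field. A dedicated ULID package is a reasonable alternative if your project already depends on one.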

3. Run Your Node

# Run directly
python my_instrument_node.py

# Or with a pre-defined node
python my_instrument_node.py --node_definition my_instrument.node.yaml

# Node will be available at http://localhost:2000/docs

Core Concepts

Actions

Actions are the primary interface for interacting with nodes. They support flexible return types:

# Return simple JSON data
@action
def get_temperature(self) -> float:
    """Get current temperature."""
    return self.device.get_temperature()

# Return custom Pydantic models
class AnalysisResult(BaseModel):
    sample_id: str
    concentration: float
    ph_level: float

@action
def analyze_sample(self, sample_id: str) -> AnalysisResult:
    """Analyze sample and return structured results."""
    result = self.device.analyze(sample_id)
    return AnalysisResult(
        sample_id=sample_id,
        concentration=result.conc,
        ph_level=result.ph
    )

# Return datapoint IDs for workflow integration
@action
def capture_data(self, location: str) -> str:
    """Capture data and return datapoint ID."""
    data = self.device.capture(location)
    return self.create_and_upload_value_datapoint(
        value=data,
        label=f"capture_{location}"
    )

# Handle file operations
@action
def process_file(self, input_file: Path) -> Path:
    """Process file and return output file path."""
    output_path = self.device.process_file(input_file)
    return output_path

Action features:

  • Flexible return types: JSON data, Pydantic models, file paths, datapoint IDs
  • Automatic validation: Parameter and return value validation via type hints
  • File handling: Seamless file uploads/downloads with Path parameters
  • OpenAPI documentation: Comprehensive auto-generated API documentation
  • Type safety: Full type checking for complex nested data structures
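
To make the idea of validation via type hints concrete, here is a minimal standard-library sketch of a decorator that checks argument types before calling the wrapped function. It is an illustration only: toy_action is a hypothetical name, it handles only plain classes such as str and int, and it is not how the MADSci @action decorator is implemented.

```python
import functools
import inspect
from typing import get_origin, get_type_hints


def toy_action(func):
    """Hypothetical stand-in for @action: validate simple argument types."""
    hints = get_type_hints(func)
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        bound.apply_defaults()
        for name, value in bound.arguments.items():
            expected = hints.get(name)
            # Only plain classes are checked; generics like list[int] are skipped.
            is_plain_class = isinstance(expected, type) and get_origin(expected) is None
            if is_plain_class and not isinstance(value, expected):
                raise TypeError(
                    f"{name} must be {expected.__name__}, got {type(value).__name__}"
                )
        return func(*args, **kwargs)

    return wrapper


@toy_action
def measure_sample(sample_id: str, duration: int = 60) -> dict:
    return {"sample_id": sample_id, "duration": duration}


print(measure_sample("sample_001", 120))
```

Calling measure_sample(123) raises TypeError before the function body runs, which is the behavior a caller of a validated action would rely on.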

Configuration

Define node configuration with Pydantic settings classes:

from pydantic import Field

class MyNodeConfig(RestNodeConfig):
    # Device-specific settings
    device_ip: str = Field(description="Device IP address")
    device_port: int = Field(default=502, description="Device port")

    # Operational settings
    measurement_timeout: int = Field(default=30, description="Timeout in seconds")
    auto_calibrate: bool = Field(default=True, description="Enable auto-calibration")

    # Advanced settings
    retry_attempts: int = Field(default=3, ge=1, description="Number of retry attempts")

Lifecycle Handlers

Manage node startup, shutdown, and state:

class MyNode(RestNode):
    def startup_handler(self) -> None:
        """Called on node initialization."""
        # Initialize connections, load calibration, etc.
        pass

    def shutdown_handler(self) -> None:
        """Called on node shutdown."""
        # Clean up resources, close connections, etc.
        pass

    def state_handler(self) -> None:
        """Called periodically to update node state."""
        self.node_state = {
            "connected": self.device.is_connected(),
            "ready": self.device.is_ready()
        }
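
The periodic call to state_handler can be pictured as a background loop. The sketch below is a standard-library illustration of that pattern, assuming a fixed polling interval; ToyNode and its attributes are hypothetical and do not reflect the internals of RestNode.

```python
import threading
import time


class ToyNode:
    """Hypothetical stand-in for a node; not the MADSci RestNode."""

    def __init__(self, interval: float = 0.01) -> None:
        self.interval = interval
        self.node_state: dict = {}
        self.polls = 0
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._poll_loop, daemon=True)

    def state_handler(self) -> None:
        """Update node_state; a real node would query the device here."""
        self.polls += 1
        self.node_state = {"connected": True, "polls": self.polls}

    def _poll_loop(self) -> None:
        while not self._stop.is_set():
            try:
                self.state_handler()
            except Exception as exc:
                # Keep polling even if one state update fails.
                self.node_state = {"error": str(exc)}
            self._stop.wait(self.interval)  # sleep, but wake early on stop

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()


node = ToyNode(interval=0.01)
node.start()
time.sleep(0.2)
node.stop()
print(node.node_state)
```

Because the handler runs repeatedly, it should stay cheap and tolerate a device that is temporarily unavailable.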

Integration with MADSci Ecosystem

Nodes automatically integrate with other MADSci services:

class IntegratedNode(RestNode):
    @action
    def process_sample(self, sample_id: str) -> str:
        # Get sample info from Resource Manager
        sample = self.resource_client.get_resource(sample_id)

        # Process sample
        result = self.device.process(sample)

        # Store results and return datapoint ID
        datapoint_id = self.create_and_upload_value_datapoint(
            value=result,
            label=f"processing_result_{sample_id}"
        )

        # Log event
        self.logger.log(f"Processed sample {sample_id}")

        return datapoint_id

Return Type Options

Actions support multiple return patterns depending on your needs:

class MyNode(RestNode):
    @action
    def get_status(self) -> dict[str, Any]:
        """Return status directly as JSON for immediate use."""
        return {"temperature": 25.0, "ready": True}

    @action
    def analyze_sample(self, sample_id: str) -> str:
        """Analyze sample and return datapoint ID for workflow storage."""
        analysis_data = {"purity": 95.2, "concentration": 1.25}
        return self.create_and_upload_value_datapoint(
            value=analysis_data,
            label=f"analysis_{sample_id}"
        )

    @action
    def generate_report(self, data: dict) -> Path:
        """Generate report file and return file path."""
        report_path = self.create_report(data)
        return report_path  # File automatically served via REST API

    @action
    def perform_measurement(self) -> None:
        """Perform action without returning data."""
        self.device.calibrate()
        # No return value needed

Choose the appropriate return type based on how the data will be used in workflows.

Example Nodes

See complete working examples in example_lab/example_modules/:

Deployment

Docker Deployment

FROM ghcr.io/ad-sdl/madsci:latest

COPY my_instrument_node.py /app/
COPY my_instrument.node.yaml /app/

WORKDIR /app
EXPOSE 2000

CMD ["python", "my_instrument_node.py"]

Integration with Workcells

Nodes are automatically discovered by workcells via their REST APIs. Configure them in your workcell definition:

# workcell.yaml
nodes:
  my_instrument_1: "http://my-instrument:2000"

Testing Your Node

from madsci.client.node.rest_node_client import RestNodeClient

client = RestNodeClient("http://localhost:2000")

# Check node status
status = client.get_status()

# Execute actions
result = client.execute_action("measure_sample", {
    "sample_id": "sample_001",
    "duration": 120
})
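
Action logic can also be exercised without hardware or a running server by substituting a mock for the device. The sketch below uses only unittest.mock from the standard library; the free function measure_sample mirrors the body of the Quick Start action, and the fake device and its return values are assumptions made for illustration.

```python
from types import SimpleNamespace
from unittest.mock import MagicMock


def measure_sample(device, sample_id: str, duration: int = 60) -> dict:
    """Same logic as the Quick Start action, with the device passed in."""
    result = device.measure(sample_id, duration)
    return {"temperature": result.temp, "absorbance": result.abs}


# Fake device whose measure() returns a canned result object
fake_device = MagicMock()
fake_device.measure.return_value = SimpleNamespace(temp=25.0, abs=0.42)

output = measure_sample(fake_device, "sample_001", duration=120)

# Verify the device was driven with the expected arguments
fake_device.measure.assert_called_once_with("sample_001", 120)
print(output)
```

Keeping device access behind a small interface like this makes the same check easy to run in continuous integration.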

Advanced Features

Error Handling

@action
def risky_action(self, param: str) -> dict[str, float]:
    """Actions can raise exceptions for error handling."""
    try:
        result = self.device.risky_operation(param)
        return {"result": result}
    except DeviceError as e:  # DeviceError is a placeholder for your driver's exception type
        # Exceptions are automatically converted to HTTP error responses
        raise RuntimeError(f"Device error: {e}") from e

File Handling

from typing import Optional

@action
def process_file(self, file_input: Path, output_dir: Optional[Path] = None) -> Path:
    """Process uploaded file and return output file."""
    # file_input is automatically handled as file upload
    # Process the file
    processed_data = self.device.process_file(file_input)

    # Save processed file
    output_path = output_dir / "result.csv" if output_dir else Path("result.csv")
    processed_data.to_csv(output_path)

    # Return file path - file is automatically served
    return output_path

@action
def get_multiple_files(self) -> list[Path]:
    """Return multiple files."""
    files = self.device.generate_reports()
    return files  # All files automatically served
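
When an action returns a Path, the file has to exist at that location when the action finishes. The sketch below shows one way to write output into a directory that is guaranteed to exist, falling back to a fresh temporary directory when none is given. The helper name write_result_csv and the row layout are hypothetical; only the standard library is used.

```python
import csv
import tempfile
from pathlib import Path
from typing import Optional


def write_result_csv(rows: list, output_dir: Optional[Path] = None) -> Path:
    """Hypothetical helper: write rows (a list of dicts) to result.csv."""
    if not rows:
        raise ValueError("rows must contain at least one record")
    if output_dir is None:
        # Fall back to a fresh temporary directory instead of the working directory
        output_dir = Path(tempfile.mkdtemp(prefix="node_output_"))
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / "result.csv"
    with output_path.open("w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return output_path


path = write_result_csv([{"sample_id": "sample_001", "absorbance": 0.42}])
print(path)
```

An action could return the resulting path directly, which avoids depending on the process's current working directory.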

Working examples: See example_lab/ for a complete working laboratory with multiple integrated nodes.
