Skip to main content

Model Runner Client is a Python library that allows Coordinators to manage real-time model synchronization and perform concurrent predictions on distributed model nodes within a Crunch.

Project description

Model Runner Client

Model Runner Client is a Python library that allows you, as a Coordinator, to interact with models participating in your crunch. It tracks which models join or leave through a WebSocket connection to the model nodes.

Features

  • Real-Time Model Sync: Each model participating in your crunch is an instance of ModelRunner, maintained via WebSocket in the ModelCluster.
  • Concurrent Predictions (with Timeout Handling): Use the derived class of ModelConcurrentRunner (an abstract class) to request predictions from all models simultaneously. Define a timeout to avoid blocking if a model takes too long to predict. Make sure to select the proper instance based on the requirements of your crunch.
    • DynamicSubclassModelConcurrentRunner: Allows you to find a subclass on the remote model, instantiate it, and access all its methods.

Secure Connections

To securely connect with model nodes using Crunch Secure Model Protocol, you can use the SecureCredentials object. Below is an example illustrating how to include secure credentials:

from model_runner_client.security.credentials import SecureCredentials

secure_credentials = SecureCredentials.from_directory(
  path="../../issued-certificate",
)

The from_directory method loads TLS certificates from the specified path to establish a secure connection with model nodes. Ensure that the directory contains valid certificates issued for your coordinator instance.
If you lack the TLS certificates and signed message, generate them using crunch certificate generator.

In debugging mode, to avoid stopping the model node due to a bad secure_credentials configuration, you can set the report_failure option to False when initializing the connection. This prevents models from being disrupted by misconfigurations during development:

secure_credentials = SecureCredentials.from_directory(
  path="../../issued-certificate",
)

concurrent_runner = DynamicSubclassModelConcurrentRunner(
  ...,  # take look to the example below 
  secure_credentials=secure_credentials,
  report_failure=False
)

Installation

The package is available on PyPI.

pip install model-runner-client

[!NOTE] Adjust this command (e.g., pip3 or virtual environments) depending on your setup.

Usage

Below are two examples using DynamicSubclassModelConcurrentRunner, which handles concurrent predictions for you and returns all results in one go.
The first shows how to send the same input to all models, while the second demonstrates how to customize arguments for each model individually.

Shared Arguments for All Models

In the simplest case, the same input is passed to all models. Here's how to structure such a call:

import asyncio
from model_runner_client.model_concurrent_runners import DynamicSubclassModelConcurrentRunner
from model_runner_client.grpc.generated.commons_pb2 import VariantType, Argument, Variant
from model_runner_client.utils.datatype_transformer import encode_data


async def main():
  # crunch_id, host, and port are values provided by crunchdao
  concurrent_runner = DynamicSubclassModelConcurrentRunner(
    timeout=10,
    crunch_id="bird-game",
    host="localhost",
    port=8000,
    base_classname='birdgame.trackers.trackerbase.TrackerBase'
    # secure_credentials=secure_credentials,
  )

  # Initialize communication with the model nodes to fetch 
  # models that want to predict and set up the model cluster
  await concurrent_runner.init()

  async def prediction_call():
    while True:
      # Your data to be predicted (X)
      payload = {
        'falcon_location': 21.179864629354732,
        'time': 230.96231205799998,
        'dove_location': 19.164986723324326,
        'falcon_id': 1
      }

      # Encode data as binary and tick
      await concurrent_runner.call(
        method_name='tick',
        arguments=(
          # args
          # The order of positional arguments is strictly enforced and must match the model's method signature.
          # Adding a new positional argument during a crunch would break existing model implementations.
          [
            Argument(position=1, data=Variant(type=VariantType.JSON, value=encode_data(VariantType.JSON, payload)))
          ],

          # kwargs
          # These keyword arguments are passed only if the model's method explicitly expects them.
          # It is recommended to use kwargs for optional parameters, as they can be introduced without breaking compatibility.
          [],
        )
      )

      # predict now
      result = await concurrent_runner.call(method_name='predict')

      # You receive a dictionary of predictions
      for model_runner, model_predict_result in result.items():
        print(f"{model_runner.model_id}: {model_predict_result}")

      # This pause (30s) simulates other work 
      # the Coordinator might perform between predictions
      await asyncio.sleep(30)

  # Keep the cluster updated with `concurrent_runner.sync()`, 
  # which maintains a permanent WebSocket connection.
  # Then run our prediction process.
  await asyncio.gather(
    asyncio.create_task(concurrent_runner.sync()),
    asyncio.create_task(prediction_call())
  )


if __name__ == "__main__":
  try:
    asyncio.run(main())
  except KeyboardInterrupt:
    print("\nReceived exit signal, shutting down gracefully.")

Per-Model Arguments (Advanced Usage)

In some situations, you may want to send different arguments to each model.
You can do this by passing a function to the arguments parameter:

# ... (same setup as the first example)

  # Common argument encoded once
  payload_arg = Argument(
    position=1,
    data=Variant(
      type=VariantType.JSON,
      value=encode_data(
        VariantType.JSON,
        {
          'falcon_location': 21.179864629354732,
          'time': 230.96231205799998,
          'dove_location': 19.164986723324326,
          'falcon_id': 1,
        })
    )
  )
  
  
  def prepare_arguments(model_runner: DynamicSubclassModelRunner):
    # Use model_runner.model_id to keep track of model-specific information
    player_uid = model_runner.model_id
  
    performance_metrics_kwarg = KwArgument(
      keyword="performance_metrics",
      data=Variant(
        type=VariantType.JSON,
        value=encode_data(
          VariantType.JSON,
          {
            "wealth": self.players[player_uid]["wealth"],  # load model-specific information
            "likelihood_ewa": self.players[player_uid]["wealth"]
          })
      )
    )
  
    return (
      # args
      # The order of positional arguments is strictly enforced and must match the model's method signature.
      # Adding a new positional argument during a crunch would break existing model implementations.
  
      [payload_arg],
  
      # kwargs
      # These keyword arguments are passed only if the model's method explicitly expects them.
      # It is recommended to use kwargs for optional parameters, as they can be introduced without breaking compatibility.
  
      [performance_metrics_kwarg]
    )
  
  
  result = await concurrent_runner.call(
    method_name='tick',
    arguments=prepare_arguments,
  )

# ... (same post-processing as in the first example)

Important Notes

  • Prediction Failures & Timeouts: A prediction may fail or exceed the defined timeout, so be sure to handle these cases appropriately. Refer to ModelPredictResult.Status for details.
  • Custom Implementations: If you need more control over your workflow, you can manage each model individually. Instead of using implementations of ModelConcurrentRunner, you can directly leverage ModelRunner instances from the ModelCluster, customizing how you schedule predictions and handle results.

Contributing

Contributions are welcome! Feel free to open issues or submit pull requests if you encounter any bugs or want to suggest improvements.

License

This project is distributed under the MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

model_runner_client-0.11.0.tar.gz (26.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

model_runner_client-0.11.0-py3-none-any.whl (36.9 kB view details)

Uploaded Python 3

File details

Details for the file model_runner_client-0.11.0.tar.gz.

File metadata

  • Download URL: model_runner_client-0.11.0.tar.gz
  • Upload date:
  • Size: 26.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for model_runner_client-0.11.0.tar.gz
Algorithm Hash digest
SHA256 302ca73eb1b144ee46d82e9813ff9da7cecdc1af890ddcf0f3b21d43e01979d9
MD5 c851e69f8019c929e9dca47a4177b5d3
BLAKE2b-256 12f3d52e9120eea64320c33920f7451be390788c4bcbc2025410ab41ec0ab96b

See more details on using hashes here.

File details

Details for the file model_runner_client-0.11.0-py3-none-any.whl.

File metadata

File hashes

Hashes for model_runner_client-0.11.0-py3-none-any.whl
Algorithm Hash digest
SHA256 3f5f1576da7518a9a31a3d66de91eee5bef92b834f93d2d2b86f5749a106eb6e
MD5 b69703ded5092cd185e2d51be9dcc514
BLAKE2b-256 f12e501000f5e3f752362d9fbe3f0d0c7e35653126ffd69aa255a1dae58545a9

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page