Skip to main content

A highlevel interface for the dispatch API

Project description

Dispatch Highlevel Interface

Build Status PyPI Package Docs

Introduction

A highlevel interface for the dispatch API.

See the documentation for more information.

Usage

The Dispatcher class, the main entry point for the API, provides two channels:

  • Lifecycle events: A channel that sends a message whenever a [Dispatch][frequenz.dispatch.Dispatch] is created, updated or deleted.
  • Running status change: Sends a dispatch message whenever a dispatch is ready to be executed according to the schedule or the running status of the dispatch changed in a way that could potentially require the actor to start, stop or reconfigure itself.

Example using the running status change channel

import os
import grpc.aio
from unittest.mock import MagicMock

async def run():
    host = os.getenv("DISPATCH_API_HOST", "localhost")
    port = os.getenv("DISPATCH_API_PORT", "50051")

    service_address = f"{host}:{port}"
    grpc_channel = grpc.aio.insecure_channel(service_address)
    microgrid_id = 1
    dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
    await dispatcher.start()

    actor = MagicMock() # replace with your actor

    changed_running_status_rx = dispatcher.running_status_change.new_receiver()

    async for dispatch in changed_running_status_rx:
        match dispatch.running("DEMO_TYPE"):
            case RunningState.RUNNING:
                print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
                if actor.is_running:
                    actor.reconfigure(
                        components=dispatch.selector,
                        run_parameters=dispatch.payload, # custom actor parameters
                        dry_run=dispatch.dry_run,
                        until=dispatch.until,
                    )  # this will reconfigure the actor
                else:
                    # this will start a new actor with the given components
                    # and run it for the duration of the dispatch
                    actor.start(
                        components=dispatch.selector,
                        run_parameters=dispatch.payload, # custom actor parameters
                        dry_run=dispatch.dry_run,
                        until=dispatch.until,
                    )
            case RunningState.STOPPED:
                actor.stop()  # this will stop the actor
            case RunningState.DIFFERENT_TYPE:
                pass  # dispatch not for this type

Supported Platforms

The following platforms are officially supported (tested):

  • Python: 3.11
  • Operating System: Ubuntu Linux 20.04
  • Architectures: amd64, arm64

Contributing

If you want to know how to build this project and contribute to it, please check out the Contributing Guide.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

frequenz-dispatch-0.1.0.tar.gz (14.3 kB view hashes)

Uploaded Source

Built Distribution

frequenz_dispatch-0.1.0-py3-none-any.whl (13.6 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page