Skip to main content

Implementation of the Shapeshifter (UFTP) protocol

Project description

This is a Python implementation of the ShapeShifter UFTP protocol.

Overview

This library implements the full UFTP protocol that you can use for Shapeshifter communications. It implements all three roles: Distribution System Operator (DSO), Aggregator (AGR) and Common Reference Operator (CRO) in both directions (client and service).

Features of this package:

  • Building, parsing and validation of the XML messages

  • Signing and verifying of the XML messages using signatures

  • DNS for service discovery and key retrieval

  • Convenient clients for each role-pair

  • Convenient services for each role

  • JSON-serializable dataclasses for easy transport to other systems

  • Fully internal queing system for full-duplex communication with minimal user code required

  • Compatible with version 3.0.0 and 3.1.0 of the Shapeshifter protocol.

Installation

pip install shapeshifter-uftp

Running tests

If you want to develop shapeshifter-uftp, you can fork or clone this repository and run the tests:

$ pip install .
$ pip install .[dev]
$ pytest .

Getting Started

Shapehifter always requires the use of a Client and a Service, because all responses are asynchronous.

You choose the server class based on your role in the Shapeshifter conversation. If you are an Aggregator (also known as a CSP), you can use this setup:

from datetime import datetime, timedelta, timezone

from shapeshifter_uftp import ShapeshifterAgrService
from shapeshifter_uftp.uftp import (FlexOffer, FlexOfferOption,
                                    FlexOfferOptionISP, FlexRequest,
                                    FlexRequestResponse, FlexOrder, FlexOrderResponse,
                                    AcceptedRejected)
from xsdata.models.datatype import XmlDate


class DemoAggregator(ShapeshifterAgrService):
    """
    Aggregator service that implements callbacks for
    each of the messages that can be received.
    """

    def process_agr_portfolio_query_response(self, message):
        print(f"Received a message: {message}")

    def process_agr_portfolio_update_response(self, message):
        print(f"Received a message: {message}")

    def process_d_prognosis_response(self, message):
        print(f"Received a message: {message}")

    def process_flex_request(self, message: FlexRequest):
        print(f"Received a message: {message}")

        # Example of how to send a new message after
        # processing an incoming message.
        dso_client = self.dso_client(message.sender_domain)

        # Send the FlexRequestResponse
        dso_client.send_flex_request_response(
            FlexRequestResponse(
                flex_request_message_id=message.message_id,
                conversation_id=message.conversation_id,
                result=AcceptedRejected.ACCEPTED
            )
        )

        # Send the FlexOffer
        dso_client.send_flex_offer(
            FlexOffer(
                flex_request_message_id=message.message_id,
                conversation_id=message.conversation_id,
                isp_duration="PT15M",
                period=XmlDate(2023, 1, 1),
                congestion_point="ean.123456789012",
                expiration_date_time=datetime.now(timezone.utc).isoformat(),
                offer_options=[
                    FlexOfferOption(
                        isps=[FlexOfferOptionISP(power=1, start=1, duration=1)],
                        option_reference="MyOption",
                        price=2.30,
                        min_activation_factor=0.5,
                    )
                ],
            )
        )

    def process_flex_offer_response(self, message: FlexOffer):
        print(f"Received a message: {message}")

    def process_flex_offer_revocation_response(self, message):
        print(f"Received a message: {message}")

    def process_flex_order(self, message: FlexOrder):
        print(f"Received a message: {message}")

        dso_client = self.dso_client(message.sender_domain)
        dso_client.send_flex_order_response(
            FlexOrderResponse(
                flex_order_message_id=message.message_id,
                conversation_id=message.conversation_id,
                result=AcceptedRejected.ACCEPTED
            )
        )

    def process_flex_reservation_update(self, message):
        print(f"Received a message: {message}")

    def process_flex_settlement(self, message):
        print(f"Received a message: {message}")

    def process_metering_response(self, message):
        print(f"Received a message: {message}")


def key_lookup(sender_domain, sender_role):
    """
    Lookup function for public keys, so that incoming
    messages can be verified.
    """
    known_senders = {
        ("dso.demo", "DSO"): "NsTbq/iABU6tbsjriBg/Z5dSfQstulD0GpMI2fLDWec=",
        ("cro.demo", "CRO"): "ySUYU87usErRFKGJafwvVDLGhnBVJCCNYfQvmwv8ObM=",
    }
    return known_senders.get((sender_domain, sender_role))


def endpoint_lookup(sender_domain, sender_role):
    """
    Lookup function for endpoints, so that the service
    knowns where to send responses to.
    """
    known_senders = {
        ("dso.demo", "DSO"): "http://localhost:8081/shapeshifter/api/v3/message",
        ("cro.demo", "CRO"): "http://localhost:8082/shapeshifter/api/v3/message",
    }
    return known_senders.get((sender_domain, sender_role))

aggregator = DemoAggregator(
    sender_domain="aggregator.demo",
    signing_key="mz5XYCNKxpx48K+9oipUhsjBZed3L7rTVKLsWmG1HOqRLIeuGpIa1KAt6AlbVGqJvewd8v1J0uVUTqpGt7F8tw==",
    key_lookup_function=key_lookup,
    endpoint_lookup_function=endpoint_lookup,
    port=8080,
)

# Start the Aggregator Service
aggregator.run_in_thread()

# Create a client object to talk to a DSO
dso_client = aggregator.dso_client("dso.demo")

# Create a Flex Offer Message
flex_offer_message = FlexOffer(
    isp_duration="PT15M",
    period=XmlDate(2023, 1, 1),
    congestion_point="ean.123456789012",
    expiration_date_time=datetime.now(timezone.utc).isoformat(),
    flex_request_message_id=str(uuid4())
    offer_options=[
        FlexOfferOption(
            isps=[FlexOfferOptionISP(power=1, start=1, duration=1)],
            option_reference="MyOption",
            price=2.30,
            min_activation_factor=0.5,
        )
    ],
)

# As a demo, press enter to send another FlexOffer message to the DSO.
while True:
    try:
        input("Press return to send a FlexOffer message to the DSO")
        response = dso_client.send_flex_offer(flex_offer_message)
        print(f"Response was: {response}")
    except:
        aggregator.stop()
        break

Using OAuth in outgoing requests

To use OAuth in outgoing requests, you can use the provided OAuthClient class. To use it in a bare Shapeshifter client:

from shapeshifter_uftp import ShapeshifterAgrDsoClient, OAuthClient

oauth_client = OAuthClient(
    url="https://oauth.provider.url",
    client_id="my-client-id",
    client_secret="my-client-secret"
)

client = ShapeshifterAgrDsoClient(
    sender_domain="my.aggregator.domain",
    signing_key="abcdef",
    recipient_domain="some.dso",
    recipient_endpoint="https://some.dso.endpoint/shapeshifter/api/v3/message",
    recipient_signing_key="123456",
    oauth_client=oauth_client,
)

# If you use any of the sending methods, the oauth client will
# make sure you're authenticated.
client.send_flex_request_response(...)

Similarly, if you have a Service instance that dynamically needs to retrieve the OAuth information for each different recipient server, you can provide an oauth_lookup_function that takes a (sender_domain, sender_role) and returns an instance of OAuthClient.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

shapeshifter_uftp-2.3.0.tar.gz (42.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

shapeshifter_uftp-2.3.0-py3-none-any.whl (55.2 kB view details)

Uploaded Python 3

File details

Details for the file shapeshifter_uftp-2.3.0.tar.gz.

File metadata

  • Download URL: shapeshifter_uftp-2.3.0.tar.gz
  • Upload date:
  • Size: 42.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.2 {"installer":{"name":"uv","version":"0.10.2","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for shapeshifter_uftp-2.3.0.tar.gz
Algorithm Hash digest
SHA256 41c38bf5c65ba4bc76c3af13b2cc56397bb8de323de1ca6efe9d126a89663368
MD5 af82be9f8f2c2e78adf2828e2e2f9dc7
BLAKE2b-256 1aa32aecb0ec90be3684493ab81ed0d7d69755fad163534b2d099a6c9dc646c1

See more details on using hashes here.

File details

Details for the file shapeshifter_uftp-2.3.0-py3-none-any.whl.

File metadata

  • Download URL: shapeshifter_uftp-2.3.0-py3-none-any.whl
  • Upload date:
  • Size: 55.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.2 {"installer":{"name":"uv","version":"0.10.2","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for shapeshifter_uftp-2.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 faa7c4872997e9af04b092109d154a9ec01172f74436567480fe3a0c8ebd7a23
MD5 095cddb22822100e42cabb0c1b834284
BLAKE2b-256 a087f885f7fd46209c7d11f267b533b27442692adcbcb88631798656f9996625

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page