Implementation of the Shapeshifter (UFTP) protocol

This is a Python implementation of the Shapeshifter UFTP protocol.

Overview

This library implements the full UFTP protocol used for Shapeshifter communications. It covers all three roles, Distribution System Operator (DSO), Aggregator (AGR) and Common Reference Operator (CRO), in both directions (client and service).

Features of this package:

  • Building, parsing and validation of the XML messages

  • Signing of outgoing XML messages and verification of the signatures on incoming ones

  • DNS for service discovery and key retrieval

  • Convenient clients for each role-pair

  • Convenient services for each role

  • JSON-serializable dataclasses for easy transport to other systems

  • Fully internal queuing system for full-duplex communication with minimal user code required

  • Compatible with version 3.0.0 and 3.1.0 of the Shapeshifter protocol.
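To illustrate what JSON-serializable dataclasses make possible, here is a minimal sketch that uses only the standard library. DemoIsp, DemoOfferOption and to_json are hypothetical stand-ins written for this example; they are not classes or helpers from shapeshifter_uftp, and the package's own message classes may need different handling for fields such as dates.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class DemoIsp:
    """Stand-in for one ISP entry; not a class from shapeshifter_uftp."""
    power: int
    start: int
    duration: int = 1


@dataclass
class DemoOfferOption:
    """Stand-in for an offer option that holds a list of ISP entries."""
    option_reference: str
    price: float
    isps: list[DemoIsp] = field(default_factory=list)


def to_json(message) -> str:
    """Convert a (possibly nested) dataclass instance into a JSON string."""
    return json.dumps(asdict(message), sort_keys=True)


option = DemoOfferOption("MyOption", 2.30, [DemoIsp(power=1, start=1)])
payload = to_json(option)       # string that can be sent to another system
restored = json.loads(payload)  # plain dicts and lists on the receiving side
```

Because asdict recurses into nested dataclasses and lists, the whole message becomes plain dictionaries and lists that any JSON consumer can read.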

Installation

pip install shapeshifter-uftp

Running tests

If you want to develop shapeshifter-uftp, you can fork or clone this repository and run the tests:

$ pip install .
$ pip install ".[dev]"
$ pytest .

Getting Started

Shapeshifter always requires the use of a Client and a Service, because all responses are asynchronous.
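Because a response arrives later as a separate incoming message on your service, you will often want to match it to the message you sent. The following is a minimal sketch of that bookkeeping, assuming the response carries the ID of the message it refers to. PendingRequests is a hypothetical helper written for this example and is not part of shapeshifter-uftp.

```python
import threading
import uuid


class PendingRequests:
    """Remember outgoing messages until the matching response arrives."""

    def __init__(self):
        self._pending = {}
        # The service may run in another thread, so guard the dict.
        self._lock = threading.Lock()

    def register(self, payload) -> str:
        """Store an outgoing payload and return the message ID to send with it."""
        message_id = str(uuid.uuid4())
        with self._lock:
            self._pending[message_id] = payload
        return message_id

    def resolve(self, referenced_message_id):
        """Return and forget the payload a response refers to, or None if unknown."""
        with self._lock:
            return self._pending.pop(referenced_message_id, None)


pending = PendingRequests()

# Client side: send a message and remember what was sent.
offer_id = pending.register({"option_reference": "MyOption"})

# Service side, some time later: a response arrives that references offer_id.
original = pending.resolve(offer_id)
```

A response that references an unknown or already handled ID yields None, which a service callback can treat as a signal to log and ignore the message.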

You choose the service class based on your role in the Shapeshifter conversation. If you are an Aggregator (also known as a CSP), you can use this setup:

from datetime import datetime, timedelta, timezone
from uuid import uuid4

from shapeshifter_uftp import ShapeshifterAgrService
from shapeshifter_uftp.uftp import (FlexOffer, FlexOfferOption,
                                    FlexOfferOptionISP, FlexRequest,
                                    FlexRequestResponse, FlexOrder, FlexOrderResponse,
                                    AcceptedRejected)
from xsdata.models.datatype import XmlDate


class DemoAggregator(ShapeshifterAgrService):
    """
    Aggregator service that implements callbacks for
    each of the messages that can be received.
    """

    def process_agr_portfolio_query_response(self, message):
        print(f"Received a message: {message}")

    def process_agr_portfolio_update_response(self, message):
        print(f"Received a message: {message}")

    def process_d_prognosis_response(self, message):
        print(f"Received a message: {message}")

    def process_flex_request(self, message: FlexRequest):
        print(f"Received a message: {message}")

        # Example of how to send a new message after
        # processing an incoming message.
        dso_client = self.dso_client(message.sender_domain)

        # Send the FlexRequestResponse
        dso_client.send_flex_request_response(
            FlexRequestResponse(
                flex_request_message_id=message.message_id,
                conversation_id=message.conversation_id,
                result=AcceptedRejected.ACCEPTED
            )
        )

        # Send the FlexOffer
        dso_client.send_flex_offer(
            FlexOffer(
                flex_request_message_id=message.message_id,
                conversation_id=message.conversation_id,
                isp_duration="PT15M",
                period=XmlDate(2023, 1, 1),
                congestion_point="ean.123456789012",
                expiration_date_time=datetime.now(timezone.utc).isoformat(),
                offer_options=[
                    FlexOfferOption(
                        isps=[FlexOfferOptionISP(power=1, start=1, duration=1)],
                        option_reference="MyOption",
                        price=2.30,
                        min_activation_factor=0.5,
                    )
                ],
            )
        )

    def process_flex_offer_response(self, message):
        print(f"Received a message: {message}")

    def process_flex_offer_revocation_response(self, message):
        print(f"Received a message: {message}")

    def process_flex_order(self, message: FlexOrder):
        print(f"Received a message: {message}")

        dso_client = self.dso_client(message.sender_domain)
        dso_client.send_flex_order_response(
            FlexOrderResponse(
                flex_order_message_id=message.message_id,
                conversation_id=message.conversation_id,
                result=AcceptedRejected.ACCEPTED
            )
        )

    def process_flex_reservation_update(self, message):
        print(f"Received a message: {message}")

    def process_flex_settlement(self, message):
        print(f"Received a message: {message}")

    def process_metering_response(self, message):
        print(f"Received a message: {message}")


def key_lookup(sender_domain, sender_role):
    """
    Lookup function for public keys, so that incoming
    messages can be verified.
    """
    known_senders = {
        ("dso.demo", "DSO"): "NsTbq/iABU6tbsjriBg/Z5dSfQstulD0GpMI2fLDWec=",
        ("cro.demo", "CRO"): "ySUYU87usErRFKGJafwvVDLGhnBVJCCNYfQvmwv8ObM=",
    }
    return known_senders.get((sender_domain, sender_role))


def endpoint_lookup(sender_domain, sender_role):
    """
    Lookup function for endpoints, so that the service
    knows where to send responses to.
    """
    known_senders = {
        ("dso.demo", "DSO"): "http://localhost:8081/shapeshifter/api/v3/message",
        ("cro.demo", "CRO"): "http://localhost:8082/shapeshifter/api/v3/message",
    }
    return known_senders.get((sender_domain, sender_role))

aggregator = DemoAggregator(
    sender_domain="aggregator.demo",
    signing_key="mz5XYCNKxpx48K+9oipUhsjBZed3L7rTVKLsWmG1HOqRLIeuGpIa1KAt6AlbVGqJvewd8v1J0uVUTqpGt7F8tw==",
    key_lookup_function=key_lookup,
    endpoint_lookup_function=endpoint_lookup,
    port=8080,
)

# Start the Aggregator Service
aggregator.run_in_thread()

# Create a client object to talk to a DSO
dso_client = aggregator.dso_client("dso.demo")

# Create a Flex Offer Message
flex_offer_message = FlexOffer(
    isp_duration="PT15M",
    period=XmlDate(2023, 1, 1),
    congestion_point="ean.123456789012",
    expiration_date_time=datetime.now(timezone.utc).isoformat(),
    flex_request_message_id=str(uuid4()),
    offer_options=[
        FlexOfferOption(
            isps=[FlexOfferOptionISP(power=1, start=1, duration=1)],
            option_reference="MyOption",
            price=2.30,
            min_activation_factor=0.5,
        )
    ],
)

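# As a demo, press return to send another FlexOffer message to the DSO.
# Press Ctrl+C to stop the service and exit.
try:
    while True:
        input("Press return to send a FlexOffer message to the DSO")
        response = dso_client.send_flex_offer(flex_offer_message)
        print(f"Response was: {response}")
except (KeyboardInterrupt, EOFError):
    pass
finally:
    # Always shut down the service thread, also when sending fails.
    aggregator.stop()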
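The example above uses isp_duration="PT15M" together with FlexOfferOptionISP(power=1, start=1, duration=1). As a rough sketch of what those numbers mean in clock time, the helper below converts an ISP start number and duration into a time window. isp_window is a hypothetical function written for this example, not part of shapeshifter-uftp. It assumes that ISP numbering starts at 1 for the first ISP of the day, and it ignores time zones and daylight-saving transitions.

```python
from datetime import date, datetime, timedelta


def isp_window(period: date, start: int, duration: int = 1,
               isp_minutes: int = 15):
    """Return (begin, end) as naive datetimes for a run of ISPs.

    Assumes ISP number 1 is the first ISP of the day and that every
    ISP lasts isp_minutes (15 for an isp_duration of "PT15M").
    """
    if start < 1 or duration < 1:
        raise ValueError("start and duration must be at least 1")
    midnight = datetime.combine(period, datetime.min.time())
    begin = midnight + timedelta(minutes=(start - 1) * isp_minutes)
    end = begin + timedelta(minutes=duration * isp_minutes)
    return begin, end


begin, end = isp_window(date(2023, 1, 1), start=1, duration=1)
print(begin, end)  # 2023-01-01 00:00:00 2023-01-01 00:15:00
```

With 15-minute ISPs an ordinary day has 96 of them, so start=96 covers the last quarter of an hour before midnight.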

Using OAuth in outgoing requests

To use OAuth in outgoing requests, you can use the provided OAuthClient class. To use it in a bare Shapeshifter client:

from shapeshifter_uftp import ShapeshifterAgrDsoClient, OAuthClient

oauth_client = OAuthClient(
    url="https://oauth.provider.url",
    client_id="my-client-id",
    client_secret="my-client-secret"
)

client = ShapeshifterAgrDsoClient(
    sender_domain="my.aggregator.domain",
    signing_key="abcdef",
    recipient_domain="some.dso",
    recipient_endpoint="https://some.dso.endpoint/shapeshifter/api/v3/message",
    recipient_signing_key="123456",
    oauth_client=oauth_client,
)

# If you use any of the sending methods, the oauth client will
# make sure you're authenticated.
client.send_flex_request_response(...)

Similarly, if you have a Service instance that dynamically needs to retrieve the OAuth information for each different recipient server, you can provide an oauth_lookup_function that takes a (sender_domain, sender_role) and returns an instance of OAuthClient.
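One possible shape for such a lookup function is sketched below. make_oauth_lookup and the credentials table are hypothetical and written for this example. The keyword arguments url, client_id and client_secret are the ones shown for OAuthClient above. To keep the sketch runnable without the package, dict stands in for OAuthClient; in real use you would pass OAuthClient as client_factory instead.

```python
def make_oauth_lookup(credentials, client_factory):
    """Build a lookup with the (sender_domain, sender_role) signature.

    credentials maps (domain, role) to keyword arguments for
    client_factory. Clients are created on first use and then cached,
    so any state a client holds is kept between calls.
    """
    cache = {}

    def oauth_lookup(sender_domain, sender_role):
        key = (sender_domain, sender_role)
        if key not in cache:
            # Raises KeyError for a domain/role pair that is not configured.
            cache[key] = client_factory(**credentials[key])
        return cache[key]

    return oauth_lookup


# Hypothetical credentials table, keyed the same way as key_lookup.
credentials = {
    ("some.dso", "DSO"): {
        "url": "https://oauth.provider.url",
        "client_id": "my-client-id",
        "client_secret": "my-client-secret",
    },
}

# dict stands in for OAuthClient so the sketch runs without the library.
oauth_lookup = make_oauth_lookup(credentials, client_factory=dict)
client = oauth_lookup("some.dso", "DSO")
```

The resulting oauth_lookup callable is what you would hand to the service as oauth_lookup_function. Whether an unknown recipient should raise an error or fall back to unauthenticated requests is a design choice left open here.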
