Skip to main content

Implementation of the Shapeshifter (UFTP) protocol

Project description

This is a Python implementation of the ShapeShifter UFTP protocol.

Overview

This library implements the full UFTP protocol that you can use for Shapeshifter communications. It implements all three roles: Distribution System Operator (DSO), Aggregator (AGR) and Common Reference Operator (CRO) in both directions (client and service).

Features of this package:

  • Building, parsing and validation of the XML messages

  • Signing and verifying of the XML messages using signatures

  • DNS for service discovery and key retrieval

  • Convenient clients for each role-pair

  • Convenient services for each role

  • JSON-serializable dataclasses for easy transport to other systems

  • Fully internal queing system for full-duplex communication with minimal user code required

  • Compatible with version 3.0.0 and 3.1.0 of the Shapeshifter protocol.

Installation

pip install shapeshifter-uftp

Running tests

If you want to develop shapeshifter-uftp, you can fork or clone this repository and run the tests:

$ pip install .
$ pip install .[dev]
$ pytest .

Getting Started

Shapehifter always requires the use of a Client and a Service, because all responses are asynchronous.

You choose the server class based on your role in the Shapeshifter conversation. If you are an Aggregator (also known as a CSP), you can use this setup:

from datetime import datetime, timedelta, timezone

from shapeshifter_uftp import ShapeshifterAgrService
from shapeshifter_uftp.uftp import (FlexOffer, FlexOfferOption,
                                    FlexOfferOptionISP, FlexRequest,
                                    FlexRequestResponse, FlexOrder, FlexOrderResponse,
                                    AcceptedRejected)
from xsdata.models.datatype import XmlDate


class DemoAggregator(ShapeshifterAgrService):
    """
    Aggregator service that implements callbacks for
    each of the messages that can be received.
    """

    def process_agr_portfolio_query_response(self, message):
        print(f"Received a message: {message}")

    def process_agr_portfolio_update_response(self, message):
        print(f"Received a message: {message}")

    def process_d_prognosis_response(self, message):
        print(f"Received a message: {message}")

    def process_flex_request(self, message: FlexRequest):
        print(f"Received a message: {message}")

        # Example of how to send a new message after
        # processing an incoming message.
        dso_client = self.dso_client(message.sender_domain)

        # Send the FlexRequestResponse
        dso_client.send_flex_request_response(
            FlexRequestResponse(
                flex_request_message_id=message.message_id,
                conversation_id=message.conversation_id,
                result=AcceptedRejected.ACCEPTED
            )
        )

        # Send the FlexOffer
        dso_client.send_flex_offer(
            FlexOffer(
                flex_request_message_id=message.message_id,
                conversation_id=message.conversation_id,
                isp_duration="PT15M",
                period=XmlDate(2023, 1, 1),
                congestion_point="ean.123456789012",
                expiration_date_time=datetime.now(timezone.utc).isoformat(),
                offer_options=[
                    FlexOfferOption(
                        isps=[FlexOfferOptionISP(power=1, start=1, duration=1)],
                        option_reference="MyOption",
                        price=2.30,
                        min_activation_factor=0.5,
                    )
                ],
            )
        )

    def process_flex_offer_response(self, message: FlexOffer):
        print(f"Received a message: {message}")

    def process_flex_offer_revocation_response(self, message):
        print(f"Received a message: {message}")

    def process_flex_order(self, message: FlexOrder):
        print(f"Received a message: {message}")

        dso_client = self.dso_client(message.sender_domain)
        dso_client.send_flex_order_response(
            FlexOrderResponse(
                flex_order_message_id=message.message_id,
                conversation_id=message.conversation_id,
                result=AcceptedRejected.ACCEPTED
            )
        )

    def process_flex_reservation_update(self, message):
        print(f"Received a message: {message}")

    def process_flex_settlement(self, message):
        print(f"Received a message: {message}")

    def process_metering_response(self, message):
        print(f"Received a message: {message}")


def key_lookup(sender_domain, sender_role):
    """
    Lookup function for public keys, so that incoming
    messages can be verified.
    """
    known_senders = {
        ("dso.demo", "DSO"): "NsTbq/iABU6tbsjriBg/Z5dSfQstulD0GpMI2fLDWec=",
        ("cro.demo", "CRO"): "ySUYU87usErRFKGJafwvVDLGhnBVJCCNYfQvmwv8ObM=",
    }
    return known_senders.get((sender_domain, sender_role))


def endpoint_lookup(sender_domain, sender_role):
    """
    Lookup function for endpoints, so that the service
    knowns where to send responses to.
    """
    known_senders = {
        ("dso.demo", "DSO"): "http://localhost:8081/shapeshifter/api/v3/message",
        ("cro.demo", "CRO"): "http://localhost:8082/shapeshifter/api/v3/message",
    }
    return known_senders.get((sender_domain, sender_role))

aggregator = DemoAggregator(
    sender_domain="aggregator.demo",
    signing_key="mz5XYCNKxpx48K+9oipUhsjBZed3L7rTVKLsWmG1HOqRLIeuGpIa1KAt6AlbVGqJvewd8v1J0uVUTqpGt7F8tw==",
    key_lookup_function=key_lookup,
    endpoint_lookup_function=endpoint_lookup,
    port=8080,
)

# Start the Aggregator Service
aggregator.run_in_thread()

# Create a client object to talk to a DSO
dso_client = aggregator.dso_client("dso.demo")

# Create a Flex Offer Message
flex_offer_message = FlexOffer(
    isp_duration="PT15M",
    period=XmlDate(2023, 1, 1),
    congestion_point="ean.123456789012",
    expiration_date_time=datetime.now(timezone.utc).isoformat(),
    flex_request_message_id=str(uuid4())
    offer_options=[
        FlexOfferOption(
            isps=[FlexOfferOptionISP(power=1, start=1, duration=1)],
            option_reference="MyOption",
            price=2.30,
            min_activation_factor=0.5,
        )
    ],
)

# As a demo, press enter to send another FlexOffer message to the DSO.
while True:
    try:
        input("Press return to send a FlexOffer message to the DSO")
        response = dso_client.send_flex_offer(flex_offer_message)
        print(f"Response was: {response}")
    except:
        aggregator.stop()
        break

Using OAuth in outgoing requests

To use OAuth in outgoing requests, you can use the provided OAuthClient class. To use it in a bare Shapeshifter client:

from shapeshifter_uftp import ShapeshifterAgrDsoClient, OAuthClient

oauth_client = OAuthClient(
    url="https://oauth.provider.url",
    client_id="my-client-id",
    client_secret="my-client-secret"
)

client = ShapeshifterAgrDsoClient(
    sender_domain="my.aggregator.domain",
    signing_key="abcdef",
    recipient_domain="some.dso",
    recipient_endpoint="https://some.dso.endpoint/shapeshifter/api/v3/message",
    recipient_signing_key="123456",
    oauth_client=oauth_client,
)

# If you use any of the sending methods, the oauth client will
# make sure you're authenticated.
client.send_flex_request_response(...)

Similarly, if you have a Service instance that dynamically needs to retrieve the OAuth information for each different recipient server, you can provide an oauth_lookup_function that takes a (sender_domain, sender_role) and returns an instance of OAuthClient.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

shapeshifter_uftp-2.3.1.tar.gz (42.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

shapeshifter_uftp-2.3.1-py3-none-any.whl (55.2 kB view details)

Uploaded Python 3

File details

Details for the file shapeshifter_uftp-2.3.1.tar.gz.

File metadata

  • Download URL: shapeshifter_uftp-2.3.1.tar.gz
  • Upload date:
  • Size: 42.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.2 {"installer":{"name":"uv","version":"0.10.2","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for shapeshifter_uftp-2.3.1.tar.gz
Algorithm Hash digest
SHA256 60bc5f2f783bea0642d1cc0e9f3d55651972a0dd30b7163145e1650d85cb519c
MD5 4003a73f133bdae9727d79b397fa6984
BLAKE2b-256 6377c9590a3320b1ae75c8eaa95a86d6b37b6a3db8246cbe1b4fdc20ae8795f3

See more details on using hashes here.

File details

Details for the file shapeshifter_uftp-2.3.1-py3-none-any.whl.

File metadata

  • Download URL: shapeshifter_uftp-2.3.1-py3-none-any.whl
  • Upload date:
  • Size: 55.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.2 {"installer":{"name":"uv","version":"0.10.2","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for shapeshifter_uftp-2.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 087d297fdf6a30b4a3b4e692085cb539e8f22257d0cca6d3a5d9173519ef8d4d
MD5 71a981f2869da5a3af67ce696e7e85bb
BLAKE2b-256 3ae75e315256b4771b5ecb84967a255a166bdc1edbda58c7c65da20c88f8401f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page