Skip to main content

Defines a root level messaging architecture in python

Project description

aiSSMEBLE™ Foundation Python Messaging Client (UNDER CONSTRUCTION)

PyPI PyPI - Python Version PyPI - Wheel

Overview

aiSSEMBLE™'s python messaging module provides an interface through which python pipelines can leverage messaging's features. This is done through the MessagingClient class, the details of which are discussed below, along with examples of its use.

Subscribing

The messaging client can pass along a subscription request to the service to subscribe to a given topic. The message client can start receiving messages after subscription. However, any messages before subscription will be ignored. To do this, you will need to tell the service which topic you want to subscribe to, provide a callback function if you would like to execute one, and an acknowledgement strategy for handling messages on the topic.

def my_pipeline_function()
client = MessagingClient()
ack_strategy = AckStrategy.<STRATEGY>
client.subscribe("topic_name", my_callback_func(), ack_strategy)

Sending

The messaging client can send a message to a given topic name through the service and will receive back a future object to handle that messages receipt. To do this, you can simply provide a topic name and a message object containing the text string for your message, like the following example.

c = MessagingClient()
message = Message()
message.set_payload("test message")
result = c.queueForSend("TopicA", message)

Receiving

The message client can receive a message to a given topic name through the service after subscribe to the topic. However, any messages to the given topic before subscription is made will be ignored.

To do this, you can define a Callback function and subscribe to the topic with the Callback function created, like the following example:

Note: the client can must ack or nack the message received. If there are any exception thrown during the process time, the exception will be handled by the service and the message will be nacked.

from aissemble_messaging import ProcessMessageError, MessagingClient, AckStrategy, Message, Callback
import threading

def receive_message(msg: Message, topic):
    message_received = msg.get_payload()
    print("Received message from topic: {}, message: {}".format(topic, message_received))
    
    # to nack a message, return msg.nack(reason)
    # ex:
    if "nacked" in message_received :
        return msg.nack("receiver nacks the message: " + message_received)
    
    # when raise an Exception for any issue, message will be nacked
    # ex:
    if "problem" in message_received:
        raise Exception("error while processing message: " + message_received)

    # to ack a message, return msg.ack()
    return msg.ack()

def demo_topic_callback(msg: Message):
    receive_message(msg, "demoTopic")
    
def subscribe_to_topic(consumer, callback_func):
    consumer.subscribe("demoTopic", callback_func, AckStrategy.MANUAL)
    is_subscribed = consumer.isTopicSubscribed("demoTopic")
    while is_subscribed:
        pass


# Create MessageClient with default and the messaging configration
client = MessagingClient(properties_location = os.path.join("microprofile-config.properties"))

sb_thread = threading.Thread(target=subscribe_to_topic,args=[client, Callback(demo_topic_callback)])
sb_thread.start()

Misc.

The testing strategy for the client's integration tests is to run the messaging service's jar through py4j's launch_gateway() function, and then interact with it through the MessagingClient class. In addition to running the jar, the launch_gateway() function has some quality of life features for testing. In order to include logging from the jvm in the python test output, add the following parameters to the function, and run the behave tests with the --no-capture flag.

redirect_stdout=sys.stdout,
redirect_stderr=sys.stdout

Please see the aiSSEMBLE Messaging documentation for additional general information.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aissemble_foundation_messaging_python-1.12.1.tar.gz (55.0 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

File details

Details for the file aissemble_foundation_messaging_python-1.12.1.tar.gz.

File metadata

File hashes

Hashes for aissemble_foundation_messaging_python-1.12.1.tar.gz
Algorithm Hash digest
SHA256 5f78ff74f05fb7f2dd1738b7d95317e34e3947a9e26feabdc83d98da4976671c
MD5 8b2e8ee64b3d2aec8d29d4653dd261d4
BLAKE2b-256 1fbe1e77cf114146de454a86cc81b1583a6906bee44a0e9e0e14b7735f1b90d9

See more details on using hashes here.

File details

Details for the file aissemble_foundation_messaging_python-1.12.1-py3-none-any.whl.

File metadata

File hashes

Hashes for aissemble_foundation_messaging_python-1.12.1-py3-none-any.whl
Algorithm Hash digest
SHA256 adab0771ff841319b4842d8d6a714ea2fe573a4ea9bf81cfb20f414dc93d20c6
MD5 1d4ff3edef90b0d9313b003884b9c791
BLAKE2b-256 b2dc4104765a3028389f0781919859d94f1748f5412d2ff7e484267e4c6a7e8c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page