Message (Command and Event) passing for Python

messageit

Examples

Create custom event

Custom events can be used to notify multiple subscribers that something happened in the application or in the outside world. To create a custom event, subclass the Event class and mark the subclass as a dataclass:

from dataclasses import dataclass
from messageit import Event

@dataclass
class MessageReceived(Event):
    message: str = None

Each message has a message_id property that is automatically set to a UUID. If you do not pass correlation_id when creating the message, correlation_id is set to the message_id. The correlation_id can be used to track the execution of complex flows.

event = MessageReceived(message="Hello world")
event.message_id   # UUID('ddeb3e81-3b75-403b-9b1e-13007b1f3abe')
event.correlation_id  # UUID('ddeb3e81-3b75-403b-9b1e-13007b1f3abe')
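
The behaviour described above can be reproduced with plain dataclasses. The following is a minimal sketch, not the messageit source: SketchMessage and SketchEvent are hypothetical stand-ins, and the use of default_factory and __post_init__ is only an assumption about one way to get the same effect.

```python
from dataclasses import dataclass, field
from typing import Optional
from uuid import UUID, uuid4


@dataclass
class SketchMessage:
    # Hypothetical base class; every instance gets a fresh UUID.
    message_id: UUID = field(default_factory=uuid4)
    correlation_id: Optional[UUID] = None

    def __post_init__(self):
        # Fall back to the message's own id when no correlation_id is passed.
        if self.correlation_id is None:
            self.correlation_id = self.message_id


@dataclass
class SketchEvent(SketchMessage):
    # Hypothetical subclass mirroring the MessageReceived example.
    message: Optional[str] = None


first = SketchEvent(message="Hello world")
follow_up = SketchEvent(message="Hello again", correlation_id=first.correlation_id)
```

In this sketch the base fields have defaults, so dataclass rules require every subclass field to have a default as well. That may be why the examples declare message with a default of None, although the library could arrange this differently.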

Create custom command

Commands are used to perform actions. To create a custom command, subclass the Command class and mark the subclass as a dataclass:

from dataclasses import dataclass
from messageit import Command

@dataclass
class CleanMessage(Command):
    message: str = None

If correlation_id is passed when the message is created, that value becomes the message's correlation_id attribute. With proper logging, this can be used to track complex flows, e.g. distributed event and message handling.

clean = CleanMessage(message="Hello world", correlation_id=event.correlation_id)
clean.message_id      # UUID('e0812459-5145-4de8-8d96-d0fce7fefb42')
clean.correlation_id  # UUID('ddeb3e81-3b75-403b-9b1e-13007b1f3abe')
event.correlation_id  # UUID('ddeb3e81-3b75-403b-9b1e-13007b1f3abe')
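
As an illustration of the logging idea, here is a minimal sketch using the standard logging module. StandInMessage and log_message are hypothetical names introduced for this example. The helper only assumes that a message exposes message_id and correlation_id attributes, as described above.

```python
import logging
from dataclasses import dataclass
from uuid import UUID, uuid4

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("flow")


@dataclass
class StandInMessage:
    # Hypothetical stand-in with the two attributes the text describes.
    message_id: UUID
    correlation_id: UUID


def log_message(message) -> str:
    # Put correlation_id first so all lines of one flow are easy to filter.
    line = (
        f"correlation_id={message.correlation_id} "
        f"message_id={message.message_id} "
        f"type={type(message).__name__}"
    )
    logger.info(line)
    return line


root_id = uuid4()
received = StandInMessage(message_id=root_id, correlation_id=root_id)
clean = StandInMessage(message_id=uuid4(), correlation_id=received.correlation_id)
lines = [log_message(received), log_message(clean)]
```

Filtering the log output on a single correlation_id value then yields every message that belongs to the same flow, regardless of which handler logged it.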

Handle Commands - Assign an algorithm to perform the action

Building on the previous example, we can assign an algorithm to be executed when the action is requested:

from messageit import Executor

def clean_message(command: CleanMessage):
    return command.message.strip()

commands = Executor()
commands.register(CleanMessage, clean_message)

command = CleanMessage(message="   Hello   ")
commands.handle(command)   # 'Hello'
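
To make the register/handle pattern concrete, here is a minimal sketch of a registry with one handler per command type. SketchExecutor and SketchCleanMessage are hypothetical stand-ins, not the library's implementation. Raising LookupError for an unregistered command is an assumption of this sketch, and messageit may behave differently.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class SketchCleanMessage:
    # Hypothetical stand-in for the CleanMessage command.
    message: Optional[str] = None


class SketchExecutor:
    def __init__(self) -> None:
        # Exactly one handler per command type.
        self._handlers: Dict[type, Callable[[Any], Any]] = {}

    def register(self, command_type: type, handler: Callable[[Any], Any]) -> None:
        self._handlers[command_type] = handler

    def handle(self, command: Any) -> Any:
        handler = self._handlers.get(type(command))
        if handler is None:
            # Assumption of this sketch: an unknown command is an error.
            raise LookupError(f"no handler registered for {type(command).__name__}")
        return handler(command)


def clean_message(command: SketchCleanMessage) -> str:
    return command.message.strip()


sketch_commands = SketchExecutor()
sketch_commands.register(SketchCleanMessage, clean_message)
cleaned = sketch_commands.handle(SketchCleanMessage(message="   Hello   "))
```

In this sketch the handler's return value is passed straight back to the caller, which matches the 'Hello' result shown above.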

Subscribe to Events - Get notified when an event takes place

Building on previous examples:

from messageit import Publisher

def on_message_received(event: MessageReceived):
    clean_command = CleanMessage(message=event.message, correlation_id=event.message_id)
    cleaned = commands.handle(clean_command)
    print(f"Cleaned message: {repr(cleaned)}")


def echo_message(event: MessageReceived):
    print(f"Your input is: '{event.message}'")
    raise NotImplementedError()

events = Publisher()
events.register(MessageReceived, echo_message)
events.register(MessageReceived, on_message_received)

result = events.handle(
    MessageReceived(message="Hello World!!!")
)   # Your input is: 'Hello World!!!'
    # Cleaned message: 'Hello World!!!'
result  # [None, NotImplementedError()]

Both registered subscribers were called and produced output. The result of handle() is a collection of the values returned by the subscribers it invoked. If a subscriber raises an exception, the exception itself is used as that subscriber's result.
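
The fan-out behaviour described above can be sketched as follows. SketchPublisher, SketchEvent and split_results are hypothetical names, and this is not the library's implementation. In this sketch, results are collected in registration order; the ordering messageit uses is not assumed here.

```python
from typing import Any, Callable, Dict, List, Tuple


class SketchEvent:
    # Hypothetical stand-in for an event carrying a text message.
    def __init__(self, message: str) -> None:
        self.message = message


class SketchPublisher:
    def __init__(self) -> None:
        # Any number of subscribers per event type.
        self._subscribers: Dict[type, List[Callable[[Any], Any]]] = {}

    def register(self, event_type: type, subscriber: Callable[[Any], Any]) -> None:
        self._subscribers.setdefault(event_type, []).append(subscriber)

    def handle(self, event: Any) -> List[Any]:
        results: List[Any] = []
        for subscriber in self._subscribers.get(type(event), []):
            try:
                results.append(subscriber(event))
            except Exception as error:
                # A failing subscriber does not stop the others;
                # its exception becomes its result.
                results.append(error)
        return results


def split_results(results: List[Any]) -> Tuple[List[Any], List[Exception]]:
    # Separate ordinary return values from captured exceptions.
    values = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return values, failures


def shout(event: SketchEvent) -> str:
    return event.message.upper()


def broken(event: SketchEvent) -> None:
    raise NotImplementedError()


publisher = SketchPublisher()
publisher.register(SketchEvent, shout)
publisher.register(SketchEvent, broken)
results = publisher.handle(SketchEvent("hello"))
values, failures = split_results(results)
```

Because exceptions are returned rather than raised, the caller has to inspect the result collection to notice failures. A helper like split_results is one way to do that.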

Complete example

Here is a complete example that puts all the pieces above together. It is a simple command-line application that reads a line from the user and prints the cleaned message.

from dataclasses import dataclass
from messageit import Command, Event, Executor, Publisher

@dataclass
class CleanMessage(Command):
    message: str = None

@dataclass
class MessageReceived(Event):
    message: str = None

def on_message_received(event: MessageReceived):
    print(f"Received message: {repr(event.message)}")
    clean_command = CleanMessage(message=event.message, correlation_id=event.message_id)
    cleaned = commands.handle(clean_command)
    print(f"Cleaned message: {repr(cleaned)}")

def clean_message(command: CleanMessage):
    return command.message.strip()

events = Publisher()
events.register(MessageReceived, on_message_received)

commands = Executor()
commands.register(CleanMessage, clean_message)

while True:
    print("Type your message or leave empty to exit and press <Enter>: ")
    message = input()
    if message == "":
        break
    result = events.handle(MessageReceived(message=message))

Package and publish

To run the tests:

$ pytest

To package and publish:

$ pip install -U build twine
$ python -m build
$ python -m twine upload dist/*
