Project description

Picking a message broker means accepting the design choices of its authors. You write client-side and server-side code, and the broker makes things happen. This library is an experiment to see whether a different way is feasible: one where you have finer control over what the broker does, because you write the client, server and broker code.

Usage example

Let's look at two unit tests.

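import enum
import threading

import msglib.broker
import msglib.client
import msglib.handlers
import msglib.ios.io_memory
import msglib.ios.io_sockets


def test_publish_one_read_one_in_memory():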

    # This is responsible for exchanging messages in memory
    memory_transport = msglib.ios.io_memory.Transport()

    # We need to identify the broker
    broker_endpoint = 'broker'

    # We also need to identify queues
    class QueueId(int, enum.Enum):
        GREETINGS = enum.auto()

    with (
        msglib.broker.Broker(
            # When a new client connects or writes
            # the broker needs to handle it.
            # This is where the broker's messaging logic lives.
            handler=msglib.handlers.ConnectionHandler(),

            # Object that provides means to talk to the world.
            connection_manager=msglib.ios.io_memory.InMemoryConnectionManager(
                transport=memory_transport,
                endpoint_id=broker_endpoint,
            ),
        ) as broker,
        memory_transport.connect(broker_endpoint) as sender_connection,
        memory_transport.connect(broker_endpoint) as receiver_connection,
    ):
        msglib.client.publish_to_q(
            connection=sender_connection,
            q_id=QueueId.GREETINGS,
            payload=b'Hello, world!',
        )
        sub = msglib.client.blocking_pull_subscribe_to_queue(
            connection=receiver_connection,
            q_id=QueueId.GREETINGS,
        )

        # We don't necessarily know how many low-level messages
        # need to travel "on the wire" to publish, subscribe and
        # receive the message, so we just keep processing
        # until the message is received. To keep the test from
        # getting stuck in an infinite loop in case of a bug
        # or during refactoring, there is an arbitrary
        # upper bound on the number of iterations.
        # Note that, because this in-memory implementation is
        # intended to be used in a test environment, the blocking
        # methods do not in fact block, but instead raise
        # `BlockingIOError` where they would otherwise block.
        for _ in range(10):
            broker.process_connections()
            try:
                msg = next(sub)
            except BlockingIOError:
                continue
            break
        else:
            raise AssertionError('Did not receive an expected message.')

        assert msg.payload == b'Hello, world!'
        msg.ack()


def test_publish_one_read_one_sockets():

    broker_port = 12345
    broker_ip = msglib.ios.io_sockets.IPv6.from_string('::1')

    class QueueId(int, enum.Enum):
        GREETINGS = enum.auto()

    with (
        msglib.broker.Broker(
            handler=msglib.handlers.ConnectionHandler(),
            connection_manager=msglib.ios.io_sockets.EpollSocketManager(
                port=broker_port,
                ip=broker_ip,
                epoll_timeout_seconds=0.001,
            ),
        ) as broker,
        msglib.ios.io_sockets.connect(
            ip=broker_ip,
            port=broker_port,
            # Add a timeout, so that we don't block forever
            # in case of a failing test.
            timeout_seconds=10,
        ) as sender_connection,
        msglib.ios.io_sockets.connect(
            ip=broker_ip,
            port=broker_port,
            timeout_seconds=10,
        ) as receiver_connection,
    ):
        msglib.client.publish_to_q(
            connection=sender_connection,
            q_id=QueueId.GREETINGS,
            payload=b'Hello, world!',
        )
        sub = msglib.client.blocking_pull_subscribe_to_queue(
            connection=receiver_connection,
            q_id=QueueId.GREETINGS,
        )

        # Attempting to read an unavailable message would
        # block the main thread, so we run it in its own thread.
        class Reader(threading.Thread):

            msg: msglib.client.AckableQMsg

            def run(self):
                self.msg = next(sub)

        reader = Reader()
        reader.start()

        # Run broker until the reader thread quits.
        while reader.is_alive():
            broker.process_connections()

        assert reader.msg.payload == b'Hello, world!'
        reader.msg.ack()

Both test cases execute the same logical scenario: one party publishes a message to a queue and a different party consumes it. The key point is that the broker is created and run as part of the tests themselves.
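
The bounded polling loop from the in-memory test could be factored into a small helper. The following is only a sketch of that idea and not part of msglib: `pump_until_ready`, `step`, `read` and `FakeSubscription` are hypothetical names introduced for illustration. The fake subscription stands in for the object returned by `blocking_pull_subscribe_to_queue`, so the example runs without a broker.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def pump_until_ready(
    step: Callable[[], None],
    read: Callable[[], T],
    max_iterations: int = 10,
) -> T:
    """Alternate `step` and `read` until `read` stops raising BlockingIOError.

    Hypothetical helper: in the test above, `step` would be
    `broker.process_connections` and `read` would be `lambda: next(sub)`.
    """
    for _ in range(max_iterations):
        step()
        try:
            return read()
        except BlockingIOError:
            # Nothing available yet; give the broker another turn.
            continue
    raise AssertionError(
        f"Did not receive an expected message in {max_iterations} iterations."
    )


class FakeSubscription:
    """Stand-in subscription that becomes readable on the third attempt."""

    def __init__(self) -> None:
        self.attempts = 0

    def __next__(self) -> bytes:
        self.attempts += 1
        if self.attempts < 3:
            raise BlockingIOError
        return b"Hello, world!"


fake_sub = FakeSubscription()
steps = []  # records how many times the "broker" was asked to do work

payload = pump_until_ready(
    step=lambda: steps.append(None),
    read=lambda: next(fake_sub),
)
```

Keeping the upper bound inside the helper preserves the property the test comments describe: a bug fails the test with an `AssertionError` instead of hanging it.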
