RabbitMQ Stream Python Client

A Python asyncio-based client for RabbitMQ Streams

Install

The client is distributed via PyPI and can be installed with pip:

	pip install rstream

Client Codecs

Before starting to use the client, it is important to read this section. The client supports two codecs for storing messages on the server:

  • AMQP 1.0
  • Binary

By default you should use the AMQP 1.0 codec:

amqp_message = AMQPMessage(
    body="hello: {}".format(i),
)

AMQP 1.0 codec vs Binary

You need to use the AMQP 1.0 codec to exchange messages with other stream clients (Java, .NET, Rust, Go) or if you want to interoperate with AMQP 0.9.1 clients.

You can use the Binary version if you need to exchange messages from Python to Python.
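
For comparison, with the Binary codec you simply pass raw bytes instead of an AMQPMessage (the same form used in the Client Performances section below):

# Binary codec: the payload is stored as-is; only clients that expect raw
# bytes (for example other rstream applications) will be able to interpret it.
await producer.send(stream=STREAM, message=b"hello")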

Note: messages stored with the Binary codec are not compatible with the other stream clients or with AMQP 0.9.1 clients.
Once the messages are stored on the server, you can't change them.

Read also the Client Performances section

Publishing messages

You can publish messages with four different methods:

  • send: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires.
  • send_batch: synchronous, the user buffers the messages and sends them. This is the fastest publishing method.
  • send_wait: synchronous, the caller waits until the message is confirmed. This is the slowest publishing method.
  • send_sub_entry: asynchronous, allows batching in sub-entry mode. This mode increases throughput at the cost of increased latency and potential message duplication even when deduplication is enabled. It also allows using compression to reduce bandwidth and storage if messages are reasonably similar, at the cost of increased CPU usage on the client side.

In the examples directory you can find different ways to send the messages:
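
For orientation, here is a minimal sketch of the first three methods. The connection values and the stream name are illustrative, the stream is assumed to already exist, and the parameter names follow the examples in the repository:

import asyncio

from rstream import AMQPMessage, Producer

STREAM = "my-stream"  # illustrative stream name, assumed to already exist


async def publish():
    async with Producer(host="localhost", port=5552, username="guest", password="guest") as producer:
        message = AMQPMessage(body="hello")

        # send: buffered internally and confirmed asynchronously
        await producer.send(stream=STREAM, message=message)

        # send_wait: blocks until the server confirms this message
        await producer.send_wait(stream=STREAM, message=message)

        # send_batch: the caller builds the batch and sends it in one call
        batch = [AMQPMessage(body="hello: {}".format(i)) for i in range(100)]
        await producer.send_batch(stream=STREAM, batch=batch)


asyncio.run(publish())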

Publishing with confirmation

The send method takes as a parameter a handler function that is called asynchronously when the server notifies the client that the message has been published.

Example:
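
A minimal sketch of a confirmation handler, assuming the ConfirmationStatus type and the on_publish_confirm parameter used in the repository's confirmation example:

from rstream import ConfirmationStatus


async def on_publish_confirm(confirmation: ConfirmationStatus) -> None:
    # Called asynchronously once the server reports the outcome for a message.
    if confirmation.is_confirmed:
        print("message id {} confirmed".format(confirmation.message_id))
    else:
        print(
            "message id {} not confirmed, response code {}".format(
                confirmation.message_id, confirmation.response_code
            )
        )


# Pass the handler when sending (same producer and message as above):
await producer.send(stream=STREAM, message=amqp_message, on_publish_confirm=on_publish_confirm)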

send_wait, instead, waits until the confirmation from the server is received.
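
For example (same producer and message as above):

# Returns only after the server has confirmed the message.
await producer.send_wait(stream=STREAM, message=amqp_message)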

Deduplication

RabbitMQ Stream can detect and filter out duplicated messages based on two client-side elements: the producer name and the message publishing ID. All the producer methods that send messages (send, send_batch, send_wait) take a publisher_name parameter, while the message publishing ID can be set in the AMQP message.

Example:
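
A sketch of the idea; the publishing_id attribute name on AMQPMessage is an assumption here, so check the deduplication example in the repository for the exact API:

async def publish():
    async with Producer(host="localhost", port=5552, username="guest", password="guest") as producer:
        for i in range(1000):
            message = AMQPMessage(
                body="hello: {}".format(i),
                publishing_id=i,  # assumed attribute name for the message publishing ID
            )
            # The pair (publisher_name, publishing ID) lets the server drop duplicates.
            await producer.send(stream=STREAM, message=message, publisher_name="my-producer")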

Consuming messages

See consumer examples for basic consumer and consumers with different offsets.
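
A minimal consumer sketch, with illustrative connection values and stream name, following the basic consumer example:

import asyncio

from rstream import AMQPMessage, Consumer, MessageContext, amqp_decoder

STREAM = "my-stream"  # illustrative stream name


async def on_message(msg: AMQPMessage, message_context: MessageContext):
    print("Got message: {}".format(msg))


async def consume():
    consumer = Consumer(host="localhost", port=5552, username="guest", password="guest")
    await consumer.start()
    await consumer.subscribe(stream=STREAM, callback=on_message, decoder=amqp_decoder)
    await consumer.run()


asyncio.run(consume())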

Server-side offset tracking

RabbitMQ Streams provides server-side offset tracking for consumers. This feature allows a consuming application to restart consuming where it left off in a previous run. You can use the store_offset (to store an offset on the server) and query_offset (to query it) methods of the consumer class, like in this example:
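
A sketch built on the consumer above; the MessageContext fields and the store_offset/query_offset signatures follow the repository example and are best verified there:

# Inside the on_message callback: persist the offset of the message just handled.
await message_context.consumer.store_offset(
    stream=STREAM,
    subscriber_name=message_context.subscriber_name,
    offset=message_context.offset,
)

# On a later run: ask the server where we left off and resume from there
# ("my-subscriber" is an illustrative subscriber name).
stored_offset = await consumer.query_offset(stream=STREAM, subscriber_name="my-subscriber")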

Superstreams

A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes.

See the blog post for more info.

You can use the superstream_producer and superstream_consumer classes, which internally use producers and consumers to operate on the individual partition streams.

See the Super Stream example
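
A producer sketch; the SuperStreamProducer, RouteType and routing_extractor names follow the Super Stream example and should be treated as assumptions to verify there:

from rstream import AMQPMessage, RouteType, SuperStreamProducer


async def routing_extractor(message: AMQPMessage) -> str:
    # Derive the routing key from the message (hypothetical key choice).
    return str(message.application_properties["id"])


async def publish():
    async with SuperStreamProducer(
        host="localhost",
        username="guest",
        password="guest",
        super_stream="invoices",          # illustrative super stream name
        routing=RouteType.Hash,           # hash the routing key onto the partitions
        routing_extractor=routing_extractor,
    ) as producer:
        for i in range(100):
            message = AMQPMessage(
                body="hello: {}".format(i),
                application_properties={"id": i},
            )
            await producer.send(message)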

Single Active Consumer support:

Single active consumer provides exclusive consumption and consumption continuity on a stream.
See the blog post for more info.

See the single active consumer example
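
A sketch of enabling it on a subscription, reusing the consumer above; the properties parameter of subscribe and the property keys follow the stream protocol and the linked example, so treat them as assumptions:

await consumer.subscribe(
    stream=STREAM,
    callback=on_message,
    decoder=amqp_decoder,
    # Consumer properties: enable single active consumer and name the group.
    properties={"single-active-consumer": "true", "name": "consumer-group-1"},
)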

Connecting with SSL:

import ssl

from rstream import Producer

ssl_context = ssl.SSLContext()
ssl_context.load_cert_chain('/path/to/certificate.pem', '/path/to/key.pem')

producer = Producer(
    host='localhost',
    port=5551,
    ssl_context=ssl_context,
    username='guest',
    password='guest',
)

Managing disconnections:

The client does not support auto-reconnect at the moment.

When the TCP connection is closed unexpectedly, the client notifies you through the connection_closed_handler callback:

def on_connection_closed(reason: Exception) -> None:
    print("connection has been closed for reason: " + str(reason))

consumer = Consumer(
    # ... connection parameters ...
    connection_closed_handler=on_connection_closed,
)

Please take a look at the complete example here

Load Balancer

In order to handle load balancers, you can use the load_balancer_mode parameter for producers and consumers. This will always attempt to create a connection via the load balancer, discarding connections that are inappropriate for the client type.

Producers must connect to the leader node, while consumers can connect to any, prioritizing replicas if available.
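
A minimal sketch, with an illustrative load balancer address:

producer = Producer(
    host="my-load-balancer",   # illustrative load balancer address
    port=5552,
    username="guest",
    password="guest",
    load_balancer_mode=True,   # resolve the right node through the load balancer
)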

Client Performances

The RabbitMQ Stream queues can handle high throughput. Currently, the client cannot reach the maximum throughput the server can handle.

We found some bottlenecks; one of them is marshalling and unmarshalling the AMQP 1.0 message format.

This one:

for i in range(1_000_000):
    amqp_message = AMQPMessage(
        body="hello: {}".format(i),
    )
    # send is asynchronous
    await producer.send(stream=STREAM, message=amqp_message)

is more or less 50% slower than:

for i in range(1_000_000):
    # send is asynchronous
    await producer.send(stream=STREAM, message=b"hello")

You can use send_batch to test the performance:

$ python docs/examples/basic_producers/producer_send_batch_binary.py
Sent 1000000 messages in 6.7364 seconds. 148446.9526 messages per second

With the AMQP 1.0 parser:

$ python docs/examples/basic_producers/producer_send_batch.py       
Sent 1000000 messages in 13.2724 seconds. 75344.4910 messages per second

We are evaluating rewriting the AMQP 1.0 codec to optimize it for the stream use case.

Build and Test

To run the tests, you need to have a running RabbitMQ Stream server. You can use the official Docker image.

Run the server with the following command:

docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
    -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
    rabbitmq:3.12-management

enable the plugin:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

and run the tests:

 poetry run pytest

TODO

  • Documentation
  • Handle MetadataUpdate and reconnect to another broker on stream configuration changes
  • AsyncIterator protocol for consumer
  • Add frame size validation
