A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.
Project description
talus (noun) - ta·lus | ˈtā-ləs: a slope formed especially by an accumulation of rock debris; Occasional habitat of the pika.
A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.
Features
Guided separation of connections for producers and consumers
Re-establish connections to the server when lost
Constrained interface to support simple produce / consume use cases for direct exchanges
Installation
pip install talus
Examples
Creating a consumer which listens on a queue, processes valid messages and publishes as part of processing
from talus.consumer DurableConsumer
from talus.producer DurableProducer
from talus.models.retryer import ConnectionRetryerFactory
from talus.models.connection_parameters import ConsumerConnectionParameterFactory, ProducerConnectionParameterFactory
from talus.models.processor import MessageProcessorBase
from talus.models.message import ConsumeMessageBase, PublishMessageBase, MessageBodyBase
from talus.models.queue import Queue
from talus.models.exchange import Exchange
from talus.models.binding import Binding
from typing import Type
############
# Consumer #
############
# Configure messages that will be consumed
class ObjectMessageBody(MessageBodyBase):
objectName: str
bucket: str
class ConsumeMessage(ConsumeMessageBase):
message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody
# Configure the queue the messages should be consumed from
inbound_queue = Queue(name="inbound.q")
# Configure a message processor to handle the consumed messages
class MessageProcessor(MessageProcessorBase):
def process_message(self, message: ConsumeMessage):
print(f"Received message: {message}")
outbound_message = PublishMessage(
body=ObjectMessageBody(objectName=message.body.objectName, bucket="newBucket"),
) # change the bucket name for some reason
self.producer.publish(outbound_message)
############
# Producer #
############
# Configure messages that will be produced
class PublishMessage(PublishMessageBase):
message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody # using the same schema for simplicity
default_routing_key: str = "outbound.message.m"
# Configure the queue the messages should be routed to
outbound_queue_one = Queue(name="outbound.one.q")
outbound_queue_two = Queue(name="outbound.two.q")
# Configure the exchange and queue bindings for publishing
publish_exchange = Exchange(name="outbound.exchange") # Direct exchange by default
bindings = [Binding(queue=outbound_queue_one, message=PublishMessage),
Binding(queue=outbound_queue_two, message=PublishMessage)] # publishing PublishMessage will route to both queues.
# Actually Connect and run the consumer
def main():
with DurableProducer(
queue_bindings=bindings,
publish_exchange=publish_exchange,
connection_parameters=ProducerConnectionParameterFactory(),
connection_retryer=ConnectionRetryerFactory(),
) as producer:
with DurableConsumer(
consume_queue=inbound_queue,
connection_parameters=ConsumerConnectionParameterFactory(),
connection_retryer=ConnectionRetryerFactory(),
) as consumer:
message_processor = MessageProcessor(producer=producer)
consumer.listen(message_processor)
if __name__ == "__main__":
main()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.