
Provides publisher and consumer implementations built on the Kombu library

Project description

Classic Messaging Kombu

This package provides an implementation of the interfaces defined in classic-messaging, plus a base for consumers and message handling.

Usage with publishing:

from classic.components import component
from classic.messaging import Message, Publisher
from classic.messaging_kombu import BrokerScheme, KombuPublisher
from kombu import Exchange, Queue, Connection


@component
class SomeService:
    publisher: Publisher

    def do_some_work(self):
        message = Message('some', 'Some very useful info')
        self.publisher.publish(message)


broker_scheme = BrokerScheme(
    Queue('queue1', Exchange('some')),
)
        
connection = Connection('amqp://localhost:5672/')
publisher = KombuPublisher(
    connection=connection,
    scheme=broker_scheme
)

service = SomeService(publisher=publisher)

service.do_some_work()

A Message takes two arguments: target and body. The target is a str naming the destination. In the simple case it is an exchange name; in the more complex case it is a key in a mapping of per-target parameters (messages_params).

Body can be any serializable object.
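Kombu serializes message bodies as JSON by default, so one quick way to check whether a body is likely to survive publishing is to try encoding it with the standard json module. The helper below is a hypothetical sketch, not part of classic-messaging-kombu, and it assumes the publisher relies on that default JSON serializer.

```python
import json


def is_json_serializable(body):
    """Return True if body can be encoded by the standard json module.

    Hypothetical helper: assumes the publisher uses Kombu's default
    JSON serializer. A different serializer would accept other types.
    """
    try:
        json.dumps(body)
    except (TypeError, ValueError):
        # TypeError: unsupported type; ValueError: e.g. circular reference.
        return False
    return True


print(is_json_serializable({'id': 1, 'tags': ['a', 'b']}))
print(is_json_serializable({1, 2, 3}))  # sets are not JSON-serializable
```

Running such a check before calling publish can surface a bad body close to where it was built rather than inside the broker client.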

Customization of target:

from classic.messaging import Message
from classic.messaging_kombu import BrokerScheme, KombuPublisher
from kombu import Exchange, Queue, Connection


targets = {
    'FIRST': {
        'exchange': 'exchange1',
    },
    'SECOND': {
        'exchange': 'exchange2',
        'timeout': 10,
    }
}

broker_scheme = BrokerScheme(
    Queue('queue1', Exchange('exchange1')),
    Queue('queue2', Exchange('exchange2')),
)
        
connection = Connection('amqp://localhost:5672/')
publisher = KombuPublisher(
    connection=connection,
    scheme=broker_scheme,
    messages_params=targets,
)

publisher.publish(
    Message('FIRST', 'Hello'),  # Published to exchange1, routed to queue1
    Message('SECOND', 'Bye'),  # Published to exchange2, routed to queue2, with timeout=10
)
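To make the two cases for target concrete, here is a minimal sketch of how a target string might be turned into publish parameters. The function resolve_target is hypothetical and only illustrates the rule described above (mapping entry if present, otherwise the target is the exchange name); it is not the package's actual code.

```python
def resolve_target(target, messages_params=None):
    """Return publish parameters for a target (hypothetical sketch)."""
    params = (messages_params or {}).get(target)
    if params is None:
        # Simple case: the target itself is treated as the exchange name.
        return {'exchange': target}
    # Complex case: copy the mapping entry so callers cannot mutate it.
    return dict(params)


targets = {
    'FIRST': {'exchange': 'exchange1'},
    'SECOND': {'exchange': 'exchange2', 'timeout': 10},
}

print(resolve_target('some'))             # {'exchange': 'some'}
print(resolve_target('SECOND', targets))  # {'exchange': 'exchange2', 'timeout': 10}
```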

Usage with consuming:

from classic.messaging_kombu import BrokerScheme, KombuConsumer, MessageHandler
from kombu import Exchange, Queue, Connection


class SomeService:
    def handle_message(self, message):
        print(message)


class CustomHandler(MessageHandler):

    def handle(self, message, body):
        print(body)

        message.ack()


broker_scheme = BrokerScheme(
    Queue('queue1', Exchange('exchange1')),
    Queue('queue2', Exchange('exchange2')),
)

connection = Connection('amqp://localhost:5672/')

consumer = KombuConsumer(
    connection=connection,
    scheme=broker_scheme,
)

service = SomeService()
handler = CustomHandler()

consumer.register_function(service.handle_message, 'queue1')
consumer.register_handler(handler, 'queue2')
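The two registration calls differ in what the callback is responsible for. As a rough in-memory illustration, the sketch below assumes that a plain function receives only the decoded body and is acknowledged on its behalf, while a handler object receives both the message and the body and acknowledges it itself. The first half of that assumption is not confirmed by the text above. FakeMessage, MiniConsumer and PrintingHandler are hypothetical stand-ins, not classes from this package or from Kombu.

```python
class FakeMessage:
    """Hypothetical stand-in for a broker message; only tracks ack state."""

    def __init__(self, body):
        self.body = body
        self.acknowledged = False

    def ack(self):
        self.acknowledged = True


class MiniConsumer:
    """Hypothetical in-memory dispatcher keyed by queue name."""

    def __init__(self):
        self._callbacks = {}

    def register_function(self, function, queue):
        # Assumption: plain functions get the body and are acked for them.
        def callback(message, body):
            function(body)
            message.ack()
        self._callbacks[queue] = callback

    def register_handler(self, handler, queue):
        # Handler objects get message and body and must ack themselves.
        self._callbacks[queue] = handler.handle

    def deliver(self, queue, message):
        self._callbacks[queue](message, message.body)


class PrintingHandler:
    def handle(self, message, body):
        print('handler got', body)
        message.ack()


consumer = MiniConsumer()
consumer.register_function(lambda body: print('function got', body), 'queue1')
consumer.register_handler(PrintingHandler(), 'queue2')

first, second = FakeMessage('Hello'), FakeMessage('Bye')
consumer.deliver('queue1', first)
consumer.deliver('queue2', second)
```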
