Provides implementation for publisher and consumer with Kombu library
Project description
Classic Messaging Kombu
This package provides implementation of interfaces in classic-messaging and base for consumers and messages handling.
Usage with publishing:
from classic.components import component
from classic.messaging import Message, Publisher
from classic.messaging_kombu import BrokerScheme, KombuPublisher
from kombu import Exchange, Queue, Connection
@component
class SomeService:
publisher: Publisher
def do_some_work(self):
message = Message('some', 'Some very useful info')
self.publisher.publish(message)
broker_scheme = BrokerScheme(
Queue('queue1', Exchange('some')),
)
connection = Connection('amqp://localhost:5672/')
publisher = KombuPublisher(
connection=connection,
scheme=broker_scheme
)
service = SomeService(publisher=publisher)
service.do_some_work()
Message have 2 arguments - target and body. Target is a str with destination. In simple case it is an exchange name, in complex case - entry in mapping.
Body can be any serializable object.
Customization of target:
from classic.messaging import Message
from classic.messaging_kombu import BrokerScheme, KombuPublisher
from kombu import Exchange, Queue, Connection
targets = {
'FIRST': {
'exchange': 'exchange1',
},
'SECOND': {
'exchange': 'exchange2',
'timeout': 10,
}
}
broker_scheme = BrokerScheme(
Queue('queue1', Exchange('exchange1')),
Queue('queue2', Exchange('exchange2')),
)
connection = Connection('amqp://localhost:5672/')
publisher = KombuPublisher(
connection=connection,
scheme=broker_scheme,
messages_params=targets,
)
publisher = KombuPublisher(
connection=connection,
scheme=broker_scheme
)
publisher.publish(
Message('FIRST', 'Hello'), # Will be published to exchange1 and queue1
Message('SECOND', 'By'), # Will be published to exchange2, queue2 and timeout=10
)
Usage with consuming:
from classic.messaging_kombu import BrokerScheme, KombuConsumer, MessageHandler
from kombu import Exchange, Queue, Connection
class SomeSerice:
def handle_message(self, message):
print(message)
class CustomHandler(MessageHandler):
def handle(self, message, body):
print(body)
message.ack()
broker_scheme = BrokerScheme(
Queue('queue1', Exchange('exchange1')),
Queue('queue2', Exchange('exchange2')),
)
connection = Connection('amqp://localhost:5672/')
consumer = KombuConsumer(
connection=connection,
scheme=broker_scheme,
)
service = SomeSerice()
handler = CustomHandler()
consumer.register_function(service.handle_message, 'queue1')
consumer.register_handler(handler, 'queue2')
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file classic-messaging-kombu-0.0.3.tar.gz
.
File metadata
- Download URL: classic-messaging-kombu-0.0.3.tar.gz
- Upload date:
- Size: 4.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.2 CPython/3.7.3
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 82b8861caff7566992fcc025165efa4bc638ce4d16625e4d2c3796a6cdafab18 |
|
MD5 | 2935a7c5f430ba91c53ab74c0e7072ea |
|
BLAKE2b-256 | 941b1d2d941e4589a637f5d6d0aded1bd586b2cf52ce559dc12c7688b650d2c4 |
File details
Details for the file classic_messaging_kombu-0.0.3-py3-none-any.whl
.
File metadata
- Download URL: classic_messaging_kombu-0.0.3-py3-none-any.whl
- Upload date:
- Size: 6.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.2 CPython/3.7.3
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d47bea6c8bc89f6f1f44abdf45e35999725db034afbd508ea98072a111602825 |
|
MD5 | 106facbb13b6edf035b8183b0f5eb6c9 |
|
BLAKE2b-256 | 7e848ee2b158cb67b64b566c990c3d357e52a83cd1f4c63e8ab7d952e8bff537 |