Python AMQP Client Library
Project description
AMQP Client Python
A client with a high level of abstraction for handling messages on a RabbitMQ event bus.
Features:
- Automatic creation and management of queues, exchanges and channels;
- Connection persistence and auto reconnect;
- Support for direct, topic and fanout exchanges;
- Publish;
- Subscribe;
- Support for remote procedure calls (RPC).
Examples:
You can use either the sync or the async event bus.
async usage
# basic configuration
from amqp_client_python import (
AsyncEventbusRabbitMQ,
Config, Options
)
from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler
# Options receives, in order, what appear to be the queue name, the RPC queue
# name and the RPC exchange name (inferred from the argument values; confirm
# against the library documentation).
config = Config(Options("queue", "rpc_queue", "rpc_exchange"))
# Create the asynchronous event bus from that configuration.
eventbus = AsyncEventbusRabbitMQ(config)
# publish
class ExampleEvent(IntegrationEvent):
    """Example integration event carrying an optional message payload."""

    EVENT_NAME: str = "ExampleEvent"

    def __init__(self, event_type: str, message=None) -> None:
        """Create the event.

        Args:
            event_type: Passed to ``IntegrationEvent.__init__`` after the
                event name.
            message: Payload stored on the instance; a new empty list is
                used when omitted.
        """
        super().__init__(self.EVENT_NAME, event_type)
        # Build a fresh list per instance: a literal ``[]`` default would be
        # shared by every event created without an explicit message.
        self.message = [] if message is None else message


# rpc_exchange and rpc_routing_key are not defined in this snippet; they must
# be provided by the surrounding application.
publish_event = ExampleEvent(rpc_exchange, ["message"])
# NOTE(review): every other call on the async event bus in this example is
# awaited; confirm whether publish() must be awaited as well.
eventbus.publish(publish_event, rpc_routing_key, "direct")
# subscribe
class ExampleEventHandler(IntegrationEventHandler):
def handle(self, body) -> None:
print(body) # handle messages
await eventbus.subscribe(subscribe_event, subscribe_event_handle, rpc_routing_key)
# rpc_publish
# Call rpc_client with rpc_exchange, the "user.find" routing key and the
# message list, and await the result into `response`.
response = await eventbus.rpc_client(rpc_exchange, "user.find", ["content_message"])
# provider
async def handle2(*body) -> bytes:
print(f"body: {body}")
return b"content"
await eventbus.provide_resource("user.find", handle)
sync usage
from amqp_client_python import (
EventbusRabbitMQ,
Config, Options
)
from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler
from examples.default import queue, rpc_queue, rpc_exchange, rpc_routing_key
class ExampleEvent(IntegrationEvent):
    """Example integration event with a payload and a fixed routing key."""

    EVENT_NAME: str = "ExampleEvent"
    ROUTING_KEY: str = rpc_routing_key

    def __init__(self, event_type: str, message=None) -> None:
        """Create the event.

        Args:
            event_type: Passed to ``IntegrationEvent.__init__`` after the
                event name.
            message: Payload stored on the instance; a new empty list is
                used when omitted.
        """
        super().__init__(self.EVENT_NAME, event_type)
        # Build a fresh list per instance: a literal ``[]`` default would be
        # shared by every event created without an explicit message.
        self.message = [] if message is None else message
        self.routing_key = self.ROUTING_KEY
class ExampleEventHandler(IntegrationEventHandler):
    """Subscriber that echoes each received body to standard output."""

    def handle(self, body) -> None:
        """Print *body* followed by the "subscribe" tag."""
        tag = "subscribe"
        print(body, tag)
# Build the configuration from the queue, rpc_queue and rpc_exchange values
# imported from examples.default.
config = Config(Options(queue, rpc_queue, rpc_exchange))
# Create the synchronous event bus from that configuration.
eventbus = EventbusRabbitMQ(config=config)
# NOTE: this rebinds the name ExampleEvent, replacing the class of the same
# name defined earlier in this example; instances created from here on have
# no ``routing_key`` attribute set by this class.
class ExampleEvent(IntegrationEvent):
    """Example integration event carrying an optional message payload."""

    EVENT_NAME: str = "ExampleEvent"

    def __init__(self, event_type: str, message=None) -> None:
        """Create the event.

        Args:
            event_type: Passed to ``IntegrationEvent.__init__`` after the
                event name.
            message: Payload stored on the instance; a new empty list is
                used when omitted.
        """
        super().__init__(self.EVENT_NAME, event_type)
        # Build a fresh list per instance: a literal ``[]`` default would be
        # shared by every event created without an explicit message.
        self.message = [] if message is None else message
from time import sleep
from random import randint
def handle(*body):
    """Echo the first received argument back as UTF-8 encoded bytes.

    The first positional argument is printed together with the
    "rpc_provider" tag, then formatted as text and encoded.
    """
    first = body[0]
    print(first, "rpc_provider")
    text = format(first)
    return text.encode("utf-8")
# Event used for the subscription (created without an explicit message).
subscribe_event = ExampleEvent(rpc_exchange)
# Event with a one-element message; only referenced by the commented-out
# publish call further down.
publish_event = ExampleEvent(rpc_exchange, ["message"])
subscribe_event_handle = ExampleEventHandler()
# Subscribe the handler to the event under rpc_routing_key.
eventbus.subscribe(subscribe_event, subscribe_event_handle, rpc_routing_key)
# Register `handle` as the RPC provider under the routing key with a "2" suffix.
eventbus.provide_resource(rpc_routing_key+"2", handle)
# Call the RPC provider registered under rpc_routing_key + "2" in a loop,
# sending an increasing counter each time and stopping as soon as the decoded
# reply differs from the value that was sent.
from concurrent.futures import TimeoutError

count = 0
running = True
while running:
    try:
        count += 1
        reply = eventbus.rpc_client(rpc_exchange, rpc_routing_key + "2", [f"{count}"])
        if reply.decode("utf-8") != str(count):
            running = False
        # Alternative: publish a single event instead of issuing RPC calls.
        # eventbus.publish(publish_event, rpc_routing_key, "direct")
        # running = False
    except TimeoutError as err:
        # A timed-out call is reported and the loop tries again.
        print("timeout!!!: ", str(err))
    except KeyboardInterrupt:
        # Ctrl+C ends the loop.
        running = False
    except Exception as err:
        # Catch Exception rather than BaseException so that SystemExit and
        # similar interpreter-level signals are not swallowed by the loop.
        print("Err:", err)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
amqp_client_python-0.1.6.tar.gz
(22.1 kB
view hashes)
Built Distributions
Close
Hashes for amqp_client_python-0.1.6-py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | 2665a550936894416d43e118e106b640e1291ba0a84bba7fb23a5a7f44852403 |
MD5 | 770fe572d6ec64557c93085e7513de54 |
BLAKE2b-256 | 36ff37c96a4b6cd33799ac01e778acef9484863be2a880dfb52def9a49601e89 |
Close
Hashes for amqp_client_python-0.1.6-1-py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | 201d7cea26aa1c97fdc69f94b64ce4d44d718ee09d796d3c427c99989a8bbc3e |
MD5 | 223e51cc1059cc7f97f937daa023fc6f |
BLAKE2b-256 | c347371a728011fac66fa60c196e8820910cd4443bbcc864717a5deb19639bae |