Python AMQP Client Library
Project description
AMQP Client Python
Client with high level of abstraction for manipulation of messages in the event bus RabbitMQ.
Documentation: https://nutes-uepb.github.io/amqp-client-python
Source Code: https://github.com/nutes-uepb/amqp-client-python
Features:
- Automatic creation and management of queues, exchanges and channels;
- Connection persistence and auto reconnect;
- Support for direct, topic and fanout exchanges;
- Publish;
- Subscribe;
- Support for a Remote procedure call (RPC).
Table of Compatibility
version | compatible with |
---|---|
0.2.0 | 0.2.0 |
0.1.14 | ~0.1.12 |
Examples:
you can use sync , async eventbus and sync wrapper of async eventbus
async usage
# basic configuration
from amqp_client_python import (
AsyncEventbusRabbitMQ,
Config, Options
)
config = Config(Options("queue", "rpc_queue", "rpc_exchange"))
eventbus = AsyncEventbusRabbitMQ(config)
# publish
eventbus.publish("rpc_exchange", "routing.key", "message_content")
# subscribe
async def subscribe_handler(body) -> None:
print(body, type(body), flush=True) # handle messages
await eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler)
# rpc_publish
response = await eventbus.rpc_client("rpc_exchange", "user.find", "message_content")
# provider
async def rpc_provider_handler(body) -> bytes:
print(f"body: {body}")
return b"content"
await eventbus.provide_resource("user.find", rpc_provider_handler)
sync usage(deprecated)
from amqp_client_python import (
EventbusRabbitMQ,
Config, Options
)
from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler
from examples.default import queue, rpc_queue, rpc_exchange, rpc_routing_key
class ExampleEvent(IntegrationEvent):
EVENT_NAME: str = "ExampleEvent"
ROUTING_KEY: str = rpc_routing_key
def __init__(self, event_type: str, message = []) -> None:
super().__init__(self.EVENT_NAME, event_type)
self.message = message
self.routing_key = self.ROUTING_KEY
class ExampleEventHandler(IntegrationEventHandler):
def handle(self, body) -> None:
print(body,"subscribe")
config = Config(Options(queue, rpc_queue, rpc_exchange))
eventbus = EventbusRabbitMQ(config=config)
class ExampleEvent(IntegrationEvent):
EVENT_NAME: str = "ExampleEvent"
def __init__(self, event_type: str, message = []) -> None:
super().__init__(self.EVENT_NAME, event_type)
self.message = message
from time import sleep
from random import randint
def handle(*body):
print(body[0], "rpc_provider")
return f"{body[0]}".encode("utf-8")
subscribe_event = ExampleEvent(rpc_exchange)
publish_event = ExampleEvent(rpc_exchange, ["message"])
subscribe_event_handle = ExampleEventHandler()
eventbus.subscribe(subscribe_event, subscribe_event_handle, rpc_routing_key)
eventbus.provide_resource(rpc_routing_key+"2", handle)
count = 0
running = True
from concurrent.futures import TimeoutError
while running:
try:
count += 1
if str(count) != eventbus.rpc_client(rpc_exchange, rpc_routing_key+"2", [f"{count}"]).decode("utf-8"):
running = False
#eventbus.publish(publish_event, rpc_routing_key, "message_content")
#running = False
except TimeoutError as err:
print("timeout!!!: ", str(err))
except KeyboardInterrupt:
running=False
except BaseException as err:
print("Err:", err)
sync wrapper usage
from amqp_client_python import EventbusWrapperRabbitMQ, Config, Options
config = Config(Options("queue", "rpc_queue", "rpc_exchange"))
eventbus = EventbusWrapperRabbitMQ(config=config)
async def subscribe_handler(body) -> None:
print(f"{body}", type(body), flush=True)
async def rpc_provider_handler(body) -> bytes:
print(f"handle - {body}", type(body), flush=True)
return f"{body}".encode("utf-8")
# rpc_provider
eventbus.provide_resource("user.find", rpc_provider_handler).result()
# subscribe
eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler).result()
count = 0
running = True
while running:
try:
count += 1
# rpc_client call
eventbus.rpc_client("rpc_exchange", "user.find", count).result().decode("utf-8")
# publish
eventbus.publish("rpc_exchange", "routing.key", "message_content").result()
#running = False
except KeyboardInterrupt:
running=False
except BaseException as err:
print("Err:", err)
Sponsors
The library is provided by NUTES-UEPB.
Know Limitations:
basic eventbus
When using EventbusRabbitMQ Should not use rpc call inside of rpc provider or subscribe handlers, it may block the ioloop #/obs: fixed on other kinds of eventbus, will be removed on nexts releases
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
amqp_client_python-0.2.0.tar.gz
(28.4 kB
view details)
Built Distribution
File details
Details for the file amqp_client_python-0.2.0.tar.gz
.
File metadata
- Download URL: amqp_client_python-0.2.0.tar.gz
- Upload date:
- Size: 28.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.10.12 Linux/5.15.153.1-microsoft-standard-WSL2
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | da48b1046037facf56cd39354181159f3be9d47a49ac93408eadda99b3ff800f |
|
MD5 | a08f8ac5594052a2906b76a688581a29 |
|
BLAKE2b-256 | 3188d5bc3291c18af79bb2418ab8563aa431ae65b58a96670c284c4a58a20f88 |
File details
Details for the file amqp_client_python-0.2.0-py3-none-any.whl
.
File metadata
- Download URL: amqp_client_python-0.2.0-py3-none-any.whl
- Upload date:
- Size: 40.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.10.12 Linux/5.15.153.1-microsoft-standard-WSL2
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d3f9407899b399d4aae3d617e0d1fca7a6f3385dba643dfd433dfb62419eeac4 |
|
MD5 | ea643c720bd32041590d827c014e540a |
|
BLAKE2b-256 | abfb785ae7924d1321455c1733a62c402465252afde1f32de5f606c281306bb7 |