Python AMQP Client Library
AMQP Client Python
A high-level client for publishing and consuming messages on a RabbitMQ event bus.
Documentation: https://nutes-uepb.github.io/amqp-client-python
Source Code: https://github.com/nutes-uepb/amqp-client-python
Features:
- Automatic creation and management of queues, exchanges and channels;
- Connection persistence and auto reconnect;
- Support for direct, topic and fanout exchanges;
- Publish;
- Subscribe;
- Support for remote procedure calls (RPC).
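To make the three exchange types concrete: in AMQP 0-9-1 a direct exchange routes on an exact routing-key match, a fanout exchange ignores the routing key, and a topic exchange matches dot-separated patterns where `*` stands for exactly one word and `#` for zero or more words. The broker does this matching, not the client. The sketch below only illustrates the topic rules; `topic_matches` is a hypothetical helper and is not part of this library.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Illustrative only: apply AMQP topic-exchange rules to one routing key."""
    p_words = pattern.split(".")
    k_words = routing_key.split(".")

    def match(pi: int, ki: int) -> bool:
        if pi == len(p_words):
            # Pattern exhausted: match only if the key is exhausted too
            return ki == len(k_words)
        if p_words[pi] == "#":
            # '#' may absorb zero or more words of the key
            return any(match(pi + 1, j) for j in range(ki, len(k_words) + 1))
        if ki == len(k_words):
            return False
        if p_words[pi] == "*" or p_words[pi] == k_words[ki]:
            # '*' consumes exactly one word; a literal must be equal
            return match(pi + 1, ki + 1)
        return False

    return match(0, 0)


print(topic_matches("user.*", "user.find"))
print(topic_matches("user.*", "user.find.all"))
print(topic_matches("user.#", "user.find.all"))
```

With these rules, a handler bound with `user.*` would receive `user.find` but not `user.find.all`, while `user.#` would receive both.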
Table of Compatibility
| version | compatible with |
|---|---|
| 0.2.0 | 0.2.0 |
| 0.1.14 | ~0.1.12 |
Examples:
You can use the sync eventbus, the async eventbus, or the sync wrapper around the async eventbus.
async usage
# basic configuration
from amqp_client_python import (
    AsyncEventbusRabbitMQ,
    Config, Options
)

config = Config(Options("queue", "rpc_queue", "rpc_exchange"))
eventbus = AsyncEventbusRabbitMQ(config)

# The calls below use `await`, so they must run inside a coroutine.

# publish
await eventbus.publish("rpc_exchange", "routing.key", "message_content")

# subscribe
async def subscribe_handler(body) -> None:
    print(body, type(body), flush=True)  # handle messages

await eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler)

# rpc_publish
response = await eventbus.rpc_client("rpc_exchange", "user.find", "message_content")

# provider
async def rpc_provider_handler(body) -> bytes:
    print(f"body: {body}")
    return b"content"

await eventbus.provide_resource("user.find", rpc_provider_handler)
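The `await` calls in the async example only work inside a coroutine driven by an event loop. The sketch below shows one way to arrange that. So that it runs without a broker, it uses `FakeEventbus`, a hypothetical in-memory stand-in that copies only the call shapes shown in this README; it is not the library's `AsyncEventbusRabbitMQ`. `main` and `received` are also illustrative names.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[bytes], Awaitable[object]]


class FakeEventbus:
    """Hypothetical in-memory stand-in, NOT the real AsyncEventbusRabbitMQ."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, Handler] = {}
        self._providers: Dict[str, Handler] = {}

    async def subscribe(self, exchange: str, routing_key: str, handler: Handler) -> None:
        self._subscribers[routing_key] = handler

    async def publish(self, exchange: str, routing_key: str, body: bytes) -> None:
        handler = self._subscribers.get(routing_key)
        if handler is not None:
            await handler(body)

    async def provide_resource(self, routing_key: str, handler: Handler) -> None:
        self._providers[routing_key] = handler

    async def rpc_client(self, exchange: str, routing_key: str, body: bytes) -> bytes:
        return await self._providers[routing_key](body)


received: List[bytes] = []


async def subscribe_handler(body) -> None:
    received.append(body)  # handle messages


async def rpc_provider_handler(body) -> bytes:
    return b"content"


async def main() -> bytes:
    eventbus = FakeEventbus()
    # Register handlers first, then publish and call the RPC provider
    await eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler)
    await eventbus.provide_resource("user.find", rpc_provider_handler)
    await eventbus.publish("rpc_exchange", "routing.key", b"message_content")
    return await eventbus.rpc_client("rpc_exchange", "user.find", b"message_content")


response = asyncio.run(main())
print(response, received)
```

With a real broker, a long-lived consumer would normally keep the loop running rather than return from `main`, because the loop closes when `asyncio.run` finishes.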
sync usage (deprecated)
from concurrent.futures import TimeoutError

from amqp_client_python import (
    EventbusRabbitMQ,
    Config, Options
)
from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler
from examples.default import queue, rpc_queue, rpc_exchange, rpc_routing_key


class ExampleEvent(IntegrationEvent):
    EVENT_NAME: str = "ExampleEvent"
    ROUTING_KEY: str = rpc_routing_key

    def __init__(self, event_type: str, message=None) -> None:
        super().__init__(self.EVENT_NAME, event_type)
        # Build a fresh list per instance instead of sharing a mutable default
        self.message = message if message is not None else []
        self.routing_key = self.ROUTING_KEY


class ExampleEventHandler(IntegrationEventHandler):
    def handle(self, body) -> None:
        print(body, "subscribe")


config = Config(Options(queue, rpc_queue, rpc_exchange))
eventbus = EventbusRabbitMQ(config=config)


def handle(*body):
    # RPC provider: echo the first argument back as UTF-8 bytes
    print(body[0], "rpc_provider")
    return f"{body[0]}".encode("utf-8")


subscribe_event = ExampleEvent(rpc_exchange)
publish_event = ExampleEvent(rpc_exchange, ["message"])
subscribe_event_handle = ExampleEventHandler()
eventbus.subscribe(subscribe_event, subscribe_event_handle, rpc_routing_key)
eventbus.provide_resource(rpc_routing_key + "2", handle)

count = 0
running = True
while running:
    try:
        count += 1
        # Stop as soon as the RPC reply does not echo the counter
        reply = eventbus.rpc_client(rpc_exchange, rpc_routing_key + "2", [f"{count}"])
        if str(count) != reply.decode("utf-8"):
            running = False
        # eventbus.publish(publish_event, rpc_routing_key, "message_content")
        # running = False
    except TimeoutError as err:
        print("timeout!!!: ", str(err))
    except KeyboardInterrupt:
        running = False
    except BaseException as err:
        print("Err:", err)
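The loop in the sync example sends a counter over RPC and checks that the reply echoes it. RPC over AMQP is conventionally built on two message properties: `reply_to` (where the answer should go) and `correlation_id` (which request an answer belongs to). The sketch below imitates that round trip with in-memory queues and a worker thread. It is an illustration of the convention under that assumption, not this library's implementation; `provider`, `rpc_call`, and `request_queue` are hypothetical names.

```python
import queue
import threading
import uuid

# Stands in for the provider's request queue on the broker
request_queue: "queue.Queue" = queue.Queue()


def provider() -> None:
    """Answer each request on the reply queue named in the request."""
    while True:
        message = request_queue.get()
        if message is None:  # sentinel: shut the provider down
            break
        reply_to, correlation_id, body = message
        reply_to.put((correlation_id, f"{body}".encode("utf-8")))


def rpc_call(body, timeout: float = 2.0) -> bytes:
    """Send one request and wait for the reply carrying the same id."""
    reply_queue: "queue.Queue" = queue.Queue()  # plays the role of reply_to
    correlation_id = str(uuid.uuid4())
    request_queue.put((reply_queue, correlation_id, body))
    reply_id, payload = reply_queue.get(timeout=timeout)
    if reply_id != correlation_id:
        raise RuntimeError("reply does not belong to this request")
    return payload


worker = threading.Thread(target=provider, daemon=True)
worker.start()
replies = [rpc_call(count).decode("utf-8") for count in range(1, 4)]
request_queue.put(None)
worker.join()
print(replies)
```

The timeout on the reply plays the same role as the `TimeoutError` branch in the loop above: a caller should not wait forever for a provider that never answers.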
sync wrapper usage
from amqp_client_python import EventbusWrapperRabbitMQ, Config, Options

config = Config(Options("queue", "rpc_queue", "rpc_exchange"))
eventbus = EventbusWrapperRabbitMQ(config=config)

async def subscribe_handler(body) -> None:
    print(f"{body}", type(body), flush=True)

async def rpc_provider_handler(body) -> bytes:
    print(f"handle - {body}", type(body), flush=True)
    return f"{body}".encode("utf-8")

# rpc_provider
eventbus.provide_resource("user.find", rpc_provider_handler).result()
# subscribe
eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler).result()

count = 0
running = True
while running:
    try:
        count += 1
        # rpc_client call
        eventbus.rpc_client("rpc_exchange", "user.find", count).result().decode("utf-8")
        # publish
        eventbus.publish("rpc_exchange", "routing.key", "message_content").result()
        # running = False
    except KeyboardInterrupt:
        running = False
    except BaseException as err:
        print("Err:", err)
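The `.result()` calls in the wrapper example suggest that each wrapper method hands back a future that synchronous code can block on. One common way to build such a wrapper is to run an asyncio loop in a background thread and submit coroutines to it with `asyncio.run_coroutine_threadsafe`, which returns a `concurrent.futures.Future`. The sketch below shows that general pattern only; `LoopThread` is a hypothetical helper, and whether the library is built this way is an assumption.

```python
import asyncio
import threading
from concurrent.futures import Future


class LoopThread:
    """Hypothetical helper: owns an asyncio loop running in a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def submit(self, coro) -> Future:
        # Thread-safe hand-off; the caller can block on the returned future
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def stop(self) -> None:
        # Ask the loop to stop from outside its thread, then clean up
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def rpc_provider_handler(body) -> bytes:
    await asyncio.sleep(0)  # yield to the loop once, as real I/O would
    return f"{body}".encode("utf-8")


runner = LoopThread()
reply = runner.submit(rpc_provider_handler(1)).result(timeout=5).decode("utf-8")
runner.stop()
print(reply)
```

Because the blocking `.result()` happens on the caller's thread and not on the loop's thread, the loop stays free to process I/O while the caller waits.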
Sponsors
The library is provided by NUTES-UEPB.
Known Limitations:
basic eventbus
When using EventbusRabbitMQ, do not make an RPC call inside an RPC provider or subscribe handler; it may block the ioloop. Note: this is fixed in the other kinds of eventbus and will be removed in upcoming releases.
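Why a blocking call inside a handler can stall everything is easiest to see on a single-threaded loop: the handler waits for a reply that only the same loop can deliver, so the reply never arrives. The sketch below reproduces that situation with plain asyncio as an analogy. It does not use this library or its ioloop, and `rpc_reply`, `blocking_handler`, and `awaiting_handler` are hypothetical names.

```python
import asyncio
import concurrent.futures
from typing import Optional


async def rpc_reply() -> bytes:
    return b"pong"


async def blocking_handler() -> Optional[bytes]:
    loop = asyncio.get_running_loop()
    # Schedule the reply on this same loop, then block the loop's own thread
    pending = asyncio.run_coroutine_threadsafe(rpc_reply(), loop)
    try:
        return pending.result(timeout=0.2)
    except concurrent.futures.TimeoutError:
        # The loop could not run rpc_reply() because this handler was blocking it
        return None


async def awaiting_handler() -> bytes:
    # Awaiting hands control back to the loop, so the reply can be produced
    return await rpc_reply()


async def main():
    blocked = await blocking_handler()
    fine = await awaiting_handler()
    return blocked, fine


blocked, fine = asyncio.run(main())
print(blocked, fine)
```

The blocking variant times out and the awaiting variant completes, which mirrors the note above that the other kinds of eventbus do not share this limitation.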