Python AMQP Client Library
Project description
AMQP Client Python
A Python client providing a high-level abstraction for interacting with RabbitMQ, simplifying message publishing, subscribing, and RPC patterns.
Documentation: https://nutes-uepb.github.io/amqp-client-python
Source Code: https://github.com/nutes-uepb/amqp-client-python
Discord Server: https://discord.gg/RkXNeZpNZk
🦀 Looking for higher performance? Try amqp-rs!
If you need higher throughput, thread safety, and built-in compression, check out amqp-rs.
It is a high-performance Python extension developed in Rust using PyO3 and tokio. It shares a similar API and design philosophy with this library, but adds native thread safety, zstd/zlib/lz4 compression, robust TLS/SSL support, and graceful shutdowns.
You can install it via pip or uv:
pip install amqp-rs
Features:
- Automatic creation and management of queues, exchanges and channels;
- Connection persistence and auto reconnect;
- Support for direct, topic and fanout exchanges;
- Publish;
- Subscribe;
- Support for a Remote procedure call (RPC).
Table of Compatibility
| version | compatible with |
|---|---|
| 0.2.0 | 0.2.0 |
| 0.1.14 | ~0.1.12 |
Installation
You can install amqp-client-python using pip:
pip install amqp-client-python
Prerequisites
- Python 3.7+
- A running RabbitMQ instance.
Examples:
you can use sync , async eventbus and sync wrapper of async eventbus
async usage
# basic configuration
from amqp_client_python import (
AsyncEventbusRabbitMQ,
Config, Options
)
config = Config(Options("queue", "rpc_queue", "rpc_exchange"))
eventbus = AsyncEventbusRabbitMQ(config)
# publish
eventbus.publish("rpc_exchange", "routing.key", "message_content")
# subscribe
async def subscribe_handler(body) -> None:
print(body, type(body), flush=True) # handle messages
await eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler)
# rpc_publish
response = await eventbus.rpc_client("rpc_exchange", "user.find", "message_content")
# provider
async def rpc_provider_handler(body) -> bytes:
print(f"body: {body}")
return b"content"
await eventbus.provide_resource("user.find", rpc_provider_handler)
sync usage(deprecated)
from amqp_client_python import (
EventbusRabbitMQ,
Config, Options
)
from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler
from examples.default import queue, rpc_queue, rpc_exchange, rpc_routing_key
class ExampleEvent(IntegrationEvent):
EVENT_NAME: str = "ExampleEvent"
ROUTING_KEY: str = rpc_routing_key
def __init__(self, event_type: str, message = []) -> None:
super().__init__(self.EVENT_NAME, event_type)
self.message = message
self.routing_key = self.ROUTING_KEY
class ExampleEventHandler(IntegrationEventHandler):
def handle(self, body) -> None:
print(body,"subscribe")
config = Config(Options(queue, rpc_queue, rpc_exchange))
eventbus = EventbusRabbitMQ(config=config)
class ExampleEvent(IntegrationEvent):
EVENT_NAME: str = "ExampleEvent"
def __init__(self, event_type: str, message = []) -> None:
super().__init__(self.EVENT_NAME, event_type)
self.message = message
from time import sleep
from random import randint
def handle(*body):
print(body[0], "rpc_provider")
return f"{body[0]}".encode("utf-8")
subscribe_event = ExampleEvent(rpc_exchange)
publish_event = ExampleEvent(rpc_exchange, ["message"])
subscribe_event_handle = ExampleEventHandler()
eventbus.subscribe(subscribe_event, subscribe_event_handle, rpc_routing_key)
eventbus.provide_resource(rpc_routing_key+"2", handle)
count = 0
running = True
from concurrent.futures import TimeoutError
while running:
try:
count += 1
if str(count) != eventbus.rpc_client(rpc_exchange, rpc_routing_key+"2", [f"{count}"]).decode("utf-8"):
running = False
#eventbus.publish(publish_event, rpc_routing_key, "message_content")
#running = False
except TimeoutError as err:
print("timeout!!!: ", str(err))
except KeyboardInterrupt:
running=False
except BaseException as err:
print("Err:", err)
sync wrapper usage
from amqp_client_python import EventbusWrapperRabbitMQ, Config, Options
config = Config(Options("queue", "rpc_queue", "rpc_exchange"))
eventbus = EventbusWrapperRabbitMQ(config=config)
async def subscribe_handler(body) -> None:
print(f"{body}", type(body), flush=True)
async def rpc_provider_handler(body) -> bytes:
print(f"handle - {body}", type(body), flush=True)
return f"{body}".encode("utf-8")
# rpc_provider
eventbus.provide_resource("user.find", rpc_provider_handler).result()
# subscribe
eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler).result()
count = 0
running = True
while running:
try:
count += 1
# rpc_client call
eventbus.rpc_client("rpc_exchange", "user.find", count).result().decode("utf-8")
# publish
eventbus.publish("rpc_exchange", "routing.key", "message_content").result()
#running = False
except KeyboardInterrupt:
running=False
except BaseException as err:
print("Err:", err)
Sponsors
The library is provided by NUTES-UEPB.
Know Limitations:
basic eventbus
When using EventbusRabbitMQ Should not use rpc call inside of rpc provider or subscribe handlers, it may block the ioloop #/obs: fixed on other kinds of eventbus, will be removed on nexts releases
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file amqp_client_python-0.2.0.post1.tar.gz.
File metadata
- Download URL: amqp_client_python-0.2.0.post1.tar.gz
- Upload date:
- Size: 152.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.4 {"installer":{"name":"uv","version":"0.10.4","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Pop!_OS","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d34d2a35d9044222ea8db4e68ffdbd9787e71889846637f7b6a7fd2ec87ec265
|
|
| MD5 |
1a17acd1d26702d00c1871e9d9548610
|
|
| BLAKE2b-256 |
b29188238ef31e340a0a042241c78850dd48b5c953be8a1f99e00a17323c455f
|
File details
Details for the file amqp_client_python-0.2.0.post1-py3-none-any.whl.
File metadata
- Download URL: amqp_client_python-0.2.0.post1-py3-none-any.whl
- Upload date:
- Size: 43.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.4 {"installer":{"name":"uv","version":"0.10.4","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Pop!_OS","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d8aab177ecb8477a8294aaabc8343f825f2aea6b2d405193b91079ec18244613
|
|
| MD5 |
87c079158776bcd473049a07eeb1a29d
|
|
| BLAKE2b-256 |
ca84fc9d14d90232b9993ad08ebe3736fde4c40206ad051ba706b373bd8a3f06
|