Simple helper library to configure AMQP communication
Introduction
amqp_helper aims to be a simple helper library for configuring AMQP communication with aio-pika. To achieve this, the library provides the AMQPConfig class.
The package also provides a log handler that sends logs to an AMQP broker. You can use it via the AMQPLogHandler class.
Installation
amqp_helper can be installed in multiple ways. The easiest is to install it with pip.
via pip
python3 -m pip install amqp-helper
from source
git clone https://github.com/bad-microservices/amqp_helper.git
cd amqp_helper
python3 -m pip install .
Example
import asyncio
from amqp_helper import AMQPConfig
from aio_pika import connect_robust

amqp_config = AMQPConfig(username="test", password="testpw", vhost="testvhost")

async def main():
    # aio_pika() returns the keyword arguments for connect_robust
    connection = await connect_robust(**amqp_config.aio_pika())
    # do some amqp stuff

if __name__ == "__main__":
    asyncio.run(main())
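To make the `**amqp_config.aio_pika()` unpacking more concrete, here is a minimal sketch of the kind of keyword dictionary such a helper could produce. `SketchConfig` and its default values are hypothetical and are not the library's AMQPConfig implementation; the dictionary keys are keyword names that aio-pika's `connect_robust` accepts.

```python
from dataclasses import dataclass


@dataclass
class SketchConfig:
    """Hypothetical stand-in for a config helper; not amqp_helper's class."""

    host: str = "localhost"
    port: int = 5672
    username: str = "guest"
    password: str = "guest"
    vhost: str = "/"

    def aio_pika(self) -> dict:
        # Map the fields onto keyword names accepted by aio_pika.connect_robust.
        return {
            "host": self.host,
            "port": self.port,
            "login": self.username,
            "password": self.password,
            "virtualhost": self.vhost,
        }


cfg = SketchConfig(username="test", password="testpw", vhost="testvhost")
kwargs = cfg.aio_pika()
# kwargs could now be passed on as connect_robust(**kwargs)
```

Keeping the mapping in one method means the connection settings are defined once and can be reused wherever a connection is opened.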
Example RPC over AMQP
Server code
The server code is quite simple:
import asyncio
from amqp_helper import AMQPConfig, AMQPService, new_amqp_func

amqp_config = AMQPConfig(username="test", password="testpw", vhost="testvhost")

async def testfunc(throw_value_error=False, throw_key_error=False, throw_exception=False, *args, **kwargs):
    # raise the requested exception so the handlers below can be exercised
    if throw_value_error:
        raise ValueError()
    if throw_key_error:
        raise KeyError()
    if throw_exception:
        raise Exception()
    return {"result": "sync stuff"}

# expose testfunc under the name "test1"
rpc_fun = new_amqp_func("test1", testfunc)

@rpc_fun.exception_handler(ValueError, KeyError)
async def handle_value_error(*args, **kwargs):
    return "got ValueError or KeyError"

@rpc_fun.exception_handler(Exception)
async def handle_exception(*args, **kwargs):
    return "got Exception"

async def main():
    service = await AMQPService().connect(amqp_config)
    await service.register_function(rpc_fun)
    await service.serve()
    # do some amqp stuff

if __name__ == "__main__":
    asyncio.run(main())
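The decorator-based exception handlers in the server code can be illustrated with a small standalone sketch. `SketchFunc`, its `call` method, and the handler signature (receiving the exception) are assumptions made for illustration; this is not how amqp_helper is known to implement `new_amqp_func`, only one plausible way to dispatch on exception type.

```python
import asyncio


class SketchFunc:
    """Hypothetical wrapper that illustrates handler dispatch only."""

    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.handlers = []  # (exception types, handler) in registration order

    def exception_handler(self, *exc_types):
        # Decorator factory: remember which handler covers which exception types.
        def decorator(handler):
            self.handlers.append((exc_types, handler))
            return handler
        return decorator

    async def call(self, *args, **kwargs):
        try:
            return await self.func(*args, **kwargs)
        except Exception as exc:
            # In this sketch the first registered matching handler wins.
            for exc_types, handler in self.handlers:
                if isinstance(exc, exc_types):
                    return await handler(exc)
            raise


async def work(fail_with=None):
    if fail_with is not None:
        raise fail_with()
    return {"result": "ok"}


rpc = SketchFunc("demo", work)


@rpc.exception_handler(ValueError, KeyError)
async def handle_lookup_errors(exc):
    return "got ValueError or KeyError"


@rpc.exception_handler(Exception)
async def handle_exception(exc):
    return "got Exception"


async def demo():
    return [
        await rpc.call(),
        await rpc.call(KeyError),
        await rpc.call(RuntimeError),
    ]


results = asyncio.run(demo())
```

Because this sketch checks handlers in registration order, the specific handlers are registered before the catch-all `Exception` handler, mirroring the order used in the server code.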
Client
import asyncio
from amqp_helper import AMQPConfig, AMQPClient

amqp_config = AMQPConfig(username="test", password="testpw", vhost="testvhost")

async def main():
    client = await AMQPClient().connect(amqp_config)
    # call the function registered as "test1" on the server
    print(await client.call(None, "test1"))

if __name__ == "__main__":
    asyncio.run(main())
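RPC over a message broker is commonly built on a reply queue plus a correlation id that the server echoes back. The sketch below simulates that pattern in memory with `asyncio.Queue` objects standing in for broker queues. The message layout, the `None` shutdown sentinel, and the helper names `server`, `call`, and `demo` are assumptions for illustration; nothing here is taken from amqp_helper's internals.

```python
import asyncio
import uuid


async def server(requests, functions):
    # Serve requests until a None sentinel arrives.
    while True:
        msg = await requests.get()
        if msg is None:
            break
        result = await functions[msg["function"]](**msg["kwargs"])
        # Echo the correlation id so the caller can match reply to request.
        await msg["reply_to"].put(
            {"correlation_id": msg["correlation_id"], "result": result}
        )


async def call(requests, function, **kwargs):
    reply_queue = asyncio.Queue()  # stands in for an exclusive reply queue
    correlation_id = str(uuid.uuid4())
    await requests.put(
        {
            "function": function,
            "kwargs": kwargs,
            "reply_to": reply_queue,
            "correlation_id": correlation_id,
        }
    )
    reply = await reply_queue.get()
    if reply["correlation_id"] != correlation_id:
        raise RuntimeError("reply does not belong to this request")
    return reply["result"]


async def test1():
    return {"result": "sync stuff"}


async def demo():
    requests = asyncio.Queue()
    task = asyncio.create_task(server(requests, {"test1": test1}))
    result = await call(requests, "test1")
    await requests.put(None)  # ask the server loop to stop
    await task
    return result


rpc_result = asyncio.run(demo())
```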
Logging to AMQP
To log to an AMQP topic, we can use the following example code.
import logging
from amqp_helper import AMQPConfig, AMQPLogHandler

log_cfg = AMQPConfig(username="test", password="testpw", vhost="testvhost")
handler = AMQPLogHandler(amqp_config=log_cfg, exchange_name="amqp.topic")

root_logger = logging.getLogger()
# the root logger defaults to WARNING, so lower the level to let INFO records through
root_logger.setLevel(logging.INFO)
root_logger.addHandler(handler)
root_logger.info("test log message")
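For readers unfamiliar with custom log handlers, the following sketch shows the general shape of a `logging.Handler` that forwards records to a publish function. `SketchPublishHandler`, the JSON body layout, and the `<logger>.<level>` routing key are all assumptions for illustration; AMQPLogHandler's actual message format is not described here.

```python
import json
import logging


class SketchPublishHandler(logging.Handler):
    """Hypothetical handler that hands each record to a publish callable."""

    def __init__(self, publish):
        super().__init__()
        self.publish = publish  # callable(routing_key: str, body: bytes)

    def emit(self, record):
        try:
            body = json.dumps(
                {
                    "logger": record.name,
                    "level": record.levelname,
                    "message": record.getMessage(),
                }
            ).encode("utf-8")
            # The routing key layout is an assumption made for this sketch.
            self.publish(f"{record.name}.{record.levelname}", body)
        except Exception:
            # Standard logging convention: never let a handler crash the app.
            self.handleError(record)


sent = []  # collects what would otherwise be published to a broker
logger = logging.getLogger("sketch")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(SketchPublishHandler(lambda key, body: sent.append((key, body))))
logger.info("test log message")
```

With a topic exchange, a routing key that encodes the logger name and level would let consumers subscribe to only the records they care about.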
Source Distribution
amqp_helper-0.1.4.tar.gz (8.5 kB)
Built Distribution
Hashes for amqp_helper-0.1.4-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 9e133a6d42ef4b2454901e2cfc5885190d0d1abb5d42278de2740261136cbbd8
MD5 | 0e6eea67c9c579e71bca9c9bfb56c5cd
BLAKE2b-256 | a0d45e8dbada963af2fbfd94b5c14a52e107cc1e1d4429e803dd90c7b0cdaf7e