Simple helper library to configure AMQP communication
Introduction
amqp_helper aims to be a simple helper library for configuring AMQP communication with aio-pika. To that end, it provides the AMQPConfig class.
This package also provides a log handler, AMQPLogHandler, that sends log records to an AMQP broker.
Installation
amqp_helper can be installed in multiple ways. The easiest is to install it with pip.
via pip
python3 -m pip install amqp-helper
from source
git clone https://github.com/bad-microservices/amqp_helper.git
cd amqp_helper
python3 -m pip install .
Example
import asyncio
from amqp_helper import AMQPConfig
from aio_pika import connect_robust

amqp_config = AMQPConfig(username="test", password="testpw", vhost="testvhost")


async def main():
    connection = await connect_robust(**amqp_config.aio_pika())
    # do some amqp stuff


if __name__ == "__main__":
    asyncio.run(main())
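The `**amqp_config.aio_pika()` call above unpacks a dictionary into keyword arguments for `connect_robust`. The sketch below illustrates that pattern with a hypothetical stand-in class, `SketchConfig`. Its field names, defaults, and returned keys are assumptions made for illustration and are not taken from amqp_helper. The keys used here (`host`, `port`, `login`, `password`, `virtualhost`) are keyword arguments that `aio_pika.connect_robust` accepts.

```python
from dataclasses import dataclass


@dataclass
class SketchConfig:
    """Hypothetical stand-in for AMQPConfig (not the library's class)."""

    username: str = "guest"
    password: str = "guest"
    vhost: str = "/"
    host: str = "localhost"  # assumed default
    port: int = 5672  # standard AMQP port

    def aio_pika(self) -> dict:
        # Map the config fields to keyword names used by aio_pika.connect_robust.
        return {
            "host": self.host,
            "port": self.port,
            "login": self.username,
            "password": self.password,
            "virtualhost": self.vhost,
        }


def fake_connect(**kwargs):
    # Stands in for connect_robust so the sketch runs without a broker.
    return kwargs


cfg = SketchConfig(username="test", password="testpw", vhost="testvhost")
connection_kwargs = fake_connect(**cfg.aio_pika())
print(connection_kwargs["login"], connection_kwargs["virtualhost"])
```

Keeping the mapping in one method means the rest of the application only ever deals with one set of field names, whatever the client library calls them.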
Example RPC over AMQP
Server code
The server code is quite simple:
import asyncio
from amqp_helper import AMQPConfig, AMQPService, new_amqp_func

amqp_config = AMQPConfig(username="test", password="testpw", vhost="testvhost")


# The function exposed over RPC; the flags make it raise on demand.
async def testfunc(throw_value_error=False, throw_key_error=False, throw_exception=False, *args, **kwargs):
    if throw_value_error:
        raise ValueError()
    if throw_key_error:
        raise KeyError()
    if throw_exception:
        raise Exception()
    return {"result": "sync stuff"}


# Register testfunc under the RPC name "test1".
rpc_fun = new_amqp_func("test1", testfunc)


@rpc_fun.exception_handler(ValueError, KeyError)
async def handle_value_error(*args, **kwargs):
    return "got ValueError or KeyError"


@rpc_fun.exception_handler(Exception)
async def handle_exception(*args, **kwargs):
    return "got Exception"


async def main():
    service = await AMQPService().connect(amqp_config)
    await service.register_function(rpc_fun)
    await service.serve()
    # do some amqp stuff


if __name__ == "__main__":
    asyncio.run(main())
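To make the handler registration in the server example easier to follow, here is a minimal, self-contained sketch of how decorator-registered exception handlers can be dispatched. `SketchRPCFunction`, the handler signature, and the "first matching handler in registration order wins" rule are all assumptions for illustration; the source does not describe how amqp_helper implements this internally.

```python
import asyncio


class SketchRPCFunction:
    """Hypothetical illustration of decorator-registered exception handlers."""

    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.handlers = []  # (exception types, handler) in registration order

    def exception_handler(self, *exc_types):
        def decorator(handler):
            self.handlers.append((exc_types, handler))
            return handler

        return decorator

    async def __call__(self, *args, **kwargs):
        try:
            return await self.func(*args, **kwargs)
        except Exception as exc:
            # Assumed rule: the first registered handler whose types match wins.
            for exc_types, handler in self.handlers:
                if isinstance(exc, exc_types):
                    return await handler(exc, *args, **kwargs)
            raise


async def work(fail_with=None):
    if fail_with is not None:
        raise fail_with()
    return {"result": "ok"}


rpc = SketchRPCFunction("demo", work)


@rpc.exception_handler(ValueError, KeyError)
async def on_lookup_error(exc, *args, **kwargs):
    return "got ValueError or KeyError"


@rpc.exception_handler(Exception)
async def on_any_error(exc, *args, **kwargs):
    return "got Exception"


async def demo():
    return [
        await rpc(),
        await rpc(fail_with=KeyError),
        await rpc(fail_with=RuntimeError),
    ]


results = asyncio.run(demo())
print(results)
```

Under this assumed rule, the specific handler has to be registered before the catch-all `Exception` handler, which is also the order used in the server example.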
Client
import asyncio
from amqp_helper import AMQPConfig, AMQPClient

amqp_config = AMQPConfig(username="test", password="testpw", vhost="testvhost")


async def main():
    client = await AMQPClient().connect(amqp_config)
    print(await client.call(None, "test1"))


if __name__ == "__main__":
    asyncio.run(main())
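RPC over a message broker is commonly built on a correlation ID: the caller tags each request with a unique ID and waits for the reply that carries the same ID. The sketch below shows that general pattern in memory, with an `asyncio.Queue` standing in for the request queue. It is not amqp_helper's implementation, and whether `AMQPClient.call` works this way is an assumption.

```python
import asyncio
import uuid


async def demo():
    requests = asyncio.Queue()  # stands in for the RPC request queue
    pending = {}  # correlation_id -> Future awaiting the reply

    async def server():
        correlation_id, payload = await requests.get()
        # The reply is matched to its request via the correlation_id.
        pending.pop(correlation_id).set_result({"echo": payload})

    async def call(payload):
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        pending[correlation_id] = future
        await requests.put((correlation_id, payload))
        # A timeout keeps the caller from waiting forever on a lost reply.
        return await asyncio.wait_for(future, timeout=1.0)

    server_task = asyncio.create_task(server())
    reply = await call("ping")
    await server_task
    return reply


reply = asyncio.run(demo())
print(reply)
```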
Logging to AMQP
To log to an AMQP topic, attach an AMQPLogHandler to a logger, as in the following example code.
import logging
from amqp_helper import AMQPConfig, AMQPLogHandler

log_cfg = AMQPConfig(username="test", password="testpw", vhost="testvhost")
handler = AMQPLogHandler(amqp_config=log_cfg, exchange_name="amqp.topic")

root_logger = logging.getLogger()
# The root logger defaults to WARNING, so lower it to let INFO records through.
root_logger.setLevel(logging.INFO)
root_logger.addHandler(handler)
root_logger.info("test log message")
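For readers unfamiliar with custom log handlers, the following sketch shows the general shape of a `logging.Handler` that forwards records to a message publisher. `SketchPublishHandler`, the JSON payload, and the `<logger name>.<level>` routing key are hypothetical choices for illustration; the source does not specify the message format or routing keys that AMQPLogHandler uses. A plain callable replaces the broker so the sketch runs on its own.

```python
import json
import logging


class SketchPublishHandler(logging.Handler):
    """Hypothetical handler: hands each record to a publish callable."""

    def __init__(self, publish):
        super().__init__()
        self.publish = publish  # callable(routing_key: str, body: bytes)

    def emit(self, record):
        try:
            body = json.dumps(
                {
                    "logger": record.name,
                    "level": record.levelname,
                    "message": record.getMessage(),
                }
            ).encode("utf-8")
            # Assumed routing key layout: "<logger name>.<level>".
            self.publish(f"{record.name}.{record.levelname.lower()}", body)
        except Exception:
            # Let the logging module report the failure instead of raising.
            self.handleError(record)


published = []  # collects (routing_key, body) pairs in place of a broker
logger = logging.getLogger("sketch")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(SketchPublishHandler(lambda key, body: published.append((key, body))))
logger.info("test log message")
print(published[0][0])
```

With a topic exchange, a key layout like this would let consumers subscribe by pattern, for example to all records at one level.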