simple Helper library to configure AMQP communication
Project description
Introduction
amqp_helper aims to be a simple Helper library to configure AMQP communication for use with aio-pika
To achieve this goal this Library provides the AMQPConfig class.
This package also provides a log handler to send logs to an AMQP Broker. You can use it via the class AMQPLogHandler
Installation
amqp_helper can be installed in multiple ways. The easiest Solution is to install it with pip.
via pip
python3 -m pip install amqp-helper
from source
git clone https://github.com/bad-microservices/amqp_helper.git
cd amqp_helper
python3 -m pip install .
Example
import asyncio
from amqp_helper import AMQPConfig
from aio_pika import connect_robust
amqp_config = AMQPConfig(username="test",password="testpw",vhost="testvhost")
async def main():
connection = await connect_robust(**amqp_config.aio_pika())
# do some amqp stuff
if __name__ == "__main__":
asyncio.run(main())
Example RPC over AMQP
Server code
The Server code is quite simple
import asyncio
from amqp_helper import AMQPConfig, AMQPService, new_amqp_func
amqp_config = AMQPConfig(username="test",password="testpw",vhost="testvhost")
async def testfunc(throw_value_error = False,throw_key_error = False, throw_exception = False*args, **kwargs):
if throw_value_error:
raise ValueError()
if throw_key_error:
raise KeyError()
if throw_exception:
raise Exception()
return {"result": "sync stuff"}
rpc_fun = new_amqp_func("test1", test1234)
@rpc_fun.exception_handler(ValueError, KeyError)
async def handle_value_error(*args, **kwargs):
retrun "got ValueError or KeyError"
@rpc_fun.exception_handler(Exception)
async def handle_value_error(*args, **kwargs):
return "got Exception"
async def main():
service = await AMQPService().connect(amqp_config)
await service.register_function(rpc_fun)
await service.serve()
# do some amqp stuff
if __name__ == "__main__":
asyncio.run(main())
Client
import asyncio
from amqp_helper import AMQPConfig, AMQPClient
amqp_config = AMQPConfig(username="test",password="testpw",vhost="testvhost")
async def main():
client = await AMQPClient().connect(amqp_config)
print(await client.call(None,"test1"))
if __name__ == "__main__":
asyncio.run(main())
Logging to AMQP
if we want to log to an AMQP Topic we can do it with the following example code.
import logging
from amqp_helper import AMQPLogHandler
log_cfg = AMQPConfig(username="test",password="testpw",vhost="testvhost")
handler = AMQPLogHandler(amqp_config=log_cfg, exchange_name="amqp.topic")
root_logger= logging.getLogger()
root_logger.addHandler(handler)
root_logger.info("test log message")
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file amqp_helper-0.1.7.tar.gz.
File metadata
- Download URL: amqp_helper-0.1.7.tar.gz
- Upload date:
- Size: 8.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.9.19
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8cdcea122f3d0996692ebf82dcc47118d069c1bcd95c2755c9b6a62f4b7f5d32
|
|
| MD5 |
d96eb78b36dfafcf0a928a939ffaacd1
|
|
| BLAKE2b-256 |
98599e81c580ec155ec3247be701e22b6a79a4c656322dd02566ca6714d7f4cd
|
File details
Details for the file amqp_helper-0.1.7-py3-none-any.whl.
File metadata
- Download URL: amqp_helper-0.1.7-py3-none-any.whl
- Upload date:
- Size: 9.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.9.19
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ad6878375f6bf13071adced797ca839fecba7e2adb9c59f6647438c2701cf54f
|
|
| MD5 |
bc8b385863e54a8408f99f4da85d54b7
|
|
| BLAKE2b-256 |
6aa82ab9aae509dc175743cc789381892d0245a5c0815227bd7a01a5b0550021
|