Asynchronous RabbitMQ consumer job library for PostgreSQL
Project description
rabbitmq2psql-as-json
rabbitmq2psql-as-json is ready to use, basic asynchronous RabbitMQ consumer job library for PostgreSQL. It stops when queue is empty, so it can be useful for cron jobs, unit tests, CI/CD environments and production environments has slow datastream.
Installation
You can install this library easily with pip.
pip install rabbitmq2psql-as-json
Usage
As a library
import os
import asyncio
import logging
from rabbitmq2psql_as_json import consume
if __name__ == '__main__':
logger = logging.getLogger("rabbitmq2psql-as-json")
logger.setLevel(os.environ.get('LOG_LEVEL', "DEBUG"))
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter(
os.environ.get('LOG_FORMAT', "%(asctime)s [%(levelname)s] %(name)s: %(message)s")
)
)
logger.addHandler(handler)
config = {
"mq_host": os.environ.get('MQ_HOST'),
"mq_port": int(os.environ.get('MQ_PORT')),
"mq_vhost": os.environ.get('MQ_VHOST'),
"mq_user": os.environ.get('MQ_USER'),
"mq_pass": os.environ.get('MQ_PASS'),
"mq_queue": os.environ.get('MQ_QUEUE'),
"mq_exchange": os.environ.get('MQ_EXCHANGE'),
"mq_routing_key": os.environ.get('MQ_ROUTING_KEY'),
"db_host": os.environ.get('DB_HOST'),
"db_port": int(os.environ.get('DB_PORT')),
"db_user": os.environ.get('DB_USER'),
"db_pass": os.environ.get('DB_PASS'),
"db_database": os.environ.get('DB_DATABASE')
}
sql_template = """insert into logs (body) values (%s);"""
loop = asyncio.get_event_loop()
loop.run_until_complete(
consume(
loop=loop,
consumer_pool_size=10,
sql_template=sql_template,
config=config
)
)
loop.close()
This library uses aio_pika and aiopg packages.
Standalone
You can also call this library as standalone consumer job command. Just set required environment variables and run rabbitmq2psql-as-json
. This usecase perfectly fits when you need run it on cronjobs or kubernetes jobs.
Required environment variables:
- MQ_HOST
- MQ_PORT (optional)
- MQ_VHOST
- MQ_USER
- MQ_PASS
- MQ_QUEUE
- MQ_QUEUE_DURABLE (optional, default value: True)
- MQ_EXCHANGE (Exchange for dead letter queue, aka records with error queue)
- MQ_ROUTING_KEY (Routing key for dead letter queue)
- DB_HOST
- DB_PORT (optional)
- DB_USER
- DB_PASS
- DB_DATABASE
- SQL_TEMPLATE (DB Insert query template. Ex:
insert into logs (body) values (%s);
) - CONSUMER_POOL_SIZE (optional, default value: 10)
- LOG_LEVEL (Logging level. See: Python logging module docs)
Example Kubernetes job: You can see it to kube.yaml
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
File details
Details for the file rabbitmq2psql_as_json-1.0.4-py3-none-any.whl
.
File metadata
- Download URL: rabbitmq2psql_as_json-1.0.4-py3-none-any.whl
- Upload date:
- Size: 8.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.9.13
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d33b19e8b3b30b7ad575427b2263072471fe4bcf1f2fcb2cc2cc67b733b90dde |
|
MD5 | a41d29bd7ca271f1339b6fc9363a18de |
|
BLAKE2b-256 | ac05a5280a7e48bffd50768e925b72abfce2a0328a32c812e35db64c1d729a94 |