
Asynchronous RabbitMQ consumer job library for PostgreSQL

Project description

rabbitmq2psql-as-json

rabbitmq2psql-as-json is a ready-to-use, basic asynchronous RabbitMQ consumer job library for PostgreSQL. It stops when the queue is empty, so it is useful for cron jobs, unit tests, CI/CD pipelines, and production environments with a slow data stream.

Installation

You can install this library easily with pip:

pip install rabbitmq2psql-as-json

Usage

As a library

import os
import asyncio
import logging
from rabbitmq2psql_as_json import consume

if __name__ == '__main__':
    logger = logging.getLogger("rabbitmq2psql-as-json")
    logger.setLevel(os.environ.get('LOG_LEVEL', "DEBUG"))
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter(
            os.environ.get('LOG_FORMAT', "%(asctime)s [%(levelname)s] %(name)s: %(message)s")
        )
    )
    logger.addHandler(handler)

    config = {
        "mq_host": os.environ.get('MQ_HOST'),
        "mq_port": int(os.environ.get('MQ_PORT', '5672')),  # RabbitMQ default port
        "mq_vhost": os.environ.get('MQ_VHOST'),
        "mq_user": os.environ.get('MQ_USER'),
        "mq_pass": os.environ.get('MQ_PASS'),
        "mq_queue": os.environ.get('MQ_QUEUE'),
        "mq_exchange": os.environ.get('MQ_EXCHANGE'),  # exchange for the dead letter queue
        "mq_routing_key": os.environ.get('MQ_ROUTING_KEY'),  # routing key for the dead letter queue
        "db_host": os.environ.get('DB_HOST'),
        "db_port": int(os.environ.get('DB_PORT', '5432')),  # PostgreSQL default port
        "db_user": os.environ.get('DB_USER'),
        "db_pass": os.environ.get('DB_PASS'),
        "db_database": os.environ.get('DB_DATABASE')
    }

    # %s is replaced with each consumed message body, serialized as JSON.
    sql_template = """insert into logs (body) values (%s);"""

    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        consume(
            loop=loop,
            consumer_pool_size=10,  # number of concurrent consumers (default: 10)
            sql_template=sql_template,
            config=config
        )
    )
    loop.close()  # close the loop once the queue has been drained

This library uses the aio_pika and aiopg packages.
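The sql_template above expects a matching table to exist. Here is a minimal sketch that creates it with aiopg (the table and column names follow the template above; the jsonb column type and all connection values are assumptions for illustration):

import asyncio
import aiopg

async def create_logs_table():
    # dsn mirrors the db_* keys in the config above; all values are placeholders.
    dsn = "dbname=mydb user=postgres password=secret host=localhost port=5432"
    async with aiopg.connect(dsn) as conn:
        async with conn.cursor() as cur:
            # jsonb is an assumption; a plain text column also works with the %s template.
            await cur.execute("create table if not exists logs (body jsonb);")
            # aiopg runs in autocommit mode, so no explicit commit is needed.

asyncio.run(create_logs_table())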

Standalone

You can also run this library as a standalone consumer job command. Just set the required environment variables and run rabbitmq2psql-as-json (see the example after the variable list below). This use case is a perfect fit when you need to run it as a cron job or a Kubernetes job.

Required environment variables:

  • MQ_HOST
  • MQ_PORT (optional)
  • MQ_VHOST
  • MQ_USER
  • MQ_PASS
  • MQ_QUEUE
  • MQ_QUEUE_DURABLE (optional, default value: True)
  • MQ_EXCHANGE (exchange for the dead letter queue, i.e. the queue that collects records which produced errors)
  • MQ_ROUTING_KEY (routing key for the dead letter queue)
  • DB_HOST
  • DB_PORT (optional)
  • DB_USER
  • DB_PASS
  • DB_DATABASE
  • SQL_TEMPLATE (DB insert query template, e.g. insert into logs (body) values (%s);)
  • CONSUMER_POOL_SIZE (optional, default value: 10)
  • LOG_LEVEL (Logging level. See: Python logging module docs)
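
For example, a cron- or CI-friendly run might look like the following (all values are placeholders for illustration):

export MQ_HOST=rabbitmq.example.com
export MQ_VHOST=/
export MQ_USER=guest
export MQ_PASS=guest
export MQ_QUEUE=logs
export MQ_EXCHANGE=errors
export MQ_ROUTING_KEY=errors
export DB_HOST=postgres.example.com
export DB_USER=postgres
export DB_PASS=secret
export DB_DATABASE=mydb
export SQL_TEMPLATE="insert into logs (body) values (%s);"
rabbitmq2psql-as-json

The command exits once the queue has been drained, which is what makes it safe for one-shot scheduled runs.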

Example Kubernetes job: see kube.yaml in the repository.


Download files

Download the file for your platform.

Source Distributions

No source distribution files are available for this release.

Built Distribution

rabbitmq2psql_as_json-1.0.4-py3-none-any.whl (8.8 kB)

Uploaded for Python 3.

File details

Details for the file rabbitmq2psql_as_json-1.0.4-py3-none-any.whl.

File hashes

Hashes for rabbitmq2psql_as_json-1.0.4-py3-none-any.whl
Algorithm    Hash digest
SHA256       d33b19e8b3b30b7ad575427b2263072471fe4bcf1f2fcb2cc2cc67b733b90dde
MD5          a41d29bd7ca271f1339b6fc9363a18de
BLAKE2b-256  ac05a5280a7e48bffd50768e925b72abfce2a0328a32c812e35db64c1d729a94

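To check a downloaded wheel against the SHA256 digest above, you can use pip's built-in hash command (a generic pip feature, not specific to this package):

pip hash rabbitmq2psql_as_json-1.0.4-py3-none-any.whl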
