Mikkoo is a PgQ to RabbitMQ Relay

A PgQ to RabbitMQ relay.

Mikkoo is named for the rabbit in the “Clever Rabbit and the Elephant” fable.

PgQ Setup

  1. Install pgq into your database and create the queue:

    # CREATE EXTENSION pgq;
    CREATE EXTENSION
    # SELECT pgq.create_queue('test');
    create_queue
    --------------
                1
    (1 row)

  2. Ensure that the pgqd daemon is running.

PgQ Event to AMQP Mapping

When inserting events into a PgQ queue, use the seven-argument form of the insert function (pgq.insert_event/7) with the following field mappings:

PgQ Event    AMQP
---------    ---------------------
ev_type      Routing Key
ev_data      Message body
ev_extra1    Exchange
ev_extra2    Content-Type Property
ev_extra3    AMQP Properties [1]
ev_extra4    Headers [2]
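
To make the mapping concrete, here is a minimal Python sketch that splits one PgQ event row into the parts of an AMQP publish. It is an illustration only, not Mikkoo's implementation: the function name `event_to_amqp` is hypothetical, the event is assumed to arrive as a dict keyed by column name, and ev_extra4 is assumed to hold a JSON object when it is not empty.

```python
import json


def event_to_amqp(event):
    """Split a PgQ event row (a dict keyed by ev_* column) into AMQP publish parts."""

    def parse_json(value):
        # Treat NULL and empty strings as "nothing supplied".
        return json.loads(value) if value else {}

    return {
        "routing_key": event["ev_type"],                  # ev_type   -> routing key
        "body": event["ev_data"],                         # ev_data   -> message body
        "exchange": event["ev_extra1"],                   # ev_extra1 -> exchange
        "content_type": event.get("ev_extra2"),           # ev_extra2 -> content type
        "properties": parse_json(event.get("ev_extra3")),  # ev_extra3 -> AMQP properties
        "headers": parse_json(event.get("ev_extra4")),     # ev_extra4 -> headers (assumed JSON)
    }


message = event_to_amqp({
    "ev_type": "test.routing-key",
    "ev_data": '{"foo": "bar"}',
    "ev_extra1": "postgres",
    "ev_extra2": "application/json",
    "ev_extra3": '{"type": "example"}',
    "ev_extra4": "",
})
print(message["exchange"], message["routing_key"], message["properties"])
```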

There is a convenience schema in the mikkoo.sql file that adds stored procedures for creating properly formatted Mikkoo events in PgQ. It also provides auditing functions that can build an audit log of the events sent to PgQ.

AMQP Message Properties

The following table defines the available fields that can be set in a JSON blob in the ev_extra3 field when inserting an event.

Property            PgSQL Type
----------------    -------------
app_id              text
content_encoding    text
content_type        text
correlation_id      text
delivery_mode       int2
expiration          text
message_id          text
headers             text/json [3]
timestamp           int4
type                text
priority            int4
user_id             text
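
Before inserting an event, it can help to check the ev_extra3 blob on the client side. The sketch below is one possible check under stated assumptions: `check_properties` and `PROPERTY_TYPES` are hypothetical names, the text types are mapped to Python `str`, int2/int4 to `int`, and headers may be either a string or a JSON object. How Mikkoo itself treats unknown or mistyped properties is not covered here.

```python
import json

# Property names from the table above, mapped to the Python types assumed
# to correspond to the listed PgSQL types.
PROPERTY_TYPES = {
    "app_id": str,
    "content_encoding": str,
    "content_type": str,
    "correlation_id": str,
    "delivery_mode": int,
    "expiration": str,
    "message_id": str,
    "headers": (str, dict),
    "timestamp": int,
    "type": str,
    "priority": int,
    "user_id": str,
}


def check_properties(blob):
    """Return a list of problems found in an ev_extra3 JSON string."""
    problems = []
    for name, value in json.loads(blob).items():
        expected = PROPERTY_TYPES.get(name)
        if expected is None:
            problems.append(f"unknown property: {name}")
        elif isinstance(value, bool) or not isinstance(value, expected):
            # bool is a subclass of int in Python, so reject it explicitly.
            problems.append(f"bad type for {name}: {type(value).__name__}")
    return problems


print(check_properties('{"type": "example", "priority": 5}'))
print(check_properties('{"priority": "high", "colour": "red"}'))
```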

Values assigned in the JSON blob provided to ev_extra3 take precedence over the automatically assigned app_id, content_type, correlation_id, headers, and timestamp values created by Mikkoo at processing time.
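
The precedence rule can be sketched as a dictionary merge in which the ev_extra3 values are applied last. This is a hedged illustration rather than Mikkoo's code: `build_properties` is a hypothetical name, the defaults are modelled on the example values shown in this document (an app_id of mikkoo, a UUID correlation_id, a Unix timestamp), and the automatically assigned headers value is left out because its shape is not described here.

```python
import json
import time
import uuid


def build_properties(ev_extra2, ev_extra3, now=None):
    """Merge assumed automatic defaults with the ev_extra3 overrides."""
    defaults = {
        "app_id": "mikkoo",
        "correlation_id": str(uuid.uuid4()),
        "timestamp": int(time.time() if now is None else now),
    }
    if ev_extra2:
        defaults["content_type"] = ev_extra2
    overrides = json.loads(ev_extra3) if ev_extra3 else {}
    # Later keys win, so anything supplied in ev_extra3 replaces the default.
    return {**defaults, **overrides}


props = build_properties(
    "application/json",
    '{"type": "example", "app_id": "billing"}',
    now=1449600290,
)
print(props["app_id"], props["type"], props["timestamp"])
```

In this sketch the event keeps the automatic content_type, correlation_id, and timestamp, while app_id comes from the blob.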

Event Insertion Example

The following example inserts a JSON blob message body of {"foo": "bar"} that will be published to the postgres exchange in RabbitMQ using the test.routing-key routing key. The content type is specified in ev_extra2 and the AMQP type message property is specified in ev_extra3.

# SELECT pgq.insert_event('test', 'test.routing-key', '{"foo": "bar"}', 'postgres', 'application/json', '{"type": "example"}', '');
 insert_event
--------------
            4
(1 row)
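
The same insert can be prepared from application code. The sketch below only builds the SQL text and the argument tuple, so it runs without a database. It assumes a DB-API driver that uses `%s` placeholders (psycopg2 is one such driver), and the helper name `insert_event_args` is made up for illustration; it is not part of Mikkoo or PgQ.

```python
import json

# Seven placeholders, matching the seven-argument pgq.insert_event call above.
INSERT_EVENT_SQL = "SELECT pgq.insert_event(%s, %s, %s, %s, %s, %s, %s)"


def insert_event_args(queue, routing_key, body, exchange, content_type,
                      properties=None, headers=None):
    """Build the argument tuple for INSERT_EVENT_SQL."""
    return (
        queue,
        routing_key,                                       # ev_type
        json.dumps(body),                                  # ev_data
        exchange,                                          # ev_extra1
        content_type,                                      # ev_extra2
        json.dumps(properties) if properties else "",      # ev_extra3
        json.dumps(headers) if headers else "",            # ev_extra4
    )


args = insert_event_args(
    "test", "test.routing-key", {"foo": "bar"},
    "postgres", "application/json", {"type": "example"},
)
print(INSERT_EVENT_SQL)
print(args)
```

With a real connection, the pair would typically be passed to the driver as `cursor.execute(INSERT_EVENT_SQL, args)`, which keeps the values out of the SQL string.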

When this message is received by RabbitMQ it will have a message body of:

{"foo": "bar"}

And it will have message properties similar to the following:

Property          Example Value
--------------    ------------------------------------
app_id            mikkoo
content_type      application/json
correlation_id    0ad6b212-4c84-4eb0-8782-9a44bdfe949f
timestamp         1449600290
type              example

Configuration

The Mikkoo configuration file uses YAML for markup and allows one or more PgQ queues to be processed.

If you use Sentry, setting Application/sentry_dsn turns on Sentry exception logging, provided the raven client library is installed.

Queues are configured by name under the Application/workers stanza. The following example configures two workers for the processing of a queue named invoices. Each worker process connects to a local PostgreSQL and RabbitMQ instance using default credentials.

Application:
  workers:
    invoices:
      postgres_url: postgresql://localhost:5432/postgres
      rabbitmq_url: amqp://localhost:5672/%2f
      confirm: False

Queue Configuration Options

The following table details the configuration options available per queue:

Key              Description
-------------    ------------------------------------------------------------------------------------
confirm          Enable/disable RabbitMQ Publisher Confirmations. Default: True
consumer_name    Override the default PgQ consumer name. Default: mikkoo
max_failures     Maximum failures before discarding an event. Default: 10
postgres_url     The URL for connecting to PostgreSQL
rabbitmq_url     The AMQP URL for connecting to RabbitMQ
retry_delay      How long in seconds until PgQ emits failed events again. Default: 10
unregister       Unregister a consumer with PgQ on shutdown. Default: True
wait_duration    How long to wait before checking the queue after the last empty result. Default: 1
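
The defaults listed above can be pictured as a base dictionary that each queue's settings are layered over. The following is a sketch under assumptions, not how Mikkoo loads its configuration: `resolve_queue_config`, `QUEUE_DEFAULTS`, and `REQUIRED_KEYS` are hypothetical names, the PostgreSQL key is spelled `postgres_url` as in the configuration examples, and the two URLs are assumed to be required because no default is documented for them.

```python
# Documented defaults for the optional per-queue settings.
QUEUE_DEFAULTS = {
    "confirm": True,
    "consumer_name": "mikkoo",
    "max_failures": 10,
    "retry_delay": 10,
    "unregister": True,
    "wait_duration": 1,
}

# Assumption: the connection URLs have no default and must be provided.
REQUIRED_KEYS = ("postgres_url", "rabbitmq_url")


def resolve_queue_config(settings):
    """Return the queue settings with documented defaults filled in."""
    missing = [key for key in REQUIRED_KEYS if key not in settings]
    if missing:
        raise ValueError("missing required settings: " + ", ".join(missing))
    # Explicit settings override the defaults.
    return {**QUEUE_DEFAULTS, **settings}


invoices = resolve_queue_config({
    "postgres_url": "postgresql://localhost:5432/postgres",
    "rabbitmq_url": "amqp://localhost:5672/%2f",
    "confirm": False,
})
print(invoices)
```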

Example Configuration

The following is an example of a full configuration file:

Application:

  poll_interval: 10
  sentry_dsn: [YOUR SENTRY DSN]

  statsd:
    enabled: true
    host: localhost
    port: 8125

  workers:
    test:
      confirm: False
      consumer_name: my_consumer
      max_failures: 5
      postgres_url: postgresql://localhost:5432/postgres
      rabbitmq_url: amqp://localhost:5672/%2f
      retry_delay: 5
      unregister: False
      wait_duration: 5

Daemon:
  user: mikkoo
  pidfile: /var/run/mikkoo

Logging:
  version: 1
  formatters:
    verbose:
      format: '%(levelname) -10s %(asctime)s  %(process)-6d %(processName) -20s %(name) -18s: %(message)s'
      datefmt: '%Y-%m-%d %H:%M:%S'
  handlers:
    console:
      class: logging.StreamHandler
      formatter: verbose
      debug_only: True
  loggers:
    helper:
      handlers: [console]
      level: INFO
      propagate: true
    mikkoo:
      handlers: [console]
      level: INFO
      propagate: true
    pika:
      handlers: [console]
      level: ERROR
      propagate: true
    queries:
      handlers: [console]
      level: ERROR
      propagate: true
    tornado:
      handlers: [console]
      level: ERROR
      propagate: true
  root:
    handlers: [console]
    level: CRITICAL
    propagate: true
  disable_existing_loggers: true
  incremental: false
