A RabbitMQ client for the SSEC.

Project description

QuickMQ

An easy-to-use AMQP/RabbitMQ publisher created for use at the SSEC.

Description

QuickMQ provides a high-level API to simplify common publishing patterns, which allows event processors at SDS to focus on processing events instead of handling network problems.

  • Automatically reconnecting and handling errors.
  • Publishing to multiple servers at once.
  • Guaranteeing message delivery.
  • Publishing messages to a RabbitMQ cluster.

To see the requirements of this project, check out the reqs and specs doc.

Please note that QuickMQ cannot currently do the following:

  • Declare AMQP 0-9-1 topology (exchanges, queues).
  • Consume messages from a server.

Installation

Requirements

  • Python >= 3.6

Note: QuickMQ has only been tested on OSX and RHEL systems.

To install the latest stable version use pip.

pip install --upgrade quickmq

In addition to being on PyPI, QuickMQ is also deployed to the SSEC's GitLab package registry.

pip install --upgrade --index-url https://gitlab.ssec.wisc.edu/api/v4/projects/2625/packages/pypi/simple quickmq

To install the latest (possibly unstable) development version, use git.

git clone https://gitlab.ssec.wisc.edu/mdrexler/ssec_amqp.git
cd ssec_amqp
pip install .

Usage

API

The easiest way to use QuickMQ is with the API at ssec_amqp.api.

import ssec_amqp.api as mq

# Connect to multiple servers at once
mq.connect('server1', 'server2', user='username', password='password', exchange='satellite')

# Additionally, connect to a RabbitMQ cluster
mq.connect('cluster1', 'cluster2', 'cluster3', user='user', password='pass', exchange='satellite', cluster=True)

QuickMQ will now manage the connections to 'server1', 'server2', and the cluster servers until mq.disconnect() is called or the program exits. This means that if a connection to a server is interrupted, QuickMQ will automatically retry connecting to that server without interrupting connections to other servers or blocking the calling script.

# ...continued from above
# Now when we publish messages they'll get delivered to 'server1', 'server2', and one of the cluster servers.
import logging

try:
    while True:
        deliv_status = mq.publish({'msg': 'Hi from QuickMQ!'}, route_key='my.custom.topic')
        for con, stat in deliv_status.items():
            if stat != "DELIVERED":
                logging.warning("Couldn't deliver message to %s!", con)
except KeyboardInterrupt:
    logging.info("goodbye!")

mq.publish will return a dictionary that contains the status of the message delivery to each server.

  • "DELIVERED" means that the message was successfully delivered and acknowledged by the server.
  • "DROPPED" means that the message wasn't sent to the server (because the connection is actively reconnecting).
  • "REJECTED" means that the message was nacked by the server.
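
One way to act on these statuses is to group connections by outcome. The following is a minimal sketch and not part of QuickMQ: group_by_status is a hypothetical helper, and the sample dictionary stands in for a real return value of mq.publish (its keys are assumed here to be plain server names).

```python
from collections import defaultdict


def group_by_status(deliv_status):
    """Group connections by delivery status.

    Hypothetical helper for illustration; not part of QuickMQ.
    """
    grouped = defaultdict(list)
    for con, stat in deliv_status.items():
        grouped[stat].append(con)
    return dict(grouped)


# Sample data standing in for the dictionary returned by mq.publish.
sample = {"server1": "DELIVERED", "server2": "DROPPED", "cluster": "REJECTED"}

grouped = group_by_status(sample)

# Everything that was not acknowledged by its server.
failed = {stat: cons for stat, cons in grouped.items() if stat != "DELIVERED"}
```

Grouping this way makes it easy to treat "DROPPED" (connection was reconnecting) differently from "REJECTED" (server nacked the message), for example by logging them at different levels.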

Note: QuickMQ will JSON-serialize messages passed to mq.publish, so make sure to pass the Python object itself and not a JSON string.
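
To see why passing a JSON string is a problem, the sketch below uses only the standard library's json module. It assumes that QuickMQ's serialization behaves like json.dumps, which the note above implies but does not spell out.

```python
import json

message = {"msg": "Hi from QuickMQ!"}

# Serializing the object once gives the JSON document a consumer expects.
encoded_once = json.dumps(message)

# Serializing an already-encoded string adds a second layer of quoting.
encoded_twice = json.dumps(encoded_once)

# A consumer that decodes once gets a dict in the first case,
# but only a str (still JSON text) in the second.
decoded_once = json.loads(encoded_once)
decoded_twice = json.loads(encoded_twice)
```

Under that assumption, a pre-encoded string would reach consumers as a JSON string literal rather than as an object.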

It's possible to see the current connection status for each server using mq.status.

# ...continued from above

con_status = mq.status()
for con, status in con_status.items():
    if status != "CONNECTED":
        logging.warning("Not currently connected to %s!", con)

mq.status returns a dictionary that maps each server to its connection status: "CONNECTED", "RECONNECTING", or "DISCONNECTED". Keep in mind that mq.status doesn't actively check the connection, so a server reported as "CONNECTED" could still fail on the next publish and only then be shown as "RECONNECTING".
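
If a script should wait for every connection to come up before it starts publishing, the status dictionary can be polled. This is a minimal sketch and not part of QuickMQ: wait_until_connected is a hypothetical helper that accepts any zero-argument callable returning a status dictionary, so it can be tried without a running server.

```python
import time


def wait_until_connected(get_status, timeout=10.0, interval=0.5):
    """Poll get_status() until every connection reports "CONNECTED".

    get_status is any zero-argument callable that returns a status
    dictionary (for example mq.status). Returns True once all
    connections are connected, or False if the timeout expires first.
    Hypothetical helper for illustration; not part of QuickMQ.
    """
    deadline = time.monotonic() + timeout
    while True:
        statuses = get_status()
        if statuses and all(s == "CONNECTED" for s in statuses.values()):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

With the API above, this would be called as wait_until_connected(mq.status). Because the status is not actively checked, a True result only reflects the last known state of each connection.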

Classes

If the API doesn't provide enough flexibility, it's also possible to use the classes that drive it directly.

# Recreate the code blocks above without using the API.

import logging

from ssec_amqp import AmqpClient, AmqpConnection, ClusteredConnection

client = AmqpClient(name='my_client') # Optionally give it a name for logging.

servers = ['server1', 'server2']

for server in servers:
    client.connect(AmqpConnection(server, user='user', password='pass', exchange='satellite'))

cluster_servers = ['cluster1', 'cluster2', 'cluster3']

client.connect(ClusteredConnection([AmqpConnection(server, user='user', password='pass', exchange='satellite') for server in cluster_servers]))

try:
    while True:
        deliv_status = client.publish({'msg': 'Hi from QuickMQ!'}, route_key='my.custom.topic')
        for con, stat in deliv_status.items():
            if stat != "DELIVERED":
                logging.warning("Couldn't deliver message to %s!", con)
except KeyboardInterrupt:
    pass

con_status = client.connections
for con, status in con_status.items():
    if status != "CONNECTED":
        logging.warning("Not connected to %s!", con)

# Connections won't automatically disconnect when using a custom client
client.disconnect()

For more information, see the source code for the client and the connections.

Author

Created/Maintained by Max Drexler.

License

MIT License. See LICENSE for more information.
