A file-based, serverless messaging queue
Project description
Dead Drop Messaging Queue
ddmq is a file-based, serverless messaging queue, aimed at providing a low-throughput* messaging queue when you don't want to rely on a server process to handle your requests. It creates a directory for every queue you create, and each message is stored as a JSON object in a file. ddmq keeps track of which messages have been consumed and requeues messages that have not been acknowledged by their consumers within a set timeout. Since there is no server handling the messages, the housekeeping is done by the clients as they interact with the queue.
ddmq is written in Python and should work with both Python 2.7+ and Python 3+. It can also be run as a command-line tool, either by specifying the operation as options and arguments or by supplying the whole operation as a JSON object.
* It can handle ~5000-6000 messages per minute (when used as a module, not via the CLI) on an SSD-based laptop, about 10% of what RabbitMQ manages on the same hardware, but other processes competing for file access will impact performance.
Requirements
| Python 2.7+ or 3+, should work with both.
| Additional modules required: pyyaml
| Additional modules recommended: beautifultable
Installation
::
$ pip install ddmq
Key Features
- serverless
- file based
- first in, first out within the same priority level
- outputs plain text, JSON or YAML
- accepts operations packaged as JSON objects via the command line
- global and queue-specific settings
  - custom message expiry time lengths
  - limit the number of times a message will be requeued after expiry
- message-specific settings (see the sketch after this list)
  - set a custom priority per message (all integers >= 0 are valid, lower number = higher priority)
  - all other message properties can also be changed per message
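As an illustration of the message-specific settings, here is a minimal sketch that publishes two messages with different priorities to the same queue. It assumes that publish() accepts a priority keyword matching the priority property shown in the message JSON further down; the root path and queue name are placeholders.
::
import ddmq

# open (or create) the root directory; the path is illustrative
b = ddmq.broker('/tmp/ddmq', create=True)

# assumption: publish() takes a per-message priority keyword
# (lower number = higher priority, default is 999)
b.publish(queue='queue_name', msg_text='urgent task', priority=0)

# published with the default priority, so it will be consumed
# after the higher-priority message above
b.publish(queue='queue_name', msg_text='background task')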
Command-Line Usage
::
usage: ddmq <command> [<args>]
The available commands are:
view List queues and number of messages
create Create a queue
delete Delete a queue
publish Publish message to queue
consume Consume message from queue
ack Positively acknowledge a message
nack Negatively acknowledge a message (possibly requeue)
purge Purge all messages from queue
clean Clean out expired messages from queue
json Run a command packaged as a JSON object
For more info about the commands, run
ddmq <command> -h
Command-line interface to Dead Drop Messaging Queue (ddmq).
positional arguments:
command Subcommand to run
optional arguments:
-h, --help show this help message and exit
-v, --version print version
Examples:
# create a new queue and publish a message to it
$ ddmq create -f /tmp/ddmq queue_name
$ ddmq publish /tmp/ddmq queue_name "Hello World!"
# consume a message from a queue
$ ddmq consume /tmp/ddmq queue_name
# view all queues present in the specified root directory
$ ddmq view /tmp/ddmq
# remove all messages from a queue
$ ddmq purge /tmp/ddmq queue_name
# delete a queue
$ ddmq delete /tmp/ddmq queue_name
Python Module Usage
::
# imports both the broker and message module
import ddmq
# create the broker object and specify the path to the root directory
# adding create=True to tell it to create and initiate both the root
# directory and queue directories if they don't already exist
b = ddmq.broker('/tmp/ddmq', create=True)
# publish a message to the specified queue
b.publish(queue='queue_name', msg_text='Hello World!')
# consume a single message from the specified queue
msg = b.consume(queue='queue_name')
# print the contents of the message
print(msg[0].message)
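Messages that are consumed but never acknowledged will be requeued once their timeout expires, so a consumer should normally ack whatever it has processed. Below is a small hedged sketch of that step; it assumes consume() returns a list of message objects (as the indexing above suggests) and that ack() accepts a single message object, as in the consumer example further down.
::
import ddmq

b = ddmq.broker('/tmp/ddmq', create=True)

# consume a message and acknowledge it so it is not requeued
# once its timeout expires
msg = b.consume(queue='queue_name')
if msg:
    b.ack(msg[0])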
File Structure
The structure ddmq uses to handle the messages consists of a root directory, with subfolders for each created queue. The messages waiting in a queue are stored in the queue's folder, and messages that have been consumed but not yet acknowledged are stored in the queue's work directory.
::
root/
├── ddmq.yaml
├── queue_one
│   ├── 999.3.ddmqfc24476c6708416caa2a101845dddd9a
│   ├── ddmq.yaml
│   └── work
│       ├── 1538638378.999.1.ddmq39eb64e1913143aa8d28d9158f089006
│       └── 1538638379.999.2.ddmq1ed12af3760e4adfb62a9109f9b61214
└── queue_two
    ├── 999.1.ddmq6d8742dbde404d5ab556bf229151f66b
    ├── 999.2.ddmq15463a6680f942489d54f1ec78a53673
    ├── ddmq.yaml
    └── work
In the example above two queues have been created (queue_one, queue_two) and both have had messages published to them. In queue_one there are two messages that have already been consumed but not yet acknowledged (acked), so they are stored in queue_one's work folder. As soon as a message is acked it is deleted by default. Messages that are negatively acknowledged (nacked) are requeued by default.
Both the root directory and each queue subfolder contain a config file named ddmq.yaml with the settings to be used. The root's config file overrides the default values, and a queue's config file overrides both the default values and the root's config file. If a message is given specific settings when being published/consumed, these settings override all the ddmq.yaml files.
The message files themselves contain a JSON string with all the properties that make up a message object.
::
{"priority": 999, "queue_number": 2, "requeue_counter": 0, "filename": "queue_one/999.2.ddmq1ed12af3760e4adfb62a9109f9b61214", "queue": "queue_one", "requeue_limit": null, "timeout": null, "message": "msg", "requeue": false, "id": "1ed12af3760e4adfb62a9109f9b61214"}
ddmq.yaml
The config files in the root and queue directories are in YAML format. The parameters that can be changed and their default values are:
::
cleaned: 0 # epoch timestamp when the queue was last cleaned
message_timeout: 600 # number of seconds after a message is consumed before it is considered expired
priority: 999 # the default priority level of published messages. lower number = higher priority
requeue: true # nacked messages are requeued by default, set this to false to delete them instead
requeue_prio: 0 # the priority requeued messages will get (0 = highest prio)
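Since these settings live in plain YAML files, a queue's defaults can be adjusted by editing its ddmq.yaml directly; whether the broker also exposes a call for this is not shown here, so the sketch below simply rewrites the file with pyyaml. The path and the new values are illustrative.
::
import yaml

# queue-level config file, as shown in the file structure above
config_path = '/tmp/ddmq/queue_name/ddmq.yaml'

# load the current settings, shorten the expiry timeout, turn off
# requeueing of nacked messages, then write the file back
with open(config_path) as f:
    settings = yaml.safe_load(f) or {}

settings['message_timeout'] = 60
settings['requeue'] = False

with open(config_path, 'w') as f:
    yaml.safe_dump(settings, f, default_flow_style=False)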
Use case
Since ddmq handles one file per message, it will be much slower than other message queues. A quick comparison with RabbitMQ showed that first publishing and then consuming 5000 messages is about 10x slower using ddmq (45s vs 4.5s). The point of ddmq is not performance, but to be usable in environments where you can't run a server process for some reason.
My own motivation for writing ddmq was to run it on a shared HPC cluster where I could not reliably keep a server process running on the same node all the time. The mounted network storage, however, was available everywhere, all the time. The expected throughput was really low, maybe <10 messages per day, so performance was not the main focus.
Example: parallelization within or beyond nodes with minimal effort
Let's say you have many tasks to go through, and each task takes more than a couple of seconds. A single-threaded approach to processing n files could look like this:
::
program.py:

# go through the file names and process them directly
for file in file_names:
    run_task(file)
This will take n * seconds_per_task to complete. If you instead submit each task to ddmq, you can start as many consumers as you want to handle the processing, and the total time to complete should be around (n * seconds_per_task) / number_of_consumers.
::
program.py:

# init queue
import ddmq
b = ddmq.broker('/tmp/ddmq', create=True)
b.create_queue('tasks')

# go through the file names and submit them to the queue
for file in file_names:
    b.publish('tasks', msg_text=file)

consumer.py:

# init queue
import ddmq
import time
b = ddmq.broker('/tmp/ddmq', create=True)

while True:
    msg = b.consume('tasks')
    # wait 10s for messages if the queue is empty
    if not msg:
        time.sleep(10)
    else:
        # run the task and acknowledge the message
        run_task(msg.message)
        b.ack(msg)
The nice thing about this type of parallelization is that it doesn't matter whether you start 8 instances of the consumer script on a single node or 80 instances spread over 10 nodes; as long as all of them can read and write to the file system, they will work. No need for multithreaded processes or MPI.
File details
Details for the file ddmq-0.9.14.tar.gz
File metadata
- Download URL: ddmq-0.9.14.tar.gz
- Upload date:
- Size: 19.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.12.1 pkginfo/1.4.2 requests/2.9.1 setuptools/39.1.0 requests-toolbelt/0.8.0 tqdm/4.25.0 CPython/3.5.2
File hashes
Algorithm | Hash digest
---|---
SHA256 | 0b0fd365f5dd87922f8b25ae9165a1e22fb3ae12f53615ff0259b332f8ae95ee
MD5 | 684923030e732579f9930d1c0f047e62
BLAKE2b-256 | ab47e782862637cb9017b0f54b44431658fe95f085f7de920c07d48d67bf72ad
File details
Details for the file ddmq-0.9.14-py3-none-any.whl
File metadata
- Download URL: ddmq-0.9.14-py3-none-any.whl
- Upload date:
- Size: 21.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.12.1 pkginfo/1.4.2 requests/2.9.1 setuptools/39.1.0 requests-toolbelt/0.8.0 tqdm/4.25.0 CPython/3.5.2
File hashes
Algorithm | Hash digest
---|---
SHA256 | 3c4962f45cc66aa6d05c7fe615714c7889552a47fb4d109b23da72ec1a0ea9f9
MD5 | 15b9972a6655ebade5f7b98a2c448a78
BLAKE2b-256 | f0c99ceec9a242341f6d99ee70b02b0ef5dde12cc311c82e01ff0618bf5dc9c6