Skip to main content

A dead simple message queue

Project description

Dead Simple Message Queue

What it does

Part mail room, part bulletin board, dsmq is a central location for sharing messages between processes, even when they are running on computers scattered around the world.

Its defining characteristic is bare-bones simplicity.

How to use it

Install

pip install dsmq

Create a dsmq server

As in src/dsmq/example_server.py

from dsmq.server import serve

serve(host="127.0.0.1", port=30008)

Connect a client to a dsmq server

As in src/dsmq/example_put_client.py

from dsmq.client import connect

mq = connect(host="127.0.0.1", port=12345)

Add a message to a queue

As in src/dsmq/example_put_client.py

topic = "greetings"
msg = "hello world!"
mq.put(topic, msg)

Read a message from a queue

As in src/dsmq/example_get_client.py

topic = "greetings"
msg = mq.get(topic)

Spin up and shut down a dsmq in its own process

A dsmq server doesn't come with a built-in way to shut itself down. It can be helpful to have it running in a separate process that can be managed

import multiprocessing as mp

p_mq = mp.Process(target=serve, args=(config.MQ_HOST, config.MQ_PORT))
p_mq.start()

p_mq.join()
# or 
p_mq.kill()
p_mq.close()

Demo

  1. Open 3 separate terminal windows.
  2. In the first, run src/dsmq/server.py as a script.
  3. In the second, run src/dsmq/example_put_client.py.
  4. In the third, run src/dsmq/example_get_client.py.

Alternatively, you can run them all at once with src/dsmq/demo.py.

How it works

Expected behavior and limitations

  • Many clients can read messages of the same topic. It is a one-to-many publication model.

  • A client will not be able to read any of the messages that were put into a queue before it connected.

  • A client will get the oldest message available on a requested topic. Queues are first-in-first-out.

  • Messages older than 600 seconds will be deleted from the queue.

  • Put and get operations are fairly quick--less than 100 $\mu$s of processing time plus any network latency--so it can comfortably handle requests at rates of hundreds of times per second. But if you have several clients reading and writing at 1 kHz or more, you may overload the queue.

  • The queue is backed by an in-memory SQLite database. If your message volumes get larger than your RAM, you will reach an out-of-memory condition.

API Reference

[source]

serve(host="127.0.0.1", port=30008)

Kicks off the mesage queue server. This process will be the central exchange for all incoming and outgoing messages.

  • host (str), IP address on which the server will be visible and
  • port (int), port. These will be used by all clients. Non-privileged ports are numbered 1024 and higher.

connect(host="127.0.0.1", port=30008)

Connects a client to an existing message queue server.

  • host (str), IP address of the server.
  • port (int), port on which the server is listening.
  • returns a DSMQClientSideConnection object.

DSMQClientSideConnection class

This is a convenience wrapper, to make the get() and put() functions easy to write and remember. It's under the hood only, not meant to be called directly.

put(topic, msg)

Puts msg into the queue named topic. If the queue doesn't exist yet, it is created.

  • msg (str), the content of the message.
  • topic (str), name of the message queue in which to put this message.

get(topic)

Get the oldest eligible message from the queue named topic. The client is only elgibile to receive messages that were added after it connected to the server.

  • topic (str)
  • returns str, the content of the message. If there was no eligble message in the topic, or the topic doesn't yet exist, returns "".

get_latest(topic)

Get the most recent eligible message from the queue named topic. All the messages older than that in the queue become ineligible and never get seen by the client.

  • topic (str)
  • returns str, the content of the message. If there was no eligble message in the topic, or the topic doesn't yet exist, returns "".

get_wait(topic)

A variant of get() that retries a few times until it gets a non-empty message. Adjust internal values _n_tries and _initial_retry to change how persistent it will be.

  • topic (str)
  • returns str, the content of the message. If there was no eligble message in the topic after the allotted number of tries, or the topic doesn't yet exist, returns "".

shutdown_server()

Gracefully shut down the server, through the client connection.

close()

Gracefully shut down the client connection.

Testing

Run all the tests in src/dsmq/tests/ with pytest, for example

uv run pytest

Performance characterization

Time typical operations on your system with the script at src/dsmq/tests/performance_suite.py

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dsmq-1.4.0.tar.gz (25.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dsmq-1.4.0-py3-none-any.whl (17.2 kB view details)

Uploaded Python 3

File details

Details for the file dsmq-1.4.0.tar.gz.

File metadata

  • Download URL: dsmq-1.4.0.tar.gz
  • Upload date:
  • Size: 25.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.4.17

File hashes

Hashes for dsmq-1.4.0.tar.gz
Algorithm Hash digest
SHA256 3c93ecaf82d6d3fbd6c614a5749bc07aa66f09cb285ffca08704b76301773526
MD5 e3e053fe9d4bdb43987ab4011eef5f19
BLAKE2b-256 e65d23e7b0c5ec6efd65d08881e8bb95d50e1094762a42f269fa7331fb29553a

See more details on using hashes here.

File details

Details for the file dsmq-1.4.0-py3-none-any.whl.

File metadata

  • Download URL: dsmq-1.4.0-py3-none-any.whl
  • Upload date:
  • Size: 17.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.4.17

File hashes

Hashes for dsmq-1.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 5dbaf4267921c2441e00f608cc6c96aa3935281b20d0580a46653e785679340c
MD5 abdcbc27ac015a6a2c809892f88ed597
BLAKE2b-256 117b2148db8d21a2290474b898b57afc11257cd8e6b67e4c2bb583a22bffd4fd

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page