Skip to main content

python-native distributed message queue

This project has been archived.

The maintainers of this project have marked this project as archived. No new releases are expected.

Project description

dmq

--> Distributed Message Queue,

fully python-native, based on pluggable architecture and combination of parallelism & concurrency


install

pip install dmq

# with redis broker/backend
pip install dmq[redis]

# with postgres backend
pip install dmq[postgres]

# everything
pip install dmq[all]

quickstart

define a manager and register tasks:

# app.py
from dmq import QManager
from dmq.brokers.redis import RedisBroker
from dmq.backends.redis_backend import RedisResultBackend
from dmq.serializers.msgpack import MsgpackSerializer

broker = RedisBroker(redis_url="redis://localhost:6379")
backend = RedisResultBackend(redis_url="redis://localhost:6379")

q = QManager(
	broker=RedisBroker(redis_url="redis://localhost:6379"), 
	result_backend=RedisResultBackend(redis_url="redis://localhost:6379"), 
	serializer=MsgpackSerializer()
)
# tasks.py
from app import q

@q.register
async def add(x: int, y: int) -> int:
    return x + y

@q.register(qname="emails.send")
async def send_email(to: str, body: str) -> None:
    ...

enqueue from anywhere:

from tasks import add

task = await add.q(2, 3)
result = await task.result(timeout=10.0)  # 5

run the worker:

dmq --broker app:q --tasks tasks --workers 4

architecture

arch

everything is a protocol — swap any layer without touching the rest.


brokers

the broker handles task queuing and delivery.

broker use case backend
InMemoryBroker dev/testing asyncio.Queue
RedisBroker production redis lists + sorted sets
from dmq.brokers.memory import InMemoryBroker
from dmq.brokers.redis import RedisBroker

# in-memory, no deps
broker = InMemoryBroker()

# redis, production
broker = RedisBroker(redis_url="redis://localhost:6379", poll_interval=0.5)

result backends

store and retrieve task results.

backend use case storage
InMemoryResultBackend dev/testing dict
RedisResultBackend production redis keys with ttl
PostgresResultBackend production, durable postgres with schema + triggers
from dmq.backends.redis_backend import RedisResultBackend
from dmq.backends.postgres_backend import PostgresResultBackend

redis_backend = RedisResultBackend(
    redis_url="redis://localhost:6379",
    default_ttl=3600,
    type_serialization=True,  # preserves result types via FQN
)

pg_backend = PostgresResultBackend(
    dsn="postgresql://user:pass@localhost/mydb",
    type_serialization=True,
)
await pg_backend.connect()

type_serialization round-trips custom types (msgspec structs, etc) by storing the fully qualified name alongside the result. requires worker and client to share the same codebase.

serializers

serializer format notes
MsgpackSerializer binary default, fast
JsonSerializer text human readable
PickleSerializer binary deprecated, legacy only

scheduling

one-shot scheduling

delay, eta, or cron... fire once:

from datetime import datetime, UTC, timedelta

# delay by 30 seconds
await my_task.sched(delay=30.0, arg1, arg2)

# fire at a specific time
await my_task.sched(eta=datetime(2025, 6, 1, tzinfo=UTC))

# next cron match
await my_task.sched(cron="0 */4 * * *")

periodic scheduling

register tasks that fire on a recurring schedule for the lifetime of the program:

from dmq.types import CronSchedule, DelaySchedule

# every 5 minutes
@q.periodic(DelaySchedule(delay_seconds=300))
async def heartbeat():
    ...

# every 4 hours, cron
@q.periodic(CronSchedule(cron_expr="0 */4 * * *"))
async def cleanup():
    ...

# every 4 hours, but first fire delayed 1 hour from program start
@q.periodic(CronSchedule(cron_expr="0 */4 * * *", delay_seconds=3600))
async def sync_data():
    ...

periodic tasks are started automatically when QWorkerPool.start() is called. the PeriodicScheduler runs as a background asyncio task, computing next fire times and enqueuing via broker.send_task() at each tick.

delay_seconds on CronSchedule offsets the base time forward before computing the next cron match — useful for staggering periodic work or deferring from program start.


delivery guarantees

from dmq.guarantees import DeliveryConfig, DeliveryGuarantee

config = DeliveryConfig(
    guarantee=DeliveryGuarantee.AT_LEAST_ONCE,  # default
)
guarantee behavior
AT_MOST_ONCE ack before processing — fast, may lose tasks
AT_LEAST_ONCE ack after processing — default, may reprocess
EXACTLY_ONCE ack after + idempotency store — requires enable_idempotency=True

retry policies

from dmq.retry import RetryPolicy, ExponentialBackoff, LinearBackoff, FixedDelayBackoff

policy = RetryPolicy(
    max_retries=5,
    backoff=ExponentialBackoff(base=2.0, max_delay=3600.0, jitter=True),
    retry_on=(ConnectionError, TimeoutError),
    dont_retry_on=(ValueError,),
)

backoff strategies: ExponentialBackoff, LinearBackoff, FixedDelayBackoff


worker pool

from dmq import QWorkerPool, ExecutionMode

pool = QWorkerPool(
    manager=q,
    worker_count=4,
    max_tasks_per_worker=10,
    execution_mode=ExecutionMode.ASYNC_ONLY,
)

await pool.start()
await pool.run_forever()

execution modes

mode python build worker type recommended workers
ASYNC_ONLY standard cpython QAsyncWorker min(cpu * 2, 8)
THREADED free-threaded 3.13+ QThreadedWorker cpu_count

auto-detected at runtime via detect_execution_mode(). free-threaded mode requires sys._is_gil_enabled() == False.


events & callbacks

hook into task lifecycle:

from dmq.callback import Callback, CallbackRule
from dmq.events import QEventType

@q.callback
class LogFailures(Callback):
    rules = [CallbackRule(event_types=[QEventType.TASK_FAILED])]

    async def on_event(self, event):
        print(f"task {event.task_id} failed: {event.exception}")

event types:

  • TASK_QUEUED
  • TASK_STARTED
  • TASK_COMPLETED
  • TASK_FAILED
  • TASK_RETRY
  • TASK_NOT_FOUND

partitioning

from dmq import HashPartitionStrategy, KeyBasedPartitionStrategy, RoundRobinPartitionStrategy

strategy = HashPartitionStrategy(num_partitions=8)
partition = strategy.get_partition("task-id-123")

cli

dmq --broker app:q --tasks tasks --workers 4 --concurrency 20 --guarantee at_least_once

# options
  --broker        		python path to QManager instance [module:variable]
  --tasks         		task modules to import [space separated]
  --workers, -w			number of workers [default: auto]
  --concurrency, -c  	max concurrent tasks per worker [default: 10]
  --guarantee, -g    	[at_most_once | at_least_once | exactly_once]
  --execution-mode, -e  [auto | async | threaded]
  --log-level, -l    	[debug | info | warning]

development

# install
uv sync --all-extras --dev

# test
just test

# test with testcontainers (redis + postgres)
just test-lima

# lint & format
just lint

# type check
just type-check

license

apache 2.0 - see here

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dmq-0.1.0.tar.gz (25.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dmq-0.1.0-py3-none-any.whl (40.0 kB view details)

Uploaded Python 3

File details

Details for the file dmq-0.1.0.tar.gz.

File metadata

  • Download URL: dmq-0.1.0.tar.gz
  • Upload date:
  • Size: 25.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.28 {"installer":{"name":"uv","version":"0.9.28","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for dmq-0.1.0.tar.gz
Algorithm Hash digest
SHA256 08ad041213cb66d6f570d1579b1ef65880d25abebd759cce140ea1fa82db3852
MD5 90c64816bd17772b7bffe8ee2db92c95
BLAKE2b-256 90fc9f523caea890ed201e575b8a4e37068a214402dbd55215af88ff840cff33

See more details on using hashes here.

File details

Details for the file dmq-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: dmq-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 40.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.28 {"installer":{"name":"uv","version":"0.9.28","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for dmq-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 84262de737c629e07371a57909071e41aed5e265a530dbaf74e82323d7d6249a
MD5 6a6b29851dc08107676e83c8a6950c1b
BLAKE2b-256 205625898a2d24837ca3508d0e0a01561ecc3f6b6834b84aec491d075cae373e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page