
python-native distributed message queue

This project has been archived by its maintainers. No new releases are expected.


dmq

a distributed message queue: fully python-native, built on a pluggable architecture that combines parallelism & concurrency.


install

pip install dmq

# with redis broker/backend
pip install dmq[redis]

# with postgres backend
pip install dmq[postgres]

# everything
pip install dmq[all]

quickstart

define a manager and register tasks:

# app.py
from dmq import QManager
from dmq.brokers.redis import RedisBroker
from dmq.backends.redis_backend import RedisResultBackend
from dmq.serializers.msgpack import MsgpackSerializer

broker = RedisBroker(redis_url="redis://localhost:6379")
backend = RedisResultBackend(redis_url="redis://localhost:6379")

q = QManager(
    broker=broker,
    result_backend=backend,
    serializer=MsgpackSerializer(),
)

# tasks.py
from app import q

@q.register
async def add(x: int, y: int) -> int:
    return x + y

@q.register(qname="emails.send")
async def send_email(to: str, body: str) -> None:
    ...

enqueue from anywhere:

from tasks import add

task = await add.q(2, 3)
result = await task.result(timeout=10.0)  # 5

run the worker:

dmq --broker app:q --tasks tasks --workers 4

architecture

[architecture diagram]

everything is a protocol — swap any layer without touching the rest.
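as a sketch of the "everything is a protocol" idea, the pluggable layers can be modeled with `typing.Protocol`; the names and signatures below are illustrative, not dmq's actual interfaces:

```python
from typing import Any, Protocol, runtime_checkable

# illustrative only: dmq's real protocol names/signatures may differ
@runtime_checkable
class Serializer(Protocol):
    def dumps(self, obj: Any) -> bytes: ...
    def loads(self, data: bytes) -> Any: ...

@runtime_checkable
class Broker(Protocol):
    async def send_task(self, qname: str, payload: bytes) -> None: ...
    async def receive_task(self, qname: str) -> bytes: ...
```

any class with matching methods satisfies the protocol structurally, so a custom layer needs no inheritance from dmq base classes.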


brokers

the broker handles task queuing and delivery.

broker          use case     backend
InMemoryBroker  dev/testing  asyncio.Queue
RedisBroker     production   redis lists + sorted sets

from dmq.brokers.memory import InMemoryBroker
from dmq.brokers.redis import RedisBroker

# in-memory, no deps
broker = InMemoryBroker()

# redis, production
broker = RedisBroker(redis_url="redis://localhost:6379", poll_interval=0.5)
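the core of an in-memory broker reduces to one asyncio.Queue per queue name; this toy version (a sketch, not dmq's InMemoryBroker) shows the shape:

```python
import asyncio

class TinyBroker:
    """Toy in-memory broker: one asyncio.Queue per queue name."""

    def __init__(self) -> None:
        self.queues: dict[str, asyncio.Queue] = {}

    def _q(self, name: str) -> asyncio.Queue:
        # lazily create a queue the first time a name is used
        return self.queues.setdefault(name, asyncio.Queue())

    async def send_task(self, qname: str, payload: bytes) -> None:
        await self._q(qname).put(payload)

    async def receive_task(self, qname: str) -> bytes:
        return await self._q(qname).get()
```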

result backends

store and retrieve task results.

backend                use case             storage
InMemoryResultBackend  dev/testing          dict
RedisResultBackend     production           redis keys with ttl
PostgresResultBackend  production, durable  postgres with schema + triggers

from dmq.backends.redis_backend import RedisResultBackend
from dmq.backends.postgres_backend import PostgresResultBackend

redis_backend = RedisResultBackend(
    redis_url="redis://localhost:6379",
    default_ttl=3600,
    type_serialization=True,  # preserves result types via FQN
)

pg_backend = PostgresResultBackend(
    dsn="postgresql://user:pass@localhost/mydb",
    type_serialization=True,
)
await pg_backend.connect()

type_serialization round-trips custom types (msgspec structs, etc) by storing the fully qualified name alongside the result. requires worker and client to share the same codebase.
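the FQN mechanism can be sketched like this (illustrative code, not dmq's implementation): store `module.qualname` next to the result, then import it back on read — which is exactly why both sides must share the codebase:

```python
import importlib

def fqn(obj) -> str:
    # fully qualified name of the object's type, e.g. "datetime.datetime"
    cls = type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"

def resolve(path: str):
    # inverse operation: import the module and look the type back up
    module, _, name = path.rpartition(".")
    return getattr(importlib.import_module(module), name)
```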

serializers

serializer         format  notes
MsgpackSerializer  binary  default, fast
JsonSerializer     text    human readable
PickleSerializer   binary  deprecated, legacy only
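a quick stdlib-only illustration of the text-vs-binary trade-off (msgpack itself is a third-party dependency, so it is omitted here):

```python
import json
import pickle

payload = {"task": "add", "args": [2, 3]}

as_json = json.dumps(payload).encode()  # text: human readable, interoperable
as_pickle = pickle.dumps(payload)       # binary: python-only; never unpickle untrusted data

assert json.loads(as_json) == payload
assert pickle.loads(as_pickle) == payload
```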

scheduling

one-shot scheduling

fire once, via a delay, a specific eta, or the next cron match:

from datetime import datetime, UTC, timedelta

# delay by 30 seconds
await my_task.sched(arg1, arg2, delay=30.0)

# fire at a specific time
await my_task.sched(eta=datetime(2025, 6, 1, tzinfo=UTC))

# next cron match
await my_task.sched(cron="0 */4 * * *")

periodic scheduling

register tasks that fire on a recurring schedule for the lifetime of the program:

from dmq.types import CronSchedule, DelaySchedule

# every 5 minutes
@q.periodic(DelaySchedule(delay_seconds=300))
async def heartbeat():
    ...

# every 4 hours, cron
@q.periodic(CronSchedule(cron_expr="0 */4 * * *"))
async def cleanup():
    ...

# every 4 hours, but first fire delayed 1 hour from program start
@q.periodic(CronSchedule(cron_expr="0 */4 * * *", delay_seconds=3600))
async def sync_data():
    ...

periodic tasks are started automatically when QWorkerPool.start() is called. the PeriodicScheduler runs as a background asyncio task, computing next fire times and enqueuing via broker.send_task() at each tick.

delay_seconds on CronSchedule offsets the base time forward before computing the next cron match — useful for staggering periodic work or deferring from program start.


delivery guarantees

from dmq.guarantees import DeliveryConfig, DeliveryGuarantee

config = DeliveryConfig(
    guarantee=DeliveryGuarantee.AT_LEAST_ONCE,  # default
)

guarantee      behavior
AT_MOST_ONCE   ack before processing — fast, may lose tasks
AT_LEAST_ONCE  ack after processing — default, may reprocess
EXACTLY_ONCE   ack after + idempotency store — requires enable_idempotency=True
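the exactly-once combination can be sketched as ack-after-processing plus a dedupe store (toy in-memory version standing in for the real idempotency store):

```python
# toy sketch: ack after processing, with an in-memory set as the dedupe store
processed: set[str] = set()

def handle(task_id: str, payload, process, ack) -> None:
    if task_id in processed:
        ack(task_id)   # duplicate delivery: skip the work, just ack
        return
    process(payload)   # if this raises, the task stays unacked and is redelivered
    processed.add(task_id)
    ack(task_id)
```

a redelivered task is therefore acked without re-running the work, which is what upgrades at-least-once into exactly-once semantics.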

retry policies

from dmq.retry import RetryPolicy, ExponentialBackoff, LinearBackoff, FixedDelayBackoff

policy = RetryPolicy(
    max_retries=5,
    backoff=ExponentialBackoff(base=2.0, max_delay=3600.0, jitter=True),
    retry_on=(ConnectionError, TimeoutError),
    dont_retry_on=(ValueError,),
)

backoff strategies: ExponentialBackoff, LinearBackoff, FixedDelayBackoff
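the ExponentialBackoff parameters above map onto a delay curve like this (assumed formula, shown for intuition; dmq's exact jitter math may differ):

```python
import random

def exponential_delay(attempt: int, base: float = 2.0,
                      max_delay: float = 3600.0, jitter: bool = True) -> float:
    # delay grows as base ** attempt, capped at max_delay
    delay = min(base ** attempt, max_delay)
    if jitter:
        # randomize downward to spread retries and avoid thundering herds
        delay *= random.uniform(0.5, 1.0)
    return delay
```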


worker pool

from dmq import QWorkerPool, ExecutionMode

pool = QWorkerPool(
    manager=q,
    worker_count=4,
    max_tasks_per_worker=10,
    execution_mode=ExecutionMode.ASYNC_ONLY,
)

await pool.start()
await pool.run_forever()

execution modes

mode        python build         worker type      recommended workers
ASYNC_ONLY  standard cpython     QAsyncWorker     min(cpu * 2, 8)
THREADED    free-threaded 3.13+  QThreadedWorker  cpu_count

auto-detected at runtime via detect_execution_mode(). free-threaded mode requires sys._is_gil_enabled() == False.
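the detection boils down to probing the interpreter for a disabled GIL; a sketch of that probe (using sys._is_gil_enabled, present on 3.13+ builds) plus the worker-count heuristic from the table — not dmq's actual code:

```python
import os
import sys

def detect_mode() -> str:
    # on builds without _is_gil_enabled the GIL is always on, so default to True
    gil_enabled = getattr(sys, "_is_gil_enabled", lambda: True)()
    return "threaded" if not gil_enabled else "async"

def recommended_async_workers() -> int:
    # the min(cpu * 2, 8) heuristic from the table above
    return min((os.cpu_count() or 1) * 2, 8)
```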


events & callbacks

hook into task lifecycle:

from dmq.callback import Callback, CallbackRule
from dmq.events import QEventType

@q.callback
class LogFailures(Callback):
    rules = [CallbackRule(event_types=[QEventType.TASK_FAILED])]

    async def on_event(self, event):
        print(f"task {event.task_id} failed: {event.exception}")

event types:

  • TASK_QUEUED
  • TASK_STARTED
  • TASK_COMPLETED
  • TASK_FAILED
  • TASK_RETRY
  • TASK_NOT_FOUND
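the rule/event matching can be sketched as a dispatcher that filters handlers by event type (illustrative; dmq's Callback/CallbackRule carry more fields than this):

```python
from dataclasses import dataclass

@dataclass
class Event:
    type: str
    task_id: str

class Dispatcher:
    def __init__(self) -> None:
        self.handlers: list[tuple[set[str], object]] = []

    def register(self, event_types, handler) -> None:
        self.handlers.append((set(event_types), handler))

    def emit(self, event: Event) -> None:
        # only handlers whose rule lists this event type fire
        for types, handler in self.handlers:
            if event.type in types:
                handler(event)
```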

partitioning

from dmq import HashPartitionStrategy, KeyBasedPartitionStrategy, RoundRobinPartitionStrategy

strategy = HashPartitionStrategy(num_partitions=8)
partition = strategy.get_partition("task-id-123")
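a stable hash partitioner fits in a few lines; sha256 is used here for illustration, and dmq's HashPartitionStrategy may use a different hash:

```python
import hashlib

def get_partition(key: str, num_partitions: int = 8) -> int:
    # a cryptographic hash is deterministic across processes, so the same
    # key always routes to the same partition (unlike python's builtin hash)
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions
```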

cli

dmq --broker app:q --tasks tasks --workers 4 --concurrency 20 --guarantee at_least_once

# options
  --broker              python path to QManager instance [module:variable]
  --tasks               task modules to import [space separated]
  --workers, -w         number of workers [default: auto]
  --concurrency, -c     max concurrent tasks per worker [default: 10]
  --guarantee, -g       [at_most_once | at_least_once | exactly_once]
  --execution-mode, -e  [auto | async | threaded]
  --log-level, -l       [debug | info | warning]

development

# install
uv sync --all-extras --dev

# test
just test

# test with testcontainers (redis + postgres)
just test-lima

# lint & format
just lint

# type check
just type-check

license

apache 2.0 (see the LICENSE file)

distributions

dmq-0.1.0.post1 release files:

  • dmq-0.1.0.post1.tar.gz (25.3 kB, source), SHA256 2b7117c5e709865c10d0d4ab56560e29014f82be96a830039b3ab0e9039bc20d
  • dmq-0.1.0.post1-py3-none-any.whl (40.1 kB, py3 wheel), SHA256 7f42d4637fd9a4de0acac717dc38959cc4607d1ffd93747b07afd7c674873ee1
