django-postgres-queue

django-postgres-queue is a task queue system for Django backed by postgres.

Why postgres?

I thought you were never supposed to use an RDBMS as a queue? Well, postgres has some features that make it not as bad as you might think, and it has some compelling advantages.

  • Transactional behavior and reliability.

    Adding tasks is atomic with respect to other database work. There is no need to use transaction.on_commit hooks and there is no risk of a transaction being committed but the tasks it queued being lost.

    Processing tasks is atomic with respect to other database work. Database work done by a task will either be committed, or the task will not be marked as processed, no exceptions. If the task only does database work, you achieve true exactly-once message processing.

  • Operational simplicity

    By reusing the durable, transactional storage that we’re already using anyway, there’s no need to configure, monitor, and back up another stateful service. For small teams and light workloads, this is the right trade-off.

  • Easy introspection

    Since tasks are stored in a database table, it’s easy to query and monitor the state of the queue.

  • Safety

    By using postgres transactions, there is no possibility of jobs being left in a locked or ambiguous state if a worker dies. Tasks immediately become available for another worker to pick up. You can even kill -9 a worker and be sure your database and queue will be left in a consistent state.

  • Priority queues

    Since ordering is specified explicitly when selecting the next task to work on, it’s easy to ensure high-priority tasks are processed first.

Disadvantages

  • Lower throughput than a dedicated queue server.

  • Harder to scale a relational database than a dedicated queue server.

  • Thundering herd. Postgres has no way to notify a single worker to wake up, so we can either wake every single worker up when a task is queued with LISTEN/NOTIFY, or workers have to short-poll.

  • With at-least-once delivery, a postgres transaction has to be held open for the duration of the task. For long running tasks, this can cause table bloat and performance problems.

  • When a task crashes or raises an exception under at-least-once delivery, it immediately becomes eligible to be retried. If you want to implement a retry delay, you must catch exceptions and requeue the task with a delay. If your task crashes without throwing an exception (e.g. SIGKILL), you could end up in an endless retry loop that prevents other tasks from being processed.
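
The retry-delay workaround from the last point could be sketched as a decorator around the task function. This is only an illustration: retry_with_delay is a hypothetical helper, and it assumes that enqueue accepts an execute_at keyword, that the job exposes its task name as job.task, and that job.args is a dict. Check each of these against the version you have installed.

```python
import datetime
import functools


def retry_with_delay(delay, max_attempts=3):
    """Requeue a failing task after `delay` instead of retrying it immediately.

    Hypothetical helper, not part of django-postgres-queue.
    """
    def decorator(task_func):
        @functools.wraps(task_func)
        def wrapper(queue, job):
            try:
                return task_func(queue, job)
            except Exception:
                # Assumption: job.args is a dict, so an attempt counter fits in it.
                args = dict(job.args)
                attempts = args.get("attempts", 0) + 1
                if attempts >= max_attempts:
                    raise  # give up and let the worker see the failure
                args["attempts"] = attempts
                run_at = datetime.datetime.now(datetime.timezone.utc) + delay
                # Assumption: enqueue() takes execute_at, and job.task is the task name.
                queue.enqueue(job.task, args, execute_at=run_at)
        return wrapper
    return decorator
```

Because the wrapper swallows the exception, the worker sees a normal return and commits, so any database work the task did before failing is committed too. If that matters, wrap the task body in a savepoint. A crash that raises no exception (SIGKILL) still bypasses this entirely.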

How it works

django-postgres-queue is able to claim, process, and remove a task in a single query.

DELETE FROM dpq_job
WHERE id = (
    SELECT id
    FROM dpq_job
    WHERE execute_at <= now()
    ORDER BY priority DESC, created_at
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING *;

As soon as this query runs, the task is unable to be claimed by other workers. When the transaction commits, the task will be deleted. If the transaction rolls back or the worker crashes, the task will immediately become available for another worker.

To achieve at-least-once delivery, we begin a transaction, process the task, then commit the transaction. For at-most-once, we claim the task and immediately commit the transaction, then process the task. For tasks that don’t have any external effects and only do database work, the at-least-once behavior is actually exactly-once (because both the claiming of the job and the database work will commit or roll back together).
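
The difference between the two delivery modes can be illustrated with a small stand-alone sketch. It is not the library's implementation: it uses an in-memory SQLite database from the standard library so that it runs anywhere, SQLite has no FOR UPDATE SKIP LOCKED, and the job table and function names are made up for the example. It only shows where the commit sits relative to the task.

```python
import sqlite3


def make_db():
    # isolation_level=None puts sqlite3 in autocommit mode, so the
    # transactions below are controlled explicitly with BEGIN/COMMIT.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE job (id INTEGER PRIMARY KEY, task TEXT, priority INTEGER DEFAULT 0)"
    )
    return conn


def claim(conn):
    """Pick the next job and delete it inside the current transaction."""
    row = conn.execute(
        "SELECT id, task FROM job ORDER BY priority DESC, id LIMIT 1"
    ).fetchone()
    if row is not None:
        conn.execute("DELETE FROM job WHERE id = ?", (row[0],))
    return row


def run_at_least_once(conn, handler):
    """Claim, process, then commit: a failure rolls the claim back."""
    conn.execute("BEGIN")
    try:
        job = claim(conn)
        if job is not None:
            handler(job)
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")  # the job row reappears and can be retried
        raise
    return job


def run_at_most_once(conn, handler):
    """Claim and commit first, then process: a failure loses the job."""
    conn.execute("BEGIN")
    job = claim(conn)
    conn.execute("COMMIT")
    if job is not None:
        handler(job)
    return job
```

With a handler that raises, run_at_least_once leaves the row in the table, while run_at_most_once has already removed it by the time the handler runs.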

Comparison to Celery

django-postgres-queue fills the same role as Celery. In addition to using postgres as its backend, it’s intended to be simpler, without any of the funny business Celery does (metaprogramming, messing with logging, automatically importing modules). There is boilerplate to make up for the lack of metaprogramming, but I find that better than importing things by strings.

Usage

Requirements

django-postgres-queue requires Python 3, at least postgres 9.5 and at least Django 1.11.

Installation

Install with pip:

pip install django-postgres-queue

Then add 'dpq' to your INSTALLED_APPS. Run manage.py migrate to create the jobs table.

Instantiate a queue object. This can go wherever you like and be named whatever you like. For example, someapp/queue.py:

from dpq.queue import AtLeastOnceQueue

queue = AtLeastOnceQueue(
    tasks={
        # ...
    },
    notify_channel='my-queue',
)

You will need to import this queue instance to queue or process tasks. Use AtLeastOnceQueue for at-least-once delivery, or AtMostOnceQueue for at-most-once delivery.

django-postgres-queue comes with a management command base class that you can use to consume your tasks. It can be called whatever you like, for example, in someapp/management/commands/worker.py:

from dpq.commands import Worker

from someapp.queue import queue

class Command(Worker):
    queue = queue

Then you can run manage.py worker to start your worker.

A task function takes two arguments – the queue instance in use, and the Job instance for this task. The function can be defined anywhere and called whatever you like. Here’s an example:

def debug_task(queue, job):
    print(job.args)

To register it as a task, add it to your queue instance:

queue = AtLeastOnceQueue(tasks={
    'debug_task': debug_task,
})

The key is the task name, used to queue the task. It doesn’t have to match the function name.

To queue the task, use the enqueue method on your queue instance:

queue.enqueue('debug_task', {'some_args': 0})

Assuming you have a worker running for this queue, the task will be run immediately. The second argument must be a single JSON-serializable value and will be available to the task as job.args.
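
Since the arguments have to be JSON-serializable, it can be convenient to check a payload before queueing it, so that a bad value fails at the call site. A minimal sketch using only the standard library; ensure_json_serializable is a hypothetical helper, not part of django-postgres-queue:

```python
import json


def ensure_json_serializable(value):
    """Return `value` unchanged, or raise ValueError if json.dumps rejects it."""
    try:
        json.dumps(value)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"task args are not JSON-serializable: {exc}") from exc
    return value
```

It could then wrap the payload at the call site, for example queue.enqueue('debug_task', ensure_json_serializable({'some_args': 0})).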

Monitoring

Tasks are just database rows stored in the dpq_job table, so you can monitor the system with SQL.

To get a count of current tasks:

SELECT count(*) FROM dpq_job WHERE execute_at <= now()

This will include both tasks ready to process and tasks currently being processed. To see tasks currently being processed, we need visibility into postgres row locks. This can be provided by the pgrowlocks extension. Once installed, this query will count currently-running tasks:

SELECT count(*)
FROM pgrowlocks('dpq_job')
WHERE 'For Update' = ANY(modes);

You could join the results of pgrowlocks with dpq_job to get the full list of tasks in progress if you want.
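
One way to do that join from Python is sketched below. It assumes the pgrowlocks extension is installed and relies on its locked_row column, which holds the tuple ID of each locked row and can be matched against the table's ctid. jobs_in_progress is a hypothetical helper that accepts any DB-API cursor.

```python
# Join the row locks reported by pgrowlocks back to the job rows they cover.
IN_PROGRESS_SQL = """
    SELECT job.id, job.priority, job.created_at, job.execute_at
    FROM dpq_job AS job
    JOIN pgrowlocks('dpq_job') AS locks ON locks.locked_row = job.ctid
    WHERE 'For Update' = ANY(locks.modes)
    ORDER BY job.priority DESC, job.created_at
"""


def jobs_in_progress(cursor):
    """Return one row per task currently locked by a worker."""
    cursor.execute(IN_PROGRESS_SQL)
    return cursor.fetchall()
```

In a Django project the cursor would typically come from django.db.connection.cursor().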

Logging

django-postgres-queue logs through Python’s logging framework, so it can be configured with the LOGGING dict in your Django settings. It will not log anything under the default config, so be sure to configure some form of logging. Everything is logged under the dpq namespace. Here is an example configuration that will log INFO level messages to the console (logging.StreamHandler writes to stderr unless it is given another stream):

LOGGING = {
    'version': 1,
    'root': {
        'level': 'DEBUG',
        'handlers': ['console'],
    },
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'dpq': {
            'handlers': ['console'],
            'level': 'INFO',
            'propagate': False,
        },
    }
}

It would also be sensible to log WARNING and higher messages to something like Sentry:

LOGGING = {
    'version': 1,
    'root': {
        'level': 'INFO',
        'handlers': ['sentry', 'console'],
    },
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
        },
        'sentry': {
            'level': 'WARNING',
            'class': 'raven.contrib.django.handlers.SentryHandler',
        },
    },
    'loggers': {
        'dpq': {
            'level': 'INFO',
            'handlers': ['console', 'sentry'],
            'propagate': False,
        },
    },
}

You could also log to a file by using the built-in logging.FileHandler.
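
A file-based variant might look like the sketch below. The filename dpq.log is a placeholder, and the explicit dictConfig call is only there so the snippet runs on its own; in a Django project the LOGGING setting is applied for you at startup.

```python
import logging.config

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s',
        },
    },
    'handlers': {
        'file': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': 'dpq.log',  # placeholder path, adjust for your deployment
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'dpq': {
            'handlers': ['file'],
            'level': 'INFO',
            'propagate': False,
        },
    },
}

# Only needed outside Django, which applies settings.LOGGING itself.
logging.config.dictConfig(LOGGING)
```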

Useful Recipes

These recipes aren’t officially supported features of django-postgres-queue. We provide them so that you can mimic some of the common features in other task queues.

Running tasks in tests

When testing code that queues tasks, it can be useful to explicitly run all the pending tasks from your test. To do this, you can use:

while queue.run_once(): pass

This will run all the tasks that have been queued so far, and you can now assert that they did the right thing.
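
If a task can queue further tasks, the bare loop may never finish. A small guard is sketched below; drain is a hypothetical helper, and it assumes, as the loop above does, that run_once returns a falsy value once nothing is left to run.

```python
def drain(queue, max_jobs=1000):
    """Run pending tasks until the queue is empty and return how many ran.

    Raises RuntimeError if more than `max_jobs` tasks run, which usually
    means a task keeps requeueing itself.
    """
    processed = 0
    while queue.run_once():
        processed += 1
        if processed > max_jobs:
            raise RuntimeError(f"queue still not empty after {max_jobs} jobs")
    return processed
```

A test can then assert on the return value, for example that exactly one task ran.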

CELERY_ALWAYS_EAGER

Celery uses the CELERY_ALWAYS_EAGER setting to run a task immediately, without queueing it for a worker. This is useful during tests, and while debugging in a development environment with the workers turned off. You can mimic it with a queue subclass that checks a setting of your own (QUEUE_ALWAYS_EAGER here):

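from django.conf import settings

from dpq.queue import AtLeastOnceQueue

class EagerAtLeastOnceQueue(AtLeastOnceQueue):
    def enqueue(self, *args, **kwargs):
        job = super().enqueue(*args, **kwargs)
        # QUEUE_ALWAYS_EAGER is your own setting, not one defined by dpq.
        if settings.QUEUE_ALWAYS_EAGER:
            self.run_job(job)
        return job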
