Trivial Postgres Queue for Django


This Django application integrates tpq. It provides basic message queue capabilities as well as a high-level futures implementation.

Message Queue

To implement a message queue, first create a model derived from the abstract BaseQueue model. Then create a migration for that model and edit it to add some additional database objects (tpq creates these for you, but you must call it from the migration). Finally, migrate and use your queue.

from django_tpq.main.models import BaseQueue

class MyQueue(BaseQueue):
    pass

$ python manage.py makemigrations

Now edit the migration and add a RunPython step, as is done in the initial migration of the futures app. You will also need to change the model name in the forward function to match your own model.

$ python manage.py migrate

from myapp.models import MyQueue

MyQueue.objects.enqueue({'field': 'value'})
message = MyQueue.objects.dequeue()

Futures

A simple futures implementation is built on the queue described above. First register any function you wish to call asynchronously as a future, then call that future. You can optionally wait or poll for the result.

import logging
import time

from django_tpq.futures.decorators import future

LOGGER = logging.getLogger(__name__)


@future()
def long_running_function(*args):
    time.sleep(100)

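# You can execute the future without waiting for a result. This returns
# immediately and your future runs within another process (fire and forget).
# Note: async is a reserved word from Python 3.7 on, so this spelling only
# parses on older versions. On newer versions,
# getattr(long_running_function, 'async')('argument_1') reaches the same
# attribute.
long_running_function.async('argument_1')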

# Or you can poll for the results (or check after you do some other work).
f = long_running_function.async('argument_1')

r = None
while True:
    try:
        r = f.result()
    except Exception:
        # Exceptions raised by the future are re-raised here.
        LOGGER.exception('Future failed')
        break
    else:
        break
    time.sleep(10)

print(r)

# Or optionally, you can block waiting for the result.
f = long_running_function.call('argument_1')

try:
    r = f.result(wait=0)
except Exception:
    # Exceptions raised by the future are re-raised here.
    LOGGER.exception('Future failed')
else:
    print(r)

Function calls are dispatched via a message queue. Arguments are pickled, so you can send any picklable Python object. Results are delivered via a Django cache: the default cache is used unless the FUTURES_RESULT_CACHE setting names another configured cache. Results have a TTL of 60 minutes by default, which you can adjust with the FUTURES_RESULT_TTL setting.
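As a rough sketch of the cache settings described above, a settings.py fragment might look like the following. The alias 'futures_results' and the table name 'futures_results_cache' are hypothetical names chosen for illustration; only FUTURES_RESULT_CACHE and FUTURES_RESULT_TTL come from this document. A database-backed cache is used here on the assumption that the web process and the executor run as separate processes and therefore need a shared backend.

```python
# settings.py (fragment): a sketch, not a required layout.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
    },
    # Hypothetical alias for future results. DatabaseCache stores entries in
    # a table, so separate processes can all see them. The table is created
    # with: python manage.py createcachetable
    'futures_results': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'futures_results_cache',
    },
}

# Name of the cache alias that should hold results.
FUTURES_RESULT_CACHE = 'futures_results'

# FUTURES_RESULT_TTL can also be set here. Its unit is not stated in this
# document, so check it before choosing a value.
```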

* Note that if you use a very short TTL and start polling after the result has already expired, you will never see it. Furthermore, if you wait for the result in that case, you will wait forever.
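Because arguments are pickled before they are placed on the queue, it can be useful to check that a value pickles before passing it to a future. The helper below is a hypothetical convenience built only on the standard library; it is not part of this package.

```python
import pickle


def is_picklable(obj):
    """Return True if pickle.dumps accepts obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # These are the errors pickle raises for objects it cannot serialize.
        return False
    return True


# Plain data structures pickle without trouble.
print(is_picklable({'field': 'value'}))

# Lambdas cannot be pickled, so they are not valid future arguments.
print(is_picklable(lambda x: x))
```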

Futures are executed by a daemon started using a Django management command.

$ python manage.py futures_executor --help
usage: manage.py futures_executor [-h] [--version] [-v {0,1,2,3}]
                                  [--settings SETTINGS]
                                  [--pythonpath PYTHONPATH] [--traceback]
                                  [--no-color] [--queue_name QUEUE_NAME]
                                  [--once] [--wait WAIT]

Daemon to execute futures.

optional arguments:
  -h, --help            show this help message and exit
  --version             show program's version number and exit
  -v {0,1,2,3}, --verbosity {0,1,2,3}
                        Verbosity level; 0=minimal output, 1=normal output,
                        2=verbose output, 3=very verbose output
  --settings SETTINGS   The Python path to a settings module, e.g.
                        "myproject.settings.main". If this isn't provided, the
                        DJANGO_SETTINGS_MODULE environment variable will be
                        used.
  --pythonpath PYTHONPATH
                        A directory to add to the Python path, e.g.
                        "/home/djangoprojects/myproject".
  --traceback           Raise on CommandError exceptions
  --no-color            Don't colorize the command output.
  --queue_name QUEUE_NAME
                        The queue to monitor. default: futures.FutureQueue
  --once                Run one, then exit.
  --wait WAIT           Wait time. Useful with --once.

Some future statistics are also stored in your Postgres database for reporting purposes.

from django_tpq.futures.models import FutureStat

FutureStat.objects.all()

The FutureStat model has the following fields.

  • name - The python module.function of the future.

  • running - The number of currently executing futures of this type.

  • total - The total number of executed futures of this type.

  • failed - The number of futures resulting in an exception.

  • last_seen - The timestamp of the most recent execution of the future.

  • first_seen - The timestamp of the least recent execution of the future.

Because FutureStat is a model, you can use the Django ORM to report on these fields any way you see fit.
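As one possible report, the sketch below computes a failure rate per future from the total and failed fields listed above. The function name failure_rates and the sample rows are illustrative assumptions. It accepts any iterable of objects with name, total and failed attributes, so it runs here without a database.

```python
from types import SimpleNamespace


def failure_rates(stats):
    """Map each future name to failed / total, skipping futures never run."""
    return {s.name: s.failed / s.total for s in stats if s.total}


# Stand-in rows so the sketch runs on its own. In a project you would pass
# FutureStat.objects.all() instead.
sample = [
    SimpleNamespace(name='myapp.tasks.resize', total=40, failed=4),
    SimpleNamespace(name='myapp.tasks.email', total=0, failed=0),
]

print(failure_rates(sample))
```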

Version: 1.9
