Skip to main content
This is a pre-production deployment of Warehouse. Changes made here affect the production instance of PyPI (
Help us improve Python packaging - Donate today!

PQ is a transactional queue for PostgreSQL.

Project Description


A transactional queue system for PostgreSQL written in Python.

.. figure::
:alt: PQ does the job!

It allows you to push and pop items in and out of a queue in various
ways and also provides two scheduling options: delayed processing and

The system uses a single table that holds all jobs across queues; the
specifics are easy to customize.

The system currently supports only the `psycopg2
<>`_ database driver - or
`psycopg2cffi <>`_ for PyPy.

The basic queue implementation is similar to Ryan Smith's
`queue_classic <>`_
library written in Ruby, but uses `advisory locks
for concurrency control.

In terms of performance, the implementation clock in at about 1,000
operations per second. Using the `PyPy <>`_
interpreter, this scales linearly with the number of cores available.

Getting started

All functionality is encapsulated in a single class ``PQ``.

``class PQ(conn=None, pool=None, table='queue', debug=False)``

Example usage:

.. code-block:: python

from psycopg2 import connect
from pq import PQ

conn = connect('dbname=example user=postgres')
pq = PQ(conn)

For multi-threaded operation, use a connection pool such as

You probably want to make sure your database is created with the
``utf-8`` encoding.

To create and configure the queue table, call the ``create()`` method.

.. code-block:: python


The table name defaults to ``'queue'``. To use a different name, pass
it as a string value as the ``table`` argument for the ```PQ`` class
(illustrated above).


The ``pq`` object exposes queues through Python's dictionary

.. code-block:: python

queue = pq['apples']

The ``queue`` object provides ``get`` and ``put`` methods as explained
below, and in addition, it also works as a context manager where it
manages a transaction:

.. code-block:: python

with queue as cursor:

The statements inside the context manager are either committed as a
transaction or rejected, atomically. This is useful when a queue is
used to manage jobs because it allows you to retrieve a job from the
queue, perform a job and write a result, with transactional


Use the ``put(data)`` method to insert an item into the queue. It
takes a JSON-compatible object such as a Python dictionary:

.. code-block:: python

queue.put({'kind': 'Cox'})
queue.put({'kind': 'Arthur Turner'})
queue.put({'kind': 'Golden Delicious'})

Items are pulled out of the queue using ``get(block=True)``. The
default behavior is to block until an item is available with a default
timeout of one second after which a value of ``None`` is returned.

.. code-block:: python

def eat(kind):
print 'umm, %s apples taste good.' % kind

job = queue.get()

The ``job`` object provides additional metadata in addition to the
``data`` attribute as illustrated by the string representation:

>>> job
<pq.Job id=77709 size=1 enqueued_at="2014-02-21T16:22:06Z" schedule_at=None>

The ``get`` operation is also available through iteration:

.. code-block:: python

for job in queue:
if job is None:


The iterator blocks if no item is available. Again, there is a default
timeout of one second, after which the iterator yields a value of

An application can then choose to break out of the loop, or wait again
for an item to be ready.

.. code-block:: python

for job in queue:
if job is not None:

# This is an infinite loop!


Items can be scheduled such that they're not pulled until a later

.. code-block:: python

queue.put({'kind': 'Cox'}, '5m')

In this example, the item is ready for work five minutes later. The
method also accepts ``datetime`` and ``timedelta`` objects.


If some items are more important than others, a time expectation can
be expressed:

.. code-block:: python

queue.put({'kind': 'Cox'}, expected_at='5m')

This tells the queue processor to give priority to this item over an
item expected at a later time, and conversely, to prefer an item with
an earlier expected time.

The scheduling and priority options can be combined:

.. code-block:: python

queue.put({'kind': 'Cox'}, '1h', '2h')

This item won't be pulled out until after one hour, and even then,
it's only processed subject to it's priority of two hours.


If a queue name is provided as ``<name>/pickle``
(e.g. ``'jobs/pickle'``), items are automatically pickled and
unpickled using Python's built-in ``cPickle`` module:

.. code-block:: python

queue = pq['apples/pickle']

class Apple(object):
def __init__(self, kind):
self.kind = kind


The old pickle protocol ``0`` is used to ensure the pickled data is
encoded as ``ascii`` which should be compatible with any database


``pq`` comes with a higher level ``API`` that helps to manage ``tasks``.

.. code-block:: python

from pq.tasks import PQ

pq = PQ(...)

queue = pq['default']

def eat(kind):
print 'umm, %s apples taste good.' % kind


``tasks``'s ``jobs`` can optionally be re-scheduled on failure:

.. code-block:: python

@queue.task(schedule_at='1h', max_retries=2, retry_in='10s')
def eat(kind):
# ...

Time expectations can be overriden at ``task`` call:

.. code-block:: python

eat('Cox', _expected_at='2m', _schedule_at='1m')


All objects are thread-safe as long as a connection pool is provided
where each thread receives its own database connection.


1.5 (2017-04-18)

- Fixed Python 2 compatibility.

1.4 (2016-03-25)

- Added worker class and handler helper decorator.

1.3 (2015-05-11)

- Python 3 compatibility.

- Fix time zone issue.

1.2 (2014-10-21)


- Fixed concurrency issue where a large number of locks would be held
as a queue grows in size.

- Fixed a database connection resource issue.

1.1 (2014-02-27)


- A queue is now also a context manager, providing transactional

- A queues now returns task objects which provide metadata and allows
reading and writing task data.


- The same connection pool can now be used with different queues.


- The `Literal` string wrapper did not work correctly with `psycopg2`.

- The transaction manager now correctly returns connections to the

1.0 (2013-11-20)

- Initial public release.

Release History

This version
History Node


History Node


History Node


History Node


History Node


History Node


History Node


History Node


Download Files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Filename, Size & Hash SHA256 Hash Help File Type Python Version Upload Date
(14.5 kB) Copy SHA256 Hash SHA256
Source None Apr 18, 2017

Supported By

Elastic Elastic Search Pingdom Pingdom Monitoring Dyn Dyn DNS Sentry Sentry Error Logging CloudAMQP CloudAMQP RabbitMQ Kabu Creative Kabu Creative UX & Design Google Google Cloud Servers Fastly Fastly CDN StatusPage StatusPage Statuspage DigiCert DigiCert EV Certificate