PQ is a transactional queue for PostgreSQL.

Project description

A transactional queue system for PostgreSQL written in Python.

PQ does the job!

It allows you to push and pop items in and out of a queue in various ways and also provides two scheduling options: delayed processing and prioritization.

The system uses a single table that holds all jobs across queues; the specifics are easy to customize.

The system currently supports only the psycopg2 database driver (or psycopg2cffi for PyPy).

The basic queue implementation is similar to Ryan Smith’s queue_classic library written in Ruby, but uses advisory locks for concurrency control.
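The idea behind lock-based concurrency control can be pictured with a small in-memory simulation. This is a sketch, not pq's code. `TryLockTable` is a hypothetical stand-in for PostgreSQL's non-blocking `pg_try_advisory_lock`, and `claim_next` is a hypothetical helper. Each worker walks the candidate job ids in order and takes the first one whose lock it can acquire without waiting, so concurrent workers skip jobs that are already claimed instead of blocking on them.

```python
import threading


class TryLockTable:
    """Hypothetical in-memory stand-in for PostgreSQL advisory locks."""

    def __init__(self):
        self._held = set()
        self._mutex = threading.Lock()

    def try_lock(self, key):
        # Like pg_try_advisory_lock: never waits, returns True only if acquired.
        with self._mutex:
            if key in self._held:
                return False
            self._held.add(key)
            return True

    def unlock(self, key):
        with self._mutex:
            self._held.discard(key)


def claim_next(job_ids, locks):
    """Return the first job id whose lock we acquire, or None if all are taken."""
    for job_id in sorted(job_ids):
        if locks.try_lock(job_id):
            return job_id
    return None


locks = TryLockTable()
pending = [3, 1, 2]
first = claim_next(pending, locks)   # 1
second = claim_next(pending, locks)  # 2, because job 1 is locked and skipped
```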

In terms of performance, the implementation clocks in at about 1,000 operations per second. Using the PyPy interpreter, this scales linearly with the number of cores available.

Getting started

All functionality is encapsulated in a single class, PQ.

class PQ(conn=None, pool=None, table='queue', debug=False)

Example usage:

from psycopg2 import connect
from pq import PQ

conn = connect('dbname=example user=postgres')
pq = PQ(conn)

For multi-threaded operation, use a connection pool such as psycopg2.pool.ThreadedConnectionPool.

You probably want to make sure your database is created with the UTF-8 encoding.

To create and configure the queue table, call the create() method.

pq.create()

The table name defaults to 'queue'. To use a different name, pass it as a string to the table argument of the PQ class (see the signature above).

Queues

The pq object exposes queues through Python’s dictionary interface:

queue = pq['apples']

The queue object provides get and put methods, explained below. It also works as a context manager that manages a transaction:

with queue as cursor:
    ...

The statements inside the context manager are either committed together as one transaction or rolled back, atomically. This is useful when a queue is used to manage jobs, because it allows you to retrieve a job from the queue, perform it, and write a result with transactional semantics.
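The commit-or-roll-back behavior can be sketched with the standard library's sqlite3 module. `TransactionalQueue` is hypothetical and only mirrors the semantics described above; pq's own context manager runs against PostgreSQL and yields its own cursor.

```python
import sqlite3


class TransactionalQueue:
    """Hypothetical wrapper whose `with` block is a single transaction."""

    def __init__(self, conn):
        self.conn = conn

    def __enter__(self):
        return self.conn.cursor()

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.conn.commit()    # every statement in the block persists
        else:
            self.conn.rollback()  # none of them do
        return False              # let the exception propagate


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE results (job_id INTEGER, result TEXT)")
conn.commit()
queue = TransactionalQueue(conn)

with queue as cursor:
    cursor.execute("INSERT INTO results VALUES (1, 'done')")

try:
    with queue as cursor:
        cursor.execute("INSERT INTO results VALUES (2, 'half-done')")
        raise RuntimeError("job failed midway")
except RuntimeError:
    pass

rows = conn.execute("SELECT job_id FROM results").fetchall()  # [(1,)]
```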

Methods

Use the put(data) method to insert an item into the queue. It takes a JSON-compatible object such as a Python dictionary:

queue.put({'kind': 'Cox'})
queue.put({'kind': 'Arthur Turner'})
queue.put({'kind': 'Golden Delicious'})

Items are pulled out of the queue using get(block=True). By default, the call blocks until an item is available, waiting at most one second; if no item arrives in that time, it returns None.

def eat(kind):
    print('umm, %s apples taste good.' % kind)

job = queue.get()
eat(**job.data)
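The contract of get() (return an item as soon as one exists, otherwise return None once the timeout elapses) can be sketched as a polling loop. This is an illustration under assumptions: `blocking_get` and its `fetch` callable are hypothetical, and pq itself is not necessarily implemented by polling.

```python
import time


def blocking_get(fetch, block=True, timeout=1.0, interval=0.05):
    """Return fetch()'s item, polling until one appears or `timeout` elapses.

    `fetch` is a hypothetical non-blocking callable that returns an item,
    or None when nothing is available.
    """
    item = fetch()
    if item is not None or not block:
        return item
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        time.sleep(interval)
        item = fetch()
        if item is not None:
            return item
    return None  # timed out: the same signal get() gives


apples = [{'kind': 'Cox'}]


def fetch_apple():
    return apples.pop() if apples else None


first = blocking_get(fetch_apple)                # {'kind': 'Cox'}
second = blocking_get(fetch_apple, timeout=0.1)  # None, after about 0.1 s
```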

The job object provides metadata beyond the data attribute, as its string representation shows:

>>> job
<pq.Job id=77709 size=1 enqueued_at="2014-02-21T16:22:06Z" schedule_at=None>

The get operation is also available through iteration:

for job in queue:
    if job is None:
        break

    eat(**job.data)

The iterator blocks if no item is available. Again, there is a default timeout of one second, after which the iterator yields a value of None.

An application can then choose to break out of the loop, or wait again for an item to be ready.

for job in queue:
    if job is not None:
        eat(**job.data)

    # This is an infinite loop!
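A middle ground between the two loops is to keep waiting through a few timeouts and stop only after several in a row. The sketch below assumes nothing about pq beyond the iteration behavior described above; `run_worker` and `max_idle` are hypothetical names. Any iterable that yields None on a timeout works, so a plain list stands in for a queue here.

```python
def run_worker(jobs, handle, max_idle=3):
    """Process items from an iterable that yields None on each timeout.

    Hypothetical helper: stops after `max_idle` consecutive timeouts
    and returns the number of items handled.
    """
    idle = 0
    processed = 0
    for job in jobs:
        if job is None:
            idle += 1
            if idle >= max_idle:
                break
            continue
        idle = 0  # any real item resets the idle counter
        handle(job)
        processed += 1
    return processed


eaten = []
count = run_worker(
    [{'kind': 'Cox'}, None, {'kind': 'Arthur Turner'}, None, None, None,
     {'kind': 'never reached'}],
    eaten.append,
)
# count == 2: the third consecutive None ends the loop
```

With a real queue, the call would look like `run_worker(queue, lambda job: eat(**job.data))`.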

Scheduling

Items can be scheduled such that they’re not pulled until a later time:

queue.put({'kind': 'Cox'}, '5m')

In this example, the item is ready for work five minutes later. The method also accepts datetime and timedelta objects.
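One way to normalize the three accepted forms into an absolute time is sketched below. `resolve_schedule` is hypothetical, and the unit suffixes it understands (s, m, h, d) are an assumption inferred from the '5m', '1h' and '10s' values used in this document; pq's own parser may accept a different grammar.

```python
import re
from datetime import datetime, timedelta, timezone

# Assumed suffixes; not taken from pq's source.
_UNITS = {'s': 'seconds', 'm': 'minutes', 'h': 'hours', 'd': 'days'}


def resolve_schedule(value, now=None):
    """Turn '5m', a timedelta, or a datetime into an absolute datetime."""
    now = now if now is not None else datetime.now(timezone.utc)
    if value is None:
        return None
    if isinstance(value, datetime):
        return value          # already absolute
    if isinstance(value, timedelta):
        return now + value    # relative offset
    match = re.fullmatch(r'(\d+)([smhd])', value)
    if match is None:
        raise ValueError('unsupported delay: %r' % (value,))
    amount, unit = match.groups()
    return now + timedelta(**{_UNITS[unit]: int(amount)})


base = datetime(2014, 2, 21, 16, 22, 6, tzinfo=timezone.utc)
ready_at = resolve_schedule('5m', now=base)  # 2014-02-21 16:27:06+00:00
```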

Priority

If some items are more important than others, a time expectation can be expressed:

queue.put({'kind': 'Cox'}, expected_at='5m')

This tells the queue processor to give this item priority over items expected at a later time, and to prefer items expected earlier over this one.

The scheduling and priority options can be combined:

queue.put({'kind': 'Cox'}, '1h', '2h')

This item won’t be pulled out until after one hour, and even then, it’s only processed subject to its priority of two hours.
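How the two options might combine when choosing the next item can be sketched as a filter followed by a sort key. This is a hypothetical model, not pq's SQL: items are plain dicts, only items whose schedule_at has passed are eligible, and among those the earliest expected_at wins. Placing items with no expected_at last and breaking ties by id are assumptions.

```python
from datetime import datetime, timedelta, timezone


def pick_next(items, now):
    """Pick the ready item with the earliest expected_at (hypothetical model)."""
    ready = [
        item for item in items
        if item['schedule_at'] is None or item['schedule_at'] <= now
    ]
    if not ready:
        return None

    def key(item):
        expected = item['expected_at']
        # Items without an expectation sort last; ties fall back to id.
        return (expected is None,
                expected if expected is not None else now,
                item['id'])

    return min(ready, key=key)


now = datetime(2014, 2, 21, 12, 0, tzinfo=timezone.utc)
hour = timedelta(hours=1)
items = [
    {'id': 1, 'schedule_at': None, 'expected_at': None},
    {'id': 2, 'schedule_at': None, 'expected_at': now + 2 * hour},
    {'id': 3, 'schedule_at': now + hour, 'expected_at': now + 2 * hour},
    {'id': 4, 'schedule_at': None, 'expected_at': now + 5 * hour},
]
chosen = pick_next(items, now)['id']  # 2: earliest expectation among ready items
```

Item 3 mirrors the put({...}, '1h', '2h') call: it is ignored until an hour has passed, after which its two-hour expectation lets it overtake item 4.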

Pickles

If a queue name is provided as <name>/pickle (e.g. 'jobs/pickle'), items are automatically pickled and unpickled using Python’s built-in cPickle module:

queue = pq['apples/pickle']

class Apple(object):
    def __init__(self, kind):
        self.kind = kind

queue.put(Apple('Cox'))

The old pickle protocol 0 is used to ensure the pickled data is encoded as ASCII, which should be compatible with any database encoding.
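The round trip can be checked with the standard library alone. This sketch calls pickle directly rather than going through pq, and shows that a protocol-0 pickle of the Apple object above decodes as ASCII text; in this example that holds because the object's own strings are ASCII.

```python
import pickle


class Apple(object):
    def __init__(self, kind):
        self.kind = kind


payload = pickle.dumps(Apple('Cox'), protocol=0)
text = payload.decode('ascii')                 # succeeds: the payload is ASCII
restored = pickle.loads(text.encode('ascii'))  # back to an Apple instance
print(restored.kind)                           # Cox
```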

Tasks

pq comes with a higher-level API that helps manage tasks.

from pq.tasks import PQ

pq = PQ(...)

queue = pq['default']

@queue.task(schedule_at='1h')
def eat(kind):
    print('umm, %s apples taste good.' % kind)

eat('Cox')

queue.work()
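The deferred-call idea behind the decorator can be sketched in memory. `TaskQueue` is hypothetical and is not how pq.tasks is implemented: it stores calls in a deque instead of a PostgreSQL table, ignores options such as schedule_at, and its work() returns once the queue is empty rather than waiting for new jobs.

```python
import functools
from collections import deque


class TaskQueue:
    """Hypothetical in-memory stand-in for a task queue; not pq.tasks."""

    def __init__(self):
        self.jobs = deque()

    def task(self, func):
        @functools.wraps(func)
        def enqueue(*args, **kwargs):
            # Record the call instead of running it right away.
            self.jobs.append((func, args, kwargs))
        return enqueue

    def work(self):
        """Run queued calls in FIFO order until the queue is empty."""
        count = 0
        while self.jobs:
            func, args, kwargs = self.jobs.popleft()
            func(*args, **kwargs)
            count += 1
        return count


queue = TaskQueue()
eaten = []


@queue.task
def eat(kind):
    eaten.append(kind)


eat('Cox')          # only enqueued: eaten is still []
ran = queue.work()  # runs it: eaten == ['Cox'], ran == 1
```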

A task’s jobs can optionally be rescheduled on failure:

@queue.task(schedule_at='1h', max_retries=2, retry_in='10s')
def eat(kind):
    ...  # task body elided
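One plausible reading of max_retries=2 with retry_in='10s' is that a job which keeps failing runs three times in total, ten seconds apart. The sketch below encodes that reading; `next_retry` is hypothetical, and whether pq counts the first attempt toward max_retries is an assumption.

```python
from datetime import datetime, timedelta, timezone


def next_retry(failures, max_retries, retry_in, failed_at):
    """Return when a failed job should run again, or None to give up.

    `failures` counts failed attempts so far, including this one.
    Hypothetical helper; the counting rule is an assumption.
    """
    if failures > max_retries:
        return None
    return failed_at + retry_in


t0 = datetime(2014, 2, 21, 12, 0, tzinfo=timezone.utc)
ten_seconds = timedelta(seconds=10)
plan = [next_retry(n, 2, ten_seconds, t0) for n in (1, 2, 3)]
# [t0 + 10 s, t0 + 10 s, None]: two retries, then give up
```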

Time expectations can be overridden when calling the task:

eat('Cox', _expected_at='2m', _schedule_at='1m')
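The leading underscore keeps these overrides apart from the task's own arguments. A minimal sketch of that separation follows; `split_call_options` is hypothetical, and how pq parses the keywords internally is an assumption.

```python
def split_call_options(kwargs, prefix='_'):
    """Separate scheduling overrides (e.g. _schedule_at) from task arguments.

    Hypothetical helper: keywords starting with `prefix` become options
    with the prefix stripped; everything else goes to the task.
    """
    options = {}
    task_kwargs = {}
    for name, value in kwargs.items():
        if name.startswith(prefix):
            options[name[len(prefix):]] = value
        else:
            task_kwargs[name] = value
    return options, task_kwargs


options, task_kwargs = split_call_options(
    {'kind': 'Cox', '_expected_at': '2m', '_schedule_at': '1m'})
# options == {'expected_at': '2m', 'schedule_at': '1m'}
# task_kwargs == {'kind': 'Cox'}
```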

Thread-safety

All objects are thread-safe as long as a connection pool is provided where each thread receives its own database connection.
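The "one connection per thread" pattern can be sketched generically. `borrowed_connection` is a hypothetical helper, not part of pq (which takes the pool through its pool argument); it works with any object exposing getconn() and putconn(), which psycopg2.pool.ThreadedConnectionPool provides. `FakePool` is also hypothetical and exists only so the sketch runs without a database.

```python
import threading
from contextlib import contextmanager


@contextmanager
def borrowed_connection(pool):
    """Check a connection out of `pool` for this thread, then return it."""
    conn = pool.getconn()
    try:
        yield conn
    finally:
        pool.putconn(conn)  # always hand the connection back


class FakePool:
    """Hypothetical stand-in for a thread-safe connection pool."""

    def __init__(self, size):
        self._free = list(range(size))
        self._lock = threading.Lock()

    def getconn(self):
        with self._lock:
            return self._free.pop()

    def putconn(self, conn):
        with self._lock:
            self._free.append(conn)


pool = FakePool(size=4)
seen = []


def worker():
    with borrowed_connection(pool) as conn:
        seen.append(conn)  # no other thread holds this connection right now


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```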

Changes

1.6.1 (2018-11-14)

  • Fix queue class factory pattern.

1.6 (2018-11-12)

  • Fix compatibility with NamedTupleCursor.

  • Fix duplicate column name issue.

  • Add option to specify own queue class.

1.5 (2017-04-18)

  • Fixed Python 2 compatibility.

1.4 (2016-03-25)

  • Added worker class and handler helper decorator. [jeanphix]

1.3 (2015-05-11)

  • Python 3 compatibility. [migurski]

  • Fix time zone issue.

1.2 (2014-10-21)

Improvements:

  • Fixed concurrency issue where a large number of locks would be held as a queue grows in size.

  • Fixed a database connection resource issue.

1.1 (2014-02-27)

Features:

  • A queue is now also a context manager, providing transactional semantics.

  • A queue now returns task objects, which provide metadata and allow reading and writing task data.

Improvements:

  • The same connection pool can now be used with different queues.

Bugs:

  • The Literal string wrapper did not work correctly with psycopg2.

  • The transaction manager now correctly returns connections to the pool.

1.0 (2013-11-20)

  • Initial public release.

Download files

Source Distribution

pq-1.6.1.tar.gz (17.8 kB)

File details

Details for the file pq-1.6.1.tar.gz.

File metadata

  • Download URL: pq-1.6.1.tar.gz
  • Upload date:
  • Size: 17.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Python-urllib/2.7

File hashes

Hashes for pq-1.6.1.tar.gz
Algorithm    Hash digest
SHA256       16f233d9cf0b01f183852bd95168c7e847b9030bba885c031c013095c1dcf3d1
MD5          da3f1348d9d007111410fff1553e943a
BLAKE2b-256  1022498c70afbd3555f0ae9c1171dfaae432eebe78cd456a772efcd6345a7116
