A queue using mongo as backend storage.
mongoqueue
----------
Properties
==========
- Isolation
Do not let different consumers process the same message.
- Reliability
Do not let a failed consumer cause an item to be lost.
- Atomic
Operations on the queue are atomic.
Usage
=====
A queue can be instantiated with a mongo collection and a consumer
identifier. The consumer identifier helps distinguish multiple queue
consumers that are taking jobs from the queue.
>> from pymongo import Connection
>> from mongoqueue import MongoQueue
>> queue = MongoQueue(
... Connection(TEST_DB).doctest_queue,
... consumer_id="consumer-1",
... timeout=300,
... max_attempts=3)
The ``MongoQueue`` class's ``timeout`` parameter specifies how long, in
seconds, a job may be held by a consumer before it is considered
failed.
A job that times out or errors more than ``max_attempts`` times is
considered permanently failed and will no longer be processed.
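The interaction of ``timeout`` and ``max_attempts`` can be illustrated with a small in-memory sketch. This is not mongoqueue's implementation: the function name ``is_available`` and the record fields ``attempts`` and ``locked_at`` are assumptions made for illustration, and the exact boundary at which a job counts as permanently failed may differ in the library.

```python
import time


def is_available(job, timeout, max_attempts, now=None):
    """Return True if a consumer may pick up this (hypothetical) job record."""
    now = time.time() if now is None else now
    # Failed more often than max_attempts: treated as permanently failed.
    if job["attempts"] > max_attempts:
        return False
    # Never locked by a consumer: free to take.
    if job["locked_at"] is None:
        return True
    # Locked: available again only once the holder has exceeded the timeout.
    return now - job["locked_at"] > timeout
```

With ``timeout=300``, a job locked 100 seconds ago is still held by its consumer, while one locked 400 seconds ago can be picked up again.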
New jobs/items can be placed in the queue by passing a dictionary.
>> queue.put({"foobar": 1})
A ``priority`` key with an integer value can be included in the
dictionary; the job is then processed before lower-priority items.
>> queue.put({"foobar": 0, "priority": 1})
An item can be fetched by calling the ``next`` method on a queue.
This returns a Job object.
>> job = queue.next()
>> job.data
{"foobar": 1}
The job class exposes control methods for marking progress,
completion, errors, or releasing the job back into the queue.
- ``complete`` Marks a job as complete and removes it from the queue.
- ``error`` Optionally takes a message; releases the job back to the
queue, increments its attempts counter, and stores the error message on the job.
- ``progress`` Optionally takes an integer progress count; notes progress on the job
and resets the lock timeout.
- ``release`` Releases a job back to the pool. The attempts counter is not modified.
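The effect of each control method can be modelled with a small in-memory stand-in. This is a sketch only, not the mongoqueue ``Job`` class: ``SketchJob``, its attributes, and the list used as a queue are hypothetical, and the lock-timeout reset done by ``progress`` is not modelled.

```python
class SketchJob:
    """In-memory stand-in for a queue job (not the mongoqueue Job class)."""

    def __init__(self, data, store):
        self.data = data
        self.store = store        # plain list acting as the queue
        self.attempts = 0
        self.locked = True        # a fetched job starts out locked
        self.last_error = None
        self.progress_count = 0

    def complete(self):
        # Finished: remove the job from the queue entirely.
        self.store.remove(self)

    def error(self, message=None):
        # Failed: unlock, count the attempt, keep the message.
        self.locked = False
        self.attempts += 1
        self.last_error = message

    def progress(self, count=0):
        # Still working: record the progress count.
        self.progress_count = count

    def release(self):
        # Hand the job back without counting an attempt.
        self.locked = False
```

The difference between ``error`` and ``release`` is the attempts counter: only ``error`` moves a job closer to the ``max_attempts`` limit.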
As a convenience, the job supports the context manager protocol.
>> with job as data:
... print data
{"foobar": 0}
If the context block exits without an exception, the job is marked
complete; if an exception is raised, the error is stored on the job.
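The context manager behaviour described above can be sketched with ``__enter__`` and ``__exit__``. The class ``ContextJob`` and its ``state`` and ``last_error`` attributes are hypothetical, and re-raising the exception after recording it is an assumption; the library may handle that differently.

```python
class ContextJob:
    """Hypothetical job showing complete-on-success, error-on-exception."""

    def __init__(self, data):
        self.data = data
        self.state = "locked"
        self.last_error = None

    def __enter__(self):
        # The with-statement binds the job's payload, not the job itself.
        return self.data

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            # Clean exit: mark the job complete.
            self.state = "complete"
        else:
            # Exception inside the block: store the error on the job.
            self.state = "errored"
            self.last_error = str(exc)
        # Assumption: do not swallow the exception.
        return False
```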
Inspired By
===========
[0] https://github.com/skiz/mongo_queue/blob/master/lib/mongo_queue.rb
[1] http://blog.boxedice.com/2011/09/28/replacing-rabbitmq-with-mongodb/
[2] http://blog.boxedice.com/2011/04/13/queueing-mongodb-using-mongodb/
[3] https://github.com/lunaru/mongoqueue
[4] http://www.captaincodeman.com/2011/05/28/simple-service-bus-message-queue-mongodb/
Running
Fully Configurable
Distributed
Atomic Locking
Thread Safe
Priority Support
Worker Timeout Support
Fully Tested
Production Ready
Simple API
--------------------------
Per Queue error queues
Max attempts per item
Identity in queue.
---------------------------