A thread-safe disk based persistent queue in Python.

Project description

This project builds on the achievements of python-pqueue and queuelib.

The goal is to meet the following requirements:

  • Disk-based: each queued item is stored on disk so that it survives a crash.

  • Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.

  • Recoverable: Items can be read after process restart.

  • Green-compatible: can be used in greenlet or eventlet environments.

Neither queuelib nor python-pqueue fulfills all of the above. After some attempts, I found it hard to meet these goals on top of their current implementations without major code changes. This is the motivation for starting this project.

persist-queue uses the pickle object serialization module to support object instances. To support custom objects, please refer to Pickling and unpickling extension types (Python 2) and Pickling Class Instances (Python 3).
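Because items are pickled, anything put on the queue must be picklable. The sketch below shows the Python 3 "Pickling Class Instances" technique. This is standard pickle behaviour and nothing specific to persist-queue. The hypothetical Counter class holds a threading.Lock, which pickle cannot serialize, so it defines __getstate__ and __setstate__ to drop the lock when pickling and recreate it when unpickling.

```python
import pickle
import threading


class Counter(object):
    """Hypothetical example: a counter guarded by a (non-picklable) lock."""

    def __init__(self, value=0):
        self.value = value
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self.value += 1

    def __getstate__(self):
        # Copy the instance attributes and leave out the lock, which pickle
        # cannot serialize.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Restore the saved attributes and give the copy a fresh lock.
        self.__dict__.update(state)
        self._lock = threading.Lock()


c = Counter()
c.increment()
restored = pickle.loads(pickle.dumps(c))
print(restored.value)  # 1
```

Without the two hooks, pickle.dumps(c) raises a TypeError because of the lock attribute.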

Requirements

  • Python 2.7 or Python 3.x.

  • Full support for Linux.

  • Windows support (with caution if persistqueue.Queue is used; see Caution below).

Installation

From PyPI:

pip install persist-queue

From source code:

git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
python setup.py install

Examples

Example usage with a file-based queue

>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()

Close the Python console, then restart the queue from the same path:

>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
'b'
>>> q.task_done()
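One way to get the restart behaviour shown above is to keep, on disk, a record of which items have been acknowledged. The toy class below is a hypothetical sketch of that idea using only the standard library. It is not persist-queue's implementation or file format, and the names ToyDiskQueue, head and cursor are illustrative. The read position is persisted only in task_done, so in this toy an item returned by get but never acknowledged is handed out again after a restart.

```python
import os
import pickle


class ToyDiskQueue(object):
    """Hypothetical illustration only; NOT persist-queue's code or format."""

    def __init__(self, path):
        self.path = path
        if os.path.exists(path):
            with open(path, "rb") as f:
                state = pickle.load(f)
        else:
            state = {"items": [], "head": 0}
        self.items = state["items"]
        self.head = state["head"]   # first unacknowledged item (saved on disk)
        self.cursor = self.head     # next item to hand out (memory only)

    def _save(self):
        # Plain overwrite for brevity: a crash mid-write could corrupt the file.
        with open(self.path, "wb") as f:
            pickle.dump({"items": self.items, "head": self.head}, f)

    def put(self, item):
        self.items.append(item)
        self._save()

    def get(self):
        if self.cursor >= len(self.items):
            raise IndexError("queue is empty")
        item = self.items[self.cursor]
        self.cursor += 1
        return item

    def task_done(self):
        # Acknowledge every item handed out so far and make that durable.
        self.head = self.cursor
        self._save()
```

Creating a new ToyDiskQueue on the same path plays the role of restarting the console. The items list never shrinks, which a real queue would have to handle.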

Example usage with a SQLite3-based queue

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath')
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q

Close the console again, then recreate the queue:

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath')
>>> q.get()
'str2'
>>>
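The same FIFO-with-persistence behaviour can be sketched with the standard sqlite3 module. The class below is hypothetical and does not reproduce persist-queue's table schema or code. Each item is pickled into a row, get reads and deletes the row with the lowest id in one committed transaction, and the rows survive closing and reopening the database file.

```python
import pickle
import sqlite3


class ToySQLiteFIFO(object):
    """Hypothetical sketch; not persist-queue's schema or implementation."""

    def __init__(self, db_file):
        self.conn = sqlite3.connect(db_file)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS queue ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, data BLOB)")
        self.conn.commit()

    def put(self, item):
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute("INSERT INTO queue (data) VALUES (?)",
                              (pickle.dumps(item),))

    def get(self):
        with self.conn:
            row = self.conn.execute(
                "SELECT id, data FROM queue ORDER BY id LIMIT 1").fetchone()
            if row is None:
                raise IndexError("queue is empty")
            self.conn.execute("DELETE FROM queue WHERE id = ?", (row[0],))
        return pickle.loads(row[1])

    def close(self):
        self.conn.close()
```

This toy is meant for a single connection. Two connections running get at the same time could both read the same row before either deletes it.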

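Example usage with multiple threads

from threading import Thread

from persistqueue import Queue

num_worker_threads = 4    # placeholder: number of consumer threads

def do_work(item):        # placeholder: replace with your own processing
    print(item)

def source():             # placeholder: replace with your own item producer
    return range(10)

q = Queue("mypath")       # persistqueue.Queue needs a directory path

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()     # acknowledge the item so q.join() can return

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done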
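The example above uses daemon threads, which are stopped abruptly when the main thread exits. A common alternative is to send one sentinel per worker and then join the threads. The sketch below shows that pattern with Python 3's in-memory queue.Queue (not persistent), so it runs without persist-queue. It uses the same put/get/task_done/join calls as the example above, so the pattern should carry over to persistqueue.Queue, but that is an assumption. The function name run_workers is hypothetical. None is used as the sentinel because it survives pickling unchanged, whereas an object() sentinel would no longer be identical after a round trip through a disk-based queue.

```python
import queue
import threading


def run_workers(items, num_worker_threads=4):
    """Process items with worker threads, then shut the workers down cleanly."""
    q = queue.Queue()
    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            item = q.get()
            try:
                if item is None:      # sentinel: no more work for this thread
                    return
                with results_lock:
                    results.append(item * 2)  # stand-in for do_work(item)
            finally:
                q.task_done()         # runs for real items and the sentinel

    threads = [threading.Thread(target=worker)
               for _ in range(num_worker_threads)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    q.join()                          # every real item has been processed
    for _ in threads:
        q.put(None)                   # one sentinel per worker
    for t in threads:
        t.join()
    return sorted(results)
```

For example, run_workers(range(5)) returns [0, 2, 4, 6, 8].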

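Example usage with a SQLite3-based queue and multiple threads

from threading import Thread

from persistqueue import FIFOSQLiteQueue

num_worker_threads = 4    # placeholder: number of consumer threads

def do_work(item):        # placeholder: replace with your own processing
    print(item)

def source():             # placeholder: replace with your own item producer
    return range(10)

# multithreading=True: the queue is shared by several threads (see FAQ)
q = FIFOSQLiteQueue(path="./test", multithreading=True)

def worker():
    while True:
        item = q.get()
        do_work(item)

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)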

Tests

persist-queue uses tox to run its tests.

To run the tests under Python 2.7 or Python 3.x, use:

tox -e py27
tox -e py34
tox -e py35
tox -e py36

To run the pep8 check, use:

tox -e pep8

pyenv is usually a helpful tool to manage multiple versions of Python.

Caution

Currently, atomic operations are not supported on Windows due to a limitation of Python’s os.rename. That is, the data in persistqueue.Queue could be left in an unreadable state if an incidental failure occurs during Queue.task_done.

DO NOT PUT ANY CRITICAL DATA IN persistqueue.Queue WHEN RUNNING ON WINDOWS.
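The usual technique for crash-safe file updates is to write a temporary file and then rename it over the target. On Windows, Python 2.7's os.rename raises OSError when the destination already exists, so a plain rename cannot overwrite a file there. Python 3.3+ adds os.replace, which overwrites an existing destination on all platforms. The Python documentation promises atomicity only as a POSIX requirement, so treat the Windows behaviour as best-effort. The helper below is a hypothetical sketch of the pattern and is not code from persist-queue.

```python
import os
import tempfile


def atomic_write(path, data):
    """Hypothetical helper: replace the file at path with the bytes in data."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temporary file next to the target so the rename stays on
    # one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # overwrites an existing target
    except BaseException:
        os.unlink(tmp_path)  # do not leave the temporary file behind
        raise
```

Readers of path then see either the old content or the new content. On POSIX systems they never see a half-written file.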

Contribution

Simply fork this repo and send a PR with your code change (plus tests that cover it), and remember to give your PR a title and description. I am happy to improve this project together with you :).

License

BSD

FAQ

  • sqlite3.OperationalError: database is locked is raised.

persistqueue opens 2 connections to the database if multithreading=True, and the SQLite database stays locked until a transaction is committed. The timeout parameter specifies how long a connection waits for the lock to be released before raising an exception. The default is 10 seconds. If the above error occurs, increase timeout when creating the queue.
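The sketch below reproduces the error with the standard sqlite3 module alone and does not use persist-queue. One connection holds an uncommitted write transaction while a second connection tries to write. The timeout argument of sqlite3.connect is the number of seconds a connection waits for a lock before raising. The helper name second_writer_result is hypothetical.

```python
import os
import sqlite3
import tempfile
import threading


def second_writer_result(timeout, hold_seconds):
    """Return "ok" or the error a second writer gets while a first writer
    holds its write lock for hold_seconds."""
    path = os.path.join(tempfile.mkdtemp(), "demo.db")
    # isolation_level=None: we issue BEGIN/COMMIT ourselves.
    # check_same_thread=False: the timer thread below commits on this connection.
    writer = sqlite3.connect(path, isolation_level=None, check_same_thread=False)
    writer.execute("CREATE TABLE q (item TEXT)")
    writer.execute("BEGIN IMMEDIATE")             # take the write lock now
    writer.execute("INSERT INTO q VALUES ('a')")
    # Release the lock from another thread after hold_seconds.
    releaser = threading.Timer(hold_seconds, writer.execute, args=("COMMIT",))
    releaser.start()

    other = sqlite3.connect(path, timeout=timeout, isolation_level=None)
    try:
        other.execute("INSERT INTO q VALUES ('b')")  # waits up to `timeout` s
        result = "ok"
    except sqlite3.OperationalError as exc:
        result = str(exc)
    releaser.join()
    writer.close()
    other.close()
    return result
```

With a short timeout, second_writer_result(0.1, 0.5) gives "database is locked". With a timeout longer than the lock is held, second_writer_result(5, 0.2) gives "ok". This is why raising timeout can make the error go away.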

Download files

Source Distribution

persist-queue-0.2.1.tar.gz (9.8 kB)

Built Distribution

persist_queue-0.2.1-py2.py3-none-any.whl (13.4 kB), for Python 2 and Python 3

File details

Details for the file persist-queue-0.2.1.tar.gz.

File metadata

  • Download URL: persist-queue-0.2.1.tar.gz
  • Size: 9.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No

File hashes

Hashes for persist-queue-0.2.1.tar.gz
Algorithm Hash digest
SHA256 f8f4a61303e482ec0b8a7b5d2b081e4dce2e8dd14b8a46b536dafb69fbede5fe
MD5 57d1f34a7338af4e724adc7196e20b4b
BLAKE2b-256 fd3b2b96ab0d5e0a47885b7097e36441b94a9f7f93f68a942bcd5ddcbb211a8d

File details

Details for the file persist_queue-0.2.1-py2.py3-none-any.whl.

File hashes

Hashes for persist_queue-0.2.1-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 c560cbb3b455dbf4607154c05492733fc98b1d736652dd35b29e9a0392b1b8e5
MD5 0fe7200553a4c21785e662baf4cfa359
BLAKE2b-256 7c2e3c836980bf2c3a71f22fa77feae66cb95a08684c67e950dc456e0d129492
