A thread-safe disk based persistent queue in Python.
The goal is to meet the following requirements:
- Disk-based: each queued item is stored on disk so that it survives a crash.
- Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.
- Recoverable: Items can be read after process restart.
- Green-compatible: can be used in greenlet or eventlet environment.
queuelib and python-pqueue cannot fulfil all of the above. After some experimentation, I found these requirements hard to achieve on top of their current implementations without large code changes. This is the motivation for starting this project.
persist-queue uses the pickle object serialization module to support object instances. To support customized objects, please refer to "Pickling and unpickling extension types" (Python 2) and "Pickling Class Instances" (Python 3) in the Python documentation.
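As a rough illustration of what supporting a customized object can involve, the sketch below round-trips a class through pickle using only the standard library. The `Job` class and its attributes are hypothetical and not part of persist-queue; the point is only that an instance holding something pickle cannot serialize (here a lock) can drop it in `__getstate__` and rebuild it in `__setstate__`.

```python
import pickle
import threading


class Job(object):
    """Hypothetical work item that carries an unpicklable lock."""

    def __init__(self, url, retries=3):
        self.url = url
        self.retries = retries
        self._lock = threading.Lock()  # locks cannot be pickled

    def __getstate__(self):
        # Copy the instance dict and leave out the unpicklable member.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Restore the plain attributes, then recreate the lock.
        self.__dict__.update(state)
        self._lock = threading.Lock()


restored = pickle.loads(pickle.dumps(Job("http://example.com/a")))
print(restored.url, restored.retries)
```

An object that survives this round trip should, in principle, be storable in a pickle-backed queue as long as its class is importable when the item is read back.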
- Python 2.7 or Python 3.x.
- Full support for Linux.
- Windows support (with Caution if persistqueue.Queue is used).
pip install persist-queue
From source code:
    git clone https://github.com/peter-wangxu/persist-queue
    cd persist-queue
    python setup.py install
Example usage with a file based queue
>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()
Close the Python console, then restart the queue from the same path:
>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
'b'
>>> q.task_done()
Example usage with a SQLite3 based queue
>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath')
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q
Close the console again, then recreate the queue:
>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath')
>>> q.get()
'str2'
>>>
Example usage with multiple threads
from threading import Thread
from persistqueue import Queue

# do_work, source and num_worker_threads are placeholders supplied by the caller.
q = Queue("mypath")

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # block until all tasks are done
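To try the producer/consumer pattern above without any placeholders, the following sketch fills them in with toy values. It uses the Python 3 standard library `queue.Queue` as a stand-in, on the assumption that the `put`/`get`/`task_done`/`join` calls behave the same way; `do_work`, `source`, `results` and the worker count are made up for illustration, and nothing here is persisted to disk.

```python
import queue
from threading import Thread

q = queue.Queue()  # stand-in for persistqueue.Queue("mypath")
results = []
num_worker_threads = 4


def do_work(item):
    # Toy workload: double the item (list.append is thread-safe in CPython).
    results.append(item * 2)


def source():
    # Toy producer: ten integers.
    return range(10)


def worker():
    while True:
        item = q.get()
        try:
            do_work(item)
        finally:
            q.task_done()  # always acknowledge, even if do_work raises


for _ in range(num_worker_threads):
    Thread(target=worker, daemon=True).start()

for item in source():
    q.put(item)

q.join()  # returns once every item has been acknowledged
print(sorted(results))
```

Calling `task_done()` in a `finally` clause is a design choice of this sketch: it keeps `join()` from blocking forever if a work item fails.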
Example usage for SQLite3 based queue
from threading import Thread
from persistqueue import FIFOSQLiteQueue

# do_work, source and num_worker_threads are placeholders supplied by the caller.
q = FIFOSQLiteQueue(path="./test", multithreading=True)

def worker():
    while True:
        item = q.get()
        do_work(item)

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)
persist-queue uses tox to trigger tests.
To trigger tests on Python 2.7 or Python 3.x, use:
tox -e py27
tox -e py34
tox -e py35
tox -e py36
To trigger the pep8 check, use:
tox -e pep8
pyenv is usually a helpful tool to manage multiple versions of Python.
Currently, atomic operation is not supported on Windows due to a limitation of Python's os.rename. That is, the data in persistqueue.Queue could be left in an unreadable state if a failure occurs during Queue.task_done.
DO NOT PUT ANY CRITICAL DATA ON persistqueue.Queue WHEN RUNNING ON WINDOWS.
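For background, the sketch below shows the general write-then-rename pattern that this kind of atomicity relies on. It is not persist-queue's implementation: `atomic_write` and the `info` file name are hypothetical. On Windows, `os.rename` raises an error when the destination already exists, which is why the pattern is hard to apply there with `os.rename` alone. The sketch uses `os.replace` (Python 3.3+), which overwrites an existing destination on both POSIX and Windows; on POSIX the replacement is atomic.

```python
import os
import tempfile


def atomic_write(path, data):
    """Write bytes to a temp file, then swap it into place in one step."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temp file in the same directory so the rename stays
    # on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before the swap
        os.replace(tmp_path, path)  # overwrites an existing destination
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


workdir = tempfile.mkdtemp()
info_path = os.path.join(workdir, "info")
atomic_write(info_path, b"head=0")
atomic_write(info_path, b"head=1")  # replaces the previous file
with open(info_path, "rb") as f:
    content = f.read()
print(content)
```

If the process dies before the `os.replace` call, the old file is still intact and only a stray temp file is left behind.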
Simply fork this repo and send a PR with your code change (along with tests that cover it). Remember to give your PR a title and a description. I am willing to enhance this project with you :).
- sqlite3.OperationalError: database is locked is raised.
persistqueue opens 2 connections to the db if multithreading=True. While one connection holds an open transaction, the SQLite database stays locked until that transaction is committed. The timeout parameter specifies how long a connection should wait for the lock to go away before raising an exception. The default timeout is 10 seconds; increase timeout when creating the queue if the above error occurs.
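The locking behaviour can be reproduced with the standard library `sqlite3` module alone. The sketch below is only an illustration of the mechanism, not of persist-queue internals: the table name, file name and the deliberately short 0.2 second timeout are made up. One connection holds a write transaction open while a second one gives up waiting and raises the error.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "demo.db")

# isolation_level=None lets us issue BEGIN/COMMIT explicitly.
writer = sqlite3.connect(db_path, isolation_level=None)
writer.execute("CREATE TABLE items (data TEXT)")
writer.execute("BEGIN IMMEDIATE")  # take the write lock and keep it
writer.execute("INSERT INTO items VALUES ('a')")

# The second connection waits at most 0.2 seconds for the lock.
other = sqlite3.connect(db_path, timeout=0.2, isolation_level=None)
try:
    other.execute("BEGIN IMMEDIATE")
    error = None
except sqlite3.OperationalError as exc:
    error = str(exc)

writer.execute("COMMIT")  # release the lock

# With the lock released, the same statement now succeeds.
other.execute("BEGIN IMMEDIATE")
other.execute("INSERT INTO items VALUES ('b')")
other.execute("COMMIT")
count = other.execute("SELECT COUNT(*) FROM items").fetchone()[0]
print(error, count)

writer.close()
other.close()
```

A longer timeout gives the waiting connection more time for the other transaction to commit, which is why raising it tends to make the error go away under contention.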
Download the file for your platform.
|File Name|Python Version|File Type|Upload Date|
|persist_queue-0.2.3-py2.py3-none-any.whl (15.6 kB)|py2.py3|Wheel|Jun 5, 2017|
|persist-queue-0.2.3.tar.gz (12.8 kB)|–|Source|Jun 5, 2017|