A thread-safe disk based persistent queue in Python.

persist-queue implements a file-based queue and a series of sqlite3-based queues. The goal is to meet the following requirements:

  • Disk-based: each queued item is stored on disk so that it survives a crash.

  • Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.

  • Recoverable: items can be read after the process restarts.

  • Green-compatible: can be used in a greenlet or eventlet environment.

Neither queuelib nor python-pqueue fulfils all of the above. After some experimentation, I found these requirements hard to achieve on top of their current implementations without large code changes. That is the motivation for this project.

persist-queue uses the pickle object serialization module to support object instances. Most built-in types, such as int, dict, and list, can be persisted by persist-queue directly. To support customized objects, refer to Pickling and unpickling extension types (Python 2) and Pickling Class Instances (Python 3).
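As a rough illustration of what "supporting customized objects" means for pickle, the sketch below round-trips a small class through pickle and drops an attribute that cannot be serialized. The Job class and its attributes are hypothetical and not part of persist-queue; the code only uses the standard pickle module.

```python
import pickle


class Job(object):
    """Hypothetical task object; not part of persist-queue."""

    def __init__(self, name, retries=0):
        self.name = name
        self.retries = retries
        self.handle = None  # stand-in for a resource that cannot be pickled

    def __getstate__(self):
        # Copy the instance dict and drop what should not be written to disk.
        state = self.__dict__.copy()
        state.pop("handle", None)
        return state

    def __setstate__(self, state):
        # Restore the saved attributes and reset the transient one.
        self.__dict__.update(state)
        self.handle = None


job = Job("resize-image", retries=2)
job.handle = lambda: None  # lambdas cannot be pickled
restored = pickle.loads(pickle.dumps(job, protocol=2))
print(restored.name, restored.retries, restored.handle)
```

An object that survives this round trip should, in principle, also be storable in a pickle-backed queue, provided the class is importable in the process that reads it back.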

This project is based on the achievements of python-pqueue and queuelib.

Requirements

  • Python 2.7 or Python 3.x.

  • Full support for Linux.

  • Windows support (with Caution if persistqueue.Queue is used).

Installation

From PyPI:

pip install persist-queue

From source code:

git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
python setup.py install

Benchmark

The results below compare the SQLite3 queue and the file queue when writing and reading 1000 items on disk.

  • Windows
    • OS: Windows 10

    • Disk: SATA3 SSD

    • RAM: 16 GiB

                 Write      Write/Read(1 task_done)   Write/Read(many task_done)
SQLite3 Queue    1.8880     2.0290                    3.5940
File Queue       15.0550    15.9150                   30.7650

  • Linux
    • OS: Ubuntu 16.04 (VM)

    • Disk: SATA3 SSD

    • RAM: 4 GiB

                 Write      Write/Read(1 task_done)   Write/Read(many task_done)
SQLite3 Queue    1.8282     1.8075                    2.8639
File Queue       0.9123     1.0411                    2.5104

Note: the results above were obtained with:

python benchmark/run_benchmark.py 1000

To see the real performance on your host, run the script at benchmark/run_benchmark.py:

python benchmark/run_benchmark.py <COUNT, default to 100>

Examples

Example usage with a SQLite3 based queue

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q

Close the console, and then recreate the queue:

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.get()
'str2'
>>>

Example usage of SQLite3 based UniqueQ

This queue does not allow duplicate items.

>>> import persistqueue
>>> q = persistqueue.UniqueQ('mypath')
>>> q.put('str1')
>>> q.put('str1')
>>> q.size
1
>>> q.put('str2')
>>> q.size
2
>>>
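One way to get "no duplicates" behaviour from SQLite is a UNIQUE column combined with INSERT OR IGNORE. The sketch below shows that idea with the standard sqlite3 module only. It is an assumption about how such a queue could work, not a description of UniqueQ's actual implementation, and the table and function names are made up.

```python
import pickle
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE unique_queue ("
    "_id INTEGER PRIMARY KEY AUTOINCREMENT, data BLOB UNIQUE)"
)


def put(item):
    # INSERT OR IGNORE skips rows that would violate the UNIQUE constraint.
    blob = pickle.dumps(item, protocol=2)
    conn.execute("INSERT OR IGNORE INTO unique_queue (data) VALUES (?)", (blob,))
    conn.commit()


def size():
    return conn.execute("SELECT COUNT(*) FROM unique_queue").fetchone()[0]


put("str1")
put("str1")  # duplicate, ignored
put("str2")
print(size())
```

Uniqueness here is decided on the serialized bytes, so two objects count as duplicates only if they pickle to identical data.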

Example usage of SQLite3 based SQLiteAckQueue

The core functions:

  • get: get an item from the queue and mark it as unack.

  • ack: mark the item as acked.

  • nack: there might be something wrong with the current consumer, so mark the item as ready and a new consumer will get it.

  • ack_failed: there might be something wrong during processing, so just mark the item as failed.

>>> import persistqueue
>>> ackq = persistqueue.SQLiteAckQueue('path')
>>> ackq.put('str1')
>>> item = ackq.get()
>>> # Do something with the item
>>> ackq.ack(item) # If done with the item
>>> ackq.nack(item) # Else mark item as `nack` so that it can be processed again by any worker
>>> ackq.ack_failed(item) # Or else mark item as `ack_failed` to discard this item

Note: this queue does not support auto_commit=True
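The status transitions described above (ready, unack, acked, ack_failed) can be pictured with a small in-memory model. This is only an illustration of the state machine: TinyAckQueue is a hypothetical class, it persists nothing, and unlike SQLiteAckQueue it acknowledges by an explicit item id rather than by the item itself.

```python
from collections import OrderedDict

READY, UNACK, ACKED, ACK_FAILED = "ready", "unack", "acked", "ack_failed"


class TinyAckQueue(object):
    """In-memory illustration only; nothing is persisted."""

    def __init__(self):
        self._items = OrderedDict()  # item id -> [payload, status]
        self._next_id = 0

    def put(self, payload):
        self._items[self._next_id] = [payload, READY]
        self._next_id += 1

    def get(self):
        # Hand out the oldest ready item and mark it as unacknowledged.
        for item_id, entry in self._items.items():
            if entry[1] == READY:
                entry[1] = UNACK
                return item_id, entry[0]
        raise LookupError("no ready item")

    def _mark(self, item_id, status):
        self._items[item_id][1] = status

    def ack(self, item_id):
        self._mark(item_id, ACKED)

    def nack(self, item_id):
        self._mark(item_id, READY)  # eligible for delivery again

    def ack_failed(self, item_id):
        self._mark(item_id, ACK_FAILED)

    def status(self, item_id):
        return self._items[item_id][1]


q = TinyAckQueue()
q.put("str1")
item_id, payload = q.get()
q.nack(item_id)               # consumer gave up; item is ready again
item_id2, payload2 = q.get()  # a second consumer receives the same item
q.ack(item_id2)
print(payload2, q.status(item_id2))
```

The point of the nack step is that an item handed to a failing consumer is not lost: it returns to the ready state and is delivered again.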

Example usage with a file based queue

>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()

Close the Python console, then restart the queue from the same path:

>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
'b'
>>> q.task_done()

Example usage with a SQLite3 based dict

>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key1'] = 123
>>> q['key2'] = 321
>>> q['key1']
123
>>> len(q)
2
>>> del q['key1']
>>> q['key1']
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "persistqueue\pdict.py", line 58, in __getitem__
    raise KeyError('Key: {} not exists.'.format(item))
KeyError: 'Key: key1 not exists.'

Close the console and restart the PDict:

>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key2']
321

Multi-thread usage for SQLite3 based queue

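from threading import Thread

from persistqueue import FIFOSQLiteQueue

# multithreading=True must be set when several threads share the queue.
q = FIFOSQLiteQueue(path="./test", multithreading=True)

def worker():
    while True:
        item = q.get()
        do_work(item)  # do_work is a placeholder for your processing function

# num_worker_threads and source() are placeholders supplied by the caller.
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)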

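Multi-thread usage for Queue

from threading import Thread

from persistqueue import Queue

q = Queue("mypath")  # the file-based queue needs a storage path

def worker():
    while True:
        item = q.get()
        do_work(item)  # do_work is a placeholder for your processing function
        q.task_done()  # persist the read position after each item

# num_worker_threads and source() are placeholders supplied by the caller.
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done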

Tips

task_done is required for both the file-based queue and the SQLite3-based queue (when auto_commit=False) to persist the cursor of the next get to disk.

Performance impact

  • WAL

    Starting with v0.3.2, persistqueue leverages the sqlite3 built-in WAL feature <https://www.sqlite.org/wal.html>, which can improve performance significantly. General testing indicates that persistqueue is 2-4 times faster than the previous version.

  • auto_commit=False

    Since persistqueue v0.3.0, the auto_commit parameter lets you tune the performance of sqlite3-based queues as needed. When auto_commit=False is specified, the user needs to call queue.task_done() to persist to disk the changes made since the last task_done invocation.

  • pickle protocol selection

    From v0.3.6, persistqueue selects pickle protocol version 2 for Python 2 and protocol version 4 for Python 3. This selection only happens when the directory does not yet exist when the queue is initialized.
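The WAL and deferred-commit ideas above can be tried directly with the standard sqlite3 module. The sketch below is not persistqueue's code: the database path and table name are made up, and the single commit after the loop only mimics what calling task_done() after a batch would achieve with auto_commit=False.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
conn = sqlite3.connect(db_path)

# Switch the database file to write-ahead logging.
# The PRAGMA returns the journal mode that is now in effect.
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
print(mode)

conn.execute("CREATE TABLE queue (_id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT)")
conn.commit()

# Deferred commit: many writes, then a single commit at the end.
for i in range(1000):
    conn.execute("INSERT INTO queue (data) VALUES (?)", ("item-%d" % i,))
conn.commit()

count = conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0]
print(count)
conn.close()
```

Committing once per batch avoids a disk sync per item, which is where most of the saving comes from. The trade-off is that uncommitted items are lost if the process dies before the commit.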

Tests

persist-queue uses tox to trigger tests.

  • Unit test

tox -e <PYTHON_VERSION>

Available <PYTHON_VERSION>: py27, py34, py35, py36, py37

  • PEP8 check

tox -e pep8

pyenv is usually a helpful tool to manage multiple versions of Python.

Caution

Currently, atomic operations are not supported on Windows due to a limitation of Python’s os.rename. That is, the data in persistqueue.Queue could be left in an unreadable state if a failure occurs during Queue.task_done.

DO NOT put any critical data in persistqueue.Queue on Windows.

This problem is tracked by the issue Atomic renames on windows.
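For context, the usual pattern behind an atomic file update is to write a temporary file and then rename it over the target. The sketch below shows that pattern with os.replace, which exists only on Python 3.3 and later and overwrites an existing target on all platforms. It is a generic illustration, not persistqueue's code, and atomic_write and the file name are hypothetical.

```python
import os
import tempfile


def atomic_write(path, data):
    """Write bytes to path via a temporary file and a single rename."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before renaming
        # os.replace overwrites an existing target on every platform;
        # os.rename raises on Windows when the target already exists.
        os.replace(tmp_path, path)
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


target = os.path.join(tempfile.mkdtemp(), "info")
atomic_write(target, b"head=0")
atomic_write(target, b"head=1")  # replaces the existing file
with open(target, "rb") as f:
    print(f.read())
```

Readers then see either the old file or the new one, never a half-written file. Python 2.7 has no os.replace, which is part of why the limitation above exists.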

Contribution

Simply fork this repo and send a PR for your code change (along with tests covering the change). Remember to give your PR a title and description. I am willing to enhance this project with you :).

License

BSD

Contributors

FAQ

  • sqlite3.OperationalError: database is locked is raised.

persistqueue opens 2 connections to the db if multithreading=True, and the SQLite database is locked until the pending transaction is committed. The timeout parameter specifies how long the connection should wait for the lock to go away before raising an exception. The default timeout is 10 seconds; increase timeout when creating the queue if the above error occurs.

  • sqlite3 based queues are not thread-safe.

The sqlite3 queues are heavily tested in multi-threaded environments. If you find one is not thread-safe, please make sure you set multithreading=True when initializing the queue before submitting a new issue :).
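The "database is locked" error from the first FAQ entry can be reproduced with two plain sqlite3 connections. This sketch uses only the standard library: one connection holds a write transaction open while a second one, given a deliberately short timeout, tries to write. The file and table names are made up for the demonstration.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "locked.db")

# Writer connection in autocommit mode so that BEGIN/COMMIT are issued by hand.
writer = sqlite3.connect(db_path, isolation_level=None)
writer.execute("CREATE TABLE queue (data TEXT)")
writer.execute("BEGIN IMMEDIATE")  # take the write lock and hold it
writer.execute("INSERT INTO queue VALUES ('from writer')")

# The second connection waits at most 0.2 seconds for the lock.
other = sqlite3.connect(db_path, timeout=0.2)
try:
    other.execute("INSERT INTO queue VALUES ('from other')")
    locked = False
except sqlite3.OperationalError as exc:
    locked = "locked" in str(exc)
    other.rollback()

writer.execute("COMMIT")  # release the lock

# With the lock gone, the same statement succeeds.
other.execute("INSERT INTO queue VALUES ('from other')")
other.commit()
rows = other.execute("SELECT COUNT(*) FROM queue").fetchone()[0]
print(locked, rows)

writer.close()
other.close()
```

A longer timeout gives the blocked connection time to wait out the other transaction instead of failing, which is why raising it is the suggested remedy.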
