A thread-safe disk based persistent queue in Python.

Overview

persist-queue implements file-based and SQLite3-based persistent queues for Python. It provides thread-safe, disk-based queue implementations that survive process crashes and restarts.

By default, persist-queue uses the pickle module to serialize items, so most built-in types, such as int, dict and list, can be persisted directly. To persist instances of custom classes, refer to Pickling and unpickling extension types (Python 2) and Pickling Class Instances (Python 3).
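Custom classes usually pickle without extra work; the exception is an instance that holds something pickle refuses, such as a lock, an open file or a socket. The stdlib-only sketch below uses a hypothetical `Job` class that drops its lock in `__getstate__` and recreates it in `__setstate__`. An object that round-trips through pickle like this should be storable with the default serializer.

```python
import pickle
import threading


class Job:
    """Hypothetical job object that holds a lock, which pickle cannot serialize."""

    def __init__(self, name, payload):
        self.name = name
        self.payload = payload
        self._lock = threading.Lock()  # not picklable on its own

    def __getstate__(self):
        # Copy the instance dict and leave out the unpicklable lock.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Restore the saved attributes and create a fresh lock.
        self.__dict__.update(state)
        self._lock = threading.Lock()


job = Job("resize", {"width": 640})
restored = pickle.loads(pickle.dumps(job))
print(restored.name, restored.payload)  # resize {'width': 640}
```

Without the two methods, `pickle.dumps(job)` raises `TypeError` because of the lock attribute.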

This project builds on python-pqueue and queuelib.

Key Features

  • Disk-based: Each queued item is stored on disk to survive crashes

  • Thread-safe: Supports multi-threaded producers and consumers

  • Recoverable: Items can be read after process restart

  • Green-compatible: Works with greenlet or eventlet environments

  • Multiple serialization: Supports pickle (default), msgpack, cbor, and json

  • Async support: Provides async versions of all queue types (v1.1.0+)

Supported Queue Types

File-based Queues:

  • Queue - Basic file-based FIFO queue

  • AsyncQueue - Async file-based queue (v1.1.0+)

SQLite-based Queues:

  • SQLiteQueue / FIFOSQLiteQueue - FIFO SQLite queue

  • FILOSQLiteQueue - FILO SQLite queue

  • UniqueQ - Queue that accepts only unique items

  • PriorityQueue - Priority-based queue

  • SQLiteAckQueue - Acknowledgment-based queue

  • AsyncSQLiteQueue - Async SQLite queue (v1.1.0+)

Other:

  • PDict - Persistent dictionary

  • MySQLQueue - MySQL-based queue (requires extra dependencies)
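The FIFO and FILO SQLite queues differ in which end of the queue get() takes items from. One way to picture this is a table with an autoincrementing id, where FIFO reads the lowest id and FILO the highest. `TinySQLiteQueue` and its schema are hypothetical: a stdlib-only sketch of the idea, not persist-queue's implementation.

```python
import pickle
import sqlite3


class TinySQLiteQueue:
    """Hypothetical minimal SQLite-backed queue (not persist-queue's schema)."""

    def __init__(self, path=":memory:", lifo=False):
        self._conn = sqlite3.connect(path)
        # The only difference between FIFO and FILO is the sort direction.
        self._order = "DESC" if lifo else "ASC"
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS queue ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, data BLOB)"
        )
        self._conn.commit()

    def put(self, item):
        # Each item is pickled and appended as a new row, then committed.
        with self._conn:
            self._conn.execute(
                "INSERT INTO queue (data) VALUES (?)", (pickle.dumps(item),)
            )

    def get(self):
        # Select the oldest (FIFO) or newest (FILO) row and delete it in the
        # same transaction, so the removal is persisted together with the read.
        with self._conn:
            row = self._conn.execute(
                f"SELECT id, data FROM queue ORDER BY id {self._order} LIMIT 1"
            ).fetchone()
            if row is None:
                raise IndexError("queue is empty")
            self._conn.execute("DELETE FROM queue WHERE id = ?", (row[0],))
        return pickle.loads(row[1])


fifo = TinySQLiteQueue()
filo = TinySQLiteQueue(lifo=True)
for letter in "abc":
    fifo.put(letter)
    filo.put(letter)
print(fifo.get(), filo.get())  # a c
```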

Installation

Basic Installation

pip install persist-queue

With Extra Features

# For msgpack, cbor, and MySQL support
pip install "persist-queue[extra]"

# For async support (requires Python 3.7+)
pip install "persist-queue[async]"

# For all features
pip install "persist-queue[extra,async]"

From Source

git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
python setup.py install

Requirements

  • Python 3.5 or newer (Python 2 support dropped in v1.0.0)

  • Full support for Linux, macOS, and Windows

  • For async features: Python 3.7+ with aiofiles and aiosqlite

  • For MySQL queues: DBUtils and PyMySQL

Quick Start

Basic File Queue

from persistqueue import Queue

# Create a queue
q = Queue("my_queue_path")

# Add items
q.put("item1")
q.put("item2")

# Get items
item = q.get()
print(item)  # "item1"

# Mark as done
q.task_done()

SQLite Queue

import persistqueue

# Create SQLite queue
q = persistqueue.SQLiteQueue('my_queue.db', auto_commit=True)

# Add items
q.put('data1')
q.put('data2')

# Get items
item = q.get()
print(item)  # "data1"

MySQL Queue

import persistqueue

# Create MySQL queue
q = persistqueue.MySQLQueue(
    host='localhost',
    port=3306,
    user='username',
    password='password',
    database='testdb',
    table_name='my_queue'
)

# Add items
q.put('data1')
q.put('data2')

# Get items
item = q.get()
print(item)  # "data1"

# Mark as done
q.task_done()

Async Queue (v1.1.0+)

import asyncio
from persistqueue import AsyncQueue

async def main():
    async with AsyncQueue("/path/to/queue") as queue:
        await queue.put("async item")
        item = await queue.get()
        await queue.task_done()

asyncio.run(main())

Examples

File-based Queue

>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()

SQLite3-based Queue

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q

Priority Queue

>>> import persistqueue
>>> q = persistqueue.PriorityQueue('mypath')
>>> q.put('low', priority=10)
>>> q.put('high', priority=1)
>>> q.put('mid', priority=5)
>>> q.get()
'high'
>>> q.get()
'mid'
>>> q.get()
'low'
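In the example above, a smaller priority number is served first. The same ordering can be expressed as an SQL sort on the priority column. The stdlib sketch below uses hypothetical names (`pq_put`, `pq_get`, table `pq`) and, as its own design choice, breaks ties by insertion order; it does not claim to mirror PriorityQueue's internals.

```python
import pickle
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pq (id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "priority INTEGER NOT NULL, data BLOB)"
)


def pq_put(item, priority):
    with conn:
        conn.execute(
            "INSERT INTO pq (priority, data) VALUES (?, ?)",
            (priority, pickle.dumps(item)),
        )


def pq_get():
    # Smallest priority number first; equal priorities fall back to id order.
    with conn:
        row = conn.execute(
            "SELECT id, data FROM pq ORDER BY priority ASC, id ASC LIMIT 1"
        ).fetchone()
        if row is None:
            raise IndexError("queue is empty")
        conn.execute("DELETE FROM pq WHERE id = ?", (row[0],))
    return pickle.loads(row[1])


pq_put("low", 10)
pq_put("high", 1)
pq_put("mid", 5)
pq_put("high-2", 1)
print([pq_get() for _ in range(4)])  # ['high', 'high-2', 'mid', 'low']
```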

Unique Queue

>>> import persistqueue
>>> q = persistqueue.UniqueQ('mypath')
>>> q.put('str1')
>>> q.put('str1')  # Duplicate ignored
>>> q.size
1
>>> q.put('str2')
>>> q.size
2
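One way to reject duplicates in an SQLite-backed queue is a UNIQUE constraint on the stored payload combined with `INSERT OR IGNORE`. The stdlib sketch below assumes uniqueness is decided on the pickled bytes; `put_unique`, `size` and the table `uq` are hypothetical, and whether UniqueQ works exactly this way is not stated here.

```python
import pickle
import sqlite3

conn = sqlite3.connect(":memory:")
# UNIQUE on the serialized payload lets SQLite detect duplicates.
conn.execute(
    "CREATE TABLE uq (id INTEGER PRIMARY KEY AUTOINCREMENT, data BLOB UNIQUE)"
)


def put_unique(item):
    # INSERT OR IGNORE silently skips rows that would violate UNIQUE.
    with conn:
        conn.execute(
            "INSERT OR IGNORE INTO uq (data) VALUES (?)", (pickle.dumps(item),)
        )


def size():
    return conn.execute("SELECT COUNT(*) FROM uq").fetchone()[0]


put_unique("str1")
put_unique("str1")  # duplicate ignored
put_unique("str2")
print(size())  # 2
```

A caveat of comparing serialized bytes: values that compare equal in Python can pickle differently, for example two dicts with the same keys inserted in a different order.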

Acknowledgment Queue

>>> import persistqueue
>>> ackq = persistqueue.SQLiteAckQueue('path')
>>> ackq.put('str1')
>>> item = ackq.get()
>>> # Process the item, then mark it as completed:
>>> ackq.ack(item)

If processing fails, call ackq.nack(item) instead to put the item back for a retry, or ackq.ack_failed(item) to mark it as failed.
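The acknowledgment calls can be read as transitions between item states. The in-memory model below is a hypothetical sketch: the status names and `AckQueueSketch` are assumptions, not SQLiteAckQueue's actual states or storage. It also differs in one detail: its get() returns an (id, item) pair so the id can be passed to ack(), whereas the example above passes the item itself.

```python
import enum
import itertools


class Status(enum.Enum):
    READY = "ready"    # waiting to be handed out by get()
    UNACK = "unack"    # handed out, awaiting ack/nack/ack_failed
    ACKED = "acked"    # processed successfully
    FAILED = "failed"  # given up on


class AckQueueSketch:
    """Hypothetical in-memory model of ack/nack/ack_failed semantics."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._items = {}  # id -> [item, status], kept in insertion order

    def put(self, item):
        item_id = next(self._ids)
        self._items[item_id] = [item, Status.READY]
        return item_id

    def get(self):
        # Hand out the oldest READY item and mark it UNACK.
        for item_id, entry in self._items.items():
            if entry[1] is Status.READY:
                entry[1] = Status.UNACK
                return item_id, entry[0]
        raise IndexError("no ready items")

    def _settle(self, item_id, status):
        entry = self._items[item_id]
        if entry[1] is not Status.UNACK:
            raise ValueError(f"item {item_id} is not awaiting acknowledgment")
        entry[1] = status

    def ack(self, item_id):
        self._settle(item_id, Status.ACKED)

    def nack(self, item_id):
        # Back to READY, so a later get() hands the item out again.
        self._settle(item_id, Status.READY)

    def ack_failed(self, item_id):
        self._settle(item_id, Status.FAILED)

    def status(self, item_id):
        return self._items[item_id][1]


q = AckQueueSketch()
q.put("str1")
item_id, item = q.get()
q.nack(item_id)          # processing failed once: retry
item_id, item = q.get()  # the same item comes back
q.ack(item_id)
print(q.status(item_id))  # Status.ACKED
```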

MySQL Queue

>>> import persistqueue
>>> q = persistqueue.MySQLQueue(
...     host='localhost',
...     port=3306,
...     user='testuser',
...     password='testpass',
...     database='testdb',
...     table_name='test_queue'
... )
>>> q.put('item1')
>>> q.put('item2')
>>> q.put('item3')
>>> q.get()
'item1'
>>> q.task_done()
>>> q.get()
'item2'
>>> q.task_done()
>>> q.size
1

Async Queue (v1.1.0+)

import asyncio
from persistqueue import AsyncQueue, AsyncSQLiteQueue

async def example():
    # File-based async queue
    async with AsyncQueue("/path/to/queue") as queue:
        await queue.put("data item")
        item = await queue.get()
        await queue.task_done()

    # SQLite-based async queue
    async with AsyncSQLiteQueue("/path/to/queue.db") as queue:
        item_id = await queue.put({"key": "value"})
        item = await queue.get()
        await queue.update({"key": "new_value"}, item_id)
        await queue.task_done()

asyncio.run(example())

Persistent Dictionary

>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key1'] = 123
>>> q['key2'] = 321
>>> q['key1']
123
>>> len(q)
2
>>> del q['key1']
>>> q['key1']
Traceback (most recent call last):
  ...
KeyError: 'Key: key1 not exists.'
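PDict behaves like a dict whose contents live on disk. One way to build such a mapping is to back `collections.abc.MutableMapping` with an SQLite table: implementing its five abstract methods provides `get()`, `in`, `update()` and the other dict methods for free. `SQLiteDict` is a hypothetical stdlib sketch and is not necessarily how PDict is implemented; it also raises a plain `KeyError(key)` rather than PDict's message.

```python
import pickle
import sqlite3
from collections.abc import MutableMapping


class SQLiteDict(MutableMapping):
    """Hypothetical dict persisted in one SQLite table (string keys)."""

    def __init__(self, path=":memory:", name="default"):
        self._conn = sqlite3.connect(path)
        self._table = f'"pdict_{name}"'  # assumes name is a simple, trusted word
        with self._conn:
            self._conn.execute(
                f"CREATE TABLE IF NOT EXISTS {self._table} "
                "(key TEXT PRIMARY KEY, value BLOB)"
            )

    def __setitem__(self, key, value):
        with self._conn:
            self._conn.execute(
                f"INSERT OR REPLACE INTO {self._table} (key, value) VALUES (?, ?)",
                (key, pickle.dumps(value)),
            )

    def __getitem__(self, key):
        row = self._conn.execute(
            f"SELECT value FROM {self._table} WHERE key = ?", (key,)
        ).fetchone()
        if row is None:
            raise KeyError(key)
        return pickle.loads(row[0])

    def __delitem__(self, key):
        with self._conn:
            cur = self._conn.execute(
                f"DELETE FROM {self._table} WHERE key = ?", (key,)
            )
        if cur.rowcount == 0:
            raise KeyError(key)

    def __iter__(self):
        # Materialize the keys so iteration does not hold a cursor open.
        keys = [r[0] for r in self._conn.execute(f"SELECT key FROM {self._table}")]
        return iter(keys)

    def __len__(self):
        return self._conn.execute(f"SELECT COUNT(*) FROM {self._table}").fetchone()[0]


d = SQLiteDict()
d["key1"] = 123
d["key2"] = 321
del d["key1"]
print(len(d), d.get("key1"))  # 1 None
```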

Multi-threading Usage

SQLite3-based Queue

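from persistqueue import FIFOSQLiteQueue
from threading import Thread

# multithreading=True lets several threads share one SQLite queue
q = FIFOSQLiteQueue(path="./test", multithreading=True)

# Placeholders: replace with your own worker count, processing and item source
num_worker_threads = 4

def do_work(item):
    print(item)

def source():
    return range(100)

def worker():
    while True:
        item = q.get()  # blocks until an item is available
        do_work(item)

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # Block until all tasks are done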

File-based Queue

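from persistqueue import Queue
from threading import Thread

# Queue requires a path: the directory where items are stored
q = Queue("my_queue_path")

# Placeholders: replace with your own worker count, processing and item source
num_worker_threads = 4

def do_work(item):
    print(item)

def source():
    return range(100)

def worker():
    while True:
        item = q.get()  # blocks until an item is available
        do_work(item)
        q.task_done()  # lets q.join() know this item is finished

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # Block until all tasks are done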
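The file-based example depends on the task_done()/join() contract whose method names it shares with the standard library's queue.Queue: join() returns only after task_done() has been called once for every item that was put. The snippet below demonstrates that contract with queue.Queue itself, in memory, so it runs without persist-queue installed; `results` and the doubling step are stand-ins for real work.

```python
import queue
import threading

q = queue.Queue()
results = []
results_lock = threading.Lock()


def worker():
    while True:
        item = q.get()
        try:
            with results_lock:
                results.append(item * 2)  # stand-in for real processing
        finally:
            # Every get() must be matched by task_done(), or join() blocks forever.
            q.task_done()


for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

for item in range(10):
    q.put(item)

q.join()  # returns once task_done() has been called for all 10 items
print(sorted(results))
```

Putting task_done() in a `finally` clause keeps join() from hanging when processing an item raises.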

MySQL Queue

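from persistqueue import MySQLQueue
from threading import Thread

q = MySQLQueue(
    host='localhost',
    port=3306,
    user='username',
    password='password',
    database='testdb',
    table_name='my_queue'
)

# Placeholders: replace with your own worker count, processing and item source
num_worker_threads = 4

def do_work(item):
    print(item)

def source():
    return range(100)

def worker():
    while True:
        item = q.get()  # blocks until an item is available
        do_work(item)
        q.task_done()  # lets q.join() know this item is finished

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # Block until all tasks are done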

Serialization Options

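persist-queue supports multiple serialization protocols, selected with the serializer argument. Use the same serializer every time a given queue path is opened, because data written with one serializer cannot be read back with another:

>>> from persistqueue import Queue
>>> from persistqueue import serializers

>>> # Pickle (default)
>>> q = Queue('mypath', serializer=serializers.pickle)

>>> # MessagePack (requires persist-queue[extra])
>>> q = Queue('mypath', serializer=serializers.msgpack)

>>> # CBOR (requires persist-queue[extra])
>>> q = Queue('mypath', serializer=serializers.cbor2)

>>> # JSON
>>> q = Queue('mypath', serializer=serializers.json)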
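The choice of serializer determines which items survive a round trip. The difference shows up with the standard library's pickle and json modules alone (msgpack and cbor2 are left out to keep the sketch dependency-free); whether persist-queue's JSON serializer adds any extra type handling on top of this is not covered here.

```python
import json
import pickle
from datetime import date

record = {"id": 7, "tags": ("a", "b"), "due": date(2024, 1, 31)}

# pickle round-trips arbitrary Python objects, including tuples and dates.
assert pickle.loads(pickle.dumps(record)) == record

# json only knows dicts, lists, strings, numbers, booleans and None:
# a date cannot be encoded at all...
try:
    json.dumps(record)
except TypeError as exc:
    print("json failed:", exc)

# ...and a tuple comes back as a list.
json_safe = {"id": 7, "tags": ("a", "b"), "due": record["due"].isoformat()}
print(json.loads(json.dumps(json_safe)))
# {'id': 7, 'tags': ['a', 'b'], 'due': '2024-01-31'}
```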

Performance

Benchmark Results (1000 items)

Windows (Windows 10, SATA3 SSD, 16GB RAM)

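+---------------+--------+-------------------------+----------------------------+
| Queue Type    | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
+---------------+--------+-------------------------+----------------------------+
| SQLite3 Queue | 1.8880 | 2.0290                  | 3.5940                     |
| File Queue    | 4.9520 | 5.0560                  | 8.4900                     |
+---------------+--------+-------------------------+----------------------------+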

Benchmarking

You can benchmark the performance of all queue types (including async) with the built-in tool:

Run with tox:

tox -e bench -- rst

Or run directly:

python benchmark/run_benchmark.py 1000 rst

  • The first argument is the number of items to test (default: 1000)

  • The second argument is the output format: rst (reStructuredText table), console, or json

Example output (rst):

+------------------+--------+-------------------------+----------------------------+
| Queue Type       | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
+------------------+--------+-------------------------+----------------------------+
| File Queue       | 0.0481 | 0.0299                  | 0.0833                     |
| AsyncSQLiteQueue | 0.2664 | 0.5353                  | 0.5508                     |
| AsyncFileQueue   | 0.1333 | 0.1500                  | 0.2337                     |
+------------------+--------+-------------------------+----------------------------+

This makes it easy to compare the performance of sync and async queues on your platform.

Performance Tips

  • WAL Mode: SQLite3 queues use WAL mode by default for better performance

  • auto_commit=False: Use for batch operations; changes are persisted when task_done() is called

  • Protocol Selection: Automatically selects optimal pickle protocol

  • Windows: File queue performance improved 3-4x since v0.4.1

  • MySQL Connection Pooling: MySQL queues use connection pooling for better performance
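The WAL and auto_commit=False tips rest on general SQLite behavior: committing once per batch is usually much cheaper than committing once per item. The stdlib-only sketch below assumes nothing about persist-queue's internals; it enables WAL on a temporary database and times both patterns. The helper names are hypothetical, and absolute timings depend on the machine and disk, so no expected numbers are given.

```python
import os
import sqlite3
import tempfile
import time

N = 500
path = os.path.join(tempfile.mkdtemp(), "bench.db")
conn = sqlite3.connect(path)
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
print("journal mode:", mode)  # wal
conn.execute("CREATE TABLE q (id INTEGER PRIMARY KEY, data BLOB)")
conn.commit()


def insert_commit_each(n):
    for _ in range(n):
        conn.execute("INSERT INTO q (data) VALUES (?)", (b"x",))
        conn.commit()  # one transaction per item


def insert_batched(n):
    with conn:  # one transaction for the whole batch
        conn.executemany("INSERT INTO q (data) VALUES (?)", [(b"x",)] * n)


for fn in (insert_commit_each, insert_batched):
    start = time.perf_counter()
    fn(N)
    print(f"{fn.__name__}: {time.perf_counter() - start:.4f}s")
conn.close()
```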

Testing

Run tests using tox:

# Run tests for specific Python version
tox -e py312

# Run code style checks
tox -e pep8

# Generate coverage report
tox -e cover

Development

Install development dependencies:

pip install -r test-requirements.txt
pip install -r extra-requirements.txt

Run benchmarks:

python benchmark/run_benchmark.py 1000

Release Notes

For detailed information about recent changes and updates, see:

Known Issues

  • Windows File Queue: Atomic operations are experimental. Critical data may become unreadable if task_done() fails

  • MySQL Tests: Require local MySQL service, otherwise skipped automatically

  • Async Features: Require Python 3.7+ and asyncio support
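The Windows caveat concerns atomic file updates. A common general technique, not necessarily the one persist-queue uses, is to write to a temporary file in the same directory, flush it to disk, and then rename it over the target with `os.replace`. The helper `atomic_write` below is a hypothetical stdlib sketch. Python documents the rename as atomic on POSIX; on Windows it is not documented as atomic.

```python
import os
import tempfile


def atomic_write(path, data):
    """Replace path's contents so readers see the old or the new file, not a mix."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must live in the same directory (same filesystem) as path.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # atomic on POSIX
    except BaseException:
        os.unlink(tmp_path)  # do not leave a half-written temp file behind
        raise


target = os.path.join(tempfile.mkdtemp(), "info")
atomic_write(target, b"version 1")
atomic_write(target, b"version 2")
with open(target, "rb") as f:
    print(f.read())  # b'version 2'
```

For full durability on POSIX the containing directory would also need an fsync after the rename; the sketch omits that step.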

Troubleshooting

Database Locked Error

If you get sqlite3.OperationalError: database is locked:

  • Increase the timeout parameter when creating the queue

  • Ensure you’re using multithreading=True for multi-threaded access

MySQL Connection Issues

If you get MySQL connection errors:

  • Verify MySQL server is running and accessible

  • Check connection parameters (host, port, user, password)

  • Ensure the database exists and user has proper permissions

  • For connection pool issues, try increasing the max_connections parameter

Thread Safety Issues

  • Make sure to set multithreading=True when initializing SQLite queues

  • SQLite3 queues are thoroughly tested in multi-threading environments

  • MySQL queues are thread-safe by default

Import Errors

  • For async features: Install with pip install "persist-queue[async]"

  • For MySQL support: Install with pip install "persist-queue[extra]"

Community

Contributing

  1. Fork the repository

  2. Create a feature branch

  3. Make your changes

  4. Add tests to cover your changes

  5. Submit a pull request with a clear title and description

License

BSD License

Contributors

View Contributors

Download files

Source Distribution

persist_queue-1.1.0b0.tar.gz (39.3 kB view details)

Uploaded Source

Built Distribution

persist_queue-1.1.0b0-py3-none-any.whl (53.1 kB view details)

Uploaded Python 3

File details

Details for the file persist_queue-1.1.0b0.tar.gz.

File metadata

  • Download URL: persist_queue-1.1.0b0.tar.gz
  • Size: 39.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.8

File hashes

Hashes for persist_queue-1.1.0b0.tar.gz
Algorithm Hash digest
SHA256 a36c885aca299dd744e8d75eb9035cde06122b1893f85b14eb8148cecdc18aba
MD5 955fe115e3b4e096ac3694128969930c
BLAKE2b-256 c13bda0ebff26deee6f4e466fcbc460b179db64275006960f018b8979a75db33

File details

Details for the file persist_queue-1.1.0b0-py3-none-any.whl.

File hashes

Hashes for persist_queue-1.1.0b0-py3-none-any.whl
Algorithm Hash digest
SHA256 ed355ac7de9e94bab7e5d9c7695e59457a6b2d3369429e9007aaf218200c50dd
MD5 f176795f4245969822d7778a33d8a8b5
BLAKE2b-256 e94054e7de920df964125c93cbf4b46e3b6899692b2a4a8fbfc7a2abbf025e64
