
A thread-safe disk based persistent queue in Python.



Overview

persist-queue implements file-based and SQLite3-based persistent queues for Python. It provides thread-safe, disk-based queue implementations that survive process crashes and restarts.

By default, persist-queue uses the pickle module to serialize items, so most built-in types, such as int, dict, and list, can be persisted directly. To persist custom objects, refer to "Pickling and unpickling extension types" (Python 2) and "Pickling Class Instances" (Python 3) in the Python documentation.
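For a custom class, the hooks described in "Pickling Class Instances" are usually enough. A minimal sketch, using a hypothetical Job class that holds an unpicklable threading.Lock: __getstate__ drops the lock before pickling and __setstate__ recreates it on load, so the object can round-trip through pickle (and, presumably, through persist-queue's default pickle serializer).

```python
import pickle
import threading


class Job:
    """Hypothetical job object that owns an unpicklable lock."""

    def __init__(self, name, payload):
        self.name = name
        self.payload = payload
        self._lock = threading.Lock()  # lock objects cannot be pickled

    def __getstate__(self):
        # Copy the instance attributes and drop the lock before pickling.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Restore the plain attributes and create a fresh lock.
        self.__dict__.update(state)
        self._lock = threading.Lock()


job = Job("resize", {"width": 640})
restored = pickle.loads(pickle.dumps(job))
print(restored.name, restored.payload)  # resize {'width': 640}
```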

This project builds on the work of python-pqueue and queuelib.

Key Features

  • Disk-based: Each queued item is stored on disk to survive crashes

  • Thread-safe: Supports multi-threaded producers and consumers

  • Recoverable: Items can be read after process restart

  • Green-compatible: Works with greenlet or eventlet environments

  • Multiple serialization: Supports pickle (default), msgpack, cbor, and json

  • Async support: Provides async file-based and SQLite-based queues (v1.1.0+)

Supported Queue Types

File-based Queues:

  • Queue - Basic file-based FIFO queue

  • AsyncQueue - Async file-based queue (v1.1.0+)

SQLite-based Queues:

  • SQLiteQueue / FIFOSQLiteQueue - FIFO SQLite queue

  • FILOSQLiteQueue - FILO SQLite queue

  • UniqueQ - Queue that stores only unique items

  • PriorityQueue - Priority-based queue

  • SQLiteAckQueue - Acknowledgment-based queue

  • AsyncSQLiteQueue - Async SQLite queue (v1.1.0+)

Other:

  • PDict - Persistent dictionary

  • MySQLQueue - MySQL-based queue (requires extra dependencies)

Installation

Basic Installation

pip install persist-queue

With Extra Features

# For msgpack, cbor, and MySQL support
pip install "persist-queue[extra]"

# For async support (requires Python 3.7+)
pip install "persist-queue[async]"

# For all features
pip install "persist-queue[extra,async]"

From Source

git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
python setup.py install

Requirements

  • Python 3.5 or newer (Python 2 support dropped in v1.0.0)

  • Full support for Linux, macOS, and Windows

  • For async features: Python 3.7+ with aiofiles and aiosqlite

  • For MySQL queues: DBUtils and PyMySQL

Quick Start

Basic File Queue

from persistqueue import Queue

# Create a queue
q = Queue("my_queue_path")

# Add items
q.put("item1")
q.put("item2")

# Get items
item = q.get()
print(item)  # "item1"

# Mark as done
q.task_done()

SQLite Queue

import persistqueue

# Create SQLite queue
q = persistqueue.SQLiteQueue('my_queue.db', auto_commit=True)

# Add items
q.put('data1')
q.put('data2')

# Get items
item = q.get()
print(item)  # "data1"

MySQL Queue

import persistqueue

# Create MySQL queue
q = persistqueue.MySQLQueue(
    host='localhost',
    port=3306,
    user='username',
    password='password',
    database='testdb',
    table_name='my_queue'
)

# Add items
q.put('data1')
q.put('data2')

# Get items
item = q.get()
print(item)  # "data1"

# Mark as done
q.task_done()

Async Queue (v1.1.0+)

import asyncio
from persistqueue import AsyncQueue

async def main():
    async with AsyncQueue("/path/to/queue") as queue:
        await queue.put("async item")
        item = await queue.get()
        await queue.task_done()

asyncio.run(main())

Examples

File-based Queue

>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()

SQLite3-based Queue

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q

Priority Queue

>>> import persistqueue
>>> q = persistqueue.PriorityQueue('mypath')
>>> q.put('low', priority=10)
>>> q.put('high', priority=1)
>>> q.put('mid', priority=5)
>>> q.get()
'high'
>>> q.get()
'mid'
>>> q.get()
'low'
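The ordering above (a lower priority number is served first) can be modelled in memory with the standard-library heapq module. This is a sketch for illustration, not PriorityQueue's implementation; the class name is hypothetical, and breaking ties in insertion order is a choice made here, not behaviour confirmed for persist-queue.

```python
import heapq
import itertools


class PrioritySketch:
    """Hypothetical in-memory model: lower priority numbers are served first."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker keeps insertion order

    def put(self, item, priority):
        heapq.heappush(self._heap, (priority, next(self._counter), item))

    def get(self):
        _priority, _seq, item = heapq.heappop(self._heap)
        return item


q = PrioritySketch()
q.put("low", priority=10)
q.put("high", priority=1)
q.put("mid", priority=5)
order = [q.get() for _ in range(3)]
print(order)  # ['high', 'mid', 'low']
```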

Unique Queue

>>> import persistqueue
>>> q = persistqueue.UniqueQ('mypath')
>>> q.put('str1')
>>> q.put('str1')  # Duplicate ignored
>>> q.size
1
>>> q.put('str2')
>>> q.size
2
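A minimal sketch of the deduplication idea, assuming (not confirmed here) that uniqueness is enforced by a UNIQUE constraint on the stored payload. It uses only the standard-library sqlite3 module and a hypothetical table layout, not persist-queue's actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# UNIQUE on the payload column makes the database reject duplicate items.
conn.execute(
    "CREATE TABLE queue (_id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT UNIQUE)"
)


def put(item):
    # INSERT OR IGNORE silently skips rows that would violate UNIQUE.
    with conn:  # commits on success
        conn.execute("INSERT OR IGNORE INTO queue (data) VALUES (?)", (item,))


def size():
    return conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0]


put("str1")
put("str1")  # duplicate: ignored
print(size())  # 1
put("str2")
print(size())  # 2
```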

Acknowledgment Queue

>>> import persistqueue
>>> ackq = persistqueue.SQLiteAckQueue('path')
>>> ackq.put('str1')
>>> item = ackq.get()
>>> # Process the item, then call exactly one of the following:
>>> ackq.ack(item)  # Success: mark as completed
>>> # ackq.nack(item)        # Failure: return the item to the queue for retry
>>> # ackq.ack_failed(item)  # Failure: mark the item as failed
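The acknowledgment flow is essentially a small state machine. The in-memory sketch below models it under stated assumptions: the class and status names are hypothetical, items are keyed by value for brevity (so duplicate items would collide), and a nacked item is simply appended to the back of the ready list, which may not match where SQLiteAckQueue places retried items.

```python
from collections import deque
from enum import Enum


class Status(Enum):
    READY = "ready"
    UNACK = "unack"
    ACKED = "acked"
    ACK_FAILED = "ack_failed"


class AckQueueSketch:
    """Hypothetical in-memory model of get/ack/nack/ack_failed transitions."""

    def __init__(self):
        self._ready = deque()
        self.status = {}

    def put(self, item):
        self._ready.append(item)
        self.status[item] = Status.READY

    def get(self):
        item = self._ready.popleft()
        self.status[item] = Status.UNACK  # handed out, awaiting a verdict
        return item

    def _require_unack(self, item):
        # Only items that were handed out by get() can be acknowledged.
        if self.status.get(item) is not Status.UNACK:
            raise ValueError(f"{item!r} is not awaiting acknowledgment")

    def ack(self, item):
        self._require_unack(item)
        self.status[item] = Status.ACKED

    def nack(self, item):
        self._require_unack(item)
        self.status[item] = Status.READY
        self._ready.append(item)  # requeued for another attempt

    def ack_failed(self, item):
        self._require_unack(item)
        self.status[item] = Status.ACK_FAILED  # never handed out again


q = AckQueueSketch()
q.put("str1")
item = q.get()
q.nack(item)  # first attempt failed; retry later
item = q.get()
q.ack(item)  # second attempt succeeded
print(q.status["str1"].value)  # acked
```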

MySQL Queue

>>> import persistqueue
>>> q = persistqueue.MySQLQueue(
...     host='localhost',
...     port=3306,
...     user='testuser',
...     password='testpass',
...     database='testdb',
...     table_name='test_queue'
... )
>>> q.put('item1')
>>> q.put('item2')
>>> q.put('item3')
>>> q.get()
'item1'
>>> q.task_done()
>>> q.get()
'item2'
>>> q.task_done()
>>> q.size
1

Async Queue (v1.1.0+)

import asyncio
from persistqueue import AsyncQueue, AsyncSQLiteQueue

async def example():
    # File-based async queue
    async with AsyncQueue("/path/to/queue") as queue:
        await queue.put("data item")
        item = await queue.get()
        await queue.task_done()

    # SQLite-based async queue
    async with AsyncSQLiteQueue("/path/to/queue.db") as queue:
        item_id = await queue.put({"key": "value"})
        item = await queue.get()
        await queue.update({"key": "new_value"}, item_id)
        await queue.task_done()

asyncio.run(example())

Persistent Dictionary

>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key1'] = 123
>>> q['key2'] = 321
>>> q['key1']
123
>>> len(q)
2
>>> del q['key1']
>>> q['key1']
Traceback (most recent call last):
  ...
KeyError: 'Key: key1 not exists.'
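PDict behaves like a standard mapping backed by disk. As a sketch of the same idea using only the standard library (sqlite3, pickle, and collections.abc.MutableMapping), the hypothetical class below stores pickled values in a SQLite table; the class name and table layout are illustrative and not PDict's implementation.

```python
import pickle
import sqlite3
from collections.abc import MutableMapping


class SQLiteDictSketch(MutableMapping):
    """Hypothetical dict-like store: text keys, pickled values."""

    def __init__(self, path=":memory:", name="kv"):
        self._conn = sqlite3.connect(path)
        self._table = name  # assumed to be a trusted identifier, not user input
        with self._conn:
            self._conn.execute(
                f"CREATE TABLE IF NOT EXISTS {self._table} "
                "(key TEXT PRIMARY KEY, value BLOB)"
            )

    def __setitem__(self, key, value):
        with self._conn:  # commit each write
            self._conn.execute(
                f"INSERT OR REPLACE INTO {self._table} (key, value) VALUES (?, ?)",
                (key, pickle.dumps(value)),
            )

    def __getitem__(self, key):
        row = self._conn.execute(
            f"SELECT value FROM {self._table} WHERE key = ?", (key,)
        ).fetchone()
        if row is None:
            raise KeyError(key)
        return pickle.loads(row[0])

    def __delitem__(self, key):
        with self._conn:
            cur = self._conn.execute(
                f"DELETE FROM {self._table} WHERE key = ?", (key,)
            )
        if cur.rowcount == 0:
            raise KeyError(key)

    def __iter__(self):
        rows = self._conn.execute(f"SELECT key FROM {self._table}").fetchall()
        return iter([row[0] for row in rows])

    def __len__(self):
        return self._conn.execute(
            f"SELECT COUNT(*) FROM {self._table}"
        ).fetchone()[0]


d = SQLiteDictSketch()
d["key1"] = 123
d["key2"] = 321
del d["key1"]
print(len(d), "key1" in d)  # 1 False
```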

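Multi-threading Usage

In the examples below, num_worker_threads, do_work(), and source() stand for your own worker count, processing function, and item source.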

SQLite3-based Queue

from persistqueue import FIFOSQLiteQueue
from threading import Thread

q = FIFOSQLiteQueue(path="./test", multithreading=True)

def worker():
    while True:
        item = q.get()
        do_work(item)

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # Block until all tasks are done

File-based Queue

from persistqueue import Queue
from threading import Thread

q = Queue("mypath")  # a file-based Queue requires a storage path

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # Block until all tasks are done

MySQL Queue

from persistqueue import MySQLQueue
from threading import Thread

q = MySQLQueue(
    host='localhost',
    port=3306,
    user='username',
    password='password',
    database='testdb',
    table_name='my_queue'
)

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # Block until all tasks are done

Serialization Options

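persist-queue supports multiple serialization protocols. The msgpack and CBOR serializers require the extra dependencies (see Installation), and data must be read back with the same serializer that wrote it: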

>>> from persistqueue import Queue
>>> from persistqueue import serializers

# Pickle (default)
>>> q = Queue('mypath', serializer=serializers.pickle)

# MessagePack
>>> q = Queue('mypath', serializer=serializers.msgpack)

# CBOR
>>> q = Queue('mypath', serializer=serializers.cbor2)

# JSON
>>> q = Queue('mypath', serializer=serializers.json)
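Choosing a serializer affects which values survive a round trip. The sketch below uses the standard-library json and pickle modules directly, not persist-queue's serializer wrappers, to show the difference; expecting the same behaviour from the corresponding persist-queue serializers is reasonable but not confirmed here.

```python
import json
import pickle

item = {"id": 7, "tags": ("a", "b")}

# pickle preserves Python types such as tuples.
via_pickle = pickle.loads(pickle.dumps(item))

# JSON has no tuple type, so the tuple comes back as a list.
via_json = json.loads(json.dumps(item))

print(via_pickle["tags"], via_json["tags"])  # ('a', 'b') ['a', 'b']

# Sets are not JSON-serializable at all.
try:
    json.dumps({"ids": {1, 2}})
    json_accepts_sets = True
except TypeError:
    json_accepts_sets = False
print("json accepts sets:", json_accepts_sets)  # json accepts sets: False
```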

Performance

Benchmark Results (1000 items)

Windows (Windows 10, SATA3 SSD, 16GB RAM)

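+---------------+--------+-------------------------+----------------------------+
| Queue Type    | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
+===============+========+=========================+============================+
| SQLite3 Queue | 1.8880 | 2.0290                  | 3.5940                     |
+---------------+--------+-------------------------+----------------------------+
| File Queue    | 4.9520 | 5.0560                  | 8.4900                     |
+---------------+--------+-------------------------+----------------------------+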

Benchmarking

You can benchmark all queue types (including the async ones) with the built-in tool:

Run with tox:

tox -e bench -- rst

Or run directly:

python benchmark/run_benchmark.py 1000 rst

  • The first argument is the number of items to test (default: 1000)

  • The second argument is the output format: rst (for reStructuredText table), console, or json

Example output (rst):

+--------------------+--------------------+--------------------+--------------------+
| Queue Type         | Write              | Write/Read(1 task_done) | Write/Read(many task_done) |
+--------------------+--------------------+--------------------+--------------------+
| File Queue         | 0.0481             | 0.0299             | 0.0833             |
| AsyncSQLiteQueue   | 0.2664             | 0.5353             | 0.5508             |
| AsyncFileQueue     | 0.1333             | 0.1500             | 0.2337             |
+--------------------+--------------------+--------------------+--------------------+

This makes it easy to compare the performance of sync and async queues on your platform.

Performance Tips

  • WAL Mode: SQLite3 queues use WAL mode by default for better performance

  • auto_commit=False: Use for batch operations, call task_done() to persist

  • Protocol Selection: Automatically selects optimal pickle protocol

  • Windows: File queue performance improved 3-4x since v0.4.1

  • MySQL Connection Pooling: MySQL queues use connection pooling for better performance
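The WAL and batching tips come down to standard SQLite behaviour. As a sketch using only the standard-library sqlite3 module (not persist-queue's own code), the following enables WAL journaling on a file-backed database and then inserts a batch of rows inside a single transaction, which is roughly what auto_commit=False followed by one task_done() is meant to give you.

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    conn = sqlite3.connect(os.path.join(tmp, "data.db"))
    # WAL requires a file-backed database; the pragma returns the new mode.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    conn.execute("CREATE TABLE queue (data TEXT)")

    # Batched writes: many inserts, a single commit at the end.
    conn.executemany(
        "INSERT INTO queue (data) VALUES (?)",
        ((f"item{i}",) for i in range(1000)),
    )
    conn.commit()

    count = conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0]
    conn.close()  # close before the directory is removed (needed on Windows)

print(mode, count)  # wal 1000
```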

Testing

Run tests using tox:

# Run tests for specific Python version
tox -e py312

# Run code style checks
tox -e pep8

# Generate coverage report
tox -e cover

Development

Install development dependencies:

pip install -r test-requirements.txt
pip install -r extra-requirements.txt

Run benchmarks:

python benchmark/run_benchmark.py 1000

Release Notes

For detailed information about recent changes and updates, see:

Known Issues

  • Windows File Queue: Atomic operations are experimental. Critical data may become unreadable during task_done() failures

  • MySQL Tests: Require local MySQL service, otherwise skipped automatically

  • Async Features: Require Python 3.7+ and asyncio support
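The Windows caveat concerns how files are replaced on disk. A common general technique, shown here as a sketch and not as persist-queue's implementation, is to write to a temporary file in the same directory, flush it to disk, and rename it over the target with os.replace(). The Python documentation states that a successful os.replace() is atomic as a POSIX requirement, so behaviour on other platforms deserves extra care. The helper name atomic_write is hypothetical.

```python
import os
import tempfile


def atomic_write(path, data):
    """Write bytes to path so readers see either the old or the new file."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must be on the same filesystem for the rename to work.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # atomic rename on POSIX
    except BaseException:
        os.unlink(tmp_path)  # do not leave a half-written temp file behind
        raise


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "info")
    atomic_write(target, b"v1")
    atomic_write(target, b"v2")
    with open(target, "rb") as f:
        content = f.read()
    remaining = sorted(os.listdir(tmp))

print(content)  # b'v2'
```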

Troubleshooting

Database Locked Error

If you get sqlite3.OperationalError: database is locked:

  • Increase the timeout parameter when creating the queue

  • Ensure you’re using multithreading=True for multi-threaded access
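Both suggestions map onto standard-library sqlite3 options. The sketch below uses sqlite3 directly rather than persist-queue; the assumption, not confirmed in this document, is that the queue's timeout and multithreading=True parameters correspond to sqlite3.connect()'s timeout and check_same_thread=False combined with an internal lock.

```python
import os
import sqlite3
import tempfile
import threading

with tempfile.TemporaryDirectory() as tmp:
    # timeout: seconds to wait for a lock before raising "database is locked".
    # check_same_thread=False lets other threads use this connection, so
    # access is serialized with our own lock.
    conn = sqlite3.connect(
        os.path.join(tmp, "data.db"), timeout=10.0, check_same_thread=False
    )
    conn.execute("CREATE TABLE queue (data TEXT)")
    write_lock = threading.Lock()

    def producer(worker_id):
        for i in range(10):
            with write_lock, conn:  # one writer at a time; commit per insert
                conn.execute(
                    "INSERT INTO queue (data) VALUES (?)", (f"{worker_id}-{i}",)
                )

    threads = [threading.Thread(target=producer, args=(n,)) for n in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    total = conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0]
    conn.close()

print(total)  # 40
```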

MySQL Connection Issues

If you get MySQL connection errors:

  • Verify MySQL server is running and accessible

  • Check connection parameters (host, port, user, password)

  • Ensure the database exists and user has proper permissions

  • For connection pool issues, try increasing the max_connections parameter

Thread Safety Issues

  • Make sure to set multithreading=True when initializing SQLite queues

  • SQLite3 queues are thoroughly tested in multi-threading environments

  • MySQL queues are thread-safe by default

Import Errors

  • For async features: Install with pip install "persist-queue[async]"

  • For MySQL support: Install with pip install "persist-queue[extra]"

Community

Contributing

  1. Fork the repository

  2. Create a feature branch

  3. Make your changes

  4. Add tests to cover your changes

  5. Submit a pull request with a clear title and description

License

BSD License

Contributors

View Contributors



Download files


Source Distribution

persist_queue-1.1.0.tar.gz (39.3 kB)

Uploaded Source

Built Distribution


persist_queue-1.1.0-py3-none-any.whl (53.1 kB)

Uploaded Python 3

File details

Details for the file persist_queue-1.1.0.tar.gz.

File metadata

  • Download URL: persist_queue-1.1.0.tar.gz
  • Upload date:
  • Size: 39.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.8

File hashes

Hashes for persist_queue-1.1.0.tar.gz
Algorithm Hash digest
SHA256 4c042a3d39c9af4fa24497da76697fb4fff7abe6af04bcf55f099e0c23bc2c67
MD5 14737ec408a18ad6db4357758a77a977
BLAKE2b-256 2f9e865b2a173a272d2f52b0f50534e13c38c159877251a987e0863fc7376acd
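To check a downloaded file against the SHA256 digests listed on this page, a short standard-library sketch can stream the file through hashlib and compare hex digests; the helper names sha256_of and verify are hypothetical. pip can also enforce hashes itself through its hash-checking mode (--require-hashes).

```python
import hashlib
import os
import tempfile


def sha256_of(path, chunk_size=1 << 16):
    """Return the hex SHA256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path, expected_hex):
    """True if the file's SHA256 matches the published hex digest."""
    return sha256_of(path) == expected_hex.lower()


# Real use would be, for example:
# verify("persist_queue-1.1.0.tar.gz", "<SHA256 digest from this page>")
# Self-contained demo with a known digest: SHA256 of b"abc".
with tempfile.TemporaryDirectory() as tmp:
    sample = os.path.join(tmp, "sample.bin")
    with open(sample, "wb") as f:
        f.write(b"abc")
    ok = verify(
        sample, "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
    )
    bad = verify(sample, "0" * 64)

print(ok)  # True
```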


File details

Details for the file persist_queue-1.1.0-py3-none-any.whl.

File metadata

  • Download URL: persist_queue-1.1.0-py3-none-any.whl
  • Upload date:
  • Size: 53.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.8

File hashes

Hashes for persist_queue-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 be431ad90cd57621be5563dcd2424e21d81db3bb8cdfb918ade722369d640763
MD5 ef7890f83e463dee60f02930a44c9fcd
BLAKE2b-256 07887248345679ad5001cdc0211eb83303c62659f86ae7024170c0bc0afd4e4e

