A thread-safe, disk-based persistent queue in Python.
Overview
persist-queue implements file-based and SQLite3-based persistent queues for Python. It provides thread-safe, disk-based queue implementations that survive process crashes and restarts.
By default, persist-queue uses the pickle module for object serialization, so most built-in types (int, dict, list, etc.) can be persisted directly. To store custom objects, refer to Pickling and Unpickling Extension Types (Python 2) and Pickling Class Instances (Python 3).
This project builds on the work of python-pqueue and queuelib.
Key Features
Disk-based: Each queued item is stored on disk to survive crashes
Thread-safe: Supports multi-threaded producers and consumers
Recoverable: Items can be read after process restart
Green-compatible: Works with greenlet or eventlet environments
Multiple serialization: Supports pickle (default), msgpack, cbor, and json
Async support: Provides async versions of all queue types (v1.1.0+)
Supported Queue Types
File-based Queues:
Queue - Basic file-based FIFO queue
AsyncQueue - Async file-based queue (v1.1.0+)
SQLite-based Queues:
SQLiteQueue / FIFOSQLiteQueue - FIFO SQLite queue
FILOSQLiteQueue - FILO SQLite queue
UniqueQ - Unique items only queue
PriorityQueue - Priority-based queue
SQLiteAckQueue - Acknowledgment-based queue
AsyncSQLiteQueue - Async SQLite queue (v1.1.0+)
Other:
PDict - Persistent dictionary
MySQLQueue - MySQL-based queue (requires extra dependencies)
Installation
Basic Installation
pip install persist-queue
With Extra Features
# For msgpack, cbor, and MySQL support
pip install "persist-queue[extra]"
# For async support (requires Python 3.7+)
pip install "persist-queue[async]"
# For all features
pip install "persist-queue[extra,async]"
From Source
git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
python setup.py install
Requirements
Python 3.5 or newer (Python 2 support dropped in v1.0.0)
Full support for Linux, macOS, and Windows
For async features: Python 3.7+ with aiofiles and aiosqlite
For MySQL queues: DBUtils and PyMySQL
Quick Start
Basic File Queue
from persistqueue import Queue
# Create a queue
q = Queue("my_queue_path")
# Add items
q.put("item1")
q.put("item2")
# Get items
item = q.get()
print(item) # "item1"
# Mark as done
q.task_done()
SQLite Queue
import persistqueue
# Create SQLite queue
q = persistqueue.SQLiteQueue('my_queue.db', auto_commit=True)
# Add items
q.put('data1')
q.put('data2')
# Get items
item = q.get()
print(item) # "data1"
MySQL Queue
import persistqueue
# Create MySQL queue
q = persistqueue.MySQLQueue(
    host='localhost',
    port=3306,
    user='username',
    password='password',
    database='testdb',
    table_name='my_queue'
)
# Add items
q.put('data1')
q.put('data2')
# Get items
item = q.get()
print(item) # "data1"
# Mark as done
q.task_done()
Async Queue (v1.1.0+)
import asyncio
from persistqueue import AsyncQueue
async def main():
    async with AsyncQueue("/path/to/queue") as queue:
        await queue.put("async item")
        item = await queue.get()
        await queue.task_done()

asyncio.run(main())
Examples
File-based Queue
>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()
SQLite3-based Queue
>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q
Priority Queue
>>> import persistqueue
>>> q = persistqueue.PriorityQueue('mypath')
>>> q.put('low', priority=10)
>>> q.put('high', priority=1)
>>> q.put('mid', priority=5)
>>> q.get()
'high'
>>> q.get()
'mid'
>>> q.get()
'low'
Unique Queue
>>> import persistqueue
>>> q = persistqueue.UniqueQ('mypath')
>>> q.put('str1')
>>> q.put('str1') # Duplicate ignored
>>> q.size
1
>>> q.put('str2')
>>> q.size
2
Acknowledgment Queue
>>> import persistqueue
>>> ackq = persistqueue.SQLiteAckQueue('path')
>>> ackq.put('str1')
>>> item = ackq.get()
>>> # Process the item
>>> ackq.ack(item) # Mark as completed
>>> # Or if processing failed:
>>> ackq.nack(item) # Mark for retry
>>> ackq.ack_failed(item) # Mark as failed
MySQL Queue
>>> import persistqueue
>>> q = persistqueue.MySQLQueue(
... host='localhost',
... port=3306,
... user='testuser',
... password='testpass',
... database='testdb',
... table_name='test_queue'
... )
>>> q.put('item1')
>>> q.put('item2')
>>> q.put('item3')
>>> q.get()
'item1'
>>> q.task_done()
>>> q.get()
'item2'
>>> q.task_done()
>>> q.size
1
Async Queue (v1.1.0+)
import asyncio
from persistqueue import AsyncQueue, AsyncSQLiteQueue
async def example():
    # File-based async queue
    async with AsyncQueue("/path/to/queue") as queue:
        await queue.put("data item")
        item = await queue.get()
        await queue.task_done()

    # SQLite-based async queue
    async with AsyncSQLiteQueue("/path/to/queue.db") as queue:
        item_id = await queue.put({"key": "value"})
        item = await queue.get()
        await queue.update({"key": "new_value"}, item_id)
        await queue.task_done()

asyncio.run(example())
Persistent Dictionary
>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key1'] = 123
>>> q['key2'] = 321
>>> q['key1']
123
>>> len(q)
2
>>> del q['key1']
>>> q['key1']
KeyError: 'Key: key1 not exists.'
Multi-threading Usage
SQLite3-based Queue
from persistqueue import FIFOSQLiteQueue
from threading import Thread
q = FIFOSQLiteQueue(path="./test", multithreading=True)
def worker():
    while True:
        item = q.get()
        do_work(item)

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # Block until all tasks are done
File-based Queue
from persistqueue import Queue
from threading import Thread
q = Queue("mypath")

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # Block until all tasks are done
MySQL Queue
from persistqueue import MySQLQueue
from threading import Thread
q = MySQLQueue(
    host='localhost',
    port=3306,
    user='username',
    password='password',
    database='testdb',
    table_name='my_queue'
)

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # Block until all tasks are done
Serialization Options
persist-queue supports multiple serialization protocols:
>>> from persistqueue import Queue
>>> from persistqueue import serializers
# Pickle (default)
>>> q = Queue('mypath', serializer=serializers.pickle)
# MessagePack
>>> q = Queue('mypath', serializer=serializers.msgpack)
# CBOR
>>> q = Queue('mypath', serializer=serializers.cbor2)
# JSON
>>> q = Queue('mypath', serializer=serializers.json)
Performance
Benchmark Results (1000 items)
Windows (Windows 10, SATA3 SSD, 16GB RAM)
+---------------+--------+-------------------------+----------------------------+
| Queue Type    | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
+---------------+--------+-------------------------+----------------------------+
| SQLite3 Queue | 1.8880 | 2.0290                  | 3.5940                     |
+---------------+--------+-------------------------+----------------------------+
| File Queue    | 4.9520 | 5.0560                  | 8.4900                     |
+---------------+--------+-------------------------+----------------------------+
Benchmarking
You can easily benchmark the performance of all queue types (including async) using the built-in tool:
Run with tox:
tox -e bench -- rst
Or run directly:
python benchmark/run_benchmark.py 1000 rst
The first argument is the number of items to test (default: 1000)
The second argument is the output format: rst (for reStructuredText table), console, or json
Example output (rst):
+------------------+--------+-------------------------+----------------------------+
| Queue Type       | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
+------------------+--------+-------------------------+----------------------------+
| File Queue       | 0.0481 | 0.0299                  | 0.0833                     |
| AsyncSQLiteQueue | 0.2664 | 0.5353                  | 0.5508                     |
| AsyncFileQueue   | 0.1333 | 0.1500                  | 0.2337                     |
+------------------+--------+-------------------------+----------------------------+
This makes it easy to compare the performance of sync and async queues on your platform.
Performance Tips
WAL Mode: SQLite3 queues use WAL mode by default for better performance
auto_commit=False: Use for batch operations, call task_done() to persist
Protocol Selection: Automatically selects optimal pickle protocol
Windows: File queue performance improved 3-4x since v0.4.1
MySQL Connection Pooling: MySQL queues use connection pooling for better performance
Testing
Run tests using tox:
# Run tests for specific Python version
tox -e py312
# Run code style checks
tox -e pep8
# Generate coverage report
tox -e cover
Development
Install development dependencies:
pip install -r test-requirements.txt
pip install -r extra-requirements.txt
Run benchmarks:
python benchmark/run_benchmark.py 1000
Release Notes
For detailed information about recent changes and updates, see:
Release Notes for v1.1 - Major update with async queue enhancements and pytest migration
Known Issues
Windows File Queue: Atomic operations are experimental; a failure during task_done() may leave queue data unreadable
MySQL Tests: Require local MySQL service, otherwise skipped automatically
Async Features: Require Python 3.7+ and asyncio support
Troubleshooting
Database Locked Error
If you get sqlite3.OperationalError: database is locked:
Increase the timeout parameter when creating the queue
Ensure you’re using multithreading=True for multi-threaded access
MySQL Connection Issues
If you get MySQL connection errors:
Verify MySQL server is running and accessible
Check connection parameters (host, port, user, password)
Ensure the database exists and user has proper permissions
For connection pool issues, try increasing max_connections parameter
Thread Safety Issues
Make sure to set multithreading=True when initializing SQLite queues
SQLite3 queues are thoroughly tested in multi-threading environments
MySQL queues are thread-safe by default
Import Errors
For async features: Install with pip install "persist-queue[async]"
For MySQL support: Install with pip install "persist-queue[extra]"
Community
Slack: Join persist-queue channel
GitHub: Repository
PyPI: Package
Contributing
Fork the repository
Create a feature branch
Make your changes
Add tests to cover your changes
Submit a pull request with a clear title and description
License
Contributors