Skip to main content

A SQLite-based persistent queue implementation.

Project description

sqqueue

sqqueue is a simple, persistent queue implementation backed by SQLite. It is designed for Python applications that require lightweight, reliable queuing without the overhead of setting up a dedicated message broker.

Features

  • Persistent Storage: Ensures messages are not lost even if the application restarts.
  • Custom Table Names: Allows defining custom table names for flexibility.
  • Thread-Safe: Safe to use in multi-threaded environments.
  • Logging: Provides detailed logs for monitoring and debugging.
  • Requeueing: Ability to requeue messages if processing fails.
  • Peek Functionality: View the next message without altering its state.

Build Package

pip install setuptools wheel
python setup.py sdist bdist_wheel

Upload Package

pip install twine
twine upload dist/*

Install Package

pip install sqqueue

Usage

from sqqueue.queue import SQLiteQueue

# Initialize the queue
queue = SQLiteQueue(db_path='my_queue.db', table_name='custom_queue')

# Enqueue a message
message_id = queue.enqueue('Hello, World!')
print(f"Enqueued message ID: {message_id}")

# Dequeue a message
dequeued = queue.dequeue()
if dequeued:
    message_id, payload = dequeued
    print(f"Dequeued message ID {message_id}: {payload}")
    
    # Process the message
    # ...

    # Complete the message
    queue.complete(message_id)
else:
    print("Queue is empty.")

# Requeue a message (if processing failed)
queue.requeue(message_id)

# Check if the queue is empty
if queue.is_empty():
    print("No pending messages.")
else:
    print("There are pending messages.")

SQLiteQueue Library

SQLiteQueue is a lightweight, reliable message queue implemented using SQLite. It provides essential queue operations such as enqueue, dequeue, complete, fail, and support for dead-letter queues (DLQ). This README provides an overview of how to use the library, illustrated with UML and process diagrams, along with a list of potential fixes and improvements.

Table of Contents

Overview

SQLiteQueue leverages SQLite's robust database features to manage message queues efficiently. It ensures message persistence, handles concurrent access, and provides mechanisms to manage message processing states. Key features include:

  • Enqueue: Add messages to the queue.
  • Dequeue: Retrieve and mark messages as processing.
  • Complete: Mark messages as processed and remove them from the queue.
  • Fail: Handle failed message processing, with support for retry attempts and moving to DLQ.
  • Dead-Letter Queue (DLQ): Manage messages that failed processing after maximum attempts.
  • Batch Operations: Enqueue and dequeue messages in batches for efficiency.
  • Maintenance: Automatic database vacuuming and checkpointing to maintain performance.

Architecture Diagrams

Class UML Diagram

The following UML class diagram outlines the primary classes and their relationships within the SQLiteQueue library.

classDiagram
    class SQLiteQueue {
        +enqueue(payload: str) Result<int, EnqueueError>
        +enqueue_batch(payloads: list) Result<list, EnqueueError>
        +dequeue() Result<tuple, DequeueError>
        +dequeue_batch(batch_size: int) Result<list, DequeueError>
        +complete(message_id: int) Result<None, CompletionError>
        +fail_message(message_id: int, send_to_dlq: bool) Result<None, RequeueError>
        +peek() Result<tuple, QueueError>
        +is_empty() Result<bool, QueueError>
        +recover_processing_messages() Result<int, QueueError>
        +get_dead_letters() Result<list, QueueError>
        +reprocess_dead_letter(message_id: int) Result<None, RequeueError>
        +vacuum_database(conn: sqlite3.Connection) Result<None, QueueError>
        +perform_checkpoint(conn: sqlite3.Connection) Result<None, QueueError>
        +get_operation_count() int
    }

    class Result {
        <<generic>>
        +is_ok() bool
        +is_err() bool
        +value: T
        +error: E
    }

    class Exceptions {
        QueueError
        EnqueueError
        DequeueError
        CompletionError
        RequeueError
    }

    SQLiteQueue --> Result
    SQLiteQueue --> Exceptions

Message Lifecycle State Diagram

This state diagram illustrates the various states a message can traverse within the queue.

stateDiagram
    [*] --> Pending
    Pending --> Processing: Dequeue
    Processing --> Completed: Complete
    Processing --> Pending: Recover (Visibility Timeout)
    Processing --> DLQ: Fail (Max Attempts)
    Pending --> DLQ: Fail (Immediate)
    DLQ --> Pending: Reprocess
    DLQ --> [*]
    Completed --> [*]

Process Flow Diagram

The process flow diagram below demonstrates the typical operations performed on the queue, from enqueueing to handling failures and maintenance.

flowchart TD
    A[Start] --> B[Enqueue Message]
    B --> C{Is Queue Empty?}
    C -->|Yes| D[Wait for Dequeue]
    C -->|No| E[Dequeue Message]
    E --> F[Process Message]
    F --> G{Processing Successful?}
    G -->|Yes| H[Complete Message]
    G -->|No| I[Fail Message]
    I --> J{Max Attempts Reached?}
    J -->|Yes| K[Move to DLQ]
    J -->|No| E
    H --> C
    K --> L[Dead-Letter Queue]
    L --> M[Reprocess DLQ Message]
    M --> B
    D --> E
    A --> N[Maintenance]
    N --> O[Vacuum Database]
    N --> P[Perform Checkpoint]
    O --> Q[End Maintenance]
    P --> Q

Usage Guide

Installation

pip install sqqueue

Basic Operations

from sqqueue import SQLiteQueue

# Initialize the queue
queue = SQLiteQueue(
    db_path='path_to_db.sqlite',
    table_name='message_queue',
    vacuum_threshold=100,
    vacuum_on_start=True,
    default_max_attempts=5
)

# Enqueue a message
result = queue.enqueue("Sample Task")
if result.is_ok():
    message_id = result.value
    print(f"Enqueued message with ID: {message_id}")
else:
    print(f"Enqueue failed: {result.error}")

# Dequeue a message
dequeue_result = queue.dequeue()
if dequeue_result.is_ok():
    message = dequeue_result.value
    msg_id, payload = message
    print(f"Dequeued message ID: {msg_id}, Payload: {payload}")
else:
    print(f"Dequeue failed: {dequeue_result.error}")

# Complete a message
complete_result = queue.complete(msg_id)
if complete_result.is_ok():
    print(f"Message {msg_id} completed successfully.")
else:
    print(f"Completion failed: {complete_result.error}")

# Fail a message
fail_result = queue.fail_message(msg_id, send_to_dlq=False)
if fail_result.is_ok():
    print(f"Message {msg_id} failed and requeued.")
else:
    print(f"Failing message failed: {fail_result.error}")

# Retrieve dead letters
dlq_result = queue.get_dead_letters()
if dlq_result.is_ok():
    dead_letters = dlq_result.value
    print(f"Dead-letter messages: {dead_letters}")
else:
    print(f"Failed to retrieve dead letters: {dlq_result.error}")

Batch Operations

# Enqueue multiple messages
payloads = ["Task 1", "Task 2", "Task 3"]
enqueue_batch_result = queue.enqueue_batch(payloads)
if enqueue_batch_result.is_ok():
    message_ids = enqueue_batch_result.value
    print(f"Enqueued batch with IDs: {message_ids}")
else:
    print(f"Batch enqueue failed: {enqueue_batch_result.error}")

# Dequeue multiple messages
dequeue_batch_result = queue.dequeue_batch(batch_size=2)
if dequeue_batch_result.is_ok():
    messages = dequeue_batch_result.value
    for msg_id, payload in messages:
        print(f"Dequeued message ID: {msg_id}, Payload: {payload}")
else:
    print(f"Batch dequeue failed: {dequeue_batch_result.error}")

Maintenance Operations

SQLiteQueue automatically performs maintenance operations like vacuuming and checkpointing based on the configured thresholds. However, you can also trigger them manually:

# Vacuum the database
vacuum_result = queue.vacuum_database()
if vacuum_result.is_ok():
    print("Database vacuumed successfully.")
else:
    print(f"Vacuum failed: {vacuum_result.error}")

# Perform a WAL checkpoint
checkpoint_result = queue.perform_checkpoint()
if checkpoint_result.is_ok():
    print("WAL checkpoint performed successfully.")
else:
    print(f"Checkpoint failed: {checkpoint_result.error}")

Possible Fixes and Improvements

Based on the provided test suite and understanding of the SQLiteQueue library, here are several potential fixes and enhancements to consider:

  1. Enhanced Error Handling

    • Issue: Currently, errors are captured using Result types, but more granular error messages and logging could improve debuggability.
    • Fix: Implement detailed logging for each operation, including stack traces for exceptions.
  2. Concurrency Control

    • Issue: While SQLite handles some concurrency, high-throughput scenarios might lead to database locks or contention.
    • Fix: Introduce connection pooling or implement retry mechanisms with exponential backoff for handling database locks.
  3. Visibility Timeout Configuration

    • Issue: The visibility timeout (default_visibility_timeout) is crucial for message recovery but may need dynamic adjustment based on processing times.
    • Fix: Allow dynamic configuration of visibility timeouts per message or based on runtime metrics.
  4. DLQ Management Enhancements

    • Issue: Currently, reprocessing DLQ messages increments the message ID, which may lead to inconsistencies.
    • Fix: Ensure that reprocessed messages retain their original IDs or implement a proper ID mapping system.
  5. Batch Operation Optimization

    • Issue: Batch enqueue and dequeue operations may not handle partial failures gracefully.
    • Fix: Implement transactional batch operations where either all messages are processed successfully or the entire batch is rolled back.
  6. Monitoring and Metrics

    • Issue: Lack of built-in monitoring makes it challenging to track queue performance and health.
    • Fix: Integrate with monitoring tools or expose metrics (e.g., operation counts, queue lengths) via APIs or logs.
  7. Configuration Flexibility

    • Issue: Hardcoded configuration parameters limit adaptability.
    • Fix: Provide more flexible configuration options, possibly through configuration files or environment variables.
  8. Testing Enhancements

    • Issue: The current test suite covers essential operations but may miss edge cases.
    • Fix: Expand the test suite to include scenarios like concurrent enqueue/dequeue, database corruption handling, and performance benchmarks.
  9. Documentation Improvements

    • Issue: While the README covers basic usage, more detailed documentation on advanced features and customization would be beneficial.
    • Fix: Add comprehensive documentation with examples, API references, and best practices.
  10. Support for Multiple Queues

    • Issue: The library currently supports a single queue per database instance.
    • Fix: Extend support to handle multiple queues within the same database, each with its own configuration.
  11. Graceful Shutdown and Cleanup

    • Issue: Ensuring that all in-process messages are handled correctly during shutdown is critical.
    • Fix: Implement mechanisms to gracefully shut down the queue, ensuring no messages are left in an inconsistent state.
  12. Security Enhancements

    • Issue: Security aspects like database encryption and access control are not addressed.
    • Fix: Integrate SQLite encryption extensions and provide authentication mechanisms to secure the queue data.
  13. Performance Optimization

    • Issue: As the queue scales, performance may degrade due to database limitations.
    • Fix: Optimize database indices, query performance, and consider sharding or partitioning for large-scale deployments.
  14. User-Friendly API

    • Issue: The current API might be low-level for some users.
    • Fix: Provide higher-level abstractions or helper functions to simplify common operations.
  15. Integration with Async Frameworks

    • Issue: The library operates synchronously, which may not fit well with asynchronous applications.
    • Fix: Develop an asynchronous API version using asyncio or similar frameworks to support non-blocking operations.

Implementing these fixes and enhancements will improve the robustness, scalability, and usability of the SQLiteQueue library, making it more suitable for a wide range of applications.

Contributing

Contributions are welcome! Please follow these steps to contribute:

  1. Fork the Repository
  2. Create a Feature Branch: git checkout -b feature/YourFeature
  3. Commit Your Changes: git commit -m "Add some feature"
  4. Push to the Branch: git push origin feature/YourFeature
  5. Open a Pull Request

Please ensure that your code follows the project's coding standards and that all tests pass.

License

This project is licensed under the MIT License.

Additional Notes

To include the diagrams in your README, ensure that your documentation platform supports Mermaid diagrams. GitHub, for instance, supports Mermaid in Markdown files as of recent updates. If your platform does not support Mermaid, consider rendering the diagrams using PlantUML or another tool and embedding them as images.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sqqueue-0.1.0.tar.gz (23.2 kB view details)

Uploaded Source

Built Distribution

sqqueue-0.1.0-py3-none-any.whl (19.5 kB view details)

Uploaded Python 3

File details

Details for the file sqqueue-0.1.0.tar.gz.

File metadata

  • Download URL: sqqueue-0.1.0.tar.gz
  • Upload date:
  • Size: 23.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.10

File hashes

Hashes for sqqueue-0.1.0.tar.gz
Algorithm Hash digest
SHA256 5bef06279a3767eb207b82a5b5140c5079c04a54cb0295a78a9dfa1cec336649
MD5 7224695f9d3f6910e5225dfb165f4c1a
BLAKE2b-256 ffedf6fa2f305400c745fa1aaebe8436da8add9e3af8e9ba68d42942c64e453b

See more details on using hashes here.

File details

Details for the file sqqueue-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: sqqueue-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 19.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.10

File hashes

Hashes for sqqueue-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a7d412519f6627c73c8124e4936cbb7d68a2c01d575a2a536e7c46e796305ebd
MD5 56b883839403b284c189f42faba90382
BLAKE2b-256 47f759e56248222cd062095cc83638f821e99132aaf2157c3708d55df23a58e1

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page