Pluggable, compressed in-memory queues for both sync and asyncio applications.

Quez

Quez is a high-performance, memory-efficient library providing pluggable, compressed queues and deques for buffering data in both synchronous and asynchronous Python applications.

This library excels at managing large volumes of in-memory data, making it perfect for streaming data pipelines, logging systems, or high-throughput servers. It transparently compresses objects as they enter the data structure and decompresses them upon retrieval, slashing the memory footprint of in-flight data while maintaining a simple, familiar interface.

Key Features

  • Flexible Data Structures: Provides both FIFO (Queue) and Deque (double-ended queue) implementations to support a variety of access patterns.
  • Dual Sync and Async Interfaces: Offers thread-safe quez.CompressedQueue and quez.CompressedDeque for multi-threaded applications, alongside quez.AsyncCompressedQueue and quez.AsyncCompressedDeque for asyncio.
  • Pluggable Compression Strategies: Includes built-in support for zlib (default), bz2, and lzma, with optional zstd and lzo. The flexible architecture lets you plug in custom compression, serialization, or encryption algorithms.
  • Real-Time Observability: Track performance with the .stats property, which reports item count, raw and compressed data sizes, and live compression ratio.
  • Optimized for Performance: In the asyncio versions, CPU-intensive compression and decompression tasks run in a background thread pool, keeping the event loop responsive.
  • Memory Efficiency: Handles large, temporary data bursts without excessive memory usage, preventing swapping and performance degradation.
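The compress-on-put, decompress-on-get idea behind these features can be sketched with the standard library alone. This is a toy illustration, not quez's implementation: the class name `TinyCompressedQueue` and its internals are hypothetical, and pickle plus zlib are assumed here only because zlib is named as the default compressor.

```python
import pickle
import queue
import threading
import zlib


class TinyCompressedQueue:
    """Toy sketch only: stores zlib-compressed pickles in a queue.Queue."""

    def __init__(self):
        self._q = queue.Queue()        # thread-safe FIFO of compressed blobs
        self._lock = threading.Lock()  # guards the size counters
        self._raw = 0
        self._compressed = 0

    def put(self, obj):
        raw = pickle.dumps(obj)        # serialize first
        blob = zlib.compress(raw)      # then compress the bytes
        with self._lock:
            self._raw += len(raw)
            self._compressed += len(blob)
        self._q.put((len(raw), blob))

    def get(self):
        raw_len, blob = self._q.get()
        with self._lock:
            self._raw -= raw_len
            self._compressed -= len(blob)
        return pickle.loads(zlib.decompress(blob))

    @property
    def stats(self):
        with self._lock:
            return {
                "count": self._q.qsize(),
                "raw_size_bytes": self._raw,
                "compressed_size_bytes": self._compressed,
            }


q = TinyCompressedQueue()
q.put({"payload": "abc" * 1000})
before = q.stats
item = q.get()
after = q.stats
```

The caller only ever sees the original object; the compressed bytes stay inside the container, which is the property the feature list describes.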

Installation

You can install the core library from PyPI:

pip install quez

Optional Compression Backends

ZstdCompressor (Zstandard compression):

  • Python 3.14+: Zstandard compression is included in the Python standard library via compression.zstd. No extra installation needed!

  • Python 3.10-3.13: Install the zstandard library:

    pip install quez[zstd]
    

LzoCompressor (LZO compression):

Requires the python-lzo library (available on all supported Python versions):

pip install quez[lzo]

Or install with all optional compressors:

pip install quez[all]

Available extras:

  • zstd: Enables ZstdCompressor on Python < 3.14 (included in Python 3.14+)
  • lzo: Enables LzoCompressor on all Python versions
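To check which Zstandard backend appears to be installed before choosing an extra, a small probe like the following may help. It is a sketch that only looks for the two module names mentioned above (`compression.zstd` and `zstandard`); the helper name `zstd_backend` is hypothetical, and this is not necessarily how quez itself selects a backend.

```python
import importlib.util


def zstd_backend():
    """Return the name of the first Zstandard module found, or None."""
    for name in ("compression.zstd", "zstandard"):
        try:
            if importlib.util.find_spec(name) is not None:
                return name
        except ModuleNotFoundError:
            # Raised when the parent package (e.g. "compression") is absent.
            continue
    return None


backend = zstd_backend()
print(backend or "no Zstandard backend found; try: pip install quez[zstd]")
```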

Quick Start

Here's a quick example of using CompressedQueue to compress and store a 1,000-character string built by repeating a random 100-character chunk ten times:

>>> import random
>>> import string
>>> from quez import CompressedQueue
>>> data = ''.join(random.choices(string.ascii_letters + string.digits, k=100)) * 10
>>> len(data)
1000
>>> q = CompressedQueue()  # Initialize the Queue with default ZlibCompressor
>>> q.put(data)
>>> q.stats
{'count': 1, 'raw_size_bytes': 1018, 'compressed_size_bytes': 131, 'compression_ratio_pct': 87.13163064833006}
>>> data == q.get()
True
>>> q.stats
{'count': 0, 'raw_size_bytes': 0, 'compressed_size_bytes': 0, 'compression_ratio_pct': None}
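The numbers in the first stats output can be reproduced by hand. The sketch below assumes that `compression_ratio_pct` is the percentage of space saved, which is inferred from the figures shown rather than stated in the text. It also shows that the extra 18 bytes in `raw_size_bytes` match what pickle adds to a 1,000-character ASCII string; the text does not say which serializer is the default, so treat that as an observation only.

```python
import pickle

raw_size_bytes = 1018
compressed_size_bytes = 131

# Space saved as a percentage of the raw size.
saved_pct = (1 - compressed_size_bytes / raw_size_bytes) * 100
print(round(saved_pct, 2))  # 87.13

# Pickle framing overhead for a 1,000-character ASCII string.
overhead = len(pickle.dumps("x" * 1000)) - 1000
print(overhead)  # 18
```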

Usage

Synchronous Queue

Use CompressedQueue in standard multi-threaded Python applications.

from quez import CompressedQueue
from quez.compressors import LzmaCompressor

# Use a different compressor for higher compression
q = CompressedQueue(compressor=LzmaCompressor())

# The API is the same as the standard queue.Queue
q.put({"data": "some important data"})
item = q.get()
q.task_done()
q.join()

Asynchronous Queue

Use AsyncCompressedQueue in asyncio applications. The API mirrors asyncio.Queue.

import asyncio
from quez import AsyncCompressedQueue
from quez.compressors import ZstdCompressor

async def main():
    # Using the high-speed Zstd compressor
    # Note: Python 3.14+ has built-in support. For older versions, run:
    # pip install quez[zstd]
    q = AsyncCompressedQueue(compressor=ZstdCompressor())

    await q.put({"request_id": "abc-123", "payload": "..."})
    item = await q.get()
    q.task_done()
    await q.join()
    print(item)

asyncio.run(main())

Synchronous & Asynchronous Deque

For use cases requiring efficient appends and pops from both ends (LIFO and FIFO), use CompressedDeque and AsyncCompressedDeque. Their interfaces are similar to collections.deque.

Synchronous Deque (CompressedDeque)

from quez import CompressedDeque

# Deques support adding/removing from both ends
d = CompressedDeque(maxsize=5)

d.append("item-at-right")     # Add to the right
d.appendleft("item-at-left")  # Add to the left

# Items are still compressed
print(d.stats)

# Retrieve from both ends
print(d.popleft()) # "item-at-left"
print(d.pop())     # "item-at-right"
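For comparison, the same sequence on the standard library's `collections.deque` yields the same order. Note that `collections.deque` takes `maxlen`, whereas the example above passes `maxsize`; how CompressedDeque behaves when full is not described here, so this sketch makes no claim about it.

```python
from collections import deque

d = deque(maxlen=5)

d.append("item-at-right")     # Add to the right
d.appendleft("item-at-left")  # Add to the left

left = d.popleft()   # Leftmost item comes out first
right = d.pop()      # Then the rightmost item
print(left, right)
```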

Asynchronous Deque (AsyncCompressedDeque)

import asyncio
from quez import AsyncCompressedDeque

async def main():
    d = AsyncCompressedDeque(maxsize=5)

    await d.append("item-at-right")
    await d.appendleft("item-at-left")

    print(d.stats)

    print(await d.popleft()) # "item-at-left"
    print(await d.pop())     # "item-at-right"

asyncio.run(main())

Extensibility

You can easily provide your own custom serializers or compressors. Any object that conforms to the Serializer or Compressor protocol can be used.

Example: Custom JSON Serializer

import json
from quez import CompressedQueue

class JsonSerializer:
    def dumps(self, obj):
        # Serialize to JSON and encode to bytes
        return json.dumps(obj).encode('utf-8')

    def loads(self, data):
        # Decode from bytes and parse JSON
        return json.loads(data.decode('utf-8'))

# Now, use it with the queue
json_queue = CompressedQueue(serializer=JsonSerializer())

json_queue.put({"message": "hello world"})
data = json_queue.get()
print(data) # {'message': 'hello world'}
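A custom compressor could follow the same pattern. The sketch below assumes the Compressor protocol expects `compress(bytes) -> bytes` and `decompress(bytes) -> bytes` methods; those method names are an assumption, since the text only shows the Serializer methods, so check the protocol definition before relying on it. The class name `GzipCompressor` is hypothetical.

```python
import gzip


class GzipCompressor:
    # Assumed protocol: compress() and decompress(), both bytes -> bytes.
    def compress(self, data: bytes) -> bytes:
        return gzip.compress(data, compresslevel=6)

    def decompress(self, data: bytes) -> bytes:
        return gzip.decompress(data)


compressor = GzipCompressor()
payload = b"hello world " * 100
packed = compressor.compress(payload)
restored = compressor.decompress(packed)
print(len(payload), len(packed))
```

If the assumption holds, an instance would be passed the same way as the built-in compressors, via the `compressor=` argument shown in the Synchronous Queue example.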

A Note on Performance & Overhead

Compression Overhead: Keep in mind that compression algorithms have overhead. For very small or highly random data payloads (e.g., under 100-200 bytes), the compressed output might occasionally be slightly larger than the original. The memory-saving benefits of quez are most significant when dealing with larger objects or data with repeating patterns.
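The effect is easy to observe with zlib directly. This sketch compares a small random payload with a larger repetitive one; it uses only the standard library and a fixed seed so the run is repeatable, and it says nothing about the other compressors.

```python
import random
import zlib

# 64 pseudo-random bytes: nothing to exploit, so headers dominate.
small_random = random.Random(42).randbytes(64)
small_out = zlib.compress(small_random)

# 3,000 bytes of a repeating pattern: compresses very well.
large_repetitive = b"abc" * 1000
large_out = zlib.compress(large_repetitive)

print(len(small_random), "->", len(small_out))
print(len(large_repetitive), "->", len(large_out))
```

The first payload grows slightly after compression, while the second shrinks to a small fraction of its original size.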
