Pluggable, compressed in-memory queues for both sync and asyncio applications.

Project description

Quez

Quez is a high-performance, memory-efficient library providing pluggable, compressed queues and deques for buffering data in both synchronous and asynchronous Python applications.

This library excels at managing large volumes of in-memory data, making it perfect for streaming data pipelines, logging systems, or high-throughput servers. It transparently compresses objects as they enter the data structure and decompresses them upon retrieval, slashing the memory footprint of in-flight data while maintaining a simple, familiar interface.
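The core idea can be sketched in a few lines: serialize on the way in, compress, and reverse both steps on the way out. The toy below is purely illustrative, not quez's actual implementation (which adds thread safety, stats tracking, and pluggable backends):

```python
import pickle
import zlib
from collections import deque

class TinyCompressedQueue:
    """Illustrative sketch: compress on put, decompress on get."""

    def __init__(self):
        self._buf = deque()

    def put(self, obj):
        # Serialize the object, then compress the resulting bytes
        self._buf.append(zlib.compress(pickle.dumps(obj)))

    def get(self):
        # Reverse both steps on retrieval
        return pickle.loads(zlib.decompress(self._buf.popleft()))

q = TinyCompressedQueue()
q.put({"payload": "x" * 10_000})
item = q.get()
```

Only the compressed bytes live in the buffer, which is where the memory savings come from.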

Key Features

  • Flexible Data Structures: Provides both FIFO (Queue) and Deque (double-ended queue) implementations to support a variety of access patterns.
  • Dual Sync and Async Interfaces: Offers thread-safe quez.CompressedQueue and quez.CompressedDeque for multi-threaded applications, alongside quez.AsyncCompressedQueue and quez.AsyncCompressedDeque for asyncio.
  • Pluggable Compression Strategies: Includes built-in support for zlib (default), bz2, and lzma, with optional zstd and lzo. The flexible architecture lets you plug in custom compression, serialization, or encryption algorithms.
  • Real-Time Observability: Track performance with the .stats property, which reports item count, raw and compressed data sizes, and live compression ratio.
  • Optimized for Performance: In the asyncio versions, CPU-intensive compression and decompression tasks run in a background thread pool, keeping the event loop responsive.
  • Memory Efficiency: Handles large, temporary data bursts without excessive memory usage, preventing swapping and performance degradation.

Installation

You can install the core library from PyPI:

pip install quez

Optional Compression Backends

ZstdCompressor (Zstandard compression):

  • Python 3.14+: Zstandard compression is included in the Python standard library via compression.zstd. No extra installation needed!

  • Python 3.10-3.13: Install the zstandard library:

    pip install quez[zstd]
    

LzoCompressor (LZO compression):

Requires the python-lzo library (available on all supported Python versions):

pip install quez[lzo]

Or install with all optional compressors:

pip install quez[all]

Available extras:

  • zstd: Enables ZstdCompressor on Python < 3.14 (included in Python 3.14+)
  • lzo: Enables LzoCompressor on all Python versions
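If you're unsure which Zstandard backend your environment would use, a quick stdlib-only check (illustrative, mirroring the version rule above) tells you:

```python
# Detect the available Zstandard backend:
# Python 3.14+ ships compression.zstd in the standard library;
# older versions need the zstandard package from quez[zstd].
try:
    import compression.zstd  # noqa: F401  (stdlib on Python 3.14+)
    backend = "stdlib"
except ImportError:
    try:
        import zstandard  # noqa: F401  (pip install quez[zstd])
        backend = "zstandard"
    except ImportError:
        backend = None  # ZstdCompressor unavailable
```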

Quick Start

Here's a quick example of using CompressedQueue to compress and store a repetitive string (a 100-character random block repeated 10 times):

>>> import random
>>> import string
>>> from quez import CompressedQueue
>>> data = ''.join(random.choices(string.ascii_letters + string.digits, k=100)) * 10
>>> len(data)
1000
>>> q = CompressedQueue()  # Initialize the Queue with default ZlibCompressor
>>> q.put(data)
>>> q.stats
{'count': 1, 'raw_size_bytes': 1018, 'compressed_size_bytes': 131, 'compression_ratio_pct': 87.13163064833006}
>>> data == q.get()
True
>>> q.stats
{'count': 0, 'raw_size_bytes': 0, 'compressed_size_bytes': 0, 'compression_ratio_pct': None}
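The compression_ratio_pct reported above is the percentage of space saved, which you can verify from the reported sizes (note that raw_size_bytes reflects the serialized payload, hence slightly larger than the 1,000-character string):

```python
raw, compressed = 1018, 131           # sizes reported by q.stats above
ratio = (1 - compressed / raw) * 100  # percent of space saved
# ratio is approximately 87.13, matching the stats output above
```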

Usage

Synchronous Queue

Use CompressedQueue in standard multi-threaded Python applications.

from quez import CompressedQueue
from quez.compressors import LzmaCompressor

# Use a different compressor for a higher compression ratio
q = CompressedQueue(compressor=LzmaCompressor())

# The API is the same as the standard queue.Queue
q.put({"data": "some important data"})
item = q.get()
q.task_done()
q.join()
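Because the API matches queue.Queue, the standard worker-thread pattern with task_done() and join() carries over unchanged. Here it is with the stdlib queue.Queue for reference; swapping in CompressedQueue should work the same way:

```python
import queue
import threading

results = []

def worker(q):
    while True:
        item = q.get()
        if item is None:          # sentinel: stop the worker
            q.task_done()
            break
        results.append(item * 2)  # stand-in for real processing
        q.task_done()

q = queue.Queue()                 # or CompressedQueue()
t = threading.Thread(target=worker, args=(q,))
t.start()

for i in range(3):
    q.put(i)
q.put(None)   # signal shutdown
q.join()      # blocks until every item is marked done
t.join()
```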

Asynchronous Queue

Use AsyncCompressedQueue in asyncio applications. The API mirrors asyncio.Queue.

import asyncio
from quez import AsyncCompressedQueue
from quez.compressors import ZstdCompressor

async def main():
    # Using the high-speed Zstd compressor
    # Note: Python 3.14+ has built-in support. For older versions, run:
    # pip install quez[zstd]
    q = AsyncCompressedQueue(compressor=ZstdCompressor())

    await q.put({"request_id": "abc-123", "payload": "..."})
    item = await q.get()
    q.task_done()
    await q.join()
    print(item)

asyncio.run(main())

Synchronous & Asynchronous Deque

For use cases requiring efficient appends and pops from both ends (LIFO and FIFO), use CompressedDeque and AsyncCompressedDeque. Their interfaces are similar to collections.deque.

Synchronous Deque (CompressedDeque)

from quez import CompressedDeque

# Deques support adding/removing from both ends
d = CompressedDeque(maxsize=5)

d.append("item-at-right")     # Add to the right
d.appendleft("item-at-left")  # Add to the left

# Items are still compressed
print(d.stats)

# Retrieve from both ends
print(d.popleft()) # "item-at-left"
print(d.pop())     # "item-at-right"

Asynchronous Deque (AsyncCompressedDeque)

import asyncio
from quez import AsyncCompressedDeque

async def main():
    d = AsyncCompressedDeque(maxsize=5)

    await d.append("item-at-right")
    await d.appendleft("item-at-left")

    print(d.stats)

    print(await d.popleft()) # "item-at-left"
    print(await d.pop())     # "item-at-right"

asyncio.run(main())

Extensibility

You can easily provide your own custom serializers or compressors. Any object that conforms to the Serializer or Compressor protocol can be used.

Example: Custom JSON Serializer

import json
from quez import CompressedQueue

class JsonSerializer:
    def dumps(self, obj):
        # Serialize to JSON and encode to bytes
        return json.dumps(obj).encode('utf-8')

    def loads(self, data):
        # Decode from bytes and parse JSON
        return json.loads(data.decode('utf-8'))

# Now, use it with the queue
json_queue = CompressedQueue(serializer=JsonSerializer())

json_queue.put({"message": "hello world"})
data = json_queue.get()
print(data) # {'message': 'hello world'}
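The same extensibility applies on the compression side. Below is a hypothetical custom compressor built on the stdlib bz2 module; the method names compress/decompress are an assumption about the Compressor protocol, so check quez.compressors for the exact interface:

```python
import bz2

class Bz2Compressor:
    # Hypothetical custom compressor. The compress/decompress method
    # names are assumed; verify against quez's Compressor protocol.
    def compress(self, data: bytes) -> bytes:
        return bz2.compress(data)

    def decompress(self, data: bytes) -> bytes:
        return bz2.decompress(data)

c = Bz2Compressor()
payload = b"hello " * 1_000
assert c.decompress(c.compress(payload)) == payload

# Then, mirroring the serializer example:
# bz2_queue = CompressedQueue(compressor=Bz2Compressor())
```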

A Note on Performance & Overhead

Compression Overhead: Keep in mind that compression algorithms have overhead. For very small or highly random data payloads (e.g., under 100-200 bytes), the compressed output might occasionally be slightly larger than the original. The memory-saving benefits of quez are most significant when dealing with larger objects or data with repeating patterns.
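You can observe both effects directly with the stdlib zlib (illustrative; quez's default ZlibCompressor should behave similarly):

```python
import os
import zlib

small_random = os.urandom(64)                   # tiny, incompressible payload
big_repetitive = b"event: heartbeat\n" * 1_000  # large, highly repetitive

small_out = zlib.compress(small_random)
big_out = zlib.compress(big_repetitive)

# Overhead dominates the tiny random payload: output is *larger* than input
print(len(small_out) > len(small_random))   # True
# Repetitive data compresses dramatically
print(len(big_out) < len(big_repetitive))   # True
```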

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

quez-1.1.2.tar.gz (13.4 kB)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

quez-1.1.2-py3-none-any.whl (13.8 kB)

Uploaded Python 3

File details

Details for the file quez-1.1.2.tar.gz.

File metadata

  • Download URL: quez-1.1.2.tar.gz
  • Upload date:
  • Size: 13.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.14.3 Linux/6.19.8-200.fc43.x86_64

File hashes

Hashes for quez-1.1.2.tar.gz
Algorithm Hash digest
SHA256 054acc732630058febdef77cfefc02105dfa5858e8ab09a10f2a70371bf2a6f0
MD5 66ffdb75fb8cdeeccf5f4c0055717288
BLAKE2b-256 e405d0dd3f07664d06f239379e8053bbe8fe43f420a70c47d64a25fdbbe43887

See more details on using hashes here.

File details

Details for the file quez-1.1.2-py3-none-any.whl.

File metadata

  • Download URL: quez-1.1.2-py3-none-any.whl
  • Upload date:
  • Size: 13.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.14.3 Linux/6.19.8-200.fc43.x86_64

File hashes

Hashes for quez-1.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 7856bc46a71b84dc672b38a3e15181a840cca88d68abe888d3e72af0a0657e75
MD5 1c26973870cf21e1780efbaa5bf8f0ad
BLAKE2b-256 26e3950f6b7a895f55c0024b06643b1c791478cfd0c809eac1aa61d87c0f21a8

See more details on using hashes here.
