
Pluggable, compressed in-memory queues for both sync and asyncio applications.


Quez

Quez is a high-performance, memory-efficient library providing pluggable, compressed queues and deques for buffering data in both synchronous and asynchronous Python applications.

This library excels at managing large volumes of in-memory data, making it perfect for streaming data pipelines, logging systems, or high-throughput servers. It transparently compresses objects as they enter the data structure and decompresses them upon retrieval, slashing the memory footprint of in-flight data while maintaining a simple, familiar interface.

Key Features

  • Flexible Data Structures: Provides both FIFO (Queue) and Deque (double-ended queue) implementations to support a variety of access patterns.
  • Dual Sync and Async Interfaces: Offers thread-safe quez.CompressedQueue and quez.CompressedDeque for multi-threaded applications, alongside quez.AsyncCompressedQueue and quez.AsyncCompressedDeque for asyncio.
  • Pluggable Compression Strategies: Includes built-in support for zlib (default), bz2, and lzma, with optional zstd and lzo. The flexible architecture lets you plug in custom compression, serialization, or encryption algorithms.
  • Real-Time Observability: Track performance with the .stats property, which reports item count, raw and compressed data sizes, and live compression ratio.
  • Optimized for Performance: In the asyncio versions, CPU-intensive compression and decompression tasks run in a background thread pool, keeping the event loop responsive.
  • Memory Efficiency: Handles large, temporary data bursts without excessive memory usage, preventing swapping and performance degradation.

Installation

You can install the core library from PyPI:

pip install quez

To enable optional, high-performance compression backends, you can install them as extras. For example, to install with zstd support:

pip install quez[zstd]

Or install with all optional compressors:

pip install quez[all]

Available extras:

  • zstd: Enables the ZstdCompressor.
  • lzo: Enables the LzoCompressor.

Quick Start

Here's a quick example of using CompressedQueue to compress and store a repetitive string (100 random characters repeated 10 times, so it compresses well):

>>> import random
>>> import string
>>> from quez import CompressedQueue
>>> data = ''.join(random.choices(string.ascii_letters + string.digits, k=100)) * 10
>>> len(data)
1000
>>> q = CompressedQueue()  # Initialize the Queue with default ZlibCompressor
>>> q.put(data)
>>> q.stats
{'count': 1, 'raw_size_bytes': 1018, 'compressed_size_bytes': 131, 'compression_ratio_pct': 87.13163064833006}
>>> data == q.get()
True
>>> q.stats
{'count': 0, 'raw_size_bytes': 0, 'compressed_size_bytes': 0, 'compression_ratio_pct': None}
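The figures above are self-consistent: the ratio appears to be the percentage of space saved relative to the raw serialized size (1018 bytes, slightly more than the 1000-character string because it reflects the serialized form of the object). A quick sanity check, assuming that formula:

```python
# Recompute the compression ratio from the raw and compressed sizes
# reported by q.stats above (assumed formula: percent of space saved).
raw_size, compressed_size = 1018, 131
ratio_pct = (1 - compressed_size / raw_size) * 100
print(round(ratio_pct, 2))  # 87.13
```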

Usage

Synchronous Queue

Use CompressedQueue in standard multi-threaded Python applications.

from quez import CompressedQueue
from quez.compressors import LzmaCompressor

# Use a different compressor for higher compression
q = CompressedQueue(compressor=LzmaCompressor())

# The API is the same as the standard queue.Queue
q.put({"data": "some important data"})
item = q.get()
q.task_done()
q.join()

Asynchronous Queue

Use AsyncCompressedQueue in asyncio applications. The API mirrors asyncio.Queue.

import asyncio
from quez import AsyncCompressedQueue
from quez.compressors import ZstdCompressor # Requires `pip install quez[zstd]`

async def main():
    # Using the high-speed Zstd compressor
    q = AsyncCompressedQueue(compressor=ZstdCompressor())

    await q.put({"request_id": "abc-123", "payload": "..."})
    item = await q.get()
    q.task_done()
    await q.join()
    print(item)

asyncio.run(main())

Synchronous & Asynchronous Deque

For use cases requiring efficient appends and pops from both ends (LIFO and FIFO), use CompressedDeque and AsyncCompressedDeque. Their interfaces are similar to collections.deque.

Synchronous Deque (CompressedDeque)

from quez import CompressedDeque

# Deques support adding/removing from both ends
d = CompressedDeque(maxsize=5)

d.append("item-at-right")     # Add to the right
d.appendleft("item-at-left")  # Add to the left

# Items are still compressed
print(d.stats)

# Retrieve from both ends
print(d.popleft()) # "item-at-left"
print(d.pop())     # "item-at-right"

Asynchronous Deque (AsyncCompressedDeque)

import asyncio
from quez import AsyncCompressedDeque

async def main():
    d = AsyncCompressedDeque(maxsize=5)

    await d.append("item-at-right")
    await d.appendleft("item-at-left")

    print(d.stats)

    print(await d.popleft()) # "item-at-left"
    print(await d.pop())     # "item-at-right"

asyncio.run(main())

Extensibility

You can easily provide your own custom serializers or compressors. Any object that conforms to the Serializer or Compressor protocol can be used.

Example: Custom JSON Serializer

import json
from quez import CompressedQueue

class JsonSerializer:
    def dumps(self, obj):
        # Serialize to JSON and encode to bytes
        return json.dumps(obj).encode('utf-8')

    def loads(self, data):
        # Decode from bytes and parse JSON
        return json.loads(data.decode('utf-8'))

# Now, use it with the queue
json_queue = CompressedQueue(serializer=JsonSerializer())

json_queue.put({"message": "hello world"})
data = json_queue.get()
print(data) # {'message': 'hello world'}
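A custom compressor can be plugged in the same way. The sketch below assumes the Compressor protocol mirrors the built-in compressors, with compress and decompress methods operating on bytes (check quez.compressors for the exact protocol); the class name and compression level are illustrative:

```python
import zlib

class MaxZlibCompressor:
    """Hypothetical custom compressor: zlib pinned to its maximum level."""

    def compress(self, data: bytes) -> bytes:
        # Trade extra CPU time for a smaller compressed payload
        return zlib.compress(data, level=9)

    def decompress(self, data: bytes) -> bytes:
        return zlib.decompress(data)

# Round-trip check, independent of the queue
payload = b"hello world" * 100
c = MaxZlibCompressor()
assert c.decompress(c.compress(payload)) == payload
```

If the protocol matches, it can then be passed just like the serializer above: `CompressedQueue(compressor=MaxZlibCompressor())`.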

A Note on Performance & Overhead

Compression Overhead: Compression algorithms carry a fixed per-item cost (headers, checksums) and cannot shrink random data. For very small payloads (e.g., under 100-200 bytes) or highly random ones, the compressed output may be slightly larger than the original. The memory savings of quez are most significant for larger objects or data with repeating patterns.
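This overhead is easy to observe with zlib (the default backend) directly: a small random payload grows slightly after compression, while a larger repetitive one shrinks dramatically:

```python
import os
import zlib

# Incompressible: 64 random bytes pick up zlib's fixed header/checksum
# overhead, so the "compressed" form is larger than the original.
small_random = os.urandom(64)
print(len(zlib.compress(small_random)) > len(small_random))  # True

# Repetitive: ~10 KB of identical lines shrinks by well over 10x.
repetitive = b"log line: request handled OK\n" * 350
print(len(zlib.compress(repetitive)) < len(repetitive) // 10)  # True
```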
