faststream-compressors

A middleware for the FastStream framework to support message compression.

⚠️ Note: RPC Limitation

Due to a bug in FastStream, middleware does not run on the response received from the broker, so replies cannot be decompressed when using RPC. I've reported this issue to the FastStream developers, and we're hoping for a fix soon.

In the meantime, you can register separate routers for RPC and Pub/Sub:

  • RPC Router: Enable only the DecompressionMiddleware.
  • Pub/Sub Router: Enable both DecompressionMiddleware and CompressionMiddleware.

Example

from faststream.nats import NatsBroker

from faststream_compressors.compressors import GzipCompressor, GzipDecompressor
from faststream_compressors.middlewares import CompressionMiddleware
from faststream_compressors.middlewares.nats import NatsDecompressionMiddleware


broker = NatsBroker(
    middlewares=(
        # Compression methods used for compressing messages.
        # The order in which compressors are specified matters.
        CompressionMiddleware.make_middleware(compressors=GzipCompressor()),

        # Your other middlewares here

        # Compression methods used for decompressing messages.
        # The order does not matter here.
        NatsDecompressionMiddleware.make_middleware(decompressors=GzipDecompressor()),
    )
)
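
The comments above say that compressor order matters while decompressor order does not. One way this can come about, sketched with the standard library only: compressors are applied in sequence, the applied encodings are recorded, and each decompressor is looked up by encoding name. This is an illustrative model, not the package's actual implementation; `compress_all`, `decompress_all`, and the encoding labels are hypothetical.

```python
import gzip
import zlib

# Hypothetical stand-ins for compressors: (encoding name, function) pairs.
# This list is ordered, because each step consumes the previous step's output.
COMPRESSORS = [("gzip", gzip.compress), ("deflate", zlib.compress)]

# Decompressors are keyed by encoding name, so the order in which they
# are listed has no effect on the result.
DECOMPRESSORS = {"deflate": zlib.decompress, "gzip": gzip.decompress}


def compress_all(data, compressors):
    """Apply every compressor in order and record which encodings were used."""
    applied = []
    for name, compress in compressors:
        data = compress(data)
        applied.append(name)
    return data, applied


def decompress_all(data, applied, decompressors):
    """Undo the recorded encodings in reverse, looking each one up by name."""
    for name in reversed(applied):
        data = decompressors[name](data)
    return data


payload = b"My secret message" * 100
body, encodings = compress_all(payload, COMPRESSORS)
restored = decompress_all(body, encodings, DECOMPRESSORS)

# Swapping the compressor order changes the bytes on the wire.
rev_body, rev_encodings = compress_all(payload, list(reversed(COMPRESSORS)))
```

In this model the recorded encoding list is what makes the payload recoverable regardless of how the decompressors were registered.
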

| Broker | Is Supported? | Middleware |
|--------|---------------|------------|
| NATS   | Yes           | faststream_compressors.middlewares.nats.NatsDecompressionMiddleware |
| Other  | No            |            |

For now, only NATS is supported. You can submit a pull request to add a decompression middleware for your broker. I expect FastStream to update its middleware API soon, which should allow a single universal middleware that works with every broker.

| Compression Method | Is Supported? | Compressor | Extra Dependency |
|--------------------|---------------|------------|------------------|
| gzip  | Yes | faststream_compressors.compressors.GzipCompressor, faststream_compressors.compressors.GzipDecompressor | |
| lz4   | Yes | faststream_compressors.compressors.lz4.Lz4Compressor, faststream_compressors.compressors.lz4.Lz4Decompressor | faststream-compressors[lz4] |
| Other | No  | | |

You can submit a pull request to add support for your compression method, or use a custom algorithm by implementing the BaseCompressor interface:

from faststream import FastStream, Header
from faststream.nats import NatsBroker

from faststream_compressors.compressors import BaseCompressor
from faststream_compressors.middlewares import CompressionMiddleware
from faststream_compressors.middlewares.nats import NatsDecompressionMiddleware


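class MyCompressor(BaseCompressor):
    # Toy algorithm for demonstration: XOR with 1 is its own inverse,
    # so the same class can act as both compressor and decompressor.
    ENCODING = "xor1"

    def __call__(self, data: bytes) -> bytes:
        # Flip the lowest bit of every byte.
        return bytes(byte ^ 1 for byte in data)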


broker = NatsBroker(
    middlewares=(
        CompressionMiddleware.make_middleware(compressors=MyCompressor()),
        NatsDecompressionMiddleware.make_middleware(decompressors=MyCompressor()),
    )
)
app = FastStream(broker)


@broker.subscriber("my-subject")
async def my_handler(data: str, encoding: str = Header("content-encoding")):
    print(data, encoding)


@app.after_startup
async def ping():
    await broker.publish("My secret message", "my-subject")
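
The example above passes the same MyCompressor class as both compressor and decompressor. That works because XOR with a constant is its own inverse. A standalone check of that property, independent of FastStream (the `xor1` helper is a hypothetical name that mirrors the body of `MyCompressor.__call__`):

```python
def xor1(data: bytes) -> bytes:
    """Flip the lowest bit of every byte, as MyCompressor.__call__ does."""
    return bytes(byte ^ 1 for byte in data)


original = b"My secret message"
encoded = xor1(original)   # what would be sent over the wire
decoded = xor1(encoded)    # applying the same transform again restores the input
```

Note that this transform neither shrinks nor protects the payload; it only demonstrates the interface. A real algorithm whose inverse differs from the forward step would need separate compressor and decompressor classes, as the gzip and lz4 pairs do.
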
