Kafka S3 Connector (kaf-s3-connector)

License: MIT

A Python library to seamlessly handle large Kafka messages by offloading them to Amazon S3.

This library provides a custom Kafka Producer and Consumer that automatically handle the process of storing large message payloads in an S3 bucket and passing lightweight references through Kafka.

Key Features

  • Automatic S3 Offloading: Produce messages larger than Kafka's default message size limit (about 1 MB) without manual intervention.
  • Transparent Consumption: Consume large messages as if they were directly in Kafka.
  • Data Integrity: Verifies downloaded S3 objects against their ETags to detect data corruption.
  • Secure by Default: Leverages AWS IAM roles and the default boto3 credential chain, avoiding the need to hardcode secrets.
  • Flexible Configuration: Built on top of confluent-kafka-python, allowing for full customization of Kafka client settings, including SASL and SSL.
  • Operational Ready: DLQ support, Prometheus /metrics, Helm chart, non-root image, optional compression, TTL hints, SSE-KMS.

Installation

From the root of a source checkout:

pip install .

How it Works

  1. The S3Producer receives a large payload.
  2. It uploads the payload to a specified S3 bucket with a unique key.
  3. It produces a small JSON message to a Kafka topic containing the S3 bucket, key, and the object's ETag.
  4. The S3Consumer reads the JSON reference from Kafka.
  5. It downloads the original payload from S3.
  6. It verifies the ETag to ensure the data is not corrupted, then returns the payload to your application.
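
The six steps above follow the claim-check pattern. The sketch below walks through them with an in-memory dictionary standing in for S3, so it runs without AWS access. The `FakeObjectStore` class, the reference field names (`bucket`, `key`, `etag`), and the use of an MD5 hex digest as the ETag are assumptions for illustration. The library's actual wire format may differ, and real S3 ETags are plain MD5 digests only for some upload types (not, for example, for multipart uploads).

```python
import hashlib
import json
import uuid


class FakeObjectStore:
    """In-memory stand-in for S3 (hypothetical, for illustration only)."""

    def __init__(self):
        self._objects = {}

    def put(self, bucket, key, data):
        # Store the bytes and return an ETag-like checksum (MD5 hex here).
        self._objects[(bucket, key)] = data
        return hashlib.md5(data).hexdigest()

    def get(self, bucket, key):
        # Return the stored bytes together with a freshly computed checksum.
        data = self._objects[(bucket, key)]
        return data, hashlib.md5(data).hexdigest()


def produce_reference(store, bucket, payload):
    # Steps 1-3: upload under a unique key, then build the small JSON reference.
    key = str(uuid.uuid4())
    etag = store.put(bucket, key, payload)
    return json.dumps({"bucket": bucket, "key": key, "etag": etag}).encode("utf-8")


def consume_reference(store, message):
    # Steps 4-6: parse the reference, download, and verify before returning.
    ref = json.loads(message)
    data, etag = store.get(ref["bucket"], ref["key"])
    if etag != ref["etag"]:
        raise ValueError("ETag mismatch: object changed or was corrupted")
    return data


store = FakeObjectStore()
message = produce_reference(store, "my-large-messages-bucket", b"x" * 2_000_000)
payload = consume_reference(store, message)
```

Only the short JSON reference would travel through Kafka here; the 2 MB payload stays in the object store until the consumer fetches it.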

Configuration

Configuration is handled through a single dictionary, with separate keys for kafka and s3 settings.

Security

  • AWS Credentials: Do not pass AWS credentials in the configuration. The library relies on boto3's default credential discovery chain. The recommended approach is an IAM role attached to your compute environment (EC2, ECS, Lambda, etc.). Alternatively, you can use environment variables or a shared credentials file (~/.aws/credentials).
  • Kafka Security: All standard confluent-kafka-python security settings are supported and should be passed within the kafka dictionary. This includes SASL for authentication (e.g., PLAIN, SCRAM, and GSSAPI for Kerberos) and SSL/TLS for encryption.
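
As a rough illustration of the Kafka security bullet, the following builds a `kafka` dictionary for SASL/SCRAM over TLS. The property names (`security.protocol`, `sasl.mechanism`, `sasl.username`, `sasl.password`, `ssl.ca.location`) are standard confluent-kafka-python (librdkafka) settings. The helper `build_secure_kafka_config`, the environment variable names `APP_KAFKA_USERNAME` and `APP_KAFKA_PASSWORD`, the broker address, and the CA path are hypothetical placeholders.

```python
import os


def build_secure_kafka_config(env=os.environ):
    """Return a `kafka` settings dict using SASL/SCRAM over TLS.

    Secrets are read from the environment rather than hardcoded.
    """
    return {
        "bootstrap.servers": "broker:9093",  # placeholder address
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": env["APP_KAFKA_USERNAME"],  # hypothetical variable name
        "sasl.password": env["APP_KAFKA_PASSWORD"],  # hypothetical variable name
        "ssl.ca.location": "/etc/ssl/certs/ca-bundle.crt",  # placeholder path
    }


# A plain dict is passed in here so the example runs without real secrets.
example_env = {"APP_KAFKA_USERNAME": "svc-user", "APP_KAFKA_PASSWORD": "change-me"}
secure_config = {
    "kafka": build_secure_kafka_config(example_env),
    "s3": {"bucket": "my-large-messages-bucket"},
}
```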

Example Configuration

producer_config = {
    "kafka": {
        "bootstrap.servers": "localhost:9092",
        # Optional DLQ for delivery failures
        # "dlq_topic": "my-s3-dlq",
    },
    "s3": {
        "bucket": "my-large-messages-bucket",
        # Optional toggles
        # "max_inline_bytes": 900_000,     # inline small payloads on Kafka, offload larger ones
        # "max_payload_bytes": 5 * 1024 * 1024 * 1024,  # hard cap on payload size
        # "prefix": "kafka/topic",         # prefix keys for organization/enforcement
        # "deterministic_keys": False,     # use payload hash for idempotent keys
        # "compression": "gzip",           # compress before S3 upload (gzip or None)
        # "ttl_seconds": 86400,            # hint TTL stored in object metadata
        # "server_side_encryption": "aws:kms", # SSE, optionally with KMS key below
        # "sse_kms_key_id": "<kms-key-id>",
    }
}
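
To make the optional producer toggles more concrete, here is a minimal sketch of how `max_inline_bytes`, `max_payload_bytes`, `prefix`, `deterministic_keys`, and `compression` could interact. The function `plan_upload`, the use of SHA-256 for deterministic keys, and the exact boundary rules (inclusive versus exclusive limits) are assumptions; the library's real behavior may differ.

```python
import gzip
import hashlib
import uuid


def plan_upload(payload, *, max_inline_bytes=900_000,
                max_payload_bytes=5 * 1024 ** 3, prefix="kafka/topic",
                deterministic_keys=False, compression=None):
    """Decide whether to inline or offload, and derive the S3 key and body."""
    if len(payload) > max_payload_bytes:
        raise ValueError("payload exceeds max_payload_bytes")
    if len(payload) <= max_inline_bytes:
        # Small enough to travel through Kafka directly.
        return {"mode": "inline", "body": payload}

    if deterministic_keys:
        # Same payload gives the same key, so a retried upload overwrites itself.
        name = hashlib.sha256(payload).hexdigest()
    else:
        name = str(uuid.uuid4())

    # Compress only when explicitly requested.
    body = gzip.compress(payload) if compression == "gzip" else payload
    return {"mode": "offload", "key": f"{prefix}/{name}", "body": body}


small = plan_upload(b"tiny")
large = plan_upload(b"a" * 1_000_000, deterministic_keys=True, compression="gzip")
```

With `deterministic_keys=True`, calling `plan_upload` twice on the same bytes yields the same key, which is what makes retries idempotent in this sketch.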

consumer_config = {
    "kafka": {
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-s3-consumers",
        "auto.offset.reset": "earliest",
        # Optional: send failures to a DLQ topic
        # "dlq_topic": "my-s3-dlq",
    },
    "s3": {
        "bucket": "my-large-messages-bucket",
        # Optional toggles
        # "max_inline_bytes": 900_000,     # inline small payloads on Kafka, offload larger ones
        # "max_payload_bytes": 5 * 1024 * 1024 * 1024,  # hard cap on payload size
        # "delete_after_consume": False,   # delete object only after a successful integrity check
        # "allow_inline_payloads": True,   # allow non-reference payloads to pass through unchanged
        # "prefix": "kafka/topic",         # enforce prefix on incoming references
        # "compression": "gzip",           # decompress automatically on consume
        # "deterministic_keys": False,     # use payload hash for idempotent keys (producer)
        # "ttl_seconds": 86400,            # hint TTL stored in object metadata (producer)
        # "server_side_encryption": "aws:kms", # SSE, optionally with KMS key below (producer)
        # "sse_kms_key_id": "<kms-key-id>",    # (producer)
    }
}
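
On the consumer side, `allow_inline_payloads` and `prefix` imply a check on each incoming Kafka value. The sketch below shows one way such a check might look. `classify_message` and the reference field names (`bucket`, `key`, `etag`) are hypothetical and not the library's API.

```python
import json


def classify_message(value, *, bucket, prefix=None, allow_inline_payloads=True):
    """Return ("reference", ref_dict) or ("inline", raw_bytes)."""
    try:
        ref = json.loads(value)
    except ValueError:
        # Covers malformed JSON and undecodable bytes.
        ref = None

    is_reference = isinstance(ref, dict) and {"bucket", "key", "etag"} <= ref.keys()
    if not is_reference:
        if allow_inline_payloads:
            return "inline", value  # pass through unchanged
        raise ValueError("value is not an S3 reference")

    # Reject references that point outside the configured bucket or prefix.
    if ref["bucket"] != bucket:
        raise ValueError("unexpected bucket in reference")
    if prefix is not None and not ref["key"].startswith(prefix.rstrip("/") + "/"):
        raise ValueError("reference key is outside the allowed prefix")
    return "reference", ref


good = json.dumps({"bucket": "b", "key": "kafka/topic/abc", "etag": "e"}).encode()
kind, ref = classify_message(good, bucket="b", prefix="kafka/topic")
```

Enforcing the prefix before downloading keeps a malformed or malicious reference from steering the consumer toward arbitrary objects.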

Usage

Producer

from s3_connector import S3Producer

# Initialize producer with the config
producer = S3Producer(config=producer_config)

# Read a sample file
with open("examples/sample_payload.txt", "rb") as f:
    payload_data = f.read()

# Produce the data to a topic
producer.produce(topic="large-messages-topic", payload=payload_data)
print("Produced message to Kafka via S3.")

Consumer

from s3_connector import S3Consumer

# Initialize consumer with the config
consumer = S3Consumer(config=consumer_config)
consumer.subscribe(["large-messages-topic"])

print("Waiting for messages...")
try:
    while True:
        # Poll for a message
        payload = consumer.poll(timeout=10.0)

        if payload:
            print(f"Received message of size: {len(payload)} bytes")
            # Save the received file
            with open("examples/received_payload.txt", "wb") as f:
                f.write(payload)
            break  # Exit after one message for this example
except KeyboardInterrupt:
    pass
finally:
    # Close the consumer even if polling or writing the file fails
    consumer.close()

Testing

Install runtime deps and test tools:

pip install -r requirements.txt pytest pytest-mock

Then run pytest from the project root:

pytest

Docker & Operations

Build locally:

docker build -t kaf-s3-connector:local .

Pull from GHCR:

docker pull ghcr.io/2pk03/kaf-s3-connector:latest
# or a specific tag (git tag or commit SHA)
docker pull ghcr.io/2pk03/kaf-s3-connector:v1.0.0

Run as a consumer (replace the environment variables as needed):

docker run --rm -p 8000:8000 \
  -e MODE=consumer \
  -e TOPIC=large-messages-topic \
  -e KAFKA_BOOTSTRAP_SERVERS=broker:9092 \
  -e KAFKA_GROUP_ID=my-group \
  -e S3_BUCKET=my-large-messages-bucket \
  kaf-s3-connector:local

Run as a producer reading from stdin:

echo "hello" | docker run --rm -i \
  -e MODE=producer \
  -e TOPIC=large-messages-topic \
  -e KAFKA_BOOTSTRAP_SERVERS=broker:9092 \
  -e S3_BUCKET=my-large-messages-bucket \
  kaf-s3-connector:local

The container is configured through environment variables:

  • Kafka: KAFKA_BOOTSTRAP_SERVERS, KAFKA_GROUP_ID (consumer), DLQ_TOPIC, KAFKA_SECURITY_PROTOCOL, KAFKA_SASL_*, KAFKA_SSL_*, etc.
  • S3: S3_BUCKET, S3_PREFIX, S3_DELETE_AFTER_CONSUME, S3_ALLOW_INLINE_PAYLOADS, S3_MAX_INLINE_BYTES, S3_MAX_PAYLOAD_BYTES, S3_DETERMINISTIC_KEYS, S3_COMPRESSION (gzip), S3_TTL_SECONDS, S3_SSE, S3_SSE_KMS_KEY_ID.
  • App: MODE (producer|consumer), TOPIC, POLL_TIMEOUT, METRICS_PORT (default 8000).
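
The environment variables listed above presumably map onto the same `kafka` and `s3` dictionaries used by the Python API. A hedged sketch of such a mapping follows. `config_from_env` and `_as_bool` are hypothetical helpers, only a few variables are handled, and the container's actual entrypoint may parse values differently (for example, which strings count as true).

```python
import os


def _as_bool(text):
    # Assumed truthy spellings; the real image may accept a different set.
    return text.strip().lower() in {"1", "true", "yes"}


def config_from_env(env=os.environ):
    """Build a {"kafka": ..., "s3": ...} dict from container-style env vars."""
    kafka = {"bootstrap.servers": env["KAFKA_BOOTSTRAP_SERVERS"]}
    if env.get("MODE") == "consumer":
        # A group id only makes sense for consumers.
        kafka["group.id"] = env["KAFKA_GROUP_ID"]
    if "DLQ_TOPIC" in env:
        kafka["dlq_topic"] = env["DLQ_TOPIC"]

    s3 = {"bucket": env["S3_BUCKET"]}
    if "S3_PREFIX" in env:
        s3["prefix"] = env["S3_PREFIX"]
    if "S3_MAX_INLINE_BYTES" in env:
        s3["max_inline_bytes"] = int(env["S3_MAX_INLINE_BYTES"])
    if "S3_DELETE_AFTER_CONSUME" in env:
        s3["delete_after_consume"] = _as_bool(env["S3_DELETE_AFTER_CONSUME"])
    return {"kafka": kafka, "s3": s3}


# A plain dict stands in for os.environ so the example is reproducible.
example = config_from_env({
    "MODE": "consumer",
    "KAFKA_BOOTSTRAP_SERVERS": "broker:9092",
    "KAFKA_GROUP_ID": "my-group",
    "S3_BUCKET": "my-large-messages-bucket",
    "S3_MAX_INLINE_BYTES": "900000",
    "S3_DELETE_AFTER_CONSUME": "true",
})
```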

Metrics

  • /metrics exposes Prometheus text format on METRICS_PORT (default 8000).
  • Sample Prometheus scrape config: config/prometheus.yml
  • Sample Grafana dashboard JSON: config/grafana-dashboard.json
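
For a quick check that the endpoint is serving data, the Prometheus text exposition format is simple enough to parse by hand. The sketch below turns sample lines into a dictionary. The helper `parse_metrics` is hypothetical, and the metric names in `sample` are invented for illustration; they are not necessarily what this project exports. Optional trailing timestamps are not handled.

```python
def parse_metrics(text):
    """Parse Prometheus text-format samples into {name_with_labels: float}."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        # Split on the last space so label values containing spaces survive.
        name, _, value = line.rpartition(" ")
        samples[name] = float(value)
    return samples


sample = """\
# HELP example_messages_consumed_total Messages consumed (hypothetical name).
# TYPE example_messages_consumed_total counter
example_messages_consumed_total 42
example_s3_download_bytes_total{bucket="my-large-messages-bucket"} 1048576
"""
metrics = parse_metrics(sample)
```

Against a running container, the text could be fetched with `urllib.request.urlopen("http://localhost:8000/metrics").read().decode()`, assuming the port mapping from the docker run example.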

Helm

  • Chart: charts/kaf-s3-connector
  • Override values (examples):
helm upgrade --install kaf-s3 charts/kaf-s3-connector \
  --set image.repository=ghcr.io/2pk03/kaf-s3-connector \
  --set image.tag=latest \
  --set env.MODE=consumer \
  --set env.TOPIC=large-messages-topic \
  --set env.KAFKA_BOOTSTRAP_SERVERS=broker:9092 \
  --set env.KAFKA_GROUP_ID=my-group \
  --set env.S3_BUCKET=my-large-messages-bucket
