# Kafka S3 Connector (kaf-s3-connector)
A Python library to seamlessly handle large Kafka messages by offloading them to Amazon S3.
This library provides a custom Kafka Producer and Consumer that automatically handle the process of storing large message payloads in an S3 bucket and passing lightweight references through Kafka.
## Key Features
- Automatic S3 Offloading: Produce messages larger than Kafka's recommended limit without manual intervention.
- Transparent Consumption: Consume large messages as if they were directly in Kafka.
- Data Integrity: Verifies the integrity of S3 objects using ETags to prevent data corruption.
- Secure by Default: Leverages AWS IAM roles and the default `boto3` credential chain, avoiding the need to hardcode secrets.
- Flexible Configuration: Built on top of `confluent-kafka-python`, allowing full customization of Kafka client settings, including SASL and SSL.
- Operational Ready: DLQ support, Prometheus `/metrics`, Helm chart, non-root image, optional compression, TTL hints, SSE-KMS.
## Installation
```bash
pip install .
```
## How it Works
1. The `S3Producer` receives a large payload.
2. It uploads the payload to a specified S3 bucket with a unique key.
3. It produces a small JSON message to a Kafka topic containing the S3 bucket, key, and the object's ETag.
4. The `S3Consumer` reads the JSON reference from Kafka.
5. It downloads the original payload from S3.
6. It verifies the ETag to ensure the data is not corrupted, then returns the payload to your application.
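The reference message is small enough to pass through Kafka untouched; conceptually it looks something like the sketch below (the field names are illustrative, the library's actual schema may differ):

```python
import json

# Hypothetical shape of the lightweight reference produced to Kafka
# in place of the original payload.
reference = {
    "bucket": "my-large-messages-bucket",      # S3 bucket holding the payload
    "key": "kafka/topic/3f9c2a7e-unique-key",  # unique object key
    "etag": "<etag-returned-by-s3>",           # used for the integrity check on consume
}
print(json.dumps(reference))
```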
## Configuration
Configuration is handled through a single dictionary with separate `kafka` and `s3` keys.
## Security
- AWS Credentials: This library is designed to be secure. Do not pass AWS credentials in the configuration. It uses `boto3`'s default credential discovery chain. The recommended and most secure way to provide credentials is an IAM role attached to your compute instance (EC2, ECS, Lambda, etc.). Alternatively, you can use environment variables or a shared credentials file (`~/.aws/credentials`).
- Kafka Security: All standard `confluent-kafka-python` security settings are supported and should be passed within the `kafka` dictionary. This includes SASL for authentication (e.g., `PLAIN`, `SCRAM`, and `GSSAPI` for Kerberos) and SSL/TLS for encryption.
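For example, a SASL_SSL setup can be expressed with the standard librdkafka property names inside the `kafka` dictionary (a sketch; adjust the mechanism and credentials to your cluster):

```python
producer_config = {
    "kafka": {
        "bootstrap.servers": "broker:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "sasl.username": "my-user",
        "sasl.password": "<read-from-a-secret-store>",
    },
    "s3": {
        "bucket": "my-large-messages-bucket",
    },
}
```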
### Example Configuration
```python
producer_config = {
    "kafka": {
        "bootstrap.servers": "localhost:9092",
        # Optional DLQ for delivery failures
        # "dlq_topic": "my-s3-dlq",
    },
    "s3": {
        "bucket": "my-large-messages-bucket",
        # Optional toggles
        # "max_inline_bytes": 900_000,  # inline small payloads on Kafka, offload larger ones
        # "max_payload_bytes": 5 * 1024 * 1024 * 1024,  # hard cap on payload size
        # "prefix": "kafka/topic",  # prefix keys for organization/enforcement
        # "deterministic_keys": False,  # use payload hash for idempotent keys
        # "compression": "gzip",  # compress before S3 upload (gzip or None)
        # "ttl_seconds": 86400,  # hint TTL stored in object metadata
        # "server_side_encryption": "aws:kms",  # SSE, optionally with KMS key below
        # "sse_kms_key_id": "<kms-key-id>",
    },
}
```
```python
consumer_config = {
    "kafka": {
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-s3-consumers",
        "auto.offset.reset": "earliest",
        # Optional: send failures to a DLQ topic
        # "dlq_topic": "my-s3-dlq",
    },
    "s3": {
        "bucket": "my-large-messages-bucket",
        # Optional toggles
        # "max_inline_bytes": 900_000,  # inline small payloads on Kafka, offload larger ones
        # "max_payload_bytes": 5 * 1024 * 1024 * 1024,  # hard cap on payload size
        # "delete_after_consume": False,  # delete object only after a successful integrity check
        # "allow_inline_payloads": True,  # allow non-reference payloads to pass through unchanged
        # "prefix": "kafka/topic",  # enforce prefix on incoming references
        # "compression": "gzip",  # decompress automatically on consume
        # "deterministic_keys": False,  # use payload hash for idempotent keys (producer)
        # "ttl_seconds": 86400,  # hint TTL stored in object metadata (producer)
        # "server_side_encryption": "aws:kms",  # SSE, optionally with KMS key below (producer)
        # "sse_kms_key_id": "<kms-key-id>",  # (producer)
    },
}
```
## Usage
### Producer
```python
from s3_connector import S3Producer

# Initialize producer with the config
producer = S3Producer(config=producer_config)

# Read a sample file
with open("examples/sample_payload.txt", "rb") as f:
    payload_data = f.read()

# Produce the data to a topic
producer.produce(topic="large-messages-topic", payload=payload_data)
print("Produced message to Kafka via S3.")
```
### Consumer
```python
from s3_connector import S3Consumer

# Initialize consumer with the config
consumer = S3Consumer(config=consumer_config)
consumer.subscribe(["large-messages-topic"])

print("Waiting for messages...")
while True:
    try:
        # Poll for a message
        payload = consumer.poll(timeout=10.0)
        if payload:
            print(f"Received message of size: {len(payload)} bytes")
            # Save the received file
            with open("examples/received_payload.txt", "wb") as f:
                f.write(payload)
            break  # Exit after one message for this example
    except KeyboardInterrupt:
        break

consumer.close()
```
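For a quick end-to-end sanity check (a sketch, assuming the producer and consumer examples above ran against the same broker and bucket), compare a digest of the payload before producing and after consuming:

```python
import hashlib

with open("examples/sample_payload.txt", "rb") as f:
    sent = f.read()
with open("examples/received_payload.txt", "rb") as f:
    received = f.read()

# The round trip through S3 should be byte-identical.
assert hashlib.sha256(sent).hexdigest() == hashlib.sha256(received).hexdigest()
print("Round-trip payload verified.")
```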
## Testing
Install runtime deps and test tools:
```bash
pip install -r requirements.txt pytest pytest-mock
```
Then run pytest from the project root:
```bash
pytest
```
Docker & Operations
Build locally:
docker build -t kaf-s3-connector:local .
Pull from GHCR:
docker pull ghcr.io/2pk03/kaf-s3-connector:latest
# or a specific tag (git tag or commit SHA)
docker pull ghcr.io/2pk03/kaf-s3-connector:v1.0.0
Run as consumer (replace envs as needed):
docker run --rm -p 8000:8000 \
-e MODE=consumer \
-e TOPIC=large-messages-topic \
-e KAFKA_BOOTSTRAP_SERVERS=broker:9092 \
-e KAFKA_GROUP_ID=my-group \
-e S3_BUCKET=my-large-messages-bucket \
kaf-s3-connector:local
Run as producer reading stdin:
echo "hello" | docker run --rm -i \
-e MODE=producer \
-e TOPIC=large-messages-topic \
-e KAFKA_BOOTSTRAP_SERVERS=broker:9092 \
-e S3_BUCKET=my-large-messages-bucket \
kaf-s3-connector:local
Configuration is driven by env vars:
- Kafka: `KAFKA_BOOTSTRAP_SERVERS`, `KAFKA_GROUP_ID` (consumer), `DLQ_TOPIC`, `KAFKA_SECURITY_PROTOCOL`, `KAFKA_SASL_*`, `KAFKA_SSL_*`, etc.
- S3: `S3_BUCKET`, `S3_PREFIX`, `S3_DELETE_AFTER_CONSUME`, `S3_ALLOW_INLINE_PAYLOADS`, `S3_MAX_INLINE_BYTES`, `S3_MAX_PAYLOAD_BYTES`, `S3_DETERMINISTIC_KEYS`, `S3_COMPRESSION` (gzip), `S3_TTL_SECONDS`, `S3_SSE`, `S3_SSE_KMS_KEY_ID`.
- App: `MODE` (producer|consumer), `TOPIC`, `POLL_TIMEOUT`, `METRICS_PORT` (default 8000).
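These names map directly onto the Python configuration shown earlier; a minimal, illustrative sketch of that mapping (the container entrypoint's actual wiring may differ):

```python
import os

# Build a consumer config from the environment, mirroring the env vars above.
consumer_config = {
    "kafka": {
        "bootstrap.servers": os.environ["KAFKA_BOOTSTRAP_SERVERS"],
        "group.id": os.environ.get("KAFKA_GROUP_ID", "my-group"),
    },
    "s3": {
        "bucket": os.environ["S3_BUCKET"],
        "prefix": os.environ.get("S3_PREFIX", ""),
        "delete_after_consume": os.environ.get("S3_DELETE_AFTER_CONSUME", "false").lower() == "true",
    },
}
```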
## Metrics
- `/metrics` exposes Prometheus text format on `METRICS_PORT` (default 8000).
- Sample Prometheus scrape config: `config/prometheus.yml`
- Sample Grafana dashboard JSON: `config/grafana-dashboard.json`
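To verify the endpoint locally (assuming a container is running with the default port, as in the Docker example above), a plain HTTP GET is enough:

```python
import urllib.request

# Fetch the Prometheus text exposition and show the first few lines.
with urllib.request.urlopen("http://localhost:8000/metrics") as resp:
    print(resp.read().decode("utf-8")[:500])
```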
## Helm
- Chart: `charts/kaf-s3-connector`
- Override values (examples):
```bash
helm upgrade --install kaf-s3 charts/kaf-s3-connector \
  --set image.repository=ghcr.io/2pk03/kaf-s3-connector \
  --set image.tag=latest \
  --set env.MODE=consumer \
  --set env.TOPIC=large-messages-topic \
  --set env.KAFKA_BOOTSTRAP_SERVERS=broker:9092 \
  --set env.KAFKA_GROUP_ID=my-group \
  --set env.S3_BUCKET=my-large-messages-bucket
```
## Project details
### File details: kaf_s3_connector-1.2.5.dev0.tar.gz

File metadata
- Download URL: kaf_s3_connector-1.2.5.dev0.tar.gz
- Upload date:
- Size: 33.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.0 CPython/3.12.12

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `8d303a8d696063e3f3271f3dd3d10e3d1b0bf303cd04c3063ca01349ef348476` |
| MD5 | `8e30c3eb6bd218fd9609be1f124d43c0` |
| BLAKE2b-256 | `03fd41f637632d990d13d06bfb4dffc1ec743a1438efde7e392c748a5109d008` |
### File details: kaf_s3_connector-1.2.5.dev0-py3-none-any.whl

File metadata
- Download URL: kaf_s3_connector-1.2.5.dev0-py3-none-any.whl
- Upload date:
- Size: 21.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.0 CPython/3.12.12

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `9e000d0154000fad5ab7def861015b7ef5577d2b0bd4e96e133c6d754784c57b` |
| MD5 | `cc064392ea7b32f66719cbc459ffc592` |
| BLAKE2b-256 | `2ae05f71d2a04d2c71e5929800a9a189b1510276f574d9187d22dd75646c76c9` |