
Faust serializer that stores large messages on Amazon S3


faust-s3-backed-serializer

A Faust Serializer that reads and writes records from and to S3 transparently.

This serializer is compatible with our Kafka S3-backed SerDe for Java.

Read more about it on our blog.
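Judging from the description and the `max_size` parameter in the usage example, the serializer appears to follow the claim-check pattern: a payload too large to travel comfortably through Kafka is stored in S3, and the topic carries only a reference that the consumer resolves on read. The sketch below illustrates that idea in plain Python, assuming `max_size` acts as a byte threshold. A dict stands in for the bucket, and a one-byte flag marks inline versus offloaded payloads. `offload`, `restore`, and the flag bytes are hypothetical and are not this library's wire format, which has to match the Java SerDe.

```python
import json
import uuid
from typing import Dict

# Hypothetical stand-in for an S3 bucket: object key -> payload bytes.
FakeBucket = Dict[str, bytes]

# Hypothetical one-byte flags; NOT the library's (or the Java SerDe's) format.
INLINE = b"\x00"
POINTER = b"\x01"


def offload(payload: bytes, bucket: FakeBucket, topic: str, max_size: int) -> bytes:
    """Return what would be written to Kafka for this payload."""
    if len(payload) <= max_size:
        # Small enough: keep the payload in the Kafka message itself.
        return INLINE + payload
    # Too large: store it in the bucket and send only a reference.
    key = f"{topic}/{uuid.uuid4()}"
    bucket[key] = payload
    return POINTER + key.encode("utf-8")


def restore(message: bytes, bucket: FakeBucket) -> bytes:
    """Inverse of offload: return the original payload."""
    flag, body = message[:1], message[1:]
    if flag == INLINE:
        return body
    if flag == POINTER:
        return bucket[body.decode("utf-8")]
    raise ValueError(f"unknown flag byte: {flag!r}")


bucket: FakeBucket = {}
small = json.dumps({"first_name": "bar", "last_name": "foo"}).encode("utf-8")
large = json.dumps({"blob": "x" * 10_000}).encode("utf-8")

small_msg = offload(small, bucket, "users_s3", max_size=1024)
large_msg = offload(large, bucket, "users_s3", max_size=1024)

assert restore(small_msg, bucket) == small
assert restore(large_msg, bucket) == large
print(len(bucket))  # 1: only the large payload was offloaded
```

In this sketch, `max_size=0` would offload every non-empty payload. Check the library's own documentation for the exact semantics of its `max_size` parameter.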

Getting Started

PyPI

pip install faust-s3-backed-serializer
Usage

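The serializer is built to be chained with other serializers, using the codec "concatenation" feature that comes with Faust (the `|` operator on codecs). In the usage example, records are first serialized to JSON, and the resulting bytes are then handed to the S3-backed serializer.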
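Faust composes codecs with `|`: when serializing, the left-hand codec runs first, and when deserializing, the steps are undone in reverse order. The toy `MiniCodec` class below is hypothetical (it is not `faust.serializers.codecs.Codec`) and only mimics that ordering. It chains a JSON step with a Base64 step, standing in for the JSON-then-S3 chain in the example. In Faust itself, a custom codec is written by subclassing `faust.serializers.codecs.Codec` and implementing `_dumps` and `_loads`.

```python
import base64
import json
from typing import Any, Callable, Iterable, Tuple

# One step = (serialize function, deserialize function).
Step = Tuple[Callable[[Any], Any], Callable[[Any], Any]]


class MiniCodec:
    """Toy codec chain made of (dumps, loads) steps. Hypothetical, not Faust's class."""

    def __init__(self, steps: Iterable[Step]) -> None:
        self.steps: Tuple[Step, ...] = tuple(steps)

    def __or__(self, other: "MiniCodec") -> "MiniCodec":
        # a | b keeps a's steps first, so a serializes before b.
        return MiniCodec(self.steps + other.steps)

    def dumps(self, obj: Any) -> Any:
        # Serialize left to right.
        for dumps_fn, _ in self.steps:
            obj = dumps_fn(obj)
        return obj

    def loads(self, data: Any) -> Any:
        # Deserialize right to left, undoing the last step first.
        for _, loads_fn in reversed(self.steps):
            data = loads_fn(data)
        return data


json_codec = MiniCodec([(lambda obj: json.dumps(obj).encode("utf-8"), json.loads)])
# Base64 stands in for the S3-backed step of the real example.
b64_codec = MiniCodec([(base64.b64encode, base64.b64decode)])
json_then_b64 = json_codec | b64_codec

wire = json_then_b64.dumps({"first_name": "bar", "last_name": "foo"})
user = json_then_b64.loads(wire)
print(user)  # {'first_name': 'bar', 'last_name': 'foo'}
```

The same ordering is why `json_serializer | s3_backed_serializer` in the example produces JSON first: the S3-backed step only ever sees bytes.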

import faust
from faust import Record
import logging
from faust_s3_backed_serializer import S3BackedSerializer
from faust.serializers import codecs


# model.user
class UserModel(Record, serializer="s3_json"):
    first_name: str
    last_name: str


# Declare the serializers
# You can also leave the fields empty
# in order to use the default AWS credential chain
credentials = {
    's3backed.access.key': 'access_key',
    's3backed.secret.key': 'secret_key'
}

topic_name = "users_s3"
s3_backed_serializer = S3BackedSerializer(output_topic=topic_name,
                                          base_path="s3://your-bucket-name/",
                                          region_name="eu-central-1",
                                          s3_credentials=credentials, 
                                          max_size=0,
                                          is_key=False)
json_serializer = codecs.get_codec("json")

# Here we use json as the first serializer and
# then we can upload everything to the S3 bucket
s3_json_serializer = json_serializer | s3_backed_serializer

# config
logger = logging.getLogger(__name__)
codecs.register("s3_json", s3_json_serializer)
app = faust.App("app_id", broker="kafka://localhost:9092")
users_topic = app.topic(topic_name, value_type=UserModel)


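@app.agent(users_topic)
async def users(stream):
    # The "s3_json" codec on UserModel handles deserialization,
    # so each item in the stream is already a UserModel
    async for user in stream:
        logger.info("Event received in topic")
        logger.info(f"The user is: {user}")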


@app.timer(5.0, on_leader=True)
async def send_users():
    data_user = {"first_name": "bar", "last_name": "foo"}
    user = UserModel(**data_user)
    await users.send(value=user, value_serializer=s3_json_serializer)


if __name__ == "__main__":
    app.main()
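The example hard-codes the access and secret keys in `credentials`. If you want dedicated keys for the serializer without putting them in source code, a small helper can read them from the environment. This is a sketch: `build_s3_credentials` and the variable names `S3BACKED_ACCESS_KEY` and `S3BACKED_SECRET_KEY` are hypothetical. When either key is missing, it leaves both fields empty, relying on the comment in the example that empty fields fall back to the default AWS credential chain (which typically already reads `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`).

```python
import os
from typing import Dict, Mapping, Optional

# Hypothetical environment variable names chosen for this sketch.
ACCESS_KEY_VAR = "S3BACKED_ACCESS_KEY"
SECRET_KEY_VAR = "S3BACKED_SECRET_KEY"


def build_s3_credentials(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build the s3_credentials dict from the environment instead of source code."""
    source = os.environ if env is None else env
    access_key = source.get(ACCESS_KEY_VAR, "")
    secret_key = source.get(SECRET_KEY_VAR, "")
    if not (access_key and secret_key):
        # Leave both fields empty; per the comment in the usage example this
        # should make the serializer use the default AWS credential chain.
        access_key = secret_key = ""
    return {
        "s3backed.access.key": access_key,
        "s3backed.secret.key": secret_key,
    }


credentials = build_s3_credentials()
```

The result is passed as `s3_credentials=credentials` to `S3BackedSerializer`, exactly as in the usage example.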

Contributing

Contributions are welcome. If you find a bug or have a suggestion for improvement, please open an issue. We also accept pull requests; please open an issue first to tell us what you want to change and why.

License

This project is licensed under the MIT license. See the LICENSE file for details.

Download files

Source Distribution

faust-s3-backed-serializer-1.1.0.tar.gz (7.7 kB)

Built Distribution

faust_s3_backed_serializer-1.1.0-py3-none-any.whl

File hashes

Hashes for faust-s3-backed-serializer-1.1.0.tar.gz

Algorithm     Hash digest
SHA256        315c4c05b96be9702fdb2768f931c9ce87ce3197ac6b5a6680c396fec41a6be8
MD5           a4b84d4074a26f3a69ed989706047ef8
BLAKE2b-256   450c003b5716a8d7e247f6b233b7e8ac81d63c88dea2c99b2adfda88a8ee70b4

Hashes for faust_s3_backed_serializer-1.1.0-py3-none-any.whl

Algorithm     Hash digest
SHA256        86d2e0a2981beae8d5f866ae95ed8212171dc21e4033c42d2c06677cf1fdbe4d
MD5           3364ee2038971d2346de20c66dc288fe
BLAKE2b-256   68f1dc425aa802235b7c52a91d3aa459e9628c9f637b4b74b3a0b24cd7a38230
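To check a downloaded source archive against the SHA256 digest listed above for faust-s3-backed-serializer-1.1.0.tar.gz, the standard-library `hashlib` module is enough. The helpers `sha256_of` and `verify` are illustrative names; the file is streamed in chunks so it never has to fit in memory at once.

```python
import hashlib
from pathlib import Path

# Digest copied from the hashes listed for faust-s3-backed-serializer-1.1.0.tar.gz.
EXPECTED_SHA256 = "315c4c05b96be9702fdb2768f931c9ce87ce3197ac6b5a6680c396fec41a6be8"


def sha256_of(path: Path, chunk_size: int = 65536) -> str:
    """Hash the file in fixed-size chunks and return the hex digest."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path: Path, expected: str = EXPECTED_SHA256) -> bool:
    """Return True if the file's SHA256 digest matches the expected one."""
    return sha256_of(path) == expected.lower()
```

Usage: `verify(Path("faust-s3-backed-serializer-1.1.0.tar.gz"))` returns `True` only for an unmodified archive. pip can enforce the same check at install time through its hash-checking mode, by pinning `faust-s3-backed-serializer==1.1.0 --hash=sha256:<digest>` in a requirements file and installing with `--require-hashes`.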
