Faust serializer that serializes large messages on Amazon S3 or Azure Blob Storage

faust-large-message-serializer

A Faust Serializer that reads and writes records from and to S3 or Azure Blob Storage transparently.

This serializer is compatible with our Kafka large-message-serializer SerDe for Java.

Read more about it on our blog.

Getting Started

PyPI

pip install faust-large-message-serializer

Usage

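The serializer was built to be used together with other serializers. The idea is to use the codec "concatenation" feature that comes with Faust: serializers are chained with the | operator, so a record is first encoded by one serializer (JSON in the example) and the result is then handed to the large-message serializer.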

import faust
from faust import Record
import logging
from faust_large_message_serializer import LargeMessageSerializer, LargeMessageSerializerConfig
from faust.serializers import codecs


# model.user
class UserModel(Record, serializer="s3_json"):
    first_name: str
    last_name: str


config = LargeMessageSerializerConfig(base_path="s3://your-bucket-name/",
                                      max_size=0,
                                      large_message_s3_region="eu-central-1",
                                      large_message_s3_access_key="access_key",
                                      large_message_s3_secret_key="secret_key")

topic_name = "users_s3"
s3_backed_serializer = LargeMessageSerializer(topic_name, config, is_key=False)
json_serializer = codecs.get_codec("json")

# Chain the codecs: JSON encodes the record first,
# then the S3-backed serializer uploads the encoded payload to the bucket
s3_json_serializer = json_serializer | s3_backed_serializer

# config
logger = logging.getLogger(__name__)
codecs.register("s3_json", s3_json_serializer)
app = faust.App("app_id", broker="kafka://localhost:9092")
users_topic = app.topic(topic_name, value_type=UserModel)


@app.agent(users_topic)
async def users(users):
    async for user in users:
        logger.info("Event received in topic")
        logger.info(f"The user is : {user}")


@app.timer(5.0, on_leader=True)
async def send_users():
    data_user = {"first_name": "bar", "last_name": "foo"}
    user = UserModel(**data_user)
    await users.send(value=user, value_serializer=s3_json_serializer)


app.main()
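
To make the chaining idea above more concrete, here is a minimal sketch in plain Python that does not use Faust or S3 at all. It only illustrates the general pattern: one codec encodes the record, the next one either keeps the payload inline or stores it externally and passes on a reference. Everything in it (ToyCodec, ChainedCodec, JsonCodec, ClaimCheckCodec, the one-byte flag, and the dict that stands in for a bucket) is hypothetical and made up for illustration; it is not the wire format or the implementation of this library.

```python
import json
import uuid


class ToyCodec:
    """Hypothetical stand-in for a codec with dumps/loads and `|` chaining."""

    def dumps(self, obj):
        raise NotImplementedError

    def loads(self, data):
        raise NotImplementedError

    def __or__(self, other):
        # `a | b` builds a codec that applies a first, then b.
        return ChainedCodec(self, other)


class ChainedCodec(ToyCodec):
    def __init__(self, first, second):
        self.first = first
        self.second = second

    def dumps(self, obj):
        # Serialize left to right.
        return self.second.dumps(self.first.dumps(obj))

    def loads(self, data):
        # Deserialize in the opposite order.
        return self.first.loads(self.second.loads(data))


class JsonCodec(ToyCodec):
    def dumps(self, obj):
        return json.dumps(obj).encode("utf-8")

    def loads(self, data):
        return json.loads(data.decode("utf-8"))


class ClaimCheckCodec(ToyCodec):
    """Keeps small payloads inline; stores larger ones in `store`.

    The flag bytes are an assumption of this sketch, not a real format.
    """

    INLINE = b"\x00"
    REFERENCE = b"\x01"

    def __init__(self, store, max_size):
        self.store = store        # dict standing in for a bucket
        self.max_size = max_size  # payloads above this size are offloaded

    def dumps(self, payload):
        if len(payload) <= self.max_size:
            return self.INLINE + payload
        key = uuid.uuid4().hex
        self.store[key] = payload
        return self.REFERENCE + key.encode("ascii")

    def loads(self, data):
        flag, body = data[:1], data[1:]
        if flag == self.INLINE:
            return body
        return self.store[body.decode("ascii")]


# With max_size=0 every non-empty payload is offloaded to the store.
fake_bucket = {}
toy_serializer = JsonCodec() | ClaimCheckCodec(fake_bucket, max_size=0)
message = toy_serializer.dumps({"first_name": "bar", "last_name": "foo"})
restored = toy_serializer.loads(message)
```

In this sketch the consumer never needs to know whether a payload travelled inline or through the store, which is the sense in which the real serializer reads and writes records "transparently". How the library itself encodes references and interprets max_size should be checked against its own documentation.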

Contributing

We are happy if you want to contribute to this project. If you find any bugs or have suggestions for improvements, please open an issue. We are also happy to accept your PRs. Just open an issue beforehand and let us know what you want to do and why.

License

This project is licensed under the MIT license. Have a look at the LICENSE for more details.
