Apache Beam Python I/O connector for Amazon Firehose

firehose_pyio

Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service, and Amazon OpenSearch Serverless. The Apache Beam Python I/O connector for Amazon Data Firehose (firehose_pyio) provides a data sink that writes PCollection elements to a Firehose delivery stream for delivery to those destinations.

Installation

$ pip install firehose_pyio

Usage

The connector's main composite transform is WriteToFirehose, and it expects a PCollection whose elements are a list or a tuple. If an element is a tuple, the tuple's second element is taken as the batch of records. If your elements are not of an accepted type, apply the GroupIntoBatches or BatchElements transform beforehand. Each element is then sent to a Firehose delivery stream using the put_record_batch method of the boto3 package. Note that the batch transforms above are also useful for staying within the API limits listed below.

  • Each PutRecordBatch request supports up to 500 records. Each record in the request can be as large as 1,000 KB (before base64 encoding), up to a limit of 4 MB for the entire request. These limits cannot be changed.
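Because each request is capped by both a record count and a total size, batches must respect whichever limit is hit first. As a purely illustrative sketch (not the connector's internals), a chunker that honors both caps could look like:

```python
def batch_records(records, max_count=500, max_bytes=4 * 1024 * 1024):
    """Yield batches that respect the PutRecordBatch limits (illustrative only)."""
    batch, batch_bytes = [], 0
    for record in records:
        size = len(record)
        # Start a new batch when adding this record would exceed either cap.
        if batch and (len(batch) >= max_count or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += size
    if batch:
        yield batch

# 1200 records of 1 KB each split on the 500-record cap: 500 + 500 + 200.
batches = list(batch_records([b"x" * 1024] * 1200))
```

In a pipeline you would normally rely on GroupIntoBatches or BatchElements for this; the function above only makes the arithmetic behind the limits concrete.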

The transform also has options that control how individual records are formatted and how failed records are handled.

  • jsonify - A flag indicating whether to convert each record into a JSON string. A record should be bytes, a bytearray, or a file-like object; if it is of an unsupported type (e.g. an integer), set this flag to True to convert it into a JSON string.
  • multiline - A flag indicating whether to append a newline character (\n) to each record, which is useful when saving records into a CSV or JSON Lines file.
  • max_trials - The maximum number of attempts when one or more records fail; it defaults to 3. Records that still fail after all attempts are returned, allowing users to decide how to handle them.
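Conceptually, the jsonify and multiline options transform each record roughly as follows. This is a hedged sketch of the documented behavior, not the connector's actual code:

```python
import json

def format_record(record, jsonify=False, multiline=False):
    # Sketch of the record-level options described above (illustrative only).
    if jsonify:
        record = json.dumps(record)  # e.g. 1 -> "1", {"id": 1} -> '{"id": 1}'
    if multiline:
        record = record + "\n"  # one record per line, handy for JSON Lines files
    return record

formatted = format_record({"id": 1}, jsonify=True, multiline=True)
```

With both flags set, a dict record becomes a JSON string terminated by a newline, so concatenated records form a valid JSON Lines file.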

Example

If a PCollection element is a key-value pair (i.e. a keyed stream), it can be batched per key using the GroupIntoBatches transform before it is passed to the main transform.

import apache_beam as beam
from apache_beam import GroupIntoBatches
from apache_beam.options.pipeline_options import PipelineOptions
from firehose_pyio.io import WriteToFirehose

# Placeholder values - replace with your own pipeline options and stream name.
pipeline_options = PipelineOptions()
delivery_stream_name = "your-delivery-stream"

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | beam.Create([(1, "one"), (2, "three"), (1, "two"), (2, "four")])
        | GroupIntoBatches(batch_size=2)
        | WriteToFirehose(
            delivery_stream_name=delivery_stream_name,
            jsonify=True,
            multiline=True,
            max_trials=3
        )
    )

For a list element (i.e. an unkeyed stream), we can apply the BatchElements transform instead.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.util import BatchElements
from firehose_pyio.io import WriteToFirehose

# Placeholder values - replace with your own pipeline options and stream name.
pipeline_options = PipelineOptions()
delivery_stream_name = "your-delivery-stream"

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | beam.Create(["one", "two", "three", "four"])
        | BatchElements(min_batch_size=2, max_batch_size=2)
        | WriteToFirehose(
            delivery_stream_name=delivery_stream_name,
            jsonify=True,
            multiline=True,
            max_trials=3
        )
    )
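The max_trials behavior can be pictured with a plain retry loop: failed records are re-submitted until they succeed or the trial budget is exhausted, and whatever still fails is returned to the caller. The sender below is hypothetical; the connector itself submits records via boto3's put_record_batch:

```python
def put_with_retries(records, send_batch, max_trials=3):
    """Re-submit failed records up to max_trials; return records that still fail."""
    pending = list(records)
    for _ in range(max_trials):
        if not pending:
            break
        pending = send_batch(pending)  # send_batch returns the failed subset
    return pending

# Hypothetical flaky sender: records ending in "!" fail on the first call only.
state = {"calls": 0}
def flaky_send(batch):
    state["calls"] += 1
    if state["calls"] == 1:
        return [r for r in batch if r.endswith("!")]
    return []

failed = put_with_retries(["a", "b!", "c"], flaky_send)
```

Here the transient failure is absorbed by the second attempt, so nothing is returned; with a persistently failing sender, the surviving records would come back for the caller to handle, mirroring the connector's documented behavior.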

See this post for more examples.

Contributing

Interested in contributing? Check out the contributing guidelines. Please note that this project is released with a Code of Conduct. By contributing to this project, you agree to abide by its terms.

License

firehose_pyio was created by Beam PyIO beam.pyio@gmail.com. It is licensed under the terms of the Apache License 2.0.

Credits

firehose_pyio was created with cookiecutter and the pyio-cookiecutter template.
