Apache Beam Python I/O connector for Amazon Firehose

firehose_pyio

Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service and Amazon OpenSearch Serverless. The Apache Beam Python I/O connector for Amazon Data Firehose (firehose_pyio) provides a data sink feature that facilitates integration with those services.

Installation

$ pip install firehose_pyio

Usage

The connector's main composite transform, WriteToFirehose, expects each PCollection element to be a list or a tuple. If the element is a tuple, the tuple's first element is taken. If the elements are not of an accepted type, you can apply the GroupIntoBatches or BatchElements transform beforehand. Each element is then sent to a Firehose delivery stream using the put_record_batch method of the boto3 package. These batch transforms also help keep each request within the API limits listed below.

  • Each PutRecordBatch request supports up to 500 records. Each record in the request can be as large as 1,000 KB (before base64 encoding), up to a limit of 4 MB for the entire request. These limits cannot be changed.
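As a rough illustration of these limits, the following sketch splits a sequence of byte records into batches that respect both the record count and the request size. The helper name chunk_records is hypothetical and not part of firehose_pyio, and reading "1,000 KB" and "4 MB" as 1000 * 1024 and 4 * 1024 * 1024 bytes is an assumption.

```python
from typing import Iterable, Iterator, List

MAX_RECORDS_PER_BATCH = 500        # records per PutRecordBatch request
MAX_RECORD_BYTES = 1000 * 1024     # assumed byte value of "1,000 KB"
MAX_BATCH_BYTES = 4 * 1024 * 1024  # assumed byte value of "4 MB"


def chunk_records(records: Iterable[bytes]) -> Iterator[List[bytes]]:
    """Yield lists of records that each stay within the limits above.

    Hypothetical helper for illustration; not part of firehose_pyio.
    """
    batch: List[bytes] = []
    batch_bytes = 0
    for record in records:
        size = len(record)
        if size > MAX_RECORD_BYTES:
            raise ValueError(f"record of {size} bytes exceeds the per-record limit")
        # Start a new batch when adding this record would break either limit.
        if batch and (
            len(batch) >= MAX_RECORDS_PER_BATCH
            or batch_bytes + size > MAX_BATCH_BYTES
        ):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += size
    if batch:
        yield batch
```

In a pipeline, the same effect is usually achieved by choosing a suitable batch size for GroupIntoBatches or BatchElements rather than by a hand-written helper.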

The transform also has options that control how individual records are formatted and how failed records are handled.

  • jsonify - A flag that indicates whether to convert a record into JSON. A record should be a bytes, bytearray or file-like object; if it is not of a supported type (e.g. an integer), it can be converted into a JSON string by setting this flag to True.
  • multiline - A flag that indicates whether to append a newline character (\n) to each record. It is useful when saving records to a CSV or JSON Lines file.
  • max_trials - The maximum number of trials when one or more records fail. Defaults to 3. Records that still fail after all trials are returned, which lets users decide how to handle them subsequently.
  • append_error - Whether to append error details to failed records. Defaults to True.
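To make the effect of jsonify and multiline concrete, here is a minimal sketch of how a single record could be turned into a PutRecordBatch entry. The function prepare_record is hypothetical and only mirrors the behaviour described above; it is not the connector's actual code. The {"Data": ...} entry shape is the one boto3's put_record_batch expects.

```python
import json
from typing import Any


def prepare_record(record: Any, jsonify: bool = False, multiline: bool = False) -> dict:
    """Build one PutRecordBatch entry from a record.

    Hypothetical helper for illustration; not part of firehose_pyio.
    """
    if jsonify:
        # Serialise any JSON-compatible value (dict, list, int, ...) to a string.
        record = json.dumps(record)
    if not isinstance(record, (str, bytes, bytearray)):
        raise TypeError("unsupported record type; consider jsonify=True")
    if multiline:
        # Append a newline so records land on separate lines at the destination.
        newline = b"\n" if isinstance(record, (bytes, bytearray)) else "\n"
        record = record + newline
    if isinstance(record, str):
        record = record.encode("utf-8")
    return {"Data": record}
```

For example, prepare_record({"id": 1}, jsonify=True, multiline=True) produces an entry whose Data is one JSON Lines row.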

As mentioned earlier, failed elements are returned in a tagged output, which is named write-to-firehose-failed-output by default. You can change the name using the failed_output argument.
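The retry behaviour behind max_trials and append_error can be sketched roughly as follows. This is an illustration under stated assumptions, not the connector's implementation: put_with_retries and its put_batch argument are hypothetical names, and put_batch stands in for a call to boto3's put_record_batch. The response keys FailedPutCount, RequestResponses and ErrorCode are those of the PutRecordBatch API.

```python
from typing import Callable, List


def put_with_retries(
    put_batch: Callable[[List[dict]], dict],
    records: List[dict],
    max_trials: int = 3,
    append_error: bool = True,
) -> list:
    """Send records, resending only the failed ones; return what still failed.

    Hypothetical helper for illustration; assumes max_trials >= 1.
    """
    pending = list(records)
    errors: List[dict] = []
    for _ in range(max_trials):
        response = put_batch(pending)
        if response.get("FailedPutCount", 0) == 0:
            return []
        # RequestResponses is ordered like the request; failed entries
        # carry an ErrorCode (and an ErrorMessage).
        failed = [
            (rec, res)
            for rec, res in zip(pending, response["RequestResponses"])
            if "ErrorCode" in res
        ]
        pending = [rec for rec, _ in failed]
        errors = [res for _, res in failed]
        if not pending:
            return []
    # Optionally pair each failed record with its last error details.
    return list(zip(pending, errors)) if append_error else pending
```

Whatever this function returns corresponds to the elements that would end up in the failed output, where a downstream step can log them or route them elsewhere.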

Example

If a PCollection element is a key-value pair (i.e. a keyed stream), elements can be batched by key using the GroupIntoBatches transform before they are passed to the main transform.

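import apache_beam as beam
from apache_beam import GroupIntoBatches
from apache_beam.options.pipeline_options import PipelineOptions
from firehose_pyio.io import WriteToFirehose

pipeline_options = PipelineOptions()  # configure for your runner as needed
delivery_stream_name = "my-delivery-stream"  # placeholder; use your stream's name

with beam.Pipeline(options=pipeline_options) as p: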
    (
        p
        | beam.Create([(1, "one"), (2, "three"), (1, "two"), (2, "four")])
        | GroupIntoBatches(batch_size=2)
        | WriteToFirehose(
            delivery_stream_name=delivery_stream_name,
            jsonify=True,
            multiline=True,
            max_trials=3
        )
    )

For a list element (i.e. unkeyed stream), we can apply the BatchElements transform instead.

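import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.util import BatchElements
from firehose_pyio.io import WriteToFirehose

pipeline_options = PipelineOptions()  # configure for your runner as needed
delivery_stream_name = "my-delivery-stream"  # placeholder; use your stream's name

with beam.Pipeline(options=pipeline_options) as p: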
    (
        p
        | beam.Create(["one", "two", "three", "four"])
        | BatchElements(min_batch_size=2, max_batch_size=2)
        | WriteToFirehose(
            delivery_stream_name=delivery_stream_name,
            jsonify=True,
            multiline=True,
            max_trials=3
        )
    )

See this post for more examples.

Contributing

Interested in contributing? Check out the contributing guidelines. Please note that this project is released with a Code of Conduct. By contributing to this project, you agree to abide by its terms.

License

firehose_pyio was created as part of the Apache Beam Python I/O Connectors project. It is licensed under the terms of the Apache License 2.0.

Credits

firehose_pyio was created with cookiecutter and the pyio-cookiecutter template.
