Apache Beam Python I/O connector for Amazon SQS

Project description

sqs_pyio

Amazon Simple Queue Service (Amazon SQS) offers a secure, durable, and available hosted queue that lets you integrate and decouple distributed software systems and components. The Apache Beam Python I/O connector for Amazon SQS (sqs_pyio) aims to integrate with the queue service by providing source and sink connectors. Currently, only a sink connector is available.

Installation

The connector can be installed from PyPI.

pip install sqs_pyio

Usage

Sink Connector

The package's main composite transform is WriteToSqs, and it expects a PCollection whose elements are a list or a tuple. If an element is a tuple, the tuple's second element is taken. If an element is not of an accepted type, you can apply the GroupIntoBatches or BatchElements transform beforehand. Each element is then sent to an SQS queue using the send_message_batch method of the boto3 package. Note that these batch transforms are also useful for staying within the API limits listed below.

  • Each SendMessageBatch request supports up to 10 messages. The maximum allowed individual message size and the maximum total payload size (the sum of the individual lengths of all the batched messages) are both 256 KiB (262,144 bytes).
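For reference, the snippet below sketches roughly the kind of boto3 call the connector issues for a batched element. It is illustrative only and not part of sqs_pyio; the queue name and region are placeholders.

import boto3

# Illustrative sketch: sending a batch of messages with boto3's send_message_batch.
sqs = boto3.client("sqs", region_name="us-east-1")
queue_url = sqs.get_queue_url(QueueName="your-queue-name")["QueueUrl"]

# Each entry needs an Id that is unique within the request and a MessageBody.
entries = [{"Id": str(i), "MessageBody": str(i)} for i in range(3)]
response = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)

# Successful and failed entries are reported separately in the response.
print(response.get("Successful", []), response.get("Failed", []))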

The transform also provides the following options for handling failed records.

  • max_trials - The maximum number of trials when there are one or more failed records; it defaults to 3. Records that still fail after all trials are returned via a tagged output, which lets users decide how to handle them downstream.
  • append_error - Whether to append error details to failed records. Defaults to True.

As mentioned above, failed elements are returned via a tagged output, which is named write-to-sqs-failed-output by default. You can change the name with the failed_output argument.
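
The sketch below shows one way to consume that tagged output. It assumes the transform's return value exposes the failed output by its tag name (the default write-to-sqs-failed-output here); the queue name is a placeholder.

import apache_beam as beam
from apache_beam.transforms.util import BatchElements
from sqs_pyio.io import WriteToSqs

records = [{"Id": str(i), "MessageBody": str(i)} for i in range(3)]

with beam.Pipeline() as p:
    output = (
        p
        | beam.Create(records)
        | BatchElements(min_batch_size=2, max_batch_size=2)
        | WriteToSqs(
            queue_name="your-queue-name",  # placeholder
            max_trials=3,
            append_error=True,
            failed_output="write-to-sqs-failed-output",
        )
    )
    # Records that still fail after max_trials, with error details appended
    # when append_error=True; assumes the tagged output is accessible by name.
    _ = output["write-to-sqs-failed-output"] | beam.Map(print)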

Sink Connector Example

If a PCollection element is a key-value pair (i.e. a keyed stream), it can be batched into groups using the GroupIntoBatches transform before it is passed to the main transform.

import apache_beam as beam
from apache_beam import GroupIntoBatches
from sqs_pyio.io import WriteToSqs

records = [(i % 2, {"Id": str(i), "MessageBody": str(i)}) for i in range(3)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create(records)
        | GroupIntoBatches(batch_size=2)
        | WriteToSqs(queue_name="your-queue-name")  # replace with your queue name
    )

For a list element (i.e. an unkeyed stream), we can apply the BatchElements transform instead.

import apache_beam as beam
from apache_beam.transforms.util import BatchElements
from sqs_pyio.io import WriteToSqs

records = [{"Id": str(i), "MessageBody": str(i)} for i in range(3)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create(records)
        | BatchElements(min_batch_size=2, max_batch_size=2)
        | WriteToSqs(queue_name="your-queue-name")  # replace with your queue name
    )

See this post for more examples.

Contributing

Interested in contributing? Check out the contributing guidelines. Please note that this project is released with a Code of Conduct. By contributing to this project, you agree to abide by its terms.

License

sqs_pyio was created as part of the Apache Beam Python I/O Connectors project. It is licensed under the terms of the Apache License 2.0.

Credits

sqs_pyio was created with cookiecutter and the pyio-cookiecutter template.

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sqs_pyio-0.1.0.tar.gz (6.5 kB)

Uploaded Source

Built Distribution

sqs_pyio-0.1.0-py3-none-any.whl (8.9 kB)

Uploaded Python 3

File details

Details for the file sqs_pyio-0.1.0.tar.gz.

File metadata

  • Download URL: sqs_pyio-0.1.0.tar.gz
  • Upload date:
  • Size: 6.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.8.10 Linux/5.15.153.1-microsoft-standard-WSL2

File hashes

Hashes for sqs_pyio-0.1.0.tar.gz

  • SHA256: 1d95d52836da4212fe8266eb6a4a956a418e1734c3266d56dfadcdc69b846d2b
  • MD5: 900a35d61d3ddb5bdcfe32f13b2cd35c
  • BLAKE2b-256: 3229de1db26b87ca533f3c944b4b400eb4d465148b4c6cf337209a563e6609b4

File details

Details for the file sqs_pyio-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: sqs_pyio-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 8.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.8.10 Linux/5.15.153.1-microsoft-standard-WSL2

File hashes

Hashes for sqs_pyio-0.1.0-py3-none-any.whl

  • SHA256: 41fda3e34a2df0151d0c106492b2abf06fa8d34a7c5e52d3feaa3b0618d39bfd
  • MD5: 9b5323cadfb65f181ff4bb02f002b505
  • BLAKE2b-256: 8b3b42ae56db7e976a3cac33cf02706ac2c70613d9ee84cfd48d8082f1737439

