Event sources for AWS Lambda

AWS Lambda Event Sources

---

End-of-Support

AWS CDK v1 has reached End-of-Support on 2023-06-01. This package is no longer being updated, and users should migrate to AWS CDK v2.

For more information on how to migrate, see the Migrating to AWS CDK v2 guide.


An event source mapping is an AWS Lambda resource that reads from an event source and invokes a Lambda function. You can use event source mappings to process items from a stream or queue in services that don't invoke Lambda functions directly. Read more about Lambda event sources in the AWS Lambda documentation.

This module includes classes that allow using various AWS services as event sources for AWS Lambda via the high-level lambda.addEventSource(source) API (fn.add_event_source(source) in Python).

NOTE: In most cases, it is also possible to use the resource APIs to invoke an AWS Lambda function. This library provides a uniform API for all Lambda event sources regardless of the underlying mechanism they use.

The following code sets up a Lambda function with an SQS queue event source:

import aws_cdk.aws_sqs as sqs
from aws_cdk.aws_lambda_event_sources import SqsEventSource

# fn: lambda.Function

queue = sqs.Queue(self, "MyQueue")
event_source = SqsEventSource(queue)
fn.add_event_source(event_source)

event_source_id = event_source.event_source_mapping_id

The eventSourceMappingId property (event_source_mapping_id in Python) contains the event source mapping ID. It is a token that resolves to the final value at deployment time.

SQS

Amazon Simple Queue Service (Amazon SQS) allows you to build asynchronous workflows. For more information about Amazon SQS, see Amazon Simple Queue Service. You can configure AWS Lambda to poll for these messages as they arrive and then pass the event to a Lambda function invocation. To view a sample event, see Amazon SQS Event.

To set up Amazon Simple Queue Service as an event source for AWS Lambda, you first create or update an Amazon SQS queue and select custom values for the queue parameters. The following parameters will impact Amazon SQS's polling behavior:

  • visibilityTimeout: May impact the period between retries.
  • receiveMessageWaitTime: Will determine long poll duration. The default value is 20 seconds.
  • batchSize: Determines how many records are buffered before invoking your lambda function.
  • maxBatchingWindow: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of delayed processing.
  • enabled: If the SQS event source mapping should be enabled. The default is true.

import aws_cdk.aws_sqs as sqs
from aws_cdk.core import Duration
from aws_cdk.aws_lambda_event_sources import SqsEventSource

# fn: lambda.Function

queue = sqs.Queue(self, "MyQueue",
    visibility_timeout=Duration.seconds(30),  # default
    receive_message_wait_time=Duration.seconds(20)
)

fn.add_event_source(SqsEventSource(queue,
    batch_size=10,  # default
    max_batching_window=Duration.minutes(5),
    report_batch_item_failures=True
))
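
On the function side, report_batch_item_failures=True only helps if the handler reports which messages failed. The following is a minimal sketch of such a handler, assuming the documented partial batch response shape (a batchItemFailures list of itemIdentifier entries holding SQS message IDs). The process_message function and the order_id field are hypothetical stand-ins for real business logic.

```python
import json


def process_message(body: str) -> None:
    """Hypothetical business logic: reject payloads without an 'order_id'."""
    payload = json.loads(body)
    if "order_id" not in payload:
        raise ValueError("missing order_id")


def handler(event, context):
    """Process an SQS batch and report only the messages that failed."""
    failures = []
    for record in event.get("Records", []):
        try:
            process_message(record["body"])
        except Exception:
            # Only this message is returned to the queue for a retry.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

Messages that are not listed in the response are treated as processed successfully, so one bad message does not force the whole batch to be retried.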

S3

You can write Lambda functions to process S3 bucket events, such as the object-created or object-deleted events. For example, when a user uploads a photo to a bucket, you might want Amazon S3 to invoke your Lambda function so that it reads the image and creates a thumbnail for the photo.

You can use the bucket notification configuration feature in Amazon S3 to configure the event source mapping, identifying the bucket events that you want Amazon S3 to publish and which Lambda function to invoke.

import aws_cdk.aws_s3 as s3
from aws_cdk.aws_lambda_event_sources import S3EventSource
# fn: lambda.Function


bucket = s3.Bucket(self, "mybucket")

fn.add_event_source(S3EventSource(bucket,
    events=[s3.EventType.OBJECT_CREATED, s3.EventType.OBJECT_REMOVED],
    filters=[s3.NotificationKeyFilter(prefix="subdir/")]
))
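
A function subscribed this way receives S3 notification records. The sketch below shows one way to read them, assuming the standard notification layout (Records[].s3.bucket.name and Records[].s3.object.key). Object keys arrive URL-encoded, so they are decoded before use. The extract_objects helper name is illustrative and not part of this library.

```python
from urllib.parse import unquote_plus


def extract_objects(event):
    """Return (event name, bucket, key) for each record in an S3 notification."""
    objects = []
    for record in event.get("Records", []):
        s3_info = record["s3"]
        # Keys are URL-encoded in the notification (spaces arrive as '+').
        key = unquote_plus(s3_info["object"]["key"])
        objects.append((record["eventName"], s3_info["bucket"]["name"], key))
    return objects


def handler(event, context):
    for event_name, bucket, key in extract_objects(event):
        print(f"{event_name}: s3://{bucket}/{key}")
```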

SNS

You can write Lambda functions to process Amazon Simple Notification Service notifications. When a message is published to an Amazon SNS topic, the service can invoke your Lambda function by passing the message payload as a parameter. Your Lambda function code can then process the event, for example by publishing the message to other Amazon SNS topics or sending it to other AWS services.

This also enables you to trigger a Lambda function in response to Amazon CloudWatch alarms and other AWS services that use Amazon SNS.

For an example event, see Appendix: Message and JSON Formats and Amazon SNS Sample Event. For an example use case, see Using AWS Lambda with Amazon SNS from Different Accounts.

import aws_cdk.aws_sns as sns
import aws_cdk.aws_sqs as sqs
from aws_cdk.aws_lambda_event_sources import SnsEventSource

# topic: sns.Topic

# fn: lambda.Function

dead_letter_queue = sqs.Queue(self, "deadLetterQueue")
fn.add_event_source(SnsEventSource(topic,
    filter_policy={},
    dead_letter_queue=dead_letter_queue
))

When a user calls the SNS Publish API on a topic that your Lambda function is subscribed to, Amazon SNS will call Lambda to invoke your function asynchronously. Lambda will then return a delivery status. If there was an error calling Lambda, Amazon SNS will retry invoking the Lambda function up to three times. After three tries, if Amazon SNS still could not successfully invoke the Lambda function, then Amazon SNS will send a delivery status failure message to CloudWatch.
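
Inside the function, each SNS delivery arrives as a record with an Sns object. The following is a small sketch of unpacking it, assuming the standard SNS event layout (Records[].Sns.TopicArn, Subject, and Message). The extract_messages helper is a hypothetical name used for illustration.

```python
def extract_messages(event):
    """Collect topic ARN, subject, and message body from an SNS event."""
    messages = []
    for record in event.get("Records", []):
        sns = record["Sns"]
        messages.append({
            "topic_arn": sns["TopicArn"],
            "subject": sns.get("Subject"),  # the subject is optional
            "message": sns["Message"],
        })
    return messages


def handler(event, context):
    for message in extract_messages(event):
        print(f"{message['topic_arn']}: {message['message']}")
```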

DynamoDB Streams

You can write Lambda functions to process change events from a DynamoDB Table. An event is emitted to a DynamoDB stream (if configured) whenever a write (Put, Delete, Update) operation is performed against the table. See Using AWS Lambda with Amazon DynamoDB for more information about configuring Lambda function event sources with DynamoDB.

To process events with a Lambda function, first create or update a DynamoDB table and enable a stream specification. Then, create a DynamoEventSource and add it to your Lambda function. The following parameters will impact Amazon DynamoDB's polling behavior:

  • batchSize: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
  • bisectBatchOnError: If a batch encounters an error, this will cause the batch to be split in two and have each new smaller batch retried, allowing the records in error to be isolated.
  • reportBatchItemFailures: Allow functions to return partially successful responses for a batch of records.
  • maxBatchingWindow: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of delayed processing.
  • maxRecordAge: The maximum age of a record that will be sent to the function for processing. Records that exceed the max age will be treated as failures.
  • onFailure: If a record fails after all retries, or if the record age has exceeded the configured value, the record will be sent to the SQS queue or SNS topic specified here.
  • parallelizationFactor: The number of batches to concurrently process on each shard.
  • retryAttempts: The maximum number of times a record should be retried in the event of failure.
  • startingPosition: Will determine where to begin consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
  • tumblingWindow: The duration in seconds of a processing window when using streams.
  • enabled: If the DynamoDB Streams event source mapping should be enabled. The default is true.

import aws_cdk.aws_dynamodb as dynamodb
import aws_cdk.aws_lambda as lambda_
import aws_cdk.aws_sqs as sqs
from aws_cdk.aws_lambda_event_sources import DynamoEventSource, SqsDlq

# table: dynamodb.Table

# fn: lambda.Function


dead_letter_queue = sqs.Queue(self, "deadLetterQueue")
fn.add_event_source(DynamoEventSource(table,
    starting_position=lambda_.StartingPosition.TRIM_HORIZON,
    batch_size=5,
    bisect_batch_on_error=True,
    on_failure=SqsDlq(dead_letter_queue),
    retry_attempts=10
))
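
On the handler side, a stream batch is processed in order, and with reportBatchItemFailures enabled the function can tell Lambda where to resume. The sketch below assumes report_batch_item_failures=True has been set on the DynamoEventSource (the example in this section does not set it) and the standard stream record layout (eventName, dynamodb.Keys, dynamodb.SequenceNumber). The describe_change function is a hypothetical placeholder for real processing.

```python
def describe_change(record):
    """Summarise one stream record as (event name, keys)."""
    event_name = record["eventName"]  # INSERT, MODIFY, or REMOVE
    if event_name not in ("INSERT", "MODIFY", "REMOVE"):
        raise ValueError(f"unexpected eventName: {event_name}")
    return event_name, record["dynamodb"]["Keys"]


def handler(event, context):
    """Process records in order and checkpoint at the first failure."""
    for record in event.get("Records", []):
        try:
            print(describe_change(record))
        except Exception:
            # Report the failed record's sequence number so the retry
            # starts from this record instead of the start of the batch.
            sequence_number = record["dynamodb"]["SequenceNumber"]
            return {"batchItemFailures": [{"itemIdentifier": sequence_number}]}
    return {"batchItemFailures": []}
```

Stopping at the first failure keeps the per-shard ordering intact: everything before the reported sequence number counts as done, and everything from it onward is retried.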

Kinesis

You can write Lambda functions to process streaming data in Amazon Kinesis Streams. For more information about Amazon Kinesis, see Amazon Kinesis Service. To learn more about configuring Lambda function event sources with Kinesis and to view a sample event, see Amazon Kinesis Event.

To set up Amazon Kinesis as an event source for AWS Lambda, you first create or update an Amazon Kinesis stream and select custom values for the event source parameters. The following parameters will impact Amazon Kinesis's polling behavior:

  • batchSize: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
  • bisectBatchOnError: If a batch encounters an error, this will cause the batch to be split in two and have each new smaller batch retried, allowing the records in error to be isolated.
  • reportBatchItemFailures: Allow functions to return partially successful responses for a batch of records.
  • maxBatchingWindow: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of possibly delaying processing.
  • maxRecordAge: The maximum age of a record that will be sent to the function for processing. Records that exceed the max age will be treated as failures.
  • onFailure: If a record fails and consumes all retries, the record will be sent to the SQS queue or SNS topic specified here.
  • parallelizationFactor: The number of batches to concurrently process on each shard.
  • retryAttempts: The maximum number of times a record should be retried in the event of failure.
  • startingPosition: Will determine where to begin consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
  • tumblingWindow: The duration in seconds of a processing window when using streams.
  • enabled: If the Kinesis event source mapping should be enabled. The default is true.

import aws_cdk.aws_kinesis as kinesis
import aws_cdk.aws_lambda as lambda_
from aws_cdk.aws_lambda_event_sources import KinesisEventSource

# my_function: lambda.Function


stream = kinesis.Stream(self, "MyStream")
my_function.add_event_source(KinesisEventSource(stream,
    batch_size=100,  # default
    starting_position=lambda_.StartingPosition.TRIM_HORIZON
))
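
Kinesis delivers record payloads base64-encoded, so the function has to decode them before use. The following is a minimal sketch, assuming the standard Kinesis event layout (Records[].kinesis.data, partitionKey, and sequenceNumber) and UTF-8 text payloads. The decode_records helper is an illustrative name, not part of this library.

```python
import base64


def decode_records(event):
    """Return (partition key, sequence number, text payload) per record."""
    decoded = []
    for record in event.get("Records", []):
        kinesis = record["kinesis"]
        # The payload is base64-encoded; UTF-8 text is assumed here.
        data = base64.b64decode(kinesis["data"]).decode("utf-8")
        decoded.append((kinesis["partitionKey"], kinesis["sequenceNumber"], data))
    return decoded


def handler(event, context):
    for partition_key, sequence_number, data in decode_records(event):
        print(f"{partition_key} @ {sequence_number}: {data}")
```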

Kafka

You can write Lambda functions to process data from either Amazon MSK or a self-managed Kafka cluster.

The following code sets up Amazon MSK as an event source for a Lambda function. Credentials will need to be configured to access the MSK cluster, as described in Username/Password authentication.

import aws_cdk.aws_lambda as lambda_
from aws_cdk.aws_secretsmanager import Secret
from aws_cdk.aws_lambda_event_sources import ManagedKafkaEventSource

# my_function: lambda.Function


# Your MSK cluster arn
cluster_arn = "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"

# The Kafka topic you want to subscribe to
topic = "some-cool-topic"

# The secret that allows access to your MSK cluster
# You still have to make sure that it is associated with your cluster as described in the documentation
secret = Secret(self, "Secret", secret_name="AmazonMSK_KafkaSecret")
my_function.add_event_source(ManagedKafkaEventSource(
    cluster_arn=cluster_arn,
    topic=topic,
    secret=secret,
    batch_size=100,  # default
    starting_position=lambda_.StartingPosition.TRIM_HORIZON
))

The following code sets up a self-managed Kafka cluster as an event source. Username- and password-based authentication will need to be set up as described in Managing access and permissions.

import aws_cdk.aws_lambda as lambda_
from aws_cdk.aws_secretsmanager import Secret
from aws_cdk.aws_lambda_event_sources import SelfManagedKafkaEventSource

# The secret that allows access to your self hosted Kafka cluster
# secret: Secret

# my_function: lambda.Function


# The list of Kafka brokers
bootstrap_servers = ["kafka-broker:9092"]

# The Kafka topic you want to subscribe to
topic = "some-cool-topic"
my_function.add_event_source(SelfManagedKafkaEventSource(
    bootstrap_servers=bootstrap_servers,
    topic=topic,
    secret=secret,
    batch_size=100,  # default
    starting_position=lambda_.StartingPosition.TRIM_HORIZON
))

If your self-managed Kafka cluster is only reachable via a VPC, also configure vpc, vpcSubnets, and securityGroup.
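
For either kind of cluster, the function receives Kafka records grouped by topic and partition. The sketch below shows one way to unpack them, assuming the standard Lambda Kafka event layout (a records mapping keyed by "topic-partition", with base64-encoded value fields) and UTF-8 text values. The decode_kafka_records helper is a hypothetical name.

```python
import base64


def decode_kafka_records(event):
    """Return (topic, partition, offset, text value) for every Kafka record."""
    decoded = []
    # Records are grouped under keys such as "some-cool-topic-0".
    for records in event.get("records", {}).values():
        for record in records:
            # Values are base64-encoded; UTF-8 text is assumed here.
            value = base64.b64decode(record["value"]).decode("utf-8")
            decoded.append((record["topic"], record["partition"], record["offset"], value))
    return decoded


def handler(event, context):
    for topic, partition, offset, value in decode_kafka_records(event):
        print(f"{topic}[{partition}] offset {offset}: {value}")
```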

Roadmap

Eventually, this module will support all the event sources described under Supported Event Sources in the AWS Lambda Developer Guide.
