Receipt rule actions for AWS IoT

Actions for AWS IoT Rule

---

cdk-constructs: Experimental

The APIs of higher level constructs in this module are experimental and under active development. They are subject to non-backward compatible changes or removal in any future version. These are not subject to the Semantic Versioning model and breaking changes will be announced in the release notes. This means that while you may use them, you may need to update your source code when upgrading to a newer version of this package.


This library contains integration classes to send data to any number of supported AWS Services. Instances of these classes should be passed to TopicRule defined in @aws-cdk/aws-iot.

Currently supported are:

  • Republish a message to another MQTT topic
  • Invoke a Lambda function
  • Put objects to an S3 bucket
  • Put logs to CloudWatch Logs
  • Capture CloudWatch metrics
  • Change state for a CloudWatch alarm
  • Put records to Kinesis Data stream
  • Put records to Kinesis Data Firehose stream
  • Send messages to SQS queues
  • Publish messages on SNS topics
  • Write messages into columns of DynamoDB
  • Put messages to an IoT Events input

Republish a message to another MQTT topic

The code snippet below creates an AWS IoT Rule that republishes a message to another MQTT topic when it is triggered.

import aws_cdk.aws_iot_alpha as iot
import aws_cdk.aws_iot_actions_alpha as actions


iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT topic(2) as device_id, timestamp() as timestamp, temperature FROM 'device/+/data'"),
    actions=[
        actions.IotRepublishMqttAction("${topic()}/republish",
            quality_of_service=actions.MqttQualityOfService.AT_LEAST_ONCE
        )
    ]
)
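To make the SQL statement above more concrete, the following plain-Python sketch mimics how a single-level `+` wildcard in the FROM clause and the `topic(2)` function behave for a topic such as device/001/data. The helper names `matches_filter` and `topic_segment` are hypothetical and are not part of the CDK or of AWS IoT; the sketch assumes that segments are counted from 1 and does not handle the multi-level `#` wildcard.

```python
def matches_filter(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches a filter that may contain `+` wildcards."""
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    # `+` stands for exactly one segment, so the segment counts must agree.
    if len(filter_parts) != len(topic_parts):
        return False
    return all(f == "+" or f == t for f, t in zip(filter_parts, topic_parts))


def topic_segment(topic: str, n: int) -> str:
    """Return the n-th segment of a topic, counting from 1 like topic(n)."""
    return topic.split("/")[n - 1]


topic = "device/001/data"
print(matches_filter("device/+/data", topic))  # True
print(topic_segment(topic, 2))                 # 001
# What "${topic()}/republish" would look like for this message:
print(f"{topic}/republish")                    # device/001/data/republish
```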

Invoke a Lambda function

The code snippet below creates an AWS IoT Rule that invokes a Lambda function when it is triggered.

import aws_cdk.aws_lambda as lambda_


func = lambda_.Function(self, "MyFunction",
    runtime=lambda_.Runtime.NODEJS_14_X,
    handler="index.handler",
    code=lambda_.Code.from_inline("""
            exports.handler = (event) => {
              console.log("It is test for lambda action of AWS IoT Rule.", event);
            };""")
)

iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT topic(2) as device_id, timestamp() as timestamp, temperature FROM 'device/+/data'"),
    actions=[actions.LambdaFunctionAction(func)]
)

Put objects to an S3 bucket

The code snippet below creates an AWS IoT Rule that puts objects to an S3 bucket when it is triggered.

import aws_cdk.aws_s3 as s3


bucket = s3.Bucket(self, "MyBucket")

iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
    actions=[actions.S3PutObjectAction(bucket)]
)

By default, the key property of S3PutObjectAction is set to ${topic()}/${timestamp()}. ${topic()} and ${timestamp()} are substitution templates; see the AWS IoT documentation on substitution templates for more information. In the sample above, ${topic()} is replaced by the MQTT topic of the incoming message, such as device/001/data, and ${timestamp()} is replaced by the current timestamp in milliseconds, such as 1636289461203. So if the MQTT broker receives a message on the topic device/001/data at 2021-11-07T00:00:00.000Z, the object is put to the key device/001/data/1636243200000.
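The arithmetic behind that example key can be checked with a short plain-Python sketch. The function `default_object_key` is a hypothetical helper that only imitates the default `${topic()}/${timestamp()}` template; it is not how AWS IoT evaluates the template internally.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def default_object_key(topic: str, received_at: datetime) -> str:
    """Imitate the default key: the topic, a slash, then epoch milliseconds."""
    # Integer division by a timedelta avoids floating-point rounding.
    millis = (received_at - EPOCH) // timedelta(milliseconds=1)
    return f"{topic}/{millis}"


received_at = datetime(2021, 11, 7, tzinfo=timezone.utc)
print(default_object_key("device/001/data", received_at))
# device/001/data/1636243200000
```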

You can also set a specific key as follows:

bucket = s3.Bucket(self, "MyBucket")

iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT topic(2) as device_id, year, month, day FROM 'device/+/data'"),
    actions=[
        actions.S3PutObjectAction(bucket,
            key="${year}/${month}/${day}/${topic(2)}"
        )
    ]
)
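As a rough illustration of what such a key template resolves to, the sketch below expands `${...}` placeholders in plain Python. It assumes that a bare name such as `${year}` refers to a field of the message payload and that `${topic(n)}` refers to the n-th topic segment, counted from 1. The function `render_key` and the sample payload are hypothetical, and the sketch covers only these two kinds of expression, not the full substitution template syntax.

```python
import re

_PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")
_TOPIC_CALL = re.compile(r"topic\((\d+)\)")


def render_key(template: str, topic: str, payload: dict) -> str:
    """Expand ${name} from the payload and ${topic(n)} from the topic."""

    def substitute(match):
        expression = match.group(1)
        topic_call = _TOPIC_CALL.fullmatch(expression)
        if topic_call:
            # topic(n) is 1-indexed, Python lists are 0-indexed.
            return topic.split("/")[int(topic_call.group(1)) - 1]
        return str(payload[expression])

    return _PLACEHOLDER.sub(substitute, template)


payload = {"year": 2021, "month": 11, "day": 7, "temperature": 21.5}
print(render_key("${year}/${month}/${day}/${topic(2)}", "device/001/data", payload))
# 2021/11/7/001
```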

To set access control on the S3 object, specify access_control as follows:

bucket = s3.Bucket(self, "MyBucket")

iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT * FROM 'device/+/data'"),
    actions=[
        actions.S3PutObjectAction(bucket,
            access_control=s3.BucketAccessControl.PUBLIC_READ
        )
    ]
)

Put logs to CloudWatch Logs

The code snippet below creates an AWS IoT Rule that puts logs to CloudWatch Logs when it is triggered.

import aws_cdk.aws_logs as logs


log_group = logs.LogGroup(self, "MyLogGroup")

iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
    actions=[actions.CloudWatchLogsAction(log_group)]
)

Capture CloudWatch metrics

The code snippet below creates an AWS IoT Rule that captures CloudWatch metrics when it is triggered.

topic_rule = iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT topic(2) as device_id, namespace, unit, value, timestamp FROM 'device/+/data'"),
    actions=[
        actions.CloudWatchPutMetricAction(
            metric_name="${topic(2)}",
            metric_namespace="${namespace}",
            metric_unit="${unit}",
            metric_value="${value}",
            metric_timestamp="${timestamp}"
        )
    ]
)

Change the state of an Amazon CloudWatch alarm

The code snippet below creates an AWS IoT Rule that changes the state of an Amazon CloudWatch alarm when it is triggered:

import aws_cdk.aws_cloudwatch as cloudwatch


metric = cloudwatch.Metric(
    namespace="MyNamespace",
    metric_name="MyMetric",
    dimensions={"MyDimension": "MyDimensionValue"}
)
alarm = cloudwatch.Alarm(self, "MyAlarm",
    metric=metric,
    threshold=100,
    evaluation_periods=3,
    datapoints_to_alarm=2
)

topic_rule = iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
    actions=[
        actions.CloudWatchSetAlarmStateAction(alarm,
            reason="AWS IoT Rule action is triggered",
            alarm_state_to_set=cloudwatch.AlarmState.ALARM
        )
    ]
)

Put records to Kinesis Data stream

The code snippet below creates an AWS IoT Rule that puts records to a Kinesis Data stream when it is triggered.

import aws_cdk.aws_kinesis as kinesis


stream = kinesis.Stream(self, "MyStream")

topic_rule = iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT * FROM 'device/+/data'"),
    actions=[
        actions.KinesisPutRecordAction(stream,
            partition_key="${newuuid()}"
        )
    ]
)
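A random partition key such as `${newuuid()}` tends to spread records evenly across shards, because Kinesis Data Streams maps each partition key to a 128-bit hash key with MD5 and routes the record to the shard that owns that hash key range. The sketch below illustrates the idea in plain Python under the simplifying assumption that the hash space is split evenly between shards; `shard_index` is a hypothetical helper, not a Kinesis or CDK API.

```python
import hashlib
import uuid
from collections import Counter

HASH_SPACE = 2 ** 128  # MD5 digests are 128 bits wide


def shard_index(partition_key: str, shard_count: int) -> int:
    """Map a partition key to a shard, assuming evenly split hash key ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")
    return hash_key * shard_count // HASH_SPACE


# Random UUID keys land on all shards in roughly equal numbers.
counts = Counter(shard_index(str(uuid.uuid4()), 4) for _ in range(1000))
print(sorted(counts.items()))
```

A fixed partition key, by contrast, always maps to the same shard, which preserves ordering for that key but concentrates its traffic on one shard.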

Put records to Kinesis Data Firehose stream

The code snippet below creates an AWS IoT Rule that puts records to a Kinesis Data Firehose stream when it is triggered.

import aws_cdk.aws_kinesisfirehose_alpha as firehose
import aws_cdk.aws_kinesisfirehose_destinations_alpha as destinations


bucket = s3.Bucket(self, "MyBucket")
stream = firehose.DeliveryStream(self, "MyStream",
    destinations=[destinations.S3Bucket(bucket)]
)

topic_rule = iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT * FROM 'device/+/data'"),
    actions=[
        actions.FirehosePutRecordAction(stream,
            batch_mode=True,
            record_separator=actions.FirehoseRecordSeparator.NEWLINE
        )
    ]
)
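The reason for choosing a record separator shows up when the delivered records are read back: without one, consecutive JSON records run together and cannot be split reliably. The plain-Python sketch below illustrates this with two made-up messages; `concatenate` is a hypothetical helper that stands in for the delivery step and is not a Firehose or CDK API.

```python
import json

messages = [
    {"device_id": "001", "temperature": 21.5},
    {"device_id": "002", "temperature": 19.0},
]


def concatenate(records, separator=""):
    """Serialize each record as JSON and append the separator to it."""
    return "".join(json.dumps(record) + separator for record in records)


without_separator = concatenate(messages)
with_newline = concatenate(messages, "\n")

# Records that run together are not a single valid JSON document.
try:
    json.loads(without_separator)
except json.JSONDecodeError:
    print("cannot parse records without a separator")

# With a newline after each record, the output can be read line by line.
parsed = [json.loads(line) for line in with_newline.splitlines()]
print(parsed == messages)  # True
```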

Send messages to an SQS queue

The code snippet below creates an AWS IoT Rule that sends messages to an SQS queue when it is triggered:

import aws_cdk.aws_sqs as sqs


queue = sqs.Queue(self, "MyQueue")

topic_rule = iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT topic(2) as device_id, year, month, day FROM 'device/+/data'"),
    actions=[
        actions.SqsQueueAction(queue,
            use_base64=True
        )
    ]
)
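With use_base64 enabled, a consumer has to decode the message body before parsing it. The following plain-Python sketch shows the round trip, assuming the encoded bytes are the JSON serialization of the rule's output. The functions `encode_message` and `decode_message` and the sample payload are hypothetical and only illustrate the encoding, not the service's internals.

```python
import base64
import json

payload = {"device_id": "001", "year": 2021, "month": 11, "day": 7}


def encode_message(message: dict) -> str:
    """Serialize to JSON and Base64-encode, as a stand-in for the rule action."""
    return base64.b64encode(json.dumps(message).encode("utf-8")).decode("ascii")


def decode_message(body: str) -> dict:
    """What a queue consumer would do with a Base64-encoded body."""
    return json.loads(base64.b64decode(body))


body = encode_message(payload)
print(decode_message(body) == payload)  # True
```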

Publish messages on an SNS topic

The code snippet below creates an AWS IoT Rule that publishes messages to an SNS topic when it is triggered:

import aws_cdk.aws_sns as sns


topic = sns.Topic(self, "MyTopic")

topic_rule = iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT topic(2) as device_id, year, month, day FROM 'device/+/data'"),
    actions=[
        actions.SnsTopicAction(topic,
            message_format=actions.SnsActionMessageFormat.JSON
        )
    ]
)

Write attributes of a message to DynamoDB

The code snippet below creates an AWS IoT rule that writes all or part of an MQTT message to DynamoDB using the DynamoDBv2 action.

import aws_cdk.aws_dynamodb as dynamodb

# table: dynamodb.Table


topic_rule = iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT * FROM 'device/+/data'"),
    actions=[
        actions.DynamoDBv2PutItemAction(table)
    ]
)

Put messages to an IoT Events input

The code snippet below creates an AWS IoT Rule that puts messages to an IoT Events input when it is triggered:

import aws_cdk.aws_iotevents_alpha as iotevents
import aws_cdk.aws_iam as iam

# role: iam.IRole


input = iotevents.Input(self, "MyInput",
    attribute_json_paths=["payload.temperature", "payload.transactionId"]
)
topic_rule = iot.TopicRule(self, "TopicRule",
    sql=iot.IotSql.from_string_as_ver20160323("SELECT * FROM 'device/+/data'"),
    actions=[
        actions.IotEventsPutMessageAction(input,
            batch_mode=True,  # optional property, default is False
            message_id="${payload.transactionId}",  # optional property, default is a new UUID
            role=role
        )
    ]
)

Download files

Source Distribution

aws-cdk.aws-iot-actions-alpha-2.56.0a0.tar.gz (96.2 kB view details)

Uploaded Source

Built Distribution

File details

Details for the file aws-cdk.aws-iot-actions-alpha-2.56.0a0.tar.gz.

File metadata

File hashes

Hashes for aws-cdk.aws-iot-actions-alpha-2.56.0a0.tar.gz
Algorithm Hash digest
SHA256 e54a1a013c5f6be871ce219b8b86fe3e299792c34f51e5182082533d9de854c6
MD5 22b15d8f87cbb90951597aec2aad26c0
BLAKE2b-256 b47a919a305fad0134ddeb4c285dfb8baca2db4b58c2bb02e91b7aebf2204991

File details

Details for the file aws_cdk.aws_iot_actions_alpha-2.56.0a0-py3-none-any.whl.

File metadata

File hashes

Hashes for aws_cdk.aws_iot_actions_alpha-2.56.0a0-py3-none-any.whl
Algorithm Hash digest
SHA256 1f246fa6277eb5d52c7729be671d11d96e34a7327828210cb91e9e3fb19678f5
MD5 e99fbf0baaba4c7504a508f17138ef7a
BLAKE2b-256 01260b42991acb740671c3f1020a34f3179547ada53f08933ba2c5ceae779d59
