@cdklabs/cdk-appflow

Amazon AppFlow Construct Library

Note: this library is currently in technical preview.

Introduction

Amazon AppFlow is a service that enables creating managed, bi-directional data transfer integrations between various SaaS applications and AWS services.

For more information, see the Amazon AppFlow User Guide.

Example

from datetime import datetime, timezone

from aws_cdk import SecretValue
from aws_cdk.aws_s3 import Bucket
from aws_cdk.aws_secretsmanager import ISecret
from cdklabs.cdk_appflow import (
    Field, Filter, FilterCondition, Mapping, OnDemandFlow, S3Destination, S3Location,
    SalesforceConnectorProfile, SalesforceOAuthFlow, SalesforceOAuthRefreshTokenGrantFlow,
    SalesforceOAuthSettings, SalesforceSource, Transform, Validation, ValidationAction,
    ValidationCondition,
)

# The following values are assumed to be provided by the surrounding code:
# client_secret: ISecret
# access_token: SecretValue
# refresh_token: SecretValue
# instance_url: str


# Connector profile holding the Salesforce OAuth credentials
profile = SalesforceConnectorProfile(self, "MyConnectorProfile",
    o_auth=SalesforceOAuthSettings(
        access_token=access_token,
        flow=SalesforceOAuthFlow(
            refresh_token_grant=SalesforceOAuthRefreshTokenGrantFlow(
                refresh_token=refresh_token,
                client=client_secret
            )
        )
    ),
    instance_url=instance_url,
    is_sandbox=False
)

# Source: the Salesforce "Account" object
source = SalesforceSource(
    profile=profile,
    object="Account"
)

bucket = Bucket(self, "DestinationBucket")

# Destination: the S3 bucket created above
destination = S3Destination(
    location=S3Location(bucket=bucket)
)

OnDemandFlow(self, "SfAccountToS3",
    source=source,
    destination=destination,
    mappings=[Mapping.map_all()],
    transforms=[
        Transform.mask(Field(name="Name"), "*")
    ],
    validations=[
        Validation.when(ValidationCondition.is_null("Name"), ValidationAction.ignore_record())
    ],
    filters=[
        # jsii maps the TypeScript Date type to datetime.datetime in Python
        Filter.when(FilterCondition.timestamp_less_than_equals(Field(name="LastModifiedDate", data_type="datetime"), datetime(2022, 2, 2, tzinfo=timezone.utc)))
    ]
)

Concepts

Amazon AppFlow introduces several concepts that abstract away the technicalities of setting up and managing data integrations.

An Application is any SaaS data integration component that can be either a source or a destination for Amazon AppFlow. A source is an application from which Amazon AppFlow will retrieve data, whereas a destination is an application to which Amazon AppFlow will send data.

A Flow is Amazon AppFlow's integration between a source and a destination.

A ConnectorProfile is Amazon AppFlow's abstraction over authentication/authorization with a particular SaaS application. The per-SaaS application permissions given to a particular ConnectorProfile will determine whether the connector profile can support the application as a source or as a destination (see whether a particular application is supported as either a source or a destination in the documentation).

Types of Flows

The library introduces three separate types of flows:

  • OnDemandFlow - a construct representing a flow that can be triggered programmatically with a StartFlow API call.
  • OnEventFlow - a construct representing a flow that is triggered by a SaaS application event published to AppFlow. At the time of writing only a Salesforce source is able to publish events that can be consumed by AppFlow flows.
  • OnScheduleFlow - a construct representing a flow that is triggered on a schedule.
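
An OnDemandFlow only runs when StartFlow is called. The following is a minimal sketch of such a call from Python, assuming a boto3 AppFlow client (created with boto3.client("appflow")) is passed in. The helper name start_flow_run is hypothetical and not part of this library.

```python
from typing import Any


def start_flow_run(appflow_client: Any, flow_name: str) -> str:
    """Trigger one run of an on-demand flow and return its execution id.

    `appflow_client` is assumed to be a boto3 AppFlow client. It is
    injected so the helper can be exercised without AWS credentials.
    """
    response = appflow_client.start_flow(flowName=flow_name)
    # For on-demand flows the StartFlow response carries an executionId.
    return response["executionId"]
```

The flow_name argument is the name of the deployed flow, which is not necessarily the same as the construct id used in the CDK code.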

Tasks

Tasks are steps that can be taken upon fields. Tasks compose higher level objects that in this library are named Operations. There are four operations identified:

  • Transforms - 1-1 transforms on source fields, like truncation or masking
  • Mappings - 1-1 or many-to-1 operations from source fields to a destination field
  • Filters - operations that limit the source data based on particular conditions
  • Validations - operations that work on a per-record level and can have either a record-level consequence (i.e. dropping the record) or a global one (terminating the flow).

Each flow exposes a dedicated property for each of the operation types, as in the example below:

from datetime import datetime, timezone

from aws_cdk import Stack
from cdklabs.cdk_appflow import Field, Filter, FilterCondition, IDestination, ISource, Mapping, OnDemandFlow, Transform, Validation, ValidationAction, ValidationCondition

# The following values are assumed to be provided by the surrounding code:
# stack: Stack
# source: ISource
# destination: IDestination


flow = OnDemandFlow(stack, "OnDemandFlow",
    source=source,
    destination=destination,
    transforms=[
        Transform.mask(Field(name="Name"), "*")
    ],
    mappings=[
        Mapping.map(Field(name="Name", data_type="String"), name="Name", data_type="string")
    ],
    filters=[
        # jsii maps the TypeScript Date type to datetime.datetime in Python
        Filter.when(FilterCondition.timestamp_less_than_equals(Field(name="LastModifiedDate", data_type="datetime"), datetime(2022, 2, 2, tzinfo=timezone.utc)))
    ],
    validations=[
        Validation.when(ValidationCondition.is_null("Name"), ValidationAction.ignore_record())
    ]
)
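
To make the intent of the four operation types concrete, here is a plain-Python sketch that mimics the example above on in-memory records. It is only an illustration: the function names are hypothetical, the order of the steps is chosen for readability, and none of it describes how AppFlow executes a flow internally.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List

Record = Dict[str, Any]

CUTOFF = datetime(2022, 2, 2, tzinfo=timezone.utc)


def keep_by_filter(record: Record) -> bool:
    """Filter: keep only records last modified on or before the cutoff."""
    return record["LastModifiedDate"] <= CUTOFF


def passes_validation(record: Record) -> bool:
    """Validation: a record whose Name is null is ignored (dropped)."""
    return record.get("Name") is not None


def mask(value: str, mask_char: str = "*") -> str:
    """Transform: replace every character of the value with the mask character."""
    return mask_char * len(value)


def run_operations(records: Iterable[Record]) -> List[Record]:
    out: List[Record] = []
    for record in records:
        if not keep_by_filter(record):
            continue
        if not passes_validation(record):
            continue
        # Mapping: copy the source field Name to the destination field Name,
        # applying the mask transform on the way.
        out.append({"Name": mask(record["Name"])})
    return out
```

Given three records where one has a null Name and one was modified after the cutoff, only the remaining record reaches the output, with its Name fully masked.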

Monitoring

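Metrics

Each flow exposes metrics through the following methods (listed with their TypeScript names; in Python they are snake_case, e.g. metric_flow_executions_started):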

  • metricFlowExecutionsStarted
  • metricFlowExecutionsFailed
  • metricFlowExecutionsSucceeded
  • metricFlowExecutionTime
  • metricFlowExecutionRecordsProcessed

For detailed information about AppFlow metrics refer to the documentation.

A metric can be consumed by a CloudWatch alarm, as in the example below:

from aws_cdk import Stack
from cdklabs.cdk_appflow import IFlow

# flow: IFlow
# stack: Stack


metric = flow.metric_flow_executions_started()

metric.create_alarm(stack, "FlowExecutionsStartedAlarm",
    threshold=1000,
    evaluation_periods=2
)

EventBridge notifications

Each flow publishes events to the default EventBridge bus. Rules for these events can be created with the following methods:

  • onRunStarted
  • onRunCompleted
  • onDeactivated (only for the OnEventFlow and the OnScheduleFlow)
  • onStatus (only for the OnEventFlow)

The notifications can then be consumed as in the example below:

from aws_cdk.aws_sns import ITopic
from aws_cdk.aws_events_targets import SnsTopic
from cdklabs.cdk_appflow import IFlow

# flow: IFlow
# my_topic: ITopic


flow.on_run_completed("OnRunCompleted",
    target=SnsTopic(my_topic)
)
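
A subscriber of the topic receives the EventBridge event as JSON text. The sketch below extracts the standard EventBridge envelope fields (source, detail-type and detail) from such a message. The helper name parse_flow_event is hypothetical, and the structure of detail is deliberately left opaque because it is not described in this document.

```python
import json
from typing import Any, Dict, Tuple


def parse_flow_event(message: str) -> Tuple[str, str, Dict[str, Any]]:
    """Parse an EventBridge event delivered as JSON text.

    Returns (source, detail-type, detail). The detail payload is passed
    through unchanged; its keys are not interpreted here.
    """
    event = json.loads(message)
    return event["source"], event["detail-type"], event.get("detail", {})
```

In an SNS-triggered AWS Lambda function, the message text would typically be read from the Sns.Message attribute of each record before being handed to this helper.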

Notable distinctions from CloudFormation specification

OnScheduleFlow and incrementalPullConfig

In CloudFormation the definition of the incrementalPullConfig (which effectively gives a name of the field used for tracking the last pulled timestamp) is on the SourceFlowConfig property. In the library this has been moved to the OnScheduleFlow constructor properties.
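
The idea behind an incremental pull, tracking the last pulled timestamp in a named field, can be sketched in plain Python as follows. This is a conceptual illustration under the assumption that each record carries a comparable timestamp; the function name incremental_pull is hypothetical and the code says nothing about how AppFlow stores or advances its own state.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

Record = Dict[str, Any]


def incremental_pull(
    records: List[Record],
    timestamp_field: str,
    last_pulled: Optional[datetime],
) -> Tuple[List[Record], Optional[datetime]]:
    """Return the records newer than `last_pulled` and the new watermark."""
    fresh = [
        r for r in records
        if last_pulled is None or r[timestamp_field] > last_pulled
    ]
    # If nothing new arrived, the watermark stays where it was.
    new_watermark = max((r[timestamp_field] for r in fresh), default=last_pulled)
    return fresh, new_watermark
```

On the first run last_pulled is None and every record is returned; later runs pass the previously returned watermark and receive only the records modified after it.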

S3Destination and Glue Catalog

Although in CloudFormation the Glue Catalog configuration is settable on the flow level, it works only when the destination is S3. That is why the library moves the Glue Catalog properties to the S3Destination, which in turn requires using Lazy to populate metadataCatalogConfig in the flow.

Security considerations

It is recommended to follow data protection mechanisms for Amazon AppFlow.

Confidential information

Amazon AppFlow integrates with applications through ConnectorProfiles. A ConnectorProfile requires sensitive information such as access and refresh tokens. This information should be stored securely and passed to AWS CDK securely. All sensitive fields are effectively IResolvable, which means they can be resolved at deploy time, so the best practices for credentials with CloudFormation apply. In this library, the sensitive fields are typed as SecretValue to emphasize that they should not be plain strings.

An example of using a predefined AWS Secrets Manager secret for storing sensitive information can be found below:

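from aws_cdk import Stack
from aws_cdk.aws_secretsmanager import Secret
from cdklabs.cdk_appflow import GoogleAnalytics4ConnectorProfile, GoogleAnalytics4OAuthFlow, GoogleAnalytics4OAuthSettings, GoogleAnalytics4RefreshTokenGrantFlow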

# stack: Stack


secret = Secret.from_secret_name_v2(stack, "GA4Secret", "appflow/ga4")

profile = GoogleAnalytics4ConnectorProfile(stack, "GA4Connector",
    o_auth=GoogleAnalytics4OAuthSettings(
        flow=GoogleAnalytics4OAuthFlow(
            refresh_token_grant=GoogleAnalytics4RefreshTokenGrantFlow(
                refresh_token=secret.secret_value_from_json("refreshToken"),
                client_id=secret.secret_value_from_json("clientId"),
                client_secret=secret.secret_value_from_json("clientSecret")
            )
        )
    )
)

An approach to managing permissions

This library relies on an internal AppFlowPermissionsManager class to automatically infer and apply appropriate resource policy statements to the S3 Bucket, KMS Key, and Secrets Manager Secret resources. AppFlowPermissionsManager places the statements exactly once for the appflow.amazonaws.com principal no matter how many times a resource is reused in the code.
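
The "exactly once" behavior can be pictured with a small registry that remembers what it has already granted. The class below is a hypothetical stand-in written for illustration; it is not the implementation of AppFlowPermissionsManager, and its names and statement shape are assumptions.

```python
from typing import Dict, List, Set, Tuple


class PermissionsRegistry:
    """Hypothetical sketch of placing a policy statement at most once."""

    PRINCIPAL = "appflow.amazonaws.com"

    def __init__(self) -> None:
        self._granted: Set[Tuple[str, str]] = set()
        self.statements: List[Dict[str, str]] = []

    def grant(self, resource_arn: str, access: str) -> bool:
        """Add a statement for (resource, access) unless one already exists.

        Returns True when a new statement was added, False otherwise.
        """
        key = (resource_arn, access)
        if key in self._granted:
            return False
        self._granted.add(key)
        self.statements.append(
            {"principal": self.PRINCIPAL, "resource": resource_arn, "access": access}
        )
        return True
```

Calling grant twice for the same resource and access type leaves a single statement in place, which mirrors reusing one bucket, key, or secret across several flows.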

Confused Deputy Problem

Amazon AppFlow is an account-bound, regional service, which makes it invulnerable to the confused deputy problem (see, e.g., here). However, AppFlowPermissionsManager still adds the aws:SourceAccount condition to the resource policies as a best practice.
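
For illustration, a resource policy statement carrying such a condition could look like the dictionary built below. This is a sketch of the general IAM policy shape, not output copied from the library: the helper name and the actions, ARN and account id in the usage note are placeholders.

```python
from typing import Any, Dict, List


def appflow_statement(actions: List[str], resource_arn: str, account_id: str) -> Dict[str, Any]:
    """Build an Allow statement for the AppFlow service principal,
    restricted to requests made on behalf of the given account."""
    return {
        "Effect": "Allow",
        "Principal": {"Service": "appflow.amazonaws.com"},
        "Action": actions,
        "Resource": resource_arn,
        # Only honor requests that AppFlow makes for this account.
        "Condition": {"StringEquals": {"aws:SourceAccount": account_id}},
    }
```

For example, appflow_statement(["s3:PutObject"], "arn:aws:s3:::example-bucket/*", "111122223333") yields a statement that can be serialized with json.dumps and placed in the Statement list of a bucket policy.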

