@cdklabs/cdk-appflow

Amazon AppFlow Construct Library

Note: this library is currently in technical preview.

Introduction

Amazon AppFlow is a service that enables creating managed, bi-directional data transfer integrations between various SaaS applications and AWS services.

For more information, see the Amazon AppFlow User Guide.

Example

from datetime import datetime, timezone

from aws_cdk import SecretValue
from aws_cdk.aws_s3 import Bucket
from aws_cdk.aws_secretsmanager import ISecret
from cdklabs.cdk_appflow import Field, Filter, FilterCondition, Mapping, OnDemandFlow, S3Destination, S3Location, SalesforceConnectorProfile, SalesforceOAuthFlow, SalesforceOAuthRefreshTokenGrantFlow, SalesforceOAuthSettings, SalesforceSource, Transform, Validation, ValidationAction, ValidationCondition

# client_secret: ISecret
# access_token: SecretValue
# refresh_token: SecretValue
# instance_url: str


# Connector profile: holds the Salesforce OAuth credentials
profile = SalesforceConnectorProfile(self, "MyConnectorProfile",
    o_auth=SalesforceOAuthSettings(
        access_token=access_token,
        flow=SalesforceOAuthFlow(
            refresh_token_grant=SalesforceOAuthRefreshTokenGrantFlow(
                refresh_token=refresh_token,
                client=client_secret
            )
        )
    ),
    instance_url=instance_url,
    is_sandbox=False
)

# Source: the Salesforce "Account" object
source = SalesforceSource(
    profile=profile,
    object="Account"
)

bucket = Bucket(self, "DestinationBucket")

# Destination: an S3 bucket
destination = S3Destination(
    location=S3Location(bucket=bucket)
)

OnDemandFlow(self, "SfAccountToS3",
    source=source,
    destination=destination,
    mappings=[Mapping.map_all()],
    transforms=[
        Transform.mask(Field(name="Name"), "*")
    ],
    validations=[
        Validation.when(ValidationCondition.is_null("Name"), ValidationAction.ignore_record())
    ],
    filters=[
        # The TypeScript Date argument is passed as a Python datetime
        Filter.when(FilterCondition.timestamp_less_than_equals(Field(name="LastModifiedDate", data_type="datetime"), datetime(2022, 2, 2, tzinfo=timezone.utc)))
    ]
)

Concepts

Amazon AppFlow introduces several concepts that abstract away the technicalities of setting up and managing data integrations.

An Application is any SaaS data integration component that can be either a source or a destination for Amazon AppFlow. A source is an application from which Amazon AppFlow will retrieve data, whereas a destination is an application to which Amazon AppFlow will send data.

A Flow is Amazon AppFlow's integration between a source and a destination.

A ConnectorProfile is Amazon AppFlow's abstraction over authentication/authorization with a particular SaaS application. The per-SaaS application permissions given to a particular ConnectorProfile will determine whether the connector profile can support the application as a source or as a destination (see whether a particular application is supported as either a source or a destination in the documentation).

Types of Flows

The library introduces three separate types of flows:

  • OnDemandFlow - a construct representing a flow that can be triggered programmatically with the use of a StartFlow API call.
  • OnEventFlow - a construct representing a flow that is triggered by a SaaS application event published to AppFlow. At the time of writing, only a Salesforce source is able to publish events that can be consumed by AppFlow flows.
  • OnScheduleFlow - a construct representing a flow that is triggered on a schedule.
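
An OnDemandFlow runs only when something calls StartFlow. The sketch below shows one way to issue that call. It assumes the caller passes in an object that behaves like `boto3.client("appflow")`; the helper name `start_on_demand_flow` and the flow name in the usage comment are hypothetical, and the deployed flow name may differ from the construct id.

```python
from typing import Any, Optional


def start_on_demand_flow(client: Any, flow_name: str) -> Optional[str]:
    """Trigger one run of an on-demand flow and return its execution id.

    `client` is assumed to behave like boto3.client("appflow"); passing it in
    keeps the helper testable without AWS credentials.
    """
    response = client.start_flow(flowName=flow_name)
    # Use .get() in case the response carries no executionId.
    return response.get("executionId")


# Usage with a real client (requires boto3 and AWS credentials):
#   import boto3
#   start_on_demand_flow(boto3.client("appflow"), "my-deployed-flow-name")
```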

Tasks

Tasks are steps applied to fields. Tasks compose higher-level objects that this library calls Operations. The library identifies four operations:

  • Transforms - 1-1 transforms on source fields, like truncation or masking
  • Mappings - 1-1 or many-to-1 operations from source fields to a destination field
  • Filters - operations that limit the source data based on particular conditions
  • Validations - operations that work on a per-record level and can have either a record-level consequence (i.e. dropping the record) or a global one (terminating the flow).

Each flow exposes a dedicated property for each operation type, as in the example below:

from datetime import datetime, timezone

from aws_cdk import Stack
from cdklabs.cdk_appflow import Field, Filter, FilterCondition, IDestination, ISource, Mapping, OnDemandFlow, Transform, Validation, ValidationAction, ValidationCondition

# stack: Stack
# source: ISource
# destination: IDestination


flow = OnDemandFlow(stack, "OnDemandFlow",
    source=source,
    destination=destination,
    transforms=[
        Transform.mask(Field(name="Name"), "*")
    ],
    mappings=[
        Mapping.map(Field(name="Name", data_type="String"), name="Name", data_type="string")
    ],
    filters=[
        # The TypeScript Date argument is passed as a Python datetime
        Filter.when(FilterCondition.timestamp_less_than_equals(Field(name="LastModifiedDate", data_type="datetime"), datetime(2022, 2, 2, tzinfo=timezone.utc)))
    ],
    validations=[
        Validation.when(ValidationCondition.is_null("Name"), ValidationAction.ignore_record())
    ]
)
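
To make the intent of these operations concrete, here is a plain-Python model of a filter, a validation, and a transform applied to dictionary records. It is only an illustration and does not use the library or AppFlow. The names `mask` and `run_operations` are hypothetical, and the order in which the steps run here is arbitrary, not a statement about how AppFlow sequences tasks.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List

Record = Dict[str, Any]


def mask(value: str, mask_char: str = "*") -> str:
    """Transform: replace every character of the value with the mask character."""
    return mask_char * len(value)


def run_operations(records: Iterable[Record], cutoff: datetime) -> List[Record]:
    """Apply a filter, a validation, and a transform to plain dict records."""
    result = []
    for record in records:
        # Filter: keep only records last modified at or before the cutoff.
        if record["LastModifiedDate"] > cutoff:
            continue
        # Validation: ignore (drop) records whose Name is null.
        if record["Name"] is None:
            continue
        # Transform: mask Name; all other fields pass through unchanged.
        result.append({**record, "Name": mask(record["Name"])})
    return result


utc = timezone.utc
sample = [
    {"Name": "Acme", "LastModifiedDate": datetime(2022, 1, 1, tzinfo=utc)},
    {"Name": None, "LastModifiedDate": datetime(2022, 1, 15, tzinfo=utc)},
    {"Name": "Globex", "LastModifiedDate": datetime(2022, 3, 1, tzinfo=utc)},
]
kept = run_operations(sample, cutoff=datetime(2022, 2, 2, tzinfo=utc))
print(kept)  # only the "Acme" record remains, with Name masked as "****"
```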

Monitoring

Metrics

Each flow exposes metrics through the following methods:

  • metricFlowExecutionsStarted
  • metricFlowExecutionsFailed
  • metricFlowExecutionsSucceeded
  • metricFlowExecutionTime
  • metricFlowExecutionRecordsProcessed

For detailed information about AppFlow metrics refer to the documentation.
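
The method names above are written in camelCase, while the Python examples in this document call them in snake_case (for instance `metric_flow_executions_started`). The sketch below derives the Python spellings with a simple regular expression. The helper `to_python_name` is hypothetical and is only a simplification that is sufficient for the names listed here, not the actual conversion logic of the bindings.

```python
import re


def to_python_name(camel_name: str) -> str:
    """Convert a camelCase member name to snake_case."""
    # Insert "_" before every uppercase letter except at the start, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", camel_name).lower()


METRIC_METHODS = [
    "metricFlowExecutionsStarted",
    "metricFlowExecutionsFailed",
    "metricFlowExecutionsSucceeded",
    "metricFlowExecutionTime",
    "metricFlowExecutionRecordsProcessed",
]

for name in METRIC_METHODS:
    print(f"{name} -> {to_python_name(name)}")
```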

A metric can be consumed by a CloudWatch alarm, as in the example below:

from cdklabs.cdk_appflow import IFlow

# flow: IFlow
# stack: Stack


metric = flow.metric_flow_executions_started()

metric.create_alarm(stack, "FlowExecutionsStartedAlarm",
    threshold=1000,
    evaluation_periods=2
)

EventBridge notifications

Each flow publishes events to the default EventBridge bus. They can be consumed through the following methods:

  • onRunStarted
  • onRunCompleted
  • onDeactivated (only for the OnEventFlow and the OnScheduleFlow)
  • onStatus (only for the OnEventFlow)

One can consume the notifications as in the example below:

from aws_cdk.aws_sns import ITopic
from aws_cdk.aws_events_targets import SnsTopic
from cdklabs.cdk_appflow import IFlow

# flow: IFlow
# my_topic: ITopic


flow.on_run_completed("OnRunCompleted",
    target=SnsTopic(my_topic)
)
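
On the receiving side, a target such as a Lambda function or an SNS subscriber gets the event as JSON. The sketch below extracts a short summary from a run-completed event. The `source` and `detail-type` values and the `detail` field names are assumptions based on the AppFlow EventBridge documentation and should be checked against a real event; `summarize_run` is a hypothetical helper.

```python
from typing import Any, Dict, Optional

# Assumed values; verify them against an actual AppFlow event.
APPFLOW_SOURCE = "aws.appflow"
END_RUN_DETAIL_TYPE = "AppFlow End Flow Run Report"


def summarize_run(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return a short summary for a run-completed event, or None for other events."""
    if event.get("source") != APPFLOW_SOURCE:
        return None
    if event.get("detail-type") != END_RUN_DETAIL_TYPE:
        return None
    detail = event.get("detail", {})
    return {
        "flow": detail.get("flow-name"),
        "status": detail.get("status"),
        "records_processed": detail.get("num-of-records-processed"),
    }


# Hand-written sample event, shaped according to the assumptions above.
sample_event = {
    "source": "aws.appflow",
    "detail-type": "AppFlow End Flow Run Report",
    "detail": {
        "flow-name": "my-flow",
        "status": "Execution Successful",
        "num-of-records-processed": "10",
    },
}
print(summarize_run(sample_event))
```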

Notable distinctions from CloudFormation specification

OnScheduleFlow and incrementalPullConfig

In CloudFormation, the incrementalPullConfig (which effectively names the field used for tracking the last pulled timestamp) is defined on the SourceFlowConfig property. In the library, it has been moved to the OnScheduleFlow constructor properties.
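
For orientation, the CloudFormation side of this difference can be sketched as a plain dictionary. The property names follow the AWS::AppFlow::Flow resource as I understand it and should be checked against the CloudFormation reference; the helper `source_flow_config` and the sample values are hypothetical and are not part of the library.

```python
from typing import Any, Dict


def source_flow_config(profile_name: str, sf_object: str, datetime_field: str) -> Dict[str, Any]:
    """Build a CloudFormation-style SourceFlowConfig with incremental pull enabled."""
    return {
        "ConnectorType": "Salesforce",
        "ConnectorProfileName": profile_name,
        "SourceConnectorProperties": {"Salesforce": {"Object": sf_object}},
        # In CloudFormation the tracking field lives here, on the source config.
        "IncrementalPullConfig": {"DatetimeTypeFieldName": datetime_field},
    }


config = source_flow_config("MyConnectorProfile", "Account", "LastModifiedDate")
print(config["IncrementalPullConfig"])
```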

S3Destination and Glue Catalog

Although in CloudFormation the Glue Catalog configuration is settable at the flow level, it works only when the destination is S3. That is why the library shifts the Glue Catalog properties definition to the S3Destination, which in turn requires using Lazy for populating metadataCatalogConfig in the flow.

Security considerations

It is recommended to follow data protection mechanisms for Amazon AppFlow.

Confidential information

Amazon AppFlow application integration is done using ConnectorProfiles. A ConnectorProfile requires sensitive information such as access and refresh tokens. Store this information securely and pass it to AWS CDK securely. All sensitive fields are effectively IResolvable, which means they can be resolved at deploy time, so one should follow the best practices for credentials with CloudFormation. In this library, the sensitive fields are typed as SecretValue to emphasize that they should not be plain strings.

An example of using a predefined AWS Secrets Manager secret for storing sensitive information can be found below:

from aws_cdk.aws_secretsmanager import Secret
from cdklabs.cdk_appflow import GoogleAnalytics4ConnectorProfile, GoogleAnalytics4OAuthFlow, GoogleAnalytics4OAuthSettings, GoogleAnalytics4RefreshTokenGrantFlow

# stack: Stack


secret = Secret.from_secret_name_v2(stack, "GA4Secret", "appflow/ga4")

profile = GoogleAnalytics4ConnectorProfile(stack, "GA4Connector",
    o_auth=GoogleAnalytics4OAuthSettings(
        flow=GoogleAnalytics4OAuthFlow(
            refresh_token_grant=GoogleAnalytics4RefreshTokenGrantFlow(
                refresh_token=secret.secret_value_from_json("refreshToken"),
                client_id=secret.secret_value_from_json("clientId"),
                client_secret=secret.secret_value_from_json("clientSecret")
            )
        )
    )
)
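
Resolving a secret at deploy time means the synthesized template carries a reference to the secret rather than its value. The sketch below builds a CloudFormation dynamic reference string of that kind. The helper name is hypothetical, and the exact string that AWS CDK emits for a given SecretValue may differ (for example, it may use an ARN and version fields).

```python
from typing import Optional


def secrets_manager_dynamic_reference(secret_id: str, json_key: Optional[str] = None) -> str:
    """Build a CloudFormation dynamic reference to a Secrets Manager secret string."""
    parts = ["resolve", "secretsmanager", secret_id, "SecretString"]
    if json_key is not None:
        # Select one key from a JSON-formatted secret.
        parts.append(json_key)
    return "{{" + ":".join(parts) + "}}"


print(secrets_manager_dynamic_reference("appflow/ga4", "refreshToken"))
# {{resolve:secretsmanager:appflow/ga4:SecretString:refreshToken}}
```

The template therefore never contains the token itself; CloudFormation looks it up during deployment.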

An approach to managing permissions

This library relies on an internal AppFlowPermissionsManager class to automatically infer and apply appropriate resource policy statements to the S3 Bucket, KMS Key, and Secrets Manager Secret resources. AppFlowPermissionsManager places the statements exactly once for the appflow.amazonaws.com principal no matter how many times a resource is reused in the code.
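
The "exactly once" behavior can be pictured as a registry keyed by resource. The class below is a toy stand-in written for illustration only; it is not the library's AppFlowPermissionsManager, and all names in it are hypothetical.

```python
from typing import Dict, List, Set


class PolicyRegistry:
    """Toy registry that records a policy statement at most once per resource."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()
        self.statements: Dict[str, List[dict]] = {}

    def grant_once(self, resource_id: str, statement: dict) -> bool:
        """Attach the statement unless the resource was already handled."""
        if resource_id in self._seen:
            return False  # already granted; reuse adds nothing
        self._seen.add(resource_id)
        self.statements.setdefault(resource_id, []).append(statement)
        return True


registry = PolicyRegistry()
statement = {"Effect": "Allow", "Principal": {"Service": "appflow.amazonaws.com"}}
print(registry.grant_once("bucket-a", statement))  # True: first use
print(registry.grant_once("bucket-a", statement))  # False: reused resource
```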

Confused Deputy Problem

Amazon AppFlow is an account-bound, regional service, which makes it invulnerable to the confused deputy problem (see, e.g., here). However, AppFlowPermissionsManager still adds the aws:SourceAccount condition to the resource policies as a best practice.
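
As an illustration of what such a condition looks like, the sketch below builds a bucket policy statement for the appflow.amazonaws.com principal, scoped with aws:SourceAccount. The helper name, the account id, and the list of actions are illustrative assumptions; the statements the library actually generates may differ.

```python
import json
from typing import Any, Dict


def appflow_bucket_statement(bucket_arn: str, account_id: str) -> Dict[str, Any]:
    """Build an S3 resource policy statement limited to one source account."""
    return {
        "Effect": "Allow",
        "Principal": {"Service": "appflow.amazonaws.com"},
        # Illustrative actions only; adjust to what the flow really needs.
        "Action": ["s3:PutObject", "s3:GetBucketAcl"],
        "Resource": [bucket_arn, f"{bucket_arn}/*"],
        # Only requests made on behalf of this account are allowed.
        "Condition": {"StringEquals": {"aws:SourceAccount": account_id}},
    }


stmt = appflow_bucket_statement("arn:aws:s3:::my-destination-bucket", "111122223333")
print(json.dumps(stmt, indent=2))
```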

Upgrading and breaking changes

Please consult the UPGRADING docs for information.
