@cdklabs/cdk-appflow

Amazon AppFlow Construct Library

---

Experimental

Experimental: This construct library is experimental and under active development. It is subject to non-backward-compatible changes or removal in any future version. Its APIs are not subject to the Semantic Versioning model, and breaking changes will be announced in the release notes. This means that while you may use the library, you may need to update your source code when upgrading to a newer version of this package.


Introduction

Amazon AppFlow is a service for creating managed, bidirectional data transfer integrations between various SaaS applications and AWS services.

For more information, see the Amazon AppFlow User Guide.

Supported Applications

This library provides L2 constructs for the following applications and services:

  • Amazon RDS for PostgreSQL (destination)
  • Asana (source)
  • EventBridge (destination)
  • GitHub (source)
  • Google Ads (source)
  • Google Analytics 4 (source)
  • Google BigQuery (source)
  • HubSpot (source, destination)
  • Mailchimp (source)
  • Marketo (source)
  • Microsoft Dynamics 365 (source)
  • Microsoft SharePoint Online (source)
  • Redshift (destination)
  • S3 (source, destination)
  • Salesforce (source, destination)
  • Salesforce Marketing Cloud (source)
  • SAP OData (source, destination)
  • ServiceNow (source)
  • Slack (source)
  • Snowflake (destination)
  • Zendesk (source)

Example

from datetime import datetime

from aws_cdk import SecretValue
from aws_cdk.aws_s3 import Bucket
from aws_cdk.aws_secretsmanager import ISecret
from cdklabs.cdk_appflow import (
    Field, Filter, FilterCondition, Mapping, OnDemandFlow, S3Destination, S3Location,
    SalesforceConnectorProfile, SalesforceOAuthFlow, SalesforceOAuthRefreshTokenGrantFlow,
    SalesforceOAuthSettings, SalesforceSource, Transform, Validation, ValidationAction,
    ValidationCondition,
)

# This code runs inside a Stack constructor, so `self` is the stack scope.
# client_secret: ISecret
# access_token: SecretValue
# refresh_token: SecretValue
# instance_url: str


# Connector profile holding the Salesforce OAuth credentials
profile = SalesforceConnectorProfile(self, "MyConnectorProfile",
    o_auth=SalesforceOAuthSettings(
        access_token=access_token,
        flow=SalesforceOAuthFlow(
            refresh_token_grant=SalesforceOAuthRefreshTokenGrantFlow(
                refresh_token=refresh_token,
                client=client_secret
            )
        )
    ),
    instance_url=instance_url,
    is_sandbox=False
)

# Read Account objects from Salesforce
source = SalesforceSource(
    profile=profile,
    object="Account"
)

bucket = Bucket(self, "DestinationBucket")

# Write the transferred records to the S3 bucket
destination = S3Destination(
    location=S3Location(bucket=bucket)
)

OnDemandFlow(self, "SfAccountToS3",
    source=source,
    destination=destination,
    mappings=[Mapping.map_all()],
    transforms=[
        Transform.mask(Field(name="Name"), "*")
    ],
    validations=[
        Validation.when(ValidationCondition.is_null("Name"), ValidationAction.ignore_record())
    ],
    filters=[
        # jsii maps the TypeScript Date type to Python's datetime
        Filter.when(FilterCondition.timestamp_less_than_equals(Field(name="LastModifiedDate", data_type="datetime"), datetime(2022, 2, 2)))
    ]
)

Concepts

Amazon AppFlow introduces several concepts that abstract away the technicalities of setting up and managing data integrations.

An Application is any SaaS data integration component that can be either a source or a destination for Amazon AppFlow. A source is an application from which Amazon AppFlow will retrieve data, whereas a destination is an application to which Amazon AppFlow will send data.

A Flow is Amazon AppFlow's integration between a source and a destination.

A ConnectorProfile is Amazon AppFlow's abstraction over authentication and authorization with a particular SaaS application. The application-specific permissions granted to a ConnectorProfile determine whether the profile can be used with that application as a source or as a destination (the documentation lists whether each application is supported as a source, a destination, or both).

Types of Flows

The library introduces three separate types of flows:

  • OnDemandFlow - a construct representing a flow that is triggered programmatically through a StartFlow API call.
  • OnEventFlow - a construct representing a flow that is triggered by a SaaS application event published to AppFlow. At the time of writing, only a Salesforce source can publish events that AppFlow flows can consume.
  • OnScheduleFlow - a construct representing a flow that is triggered on a schedule.

Tasks

Tasks are steps applied to fields. In this library, tasks are composed into higher-level objects called Operations. There are four types of operations:

  • Transforms - 1-1 transforms on source fields, like truncation or masking
  • Mappings - 1-1 or many-to-1 operations from source fields to a destination field
  • Filters - operations that limit the source data on particular conditions
  • Validations - operations that work at the record level and can have either a record-level consequence (e.g. dropping the record) or a global one (terminating the flow).

Each flow exposes a dedicated property for each operation type, which can be used as in the example below:

from datetime import datetime

from aws_cdk import Stack
from cdklabs.cdk_appflow import Field, Filter, FilterCondition, IDestination, ISource, Mapping, OnDemandFlow, Transform, Validation, ValidationAction, ValidationCondition

# stack: Stack
# source: ISource
# destination: IDestination


flow = OnDemandFlow(stack, "OnDemandFlow",
    source=source,
    destination=destination,
    # Transform: mask every character of the Name field with "*"
    transforms=[
        Transform.mask(Field(name="Name"), "*")
    ],
    # Mapping: map the source Name field to the destination Name field
    mappings=[
        Mapping.map(Field(name="Name", data_type="String"), name="Name", data_type="string")
    ],
    # Filter: only transfer records last modified on or before 2022-02-02
    filters=[
        Filter.when(FilterCondition.timestamp_less_than_equals(Field(name="LastModifiedDate", data_type="datetime"), datetime(2022, 2, 2)))
    ],
    # Validation: skip records whose Name is null
    validations=[
        Validation.when(ValidationCondition.is_null("Name"), ValidationAction.ignore_record())
    ]
)
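To make the four operation types more concrete, the following plain-Python model shows what each of them does to a small batch of records, loosely mirroring the example above (mask Name, drop records with a null Name, keep records modified on or before 2022-02-02) and adding a many-to-1 mapping. This is a hypothetical illustration only: it does not use the cdk-appflow API, the functions `mask` and `run_operations` are invented for this sketch, and the order in which the operations run here is an assumption, not AppFlow's documented execution order.

```python
from datetime import datetime
from typing import Callable, Dict, Iterable, List

Record = Dict[str, object]  # hypothetical record shape: field name -> value


def mask(field: str, char: str = "*") -> Callable[[Record], Record]:
    """Transform (1-1): replace every character of `field` with `char`."""
    def apply(record: Record) -> Record:
        out = dict(record)
        if out.get(field) is not None:
            out[field] = char * len(str(out[field]))
        return out
    return apply


def run_operations(
    records: Iterable[Record],
    keep: Callable[[Record], bool],                   # filter condition
    is_invalid: Callable[[Record], bool],             # validation condition
    transform: Callable[[Record], Record],            # 1-1 transform
    mappings: Dict[str, Callable[[Record], object]],  # destination field -> source expression
) -> List[Record]:
    """Apply filter, validation, transform and mappings to each record (order assumed)."""
    result = []
    for record in records:
        if not keep(record):
            continue  # filter: record is outside the requested data
        if is_invalid(record):
            continue  # validation with an "ignore record" action: drop it
        record = transform(record)
        result.append({dest: expr(record) for dest, expr in mappings.items()})
    return result


cutoff = datetime(2022, 2, 2)
rows = [
    {"Name": "Acme", "Country": "US", "LastModifiedDate": datetime(2022, 1, 15)},
    {"Name": None, "Country": "DE", "LastModifiedDate": datetime(2022, 1, 20)},
    {"Name": "Globex", "Country": "FR", "LastModifiedDate": datetime(2022, 3, 1)},
]

out = run_operations(
    rows,
    keep=lambda r: r["LastModifiedDate"] <= cutoff,
    is_invalid=lambda r: r["Name"] is None,
    transform=mask("Name"),
    mappings={
        "Name": lambda r: r["Name"],                         # 1-1 mapping
        "Label": lambda r: f"{r['Name']} ({r['Country']})",  # many-to-1 mapping
    },
)
print(out)  # [{'Name': '****', 'Label': '**** (US)'}]
```

The second row is dropped by the validation and the third by the filter, so only the masked first row reaches the destination.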

Monitoring

Metrics

Each flow exposes its metrics through the following methods:

  • metricFlowExecutionsStarted
  • metricFlowExecutionsFailed
  • metricFlowExecutionsSucceeded
  • metricFlowExecutionTime
  • metricFlowExecutionRecordsProcessed

For detailed information about AppFlow metrics refer to the documentation.

These metrics can be consumed by CloudWatch alarms, as in the example below:

from aws_cdk import Stack
from cdklabs.cdk_appflow import IFlow

# flow: IFlow
# stack: Stack


metric = flow.metric_flow_executions_started()

metric.create_alarm(stack, "FlowExecutionsStartedAlarm",
    threshold=1000,
    evaluation_periods=2
)

EventBridge notifications

Each flow publishes events to the default EventBridge bus. You can subscribe to them with the following methods:

  • onRunStarted
  • onRunCompleted
  • onDeactivated (only for the OnEventFlow and the OnScheduleFlow)
  • onStatus (only for the OnEventFlow)

For example, to send a notification to an SNS topic whenever a flow run completes:

from aws_cdk.aws_sns import ITopic
from aws_cdk.aws_events_targets import SnsTopic
from cdklabs.cdk_appflow import IFlow

# flow: IFlow
# my_topic: ITopic


flow.on_run_completed("OnRunCompleted",
    target=SnsTopic(my_topic)
)

Notable distinctions from CloudFormation specification

OnScheduleFlow and incrementalPullConfig

In CloudFormation, the incrementalPullConfig (which specifies the name of the field used to track the timestamp of the last pull) is defined on the SourceFlowConfig property. In this library, it has been moved to the OnScheduleFlow constructor properties.
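Conceptually, the tracked timestamp field acts as a watermark: each scheduled run transfers only the records whose timestamp is newer than the one reached by the previous run. The sketch below models that idea in plain Python under this assumption. The function `incremental_pull` and the record shape are hypothetical, are not part of this library or the AppFlow API, and the "pull everything on the first run" behavior is a simplification of this model.

```python
from datetime import datetime
from typing import Dict, List, Optional, Tuple

Record = Dict[str, object]  # hypothetical record shape


def incremental_pull(
    records: List[Record], timestamp_field: str, last_pulled: Optional[datetime]
) -> Tuple[List[Record], Optional[datetime]]:
    """Return the records newer than the watermark and the updated watermark."""
    fresh = [
        r for r in records
        if last_pulled is None or r[timestamp_field] > last_pulled
    ]
    new_mark = max((r[timestamp_field] for r in fresh), default=last_pulled)
    return fresh, new_mark


source = [
    {"Id": 1, "LastModifiedDate": datetime(2024, 1, 1)},
    {"Id": 2, "LastModifiedDate": datetime(2024, 1, 5)},
]
# No watermark yet, so this model pulls everything on the first run.
batch1, mark = incremental_pull(source, "LastModifiedDate", None)
source.append({"Id": 3, "LastModifiedDate": datetime(2024, 1, 9)})
# The second run only sees the record modified after the stored watermark.
batch2, mark = incremental_pull(source, "LastModifiedDate", mark)
print([r["Id"] for r in batch1], [r["Id"] for r in batch2])  # [1, 2] [3]
```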

S3Destination and Glue Catalog

Although CloudFormation allows the Glue Catalog configuration to be set at the flow level, it only works when the destination is S3. For that reason, the library moves the Glue Catalog properties to the S3Destination, which in turn requires using Lazy to populate metadataCatalogConfig on the flow.

Security considerations

It is recommended to follow the data protection mechanisms documented for Amazon AppFlow.

Confidential information

Amazon AppFlow application integration is done using ConnectorProfiles. A ConnectorProfile requires sensitive information such as access and refresh tokens. Such information should be stored securely and passed to the AWS CDK securely. All sensitive fields are effectively IResolvable, which means they can be resolved at deploy time, so you should follow the best practices for handling credentials with CloudFormation. In this library, the sensitive fields are typed as SecretValue to emphasize that they should not be plain strings.

An example of using a predefined AWS Secrets Manager secret for storing sensitive information can be found below:

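from aws_cdk import Stack
from aws_cdk.aws_secretsmanager import Secret
from cdklabs.cdk_appflow import GoogleAnalytics4ConnectorProfile, GoogleAnalytics4OAuthFlow, GoogleAnalytics4OAuthSettings, GoogleAnalytics4RefreshTokenGrantFlow

# stack: Stack


# The secret is expected to contain the JSON keys refreshToken, clientId and clientSecret
secret = Secret.from_secret_name_v2(stack, "GA4Secret", "appflow/ga4")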

profile = GoogleAnalytics4ConnectorProfile(stack, "GA4Connector",
    o_auth=GoogleAnalytics4OAuthSettings(
        flow=GoogleAnalytics4OAuthFlow(
            refresh_token_grant=GoogleAnalytics4RefreshTokenGrantFlow(
                refresh_token=secret.secret_value_from_json("refreshToken"),
                client_id=secret.secret_value_from_json("clientId"),
                client_secret=secret.secret_value_from_json("clientSecret")
            )
        )
    )
)

An approach to managing permissions

This library relies on an internal AppFlowPermissionsManager class to automatically infer and apply appropriate resource policy statements to the S3 Bucket, KMS Key, and Secrets Manager Secret resources. AppFlowPermissionsManager places the statements exactly once for the appflow.amazonaws.com principal no matter how many times a resource is reused in the code.
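The "exactly once" behavior can be pictured as a registry keyed by resource and actions: a statement is added the first time it is requested and skipped on every later reuse of the same resource. The following is a minimal, hypothetical sketch of that idea in plain Python. `OncePerResourceGrants` is an invented name, and this is not the internal AppFlowPermissionsManager implementation; the bucket ARN and S3 actions are placeholders.

```python
from typing import List, Set, Tuple


class OncePerResourceGrants:
    """Hypothetical registry that adds each (resource, actions) statement only once."""

    def __init__(self) -> None:
        self._seen: Set[Tuple[str, Tuple[str, ...]]] = set()
        self.statements: List[dict] = []

    def grant(self, resource_arn: str, actions: List[str]) -> bool:
        """Add a statement for the AppFlow principal; return False if it already exists."""
        key = (resource_arn, tuple(sorted(actions)))
        if key in self._seen:
            return False  # resource reused with the same actions: skip the duplicate
        self._seen.add(key)
        self.statements.append({
            "Effect": "Allow",
            "Principal": {"Service": "appflow.amazonaws.com"},
            "Action": list(key[1]),
            "Resource": resource_arn,
        })
        return True


grants = OncePerResourceGrants()
bucket_arn = "arn:aws:s3:::my-destination-bucket"  # hypothetical bucket
for _ in range(3):  # e.g. the same bucket used as the destination of three flows
    grants.grant(bucket_arn, ["s3:PutObject", "s3:GetBucketAcl"])
print(len(grants.statements))  # 1
```

Normalizing the action list (sorting it) makes the deduplication independent of the order in which actions are requested.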

Confused Deputy Problem

Amazon AppFlow is an account-bound, regional service, which makes it not susceptible to the confused deputy problem (see, e.g., the AWS IAM documentation on the confused deputy problem). Nevertheless, AppFlowPermissionsManager still adds the aws:SourceAccount condition to the resource policies as a best practice.
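For reference, a bucket policy statement scoped to a single account with `aws:SourceAccount` might look like the JSON produced below. The `aws:SourceAccount` condition key and the `appflow.amazonaws.com` service principal are standard IAM constructs; the helper `appflow_bucket_statement`, the bucket name, the account ID and the exact list of S3 actions are illustrative assumptions, not the statements this library actually generates.

```python
import json
from typing import List


def appflow_bucket_statement(bucket_name: str, account_id: str, actions: List[str]) -> dict:
    """Build an S3 bucket policy statement for AppFlow, restricted to one source account."""
    return {
        "Effect": "Allow",
        "Principal": {"Service": "appflow.amazonaws.com"},
        "Action": actions,
        "Resource": [
            f"arn:aws:s3:::{bucket_name}",
            f"arn:aws:s3:::{bucket_name}/*",
        ],
        # AppFlow may only use this permission on behalf of the given account.
        "Condition": {"StringEquals": {"aws:SourceAccount": account_id}},
    }


stmt = appflow_bucket_statement(
    "my-destination-bucket",              # hypothetical bucket name
    "111122223333",                       # placeholder account ID
    ["s3:PutObject", "s3:GetBucketAcl"],  # illustrative subset of actions
)
print(json.dumps({"Version": "2012-10-17", "Statement": [stmt]}, indent=2))
```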

Upgrading and breaking changes

Please consult the UPGRADING docs for information.

Download files

Source Distribution

cdklabs_cdk_appflow-0.2.5.tar.gz (363.0 kB)

Built Distribution

cdklabs_cdk_appflow-0.2.5-py3-none-any.whl (363.1 kB)

File details

Details for the file cdklabs_cdk_appflow-0.2.5.tar.gz.

File metadata

  • Download URL: cdklabs_cdk_appflow-0.2.5.tar.gz
  • Size: 363.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for cdklabs_cdk_appflow-0.2.5.tar.gz
Algorithm Hash digest
SHA256 15744ac1aec711f993966028e89419daa7eb4de88ee1483c0a8033d8f0a58d07
MD5 da1afe1c22b72b4d8467b3e8ad79800c
BLAKE2b-256 5a7c982f89d00b52490303d9e366b0086faf3cd72e5431af86216fe7709104cd

File details

Details for the file cdklabs_cdk_appflow-0.2.5-py3-none-any.whl.

File hashes

Hashes for cdklabs_cdk_appflow-0.2.5-py3-none-any.whl
Algorithm Hash digest
SHA256 4451ee1608d334b22922f7a7ce2ecd983bfd6f52b8df3307396069452dab3d00
MD5 69473f58a42b2d6824eb0bbafa2f4296
BLAKE2b-256 fa1415f628779223b19a9f70b500da15167d8a6bab6eb92e0b62688a9b109afe
