@cdklabs/cdk-appflow
Amazon AppFlow Construct Library
Note: this library is currently in technical preview.
Introduction
Amazon AppFlow is a service that enables creating managed, bi-directional data transfer integrations between various SaaS applications and AWS services.
For more information, see the Amazon AppFlow User Guide.
Example
from datetime import datetime
from aws_cdk import SecretValue
from aws_cdk.aws_s3 import Bucket
from aws_cdk.aws_secretsmanager import ISecret
from cdklabs.cdk_appflow import Field, Filter, FilterCondition, IDestination, ISource, Mapping, OnDemandFlow, S3Destination, S3Location, SalesforceConnectorProfile, SalesforceOAuthFlow, SalesforceOAuthRefreshTokenGrantFlow, SalesforceOAuthSettings, SalesforceSource, Transform, Validation, ValidationAction, ValidationCondition
# client_secret: ISecret
# access_token: SecretValue
# refresh_token: SecretValue
# instance_url: str
profile = SalesforceConnectorProfile(self, "MyConnectorProfile",
    o_auth=SalesforceOAuthSettings(
        access_token=access_token,
        flow=SalesforceOAuthFlow(
            refresh_token_grant=SalesforceOAuthRefreshTokenGrantFlow(
                refresh_token=refresh_token,
                client=client_secret
            )
        )
    ),
    instance_url=instance_url,
    is_sandbox=False
)

source = SalesforceSource(
    profile=profile,
    object="Account"
)

bucket = Bucket(self, "DestinationBucket")

destination = S3Destination(
    location=S3Location(bucket=bucket)
)

OnDemandFlow(self, "SfAccountToS3",
    source=source,
    destination=destination,
    mappings=[Mapping.map_all()],
    transforms=[
        Transform.mask(Field(name="Name"), "*")
    ],
    validations=[
        Validation.when(ValidationCondition.is_null("Name"), ValidationAction.ignore_record())
    ],
    filters=[
        Filter.when(FilterCondition.timestamp_less_than_equals(Field(name="LastModifiedDate", data_type="datetime"), datetime(2022, 2, 2)))
    ]
)
Concepts
Amazon AppFlow introduces several concepts that abstract away the technicalities of setting up and managing data integrations.
An Application
is any SaaS data integration component that can be either a source or a destination for Amazon AppFlow. A source is an application from which Amazon AppFlow will retrieve data, whereas a destination is an application to which Amazon AppFlow will send data.
A Flow
is Amazon AppFlow's integration between a source and a destination.
A ConnectorProfile
is Amazon AppFlow's abstraction over authentication/authorization with a particular SaaS application. The per-application permissions granted to a particular ConnectorProfile determine whether the profile can support the application as a source or as a destination (see the documentation for whether a particular application is supported as a source or a destination).
Types of Flows
The library introduces three separate types of flows:
- OnDemandFlow - a construct representing a flow that can be triggered programmatically with the use of a StartFlow API call.
- OnEventFlow - a construct representing a flow that is triggered by a SaaS application event published to AppFlow. At the time of writing, only a Salesforce source is able to publish events that can be consumed by AppFlow flows.
- OnScheduleFlow - a construct representing a flow that is triggered on a Schedule.
Tasks
Tasks are steps that can be taken upon fields. Tasks compose higher-level objects that in this library are named Operations. There are four types of operations:
- Transforms - 1-1 transforms on source fields, like truncation or masking
- Mappings - 1-1 or many-to-1 operations from source fields to a destination field
- Filters - operations that limit the source data based on particular conditions
- Validations - operations that work on a per-record level and can have either a record-level consequence (i.e. dropping the record) or a global one (terminating the flow)
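The semantics of the four operation types can be illustrated with a plain-Python sketch. This is purely conceptual and is not the library's API; the record shape and field names are invented for illustration:

```python
# Conceptual illustration of AppFlow operation semantics -- not the library API.
records = [
    {"Name": "Acme", "LastModifiedDate": "2021-06-01"},
    {"Name": None, "LastModifiedDate": "2021-07-01"},
    {"Name": "Globex", "LastModifiedDate": "2023-01-15"},
]

# Filter: limit the source data based on a condition
filtered = [r for r in records if r["LastModifiedDate"] <= "2022-02-02"]

# Validation: per-record consequence -- here, ignore records with a null field
validated = [r for r in filtered if r["Name"] is not None]

# Transform: a 1-1 transform on a source field, e.g. masking with "*"
transformed = [{**r, "Name": "*" * len(r["Name"])} for r in validated]

# Mapping: a 1-1 operation from a source field to a destination field
mapped = [{"AccountName": r["Name"]} for r in transformed]
print(mapped)  # [{'AccountName': '****'}]
```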
Each flow exposes dedicated properties for each of the operation types, which can be used as in the example below:
from datetime import datetime
from cdklabs.cdk_appflow import Field, Filter, FilterCondition, IDestination, ISource, Mapping, OnDemandFlow, Transform, Validation, ValidationAction, ValidationCondition
# stack: Stack
# source: ISource
# destination: IDestination

flow = OnDemandFlow(stack, "OnDemandFlow",
    source=source,
    destination=destination,
    transforms=[
        Transform.mask(Field(name="Name"), "*")
    ],
    mappings=[
        Mapping.map(Field(name="Name", data_type="String"), name="Name", data_type="string")
    ],
    filters=[
        Filter.when(FilterCondition.timestamp_less_than_equals(Field(name="LastModifiedDate", data_type="datetime"), datetime(2022, 2, 2)))
    ],
    validations=[
        Validation.when(ValidationCondition.is_null("Name"), ValidationAction.ignore_record())
    ]
)
Monitoring
Metrics
Each flow allows access to its metrics through the following methods:
- metricFlowExecutionsStarted
- metricFlowExecutionsFailed
- metricFlowExecutionsSucceeded
- metricFlowExecutionTime
- metricFlowExecutionRecordsProcessed
For detailed information about AppFlow metrics, refer to the documentation.
The metrics can be consumed by a CloudWatch alarm, as in the example below:
from cdklabs.cdk_appflow import IFlow
# flow: IFlow
# stack: Stack
metric = flow.metric_flow_executions_started()
metric.create_alarm(stack, "FlowExecutionsStartedAlarm",
    threshold=1000,
    evaluation_periods=2
)
EventBridge notifications
Each flow publishes events to the default EventBridge bus:
- onRunStarted
- onRunCompleted
- onDeactivated (only for the OnEventFlow and the OnScheduleFlow)
- onStatus (only for the OnEventFlow)
These notifications can be consumed as in the example below:
from aws_cdk.aws_sns import ITopic
from aws_cdk.aws_events_targets import SnsTopic
from cdklabs.cdk_appflow import IFlow
# flow: IFlow
# my_topic: ITopic
flow.on_run_completed("OnRunCompleted",
    target=SnsTopic(my_topic)
)
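Under the hood, these convenience methods create EventBridge rules on the default bus. A hedged sketch of an equivalent raw event pattern follows; the detail-type string is an assumption based on the AppFlow documentation, so verify it against the actual events in your account before relying on it:

```python
import json

# Assumed event pattern for AppFlow run-completed events on the default bus.
# The source "aws.appflow" and the detail-type below are documented AppFlow
# event fields, but treat them as assumptions and verify against real events.
event_pattern = {
    "source": ["aws.appflow"],
    "detail-type": ["AppFlow End Flow Run Report"],
}
print(json.dumps(event_pattern, indent=2))
```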
Notable distinctions from CloudFormation specification
OnScheduleFlow and incrementalPullConfig
In CloudFormation, the definition of the incrementalPullConfig (which effectively gives the name of the field used for tracking the last pulled timestamp) is on the SourceFlowConfig property. In this library it has been moved to the OnScheduleFlow constructor properties.
S3Destination and Glue Catalog
Although in CloudFormation the Glue Catalog configuration is settable at the flow level, it works only when the destination is S3. That is why this library shifts the Glue Catalog properties definition to the S3Destination, which in turn requires using Lazy for populating metadataCatalogConfig in the flow.
Security considerations
It is recommended to follow data protection mechanisms for Amazon AppFlow.
Confidential information
Amazon AppFlow application integration is done using ConnectorProfiles. A ConnectorProfile requires providing sensitive information, e.g. access and refresh tokens. It is recommended that such information is stored securely and passed to AWS CDK securely. All sensitive fields are effectively IResolvable, which means they can be resolved at deploy time. With that, one should follow the best practices for credentials with CloudFormation. In this library, the sensitive fields are typed as SecretValue
to emphasize that they should not be plain strings.
An example of using a predefined AWS Secrets Manager secret for storing sensitive information can be found below:
from aws_cdk.aws_secretsmanager import Secret
from cdklabs.cdk_appflow import GoogleAnalytics4ConnectorProfile, GoogleAnalytics4OAuthFlow, GoogleAnalytics4OAuthSettings, GoogleAnalytics4RefreshTokenGrantFlow
# stack: Stack

secret = Secret.from_secret_name_v2(stack, "GA4Secret", "appflow/ga4")

profile = GoogleAnalytics4ConnectorProfile(stack, "GA4Connector",
    o_auth=GoogleAnalytics4OAuthSettings(
        flow=GoogleAnalytics4OAuthFlow(
            refresh_token_grant=GoogleAnalytics4RefreshTokenGrantFlow(
                refresh_token=secret.secret_value_from_json("refreshToken"),
                client_id=secret.secret_value_from_json("clientId"),
                client_secret=secret.secret_value_from_json("clientSecret")
            )
        )
    )
)
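For a secret referenced this way to work, its payload must be a JSON document holding the keys that secret_value_from_json looks up. A minimal sketch of the expected shape follows; the key names come from the example above, while the values are placeholders you would replace with real credentials:

```python
import json

# Sketch of the JSON payload expected in the "appflow/ga4" secret.
# Key names match the secret_value_from_json(...) lookups; values are
# placeholders -- never commit real credentials to source code.
secret_payload = {
    "refreshToken": "<ga4-refresh-token>",
    "clientId": "<oauth-client-id>",
    "clientSecret": "<oauth-client-secret>",
}
print(json.dumps(secret_payload, indent=2))
```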
An approach to managing permissions
This library relies on an internal AppFlowPermissionsManager
class to automatically infer and apply appropriate resource policy statements to the S3 Bucket, KMS Key, and Secrets Manager Secret resources. AppFlowPermissionsManager
places the statements exactly once for the appflow.amazonaws.com
principal no matter how many times a resource is reused in the code.
Confused Deputy Problem
Amazon AppFlow is an account-bound and regional service. As such, it is not vulnerable to the confused deputy problem (see, e.g., here). However, AppFlowPermissionsManager
still introduces the aws:SourceAccount
condition to the resource policies as a best practice.
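To make the shape of such a policy concrete, here is a hedged sketch of the kind of resource policy statement this approach produces for an S3 destination bucket. The account ID, bucket name, and the exact set of actions are illustrative assumptions, not the literal output of AppFlowPermissionsManager:

```python
import json

# Illustrative resource policy statement: the appflow.amazonaws.com service
# principal is scoped with an aws:SourceAccount condition so the service can
# only act on behalf of this account. Account ID, bucket, and action are
# placeholders.
account_id = "123456789012"  # placeholder account
statement = {
    "Effect": "Allow",
    "Principal": {"Service": "appflow.amazonaws.com"},
    "Action": "s3:PutObject",
    "Resource": "arn:aws:s3:::my-destination-bucket/*",
    "Condition": {"StringEquals": {"aws:SourceAccount": account_id}},
}
print(json.dumps(statement, indent=2))
```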