A CDK Python app for deploying ETL jobs that operate data pipelines for InsuranceLake in AWS

InsuranceLake ETL

Overview

This solution guidance helps you deploy extract, transform, load (ETL) processes and data storage resources to create InsuranceLake. It uses Amazon Simple Storage Service (Amazon S3) buckets for storage, AWS Glue for data transformation, and AWS Cloud Development Kit (CDK) Pipelines for deployment. The solution was originally based on the AWS blog Deploy data lake ETL jobs using CDK Pipelines.

The best way to learn about InsuranceLake is to follow the Quickstart guide and try it out.

The InsuranceLake solution consists of two codebases: Infrastructure and ETL.

Specifically, this solution helps you to:

  • Deploy a "3 Cs" (Collect, Cleanse, Consume) architecture for InsuranceLake.
  • Deploy ETL jobs needed to make common insurance industry data sources available in a data lake.
  • Use PySpark Glue jobs and supporting resources to perform data transforms in a modular approach.
  • Build and replicate the application in multiple environments quickly.
  • Deploy ETL jobs from a central deployment account to multiple AWS environments such as Dev, Test, and Prod.
  • Take advantage of the self-mutating feature of CDK Pipelines: the pipeline itself is infrastructure as code and can be changed as part of the deployment.
  • Increase the speed of prototyping, testing, and deployment of new ETL jobs.

InsuranceLake High Level Architecture

Cost

This solution uses the following services: Amazon S3, AWS Glue, AWS Step Functions, Amazon DynamoDB, AWS Lambda, Amazon CloudWatch, Amazon Athena, and, for continuous integration and continuous deployment (CI/CD) installations only, AWS CodePipeline.

If you follow the Quickstart and Quickstart with CI/CD instructions, use a total of 8 Glue DPU hours, and clean up all resources when finished, your estimated cost will not be higher than $2. This cost could be less as some services are included in the Free Tier.

We recommend creating a Budget through AWS Cost Explorer to help manage costs. Prices are subject to change. For full details, refer to the pricing webpage for each AWS service used in this solution.

Sample Cost Table

The following table provides a sample cost breakdown for deploying this Guidance with the default parameters in the US East (Ohio) Region for one month with pricing as of 2 October 2024.

AWS service | Dimensions | Cost [USD]
AWS Glue | per DPU-Hour for each Apache Spark or Spark Streaming job, billed per second with a 1-minute minimum | $0.44
Amazon S3 | per GB of storage used, Frequent Access Tier, first 50 TB per month | $0.023
Amazon S3 | PUT, COPY, POST, LIST requests (per 1,000 requests) | $0.005
Amazon S3 | GET, SELECT, and all other requests (per 1,000 requests) | $0.0004
Amazon Athena | per TB of data scanned | $5.00
Amazon DynamoDB | per GB-month of storage, over 25 GB | $0.25
Amazon DynamoDB | per million Write Request Units (WRU) | $1.25
Amazon DynamoDB | per million Read Request Units (RRU) | $0.25
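
To make the AWS Glue and Amazon S3 storage rows of the sample cost table concrete, the following is a minimal sketch of how those rates turn into a dollar figure. The function names and the usage numbers in the example are hypothetical and chosen only for illustration; the rates are the ones listed in the table and are subject to change.

```python
import math

# Rates copied from the sample cost table (USD, US East (Ohio), 2 October 2024).
GLUE_USD_PER_DPU_HOUR = 0.44
S3_USD_PER_GB_MONTH = 0.023


def glue_job_cost(dpus: float, runtime_seconds: float) -> float:
    """Estimate the cost of one Glue Spark job run.

    Billing is per second with a 1-minute minimum, so short runs are
    rounded up to 60 billed seconds.
    """
    billed_seconds = max(math.ceil(runtime_seconds), 60)
    return dpus * (billed_seconds / 3600) * GLUE_USD_PER_DPU_HOUR


def s3_storage_cost(gigabytes: float) -> float:
    """Estimate one month of storage at the first-50-TB rate."""
    return gigabytes * S3_USD_PER_GB_MONTH


if __name__ == "__main__":
    # Hypothetical usage: one 10-DPU job running 5 minutes, plus 2 GB stored.
    total = glue_job_cost(dpus=10, runtime_seconds=300) + s3_storage_cost(2)
    print(f"Estimated cost: ${total:.4f}")
```

Request charges and Athena scans are left out of the sketch to keep it short; they can be added in the same way from the remaining table rows.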

Quickstart

If you'd like to quickly start transforming some sample raw insurance data and running SQL on the resulting dataset, without worrying about CI/CD, follow the steps in this section.

Python/CDK Basics

  1. Open the AWS Console in the us-east-2 (Ohio) Region.
    • NOTE: InsuranceLake uses us-east-2 by default. To change the Region, refer to the Quickstart with CI/CD.
  2. Select CloudShell at the bottom of the page and wait for a few seconds until it is available for use.
  3. Ensure you are using the latest version of the AWS Software Development Kit (SDK) for Node.js and AWS CDK.
    sudo npm install -g aws-lib aws-cdk
    
  4. Clone the repositories.
    git clone https://github.com/aws-solutions-library-samples/aws-insurancelake-infrastructure.git
    git clone https://github.com/aws-solutions-library-samples/aws-insurancelake-etl.git
    
  5. Change the working directory to the location of the infrastructure code.
    cd aws-insurancelake-infrastructure
    
  6. Create a Python virtual environment.
    • In AWS CloudShell, your home directory is limited to 1 GB of persistent storage. To ensure there is enough storage to download and install the required Python packages, use AWS CloudShell's temporary storage, located in /tmp, which has a larger capacity.
    python3 -m venv /tmp/.venv
    
  7. Activate the virtual environment.
    source /tmp/.venv/bin/activate
    
  8. Install required Python libraries.
    • NOTE: You may see a warning stating that a newer version is available; it is safe to ignore this for the Quickstart.
    pip install -r requirements.txt
    
  9. Bootstrap CDK in your AWS account.
    cdk bootstrap
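
If a later step fails with missing packages, one quick check is whether the virtual environment from steps 6 and 7 is still active in the current shell. The snippet below is a small sketch using only the Python standard library; the function name is hypothetical and it is not part of the InsuranceLake codebase.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter is running inside a venv.

    Inside a venv, sys.prefix points at the environment directory while
    sys.base_prefix still points at the base Python installation.
    """
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)


if __name__ == "__main__":
    print("Virtual environment active:", in_virtualenv())
    print("Interpreter prefix:", sys.prefix)
```

When the environment created above is active, the printed prefix should be /tmp/.venv.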
    

Deploy the Application

  1. Ensure you are still in the aws-insurancelake-infrastructure directory.
  2. Deploy infrastructure resources in the development environment (one stack).
    cdk deploy Dev-InsuranceLakeInfrastructurePipeline/Dev/InsuranceLakeInfrastructureS3BucketZones
    
  3. Review and accept AWS Identity and Access Management (IAM) credential creation for the S3 bucket stack.
    • Wait for deployment to finish (approximately 5 minutes).
  4. Copy the S3 bucket name for the Collect bucket to use later.
    • Bucket name will be in the form: dev-insurancelake-<AWS Account ID>-<Region>-collect.
  5. Switch the working directory to the location of the ETL code.
    cd ../aws-insurancelake-etl
    
  6. Deploy the ETL resources in the development environment (four stacks).
    cdk deploy Dev-InsuranceLakeEtlPipeline/Dev/InsuranceLakeEtlDynamoDb Dev-InsuranceLakeEtlPipeline/Dev/InsuranceLakeEtlGlue Dev-InsuranceLakeEtlPipeline/Dev/InsuranceLakeEtlStepFunctions Dev-InsuranceLakeEtlPipeline/Dev/InsuranceLakeEtlAthenaHelper
    
    • Wait for approximately 1 minute for DynamoDB deployment to finish.
  7. Review and accept IAM credential creation for the AWS Glue jobs stack.
    • Wait approximately 3 minutes for deployment to finish.
  8. Review and accept IAM credential creation for the Step Functions stack.
    • Wait approximately 7 minutes for deployment of Step Functions and Athena Helper stacks to finish.
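
The Collect bucket name noted in step 4 follows a predictable pattern. As a minimal sketch, assuming the pattern {environment}-{resource_name_prefix}-{account}-{region}-{zone} that appears elsewhere in this guide, the helper below composes the name for any of the three zones. The function name and the account ID 123456789012 are placeholders for illustration only.

```python
ZONES = ("collect", "cleanse", "consume")


def zone_bucket_name(environment: str, resource_name_prefix: str,
                     account: str, region: str, zone: str) -> str:
    """Compose a bucket name in the form shown in this guide.

    Assumption: every component is lowercased, as in the example
    dev-insurancelake-<AWS Account ID>-<Region>-collect.
    """
    if zone not in ZONES:
        raise ValueError(f"zone must be one of {ZONES}, got {zone!r}")
    name = f"{environment}-{resource_name_prefix}-{account}-{region}-{zone}"
    return name.lower()


if __name__ == "__main__":
    # 123456789012 is a placeholder account ID.
    print(zone_bucket_name("Dev", "insurancelake", "123456789012",
                           "us-east-2", "collect"))
```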

Try out the ETL Process

  1. Populate the DynamoDB lookup table with sample lookup data.
    resources/load_dynamodb_lookup_table.py SyntheticGeneralData dev-insurancelake-etl-value-lookup resources/syntheticgeneral_lookup_data.json
    
  2. Transfer the sample claim data to the Collect bucket.
    aws s3 cp resources/syntheticgeneral-claim-data.csv s3://<Collect S3 bucket>/SyntheticGeneralData/ClaimData/
    
  3. Transfer the sample policy data to the Collect bucket.
    aws s3 cp resources/syntheticgeneral-policy-data.csv s3://<Collect S3 bucket>/SyntheticGeneralData/PolicyData/
    
  4. Open Step Functions in the AWS Console and select dev-insurancelake-etl-state-machine.
  5. Open the state machine execution in progress and monitor the status until complete.
  6. Open Athena in the AWS Console.
  7. Select Launch Query Editor, and change the Workgroup to insurancelake.
  8. Run the following query to view a sample of prepared data in the consume bucket:
    select * from syntheticgeneraldata_consume.policydata limit 100
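
The upload paths and the query above suggest a naming convention: the first folder in the Collect bucket names the source system, the second names the table, and the Consume database is the lowercased source system with a _consume suffix. The sketch below encodes that convention as inferred from the examples in this section; it is an assumption for illustration, not code taken from InsuranceLake, and the function name is hypothetical.

```python
def consume_table_for_key(object_key: str) -> str:
    """Map a Collect bucket object key to the Athena table to query.

    Assumes keys look like <SourceSystem>/<Table>/<file name>.
    """
    parts = object_key.strip("/").split("/")
    if len(parts) < 3:
        raise ValueError(
            "expected <SourceSystem>/<Table>/<file name>, got " + repr(object_key)
        )
    source_system, table = parts[0], parts[1]
    return f"{source_system.lower()}_consume.{table.lower()}"


if __name__ == "__main__":
    key = "SyntheticGeneralData/PolicyData/syntheticgeneral-policy-data.csv"
    print(f"select * from {consume_table_for_key(key)} limit 100")
```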
    

Next Steps

Deployment Validation

  1. Transfer the sample claim data to the Collect bucket (Source system: SyntheticGeneralData, Table: ClaimData).

    aws s3 cp resources/syntheticgeneral-claim-data.csv s3://<Collect S3 bucket>/SyntheticGeneralData/ClaimData/
    
  2. Transfer the sample policy data to the Collect bucket (Source system: SyntheticGeneralData, Table: PolicyData).

    aws s3 cp resources/syntheticgeneral-policy-data.csv s3://<Collect S3 bucket>/SyntheticGeneralData/PolicyData/
    
  3. Upon successful transfer of the file, an event notification from S3 will trigger the state-machine-trigger Lambda function.

  4. The Lambda function will insert a record into the DynamoDB table {environment}-{resource_name_prefix}-etl-job-audit to track job start status.

  5. The Lambda function will also trigger the Step Functions State Machine. The State Machine execution name will be <filename>-<YYYYMMDDHHMMSSxxxxxx> and have the required metadata as input parameters.

  6. The State Machine will trigger the AWS Glue job for Collect to Cleanse data processing.

  7. The Collect to Cleanse AWS Glue job will execute the transformation logic defined in configuration files.

  8. The AWS Glue job will load the data into the Cleanse bucket using the provided metadata. The data will be stored in S3 as s3://{environment}-{resource_name_prefix}-{account}-{region}-cleanse/syntheticgeneraldata/claimdata/year=YYYY/month=MM/day=DD in Apache Parquet format.

  9. The AWS Glue job will create or update the Data Catalog table using the table name passed as a parameter based on the folder name (PolicyData and ClaimData).

  10. After the Collect to Cleanse AWS Glue job completes, the State Machine will trigger the Cleanse to Consume AWS Glue job.

  11. The Cleanse to Consume AWS Glue job will execute the SQL logic defined in configuration files.

  12. The Cleanse to Consume AWS Glue job will store the resulting data set in S3 as s3://{environment}-{resource_name_prefix}-{account}-{region}-consume/syntheticgeneraldata/claimdata/year=YYYY/month=MM/day=DD in Apache Parquet format.

  13. The Cleanse to Consume AWS Glue job will create or update the Data Catalog table.

  14. After successful completion of the Cleanse to Consume AWS Glue job, the State Machine will trigger the etl-job-auditor Lambda function to update the DynamoDB table {environment}-{resource_name_prefix}-etl-job-audit with the latest status.

  15. An Amazon Simple Notification Service (Amazon SNS) notification will be sent to all subscribed users.

  16. To validate the data load, use Athena and execute the following query:

    select * from syntheticgeneraldata_consume.policydata limit 100
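
Several names in the validation steps above are built from templates: the state machine execution name, the date-partitioned storage path, and the job audit table name. The sketch below shows one way to compose them with the Python standard library. It is illustrative only: the function names are hypothetical, and whether the real execution name keeps the file extension is not stated in this guide, so the example passes the name without it.

```python
from datetime import date, datetime


def execution_name(filename: str, started: datetime) -> str:
    """Build <filename>-<YYYYMMDDHHMMSSxxxxxx>.

    %f expands to six microsecond digits, matching the trailing xxxxxx.
    """
    return f"{filename}-{started.strftime('%Y%m%d%H%M%S%f')}"


def partition_prefix(source_system: str, table: str, day: date) -> str:
    """Build <source>/<table>/year=YYYY/month=MM/day=DD in lowercase."""
    return (f"{source_system.lower()}/{table.lower()}/"
            f"year={day:%Y}/month={day:%m}/day={day:%d}")


def audit_table_name(environment: str, resource_name_prefix: str) -> str:
    """Build {environment}-{resource_name_prefix}-etl-job-audit."""
    return f"{environment}-{resource_name_prefix}-etl-job-audit"


if __name__ == "__main__":
    started = datetime(2024, 10, 2, 13, 5, 9, 123456)
    print(execution_name("syntheticgeneral-claim-data", started))
    print(partition_prefix("SyntheticGeneralData", "ClaimData", started.date()))
    print(audit_table_name("dev", "insurancelake"))
```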
    

Cleanup

Refer to the Quickstart Cleanup instructions.


Architecture

This section focuses on the overall InsuranceLake architecture and the components of the ETL.

Collect, Cleanse, Consume

As shown in the figure below, we use S3 for storage, specifically three different S3 buckets:

  1. Collect bucket to store raw data in its original format.
  2. Cleanse/Curate bucket to store the data that meets the quality and consistency requirements for the data source.
  3. Consume bucket for data that is used by analysts and data consumers (for example, Amazon QuickSight, Amazon SageMaker).

InsuranceLake is designed to support a number of source systems with different file formats and data partitions. To demonstrate, we have provided a CSV parser and sample data files for a source system with two data tables, which are uploaded to the Collect bucket.

We use Lambda and Step Functions for orchestration and scheduling of ETL workloads. We then use AWS Glue with PySpark for ETL and data cataloging, DynamoDB for transformation persistence, and Athena for interactive queries and analysis. We use various AWS services for logging, monitoring, security, authentication, authorization, notification, build, and deployment.

Note: AWS Lake Formation is a service that makes it easy to set up a secure data lake in days. Amazon QuickSight is a scalable, serverless, embeddable, machine learning-powered business intelligence (BI) service built for the cloud. Amazon DataZone is a data management service that makes it faster and easier for customers to catalog, discover, share, and govern data stored across AWS, on premises, and third-party sources. These three services are not used in this solution but can be added.

Conceptual Data Lake

ETL

The figure below represents the ETL resources we provision for the data lake.

InsuranceLake ETL Architecture

  1. A file server uploads files to the Collect bucket of InsuranceLake; the file server is a data producer or source for the data lake.
  2. S3 sends an ObjectCreated event notification to a Lambda function.
  3. The Lambda function inserts job information in a DynamoDB table.
  4. The Lambda function starts an execution of the Step Functions state machine.
  5. This step runs the first AWS Glue job: initiates data processing from Collect to Cleanse.
  6. An AWS Glue job will process the data from Collect to Cleanse; source data is assumed to be in CSV format and will be converted to Parquet format.
  7. DynamoDB stores original values from PII tokenization, and provides lookup data to the AWS Glue job.
  8. After creating Apache Parquet data, the job updates the AWS Glue Data Catalog table.
  9. In this step the second AWS Glue job initiates data processing from Cleanse to Consume.
  10. The AWS Glue Cleanse to Consume job fetches data transformation rules from the AWS Glue etl-scripts bucket, and runs transformations.
  11. The AWS Glue job stores prepared data in Apache Parquet format in the Consume bucket.
  12. The AWS Glue job updates the AWS Glue Data Catalog table.
  13. The AWS Glue job updates the DynamoDB table with job status.
  14. Step Functions sends an SNS notification.
  15. Data engineers or analysts analyze data using Athena.
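
Steps 2 and 3 above can be sketched as follows. This is a simplified, hypothetical illustration of how a handler might read an S3 ObjectCreated notification and prepare the job information to record; it is not the InsuranceLake Lambda function. The function name and the field names of the returned records are assumptions, and no AWS calls are made.

```python
from urllib.parse import unquote_plus


def job_records_from_event(event: dict) -> list[dict]:
    """Extract one job record per uploaded object from an S3 event.

    Object keys in S3 notifications are URL-encoded, so they are decoded
    before being split into <SourceSystem>/<Table>/<file name>.
    """
    jobs = []
    for record in event.get("Records", []):
        if not record.get("eventName", "").startswith("ObjectCreated"):
            continue  # ignore deletes and other event types
        bucket = record["s3"]["bucket"]["name"]
        key = unquote_plus(record["s3"]["object"]["key"])
        parts = key.split("/")
        if len(parts) < 3:
            continue  # not in the expected folder layout
        jobs.append({
            "bucket": bucket,
            "key": key,
            "source_system": parts[0],
            "table": parts[1],
            "file_name": parts[-1],
            "status": "STARTED",  # hypothetical status value
        })
    return jobs


if __name__ == "__main__":
    sample_event = {"Records": [{
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "example-collect-bucket"},
            "object": {"key": "SyntheticGeneralData/ClaimData/"
                              "syntheticgeneral-claim-data.csv"},
        },
    }]}
    print(job_records_from_event(sample_event))
```

In a real handler, each record would then be written to the audit table and passed as input to the state machine execution.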

Security

For more information on how AWS services come together in InsuranceLake to align with the Security Pillar of the AWS Well-Architected Framework, refer to the InsuranceLake Well-Architected Pillar Alignment for Security.

Infrastructure Code

InsuranceLake uses CDK-nag to confirm AWS resource security recommendations are followed. CDK-nag can generate warnings, which may need to be fixed depending on the context, and errors, which will interrupt the stack synthesis and prevent any deployment.

To force synthesis of all stacks (including the CodePipeline deployed stacks), which will check all code and generate all reports, use the following command:

cdk synth '**'

When this operation is complete, you will also have access to the CDK-nag reports in CSV format in the cdk.out directory and assembly directories.
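
Once the reports exist, a short script can tally the findings across all stacks. The following is a rough sketch under two assumptions that you should check against your own output: that report file names end in NagReport.csv, and that each report has a column named Compliance. The function name is hypothetical.

```python
import csv
from collections import Counter
from pathlib import Path


def summarize_nag_reports(cdk_out: str, column: str = "Compliance") -> Counter:
    """Count report rows by the value of one column.

    Searches cdk_out and its assembly subdirectories recursively.
    The file name pattern and column name are assumptions.
    """
    counts: Counter = Counter()
    for report in Path(cdk_out).rglob("*NagReport.csv"):
        with report.open(newline="") as handle:
            for row in csv.DictReader(handle):
                counts[row.get(column) or "unknown"] += 1
    return counts


if __name__ == "__main__":
    for value, count in summarize_nag_reports("cdk.out").most_common():
        print(f"{value}: {count}")
```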

By default, the AWS Solutions Rules Pack is used, but any combination of CDK-nag rule packs can be selected by adjusting the source code in four locations (two in each of the Infrastructure and ETL codebases):

Infrastructure app.py Line 21, ETL app.py Line 20:

# Enable CDK Nag for the Mirror repository, Pipeline, and related stacks
# Environment stacks must be enabled on the Stage resource
cdk.Aspects.of(app).add(AwsSolutionsChecks())

Infrastructure pipeline_stack.py Line 148, ETL pipeline_stack.py Line 147:

        # Enable CDK Nag for environment stacks before adding to
        # pipeline, which are deployed with CodePipeline
        cdk.Aspects.of(pipeline_deploy_stage).add(AwsSolutionsChecks())

Application Code

InsuranceLake uses Bandit and Amazon CodeGuru for static code analysis of all helper scripts, Lambda functions, and PySpark Glue Jobs.

To configure CodeGuru Code Reviews, follow the AWS Documentation on creating Code Reviews.

To scan all application code using Bandit, use the following command:

bandit -r --ini .bandit

Additional Resources

Authors

The following people are involved in the design, architecture, development, testing, and review of this solution:

  • Cory Visi, Senior Solutions Architect, Amazon Web Services
  • Ratnadeep Bardhan Roy, Senior Solutions Architect, Amazon Web Services
  • Jose Guay, Enterprise Support, Amazon Web Services
  • Isaiah Grant, Cloud Consultant, 2nd Watch, Inc.
  • Muhammad Zahid Ali, Data Architect, Amazon Web Services
  • Ravi Itha, Senior Data Architect, Amazon Web Services
  • Justiono Putro, Cloud Infrastructure Architect, Amazon Web Services
  • Mike Apted, Principal Solutions Architect, Amazon Web Services
  • Nikunj Vaidya, Senior DevOps Specialist, Amazon Web Services

License Summary

This sample code is made available under the MIT-0 license. See the LICENSE file.

Copyright Amazon.com and its affiliates; all rights reserved. This file is Amazon Web Services Content and may not be duplicated or distributed without permission.
