Skip to main content

A construct for the quick demo of EMR Serverless.

Project description

cdk-emrserverless-with-delta-lake

This constrcut builds an EMR studio, a cluster template for the EMR Studio, and an EMR Serverless application. 2 S3 buckets will be created, one is for the EMR Studio workspace and the other one is for EMR Serverless applications. This construct is for analysts, data engineers, and anyone who wants to know how to process Delta Lake data with EMR serverless. cfn designer They build the construct via cdkv2 and build a serverless job within the EMR application generated by the construct via AWS CLI within few minutes. After the EMR serverless job is finished, they can then check the processed result done by the EMR serverless job on an EMR notebook through the cluster template. app history

TOC

Requirements

  1. Your current identity has the AdministratorAccess power.

  2. An IAM user named Administrator with the AdministratorAccess power.

    • This is related to the Portfolio of AWS Service Catalog created by the construct, which is required for EMR cluster tempaltes.
    • I'm still thinking whether I should leave it as a choice (in the construct) or create for you directly.
  3. Add {"Key": "for-use-with-amazon-emr-managed-policies", "Value": "true"} as tag onto the default VPC and the subnets you chose for the EMR Studio.

Before deployment

You might want to execute the following command.

PROFILE_NAME="scott.hsieh"
# If you only have one credentials on your local machine, just ignore `--profile`, buddy.
cdk bootstrap aws://${AWS_ACCOUNT_ID}/${AWS_REGION} --profile ${PROFILE_NAME}

Minimal content for deployment

#!/usr/bin/env node
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { EmrServerless } from '../../emrserverless';

class TypescriptStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);
    new EmrServerless(this, 'EmrServerless', {
      subnetIds: ['subnet-a4bfe38c', 'subnet-278b3050', 'subnet-3571a36c']
    });
  }
}

const app = new cdk.App();
new TypescriptStack(app, 'TypescriptStack', {
  stackName: 'emr-studio',
  env: {
    region: process.env.CDK_DEFAULT_REGION,
    account: process.env.CDK_DEFAULT_ACCOUNT,
  },
});

After deployment

Promise me, darling, make advantage on the CloudFormation outputs. All you need is copy-paste, copy-paste, copy-paste, life should be always that easy. cfn outputs

  1. Define the following environment variables on your current session.

    export PROFILE_NAME="${YOUR_PROFILE_NAME}"
    export JOB_ROLE_ARN="${copy-paste-thank-you}"
    export APPLICATION_ID="${copy-paste-thank-you}"
    export SERVERLESS_BUCKET_NAME="${copy-paste-thank-you}"
    export DELTA_LAKE_SCRIPT_NAME="delta-lake-demo"
    
  2. Create a Python script for processing Delta Lake

    touch ${DELTA_LAKE_SCRIPT_NAME}.py
    cat << EOF > ${DELTA_LAKE_SCRIPT_NAME}.py
    from pyspark.sql import SparkSession
    import uuid
    
    if __name__ == "__main__":
        """
            Delta Lake with EMR Serverless, take NTC taxi as example.
        """
        spark = SparkSession \\
            .builder \\
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \\
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \\
            .enableHiveSupport() \\
            .appName("Delta-Lake-OSS") \\
            .getOrCreate()
    
        url = "s3://${SERVERLESS_BUCKET_NAME}/emr-serverless-spark/delta-lake/output/1.2.1/%s/" % str(
            uuid.uuid4())
    
        # creates a Delta table and outputs to target S3 bucket
        spark.range(5).write.format("delta").save(url)
    
        # reads a Delta table and outputs to target S3 bucket
        spark.read.format("delta").load(url).show()
    
        base = spark.read.parquet(
            "s3://nyc-tlc/trip data/yellow_tripdata_2021-*.parquet")
    
        base.write.format("delta") \\
            .mode("overwrite") \\
            .save("s3a://${SERVERLESS_BUCKET_NAME}/emr-serverless-spark/delta-lake/nyx-tlc-2021")
        spark.stop()
    EOF
    
  3. Upload the script and required jars into the serverless bucket

    # upload script
    aws s3 cp delta-lake-demo.py s3://${SERVERLESS_BUCKET_NAME}/scripts/${DELTA_LAKE_SCRIPT_NAME}.py --profile ${PROFILE_NAME}
    # download jars and upload them
    DELTA_VERSION="1.2.0"
    DELTA_LAKE_CORE="delta-core_2.12-${DELTA_VERSION}.jar"
    DELTA_LAKE_STORAGE="delta-storage-${${DELTA_VERSION}}.jar"
    curl https://repo1.maven.org/maven2/io/delta/delta-core_2.12/${DELTA_VERSION}/${DELTA_LAKE_CORE} --output ${DELTA_LAKE_CORE}
    curl https://repo1.maven.org/maven2/io/delta/delta-storage/${DELTA_VERSION}/${DELTA_LAKE_STORAGE} --output ${DELTA_LAKE_STORAGE}
    aws s3 mv ${DELTA_LAKE_CORE} s3://${SERVERLESS_BUCKET_NAME}/jars/${${DELTA_LAKE_CORE}} --profile ${PROFILE_NAME}
    aws s3 mv ${DELTA_LAKE_STORAGE} s3://${SERVERLESS_BUCKET_NAME}/jars/${DELTA_LAKE_STORAGE} --profile ${PROFILE_NAME}
    

Create an EMR Serverless app

Rememeber, you got so much information to copy and paste from the CloudFormation outputs. cfn outputs

aws emr-serverless start-job-run \
  --application-id ${APPLICATION_ID} \
  --execution-role-arn ${JOB_ROLE_ARN} \
  --name 'shy-shy-first-time' \
  --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://'${SERVERLESS_BUCKET_NAME}'/scripts/'${DELTA_LAKE_SCRIPT_NAME}'.py",
            "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1 --conf spark.jars=s3://'${SERVERLESS_BUCKET_NAME}'/jars/delta-core_2.12-1.2.0.jar,s3://'${SERVERLESS_BUCKET_NAME}'/jars/delta-storage-1.2.0.jar"
        }
    }' \
  --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://'${SERVERLESS_BUCKET_NAME}'/serverless-log/"
	        }
	    }
	}' \
	--profile ${PROFILE_NAME}

If you execute with success, you should see the similar reponse as the following:

{
    "applicationId": "00f1gvklchoqru25",
    "jobRunId": "00f1h0ipd2maem01",
    "arn": "arn:aws:emr-serverless:ap-northeast-1:630778274080:/applications/00f1gvklchoqru25/jobruns/00f1h0ipd2maem01"
}

Check the executing job

Access the EMR Studio via the URL from the CloudFormation outputs. It should look very similar to the following url: https://es-pilibalapilibala.emrstudio-prod.ap-northeast-1.amazonaws.com, i.e., weird string and region won't be the same as mine.

  1. Enter into the application enter into the app
  2. Enter into the executing job

Check results from an EMR notebook via cluster template

  1. Create a workspace on the AWS Console
  2. Create an EMR cluster via the template in the workspace
  3. Check the results delivered by the EMR serverless application via an EMR notebook.

Fun facts

  1. You can assign multiple jars as a comma-separated list to the spark.jars as the Spark page says for your EMR Serverless job. The UI will complain, you still can start the job. Don't be afraid, just click it like when you were child, facing authority fearlessly. ui bug

  2. Currently, you need to check few subnet IDs within your VPC and assign these IDs to the subnetIds argument of the construt. Furthermore, you need to add a pair of values (a tag) onto the default VPC and the subnets you chose. This is necessary for the service role of the EMR Studio.

    • Further work will be progressed for reducing this manual work.
    • But for now, you got to add {"Key": "for-use-with-amazon-emr-managed-policies", "Value": "true"} onto the default VPC and the subnets you chose for this quick demo.
  3. To fully delet a stack with the construct, you need to make sure there is no more workspace within the EMR Studio. Aside from that, you also need to remove the associated identity from the Service Catalog (this is a necessary resource for the cluster template).

  4. Version inconsistency on Spark history. Possibly it can be ignored yet still made me wonder why the versions are different. naughty inconsistency

  5. So far, I still haven't figured out how to make the s3a URI work. The s3 URI is fine while the serverless app will complain that it couldn't find proper credentials provider to read the s3a URI.

Future work

  1. Custom resuorce for EMR Serverless
  2. Make the construct more flexible for users
  3. Compare Databricks Runtime and EMR Serverless.

Project details


Release history Release notifications | RSS feed

This version

2.0.0

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

Built Distribution

File details

Details for the file cdk_emrserverless_with_delta_lake-2.0.0.tar.gz.

File metadata

File hashes

Hashes for cdk_emrserverless_with_delta_lake-2.0.0.tar.gz
Algorithm Hash digest
SHA256 236dd24e18cfe3fa7350449b973fbd95b55ca6acc5708df51595bc7466233ec7
MD5 99b5c8f6295da5cce5ea6887563b9bd9
BLAKE2b-256 4781eb0b04f12847c81b83efc734a94853a856f8f4617d9f64349ce359c6872e

See more details on using hashes here.

File details

Details for the file cdk_emrserverless_with_delta_lake-2.0.0-py3-none-any.whl.

File metadata

File hashes

Hashes for cdk_emrserverless_with_delta_lake-2.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 c21a24894cde39c4f0b8792a0555a62106441049809be24a5a1d116833c96c22
MD5 60a9e0d82d3994aed09c98797b1353ed
BLAKE2b-256 666717035fb7faea7dbfaf9f7d436dbd0f905a0cd251c3b52a3ca7023c5118d3

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page