Skip to main content

Utility belt to handle data on AWS.

Project description

AWS Data Wrangler

Utility belt to handle data on AWS.

Release Downloads Python Version Documentation Status Coverage Average time to resolve an issue License

forthebadge made-with-python

Read the documentation


Contents: Use Cases | Installation | Examples | Diving Deep | Contributing


Use Cases

Pandas

  • Pandas -> Parquet (S3) (Parallel)
  • Pandas -> CSV (S3) (Parallel)
  • Pandas -> Glue Catalog Table
  • Pandas -> Athena (Parallel)
  • Pandas -> Redshift (Parallel)
  • Parquet (S3) -> Pandas (Parallel) (NEW :star:)
  • CSV (S3) -> Pandas (One shot or Batching)
  • Glue Catalog Table -> Pandas (Parallel) (NEW :star:)
  • Athena -> Pandas (One shot, Batching or Parallel (NEW :star:))
  • Redshift -> Pandas (Parallel) (NEW :star:)
  • Redshift -> Parquet (S3) (NEW :star:)
  • CloudWatch Logs Insights -> Pandas
  • Encrypt Pandas Dataframes on S3 with KMS keys

PySpark

  • PySpark -> Redshift (Parallel)
  • Register Glue table from Dataframe stored on S3
  • Flatten nested DataFrames

General

  • List S3 objects (Parallel)
  • Delete S3 objects (Parallel)
  • Delete listed S3 objects (Parallel)
  • Delete NOT listed S3 objects (Parallel)
  • Copy listed S3 objects (Parallel)
  • Get the size of S3 objects (Parallel)
  • Get CloudWatch Logs Insights query results
  • Load partitions on Athena/Glue table (repair table)
  • Create EMR cluster (For humans)
  • Terminate EMR cluster
  • Get EMR cluster state
  • Submit EMR step(s) (For humans)
  • Get EMR step state
  • Get EMR step state
  • Athena query to receive the result as python primitives (Iterable[Dict[str, Any])

Installation

pip install awswrangler

Runs only with Python 3.6 and beyond.

Runs anywhere (AWS Lambda, AWS Glue Python Shell, EMR, EC2, on-premises, local, etc).

P.S. Lambda Layer's bundle and Glue's wheel/egg are available to download. It's just upload and run! :rocket:

P.P.S. AWS Data Wrangler counts on compiled dependencies (C/C++) so there is no support for Glue PySpark by now.

Examples

Pandas

Writing Pandas Dataframe to S3 + Glue Catalog

wrangler = awswrangler.Session()
wrangler.pandas.to_parquet(
    dataframe=dataframe,
    database="database",
    path="s3://...",
    partition_cols=["col_name"],
)

If a Glue Database name is passed, all the metadata will be created in the Glue Catalog. If not, only the s3 data write will be done.

Writing Pandas Dataframe to S3 as Parquet encrypting with a KMS key

extra_args = {
    "ServerSideEncryption": "aws:kms",
    "SSEKMSKeyId": "YOUR_KMY_KEY_ARN"
}
wrangler = awswrangler.Session(s3_additional_kwargs=extra_args)
wrangler.pandas.to_parquet(
    path="s3://..."
)

Reading from AWS Athena to Pandas

wrangler = awswrangler.Session()
dataframe = wrangler.pandas.read_sql_athena(
    sql="select * from table",
    database="database"
)

Reading from AWS Athena to Pandas in chunks (For memory restrictions)

wrangler = awswrangler.Session()
dataframe_iter = wrangler.pandas.read_sql_athena(
    sql="select * from table",
    database="database",
    max_result_size=512_000_000  # 512 MB
)
for dataframe in dataframe_iter:
    print(dataframe)  # Do whatever you want

Reading from AWS Athena to Pandas with the blazing fast CTAS approach

wrangler = awswrangler.Session(athena_ctas_approach=True)
dataframe = wrangler.pandas.read_sql_athena(
    sql="select * from table",
    database="database"
)

Reading from S3 (CSV) to Pandas

wrangler = awswrangler.Session()
dataframe = wrangler.pandas.read_csv(path="s3://...")

Reading from S3 (CSV) to Pandas in chunks (For memory restrictions)

wrangler = awswrangler.Session()
dataframe_iter = wrangler.pandas.read_csv(
    path="s3://...",
    max_result_size=512_000_000  # 512 MB
)
for dataframe in dataframe_iter:
    print(dataframe)  # Do whatever you want

Reading from CloudWatch Logs Insights to Pandas

wrangler = awswrangler.Session()
dataframe = wrangler.pandas.read_log_query(
    log_group_names=[LOG_GROUP_NAME],
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)

Typical Pandas ETL

import pandas
import awswrangler

df = pandas.read_...  # Read from anywhere

# Typical Pandas, Numpy or Pyarrow transformation HERE!

wrangler = awswrangler.Session()
wrangler.pandas.to_parquet(  # Storing the data and metadata to Data Lake
    dataframe=dataframe,
    database="database",
    path="s3://...",
    partition_cols=["col_name"],
)

Loading Pandas Dataframe to Redshift

wrangler = awswrangler.Session()
wrangler.pandas.to_redshift(
    dataframe=dataframe,
    path="s3://temp_path",
    schema="...",
    table="...",
    connection=con,
    iam_role="YOUR_ROLE_ARN",
    mode="overwrite",
    preserve_index=False,
)

Extract Redshift query to Pandas DataFrame

wrangler = awswrangler.Session()
dataframe = session.pandas.read_sql_redshift(
    sql="SELECT ...",
    iam_role="YOUR_ROLE_ARN",
    connection=con,
    temp_s3_path="s3://temp_path")

PySpark

Loading PySpark Dataframe to Redshift

wrangler = awswrangler.Session(spark_session=spark)
wrangler.spark.to_redshift(
    dataframe=df,
    path="s3://...",
    connection=conn,
    schema="public",
    table="table",
    iam_role="IAM_ROLE_ARN",
    mode="append",
)

Register Glue table from Dataframe stored on S3

dataframe.write \
        .mode("overwrite") \
        .format("parquet") \
        .partitionBy(["year", "month"]) \
        .save(compression="gzip", path="s3://...")
wrangler = awswrangler.Session(spark_session=spark)
wrangler.spark.create_glue_table(
    dataframe=dataframe,
    file_format="parquet",
    partition_by=["year", "month"],
    path="s3://...",
    compression="gzip",
    database="my_database")

Flatten nested PySpark DataFrame

wrangler = awswrangler.Session(spark_session=spark)
dfs = wrangler.spark.flatten(dataframe=df_nested)
for name, df_flat in dfs.items():
    print(name)
    df_flat.show()

General

Deleting a bunch of S3 objects (parallel)

wrangler = awswrangler.Session()
wrangler.s3.delete_objects(path="s3://...")

Get CloudWatch Logs Insights query results

wrangler = awswrangler.Session()
results = wrangler.cloudwatchlogs.query(
    log_group_names=[LOG_GROUP_NAME],
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)

Load partitions on Athena/Glue table (repair table)

wrangler = awswrangler.Session()
wrangler.athena.repair_table(database="db_name", table="tbl_name")

Create EMR cluster

wrangler = awswrangler.Session()
cluster_id = wrangler.emr.create_cluster(
    cluster_name="wrangler_cluster",
    logging_s3_path=f"s3://BUCKET_NAME/emr-logs/",
    emr_release="emr-5.27.0",
    subnet_id="SUBNET_ID",
    emr_ec2_role="EMR_EC2_DefaultRole",
    emr_role="EMR_DefaultRole",
    instance_type_master="m5.xlarge",
    instance_type_core="m5.xlarge",
    instance_type_task="m5.xlarge",
    instance_ebs_size_master=50,
    instance_ebs_size_core=50,
    instance_ebs_size_task=50,
    instance_num_on_demand_master=1,
    instance_num_on_demand_core=1,
    instance_num_on_demand_task=1,
    instance_num_spot_master=0,
    instance_num_spot_core=1,
    instance_num_spot_task=1,
    spot_bid_percentage_of_on_demand_master=100,
    spot_bid_percentage_of_on_demand_core=100,
    spot_bid_percentage_of_on_demand_task=100,
    spot_provisioning_timeout_master=5,
    spot_provisioning_timeout_core=5,
    spot_provisioning_timeout_task=5,
    spot_timeout_to_on_demand_master=True,
    spot_timeout_to_on_demand_core=True,
    spot_timeout_to_on_demand_task=True,
    python3=True,
    spark_glue_catalog=True,
    hive_glue_catalog=True,
    presto_glue_catalog=True,
    bootstraps_paths=None,
    debugging=True,
    applications=["Hadoop", "Spark", "Ganglia", "Hive"],
    visible_to_all_users=True,
    key_pair_name=None,
    spark_jars_path=[f"s3://...jar"],
    maximize_resource_allocation=True,
    keep_cluster_alive_when_no_steps=True,
    termination_protected=False,
    spark_pyarrow=True,
    tags={
        "foo": "boo"
    }
)
print(cluster_id)

Athena query to receive the result as python primitives (Iterable[Dict[str, Any])

wrangler = awswrangler.Session()
for row in wrangler.athena.query(query="...", database="..."):
    print(row)

Diving Deep

Parallelism, Non-picklable objects and GeoPandas

AWS Data Wrangler tries to parallelize everything that is possible (I/O and CPU bound task). You can control the parallelism level using the parameters:

  • procs_cpu_bound: number of processes that can be used in single node applications for CPU bound case (Default: os.cpu_count())
  • procs_io_bound: number of processes that can be used in single node applications for I/O bound cases (Default: os.cpu_count() * PROCS_IO_BOUND_FACTOR)

Both can be defined on Session level or directly in the functions.

Some special cases will not work with parallelism:

  • GeoPandas
  • Columns with non-picklable objects

To handle that use procs_cpu_bound=1 and avoid the distribution of the dataframe.

Pandas with null object columns (UndetectedType exception)

Pandas has a too generic "data type" named object. Pandas object columns can be string, dates, etc, etc, etc. We can handle this object column fine inferring the types of theses objects inside the values, Pyarrow does that like a charm. So the real problem starts when we have a completely null object column because we don't have anything to infer.

To work with null object columns you can explicitly set the expected Athena data type for the target table doing:

import awswrangler
import pandas as pd

dataframe = pd.DataFrame({
    "col": [1, 2],
    "col_string_null": [None, None],
    "col_date_null": [None, None],
})
session = awswrangler.Session()
session.pandas.to_parquet(
    dataframe=dataframe,
    database="DATABASE",
    path=f"s3://...",
    cast_columns={
      "col_string_null": "string",
      "col_date_null": "date"
    })

Pandas to Redshift Flow

Pandas to Redshift Flow

Spark to Redshift Flow

Spark to Redshift Flow

Contributing

  • AWS Data Wrangler practically only makes integrations. So we prefer to dedicate our energy / time writing integration tests instead of unit tests. We really like an end-to-end approach for all features.

  • All integration tests are between a local Docker container and a remote/real AWS service.

  • We have a Docker recipe to set up the local end (testing/Dockerfile).

  • We have a Cloudformation to set up the AWS end (testing/template.yaml).

Step-by-step

DISCLAIMER: Make sure to know what you are doing. This steps will charge some services on your AWS account. And requires a minimum security skills to keep your environment safe.

  • Pick up a Linux or MacOS.

  • Install Python 3.6+

  • Install Docker and configure at least 4 cores and 8 GB of memory

  • Fork the AWS Data Wrangler repository and clone that into your development environment

  • Go to the project's directory create a Python's virtual environment for the project (python -m venv venv && source venv/bin/activate)

  • Run ./setup-dev-env.sh

  • Go to the testing directory

  • Configure the parameters.json file with your AWS environment infos (Make sure that your Redshift will not be open for the World! Configure your security group to only give access for your IP.)

  • Deploy the Cloudformation stack ./deploy-cloudformation.sh

  • Open the docker image ./open-image.sh

  • Inside the image you finally can run ./run-tests.sh

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

awswrangler-0.1.0.tar.gz (48.0 kB view details)

Uploaded Source

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

awswrangler-0.1.0-py3.6.egg (110.3 kB view details)

Uploaded Egg

awswrangler-0.1.0-glue-none-any.whl (52.7 kB view details)

Uploaded glue

File details

Details for the file awswrangler-0.1.0.tar.gz.

File metadata

  • Download URL: awswrangler-0.1.0.tar.gz
  • Upload date:
  • Size: 48.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.40.2 CPython/3.6.8

File hashes

Hashes for awswrangler-0.1.0.tar.gz
Algorithm Hash digest
SHA256 385b27804f1721397369486590ce37603f7c9b3c6d530565c422f47c0dc2040a
MD5 61d7fda139974ac5388aa8517ff4df1b
BLAKE2b-256 5c5600f9909b2dbb4497aece7c3e6bdbb7180a6362822c04cad93636c9dc543c

See more details on using hashes here.

File details

Details for the file awswrangler-0.1.0-py3.6.egg.

File metadata

  • Download URL: awswrangler-0.1.0-py3.6.egg
  • Upload date:
  • Size: 110.3 kB
  • Tags: Egg
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.40.2 CPython/3.6.8

File hashes

Hashes for awswrangler-0.1.0-py3.6.egg
Algorithm Hash digest
SHA256 47b84fb5f4a4b01eb777d7edd53c438c378dbd45740bf78d7e491c60a7fcfeac
MD5 e15b9d3b907a2de26328385aec5654e2
BLAKE2b-256 15a9aabcad6395028737329ac830a9c08b19ab21e29dd1fd897013a41f7e01d8

See more details on using hashes here.

File details

Details for the file awswrangler-0.1.0-glue-none-any.whl.

File metadata

  • Download URL: awswrangler-0.1.0-glue-none-any.whl
  • Upload date:
  • Size: 52.7 kB
  • Tags: glue
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.40.2 CPython/3.6.8

File hashes

Hashes for awswrangler-0.1.0-glue-none-any.whl
Algorithm Hash digest
SHA256 04c4b47ba77e9e08e4853c75751e1f40788a227865f7ad6572b0cafc356f50b0
MD5 a9bda95d87d9793488897d715923ced2
BLAKE2b-256 232075e7232fd4b6f740b72fbd84a33abcac30efee66fb5d391ee20b8f08156f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page