AWS Data Wrangler

Utility belt to handle data on AWS.

Read the documentation


Contents: Use Cases | Installation | Examples | Diving Deep | Contributing


Use Cases

Pandas

  • Pandas -> Parquet (S3) (Parallel)
  • Pandas -> CSV (S3) (Parallel)
  • Pandas -> Glue Catalog
  • Pandas -> Athena (Parallel)
  • Pandas -> Redshift (Parallel)
  • CSV (S3) -> Pandas (One shot or Batching)
  • Athena -> Pandas (One shot or Batching)
  • CloudWatch Logs Insights -> Pandas (NEW)
  • Encrypt Pandas Dataframes on S3 with KMS keys (NEW)

PySpark

  • PySpark -> Redshift (Parallel) (NEW)

General

  • List S3 objects (Parallel)
  • Delete S3 objects (Parallel)
  • Delete listed S3 objects (Parallel)
  • Delete NOT listed S3 objects (Parallel)
  • Copy listed S3 objects (Parallel)
  • Get the size of S3 objects (Parallel)
  • Get CloudWatch Logs Insights query results (NEW)

Installation

pip install awswrangler

Requires Python 3.6 or later.

Runs anywhere (AWS Lambda, AWS Glue, EMR, EC2, on-premises, local, etc).

P.S. A Lambda Layer bundle and a Glue egg are available for download. Just upload them to your account and run!

Examples

Pandas

Writing Pandas Dataframe to S3 + Glue Catalog

import awswrangler

session = awswrangler.Session()
session.pandas.to_parquet(
    dataframe=dataframe,
    database="database",
    path="s3://...",
    partition_cols=["col_name"],
)

If a Glue database name is passed, all the metadata is created in the Glue Catalog. Otherwise, only the data is written to S3.

Writing Pandas Dataframe to S3 as Parquet encrypting with a KMS key

extra_args = {
    "ServerSideEncryption": "aws:kms",
    "SSEKMSKeyId": "YOUR_KMS_KEY_ARN"
}
session = awswrangler.Session(s3_additional_kwargs=extra_args)
session.pandas.to_parquet(
    dataframe=dataframe,
    path="s3://..."
)
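
The encryption settings can be kept in a small helper so that the key ARN is supplied in one place. This is only a minimal sketch: kms_extra_args is a hypothetical name (not part of awswrangler), and it assumes s3_additional_kwargs takes the same ServerSideEncryption and SSEKMSKeyId keys used in the example above.

```python
def kms_extra_args(kms_key_arn):
    """Build the S3 extra arguments for SSE-KMS encryption (hypothetical helper)."""
    if not kms_key_arn:
        raise ValueError("a KMS key ARN is required")
    return {
        "ServerSideEncryption": "aws:kms",  # ask S3 to encrypt with a KMS key
        "SSEKMSKeyId": kms_key_arn,         # the key to encrypt with
    }


# Placeholder value; replace it with a real key ARN.
extra_args = kms_extra_args("YOUR_KMS_KEY_ARN")
```

The resulting dictionary can then be passed as awswrangler.Session(s3_additional_kwargs=extra_args).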

Reading from AWS Athena to Pandas

session = awswrangler.Session()
dataframe = session.pandas.read_sql_athena(
    sql="select * from table",
    database="database"
)

Reading from AWS Athena to Pandas in chunks (for memory-constrained environments)

session = awswrangler.Session()
dataframe_iter = session.pandas.read_sql_athena(
    sql="select * from table",
    database="database",
    max_result_size=512_000_000  # 512 MB
)
for dataframe in dataframe_iter:
    print(dataframe)  # Do whatever you want
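
Because each chunk arrives as a separate DataFrame, any result that spans the whole query has to be accumulated across iterations. A minimal sketch of that pattern follows. count_rows is a hypothetical helper, and plain lists stand in for the DataFrames so that the snippet runs without AWS access.

```python
def count_rows(chunks):
    """Sum the number of rows over an iterable of chunks.

    Works for any chunk type that supports len(), which includes
    Pandas DataFrames and the plain lists used as stand-ins below.
    """
    total = 0
    for chunk in chunks:
        total += len(chunk)  # keep only the running total, not the chunk
    return total


# Stand-in for the iterator returned when max_result_size is set.
fake_chunks = iter([[1, 2, 3], [4, 5], [6]])
total_rows = count_rows(fake_chunks)
```

The same loop shape applies to other running aggregates, such as sums or per-key counts, as long as nothing but the aggregate is kept between iterations.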

Reading from S3 (CSV) to Pandas

session = awswrangler.Session()
dataframe = session.pandas.read_csv(path="s3://...")

Reading from S3 (CSV) to Pandas in chunks (for memory-constrained environments)

session = awswrangler.Session()
dataframe_iter = session.pandas.read_csv(
    path="s3://...",
    max_result_size=512_000_000  # 512 MB
)
for dataframe in dataframe_iter:
    print(dataframe)  # Do whatever you want

Reading from CloudWatch Logs Insights to Pandas

session = awswrangler.Session()
dataframe = session.pandas.read_log_query(
    log_group_names=[LOG_GROUP_NAME],
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)

Typical Pandas ETL

import pandas
import awswrangler

dataframe = pandas.read_...  # Read from anywhere

# Typical Pandas, Numpy or Pyarrow transformation HERE!

session = awswrangler.Session()
session.pandas.to_parquet(  # Storing the data and metadata to Data Lake
    dataframe=dataframe,
    database="database",
    path="s3://...",
    partition_cols=["col_name"],
)

PySpark

Loading PySpark Dataframe to Redshift

session = awswrangler.Session(spark_session=spark)
session.spark.to_redshift(
    dataframe=df,
    path="s3://...",
    connection=conn,
    schema="public",
    table="table",
    iam_role="IAM_ROLE_ARN",
    mode="append",
)

General

Deleting a bunch of S3 objects (parallel)

session = awswrangler.Session()
session.s3.delete_objects(path="s3://...")

Get CloudWatch Logs Insights query results

session = awswrangler.Session()
results = session.cloudwatchlogs.query(
    log_group_names=[LOG_GROUP_NAME],
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)
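
Query strings like the one above are commands chained with the pipe character, so they can be composed programmatically. The helper below is a hypothetical sketch (build_insights_query is not part of awswrangler) that rebuilds the same query from its parts.

```python
def build_insights_query(fields, sort_field, limit, descending=True):
    """Compose a CloudWatch Logs Insights query string (hypothetical helper)."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    order = "desc" if descending else "asc"
    commands = [
        "fields " + ", ".join(fields),
        "sort {} {}".format(sort_field, order),
        "limit {}".format(limit),
    ]
    # Commands are chained with the pipe character.
    return " | ".join(commands)


query = build_insights_query(["@timestamp", "@message"], "@timestamp", 5)
```

The resulting string can be passed as the query argument in the examples above.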

Diving Deep

Pandas to Redshift Flow

[Figure: Pandas to Redshift flow]

Spark to Redshift Flow

[Figure: Spark to Redshift flow]

Contributing

  • AWS Data Wrangler consists almost entirely of integrations, so we prefer to spend our time writing integration tests rather than unit tests. We favor an end-to-end approach for all features.

  • All integration tests run between a local Docker container and a real, remote AWS service.

  • We have a Docker recipe to set up the local end (testing/Dockerfile).

  • We have a CloudFormation template to set up the AWS end (testing/template.yaml).

Step-by-step

DISCLAIMER: Make sure you know what you are doing. These steps will incur charges for some services on your AWS account, and they require basic security skills to keep your environment safe.

  • Use a Linux or macOS machine.

  • Install Python 3.6+

  • Install Docker and configure at least 4 cores and 8 GB of memory

  • Fork the AWS Data Wrangler repository and clone it into your development environment

  • Go to the project's directory and create a Python virtual environment for the project (python -m venv venv && source venv/bin/activate)

  • Run ./install-dev.sh

  • Go to the testing directory

  • Configure the parameters.json file with your AWS environment details (make sure that your Redshift cluster is not open to the world!)

  • Deploy the CloudFormation stack: ./deploy-cloudformation.sh

  • Open the Docker image: ./open-image.sh

  • Inside the image, you can finally run ./run-tests.sh
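
Before starting, it can help to confirm the local prerequisites from the list above. The following is a rough sketch, not part of the project's tooling: check_prerequisites is a hypothetical function that only checks the Python version and whether a docker executable is on the PATH. It does not verify Docker's CPU or memory settings.

```python
import shutil
import sys


def check_prerequisites(min_python=(3, 6)):
    """Report whether the local machine meets the basic requirements."""
    return {
        # The steps above call for Python 3.6 or later.
        "python": sys.version_info >= min_python,
        # shutil.which returns None when the executable is not on the PATH.
        "docker": shutil.which("docker") is not None,
    }


if __name__ == "__main__":
    for name, ok in check_prerequisites().items():
        print("{}: {}".format(name, "ok" if ok else "missing"))
```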

Download files

Source Distribution

awswrangler-0.0.3.tar.gz (29.6 kB view details)

Uploaded Source

Built Distribution

awswrangler-0.0.3-py36,py37-none-any.whl (33.6 kB view details)

Uploaded Python 3.6,py37

File details

Details for the file awswrangler-0.0.3.tar.gz.

File metadata

  • Download URL: awswrangler-0.0.3.tar.gz
  • Upload date:
  • Size: 29.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.0.1 requests-toolbelt/0.9.1 tqdm/4.35.0 CPython/3.6.8

File hashes

Hashes for awswrangler-0.0.3.tar.gz
Algorithm Hash digest
SHA256 7ecbec5c46ca2b22009e79fb16fec06e29fd3562bce3847f2121b8f75cd4b8f7
MD5 e890ce9f0d506b58c83a503aa4b8c287
BLAKE2b-256 1b4442f0de2b32c463e9c1422b453a03ba3055da503fb129c24b5bda0ab51837
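
A downloaded file can be checked against the SHA256 digest listed above. The snippet is a minimal sketch using Python's standard hashlib module. sha256_of_file and matches_digest are hypothetical helper names, and the local path is wherever the download was saved.

```python
import hashlib


def sha256_of_file(path, chunk_size=65536):
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        # iter() with a sentinel stops when read() returns empty bytes.
        for block in iter(lambda: handle.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()


def matches_digest(path, expected_hex):
    """Compare a file's SHA256 digest with a published hex value."""
    return sha256_of_file(path) == expected_hex.strip().lower()
```

For example, matches_digest("awswrangler-0.0.3.tar.gz", "7ecbec5c46ca2b22009e79fb16fec06e29fd3562bce3847f2121b8f75cd4b8f7") should return True for an intact copy of the source distribution.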

File details

Details for the file awswrangler-0.0.3-py36,py37-none-any.whl.

File metadata

  • Download URL: awswrangler-0.0.3-py36,py37-none-any.whl
  • Upload date:
  • Size: 33.6 kB
  • Tags: Python 3.6,py37
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.0.1 requests-toolbelt/0.9.1 tqdm/4.35.0 CPython/3.6.8

File hashes

Hashes for awswrangler-0.0.3-py36,py37-none-any.whl
Algorithm Hash digest
SHA256 aff8e84a7c1d5798202f1e96d763e6eec402ceaac637252b847e324b31baa866
MD5 76f04f7788d540a035692a157cf90986
BLAKE2b-256 0e26a4e2bc6151ea5fa4f64ec8ab3153f5c56ace822d891a89e255236894d61d
