An object-oriented interface for defining parquet datasets for AWS built on top of awswrangler and pandera

Brief

aws-parquet is a toolkit that enables working with parquet datasets on AWS. It handles AWS S3 reads/writes, AWS Glue catalog updates and AWS Athena queries through a single, simple interface.

Motivation

The goal is to provide a simple and intuitive interface to create and manage parquet datasets on AWS.

aws-parquet is built on top of awswrangler (S3, Glue and Athena access) and pandera (schema validation).

Features

aws-parquet provides a ParquetDataset class that enables the following operations:

  • create a parquet dataset that will get registered in AWS Glue
  • append new data to the dataset and update the AWS Glue catalog
  • read a partition of the dataset and perform proper schema validation and type casting
  • overwrite data in the dataset after performing proper schema validation and type casting
  • delete a partition of the dataset and update the AWS Glue catalog
  • query the dataset using AWS Athena

How to set up

Using pip:

pip install aws_parquet

How to use

Create a parquet dataset that will get registered in AWS Glue

import os

from aws_parquet import ParquetDataset
import pandas as pd
import pandera as pa
from pandera.typing import Series

# define your pandera schema model
class MyDatasetSchemaModel(pa.SchemaModel):
    col1: Series[int] = pa.Field(nullable=False, ge=0, lt=10)
    col2: Series[pa.DateTime]
    col3: Series[float]

# configuration
database = "default"
bucket_name = os.environ["AWS_S3_BUCKET"]
table_name = "foo_bar"
path = f"s3://{bucket_name}/{table_name}/"
partition_cols = ["col1", "col2"]
schema = MyDatasetSchemaModel.to_schema()

# create the dataset
dataset = ParquetDataset(
    database=database,
    table=table_name,
    partition_cols=partition_cols,
    path=path,
    pandera_schema=schema,
)

dataset.create()

With awswrangler alone, one would have to do the following instead:

import awswrangler as wr

wr.catalog.create_parquet_table(
    database=database,
    path=path,
    table=table_name,
    # figure out the equivalent glue/athena types to use
    partitions_types={"col1": "bigint", "col2": "timestamp"},
    columns_types={"col3": "double"},
)
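The comment "figure out the equivalent glue/athena types" above can be made concrete with a small lookup. This is a minimal sketch, not aws-parquet's implementation: the `PANDAS_TO_ATHENA` table and the `athena_types` helper are hypothetical names introduced here, and the mapping covers only a few common pandas dtype names.

```python
# Hypothetical lookup from pandas dtype names to Glue/Athena column types.
# Only a handful of common dtypes are covered in this sketch.
PANDAS_TO_ATHENA = {
    "int64": "bigint",
    "int32": "int",
    "float64": "double",
    "float32": "float",
    "bool": "boolean",
    "datetime64[ns]": "timestamp",
    "object": "string",
}


def athena_types(dtypes: dict, partition_cols: list) -> tuple:
    """Split columns into (columns_types, partitions_types) dictionaries."""
    columns_types, partitions_types = {}, {}
    for name, dtype in dtypes.items():
        if dtype not in PANDAS_TO_ATHENA:
            raise ValueError(f"no Athena type known for {name!r} ({dtype})")
        # Partition columns and regular columns are declared separately.
        target = partitions_types if name in partition_cols else columns_types
        target[name] = PANDAS_TO_ATHENA[dtype]
    return columns_types, partitions_types


columns_types, partitions_types = athena_types(
    {"col1": "int64", "col2": "datetime64[ns]", "col3": "float64"},
    partition_cols=["col1", "col2"],
)
```

For a real dataframe, the dtype names could be obtained with `df.dtypes.astype(str).to_dict()`. The result matches the `columns_types` and `partitions_types` arguments written by hand above.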

Append new data to the dataset

df = pd.DataFrame({
    "col1": [1, 2, 3],
    "col2": ["2021-01-01", "2021-01-02", "2021-01-03"],
    "col3": [1.0, 2.0, 3.0]
})

dataset.update(df)

With awswrangler alone, one would have to do the following instead:

df = pd.DataFrame({
    "col1": [1, 2, 3],
    "col2": ["2021-01-01", "2021-01-02", "2021-01-03"],
    "col3": [1.0, 2.0, 3.0]
})

# perform schema validation and type casting
# (validate_schema is a user-written helper, e.g. a wrapper around schema.validate)
df_validated = validate_schema(df)

wr.s3.to_parquet(
    df=df_validated,
    path="s3://my-bucket/my-dataset",
    dataset=True,
    database="my_database",
    table="my_table",
    partition_cols=["col1", "col2"],
    mode="append"
)

Read a partition of the dataset

df = dataset.read({"col2": "2021-01-01"})

With awswrangler alone, one would have to do the following instead:

df = wr.s3.read_parquet(
    path="s3://my-bucket/my-dataset",
    dataset=True,
    database="my_database",
    table="my_table",
    # make sure to cast the partition values to the right dtype
    partition_filter=lambda x: pd.Timestamp(x["col2"]) == pd.Timestamp("2021-01-01")
)

# perform schema validation and type casting 
df_validated = validate_schema(df)
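The casting in `partition_filter` matters because awswrangler passes partition values to the filter as strings extracted from the S3 path. The following standard-library sketch illustrates the point without touching AWS; the `col2_is` helper and the sample partition dictionaries are made up for illustration.

```python
from datetime import datetime


def col2_is(target_date: str):
    """Build a partition filter that compares col2 as a datetime, not a string."""
    target = datetime.fromisoformat(target_date)

    def _filter(partition: dict) -> bool:
        # Partition values arrive as strings, so parse before comparing.
        return datetime.fromisoformat(partition["col2"]) == target

    return _filter


# Hypothetical partition values, shaped like the dictionaries a filter receives.
partitions = [
    {"col1": "1", "col2": "2021-01-01 00:00:00"},
    {"col1": "2", "col2": "2021-01-02 00:00:00"},
]

keep = col2_is("2021-01-01")
selected = [p for p in partitions if keep(p)]

# A naive string comparison would miss the partition entirely.
naive = [p for p in partitions if p["col2"] == "2021-01-01"]
```

Here `selected` contains the first partition only, while `naive` is empty because the stored string carries a time component.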

Overwrite data in the dataset

df_overwrite = pd.DataFrame({
    "col1": [1, 2, 3],
    "col2": ["2021-01-01", "2021-01-02", "2021-01-03"],
    "col3": [4.0, 5.0, 6.0]
})
dataset.update(df_overwrite, overwrite=True)

With awswrangler alone, one would have to do the following instead:

df_overwrite_validated = validate_schema(df_overwrite)

wr.s3.to_parquet(
    df=df_overwrite_validated,
    path="s3://my-bucket/my-dataset",
    dataset=True,
    database="my_database",
    table="my_table",
    partition_cols=["col1", "col2"],
    mode="overwrite_partitions"
)
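The `overwrite_partitions` mode replaces only the partitions that appear in the new data and leaves the others untouched. A minimal in-memory model of that behaviour, assuming rows are plain dictionaries and using a hypothetical `overwrite_partitions` function (no S3 involved):

```python
def overwrite_partitions(stored: dict, new_rows: list, partition_cols: list) -> dict:
    """Return a copy of `stored` where partitions present in `new_rows` are replaced."""
    incoming = {}
    for row in new_rows:
        # Group the new rows by their partition key.
        key = tuple(row[col] for col in partition_cols)
        incoming.setdefault(key, []).append(row)
    result = dict(stored)
    result.update(incoming)  # partitions absent from new_rows are kept as they were
    return result


stored = {
    (1, "2021-01-01"): [{"col1": 1, "col2": "2021-01-01", "col3": 1.0}],
    (2, "2021-01-02"): [{"col1": 2, "col2": "2021-01-02", "col3": 2.0}],
}
new_rows = [{"col1": 1, "col2": "2021-01-01", "col3": 4.0}]

updated = overwrite_partitions(stored, new_rows, ["col1", "col2"])
```

After the call, the partition `(1, "2021-01-01")` holds the new row with `col3 == 4.0`, and `(2, "2021-01-02")` is unchanged.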

Delete a partition of the dataset

dataset.delete({"col1": 1, "col2": "2021-01-01"})

With awswrangler alone, one would have to do the following instead:

# remove the partitions from s3
wr.s3.delete_objects(path="s3://infima-package-testing/foo/bar/col1=1/col2=2021-01-01")

# remove the partitions from glue
wr.catalog.delete_partitions(
    database="default",
    table="foo_bar",
    partitions_values=[["1", "2021-01-01 00:00:00"]],
)
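The S3 path in the manual version follows the Hive-style `column=value` layout used for partitioned datasets. A small sketch of how such a prefix can be assembled from a partition dictionary; `partition_prefix` is a hypothetical helper, the bucket name is a placeholder, and values are used exactly as given with no timestamp formatting applied.

```python
def partition_prefix(root: str, partition: dict, partition_cols: list) -> str:
    """Build a Hive-style S3 prefix, ordering keys by the dataset's partition columns."""
    parts = [f"{col}={partition[col]}" for col in partition_cols]
    return "/".join([root.rstrip("/"), *parts]) + "/"


prefix = partition_prefix(
    "s3://my-bucket/foo_bar/",
    {"col2": "2021-01-01", "col1": 1},  # key order in the dict does not matter
    partition_cols=["col1", "col2"],
)
```

The resulting prefix is `s3://my-bucket/foo_bar/col1=1/col2=2021-01-01/`. Note that the Glue catalog may store a different string form of the same value, as the `"2021-01-01 00:00:00"` entry in the example above shows.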

Query the dataset using AWS Athena

df = dataset.query("SELECT col1 FROM foo_bar")

With awswrangler alone, one would have to do the following instead:

out = wr.athena.read_sql_query("SELECT col1 FROM foo_bar", database=database)
