
ETL job from CSV to Parquet in AWS S3

Project description

S3 Parquetifier


S3 Parquetifier is an ETL tool that can take a file from an S3 bucket, convert it to Parquet format, and save it to another bucket.

S3 Parquetifier supports the following file types:

  • CSV
  • JSON
  • TSV

Instructions

How to install

To install the package, run the following:

sudo apt-get install -y libssl-dev libffi-dev &&
sudo apt-get install -y libxml2-dev libxslt1-dev &&
sudo apt-get install -y libsnappy-dev
pip install s3-parquetifier
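
If you want to confirm that the installation worked, a quick import check (optional, not part of the package docs) is enough:

# Optional sanity check: the import should succeed once the package
# and the native dependencies above are installed.
from s3_parquetifier import S3Parquetifier

print(S3Parquetifier)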

How to use it

S3 Parquetifier needs an AWS account with at least read rights on the source bucket and read-write rights on the target bucket.

You can read more about how to set up S3 roles and policies here.
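
If you want to verify up front that the credentials on your machine can reach both buckets, a quick check with boto3 might look like the sketch below. This is not part of S3 Parquetifier's API; the bucket names are placeholders:

import boto3
from botocore.exceptions import ClientError

# Placeholder bucket names -- replace with your own.
SOURCE_BUCKET = "<source-bucket>"
TARGET_BUCKET = "<target-bucket>"

# Uses the standard AWS credential chain (environment variables,
# ~/.aws/credentials, or an attached IAM role).
s3 = boto3.client("s3")

for bucket in (SOURCE_BUCKET, TARGET_BUCKET):
    try:
        s3.head_bucket(Bucket=bucket)  # raises if the bucket is missing or access is denied
        print(f"OK: {bucket} is reachable")
    except ClientError as error:
        print(f"Cannot access {bucket}: {error}")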

Running the Script

To convert a file that is already stored in S3, use convert_from_s3:

from s3_parquetifier import S3Parquetifier

# Call the converter
S3Parquetifier(
    source_bucket="<the bucket's name where the CSVs are>",
    target_bucket="<the bucket's name where you want the parquet file to be saved>",
    verbose=True,  # enable verbose output
).convert_from_s3(
    source_key="<the key of the S3 object>",
    target_key="<the key of the S3 object>",
    chunk_size=100000  # The number of rows per parquet
)

You can also convert a local file and upload the resulting Parquet to S3 with convert_from_local:

from s3_parquetifier import S3Parquetifier

# Call the converter
S3Parquetifier(
    target_bucket="<the bucket's name where you want the parquet file to be saved>",
    verbose=True,  # enable verbose output
).convert_from_local(
    file_name='<The CSV file that you want to transform>',
    target_key='<The S3 bucket key where the file will be saved>',
    chunk_size=100000,
)
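
To spot-check the result, the generated Parquet can be read back with pandas. This is an optional verification sketch, not part of S3 Parquetifier; it assumes pandas and pyarrow are installed (plus s3fs for reading s3:// paths directly), and the path is a placeholder:

import pandas as pd

# Placeholder path -- replace with the target bucket and key you converted to.
# Reading an s3:// path with pandas requires the optional s3fs dependency.
df = pd.read_parquet("s3://<target-bucket>/<target-key>")

print(df.shape)   # rows and columns that ended up in the Parquet output
print(df.head())  # first few rows, to eyeball the conversion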

Adding a custom pre-processing function

You can add a custom pre-processing function that is applied to your source file. Because this tool is designed for large files, pre-processing happens on each chunk separately. If the full file is needed for pre-processing, you should pre-process the source file locally before converting it.

In the following example, we add custom columns with custom values to each chunk: the columns test1, test2 and test3 with the values 1, 2 and 3 respectively.

We define our function, named pre_process, below, and we also define its arguments in kwargs. The chunk DataFrame does not need to be included in kwargs; it is passed automatically. Pass your function via the pre_process_chunk argument and its extra arguments via kwargs in the convert_from_s3 method.

from s3_parquetifier import S3Parquetifier


# Add three new columns with custom values
def pre_process(chunk, columns=None, values=None):

    for index, column in enumerate(columns):
        chunk[column] = values[index]

    return chunk

# define the arguments for the pre-processor
kwargs = {
    'columns': ['test1', 'test2', 'test3'],
    'values': [1, 2, 3]
}

# Call the converter
S3Parquetifier(
    source_bucket="<the bucket's name where the CSVs are>",
    target_bucket="<the bucket's name where you want the parquet file to be saved>",
    verbose=True,  # enable verbose output
).convert_from_s3(
    source_key='<the key of the source S3 object>',
    target_key='<the key under which the Parquet file will be saved>',
    chunk_size=100000,  # The number of rows per Parquet file
    pre_process_chunk=pre_process,  # A function that pre-processes each chunk
    kwargs=kwargs  # extra arguments for the pre-process function
)
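
The same hook works for any per-chunk transformation, not only adding columns. As a hypothetical variation, the sketch below drops rows with missing values and renames a column before each chunk is written; clean_chunk, rename_map and the column names are made up for illustration:

from s3_parquetifier import S3Parquetifier


# Hypothetical pre-processor: clean each chunk before it is written to Parquet
def clean_chunk(chunk, rename_map=None):
    chunk = chunk.dropna()  # drop rows that contain missing values
    if rename_map:
        chunk = chunk.rename(columns=rename_map)  # e.g. to match a target schema
    return chunk

# 'old_name' and 'new_name' are placeholder column names
kwargs = {'rename_map': {'old_name': 'new_name'}}

S3Parquetifier(
    source_bucket="<the bucket's name where the CSVs are>",
    target_bucket="<the bucket's name where you want the parquet file to be saved>",
).convert_from_s3(
    source_key='<the key of the source S3 object>',
    target_key='<the key under which the Parquet file will be saved>',
    chunk_size=100000,
    pre_process_chunk=clean_chunk,
    kwargs=kwargs
)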

ToDo

  • Add support for handling local files
  • Add support for JSON
  • Add support for streaming from a URL

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

s3-parquetifier-0.2.tar.gz (6.4 kB)

Uploaded Source

Built Distribution

s3_parquetifier-0.2-py3-none-any.whl (7.8 kB)

Uploaded Python 3

File details

Details for the file s3-parquetifier-0.2.tar.gz.

File metadata

  • Download URL: s3-parquetifier-0.2.tar.gz
  • Upload date:
  • Size: 6.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.21.0 setuptools/45.2.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.6.9

File hashes

Hashes for s3-parquetifier-0.2.tar.gz
  • SHA256: 3e9ec61f140be0b30fd5c5a28d38346da5957c62822057ed4b38c3cc7bada72f
  • MD5: d1f4229a26a53e680d5ff4149174aab3
  • BLAKE2b-256: ff84ace06ebba86d9fec789ce7e7a608b70f506dc33bad379a3b7bd2b88f697f

See more details on using hashes here.
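
For example, a downloaded archive can be checked against the SHA256 digest above with Python's hashlib; this assumes the file was saved under its original name in the current directory:

import hashlib

# Published SHA256 digest for s3-parquetifier-0.2.tar.gz (from the list above).
expected = "3e9ec61f140be0b30fd5c5a28d38346da5957c62822057ed4b38c3cc7bada72f"

# Hash the downloaded archive and compare it with the published digest.
with open("s3-parquetifier-0.2.tar.gz", "rb") as f:
    digest = hashlib.sha256(f.read()).hexdigest()

print("OK" if digest == expected else "Hash mismatch!")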

File details

Details for the file s3_parquetifier-0.2-py3-none-any.whl.

File metadata

  • Download URL: s3_parquetifier-0.2-py3-none-any.whl
  • Upload date:
  • Size: 7.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.21.0 setuptools/45.2.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.6.9

File hashes

Hashes for s3_parquetifier-0.2-py3-none-any.whl
  • SHA256: cae454a15776635ec81749fac1a32f0c40dd395c63eb116f78da83d22e0fb16e
  • MD5: e578a5fa1acaa9498929208ae057e496
  • BLAKE2b-256: 433bfe22662b9775685fb0d3ad1eb6a2d13690025d428cda3cd6ab04ed89c4e6

See more details on using hashes here.
