
A package to run ETL pipelines

Project description

etl-pipeline-runner

Extract Transform Load (ETL) pipeline runner is a simple yet effective Python package for running ETL pipelines in data science projects. The sole purpose of this package is to:

Extract data from a source --> Transform the data as needed --> Load the data into a database

Installation

Install the library with pip:

    pip install etl-pipeline-runner

Usage

Run an ETL pipeline that extracts data from Kaggle and stores it in an SQLite database.


Data source: https://www.kaggle.com/datasets/edenbd/150k-lyrics-labeled-with-spotify-valence

Destination: the song_lyrics table of the project.sqlite database. Assume the database is located, or will be created, in the data directory of your project.

Example code:

  1. Import the following services from etl_pipeline_runner
from etl_pipeline_runner.services import (
    ETLPipeline,
    DataExtractor,
    CSVHandler,
    SQLiteLoader,
    ETLQueue,
)
  2. Create a function that defines the transformation you want to perform on the dataset before loading it into the database. The function signature must match the following. Here pd refers to pandas.
    import pandas as pd

    def transform_songs(data_frame: pd.DataFrame):
        data_frame = data_frame.drop(columns=data_frame.columns[0], axis=1)
        data_frame = data_frame.rename(columns={"seq": "lyrics"})
        return data_frame
  3. Create an object of the SQLiteLoader.
    import os

    DATA_DIRECTORY = os.path.join(os.getcwd(), "data")
    songs_loader = SQLiteLoader(
        db_name="project.sqlite",
        table_name="song_lyrics",
        if_exists=SQLiteLoader.REPLACE,
        index=False,
        method=None,
        output_directory=DATA_DIRECTORY,
    )
  4. Create an object of the CSVHandler.
    import numpy as np

    songs_dtype = {
        "#": "Int64",
        "artist": str,
        "seq": str,
        "song": str,
        "label": np.float64,
    }

    songs_csv_handler = CSVHandler(
        file_name="labeled_lyrics_cleaned.csv",
        sep=",",
        names=None,
        dtype=songs_dtype,
        transformer=transform_songs,
        loader=songs_loader,
    )
  5. Create an object of the DataExtractor.
    songs_extractor = DataExtractor(
        data_name="Song lyrics",
        url="https://www.kaggle.com/datasets/edenbd/150k-lyrics-labeled-with-spotify-valence",
        type=DataExtractor.KAGGLE_ARCHIVE,
        file_handlers=(songs_csv_handler,),
    )
  6. Create an object of ETLPipeline.
    songs_pipeline = ETLPipeline(
        extractor=songs_extractor,
    )
  7. Finally, run the pipeline:
    if __name__ == "__main__":
        ETLQueue(etl_pipelines=(songs_pipeline,)).run()
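
For convenience, the steps above can be assembled into one script. The following is a minimal sketch that simply combines the fragments shown in the steps (same dataset, file names, and directory layout); adjust paths and names for your own project.

    import os

    import numpy as np
    import pandas as pd

    from etl_pipeline_runner.services import (
        ETLPipeline,
        DataExtractor,
        CSVHandler,
        SQLiteLoader,
        ETLQueue,
    )

    DATA_DIRECTORY = os.path.join(os.getcwd(), "data")


    def transform_songs(data_frame: pd.DataFrame):
        # Drop the unnamed index column and rename "seq" to "lyrics".
        data_frame = data_frame.drop(columns=data_frame.columns[0], axis=1)
        data_frame = data_frame.rename(columns={"seq": "lyrics"})
        return data_frame


    songs_loader = SQLiteLoader(
        db_name="project.sqlite",
        table_name="song_lyrics",
        if_exists=SQLiteLoader.REPLACE,
        index=False,
        method=None,
        output_directory=DATA_DIRECTORY,
    )

    songs_dtype = {
        "#": "Int64",
        "artist": str,
        "seq": str,
        "song": str,
        "label": np.float64,
    }

    songs_csv_handler = CSVHandler(
        file_name="labeled_lyrics_cleaned.csv",
        sep=",",
        names=None,
        dtype=songs_dtype,
        transformer=transform_songs,
        loader=songs_loader,
    )

    songs_extractor = DataExtractor(
        data_name="Song lyrics",
        url="https://www.kaggle.com/datasets/edenbd/150k-lyrics-labeled-with-spotify-valence",
        type=DataExtractor.KAGGLE_ARCHIVE,
        file_handlers=(songs_csv_handler,),
    )

    songs_pipeline = ETLPipeline(extractor=songs_extractor)

    if __name__ == "__main__":
        ETLQueue(etl_pipelines=(songs_pipeline,)).run()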

Setting up credentials for the Kaggle data source

If your data source is Kaggle, you need an API key to download the dataset. etl-pipeline-runner uses opendatasets to download datasets from Kaggle.
The following steps will guide you through setting up your Kaggle credentials.

  1. Go to https://kaggle.com/me/account (sign in if required).
  2. Scroll down to the "API" section and click "Create New API Token".
  3. This will download a file kaggle.json with the following contents:
    {"username":"YOUR_KAGGLE_USERNAME","key":"YOUR_KAGGLE_KEY"}
  4. You can either put the credentials in your root directory as kaggle.json or enter your username and key in the terminal when prompted.

Services explained

  1. SQLiteLoader

Parameters description:

| Parameter | Description |
| --- | --- |
| db_name: str | Name of the database. |
| table_name: str | Table name where the data will be stored. |
| if_exists: str | Action if the table already exists. Possible options: SQLiteLoader.REPLACE, SQLiteLoader.APPEND, SQLiteLoader.FAIL. |
| index: bool | Write the DataFrame index as a column. Uses index_label as the column name in the table (from the pandas docs). |
| method: Callable | Controls the SQL insertion clause used (from the pandas docs). |
| output_directory: str | Path where the database is located or will be created. |
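
As a quick illustration of the if_exists options, the sketch below configures a loader that appends to an existing table instead of replacing it; the database, table, and directory names are placeholders.

    import os

    from etl_pipeline_runner.services import SQLiteLoader

    # Hypothetical loader: keep existing rows and append new ones on each run.
    append_loader = SQLiteLoader(
        db_name="project.sqlite",
        table_name="song_lyrics",
        if_exists=SQLiteLoader.APPEND,
        index=False,
        method=None,
        output_directory=os.path.join(os.getcwd(), "data"),
    )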
  2. CSVHandler

Parameters description:

| Parameter | Description |
| --- | --- |
| file_name: str | Name of the CSV file. It must match the actual file name. |
| sep: str | Separator used in the CSV file. |
| names: list | Names of the columns if the CSV file does not contain them. |
| dtype: dict | Types of the columns in the CSV file. |
| compression: str | Options: CSVHandler.ZIP_COMPRESSION, CSVHandler.GZIP_COMPRESSION, CSVHandler.BZIP2_COMPRESSION, CSVHandler.ZSTD_COMPRESSION, CSVHandler.XZ_COMPRESSION, CSVHandler.TAR_COMPRESSION. |
| encoding: str | Encoding of the file. Default: utf-8. |
| loader: SQLiteLoader | Object of SQLiteLoader. |
| transformer: Callable | Function that defines the transformation on the data. |
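
For example, if the extracted file were a gzip-compressed CSV with a non-default encoding, the handler could be configured as in the sketch below; the file name and encoding are assumptions for illustration, and transform_songs and songs_loader are the objects from the usage example above.

    # Hypothetical handler for a gzip-compressed CSV file.
    compressed_csv_handler = CSVHandler(
        file_name="lyrics.csv.gz",                # placeholder file name
        sep=",",
        names=None,
        dtype=None,
        compression=CSVHandler.GZIP_COMPRESSION,  # file is gzip-compressed
        encoding="latin-1",                       # override the utf-8 default
        transformer=transform_songs,
        loader=songs_loader,
    )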
  3. DataExtractor

Parameters description:

| Parameter | Description |
| --- | --- |
| data_name: str | Name of the data (could be anything of your choice). |
| url: str | URL of the data source. |
| type: str | Type of the source. Possible options: DataExtractor.KAGGLE_ARCHIVE, DataExtractor.CSV. |
| file_handlers: Tuple(CSVHandler) | Handler objects to handle the extracted files from the URL. |
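
The usage example above uses DataExtractor.KAGGLE_ARCHIVE; for a plain CSV source the type can be set to DataExtractor.CSV instead, as in this sketch (the URL is a placeholder, and songs_csv_handler is the handler from the usage example).

    # Hypothetical extractor for a CSV file served directly over HTTP.
    csv_extractor = DataExtractor(
        data_name="Example CSV source",
        url="https://example.com/data.csv",  # placeholder URL
        type=DataExtractor.CSV,
        file_handlers=(songs_csv_handler,),
    )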
  4. ETLPipeline

Parameters description:

| Parameter | Description |
| --- | --- |
| extractor: DataExtractor | An object of the DataExtractor service. |
  5. ETLQueue

Parameters description:

| Parameter | Description |
| --- | --- |
| etl_pipelines: Tuple | Tuple of ETLPipeline objects. |
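
Several pipelines can be queued and run together. In the sketch below, weather_pipeline is a hypothetical second pipeline built the same way as songs_pipeline in the usage example.

    if __name__ == "__main__":
        ETLQueue(etl_pipelines=(songs_pipeline, weather_pipeline)).run()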

Contributing

This is an open source project and I welcome contributions. Please create an issue first and create a feature branch associated with the issue.

Local Development Setup

  1. Clone the repository:
    git clone git@github.com:prantoamt/etl-pipeline-runner.git
  2. Install the pdm package manager based on your local environment: https://pdm-project.org/latest/

  3. Go to the project directory and install the requirements using pdm:

    pdm install
  4. Open up the project in VS Code, make your changes, and create a pull request with a proper description.

