etl-pipeline-runner
A package to run ETL pipelines
Extract Transform Load (ETL) pipeline runner is a simple yet effective Python package for running ETL pipelines in data science projects. The sole purpose of this package is to:
Extract data from a source --> Transform the data as needed --> Load the data into a database
Installation
Install the library with pip:
```bash
pip install etl-pipeline-runner
```
Usage
Run an ETL pipeline that extracts data from Kaggle and stores it in a SQLite database.
Data source: https://www.kaggle.com/datasets/edenbd/150k-lyrics-labeled-with-spotify-valence
Destination: the song_lyrics table of the project.sqlite database. Suppose the database is located, or will be created, in the /data directory.
Example code:
- Import the required services from etl_pipeline_runner (os, numpy, and pandas are also used in the steps below):
```python
import os

import numpy as np
import pandas as pd

from etl_pipeline_runner.services import (
    ETLPipeline,
    DataExtractor,
    CSVHandler,
    SQLiteLoader,
    ETLQueue,
)
```
- Create a function that defines the transformation you want to perform on the dataset before it is loaded into the database. The function signature must match the following (pd refers to pandas):
```python
def transform_songs(data_frame: pd.DataFrame):
    # Drop the unnamed index column that ships with the CSV file.
    data_frame = data_frame.drop(columns=data_frame.columns[0])
    # Rename the "seq" column to the more descriptive "lyrics".
    data_frame = data_frame.rename(columns={"seq": "lyrics"})
    return data_frame
```
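Because the transformer is a plain function, you can sanity-check it on a small frame before wiring it into the pipeline. The frame below is purely illustrative, mirroring the dataset's columns:
```python
sample = pd.DataFrame(
    {"#": [0], "artist": ["ABBA"], "seq": ["..."], "song": ["SOS"], "label": [0.5]}
)
print(transform_songs(sample).columns.tolist())
# ['artist', 'lyrics', 'song', 'label']
```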
- Create an object of the SQLiteLoader.
```python
DATA_DIRECTORY = os.path.join(os.getcwd(), "data")

songs_loader = SQLiteLoader(
    db_name="project.sqlite",
    table_name="song_lyrics",
    if_exists=SQLiteLoader.REPLACE,  # Replace the table if it already exists.
    index=False,  # Do not write the DataFrame index as a column.
    method=None,  # Use the default SQL insertion method.
    output_directory=DATA_DIRECTORY,
)
```
- Create an object of the CSVHandler.
```python
# Column types of the CSV file, passed through to pandas.
songs_dtype = {
    "#": "Int64",
    "artist": str,
    "seq": str,
    "song": str,
    "label": np.float64,
}

songs_csv_handler = CSVHandler(
    file_name="labeled_lyrics_cleaned.csv",  # Must match the actual file name.
    sep=",",
    names=None,  # The CSV file already contains a header row.
    dtype=songs_dtype,
    transformer=transform_songs,
    loader=songs_loader,
)
```
- Create an object of the DataExtractor.
```python
songs_extractor = DataExtractor(
    data_name="Song lyrics",
    url="https://www.kaggle.com/datasets/edenbd/150k-lyrics-labeled-with-spotify-valence",
    type=DataExtractor.KAGGLE_ARCHIVE,  # The source is a Kaggle dataset archive.
    file_handlers=(songs_csv_handler,),
)
```
- Create an object of ETLPipeline.
```python
songs_pipeline = ETLPipeline(
    extractor=songs_extractor,
)
```
- Finally, run the pipeline:
```python
if __name__ == "__main__":
    ETLQueue(etl_pipelines=(songs_pipeline,)).run()
```
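An ETLQueue accepts a tuple of pipelines, so several datasets can be processed in one run. A minimal sketch, assuming a second, hypothetical movies_pipeline built the same way as songs_pipeline:
```python
# movies_pipeline is hypothetical; it would be built exactly like songs_pipeline.
if __name__ == "__main__":
    ETLQueue(etl_pipelines=(songs_pipeline, movies_pipeline)).run()
```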
Setting up credentials for the Kaggle data source
If your data source is Kaggle, you need an API key to download the dataset.
etl-pipeline-runner uses opendatasets to download datasets from Kaggle.
The following steps will guide you through setting up your Kaggle credentials.
- Go to https://kaggle.com/me/account (sign in if required).
- Scroll down to the "API" section and click "Create New API Token".
- This will download a file kaggle.json with the following contents:
{"username":"YOUR_KAGGLE_USERNAME","key":"YOUR_KAGGLE_KEY"}
- You can either place the credentials in your root directory as kaggle.json or enter your username and key in the terminal when prompted.
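If you prefer to create the file by hand, a one-off snippet like the following writes it out (the credential values are placeholders, to be replaced with your own):
```python
import json
from pathlib import Path

# Placeholders: substitute your own Kaggle username and key.
credentials = {"username": "YOUR_KAGGLE_USERNAME", "key": "YOUR_KAGGLE_KEY"}
Path("kaggle.json").write_text(json.dumps(credentials))
```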
Services explained
- SQLiteLoader
Parameters description:
Parameter | Description
---|---
db_name: str | Name of the database.
table_name: str | Table name where the data will be stored.
if_exists: str | Action if the table already exists. Possible options: SQLiteLoader.REPLACE, SQLiteLoader.APPEND, SQLiteLoader.FAIL.
index: bool | Write the DataFrame index as a column. Uses index_label as the column name in the table. (From the pandas docs.)
method: Callable | Controls the SQL insertion clause used. (From the pandas docs.)
output_directory: str | Path where the database is located or will be created.
- CSVHandler
Parameters description:
Parameter | Description
---|---
file_name: str | Name of the CSV file. It must match the actual file name.
sep: str | Separator used in the CSV file.
names: list | Names of the columns if the CSV file does not contain them.
dtype: dict | Types of the columns in the CSV file.
compression: str | Options: CSVHandler.ZIP_COMPRESSION, CSVHandler.GZIP_COMPRESSION, CSVHandler.BZIP2_COMPRESSION, CSVHandler.ZSTD_COMPRESSION, CSVHandler.XZ_COMPRESSION, CSVHandler.TAR_COMPRESSION.
encoding: str | Encoding of the file. Default: utf-8.
loader: SQLiteLoader | Object of SQLiteLoader.
transformer: Callable | Function that defines the transformation applied to the data.
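The compression constants correspond to the compression formats pandas' read_csv supports. An illustrative handler for a gzip-compressed file, reusing the objects from the usage example above (the compressed file name is an assumption):
```python
zipped_csv_handler = CSVHandler(
    file_name="labeled_lyrics_cleaned.csv.gz",  # Hypothetical compressed file name.
    sep=",",
    names=None,
    dtype=songs_dtype,
    compression=CSVHandler.GZIP_COMPRESSION,
    transformer=transform_songs,
    loader=songs_loader,
)
```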
- DataExtractor
Parameters description:
Parameter | Description
---|---
data_name: str | Name of the data (could be anything of your choice).
url: str | URL of the data source.
type: str | Type of the source. Possible options: DataExtractor.KAGGLE_ARCHIVE, DataExtractor.CSV.
file_handlers: Tuple[CSVHandler] | Handler objects to handle the files extracted from the URL.
- ETLPipeline
Parameters description:
Parameter | Description
---|---
extractor: DataExtractor | An object of the DataExtractor service.
- ETLQueue
Parameters description:
Parameter | Description
---|---
etl_pipelines: Tuple | Tuple of ETLPipeline objects.
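For a source that is a plain CSV file rather than a Kaggle archive, the same services compose the same way with type=DataExtractor.CSV. A minimal sketch, assuming a URL that points directly at a CSV file; the URL, file name, table name, and parameter values are all illustrative:
```python
import os

from etl_pipeline_runner.services import (
    ETLPipeline,
    DataExtractor,
    CSVHandler,
    SQLiteLoader,
    ETLQueue,
)

def passthrough(data_frame):
    # No transformation; load the data as-is.
    return data_frame

weather_loader = SQLiteLoader(
    db_name="project.sqlite",
    table_name="weather",
    if_exists=SQLiteLoader.APPEND,
    index=False,
    method=None,
    output_directory=os.path.join(os.getcwd(), "data"),
)

weather_csv_handler = CSVHandler(
    file_name="weather.csv",
    sep=",",
    names=None,
    dtype=None,
    transformer=passthrough,
    loader=weather_loader,
)

weather_extractor = DataExtractor(
    data_name="Weather data",
    url="https://example.com/weather.csv",  # Hypothetical direct CSV URL.
    type=DataExtractor.CSV,
    file_handlers=(weather_csv_handler,),
)

if __name__ == "__main__":
    ETLQueue(etl_pipelines=(ETLPipeline(extractor=weather_extractor),)).run()
```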
Contributing
This is an open-source project and contributions are welcome. Please create an issue first and make a feature branch associated with the issue.
Local Development Setup
- Clone the repository:
```bash
git clone git@github.com:prantoamt/etl-pipeline-runner.git
```
- Install the pdm package manager for your local environment: https://pdm-project.org/latest/
- Go to the project directory and install the requirements using pdm:
```bash
pdm install
```
- Open the project in VS Code, make your changes, and create a pull request with a proper description.