
ETL for Python

Project description

ETL using Python

Python package for extracting data from a source, transforming it, and loading it to a destination, with validation in between.

The ETL pipeline provides useful functionality on top of the usual operations:

  • Extract: Extract data from multiple sources in parallel (using threads).
  • Validate: Validate the extracted data against pandera schemas, to make sure it matches what the transform step will require. This fails early if there is any unexpected change in the sources.
  • Transform: Define the transformation logic once, making it reusable and allowing multiple data frames as input and multiple data frames as output.
  • Validate again: Validate the transformed data, to make sure it matches your expectations and what the destination will require.
  • Load: Load each output to one or more destinations, loading different outputs to different destinations in parallel (using threads). A multi-source, multi-destination sketch appears at the end of this description.

Installation

The package is available on PyPI, so you can install it using pip:

pip install extralo

Usage

Let's create some fake data to use in the examples:

import pandas as pd

data = pd.DataFrame(
    {
        "client": ["Alice", "Bob", "Charlie", "David", "Eve"],
        "policy_start_date": ["2024-01-01", "2024-02-02", "2024-03-03", "2024-04-04", "2024-05-05"],
    }
)

data.to_csv("data.csv", index=False)

Let's define some logic to transform the data:

from extralo.transformer import Transformer


class MyTransformer(Transformer):
    def transform(self, data):
        data["policy_start_date"] = pd.to_datetime(data["policy_start_date"])
        data["days_since_start"] = (pd.Timestamp.now() - data["policy_start_date"]).dt.days
        return {"data": data}

Notice that the argument to transform is named "data". This name must match the name used in the sources definition in the next step. Also notice that we returned a dict of DataFrames: this is required, since the transform step can return multiple outputs.
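For example, a transformer could accept several inputs and return several outputs. The following is a hypothetical sketch: the argument names "clients" and "policies" would have to match keys in the sources definition, and the returned keys would have to match keys in the destinations definition.

from extralo.transformer import Transformer


class JoinTransformer(Transformer):
    def transform(self, clients, policies):
        # "clients" and "policies" are hypothetical source names; they must
        # match the keys used in the `sources` definition of the pipeline.
        joined = clients.merge(policies, on="client")
        summary = joined.groupby("client", as_index=False).size()
        # Each returned key must match a key in the `destinations` definition.
        return {"joined": joined, "summary": summary}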

Let's create a SQLite database to use as the destination:

from sqlalchemy import create_engine

engine = create_engine("sqlite:///data.sqlite")

Now we can define the ETL pipeline:

from extralo import ETL, CSVSource, SQLDestination
import pandera as pa

etl = ETL(
    sources={
        "data": CSVSource("data.csv"),
    },
    before_schemas={"data": pa.DataFrameModel},
    transformer=MyTransformer(),
    after_schemas={
        "data": pa.DataFrameModel,
    },
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)

And finally run it:

etl.execute()

Log the execution

The extralo package uses a logger named "elt" to report useful information about the execution. You can configure this logger to display the logs in the console:

import logging

logging.basicConfig(level=logging.INFO)

If we execute the ETL pipeline again, we will see some logs printed to the console:

etl.execute()

The log messages can be configured using the standard functionality of the logging module.
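For example, to add timestamps and the logger name to each message, you can attach a handler with a custom formatter. This is a minimal sketch using only the standard library; the logger name "elt" is taken from the paragraph above:

import logging

# Attach a console handler with timestamps to the "elt" logger.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))

logger = logging.getLogger("elt")
logger.setLevel(logging.INFO)
logger.addHandler(handler)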

Validate data with pandera

The ETL pipeline can validate the extracted and transformed data using pandera schemas. This is useful to make sure the data is in the expected format, and to fail early if there is any unexpected change in the sources.

In the previous example we used the base DataFrameModel, which creates a validator that never fails. We can define stricter schemas to validate the data:

before_schema = pa.DataFrameSchema(
    {
        "client": pa.Column(pa.String),
        "policy_start_date": pa.Column(pa.DateTime),
    }
)

after_schema = pa.DataFrameSchema(
    {
        "client": pa.Column(pa.String),
        "policy_start_date": pa.Column(pa.DateTime),
        "days_since_start": pa.Column(pa.Int),
    }
)

And tell the ETL pipeline to use these schemas:

etl = ETL(
    sources={
        "data": CSVSource("data.csv"),
    },
    before_schemas={"data": before_schema},
    transformer=MyTransformer(),
    after_schemas={
        "data": after_schema,
    },
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)

etl.execute()

Notice that we get SchemaErrors (validation is always performed with lazy=True, so all failures are collected and reported together): policy_start_date is not a DateTime, because the CSV source delivers it as a string.
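If you prefer to handle validation failures programmatically instead of letting them stop the program, you can catch the pandera exception around execute(). This sketch assumes the SchemaErrors exception propagates out of execute(), as the failure above suggests:

import pandera as pa

try:
    etl.execute()
except pa.errors.SchemaErrors as err:
    # failure_cases is a data frame describing every check that failed
    print(err.failure_cases)

To actually fix the problem, relax the before schema to expect a string (the date is only parsed during the transform step) and try again: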

before_schema = pa.DataFrameSchema(
    {
        "client": pa.Column(pa.String),
        "policy_start_date": pa.Column(pa.String),
    }
)

etl = ETL(
    sources={"data": CSVSource("data.csv")},
    before_schemas={"data": before_schema},
    transformer=MyTransformer(),
    after_schemas={"data": after_schema},
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)

etl.execute()
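Finally, the feature list above mentions extracting from multiple sources and loading to multiple destinations in parallel. The following is a hedged sketch of how that could be wired with the classes used above; the file names, table names and JoinTransformer (the hypothetical multi-input transformer shown earlier) are placeholders, not part of the package:

from extralo import ETL, CSVSource, SQLDestination
import pandera as pa

etl = ETL(
    sources={
        # Hypothetical sources: both are extracted in parallel.
        "clients": CSVSource("clients.csv"),
        "policies": CSVSource("policies.csv"),
    },
    before_schemas={"clients": pa.DataFrameModel, "policies": pa.DataFrameModel},
    transformer=JoinTransformer(),
    after_schemas={"joined": pa.DataFrameModel, "summary": pa.DataFrameModel},
    destinations={
        # Each output can go to one or more destinations; different outputs
        # are loaded in parallel.
        "joined": [SQLDestination(engine, "joined", None, if_exists="replace")],
        "summary": [
            SQLDestination(engine, "summary", None, if_exists="replace"),
            SQLDestination(engine, "summary_backup", None, if_exists="replace"),
        ],
    },
)

etl.execute()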
