
ETL for Python

Project description

ETL using Python

Python package for extracting data from a source, transforming it and loading it to a destination, with validation in between.

The ETL pipeline adds useful functionality on top of the usual operations:

  • Extract: Extract data from multiple sources, in parallel (using threads).
  • Validate: Validate the extracted data against pandera schemas, to make sure it matches what the transform step requires. This makes the pipeline fail early if the sources change unexpectedly.
  • Transform: Define the transformation logic in a reusable way, with multiple data frames as input and multiple data frames as output.
  • Validate again: Validate the transformed data, to make sure it matches your expectations and what the destination requires.
  • Load: Load multiple data frames, each to one or more destinations, and load different data frames to different destinations in parallel (using threads).
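The threaded extract step described above can be pictured with a small standard-library sketch. This is not extralo's implementation: `extract_all` and the two lambda "sources" are hypothetical names used only to illustrate running several extractions in parallel and collecting the results by name.

```python
from concurrent.futures import ThreadPoolExecutor


def extract_all(sources):
    """Run every extract callable in its own thread and key the results by source name.

    Hypothetical helper for illustration; extralo's internals may differ.
    """
    with ThreadPoolExecutor(max_workers=max(len(sources), 1)) as pool:
        # Submit all extractions first so they run concurrently.
        futures = {name: pool.submit(extract) for name, extract in sources.items()}
        # result() blocks until each extraction finishes and re-raises its errors.
        return {name: future.result() for name, future in futures.items()}


# Stand-ins for real sources (files, databases, APIs).
sources = {
    "clients": lambda: [{"client": "Alice"}, {"client": "Bob"}],
    "policies": lambda: [{"policy_start_date": "2024-01-01"}],
}

extracted = extract_all(sources)
```

Threads suit this kind of work because extraction is usually I/O-bound, so the waiting time of one source overlaps with the others.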

Installation

The package is available on PyPI, so you can install it with pip:

pip install extralo

Usage

Let's create some fake data to use in the examples:

import pandas as pd

data = pd.DataFrame(
    {
        "client": ["Alice", "Bob", "Charlie", "David", "Eve"],
        "policy_start_date": ["2024-01-01", "2024-02-02", "2024-03-03", "2024-04-04", "2024-05-05"],
    }
)

data.to_csv("data.csv", index=False)

Let's define some logic to transform the data:

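import pandas as pd

from extralo.transformer import Transformer


class MyTransformer(Transformer):
    def transform(self, data):
        # Parse the date strings read from the CSV into datetimes.
        data["policy_start_date"] = pd.to_datetime(data["policy_start_date"])
        # Whole days elapsed between the policy start and the current time.
        data["days_since_start"] = (pd.Timestamp.now() - data["policy_start_date"]).dt.days
        # The keys of the returned dict name the outputs of the transform step.
        return {"data": data}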

Notice that the argument to transform is named "data". This name must match the key used in the sources definition in the next step. Also notice that transform returns a dict of DataFrames. This is required because the step may return multiple data frames.
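One way to picture the name-matching rule is keyword-argument unpacking. The sketch below assumes the extracted data is passed to transform by name. That is an assumption made for illustration, not a statement about how extralo calls the transformer. It uses plain lists instead of DataFrames to stay dependency-free.

```python
def transform(data):
    """Toy transform: the parameter name "data" plays the role of the source key."""
    return {"data": [name.upper() for name in data]}


# Keys of the extracted dict line up with the parameter names of transform.
extracted = {"data": ["alice", "bob"]}
result = transform(**extracted)

# A source key that does not match any parameter name fails immediately.
try:
    transform(**{"clients": ["alice"]})
except TypeError as exc:
    mismatch_error = str(exc)
```

Under this assumption, renaming a source key without renaming the matching transform argument surfaces as an error at call time rather than as silently missing data.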

Let's create a SQLite database to use as the destination:

from sqlalchemy import create_engine

# Create a SQLite database
engine = create_engine("sqlite:///data.sqlite")

Now we can define the ETL pipeline:

from extralo import ETL, CSVSource, SQLDestination
import pandera as pa

etl = ETL(
    sources={
        "data": CSVSource("data.csv"),
    },
    before_schemas={"data": pa.DataFrameModel},
    transformer=MyTransformer(),
    after_schemas={
        "data": pa.DataFrameModel,
    },
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)

And finally run it:

etl.execute()
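The if_exists="replace" argument above reads like the option of the same name in pandas' DataFrame.to_sql. Assuming SQLDestination forwards it to pandas (an assumption, not confirmed by this page), its effect can be sketched with pandas and an in-memory SQLite database. The two small frames are made up for illustration.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")

first = pd.DataFrame({"client": ["Alice", "Bob"]})
second = pd.DataFrame({"client": ["Charlie"]})

# With if_exists="replace", each write drops the existing table and recreates it.
first.to_sql("data_group", conn, if_exists="replace", index=False)
second.to_sql("data_group", conn, if_exists="replace", index=False)

# Only the rows from the most recent write remain.
loaded = pd.read_sql("SELECT client FROM data_group", conn)
conn.close()
```

This is why the pipeline can be executed repeatedly in the examples without accumulating duplicate rows, provided the destination behaves like to_sql.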

Log the execution

The extralo package uses a logger named "elt" to report useful information about the execution. You can configure logging to display these messages in the console:

import logging

logging.basicConfig(level=logging.INFO)

If we execute the ETL pipeline again, we will see some logs printed to the console:

etl.execute()

The format of the log messages can be configured with the standard functionality of the logging module.
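As a minimal sketch of such a configuration, the snippet below attaches a handler with a custom format to the logger by name. The logger name "elt" is taken from the text above. The format string and the choice to disable propagation are illustrative, not something the package requires.

```python
import logging

# Handler that writes to the console (stderr) with a timestamped format.
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
)

# Configure only the package's logger, leaving the root logger untouched.
logger = logging.getLogger("elt")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# Stop records from also reaching root handlers, which would print them twice
# if logging.basicConfig() has been called as well.
logger.propagate = False
```

Configuring the named logger instead of the root logger keeps the output of other libraries unchanged.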

Validate data with pandera

The ETL pipeline can validate both the extracted and the transformed data using pandera schemas. This is useful to make sure the data is in the expected format, and to fail early if the sources change unexpectedly.

In the previous example, we used the "base" DataFrameModel to create a validator that never fails. We can create a stricter schema to validate the data:

before_schema = pa.DataFrameSchema(
    {
        "client": pa.Column(pa.String),
        "policy_start_date": pa.Column(pa.DateTime),
    }
)

after_schema = pa.DataFrameSchema(
    {
        "client": pa.Column(pa.String),
        "policy_start_date": pa.Column(pa.DateTime),
        "days_since_start": pa.Column(pa.Int),
    }
)

Then tell the ETL pipeline to use these schemas:

etl = ETL(
    sources={
        "data": CSVSource("data.csv"),
    },
    before_schemas={"data": before_schema},
    transformer=MyTransformer(),
    after_schemas={
        "data": after_schema,
    },
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)

etl.execute()

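Notice that this run raises SchemaErrors (plural, because validation is always performed with lazy=True, which collects every failure before raising): policy_start_date is read from the CSV as a string, not a DateTime. Let's fix the schema and try again: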

before_schema = pa.DataFrameSchema(
    {
        "client": pa.Column(pa.String),
        "policy_start_date": pa.Column(pa.String),
    }
)

etl = ETL(
    sources={
        "data": CSVSource("data.csv"),
    },
    before_schemas={"data": before_schema},
    transformer=MyTransformer(),
    after_schemas={
        "data": after_schema,
    },
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)

etl.execute()
