ETL for Python
Project description
ETL using Python
Python package for extracting data from a source, transforming it and loading it to a destination, with validation in between.
The ETL pipeline provides useful functionality on top of the usual operations:
- Extract: Extract data from multiple sources in parallel (using threads).
- Validate: Validate the extracted data against pandera schemas, to make sure it matches what the transform step will require. This provides an early failure if there is any unexpected change in the sources.
- Transform: Define the transformation logic once and reuse it, with support for multiple data frames as input and multiple data frames as output.
- Validate again: Validate the transformed data, to make sure it matches your expectations and what the destination will require.
- Load: Load each data frame to one or more destinations, and load different data frames to different destinations in parallel (using threads).
Installation
The package is available on PyPI, so you can install it with pip:
pip install extralo
Usage
Let's create some fake data to use in the examples:
import pandas as pd

data = pd.DataFrame(
    {
        "client": ["Alice", "Bob", "Charlie", "David", "Eve"],
        "policy_start_date": ["2024-01-01", "2024-02-02", "2024-03-03", "2024-04-04", "2024-05-05"],
    }
)
data.to_csv("data.csv", index=False)
Let's define some logic to transform the data:
from extralo.transformer import Transformer


class MyTransformer(Transformer):
    def transform(self, data):
        data["policy_start_date"] = pd.to_datetime(data["policy_start_date"])
        data["days_since_start"] = (pd.Timestamp.now() - data["policy_start_date"]).dt.days
        return {"data": data}
Notice how we defined the argument to transform with the name "data". This name must be the same name used in the sources definition in the next step. Also, notice how we returned a dict of DataFrames: this is required, since the transform step could return multiple data frames.
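Since the argument names are matched against the source names (and, as in the example above, the returned keys reappear in the schemas and destinations), a transformer can accept and return several data frames. A minimal sketch, where the source names "clients" and "policies" and the join column are assumptions made up for illustration:
# Hypothetical sketch: a transformer with two named inputs and two outputs.
# "clients" and "policies" must match the keys used in the sources definition,
# and "joined"/"summary" must match the keys used for schemas and destinations.
class JoinTransformer(Transformer):
    def transform(self, clients, policies):
        joined = clients.merge(policies, on="client")
        summary = joined.groupby("client", as_index=False).size()
        return {"joined": joined, "summary": summary}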
Let's create a SQLite database to use as the destination:
from sqlalchemy import create_engine
# Create a SQLite database
engine = create_engine("sqlite:///data.sqlite")
Now we can define the ETL pipeline:
from extralo import ETL, CSVSource, SQLDestination
import pandera as pa

etl = ETL(
    sources={
        "data": CSVSource("data.csv"),
    },
    before_schemas={"data": pa.DataFrameModel},
    transformer=MyTransformer(),
    after_schemas={
        "data": pa.DataFrameModel,
    },
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)
And finally run it:
etl.execute()
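The pipeline above uses a single source and a single destination. As the feature list notes, extraction and loading run in parallel threads when there are several of each; here is a minimal sketch that reuses only the classes shown above plus the JoinTransformer sketch, with made-up file and table names:
# Hypothetical sketch: two sources extracted in parallel and two outputs,
# one of them loaded to two tables. File and table names are illustrative.
etl_multi = ETL(
    sources={
        "clients": CSVSource("clients.csv"),
        "policies": CSVSource("policies.csv"),
    },
    before_schemas={"clients": pa.DataFrameModel, "policies": pa.DataFrameModel},
    transformer=JoinTransformer(),
    after_schemas={"joined": pa.DataFrameModel, "summary": pa.DataFrameModel},
    destinations={
        "joined": [
            SQLDestination(engine, "joined_data", None, if_exists="replace"),
            SQLDestination(engine, "joined_backup", None, if_exists="replace"),
        ],
        "summary": [
            SQLDestination(engine, "client_summary", None, if_exists="replace"),
        ],
    },
)
etl_multi.execute()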
Log the execution
The extralo package uses a logger named "elt" to display useful information about the execution. You can configure the logger to display the logs in the console:
import logging
logging.basicConfig(level=logging.INFO)
If we execute the ETL pipeline again, we will see some logs printed to the console:
etl.execute()
The log messages can be configured using the functionality provided by the logging module.
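For example, a sketch that attaches a dedicated handler and format to the "elt" logger named above, using only standard logging calls:
import logging

# Attach a dedicated handler with a custom format to the "elt" logger.
logger = logging.getLogger("elt")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
logger.addHandler(handler)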
Validate data with pandera
The ETL pipeline can validate the extracted and transformed data using pandera schemas. This is useful to make sure the data is in the expected format, and to fail early if there is any unexpected change in the sources.
In the previous example, we used the "base" DataFrameModel to create a validator that never fails. We can create a stricter schema to validate the data:
before_schema = pa.DataFrameSchema(
    {
        "client": pa.Column(pa.String),
        "policy_start_date": pa.Column(pa.DateTime),
    }
)

after_schema = pa.DataFrameSchema(
    {
        "client": pa.Column(pa.String),
        "policy_start_date": pa.Column(pa.DateTime),
        "days_since_start": pa.Column(pa.Int),
    }
)
And inform the ETL pipeline to use these schemas:
etl = ETL(
    sources={
        "data": CSVSource("data.csv"),
    },
    before_schemas={"data": before_schema},
    transformer=MyTransformer(),
    after_schemas={
        "data": after_schema,
    },
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)
etl.execute()
Notice that we got SchemaErrors (since the validation is always performed with lazy=True): policy_start_date is not a DateTime.
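Because validation runs with lazy=True, pandera collects all failures into a single SchemaErrors exception. A sketch of inspecting it, assuming the exception propagates out of execute():
from pandera.errors import SchemaErrors

try:
    etl.execute()
except SchemaErrors as exc:
    # failure_cases is a DataFrame listing every failed check (lazy validation).
    print(exc.failure_cases)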
Let's fix it and try again:
before_schema = pa.DataFrameSchema(
    {
        "client": pa.Column(pa.String),
        "policy_start_date": pa.Column(pa.String),
    }
)
etl = ETL(
    sources={
        "data": CSVSource("data.csv"),
    },
    before_schemas={"data": before_schema},
    transformer=MyTransformer(),
    after_schemas={
        "data": after_schema,
    },
    destinations={
        "data": [
            SQLDestination(engine, "data_group", None, if_exists="replace"),
        ],
    },
)
etl.execute()
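Schemas can also carry value checks in addition to dtypes. A sketch of a tighter after_schema using pandera's built-in Check helpers (the non-negativity requirement is an assumption chosen for illustration):
# Sketch: additionally require days_since_start to be non-negative.
after_schema_strict = pa.DataFrameSchema(
    {
        "client": pa.Column(pa.String),
        "policy_start_date": pa.Column(pa.DateTime),
        "days_since_start": pa.Column(pa.Int, pa.Check.ge(0)),
    }
)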
File details
Details for the file extralo-0.9.1.tar.gz.
File metadata
- Download URL: extralo-0.9.1.tar.gz
- Upload date:
- Size: 10.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.0 CPython/3.12.4
File hashes
Algorithm | Hash digest
---|---
SHA256 | 5f51c60b4dd8a391363611248f12913a913cc9787266ffa6f5afc7a9e1f1b41b
MD5 | f808c7d86f2383d8c7d95c03612d117b
BLAKE2b-256 | 6b2acd3d2c22c25c7e429fe31f67317ca9a0d6ead6a018e4ea0260599117fdee
File details
Details for the file extralo-0.9.1-py3-none-any.whl.
File metadata
- Download URL: extralo-0.9.1-py3-none-any.whl
- Upload date:
- Size: 10.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.0 CPython/3.12.4
File hashes
Algorithm | Hash digest
---|---
SHA256 | 9566d27ec9da8b5dce8f4eeb09a0947fe311da4d90ebfa4a38cb05350c6a9257
MD5 | 49dad266cc4847a8db1c2d8e70162159
BLAKE2b-256 | baf8110402fe2052832d25b3fa6e6cc6f0a2b5fb33face66ba22595e9940edf5