
Dataflows (ETL) backend for DPE Energy Performance Analysis.

Project description

DPE-Energy-Performance-Analysis-ETL

ETL module repository for the DPE-Energy-Performance-Analysis-ETL package.


📃 Description

This ETL module is responsible for extracting, transforming, and loading energy performance data for the DPE-Energy-Performance-Analysis project. It supports both local and containerized (Docker) execution modes, enabling flexible data processing workflows. The ETL pipeline handles data ingestion from CSV files or API sources, applies schema-based validation and transformation, and loads the processed data into a PostgreSQL database or remote storage (S3). As input, you may provide a CSV file (which must conform to the input schema; see the Schemas section) or nothing at all. If no file is provided, the ETL fetches data directly from the Enedis API (see API sources). Providing a CSV input file is recommended for batch extraction, for example when targeting specific locations. Logging and profiling are integrated for monitoring and debugging purposes.

  • The flow consists of three steps: Extraction, Transformation and Loading (an end-to-end sketch follows this list).

  • Depending on whether or not you have an input file (a CSV file specifying the locations on which to focus your extraction), there are two ways to use the extract module.

  • The transformation step requires you to define dataframe schemas. Some transformation functions are provided in this project (based on our red-line project), but you can also go through the code and implement your own.

  • The loading step sends the output golden data to the RDBMS (a Postgres connection is implemented, but once again you are free to customize the code to fit your needs).
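
Putting the three steps together, a minimal end-to-end sketch is shown below. It assumes the environment variables described later are already set and simply chains the example calls from the sections that follow; using the writer credentials for the database connection is an assumption, not a requirement of the package.

import os
from sqlalchemy import create_engine
from dpe_enedis_ademe_etl_engine.pipelines import (
  DataEnedisAdemeExtractor,
  DataEnedisAdemeTransformer,
  DataEnedisAdemeLoader,
)

# 1. Extraction: pull raw data from the Enedis API into the bronze zone
DataEnedisAdemeExtractor().extract(from_input=False, code_departement=95, annee=2023, rows=3)

# 2. Transformation: validate against the declared schemas and write to the silver zone
DataEnedisAdemeTransformer().transform(
  input_path="etl/data/1_bronze/conso_enedis.csv",
  output_path="etl/data/2_silver/transformed_data.csv",
  input_schema_path=os.environ["SCHEMA_ETL_INPUT_FILEPATH"],
  output_schema_path=os.environ["SCHEMA_ETL_OUTPUT_FILEPATH"],
)

# 3. Loading: persist the gold-zone data into PostgreSQL
# (assumption: the writer role is the one intended for loading)
engine = create_engine(
  f"postgresql://{os.environ['POSTGRES_WRITER_USERNAME']}:{os.environ['POSTGRES_WRITER_PASSWORD']}"
  f"@{os.environ['POSTGRES_HOST']}:{os.environ['POSTGRES_PORT']}/{os.environ['POSTGRES_DB_NAME']}"
)
DataEnedisAdemeLoader(engine).run()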

⚙️ How to set up

Set up for - local running

To run the ETL pipeline locally, follow these steps:

  1. Get the codebase on your local machine, either by installing the package from the PyPI index or by cloning this repository. In the latter case, you can pip install -e . or use the code as is.
pip install dpe_enedis_ademe_etl_engine
  2. Set up environment variables as described in the Environment variables section. Among the environment variables, you will have to define the paths.
  3. Install dependencies: if you chose to clone the repository, do not forget to install the dependencies; if not, just skip this step.
pip install -r requirements.txt

Set up for - remote running

Remote running refers to running the pipeline in a non-local environment (not on your computer). In that case, the ETL still requires a filesystem to save files, which is why an S3-like storage is needed. The switch is transparent to the final users: they just have to specify the paths in the bucket storage and the bucket name (see Environment variables). In addition, you may need to load results somewhere for persistence (an RDBMS). Postgres is used here, but you are free to implement any other solution. A configuration sketch follows the steps below.

  1. Start by installing the package on your remote machine:
pip install dpe_enedis_ademe_etl_engine
  2. Define the environment variables.
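
As a minimal sketch of what the remote switch amounts to: the ETL reads the same variables, but with ENV set to NOLOCAL the file paths are interpreted as locations inside the S3 bucket. The snippet below sets the variables programmatically for illustration only (in practice you would export them in your deployment environment); the values are placeholders.

import os
from dpe_enedis_ademe_etl_engine.pipelines import DataEnedisAdemeExtractor

# Switch to remote mode: files go to the S3-like storage instead of the local filesystem
os.environ["ENV"] = "NOLOCAL"
os.environ["S3_ACCESS_KEY"] = "<YOUR_S3_ACCESS_KEY>"   # placeholder
os.environ["S3_SECRET_KEY"] = "<YOUR_S3_SECRET_KEY>"   # placeholder
os.environ["S3_BUCKET_NAME"] = "dpe-storage-v1"
os.environ["S3_REGION"] = "eu-west"
os.environ["S3_ENDPOINT_URL"] = "<HOST>:<PORT>"        # placeholder

# Paths now refer to locations inside the bucket
os.environ["PATH_DATA_BRONZE"] = "etl/data/1_bronze/"

DataEnedisAdemeExtractor().extract(from_input=False, code_departement=95, annee=2023, rows=3)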

✅ How to use

➡️ Extraction

  1. If you are not extracting in batch from an input file, skip this step; otherwise, follow the steps below to prepare your input data. The input data file must be a CSV file (see the Schemas section). Put it at the location specified by the PATH_FILE_INPUT_ENEDIS_CSV environment variable; otherwise the extraction will raise exceptions.
  2. Then import the extraction module. The following code snippet is an example; some parameters are not shown here, such as input_csv_path, save_schema and n_threads_for_querying, which are described in the full documentation of the DataEnedisAdemeExtractor.extract() method (a batch-extraction sketch using them follows at the end of this list).
from dpe_enedis_ademe_etl_engine.pipelines import DataEnedisAdemeExtractor

pipeline = DataEnedisAdemeExtractor()
pipeline.extract(
  from_input=False, # set as True if input file is provided
  code_departement=95, 
  annee=2023, 
  rows=3
)
  3. (Not implemented yet) Running the flow from the CLI:
python src/pipelines/etl_app.py  < -year=2022 > < -nrows=100 >
  4. Output files will be saved in the directories defined by the environment variables (bronze, silver and gold zones). Logs and profiling stats will be written to the given locations as well. This is why these environment variables are compulsory 🧏.
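
For batch extraction from an input file, a sketch using the parameters mentioned in step 2 could look like the following. input_csv_path, save_schema and n_threads_for_querying are parameters of DataEnedisAdemeExtractor.extract(), but the values below are illustrative; check the method's documentation for their exact semantics and defaults.

import os
from dpe_enedis_ademe_etl_engine.pipelines import DataEnedisAdemeExtractor

pipeline = DataEnedisAdemeExtractor()
pipeline.extract(
  from_input=True,                                          # read the target locations from the CSV
  input_csv_path=os.environ["PATH_FILE_INPUT_ENEDIS_CSV"],  # must conform to the input schema
  annee=2023,
  save_schema=True,             # illustrative value
  n_threads_for_querying=4,     # illustrative value
)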

➡️ Transformation

The transformation step is responsible for cleaning, validating, and enriching the extracted data. You must define the input and output schemas (see the SCHEMA_ETL_INPUT_FILEPATH and SCHEMA_ETL_OUTPUT_FILEPATH environment variables). The transformation logic can be customized by editing or extending the transformation functions in the codebase.

Example usage:

from dpe_enedis_ademe_etl_engine.pipelines import DataEnedisAdemeTransformer

transformer = DataEnedisAdemeTransformer()
transformer.transform(
  input_path="etl/data/1_bronze/conso_enedis.csv",
  output_path="etl/data/2_silver/transformed_data.csv",
  input_schema_path="etl_engine/ressources/schemas/schema_input.json",
  output_schema_path="etl_engine/ressources/schemas/schema_output.json"
)

The transformed data will be saved to the path specified in your environment variables. You can implement your own transformation logic by modifying the transformation functions in the pipeline.
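
A custom transformation is typically just a pandas function applied between zones. The helper below is purely illustrative: the column names, rules and file names are hypothetical and not part of the package; it only shows the shape such a function might take.

import pandas as pd

def clean_consumption(df: pd.DataFrame) -> pd.DataFrame:
  # Hypothetical cleaning rules; column names are illustrative only
  out = df.copy()
  out = out.dropna(subset=["conso_totale_mwh"])                      # drop rows without a consumption value
  out["code_postal"] = out["code_postal"].astype(str).str.zfill(5)   # normalise postal codes to 5 characters
  return out

# Example: apply it to a bronze-zone extract before the schema-based transform
df = pd.read_csv("etl/data/1_bronze/conso_enedis.csv")
df = clean_consumption(df)
df.to_csv("etl/data/2_silver/cleaned_conso_enedis.csv", index=False)  # hypothetical intermediate file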

➡️ Loading

The loading step is responsible for persisting the transformed data into the target storage, typically a PostgreSQL database or an S3 bucket. You can use the provided loader class to handle this process.

Example usage:

import os
from dpe_enedis_ademe_etl_engine.pipelines import DataEnedisAdemeLoader
from sqlalchemy import create_engine

# Connection settings come from the environment variables described below;
# using the writer role here is an assumption, not a package requirement
USERNAME = os.environ["POSTGRES_WRITER_USERNAME"]
PASSWORD = os.environ["POSTGRES_WRITER_PASSWORD"]
HOST = os.environ["POSTGRES_HOST"]
PORT = os.environ["POSTGRES_PORT"]
DATABASE = os.environ["POSTGRES_DB_NAME"]
engine = create_engine(f"postgresql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}")
loader = DataEnedisAdemeLoader(engine)
loader.run()

This will load the data files from the gold data zone into the configured PostgreSQL table. Make sure your environment variables for the database connection are set correctly. You can customize the loading logic or implement additional loaders for other storage backends as needed.
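
As an example of an additional loader, the sketch below pushes a gold-zone CSV into a database table with pandas, reusing the SQLAlchemy engine built in the example above. It is a hypothetical alternative, not one of the package's own loaders, and the file and table names are placeholders.

import os
import pandas as pd

# Hypothetical additional loader: append a gold-zone CSV to a table
def load_gold_csv(engine, csv_path: str, table_name: str) -> None:
  pd.read_csv(csv_path).to_sql(table_name, engine, if_exists="append", index=False)

# File and table names are placeholders
load_gold_csv(engine, os.environ["PATH_DATA_GOLD"] + "golden_data.csv", "dpe_golden_data")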

Environment variables

This package requires some environment variables to be set. They are listed below (a sketch for loading them locally follows the listing):

config = {
  "ENV": "LOCAL", # or NOLOCAL
  "APP_NAME": "DPE-API",
  "APP_DESCRIPTION": "ETL for DPE Energy Performance Analysis",
  "LOGGER_APP_NAME": "ETL-Logger",
  # compulsory if you aim to run the ETL flow until the load step
  "POSTGRES_HOST": "******",
  "POSTGRES_PORT": "5432",
  "POSTGRES_DB_NAME": "******",
  "POSTGRES_ADMIN_USERNAME": "postgres",
  "POSTGRES_ADMIN_PASSWORD": "******",
  "POSTGRES_READER_USERNAME": "reader",
  "POSTGRES_READER_PASSWORD": "******",
  "POSTGRES_WRITER_USERNAME": "writer",
  "POSTGRES_WRITER_PASSWORD": "******",
  # compulsory if you aim to run the pipeline with remote storage for files
  "S3_ACCESS_KEY": "<YOUR_S3_ACCESS_KEY>",
  "S3_SECRET_KEY": "<YOUR_S3_SECRET_KEY>",
  "S3_BUCKET_NAME": "dpe-storage-v1",
  "S3_REGION": "eu-west",
  "S3_ENDPOINT_URL": "<HOST>:<PORT>",
  # compulsory
  "PATH_LOG_DIR" : "etl/logs/",
  "PATH_ARCHIVE_DIR" : "etl/data/archive/",
  "PATH_DATA_BRONZE" : "etl/data/1_bronze/",
  "PATH_DATA_SILVER" : "etl/data/2_silver/",
  "PATH_DATA_GOLD" : "etl/data/3_gold/",
  "PATH_FILE_INPUT_ENEDIS_CSV": "etl/data/1_bronze/conso_enedis.csv",
  # compulsory if you aim to run the transformation pipeline
  "SCHEMA_ETL_INPUT_FILEPATH": "etl/ressources/schemas/schema_input.json",
  "SCHEMA_ETL_OUTPUT_FILEPATH": "etl/ressources/schemas/schema_output.json",
  "SCHEMA_GOLDEN_DATA_FILEPATH": "etl/ressources/schemas/schema_golden_data.json",
  # orchestration tool, compulsory
  "PREFECT_API_URL": "http://host:port/api",
}
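
These variables are read from the process environment at runtime. One common way to provide them during local development, shown here only as a sketch, is a .env file loaded with python-dotenv (which is not a declared dependency of this package):

import os
from dotenv import load_dotenv  # pip install python-dotenv

# Read KEY=value pairs from a local .env file into the process environment
load_dotenv(".env")

# The pipeline then picks them up from os.environ, e.g.:
print(os.environ["ENV"], os.environ["PATH_DATA_BRONZE"])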

API-sources

  1. Enedis
  2. BAN
  3. ADEME

Schemas

Authors

  • Fereol Gbenou - feel free to reach me on LinkedIn for any contribution



Download files

Download the file for your platform.

Source Distribution

dpe_enedis_ademe_etl_engine-2.3.1.tar.gz (37.8 kB)


Built Distribution


dpe_enedis_ademe_etl_engine-2.3.1-py3-none-any.whl (37.6 kB)


File details

Details for the file dpe_enedis_ademe_etl_engine-2.3.1.tar.gz.

File metadata

File hashes

Hashes for dpe_enedis_ademe_etl_engine-2.3.1.tar.gz:
SHA256: 1c53010cafffc272fcf23ed66acd8d01c6c6e8b6602ec6aa7b39f0f3371b4091
MD5: 1e33d17707bf93712ed6e2d0878041f2
BLAKE2b-256: 499f287fe688d6d7d4d75988e3266b5016abbd2edbb32dabb5f965a1c745bd34


File details

Details for the file dpe_enedis_ademe_etl_engine-2.3.1-py3-none-any.whl.

File metadata

File hashes

Hashes for dpe_enedis_ademe_etl_engine-2.3.1-py3-none-any.whl:
SHA256: 10e94b093963fdb662e89deb4ecec21e282363f54ce95d0e28c0347d1f9378db
MD5: 7ca4bd599ef69f91126e48bd7c05e685
BLAKE2b-256: dbf74ce9a644184ffc6645dbd38f228da67650562dfafdb4a07239b1b3176c12

