Plugins for Liberty Airflow

liberty-plugins

Plugins for Liberty Framework

✅ DAGs

Daily DAGs

  • airflow-purge-daily-1: Purges old Airflow logs and metadata on a daily schedule (@daily).
  • database-backup-daily-1: Backs up databases every day at 01:00 AM (00 1 * * *).

Weekly DAGs

  • database-purge-weekly-1: Performs database cleanup and purging on a weekly schedule (@weekly).

Unscheduled DAGs

  • airflow-sync-1: Synchronizes repositories as needed (manually triggered).

✅ Airflow Purge

Overview

This module automates the purge of old Airflow DAG runs, task instances, jobs, and logs.

Purge Functions

  • dag_runs: Deletes old DAG run records from the Airflow database based on the retention period set in the airflow_retention_days variable.
  • task_instances: Removes outdated task instance records from the database, ensuring only recent executions are retained.
  • jobs: Cleans up old job entries by deleting records whose end date is older than the retention threshold.
  • logs_in_db: Deletes historical log entries stored in the Airflow database to free up space and improve query performance.
  • logs_on_disk: Scans and removes log files from the Airflow logs directory that exceed the configured retention period, keeping disk usage in check.
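
As an illustration of the logs_on_disk step, here is a minimal sketch that deletes files older than a retention period. The helper name purge_old_files, its parameters, and the use of file modification time as the age criterion are assumptions for this example, not the plugin's actual implementation.

```python
import os
import time
from pathlib import Path


def purge_old_files(log_dir, retention_days, now=None):
    """Delete files under log_dir whose modification time is past retention.

    Hypothetical helper: returns the list of deleted paths.
    """
    now = time.time() if now is None else now
    cutoff = now - retention_days * 86400  # 86400 seconds per day
    deleted = []
    # Materialize the listing first so deletions do not disturb the scan.
    for path in list(Path(log_dir).rglob("*")):
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            deleted.append(path)
    return deleted
```

Passing now explicitly keeps the function deterministic, which makes it easy to check against files with known timestamps.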

Configuration

  • The retention period is controlled by the Airflow Variable: airflow_retention_days.
  • If the variable is not set, the default retention period is 30 days.
  • Logs stored in the database and on disk are purged using the same retention period.
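
The retention rule above can be sketched as a cutoff computation. In this example a plain dictionary stands in for Airflow Variables, and the function name retention_cutoff is hypothetical; only the variable name airflow_retention_days and the 30-day default come from the notes above.

```python
from datetime import datetime, timedelta, timezone

DEFAULT_RETENTION_DAYS = 30  # default stated in the configuration notes


def retention_cutoff(variables, now=None):
    """Return the datetime before which records count as expired.

    `variables` is a plain mapping standing in for Airflow Variables.
    """
    days = int(variables.get("airflow_retention_days", DEFAULT_RETENTION_DAYS))
    now = now or datetime.now(timezone.utc)
    return now - timedelta(days=days)
```

Records with a timestamp earlier than the returned value would be candidates for deletion.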

Usage

  • These functions should be executed periodically to prevent excessive log and metadata buildup.
  • They can be added to an Airflow DAG that runs on a daily schedule (@daily).

✅ Airflow PostgreSQL Backup

Overview

This function automates the backup of PostgreSQL databases using pg_dump, managed as an Airflow task.

Function

  • pg_dump(dag, database_name, conn_id="postgres_conn"): Creates a BashOperator task to back up a PostgreSQL database.
    • dag: The DAG to which this task belongs.
    • database_name: The name of the database to be backed up.
    • conn_id: The Airflow connection ID for PostgreSQL (default: postgres_conn).
    • Returns: A BashOperator task that executes the backup.

Configuration

  • The connection details are fetched dynamically from Airflow using get_connection.
  • The backup location is inside $AIRFLOW_HOME/tmp/.
  • Environment variables are used to avoid storing credentials in plaintext.
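
To illustrate how credentials can stay out of the command line, the sketch below builds a pg_dump invocation and passes the password through the PGPASSWORD environment variable, which PostgreSQL client tools read. The function name build_pg_dump, the backup file name, and the choice of the custom archive format are assumptions; the plugin itself wraps its command in a BashOperator.

```python
import os


def build_pg_dump(host, port, user, password, database, airflow_home):
    """Return (argv, env) for a pg_dump run; nothing is executed here."""
    # File name pattern is an assumption; the source only names the directory.
    backup_file = os.path.join(airflow_home, "tmp", f"{database}.dump")
    argv = [
        "pg_dump",
        "-h", host,
        "-p", str(port),
        "-U", user,
        "-F", "c",  # custom archive format
        "-f", backup_file,
        database,
    ]
    # The password travels in the environment, never in argv.
    env = {**os.environ, "PGPASSWORD": password}
    return argv, env
```

The returned pair could be handed to subprocess.run(argv, env=env, check=True) on a machine where pg_dump is installed.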

Usage

  • This function should be used within an Airflow DAG.
  • It automates database backups and passes credentials through environment variables rather than storing them in plaintext.
  • It can be scheduled to run at any interval using DAG scheduling.

✅ Database Utils

Overview

This module provides utility functions for retrieving database connection details and column metadata for Oracle and PostgreSQL.

Functions

  • get_column_lengths(spark, table, schema, conn_source): Retrieves column lengths based on the database type.
  • get_column_types(data_df, column_lengths, conn_source): Generates a SQL column type definition string for Oracle or PostgreSQL.
  • get_connection(conn_id, schema=None): Dynamically retrieves database connection details from Airflow based on the connection type.
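
The idea behind get_column_types can be sketched for string columns only. The function below is a hypothetical stand-in: it takes a mapping of column names to lengths (the assumed shape of what get_column_lengths returns) and picks VARCHAR2 for Oracle or VARCHAR for PostgreSQL. The real function also receives a DataFrame and may handle other types.

```python
def varchar_definitions(column_lengths, db_type):
    """Build a 'name TYPE(length), ...' string for string columns.

    column_lengths: mapping of column name to maximum length (assumed shape).
    db_type: "oracle" or "postgresql" (labels chosen for this sketch).
    """
    string_type = {"oracle": "VARCHAR2", "postgresql": "VARCHAR"}[db_type]
    return ", ".join(
        f"{name} {string_type}({length})"
        for name, length in column_lengths.items()
    )
```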

Configuration

  • Supports both Oracle (oracle.jdbc.driver.OracleDriver) and PostgreSQL (org.postgresql.Driver).
  • Uses Airflow's BaseHook to dynamically retrieve connection details.
  • Calls appropriate helper functions based on the detected database type.
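
Dispatching on the database type might look like the sketch below, which returns a JDBC URL and driver class for each supported type. The driver class names come from the list above; the function name jdbc_settings, the connection-type labels, and the returned dictionary shape are assumptions for illustration.

```python
JDBC_DRIVERS = {
    "oracle": "oracle.jdbc.driver.OracleDriver",
    "postgres": "org.postgresql.Driver",
}


def jdbc_settings(conn_type, host, port, database):
    """Return the JDBC URL and driver class for a supported database type."""
    if conn_type == "postgres":
        url = f"jdbc:postgresql://{host}:{port}/{database}"
    elif conn_type == "oracle":
        # Thin-driver URL using a service name.
        url = f"jdbc:oracle:thin:@//{host}:{port}/{database}"
    else:
        raise ValueError(f"unsupported connection type: {conn_type}")
    return {"url": url, "driver": JDBC_DRIVERS[conn_type]}
```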

Usage

  • Can be integrated into Airflow DAGs for database connection handling.
  • Useful for schema extraction, table metadata analysis, and type mapping.
  • Ensures compatibility with both Oracle and PostgreSQL environments.

✅ Postgres Utils

Overview

This module provides utility functions for working with Apache Spark and PostgreSQL within an Airflow environment.

Functions

  • create_spark_session(): Initializes and returns a Spark session with predefined configurations.
  • get_all_tables(spark, conn): Retrieves all tables from the database, categorizing them based on foreign key dependencies.
  • get_foreign_key_dependencies(spark, conn): Fetches foreign key relationships between tables.
  • topological_sort(dependencies): Performs a topological sort on a given dependency graph.
  • get_primary_key_for_table(spark, table_name, conn): Retrieves the primary key columns for a specified table.
  • delete_existing_rows(target_conn, primary_keys, table, rows_to_update): Deletes from the target database the rows that are due to be updated.
  • merge_all_tables(conn_source, conn_target, source_schema, target_schema): Manages table synchronization by processing tables without foreign keys first and then handling dependent tables.
  • merge_single_table(spark, table, source_conn, target_conn): Handles the data merging process for a single table, identifying rows to insert or update in the target database.
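
The insert-or-update decision in merge_single_table can be sketched in plain Python, with lists of dictionaries standing in for Spark DataFrames. The function name split_rows and the rule that a row counts as an update when any value differs are assumptions, not the plugin's confirmed logic.

```python
def split_rows(source_rows, target_rows, primary_keys):
    """Classify source rows as new (insert) or changed (update).

    Rows are dictionaries; primary_keys lists the key column names.
    """
    def key(row):
        return tuple(row[k] for k in primary_keys)

    target_by_key = {key(row): row for row in target_rows}
    to_insert, to_update = [], []
    for row in source_rows:
        existing = target_by_key.get(key(row))
        if existing is None:
            to_insert.append(row)   # key absent from target
        elif existing != row:
            to_update.append(row)   # key present, values differ
    return to_insert, to_update
```

Rows that are identical in source and target fall into neither list and are left untouched.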

Configuration

  • Spark session is configured with JDBC drivers for PostgreSQL and Oracle.
  • PostgreSQL connections are retrieved dynamically using get_connection.
  • Foreign key dependencies are processed using topological sorting.
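
For readers unfamiliar with topological sorting, here is a minimal sketch using Kahn's algorithm. It assumes the dependency graph is a mapping from each table to the set of tables it references; that shape, the name topological_order, and the choice of Kahn's algorithm are assumptions, since the source does not show how topological_sort is implemented.

```python
from collections import deque


def topological_order(dependencies):
    """Order tables so that referenced tables come before dependents.

    dependencies: mapping of table -> set of tables it references.
    Raises ValueError if the references form a cycle.
    """
    tables = set(dependencies)
    for parents in dependencies.values():
        tables.update(parents)
    # Unresolved parents per table; self-references are ignored.
    remaining = {t: set(dependencies.get(t, ())) - {t} for t in tables}
    children = {t: set() for t in tables}
    for table, parents in remaining.items():
        for parent in parents:
            children[parent].add(table)

    ready = deque(sorted(t for t, parents in remaining.items() if not parents))
    order = []
    while ready:
        table = ready.popleft()
        order.append(table)
        for child in sorted(children[table]):
            remaining[child].discard(table)
            if not remaining[child]:
                ready.append(child)
    if len(order) != len(tables):
        raise ValueError("cycle in foreign key dependencies")
    return order
```

Loading tables in the returned order means every foreign key target exists before the rows that point to it are written.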

Usage

  • Used for efficient data synchronization and migration between databases.
  • Can be integrated into Airflow DAGs for automated execution.
  • Supports large datasets by leveraging Spark’s distributed processing capabilities.

✅ Data Transfer Utils

Overview

This module provides utility functions for reading, processing, and writing data between databases using Apache Spark.

Functions

  • read_data_from_db(spark, table, source_conn, source_schema): Reads data from a source database using JDBC.
  • lowercase_columns(df): Converts all column names in a DataFrame to lowercase.
  • write_data_to_db(spark, data_df, table, source_conn, source_schema, target_conn, target_schema): Writes data to a target database, ensuring proper column types and formatting.
  • create_spark_session(): Initializes a Spark session with necessary configurations.
  • copy_table(conn_source, conn_target, table_name, source_schema, target_schema): Copies a table from a source schema to a target schema, handling data transformation and loading.

Configuration

  • Uses JDBC for database interactions.
  • Handles column data type conversion to ensure compatibility.
  • Utilizes get_column_lengths, get_column_types, and get_connection for metadata extraction and connection handling.
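
The read, lowercase, and write flow can be sketched without a Spark cluster. In the example below the standard library's sqlite3 module stands in for Spark and JDBC, so it only illustrates the shape of the pipeline; the function bodies and the simplified signatures are assumptions and do not reflect the plugin's Spark code or its type handling.

```python
import sqlite3


def lowercase_columns(columns):
    """Return the column names converted to lowercase."""
    return [name.lower() for name in columns]


def copy_table(source, target, table):
    """Copy all rows of `table` from source to target with lowercase columns.

    source and target are sqlite3 connections standing in for JDBC endpoints.
    Returns the number of rows copied.
    """
    cursor = source.execute(f'SELECT * FROM "{table}"')
    columns = lowercase_columns([d[0] for d in cursor.description])
    rows = cursor.fetchall()

    column_list = ", ".join(f'"{c}"' for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    target.execute(f'CREATE TABLE "{table}" ({column_list})')
    target.executemany(
        f'INSERT INTO "{table}" ({column_list}) VALUES ({placeholders})', rows
    )
    target.commit()
    return len(rows)
```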

Usage

  • Facilitates ETL operations between databases.
  • Ensures clean and structured data processing.
  • Can be integrated into Airflow DAGs for automated data migration workflows.

✅ Git Backup Utils

Overview

This module provides utility functions for managing backups in Git, including pulling, pushing, and purging old backups.

Functions

  • pull_repository(local_path, repo_name, conn_id="git_conn"): Pulls the latest changes from a Git repository.
  • push_backup(local_path, repo_name, databases, conn_id="git_conn"): Pushes database backups to a Git repository.
  • purge_old_backups(local_path, repo_name, conn_id="git_conn"): Deletes backups older than the configured retention period and commits the changes.

Configuration

  • Uses Airflow’s BaseHook to retrieve Git connection details dynamically.
  • Retrieves the backup retention period from the backup_retention_days Airflow variable (default: 30 days).
  • Automatically configures Git user details for committing changes.
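
As a rough picture of the purge-and-commit step, the sketch below assembles the Git commands that would configure the committer, remove expired backup files, commit, and push. The function name purge_commit_commands, its parameters, and the commit message are hypothetical; the commands are only built, not run.

```python
def purge_commit_commands(repo_path, expired_files, user_name, user_email):
    """Return the git command lines for purging expired backups.

    Each command is a list of arguments suitable for subprocess.run.
    """
    git = ["git", "-C", repo_path]
    commands = [
        git + ["config", "user.name", user_name],
        git + ["config", "user.email", user_email],
    ]
    if expired_files:
        # "--" separates options from file paths.
        commands.append(git + ["rm", "--quiet", "--"] + list(expired_files))
        message = f"Purge {len(expired_files)} expired backup(s)"
        commands.append(git + ["commit", "-m", message])
        commands.append(git + ["push"])
    return commands
```

When no files have expired, only the two configuration commands are returned, so no empty commit is attempted.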

Usage

  • Automates backup management by storing database dumps in Git.
  • Ensures that outdated backups are removed efficiently.
  • Can be scheduled within an Airflow DAG for periodic execution.
