
liberty-plugins

Plugins for Liberty Framework

✅ DAGs

Daily DAGs

  • airflow-purge-daily-1: Purges old Airflow logs and metadata on a daily schedule (@daily).
  • database-backup-daily-1: Backs up databases daily at 01:00 (00 1 * * *).

Weekly DAGs

  • database-purge-weekly-1: Performs database cleanup and purging on a weekly schedule (@weekly).

Unscheduled DAGs

  • airflow-sync-1: Synchronizes repositories as needed (manually triggered).

✅ Airflow Purge

Overview

This module automates the purging of old Airflow DAG runs, task instances, jobs, and logs.

Purge Functions

  • dag_runs: Deletes old DAG run records from the Airflow database based on the retention period set in the airflow_retention_days variable.
  • task_instances: Removes outdated task instance records from the database, ensuring only recent executions are retained.
  • jobs: Cleans up old job entries by deleting records where the job's end date is older than the retention threshold.
  • logs_in_db: Deletes historical log entries stored in the Airflow database to free up space and improve query performance.
  • logs_on_disk: Scans and removes log files from the Airflow logs directory that exceed the configured retention period, keeping disk usage in check.

Configuration

  • The retention period is controlled by the Airflow Variable: airflow_retention_days.
  • If the variable is not set, the default retention period is 30 days.
  • Logs stored in the database and on disk are purged accordingly.
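
For reference, the variable can be read with Airflow's standard Variable API; a minimal sketch, mirroring the 30-day fallback above:

    from airflow.models import Variable

    # Falls back to 30 days when airflow_retention_days is not set.
    retention_days = int(Variable.get("airflow_retention_days", default_var=30))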

Usage

  • These functions should be executed periodically to prevent excessive log and metadata buildup.
  • They can be added to an Airflow DAG that runs on a daily schedule (@daily).
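
As an illustration, a daily DAG could wire the purge functions up as sketched below, assuming the purge helpers are no-argument callables; the import path is a placeholder, since the package's actual module layout is not documented here:

    import pendulum
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # Hypothetical import path; adjust to the module that exposes the helpers.
    from liberty_plugins.airflow_purge import (
        dag_runs, task_instances, jobs, logs_in_db, logs_on_disk,
    )

    with DAG(
        dag_id="airflow-purge-daily-1",
        schedule="@daily",
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        catchup=False,
    ):
        # One task per purge function; they are independent of each other.
        for purge_func in (dag_runs, task_instances, jobs, logs_in_db, logs_on_disk):
            PythonOperator(
                task_id=f"purge_{purge_func.__name__}",
                python_callable=purge_func,
            )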

✅ Airflow PostgreSQL Backup

Overview

This function automates the backup of PostgreSQL databases using pg_dump, managed as an Airflow task.

Function

  • pg_dump(dag, database_name, conn_id="postgres_conn"): Creates a BashOperator task to back up a PostgreSQL database.
    • dag: The DAG to which this task belongs.
    • database_name: The name of the database to be backed up.
    • conn_id: The Airflow connection ID for PostgreSQL (default: postgres_conn).
    • Returns: A BashOperator task that executes the backup.
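
Assuming the function is importable from the package (the module path below is hypothetical), usage inside a DAG looks roughly like:

    import pendulum
    from airflow import DAG

    # Hypothetical import path, for illustration only.
    from liberty_plugins.backup import pg_dump

    with DAG(
        dag_id="database-backup-daily-1",
        schedule="00 1 * * *",  # 01:00 every day, matching the schedule above
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        catchup=False,
    ) as dag:
        backup_mydb = pg_dump(dag, database_name="mydb", conn_id="postgres_conn")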

Configuration

  • The connection details are fetched dynamically from Airflow using get_connection.
  • The backup location is inside $AIRFLOW_HOME/tmp/.
  • Environment variables are used to avoid storing credentials in plaintext.
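
The credential handling typically follows the pattern sketched below, where the password reaches pg_dump only through PGPASSWORD in the task environment. This is an illustration of the technique (the helper name is hypothetical), not the plugin's actual code:

    import os

    from airflow.hooks.base import BaseHook
    from airflow.operators.bash import BashOperator

    def make_pg_dump_task(dag, database_name, conn_id="postgres_conn"):
        # Hypothetical helper: connection details come from Airflow, and
        # credentials are injected via environment variables so the rendered
        # bash command never contains the password.
        conn = BaseHook.get_connection(conn_id)
        return BashOperator(
            task_id=f"pg_dump_{database_name}",
            bash_command=(
                'pg_dump -h "$PGHOST" -p "$PGPORT" -U "$PGUSER" -F c '
                f'-f "$AIRFLOW_HOME/tmp/{database_name}.dump" {database_name}'
            ),
            env={
                "PGHOST": conn.host or "localhost",
                "PGPORT": str(conn.port or 5432),
                "PGUSER": conn.login or "postgres",
                "PGPASSWORD": conn.password or "",
                # BashOperator's env replaces the inherited environment, so
                # AIRFLOW_HOME must be passed through explicitly.
                "AIRFLOW_HOME": os.environ.get("AIRFLOW_HOME", "/opt/airflow"),
            },
            dag=dag,
        )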

Usage

  • This function should be used within an Airflow DAG.
  • It ensures database backups are automated and stored securely.
  • It can be scheduled to run at any desired interval via standard DAG scheduling.

✅ Database Utils

Overview

This module provides utility functions for retrieving database connection details and column metadata for Oracle and PostgreSQL.

Functions

  • get_column_lengths(spark, table, schema, conn_source): Retrieves column lengths based on the database type.
  • get_column_types(data_df, column_lengths, conn_source): Generates a SQL column type definition string for Oracle or PostgreSQL.
  • get_connection(conn_id, schema=None): Dynamically retrieves database connection details from Airflow based on the connection type.

Configuration

  • Supports both Oracle (oracle.jdbc.driver.OracleDriver) and PostgreSQL (org.postgresql.Driver).
  • Uses Airflow's BaseHook to dynamically retrieve connection details.
  • Calls appropriate helper functions based on the detected database type.
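
As an illustration of that dispatch, here is a sketch of get_connection under the assumption that it returns a dict of JDBC details (the real return shape may differ):

    from airflow.hooks.base import BaseHook

    def get_connection(conn_id, schema=None):
        # Sketch: BaseHook supplies host/port/credentials, and the connection
        # type selects the JDBC URL format and driver class.
        conn = BaseHook.get_connection(conn_id)
        db = schema or conn.schema
        if conn.conn_type == "oracle":
            url = f"jdbc:oracle:thin:@{conn.host}:{conn.port}/{db}"
            driver = "oracle.jdbc.driver.OracleDriver"
        else:  # treat everything else as PostgreSQL
            url = f"jdbc:postgresql://{conn.host}:{conn.port}/{db}"
            driver = "org.postgresql.Driver"
        return {
            "url": url,
            "driver": driver,
            "user": conn.login,
            "password": conn.password,
            "type": conn.conn_type,
        }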

Usage

  • Can be integrated into Airflow DAGs for database connection handling.
  • Useful for schema extraction, table metadata analysis, and type mapping.
  • Ensures compatibility with both Oracle and PostgreSQL environments.

✅ Postgres Utils

Overview

This module provides utility functions for working with Apache Spark and PostgreSQL within an Airflow environment.

Functions

  • create_spark_session(): Initializes and returns a Spark session with predefined configurations.
  • get_all_tables(spark, conn): Retrieves all tables from the database, categorizing them based on foreign key dependencies.
  • get_foreign_key_dependencies(spark, conn): Fetches foreign key relationships between tables.
  • topological_sort(dependencies): Performs a topological sort on a given dependency graph (see the sketch after this list).
  • get_primary_key_for_table(spark, table_name, conn): Retrieves the primary key columns for a specified table.
  • delete_existing_rows(target_conn, primary_keys, table, rows_to_update): Deletes rows from the target database that need to be updated.
  • merge_all_tables(conn_source, conn_target, source_schema, target_schema): Manages table synchronization by processing tables without foreign keys first and then handling dependent tables.
  • merge_single_table(spark, table, source_conn, target_conn): Handles the data merging process for a single table, identifying rows to insert or update in the target database.
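
Because load order matters when foreign keys are present, the topological sort is the heart of the merge flow. A self-contained sketch using Kahn's algorithm (the module's actual implementation may differ):

    from collections import defaultdict, deque

    def topological_sort(dependencies):
        # dependencies maps each table to the set of tables it references
        # through foreign keys, e.g.
        #   topological_sort({"orders": {"customers"}, "customers": set()})
        #   -> ["customers", "orders"]
        indegree = defaultdict(int)
        dependents = defaultdict(list)
        for table, parents in dependencies.items():
            indegree[table] += 0  # ensure every table appears
            for parent in parents:
                indegree[table] += 1
                dependents[parent].append(table)
                indegree[parent] += 0
        ready = deque(t for t, d in indegree.items() if d == 0)
        ordered = []
        while ready:
            table = ready.popleft()
            ordered.append(table)
            for child in dependents[table]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    ready.append(child)
        if len(ordered) != len(indegree):
            raise ValueError("circular foreign-key dependency detected")
        return ordered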

Configuration

  • Spark session is configured with JDBC drivers for PostgreSQL and Oracle.
  • PostgreSQL connections are retrieved dynamically using get_connection.
  • Foreign key dependencies are processed using topological sorting.
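
A Spark session with both JDBC drivers might be created roughly as below; the Maven coordinates are plausible examples, not the versions the plugin actually pins:

    from pyspark.sql import SparkSession

    def create_spark_session():
        # Sketch: fetch the PostgreSQL and Oracle JDBC drivers via
        # spark.jars.packages; the plugin may instead ship the jars directly.
        return (
            SparkSession.builder
            .appName("liberty-postgres-utils")
            .config(
                "spark.jars.packages",
                "org.postgresql:postgresql:42.7.3,"
                "com.oracle.database.jdbc:ojdbc8:23.3.0.23.09",
            )
            .getOrCreate()
        )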

Usage

  • Used for efficient data synchronization and migration between databases.
  • Can be integrated into Airflow DAGs for automated execution.
  • Supports large datasets by leveraging Spark’s distributed processing capabilities.

✅ Data Transfer Utils

Overview

This module provides utility functions for reading, processing, and writing data between databases using Apache Spark.

Functions

  • read_data_from_db(spark, table, source_conn, source_schema): Reads data from a source database using JDBC (sketched after this list).
  • lowercase_columns(df): Converts all column names in a DataFrame to lowercase.
  • write_data_to_db(spark, data_df, table, source_conn, source_schema, target_conn, target_schema): Writes data to a target database, ensuring proper column types and formatting.
  • create_spark_session(): Initializes a Spark session with necessary configurations.
  • copy_table(conn_source, conn_target, table_name, source_schema, target_schema): Copies a table from a source schema to a target schema, handling data transformation and loading.
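
To make the JDBC flow concrete, here is a sketch of the first two functions, assuming source_conn is the dict of JDBC details shown under Database Utils:

    def read_data_from_db(spark, table, source_conn, source_schema):
        # Reads the whole table over JDBC into a Spark DataFrame.
        return (
            spark.read.format("jdbc")
            .option("url", source_conn["url"])
            .option("driver", source_conn["driver"])
            .option("dbtable", f"{source_schema}.{table}")
            .option("user", source_conn["user"])
            .option("password", source_conn["password"])
            .load()
        )

    def lowercase_columns(df):
        # Normalizes column names: Oracle returns upper-case identifiers,
        # while PostgreSQL conventionally uses lower-case.
        return df.toDF(*[c.lower() for c in df.columns])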

Configuration

  • Uses JDBC for database interactions.
  • Handles column data type conversion to ensure compatibility.
  • Utilizes get_column_lengths, get_column_types, and get_connection for metadata extraction and connection handling.

Usage

  • Facilitates ETL operations between databases.
  • Ensures clean and structured data processing.
  • Can be integrated into Airflow DAGs for automated data migration workflows.

✅ Git Backup Utils

Overview

This module provides utility functions for managing backups in Git, including pulling, pushing, and purging old backups.

Functions

  • pull_repository(local_path, repo_name, conn_id="git_conn"): Pulls the latest changes from a Git repository.
  • push_backup(local_path, repo_name, databases, conn_id="git_conn"): Pushes database backups to a Git repository.
  • purge_old_backups(local_path, repo_name, conn_id="git_conn"): Deletes backups older than the configured retention period and commits the changes.
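
As a rough sketch of the purge step (illustrative only; the real implementation may use a Git library and the conn_id for authentication, both of which are glossed over here):

    import os
    import subprocess
    import time

    from airflow.models import Variable

    def purge_old_backups(local_path, repo_name, conn_id="git_conn"):
        # Delete backup files older than the retention period, then commit
        # and push the removal. Assumes `git` is already configured.
        retention_days = int(Variable.get("backup_retention_days", default_var=30))
        cutoff = time.time() - retention_days * 86400
        repo_dir = os.path.join(local_path, repo_name)
        removed = 0
        for root, dirs, files in os.walk(repo_dir):
            dirs[:] = [d for d in dirs if d != ".git"]  # never touch Git metadata
            for name in files:
                path = os.path.join(root, name)
                if os.path.getmtime(path) < cutoff:
                    os.remove(path)
                    removed += 1
        if removed:
            subprocess.run(["git", "-C", repo_dir, "add", "-A"], check=True)
            subprocess.run(
                ["git", "-C", repo_dir, "commit",
                 "-m", f"Purge {removed} backup(s) older than {retention_days} days"],
                check=True,
            )
            subprocess.run(["git", "-C", repo_dir, "push"], check=True)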

Configuration

  • Uses Airflow’s BaseHook to retrieve Git connection details dynamically.
  • Retrieves the backup retention period from the backup_retention_days Airflow variable (default: 30 days).
  • Automatically configures Git user details for committing changes.

Usage

  • Automates backup management by storing database dumps in Git.
  • Ensures that outdated backups are removed efficiently.
  • Can be scheduled within an Airflow DAG for periodic execution.
