Skip to main content

airflow class for exchanging data between DBMSs in native binary formats.

Project description

DBHose для Apache Airflow

                                                                 (                )
 (  (                                                 )          )\ )     (    ( /(
 )\))(   '   (    (                   )       (     ( /(        (()/(   ( )\   )\())               (
((_)()\ )   ))\   )\    (     (      (       ))\    )\())   (    /(_))  )((_) ((_)\    (    (     ))\
_(())\_)() /((_) (( )   )\    )\     )\  '  /((_)  (_))/    )\   (_))_  ((_)_   _((_)   )\   )\   /((_)
\ \((_)/ /(_))   | |   ((_)  ((_)  _((_))  (_))    | |_    ((_)   |   \  | _ ) | || |  ((_) ((_) (_))
 \ \/\/ / / -_)  | |  / _|  / _ \ | ' ' |  / -_)   |  _|  / _ \   | |) | | _ \ | __ | / _ \ (_-< / -_)
  \_/\_/  \___|  |_|  \__|  \___/ |_|_|_|  \___|    \__|  \___/   |___/  |___/ |_||_| \___/ /__/ \___|

Класс DBHose предоставляет универсальный интерфейс для переноса данных между различными источниками в Apache Airflow DAGs.

Официальная документация

⚠️ Статус проекта

Проект находится в стадии бета-тестирования и может содержать ошибки. Используйте с осторожностью в production-средах.

Поддерживаемые СУБД

На данный момент работа с СУБД поддерживается только между следующими базами данных:

  • ClickHouse
  • Greenplum
  • PostgreSQL

Описание

DBHose - это инструмент для безопасного и эффективного перемещения данных между:

  • Файлами дампов
  • Python итераторами
  • DataFrame (Pandas/Polars)
  • Поддерживаемыми СУБД (ClickHouse, Greenplum, PostgreSQL)

Класс включает встроенные проверки качества данных (Data Quality) и поддерживает различные методы перемещения данных.

Инициализация

DBHose(
    table_dest: str,
    connection_dest: str,
    connection_src: str | None = None,
    dq_skip_check: list[str] = [],
    filter_by: list[str] = [],
    drop_temp_table: bool = True,
    move_method: MoveMethod = MoveMethod.replace,
    custom_move: str | None = None,
    compress_method: CompressionMethod = CompressionMethod.ZSTD,
    timeout: int = DBMS_DEFAULT_TIMEOUT_SEC,
)

Параметры

  • table_dest (str) - целевая таблица для загрузки данных
  • connection_dest (str) - подключение к целевой БД (должна быть одной из поддерживаемых: ClickHouse, Greenplum, PostgreSQL)
  • connection_src (str, optional) - подключение к исходной БД (если отличается от целевой)
  • dq_skip_check (list[str]) - список проверок качества данных для пропуска
  • filter_by (list[str]) - список колонок для фильтрации при перемещении данных
  • drop_temp_table (bool) - удалять ли временную таблицу после операции (по умолчанию True)
  • move_method (MoveMethod) - метод перемещения данных (по умолчанию MoveMethod.replace)
  • custom_move (str, optional) - пользовательский SQL запрос для перемещения данных
  • compress_method (CompressionMethod) - метод сжатия для дампов (по умолчанию CompressionMethod.ZSTD)
  • timeout (int) - таймаут операций с БД в секундах (по умолчанию DBMS_DEFAULT_TIMEOUT_SEC = 300)

Методы

Основные методы загрузки данных

from_file(fileobj: BufferedReader)

Загрузка данных из файла дампа.

Параметры:

  • fileobj (BufferedReader) - файловый объект для чтения дампа

from_iterable(dtype_data: Iterable[Any])

Загрузка данных из Python итератора.

Параметры:

  • dtype_data (Iterable[Any]) - итерируемый объект с данными

from_frame(data_frame: PDFrame | PLFrame)

Загрузка данных из DataFrame (Pandas или Polars).

Параметры:

  • data_frame (PDFrame | PLFrame) - DataFrame в формате Pandas или Polars

from_dmbs(query: str | None = None, table: str | None = None)

Загрузка данных из СУБД с использованием SQL запроса или прямой выгрузки из таблицы.

Параметры:

  • query (str, optional) - SQL запрос для выборки данных
  • table (str, optional) - имя таблицы для прямой выгрузки

Вспомогательные методы

create_temp()

Создание временной таблицы для промежуточного хранения данных.

drop_temp()

Удаление временной таблицы.

dq_check(table: str | None = None)

Запуск проверок качества данных.

Параметры:

  • table (str, optional) - имя исходной таблицы для сравнения данных

to_table()

Перемещение данных из временной таблицы в целевую.

Пример использования в DAG

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from dbhose_airflow import (
    DBHose,
    MoveMethod,
)

def transfer_data():
    # Перенос данных из PostgreSQL в ClickHouse
    dbhose = DBHose(
        table_dest="target_table",
        connection_dest="clickhouse_conn",
        connection_src="postgres_conn",
        move_method=MoveMethod.replace
    )
    
    dbhose.from_dmbs(table="source_table")

with DAG('data_transfer_dag', start_date=datetime(2025, 10, 27)) as dag:
    transfer_task = PythonOperator(
        task_id='transfer_data',
        python_callable=transfer_data
    )

Ограничения бета-версии

  • Поддерживаются только ClickHouse, Greenplum и PostgreSQL
  • Возможны ошибки при работе с большими объемами данных
  • API может изменяться в будущих версиях
  • Не все функции могут быть полностью протестированы
  • Документация может быть неполной

Особенности

  • Автоматическое создание временных таблиц - данные сначала загружаются во временную таблицу
  • Проверки качества данных - встроенные проверки перед финальным перемещением
  • Гибкие методы перемещения - поддержка различных стратегий обновления данных
  • Поддержка множества источников - файлы, DataFrame, СУБД
  • Логирование операций - детальное логирование всех этапов процесса

Требования

  • Apache Airflow
  • Подключения к поддерживаемым БД, настроенные в Airflow

Примечания

  • Класс использует временные таблицы для обеспечения целостности данных
  • Все операции включают проверки качества данных, которые можно кастомизировать
  • Поддерживаются различные методы сжатия для оптимизации передачи данных
  • В бета-версии рекомендуется тщательно тестировать все сценарии использования

Сообщения об ошибках

При обнаружении ошибок или неожиданного поведения, пожалуйста, сообщайте о них для улучшения стабильности проекта

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dbhose_airflow-0.1.0.7-py3-none-any.whl (33.9 kB view details)

Uploaded Python 3

File details

Details for the file dbhose_airflow-0.1.0.7-py3-none-any.whl.

File metadata

File hashes

Hashes for dbhose_airflow-0.1.0.7-py3-none-any.whl
Algorithm Hash digest
SHA256 3e671dec40a65a4d3f9f07d2e4a97e7959dcde19e464b6435c1eb731a9646e8f
MD5 a2eb88532947ba982f93437581402783
BLAKE2b-256 2e2ee63ee6b32157e29d5ce36ea7c073885efe1c4ffd25f12ee4315a353e4dc9

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page