Skip to main content

airflow class for exchanging data between DBMSs in native binary formats.

Project description

DBHose

Клиент для обмена данными между СУБД в нативных бинарных форматах

                                                                 (                )
 (  (                                                 )          )\ )     (    ( /(
 )\))(   '   (    (                   )       (     ( /(        (()/(   ( )\   )\())               (
((_)()\ )   ))\   )\    (     (      (       ))\    )\())   (    /(_))  )((_) ((_)\    (    (     ))\
_(())\_)() /((_) (( )   )\    )\     )\  '  /((_)  (_))/    )\   (_))_  ((_)_   _((_)   )\   )\   /((_)
\ \((_)/ /(_))   | |   ((_)  ((_)  _((_))  (_))    | |_    ((_)   |   \  | _ ) | || |  ((_) ((_) (_))
 \ \/\/ / / -_)  | |  / _|  / _ \ | '  \() / -_)   |  _|  / _ \   | |) | | _ \ | __ | / _ \ (_-< / -_)
  \_/\_/  \___|  |_|  \__|  \___/ |_|_|_|  \___|    \__|  \___/   |___/  |___/ |_||_| \___/ /__/ \___|

Аннотация

DBHose предназначен для обмена данными между различными СУБД с использованием их нативных бинарных форматов. Решение позволяет избежать накладных расходов, связанных с промежуточными преобразованиями, обеспечивая производительность при переносе больших объемов данных. На текущем этапе реализована поддержка PostgreSQL, Greenplum и ClickHouse, так же предусмотрена возможность расширения за счет подключения новых модулей.

Архитектура и Ключевые Компоненты

Платформа построена по модульному принципу и состоит из нескольких взаимосвязанных библиотек, каждая из которых решает свою задачу.

Низкоуровневые библиотеки для работы с бинарными форматами

pgcopylib (PostgreSQL/Greenplum)

https://github.com/0xMihalich/pgcopylib

Назначение

  • Парсинг и генерация бинарного формата COPY для PostgreSQL и совместимых СУБД.

Реализация

  • Низкоуровневые функции на Cython для прямого доступа к структуре данных при сериализации и десериализации.

pgpack (PostgreSQL/Greenplum) контейнер для хранения дампов pgcopy

https://github.com/0xMihalich/pgpack

Назначение

  • Специализированный контейнер для локального хранения бинарных дампов.

Структура

  • Заголовок b"PGPACK\n\x00" 8 байт
  • Контрольная сумма метадаты zlib.crc32 4 байта
  • Размер упакованного в zlib блока метадаты 4 байта
  • Упакованный в zlib блок метадаты
  • Метод компрессии 1 байт
  • Размер упакованного блока pgcopy 8 байт
  • Размер распакованного блока pgcopy 8 байт
  • Упакованный блок pgcopy

Интерфейс

  • PGPackReader для извлечения данных в виде списков Python объектов или pandas/polars DataFrame.
  • PGPackWriter для записи данных из списков Python объектов или pandas/polars DataFrame.

nativelib (ClickHouse)

https://github.com/0xMihalich/nativelib

Назначение

  • Работа с нативным бинарным форматом ClickHouse.

Реализация

  • Низкоуровневые функции на Cython. Ключевое отличие от принципа работы clickhouse-driver — обход стандартного требования ClickHouse к использованию CityHash128 для каждого блока данных за счет использования HTTP-протокола, который позволяет применять сжатие ко всему потоку, что упрощает процесс и повышает скорость.

light-compressor модуль для работы с компрессией в реальном времени (zstd, lz4)

https://github.com/0xMihalich/light_compressor

Назначение

  • Оптимизированные компрессоры и декомпрессоры.

Реализация

  • Переработанные низкоуровневые модули LZ4 и Zstandard с использованием Cython поверх CFFI и PyO3. Оптимизировано для достижения максимально возможной скорости при взаимодействии с python для упаковки и распаковки.

Драйверы для взаимодействия с СУБД

pgpack_dumper (PostgreSQL/Greenplum)

https://github.com/0xMihalich/pgpack_dumper

Назначение

  • Прием и отправка данных из таблиц, представлений, SQL запросов (поддерживается Multiquery), локальных файлов или представлений.

Реализация

  • Использует psycopg с активацией бинарного режима COPY.

Принцип работы

Сохранение в файл формата PGPack

  • получение метаданных -> получение pgcopy stream -> создание pgpack
┌─────────────────────────────────────────────────┐
│              СОХРАНЕНИЕ В PGPACK                │
└─────────────────────────────────────────────────┘

┌─────────────┐    1. Метаданные    ┌─────────────┐
│             │ ──────────────────► │             │
│ PostgreSQL  │                     │  PGPack     │
│             │    2. PGCopy Stream │  Container  │
│   Table     │ ──────────────────► │             │
│             │                     │• Schema     │
│             │    3. Создание      │• Data       │
│             │       контейнера    │• Compression│
└─────────────┘                     └─────────────┘
      │                                  │
      │ psycopg binary COPY              │ pgpack
      ▼                                  ▼
[Table Schema] → [Binary Stream] → [Zlib Schema] → [Compressed Data]

Запись из формата PGPack

  • открытие PGPackReader -> получение pgcopy stream -> передача в целевую таблицу
┌─────────────────────────────────────────────────┐
│                ЗАПИСЬ ИЗ PGPACK                 │
└─────────────────────────────────────────────────┘

┌─────────────┐    1. Открытие      ┌─────────────┐
│             │ ◄────────────────── │             │
│ PostgreSQL  │                     │  PGPack     │
│             │    2. PGCopy Stream │  Container  │
│   Table     │ ◄────────────────── │             │
│             │                     │• Schema     │
│             │    3. Передача      │• Data       │
│             │       в таблицу     │• Compression│
└─────────────┘                     └─────────────┘
      │                                  │
      │ psycopg binary COPY              │ pgpack
      ▼                                  ▼
[INSERT] ← [Binary Stream] ← [Zlib Decompress] ← [LZ4 Decompress]

Прямой обмен (PG/GP <-> PG/GP)

  • Данные передаются без конвертации в режиме stream
┌─────────────────────────────────────────────────┐
│           ПРЯМОЙ ОБМЕН PG/GP ↔ PG/GP            │
└─────────────────────────────────────────────────┘

┌─────────────┐                    ┌─────────────┐
│ Source      │                    │ Target      │
│ PostgreSQL  │ ─────────────────► │ PostgreSQL  │
│             │    PGCopy Stream   │             │
│   Table A   │   (без конвертации)│   Table B   │
│             │ ◄───────────────── │             │
└─────────────┘                    └─────────────┘
      │                                   │
      │                                   │
      ▼                                   ▼
    [Binary COPY] → [Network] → [Binary COPY]

Кросс-платформенная отправка (PG/GP -> ClickHouse)

  • получение метаданных -> получение pgcopy stream -> преобразование в native stream -> передача в целевую таблицу
┌─────────────────────────────────────────────────┐
│         PG/GP → ClickHouse (ОТПРАВКА)           │
└─────────────────────────────────────────────────┘

┌─────────────┐    1. Метаданные    ┌─────────────┐
│ PostgreSQL  │ ────┐    ┌───────── │ ClickHouse  │
│             │     ▼    ▼          │             │
│   Table     │    2. PGCopy Stream │   Table     │
│             │ ────┐               │             │
└─────────────┘     │               └─────────────┘
      │             │                      ▲
      │ pgcopylib   │ 4. Native Stream     │ native_dumper
      ▼             │                      │
[Binary COPY] → [Конвертер] → → [Native Format]
                    │
                3. Преобразование
                 PGCopy → Native
                    │
               nativelib + light_compressor

Кросс-платформенный прием (ClickHouse -> PG/GP)

  • получение метаданных -> получение native stream -> преобразование в pgcopy stream -> передача в целевую таблицу
┌─────────────────────────────────────────────────┐
│         ClickHouse → PG/GP (ПРИЕМ)              │
└─────────────────────────────────────────────────┘

┌─────────────┐    1. Метаданные    ┌─────────────┐
│ ClickHouse  │ ────┐    ┌───────── │ PostgreSQL  │
│             │     ▼    ▼          │             │
│   Table     │    2. Native Stream │   Table     │
│             │ ────┐               │             │
└─────────────┘     │               └─────────────┘
      │             │                     ▲
      │ nativelib   │ 4. PGCopy Stream    │ pgpack_dumper
      ▼             │                     │
[Native Format] → [Конвертер] → → [Binary COPY]
                    │
                3. Преобразование
                 Native → PGCopy
                    │
               pgcopylib + light_compressor

native_dumper (ClickHouse)

https://github.com/0xMihalich/native_dumper

Назначение

  • Прием и отправка данных из таблиц, представлений, SQL запросов (поддерживается Multiquery), локальных файлов или представлений.

Реализация

  • Кастомный HTTP-клиент, написанный на Rust, для работы с ClickHouse в формате Native. Поддерживаемые типы сжатия LZ4, ZSTD или NONE (без компрессии).

Принцип работы

Сохранение в файл формата Native

  • получение stream -> сохранение в файл
┌─────────────────────────────────────────────────┐
│           СОХРАНЕНИЕ В NATIVE ФАЙЛ              │
└─────────────────────────────────────────────────┘

┌─────────────┐                    ┌─────────────┐
│ ClickHouse  │                    │ Native File │
│             │                    │             │
│   Table     │ ─────────────────► │   Format    │
│             │   Native Stream    │             │
│             │                    │• Blocks     │
│             │                    │• Headers    │
│             │                    │• Compression│
└─────────────┘                    └─────────────┘
      │                                   │
      │ native_dumper                     │ File.write()
      ▼                                   ▼
     [Native Blocks] → → → → → → → → → [File]

Запись из формата Native

  • определение кодека сжатия -> принятие решения менять компрессор или передавать как есть -> передача в целевую таблицу
┌─────────────────────────────────────────────────┐
│           ЗАПИСЬ ИЗ NATIVE ФАЙЛА                │
└─────────────────────────────────────────────────┘

┌─────────────┐                    ┌─────────────┐
│ ClickHouse  │                    │ Native File │
│             │                    │             │
│   Table     │ ◄───────────────── │   Format    │
│             │   Native Stream    │             │
│             │                    │• Blocks     │
│             │                    │• Headers    │
│             │                    │• Compression│
└─────────────┘                    └─────────────┘
      │                                    │
      │ native_dumper                      │ File.read()
      ▼                                    ▼
[INSERT] ← [Native Blocks] ← [Decompression] ← [File Read]
              │
        Определение кодека сжатия
        → Решение: менять компрессор или оставить как есть

Прямой обмен (ClickHouse <-> ClickHouse)

  • получение stream -> принятие решения менять компрессор или передавать как есть -> передача в целевую таблицу
┌─────────────────────────────────────────────────┐
│        ПРЯМОЙ ОБМЕН CH ↔ ClickHouse             │
└─────────────────────────────────────────────────┘

┌─────────────┐                    ┌─────────────┐
│ Source      │                    │ Target      │
│ ClickHouse  │ ─────────────────► │ ClickHouse  │
│             │    Native Stream   │             │
│   Table A   │    (минимальная    │   Table B   │
│             │     конвертация)   │             │
│             │ ◄───────────────── │             │
└─────────────┘                    └─────────────┘
      │                                   │
      │ native_dumper                     │ native_dumper
      ▼                                   ▼
[Native Format] → [Compression Decision] → [Native Format]
                     │
              Анализ сжатия:
              • Оставить исходный компрессор
              • Или изменить на оптимальный

Кросс-платформенная отправка (ClickHouse -> PG/GP)

  • получение метаданных PG/GP -> получение native stream -> преобразование в pgcopy stream -> передача в целевую таблицу
┌─────────────────────────────────────────────────┐
│         ClickHouse → PG/GP (ОТПРАВКА)           │
└─────────────────────────────────────────────────┘

┌─────────────┐   1. Метаданные    ┌─────────────┐
│ ClickHouse  │     ┌───────────── │ PostgreSQL  │
│             │     ▼              │             │
│   Table     │   2. Native Stream │   Table     │
│             │ ────┐              │             │
└─────────────┘     │              └─────────────┘
      │             │                    ▲
      │ nativelib   │ 4. PGCopy Stream   │ pgpack_dumper
      ▼             │                    │
[Native Format] → [Конвертер] → → [Binary COPY]
                    │
                3. Преобразование
                 Native → light_compressor → PGCopy
                    │
              pgcopylib

Кросс-платформенный прием (PG/GP -> ClickHouse)

  • получение метаданных Clickhouse -> получение pgcopy stream -> преобразование в native stream -> передача в целевую таблицу
┌─────────────────────────────────────────────────┐
│         PG/GP → ClickHouse (ПРИЕМ)              │
└─────────────────────────────────────────────────┘

┌─────────────┐    1. Метаданные    ┌─────────────┐
│ PostgreSQL  │     ┌────────────── │ ClickHouse  │
│             │     ▼               │             │
│   Table     │    2. PGCopy Stream │   Table     │
│             │ ────┐               │             │
└─────────────┘     │               └─────────────┘
      │             │                      ▲
      │ pgcopylib   │ 4. Native Stream     │ native_dumper
      ▼             │                      │
[Binary COPY] → [Конвертер] → → [Native Format]
                    │
                3. Преобразование
                 PGCopy → Native
                    │
               nativelib + light_compressor

DBHouse Utils

https://github.com/0xMihalich/dbhose_utils

Назначение

  • Набор инструментов для конвертации между форматами Native, PGPack и PGCopy

Реализация

  • Функция dump_detective для автоматического определения формата входного файла дампа и выбора соответствующего ридера
  • Функция dump_convertor для конвертации дампа в другой формат либо смены кодека сжатия / распаковки в формат без компрессии

Ключевые особенности

  • Производительность: Использование нативных форматов и низкоуровневых оптимизаций на Cython/Rust минимизирует накладные расходы.
  • Сжатие: Интеграция с алгоритмами LZ4 и ZSTD на уровне контейнера (pgpack) и сетевого протокола.
  • Модульность: Архитектура позволяет добавлять поддержку других СУБД путем добавления новых библиотек.
  • Использование: Каждая библиотека может быть использована как отдельный модуль, так же в разработке общий CLI-интерфейс.

Дальнейшее развитие проекта

Ближайшее время

CLI с рабочим названием DBHose, который предоставит единую точку входа для управления всеми модулями и операциями по переносу данных.

Среднесрочная перспектива

  • Багфикс того, что будет найдено в процессе тестирования
  • Добавление поддержки других типов данных Clickhouse (nativelib)
  • Добавление конвертации в формат parquiet

Долгосрочная перспектива

  • Добавить модуль для работы с SQL Server (MS SQL)
  • Добавить модуль для работы с MySQL
  • Добавить модуль для работы с Oracle

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dbhose_airflow-0.0.0.1-py3-none-any.whl (16.0 kB view details)

Uploaded Python 3

File details

Details for the file dbhose_airflow-0.0.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for dbhose_airflow-0.0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 8558ec6e0bf1731457e3db689552a85e22826eab966837f01ce695a286143d3d
MD5 ff588e77b042947712b95736dad2f7f8
BLAKE2b-256 28b622b91ca1570bbb7778b88a6c76908b1875b6610e784fff01ba14bbb0c2af

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page