Skip to main content

Internal analytics toolkit for data pipelines

Project description

Mnemosynecore

Библиотека mnemosynecore предоставляет набор утилит для работы с секретами, Mattermost, Vertica, (Superset и Clickhouse в будущем) и другими службами.


Источники конфигурации

Функции поддерживают три способа передачи конфигурации:

  • Vault / Airflow Connection — по conn_id

  • JSON-файл — локально (test-режим)

  • JSON-строка — напрямую в параметре bot_id

Формат конфигурации Mattermost

{
  "host": "mattermost.example.com",
  "password": "BOT_TOKEN",
  "schema": "https",
  "port": 443,
  "basepath": "/api/v4"
}

Test-функции (локальная разработка)

get_mattermost_driver_test

def get_mattermost_driver_test(
    bot_id: str,
    dir_path: str | None = None,
) -> Driver
Описание

Создаёт и возвращает Mattermost Driver для локального тестирования, используя JSON-файл с конфигурацией.

Параметры
Параметр Тип Описание
bot_id str Имя JSON-файла без .json
dir_path str | None Директория поиска файла
Поиск файла

Файл ищется в:

  • dir_path (если указан)

  • текущей рабочей директории

Имя файла:

{bot_id}.json

Возвращает

mattermostdriver.Driver — авторизованный клиент

Пример

driver = get_mattermost_driver_test("MM_BOT", dir_path=".")


send_message_test

def send_message_test(
    *,
    channel_id: str,
    bot_id: str,
    text: str,
    dir_path: str | None = None,
    silent: bool = False,
) -> None
Описание

Отправляет сообщение в Mattermost в тестовом окружении.

Параметры
Параметр Тип Описание
channel_id str ID канала
bot_id str Имя JSON-файла с конфигурацией
text str Текст сообщения
dir_path str | None Директория с JSON
silent bool Не выводить лог
Пример
send_message_test(
    channel_id="abc123",
    bot_id="MM_BOT",
    text="Тестовое сообщение",
    dir_path="."
)

Prod-функции

get_mattermost_driver

def get_mattermost_driver(bot_id: str) -> Driver

Описание

Создаёт Mattermost Driver для prod-окружения.

Источники конфигурации
  • Если bot_id — JSON-строка → используется напрямую

  • Иначе → get_secret(bot_id) (Vault)

Параметры
Параметр Тип Описание
bot_id str conn_id или JSON

Возвращает mattermostdriver.Driver

Примеры

Через Vault

driver = get_mattermost_driver("MM_BOT_PROD")

Через JSON
driver = get_mattermost_driver("""
{
  "host": "mm.company.ru",
  "password": "TOKEN"
}
""")

send_message

def send_message(
    *,
    channel_id: str,
    bot_id: str,
    text: str,
    silent: bool = False,
) -> None
Описание

Отправляет сообщение в Mattermost (prod).

Параметры
Параметр Тип Описание
channel_id str ID канала
bot_id str conn_id или JSON
text str Текст сообщения
silent bool Не логировать
Поведение
  • Логирует успешную отправку
  • Пробрасывает исключения при ошибке
Пример
send_message(
    channel_id="channel123",
    bot_id="MM_BOT_PROD",
    text="Сообщение из prod"
)

Вспомогательные функции

get_mattermost_conn

def get_mattermost_conn(conn_id: str) -> dict

Описание

Извлекает конфигурацию Mattermost из Airflow Connection.

Параметры
Параметр Тип Описание
conn_id str Airflow connection ID
Возвращает
{
  "host": str,
  "password": str,
  "schema": str,
  "port": int,
  "basepath": str
}

get_connection_as_json

def get_connection_as_json(conn_name: str) -> str

Описание

Получает Airflow Connection и возвращает его в виде JSON-строки.

Параметры
Параметр Тип Описание
conn_name str Airflow connection name

Возвращает JSON (str)

Пример

cfg_json = get_connection_as_json("MM_BOT")


Работа с секретами (Vault / ENV / локальные файлы)

resolve_secret

def resolve_secret(
    conn_id: str,
    dir_path: Optional[str] = None,
) -> Dict
Назначение

Универсальный резолвер секрета:

  • Пытается загрузить локальный JSON (test-режим)
  • Если не найден — берёт секрет из Vault / окружения
Используется для:
  • одинакового кода в local / test / prod.
Параметры
  • conn_id — имя секрета

  • dir_path — директория для поиска {conn_id}.json

Возвращает
  • dict — распаршенный JSON секрета

get_connection_as_json

def get_connection_as_json(conn_id: str) -> str

Назначение

Получает секрет в виде JSON-строки:

  • из переменной окружения ENV[conn_id]

  • из Vault (через VaultClient)

Падает с ошибкой, если секрет не найден или Vault недоступен.

Возвращает
  • str — JSON-конфигурация

get_secret

def get_secret(conn_id: str) -> Dict

Назначение
  1. Обёртка над get_connection_as_json с автоматическим json.loads.

  2. Используется в prod-коде.

Возвращает
  • dict — конфигурация секрета

Test-режим (локальные JSON-файлы)

get_connection_as_json_test
def get_connection_as_json_test(
    conn_id: str,
    dir_path: str | None = None,
) -> str
Назначение

Читает тестовый секрет из файла:

{conn_id}.json

Порядок поиска
  1. dir_path (если указан)

  2. текущая рабочая директория

Если файл не найден → FileNotFoundError

Возвращает
  • str — содержимое JSON-файла

get_secret_test

def get_secret_test(conn_id: str, dir_path: str | None = None) -> Dict

Назначение

Test-версия get_secret:

  • читает локальный JSON-файл

  • парсит в dict

Используется для:
  • ноутбуков

  • локальной разработки

Возвращает
  • dict — конфигурация секрета

Airflow / SQL utilities

load_sql_tasks_from_dir

def load_sql_tasks_from_dir(dir_sql: str, vertica_conn_id: str)

Назначение

Автоматически создаёт Airflow-таски (VerticaOperator) из .sql-файлов в директории.

Как работает
  • читает все .sql файлы

  • делит SQL по ;

  • создаёт по одному таску на файл

  • привязывает к текущему DAG

Возвращает: dict[str, VerticaOperator]


read_sql_file

def read_sql_file(file_path: str)

Назначение

Читает SQL-файл в строку.

Возвращает
  • str — содержимое файла

  • None — если файл не найден


vertica_dedupe

def vertica_dedupe(
    table_name: str,
    unique_keys: Union[str, List[str]],
    conn: Optional[Connection] = None,
    conn_id: Optional[str] = None,
    date_col: Optional[str] = None,
    keep: str = "last",
    commit: bool = True
)
Назначение

Удаляет дубликаты строк в таблице Vertica.

Логика
  • группировка по unique_keys

  • оставляет одну строку

  • при date_col + keep="last" сохраняет самую свежую

Используется для: пост-загрузочной чистки данных.


vertica_upsert

def vertica_upsert(
    df: pd.DataFrame,
    table_name: str,
    unique_keys: Union[str, List[str]],
    conn,
    date_col: Optional[str] = None,
    days_back: Optional[int] = None,
    commit: bool = True
)
Назначение

Upsert данных из DataFrame в таблицу Vertica.

Как работает
  • создаёт temp-таблицу

  • заливает туда df

  • опционально чистит данные за days_back

  • выполняет MERGE

Подходит для: инкрементальных загрузок.


vertica_conn

def vertica_conn(conn_id: str) -> Connection

Назначение

Создаёт Vertica-соединение из секрета (Vault / ENV).

Возвращает
  • vertica_python.Connection

get_vertica_engine

def get_vertica_engine(conn_id: str)

Назначение

Создаёт SQLAlchemy Engine для Vertica.

Используется для
  • pandas.to_sql

  • ORM / Core запросов


vertica_sql

def vertica_sql(
    *,
    conn_id: Optional[str] = None,
    conn: Optional[Connection] = None,
    sql: str,
    params: Optional[Iterable[Any]] = None,
    many: bool = False,
    commit: bool = True
)
Назначение

Универсальный executor SQL-запросов.

Поддерживает
  • execute / executemany

  • автокоммит

  • rollback при ошибке

Используется как: базовый слой для всех операций.


vertica_select

def vertica_select(
    *,
    conn_id: Optional[str] = None,
    conn: Optional[Connection] = None,
    sql: str,
    params: Optional[Iterable[Any]] = None
) -> pd.DataFrame
Назначение

Выполняет SELECT и возвращает результат в DataFrame.

Возвращает
  • pd.DataFrame

Используется для: аналитических и сервисных запросов.

Общий паттерн использования
df = vertica_select(
    conn_id="VERTICA_CONN",
    sql="SELECT * FROM my_table"
)
vertica_sql(
    conn_id="VERTICA_CONN",
    sql="DELETE FROM my_table WHERE dt < CURRENT_DATE - 30"
)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mnemosynecore-0.1.12.tar.gz (12.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mnemosynecore-0.1.12-py3-none-any.whl (11.9 kB view details)

Uploaded Python 3

File details

Details for the file mnemosynecore-0.1.12.tar.gz.

File metadata

  • Download URL: mnemosynecore-0.1.12.tar.gz
  • Upload date:
  • Size: 12.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for mnemosynecore-0.1.12.tar.gz
Algorithm Hash digest
SHA256 5a47b5007653df8a1043ccbfcc3e937ec86ce5b7c488548b92703b6d35136e50
MD5 93125308c3148b74d46d470a80be9e9b
BLAKE2b-256 e411182120279d6103ff69bec2e8001b2c688104dd3488ebcbd1ce4bbf29f792

See more details on using hashes here.

Provenance

The following attestation bundles were made for mnemosynecore-0.1.12.tar.gz:

Publisher: publish.yml on ASAFGracia/mnemosynecore

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file mnemosynecore-0.1.12-py3-none-any.whl.

File metadata

  • Download URL: mnemosynecore-0.1.12-py3-none-any.whl
  • Upload date:
  • Size: 11.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for mnemosynecore-0.1.12-py3-none-any.whl
Algorithm Hash digest
SHA256 7b5812f911b9fec7ff53c9dd8a448e70e0dbf61889cfc544720aa09555b94e34
MD5 2026cf27e0cfecbe06128e92a60ef031
BLAKE2b-256 d8aedc6e4e523588b8fdc2db35ffec4195a06e580930d399a89b04d3a44f1a8f

See more details on using hashes here.

Provenance

The following attestation bundles were made for mnemosynecore-0.1.12-py3-none-any.whl:

Publisher: publish.yml on ASAFGracia/mnemosynecore

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page