Skip to main content

Internal analytics toolkit for data pipelines

Project description

Mnemosynecore

Библиотека mnemosynecore предоставляет набор утилит для работы с секретами, Mattermost, Vertica, (Superset и Clickhouse в будущем) и другими службами.


Источники конфигурации

Функции поддерживают три способа передачи конфигурации:

  • Vault / Airflow Connection — по conn_id

  • JSON-файл — локально (test-режим)

  • JSON-строка — напрямую в параметре bot_id

Формат конфигурации Mattermost

{
  "host": "mattermost.example.com",
  "password": "BOT_TOKEN",
  "schema": "https",
  "port": 443,
  "basepath": "/api/v4"
}

Test-функции (локальная разработка)

get_mattermost_driver_test

def get_mattermost_driver_test(
    bot_id: str,
    dir_path: str | None = None,
) -> Driver
Описание

Создаёт и возвращает Mattermost Driver для локального тестирования, используя JSON-файл с конфигурацией.

Параметры
Параметр Тип Описание
bot_id str Имя JSON-файла без .json
dir_path str | None Директория поиска файла
Поиск файла

Файл ищется в:

  • dir_path (если указан)

  • текущей рабочей директории

Имя файла:

{bot_id}.json

Возвращает

mattermostdriver.Driver — авторизованный клиент

Пример

driver = get_mattermost_driver_test("MM_BOT", dir_path=".")


send_message_test

def send_message_test(
    *,
    channel_id: str,
    bot_id: str,
    text: str,
    dir_path: str | None = None,
    silent: bool = False,
) -> None
Описание

Отправляет сообщение в Mattermost в тестовом окружении.

Параметры
Параметр Тип Описание
channel_id str ID канала
bot_id str Имя JSON-файла с конфигурацией
text str Текст сообщения
dir_path str | None Директория с JSON
silent bool Не выводить лог
Пример
send_message_test(
    channel_id="abc123",
    bot_id="MM_BOT",
    text="Тестовое сообщение",
    dir_path="."
)

Prod-функции

get_mattermost_driver

def get_mattermost_driver(bot_id: str) -> Driver

Описание

Создаёт Mattermost Driver для prod-окружения.

Источники конфигурации
  • Если bot_id — JSON-строка → используется напрямую

  • Иначе → get_secret(bot_id) (Vault)

Параметры
Параметр Тип Описание
bot_id str conn_id или JSON

Возвращает mattermostdriver.Driver

Примеры

Через Vault

driver = get_mattermost_driver("MM_BOT_PROD")

Через JSON
driver = get_mattermost_driver("""
{
  "host": "mm.company.ru",
  "password": "TOKEN"
}
""")

send_message

def send_message(
    *,
    channel_id: str,
    bot_id: str,
    text: str,
    silent: bool = False,
) -> None
Описание

Отправляет сообщение в Mattermost (prod).

Параметры
Параметр Тип Описание
channel_id str ID канала
bot_id str conn_id или JSON
text str Текст сообщения
silent bool Не логировать
Поведение
  • Логирует успешную отправку
  • Пробрасывает исключения при ошибке
Пример
send_message(
    channel_id="channel123",
    bot_id="MM_BOT_PROD",
    text="Сообщение из prod"
)

Вспомогательные функции

get_mattermost_conn

def get_mattermost_conn(conn_id: str) -> dict

Описание

Извлекает конфигурацию Mattermost из Airflow Connection.

Параметры
Параметр Тип Описание
conn_id str Airflow connection ID
Возвращает
{
  "host": str,
  "password": str,
  "schema": str,
  "port": int,
  "basepath": str
}

get_connection_as_json

def get_connection_as_json(conn_name: str) -> str

Описание

Получает Airflow Connection и возвращает его в виде JSON-строки.

Параметры
Параметр Тип Описание
conn_name str Airflow connection name

Возвращает JSON (str)

Пример

cfg_json = get_connection_as_json("MM_BOT")


Работа с секретами (Vault / ENV / локальные файлы)

resolve_secret

def resolve_secret(
    conn_id: str,
    dir_path: Optional[str] = None,
) -> Dict
Назначение

Универсальный резолвер секрета:

  • Пытается загрузить локальный JSON (test-режим)
  • Если не найден — берёт секрет из Vault / окружения
Используется для:
  • одинакового кода в local / test / prod.
Параметры
  • conn_id — имя секрета

  • dir_path — директория для поиска {conn_id}.json

Возвращает
  • dict — распаршенный JSON секрета

get_connection_as_json

def get_connection_as_json(conn_id: str) -> str

Назначение

Получает секрет в виде JSON-строки:

  • из переменной окружения ENV[conn_id]

  • из Vault (через VaultClient)

Падает с ошибкой, если секрет не найден или Vault недоступен.

Возвращает
  • str — JSON-конфигурация

get_secret

def get_secret(conn_id: str) -> Dict

Назначение
  1. Обёртка над get_connection_as_json с автоматическим json.loads.

  2. Используется в prod-коде.

Возвращает
  • dict — конфигурация секрета

Test-режим (локальные JSON-файлы)

get_connection_as_json_test
def get_connection_as_json_test(
    conn_id: str,
    dir_path: str | None = None,
) -> str
Назначение

Читает тестовый секрет из файла:

{conn_id}.json

Порядок поиска
  1. dir_path (если указан)

  2. текущая рабочая директория

Если файл не найден → FileNotFoundError

Возвращает
  • str — содержимое JSON-файла

get_secret_test

def get_secret_test(conn_id: str, dir_path: str | None = None) -> Dict

Назначение

Test-версия get_secret:

  • читает локальный JSON-файл

  • парсит в dict

Используется для:
  • ноутбуков

  • локальной разработки

Возвращает
  • dict — конфигурация секрета

Airflow / SQL utilities

load_sql_tasks_from_dir

def load_sql_tasks_from_dir(dir_sql: str, vertica_conn_id: str)

Назначение

Автоматически создаёт Airflow-таски (VerticaOperator) из .sql-файлов в директории.

Как работает
  • читает все .sql файлы

  • делит SQL по ;

  • создаёт по одному таску на файл

  • привязывает к текущему DAG

Возвращает: dict[str, VerticaOperator]


read_sql_file

def read_sql_file(file_path: str)

Назначение

Читает SQL-файл в строку.

Возвращает
  • str — содержимое файла

  • None — если файл не найден


vertica_dedupe

def vertica_dedupe(
    table_name: str,
    unique_keys: Union[str, List[str]],
    conn: Optional[Connection] = None,
    conn_id: Optional[str] = None,
    date_col: Optional[str] = None,
    keep: str = "last",
    commit: bool = True
)
Назначение

Удаляет дубликаты строк в таблице Vertica.

Логика
  • группировка по unique_keys

  • оставляет одну строку

  • при date_col + keep="last" сохраняет самую свежую

Используется для: пост-загрузочной чистки данных.


vertica_upsert

def vertica_upsert(
    df: pd.DataFrame,
    table_name: str,
    unique_keys: Union[str, List[str]],
    conn,
    date_col: Optional[str] = None,
    days_back: Optional[int] = None,
    commit: bool = True
)
Назначение

Upsert данных из DataFrame в таблицу Vertica.

Как работает
  • создаёт temp-таблицу

  • заливает туда df

  • опционально чистит данные за days_back

  • выполняет MERGE

Подходит для: инкрементальных загрузок.


vertica_conn

def vertica_conn(conn_id: str) -> Connection

Назначение

Создаёт Vertica-соединение из секрета (Vault / ENV).

Возвращает
  • vertica_python.Connection

get_vertica_engine

def get_vertica_engine(conn_id: str)

Назначение

Создаёт SQLAlchemy Engine для Vertica.

Используется для
  • pandas.to_sql

  • ORM / Core запросов


vertica_sql

def vertica_sql(
    *,
    conn_id: Optional[str] = None,
    conn: Optional[Connection] = None,
    sql: str,
    params: Optional[Iterable[Any]] = None,
    many: bool = False,
    commit: bool = True
)
Назначение

Универсальный executor SQL-запросов.

Поддерживает
  • execute / executemany

  • автокоммит

  • rollback при ошибке

Используется как: базовый слой для всех операций.


vertica_select

def vertica_select(
    *,
    conn_id: Optional[str] = None,
    conn: Optional[Connection] = None,
    sql: str,
    params: Optional[Iterable[Any]] = None
) -> pd.DataFrame
Назначение

Выполняет SELECT и возвращает результат в DataFrame.

Возвращает
  • pd.DataFrame

Используется для: аналитических и сервисных запросов.

Общий паттерн использования
df = vertica_select(
    conn_id="VERTICA_CONN",
    sql="SELECT * FROM my_table"
)
vertica_sql(
    conn_id="VERTICA_CONN",
    sql="DELETE FROM my_table WHERE dt < CURRENT_DATE - 30"
)

un_conn

Использование Vertica
engine = un_conn("conn_id", "vertica")
result = engine.execute("SELECT COUNT(*) FROM my_table")
Использование Mattermost
driver = un_conn("CBA_send_review", "mattermost")
driver.posts.create_post({
    "channel_id": "s5c11srqkf8j3pbdwfbn9imrde",
    "message": "Привет, мир!"
})

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mnemosynecore-1.0.2.tar.gz (13.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mnemosynecore-1.0.2-py3-none-any.whl (12.8 kB view details)

Uploaded Python 3

File details

Details for the file mnemosynecore-1.0.2.tar.gz.

File metadata

  • Download URL: mnemosynecore-1.0.2.tar.gz
  • Upload date:
  • Size: 13.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for mnemosynecore-1.0.2.tar.gz
Algorithm Hash digest
SHA256 e8a43d900f5c12484a0d8248edfd6edab99a84b3301ac08ee9c1532099130f77
MD5 50b6d1da9f5f4da6f4048799e930b684
BLAKE2b-256 2a6514e479ac4c6b3a6fe8d188121efd7fde0fc20e57645fbb268bfcd5b75b1a

See more details on using hashes here.

Provenance

The following attestation bundles were made for mnemosynecore-1.0.2.tar.gz:

Publisher: publish.yml on ASAFGracia/mnemosynecore

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file mnemosynecore-1.0.2-py3-none-any.whl.

File metadata

  • Download URL: mnemosynecore-1.0.2-py3-none-any.whl
  • Upload date:
  • Size: 12.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for mnemosynecore-1.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 49e047284a7d1b87f38aef6c238369ddd09cb63ed60af8222b8693ea15970aeb
MD5 598a32c01f31f474b8406a5d621ba577
BLAKE2b-256 c1be0a94f3469dc36c702de2240381b09eb4ba7405077e8a67fd7ec9571d065f

See more details on using hashes here.

Provenance

The following attestation bundles were made for mnemosynecore-1.0.2-py3-none-any.whl:

Publisher: publish.yml on ASAFGracia/mnemosynecore

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page