Skip to main content

Internal analytics toolkit for data pipelines

Project description

Mnemosynecore

Библиотека mnemosynecore предоставляет набор утилит для работы с секретами, Mattermost, Vertica, (Superset и Clickhouse в будущем) и другими службами.


Источники конфигурации

Функции поддерживают три способа передачи конфигурации:

  • Vault / Airflow Connection — по conn_id

  • JSON-файл — локально (test-режим)

  • JSON-строка — напрямую в параметре bot_id

Формат конфигурации Mattermost

{
  "host": "mattermost.example.com",
  "password": "BOT_TOKEN",
  "schema": "https",
  "port": 443,
  "basepath": "/api/v4"
}

Test-функции (локальная разработка)

get_mattermost_driver_test

def get_mattermost_driver_test(
    bot_id: str,
    dir_path: str | None = None,
) -> Driver
Описание

Создаёт и возвращает Mattermost Driver для локального тестирования, используя JSON-файл с конфигурацией.

Параметры
Параметр Тип Описание
bot_id str Имя JSON-файла без .json
dir_path str | None Директория поиска файла
Поиск файла

Файл ищется в:

  • dir_path (если указан)

  • текущей рабочей директории

Имя файла:

{bot_id}.json

Возвращает

mattermostdriver.Driver — авторизованный клиент

Пример

driver = get_mattermost_driver_test("MM_BOT", dir_path=".")


send_message_test

def send_message_test(
    *,
    channel_id: str,
    bot_id: str,
    text: str,
    dir_path: str | None = None,
    silent: bool = False,
) -> None
Описание

Отправляет сообщение в Mattermost в тестовом окружении.

Параметры
Параметр Тип Описание
channel_id str ID канала
bot_id str Имя JSON-файла с конфигурацией
text str Текст сообщения
dir_path str | None Директория с JSON
silent bool Не выводить лог
Пример
send_message_test(
    channel_id="abc123",
    bot_id="MM_BOT",
    text="Тестовое сообщение",
    dir_path="."
)

Prod-функции

get_mattermost_driver

def get_mattermost_driver(bot_id: str) -> Driver

Описание

Создаёт Mattermost Driver для prod-окружения.

Источники конфигурации
  • Если bot_id — JSON-строка → используется напрямую

  • Иначе → get_secret(bot_id) (Vault)

Параметры
Параметр Тип Описание
bot_id str conn_id или JSON

Возвращает mattermostdriver.Driver

Примеры

Через Vault

driver = get_mattermost_driver("MM_BOT_PROD")

Через JSON
driver = get_mattermost_driver("""
{
  "host": "mm.company.ru",
  "password": "TOKEN"
}
""")

send_message

def send_message(
    *,
    channel_id: str,
    bot_id: str,
    text: str,
    silent: bool = False,
) -> None
Описание

Отправляет сообщение в Mattermost (prod).

Параметры
Параметр Тип Описание
channel_id str ID канала
bot_id str conn_id или JSON
text str Текст сообщения
silent bool Не логировать
Поведение
  • Логирует успешную отправку
  • Пробрасывает исключения при ошибке
Пример
send_message(
    channel_id="channel123",
    bot_id="MM_BOT_PROD",
    text="Сообщение из prod"
)

Вспомогательные функции

get_mattermost_conn

def get_mattermost_conn(conn_id: str) -> dict

Описание

Извлекает конфигурацию Mattermost из Airflow Connection.

Параметры
Параметр Тип Описание
conn_id str Airflow connection ID
Возвращает
{
  "host": str,
  "password": str,
  "schema": str,
  "port": int,
  "basepath": str
}

get_connection_as_json

def get_connection_as_json(conn_name: str) -> str

Описание

Получает Airflow Connection и возвращает его в виде JSON-строки.

Параметры
Параметр Тип Описание
conn_name str Airflow connection name

Возвращает JSON (str)

Пример

cfg_json = get_connection_as_json("MM_BOT")


Работа с секретами (Vault / ENV / локальные файлы)

resolve_secret

def resolve_secret(
    conn_id: str,
    dir_path: Optional[str] = None,
) -> Dict
Назначение

Универсальный резолвер секрета:

  • Пытается загрузить локальный JSON (test-режим)
  • Если не найден — берёт секрет из Vault / окружения
Используется для:
  • одинакового кода в local / test / prod.
Параметры
  • conn_id — имя секрета

  • dir_path — директория для поиска {conn_id}.json

Возвращает
  • dict — распаршенный JSON секрета

get_connection_as_json

def get_connection_as_json(conn_id: str) -> str

Назначение

Получает секрет в виде JSON-строки:

  • из переменной окружения ENV[conn_id]

  • из Vault (через VaultClient)

Падает с ошибкой, если секрет не найден или Vault недоступен.

Возвращает
  • str — JSON-конфигурация

get_secret

def get_secret(conn_id: str) -> Dict

Назначение
  1. Обёртка над get_connection_as_json с автоматическим json.loads.

  2. Используется в prod-коде.

Возвращает
  • dict — конфигурация секрета

Test-режим (локальные JSON-файлы)

get_connection_as_json_test
def get_connection_as_json_test(
    conn_id: str,
    dir_path: str | None = None,
) -> str
Назначение

Читает тестовый секрет из файла:

{conn_id}.json

Порядок поиска
  1. dir_path (если указан)

  2. текущая рабочая директория

Если файл не найден → FileNotFoundError

Возвращает
  • str — содержимое JSON-файла

get_secret_test

def get_secret_test(conn_id: str, dir_path: str | None = None) -> Dict

Назначение

Test-версия get_secret:

  • читает локальный JSON-файл

  • парсит в dict

Используется для:
  • ноутбуков

  • локальной разработки

Возвращает
  • dict — конфигурация секрета

Airflow / SQL utilities

load_sql_tasks_from_dir

def load_sql_tasks_from_dir(dir_sql: str, vertica_conn_id: str)

Назначение

Автоматически создаёт Airflow-таски (VerticaOperator) из .sql-файлов в директории.

Как работает
  • читает все .sql файлы

  • делит SQL по ;

  • создаёт по одному таску на файл

  • привязывает к текущему DAG

Возвращает: dict[str, VerticaOperator]


read_sql_file

def read_sql_file(file_path: str)

Назначение

Читает SQL-файл в строку.

Возвращает
  • str — содержимое файла

  • None — если файл не найден


vertica_dedupe

def vertica_dedupe(
    table_name: str,
    unique_keys: Union[str, List[str]],
    conn: Optional[Connection] = None,
    conn_id: Optional[str] = None,
    date_col: Optional[str] = None,
    keep: str = "last",
    commit: bool = True
)
Назначение

Удаляет дубликаты строк в таблице Vertica.

Логика
  • группировка по unique_keys

  • оставляет одну строку

  • при date_col + keep="last" сохраняет самую свежую

Используется для: пост-загрузочной чистки данных.


vertica_upsert

def vertica_upsert(
    df: pd.DataFrame,
    table_name: str,
    unique_keys: Union[str, List[str]],
    conn,
    date_col: Optional[str] = None,
    days_back: Optional[int] = None,
    commit: bool = True
)
Назначение

Upsert данных из DataFrame в таблицу Vertica.

Как работает
  • создаёт temp-таблицу

  • заливает туда df

  • опционально чистит данные за days_back

  • выполняет MERGE

Подходит для: инкрементальных загрузок.


vertica_conn

def vertica_conn(conn_id: str) -> Connection

Назначение

Создаёт Vertica-соединение из секрета (Vault / ENV).

Возвращает
  • vertica_python.Connection

get_vertica_engine

def get_vertica_engine(conn_id: str)

Назначение

Создаёт SQLAlchemy Engine для Vertica.

Используется для
  • pandas.to_sql

  • ORM / Core запросов


vertica_sql

def vertica_sql(
    *,
    conn_id: Optional[str] = None,
    conn: Optional[Connection] = None,
    sql: str,
    params: Optional[Iterable[Any]] = None,
    many: bool = False,
    commit: bool = True
)
Назначение

Универсальный executor SQL-запросов.

Поддерживает
  • execute / executemany

  • автокоммит

  • rollback при ошибке

Используется как: базовый слой для всех операций.


vertica_select

def vertica_select(
    *,
    conn_id: Optional[str] = None,
    conn: Optional[Connection] = None,
    sql: str,
    params: Optional[Iterable[Any]] = None
) -> pd.DataFrame
Назначение

Выполняет SELECT и возвращает результат в DataFrame.

Возвращает
  • pd.DataFrame

Используется для: аналитических и сервисных запросов.

Общий паттерн использования
df = vertica_select(
    conn_id="VERTICA_CONN",
    sql="SELECT * FROM my_table"
)
vertica_sql(
    conn_id="VERTICA_CONN",
    sql="DELETE FROM my_table WHERE dt < CURRENT_DATE - 30"
)

un_conn

Использование Vertica
engine = un_conn("conn_id", "vertica")
result = engine.execute("SELECT COUNT(*) FROM my_table")
Использование Mattermost
driver = un_conn("CBA_send_review", "mattermost")
driver.posts.create_post({
    "channel_id": "s5c11srqkf8j3pbdwfbn9imrde",
    "message": "Привет, мир!"
})

Superset: подключение и скриншоты

get_superset_client

def get_superset_client(*, superset_conn: str | dict, dir_path: str | None = None, security_cookie_auth_token: str | None = None)

Создаёт клиент Superset из:

  • conn_id (через Vault/ENV/Airflow),
  • JSON-строки,
  • dict конфигурации.

Поддерживает внутреннюю superset_o3_api_lib (рекомендуемый путь), а также fallback на старый клиент.

Пример:

from mnemosynecore import get_superset_client

client = get_superset_client(superset_conn="SUPSERSET_CONN_ID")

Получить PNG дашборда

from mnemosynecore import superset_dashboard_thumbnail

png_bytes = superset_dashboard_thumbnail(
    superset_conn="SUPSERSET_CONN_ID",
    dashboard_name="Canary dashboard",
    dashboard_url="https://ss.o3.ru/superset/dashboard/p/1lDznvX2zB7/",
    refresh=True,
    refresh_wait_sec=60,
    thumb_size="2048,1536",
    window_size="2048,1536",
)

Получить PNG чарта

from mnemosynecore import superset_chart_thumbnail

png_bytes = superset_chart_thumbnail(
    superset_conn="SUPSERSET_CONN_ID",
    chart_id=31729,
    force_refresh=True,
    refresh_wait_sec=60,
    thumb_size="1600,1200",
    window_size="1600,1200",
)

Сохранить пачку скриншотов на диск

from mnemosynecore import superset_screenshot_dashboard, superset_screenshot_charts

dash_files = superset_screenshot_dashboard(
    conn_id="SUPSERSET_CONN_ID",
    dashboards=[
        {"dashboard_name": "Sales main", "channel_id": "optional"},
        {"dashboard_name": "Finance", "dashboard_url": "https://ss.o3.ru/superset/dashboard/p/abc/"},
    ],
    output_dir="./screenshots",
)

chart_files = superset_screenshot_charts(
    conn_id="SUPSERSET_CONN_ID",
    charts=[
        {"chart_id": 31729},
        {"chart_id": 44001, "file_name": "daily_margin"},
    ],
    output_dir="./screenshots",
)

Отправить скриншоты в Mattermost (дашборды и чарты отдельно)

from mnemosynecore import (
    send_superset_dashboards_to_channels,
    send_superset_charts_to_channels,
)

send_superset_dashboards_to_channels(
    bot_id="MM_BOT",
    superset_conn="SUPSERSET_CONN_ID",
    dashboards=[
        {
            "dashboard_name": "Sales main",
            "channel_id": "channel_dashboard",
            "text": "Ежедневный скрин дашборда",
        }
    ],
)

send_superset_charts_to_channels(
    bot_id="MM_BOT",
    superset_conn="SUPSERSET_CONN_ID",
    charts=[
        {
            "chart_id": 31729,
            "channel_id": "channel_charts",
            "text": "Ежедневный скрин чарта",
        }
    ],
)

SharePoint: чтение файлов

Функции поддерживают конфиг SharePoint через:

  • conn_id (Vault/ENV/Airflow),
  • JSON-строку,
  • dict конфигурации.

Минимальный конфиг:

{
  "host": "https://sharepoint.company.ru",
  "login": "svc_user",
  "password": "secret",
  "domain": "O3"
}

Скачать файл в bytes

from mnemosynecore import sharepoint_download_file

content = sharepoint_download_file(
    sharepoint_conn="SHAREPOINT_CONN_ID",
    file_url="/sites/team/shared/report.csv",
)

Считать файл как текст

from mnemosynecore import sharepoint_read_text

text = sharepoint_read_text(
    sharepoint_conn="SHAREPOINT_CONN_ID",
    file_url="/sites/team/shared/query.sql",
    encoding="utf-8",
)

Считать CSV/TSV/Excel/JSON в DataFrame

from mnemosynecore import sharepoint_read_dataframe

df = sharepoint_read_dataframe(
    sharepoint_conn="SHAREPOINT_CONN_ID",
    file_url="/sites/team/shared/events.csv",
    file_format="auto",  # auto/csv/tsv/excel/json
)

Сохранить файл локально

from mnemosynecore import sharepoint_download_to_file

path = sharepoint_download_to_file(
    sharepoint_conn="SHAREPOINT_CONN_ID",
    file_url="/sites/team/shared/events.xlsx",
    output_path="./downloads/events.xlsx",
)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mnemosynecore-1.1.1.tar.gz (27.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mnemosynecore-1.1.1-py3-none-any.whl (19.6 kB view details)

Uploaded Python 3

File details

Details for the file mnemosynecore-1.1.1.tar.gz.

File metadata

  • Download URL: mnemosynecore-1.1.1.tar.gz
  • Upload date:
  • Size: 27.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for mnemosynecore-1.1.1.tar.gz
Algorithm Hash digest
SHA256 1309e112d36c9c34de24ad226efeb1139cf73218f7b8724df9915e271ce179f8
MD5 ac58656938b788a28128f659e7d6d7c8
BLAKE2b-256 44185ac9e396bdcc8799fae821a2f060a2646923fa52512051fc6e1d2da5d5c2

See more details on using hashes here.

File details

Details for the file mnemosynecore-1.1.1-py3-none-any.whl.

File metadata

  • Download URL: mnemosynecore-1.1.1-py3-none-any.whl
  • Upload date:
  • Size: 19.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for mnemosynecore-1.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 c7535d6db7ee8770ce59d518ec1022cd42a76c96b45c940222a2f72a43db2340
MD5 78d06b80ea67a384fc3f844eea988677
BLAKE2b-256 2baba8eeeb509a402e5f3267dd935a6270728440d6c349f3691ecd119dbe4b0e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page