Internal analytics toolkit for data pipelines
Project description
mnemosynecore
mnemosynecore - внутренняя Python-библиотека для задач аналитиков и data-инженеров в Airflow/DAG-пайплайнах.
Основная идея: писать меньше инфраструктурного кода в дагах и переиспользовать стабильные функции для:
- секретов и коннектов (ENV/Vault/Airflow/local JSON),
- Vertica,
- Mattermost,
- Superset (скриншоты дашбордов/чартов),
- SharePoint,
- retry-логики.
Содержание
- Быстрый старт
- Как устроены коннекты и секреты
- Функции: секреты и подключения
- Функции: retry
- Функции: Vertica
- Функции: Mattermost
- Функции: Superset
- Функции: SharePoint
- Удобные связки функций (готовые рецепты)
- CI/CD и релизы
- Полный список публичного API
Быстрый старт
Установка
pip install mnemosynecore
С доп. зависимостями:
# для Airflow-хелперов
pip install "mnemosynecore[airflow]"
# для Superset API
pip install "mnemosynecore[superset]"
# для SharePoint NTLM
pip install "mnemosynecore[sharepoint]"
# для clickhouse через un_conn
pip install "mnemosynecore[clickhouse]"
# все вместе
pip install "mnemosynecore[airflow,superset,sharepoint,clickhouse]"
Импорт
import mnemosynecore as mn
Пример первого использования:
# Получить секрет (dict)
cfg = mn.get_secret("VERTICA_PROD")
# Выполнить SQL в Vertica
mn.vertica_sql(conn_id="VERTICA_PROD", sql="SELECT 1")
Как устроены коннекты и секреты
Почти все функции принимают либо conn_id, либо JSON-конфиг.
Приоритет источников секрета (get_connection_as_json)
- Переменная окружения
os.environ[conn_name] VaultClient().get_secret(conn_name)- Airflow Connection (
BaseHook.get_connection) - Если ничего не найдено -
ValueError
Локальный test-режим (*_test и resolve_secret)
Для локального запуска без Airflow/Vault можно положить CONN_ID.json рядом со скриптом.
resolve_secret(conn_id, dir_path) работает так:
- сначала пытается читать локальный JSON (
get_secret_test), - если файла нет - берет прод-секрет через
get_secret.
Базовые JSON-форматы
Универсальный Airflow-style
{
"host": "example.host",
"login": "user",
"password": "secret",
"port": 443,
"schema": "https",
"extra": "{\"basepath\":\"/api/v4\"}"
}
Mattermost
{
"host": "mattermost.company.ru",
"password": "BOT_TOKEN",
"schema": "https",
"port": 443,
"extra": "{\"basepath\":\"/api/v4\"}"
}
Superset
{
"host": "https://ss.o3.ru",
"login": "svc_user",
"password": "svc_password",
"extra": "{\"auth_provider\":\"ldap\",\"security_cookie_auth_token\":\"...\"}"
}
SharePoint
{
"host": "https://sharepoint.company.ru",
"login": "svc_sharepoint",
"password": "svc_password",
"schema": "O3"
}
Функции: секреты и подключения
get_connection_as_json(conn_name: str) -> str
Возвращает секрет как JSON-строку (ENV -> Vault -> Airflow).
Когда полезно:
- передать connection в макрос DAG,
- сохранить совместимость со старым кодом.
raw = mn.get_connection_as_json("CLICKHOUSE_ANALYTICS")
get_secret(conn_id: str) -> dict
Парсит get_connection_as_json и возвращает dict.
cfg = mn.get_secret("VERTICA_PROD")
print(cfg["host"])
get_connection_as_json_test(conn_id: str, dir_path: str | None = None) -> str
Читает локальный файл <conn_id>.json.
raw = mn.get_connection_as_json_test("MM_BOT", dir_path="./secrets")
get_secret_test(conn_id: str, dir_path: str | None = None) -> dict
Локальный секрет как dict.
cfg = mn.get_secret_test("MM_BOT", dir_path="./secrets")
resolve_secret(conn_id: str, dir_path: str | None = None) -> dict
Универсальный режим local/prod.
cfg = mn.resolve_secret("SUPSET_CONN", dir_path="./secrets")
has_connection(conn_name: str) -> bool
Проверяет, можно ли резолвить connection.
if not mn.has_connection("VERTICA_PROD"):
raise RuntimeError("Нет секрета VERTICA_PROD")
get_secret_field(...) -> Any
Читает конкретное поле секрета, поддерживает вложенность через точку (a.b.c).
token = mn.get_secret_field("MM_BOT", "password", required=True)
client_id = mn.get_secret_field("SUPSET", "extra.client_id", default="public")
require_secret_fields(conn_id, required_fields, ...) -> dict
Валидирует обязательные поля до старта тяжелой логики.
cfg = mn.require_secret_fields(
"SUPSET",
["host", "login", "password"],
)
get_secret_with_defaults(conn_id, defaults, ...) -> dict
Подмешивает дефолты в секрет.
timeout_cfg = mn.get_secret_with_defaults(
"SP_CONN",
{"timeout": 60, "verify": True},
)
un_conn(conn_id: str, conn_type: str)
Универсальный конструктор соединений из conn_id:
vertica/vertica_engine-> SQLAlchemy engine,clickhouse->clickhouse_driver.Client,superset-> dict-конфиг,mattermost-> авторизованный Driver,raw-> исходный dict.
engine = mn.un_conn("VERTICA_PROD", "vertica")
mm_driver = mn.un_conn("MM_BOT", "mattermost")
Функции: retry
retry_call(func, *args, attempts=3, delay_sec=1.0, backoff=2.0, max_delay_sec=None, jitter_sec=0.0, exceptions=(Exception,), on_retry=None, sleep_func=time.sleep, **kwargs)
Вызывает функцию с повторами.
Когда полезно:
- нестабильные API,
- временные сетевые ошибки,
- "прогрев" внешних сервисов.
import requests
resp = mn.retry_call(
requests.get,
"https://api.example.com/health",
attempts=5,
delay_sec=2,
backoff=1.5,
timeout=10,
)
retry(...) decorator
Декоратор с теми же параметрами.
@mn.retry(attempts=4, delay_sec=1, backoff=2)
def fetch_data():
...
Функции: Vertica
Базовые
vertica_conn(conn_id: str)
Открывает vertica_python connection по секрету.
conn = mn.vertica_conn("VERTICA_PROD")
vertica_sql(...) -> None
Выполняет SQL (включая executemany).
mn.vertica_sql(
conn_id="VERTICA_PROD",
sql="INSERT INTO mart.t(id, dt) VALUES (%s, %s)",
params=[(1, "2026-03-01"), (2, "2026-03-01")],
many=True,
)
vertica_select(...) -> pandas.DataFrame
Читает данные в DataFrame.
df = mn.vertica_select(
conn_id="VERTICA_PROD",
sql="SELECT id, value FROM mart.table LIMIT 100",
)
vertica_select_scalar(...) -> Any
Возвращает первое значение первой строки (или default, если пусто).
cnt = mn.vertica_select_scalar(
conn_id="VERTICA_PROD",
sql="SELECT COUNT(*) FROM mart.table",
default=0,
)
SQL-файлы
read_sql_file(file_path: str) -> str | None
sql_text = mn.read_sql_file("/opt/sql/mart/load.sql")
split_sql_statements(sql_text: str) -> list[str]
Простое деление по ;.
parts = mn.split_sql_statements("select 1; select 2;")
vertica_sql_file(...) -> int
Выполняет все statements из .sql файла; возвращает число выполненных statements.
executed = mn.vertica_sql_file(
file_path="/opt/sql/mart/load.sql",
conn_id="VERTICA_PROD",
)
vertica_sql_dir(...) -> dict[str, int]
Выполняет все .sql в директории (опц. рекурсивно), возвращает статистику по файлам.
stats = mn.vertica_sql_dir(
dir_sql="/opt/sql/mart",
conn_id="VERTICA_PROD",
recursive=False,
)
Работа с DataFrame
vertica_insert_dataframe(...) -> int
Пакетная вставка DataFrame в таблицу (с chunk_size).
inserted = mn.vertica_insert_dataframe(
df=df,
table_name="mart.events_daily",
conn_id="VERTICA_PROD",
chunk_size=5000,
)
vertica_table_exists(...) -> bool
Проверка существования таблицы.
exists = mn.vertica_table_exists(
table_name="events_daily",
schema_name="mart",
conn_id="VERTICA_PROD",
)
Data quality / обслуживание таблиц
vertica_dedupe(...)
Удаление дубликатов по ключам (опц. с date_col).
mn.vertica_dedupe(
table_name="mart.events_daily",
unique_keys=["event_id", "event_dt"],
conn_id="VERTICA_PROD",
date_col="load_dttm",
keep="last",
)
vertica_upsert(...)
Upsert DataFrame через temp table + MERGE.
mn.vertica_upsert(
df=df,
table_name="mart.events_daily",
unique_keys=["event_id"],
conn_id="VERTICA_PROD",
date_col="event_dt",
days_back=7,
)
Airflow helper
load_sql_tasks_from_dir(dir_sql, vertica_conn_id)
Генерирует набор VerticaOperator-тасок по SQL-файлам (требует Airflow).
tasks = mn.load_sql_tasks_from_dir("/opt/airflow/sql/my_dag", "VERTICA_PROD")
Функции: Mattermost
Сообщения
send_message(channel_id, bot_id, text, silent=False)
Prod-отправка текста.
mn.send_message(
channel_id="abc123channel",
bot_id="MM_BOT_PROD",
text="Пайплайн завершен успешно",
)
send_message_test(channel_id, bot_id, text, dir_path=None, silent=False)
Локальный режим через <bot_id>.json.
mn.send_message_test(
channel_id="abc123channel",
bot_id="MM_BOT_LOCAL",
text="Локальная проверка",
dir_path="./secrets",
)
Файлы
send_file_bytes(...) -> dict
Загрузка байтов в канал.
png_bytes = open("dash.png", "rb").read()
info = mn.send_file_bytes(
channel_id="abc123channel",
bot_id="MM_BOT_PROD",
file_name="dash.png",
file_bytes=png_bytes,
text="Скриншот дашборда",
)
send_file(...) -> dict
Загрузка файла по пути.
mn.send_file(
channel_id="abc123channel",
bot_id="MM_BOT_PROD",
file_path="/tmp/report.csv",
text="Дневной отчет",
)
send_files(...) -> list[dict]
Отправка пачки файлов.
mn.send_files(
channel_id="abc123channel",
bot_id="MM_BOT_PROD",
file_paths=["/tmp/a.csv", "/tmp/b.csv"],
text="Пакет файлов",
)
DataFrame
send_dataframe_as_csv(...) -> dict
mn.send_dataframe_as_csv(
channel_id="abc123channel",
bot_id="MM_BOT_PROD",
df=df,
file_name="metrics.csv",
)
send_dataframe_preview(...) -> None
Отправляет текстовый preview (head) в код-блоке.
mn.send_dataframe_preview(
channel_id="abc123channel",
bot_id="MM_BOT_PROD",
df=df,
title="Проверка витрины",
max_rows=10,
max_cols=8,
)
Функции: Superset
Для большинства задач достаточно функций из этого раздела без ручных вызовов REST API.
Низкоуровневый запрос
superset_request(endpoint, method="GET", payload=None, vault_conn_id=...)
Использует Bearer token (password из секрета).
me = mn.superset_request(
endpoint="/api/v1/me/",
vault_conn_id="SUPSET_BEARER",
)
Клиент
get_superset_client(...)
Поддерживает:
superset_connкакconn_id, JSON-строку илиdict,auth_provider,security_cookie_auth_token.
client = mn.get_superset_client(superset_conn="SUPSET_LDAP")
Получение PNG
superset_dashboard_thumbnail(...) -> bytes
png = mn.superset_dashboard_thumbnail(
superset_conn="SUPSET_LDAP",
dashboard_name="Canary test dashboard",
refresh=True,
refresh_wait_sec=60,
thumb_size="2048,1536",
window_size="2048,1536",
)
with open("dashboard.png", "wb") as f:
f.write(png)
superset_chart_thumbnail(...) -> bytes
png = mn.superset_chart_thumbnail(
superset_conn="SUPSET_LDAP",
chart_id=31729,
force_refresh=True,
)
Сохранение пачкой на диск
superset_screenshot_dashboard(conn_id, dashboards, output_dir=".") -> list[str]
files = mn.superset_screenshot_dashboard(
conn_id="SUPSET_LDAP",
dashboards=[
{"dashboard_name": "Sales Main"},
{"dashboard_name": "Ops Main", "file_name": "ops_daily"},
],
output_dir="/tmp/superset_dash",
)
superset_screenshot_charts(conn_id, charts, output_dir=".") -> list[str]
files = mn.superset_screenshot_charts(
conn_id="SUPSET_LDAP",
charts=[
{"chart_id": 101},
{"chart_id": 102, "file_name": "finance_trend"},
],
output_dir="/tmp/superset_charts",
)
Отправка в Mattermost
send_superset_dashboard_screenshot(...) -> dict
mn.send_superset_dashboard_screenshot(
channel_id="mm_channel_1",
bot_id="MM_BOT_PROD",
superset_conn="SUPSET_LDAP",
dashboard_name="Sales Main",
text="Ежедневный скриншот",
)
send_superset_chart_screenshot(...) -> dict
mn.send_superset_chart_screenshot(
channel_id="mm_channel_1",
bot_id="MM_BOT_PROD",
superset_conn="SUPSET_LDAP",
chart_id=31729,
text="Ключевой график",
)
send_superset_dashboards_to_channels(...) -> list[dict]
mn.send_superset_dashboards_to_channels(
bot_id="MM_BOT_PROD",
superset_conn="SUPSET_LDAP",
default_channel_id="mm_default",
dashboards=[
{"dashboard_name": "Sales", "text": "Sales update"},
{"dashboard_name": "Ops", "channel_id": "mm_ops", "text": "Ops update"},
],
)
send_superset_charts_to_channels(...) -> list[dict]
mn.send_superset_charts_to_channels(
bot_id="MM_BOT_PROD",
superset_conn="SUPSET_LDAP",
charts=[
{"chart_id": 101, "channel_id": "mm_fin"},
{"chart_id": 202, "channel_id": "mm_ops", "text": "Второй чарт"},
],
)
Функции: SharePoint
sharepoint_download_file(...) -> bytes
Скачивает файл из SharePoint в память.
data = mn.sharepoint_download_file(
sharepoint_conn="SP_CONN",
file_url="/sites/analytics/Shared%20Documents/report.csv",
auth_type="ntlm", # ntlm | basic | none
)
sharepoint_download_to_file(...) -> str
path = mn.sharepoint_download_to_file(
sharepoint_conn="SP_CONN",
file_url="/sites/analytics/Shared%20Documents/report.csv",
output_path="/tmp/report.csv",
)
sharepoint_read_text(...) -> str
sql_text = mn.sharepoint_read_text(
sharepoint_conn="SP_CONN",
file_url="/sites/analytics/sql/load.sql",
)
sharepoint_read_dataframe(...) -> DataFrame
Поддерживает file_format: auto | csv | tsv | excel | json.
df = mn.sharepoint_read_dataframe(
sharepoint_conn="SP_CONN",
file_url="/sites/analytics/data/source.xlsx",
file_format="excel",
)
Wrapper-функции
sharepoint_read_csv(...)
df = mn.sharepoint_read_csv(
sharepoint_conn="SP_CONN",
file_url="/sites/analytics/data/source.csv",
)
sharepoint_read_excel(...)
df = mn.sharepoint_read_excel(
sharepoint_conn="SP_CONN",
file_url="/sites/analytics/data/source.xlsx",
)
sharepoint_read_json(...)
df = mn.sharepoint_read_json(
sharepoint_conn="SP_CONN",
file_url="/sites/analytics/data/source.json",
)
sharepoint_read_sql(...) -> str
sql_text = mn.sharepoint_read_sql(
sharepoint_conn="SP_CONN",
file_url="/sites/analytics/sql/transform.sql",
)
Массовая загрузка
sharepoint_download_many(...) -> dict
result = mn.sharepoint_download_many(
sharepoint_conn="SP_CONN",
files=[
{"file_url": "/sites/a/data/one.csv"},
{"url": "/sites/a/data/two.csv", "file_name": "two_local.csv"},
],
output_dir="/tmp/sp_data",
continue_on_error=True,
)
print(result["downloaded"])
print(result["failed"])
Удобные связки функций (готовые рецепты)
1) Минимум кода в DAG для коннектов
Для совместимости со старым паттерном get_connection_as_json:
from airflow.models import DAG
import mnemosynecore as mn
dag = DAG(
"my_dag",
user_defined_macros={"get_connection_as_json": mn.get_connection_as_json},
)
Зачем:
- оставляете старый контракт в SQL/Jinja,
- не копируете helper-функцию в каждый DAG.
2) SQL из SharePoint -> выполнение в Vertica
import mnemosynecore as mn
sql_text = mn.sharepoint_read_sql(
sharepoint_conn="SP_CONN",
file_url="/sites/analytics/sql/mart_load.sql",
)
for statement in mn.split_sql_statements(sql_text):
mn.vertica_sql(conn_id="VERTICA_PROD", sql=statement)
Зачем:
- централизованное хранение SQL в SharePoint,
- один и тот же код работает в разных дагах.
3) Скриншот дашборда Superset -> Mattermost
import mnemosynecore as mn
mn.send_superset_dashboard_screenshot(
channel_id="mm_sales",
bot_id="MM_BOT_PROD",
superset_conn="SUPSET_LDAP",
dashboard_name="Sales Main Dashboard",
text="Скриншот за {{ ds }}",
)
Зачем:
- no-code интеграция отчетов в командные каналы.
4) Скриншоты нескольких чартов в разные каналы
import mnemosynecore as mn
mn.send_superset_charts_to_channels(
bot_id="MM_BOT_PROD",
superset_conn="SUPSET_LDAP",
charts=[
{"chart_id": 101, "channel_id": "mm_fin", "text": "Финансы"},
{"chart_id": 202, "channel_id": "mm_ops", "text": "Операции"},
],
)
Зачем:
- одно место, где маршрутизируется аналитическая рассылка.
5) Quality check + алерт
import mnemosynecore as mn
count_bad = mn.vertica_select_scalar(
conn_id="VERTICA_PROD",
sql="SELECT COUNT(*) FROM mart.table WHERE metric < 0",
default=0,
)
if count_bad > 0:
mn.send_message(
channel_id="mm_alerts",
bot_id="MM_BOT_PROD",
text=f"Найдены аномалии: {count_bad}",
)
Зачем:
- простая и быстрая data-quality сигнализация.
6) Нестабильные интеграции c retry
import mnemosynecore as mn
payload = mn.retry_call(
mn.sharepoint_download_file,
sharepoint_conn="SP_CONN",
file_url="/sites/analytics/data/daily.csv",
attempts=4,
delay_sec=2,
backoff=2,
)
Зачем:
- меньше случайных падений DAG из-за временных сетевых проблем.
7) Результат запроса как CSV в канал
import mnemosynecore as mn
df = mn.vertica_select(
conn_id="VERTICA_PROD",
sql="SELECT * FROM mart.kpi_daily LIMIT 1000",
)
mn.send_dataframe_as_csv(
channel_id="mm_reports",
bot_id="MM_BOT_PROD",
df=df,
file_name="kpi_daily.csv",
text="Срез KPI",
)
Зачем:
- мгновенно отдать выборку бизнесу без внешних файловых хранилищ.
CI/CD и релизы
CI
Workflow: .github/workflows/ci.yml
Что делает на push/PR:
pytest -qpython -m buildpython -m twine check dist/*
Публикация при теге
Workflow: .github/workflows/publish.yml
При теге v*:
- билд и тесты,
- публикация в TestPyPI,
- публикация в PyPI.
Локальный релиз-скрипт
Файл: release.sh
Примеры:
# bump patch, загрузка в testpypi+pypi, commit+push
./release.sh --target both
# только testpypi
./release.sh --target testpypi
# dry run
./release.sh --target both --dry-run
# релиз конкретной версии и тега
./release.sh --version 1.2.0 --target both --tag
Скрипт автоматически:
- обновляет версию в
pyproject.toml, - прогоняет тесты,
- собирает дистрибутив,
- проверяет метаданные,
- публикует (по target),
- делает commit (и tag при
--tag).
Полный список публичного API
Импортируется из корня:
import mnemosynecore as mn
Секреты и коннекты:
get_connection_as_jsonget_connection_as_json_testget_secretget_secret_testresolve_secrethas_connectionget_secret_fieldrequire_secret_fieldsget_secret_with_defaultsun_conn
Retry:
retry_callretry
Vertica:
vertica_connvertica_sqlvertica_selectvertica_select_scalarread_sql_filesplit_sql_statementsvertica_sql_filevertica_sql_dirvertica_insert_dataframevertica_table_existsvertica_dedupevertica_upsertload_sql_tasks_from_dir
Mattermost:
send_messagesend_message_testsend_file_bytessend_filesend_filessend_dataframe_as_csvsend_dataframe_preview
Superset:
superset_requestget_superset_clientsuperset_dashboard_thumbnailsuperset_chart_thumbnailsuperset_screenshot_dashboardsuperset_screenshot_chartssend_superset_dashboard_screenshotsend_superset_chart_screenshotsend_superset_dashboards_to_channelssend_superset_charts_to_channels
SharePoint:
sharepoint_download_filesharepoint_download_to_filesharepoint_read_textsharepoint_read_dataframesharepoint_read_csvsharepoint_read_excelsharepoint_read_jsonsharepoint_read_sqlsharepoint_download_many
Deprecated/demo:
old_function(вызываетDeprecationWarning)
Практические рекомендации
- Для DAG-кода в проде используйте
conn_id, а не хардкод credentials. - Для локальной отладки используйте
resolve_secretи*_testфункции. - Перед тяжелыми интеграциями валидируйте секрет через
require_secret_fields. - Для нестабильных API оборачивайте вызовы в
retry_call/retry. - Для рассылок в Mattermost лучше отправлять и preview (
send_dataframe_preview), и полный CSV (send_dataframe_as_csv).
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file mnemosynecore-1.1.2.tar.gz.
File metadata
- Download URL: mnemosynecore-1.1.2.tar.gz
- Upload date:
- Size: 38.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
acb34a38db0f6ef5a3f644f391230f63977242e84e1c257c5d08503b760c11bf
|
|
| MD5 |
d761158269df77e182a6a7b23cc083b2
|
|
| BLAKE2b-256 |
9a5db53e6610955df189a74530eb66afc88fd6fff45e5867785739507bddf842
|
File details
Details for the file mnemosynecore-1.1.2-py3-none-any.whl.
File metadata
- Download URL: mnemosynecore-1.1.2-py3-none-any.whl
- Upload date:
- Size: 26.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1cebb6d53900a847ed73200495c9d84ed76269c23107d8732c22e5dd69b22046
|
|
| MD5 |
d60de00258b7ce8d33802db312fb4153
|
|
| BLAKE2b-256 |
98d78fe92d98593f8cccf6ca8b7696044bf3eb9764382dc845779ba3cefa7484
|