Internal analytics toolkit for data pipelines
Project description
Mnemosynecore
Библиотека mnemosynecore предоставляет набор утилит для работы с секретами, Mattermost, Vertica, (Superset и Clickhouse в будущем) и другими службами.
Источники конфигурации
Функции поддерживают три способа передачи конфигурации:
-
Vault / Airflow Connection — по conn_id
-
JSON-файл — локально (test-режим)
-
JSON-строка — напрямую в параметре bot_id
Формат конфигурации Mattermost
{
"host": "mattermost.example.com",
"password": "BOT_TOKEN",
"schema": "https",
"port": 443,
"basepath": "/api/v4"
}
Test-функции (локальная разработка)
get_mattermost_driver_test
def get_mattermost_driver_test(
bot_id: str,
dir_path: str | None = None,
) -> Driver
Описание
Создаёт и возвращает Mattermost Driver для локального тестирования, используя JSON-файл с конфигурацией.
Параметры
| Параметр | Тип | Описание |
|---|---|---|
bot_id |
str |
Имя JSON-файла без .json |
dir_path |
str | None |
Директория поиска файла |
Поиск файла
Файл ищется в:
-
dir_path (если указан)
-
текущей рабочей директории
Имя файла:
{bot_id}.json
Возвращает
mattermostdriver.Driver — авторизованный клиент
Пример
driver = get_mattermost_driver_test("MM_BOT", dir_path=".")
send_message_test
def send_message_test(
*,
channel_id: str,
bot_id: str,
text: str,
dir_path: str | None = None,
silent: bool = False,
) -> None
Описание
Отправляет сообщение в Mattermost в тестовом окружении.
Параметры
| Параметр | Тип | Описание |
|---|---|---|
channel_id |
str |
ID канала |
bot_id |
str |
Имя JSON-файла с конфигурацией |
text |
str |
Текст сообщения |
dir_path |
str | None |
Директория с JSON |
silent |
bool |
Не выводить лог |
Пример
send_message_test(
channel_id="abc123",
bot_id="MM_BOT",
text="Тестовое сообщение",
dir_path="."
)
Prod-функции
get_mattermost_driver
def get_mattermost_driver(bot_id: str) -> Driver
Описание
Создаёт Mattermost Driver для prod-окружения.
Источники конфигурации
-
Если bot_id — JSON-строка → используется напрямую
-
Иначе → get_secret(bot_id) (Vault)
Параметры
| Параметр | Тип | Описание |
|---|---|---|
bot_id |
str |
conn_id или JSON |
Возвращает mattermostdriver.Driver
Примеры
Через Vault
driver = get_mattermost_driver("MM_BOT_PROD")
Через JSON
driver = get_mattermost_driver("""
{
"host": "mm.company.ru",
"password": "TOKEN"
}
""")
send_message
def send_message(
*,
channel_id: str,
bot_id: str,
text: str,
silent: bool = False,
) -> None
Описание
Отправляет сообщение в Mattermost (prod).
Параметры
| Параметр | Тип | Описание |
|---|---|---|
channel_id |
str |
ID канала |
bot_id |
str |
conn_id или JSON |
text |
str |
Текст сообщения |
silent |
bool |
Не логировать |
Поведение
- Логирует успешную отправку
- Пробрасывает исключения при ошибке
Пример
send_message(
channel_id="channel123",
bot_id="MM_BOT_PROD",
text="Сообщение из prod"
)
Вспомогательные функции
get_mattermost_conn
def get_mattermost_conn(conn_id: str) -> dict
Описание
Извлекает конфигурацию Mattermost из Airflow Connection.
Параметры
| Параметр | Тип | Описание |
|---|---|---|
conn_id |
str |
Airflow connection ID |
Возвращает
{
"host": str,
"password": str,
"schema": str,
"port": int,
"basepath": str
}
get_connection_as_json
def get_connection_as_json(conn_name: str) -> str
Описание
Получает Airflow Connection и возвращает его в виде JSON-строки.
Параметры
| Параметр | Тип | Описание |
|---|---|---|
conn_name |
str |
Airflow connection name |
Возвращает JSON (str)
Пример
cfg_json = get_connection_as_json("MM_BOT")
Работа с секретами (Vault / ENV / локальные файлы)
resolve_secret
def resolve_secret(
conn_id: str,
dir_path: Optional[str] = None,
) -> Dict
Назначение
Универсальный резолвер секрета:
- Пытается загрузить локальный JSON (test-режим)
- Если не найден — берёт секрет из Vault / окружения
Используется для:
- одинакового кода в local / test / prod.
Параметры
-
conn_id — имя секрета
-
dir_path — директория для поиска {conn_id}.json
Возвращает
- dict — распаршенный JSON секрета
get_connection_as_json
def get_connection_as_json(conn_id: str) -> str
Назначение
Получает секрет в виде JSON-строки:
-
из переменной окружения ENV[conn_id]
-
из Vault (через VaultClient)
Падает с ошибкой, если секрет не найден или Vault недоступен.
Возвращает
- str — JSON-конфигурация
get_secret
def get_secret(conn_id: str) -> Dict
Назначение
-
Обёртка над get_connection_as_json с автоматическим json.loads.
-
Используется в prod-коде.
Возвращает
- dict — конфигурация секрета
Test-режим (локальные JSON-файлы)
get_connection_as_json_test
def get_connection_as_json_test(
conn_id: str,
dir_path: str | None = None,
) -> str
Назначение
Читает тестовый секрет из файла:
{conn_id}.json
Порядок поиска
-
dir_path (если указан)
-
текущая рабочая директория
Если файл не найден → FileNotFoundError
Возвращает
- str — содержимое JSON-файла
get_secret_test
def get_secret_test(conn_id: str, dir_path: str | None = None) -> Dict
Назначение
Test-версия get_secret:
-
читает локальный JSON-файл
-
парсит в dict
Используется для:
-
ноутбуков
-
локальной разработки
Возвращает
- dict — конфигурация секрета
Airflow / SQL utilities
load_sql_tasks_from_dir
def load_sql_tasks_from_dir(dir_sql: str, vertica_conn_id: str)
Назначение
Автоматически создаёт Airflow-таски (VerticaOperator) из .sql-файлов в директории.
Как работает
-
читает все .sql файлы
-
делит SQL по ;
-
создаёт по одному таску на файл
-
привязывает к текущему DAG
Возвращает: dict[str, VerticaOperator]
read_sql_file
def read_sql_file(file_path: str)
Назначение
Читает SQL-файл в строку.
Возвращает
-
str — содержимое файла
-
None — если файл не найден
vertica_dedupe
def vertica_dedupe(
table_name: str,
unique_keys: Union[str, List[str]],
conn: Optional[Connection] = None,
conn_id: Optional[str] = None,
date_col: Optional[str] = None,
keep: str = "last",
commit: bool = True
)
Назначение
Удаляет дубликаты строк в таблице Vertica.
Логика
-
группировка по unique_keys
-
оставляет одну строку
-
при date_col + keep="last" сохраняет самую свежую
Используется для: пост-загрузочной чистки данных.
vertica_upsert
def vertica_upsert(
df: pd.DataFrame,
table_name: str,
unique_keys: Union[str, List[str]],
conn,
date_col: Optional[str] = None,
days_back: Optional[int] = None,
commit: bool = True
)
Назначение
Upsert данных из DataFrame в таблицу Vertica.
Как работает
-
создаёт temp-таблицу
-
заливает туда df
-
опционально чистит данные за days_back
-
выполняет MERGE
Подходит для: инкрементальных загрузок.
vertica_conn
def vertica_conn(conn_id: str) -> Connection
Назначение
Создаёт Vertica-соединение из секрета (Vault / ENV).
Возвращает
- vertica_python.Connection
get_vertica_engine
def get_vertica_engine(conn_id: str)
Назначение
Создаёт SQLAlchemy Engine для Vertica.
Используется для
-
pandas.to_sql
-
ORM / Core запросов
vertica_sql
def vertica_sql(
*,
conn_id: Optional[str] = None,
conn: Optional[Connection] = None,
sql: str,
params: Optional[Iterable[Any]] = None,
many: bool = False,
commit: bool = True
)
Назначение
Универсальный executor SQL-запросов.
Поддерживает
-
execute / executemany
-
автокоммит
-
rollback при ошибке
Используется как: базовый слой для всех операций.
vertica_select
def vertica_select(
*,
conn_id: Optional[str] = None,
conn: Optional[Connection] = None,
sql: str,
params: Optional[Iterable[Any]] = None
) -> pd.DataFrame
Назначение
Выполняет SELECT и возвращает результат в DataFrame.
Возвращает
- pd.DataFrame
Используется для: аналитических и сервисных запросов.
Общий паттерн использования
df = vertica_select(
conn_id="VERTICA_CONN",
sql="SELECT * FROM my_table"
)
vertica_sql(
conn_id="VERTICA_CONN",
sql="DELETE FROM my_table WHERE dt < CURRENT_DATE - 30"
)
un_conn
Использование Vertica
engine = un_conn("conn_id", "vertica")
result = engine.execute("SELECT COUNT(*) FROM my_table")
Использование Mattermost
driver = un_conn("CBA_send_review", "mattermost")
driver.posts.create_post({
"channel_id": "s5c11srqkf8j3pbdwfbn9imrde",
"message": "Привет, мир!"
})
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file mnemosynecore-1.0.3.tar.gz.
File metadata
- Download URL: mnemosynecore-1.0.3.tar.gz
- Upload date:
- Size: 13.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ef72ec3d1f88154f946a5e4d305cf869c229d75e36d50d0bd90d306a460931ba
|
|
| MD5 |
3299163e5e4c3a6b721b2b6eef51e672
|
|
| BLAKE2b-256 |
0d8b7d83bb1cbe2f30682ee214c8e7b8ccc306eebd41598a6b4022b3786bd54c
|
Provenance
The following attestation bundles were made for mnemosynecore-1.0.3.tar.gz:
Publisher:
publish.yml on ASAFGracia/mnemosynecore
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
mnemosynecore-1.0.3.tar.gz -
Subject digest:
ef72ec3d1f88154f946a5e4d305cf869c229d75e36d50d0bd90d306a460931ba - Sigstore transparency entry: 919136575
- Sigstore integration time:
-
Permalink:
ASAFGracia/mnemosynecore@100a11c85c1a08bd6d8f3ed013064c53abc0acd4 -
Branch / Tag:
refs/tags/v1.0.8 - Owner: https://github.com/ASAFGracia
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@100a11c85c1a08bd6d8f3ed013064c53abc0acd4 -
Trigger Event:
push
-
Statement type:
File details
Details for the file mnemosynecore-1.0.3-py3-none-any.whl.
File metadata
- Download URL: mnemosynecore-1.0.3-py3-none-any.whl
- Upload date:
- Size: 12.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d710e41b628465fd97b2a7717c488c6b1ac96ce0009c6e696df4b7fea056f700
|
|
| MD5 |
97cccb8f87c7067cb9befff157b6e863
|
|
| BLAKE2b-256 |
26eb2d46584d7222c1665cb6100f2393205f0be63c80db4cf20f8a9a9dc81e7b
|
Provenance
The following attestation bundles were made for mnemosynecore-1.0.3-py3-none-any.whl:
Publisher:
publish.yml on ASAFGracia/mnemosynecore
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
mnemosynecore-1.0.3-py3-none-any.whl -
Subject digest:
d710e41b628465fd97b2a7717c488c6b1ac96ce0009c6e696df4b7fea056f700 - Sigstore transparency entry: 919136598
- Sigstore integration time:
-
Permalink:
ASAFGracia/mnemosynecore@100a11c85c1a08bd6d8f3ed013064c53abc0acd4 -
Branch / Tag:
refs/tags/v1.0.8 - Owner: https://github.com/ASAFGracia
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@100a11c85c1a08bd6d8f3ed013064c53abc0acd4 -
Trigger Event:
push
-
Statement type: