Internal analytics toolkit for data pipelines
Project description
mnemosynecore Documentation
Общее описание
Библиотека mnemosynecore предоставляет набор инструментов для работы с Vertica БД, Airflow, Mattermost и другими системами аналитики. Она предназначена для автоматизации работы с данными и интеграции различных сервисов.
Установка
pip install mnemosynecore
Для Airflow:
pip install mnemosynecore[airflow]
Основные модули
1. Работа с Vertica (mnemosynecore.db.vertica)
Функции для работы с БД
vertica_conn(conn_id) - Создание подключения к Vertica
from mnemosynecore.db.vertica import vertica_conn
# Подключение к Vertica
conn = vertica_conn("VERTICA_CONN_ID")
# Использование соединения
result = vertica_sql(conn=conn, sql="SELECT * FROM table")
conn.close()
**vertica_sql(kwargs) - Выполнение SQL запросов
from mnemosynecore.db.vertica import vertica_sql
# Выполнение запроса с ID подключения
vertica_sql(
conn_id="VERTICA_CONN_ID",
sql="INSERT INTO table VALUES (%s, %s)",
params=[1, "test"]
)
# Выполнение запроса с готовым соединением
vertica_sql(conn=conn, sql="UPDATE table SET col = %s WHERE id = %s", params=[100, 1])
**vertica_select(kwargs) - Выполнение SELECT запросов
from mnemosynecore.db.vertica import vertica_select
# Получение данных в DataFrame
df = vertica_select(
conn_id="VERTICA_CONN_ID",
sql="SELECT * FROM table WHERE id = %s",
params=[123]
)
print(df.head())
**vertica_dedupe(table_name, unique_keys, kwargs) - Удаление дубликатов
from mnemosynecore.db.vertica import vertica_dedupe
# Удаление дубликатов по одному ключу
vertica_dedupe(
table_name="schema.table",
unique_keys="id",
conn_id="VERTICA_CONN_ID"
)
# Удаление дубликатов по нескольким ключам с учетом даты
vertica_dedupe(
table_name="schema.table",
unique_keys=["id", "date"],
conn_id="VERTICA_CONN_ID",
date_col="created_at",
keep="last"
)
**vertica_upsert(df, table_name, unique_keys, kwargs) - Upsert операции
from mnemosynecore.db.vertica import vertica_upsert
import pandas as pd
# Подготовка данных
df = pd.DataFrame({
'id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Charlie'],
'value': [100, 200, 300]
})
# Upsert данных
vertica_upsert(
df=df,
table_name="schema.table",
unique_keys=["id"],
conn_id="VERTICA_CONN_ID"
)
load_sql_tasks_from_dir(dir_sql, vertica_conn_id) - Создание задач из SQL файлов
from mnemosynecore.db.vertica import load_sql_tasks_from_dir
# Создание задач из директории SQL файлов
tasks = load_sql_tasks_from_dir("/path/to/sql/files", "VERTICA_CONN_ID")
# Использование в Airflow DAG
from airflow import DAG
from airflow.utils.task_group import TaskGroup
with DAG("my_dag") as dag:
with TaskGroup("vertica_tasks") as vertica_group:
for task in tasks.values():
task
read_sql_file(file_path) - Чтение SQL файлов
from mnemosynecore.db.vertica import read_sql_file
# Чтение SQL файла
sql_content = read_sql_file("/path/to/query.sql")
if sql_content:
print("SQL загружен успешно")
2. Работа с секретами (mnemosynecore.vault)
get_secret(conn_id) - Получение секретов
from mnemosynecore.vault import get_secret
# Получение секрета из Vault
secret = get_secret("SECRET_ID")
print(secret["host"]) # Хост базы данных
print(secret["password"]) # Пароль
3. Интеграция с Mattermost (mnemosynecore.mattermost)
send_message(channel_id, bot_id, text, silent=False) - Отправка сообщений
from mnemosynecore.mattermost import send_message
# Отправка сообщения в канал
send_message(
channel_id="s5c11srqkf8j3pbdwfbn9imrde",
bot_id="MATTERMOST_BOT_ID",
text="Привет! Это тестовое сообщение"
)
# Отправка Markdown сообщения
send_message(
channel_id="s5c11srqkf8j3pbdwfbn9imrde",
bot_id="MATTERMOST_BOT_ID",
text="""
📊 **Отчет по данным** 📊
- Данные успешно загружены
- Обработано записей: 1000
- Время выполнения: 2 минуты
[Подробнее](https://your-dashboard.com)
"""
)
4. Интеграция с Superset (mnemosynecore.superset)
superset_request(endpoint, method="GET", payload=None, vault_conn_id) - Запросы к Superset
from mnemosynecore.superset import superset_request
# Получение информации о дашборде
response = superset_request(
endpoint="/api/v1/dashboard/123",
method="GET",
vault_conn_id="SUPERSET_CONN_ID"
)
# Создание нового дашборда
new_dashboard = superset_request(
endpoint="/api/v1/dashboard/",
method="POST",
payload={"name": "New Dashboard"},
vault_conn_id="SUPERSET_CONN_ID"
)
Конфигурация секретов
Формат секретов для Vault
Для Vertica:
{
"host": "vertica-host.com",
"port": "5433",
"login": "username",
"password": "password",
"schema": "database_schema"
}
Для Mattermost:
{
"host": "https://mattermost.company.com",
"password": "bot_token_here",
"scheme": "https",
"port": 443,
"basepath": "/api/v4"
}
Для Superset:
{
"host": "https://superset.company.com",
"password": "access_token_here",
"scheme": "https",
"port": 443,
"basepath": "/api/v1"
}
Примеры использования
Пример 1: Полный пайплайн работы с Vertica
from mnemosynecore.db.vertica import vertica_conn, vertica_sql, vertica_select, vertica_upsert
from mnemosynecore.mattermost import send_message
import pandas as pd
def process_data_pipeline():
# Подключение к Vertica
conn = vertica_conn("VERTICA_CONN_ID")
try:
# Выполнение запроса
df = vertica_select(
conn=conn,
sql="SELECT * FROM source_table WHERE date > %s",
params=['2023-01-01']
)
# Обработка данных
processed_df = df.groupby('category').sum()
# Upsert в целевую таблицу
vertica_upsert(
df=processed_df,
table_name="analytics.summary",
unique_keys=["category"],
conn=conn
)
# Отправка уведомления
send_message(
channel_id="s5c11srqkf8j3pbdwfbn9imrde",
bot_id="MATTERMOST_BOT_ID",
text="✅ Пайплайн выполнен успешно"
)
except Exception as e:
send_message(
channel_id="s5c11srqkf8j3pbdwfbn9imrde",
bot_id="MATTERMOST_BOT_ID",
text=f"❌ Ошибка в пайплайне: {str(e)}"
)
raise
finally:
conn.close()
Пример 2: Автоматическая очистка дубликатов
from mnemosynecore.db.vertica import vertica_dedupe
def remove_duplicates():
vertica_dedupe(
table_name="analytics.user_events",
unique_keys=["user_id", "event_time"],
conn_id="VERTICA_CONN_ID",
date_col="event_time",
keep="last"
)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release.See tutorial on generating distribution archives.
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file mnemosynecoretest-0.0.1-py3-none-any.whl.
File metadata
- Download URL: mnemosynecoretest-0.0.1-py3-none-any.whl
- Upload date:
- Size: 11.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ab6923ab1db2d8f65874de4babc31835a3876a485126ce9f8fdd55f014ca978c
|
|
| MD5 |
aaa5066609281a8e2606c3e15b844d82
|
|
| BLAKE2b-256 |
2597ba11a64888a039d5738068924bf14f693b840927cef79c4713282c59360a
|