Skip to main content

Internal analytics toolkit for data pipelines

Project description

mnemosynecore Documentation

Общее описание

Библиотека mnemosynecore предоставляет набор инструментов для работы с Vertica БД, Airflow, Mattermost и другими системами аналитики. Она предназначена для автоматизации работы с данными и интеграции различных сервисов.

Установка

pip install mnemosynecore

Для Airflow:
pip install mnemosynecore[airflow]

Основные модули
1. Работа с Vertica (mnemosynecore.db.vertica)
Функции для работы с БД
vertica_conn(conn_id) - Создание подключения к Vertica
from mnemosynecore.db.vertica import vertica_conn

# Подключение к Vertica
conn = vertica_conn("VERTICA_CONN_ID")
# Использование соединения
result = vertica_sql(conn=conn, sql="SELECT * FROM table")
conn.close()

**vertica_sql(kwargs) - Выполнение SQL запросов

from mnemosynecore.db.vertica import vertica_sql

# Выполнение запроса с ID подключения
vertica_sql(
    conn_id="VERTICA_CONN_ID",
    sql="INSERT INTO table VALUES (%s, %s)",
    params=[1, "test"]
)

# Выполнение запроса с готовым соединением
vertica_sql(conn=conn, sql="UPDATE table SET col = %s WHERE id = %s", params=[100, 1])

**vertica_select(kwargs) - Выполнение SELECT запросов

from mnemosynecore.db.vertica import vertica_select

# Получение данных в DataFrame
df = vertica_select(
    conn_id="VERTICA_CONN_ID",
    sql="SELECT * FROM table WHERE id = %s",
    params=[123]
)
print(df.head())

**vertica_dedupe(table_name, unique_keys, kwargs) - Удаление дубликатов

from mnemosynecore.db.vertica import vertica_dedupe

# Удаление дубликатов по одному ключу
vertica_dedupe(
    table_name="schema.table",
    unique_keys="id",
    conn_id="VERTICA_CONN_ID"
)

# Удаление дубликатов по нескольким ключам с учетом даты
vertica_dedupe(
    table_name="schema.table",
    unique_keys=["id", "date"],
    conn_id="VERTICA_CONN_ID",
    date_col="created_at",
    keep="last"
)

**vertica_upsert(df, table_name, unique_keys, kwargs) - Upsert операции

from mnemosynecore.db.vertica import vertica_upsert
import pandas as pd

# Подготовка данных
df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'value': [100, 200, 300]
})

# Upsert данных
vertica_upsert(
    df=df,
    table_name="schema.table",
    unique_keys=["id"],
    conn_id="VERTICA_CONN_ID"
)

load_sql_tasks_from_dir(dir_sql, vertica_conn_id) - Создание задач из SQL файлов

from mnemosynecore.db.vertica import load_sql_tasks_from_dir

# Создание задач из директории SQL файлов
tasks = load_sql_tasks_from_dir("/path/to/sql/files", "VERTICA_CONN_ID")

# Использование в Airflow DAG
from airflow import DAG
from airflow.utils.task_group import TaskGroup

with DAG("my_dag") as dag:
    with TaskGroup("vertica_tasks") as vertica_group:
        for task in tasks.values():
            task

read_sql_file(file_path) - Чтение SQL файлов
from mnemosynecore.db.vertica import read_sql_file

# Чтение SQL файла
sql_content = read_sql_file("/path/to/query.sql")
if sql_content:
    print("SQL загружен успешно")

2. Работа с секретами (mnemosynecore.vault)
get_secret(conn_id) - Получение секретов
from mnemosynecore.vault import get_secret

# Получение секрета из Vault
secret = get_secret("SECRET_ID")
print(secret["host"])  # Хост базы данных
print(secret["password"])  # Пароль

3. Интеграция с Mattermost (mnemosynecore.mattermost)
send_message(channel_id, bot_id, text, silent=False) - Отправка сообщений

from mnemosynecore.mattermost import send_message

# Отправка сообщения в канал
send_message(
    channel_id="s5c11srqkf8j3pbdwfbn9imrde",
    bot_id="MATTERMOST_BOT_ID",
    text="Привет! Это тестовое сообщение"
)

# Отправка Markdown сообщения
send_message(
    channel_id="s5c11srqkf8j3pbdwfbn9imrde",
    bot_id="MATTERMOST_BOT_ID",
    text="""
    📊 **Отчет по данным** 📊
    
    - Данные успешно загружены
    - Обработано записей: 1000
    - Время выполнения: 2 минуты
    
    [Подробнее](https://your-dashboard.com)
    """
)

4. Интеграция с Superset (mnemosynecore.superset)
superset_request(endpoint, method="GET", payload=None, vault_conn_id) - Запросы к Superset
from mnemosynecore.superset import superset_request

# Получение информации о дашборде
response = superset_request(
    endpoint="/api/v1/dashboard/123",
    method="GET",
    vault_conn_id="SUPERSET_CONN_ID"
)

# Создание нового дашборда
new_dashboard = superset_request(
    endpoint="/api/v1/dashboard/",
    method="POST",
    payload={"name": "New Dashboard"},
    vault_conn_id="SUPERSET_CONN_ID"
)

Конфигурация секретов
Формат секретов для Vault
Для Vertica:
{
    "host": "vertica-host.com",
    "port": "5433",
    "login": "username",
    "password": "password",
    "schema": "database_schema"
}

Для Mattermost:
{
    "host": "https://mattermost.company.com",
    "password": "bot_token_here",
    "scheme": "https",
    "port": 443,
    "basepath": "/api/v4"
}

Для Superset:
{
    "host": "https://superset.company.com",
    "password": "access_token_here",
    "scheme": "https",
    "port": 443,
    "basepath": "/api/v1"
}

Примеры использования
Пример 1: Полный пайплайн работы с Vertica
from mnemosynecore.db.vertica import vertica_conn, vertica_sql, vertica_select, vertica_upsert
from mnemosynecore.mattermost import send_message
import pandas as pd

def process_data_pipeline():
    # Подключение к Vertica
    conn = vertica_conn("VERTICA_CONN_ID")
    
    try:
        # Выполнение запроса
        df = vertica_select(
            conn=conn,
            sql="SELECT * FROM source_table WHERE date > %s",
            params=['2023-01-01']
        )
        
        # Обработка данных
        processed_df = df.groupby('category').sum()
        
        # Upsert в целевую таблицу
        vertica_upsert(
            df=processed_df,
            table_name="analytics.summary",
            unique_keys=["category"],
            conn=conn
        )
        
        # Отправка уведомления
        send_message(
            channel_id="s5c11srqkf8j3pbdwfbn9imrde",
            bot_id="MATTERMOST_BOT_ID",
            text="✅ Пайплайн выполнен успешно"
        )
        
    except Exception as e:
        send_message(
            channel_id="s5c11srqkf8j3pbdwfbn9imrde",
            bot_id="MATTERMOST_BOT_ID",
            text=f"❌ Ошибка в пайплайне: {str(e)}"
        )
        raise
    finally:
        conn.close()

Пример 2: Автоматическая очистка дубликатов

from mnemosynecore.db.vertica import vertica_dedupe

def remove_duplicates():
    vertica_dedupe(
        table_name="analytics.user_events",
        unique_keys=["user_id", "event_time"],
        conn_id="VERTICA_CONN_ID",
        date_col="event_time",
        keep="last"
    )

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mnemosynecore-0.1.3-py3-none-any.whl (10.9 kB view details)

Uploaded Python 3

File details

Details for the file mnemosynecore-0.1.3-py3-none-any.whl.

File metadata

  • Download URL: mnemosynecore-0.1.3-py3-none-any.whl
  • Upload date:
  • Size: 10.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for mnemosynecore-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 daa63e8ac3c55daef0ec94627be3b5e9202fa991524ff657866646938287f56e
MD5 6eee0c3636f4249f8b36615f17efe133
BLAKE2b-256 5a9eed0b8bd4cfb6a20e8d586a328993a07b204fe14ca860e3ca41457b183417

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page