Skip to main content

Internal analytics toolkit for data pipelines

Project description

mnemosynecore Documentation

Общее описание

Библиотека mnemosynecore предоставляет набор инструментов для работы с Vertica БД, Airflow, Mattermost и другими системами аналитики. Она предназначена для автоматизации работы с данными и интеграции различных сервисов.

Установка

pip install mnemosynecore

Для Airflow:
pip install mnemosynecore[airflow]

Основные модули
1. Работа с Vertica (mnemosynecore.db.vertica)
Функции для работы с БД
vertica_conn(conn_id) - Создание подключения к Vertica
from mnemosynecore.db.vertica import vertica_conn

# Подключение к Vertica
conn = vertica_conn("VERTICA_CONN_ID")
# Использование соединения
result = vertica_sql(conn=conn, sql="SELECT * FROM table")
conn.close()

**vertica_sql(kwargs) - Выполнение SQL запросов

from mnemosynecore.db.vertica import vertica_sql

# Выполнение запроса с ID подключения
vertica_sql(
    conn_id="VERTICA_CONN_ID",
    sql="INSERT INTO table VALUES (%s, %s)",
    params=[1, "test"]
)

# Выполнение запроса с готовым соединением
vertica_sql(conn=conn, sql="UPDATE table SET col = %s WHERE id = %s", params=[100, 1])

**vertica_select(kwargs) - Выполнение SELECT запросов

from mnemosynecore.db.vertica import vertica_select

# Получение данных в DataFrame
df = vertica_select(
    conn_id="VERTICA_CONN_ID",
    sql="SELECT * FROM table WHERE id = %s",
    params=[123]
)
print(df.head())

**vertica_dedupe(table_name, unique_keys, kwargs) - Удаление дубликатов

from mnemosynecore.db.vertica import vertica_dedupe

# Удаление дубликатов по одному ключу
vertica_dedupe(
    table_name="schema.table",
    unique_keys="id",
    conn_id="VERTICA_CONN_ID"
)

# Удаление дубликатов по нескольким ключам с учетом даты
vertica_dedupe(
    table_name="schema.table",
    unique_keys=["id", "date"],
    conn_id="VERTICA_CONN_ID",
    date_col="created_at",
    keep="last"
)

**vertica_upsert(df, table_name, unique_keys, kwargs) - Upsert операции

from mnemosynecore.db.vertica import vertica_upsert
import pandas as pd

# Подготовка данных
df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'value': [100, 200, 300]
})

# Upsert данных
vertica_upsert(
    df=df,
    table_name="schema.table",
    unique_keys=["id"],
    conn_id="VERTICA_CONN_ID"
)

load_sql_tasks_from_dir(dir_sql, vertica_conn_id) - Создание задач из SQL файлов

from mnemosynecore.db.vertica import load_sql_tasks_from_dir

# Создание задач из директории SQL файлов
tasks = load_sql_tasks_from_dir("/path/to/sql/files", "VERTICA_CONN_ID")

# Использование в Airflow DAG
from airflow import DAG
from airflow.utils.task_group import TaskGroup

with DAG("my_dag") as dag:
    with TaskGroup("vertica_tasks") as vertica_group:
        for task in tasks.values():
            task

read_sql_file(file_path) - Чтение SQL файлов
from mnemosynecore.db.vertica import read_sql_file

# Чтение SQL файла
sql_content = read_sql_file("/path/to/query.sql")
if sql_content:
    print("SQL загружен успешно")

2. Работа с секретами (mnemosynecore.vault)
get_secret(conn_id) - Получение секретов
from mnemosynecore.vault import get_secret

# Получение секрета из Vault
secret = get_secret("SECRET_ID")
print(secret["host"])  # Хост базы данных
print(secret["password"])  # Пароль

3. Интеграция с Mattermost (mnemosynecore.mattermost)
send_message(channel_id, bot_id, text, silent=False) - Отправка сообщений

from mnemosynecore.mattermost import send_message

# Отправка сообщения в канал
send_message(
    channel_id="s5c11srqkf8j3pbdwfbn9imrde",
    bot_id="MATTERMOST_BOT_ID",
    text="Привет! Это тестовое сообщение"
)

# Отправка Markdown сообщения
send_message(
    channel_id="s5c11srqkf8j3pbdwfbn9imrde",
    bot_id="MATTERMOST_BOT_ID",
    text="""
    📊 **Отчет по данным** 📊
    
    - Данные успешно загружены
    - Обработано записей: 1000
    - Время выполнения: 2 минуты
    
    [Подробнее](https://your-dashboard.com)
    """
)

4. Интеграция с Superset (mnemosynecore.superset)
superset_request(endpoint, method="GET", payload=None, vault_conn_id) - Запросы к Superset
from mnemosynecore.superset import superset_request

# Получение информации о дашборде
response = superset_request(
    endpoint="/api/v1/dashboard/123",
    method="GET",
    vault_conn_id="SUPERSET_CONN_ID"
)

# Создание нового дашборда
new_dashboard = superset_request(
    endpoint="/api/v1/dashboard/",
    method="POST",
    payload={"name": "New Dashboard"},
    vault_conn_id="SUPERSET_CONN_ID"
)

Конфигурация секретов
Формат секретов для Vault
Для Vertica:
{
    "host": "vertica-host.com",
    "port": "5433",
    "login": "username",
    "password": "password",
    "schema": "database_schema"
}

Для Mattermost:
{
    "host": "https://mattermost.company.com",
    "password": "bot_token_here",
    "scheme": "https",
    "port": 443,
    "basepath": "/api/v4"
}

Для Superset:
{
    "host": "https://superset.company.com",
    "password": "access_token_here",
    "scheme": "https",
    "port": 443,
    "basepath": "/api/v1"
}

Примеры использования
Пример 1: Полный пайплайн работы с Vertica
from mnemosynecore.db.vertica import vertica_conn, vertica_sql, vertica_select, vertica_upsert
from mnemosynecore.mattermost import send_message
import pandas as pd

def process_data_pipeline():
    # Подключение к Vertica
    conn = vertica_conn("VERTICA_CONN_ID")
    
    try:
        # Выполнение запроса
        df = vertica_select(
            conn=conn,
            sql="SELECT * FROM source_table WHERE date > %s",
            params=['2023-01-01']
        )
        
        # Обработка данных
        processed_df = df.groupby('category').sum()
        
        # Upsert в целевую таблицу
        vertica_upsert(
            df=processed_df,
            table_name="analytics.summary",
            unique_keys=["category"],
            conn=conn
        )
        
        # Отправка уведомления
        send_message(
            channel_id="s5c11srqkf8j3pbdwfbn9imrde",
            bot_id="MATTERMOST_BOT_ID",
            text="✅ Пайплайн выполнен успешно"
        )
        
    except Exception as e:
        send_message(
            channel_id="s5c11srqkf8j3pbdwfbn9imrde",
            bot_id="MATTERMOST_BOT_ID",
            text=f"❌ Ошибка в пайплайне: {str(e)}"
        )
        raise
    finally:
        conn.close()

Пример 2: Автоматическая очистка дубликатов

from mnemosynecore.db.vertica import vertica_dedupe

def remove_duplicates():
    vertica_dedupe(
        table_name="analytics.user_events",
        unique_keys=["user_id", "event_time"],
        conn_id="VERTICA_CONN_ID",
        date_col="event_time",
        keep="last"
    )

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mnemosynecore-0.1.6-py3-none-any.whl (10.9 kB view details)

Uploaded Python 3

File details

Details for the file mnemosynecore-0.1.6-py3-none-any.whl.

File metadata

  • Download URL: mnemosynecore-0.1.6-py3-none-any.whl
  • Upload date:
  • Size: 10.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for mnemosynecore-0.1.6-py3-none-any.whl
Algorithm Hash digest
SHA256 92988e531ec4d70bcfad231b0b33b19265805782cffcf629188af1c1abe7ea2d
MD5 2bf639dee4711b70930d96bb3d9e1cda
BLAKE2b-256 e5604a75b43bef1dc2d383d805be9008083e9a780a98f660fa4ca682027f3f9e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page