
# CPILake_Utils

Reusable common utility functions, including email and hash functions.

## Introduction

A lightweight internal Python library providing reusable email-sending utilities and helper functions for Fabric notebooks.

This package is distributed as a Python wheel (.whl) and can be used across notebooks, scripts, or internal pipelines.

## Features

- Send emails via a simple POST API
- Supports multiple recipients
- Treats HTTP 200, 201, and 202 as successful response codes (see the sketch below)
- Minimal dependencies (`requests` only)
- Includes a pure-Python hash function for alphanumeric strings
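For illustration, the success-code handling amounts to a membership check like the sketch below (the names here are illustrative, not the package's actual internals):

```python
import requests

# Illustrative sketch only: the package accepts 200, 201, and 202 as success.
SUCCESS_CODES = {200, 201, 202}

def is_success(response: requests.Response) -> bool:
    return response.status_code in SUCCESS_CODES
```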

## Package Structure

```
common_functions/            <- root folder
├─ CPILake_Utils/            <- package folder
│  ├─ __init__.py            <- import all functions here
│  └─ CPILake_Utils.py       <- all utilities go here (email, hash, etc.)
├─ pyproject.toml
└─ README.md
```

## Installation

Install the wheel file directly:

```bash
pip install CPILake-Utils-1.0.0-py3-none-any.whl
```

Or build and upload the wheel yourself:

```bash
pip install build                                        # Install the build tool
python -m build                                          # Creates the .whl file in dist/
twine upload dist/CPILake-Utils-1.0.0-py3-none-any.whl   # Upload the built wheel
```

## Usage Example

Install from PyPI, then import the functions you need:

```bash
pip install CPILake-Utils==1.0.0
```

```python
from CPILake_Utils import send_email_no_attachment, hash_function, send_email_no_attachment_01
```

## Functions Overview

### 1. `hash_function(s)`

**Objective:**  
Generate a deterministic numeric hash for alphanumeric strings. This can be used for creating unique identifiers, simplified comparisons, or internal keys.

**Parameters:**

- `s` (`str | None`) – Input string to hash.
  - If `None`, the function returns `None`.
  - Non-string inputs are automatically converted to strings.

**Returns:**

- `int` – A numeric hash value derived from the alphanumeric content of the string.
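For reference, a minimal pure-Python sketch consistent with the behavior described above (the package's actual algorithm may differ):

```python
# Hypothetical implementation sketch, not the package's actual code.
def hash_function(s):
    if s is None:
        return None          # None passes through unchanged
    s = str(s)               # non-string inputs are converted to strings
    h = 0
    for ch in s:
        if ch.isalnum():     # only alphanumeric characters contribute
            h = (h * 31 + ord(ch)) % (1 << 63)
    return h
```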

**Example Usage:**

```python
from CPILake_Utils import hash_function

text = "AB12-xyz"
hash_value = hash_function(text)
print(hash_value)
# Output: a deterministic integer derived from the string
```


### 2. `send_email_via_http(...)`

**Objective:**  
Send emails from a Fabric notebook using an HTTP endpoint, authenticating via a platform SPN certificate.  
Supports optional Spark/Pandas DataFrame rendering as HTML in the email body or as attachments.

---

**Parameters:**  

| Parameter | Type | Description |
|-----------|------|-------------|
| `body` | `str` | Email body content (HTML or plain text). |
| `to` | `List[str]` | List of recipient email addresses. |
| `tenant_id` | `str` | Azure tenant ID for the platform SPN. |
| `client_id` | `str` | Client ID of the platform SPN. |
| `certificate_secret_name` | `str` | KeyVault secret name for the SPN certificate. |
| `keyvault_url` | `str` | Azure KeyVault URL to fetch the certificate. |
| `df_in_body` | `bool` | If `True`, a provided DataFrame (optional) is rendered as HTML in the email body. |
| `df_attach` | `bool` | If `True`, a provided DataFrame is attached as an HTML file. |
| `endpoint_url` | `Optional[str]` | Custom HTTP endpoint URL. Defaults to internal Fabric LogicApp endpoint. |
| `scope` | `Optional[str]` | OAuth2 scope for token. Default: `api://27d45411-0d7a-4f27-bc5f-412d74ea249b/.default`. |
| `subject` | `Optional[str]` | Email subject. |
| `headers` | `Optional[Dict[str,str]]` | Additional HTTP headers for the request. |
| `timeout` | `int` | Request timeout in seconds (default: 15). |

---

**Returns:**  
- `Tuple[Optional[int], str, Dict[str, str]]` – HTTP response code, response text, and request headers.
  - The status code is `None` if sending fails.

---

**Example Usage:**  

```python
from CPILake_Utils import send_email_via_http

status, response, req_headers = send_email_via_http(
    body                    = body_html,
    to                      = RECIPIENTS,
    subject                 = subject,
    tenant_id               = tenant_id,
    client_id               = client_id,
    certificate_secret_name = certificate_secret_name,
    keyvault_url            = keyvault_url,
    df_in_body              = False,
    df_attach               = False
)

print("Status:", status)
print("Response:", response)


### 3. `_df_to_html_table(pdf, tz_name="America/Los_Angeles")`

**Objective:**  
Render a Pandas DataFrame as a styled HTML table for email reports.  
Highlights rows where `FailureFlag == "Yes"` in red, and defaults others to light green. Includes a refresh timestamp.

---

**Parameters:**  

| Parameter | Type | Description |
|-----------|------|-------------|
| `pdf` | `pandas.DataFrame` | The DataFrame to render. Can be empty. |
| `tz_name` | `str` | Timezone name for refresh timestamp. Defaults to `"America/Los_Angeles"`. |

---

**Returns:**  
- `str` – HTML string representing the table.
  - If the DataFrame is empty, returns a simple message:  
    ```html
    <html><body><h4>No data available to display.</h4></body></html>
    ```

---

**Features:**  
- Styled HTML table with borders, padding, and small font for readability.  
- Column headers exclude `"FailureFlag"`.  
- Rows with `FailureFlag == "Yes"` are highlighted in red (`#ff8080`).  
- Default rows highlighted in light green (`#ccff66`).  
- Includes a timestamp (formatted as `YYYY-MM-DD HH:MM:SS`) in the specified timezone.  
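The row-coloring rule can be pictured with a small sketch (a hypothetical helper, not the module's actual markup):

```python
# Hypothetical sketch of the row-styling rule described above.
def row_style(failure_flag: str) -> str:
    color = "#ff8080" if failure_flag == "Yes" else "#ccff66"
    return f'<tr style="background-color:{color};">'
```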

---

**Example Usage:**  

```python
from CPILake_Utils import _df_to_html_table
import pandas as pd

pdf = pd.DataFrame({"Pipeline": ["P1"], "FailureFlag": ["Yes"]})
html = _df_to_html_table(pdf)
```


### 4. `send_email_no_attachment(...)`

**Objective:**  
Send a plain HTML or text email via HTTP without any attachments.  
Uses a **Platform SPN certificate** from KeyVault to authenticate securely; no user credentials are required.

---

**Parameters:**  

| Parameter | Type | Description |
|-----------|------|-------------|
| `body` | `str` | HTML or text content of the email. |
| `recipients` | `List[str]` | List of recipient email addresses. |
| `tenant_id` | `str` | Azure tenant ID for Platform SPN. |
| `client_id` | `str` | Azure client ID for Platform SPN. |
| `certificate_secret_name` | `str` | Name of the certificate stored in KeyVault. |
| `keyvault_url` | `str` | URL of the KeyVault containing the SPN certificate. |
| `endpoint_url` | `str`, optional | Logic App HTTP endpoint. Defaults to FDNE Infra Mail sender. |
| `scope` | `str`, optional | Scope for access token. Defaults to: `api://27d45411-0d7a-4f27-bc5f-412d74ea249b/.default`. |
| `subject` | `str`, optional | Email subject. Defaults to empty string. |
| `headers` | `Dict[str, str]`, optional | Additional HTTP headers. |
| `timeout` | `int`, optional | Request timeout in seconds. Default is `15`. |

---

**Returns:**  
- Tuple `(status_code, response_text)`  
  - `status_code`: HTTP status code or `None` if request failed.  
  - `response_text`: Response message or error description.  

---

**Example Usage:**  

```python
from utils import send_email_no_attachment

status, response = send_email_no_attachment(
    body="<html><body><h3>Hello Team!</h3></body></html>",
    recipients=["user@example.com"],
    subject="Test Email",
    tenant_id=tenant_id,
    client_id=client_id,
    certificate_secret_name=certificate_secret_name,
    keyvault_url=keyvault_url
)

print("Status:", status)
print("Response:", response)

### 5. `QA_CheckUtil(source_df, qa_df)`

**Objective:**  
Perform basic QA checks between a source DataFrame and a QA DataFrame.  
The function returns a DataFrame containing results of the following checks:  

1. **Row Count Check** – Compares the number of rows.
2. **Null Count Check** – Compares null counts for all common columns.
3. **Aggregation Check** – Compares sums of the `"amount"` column.
4. **Duplicate Check** – Checks for duplicate `"id"` values.

Checks are **skipped** instead of failing when:
- One or both DataFrames are empty.
- Required columns are missing.
- The aggregation column exists but contains only nulls.

When a check is skipped, `match` is set to `None` rather than `False`, making skips easy to distinguish from genuine failures.

---

**Parameters:**  

| Parameter | Type | Description |
|-----------|------|-------------|
| `source_df` | `DataFrame` | Source DataFrame to compare. |
| `qa_df` | `DataFrame` | QA DataFrame to compare against. |

---

**Returns:**  
- `DataFrame` containing the following columns:

| Column | Description |
|--------|-------------|
| `check_type` | Type of QA check (ROW_COUNT, NULL_CHECK, AGG_CHECK, DUPLICATE_CHECK). |
| `check_name` | Specific check name. |
| `column_name` | Column on which the check was performed, where applicable. |
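A minimal usage sketch, assuming Spark DataFrames with `id` and `amount` columns (matching the checks above) and an active `spark` session:

```python
from CPILake_Utils import QA_CheckUtil

source_df = spark.createDataFrame([(1, 100.0), (2, 200.0)], ["id", "amount"])
qa_df = spark.createDataFrame([(1, 100.0), (2, 200.0)], ["id", "amount"])

# One row per check: check_type, check_name, column_name, ...
results = QA_CheckUtil(source_df, qa_df)
results.show()
```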

### 6. `create_lakehouse_shortcuts(shortcut_configs)`

**Objective:**  
Automatically create Fabric Lakehouse shortcuts from a source path to a target schema in a workspace.  
Uses Fabric API and the platform token obtained via `mssparkutils.credentials.getToken`.

---

**Parameters:**  

| Parameter | Type | Description |
|-----------|------|-------------|
| `shortcut_configs` | `List[Dict]` | List of shortcut configurations. Each dictionary should contain the keys below. |

| Key | Description |
|-----|-------------|
| `"target_shortcut_name"` | Name of the shortcut to create in the target lakehouse. |
| `"workspace_name"` | Display name of the workspace where the lakehouse exists. |
| `"lakehouse_name"` | Display name of the lakehouse in the workspace. |
| `"source_subpath"` | Path in the lakehouse to create a shortcut for. |
| `"target_schema"` | Schema name in the target lakehouse where the shortcut will reside (default `dbo`). |

---

**How it works:**  

1. Fetches a platform access token using `mssparkutils.credentials.getToken`.  
2. Retrieves the workspace ID and lakehouse ID based on the display names.  
3. Constructs a shortcut payload for each configuration (sketched below).  
4. Sends a `POST` request to the Fabric API to create the shortcut.  
5. Prints success/failure messages along with API responses.  
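For reference, the payload constructed in step 3 looks roughly like the sketch below; the values are placeholders resolved from each config entry, and the shape follows the Fabric OneLake shortcuts API:

```python
# Hypothetical payload sketch (OneLake target); the function builds an
# equivalent body from each configuration dictionary.
payload = {
    "name": "DIM_Date",                     # target_shortcut_name
    "path": "Tables/CostHub",               # derived from target_schema
    "target": {
        "oneLake": {
            "workspaceId": "<resolved workspace ID>",
            "itemId": "<resolved lakehouse ID>",
            "path": "Tables/DIM_Date",      # source_subpath
        }
    },
}
```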

---

**Example Usage:**  

```python
from CPILake_Utils import create_lakehouse_shortcuts

shortcut_configs = [
    {
        "target_shortcut_name": "DIM_Date",
        "workspace_name": "FDnECostHubReporting_DEV",
        "lakehouse_name": "Cost_Hub",
        "source_subpath": "Tables/DIM_Date",
        "target_schema": "CostHub",
    }
]

create_lakehouse_shortcuts(shortcut_configs)
```

### 7. `create_adls_shortcuts(shortcut_configs)`

**Objective:**  
Create shortcuts in a Fabric Lakehouse pointing to ADLS Gen2 locations.  
Uses Fabric API and the platform token obtained via `mssparkutils.credentials.getToken`.

---

**Parameters:**  

| Parameter | Type | Description |
|-----------|------|-------------|
| `shortcut_configs` | `List[Dict]` | List of shortcut configurations. Each dictionary should contain the keys below. |

| Key | Description |
|-----|-------------|
| `"name"` | Name of the shortcut to create in the lakehouse. |
| `"target_schema"` | Schema in the lakehouse where the shortcut will be created. |
| `"workspace_name"` | Display name of the workspace where the lakehouse exists. |
| `"lakehouse_name"` | Display name of the lakehouse in the workspace. |
| `"connection_name"` | Name of the ADLS Gen2 connection to link to. |
| `"subpath"` | Path within the ADLS Gen2 storage to point the shortcut to. |

---

**How it works:**  

1. Fetches a platform access token using `mssparkutils.credentials.getToken`.  
2. Retrieves workspace ID and lakehouse ID based on their display names.  
3. Retrieves the ADLS Gen2 connection ID and path from Fabric API.  
4. Constructs a shortcut payload for each configuration (sketched below).  
5. Sends a `POST` request to the Fabric API to create the shortcut.  
6. Prints success or failure messages with API responses.  
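Similarly, the payload built in step 4 roughly follows the Fabric shortcuts API's ADLS Gen2 target shape (values below are placeholders resolved from each config entry):

```python
# Hypothetical payload sketch (ADLS Gen2 target); the function builds an
# equivalent body from each configuration dictionary.
payload = {
    "name": "Bridge_ExecOrgSummary",        # name
    "path": "Tables/CostHub",               # derived from target_schema
    "target": {
        "adlsGen2": {
            "connectionId": "<resolved connection ID>",
            "location": "<ADLS Gen2 account/container URL>",
            "subpath": "/abidatamercury/MercuryDataProd/CostHub/Bridge_ExecOrgSummary",
        }
    },
}
```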

---

**Example Usage:**  

```python
from CPILake_Utils import create_adls_shortcuts

shortcut_configs = [
    {
        "name": "Bridge_ExecOrgSummary",
        "target_schema": "CostHub",
        "workspace_name": "FDnECostHubReporting_DEV",
        "lakehouse_name": "Cost_Hub",
        "connection_name": "CostHub_ADLS abibrahi",
        "subpath": "/abidatamercury/MercuryDataProd/CostHub/Bridge_ExecOrgSummary"
    }
]

create_adls_shortcuts(shortcut_configs)
```

### 8. `lakehouse_metadata_sync(workspace_id, lakehouse_id)`

**Objective:**  
Refresh and synchronize metadata for a Fabric Lakehouse and track the sync progress.  
Returns detailed status of tables, including last successful update, warnings, and sync state.

---

**Parameters:**  

| Parameter | Type | Description |
|-----------|------|-------------|
| `workspace_id` | `str` | The ID of the Fabric workspace containing the lakehouse. |
| `lakehouse_id` | `str` | The ID of the lakehouse whose metadata should be refreshed. |

---

**How it works:**  

1. Uses `FabricRestClient` to query lakehouse properties.  
2. Retrieves the SQL endpoint ID from the lakehouse.  
3. Sends a metadata refresh request to Fabric (`MetadataRefreshExternalCommand`).  
4. Polls the batch status every 2 seconds until the progress state is no longer `"inProgress"`.  
5. On success, extracts table-level details:  
   - `tableName`  
   - `lastSuccessfulUpdate`  
   - `tableSyncState`  
   - `sqlSyncState`  
   - `warningMessages`  
6. Returns a dictionary with `"status"` and `"details"` (or `"error"` if sync fails).  

---

**Return Value:**  

- **Success:**  

```json
{
  "status": "success",
  "details": [
    {
      "tableName": "Table1",
      "lastSuccessfulUpdate": "2026-01-13T12:34:56Z",
      "tableSyncState": "synced",
      "sqlSyncState": "synced",
      "warningMessages": []
    },
    ...
  ]
}
```
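A minimal usage sketch (the IDs below are placeholders):

```python
from CPILake_Utils import lakehouse_metadata_sync

result = lakehouse_metadata_sync(
    workspace_id="<your workspace ID>",
    lakehouse_id="<your lakehouse ID>",
)

if result["status"] == "success":
    for table in result["details"]:
        print(table["tableName"], table["tableSyncState"], table["sqlSyncState"])
else:
    print("Metadata sync failed:", result.get("error"))
```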


## Upload to PyPI (Token-Based Authentication)

Upload the built distribution to PyPI using an API token instead of a username/password:

```powershell
# Replace $env:PYPI_API_TOKEN with your PyPI API token (PowerShell)
twine upload dist/* -u __token__ -p $env:PYPI_API_TOKEN
```

This uploads all files in the `dist/` folder. After upload, the package is available at:
https://pypi.org/project/CPILake-Utils/1.0.0/

## Author

Abhilash Ibrahimpatnam

## Summary

This README includes:

1. Clean introduction and feature list  
2. Clear package structure explanation  
3. Local installation and build instructions  
4. PyPI install instructions  
5. Usage examples for each utility function  
6. Token-based upload instructions for PyPI  
7. Dependencies and author info


## Clean Old Build Artifacts

```powershell
Remove-Item -Recurse -Force build, dist
Remove-Item -Recurse -Force *.egg-info
```



