Skip to main content

Comprehensive Python logger for Azure, integrating OpenTelemetry for advanced, structured, and distributed tracing.

Project description

AzPaddyPy

Python 3.11+ Azure License: MIT

AzPaddyPy is a comprehensive Python package for Azure cloud services integration with standardized configuration management, OpenTelemetry tracing, and builder patterns. It simplifies Azure service orchestration while providing flexible, production-ready patterns for complex cloud applications.

🌟 Key Features

  • 🔐 Azure Identity Management - Token caching, multiple credential types, seamless authentication
  • 🗝️ Azure Key Vault Integration - Secrets, keys, and certificates management
  • 💾 Azure Storage Operations - Blob, file, and queue storage with unified APIs
  • 📊 Comprehensive Logging - Application Insights integration with OpenTelemetry tracing
  • 🏗️ Builder Patterns - Flexible service composition and configuration
  • 🌍 Environment Detection - Docker vs local development with smart defaults
  • ⚙️ Configuration Management - Environment variables, .env files, and service discovery
  • 🗄️ CosmosDB Integration - Document database operations with unified client
  • 🤖 Cosmos DB Prompt Management - Centralized prompt storage and management

📦 Installation

# Install with pip
pip install azpaddypy

# Install with uv (recommended)
uv add azpaddypy

# Install with development dependencies
uv add azpaddypy[dev]

🚀 Quick Start

Simple Usage (Direct Imports)

from azpaddypy import logger, identity, keyvault, storage_account

# Use logger for application logging
logger.info("Application started")

# Use identity for Azure authentication  
token = identity.get_token("https://management.azure.com/.default")

# Access secrets from Key Vault
secret_value = keyvault.get_secret("my-secret")

# Use storage services
blob_client = storage_account.blob_service_client

Builder Pattern Usage (Recommended)

from azpaddypy.builder import (
    ConfigurationSetupBuilder, 
    AzureManagementBuilder, 
    AzureResourceBuilder
)

# 1. Setup environment configuration
env_config = (
    ConfigurationSetupBuilder()
    .with_local_env_management()      # Load .env files (FIRST)
    .with_environment_detection()     # Detect Docker vs local
    .with_service_configuration()     # Parse service settings
    .with_logging_configuration()     # Setup logging
    .with_identity_configuration()    # Configure authentication
    .build()
)

# 2. Build management services (logger, identity, key vault)
management = (
    AzureManagementBuilder(env_config)
    .with_logger()
    .with_identity() 
    .with_keyvault(vault_url="https://my-vault.vault.azure.net/")
    .build()
)

# 3. Build resource services (storage, etc.)
resources = (
    AzureResourceBuilder(management, env_config)
    .with_storage(account_url="https://mystorageaccount.blob.core.windows.net/")
    .build()
)

# 4. Use the configured services
management.logger.info("Services configured successfully")
secret = management.keyvault.get_secret("database-password")
blob_client = resources.storage_account.blob_service_client

🔧 Configuration

Environment Variables

Create a .env file or set environment variables:

# Required: Key Vault Configuration
key_vault_uri=https://my-vault.vault.azure.net/
head_key_vault_uri=https://my-admin-vault.vault.azure.net/

# Required: Storage Configuration  
STORAGE_ACCOUNT_URL=https://mystorageaccount.blob.core.windows.net/

# Optional: Service Configuration
REFLECTION_NAME=my-application
REFLECTION_KIND=functionapp
SERVICE_VERSION=1.0.0

# Optional: Logging Configuration
LOGGER_LOG_LEVEL=INFO
APPLICATIONINSIGHTS_CONNECTION_STRING=InstrumentationKey=...

# Optional: Identity Configuration
IDENTITY_ENABLE_TOKEN_CACHE=true
IDENTITY_ALLOW_UNENCRYPTED_STORAGE=true

# Optional: Feature Toggles
KEYVAULT_ENABLE_SECRETS=true
KEYVAULT_ENABLE_KEYS=false
KEYVAULT_ENABLE_CERTIFICATES=false

STORAGE_ENABLE_BLOB=true
STORAGE_ENABLE_FILE=true  
STORAGE_ENABLE_QUEUE=true

Azure Authentication

AzPaddyPy supports multiple authentication methods automatically:

Local Development:

# Option 1: Azure CLI (recommended)
az login

# Option 2: Environment variables
export AZURE_CLIENT_ID=your-client-id
export AZURE_TENANT_ID=your-tenant-id  
export AZURE_CLIENT_SECRET=your-client-secret

Production (Azure):

  • Managed Identity (automatically detected)
  • Service Principal (via environment variables)

📚 Usage Examples

1. Basic Logging Setup

from azpaddypy.builder.directors import ConfigurationSetupDirector, AzureManagementDirector

# Quick setup with sensible defaults
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)

logger = management.logger

# Basic logging
logger.info("Application started")
logger.warning("This is a warning")
logger.error("This is an error")

# Structured logging with custom fields
logger.info("User action", extra={
    "user_id": "12345",
    "action": "login",
    "ip_address": "192.168.1.1",
    "session_id": "abc123"
})

# Error logging with context
try:
    risky_operation()
except Exception as e:
    logger.error("Operation failed", extra={
        "error_type": type(e).__name__,
        "operation": "risky_operation",
        "user_id": "12345"
    }, exc_info=True)

2. Key Vault Operations

from azpaddypy.builder.directors import AzureManagementDirector, ConfigurationSetupDirector

# Setup
env_config = ConfigurationSetupDirector.build_default_config()
management = (
    AzureManagementDirector.build_default_config(env_config)
    .with_keyvault(name="primary", vault_url="https://my-vault.vault.azure.net/")
    .with_keyvault(name="admin", vault_url="https://my-admin-vault.vault.azure.net/")
    .build()
)

# Use Key Vault clients by name
primary_secret = management.keyvault.primary.get_secret("my-secret")
admin_key = management.keyvault.admin.get_key("my-key")

3. Storage Operations

from azpaddypy.builder.directors import AzureResourceDirector

# Build storage configuration
resources = (
    AzureResourceDirector.build_default_config(env_config, management)
    .with_storage(
        name="main",
        account_url="https://mystorageaccount.blob.core.windows.net/",
        enable_blob=True,
        enable_file=True,
        enable_queue=True
    )
    .build()
)

storage = resources.storage_accounts["main"]

# Blob Storage Operations
blob_client = storage.blob_service_client
container_client = blob_client.get_container_client("my-container")

# Upload a file
with open("local-file.txt", "rb") as data:
    container_client.upload_blob(name="remote-file.txt", data=data)

# Download a file
with open("downloaded-file.txt", "wb") as data:
    container_client.download_blob(name="remote-file.txt").readinto(data)

# List blobs
blobs = container_client.list_blobs()
for blob in blobs:
    print(f"Blob: {blob.name}")

# Delete blob
container_client.delete_blob("remote-file.txt")

# Queue Storage Operations
queue_client = storage.queue_service_client
queue = queue_client.get_queue_client("my-queue")

# Send message
queue.send_message("Hello from azpaddypy!")

# Receive messages
messages = queue.receive_messages()
for message in messages:
    print(f"Message: {message.content}")
    queue.delete_message(message)

# File Share Operations
file_client = storage.file_service_client
share_client = file_client.get_share_client("my-share")

# Create directory
directory_client = share_client.get_directory_client("my-directory")
directory_client.create_directory()

# Upload file to file share
file_client = directory_client.get_file_client("my-file.txt")
with open("local-file.txt", "rb") as data:
    file_client.upload_file(data)

4. CosmosDB Operations

from azpaddypy.builder.directors import AzureResourceDirector

# Build configuration with CosmosDB
resources = (
    AzureResourceDirector.build_default_config(env_config, management)
    .with_cosmosdb(
        name="main",
        endpoint="https://my-cosmosdb.documents.azure.com:443/",
        database_name="my-database"
    )
    .build()
)

cosmosdb = resources.cosmosdb_accounts["main"]

# Create container
container = cosmosdb.create_container(
    container_name="users",
    partition_key_path="/id"
)

# Insert document
document = {
    "id": "user123",
    "name": "John Doe",
    "email": "john@example.com",
    "created_at": "2024-01-01T00:00:00Z"
}
cosmosdb.insert_document("users", document)

# Query documents
query = "SELECT * FROM c WHERE c.name = @name"
parameters = [{"name": "@name", "value": "John Doe"}]
results = cosmosdb.query_documents("users", query, parameters)

for doc in results:
    print(f"User: {doc['name']}")

# Update document
updated_doc = {
    "id": "user123",
    "name": "John Doe",
    "email": "john.updated@example.com",
    "updated_at": "2024-01-02T00:00:00Z"
}
cosmosdb.upsert_document("users", updated_doc)

# Delete document by id and partition key
cosmosdb.delete_item(
    database_name="my-database",
    container_name="users",
    item_id="user123",
    partition_key="user123"
)

5. Advanced Configuration Patterns

from azpaddypy.builder import ConfigurationSetupBuilder, AzureManagementBuilder, AzureResourceBuilder

# Environment-specific configuration
local_config = {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "DATABASE_URL": "sqlite:///local.db",
    "LOGGER_LOG_LEVEL": "DEBUG"
}

production_config = {
    "LOGGER_LOG_LEVEL": "WARNING",
    "IDENTITY_ALLOW_UNENCRYPTED_STORAGE": "false"
}

# Build environment configuration
env_config = (
    ConfigurationSetupBuilder()
    .with_local_env_management()
    .with_environment_detection()
    .with_environment_variables(
        local_config,
        in_docker=False,    # Don't apply in Docker
        in_machine=True     # Apply on local machine
    )
    .with_service_configuration()
    .with_logging_configuration()
    .with_identity_configuration()
    .build()
)

# Build management with multiple Key Vaults
management = (
    AzureManagementBuilder(env_config)
    .with_logger(
        log_level="DEBUG",
        enable_console=True,
        enable_application_insights=True
    )
    .with_identity(
        enable_token_cache=True,
        allow_unencrypted_storage=True
    )
    .with_keyvault(
        name="primary",
        vault_url="https://primary-vault.vault.azure.net/",
        enable_secrets=True,
        enable_keys=True,
        enable_certificates=False
    )
    .with_keyvault(
        name="admin",
        vault_url="https://admin-vault.vault.azure.net/",
        enable_secrets=True,
        enable_keys=False,
        enable_certificates=True
    )
    .build()
)

# Build resources with multiple storage accounts
resources = (
    AzureResourceBuilder(management, env_config)
    .with_storage(
        name="main",
        account_url="https://mainstorage.blob.core.windows.net/",
        enable_blob=True,
        enable_file=True,
        enable_queue=True
    )
    .with_storage(
        name="backup",
        account_url="https://backupstorage.blob.core.windows.net/",
        enable_blob=True,
        enable_file=False,
        enable_queue=False
    )
    .build()
)

6. Azure Functions Integration

# function_app.py
import azure.functions as func
from azpaddypy.builder.directors import AzureManagementDirector, ConfigurationSetupDirector

# Initialize once at module level
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)

app = func.FunctionApp()

@app.function_name("HttpTrigger")
@app.route(route="api/data")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    management.logger.info("Function triggered", extra={
        "function_name": "HttpTrigger",
        "request_method": req.method,
        "request_url": req.url
    })
    
    try:
        # Access secrets
        api_key = management.keyvault.get_secret("external-api-key")
        
        # Your function logic here
        result = process_request(req, api_key)
        
        management.logger.info("Function completed successfully", extra={
            "function_name": "HttpTrigger",
            "result_status": "success"
        })
        
        return func.HttpResponse(result)
        
    except Exception as e:
        management.logger.error("Function failed", extra={
            "function_name": "HttpTrigger",
            "error_type": type(e).__name__,
            "error_message": str(e)
        }, exc_info=True)
        
        return func.HttpResponse(
            "Internal server error",
            status_code=500
        )

def process_request(req: func.HttpRequest, api_key: str):
    # Your business logic here
    return "Success"

7. Web Application Integration

# app.py (Flask example)
from flask import Flask, request, jsonify
from azpaddypy.builder.directors import AzureResourceDirector, AzureManagementDirector, ConfigurationSetupDirector

# Initialize Azure services
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)
resources = AzureResourceDirector.build_default_config(env_config, management)

app = Flask(__name__)

@app.route('/api/users', methods=['GET'])
def get_users():
    management.logger.info("GET /api/users", extra={
        "user_id": request.headers.get('X-User-ID'),
        "ip_address": request.remote_addr
    })
    
    try:
        # Query CosmosDB for users
        users = resources.cosmosdb_accounts["main"].query_documents(
            "users",
            "SELECT * FROM c"
        )
        
        return jsonify(list(users))
        
    except Exception as e:
        management.logger.error("Failed to get users", extra={
            "error_type": type(e).__name__,
            "error_message": str(e)
        }, exc_info=True)
        
        return jsonify({"error": "Internal server error"}), 500

@app.route('/api/users', methods=['POST'])
def create_user():
    user_data = request.json
    
    try:
        # Store user in CosmosDB
        user_id = resources.cosmosdb_accounts["main"].insert_document(
            "users",
            user_data
        )
        
        # Store user avatar in blob storage
        if 'avatar' in user_data:
            blob_client = resources.storage_accounts["main"].blob_service_client
            container_client = blob_client.get_container_client("avatars")
            container_client.upload_blob(
                name=f"{user_id}.jpg",
                data=user_data['avatar']
            )
        
        management.logger.info("User created", extra={
            "user_id": user_id,
            "user_email": user_data.get('email')
        })
        
        return jsonify({"id": user_id}), 201
        
    except Exception as e:
        management.logger.error("Failed to create user", extra={
            "error_type": type(e).__name__,
            "error_message": str(e)
        }, exc_info=True)
        
        return jsonify({"error": "Internal server error"}), 500

8. Background Job Processing

# job_processor.py
import time
from azpaddypy.builder.directors import AzureResourceDirector, AzureManagementDirector, ConfigurationSetupDirector

# Initialize Azure services
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)
resources = AzureResourceDirector.build_default_config(env_config, management)

def process_jobs():
    """Background job processor using Azure Queue Storage."""
    queue_client = resources.storage_accounts["main"].queue_service_client
    queue = queue_client.get_queue_client("job-queue")
    
    management.logger.info("Starting job processor")
    
    while True:
        try:
            # Receive messages from queue
            messages = queue.receive_messages(max_messages=10, visibility_timeout=300)
            
            for message in messages:
                try:
                    management.logger.info("Processing job", extra={
                        "job_id": message.id,
                        "job_content": message.content
                    })
                    
                    # Process the job
                    result = process_job(message.content)
                    
                    # Store result in blob storage
                    blob_client = resources.storage_accounts["main"].blob_service_client
                    container_client = blob_client.get_container_client("job-results")
                    container_client.upload_blob(
                        name=f"{message.id}.json",
                        data=result
                    )
                    
                    # Delete message from queue
                    queue.delete_message(message)
                    
                    management.logger.info("Job completed", extra={
                        "job_id": message.id,
                        "status": "success"
                    })
                    
                except Exception as e:
                    management.logger.error("Job failed", extra={
                        "job_id": message.id,
                        "error_type": type(e).__name__,
                        "error_message": str(e)
                    }, exc_info=True)
                    
                    # Message will return to queue after visibility timeout
                    
            time.sleep(5)  # Wait before next poll
            
        except Exception as e:
            management.logger.error("Queue processing error", extra={
                "error_type": type(e).__name__,
                "error_message": str(e)
            }, exc_info=True)
            time.sleep(30)  # Wait longer on error

def process_job(job_data):
    """Process individual job."""
    # Your job processing logic here
    return {"status": "completed", "result": "success"}

9. Data Pipeline Example

# data_pipeline.py
from azpaddypy.builder.directors import AzureResourceDirector, AzureManagementDirector, ConfigurationSetupDirector

# Initialize Azure services
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)
resources = AzureResourceDirector.build_default_config(env_config, management)

def process_data_pipeline():
    """Example data pipeline using multiple Azure services."""
    
    management.logger.info("Starting data pipeline")
    
    try:
        # 1. Read data from blob storage
        blob_client = resources.storage_accounts["main"].blob_service_client
        container_client = blob_client.get_container_client("raw-data")
        
        blobs = container_client.list_blobs()
        processed_data = []
        
        for blob in blobs:
            # Download blob
            blob_data = container_client.download_blob(blob.name).read()
            
            # Process data
            processed_item = process_data_item(blob_data)
            processed_data.append(processed_item)
            
            management.logger.info("Processed blob", extra={
                "blob_name": blob.name,
                "processed_size": len(processed_item)
            })
        
        # 2. Store processed data in CosmosDB
        for item in processed_data:
            resources.cosmosdb_accounts["main"].upsert_document(
                "processed-data",
                item
            )
        
        # 3. Send notification to queue
        queue_client = resources.storage_accounts["main"].queue_service_client
        queue = queue_client.get_queue_client("notifications")
        queue.send_message(f"Pipeline completed: {len(processed_data)} items processed")
        
        management.logger.info("Data pipeline completed", extra={
            "items_processed": len(processed_data)
        })
        
    except Exception as e:
        management.logger.error("Data pipeline failed", extra={
            "error_type": type(e).__name__,
            "error_message": str(e)
        }, exc_info=True)
        raise

def process_data_item(data):
    """Process individual data item."""
    # Your data processing logic here
    return {"processed": True, "data": data.decode()}

10. Multi-Environment Configuration

# config_manager.py
import os
from azpaddypy.builder import ConfigurationSetupBuilder, AzureManagementBuilder, AzureResourceBuilder

class EnvironmentManager:
    """Manages different environment configurations."""
    
    @staticmethod
    def get_development_config():
        """Development environment configuration."""
        local_config = {
            "AzureWebJobsStorage": "UseDevelopmentStorage=true",
            "LOGGER_LOG_LEVEL": "DEBUG",
            "IDENTITY_ALLOW_UNENCRYPTED_STORAGE": "true"
        }
        
        env_config = (
            ConfigurationSetupBuilder()
            .with_local_env_management()
            .with_environment_detection()
            .with_environment_variables(local_config, in_docker=False, in_machine=True)
            .with_service_configuration()
            .with_logging_configuration()
            .with_identity_configuration()
            .build()
        )
        
        management = (
            AzureManagementBuilder(env_config)
            .with_logger(log_level="DEBUG")
            .with_identity(allow_unencrypted_storage=True)
            .with_keyvault(vault_url="https://dev-vault.vault.azure.net/")
            .build()
        )
        
        resources = (
            AzureResourceBuilder(management, env_config)
            .with_storage(account_url="https://devstorage.blob.core.windows.net/")
            .with_cosmosdb(
                endpoint="https://dev-cosmosdb.documents.azure.com:443/",
                database_name="dev-database"
            )
            .build()
        )
        
        return management, resources
    
    @staticmethod
    def get_production_config():
        """Production environment configuration."""
        production_config = {
            "LOGGER_LOG_LEVEL": "WARNING",
            "IDENTITY_ALLOW_UNENCRYPTED_STORAGE": "false"
        }
        
        env_config = (
            ConfigurationSetupBuilder()
            .with_local_env_management()
            .with_environment_detection()
            .with_environment_variables(production_config, in_docker=True, in_machine=False)
            .with_service_configuration()
            .with_logging_configuration()
            .with_identity_configuration()
            .build()
        )
        
        management = (
            AzureManagementBuilder(env_config)
            .with_logger(log_level="WARNING")
            .with_identity(allow_unencrypted_storage=False)
            .with_keyvault(vault_url="https://prod-vault.vault.azure.net/")
            .build()
        )
        
        resources = (
            AzureResourceBuilder(management, env_config)
            .with_storage(account_url="https://prodstorage.blob.core.windows.net/")
            .with_cosmosdb(
                endpoint="https://prod-cosmosdb.documents.azure.com:443/",
                database_name="prod-database"
            )
            .build()
        )
        
        return management, resources

# Usage
if os.getenv("ENVIRONMENT") == "production":
    management, resources = EnvironmentManager.get_production_config()
else:
    management, resources = EnvironmentManager.get_development_config()

🏗️ Architecture

AzPaddyPy follows a layered builder pattern architecture:

┌─────────────────────────────────────┐
│        Application Layer           │
├─────────────────────────────────────┤
│     AzureConfiguration (Combined)   │
├─────────────────────────────────────┤
│  AzureResourceConfiguration         │
│  - Storage Accounts                 │
│  - CosmosDB Accounts               │
│  - Additional Resources             │
├─────────────────────────────────────┤
│  AzureManagementConfiguration       │  
│  - Logger (App Insights)            │
│  - Identity (Token Cache)           │
│  - Key Vaults                       │
├─────────────────────────────────────┤
│  EnvironmentConfiguration           │
│  - Environment Detection            │
│  - Service Configuration            │
│  - Local Development Support        │
└─────────────────────────────────────┘

Builder Flow

  1. ConfigurationSetupBuilder - Environment setup and detection
  2. AzureManagementBuilder - Core management services
  3. AzureResourceBuilder - Azure resource services
  4. Directors - Pre-configured common patterns

🔒 Security Best Practices

Key Vault Security

# ✅ Good: Use specific vault URLs
management.with_keyvault(vault_url="https://prod-vault.vault.azure.net/")

# ❌ Avoid: Hardcoding secrets
database_password = "hardcoded-password"  # DON'T DO THIS

# ✅ Good: Retrieve from Key Vault
database_password = keyvault.get_secret("database-password")

Identity Security

# ✅ Good: Enable token caching for performance
.with_identity_configuration(
    enable_token_cache=True,
    allow_unencrypted_storage=False  # Use encrypted cache in production
)

# ✅ Good: Use Managed Identity in production
# No additional configuration needed - automatically detected

Environment Security

# ✅ Good: Environment-specific configurations
production_config = {
    "IDENTITY_ALLOW_UNENCRYPTED_STORAGE": "false",
    "LOGGER_LOG_LEVEL": "WARNING"
}

development_config = {
    "IDENTITY_ALLOW_UNENCRYPTED_STORAGE": "true", 
    "LOGGER_LOG_LEVEL": "DEBUG"
}

🛠️ Additional Tools

Cosmos DB Prompt Manager

The CosmosPromptManager provides a robust solution for managing and versioning prompts in a centralized Cosmos DB instance. It features optimized performance through Cosmos DB's integrated cache, batch operations, configurable consistency levels, async support, and comprehensive error handling with retry logic.

Features:

  • Optimized Performance: Leverages Cosmos DB's integrated cache without additional local caching
  • Batch Operations: High-performance batch retrieval and saving of multiple prompts
  • Configurable Consistency: Choose between eventual, bounded, or strong consistency levels
  • Async Support: High-throughput async operations for production workloads
  • Retry Logic: Exponential backoff retry mechanism for resilient operations
  • Health Monitoring: Built-in health checks and performance monitoring

Initialization:

from azpaddypy.builder.directors import AzureResourceDirector
from azpaddypy.tools.cosmos_prompt_manager import create_cosmos_prompt_manager

# Assuming 'management' and 'env_config' are already built
resources = AzureResourceDirector.build_default_config(management, env_config)

# Create a prompt manager instance with enhanced features
prompt_manager = create_cosmos_prompt_manager(
    cosmos_client=resources.cosmos_db,
    database_name="prompts_db",
    container_name="prompts_container",
    service_name="my_app",
    logger=management.logger,
    max_retries=3,                    # Retry configuration
    base_retry_delay=1.0             # Base delay for exponential backoff
)

Basic Usage:

# 1. Save prompts with different data types
prompt_manager.save_prompt(
    prompt_name="greeting_prompt",
    prompt_data="Hello, {{name}}! Welcome to our service."
)

# Save with dictionary data
prompt_manager.save_prompt(
    prompt_name="complex_prompt",
    prompt_data={
        "prompt_template": "You are a helpful assistant for {{domain}}",
        "category": "assistant",
        "temperature": 0.7,
        "max_tokens": 150
    }
)

# 2. Retrieve prompts with configurable consistency
greeting = prompt_manager.get_prompt("greeting_prompt")  # Default: bounded consistency
greeting_eventual = prompt_manager.get_prompt("greeting_prompt", consistency_level="eventual")
greeting_strong = prompt_manager.get_prompt("greeting_prompt", consistency_level="strong")

# 3. List all available prompts
all_prompts = prompt_manager.list_prompts()
print(f"Found {len(all_prompts)} prompts")

# 4. Get detailed information for all prompts
all_details = prompt_manager.get_all_prompt_details()
for detail in all_details:
    print(f"ID: {detail['id']}, Template: {detail['prompt_template']}")

# 5. Delete a prompt
success = prompt_manager.delete_prompt("greeting_prompt")
print(f"Deletion successful: {success}")

Batch Operations for High Performance:

# Batch retrieval of multiple prompts
prompt_names = ["greeting", "farewell", "error_handling", "system_prompt"]
batch_results = prompt_manager.get_prompts_batch(
    prompt_names=prompt_names,
    consistency_level="eventual"  # Use eventual consistency for better performance
)

# Process results
for name, template in batch_results.items():
    if template:
        print(f"{name}: {template}")
    else:
        print(f"{name}: Not found")

# Batch saving of multiple prompts
prompts_to_save = [
    {
        "name": "greeting",
        "data": "Hello! How can I help you today?"
    },
    {
        "name": "farewell", 
        "data": "Thank you for using our service. Goodbye!"
    },
    {
        "name": "error_handling",
        "data": {
            "prompt_template": "I apologize, but I encountered an error: {{error_message}}",
            "category": "error",
            "priority": "high"
        }
    },
    {
        "name": "system_prompt",
        "data": "You are a helpful, harmless, and honest AI assistant."
    }
]

batch_save_results = prompt_manager.save_prompts_batch(prompts_to_save)
successful_saves = [name for name, success in batch_save_results.items() if success]
print(f"Successfully saved {len(successful_saves)} prompts")

Async Operations for High-Throughput Scenarios:

import asyncio

async def async_prompt_operations():
    """Example of async operations for high-throughput scenarios."""
    
    # Async prompt retrieval
    async with prompt_manager.async_context() as manager:
        tasks = [
            manager.get_prompt_async("greeting"),
            manager.get_prompt_async("farewell"),
            manager.get_prompt_async("system_prompt")
        ]
        
        results = await asyncio.gather(*tasks)
        
        for i, result in enumerate(results):
            if result:
                print(f"Prompt {i}: {result}")

# Run async operations
asyncio.run(async_prompt_operations())

Health Monitoring and Diagnostics:

# Perform health check
health_status = prompt_manager.health_check()

print(f"Overall Status: {health_status['status']}")
print(f"Timestamp: {health_status['timestamp']}")

# Check individual components
if health_status['status'] == 'healthy':
    checks = health_status['checks']
    print(f"Database Connection: {checks['database_connection']['response_time_ms']}ms")
    print(f"Container Access: {checks['container_access']['response_time_ms']}ms")
    print(f"Prompt Count: {checks['basic_operations']['prompt_count']}")
else:
    print(f"Health Check Failed: {health_status.get('error', 'Unknown error')}")

Production Configuration:

# Production-ready configuration with optimized settings
production_prompt_manager = create_cosmos_prompt_manager(
    cosmos_client=resources.cosmos_db,
    database_name="production_prompts",
    container_name="prompts",
    service_name="production_ai_service",
    service_version="1.0.0",
    logger=management.logger,
    max_retries=5,           # More retries for production
    base_retry_delay=2.0     # Longer base delay for production
)

# Use eventual consistency for better performance in production
production_prompts = production_prompt_manager.get_prompts_batch(
    prompt_names=["system", "user", "assistant"],
    consistency_level="eventual"
)

🚀 Production Deployment

Azure Functions

# function_app.py
import azure.functions as func
from azpaddypy.builder.directors import AzureManagementDirector, ConfigurationSetupDirector

# Initialize once at module level
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)

app = func.FunctionApp()

@app.function_name("HttpTrigger")
@app.route(route="api/data")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    management.logger.info("Function triggered")
    
    # Access secrets
    api_key = management.keyvault.get_secret("external-api-key")
    
    # Your function logic here
    return func.HttpResponse("Success")

Docker Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

# Set environment variables
ENV REFLECTION_KIND=functionapp
ENV LOGGER_LOG_LEVEL=INFO

CMD ["python", "app.py"]

Environment Configuration

# docker-compose.yml
version: '3.8'
services:
  app:
    build: .
    environment:
      - key_vault_uri=https://prod-vault.vault.azure.net/
      - STORAGE_ACCOUNT_URL=https://prodstorage.blob.core.windows.net/
      - APPLICATIONINSIGHTS_CONNECTION_STRING=${APP_INSIGHTS_CONN_STRING}
    depends_on:
      - azurite
  
  azurite:
    image: mcr.microsoft.com/azure-storage/azurite
    ports:
      - "10000:10000"
      - "10001:10001" 
      - "10002:10002"

🧪 Testing

# test_azpaddypy.py
import pytest
from azpaddypy.builder import ConfigurationSetupBuilder, AzureManagementBuilder

def test_configuration_setup():
    """Test basic configuration setup."""
    env_config = (
        ConfigurationSetupBuilder()
        .with_local_env_management()
        .with_environment_detection()
        .build()
    )
    
    assert env_config.service_name is not None
    assert env_config.logger_log_level in ["DEBUG", "INFO", "WARNING", "ERROR"]

@pytest.mark.asyncio
async def test_key_vault_integration():
    """Test Key Vault integration.""" 
    management = (
        AzureManagementBuilder(env_config)
        .with_identity()
        .with_keyvault(vault_url="https://test-vault.vault.azure.net/")
        .build()
    )
    
    # Test secret retrieval (requires actual vault in integration tests)
    # secret = management.keyvault.get_secret("test-secret")
    # assert secret is not None

def test_storage_operations():
    """Test storage operations."""
    resources = (
        AzureResourceBuilder(management, env_config)
        .with_storage(account_url="https://teststorage.blob.core.windows.net/")
        .build()
    )
    
    # Test storage client creation
    assert resources.storage_accounts["default"] is not None

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Add tests for new functionality
  5. Run tests (uv run pytest)
  6. Commit your changes (git commit -m 'Add amazing feature')
  7. Push to the branch (git push origin feature/amazing-feature)
  8. Open a Pull Request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

📞 Support

🔄 Changelog

v0.7.9

  • Enhanced builder patterns for Azure services
  • OpenTelemetry integration for advanced tracing
  • Environment detection and local development support
  • Multi-Key Vault support with named configurations
  • Enhanced storage operations with unified APIs
  • CosmosDB integration with unified client
  • Comprehensive logging with Application Insights

Made with ❤️ for Azure developers

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

azpaddypy-0.9.2.tar.gz (107.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

azpaddypy-0.9.2-py3-none-any.whl (61.5 kB view details)

Uploaded Python 3

File details

Details for the file azpaddypy-0.9.2.tar.gz.

File metadata

  • Download URL: azpaddypy-0.9.2.tar.gz
  • Upload date:
  • Size: 107.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.20

File hashes

Hashes for azpaddypy-0.9.2.tar.gz
Algorithm Hash digest
SHA256 136875b8f61b157070f31d6f660cf15555a3d77d0c739aed68dab954adbe42da
MD5 5133d7c7170d4ceacbf9b2f53791f076
BLAKE2b-256 16b475cfeb1626b40ed7b5dd0a75818ada532f5552d46884d4904ac062d82386

See more details on using hashes here.

File details

Details for the file azpaddypy-0.9.2-py3-none-any.whl.

File metadata

  • Download URL: azpaddypy-0.9.2-py3-none-any.whl
  • Upload date:
  • Size: 61.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.20

File hashes

Hashes for azpaddypy-0.9.2-py3-none-any.whl
Algorithm Hash digest
SHA256 0286ed8b000c2e0666a6ea2b9ef35e41a528095b6677ca5d219a373c3ca4f84c
MD5 06931448535aa9ac3080140117d7573d
BLAKE2b-256 6d4f40d793d42d52626003307a80b8af43e43f2b4c0eb80d17f0e8e098c9542f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page