# AzPaddyPy

A comprehensive Python logger for Azure, integrating OpenTelemetry for advanced, structured, and distributed tracing.

AzPaddyPy is a comprehensive Python package for Azure cloud services integration with standardized configuration management, OpenTelemetry tracing, and builder patterns. It simplifies Azure service orchestration while providing flexible, production-ready patterns for complex cloud applications.
## 🌟 Key Features

- 🔐 **Azure Identity Management** - Token caching, multiple credential types, seamless authentication
- 🗝️ **Azure Key Vault Integration** - Secrets, keys, and certificates management
- 💾 **Azure Storage Operations** - Blob, file, and queue storage with unified APIs
- 📊 **Comprehensive Logging** - Application Insights integration with OpenTelemetry tracing
- 🏗️ **Builder Patterns** - Flexible service composition and configuration
- 🌍 **Environment Detection** - Docker vs. local development with smart defaults
- ⚙️ **Configuration Management** - Environment variables, `.env` files, and service discovery
- 🗄️ **CosmosDB Integration** - Document database operations with a unified client
- 🤖 **Cosmos DB Prompt Management** - Centralized prompt storage and management
## 📦 Installation

```bash
# Install with pip
pip install azpaddypy

# Install with uv (recommended)
uv add azpaddypy

# Install with development dependencies (quoted so the shell does not expand the brackets)
uv add "azpaddypy[dev]"
```
## 🚀 Quick Start

### Simple Usage (Direct Imports)

```python
from azpaddypy import logger, identity, keyvault, storage_account

# Use logger for application logging
logger.info("Application started")

# Use identity for Azure authentication
token = identity.get_token("https://management.azure.com/.default")

# Access secrets from Key Vault
secret_value = keyvault.get_secret("my-secret")

# Use storage services
blob_client = storage_account.blob_service_client
```
### Builder Pattern Usage (Recommended)

```python
from azpaddypy.builder import (
    ConfigurationSetupBuilder,
    AzureManagementBuilder,
    AzureResourceBuilder,
)

# 1. Set up environment configuration
env_config = (
    ConfigurationSetupBuilder()
    .with_local_env_management()    # Load .env files (FIRST)
    .with_environment_detection()   # Detect Docker vs. local
    .with_service_configuration()   # Parse service settings
    .with_logging_configuration()   # Set up logging
    .with_identity_configuration()  # Configure authentication
    .build()
)

# 2. Build management services (logger, identity, Key Vault)
management = (
    AzureManagementBuilder(env_config)
    .with_logger()
    .with_identity()
    .with_keyvault(vault_url="https://my-vault.vault.azure.net/")
    .build()
)

# 3. Build resource services (storage, etc.)
resources = (
    AzureResourceBuilder(management, env_config)
    .with_storage(account_url="https://mystorageaccount.blob.core.windows.net/")
    .build()
)

# 4. Use the configured services
management.logger.info("Services configured successfully")
secret = management.keyvault.get_secret("database-password")
blob_client = resources.storage_account.blob_service_client
```
## 🔧 Configuration

### Environment Variables

Create a `.env` file or set environment variables:

```bash
# Required: Key Vault configuration
key_vault_uri=https://my-vault.vault.azure.net/
head_key_vault_uri=https://my-admin-vault.vault.azure.net/

# Required: Storage configuration
STORAGE_ACCOUNT_URL=https://mystorageaccount.blob.core.windows.net/

# Optional: Service configuration
REFLECTION_NAME=my-application
REFLECTION_KIND=functionapp
SERVICE_VERSION=1.0.0

# Optional: Logging configuration
LOGGER_LOG_LEVEL=INFO
APPLICATIONINSIGHTS_CONNECTION_STRING=InstrumentationKey=...

# Optional: Identity configuration
IDENTITY_ENABLE_TOKEN_CACHE=true
IDENTITY_ALLOW_UNENCRYPTED_STORAGE=true

# Optional: Feature toggles
KEYVAULT_ENABLE_SECRETS=true
KEYVAULT_ENABLE_KEYS=false
KEYVAULT_ENABLE_CERTIFICATES=false
STORAGE_ENABLE_BLOB=true
STORAGE_ENABLE_FILE=true
STORAGE_ENABLE_QUEUE=true
```
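AzPaddyPy picks these values up via `with_local_env_management()`. Conceptually, loading a `.env` file is just parsing `KEY=value` lines into the process environment; a minimal illustrative sketch (real loaders such as python-dotenv also handle quoting, interpolation, and multiline values):

```python
import os

def load_dotenv_minimal(path: str = ".env") -> dict:
    """Parse KEY=value lines from a .env file into os.environ.

    Skips blank lines and comments; already-set variables win.
    """
    loaded = {}
    if not os.path.exists(path):
        return loaded
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            key, value = key.strip(), value.strip().strip('"').strip("'")
            loaded[key] = value
            os.environ.setdefault(key, value)  # do not clobber real env vars
    return loaded
```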
### Azure Authentication

AzPaddyPy supports multiple authentication methods automatically.

**Local development:**

```bash
# Option 1: Azure CLI (recommended)
az login

# Option 2: Environment variables
export AZURE_CLIENT_ID=your-client-id
export AZURE_TENANT_ID=your-tenant-id
export AZURE_CLIENT_SECRET=your-client-secret
```

**Production (Azure):**

- Managed Identity (automatically detected)
- Service Principal (via environment variables)
## 📚 Usage Examples

### 1. Basic Logging Setup

```python
from azpaddypy.builder.directors import ConfigurationSetupDirector, AzureManagementDirector

# Quick setup with sensible defaults
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)
logger = management.logger

# Basic logging
logger.info("Application started")
logger.warning("This is a warning")
logger.error("This is an error")

# Structured logging with custom fields
logger.info("User action", extra={
    "user_id": "12345",
    "action": "login",
    "ip_address": "192.168.1.1",
    "session_id": "abc123",
})

# Error logging with context
try:
    risky_operation()
except Exception as e:
    logger.error("Operation failed", extra={
        "error_type": type(e).__name__,
        "operation": "risky_operation",
        "user_id": "12345",
    }, exc_info=True)
```
### 2. Key Vault Operations

```python
from azpaddypy.builder import AzureManagementBuilder
from azpaddypy.builder.directors import ConfigurationSetupDirector

# Setup: use the builder directly when you need multiple named Key Vaults
env_config = ConfigurationSetupDirector.build_default_config()
management = (
    AzureManagementBuilder(env_config)
    .with_logger()
    .with_identity()
    .with_keyvault(name="primary", vault_url="https://my-vault.vault.azure.net/")
    .with_keyvault(name="admin", vault_url="https://my-admin-vault.vault.azure.net/")
    .build()
)

# Use Key Vault clients by name
primary_secret = management.keyvault.primary.get_secret("my-secret")
admin_key = management.keyvault.admin.get_key("my-key")
```
### 3. Storage Operations

```python
from azpaddypy.builder import AzureResourceBuilder

# Build storage configuration (env_config and management as set up above)
resources = (
    AzureResourceBuilder(management, env_config)
    .with_storage(
        name="main",
        account_url="https://mystorageaccount.blob.core.windows.net/",
        enable_blob=True,
        enable_file=True,
        enable_queue=True,
    )
    .build()
)
storage = resources.storage_accounts["main"]

# Blob storage operations
blob_client = storage.blob_service_client
container_client = blob_client.get_container_client("my-container")

# Upload a file
with open("local-file.txt", "rb") as data:
    container_client.upload_blob(name="remote-file.txt", data=data)

# Download a file
with open("downloaded-file.txt", "wb") as data:
    container_client.download_blob("remote-file.txt").readinto(data)

# List blobs
for blob in container_client.list_blobs():
    print(f"Blob: {blob.name}")

# Delete a blob
container_client.delete_blob("remote-file.txt")

# Queue storage operations
queue_client = storage.queue_service_client
queue = queue_client.get_queue_client("my-queue")

# Send a message
queue.send_message("Hello from azpaddypy!")

# Receive and delete messages
for message in queue.receive_messages():
    print(f"Message: {message.content}")
    queue.delete_message(message)

# File share operations
file_service_client = storage.file_service_client
share_client = file_service_client.get_share_client("my-share")

# Create a directory
directory_client = share_client.get_directory_client("my-directory")
directory_client.create_directory()

# Upload a file to the file share
file_client = directory_client.get_file_client("my-file.txt")
with open("local-file.txt", "rb") as data:
    file_client.upload_file(data)
```
### 4. CosmosDB Operations

```python
from azpaddypy.builder import AzureResourceBuilder

# Build configuration with CosmosDB (env_config and management as set up above)
resources = (
    AzureResourceBuilder(management, env_config)
    .with_cosmosdb(
        name="main",
        endpoint="https://my-cosmosdb.documents.azure.com:443/",
        database_name="my-database",
    )
    .build()
)
cosmosdb = resources.cosmosdb_accounts["main"]

# Create a container
container = cosmosdb.create_container(
    container_name="users",
    partition_key_path="/id",
)

# Insert a document
document = {
    "id": "user123",
    "name": "John Doe",
    "email": "john@example.com",
    "created_at": "2024-01-01T00:00:00Z",
}
cosmosdb.insert_document("users", document)

# Query documents
query = "SELECT * FROM c WHERE c.name = @name"
parameters = [{"name": "@name", "value": "John Doe"}]
results = cosmosdb.query_documents("users", query, parameters)
for doc in results:
    print(f"User: {doc['name']}")

# Update (upsert) a document
updated_doc = {
    "id": "user123",
    "name": "John Doe",
    "email": "john.updated@example.com",
    "updated_at": "2024-01-02T00:00:00Z",
}
cosmosdb.upsert_document("users", updated_doc)

# Delete a document by id and partition key
cosmosdb.delete_item(
    database_name="my-database",
    container_name="users",
    item_id="user123",
    partition_key="user123",
)
```
### 5. Advanced Configuration Patterns

```python
from azpaddypy.builder import ConfigurationSetupBuilder, AzureManagementBuilder, AzureResourceBuilder

# Environment-specific configuration
local_config = {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "DATABASE_URL": "sqlite:///local.db",
    "LOGGER_LOG_LEVEL": "DEBUG",
}
production_config = {
    "LOGGER_LOG_LEVEL": "WARNING",
    "IDENTITY_ALLOW_UNENCRYPTED_STORAGE": "false",
}

# Build environment configuration
env_config = (
    ConfigurationSetupBuilder()
    .with_local_env_management()
    .with_environment_detection()
    .with_environment_variables(
        local_config,
        in_docker=False,  # Don't apply in Docker
        in_machine=True,  # Apply on the local machine
    )
    .with_service_configuration()
    .with_logging_configuration()
    .with_identity_configuration()
    .build()
)

# Build management services with multiple Key Vaults
management = (
    AzureManagementBuilder(env_config)
    .with_logger(
        log_level="DEBUG",
        enable_console=True,
        enable_application_insights=True,
    )
    .with_identity(
        enable_token_cache=True,
        allow_unencrypted_storage=True,
    )
    .with_keyvault(
        name="primary",
        vault_url="https://primary-vault.vault.azure.net/",
        enable_secrets=True,
        enable_keys=True,
        enable_certificates=False,
    )
    .with_keyvault(
        name="admin",
        vault_url="https://admin-vault.vault.azure.net/",
        enable_secrets=True,
        enable_keys=False,
        enable_certificates=True,
    )
    .build()
)

# Build resources with multiple storage accounts
resources = (
    AzureResourceBuilder(management, env_config)
    .with_storage(
        name="main",
        account_url="https://mainstorage.blob.core.windows.net/",
        enable_blob=True,
        enable_file=True,
        enable_queue=True,
    )
    .with_storage(
        name="backup",
        account_url="https://backupstorage.blob.core.windows.net/",
        enable_blob=True,
        enable_file=False,
        enable_queue=False,
    )
    .build()
)
```
### 6. Azure Functions Integration

```python
# function_app.py
import azure.functions as func
from azpaddypy.builder.directors import AzureManagementDirector, ConfigurationSetupDirector

# Initialize once at module level
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)

app = func.FunctionApp()

@app.function_name("HttpTrigger")
@app.route(route="api/data")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    management.logger.info("Function triggered", extra={
        "function_name": "HttpTrigger",
        "request_method": req.method,
        "request_url": req.url,
    })
    try:
        # Access secrets
        api_key = management.keyvault.get_secret("external-api-key")

        # Your function logic here
        result = process_request(req, api_key)

        management.logger.info("Function completed successfully", extra={
            "function_name": "HttpTrigger",
            "result_status": "success",
        })
        return func.HttpResponse(result)
    except Exception as e:
        management.logger.error("Function failed", extra={
            "function_name": "HttpTrigger",
            "error_type": type(e).__name__,
            "error_message": str(e),
        }, exc_info=True)
        return func.HttpResponse(
            "Internal server error",
            status_code=500,
        )

def process_request(req: func.HttpRequest, api_key: str):
    # Your business logic here
    return "Success"
```
### 7. Web Application Integration

```python
# app.py (Flask example)
from flask import Flask, request, jsonify
from azpaddypy.builder.directors import AzureResourceDirector, AzureManagementDirector, ConfigurationSetupDirector

# Initialize Azure services
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)
resources = AzureResourceDirector.build_default_config(env_config, management)

app = Flask(__name__)

@app.route('/api/users', methods=['GET'])
def get_users():
    management.logger.info("GET /api/users", extra={
        "user_id": request.headers.get('X-User-ID'),
        "ip_address": request.remote_addr,
    })
    try:
        # Query CosmosDB for users
        users = resources.cosmosdb_accounts["main"].query_documents(
            "users",
            "SELECT * FROM c",
        )
        return jsonify(list(users))
    except Exception as e:
        management.logger.error("Failed to get users", extra={
            "error_type": type(e).__name__,
            "error_message": str(e),
        }, exc_info=True)
        return jsonify({"error": "Internal server error"}), 500

@app.route('/api/users', methods=['POST'])
def create_user():
    user_data = request.json
    try:
        # Store the user in CosmosDB
        user_id = resources.cosmosdb_accounts["main"].insert_document(
            "users",
            user_data,
        )

        # Store the user avatar in blob storage
        if 'avatar' in user_data:
            blob_client = resources.storage_accounts["main"].blob_service_client
            container_client = blob_client.get_container_client("avatars")
            container_client.upload_blob(
                name=f"{user_id}.jpg",
                data=user_data['avatar'],
            )

        management.logger.info("User created", extra={
            "user_id": user_id,
            "user_email": user_data.get('email'),
        })
        return jsonify({"id": user_id}), 201
    except Exception as e:
        management.logger.error("Failed to create user", extra={
            "error_type": type(e).__name__,
            "error_message": str(e),
        }, exc_info=True)
        return jsonify({"error": "Internal server error"}), 500
```
### 8. Background Job Processing

```python
# job_processor.py
import json
import time
from azpaddypy.builder.directors import AzureResourceDirector, AzureManagementDirector, ConfigurationSetupDirector

# Initialize Azure services
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)
resources = AzureResourceDirector.build_default_config(env_config, management)

def process_jobs():
    """Background job processor using Azure Queue Storage."""
    queue_client = resources.storage_accounts["main"].queue_service_client
    queue = queue_client.get_queue_client("job-queue")

    management.logger.info("Starting job processor")
    while True:
        try:
            # Receive messages from the queue
            messages = queue.receive_messages(max_messages=10, visibility_timeout=300)
            for message in messages:
                try:
                    management.logger.info("Processing job", extra={
                        "job_id": message.id,
                        "job_content": message.content,
                    })

                    # Process the job
                    result = process_job(message.content)

                    # Store the result in blob storage (serialized as JSON)
                    blob_client = resources.storage_accounts["main"].blob_service_client
                    container_client = blob_client.get_container_client("job-results")
                    container_client.upload_blob(
                        name=f"{message.id}.json",
                        data=json.dumps(result),
                    )

                    # Delete the message from the queue
                    queue.delete_message(message)

                    management.logger.info("Job completed", extra={
                        "job_id": message.id,
                        "status": "success",
                    })
                except Exception as e:
                    management.logger.error("Job failed", extra={
                        "job_id": message.id,
                        "error_type": type(e).__name__,
                        "error_message": str(e),
                    }, exc_info=True)
                    # The message returns to the queue after the visibility timeout

            time.sleep(5)  # Wait before the next poll
        except Exception as e:
            management.logger.error("Queue processing error", extra={
                "error_type": type(e).__name__,
                "error_message": str(e),
            }, exc_info=True)
            time.sleep(30)  # Wait longer on error

def process_job(job_data):
    """Process an individual job."""
    # Your job processing logic here
    return {"status": "completed", "result": "success"}
```
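The job processor above leans on Azure Queue Storage's visibility-timeout semantics: a received message is hidden rather than removed, and it reappears for other consumers unless explicitly deleted. A toy in-memory model of that behavior (purely illustrative, not the Azure SDK):

```python
import time

class ToyVisibilityQueue:
    """In-memory queue mimicking Azure Queue Storage visibility timeouts."""

    def __init__(self):
        self._messages = []  # each entry: [content, invisible_until]

    def send(self, content):
        self._messages.append([content, 0.0])

    def receive(self, visibility_timeout: float):
        """Return the first visible message and hide it for the timeout."""
        now = time.monotonic()
        for msg in self._messages:
            if msg[1] <= now:
                msg[1] = now + visibility_timeout  # hide from other consumers
                return msg[0]
        return None

    def delete(self, content):
        """Permanently remove a processed message."""
        self._messages = [m for m in self._messages if m[0] != content]
```

If the consumer crashes before calling `delete`, the message simply becomes visible again once the timeout elapses, which is what makes at-least-once processing work.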
### 9. Data Pipeline Example

```python
# data_pipeline.py
from azpaddypy.builder.directors import AzureResourceDirector, AzureManagementDirector, ConfigurationSetupDirector

# Initialize Azure services
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)
resources = AzureResourceDirector.build_default_config(env_config, management)

def process_data_pipeline():
    """Example data pipeline using multiple Azure services."""
    management.logger.info("Starting data pipeline")
    try:
        # 1. Read data from blob storage
        blob_client = resources.storage_accounts["main"].blob_service_client
        container_client = blob_client.get_container_client("raw-data")

        processed_data = []
        for blob in container_client.list_blobs():
            # Download the blob
            blob_data = container_client.download_blob(blob.name).read()

            # Process the data
            processed_item = process_data_item(blob_data)
            processed_data.append(processed_item)

            management.logger.info("Processed blob", extra={
                "blob_name": blob.name,
                "processed_size": len(processed_item),
            })

        # 2. Store the processed data in CosmosDB
        for item in processed_data:
            resources.cosmosdb_accounts["main"].upsert_document(
                "processed-data",
                item,
            )

        # 3. Send a notification to the queue
        queue_client = resources.storage_accounts["main"].queue_service_client
        queue = queue_client.get_queue_client("notifications")
        queue.send_message(f"Pipeline completed: {len(processed_data)} items processed")

        management.logger.info("Data pipeline completed", extra={
            "items_processed": len(processed_data),
        })
    except Exception as e:
        management.logger.error("Data pipeline failed", extra={
            "error_type": type(e).__name__,
            "error_message": str(e),
        }, exc_info=True)
        raise

def process_data_item(data):
    """Process an individual data item."""
    # Your data processing logic here
    return {"processed": True, "data": data.decode()}
```
### 10. Multi-Environment Configuration

```python
# config_manager.py
import os
from azpaddypy.builder import ConfigurationSetupBuilder, AzureManagementBuilder, AzureResourceBuilder

class EnvironmentManager:
    """Manages different environment configurations."""

    @staticmethod
    def get_development_config():
        """Development environment configuration."""
        local_config = {
            "AzureWebJobsStorage": "UseDevelopmentStorage=true",
            "LOGGER_LOG_LEVEL": "DEBUG",
            "IDENTITY_ALLOW_UNENCRYPTED_STORAGE": "true",
        }
        env_config = (
            ConfigurationSetupBuilder()
            .with_local_env_management()
            .with_environment_detection()
            .with_environment_variables(local_config, in_docker=False, in_machine=True)
            .with_service_configuration()
            .with_logging_configuration()
            .with_identity_configuration()
            .build()
        )
        management = (
            AzureManagementBuilder(env_config)
            .with_logger(log_level="DEBUG")
            .with_identity(allow_unencrypted_storage=True)
            .with_keyvault(vault_url="https://dev-vault.vault.azure.net/")
            .build()
        )
        resources = (
            AzureResourceBuilder(management, env_config)
            .with_storage(account_url="https://devstorage.blob.core.windows.net/")
            .with_cosmosdb(
                endpoint="https://dev-cosmosdb.documents.azure.com:443/",
                database_name="dev-database",
            )
            .build()
        )
        return management, resources

    @staticmethod
    def get_production_config():
        """Production environment configuration."""
        production_config = {
            "LOGGER_LOG_LEVEL": "WARNING",
            "IDENTITY_ALLOW_UNENCRYPTED_STORAGE": "false",
        }
        env_config = (
            ConfigurationSetupBuilder()
            .with_local_env_management()
            .with_environment_detection()
            .with_environment_variables(production_config, in_docker=True, in_machine=False)
            .with_service_configuration()
            .with_logging_configuration()
            .with_identity_configuration()
            .build()
        )
        management = (
            AzureManagementBuilder(env_config)
            .with_logger(log_level="WARNING")
            .with_identity(allow_unencrypted_storage=False)
            .with_keyvault(vault_url="https://prod-vault.vault.azure.net/")
            .build()
        )
        resources = (
            AzureResourceBuilder(management, env_config)
            .with_storage(account_url="https://prodstorage.blob.core.windows.net/")
            .with_cosmosdb(
                endpoint="https://prod-cosmosdb.documents.azure.com:443/",
                database_name="prod-database",
            )
            .build()
        )
        return management, resources

# Usage
if os.getenv("ENVIRONMENT") == "production":
    management, resources = EnvironmentManager.get_production_config()
else:
    management, resources = EnvironmentManager.get_development_config()
```
## 🏗️ Architecture

AzPaddyPy follows a layered builder-pattern architecture:

```text
┌─────────────────────────────────────┐
│         Application Layer           │
├─────────────────────────────────────┤
│   AzureConfiguration (Combined)     │
├─────────────────────────────────────┤
│   AzureResourceConfiguration        │
│   - Storage Accounts                │
│   - CosmosDB Accounts               │
│   - Additional Resources            │
├─────────────────────────────────────┤
│   AzureManagementConfiguration      │
│   - Logger (App Insights)           │
│   - Identity (Token Cache)          │
│   - Key Vaults                      │
├─────────────────────────────────────┤
│   EnvironmentConfiguration          │
│   - Environment Detection           │
│   - Service Configuration           │
│   - Local Development Support       │
└─────────────────────────────────────┘
```

### Builder Flow

- **ConfigurationSetupBuilder** - Environment setup and detection
- **AzureManagementBuilder** - Core management services
- **AzureResourceBuilder** - Azure resource services
- **Directors** - Pre-configured common patterns
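For readers unfamiliar with the pattern, the fluent `with_*()` / `build()` style used throughout boils down to methods that record configuration on the builder and return `self` so calls chain, with `build()` producing the final object. A generic, library-independent sketch (the names here are illustrative, not azpaddypy's internals):

```python
from dataclasses import dataclass, field

@dataclass
class Management:
    """Immutable result produced by the builder."""
    log_level: str = "INFO"
    keyvaults: dict = field(default_factory=dict)

class ManagementBuilder:
    """Each with_*() records a setting and returns self so calls chain."""

    def __init__(self):
        self._log_level = "INFO"
        self._keyvaults = {}

    def with_logger(self, log_level="INFO"):
        self._log_level = log_level
        return self

    def with_keyvault(self, name, vault_url):
        self._keyvaults[name] = vault_url
        return self

    def build(self) -> Management:
        # Snapshot the accumulated state into the result object
        return Management(self._log_level, dict(self._keyvaults))

result = (
    ManagementBuilder()
    .with_logger(log_level="DEBUG")
    .with_keyvault("primary", "https://primary.vault.azure.net/")
    .build()
)
```

A director is then just a function that runs a pre-canned sequence of `with_*()` calls, which is what `build_default_config()` provides.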
## 🔒 Security Best Practices

### Key Vault Security

```python
# ✅ Good: Use specific vault URLs
management.with_keyvault(vault_url="https://prod-vault.vault.azure.net/")

# ❌ Avoid: Hardcoding secrets
database_password = "hardcoded-password"  # DON'T DO THIS

# ✅ Good: Retrieve secrets from Key Vault
database_password = keyvault.get_secret("database-password")
```

### Identity Security

```python
# ✅ Good: Enable token caching for performance
.with_identity(
    enable_token_cache=True,
    allow_unencrypted_storage=False,  # Use an encrypted cache in production
)

# ✅ Good: Use Managed Identity in production
# No additional configuration needed; it is detected automatically
```

### Environment Security

```python
# ✅ Good: Environment-specific configurations
production_config = {
    "IDENTITY_ALLOW_UNENCRYPTED_STORAGE": "false",
    "LOGGER_LOG_LEVEL": "WARNING",
}
development_config = {
    "IDENTITY_ALLOW_UNENCRYPTED_STORAGE": "true",
    "LOGGER_LOG_LEVEL": "DEBUG",
}
```
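The token caching recommended in the identity settings above exists to avoid re-authenticating on every call; tokens are reused per scope until they near expiry. A minimal expiry-aware cache sketch, independent of azure-identity (the fetch callable and refresh margin are illustrative assumptions):

```python
import time

class TokenCache:
    """Cache tokens per scope, refetching only near expiry."""

    def __init__(self, fetch, refresh_margin: float = 300.0):
        self._fetch = fetch            # callable(scope) -> (token, expires_on)
        self._margin = refresh_margin  # refresh this many seconds early
        self._cache = {}

    def get_token(self, scope: str) -> str:
        entry = self._cache.get(scope)
        if entry is None or entry[1] - self._margin <= time.time():
            # Missing or about to expire: fetch a fresh token
            self._cache[scope] = self._fetch(scope)
        return self._cache[scope][0]
```

Whether the cache is persisted to disk, and whether that disk copy may be unencrypted, is what the `IDENTITY_ALLOW_UNENCRYPTED_STORAGE` toggle controls.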
## 🛠️ Additional Tools

### Cosmos DB Prompt Manager

The `CosmosPromptManager` provides a robust solution for managing and versioning prompts in a centralized Cosmos DB instance. It relies on Cosmos DB's integrated cache for performance and adds batch operations, configurable consistency levels, async support, and comprehensive error handling with retry logic.

Features:

- **Optimized performance**: Leverages Cosmos DB's integrated cache without additional local caching
- **Batch operations**: High-performance batch retrieval and saving of multiple prompts
- **Configurable consistency**: Choose between eventual, bounded, or strong consistency levels
- **Async support**: High-throughput async operations for production workloads
- **Retry logic**: Exponential-backoff retries for resilient operations
- **Health monitoring**: Built-in health checks and performance monitoring
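The retry behavior listed above (controlled below by `max_retries` and `base_retry_delay`) can be sketched generically; this is a hypothetical helper showing the technique, not the `CosmosPromptManager` internals:

```python
import random
import time

def retry_with_backoff(operation, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call operation(), retrying failures with exponential backoff plus jitter."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            # Delay doubles each attempt; jitter avoids thundering herds
            delay = base_delay * (2 ** attempt) * (0.5 + random.random() / 2)
            sleep(delay)
```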
Initialization:

```python
from azpaddypy.builder.directors import AzureResourceDirector
from azpaddypy.tools.cosmos_prompt_manager import create_cosmos_prompt_manager

# Assuming 'management' and 'env_config' are already built
resources = AzureResourceDirector.build_default_config(env_config, management)

# Create a prompt manager instance
prompt_manager = create_cosmos_prompt_manager(
    cosmos_client=resources.cosmos_db,
    database_name="prompts_db",
    container_name="prompts_container",
    service_name="my_app",
    logger=management.logger,
    max_retries=3,         # Retry configuration
    base_retry_delay=1.0,  # Base delay for exponential backoff
)
```
Basic usage:

```python
# 1. Save a prompt as a plain string
prompt_manager.save_prompt(
    prompt_name="greeting_prompt",
    prompt_data="Hello, {{name}}! Welcome to our service.",
)

# Save a prompt with dictionary data
prompt_manager.save_prompt(
    prompt_name="complex_prompt",
    prompt_data={
        "prompt_template": "You are a helpful assistant for {{domain}}",
        "category": "assistant",
        "temperature": 0.7,
        "max_tokens": 150,
    },
)

# 2. Retrieve prompts with configurable consistency
greeting = prompt_manager.get_prompt("greeting_prompt")  # Default: bounded consistency
greeting_eventual = prompt_manager.get_prompt("greeting_prompt", consistency_level="eventual")
greeting_strong = prompt_manager.get_prompt("greeting_prompt", consistency_level="strong")

# 3. List all available prompts
all_prompts = prompt_manager.list_prompts()
print(f"Found {len(all_prompts)} prompts")

# 4. Get detailed information for all prompts
all_details = prompt_manager.get_all_prompt_details()
for detail in all_details:
    print(f"ID: {detail['id']}, Template: {detail['prompt_template']}")

# 5. Delete a prompt
success = prompt_manager.delete_prompt("greeting_prompt")
print(f"Deletion successful: {success}")
```
Batch operations for high performance:

```python
# Batch retrieval of multiple prompts
prompt_names = ["greeting", "farewell", "error_handling", "system_prompt"]
batch_results = prompt_manager.get_prompts_batch(
    prompt_names=prompt_names,
    consistency_level="eventual",  # Eventual consistency gives the best read performance
)

# Process the results
for name, template in batch_results.items():
    if template:
        print(f"{name}: {template}")
    else:
        print(f"{name}: Not found")

# Batch saving of multiple prompts
prompts_to_save = [
    {
        "name": "greeting",
        "data": "Hello! How can I help you today?",
    },
    {
        "name": "farewell",
        "data": "Thank you for using our service. Goodbye!",
    },
    {
        "name": "error_handling",
        "data": {
            "prompt_template": "I apologize, but I encountered an error: {{error_message}}",
            "category": "error",
            "priority": "high",
        },
    },
    {
        "name": "system_prompt",
        "data": "You are a helpful, harmless, and honest AI assistant.",
    },
]
batch_save_results = prompt_manager.save_prompts_batch(prompts_to_save)
successful_saves = [name for name, success in batch_save_results.items() if success]
print(f"Successfully saved {len(successful_saves)} prompts")
```
Async operations for high-throughput scenarios:

```python
import asyncio

async def async_prompt_operations():
    """Example of async operations for high-throughput scenarios."""
    # Async prompt retrieval
    async with prompt_manager.async_context() as manager:
        tasks = [
            manager.get_prompt_async("greeting"),
            manager.get_prompt_async("farewell"),
            manager.get_prompt_async("system_prompt"),
        ]
        results = await asyncio.gather(*tasks)
        for i, result in enumerate(results):
            if result:
                print(f"Prompt {i}: {result}")

# Run the async operations
asyncio.run(async_prompt_operations())
```
Health monitoring and diagnostics:

```python
# Perform a health check
health_status = prompt_manager.health_check()
print(f"Overall Status: {health_status['status']}")
print(f"Timestamp: {health_status['timestamp']}")

# Check individual components
if health_status['status'] == 'healthy':
    checks = health_status['checks']
    print(f"Database Connection: {checks['database_connection']['response_time_ms']}ms")
    print(f"Container Access: {checks['container_access']['response_time_ms']}ms")
    print(f"Prompt Count: {checks['basic_operations']['prompt_count']}")
else:
    print(f"Health Check Failed: {health_status.get('error', 'Unknown error')}")
```
Production configuration:

```python
# Production-ready configuration with optimized settings
production_prompt_manager = create_cosmos_prompt_manager(
    cosmos_client=resources.cosmos_db,
    database_name="production_prompts",
    container_name="prompts",
    service_name="production_ai_service",
    service_version="1.0.0",
    logger=management.logger,
    max_retries=5,         # More retries for production
    base_retry_delay=2.0,  # Longer base delay for production
)

# Use eventual consistency for better read performance in production
production_prompts = production_prompt_manager.get_prompts_batch(
    prompt_names=["system", "user", "assistant"],
    consistency_level="eventual",
)
```
## 🚀 Production Deployment

### Azure Functions

```python
# function_app.py
import azure.functions as func
from azpaddypy.builder.directors import AzureManagementDirector, ConfigurationSetupDirector

# Initialize once at module level
env_config = ConfigurationSetupDirector.build_default_config()
management = AzureManagementDirector.build_default_config(env_config)

app = func.FunctionApp()

@app.function_name("HttpTrigger")
@app.route(route="api/data")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    management.logger.info("Function triggered")

    # Access secrets
    api_key = management.keyvault.get_secret("external-api-key")

    # Your function logic here
    return func.HttpResponse("Success")
```

### Docker Deployment

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .

# Set environment variables
ENV REFLECTION_KIND=functionapp
ENV LOGGER_LOG_LEVEL=INFO

CMD ["python", "app.py"]
```
### Environment Configuration

```yaml
# docker-compose.yml
version: '3.8'
services:
  app:
    build: .
    environment:
      - key_vault_uri=https://prod-vault.vault.azure.net/
      - STORAGE_ACCOUNT_URL=https://prodstorage.blob.core.windows.net/
      - APPLICATIONINSIGHTS_CONNECTION_STRING=${APP_INSIGHTS_CONN_STRING}
    depends_on:
      - azurite
  azurite:
    image: mcr.microsoft.com/azure-storage/azurite
    ports:
      - "10000:10000"
      - "10001:10001"
      - "10002:10002"
```
## 🧪 Testing

```python
# test_azpaddypy.py
from azpaddypy.builder import ConfigurationSetupBuilder, AzureManagementBuilder, AzureResourceBuilder

def build_env_config():
    """Shared configuration for the tests below."""
    return (
        ConfigurationSetupBuilder()
        .with_local_env_management()
        .with_environment_detection()
        .build()
    )

def test_configuration_setup():
    """Test basic configuration setup."""
    env_config = build_env_config()
    assert env_config.service_name is not None
    assert env_config.logger_log_level in ["DEBUG", "INFO", "WARNING", "ERROR"]

def test_key_vault_integration():
    """Test Key Vault integration."""
    env_config = build_env_config()
    management = (
        AzureManagementBuilder(env_config)
        .with_identity()
        .with_keyvault(vault_url="https://test-vault.vault.azure.net/")
        .build()
    )
    # Secret retrieval requires a real vault; enable in integration tests:
    # secret = management.keyvault.get_secret("test-secret")
    # assert secret is not None

def test_storage_operations():
    """Test storage operations."""
    env_config = build_env_config()
    management = AzureManagementBuilder(env_config).with_identity().build()
    resources = (
        AzureResourceBuilder(management, env_config)
        .with_storage(account_url="https://teststorage.blob.core.windows.net/")
        .build()
    )
    # Test storage client creation
    assert resources.storage_accounts["default"] is not None
```
## 🤝 Contributing

1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Make your changes
4. Add tests for new functionality
5. Run the tests (`uv run pytest`)
6. Commit your changes (`git commit -m 'Add amazing feature'`)
7. Push to the branch (`git push origin feature/amazing-feature`)
8. Open a Pull Request
## 📄 License

This project is licensed under the MIT License; see the LICENSE file for details.

## 📞 Support

- **Documentation**: GitHub Repository
- **Issues**: GitHub Issues
- **Discussions**: GitHub Discussions
## 🔄 Changelog

### v0.7.9

- Enhanced builder patterns for Azure services
- OpenTelemetry integration for advanced tracing
- Environment detection and local development support
- Multi-Key Vault support with named configurations
- Enhanced storage operations with unified APIs
- CosmosDB integration with a unified client
- Comprehensive logging with Application Insights

Made with ❤️ for Azure developers