Python client for Azure Resource Graph API with storage encryption analysis
Azure Resource Graph - Storage Encryption Analysis
A Python client for analyzing Azure storage encryption compliance across your applications using the Azure Resource Graph API.
🎯 What This Tool Does
This tool helps you automatically discover and analyze storage encryption across your Azure environment:
✅ Discovers all storage resources associated with your applications
✅ Analyzes encryption methods (Platform Managed, Customer Managed Keys, etc.)
✅ Generates compliance reports with pass/fail status
✅ Identifies security gaps in your storage configuration
✅ Tracks encryption across Storage Accounts, Managed Disks, Cosmos DB, SQL Databases
🚀 Quick Start
1. Installation
# Clone and install
git clone <your-repo-url>
cd azure-resource-graph
pip install -e .
2. Azure Setup
# Login to Azure
az login
# Get your Azure credentials (run this script)
./get_azure_env.sh
3. Configure Environment
Create a .env file in the project root with your Azure credentials:
AZURE_TENANT_ID=your-tenant-id
AZURE_CLIENT_ID=your-client-id
AZURE_CLIENT_SECRET=your-client-secret
AZURE_SUBSCRIPTION_IDS=your-subscription-id
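Before constructing the client, it can help to confirm that the four variables above are actually set. The sketch below uses only the standard library. The helper name load_azure_settings is hypothetical and not part of this package, and the code assumes that AZURE_SUBSCRIPTION_IDS may hold several comma-separated IDs, which this README does not confirm.

```python
import os

REQUIRED_VARS = (
    "AZURE_TENANT_ID",
    "AZURE_CLIENT_ID",
    "AZURE_CLIENT_SECRET",
    "AZURE_SUBSCRIPTION_IDS",
)


def load_azure_settings(env=None):
    """Return the required settings as a dict, or raise if any are missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name, "").strip()]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    settings = {name: env[name].strip() for name in REQUIRED_VARS}
    # Assumption: multiple subscription IDs are separated by commas.
    settings["AZURE_SUBSCRIPTION_IDS"] = [
        sub.strip()
        for sub in settings["AZURE_SUBSCRIPTION_IDS"].split(",")
        if sub.strip()
    ]
    return settings


# Placeholder values only; a plain dict stands in for os.environ here.
example = load_azure_settings({
    "AZURE_TENANT_ID": "your-tenant-id",
    "AZURE_CLIENT_ID": "your-client-id",
    "AZURE_CLIENT_SECRET": "your-client-secret",
    "AZURE_SUBSCRIPTION_IDS": "sub-1, sub-2",
})
print(example["AZURE_SUBSCRIPTION_IDS"])
```

Calling load_azure_settings() with no argument reads from the real process environment, so a missing variable is reported by name before any Azure call is made.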
4. Run Analysis
from azure_resource_graph import AzureResourceGraphClient
# Initialize client
client = AzureResourceGraphClient()
# Analyze storage encryption across all applications
results = client.query_storage_encryption()
# Print summary
for item in results:
    status = "✅" if item['ComplianceStatus'] == "Compliant" else "❌"
    print(f"{status} {item['Application']} - {item['StorageResource']} - {item['EncryptionMethod']}")
📊 Core Use Cases
Enterprise Storage Compliance
Problem: "I need to ensure all storage in my Azure environment is properly encrypted"
from azure_resource_graph import AzureResourceGraphClient
client = AzureResourceGraphClient()
# Get comprehensive encryption analysis
storage_results = client.query_storage_encryption()
compliance_summary = client.get_compliance_summary()
# Generate compliance report
for app in compliance_summary:
    if app['CompliancePercentage'] < 100:
        print(f"⚠️ {app['Application']}: {app['CompliancePercentage']}% compliant")
        print(f"   {app['NonCompliantResources']} resources need attention")
Application Security Audit
Problem: "I need to check encryption for a specific application"
# Analyze specific application
app_storage = client.query_application_storage("ECommerceApp")
print("📊 Storage Analysis for ECommerceApp:")
for resource in app_storage:
    print(f"  - {resource['StorageResource']} ({resource['StorageType']})")
    print(f"    Encryption: {resource['EncryptionMethod']}")
    print(f"    Compliance: {resource['ComplianceStatus']}")
Security Gap Detection
Problem: "Find all storage that's not properly encrypted"
# Find non-compliant storage
storage_results = client.query_storage_encryption()
security_gaps = [
    item for item in storage_results
    if item['ComplianceStatus'] in ['Non-Compliant', 'Partially Compliant']
]
print(f"🚨 Found {len(security_gaps)} security gaps:")
for gap in security_gaps:
    print(f"  ❌ {gap['Application']}: {gap['StorageResource']}")
    print(f"     Issue: {gap['EncryptionMethod']}")
    print(f"     Location: {gap['ResourceGroup']}")
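When the gap list is long, grouping it by application makes it easier to hand each team its own findings. The following is a minimal sketch that runs without Azure access: the sample records and their values are hypothetical, and only the field names are taken from the examples above.

```python
from collections import defaultdict

# Hypothetical sample records that reuse the field names shown above.
sample_results = [
    {"Application": "ECommerceApp", "StorageResource": "ecomdata01",
     "EncryptionMethod": "Customer Managed Key", "ComplianceStatus": "Compliant",
     "ResourceGroup": "rg-ecommerce"},
    {"Application": "ECommerceApp", "StorageResource": "ecomlogs01",
     "EncryptionMethod": "Not Encrypted", "ComplianceStatus": "Non-Compliant",
     "ResourceGroup": "rg-ecommerce"},
    {"Application": "ReportingApp", "StorageResource": "reportdisk01",
     "EncryptionMethod": "Platform Managed", "ComplianceStatus": "Partially Compliant",
     "ResourceGroup": "rg-reporting"},
]

GAP_STATUSES = {"Non-Compliant", "Partially Compliant"}


def gaps_by_application(results):
    """Map each application name to the storage resources that have gaps."""
    grouped = defaultdict(list)
    for item in results:
        if item["ComplianceStatus"] in GAP_STATUSES:
            grouped[item["Application"]].append(item["StorageResource"])
    return dict(grouped)


for application, resources in sorted(gaps_by_application(sample_results).items()):
    print(f"{application}: {len(resources)} gap(s) -> {', '.join(resources)}")
```

With a live client, the list returned by query_storage_encryption() could be passed in place of sample_results.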
Cross-Subscription Analysis
Problem: "Analyze encryption across multiple Azure subscriptions"
# Analyze multiple subscriptions
subscription_ids = ["sub-1", "sub-2", "sub-3"]
results = client.query_storage_encryption(subscription_ids)
# Group by subscription
from collections import defaultdict
by_subscription = defaultdict(list)
for item in results:
    # Extract subscription from resource ID
    sub_id = item['ResourceId'].split('/')[2]
    by_subscription[sub_id].append(item)
for sub_id, resources in by_subscription.items():
    compliant = sum(1 for r in resources if r['ComplianceStatus'] == 'Compliant')
    print(f"Subscription {sub_id}: {compliant}/{len(resources)} compliant")
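The split('/')[2] lookup above relies on every resource ID starting with /subscriptions/<id>/. A slightly more defensive variant looks for the subscriptions segment by name and returns None when it is absent. This is a sketch only; the helper name is hypothetical and the IDs below are made-up examples.

```python
def subscription_id_from_resource_id(resource_id):
    """Return the subscription segment of an ARM resource ID, or None."""
    parts = [part for part in resource_id.split("/") if part]
    # Stop one short of the end so parts[index + 1] always exists.
    for index, part in enumerate(parts[:-1]):
        if part.lower() == "subscriptions":
            return parts[index + 1]
    return None


# Made-up IDs for illustration.
storage_id = (
    "/subscriptions/00000000-0000-0000-0000-000000000000"
    "/resourceGroups/rg-demo/providers/Microsoft.Storage/storageAccounts/demoacct"
)
tenant_level_id = "/providers/Microsoft.Management/managementGroups/demo-group"

print(subscription_id_from_resource_id(storage_id))
print(subscription_id_from_resource_id(tenant_level_id))
```

Records for which the helper returns None can then be skipped or grouped under a separate key instead of raising an IndexError.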
🔍 Advanced Queries
Custom KQL Queries
# Custom query for specific storage types
query = """
Resources
| where type in ('microsoft.storage/storageaccounts', 'microsoft.compute/disks')
| where location == 'eastus'
| extend EncryptionEnabled = case(
    type == 'microsoft.storage/storageaccounts',
        tobool(properties.encryption.services.blob.enabled),
    type == 'microsoft.compute/disks',
        tostring(properties.encryption.type) contains 'Encryption',
    false
)
| project name, type, location, EncryptionEnabled, resourceGroup
| where EncryptionEnabled == false
"""
non_encrypted = client.query_resource_graph(query)
print(f"Found {len(non_encrypted)} unencrypted resources in East US")
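If the region or the resource types come from user input rather than a hard-coded string, it is worth validating them before they are placed in the query text. The sketch below builds only the leading filter lines of a query like the one above. The function name is hypothetical, and the allowed character sets are an assumption chosen to fit lowercase values such as 'eastus' and 'microsoft.compute/disks'.

```python
import re

# Assumed patterns: lowercase letters, digits, dots and slashes only.
_TYPE_RE = re.compile(r"[a-z0-9./]+")
_LOCATION_RE = re.compile(r"[a-z0-9]+")


def build_type_location_filter(resource_types, location):
    """Build the leading 'Resources | where ...' lines of a KQL query."""
    if not resource_types:
        raise ValueError("at least one resource type is required")
    for resource_type in resource_types:
        if not _TYPE_RE.fullmatch(resource_type):
            raise ValueError(f"unexpected resource type: {resource_type!r}")
    if not _LOCATION_RE.fullmatch(location):
        raise ValueError(f"unexpected location: {location!r}")
    quoted_types = ", ".join(f"'{t}'" for t in resource_types)
    return (
        "Resources\n"
        f"| where type in ({quoted_types})\n"
        f"| where location == '{location}'"
    )


prefix = build_type_location_filter(
    ["microsoft.storage/storageaccounts", "microsoft.compute/disks"], "eastus"
)
print(prefix)
```

The returned text can be concatenated with further operators (extend, project, and so on) before being passed to query_resource_graph.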
Storage Cost Analysis with Encryption
# Analyze storage with cost implications
query = """
Resources
| where type == 'microsoft.storage/storageaccounts'
| extend EncryptionType = case(
    properties.encryption.keySource == 'Microsoft.Keyvault', 'Customer Managed (Premium)',
    properties.encryption.services.blob.enabled == true, 'Platform Managed (Standard)',
    'None (Risk)'
)
| extend AccountTier = tostring(sku.tier)
| project name, EncryptionType, AccountTier, location, resourceGroup
| summarize count() by EncryptionType, AccountTier
"""
cost_analysis = client.query_resource_graph(query)
for item in cost_analysis:
    print(f"{item['EncryptionType']} + {item['AccountTier']}: {item['count_']} accounts")
📈 Reporting and Monitoring
Generate Compliance Dashboard
import json
from collections import Counter
from datetime import datetime

from azure_resource_graph import AzureResourceGraphClient

def generate_compliance_report():
    client = AzureResourceGraphClient()
    # Get all data
    storage_results = client.query_storage_encryption()
    compliance_summary = client.get_compliance_summary()
    # Create dashboard data
    dashboard = {
        'timestamp': datetime.now().isoformat(),
        'overall_stats': {
            'total_storage_resources': len(storage_results),
            'total_applications': len(compliance_summary),
            # Unweighted mean of the per-application percentages
            'overall_compliance': (
                sum(app['CompliancePercentage'] for app in compliance_summary) / len(compliance_summary)
                if compliance_summary else 0
            )
        },
        'by_application': compliance_summary,
        'security_gaps': [
            item for item in storage_results
            if item['ComplianceStatus'] != 'Compliant'
        ],
        'by_storage_type': {}
    }
    # Group by storage type
    storage_types = Counter(item['StorageType'] for item in storage_results)
    compliant_by_type = Counter(
        item['StorageType'] for item in storage_results
        if item['ComplianceStatus'] == 'Compliant'
    )
    for storage_type, total in storage_types.items():
        compliant = compliant_by_type.get(storage_type, 0)
        dashboard['by_storage_type'][storage_type] = {
            'total': total,
            'compliant': compliant,
            'compliance_rate': (compliant / total) * 100
        }
    # Save report
    with open(f'compliance_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json', 'w') as f:
        json.dump(dashboard, f, indent=2)
    return dashboard

# Generate report
report = generate_compliance_report()
print(f"📊 Overall compliance: {report['overall_stats']['overall_compliance']:.1f}%")
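Note that overall_compliance in the report is the plain average of the per-application percentages, so an application with one resource counts as much as one with fifty. If a resource-weighted figure is preferred, it can be derived directly from the storage records. The sketch below compares the two on hypothetical data; both function names are illustrative and not part of the package.

```python
def unweighted_compliance(compliance_summary):
    """Average the per-application percentages (the approach used above)."""
    if not compliance_summary:
        return 0.0
    total = sum(app["CompliancePercentage"] for app in compliance_summary)
    return total / len(compliance_summary)


def weighted_compliance(storage_results):
    """Share of individual resources that are Compliant, as a percentage."""
    if not storage_results:
        return 0.0
    compliant = sum(
        1 for item in storage_results if item["ComplianceStatus"] == "Compliant"
    )
    return 100 * compliant / len(storage_results)


# Hypothetical data: one small fully compliant app, one larger app at 25%.
summary = [
    {"Application": "SmallApp", "CompliancePercentage": 100.0},
    {"Application": "LargeApp", "CompliancePercentage": 25.0},
]
records = (
    [{"Application": "SmallApp", "ComplianceStatus": "Compliant"}]
    + [{"Application": "LargeApp", "ComplianceStatus": "Compliant"}]
    + [{"Application": "LargeApp", "ComplianceStatus": "Non-Compliant"}] * 3
)

print(f"Unweighted: {unweighted_compliance(summary):.1f}%")
print(f"Weighted:   {weighted_compliance(records):.1f}%")
```

Which figure is more useful depends on whether the audience thinks in terms of applications or individual resources; reporting both avoids ambiguity.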
Automated Monitoring Script
#!/usr/bin/env python3
"""
Daily storage encryption monitoring script
"""
from azure_resource_graph import AzureResourceGraphClient

def daily_compliance_check():
    client = AzureResourceGraphClient()
    # Check for new non-compliant resources
    results = client.query_storage_encryption()
    critical_issues = [
        item for item in results
        if item['ComplianceStatus'] == 'Non-Compliant'
    ]
    if critical_issues:
        print(f"🚨 ALERT: {len(critical_issues)} non-compliant storage resources found!")
        for issue in critical_issues:
            print(f"❌ {issue['Application']}: {issue['StorageResource']}")
            print(f"   Issue: {issue['EncryptionMethod']}")
            print(f"   Resource Group: {issue['ResourceGroup']}")
        # You could send alerts here (email, Slack, etc.)
        return False
    else:
        print("✅ All storage resources are compliant")
        return True

if __name__ == "__main__":
    daily_compliance_check()
🏢 Enterprise Integration
CI/CD Pipeline Integration
# Azure DevOps Pipeline
name: Storage Encryption Compliance Check
trigger:
- main
jobs:
- job: ComplianceCheck
  steps:
  - task: UsePythonVersion@0
    inputs:
      versionSpec: '3.9'
  - script: |
      pip install -e .
    displayName: 'Install dependencies'
  - script: |
      python scripts/compliance_check.py
    env:
      AZURE_TENANT_ID: $(AZURE_TENANT_ID)
      AZURE_CLIENT_ID: $(AZURE_CLIENT_ID)
      AZURE_CLIENT_SECRET: $(AZURE_CLIENT_SECRET)
      AZURE_SUBSCRIPTION_IDS: $(AZURE_SUBSCRIPTION_IDS)
    displayName: 'Run storage encryption compliance check'
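A pipeline script step is marked as failed only when the process exits with a non-zero code, so a check that merely prints or returns False will not stop the build. This README does not show the contents of scripts/compliance_check.py; the sketch below is one way such an entry point might map results to an exit code. All names in it are hypothetical.

```python
import sys


def exit_code_for(results):
    """Return 1 when any record is Non-Compliant, otherwise 0."""
    failures = [
        item for item in results if item["ComplianceStatus"] == "Non-Compliant"
    ]
    for item in failures:
        print(
            f"Non-compliant: {item['Application']}: {item['StorageResource']}",
            file=sys.stderr,
        )
    return 1 if failures else 0


def main(fetch_results):
    """Run the check using any zero-argument callable that returns records."""
    return exit_code_for(fetch_results())


# Hypothetical records standing in for a live query.
sample = [
    {"Application": "ECommerceApp", "StorageResource": "ecomdata01",
     "ComplianceStatus": "Compliant"},
    {"Application": "ECommerceApp", "StorageResource": "ecomlogs01",
     "ComplianceStatus": "Non-Compliant"},
]
print(main(lambda: sample))
```

In a real script, the last lines could instead pass AzureResourceGraphClient().query_storage_encryption to main and hand the result to sys.exit, so that the pipeline step fails whenever a non-compliant resource is found.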
PowerBI Integration
from datetime import datetime

import pandas as pd

from azure_resource_graph import AzureResourceGraphClient

def export_for_powerbi():
    """Export data in PowerBI-friendly format"""
    client = AzureResourceGraphClient()
    results = client.query_storage_encryption()
    # Use a single timestamp so every row in one export matches
    analysis_date = datetime.now().isoformat()
    # Flatten for PowerBI
    powerbi_data = []
    for item in results:
        powerbi_data.append({
            'Application': item['Application'],
            'StorageResource': item['StorageResource'],
            'StorageType': item['StorageType'],
            'EncryptionMethod': item['EncryptionMethod'],
            'IsCompliant': 1 if item['ComplianceStatus'] == 'Compliant' else 0,
            'ComplianceStatus': item['ComplianceStatus'],
            'ResourceGroup': item['ResourceGroup'],
            'Location': item['Location'],
            'AnalysisDate': analysis_date
        })
    # Export to CSV for PowerBI
    df = pd.DataFrame(powerbi_data)
    df.to_csv('azure_storage_compliance.csv', index=False)
    print(f"📊 Exported {len(powerbi_data)} records to PowerBI CSV")
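Where pandas is not installed, the same flat records can be written with the standard library csv module. This is a minimal sketch, not part of the package: the helper name and the sample rows are hypothetical, and only a subset of the columns above is shown.

```python
import csv
import io


def records_to_csv(records, fieldnames):
    """Serialize a list of flat dicts to CSV text with a header row."""
    buffer = io.StringIO()
    # extrasaction="ignore" drops keys that are not listed in fieldnames.
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(records)
    return buffer.getvalue()


# Hypothetical rows using column names from the export above.
rows = [
    {"Application": "ECommerceApp", "StorageResource": "ecomdata01",
     "IsCompliant": 1, "ResourceGroup": "rg-ecommerce"},
    {"Application": "ReportingApp", "StorageResource": "reportdisk01",
     "IsCompliant": 0, "ResourceGroup": "rg-reporting"},
]
csv_text = records_to_csv(rows, ["Application", "StorageResource", "IsCompliant"])
print(csv_text)
```

To write a file instead of a string, the same DictWriter can wrap a file opened with newline="" so that the csv module controls line endings.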
🔧 Testing
# Run all tests
pytest
# Run only integration tests (requires Azure credentials)
pytest -m integration
# Run only unit tests (no Azure required)
pytest -m unit
# Run specific storage tests
pytest -m storage -v
🛡️ Security Best Practices
Least Privilege Access
Your service principal only needs Reader permissions:
# Create minimal-privilege service principal
az ad sp create-for-rbac \
--name "StorageComplianceReader" \
--role "Reader" \
--scopes "/subscriptions/$SUBSCRIPTION_ID"
Secure Credential Management
# Use Azure Key Vault for production
az keyvault secret set \
--vault-name "your-keyvault" \
--name "azure-client-secret" \
--value "$AZURE_CLIENT_SECRET"
# Or use managed identity when running on Azure
# No credentials needed!
📚 API Reference
Main Methods
client.query_storage_encryption(subscription_ids=None)
- Returns: List of storage resources with encryption analysis
- Use: Primary method for compliance checking
client.get_compliance_summary(subscription_ids=None)
- Returns: Application-level compliance summary
- Use: Executive dashboards and reporting
client.query_application_storage(app_name, subscription_ids=None)
- Returns: All storage for a specific application
- Use: Application-specific security audits
client.query_resource_graph(query, subscription_ids=None)
- Returns: Custom KQL query results
- Use: Advanced analysis and custom reports
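For unit tests that must run without Azure credentials, a small in-memory stand-in with the same method names can replace the real client. The class below is a hypothetical test double, not something shipped with the package. It assumes the summary records carry at least the Application, CompliancePercentage and NonCompliantResources keys used elsewhere in this README, and it ignores subscription_ids.

```python
from collections import Counter


class StubResourceGraphClient:
    """Hypothetical test double mirroring three of the methods listed above."""

    def __init__(self, storage_records):
        self._records = list(storage_records)

    def query_storage_encryption(self, subscription_ids=None):
        return list(self._records)

    def query_application_storage(self, app_name, subscription_ids=None):
        return [r for r in self._records if r["Application"] == app_name]

    def get_compliance_summary(self, subscription_ids=None):
        totals = Counter(r["Application"] for r in self._records)
        bad = Counter(
            r["Application"] for r in self._records
            if r["ComplianceStatus"] != "Compliant"
        )
        return [
            {
                "Application": app,
                "CompliancePercentage": 100 * (total - bad[app]) / total,
                "NonCompliantResources": bad[app],
            }
            for app, total in sorted(totals.items())
        ]


# Hypothetical records for illustration.
stub = StubResourceGraphClient([
    {"Application": "ECommerceApp", "StorageResource": "ecomdata01",
     "ComplianceStatus": "Compliant"},
    {"Application": "ECommerceApp", "StorageResource": "ecomlogs01",
     "ComplianceStatus": "Non-Compliant"},
    {"Application": "ReportingApp", "StorageResource": "reportdisk01",
     "ComplianceStatus": "Compliant"},
])
for row in stub.get_compliance_summary():
    print(row["Application"], row["CompliancePercentage"], row["NonCompliantResources"])
```

Code that accepts a client as a parameter can then be exercised against the stub in unit tests and against AzureResourceGraphClient in integration tests.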
🚨 Common Issues
No Results Returned
# Check if your resources are tagged
results = client.query_resource_graph("Resources | where isnotempty(tags.Application) | limit 10")
if not results:
    print("❌ No tagged resources found. Make sure your resources have Application tags.")
Authentication Errors
# Test authentication
try:
    token = client._get_access_token()
    print("✅ Authentication successful")
except Exception as e:
    print(f"❌ Authentication failed: {e}")
Subscription Access
# Check subscription access
subs = client.query_resource_graph("Resources | distinct subscriptionId")
print(f"✅ Access to {len(subs)} subscriptions")
🤝 Contributing
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Run tests (pytest)
- Commit changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
🎯 Ready to secure your Azure storage? Start with the Quick Start guide above and begin analyzing your storage encryption compliance today!
File details
Details for the file azure_resource_graph-1.0.14.tar.gz.
File metadata
- Download URL: azure_resource_graph-1.0.14.tar.gz
- Upload date:
- Size: 68.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 94605698849cef995f1a9bc7600f9f13abffb5f20b5a83954f020b5ab4157486 |
| MD5 | 55a8815afefeaaf0a2a20e8e969da111 |
| BLAKE2b-256 | 6fd1a2b1876cd756c46d4756e135d48731b36b688321b2ac8e27a60f992a0a69 |
File details
Details for the file azure_resource_graph-1.0.14-py3-none-any.whl.
File metadata
- Download URL: azure_resource_graph-1.0.14-py3-none-any.whl
- Upload date:
- Size: 50.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1273d9039a5e9dd1df9240eb756e7a731486102e3f82d03092c6463cd4edb70c |
| MD5 | c879015ab4125223b36dac8241500415 |
| BLAKE2b-256 | 7e50c749278d5cd834280f362add6022b3df646d4b10fc97c362e85d3b9efcf8 |