azpaddypy
Azure cloud services SDK with Storage (blob, append blob, file share, queue), Key Vault, Cosmos DB, AI Foundry Projects (agents, deployments, evaluations), Document Intelligence, Speech, OpenTelemetry tracing, AI Foundry GenAI tracing, and builder patterns.
Designed for Dockerized Azure Function Apps and Web Apps.
Installation
Requires Python 3.11–3.13.
uv add azpaddypy
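Or, without uv (the package is published on PyPI):

pip install azpaddypy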
Quick Start
Minimal: bootstrap telemetry, get a logger, create a resource client.
from azpaddypy import AzureIdentity, AzureStorage
from azpaddypy.mgmt.logging import AzureLogger, bootstrap_azure_monitor
# 1. Wire Azure Monitor + GenAI instrumentors once (idempotent)
bootstrap_azure_monitor(service_name="my_service", service_version="1.0.0")
# 2. Get a logger anywhere -- auto-enriches records with correlation_id (when
# active) and any flattened OTel baggage entries. Trace/span IDs and
# cloud_RoleName are populated by the exporter on the standard App Insights
# columns, so we don't duplicate them into customDimensions.
logger = AzureLogger(__name__)
logger.info("service starting")
# 3. Create resource clients (factory caches instances; identity must be injected)
storage = AzureStorage(
account_url="https://myaccount.blob.core.windows.net/",
azure_identity=AzureIdentity(service_name="my_service"),
enable_file_storage=True,
)
Prefer the builder for multi-resource apps -- see Builder Pattern.
For the full telemetry surface (trace_function, correlation IDs, GenAI
tracing), see Logging & Tracing.
Storage Operations
Blob Storage
# Upload
storage.upload_blob(
container_name="documents",
blob_name="report.pdf",
data=pdf_bytes,
content_type="application/pdf",
metadata={"author": "team"},
)
# Download (returns None if not found)
data = storage.download_blob(container_name="documents", blob_name="report.pdf")
# Upload and get SAS URL (timedelta comes from the stdlib)
from datetime import timedelta

sas_url = storage.upload_blob_with_sas(
container_name="documents",
blob_name="report.pdf",
data=pdf_bytes,
sas_permission="r",
sas_expiry_delta=timedelta(hours=3),
)
# List, exists, delete
blobs = storage.list_blobs(container_name="documents", name_starts_with="reports/")
exists = storage.blob_exists(container_name="documents", blob_name="report.pdf")
storage.delete_blob(container_name="documents", blob_name="report.pdf")
# Metadata upsert (merges with existing)
storage.upsert_blob_metadata(
container_name="documents",
blob_name="report.pdf",
metadata={"status": "processed"},
)
# SAS token generation
blob_sas = storage.get_blob_sas(container_name="docs", blob_name="file.pdf")
container_sas = storage.get_container_sas(container_name="docs", permission="r")
Append Blob Storage
Append blobs are optimized for append operations such as logging, auditing, or streaming data. Each append block can be up to 4 MiB. Unlike block blobs, append blobs do not support overwriting existing content.
# Create an empty append blob
storage.create_append_blob(
container_name="logs",
blob_name="app-2026-04-05.log",
content_type="text/plain; charset=utf-8",
metadata={"source": "web-app"},
)
# Append data blocks
storage.append_block(
container_name="logs",
blob_name="app-2026-04-05.log",
data="2026-04-05T10:00:00Z INFO Application started\n",
)
storage.append_block(
container_name="logs",
blob_name="app-2026-04-05.log",
data=b"2026-04-05T10:00:01Z DEBUG Connection pool initialized\n",
)
# Convenience: create-if-missing + append in one call
storage.append_blob_from_text(
container_name="logs",
blob_name="app-2026-04-05.log",
text="2026-04-05T10:05:00Z WARN High memory usage\n",
create_if_not_exists=True, # default, skips creation if blob already exists
)
File Share Storage
Requires enable_file_storage=True. Uses Azure File Shares (SMB/NFS), not blob storage.
storage = AzureStorage(
account_url="https://myaccount.blob.core.windows.net/",
azure_identity=identity,
enable_file_storage=True,
)
# Upload (auto-creates parent directories)
storage.upload_share_file(
share_name="myshare",
file_path="reports/2026/q1.pdf",
data=pdf_bytes,
content_type="application/pdf",
)
# Download (returns None if not found)
data = storage.download_share_file(share_name="myshare", file_path="reports/2026/q1.pdf")
# List files and directories
items = storage.list_share_files(share_name="myshare", directory_path="reports/2026")
# Returns: [{"name": "q1.pdf", "is_directory": False, "size": 1024}, ...]
# Exists, properties, delete
exists = storage.share_file_exists(share_name="myshare", file_path="reports/2026/q1.pdf")
props = storage.get_share_file_properties(share_name="myshare", file_path="reports/2026/q1.pdf")
storage.delete_share_file(share_name="myshare", file_path="reports/2026/q1.pdf")
# Directory management
storage.create_share_directory(share_name="myshare", directory_path="reports/2026/q2")
storage.delete_share_directory(share_name="myshare", directory_path="reports/2026/q2")
# Metadata upsert (merges with existing)
storage.upsert_share_file_metadata(
share_name="myshare",
file_path="reports/2026/q1.pdf",
metadata={"reviewed": "true"},
)
Queue Storage
# Send
storage.send_message(
queue_name="tasks",
content='{"task": "process"}',
visibility_timeout=30,
time_to_live=3600,
)
# Receive
messages = storage.receive_messages(queue_name="tasks", messages_per_page=5)
for msg in messages:
    print(msg["id"], msg["content"])
    storage.delete_message(
        queue_name="tasks",
        message_id=msg["id"],
        pop_receipt=msg["pop_receipt"],
    )
Builder Pattern
Recommended for multi-resource setups: the builders read service name, version,
connection string, and log level from the environment configuration you pass
them, so you don't repeat that information in each with_* call.
from azpaddypy.builder import (
AzureManagementBuilder,
AzureResourceBuilder,
ConfigurationSetupBuilder,
)
# 1. Environment configuration (reads REFLECTION_NAME, LOGGER_LOG_LEVEL,
# APPLICATIONINSIGHTS_CONNECTION_STRING, IDENTITY_* etc. from env/.env)
env_config = (
ConfigurationSetupBuilder()
.with_local_env_management()
.with_environment_detection()
.with_service_configuration()
.with_logging_configuration()
.with_identity_configuration()
.build()
)
# 2. Management services: bootstraps Azure Monitor + creates the service logger
mgmt = (
AzureManagementBuilder(env_config)
.with_logger() # bootstraps Azure Monitor, returns AzureLogger
.with_identity()
.with_keyvault(vault_url="https://myvault.vault.azure.net/", name="main")
.build()
)
logger = mgmt.logger # AzureLogger, already wired to App Insights
identity = mgmt.identity
keyvaults = mgmt.keyvaults
# 3. Resource clients -- share the logger and identity via the mgmt config
resources = (
AzureResourceBuilder(mgmt, env_config)
.with_storage(name="default", account_url="https://mystg.blob.core.windows.net/")
.with_storage(name="archive", account_url="https://archive.blob.core.windows.net/")
.with_ai_project(name="aiservices", endpoint="https://my-ai.services.ai.azure.com/")
.with_document_intelligence(name="docs", endpoint="https://my-ai.cognitiveservices.azure.com/")
.with_speech(
name="speech",
region="westeurope",
resource_id="/subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.CognitiveServices/accounts/<ai>",
)
.build()
)
storage = resources.storage_accounts["default"]
ai_project = resources.ai_project_clients["aiservices"]
doc_intel = resources.document_intelligence_clients["docs"]
speech = resources.speech_clients["speech"]
See examples/mgmt_config_template.py for a
full production-shaped example including Key Vault-sourced resource names and
the ConfigurationManager + LogExecution tool wiring.
The template's module body is side-effect-free: the bootstrap (Azure
Monitor, identity, Key Vault calls, resource clients) runs lazily on the
first attribute access and is cached process-wide via a sentinel in
sys.modules. So import mgmt_config is cheap, and even if the file is
loaded under multiple module names in one process, the bootstrap runs
exactly once. Either access style works:
# Lazy module-level (existing style — unchanged):
from mgmt_config import logger, configuration_manager, storage_accounts
# Explicit accessor (returns the same singleton):
from mgmt_config import get_config
cfg = get_config()
cfg.logger.info("ready")
Note:
Document Intelligence and Speech are configured exclusively through mgmt_config (typically from Key Vault secrets). They have no environment-variable fallbacks -- pass endpoint (and for Speech, region + resource_id) explicitly.
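For illustration, a minimal sketch of that wiring, reusing the builder objects from above (the secret name doc-intel-endpoint is hypothetical):

```python
# Sketch only: assumes the Key Vault registered as "main" above holds the
# Document Intelligence endpoint in a secret named "doc-intel-endpoint".
doc_intel_endpoint = mgmt.keyvaults["main"].get_secret("doc-intel-endpoint")

resources = (
    AzureResourceBuilder(mgmt, env_config)
    .with_document_intelligence(name="docs", endpoint=doc_intel_endpoint)
    .build()
)
```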
Key Vault
from azpaddypy import AzureKeyVault, create_azure_keyvault, create_azure_identity
identity = create_azure_identity(service_name="my_service")
kv = create_azure_keyvault(
vault_url="https://myvault.vault.azure.net/",
azure_identity=identity,
service_name="my_service",
)
secret = kv.get_secret("database-connection-string")
AI Foundry Projects
Manage Azure AI Foundry agents, deployments, and connections with integrated OpenAI client support.
from azpaddypy import AzureAIProject, create_azure_ai_project, create_azure_identity
# Factory function (cached instances; identity must be injected explicitly)
identity = create_azure_identity(service_name="my_service")
ai = create_azure_ai_project(
endpoint="https://my-ai.services.ai.azure.com/api/projects/my-project",
azure_identity=identity,
service_name="my_service",
)
# List deployments
deployments = ai.list_deployments()
# Get an authenticated OpenAI client
openai_client = ai.get_openai_client()
# Agent operations
from azure.ai.projects.models import PromptAgentDefinition
agent = ai.create_agent(
agent_name="my-agent",
definition=PromptAgentDefinition(model="gpt-4o", instructions="You are helpful"),
)
agents = ai.list_agents()
details = ai.get_agent(agent_name="my-agent")
# Invoke an agent via OpenAI responses API
result = ai.invoke_agent(agent_name="my-agent", user_message="Hello")
print(result["response"])
# Connections
connections = ai.list_connections()
connection = ai.get_connection(name="my-openai-connection", include_credentials=True)
# Fetch the Application Insights connection string linked to this Foundry project
# (requires the linkage to have been configured once via portal -> Project -> Tracing).
# Returns None if no App Insights resource is linked.
conn_str = ai.get_application_insights_connection_string()
Feature Flags
ai = AzureAIProject(
endpoint="https://my-ai.services.ai.azure.com/api/projects/my-project",
azure_identity=identity,
enable_agents=True, # Agent CRUD + invocation
enable_deployments=True, # List/get model deployments
enable_connections=False, # Disable connection enumeration
)
Evaluation
Run AI quality and safety evaluations against your model outputs. Results appear in the AI Foundry portal under Evaluation with average scores and per-row details.
from mgmt_config import ai_projects
ai = ai_projects["aiservices"]
# One-shot: create eval + run + poll + return results
result = ai.evaluate(
name="my-eval",
evaluator_names=[
# Quality (need a judge model)
"builtin.coherence",
"builtin.groundedness",
"builtin.relevance",
# Safety (no judge model needed)
"builtin.violence",
"builtin.hate_unfairness",
],
data=[
{
"query": "What is Azure?",
"response": "Azure is Microsoft's cloud platform.",
"context": "Azure documentation overview.",
},
{
"query": "How do I deploy?",
"response": "Use az webapp deploy.",
"context": "CLI deployment docs.",
},
],
judge_model="gpt-4o", # required for quality evaluators
cleanup=True, # delete eval definition after getting results
)
print(result["status"]) # "completed" or "failed"
print(result["report_url"]) # portal link to the evaluation report
print(result["result_counts"]) # aggregate pass/fail counts
for item in result["output_items"]:
    print(item)  # per-row evaluator scores
Available built-in evaluators:
| Category | Evaluators | Judge model required |
|---|---|---|
| Quality | builtin.coherence, builtin.fluency, builtin.groundedness, builtin.relevance, builtin.similarity, builtin.task_adherence | Yes |
| NLP | builtin.f1_score, builtin.bleu_score, builtin.rouge_score, builtin.meteor_score, builtin.gleu_score | No |
| Safety | builtin.violence, builtin.sexual, builtin.self_harm, builtin.hate_unfairness, builtin.prohibited_actions, builtin.sensitive_data_leakage | No |
For finer control, use the individual methods: create_evaluation(), run_evaluation(), get_evaluation_run(), get_evaluation_run_output_items(), list_evaluations(), delete_evaluation().
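For example, a hedged sketch of listing past runs -- the no-argument call shape is an assumption, mirroring list_agents() / list_deployments():

```python
# Assumed signature: no-arg listing, by analogy with list_agents()
for evaluation in ai.list_evaluations():
    print(evaluation)
```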
Document Intelligence
Analyze documents using Azure AI Document Intelligence (formerly Form Recognizer). Shares the same Cognitive Services / AI Services account as AI Foundry.
from azpaddypy import AzureDocumentIntelligence, create_azure_document_intelligence, create_azure_identity
identity = create_azure_identity(service_name="my_service")
di = create_azure_document_intelligence(
endpoint="https://my-ai.cognitiveservices.azure.com/",
azure_identity=identity,
service_name="my_service",
enable_administration=True, # opt in to model management
)
# Analyze from URL with a prebuilt model
result = di.analyze_document_from_url(
model_id="prebuilt-layout",
url_source="https://example.com/invoice.pdf",
)
print(f"Pages: {len(result.pages)}")
# Analyze from bytes
with open("contract.pdf", "rb") as f:
    result = di.analyze_document_from_bytes(model_id="prebuilt-read", document=f.read())
# Manage custom models
models = di.list_models()
model = di.get_model(model_id="my-custom-model")
di.delete_model(model_id="my-custom-model")
Speech
Azure Cognitive Services Speech with Entra ID authentication. Unlike most Azure SDKs, the Speech SDK does not accept TokenCredential directly — it requires the special aad#<resource-id>#<token> auth string. azpaddypy handles token acquisition, format, and refresh.
You must provide both the Azure region and the full ARM resource ID of the Speech / AI Services account.
from azpaddypy import AzureSpeech, create_azure_speech, create_azure_identity
identity = create_azure_identity(service_name="my_service")
speech = create_azure_speech(
region="westeurope",
resource_id=(
"/subscriptions/<sub>/resourceGroups/<rg>"
"/providers/Microsoft.CognitiveServices/accounts/<ai-services>"
),
azure_identity=identity,
service_name="my_service",
default_speech_synthesis_voice_name="en-US-JennyNeural",
)
# Synthesize text to in-memory bytes (server / container scenarios)
audio: bytes = speech.synthesize_text_to_bytes("Hello from azpaddypy")
# Synthesize and write directly to a file
speech.synthesize_text_to_file("Hello from azpaddypy", file_path="out.wav")
# Synthesize and play on the default speaker (interactive / local dev)
speech.synthesize_text_to_speaker("Hello from azpaddypy")
Custom synthesizers and recognizers
For full control (streaming, recognition, custom audio configs, event callbacks), get a fresh SpeechConfig and build your own:
import azure.cognitiveservices.speech as speechsdk
speech_config = speech.get_speech_config()
synthesizer = speechsdk.SpeechSynthesizer(
speech_config=speech_config,
audio_config=speechsdk.audio.AudioOutputConfig(filename="out.wav"),
)
synthesizer.speak_text_async("Hello from azpaddypy").get()
# Refresh AAD token on long-lived synthesizers/recognizers
# (Speech tokens expire after ~10 minutes)
speech.refresh_authorization_token(synthesizer)
Logging & Tracing
azpaddypy splits telemetry into two entry points with one concern each.
1. Bootstrap once, at process start
bootstrap_azure_monitor() configures the Azure Monitor exporter + GenAI
instrumentors for the whole process. It's idempotent — call it as many
times as you like. Without a connection string it logs a warning and disables
telemetry cleanly.
from azpaddypy.mgmt.logging import bootstrap_azure_monitor
bootstrap_azure_monitor(
service_name="crawler-api", # cloud role name in App Insights
service_version="0.5.1",
# connection_string falls back to APPLICATIONINSIGHTS_CONNECTION_STRING
)
If you use AzureManagementBuilder.with_logger(), it calls bootstrap_azure_monitor
for you with the values from EnvironmentConfiguration.
Sampling
By default, bootstrap_azure_monitor does not override the trace sampler
— whatever the distro (or the Functions host, when running inside one)
chooses applies. Pass sampling_ratio= only when you explicitly want a
fixed-percentage sampler:
bootstrap_azure_monitor(
service_name="crawler-api",
sampling_ratio=0.25, # ~25% of traces
enable_trace_based_sampling_for_logs=True, # drop logs whose trace was sampled out
)
When sampling_ratio is set, bootstrap sets
OTEL_TRACES_SAMPLER=microsoft.fixed_percentage and
OTEL_TRACES_SAMPLER_ARG=<ratio> via os.environ.setdefault, so explicit
deployment-time overrides via App Settings continue to win.
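A short sketch of why overrides win -- os.environ.setdefault writes only when the key is absent:

```python
import os

# If the platform already injected OTEL_TRACES_SAMPLER as an App Setting,
# setdefault is a no-op and the deployment-time value stays in effect.
os.environ.setdefault("OTEL_TRACES_SAMPLER", "microsoft.fixed_percentage")
os.environ.setdefault("OTEL_TRACES_SAMPLER_ARG", "0.25")
```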
⚠️ Don't pass sampling_ratio inside an Azure Functions worker. When PYTHON_APPLICATIONINSIGHTS_ENABLE_TELEMETRY=true the host has already wired the OTel pipeline before your worker code runs, and changing the sampler env var mid-flight can leave the SDK falling back to NoOp via an entry-point lookup race -- silently dropping every span and log. Configure the sampler at the host level via app settings (OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG) instead.
About the azure-monitor-opentelemetry v1.8.6 sampler change: some blog posts claim the distro default flipped from 100% to a 5 trace/sec rate-limited sampler in v1.8.6. The official Microsoft Learn docs still state ApplicationInsightsSampler is the default. If you observe heavy drop rates after upgrading the distro, set OTEL_TRACES_SAMPLER / OTEL_TRACES_SAMPLER_ARG explicitly via App Settings -- that's the Microsoft-recommended path on Functions, and it sidesteps any worker-side initialization race.
Custom processors
Pass span_processors=[...] and log_record_processors=[...] to attach
global enrichers (e.g. tenant ID on every span, redaction on every log). These
forward to the v1.8.4+ distro pass-throughs on configure_azure_monitor.
from opentelemetry.sdk.trace import SpanProcessor
class TenantSpanProcessor(SpanProcessor):
    def on_start(self, span, parent_context=None):
        # Enrich at span start, where the span is still writable and
        # set_attribute is the supported API (on_end gets a read-only span).
        span.set_attribute("tenant.id", current_tenant_id())

    def on_end(self, span): ...
    def shutdown(self): ...
    def force_flush(self, timeout_millis=30000): return True
bootstrap_azure_monitor(
service_name="crawler-api",
span_processors=[TenantSpanProcessor()],
)
2. Create loggers anywhere
AzureLogger(name) is a thin logging.Logger-shaped wrapper that auto-enriches
every record with correlation_id (when active) and any OTel baggage entries
(flattened to baggage.<key>). It deliberately does not copy service_name,
timestamp, trace_id, or span_id into customDimensions — the Azure Monitor
exporter already populates those columns (cloud_RoleName, timestamp,
operation_Id, operation_ParentId), so duplicating them would just inflate
ingestion. Pass __name__ so logs are properly namespaced per module.
from azpaddypy.mgmt.logging import AzureLogger
logger = AzureLogger(__name__)
# stdlib-compatible API: msg + %-style args + extra
logger.info("field filled: selector=%s", selector)
logger.warning("retrying: attempt=%d", attempt, extra={"backoff_ms": 250})
logger.error("field failed: selector=%s", selector, exc_info=True)
# Inside an except block, logger.exception() attaches the active traceback
try:
    do_thing()
except RuntimeError:
    logger.exception("operation failed")
Since it matches logging.Logger, AzureLogger drops into any third-party
code (including logging.LoggerAdapter) that expects a stdlib logger.
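For example, wrapping it in a stdlib LoggerAdapter (the job_id context key is illustrative):

```python
import logging

from azpaddypy.mgmt.logging import AzureLogger

# AzureLogger satisfies the logging.Logger API, so stdlib adapters accept it
adapter = logging.LoggerAdapter(AzureLogger(__name__), {"job_id": "job-42"})
adapter.info("worker picked up job")
```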
3. Decorate functions for distributed tracing
trace_function creates an OpenTelemetry span around the call. It manages a
correlation ID scoped to the span (UUID4 if none is active, reset on exit) so
long-running workers don't leak IDs across requests.
from azpaddypy.mgmt.logging import AzureLogger, trace_function
@trace_function()  # default: span named {module}.{qualname}
async def crawl(url: str) -> dict:
    return await do_crawl(url)

# Custom span name + record the return value as a span attribute
@trace_function(name="summarize", record_result=True)
def summarize(text: str) -> str:
    return text[:100]
# Also available as a method on AzureLogger for ergonomics
logger = AzureLogger(__name__)
@logger.trace_function(record_args=True)
def process(user_id: int, payload: dict) -> None: ...
record_args / record_result add the arguments / return value as span
attributes (stringified, truncated to 1000 chars). Default is off so you don't
accidentally ship sensitive values. When applied to bound methods or
classmethods, self and cls are skipped automatically — only the call's
own parameters are recorded.
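For instance, on an instance method only the call's own parameters land on the span (a minimal sketch):

```python
from azpaddypy.mgmt.logging import trace_function

class OrderService:
    @trace_function(record_args=True)
    def process(self, order_id: str) -> None:
        # self is skipped automatically; only order_id is recorded
        ...
```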
Each decorated function emits its span under its own module's instrumentation
scope (trace.get_tracer(func.__module__)), so spans are attributed to your
code rather than to azpaddypy.mgmt.logging. The span's start/end timestamps
already drive App Insights' duration column, so no separate duration_ms
attribute is emitted.
Span kind
kind defaults to SpanKind.INTERNAL, which lands in App Insights'
dependencies table. App Insights' Performance / end-to-end transaction
tree only renders when there's a SpanKind.SERVER span at the root (the
requests table). Use it on entry-point handlers that aren't auto-instrumented
by a framework:
from opentelemetry.trace import SpanKind
@trace_function(kind=SpanKind.SERVER)
async def consume_message(msg):  # custom Service Bus consumer outside FastAPI
    ...
Inside an Azure Function App
Don't pass kind=SpanKind.SERVER on Functions handlers. The Functions
host already emits the SERVER request span for every trigger invocation
(HTTP, Timer, Queue, Service Bus, ...) when host.json sets
"telemetryMode": "OpenTelemetry". A worker-side SERVER span would produce
a duplicate row in the requests table with a different operation_Id,
breaking the end-to-end transaction tree.
The right pattern: leave kind at its default (INTERNAL) on every
worker-side decoration — the trigger handler, any helpers it calls, any
Blueprint-registered function. The host-emitted request anchors the tree
and your @trace_function spans render as children of it under
dependencies.
import azure.functions as func
from mgmt_config import logger, log_execution_config
bp_orders: func.Blueprint = func.Blueprint()
# Trigger handler — INTERNAL (default). The host emits the SERVER request
# span; this becomes a child dependency under it.
@bp_orders.timer_trigger(arg_name="mytimer", schedule="0 */5 * * * *")
@logger.trace_function(**log_execution_config.to_dict())
async def reconcile_orders(mytimer: func.TimerRequest) -> None:
    await _load_orders()
    await _persist_results()
# Helpers called from the handler — also INTERNAL. They appear as nested
# dependencies underneath reconcile_orders, which itself sits under the
# host's request span.
@logger.trace_function()
async def _load_orders() -> list[Order]: ...
@logger.trace_function()
async def _persist_results() -> None: ...
The resulting tree in App Insights' Performance blade:
[REQUEST] reconcile_orders                 ← emitted by the Functions host
└─ [DEPENDENCY] reconcile_orders           ← @logger.trace_function (INTERNAL)
   ├─ [DEPENDENCY] _load_orders            ← INTERNAL
   │  └─ [DEPENDENCY] CosmosDB query       ← azpaddypy span
   └─ [DEPENDENCY] _persist_results        ← INTERNAL
      └─ [DEPENDENCY] HTTP PUT ...         ← requests instrumentor
The builder also auto-disables the FastAPI / Django / Flask instrumentors
inside Functions (it detects the function context via reflection_kind), so
even if you embed FastAPI behind azure.functions.AsgiMiddleware you won't
get duplicate request rows. See Azure Function Apps
for the full host configuration (host.json, app settings).
Correlation IDs
Correlation IDs are stored in a contextvars.ContextVar, so every logger in
the same task tree sees the same ID, and async tasks do not contaminate each
other. You can set one manually at the edge of a request:
from azpaddypy.mgmt.logging import AzureLogger
token = AzureLogger.set_correlation_id("request-abc-123")
try:
    await handle_request(...)
finally:
    AzureLogger.reset_correlation_id(token)
If you don't set one, the first @trace_function-decorated call will generate
a UUID4 for the duration of that span.
Resource clients expose set_correlation_id / reset_correlation_id /
correlation_id_scope / get_correlation_id on the instance for convenience.
set_correlation_id returns a contextvars.Token — keep it and pass it
to reset_correlation_id so the value is scoped to one request. Long-lived
workers (Functions, ASGI) that never reset will leak a single correlation ID
across every subsequent request:
# Manual scoping
token = storage.set_correlation_id("request-abc-123")
try:
    storage.upload_blob(...)
finally:
    storage.reset_correlation_id(token)

# Or use the context manager (recommended)
with storage.correlation_id_scope("request-abc-123"):
    storage.upload_blob(...)
All three delegate to the same process-wide contextvar that AzureLogger and
@trace_function consult.
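So, for example, a logger used inside a storage-created scope picks up the same ID:

```python
with storage.correlation_id_scope("request-abc-123"):
    # This record carries correlation_id=request-abc-123, because the
    # client and the logger read the same process-wide contextvar.
    logger.info("upload starting")
```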
Flushing at shutdown
logger.flush() flushes console handlers and force-flushes all three
configured OTel providers (tracer, logger, meter). Call it at the end of an
Azure Functions invocation or before container exit so batched traces, logs,
and metrics are pushed before the worker terminates:
try:
    await main(...)
finally:
    logger.flush()
AI Foundry Tracing
bootstrap_azure_monitor() installs two instrumentors so that traces from both direct OpenAI SDK calls and AI Foundry agent invocations flow into Application Insights and the AI Foundry Tracing UI:
- opentelemetry-instrumentation-openai-v2 -- instruments openai.chat.completions.create(), embeddings.create(), etc. Emits OTel GenAI spans with model, token usage, latency, and optional prompt/completion content.
- azure.ai.projects.telemetry.AIProjectInstrumentor -- instruments the OpenAI Responses API so that agent_reference calls attach agent metadata, tool-call spans, and the gen_ai.* attributes the AI Foundry Tracing UI groups traces on. Requires the AZURE_EXPERIMENTAL_ENABLE_GENAI_TRACING=true feature gate, which the bootstrap sets for you when install_genai_instrumentors=True (the default).
Both instrumentors are harmless for non-Foundry apps: if your code never calls responses.create() or chat.completions.create(), neither instrumentor has any runtime effect.
Configuration kwargs
Pass these to bootstrap_azure_monitor() or, equivalently, to AzureManagementBuilder.with_logger():
| Kwarg | Default | Effect |
|---|---|---|
| capture_gen_ai_content | False | When True, sets both OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT (honored by opentelemetry-instrumentation-openai-v2 and AIProjectInstrumentor) and AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED (honored by azure-ai-inference's AIInferenceInstrumentor) before the instrumentors activate. Off by default so prompts/completions don't ship to App Insights unexpectedly -- opt in per deployment. |
| install_genai_instrumentors | True | Installs opentelemetry-instrumentation-openai-v2 and AIProjectInstrumentor, and sets AZURE_EXPERIMENTAL_ENABLE_GENAI_TRACING=true. Required for the AI Foundry Tracing UI to render agent metadata, tool-call spans, and gen_ai.agent.* attributes on Responses API traces. |
| enable_gen_ai_trace_propagation | True | Sets AZURE_TRACING_GEN_AI_ENABLE_TRACE_CONTEXT_PROPAGATION=true so outbound OpenAI SDK HTTP calls carry W3C traceparent/tracestate headers. Server-side spans in Foundry correlate with your client spans. |
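A sketch combining all three kwargs (the last two restate their defaults for visibility):

```python
from azpaddypy.mgmt.logging import bootstrap_azure_monitor

bootstrap_azure_monitor(
    service_name="my_service",
    capture_gen_ai_content=True,           # opt in: ship prompts/completions
    install_genai_instrumentors=True,      # default: GenAI spans + Foundry UI metadata
    enable_gen_ai_trace_propagation=True,  # default: W3C headers on outbound OpenAI calls
)
```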
Log export attaches to the root logger so third-party library logs (e.g. azure.core.pipeline, urllib3) also flow to Application Insights and contribute to Application Map dependency edges. bootstrap_azure_monitor raises the root logger to INFO (only when it would otherwise be coarser) so those records aren't filtered before reaching the handler, and silences two known-noisy loggers (azure.core.pipeline.policies.http_logging_policy, urllib3.connectionpool) at WARNING. Apply additional level filters with logging.getLogger("azure").setLevel(...) if a particular library is still too chatty for your taste.
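For example, to quiet the entire azure namespace beyond the built-in defaults:

```python
import logging

# Raise the threshold for every azure.* logger in one call
logging.getLogger("azure").setLevel(logging.WARNING)
```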
Azure Function Apps
Function Apps run a two-process model: a host (the Functions runtime) and a
worker (your Python code). When host.json sets "telemetryMode": "OpenTelemetry",
the host emits the SERVER request span for every trigger invocation (HTTP, Timer,
Queue, Service Bus, ...). Your worker spans become children of that request via
W3C traceparent propagation, and the end-to-end transaction tree renders in
the Performance blade.
Required configuration:
- host.json: { "version": "2.0", "telemetryMode": "OpenTelemetry" }
- App settings: APPLICATIONINSIGHTS_CONNECTION_STRING=<your conn string> and PYTHON_APPLICATIONINSIGHTS_ENABLE_TELEMETRY=true. The second setting tells the host to let the Python worker stream OTel logs directly, avoiding duplicate host-side log records.
- In the worker, just call bootstrap_azure_monitor() (or use the builder). AzureManagementBuilder.with_logger() detects the function context via reflection_kind and automatically disables the FastAPI / Django / Flask auto-instrumentors so the worker doesn't emit competing SERVER spans for the same invocation. Use @trace_function() (default INTERNAL) on handlers and nested calls -- they'll appear as dependencies under the host's request.
Outside Function Apps (App Service, Container Apps, VM), the FastAPI / Django / Flask auto-instrumentors stay enabled because there is no host emitter and the framework instrumentor is the only source of the SERVER request span.
Per-function vs. process-wide: the trace_function(record_result=True) flag controls only whether the decorated function's return value is attached as a span attribute. GenAI prompt/completion content capture is a separate, process-level concern controlled by bootstrap_azure_monitor(capture_gen_ai_content=True) -- the two are deliberately decoupled because GenAI instrumentors read the env var at install time, not per-call.
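In practice the split looks like this (a sketch using the APIs above):

```python
from azpaddypy.mgmt.logging import bootstrap_azure_monitor, trace_function

# Process-wide, once at startup: instrumentors read the env var at install time
bootstrap_azure_monitor(service_name="my_service", capture_gen_ai_content=True)

# Per-function: controls only whether this return value becomes a span attribute
@trace_function(record_result=True)
def classify(text: str) -> str:
    return "invoice" if "total due" in text.lower() else "other"
```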
Example: tracing a chat completion
from mgmt_config import logger, ai_projects, log_execution_config
@logger.trace_function()
async def generate_summary(document_text: str) -> str:
    ai_project = ai_projects.get("aiservices")
    openai_client = ai_project.get_openai_client()
    response = openai_client.chat.completions.create(
        model="gpt-5",
        messages=[
            {"role": "system", "content": "Summarize the document."},
            {"role": "user", "content": document_text},
        ],
    )
    return response.choices[0].message.content
The trace in AI Foundry shows a parent span for generate_summary with a child chat gpt-5 span containing model, token counts, latency, and (when telemetry was bootstrapped with capture_gen_ai_content=True) the full prompt/completion content.
Example: tracing a Foundry agent invocation
# Agent trace flows through AIProjectInstrumentor -> AI Foundry Tracing UI
result = ai_projects["aiservices"].invoke_agent(
agent_name="doc-summarizer",
user_message="Summarize the attached document",
)
The trace shows AzureAIProject.invoke_agent with gen_ai.system=az.ai.projects, gen_ai.operation.name=invoke_agent, gen_ai.agent.name=doc-summarizer, and (via AIProjectInstrumentor) nested spans for the Responses API call, tool calls, and model invocation — all grouped under the agent in the Foundry Tracing UI.
Linking Application Insights to your Foundry project
The AI Foundry Tracing tab in the portal reads directly from the Application Insights resource linked to your Foundry project (Project → Tracing → "Manage data source"). Setting APPLICATIONINSIGHTS_CONNECTION_STRING is not enough on its own — the resource must be linked once via the portal for the Tracing UI to find the traces.
If your app wants to fetch the linked connection string at runtime instead of hand-wiring it:
from mgmt_config import ai_projects
ai = ai_projects["aiservices"]
conn_str = ai.get_application_insights_connection_string() # returns None if not linked
This wraps azure-ai-projects' client.telemetry.get_application_insights_connection_string() and is the recommended bootstrap path when you want the logger to always target whatever App Insights is currently linked to your Foundry resource.
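A sketch of that bootstrap path, assuming the portal link has already been configured:

```python
from azpaddypy.mgmt.logging import bootstrap_azure_monitor

conn_str = ai.get_application_insights_connection_string()
if conn_str:  # None when no App Insights resource is linked
    bootstrap_azure_monitor(service_name="my_service", connection_string=conn_str)
```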
Feature Flags
Enable only the storage services you need:
| Flag | Default | Service |
|---|---|---|
| enable_blob_storage | True | BlobServiceClient |
| enable_file_storage | False | ShareServiceClient (requires token_intent="backup" RBAC) |
| enable_queue_storage | True | QueueServiceClient |
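For instance, a queue-only client that skips blob and file share plumbing (flags as documented above):

```python
storage = AzureStorage(
    account_url="https://myaccount.blob.core.windows.net/",
    azure_identity=identity,
    enable_blob_storage=False,   # skip BlobServiceClient creation
    enable_queue_storage=True,   # default
    enable_file_storage=False,   # default
)
```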
Dependencies
- azure-storage-blob -- Blob operations
- azure-storage-file-share -- File share operations
- azure-storage-queue -- Queue operations
- azure-identity -- Credential management
- azure-keyvault-secrets / keys / certificates -- Key Vault
- azure-cosmos -- Cosmos DB
- azure-ai-projects -- AI Foundry Projects (agents, deployments, connections, AIProjectInstrumentor for Responses API tracing)
- azure-ai-documentintelligence -- Document Intelligence (analyze, model management)
- azure-cognitiveservices-speech -- Speech (synthesis, recognition with Entra ID)
- azure-monitor-opentelemetry -- Telemetry
- opentelemetry-instrumentation-openai-v2 -- AI Foundry tracing for OpenAI SDK calls