azpaddypy

Azure cloud services SDK with Storage (blob, append blob, file share, queue), Key Vault, Cosmos DB, AI Foundry Projects (agents, deployments, evaluations), Document Intelligence, Speech, OpenTelemetry tracing, AI Foundry GenAI tracing, and builder patterns.

Designed for Dockerized Azure Function Apps and Web Apps.

Installation

Requires Python 3.11–3.13.

uv add azpaddypy

Quick Start

Minimal: bootstrap telemetry, get a logger, create a resource client.

from azpaddypy import AzureIdentity, AzureStorage
from azpaddypy.mgmt.logging import AzureLogger, bootstrap_azure_monitor

# 1. Wire Azure Monitor + GenAI instrumentors once (idempotent)
bootstrap_azure_monitor(service_name="my_service", service_version="1.0.0")

# 2. Get a logger anywhere -- auto-enriches records with correlation_id (when
#    active) and any flattened OTel baggage entries. Trace/span IDs and
#    cloud_RoleName are populated by the exporter on the standard App Insights
#    columns, so we don't duplicate them into customDimensions.
logger = AzureLogger(__name__)
logger.info("service starting")

# 3. Create resource clients (factory caches instances; identity must be injected)
storage = AzureStorage(
    account_url="https://myaccount.blob.core.windows.net/",
    azure_identity=AzureIdentity(service_name="my_service"),
    enable_file_storage=True,
)

Prefer the builder for multi-resource apps -- see Builder Pattern. For the full telemetry surface (trace_function, correlation IDs, GenAI tracing), see Logging & Tracing.

Storage Operations

Blob Storage

from datetime import timedelta  # used for sas_expiry_delta below

# Upload
storage.upload_blob(
    container_name="documents",
    blob_name="report.pdf",
    data=pdf_bytes,
    content_type="application/pdf",
    metadata={"author": "team"},
)

# Download (returns None if not found)
data = storage.download_blob(container_name="documents", blob_name="report.pdf")

# Upload and get SAS URL
sas_url = storage.upload_blob_with_sas(
    container_name="documents",
    blob_name="report.pdf",
    data=pdf_bytes,
    sas_permission="r",
    sas_expiry_delta=timedelta(hours=3),
)

# List, exists, delete
blobs = storage.list_blobs(container_name="documents", name_starts_with="reports/")
exists = storage.blob_exists(container_name="documents", blob_name="report.pdf")
storage.delete_blob(container_name="documents", blob_name="report.pdf")

# Metadata upsert (merges with existing)
storage.upsert_blob_metadata(
    container_name="documents",
    blob_name="report.pdf",
    metadata={"status": "processed"},
)

# SAS token generation
blob_sas = storage.get_blob_sas(container_name="docs", blob_name="file.pdf")
container_sas = storage.get_container_sas(container_name="docs", permission="r")

Append Blob Storage

Append blobs are optimized for append operations such as logging, auditing, or streaming data. Each append block can be up to 4 MiB. Unlike block blobs, append blobs do not support overwriting existing content.

# Create an empty append blob
storage.create_append_blob(
    container_name="logs",
    blob_name="app-2026-04-05.log",
    content_type="text/plain; charset=utf-8",
    metadata={"source": "web-app"},
)

# Append data blocks
storage.append_block(
    container_name="logs",
    blob_name="app-2026-04-05.log",
    data="2026-04-05T10:00:00Z INFO Application started\n",
)

storage.append_block(
    container_name="logs",
    blob_name="app-2026-04-05.log",
    data=b"2026-04-05T10:00:01Z DEBUG Connection pool initialized\n",
)

# Convenience: create-if-missing + append in one call
storage.append_blob_from_text(
    container_name="logs",
    blob_name="app-2026-04-05.log",
    text="2026-04-05T10:05:00Z WARN High memory usage\n",
    create_if_not_exists=True,  # default, skips creation if blob already exists
)
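
Because each append block is limited to 4 MiB, a payload larger than that has to be split before it is appended. The following is a minimal standard-library sketch of that chunking. The names iter_append_blocks and MAX_APPEND_BLOCK_BYTES are illustrative and not part of azpaddypy, and it is an assumption that append_block does not already split oversized data for you.

```python
from collections.abc import Iterator

MAX_APPEND_BLOCK_BYTES = 4 * 1024 * 1024  # 4 MiB per-block limit described above


def iter_append_blocks(
    payload: bytes, block_size: int = MAX_APPEND_BLOCK_BYTES
) -> Iterator[bytes]:
    """Yield consecutive slices of payload, each at most block_size bytes."""
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    for offset in range(0, len(payload), block_size):
        yield payload[offset:offset + block_size]


# Hypothetical usage with the client shown above:
# for block in iter_append_blocks(large_log_bytes):
#     storage.append_block(
#         container_name="logs", blob_name="app-2026-04-05.log", data=block
#     )
```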

File Share Storage

Requires enable_file_storage=True. Uses Azure File Shares (SMB/NFS), not blob storage.

storage = AzureStorage(
    account_url="https://myaccount.blob.core.windows.net/",
    azure_identity=identity,
    enable_file_storage=True,
)

# Upload (auto-creates parent directories)
storage.upload_share_file(
    share_name="myshare",
    file_path="reports/2026/q1.pdf",
    data=pdf_bytes,
    content_type="application/pdf",
)

# Download (returns None if not found)
data = storage.download_share_file(share_name="myshare", file_path="reports/2026/q1.pdf")

# List files and directories
items = storage.list_share_files(share_name="myshare", directory_path="reports/2026")
# Returns: [{"name": "q1.pdf", "is_directory": False, "size": 1024}, ...]

# Exists, properties, delete
exists = storage.share_file_exists(share_name="myshare", file_path="reports/2026/q1.pdf")
props = storage.get_share_file_properties(share_name="myshare", file_path="reports/2026/q1.pdf")
storage.delete_share_file(share_name="myshare", file_path="reports/2026/q1.pdf")

# Directory management
storage.create_share_directory(share_name="myshare", directory_path="reports/2026/q2")
storage.delete_share_directory(share_name="myshare", directory_path="reports/2026/q2")

# Metadata upsert (merges with existing)
storage.upsert_share_file_metadata(
    share_name="myshare",
    file_path="reports/2026/q1.pdf",
    metadata={"reviewed": "true"},
)
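
The listing shape documented above (dicts with name, is_directory, and size) is easy to post-process. A small sketch, assuming exactly those three keys; summarize_share_listing is a hypothetical helper, not an azpaddypy function.

```python
from typing import Any


def summarize_share_listing(items: list[dict[str, Any]]) -> dict[str, int]:
    """Count files and directories and total the file sizes in a listing."""
    files = [item for item in items if not item.get("is_directory", False)]
    return {
        "files": len(files),
        "directories": len(items) - len(files),
        # Directories are excluded; a missing or None size counts as 0.
        "total_bytes": sum(int(item.get("size") or 0) for item in files),
    }
```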

Queue Storage

# Send
storage.send_message(
    queue_name="tasks",
    content='{"task": "process"}',
    visibility_timeout=30,
    time_to_live=3600,
)

# Receive
messages = storage.receive_messages(queue_name="tasks", messages_per_page=5)
for msg in messages:
    print(msg["id"], msg["content"])
    storage.delete_message(
        queue_name="tasks",
        message_id=msg["id"],
        pop_receipt=msg["pop_receipt"],
    )

Builder Pattern

Recommended for multi-resource setups: the builders read service name, version, connection string, and log level from the environment configuration you pass them, so you don't repeat that information in each with_* call.

from azpaddypy.builder import (
    AzureManagementBuilder,
    AzureResourceBuilder,
    ConfigurationSetupBuilder,
)

# 1. Environment configuration (reads REFLECTION_NAME, LOGGER_LOG_LEVEL,
#    APPLICATIONINSIGHTS_CONNECTION_STRING, IDENTITY_* etc. from env/.env)
env_config = (
    ConfigurationSetupBuilder()
    .with_local_env_management()
    .with_environment_detection()
    .with_service_configuration()
    .with_logging_configuration()
    .with_identity_configuration()
    .build()
)

# 2. Management services: bootstraps Azure Monitor + creates the service logger
mgmt = (
    AzureManagementBuilder(env_config)
    .with_logger()                      # bootstraps Azure Monitor, returns AzureLogger
    .with_identity()
    .with_keyvault(vault_url="https://myvault.vault.azure.net/", name="main")
    .build()
)

logger = mgmt.logger                    # AzureLogger, already wired to App Insights
identity = mgmt.identity
keyvaults = mgmt.keyvaults

# 3. Resource clients -- share the logger and identity via the mgmt config
resources = (
    AzureResourceBuilder(mgmt, env_config)
    .with_storage(name="default", account_url="https://mystg.blob.core.windows.net/")
    .with_storage(name="archive", account_url="https://archive.blob.core.windows.net/")
    .with_ai_project(name="aiservices", endpoint="https://my-ai.services.ai.azure.com/")
    .with_document_intelligence(name="docs", endpoint="https://my-ai.cognitiveservices.azure.com/")
    .with_speech(
        name="speech",
        region="westeurope",
        resource_id="/subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.CognitiveServices/accounts/<ai>",
    )
    .build()
)

storage = resources.storage_accounts["default"]
ai_project = resources.ai_project_clients["aiservices"]
doc_intel = resources.document_intelligence_clients["docs"]
speech = resources.speech_clients["speech"]

See examples/mgmt_config_template.py for a full production-shaped example including Key Vault-sourced resource names and the ConfigurationManager + LogExecution tool wiring.

The template's module body is side-effect-free: the bootstrap (Azure Monitor, identity, Key Vault calls, resource clients) runs lazily on the first attribute access and is cached process-wide via a sentinel in sys.modules. So import mgmt_config is cheap, and even if the file is loaded under multiple module names in one process, the bootstrap runs exactly once. Either access style works:

# Lazy module-level (existing style — unchanged):
from mgmt_config import logger, configuration_manager, storage_accounts

# Explicit accessor (returns the same singleton):
from mgmt_config import get_config
cfg = get_config()
cfg.logger.info("ready")
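
The lazy, run-once behaviour described above can be approximated with a module-level __getattr__ hook (PEP 562) plus a sentinel entry in sys.modules. This is a sketch of the general technique only, not the template's actual code: the sentinel key, the placeholder config object, and make_lazy_module are assumptions for illustration, and the sketch is not thread-safe.

```python
import sys
import types

SENTINEL_KEY = "_example_mgmt_config_state"  # hypothetical sys.modules key
stats = {"bootstrap_calls": 0}


def bootstrap() -> types.SimpleNamespace:
    """Stand-in for the expensive setup (telemetry, identity, clients)."""
    stats["bootstrap_calls"] += 1
    return types.SimpleNamespace(logger=object(), storage_accounts={})


def get_config() -> types.SimpleNamespace:
    """Return the process-wide config, running the bootstrap at most once."""
    holder = sys.modules.get(SENTINEL_KEY)
    if holder is None:
        holder = types.ModuleType(SENTINEL_KEY)
        holder.config = bootstrap()
        sys.modules[SENTINEL_KEY] = holder
    return holder.config


def make_lazy_module(name: str) -> types.ModuleType:
    """Build a module whose unknown attributes resolve from the lazy config."""
    module = types.ModuleType(name)

    def lazy_getattr(attr: str):
        try:
            return getattr(get_config(), attr)
        except AttributeError:
            raise AttributeError(f"module {name!r} has no attribute {attr!r}") from None

    module.__getattr__ = lazy_getattr  # PEP 562 hook, looked up in the module dict
    return module
```

Two modules built this way under different names share one config object, which mirrors the claim that loading the file under multiple module names still bootstraps once.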

Note: Document Intelligence and Speech are configured exclusively through mgmt_config (typically from Key Vault secrets). They have no environment-variable fallbacks — pass endpoint (and for Speech, region + resource_id) explicitly.

Key Vault

from azpaddypy import AzureKeyVault, create_azure_keyvault, create_azure_identity

identity = create_azure_identity(service_name="my_service")
kv = create_azure_keyvault(
    vault_url="https://myvault.vault.azure.net/",
    azure_identity=identity,
    service_name="my_service",
)

secret = kv.get_secret("database-connection-string")

AI Foundry Projects

Manage Azure AI Foundry agents, deployments, and connections with integrated OpenAI client support.

from azpaddypy import AzureAIProject, create_azure_ai_project, create_azure_identity

# Factory function (cached instances; identity must be injected explicitly)
identity = create_azure_identity(service_name="my_service")
ai = create_azure_ai_project(
    endpoint="https://my-ai.services.ai.azure.com/api/projects/my-project",
    azure_identity=identity,
    service_name="my_service",
)

# List deployments
deployments = ai.list_deployments()

# Get an authenticated OpenAI client
openai_client = ai.get_openai_client()

# Agent operations
from azure.ai.projects.models import PromptAgentDefinition

agent = ai.create_agent(
    agent_name="my-agent",
    definition=PromptAgentDefinition(model="gpt-4o", instructions="You are helpful"),
)

agents = ai.list_agents()
details = ai.get_agent(agent_name="my-agent")

# Invoke an agent via OpenAI responses API
result = ai.invoke_agent(agent_name="my-agent", user_message="Hello")
print(result["response"])

# Connections
connections = ai.list_connections()
connection = ai.get_connection(name="my-openai-connection", include_credentials=True)

# Fetch the Application Insights connection string linked to this Foundry project
# (requires the linkage to have been configured once via portal -> Project -> Tracing).
# Returns None if no App Insights resource is linked.
conn_str = ai.get_application_insights_connection_string()

Feature Flags

ai = AzureAIProject(
    endpoint="https://my-ai.services.ai.azure.com/api/projects/my-project",
    azure_identity=identity,
    enable_agents=True,       # Agent CRUD + invocation
    enable_deployments=True,  # List/get model deployments
    enable_connections=False,  # Disable connection enumeration
)

Evaluation

Run AI quality and safety evaluations against your model outputs. Results appear in the AI Foundry portal under Evaluation with average scores and per-row details.

from mgmt_config import ai_projects

ai = ai_projects["aiservices"]

# One-shot: create eval + run + poll + return results
result = ai.evaluate(
    name="my-eval",
    evaluator_names=[
        # Quality (need a judge model)
        "builtin.coherence",
        "builtin.groundedness",
        "builtin.relevance",
        # Safety (no judge model needed)
        "builtin.violence",
        "builtin.hate_unfairness",
    ],
    data=[
        {
            "query": "What is Azure?",
            "response": "Azure is Microsoft's cloud platform.",
            "context": "Azure documentation overview.",
        },
        {
            "query": "How do I deploy?",
            "response": "Use az webapp deploy.",
            "context": "CLI deployment docs.",
        },
    ],
    judge_model="gpt-4o",  # required for quality evaluators
    cleanup=True,           # delete eval definition after getting results
)

print(result["status"])        # "completed" or "failed"
print(result["report_url"])    # portal link to the evaluation report
print(result["result_counts"]) # aggregate pass/fail counts
for item in result["output_items"]:
    print(item)                # per-row evaluator scores

Available built-in evaluators:

| Category | Evaluators | Judge model required |
| --- | --- | --- |
| Quality | builtin.coherence, builtin.fluency, builtin.groundedness, builtin.relevance, builtin.similarity, builtin.task_adherence | Yes |
| NLP | builtin.f1_score, builtin.bleu_score, builtin.rouge_score, builtin.meteor_score, builtin.gleu_score | No |
| Safety | builtin.violence, builtin.sexual, builtin.self_harm, builtin.hate_unfairness, builtin.prohibited_actions, builtin.sensitive_data_leakage | No |

For finer control, use the individual methods: create_evaluation(), run_evaluation(), get_evaluation_run(), get_evaluation_run_output_items(), list_evaluations(), delete_evaluation().
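
Since only the quality evaluators need a judge model, it can help to check a request before submitting it. A minimal sketch based on the table above; missing_judge_model is a hypothetical pre-flight helper, and how evaluate() itself reacts to a missing judge_model is not covered here.

```python
from collections.abc import Iterable
from typing import Optional

# Quality evaluators from the table above; these need a judge model.
QUALITY_EVALUATORS = frozenset({
    "builtin.coherence",
    "builtin.fluency",
    "builtin.groundedness",
    "builtin.relevance",
    "builtin.similarity",
    "builtin.task_adherence",
})


def missing_judge_model(
    evaluator_names: Iterable[str], judge_model: Optional[str] = None
) -> list[str]:
    """Return the requested evaluators that would run without a judge model."""
    if judge_model:
        return []
    return sorted(name for name in evaluator_names if name in QUALITY_EVALUATORS)
```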

Document Intelligence

Analyze documents using Azure AI Document Intelligence (formerly Form Recognizer). Shares the same Cognitive Services / AI Services account as AI Foundry.

from azpaddypy import AzureDocumentIntelligence, create_azure_document_intelligence, create_azure_identity

identity = create_azure_identity(service_name="my_service")
di = create_azure_document_intelligence(
    endpoint="https://my-ai.cognitiveservices.azure.com/",
    azure_identity=identity,
    service_name="my_service",
    enable_administration=True,  # opt in to model management
)

# Analyze from URL with a prebuilt model
result = di.analyze_document_from_url(
    model_id="prebuilt-layout",
    url_source="https://example.com/invoice.pdf",
)
print(f"Pages: {len(result.pages)}")

# Analyze from bytes
with open("contract.pdf", "rb") as f:
    result = di.analyze_document_from_bytes(model_id="prebuilt-read", document=f.read())

# Manage custom models
models = di.list_models()
model = di.get_model(model_id="my-custom-model")
di.delete_model(model_id="my-custom-model")

Speech

Azure Cognitive Services Speech with Entra ID authentication. Unlike most Azure SDKs, the Speech SDK does not accept TokenCredential directly — it requires the special aad#<resource-id>#<token> auth string. azpaddypy handles token acquisition, format, and refresh.

You must provide both the Azure region and the full ARM resource ID of the Speech / AI Services account.

from azpaddypy import AzureSpeech, create_azure_speech, create_azure_identity

identity = create_azure_identity(service_name="my_service")
speech = create_azure_speech(
    region="westeurope",
    resource_id=(
        "/subscriptions/<sub>/resourceGroups/<rg>"
        "/providers/Microsoft.CognitiveServices/accounts/<ai-services>"
    ),
    azure_identity=identity,
    service_name="my_service",
    default_speech_synthesis_voice_name="en-US-JennyNeural",
)

# Synthesize text to in-memory bytes (server / container scenarios)
audio: bytes = speech.synthesize_text_to_bytes("Hello from azpaddypy")

# Synthesize and write directly to a file
speech.synthesize_text_to_file("Hello from azpaddypy", file_path="out.wav")

# Synthesize and play on the default speaker (interactive / local dev)
speech.synthesize_text_to_speaker("Hello from azpaddypy")
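
For reference, the aad#<resource-id>#<token> string mentioned above is plain concatenation. azpaddypy builds it for you; the sketch below only illustrates the format. build_speech_auth_token is a hypothetical name, and acquiring the Entra ID access token is out of scope here.

```python
def build_speech_auth_token(resource_id: str, access_token: str) -> str:
    """Compose the Speech SDK authorization string from its two parts."""
    if not resource_id.startswith("/subscriptions/"):
        raise ValueError("resource_id must be a full ARM resource ID")
    if not access_token:
        raise ValueError("access_token must not be empty")
    return f"aad#{resource_id}#{access_token}"
```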

Custom synthesizers and recognizers

For full control (streaming, recognition, custom audio configs, event callbacks), get a fresh SpeechConfig and build your own:

import azure.cognitiveservices.speech as speechsdk

speech_config = speech.get_speech_config()
synthesizer = speechsdk.SpeechSynthesizer(
    speech_config=speech_config,
    audio_config=speechsdk.audio.AudioOutputConfig(filename="out.wav"),
)
synthesizer.speak_text_async("Hello from azpaddypy").get()

# Refresh AAD token on long-lived synthesizers/recognizers
# (Speech tokens expire after ~10 minutes)
speech.refresh_authorization_token(synthesizer)

Logging & Tracing

azpaddypy splits telemetry into two entry points with one concern each.

1. Bootstrap once, at process start

bootstrap_azure_monitor() configures the Azure Monitor exporter + GenAI instrumentors for the whole process. It's idempotent — call it as many times as you like. Without a connection string it logs a warning and disables telemetry cleanly.

from azpaddypy.mgmt.logging import bootstrap_azure_monitor

bootstrap_azure_monitor(
    service_name="crawler-api",       # cloud role name in App Insights
    service_version="0.5.1",
    # connection_string falls back to APPLICATIONINSIGHTS_CONNECTION_STRING
)

If you use AzureManagementBuilder.with_logger(), it calls bootstrap_azure_monitor for you with the values from EnvironmentConfiguration.
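
Idempotency of this kind is typically implemented with a guarded flag. The following is a generic sketch of that pattern under a lock, not the library's implementation; bootstrap_once and the state dict are illustrative names.

```python
import threading
from typing import Callable

_lock = threading.Lock()
state = {"configured": False}


def bootstrap_once(configure: Callable[[], None]) -> bool:
    """Run configure() on the first call only; return True if it ran."""
    with _lock:
        if state["configured"]:
            return False
        configure()
        # Mark as done only after configure() succeeded, so a failed
        # attempt can be retried by a later call.
        state["configured"] = True
        return True
```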

Sampling

Since azure-monitor-opentelemetry v1.8.6, the distro's default sampler is microsoft.rate_limited capped at 5 traces/second — silent on small apps, catastrophic on busy ones (most spans, including all of their descendants, are dropped). bootstrap_azure_monitor overrides this by setting OTEL_TRACES_SAMPLER=microsoft.fixed_percentage with OTEL_TRACES_SAMPLER_ARG=1.0 via os.environ.setdefault, so explicit env-var configuration still wins.

bootstrap_azure_monitor(
    service_name="crawler-api",
    sampling_ratio=0.25,                          # ~25% of traces
    enable_trace_based_sampling_for_logs=True,    # drop logs whose trace was sampled out
)

sampling_ratio is honoured only when OTEL_TRACES_SAMPLER isn't already set in the environment, so deployment-time overrides via App Settings continue to work.
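
The precedence rule can be pictured as follows. This is a sketch of the described behaviour, assuming the sampler argument is written as the plain string form of the ratio; apply_sampler_defaults is a hypothetical function, not the library's code. Passing a dict instead of os.environ keeps the example free of side effects.

```python
from collections.abc import MutableMapping


def apply_sampler_defaults(
    environ: MutableMapping[str, str], sampling_ratio: float = 1.0
) -> bool:
    """Set the fixed-percentage sampler unless one is already configured.

    Returns True when the defaults were applied, False when an existing
    OTEL_TRACES_SAMPLER value took precedence.
    """
    if not 0.0 <= sampling_ratio <= 1.0:
        raise ValueError("sampling_ratio must be between 0.0 and 1.0")
    if "OTEL_TRACES_SAMPLER" in environ:
        return False  # deployment-time configuration wins
    environ["OTEL_TRACES_SAMPLER"] = "microsoft.fixed_percentage"
    environ["OTEL_TRACES_SAMPLER_ARG"] = str(sampling_ratio)
    return True
```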

Custom processors

Pass span_processors=[...] and log_record_processors=[...] to attach global enrichers (e.g. tenant ID on every span, redaction on every log). These forward to the v1.8.4+ distro pass-throughs on configure_azure_monitor.

from opentelemetry.sdk.trace import SpanProcessor

class TenantSpanProcessor(SpanProcessor):
    def on_start(self, span, parent_context=None):
        # Set attributes while the span is still writable; on_end receives a
        # read-only view. current_tenant_id() is your own helper.
        span.set_attribute("tenant.id", current_tenant_id())
    def on_end(self, span): ...
    def shutdown(self): ...
    def force_flush(self, timeout_millis=30000): return True

bootstrap_azure_monitor(
    service_name="crawler-api",
    span_processors=[TenantSpanProcessor()],
)

2. Create loggers anywhere

AzureLogger(name) is a thin logging.Logger-shaped wrapper that auto-enriches every record with correlation_id (when active) and any OTel baggage entries (flattened to baggage.<key>). It deliberately does not copy service_name, timestamp, trace_id, or span_id into customDimensions — the Azure Monitor exporter already populates those columns (cloud_RoleName, timestamp, operation_Id, operation_ParentId), so duplicating them would just inflate ingestion. Pass __name__ so logs are properly namespaced per module.

from azpaddypy.mgmt.logging import AzureLogger

logger = AzureLogger(__name__)

# stdlib-compatible API: msg + %-style args + extra
logger.info("field filled: selector=%s", selector)
logger.warning("retrying: attempt=%d", attempt, extra={"backoff_ms": 250})
logger.error("field failed: selector=%s", selector, exc_info=True)

# Inside an except block, logger.exception() attaches the active traceback
try:
    do_thing()
except RuntimeError:
    logger.exception("operation failed")

Since it matches logging.Logger, AzureLogger drops into any third-party code (including logging.LoggerAdapter) that expects a stdlib logger.
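
The enrichment described in this section can be approximated with a stdlib logging.Filter. This is a sketch of the idea, not AzureLogger's implementation: EnrichmentFilter, correlation_id_var, and the baggage_reader callable (a stand-in for reading OTel baggage) are all assumptions.

```python
import contextvars
import logging
from typing import Callable, Mapping

correlation_id_var: contextvars.ContextVar = contextvars.ContextVar(
    "correlation_id", default=None
)


class EnrichmentFilter(logging.Filter):
    """Attach the active correlation ID and flattened baggage to each record."""

    def __init__(self, baggage_reader: Callable[[], Mapping[str, str]] = dict):
        super().__init__()
        self._baggage_reader = baggage_reader

    def filter(self, record: logging.LogRecord) -> bool:
        correlation_id = correlation_id_var.get()
        if correlation_id is not None:
            record.correlation_id = correlation_id
        # Flatten baggage entries to baggage.<key> attributes on the record.
        for key, value in self._baggage_reader().items():
            setattr(record, f"baggage.{key}", value)
        return True  # never drop the record, only enrich it
```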

3. Decorate functions for distributed tracing

trace_function creates an OpenTelemetry span around the call. It manages a correlation ID scoped to the span (UUID4 if none is active, reset on exit) so long-running workers don't leak IDs across requests.

from azpaddypy.mgmt.logging import AzureLogger, trace_function

@trace_function()                         # default: span named {module}.{qualname}
async def crawl(url: str) -> dict:
    return await do_crawl(url)

# Custom span name + record the return value as a span attribute
@trace_function(name="summarize", record_result=True)
def summarize(text: str) -> str:
    return text[:100]

# Also available as a method on AzureLogger for ergonomics
logger = AzureLogger(__name__)

@logger.trace_function(record_args=True)
def process(user_id: int, payload: dict) -> None: ...

record_args / record_result add the arguments / return value as span attributes (stringified, truncated to 1000 chars). Default is off so you don't accidentally ship sensitive values. When applied to bound methods or classmethods, self and cls are skipped automatically — only the call's own parameters are recorded.

Each decorated function emits its span under its own module's instrumentation scope (trace.get_tracer(func.__module__)), so spans are attributed to your code rather than to azpaddypy.mgmt.logging. The span's start/end timestamps already drive App Insights' duration column, so no separate duration_ms attribute is emitted.
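
To make the record_args behaviour concrete, here is a sketch of how call arguments could be turned into span attributes: bound to parameter names, stringified, truncated to 1000 characters, with self and cls skipped by name. The helper describe_call_args and the arg.<name> key format are assumptions for illustration; the library's actual attribute names may differ.

```python
import inspect
from typing import Any, Callable

MAX_ATTRIBUTE_CHARS = 1000  # truncation limit stated above


def describe_call_args(
    func: Callable[..., Any], args: tuple, kwargs: dict[str, Any]
) -> dict[str, str]:
    """Map a call's arguments to stringified, truncated attribute values."""
    bound = inspect.signature(func).bind_partial(*args, **kwargs)
    return {
        f"arg.{name}": str(value)[:MAX_ATTRIBUTE_CHARS]
        for name, value in bound.arguments.items()
        if name not in ("self", "cls")  # only the call's own parameters
    }
```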

Span kind

kind defaults to SpanKind.INTERNAL, which lands in App Insights' dependencies table. App Insights' Performance / end-to-end transaction tree only renders when there's a SpanKind.SERVER span at the root (the requests table). Use it on entry-point handlers that aren't auto-instrumented by a framework:

from opentelemetry.trace import SpanKind

@trace_function(kind=SpanKind.SERVER)
async def consume_message(msg):  # custom Service Bus consumer outside FastAPI
    ...

Inside an Azure Function App

Don't pass kind=SpanKind.SERVER on Functions handlers. The Functions host already emits the SERVER request span for every trigger invocation (HTTP, Timer, Queue, Service Bus, ...) when host.json sets "telemetryMode": "OpenTelemetry". A worker-side SERVER span would produce a duplicate row in the requests table with a different operation_Id, breaking the end-to-end transaction tree.

The right pattern: leave kind at its default (INTERNAL) on every worker-side decoration — the trigger handler, any helpers it calls, any Blueprint-registered function. The host-emitted request anchors the tree and your @trace_function spans render as children of it under dependencies.

import azure.functions as func
from mgmt_config import logger, log_execution_config

bp_orders: func.Blueprint = func.Blueprint()

# Trigger handler — INTERNAL (default). The host emits the SERVER request
# span; this becomes a child dependency under it.
@bp_orders.timer_trigger(arg_name="mytimer", schedule="0 */5 * * * *")
@logger.trace_function(**log_execution_config.to_dict())
async def reconcile_orders(mytimer: func.TimerRequest) -> None:
    await _load_orders()
    await _persist_results()

# Helpers called from the handler — also INTERNAL. They appear as nested
# dependencies underneath reconcile_orders, which itself sits under the
# host's request span.
@logger.trace_function()
async def _load_orders() -> list[Order]: ...

@logger.trace_function()
async def _persist_results() -> None: ...

The resulting tree in App Insights' Performance blade:

[REQUEST]   reconcile_orders             ← emitted by the Functions host
  └─ [DEPENDENCY] reconcile_orders        ← @logger.trace_function (INTERNAL)
       ├─ [DEPENDENCY] _load_orders       ← INTERNAL
       │    └─ [DEPENDENCY] CosmosDB query   ← azpaddypy span
       └─ [DEPENDENCY] _persist_results   ← INTERNAL
            └─ [DEPENDENCY] HTTP PUT ...    ← requests instrumentor

The builder also auto-disables the FastAPI / Django / Flask instrumentors inside Functions (it detects the function context via reflection_kind), so even if you embed FastAPI behind azure.functions.AsgiMiddleware you won't get duplicate request rows. See Azure Function Apps for the full host configuration (host.json, app settings).

Correlation IDs

Correlation IDs are stored in a contextvars.ContextVar, so every logger in the same task tree sees the same ID, and async tasks do not contaminate each other. You can set one manually at the edge of a request:

from azpaddypy.mgmt.logging import AzureLogger

token = AzureLogger.set_correlation_id("request-abc-123")
try:
    await handle_request(...)
finally:
    AzureLogger.reset_correlation_id(token)

If you don't set one, the first @trace_function-decorated call will generate a UUID4 for the duration of that span.
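
The isolation between async tasks comes from contextvars itself. The sketch below uses only the standard library to show two concurrent tasks keeping separate IDs and the value being reset on exit. correlation_scope and correlation_id_var are illustrative names rather than azpaddypy's internals.

```python
import asyncio
import contextvars
import uuid
from contextlib import contextmanager
from typing import Iterator, Optional

correlation_id_var: contextvars.ContextVar = contextvars.ContextVar(
    "correlation_id", default=None
)


@contextmanager
def correlation_scope(value: Optional[str] = None) -> Iterator[str]:
    """Set a correlation ID (UUID4 if none is given) and reset it on exit."""
    token = correlation_id_var.set(value or str(uuid.uuid4()))
    try:
        yield correlation_id_var.get()
    finally:
        correlation_id_var.reset(token)


async def handle(request_id: str) -> str:
    with correlation_scope(request_id):
        await asyncio.sleep(0)  # let the other task run in between
        return correlation_id_var.get()


async def main() -> list:
    # gather() wraps each coroutine in a task with its own context copy.
    return await asyncio.gather(handle("req-a"), handle("req-b"))
```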

Resource clients expose set_correlation_id / reset_correlation_id / correlation_id_scope / get_correlation_id on the instance for convenience. set_correlation_id returns a contextvars.Token — keep it and pass it to reset_correlation_id so the value is scoped to one request. Long-lived workers (Functions, ASGI) that never reset will leak a single correlation ID across every subsequent request:

# Manual scoping
token = storage.set_correlation_id("request-abc-123")
try:
    storage.upload_blob(...)
finally:
    storage.reset_correlation_id(token)

# Or use the context manager (recommended)
with storage.correlation_id_scope("request-abc-123"):
    storage.upload_blob(...)

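All of these methods delegate to the same ContextVar that AzureLogger and @trace_function consult, so an ID set on a resource client is also visible to loggers and decorated functions in the same context.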

Flushing at shutdown

logger.flush() flushes console handlers and force-flushes all three configured OTel providers (tracer, logger, meter). Call it at the end of an Azure Functions invocation or before container exit so batched traces, logs, and metrics are pushed before the worker terminates:

try:
    await main(...)
finally:
    logger.flush()

AI Foundry Tracing

bootstrap_azure_monitor() installs two instrumentors so that traces from both direct OpenAI SDK calls and AI Foundry agent invocations flow into Application Insights and the AI Foundry Tracing UI:

  1. opentelemetry-instrumentation-openai-v2 — instruments openai.chat.completions.create(), embeddings.create(), etc. Emits OTel GenAI spans with model, token usage, latency, and optional prompt/completion content.
  2. azure.ai.projects.telemetry.AIProjectInstrumentor — instruments the OpenAI Responses API so that agent_reference calls attach agent metadata, tool-call spans, and the gen_ai.* attributes the AI Foundry Tracing UI groups traces on. Requires the AZURE_EXPERIMENTAL_ENABLE_GENAI_TRACING=true feature gate, which the bootstrap sets for you when install_genai_instrumentors=True (the default).

Both instrumentors are harmless for non-Foundry apps: if your code never calls responses.create() or chat.completions.create(), neither instrumentor has any runtime effect.

Configuration kwargs

Pass these to bootstrap_azure_monitor() or, equivalently, to AzureManagementBuilder.with_logger():

| Kwarg | Default | Effect |
| --- | --- | --- |
| capture_gen_ai_content | False | When True, sets both OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT (honored by opentelemetry-instrumentation-openai-v2 and AIProjectInstrumentor) and AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED (honored by azure-ai-inference's AIInferenceInstrumentor) before the instrumentors activate. Off by default so prompts/completions don't ship to App Insights unexpectedly -- opt in per deployment. |
| install_genai_instrumentors | True | Installs opentelemetry-instrumentation-openai-v2 and AIProjectInstrumentor, and sets AZURE_EXPERIMENTAL_ENABLE_GENAI_TRACING=true. Required for the AI Foundry Tracing UI to render agent metadata, tool-call spans, and gen_ai.agent.* attributes on Responses API traces. |
| enable_gen_ai_trace_propagation | True | Sets AZURE_TRACING_GEN_AI_ENABLE_TRACE_CONTEXT_PROPAGATION=true so outbound OpenAI SDK HTTP calls carry W3C traceparent/tracestate headers. Server-side spans in Foundry correlate with your client spans. |
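
Read as a whole, the three kwargs map onto a handful of environment variables. The sketch below summarizes that mapping as a pure function. The variable names come from the table above; the literal value "true" for the two content-capture variables and the helper name genai_env_settings are assumptions.

```python
def genai_env_settings(
    capture_gen_ai_content: bool = False,
    install_genai_instrumentors: bool = True,
    enable_gen_ai_trace_propagation: bool = True,
) -> dict[str, str]:
    """Return the environment variables implied by the three kwargs."""
    settings: dict[str, str] = {}
    if capture_gen_ai_content:
        # Assumed value; the table only says these variables are set.
        settings["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = "true"
        settings["AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED"] = "true"
    if install_genai_instrumentors:
        settings["AZURE_EXPERIMENTAL_ENABLE_GENAI_TRACING"] = "true"
    if enable_gen_ai_trace_propagation:
        settings["AZURE_TRACING_GEN_AI_ENABLE_TRACE_CONTEXT_PROPAGATION"] = "true"
    return settings
```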

Log export attaches to the root logger so third-party library logs (e.g. azure.core.pipeline, urllib3) also flow to Application Insights and contribute to Application Map dependency edges. bootstrap_azure_monitor raises the root logger to INFO (only when it would otherwise be coarser) so those records aren't filtered before reaching the handler, and silences two known-noisy loggers (azure.core.pipeline.policies.http_logging_policy, urllib3.connectionpool) at WARNING. Apply additional level filters with logging.getLogger("azure").setLevel(...) if a particular library is still too chatty for your taste.
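
The level handling described above amounts to a few stdlib calls. A sketch of the equivalent logic, not the library's code; tune_log_levels is a hypothetical name.

```python
import logging
from typing import Optional

NOISY_LOGGERS = (
    "azure.core.pipeline.policies.http_logging_policy",
    "urllib3.connectionpool",
)


def tune_log_levels(root: Optional[logging.Logger] = None) -> None:
    """Raise the root logger to INFO if coarser and quiet the noisy loggers."""
    root = root if root is not None else logging.getLogger()
    # Only lower the threshold; a root already at DEBUG or INFO is untouched.
    if root.level > logging.INFO:
        root.setLevel(logging.INFO)
    for name in NOISY_LOGGERS:
        logging.getLogger(name).setLevel(logging.WARNING)
```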

Azure Function Apps

Function Apps run a two-process model: a host (the Functions runtime) and a worker (your Python code). When host.json sets "telemetryMode": "OpenTelemetry", the host emits the SERVER request span for every trigger invocation (HTTP, Timer, Queue, Service Bus, ...). Your worker spans become children of that request via W3C traceparent propagation, and the end-to-end transaction tree renders in the Performance blade.

Required configuration:

  1. host.json:

    { "version": "2.0", "telemetryMode": "OpenTelemetry" }
    
  2. App settings:

    APPLICATIONINSIGHTS_CONNECTION_STRING=<your conn string>
    PYTHON_APPLICATIONINSIGHTS_ENABLE_TELEMETRY=true
    

    The second setting tells the host to let the Python worker stream OTel logs directly, avoiding duplicate host-side log records.

  3. In the worker, just call bootstrap_azure_monitor() (or use the builder). AzureManagementBuilder.with_logger() detects the function context via reflection_kind and automatically disables the FastAPI / Django / Flask auto-instrumentors so the worker doesn't emit competing SERVER spans for the same invocation. Use @trace_function() (default INTERNAL) on handlers and nested calls — they'll appear as dependencies under the host's request.
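
A deployment check for the first requirement can be a few lines of standard-library code. This sketch assumes a host.json without comments (plain JSON); host_json_uses_opentelemetry is a hypothetical helper name.

```python
import json


def host_json_uses_opentelemetry(host_json_text: str) -> bool:
    """Return True when host.json opts in to OpenTelemetry telemetry mode."""
    config = json.loads(host_json_text)
    return isinstance(config, dict) and config.get("telemetryMode") == "OpenTelemetry"
```

In a CI step this could read the file with pathlib.Path("host.json").read_text() and fail the build when the function returns False.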

Outside Function Apps (App Service, Container Apps, VM), the FastAPI / Django / Flask auto-instrumentors stay enabled because there is no host emitter and the framework instrumentor is the only source of the SERVER request span.

Per-function vs. process-wide: the trace_function(record_result=True) flag controls only whether the decorated function's return value is attached as a span attribute. GenAI prompt/completion content capture is a separate, process-level concern controlled by bootstrap_azure_monitor(capture_gen_ai_content=True) — the two are deliberately decoupled because GenAI instrumentors read the env var at install time, not per-call.

Example: tracing a chat completion

from mgmt_config import logger, ai_projects, log_execution_config

@logger.trace_function()
async def generate_summary(document_text: str) -> str:
    ai_project = ai_projects.get("aiservices")
    openai_client = ai_project.get_openai_client()

    response = openai_client.chat.completions.create(
        model="gpt-5",
        messages=[
            {"role": "system", "content": "Summarize the document."},
            {"role": "user", "content": document_text},
        ],
    )
    return response.choices[0].message.content

The trace in AI Foundry shows a parent span for generate_summary with a child chat gpt-5 span containing model, token counts, latency, and (when telemetry was bootstrapped with capture_gen_ai_content=True) the full prompt/completion content.

Example: tracing a Foundry agent invocation

# Agent trace flows through AIProjectInstrumentor -> AI Foundry Tracing UI
result = ai_projects["aiservices"].invoke_agent(
    agent_name="doc-summarizer",
    user_message="Summarize the attached document",
)

The trace shows AzureAIProject.invoke_agent with gen_ai.system=az.ai.projects, gen_ai.operation.name=invoke_agent, gen_ai.agent.name=doc-summarizer, and (via AIProjectInstrumentor) nested spans for the Responses API call, tool calls, and model invocation — all grouped under the agent in the Foundry Tracing UI.

Linking Application Insights to your Foundry project

The AI Foundry Tracing tab in the portal reads directly from the Application Insights resource linked to your Foundry project (Project → Tracing → "Manage data source"). Setting APPLICATIONINSIGHTS_CONNECTION_STRING is not enough on its own — the resource must be linked once via the portal for the Tracing UI to find the traces.

If your app wants to fetch the linked connection string at runtime instead of hand-wiring it:

from mgmt_config import ai_projects

ai = ai_projects["aiservices"]
conn_str = ai.get_application_insights_connection_string()  # returns None if not linked

This wraps azure-ai-projects' client.telemetry.get_application_insights_connection_string() and is the recommended bootstrap path when you want the logger to always target whatever App Insights is currently linked to your Foundry resource.

Feature Flags

Enable only the storage services you need:

| Flag | Default | Service |
| --- | --- | --- |
| enable_blob_storage | True | BlobServiceClient |
| enable_file_storage | False | ShareServiceClient (requires token_intent="backup" RBAC) |
| enable_queue_storage | True | QueueServiceClient |

Dependencies

  • azure-storage-blob - Blob operations
  • azure-storage-file-share - File share operations
  • azure-storage-queue - Queue operations
  • azure-identity - Credential management
  • azure-keyvault-secrets / keys / certificates - Key Vault
  • azure-cosmos - Cosmos DB
  • azure-ai-projects - AI Foundry Projects (agents, deployments, connections, AIProjectInstrumentor for Responses API tracing)
  • azure-ai-documentintelligence - Document Intelligence (analyze, model management)
  • azure-cognitiveservices-speech - Speech (synthesis, recognition with Entra ID)
  • azure-monitor-opentelemetry - Telemetry
  • opentelemetry-instrumentation-openai-v2 - AI Foundry tracing for OpenAI SDK calls
