Skip to main content

Shared HTTP, storage, and runtime helpers for automation services.

Project description

Infra Core

PyPI Python Versions Build Status License

infra-core contains the reusable HTTP, storage, Azure upload, and runtime helpers shared by multiple services.

Supported Python versions: 3.11 · 3.12 · 3.13 (3.13 added to the testing matrix as soon as it is available in the execution environment).

Modules

Module Purpose
infra_core.http / http_async Request helpers with sensible defaults and retry/backoff logic.
infra_core.fs_utils Local filesystem helpers (ensure_parent, hashed_asset_path, compute_checksum).
infra_core.asset_client Async asset downloader with retries, connection pooling, and checksum support.
infra_core.polling Reusable polling utilities with exponential backoff for status-based operations (poll_until, StatusPoller).
infra_core.azure.client Azure Function-oriented HTTP clients that inject API keys, honor timeout/retry overrides, and support sync wrappers.
infra_core.azure.job_store Lightweight Azure Table Storage job tracker with environment-driven configuration and caching.
infra_core.azure.monitoring Monitoring helpers for heartbeats, run logs, and telemetry enrichment with mirror logging.
infra_core.azure.storage Thin Azure Blob client with configurable retries, local mirroring, and async helpers.
infra_core.task_runtime Cooperative asyncio runtime (TaskRuntime) with per-task budgeting.

Quick Start

HTTP Fetching

from infra_core import fetch, fetch_async

html = fetch("https://example.com", timeout=30)

async def load_async() -> str:
    return await fetch_async("https://example.com")

Storage with Optional Azure Mirroring

from pathlib import Path
from infra_core import AzureStorageClient, AzureStorageSettings, download_asset

settings = AzureStorageSettings.from_env()
storage = AzureStorageClient.from_settings(settings)
storage.write_json(Path("output/results.json"), {"status": "ok"})

asset_path = download_asset(
    "https://example.com/image.png",
    Path("assets/image.png"),
    skip_if_exists=True,
)

Run Storage Manifests

Generate enriched manifests for run artifacts with automatic file type detection, MIME types, and blob URLs:

from pathlib import Path
from infra_core.azure.run_storage import build_manifest
from infra_core.azure.storage import get_client

# Build manifest for run artifacts
storage_path = Path("/tmp/runs/batch-123/job-456/outputs")
azure_client = get_client()

manifest = build_manifest(storage_path, client=azure_client)

# Manifest structure (v0.4.0+)
{
    "files": [
        {
            "path": "screenshot.png",
            "type": "image",                    # Generic category: image, document, data, other
            "content_type": "image/png",        # MIME type via mimetypes library
            "size_bytes": 245680,
            "url": "https://account.blob.core.windows.net/container/blob/path"  # Full blob URL
        },
        {
            "path": "metadata.json",
            "type": "data",
            "content_type": "application/json",
            "size_bytes": 1024,
            "url": "https://..."
        }
    ],
    "storage": {
        "provider": "azure_blob",
        "container": "output-container",
        "prefix": "runs/batch-123/job-456/outputs"
    },
    "summary": {
        "total_files": 2,
        "total_size_bytes": 246704,
        "file_types": {"image": 1, "data": 1}
    }
}

Migration from v0.3.x:

# Old format (v0.3.x)
storage_path = manifest["storage_path"]
blob_prefix = manifest["blob_prefix"]
files = manifest["files"]

# New format (v0.4.0+)
storage = manifest.get("storage", {})
blob_prefix = storage.get("prefix")
files = manifest["files"]  # Now includes type, content_type, size_bytes, url

# Access file URLs directly
for file in files:
    print(f"{file['path']}: {file['url']}")

Features:

  • Automatic MIME type detection using Python's mimetypes library
  • Generic file categorization (image, document, data, other) for flexible organization
  • Full blob URLs with proper encoding of special characters (spaces, unicode, #, ?, &)
  • Summary statistics for efficient UI rendering
  • Prefers local files with automatic fallback to blob listing when local directory is empty

Concurrent Task Runtime

import asyncio
from infra_core import TaskRuntime, RuntimeConfig

async def process(item: str) -> None:
    ...

async def main() -> None:
    runtime = TaskRuntime(config=RuntimeConfig(concurrency=5, task_timeout=30.0))
    tasks = [(item, lambda item=item: process(item)) for item in ["a", "b", "c"]]
    await runtime.run(tasks)

asyncio.run(main())

Task Runtime Semantics

TaskRuntime enforces the configured concurrency per run() call by limiting the inflight set of tasks it schedules at once. A separate _active_tasks set tracks all tasks spawned across overlapping run() calls so that a single cancel() sweeps everything that is currently executing. As a result _active_tasks can temporarily exceed config.concurrency, which is expected and keeps cancellation comprehensive while each run() call still honours the configured bound.

By default, any task exception (including per-task timeouts) is propagated back to the caller so failure is obvious. Supplying on_error and/or on_timeout callbacks opts you into best-effort mode where the runtime reports failures via callbacks and continues processing the remaining tasks.

Status Polling

from infra_core.polling import poll_until, StatusPoller, PollingConfig

# Simple polling with timeout
result = poll_until(
    fetch_fn=lambda: api_client.get_status(job_id),
    is_terminal=lambda r: r["status"] in {"completed", "failed"},
    timeout=600.0,
    poll_interval=5.0,
)

# High-level polling with progress callbacks
def fetch_status():
    return {"status": "running", "progress": 50}

config = PollingConfig(
    timeout=600.0,
    poll_interval=5.0,
    terminal_statuses={"completed", "failed"},
    failure_statuses={"failed"},
    initial_delay=2.0,
    backoff_multiplier=2.0,
    max_backoff=30.0,
)

poller = StatusPoller(fetch_fn=fetch_status, config=config)
final_status = poller.poll(
    on_poll=lambda s: print(f"Progress: {s.get('progress')}%")
)

Async polling is also supported via poll_until_async and StatusPoller.poll_async(). Both the low-level helpers and StatusPoller honour the PollingConfig backoff settings (including initial_delay and exponential backoff bounded by max_backoff), so increasing backoff_multiplier automatically slows subsequent polls without reimplementing the loop.

Installation

Using uv (recommended):

uv pip install infra-core

Install with Azure helpers:

uv pip install "infra-core[azure]"

Or using pip:

pip install infra-core
pip install "infra-core[azure]"  # with Azure support

Tests

Matrix: Python 3.11, 3.12, and 3.13 (once the interpreter is available in the execution environment).

pytest tests -v

Or with coverage:

pytest tests -v --cov=infra_core --cov-report=term-missing

Configuration

Azure Storage (Optional)

When Azure credentials are provided, infra_core mirrors files written via helpers such as write_json, write_text, and their async counterparts.

Required

  • AZURE_STORAGE_CONTAINER – Target container name.

Authentication (choose one)

  • AZURE_STORAGE_CONNECTION_STRING – Full connection string; or
  • AZURE_STORAGE_ACCOUNT – Storage account name (uses DefaultAzureCredential)
    • AZURE_STORAGE_BLOB_ENDPOINT – Optional custom endpoint.

Optional

  • AZURE_STORAGE_BLOB_PREFIX – Prefix applied to uploaded blobs.

Blob Download Safety

Blob download helpers stream data into unique temp files via tempfile.mkstemp() (see azure_storage._stream_blob_to_path*). The files are placed next to the destination, created with owner-only permissions, flushed and fsync'd, then atomically renamed into place. This preserves original extensions, avoids collisions when multiple processes download the same blob, and prevents partially-downloaded data from clobbering the real file.

No configuration is required to use the HTTP or runtime helpers; sensible defaults are provided.

Azure-specific helpers

Storage mirroring & downloads

infra_core.azure.storage.AzureStorageClient layers on AzureStorageSettings.from_env() so the same env vars power every service. It caches blob/credential clients (AzureStorageClient.from_settings()), mirrors write_text/write_json calls to Azure while keeping a local file, and exposes sync/async helpers such as download_tree, list_tree, and blob_path_for to rehydrate blob hierarchies without boilerplate. Use upload_text/upload_file for overlaying prefix-aware blob names and handle swallow_errors=False if you want every upload failure to trip downstream error handling.

Azure Function HTTP clients

infra_core.azure.client.AzureFunctionAsyncClient provides a retry/backoff wrapper around httpx, automatically injects x-functions-key when api_key is configured, and supports RequestOptions to tune retries, delays, and timeouts at each call site. The sync counterpart (AzureFunctionSyncClient) simply runs the async flow via asyncio.run() so CLI scripts can reuse the same configuration without juggling event loops. Both helpers offer .from_env() helpers to materialize clients directly from AZURE_FUNCTION_BASE_URL and AZURE_FUNCTION_API_KEY (or similar) environment variables.

Azure Table job store

infra_core.azure.job_store.AzureTableJobStore and TableConfig.from_env() share consistent env vars (AZURE_JOB_TABLE, AZURE_JOB_CONNECTION, fallback to AZURE_STORAGE_*) plus configurable cache size (via functools.lru_cache). Each record implements to_entity() / from_entity() so you can store structured run metadata and look up retries via fetch() plus list_runs() pagination helpers.

Monitoring & telemetry

infra_core.azure.monitoring keeps a running heartbeat, mirrors logs to blob storage, and validates telemetry payloads before emitting them. Pair that module with the HTTP helpers to report failures from Azure Functions, and enable the infra_core.azure.monitoring.run_log_writer when you need consistent log labeling across services.

Sample environment (.env) snippet

# Blob mirroring
export AZURE_STORAGE_CONTAINER="crawler-artifacts"
export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"  # Azurite/local
export AZURE_STORAGE_BLOB_PREFIX="screenshots/"

# Azure Function triggers
export AZURE_FUNCTION_BASE_URL="https://my-team-func.azurewebsites.net"
export AZURE_FUNCTION_API_KEY="your-secret-key"

# Azure Table job tracking
export AZURE_JOB_TABLE="crawlerRuns"
export AZURE_JOB_CONNECTION="UseDevelopmentStorage=true"

CLI wiring example

Use infra_core.azure.function_cli.add_azure_service_arguments() to add the shared Azure flags to any argparse-based entrypoint, then call into AzureFunctionAsyncClient with values resolved from flags or environment variables:

import argparse
import asyncio
import os

from infra_core.azure.client import AzureFunctionAsyncClient
from infra_core.azure.function_cli import add_azure_service_arguments

parser = argparse.ArgumentParser()
add_azure_service_arguments(parser, base_url_env_var="AZURE_FUNCTION_BASE_URL")
args = parser.parse_args()

async def main() -> None:
    client = AzureFunctionAsyncClient(
        args.base_url or os.environ["AZURE_FUNCTION_BASE_URL"],
        api_key=os.getenv("AZURE_FUNCTION_API_KEY"),
    )
    payload = await client.post_json(
        "/api/run",
        json={"batchId": args.batch_id, "concurrency": args.concurrency},
    )
    print(payload)

asyncio.run(main())

Pair this with the .env snippet above and you have a ready-to-run CLI that schedules Azure Function batches, polls for completion (via the parser defaults), and mirrors summaries to blob storage when --summary-file is provided.

Logging

infra_core uses Python's standard logging module. To enable diagnostics in your application:

import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("infra_core.asset_client").setLevel(logging.DEBUG)

Logger namespaces:

Logger Purpose
infra_core.asset_client Download/retry lifecycle
infra_core.polling Polling attempts, backoff, and terminal states
infra_core.azure.storage Blob uploads/downloads
infra_core.task_runtime Concurrency and cancellation events

All log records include structured extra={...} fields (e.g., url, blob_name, attempt). Configure your formatter (JSON or text) to emit those keys for easier filtering, and sanitize environment-specific secrets before forwarding logs.

OpenTelemetry/trace correlation

If your application uses OpenTelemetry, start spans around infra_core operations and add span IDs to log records so traces and logs stay aligned:

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("download_batch") as span:
    logger = logging.getLogger("infra_core.asset_client")
    logger.info(
        "Starting download",
        extra={
            "url": url,
            "trace_id": span.get_span_context().trace_id,
            "span_id": span.get_span_context().span_id,
        },
    )
    await download_asset_async(url, dest)

Troubleshooting

  • "Azure storage not configured" warning – Ensure AZURE_STORAGE_CONTAINER is set along with either AZURE_STORAGE_CONNECTION_STRING or AZURE_STORAGE_ACCOUNT. Call AzureStorageClient.from_settings() after loading environment variables to confirm configuration.
  • download_asset raises inside async code – Use download_asset_async when an event loop is running; the sync helper intentionally fails inside asyncio contexts to avoid deadlocks.
  • Type checker cannot find infra_core stubs – Install dev extras (pip install .[dev] or uv sync --extra dev) so py.typed and dependency stubs are available to mypy/pyright.
  • HTTP retries still hitting rate limits – Pass a delay to fetch/fetch_async or construct a custom RequestsHttpClient/AsyncHttpClient with tuned limits and headers.
  • Large Azure uploads timing out – Use the async helpers (they stream files) and tweak AzureStorageClient settings (e.g., swallow_errors=False, custom retry logic) to observe detailed failures.

Contributing

Using uv (recommended):

git clone https://github.com/pj-ms/infra-core.git
cd infra-core
uv sync --extra azure --extra dev
uv run pytest tests -v --cov=infra_core --cov-report=term-missing
uv run mypy src/infra_core
uv run ruff check src/ tests/

See CONTRIBUTING.md for more details.

Releasing

This project uses bump-my-version for versioning.

# Bump patch version (0.1.0 -> 0.1.1)
uv run bump-my-version bump patch

# Bump minor version (0.1.0 -> 0.2.0)
uv run bump-my-version bump minor

# Bump major version (0.1.0 -> 1.0.0)
uv run bump-my-version bump major

This will:

  1. Update version in pyproject.toml
  2. Create a git commit
  3. Create a git tag (v0.1.1, etc.)
  4. Push tag to trigger automatic PyPI publish

Then push:

git push && git push --tags

License

MIT License - see LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

infra_core-0.4.0.tar.gz (81.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

infra_core-0.4.0-py3-none-any.whl (62.2 kB view details)

Uploaded Python 3

File details

Details for the file infra_core-0.4.0.tar.gz.

File metadata

  • Download URL: infra_core-0.4.0.tar.gz
  • Upload date:
  • Size: 81.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for infra_core-0.4.0.tar.gz
Algorithm Hash digest
SHA256 7877b388edcbb68367b6cd8c2662b13fc92f46edd6f73277c8b4c4d1873be5a1
MD5 968fc796e2a2c84096b8ce80d79bf852
BLAKE2b-256 2fb690b465a6600726818b17e1667a173046865c846fd2ca306cb1fa364ce698

See more details on using hashes here.

Provenance

The following attestation bundles were made for infra_core-0.4.0.tar.gz:

Publisher: publish.yml on pj-ms/infra-core

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file infra_core-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: infra_core-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 62.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for infra_core-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 80f947cca932207ccb1b65e946c6b58283281173a97d77f86a3e634bbe7e2d26
MD5 1f371df6be8608bb47da37b427961f2f
BLAKE2b-256 051e662a263671ab884c2a4586143bc46a9b3b626d1f10fd2cf42ed3ce8a97a5

See more details on using hashes here.

Provenance

The following attestation bundles were made for infra_core-0.4.0-py3-none-any.whl:

Publisher: publish.yml on pj-ms/infra-core

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page