# Infra Core

Shared HTTP, storage, and runtime helpers for automation services.

infra-core contains the reusable HTTP, storage, Azure upload, and runtime helpers shared by multiple services.
For a task-oriented index, see `CAPABILITIES.md` or inspect `infra_core.capabilities` from an installed environment. The source tree and source distribution also include `AGENTS.md`, `examples/`, `agent-snippets/`, and `skills/`.
Supported Python versions: 3.10 · 3.11 · 3.12 · 3.13 (3.13 added to the testing matrix as soon as it is available in the execution environment).
## Modules

| Module | Purpose |
|---|---|
| `infra_core.http` / `infra_core.http_async` | Request helpers with sensible defaults and retry/backoff logic. |
| `infra_core.capabilities` | Task-oriented index that maps common automation jobs to the preferred public imports. |
| `infra_core.fs_utils` | Local filesystem helpers (`ensure_parent`, `hashed_asset_path`, `compute_checksum`). |
| `infra_core.asset_client` | Async asset downloader with retries, connection pooling, and checksum support. |
| `infra_core.polling` | Reusable polling utilities with exponential backoff for status-based operations (`poll_until`, `StatusPoller`). |
| `infra_core.azure.client` | Azure Function-oriented HTTP clients that inject API keys, honor timeout/retry overrides, and support sync wrappers. |
| `infra_core.azure.job_store` | Lightweight Azure Table Storage job tracker with environment-driven configuration and caching. |
| `infra_core.azure.monitoring` | Monitoring helpers for heartbeats, run logs, and telemetry enrichment with mirror logging. |
| `infra_core.azure.storage` | Thin Azure Blob client with configurable retries, local mirroring, and async helpers. |
| `infra_core.task_runtime` | Cooperative asyncio runtime (`TaskRuntime`) with per-task budgeting. |
## Quick Start

More complete patterns live in `examples/`.
### HTTP Fetching

```python
from infra_core import fetch, fetch_async

html = fetch("https://example.com", timeout=30)

async def load_async() -> str:
    return await fetch_async("https://example.com")
```
### Storage with Optional Azure Mirroring

```python
from pathlib import Path

from infra_core import AzureStorageClient, AzureStorageSettings, download_asset

# Load settings with mode (v0.6.0+)
settings, mode = AzureStorageSettings.from_env()
storage = AzureStorageClient.from_settings(settings, mode)
storage.write_json(Path("output/results.json"), {"status": "ok"})

asset_path = download_asset(
    "https://example.com/image.png",
    Path("assets/image.png"),
    skip_if_exists=True,
)
```
### Run Storage Manifests

Generate enriched manifests for run artifacts with automatic file type detection, MIME types, and blob URLs:

```python
from pathlib import Path

from infra_core.azure.run_storage import build_manifest
from infra_core.azure.storage import AzureStorageSettings, get_client

# Build manifest for run artifacts (v0.6.0+)
storage_path = Path("/tmp/runs/batch-123/job-456/outputs")
settings, mode = AzureStorageSettings.from_env_with_prefix("MYSERVICE")
azure_client = get_client(settings, mode)
manifest = build_manifest(storage_path, client=azure_client)
```
```python
# Manifest structure (v0.4.0+)
{
    "files": [
        {
            "path": "screenshot.png",
            "type": "image",              # Generic category: image, document, data, other
            "content_type": "image/png",  # MIME type via the mimetypes library
            "size_bytes": 245680,
            "url": "https://account.blob.core.windows.net/container/blob/path",  # Full blob URL
        },
        {
            "path": "metadata.json",
            "type": "data",
            "content_type": "application/json",
            "size_bytes": 1024,
            "url": "https://...",
        },
    ],
    "storage": {
        "provider": "azure_blob",
        "container": "output-container",
        "prefix": "runs/batch-123/job-456/outputs",
    },
    "summary": {
        "total_files": 2,
        "total_size_bytes": 246704,
        "file_types": {"image": 1, "data": 1},
    },
}
```
Migration from v0.3.x:

```python
# Old format (v0.3.x)
storage_path = manifest["storage_path"]
blob_prefix = manifest["blob_prefix"]
files = manifest["files"]

# New format (v0.4.0+)
storage = manifest.get("storage", {})
blob_prefix = storage.get("prefix")
files = manifest["files"]  # Now includes type, content_type, size_bytes, url

# Access file URLs directly
for file in files:
    print(f"{file['path']}: {file['url']}")
```
Features:

- Automatic MIME type detection using Python's `mimetypes` library
- Generic file categorization (image, document, data, other) for flexible organization (see the sketch after this list)
- Full blob URLs with proper encoding of special characters (spaces, unicode, `#`, `?`, `&`)
- Summary statistics for efficient UI rendering
- Prefers local files, with automatic fallback to blob listing when the local directory is empty
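
For illustration only, categorization along these lines can be built on Python's `mimetypes`; the `categorize` helper below is hypothetical, not the library's implementation:

```python
import mimetypes
from pathlib import Path

# Hypothetical sketch of MIME-based categorization, mirroring the
# image/document/data/other buckets described above. infra_core's
# real logic may differ.
def categorize(path: Path) -> tuple[str, str | None]:
    content_type, _ = mimetypes.guess_type(path.name)
    if content_type is None:
        return "other", None
    if content_type.startswith("image/"):
        return "image", content_type
    if content_type in {"application/json", "text/csv"}:
        return "data", content_type
    if content_type in {"application/pdf", "text/html"}:
        return "document", content_type
    return "other", content_type

print(categorize(Path("screenshot.png")))  # ("image", "image/png")
```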
### Concurrent Task Runtime

```python
import asyncio

from infra_core import TaskRuntime, RuntimeConfig

async def process(item: str) -> None:
    ...

async def main() -> None:
    runtime = TaskRuntime(config=RuntimeConfig(concurrency=5, task_timeout=30.0))
    tasks = [(item, lambda item=item: process(item)) for item in ["a", "b", "c"]]
    await runtime.run(tasks)

asyncio.run(main())
```
### Task Runtime Semantics

`TaskRuntime` enforces the configured concurrency per `run()` call by limiting the in-flight set of tasks it schedules at once. A separate `_active_tasks` set tracks all tasks spawned across overlapping `run()` calls so that a single `cancel()` sweeps everything that is currently executing. As a result, `_active_tasks` can temporarily exceed `config.concurrency`; this is expected and keeps cancellation comprehensive while each `run()` call still honours the configured bound.

By default, any task exception (including per-task timeouts) is propagated back to the caller so failure is obvious. Supplying `on_error` and/or `on_timeout` callbacks opts you into best-effort mode, where the runtime reports failures via callbacks and continues processing the remaining tasks; a sketch follows.
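
A best-effort sketch; the callback signatures and where they are supplied (constructor vs. `run()`) are assumptions:

```python
import asyncio

from infra_core import TaskRuntime, RuntimeConfig

async def process(item: str) -> None:
    ...

# Callback signatures are assumptions (task name + exception here);
# check the actual TaskRuntime API before relying on them.
def log_error(name: str, exc: Exception) -> None:
    print(f"task {name} failed: {exc}")

async def main() -> None:
    runtime = TaskRuntime(
        config=RuntimeConfig(concurrency=5, task_timeout=30.0),
        on_error=log_error,  # opts into best-effort mode
        on_timeout=lambda name: print(f"task {name} timed out"),
    )
    tasks = [(item, lambda item=item: process(item)) for item in ["a", "b", "c"]]
    await runtime.run(tasks)  # continues past individual failures

asyncio.run(main())
```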
### Status Polling

```python
from infra_core.polling import poll_until, StatusPoller, PollingConfig

# Simple polling with timeout
result = poll_until(
    fetch_fn=lambda: api_client.get_status(job_id),
    is_terminal=lambda r: r["status"] in {"completed", "failed"},
    timeout=600.0,
    poll_interval=5.0,
)

# High-level polling with progress callbacks
def fetch_status():
    return {"status": "running", "progress": 50}

config = PollingConfig(
    timeout=600.0,
    poll_interval=5.0,
    terminal_statuses={"completed", "failed"},
    failure_statuses={"failed"},
    initial_delay=2.0,
    backoff_multiplier=2.0,
    max_backoff=30.0,
)
poller = StatusPoller(fetch_fn=fetch_status, config=config)
final_status = poller.poll(
    on_poll=lambda s: print(f"Progress: {s.get('progress')}%")
)
```
Async polling is also supported via `poll_until_async` and `StatusPoller.poll_async()`. Both the low-level helpers and `StatusPoller` honour the `PollingConfig` backoff settings (including `initial_delay` and exponential backoff bounded by `max_backoff`), so increasing `backoff_multiplier` automatically slows subsequent polls without reimplementing the loop.
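
A minimal async sketch, assuming `poll_until_async` mirrors the sync signature and accepts an async `fetch_fn` (verify against the installed version):

```python
import asyncio

from infra_core.polling import poll_until_async

# Stand-in for a real status endpoint.
async def fetch_status() -> dict:
    return {"status": "completed"}

async def main() -> None:
    # Keyword names assumed to match poll_until; confirm before copying.
    result = await poll_until_async(
        fetch_fn=fetch_status,
        is_terminal=lambda r: r["status"] in {"completed", "failed"},
        timeout=600.0,
        poll_interval=5.0,
    )
    print(result)

asyncio.run(main())
```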
Example scripts:

- `examples/http_fetch_with_retries.py`
- `examples/poll_remote_job.py`
- `examples/bounded_async_runtime.py`
- `examples/download_and_mirror_asset.py`
- `examples/azure_function_and_job_store.py`
## Installation

Using uv (recommended):

```bash
uv pip install infra-core
```

Install with Azure helpers:

```bash
uv pip install "infra-core[azure]"
```

Or using pip:

```bash
pip install infra-core
pip install "infra-core[azure]"  # with Azure support
```
## Tests

Matrix: Python 3.10, 3.11, 3.12, and 3.13 (once the interpreter is available in the execution environment).

Recommended local verification:

```bash
make verify
```

This wrapper runs pre-commit on tracked and untracked files, then runs the test suite and package build.

```bash
pytest tests -v
```

Or with coverage:

```bash
pytest tests -v --cov=infra_core --cov-report=term-missing
```
## Configuration

### Azure Storage (Optional)

When Azure credentials are provided, infra_core mirrors files written via helpers such as `write_json`, `write_text`, and their async counterparts.

Required:

- `AZURE_STORAGE_CONTAINER` – Target container name.

Authentication (choose one):

- `AZURE_STORAGE_CONNECTION_STRING` – Full connection string; or
- `AZURE_STORAGE_ACCOUNT` – Storage account name (uses `DefaultAzureCredential`).
- `AZURE_STORAGE_BLOB_ENDPOINT` – Optional custom endpoint.

Optional:

- `AZURE_STORAGE_BLOB_PREFIX` – Prefix applied to uploaded blobs.
- `AZURE_STORAGE_MODE` – Storage mode: `strict` (default, raises on missing config), `best-effort` (logs and skips), or `dev-skip` (never uploads).
### Blob Download Safety

Blob download helpers stream data into unique temp files via `tempfile.mkstemp()` (see `azure_storage._stream_blob_to_path*`). The files are placed next to the destination, created with owner-only permissions, flushed and fsync'd, then atomically renamed into place. This preserves original extensions, avoids collisions when multiple processes download the same blob, and prevents partially-downloaded data from clobbering the real file.
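
The underlying pattern is roughly the following sketch (illustrative only, not infra_core's exact internal code):

```python
import os
import tempfile
from pathlib import Path
from typing import Iterable

# Generic atomic-download pattern for illustration; the library's
# internal helper may differ in details.
def write_atomically(dest: Path, chunks: Iterable[bytes]) -> None:
    dest.parent.mkdir(parents=True, exist_ok=True)
    # A temp file beside the destination keeps the final rename on one
    # filesystem; mkstemp() creates it with owner-only permissions.
    fd, tmp_name = tempfile.mkstemp(dir=dest.parent, suffix=dest.suffix)
    try:
        with os.fdopen(fd, "wb") as tmp:
            for chunk in chunks:
                tmp.write(chunk)
            tmp.flush()
            os.fsync(tmp.fileno())  # persist bytes before the rename
        os.replace(tmp_name, dest)  # atomic within a filesystem
    except BaseException:
        os.unlink(tmp_name)  # never leave a partial file behind
        raise
```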
No configuration is required to use the HTTP or runtime helpers; sensible defaults are provided.
## Azure-specific helpers

### Storage mirroring & downloads

`infra_core.azure.storage.AzureStorageClient` provides Azure Blob Storage operations with explicit configuration. Settings and mode must be passed explicitly to prevent accidental cross-service configuration conflicts.

```python
from infra_core.azure import AzureStorageSettings, get_client

# Load settings with a service-specific prefix (recommended for multi-service deployments)
settings, mode = AzureStorageSettings.from_env_with_prefix("CRAWLER")
# Reads: CRAWLER_AZURE_STORAGE_CONTAINER, CRAWLER_AZURE_STORAGE_MODE, etc.

# Or use default env vars
settings, mode = AzureStorageSettings.from_env()
# Reads: AZURE_STORAGE_CONTAINER, AZURE_STORAGE_MODE, etc.

client = get_client(settings, mode)
client.upload_text(path, "content")
client.download_tree(target_path)
```
Modes:

- `strict` (default): Container + credentials required; raises `ConfigurationError` if missing (see the sketch after this list)
- `best-effort`: Log and skip when config is missing
- `dev-skip`: Never attempt uploads (for local development)
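
A fail-fast sketch for strict mode; the `ConfigurationError` import path is an assumption, so check the actual export before copying:

```python
from infra_core.azure import AzureStorageSettings, get_client
# Assumed import path for ConfigurationError; adjust to the real export.
from infra_core.azure.storage import ConfigurationError

try:
    settings, mode = AzureStorageSettings.from_env()  # strict is the default mode
    client = get_client(settings, mode)  # strict raises if config is missing
except ConfigurationError as exc:
    raise SystemExit(f"Azure storage misconfigured: {exc}")
```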
### Azure Function HTTP clients

`infra_core.azure.client.AzureFunctionAsyncClient` provides a retry/backoff wrapper around httpx, automatically injects `x-functions-key` when `api_key` is configured, and supports `RequestOptions` to tune retries, delays, and timeouts at each call site. The sync counterpart (`AzureFunctionSyncClient`) runs the async flow on a dedicated background event loop so CLI scripts can reuse the same configuration without juggling event loops; `close()` tears down the owned HTTP client before stopping that loop. Both classes offer `.from_env()` helpers to materialize clients directly from `AZURE_FUNCTION_BASE_URL` and `AZURE_FUNCTION_API_KEY` (or similar) environment variables. These helpers do not use Azure Identity; `az login` only applies to the Azure SDK-based storage and table helpers.
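
A per-call tuning sketch; the `RequestOptions` field names and the keyword used to pass them are assumptions:

```python
import asyncio

# RequestOptions import path, field names, and the `options=` keyword are
# assumptions; check the module before copying.
from infra_core.azure.client import AzureFunctionAsyncClient, RequestOptions

async def main() -> None:
    client = AzureFunctionAsyncClient.from_env()  # AZURE_FUNCTION_BASE_URL / _API_KEY
    payload = await client.post_json(
        "/api/run",
        json={"batchId": "batch-1"},
        options=RequestOptions(retries=5, timeout=120.0),
    )
    print(payload)

asyncio.run(main())
```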
### Azure Table job store

`infra_core.azure.job_store.AzureTableJobStore` and `TableConfig.from_env()` share consistent env vars (`AZURE_JOB_TABLE`, `AZURE_JOB_CONNECTION`, with fallback to `AZURE_STORAGE_*`) plus a shared client cache. For authentication, choose either `AZURE_JOB_CONNECTION` / `AZURE_STORAGE_CONNECTION_STRING`, or `AZURE_STORAGE_ACCOUNT` with an optional `AZURE_STORAGE_TABLE_ENDPOINT`; the latter uses `DefaultAzureCredential`, so `az login` and managed identity work without a connection string. `AzureTableJobStore` exposes `close()` and a sync context manager so token-auth clients release their credentials deterministically when you are done with them. Each record implements `to_entity()` / `from_entity()` so you can store structured run metadata and look up retries via `fetch()` plus `list_runs()` pagination helpers. `list_runs()` sorts records within each returned page by `updated_at`; continuation pages still follow Azure Table pagination order rather than a global newest-first order.
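
A usage sketch: `from_env()`, the context manager, `fetch()`, and `list_runs()` are named above, but the constructor and argument names below are assumptions:

```python
from infra_core.azure.job_store import AzureTableJobStore, TableConfig

config = TableConfig.from_env()  # AZURE_JOB_TABLE / AZURE_JOB_CONNECTION, etc.

# Constructor and keyword arguments are illustrative; consult the actual
# signatures. The context manager guarantees close() releases credentials.
with AzureTableJobStore(config) as store:
    record = store.fetch(partition_key="batch-1", row_key="job-1")
    for run in store.list_runs(partition_key="batch-1"):
        print(run)
```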
### Monitoring & telemetry

`infra_core.azure.monitoring` keeps a running heartbeat, mirrors logs to blob storage, and validates telemetry payloads before emitting them. Pair this module with the HTTP helpers to report failures from Azure Functions, and enable `infra_core.azure.monitoring.run_log_writer` when you need consistent log labeling across services.
```python
from infra_core.azure.monitoring import TelemetryConfig, TelemetryClient, TelemetryEvent

# Explicit telemetry configuration (v0.7.0+)
config = TelemetryConfig.from_env_with_prefix("CRAWLER")
# Reads: CRAWLER_APPLICATIONINSIGHTS_CONNECTION_STRING
# Falls back to: APPLICATIONINSIGHTS_CONNECTION_STRING, APPINSIGHTS_INSTRUMENTATIONKEY

client = TelemetryClient(config=config, service_name="crawler")
client.track_event(TelemetryEvent.STARTED, batch_id="batch-1", job_id="job-1")
client.track_event(TelemetryEvent.COMPLETED, batch_id="batch-1", metadata={"duration_ms": 1500})
```
### Sample environment (.env) snippet

```bash
# Blob mirroring
export AZURE_STORAGE_CONTAINER="crawler-artifacts"
export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"  # Azurite/local
export AZURE_STORAGE_BLOB_PREFIX="screenshots/"

# Azure Function triggers
export AZURE_FUNCTION_BASE_URL="https://my-team-func.azurewebsites.net"
export AZURE_FUNCTION_API_KEY="your-secret-key"

# Azure Table job tracking (choose one auth mode)
export AZURE_JOB_TABLE="crawlerRuns"
export AZURE_JOB_CONNECTION="UseDevelopmentStorage=true"
# Or instead of AZURE_JOB_CONNECTION:
# export AZURE_STORAGE_ACCOUNT="mystorageaccount"
# export AZURE_STORAGE_TABLE_ENDPOINT="https://mystorageaccount.table.core.windows.net"  # optional custom endpoint
```
### CLI wiring example

Use `infra_core.azure.function_cli.add_azure_service_arguments()` to add the shared Azure flags to any argparse-based entrypoint, then call into `AzureFunctionAsyncClient` with values resolved from flags or environment variables:

```python
import argparse
import asyncio
import os

from infra_core.azure.client import AzureFunctionAsyncClient
from infra_core.azure.function_cli import add_azure_service_arguments

parser = argparse.ArgumentParser()
add_azure_service_arguments(parser, base_url_env_var="AZURE_FUNCTION_BASE_URL")
args = parser.parse_args()

async def main() -> None:
    client = AzureFunctionAsyncClient(
        args.base_url or os.environ["AZURE_FUNCTION_BASE_URL"],
        api_key=os.getenv("AZURE_FUNCTION_API_KEY"),
    )
    payload = await client.post_json(
        "/api/run",
        json={"batchId": args.batch_id, "concurrency": args.concurrency},
    )
    print(payload)

asyncio.run(main())
```

Pair this with the `.env` snippet above and you have a ready-to-run CLI that schedules Azure Function batches, polls for completion (via the parser defaults), and mirrors summaries to blob storage when `--summary-file` is provided.
## Logging

infra_core uses Python's standard logging module. To enable diagnostics in your application:

```python
import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("infra_core.asset_client").setLevel(logging.DEBUG)
```
Logger namespaces:

| Logger | Purpose |
|---|---|
| `infra_core.asset_client` | Download/retry lifecycle |
| `infra_core.polling` | Polling attempts, backoff, and terminal states |
| `infra_core.azure.storage` | Blob uploads/downloads |
| `infra_core.task_runtime` | Concurrency and cancellation events |
All log records include structured `extra={...}` fields (e.g., `url`, `blob_name`, `attempt`). Configure your formatter (JSON or text) to emit those keys for easier filtering, and sanitize environment-specific secrets before forwarding logs.
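
As one way to surface those keys, a minimal JSON formatter (the key names match the examples above; pure stdlib, not an infra_core API):

```python
import json
import logging

class ExtraFormatter(logging.Formatter):
    """Emit selected `extra` keys alongside the message."""

    KEYS = ("url", "blob_name", "attempt")  # extras documented above

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "msg": record.getMessage()}
        for key in self.KEYS:
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(ExtraFormatter())
logging.getLogger("infra_core").addHandler(handler)
```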
### OpenTelemetry/trace correlation

If your application uses OpenTelemetry, start spans around infra_core operations and add span IDs to log records so traces and logs stay aligned:

```python
import logging

from opentelemetry import trace

# Import path assumed to mirror the top-level download_asset export.
from infra_core import download_asset_async

tracer = trace.get_tracer(__name__)

async def download_with_trace(url: str, dest) -> None:
    with tracer.start_as_current_span("download_batch") as span:
        logger = logging.getLogger("infra_core.asset_client")
        logger.info(
            "Starting download",
            extra={
                "url": url,
                "trace_id": span.get_span_context().trace_id,
                "span_id": span.get_span_context().span_id,
            },
        )
        await download_asset_async(url, dest)
```
## Troubleshooting

- "Azure storage not configured" warning – Ensure `AZURE_STORAGE_CONTAINER` is set along with either `AZURE_STORAGE_CONNECTION_STRING` or `AZURE_STORAGE_ACCOUNT`. Call `AzureStorageClient.from_settings()` after loading environment variables to confirm configuration.
- `download_asset` raises inside async code – Use `download_asset_async` when an event loop is running; the sync helper intentionally fails inside asyncio contexts to avoid deadlocks.
- Type checker cannot find infra_core stubs – Install dev extras (`pip install .[dev]` or `uv sync --extra dev`) so `py.typed` and dependency stubs are available to mypy/pyright.
- HTTP retries still hitting rate limits – Pass a `delay` to `fetch`/`fetch_async`, or construct a custom `RequestsHttpClient`/`AsyncHttpClient` with tuned limits and headers (see the sketch after this list).
- Large Azure uploads timing out – Use the async helpers (they stream files). Upload errors raise by default; set `swallow_errors=True` only if you explicitly want best-effort mirroring.
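
For the rate-limit case, a hedged sketch: `delay` is named above as a pacing knob for `fetch`, but its exact semantics should be confirmed against the installed version:

```python
from infra_core import fetch

# `delay` semantics are an assumption (here: seconds between retry
# attempts); verify against the installed infra_core version.
html = fetch("https://api.example.com/data", timeout=30, delay=2.0)
```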
## Contributing

Using uv (recommended):

```bash
git clone https://github.com/pj-ms/infra-core.git
cd infra-core
uv sync --extra azure --extra dev
uv run pytest tests -v --cov=infra_core --cov-report=term-missing
uv run mypy src/infra_core
uv run ruff check src/ tests/
```

See CONTRIBUTING.md for more details.
## License

MIT License - see LICENSE for details.