Infra Core

Shared HTTP, storage, and runtime helpers for automation services.

infra-core contains the reusable HTTP, storage, Azure upload, and runtime helpers shared by multiple services.
Modules
| Module | Purpose |
|---|---|
| `infra_core.http` / `infra_core.http_async` | Request helpers with sensible defaults and retry/backoff logic. |
| `infra_core.fs_utils` | Local filesystem helpers (`ensure_parent`, `hashed_asset_path`, `compute_checksum`). |
| `infra_core.asset_client` | Async asset downloader with retries, connection pooling, and checksum support. |
| `infra_core.azure_storage` | Thin Azure Blob client with configurable retries and optional async wrappers. |
| `infra_core.task_runtime` | Cooperative asyncio runtime (`TaskRuntime`) with per-task budgeting. |
Quick Start
HTTP Fetching
```python
from infra_core import fetch, fetch_async

html = fetch("https://example.com", timeout=30)

async def load_async() -> str:
    return await fetch_async("https://example.com")
```
Storage with Optional Azure Mirroring
```python
from pathlib import Path

from infra_core import AzureStorageClient, AzureStorageSettings, download_asset

settings = AzureStorageSettings.from_env()
storage = AzureStorageClient.from_settings(settings)
storage.write_json(Path("output/results.json"), {"status": "ok"})

asset_path = download_asset(
    "https://example.com/image.png",
    Path("assets/image.png"),
    skip_if_exists=True,
)
```
Concurrent Task Runtime
```python
import asyncio

from infra_core import TaskRuntime, RuntimeConfig

async def process(item: str) -> None:
    ...

async def main() -> None:
    runtime = TaskRuntime(config=RuntimeConfig(concurrency=5, task_timeout=30.0))
    tasks = [(item, lambda item=item: process(item)) for item in ["a", "b", "c"]]
    await runtime.run(tasks)

asyncio.run(main())
```
Task Runtime Semantics
`TaskRuntime` enforces the configured concurrency per `run()` call by limiting the in-flight set of tasks it schedules at once. A separate `_active_tasks` set tracks all tasks spawned across overlapping `run()` calls so that a single `cancel()` sweeps everything that is currently executing. As a result, `_active_tasks` can temporarily exceed `config.concurrency`; this is expected and keeps cancellation comprehensive while each `run()` call still honours the configured bound.

By default, any task exception (including per-task timeouts) is propagated back to the caller so failure is obvious. Supplying `on_error` and/or `on_timeout` callbacks opts you into best-effort mode, where the runtime reports failures via callbacks and continues processing the remaining tasks.
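The fail-fast versus best-effort split can be illustrated with a plain-asyncio sketch. This is not the real `TaskRuntime`: `run_bounded` and the `on_error` signature below are stand-ins invented for illustration of the described semantics.

```python
import asyncio
from typing import Awaitable, Callable, Optional

async def run_bounded(
    tasks: list[tuple[str, Callable[[], Awaitable[None]]]],
    concurrency: int,
    on_error: Optional[Callable[[str, Exception], None]] = None,
) -> None:
    # A semaphore bounds the in-flight set, standing in for config.concurrency.
    sem = asyncio.Semaphore(concurrency)

    async def worker(name: str, factory: Callable[[], Awaitable[None]]) -> None:
        async with sem:
            try:
                await factory()
            except Exception as exc:
                if on_error is None:
                    raise  # fail-fast: the exception propagates to the caller
                on_error(name, exc)  # best-effort: report and keep going

    await asyncio.gather(*(worker(n, f) for n, f in tasks))

errors: list[str] = []

async def boom() -> None:
    raise ValueError("nope")

async def ok() -> None:
    await asyncio.sleep(0)

# With on_error supplied, the failing task is reported and the rest still run.
asyncio.run(
    run_bounded(
        [("a", boom), ("b", ok)],
        concurrency=2,
        on_error=lambda name, _exc: errors.append(name),
    )
)
```

Omitting `on_error` in this sketch makes the `ValueError` propagate out of `asyncio.run`, matching the fail-fast default described above.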
Installation
Using uv (recommended):

```bash
uv pip install infra-core
```

Install with Azure helpers:

```bash
uv pip install "infra-core[azure]"
```

Or using pip:

```bash
pip install infra-core
pip install "infra-core[azure]"  # with Azure support
```
Tests
```bash
pytest tests -v
```

Or with coverage:

```bash
pytest tests -v --cov=infra_core --cov-report=term-missing
```
Configuration
Azure Storage (Optional)
When Azure credentials are provided, infra_core mirrors files written via helpers such as write_json, write_text, and their async counterparts.
Required

- `AZURE_STORAGE_CONTAINER` – Target container name.

Authentication (choose one)

- `AZURE_STORAGE_CONNECTION_STRING` – Full connection string; or
- `AZURE_STORAGE_ACCOUNT` – Storage account name (uses `DefaultAzureCredential`).
- `AZURE_STORAGE_BLOB_ENDPOINT` – Optional custom endpoint.

Optional

- `AZURE_STORAGE_BLOB_PREFIX` – Prefix applied to uploaded blobs.
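A minimal environment setup might look like the following; the container name, account name, and prefix are placeholder values:

```bash
# Required
export AZURE_STORAGE_CONTAINER="artifacts"

# Authentication: account name resolved via DefaultAzureCredential
export AZURE_STORAGE_ACCOUNT="mystorageaccount"

# Optional prefix applied to uploaded blob names
export AZURE_STORAGE_BLOB_PREFIX="runs/2024-01"
```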
Blob Download Safety
Blob download helpers stream data into unique temp files via tempfile.mkstemp() (see azure_storage._stream_blob_to_path*). The files are placed next to the destination, created with owner-only permissions, flushed and fsync'd, then atomically renamed into place. This preserves original extensions, avoids collisions when multiple processes download the same blob, and prevents partially-downloaded data from clobbering the real file.
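The pattern described above can be sketched in plain Python. This is an illustration of the mkstemp-plus-fsync-plus-rename technique, not the actual `azure_storage` code; `atomic_write` is a hypothetical helper:

```python
import os
import tempfile
from pathlib import Path

def atomic_write(dest: Path, data: bytes) -> None:
    dest.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives next to the destination so the final rename
    # stays on one filesystem and is therefore atomic; the suffix
    # preserves the original extension.
    fd, tmp = tempfile.mkstemp(dir=dest.parent, suffix=dest.suffix)
    try:
        with os.fdopen(fd, "wb") as fh:  # mkstemp creates the file 0600 (owner-only)
            fh.write(data)
            fh.flush()
            os.fsync(fh.fileno())  # ensure bytes hit disk before the rename
        os.replace(tmp, dest)  # atomically swap the complete file into place
    except BaseException:
        os.unlink(tmp)  # never leave a partial temp file behind
        raise

dest = Path(tempfile.mkdtemp()) / "assets" / "image.png"
atomic_write(dest, b"pixels")
```

Because `mkstemp` generates a unique name per call, concurrent downloaders never write to the same temp file, and readers only ever see the old file or the fully written new one.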
No configuration is required to use the HTTP or runtime helpers; sensible defaults are provided.
Logging
infra_core uses Python's standard logging module. To enable diagnostics in your application:
```python
import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("infra_core.asset_client").setLevel(logging.DEBUG)
```
Logger namespaces:
| Logger | Purpose |
|---|---|
| `infra_core.asset_client` | Download/retry lifecycle |
| `infra_core.azure_storage` | Blob uploads/downloads |
| `infra_core.task_runtime` | Concurrency and cancellation events |
All log records include structured extra={...} fields (e.g., url, blob_name, attempt). Configure your formatter (JSON or text) to emit those keys for easier filtering, and sanitize environment-specific secrets before forwarding logs.
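One way to surface those `extra` fields is a small JSON formatter. This is a generic sketch rather than an infra_core API; only the field names `url` and `attempt` come from the documentation above:

```python
import json
import logging

class ExtraJsonFormatter(logging.Formatter):
    # Attributes present on every LogRecord; anything else was passed via extra={...}.
    _RESERVED = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {"message"}

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "msg": record.getMessage()}
        # Copy only the caller-supplied extra fields into the JSON payload.
        payload.update(
            {k: v for k, v in vars(record).items() if k not in self._RESERVED}
        )
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(ExtraJsonFormatter())
logger = logging.getLogger("infra_core.asset_client")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Starting download", extra={"url": "https://example.com/a.png", "attempt": 1})
```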
OpenTelemetry/trace correlation
If your application uses OpenTelemetry, start spans around infra_core operations and add span IDs to log records so traces and logs stay aligned:
```python
import logging

from infra_core import download_asset_async
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

async def traced_download(url, dest) -> None:
    with tracer.start_as_current_span("download_batch") as span:
        logger = logging.getLogger("infra_core.asset_client")
        logger.info(
            "Starting download",
            extra={
                "url": url,
                "trace_id": span.get_span_context().trace_id,
                "span_id": span.get_span_context().span_id,
            },
        )
        await download_asset_async(url, dest)
```
Troubleshooting
- **"Azure storage not configured" warning** – Ensure `AZURE_STORAGE_CONTAINER` is set along with either `AZURE_STORAGE_CONNECTION_STRING` or `AZURE_STORAGE_ACCOUNT`. Call `AzureStorageClient.from_settings()` after loading environment variables to confirm configuration.
- **`download_asset` raises inside async code** – Use `download_asset_async` when an event loop is running; the sync helper intentionally fails inside `asyncio` contexts to avoid deadlocks.
- **Type checker cannot find infra_core stubs** – Install dev extras (`pip install .[dev]` or `uv sync --extra dev`) so `py.typed` and dependency stubs are available to mypy/pyright.
- **HTTP retries still hitting rate limits** – Pass a `delay` to `fetch`/`fetch_async`, or construct a custom `RequestsHttpClient`/`AsyncHttpClient` with tuned limits and headers.
- **Large Azure uploads timing out** – Use the async helpers (they stream files) and tweak `AzureStorageClient` settings (e.g., `swallow_errors=False`, custom retry logic) to observe detailed failures.
Contributing
Using uv (recommended):
```bash
git clone https://github.com/pj-ms/infra-core.git
cd infra-core
uv sync --extra azure --extra dev
uv run pytest tests -v --cov=infra_core --cov-report=term-missing
uv run mypy src/infra_core
uv run ruff check src/ tests/
```
See CONTRIBUTING.md for more details.
Releasing
This project uses bump-my-version for versioning.
```bash
# Bump patch version (0.1.0 -> 0.1.1)
uv run bump-my-version bump patch

# Bump minor version (0.1.0 -> 0.2.0)
uv run bump-my-version bump minor

# Bump major version (0.1.0 -> 1.0.0)
uv run bump-my-version bump major
```
This will:

- Update the version in `pyproject.toml`
- Create a git commit
- Create a git tag (`v0.1.1`, etc.)

Then push the commit and tag to trigger the automatic PyPI publish:

```bash
git push && git push --tags
```
License
MIT License - see LICENSE for details.
File details

Details for the file `infra_core-0.1.0.tar.gz`.

File metadata

- Size: 51.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `1be326944bee58487cf6fec9533e1e120359a6b4fb5ed7441d424ee8c5493896` |
| MD5 | `bfd0c4d06c99e620da476f0dec59871b` |
| BLAKE2b-256 | `d536c1eddf264fbd05eb4551324e0ae537975da62744f207af6ed3c5c41df87a` |
Provenance

The following attestation bundles were made for `infra_core-0.1.0.tar.gz`:

Publisher: `publish.yml` on pj-ms/infra-core

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: `infra_core-0.1.0.tar.gz`
- Subject digest: `1be326944bee58487cf6fec9533e1e120359a6b4fb5ed7441d424ee8c5493896`
- Sigstore transparency entry: 684285767
- Permalink: pj-ms/infra-core@c8bf455cbaaea2dab939aca76cd682dd3c3dce55
- Branch / Tag: refs/heads/main
- Owner: https://github.com/pj-ms
- Access: private
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@c8bf455cbaaea2dab939aca76cd682dd3c3dce55
- Trigger Event: workflow_dispatch
File details

Details for the file `infra_core-0.1.0-py3-none-any.whl`.

File metadata

- Size: 38.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `f0bea02fba2b7d0219c29344986cd53784a355d00f797a659903574825ba7f1e` |
| MD5 | `7cf321089bc5a409c2392b9dcdd4e241` |
| BLAKE2b-256 | `37150f99a02f203ae019984edd887396df40740a62af85382c2022596b972024` |
Provenance

The following attestation bundles were made for `infra_core-0.1.0-py3-none-any.whl`:

Publisher: `publish.yml` on pj-ms/infra-core

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: `infra_core-0.1.0-py3-none-any.whl`
- Subject digest: `f0bea02fba2b7d0219c29344986cd53784a355d00f797a659903574825ba7f1e`
- Sigstore transparency entry: 684285788
- Permalink: pj-ms/infra-core@c8bf455cbaaea2dab939aca76cd682dd3c3dce55
- Branch / Tag: refs/heads/main
- Owner: https://github.com/pj-ms
- Access: private
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@c8bf455cbaaea2dab939aca76cd682dd3c3dce55
- Trigger Event: workflow_dispatch