A reliable, provider-agnostic LLM abstraction layer (Hybrid Clean Architecture).
A reliable provider-agnostic LLM abstraction layer. Crux offers a turn-key provider client solution using Hybrid Clean Architecture patterns.
Overview
- Goal: Normalize provider interactions behind small, typed contracts while keeping all provider-specific logic contained under crux_providers/.
- Key benefits:
- Provider-agnostic DTOs and interfaces
- Central factory for adapter creation
- Model registry repository to read/write per-provider model listings (JSON)
- Pluggable “get models” fetchers per provider
- Non-invasive: adapters wrap existing clients where possible
Installation
Install the library from a source distribution or PyPI (planned name: crux-providers). The canonical import path is crux_providers (package root: crux_providers).
pip install crux-providers
# or with common extras (installs provider SDKs):
pip install "crux-providers[all]"
Provider-specific extras:
- OpenAI: pip install "crux-providers[openai]"
- Anthropic: pip install "crux-providers[anthropic]"
- Gemini: pip install "crux-providers[gemini]"
- Ollama: pip install "crux-providers[ollama]"
- OpenRouter: pip install "crux-providers[openrouter]"
- Deepseek: pip install "crux-providers[deepseek]"
- xAI: pip install "crux-providers[xai]"
After installing, use the providers import path:
from crux_providers.base import ProviderFactory, ChatRequest, Message
Quick Start (Contributors)
Centralized Configuration
Defaults and environment mappings are centralized to reduce drift and improve maintainability:
- crux_providers/config/defaults.py
  - CLI defaults (e.g., default provider, benchmark runs/warmups)
  - OpenRouter defaults (model and base URL)
  - SQLite defaults (journal mode, synchronous, busy timeout)
- crux_providers/config/env.py
  - Canonical env var names per provider via ENV_MAP
  - Aliases per provider via ENV_ALIASES (e.g., Gemini supports GEMINI_API_KEY and GOOGLE_API_KEY)
  - Helpers: is_placeholder, get_env_var_name, get_env_var_candidates, resolve_provider_key, set_canonical_env_if_missing
Usage guidelines:
- Do not hardcode defaults in adapters, CLI, or persistence; import from crux_providers.config.defaults.
- Resolve provider API keys via crux_providers.config.env and promote alias → canonical using set_canonical_env_if_missing.
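The alias-to-canonical promotion described above can be sketched as follows. This is a minimal illustration, not the library's implementation: the ENV_MAP and ENV_ALIASES tables here are hypothetical stand-ins, and promote_alias is an invented name that only mirrors the documented intent of set_canonical_env_if_missing.

```python
import os
from typing import MutableMapping, Optional, Sequence

# Hypothetical stand-ins for ENV_MAP / ENV_ALIASES; the real tables live in
# crux_providers/config/env.py and may differ.
ENV_MAP = {"gemini": "GEMINI_API_KEY", "openai": "OPENAI_API_KEY"}
ENV_ALIASES = {"gemini": ("GOOGLE_API_KEY",)}


def candidates(provider: str) -> Sequence[str]:
    """Return the canonical variable name followed by any aliases."""
    canonical = ENV_MAP.get(provider)
    if canonical is None:
        return ()
    return (canonical, *ENV_ALIASES.get(provider, ()))


def promote_alias(
    provider: str, env: Optional[MutableMapping[str, str]] = None
) -> Optional[str]:
    """Copy the first populated candidate into the canonical variable if unset.

    Returns the resolved key, or None when no candidate is populated.
    """
    env = os.environ if env is None else env
    names = candidates(provider)
    if not names:
        return None
    canonical = names[0]
    for name in names:
        value = (env.get(name) or "").strip()
        if value:
            if not (env.get(canonical) or "").strip():
                env[canonical] = value  # promote alias -> canonical
            return value
    return None
```

Passing an explicit mapping instead of os.environ keeps the sketch easy to exercise in tests without touching the process environment.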
Architecture Standards Enforcement
We enforce key architecture rules via tests:
- File size limit: no source file should exceed 500 lines.
  - Policy test: crux_providers/tests/test_policies_filesize.py scans provider modules and fails if over budget.
  - Temporary allowlist: select oversized modules are allow-listed with a # deviation: note and a revisit date.
  - Current revisit target: 2025-10-15. Oversized modules will be decomposed into focused submodules without changing public APIs.
- Streaming must use BaseStreamingAdapter; bespoke loops are prohibited.
- Use get_timeout_config() and guard start phases with operation_timeout. Do not hardcode numeric timeouts.
- Structured logging: include provider, operation, stage, failure_class, fallback_used.
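A file-size policy check of the kind described above might look like the sketch below. It is an assumption about the general technique, not a copy of test_policies_filesize.py; the function name oversized_files and the allowlist format (relative POSIX paths) are hypothetical.

```python
from pathlib import Path
from typing import Iterable, List, Tuple

MAX_LINES = 500  # budget stated in the policy above


def oversized_files(root: Path, allowlist: Iterable[str] = ()) -> List[Tuple[str, int]]:
    """Return (relative_path, line_count) for .py files over the line budget.

    Paths listed in ``allowlist`` (relative, POSIX style) are skipped, which
    models the temporary allowlist for modules awaiting decomposition.
    """
    allowed = set(allowlist)
    offenders = []
    for path in sorted(root.rglob("*.py")):
        rel = path.relative_to(root).as_posix()
        if rel in allowed:
            continue
        with path.open(encoding="utf-8", errors="replace") as handle:
            count = sum(1 for _ in handle)
        if count > MAX_LINES:
            offenders.append((rel, count))
    return offenders
```

A policy test would then assert that the returned list is empty and print the offenders on failure.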
Testing
- Streaming contract tests live under crux_providers/tests/streaming.
- Edge cases for env config: crux_providers/tests/test_env_config_edge_cases.py.
- Run targeted tests (using the Python interpreter of your virtual environment):
python -m pytest -q crux_providers/tests/test_env_config_edge_cases.py crux_providers/tests/test_policies_filesize.py crux_providers/tests/streaming
- Implementing a new provider:
  - Create crux_providers/<provider>/client.py implementing the core LLMProvider interface.
  - Add an optional get_<provider>_models.py with a run() entrypoint returning list[dict].
  - Register the adapter in ProviderFactory.
- Streaming:
  - All new streaming implementations MUST use BaseStreamingAdapter; do not write custom loops.
- Timestamps:
  - Metrics & chat log created_at are stored as ISO8601 UTC strings; naive datetimes are coerced to UTC. See policy section below.
- Tests:
  - Place all tests under crux_providers/tests/; do not place test code outside the package.
- Token usage:
  - Placeholder {"prompt": null, "completion": null, "total": null} must always be present until real counts are wired.
- Logging & metrics:
  - Streaming finalize path emits exactly one structured log with latency metrics; never suppress errors silently.
  - External metrics emission (feature-flagged): when PROVIDERS_METRICS_EXPORT is set to 1 or true, the streaming finalize path emits a summarized metrics payload via the default exporter. By default, a no-op exporter is used, so enabling the flag has no effect unless a concrete exporter is wired. Emission failures never raise and are logged once as metrics.export.error.
- Security:
  - Never use shell=True; resolve executables via shutil.which in any future subprocess integrations.
- Docstrings:
  - Provide full professional docstrings for every public class/function (purpose, params, returns, failure modes).
- CI & Enforcement:
  - CI: Providers fast tests run on pushes/PRs that touch providers code.
  - Manual: Workflow can be run on demand from Actions (workflow_dispatch enabled).
  - Enforcement: Protect main and require the "Providers Fast Tests" check to pass before merging; optionally disallow direct pushes.
  - Local gate (optional): Use pre-commit pre-push hook to run providers tests before git push.
Minimal Example (Factory Usage):
from crux_providers.base import ProviderFactory, ChatRequest, Message
provider = ProviderFactory.create("openai")
resp = provider.chat(
    ChatRequest(
        model=provider.default_model() or "o3-mini",
        messages=[
            Message(role="system", content="You are concise."),
            Message(role="user", content="Ping"),
        ],
        max_tokens=32,
        response_format="text",
    )
)
print(resp.text)
For streaming, wrap the provider call with the standardized BaseStreamingAdapter; see the Streaming section below.
Cross-Provider Guarantees Matrix
The providers layer enforces a consistent set of behavioral and data-shape guarantees across all adapters. These are contract-level promises intended to simplify integrations and testing.
- Requests and DTOs
  - Always use provider-agnostic DTOs: Message, ChatRequest, ChatResponse, ProviderMetadata.
  - Token usage placeholders are present on ChatResponse.metadata.tokens (prompt, completion, total may be null).
  - System/user message extraction trims whitespace-only lines before joining user segments.
- Streaming lifecycle (BaseStreamingAdapter)
  - Exactly one terminal event is emitted per stream (success or error).
  - Metrics captured internally and surfaced via finalize event:
    - emitted_count: number of deltas (finish event excluded)
    - time_to_first_token_ms: set only if at least one delta was emitted
    - total_duration_ms: set on every terminal event
  - Invariants:
    - If emitted_count == 0 ⇒ time_to_first_token_ms is null
    - When present, time_to_first_token_ms ≤ total_duration_ms
  - Start phase only is covered by timeout + retry; mid-stream flow is cooperative via cancellation token.
  - Cancellation maps to a normalized cancelled error code and still emits a terminal event with metrics.
- Capability gating
  - streaming_supported() centralizes checks; adapters short-circuit when unsupported.
- Error handling & logging
  - Unexpected exceptions are classified into a normalized error code and emitted on the terminal event; no silent suppression.
  - Structured logging fields are normalized (phase, attempt, error_code, emitted, tokens) in finalize logs.
- Tracing (optional)
  - Streaming start and run phases are instrumented with lightweight spans when OpenTelemetry is present; otherwise a no-op.
  - Span attributes include provider, model, and terminal metrics (emitted_count, time_to_first_token_ms, total_duration_ms).
- Security & subprocess policy
  - No shell=True; executables resolved via shutil.which and validated when subprocess is used.
- Architectural standards
  - All new streaming implementations must use BaseStreamingAdapter (no bespoke loops).
  - Public functions/classes include full docstrings (purpose, params, returns, failure modes).
  - Adhere to file-size and complexity targets documented at the repo root; prefer small, focused modules.
These guarantees are validated by the streaming contract tests under crux_providers/tests/streaming/ and conventional unit tests under crux_providers/tests/.
Core contracts and types
- Interfaces:
  - LLMProvider (core provider contract)
  - SupportsJSONOutput, SupportsResponsesAPI (capability flags)
  - ModelListingProvider (expose list_models)
  - HasDefaultModel (optional default model)
  - See interfaces.py.
- DTOs (Provider-agnostic models):
  - Message, ChatRequest, ChatResponse, ProviderMetadata
  - ModelInfo, ModelRegistrySnapshot
- Repositories:
  - Model registry I/O and refresh orchestration:
    - ModelRegistryRepository
      - Key method: list_models(provider, refresh=False)
    - DB-first: reads snapshots from SQLite (JSON cache files removed)
    - Optional refresh invokes provider “get models” script
    - The model registry no longer reads or writes provider JSON files.
  - Keys (API key resolution):
    - KeysRepository.get_api_key()
    - Resolution order: env (OPENAI_API_KEY, etc.) → config.yaml → None
- Factory:
  - ProviderFactory.create()
    - Maps canonical provider name (e.g., "openai") to adapter class by lazy import
Provider adapters
- Adapter: OpenAIProvider
  - Exposes:
    - provider_name: "openai"
    - supports_json_output(): bool
    - uses_responses_api(model: str): bool (o3/o1 families)
    - chat(request: ChatRequest): ChatResponse
  - Internals:
    - model default derived from get_openai_config() with fallback (o3-mini)
    - Structured outputs: request.response_format == "json_object"
Model registry
- SQLite is the authoritative store for model snapshots (no JSON cache files)
- Repository: ModelRegistryRepository
  - refresh=True will attempt a provider get-models runner: crux_providers.{provider}.get_{provider}_models
  - Accepts multiple entrypoint names: run(), get_models(), fetch_models(), update_models(), refresh_models(), main()
  - Reads exclusively from SQLite; if no snapshot exists for a provider, an empty snapshot is returned unless a refresh is requested.
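One way to accept several entrypoint names is to probe a fetcher module for the first callable attribute in a fixed order. The sketch below assumes the precedence follows the order listed above; find_entrypoint is a hypothetical helper, and the actual repository may resolve entrypoints differently.

```python
import types
from typing import Any, Callable, Optional

# Assumption: precedence follows the order in which the names are documented.
ENTRYPOINT_NAMES = (
    "run",
    "get_models",
    "fetch_models",
    "update_models",
    "refresh_models",
    "main",
)


def find_entrypoint(module: types.ModuleType) -> Optional[Callable[..., Any]]:
    """Return the first callable entrypoint exposed by a fetcher module."""
    for name in ENTRYPOINT_NAMES:
        candidate = getattr(module, name, None)
        if callable(candidate):
            return candidate
    return None
```

In practice the module would come from importlib.import_module(f"crux_providers.{provider}.get_{provider}_models"), with a missing module or entrypoint treated as "no refresh available".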
Provider “get models” fetchers (API-backed)
- OpenAI:
  - Module: get_openai_models.py
  - Entry point: run()
  - Behavior:
    - If OPENAI_API_KEY is present, fetch via the OpenAI SDK: client = OpenAI(api_key=key); items = client.models.list()
    - Normalize and persist via save_provider_models()
    - Return list[dict] for ModelRegistryRepository convenience
  - Offline mode:
    - Falls back to load_cached_models(provider) and returns cached entries
    - Does not write empty registries (no false positives)
  - Key resolution:
    - KeysRepository.get_api_key("openai")
    - Uses env (OPENAI_API_KEY) or config.yaml api.openai.api_key
Usage
1. Installing dependencies
- Base:
- pip install -r requirements.txt
- Dev (pytest, etc.):
- pip install -r requirements-dev.txt
2. API keys
- Preferred: .env at project root (project-local .env)
- Example: OPENAI_API_KEY=sk-... # pragma: allowlist secret (example prefix only)
- Alternative: config.yaml
- api: openai: api_key: "sk-..." # pragma: allowlist secret (example prefix only)
- Key resolution order:
- Env → config.yaml → None. See KeysRepository.
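The resolution order above (environment first, then config.yaml, then None) can be illustrated with a small function. This is a sketch under assumptions: the config is passed in as an already-parsed mapping, the `<PROVIDER>_API_KEY` naming is generalized from the OPENAI_API_KEY example, and resolve_api_key is a hypothetical name rather than the KeysRepository implementation.

```python
import os
from typing import Any, Mapping, Optional


def resolve_api_key(
    provider: str,
    config: Mapping[str, Any],
    env: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Resolve an API key: environment first, then parsed config, else None."""
    env = os.environ if env is None else env

    # 1. Environment variable, e.g. OPENAI_API_KEY (naming is an assumption).
    from_env = (env.get(f"{provider.upper()}_API_KEY") or "").strip()
    if from_env:
        return from_env

    # 2. Parsed config.yaml content shaped like {"api": {"openai": {"api_key": ...}}}.
    section = (config.get("api") or {}).get(provider) or {}
    from_config = (section.get("api_key") or "").strip()

    # 3. Nothing configured.
    return from_config or None
```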
3. Populate model registries (OpenAI example)
- Ensure key is loaded into the process environment (source .env).
- Run fetcher (no code changes required):
- python -m crux_providers.openai.get_openai_models
- Expected: "[openai] loaded <N> models", with the snapshot stored in the SQLite model registry (DB-first)
4. Instantiate providers via factory
- Example:
from crux_providers.base import ProviderFactory, ChatRequest, Message
provider = ProviderFactory.create("openai")
req = ChatRequest(
    model=provider.default_model() or "o3-mini",
    messages=[
        Message(role="system", content="You are a concise assistant."),
        Message(role="user", content="Say 'ok' once."),
    ],
    max_tokens=32,
    response_format="text",
)
resp = provider.chat(req)
print(resp.text)
5. Listing models
- Example (OpenAI):
- provider = ProviderFactory.create("openai")
- snap = provider.list_models(refresh=False)
- print(len(snap.models))
- To refresh from API:
- snap = provider.list_models(refresh=True) # requires OPENAI_API_KEY
6. Providers CLI (Benchmark harness)
The providers layer ships with a small CLI to assist local diagnostics and performance checks. After installing this project in editable mode, a console entry providers-cli is available.
- Benchmark latency for a provider/model with warmups and multiple measured runs:
providers-cli benchmark \
--provider openai \
--model o3-mini \
--prompt "Say 'ok' once." \
--runs 5 \
--warmups 2 \
--stream false
Flags
- --provider (required): Canonical provider id (e.g., openai, anthropic, deepseek, xai, gemini, ollama).
- --model (required): Model name to exercise.
- --prompt (required when --execute-style actions are used; for benchmark it's required): Prompt text to send.
- --runs (default 5): Number of measured runs; statistics are computed over these durations.
- --warmups (default 0): Non-measured warmup iterations executed before timing begins.
- --stream (default false): When true, uses streaming path; otherwise non-streaming chat.
Output
The command prints a small JSON object with latency statistics:
{
  "count": 5,
  "total_ms": 1234.5,
  "min_ms": 200.1,
  "max_ms": 260.3,
  "mean_ms": 246.9,
  "median_ms": 248.0,
  "p50_ms": 248.0,
  "p90_ms": 258.4,
  "p95_ms": 259.7,
  "p99_ms": 260.2
}
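For readers who want to reproduce these fields from raw timings, the sketch below computes the same keys from a list of per-run durations. The percentile method (linear interpolation between closest ranks) is an assumption; the CLI may use a different estimator, so small differences in p90/p95/p99 are possible.

```python
import statistics
from typing import Dict, Sequence


def percentile(sorted_values: Sequence[float], pct: float) -> float:
    """Linear-interpolation percentile over pre-sorted values (pct in 0..100)."""
    if not sorted_values:
        raise ValueError("at least one sample is required")
    rank = (len(sorted_values) - 1) * pct / 100.0
    lower = int(rank)
    upper = min(lower + 1, len(sorted_values) - 1)
    fraction = rank - lower
    return sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * fraction


def summarize(durations_ms: Sequence[float]) -> Dict[str, float]:
    """Build a latency summary with the keys shown in the CLI output above."""
    if not durations_ms:
        raise ValueError("at least one measured run is required")
    ordered = sorted(durations_ms)
    return {
        "count": len(ordered),
        "total_ms": sum(ordered),
        "min_ms": ordered[0],
        "max_ms": ordered[-1],
        "mean_ms": statistics.fmean(ordered),
        "median_ms": statistics.median(ordered),
        "p50_ms": percentile(ordered, 50),
        "p90_ms": percentile(ordered, 90),
        "p95_ms": percentile(ordered, 95),
        "p99_ms": percentile(ordered, 99),
    }
```

With only a handful of runs, the upper percentiles are dominated by the slowest sample, so increase --runs before reading much into p95 or p99.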
Notes
- Timeouts follow the centralized strategy and guard only the start/blocking phase; mid-stream cancellation is cooperative.
- Structured logs include provider, operation, and metrics fields to aid local inspection.
- External metrics emission (optional): set env PROVIDERS_METRICS_EXPORT=1 to enable best-effort emission. The default exporter is a no-op. Integrators can substitute an exporter by overriding crux_providers.base.metrics.get_default_exporter() to return a concrete implementation that implements emit_stream_metrics(StreamMetricsPayload).
External Metrics Emission
Overview
- Purpose: allow best-effort emission of streaming metrics to external systems without coupling core code to a specific backend.
- Toggle: controlled by env var PROVIDERS_METRICS_EXPORT (1/true to enable). Default is disabled.
Behavior
- On stream finalize, we emit a single metrics payload through the default exporter if the flag is enabled. Failures are swallowed and logged as a single normalized event with key metrics.export.error to avoid breaking application flow.
Default Exporter
- Path: crux_providers/base/metrics/exporter.py.
- get_default_exporter() returns a process-wide NoOpMetricsExporter by default.
- To integrate with Prometheus, OTEL, or another system, provide a concrete MetricsExporter and return it from get_default_exporter() (or monkeypatch in tests).
Payload Shape (StreamMetricsPayload)
- Fields:
  - provider: str
  - model: str
  - emitted_count: int
  - time_to_first_token_ms: Optional[float]
  - total_duration_ms: Optional[float]
  - tokens: Optional[Mapping[str, Any]] with keys prompt, completion, total when known
  - error: Optional[str]
  - extra: Optional[Mapping[str, Any]] (reserved for future use)
Failure Semantics
- Exporters must not raise; they should catch and swallow exceptions.
- The finalize helper also wraps emission in a try/except and logs a single metrics.export.error event containing failure_class when an exception occurs.
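The exporter contract and its failure semantics can be sketched with local stand-ins. The StreamMetricsPayload dataclass below only mirrors the documented fields, and InMemoryExporter and emit_safely are hypothetical names; the real types live in crux_providers/base/metrics/exporter.py and may be shaped differently.

```python
import logging
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional, Protocol

logger = logging.getLogger("providers.metrics")


@dataclass(frozen=True)
class StreamMetricsPayload:
    """Stand-in mirroring the documented payload fields."""

    provider: str
    model: str
    emitted_count: int
    time_to_first_token_ms: Optional[float] = None
    total_duration_ms: Optional[float] = None
    tokens: Optional[Mapping[str, Any]] = None
    error: Optional[str] = None
    extra: Optional[Mapping[str, Any]] = None


class MetricsExporter(Protocol):
    def emit_stream_metrics(self, payload: StreamMetricsPayload) -> None: ...


class InMemoryExporter:
    """Hypothetical exporter that collects payloads, e.g. for tests."""

    def __init__(self) -> None:
        self.payloads: List[StreamMetricsPayload] = []

    def emit_stream_metrics(self, payload: StreamMetricsPayload) -> None:
        self.payloads.append(payload)


def emit_safely(exporter: MetricsExporter, payload: StreamMetricsPayload) -> bool:
    """Emit once; on failure, log a single metrics.export.error event and continue."""
    try:
        exporter.emit_stream_metrics(payload)
        return True
    except Exception as exc:  # emission must never break the stream
        logger.warning(
            "metrics.export.error", extra={"failure_class": type(exc).__name__}
        )
        return False
```

A Prometheus or OTEL exporter would implement the same single method and translate the payload into counters or histograms.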
Testing
- Unit tests exist under crux_providers/tests/test_metrics_emission.py, validating both the happy-path emission and failure logging behavior.
- For reproducible results, ensure a stable network and consider pinning the model version when the provider supports it.
Testing
Location and policy
- All providers tests live under crux_providers/tests/... (no test scripts in other folders by architecture rules).
Unit smoke test (factory + OpenAI adapter)
- File: crux_providers/tests/test_provider_factory_smoke.py
- What it covers:
- ProviderFactory creates OpenAI adapter
- Adapter exposes provider_name and default_model
- list_models(refresh=True) populates registry when OPENAI_API_KEY is set; otherwise the test is skipped (no-network)
- Run:
- python -m pytest -q crux_providers/tests/test_provider_factory_smoke.py
Enforcement (CI + Local Pre-Push)
Server-side (recommended)
- In GitHub → Settings → Branches → Branch protection rules (for main):
  - Require a pull request before merging
  - Require status checks to pass before merging
  - Select the "Providers Fast Tests" check (job: "Run providers tests")
  - Optionally: restrict who can push (disallow direct pushes)
Local (optional, developer convenience)
- Pre-push gate using pre-commit to block pushes when providers tests fail:
pip install pre-commit
pre-commit install --hook-type pre-push
The repository includes .pre-commit-config.yaml with a hook that runs:
pytest -q crux_providers/tests
Bypass temporarily with git push --no-verify if necessary.
Design notes and guardrails
- Non-invasive adapter design:
- OpenAIProvider chat() wraps existing call_openai_with_retry()
- No changes to legacy OpenAI client semantics
- Offline/dev behavior:
- The model registry reads and writes exclusively to SQLite. When a live fetch fails, fetchers fall back to the cached snapshot from the database. No JSON cache files are used.
- Extensibility:
- Register new providers by adding entries to ProviderFactory._PROVIDERS and implementing a client.py adapter plus get_{provider}_models.py
- Keep provider-specific SDK imports in provider modules only
- Observability:
- ProviderMetadata attached to ChatResponse supports audits and test logging
- request_id and response_id fields (when available) are now propagated into ProviderMetadata and the streaming LogContext for correlation.
Metadata completeness and limitations
- context_length
- Policy: Keep null when not provided by the API. Do not fabricate limits.
- Enrichment flow: get_openai_models.run() fetches list and best-effort details (client.models.retrieve) and passes through numeric fields (e.g., input_token_limit/context_window) when present. Normalization in normalize_items() maps these into context_length if and only if explicitly available.
- capabilities
- Derived from SDK “modalities” when present and from conservative id heuristics in normalize_items() (e.g., mark reasoning/responses_api for o1/o3 families; vision for gpt-4o/omni; embeddings for text-embedding ids; JSON structured output flagged by default).
- updated_at
- Prefer explicit updated_at from the SDK. If absent, infer ISO date from the model id via _infer_updated_at_from_id(). As a last resort, convert a numeric “created” epoch (UTC) to YYYY-MM-DD.
- provenance
- The snapshot writer save_provider_models() persists fetched_via and metadata.source. The OpenAI fetcher records “api” and “openai_sdk_enriched” to indicate SDK-originated, enriched listings.
Note: If future endpoints expose explicit token limits per model id, the normalization path will automatically populate context_length without policy changes.
Extending to new providers (checklist)
- Create crux_providers/{provider}/client.py implementing LLMProvider (+ capabilities)
- Add get_{provider}_models.py fetcher module with run() entrypoint
- Add factory mapping in ProviderFactory
- Seed via DB helpers or let the fetcher persist snapshots to SQLite
- Add unit tests under crux_providers/tests/
FAQ
What is an "adapter" here?
An adapter is the thin implementation class that maps the generic interfaces
(LLMProvider, optional capability protocols) onto a specific provider SDK or
HTTP API. Each provider has its own client.py containing an adapter class
(OpenAIProvider, AnthropicProvider, etc.). They:
- Accept a normalized ChatRequest
- Translate to provider-specific SDK/HTTP params
- Execute the call (streaming or non-streaming)
- Normalize back into ChatResponse / stream events
They intentionally avoid embedding higher-level business logic, formatting, or retry policy (those will be layered via decorators / resilience modules). This keeps the providers layer self-contained and clean.
Q: How do I run a minimal end-to-end provider call without wiring the orchestrator?
- Use ProviderFactory + ChatRequest in a small script or a unit test as shown above. No orchestrator changes are required.
Q: What if my JSON registry file is empty and I get errors?
- The repository now tolerates empty/whitespace files and returns an empty snapshot with a warning, but live API refresh is recommended to populate real models: python -m crux_providers.openai.get_openai_models
Q: Where should tests go?
- Only under crux_providers/tests/. Test-like scripts in other folders are not allowed by architecture rules.
References
- Factory: ProviderFactory.create()
- OpenAI Adapter: OpenAIProvider
- DTOs: ChatRequest, ChatResponse, Message
- Registry: ModelRegistryRepository
- Key resolution: KeysRepository.get_api_key()
- OpenAI fetcher: get_openai_models.run()
- Batch refresh utility: python -m crux_providers.utils.refresh_all_models (aggregates all provider fetchers; no backward compatibility shim retained)
Observed Capability Caching (Authoritative)
Purpose
Persist runtime-observed provider capabilities and merge them into the model registry at read time. This removes guesswork and ensures behavior reflects real SDK/HTTP responses.
Data-first Policy
- Never infer by model name or regex.
- Persist true only on a confirmed success path; persist false only on explicit, authoritative rejection by the provider.
- Otherwise, leave capability unspecified (no override), allowing future observations to set it.
What we record (current)
- json_output: true when a structured (JSON) non-stream chat succeeds.
- structured_streaming: false when the provider explicitly does not support structured streaming for the attempted mode.
- streaming: true on the first emitted token during a streaming chat.
Persistence and Merge
- SQLite-backed storage (authoritative):
  - Table: observed_capabilities(provider, model_id, feature, value INTEGER, updated_at)
  - Initialized via crux_providers.service.db.init_db(); reused across providers layer.
  - Observations are upserted; writes are best-effort and never block core chat paths.
- Registry merge:
  - ModelRegistryRepository.list_models() loads the base snapshot JSON for the provider and overlays observed flags queried via load_observed() using merge_capabilities().
  - Observed true overrides unknown/absent. Observed false is only set when explicitly unsupported.
  - No speculative inference is applied during merge.
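The upsert-then-overlay flow can be sketched against an in-memory SQLite database. The column types, the composite primary key, and the function signatures below are assumptions made for illustration; load_observed and merge_capabilities reuse the documented names but are stand-ins, not the library's code.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Dict, Mapping, Optional

# Assumed schema: the primary key makes one row per (provider, model, feature).
SCHEMA = """
CREATE TABLE IF NOT EXISTS observed_capabilities (
    provider   TEXT NOT NULL,
    model_id   TEXT NOT NULL,
    feature    TEXT NOT NULL,
    value      INTEGER NOT NULL,
    updated_at TEXT NOT NULL,
    PRIMARY KEY (provider, model_id, feature)
)
"""


def record_observation(
    conn: sqlite3.Connection, provider: str, model_id: str, feature: str, value: bool
) -> None:
    """Upsert one observed capability flag."""
    conn.execute(
        "INSERT OR REPLACE INTO observed_capabilities VALUES (?, ?, ?, ?, ?)",
        (provider, model_id, feature, int(value), datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()


def load_observed(conn: sqlite3.Connection, provider: str, model_id: str) -> Dict[str, bool]:
    """Load observed flags for one model as {feature: bool}."""
    rows = conn.execute(
        "SELECT feature, value FROM observed_capabilities "
        "WHERE provider = ? AND model_id = ?",
        (provider, model_id),
    ).fetchall()
    return {feature: bool(value) for feature, value in rows}


def merge_capabilities(
    base: Mapping[str, Optional[bool]], observed: Mapping[str, bool]
) -> Dict[str, Optional[bool]]:
    """Overlay observed flags on the snapshot; unobserved features stay as-is."""
    merged = dict(base)
    merged.update(observed)
    return merged
```

Because only confirmed successes and explicit rejections are ever recorded, the overlay never introduces a guessed value.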
Edge Cases
- Lack of prior observation leaves capability as-is in the registry snapshot.
- Mid-stream errors after at least one delta still count as streaming=true (we observed a token).
- Start-phase timeouts do not produce observations.
Minimal Example (shape only)
{
  "gpt-4o-mini": {
    "json_output": true,
    "structured_streaming": false,
    "streaming": true
  }
}
Related Tests
- Helpers and persistence: crux_providers/base/tests/test_capabilities_helpers.py
- Registry merge correctness: crux_providers/base/tests/test_model_registry_observed_merge.py
Provider Wiring Notes
- OpenAI/Ollama: prior implementations already record observations.
- Anthropic/Gemini: parity added; structured streaming guards set structured_streaming=false on explicit rejection.
- Deepseek/XAI: covered via BaseOpenAIStyleProvider implementation.
Operational Guidance
- Observed data lives in SQLite; to reset, delete the providers DB or use a fresh temp DB in tests. The cache will repopulate as features are exercised.
- Do not hand-edit database rows in normal workflows; use provider calls that record observations.
Logging levels (providers layer)
The providers logging utility (crux_providers.base.logging.get_logger) honors the environment variable PROVIDERS_LOG_LEVEL to control verbosity without code changes.
- Accepted values (case-insensitive): DEBUG, INFO (default), WARNING, ERROR, CRITICAL.
- At DEBUG, the streaming adapter emits per-delta normalized log events in addition to the final summary event.
- At INFO and above, only the final "finalize" summary record is emitted by the adapter to avoid noise.
Example (temporary verbose logs):
export PROVIDERS_LOG_LEVEL=DEBUG
python -m pytest -q crux_providers/tests
# reset
unset PROVIDERS_LOG_LEVEL
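Reading the level from the environment can be sketched as below. The fallback to INFO for unrecognized values is an assumption, and resolve_log_level is a hypothetical helper; the actual get_logger may validate differently.

```python
import logging
import os
from typing import Mapping, Optional

_VALID_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}


def resolve_log_level(env: Optional[Mapping[str, str]] = None) -> int:
    """Map PROVIDERS_LOG_LEVEL (case-insensitive) to a logging level constant."""
    env = os.environ if env is None else env
    name = (env.get("PROVIDERS_LOG_LEVEL") or "INFO").strip().upper()
    if name not in _VALID_LEVELS:
        name = "INFO"  # assumption: unknown values fall back to the default
    return getattr(logging, name)
```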
Streaming (experimental)
Added a minimal streaming contract and OpenAI implementation.
Usage:
from crux_providers.base.logging import get_logger
from crux_providers.base.models import ChatRequest, Message
from crux_providers.base.streaming_adapter import StreamController, BaseStreamingAdapter
# Example pseudo-code integrating streaming with cancellation & IDs.
# LogContext, openai_stream_start, translate_openai_chunk, and build_retry_config
# are left unimported here; their module paths are not shown in this README.
adapter = BaseStreamingAdapter(
    ctx=LogContext(provider="openai", model="gpt-4o"),
    provider_name="openai",
    model="gpt-4o",
    starter=lambda: openai_stream_start(...),  # may return (stream, {"request_id": rid, "response_id": sid})
    translator=translate_openai_chunk,
    retry_config_factory=lambda phase: build_retry_config(phase=phase),
    logger=get_logger(),
)
controller = StreamController(adapter)
for evt in controller:
    if evt.finish:
        print("done", evt.error)
    elif evt.delta:
        print(evt.delta, end="")
# Cooperative cancellation from another thread / signal handler:
controller.cancel("user aborted")
Streaming Adapter Starter Return Shapes (Authoritative)
The BaseStreamingAdapter starter callable (starter) may return one of the following shapes:
- stream: iterable/generator yielding native SDK chunks.
- (stream, meta_dict): tuple where meta_dict is a mapping that may include:
  - request_id: Upstream correlation id (only set on context if absent)
  - response_id: Provider response id (propagated similarly)
- Mapping with mandatory key "stream" plus optional request_id / response_id keys: {"stream": stream, "request_id": "req-123", "response_id": "resp-456"}
Anything else (e.g., mapping missing stream, tuple second element not a mapping) triggers a ProviderError with code internal which is surfaced as a single terminal stream event whose error field begins with "internal:".
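The three accepted shapes can be normalized into a single (stream, meta) pair with a small validator. This sketch is illustrative only: StarterShapeError is a hypothetical stand-in for a ProviderError with code internal, and treating every tuple as the (stream, meta_dict) form is an assumption.

```python
from collections.abc import Mapping
from typing import Any, Dict, Iterable, Tuple


class StarterShapeError(Exception):
    """Hypothetical stand-in for a ProviderError with code ``internal``."""


def normalize_starter_result(result: Any) -> Tuple[Iterable[Any], Dict[str, Any]]:
    """Normalize a starter() return value into (stream, meta)."""
    if isinstance(result, Mapping):
        # Mapping form: "stream" is mandatory, correlation ids are optional.
        if "stream" not in result:
            raise StarterShapeError("starter() mapping missing 'stream' key")
        meta = {k: v for k, v in result.items() if k in ("request_id", "response_id")}
        return result["stream"], meta
    if isinstance(result, tuple):
        # Tuple form: exactly (stream, meta_dict).
        if len(result) != 2 or not isinstance(result[1], Mapping):
            raise StarterShapeError("starter() second element must be mapping")
        stream, meta = result
        return stream, dict(meta)
    # Bare form: the value itself is the stream.
    return result, {}
```

An adapter following the documented behavior would catch the error and surface it as one terminal event whose error field begins with "internal:".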
INTERNAL Guard Semantics
internal denotes invariant / contract violations that are not user-actionable (adapter misuse or unexpected starter shape). Tests covering this path:
- test_stream_internal_error.py (starter mapping missing stream) ensures we do not silently accept malformed returns.
Finalize / Terminal Event Behavior
Exactly one terminal ChatStreamEvent is emitted per stream (success or error). Success terminal events have:
- finish=True
- error=None
Error terminal events have:
- finish=True
- error string formatted as "<error_code>:<truncated_message>"
Metrics captured (internal, serialized in finalize path):
- emitted_count (# deltas)
- emitted (bool convenience flag: emitted_count > 0)
- time_to_first_token_ms
- total_duration_ms
Translator Isolation
Exceptions raised inside the translator are suppressed (delta skipped) to avoid destabilizing the stream. Mid-stream error tests inject exceptions via the underlying iterator instead of translator exceptions.
Cancellation
Cooperative cancellation uses a CancellationToken checkpoint before processing each native chunk and once after normal iteration. Cancelled streams emit a terminal error with error beginning "cancelled:" (distinct from timeout).
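The checkpoint pattern described above can be sketched with a threading.Event. The CancellationToken class and stream_with_cancellation generator below are simplified stand-ins written for this example; the real adapter also handles timeouts, retries, translation, and metrics.

```python
import threading
from typing import Iterable, Iterator, Optional, Tuple


class CancellationToken:
    """Minimal thread-safe cancellation flag (illustrative stand-in)."""

    def __init__(self) -> None:
        self._event = threading.Event()
        self.reason: Optional[str] = None

    def cancel(self, reason: str = "cancelled") -> None:
        self.reason = reason
        self._event.set()

    @property
    def cancelled(self) -> bool:
        return self._event.is_set()


def stream_with_cancellation(
    chunks: Iterable[str], token: CancellationToken
) -> Iterator[Tuple[str, Optional[str]]]:
    """Yield ("delta", text) items followed by exactly one ("finish", error)."""
    for chunk in chunks:
        if token.cancelled:  # checkpoint before processing each chunk
            yield ("finish", f"cancelled:{token.reason}")
            return
        yield ("delta", chunk)
    if token.cancelled:  # final checkpoint after normal iteration
        yield ("finish", f"cancelled:{token.reason}")
        return
    yield ("finish", None)
```

Because the token is only checked between chunks, a cancel request takes effect at the next chunk boundary rather than interrupting a blocked read.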
Troubleshooting Starter Issues
| Symptom | Likely Cause | Resolution |
|---|---|---|
| Immediate terminal event internal:starter() mapping missing 'stream' key | Starter returned mapping without stream | Return one of the accepted shapes with a stream key |
| internal:starter() second element must be mapping... | Tuple second element not a mapping | Provide a dict for meta or switch to mapping form |
| No deltas, success terminal only | Translator returned None for every chunk or source produced no chunks | Verify native stream yields chunks and translator logic |
| Missing request_id in logs | Starter did not supply meta or value already set on context | Provide meta dict with request_id if correlation needed |
Logging Normalization
Finalize helper emits structured JSON log containing (subset): event, provider, model, phase=finalize, error_code, emitted, emitted_count, time_to_first_token_ms, total_duration_ms. Internal tests (test_stream_contract_logging*.py) assert these invariants.
Finalize Log Examples
Success (tokens emitted):
{
  "event": "stream.adapter.end",
  "provider": "openai",
  "model": "gpt-4o",
  "phase": "finalize",
  "error_code": null,
  "emitted": true,
  "emitted_count": 27,
  "time_to_first_token_ms": 142,
  "total_duration_ms": 1280,
  "request_id": "req_abc123",
  "response_id": "resp_def456"
}
Internal error (starter shape violation — no deltas):
{
  "event": "stream.adapter.error",
  "provider": "openai",
  "model": "gpt-4o",
  "phase": "finalize",
  "error_code": "internal",
  "emitted": false,
  "emitted_count": 0,
  "time_to_first_token_ms": null,
  "total_duration_ms": 3,
  "error": "internal:starter() mapping missing 'stream' key"
}
Metrics Interpretation
- emitted_count: Number of deltas produced (proxy for token / chunk count; exact token count intentionally not inferred).
- emitted: Convenience boolean (emitted_count > 0).
- time_to_first_token_ms: Latency from adapter start until first successful delta emission. null when no deltas.
- total_duration_ms: Wall-clock duration from start until finalize (success or error). Always non-null.
- error_code: Canonical classification (internal, cancelled, timeout, provider-specific, or null on success).
Invariant checks (enforced in tests / recommended for monitors):
- If emitted is false then emitted_count == 0 and time_to_first_token_ms is null.
- If emitted is true then time_to_first_token_ms is a positive integer less than or equal to total_duration_ms.
- Exactly one finalize log per stream (idempotent terminal emission).
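A monitor or test helper could check the per-record invariants with a function like the one below. It is a sketch operating on a parsed finalize log record (a plain dict); check_finalize_record is a hypothetical name, and the "exactly one finalize log per stream" rule is not covered because it needs cross-record state.

```python
from typing import Any, List, Mapping


def check_finalize_record(record: Mapping[str, Any]) -> List[str]:
    """Return a list of invariant violations for one finalize log record."""
    problems: List[str] = []
    emitted = record.get("emitted")
    count = record.get("emitted_count")
    ttft = record.get("time_to_first_token_ms")
    total = record.get("total_duration_ms")

    if total is None:
        problems.append("total_duration_ms must be set on every terminal event")
    if emitted != (count is not None and count > 0):
        problems.append("emitted must equal (emitted_count > 0)")
    if not emitted and ttft is not None:
        problems.append("time_to_first_token_ms must be null when nothing was emitted")
    if emitted and ttft is None:
        problems.append("time_to_first_token_ms must be set when deltas were emitted")
    if ttft is not None and total is not None and ttft > total:
        problems.append("time_to_first_token_ms must not exceed total_duration_ms")
    return problems
```

Both finalize log examples shown earlier pass this check with an empty result.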
Operational Use:
- Alert if internal appears (indicates contract misuse) more than a minimal threshold.
- Track P95 time_to_first_token_ms to spot upstream latency regressions.
- Compare total_duration_ms - time_to_first_token_ms distribution to understand steady-state streaming throughput.
Token Accounting (Issue #59)
Current State
Providers (OpenAI, Anthropic) now attach a stable placeholder mapping under ProviderMetadata.extra['token_usage'] for every non-stream chat response:
{"prompt": null, "completion": null, "total": null}
Rationale:
- Downstream logging/metrics pipelines and forthcoming analytics dashboards require a consistent key even before real token counts are available.
- Avoids conditional logic sprinkled across callers; presence is guaranteed, values may be null until the SDK surfaces counts.
Future Plan:
- Upgrade OpenAI and Anthropic client integrations to capture real usage fields once the unified response objects are adopted (prompt/completion/total).
- Populate StreamMetrics token fields during streaming finalization when providers emit usage post-hoc.
- Remove placeholder comment blocks and update this section with examples of real token accounting output.
Contract Invariants:
- Keys prompt, completion, total always present.
- Values may be null but never omitted.
- Additional provider-specific usage metrics (e.g., cached / reasoning tokens) will be introduced under a sibling mapping (token_usage_details) rather than mutating the base keys.
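The invariants above amount to a fixed key set whose values default to null. A minimal sketch of a helper that enforces this shape follows; normalize_token_usage is a hypothetical name, not a function from the package.

```python
from typing import Any, Dict, Mapping, Optional

TOKEN_KEYS = ("prompt", "completion", "total")


def normalize_token_usage(
    raw: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Optional[int]]:
    """Return a mapping that always has exactly the three base keys.

    Missing counts become None; unrelated keys are dropped so that
    provider-specific details can live under a sibling mapping instead.
    """
    raw = raw or {}
    return {key: raw.get(key) for key in TOKEN_KEYS}
```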
Testing / Diagnostics:
- TODO markers in openai/client.py and anthropic/client.py note where a future test harness should assert presence & shape.
- Placeholder presence verified implicitly by integration paths relying on meta.extra['token_usage'] existence.
Security & Privacy:
- No sensitive values inserted; numbers only once implemented.
- Placeholder introduces zero PII risk.
This scaffolding completes Issue #59 for multi-provider parity; future work
will replace null values without changing the external contract.
Datetime Storage & Normalization Policy (SQLite Repos)
Rationale
Python 3.12+ deprecates (and in future versions may remove) implicit reliance on
sqlite3's legacy datetime adapter/converter pair. Implicit conversion created
latent ambiguity (naive vs. timezone-aware) and produced warnings when upgrading
interpreters. To enforce explicit, portable semantics the persistence layer now
stores all repository timestamps that originate from runtime events (metrics
and chat logs) as ISO8601 strings with a timezone designator.
Authoritative Rules
- Creation Timestamp Storage:
  - metrics.created_at & chat_logs.created_at are persisted as the result of datetime.isoformat().
  - If a supplied datetime is naive (tzinfo is None), it is treated as UTC and tzinfo=timezone.utc is injected before serialization (backward compatibility with legacy call sites that constructed naive values).
- Parsing on Read:
  - Centralized in _parse_created_at inside persistence/sqlite/repos.py, which returns an aware datetime in UTC.
  - On malformed / unexpected values (e.g., manual DB edits) the parser falls back to Unix epoch UTC (1970-01-01T00:00:00Z) to keep higher layers resilient. This fallback is intentional and test-covered.
- Engine Configuration:
  - engine.py no longer enables detect_types=sqlite3.PARSE_DECLTYPES to avoid the legacy adapter influencing future reads; rows are consumed as raw Python primitives (strings) and parsed explicitly.
- Scope:
  - Only the metrics and chat log repositories participate. Other tables (e.g., prefs.updated_at) that rely on CURRENT_TIMESTAMP may still return strings; they are parsed locally where needed.
- Timezone Uniformity:
  - All returned created_at values from repository public methods are guaranteed to be timezone-aware (UTC). Callers should never receive naive datetimes.
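The write-side rule can be sketched in a few lines. `to_storage_string` is a hypothetical helper written for illustration; the actual repository code may be structured differently.

```python
from datetime import datetime, timezone


def to_storage_string(value: datetime) -> str:
    """Serialize a datetime for storage, treating naive values as UTC."""
    if value.tzinfo is None:
        # Naive input: inject UTC before serialization.
        value = value.replace(tzinfo=timezone.utc)
    return value.isoformat()


print(to_storage_string(datetime(2025, 9, 16, 18, 22, 4)))
# 2025-09-16T18:22:04+00:00
print(to_storage_string(datetime(2025, 9, 16, 18, 22, 4, tzinfo=timezone.utc)))
# 2025-09-16T18:22:04+00:00
```

Both calls produce the same string, which is the point of the rule: a naive value and its UTC-aware equivalent are indistinguishable once stored.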
Failure Modes & Defensive Posture
| Scenario | Example Stored Value | Result from _parse_created_at | Notes |
|---|---|---|---|
| Proper aware ISO | 2025-09-16T18:22:04.123456+00:00 | Same object | Round-trip |
| Naive ISO | 2025-09-16T18:22:04.123456 | UTC-aware version | Backfill tz |
| Malformed | not-a-date | Epoch UTC | Logged only at higher layers if needed |
| Already datetime naive | datetime(2025,9,16,18,22,4) | UTC-aware | Defensive |
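The four scenarios in the table can be reproduced with a small stand-in parser. This is a sketch of the described behavior, not the source of _parse_created_at; the function name `parse_created_at_sketch` and its exact exception handling are assumptions.

```python
from datetime import datetime, timezone
from typing import Union

EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_created_at_sketch(raw: Union[str, datetime, None]) -> datetime:
    """Return an aware UTC datetime, falling back to the Unix epoch."""
    if isinstance(raw, datetime):
        parsed = raw
    else:
        try:
            parsed = datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            # Malformed string or non-string value: resilient fallback.
            return EPOCH_UTC
    if parsed.tzinfo is None:
        # Naive value: treat as UTC.
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


print(parse_created_at_sketch("2025-09-16T18:22:04.123456"))
# 2025-09-16 18:22:04.123456+00:00
print(parse_created_at_sketch("not-a-date"))
# 1970-01-01 00:00:00+00:00
```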
Testing Guarantees
Dedicated tests cover:
- Naive -> aware coercion on write & read (metrics repo normalization test)
- Parser handling of: aware ISO, naive ISO, malformed string, existing aware datetime, naive datetime object
Migration / Legacy Considerations
Existing deployments that previously relied on sqlite's implicit adapter may
contain naive text representations. The current policy already coerces those to
UTC on first read (no destructive migration required). The optional backfill
utility described below can rewrite legacy rows to explicit +00:00 suffixed strings;
until it is run, the parser path ensures consistent behavior.
Backfill Utility (Optional)
An idempotent maintenance helper now exists at
crux_providers/persistence/sqlite/backfill_timestamps.py to rewrite legacy naive
created_at values in-place so external inspection (e.g., manual SQLite shell,
ETL exports) reflects explicit UTC offsets. It supports a dry-run scan and an
--apply mode with confirmation. Example:
python -m crux_providers.persistence.sqlite.backfill_timestamps --db /path/to/providers.db # dry-run
python -m crux_providers.persistence.sqlite.backfill_timestamps --apply --yes # apply changes
If no legacy rows are found the script exits successfully without modifying data. Re-running after a successful apply reports zero legacy rows.
Exit Codes:
- 0 – Success (no legacy rows found in dry-run OR successful apply)
- 2 – User aborted at confirmation prompt
- 3 – Legacy rows detected during dry-run (no changes performed). This enables CI workflows to gate merges if historical naive timestamps remain.
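A CI job could map these exit codes to a decision along the following lines. The wrapper functions and the label strings are hypothetical; only the module path, the --db flag, and the exit codes 0/2/3 come from this section.

```python
import subprocess
import sys

MODULE = "crux_providers.persistence.sqlite.backfill_timestamps"

# Labels are illustrative; the numeric codes are the documented ones.
EXIT_MEANINGS = {
    0: "ok",
    2: "aborted",
    3: "legacy_rows_found",
}


def classify_exit(code: int) -> str:
    """Translate a backfill exit code into a short label."""
    return EXIT_MEANINGS.get(code, "unexpected")


def run_dry_run(db_path: str) -> str:
    """Run the dry-run scan against db_path and classify the result."""
    result = subprocess.run(
        [sys.executable, "-m", MODULE, "--db", db_path],
        check=False,  # a non-zero exit is an expected signal, not an error
    )
    return classify_exit(result.returncode)
```

A merge gate would then fail the build when `run_dry_run(...)` returns "legacy_rows_found" and treat "unexpected" as an infrastructure problem.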
JSON Output Mode
For automation or programmatic inspection pass --json. The script emits
one JSON object per invocation: a dry-run report by default, or an applied
report when run with --apply. Example:
python -m crux_providers.persistence.sqlite.backfill_timestamps --json
Sample output:
{
  "phase": "dry_run",
  "tables": [
    {"table": "metrics", "scanned": 2, "legacy_naive": 1, "updated": 0},
    {"table": "chat_logs", "scanned": 2, "legacy_naive": 1, "updated": 0}
  ],
  "totals": {"legacy_naive": 2, "updated": 0}
}
Apply with JSON (non-interactive) runs:
python -m crux_providers.persistence.sqlite.backfill_timestamps --apply --yes --json
This produces a single JSON document that embeds the original dry-run results:
{
  "phase": "applied",
  "tables": [
    {"table": "metrics", "scanned": 2, "legacy_naive": 1, "updated": 1},
    {"table": "chat_logs", "scanned": 2, "legacy_naive": 1, "updated": 1}
  ],
  "totals": {"legacy_naive": 2, "updated": 2},
  "dry_run": {
    "phase": "dry_run",
    "tables": [
      {"table": "metrics", "scanned": 2, "legacy_naive": 1, "updated": 0},
      {"table": "chat_logs", "scanned": 2, "legacy_naive": 1, "updated": 0}
    ],
    "totals": {"legacy_naive": 2, "updated": 0}
  }
}
When there are zero legacy rows, the applied document still includes per-table entries with zero counts for consistent parsing.
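A consumer of the JSON report might summarize it as sketched below. `summarize_report` is a hypothetical helper; it assumes only the report shape shown in the samples above and cross-checks that totals equal the per-table sums.

```python
import json

# Dry-run sample copied from the documentation above.
SAMPLE = """
{
  "phase": "dry_run",
  "tables": [
    {"table": "metrics", "scanned": 2, "legacy_naive": 1, "updated": 0},
    {"table": "chat_logs", "scanned": 2, "legacy_naive": 1, "updated": 0}
  ],
  "totals": {"legacy_naive": 2, "updated": 0}
}
"""


def summarize_report(text: str) -> dict:
    """Parse a backfill report and verify totals against per-table counts."""
    report = json.loads(text)
    legacy = sum(entry["legacy_naive"] for entry in report["tables"])
    updated = sum(entry["updated"] for entry in report["tables"])
    totals = report["totals"]
    if totals["legacy_naive"] != legacy or totals["updated"] != updated:
        raise ValueError("totals do not match per-table counts")
    return {"phase": report["phase"], "legacy_naive": legacy, "updated": updated}


print(summarize_report(SAMPLE))
# {'phase': 'dry_run', 'legacy_naive': 2, 'updated': 0}
```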
Caller Guidance
- Do not attempt custom parsing of created_at; rely on repository return objects.
- When constructing new repository entries, prefer creating timezone-aware datetimes via datetime.now(timezone.utc).
- Treat any epoch UTC value in telemetry dashboards as a signal of malformed or manually edited data.
Future Enhancements
Potential future evolution includes surfacing a structured warning counter for malformed timestamps to aid operational visibility. The in-place rewrite of legacy naive rows is already covered by the backfill utility described above.
JSON Schema Exposure (--print-schema)
For consumers that wish to lock the JSON contract in CI, the backfill utility exposes its output schema (Draft 2020-12) via:
python -m crux_providers.persistence.sqlite.backfill_timestamps --print-schema > backfill_report.schema.json
Characteristics:
- Stable top-level keys: phase, tables, totals; applied payloads embed prior dry-run under dry_run.
- phase is either dry_run or applied.
- tables is an array of per-table objects: {table, scanned, legacy_naive, updated}.
- totals aggregates legacy_naive and updated across tables.
- Applied payload adds dry_run object capturing the original scan (idempotent audit trail).
- Schema is embedded inline (_SCHEMA constant) to avoid drift between code and documentation.
Recommended CI Pattern:
- Cache schema (commit the artifact) or regenerate each run.
- Execute dry-run with --json; parse and ensure only expected keys are present.
- Treat unexpected structural changes as a breaking change requiring review/version bump.
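The "only expected keys" check can be done with the standard library alone, as in this sketch. `structural_problems` is a hypothetical helper and the key sets are transcribed from the characteristics listed above; validating against the schema emitted by --print-schema with a JSON Schema library would be the stricter alternative.

```python
from typing import Any, Dict, List

TOP_LEVEL_KEYS = {"phase", "tables", "totals"}
TABLE_KEYS = {"table", "scanned", "legacy_naive", "updated"}
TOTALS_KEYS = {"legacy_naive", "updated"}


def structural_problems(report: Dict[str, Any]) -> List[str]:
    """Return the locations whose key set differs from the documented shape."""
    problems: List[str] = []
    allowed = set(TOP_LEVEL_KEYS)
    if report.get("phase") == "applied":
        # Applied payloads additionally embed the original dry-run scan.
        allowed.add("dry_run")
    if set(report) != allowed:
        problems.append("top-level")
    if report.get("phase") not in ("dry_run", "applied"):
        problems.append("phase")
    for index, entry in enumerate(report.get("tables", [])):
        if set(entry) != TABLE_KEYS:
            problems.append(f"tables[{index}]")
    if set(report.get("totals", {})) != TOTALS_KEYS:
        problems.append("totals")
    return problems


report = {
    "phase": "dry_run",
    "tables": [{"table": "metrics", "scanned": 2, "legacy_naive": 1, "updated": 0}],
    "totals": {"legacy_naive": 1, "updated": 0},
}
print(structural_problems(report))  # []
```

A CI step would fail when the returned list is non-empty.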
Example inspection of per-table item shape with jq:
python -m crux_providers.persistence.sqlite.backfill_timestamps --print-schema | jq '.properties.tables.items.properties'
Exit Behavior:
- --print-schema always exits 0 (no DB access required).
- All other exit codes (0/2/3) remain unchanged.
Error Code Reference (Streaming Finalize)
| error_code | When Emitted | Example Error String Prefix | Remediation / Notes |
|---|---|---|---|
| (null) | Successful stream completion | (no error field) | Normal termination. |
| internal | Adapter / contract violation (starter shape, meta misuse) | internal:starter() mapping missing 'stream' key | Inspect adapter implementation; should be rare. |
| cancelled | Cooperative cancellation via CancellationToken | cancelled:user aborted | Expected when user aborts; not a provider fault. |
| timeout | Start-phase timeout (no tokens) | timeout:operation timed out | Investigate upstream latency; adjust timeout config if justified. |
| transient | Transient network/IO error surfaced after retries exhausted | transient:... | Usually retryable; monitor frequency. |
| rate_limit | Provider rate limiting after retries | rate_limit:... | Consider backoff tuning or quota increase. |
| <provider_specific> | Classified provider error code (future / SDK mapped) | bad_request:... | Handle per provider semantics. |
Notes:
- cancelled is distinct from timeout; cancellation is user/system initiated, timeout is passive elapsed start-phase limit.
- Mid-stream failures after at least one delta will still carry the classified error code (timeout will not occur mid-stream under current policy; only the start phase is timeout guarded).
- Providers may enrich token usage placeholders in future; error code taxonomy remains stable for log aggregation.
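For log aggregation, the error string prefix can be separated from its detail as sketched here. The `<error_code>:<detail>` layout is inferred from the example prefixes in the table, and `split_error` is a hypothetical helper, not a crux_providers function.

```python
from typing import Optional, Tuple


def split_error(error: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """Split an error string into (error_code, detail).

    Returns (None, None) when there is no error, and (None, error) when
    the string carries no "code:" prefix.
    """
    if not error:
        return None, None
    code, separator, detail = error.partition(":")
    if not separator:
        return None, error
    return code, detail


print(split_error("timeout:operation timed out"))
# ('timeout', 'operation timed out')
print(split_error(None))
# (None, None)
```

Only the first colon is treated as the separator, so details that contain colons themselves are preserved intact.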