Reliability layer for AI/LLM streaming with retry, guardrails, and observability
L0 - Deterministic Streaming Execution Substrate (DSES) for AI
The missing reliability and observability layer for all AI streams.
LLMs produce high-value reasoning over a low-integrity transport layer. Streams stall, drop tokens, reorder events, violate timing guarantees, and expose no deterministic contract.
This breaks retries. It breaks supervision. It breaks reproducibility. It makes reliable AI systems impossible to build on top of raw provider streams.
L0 is the deterministic execution substrate that fixes the transport - with guardrails designed specifically for the streaming layer: stream-neutral, pattern-based, loop-safe, and timing-aware.
The result: production-grade, integrity-preserving, deterministic AI streams you can finally build real systems on.
It works with OpenAI and LiteLLM (100+ providers including Anthropic, Cohere, Bedrock, Vertex, Gemini). Supports tool calls and provides full observability.
pip install ai2070-l0
Also available in TypeScript: @ai-2070/l0 (npm install @ai2070/l0), a native implementation with full lifecycle and event signature parity.
Production-grade reliability. Just pass your stream. L0 will take it from here.
L0 includes 1,800+ tests covering all major reliability features.
   Any AI Stream                    L0 Layer                       Your App
 ─────────────────  ┌──────────────────────────────────────┐      ─────────────
                    │                                      │
  OpenAI / LiteLLM  │   Retry · Fallback · Resume          │        Reliable
  Custom Streams ──▶│   Guardrails · Timeouts · Consensus  │─────▶ Output
                    │   Full Observability                 │
                    │                                      │
                    └──────────────────────────────────────┘
 ─────────────────                                                ─────────────
                          L0 = Token-Level Reliability
Features
| Feature | Description |
|---|---|
| 🔁 Smart Retries | Model-aware retries with fixed-jitter backoff. Automatic retries for zero-token output, network stalls, and provider overloads. |
| 🌐 Network Protection | Automatic recovery from dropped streams, slow responses, 429/503 load shedding, DNS errors, and partial chunks. |
| 🔀 Model Fallbacks | Automatically fall back to secondary models (e.g., GPT-4o → GPT-4o-mini → Claude) with full retry logic. |
| 💥 Zero-Token/Stall Protection | Detects when a model produces nothing or stalls mid-stream. Automatically retries or switches to fallbacks. |
| 📍 Last-Known-Good Token Resumption | continue_from_last_good_token resumes from the last checkpoint on timeout or failure. No lost tokens. |
| 🧠 Drift Detection | Detects repetition, stalls, and format drift before corruption propagates. |
| 🧱 Structured Output | Guaranteed-valid JSON with Pydantic. Auto-corrects missing braces, commas, and markdown fences. |
| 📋 Pydantic Validation Models | All L0 types available as Pydantic models for runtime validation, JSON serialization, and schema generation via l0.pydantic. |
| 🩹 JSON Auto-Healing | Automatic correction of truncated or malformed JSON (missing braces, brackets, quotes), and repair of broken Markdown code fences. |
| 🛡️ Guardrails | JSON, Markdown, and pattern validation with fast streaming checks. Delta-only checks run sync; full-content scans defer to async. |
| ⚡ Race: Fastest-Model Wins | Run multiple models or providers in parallel and return the fastest valid stream. Ideal for ultra-low-latency chat. |
| 🌿 Parallel: Fan-Out / Fan-In | Start multiple streams simultaneously and collect structured or summarized results. Perfect for agent-style multi-model workflows. |
| 🧩 Consensus: Agreement Across Models | Combine multiple model outputs using unanimous, majority, or best-match consensus. Guarantees high-confidence generation. |
| 🔔 Lifecycle Callbacks | on_start, on_complete, on_error, on_event, on_violation, on_retry, on_fallback, on_tool_call - full observability into every stream phase. |
| 📡 Streaming-First Runtime | Thin, deterministic wrapper with unified event types (token, error, complete) for easy UIs. |
| 📼 Central Event Bus | Full observability into every stream phase via on_event callback with 25+ structured event types. |
| 🔌 Custom Adapters (BYOA) | Bring your own adapter for any LLM provider. Built-in adapters for OpenAI and LiteLLM. |
| 📦 Raw Chunk Access | Access original provider chunks (e.g., OpenAI's ChatCompletionChunk) via stream.raw() for provider-specific processing. |
| ⚡ Pure asyncio | No compatibility layers (no anyio/trio). Native Python async for full determinism and performance. |
| 🔧 Own Retry Logic | No external dependencies (no tenacity). L0 controls all retry behavior for predictable execution. |
| 📝 Type-Safe | Full type hints with py.typed marker. Passes mypy strict mode. |
| 📦 Minimal Dependencies | Only httpx, pydantic, orjson, typing-extensions, uuid6. No heavy abstractions. |
| 🚀 Nvidia Blackwell-Ready | Optimized for 1000+ tokens/s streaming. Ready for next-gen GPU inference speeds. |
| 🧪 Battle-Tested | 1,800+ unit tests and 100+ integration tests validating real streaming, retries, and advanced behavior. |
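To make the Smart Retries and Zero-Token Protection rows concrete, here is a minimal, stdlib-only sketch of the general technique. It is not L0's implementation: the names `collect_with_retry`, `ZeroTokenError`, and `make_flaky_stream` are hypothetical, and the sketch uses exponential backoff with random jitter, which may differ from the "fixed-jitter" strategy named in the table.

```python
import asyncio
import random
from typing import AsyncIterator, Callable, Optional


class ZeroTokenError(Exception):
    """Raised when a stream finishes without producing any text."""


async def collect_with_retry(
    stream_factory: Callable[[], AsyncIterator[str]],
    attempts: int = 3,
    base_delay: float = 0.01,
    max_delay: float = 1.0,
) -> str:
    """Consume a token stream, retrying when it fails or yields nothing."""
    last_error: Optional[Exception] = None
    for attempt in range(attempts):
        try:
            # A fresh stream is created per attempt; that is why a factory is passed
            text = "".join([tok async for tok in stream_factory()])
            if not text.strip():
                raise ZeroTokenError("stream produced no tokens")
            return text
        except (ZeroTokenError, ConnectionError, asyncio.TimeoutError) as exc:
            last_error = exc
            if attempt == attempts - 1:
                break
            # Exponential backoff capped at max_delay, plus random jitter
            delay = min(max_delay, base_delay * 2**attempt)
            await asyncio.sleep(delay + random.uniform(0, delay))
    raise RuntimeError(f"all {attempts} attempts failed") from last_error


def make_flaky_stream(fail_times: int) -> Callable[[], AsyncIterator[str]]:
    """Hypothetical stand-in for a provider: empty output for the first N calls."""
    state = {"calls": 0}

    async def stream() -> AsyncIterator[str]:
        state["calls"] += 1
        if state["calls"] <= fail_times:
            return  # finish without yielding: a zero-token response
        for tok in ("Hello", " ", "world"):
            yield tok

    return stream
```

Calling `asyncio.run(collect_with_retry(make_flaky_stream(2)))` goes through two empty responses before the third attempt succeeds. Passing a factory rather than a live stream is what makes a retry possible at all, which is presumably why the L0 examples below take `stream=lambda: ...`.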
Know what you're doing? Skip the tutorial
Quick Start
Wrap Your Client (Recommended)
import asyncio
from openai import AsyncOpenAI
import l0

async def main():
    # Wrap the client once - L0 reliability is automatic
    client = l0.wrap(AsyncOpenAI())

    # Use normally - no lambdas needed!
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
    )

    # Stream with L0 events
    async for event in response:
        if event.is_token:
            print(event.text, end="", flush=True)

    # Or read all at once
    text = await response.read()

asyncio.run(main())
With Configuration
import l0
from openai import AsyncOpenAI

# Configure once, use everywhere
client = l0.wrap(
    AsyncOpenAI(),
    guardrails=l0.Guardrails.recommended(),
    retry=l0.Retry(attempts=5),
    timeout=l0.Timeout(initial_token=10000, inter_token=30000),
    continue_from_last_good_token=True,  # Resume from checkpoint on failure
)

response = await client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)
See Also: API.md for all options, ADVANCED.md for full examples
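The configuration above sets separate `initial_token` and `inter_token` timeouts. As a rough illustration of what such a split means, the stdlib-only sketch below enforces one deadline for the first token and another for every later token. It is an assumption-laden sketch rather than L0's code: `with_token_timeouts`, `StreamTimeout`, `slow_stream`, and `collect` are hypothetical names, and the sketch takes seconds, whereas the units of the values passed to `l0.Timeout` are not stated here.

```python
import asyncio
from typing import AsyncIterator, List


class StreamTimeout(Exception):
    """Raised when the first token or a later token arrives too late."""


async def with_token_timeouts(
    stream: AsyncIterator[str],
    initial_token: float,
    inter_token: float,
) -> AsyncIterator[str]:
    """Yield tokens from `stream`, enforcing first-token and next-token deadlines (seconds)."""
    iterator = stream.__aiter__()
    deadline, phase = initial_token, "initial"  # the first token gets its own budget
    while True:
        try:
            token = await asyncio.wait_for(iterator.__anext__(), timeout=deadline)
        except StopAsyncIteration:
            return  # the underlying stream ended normally
        except asyncio.TimeoutError:
            raise StreamTimeout(f"{phase} token timeout after {deadline}s") from None
        yield token
        deadline, phase = inter_token, "inter"  # later tokens share the other budget


async def slow_stream(delays: List[float]) -> AsyncIterator[str]:
    """Hypothetical provider stream that waits `delays[i]` seconds before token i."""
    for i, delay in enumerate(delays):
        await asyncio.sleep(delay)
        yield f"t{i}"


async def collect(delays: List[float], initial_token: float, inter_token: float) -> List[str]:
    """Gather tokens, appending a marker if a timeout cuts the stream short."""
    out: List[str] = []
    try:
        async for tok in with_token_timeouts(slow_stream(delays), initial_token, inter_token):
            out.append(tok)
    except StreamTimeout as exc:
        out.append(f"<{exc}>")
    return out
```

A generous first-token budget with a tighter inter-token budget (or the reverse, as in the configuration above) lets slow-starting models through while still catching mid-stream stalls.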
With LiteLLM (100+ Providers)
import asyncio
import litellm
import l0

async def main():
    # For LiteLLM, use l0.run() with a factory function
    result = await l0.run(
        stream=lambda: litellm.acompletion(
            model="anthropic/claude-3-haiku-20240307",
            messages=[{"role": "user", "content": "Hello!"}],
            stream=True,
        ),
        guardrails=l0.Guardrails.recommended(),
    )

    # Read full text
    text = await result.read()
    print(text)

asyncio.run(main())
Structured Output with Pydantic
from pydantic import BaseModel
import l0

class UserProfile(BaseModel):
    name: str
    age: int
    occupation: str

result = await l0.structured(
    schema=UserProfile,
    stream=lambda: client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Generate a fictional person as JSON"}],
        stream=True,
    ),
    auto_correct=True,  # Fix trailing commas, missing braces, markdown fences
)

print(result.name)  # "Alice"
print(result.age)   # 32
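The `auto_correct` comment mentions fixing trailing commas, missing braces, and markdown fences. The sketch below shows one simple way such a repair pass could work, using only the standard library. It is a best-effort illustration under stated assumptions, not L0's algorithm: `heal_json` and `parse_profile` are hypothetical names, a dataclass stands in for the Pydantic model, and inputs truncated in the middle of a key or number are not recovered.

```python
import json
import re
from dataclasses import dataclass
from typing import List

FENCE = "`" * 3  # a Markdown code fence marker


def heal_json(raw: str) -> str:
    """Best-effort repair of fenced or truncated JSON text."""
    text = raw.strip()
    # Drop a surrounding Markdown code fence, with or without a language tag
    text = re.sub(rf"^{FENCE}[a-zA-Z]*\s*", "", text)
    text = re.sub(rf"\s*{FENCE}$", "", text)

    # Walk the text, tracking string state and which brackets are still open
    stack: List[str] = []
    in_string = False
    escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            stack.append("}" if ch == "{" else "]")
        elif ch in "}]" and stack:
            stack.pop()

    if in_string:
        text += '"'  # close an unterminated string
    text = text.rstrip().rstrip(",")  # drop a comma left dangling by truncation
    text += "".join(reversed(stack))  # close open brackets, innermost first
    # Remove trailing commas before a closing bracket (naive: ignores string context)
    return re.sub(r",\s*([}\]])", r"\1", text)


@dataclass
class UserProfile:
    name: str
    age: int
    occupation: str


def parse_profile(raw: str) -> UserProfile:
    """Heal the text, parse it, and build the typed record."""
    return UserProfile(**json.loads(heal_json(raw)))
```

Repairing first and validating against a schema second keeps the two concerns separate: a healed document can still fail validation, and that failure is a useful signal for a retry.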
Lifecycle Events
import l0

result = await l0.run(
    stream=lambda: client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    ),
    on_stream_event=lambda event: (
        print(event.text, end="") if event.is_token else
        print(f"\nError: {event.error}") if event.is_error else
        print("\nDone!") if event.is_complete else None
    ),
)
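The callback above branches on `is_token`, `is_error`, and `is_complete`. To illustrate the idea of a small, unified event vocabulary (token, error, complete), here is a stdlib-only sketch that turns any async token stream into such events. The `Event` class, `emit_events`, and `demo_stream` are hypothetical stand-ins written for this example; L0's real event objects carry more information.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Callable, List, Optional


@dataclass(frozen=True)
class Event:
    type: str  # "token", "error", or "complete"
    text: str = ""
    error: Optional[Exception] = None

    @property
    def is_token(self) -> bool:
        return self.type == "token"

    @property
    def is_error(self) -> bool:
        return self.type == "error"

    @property
    def is_complete(self) -> bool:
        return self.type == "complete"


async def emit_events(
    stream: AsyncIterator[str],
    on_event: Callable[[Event], None],
) -> str:
    """Send every token, then exactly one terminal event, to `on_event`."""
    parts: List[str] = []
    try:
        async for token in stream:
            parts.append(token)
            on_event(Event("token", text=token))
    except Exception as exc:
        # Failures become data instead of escaping to the caller
        on_event(Event("error", error=exc))
    else:
        on_event(Event("complete"))
    return "".join(parts)


async def demo_stream() -> AsyncIterator[str]:
    """Hypothetical two-token stream used for the example."""
    for tok in ("Hi", "!"):
        yield tok
```

Because every stream ends with exactly one `error` or `complete` event, a UI can treat the terminal event as its single signal to stop rendering.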
Fallback Models & Providers
import l0
import litellm

result = await l0.run(
    # Primary model
    stream=lambda: client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    ),
    # Fallbacks: tried in order if primary fails
    fallbacks=[
        lambda: client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        ),
        lambda: litellm.acompletion(
            model="anthropic/claude-3-haiku-20240307",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        ),
    ],
    on_fallback=lambda index, reason: print(f"Switched to fallback {index}"),
)
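The fallback behavior above ("tried in order if primary fails") can be sketched in a few lines of plain asyncio. This is an illustrative sketch, not L0's implementation: `run_with_fallbacks`, `broken`, and `healthy` are hypothetical, the sketch numbers the first fallback as index 1 (an assumption about what `index` means), and it discards partial output from a failed stream instead of resuming from it.

```python
import asyncio
from typing import AsyncIterator, Callable, List, Optional, Sequence

StreamFactory = Callable[[], AsyncIterator[str]]


async def run_with_fallbacks(
    primary: StreamFactory,
    fallbacks: Sequence[StreamFactory] = (),
    on_fallback: Optional[Callable[[int, str], None]] = None,
) -> str:
    """Try the primary stream, then each fallback in order, until one completes."""
    errors: List[str] = []
    for index, factory in enumerate([primary, *fallbacks]):
        if index > 0 and on_fallback is not None:
            # Report which fallback is starting and why the previous stream failed
            on_fallback(index, errors[-1])
        try:
            return "".join([tok async for tok in factory()])
        except Exception as exc:  # a real layer would classify errors first
            errors.append(f"{type(exc).__name__}: {exc}")
    raise RuntimeError("all models failed: " + "; ".join(errors))


async def broken() -> AsyncIterator[str]:
    """Hypothetical primary model that drops the connection mid-stream."""
    yield "par"
    raise ConnectionError("stream dropped")


async def healthy() -> AsyncIterator[str]:
    """Hypothetical fallback model that completes normally."""
    for tok in ("Hello", ", ", "world"):
        yield tok
```

With `primary=broken` and `fallbacks=[healthy]`, the partial "par" is thrown away and the caller receives only the fallback's text, so output from two different models is never spliced together.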
Parallel Execution
import l0

prompts = ["Name a fruit", "Name a color", "Name an animal"]

result = await l0.parallel(
    tasks=[
        lambda p=p: l0.run(
            stream=lambda: client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": p}],
                stream=True,
            ),
        )
        for p in prompts
    ],
    concurrency=3,
)

for prompt, stream in zip(prompts, result.results):
    text = await stream.read()
    print(f"{prompt}: {text.strip()}")
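Two details of the example above are worth unpacking: the `concurrency` limit, and the `lambda p=p:` idiom, which binds each prompt at definition time so that the lambdas do not all end up seeing the last value of `p`. The stdlib-only sketch below shows one common way to cap in-flight work with a semaphore while preserving input order. `run_parallel`, `fake_model_call`, and `stats` are hypothetical names for this illustration, not part of L0.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Sequence, TypeVar

T = TypeVar("T")


async def run_parallel(
    tasks: Sequence[Callable[[], Awaitable[T]]],
    concurrency: int,
) -> List[T]:
    """Run task factories with at most `concurrency` in flight; keep input order."""
    semaphore = asyncio.Semaphore(concurrency)

    async def guarded(factory: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:  # wait here until a slot is free
            return await factory()

    # gather() returns results in the order the awaitables were passed in
    return list(await asyncio.gather(*(guarded(f) for f in tasks)))


# Bookkeeping so the example can show the limit being respected
stats: Dict[str, int] = {"in_flight": 0, "peak": 0}


async def fake_model_call(prompt: str) -> str:
    """Hypothetical stand-in for a model request."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)
    stats["in_flight"] -= 1
    return prompt.upper()
```

Running four tasks built as `[lambda p=p: fake_model_call(p) for p in prompts]` with `concurrency=2` never has more than two calls active at once, and the results come back in prompt order regardless of which call finishes first.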
Pydantic Validation Models
L0 provides Pydantic models for all its types, enabling runtime validation, JSON serialization, and schema generation:
from l0.pydantic import StateModel, RetryModel, DriftResultModel
# Validate external data
state = StateModel(content="Hello", token_count=5, completed=True)
# Serialize to JSON
json_data = state.model_dump_json()
# Generate JSON schema for documentation or APIs
schema = StateModel.model_json_schema()
All L0 types have corresponding Pydantic models: StateModel, RetryModel, TimeoutModel, ConsensusResultModel, DriftResultModel, MetricsSnapshotModel, and more.
Philosophy
- No magic - Everything is explicit and predictable
- Streaming-first - Built for real-time token delivery
- Signals, not rewrites - Guardrails detect issues, don't modify output
- Model-agnostic - Works with any provider via adapters
- Pure asyncio - No compatibility layers, native Python async
- Own retry logic - No tenacity, full control over behavior
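The "Signals, not rewrites" principle can be shown with a small pattern guardrail that records violations while passing every delta through untouched. This is a minimal sketch assuming a regex-based rule: `PatternGuardrail` and `Violation` are hypothetical names, and the fixed 64-character overlap is an arbitrary choice for catching matches that straddle two deltas.

```python
import re
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class Violation:
    rule: str
    message: str
    position: int  # character offset in the accumulated output


@dataclass
class PatternGuardrail:
    """Flags a forbidden pattern in streamed text; never edits the text."""

    rule: str
    pattern: "re.Pattern[str]"
    overlap: int = 64  # how far back to re-scan for matches split across deltas
    content: str = ""
    violations: List[Violation] = field(default_factory=list)

    def feed(self, delta: str) -> str:
        """Record any new matches, then return the delta unchanged."""
        start = max(0, len(self.content) - self.overlap)
        self.content += delta
        seen = {v.position for v in self.violations}
        for match in self.pattern.finditer(self.content, start):
            if match.start() not in seen:
                self.violations.append(
                    Violation(self.rule, f"matched {match.group()!r}", match.start())
                )
        return delta  # a signal was raised, but the output is not rewritten
```

Feeding the deltas `"As an"`, `" AI, I"`, `" think"` through a guardrail built with `re.compile(r"As an AI")` yields the original text plus a single violation at position 0. What to do with that signal (retry, fall back, or ignore) is left to the caller.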
Performance
Benchmarks on Apple M1 Max, Python 3.13, zero-delay mock streams (2000 tokens):
| Scenario | Tokens/s | Avg Duration | TTFT |
|---|---|---|---|
| Baseline (raw streaming) | 1,406,390 | 1.42 ms | 0.02 ms |
| L0 Core (no features) | 596,086 | 3.36 ms | 0.10 ms |
| L0 + JSON Guardrail | 557,550 | 3.59 ms | 0.09 ms |
| L0 + All Guardrails | 547,991 | 3.65 ms | 0.09 ms |
| L0 + Drift Detection | 114,935 | 17.41 ms | 0.10 ms |
| L0 Full Stack | 114,895 | 17.43 ms | 0.10 ms |
Full stack = JSON + Markdown + zero-output guardrails + drift detection + checkpointing. See BENCHMARKS.md for details.
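One way to read the table is as per-token overhead relative to the raw baseline. The short calculation below derives it from the reported average durations and the 2000-token stream length. The 1000 tokens/s comparison rate is taken from the feature table's "1000+ tokens/s" figure, and the helper names are hypothetical. These are mock-stream numbers, so treat the result as indicative only.

```python
TOKENS = 2000  # tokens per benchmark stream, as stated above the table

# Average durations in milliseconds, copied from the table
avg_duration_ms = {
    "Baseline (raw streaming)": 1.42,
    "L0 Core (no features)": 3.36,
    "L0 + All Guardrails": 3.65,
    "L0 Full Stack": 17.43,
}


def per_token_us(duration_ms: float) -> float:
    """Average time spent per token, in microseconds."""
    return duration_ms * 1000 / TOKENS


def overhead_us(scenario: str) -> float:
    """Extra microseconds per token compared with raw streaming."""
    baseline = per_token_us(avg_duration_ms["Baseline (raw streaming)"])
    return per_token_us(avg_duration_ms[scenario]) - baseline


def share_of_budget(scenario: str, tokens_per_second: float = 1000.0) -> float:
    """Overhead as a fraction of the gap between tokens at a given stream rate."""
    gap_us = 1_000_000 / tokens_per_second
    return overhead_us(scenario) / gap_us


for name in avg_duration_ms:
    print(f"{name}: +{overhead_us(name):.2f} us/token")
```

By this arithmetic the core layer adds roughly 1 µs per token and the full stack roughly 8 µs per token, which is under 1% of the 1000 µs between tokens in a 1000 tokens/s stream.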
Documentation
| Guide | Description |
|---|---|
| QUICKSTART.md | Get started in 5 minutes |
| ADVANCED.md | Advanced usage and full examples |
| API.md | Complete API reference |
| GUARDRAILS.md | Guardrails and validation |
| STRUCTURED_OUTPUT.md | Structured output guide |
| CONSENSUS.md | Multi-generation consensus |
| DETERMINISTIC_LIFECYCLE.md | Lifecycle specification and events |
| NETWORK_ERRORS.md | Network error handling |
| ERROR_HANDLING.md | Error handling guide |
| CUSTOM_ADAPTERS.md | Build your own adapters |
| DOCUMENT_WINDOWS.md | Chunking and processing long documents |
| EVENT_SOURCING.md | Record/replay, audit trails |
| MONITORING.md | OpenTelemetry and Sentry integrations |
| FORMATTING.md | Context, memory, output, and tool formatting |
| PARALLEL_OPERATIONS.md | Parallel, race, batch, and pool operations |
| MULTIMODAL.md | Image, audio, video, and multimodal adapters |
| PERFORMANCE.md | Performance tuning guide |
Installation
# Basic installation
pip install ai2070-l0
# With OpenAI support (quotes keep shells such as zsh from expanding the brackets)
pip install "ai2070-l0[openai]"
# With LiteLLM (100+ providers)
pip install "ai2070-l0[litellm]"
# With OpenTelemetry
pip install "ai2070-l0[otel]"
# With Sentry
pip install "ai2070-l0[sentry]"
# Development (dev is a dependency-group, not a pip extra)
uv sync --group dev
Or with uv:
uv add ai2070-l0
uv add ai2070-l0 --extra openai
uv add ai2070-l0 --extra litellm
Dependencies
| Package | Purpose |
|---|---|
| httpx | HTTP client |
| pydantic | Schema validation |
| orjson | Fast JSON |
| uuid6 | UUIDv7 for stream IDs |
| typing-extensions | Type hints |
Optional Dependencies
| Extra | Packages |
|---|---|
| openai | openai>=2.0,<3.0 |
| litellm | litellm>=1.40 |
| otel | opentelemetry-api, opentelemetry-sdk, opentelemetry-instrumentation-httpx |
| sentry | sentry-sdk |
| observability | otel + sentry combined |
| speed | uvloop (Unix only) |
| dev | pytest, pytest-asyncio, pytest-cov, mypy, ruff |
License
Apache-2.0