ml-breaker

Quality-aware circuit breaker for production ML pipelines.

Requires Python ≥3.9. MIT License.


The Problem

Standard circuit breakers trip on HTTP errors and latency. ML pipelines fail softly: the service returns 200 and latency is fine, but model quality has degraded (low-confidence outputs, bad embeddings, hallucinated content). A breaker that only watches errors and latency never sees this. ml-breaker lets engineers define quality-based trip conditions using the model's own outputs.


Quick Start

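pip install ml-breaker

from ml_breaker import circuit_breaker, CircuitBreakerOpen
from ml_breaker._conditions import consecutive_failures

# reranker_model and lightweight_model stand in for your own callables
@circuit_breaker(
    trip_on=consecutive_failures(5),   # open after 5 consecutive errors
    fallback=lightweight_model,        # called with the same args while the breaker is open
    half_open_after=30,                # seconds to wait before probing for recovery
)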
def call_reranker(query, candidates):
    return reranker_model(query, candidates)

Quality Signals

Two ways to emit quality:

# Option A: report_quality() — when validation logic is complex
@circuit_breaker(trip_on=quality_below(threshold=0.7, window=20), ...)
def call_reranker(query, candidates):
    result = reranker_model(query, candidates)
    report_quality(result.confidence)   # emits to context side-channel
    return result                       # return type unchanged

# Option B: score_fn — when quality is directly on the return value
@circuit_breaker(
    trip_on=quality_below(threshold=0.7, window=20, score_fn=lambda r: r.confidence),
    ...
)
def call_reranker(query, candidates):
    return reranker_model(query, candidates)

If a call both invokes report_quality and has a score_fn configured, the score passed to report_quality takes precedence.

Set require_quality_signal=True on the decorator to require that every call emits a quality score. This is useful when strict instrumentation is required: a call that silently omits its score raises immediately instead of going unnoticed.


State Machine

CLOSED ──(condition trips)──► OPEN ──(after half_open_after seconds)──► HALF_OPEN
HALF_OPEN ──(recovery_threshold probes pass)──► CLOSED
HALF_OPEN ──(any probe fails)──► OPEN

| State | Behavior |
| --- | --- |
| CLOSED | Normal operation. Conditions are evaluated on every call. |
| OPEN | All calls are rejected immediately (or routed to fallback). No traffic reaches the model. |
| HALF_OPEN | A limited number of probe calls are allowed through to test recovery. |

recovery_threshold defaults to 2. A single successful probe is too fragile for ML workloads, where one lucky call is not a reliable signal of recovery.
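
The transitions above can be sketched as a small state machine. This is a simplified illustration under stated assumptions, not the library's code: BreakerSketch, allow_call, record and the injectable clock are hypothetical, HALF_OPEN here lets every call through rather than a limited number, and counting a failed probe as a new trip is a choice made for this example.

```python
import enum
import time


class State(enum.Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class BreakerSketch:
    def __init__(self, half_open_after=30.0, recovery_threshold=2, clock=time.monotonic):
        self.state = State.CLOSED
        self.half_open_after = half_open_after
        self.recovery_threshold = recovery_threshold
        self.trip_count = 0
        self._clock = clock
        self._opened_at = None
        self._probe_successes = 0

    def allow_call(self):
        """Return True if traffic may reach the model right now."""
        if self.state is State.OPEN and self._clock() - self._opened_at >= self.half_open_after:
            self.state = State.HALF_OPEN          # cool-down elapsed: start probing
            self._probe_successes = 0
        return self.state is not State.OPEN

    def record(self, ok, condition_tripped=False):
        """Feed back the outcome of one call that was allowed through."""
        if self.state is State.CLOSED:
            if condition_tripped:
                self._trip()
        elif self.state is State.HALF_OPEN:
            if not ok:
                self._trip()                      # any failed probe reopens the breaker
            else:
                self._probe_successes += 1
                if self._probe_successes >= self.recovery_threshold:
                    self.state = State.CLOSED

    def _trip(self):
        self.state = State.OPEN
        self.trip_count += 1
        self._opened_at = self._clock()


# Walk through one trip and recovery with a fake clock.
now = [0.0]
cb = BreakerSketch(half_open_after=30, recovery_threshold=2, clock=lambda: now[0])
cb.record(ok=True, condition_tripped=True)        # CLOSED -> OPEN
now[0] = 30.0
cb.allow_call()                                   # OPEN -> HALF_OPEN
cb.record(ok=True)                                # first probe passes, still HALF_OPEN
cb.record(ok=True)                                # second probe passes -> CLOSED
```

Injecting the clock keeps the cool-down testable without sleeping.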


Built-in Conditions

| Condition | Signature | Trips when |
| --- | --- | --- |
| quality_below | (threshold, window=20, score_fn=None) | rolling mean of quality scores over a full window drops below threshold |
| latency_above | (threshold_ms, window=20) | all latencies in a full window exceed threshold_ms |
| error_rate_above | (rate, window=20) | error rate over a full window exceeds rate |
| consecutive_failures | (n) | n consecutive errors occur (no window) |

window is a call count, not a time window.
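
As an illustration of a call-count window, the sketch below keeps the last N scores in a bounded deque and only evaluates the rolling mean once the window is full. QualityBelowSketch and observe are hypothetical names for this example; the library's internals may differ.

```python
from collections import deque


class QualityBelowSketch:
    def __init__(self, threshold, window=20):
        self.threshold = threshold
        self.scores = deque(maxlen=window)        # oldest score drops out automatically

    def observe(self, score):
        """Record one call's score; return True if the condition trips."""
        self.scores.append(score)
        if len(self.scores) < self.scores.maxlen:
            return False                          # window not full yet: never trip
        return sum(self.scores) / len(self.scores) < self.threshold


cond = QualityBelowSketch(threshold=0.7, window=3)
trips = [cond.observe(s) for s in (0.9, 0.5, 0.5, 0.9, 0.9)]
# trips == [False, False, True, True, False]
```

Because the window counts calls rather than seconds, a low-traffic service takes longer in wall-clock time to fill it and therefore longer to trip.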


Composing Conditions

from ml_breaker import any_of, all_of

trip_on=any_of(quality_below(0.7, window=20), latency_above(2000, window=10))
trip_on=all_of(error_rate_above(0.3, window=50), consecutive_failures(3))

Each condition in a composition maintains independent state and window — any_of and all_of do not share a merged buffer.
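
The independence of each child's state can be illustrated with a small sketch. The classes below are hypothetical stand-ins written for this example, taking a single boolean "did the call error" per observation; they are not the library's any_of and all_of.

```python
from collections import deque


class ErrorRateAboveSketch:
    def __init__(self, rate, window=20):
        self.rate = rate
        self.errors = deque(maxlen=window)

    def observe(self, error):
        self.errors.append(bool(error))
        full = len(self.errors) == self.errors.maxlen
        return full and sum(self.errors) / len(self.errors) > self.rate


class ConsecutiveFailuresSketch:
    def __init__(self, n):
        self.n = n
        self.streak = 0

    def observe(self, error):
        self.streak = self.streak + 1 if error else 0
        return self.streak >= self.n


class AnyOfSketch:
    def __init__(self, *conditions):
        self.conditions = conditions

    def observe(self, error):
        # Build the full list first so every child sees every call;
        # passing a generator to any() would short-circuit and starve later children.
        return any([c.observe(error) for c in self.conditions])


class AllOfSketch(AnyOfSketch):
    def observe(self, error):
        return all([c.observe(error) for c in self.conditions])


calls = [True, True, False, True, True]           # True means the call errored
either = AnyOfSketch(ErrorRateAboveSketch(0.5, window=4), ConsecutiveFailuresSketch(2))
both = AllOfSketch(ErrorRateAboveSketch(0.5, window=4), ConsecutiveFailuresSketch(2))
either_trips = [either.observe(e) for e in calls]
both_trips = [both.observe(e) for e in calls]
# either_trips == [False, True, False, True, True]
# both_trips   == [False, False, False, False, True]
```

Each composite owns its own child instances, so the 4-call error window and the failure streak advance separately and neither composite affects the other.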


Fallback Strategies

Three forms are supported:

# 1. Callable — called with the same args as the guarded function
@circuit_breaker(trip_on=..., fallback=lightweight_model)
def call_reranker(query, candidates):
    return reranker_model(query, candidates)

# 2. Static value — returned as-is when the breaker is open
@circuit_breaker(trip_on=..., fallback=[])
def call_reranker(query, candidates):
    return reranker_model(query, candidates)

# 3. No fallback — raises CircuitBreakerOpen
@circuit_breaker(trip_on=...)
def call_reranker(query, candidates):
    return reranker_model(query, candidates)

try:
    result = call_reranker(query, candidates)
except CircuitBreakerOpen as e:
    print(e.name)        # breaker name
    print(e.state)       # State.OPEN
    print(e.trip_count)  # number of times this breaker has tripped

Registry & Inspection

cb = my_fn.breaker                # attached to decorated function
cb = CircuitBreaker.get("name")   # global registry lookup

cb.state        # State.CLOSED | State.OPEN | State.HALF_OPEN
cb.trip_count   # int
cb.reset()      # force back to CLOSED — useful in tests
CircuitBreaker.all()  # dict[str, CircuitBreaker]

Use name= to share a single breaker across multiple functions or services in the same process (distributed state is on the roadmap, not in the current release):

@circuit_breaker(name="reranker", trip_on=consecutive_failures(5), ...)
def call_reranker_v1(query, candidates): ...

@circuit_breaker(name="reranker", trip_on=consecutive_failures(5), ...)
def call_reranker_v2(query, candidates): ...

# Both functions reference the same CircuitBreaker instance
assert call_reranker_v1.breaker is call_reranker_v2.breaker
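
A name-keyed registry of this kind can be sketched in a few lines. NamedBreakerSketch, get_or_create and shared_breaker are hypothetical names for this example, and the sketch does not address what should happen when two decorators pass the same name with different settings.

```python
import threading


class NamedBreakerSketch:
    _registry = {}
    _lock = threading.Lock()

    def __init__(self, name):
        self.name = name
        self.trip_count = 0

    @classmethod
    def get_or_create(cls, name):
        # The lock prevents two threads from creating the same name twice.
        with cls._lock:
            if name not in cls._registry:
                cls._registry[name] = cls(name)
            return cls._registry[name]


def shared_breaker(name):
    """Decorator sketch: attach the registry's breaker for this name to the function."""
    def decorate(fn):
        fn.breaker = NamedBreakerSketch.get_or_create(name)
        return fn
    return decorate


@shared_breaker("reranker")
def rerank_v1(query):
    return query


@shared_breaker("reranker")
def rerank_v2(query):
    return query


rerank_v1.breaker.trip_count += 1                 # visible through both functions
```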

Prometheus Integration

Install the metrics extra, then register the collectors once at startup:

pip install "ml-breaker[metrics]"

from ml_breaker.metrics import PrometheusMetrics
PrometheusMetrics.register()   # call once at startup

| Metric | Type | Description |
| --- | --- | --- |
| ml_breaker_state | Gauge | Current state per breaker (0=CLOSED, 1=OPEN, 2=HALF_OPEN) |
| ml_breaker_trip_total | Counter | Total number of trips per breaker |
| ml_breaker_call_total | Counter | Total calls, labeled by outcome (success/failure/rejected) |
| ml_breaker_quality_score | Histogram | Quality scores emitted via report_quality or score_fn |
| ml_breaker_latency_ms | Histogram | Call latency in milliseconds per breaker |

For setups without a Prometheus dependency, use the on_state_change callback instead:

@circuit_breaker(
    trip_on=...,
    on_state_change=lambda breaker, old_state, new_state: logger.warning(
        "breaker %s: %s -> %s", breaker.name, old_state, new_state
    ),
)
def call_reranker(query, candidates):
    return reranker_model(query, candidates)
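
A callback with the (breaker, old_state, new_state) signature shown above can also be a small callable object that keeps counts in memory. The sketch below is a hypothetical example: TransitionCounter is not part of ml-breaker, and a SimpleNamespace with a name attribute stands in for a real breaker so the block runs on its own.

```python
from collections import Counter
from types import SimpleNamespace


class TransitionCounter:
    """Callable suitable for an on_state_change-style hook; counts each transition."""

    def __init__(self):
        self.counts = Counter()

    def __call__(self, breaker, old_state, new_state):
        self.counts[(breaker.name, str(old_state), str(new_state))] += 1


counter = TransitionCounter()
fake_breaker = SimpleNamespace(name="reranker")   # stand-in for a real breaker
counter(fake_breaker, "CLOSED", "OPEN")
counter(fake_breaker, "OPEN", "HALF_OPEN")
counter(fake_breaker, "HALF_OPEN", "OPEN")
flapping = counter.counts[("reranker", "HALF_OPEN", "OPEN")]
```

Counting HALF_OPEN to OPEN transitions separately is one way to spot a breaker that keeps failing its recovery probes.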

Roadmap

  • v0.2: async support, Redis-backed distributed state, built-in quality metrics for embeddings and softmax outputs

Contributing

PRs welcome. Open an issue first for anything non-trivial: github.com/ritabanb/ml-breaker/issues.

License

MIT


