Quality-aware circuit breaker for production ML pipelines

ml-breaker

Requires Python ≥3.9. MIT License.


The Problem

Standard circuit breakers trip on HTTP errors and latency. ML pipelines often fail softly: the service returns 200 and latency is fine, but model quality has degraded (low-confidence outputs, bad embeddings, hallucinated content). A breaker that only watches errors and latency does not catch this. ml-breaker lets engineers define quality-based trip conditions using the model's own outputs.


Quick Start

pip install ml-breaker

from ml_breaker import circuit_breaker, CircuitBreakerOpen
from ml_breaker._conditions import consecutive_failures

# reranker_model and lightweight_model are your own callables, not part of ml-breaker

@circuit_breaker(
    trip_on=consecutive_failures(5),
    fallback=lightweight_model,
    half_open_after=30,
)
def call_reranker(query, candidates):
    return reranker_model(query, candidates)

Quality Signals

Two ways to emit quality:

# Option A: report_quality() — when validation logic is complex
@circuit_breaker(trip_on=quality_below(threshold=0.7, window=20), ...)
def call_reranker(query, candidates):
    result = reranker_model(query, candidates)
    report_quality(result.confidence)   # emits to context side-channel
    return result                       # return type unchanged

# Option B: score_fn — when quality is directly on the return value
@circuit_breaker(
    trip_on=quality_below(threshold=0.7, window=20, score_fn=lambda r: r.confidence),
    ...
)
def call_reranker(query, candidates):
    return reranker_model(query, candidates)

If both are present, the score passed to report_quality() takes precedence over score_fn.

Set require_quality_signal=True on the decorator to enforce that every call emits a quality score — useful when strict instrumentation is required and silent omissions should raise immediately.


State Machine

CLOSED ──(condition trips)──► OPEN ──(half_open_after s)──► HALF_OPEN
  ▲                                                               │
  └──(recovery_threshold probes pass)────────────────────────────┘
                                          │
  OPEN ◄──(any probe fails)───────────────┘

| State | Behavior |
|---|---|
| CLOSED | Normal operation. Conditions are evaluated on every call. |
| OPEN | All calls are rejected immediately (or routed to fallback). No traffic reaches the model. |
| HALF_OPEN | A limited number of probe calls are allowed through to test recovery. |

recovery_threshold defaults to 2 — one successful probe is too fragile for ML workloads where a single lucky call is not a reliable signal.
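
The transitions above can be modeled in a few lines. This is a minimal, single-threaded sketch under stated assumptions, not the library's code: `BreakerStateSketch`, `allow_call`, `record`, and the injectable `clock` argument are hypothetical, and the sketch does not limit how many probes run at once.

```python
import enum
import time


class State(enum.Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class BreakerStateSketch:
    """Hypothetical model of the CLOSED / OPEN / HALF_OPEN transitions."""

    def __init__(self, half_open_after=30.0, recovery_threshold=2,
                 clock=time.monotonic):
        self.half_open_after = half_open_after
        self.recovery_threshold = recovery_threshold
        self._clock = clock  # injectable so tests need not sleep
        self.state = State.CLOSED
        self._opened_at = None
        self._probe_successes = 0

    def _open(self):
        self.state = State.OPEN
        self._opened_at = self._clock()
        self._probe_successes = 0

    def allow_call(self):
        """Return True if a call may reach the model right now."""
        if self.state is State.OPEN:
            if self._clock() - self._opened_at < self.half_open_after:
                return False  # still cooling down: reject or use fallback
            self.state = State.HALF_OPEN  # cool-down elapsed, start probing
        return True

    def record(self, ok, condition_tripped=False):
        """Feed back the outcome of a call that was allowed through."""
        if self.state is State.CLOSED:
            if condition_tripped:
                self._open()
        elif self.state is State.HALF_OPEN:
            if not ok:
                self._open()  # any failed probe reopens the breaker
            else:
                self._probe_successes += 1
                if self._probe_successes >= self.recovery_threshold:
                    self.state = State.CLOSED
```

With recovery_threshold=2, the first successful probe leaves the breaker in HALF_OPEN; only the second one closes it.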


Built-in Conditions

| Condition | Signature | Trips when |
|---|---|---|
| quality_below | (threshold, window=20, score_fn=None) | rolling mean of quality scores over a full window drops below threshold |
| latency_above | (threshold_ms, window=20) | all latencies in a full window exceed threshold |
| error_rate_above | (rate, window=20) | error rate over a full window exceeds rate |
| consecutive_failures | (n) | n consecutive errors (no window) |

window is a call count, not a time window.
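
To make the "full window" and call-count semantics concrete, here is a small sketch built on collections.deque. `QualityBelowSketch` and `LatencyAboveSketch` are hypothetical stand-ins written for this illustration; they follow the table's wording but are not the library's classes.

```python
from collections import deque
from statistics import fmean


class QualityBelowSketch:
    """Trips when the mean of the last `window` scores is below threshold."""

    def __init__(self, threshold, window=20):
        self.threshold = threshold
        self.window = window
        self._scores = deque(maxlen=window)  # oldest score drops out automatically

    def observe(self, score):
        """Record one call's score; return True if the condition trips."""
        self._scores.append(score)
        if len(self._scores) < self.window:
            return False  # window not full yet, so no decision
        return fmean(self._scores) < self.threshold


class LatencyAboveSketch:
    """Trips only when every latency in a full window exceeds threshold_ms."""

    def __init__(self, threshold_ms, window=20):
        self.threshold_ms = threshold_ms
        self.window = window
        self._latencies = deque(maxlen=window)

    def observe(self, latency_ms):
        self._latencies.append(latency_ms)
        if len(self._latencies) < self.window:
            return False
        return all(ms > self.threshold_ms for ms in self._latencies)
```

Because the window counts calls rather than seconds, a low-traffic service can take a long time to fill it, and a single fast call inside the window keeps the latency condition from tripping.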


Composing Conditions

from ml_breaker import any_of, all_of

trip_on=any_of(quality_below(0.7, window=20), latency_above(2000, window=10))
trip_on=all_of(error_rate_above(0.3, window=50), consecutive_failures(3))

Each condition in a composition maintains independent state and window — any_of and all_of do not share a merged buffer.
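
A rough illustration of what independent state means in practice: each sub-condition keeps its own buffer and must see every call, so the combinator cannot short-circuit. All names ending in `Sketch` or `_sketch` below are hypothetical and written for this example only.

```python
from collections import deque


class ConsecutiveFailuresSketch:
    """Trips after n failures in a row; any success resets the streak."""

    def __init__(self, n):
        self.n = n
        self._streak = 0

    def observe(self, ok):
        self._streak = 0 if ok else self._streak + 1
        return self._streak >= self.n


class ErrorRateAboveSketch:
    """Trips when the error rate over a full window exceeds rate."""

    def __init__(self, rate, window=20):
        self.rate = rate
        self._outcomes = deque(maxlen=window)

    def observe(self, ok):
        self._outcomes.append(ok)
        if len(self._outcomes) < self._outcomes.maxlen:
            return False  # window not full yet
        errors = sum(1 for outcome in self._outcomes if not outcome)
        return errors / len(self._outcomes) > self.rate


def any_of_sketch(*conditions):
    def observe(ok):
        # Build the list first so every condition sees every call;
        # any() over a generator would stop early and starve later conditions.
        results = [cond.observe(ok) for cond in conditions]
        return any(results)
    return observe


def all_of_sketch(*conditions):
    def observe(ok):
        results = [cond.observe(ok) for cond in conditions]
        return all(results)
    return observe
```

In this sketch, all_of over a windowed condition and consecutive_failures cannot trip until the windowed condition has filled its own buffer, regardless of how long the failure streak already is.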


Fallback Strategies

Three forms are supported:

# 1. Callable — called with the same args as the guarded function
@circuit_breaker(trip_on=..., fallback=lightweight_model)
def call_reranker(query, candidates):
    return reranker_model(query, candidates)

# 2. Static value — returned as-is when the breaker is open
@circuit_breaker(trip_on=..., fallback=[])
def call_reranker(query, candidates):
    return reranker_model(query, candidates)

# 3. No fallback — raises CircuitBreakerOpen
@circuit_breaker(trip_on=...)
def call_reranker(query, candidates):
    return reranker_model(query, candidates)

try:
    result = call_reranker(query, candidates)
except CircuitBreakerOpen as e:
    print(e.name)        # breaker name
    print(e.state)       # State.OPEN
    print(e.trip_count)  # number of times this breaker has tripped

Registry & Inspection

cb = my_fn.breaker                # attached to decorated function
cb = CircuitBreaker.get("name")   # global registry lookup

cb.state        # State.CLOSED | State.OPEN | State.HALF_OPEN
cb.trip_count   # int
cb.reset()      # force back to CLOSED — useful in tests
CircuitBreaker.all()  # dict[str, CircuitBreaker]

Use name= to share a single breaker across multiple functions or services:

@circuit_breaker(name="reranker", trip_on=consecutive_failures(5), ...)
def call_reranker_v1(query, candidates): ...

@circuit_breaker(name="reranker", trip_on=consecutive_failures(5), ...)
def call_reranker_v2(query, candidates): ...

# Both functions reference the same CircuitBreaker instance
assert call_reranker_v1.breaker is call_reranker_v2.breaker
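
A name-keyed registry is enough to get this sharing behavior. The following is an illustrative sketch only: `RegistrySketch`, `get_or_create`, and `guarded` are hypothetical, and the sketch deliberately ignores what should happen when two decorators with the same name pass different trip_on conditions, which the text above does not specify.

```python
import threading


class RegistrySketch:
    """Hypothetical breaker object with a process-wide, name-keyed registry."""

    _lock = threading.Lock()
    _instances = {}

    def __init__(self, name):
        self.name = name
        self.trip_count = 0

    @classmethod
    def get_or_create(cls, name):
        # The lock keeps two threads from creating the same name twice.
        with cls._lock:
            if name not in cls._instances:
                cls._instances[name] = cls(name)
            return cls._instances[name]

    @classmethod
    def all(cls):
        with cls._lock:
            return dict(cls._instances)  # copy, so callers cannot mutate the registry


def guarded(name):
    """Hypothetical decorator that only attaches the shared breaker."""
    def decorate(fn):
        fn.breaker = RegistrySketch.get_or_create(name)
        return fn
    return decorate


@guarded("reranker")
def rerank_v1(query, candidates):
    return candidates


@guarded("reranker")
def rerank_v2(query, candidates):
    return candidates
```

Both decorated functions end up holding the same object, so a trip recorded through one is visible through the other.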

Prometheus Integration

Install the optional metrics extra first (quoted so that shells such as zsh do not expand the brackets):

pip install "ml-breaker[metrics]"

from ml_breaker.metrics import PrometheusMetrics
PrometheusMetrics.register()   # call once at startup

| Metric | Type | Description |
|---|---|---|
| ml_breaker_state | Gauge | Current state per breaker (0=CLOSED, 1=OPEN, 2=HALF_OPEN) |
| ml_breaker_trip_total | Counter | Total number of trips per breaker |
| ml_breaker_call_total | Counter | Total calls, labeled by outcome (success/failure/rejected) |
| ml_breaker_quality_score | Histogram | Quality scores emitted via report_quality or score_fn |
| ml_breaker_latency_ms | Histogram | Call latency in milliseconds per breaker |

For setups without a Prometheus dependency, use the on_state_change callback instead:

@circuit_breaker(
    trip_on=...,
    on_state_change=lambda breaker, old_state, new_state: logger.warning(
        "breaker %s: %s -> %s", breaker.name, old_state, new_state
    ),
)
def call_reranker(query, candidates):
    return reranker_model(query, candidates)
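
If a log line is not enough, the same callback signature can feed a small in-process recorder. The sketch below is an assumption-laden example: `TransitionRecorder` is a hypothetical helper, the breaker is stubbed with SimpleNamespace, and states are handled by name so that either enum members or plain strings work. The numeric codes mirror the ml_breaker_state metric description.

```python
import logging
from collections import Counter
from types import SimpleNamespace

logger = logging.getLogger("ml_breaker_sketch")

# Numeric encoding copied from the ml_breaker_state metric description.
STATE_CODES = {"CLOSED": 0, "OPEN": 1, "HALF_OPEN": 2}


class TransitionRecorder:
    """Hypothetical on_state_change callback that keeps counts in memory."""

    def __init__(self):
        self.transitions = Counter()  # (breaker, old, new) -> occurrences
        self.current = {}             # breaker name -> numeric state code

    def __call__(self, breaker, old_state, new_state):
        # Accept enum members (use .name) or plain strings.
        old_name = getattr(old_state, "name", str(old_state))
        new_name = getattr(new_state, "name", str(new_state))
        self.transitions[(breaker.name, old_name, new_name)] += 1
        self.current[breaker.name] = STATE_CODES.get(new_name, -1)
        logger.warning("breaker %s: %s -> %s", breaker.name, old_name, new_name)


# Usage with a stub breaker; a real breaker object would be passed by the library.
recorder = TransitionRecorder()
stub = SimpleNamespace(name="reranker")
recorder(stub, "CLOSED", "OPEN")
recorder(stub, "OPEN", "HALF_OPEN")
```

An instance of such a recorder could be passed as on_state_change= in place of the lambda, and its counters exported to whatever monitoring system is already in use.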

Roadmap

  • v0.2: async support, Redis-backed distributed state, built-in quality metrics for embeddings and softmax outputs

Contributing

PRs welcome. Open an issue first for anything non-trivial: github.com/ritabanb/ml-breaker/issues.

License

MIT
