Fluxgate

A modern, composable circuit breaker library for Python with full support for both synchronous and asynchronous code.

Features

  • Sync & Async: First-class support for both synchronous and asynchronous code
  • Composable: Build complex failure detection logic using simple, reusable components
  • Multiple Window Types: Count-based and time-based sliding windows
  • Flexible Failure Detection: Combine multiple conditions with logical operators
  • Zero Dependencies: Core library has no external dependencies
  • Built-in Monitoring: Optional Prometheus, Slack, and logging integrations
  • Fully Typed: Complete type hints for better IDE support

Installation

pip install fluxgate

# Optional integrations
pip install fluxgate[prometheus]  # Prometheus metrics
pip install fluxgate[slack]       # Slack notifications
pip install fluxgate[all]         # Everything

Quick Start

import requests

from fluxgate import CircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import TypeOf
from fluxgate.trippers import Closed, MinRequests, FailureRate
from fluxgate.retries import Cooldown
from fluxgate.permits import Random

cb = CircuitBreaker(
    name="payment_api",
    window=CountWindow(size=100),
    tracker=TypeOf(ConnectionError),
    tripper=Closed() & MinRequests(10) & FailureRate(0.5),
    retry=Cooldown(duration=60.0),
    permit=Random(ratio=0.5),
    slow_threshold=float("inf"),
)

@cb
def call_payment_api(amount: float):
    return requests.post("https://api.example.com/pay", json={"amount": amount})

Async Support

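import httpx

from fluxgate import AsyncCircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import TypeOf
from fluxgate.trippers import Closed, MinRequests, FailureRate
from fluxgate.retries import Cooldown
from fluxgate.permits import Random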

cb = AsyncCircuitBreaker(
    name="async_api",
    window=CountWindow(size=100),
    tracker=TypeOf(ConnectionError),
    tripper=Closed() & MinRequests(10) & FailureRate(0.5),
    retry=Cooldown(duration=60.0),
    permit=Random(ratio=0.5),
    slow_threshold=float("inf"),
)

@cb
async def call_async_api():
    async with httpx.AsyncClient() as client:
        return await client.get("https://api.example.com/data")

Core Concepts

States

┌─────────┐           ┌──────┐
│ CLOSED  │──────────>│ OPEN │<─────┐
└─────────┘ [tripper] └──────┘      │
     ^                    │         │
     │                    │[retry]  │[tripper]
     │                    v         │
     │               ┌───────────┐  │
     └───────────────│ HALF_OPEN │──┘
        [!tripper]   └───────────┘

  • CLOSED: Normal operation
  • OPEN: Failure threshold exceeded, calls blocked
  • HALF_OPEN: Testing recovery (permit controls which calls are allowed)
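
The transitions in the diagram can be read as a small state function. The sketch below is a literal reading of the diagram and not Fluxgate's implementation. The names `State`, `next_state`, `tripper_fired`, and `retry_ready` are hypothetical and chosen for illustration only.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


def next_state(state: State, tripper_fired: bool, retry_ready: bool) -> State:
    """Return the next state, following the arrows in the diagram (illustrative only)."""
    if state is State.CLOSED:
        # CLOSED -> OPEN when the tripper condition holds
        return State.OPEN if tripper_fired else State.CLOSED
    if state is State.OPEN:
        # OPEN -> HALF_OPEN once the retry policy allows a probe
        return State.HALF_OPEN if retry_ready else State.OPEN
    # HALF_OPEN -> OPEN if the tripper fires again, otherwise back to CLOSED
    return State.OPEN if tripper_fired else State.CLOSED
```

The sketch treats every HALF_OPEN evaluation as decisive. When the library actually evaluates the tripper in HALF_OPEN is not stated above, so take that as an assumption.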

Windows

Track call history over a sliding window:

  • CountWindow(size) - Last N calls
  • TimeWindow(size) - Last N seconds
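
To make the count-based variant concrete, here is a minimal stdlib sketch of a window that keeps only the last N outcomes. `CountWindowSketch` is a hypothetical stand-in, not Fluxgate's `CountWindow`. The property names mirror `failure_count` and `total_count` from the Manual Control section, but the internals are assumed.

```python
from collections import deque


class CountWindowSketch:
    """Keeps the outcome of the last `size` calls (illustrative, not the library class)."""

    def __init__(self, size: int) -> None:
        # A bounded deque evicts the oldest outcome once it is full.
        self._outcomes: deque[bool] = deque(maxlen=size)

    def record(self, failed: bool) -> None:
        self._outcomes.append(failed)

    @property
    def total_count(self) -> int:
        return len(self._outcomes)

    @property
    def failure_count(self) -> int:
        # True counts as 1, False as 0.
        return sum(self._outcomes)

    def failure_rate(self) -> float:
        # An empty window reports 0.0 rather than dividing by zero.
        return self.failure_count / self.total_count if self._outcomes else 0.0
```

With `size=3`, recording one failure followed by three successes leaves no failures in the window, because the oldest outcome has been evicted.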

Components

| Component | Purpose | Available implementations |
|-----------|---------|---------------------------|
| Trackers | Which exceptions to track | All(), TypeOf(*types), Custom(func); composable with &, \|, ~ |
| Trippers | When to open the circuit | Closed(), HalfOpened(), MinRequests(n), FailureRate(ratio), AvgLatency(sec), SlowRate(ratio); composable with &, \| |
| Retries | When to move from OPEN to HALF_OPEN | Never(), Always(), Cooldown(duration, jitter_ratio=0.0), Backoff(initial, multiplier=2.0, max_duration=300.0, jitter_ratio=0.0) |
| Permits | Which calls are allowed in HALF_OPEN | Random(ratio), RampUp(initial, final, duration) |
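
Composition with `&`, `|`, and `~` can be built on Python's operator overloading. The following is a rough sketch of that general technique, not Fluxgate's code. `Condition`, `min_requests`, `failure_rate`, and the `ctx` dictionary are hypothetical, and the `>=` comparisons are an assumption about threshold semantics.

```python
from typing import Callable

Context = dict[str, int]


class Condition:
    """Wraps a predicate so conditions can be combined with &, | and ~."""

    def __init__(self, check: Callable[[Context], bool]) -> None:
        self._check = check

    def __call__(self, ctx: Context) -> bool:
        return self._check(ctx)

    def __and__(self, other: "Condition") -> "Condition":
        return Condition(lambda ctx: self(ctx) and other(ctx))

    def __or__(self, other: "Condition") -> "Condition":
        return Condition(lambda ctx: self(ctx) or other(ctx))

    def __invert__(self) -> "Condition":
        return Condition(lambda ctx: not self(ctx))


def min_requests(n: int) -> Condition:
    # True once the window holds at least n calls (assumed semantics).
    return Condition(lambda ctx: ctx["total"] >= n)


def failure_rate(ratio: float) -> Condition:
    # True when failures / total reaches the ratio (assumed to be inclusive).
    return Condition(
        lambda ctx: ctx["total"] > 0 and ctx["failures"] / ctx["total"] >= ratio
    )


tripper = min_requests(10) & failure_rate(0.5)
```

Here `tripper({"total": 20, "failures": 10})` evaluates to `True`, while a window of only five calls does not trip, however many of them failed.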

Monitoring

from fluxgate.listeners.log import LogListener
from fluxgate.listeners.prometheus import PrometheusListener
from fluxgate.listeners.slack import SlackListener

cb = CircuitBreaker(
    ...,
    listeners=[
        LogListener(),
        PrometheusListener(),
        SlackListener(channel="C1234567890", token="xoxb-..."),
    ]
)

Manual Control

# Get circuit state
info = cb.info()
print(f"State: {info.state}")
print(f"Failures: {info.metrics.failure_count}/{info.metrics.total_count}")

# Manual transitions
cb.disable()      # -> DISABLED (pass-through, no tracking)
cb.metrics_only() # -> METRICS_ONLY (pass-through, with tracking)
cb.force_open()   # -> FORCED_OPEN (block all calls)
cb.reset()        # -> CLOSED (clears metrics)

Error Handling

from fluxgate import CallNotPermittedError

try:
    result = cb.call(risky_function)
except CallNotPermittedError:
    result = get_cached_value()  # Fallback

Production-Ready Example

External payment API with multi-process deployment:

import httpx
from fluxgate import CircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import Custom
from fluxgate.trippers import Closed, HalfOpened, MinRequests, FailureRate
from fluxgate.retries import Backoff
from fluxgate.permits import RampUp
from fluxgate.listeners.log import LogListener
from fluxgate.listeners.prometheus import PrometheusListener

# Track only 5xx errors and network failures (not client errors like 4xx)
def is_retriable_error(e: Exception) -> bool:
    if isinstance(e, httpx.HTTPStatusError):
        return e.response.status_code >= 500
    return isinstance(e, (httpx.ConnectError, httpx.TimeoutException))

payment_cb = CircuitBreaker(
    # Circuit breaker identifier for logging and metrics
    name="payment_api",

    # Track last 100 calls (more predictable than TimeWindow across processes)
    window=CountWindow(size=100),

    # Define which exceptions count as failures
    tracker=Custom(is_retriable_error),

    # When to open: requires 20+ calls in the window, then trips at a 60% failure rate in CLOSED or 50% in HALF_OPEN
    tripper=MinRequests(20) & ((Closed() & FailureRate(0.6)) | (HalfOpened() & FailureRate(0.5))),

    # When to retry: exponential backoff 10s, 20s, 40s... up to 300s with ±10% jitter
    retry=Backoff(initial=10.0, multiplier=2.0, max_duration=300.0, jitter_ratio=0.1),

    # Which calls allowed in HALF_OPEN: gradually ramp 10% → 50% over 60s
    permit=RampUp(initial=0.1, final=0.5, duration=60.0),

    # Mark calls slower than 3s as slow
    slow_threshold=3.0,

    # Event listeners for logging and metrics
    listeners=[LogListener(), PrometheusListener()],
)

@payment_cb
def charge_payment(amount: float):
    response = httpx.post("https://payment-api.example.com/charge", json={"amount": amount})
    response.raise_for_status()
    return response.json()
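
The backoff and ramp-up comments in the example describe simple arithmetic, which can be sketched as below. Both functions are hypothetical helpers, not Fluxgate's. They assume that jitter is applied after capping at `max_duration` and that the ramp is linear, neither of which is confirmed above.

```python
import random


def backoff_delay(
    attempt: int,
    initial: float = 10.0,
    multiplier: float = 2.0,
    max_duration: float = 300.0,
    jitter_ratio: float = 0.0,
) -> float:
    """Delay before retry number `attempt` (0-based), capped, with symmetric jitter."""
    base = min(initial * multiplier**attempt, max_duration)
    jitter = base * jitter_ratio
    # Assumption: jitter is drawn uniformly from [-jitter, +jitter].
    return base + random.uniform(-jitter, jitter)


def ramp_up_ratio(
    elapsed: float,
    initial: float = 0.1,
    final: float = 0.5,
    duration: float = 60.0,
) -> float:
    """Share of calls permitted `elapsed` seconds after entering HALF_OPEN (linear ramp)."""
    progress = min(max(elapsed / duration, 0.0), 1.0)
    return initial + (final - initial) * progress
```

Without jitter, attempts 0, 1, and 2 give 10, 20, and 40 seconds, and the delay stays at 300 seconds from attempt 5 onward. Halfway through the 60-second ramp, about 30% of calls are permitted.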

Requirements

  • Python 3.10+
  • Optional: prometheus-client for Prometheus support
  • Optional: httpx for Slack support

License

MIT License

Contributing

Contributions welcome! Please submit a Pull Request.
