Fluxgate

A modern, composable circuit breaker library for Python with full support for both synchronous and asynchronous code.


Features

  • Sync & Async: First-class support for both synchronous and asynchronous code
  • Composable: Build complex failure detection logic using simple, reusable components
  • Multiple Window Types: Count-based and time-based sliding windows
  • Flexible Failure Detection: Combine multiple conditions with logical operators
  • Zero Dependencies: Core library has no external dependencies
  • Built-in Monitoring: Optional Prometheus, Slack, and logging integrations
  • Fully Typed: Complete type hints for better IDE support

Installation

pip install fluxgate

# Optional integrations
pip install fluxgate[prometheus]  # Prometheus metrics
pip install fluxgate[slack]       # Slack notifications
pip install fluxgate[all]         # Everything

Quick Start

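import requests  # third-party HTTP client used by this example

from fluxgate import CircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import TypeOf
from fluxgate.trippers import Closed, MinRequests, FailureRate
from fluxgate.retries import Cooldown
from fluxgate.permits import Random

cb = CircuitBreaker(
    name="payment_api",
    # Track the last 100 calls
    window=CountWindow(size=100),
    # Count ConnectionError as a failure
    tracker=TypeOf(ConnectionError),
    # Open from CLOSED once there are 10+ calls at a 50% failure rate
    tripper=Closed() & MinRequests(10) & FailureRate(0.5),
    # Move from OPEN to HALF_OPEN after 60 seconds
    retry=Cooldown(duration=60.0),
    # Let about half of the calls through while HALF_OPEN
    permit=Random(ratio=0.5),
)

@cb
def call_payment_api(amount: float):
    return requests.post("https://api.example.com/pay", json={"amount": amount})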
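The tripper expression in the Quick Start reads as "trip only while CLOSED, once at least 10 calls are recorded and half of them failed". The function below is a minimal sketch of that decision in plain Python, not fluxgate's implementation; the name `should_trip`, the string state values, and the inclusive `>=` comparisons are assumptions made for illustration.

```python
def should_trip(
    state: str,
    failures: int,
    total: int,
    min_requests: int = 10,
    failure_rate: float = 0.5,
) -> bool:
    """Mirror Closed() & MinRequests(10) & FailureRate(0.5) as one boolean.

    Assumption: thresholds are inclusive; the library may compare differently.
    """
    if state != "closed":
        return False  # Closed(): only evaluate while the circuit is CLOSED
    if total == 0 or total < min_requests:
        return False  # MinRequests(n): not enough data in the window yet
    return failures / total >= failure_rate  # FailureRate(r)


print(should_trip("closed", failures=5, total=10))
print(should_trip("closed", failures=4, total=9))
```

Requiring a minimum number of calls keeps a single early failure (1 of 1, a 100% failure rate) from opening the circuit.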

Async Support

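import httpx  # third-party async HTTP client used by this example

from fluxgate import AsyncCircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import TypeOf
from fluxgate.trippers import Closed, MinRequests, FailureRate
from fluxgate.retries import Cooldown
from fluxgate.permits import Random

cb = AsyncCircuitBreaker(
    name="async_api",
    window=CountWindow(size=100),
    tracker=TypeOf(ConnectionError),
    tripper=Closed() & MinRequests(10) & FailureRate(0.5),
    retry=Cooldown(duration=60.0),
    permit=Random(ratio=0.5),
)

@cb
async def call_async_api():
    async with httpx.AsyncClient() as client:
        return await client.get("https://api.example.com/data")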

Core Concepts

States

┌─────────┐           ┌──────┐
│ CLOSED  │──────────>│ OPEN │<─────┐
└─────────┘ [tripper] └──────┘      │
     ^                    │         │
     │                    │[retry]  │[tripper]
     │                    v         │
     │               ┌───────────┐  │
     └───────────────│ HALF_OPEN │──┘
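        [!tripper]   └───────────┘

  • CLOSED: Normal operation; calls pass through while the tripper watches the window
  • OPEN: Failure threshold exceeded; calls are blocked until the retry policy allows a recovery test
  • HALF_OPEN: Testing recovery; the permit controls which calls are allowed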
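The transitions in the diagram can be written as a small pure function. This is a sketch of the diagram only, not the library's code: `State`, `next_state`, and the boolean inputs `tripped` and `retry_ready` are hypothetical names standing in for the tripper and retry decisions.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


def next_state(state: State, tripped: bool, retry_ready: bool) -> State:
    """Return the state that follows `state` according to the diagram."""
    if state is State.CLOSED:
        # CLOSED -> OPEN when the tripper fires
        return State.OPEN if tripped else State.CLOSED
    if state is State.OPEN:
        # OPEN -> HALF_OPEN when the retry policy allows a recovery test
        return State.HALF_OPEN if retry_ready else State.OPEN
    # HALF_OPEN -> OPEN when the tripper fires, otherwise back to CLOSED
    return State.OPEN if tripped else State.CLOSED


print(next_state(State.OPEN, tripped=False, retry_ready=True))
```

The tripper is consulted in both CLOSED and HALF_OPEN, which is why the same tripper expression can carry different thresholds for each state.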

Windows

Track call history over a sliding window:

  • CountWindow(size) - Last N calls
  • TimeWindow(size) - Last N seconds
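To illustrate the difference between the two window types, here is a minimal standalone sketch built on `collections.deque`. The class names `CountWindowSketch` and `TimeWindowSketch` and their methods are hypothetical and do not reflect fluxgate's internals; timestamps are passed in explicitly so the behavior is easy to follow.

```python
from collections import deque


class CountWindowSketch:
    """Keeps the outcomes of the last `size` calls."""

    def __init__(self, size: int) -> None:
        # A bounded deque drops the oldest outcome automatically.
        self._outcomes: deque[bool] = deque(maxlen=size)

    def record(self, failed: bool) -> None:
        self._outcomes.append(failed)

    def failure_rate(self) -> float:
        if not self._outcomes:
            return 0.0
        return sum(self._outcomes) / len(self._outcomes)


class TimeWindowSketch:
    """Keeps the outcomes recorded within the last `seconds` seconds."""

    def __init__(self, seconds: float) -> None:
        self._seconds = seconds
        self._events: deque[tuple[float, bool]] = deque()

    def _evict(self, now: float) -> None:
        # Drop events that have aged out of the window.
        while self._events and self._events[0][0] <= now - self._seconds:
            self._events.popleft()

    def record(self, failed: bool, now: float) -> None:
        self._events.append((now, failed))
        self._evict(now)

    def failure_rate(self, now: float) -> float:
        self._evict(now)
        if not self._events:
            return 0.0
        return sum(failed for _, failed in self._events) / len(self._events)
```

A count-based window always judges the same number of calls regardless of traffic, while a time-based window holds more or fewer calls depending on the request rate.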

Components

Each component has a purpose and a set of available implementations:

  • Trackers
    Purpose: define which exceptions to track
    Implementations: All(), TypeOf(*types), Custom(func) (composable: &, |, ~)
  • Trippers
    Purpose: when to open the circuit
    Implementations: Closed(), HalfOpened(), MinRequests(count), FailureRate(ratio), AvgLatency(threshold), SlowCallRate(ratio) (composable: &, |)
  • Retries
    Purpose: when to retry from OPEN → HALF_OPEN
    Implementations: Never(), Always(), Cooldown(duration, jitter_ratio=0.0), Backoff(initial, multiplier=2.0, max_duration=300.0, jitter_ratio=0.0)
  • Permits
    Purpose: which calls are allowed in HALF_OPEN
    Implementations: Random(ratio), RampUp(initial, final, duration)
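Composition with `&`, `|`, and `~` is typically done through Python's operator-overloading hooks. The sketch below shows one way such composable trackers could work; `Condition` and `type_of` are hypothetical helpers written for this example and are not fluxgate classes.

```python
from typing import Callable


class Condition:
    """Wraps a predicate over exceptions so instances compose with &, | and ~."""

    def __init__(self, predicate: Callable[[BaseException], bool]) -> None:
        self._predicate = predicate

    def __call__(self, exc: BaseException) -> bool:
        return self._predicate(exc)

    def __and__(self, other: "Condition") -> "Condition":
        return Condition(lambda exc: self(exc) and other(exc))

    def __or__(self, other: "Condition") -> "Condition":
        return Condition(lambda exc: self(exc) or other(exc))

    def __invert__(self) -> "Condition":
        return Condition(lambda exc: not self(exc))


def type_of(*types: type) -> Condition:
    """Match exceptions that are instances of any of the given types."""
    return Condition(lambda exc: isinstance(exc, types))


# Track OS-level errors, except a missing file.
tracked = type_of(OSError) & ~type_of(FileNotFoundError)

print(tracked(ConnectionError()))    # ConnectionError is an OSError subclass
print(tracked(FileNotFoundError()))
```

Each operator returns a new `Condition`, so expressions of any depth stay ordinary callables that can be reused across breakers.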

Monitoring

from fluxgate.listeners.log import LogListener
from fluxgate.listeners.prometheus import PrometheusListener
from fluxgate.listeners.slack import SlackListener

cb = CircuitBreaker(
    ...,  # other arguments as in Quick Start
    listeners=[
        LogListener(),
        PrometheusListener(),
        SlackListener(channel="C1234567890", token="xoxb-..."),
    ]
)

Manual Control

# Get circuit state
info = cb.info()
print(f"State: {info.state}")
print(f"Failures: {info.metrics.failure_count}/{info.metrics.total_count}")

# Manual transitions
cb.disable()      # -> DISABLED
cb.enable()       # -> CLOSED
cb.force_open()   # -> FORCED_OPEN
cb.reset()        # -> CLOSED (clears metrics)

Error Handling

from fluxgate import CallNotPermittedError

try:
    result = cb.call(risky_function)
except CallNotPermittedError:
    result = get_cached_value()  # Fallback
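The fallback pattern above can be wrapped in a small helper so that only rejected calls fall back, while genuine errors from the wrapped function still propagate. This is a sketch under stated assumptions: `call_with_fallback` is a hypothetical helper, and `RejectedCall` is a stand-in exception so the example runs without fluxgate installed.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class RejectedCall(Exception):
    """Stand-in for the breaker's rejection error (hypothetical)."""


def call_with_fallback(
    call: Callable[[], T],
    fallback: Callable[[], T],
    rejected: type = RejectedCall,
) -> T:
    """Run `call`; use `fallback` only when the call was rejected."""
    try:
        return call()
    except rejected:
        return fallback()


def rejecting() -> str:
    raise RejectedCall("circuit is open")


print(call_with_fallback(rejecting, lambda: "cached value"))
```

With fluxgate, the same helper would presumably be used as `call_with_fallback(lambda: cb.call(risky_function), get_cached_value, rejected=CallNotPermittedError)`.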

Production-Ready Example

A circuit breaker for an external payment API in a multi-process deployment:

import httpx
from fluxgate import CircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import Custom
from fluxgate.trippers import Closed, HalfOpened, MinRequests, FailureRate
from fluxgate.retries import Backoff
from fluxgate.permits import RampUp
from fluxgate.listeners.log import LogListener
from fluxgate.listeners.prometheus import PrometheusListener

# Track only 5xx errors and network failures (not client errors like 4xx)
def is_retriable_error(e: Exception) -> bool:
    if isinstance(e, httpx.HTTPStatusError):
        return e.response.status_code >= 500
    return isinstance(e, (httpx.ConnectError, httpx.TimeoutException))

payment_cb = CircuitBreaker(
    # Circuit breaker identifier for logging and metrics
    name="payment_api",

    # Track last 100 calls (more predictable than TimeWindow across processes)
    window=CountWindow(size=100),

    # Define which exceptions count as failures
    tracker=Custom(is_retriable_error),

    # When to open: needs 20+ calls in either state; CLOSED trips at 60% failure, HALF_OPEN at 50%
    tripper=MinRequests(20) & ((Closed() & FailureRate(0.6)) | (HalfOpened() & FailureRate(0.5))),

    # When to retry: exponential backoff 10s, 20s, 40s... up to 300s with ±10% jitter
    retry=Backoff(initial=10.0, multiplier=2.0, max_duration=300.0, jitter_ratio=0.1),

    # Which calls allowed in HALF_OPEN: gradually ramp 10% → 50% over 60s
    permit=RampUp(initial=0.1, final=0.5, duration=60.0),

    # Event listeners for logging and metrics
    listeners=[LogListener(), PrometheusListener()],

    # Mark calls slower than 3s as slow
    slow_threshold=3.0,
)

@payment_cb
def charge_payment(amount: float):
    response = httpx.post("https://payment-api.example.com/charge", json={"amount": amount})
    response.raise_for_status()
    return response.json()

Requirements

  • Python 3.10+
  • Optional: prometheus-client for Prometheus support
  • Optional: httpx for Slack support

License

MIT License

Contributing

Contributions welcome! Please submit a Pull Request.
