Skip to main content

A modern, composable circuit breaker library for Python

Project description

Fluxgate

A modern, composable circuit breaker library for Python with full support for both synchronous and asynchronous code.

Python Version License

Features

  • Sync & Async: First-class support for both synchronous and asynchronous code
  • Composable: Build complex failure detection logic using simple, reusable components
  • Multiple Window Types: Count-based and time-based sliding windows
  • Flexible Failure Detection: Combine multiple conditions with logical operators
  • Zero Dependencies: Core library has no external dependencies
  • Built-in Monitoring: Optional Prometheus, Slack, and logging integrations
  • Fully Typed: Complete type hints for better IDE support

Installation

pip install fluxgate

# Optional integrations
pip install fluxgate[prometheus]  # Prometheus metrics
pip install fluxgate[slack]       # Slack notifications
pip install fluxgate[all]         # Everything

Quick Start

from fluxgate import CircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import TypeOf
from fluxgate.trippers import Closed, MinRequests, FailureRate
from fluxgate.retries import Cooldown
from fluxgate.permits import Random

cb = CircuitBreaker(
    name="payment_api",
    window=CountWindow(size=100),
    tracker=TypeOf(ConnectionError),
    tripper=Closed() & MinRequests(10) & FailureRate(0.5),
    retry=Cooldown(duration=60.0),
    permit=Random(ratio=0.5),
)

@cb
def call_payment_api(amount: float):
    return requests.post("https://api.example.com/pay", json={"amount": amount})

Async Support

from fluxgate import AsyncCircuitBreaker

cb = AsyncCircuitBreaker(
    name="async_api",
    window=CountWindow(size=100),
    tracker=TypeOf(ConnectionError),
    tripper=Closed() & MinRequests(10) & FailureRate(0.5),
    retry=Cooldown(duration=60.0),
    permit=Random(ratio=0.5),
)

@cb
async def call_async_api():
    async with httpx.AsyncClient() as client:
        return await client.get("https://api.example.com/data")

Core Concepts

States

┌─────────┐           ┌──────┐
│ CLOSED  │──────────>│ OPEN │<─────┐
└─────────┘ [tripper] └──────┘      │
     ^                    │         │
     │                    │[retry]  │[tripper]
     │                    v         │
     │               ┌───────────┐  │
     └───────────────│ HALF_OPEN │──┘
        [!tripper]   └───────────┘
  • CLOSED: Normal operation
  • OPEN: Failure threshold exceeded, calls blocked
  • HALF_OPEN: Testing recovery (permit controls which calls are allowed)

Windows

Track call history over a sliding window:

  • CountWindow(size) - Last N calls
  • TimeWindow(size) - Last N seconds

Components

Component Purpose Available Implementations
Trackers Define what exceptions to track All(), TypeOf(*types), Custom(func) (composable: &, |, ~)
Trippers When to open the circuit Closed(), HalfOpened(), MinRequests(count), FailureRate(ratio), AvgLatency(threshold), SlowCallRate(ratio) (composable: &, |)
Retries When to retry from OPEN → HALF_OPEN Never(), Always(), Cooldown(duration, jitter_ratio=0.0), Backoff(initial, multiplier=2.0, max_duration=300.0, jitter_ratio=0.0)
Permits Which calls allowed in HALF_OPEN Random(ratio), RampUp(initial, final, duration)

Monitoring

from fluxgate.listeners.log import LogListener
from fluxgate.listeners.prometheus import PrometheusListener
from fluxgate.listeners.slack import SlackListener

cb = CircuitBreaker(
    ...,
    listeners=[
        LogListener(),
        PrometheusListener(),
        SlackListener(channel="C1234567890", token="xoxb-..."),
    ]
)

Manual Control

# Get circuit state
info = cb.info()
print(f"State: {info.state}")
print(f"Failures: {info.metrics.failure_count}/{info.metrics.total_count}")

# Manual transitions
cb.disable()      # -> DISABLED
cb.enable()       # -> CLOSED
cb.force_open()   # -> FORCED_OPEN
cb.reset()        # -> CLOSED (clears metrics)

Error Handling

from fluxgate import CallNotPermittedError

try:
    result = cb.call(risky_function)
except CallNotPermittedError:
    result = get_cached_value()  # Fallback

Production-Ready Example

External payment API with multi-process deployment:

import httpx
from fluxgate import CircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import Custom
from fluxgate.trippers import Closed, HalfOpened, MinRequests, FailureRate
from fluxgate.retries import Backoff
from fluxgate.permits import RampUp
from fluxgate.listeners.log import LogListener
from fluxgate.listeners.prometheus import PrometheusListener

# Track only 5xx errors and network failures (not client errors like 4xx)
def is_retriable_error(e: Exception) -> bool:
    if isinstance(e, httpx.HTTPStatusError):
        return e.response.status_code >= 500
    return isinstance(e, (httpx.ConnectError, httpx.TimeoutException))

payment_cb = CircuitBreaker(
    # Circuit breaker identifier for logging and metrics
    name="payment_api",

    # Track last 100 calls (more predictable than TimeWindow across processes)
    window=CountWindow(size=100),

    # Define which exceptions count as failures
    tracker=Custom(is_retriable_error),

    # When to open: CLOSED needs 20+ calls & 60% failure; HALF_OPEN trips at 50%
    tripper=MinRequests(20) & ((Closed() & FailureRate(0.6)) | (HalfOpened() & FailureRate(0.5))),

    # When to retry: exponential backoff 10s, 20s, 40s... up to 300s with ±10% jitter
    retry=Backoff(initial=10.0, multiplier=2.0, max_duration=300.0, jitter_ratio=0.1),

    # Which calls allowed in HALF_OPEN: gradually ramp 10% → 50% over 60s
    permit=RampUp(initial=0.1, final=0.5, duration=60.0),

    # Event listeners for logging and metrics
    listeners=[LogListener(), PrometheusListener()],

    # Mark calls slower than 3s as slow
    slow_threshold=3.0,
)

@payment_cb
def charge_payment(amount: float):
    response = httpx.post("https://payment-api.example.com/charge", json={"amount": amount})
    response.raise_for_status()
    return response.json()

Requirements

  • Python 3.10+
  • Optional: prometheus-client for Prometheus support
  • Optional: httpx for Slack support

License

MIT License

Contributing

Contributions welcome! Please submit a Pull Request.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fluxgate-0.1.2.tar.gz (88.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fluxgate-0.1.2-py3-none-any.whl (22.3 kB view details)

Uploaded Python 3

File details

Details for the file fluxgate-0.1.2.tar.gz.

File metadata

  • Download URL: fluxgate-0.1.2.tar.gz
  • Upload date:
  • Size: 88.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for fluxgate-0.1.2.tar.gz
Algorithm Hash digest
SHA256 529d1db61c297ab7b12e774c4e9217a3beb6c7d34930607d3c7f5d10aa2dd6b6
MD5 f60bd82518120cf21f9a774494c74abc
BLAKE2b-256 317eba0d7436d2b61d1098b59f3f8bce54ac74733afd2999694c4f3144ded56d

See more details on using hashes here.

Provenance

The following attestation bundles were made for fluxgate-0.1.2.tar.gz:

Publisher: publish.yml on byExist/fluxgate

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file fluxgate-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: fluxgate-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 22.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for fluxgate-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 800dd72ffbcd0facdd64eebf1989a678377bc715b574f03542d7799bddd8bdbd
MD5 232ad93fe3fbb5cb0b801a451b90b870
BLAKE2b-256 b2722cfbcdcd6ea1d0c2ef0d361dae91762a98d1babfec12ae21762d50cf607c

See more details on using hashes here.

Provenance

The following attestation bundles were made for fluxgate-0.1.2-py3-none-any.whl:

Publisher: publish.yml on byExist/fluxgate

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page