Fluxgate

A modern, composable circuit breaker library for Python with full support for both synchronous and asynchronous code.

Features

  • Sync & Async: First-class support for both synchronous and asynchronous code
  • Composable: Build complex failure detection logic using simple, reusable components
  • Multiple Window Types: Count-based and time-based sliding windows
  • Flexible Failure Detection: Combine multiple conditions with logical operators
  • Zero Dependencies: Core library has no external dependencies
  • Built-in Monitoring: Optional Prometheus, Slack, and logging integrations
  • Fully Typed: Complete type hints for better IDE support

Installation

pip install fluxgate

# Optional integrations
pip install fluxgate[prometheus]  # Prometheus metrics
pip install fluxgate[slack]       # Slack notifications
pip install fluxgate[all]         # Everything

Quick Start

import requests

from fluxgate import CircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import TypeOf
from fluxgate.trippers import Closed, MinRequests, FailureRate
from fluxgate.retries import Cooldown
from fluxgate.permits import Random

cb = CircuitBreaker(
    name="payment_api",
    window=CountWindow(size=100),
    # requests raises its own ConnectionError, which is not a subclass of the builtin one
    tracker=TypeOf(requests.ConnectionError),
    tripper=Closed() & MinRequests(10) & FailureRate(0.5),
    retry=Cooldown(duration=60.0),
    permit=Random(ratio=0.5),
    slow_threshold=float("inf"),
)

@cb
def call_payment_api(amount: float):
    return requests.post("https://api.example.com/pay", json={"amount": amount})

Async Support

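import httpx

from fluxgate import AsyncCircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import TypeOf
from fluxgate.trippers import Closed, MinRequests, FailureRate
from fluxgate.retries import Cooldown
from fluxgate.permits import Random

cb = AsyncCircuitBreaker(
    name="async_api",
    window=CountWindow(size=100),
    # httpx.ConnectError does not derive from the builtin ConnectionError
    tracker=TypeOf(httpx.ConnectError),
    tripper=Closed() & MinRequests(10) & FailureRate(0.5),
    retry=Cooldown(duration=60.0),
    permit=Random(ratio=0.5),
    slow_threshold=float("inf"),
)

@cb
async def call_async_api():
    async with httpx.AsyncClient() as client:
        return await client.get("https://api.example.com/data")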

Core Concepts

States

┌─────────┐           ┌──────┐
│ CLOSED  │──────────>│ OPEN │<─────┐
└─────────┘ [tripper] └──────┘      │
     ^                    │         │
     │                    │[retry]  │[tripper]
     │                    v         │
     │               ┌───────────┐  │
     └───────────────│ HALF_OPEN │──┘
        [!tripper]   └───────────┘

  • CLOSED: Normal operation
  • OPEN: Failure threshold exceeded, calls blocked
  • HALF_OPEN: Testing recovery (permit controls which calls are allowed)
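
The arrows in the diagram can be sketched as a small pure-Python state machine. This is an illustration only, not Fluxgate's implementation: `State`, `next_state`, and the boolean arguments `tripper_fired` and `retry_due` are hypothetical names standing in for the tripper and retry components, and the sketch does not model when the library actually evaluates them.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


def next_state(state: State, tripper_fired: bool, retry_due: bool) -> State:
    """Return the next state according to the arrows in the diagram."""
    if state is State.CLOSED:
        # CLOSED -> OPEN when the tripper condition holds.
        return State.OPEN if tripper_fired else State.CLOSED
    if state is State.OPEN:
        # OPEN -> HALF_OPEN once the retry policy allows a probe.
        return State.HALF_OPEN if retry_due else State.OPEN
    # HALF_OPEN -> OPEN if the tripper fires again, otherwise back to CLOSED.
    return State.OPEN if tripper_fired else State.CLOSED
```

For example, `next_state(State.OPEN, tripper_fired=False, retry_due=True)` yields `State.HALF_OPEN`, matching the `[retry]` arrow.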

Windows

Track call history over a sliding window:

  • CountWindow(size) - Last N calls
  • TimeWindow(size) - Last N seconds
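
To make the difference between the two window types concrete, here is a minimal sketch of each using `collections.deque`. The class names `CountWindowSketch` and `TimeWindowSketch` and their methods are hypothetical and are not Fluxgate's classes; the library's internals may differ.

```python
import time
from collections import deque


class CountWindowSketch:
    """Keeps the outcomes of the last `size` calls."""

    def __init__(self, size: int) -> None:
        self._outcomes = deque(maxlen=size)

    def record(self, failed: bool) -> None:
        # With maxlen set, the oldest entry drops out automatically.
        self._outcomes.append(failed)

    def failure_rate(self) -> float:
        if not self._outcomes:
            return 0.0
        return sum(self._outcomes) / len(self._outcomes)


class TimeWindowSketch:
    """Keeps the outcomes of calls made in the last `size` seconds."""

    def __init__(self, size: float, clock=time.monotonic) -> None:
        self._size = size
        self._clock = clock
        self._outcomes = deque()  # (timestamp, failed) pairs, oldest first

    def record(self, failed: bool) -> None:
        self._outcomes.append((self._clock(), failed))

    def failure_rate(self) -> float:
        cutoff = self._clock() - self._size
        # Evict entries older than the window before computing the rate.
        while self._outcomes and self._outcomes[0][0] < cutoff:
            self._outcomes.popleft()
        if not self._outcomes:
            return 0.0
        return sum(failed for _, failed in self._outcomes) / len(self._outcomes)
```

A count-based window always reflects a fixed number of calls regardless of traffic, while a time-based window can hold very few calls during quiet periods.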

Components

Component: Purpose. Available implementations

  • Trackers: Define what exceptions to track. All(), TypeOf(*types), Custom(func) (composable: &, |, ~)
  • Trippers: When to open the circuit. Closed(), HalfOpened(), MinRequests(n), FailureRate(ratio), AvgLatency(sec), SlowRate(ratio) (composable: &, |)
  • Retries: When to retry from OPEN → HALF_OPEN. Never(), Always(), Cooldown(duration, jitter_ratio=0.0), Backoff(initial, multiplier=2.0, max_duration=300.0, jitter_ratio=0.0)
  • Permits: Which calls allowed in HALF_OPEN. Random(ratio), RampUp(initial, final, duration)
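
Composition with `&`, `|`, and `~` can be built on Python's operator-overloading hooks. The sketch below shows one way to do this for tracker-style predicates. `Condition` and `type_of` are hypothetical names chosen for illustration and are not part of Fluxgate's API.

```python
from typing import Callable


class Condition:
    """Wraps a predicate over an exception and supports &, | and ~."""

    def __init__(self, predicate: Callable[[Exception], bool]) -> None:
        self._predicate = predicate

    def __call__(self, exc: Exception) -> bool:
        return self._predicate(exc)

    def __and__(self, other: "Condition") -> "Condition":
        return Condition(lambda exc: self(exc) and other(exc))

    def __or__(self, other: "Condition") -> "Condition":
        return Condition(lambda exc: self(exc) or other(exc))

    def __invert__(self) -> "Condition":
        return Condition(lambda exc: not self(exc))


def type_of(*types: type) -> Condition:
    """Matches exceptions that are instances of any of the given types."""
    return Condition(lambda exc: isinstance(exc, types))


# Match connection errors or value errors, but not connection resets.
tracker = (type_of(ConnectionError) | type_of(ValueError)) & ~type_of(ConnectionResetError)
```

Each operator returns a new `Condition`, so small predicates can be combined into larger ones without mutating the originals.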

Monitoring

from fluxgate import CircuitBreaker
from fluxgate.listeners.log import LogListener
from fluxgate.listeners.prometheus import PrometheusListener
from fluxgate.listeners.slack import SlackListener

cb = CircuitBreaker(
    ...,
    listeners=[
        LogListener(),
        PrometheusListener(),
        SlackListener(channel="C1234567890", token="xoxb-..."),
    ]
)

Manual Control

# Get circuit state
info = cb.info()
print(f"State: {info.state}")
print(f"Failures: {info.metrics.failure_count}/{info.metrics.total_count}")

# Manual transitions
cb.disable()      # -> DISABLED (pass-through, no tracking)
cb.metrics_only() # -> METRICS_ONLY (pass-through, with tracking)
cb.force_open()   # -> FORCED_OPEN (block all calls)
cb.reset()        # -> CLOSED (clears metrics)

Error Handling

from fluxgate import CallNotPermittedError

try:
    result = cb.call(risky_function)
except CallNotPermittedError:
    result = get_cached_value()  # Fallback
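
When the same fallback applies at many call sites, the try/except pattern above can be wrapped in a decorator. The following is a sketch under stated assumptions: `with_fallback` is a hypothetical helper, and the local `CallNotPermittedError` class is a stand-in for the one exported by Fluxgate so that the example runs without the library installed.

```python
from functools import wraps


class CallNotPermittedError(Exception):
    """Local stand-in for fluxgate.CallNotPermittedError (assumption for this sketch)."""


def with_fallback(fallback):
    """Decorator: return fallback() when the wrapped call is rejected."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except CallNotPermittedError:
                # Only rejected calls use the fallback; other errors propagate.
                return fallback()

        return wrapper

    return decorator


@with_fallback(lambda: "cached")
def fetch_value():
    # Simulates a breaker that is currently rejecting calls.
    raise CallNotPermittedError("circuit is open")
```

Catching only the rejection error keeps genuine failures visible to the caller instead of silently masking them with the fallback value.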

Production-Ready Example

External payment API with multi-process deployment:

import httpx
from fluxgate import CircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import Custom
from fluxgate.trippers import Closed, HalfOpened, MinRequests, FailureRate
from fluxgate.retries import Backoff
from fluxgate.permits import RampUp
from fluxgate.listeners.log import LogListener
from fluxgate.listeners.prometheus import PrometheusListener

# Track only 5xx errors and network failures (not client errors like 4xx)
def is_retriable_error(e: Exception) -> bool:
    if isinstance(e, httpx.HTTPStatusError):
        return e.response.status_code >= 500
    return isinstance(e, (httpx.ConnectError, httpx.TimeoutException))

payment_cb = CircuitBreaker(
    # Circuit breaker identifier for logging and metrics
    name="payment_api",

    # Track last 100 calls (more predictable than TimeWindow across processes)
    window=CountWindow(size=100),

    # Define which exceptions count as failures
    tracker=Custom(is_retriable_error),

    # When to open: needs 20+ calls in either state; trips at 60% failure in CLOSED or 50% in HALF_OPEN
    tripper=MinRequests(20) & ((Closed() & FailureRate(0.6)) | (HalfOpened() & FailureRate(0.5))),

    # When to retry: exponential backoff 10s, 20s, 40s... up to 300s with ±10% jitter
    retry=Backoff(initial=10.0, multiplier=2.0, max_duration=300.0, jitter_ratio=0.1),

    # Which calls allowed in HALF_OPEN: gradually ramp 10% → 50% over 60s
    permit=RampUp(initial=0.1, final=0.5, duration=60.0),

    # Mark calls slower than 3s as slow
    slow_threshold=3.0,

    # Event listeners for logging and metrics
    listeners=[LogListener(), PrometheusListener()],
)

@payment_cb
def charge_payment(amount: float):
    response = httpx.post("https://payment-api.example.com/charge", json={"amount": amount})
    response.raise_for_status()
    return response.json()
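
The arithmetic behind the `Backoff` and `RampUp` comments in the example can be worked through in plain Python. This is a sketch under assumptions, not Fluxgate's code: `backoff_delay` and `ramp_up_ratio` are hypothetical helpers, jitter is assumed to be applied uniformly to the capped delay, and the ramp is assumed to be linear.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    initial: float = 10.0,
    multiplier: float = 2.0,
    max_duration: float = 300.0,
    jitter_ratio: float = 0.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Delay before retry number `attempt` (0-based), capped at max_duration."""
    base = min(initial * multiplier ** attempt, max_duration)
    if jitter_ratio == 0.0:
        return base
    rng = rng or random.Random()
    # Spread the delay uniformly within ±jitter_ratio of the base value.
    return base * (1.0 + rng.uniform(-jitter_ratio, jitter_ratio))


def ramp_up_ratio(
    elapsed: float,
    initial: float = 0.1,
    final: float = 0.5,
    duration: float = 60.0,
) -> float:
    """Linearly interpolate the permitted-call ratio over `duration` seconds."""
    progress = min(max(elapsed / duration, 0.0), 1.0)
    return initial + (final - initial) * progress


delays = [backoff_delay(n) for n in range(7)]
ratios = [round(ramp_up_ratio(t), 2) for t in (0, 30, 60, 120)]
```

With the defaults above, the delays double from 10 seconds until they reach the 300-second cap, and the permitted ratio climbs from 10% to 50% over the first minute in HALF_OPEN and then stays there.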

Requirements

  • Python 3.10+
  • Optional: prometheus-client for Prometheus support
  • Optional: httpx for Slack support

License

MIT License

Contributing

Contributions welcome! Please submit a Pull Request.

Download files

Source Distribution

fluxgate-0.2.0.tar.gz (88.9 kB)

  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7
  • SHA256: e22e0f1db32eaf49ccf02c1e3acd5bac07f093de6fedc789f883501af22db96b
  • MD5: 750d71a57516fe25a6a0c76eb28f61e5
  • BLAKE2b-256: 140056a30adb5b2428f0b52d96439e4ce7b0f6cfc060a0c6f5c7aec7b96f289b
  • Provenance: publish.yml on byExist/fluxgate

Built Distribution

fluxgate-0.2.0-py3-none-any.whl (22.3 kB)

  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7
  • SHA256: 7fc75e9e67116da0d7df42c3fccba31110be2fd222c3d293dd5d4b5307cbe3b0
  • MD5: dfd2b2cd7352e34aaf3a016896ce2b3e
  • BLAKE2b-256: 17b3ee290cbb1023a1887b20d34705fba4d2244e8e4dcbe3779de6cd97b526a9
  • Provenance: publish.yml on byExist/fluxgate

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
