Function decoration for backoff and retry
Project description
backon
Function decoration for backoff and retry — modern, fast, zero dependencies.
backon is a modern evolution of backoff — a zero-dependency Python library for retry with exponential backoff. It provides decorator, functional, and context manager APIs for both sync and async code.
Table of Contents
- Features
- Installation
- Quick Start
- API Reference
- Wait Generators
- Stop Conditions
- Retry Conditions
- Jitter
- Handlers
- Global Toggle
- Async Support
- Custom Sleep
- Advanced Features
- Migrating from backoff
- Contributing
- License
Features
- Zero dependencies — pure Python, stdlib only
- Four APIs — decorator (
@on_exception,@on_predicate), functional (retry()), context manager (Retrying), callable (RetryingCaller/AsyncRetryingCaller) - Async native — same API works for
async deffunctions - Full type hints — validated with mypy, strict mode compatible
- Global toggle —
backon.disable()/backon.enable()for testing - Custom sleep — inject your own sleep function (useful for testing with
asyncio.Event) - Multiple wait strategies — exponential, constant, Fibonacci, decay, runtime, randomized, incremental, and composable chains
- Jitter — full jitter, random jitter, or none
- Rich callbacks —
on_attempt,on_backoff,on_success,on_giveup,before_sleep,before,after - Circuit breaker — CLOSED/OPEN/HALF_OPEN states with automatic recovery
- Hedging — concurrent retry requests, first-success-wins
- Prometheus / OpenTelemetry metrics — optional, zero hard dependencies
- Testing module —
disable_retries(),limit_retries(),remove_backoff(),assert_retried() - Trio support — retry with the trio async framework
- Operator overloading — compose stops with
|/&, wait generators with+ - Iterator API —
for attempt in Retrying(...): - Modern packaging — PEP 621, PDM, py.typed
Installation
pip install backon
Requires Python 3.10+.
Quick Start
Retry on exception
import backon
@backon.on_exception(backon.expo, ValueError, max_tries=3)
def fetch_data():
return api.call()
Retry on predicate
@backon.on_predicate(backon.constant, max_tries=5, interval=0.5)
def poll_status():
return check_ready()
Functional API
result = backon.retry(
fetch_data,
backon.expo,
exception=ValueError,
max_tries=3,
)
Context manager
with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
result = r.call(fetch_data)
Async variant:
async with backon.Retrying(backon.constant, exception=ValueError, max_tries=3, interval=0.5) as r:
result = await r.async_call(fetch_data)
API Reference
Decorators
@backon.on_exception(wait_gen, exception, ...)
Retry when the decorated function raises one of the specified exceptions.
@backon.on_exception(backon.expo, (ValueError, TimeoutError), max_tries=5)
def fetch():
...
| Argument | Type | Default | Description |
|---|---|---|---|
wait_gen |
WaitGenerator |
— | Wait strategy (expo, constant, fibo, etc.) |
exception |
type or tuple[type] |
— | Exception class(es) to retry on |
max_tries |
int or Callable[[], int] |
None |
Maximum number of attempts |
max_time |
float, timedelta, or Callable |
None |
Maximum total elapsed time |
jitter |
Jitterer or None |
full_jitter |
Jitter function |
giveup |
Callable[[Exception], bool or float] |
lambda e: False |
Stop retrying for matching exceptions; return float to override wait |
on_success |
Handler or list |
None |
Called after successful attempt |
on_backoff |
Handler or list |
None |
Called before each retry |
on_giveup |
Handler or list |
None |
Called when retries exhausted |
on_attempt |
Handler or list |
None |
Called before each attempt |
before_sleep |
Handler or list |
None |
Called before sleeping |
before |
Handler or list |
None |
Called before each attempt (lower-level than on_attempt) |
after |
Handler or list |
None |
Called after each attempt (lower-level than on_success/on_giveup) |
retry_error_callback |
Callable[[dict], Any] |
None |
Called when retry gives up instead of raising |
raise_on_giveup |
bool |
True |
Raise final exception when giving up |
logger |
str or Logger |
"backon" |
Logger name or instance |
backoff_log_level |
int |
logging.INFO |
Log level for backoff messages |
giveup_log_level |
int |
logging.ERROR |
Log level for giveup messages |
sleep |
Callable[[float], Any] |
None |
Custom sleep function |
**wait_gen_kwargs |
varies | — | Extra kwargs passed to the wait generator (e.g. base=3, interval=0.5) |
@backon.on_predicate(wait_gen, predicate, ...)
Retry while the predicate matches the return value.
@backon.on_predicate(backon.constant, predicate=lambda x: x is None, max_tries=5)
def poll():
...
Accepts all parameters from on_exception except exception, giveup, and raise_on_giveup. Adds:
| Argument | Type | Default | Description |
|---|---|---|---|
predicate |
Callable[[Any], bool] |
operator.not_ |
Retry when this returns True for the return value |
Functional API
backon.retry(target, wait_gen, ...)
result = backon.retry(
target=my_function,
wait_gen=backon.expo,
exception=ValueError,
max_tries=3,
)
Accepts all parameters from on_exception plus on_predicate extras, plus:
| Argument | Type | Default | Description |
|---|---|---|---|
condition |
RetryCondition |
None |
Advanced retry condition object |
stop |
Stop |
None |
Advanced stop condition object |
name |
str |
"" |
Identifier for the retry call |
**wait_gen_kwargs |
varies | — | Extra kwargs passed to the wait generator |
If target is a coroutine function, retry() returns a coroutine. Otherwise it returns the result synchronously.
Context Manager
backon.Retrying(wait_gen, ...)
with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
r.call(my_function)
async with backon.Retrying(backon.constant, exception=ValueError, max_tries=3, interval=0.5) as r:
await r.async_call(my_async_function)
| Method | Description |
|---|---|
call(target, *args, **kwargs) |
Execute synchronously |
async_call(target, *args, **kwargs) |
Execute asynchronously |
copy() |
Return a modified copy of the Retrying instance |
statistics |
Property returning dict with attempt_number, elapsed, idle_for, start_time |
call_state |
Property returning the current RetryCallState |
enabled |
Property to enable/disable retry per-instance |
Arguments: Same as retry(), plus enabled (default True).
Callers
backon.RetryingCaller(wait_gen, ...)
A callable object with pre-bound exception type via .on().
caller = backon.RetryingCaller(backon.expo, max_tries=3)
caller = caller.on(ValueError)
result = caller(my_function, arg1, arg2)
backon.AsyncRetryingCaller(wait_gen, ...)
Async variant of RetryingCaller.
caller = backon.AsyncRetryingCaller(backon.expo, max_tries=3).on(ValueError)
result = await caller(my_async_function, arg1, arg2)
| Method | Description |
|---|---|
.on(exception) |
Return a copy bound to the given exception type |
.copy() |
Return a modified copy |
.__call__(target, *args, **kwargs) |
Execute with retry |
Wait Generators
All wait generators are callables that produce a sequence of wait times. Pass extra kwargs (e.g. interval=0.5, base=3) as **wait_gen_kwargs to decorators and functions.
| Generator | Signature | Description |
|---|---|---|
expo |
(base=2, factor=1, max_value=None) |
Exponential backoff: factor * base^n |
constant |
(interval=1) |
Fixed interval; accepts float or Sequence[float] for varied intervals |
fibo |
(max_value=None) |
Fibonacci sequence: 1, 1, 2, 3, 5, 8, ... |
runtime |
(value=Callable) |
Dynamic wait from return value or exception — useful for Retry-After headers |
decay |
(initial_value=1, decay_factor=1, min_value=None) |
Exponential decay: initial * e^(-t * decay_factor) |
wait_random_exponential |
(multiplier=1, max_value=None, exp_base=2, min_value=0) |
Randomized exponential (uniform random between 0 and the exponential value) |
wait_incrementing |
(start=1, increment=1, max_value=None) |
Linear increment: start + n * increment |
wait_chain |
(*generators) |
Sequentially play through multiple generators |
wait_exception |
(value=Callable) |
Dynamic wait based on the caught exception |
wait_random |
(min=0, max=1) |
Uniform random wait between min and max |
wait_exponential_jitter |
(initial=1, max=60, exp_base=2, jitter=1) |
Exponential backoff with added random jitter |
wait_none |
() |
Always returns 0 (no wait) |
Composition: Combine wait generators with +:
wait_strategy = backon.expo(base=3) + backon.constant(interval=0.5)
Stop Conditions
Stop conditions determine when retry should cease. They can be composed with | (any) and & (all).
| Condition | Description |
|---|---|
stop_after_attempt(max_attempts) |
Stop after N attempts |
stop_after_delay(max_delay) |
Stop after total elapsed time exceeds max_delay seconds |
stop_before_delay(max_delay) |
Stop if the next wait would exceed max_delay |
stop_all(*stops) |
Stop when all sub-conditions are met |
stop_any(*stops) |
Stop when any sub-condition is met |
stop_never() |
Never stop (retry indefinitely) |
stop_when_event_set(event) |
Stop when a threading.Event is set |
from backon import stop_after_attempt, stop_after_delay, stop_any
stop = stop_after_attempt(5) | stop_after_delay(30.0)
Retry Conditions
Retry conditions determine whether a retry should happen. They can be composed with | and &.
| Condition | Description |
|---|---|
retry_if_exception_type(*types) |
Retry if exception is an instance of given type(s) |
retry_if_exception(predicate) |
Retry if the exception matches a custom predicate |
retry_if_exception_message(message, match=None) |
Retry if exception message contains a string (or matches regex with match="re") |
retry_if_result(predicate) |
Retry if the return value matches a predicate |
retry_if_not_result(predicate) |
Retry if the return value does NOT match a predicate |
retry_all(*conditions) |
Retry only when all conditions pass |
retry_any(*conditions) |
Retry when any condition passes |
retry_always() |
Always retry |
retry_never() |
Never retry |
from backon import retry_if_exception_type, retry_if_exception_message, retry_all
condition = retry_all(
retry_if_exception_type(HTTPError),
retry_if_exception_message("429"),
)
Jitter
@backon.on_exception(backon.expo, ValueError, jitter=backon.full_jitter)
def f():
...
| Jitter | Effect |
|---|---|
backon.full_jitter |
Random value between 0 and the calculated wait time |
backon.random_jitter |
Adds random() to the calculated wait time (~+0.5s on average) |
None |
No jitter (deterministic waits) |
Handlers
Handlers receive a details dict with contextual information:
def handler(details):
print(f"Attempt {details['tries']}, elapsed {details['elapsed']:.2f}s")
@backon.on_exception(
backon.expo, ValueError, max_tries=3,
on_attempt=handler,
on_backoff=handler,
on_success=handler,
on_giveup=handler,
)
def f():
...
Available keys in details:
| Key | Available in |
|---|---|
target |
All |
args, kwargs |
All |
tries |
All |
elapsed |
All |
value |
on_success, on_backoff, on_giveup |
exception |
on_backoff, on_giveup |
wait |
on_backoff, before_sleep |
Global Toggle
Useful in tests to disable retry logic globally:
backon.disable() # skip retry, call function directly
backon.enable() # re-enable retry
Per-instance toggle via Retrying.enabled:
r = backon.Retrying(backon.expo, exception=ValueError, max_tries=3)
r.enabled = False
result = r.call(fn) # no retry
Async Support
All three APIs work with async functions transparently:
@backon.on_exception(backon.expo, ValueError, max_tries=3)
async def fetch():
return await api.call()
result = await backon.retry(fetch, backon.expo, exception=ValueError, max_tries=3)
async with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
result = await r.async_call(fetch)
Custom Sleep
Replace the default sleep for testing or special environments:
@backon.on_exception(
backon.expo, ValueError, max_tries=3,
sleep=lambda s: print(f"waiting {s}s"),
)
def f():
...
# With asyncio.Event for testing
import asyncio
event = asyncio.Event()
@backon.on_exception(
backon.expo, ValueError, max_tries=3,
sleep=backon.sleep_using_event(event),
)
async def f():
...
Advanced Features
Circuit Breaker
Circuit breaker with three states: CLOSED (normal), OPEN (failing), HALF_OPEN (testing recovery).
from backon._circuit_breaker import CircuitBreaker, BreakerRetrying, CircuitOpenError
breaker = BreakerRetrying(
backon.expo, max_tries=3,
breaker=CircuitBreaker(
failure_threshold=5,
recovery_timeout=60.0,
half_open_max_calls=1,
),
)
try:
result = breaker.call(fetch)
except CircuitOpenError:
print("Circuit is open, skipping request")
CircuitBreaker parameter |
Default | Description |
|---|---|---|
failure_threshold |
5 |
Consecutive failures before opening the circuit |
recovery_timeout |
60.0 |
Seconds before transitioning from OPEN to HALF_OPEN |
half_open_max_calls |
1 |
Allowed calls in HALF_OPEN state before fully closing |
name |
"" |
Identifier for the breaker |
Hedging
Run multiple retry attempts concurrently and return the first success.
from backon._hedging import hedge, HedgingRetrying
# Functional
result = hedge(fetch, backon.expo, max_hedge=3)
# Decorator
@backon.on_hedge(backon.expo, max_hedge=3)
def fetch():
...
# Context manager
with HedgingRetrying(backon.expo, max_hedge=3) as h:
result = h.call(fetch)
| Parameter | Default | Description |
|---|---|---|
max_hedge |
3 |
Number of concurrent hedged requests |
timeout |
None |
Maximum time to wait for any hedge |
on_hedge |
None |
Callback when a hedge request is sent |
Metrics
Optional Prometheus and OpenTelemetry metrics. Requires prometheus_client or opentelemetry-api to be installed.
from backon._instrumentation import PrometheusMetrics, OTelMetrics, set_metrics_collector
# Prometheus
set_metrics_collector(PrometheusMetrics())
# OpenTelemetry
set_metrics_collector(OTelMetrics(meter_name="myapp.backon"))
Metrics collected:
backon_retry_attempts_total(attempts, labeled by target and exception type)backon_retry_success_total(successes)backon_retry_failure_total(failures)backon_circuit_breaker_open_total/backon_circuit_breaker_close_totalbackon_hedge_requests_totalbackon.retry.attempt_duration(histogram, OTel only)
Testing Utilities
from backon._testing import (
disable_retries, enable_retries,
test_config, limit_retries, remove_backoff,
assert_retried, assert_not_retried,
)
# Context manager that skips retry for a block
with disable_retries():
result = fetch()
# Limit max retries in tests
with limit_retries(2):
fetch()
# Remove backoff delay entirely
with remove_backoff():
fetch()
# Assert the function was retried N times
assert_retried(fetch, expected_tries=3)
Trio Support
Retry with the trio async framework:
from backon._trio import retry_exception, retry_predicate
@retry_exception(backon.expo, ValueError, max_tries=3)
async def fetch():
...
Requires trio to be installed.
Retry Context Inspection
Check if code is running inside a retry and get the current attempt number anywhere in the call stack:
from backon import is_retrying, get_attempt_number
def log_attempt():
if is_retrying():
print(f"This is attempt #{get_attempt_number()}")
@backon.on_exception(backon.expo, ValueError, max_tries=3)
def fetch():
log_attempt()
return api.call()
Uses contextvars — thread-safe and async-safe.
Dynamic Backoff
Override the wait time per attempt by returning a float from the giveup callback. Useful for respecting Retry-After headers.
def respect_retry_after(exc: HTTPError) -> float:
return exc.response.headers.get("Retry-After", 1.0)
@backon.on_exception(backon.expo, HTTPError, giveup=respect_retry_after)
def fetch():
...
Hot Loop Detection
When 5 or more retries occur with less than 100ms between them, backon logs a warning. This helps detect misconfigured retry policies before they cause issues.
Retry Statistics
r = backon.Retrying(backon.expo, exception=ValueError, max_tries=3)
result = r.call(fetch)
print(r.statistics)
# {'start_time': ..., 'attempt_number': 2, 'idle_for': 1.5, 'elapsed': 2.3}
print(r.call_state)
# RetryCallState(fn=..., attempt_number=2, ...)
Operator Composition
Compose stops, conditions, and wait generators using Python operators:
# Stop when either condition is met
stop = stop_after_attempt(5) | stop_after_delay(30.0)
# Retry when both conditions pass
cond = retry_if_exception_type(TimeoutError) & retry_if_result(lambda x: x is None)
# Wait with combined strategy
wait = backon.expo(base=3) + backon.constant(interval=0.5)
Iterator API
for attempt in backon.Retrying(backon.expo, exception=ValueError, max_tries=3):
with attempt:
result = fetch()
if not attempt.failed:
break
Migrating from backoff
backon is a near-drop-in replacement. Change your imports:
- import backoff
+ import backon
- @backoff.on_exception(backoff.expo, ValueError, max_tries=3)
+ @backon.on_exception(backon.expo, ValueError, max_tries=3)
Key differences:
| Area | backoff | backon |
|---|---|---|
| Python support | 3.7+ | 3.10+ |
| Type hints | Partial | Full |
on_attempt callback |
Not supported | Supported |
| Context manager | Not supported | Retrying class |
| Functional API | Not supported | retry() function, RetryingCaller |
| Global toggle | Not supported | disable() / enable() |
| Custom sleep | Not supported | sleep= parameter |
| Circuit breaker | Not supported | CircuitBreaker + BreakerRetrying |
| Hedging | Not supported | hedge() / on_hedge() |
| Metrics | Not supported | Prometheus / OTel |
| Wait generator composition | Not supported | + operator |
| Stop / RetryCondition composition | Not supported | | / & operators |
| Trio | Not supported | import from backon._trio |
| Iterator API | Not supported | for attempt in Retrying(): |
| Build system | Poetry | PDM (PEP 621) |
Contributing
git clone https://github.com/Llucs/backon.git
cd backon
pip install pdm
pdm install
pdm run ruff check backon/ tests/
pdm run mypy backon/
pdm run pytest tests/ -q
License
Made by Llucs with ❤️
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file backon-3.7.0.tar.gz.
File metadata
- Download URL: backon-3.7.0.tar.gz
- Upload date:
- Size: 50.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e9f5be03d528d5aebfd085c2f1af1922d47ae08372ef3b364bb9affaaa8fde35
|
|
| MD5 |
26953a0cf7a69206209045416b1d6f84
|
|
| BLAKE2b-256 |
60d64c4fa7003b4645febd6a0d00e0eb461e0e9d9d3a823bc090162fe265d4f7
|
Provenance
The following attestation bundles were made for backon-3.7.0.tar.gz:
Publisher:
release.yml on Llucs/backon
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
backon-3.7.0.tar.gz -
Subject digest:
e9f5be03d528d5aebfd085c2f1af1922d47ae08372ef3b364bb9affaaa8fde35 - Sigstore transparency entry: 2036745256
- Sigstore integration time:
-
Permalink:
Llucs/backon@b680d1d83ff80f13224d4f25acfd59aa1a784f24 -
Branch / Tag:
refs/tags/v3.7.0 - Owner: https://github.com/Llucs
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@b680d1d83ff80f13224d4f25acfd59aa1a784f24 -
Trigger Event:
release
-
Statement type:
File details
Details for the file backon-3.7.0-py3-none-any.whl.
File metadata
- Download URL: backon-3.7.0-py3-none-any.whl
- Upload date:
- Size: 34.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6b64ecedaa87995429b38f556c66a13fd99c4fb69b7e6d22525c2ee69f38a95a
|
|
| MD5 |
1c18a7340560dd8aa974d3d3fcc252fc
|
|
| BLAKE2b-256 |
73b1ccb967edf96ef4bdd8931730c6b466b7933c51624879db7bd3e06df62d23
|
Provenance
The following attestation bundles were made for backon-3.7.0-py3-none-any.whl:
Publisher:
release.yml on Llucs/backon
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
backon-3.7.0-py3-none-any.whl -
Subject digest:
6b64ecedaa87995429b38f556c66a13fd99c4fb69b7e6d22525c2ee69f38a95a - Sigstore transparency entry: 2036746018
- Sigstore integration time:
-
Permalink:
Llucs/backon@b680d1d83ff80f13224d4f25acfd59aa1a784f24 -
Branch / Tag:
refs/tags/v3.7.0 - Owner: https://github.com/Llucs
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@b680d1d83ff80f13224d4f25acfd59aa1a784f24 -
Trigger Event:
release
-
Statement type: