Usable Circuit Breaker pattern implementation
Project description
Usable Circuit Breaker pattern implementation.
Install
$ pip install breakers
Usage
import functools
from breakers import Breaker
def circuit_breaker(time_span=20000, unit=1000, calls_limit=10,
error_limit=0.5, retry_time=10000):
def deco(func):
# Create breaker
if not hasattr(func, '__breaker__'):
func.__breaker__ = Breaker(time_span, unit, calls_limit,
error_limit, retry_time)
breaker = func.__breaker__
@functools.wraps(func)
def wraps(*args, **kwargs):
if not breaker.is_allow():
raise RuntimeError('Circuit breaker')
exc = None
try:
return func(*args, **kwargs)
except Exception as e:
exc = e
raise
finally:
if exc:
breaker.add_failure(1)
else:
breaker.add_success(1)
return wraps
return deco
@circuit_breaker()
def f():
import random
if random.randint(1, 4) in (1, 2):
raise ValueError
return 'succeed'
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
breakers-0.1.0.tar.gz
(3.3 kB
view hashes)
Built Distribution
Close
Hashes for breakers-0.1.0-py2.py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 9a61cf5c6477c850efc2916ce3e6ae13a4a8f544b70c8bfdd4be0649409c426b |
|
MD5 | a86df33126cdd90343ad70c5594bfbc1 |
|
BLAKE2b-256 | be5effa1e68eec8fd610b3d661ad48075fc39ba95bf1492a9c898167f32b7e85 |