Simple token throttle package
Project description
Token throttler
Token throttler is an extendable rate-limiting library somewhat based on a token bucket algorithm.
Table of contents
1. Installation
Token throttler is available on PyPI:
$ python -m pip install token-throttler
Token throttler officially supports Python >= 3.7.
2. Features
- Global throttler(s) configuration
- Configurable token throttler cost and identifier
- Multiple buckets per throttler per identifier
- Buckets can be added/removed manually or by a
dict
configuration - Manual usage or usage via decorator
- Decorator usage supports async code too
- Custom decorator can be written
- Extendable storage engine (eg. Redis)
3. Usage
Token throttler supports both manual usage and via decorator.
Decorator usage supports both async and sync.
1) Manual usage example:
from token_throttler import TokenBucket, TokenThrottler
from token_throttler.storage import RuntimeStorage
throttler: TokenThrottler = TokenThrottler(cost=1, storage=RuntimeStorage())
throttler.add_bucket(identifier="hello_world", bucket=TokenBucket(replenish_time=10, max_tokens=10))
throttler.add_bucket(identifier="hello_world", bucket=TokenBucket(replenish_time=30, max_tokens=20))
def hello_world() -> None:
print("Hello World")
for i in range(10):
throttler.consume(identifier="hello_world")
hello_world()
if throttler.consume(identifier="hello_world"):
hello_world()
else:
print("bucket_one ran out of tokens")
2) Decorator usage example:
from token_throttler import TokenBucket, TokenThrottler, TokenThrottlerException
from token_throttler.storage import RuntimeStorage
throttler: TokenThrottler = TokenThrottler(1, RuntimeStorage())
throttler.add_bucket("hello_world", TokenBucket(10, 10))
throttler.add_bucket("hello_world", TokenBucket(30, 20))
@throttler.enable("hello_world")
def hello_world() -> None:
print("Hello World")
for i in range(10):
hello_world()
try:
hello_world()
except TokenThrottlerException:
print("bucket_one ran out of tokens")
For other examples see examples directory.
4. Storage
Currently, token throttler supports RuntimeStorage
but is very easy to extend.
If you want your own storage engine, feel free to extend the token_throttler.storage.BucketStorage
class.
1) Redis storage example:
import pickle
from datetime import timedelta
from typing import Dict, List, Union
from redis import StrictRedis
from token_throttler import TokenBucket, TokenThrottler
from token_throttler.storage import BucketStorage
class RedisStorage(BucketStorage):
def __init__(self, connection_string: str, delimiter: str) -> None:
super().__init__()
self.redis: StrictRedis = StrictRedis.from_url(url=connection_string)
self.delimiter: str = delimiter
def _create_bucket(self, cache_key: str) -> TokenBucket:
bucket_info: List[str] = cache_key.split(self.delimiter)
token_bucket: TokenBucket = TokenBucket(
int(bucket_info[1]), int(bucket_info[-1])
)
token_bucket.cost = int(bucket_info[2])
token_bucket.identifier = bucket_info[0]
return token_bucket
def _delete_bucket(self, cache_key: str) -> None:
self.redis.delete(cache_key)
def _save_bucket(self, cache_key: str, bucket: TokenBucket) -> None:
self.redis.setex(
cache_key,
timedelta(seconds=bucket.replenish_time),
pickle.dumps(bucket),
)
def get_bucket(self, identifier: str, bucket_key: str) -> Union[TokenBucket, None]:
cache_key: Union[str, None] = self.get(identifier, {}).get(bucket_key, None)
if not cache_key:
return None
bucket: Union[bytes, None] = self.redis.get(cache_key)
if not bucket:
return None
return pickle.loads(bucket)
def get_all_buckets(self, identifier: str) -> Union[Dict[str, TokenBucket], None]:
buckets: Dict[str, TokenBucket] = {}
stored_buckets: Dict[str, str] = self.get(identifier, None)
if not stored_buckets:
return None
for bucket_key in stored_buckets:
bucket: Union[TokenBucket, None] = self.get_bucket(identifier, bucket_key)
if not bucket:
continue
buckets[bucket_key] = bucket
return None if not buckets else buckets
def add_bucket(self, bucket: TokenBucket) -> None:
cache_key: str = f"{self.delimiter}".join(
map(
str,
[
bucket.identifier,
bucket.replenish_time,
bucket.cost,
bucket.max_tokens,
],
)
)
self[str(bucket.identifier)][str(bucket.replenish_time)] = cache_key
self._save_bucket(cache_key, bucket)
def remove_bucket(self, identifier: str, bucket_key: str) -> None:
if identifier not in self:
return None
bucket: Union[str, None] = self.get(identifier, {}).get(bucket_key, None)
if bucket:
self._delete_bucket(self[identifier][bucket_key])
del self[identifier][bucket_key]
if not self[identifier]:
del self[identifier]
def remove_all_buckets(self, identifier: str) -> None:
if identifier not in self:
return None
for bucket_key in self[identifier]:
self._delete_bucket(self[identifier][bucket_key])
del self[identifier]
def replenish(self, bucket: TokenBucket) -> None:
pass
def consume(self, identifier: str, bucket_key: str) -> bool:
cache_key: str = self[identifier][bucket_key]
bucket: Union[TokenBucket, None] = self.get_bucket(identifier, bucket_key)
if not bucket:
bucket = self._create_bucket(cache_key)
self.add_bucket(bucket)
bucket_state: bool = bucket.consume()
self._save_bucket(cache_key, bucket)
return bucket_state
throttler: TokenThrottler = TokenThrottler(1, RedisStorage(connection_string="connection-string-to-redis", delimiter="||"))
...
5. Configuration
Token throttler supports global configuration by making use of ThrottlerConfig
class.
Configuration params:
IDENTIFIER_FAIL_SAFE
- if invalid identifier is given as a param for theconsume
method andIDENTIFIER_FAIL_SAFE
is set toTrue
, noKeyError
exception will be raised andconsume
will act like a limitless bucket is being consumed.ENABLE_THREAD_LOCK
- if set toTrue
, throttler will acquire a thread lock upon callingconsume
method and release the lock once theconsume
is finished. This avoids various race conditions at a slight performance cost.
Configuration usage
from token_throttler import ThrottlerConfig, TokenBucket, TokenThrottler
from token_throttler.storage import RuntimeStorage
ThrottlerConfig.set({
"ENABLE_THREAD_LOCK": False,
"IDENTIFIER_FAIL_SAFE": True,
})
throttler: TokenThrottler = TokenThrottler(1, RuntimeStorage())
throttler.add_bucket("hello_world", TokenBucket(10, 10))
throttler.add_bucket("hello_world", TokenBucket(30, 20))
...
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for token_throttler-1.1.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0362d3bc0660f0e70216be0af1762df39ef00c8eafe50f5a423a892d2d772c1d |
|
MD5 | d10f64ca123a133c03de394f76a0b90d |
|
BLAKE2b-256 | bb2bbad24337e87e9fbc52de51268b8e866502de82b8c16130fc163774029af4 |