Simple token throttle package

Token throttler

Token throttler is an extendable rate-limiting library loosely based on the token bucket algorithm.
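
For readers unfamiliar with the idea, here is a minimal token bucket sketch using only the standard library. It is not the library's implementation: `SimpleBucket` and its `clock` parameter are hypothetical names, and the sketch assumes the bucket is refilled to full capacity once `replenish_time` seconds have passed.

```python
import time
from typing import Callable


class SimpleBucket:
    """Hypothetical stand-in for illustration, not token_throttler's TokenBucket."""

    def __init__(
        self,
        replenish_time: float,
        max_tokens: int,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.replenish_time = replenish_time
        self.max_tokens = max_tokens
        self.tokens = max_tokens
        self._clock = clock
        self._last_refill = clock()

    def consume(self, cost: int = 1) -> bool:
        now = self._clock()
        # Assumption: refill to full capacity once the window has elapsed.
        if now - self._last_refill >= self.replenish_time:
            self.tokens = self.max_tokens
            self._last_refill = now
        if self.tokens < cost:
            return False
        self.tokens -= cost
        return True


# A fake clock keeps the example deterministic.
fake_now = [0.0]
bucket = SimpleBucket(replenish_time=10, max_tokens=2, clock=lambda: fake_now[0])
results = [bucket.consume() for _ in range(3)]  # the third call is rejected
fake_now[0] = 10.0  # pretend ten seconds have passed
after_refill = bucket.consume()
```

Passing the clock in as a parameter is only a convenience for testing; real code would normally rely on the default monotonic clock.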

Table of contents

  1. Installation
  2. Features
  3. Usage
    1. Manual usage example
    2. Decorator usage example
  4. Storage
    1. Redis storage example
  5. Configuration
    1. Configuration usage

1. Installation

Token throttler is available on PyPI:

$ python -m pip install token-throttler

Token throttler officially supports Python >= 3.7.

2. Features

  • Global throttler(s) configuration
  • Configurable token throttler cost and identifier
  • Multiple buckets per throttler per identifier
  • Buckets can be added/removed manually or by a dict configuration
  • Manual usage or usage via decorator
  • Decorator usage supports async code too
  • Custom decorators can be written
  • Extendable storage engine (e.g. Redis)
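
To make "multiple buckets per throttler per identifier" concrete, the sketch below keeps two limits for one identifier and grants a request only when every bucket still has tokens. This is an assumption about how combined limits are usually enforced, not a description of the library's internals; `CountBucket` and `consume_all` are hypothetical names.

```python
from typing import Dict, List


class CountBucket:
    """Hypothetical bucket that only tracks a token count (no refill)."""

    def __init__(self, max_tokens: int) -> None:
        self.tokens = max_tokens


def consume_all(buckets: List[CountBucket], cost: int = 1) -> bool:
    # Check every bucket first so a rejected request deducts nothing.
    if any(bucket.tokens < cost for bucket in buckets):
        return False
    for bucket in buckets:
        bucket.tokens -= cost
    return True


# Two limits for the same identifier, e.g. a short-term and a long-term one.
buckets_by_identifier: Dict[str, List[CountBucket]] = {
    "hello_world": [CountBucket(10), CountBucket(20)],
}

allowed = [consume_all(buckets_by_identifier["hello_world"]) for _ in range(11)]
remaining = [bucket.tokens for bucket in buckets_by_identifier["hello_world"]]
```

Here the smaller bucket is exhausted after ten requests, so the eleventh is rejected even though the larger bucket still holds tokens.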

3. Usage

Token throttler can be used manually or via a decorator.

The decorator supports both sync and async functions.
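
One common way for a single decorator to cover both sync and async functions is to check the wrapped function with `inspect.iscoroutinefunction`. The sketch below illustrates that pattern with the standard library only; `throttled`, `make_budget` and `OutOfTokens` are hypothetical names, and nothing here is taken from the library's own decorator.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable


class OutOfTokens(Exception):
    """Hypothetical exception raised when no tokens are left."""


def throttled(try_consume: Callable[[], bool]) -> Callable:
    def decorator(func: Callable) -> Callable:
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                if not try_consume():
                    raise OutOfTokens(func.__name__)
                return await func(*args, **kwargs)
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            if not try_consume():
                raise OutOfTokens(func.__name__)
            return func(*args, **kwargs)
        return sync_wrapper
    return decorator


def make_budget(tokens: int) -> Callable[[], bool]:
    """Return a callable that succeeds `tokens` times, then fails."""
    remaining = [tokens]

    def try_consume() -> bool:
        if remaining[0] <= 0:
            return False
        remaining[0] -= 1
        return True
    return try_consume


@throttled(make_budget(1))
def greet() -> str:
    return "Hello World"


@throttled(make_budget(1))
async def greet_async() -> str:
    return "Hello async World"


sync_result = greet()
async_result = asyncio.run(greet_async())
try:
    greet()  # the single token is already spent
    second_call_blocked = False
except OutOfTokens:
    second_call_blocked = True
```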

1) Manual usage example:

from token_throttler import TokenBucket, TokenThrottler
from token_throttler.storage import RuntimeStorage

throttler: TokenThrottler = TokenThrottler(cost=1, storage=RuntimeStorage())
throttler.add_bucket(identifier="hello_world", bucket=TokenBucket(replenish_time=10, max_tokens=10))
throttler.add_bucket(identifier="hello_world", bucket=TokenBucket(replenish_time=30, max_tokens=20))


def hello_world() -> None:
    print("Hello World")


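# The first ten calls fit within the 10-token bucket.
for i in range(10):
    throttler.consume(identifier="hello_world")
    hello_world()

# consume() returns False when the identifier has run out of tokens.
if throttler.consume(identifier="hello_world"):
    hello_world()
else:
    print("bucket_one ran out of tokens")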

2) Decorator usage example:

from token_throttler import TokenBucket, TokenThrottler, TokenThrottlerException
from token_throttler.storage import RuntimeStorage

throttler: TokenThrottler = TokenThrottler(1, RuntimeStorage())
throttler.add_bucket("hello_world", TokenBucket(10, 10))
throttler.add_bucket("hello_world", TokenBucket(30, 20))


@throttler.enable("hello_world")
def hello_world() -> None:
    print("Hello World")


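# The first ten calls fit within the 10-token bucket.
for i in range(10):
    hello_world()

# The eleventh call has no tokens left, so the decorator raises.
try:
    hello_world()
except TokenThrottlerException:
    print("bucket_one ran out of tokens")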

For other examples, see the examples directory.

4. Storage

Currently, token throttler ships with RuntimeStorage only, but it is easy to extend. To write your own storage engine, subclass token_throttler.storage.BucketStorage.

1) Redis storage example:

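# Keeps the list[...] and dict[...] type hints below valid on Python 3.7 and 3.8.
from __future__ import annotations

import pickle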
from datetime import timedelta
from typing import Union

from redis import StrictRedis

from token_throttler import TokenBucket, TokenThrottler
from token_throttler.storage import BucketStorage


class RedisStorage(BucketStorage):
    def __init__(self, connection_string: str, delimiter: str) -> None:
        super().__init__()
        self.redis: StrictRedis = StrictRedis.from_url(url=connection_string)
        self.delimiter: str = delimiter

    def _create_bucket(self, cache_key: str) -> TokenBucket:
        bucket_info: list[str] = cache_key.split(self.delimiter)
        token_bucket: TokenBucket = TokenBucket(
            int(bucket_info[1]), int(bucket_info[-1])
        )
        token_bucket.cost = int(bucket_info[2])
        token_bucket.identifier = bucket_info[0]
        return token_bucket

    def _delete_bucket(self, cache_key: str) -> None:
        self.redis.delete(cache_key)

    def _save_bucket(self, cache_key: str, bucket: TokenBucket) -> None:
        self.redis.setex(
            cache_key,
            timedelta(seconds=bucket.replenish_time),
            pickle.dumps(bucket),
        )

    def get_bucket(self, identifier: str, bucket_key: str) -> Union[TokenBucket, None]:
        cache_key: Union[str, None] = self.get(identifier, {}).get(bucket_key, None)
        if not cache_key:
            return None
        bucket: Union[bytes, None] = self.redis.get(cache_key)
        if not bucket:
            return None
        return pickle.loads(bucket)

    def get_all_buckets(self, identifier: str) -> Union[dict[str, TokenBucket], None]:
        buckets: dict[str, TokenBucket] = {}
        stored_buckets: dict[str, str] = self.get(identifier, None)
        if not stored_buckets:
            return None
        for bucket_key in stored_buckets:
            bucket: Union[TokenBucket, None] = self.get_bucket(identifier, bucket_key)
            if not bucket:
                continue
            buckets[bucket_key] = bucket
        return None if not buckets else buckets

    def add_bucket(self, bucket: TokenBucket) -> None:
        cache_key: str = f"{self.delimiter}".join(
            map(
                str,
                [
                    bucket.identifier,
                    bucket.replenish_time,
                    bucket.cost,
                    bucket.max_tokens,
                ],
            )
        )
        self[str(bucket.identifier)][str(bucket.replenish_time)] = cache_key
        self._save_bucket(cache_key, bucket)

    def remove_bucket(self, identifier: str, bucket_key: str) -> None:
        if identifier not in self:
            return None
        bucket: Union[str, None] = self.get(identifier, {}).get(bucket_key, None)
        if bucket:
            self._delete_bucket(self[identifier][bucket_key])
            del self[identifier][bucket_key]
        if not self[identifier]:
            del self[identifier]

    def remove_all_buckets(self, identifier: str) -> None:
        if identifier not in self:
            return None
        for bucket_key in self[identifier]:
            self._delete_bucket(self[identifier][bucket_key])
        del self[identifier]

    def replenish(self, bucket: TokenBucket) -> None:
        pass

    def consume(self, identifier: str, bucket_key: str) -> bool:
        cache_key: str = self[identifier][bucket_key]
        bucket: Union[TokenBucket, None] = self.get_bucket(identifier, bucket_key)
        if not bucket:
            bucket = self._create_bucket(cache_key)
            self.add_bucket(bucket)
        bucket_state: bool = bucket.consume()
        self._save_bucket(cache_key, bucket)
        return bucket_state


throttler: TokenThrottler = TokenThrottler(1, RedisStorage(connection_string="connection-string-to-redis", delimiter="||"))
...
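
In the Redis example above, add_bucket builds the cache key by joining identifier, replenish_time, cost and max_tokens with the delimiter, and _create_bucket splits the key again to rebuild an expired bucket. The round trip can be sketched on its own as follows; `BucketSpec`, `build_cache_key` and `parse_cache_key` are hypothetical helper names used only for illustration.

```python
from typing import NamedTuple


class BucketSpec(NamedTuple):
    """Hypothetical container for the four fields stored in the cache key."""

    identifier: str
    replenish_time: int
    cost: int
    max_tokens: int


def build_cache_key(spec: BucketSpec, delimiter: str) -> str:
    # Same field order as the Redis example: identifier, replenish_time, cost, max_tokens.
    return delimiter.join(map(str, spec))


def parse_cache_key(cache_key: str, delimiter: str) -> BucketSpec:
    # Raises ValueError if the key does not split into exactly four parts.
    identifier, replenish_time, cost, max_tokens = cache_key.split(delimiter)
    return BucketSpec(identifier, int(replenish_time), int(cost), int(max_tokens))


spec = BucketSpec(identifier="hello_world", replenish_time=10, cost=1, max_tokens=10)
key = build_cache_key(spec, delimiter="||")
restored = parse_cache_key(key, delimiter="||")
```

Because the key is parsed by splitting, an identifier that itself contains the delimiter would break the round trip, so the delimiter should be a string that never appears in identifiers.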

5. Configuration

Token throttler supports global configuration through the ThrottlerConfig class.

Configuration params:

  • IDENTIFIER_FAIL_SAFE - if set to True and an invalid identifier is passed to the consume method, no KeyError is raised and consume behaves as if a limitless bucket were being consumed.
  • ENABLE_THREAD_LOCK - if set to True, the throttler acquires a thread lock when consume is called and releases it once consume has finished. This avoids race conditions at a slight performance cost.
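
The effect of the two parameters can be sketched with a small standard-library model. `MiniThrottler` is a hypothetical class written for this illustration, and its constructor flags only mimic the behaviour described above; it does not use ThrottlerConfig and is not the library's code.

```python
import threading
from typing import Dict, List


class MiniThrottler:
    """Hypothetical model of the two configuration flags."""

    def __init__(self, identifier_fail_safe: bool, enable_thread_lock: bool) -> None:
        self.identifier_fail_safe = identifier_fail_safe
        self.enable_thread_lock = enable_thread_lock
        self._lock = threading.Lock()
        self._tokens: Dict[str, int] = {}

    def add_bucket(self, identifier: str, max_tokens: int) -> None:
        self._tokens[identifier] = max_tokens

    def _consume_unlocked(self, identifier: str) -> bool:
        if identifier not in self._tokens:
            if self.identifier_fail_safe:
                return True  # behave like a limitless bucket
            raise KeyError(identifier)
        if self._tokens[identifier] <= 0:
            return False
        self._tokens[identifier] -= 1
        return True

    def consume(self, identifier: str) -> bool:
        if self.enable_thread_lock:
            # Serialise the check-and-decrement so threads cannot interleave.
            with self._lock:
                return self._consume_unlocked(identifier)
        return self._consume_unlocked(identifier)


safe = MiniThrottler(identifier_fail_safe=True, enable_thread_lock=True)
safe.add_bucket("hello_world", 100)
granted: List[bool] = []


def worker() -> None:
    for _ in range(50):
        granted.append(safe.consume("hello_world"))


threads = [threading.Thread(target=worker) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

granted_count = sum(granted)  # 400 attempts compete for 100 tokens
unknown_allowed = safe.consume("missing")  # fail-safe: no KeyError

strict = MiniThrottler(identifier_fail_safe=False, enable_thread_lock=True)
try:
    strict.consume("missing")
    strict_raised = False
except KeyError:
    strict_raised = True
```

With the lock enabled, exactly as many requests are granted as there are tokens, regardless of how the threads are scheduled.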

1) Configuration usage:

from token_throttler import ThrottlerConfig, TokenBucket, TokenThrottler
from token_throttler.storage import RuntimeStorage

ThrottlerConfig.set({
    "ENABLE_THREAD_LOCK": False,
    "IDENTIFIER_FAIL_SAFE": True,
})
throttler: TokenThrottler = TokenThrottler(1, RuntimeStorage())
throttler.add_bucket("hello_world", TokenBucket(10, 10))
throttler.add_bucket("hello_world", TokenBucket(30, 20))
...
