Skip to main content

Python Rate-Limiter using Leaky-Bucket Algorimth Family

Project description

PyrateLimiter

The request rate limiter using Leaky-bucket algorithm

PyPI version PyPI - Python Versions codecov Maintenance PyPI license


Introduction

This module can be used to apply rate-limit for API request. User defines window duration and the limit of function calls within such interval.

Available modules

from pyrate_limiter import (
    BucketFullException,
    Duration,
    RequestRate,
    Limiter,
    MemoryListBucket,
    MemoryQueueBucket,
    SQLiteBucket,
    RedisBucket,
    RedisClusterBucket,
)

Bucket backends

A few different bucket backends are available, which can be selected using the bucket_class argument for Limiter. Any additional backend-specific arguments can be passed via bucket_kwargs.

Memory

The default bucket is stored in memory, backed by a queue.Queue. A list implementation is also available:

from pyrate_limiter import Limiter, MemoryListBucket

limiter = Limiter(bucket_class=MemoryListBucket)

SQLite

If you need to persist the bucket state, a SQLite backend is available.

By default it will store the state in the system temp directory, and you can use the path argument to use a different location:

from pyrate_limiter import Limiter, SQLiteBucket

limiter = Limiter(
    bucket_class=SQLiteBucket,
    bucket_kwargs={'path': '/tmp/pyrate_limiter.sqlite'},
)

Redis

If you have a larger, distributed application, Redis is an ideal backend. This option requires redis-py.

You can use the redis_pool argument to pass any connection settings:

from pyrate_limiter import Limiter, RedisBucket
from redis import ConnectionPool

redis_pool = ConnectionPool(host='localhost', port=6379, db=0)
limiter = Limiter(
    bucket_class=RedisBucket,
    bucket_kwargs={'redis_pool': redis_pool},
)

Redis clusters are also supported, which requires redis-py-cluster:

from pyrate_limiter import Limiter, RedisClusterBucket

limiter = Limiter(bucket_class=RedisClusterBucket)

Custom backends

If these don't suit your needs, you can also create your own bucket backend by extending pyrate_limiter.bucket.AbstractBucket.

Strategies

Subscription strategies

Considering API throttling logic for usual business models of Subscription, we usually see strategies somewhat similar to these.

Some commercial/free API (Linkedin, Github etc)
- 500 requests/hour, and
- 1000 requests/day, and
- maximum 10,000 requests/month
  • RequestRate class is designed to describe this strategies - eg for the above strategies we have a Rate-Limiter defined as following
hourly_rate = RequestRate(500, Duration.HOUR) # maximum 500 requests/hour
daily_rate = RequestRate(1000, Duration.DAY) # maximum 1000 requests/day
monthly_rate = RequestRate(10000, Duration.MONTH) # and so on

limiter = Limiter(hourly_rate, daily_rate, monthly_rate, *other_rates, bucket_class=MemoryListBucket) # default is MemoryQueueBucket

# usage
identity = user_id # or ip-address, or maybe both
limiter.try_acquire(identity)

As the logic is pretty self-explainatory, note that the superior rate-limit must come after the inferiors, ie 1000 req/day must be declared after an hourly-rate-limit, and the daily-limit must be larger than hourly-limit.

  • bucket_class is the type of bucket that holds request. It could be an in-memory data structure like Python List (MemoryListBucket), or Queue MemoryQueueBucket.

  • For microservices or decentralized platform, multiple rate-Limiter may share a single store for storing request-rate history, ie Redis. This lib provides ready-to-use RedisBucket (redis-py is required), and RedisClusterBucket (with redis-py-cluster being required). The usage difference is when using Redis, a naming prefix must be provide so the keys can be distinct for each item's identity.

from pyrate_limiter.bucket import RedisBucket, RedisClusterBucket
from redis import ConnectionPool

redis_pool = ConnectionPool.from_url('redis://localhost:6379')

rate = RequestRate(3, 5 * Duration.SECOND)

bucket_kwargs = {
    "redis_pool": redis_pool,
    "bucket_name": "my-ultimate-bucket-prefix"
}

# so each item buckets will have a key name as
# my-ultimate-bucket-prefix__item-identity

limiter = Limiter(rate, bucket_class=RedisBucket, bucket_kwargs=bucket_kwargs)
# or RedisClusterBucket when used with a redis cluster
# limiter = Limiter(rate, bucket_class=RedisClusterBucket, bucket_kwargs=bucket_kwargs)
item = 'vutran_item'
limiter.try_acquire(item)

BucketFullException

If the Bucket is full, an exception BucketFullException will be raised, with meta-info about the identity it received, the rate that has raised, and the remaining time until the next request can be processed.

rate = RequestRate(3, 5 * Duration.SECOND)
limiter = Limiter(rate)
item = 'vutran'

has_raised = False
try:
    for _ in range(4):
        limiter.try_acquire(item)
        sleep(1)
except BucketFullException as err:
    has_raised = True
    assert str(err)
    # Bucket for vutran with Rate 3/5 is already full
    assert isinstance(err.meta_info, dict)
    # {'error': 'Bucket for vutran with Rate 3/5 is already full', 'identity': 'tranvu', 'rate': '5/5', 'remaining_time': 2}
  • *RequestRate may be required to reset on a fixed schedule, eg: every first-day of a month

Decorator

Rate-limiting is also available in decorator form, using Limiter.ratelimit. Example:

@limiter.ratelimit(item)
def my_function():
    do_stuff()

As with Limiter.try_acquire, if calls to the wrapped function exceed the rate limits you defined, a BucketFullException will be raised.

Rate-limiting delays

In some cases, you may want to simply slow down your calls to stay within the rate limits instead of canceling them. In that case you can use the delay flag, optionally with a max_delay (in seconds) that you are willing to wait in between calls.

Example:

@limiter.ratelimit(item, delay=True, max_delay=10)
def my_function():
    do_stuff()

In this case, calls may be delayed by at most 10 seconds to stay within the rate limits; any longer than that, and a BucketFullException will be raised instead. Without specifying max_delay, calls will be delayed as long as necessary.

Contextmanager

Limiter.ratelimit also works as a contextmanager:

def my_function():
    with limiter.ratelimit(item, delay=True):
        do_stuff()

Async decorator/contextmanager

All the above features of Limiter.ratelimit also work on async functions:

@limiter.ratelimit(item, delay=True)
async def my_function():
    await do_stuff()

async def my_function():
    async with limiter.ratelimit(item):
        await do_stuff()

When delays are enabled, asyncio.sleep will be used instead of time.sleep.

Examples

To prove that pyrate-limiter is working as expected, here is a complete example to demonstrate rate-limiting with delays:

from time import perf_counter as time
from pyrate_limiter import Duration, Limiter, RequestRate

limiter = Limiter(RequestRate(5, Duration.SECOND))
n_requests = 27

@limiter.ratelimit("test", delay=True)
def limited_function(start_time):
    print(f"t + {(time() - start_time):.5f}")

start_time = time()
for _ in range(n_requests):
    limited_function(start_time)
print(f"Ran {n_requests} requests in {time() - start_time:.5f} seconds")

And an equivalent example for async usage:

import asyncio
from time import perf_counter as time
from pyrate_limiter import Duration, Limiter, RequestRate

limiter = Limiter(RequestRate(5, Duration.SECOND))
n_requests = 27

@limiter.ratelimit("test", delay=True)
async def limited_function(start_time):
    print(f"t + {(time() - start_time):.5f}")

async def test_ratelimit():
    start_time = time()
    tasks = [limited_function(start_time) for _ in range(n_requests)]
    await asyncio.gather(*tasks)
    print(f"Ran {n_requests} requests in {time() - start_time:.5f} seconds")

asyncio.run(test_ratelimit())

Limiter can be used with any custom time source. For example user may want to use Redis time to use same time in distributed installation. By default time.monotonic is used. To adjust time source use time_function parameter with any no arguments function.

from datetime import datetime
from pyrate_limiter import Duration, Limiter, RequestRate
from time import time

limiter_datetime = Limiter(RequestRate(5, Duration.SECOND), time_function=lambda: datetime.utcnow().timestamp())
limiter_time = Limiter(RequestRate(5, Duration.SECOND), time_function=time)

Spam-protection strategies

  • Sometimes, we need a rate-limiter to protect our API from spamming/ddos attack. Some usual strategies for this could be as following
1. No more than 100 requests/minute, or
2. 100 request per minute, and no more than 300 request per hour

Throttling handling

When the number of incoming requets go beyond the limit, we can either do..

1. Raise a 429 Http Error, or
2. Keep the incoming requests, wait then slowly process them one by one.

More complex scenario

https://www.keycdn.com/support/rate-limiting#types-of-rate-limits

  • *Sometimes, we may need to apply specific rate-limiting strategies based on schedules/region or some other metrics. It requires the capability to switch the strategies instantly without re-deploying the whole service.

Development

Setup & Commands

  • To setup local development, Poetry and Python 3.6 is required. Python can be installed using Pyenv or normal installation from binary source. To install poetry, follow the official guideline (https://python-poetry.org/docs/#installation).

Then, in the repository directory...

$ poetry install
  • Other than built-in Poetry commands, there are some custom commands defined in scripts.py. What you should care about are:
    • Run test with: poetry run test
    • Format code base: poetry run format
    • To run test with coverage: poetry run cover
    • Check for lint error: poetry run lint

Guideline & Notes

We have CICD running on Travis to do the checking, testing and publishing work. So, there are few small notes when making Pull Request:

  • All existing tests must pass (Of course!)
  • Reduction in Coverage shall result in failure. (below 98% is not accepted)
  • When you are making bug fixes, or adding more features, remember to bump the version number in pyproject.toml. The number should follow semantic-versioning rules

Notes

Todo-items marked with (*) are planned for v3 release.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyrate-limiter-2.6.0.tar.gz (17.0 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page