Python Rate-Limiter using Leaky-Bucket Algorimth Family
Project description
PyrateLimiter
The request rate limiter using Leaky-bucket algorithm
Introduction
This module can be used to apply rate-limit for API request. User defines window duration and the limit of function calls within such interval.
Available modules
from pyrate_limiter import (
BucketFullException,
Duration,
RequestRate,
Limiter,
MemoryListBucket,
MemoryQueueBucket,
SQLiteBucket,
RedisBucket,
RedisClusterBucket,
)
Bucket backends
A few different bucket backends are available, which can be selected using the bucket_class
argument for Limiter
. Any additional backend-specific arguments can be passed
via bucket_kwargs
.
Memory
The default bucket is stored in memory, backed by a queue.Queue
. A list implementation is also available:
from pyrate_limiter import Limiter, MemoryListBucket
limiter = Limiter(bucket_class=MemoryListBucket)
SQLite
If you need to persist the bucket state, a SQLite backend is available.
By default it will store the state in the system temp directory, and you can use
the path
argument to use a different location:
from pyrate_limiter import Limiter, SQLiteBucket
limiter = Limiter(
bucket_class=SQLiteBucket,
bucket_kwargs={'path': '/tmp/pyrate_limiter.sqlite'},
)
Redis
If you have a larger, distributed application, Redis is an ideal backend. This option requires redis-py.
You can use the redis_pool
argument to pass any connection settings:
from pyrate_limiter import Limiter, RedisBucket
from redis import ConnectionPool
redis_pool = ConnectionPool(host='localhost', port=6379, db=0)
limiter = Limiter(
bucket_class=RedisBucket,
bucket_kwargs={'redis_pool': redis_pool},
)
Redis clusters are also supported, which requires redis-py-cluster:
from pyrate_limiter import Limiter, RedisClusterBucket
limiter = Limiter(bucket_class=RedisClusterBucket)
Custom backends
If these don't suit your needs, you can also create your own bucket backend by extending pyrate_limiter.bucket.AbstractBucket
.
Strategies
Subscription strategies
Considering API throttling logic for usual business models of Subscription, we usually see strategies somewhat similar to these.
Some commercial/free API (Linkedin, Github etc)
- 500 requests/hour, and
- 1000 requests/day, and
- maximum 10,000 requests/month
-
RequestRate
class is designed to describe this strategies - eg for the above strategies we have a Rate-Limiter defined as following
hourly_rate = RequestRate(500, Duration.HOUR) # maximum 500 requests/hour
daily_rate = RequestRate(1000, Duration.DAY) # maximum 1000 requests/day
monthly_rate = RequestRate(10000, Duration.MONTH) # and so on
limiter = Limiter(hourly_rate, daily_rate, monthly_rate, *other_rates, bucket_class=MemoryListBucket) # default is MemoryQueueBucket
# usage
identity = user_id # or ip-address, or maybe both
limiter.try_acquire(identity)
As the logic is pretty self-explainatory, note that the superior rate-limit must come after the inferiors, ie 1000 req/day must be declared after an hourly-rate-limit, and the daily-limit must be larger than hourly-limit.
-
bucket_class
is the type of bucket that holds request. It could be an in-memory data structure like Python List (MemoryListBucket
), or QueueMemoryQueueBucket
. -
For microservices or decentralized platform, multiple rate-Limiter may share a single store for storing request-rate history, ie
Redis
. This lib provides ready-to-useRedisBucket
(redis-py
is required), andRedisClusterBucket
(withredis-py-cluster
being required). The usage difference is when using Redis, a namingprefix
must be provide so the keys can be distinct for each item's identity.
from pyrate_limiter.bucket import RedisBucket, RedisClusterBucket
from redis import ConnectionPool
redis_pool = ConnectionPool.from_url('redis://localhost:6379')
rate = RequestRate(3, 5 * Duration.SECOND)
bucket_kwargs = {
"redis_pool": redis_pool,
"bucket_name": "my-ultimate-bucket-prefix"
}
# so each item buckets will have a key name as
# my-ultimate-bucket-prefix__item-identity
limiter = Limiter(rate, bucket_class=RedisBucket, bucket_kwargs=bucket_kwargs)
# or RedisClusterBucket when used with a redis cluster
# limiter = Limiter(rate, bucket_class=RedisClusterBucket, bucket_kwargs=bucket_kwargs)
item = 'vutran_item'
limiter.try_acquire(item)
BucketFullException
If the Bucket is full, an exception BucketFullException
will be raised, with meta-info about the identity it received, the rate that has raised, and the remaining time until the next request can be processed.
rate = RequestRate(3, 5 * Duration.SECOND)
limiter = Limiter(rate)
item = 'vutran'
has_raised = False
try:
for _ in range(4):
limiter.try_acquire(item)
sleep(1)
except BucketFullException as err:
has_raised = True
assert str(err)
# Bucket for vutran with Rate 3/5 is already full
assert isinstance(err.meta_info, dict)
# {'error': 'Bucket for vutran with Rate 3/5 is already full', 'identity': 'tranvu', 'rate': '5/5', 'remaining_time': 2}
- *RequestRate may be required to
reset
on a fixed schedule, eg: every first-day of a month
Decorator
Rate-limiting is also available in decorator form, using Limiter.ratelimit
. Example:
@limiter.ratelimit(item)
def my_function():
do_stuff()
As with Limiter.try_acquire
, if calls to the wrapped function exceed the rate limits you
defined, a BucketFullException
will be raised.
Rate-limiting delays
In some cases, you may want to simply slow down your calls to stay within the rate limits instead of
canceling them. In that case you can use the delay
flag, optionally with a max_delay
(in seconds) that you are willing to wait in between calls.
Example:
@limiter.ratelimit(item, delay=True, max_delay=10)
def my_function():
do_stuff()
In this case, calls may be delayed by at most 10 seconds to stay within the rate limits; any longer
than that, and a BucketFullException
will be raised instead. Without specifying max_delay
, calls
will be delayed as long as necessary.
Contextmanager
Limiter.ratelimit
also works as a contextmanager:
def my_function():
with limiter.ratelimit(item, delay=True):
do_stuff()
Async decorator/contextmanager
All the above features of Limiter.ratelimit
also work on async functions:
@limiter.ratelimit(item, delay=True)
async def my_function():
await do_stuff()
async def my_function():
async with limiter.ratelimit(item):
await do_stuff()
When delays are enabled, asyncio.sleep
will be used instead of time.sleep
.
Examples
To prove that pyrate-limiter is working as expected, here is a complete example to demonstrate rate-limiting with delays:
from time import perf_counter as time
from pyrate_limiter import Duration, Limiter, RequestRate
limiter = Limiter(RequestRate(5, Duration.SECOND))
n_requests = 27
@limiter.ratelimit("test", delay=True)
def limited_function(start_time):
print(f"t + {(time() - start_time):.5f}")
start_time = time()
for _ in range(n_requests):
limited_function(start_time)
print(f"Ran {n_requests} requests in {time() - start_time:.5f} seconds")
And an equivalent example for async usage:
import asyncio
from time import perf_counter as time
from pyrate_limiter import Duration, Limiter, RequestRate
limiter = Limiter(RequestRate(5, Duration.SECOND))
n_requests = 27
@limiter.ratelimit("test", delay=True)
async def limited_function(start_time):
print(f"t + {(time() - start_time):.5f}")
async def test_ratelimit():
start_time = time()
tasks = [limited_function(start_time) for _ in range(n_requests)]
await asyncio.gather(*tasks)
print(f"Ran {n_requests} requests in {time() - start_time:.5f} seconds")
asyncio.run(test_ratelimit())
Limiter can be used with any custom time source. For example user may want to use
Redis time to use same time in distributed installation. By default time.monotonic
is
used. To adjust time source use time_function
parameter with any no arguments function.
from datetime import datetime
from pyrate_limiter import Duration, Limiter, RequestRate
from time import time
limiter_datetime = Limiter(RequestRate(5, Duration.SECOND), time_function=lambda: datetime.utcnow().timestamp())
limiter_time = Limiter(RequestRate(5, Duration.SECOND), time_function=time)
Spam-protection strategies
- Sometimes, we need a rate-limiter to protect our API from spamming/ddos attack. Some usual strategies for this could be as following
1. No more than 100 requests/minute, or
2. 100 request per minute, and no more than 300 request per hour
Throttling handling
When the number of incoming requets go beyond the limit, we can either do..
1. Raise a 429 Http Error, or
2. Keep the incoming requests, wait then slowly process them one by one.
More complex scenario
https://www.keycdn.com/support/rate-limiting#types-of-rate-limits
- *Sometimes, we may need to apply specific rate-limiting strategies based on schedules/region or some other metrics. It
requires the capability to
switch
the strategies instantly without re-deploying the whole service.
Development
Setup & Commands
- To setup local development, Poetry and Python 3.6 is required. Python can be installed using Pyenv or normal installation from binary source. To install poetry, follow the official guideline (https://python-poetry.org/docs/#installation).
Then, in the repository directory...
$ poetry install
-
Other than built-in Poetry commands, there are some custom commands defined in scripts.py. What you should care about are:
- Run test with:
poetry run test
- To run test with coverage:
poetry run cover
- Format & check for lint error:
poetry run lint
- Run test with:
-
Every commit will be checked locally with pre-commit.
Guideline & Notes
We have GitHub Action CICD to do the checking, testing and publishing work. So, there are few small notes when making Pull Request:
- All existing tests must pass (Of course!)
- Reduction in Coverage shall result in failure. (below 98% is not accepted)
- When you are making bug fixes, or adding more features, remember to bump the version number in pyproject.toml. The number should follow semantic-versioning rules
Notes
Todo-items marked with (*) are planned for v3 release.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.