Leaky bucket implementation in Python with different persistence options
Py Leaky Bucket
An implementation of the leaky bucket algorithm in Python, with different persistence options for use in high-throughput, multi-process or multi-worker applications. It is useful when managing integrations with several APIs that each have different rate limits.
What's in it?
The package includes:
- Leaky Bucket Algorithm: A flexible rate-limiting algorithm that supports various persistence backends.
- Persistence Backends:
  - In-Memory: Fast, lightweight, and well suited to single-process applications.
  - Redis: Suitable for distributed, multi-worker environments.
  - SQLite: A file-based backend for concurrent access in single-node setups. It is not the best choice for high throughput, because SQLite has to lock the database for each update; prefer the Redis backend where possible.
- Hourly Limit Support: Control total operations over a rolling hourly window in addition to per-second rate limits.
- Thread-Safe and Process-Safe: Implements proper locking mechanisms to ensure safe concurrent usage.
- Asynchronous and Synchronous: Works in both asyncio-based and synchronous applications.
- Decorators and Context Managers: Simplify integration with your existing functions and methods.
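To make the rate-limiting and hourly-limit features concrete, here is a minimal in-memory sketch of the two ideas. It is not the package's implementation: the class names `SketchLeakyBucket` and `SketchRollingWindow`, the `try_acquire` method, and the injectable `clock` argument are all assumptions made for illustration. Only the `max_rate` and `time_period` parameter names are borrowed from the usage examples.

```python
import time
from collections import deque


class SketchLeakyBucket:
    """Hypothetical leaky bucket: the level drains at max_rate / time_period."""

    def __init__(self, max_rate, time_period, clock=time.monotonic):
        self.max_rate = max_rate  # bucket capacity, in requests
        self.leak_per_second = max_rate / time_period
        self.clock = clock
        self.level = 0.0  # current fill level
        self.last_check = clock()

    def _leak(self):
        # Drain the bucket in proportion to the time elapsed since the last check.
        now = self.clock()
        elapsed = now - self.last_check
        self.level = max(0.0, self.level - elapsed * self.leak_per_second)
        self.last_check = now

    def try_acquire(self):
        """Add one unit and return True if there is room, otherwise return False."""
        self._leak()
        if self.level + 1 <= self.max_rate:
            self.level += 1
            return True
        return False


class SketchRollingWindow:
    """Hypothetical cap on total operations inside a sliding window."""

    def __init__(self, limit, window_seconds=3600, clock=time.monotonic):
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self.stamps = deque()  # timestamps of admitted operations

    def try_acquire(self):
        now = self.clock()
        # Drop timestamps that have aged out of the window.
        while self.stamps and now - self.stamps[0] >= self.window_seconds:
            self.stamps.popleft()
        if len(self.stamps) < self.limit:
            self.stamps.append(now)
            return True
        return False
```

A blocking limiter would typically call `try_acquire` in a loop and sleep between attempts; a persistence backend would store the level and timestamps somewhere shared instead of on the instance.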
Installation
Easy to install:
pip install leaky-bucket-py
Usage:
Redis backend with async bucket:
import asyncio
import redis
from leakybucket.bucket import AsyncLeakyBucket
from leakybucket.persistence.redis import RedisLeakyBucketStorage

# Connect to Redis
redis_conn = redis.Redis(host='localhost', port=6379, db=0)

# Create a new Redis storage backend
storage = RedisLeakyBucketStorage(
    redis_conn,
    redis_key="api_bucket",
    max_rate=5,
    time_period=1
)

# Create a new LeakyBucket instance
bucket = AsyncLeakyBucket(storage)

# Make requests using the bucket as a context manager
async def make_requests():
    async def make_request():
        async with bucket:  # block if the rate limit is exceeded
            print("Making request")
            await asyncio.sleep(1)

    await asyncio.gather(*[make_request() for i in range(10)])

# or use a decorator to rate limit a coroutine
@bucket.throttle()
async def make_request(index):
    print(f"Making request {index}")
    await asyncio.sleep(1)

async def main():
    await make_requests()
    await asyncio.gather(*[make_request(i) for i in range(10)])

asyncio.run(main())
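The context-manager and decorator forms shown above are closely related: a throttling decorator can be written as a thin wrapper that enters an async context manager around each call. The sketch below illustrates that relationship only. The `throttled` helper is hypothetical and is not the package's `throttle()` implementation, and `asyncio.Semaphore` is used purely as a stand-in limiter (it caps concurrency rather than rate).

```python
import asyncio
import functools


def throttled(limiter):
    """Hypothetical helper: run each call of a coroutine inside `async with limiter`."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            async with limiter:  # waits here until the limiter admits the call
                return await func(*args, **kwargs)
        return wrapper
    return decorator


async def demo():
    # Stand-in limiter: at most two calls may be inside the wrapper at once.
    limiter = asyncio.Semaphore(2)
    running = 0
    peak = 0

    @throttled(limiter)
    async def work(i):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return i * 2

    results = await asyncio.gather(*[work(i) for i in range(6)])
    return results, peak
```

Any object that supports `async with` could be passed as `limiter`, which is why a bucket that already works as a context manager can offer a decorator with little extra code.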
Memory backend:
Synchronous:
import httpx
from leakybucket.bucket import LeakyBucket
from leakybucket.persistence.memory import InMemoryLeakyBucketStorage

# Create a new Memory storage backend (3 requests per second)
storage = InMemoryLeakyBucketStorage(max_rate=3, time_period=1)

# Create a new LeakyBucket instance
throttler = LeakyBucket(storage)

@throttler.throttle()
def fetch_data(api_url: str):
    response = httpx.get(api_url)
    data = response.json()
    print(data)
    return data

def main():
    # make multiple requests
    api_url = "https://jsonplaceholder.typicode.com/posts/1"
    results = []
    for _ in range(10):
        results.append(fetch_data(api_url))
    print(results)

main()
Asynchronous:
import asyncio
import httpx
from leakybucket.bucket import AsyncLeakyBucket
from leakybucket.persistence.memory import InMemoryLeakyBucketStorage

# Create a new Memory storage backend (3 requests per second)
storage = InMemoryLeakyBucketStorage(max_rate=3, time_period=1)

# Create a new LeakyBucket instance
async_throttler = AsyncLeakyBucket(storage)

@async_throttler.throttle()
async def async_fetch_data(api_url):
    async with httpx.AsyncClient() as client:
        response = await client.get(api_url)
        data = response.json()
        print(data)
        return data

async def main():
    # make multiple requests
    api_url = "https://jsonplaceholder.typicode.com/posts/1"
    tasks = [async_fetch_data(api_url) for _ in range(10)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
SQLite backend:
import time
from leakybucket.bucket import LeakyBucket
from leakybucket.persistence.sqlite import SqliteLeakyBucketStorage

# Create a shared SQLite bucket
bucket = LeakyBucket(
    SqliteLeakyBucketStorage(
        db_path="leakybucket.db",
        max_rate=10,
        time_period=10
    )
)

# Decorate the function
@bucket.throttle()
def make_request(index):
    print(f"Making request {index}")

def main():
    for i in range(35):
        make_request(i)

main()
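The features list notes that the SQLite backend has to lock the database. The sketch below shows one way a shared bucket could do its read-modify-write safely with the standard `sqlite3` module, by opening a `BEGIN IMMEDIATE` transaction so that only one process updates the level at a time. This is an illustration under assumptions, not the package's actual schema or code: the `bucket` table, its columns, and the `open_bucket_db` and `try_acquire` functions are all hypothetical.

```python
import sqlite3
import time


def open_bucket_db(path):
    # isolation_level=None puts sqlite3 in autocommit mode so BEGIN can be issued by hand.
    conn = sqlite3.connect(path, timeout=5.0, isolation_level=None)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS bucket ("
        "name TEXT PRIMARY KEY, level REAL NOT NULL, updated REAL NOT NULL)"
    )
    return conn


def try_acquire(conn, name, max_rate, time_period, now=None):
    """Leak, then add one unit if the bucket has room, all inside one write transaction."""
    now = time.time() if now is None else now
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading the level
    try:
        row = conn.execute(
            "SELECT level, updated FROM bucket WHERE name = ?", (name,)
        ).fetchone()
        level, updated = row if row else (0.0, now)
        # Drain the bucket in proportion to the time since the last update.
        level = max(0.0, level - (now - updated) * max_rate / time_period)
        allowed = level + 1 <= max_rate
        if allowed:
            level += 1
        conn.execute(
            "INSERT OR REPLACE INTO bucket (name, level, updated) VALUES (?, ?, ?)",
            (name, level, now),
        )
        conn.execute("COMMIT")
        return allowed
    except Exception:
        conn.execute("ROLLBACK")
        raise
```

Because every call serializes on the write lock, throughput is bounded by how quickly SQLite can commit, which is consistent with the recommendation to prefer the Redis backend for high-throughput workloads.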