Skip to main content

A flexible API rate limiting library for Python

Project description

Ratehawk

Ratehawk is a flexible API rate limiting library for Python. It allows you to easily implement rate limiting in your applications to prevent abuse and ensure fair usage of your APIs.

Installation

You can install Ratehawk using pip:

pip install ratehawk

Usage

Here are some examples of how to use Ratehawk in your Python applications:

Basic Usage

from ratehawk import RateLimiter

# Create a rate limiter with a limit of 10 requests per minute
rate_limiter = RateLimiter(limits=[(10, 60)])

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Redis Storage

from ratehawk import RateLimiter
from ratehawk.storage.redis import RedisStorage
from redis import Redis

# Create a Redis client
redis_client = Redis(host='localhost', port=6379, db=0)

# Create a rate limiter with Redis storage
rate_limiter = RateLimiter(limits=[(10, 60)], storage=RedisStorage(redis_client))

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Postgres Storage

from ratehawk import RateLimiter
from ratehawk.storage.postgres import PostgresStorage

# Create a rate limiter with Postgres storage
rate_limiter = RateLimiter(limits=[(10, 60)], storage=PostgresStorage(dsn="postgresql://localhost/ratehawk"))

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using SQLite Storage

from ratehawk import RateLimiter
from ratehawk.storage.sqlite import SQLiteStorage

# Create a rate limiter with SQLite storage
rate_limiter = RateLimiter(limits=[(10, 60)], storage=SQLiteStorage(db_path="ratehawk.db"))

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using RateLimitMetrics

from ratehawk import RateLimiter
from ratehawk.monitoring.metrics import RateLimitMetrics

# Create a rate limiter with metrics monitoring
rate_limiter = RateLimiter(limits=[(10, 60)])
rate_limiter.metrics = RateLimitMetrics()

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using RateLimitLogger

from ratehawk import RateLimiter
from ratehawk.monitoring.logging import RateLimitLogger

# Create a rate limiter with logging
rate_limiter = RateLimiter(limits=[(10, 60)])
rate_limiter.logger = RateLimitLogger()

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using RateLimitEvent

from ratehawk import RateLimiter
from ratehawk.monitoring.events import RateLimitEvent, EventEmitter

# Create a rate limiter with event monitoring
rate_limiter = RateLimiter(limits=[(10, 60)])
rate_limiter.events = EventEmitter()

# Register event handlers
def on_limit_exceeded(key, retry_after):
    print(f"Rate limit exceeded for key: {key}, retry after: {retry_after}s")

def on_near_limit(key, current, limit):
    print(f"Near rate limit for key: {key}, current: {current}, limit: {limit}")

def on_reset(key):
    print(f"Rate limit reset for key: {key}")

rate_limiter.events.on(RateLimitEvent.LIMIT_EXCEEDED, on_limit_exceeded)
rate_limiter.events.on(RateLimitEvent.NEAR_LIMIT, on_near_limit)
rate_limiter.events.on(RateLimitEvent.RESET, on_reset)

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Multiple Rate Limits per Key

from ratehawk import RateLimiter

# Create a rate limiter with multiple rate limits per key
rate_limiter = RateLimiter(limits=[(10, 60), (100, 3600)])

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Rate Limit Groups and Hierarchies

from ratehawk import RateLimiter

# Create a rate limiter with rate limit groups and hierarchies
rate_limiter = RateLimiter(
    limits=[(10, 60)],
    burst_limits=[(20, 15)],
    quota_limits=[(1000, 86400)]
)

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Dynamic Rate Limits

from ratehawk import RateLimiter

# Function to dynamically determine rate limits based on user attributes or time of day
def dynamic_limits_func():
    # Example: Different rate limits for different user roles
    user_role = get_user_role()
    if user_role == "premium":
        return {"limits": [(100, 60)], "burst_limits": [(200, 15)], "quota_limits": [(5000, 86400)]}
    else:
        return {"limits": [(10, 60)], "burst_limits": [(20, 15)], "quota_limits": [(1000, 86400)]}

# Create a rate limiter with dynamic rate limits
rate_limiter = RateLimiter(
    limits=[(10, 60)],
    dynamic_limits_func=dynamic_limits_func
)

# Apply dynamic rate limits
await rate_limiter.apply_dynamic_limits()

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Rate Limit Quotas

from ratehawk import RateLimiter

# Create a rate limiter with rate limit quotas over longer periods
rate_limiter = RateLimiter(
    limits=[(10, 60)],
    quota_limits=[(1000, 86400)]
)

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

# Check if the quota limit is exceeded
if await rate_limiter.check_quota():
    print("Quota limit not exceeded")
else:
    print("Quota limit exceeded")

# Increment the quota limit counter
try:
    await rate_limiter.increment_quota()
    print("Request allowed")
except RateLimitExceeded:
    print("Quota limit exceeded, try again later")

Examples

For more detailed examples, please refer to the following files in the docs folder:

License

This project is licensed under the MIT License. See the LICENSE file for more details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ratehawk-0.1.5.tar.gz (14.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ratehawk-0.1.5-py3-none-any.whl (14.3 kB view details)

Uploaded Python 3

File details

Details for the file ratehawk-0.1.5.tar.gz.

File metadata

  • Download URL: ratehawk-0.1.5.tar.gz
  • Upload date:
  • Size: 14.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.3

File hashes

Hashes for ratehawk-0.1.5.tar.gz
Algorithm Hash digest
SHA256 33f99264281b3b9e6455dfb385fc718af4a1affaa85d3a3d2a91dac4095ba454
MD5 01fae2472ff3936920df1569e9a92150
BLAKE2b-256 c9e4948d1b3d305ad599813c64036eb58d5ee2d26e932b55d3fc525592bd4b77

See more details on using hashes here.

File details

Details for the file ratehawk-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: ratehawk-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 14.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.3

File hashes

Hashes for ratehawk-0.1.5-py3-none-any.whl
Algorithm Hash digest
SHA256 56cf5e6629abcd6f0bf5f73fbd2e284618a8beb8ba1df213e073e0257d194fe4
MD5 a55fc0ed57e10a1084686133221a9c86
BLAKE2b-256 baa876dc99a1dfafbc076e334034e1735308f55b30daac423cc628332c39346f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page