Skip to main content

A flexible API rate limiting library for Python

Project description

Ratehawk

Ratehawk is a flexible API rate limiting library for Python. It allows you to easily implement rate limiting in your applications to prevent abuse and ensure fair usage of your APIs.

Installation

You can install Ratehawk using pip:

pip install ratehawk

Usage

Here are some examples of how to use Ratehawk in your Python applications:

Basic Usage

from ratehawk import RateLimiter

# Create a rate limiter with a limit of 10 requests per minute
rate_limiter = RateLimiter(limits=[(10, 60)])

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Redis Storage

from ratehawk import RateLimiter
from ratehawk.storage.redis import RedisStorage
from redis import Redis

# Create a Redis client
redis_client = Redis(host='localhost', port=6379, db=0)

# Create a rate limiter with Redis storage
rate_limiter = RateLimiter(limits=[(10, 60)], storage=RedisStorage(redis_client))

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Postgres Storage

from ratehawk import RateLimiter
from ratehawk.storage.postgres import PostgresStorage

# Create a rate limiter with Postgres storage
rate_limiter = RateLimiter(limits=[(10, 60)], storage=PostgresStorage(dsn="postgresql://localhost/ratehawk"))

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using SQLite Storage

from ratehawk import RateLimiter
from ratehawk.storage.sqlite import SQLiteStorage

# Create a rate limiter with SQLite storage
rate_limiter = RateLimiter(limits=[(10, 60)], storage=SQLiteStorage(db_path="ratehawk.db"))

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using RateLimitMetrics

from ratehawk import RateLimiter
from ratehawk.monitoring.metrics import RateLimitMetrics

# Create a rate limiter with metrics monitoring
rate_limiter = RateLimiter(limits=[(10, 60)])
rate_limiter.metrics = RateLimitMetrics()

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using RateLimitLogger

from ratehawk import RateLimiter
from ratehawk.monitoring.logging import RateLimitLogger

# Create a rate limiter with logging
rate_limiter = RateLimiter(limits=[(10, 60)])
rate_limiter.logger = RateLimitLogger()

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using RateLimitEvent

from ratehawk import RateLimiter
from ratehawk.monitoring.events import RateLimitEvent, EventEmitter

# Create a rate limiter with event monitoring
rate_limiter = RateLimiter(limits=[(10, 60)])
rate_limiter.events = EventEmitter()

# Register event handlers
def on_limit_exceeded(key, retry_after):
    print(f"Rate limit exceeded for key: {key}, retry after: {retry_after}s")

def on_near_limit(key, current, limit):
    print(f"Near rate limit for key: {key}, current: {current}, limit: {limit}")

def on_reset(key):
    print(f"Rate limit reset for key: {key}")

rate_limiter.events.on(RateLimitEvent.LIMIT_EXCEEDED, on_limit_exceeded)
rate_limiter.events.on(RateLimitEvent.NEAR_LIMIT, on_near_limit)
rate_limiter.events.on(RateLimitEvent.RESET, on_reset)

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Multiple Rate Limits per Key

from ratehawk import RateLimiter

# Create a rate limiter with multiple rate limits per key
rate_limiter = RateLimiter(limits=[(10, 60), (100, 3600)])

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Rate Limit Groups and Hierarchies

from ratehawk import RateLimiter

# Create a rate limiter with rate limit groups and hierarchies
rate_limiter = RateLimiter(
    limits=[(10, 60)],
    burst_limits=[(20, 15)],
    quota_limits=[(1000, 86400)]
)

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Dynamic Rate Limits

from ratehawk import RateLimiter

# Function to dynamically determine rate limits based on user attributes or time of day
def dynamic_limits_func():
    # Example: Different rate limits for different user roles
    user_role = get_user_role()
    if user_role == "premium":
        return {"limits": [(100, 60)], "burst_limits": [(200, 15)], "quota_limits": [(5000, 86400)]}
    else:
        return {"limits": [(10, 60)], "burst_limits": [(20, 15)], "quota_limits": [(1000, 86400)]}

# Create a rate limiter with dynamic rate limits
rate_limiter = RateLimiter(
    limits=[(10, 60)],
    dynamic_limits_func=dynamic_limits_func
)

# Apply dynamic rate limits
await rate_limiter.apply_dynamic_limits()

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Rate Limit Quotas

from ratehawk import RateLimiter

# Create a rate limiter with rate limit quotas over longer periods
rate_limiter = RateLimiter(
    limits=[(10, 60)],
    quota_limits=[(1000, 86400)]
)

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

# Check if the quota limit is exceeded
if await rate_limiter.check_quota():
    print("Quota limit not exceeded")
else:
    print("Quota limit exceeded")

# Increment the quota limit counter
try:
    await rate_limiter.increment_quota()
    print("Request allowed")
except RateLimitExceeded:
    print("Quota limit exceeded, try again later")

Examples

For more detailed examples, please refer to the following files in the docs folder:

License

This project is licensed under the MIT License. See the LICENSE file for more details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ratehawk-0.2.0.tar.gz (16.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ratehawk-0.2.0-py3-none-any.whl (16.8 kB view details)

Uploaded Python 3

File details

Details for the file ratehawk-0.2.0.tar.gz.

File metadata

  • Download URL: ratehawk-0.2.0.tar.gz
  • Upload date:
  • Size: 16.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.3

File hashes

Hashes for ratehawk-0.2.0.tar.gz
Algorithm Hash digest
SHA256 e4e9ba0300835cb50cd23a772b03ee286c44983375c1881186832592e9985628
MD5 cb11ab61949ca7e2430770b8b3083a57
BLAKE2b-256 f413ce66fac1a7560845f0934888fe5c539abcd48de561162f7bbc33d8db6f83

See more details on using hashes here.

File details

Details for the file ratehawk-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: ratehawk-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 16.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.3

File hashes

Hashes for ratehawk-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 c2c4b0e2763dc67c68fb668c935d968744e7327245705cea6fe9bab9fe9454e3
MD5 db3aba97ee7e35bf93a6ea8d0098b4b1
BLAKE2b-256 ae7c09bcf560756c03a5a62664f4d2696c3ea30dae9d3b6df2df7a9483484592

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page