A flexible API rate limiting library for Python

Ratehawk

Ratehawk is a flexible API rate limiting library for Python. It allows you to easily implement rate limiting in your applications to prevent abuse and ensure fair usage of your APIs.

Installation

You can install Ratehawk using pip:

pip install ratehawk

Usage

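The examples below show common Ratehawk configurations. Each snippet calls the limiter with await, so it needs to run inside a coroutine (for example, one started with asyncio.run). Any snippet that catches RateLimitExceeded without importing it needs that import added as well.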

Basic Usage

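import asyncio

# Assumption: RateLimitExceeded is exported from the package root; adjust this
# import if your installed version defines the exception elsewhere.
from ratehawk import RateLimiter, RateLimitExceeded

async def main():
    # Create a rate limiter with a limit of 10 requests per minute
    rate_limiter = RateLimiter(limits=[(10, 60)])

    # Check if the rate limit is exceeded
    if await rate_limiter.check():
        print("Rate limit not exceeded")
    else:
        print("Rate limit exceeded")

    # Increment the rate limit counter
    try:
        await rate_limiter.increment()
        print("Request allowed")
    except RateLimitExceeded:
        print("Rate limit exceeded, try again later")

# check() and increment() are awaited, so they must run inside an event loop
asyncio.run(main())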
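
To make the (10, 60) tuple concrete, here is a minimal sketch of a sliding-window counter in plain Python. It is not Ratehawk's implementation: the class name SlidingWindowCounter, its allow() method, and the injectable clock are all hypothetical, and it assumes each tuple means (maximum requests, window length in seconds), as the comment in the example above suggests.

```python
import time
from collections import deque


class SlidingWindowCounter:
    """Hypothetical illustration of one (max_requests, window_seconds) limit.

    Not part of Ratehawk; it only shows the idea behind such a limit.
    """

    def __init__(self, max_requests, window_seconds, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock          # injectable so the window can be tested
        self.timestamps = deque()   # times of the requests still in the window

    def _evict(self, now):
        # Drop requests that have aged out of the window.
        while self.timestamps and now - self.timestamps[0] >= self.window_seconds:
            self.timestamps.popleft()

    def allow(self):
        """Record the request and return True, or return False if the window is full."""
        now = self.clock()
        self._evict(now)
        if len(self.timestamps) >= self.max_requests:
            return False
        self.timestamps.append(now)
        return True
```

A counter built as SlidingWindowCounter(10, 60) would accept ten calls to allow() and reject further ones until the oldest recorded request is at least 60 seconds old.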

Using Redis Storage

from ratehawk import RateLimiter
from ratehawk.storage.redis import RedisStorage
from redis import Redis

# Create a Redis client
redis_client = Redis(host='localhost', port=6379, db=0)

# Create a rate limiter with Redis storage
rate_limiter = RateLimiter(limits=[(10, 60)], storage=RedisStorage(redis_client))

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Postgres Storage

from ratehawk import RateLimiter
from ratehawk.storage.postgres import PostgresStorage

# Create a rate limiter with Postgres storage
rate_limiter = RateLimiter(limits=[(10, 60)], storage=PostgresStorage(dsn="postgresql://localhost/ratehawk"))

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using SQLite Storage

from ratehawk import RateLimiter
from ratehawk.storage.sqlite import SQLiteStorage

# Create a rate limiter with SQLite storage
rate_limiter = RateLimiter(limits=[(10, 60)], storage=SQLiteStorage(db_path="ratehawk.db"))

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using RateLimitMetrics

from ratehawk import RateLimiter
from ratehawk.monitoring.metrics import RateLimitMetrics

# Create a rate limiter with metrics monitoring
rate_limiter = RateLimiter(limits=[(10, 60)])
rate_limiter.metrics = RateLimitMetrics()

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using RateLimitLogger

from ratehawk import RateLimiter
from ratehawk.monitoring.logging import RateLimitLogger

# Create a rate limiter with logging
rate_limiter = RateLimiter(limits=[(10, 60)])
rate_limiter.logger = RateLimitLogger()

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using RateLimitEvent

from ratehawk import RateLimiter
from ratehawk.monitoring.events import RateLimitEvent, EventEmitter

# Create a rate limiter with event monitoring
rate_limiter = RateLimiter(limits=[(10, 60)])
rate_limiter.events = EventEmitter()

# Register event handlers
def on_limit_exceeded(key, retry_after):
    print(f"Rate limit exceeded for key: {key}, retry after: {retry_after}s")

def on_near_limit(key, current, limit):
    print(f"Near rate limit for key: {key}, current: {current}, limit: {limit}")

def on_reset(key):
    print(f"Rate limit reset for key: {key}")

rate_limiter.events.on(RateLimitEvent.LIMIT_EXCEEDED, on_limit_exceeded)
rate_limiter.events.on(RateLimitEvent.NEAR_LIMIT, on_near_limit)
rate_limiter.events.on(RateLimitEvent.RESET, on_reset)

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")
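
The on(event, handler) registration used above follows the usual observer pattern. The sketch below shows that pattern in isolation, under the assumption that handlers receive the arguments passed at emit time. SimpleEmitter, Event, and emit() are hypothetical names written for this illustration; they are not Ratehawk's EventEmitter or RateLimitEvent, whose internals this page does not describe.

```python
from collections import defaultdict
from enum import Enum


class Event(Enum):
    """Hypothetical stand-in for an event enumeration."""
    LIMIT_EXCEEDED = "limit_exceeded"
    NEAR_LIMIT = "near_limit"
    RESET = "reset"


class SimpleEmitter:
    """Hypothetical observer-pattern emitter, for illustration only."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event, handler):
        # Register a handler; several handlers may share one event.
        self._handlers[event].append(handler)

    def emit(self, event, *args, **kwargs):
        # Call every handler registered for this event, in registration order.
        for handler in list(self._handlers.get(event, ())):
            handler(*args, **kwargs)


seen = []
emitter = SimpleEmitter()
emitter.on(Event.LIMIT_EXCEEDED, lambda key, retry_after: seen.append((key, retry_after)))
emitter.emit(Event.LIMIT_EXCEEDED, "user:42", 30)  # handler receives ("user:42", 30)
emitter.emit(Event.RESET, "user:42")               # no handler registered, nothing happens
```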

Using Multiple Rate Limits per Key

from ratehawk import RateLimiter

# Create a rate limiter with multiple rate limits per key
rate_limiter = RateLimiter(limits=[(10, 60), (100, 3600)])

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")
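
With several limits on one key, a request is normally allowed only if every window still has room. The function below sketches that rule in plain Python. It is an assumption about how such limits are commonly combined, not a description of Ratehawk's internals, and exceeded_limits is a hypothetical helper name.

```python
def exceeded_limits(timestamps, limits, now):
    """Return the (max_requests, window_seconds) pairs that are already full.

    timestamps: times of earlier requests, in seconds
    limits:     list of (max_requests, window_seconds) tuples
    now:        current time, in the same units as timestamps
    """
    full = []
    for max_requests, window_seconds in limits:
        # Count the requests that fall inside this limit's window.
        recent = sum(1 for t in timestamps if now - t < window_seconds)
        if recent >= max_requests:
            full.append((max_requests, window_seconds))
    return full


limits = [(10, 60), (100, 3600)]

# Ten requests in ten seconds fill the per-minute limit but not the hourly one.
burst = [float(i) for i in range(10)]
per_minute_full = exceeded_limits(burst, limits, now=10.0)

# One request every 36 seconds fills the hourly limit but not the per-minute one.
steady = [i * 36.0 for i in range(100)]
per_hour_full = exceeded_limits(steady, limits, now=3599.0)
```

An empty result means the request fits within every configured window.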

Using Rate Limit Groups and Hierarchies

from ratehawk import RateLimiter

# Create a rate limiter that layers a base limit (10 per 60 s),
# a burst limit (20 per 15 s), and a quota (1000 per 86400 s)
rate_limiter = RateLimiter(
    limits=[(10, 60)],
    burst_limits=[(20, 15)],
    quota_limits=[(1000, 86400)]
)

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

Using Dynamic Rate Limits

from ratehawk import RateLimiter

# Function to dynamically determine rate limits based on user attributes or time of day
def dynamic_limits_func():
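    # Example: different rate limits for different user roles.
    # get_user_role() is a placeholder for your application's own lookup;
    # it is not imported from Ratehawk in this snippet.
    user_role = get_user_role()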
    if user_role == "premium":
        return {"limits": [(100, 60)], "burst_limits": [(200, 15)], "quota_limits": [(5000, 86400)]}
    else:
        return {"limits": [(10, 60)], "burst_limits": [(20, 15)], "quota_limits": [(1000, 86400)]}

# Create a rate limiter with dynamic rate limits
rate_limiter = RateLimiter(
    limits=[(10, 60)],
    dynamic_limits_func=dynamic_limits_func
)

# Apply dynamic rate limits
await rate_limiter.apply_dynamic_limits()

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")
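
Since dynamic_limits_func takes no arguments in the example above, the role lookup has to come from somewhere else. One option is to build the function with a closure, as sketched below. PLANS, make_dynamic_limits_func, and the get_user_role argument are hypothetical names for this illustration; only the shape of the returned dictionary is taken from the example above.

```python
# Hypothetical plan table; the dictionary shape mirrors the example above.
PLANS = {
    "premium": {
        "limits": [(100, 60)],
        "burst_limits": [(200, 15)],
        "quota_limits": [(5000, 86400)],
    },
    "default": {
        "limits": [(10, 60)],
        "burst_limits": [(20, 15)],
        "quota_limits": [(1000, 86400)],
    },
}


def make_dynamic_limits_func(get_user_role):
    """Build a zero-argument limits function around an application-supplied role lookup."""

    def dynamic_limits_func():
        role = get_user_role()
        # Unknown roles fall back to the default plan.
        return PLANS.get(role, PLANS["default"])

    return dynamic_limits_func


premium_limits = make_dynamic_limits_func(lambda: "premium")()
guest_limits = make_dynamic_limits_func(lambda: "guest")()
```

Keeping the plans in a table means that adding a role requires no new branch in the function.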

Using Rate Limit Quotas

from ratehawk import RateLimiter

# Create a rate limiter with rate limit quotas over longer periods
rate_limiter = RateLimiter(
    limits=[(10, 60)],
    quota_limits=[(1000, 86400)]
)

# Check if the rate limit is exceeded
if await rate_limiter.check():
    print("Rate limit not exceeded")
else:
    print("Rate limit exceeded")

# Increment the rate limit counter
try:
    await rate_limiter.increment()
    print("Request allowed")
except RateLimitExceeded:
    print("Rate limit exceeded, try again later")

# Check if the quota limit is exceeded
if await rate_limiter.check_quota():
    print("Quota limit not exceeded")
else:
    print("Quota limit exceeded")

# Increment the quota limit counter
try:
    await rate_limiter.increment_quota()
    print("Request allowed")
except RateLimitExceeded:
    print("Quota limit exceeded, try again later")

License

This project is licensed under the MIT License. See the LICENSE file for more details.
