
Async transport for httpx that implements various rate-limiting strategies (using a centralized Redis as backend)

Project description

httpx-rate-limiter-transport


What is it?

This project provides an async transport for httpx that implements various rate-limiting strategies, using a centralized Redis instance as the backend.

[!NOTE] You can read more about httpx transports on this page.

Features

  • ✅ Limit the total number of concurrent outgoing requests (to any host)
  • ✅ Limit the number of concurrent requests per host
  • ✅ Provide your own logic/limit
    • for example: limit the number of concurrent requests by HTTP method, or only for certain hosts
  • ✅ TTL to avoid holding a semaphore slot forever (in edge cases such as a machine crash or a network failure at exactly the wrong moment)
  • ✅ Can wrap another transport (if you already use one)
  • ✅ Multiple limits support
  • ✅ Redis backend for distributed rate limiting
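
The TTL feature above guards against a slot that is never released. Here is a minimal, in-process sketch of that idea; the TTLSemaphore class and its names are illustrative, not the library's API (the library implements this on Redis).

```python
import asyncio
import time


class TTLSemaphore:
    """Illustrative, in-process sketch of a TTL-guarded semaphore.

    Each acquisition expires after `ttl` seconds, so a holder that crashes
    without releasing cannot block other clients forever.
    """

    def __init__(self, limit: int, ttl: float):
        self._limit = limit
        self._ttl = ttl
        self._acquired_at: list[float] = []  # timestamps of live acquisitions

    def _expire_stale(self) -> None:
        now = time.monotonic()
        self._acquired_at = [t for t in self._acquired_at if now - t < self._ttl]

    async def acquire(self) -> None:
        while True:
            self._expire_stale()
            if len(self._acquired_at) < self._limit:
                self._acquired_at.append(time.monotonic())
                return
            await asyncio.sleep(0.01)  # wait for a release (or an expiry)

    def release(self) -> None:
        if self._acquired_at:
            self._acquired_at.pop()


async def demo() -> int:
    sem = TTLSemaphore(limit=1, ttl=0.05)
    await sem.acquire()
    # Simulate a crashed holder: the slot is never released explicitly,
    # but the TTL frees it after 0.05 s.
    await asyncio.sleep(0.06)
    await sem.acquire()  # succeeds thanks to the expiry
    return len(sem._acquired_at)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The trade-off is the usual one for lease-based locks: a TTL that is too short can release a slot while the request is still in flight, so it should comfortably exceed your request timeout.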

Roadmap

  • Add "requests per minute" rate limiting
  • Multiple limits
  • Logging
  • Sync version
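
For the "requests per minute" roadmap item, a sliding-window counter is one common approach. This is a minimal in-process sketch of the idea only; the RequestsPerWindow class is hypothetical and not part of the library.

```python
import time
from collections import deque


class RequestsPerWindow:
    """Illustrative sliding-window rate counter (in-process; not the library's API)."""

    def __init__(self, max_requests: int, window_seconds: float = 60.0):
        self._max = max_requests
        self._window = window_seconds
        self._timestamps: deque[float] = deque()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # Drop timestamps that have fallen out of the window.
        while self._timestamps and now - self._timestamps[0] >= self._window:
            self._timestamps.popleft()
        if len(self._timestamps) < self._max:
            self._timestamps.append(now)
            return True
        return False


if __name__ == "__main__":
    limiter = RequestsPerWindow(max_requests=2, window_seconds=60)
    print([limiter.try_acquire() for _ in range(3)])  # third call is refused
```

Unlike the concurrency limits above, which bound how many requests are in flight at once, this bounds how many requests start within a time window.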

Installation

pip install httpx-rate-limiter-transport

(or the same with your favorite package manager)

Quickstart

Here's a simple example that demonstrates the basic usage:

import asyncio
import httpx
from httpx_rate_limiter_transport.limit import (
    ByHostConcurrencyRateLimit,
    GlobalConcurrencyRateLimit,
)
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport
from async_redis_rate_limiters import DistributedSemaphoreManager


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            # Global limit: no more than 10 concurrent requests to any host
            GlobalConcurrencyRateLimit(concurrency_limit=10),
            # Per-host limit: no more than 1 concurrent request per host
            ByHostConcurrencyRateLimit(concurrency_limit=1),
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)


async def request(n: int):
    client = get_httpx_client()
    async with client:
        # This will respect the rate limits - only 1 request per host
        # will execute concurrently, with a global max of 10
        coros = [client.get("https://www.google.com/") for _ in range(n)]
        res = await asyncio.gather(*coros)
        for r in res:
            print(r.status_code)


if __name__ == "__main__":
    # This will make 10 requests, but only 1 will execute at a time
    # due to the per-host limit
    asyncio.run(request(10))

Expected behavior: the requests are rate limited. Only one request to google.com executes at a time, even though 10 are started concurrently.

How-to

How to get a concurrency limit for only one given host?

To apply a concurrency limit to one specific host, use a SingleHostConcurrencyRateLimit object.

from async_redis_rate_limiters import DistributedSemaphoreManager
import httpx
from httpx_rate_limiter_transport.limit import (
    SingleHostConcurrencyRateLimit,
)
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            # Limit the number of concurrent requests to 10 for any host matching *.foobar.com
            SingleHostConcurrencyRateLimit(
                concurrency_limit=10, host="*.foobar.com", fnmatch_pattern=True
            ),
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)

How to implement your own custom logic?

You can use a CustomConcurrencyRateLimit object with a custom hook to implement your own logic.

If the hook returns None, this concurrency limit is skipped for that request. If the hook returns a key (as a string), the number of concurrent requests is counted and limited per distinct key.

from async_redis_rate_limiters import DistributedSemaphoreManager
import httpx
from httpx_rate_limiter_transport.limit import CustomConcurrencyRateLimit
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport


def concurrency_key_hook(request: httpx.Request) -> str | None:
    if request.url.host == "www.foobar.com" and request.method == "POST":
        return "post on www.foobar.com"
    return None  # no concurrency limit


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            CustomConcurrencyRateLimit(
                concurrency_limit=10, concurrency_key_hook=concurrency_key_hook
            )
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)

How to wrap another httpx transport?

If you already use a specific httpx transport, you can wrap it inside this one.

import httpx
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport
from async_redis_rate_limiters import DistributedSemaphoreManager


def get_httpx_client() -> httpx.AsyncClient:
    original_transport = httpx.AsyncHTTPTransport(retries=3)
    transport = ConcurrencyRateLimiterTransport(
        inner_transport=original_transport,  # let's wrap the original transport
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

Development Setup

  1. Fork the repository
  2. Create a feature branch
  3. Install development dependencies: make sync
  4. Run lint: make lint
  5. Run tests: make test
  6. Submit a pull request

License

This project is licensed under the MIT License - see the LICENSE file for details.
