httpx-rate-limiter-transport

What is it?

This project provides an async transport for httpx that implements various rate limits, using a centralized Redis instance as the backend.

[!NOTE] You can read some details about httpx transports in the httpx documentation.

Features

  • ✅ Limit the total number of concurrent outgoing requests (to any host)
  • ✅ Limit the number of concurrent requests per host
  • ✅ Provide your own logic/limit
    • for example: limit the number of concurrent requests by HTTP method, or only for some given hosts
  • ✅ TTL to avoid holding a semaphore slot forever (in special cases like a machine crash or a network failure at the wrong moment)
  • ✅ Can wrap another transport (if you already use one)
  • ✅ Multiple limits support
  • ✅ Redis backend for distributed rate limiting

Roadmap

  • Add a "requests per minute" rate limit
  • Logging
  • Sync version

Installation

pip install httpx-rate-limiter-transport

(or the same with your favorite package manager)

Quickstart

Here's a simple example that demonstrates the basic usage:

import asyncio
import httpx
from httpx_rate_limiter_transport.limit import (
    ByHostConcurrencyRateLimit,
    GlobalConcurrencyRateLimit,
)
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport
from async_redis_rate_limiters import DistributedSemaphoreManager


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            # Global limit: no more than 10 concurrent requests to any host
            GlobalConcurrencyRateLimit(concurrency_limit=10),
            # Per-host limit: no more than 1 concurrent request per host
            ByHostConcurrencyRateLimit(concurrency_limit=1),
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)


async def request(n: int):
    client = get_httpx_client()
    async with client:
        # This will respect the rate limits - only 1 request per host
        # will execute concurrently, with a global max of 10
        futures = [client.get("https://www.google.com/") for _ in range(n)]
        res = await asyncio.gather(*futures)
        for r in res:
            print(r.status_code)


if __name__ == "__main__":
    # This will make 10 requests, but only 1 will execute at a time
    # due to the per-host limit
    asyncio.run(request(10))

Expected behavior: the requests are rate-limited: only one request to google.com executes at a time, even though we start 10 concurrently.
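Conceptually, each limit guards the request with a semaphore that is acquired before the request is sent and released afterwards. The sketch below mimics that locally with asyncio.Semaphore (the library uses a Redis-backed semaphore instead, so the limit also holds across processes and machines); it shows that a limit of 1 keeps peak concurrency at 1 even with 10 tasks started at once:

```python
import asyncio


async def fake_request(sem: asyncio.Semaphore, state: dict) -> None:
    # Acquire a slot before "sending", release it when done,
    # analogous to what the transport does around each real request.
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for network I/O
        state["active"] -= 1


async def main() -> int:
    sem = asyncio.Semaphore(1)  # like concurrency_limit=1
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(fake_request(sem, state) for _ in range(10)))
    return state["peak"]


if __name__ == "__main__":
    print(asyncio.run(main()))  # 10 tasks, but peak concurrency is 1
```

The difference in the real transport is only where the semaphore lives: in Redis, with a TTL, so every process sharing the same Redis sees the same limit.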

How-to

How to get a concurrency limit for only one given host?

To get a concurrency limit only for a given host, you can use a SingleHostConcurrencyRateLimit limit object.

from async_redis_rate_limiters import DistributedSemaphoreManager
import httpx
from httpx_rate_limiter_transport.limit import (
    SingleHostConcurrencyRateLimit,
)
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            # Limit the number of concurrent requests to 10 for any host matching *.foobar.com
            SingleHostConcurrencyRateLimit(
                concurrency_limit=10, host="*.foobar.com", fnmatch_pattern=True
            ),
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)
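The fnmatch_pattern=True flag suggests shell-style wildcard matching as in Python's fnmatch module (an assumption about the library's internals; verify which hosts actually match before relying on it). Note that with fnmatch semantics the bare apex domain does not match *.foobar.com:

```python
from fnmatch import fnmatch

pattern = "*.foobar.com"
for host in ["api.foobar.com", "www.foobar.com", "foobar.com", "example.com"]:
    # fnmatch does shell-style wildcard matching: "*" matches any run
    # of characters, but the literal "." before "foobar.com" must be present.
    print(host, fnmatch(host, pattern))
# api.foobar.com True
# www.foobar.com True
# foobar.com False   <- the apex domain would need its own limit
# example.com False
```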

How to implement your own custom logic?

You can use a CustomConcurrencyRateLimit object with a custom hook to implement your own logic.

If the hook returns None, this concurrency limit is deactivated. If the hook returns a key (as a string), we count/limit the number of concurrent requests per distinct key.

from async_redis_rate_limiters import DistributedSemaphoreManager
import httpx
from httpx_rate_limiter_transport.limit import CustomConcurrencyRateLimit
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport


def concurrency_key_hook(request: httpx.Request) -> str | None:
    if request.url.host == "www.foobar.com" and request.method == "POST":
        return "post on www.foobar.com"
    return None  # no concurrency limit


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            CustomConcurrencyRateLimit(
                concurrency_limit=10, concurrency_key_hook=concurrency_key_hook
            )
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)

How to wrap another httpx transport?

If you already use a specific httpx transport, you can wrap it inside this one.

import httpx
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport
from async_redis_rate_limiters import DistributedSemaphoreManager


def get_httpx_client() -> httpx.AsyncClient:
    original_transport = httpx.AsyncHTTPTransport(retries=3)
    transport = ConcurrencyRateLimiterTransport(
        inner_transport=original_transport,  # let's wrap the original transport
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

Development Setup

  1. Fork the repository
  2. Create a feature branch
  3. Install development dependencies: make sync
  4. Run lint: make lint
  5. Run tests: make test
  6. Submit a pull request

License

This project is licensed under the MIT License - see the LICENSE file for details.
