Skip to main content

Async transport for httpx to implement various rate limiting (using a centralized redis as backend)

Project description

httpx-rate-limiter-transport

Python Badge UV Badge Mergify Badge Renovate Badge MIT Licensed

What is it?

This project provides an async transport for httpx to implement various rate limiting (using a centralized redis as backend).

[!NOTE] You can read some details about httpx transports on this page.

Features

  • ✅ Limit the total number of concurrent outgoing requests (to any host)
  • ✅ Limit the number of concurrent requests per host
  • ✅ Provide your own logic/limit
    • for example: you can limit the number of concurrent requests by HTTP method or only for some given hosts...
  • ✅ TTL to avoid blocking the semaphore forever (in some special cases like computer crash or network issues at the very wrong moment)
  • ✅ Can wrap another transport (if you already use one)
  • ✅ Multiple limits support
  • ✅ Redis backend for distributed rate limiting

Roadmap

  • Add a "request per minute" rate limiting
  • Multiple limits
  • Logging
  • Sync version

Installation

pip install httpx-rate-limiter-transport

(or the same with your favorite package manager)

Quickstart

Here's a simple example that demonstrates the basic usage:

import asyncio
import httpx
from httpx_rate_limiter_transport.limit import (
    ByHostConcurrencyRateLimit,
    GlobalConcurrencyRateLimit,
)
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport
from async_redis_rate_limiters import DistributedSemaphoreManager


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            # Global limit: no more than 10 concurrent requests to any host
            GlobalConcurrencyRateLimit(concurrency_limit=10),
            # Per-host limit: no more than 1 concurrent request per host
            ByHostConcurrencyRateLimit(concurrency_limit=1),
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)


async def request(n: int):
    client = get_httpx_client()
    async with client:
        # This will respect the rate limits - only 1 request per host
        # will execute concurrently, with a global max of 10
        futures = [client.get("https://www.google.com/") for _ in range(n)]
        res = await asyncio.gather(*futures)
        for r in res:
            print(r.status_code)


if __name__ == "__main__":
    # This will make 10 requests, but only 1 will execute at a time
    # due to the per-host limit
    asyncio.run(request(10))

Expected behavior: The requests will be rate-limited - only 1 request to google.com will execute at a time, even though we're trying to make 10 concurrent requests.

How-to

How to get a concurrency limit for only one given host?

To get a concurrency limit only for a given host, you can use a SingleHostConcurrencyRateLimit limit object.

from async_redis_rate_limiters import DistributedSemaphoreManager
import httpx
from httpx_rate_limiter_transport.limit import (
    SingleHostConcurrencyRateLimit,
)
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            # Limit the number of concurrent requests to 10 for any host matching *.foobar.com
            SingleHostConcurrencyRateLimit(
                concurrency_limit=10, host="*.foobar.com", fnmatch_pattern=True
            ),
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)
How to implement your own custom logic?

You can use a CustomConcurrencyRateLimit object with a custom hook to implement your own logic.

If the hook returns None, this concurrency limit is deactivated. If the hook returns a key (as a string), we count/limit the number of concurrent requests per distinct key.

from async_redis_rate_limiters import DistributedSemaphoreManager
import httpx
from httpx_rate_limiter_transport.limit import CustomConcurrencyRateLimit
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport


def concurrency_key_hook(request: httpx.Request) -> str | None:
    if request.url.host == "www.foobar.com" and request.method == "POST":
        return "post on www.foobar.com"
    return None  # no concurrency limit


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            CustomConcurrencyRateLimit(
                concurrency_limit=10, concurrency_key_hook=concurrency_key_hook
            )
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)
How to wrap another httpx transport?

If you already use a specific httpx transport, you can wrap it inside this one.

import httpx
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport
from async_redis_rate_limiters import DistributedSemaphoreManager


def get_httpx_client() -> httpx.AsyncClient:
    original_transport = httpx.AsyncHTTPTransport(retries=3)
    transport = ConcurrencyRateLimiterTransport(
        inner_transport=original_transport,  # let's wrap the original transport
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

Development Setup

  1. Fork the repository
  2. Create a feature branch
  3. Install development dependencies: make sync
  4. Run lint: make lint
  5. Run tests: make test
  6. Submit a pull request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

httpx_rate_limiter_transport-0.4.2.tar.gz (7.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

httpx_rate_limiter_transport-0.4.2-py3-none-any.whl (8.0 kB view details)

Uploaded Python 3

File details

Details for the file httpx_rate_limiter_transport-0.4.2.tar.gz.

File metadata

File hashes

Hashes for httpx_rate_limiter_transport-0.4.2.tar.gz
Algorithm Hash digest
SHA256 abc073a0a1c285e3cd8f7fd9ea4fd53b1934fb7baf95bb19150bad1441308ae9
MD5 bc22a0b866d56994814199a599848395
BLAKE2b-256 62733c2e8a5a0133c379f4215209f4d432a9cec94fbcb327a8df372612e83cbf

See more details on using hashes here.

File details

Details for the file httpx_rate_limiter_transport-0.4.2-py3-none-any.whl.

File metadata

File hashes

Hashes for httpx_rate_limiter_transport-0.4.2-py3-none-any.whl
Algorithm Hash digest
SHA256 5940851d500d5052a2de3621c37f845e86ad8890f1c936376941a78887e4a17a
MD5 981973cd9994fd8b217c6d69f6b4a396
BLAKE2b-256 39dd78400dbc09484ef6853b0e5693585e370ab3490897d7da47eba692f54be7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page