
httpx-rate-limiter-transport


What is it?

This project provides an async transport for httpx that implements various rate-limiting strategies, using a centralized Redis as the backend.

[!NOTE] You can read more about httpx transports in the httpx documentation.

Features

  • ✅ Limit the total number of concurrent outgoing requests (to any host)
  • ✅ Limit the number of concurrent requests per host
  • ✅ Provide your own logic/limit
    • for example, you can limit the number of concurrent requests by HTTP method, or only for certain hosts
  • ✅ TTL to avoid holding a semaphore slot forever (in special cases such as a process crash or a network failure at the wrong moment)
  • ✅ Can wrap another transport (if you already use one)
  • ✅ Multiple limits support
  • ✅ Redis backend for distributed rate limiting
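The TTL feature exists so that a permit leaked by a crashed holder eventually comes back. Here is a minimal, in-process sketch of that idea using a plain asyncio semaphore with a safety timer; the real library uses Redis-backed semaphores from async_redis_rate_limiters, and the class below is purely illustrative:

```python
import asyncio


class TtlSemaphore:
    """Illustrative local analogue of a TTL-guarded semaphore: if a
    holder never releases (e.g. it crashed), a timer frees the permit."""

    def __init__(self, value: int, ttl: float):
        self._sem = asyncio.Semaphore(value)
        self._ttl = ttl

    async def acquire(self) -> asyncio.TimerHandle:
        await self._sem.acquire()
        # Schedule a forced release in case the holder never calls release().
        loop = asyncio.get_running_loop()
        return loop.call_later(self._ttl, self._sem.release)

    def release(self, handle: asyncio.TimerHandle) -> None:
        # Normal path: cancel the safety timer, then release once.
        handle.cancel()
        self._sem.release()


async def demo() -> str:
    sem = TtlSemaphore(1, ttl=0.1)
    await sem.acquire()  # permit held; simulate a holder that crashes
    # The TTL timer frees the permit, so a second acquire succeeds.
    h2 = await asyncio.wait_for(sem.acquire(), timeout=1.0)
    sem.release(h2)
    return "permit recovered after TTL"


print(asyncio.run(demo()))
```

Without the timer, the second acquire would block forever; with Redis the same role is played by the key's TTL.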

Roadmap

  • Add "requests per minute" rate limiting
  • Multiple limits
  • Logging
  • Sync version

Installation

pip install httpx-rate-limiter-transport

(or the same with your favorite package manager)

Quickstart

Here's a simple example that demonstrates the basic usage:

import asyncio
import httpx
from httpx_rate_limiter_transport.limit import (
    ByHostConcurrencyRateLimit,
    GlobalConcurrencyRateLimit,
)
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport
from async_redis_rate_limiters import DistributedSemaphoreManager


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            # Global limit: no more than 10 concurrent requests to any host
            GlobalConcurrencyRateLimit(concurrency_limit=10),
            # Per-host limit: no more than 1 concurrent request per host
            ByHostConcurrencyRateLimit(concurrency_limit=1),
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)


async def request(n: int):
    client = get_httpx_client()
    async with client:
        # This will respect the rate limits - only 1 request per host
        # will execute concurrently, with a global max of 10
        futures = [client.get("https://www.google.com/") for _ in range(n)]
        res = await asyncio.gather(*futures)
        for r in res:
            print(r.status_code)


if __name__ == "__main__":
    # This will make 10 requests, but only 1 will execute at a time
    # due to the per-host limit
    asyncio.run(request(10))

Expected behavior: only one request to google.com executes at a time (the per-host limit is 1), even though ten requests are started concurrently.
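To see why the peak concurrency stays at one, the gating logic can be reproduced locally with plain asyncio semaphores, no Redis or network required. This is an illustrative analogue, not the package's implementation, and the acquisition order shown (global gate first, then per-host) is just one plausible ordering:

```python
import asyncio
from collections import defaultdict


async def main() -> int:
    global_sem = asyncio.Semaphore(10)  # global cap across all hosts
    host_sems = defaultdict(lambda: asyncio.Semaphore(1))  # one slot per host
    active = defaultdict(int)
    peak = 0

    async def fake_request(host: str) -> None:
        nonlocal peak
        # Acquire the global gate, then the per-host gate.
        async with global_sem, host_sems[host]:
            active[host] += 1
            peak = max(peak, active[host])
            await asyncio.sleep(0.01)  # stand-in for the network call
            active[host] -= 1

    await asyncio.gather(*(fake_request("www.google.com") for _ in range(10)))
    return peak


print(asyncio.run(main()))  # peak per-host concurrency: 1
```

Ten tasks are launched at once, but the per-host semaphore serializes them, so the observed peak is 1.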

How-to

How to get a concurrency limit for only one given host?

To get a concurrency limit only for a given host, you can use a SingleHostConcurrencyRateLimit limit object.

from async_redis_rate_limiters import DistributedSemaphoreManager
import httpx
from httpx_rate_limiter_transport.limit import (
    SingleHostConcurrencyRateLimit,
)
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            # Limit the number of concurrent requests to 10 for any host matching *.foobar.com
            SingleHostConcurrencyRateLimit(
                concurrency_limit=10, host="*.foobar.com", fnmatch_pattern=True
            ),
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)
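The fnmatch_pattern=True flag enables shell-style wildcard matching on the host. Assuming the matching follows Python's stdlib fnmatch semantics (which the parameter name suggests), here is what a pattern like *.foobar.com does and does not cover:

```python
from fnmatch import fnmatch

pattern = "*.foobar.com"

print(fnmatch("api.foobar.com", pattern))      # True
print(fnmatch("a.b.foobar.com", pattern))      # True ('*' crosses dots)
print(fnmatch("foobar.com", pattern))          # False (no leading label)
print(fnmatch("foobar.com.evil.io", pattern))  # False (suffix mismatch)
```

Note in particular that the bare apex domain foobar.com does not match *.foobar.com; add a second limit for it if you need both covered.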

How to implement your own custom logic?

You can use a CustomConcurrencyRateLimit object with a custom hook to implement your own logic.

If the hook returns None, the limit is not applied to that request. If it returns a key (a string), concurrent requests are counted and limited per distinct key.

from async_redis_rate_limiters import DistributedSemaphoreManager
import httpx
from httpx_rate_limiter_transport.limit import CustomConcurrencyRateLimit
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport


def concurrency_key_hook(request: httpx.Request) -> str | None:
    if request.url.host == "www.foobar.com" and request.method == "POST":
        return "post on www.foobar.com"
    return None  # no concurrency limit


def get_httpx_client() -> httpx.AsyncClient:
    transport = ConcurrencyRateLimiterTransport(
        limits=[
            CustomConcurrencyRateLimit(
                concurrency_limit=10, concurrency_key_hook=concurrency_key_hook
            )
        ],
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)

How to wrap another httpx transport?

If you already use a specific httpx transport, you can wrap it inside this one.

import httpx
from httpx_rate_limiter_transport.transport import ConcurrencyRateLimiterTransport
from async_redis_rate_limiters import DistributedSemaphoreManager


def get_httpx_client() -> httpx.AsyncClient:
    original_transport = httpx.AsyncHTTPTransport(retries=3)
    transport = ConcurrencyRateLimiterTransport(
        inner_transport=original_transport,  # let's wrap the original transport
        semaphore_manager=DistributedSemaphoreManager(
            redis_url="redis://localhost:6379", redis_ttl=300
        ),
    )
    return httpx.AsyncClient(transport=transport, timeout=300)
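Wrapping follows the ordinary decorator pattern: the outer transport enforces its limits, then delegates the actual request to the inner transport. A schematic, synchronous sketch of that shape; the class and method names here are illustrative, not the package's internals:

```python
class InnerTransport:
    """Stand-in for the wrapped transport (e.g. one configured with retries)."""

    def handle(self, request: str) -> str:
        return f"response to {request}"


class LimitingTransport:
    """Outer layer: gate the request, delegate, then free the slot."""

    def __init__(self, inner: InnerTransport, limit: int):
        self.inner = inner
        self.slots = limit

    def handle(self, request: str) -> str:
        if self.slots <= 0:
            raise RuntimeError("concurrency limit reached")
        self.slots -= 1
        try:
            return self.inner.handle(request)  # delegate to the wrapped transport
        finally:
            self.slots += 1  # always release, even if the inner call raises


transport = LimitingTransport(InnerTransport(), limit=3)
print(transport.handle("GET /"))
```

The rate limiter stays a thin layer: everything about connections, retries, and proxies remains the inner transport's job.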

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

Development Setup

  1. Fork the repository
  2. Create a feature branch
  3. Install development dependencies: make sync
  4. Run lint: make lint
  5. Run tests: make test
  6. Submit a pull request

License

This project is licensed under the MIT License - see the LICENSE file for details.
