
Provides distributed async rate limiters for clients, using Redis




Distributed async rate limiters for clients



A self-limiting source produces traffic which never exceeds some upper bound. This is helpful when interacting with rate limited resources, or to prevent burstiness.

self-limiters provides a way to police your outgoing traffic with respect to:

  • concurrency, using a distributed semaphore
  • time, using a distributed token bucket

To use this package, you'll need to be running an async stack on Python 3.8 or above, and have a Redis instance available.

Installation

pip install self-limiters

Performance considerations

Some parts of the package logic are implemented as Lua scripts, which run on the Redis instance. This makes it possible to do in one request (from the client) what would otherwise take four. One benefit of this is that we save the network latency of each eliminated request. The biggest benefit, however, is that while the Lua script is running, our Python app's event loop is freed up to do other things.
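To put the saved round trips in perspective, here is a back-of-envelope calculation (the 2 ms round-trip figure is an assumption for illustration, not a measurement):

```python
# Hypothetical client-to-Redis round-trip time; real values depend on your network.
rtt_ms = 2.0

naive_ms = 4 * rtt_ms     # four separate commands, four round trips
scripted_ms = 1 * rtt_ms  # one script call doing the same work server-side

saved_ms = naive_ms - scripted_ms  # latency saved per acquisition
```

With these numbers, three of four round trips disappear; the savings scale linearly with network latency.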

The flow of the semaphore implementation is:

  • Run initial script to create semaphore if needed
  • Run BLPOP to wait for the semaphore to return
  • Run script to "release" the semaphore by adding back capacity

All of these are non-blocking.

The flow for the token bucket implementation is:

  • Run initial script to retrieve a wake-up time
  • Sleep asynchronously until the wake-up time

Both of these are also non-blocking.

In other words, the limiters' impact on the application event-loop should be negligible.

The semaphore implementation

The semaphore implementation is useful when you need to limit a process to n concurrent actions. For example, if you have 10 web servers and you're interacting with an API that will only tolerate 5 concurrent requests before locking you out.

In terms of fairness, the semaphore implementation skews towards FIFO, but is opportunistic. A worker is not allowed to run until capacity has been assigned to it specifically, but the order of execution is not guaranteed to be exactly FIFO.

The flow goes roughly like this:

Flow breakdown
  1. The Lua script will call SETNX on the name of the queue plus a postfix. If the returned value is 1, it means the queue we will use for our semaphore does not exist yet and needs to be created.

    (It might strike you as weird to have a separate entry for indicating whether the list should be created or not. It would be great if we could use EXISTS on the list directly instead, but a list is deleted when all elements are popped, so I don't see another way of achieving this. Contributions are welcome if you do.)

  2. If the queue needs to be created, we call RPUSH with a number of arguments equal to the capacity value used when initializing the semaphore instance.

  3. Once the queue has been created, we call BLPOP to block until it's our turn. BLPOP is FIFO by default. We also make sure to specify the max_sleep based on the initialized semaphore instance setting. If nothing was passed we allow sleeping forever.

  4. On __aexit__ we call another script to RPUSH a 1 value back into the queue and set an expiry on the queue and the value we called SETNX on.

    The expiries are a half-measure for dealing with dropped capacity. If a node holding the semaphore dies, its capacity might never be returned. If, however, no one uses the semaphore for the duration of the expiry, all values are cleared and the semaphore is recreated at full capacity the next time it's used. The expiry is 30 seconds at the time of writing, but could be made configurable.
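The four steps above can be sketched with an in-memory stand-in for the Redis commands involved. This is purely illustrative: the real package runs Lua scripts against an actual Redis server, and BLPOP blocks on an empty queue rather than returning None as the simplified LPOP here does.

```python
# In-memory stand-in for the Redis commands used by the semaphore flow.
# Purely illustrative; the real implementation runs Lua scripts on Redis.
store: dict = {}

def setnx(key, value) -> int:
    """SETNX: set key only if it does not exist. Returns 1 if set, else 0."""
    if key in store:
        return 0
    store[key] = value
    return 1

def rpush(key, *values) -> None:
    """RPUSH: append values to the list at key, creating it if needed."""
    store.setdefault(key, []).extend(values)

def lpop(key):
    """LPOP from the head of the list. The real flow uses BLPOP, which
    blocks when the list is empty instead of returning None."""
    queue = store.get(key)
    return queue.pop(0) if queue else None

def acquire(name: str, capacity: int):
    # Steps 1-2: create the queue once, pre-filled with `capacity` tokens
    if setnx(f"{name}-exists", 1) == 1:
        rpush(name, *([1] * capacity))
    # Step 3: take a token; None here corresponds to BLPOP blocking
    return lpop(name)

def release(name: str) -> None:
    # Step 4: return a token to the queue (expiry handling omitted)
    rpush(name, 1)
```

With a capacity of 2, two acquisitions succeed, a third finds the queue empty (and would block on BLPOP), and releasing makes capacity available again.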

Usage

The utility is implemented as a context manager in Python. Here is an example of a semaphore which will allow 10 concurrent requests:

from self_limiters import Semaphore


# Instantiate a semaphore that will allow 10 concurrent requests
concurrency_limited_queue = Semaphore(
    name="unique-resource-name",
    capacity=10,
    redis_url="redis://localhost:6379",
)

while True:
    async with concurrency_limited_queue:
        await client.get(...)

The token bucket implementation

The token bucket implementation is useful when you need to limit a process to a certain number of actions per unit of time. For example, 1 request per minute.

The implementation is forward-looking. It works out the time there would have been capacity in the bucket for a given client and returns that time. From there we can asynchronously sleep until it's time to perform our rate limited action.

The code flow goes:

Flow breakdown
  1. The Lua script first GETs the state of the bucket, that is, the last slot that was scheduled and the number of tokens left for that slot. With a capacity of 1, having a tokens_left_for_slot variable makes no sense, but with a capacity of 2 or more it is possible that we will need to schedule multiple clients in the same slot.

    The script then works out whether to decrement the tokens_left_for_slot value, or to increment the slot value by the refill frequency.

    Finally, we store the bucket state again using SETEX. This allows us to store the state and set the expiry at the same time. The default expiry is 30 seconds at the time of writing, but could be made configurable.

    One thing to note is that this would not work if Redis were not single-threaded: because it is, Lua scripts on Redis run FIFO. Without this, we would need locks and a lot more logic.

  2. Then we just sleep!
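The slot arithmetic described above can be sketched in pure Python. This is a simplified model under assumed semantics (the function name and signature are hypothetical; the real logic lives in the Lua script, which also enforces the bucket's capacity):

```python
import time

def next_wake_time(state, refill_frequency, refill_amount, now=None):
    """Return (wake_up_time, new_state) for one client.

    `state` is (slot, tokens_left_for_slot), mirroring the two values the
    Lua script GETs. Simplified model, not the actual script.
    """
    now = time.time() if now is None else now
    slot, tokens_left = state
    if slot < now:
        # Stale state: start a fresh slot at the current time
        slot, tokens_left = now, refill_amount
    if tokens_left > 0:
        tokens_left -= 1          # decrement tokens_left_for_slot
    else:
        slot += refill_frequency  # advance the slot by the refill frequency
        tokens_left = refill_amount - 1
    return slot, (slot, tokens_left)
```

With refill_amount=2 and refill_frequency=60, the first two callers are scheduled immediately and the third is pushed one slot (60 seconds) into the future.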

Usage

This is also implemented as a context manager in Python and can be used roughly as follows:

from self_limiters import TokenBucket

# Instantiate a bucket that will allow 10 requests per minute
time_limited_queue = TokenBucket(
    name="unique-resource-name",
    capacity=10,
    refill_frequency=60,
    refill_amount=10,
    redis_url="redis://localhost:6379"
)

while True:
    async with time_limited_queue:
        # Perform the rate-limited work immediately
        await client.get(...)

Benchmarks

When testing locally:

  • processing 100 nodes with the semaphore implementation takes ~13ms
  • processing 100 nodes with the token bucket implementation takes ~7ms

Contributing

Debugging Lua scripts

Assuming you have a Redis server running at :6389, you can debug a Lua script by calling redis-cli -u redis://127.0.0.1:6389 --ldb --eval src/semaphore/rpushnx.lua x 1.

Just type help in the debugger for options.


