A collection of bucket rate limiters. It lets you limit the number of operations performed per time interval.
BucketRateLimiter
bucketratelimiter is a collection of rate limiters based on the bucket concept.
How to install:
pip install bucketratelimiter
Examples:
You can find complete examples of how to use AsyncioBucketTimeRateLimiter and MThreadedBucketTimeRateLimiter in the examples folder of this repository. You can also check the tests folder and pick up some tricks from the unit tests.
How to use:
Create rate limiter:
from bucketratelimiter import AsyncioBucketTimeRateLimiter
# max_size = 4 and recovery_time = 1.0
# mean that we would like to limit something to 4 attempts per second
# max_size - the size of the internal bucket
# recovery_time - the time it takes to make the bucket full again
# rest_time - how long workers sleep when the rate limiter does not let them proceed
# callback - a function that is called after a task has completed
ASYNC_LIMITER = AsyncioBucketTimeRateLimiter(
    max_size=4,
    recovery_time=1.0,
    rest_time=0.2,
    callback=None,
)
from bucketratelimiter import MThreadedBucketTimeRateLimiter
MTHREAD_LIMITER = MThreadedBucketTimeRateLimiter(
    max_size=4,
    recovery_time=1.0,
    rest_time=0.2,
    callback=None,
)
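To make the parameters above more concrete, here is a minimal stand-alone sketch of the bucket idea. It is not the library's implementation: `ToyBucket` and `try_acquire` are hypothetical names, and the refill rule (the whole bucket becomes full again once `recovery_time` has passed) is an assumption based on the parameter descriptions above.

```python
class ToyBucket:
    """Toy model of a time-based bucket (hypothetical, not the library's class)."""

    def __init__(self, max_size: int = 4, recovery_time: float = 1.0) -> None:
        self.max_size = max_size            # how many operations fit into one window
        self.recovery_time = recovery_time  # seconds until the bucket is full again
        self._free = max_size               # free slots left in the current window
        self._window_start = 0.0            # timestamp at which the window started

    def try_acquire(self, now: float) -> bool:
        """Take one slot if available; `now` is a timestamp in seconds."""
        # Assumption: the bucket is refilled completely once recovery_time has elapsed.
        if now - self._window_start >= self.recovery_time:
            self._free = self.max_size
            self._window_start = now
        if self._free > 0:
            self._free -= 1
            return True
        return False


bucket = ToyBucket(max_size=4, recovery_time=1.0)
# Six attempts within the first half second: only four of them fit into the bucket.
results = [bucket.try_acquire(now=0.1 * i) for i in range(6)]
# One second after the window started the bucket is full again.
later = bucket.try_acquire(now=1.0)
```

In this toy model a worker that gets `False` would sleep for something like `rest_time` and then try again.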
Wrap a function with the rate limiter:
import asyncio
from bucketratelimiter import AsyncioBucketTimeRateLimiter
limiter = AsyncioBucketTimeRateLimiter()
@limiter  # we use the decorator to limit the function to a certain number of attempts per second
async def some_func_to_limit(sleep_time: float = 1.0) -> None:
    await asyncio.sleep(sleep_time)
import time
from bucketratelimiter import MThreadedBucketTimeRateLimiter
limiter = MThreadedBucketTimeRateLimiter()
@limiter  # we use the decorator to limit the function to a certain number of attempts per second
def some_func_to_limit(sleep_time: float = 1.0) -> None:
    time.sleep(sleep_time)
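For readers unfamiliar with using an object as a decorator, the following stand-alone sketch shows the general pattern. `ToyLimiter` is a hypothetical stand-in that only counts calls; it does no rate limiting and is not how the library is implemented.

```python
import functools
from typing import Any, Callable


class ToyLimiter:
    """Hypothetical stand-in: counts calls instead of limiting them."""

    def __init__(self) -> None:
        self.calls = 0

    def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)  # keep the original name and docstring
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # A real limiter would wait here until the bucket has a free slot.
            self.calls += 1
            return func(*args, **kwargs)

        return wrapper


toy_limiter = ToyLimiter()


@toy_limiter  # the instance itself is the decorator
def double(x: int) -> int:
    return x * 2


outputs = [double(n) for n in range(3)]
```

Because the instance is callable, one limiter object can decorate several functions that then share the same state.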
Activate the rate limiter logic:
import asyncio
from bucketratelimiter import AsyncioBucketTimeRateLimiter
# worker, TASKS_TO_COMPLETE and WORKER_NUM are assumed to be defined elsewhere
limiter = AsyncioBucketTimeRateLimiter()
async def main_entry_point() -> None:
    """Main entry point of our asyncio app."""
    q = asyncio.Queue()
    for task in TASKS_TO_COMPLETE:
        await q.put(task)
    # use limiter as a context manager to ensure it is activated and shut down correctly
    async with limiter:
        for w in [worker(q) for _ in range(1, WORKER_NUM + 1)]:
            asyncio.create_task(w)
        await q.join()
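The entry point above relies on `worker`, `TASKS_TO_COMPLETE` and `WORKER_NUM`, which are not shown here. The sketch below is one possible shape for them, using only the standard library so it runs on its own. The task values and the body of `some_func_to_limit` are made up for illustration, and in a real app that function would be the one decorated with the limiter.

```python
import asyncio
from typing import List

TASKS_TO_COMPLETE = [0.01, 0.02, 0.03]  # hypothetical sleep times in seconds
WORKER_NUM = 2
DONE: List[float] = []


async def some_func_to_limit(sleep_time: float) -> None:
    # Placeholder for the rate-limited function.
    await asyncio.sleep(sleep_time)
    DONE.append(sleep_time)


async def worker(q: "asyncio.Queue[float]") -> None:
    while True:
        sleep_time = await q.get()
        try:
            await some_func_to_limit(sleep_time)
        finally:
            q.task_done()  # lets q.join() return once every task is handled


async def main_entry_point() -> None:
    q: "asyncio.Queue[float]" = asyncio.Queue()
    for task in TASKS_TO_COMPLETE:
        await q.put(task)
    workers = [asyncio.create_task(worker(q)) for _ in range(WORKER_NUM)]
    await q.join()
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)


asyncio.run(main_entry_point())
```

Keeping references to the created tasks, as done here, also makes it possible to cancel the workers once the queue is drained.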
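from queue import Queue
from threading import Thread
from bucketratelimiter import MThreadedBucketTimeRateLimiter
# worker, TASKS_TO_COMPLETE and WORKER_NUM are assumed to be defined elsewhere
limiter = MThreadedBucketTimeRateLimiter()
def main_entry_point() -> None:
    """Main entry point of our multithreading app."""
    q = Queue()
    for task in TASKS_TO_COMPLETE:
        q.put(task)
    # use limiter as a context manager to ensure it is activated and shut down correctly
    with limiter:
        for _ in range(1, WORKER_NUM + 1):
            Thread(target=worker, args=(q, ), daemon=True).start()
        q.join()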
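For the multithreaded entry point, a `worker` could look roughly like the sketch below. It is a stand-alone illustration using only the standard library: the task values, `RESULTS` and the body of `some_func_to_limit` are hypothetical, and the limiter itself is left out so the snippet runs without the package installed.

```python
from queue import Queue
from threading import Lock, Thread
from typing import List

TASKS_TO_COMPLETE = [1, 2, 3, 4, 5]  # hypothetical task payloads
WORKER_NUM = 2
RESULTS: List[int] = []
RESULTS_LOCK = Lock()


def some_func_to_limit(value: int) -> None:
    # Placeholder for the rate-limited function.
    with RESULTS_LOCK:
        RESULTS.append(value * 10)


def worker(q: "Queue[int]") -> None:
    while True:
        value = q.get()
        try:
            some_func_to_limit(value)
        finally:
            q.task_done()  # lets q.join() return once every task is handled


def main_entry_point() -> None:
    q: "Queue[int]" = Queue()
    for task in TASKS_TO_COMPLETE:
        q.put(task)
    for _ in range(WORKER_NUM):
        # daemon threads do not keep the process alive after q.join() returns
        Thread(target=worker, args=(q,), daemon=True).start()
    q.join()


main_entry_point()
```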
HOW TO USE LOW LEVEL API:
Use without context manager:
# Use the rate limiter's methods to activate and deactivate its inner logic
# instead of using context managers
try:
    limiter.activate()
    # run the rate-limited work here
finally:
    limiter.deactivate()
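The relation between the context manager and the two methods can be pictured with the toy class below. This is a sketch under the assumption that entering the context activates the limiter and leaving it deactivates it; `ToyLimiter` and its `active` flag are hypothetical and only mimic the method names used above.

```python
from typing import Any, List


class ToyLimiter:
    """Hypothetical model of a limiter that can be switched on and off."""

    def __init__(self) -> None:
        self.active = False

    def activate(self) -> None:
        self.active = True

    def deactivate(self) -> None:
        self.active = False

    def __enter__(self) -> "ToyLimiter":
        self.activate()
        return self

    def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        # Runs even if the body raised, like the finally branch below.
        self.deactivate()


toy = ToyLimiter()
states: List[bool] = []

# High-level form: the context manager switches the limiter on and off.
with toy:
    states.append(toy.active)
states.append(toy.active)

# Low-level form: the same effect with explicit calls.
try:
    toy.activate()
    states.append(toy.active)
finally:
    toy.deactivate()
states.append(toy.active)
```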
Use without decorating functions:
import asyncio
from bucketratelimiter import AsyncioBucketTimeRateLimiter
limiter = AsyncioBucketTimeRateLimiter()
async def some_func_to_limit(sleep_time: float = 1.0) -> None:
    await asyncio.sleep(sleep_time)
...
async with limiter:
    await limiter.wrap_operation(some_func_to_limit, sleep_time=1.0)
# ATTENTION!
# Do not apply wrap_operation to the same function more than once
# Do not apply the decorator to a function if you use wrap_operation with it
# Either can lead to unexpected results
import time
from bucketratelimiter import MThreadedBucketTimeRateLimiter
limiter = MThreadedBucketTimeRateLimiter()
def some_func_to_limit(sleep_time: float = 1.0) -> None:
    time.sleep(sleep_time)
...
with limiter:
    limiter.wrap_operation(some_func_to_limit, sleep_time=1.0)
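One plausible reason for the warning about combining the decorator with wrap_operation is double counting. The toy model below is not the library's code: `ToySlotCounter` is hypothetical and simply assumes that every guarded call takes one slot from the bucket, so a decorated function passed to `wrap_operation` takes two.

```python
import functools
from typing import Any, Callable


class ToySlotCounter:
    """Hypothetical limiter model: every guarded call takes one slot."""

    def __init__(self) -> None:
        self.slots_taken = 0

    def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # In this model the decorator simply routes through wrap_operation.
            return self.wrap_operation(func, *args, **kwargs)

        return wrapper

    def wrap_operation(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        self.slots_taken += 1  # one slot per guarded call
        return func(*args, **kwargs)


toy = ToySlotCounter()


@toy
def decorated() -> str:
    return "done"


# One logical call, but both the decorator and wrap_operation take a slot.
result = toy.wrap_operation(decorated)
```

In this model a single call consumes two slots, so the effective rate would be half of what was configured.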
FOR CONTRIBUTORS:
Clone the project:
git clone https://github.com/ArtyomKozyrev8/BucketRateLimiter.git
cd BucketRateLimiter
Create a new virtualenv:
python3 -m venv env
source env/bin/activate
Install all requirements:
pip install -e '.[develop]'
Run Tests:
mypy --strict bucketratelimiter
pytest --cov=bucketratelimiter tests/
coverage report
coverage html