QPS Limit
Run functions under any limited rate using multiprocessing + asyncio
Available on Unix (e.g. Linux, macOS) only, because it relies on the fork start method of multiprocessing.
Installation
pip install qps-limit
Or install manually via git:
git clone https://github.com/antct/qps-limit.git qps-limit
cd qps-limit
pip install .
Usage
from qps_limit import Limiter
Limiter(
    func="an asynchronous function",
    params="a generator function that yields (args, kwargs) pairs",
    callback="a callback function that handles the return values of func",
    num_workers="number of processes, recommended <= number of CPUs",
    num_coroutines="number of coroutines per worker, adjust according to the situation",
    max_qps="maximum qps, None means unlimited",
    ordered="return ordered results or not, the default option is False"
)
Note: the wrapped function yields tuples (idx, res), where idx is the index of the input item and res is the return value of func. With ordered=False, results may come back in a different order than the inputs, which gives better performance.
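Because every result carries its input index, the original order can be recovered after an unordered run. The snippet below is a minimal sketch of that idea; `restore_order` is a hypothetical helper (not part of qps-limit) and the sample tuples stand in for what `f()` might yield.

```python
def restore_order(pairs):
    """Sort (idx, res) tuples by index and return only the results."""
    return [res for _, res in sorted(pairs, key=lambda pair: pair[0])]


# Hypothetical out-of-order output, standing in for `f()` with ordered=False.
unordered = [(2, "hello world 2"), (0, "hello world 0"), (1, "hello world 1")]
ordered_results = restore_order(unordered)
print(ordered_results)
```

Sorting afterwards requires holding all results in memory, so for streaming use cases ordered=True is probably the better fit.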
Quick Start
10 workers sharing max_qps=100 (at most 10 QPS each) say "hello world"
from qps_limit import Limiter

async def func(n: int):
    return "hello world {}".format(n)

def params():
    for n in range(1000):
        yield (), {"n": n}

f = Limiter(
    func=func,
    params=params,
    num_workers=10,
    num_coroutines=128,
    max_qps=100,
    ordered=False
)

for idx, res in f():
    print(idx, res)
receiver: 1000it [00:00, 1057032.26it/s]
producer: 100%|██████████████████████████████| 1000/1000 [00:11<00:00, 101.96it/s]
consumer: 100%|██████████████████████████████| 1000/1000 [00:11<00:00, 101.95it/s]
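The producer and consumer rates above settle near the configured 100 QPS. This README does not describe how the limit is enforced, so the following is only a minimal sketch of one common approach (starting each call on a fixed time grid inside a single asyncio event loop), not the library's implementation. `run_paced` and `hello` are hypothetical names.

```python
import asyncio
import time


async def run_paced(func, args_list, max_qps):
    """Start at most `max_qps` calls per second and gather their results."""
    interval = 1.0 / max_qps
    loop = asyncio.get_running_loop()
    start = loop.time()
    tasks = []
    for i, args in enumerate(args_list):
        # The i-th call is scheduled at start + i * interval.
        delay = start + i * interval - loop.time()
        if delay > 0:
            await asyncio.sleep(delay)
        tasks.append(asyncio.create_task(func(*args)))
    return await asyncio.gather(*tasks)


async def hello(n):
    return "hello world {}".format(n)


t0 = time.monotonic()
results = asyncio.run(run_paced(hello, [(n,) for n in range(5)], max_qps=50))
elapsed = time.monotonic() - t0
print(results, round(elapsed, 2))
```

Pacing against absolute target times rather than sleeping a fixed interval after each call keeps the average rate stable even when individual calls are slow to schedule.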
Best Practice
Initialize resources that cannot be pickled
resource = None

async def func(n):
    global resource
    # Initialize lazily inside the worker process, guarded by a process lock
    if resource is None:
        resource = {}
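The pattern above creates the resource inside each worker instead of pickling it. As a minimal sketch, assuming the setup step is itself asynchronous, an `asyncio.Lock` can keep concurrent coroutines in the same process from initializing twice. This substitutes an asyncio lock for the process lock mentioned in the comment; `get_resource`, `init_calls`, and the dict resource are placeholders.

```python
import asyncio

_resource = None   # per-process cache; each forked worker has its own copy
_init_lock = None  # created lazily so it is used only inside the worker's loop
init_calls = 0     # counts how often the setup actually ran


async def get_resource():
    """Return the shared resource, creating it on first use."""
    global _resource, _init_lock, init_calls
    if _init_lock is None:
        _init_lock = asyncio.Lock()
    async with _init_lock:
        if _resource is None:
            await asyncio.sleep(0)  # stand-in for asynchronous setup work
            init_calls += 1
            _resource = {"ready": True}
    return _resource


async def func(n):
    resource = await get_resource()
    return n, resource["ready"]


async def _demo():
    return await asyncio.gather(*(func(i) for i in range(5)))


demo_results = asyncio.run(_demo())
print(demo_results, init_calls)
```

If the setup is purely synchronous, the plain `if resource is None` check is already safe within one event loop, since no other coroutine can run between the check and the assignment.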
Debugging code using only partial data
Limiter(
    ...,
    max_steps=100
)
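Assuming max_steps caps the number of processed items (the README does not spell out its exact semantics), a similar effect can be sketched by truncating the params generator itself with `itertools.islice`. `debug_params` is a hypothetical name.

```python
from itertools import islice


def params():
    for n in range(1000):
        yield (), {"n": n}


def debug_params(max_steps=100):
    """Yield only the first `max_steps` (args, kwargs) pairs from params()."""
    yield from islice(params(), max_steps)


sample = list(debug_params())
print(len(sample), sample[0], sample[-1])
```

Since `debug_params` is still a generator function, it could presumably be passed as `params=debug_params` in place of the full generator.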
Early termination if specific conditions are met
i, max_i = 0, 100
for _, res in f():
    if condition(res):  # placeholder: substitute your own check
        if i > max_i:
            f.stop()
            break
        i += 1
Safely write files with multiple processes
import fcntl

writer = open('...', 'w+')

def callback(line):
    global writer
    # Hold an exclusive lock so lines from different processes do not interleave
    fcntl.flock(writer, fcntl.LOCK_EX)
    writer.write('{}\n'.format(line))
    writer.flush()
    fcntl.flock(writer, fcntl.LOCK_UN)

f = Limiter(
    ...,
    callback=callback
)
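If the write raises an exception while the lock is held, the lock in the callback above is never released explicitly. A minimal sketch of a variant that always unlocks wraps the lock in a context manager; `locked` and `write_line` are hypothetical helpers, and the temporary file exists only for the demonstration. Like the rest of this package, it is Unix-only because of `fcntl`.

```python
import fcntl
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def locked(fileobj):
    """Hold an exclusive flock on `fileobj` for the duration of the block."""
    fcntl.flock(fileobj, fcntl.LOCK_EX)
    try:
        yield fileobj
    finally:
        fileobj.flush()  # push buffered data out before releasing the lock
        fcntl.flock(fileobj, fcntl.LOCK_UN)


def write_line(fileobj, line):
    with locked(fileobj) as f:
        f.write("{}\n".format(line))


path = os.path.join(tempfile.mkdtemp(), "out.txt")
with open(path, "a") as writer:
    for i in range(3):
        write_line(writer, "row {}".format(i))

with open(path) as reader:
    lines = reader.read().splitlines()
print(lines)
```

Flushing before unlocking matters: otherwise buffered text could reach the file after another process has already taken the lock.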