Run functions under any limited rate

QPS Limit

Run functions under any limited rate using multiprocessing + asyncio

Available on Unix (e.g. Linux, macOS) only, because it depends on the fork start method of multiprocessing

Installation

pip install qps-limit

or install manually via git

git clone https://github.com/antct/qps-limit.git qps-limit
cd qps-limit
python setup.py install

Usage

from qps_limit import Limiter

Limiter(
    func="an asynchronous function",
    params="a generator function yields args and kwargs",
    callback="a callback function that handles the return values of func",
    num_workers="number of processes, recommended <= number of CPUs",
    num_coroutines="number of coroutines per worker, adjust according to the situation",
    max_qps="maximum qps, None means unlimited",
    ordered="return ordered results or not, the default option is False"
)

Note: Calling the wrapped function yields tuples (idx, res), where idx is the data index and res is the return value of func. With ordered=False, results may come back in arbitrary order, which improves performance.
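To make the meaning of ordered results concrete, here is a minimal sketch of restoring index order from (idx, res) tuples that arrive out of order. It is not taken from the library: in_order is a hypothetical helper, and it assumes indices start at 0 with no gaps.

```python
import heapq
from typing import Any, Iterable, Iterator, Tuple


def in_order(results: Iterable[Tuple[int, Any]]) -> Iterator[Tuple[int, Any]]:
    """Yield (idx, res) tuples sorted by idx, buffering only what arrived early.

    Hypothetical helper; assumes indices are 0, 1, 2, ... with no gaps.
    """
    pending = []   # min-heap of results that arrived before their turn
    next_idx = 0   # index that must be emitted next
    for idx, res in results:
        heapq.heappush(pending, (idx, res))
        # Flush every buffered result whose turn has come.
        while pending and pending[0][0] == next_idx:
            yield heapq.heappop(pending)
            next_idx += 1


shuffled = [(2, "c"), (0, "a"), (1, "b"), (3, "d")]
ordered = list(in_order(shuffled))
```

In this sketch a slow early item holds back everything behind it, which shows why unordered output can be cheaper.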

Quick Start

10 workers, each limited to 10 QPS (max_qps=100 in total), say "hello world"

from qps_limit import Limiter


async def func(n: int):
    return "hello world {}".format(n)


def params():
    for n in range(1000):
        yield (), {"n": n}


f = Limiter(
    func=func,
    params=params,
    num_workers=10,
    num_coroutines=128,
    max_qps=100,
    ordered=False
)

for idx, res in f():
    print(idx, res)

receiver: 1000it [00:00, 1057032.26it/s]
producer: 100%|██████████████████████████████| 1000/1000 [00:11<00:00, 101.96it/s]
consumer: 100%|██████████████████████████████| 1000/1000 [00:11<00:00, 101.95it/s]
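The idea behind the example above can be sketched in a single process with asyncio alone: unpack each (args, kwargs) pair from params, space out the call starts so they never exceed max_qps, and collect (idx, res) tuples. This is an illustration under those assumptions, not the library's implementation; run_paced is a hypothetical name, and the real package additionally spreads work across processes.

```python
import asyncio
import time


async def func(n: int):
    return "hello world {}".format(n)


def params():
    for n in range(5):
        yield (), {"n": n}


async def run_paced(func, params, max_qps: float):
    """Start at most max_qps calls per second and collect (idx, res) tuples.

    Single-process illustration only; not the library's implementation.
    """
    interval = 1.0 / max_qps  # minimum gap between two consecutive starts

    async def call(idx, args, kwargs):
        return idx, await func(*args, **kwargs)

    tasks = []
    for idx, (args, kwargs) in enumerate(params()):
        if idx:
            await asyncio.sleep(interval)  # space out consecutive starts
        tasks.append(asyncio.create_task(call(idx, args, kwargs)))
    return await asyncio.gather(*tasks)


start = time.perf_counter()
results = asyncio.run(run_paced(func, params, max_qps=50))
elapsed = time.perf_counter() - start
```

Five calls at 50 QPS need four gaps of 20 ms, so the run takes roughly 80 ms or more.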

Best Practice

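Initialize resources that cannot be pickled

resource = None  # module-level global; each worker process holds its own copy


async def func(n):
    global resource
    # with process lock
    if resource is None:
        # create the unpicklable resource on first use (a placeholder dict here)
        resource = {}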
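Some objects, such as locks, cannot be pickled at all, which is presumably why the pattern above creates the resource lazily inside the worker instead of passing it in. A minimal sketch using only the standard library; get_lock and is_picklable are hypothetical helpers, not part of qps-limit.

```python
import pickle
import threading

_lock = None  # created lazily, once per process


def get_lock():
    """Return this process's lock, creating it on first use."""
    global _lock
    if _lock is None:
        _lock = threading.Lock()
    return _lock


def is_picklable(obj) -> bool:
    """Report whether pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError):
        return False


plain_ok = is_picklable({"n": 1})          # ordinary data pickles fine
lock_ok = is_picklable(threading.Lock())   # a lock does not
```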

Debugging code using only partial data

Limiter(
    ...,
    max_steps=100
)
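To try the params generator on its own with partial data, a comparable truncation can be sketched with itertools.islice. first_n is a hypothetical helper, and whether it behaves exactly like max_steps is an assumption.

```python
from itertools import islice


def params():
    for n in range(1000):
        yield (), {"n": n}


def first_n(params, n: int):
    """Wrap a params generator function so it yields only its first n items."""
    def limited():
        yield from islice(params(), n)
    return limited


debug_params = first_n(params, 100)
sample = list(debug_params())
```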

Early termination if specific conditions are met

i, max_i = 0, 100
for _, res in f():
    if condition(res):  # hypothetical predicate; replace with your own check
        if i > max_i:
            f.stop()
            break
        i += 1

Safely write files with multiple processes

import fcntl

writer = open('...', 'w+')

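def callback(line):
    global writer
    # hold an exclusive lock for the duration of the write
    fcntl.flock(writer, fcntl.LOCK_EX)
    try:
        writer.write('{}\n'.format(line))
        writer.flush()
    finally:
        # release the lock even if the write fails
        fcntl.flock(writer, fcntl.LOCK_UN)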

f = Limiter(
    ...,
    callback=callback
)
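The locking pattern can be tried in isolation with a self-contained variant. One caveat worth knowing: flock locks belong to the open file description, so descriptors inherited across fork share a single lock. The sketch below therefore opens the file inside each call; append_line is a hypothetical helper, and whether the library runs callback in the parent or in the workers is not something this sketch assumes.

```python
import fcntl
import os
import tempfile


def append_line(path: str, line: str) -> None:
    """Append one line to path while holding an exclusive flock."""
    # Opening inside the call gives each caller its own open file description,
    # so the lock is not shared with descriptors inherited across fork().
    with open(path, "a") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            f.write("{}\n".format(line))
            f.flush()  # push the data out before the lock is released
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)


path = os.path.join(tempfile.mkdtemp(), "out.txt")
for n in range(3):
    append_line(path, "hello world {}".format(n))

with open(path) as f:
    lines = f.read().splitlines()
```

Reopening the file on every call costs a little more than keeping one global writer, in exchange for a lock that actually excludes other processes.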
