Skip to main content

Rate-limited Queue

Project description

A slower queue implementation, guaranteeing that tasks are processsed at a maximum rate. SlowQueue aims to be directly comparable with the Queue API so that it can be dropped in as an alternative implementation, as with LIFOQueue and PriorityQueue, but currently (as this needn’t be the case) will raise additional ValueErrors if you try to pass block=False or timeout > 0 to get methods. Some slamming is possible and is controllable using the max_slam keyword argument on the TokenBucket or SlowQueue instance.

See following example (taken from demo_sloq.py) or the unit tests for more:

from argparse import ArgumentParser
from threading import Thread, current_thread
import logging
import time

from sloq import SlowQueue


def main(args=None):
    prog = ArgumentParser()
    prog.add_argument("-n", type=int, default=10, metavar="TASK_COUNT",
                      help="The number of tasks")
    prog.add_argument("-t", type=float, default=1, metavar="TASK_INTERVAL",
                      help="The tick: seconds between tasks being released")
    prog.add_argument("-w", type=int, default=3, metavar="WORKER_COUNT",
                      help="Number of workers")
    prog.add_argument("-d", type=float, default=0, metavar="TASK_DURATION",
                      help="Duration of a single task")
    prog.add_argument("-s", type=float, default=0, metavar="MAX_SLAM",
                      help="The maximum amount of slam to allow")
    args = prog.parse_args(args)

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.StreamHandler())

    test_queue(logger, args.t, args.n, args.d, args.w, args.s)


def test_queue(logger, tick=1, tasks=10, task_duration=0, worker_count=3,
               slam=0):
    start_time = time.time()
    sloq = SlowQueue(release_tick=tick, max_slam=slam)

    # Begin the workers
    for w in xrange(0, worker_count):
        Thread(target=test_worker, args=(logger, start_time, sloq)).start()

    # Populate the queue
    for task in xrange(0, tasks):
        sloq.put((task, task_duration))
    for w in xrange(0, worker_count):
        sloq.put((None, None))

    sloq.join()


def test_worker(logger, start_time, queue):
    while True:
        task, sleep = queue.get()
        if task is None:
            logger.info("%s, Done" % current_thread().name)
            queue.task_done()
            return
        else:
            logger.info("%s, Elapsed time: %0.2f, Task: %r",
                        current_thread().name, time.time() - start_time, task)
            if sleep:
                time.sleep(sleep)
            queue.task_done()


if __name__ == "__main__":
    main()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sloq-0.2.tar.gz (4.4 kB view details)

Uploaded Source

File details

Details for the file sloq-0.2.tar.gz.

File metadata

  • Download URL: sloq-0.2.tar.gz
  • Upload date:
  • Size: 4.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No

File hashes

Hashes for sloq-0.2.tar.gz
Algorithm Hash digest
SHA256 b85c72e97a80cf13d75107cc9d56a62e7f2d3817c0af262f623b24e7529dba21
MD5 8533807b56b55fd5622aa2d9c3dd8ede
BLAKE2b-256 ce91e8be5e739c6c495812a635f0deea9b184c33be02b95c56f0ceada4d3b123

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page