Skip to main content

Improved Queue and Buffering Utilities.

Project description

bufferq

Better Queue Interface for Python

Python's queue interface is quite clunky and really not that good.

bufferq is a separate queue implementation with a more intuitive interface.

Sample Usage

Queues from bufferq are simple to use with a simple interface:

import threading
import bufferq

q = bufferq.Queue()

def consumer(q):
    for item in q.consume_one_generator():
        print(f"Working on item: {item}")

thd = threading.Thread(target=consumer, args=(q,))
thd.daemon = True
thd.start()

q.put('a')
q.put('b')
time.sleep(1)
q.put_multi(list(range(5)))

q.stop()
thd.join()

What's Wrong with queue?

Here are a few issues:

Basic Operations

Python's queue interface is surprisingly overwhelming for simple tasks.

For example, adding/pushing items to the queue. queue.Queue.put() has three different arguments:

  1. The item
  2. blocking (why?)
  3. timeout (Options 2 and 3 are set so the operation blocks indefinitely until the item can be added.)

This is annoying; why have blocking and timeout as separate arguments, instead of simply letting timeout=0 (or maybe even some placeholder-style object if you are really, REALLY concerned about blocking)? A timeout=0 should imply a single lookup that fails with queue.Empty if nothing is in the queue without any additional arguments. Yes, there is an added "convenience" call of queue.Queue.put_nowait(), but this can just as easily be a proxy call to: put(item, timeout=0) or similar without muddying the primary call for the interface.

This same problem exists (and is more relevant) for the get() calls for the queue.

Design Issues

Python's queue also does not provide much help for common tasks that queues are used for, such as a list of work for Producer/Consumer design patterns. Python's own queue documentation shows the following example:

import threading, queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# turn-on the worker thread
threading.Thread(target=worker, daemon=True).start()

# send thirty task requests to the worker
for item in range(30):
    q.put(item)
print('All task requests sent\n', end='')

# block until all tasks are done
q.join()
print('All work completed')

Even here in the official docs, there are problems. They've omitted the necessary exception handling; if the worker were to raise an exception between q.get() and q.task_done(), the call to q.join() might block indefinitely. (Yes, print() is not likely to raise an exception, but real work done by such a queue is...) This can be fixed by adding try/finally, but the semantics are subtle and as this example shows, error-prone.

The example also does not actually terminate the consumer thread correctly and instead just lets it die as a daemon thread. This might be okay for an example, but this is not good for realistic uses of the queue where resources need stricter management. This is doubly ironic, because the point of q.join() is to (presumably) support draining the queue and block until the queue is empty. However, any logic to handle basic draining requires more tooling that is outside of the queue (i.e. checking some threading.Event instead of while True), thus (in my opinion) defeating the point. This situation is further complicated by the situation below:

import queue

q = queue.Queue()
q.put('a')

def consumer():
    q.get()
    # Uh-oh. No timeout argument passed, so this blocks indefinitely.
    # Ctrl+C to get out of this, or worse (!) since some versions of python
    # did not even support Ctrl+C in this setting...
    q.get()

consumer()

The consumer might be blocked waiting for an element before it has a chance to check whether the stop event was set.

Better Design

The necessary variables to handle the draining should already implicitly be available in the queue object, with improved calls. The queue should have some stop() call that stops the queue and wakes up anyone waiting indefinitely to insert/remove an item with a QueueStopped() exception or similar to avoid deadlock.

Adding pythonic generators to remove items from the queue can also help with these common cases; the consumer can simply iterate to obtain the next item instead of handling the complicated pop/get logic that might otherwise be required.

This is all provided by bufferq.Queue like below:

import threading
import bufferq

q = bufferq.Queue()
def worker():
	for item in q.consume_one_generator():
        print(f'Working on {item}')
        print(f'Finished {item}')

# turn-on the worker thread
thd = threading.Thread(target=worker, daemon=True).start()

# send thirty task requests to the worker
for item in range(30):
    q.put(item)
print('All task requests sent, signal to stop and drain.')
# Request that the queue stop, since everything has been added.
q.stop()

thd.join()
print('All work completed and workers joined!')

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

bufferq-0.0.1.tar.gz (13.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

bufferq-0.0.1-py3-none-any.whl (13.4 kB view details)

Uploaded Python 3

File details

Details for the file bufferq-0.0.1.tar.gz.

File metadata

  • Download URL: bufferq-0.0.1.tar.gz
  • Upload date:
  • Size: 13.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.7.1 importlib_metadata/4.10.0 pkginfo/1.8.2 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.8.6

File hashes

Hashes for bufferq-0.0.1.tar.gz
Algorithm Hash digest
SHA256 ec345d930065935d8e4f0639894d39ae3dd70f7a82c151c47b2edc3a3a3be2d3
MD5 9135e111070d8fe5a175338259d7eeee
BLAKE2b-256 531a59428fa0f7b0a60d74438d534d0d4441fde73b2744c00f8c6930c8b23211

See more details on using hashes here.

File details

Details for the file bufferq-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: bufferq-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 13.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.7.1 importlib_metadata/4.10.0 pkginfo/1.8.2 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.8.6

File hashes

Hashes for bufferq-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 054e65524be0a762ad76abda1f7d05684e53b8f59cdddeb6339ddda247733e30
MD5 1a873b3a504a3b485cbc865ac8b2945d
BLAKE2b-256 ce1e2970a9e67ff6544fbc2f86bb33b44135bdee095ba58400c31aeb93223232

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page