Skip to main content

some Queue subclasses and ducktypes

Project description

Queue-like items: iterable queues, channels, etc.

Latest release 20240412:

  • QueueIterator: new next_batch and iter_batch methods for collecting items in batches.
  • QueueIterator: bugfix put(), do not hold the lock around the internal Queue.put, can deadlock the corresponding next/get.

Class Channel

A zero-storage data passage. Unlike a Queue, put(item) blocks waiting for the matching get().

Method Channel.__init__(self): pylint: disable=consider-using-with

Method Channel.__call__(self, *a): Call the Channel. With no arguments, do a .get(). With an argument, do a .put().

Method Channel.__iter__(self): A Channel is iterable.

Method Channel.__next__(self): next(Channel) calls Channel.get().

Method Channel.__str__(self): pylint: disable=consider-using-with

Method Channel.close(self): Close the Channel, preventing further put()s.

Method Channel.get(self, *a, **kw): Wrapper function to check that this instance is not closed.

Method Channel.put(self, *a, **kw): Wrapper function to check that this instance is not closed.

Function get_batch(q, max_batch=128, *, poll_delay=0.01)

Get up to max_batch closely spaced items from the queue q. Return the batch. Raise Queue_Empty if the first q.get() raises.

Block until the first item arrives. While the batch's size is less that max_batch and there is another item available within poll_delay seconds, append that item to the batch.

This requires get_batch() to be the sole consumer of q for correct operation as it polls q.empty().

Function IterablePriorityQueue(capacity=0, name=None)

Factory to create an iterable PriorityQueue.

Function IterableQueue(capacity=0, name=None)

Factory to create an iterable queue. Note that the returned queue is already open and needs a close.

Class ListQueue

A simple iterable queue based on a list.

Method ListQueue.__init__(self, queued=None, *, unique=None): Initialise the queue.

Parameters:

  • queued is an optional iterable of initial items for the queue
  • unique: optional signature function, default None

The unique parameter provides iteration via the cs.seq.unrepeated iterator filter which yields only items not seen earlier in the iteration. If unique is None or False iteration iterates over the queue items directly. If unique is True, iteration uses the default mode where items are compared for equality. Otherwise unique may be a callable which produces a value to use to detect repetitions, used as the cs.seq.unrepeated signature parameter.

Example:

>>> items = [1, 2, 3, 1, 2, 5]
>>> list(ListQueue(items))
[1, 2, 3, 1, 2, 5]
>>> list(ListQueue(items, unique=True))
[1, 2, 3, 5]

Method ListQueue.__bool__(self): A ListQueue looks a bit like a container, and is false when empty.

Method ListQueue.__iter__(self): A ListQueue is iterable.

Method ListQueue.__next__(self): Iteration gets from the queue.

Method ListQueue.append(self, item): Append an item to the queue, aka put.

Method ListQueue.extend(self, items): Convenient/performant queue-lots-of-items.

Method ListQueue.get(self): Get pops from the start of the list.

Method ListQueue.insert(self, index, item): Insert item at index in the queue.

Method ListQueue.prepend(self, items, offset=0): Insert items at offset (default 0, the front of the queue).

Method ListQueue.put(self, item): Put appends to the queue.

NullQ = <NullQueue:NullQ blocking=False>

A queue-like object that discards its inputs. Calls to .get() raise Queue_Empty.

Class NullQueue(cs.resources.MultiOpenMixin)

A queue-like object that discards its inputs. Calls to .get() raise Queue_Empty.

Method NullQueue.__init__(self, blocking=False, name=None): Initialise the NullQueue.

Parameters:

  • blocking: optional; if true, calls to .get() block until .shutdown(); default: False.
  • name: optional name for this NullQueue.

Method NullQueue.get(self): Get the next value. Always raises Queue_Empty. If .blocking, delay until .shutdown().

Method NullQueue.put(self, item): Put a value onto the queue; it is discarded.

Method NullQueue.shutdown(self): Shut down the queue.

Method NullQueue.startup(self): Start the queue.

Class PushQueue(cs.resources.MultiOpenMixin)

A puttable object which looks like an iterable Queue.

In this base class, calling .put(item) calls functor supplied at initialisation to trigger a function on data arrival whose iterable of results are put onto the output queue.

As an example, the cs.pipeline.Pipeline class uses subclasses of PushQueue for each pipeline stage, overriding the .put(item) method to mediate the call of functor through cs.later.Later as resource controlled concurrency.

Method PushQueue.__init__(self, name, functor, outQ): Initialise the PushQueue with the callable functor and the output queue outQ.

Parameters:

  • functor is a one-to-many function which accepts a single item of input and returns an iterable of outputs; it may be a generator. These outputs are passed to outQ.put individually as received.
  • outQ is a MultiOpenMixin which accepts via its .put() method.

Method PushQueue.put(self, *a, **kw): Wrapper function to check that this instance is not closed.

Method PushQueue.startup_shutdown(self): Open/close the output queue.

Class QueueIterator(cs.resources.MultiOpenMixin)

A QueueIterator is a wrapper for a Queue (or ducktype) which presents an iterator interface to collect items. It does not offer the .get or .get_nowait methods.

Method QueueIterator.__iter__(self): Iterable interface for the queue.

Method QueueIterator.__next__(self): Return the next item from the queue. If the queue is closed, raise StopIteration.

Method QueueIterator.empty(self): Test if the queue is empty.

Method QueueIterator.iter_batch(self, batch_size=1024): A generator which yields batches of items from the queue.

Method QueueIterator.join(self): Wait for the queue items to complete.

Method QueueIterator.next(self): Return the next item from the queue. If the queue is closed, raise StopIteration.

Method QueueIterator.next_batch(self, batch_size=1024, block_once=False): Obtain a batch of immediately available items from the queue. Up to batch_size items will be obtained, default 1024. Return a list of the items. If the queue is empty an empty list is returned. If the queue is not empty, continue collecting items until the queue is empty or the batch size is reached. If block_once is true, wait for the first item; this mode never returns an empty list except at the end of the iterator.

Method QueueIterator.put(self, *a, **kw): Wrapper function to check that this instance is not closed.

Method QueueIterator.startup_shutdown(self): MultiOpenMixin support; puts the sentinel onto the underlying queue on the final close.

Method QueueIterator.task_done(self): Report that an item has been processed.

Class TimerQueue

Class to run a lot of "in the future" jobs without using a bazillion Timer threads.

Method TimerQueue.add(self, when, func): Queue a new job to be called at 'when'. 'func' is the job function, typically made with functools.partial.

Method TimerQueue.close(self, cancel=False): Close the TimerQueue. This forbids further job submissions. If cancel is supplied and true, cancel all pending jobs. Note: it is still necessary to call TimerQueue.join() to wait for all pending jobs.

Method TimerQueue.join(self): Wait for the main loop thread to finish.

Release Log

Release 20240412:

  • QueueIterator: new next_batch and iter_batch methods for collecting items in batches.
  • QueueIterator: bugfix put(), do not hold the lock around the internal Queue.put, can deadlock the corresponding next/get.

Release 20240318: QueueIterator.put: be more rigorous with the put + item count increment.

Release 20240316: Fixed release upload artifacts.

Release 20240305: New get_batch(q) to fetch closely spaced batches from a Queue.

Release 20240211: ListQueue: new unique parameter which uses cs.seq.unrepeated in iteration.

Release 20231129:

  • ListQueue: add len, str, repr.
  • QueueIterator: use a mutex around the item counting.

Release 20230331:

  • QueueIterator.startup_shutdown: try/finally for the terminating _put(self.sentinel).
  • QueueIterator.str: show the underlying queue.
  • QueueIterator: drop call to self.finalise(), no longer available or needed.

Release 20221228: Minor doc update.

Release 20221207: PushQueue: modernise the MutiOpenMixin startup_shutdown, do both open and close of self.outQ.

Release 20220918: Expose formerly private _QueueIterator as QueueIterator, use a per-queue sentinel from cs.obj.Sentinel.

Release 20220805: ListQueue: add "append" synonym for "put" in keeping with the list-ish flavour.

Release 20220605: ListQueue: extend/prepend: reject str explicitly - although iterable, it is almost never what is intended.

Release 20220317: Add missed import.

Release 20220313: New ListQueue.prepend(items[,offset=0]) method.

Release 20211116: ListQueue: new insert() method.

Release 20210924: Channel: make a Channel iterable.

Release 20210913: New ListQueue simple iterable queue based on a list with list-like .append and .extend.

Release 20201025: Drop obsolete call to MultiOpenMixin.init.

Release 20200718: _QueueIterator: set finalise_later via new MultiOpenMixin property, required by recent MultiOpenMixin change.

Release 20200521: IterableQueue,IterablePriorityQueue: simplify wrappers, bypasses weird bug from overengineering these.

Release 20191007:

  • PushQueue: improve str.
  • Clean lint, drop cs.obj dependency.

Release 20190812: _QueueIterator: do MultiOpenMixin.init so that str is functional.

Release 20181022: Bugfix Channel, drasticly simplify PushQueue, other minor changes.

Release 20160828:

  • Use "install_requires" instead of "requires" in DISTINFO.
  • TimerQueue.add: support optional *a and **kw arguments for func.
  • Many bugfixes and internal changes.

Release 20150115: More PyPI metadata fixups.

Release 20150111: Initial PyPI release.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cs.queues-20240412.tar.gz (12.4 kB view hashes)

Uploaded Source

Built Distribution

cs.queues-20240412-py3-none-any.whl (11.4 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page