Queue-like items: iterable queues, channels, etc.
Latest release 20250426: Fix typing annotation, failing on Python 3.8.
Short summary:
- `Channel`: A zero-storage data passage. Unlike a `Queue`, `put(item)` blocks waiting for the matching `get()`.
- `get_batch`: Get up to `max_batch` closely spaced items from the queue `q`. Return the batch.
- `IterablePriorityQueue`: Factory to create an iterable `PriorityQueue`.
- `IterableQueue`: Factory to create an iterable queue. Note that the returned queue is already open and needs a close.
- `ListQueue`: A simple iterable queue based on a `list`.
- `NullQ`: A queue-like object that discards its inputs. Calls to `.get()` raise `queue.Empty`.
- `NullQueue`: A queue-like object that discards its inputs. Calls to `.get()` raise `queue.Empty`.
- `PushQueue`: A puttable object which looks like an iterable `Queue`.
- `QueueIterator`: A `QueueIterator` is a wrapper for a `Queue`-like object which presents an iterator interface to collect items. It does not offer the `.get` or `.get_nowait` methods.
- `TimerQueue`: Class to run a lot of "in the future" jobs without using a bazillion Timer threads.
Module contents:
Class `Channel`: A zero-storage data passage. Unlike a `Queue`, `put(item)` blocks waiting for the matching `get()`.
Channel.__init__(self):
Channel.__call__(self, *a):
Call the Channel.
With no arguments, do a .get().
With an argument, do a .put().
Channel.__iter__(self):
A Channel is iterable.
Channel.__next__(self):
next(Channel) calls Channel.get().
Channel.__str__(self):
Channel.close(self):
Close the Channel, preventing further put()s.
Channel.get(self):
Read a value from the Channel.
Blocks until someone put()s to the Channel.
Channel.put(self, value):
Write a value to the Channel.
Blocks until a corresponding get() occurs.
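The rendezvous behaviour described above, where `put()` does not return until a `get()` has taken the value, can be illustrated with the standard library alone. This is a minimal sketch and not the `Channel` implementation from this module; the class name `RendezvousChannel` and its internals are hypothetical.

```python
import queue
import threading


class RendezvousChannel:
    """Hypothetical zero-storage channel: put() returns only after a get()."""

    def __init__(self):
        self._q = queue.Queue()
        # Assumption: only one put() may be in flight at a time.
        self._put_lock = threading.Lock()

    def put(self, value):
        with self._put_lock:
            self._q.put(value)
            # Block until the consumer calls task_done() in get().
            self._q.join()

    def get(self):
        value = self._q.get()   # block until someone put()s
        self._q.task_done()     # release the waiting put()
        return value


received = []
ch = RendezvousChannel()


def producer():
    for n in range(3):
        ch.put(n)  # each put() waits for the matching get()


t = threading.Thread(target=producer)
t.start()
for _ in range(3):
    received.append(ch.get())
t.join()
print(received)
```

The sketch leans on `Queue.join()` and `Queue.task_done()` to make the producer wait; the real class may use a different mechanism.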
`get_batch(q, max_batch=128, *, poll_delay=0.01)`: Get up to `max_batch` closely spaced items from the queue `q`. Return the batch. Raise `queue.Empty` if the first `q.get()` raises.
Block until the first item arrives. While the batch's size is less than `max_batch` and there is another item available within `poll_delay` seconds, append that item to the batch.
This requires `get_batch()` to be the sole consumer of `q` for correct operation as it makes decisions based on `q.empty()`.
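The batching rule above can be sketched against a plain `queue.Queue`. This follows the description only and is not the module's source; `get_batch_sketch` is a hypothetical name, and it assumes `q` offers blocking `.get()` and `.empty()` like the standard library queue.

```python
import queue
import time


def get_batch_sketch(q, max_batch=128, *, poll_delay=0.01):
    """Collect up to max_batch closely spaced items from q (illustrative only)."""
    batch = [q.get()]  # block until the first item arrives
    while len(batch) < max_batch:
        if q.empty():
            # Give a closely following item a chance to arrive.
            time.sleep(poll_delay)
            if q.empty():
                break
        # Safe only if we are the sole consumer of q.
        batch.append(q.get())
    return batch


q = queue.Queue()
for n in range(5):
    q.put(n)

first = get_batch_sketch(q, max_batch=3)   # stops at the size limit
second = get_batch_sketch(q, max_batch=3)  # stops when q stays empty
print(first, second)
```

The second call shows why a sole consumer is required: the decision to call `q.get()` rests on an earlier `q.empty()` check, which another consumer could invalidate.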
`IterablePriorityQueue(capacity=0, name=None)`: Factory to create an iterable `PriorityQueue`.
`IterableQueue(capacity=0, name=None)`: Factory to create an iterable queue. Note that the returned queue is already open and needs a close.
ListQueue.__init__(self, queued=None, *, unique=None):
Initialise the queue.
Parameters:
- `queued` is an optional iterable of initial items for the queue
- `unique`: optional signature function, default `None`
The unique parameter provides iteration via the
cs.seq.unrepeated iterator filter which yields only items
not seen earlier in the iteration.
If unique is None or False iteration iterates
over the queue items directly.
If unique is True, iteration uses the default mode
where items are compared for equality.
Otherwise unique may be a callable which produces a
value to use to detect repetitions, used as the cs.seq.unrepeated
signature parameter.
Example:
>>> items = [1, 2, 3, 1, 2, 5]
>>> list(ListQueue(items))
[1, 2, 3, 1, 2, 5]
>>> list(ListQueue(items, unique=True))
[1, 2, 3, 5]
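To make the `unique` modes concrete, here is a rough sketch of a repetition filter in the spirit of the one described above. It is not `cs.seq.unrepeated` itself; `unrepeated_sketch` is a hypothetical stand-in, and it assumes the signature values are hashable.

```python
def unrepeated_sketch(items, signature=None):
    """Yield only items whose signature has not been seen earlier."""
    seen = set()
    for item in items:
        # With no signature function the item itself is compared.
        sig = item if signature is None else signature(item)
        if sig not in seen:
            seen.add(sig)
            yield item


items = [1, 2, 3, 1, 2, 5]
plain = list(unrepeated_sketch(items))
# A signature function collapses items sharing the same signature value.
by_parity = list(unrepeated_sketch(items, signature=lambda n: n % 2))
print(plain, by_parity)
```

The first call corresponds to `unique=True`, the second to passing a callable as `unique`.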
ListQueue.__bool__(self):
A ListQueue looks a bit like a container,
and is false when empty.
ListQueue.__iter__(self):
A ListQueue is iterable.
ListQueue.__next__(self):
Iteration gets from the queue.
ListQueue.append(self, item):
Append an item to the queue, aka put.
ListQueue.extend(self, items):
Convenient/performant queue-lots-of-items.
ListQueue.get(self):
Get pops from the start of the list.
ListQueue.insert(self, index, item):
Insert item at index in the queue.
ListQueue.prepend(self, items, offset=0):
Insert items at offset (default 0, the front of the queue).
ListQueue.put(self, item):
Put appends to the queue.
`NullQ = <NullQueue:NullQ blocking=False>`: A queue-like object that discards its inputs. Calls to `.get()` raise `queue.Empty`.
Class `NullQueue(cs.resources.MultiOpenMixin)`: A queue-like object that discards its inputs. Calls to `.get()` raise `queue.Empty`.
NullQueue.__init__(self, blocking=False, name=None):
Initialise the NullQueue.
Parameters:
- `blocking`: optional; if true, calls to `.get()` block until `.shutdown()`; default: `False`.
- `name`: optional name for this `NullQueue`.
NullQueue.get(self):
Get the next value. Always raises queue.Empty.
If .blocking, delay until .shutdown().
NullQueue.put(self, item):
Put a value onto the queue; it is discarded.
NullQueue.shutdown(self):
Shut down the queue.
NullQueue.startup(self):
Start the queue.
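The non-blocking behaviour can be pictured with a tiny stand-in class. This sketch covers only the default `blocking=False` mode and omits the open/close handling; `DiscardingQueue` is a hypothetical name, not part of this module.

```python
import queue


class DiscardingQueue:
    """Hypothetical sink: put() drops the item, get() always raises queue.Empty."""

    def put(self, item):
        pass  # the item is discarded

    def get(self):
        raise queue.Empty


sink = DiscardingQueue()
sink.put("ignored")
try:
    sink.get()
    got_empty = False
except queue.Empty:
    got_empty = True
print(got_empty)
```

Such an object is handy as a placeholder output where a consumer expects something queue-like but the results are not wanted.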
Class `PushQueue(cs.resources.MultiOpenMixin, cs.resources.RunStateMixin)`: A puttable object which looks like an iterable `Queue`.
In this base class, calling `.put(item)` calls the `functor` supplied at initialisation to trigger a function on data arrival whose iterable of results are put onto the output queue.
As an example, the `cs.pipeline.Pipeline` class uses subclasses of `PushQueue` for each pipeline stage, overriding the `.put(item)` method to mediate the call of `functor` through `cs.later.Later` as resource controlled concurrency.
PushQueue.__init__(self, name: str, functor: Callable[[Any], Iterable], outQ, runstate: Optional[cs.resources.RunState] = <function uses_runstate.<locals>.<lambda> at 0x10e368400>):
Initialise the PushQueue with the callable functor
and the output queue outQ.
Parameters:
- `functor` is a one-to-many function which accepts a single item of input and returns an iterable of outputs; it may be a generator. These outputs are passed to `outQ.put` individually as received.
- `outQ` is a `MultiOpenMixin` which accepts results via its `.put()` method.
PushQueue.put(self, item):
Receive a new item, put the results of functor(item) onto self.outQ.
Subclasses might override this method, for example to process
the result of functor differently, or to queue the call
to functor(item) via some task system.
PushQueue.startup_shutdown(self):
Open/close the output queue.
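The one-to-many `put()` described above might look roughly like this. The sketch leaves out the `MultiOpenMixin` and `RunState` handling of the real class; `PushQueueSketch` and `split_words` are hypothetical names, and a plain `queue.Queue` stands in for `outQ`.

```python
import queue


class PushQueueSketch:
    """Hypothetical push queue: put(item) forwards functor(item) results to outQ."""

    def __init__(self, functor, outQ):
        self.functor = functor
        self.outQ = outQ

    def put(self, item):
        # functor returns an iterable (possibly a generator);
        # each result is passed on individually as it is produced.
        for result in self.functor(item):
            self.outQ.put(result)


def split_words(line):
    """Example one-to-many functor: one line in, many words out."""
    yield from line.split()


out = queue.Queue()
pq = PushQueueSketch(split_words, out)
pq.put("alpha beta")
pq.put("gamma")
words = [out.get_nowait() for _ in range(out.qsize())]
print(words)
```

A subclass could override `put()` to hand the `functor(item)` call to a worker pool instead of running it inline.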
Class `QueueIterator(cs.resources.MultiOpenMixin, collections.abc.Iterable, typing.Generic)`: A `QueueIterator` is a wrapper for a `Queue`-like object which presents an iterator interface to collect items. It does not offer the `.get` or `.get_nowait` methods.
QueueIterator.__iter__(self):
Iterable interface for the queue.
QueueIterator.__next__(self):
Return the next item from the queue.
If the queue is closed, raise StopIteration.
QueueIterator.empty(self):
Test if the queue is empty.
QueueIterator.iter_batch(self, batch_size=1024):
A generator which yields batches of items from the queue.
The default batch_size is 1024.
QueueIterator.join(self):
Wait for the queue items to complete.
QueueIterator.next(self):
Return the next item from the queue.
If the queue is closed, raise StopIteration.
QueueIterator.next_batch(self, batch_size=1024, block_once=False):
Obtain a batch of immediately available items from the queue.
Up to batch_size items will be obtained, default 1024.
Return a list of the items.
If the queue is empty an empty list is returned.
If the queue is not empty, continue collecting items until
the queue is empty or the batch size is reached.
If block_once is true, wait for the first item;
this mode never returns an empty list except at the end of the iterator.
QueueIterator.put(self, item, *args, **kw):
Put item onto the queue.
Warn if the queue is closed.
Raises ValueError if item is the sentinel.
QueueIterator.startup_shutdown(self):
MultiOpenMixin support; puts the sentinel onto the underlying queue
on the final close.
QueueIterator.task_done(self):
Report that an item has been processed.
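The sentinel mechanism behind the iterator interface can be sketched as follows. This is an illustration of the general technique, not the module's code: `QueueIteratorSketch` is hypothetical, a plain `object()` stands in for the per-queue sentinel, and an explicit `close()` replaces the `MultiOpenMixin` final close.

```python
import queue


class QueueIteratorSketch:
    """Hypothetical iterator over a queue, terminated by a private sentinel."""

    def __init__(self, q):
        self._q = q
        self._sentinel = object()  # unique end-of-stream marker

    def put(self, item):
        if item is self._sentinel:
            raise ValueError("cannot put the sentinel")
        self._q.put(item)

    def close(self):
        # Mark the end of the stream for consumers.
        self._q.put(self._sentinel)

    def __iter__(self):
        return self

    def __next__(self):
        item = self._q.get()
        if item is self._sentinel:
            # Put it back so any other consumer also sees the end.
            self._q.put(self._sentinel)
            raise StopIteration
        return item


qi = QueueIteratorSketch(queue.Queue())
for n in range(3):
    qi.put(n)
qi.close()
items = list(qi)
print(items)
```

Without the `close()` call the `list(qi)` line would block forever, which is why an already open iterable queue needs a close.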
Class `TimerQueue`: Class to run a lot of "in the future" jobs without using a bazillion Timer threads.
TimerQueue.add(self, when, func):
Queue a new job to be called at 'when'.
'func' is the job function, typically made with functools.partial.
TimerQueue.close(self, cancel=False):
Close the TimerQueue. This forbids further job submissions.
If cancel is supplied and true, cancel all pending jobs.
Note: it is still necessary to call TimerQueue.join() to
wait for all pending jobs.
TimerQueue.join(self):
Wait for the main loop thread to finish.
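The idea of serving many future jobs from one thread can be sketched with a heap and a condition variable. This is an illustration only, not the `TimerQueue` implementation: `TimerQueueSketch` is hypothetical, it assumes `when` is a UNIX timestamp as returned by `time.time()`, and it does not implement `cancel`.

```python
import heapq
import itertools
import threading
import time


class TimerQueueSketch:
    """Hypothetical single-thread scheduler for jobs due at future times."""

    def __init__(self):
        self._jobs = []                 # heap of (when, seq, func)
        self._seq = itertools.count()   # tie-breaker for equal times
        self._cond = threading.Condition()
        self._closed = False
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def add(self, when, func):
        with self._cond:
            if self._closed:
                raise RuntimeError("TimerQueueSketch is closed")
            heapq.heappush(self._jobs, (when, next(self._seq), func))
            self._cond.notify()  # the new job may be due earliest

    def close(self):
        with self._cond:
            self._closed = True
            self._cond.notify()

    def join(self):
        self._thread.join()

    def _run(self):
        while True:
            with self._cond:
                while True:
                    if self._jobs:
                        delay = self._jobs[0][0] - time.time()
                        if delay <= 0:
                            _, _, func = heapq.heappop(self._jobs)
                            break
                        self._cond.wait(delay)  # sleep until due or notified
                    elif self._closed:
                        return  # closed and nothing left to run
                    else:
                        self._cond.wait()
            func()  # run the job outside the lock


ran = []
tq = TimerQueueSketch()
now = time.time()
tq.add(now + 0.05, lambda: ran.append("second"))
tq.add(now + 0.01, lambda: ran.append("first"))
tq.close()  # no further submissions; pending jobs still run
tq.join()   # wait for the main loop thread to finish
print(ran)
```

As in the description above, closing only forbids new submissions, so `join()` is still needed to wait for the pending jobs.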
Release Log
Release 20250426: Fix typing annotation, failing on Python 3.8.
Release 20250306:
- QueueIterator: annotate as Iterable[Any].
- Drop obsolete pretence of python 2 support, embrace the f-string.
- Some other minor updates.
Release 20250103: PushQueue: mix in RunStateMixin, automatic RunState parameter, honour runstate.cancelled.
Release 20240412:
- QueueIterator: new next_batch and iter_batch methods for collecting items in batches.
- QueueIterator: bugfix put(), do not hold the lock around the internal Queue.put, can deadlock the corresponding next/get.
Release 20240318: QueueIterator.put: be more rigorous with the put + item count increment.
Release 20240316: Fixed release upload artifacts.
Release 20240305: New get_batch(q) to fetch closely spaced batches from a Queue.
Release 20240211: ListQueue: new unique parameter which uses cs.seq.unrepeated in iteration.
Release 20231129:
- ListQueue: add __len__, __str__, __repr__.
- QueueIterator: use a mutex around the item counting.
Release 20230331:
- QueueIterator.startup_shutdown: try/finally for the terminating _put(self.sentinel).
- QueueIterator.__str__: show the underlying queue.
- QueueIterator: drop call to self.finalise(), no longer available or needed.
Release 20221228: Minor doc update.
Release 20221207: PushQueue: modernise the MultiOpenMixin startup_shutdown, do both open and close of self.outQ.
Release 20220918: Expose formerly private _QueueIterator as QueueIterator, use a per-queue sentinel from cs.obj.Sentinel.
Release 20220805: ListQueue: add "append" synonym for "put" in keeping with the list-ish flavour.
Release 20220605: ListQueue: extend/prepend: reject str explicitly - although iterable, it is almost never what is intended.
Release 20220317: Add missed import.
Release 20220313: New ListQueue.prepend(items[,offset=0]) method.
Release 20211116: ListQueue: new insert() method.
Release 20210924: Channel: make a Channel iterable.
Release 20210913: New ListQueue simple iterable queue based on a list with list-like .append and .extend.
Release 20201025: Drop obsolete call to MultiOpenMixin.__init__.
Release 20200718: _QueueIterator: set finalise_later via new MultiOpenMixin property, required by recent MultiOpenMixin change.
Release 20200521: IterableQueue,IterablePriorityQueue: simplify wrappers, bypasses weird bug from overengineering these.
Release 20191007:
- PushQueue: improve __str__.
- Clean lint, drop cs.obj dependency.
Release 20190812: _QueueIterator: do MultiOpenMixin.__init__ so that __str__ is functional.
Release 20181022: Bugfix Channel, drastically simplify PushQueue, other minor changes.
Release 20160828:
- Use "install_requires" instead of "requires" in DISTINFO.
- TimerQueue.add: support optional *a and **kw arguments for func.
- Many bugfixes and internal changes.
Release 20150115: More PyPI metadata fixups.
Release 20150111: Initial PyPI release.