Helper for running functions in a concurrent loop.
README
Package overview
concurrent_loop provides helpers for running functions in a continuous loop in a separate thread or process.
Installation
pip install concurrent_loop
Example usage
The following code defines a class that increments a counter in a looped process using the ProcessLoop class (use ThreadLoop instead to run in a separate thread rather than a process).
from concurrent_loop.loops import ProcessLoop


class CounterIterator(object):
    """
    Iterates a counter in a loop that runs in an independent process.
    """
    counter = 0  # Value to be incremented

    # Set up the controller that will run any requested function every
    # 100 ms in a separate process.
    _concurrent_loop_runner = ProcessLoop(100)

    def _increment(self, increment_val):
        """
        Increment the internal counter once and print value. This will be run
        repeatedly in a process.

        Args:
            increment_val (int): The value to increment the internal counter
                by.
        """
        self.counter += increment_val
        print(self.counter)

    def concurrent_start(self):
        """
        Run the _increment() function in the process loop.
        """
        # Increments the internal counter in steps of 2. Arg must be supplied
        # as a tuple.
        self._concurrent_loop_runner.start(self._increment, (2,))

    def concurrent_stop(self):
        """
        Stop the process loop.
        """
        self._concurrent_loop_runner.stop()
Finally, in the main code:
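from time import sleep

# Named "iterator" rather than "iter" to avoid shadowing the built-in iter().
iterator = CounterIterator()
iterator.concurrent_start()
sleep(1)  # Let the loop run for one second
iterator.concurrent_stop()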
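To make the start/stop behaviour above more concrete, here is a minimal standard-library sketch of how a thread-based loop runner could work. This is not the implementation of ThreadLoop or ProcessLoop: the class name SimpleThreadLoop, the Counter helper, and run_demo are all hypothetical, and only the call shape (an interval in milliseconds, start(func, args_tuple), stop()) mirrors the example above.

```python
import threading
import time


class SimpleThreadLoop:
    """Hypothetical stand-in: calls a function repeatedly in a thread."""

    def __init__(self, interval_ms):
        self._interval_s = interval_ms / 1000.0
        self._stop_event = threading.Event()
        self._thread = None

    def start(self, func, args=()):
        """Call func(*args) once per interval until stop() is called."""
        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._run, args=(func, args), daemon=True
        )
        self._thread.start()

    def _run(self, func, args):
        # Event.wait() acts as an interruptible sleep: it returns True as
        # soon as stop() sets the event, which ends the loop.
        while not self._stop_event.wait(self._interval_s):
            func(*args)

    def stop(self):
        """Signal the loop to end and wait for the thread to finish."""
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None


class Counter:
    """Small helper (hypothetical) used to observe the loop."""

    def __init__(self):
        self.value = 0

    def increment(self, step):
        self.value += step


def run_demo(duration_s=0.3):
    """Run the loop every 10 ms for duration_s seconds; return the count."""
    counter = Counter()
    loop = SimpleThreadLoop(10)
    loop.start(counter.increment, (2,))  # Args supplied as a tuple
    time.sleep(duration_s)
    loop.stop()
    return counter.value
```

Because a thread shares memory with its caller, the main thread can read counter.value directly in this sketch. A separate process works on its own copy of the object, so values generally have to be sent back explicitly, for example through a queue.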
Asynchronous communication with Queue()
Both multiprocessing.Queue and queue.Queue allow asynchronous communication with the concurrent loop. However, for the queue to function correctly, the following rules must be followed:
- The class that calls the ThreadLoop or ProcessLoop (the CounterIterator class in the above example) must create the Queue instance as an instance attribute, not as a class attribute.
- The Queue instance must be passed into the looped function (the _increment function in the above example) as a function parameter, not accessed from within the looped function as an attribute.
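The difference between the two kinds of attribute in the first rule can be sketched with plain Python. This only illustrates general language semantics (a class attribute is one object shared by every instance); the README does not state why the library requires the rule, so this should not be read as the library's own rationale. The class and function names below are hypothetical.

```python
import queue


class SharedQueueOwner:
    # Class attribute: created once, shared by every instance.
    results_q = queue.Queue()


class OwnQueueOwner:
    def __init__(self):
        # Instance attribute: each object gets its own queue.
        self.results_q = queue.Queue()


def looped_function(res_q, value):
    """Receives the queue as a parameter, in the style of the second rule."""
    res_q.put_nowait(value)


a, b = SharedQueueOwner(), SharedQueueOwner()
looped_function(a.results_q, 1)
shared_leak = b.results_q.qsize()  # 1: b sees the item put through a

c, d = OwnQueueOwner(), OwnQueueOwner()
looped_function(c.results_q, 1)
own_leak = d.results_q.qsize()  # 0: d's queue is untouched
```

With the class-level queue, results from one object's loop would end up mixed with another's, which is one reason to prefer a queue per instance.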
To extend the above example so that the _increment function sends the counter value to a results queue on each loop, we do the following.
Import the queue class (for this example, we'll use the simpler multiprocessing.Queue):
from multiprocessing import Queue
Instantiate a results queue in CounterIterator.__init__:
class CounterIterator(object):
    _results_q = None

    def __init__(self):
        self._results_q = Queue()
Modify the _increment function to put the counter value into the results queue:
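# The internal counter is stored as _counter here (rename the class attribute
# counter = 0 to _counter = 0), since counter becomes a property below.
def _increment(self, res_q, increment_val):
    self._counter += increment_val
    res_q.put_nowait(self._counter)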
Pass the results queue from the concurrent_start method into the _increment function:
def concurrent_start(self):
    self._concurrent_loop_runner.start(self._increment, (self._results_q, 2))
Define a counter getter that gets the counter value from the FIFO results queue:
@property
def counter(self):
    return self._results_q.get()
In the main code, to print out the counter value from the first 10 loops:
# Named "iterator" rather than "iter" to avoid shadowing the built-in iter().
iterator = CounterIterator()
iterator.concurrent_start()
for _ in range(10):
    print(iterator.counter)
iterator.concurrent_stop()
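Note that Queue.get() with no arguments blocks until an item is available, so reading the counter while the loop is not running would wait indefinitely. A minimal sketch of a bounded read follows, assuming a hypothetical read_counter helper and an arbitrary timeout. It uses queue.Queue for simplicity; multiprocessing.Queue.get() accepts the same timeout argument and raises the same queue.Empty exception.

```python
import queue


def read_counter(results_q, timeout_s=0.5, default=None):
    """Return the next queued value, or default if none arrives in time."""
    try:
        return results_q.get(timeout=timeout_s)
    except queue.Empty:
        return default


q = queue.Queue()
q.put_nowait(2)
first = read_counter(q, timeout_s=0.1)   # 2
second = read_counter(q, timeout_s=0.1)  # None: the queue is empty
```

Whether to return a default, re-raise, or retry is a design choice that depends on how the caller should react to a stalled loop.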
Who do I talk to?
- The author: KCLee
- Email: lathe-rebuke.0c@icloud.com