Helper for running functions in a concurrent loop.

README

Package overview

concurrent_loop provides helpers for running functions in a continuous loop in a separate thread or process.

Installation

pip install concurrent_loop

Example usage

The following code defines a class that increments a counter in a looped process using the ProcessLoop class. To run the loop in a separate thread rather than a process, use ThreadLoop instead.

from concurrent_loop.loops import ProcessLoop


class CounterIterator(object):
  """
  Iterates a counter in a loop that runs in an independent process.
  """
  counter = 0  # Value to be incremented

  _concurrent_loop_runner = ProcessLoop(100)  # Set up the controller that
  # will run any requested function every 100 ms in a separate process.

  def _increment(self, increment_val):
    """
    Increment the internal counter once and print value. 

    This will be run repeatedly in a process.

    Args:
      increment_val (int): The value to increment the internal counter by.
    """
    self.counter += increment_val
    print(self.counter)

  def concurrent_start(self):
    """
    Run the _increment() function in the process loop.
    """
    # Increments the internal counter in steps of 2. Arg must be supplied
    # as a tuple.
    self._concurrent_loop_runner.start(self._increment, (2,))

  def concurrent_stop(self):
    """
    Stop the process loop.
    """
    self._concurrent_loop_runner.stop()

Finally, in the main code:

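from time import sleep

counter_iter = CounterIterator()  # Avoids shadowing the built-in iter()

counter_iter.concurrent_start()
sleep(1)  # Let the loop run for one second
counter_iter.concurrent_stop()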
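
For intuition about what a looped runner does, the start/stop pattern above can be sketched with the standard library alone. This is a minimal illustration, not concurrent_loop's actual implementation: the SketchThreadLoop class and its internals are hypothetical, and only the interval-in-milliseconds constructor argument and the start(func, args_tuple) / stop() call shapes are taken from the example above.

```python
import threading
import time


class SketchThreadLoop:
    """Hypothetical stand-in; NOT the concurrent_loop implementation."""

    def __init__(self, interval_ms):
        self._interval_s = interval_ms / 1000.0
        self._stop_event = threading.Event()
        self._thread = None

    def start(self, func, args=()):
        # Run func(*args) repeatedly in a background thread.
        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._run, args=(func, args), daemon=True)
        self._thread.start()

    def _run(self, func, args):
        # Event.wait() returns False on timeout, so func keeps being
        # called once per interval until stop() sets the event.
        while not self._stop_event.wait(self._interval_s):
            func(*args)

    def stop(self):
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()  # Wait for the loop to finish


calls = []
loop = SketchThreadLoop(10)      # Call the function every 10 ms
loop.start(calls.append, (2,))   # Args supplied as a tuple
time.sleep(0.2)
loop.stop()
calls_after_stop = len(calls)
```

Waiting on an Event rather than calling sleep() inside the loop lets stop() interrupt the wait immediately instead of waiting out the current interval.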

Asynchronous communication with Queue()

Both multiprocessing.Queue and queue.Queue allow asynchronous communication with the concurrent loop. However, for the queue to work correctly, the following rules must be followed:

  • The class that uses the ThreadLoop or ProcessLoop (the CounterIterator class in the example above) must create the Queue instance as an instance attribute, not as a class attribute.
  • The Queue instance must be passed into the looped function (the _increment function in the example above) as a function parameter, not accessed from within the looped function as an attribute.
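
The README does not state the reasons behind these rules. One general Python behaviour that is relevant to the first rule is that a class attribute is shared by every instance, whereas an attribute assigned in __init__ is created per instance. The sketch below shows that difference with queue.Queue; the class names are hypothetical and the code does not use concurrent_loop at all.

```python
from queue import Queue


class SharedQueueOwner:
    # Class attribute: a single queue shared by ALL instances.
    results_q = Queue()


class OwnQueueOwner:
    def __init__(self):
        # Instance attribute: each instance gets its own queue.
        self.results_q = Queue()


a, b = SharedQueueOwner(), SharedQueueOwner()
a.results_q.put("from a")
shared_leak = b.results_q.get_nowait()  # b receives the item that a put

c, d = OwnQueueOwner(), OwnQueueOwner()
c.results_q.put("from c")
d_is_empty = d.results_q.empty()  # d's queue is unaffected by c
```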

To extend the example above so that the _increment function sends the counter value to a results queue on each loop, do the following.

Import the Queue class (for this example, we'll use the simpler multiprocessing.Queue):

from multiprocessing import Queue

Instantiate a results queue in CounterIterator.__init__:

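class CounterIterator(object):
  _counter = 0  # Internal value; underscore avoids clashing with the counter property
  _results_q = None

  def __init__(self):
    self._results_q = Queue()  # Instance attribute, as required above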

Modify the _increment function to put the counter value into the results queue:

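  def _increment(self, res_q, increment_val):
    # Uses an internal _counter attribute (assumed to be initialised to 0 on
    # the class) so that it does not clash with the counter property.
    self._counter += increment_val
    res_q.put_nowait(self._counter)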

Pass the results queue from the concurrent_start method into the _increment function:

  def concurrent_start(self):
    self._concurrent_loop_runner.start(self._increment, (self._results_q, 2))

Define a counter getter that gets the counter value from the FIFO results queue:

  @property
  def counter(self):
    return self._results_q.get()
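
Note that Queue.get() with no arguments blocks until an item is available, so the getter above waits indefinitely if the loop is not running. A possible non-hanging variant is sketched below as a standalone helper. The name get_or_default and its parameters are hypothetical; the sketch relies only on the standard behaviour that get(timeout=...) raises queue.Empty when the timeout expires, which also applies to multiprocessing.Queue.

```python
import queue


def get_or_default(results_q, timeout_s=0.5, default=None):
    """Return the next queued item, or default if none arrives in time."""
    try:
        return results_q.get(timeout=timeout_s)
    except queue.Empty:
        return default


q = queue.Queue()
q.put(2)
first = get_or_default(q, timeout_s=0.1)               # Item is available
second = get_or_default(q, timeout_s=0.1, default=-1)  # Queue is now empty
```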

In the main code, to print out the counter value from the first 10 loops:

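counter_iter = CounterIterator()  # Avoids shadowing the built-in iter()

counter_iter.concurrent_start()

for _ in range(10):
  print(counter_iter.counter)  # Blocks until the next value is available

counter_iter.concurrent_stop()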
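
The same producer/consumer flow can be tried without the package installed. The following is a rough standard-library sketch that uses a thread and queue.Queue in place of ProcessLoop and multiprocessing.Queue; the produce function, the stop_event argument, and the 10 ms interval are assumptions made for illustration only.

```python
import threading
from queue import Queue


def produce(res_q, increment_val, stop_event, interval_s=0.01):
    # The queue is passed in as a parameter, mirroring the rule above.
    counter = 0
    while not stop_event.wait(interval_s):
        counter += increment_val
        res_q.put_nowait(counter)


results_q = Queue()
stop_event = threading.Event()
worker = threading.Thread(
    target=produce, args=(results_q, 2, stop_event), daemon=True)
worker.start()

# Read the counter values from the first 10 loops, in FIFO order.
first_ten = [results_q.get(timeout=1) for _ in range(10)]

stop_event.set()
worker.join()
print(first_ten)
```

Because the queue is FIFO and there is a single producer, the values come out in the order they were put in, increasing in steps of 2.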
