Library to help manage threads that run continuously for a long time.

Project description

This library provides several classes to help manage threads that run continuously.

Threads that run continuously in a loop cause a few problems. Calculation threads are greedy: they keep running and starve other threads. A thread that never exits its infinite loop may also keep running after Python has tried to exit. Daemon threads will close, but their resources and variables may not be cleaned up properly. Mostly, I needed to finish writing data to a file before the thread closed. This library aims to solve those problems.
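To make the file-writing problem concrete, here is a minimal standard-library sketch. It is not this library's implementation. The names `writer_loop`, `run_writer`, and `alive` are hypothetical: the loop runs until the event is cleared, and the `with` block ensures the file is flushed and closed before the thread ends.

```python
import os
import tempfile
import threading
import time

def writer_loop(path, alive):
    """Hypothetical worker: append lines until `alive` is cleared."""
    with open(path, "w") as f:          # closed (and flushed) when the loop ends
        count = 0
        while alive.is_set():
            f.write("line %d\n" % count)
            count += 1
            time.sleep(0.01)            # yield so other threads are not starved
        f.write("done\n")               # final write happens before the thread exits

def run_writer(duration=0.1):
    """Start the writer, let it run, stop it, and return the lines it wrote."""
    fd, path = tempfile.mkstemp(suffix=".txt")
    os.close(fd)
    alive = threading.Event()
    alive.set()
    th = threading.Thread(target=writer_loop, args=(path, alive))
    th.start()
    time.sleep(duration)
    alive.clear()                       # ask the loop to finish
    th.join()                           # wait for the final write and file close
    with open(path) as f:
        lines = f.read().splitlines()
    os.remove(path)
    return lines

lines = run_writer()
print(lines[-1])
```

A daemon thread doing the same work could be killed in the middle of a write, which is why the sketch uses a non-daemon thread plus an explicit stop signal.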

This library provides the following main thread utilities:
  • Thread - threading with context manager support

  • ContinuousThread - Run a function continuously in a loop (calling sleep periodically is suggested if the function does no I/O)

  • PausableThread - Continuous thread that can be stopped and started again.

  • OperationThread - Thread that will run a calculation in a separate thread with different data.

  • PeriodicThread - Thread that runs a function periodically at a given interval.

  • shutdown - Call join(timeout) on every non-daemon thread that is active.
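The idea behind "call join(timeout) on every non-daemon thread" can be sketched with the standard library alone. This is only an illustration under my own assumptions, not the library's shutdown() code; `join_non_daemon_threads` is a hypothetical helper name.

```python
import threading
import time

def join_non_daemon_threads(timeout=1.0):
    """Hypothetical helper: call join(timeout) on every active non-daemon thread."""
    current = threading.current_thread()
    still_alive = []
    for th in threading.enumerate():
        if th is current or th is threading.main_thread() or th.daemon:
            continue                    # never join ourselves, the main thread, or daemons
        th.join(timeout)
        if th.is_alive():
            still_alive.append(th)      # the thread did not finish within the timeout
    return still_alive

def short_task():
    time.sleep(0.05)

worker = threading.Thread(target=short_task)
worker.start()
leftover = join_non_daemon_threads(timeout=1.0)
print(worker.is_alive())
```

Note that join() only waits. A thread stuck in an infinite loop still needs some stop signal, which is what the continuous thread classes add.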

Shutdown Update!

I noticed an issue with Python 3.8 on Windows: Python’s threading._shutdown function can hang forever, preventing the process from exiting. This library depends on that function, so it overrides threading._shutdown to automatically “join” all non-daemon threads.

Note: Python’s threading.Thread has a _stop method. Try not to override this method.

Problem

import time
import threading

c = 0

def count_loop():
    global c

    while True:
        c += 1
        time.sleep(1)

th = threading.Thread(target=count_loop)
th.start()

time.sleep(5)
print('Count:', c)

# Process will not exit, because thread is running and cannot end

Solution

The library adds an allow_shutdown() method and a shutdown() function. The “ContinuousThread”, “PausableThread”, “OperationThread”, and “PeriodicThread” classes do not have this problem. The continuous_threading library overrides threading._shutdown and has fixes that allow ContinuousThreads to close automatically.

import time
import continuous_threading

c = 0

def count():
    global c
    c += 1
    time.sleep(1)

th = continuous_threading.ContinuousThread(target=count)
th.start()

time.sleep(5)
print('Count:', c)

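# Process will automatically exit with threading._shutdown() override

A plain continuous_threading.Thread running its own infinite loop can be closed with shutdown() or allow_shutdown():
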
import time
import continuous_threading

c = 0

def count_loop():
    global c

    while True:
        c += 1
        time.sleep(1)

th = continuous_threading.Thread(target=count_loop)
th.start()

time.sleep(5)
print('Count:', c)

continuous_threading.shutdown(0)  # Call join on every non-daemon Thread.

# Alternative. This does not call join normally. continuous_threading threading._shutdown override does call "join".
# th.allow_shutdown()  # Release "_tstate_lock" allowing threading._shutdown to continue

# Process will exit, because allow_shutdown or shutdown

Close Infinite Loop Threads Automatically

Control how the overridden threading._shutdown works.

import time
import continuous_threading

# Change threading._shutdown() to automatically call Thread.allow_shutdown()
continuous_threading.set_allow_shutdown(True)
continuous_threading.set_shutdown_timeout(0)  # Default 1

c = 0

def count_loop():
    global c

    while True:
        c += 1
        time.sleep(1)

th = continuous_threading.Thread(target=count_loop)
th.start()

time.sleep(5)
print('Count:', c)

# Process will exit due to "set_allow_shutdown"

You can also just call continuous_threading.shutdown().

Remove threading._shutdown() override

You can change the threading._shutdown function with reset_shutdown() and set_shutdown():

import continuous_threading

# Change back to original threading._shutdown function
continuous_threading.reset_shutdown()

# Set custom function or leave empty to use continuous_threading custom_shutdown()
continuous_threading.set_shutdown()

Thread context manager

This library lets a thread be used as a context manager, which automatically starts and stops the thread.

import continuous_threading

thread_success = [False]

def do_something():
    print('here')
    thread_success[0] = True


with continuous_threading.Thread(target=do_something):
    print('in context')

assert thread_success[0] is True
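For readers curious how a thread can act as a context manager, here is a minimal sketch built on threading.Thread. `ContextThread` is a hypothetical name, and the library's own Thread class may do more on exit than a plain join.

```python
import threading

class ContextThread(threading.Thread):
    """Hypothetical Thread subclass: start on enter, join on exit."""

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.join()
        return False                    # do not swallow exceptions from the with-body

results = []

with ContextThread(target=results.append, args=("ran",)) as th:
    pass                                # the thread runs while the body executes

print(results, th.is_alive())
```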

ContinuousThread

The ContinuousThread is a simple thread that runs in an infinite while loop. The while loop keeps looping while the thread’s alive Event is set. Call thread.stop(), thread.close(), or thread.join() to stop the thread. The thread should also stop automatically when the Python program is exiting/closing.

import continuous_threading

class CountingThread(continuous_threading.ContinuousThread):
    def __init__(self):
        super().__init__()
        self.counter = 0

    def _run(self):
        self.counter += 1


with CountingThread() as th:
    print('in context')

assert th.counter > 0
print("Simple context manager print caused %d thread iterations" % th.counter)
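The "loop while the alive Event is set" behavior can be sketched with the standard library. This is a simplified stand-in under my own assumptions, not the ContinuousThread source; `LoopThread`, `_loop_target`, and `alive` are hypothetical names.

```python
import threading
import time

class LoopThread(threading.Thread):
    """Hypothetical stand-in: call `target` repeatedly while `alive` is set."""

    def __init__(self, target):
        super().__init__()
        self._loop_target = target
        self.alive = threading.Event()

    def start(self):
        self.alive.set()                # set before the loop begins
        super().start()

    def run(self):
        while self.alive.is_set():
            self._loop_target()

    def stop(self, timeout=None):
        self.alive.clear()              # loop exits after the current iteration
        self.join(timeout)

ticks = [0]

def tick():
    ticks[0] += 1
    time.sleep(0.001)                   # give other threads a chance to run

th = LoopThread(tick)
th.start()
time.sleep(0.05)
th.stop()
print(ticks[0] > 0, th.is_alive())
```

Because the event is only checked between iterations, a single call to the target should stay short, otherwise stop() waits for it to return.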

Example of start and stop methods.

import time
import continuous_threading

class CountingThread(continuous_threading.ContinuousThread):
    def __init__(self):
        super().__init__()
        self.counter = 0

    def _run(self):
        self.counter += 1

th = CountingThread()
th.start()
time.sleep(0.1)
th.stop()  # or th.close() or th.join()

assert th.counter > 0
print("Simple context manager print caused %d thread iterations" % th.counter)

New init function. The dict returned by init is passed as keyword arguments to the target function.

import time
import continuous_threading

COUNTER = [0]

def init_counter():
    return {'counter': COUNTER}  # dict gets passed as kwargs to the target function.

def inc_counter(counter):
    counter[0] += 1

th = continuous_threading.ContinuousThread(target=inc_counter, init=init_counter)
th.start()
time.sleep(0.1)
th.stop()  # or th.close() or th.join()

assert COUNTER[0] > 0
print("Simple context manager print caused %d thread iterations" % COUNTER[0])

PausableThread

A continuous thread that can be stopped and started again.

import time
import continuous_threading


counter = [0]

def inc_counter():
    counter[0] += 1

th = continuous_threading.PausableThread(target=inc_counter)

th.start()
time.sleep(0.1)

th.stop()
time.sleep(0.1)

value = counter[0]
assert value > 0

time.sleep(0.1)
assert value == counter[0]

th.start()
time.sleep(0.1)
assert counter[0] > value

Again this can be used as a context manager.

import time
import continuous_threading

class CountingThread(continuous_threading.PausableThread):
    def __init__(self):
        super().__init__()
        self.counter = 0

    def _run(self):
        self.counter += 1

with CountingThread() as th:
    time.sleep(0.1)
    th.stop()
    value = th.counter
    assert value > 0

    time.sleep(0.1)
    assert value == th.counter

    th.start()
    time.sleep(0.1)
    assert th.counter > value
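One way pausing could work is with two events: one that gates each iteration and one that ends the loop. The sketch below is an assumption-laden illustration, not the PausableThread source. `PauseLoop`, `pause`, `resume`, and `close` are hypothetical names chosen so that threading.Thread.start is not overridden.

```python
import threading
import time

class PauseLoop(threading.Thread):
    """Hypothetical stand-in: a loop that can be paused and resumed."""

    def __init__(self, target):
        super().__init__(daemon=True)   # daemon so a forgotten loop cannot block exit
        self._loop_target = target
        self.running = threading.Event()    # set = run iterations, clear = paused
        self.closed = threading.Event()     # set = leave the loop for good

    def run(self):
        while not self.closed.is_set():
            if self.running.wait(timeout=0.01):   # blocks while paused
                self._loop_target()

    def pause(self):
        self.running.clear()

    def resume(self):
        self.running.set()

    def close(self):
        self.closed.set()
        self.join()

count = [0]

def inc():
    count[0] += 1
    time.sleep(0.001)

th = PauseLoop(inc)
th.start()
th.resume()
time.sleep(0.05)
th.pause()
time.sleep(0.05)                        # let any in-flight iteration finish
paused_value = count[0]
time.sleep(0.05)
same_while_paused = (count[0] == paused_value)
th.resume()
time.sleep(0.05)
th.close()
print(paused_value > 0, same_while_paused, count[0] > paused_value)
```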

PeriodicThread

Run a function periodically.

import time
import continuous_threading


time_list = []

def save_time():
    time_list.append(time.time())

th = continuous_threading.PeriodicThread(0.5, save_time)
th.start()

time.sleep(4)
th.join()

print(time_list)
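Periodic execution can be approximated with Event.wait(), which doubles as an interruptible sleep. The sketch below is standard-library only and makes no claim about how PeriodicThread schedules its calls; `run_periodically` and `stop_event` are hypothetical names.

```python
import threading
import time

def run_periodically(interval, func, stop_event):
    """Hypothetical helper: call `func` roughly every `interval` seconds until stopped."""
    next_call = time.monotonic()
    while not stop_event.is_set():
        func()
        next_call += interval           # schedule from the previous target time, not from "now"
        delay = max(0.0, next_call - time.monotonic())
        if stop_event.wait(delay):      # returns True early when stop_event is set
            break

stamps = []
stop_event = threading.Event()
th = threading.Thread(
    target=run_periodically,
    args=(0.05, lambda: stamps.append(time.monotonic()), stop_event),
)
th.start()
time.sleep(0.22)
stop_event.set()
th.join()
print(len(stamps))
```

Scheduling from the previous target time keeps the period from drifting by the run time of `func` on every call.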

OperationThread

Add data to a queue which will be operated on in a separate thread.

import time
import continuous_threading


values = []

def run_calculation(data1, data2):
    values.append(data1 + data2)

th = continuous_threading.OperationThread(target=run_calculation)
th.start()
th.add_data(1, 1)
time.sleep(0.1)

assert len(values) > 0
assert values[0] == 2

th.add_data(2, 2)
th.add_data(3, 3)
th.add_data(4, 4)
th.add_data(5, 5)

time.sleep(0.1)
assert values == [2, 4, 6, 8, 10]
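The queue-based pattern can be sketched with queue.Queue from the standard library. This is an illustration under my own assumptions, not OperationThread's implementation; `operation_worker` and the `None` sentinel are hypothetical choices.

```python
import queue
import threading

def operation_worker(func, tasks):
    """Hypothetical worker: apply `func` to each queued (args, kwargs) pair."""
    while True:
        item = tasks.get()
        if item is None:                # sentinel: no more work
            tasks.task_done()
            break
        args, kwargs = item
        try:
            func(*args, **kwargs)
        finally:
            tasks.task_done()           # mark the item done even if func raised

results = []

def add(a, b):
    results.append(a + b)

tasks = queue.Queue()
th = threading.Thread(target=operation_worker, args=(add, tasks))
th.start()

for n in range(1, 6):
    tasks.put(((n, n), {}))             # queued in order, processed in order
tasks.join()                            # block until every queued item is processed

tasks.put(None)
th.join()
print(results)
```

Waiting on tasks.join() instead of a fixed time.sleep() removes the guesswork about how long the calculations take.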

Process

All of the above Thread classes can also be used as a separate Process:
  • Process

  • ContinuousProcess

  • PausableProcess

  • PeriodicProcess

  • OperationProcess

  • CommandProcess

CommandProcess

Run functions and commands on an object that lives in a different process.

from continuous_threading import CommandProcess


class MyObj(object):
    def __init__(self, x=0, y=0):
        self._x = x
        self._y = y

    def set_x(self, x):
        self._x = x

    def set_y(self, y):
        self._y = y

    def print_obj(self, msg=''):
        print(self._x, self._y, msg)

    def expect(self, x, y, msg=''):
        assert self._x == x, 'X value {} does not match expected {}'.format(self._x, x)
        assert self._y == y, 'Y value {} does not match expected {}'.format(self._y, y)
        self.print_obj(msg=msg)


obj1 = MyObj()
obj2 = MyObj()

proc = CommandProcess(target=obj1)
proc.start()

# Send commands to obj1
print('Main Obj1')  # Note: this prints way earlier
proc.send_cmd('print_obj', msg="Obj1")
proc.send_cmd('set_x', 1)
proc.send_cmd('print_obj')
proc.send_cmd('set_y', 2)
proc.send_cmd('print_obj')
proc.send_cmd('expect', 1, 2, msg='Obj1 expected (1,2)')

# Send commands to obj2
print('Main Obj2')  # Note: this prints way earlier
proc.obj = obj2
proc.send_cmd('print_obj', msg="Obj2")
proc.send_cmd('set_x', 2)
proc.send_cmd('print_obj')
proc.send_cmd('set_y', 4)
proc.send_cmd('print_obj')
proc.send_cmd('expect', 2, 4, msg='Obj2 expected (2,4)')

# *** IGNORE COMMENTS: I implemented a caching system to save object state. ***
# Change back to obj1 (Remember this obj has attr 0,0 and when sent to other process will be a new obj 0,0).
# Cannot remember objects unless cached (saved in a dict) on the other process. id in process will be different.
#  ... NVM I'll just cache the obj value.
print('Main Obj1 again (Cached)')  # Note: this prints way earlier
proc.obj = obj1
proc.send_cmd('expect', 1, 2, msg="Obj1 Again (Cached)")
proc.send_cmd('set_x', 3)
proc.send_cmd('print_obj')
proc.send_cmd('set_y', 5)
proc.send_cmd('print_obj')
proc.send_cmd('expect', 3, 5, msg='Obj1 Again expected (3,5)')

proc.join()
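One way to picture send_cmd is as a queue of (method name, args, kwargs) tuples that a worker looks up on the target object with getattr. The sketch below uses a thread and queue.Queue as a stand-in so it stays short and runnable anywhere. It is not how CommandProcess is implemented, and `Point`, `command_worker`, and this `send_cmd` function are hypothetical. With a real process, the object is pickled and copied into the other process, so changes are not visible on the original object.

```python
import queue
import threading

class Point:
    """Hypothetical target object."""

    def __init__(self):
        self.x = 0
        self.y = 0

    def set_x(self, x):
        self.x = x

    def set_y(self, y):
        self.y = y

def command_worker(obj, commands):
    """Look up each queued command name on `obj` and call it."""
    while True:
        item = commands.get()
        if item is None:                # sentinel: stop the worker
            break
        name, args, kwargs = item
        getattr(obj, name)(*args, **kwargs)

def send_cmd(commands, name, *args, **kwargs):
    """Queue a method name plus its arguments."""
    commands.put((name, args, kwargs))

point = Point()
commands = queue.Queue()
worker = threading.Thread(target=command_worker, args=(point, commands))
worker.start()

send_cmd(commands, "set_x", 1)
send_cmd(commands, "set_y", 2)
commands.put(None)
worker.join()
print(point.x, point.y)
```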

