Skip to main content

Simplify and master control (run and stop) the python threads (workers)

Project description

python-worker

Downloads


Description

A package to simplify the thread declaration directly either by using decorator or pass it through function. It also allows you to stop the running thread (worker) from any layer


Installation

pip install python-worker

Changelogs

  • v1.8:
    • Refactoring codes
    • flexible worker declaration
  • v1.9:
    • Added Asynchronous Worker for coroutine function using @async_worker decorator
  • v1.10:
    • Added overload typehints for worker and async_worker
    • Added restart feature for worker
  • v2.0:
    • Added process worker to enable run your function in different GIL (Global Interpreter Lock) which could give you a performance boost

Basic Guide

@worker will define a function as a thread object once it run

import time
from worker import worker

@worker
def go(n, sleepDur):
    for i in range(n):
      time.sleep(sleepDur)
    print('done')

go(100, 0.1)

The function go will be running as a thread


Asynchronous Guide

Well, if you have a coroutine function you can use async_worker instead

import asyncio
from worker import async_worker

@async_worker
async def go():
    print("this is inside coroutine!")
    for i in range(10):
        time.sleep(0.5)
        print(i)
    print("done!")
    return "result!"

go_worker = asyncio.run(go())

Process Guide

A new feature called process is simply putting your worker on different GIL (Global Interpreter Lock) which could give you a performance boost. It's implementing multiprocessing instead of multithreading which at this stage is achieving the true form of parallelism which is run in different environment with your function call environment

import time
import os
from worker import process

@process
def go(n, sleepDur):
    for i in range(n):
        time.sleep(sleepDur)
    print('done')

go(100, 0.1)

your function go will run in different process.

To check it, you can just print out the os.getpid()

import os
from worker import worker, process

@worker(multiproc=True)
def run_in_new_process_from_worker(parent_pid):
    print(f"from {parent_pid} running in a new process {os.getpid()} - from worker.mutliproc==True")
    return "return from process"

@process
def run_in_new_process(parent_pid):
    print(f"from {parent_pid} running in a new process {os.getpid()} - from process")
    return "return from process"

@worker
def run_in_new_thread(parent_pid):
    print(f"from {parent_pid} running in a new thread {os.getpid()} - from worker.multiproc==False")
    return "return from thread"


print(f"this is on main thread {os.getpid()}")

run_in_new_process_from_worker(os.getpid())
run_in_new_process(os.getpid())
run_in_new_thread(os.getpid())

then run the script

this is on main thread 29535
from 29535 running in a new process 29537 - from worker.mutliproc==True
from 29535 running in a new thread 29535 - from worker.multiproc==False
from 29535 running in a new process 29538 - from process

you can see the different of process id between running in a new process and thread


Additional Guides

Kill / Stop / Abort the running worker

You can abort some workers, all workers or even all threads..

Abort specific workers

import time
from worker import worker, abort_worker

@worker
def go4(n=10):
    for i in range(n):
        time.sleep(1)

go4_worker = go4(10)
time.sleep(3)
abort_worker(go4_worker)

or just abort it from the instance

go4_worker.abort()

Abort all workers (this only abort worker threads only)

from worker import abort_all_worker

abort_all_worker()

Abort all threads (it will abort both all worker and non-worker threads)

from worker import abort_all_thread

abort_all_thread()

Run undefined @worker function

import time
from worker import run_as_Worker

def go(n):
    ...

go_worker = run_as_Worker(target=go, args=(10,))

Get Return Value

How to get the return of threaded function ?

@worker
def go(n):
    time.sleep(n)
    return "done"

go_worker = go(10)

# this will await the worker to finished and return the value

go_result = go_worker.await

# You can also use this if it's finished, dont have to await

go_result = go_worker.ret

Check/Monitor All Workers

from worker import ThreadWorkerManager

## all created workers
ThreadWorkerManager.list()

## All active/running workers only
ThreadWorkerManager.list(active_only=True)

it will return the information

>>> ThreadWorkerManager.list()
==============================================================
ID   |Name                |Active|Address        | WorkTime (s)
==============================================================
0    |worker              |True  |0x7fdf1a977af0 | 4.97
1    |worker1             |True  |0x7fdf1a73d640 | 4.07
2    |worker2             |True  |0x7fdf1a73d9d0 | 3.83
3    |worker3             |True  |0x7fdf1a73dd00 | 3.62
4    |worker4             |True  |0x7fdf1a74b070 | 3.38
==============================================================

Python Interactive Shell - Keyboard Interrupt (CTRL+C)

When you run your scripts on interactive mode

python -i myScript.py

you could add an abort handler with keyboard interrupt to abort your thread.

Inside myScript.py

ThreadWorkerManager.enableKeyboardInterrupt() allows you to abort your running workers.

from worker import worker, ThreadWorkerManager


# enabling abort handler for worker into keyboard interrupt (CTRL+C)

ThreadWorkerManager.enableKeyboardInterrupt()

You could also activate exit thread which triggered by pressing the CTRL+Z. This also added an abort handler for worker into keyboard interrupt (CTRL+C).

ThreadWorkerManager.disableKeyboardInterrupt(enable_exit_thread=True)

Disabling abort handler for worker into keyboard interrupt (CTRL+C).

ThreadWorkerManager.disableKeyboardInterrupt()

Check handler status.

ThreadWorkerManager.keyboard_interrupt_handler_status

You also can choose which workers are allowed to be aborted on keyboard interrupt

Inside myScript.py

from worker import worker, ThreadWorkerManager

@worker("Uninterrupted", on_abort=lambda: print("ITS GREAT"), keyboard_interrupt=False)
def go_not_interrupted():
  i = 0
  while i < 1e3/2:
    i += 10
    print(i,"go_not_interrupted")
    time.sleep(0.001)
  return i

@worker("Interrupted", on_abort=lambda: print("ITS GREAT"), keyboard_interrupt=True)
def go_interrupted():
  i = 0
  while i < 1e3/2:
    i += 10
    print(i,"go_interrupted")
    time.sleep(0.001)
  return i

ThreadWorkerManagerManager.enableKeyboardInterrupt()
go_not_interrupted()
go_interrupted()

run in your terminal

python -i myScript.py

press CTRL+C while the process is running and see the results.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

python-worker-2.1.0.tar.gz (9.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

python_worker-2.1.0-py3-none-any.whl (9.3 kB view details)

Uploaded Python 3

File details

Details for the file python-worker-2.1.0.tar.gz.

File metadata

  • Download URL: python-worker-2.1.0.tar.gz
  • Upload date:
  • Size: 9.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.7.1 importlib_metadata/4.11.4 pkginfo/1.8.2 requests/2.22.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.8.10

File hashes

Hashes for python-worker-2.1.0.tar.gz
Algorithm Hash digest
SHA256 c7b67d17d4b69a1804714c7a3d9dc7ece5798fb12e43475460fa1a10b25657ad
MD5 587a6500df86de710424f3d160f312e2
BLAKE2b-256 9d6bccbf91a8be317f608b5663568f42add6b06fa048023ad78b05b9ab5637dc

See more details on using hashes here.

File details

Details for the file python_worker-2.1.0-py3-none-any.whl.

File metadata

  • Download URL: python_worker-2.1.0-py3-none-any.whl
  • Upload date:
  • Size: 9.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.7.1 importlib_metadata/4.11.4 pkginfo/1.8.2 requests/2.22.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.8.10

File hashes

Hashes for python_worker-2.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7cd9a221ad738f312c1eaad802bd2e72e3db25fea7152c043238677f0aab5707
MD5 1c944326ca93cec282bfacb530e55f8f
BLAKE2b-256 945f7e3c12cbebd32062c43799878a5e66ce6fd27d0ce4650506332b1ed6d95a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page