pool-workers is a lightweight package for managing pools, workers and queues.

Pool_Workers

Pool_Workers is a small package for dealing with pools, workers and queues.

Installation:

pip install pool-workers

More info:

Useful methods of Pool and Worker.

"""
Default params:
Pool(max_workers=os.cpu_count() + 4, name=None, queue=None, wait_queue=True,
	result_queue=None, workers_sleep=0.1, callback=None, execption_handler=execption_handler)
"""
from pool_workers import Pool

pool = Pool(...)

pool.start()        # Start all workers so they process the queued tasks.
pool.is_alive()     # Return True if any worker is alive, else False.
pool.is_idle()      # Return True if any worker is idle, else False.
pool.is_done()      # Return True if the queue is empty (no tasks left to process).
pool.is_paused()    # Return True if all workers are paused, else False.
pool.shutdown()     # Abort all workers.
pool.join()         # Wait for the workers to finish all queued tasks.
pool.result()       # Return the results as a list.
pool.pause()        # Pause all workers.
pool.resume()       # Resume all workers.
pool.count()        # Count the workers.
pool.update()       # Adjust the number of workers.

"""
Default params:
Worker(name, queue, result=None, wait_queue=False, sleep=0.5, callback=None,
	execption_handler=execption_handler)
"""
from pool_workers import Worker

worker = Worker(...)

worker.start()
worker.abort()		# stop worker in a safe way
worker.aborted()
worker.pause()
worker.paused()
worker.resume()
# Like a regular thread, a worker also has:
worker.is_alive()
worker.join()
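
The pause, resume and abort methods listed above can be pictured with a small standard-library sketch. This is not the source code of pool_workers: `SketchWorker` and its attributes are hypothetical names, and the use of `threading.Event` flags and a polling interval is an assumption about one way such a worker could be built.

```python
import queue
import threading
import time


class SketchWorker(threading.Thread):
    """Hypothetical stand-in, NOT pool_workers.Worker: a pausable, abortable queue consumer."""

    def __init__(self, name, tasks, wait_queue=False, sleep=0.05):
        super().__init__(name=name, daemon=True)
        self.tasks = tasks
        self.wait_queue = wait_queue        # keep polling when the queue is empty?
        self.sleep = sleep                  # polling interval in seconds
        self._abort_event = threading.Event()
        self._pause_event = threading.Event()

    def abort(self):
        self._abort_event.set()

    def aborted(self):
        return self._abort_event.is_set()

    def pause(self):
        self._pause_event.set()

    def paused(self):
        return self._pause_event.is_set()

    def resume(self):
        self._pause_event.clear()

    def run(self):
        while not self.aborted():
            if self.paused():
                time.sleep(self.sleep)      # idle until resume() or abort()
                continue
            try:
                func = self.tasks.get_nowait()
            except queue.Empty:
                if not self.wait_queue:
                    break                   # nothing left: let the thread end
                time.sleep(self.sleep)
                continue
            func()


done = []
tasks = queue.Queue()
for i in range(5):
    tasks.put(lambda i=i: done.append(i))

worker = SketchWorker("sketch", tasks)
worker.pause()                  # paused before start, so no task may run yet
worker.start()
time.sleep(0.2)
ran_while_paused = len(done)
worker.resume()
worker.join(timeout=5)
print(ran_while_paused, sorted(done), worker.is_alive())
```

Setting a flag and letting the loop notice it is what makes stopping "safe" in this sketch: the thread only exits between tasks, never in the middle of one.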

Usage

Example 1:

import time
import threading
import random

from queue import Queue
from pool_workers import Pool, Task

# Our logic, to be performed asynchronously.
def our_process(a):
	t = threading.current_thread()
	# Sleep for a random time to simulate how long this logic takes.
	time.sleep(random.uniform(0, 3))
	print(f'{t.name} is finished the task {a} ...')


# Our function to handle thrown exceptions from 'our_process' logic.
def execption_handler(thread_name, exception):
    print(f'{thread_name}: {exception}')


# create a queue & pool.
q = Queue()
pool = Pool(name='Pool_1', queue=q, wait_queue=False, execption_handler=execption_handler)

# Add some tasks to the queue.
for i in range(10):
	# Create a task with args and kwargs and push it onto the queue.
	task = Task(our_process, args=(i,), kwargs={})
	q.put(task)

try:
	# Start the pool.
	pool.start()
	# Return to the main thread periodically so that a KeyboardInterrupt can be handled.
	while pool.is_alive():
		pool.join(0.5)

except (KeyboardInterrupt, SystemExit):
	# Shut down the pool by aborting its workers/threads.
	pool.shutdown()


"""output result
Worker_1_Pool_1 is finished the task 1 ...
Worker_1_Pool_1 is finished the task 2 ...
Worker_0_Pool_1 is finished the task 0 ...
Worker_0_Pool_1 is finished the task 4 ...
Worker_0_Pool_1 is finished the task 5 ...
Worker_1_Pool_1 is finished the task 3 ...
Worker_0_Pool_1 is finished the task 6 ...
Worker_1_Pool_1 is finished the task 7 ...
Worker_0_Pool_1 is finished the task 8 ...
Worker_0_Pool_1: The Queue is empty.
Worker_1_Pool_1 is finished the task 9 ...
Worker_1_Pool_1: The Queue is empty.
Worker_0_Pool_1 is stopped
Worker_1_Pool_1 is stopped
Pool_1 is shutted down
"""
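
The `while pool.is_alive(): pool.join(0.5)` loop in Example 1 relies on joining with a timeout. The sketch below shows only that mechanic with a plain `threading.Thread`, without pool_workers; `slow_job`, `results` and `polls` are hypothetical names used for illustration.

```python
import threading
import time


def slow_job(results):
    # Stand-in for real work.
    time.sleep(0.3)
    results.append("finished")


results = []
thread = threading.Thread(target=slow_job, args=(results,), name="demo")
polls = 0

try:
    thread.start()
    # join(timeout) returns after at most `timeout` seconds even if the thread
    # is still running, so the main thread regains control on every iteration.
    while thread.is_alive():
        thread.join(0.1)
        polls += 1
except (KeyboardInterrupt, SystemExit):
    # A real program would signal its workers to stop here,
    # as Example 1 does with pool.shutdown().
    pass

print(results, polls >= 1)
```

A shorter timeout makes the main thread wake more often at the cost of a little extra polling.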

Example 2:

import time
import threading
import random

from queue import Queue
from pool_workers import Worker, Task

# Our logic, to be performed asynchronously.
def our_process(a):
	t = threading.current_thread()
	# Sleep for a random time to simulate how long this logic takes.
	time.sleep(random.uniform(0, 3))
	print(f'{t.name} is finished the task {a} ...')


# Our function to handle thrown exceptions from 'our_process' logic.
def execption_handler(thread_name, exception):
    print(f'{thread_name}: {exception}')


# Create a queue and a worker.
q = Queue()
t = Worker(name='worker', queue=q, wait_queue=False, sleep=0.1, execption_handler=execption_handler)

# Add some tasks to the queue.
for i in range(10):
	# Create a task with args and kwargs and push it onto the queue.
	task = Task(our_process, args=(i,), kwargs={})
	q.put(task)

try:
	# Start the worker.
	t.start()
	# Wait here, waking periodically so that a KeyboardInterrupt can be handled
	# (to stop the worker safely).
	while t.is_alive():
		t.join(0.5)

	# Execution reaches this point only after the worker has finished its work.

except (KeyboardInterrupt, SystemExit):
	# stop the Worker/thread.
	t.abort()

"""output result
worker is finished the task 0 ...
worker is finished the task 1 ...
worker is finished the task 2 ...
worker is finished the task 3 ...
worker is finished the task 4 ...
worker is finished the task 5 ...
worker is finished the task 6 ...
worker is finished the task 7 ...
worker is finished the task 8 ...
worker is finished the task 9 ...
"""
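
Both examples wrap a function in `Task(func, args=..., kwargs=...)` and pass an `execption_handler(thread_name, exception)`. The following sketch shows how such a task object and handler could interact. It is written against the standard library only: `SketchTask` and `consume` are hypothetical, and collecting return values in a result queue is an assumption rather than documented pool_workers behaviour.

```python
import queue
import threading


class SketchTask:
    """Hypothetical stand-in for Task: a callable plus its arguments."""

    def __init__(self, func, args=(), kwargs=None):
        self.func = func
        self.args = args
        self.kwargs = kwargs or {}

    def run(self):
        return self.func(*self.args, **self.kwargs)


def consume(tasks, results, handler):
    """Run queued tasks until the queue is empty, routing errors to handler."""
    name = threading.current_thread().name
    while True:
        try:
            task = tasks.get_nowait()
        except queue.Empty:
            return
        try:
            results.put(task.run())
        except Exception as exc:
            # Same call shape as the handler in the examples: (thread_name, exception).
            handler(name, exc)


errors = []


def handler(thread_name, exception):
    errors.append((thread_name, str(exception)))


tasks, results = queue.Queue(), queue.Queue()
for divisor in (1, 0, 2):           # the middle task raises ZeroDivisionError
    tasks.put(SketchTask(divmod, args=(10, divisor)))

consumer = threading.Thread(target=consume, args=(tasks, results, handler), name="sketch")
consumer.start()
consumer.join()

collected = []
while not results.empty():
    collected.append(results.get())
print(collected)                    # [(10, 0), (5, 0)]
print(len(errors), errors[0][0])    # 1 sketch
```

Routing exceptions through a handler keeps one failing task from ending the consumer thread, so the remaining tasks still run.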

License

MIT License
