Simultaneous Task Registrar and Message Broker
Project description
STRMBRKR
Simultaneous Task Registrar and Message Broker
The hammer for all of your parallel processing nails.
This package abstracts zeroMQ's Load Balancing Message Broker design pattern to facilitate the brokering of processor-intensive workloads across parallel workers.
This package also provides a KeyValueStore implementation inspired by Redis and built on zeroMQ's REQ/REP socket pattern that facilitates inter-process communication.
Installation
To install the strmbrkr package, you'll need Python and pip installed.
There are a variety of ways to do this, such as using Miniconda, so it's left to the user to utilize their preferred method.
Installing in Development Mode
This method is recommended if you plan on making updates to the source code and don't want to re-install that package with each change.
Clone this repository and install using pip:
$ git clone git@github.com:kdavid13/strmbrkr.git
$ cd ./strmbrkr
$ pip install -e .
Usage
Here is an example script (available as example.py) that utilizes the strmbrkr package.
It is standalone and should run on any system with the strmbrkr package properly installed.
For more examples of strmbrkr edge cases like error handling and watchdog timers, peruse the tests/ directory.
"""
Initialize the `strmbrkr` interchange service (a.k.a. key-value store).
"""
from strmbrkr import Job, KeyValueStore, QueueManager, WorkerManager
worker_manager = WorkerManager()
queue_manager = QueueManager()
"""
Define some work that needs to be done.
"""
from random import randrange
from time import sleep
def sleepAndReturn(how_long: float, throw: Exception = None):
"""Example work method."""
sleep(how_long)
if throw is not None:
raise throw
KeyValueStore.appendValue("sleep_list", how_long)
sleep_list = KeyValueStore.getValue("sleep_list")
KeyValueStore.setValue("sleep_sum", sum(sleep_list))
return how_long
jobs = [Job(sleepAndReturn, [randrange(3, 10), ]) for _ in range(10)]
"""
> Note:
>
> The worker processes must be started _after_ the work that's being done has been defined.
> Otherwise, the workers will not be able to resolve the reference to the function pointer.
"""
worker_manager.startWorkers()
"""
Record the time for performance reporting.
"""
from time import time
_start = time()
"""
Enqueue the jobs, wait for the jobs to process, and then report results.
"""
queue_manager.queueJobs(*jobs)
queue_manager.blockUntilProcessed(timeout = 60)
print(f"Completed processing in {time() - _start} seconds")
for result in queue_manager.getResults():
print(f"Job {result.id} returned {result.status}: {result.retval}")
print(f"Total time processed: {KeyValueStore.getValue('sleep_sum')}")
"""In addition to logging messages from the `strmbrkr` package, the resultant report should look something like this:
Completed processing in 18.03610348701477 seconds
Job 2 returned Status.PROCESSED: 3
Job 1 returned Status.PROCESSED: 4
Job 3 returned Status.PROCESSED: 5
Job 4 returned Status.PROCESSED: 4
Job 0 returned Status.PROCESSED: 8
Job 6 returned Status.PROCESSED: 3
Job 7 returned Status.PROCESSED: 5
Job 5 returned Status.PROCESSED: 9
Job 9 returned Status.PROCESSED: 5
Job 8 returned Status.PROCESSED: 9
Total time processed: 55
"""
"""
Tear down `strmbrkr` infrastructure.
"""
worker_manager.stopWorkers()
queue_manager.stopHandling()
KeyValueStore.stopServerProcess()
Configuration
When utilizing strmbrkr library components, you may find it useful to change their default behaviors.
strmbrkr will look two places for configuration options: environment variables and a strmbrkr.env file in the current working directory.
The configuration options available are as follows:
STRMBRKR_LOGGING_LEVEL(str): Level at which the STRMBRKR module logger will emit messages. Default:INFO.STRMBRKR_CONFIG_LOG_LEVEL(str): Level at which to log the contents of the configuration. Default:DEBUG.STRMBRKR_WORKER_PROC_COUNT(int): Default number of worker processes spun up by an instance ofWorkerManager. Default:4.STRMBRKR_WORKER_WATCHDOG_TERMINATE_AFTER(int): The default number of seconds a worker can spend processing a single job before being terminated. Default:15.STRMBRKR_KVS_CLIENT_SOCKET_TIMEOUT(int): Connection timeout (in milliseconds) of theKeyValueStoreclient. Default:500.STRMBRKR_KVS_CLIENT_SOCKET_ATTEMPTS(int): How many times theKeyValueStoreclient will attempt to connect to the server before failing. Default:1.STRMBRKR_KVS_DUMP_PIPELINE_STATUS_REPORTS(bool): Flag that indicates whether to save pipeline status reports for debugging. Default:False.STRMBRKR_SOCKET_PROTOCOL(str): The type of socket protocol that strmbrkr will use. Default:tcp.STRMBRKR_INSTANCE_ID(str): A unique identifier for thestrmbrkr'instance' using this configuration. Default: the process ID of Python interpreter instance running what is colloquially known as the 'control thread'.
See documentation in the strmbrkr.config submodule for more information.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file strmbrkr-1.4.1.tar.gz.
File metadata
- Download URL: strmbrkr-1.4.1.tar.gz
- Upload date:
- Size: 29.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.9.19
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a69731dcb6c08c7a488266638f938a4499fc1f7430459c294e4bd3dfdfbfecee
|
|
| MD5 |
8b8bc92ee65ae49532ccf2d32e3481da
|
|
| BLAKE2b-256 |
e857cf68484d8aa2d6501fc0a0097aee9d09668d9351daf862c85e1b5ad10dd4
|
File details
Details for the file strmbrkr-1.4.1-py3-none-any.whl.
File metadata
- Download URL: strmbrkr-1.4.1-py3-none-any.whl
- Upload date:
- Size: 32.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.9.19
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
061836224c8b3e7d2b902f745fca8bc3a16187695748660ba65f78258e5a69dd
|
|
| MD5 |
71fc3c34cc98a2d111b45dcd91279fc9
|
|
| BLAKE2b-256 |
b71bd91955ac1bf8657c976febe6b9d3ff2b1781ad63fbf6ce44603017b05657
|