ezpq: an easy parallel queueing system.
Background
I often rely on concurrent processing to perform many independent, long-running jobs quickly. Each time, I find myself wiring up functions to iterate over input, handle output and errors, time jobs, and everything else involved in practical parallel processing. I explored other options, but have yet to encounter something simple, versatile, and feature-rich. With this project, I sought to create a Python package that encapsulates the logic required to manage a parallel queue efficiently. The result is a simple, customizable wrapper around both threading.Thread and multiprocessing.Process that provides all of the essential functions of an efficient parallel queueing system, and then some.
Overview
ezpq implements a parallel queueing system consisting of:
- a priority "waiting" queue in.
- a lookup table of "working" jobs.
- a priority "completed" queue out.
Note that this is not a traditional first-in/first-out (FIFO) queue. Given the parallel nature, this queueing system is first-in/first-completed-out. In addition, the priority of an individual job can be adjusted to ensure it gets processed and retrieved before other lower-priority jobs.
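To illustrate what "first-in/first-completed-out" means, here is a minimal sketch that uses only the standard library's concurrent.futures rather than ezpq itself. The helper sleep_then_return and the job names are made up for the example.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def sleep_then_return(name, seconds):
    """Sleep for `seconds`, then return `name` (a stand-in for real work)."""
    time.sleep(seconds)
    return name

# Jobs are submitted in the order a, b, c, but "a" takes the longest.
jobs = [("a", 0.5), ("b", 0.1), ("c", 0.3)]

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(sleep_then_return, name, secs) for name, secs in jobs]
    # as_completed() yields futures as they finish, not in submission order.
    completion_order = [f.result() for f in as_completed(futures)]

print(completion_order)
```

With all three jobs running at once, the shortest job is retrieved first even though it was submitted second.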
Features
- Simple interface; pure Python.
- No required dependencies outside of standard library.
- Compatible with Python 2 & 3.
- Cross-platform: macOS, Linux, and Windows.
- Data remains in-memory.
- Priority Queueing, both in and out.
- Easily switch from processes to threads.
- Automatic handling of output.
- Rich job details, easily viewed as a pandas DataFrame.
- Built-in logging to CSV.
- Insightful visualizations of queue operations.
How to get it
Install from PyPI with:
pip install ezpq
To use the optional plotting features, install pandas and plotnine, or run:
pip install ezpq[plot]
Quickstart
Suppose you wanted to speed up the following code, which has 10 inputs that each take about 1 second on average to run, and stores the output:
import random
import time

def random_sleep(x, min=0, max=1):
    # sleep for a random duration between `min` and `max` seconds
    n = random.randint(min*1000, max*1000) / 1000
    time.sleep(n)
    return 'Job {} slept {} seconds.'.format(x, round(n, 2))

output = [random_sleep(x, 0, 2) for x in range(10)]
output
The above function should take ~10 seconds to execute; here is the output:
> Runtime: 11.526
['Job 0 slept 1.29 seconds.',
'Job 1 slept 1.04 seconds.',
'Job 2 slept 1.31 seconds.',
'Job 3 slept 1.79 seconds.',
...
Here is the same function, decorated with @ezpq.Queue(), which will run the function 10 times in parallel with 5 workers:
import ezpq

@ezpq.Queue(n_workers=5)
def random_sleep(x, min=0, max=1):
    n = random.randint(min*1000, max*1000) / 1000
    time.sleep(n)
    return 'Job {} slept {} seconds.'.format(x, round(n, 2))

output = random_sleep(items=range(10), min=0, max=2)
output
The concurrency of 5 workers reduced the runtime by roughly three quarters (from about 11.5 to 2.8 seconds). In addition, the output provides rich details about each job.
> Runtime: 2.78
[{'id': 1,
'name': 1,
'priority': 100,
'timeout': 0,
'function': '<function random_sleep at 0x11bfd7d90>',
'args': [0],
'kwargs': {'min': 0, 'max': 2},
'submitted': datetime.datetime(2018, 12, 11, 1, 39, 36, 681870),
'started': datetime.datetime(2018, 12, 11, 1, 39, 36, 684734),
'ended': datetime.datetime(2018, 12, 11, 1, 39, 36, 930090),
'processed': datetime.datetime(2018, 12, 11, 1, 39, 36, 995742),
'exitcode': 0,
'cancelled': False,
'runtime': 0.24535608291625977,
'output': 'Job 0 slept 0.24 seconds.',
'callback': None},
{'id': 2,
'name': 2,
'priority': 100,
'timeout': 0,
'function': '<function random_sleep at 0x11bfd7d90>',
'args': [1],
'kwargs': {'min': 0, 'max': 2},
'submitted': datetime.datetime(2018, 12, 11, 1, 39, 36, 689686),
'started': datetime.datetime(2018, 12, 11, 1, 39, 36, 692901),
'ended': datetime.datetime(2018, 12, 11, 1, 39, 37, 210175),
'processed': datetime.datetime(2018, 12, 11, 1, 39, 37, 308434),
'exitcode': 0,
'cancelled': False,
'runtime': 0.5172739028930664,
'output': 'Job 1 slept 0.51 seconds.',
'callback': None},
...
The resulting list above can easily be viewed as a pandas dataframe:
import pandas as pd

df = pd.DataFrame(output)
df[['name', 'output', 'runtime']].head()
name | output | runtime |
---|---|---|
1 | Job 0 slept 0.24 seconds. | 0.24535608291625977 |
2 | Job 1 slept 0.51 seconds. | 0.5172739028930664 |
3 | Job 2 slept 0.72 seconds. | 0.7306220531463623 |
4 | Job 3 slept 0.79 seconds. | 0.8023269176483154 |
5 | Job 4 slept 0.29 seconds. | 0.3013160228729248 |
The class ezpq.Plot uses data returned from an ezpq.Queue to produce a Gantt chart of the job runtimes. This feature introduces the only non-standard dependencies; you will need pandas for the data reshaping and plotnine for the visualizations.
plt = ezpq.Plot(output).build()
plt.save('docs/imgs/quickplot.png')
plt
There is a context manager interface available as well, which can allow for more versatility over the decorator approach:
with ezpq.Queue(n_workers=5) as Q:
    for i in range(10):
        # pass the item first, then the min and max sleep durations
        Q.put(random_sleep, args=[i, 0, 2])
    Q.wait()
    output = Q.collect()
ezpq.Queue
The Queue class implements the queueing system, which is itself a 3-part system composed of the:
- waiting queue
- working table
- completed queue
The queueing system can be instantiated with the following parameters:
Q = ezpq.Queue(job_runner=Process, # either a Process (default) or Thread function.
               n_workers=3,        # the max number of concurrent workers (processes/threads).
               max_size=0,         # the max combined size of all items in the queue (0=unlimited).
               auto_remove=False,  # Should completed jobs (and their output) be kept, or discarded?
               auto_start=True,    # Should the queue start processing jobs upon instantiation?
               auto_stop=False,    # Should the queue stop after all submitted jobs are processed?
               callback=None,      # optional (synchronous) function to run after job completion.
               log_file=None,      # optional path of a CSV file used to output logs.
               poll=0.1)           # repetition period, in seconds, for the Queue operations.
put
The put() method puts a job into the Queue. put() accepts a function and optional arguments:
Q.put(function,     # the function to call
      args=None,    # positional arguments, as a list or tuple.
      kwargs=None,  # kwargs, as a dict.
      name=None,    # optional name. If blank, name is id.
      priority=100, # optional priority (default=100). Lesser values are higher priority.
      timeout=0)    # optional timeout, in seconds (0=unlimited).
The only required parameter is function. The parameters args and kwargs are passed to the function when the job runs.
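The "lesser values are higher priority" rule can be sketched with the standard library's heapq. This is only an illustration of the ordering, not ezpq's internal code; the put helper, the counter used for tie-breaking, and the job names are assumptions made for the example.

```python
import heapq
import itertools

counter = itertools.count(1)  # hypothetical job id generator
waiting = []                  # heap of (priority, job_id, name) tuples

def put(name, priority=100):
    """Push a job; lower priority values are popped first."""
    heapq.heappush(waiting, (priority, next(counter), name))

put("routine report")            # default priority of 100
put("urgent fix", priority=1)
put("cleanup", priority=200)
put("second routine report")     # ties fall back to submission order

order = [heapq.heappop(waiting)[2] for _ in range(len(waiting))]
print(order)
# ['urgent fix', 'routine report', 'second routine report', 'cleanup']
```

Including the job id in the tuple keeps jobs of equal priority in the order they were submitted.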
size
size() will return a count of all items across all three queue components. It accepts three boolean parameters: waiting, working, and completed. If all of these are False (default), all jobs are counted. If any combination of these is True, only the corresponding queue(s) will be counted. For example, I define this function to print the sizes of the entire Queue system, and each queue component:
def print_sizes(Q):
    msg = 'Total: {0}; Waiting: {1}; Working: {2}; Completed: {3}'.format(
        Q.size(),
        Q.size(waiting=True),
        Q.size(working=True),
        Q.size(completed=True)
    )
    print(msg)
Instantiate ezpq.Queue and call the above function to watch the sizes change:
with ezpq.Queue() as Q:
    # enqueue jobs
    for x in range(10):
        Q.put(random_sleep, args=[x, 0, 2])
    # repeatedly print sizes until complete.
    while Q.size(waiting=True, working=True) > 0:
        print_sizes(Q)
        Q.wait(n=1)
    print_sizes(Q)
Total: 10; Waiting: 7; Working: 3; Completed: 0
Total: 10; Waiting: 6; Working: 3; Completed: 1
Total: 10; Waiting: 5; Working: 3; Completed: 2
Total: 10; Waiting: 4; Working: 3; Completed: 3
Total: 10; Waiting: 3; Working: 3; Completed: 4
Total: 10; Waiting: 2; Working: 3; Completed: 5
Total: 10; Waiting: 1; Working: 3; Completed: 6
Total: 10; Waiting: 0; Working: 3; Completed: 7
Total: 10; Waiting: 0; Working: 2; Completed: 8
Total: 10; Waiting: 0; Working: 1; Completed: 9
Total: 10; Waiting: 0; Working: 0; Completed: 10
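The flag rules for size() can be restated as a small stand-alone function. This is a sketch of the counting logic as described in the text, not ezpq's source; count_jobs and its n_* arguments are hypothetical names.

```python
def count_jobs(n_waiting, n_working, n_completed,
               waiting=False, working=False, completed=False):
    """Count jobs by flag: no flags set means count every component."""
    if not (waiting or working or completed):
        return n_waiting + n_working + n_completed
    total = 0
    if waiting:
        total += n_waiting
    if working:
        total += n_working
    if completed:
        total += n_completed
    return total

# Using the second line of the output above: Waiting 6, Working 3, Completed 1.
print(count_jobs(6, 3, 1))                              # 10
print(count_jobs(6, 3, 1, waiting=True, working=True))  # 9
print(count_jobs(6, 3, 1, completed=True))              # 1
```

The waiting=True, working=True combination is the one used as the loop condition above: it counts only jobs that have not finished yet.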
wait
The wait() method will block execution until some jobs complete. It accepts a parameter n, which specifies how many jobs to wait for. When this value is 0 (default) or less, wait() will wait for all submitted jobs to complete. It also accepts a timeout parameter, which will break out of the call to wait() when exceeded. The return value is the number of jobs that did not complete within the timeout. This value should be 0; anything greater indicates the timeout was reached. The parameter poll can be used to adjust how frequently (in seconds) the operation checks for completed jobs (default=0.1).
with ezpq.Queue() as Q:
    # enqueue jobs
    for x in range(10):
        Q.put(random_sleep, args=[x, 0, 2])
    # repeatedly wait 1 second for jobs to finish until n_remaining == 0.
    n_remaining = Q.wait(timeout=1)
    while n_remaining > 0:
        print('Timeout reached; {0} jobs remain.'.format(n_remaining))
        n_remaining = Q.wait(timeout=1)
    print('{0} jobs remain.'.format(n_remaining))
Timeout reached; 7 jobs remain.
Timeout reached; 5 jobs remain.
Timeout reached; 3 jobs remain.
Timeout reached; 1 jobs remain.
0 jobs remain.
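The same "wait with a timeout, get back the number of unfinished jobs" pattern can be sketched with the standard library's concurrent.futures.wait. This is an analogy, not ezpq's implementation: wait_remaining is a hypothetical helper, and it does not model the n or poll parameters.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def wait_remaining(futures, timeout=None):
    """Block for up to `timeout` seconds; return how many futures are unfinished."""
    done, not_done = wait(futures, timeout=timeout)
    return len(not_done)

with ThreadPoolExecutor(max_workers=2) as pool:
    # four short jobs on two workers, so they finish in two waves
    futures = [pool.submit(time.sleep, 0.2) for _ in range(4)]
    n_remaining = wait_remaining(futures, timeout=0.05)
    while n_remaining > 0:
        print('Timeout reached; {0} jobs remain.'.format(n_remaining))
        n_remaining = wait_remaining(futures, timeout=0.05)
    print('{0} jobs remain.'.format(n_remaining))
```

A return value of 0 means everything finished within the timeout, which is why the loop can use it as its exit condition.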
get
get() will retrieve and delete ("pop") the highest priority job from the completed queue, if one is available. If the completed queue is empty, get() will return None. However, like wait(), get() has parameters for poll and timeout. If the timeout is exceeded, None is returned.
with ezpq.Queue() as Q:
    # enqueue jobs
    for x in range(10):
        Q.put(random_sleep, args=[x, 0, 2])
    # repeatedly `get()` jobs until Q.size() == 0.
    output = list()
    while Q.size() > 0:
        job = Q.get(timeout=0.5)
        if job is None:
            print('Timeout reached; no output available; {0} jobs remain.'.format(Q.size()))
        else:
            output.append(job)
    print('{0} jobs remain in queue.'.format(Q.size()))
print('Output list size: {0}'.format(len(output)))
Timeout reached; no output available; 10 jobs remain.
Timeout reached; no output available; 7 jobs remain.
Timeout reached; no output available; 5 jobs remain.
Timeout reached; no output available; 2 jobs remain.
0 jobs remain in queue.
Output list size: 10
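For comparison, the "return None instead of raising when nothing is available" behavior can be sketched on top of the standard library's queue.PriorityQueue. The get_or_none helper and the (priority, output) tuples are assumptions for the example; ezpq's own get() returns job dictionaries and is implemented differently.

```python
import queue

completed = queue.PriorityQueue()

def get_or_none(q, timeout=0):
    """Pop the highest-priority item, or return None if none arrives in time."""
    try:
        if timeout > 0:
            return q.get(timeout=timeout)
        return q.get_nowait()
    except queue.Empty:
        return None

missing = get_or_none(completed, timeout=0.1)  # queue is empty, so this times out
completed.put((100, 'Job 0 output'))
completed.put((1, 'Job 1 output'))
first = get_or_none(completed)                 # the lowest priority value wins

print(missing)  # None
print(first)    # (1, 'Job 1 output')
```

Returning None keeps the calling loop simple: the caller checks for None, as in the example above, rather than handling an exception.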
collect
collect() is similar to get(), but it will return a list of all completed jobs and clear the completed queue. It does not support the poll or timeout parameters, but you can call wait() before collect() if necessary.
with ezpq.Queue(n_workers=5) as Q:
    # enqueue jobs
    for x in range(10):
        Q.put(random_sleep, args=[x, 0, 2])
    # wait and collect all jobs
    print('Queue size before: {0}'.format(Q.size()))
    Q.wait()
    output = Q.collect()
    print('Queue size after: {0}'.format(Q.size()))
    print('Output size: {0}'.format(len(output)))
Queue size before: 10
Queue size after: 0
Output size: 10
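The "return everything and clear the queue" idea behind collect() amounts to draining a queue in one call. A minimal standard-library sketch follows; collect_all is a hypothetical helper and the (priority, name) tuples are made up.

```python
import queue

def collect_all(q):
    """Remove and return every item currently in the queue, in priority order."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items

completed = queue.PriorityQueue()
for priority, name in [(100, 'job 1'), (1, 'job 2'), (50, 'job 3')]:
    completed.put((priority, name))

size_before = completed.qsize()
collected = collect_all(completed)
size_after = completed.qsize()

print(size_before, size_after)  # 3 0
print(collected)                # [(1, 'job 2'), (50, 'job 3'), (100, 'job 1')]
```

Like collect(), the helper never blocks; anything still running when it is called is simply not included.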
dispose
The queueing operations of ezpq.Queue are performed on a periodic basis. By default, the poll parameter for a Queue is 0.1 seconds (100 ms), so the operations run ten times a second. This "pulse" will continue until the Queue is disposed of.
In the previous examples, we have not concerned ourselves with disposing of the ezpq.Queue, as it is done automatically with the context manager (with ezpq.Queue() as Q: ...). However, if you choose not to use the context manager or decorator interfaces, you must clean up after yourself.
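import logging as log  # assumption: `log` refers to the standard logging module

logger = log.getLogger()  # assumption: the root logger; the original does not show how `logger` was created
logger.setLevel(log.DEBUG)

Q = ezpq.Queue() # by default, n_workers = # of cpu cores (mp.cpu_count()).
# enqueue jobs
for x in range(10):
    Q.put(random_sleep, args=[x, 0, 2])
# wait and dispose
Q.wait()
Q.dispose()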
The example above also enables debugging logs, which produce:
DEBUG: Initialized queue with 8 workers.
DEBUG: Initialized pulse.
DEBUG: Started job '1'
DEBUG: Started job '2'
DEBUG: Started job '3'
DEBUG: Queued job: '4'
DEBUG: Queued job: '5'
DEBUG: Queued job: '6'
DEBUG: Queued job: '7'
DEBUG: Queued job: '8'
DEBUG: Queued job: '9'
DEBUG: Queued job: '10'
DEBUG: Completed job: '1'
DEBUG: Started job '4'
...
DEBUG: Completed job: '8'
DEBUG: Completed job: '9'
DEBUG: Completed job: '10'
DEBUG: Removed jobs.
DEBUG: Removed output.
DEBUG: Removed completed.
DEBUG: Reset counters.
DEBUG: Stopped pulse.
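To show why a periodic "pulse" needs explicit cleanup, here is a minimal sketch of a background loop built on threading.Event. The Pulse class is hypothetical and is not how ezpq is implemented; it only demonstrates the pattern of ticking every poll seconds until dispose() is called, with a context manager that calls dispose() automatically.

```python
import threading
import time

class Pulse:
    """Call `tick` every `poll` seconds on a background thread until disposed."""

    def __init__(self, tick, poll=0.1):
        self._tick = tick
        self._poll = poll
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Event.wait() returns False on timeout and True once the event is set.
        while not self._stop.wait(self._poll):
            self._tick()

    def start(self):
        self._thread.start()

    def dispose(self):
        self._stop.set()
        self._thread.join()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *exc_info):
        self.dispose()

ticks = []
with Pulse(lambda: ticks.append(time.time()), poll=0.01):
    time.sleep(0.1)          # the pulse keeps ticking while we work

count_after_dispose = len(ticks)
time.sleep(0.05)             # no further ticks arrive once disposed
print(len(ticks) == count_after_dispose)  # True
```

Without the call to dispose() (here made by the with block), the background thread would keep running for as long as the program does.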
Decorator
Now that you have an understanding of the basic operations, recall the first example, which makes use of ezpq.Queue() as a decorator.
@ezpq.Queue(n_workers=5)
def random_sleep(x, min=0, max=1):
    n = random.randint(min*1000, max*1000) / 1000
    time.sleep(n)
    return 'Job {} slept {} seconds.'.format(x, round(n, 2))

output = random_sleep(items=range(10), min=0, max=2)
By using the decorator, each call to random_sleep(items=[...], ...) is actually running this:
for x in items:
    Q.put(function=fun, args=[x]+list(args), kwargs=kwargs)
Q.wait()
job_data = Q.collect()
Q.dispose()
return job_data
Some notes on using the decorator interface:
- The decorated function (in this case, random_sleep) must accept at least one parameter that will be used to store the next item from an iterable (in this case, x).
- When calling the function, you must specify the list of items to iterate over as the first argument (or as a named arg with items=[...]).
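The decorator pattern described in these notes can be sketched with the standard library alone. The parallel_map decorator below is hypothetical and much simpler than ezpq.Queue: it returns plain results in submission order rather than job dictionaries, and it always uses threads.

```python
import functools
from concurrent.futures import ThreadPoolExecutor

def parallel_map(n_workers=2):
    """Decorator factory: the wrapped function is called once per item."""
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(items, *args, **kwargs):
            with ThreadPoolExecutor(max_workers=n_workers) as pool:
                # each item becomes the first positional argument of `fun`
                futures = [pool.submit(fun, x, *args, **kwargs) for x in items]
                return [f.result() for f in futures]
        return wrapper
    return decorator

@parallel_map(n_workers=5)
def describe(x, prefix='Job'):
    return '{} {}'.format(prefix, x)

output = describe(items=range(3), prefix='Item')
print(output)  # ['Item 0', 'Item 1', 'Item 2']
```

As with the ezpq decorator, the wrapped function still takes a single item as its first parameter, while callers pass the whole iterable as items.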
ezpq.Plot
The Plot class is used to visualize the wait, start, and end times for each job that entered the queueing system. The class accepts a list of the job_data; exactly what is returned from a call to collect(). It also allows for a title and color_by parameter. color_by accepts any of 'priority', 'cancelled', 'exitcode', 'name', or 'output', and will color code the bar accordingly.
Below, the left plot is a serial, one-by-one operation; the right plot uses an ezpq.Queue queueing system with 10 workers.
plt = ezpq.Plot(output).build()
#plt.save('...')
plt
Each horizontal bar represents an independent job id. The start of the gray bar indicates when the job entered the queueing system. The colored bar spans from when the job started running to when it ended. The gray bar that follows (if any) reflects how long it took for the queue operations to recognize the finished job, join the job data with its output, remove it from the working table, and place it in the completed queue.
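The three segments of each bar can be computed directly from the timestamps in the job data. The sketch below uses the submitted, started, ended, and processed values of the first job from the Quickstart output; the names waiting, running, and processing are labels chosen for this example.

```python
from datetime import datetime

# timestamps copied from job id 1 in the Quickstart output
job = {
    'submitted': datetime(2018, 12, 11, 1, 39, 36, 681870),
    'started':   datetime(2018, 12, 11, 1, 39, 36, 684734),
    'ended':     datetime(2018, 12, 11, 1, 39, 36, 930090),
    'processed': datetime(2018, 12, 11, 1, 39, 36, 995742),
}

# leading gray bar: time spent in the waiting queue
waiting = (job['started'] - job['submitted']).total_seconds()
# colored bar: time spent running
running = (job['ended'] - job['started']).total_seconds()
# trailing gray bar: time until the queue registered the finished job
processing = (job['processed'] - job['ended']).total_seconds()

print('waiting={:.6f}s running={:.6f}s processing={:.6f}s'.format(
    waiting, running, processing))
# waiting=0.002864s running=0.245356s processing=0.065652s
```

The running value matches the 'runtime' field reported for that job (0.245...), and the trailing segment is below the default poll interval of 0.1 seconds.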
Let's implement a queueing system that takes advantage of the priority feature, then visualize the results. Here, every fifth job (where x + 1 is divisible by 5) has the greatest priority.
with ezpq.Queue() as Q:
    # enqueue with priority
    for x in range(100):
        Q.put(random_sleep, args=[x, 0, 2],
              priority=int((x+1) % 5 != 0)) # 0 is higher priority than 1.
    # wait and collect
    Q.wait()
    output = Q.collect()

plt = ezpq.Plot(output, color_by='priority',
                title='Lower priority value takes precedence.').build()
plt
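To see which jobs the priority expression favors, it can be evaluated on its own for the first few values of x. The job_priority helper is only a name given to the expression used in the example above.

```python
def job_priority(x):
    """Return 0 (high priority) when x + 1 is divisible by 5, otherwise 1."""
    return int((x + 1) % 5 != 0)

high_priority = [x for x in range(20) if job_priority(x) == 0]
print(high_priority)  # [4, 9, 14, 19]
```

Because x starts at 0, the favored jobs are the 5th, 10th, 15th, and so on, not the values of x that are themselves multiples of 5.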
Notebook of Examples
Many more examples can be found in docs/examples.ipynb.