Skip to main content

an easy parallel queueing system

Project description

Read this on GitHub or my site.

How to get it

Install from PyPI with:

pip install ezpq

Optional packages:

pip install pandas    # required for plots
pip install plotnine  # required for plots
pip install tqdm      # required for progress bars

Overview

ezpq implements a parallel queueing system consisting of:

  1. a priority “waiting” queue in.

  2. a lookup table of “working” jobs.

  3. a priority “completed” queue out.

The queueing system uses multiprocessing.Process by default and can also run jobs with threading.Thread.

image0

Features

  • Simple interface; pure Python.

  • No required dependencies outside of standard library.

  • Optional integration with `tqdm <https://github.com/tqdm/tqdm>`__ progress bars.

  • Compatible with Python 2 & 3.

  • Cross platform with MacOS, Linux, and Windows.

  • Data remains in-memory.

  • Priority Queueing, both in and out and within lanes.

  • Synchronous lanes allow dependent jobs to execute in the desired order.

  • Easily switch from processes to threads.

  • Automatic handling of output.

  • Rich job details, easily viewed as pandas dataframe.

  • Built-in logging to CSV.

  • Customizable visualizations of queue operations.

Quickstart

Suppose you wanted to speed up the following code, which runs 60 operations that take anywhere from 0s to 2s. With an average job time of ~1s, this operation should take ~60s.

import time
import random
def random_sleep(x):
  random.seed(x)
  n = random.uniform(0.5, 1.5)
  time.sleep(n)
  return n
start = time.time()
output = [random_sleep(x) for x in range(60)]
end = time.time()
print('> Runtime: ' + str(end - start))
## '> Runtime: 58.932034969329834'

Here is the function ran in parallel with an ezpq Queue of 6 workers. Thus, the runtime of the above operation will be reduced from ~60s to ~10s.

import time
import random
import ezpq
start = time.time()
with ezpq.Queue(6) as Q:
  output = Q.map(random_sleep, range(60))
end = time.time()
print('> Runtime: ' + str(end - start))

Here is the same scenario, using the @ezpq.Queue decorator.

@ezpq.Queue(6)
def random_sleep(x):
    random.seed(x)
    n = random.uniform(0.5, 1.5)
    time.sleep(n)
    return n
output = random_sleep(iterable=range(60))

While map() and the decorator are useful for quick-n-simple parallization, the essential functions of an ezpq Queue include put(), wait(), and get() (or collect()).

with ezpq.Queue(6) as Q:
    for x in range(60):
        Q.put(random_sleep, args=x)
    Q.wait()
    output = Q.collect()

The output is a list of dicts containing verbose information about each job, along with its output, and exit code.

print( output[0] )
## {'args': [0],
##  'callback': None,
##  'cancelled': False,
##  'ended': datetime.datetime(2019, 3, 13, 0, 48, 52, 811248),
##  'exception': None,
##  'exitcode': 0,
##  'function': 'random_sleep',
##  'id': 1,
##  'kwargs': None,
##  'lane': None,
##  'name': 1,
##  'output': 1.3444218515250481,
##  'priority': 100,
##  'processed': datetime.datetime(2019, 3, 13, 0, 48, 52, 867387),
##  'qid': '13318d36',
##  'runtime': 1.3500409126281738,
##  'started': datetime.datetime(2019, 3, 13, 0, 48, 51, 461207),
##  'submitted': datetime.datetime(2019, 3, 13, 0, 48, 51, 357405),
##  'timeout': 0}

Easily convert output to a pandas dataframe:

import pandas as pd
df = pd.DataFrame(output)
print( df.head()[['id', 'output', 'runtime', 'exitcode']] )
##    id    output   runtime  exitcode
## 0   1  1.344422  1.350041         0
## 1   2  0.634364  0.638938         0
## 2   3  1.456034  1.459830         0
## 3   4  0.737965  0.741742         0
## 4   5  0.736048  0.739848         0

Use ezpq.Plot to generate a Gannt chart of the job timings.

plt = ezpq.Plot(output).build(show_legend=False)
plt.save('docs/imgs/quickstart.png')

image1

ezpq.Queue

The Queue class implements the queueing system, which is itself a 3-part system composed of the:

  1. waiting queue

  2. working table

  3. completed queue

    <!– –>

## Help on function __init__ in module ezpq.Queue:
##
## __init__(self, n_workers=8, max_size=0, job_runner=<class 'multiprocessing.context.Process'>, auto_remove=False, auto_start=True, auto_stop=False, callback=None, log_file=None, poll=0.1, show_progress=False, qid=None)
##     Implements a parallel queueing system.
##
##     Args:
##         n_workers: the max number of concurrent jobs.
##             - Accepts: int
##             - Default: cpu_count()
##         max_size: when > 0, will throw an exception the number of enqueued jobs exceeds this value. Otherwise, no limit.
##             - Accepts: int
##             - Default: 0 (unlimited)
##         job_runner: the class to use to invoke new jobs.
##             - Accepts: multiprocessing.Process, threading.Thread
##             - Default: multiprocessing.Process
##         auto_remove: controls whether jobs are discarded of after completion.
##             - Accepts: bool
##             - Default: False
##         auto_start: controls whether the queue system "pulse" is started upon instantiation (default), or manually.
##             - Accepts: bool
##             - Default: True
##         auto_stop: controls whether the queue system "pulse" stops itself after all jobs are complete.
##             - Accepts: bool
##             - Default: False
##         callback: optional function to execute synchronously immediately after a job completes.
##             - Accepts: function object
##             - Default: None
##         log_file: if file path is specified, job data is written to this path in CSV format.
##             - Accepts: str
##             - Default: None
##         poll: controls the pulse frequency; the amount of time slept between operations.
##             - Accepts: float
##             - Default: 0.1
##
##     Returns:
##         ezpq.Queue object.
##
## None

ezpq.Job

A ezpq job defines the function to run. It is passed to an ezpq queue with a call to submit().

## Help on function __init__ in module ezpq.Job:
##
## __init__(self, function, args=None, kwargs=None, name=None, priority=100, lane=None, timeout=0, suppress_errors=False, stop_on_lane_error=False)
##     Defines what to run within a `ezpq.Queue`, and how to run it.
##
##     Args:
##         function: the function to run.
##             - Accepts: function object
##         args: optional positional arguments to pass to the function.
##             - Accepts: list, tuple
##             - Default: None
##         kwargs: optional keyword arguments to pass to the function.
##             - Accepts: dict
##             - Default: None
##         name: optional name to give to the job. Does not have to be unique.
##             - Accepts: str
##             - Default: None; assumes same name as job id.
##         priority: priority value to assign. Lower values get processed sooner.
##             - Accepts: int
##             - Default: 100
##         lane: a sequential lane to place the job in. if it does not already exist, it will be created.
##             - Accepts: int, str; any hashable object
##             - Default: None; no lanes.
##         timeout: When > 0, if this value (in seconds) is exceeded, the job is terminated. Otherwise, no limit enforced.
##             - Accepts: float
##             - Default: 0 (unlimited)
##
##     Returns:
##         ezpq.Job object
##
## None
with ezpq.Queue(6) as Q:
  for x in range(60):
    priority = x % 2 # give even numbers higher priority.
    job = ezpq.Job(random_sleep, args=x, priority=priority)
    Q.submit(job)
  Q.wait()
  output = Q.collect()

image2

put

The put method creates a job and submits it to an ezpq queue. All of its arguments are passed to ezpq.Job().

with ezpq.Queue(6) as Q:
    for x in range(60):
        Q.put(random_sleep, args=x)
    Q.wait()
    output = Q.collect()

size

size() returns a count of all items across all three queue components. It accepts three boolean parameters, waiting, working, and completed. If all of these are False (default), all jobs are counted. If any combination of these is True, only the corresponding queue(s) will be counted. For example:

def print_sizes(Q):
    msg = 'Total: {0}; Waiting: {1}; Working: {2}; Completed: {3}'.format(
        Q.size(),
        Q.size(waiting=True),
        Q.size(working=True),
        Q.size(completed=True)
    )
    print(msg)
with ezpq.Queue(6) as Q:
    # enqueue jobs
    for x in range(60):
        Q.put(random_sleep, x)
    # repeatedly print sizes until complete.
    while Q.size(waiting=True, working=True):
        print_sizes(Q)
        time.sleep(1)
    print_sizes(Q)
## 'Total: 60; Waiting: 60; Working: 0; Completed: 0'
## 'Total: 60; Waiting: 51; Working: 6; Completed: 3'
## 'Total: 60; Waiting: 46; Working: 6; Completed: 8'
## 'Total: 60; Waiting: 39; Working: 6; Completed: 15'
## 'Total: 60; Waiting: 34; Working: 6; Completed: 20'
## 'Total: 60; Waiting: 31; Working: 6; Completed: 23'
## 'Total: 60; Waiting: 24; Working: 6; Completed: 30'
## 'Total: 60; Waiting: 17; Working: 6; Completed: 37'
## 'Total: 60; Waiting: 11; Working: 6; Completed: 43'
## 'Total: 60; Waiting: 6; Working: 6; Completed: 48'
## 'Total: 60; Waiting: 0; Working: 5; Completed: 55'
## 'Total: 60; Waiting: 0; Working: 1; Completed: 59'
## 'Total: 60; Waiting: 0; Working: 0; Completed: 60'

wait

The wait() method will block execution until all jobs complete. It also accepts a timeout parameter, given in seconds. The return value is the count of jobs that did not complete. Thus, a return value greater than 0 indicates the timeout was exceeded. The parameter poll can be used to adjust how frequently (in seconds) the operation checks for completed jobs.

New in v0.2.0, include show_progress=True to show a progress bar while waiting. This is equivalent to a call to waitpb().

image3

get

get() retrieves and deletes (“pop”) the highest priority job from the completed queue, if one is available. If the completed queue is empty, get() returns None. However, get() will wait for a completed job if wait, poll, or timeout are specified. If the timeout is exceeded, None is returned.

with ezpq.Queue(6) as Q:
    n_inputs = 60
    output = [None] * n_inputs
    # enqueue jobs
    for x in range(n_inputs):
        Q.put(random_sleep, args=x)

    # repeatedly `get()` until queue is empty.
    for i in range(n_inputs):
        output[i] = Q.get(wait=True)

collect

collect() is similar to get(), but it will return a list of all completed jobs and clear the completed queue. It does not support the poll or timeout parameters, but you can call wait() before collect() if desired.

with ezpq.Queue(6) as Q:
    # enqueue jobs
    for x in range(60):
        Q.put(random_sleep, x)
    # wait and collect all jobs
    print('Queue size before: {0}'.format(Q.size()))
    Q.wait()
    output = Q.collect()
    print('Queue size after: {0}'.format(Q.size()))
    print('Output size: {0}'.format(len(output)))
## 'Queue size before: 60'
## 'Queue size after: 0'
## 'Output size: 60'

map

map encapsulates the logic of put, wait, and collect in one call. Include show_progress=True to get output tqdm progress bar.

image4

starmap

starmap is similar to map, but operates on a list of lists, with each nested list being unpacked as arguments to the function.

def my_pow(x, k):
    return '{}^{} = {}'.format(x, k, x**k)
# list of lists to iterate over.
args_list = [[x, x%4] # (x, k)
            for x in range(100)]
# starmap
with ezpq.Queue(10) as Q:
    output = Q.starmap(my_pow, iterable=args_list)

[x['output'] for x in output[:10]]

startmapkw

Same as starmap, but operations on a list of dicts to be expanded as kwargs to the function.

def my_pow(x, k):
    return '{}^{} = {}'.format(x, k, x**k)
# list of dicts to iterate over.
kwargs_list = [{ 'x':x, 'k':x%4 } # (x, k)
                for x in range(100)]
# starmapkw
with ezpq.Queue(10) as Q:
    output = Q.starmapkw(my_pow, iterable=kwargs_list)

[x['output'] for x in output[:10]]

dispose

The queueing operations performed by ezpq.Queue are performed on a periodic basis. By default, the poll parameter for a Queue is 0.1 seconds. This “pulse” thread will continue firing until the Queue is disposed of.

In the previous examples, use of the context manager (with ezpq.Queue() as Q:) results in automatic disposal. If not using the context manager (or decorator), clean up after yourself with dispose().

Synchronous Lanes

When you have jobs that are dependent upon another, you can use “lanes” to execute them in sequence. All that is required is an arbitrary lane name/id passed to the lane parameter of put. Empty lanes are automatically removed.

image5

In the above graphic, notice how same-colored bars never overlap. These bars represent jobs that are in the same lane, which executed synchronously.

Lane Error Handling

You may want to short-circuit a synchronous lane if a job in the lane fails. You can do this by specifying stop_on_lane_error=True when putting a job in the queue. If specified and the preceding job has a non-zero exit code, this job will not be run.

def reciprocal(x):
    time.sleep(0.1) # slow things down
    return 1/x      # will throw DivideByZero exception
import random
with ezpq.Queue(6) as Q:
    for i in range(100):
        Q.put(reciprocal, random.randint(0, 10), lane=i%5, suppress_errors=True, stop_on_lane_error=True)
    Q.wait()
    output = Q.collect()
plt = ezpq.Plot(output).build(facet_by='lane', color_by='exitcode', color_pal=['red', 'blue'])
plt.save('docs/imgs/lane_error.png')

image6

ezpq.Plot

The Plot class is used to visualize the wait, start, and end times for each job that entered the queueing system. The class is initialized with a list of dicts; exactly what is returned from a call to collect() or map().

Arguments given to build() control various aspects of the plot, from coloring, to faceting,

## Help on function build in module ezpq.Plot:
##
## build(self, color_by='qid', facet_by='qid', facet_scale='fixed', show_legend=True, bar_width=1, title=None, color_pal=None, theme='bw')
##     Produces a plot based on the data and options provided to a `ezpq.Plot()` object.
##
##     Args:
##         color_by: controls the column to use for coloring the bars.
##             - Accepts: one of 'qid', 'priority', 'lane', 'cancelled', 'exitcode', 'name', 'output'
##             - Default: 'qid'
##         facet_by: controls the column to use for facetting the plot.
##             - Accepts: one of 'qid', 'priority', 'lane', 'cancelled', 'exitcode', 'name', 'output'
##             - Default: 'qid'
##         facet_scale: controls the scale of the x/y axis across facets.
##             - Accepts: one of 'fixed', 'free', 'free_x', 'free_y'
##             - Default: 'fixed'
##         show_legend: controls whether the legend is drawn.
##             - Accepts: bool
##             - Default: True
##         bar_width: controls the bar width
##             - Accepts: float
##             - Default: 1
##         title: optional title to be drawn above the plot.
##             - Accepts: str, None
##             - Default: None
##         theme:
##             - Accepts: 'bw', 'classic', 'gray', 'grey', 'seaborn', '538', 'dark', 'matplotlib', 'minimal', 'xkcd', 'light'
##             - Default: 'bw'
##     Returns:
##         The plot produced from plotnine.ggplot().
##
## None
with ezpq.Queue(6) as Q:
  for x in range(60):
    lane = x % 5
    Q.put(random_sleep, x, timeout=1, lane=lane)
  Q.wait()
  output = Q.collect()
plt = ezpq.Plot(output).build(facet_by='lane', show_legend=False)
plt.save('docs/imgs/lanes2.png')

image7

Each horizontal bar represents an independent job id. The start of the gray bar indicates when the job entered the queuing system. The start of the colored bar indicates when the job started running, and when it ended. The gray bar that follows (if any) reflects how long it took for the queue operations to recognize the finished job, join the job data with its output, remove it from the working table, and place it in the completed queue.

More Examples

Many more examples can be found in docs/examples.ipynb.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ezpq-0.2.2.tar.gz (24.1 kB view hashes)

Uploaded Source

Built Distribution

ezpq-0.2.2-py3-none-any.whl (18.4 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page