ezpq: an easy parallel queueing system.


I often rely on concurrent processing to perform many independent, long-running jobs quickly. Each time, I find myself wiring up functions to iterate over input, handle output and errors, time jobs, and do everything else that practical parallel processing involves. I explored other options, but have yet to encounter something simple, versatile, and feature-rich. With this project, I sought to create a Python package that encapsulates the logic required to manage a parallel queue efficiently. The result is a simple, customizable wrapper around both threading.Thread and multiprocessing.Process that provides all of the essential functions of an efficient parallel queueing system, and then some.


ezpq implements a parallel queueing system consisting of:

  1. a priority "waiting" queue (in).
  2. a lookup table of "working" jobs.
  3. a priority "completed" queue (out).

Note that this is not a traditional first-in/first-out (FIFO) queue. Because jobs run in parallel, this queueing system is first-in/first-completed-out. In addition, the priority of an individual job can be adjusted to ensure it gets processed and retrieved before lower-priority jobs.
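
The three components and the priority ordering can be sketched with the standard library alone. This is not ezpq's implementation: MiniQueue and its methods are hypothetical names, and jobs run synchronously here so the example stays deterministic.

```python
import heapq
import itertools


class MiniQueue:
    """Toy model of a waiting heap, a working table, and a completed heap."""

    def __init__(self):
        self._ids = itertools.count(1)  # job ids start at 1
        self.waiting = []               # heap of (priority, id, function, args)
        self.working = {}               # id -> (priority, function, args)
        self.completed = []             # heap of (priority, id, output)

    def put(self, function, args=(), priority=100):
        job_id = next(self._ids)
        heapq.heappush(self.waiting, (priority, job_id, function, tuple(args)))
        return job_id

    def start_next(self):
        """Move the highest-priority waiting job into the working table."""
        priority, job_id, function, args = heapq.heappop(self.waiting)
        self.working[job_id] = (priority, function, args)
        return job_id

    def finish(self, job_id):
        """Run a working job and move its result to the completed heap."""
        priority, function, args = self.working.pop(job_id)
        heapq.heappush(self.completed, (priority, job_id, function(*args)))

    def get(self):
        """Pop the highest-priority completed job, or None if there is none."""
        if not self.completed:
            return None
        priority, job_id, output = heapq.heappop(self.completed)
        return {'id': job_id, 'priority': priority, 'output': output}


q = MiniQueue()
first = q.put(str.upper, args=['a'])               # id 1, default priority 100
second = q.put(str.upper, args=['b'], priority=1)  # id 2, lesser value wins

started = [q.start_next(), q.start_next()]  # job 2 starts before job 1
for job_id in started:
    q.finish(job_id)
results = [q.get(), q.get()]
```

Ties in priority fall back to the job id, so jobs of equal priority keep their submission order.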


  • Simple interface; pure Python.
  • No required dependencies outside of standard library.
  • Compatible with Python 2 & 3.
  • Cross platform with MacOS, Linux, and Windows.
  • Data remains in-memory.
  • Priority Queueing, both in and out.
  • Easily switch from processes to threads.
  • Automatic handling of output.
  • Rich job details, easily viewed as pandas dataframe.
  • Built-in logging to CSV.
  • Insightful visualizations of queue operations.

How to get it

Install from PyPI with:

pip install ezpq

To use the optional plotting features, install pandas and plotnine, or run...

pip install ezpq[plot]


Suppose you wanted to speed up the following code, which has 10 inputs that each take between 0 and 2 seconds to run (about 1 second on average), and stores the output...

import random
import time

def random_sleep(x, min=0, max=1):
    # sleep for a random duration between `min` and `max` seconds.
    n = random.randint(min*1000, max*1000) / 1000
    time.sleep(n)
    return 'Job {} slept {} seconds.'.format(x, round(n, 2))

output = [random_sleep(x, 0, 2) for x in range(10)]


The above function should take ~10 seconds to execute; here is the output:

> Runtime: 11.526

['Job 0 slept 1.29 seconds.',
 'Job 1 slept 1.04 seconds.',
 'Job 2 slept 1.31 seconds.',
 'Job 3 slept 1.79 seconds.',

Here is the same function, decorated with @ezpq.Queue(), which will run the function 10 times in parallel with 5 workers:

import ezpq

@ezpq.Queue(n_workers=5)
def random_sleep(x, min=0, max=1):
    n = random.randint(min*1000, max*1000) / 1000
    time.sleep(n)
    return 'Job {} slept {} seconds.'.format(x, round(n, 2))

output = random_sleep(items=range(10), min=0, max=2)


The concurrency of 5 workers reduced the runtime by more than two-thirds (from about 11.5 to 2.8 seconds). In addition, the output provides rich details about each job.

> Runtime: 2.78

[{'id': 1,
  'name': 1,
  'priority': 100,
  'timeout': 0,
  'function': '<function random_sleep at 0x11bfd7d90>',
  'args': [0],
  'kwargs': {'min': 0, 'max': 2},
  'submitted': datetime.datetime(2018, 12, 11, 1, 39, 36, 681870),
  'started': datetime.datetime(2018, 12, 11, 1, 39, 36, 684734),
  'ended': datetime.datetime(2018, 12, 11, 1, 39, 36, 930090),
  'processed': datetime.datetime(2018, 12, 11, 1, 39, 36, 995742),
  'exitcode': 0,
  'cancelled': False,
  'runtime': 0.24535608291625977,
  'output': 'Job 0 slept 0.24 seconds.',
  'callback': None},
 {'id': 2,
  'name': 2,
  'priority': 100,
  'timeout': 0,
  'function': '<function random_sleep at 0x11bfd7d90>',
  'args': [1],
  'kwargs': {'min': 0, 'max': 2},
  'submitted': datetime.datetime(2018, 12, 11, 1, 39, 36, 689686),
  'started': datetime.datetime(2018, 12, 11, 1, 39, 36, 692901),
  'ended': datetime.datetime(2018, 12, 11, 1, 39, 37, 210175),
  'processed': datetime.datetime(2018, 12, 11, 1, 39, 37, 308434),
  'exitcode': 0,
  'cancelled': False,
  'runtime': 0.5172739028930664,
  'output': 'Job 1 slept 0.51 seconds.',
  'callback': None},

The resulting list above can easily be viewed as a pandas dataframe:

import pandas as pd

df = pd.DataFrame(output)

df[['name', 'output', 'runtime']].head()
name  output                     runtime
1     Job 0 slept 0.24 seconds.  0.24535608291625977
2     Job 1 slept 0.51 seconds.  0.5172739028930664
3     Job 2 slept 0.72 seconds.  0.7306220531463623
4     Job 3 slept 0.79 seconds.  0.8023269176483154
5     Job 4 slept 0.29 seconds.  0.3013160228729248

The class ezpq.Plot uses data returned from an ezpq.Queue to produce a Gantt chart of the job runtimes. This feature introduces the only non-standard dependencies; you will need pandas for the data reshaping and plotnine for the visualizations.

plt = ezpq.Plot(output).build()
plt.save('docs/imgs/quickplot.png')

There is a context manager interface available as well, which can allow for more versatility over the decorator approach:

with ezpq.Queue(n_workers=5) as Q:
    for i in range(10):
        Q.put(random_sleep, args=[i, 0, 2])
    Q.wait()  # block until every job has finished.
    output = Q.collect()


The Queue class implements the queueing system, which is itself a 3-part system composed of the:

  1. waiting queue
  2. working table
  3. completed queue

The queueing system can be instantiated with the following parameters:

Q = ezpq.Queue(job_runner=Process, # the class used to run jobs: either Process (default) or Thread.
               n_workers=3,        # the max number of concurrent workers (processes/threads).
               max_size=0,         # the max combined size of all items in the queue (0=unlimited).
               auto_remove=False,  # Should completed jobs (and their output) be kept, or discarded?
               auto_start=True,    # Should the queue start processing jobs upon instantiation?
               auto_stop=False,    # Should the queue stop after all submitted jobs are processed?
               callback=None,      # optional (synchronous) function to run after job completion.
               log_file=None,      # optional path of a CSV file used to output logs.
               poll=0.1)           # repetition period, in seconds, for the Queue operations.


The put() method puts a job into the Queue. put() accepts a function and optional arguments:

Q.put(function,     # the function to call
      args=None,    # positional arguments, as a list or tuple.
      kwargs=None,  # kwargs, as a dict.
      name=None,    # optional name. If blank, name is id.
      priority=100, # optional priority (default=100). Lesser values are higher priority.
      timeout=0)    # optional timeout, in seconds (0=unlimited).

The only required parameter is function. The parameters args and kwargs are optional and are passed to the function when the job runs.
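
As a minimal sketch of how a function, an args list, and a kwargs dict combine into a single call, consider the helper below. The name call_job is hypothetical and this is not how ezpq runs jobs internally; it only shows the unpacking that the args and kwargs parameters imply.

```python
def call_job(function, args=None, kwargs=None):
    """Call `function` with optional positional and keyword arguments."""
    args = [] if args is None else list(args)
    kwargs = {} if kwargs is None else dict(kwargs)
    return function(*args, **kwargs)


def describe(x, min=0, max=1):
    return 'x={}, min={}, max={}'.format(x, min, max)


only_function = call_job(lambda: 'no arguments')         # function alone is enough
positional = call_job(describe, args=[7, 0, 2])          # all values by position
mixed = call_job(describe, args=[7], kwargs={'max': 2})  # position plus keyword
```

Defaulting args and kwargs to None, rather than to a mutable list or dict, avoids sharing one object across calls.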


size() will return a count of all items across all three queue components. It accepts three boolean parameters, waiting, working, and completed. If all of these are False (default), all jobs are counted. If any combination of these is True, only the corresponding queue(s) will be counted. For example, I define this function to print the sizes of the entire Queue system, and each queue component:

def print_sizes(Q):
    msg = 'Total: {0}; Waiting: {1}; Working: {2}; Completed: {3}'.format(
        Q.size(), Q.size(waiting=True), Q.size(working=True), Q.size(completed=True))
    print(msg)

Instantiate ezpq.Queue and call the above function to watch the sizes change:

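with ezpq.Queue() as Q:
    # enqueue jobs
    for x in range(10):
        Q.put(random_sleep, args=[x, 0, 2])

    # repeatedly print sizes until complete.
    while Q.size(waiting=True, working=True) > 0:
        print_sizes(Q)
        time.sleep(1)  # pause between checks; this interval is illustrative.

    print_sizes(Q)  # final state, once nothing is waiting or working.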

Total: 10; Waiting: 7; Working: 3; Completed: 0
Total: 10; Waiting: 6; Working: 3; Completed: 1
Total: 10; Waiting: 5; Working: 3; Completed: 2
Total: 10; Waiting: 4; Working: 3; Completed: 3
Total: 10; Waiting: 3; Working: 3; Completed: 4
Total: 10; Waiting: 2; Working: 3; Completed: 5
Total: 10; Waiting: 1; Working: 3; Completed: 6
Total: 10; Waiting: 0; Working: 3; Completed: 7
Total: 10; Waiting: 0; Working: 2; Completed: 8
Total: 10; Waiting: 0; Working: 1; Completed: 9
Total: 10; Waiting: 0; Working: 0; Completed: 10
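
The counting rule for size() (no flags means count everything; otherwise count only the flagged components) can be written out as a small standalone function. This is a sketch of the rule as described, not ezpq's code; count_jobs and its first three parameters are hypothetical.

```python
def count_jobs(n_waiting, n_working, n_completed,
               waiting=False, working=False, completed=False):
    """Count jobs in the flagged components; with no flags, count all of them."""
    if not (waiting or working or completed):
        return n_waiting + n_working + n_completed
    total = 0
    if waiting:
        total += n_waiting
    if working:
        total += n_working
    if completed:
        total += n_completed
    return total


# Component sizes taken from the second line of the output above.
total = count_jobs(6, 3, 1)
unfinished = count_jobs(6, 3, 1, waiting=True, working=True)
done = count_jobs(6, 3, 1, completed=True)
```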


The wait() method will block execution until some jobs complete. It accepts a parameter n, which specifies how many jobs to wait for. When this value is 0 (default) or less, wait() will wait for all submitted jobs to complete. It also accepts a timeout parameter, which will break out of the call to wait() when exceeded. The return value is the number of jobs that did not complete within the timeout. This value should be 0; anything greater indicates the timeout was reached. The parameter poll can be used to adjust how frequently (in seconds) the operation checks for completed jobs (default=0.1).

with ezpq.Queue() as Q:
    # enqueue jobs
    for x in range(10):
        Q.put(random_sleep, args=[x, 0, 2])

    # repeatedly wait 1 second for jobs to finish until n_remaining == 0.
    n_remaining = Q.wait(timeout=1)
    while n_remaining > 0:
        print('Timeout reached; {0} jobs remain.'.format(n_remaining))
        n_remaining = Q.wait(timeout=1)

    print('{0} jobs remain.'.format(n_remaining))

Timeout reached; 7 jobs remain.
Timeout reached; 5 jobs remain.
Timeout reached; 3 jobs remain.
Timeout reached; 1 jobs remain.
0 jobs remain.
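
The same "wait with a timeout, then report how many jobs remain" pattern exists in the standard library, which may help when reasoning about the return value. The sketch below uses concurrent.futures rather than ezpq; wait_remaining is a hypothetical helper, and an Event stands in for slow jobs so the result does not depend on random sleep times.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


def wait_remaining(futures, timeout=None):
    """Block until all futures finish or `timeout` elapses; return how many are unfinished."""
    done, not_done = wait(futures, timeout=timeout)
    return len(not_done)


release = threading.Event()  # the slow jobs block on this until it is set

with ThreadPoolExecutor(max_workers=4) as pool:
    quick = [pool.submit(pow, 2, n) for n in range(2)]
    slow = [pool.submit(release.wait) for _ in range(2)]

    # The quick jobs finish at once; the slow ones outlast the timeout.
    remaining_before = wait_remaining(quick + slow, timeout=0.2)

    release.set()  # let the slow jobs finish
    remaining_after = wait_remaining(quick + slow, timeout=5)
```

A nonzero return value means the timeout expired first, which is the same signal the loop above relies on.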


get() will retrieve and delete ("pop") the highest priority job from the completed queue, if one is available. If the completed queue is empty, get() will return None. However, like wait(), get() has parameters for poll and timeout. If the timeout is exceeded, None is returned.

with ezpq.Queue() as Q:
    # enqueue jobs
    for x in range(10):
        Q.put(random_sleep, args=[x, 0, 2])

    # repeatedly `get()` jobs until Q.size() == 0.
    output = list()

    while Q.size() > 0:
        job = Q.get(timeout=0.5)
        if job is None:
            print('Timeout reached; no output available; {0} jobs remain.'.format(Q.size()))
        else:
            output.append(job)

    print('{0} jobs remain in queue.'.format(Q.size()))
    print('Output list size: {0}'.format(len(output)))

Timeout reached; no output available; 10 jobs remain.
Timeout reached; no output available; 7 jobs remain.
Timeout reached; no output available; 5 jobs remain.
Timeout reached; no output available; 2 jobs remain.
0 jobs remain in queue.
Output list size: 10
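
For comparison, here is a sketch of "pop the highest-priority item, or return None on timeout" built on the standard library's queue.PriorityQueue. It is an analogy rather than ezpq's implementation, and get_or_none is a hypothetical helper.

```python
import queue


def get_or_none(completed, timeout=0):
    """Pop the highest-priority item, or return None if nothing arrives in time."""
    try:
        if timeout > 0:
            return completed.get(timeout=timeout)
        return completed.get_nowait()
    except queue.Empty:
        return None


completed = queue.PriorityQueue()
completed.put((100, 'job 1 output'))
completed.put((1, 'job 2 output'))  # lesser value = higher priority

first = get_or_none(completed)                # the priority-1 item
second = get_or_none(completed)               # the priority-100 item
third = get_or_none(completed, timeout=0.05)  # queue is empty, so None
```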


collect() is similar to get(), but it will return a list of all completed jobs and clear the completed queue. It does not support the poll or timeout parameters, but you can call wait() before collect() if necessary.

with ezpq.Queue(n_workers=5) as Q:
    # enqueue jobs
    for x in range(10):
        Q.put(random_sleep, args=[x, 0, 2])

    # wait and collect all jobs
    print('Queue size before: {0}'.format(Q.size()))

    Q.wait()
    output = Q.collect()

    print('Queue size after: {0}'.format(Q.size()))
    print('Output size: {0}'.format(len(output)))

Queue size before: 10
Queue size after: 0
Output size: 10
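
Draining a completed queue in one call amounts to popping until it is empty. The sketch below assumes the completed queue is a heap of (priority, id, output) tuples, which is an illustrative layout and not necessarily the one ezpq uses; drain is a hypothetical name.

```python
import heapq


def drain(completed):
    """Return every completed item in priority order and leave the heap empty."""
    items = []
    while completed:
        items.append(heapq.heappop(completed))
    return items


completed = []
for entry in [(100, 3, 'c'), (1, 2, 'b'), (100, 1, 'a')]:
    heapq.heappush(completed, entry)

output = drain(completed)
```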


The queueing operations of ezpq.Queue run on a periodic basis. By default, the poll parameter for a Queue is 0.1 seconds, so the operations run every 100 ms, or ten times per second. This "pulse" will continue until the Queue is disposed of.

In the previous examples, we have not concerned ourselves with disposing of the ezpq.Queue, as it is done automatically with the context manager (with ezpq.Queue() as Q...). However, if you choose not to use the context manager or decorator interfaces, you must clean up after yourself.


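import logging

# show DEBUG messages (assumes ezpq logs through the standard logging module).
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')

Q = ezpq.Queue() # by default, n_workers = # of cpu cores (mp.cpu_count()).

# enqueue jobs
for x in range(10):
    Q.put(random_sleep, args=[x, 0, 2])

# wait and dispose
Q.wait()
Q.dispose()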

The example above also enables debugging logs, which produces:

DEBUG: Initialized queue with 8 workers.
DEBUG: Initialized pulse.
DEBUG: Started job '1'
DEBUG: Started job '2'
DEBUG: Started job '3'
DEBUG: Queued job: '4'
DEBUG: Queued job: '5'
DEBUG: Queued job: '6'
DEBUG: Queued job: '7'
DEBUG: Queued job: '8'
DEBUG: Queued job: '9'
DEBUG: Queued job: '10'
DEBUG: Completed job: '1'
DEBUG: Started job '4'
DEBUG: Completed job: '8'
DEBUG: Completed job: '9'
DEBUG: Completed job: '10'
DEBUG: Removed jobs.
DEBUG: Removed output.
DEBUG: Removed completed.
DEBUG: Reset counters.
DEBUG: Stopped pulse.
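
The idea of a periodic pulse that must be stopped explicitly, or automatically by a context manager, can be sketched with a background thread. The Pulse class below is hypothetical and much simpler than ezpq.Queue; it only illustrates why leaving a with block is enough to clean up.

```python
import threading


class Pulse:
    """Run `tick` every `poll` seconds on a background thread until stopped."""

    def __init__(self, tick, poll=0.1):
        self._tick = tick
        self._poll = poll
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Event.wait returns False on timeout and True once stop() sets the event.
        while not self._stop.wait(self._poll):
            self._tick()

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()  # runs even if the body of the with block raised


ticks = []
enough = threading.Event()


def tick():
    ticks.append(len(ticks))
    if len(ticks) >= 3:
        enough.set()


with Pulse(tick, poll=0.01) as pulse:
    enough.wait(timeout=5)  # let the pulse fire a few times

alive_after_exit = pulse._thread.is_alive()
```

Without the with block, the caller would need to call stop() explicitly, which mirrors the manual cleanup described above.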


Now that you have an understanding of the basic operations, recall the first example, which uses ezpq.Queue() as a decorator.

@ezpq.Queue(n_workers=5)
def random_sleep(x, min=0, max=1):
    n = random.randint(min*1000, max*1000) / 1000
    time.sleep(n)
    return 'Job {} slept {} seconds.'.format(x, round(n, 2))

output = random_sleep(items=range(10), min=0, max=2)

By using the decorator, each call to random_sleep(items=[...], ...) is actually running this:

for x in items:
    Q.put(function=fun, args=[x]+list(args), kwargs=kwargs)
job_data = Q.collect()
return job_data

Some notes on using the decorator interface:

  1. The decorated function (in this case, random_sleep) must accept at least one parameter that will be used to store the next item from an iterable (in this case, x).
  2. When calling the function, you must specify the list of items to iterate over as the first argument (or as a named arg with items=[...]).
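
The two notes above describe a common decorator pattern: the wrapper takes an iterable first and calls the original function once per element. The sketch below shows that pattern with concurrent.futures; parallel_over_items is a hypothetical name, and unlike ezpq it returns plain results in submission order instead of job details.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


def parallel_over_items(n_workers=5):
    """Decorator factory: call the function once per element of `items`."""
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(items, *args, **kwargs):
            with ThreadPoolExecutor(max_workers=n_workers) as pool:
                futures = [pool.submit(fun, x, *args, **kwargs) for x in items]
                # Results are gathered in submission order, not completion order.
                return [f.result() for f in futures]
        return wrapper
    return decorator


@parallel_over_items(n_workers=3)
def scale(x, factor=1):
    return x * factor


by_position = scale(range(4), factor=10)  # items as the first argument
by_name = scale(items=[1, 2], factor=2)   # items as a named argument
```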


The Plot class is used to visualize the wait, start, and end times for each job that entered the queueing system. The class accepts a list of the job_data; exactly what is returned from a call to collect(). It also allows for a title and color_by parameter. color_by accepts any of 'priority', 'cancelled', 'exitcode', 'name', or 'output', and will color code the bar accordingly.

Below, the left plot is a serial, one-by-one operation; the right plot uses an ezpq.Queue queueing system with 10 workers.

plt = ezpq.Plot(output).build()
plt.save('...')


Each horizontal bar represents an independent job id. The start of the gray bar indicates when the job entered the queueing system. The colored bar spans from when the job started running to when it ended. The gray bar that follows (if any) reflects how long it took for the queue operations to recognize the finished job, join the job data with its output, remove it from the working table, and place it in the completed queue.
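
The three segments of a bar follow directly from the four timestamps in each job record. As a worked example, the sketch below uses the timestamps of the first job in the sample output shown earlier; bar_segments and the segment names are hypothetical and not part of ezpq.Plot.

```python
from datetime import datetime

# Timestamps copied from the first job in the sample output.
job = {
    'submitted': datetime(2018, 12, 11, 1, 39, 36, 681870),
    'started': datetime(2018, 12, 11, 1, 39, 36, 684734),
    'ended': datetime(2018, 12, 11, 1, 39, 36, 930090),
    'processed': datetime(2018, 12, 11, 1, 39, 36, 995742),
}


def bar_segments(job):
    """Return the length, in seconds, of each segment of one job's bar."""
    return {
        'waiting': (job['started'] - job['submitted']).total_seconds(),
        'running': (job['ended'] - job['started']).total_seconds(),
        'processing': (job['processed'] - job['ended']).total_seconds(),
    }


segments = bar_segments(job)
```

The running segment comes to about 0.245 seconds, which agrees with the runtime value reported for that job.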

Let's implement a queueing system that takes advantage of the priority feature, then visualize the results. Here, jobs whose number (x+1) is divisible by 5 have the greatest priority.

with ezpq.Queue() as Q:
    # enqueue with priority
    for x in range(100):
        Q.put(random_sleep, args=[x, 0, 2],
            priority=int((x+1) % 5 != 0)) # 0 is higher priority than 1.

    # wait and collect
    Q.wait()
    output = Q.collect()

plt = ezpq.Plot(output, color_by='priority',
                title='Lower priority value takes precedence.').build()
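
To see which inputs the priority expression favors, it can be evaluated on its own. The sketch below reuses the expression from the example above; priority_for is a hypothetical wrapper, and the sorted list only illustrates the order a priority queue would prefer, ignoring timing.

```python
def priority_for(x):
    """Return 0 when the 1-based job number (x + 1) is a multiple of 5, else 1."""
    return int((x + 1) % 5 != 0)


# Inputs that receive the higher priority (value 0) among the first 20.
high_priority_inputs = [x for x in range(20) if priority_for(x) == 0]

# Sorting by (priority, input) puts the favored inputs first.
release_order = sorted(range(10), key=lambda x: (priority_for(x), x))
```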


Notebook of Examples

Many more examples can be found in docs/examples.ipynb.

