
ezq

Simple wrappers for Python multiprocessing.



Why?

Even though multiprocessing has Pool and Queue, it's surprisingly difficult to get started with slightly more complex workflows. ezq makes it easy to connect subprocesses (workers) using queues.

Install

pip install ezq

Example: Sum Messages

Here's a simple example of a worker that reads from an input queue, sums up the messages, and puts the result on an output queue.

import ezq


def worker(in_q, out_q):
    """Add up all the messages."""
    count = 0
    for msg in ezq.iter_msg(in_q):
        # you could check `msg.kind` if there are different kinds of work
        count += msg.data

    # when `in_q` is done, put the result on `out_q`
    ezq.put_msg(out_q, data=count)


def main():
    """Run several workers."""
    in_q = ezq.Queue()  # to send work
    out_q = ezq.Queue()  # to get results

    workers = [ezq.run(worker, in_q, out_q) for _ in range(ezq.NUM_CPUS)]
    # workers started

    for i in range(1000):
        ezq.put_msg(in_q, data=i)  # send work

    ezq.endq_and_wait(in_q, workers)
    # all workers are done

    result = sum(msg.data for msg in ezq.iter_q(out_q))
    assert result == sum(range(1000))
    print(result)


if __name__ == "__main__":
    main()

Typical worker lifecycle

A worker is just a function

In general, there's nothing special about a worker function, but note:

  • All arguments are passed through pickle first (see below).

  • The return value of this function is currently ignored, so you'll need an output queue to send data back to the main process.

Create workers

In the main process, create workers using ezq.run, which takes a function and any additional parameters to pass to it. Note that workers cannot create additional workers.

Send data

Once you've started the workers, you send them data by calling ezq.put_msg, which creates ezq.Msg objects and puts them in the queue. Each message carries three attributes (all optional):

  • kind - a string that indicates what kind of message it is. You can use this to send multiple kinds of work to the same worker. Note that the special END kind is used to indicate the end of a queue (that's what ezq.endq sends).

  • data - anything that can be pickled. This is the data you want the worker to work on.

  • order - an integer that indicates the message order. This can help you reorder results or ensure that messages from a queue are read in a particular order (that's what ezq.sortiter uses).
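As a rough illustration of how the three attributes work together, here is a minimal stdlib-only sketch. SketchMsg and handle are hypothetical names invented for this example, not part of ezq, and the real ezq.Msg may differ in its defaults and details.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchMsg:
    """Hypothetical stand-in for a queue message (not ezq's actual class)."""

    kind: str = ""  # what sort of work this is
    data: Any = None  # picklable payload
    order: int = 0  # position used to reorder results later


def handle(msg: SketchMsg) -> SketchMsg:
    """Dispatch on `kind` and carry `order` through to the result."""
    if msg.kind == "SQUARE":
        result = msg.data ** 2
    elif msg.kind == "NEGATE":
        result = -msg.data
    else:
        raise ValueError(f"unknown kind: {msg.kind!r}")
    return SketchMsg(kind=msg.kind, data=result, order=msg.order)


replies = [
    handle(SketchMsg("SQUARE", 3, order=0)),
    handle(SketchMsg("NEGATE", 3, order=1)),
]
```

Copying order from the incoming message to the outgoing one is what lets a downstream consumer put results back in their original sequence.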

Beware pickle

All parameters passed to workers through ezq.run and any values put in queues using ezq.put_msg are first pickled by multiprocessing, so anything that cannot be pickled (e.g., a database connection) cannot be passed to workers.
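One way to catch this early is to try pickling a value before handing it to a worker. The sketch below uses only the standard library; can_pickle is a hypothetical helper, not an ezq function, and a lock stands in for any object that holds OS-level state.

```python
import pickle
import threading


def can_pickle(obj) -> bool:
    """Return True if `obj` survives pickle.dumps (hypothetical helper)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


ok = can_pickle({"path": "data.csv", "rows": [1, 2, 3]})  # plain data
bad = can_pickle(threading.Lock())  # holds OS-level state, cannot be pickled
```

A common workaround is to send the picklable settings (host, path, credentials) and create the unpicklable object inside the worker function itself.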

Iterate over messages

Inside the worker, use ezq.iter_msg to iterate over the messages in the queue until the queue ends (see below). If the messages need to be sorted first, wrap the call with ezq.sortiter.

If you need to read all the messages currently in the queue, you can use ezq.iter_q, which immediately ends the queue and returns the messages. You can also wrap this call in ezq.sortiter if you need the messages to be sorted first.
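To make the idea of sorting while iterating concrete, here is a minimal sketch of a reorder buffer. It is not ezq.sortiter's implementation; in_order is a hypothetical name, and the sketch assumes order values are unique, consecutive integers starting at a known value.

```python
import heapq
from typing import Iterable, Iterator, Tuple


def in_order(
    pairs: Iterable[Tuple[int, str]], start: int = 0
) -> Iterator[Tuple[int, str]]:
    """Yield (order, data) pairs by increasing order, buffering early arrivals.

    Assumes orders are unique consecutive integers beginning at `start`.
    """
    heap = []
    expected = start
    for pair in pairs:
        heapq.heappush(heap, pair)
        # release everything that is now contiguous with what was yielded
        while heap and heap[0][0] == expected:
            yield heapq.heappop(heap)
            expected += 1
    # flush anything left over (gaps in the numbering) in sorted order
    while heap:
        yield heapq.heappop(heap)


arrived = [(2, "c"), (0, "a"), (1, "b"), (4, "e"), (3, "d")]
ordered = [data for _, data in in_order(arrived)]
```

Buffering only the out-of-order items means results can be emitted as soon as the next expected one arrives, rather than waiting for the whole queue to drain.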

End the queue

After the main process has sent all the data to the workers, it needs to indicate that there's no additional work to be done. It does this by putting a special ezq.END_MSG in the queue, which is consumed by ezq.iter_msg and never seen by the worker's own code.

There are three ways a queue can be ended:

  • ezq.endq - Explicitly end a queue. You normally won't need to call this.

  • ezq.iter_q - End a queue and iterate over the current messages. This is useful when processing an output queue back in the main process.

  • ezq.endq_and_wait - End a queue and wait for the workers to finish. This is the most common way to end a queue. You'll need to call this before the end of your main process in order to get results back from all the workers.
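The end-of-queue idea can be sketched with a sentinel value. To stay self-contained, this example uses threads and queue.Queue as stand-ins for processes and multiprocessing queues. END, iter_until_end, and the one-sentinel-per-worker convention are assumptions made for illustration, not a description of ezq's internals.

```python
import queue
import threading

END = object()  # hypothetical sentinel playing the role of an end message


def iter_until_end(q):
    """Yield items from `q` until the sentinel arrives; the sentinel is not yielded."""
    while True:
        item = q.get()
        if item is END:
            return
        yield item


def worker(in_q, out_q):
    """Sum everything received, then report a single result."""
    out_q.put(sum(iter_until_end(in_q)))


in_q, out_q = queue.Queue(), queue.Queue()
threads = [threading.Thread(target=worker, args=(in_q, out_q)) for _ in range(3)]
for t in threads:
    t.start()

for i in range(100):
    in_q.put(i)  # send work

# one sentinel per worker, so every worker sees the end exactly once
for _ in threads:
    in_q.put(END)
for t in threads:
    t.join()

total = sum(out_q.get() for _ in threads)
```

Across real processes, an identity check such as `item is END` would not survive pickling, so a process-based version needs a value-based marker, such as the special END kind described earlier.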

Example: Read and Write Queues

In this example, several workers read from a queue, process the data, and write to a second queue. A single worker reads from that second queue and prints to the screen, sorting the results as it goes. When interfacing with a single I/O device (e.g., screen, file), we typically use a single worker to avoid clashes or overwriting.

import ezq


def printer(write_q):
    """Print results in increasing order."""
    for msg in ezq.sortiter(ezq.iter_msg(write_q)):
        print(msg.data)


def collatz(read_q, write_q):
    """Read numbers and compute values."""
    for msg in ezq.iter_msg(read_q):
        num = msg.data
        if msg.kind == "EVEN":
            ezq.put_msg(write_q, data=(num, num // 2), order=msg.order)
        elif msg.kind == "ODD":
            ezq.put_msg(write_q, data=(num, 3 * num + 1), order=msg.order)


def main():
    """Run several subprocesses."""
    read_q, write_q = ezq.Queue(), ezq.Queue()
    readers = [ezq.run(collatz, read_q, write_q) for _ in range(ezq.NUM_CPUS - 1)]
    writers = ezq.run(printer, write_q)

    for i in range(40):
        kind = "EVEN" if i % 2 == 0 else "ODD"
        ezq.put_msg(read_q, kind=kind, data=i, order=i)

    ezq.endq_and_wait(read_q, readers)
    ezq.endq_and_wait(write_q, writers)


if __name__ == "__main__":
    main()

License

MIT License

