Simple wrapper for Python multiprocessing.
ezq
Simple wrappers for Python multiprocessing.
Why?
Even though multiprocessing has Pool and Queue, it's surprisingly difficult to get started with slightly more complex workflows. ezq makes it easy to connect subprocesses (workers) using queues.
Install
pip install ezq
Example: Sum Messages
Here's a simple example of a worker that reads from an input queue, sums up the messages, and puts the result on an output queue.
import ezq


def worker(in_q, out_q):
    """Add up all the messages."""
    count = 0
    for msg in ezq.iter_msg(in_q):
        # you could check `msg.kind` if there's different kinds of work
        count += msg.data

    # when `in_q` is done, put the result on `out_q`
    ezq.put_msg(out_q, data=count)


def main():
    """Run several workers."""
    in_q = ezq.Queue()  # to send work
    out_q = ezq.Queue()  # to get results

    workers = [ezq.run(worker, in_q, out_q) for _ in range(ezq.NUM_CPUS)]
    # workers started

    for i in range(1000):
        ezq.put_msg(in_q, data=i)  # send work

    ezq.endq_and_wait(in_q, workers)
    # all workers are done

    result = sum(msg.data for msg in ezq.iter_q(out_q))
    assert result == sum(x for x in range(1000))
    print(result)


if __name__ == "__main__":
    main()
Typical worker lifecycle
- The main process creates workers with ezq.run.
- The main process sends data with ezq.put_msg.
- The worker iterates over the queue with ezq.iter_msg.
- The main process ends the queue with ezq.endq_and_wait.
- The worker returns when it reaches the end of the queue.
A worker is just a function
In general, there's nothing special about a worker function, but note:
- All arguments are passed through pickle first (see below).
- We don't currently do anything with the return value of this function. You'll need an output queue to return data back to the main process.
Create workers
In the main process, create workers using ezq.run, which takes a function and any additional parameters. Note that workers cannot create additional workers.
Send data
Once you've started the workers, you send them data by calling ezq.put_msg, which creates ezq.Msg objects and puts them in the queue. There are three attributes that are sent (all optional):
- kind - a string that indicates what kind of message it is. You can use this to send multiple kinds of work to the same worker. Note that the special END kind is used to indicate the end of a queue (that's what ezq.endq sends).
- data - anything that can be pickled. This is the data you want the worker to work on.
- order - an integer that indicates the message order. This can help you reorder results or ensure that messages from a queue are read in a particular order (that's what ezq.sortiter uses).
Beware pickle
All parameters sent to workers in ezq.run and any values put in queues using ezq.put_msg are first passed to pickle by multiprocessing, so anything that cannot be pickled (e.g., a database connection) cannot be passed to workers.
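As a rough way to check a value before sending it to a worker, the sketch below uses only the standard library. The helper name can_pickle is made up for illustration and is not part of ezq; a threading.Lock stands in for an unpicklable resource such as a database connection.

```python
import pickle
import threading


def can_pickle(obj):
    """Return True if `obj` survives a round trip through pickle.

    Hypothetical helper for illustration; not part of ezq.
    """
    try:
        pickle.loads(pickle.dumps(obj))
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


# Plain data (numbers, strings, lists, dicts) is fine to send to a worker.
print(can_pickle({"id": 1, "values": [1, 2, 3]}))  # True

# A lock stands in here for a resource like a database connection.
print(can_pickle(threading.Lock()))  # False
```

If a worker needs such a resource, one common approach is to pass the picklable settings (for example, a connection string) and create the resource inside the worker function.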
Iterate over messages
Inside the worker, use ezq.iter_msg to iterate over the messages in the queue until the queue ends (see below). If the messages need to be sorted first, wrap the call with ezq.sortiter.
If you need to read all the messages currently in the queue, you can use ezq.iter_q, which will immediately end the queue and return results. You can also wrap this call in ezq.sortiter if you need the messages to be sorted first.
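To give a feel for what reordering by the order attribute can look like, here is a minimal standalone sketch. It is not ezq.sortiter's implementation: FakeMsg and in_order are hypothetical names, and the sketch assumes order values are consecutive integers beginning at start.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator


@dataclass
class FakeMsg:
    """Hypothetical stand-in for a message; this is not ezq.Msg."""

    data: Any
    order: int


def in_order(msgs: Iterable[FakeMsg], start: int = 0) -> Iterator[FakeMsg]:
    """Yield messages by increasing `order`, buffering any that arrive early."""
    pending: Dict[int, FakeMsg] = {}
    expected = start
    for msg in msgs:
        pending[msg.order] = msg
        # Release every buffered message that is now next in line.
        while expected in pending:
            yield pending.pop(expected)
            expected += 1
    # If the numbering had gaps, flush the leftovers in sorted order.
    for key in sorted(pending):
        yield pending[key]


arrived = [FakeMsg("c", 2), FakeMsg("a", 0), FakeMsg("b", 1), FakeMsg("d", 3)]
print([m.data for m in in_order(arrived)])  # ['a', 'b', 'c', 'd']
```

Buffering like this lets a consumer emit results as soon as the next expected message shows up, rather than waiting for the whole queue to finish.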
End the queue
After the main process has sent all the data to the workers, it needs to indicate that there's no additional work to be done. This is done by putting a special ezq.END_MSG in the queue, which is processed by ezq.iter_msg and never seen by the workers.
There are three ways a queue can be ended:
- ezq.endq - Explicitly end a queue. You normally won't need to call this.
- ezq.iter_q - End a queue and iterate over the current messages. This is useful when processing an output queue back in the main process.
- ezq.endq_and_wait - End a queue and wait for the workers to finish. The most common way to end a queue. You'll need to call this before the end of your main process in order to get results back from all the workers.
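The end-of-queue idea can be illustrated with the standard library alone. The sketch below is not how ezq is implemented: it uses a thread and queue.Queue instead of subprocesses, and END, iter_until_end, and run_sum are hypothetical names chosen for this example.

```python
import queue
import threading

END = object()  # hypothetical sentinel; ezq sends its own end message


def iter_until_end(q):
    """Yield items from `q` until the sentinel arrives; the caller never sees it."""
    while True:
        item = q.get()
        if item is END:
            return
        yield item


def worker(in_q, out_q):
    """Sum everything on `in_q`, then report the total on `out_q`."""
    out_q.put(sum(iter_until_end(in_q)))


def run_sum(values):
    """Send `values` to one worker thread, end the queue, and wait for the result."""
    in_q, out_q = queue.Queue(), queue.Queue()
    thread = threading.Thread(target=worker, args=(in_q, out_q))
    thread.start()
    for value in values:
        in_q.put(value)
    in_q.put(END)  # signal that no more work is coming
    thread.join()  # wait for the worker to finish
    return out_q.get()


print(run_sum(range(5)))  # 10
```

Across real process boundaries the sentinel is pickled, so an identity check like `item is END` would no longer match; a value-based marker (such as a message kind) is needed there.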
Example: Read and Write Queues
In this example, several workers read from a queue, process data, and then write to a different queue that a single worker uses to print to the screen, sorting the results as it goes along. When interfacing with a single I/O device (e.g., screen, file), we typically use a single worker to avoid clashes or overwriting.
import ezq


def printer(write_q):
    """Print results in increasing order."""
    for msg in ezq.sortiter(ezq.iter_msg(write_q)):
        print(msg.data)


def collatz(read_q, write_q):
    """Read numbers and compute values."""
    for msg in ezq.iter_msg(read_q):
        num = msg.data
        if msg.kind == "EVEN":
            # integer division keeps the result an int
            ezq.put_msg(write_q, data=(num, num // 2), order=msg.order)
        elif msg.kind == "ODD":
            ezq.put_msg(write_q, data=(num, 3 * num + 1), order=msg.order)


def main():
    """Run several subprocesses."""
    read_q, write_q = ezq.Queue(), ezq.Queue()
    readers = [ezq.run(collatz, read_q, write_q) for _ in range(ezq.NUM_CPUS - 1)]
    writers = ezq.run(printer, write_q)

    for i in range(40):
        kind = "EVEN" if i % 2 == 0 else "ODD"
        ezq.put_msg(read_q, kind=kind, data=i, order=i)

    # end the input queue first so every reader finishes before the printer is told to stop
    ezq.endq_and_wait(read_q, readers)
    ezq.endq_and_wait(write_q, writers)


if __name__ == "__main__":
    main()
License