vimap

Variations on imap, not in C

The vimap package is designed to provide a more flexible alternative to multiprocessing.imap_unordered. (You should read the multiprocessing documentation if you haven’t already, or else this README won’t make sense!) It aspires to support HTTP-like clients processing data, though it contains nothing client-specific.

What in particular makes it more flexible?

  • It facilitates custom initialization of processes (one could set up an HTTP client, etc.) and cleanup.

  • It facilitates configuration passing to processes – often, with multiprocessing, one ends up [wastefully] tupling configuration values with every input.

  • It facilitates passing non-serializable objects to workers (they are bound before the processes are forked).

  • It allows timeouts and retrying of failed tasks or processes.
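
For comparison, here is a minimal sketch of the configuration-tupling pattern mentioned above, written against the standard multiprocessing library only (no vimap). The names scale_item and run_scaled are hypothetical, chosen for illustration.

```python
import multiprocessing


def scale_item(args):
    # Every task carries its own copy of the configuration value.
    factor, x = args
    return factor * x


def run_scaled(factor, inputs, processes=2):
    # The parent must pair the same `factor` with each input, so the
    # configuration is pickled and sent once per task.
    with multiprocessing.Pool(processes) as pool:
        tasks = ((factor, x) for x in inputs)
        return sorted(pool.imap_unordered(scale_item, tasks))


if __name__ == "__main__":
    print(run_scaled(10, [1, 2, 3]))
```

The cost is small for a single integer, but it grows with the size of the configuration and the number of inputs.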

What do we aspire to do better than the regular multiprocessing library?

  • You don’t have to resort to hacks to achieve custom process initialization [ cf. http://stackoverflow.com/a/10118250/81636 ]

  • The API helps prevent dumb mistakes, like initializing a pool before you’ve defined relevant functions. [ http://stackoverflow.com/q/2782961/81636 ]

  • Aims to have better worker exception handling – multiprocessing will leave around dead worker processes; we aim not to.

  • Collection of common use cases (reading from files, etc.)

Other features / design decisions:

  • Attempts to keep all workers busy by pre-emptively enqueuing tasks

  • It probably doesn’t affect you, but this library can also work around a bug in Python 2.6.7 [ http://stackoverflow.com/q/16684900/81636 ].

Defining worker functions

vimap provides custom initialization and related features via decorated functions. If your inputs are HTTP requests, and you want to get responses from any of a set of servers, you could express your program as follows (using the requests HTTP library, which is intuitive enough that you probably don’t need to read its documentation):

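import requests
import vimap.worker_process

@vimap.worker_process.worker
def send_requests_worker(request_iter, server):
    # One session per worker process, reused for every request it handles.
    # (The input iterator is not named `requests`, which would shadow the module.)
    s = requests.Session()
    for request in request_iter:
        # Each yielded response is sent back to the parent process.
        yield s.post('http://{0}{1}'.format(server, request.uri), data=request.data)
    s.close()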

What is happening? When each worker process starts up, it opens a new session. Each request (some pickleable object with .uri and .data attributes) sent by the parent process is posted to the server. The worker then yields the response, which is sent back to the parent process.
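
To make the control flow concrete, here is a plain-Python sketch with no vimap and no processes involved. The worker is a generator: setup runs before the loop, one output is yielded per input, and cleanup runs after the input iterator is exhausted. echo_worker and its log argument are hypothetical stand-ins for a real worker and its session.

```python
def echo_worker(inputs, server, log):
    # Setup: runs once, when the generator is first advanced.
    log.append("open")
    for uri in inputs:
        # Pulling the next item is where a real worker would wait on the parent.
        yield "http://{0}{1}".format(server, uri)
    # Cleanup: runs once the input iterator is exhausted.
    log.append("close")


log = []
outputs = list(echo_worker(iter(["/a", "/b"]), "example.invalid", log))
```

Nothing in the body runs until the first output is requested, which is why the setup line is executed by the consumer's first pull rather than at call time.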

imapping data from the parent process

Let’s continue the example,

import vimap.pool
pool = vimap.pool.fork(send_requests_worker.init_args(server=server) for server in my_servers)

This initializes a pool of workers, one per server, each with its own bound server argument. Once the worker processes start up, each one runs until it tries to pull an element from its input iterator; it then pauses until the parent process sends data. The parent process can send data like so:

from collections import namedtuple
import fileinput, ujson
Request = namedtuple("Request", ["uri", "data"])
pool.imap(Request(**ujson.loads(line)) for line in fileinput.input()).block_ignore_output()

This reads lines of JSON input (fileinput.input() reads the files named on the command line, or standard input if there are none) and sends the loaded entries to the workers. In the real world, you’d probably want to make the workers do the JSON loading. The .block_ignore_output() call causes the entire iterable (the input file) to be read and, [by default], closes the pool when it’s done.
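
As a rough sketch of moving the JSON parsing into the workers, the worker body could accept raw lines and decode them itself. The example below is plain Python so that it runs standalone: it uses the standard json module rather than ujson, and it omits the @vimap.worker_process.worker decorator, which you would add back in real use. parse_lines_worker is a hypothetical name.

```python
import json
from collections import namedtuple

Request = namedtuple("Request", ["uri", "data"])


def parse_lines_worker(lines):
    # The parent sends raw text lines; decoding happens on the worker side.
    for line in lines:
        yield Request(**json.loads(line))


parsed = list(parse_lines_worker(['{"uri": "/a", "data": "x"}']))
```

With this arrangement the parent would only need to hand the raw lines (for instance fileinput.input()) to pool.imap, leaving the CPU-bound parsing to the worker processes.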

Variations on imap

The input binder

The first variation on imap pairs each input with its output. So, you have some [lazy] iterable of inputs,

x1, x2, x3, x4, x5, ...

and when you vimap this with some function f, you get back tuples,

(x1, f(x1)), (x2, f(x2)), ...

possibly not in order. Because results are streamed, inputs don’t need to be kept around for long: the input binder holds on to only O(number of processes) inputs at a time, so it’s safe to iterate through large inputs. In code, this could look like:

results = {}
# `inp` rather than `input`, to avoid shadowing the builtin.
for inp, output in pool.imap(iterable).zip_in_out():
    results[inp] = output
# do some more processing
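
One way to picture the bounded bookkeeping is the sketch below, which uses threads from concurrent.futures. It is not vimap's implementation; zip_in_out_sketch and its window parameter are hypothetical. It never holds more than window inputs at once and yields (input, output) pairs as tasks finish, in no particular order.

```python
import concurrent.futures
import itertools


def zip_in_out_sketch(f, iterable, window=4):
    inputs = iter(iterable)
    with concurrent.futures.ThreadPoolExecutor(max_workers=window) as executor:
        # Maps each in-flight future to the input it was submitted with.
        pending = {}
        for x in itertools.islice(inputs, window):
            pending[executor.submit(f, x)] = x
        while pending:
            done, _ = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                # Drop the input as soon as its output is handed back.
                x = pending.pop(future)
                yield x, future.result()
                # Refill the window with the next input, if any remain.
                for nxt in itertools.islice(inputs, 1):
                    pending[executor.submit(f, nxt)] = nxt


pairs = dict(zip_in_out_sketch(lambda n: n * n, range(10), window=3))
```

Because the input iterable is only advanced when a slot frees up, a very large or lazy input is never materialized in full.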

Handling exceptions

If you want to gracefully handle exceptions that bubble up to the main function of your worker processes, you can request that vimap yield back to you any exceptions it receives from the workers.

for input, output, typ in pool.imap(iterable).zip_in_out_typ():
    if typ == 'exception':
        print('Worker had an exception:')
        print(output.formatted_traceback)
    elif typ == 'output':
        print('I got some actual output from a worker!')
        print(output)

output will be an ExceptionContext namedtuple when typ is 'exception'; it contains value, the exception that was raised, and formatted_traceback, a string holding the traceback that would have been printed to stderr.
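
A minimal sketch of how such a record could be produced, assuming only the two field names given above; the real class lives inside vimap and may differ. capture_call is a hypothetical helper.

```python
import traceback
from collections import namedtuple

ExceptionContext = namedtuple("ExceptionContext", ["value", "formatted_traceback"])


def capture_call(f, x):
    # Returns (output, typ) in the spirit of zip_in_out_typ.
    try:
        return f(x), "output"
    except Exception as e:
        # format_exc() renders the traceback text for the active exception.
        return ExceptionContext(e, traceback.format_exc()), "exception"


ok, ok_typ = capture_call(lambda n: 10 // n, 5)
err, err_typ = capture_call(lambda n: 10 // n, 0)
```

Formatting the traceback where the exception is caught matters because traceback objects cannot be pickled, so only the string form can cross the process boundary.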
