A Python-based, distributed MapReduce solution.
This project is still under active development, though largely finished. It is currently being tested in a production environment. The documentation is being incrementally completed.
Overview
JobX is a MapReduce solution written entirely in Python, including its queue and KV clients. The distributed queue (NSQ) and distributed KV store (etcd) that it relies on are written in Go.
Many of the configuration options have reasonable defaults, so that it is as simple as possible to experiment with. All you need is a local instance of NSQ and etcd, both of which are almost trivial to set up.
Installing JobX
Dependencies
nginx
Python 2.7 (gevent is not Python 3 compatible)
go:
$ sudo apt-get install golang
etcd:
$ git clone git@github.com:coreos/etcd.git
$ cd etcd
$ ./build
$ sudo mkdir /var/lib/etcd
$ sudo bin/etcd -addr=127.0.0.1:4001 -data-dir=/var/lib/etcd -name=etcd1
nsq:
$ sudo apt-get install gpm
$ mkdir ~/.go
$ GOPATH=~/.go go get github.com/bitly/nsq/...
$ sudo mkdir /var/lib/nsq
$ cd /var/lib/nsq
$ sudo ~/.go/bin/nsqd
Install Nginx.
Configuration
Configure Nginx:
upstream mapreduce {
    server unix:/tmp/mr.gunicorn.sock fail_timeout=0;
}
server {
    listen 80;
    server_name job1.domain;
    keepalive_timeout 5;
    access_log /tmp/nginx-mr-access.log;
    error_log /tmp/nginx-mr-error.log;
    location /s {
        root /usr/local/lib/python2.7/dist-packages/mr/resources/static;
        try_files $uri $uri/ =404;
    }
    location / {
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_redirect off;
        proxy_pass http://mapreduce;
    }
}
Set the following three environment variables (either in your shell profile or in /etc/environment):
export MR_ETCD_HOST=127.0.0.1
export MR_NSQD_HOSTS=127.0.0.1
export MR_WORKFLOW_NAMES=test_workflow
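A tool or handler that needs these settings could read them roughly as follows. This is a minimal sketch, not JobX's own configuration loader: the function name `load_config`, the fallback values, and the assumption that `MR_NSQD_HOSTS` and `MR_WORKFLOW_NAMES` accept comma-separated lists are all assumptions.

```python
import os


def _split_list(value):
    # Assumption: multi-valued settings are comma-separated.
    return [item.strip() for item in value.split(",") if item.strip()]


def load_config(environ=None):
    """Read the JobX-related environment variables (hypothetical helper)."""
    if environ is None:
        environ = os.environ
    return {
        "etcd_host": environ.get("MR_ETCD_HOST", "127.0.0.1"),
        "nsqd_hosts": _split_list(environ.get("MR_NSQD_HOSTS", "127.0.0.1")),
        # Falls back to the default workflow name mentioned in this document.
        "workflow_names": _split_list(
            environ.get("MR_WORKFLOW_NAMES", "test_workflow")),
    }
```

Passing a plain dict instead of `os.environ` keeps the helper easy to test.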
Create workflow:
mr_kv_workflow_create test_workflow "Jobs that assist build and deployment."
Even though *test_workflow* is already the default workflow name in the system, we specify it explicitly in the variables above so that it's clear how to do so when you use your own workflow.
Load handlers:
..write and load handlers
Load steps:
..create step(s)
Load jobs:
..create job
Start:
mr_start_gunicorn_dev
Handler Management
To avoid having to maintain current copies of the handler source code on every job worker, we store the source code in the KV. When a handler is loaded, it is syntax-checked, its metadata header is parsed, the code is compiled, and the compiled object is committed to the “library”. A sync script pushes updated handler code, ignores unchanged handlers, and removes handlers for which no file is found and to which no steps refer.
The job workers check the KV for updates approximately every ten seconds and merge them.
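The decisions the sync script makes can be sketched as follows. This illustrates the rules described above and is not the script itself: the names `plan_sync` and `source_digest`, the use of SHA-1 digests to detect changes, and the shape of the inputs are all assumptions.

```python
import hashlib


def source_digest(source):
    """Fingerprint handler source so unchanged handlers can be skipped."""
    return hashlib.sha1(source.encode("utf-8")).hexdigest()


def plan_sync(local_sources, stored_digests, referenced_names):
    """Decide which handlers to push, leave alone, or remove.

    local_sources:    {handler_name: source_text} found on disk
    stored_digests:   {handler_name: digest} currently in the KV
    referenced_names: handler names that some step still refers to
    """
    plan = {"push": [], "unchanged": [], "remove": []}
    for name in sorted(local_sources):
        source = local_sources[name]
        # Syntax-check before anything is committed; raises SyntaxError.
        compile(source, name, "exec")
        if stored_digests.get(name) == source_digest(source):
            plan["unchanged"].append(name)
        else:
            plan["push"].append(name)
    for name in sorted(stored_digests):
        # Remove only handlers with no file AND no referring step.
        if name not in local_sources and name not in referenced_names:
            plan["remove"].append(name)
    return plan
```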
Handler Examples
Sessions
In terms of flow, mappers receive the data (key-value pairs) first. If this data represents actual arguments, your logic might decide dynamically what comes next. Your mapper may branch to downstream mappers to collect the data you need for your primary task, and your reducer may then act on it. However, the reducer may need access to some of the same data that the mapper had. Whereas the mapper receives data that it is free to slice and reorganize, the reducer receives only the collection of results from the mappers that yielded data. Unless the mappers forward the original data all the way down to the eventual result (potentially through intermediate mappers that have no use for it), the reducer will not have the original information it may need to complete its task.
This is what sessions are for. Every mapper invocation is given a private, durable namespace in which to stash data that only the corresponding reducer will have access to. This data will be destroyed at the completion of the request like all of the other request-specific entities.
There are tools available to debug sessions, if needed.
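The access pattern can be sketched with an in-memory stand-in. This is not the JobX session API: real sessions are durable and stored in the KV, and the `SessionStore` class, the handler signatures, and the assumption that a mapper and its reducer share an invocation ID are all hypothetical.

```python
class SessionStore(object):
    """In-memory stand-in for per-invocation session namespaces (hypothetical)."""

    def __init__(self):
        # {request_id: {invocation_id: {key: value}}}
        self._data = {}

    def namespace(self, request_id, invocation_id):
        # Each mapper invocation gets its own private dict.
        return self._data.setdefault(request_id, {}).setdefault(invocation_id, {})

    def destroy_request(self, request_id):
        # Sessions are discarded along with the rest of the request.
        self._data.pop(request_id, None)


store = SessionStore()


def mapper(request_id, invocation_id, args):
    # Stash the original arguments for the reducer before fanning out.
    store.namespace(request_id, invocation_id)["original_args"] = args
    for item in args["items"]:
        yield item, 1


def reducer(request_id, invocation_id, results):
    # The reducer sees only the mapped results, plus whatever was stashed.
    args = store.namespace(request_id, invocation_id)["original_args"]
    yield args["label"], sum(count for _, count in results)
```

Here the reducer recovers `label`, which never travels through the mapped results.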
Scope Factories
Scope factories are a mechanism that allows you to inject variables into the global scope of each handler. A different scope factory can be defined for each workflow. Although you can inject the same variables into the scope of every handler in the same workflow, the scope factory also receives the name of the handler. This allows you to provide sensitive information to some, but not all, handlers.
You must define environment variables with the required parameters to enable this functionality.
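The idea of injecting per-handler globals can be sketched with Python's built-in `compile` and `exec`. The factory signature, the handler entry-point name `handler`, and the `DEPLOY_TOKEN` variable are hypothetical; JobX's actual factory interface is configured through the environment variables mentioned above and is not shown here.

```python
def scope_factory(workflow_name, handler_name):
    """Build the globals injected into one handler (hypothetical signature)."""
    scope = {"WORKFLOW_NAME": workflow_name}
    # Only selected handlers receive sensitive values.
    if handler_name in ("deploy_reducer",):
        scope["DEPLOY_TOKEN"] = "example-token"  # placeholder, not a real secret
    return scope


def load_handler(workflow_name, handler_name, source, entry_point="handler"):
    """Compile handler source and execute it inside the injected scope."""
    scope = scope_factory(workflow_name, handler_name)
    code = compile(source, handler_name, "exec")
    exec(code, scope)  # the scope dict becomes the handler's globals
    return scope[entry_point]
```

Because the function is defined inside `scope`, any name the factory injects resolves as a global when the handler runs.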
Capabilities
Capabilities are classifications that you may define to control how jobs are assigned to workers. Every worker declares a list of the capability classifications it offers, and every handler declares a “required capability” classification. You can use this, for example, to route operations whose handlers invoke licensed functionality only to those workers that have been adequately equipped.
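The matching rule amounts to a simple filter. In this sketch, the function name, the data shapes, and the treatment of a handler with no requirement (eligible everywhere) are assumptions.

```python
def eligible_workers(workers, required_capability):
    """Return the workers allowed to run a handler (hypothetical routing rule).

    workers: {worker_name: set_of_offered_capabilities}
    required_capability: the handler's declared classification, or None
    """
    if required_capability is None:
        # Assumption: a handler with no requirement may run anywhere.
        return sorted(workers)
    return sorted(name for name, offered in workers.items()
                  if required_capability in offered)
```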
Language Processors
Handlers can be defined in any language, as long as there’s a processor defined for it that can dispatch the code to be executed, and can yield the data that is returned (all handlers are generators).
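A processor registry keyed by language might look like this sketch. `PythonProcessor`, `PROCESSORS`, `dispatch`, and the `run` method signature are hypothetical; the only property taken from the description above is the contract that a processor dispatches the code and yields what the handler produces.

```python
class PythonProcessor(object):
    """Runs Python handler source; the handler must be a generator function."""

    def run(self, source, entry_point, key, value):
        scope = {}
        exec(compile(source, "<handler>", "exec"), scope)
        # Re-yield whatever the handler generator produces.
        for pair in scope[entry_point](key, value):
            yield pair


# Hypothetical registry: language name -> processor instance.
PROCESSORS = {"python": PythonProcessor()}


def dispatch(language, source, key, value, entry_point="handler"):
    try:
        processor = PROCESSORS[language]
    except KeyError:
        raise ValueError("no processor registered for %r" % language)
    return processor.run(source, entry_point, key, value)
```

A processor for another language would implement the same `run` method, for example by handing the source to an external interpreter and parsing its output into key-value pairs.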
Result Writers
A result-writer manages how results are transmitted, and will influence what you receive in the HTTP response.
Currently, there are two:
inline: Return the data within the response. This is the default.
file: Store to a local directory.
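The difference between the two writers can be sketched as two functions that shape the response differently. The function names, the JSON file format, and the response fields are assumptions, not JobX's result-writer interface.

```python
import json
import os


def write_inline(request_id, results):
    """Return the results so they can be embedded in the HTTP response."""
    return {"request_id": request_id, "results": results}


def write_file(request_id, results, directory):
    """Store the results in a local directory; respond with the path only."""
    path = os.path.join(directory, request_id + ".json")
    with open(path, "w") as f:
        json.dump(results, f)
    return {"request_id": request_id, "path": path}
```

With the file writer, the HTTP response stays small regardless of how large the result set is.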
Distributed Queue Semantics
The circulatory system of JobX is bit.ly’s NSQ platform, a high-volume, easy-to-deploy distributed queue.
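JobX's own queue client is not shown here. As an illustration of publishing to NSQ, the sketch below builds a POST to nsqd's HTTP `/pub` endpoint (4151 is nsqd's default HTTP port). The topic name and message body used in any call are hypothetical, and the topic is assumed to be a plain name that needs no URL escaping.

```python
import json

try:
    from urllib.request import Request, urlopen  # Python 3
except ImportError:
    from urllib2 import Request, urlopen  # Python 2.7


def build_publish_request(nsqd_host, topic, message, port=4151):
    """Build a POST to nsqd's HTTP /pub endpoint with a JSON-encoded body."""
    url = "http://%s:%d/pub?topic=%s" % (nsqd_host, port, topic)
    body = json.dumps(message).encode("utf-8")
    return Request(url, data=body)  # a non-None body makes this a POST


def publish(nsqd_host, topic, message):
    # Sends the request; requires a running nsqd.
    return urlopen(build_publish_request(nsqd_host, topic, message)).read()
```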
Distributed KV Semantics
All persistence is done in etcd, a distributed, immediately consistent KV store. etcd is a component of CoreOS and is also very easy to deploy. etcd stores key-value pairs, but three things make it unique:
It’s immediately consistent across all instances.
Key-value pairs can be stored hierarchically.
You can long-poll on nodes while waiting for them to change, rather than repeatedly polling them.
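Long-polling in etcd's v2 HTTP keys API is requested with `wait=true`, optionally with `recursive=true` to watch a subtree and `waitIndex` to resume after the last change seen. The sketch below only builds the URL; it assumes the v2 API served on port 4001, as in the startup command above, and the helper name is hypothetical.

```python
try:
    from urllib.parse import urlencode  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2.7


def watch_url(host, key, wait_index=None, recursive=False, port=4001):
    """Build an etcd v2 long-poll URL that blocks until `key` changes."""
    params = [("wait", "true")]
    if recursive:
        # Watch the whole subtree, e.g. every entity of one type.
        params.append(("recursive", "true"))
    if wait_index is not None:
        # Resume after the last index seen so no change is missed between polls.
        params.append(("waitIndex", str(wait_index)))
    return "http://%s:%d/v2/keys/%s?%s" % (
        host, port, key.lstrip("/"), urlencode(params))
```

A GET on such a URL does not return until a matching change occurs, which is what replaces repeated polling.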
All data is manipulated as entities, which are modeled hierarchically onto the KV by code written specifically for this project. The models resemble traditional RDBMS models such as those in Django and SQLAlchemy (within reason, while staying pragmatic and efficient).
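A minimal sketch of the hierarchical layout, assuming the `entities/<type>/<workflow>/<id>` pattern generalizes from the single `etcdctl` example later in this document and that entities are stored as JSON. The helper names are hypothetical.

```python
import json


def entity_key(entity_type, workflow_name, entity_id):
    """Hierarchical KV key (layout assumed from the etcdctl example)."""
    return "entities/%s/%s/%s" % (entity_type, workflow_name, entity_id)


def serialize(entity):
    # Sorted keys keep the stored JSON stable between writes.
    return json.dumps(entity, sort_keys=True)


def deserialize(value):
    return json.loads(value)
```

Grouping by type and then by workflow means all entities of one kind in one workflow share a prefix, so they can be listed or watched as a single subtree.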
Entity Types
workflow: This is the container of all of the other entities. You may have concurrent workflows operating on the same cluster that have their own jobs, steps, and handlers defined. They are completely isolated at the queueing and storage levels.
job: This defines the noun that you post requests to, and the initial step.
step: This binds a mapper to a combiner (optional), and a combiner to a reducer.
handler: This defines a single body of code for a mapper, combiner, or reducer.
request: This identifies one received request, and the invocation of the first step.
invocation: This is the basic unit of operation. Every time a mapper or reducer is queued, it is given its own invocation record.
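The relationships between jobs, steps, and handlers can be sketched with plain named tuples. The field sets are hypothetical (the real models carry more metadata); the sketch only shows that a step runs its mapper, then its optional combiner, then its reducer.

```python
from collections import namedtuple

# Hypothetical field sets; the real models carry more metadata.
Handler = namedtuple("Handler", ["name", "kind"])  # kind: mapper/combiner/reducer
Step = namedtuple("Step", ["name", "mapper", "combiner", "reducer"])
Job = namedtuple("Job", ["name", "workflow_name", "initial_step"])


def stage_order(step):
    """Names of the handlers a step runs, in order; the combiner is optional."""
    stages = [step.mapper]
    if step.combiner is not None:
        stages.append(step.combiner)
    stages.append(step.reducer)
    return [handler.name for handler in stages]
```

A `Job` then needs only its name, its workflow, and the `Step` that a request starts with.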
Directly Reading KV Entities
To read the “request” entity with a given ID under the “build” workflow:
$ etcdctl get entities/request/build/c1ef1a0d645e9a01fae9de1b7eca412fb14372c3 | python -m json.tool
{
    "context": {
        "requester_ip": "127.0.0.1"
    },
    "done": true,
    "failed_invocation_id": null,
    "invocation_id": "3c7494eb9f521d39e8609733a6d3988100540abb",
    "job_name": "obfuscate_for_clients",
    "workflow_name": "build"
}
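The same entity can be read without `etcdctl` through etcd's v2 HTTP keys API, which wraps the stored string in `node.value` (here a JSON document, as the output above shows). This sketch assumes the v2 API on port 4001 and the key layout from the example; the helper names are hypothetical.

```python
import json

try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen  # Python 2.7


def parse_entity_response(body):
    """Extract the stored entity from an etcd v2 GET response body."""
    response = json.loads(body)
    # The entity itself is a JSON string stored as the node's value.
    return json.loads(response["node"]["value"])


def fetch_entity(host, entity_type, workflow_name, entity_id, port=4001):
    url = "http://%s:%d/v2/keys/entities/%s/%s/%s" % (
        host, port, entity_type, workflow_name, entity_id)
    return parse_entity_response(urlopen(url).read().decode("utf-8"))
```

For example, `fetch_entity("127.0.0.1", "request", "build", "c1ef1a0d645e9a01fae9de1b7eca412fb14372c3")` would return the dictionary shown above, given a running etcd holding that key.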
Tracing
When one of your handlers starts chronically raising an exception, you will need to be able to investigate it. The following tools are available, and they share a common tracing facility.
Note that to use any of them, you need to disable request cleanup. Otherwise, every request and its related records are destroyed immediately after a result is produced.
Generating a Physical Graph Using Graphviz
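The tool's own interface is not documented here. As an illustration of the underlying idea only, the following sketch renders invocation records as a Graphviz DOT graph; the record fields (`invocation_id`, `parent_id`, `handler`, `failed`) are assumptions loosely modeled on the request entity shown earlier.

```python
def invocations_to_dot(invocations):
    """Render invocation records as Graphviz DOT (hypothetical record shape).

    invocations: list of dicts with "invocation_id", "parent_id",
    "handler", and "failed" keys.
    """
    lines = ["digraph invocations {"]
    for inv in invocations:
        node_id = inv["invocation_id"]
        # "\\n" emits a literal \n, which Graphviz renders as a line break.
        label = "%s\\n%s" % (inv["handler"], node_id[:8])
        color = "red" if inv.get("failed") else "black"
        lines.append('  "%s" [label="%s", color=%s];' % (node_id, label, color))
        if inv.get("parent_id"):
            lines.append('  "%s" -> "%s";' % (inv["parent_id"], node_id))
    lines.append("}")
    return "\n".join(lines)
```

Writing the result to `invocations.dot` and running `dot -Tpng invocations.dot -o invocations.png` produces an image in which failed invocations are outlined in red.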
Generating an Invocation Tree from the Command-Line
Dumping the Invocation Tree and Data from the Command-Line
Advanced
KV Queue Collections
KV Tree Collections
Built Distribution
Hashes for mapreduce-0.2.7-py2-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | afe997097c260bcd7b7780688a5ce61594919eafbda7d73c6515efc64d8a79f2
MD5 | 86d94f84624fa883ec3cc11f2fc42950
BLAKE2b-256 | d720e858202b468b5fc03c796fea0cfe027273fa6123798d2d21ed99ef59adae