It is required that there is a PlasmaFS filesystem up and running on the cluster. There is no way to use Map/Reduce without PlasmaFS.
There need to be three directories for input data, work files, and output data. We assume here that they are named /input, /work, and /output, but of course they can have any names and can be stored in any subdirectory.
You can use the plasma utility to create these directories (also see Cmd_plasma for documentation; we assume here that the plasma utility is configured to automatically find the cluster):
plasma mkdir /input
plasma mkdir /work
plasma mkdir /output
The /input directory must be filled with the input files (text files where every line is not longer than the blocksize of the filesystem). The input files can have any names - the Map/Reduce framework simply processes all files it finds in this directory.
You can use plasma to upload the input files, e.g.
plasma put input0 /input/input0
when there is a local file input0. For the sample programs the file must have a key and a value on each line, separated by a single TAB character.
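For illustration, a small input0 could contain lines like these (the keys and values are made up; the separator between the two columns is a single TAB character):

alice	3
bob	17
alice	5
carol	2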
We also assume here that you have built and installed the Plasma distribution, so that a command like
ocamlfind ocamlopt -package mr_framework ...
can be used to compile a Plasma Map/Reduce program.
The distribution builds a sample Map/Reduce program mr_test. This test program does not do anything useful, but it is a good "hello world" program. The functions for map and reduce are the identities, so that the overall effect of the program is to group the input data by key.
The sample program is:
let job : Mapred_def.mapred_job =
  let partitions = 16 in
  object
    method name = "test1"
    method input_dir = "/input"
    method output_dir = "/output"
    method work_dir = "/work"
    method map me id r w = ...
    method reduce me p r w = ...
    ...
  end

let () =
  Mapred_main.main job
What you can see is that an M/R job is specified by an object of type Mapred_def.mapred_job. This object returns a number of constants (e.g. the directory names), but also the functions for map and reduce. Before looking at this in detail, let me describe how to compile and start this job.
The program calls Mapred_main.main with the job object. This is the main program which parses command-line arguments and runs the job as the user wants it. You compile this with
ocamlfind ocamlopt -package mr_framework -linkpkg -o mr_test mr_test.ml
Before you can start mr_test you need a configuration file mr_test.conf (generally, the framework appends ".conf" to the name of the executable when looking for the config file). This file can look like the following (you need to adapt it to your setup):
netplex {
  namenodes {
    clustername = "test";
    node { addr = "office3:2730" };
    node { addr = "office4:2730" }
  };
  mapred {
    node { addr = "office3" };
    node { addr = "office4" };
    port = 8989;
    tmpdir = "/tmp/mapred";
    load_limit = 8.1;
    shm_low = 1000000000;
    shm_high = 2000000000;
  }
}
clustername is the name of the PlasmaFS cluster. The node/addr parameters in namenodes specify the namenodes of the PlasmaFS cluster. You can enumerate them as shown, or put the host names into a separate file and point to this file with a single node_list parameter:
namenodes {
  clustername = "test";
  node_list = "namenode.hosts";
  port = 2730
}
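Since the port is given separately by the port parameter, the node_list file presumably contains just the host names, one per line (this format is an assumption). A hypothetical namenode.hosts matching the example above would then look like:

office3
office4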
The node/addr parameters in mapred specify on which nodes the tasks are going to be executed. These can be any machines, but it is advantageous to use the datanode machines. Also, you can use node_list:
mapred {
  node_list = "tasknode.hosts";
  port = 8989;
  ...
}
port in mapred can be freely chosen.

tmpdir is the directory where the program will put executables, log files, and other runtime files. Not much space is needed there.

shm_low and shm_high control the consumption of shared memory. If the (estimated) amount of shared memory exceeds shm_high, it is assumed that memory is tight, and measures are taken to reduce RAM consumption. If the use of shared memory drops below shm_low again, it is assumed that the memory pressure is gone, and consumption is turned back to normal. Both parameters are given in bytes.

mr_test is a multi-purpose program: it acts both as task server on the machines and as a central job control instance. Before the job can be started, the task servers need to be deployed and started:
./mr_test start_task_servers
This uses ssh to install the mr_test executable on all task nodes. The directory for this is tmpdir. There is also stop_task_servers to stop the servers.
The job is started with
./mr_test exec_job
Progress messages are emitted on stdout:
Checking...
Planning...
Starting...
[Thu Jun 10 19:43:07 2010] [info] Starting job
[Thu Jun 10 19:43:07 2010] [info] Stats: runnable=16 running=0 finished=0 total=16 complete=false
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 0 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 1 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 2 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 3 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 4 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 5 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 6 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 7 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 9 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 10 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 11 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 12 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 13 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 14 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 15 to 192.168.5.40
Job can be interrupted with CTRL-C!
[Thu Jun 10 19:47:35 2010] [info] Finished task Map 14
[Thu Jun 10 19:47:35 2010] [info] Stats: runnable=10 running=14 finished=1 total=26 complete=false
[Thu Jun 10 19:47:35 2010] [info] Submitted task Sort 8 to 192.168.5.40
...
The job can be killed with CTRL-C or kill -2 <pid>. This also kills the running tasks and performs file cleanup.
The statistics line is to be interpreted as follows: the runnable tasks are the tasks whose input files exist, and which could be started immediately if there were more load capacity on the nodes. The running tasks are the tasks currently executing on the nodes. The finished number is the count of all finished tasks. total is the total number of tasks, including non-runnable tasks. In the last statistics line above, for example, 10 + 14 + 1 = 25 of the 26 tasks are runnable, running, or finished, so one task is not yet runnable. You will notice that total grows in the course of the execution. This has to do with the incremental planning algorithm: at job start it is not yet known how many tasks have to be run in total. This number depends on how many output files are written by the map tasks. Once you see complete=true, all tasks are defined, and total will no longer grow.
The full job object looks like
let job : Mapred_def.mapred_job =
  let partitions = 16 in
  object
    method name = "test1"
    method input_dir = "/input"
    method output_dir = "/output"
    method work_dir = "/work"
    method check_config _ = ()

    (* Identity map: copy every input record unchanged to the map output *)
    method map me id r w =
      try
        while true do
          let line = r # input_record() in
          w # output_record line
        done
      with End_of_file ->
        w # flush()

    method map_tasks = 16
    method sort_limit = 134217728L (* 128 M *)
    method merge_limit = 8
    method split_limit = 4
    method extract_key me line = Mapred_split.tab_split_key line
    method partitions = partitions
    method partition_of_key me key = (Hashtbl.hash key) mod partitions

    (* Identity reduce: copy every record of the partition unchanged *)
    method reduce me p r w =
      try
        while true do
          let line = r # input_record() in
          w # output_record line
        done
      with End_of_file ->
        w # flush()
  end
Let's first have a look at map and reduce. map is a function

  Mapred_def.mapred_env -> int -> Mapred_io.record_reader -> Mapred_io.record_writer -> unit

The record_reader argument is an object allowing access to the input file. The record_writer argument is the object for writing to the output file. map now typically reads the lines of the input (using input_record) and writes lines to the output (via output_record).
map is completely on its own for interpreting the inputs. It is not required that there is a key and a value - the input can be arbitrary. The output, however, should provide a key and a value. This should be done in such a way that the extract_key function can extract the key from the written lines. We use here Mapred_split.tab_split_key to get everything up to the first TAB as the key.
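As an illustration, here is a hedged sketch of a non-identity map (not part of the distribution): it swaps key and value of every TAB-separated input line, using only the record_reader/record_writer methods shown above and Mapred_split.tab_split_key. The value-extraction arithmetic is an assumption made for this example; it presumes the input already follows the key/value convention. The method would replace the map method in the job object above.

    (* Hypothetical map: swap key and value of each TAB-separated line. *)
    method map me id r w =
      try
        while true do
          let line = r # input_record() in
          let key = Mapred_split.tab_split_key line in  (* text before the first TAB *)
          let value =
            let k = String.length key in
            if k < String.length line then
              String.sub line (k+1) (String.length line - k - 1)
            else
              "" in
          w # output_record (value ^ "\t" ^ key)
        done
      with End_of_file ->
        w # flush()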
The Mapred_def.mapred_env object allows access to other PlasmaFS files (using the open_cluster method), to the config file, and to (optional) command-line arguments provided at exec_job time.
reduce has exactly the same signature as map. The Mapred_io.record_reader object, however, is now connected with the intermediate file containing all records for the partition whose number is passed as the int argument. These records comply with the key/value format.
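For illustration, here is a hedged sketch of a non-identity reduce (not part of the distribution): it emits one "key TAB count" record per key, assuming the records of the partition arrive sorted by key, which is what the sort and shuffle phases arrange. The counting logic is an assumption made for this example.

    (* Hypothetical reduce: count how many records share each key.
       Assumes the intermediate file is sorted by key. *)
    method reduce me p r w =
      let emit key n = w # output_record (key ^ "\t" ^ string_of_int n) in
      let current = ref None in   (* key currently being counted *)
      let count = ref 0 in
      try
        while true do
          let line = r # input_record() in
          let key = Mapred_split.tab_split_key line in
          ( match !current with
              | Some k when k = key ->
                  incr count
              | Some k ->
                  emit k !count;
                  current := Some key;
                  count := 1
              | None ->
                  current := Some key;
                  count := 1
          )
        done
      with End_of_file ->
        ( match !current with
            | Some k -> emit k !count
            | None -> ()
        );
        w # flush()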
There are a few interesting parameters:
- map_tasks determines how many map tasks are created. For maximum performance, this should be a small multiple of the total number of cores on all task machines. There is no need to make this number larger. (N.B. It is likely that this parameter will be removed from the job definition object.)
- sort_limit is how large the chunks are that are sorted in RAM. This does not directly correspond to the consumed RAM, but gives roughly the order of magnitude. If the average record length is low, the extra RAM consumption is highest and can reach about 6 times sort_limit. For larger records (several K) it is much smaller.
- merge_limit says how many files are merged at most in a shuffle task. The higher the value, the less often the data is copied during shuffling; however, a lot more RAM is consumed. One should calculate at least 64 M of shared memory for every open file.
- split_limit says how many files are produced at most in a shuffle task. The same comments apply as for merge_limit.
- partitions determines how many partitions are created in the output.
- partition_of_key is the function that determines into which partition (0 to partitions-1) a key is put. The given definition (Hashtbl.hash key) mod partitions works well if there are no special requirements; see the sketch after this list for an alternative.
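As a hedged illustration (not part of the distribution), an alternative partition_of_key could partition by the first byte of the key, so that the output partitions are roughly ordered alphabetically. Note that skewed key distributions then lead to unevenly sized partitions. The method would go inside the job object, where partitions is in scope:

    (* Hypothetical alternative: range-like partitioning by the first byte
       of the key. Empty keys go to partition 0. *)
    method partition_of_key me key =
      if key = "" then 0
      else (Char.code key.[0]) mod partitions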