Plasma MapReduce: How to run a job?

Preliminaries

A PlasmaFS filesystem must be up and running on the cluster; there is no way to use Map/Reduce without PlasmaFS.

There need to be three directories: one for input data, one for work files, and one for output data. We assume here that they are named /input, /work, and /output, but of course they can have any name and can be stored in any subdirectory.

You can use the plasma utility to create these directories (see also Cmd_plasma for documentation; we assume here that the plasma utility is configured to automatically find the cluster):

plasma mkdir /input
plasma mkdir /work
plasma mkdir /output

The /input directory must be filled with the input files (text files in which no line is longer than the blocksize of the filesystem). The input files can have any names - the Map/Reduce framework simply processes all files it finds in this directory.

You can use plasma to upload the input files, e.g.

plasma put input0 /input/input0

assuming there is a local file input0. For the sample programs each line of the file must consist of a key and a value, separated by a single TAB character.
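
For illustration, a suitable input file could contain lines such as the following (hypothetical data; <TAB> stands for a single TAB character):

alice<TAB>3
bob<TAB>5
carol<TAB>2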

We also assume here that you have built and installed the Plasma distribution, so that a command like

ocamlfind ocamlopt -package mr_framework ...

can be used to compile a Plasma Map/Reduce program.

The sample M/R program

The distribution builds a sample Map/Reduce program mr_test. This test program does not do anything useful, but it is a good "hello world" program. The functions for map and reduce are the identity functions, so the overall effect of the program is to group the input data by key.

The sample program is:

let job : Mapred_def.mapred_job =
  let partitions = 16 in
object
  method name = "test1"
  method input_dir = "/input"
  method output_dir = "/output"
  method work_dir = "/work"
  method map me id r w = ...
  method reduce me p r w = ...
  ...
end

let () =
  Mapred_main.main job

As you can see, an M/R job is specified by an object of type Mapred_def.mapred_job. This object returns a number of constants (e.g. the directory names), but also the functions for map and reduce. Before looking at this in detail, let me describe how to compile and start this job.

The program calls Mapred_main.main with the job object. This is the main program: it parses the command-line arguments and runs the job as requested by the user. You compile this with

ocamlfind ocamlopt -package mr_framework -linkpkg -o mr_test mr_test.ml

Before you can start mr_test you need a configuration file mr_test.conf (in general, the framework appends ".conf" to the name of the executable when looking for the config file).

This file can look like the following (you need to adapt it to your setup):

netplex {
  namenodes {
    clustername = "test";
    node { addr = "office3:2730" };
    node { addr = "office4:2730" }
  };
  mapred {
    node { addr = "office3" };
    node { addr = "office4" };
    port = 8989;
    tmpdir = "/tmp/mapred";
    load_limit = 8.1;
    shm_low = 1000000000;
    shm_high = 2000000000;
  }
}

  • The parameter clustername is the name of the PlasmaFS cluster
  • The node/addr parameters in namenodes specify the namenodes of the PlasmaFS cluster. You can enumerate them as shown, or put the host names into a separate file, and point to this with a single node_list parameter:
        namenodes {
          clustername = "test";
          node_list = "namenode.hosts";
          port = 2730
        }
      
  • The node/addr parameters in mapred specify on which nodes the tasks are going to be executed. These can be any machines, but it is advantageous to use the datanode machines. Here, too, you can use node_list (an example hosts file is sketched after this list):
        mapred {
          node_list = "tasknode.hosts";
          port = 8989;
          ...
        }
      
  • The port in mapred can be freely chosen
  • In the tmpdir directory the program will put executables, log files, and other runtime files. Not much space is needed there.
  • The load limit determines how many tasks are executed on each node. A "load unit" of 1 does not directly correspond to a process, though: the framework takes into account that different tasks use the resources differently. As a rule of thumb, put here two times the number of cores of a single machine, plus 0.1 (e.g. 2 × 4 + 0.1 = 8.1 for quad-core nodes, which matches load_limit = 8.1 in the example above).
  • The parameters shm_low and shm_high control the consumption of shared memory. If the (estimated) amount of shared memory exceeds shm_high, it is assumed that memory is tight, and measures are taken to reduce RAM consumption. If the use of shared memory drops below shm_low again, it is assumed that the memory pressure is gone, and consumption is turned back to normal. The parameters are given in bytes.
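
The files referenced by node_list simply contain the host names, presumably one per line. A hypothetical tasknode.hosts for the setup above could look like:

office3
office4
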
The program mr_test is a multi-purpose program: it acts both as the task server on the machines and as the central job control instance. Before the job can be started, the task servers need to be deployed and started:

./mr_test start_task_servers

This uses ssh to install the mr_test executable on all task nodes. The directory for this is tmpdir.

There is also stop_task_servers to stop the servers.

The job is started with

./mr_test exec_job

Progress messages are emitted on stdout:

Checking...
Planning...
Starting...
[Thu Jun 10 19:43:07 2010] [info] Starting job
[Thu Jun 10 19:43:07 2010] [info] Stats: runnable=16 running=0 finished=0 total=16 complete=false
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 0 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 1 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 2 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 3 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 4 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 5 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 6 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 7 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 9 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 10 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 11 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 12 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 13 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 14 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 15 to 192.168.5.40
Job can be interrupted with CTRL-C!
[Thu Jun 10 19:47:35 2010] [info] Finished task Map 14
[Thu Jun 10 19:47:35 2010] [info] Stats: runnable=10 running=14 finished=1 total=26 complete=false
[Thu Jun 10 19:47:35 2010] [info] Submitted task Sort 8 to 192.168.5.40
...

The job can be killed with CTRL-C or kill -2 <pid>. This also kills the running tasks, and performs file cleanup.

The statistics line is to be interpreted as follows: The runnable tasks are the tasks whose input files exist, and that could be started immediately if there were more load capacity on the nodes. The running tasks are the tasks currently running on the nodes. The finished number is the number of tasks finished so far. total is the total number of tasks, including non-runnable tasks. You will notice that total grows in the course of the execution. This has to do with the incremental planning algorithm: at job start it is not yet known how many tasks have to be run in total, because this number depends on how many output files are written by the map tasks. Once you see complete=true, all tasks are defined and total will no longer grow.

Details of the M/R job

The full job object looks like this:

let job : Mapred_def.mapred_job =
  let partitions = 16 in
object
  method name = "test1"
  method input_dir = "/input"
  method output_dir = "/output"
  method work_dir = "/work"
  method check_config _ = ()
  method map me id r w =
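    (* identity map: copy every input record unchanged to the output *)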
    try
      while true do
        let r = r#input_record() in
        w # output_record r
      done
    with End_of_file ->
      w # flush()
  method map_tasks = 16
  method sort_limit = 134217728L (* 128 M *)
  method merge_limit = 8
  method split_limit = 4
  method extract_key me line = Mapred_split.tab_split_key line
  method partitions = partitions
  method partition_of_key me key = (Hashtbl.hash key) mod partitions
  method reduce me p r w =
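    (* identity reduce: copy every record of the partition unchanged to the output *)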
    try
      while true do
        let r = r#input_record() in
        w # output_record r
      done
    with End_of_file ->
      w # flush()
end

Let's first have a look at map and reduce. map is a function

Mapred_def.mapred_env -> int -> Mapred_io.record_reader -> Mapred_io.record_writer -> unit

The record_reader argument is an object allowing access to the input file. The record_writer argument is the object for writing to the output file. map now typically reads the lines of the input (using input_record) and writes lines to the output (via output_record).

map is completely on its own for interpreting the input. It is not required that there is a key and a value - the input can be arbitrary. The output, however, should provide a key and a value, in such a way that the extract_key function can extract the key from the written lines. Here we use Mapred_split.tab_split_key, which takes everything up to the first TAB as the key.
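
As an illustration, here is a sketch of a non-identity map that swaps key and value of every "key TAB value" record. This is not part of the sample program; it only relies on the reader/writer methods already used above (input_record, output_record, flush), everything else is plain OCaml:

  method map me id r w =
    try
      while true do
        let line = r#input_record() in
        let line' =
          try
            let i = String.index line '\t' in
            let key = String.sub line 0 i in
            let value = String.sub line (i+1) (String.length line - i - 1) in
            value ^ "\t" ^ key
          with Not_found ->
            line                       (* no TAB: pass the record through *)
        in
        w # output_record line'
      done
    with End_of_file ->
      w # flush()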

The Mapred_def.mapred_env object allows access to other PlasmaFS files (using the open_cluster method), to the config file, and to (optional) command-line arguments provided at exec_job time.

reduce has exactly the same signature as map. The Mapred_io.record_reader object, however, is now connected with the intermediate file containing all records for the partition whose number is passed as the int argument. These records comply with the key/value format.
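
For illustration, here is a sketch of a reduce that outputs "key TAB count" for every distinct key instead of copying the records. It assumes (as the sort and shuffle phases suggest) that all records with the same key are adjacent in the intermediate file; apart from that it only uses Mapred_split.tab_split_key and the methods already seen above:

  method reduce me p r w =
    let current = ref None in          (* Some (key, count) of the current key run *)
    let emit () =
      match !current with
        | Some (k, n) -> w # output_record (k ^ "\t" ^ string_of_int n)
        | None -> ()
    in
    try
      while true do
        let line = r#input_record() in
        let key = Mapred_split.tab_split_key line in
        ( match !current with
            | Some (k, n) when k = key -> current := Some (k, n+1)
            | _ -> emit(); current := Some (key, 1)
        )
      done
    with End_of_file ->
      emit();
      w # flush()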

There are a few interesting parameters:

  • map_tasks determines how many map tasks are created. For maximum performance, this should be a small multiple of the total number of cores on all task machines. There is no need to make this number larger. (N.B. It is likely that this parameter will be removed from the job definition object.)
  • sort_limit is how large the chunks are that are sorted in RAM. This does not directly correspond to the consumed RAM, but gives roughly the order of magnitude. If the average record length is small, the extra RAM consumption is highest and can reach about 6 times sort_limit. For larger records (several KB) it is much smaller.
  • merge_limit says how many files are merged at most in a shuffle task. The higher this value, the less often the data is copied during shuffling; however, a lot more RAM is consumed. One should budget at least 64 M of shared memory for every open file.
  • split_limit says how many files are produced at most in a shuffle task. The same comments apply as for merge_limit.
  • partitions determines how many partitions are created in the output.
  • partition_of_key is the function that determines into which partition (0 to partitions-1) a key is put. The given definition (Hashtbl.hash key) mod partitions works well if there are no special requirements.
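
As a hypothetical alternative, here is a sketch of a range partitioner. It keeps the partitions in global key order (which hash partitioning does not), assuming the keys are decimal numbers between 0 and 999; for anything else it falls back to the hash scheme:

  method partition_of_key me key =
    try
      let n = int_of_string key in
      if n < 0 then 0
      else if n > 999 then partitions - 1
      else n * partitions / 1000
    with Failure _ ->
      (Hashtbl.hash key) mod partitions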
