Plasma GitLab Archive
Projects Blog Knowledge

Plasmamr_howto



Plasma MapReduce: How to run a job?

Preliminaries

It is required that there is a PlasmaFS filesystem up and running on the cluster. There is no way to use Map/Reduce without PlasmaFS.

There need to be four directories for input data, work files, output data, and log files. We assume here that they have names /input, /work, /output, and /log but of course they can have any name, and stored in any subdirectory.

You can use the plasma utility to create these directories (also see Cmd_plasma for documentation - we assume here the plasma utility is configured to automatically find the cluster):

plasma mkdir /input
plasma mkdir /work
plasma mkdir /output
plasma mkdir /log

The /input directory must be filled with the input files (text files where every line is not longer than the blocksize of the filesystem). The input files can have any names - the Map/Reduce framework simply processes all files it finds in this directory.

You can use plasma to upload the input files, e.g.

plasma put input0 /input/input0

when there is a local file input0. For the sample programs the file must have a key and a value on each line, separated by a single TAB character.

The other three directories must be empty before a job is started.

We also assume here that you have built and installed the Plasma distribution, so that a command like

ocamlfind ocamlopt -package mr_framework ...

can be used to compile a Plasma Map/Reduce program.

The sample M/R program

The distribution builds a sample Map/Reduce program mr_test. This test program does not do anything reasonable, but it is a good "hello world" program. The functions for map and reduce are the identities, so that the overall effect of the program is to group the input data by key.

The sample program is:

let job : Mapred_def.mapred_job =
object
  method custom_params = []
  method check_config _ _ = ()
  method pre_job_start _ _ = ()
  method post_job_finish _ _ = ()

  method map me jc ti r w = ...
  method extract_key me jc line = ...
  method partition_of_key me jc = ...
  method reduce me jc ti r w = ...
end

let () =
  Mapred_main.main job

What you can see is that a M/R job is specified by an object of type Mapred_def.mapred_job. This object contains the functions for map and reduce, and a few other functions. Before looking in detail at this, let me describe how to compile and start this job.

The program calls Mapred_main.main with the job object. This is the main program which parses command-line arguments, and runs the job as the user wants it. You compile this with

ocamlfind ocamlopt -package mr_framework -linkpkg -o mr_test mr_test.ml

Before you can start mr_test you need a configuration file mr_test.conf (generally, the framework appends ".conf" to the executable when looking for the config file).

This file can look like (you need to adapt this to your setup):

netplex {
  namenodes {
    clustername = "test";
    node { addr = "office3:2730" };
    node { addr = "office4:2730" }
  };
  mapred {
    node { addr = "office3" };
    node { addr = "office4" };
    port = 8989;
    tmpdir = "/tmp/mapred";
    load_limit = 8.1;
    buffer_size = 67108864;         (* 64 M *)
    buffer_size_tight = 16777216;   (* 16 M *)
    sort_size =  67108864;          (* 64 M *)
  };
  mapredjob {
    input_dir = "/input";
    output_dir = "/output";
    work_dir = "/work";
    log_dir = "/log";
    partitions = 10;
    bigblock_size = 16777216;       (* 16 M *)
  }
}

Most parameters describe the compute environment, and resource limits:

  • The parameter clustername is the name of the PlasmaFS cluster
  • The node/addr parameters in namenodes specify the namenodes of the PlasmaFS cluster. You can enumerate them as shown, or put the host names into a separate file, and point to this with a single node_list parameter:
        namenodes {
          clustername = "test";
          node_list = "namenode.hosts";
          port = 2730
        }
      
  • The node/addr parameters in mapred specify on which nodes the tasks are going to be executed. This can be any machines, but it is advantageous to use the datanode machines. Also, you can use node_list:
        mapred {
          node_list = "tasknode.hosts";
          port = 8989;
          ...
        }
      
  • The port in mapred can be freely chosen
  • In the tmpdir directory the program will put executables, log files, and other runtime files. Not much space is needed there.
  • The load limit determines how many tasks are executed on each node. A "load unit" of 1 does not directly correspond to a process, though. The framework takes into account that different tasks use the resources differently. As a rule of thumb, put here two times the number of cores on a single machine, plus 0.1. Update: This value is now only used during job planning to determine how fine-grained the tasks should be. At job runtime, the real number of cores is determined and used instead.
  • The size of the file buffers is given by buffer_size_tight and buffer_size. If you have lots of RAM, choose buffer_size_tight = bigblock_size, and buffer_size as a small multiple (as in the example). When memory is tight the former parameter is used, and in normal memory situations the latter parameter is used. If you want to save even more memory, it is possible to choose smaller file buffers (less than bigblock_size). This increases the disk activity, though, because the buffers have to be refilled more frequently.
  • sort_size says how big the buffers for sorting in RAM are.
The parameters in the mapredjob block are specific for the job. These parameters are first looked at when the exec_job command is started (see below), and can be overridden on the command line:
  • input_dir, output_dir, work_dir, and log_dir define the four job directories
  • partitions determines the number of partitions, i.e. the number of final output files. Usually one sets this to a small multiple of the number of task nodes.
  • bigblock_size is the size of the data units the tasks process. Bigblocks can be larger than the filesystem blocks. See below for more information.
There are a few more job-specific parameters, see Mapred_def.mapred_job_config for a description.

The program mr_test is a multi-purpose program: It acts both as task server on the machines, and as a central job control instance. Before the job can be started, the task servers need to be deployed and started:

./mr_test start_task_servers

This uses ssh to install the mr_test executable on all task nodes. The directory for this is tmpdir.

There is also stop_task_servers to stop the servers.

The job is started with

./mr_test exec_job

Progress messages are emitted on stdout:

Checking...
Planning...
Starting...
[Thu Jun 10 19:43:07 2010] [info] Starting job
[Thu Jun 10 19:43:07 2010] [info] Stats: runnable=16 running=0 finished=0 total=16 complete=false
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 0 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 1 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 2 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 3 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 4 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 5 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 6 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 7 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 9 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 10 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 11 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 12 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 13 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 14 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 15 to 192.168.5.40
Job can be interrupted with CTRL-C!
[Thu Jun 10 19:47:35 2010] [info] Finished task Map 14
[Thu Jun 10 19:47:35 2010] [info] Stats: runnable=10 running=14 finished=1 total=26 complete=false
[Thu Jun 10 19:47:35 2010] [info] Submitted task Sort 8 to 192.168.5.40
...

The job can be killed with CTRL-C or kill -2 <pid>. This also kills the running tasks, and performs file cleanup.

The statistics line is to be interpreted as follows: The runnable tasks are the tasks whose input files exist, and that could be immediately started if there were more load capacity on the nodes. The running tasks are the tasks currently running on the node. The finished number is the sum of all finished tasks. total is the total number of tasks, including non-runnable tasks. You will notice that total grows in the course of the execution. This has to do with the incremental planning algorithm: At job start it is not yet known how many tasks have to be run in total. This number depends on how many output files are written by the map tasks. If you see complete=true all tasks are defined, and total will no longer grow.

Details of the M/R job

The full job object looks like

let job : Mapred_def.mapred_job =
object
  method custom_params = []
  method check_config _ _ = ()
  method pre_job_start _ _ = ()
  method post_job_finish _ _ = ()

  method map me jc ti r w =
    try
      while true do
	let r = r#input_record() in
	w # output_record r
      done
    with End_of_file ->
      w # flush()

  method extract_key me jc line = 
    Mapred_split.tab_split_key line

  method partition_of_key me jc =
    let p = jc#partitions in
    (fun key ->
       (Hashtbl.hash key) mod p
    )
  method reduce me jc ti r w =
    try
      while true do
	let r = r#input_record() in
	w # output_record r
      done
    with End_of_file ->
      w # flush()
end

Let's first have a look at map and reduce. map is a function

Mapred_def.mapred_env -> Mapred_def.mapred_job_config -> task_info -> Mapred_io.record_reader -> Mapred_io.record_writer -> unit

The record_reader argument is an object allowing access to the input file. The record_writer argument is the object for writing to the output file. map now typically reads the lines of the input (using input_record) and writes lines to the output (via output_record).

map is completely on its own for interpreting the inputs. It is not required that there is a key and a value - the input can be arbitrary. The output, however, should provide a key and a value. This should be done in a way so that the extract_key function can extract the key from the written lines. We use here Mapred_split.tab_split_key to get everything until the first TAB as key.

The Mapred_def.mapred_env object allows access to other PlasmaFS files (using the open_cluster method), to the config file, and to (optional) command-line arguments provided at exec_job time.

reduce has exactly the same signature as map. The Mapred_io.record_reader object, however, is now connected with the intermediate file containing all records for the partition passed as int. These records comply to the key/value format.

There are more functions in the job object than just map and reduce:

  • partition_of_key is the function that determines into which partition (0 to partitions-1) a key is put. The given definition (Hashtbl.hash key) mod partitions works well if there are no special requirements.
  • extract_key defines how to get the key from a processed line
For a full description, see Mapred_def.mapred_job.

Setting bigblock_size

The default size of bigblocks is 16M. This mainly means that the input data is initially split into units of bigblocks, and every map task processes a whole number of such bigblocks. Because every map task needs to process at least one record (line), records must not be larger than bigblocks:

The size of bigblocks defines the maximum size of records (lines).

Other than this, the user can normally ignore the size of bigblocks. When changing this parameter, this should only affect performance, but not the semantics of the map/reduce program.

Increasing the size of bigblocks has these effects:

  • Fewer namenode operations are needed to perform a map task
  • The decision which node runs a map task becomes more coarse-grained and more random (it is likelier that more datanodes store the blocks in the file region)
If you set bigblock_size too high, there is the risk that not enough map tasks can be generated to distribute the load evenly on all nodes (i.e. total_data_size / bigblock_size should be at least a small multiple of the number of nodes). There is no danger that too many map tasks could be generated if bigblock_size was way too low, because a single map task can process several bigblocks.

Setting the buffer size: buffer_size and buffer_size_tight

Remember that all data files, including intermediate ones, and stored in PlasmaFS. The amount of RAM spent for PlasmaFS file buffers is crucial because of this (and there is no page cache sitting in the background compensating for dimensioning errors). So, the parameter buffer_size means:

  • When a task opens a PlasmaFS file for reading or writing, this number of bytes are buffered up before the next PlasmaFS operation is started (for getting more data, or for stashing the data away)
  • If the task needs to open several files for reading at the same time, buffer_size is the sum of all required file buffers (but, of course, a file buffer has at least one block)
The idea is that a task is given about buffer_size bytes for reading data, and another buffer_size bytes for writing data. It is obvious that this parameter has a direct and quite predictable effect on the RAM consumption.

There is also an effect on performance: If you give more RAM for file buffers, this

  • reduces the number of namenode operations, and
  • causes that more blocks can be read in sequence, reducing disk seeks.
So, the recommendation is to make this parameter as high as possible. As of Plasma-0.4, there is no direct connection anymore with the size of bigblocks, so buffer_size can be set higher or lower than this.

If a machine runs out of RAM, the tasks will fail. In order to prevent this, a mechanism was added to reduce the memory consumption for file buffers when RAM is already tight, and the failure is foreseeable. The parameter buffer_size_tight is used instead of buffer_size when free RAM is considered as tight. Set it to e.g. 1/4 of buffer_size.

The parameters merge_limit and split_limit

These parameters can be set in the mapredjob section:

  mapredjob {
    ...
    merge_limit = 32;
    split_limit = 32;
  };

They have a direct effect on layout of the map/reduce algorithm. After mapping and sorting the data, the intermediate files need to be merged. We merge merge_limit sorted file in one step. At the same time, the data is split into partitions. We split into split_limit partitions in one step. (Note that merging and splitting is done "hand-in-hand" by one processing step.)

How many files need to be merged? This is something close to total_data_size / sort_limit. So, if you have e.g. 100G of data and a sort_limit of 256M, you can expect 400 such files.

How many steps do we need for merging? We can merge merge_limit ^ n files with n steps. So for merge_limit=32 we could already merge 1024 files in two steps, and 32768 files in only three steps. For the 400 files of the example we need two steps.

A similar calculation can be done for the splitting into partitions. The number of partitions is, of course, usually much lower, and only a small multiple of the number of nodes. So the merging-and-splitting phase of the algorithm is normally dominated by merging.

If you increase these values, you decrease the number of steps the algorithm needs, and also the number of data copies (each step copies the data once, but usually only locally, and not over the network).

Enhanced map tasks

Plasma-0.4 contains an alternate implementation of map. It can be enabled by

  mapredjob {
    ...
    enhanced_mapping = <m>;
  };

where <m> is a number. Enhanced map tasks integrate the sort step, and even a first split step. The number <m> is the number of partitions that are generated in this step. Usually, one sets m = split_limit.

A radical approach would be to set enhanced_mapping = partitions. This setting is optimal for reducing data copying as much as possible, and for minimizing network utilization. It has also a downside, though: The likeliness of random seeks increases. Also, a high number of intermediate files is created.

A good compromise seems to be enhanced_mapping = partitions/split_limit (round the division up). This takes into account that we always need a round of steps for merging files, and a merge step can also split simultaneously, and this is practically cost-free. If you can predict several rounds of merging, you can decrease enhanced_mapping even further.

Installing additional files on the task nodes

By setting task_files (in the mapredjob section) one can install additional files on all task nodes at job start time. task_files is a space-separated list of filenames. The files are installed in an automatically determined directory on the task nodes.

Right now there is no direct way to get the path of this directory. One has to first create the task manager object:

let tm = Mapred_taskfiles.scp_taskfile_manager me#config jc

me and jc are passed as first and second argument of map and reduce.

Now the directory is tm#local_directory.

Note that these files are deleted when the job finishes.

Writing additional log files

Tasks write some log files by default:

  • Netlog.logf output is written to a file <log_dir>/<prefix><level>.log
  • Stderr output is written to a file <log_dir>/<prefix>stderr.log
The <log_dir> is first a local directory on the task node. When the task finishes, all the files with the right <prefix> are moved to the configured PlasmaFS log directory. (Every task uses a different <prefix>, but all tasks write to the same directory.)

Actually, all files with this <prefix> are finally moved, not only those written by the map/reduce framework. So custom code can write additional files there, and these files are handled in the same way.

The <log_dir> can be obtained by tm#log_directory (when tm is the taskfile manager, see above).

The <prefix> can be obtained by ti#task_prefix when ti is the task_info argument passed to map and reduce.

Files in <log_dir> not having the right filename prefix are also moved to the PlasmaFS log directory, but only at job termination time.

Streaming

Streaming means to start subprocesses for map and reduce. See Mapred_streaming for details.

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml