Preliminaries
A PlasmaFS filesystem must be up and running on the cluster. There is no way to use Map/Reduce without PlasmaFS.
Four directories are needed for input data, work files, output
data, and log files. We assume here that they have the names /input,
/work, /output, and /log, but of course they can have any names
and can be stored in any subdirectory.
You can use the plasma utility to create these directories
(also see Cmd_plasma for documentation - we assume here the
plasma utility is configured to automatically find the cluster):
plasma mkdir /input
plasma mkdir /work
plasma mkdir /output
plasma mkdir /log
The /input directory must be filled with the input files (text files
whose lines must not be longer than the blocksize of the filesystem).
The input files can have any names - the Map/Reduce framework simply
processes all files it finds in this directory.
You can use plasma to upload the input files, e.g.
plasma put input0 /input/input0
when there is a local file input0. For the sample programs the file
must have a key and a value on each line, separated by a single TAB
character.
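For example, a tiny input file could look like this (hypothetical
data; <TAB> stands for the literal TAB character):

alice<TAB>4
bob<TAB>17
alice<TAB>9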
The other three directories must be empty before a job is started.
We also assume here that you have built and installed the Plasma distribution, so that a command like
ocamlfind ocamlopt -package mr_framework ...
can be used to compile a Plasma Map/Reduce program.
The sample M/R program
The distribution builds a sample Map/Reduce program mr_test. This
test program does not do anything particularly useful, but it is a good
"hello world" program. The functions for map and reduce are the identities,
so that the overall effect of the program is to group the input data
by key.
The sample program is:
let job : Mapred_def.mapred_job =
object
  method custom_params = []
  method check_config _ _ = ()
  method pre_job_start _ _ = ()
  method post_job_finish _ _ = ()
  method map me jc ti r w = ...
  method extract_key me jc line = ...
  method partition_of_key me jc = ...
  method reduce me jc ti r w = ...
end
let () =
  Mapred_main.main job
What you can see is that an M/R job is specified by an object of type
Mapred_def.mapred_job. This object contains the functions for map
and reduce, and a few other functions. Before looking in detail at
this, let me describe how to compile and start this job.
The program calls Mapred_main.main with the job object. This is
the main program: it parses the command-line arguments and runs the
job as requested. You compile this with
ocamlfind ocamlopt -package mr_framework -linkpkg -o mr_test mr_test.ml
Before you can start mr_test you need a configuration file
mr_test.conf (generally, the framework appends ".conf" to the
name of the executable when looking for the config file).
This file can look like (you need to adapt this to your setup):
netplex {
  namenodes {
    clustername = "test";
    node { addr = "office3:2730" };
    node { addr = "office4:2730" }
  };
  mapred {
    node { addr = "office3" };
    node { addr = "office4" };
    port = 8989;
    tmpdir = "/tmp/mapred";
    load_limit = 8.1;
    shm_low = 1000000000;
    shm_high = 2000000000;
    buffer_size = 67108864; (* 64 M *)
    buffer_size_tight = 16777216;   (* 16 M *)
    sort_size =  67108864; (* 64 M *)
  };
  mapredjob {
    input_dir = "/input";
    output_dir = "/output";
    work_dir = "/work";
    log_dir = "/log";
    partitions = 10;
  }
}
Most parameters describe the compute environment, and resource limits:

- clustername is the name of the PlasmaFS cluster.
- The node/addr parameters in namenodes specify the namenodes
  of the PlasmaFS cluster. You can enumerate them as shown, or put
  the host names into a separate file, and point to this with
  a single node_list parameter (an example file is shown after
  this list):

    namenodes {
      clustername = "test";
      node_list = "namenode.hosts";
      port = 2730
    }

- The node/addr parameters in mapred specify on which nodes
  the tasks are going to be executed. These can be any machines, but
  it is advantageous to use the datanode machines. Here you can also
  use node_list:

    mapred {
      node_list = "tasknode.hosts";
      port = 8989;
      ...
    }

- port in mapred can be freely chosen.
- tmpdir is the directory where the program will put executables,
  log files, and other runtime files. Not much space is needed there.
- shm_low and shm_high control the consumption of shared memory. If
  the (estimated) amount of shared memory exceeds shm_high, it is
  assumed that memory is tight, and measures are taken to reduce RAM
  consumption. If the use of shared memory drops below shm_low again,
  it is assumed that the memory pressure is gone, and consumption is
  turned back to normal. The parameters are given in bytes.
- buffer_size and buffer_size_tight set the sizes of the file
  buffers. Both must be larger than the size of bigblocks. When
  memory is tight the latter parameter is used, and in normal memory
  situations the former.
- sort_size says how big the buffers for sorting in RAM are.

The parameters in the mapredjob block are specific for the job.
These parameters are first looked at when the exec_job command
is started (see below), and can be overridden on the command line:

- input_dir, output_dir, work_dir, and log_dir define the
  four job directories.
- partitions determines the number of partitions, i.e. the number
  of output files.

See Mapred_def.mapred_job_config for a description of these and
further parameters.
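Regarding the node_list files mentioned above: presumably such a file
simply enumerates the hosts, one name per line (an assumption here;
adapt to your setup). A hypothetical namenode.hosts matching the
config above:

office3
office4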
The program mr_test is a multi-purpose program: it acts both as the task
server on the machines, and as the central job control instance. Before
the job can be started, the task servers need to be deployed and
started:
./mr_test start_task_servers
This uses ssh to install the mr_test executable on all task
nodes. The directory for this is tmpdir.
There is also stop_task_servers to stop the servers.
The job is started with
./mr_test exec_job
Progress messages are emitted on stdout:
Checking...
Planning...
Starting...
[Thu Jun 10 19:43:07 2010] [info] Starting job
[Thu Jun 10 19:43:07 2010] [info] Stats: runnable=16 running=0 finished=0 total=16 complete=false
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 0 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 1 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 2 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 3 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 4 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 5 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 6 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 7 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 9 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 10 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 11 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 12 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 13 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 14 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 15 to 192.168.5.40
Job can be interrupted with CTRL-C!
[Thu Jun 10 19:47:35 2010] [info] Finished task Map 14
[Thu Jun 10 19:47:35 2010] [info] Stats: runnable=10 running=14 finished=1 total=26 complete=false
[Thu Jun 10 19:47:35 2010] [info] Submitted task Sort 8 to 192.168.5.40
...
The job can be killed with CTRL-C or kill -2 <pid>. This also
kills the running tasks, and performs file cleanup.
The statistics line is to be interpreted as follows: The runnable
tasks are the tasks whose input files exist, and that could be
immediately started if there were more load capacity on the nodes. The
running tasks are the tasks currently executing on the nodes. The
finished number counts the tasks finished so far. total is the
total number of tasks, including non-runnable tasks. You will notice
that total grows in the course of the execution. This has to do with
the incremental planning algorithm: At job start it is not yet known
how many tasks have to be run in total. This number depends on how
many output files are written by the map tasks. If you see
complete=true all tasks are defined, and total will no longer
grow.
Details of the M/R job
The full job object looks like
let job : Mapred_def.mapred_job =
object
  method custom_params = []
  method check_config _ _ = ()
  method pre_job_start _ _ = ()
  method post_job_finish _ _ = ()

  (* map is the identity: every input record is copied to the output *)
  method map me jc ti r w =
    try
      while true do
        let line = r # input_record() in
        w # output_record line
      done
    with End_of_file ->
      w # flush()

  (* the key is everything up to the first TAB of a line *)
  method extract_key me jc line =
    Mapred_split.tab_split_key line

  (* keys are spread evenly over the partitions by hashing *)
  method partition_of_key me jc =
    let p = jc#partitions in
    (fun key ->
       (Hashtbl.hash key) mod p
    )

  (* reduce is also the identity: records are copied unchanged *)
  method reduce me jc ti r w =
    try
      while true do
        let line = r # input_record() in
        w # output_record line
      done
    with End_of_file ->
      w # flush()
end
Let's first have a look at map and reduce. map is a function

Mapred_def.mapred_env -> Mapred_def.mapred_job_config -> task_info ->
Mapred_io.record_reader -> Mapred_io.record_writer -> unit
The record_reader argument is an object allowing access to the
input file. The record_writer argument is the object for writing
to the output file. map now typically reads the lines of the
input (using input_record) and writes lines to the output (via
output_record).
map is completely on its own for interpreting the inputs. It is not
required that there is a key and a value - the input can be arbitrary.
The output, however, should provide a key and a value. This should be
done in such a way that the extract_key function can extract the key
from the written lines. We use here Mapred_split.tab_split_key to
get everything up to the first TAB as the key.
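As an illustration, here is a hypothetical map that swaps key and
value of every line - only a sketch, assuming the input already uses
the key<TAB>value layout of the sample data:

method map me jc ti r w =
  try
    while true do
      let line = r # input_record() in
      let key = Mapred_split.tab_split_key line in
      let klen = String.length key in
      (* everything after the TAB is the value; empty if there is no TAB *)
      let value =
        if klen < String.length line then
          String.sub line (klen+1) (String.length line - klen - 1)
        else "" in
      w # output_record (value ^ "\t" ^ key)
    done
  with End_of_file ->
    w # flush()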
The Mapred_def.mapred_env object allows access to other PlasmaFS
files (using the open_cluster method), to the config file, and to
(optional) command-line arguments provided at exec_job time.
reduce has exactly the same signature as map. The
Mapred_io.record_reader object, however, is now connected with the
intermediate file containing all records of the partition being
processed. These records comply with the key/value format.
There are more functions in the job object than just map and
reduce:
- partition_of_key is the function that determines into which
  partition (0 to partitions-1) a key is put. The given definition
  (Hashtbl.hash key) mod partitions works well if there are no
  special requirements (a variant is sketched below).
- extract_key defines how to get the key from a processed line.

See Mapred_def.mapred_job for the remaining methods.
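For instance, a hypothetical variant that assigns keys to partitions
by their first character - a sketch, not part of the sample program:

method partition_of_key me jc =
  let p = jc#partitions in
  (fun key ->
     if key = "" then 0
     else (Char.code key.[0]) mod p
  )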
Setting bigblock_size

The default size of bigblocks is 16M. This means Map/Reduce processes
all files in chunks of (at least) one bigblock.
The size of bigblocks is also the maximum size of records (lines).
When increasing the size of bigblocks, make sure you also increase
buffer_size and buffer_size_tight (see above). Bigblocks should not
be larger than the buffers.
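For example (a sketch; we assume here that bigblock_size is given in
bytes in the mapredjob section, like the other job parameters - check
Mapred_def.mapred_job_config for the authoritative list):

mapredjob {
  ...
  bigblock_size = 33554432;  (* 32 M *)
}

Remember to raise buffer_size and buffer_size_tight in mapred
accordingly, so they stay larger than a bigblock.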
Installing additional files on the task nodes
By setting task_files (in the mapredjob section) one can install
additional files on all task nodes at job start time. task_files
is a space-separated list of filenames. The files are installed in
an automatically determined directory on the task nodes.
Right now there is no direct way to get the path of this directory. One has to first create the task manager object:
let tm = Mapred_taskfiles.scp_taskfile_manager me#config jc
me and jc are passed as first and second argument of map and
reduce.
Now the directory is tm#local_directory.
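Putting this together, a map could read such an installed file like
this (a sketch; "tag.txt" is a hypothetical file that was listed in
task_files):

method map me jc ti r w =
  let tm = Mapred_taskfiles.scp_taskfile_manager me#config jc in
  let path = Filename.concat tm#local_directory "tag.txt" in
  (* read the first line of the installed file *)
  let ic = open_in path in
  let tag = input_line ic in
  close_in ic;
  try
    while true do
      let line = r # input_record() in
      w # output_record (tag ^ "\t" ^ line)
    done
  with End_of_file ->
    w # flush()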
Note that these files are deleted when the job finishes.
Writing additional log files
Tasks write some log files by default:

- Netlog.logf output is written to a file <log_dir>/<prefix><level>.log.
- The stderr output is written to <log_dir>/<prefix>stderr.log.

<log_dir> is first a local directory on the task node. When the
task finishes, all the files with the right <prefix> are moved to the
configured PlasmaFS log directory. (Every task uses a different
<prefix>, but all tasks write to the same directory.)
Actually, all files with this <prefix> are finally moved, not only those
written by the map/reduce framework. So custom code can write additional
files there, and these files are handled in the same way.
The <log_dir> can be obtained by tm#log_directory (when tm is the
taskfile manager, see above).
The <prefix> can be obtained by ti#task_prefix when ti is the
task_info argument passed to map and reduce.
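For example, custom code could append to its own per-task log file
like this (a sketch built on the accessors just described; the suffix
"custom.log" is arbitrary):

let write_custom_log me jc ti msg =
  let tm = Mapred_taskfiles.scp_taskfile_manager me#config jc in
  let path =
    Filename.concat tm#log_directory (ti#task_prefix ^ "custom.log") in
  (* the file carries the task prefix, so it is moved to the PlasmaFS
     log directory together with the standard log files *)
  let oc = open_out_gen [Open_wronly; Open_append; Open_creat] 0o644 path in
  output_string oc (msg ^ "\n");
  close_out oc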
Files in <log_dir> not having the right filename prefix are also moved
to the PlasmaFS log directory, but only at job termination time.
Streaming
Streaming means that subprocesses are started for running map and reduce.
See Mapred_streaming for details.
