This text describes the classic way of defining jobs. Since
Plasma-0.6, there is another methodology called the toolkit. It
is described in detail in Mapred_toolkit
. Note that the existence
of the toolkit does not invalidate the following, because the toolkit
is just a layer on top of the classic API.
If you have experience with other implementations of map/reduce, the classic job definition will be easier to understand.
Preliminaries
For any serious application, it is required that there is a PlasmaFS filesystem up and running on the cluster, and that input and output files are stored in PlasmaFS. For just playing with map/reduce, however, there is now a way to use it without PlasmaFS. In this case the files are stored in the normal Unix filesystem. Of course, you are then limited to only one computer (but you can at least exploit all CPU cores).
In the following, we simply use filenames like "/input". Of course, you should change these to real paths. There is the possibility of having a tree prefix:
There need to be four directories for input data, work files, output
data, and log files. We assume here that they have names /input
,
/work
, /output
, and /log
but of course they can have any name,
and stored in any subdirectory.
If you have PlasmaFS, you can use the plasma
utility to create these
directories (also see Cmd_plasma
for documentation - we assume here
the plasma
utility is configured to automatically find the cluster):
plasma mkdir /input
plasma mkdir /work
plasma mkdir /output
plasma mkdir /log
In case that you store the data in the local filesystem, just use the normal "mkdir" command.
The /input
directory must be filled with the input files (normally
text files, i.e. files using the LF character for splitting the data
into lines). The input files can have any names - the Map/Reduce
framework simply processes all files it finds in this directory.
If you have PlasmaFS, you can use plasma
to upload the input files, e.g.
plasma put input0 /input/input0
when there is a local file input0
. For the sample programs the file
must have a key and a value on each line, separated by a single TAB
character.
The other three directories must be empty before a job is started.
We also assume here that you have built and installed the Plasma distribution, so that a command like
ocamlfind ocamlopt -package mr_framework -thread ...
can be used to compile a Plasma Map/Reduce program.
The sample M/R program
The distribution builds a sample Map/Reduce program mr_test
. This
test program does not do anything reasonable, but it is a good "hello
world" program. The functions for map
and reduce
are the identities,
so that the overall effect of the program is to group the input data
by key.
The sample program is:
let job : Mapred_def.mapred_job =
object
method custom_params = []
method check_config _ _ = ()
method pre_job_start _ _ = ()
method post_job_finish _ _ = ()
method map me jc ti r w = ...
method extract_key me jc line = ...
method partition_of_key me jc = ...
method reduce me jc ti r w = ...
method sorter = ...
method combine me jc ti = None
method input_record_io me jc = Mapred_io.line_structured_format()
method internal_record_io me jc = Mapred_io.line_structured_format()
method output_record_io me jc = Mapred_io.line_structured_format()
end
let () =
Mapred_main.main job
What you can see is that a M/R job is specified by an object of type
Mapred_def.mapred_job
. This object contains the functions for map
and reduce
, and a few other functions. Before looking in detail at
this, let me describe how to compile and start this job.
The program calls Mapred_main.main
with the job object. This is
the main program which parses command-line arguments, and runs the
job as the user wants it. You compile this with
ocamlfind ocamlopt -package mr_framework -thread -linkpkg -o mr_test mr_test.ml
Before you can start mr_test
you need a configuration file
mr_test.conf
(generally, the framework appends ".conf" to the
executable when looking for the config file).
If you use PlasmaFS, this file can look like (you need to adapt this to your setup):
netplex {
namenodes {
clustername = "test";
node { addr = "office3:2730" };
node { addr = "office4:2730" }
};
mapred {
node { addr = "office3" };
node { addr = "office4" };
port = 8989;
tmpdir = "/tmp/mapred";
buffer_size = 67108864; (* 64 M *)
buffer_size_tight = 16777216; (* 16 M *)
sort_size = 67108864; (* 64 M *)
};
mapredjob {
input_dir = "/input";
output_dir = "/output";
work_dir = "/work";
log_dir = "/log";
partitions = 10;
bigblock_size = 16777216; (* 16 M *)
}
}
If you do not use PlasmaFS, make sure you have
netplex {
namenodes {
disabled = true;
};
mapred {
node { addr = "localhost" };
... (* other settings unchanged *)
};
mapredjob {
... (* other settings unchanged *)
};
}
because otherwise the program would try to access the PlasmaFS namenode (even if your file paths all use the "file::" prefix).
Most parameters describe the compute environment, and resource limits:
clustername
is the name of the PlasmaFS clusternode
/addr
parameters in namenodes
specify the namenodes
of the PlasmaFS cluster. You can enumerate them as shown, or put
the host names into a separate file, and point to this with
a single node_list
parameter:
namenodes {
clustername = "test";
node_list = "namenode.hosts";
port = 2730
}
node
/addr
parameters in mapred
specify on which nodes
the tasks are going to be executed. This can be any machines, but
it is advantageous to use the datanode machines. Also, you can
use node_list
:
mapred {
node_list = "tasknode.hosts";
port = 8989;
...
}
port
in mapred
can be freely chosentmpdir
directory the program will put executables, log files,
and other runtime files. Not much space is needed there.buffer_size_tight
and
buffer_size
. If you have lots of RAM, choose
buffer_size_tight = bigblock_size
, and buffer_size
as a small
multiple (as in the example). When memory is tight the former parameter
is used, and in normal memory situations the latter parameter is used.
If you want to save even more memory, it is
possible to choose smaller file buffers (less than bigblock_size
).
This increases the disk
activity, though, because the buffers have to be refilled more
frequently. sort_size
says how big the buffers for sorting in RAM are.mapredjob
block are specific for the job.
These parameters are first looked at when the exec_job
command
is started (see below), and can be overridden on the command line:input_dir
, output_dir
, work_dir
, and log_dir
define the
four job directoriespartitions
determines the number of partitions, i.e. the number
of final output files. Usually one sets this to a small multiple
of the number of task nodes.bigblock_size
is the size of the data units the tasks process.
Bigblocks can be larger than the filesystem blocks.
See below for more information.Mapred_def.mapred_job_config
for a description.
The program mr_test
is a multi-purpose program: It acts both as task
server on the machines, and as a central job control instance. Before
the job can be started, the task servers need to be deployed and
started:
./mr_test start_task_servers
This uses ssh
to install the mr_test
executable on all task
nodes. The directory for this is tmpdir
.
There is also stop_task_servers
to stop the servers.
The job is started with
./mr_test exec_job
Progress messages are emitted on stdout:
Checking...
Planning...
Starting...
[Thu Jun 10 19:43:07 2010] [info] Starting job
[Thu Jun 10 19:43:07 2010] [info] Stats: runnable=16 running=0 finished=0 total=16 complete=false
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 0 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 1 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 2 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 3 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 4 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 5 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 6 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 7 to 192.168.5.30
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 9 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 10 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 11 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 12 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 13 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 14 to 192.168.5.40
[Thu Jun 10 19:43:07 2010] [info] Submitted task Map 15 to 192.168.5.40
Job can be interrupted with CTRL-C!
[Thu Jun 10 19:47:35 2010] [info] Finished task Map 14
[Thu Jun 10 19:47:35 2010] [info] Stats: runnable=10 running=14 finished=1 total=26 complete=false
[Thu Jun 10 19:47:35 2010] [info] Submitted task Sort 8 to 192.168.5.40
...
The job can be killed with CTRL-C
or kill -2 <pid>
. This also
kills the running tasks, and performs file cleanup.
The statistics line is to be interpreted as follows: The runnable
tasks are the tasks whose input files exist, and that could be
immediately started if there were more load capacity on the nodes. The
running
tasks are the tasks currently running on the node. The
finished
number is the sum of all finished tasks. total
is the
total number of tasks, including non-runnable tasks. You will notice
that total
grows in the course of the execution. This has to do with
the incremental planning algorithm: At job start it is not yet known
how many tasks have to be run in total. This number depends on how
many output files are written by the map tasks. If you see
complete=true
all tasks are defined, and total
will no longer
grow.
Details of the M/R job
The full job object looks like
let job : Mapred_def.mapred_job =
object
method custom_params = []
method check_config _ _ = ()
method pre_job_start _ _ = ()
method post_job_finish _ _ = ()
method map me jc ti r w =
try
while true do
let r = r#input_record() in
w # output_record r
done
with End_of_file ->
w # flush()
method extract_key me jc line =
Mapred_split.tab_split_key line
method partition_of_key me jc =
let p = jc#partitions in
(fun key ->
(Hashtbl.hash key) mod p
)
method reduce me jc ti r w =
try
while true do
let r = r#input_record() in
w # output_record r
done
with End_of_file ->
w # flush()
method sorter =
Mapred_sorters.generic_sorter
~hash:(Mapred_sorters.String_asc.hash)
~cmp:(Mapred_sorters.String_asc.cmp)
method combine _ _ _ = None
method input_record_io me jc =
Mapred_io.line_structured_format()
method internal_record_io me jc =
Mapred_io.var_size_format()
method output_record_io me jc =
Mapred_io.line_structured_format()
end
Let's first have a look at map
and reduce
. map
is a function
Mapred_def.mapred_env
->
Mapred_def.mapred_job_config
->
task_info
->
Mapred_io.record_reader
->
Mapred_io.record_writer
->
unit
The record_reader
argument is an object allowing access to the
input file. The record_writer
argument is the object for writing
to the output file. map
now typically reads the lines of the
input (using input_record
) and writes lines to the output (via
output_record
).
map
is completely on its own for interpreting the inputs. It is not
required that there is a key and a value - the input can be arbitrary.
The output, however, should provide a key and a value. This should be
done in a way so that the extract_key
function can extract the key
from the written lines. We use here Mapred_split.tab_split_key
to
get everything until the first TAB as key.
The Mapred_def.mapred_env
object allows access to other PlasmaFS
files (using the open_cluster
method), to the config file, and to
(optional) command-line arguments provided at exec_job
time.
reduce
has exactly the same signature as map
. The
Mapred_io.record_reader
object, however, is now connected with the
intermediate file containing all records for the partition passed as
int
. These records comply to the key/value format.
There are more functions in the job object than just map
and
reduce
:
partition_of_key
is the function that determines into which
partition (0 to partitions-1
) a key is put. The given definition
(Hashtbl.hash key) mod partitions
works well if there are no
special requirements.extract_key
defines how to get the key from a processed lineMapred_def.mapred_job
.
Transscript of a job run
The transscript of a slightly different job is documented here:
Sess_mr_wordfreq
.
Setting
bigblock_size
The default size of bigblocks is 16M. This mainly means that the input
data is initially split into units of bigblocks, and every map
task
processes a whole number of such bigblocks. Because every map
task
needs to process at least one record (line), records must not be
larger than bigblocks:
The size of bigblocks defines the maximum size of records (lines).
Other than this, the user can normally ignore the size of bigblocks. When changing this parameter, this should only affect performance, but not the semantics of the map/reduce program.
Increasing the size of bigblocks has these effects:
map
taskmap
task becomes more coarse-grained
and more random (it is likelier that more datanodes store the blocks
in the file region)bigblock_size
too high, there is the risk that not enough
map
tasks can be generated to distribute the load evenly on all
nodes (i.e. total_data_size / bigblock_size
should be at least a small
multiple of the number of nodes). There is no danger that too many
map
tasks could be generated if bigblock_size
was way too low, because
a single map
task can process several bigblocks.
Setting the buffer size:
buffer_size
and buffer_size_tight
Remember that all data files, including intermediate ones, and stored
in PlasmaFS. The amount of RAM spent for PlasmaFS file buffers is
crucial because of this (and there is no page cache sitting in the
background compensating for dimensioning errors). So, the parameter
buffer_size
means:
buffer_size
is the sum of all required file buffers
(but, of course, a file buffer has at least one block)buffer_size
bytes for reading
data, and another buffer_size
bytes for writing data. It is obvious
that this parameter has a direct and quite predictable effect on the
RAM consumption.
There is also an effect on performance: If you give more RAM for file buffers, this
buffer_size
can be set higher or lower than
this.
If a machine runs out of RAM, the tasks will fail. In order to prevent
this, a mechanism was added to reduce the memory consumption for
file buffers when RAM is already tight, and the failure is foreseeable.
The parameter buffer_size_tight
is used instead of buffer_size
when free RAM is considered as tight. Set it to e.g. 1/4 of buffer_size
.
The parameters
merge_limit
and split_limit
These parameters can be set in the mapredjob
section:
mapredjob {
...
merge_limit = 32;
split_limit = 32;
};
They have a direct effect on layout of the map/reduce algorithm. After mapping
and sorting the data, the intermediate files need to be merged. We
merge merge_limit
sorted file in one step. At the same time, the data is
split into partitions. We split into split_limit
partitions in one
step. (Note that merging and splitting is done "hand-in-hand" by one
processing step.)
How many files need to be merged? This is something close to
total_data_size / sort_limit
. So, if you have e.g. 100G of data and
a sort_limit
of 256M, you can expect 400 such files.
How many steps do we need for merging? We can merge merge_limit ^ n
files with n
steps. So for merge_limit=32
we could already merge
1024 files in two steps, and 32768 files in only three steps.
For the 400 files of the example we need two steps.
A similar calculation can be done for the splitting into partitions. The number of partitions is, of course, usually much lower, and only a small multiple of the number of nodes. So the merging-and-splitting phase of the algorithm is normally dominated by merging.
If you increase these values, you decrease the number of steps the algorithm needs, and also the number of data copies (each step copies the data once, but usually only locally, and not over the network).
Enhanced map tasks
Plasma-0.4 contains an alternate implementation of map
. It can be
enabled by
mapredjob {
...
enhanced_mapping = <m>;
};
where <m>
is a number. Enhanced map
tasks integrate the sort step,
and even a first split step. The number <m>
is the number of
partitions that are generated in this step. Usually, one sets
m = split_limit
.
A radical approach would be to set enhanced_mapping = partitions
. This
setting is optimal for reducing data copying as much as possible, and
for minimizing network utilization. It has also a downside, though:
The likeliness of random seeks increases. Also, a high number of
intermediate files is created.
A good compromise seems to be enhanced_mapping = partitions/split_limit
(round the division up). This takes into account that we always need a
round of steps for merging files, and a merge step can
also split simultaneously, and this is practically cost-free. If you
can predict several rounds of merging, you can decrease
enhanced_mapping
even further.
Installing additional files on the task nodes
By setting task_files
(in the mapredjob
section) one can install
additional files on all task nodes at job start time. task_files
is a space-separated list of filenames. The files are installed in
an automatically determined directory on the task nodes.
Right now there is no direct way to get the path of this directory. One has to first create the task manager object:
let tm = Mapred_taskfiles.scp_taskfile_manager me#config jc
me
and jc
are passed as first and second argument of map
and
reduce
.
Now the directory is tm#local_directory
.
Note that these files are deleted when the job finishes.
Writing additional log files
Tasks write some log files by default:
Netlog.logf
output is written to a file <log_dir>/<prefix><level>.log
<log_dir>/<prefix>stderr.log
<log_dir>
is first a local directory on the task node. When the
task finishes, all the files with the right <prefix>
are moved to the
configured PlasmaFS log directory. (Every task uses a different
<prefix>
, but all tasks write to the same directory.)
Actually, all files with this <prefix>
are finally moved, not only those
written by the map/reduce framework. So custom code can write additional
files there, and these files are handled in the same way.
The <log_dir>
can be obtained by tm#log_directory
(when tm
is the
taskfile manager, see above).
The <prefix>
can be obtained by ti#task_prefix
when ti
is the
task_info
argument passed to map
and reduce
.
Files in <log_dir>
not having the right filename prefix are also moved
to the PlasmaFS log directory, but only at job termination time.
Streaming
Streaming means to start subprocesses for map
and reduce
.
See Mapred_streaming
for details.