This type of job splits text input files into words, and counts the number of occurrences of each word. For example, if the input is
THE QUICK BROWN FOX JUMPS
OVER THE LAZY DOG
the expected output is
BROWN 1
DOG 1
FOX 1
LAZY 1
OVER 1
QUICK 1
THE 2
The output is sorted by words. We don't do any normalizations of the input (like switching to a uniform case, or processing special characters).
The example code is in examples/mapred-classic/mr_wordfreq.ml
in the
tar ball. Go into this directory, and build the program mr_wordfreq
:
$ cd examples/mapred-classic
$ omake mr_wordfreq
(If you haven't installed Plasma, you might need
OCAMLPATH=../../src omake mr_wordfreq
instead.)
We will run the job twice:
$ mkdir ~/tmp/mr
$ mkdir ~/tmp/mr/input ~/tmp/mr/output ~/tmp/mr/work ~/tmp/mr/log
We download a few RFC pages from IETF as test data:
$ cd ~/tmp/mr
$ wget 'ftp://ftp.rfc-editor.org/in-notes/tar/RFCs0001-0500.tar.gz'
--2012-01-18 15:52:20-- ftp://ftp.rfc-editor.org/in-notes/tar/RFCs0001-0500.tar.gz
=> `RFCs0001-0500.tar.gz.1'
Resolving ftp.rfc-editor.org... 12.22.58.47
Connecting to ftp.rfc-editor.org|12.22.58.47|:21... connected.
Logging in as anonymous ... Logged in!
==> SYST ... done. ==> PWD ... done.
==> TYPE I ... done. ==> CWD (1) /in-notes/tar ... done.
==> SIZE RFCs0001-0500.tar.gz ... 24353681
==> PASV ... done. ==> RETR RFCs0001-0500.tar.gz ... done.
Length: 24353681 (23M) (unauthoritative)
2012-01-18 15:55:34 (124 KB/s) - `RFCs0001-0500.tar.gz.1' saved [24353681]
$ cd input
$ tar xzf ../RFCs0001-0500.tar.gz
$ rm *.pdf *.ps
Note the "rm" - the download contains also some PDF's and Postscript files. We only want the text files, though. The total volume is around 5M. This is not very much. A bigger download is
ftp://ftp.rfc-editor.org/in-notes/tar/RFC-all.tar.gz
with more than 100 MB. In the map/reduce context this is still very small, but for testing it may already be good enough.
Now go back to the directory with the program you've just built, and
edit mr_wordfreq.conf
, so it reads like (adjust the paths):
netplex {
namenodes {
disabled = true;
};
mapred {
node { addr = "localhost" };
port = 8989;
tmpdir = "/tmp/mapred";
buffer_size = 268435456; (* 256 M *)
buffer_size_tight = 67108864; (* 64 M *)
sort_size = 50000000;
};
mapredjob {
input_dir = "file::/home/gerd/tmp/mr/input";
output_dir = "file::/home/gerd/tmp/mr/output";
work_dir = "file::/home/gerd/tmp/mr/work";
log_dir = "file::/home/gerd/tmp/mr/log";
partitions = 4;
merge_limit = 64;
split_limit = 4;
}
}
This is done by:
$ ./mr_wordfreq start_task_servers
Servers started
This looks like:
$ ./mr_wordfreq exec_job
--------------------------------------------------------
Job name: mr_8567c7f3e78b7ebb2ff1c12c056a62b4
Job ID: mr_8567c7f3e78b7ebb2ff1c12c056a62b4
Mapred_fs input dir: file::/home/gerd/tmp/mr/input
Mapred_fs output dir: file::/home/gerd/tmp/mr/output
Mapred_fs work dir: file::/home/gerd/tmp/mr/work
Mapred_fs log dir: file::/home/gerd/tmp/mr/log
Number task nodes: 1
Task server dir: /tmp/mapred
--------------------------------------------------------
Checking...
Planning...
Pre-start hook...
Starting...
Checking executable versions
Starting job
Tasks: runnable=1 running=0 finished=0 total=1 complete=false
Load: I/O=0.000000 CPU=0.000000
Submitted task Map 0 #000000 to 192.168.5.10
Job can be interrupted with CTRL-C!
Finished task Map 0 #000000
Tasks: runnable=1 running=0 finished=1 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Sort 0 #000001 to 192.168.5.10
Finished task Sort 0 #000001
Tasks: runnable=1 running=0 finished=2 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Shuffle s0-0/p0-3 #000002 to 192.168.5.10
Finished task Shuffle s0-0/p0-3 #000002
Tasks: runnable=4 running=0 finished=3 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Shuffle s0-0/p0-0 #000003 to 192.168.5.10
Submitted task Shuffle s0-0/p1-1 #000004 to 192.168.5.10
Submitted task Shuffle s0-0/p2-2 #000005 to 192.168.5.10
Submitted task Shuffle s0-0/p3-3 #000006 to 192.168.5.10
Finished task Shuffle s0-0/p2-2 #000005
Tasks: runnable=0 running=3 finished=4 total=7 complete=true
Load: I/O=1.500000 CPU=1.500000
Finished task Shuffle s0-0/p1-1 #000004
Tasks: runnable=0 running=2 finished=5 total=7 complete=true
Load: I/O=1.000000 CPU=1.000000
Finished task Shuffle s0-0/p3-3 #000006
Tasks: runnable=0 running=1 finished=6 total=7 complete=true
Load: I/O=0.500000 CPU=0.500000
Finished task Shuffle s0-0/p0-0 #000003
Tasks: runnable=0 running=0 finished=7 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Finished job
Counter write_lines 1974125.00
Counter write_fs_time 0.04
Counter write_bytes 7798784.00
Counter write_blocks 132.00
Counter time_underutil 100.00
Counter sort_time 0.74
Counter runtime 5.86
Counter read_lines 1963783.00
Counter read_fs_time 0.24
Counter read_bytes 16237239.00
Counter read_blocks 675.00
Counter num_tasks_successful 7.00
Counter num_tasks_misalloc 0.00
Counter num_input_files_nonlocal 0.00
Counter num_input_files 7.00
Counter avg_runqueue_tasks 1.23
Counter avg_running_tasks 1.23
Wallclock runtime: 5.856329 seconds
Job_status: successful
Cleanup...
Sending cleanup tasks
Cleaning rest up
Done clean-up
Post-finish hook...
Job_status: successful
Finished.
First, notice that ~/tmp/mr/work
is empty again, and that
~/tmp/mr/log
contains log files for each task. The real result
is found in ~/tmp/mr/output
:
$ ls -l ~/tmp/mr/output
total 540
-rw-r--r-- 1 gerd gerd 136772 Jan 18 15:58 partition_0
-rw-r--r-- 1 gerd gerd 130392 Jan 18 15:58 partition_1
-rw-r--r-- 1 gerd gerd 130156 Jan 18 15:58 partition_2
-rw-r--r-- 1 gerd gerd 132178 Jan 18 15:58 partition_3
As we configured 4 partitions, the system split the data into these partitions. Each file is sorted by key, and includes the frequencies in the second field:
...
McKenzie 221
McKenzie, 10
McLean, 25
McQuillan 9
McQuillan, 1
Mckenzie 1
Mcquillan 1
Meadows 1
Mealy. 1
Meaningful 1
Meanwhile, 1
Measurement 41
Meetings 3
Mehmet 1
Memorial 1
Merge 1
Merit-Univ. 2
Message' 1
Message). 1
Messages 28
Messages, 1
Meta-compilers 3
Metcalf 2
Metcalfe) 1
Method-1) 1
Meyer. 1
Meyer: 22
Michel 1
Michigan 21
Michigan, 3
Mikan 1
Mil 1
Milt 1
Minus 8
Mirsad 4
...
First check whether you can access PlasmaFS:
$ plasma fsstat
BLOCK STATS:
Total: 819200 blocks
Used: 19916 blocks
Transitional: 0 blocks
Free: 799284 blocks
DATANODE STATS:
Enabled nodes: 1
Alive nodes: 1
If the plasma
utility is not found, extend the PATH variable.
If this does not work, this can have numerous reasons:
~/.plasmafs
. If not, read
Testing with the plasma clientps aux|grep authnode.conf
. It must run on the computer where
you will give the exec_job
command later. If not,
add this host to authnode.hosts
, and redeploy (see
Configuring an instance).
$ plasma mkdir /input /output /work /log
Created dir: /input
Committed.
Created dir: /output
Committed.
Created dir: /work
Committed.
Created dir: /log
Committed.
Copy the data to PlasmaFS. For simplicity, we just cat here all *.txt files together and store all texts in one file:
$ cd ~/tmp/mr/input
$ cat *.txt | plasma put -stdin /input/rfc.txt
Copied: <stdin> -> /input/rfc.txt
Edit mr_wordfreq.conf
again, so it reads like:
netplex {
namenodes {
clustername = "p1"; (* CHANGE THIS *)
node { addr = "office1:2730" }; (* CHANGE THIS *)
};
mapred {
node { addr = "office1" }; (* CHANGE THIS. ADD MORE NODES! *)
port = 8989;
tmpdir = "/tmp/mapred";
buffer_size = 268435456; (* 256 M *)
buffer_size_tight = 67108864; (* 64 M *)
sort_size = 50000000;
};
mapredjob {
input_dir = "plasma::/input";
output_dir = "plasma::/output";
work_dir = "plasma:/work";
log_dir = "plasma::/log";
partitions = 4;
merge_limit = 64;
split_limit = 4;
}
}
This is done by:
$ ./mr_wordfreq start_task_servers
Servers started
This looks like:
$ ./mr_wordfreq exec_job
--------------------------------------------------------
Job name: mr_437cd6f6ffc4435866a25ff0cafeaad4
Job ID: mr_437cd6f6ffc4435866a25ff0cafeaad4
Mapred_fs input dir: plasma::/input
Mapred_fs output dir: plasma::/output
Mapred_fs work dir: plasma::/work
Mapred_fs log dir: plasma::/log
Number task nodes: 1
Task server dir: /tmp/mapred
--------------------------------------------------------
Checking...
Planning...
Pre-start hook...
Starting...
Checking executable versions
Starting job
Tasks: runnable=1 running=0 finished=0 total=1 complete=false
Load: I/O=0.000000 CPU=0.000000
Submitted task Map 0 #000000 to 192.168.5.10
Job can be interrupted with CTRL-C!
Finished task Map 0 #000000
Tasks: runnable=1 running=0 finished=1 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Sort 0 #000001 to 192.168.5.10
Finished task Sort 0 #000001
Tasks: runnable=1 running=0 finished=2 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Shuffle s0-0/p0-3 #000002 to 192.168.5.10
Finished task Shuffle s0-0/p0-3 #000002
Tasks: runnable=4 running=0 finished=3 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Shuffle s0-0/p0-0 #000003 to 192.168.5.10
Submitted task Shuffle s0-0/p1-1 #000004 to 192.168.5.10
Submitted task Shuffle s0-0/p2-2 #000005 to 192.168.5.10
Submitted task Shuffle s0-0/p3-3 #000006 to 192.168.5.10
Finished task Shuffle s0-0/p0-0 #000003
Tasks: runnable=0 running=3 finished=4 total=7 complete=true
Load: I/O=1.500000 CPU=1.500000
Finished task Shuffle s0-0/p1-1 #000004
Tasks: runnable=0 running=2 finished=5 total=7 complete=true
Load: I/O=1.000000 CPU=1.000000
Finished task Shuffle s0-0/p3-3 #000006
Tasks: runnable=0 running=1 finished=6 total=7 complete=true
Load: I/O=0.500000 CPU=0.500000
Finished task Shuffle s0-0/p2-2 #000005
Tasks: runnable=0 running=0 finished=7 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Finished job
Counter write_lines 1974125.00
Counter write_fs_time 0.85
Counter write_bytes 7798784.00
Counter write_blocks 132.00
Counter time_underutil 100.00
Counter sort_time 0.70
Counter runtime 12.68
Counter read_lines 2042676.00
Counter read_fs_time 0.61
Counter read_bytes 16237239.00
Counter read_blocks 251.00
Counter num_tasks_successful 7.00
Counter num_tasks_misalloc 0.00
Counter num_input_files_nonlocal 0.00
Counter num_input_files 7.00
Counter avg_runqueue_tasks 1.49
Counter avg_running_tasks 1.49
Wallclock runtime: 12.675212 seconds
Job_status: successful
Cleanup...
Sending cleanup tasks
Cleaning rest up
Done clean-up
Post-finish hook...
Job_status: successful
Finished.
Note that this took a bit longer than before. The main reason for the difference is that PlasmaFS always writes files synchronously.
Now the result is in PlasmaFS. It should look exactly identical:
$ plasma ls /output
-rw-r--r-- gerd gerd 136772 2012-01-18 16:36 partition_0
-rw-r--r-- gerd gerd 130392 2012-01-18 16:36 partition_1
-rw-r--r-- gerd gerd 130156 2012-01-18 16:36 partition_2
-rw-r--r-- gerd gerd 132178 2012-01-18 16:36 partition_3