Plasma GitLab Archive

Sess_mr_wordfreq


Transcript: Running the classic map/reduce job mr_wordfreq

This job splits text input files into words and counts how often each word occurs. For example, if the input is

THE QUICK BROWN FOX JUMPS 
OVER THE LAZY DOG

the expected output is

BROWN 1
DOG 1
FOX 1
JUMPS 1
LAZY 1
OVER 1
QUICK 1
THE 2

The output is sorted by word. The job does not normalize the input in any way: it neither converts words to a uniform case nor treats punctuation or other special characters specially.
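The computation can be illustrated with a small sequential OCaml sketch that follows the map/reduce pattern: the map step emits a pair (word, 1) for every word, the pairs are sorted by word, and the reduce step adds up the counts of each run of equal words. This is not the code in mr_wordfreq.ml and does not use Plasma's Mapred API; it assumes that words are separated by blanks or tabs, which fits the description above (no case folding, punctuation stays part of the word).

```ocaml
(* Sequential simulation of the word-frequency job: map -> sort -> reduce.
   Illustration only; this is neither mr_wordfreq.ml nor Plasma's Mapred API.
   Assumption: words are separated by blanks or tabs within a line. *)

(* Split one line into words, dropping empty fields. *)
let split_words line =
  String.split_on_char ' ' line
  |> List.map (String.split_on_char '\t')
  |> List.concat
  |> List.filter (fun w -> w <> "")

(* Map: emit one (word, 1) pair per word occurrence. *)
let map_phase lines =
  List.concat (List.map (fun l -> List.map (fun w -> (w, 1)) (split_words l)) lines)

(* Sort: order the pairs by word, comparing strings byte by byte. *)
let sort_phase pairs =
  List.stable_sort (fun (a, _) (b, _) -> String.compare a b) pairs

(* Reduce: add up the counts of each run of equal words.
   This relies on the input being sorted. *)
let reduce_phase sorted =
  let rec go acc = function
    | [] -> List.rev acc
    | (w, n) :: rest ->
      (match acc with
       | (w', total) :: acc' when w' = w -> go ((w, total + n) :: acc') rest
       | _ -> go ((w, n) :: acc) rest)
  in
  go [] sorted

let () =
  [ "THE QUICK BROWN FOX JUMPS"; "OVER THE LAZY DOG" ]
  |> map_phase
  |> sort_phase
  |> reduce_phase
  |> List.iter (fun (w, n) -> Printf.printf "%s %d\n" w n)
```

Run on the two example lines, it prints one line per distinct word in sorted order, with THE 2 and a count of 1 for every other word. In the real job these steps run as separate tasks on files rather than on one in-memory list.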

Preparations

The example code is in examples/mapred-classic/mr_wordfreq.ml in the tarball. Change into this directory and build the program mr_wordfreq:

$ cd examples/mapred-classic
$ omake mr_wordfreq

(If you haven't installed Plasma, you might need OCAMLPATH=../../src omake mr_wordfreq instead.)

We will run the job twice:

  • with PlasmaFS disabled, using the local filesystem for all data
  • with PlasmaFS enabled

For the first run, you need a directory where you can store the files:

$ mkdir -p ~/tmp/mr/input ~/tmp/mr/output ~/tmp/mr/work ~/tmp/mr/log

Getting test data

As test data, we download a few hundred RFCs from the RFC Editor's FTP server:

$ cd ~/tmp/mr
$ wget 'ftp://ftp.rfc-editor.org/in-notes/tar/RFCs0001-0500.tar.gz'
--2012-01-18 15:52:20--  ftp://ftp.rfc-editor.org/in-notes/tar/RFCs0001-0500.tar.gz
           => `RFCs0001-0500.tar.gz.1'
Resolving ftp.rfc-editor.org... 12.22.58.47
Connecting to ftp.rfc-editor.org|12.22.58.47|:21... connected.
Logging in as anonymous ... Logged in!
==> SYST ... done.    ==> PWD ... done.
==> TYPE I ... done.  ==> CWD (1) /in-notes/tar ... done.
==> SIZE RFCs0001-0500.tar.gz ... 24353681
==> PASV ... done.    ==> RETR RFCs0001-0500.tar.gz ... done.
Length: 24353681 (23M) (unauthoritative)
2012-01-18 15:55:34 (124 KB/s) - `RFCs0001-0500.tar.gz.1' saved [24353681]

$ cd input
$ tar xzf ../RFCs0001-0500.tar.gz 
$ rm *.pdf *.ps

Note the rm: the archive also contains some PDF and PostScript files, but we only want the text files. The total volume is around 5 MB, which is not very much. A bigger download is

 ftp://ftp.rfc-editor.org/in-notes/tar/RFC-all.tar.gz 

with more than 100 MB. In the map/reduce context this is still very small, but it may already be good enough for testing.

Changing the configuration file

Now go back to the directory containing the program you have just built, and edit mr_wordfreq.conf so that it reads as follows (adjust the paths):

netplex {
  namenodes {
    disabled = true;
  };
  mapred {
    node { addr = "localhost" };
    port = 8989;
    tmpdir = "/tmp/mapred";
    buffer_size = 268435456;      (* 256 M *)
    buffer_size_tight = 67108864; (* 64 M *)
    sort_size = 50000000;
  };
  mapredjob {
    input_dir = "file::/home/gerd/tmp/mr/input";
    output_dir = "file::/home/gerd/tmp/mr/output";
    work_dir = "file::/home/gerd/tmp/mr/work";
    log_dir = "file::/home/gerd/tmp/mr/log";
    partitions = 4;
    merge_limit = 64;
    split_limit = 4;
  }
}
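The values in this file can be cross-checked with a few lines of OCaml. The sketch below uses hypothetical helpers (mib, mapred_fs_path, job_dirs), not part of Plasma: it recomputes the byte sizes behind the (* 256 M *) and (* 64 M *) comments and builds the four directory values from one base directory. It assumes, based only on the examples in this transcript, that a Mapred_fs path is written as a tree name such as file or plasma, followed by :: and an absolute path.

```ocaml
(* Hypothetical helpers for writing mr_wordfreq.conf; not part of Plasma.
   Assumption: Mapred_fs paths have the form "<tree>::<absolute path>". *)

let mib n = n * 1024 * 1024

(* The byte values used in the mapred section. *)
let buffer_size = mib 256        (* 268435456 *)
let buffer_size_tight = mib 64   (* 67108864 *)

let mapred_fs_path tree path = tree ^ "::" ^ path

(* The four job directories below a common base directory. *)
let job_dirs tree base =
  List.map
    (fun name -> (name ^ "_dir", mapred_fs_path tree (Filename.concat base name)))
    [ "input"; "output"; "work"; "log" ]

let () =
  Printf.printf "buffer_size = %d;\nbuffer_size_tight = %d;\n"
    buffer_size buffer_size_tight;
  List.iter
    (fun (key, v) -> Printf.printf "%s = %S;\n" key v)
    (job_dirs "file" "/home/gerd/tmp/mr")
```

With the base directory /home/gerd/tmp/mr this reproduces the four *_dir lines of the mapredjob section above.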

Start the task server

This is done by:

$ ./mr_wordfreq start_task_servers
Servers started

Run the job

This looks like:

$ ./mr_wordfreq exec_job
--------------------------------------------------------
Job name:             mr_8567c7f3e78b7ebb2ff1c12c056a62b4
Job ID:               mr_8567c7f3e78b7ebb2ff1c12c056a62b4
Mapred_fs input dir:  file::/home/gerd/tmp/mr/input
Mapred_fs output dir: file::/home/gerd/tmp/mr/output
Mapred_fs work dir:   file::/home/gerd/tmp/mr/work
Mapred_fs log dir:    file::/home/gerd/tmp/mr/log
Number task nodes:    1
Task server dir:      /tmp/mapred
--------------------------------------------------------
Checking...
Planning...
Pre-start hook...
Starting...
Checking executable versions
Starting job
Tasks: runnable=1 running=0 finished=0 total=1 complete=false
Load: I/O=0.000000 CPU=0.000000
Submitted task Map 0 #000000 to 192.168.5.10
Job can be interrupted with CTRL-C!
Finished task Map 0 #000000
Tasks: runnable=1 running=0 finished=1 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Sort 0 #000001 to 192.168.5.10
Finished task Sort 0 #000001
Tasks: runnable=1 running=0 finished=2 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Shuffle s0-0/p0-3 #000002 to 192.168.5.10
Finished task Shuffle s0-0/p0-3 #000002
Tasks: runnable=4 running=0 finished=3 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Shuffle s0-0/p0-0 #000003 to 192.168.5.10
Submitted task Shuffle s0-0/p1-1 #000004 to 192.168.5.10
Submitted task Shuffle s0-0/p2-2 #000005 to 192.168.5.10
Submitted task Shuffle s0-0/p3-3 #000006 to 192.168.5.10
Finished task Shuffle s0-0/p2-2 #000005
Tasks: runnable=0 running=3 finished=4 total=7 complete=true
Load: I/O=1.500000 CPU=1.500000
Finished task Shuffle s0-0/p1-1 #000004
Tasks: runnable=0 running=2 finished=5 total=7 complete=true
Load: I/O=1.000000 CPU=1.000000
Finished task Shuffle s0-0/p3-3 #000006
Tasks: runnable=0 running=1 finished=6 total=7 complete=true
Load: I/O=0.500000 CPU=0.500000
Finished task Shuffle s0-0/p0-0 #000003
Tasks: runnable=0 running=0 finished=7 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Finished job
Counter write_lines           1974125.00
Counter write_fs_time         0.04
Counter write_bytes           7798784.00
Counter write_blocks          132.00
Counter time_underutil        100.00
Counter sort_time             0.74
Counter runtime               5.86
Counter read_lines            1963783.00
Counter read_fs_time          0.24
Counter read_bytes            16237239.00
Counter read_blocks           675.00
Counter num_tasks_successful  7.00
Counter num_tasks_misalloc    0.00
Counter num_input_files_nonlocal 0.00
Counter num_input_files       7.00
Counter avg_runqueue_tasks    1.23
Counter avg_running_tasks     1.23
Wallclock runtime:            5.856329 seconds
Job_status: successful
Cleanup...
Sending cleanup tasks
Cleaning rest up
Done clean-up
Post-finish hook...
Job_status: successful
Finished.

Looking at the result

First, notice that ~/tmp/mr/work is empty again, and that ~/tmp/mr/log contains log files for each task. The actual result is in ~/tmp/mr/output:

$ ls -l ~/tmp/mr/output
total 540
-rw-r--r-- 1 gerd gerd 136772 Jan 18 15:58 partition_0
-rw-r--r-- 1 gerd gerd 130392 Jan 18 15:58 partition_1
-rw-r--r-- 1 gerd gerd 130156 Jan 18 15:58 partition_2
-rw-r--r-- 1 gerd gerd 132178 Jan 18 15:58 partition_3

Because we configured 4 partitions, the output is split into four files. Each file is sorted by key (the word) and gives the frequency in the second field:

...
McKenzie 221
McKenzie, 10
McLean, 25
McQuillan 9
McQuillan, 1
Mckenzie 1
Mcquillan 1
Meadows 1
Mealy. 1
Meaningful 1
Meanwhile, 1
Measurement 41
Meetings 3
Mehmet 1
Memorial 1
Merge 1
Merit-Univ. 2
Message' 1
Message). 1
Messages 28
Messages, 1
Meta-compilers 3
Metcalf 2
Metcalfe) 1
Method-1) 1
Meyer. 1
Meyer: 22
Michel 1
Michigan 21
Michigan, 3
Mikan 1
Mil 1
Milt 1
Minus 8
Mirsad 4
...
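Two properties of this output can be reproduced in a short OCaml sketch. First, every word ends up in exactly one of the four partition files; the sketch assumes a hash-based assignment (Hashtbl.hash key mod partitions), a common choice but not necessarily the function mr_wordfreq uses, so the file a given word lands in will differ. Second, the order inside the excerpt above is consistent with plain byte-wise string comparison as implemented by OCaml's String.compare: uppercase letters sort before lowercase ones (McQuillan, before Mckenzie), and a word sorts before the same word with trailing punctuation (McKenzie before McKenzie,).

```ocaml
(* Sketch: assigning keys to partitions and sorting each partition.
   Assumption: hash partitioning; mr_wordfreq may use a different function. *)

let partitions = 4

(* Hashtbl.hash is non-negative, so the result is in 0 .. partitions-1. *)
let partition_of key = Hashtbl.hash key mod partitions

(* Distribute (word, count) pairs over the buckets and sort each bucket
   byte-wise by word, like the partition_<n> files. *)
let partition_and_sort pairs =
  let buckets = Array.make partitions [] in
  List.iter
    (fun ((w, _) as p) ->
      let i = partition_of w in
      buckets.(i) <- p :: buckets.(i))
    pairs;
  Array.map (List.sort (fun (a, _) (b, _) -> String.compare a b)) buckets

let () =
  let pairs = [ ("Mckenzie", 1); ("McQuillan,", 1); ("McKenzie", 221);
                ("McKenzie,", 10); ("Meyer:", 22); ("Meyer.", 1) ] in
  Array.iteri
    (fun i bucket ->
      Printf.printf "partition_%d:\n" i;
      List.iter (fun (w, n) -> Printf.printf "  %s %d\n" w n) bucket)
    (partition_and_sort pairs)
```

Because each file is sorted independently, the files only need to be concatenated or merged if a single globally sorted list is wanted.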

Copy the data to PlasmaFS

First check whether you can access PlasmaFS:

$ plasma fsstat
BLOCK STATS:
Total:                      819200 blocks
Used:                        19916 blocks
Transitional:                    0 blocks
Free:                       799284 blocks

DATANODE STATS:
Enabled nodes:   1
Alive nodes:     1

If the plasma utility is not found, extend the PATH variable.

If this still does not work, there can be numerous reasons:

Once this works, create the directories:

$ plasma mkdir /input /output /work /log
Created dir: /input
Committed.
Created dir: /output
Committed.
Created dir: /work
Committed.
Created dir: /log
Committed.

Now copy the data to PlasmaFS. For simplicity, we just concatenate all *.txt files and store the text in a single file:

$ cd ~/tmp/mr/input
$ cat *.txt | plasma put -stdin /input/rfc.txt
Copied: <stdin> -> /input/rfc.txt

Changing the configuration file

Edit mr_wordfreq.conf again so that it reads as follows (adjust the lines marked CHANGE THIS):

netplex {
  namenodes {
    clustername = "p1";                  (* CHANGE THIS *)
    node { addr = "office1:2730" };      (* CHANGE THIS *)
  };
  mapred {
    node { addr = "office1" };           (* CHANGE THIS. ADD MORE NODES! *)
    port = 8989;
    tmpdir = "/tmp/mapred";
    buffer_size = 268435456;      (* 256 M *)
    buffer_size_tight = 67108864; (* 64 M *)
    sort_size = 50000000;
  };
  mapredjob {
    input_dir = "plasma::/input";
    output_dir = "plasma::/output";
    work_dir = "plasma::/work";
    log_dir = "plasma::/log";
    partitions = 4;
    merge_limit = 64;
    split_limit = 4;
  }
}
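Every directory in the mapredjob section has to name both the tree and the path. A small check like the following can catch a malformed value, for example one with a single colon or without a tree name, before the job is started. parse_mapred_fs_path and check_dirs are hypothetical helpers, not part of Plasma, and they assume the <tree>::<absolute path> form seen in the job output (plasma::/input and so on).

```ocaml
(* Hypothetical check for Mapred_fs paths in mr_wordfreq.conf.
   Assumption: a valid value looks like "<tree>::<absolute path>". *)

(* Return Some (tree, path) if s has the expected form, None otherwise. *)
let parse_mapred_fs_path s =
  let n = String.length s in
  (* Index of the first "::" in s, if any. *)
  let rec find i =
    if i + 1 >= n then None
    else if s.[i] = ':' && s.[i + 1] = ':' then Some i
    else find (i + 1)
  in
  match find 0 with
  | Some i when i > 0 ->
    let path = String.sub s (i + 2) (n - i - 2) in
    if String.length path > 0 && path.[0] = '/'
    then Some (String.sub s 0 i, path)
    else None
  | _ -> None

(* Print the parsed form of each value, or flag it as malformed. *)
let check_dirs dirs =
  List.iter
    (fun (key, value) ->
      match parse_mapred_fs_path value with
      | Some (tree, path) -> Printf.printf "%s: tree=%s path=%s\n" key tree path
      | None -> Printf.printf "%s: malformed value %S\n" key value)
    dirs

let () =
  check_dirs [ ("input_dir", "plasma::/input"); ("output_dir", "plasma::/output");
               ("work_dir", "plasma:/work"); ("log_dir", "plasma::/log") ]
```

In this example, only work_dir is reported as malformed.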

Start the task server

This is done by:

$ ./mr_wordfreq start_task_servers
Servers started

Run the job

This looks like:

$ ./mr_wordfreq exec_job

--------------------------------------------------------
Job name:             mr_437cd6f6ffc4435866a25ff0cafeaad4
Job ID:               mr_437cd6f6ffc4435866a25ff0cafeaad4
Mapred_fs input dir:  plasma::/input
Mapred_fs output dir: plasma::/output
Mapred_fs work dir:   plasma::/work
Mapred_fs log dir:    plasma::/log
Number task nodes:    1
Task server dir:      /tmp/mapred
--------------------------------------------------------
Checking...
Planning...
Pre-start hook...
Starting...
Checking executable versions
Starting job
Tasks: runnable=1 running=0 finished=0 total=1 complete=false
Load: I/O=0.000000 CPU=0.000000
Submitted task Map 0 #000000 to 192.168.5.10
Job can be interrupted with CTRL-C!
Finished task Map 0 #000000
Tasks: runnable=1 running=0 finished=1 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Sort 0 #000001 to 192.168.5.10
Finished task Sort 0 #000001
Tasks: runnable=1 running=0 finished=2 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Shuffle s0-0/p0-3 #000002 to 192.168.5.10
Finished task Shuffle s0-0/p0-3 #000002
Tasks: runnable=4 running=0 finished=3 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Submitted task Shuffle s0-0/p0-0 #000003 to 192.168.5.10
Submitted task Shuffle s0-0/p1-1 #000004 to 192.168.5.10
Submitted task Shuffle s0-0/p2-2 #000005 to 192.168.5.10
Submitted task Shuffle s0-0/p3-3 #000006 to 192.168.5.10
Finished task Shuffle s0-0/p0-0 #000003
Tasks: runnable=0 running=3 finished=4 total=7 complete=true
Load: I/O=1.500000 CPU=1.500000
Finished task Shuffle s0-0/p1-1 #000004
Tasks: runnable=0 running=2 finished=5 total=7 complete=true
Load: I/O=1.000000 CPU=1.000000
Finished task Shuffle s0-0/p3-3 #000006
Tasks: runnable=0 running=1 finished=6 total=7 complete=true
Load: I/O=0.500000 CPU=0.500000
Finished task Shuffle s0-0/p2-2 #000005
Tasks: runnable=0 running=0 finished=7 total=7 complete=true
Load: I/O=0.000000 CPU=0.000000
Finished job
Counter write_lines           1974125.00
Counter write_fs_time         0.85
Counter write_bytes           7798784.00
Counter write_blocks          132.00
Counter time_underutil        100.00
Counter sort_time             0.70
Counter runtime               12.68
Counter read_lines            2042676.00
Counter read_fs_time          0.61
Counter read_bytes            16237239.00
Counter read_blocks           251.00
Counter num_tasks_successful  7.00
Counter num_tasks_misalloc    0.00
Counter num_input_files_nonlocal 0.00
Counter num_input_files       7.00
Counter avg_runqueue_tasks    1.49
Counter avg_running_tasks     1.49
Wallclock runtime:            12.675212 seconds
Job_status: successful
Cleanup...
Sending cleanup tasks
Cleaning rest up
Done clean-up
Post-finish hook...
Job_status: successful
Finished.

Note that this run took about twice as long as the first one (12.7 instead of 5.9 seconds). The main reason for the difference is that PlasmaFS always writes files synchronously.
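The counters at the end of both transcripts show where the extra time goes: write_fs_time rises from 0.04 to 0.85 seconds, which fits the explanation above, and read_fs_time from 0.24 to 0.61 seconds. The following sketch parses a few Counter lines copied from the two runs and prints the ratio for each counter. parse_counter and counters are hypothetical helpers written for the text format of these lines; they are not part of Plasma.

```ocaml
(* Hypothetical helper: parse the "Counter <name> <value>" lines printed by
   exec_job and compare two runs. Not part of Plasma. *)

(* Split on blanks, drop empty fields, and accept exactly three fields. *)
let parse_counter line =
  match List.filter (fun s -> s <> "") (String.split_on_char ' ' line) with
  | [ "Counter"; name; value ] -> Some (name, float_of_string value)
  | _ -> None

(* Keep only the lines that parse as counters, preserving their order. *)
let counters lines =
  List.fold_right
    (fun l acc -> match parse_counter l with Some c -> c :: acc | None -> acc)
    lines []

(* Values copied from the two job transcripts in this text. *)
let local_run = counters [
  "Counter write_fs_time         0.04";
  "Counter read_fs_time          0.24";
  "Counter runtime               5.86" ]

let plasma_run = counters [
  "Counter write_fs_time         0.85";
  "Counter read_fs_time          0.61";
  "Counter runtime               12.68" ]

let () =
  List.iter
    (fun (name, a) ->
      let b = List.assoc name plasma_run in
      Printf.printf "%-14s %6.2f -> %6.2f  (x%.1f)\n" name a b (b /. a))
    local_run
```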

Looking at the result

Now the result is stored in PlasmaFS. It should be identical to the result of the first run, and indeed the file sizes match:

$ plasma ls /output
-rw-r--r-- gerd gerd 136772 2012-01-18 16:36 partition_0
-rw-r--r-- gerd gerd 130392 2012-01-18 16:36 partition_1
-rw-r--r-- gerd gerd 130156 2012-01-18 16:36 partition_2
-rw-r--r-- gerd gerd 132178 2012-01-18 16:36 partition_3

This web site is published by Informatikbüro Gerd Stolpmann