
Plasmamr_toolkit



The map/reduce toolkit

The Mapred_toolkit is a nice layer on top of the standard way of running map/reduce. It provides:

  • Easy invocation of map/reduce by just calling a function
  • Type-safe representation of records
  • An abstraction for files that comes close to List
  • An abstraction for directories: places
  • An option to run map/reduce locally without distribution for easier testing
It is recommended for everybody whose needs do not exceed its capabilities. It is also a good starting point for beginners.

Examples: The example programs are included in the Plasma tarball, under examples/mapred-toolkit.

It is a framework, not a library

Although you can pick functions out of the mr_framework package and use them like a library, this is not how it is meant to be used. Map/reduce is designed as a distributed algorithm, and this means the code must run on several computers. The framework ensures this, and you have to use it when you invoke functions from Mapred_toolkit.DSeq (which provides the distributed algorithms). This is even the case when you don't use PlasmaFS (the distributed filesystem) and restrict yourself to a single computer, because the map/reduce program is executed in several processes anyway to take advantage of multi-core CPUs.

So, this framework works as follows: You create a single executable which is used both for controlling the algorithm and as a task server for running parts of the algorithm. The executable is copied to all computers where tasks are supposed to run and started there in server mode. mr_framework provides a configurable main program that can start the executable in the required modes, and that can be extended with your own code.

Actually, it is very simple. Your own main program has to look like:

let () =
  Mapred_main.dispatch
    [ ... ]

The list specifies your extensions. For instance, if you want to define a job "my_job", write:

let () =
  Mapred_main.dispatch
    [ "my_job", "This is my_job", Mapred_toolkit.toolkit_job, run_my_job ]

The first string is the name of the job. It is used on the command-line to designate the function to execute. The second string is a description (for -help). With Mapred_toolkit.toolkit_job you indicate that you want to use the toolkit, and not a hand-crafted job. The identifier run_my_job names a function you have to provide. A "hello world" version of run_my_job could be:

let run_my_job me mj =
  Arg.parse
    []
    (fun s -> ())
    "usage: my_job";
  print_endline "Hello world!"

The function takes two arguments:

  • me is a Mapred_def.mapred_env and contains the environment. Here you find all global things, like the overall configuration (me#config), and access to the filesystem (me#filesystem). It is recommended to always have me at hand, and to pass it down to all functions that are invoked by run_my_job.
  • mj is a Mapred_def.mapred_job. This is an object that defines the various map/reduce placeholders like map, sorter, reduce, and combine. Actually, this is just the result of executing the third element of the dispatch list (here Mapred_toolkit.toolkit_job). The job object must exist identically in all running instances of the executable, and because of this it is handled separately. As we are using the toolkit, we don't create the job object ourselves, but just use the pre-made one from the toolkit. This job object defines generic versions of the map/reduce placeholder functions which can be refined later. Note that you normally can just forget about mj if you use the toolkit.
The run_my_job function can parse further arguments with Arg.parse.
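
For example, a slightly extended version could accept a hypothetical -verbose flag (the flag is made up for this sketch; Arg is the standard OCaml argument parser):

let run_my_job me mj =
  let verbose = ref false in
  Arg.parse
    [ "-verbose", Arg.Set verbose, "  print extra messages" ]
    (fun s -> raise (Arg.Bad ("unexpected argument: " ^ s)))
    "usage: my_job [-verbose]";
  if !verbose then print_endline "verbose mode enabled";
  print_endline "Hello world!"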

Now, you create the executable:

ocamlfind ocamlopt -o my_prog \
                   -package mr_framework.toolkit \
                   -syntax camlp4o \
                   -linkpkg \
		   -thread \
		   my_prog.ml

Note that this compiler command also specifies that camlp4 is used as the parser. This is not strictly required but recommended, because it enables the quotations <:rfun< ... >> we'll use later.

Let's see what happens when you start the executable:

$ ./my_prog -help
usage: my_prog [-conf <configfile>] <mode> [-help|options]

  available modes:
  - start_task_servers: install and start task servers
  - stop_task_servers: stop task servers
  - my_job: This is my_job

This means you can run the user-supplied function as well as the two predefined functions "start_task_servers" and "stop_task_servers" (actually, there are a few more such functions not shown in the usage output). Let's start it:

$ ./my_prog my_job
Fatal error: exception Sys_error("./my_prog.conf: No such file or directory")
Raised at file "pervasives.ml", line 284, characters 20-46
Called from file "mapred_main.ml", line 384, characters 11-50
Called from file "mapred_main.ml", line 536, characters 12-44
Called from file "mr_wf_toolkit.ml", line 109, characters 0-93

Oops! The framework expects a configuration file. By default it is expected in the same directory as the executable, and its name is derived from the executable name by appending ".conf". The simplest configuration file is:

netplex {
  namenodes {
    disabled = true;
  };
  mapred {
    node { addr = "localhost" };
    port = 8989;
    tmpdir = "/tmp/mapred";
    buffer_size = 67108864;
    buffer_size_tight = 16777216;
    sort_size = 134217728;
  };
  mapredjob {
  };
}

The section namenodes configures the access to PlasmaFS. For simplicity, we have just disabled PlasmaFS. This is ok if you just want to play around with Plasma, but is of course not the intended way of using it. If you disable PlasmaFS this means:

  • You cannot access PlasmaFS files anymore (file names of the form "plasma::/path"). Only local files work (names of the form "file::/path"). (We'll explain file names below.)
  • The security features are disabled (no authentication and encryption).
  • The task server will bind to 127.0.0.1 and not 0.0.0.0, and is thus unreachable from other computers.
The mapred section configures global settings. In node we specify that there is only one computer where we can execute tasks, namely "localhost". The port is the port the task server uses. The tmpdir is a local directory where the task server places files it needs for operation. This directory should exist and should be writable by the user running the task server. The value buffer_size is the suggested amount of buffering when files are opened (here 64M). In buffer_size_tight one can give an alternate value to be used when there is not much free RAM anymore. The sort_size is used for sort buffers. (All buffer sizes are in bytes.)

The mapredjob section is for map/reduce-specific configurations. We just take the default.

If you now create this file as my_prog.conf, the command starts successfully:

$ ./my_prog my_job
Hello world!

Let's see if you can start the task server:

$ ./my_prog start_task_servers
Servers started

Although you called the executable "my_prog", it is renamed to "task_server" when started in this role:

$ ps aux|grep task
gerd     14581  0.0  0.0  29288  8540 ?        Ss   14:20   0:01 
  /tmp/mapred/task_server -conf /tmp/mapred/task_server.conf 
  -log /tmp/mapred/task_server.stderr task_server 
  -mapred-conf /tmp/mapred/mapred.conf -pid /tmp/mapred/task_server.pid
gerd     14652  0.1  0.0  48392  9056 ?        Sl   14:20   0:05 
  /tmp/mapred/task_server -conf /tmp/mapred/task_server.conf 
  -log /tmp/mapred/task_server.stderr task_server 
  -mapred-conf /tmp/mapred/mapred.conf -pid /tmp/mapred/task_server.pid

What has happened is that "my_prog" was copied to /tmp/mapred/task_server and started in the /tmp/mapred directory. If you had configured several mapred nodes, this would have happened for all nodes. (Caveat: you need ssh/scp access to these nodes because start_task_servers uses these utilities to copy the files there, and to start the servers.)

You can also stop the task servers again:

$ ./my_prog stop_task_servers
Servers killed

Of course, the normal mode of operation is that you locally start a command like "my_prog my_job", which then talks to the task servers to run code in parallel on many machines. The framework checks whether exactly the same binary is running on all machines.

When do you need the task server? It is only required if you use functions from the submodule Mapred_toolkit.DSeq where all the distributed algorithm schemes are collected.

It is possible to have several executables running as task servers on the cluster, but be careful to use different "port" and "tmpdir" settings for each type of server, as sketched below.
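
For example, a second server could use a mapred section like this (the values are illustrative):

  mapred {
    node { addr = "localhost" };
    port = 8990;               (* differs from the first server *)
    tmpdir = "/tmp/mapred2";   (* differs from the first server *)
    buffer_size = 67108864;
    buffer_size_tight = 16777216;
    sort_size = 134217728;
  };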

Registered functions

A user may register functions under a unique name. The intention is that all instances of the executable can refer to the function, and that it is even possible to exchange function references between instances. For example, your job may want to run a function in the task server, possibly on a different machine. In order to do this, the job must send the task server a reference to the function. This is done by just sending the name, which is known by both the local instance of the executable running your job, and by the instance running the task server.

You could use this feature for an RPC-style remote invocation, but it is not limited to this. In fact, in map/reduce the job normally only sends the function reference, and the task server invokes the function many times with different arguments which it takes from external files.

The module Mapred_rfun defines the concept of registered functions. There is a camlp4 quotation simplifying the definition of concrete functions. For instance, a function "f" converting an int to a string can be defined as

let f =
  <:rfun< @ k -> string_of_int k >>

This looks close to let f = fun k -> string_of_int k, but is not exactly the same.

First of all, f gets a unique name. You can see the (generated) name with Mapred_rfun.get_id f. Also, the function is entered into a global table of registered functions.

Second, the type is not int -> string, but (int -> string) Mapred_rfun.rfun. This just means that the function is encapsulated, and cannot be called directly. If you want to run it directly (not on a different machine), you need to look it up in the table with Mapred_rfun.lookup, e.g.

let s = (Mapred_rfun.lookup f) 42   (* = "42" *)

The definition must be done in toplevel scope! It is meaningless to define f inside another function, because the whole point of the definition is that the various instances of the program know the function under its name right after initializing the toplevel modules.

For the same reasons, the function should be pure, and avoid any side effects.

The function can have several arguments:

  <:rfun< @ arg1 arg2 ... -> expr >>

You may wonder why there is this "@" sign. This character separates two different kinds of arguments. So far we have seen the so-called remote arguments which are instantiated late, after doing the lookup (which may occur in a task server, hence the name "remote"). The full syntax is, however:

  <:rfun< larg1 larg2 ... @ rarg1 rarg2 ... -> expr >>

The arguments before "@" are the so-called local arguments, and are always instantiated by the local program. The idea is to emulate

let f larg1 larg2 ... =                                (* ILLEGAL! *)
  <:rfun< @ rarg1 rarg2 ... -> expr >>

which is useful to pass values down but is illegal as the <:rfun< >> quotation would occur inside another function. The concept of local arguments gets around this limitation. Actually, these arguments are sent together with the function reference to the task server where the registered function will be executed, and are instantiated just before running the function. (This is a bit like defining a closure across the network.)
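
To make this concrete, here is a small sketch (the function and its arguments are made up for illustration): factor is a local argument fixed by the driver program, and x is a remote argument supplied when the function is eventually executed.

let scale =
  <:rfun< factor @ x -> float_of_int x *. factor >>

(* In the driver program we fix the local argument. The result is again an
   rfun value and can be passed wherever an rfun is expected (compare the
   partition_of argument in the DSeq.mapl_sort_fold example further below). *)
let scale_by_2 = scale 2.0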

The local arguments are marshalled using the Marshal module. Note that this restricts the possible values (no "live" values like file channels, no functions, no objects, no exceptions).

(As a side point you may wonder why we do not simply enable function marshalling, and avoid the whole topic of registered functions. The problem is that we cannot assume that the functions have the same addresses in task servers running on different machines, even if the servers execute the same executable. Nowadays executables are dynamically loaded, and this may lead to address shifts when done on different machines. Also, OS features like address randomization are in the way.)

The filesystem

Map/reduce programs do not directly interact with the local filesystem or with PlasmaFS, but use a virtual layer Mapred_fs.filesystem (which you can get via me#filesystem). This layer is an extended version of Ocamlnet's stream_fs abstraction. It has a special feature, however, namely that one can merge several file trees into a single one.

Filenames use the syntax treename::/path to access the file tree known under treename, and to operate on (the absolute) /path in this tree. Note that paths are always absolute, and that there is no escaping of reserved characters like in URLs. There are normally two such trees:

  • file::/path is the local filesystem of the computer
  • plasma::/path is the PlasmaFS filesystem
You can omit the tree prefix and just use /path, but it depends then on further circumstances which tree is actually accessed.

There are a lot of operations for filesystems, e.g.

  • read file: fs # read flags path
  • write file: fs # write flags path
  • rename file: fs # rename flags oldpath newpath
  • delete file: fs # remove flags path
The details are documented for Mapred_fs.filesystem.

Example: Read a file line by line:

let read_file me =
  let ch = me # filesystem # read [] "plasma::/dir/file" in
  try
    while true do
      let line = ch # input_line() in
      print_endline line;
    done
  with End_of_file ->
    ch # close_in ()
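
Writing a file works analogously; a minimal sketch, assuming the usual `Create and `Truncate flags of the underlying stream_fs abstraction, and a made-up output path:

let write_file me =
  let ch = me # filesystem # write [`Create; `Truncate] "file::/tmp/out.txt" in
  ch # output_string "Hello world\n";
  ch # close_out ()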

Note that there are also a number of useful filesystem functions in Mapred_io, e.g. Mapred_io.file_exists.

Files as sequences of records

Map/reduce accesses files strictly sequentially. Logically, the files are sequences of records, and the records are simply strings. The toolkit makes another assumption, however: Records can be divided into fields. This is highly configurable, as we'll see.

Plasma knows three types of files:

  • In a line-structured file (or simply text file) the records are terminated by LF bytes, i.e. a file has lines, and every line is considered as a record. This is very compatible with existing utilities, but has one drawback: one cannot store arbitrary data in the records. In particular, the records cannot contain the LF byte, because this is the terminator.
  • In files with fixed-size records every record consists of exactly the same number of bytes. There is no terminator or other kind of record framing. The records can now contain arbitrary data. Files with fixed-size records conventionally have file names ending in ".fixed<n>".
  • In files with variable-size records a relatively complicated framing format makes it possible for the records to have variable length and contain arbitrary data at the same time. Files with variable-size records conventionally have file names ending in ".var".
It is recommended to start with text files, because you don't need extra utilities to access them, but you can simply use shell commands like more, sed, awk or grep to process them.

There is the utility plasma_convert which can convert between the file types, see Cmd_plasma_convert for how to call it.

If you use PlasmaFS, you can use the plasma utility to copy files to PlasmaFS and from PlasmaFS (see Cmd_plasma, there are also many more functions to interact with PlasmaFS):

$ plasma put localfile /plasmafs/path
$ plasma cat /plasmafs/path >localfile

As noted, records are primarily arbitrary strings. Normally, however, one wants to divide the records up into fields (much as a database row has columns). For text files, a popular method to represent fields is to use another character as field separator, often the TAB character. The Mapred_fields module contains useful functions to split a TAB-separated record into fields, e.g.

let s = "f1\tf2\tf3" ;;
let f1 = Mapred_fields.text_field ~field:1 s ;;  (* = "f1" *)
let f2_f3 = Mapred_fields.text_field ~field:2 ~num:2 s ;;  (* = "f2\tf3" *)

For concatenating fields, you can simply use String.concat or Printf.sprintf.
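
For example, the record from above can be rebuilt from its fields:

let s' = String.concat "\t" [ "f1"; "f2"; "f3" ]   (* = "f1\tf2\tf3" *)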

There are companion functions for files with variable-size records, where fields may contain both LF and TAB. See Mapred_fields.var_field_range.

Places

The toolkit provides the abstraction of "places". These are collections of files, and one can read the files, or put more files into the collection. Normally, places are backed by filesystems, which means that a place is a directory. However, the toolkit also provides in-memory places, which is useful for temporary data and for testing.

Together with the collection, a place defines a number of properties:

  • Which filesystem stores the files (if any)
  • Which file format is chosen (line-structured, fixed- or variable-size records)
  • How records are split into fields
  • The "record config" which are essentially some limits that apply for record processing (maximum size etc.)
Example: We define a place for a directory, and get the files there:

open Mapred_toolkit

let codec =
  ( <:rfun< @ s -> String.copy s >>,
    <:rfun< @ s p l -> String.sub s p l >>
  )

let list_files me =
  let rc = Mapred_def.get_rc me (16 * 1024 * 1024) in
  let place = 
    Place.from
      me#filesystem codec `Line_structured rc (`Deep_dir "plasma::/dir") in
  List.iter
    print_endline
    (Place.files place)

Let's have a more detailed look at this piece of code. The directory we inspect is "plasma::/dir". Because of the `Deep_dir tag, the directory is scanned recursively for files. (Note that files and directories whose names start with "." or "_" are omitted by the scan.)

The tag `Line_structured configures that the files have line-structured format. Of course, this is irrelevant for just listing the file names, but a place generally includes this specification.

The record configuration rc can be taken from me via Mapred_def.get_rc; the only value to pass explicitly is the size of the bigblocks. Here this size is given as 16M (16 * 1024 * 1024). Bigblocks are used by some algorithms as an (approximate) upper bound of the record size. (Generally, when a large file is cut into pieces to be processed by tasks, each of these pieces must be a multiple of a bigblock.)

The codec is a pair (coder, decoder). The coder transforms a number of fields into a record (string), and the decoder extracts the fields from the record. In this case, we use the simplest possible definitions for the codec, namely that the whole record is a single string field. In general, the codec is a pair of registered functions

type 'a codec =
  ('a -> string) rfun * 
  (string -> int -> int -> 'a) rfun

The type parameter 'a is our choice - this is how we want to represent records internally. For example, if we analyze card games, we could have something like

type game_feature =
  { player_name : string;
    game_id : int;
    money : float
  }

meaning that the player with player_name won money in the game with ID game_id. If we represent this as lines

<player_name> TAB <game_id> TAB <money>

we can define the coder and decoder as

let game_codec =
  ( <:rfun< @ game -> 
       sprintf "%s\t%d\t%f" game.player_name game.game_id game.money
    >>,
    <:rfun< @ s p l ->
       match Mapred_fields.text_fields ~pos:p ~len:l ~num:3 s with
         | [ pn; gid; m ] ->
	     { player_name = pn;
               game_id = int_of_string gid;
               money = float_of_string m
             }
         | _ ->
	     failwith "bad game_feature line"
    >>
  )

Note that the string s in the decoder can be more than the current record - in general it is the buffer where the current record is extracted from. The values p and l specify the range in the buffer where the current record can be found. If you need the whole record, use String.sub s p l.

The type of a place using this codec would be

  game_feature Place.t

The special place Place.notebook_place() can be used for keeping in-memory "files". Such files are called notebooks, and are explained in detail below. For notebooks, none of the above configurations are needed (codec, file format, etc).
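
A minimal sketch, assuming notebook places are polymorphic in the record type (the type annotation only documents our choice of record type):

let nb_place = (Place.notebook_place () : game_feature Place.t)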

Stores and sequences

The next abstractions we build on top of files are called stores and sequences. They are closely related to each other: A store means a physical location where sequential data can be stored - this is the file (or notebook). A sequence is a view on a store, and is the primary access channel to stores.

For stores, defined in Mapred_toolkit.Store, there are only a few functions. The most important ones connect stores with places:

Example: Let place be defined as in the list_files example above. You can now get the stores at this place with

open Mapred_toolkit

let stores = Store.read_place place

Now, stores is a string Store.t list, i.e. a list of stores where the records are strings (as place was a string Place.t).

If you need a store for a single file only, the recommended way is to first define a place for only this file, and then call read_place to get the store for it. A place for a single file is denoted by using `File path instead of `Deep_dir:

  let place = 
    Place.from
      me#filesystem codec `Line_structured rc (`File "plasma::/the/file")
  let [store] =
    Store.read_place place

A file store is either open for reading or for writing: The stores returned by read_place can only be read, and the stores created by write_place can only be written. An exception is in-memory stores (notebooks), which are read/write. (Note that the separation between read and write mode is only an implementation restriction, and will probably go away at some point.)

A store must be closed after use: Mapred_toolkit.Store.close.

Sequences, defined in Mapred_toolkit.Seq, can now read stores record by record, and they can append new records to the end of the stores. The type 't Seq.seq is a lot like 't list: It means immutable sequences of values. However, sequences are extended at their end, whereas a list is extended at the beginning. Compare:

Operation                  List                  Sequence
----------------------------------------------------------------------
Empty container            []                    Seq.notebook() or
                                                 a sequence over an empty
                                                 file
Head                       List.hd l             Seq.hd s
Tail                       List.tl l             Seq.tl s
Test whether empty         l = []                Seq.is_empty s
Extend                     x :: l                Seq.add x s

This means that sequences behave like lists when they are decomposed into head and tail, but the inverse operation is not available. Instead there is only Seq.add which adds a new element to the end. Of course, this just reflects that sequences view files: With hd and tl one can iterate over the file from the beginning to the end, and files are only extensible at their end.

Another difference is that Seq.add modifies the underlying container. After

let s1 = Seq.add x s

the view s on the container is unchanged, and s1 is a new view including the just appended element x, in (not allowed) pseudo notation:

    s  = [ x1; x2; ...; xn ]
    s1 = [ x1; x2; ...; xn; x ]

In order to keep consistency, a Seq.add is only permitted if the sequence s views the end of the store. In contrast, lists are extended at the head by prepending new values (x :: l). If two such extensions occur (l1 = x1 :: l and l2 = x2 :: l), two new lists l1 and l2 are constructed that share a common tail. This would not work for sequences: After s1 = Seq.add x s the sequence s no longer views the end of the store (it stops one element before the end), and a following s2 = Seq.add x s would run into a runtime error:

let n1 =
  Seq.add 30 (Seq.add 20 (Seq.add 10 (Seq.notebook())))

(* The store contains now: [10; 20; 30]
                 n1 views: [10; 20; 30]
 *)

let n2 =
  Seq.add 40 n1

(* The store contains now: [10; 20; 30; 40]
                 n1 views: [10; 20; 30]
                 n2 views: [10; 20; 30; 40]
 *)

let n3 =
  Seq.add 50 n1

(* Failure! n1 does not view the end of the store, and "inserting" the
   50 after the 30 would violate the conditions that were already
   granted to n2
 *)

Like stores, sequences can be read-only, write-only, or read/write. Unlike for stores, this is now reflected in the type:

  • A read-only sequence has type ('t, [`R]) seq
  • A write-only sequence has type ('t, [`W]) seq
  • A read/write sequence has type ('t, [`R|`W]) seq
Here, 't is the type of the elements. One can create sequences with:

  • Read a store st: Seq.read st
  • Write to the end of a store st: Seq.extend st
  • Create a read/write notebook: Seq.notebook()
Example: We want to copy every tenth record of one file to another file (e.g. for getting a testing sample from a big dataset). More precisely, we want to do this for every file in place pl1, and put the extracted records into files in place pl2.

let distill pl1 pl2 =
  let rec distill_seq s1 s2 n =
    if not (Seq.is_empty s1) then (
      if n = 10 then
        distill_seq (Seq.tl s1) (Seq.add (Seq.hd s1) s2) 1
      else
        distill_seq (Seq.tl s1) s2 (n+1)
    ) in
  let distill_store st1 st2 =
    let s1 = Seq.read st1 in
    let s2 = Seq.extend st2 in
    distill_seq s1 s2 1
  in
  List.iter
    (fun st1 ->
       let st2 = Store.write_place pl2 in
       distill_store st1 st2;
       Store.close st1;
       Store.close st2;
    )
    (Store.read_place pl1)

Full code: Look into examples/mapred-toolkit/tenth.ml of the Plasma tarball.

This function works in the following way: The contents of pl1 are retrieved as a list of stores, and for each of these stores st1 a new counterpart st2 is created by writing to pl2. For st1 a read-only sequence s1 is made, which now sees the whole file, and by calling Seq.tl we iterate over the elements of s1. In parallel, the write-only sequence s2 extends st2, and the selected elements are added to it.

Note that this function does not return any data directly; the result is written to pl2. If you want to process it further, the way to do it is to get the stores again with Store.read_place pl2. Alternatively, it would be possible to return the final version of s2 using

  let rec distill_seq s1 s2 n =
    if not (Seq.is_empty s1) then (
      if n = 10 then
        distill_seq (Seq.tl s1) (Seq.add (Seq.hd s1) s2) 1
      else
        distill_seq (Seq.tl s1) s2 (n+1)
    ) 
    else
      s2  

(note the final else clause) and modify the other parts of the distill function so that all s2 are returned. Remember that s2 is only a view to the container and not the container itself. Actually, the data is appended to the underlying file, and the in-memory representation of s2 is constant.

Now, what could you do with such an s2? The sequence is write-only, so one could only add more elements, but not read the existing ones. The solution is to call Mapred_toolkit.Seq.reread:

let s3 = Seq.reread s2

which creates another view for the same store, and reads the elements contained in it. Note that reread always reads all elements of the store, and not only those that are in s2 (in this example this makes no difference, though).

As you see, the sequence abstraction has some peculiarities, but is nevertheless useful to deal with external files as if they were in-memory containers. Note that we left a lot of representation details unspecified in distill:

  • The function is polymorphic regarding the type of the records
  • We did not define how the records are represented in files, neither on the level of records nor on the level of fields
  • We did not say where the files are stored. The "files" could even only be in-memory notebooks.

Combinators for sequences

The combinators iter and fold work like their list counterparts (where fold corresponds to fold_left):

  val iter : ('a -> unit) -> ('a,[>`R]) seq -> unit
  val fold : ('a -> 'b -> 'a) -> 'a -> ('b,[>`R]) seq -> 'a

A bit different is map:

  val map : ('a -> 'b) -> ('a,[>`R]) seq -> ('b,[>`W] as 'm) seq -> ('b,'m) seq

As you see there is one extra argument. You call it like

let s3 = Seq.map f s1 s2

which means that s3 is s2 plus the mapped elements of s1. We need s2 to denote the store where the mapped elements can be added to.

There is the variant mapl, which allows mapping a single element to a list:

  val mapl : ('a -> 'b list) -> 
             ('a,[>`R]) seq -> 
             ('b,[>`W] as 'm) seq -> 
                  ('b,'m) seq

None of these combinators buffers more elements in memory than necessary, so they can be used to process arbitrarily large sequences.
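
Two small sketches (s is assumed to be a readable sequence, nb a writable notebook sequence):

(* Sum up all integers of the readable sequence s: *)
let total = Seq.fold (fun acc x -> acc + x) 0 s

(* Map a readable sequence of strings to their lengths; the mapped
   elements are appended to the writable sequence nb: *)
let lengths = Seq.map String.length s nb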

Sorting in-memory: the non-distributed version of map/reduce

We'll discuss now Mapred_toolkit.Seq.sort and Mapred_toolkit.Seq.mapl_sort_fold. The former function is like List.sort but for sequences, whereas the latter function implements the whole map/reduce algorithm scheme locally (i.e. non-distributed and in-memory).

Seq.sort

The signature of the sort function looks a bit different than List.sort:

  val sort : 
        hash:('a -> int) ->
        cmp:('a -> 'a -> int) ->
        ('a,[>`R]) seq ->
        ('a,[>`W] as 'm) seq ->
          ('a,'m) seq

In particular, there is again the additional write-only sequence we already know from map. The call

let s3 = Seq.sort ~hash ~cmp s1 s2

appends the sorted elements of s1 to s2, and returns this concatenation as s3. The extra argument s2 is needed because we need a store where we can put the sorted data.

The other difference is that there is not only cmp for comparing values but also a hash function. This is an optimization, and if you do not need it or it is not applicable, just pass hash as a function that always returns 0.

The improvement is that this sorting algorithm performs an initial pass over all elements, and computes the hash values for them. The hash values must be non-negative 30 bit integers (even on 64 bit platforms). Now, before calling cmp for two elements x and y, the algorithm first looks at hx = hash x and hy = hash y, and compares hx and hy. Only if both values are equal is cmp called at all. If they are not equal, the elements are sorted by hx and hy.

Of course, this optimization assumes that cmp is somewhat costly. In map/reduce context, the data passed to cmp is often not directly available - usually the record must first be decoded into fields. This makes cmp more expensive than usual, and it is best to avoid it.

Now, how to define hash in a useful way? There are two approaches: If you do not really need sorted elements, but only elements that are grouped by a key, you can set

let hash (key,value) = Hashtbl.hash key

(assuming the elements are pairs (key,value), and you want to group by key). In this case, the only guarantee is that the hash values are identical for the same keys, and thus the keys are grouped together. (Of course, you still need a meaningful cmp function, because it is possible that different keys are mapped to the same hash value.)

The other approach is to extract information from the keys so that

  • hash x > hash y implies x > y, and
  • hash x < hash y implies x < y
For example, if the keys are strings, one can extract the first 30 bits of the strings. This is done by the pre-defined function Mapred_sorters.String_asc.hash. Other functions in Mapred_sorters deal with decimal numbers, hex numbers, float numbers, and a few binary encodings.

Example: We want to sort a read-only sequence of type game_feature Seq.seq by player name, and append it to another sequence.

let sort_by_player s_in s_out =
  Seq.sort
    ~hash:(fun g ->
             let p = g.player_name in
             Mapred_sorters.String_asc.hash p 0 (String.length p))
    ~cmp:(fun gx gy ->
             String.compare gx.player_name gy.player_name)
    s_in
    s_out

Seq.mapl_sort_fold

The map/reduce scheme is essentially an enhanced sorting scheme. The version in Mapred_toolkit.Seq.mapl_sort_fold does not exploit the fact that this scheme can be implemented in a distributed way; it is just a poor man's emulation that processes the data in memory. Nevertheless, it may be useful for testing algorithms, and for learning what map/reduce means semantically.

The signature is quite complicated:

  val mapl_sort_fold :
        mapl:('a -> 'b list) ->
        hash:('b -> int) ->
        cmp:('b -> 'b -> int) ->
        initfold:(int -> 'c) ->
        fold:('c -> 'b -> 'c * 'd list) ->
        ?finfold:('c -> 'd list) ->
        partition_of:('b -> int) ->
        partitions:int ->
        'a Place.t ->                    (* p_in *)
        'd Place.t ->                    (* p_out *)
          ('d,[`W]) seq list

The main arguments are two places: The first place p_in contains the input data, and the output is written into new files in the second place p_out. This means that this "sorter" always processes a bunch of files at once, and not only a single sequence.

So, why does this version of a sorting algorithm have all this extra functionality like mapping and folding? In order to understand this, one has to keep in mind that this algorithm is designed for the distributed case, where you want to split the overall effort into smaller pieces (tasks) so that at any time many of the tasks can be done in parallel. So we just assume this here, knowing that Seq.mapl_sort_fold does not take advantage of it (but the version below, DSeq.mapl_sort_fold, does).

If you want to sort a large file, a possible algorithm is to cut the file into many pieces, sort every piece separately (in RAM), and finally merge all the sorted file parts into a single big output file. This is already quite close to what map/reduce is about. It has the good property that the sorting tasks can be executed in parallel. At merge time, though, the problem arises that the iterative merge of the file parts into a single file cannot be fully parallelized. The solution is to modify the scheme slightly: We accept that we do not get one big file at the end, but a set of files, called partitions. The partitions are defined by the sorting key - this means that all records with the same key will go into the same partition. The exact criterion can be defined by the user. Now, if we have P partitions, the whole merging step can at least be parallelized so that P independent data flows process the data. Of course, we now have to include in the scheme somehow that the data is split into partitions. There is a lot of freedom in how to do it - Plasma normally splits iteratively while merging, and the combined split+merge steps are called shuffling. (N.B. Hadoop splits immediately after sorting - just as a reminder that map/reduce is not exactly defined.)

Basically, what I'm trying to say is that the core of map/reduce creates P output files, and every file contains the sorted records of one partition. In the implementation of this scheme you get several opportunities for including user code: The map step is done at the very beginning before sorting (it can e.g. be implemented as a hook while reading data into the sort buffer), and one can use it to preprocess data. The fold (or reduce) step is done at the very end when one of the output files is written, and one can use it to postprocess data.

As a picture:

   +---------+---------+----- INPUT PIECES ----+----------+--------+
   |         |         |                       |          |        |
   | input1  | input2  | ...                   |input(n-1)|input(n)|
   |         |         |                       |          |        |
   +---------+---------+-----------------------+----------+--------+
        |         |                                  |         |
        |         |                                  |         |
       mapl      mapl                               mapl      mapl
        |         |                                  |         |
        V         V                                  V         V
     +------+ +-------+                          +------+  +------+
     | m1   | | m2    | ........................ |m(n-1)|  | m(n) |
     +------+ +-------+                          +------+  +------+
        |         |                                  |         |
        |         |                                  |         |
       sort      sort                               sort      sort
        |         |                                  |         |
        V         V                                  V         V
     +------+ +-------+                          +------+  +------+
     | s1   | | s2    | ........................ |s(n-1)|  | s(n) |
     +------+ +-------+                          +------+  +------+
        |        |                                  |          |
         \        \                                /          /
           \       \                              /          /
             -------------->  shuffle  <---------------------
                         (merge + partition)
                             /          \
                            /            \
              +-------+    /              \      +------+          
              | p1    |......................... | p(P) |          
              +-------+                          +------+
                  |                                  |          
                  |                                  |          
                 fold                               fold          
                  |                                  |          
                  V                                  V          
              +-------+                          +------+          
              | r1    |......................... | r(P) |          
              +-------+                          +------+
                       P OUTPUT FILES (PARTITIONS)

Let's look at the named arguments of mapl_sort_fold:

  • mapl is the map preprocessing step. The raw input record as taken from the input file (type 'a) can be mapped to a representation that is easier to process in the following (type 'b). For example, this step could extract the sorting key from the raw input and make it easily accessible. The mapl function returns a list, which means one is not bound to a 1:1 mapping, but can also drop records entirely, or split a single record into many (e.g. because the raw input is a compound format).
  • hash and cmp define the sorting criterion in the same way as in Seq.sort above. This is done for records of type 'b, i.e. after mapping.
  • partitions is the number of partitions.
  • partition_of: this function returns, for a record, the partition into which the record falls. The integer must be between 0 and partitions-1. Also, there is a consistency condition: if two records are considered equal by hash and cmp, partition_of must always return the same partition number for both.
  • The folding step is defined by the three functions initfold, fold, and finfold. This type of folding can do a bit more than List.fold_left, because every cycle does not only update the accumulator, but can also output a result (which then goes into the output file). The accumulator has type 'c, and the output records have type 'd. So fold takes two arguments acc and x, the accumulator and the current element, and returns a pair (acc',out), the updated accumulator and a list of output records. initfold is used to initialize the accumulator; its argument is the partition number. finfold can be used to output final records after the folding loop has cycled over all elements. It gets the final version of the accumulator as input, and may output a list of final records. (Defaults to the empty list if finfold remains unset.)
Tip: If you do not need the mapping and/or folding step, define the parameter functions as in

    ~mapl:(fun x -> [x])
    ~initfold:(fun _ -> ())
    ~fold:(fun _ x -> ((),[x]))

This results in the "naked" version of a map/reduce job, which then only performs the sorting, and neither pre- nor postprocesses the data.

The result of mapl_sort_fold is the list of sequences containing the folded partitions.

Example: We want to compute the average amount of money a player wins in card games. The input is available as a number of XML files.

Here, the mapl step would simply parse the XML files, and represent the essential information (player name, win) in a more compact way, e.g. in a tab-separated record. The sort step sorts the records by player name. The partitions are defined by player names - all records for the same player are expected to end up in the same partition (e.g. by something like p = (hash name) mod P). The shuffle phase will then process the records so that each partition has all records for a player, and so that these records are even adjacent in the files. The fold step just runs over the partitions; whenever a new player is detected, all the wins are summed up, and finally the average win is output.

Note that the user need not provide the sort and shuffle algorithms - these are always the same, and are only configured by hash and cmp.

This would translate into the following call of mapl_sort_fold:

(* 'a = string: the input records are XML documents
   'b = game_feature
   'c = (string * int * float) option -- (player, n_games, sum)
   'd = string * float -- (player, avg_win)
 *)

let run_my_job me mj =
  let pl_in = ... in
  let pl_out = ... in

  let s_out =
    Seq.mapl_sort_fold
      ~mapl:(fun xml -> [ parse_xml_game_feature xml ])
      ~hash:(fun g ->
	       let p = g.player_name in
	       Mapred_sorters.String_asc.hash p 0 (String.length p))
      ~cmp:(fun gx gy ->
               String.compare gx.player_name gy.player_name) 
      ~initfold:(fun p -> None)
      ~fold:(fun acc g ->
	       match acc with
		 | None -> 
		      (Some(g.player_name,1,g.money), [])
		 | Some(n,k,s) when n = g.player_name ->
		      (Some(n,k+1,s+.g.money), [])
		 | Some(n,k,s) ->
		      (Some(g.player_name,1,g.money), [n, s /. float k])
	    )
      ~finfold:(fun acc ->
		  match acc with
		    | None -> []
		    | Some(n,k,s) -> [n, s /. float k]
	       )
      ~partition_of:(fun g -> (Hashtbl.hash g.player_name) mod partitions)
      ~partitions
      pl_in
      pl_out in
  ...

Note that the number of partitions is arbitrary here.

Full code: Available in examples/mapred-toolkit/cardgames.ml. There is also a transcript Sess_mr_cardgames.

Distributed Algorithms

The schemes described in this section distribute the work over the task servers. Because of this it is required that the task servers are running (./my_prog start_task_servers).

If your only task server is "localhost" it is possible to process data from/in the local filesystem, and PlasmaFS is not needed. This means that you can only distribute the load over the CPU cores of the local machine, and that the job is decomposed into steps that do not consume more RAM than available, even for big input files. This is a useful mode if you have more data to process than fits in memory. If the dataset is not that big, it is usually faster to use the non-distributed variant described in the previous section.

Remember that you turn off PlasmaFS by including a setting disabled = true into the namenodes section of the configuration file (of your toolkit program).

If you have task servers on more nodes than just "localhost", you need PlasmaFS, and the input and output files must be stored in it. The deployment of PlasmaFS is described in detail here: Plasmafs_deployment. The configuration file must then include a namenodes section like:

  namenodes {
    clustername = "name";               (* the name of the PlasmaFS instance *)
    node { addr = "namenodehost1" };    (* a host running a name node *)
    node { addr = "namenodehost2" };    (* a host running a name node *)
    ...
    port = 2730;                        (* or whatever the port is *)
  }

It is required that at least one live namenode is mentioned in the file. If you have a text file listing the name node hosts line by line, you can also point to this file via

  namenodes {
    clustername = "name";               (* the name of the PlasmaFS instance *)
    node_list = "/path/to/hosts/file";  (* the file with the hosts *)
    port = 2730;                        (* or whatever the port is *)
  }

DSeq.mapl

The first distributed algorithm explained here is DSeq.mapl, which takes data from an input place, maps all records by applying a function, and writes the result to an output place. The files at the input place are split up into small pieces, and for each piece a task is started that performs the mapping. The tasks can all run in parallel. There will be as many output files as tasks.

The signature of this function is:

  val mapl : 
       (mapred_info -> 'a -> 'b list) rfun -> 
       'a Place.t -> 
       'b Place.t -> 
       DSeq.config ->
          ('b,[`W]) Seq.seq list result

It is called as let r = mapl f pl_in pl_out conf. In contrast to the non-distributed version Seq.mapl, this function requires that the mapper f is a registered function, because f is executed within the task servers. Also, f has an additional mapred_info argument. This is just a means to make the environment and the job configuration available to f:

  class type mapred_info =
  object
    method mapred_env : Mapred_def.mapred_env
    method mapred_job_config : Mapred_def.mapred_job_config
  end

Remember that f cannot be a closure, and must be defined in the scope of a toplevel module (like all registered functions). Because of this, it is difficult to make values of the current environment available to f. The mapred_info object solves this problem at least for the overall configuration.

DSeq.mapl also takes a DSeq.config argument. It defines properties of the job, including those that go later into mapred_job_config. In the simplest form, this config argument can be obtained by

  let config = DSeq.create_config me

where me is the mapred_env coming from the main program (see above). During development, it is recommended to enable the job report, which is done by

  let config = DSeq.create_config ~report:true me

The report outputs messages while the job is being executed. In particular you can see which tasks are started on which machines, and follow the progress of the job.

There are more options than report, but most are only applicable to full map/reduce and not for mapl jobs.

Example: We want to map files where the records contain integers, and increase every number by 1.

let f_mapl =
  <:rfun< @ mi x -> [x+1] >>

let run_my_job me mj =
  let pl_in = ... in
  let pl_out = ... in

  let r_out =
    DSeq.mapl
      f_mapl
      pl_in
      pl_out
      (DSeq.create_config ~report:true me) in
  let s_out =
    DSeq.get_result r_out in
  ...

(We've left out the definitions of pl_in and pl_out. You would need a codec here that uses int_of_string and string_of_int to represent the integers; a sketch follows.)
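
A hedged sketch of such a place definition, reusing the record config rc obtained as in the list_files example (the directory path is made up):

let int_codec =
  ( <:rfun< @ n -> string_of_int n >>,
    <:rfun< @ s p l -> int_of_string (String.sub s p l) >>
  )

(* A place over a (made-up) input directory; pl_out would be built
   analogously over the output directory: *)
let int_place me rc =
  Place.from
    me#filesystem int_codec `Line_structured rc (`Deep_dir "file::/data/in")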

The function f_mapl is here the registered mapper. mi is the mapred_info argument, and x the integer from the input. This number is mapped to [x+1], i.e. for every input integer we output exactly one output integer, increased by 1.

Note that DSeq.mapl does not return its result directly, but wrapped into a type DSeq.result. One can get the final value with Mapred_toolkit.DSeq.get_result. Additionally, there are also functions for getting statistics and the job ID.

Lazy evaluation: Note that the job is only started when you call get_result. This makes it possible to retrieve the job ID beforehand.

DSeq.mapl_sort_fold

Finally, we come to the full and distributed implementation of map/reduce. It is provided by Mapred_toolkit.DSeq.mapl_sort_fold. The signature is similar to the non-distributed version Seq.mapl_sort_fold:

  val mapl_sort_fold :
        mapl:(mapred_info -> 'a -> 'b list) rfun ->
        hash:(mapred_info -> 'b -> int) rfun ->
        cmp:(mapred_info -> 'b -> 'b -> int) rfun ->
        initfold:(mapred_info -> int -> 'c) rfun ->
        fold:(mapred_info -> 'c -> 'b -> 'c * 'd list) rfun ->
        ?finfold:(mapred_info -> 'c -> 'd list) rfun ->
        partition_of:(mapred_info -> 'b -> int) rfun ->
        ?initcombine:(mapred_info -> 'e) rfun ->
        ?combine:(mapred_info -> 'e -> 'b -> 'e * 'b list) rfun ->
        ?fincombine:(mapred_info -> 'e -> 'b list) rfun ->
        'a Place.t ->
        'd Place.t ->
        config ->
        'b Place.codec ->
          ('d,[`W]) Seq.seq list result

There are a few systematic differences, and a number of additional arguments (all optional):

  • The argument functions are now always registered functions
  • The argument functions all take the mapred_info object as their first input
  • There is no partitions parameter anymore. The partitions are now taken from config (see below)
  • There is a config argument as for DSeq.mapl
  • There is an additional codec argument
  • The result is wrapped into the DSeq.result type, as for DSeq.mapl
Let's walk through this list. It is not very surprising that we now have to deal with registered functions. The tasks calling these functions are executed in the scope of the task server, and because of this we need this kind of function to make them available there.

The mapred_info argument exists for the same reasons as explained above for DSeq.mapl: Registered functions would otherwise not have any way to get these configuration objects.

The partitions parameter can now be set when creating config, e.g.

  let config = DSeq.create_config ~partitions me

If left out here, the number of partitions is taken from the configuration file (as most other config parameters).

Now, what about this additional codec? The problem is that the values of type 'b are stored in intermediate files while the job is executed, and we need a way here to represent values of type 'b in such files.

Example: Let's translate the above example for Seq.mapl_sort_fold, where we computed the average profit of players in card games.

(* 'a = string: the input records are XML documents
   'b = game_feature
   'c = acc = (string * int * float) option -- (player, n_games, sum)
   'd = out = string * float -- (player, avg_win)
   'e = unused
 *)

type acc = (string * int * float) option
type out = string * float

let f_mapl =
  <:rfun< @ mi xml -> [ parse_xml_game_feature xml ] >>

let f_hash =
  <:rfun< @ mi g -> 
     let p = g.player_name in
     Mapred_sorters.String_asc.hash p 0 (String.length p)
  >>

let f_cmp =
  <:rfun< @ mi gx gy ->
     String.compare gx.player_name gy.player_name
  >>

let f_initfold =
  <:rfun< @ mi p -> (None : acc) >>

let f_fold =
  <:rfun< @ mi acc g ->
     match (acc : acc) with
      | None -> 
	  (Some(g.player_name,1,g.money), [])
      | Some(n,k,s) when n = g.player_name ->
	  (Some(n,k+1,s+.g.money), [])
      | Some(n,k,s) ->
	  (Some(g.player_name,1,g.money), [n, s /. float k])
  >>

let f_finfold =
  <:rfun< @ mi acc ->
      match (acc : acc) with
	| None -> []
        | Some(n,k,s) -> [n, s /. float k]
  >>

let f_partition_of =
  <:rfun< partitions @ mi g ->
     (Hashtbl.hash g.player_name) mod partitions
  >>

let game_codec = ... 
  (* see above, the definition can be found in prev section *)


let run_my_job me mj =
  let pl_in = ... in
  let pl_out = ... in
  let partitions = ... in

  let config =
    DSeq.create_config ~report:true ~partitions me in

  let r_out =
    DSeq.mapl_sort_fold
      ~mapl:f_mapl
      ~hash:f_hash
      ~cmp:f_cmp
      ~initfold:f_initfold
      ~fold:f_fold
      ~finfold:f_finfold
      ~partition_of:(f_partition_of partitions)
      pl_in
      pl_out
      config
      game_codec in
  let s_out =
    DSeq.get_result r_out in
  ...

As you can see, the changes are relatively small - essentially, we've moved all the functions out of run_my_job, and made them registered functions. At one point there is a small difficulty: for the function partition_of we need the number of partitions. In the non-distributed case, this number was known from the environment (i.e. partition_of was a real closure). As a workaround, we pass partitions as a local argument to f_partition_of.

A possible version for the game_codec was already presented in a previous section of this text. This codec can encode and decode values of type game_feature, and this is exactly what we need here.

Caveat: If you run this job, don't be disappointed when only a few tasks are started. You need big files (at least 100M, better 1G) to see the effect that the work is split up into smaller units.

Caveat: If pl_in or pl_out are notebook places and not file-backed places, DSeq.mapl_sort_fold falls back to Seq.mapl_sort_fold, i.e. the job is run non-distributed.

Full code: Available in examples/mapred-toolkit/cardgames.ml. There is also a transcript Sess_mr_cardgames.

Combiners

DSeq.mapl_sort_fold provides the additional hooks initcombine, combine, and fincombine. These can be used to implement an optimization that reduces the amount of data to shuffle.

The idea is to perform folding runs over intermediate files which are not yet fully merged with the other files of the same partitions. This means that these files are incomplete, i.e. they do not contain all records, and they may cover several partitions. These intermediate files are already sorted, however.

For many applications of map/reduce it is meaningful to process the data already in this intermediate step, especially when the volume of data can be reduced this way. Imagine you count the frequency of words in texts. Normally, you would just sort all occurring words, and then count the number of each word (which are now adjacent). In this example it is possible to already count the words in the mentioned intermediate files (which are subsets of the result), and to later sum these preliminary counts up to get the final numbers.

If passed, the functions initcombine, combine, and fincombine perform combiner runs over all files where this is possible. These functions work in the same way as the folding functions, as sketched below.
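
A hedged sketch for the word-count example, assuming the mapped records of type 'b are (word, count) pairs and choosing the combiner accumulator 'e with the same shape as the folding accumulator; the functions would be passed via ~initcombine, ~combine and ~fincombine:

let f_initcombine =
  <:rfun< @ mi -> (None : (string * int) option) >>

let f_combine =
  <:rfun< @ mi acc wc ->
     let (w,c) = wc in
     match acc with
       | None                    -> (Some(w,c), [])
       | Some(w0,c0) when w0 = w -> (Some(w0, c0+c), [])
       | Some(w0,c0)             -> (Some(w,c), [ (w0,c0) ])
  >>

let f_fincombine =
  <:rfun< @ mi acc ->
     match acc with
       | None        -> []
       | Some(w0,c0) -> [ (w0,c0) ]
  >>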

Practical advice

How to kill a job: You can kill a distributed job by simply pressing CTRL-C in the terminal where you started it. Killing takes a few seconds to execute (be patient).

Alternatively, there is the option of just stopping the task servers. Currently there is no other way of controlling the lifetime of a job.

Accessing common data: Sometimes it is necessary to access an auxiliary dataset from registered functions like mapl or fold. Remember that you have access to the filesystem via mi # mapred_env # filesystem and can load data from a PlasmaFS file when needed. It is possible to store data in global variables of the process. However, be careful here: These variables are the same for all jobs you run via the same task server. You should use the job ID to mark which data belongs to which job. You can also override the methods pre_job_start and post_job_finish in the job definition to manage your datasets.
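
A hedged sketch of such a per-job cache (the table, the helper, the file path, and the way the job ID is passed in are all made up for illustration):

(* Process-global cache, keyed by job ID, shared by all tasks that this
   task server process runs: *)
let aux_cache : (string, string list) Hashtbl.t = Hashtbl.create 7

let get_aux mi job_id =
  try Hashtbl.find aux_cache job_id
  with Not_found ->
    let ch = mi # mapred_env # filesystem # read [] "plasma::/aux/wordlist" in
    let lines = ref [] in
    ( try
        while true do lines := ch # input_line() :: !lines done
      with End_of_file -> ch # close_in() );
    let data = List.rev !lines in
    Hashtbl.add aux_cache job_id data;
    data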
