The Mapred_toolkit is a convenience layer on top of the standard way of running map/reduce.
Examples: The example programs are included in the Plasma tarball, under examples/mapred-toolkit.
It is a framework, not a library
Although you can pick functions out of the mr_framework package and use them like a library, it is not meant to be used this way. Map/reduce is designed as a distributed algorithm, and this means the code must run on several computers. The framework ensures this, and you have to use it when you invoke functions from Mapred_toolkit.DSeq (which provides the distributed algorithms). This is the case even when you don't use PlasmaFS (the distributed filesystem) and restrict yourself to a single computer, because the map/reduce program is still executed in several processes to take advantage of multi-core CPUs.
The framework works as follows: You create a single executable which is used both for controlling the algorithm and as task server for running parts of the algorithm. The executable is copied to all computers where tasks are supposed to run, and is started there in server mode. The mr_framework package provides a configurable main program that can start the executable in the required modes, and that can be extended by your own code.
Actually, it is very simple. Your own main program has to look like:
let () =
  Mapred_main.dispatch
    [ ... ]
The list specifies your extensions. For instance, if you want to have a job "my_job", write:
let () =
  Mapred_main.dispatch
    [ "my_job", "This is my_job", Mapred_toolkit.toolkit_job, run_my_job ]
The first string is the name of the job. It is used on the command-line to designate the function to execute. The second string is a description (for -help). With Mapred_toolkit.toolkit_job you indicate that you want to use the toolkit, and not a hand-crafted job. The identifier run_my_job names a function you have to provide. A "hello world" version of run_my_job could be:
let run_my_job me mj =
  Arg.parse
    []
    (fun s -> ())
    "usage: my_job";
  print_endline "Hello world!"
The function takes two arguments:
me is a Mapred_def.mapred_env and contains the environment. Here you find all global things, like the overall configuration (me#config), and access to the filesystem (me#filesystem). It is recommended to always have me at hand, and to pass it down to all functions that are invoked by run_my_job.
mj is a Mapred_def.mapred_job. This is an object that defines the various map/reduce placeholders like map, sorter, reduce, and combine. Actually, this is just the result of executing the third element of the dispatch list (here Mapred_toolkit.toolkit_job). The job object must exist identically in all running instances of the executable, and because of this it is handled separately. As we are using the toolkit, we don't create the job object ourselves, but just use the pre-made one from the toolkit. This job object defines generic versions of the map/reduce placeholder functions which can be refined later. Note that you can normally just forget about mj if you use the toolkit.
The run_my_job function can parse further arguments with Arg.parse.
Now, you create the executable:
ocamlfind ocamlopt -o my_prog \
    -package mr_framework.toolkit \
    -syntax camlp4o \
    -linkpkg \
    -thread \
    my_prog.ml
Note that this compiler command also specifies camlp4 as the parser. This is not strictly required but recommended, because it enables the quotations <:rfun< ... >> we'll use later.
Let's see what happens when you start the executable:
$ ./my_prog -help
usage: my_prog [-conf <configfile>] <mode> [-help|options]
available modes:
  - start_task_servers: install and start task servers
  - stop_task_servers: stop task servers
  - my_job: This is my_job
This means you can just run the user-supplied function, and the two predefined functions "start_task_servers" and "stop_task_servers" (actually, there are even a few more such functions not shown in the usage output). Let's start it:
$ ./my_prog my_job
Fatal error: exception Sys_error("./my_prog.conf: No such file or directory")
Raised at file "pervasives.ml", line 284, characters 20-46
Called from file "mapred_main.ml", line 384, characters 11-50
Called from file "mapred_main.ml", line 536, characters 12-44
Called from file "mr_wf_toolkit.ml", line 109, characters 0-93
Oops! The framework expects a configuration file. By default it is expected in the same directory as the executable, and its name is derived from the name of the executable by appending ".conf". The simplest configuration file is:
netplex {
  namenodes {
    disabled = true;
  };
  mapred {
    node { addr = "localhost" };
    port = 8989;
    tmpdir = "/tmp/mapred";
    buffer_size = 67108864;
    buffer_size_tight = 16777216;
    sort_size = 134217728;
  };
  mapredjob {
  };
}
The section namenodes configures the access to PlasmaFS. For simplicity,
we have just disabled PlasmaFS. This is ok if you just want to play around
with Plasma, but is of course not the intended way of using it. If you
disable PlasmaFS this means:
The mapred section configures global settings. In node we specify that there is only one computer where we can execute tasks, namely "localhost". The port is the port the task server uses. The tmpdir is a local directory where the task server places files it needs for operation. This directory should exist, and should be writable by the user running the task server. The value buffer_size is the suggested amount of buffering when files are opened (here 64M). In buffer_size_tight one can give an alternate value to be used when there is not much free RAM left (here 16M). The sort_size is used for sort buffers (here 128M). (All buffer sizes are in bytes.)
The mapredjob section is for map/reduce-specific configurations. We just take the defaults.
If you now create this file as my_prog.conf, the command starts successfully:
$ ./my_prog my_job
Hello world!
Let's see if you can start the task server:
$ ./my_prog start_task_servers
Servers started
Although you called the executable "my_prog", it is renamed to "task_server" if started as such:
$ ps aux|grep task
gerd 14581 0.0 0.0 29288 8540 ? Ss 14:20 0:01
/tmp/mapred/task_server -conf /tmp/mapred/task_server.conf
-log /tmp/mapred/task_server.stderr task_server
-mapred-conf /tmp/mapred/mapred.conf -pid /tmp/mapred/task_server.pid
gerd 14652 0.1 0.0 48392 9056 ? Sl 14:20 0:05
/tmp/mapred/task_server -conf /tmp/mapred/task_server.conf
-log /tmp/mapred/task_server.stderr task_server
-mapred-conf /tmp/mapred/mapred.conf -pid /tmp/mapred/task_server.pid
What has happened is that "my_prog" was copied to /tmp/mapred/task_server and started in the /tmp/mapred directory. If you had configured several mapred nodes, this would have happened for all nodes. (Caveat: you need ssh/scp access to these nodes because start_task_servers uses these utilities to copy the files there, and to start the servers.)
You can also stop the task servers again:
$ ./my_prog stop_task_servers
Servers killed
Of course, the normal mode of operation is that you locally start a command like "my_prog my_job", which then talks to the task servers to run code in parallel on many machines. The framework checks whether exactly the same binary is running on all machines.
When do you need the task server? It is only required if you use functions from the submodule Mapred_toolkit.DSeq, where all the distributed algorithm schemes are collected.
It is possible to have several executables running as task servers on the cluster, but be careful to use different "port" and different "tmpdir" settings for each type of server.
Registered functions
A user may register functions under a unique name. The intention is that all instances of the executable can refer to the function, and that it is even possible to exchange function references between instances. For example, your job may want to run a function in the task server, possibly on a different machine. In order to do this, the job must send the task server a reference to the function. This is done by just sending the name, which is known by both the local instance of the executable running your job, and by the instance running the task server.
You could use this feature for an RPC-style remote invocation, but it is not limited to this. In fact, in map/reduce the job normally only sends the function reference, and the task server invokes the function many times with different arguments which it takes from external files.
The module Mapred_rfun defines the concept of registered functions. There is a camlp4 quotation simplifying the definition of concrete functions. For instance, a function "f" converting an int to a string can be defined as
let f =
  <:rfun< @ k -> string_of_int k >>
This looks close to let f = fun k -> string_of_int k, but is not exactly the same.
First of all, f gets a unique name. You can see the (generated) name with Mapred_rfun.get_id f. Also, the function is entered into a global table of registered functions.
Second, the type is not int -> string, but (int -> string) Mapred_rfun.rfun. This just means that the function is encapsulated, and cannot be called directly. If you want to run it directly (not on a different machine), you need to look it up in the table with Mapred_rfun.lookup, e.g.
let s = (Mapred_rfun.lookup f) 42 (* = "42" *)
The definition must be done in toplevel scope! It is meaningless to define f inside another function, because the whole point of the definition is that the various instances of the program know the function under its name right after initializing the toplevel modules. For the same reasons, the function should be pure, and avoid any side effects.
The function can have several arguments:
<:rfun< @ arg1 arg2 ... -> expr >>
You may wonder why there is this "@" sign. This character separates two different kinds of arguments. So far we have seen the so-called remote arguments which are instantiated late, after doing the lookup (which may occur in a task server, hence the name "remote"). The full syntax is, however:
<:rfun< larg1 larg2 ... @ rarg1 rarg2 ... -> expr >>
The arguments before "@" are the so-called local arguments, and are always instantiated by the local program. The idea is to emulate
let f larg1 larg2 ... = (* ILLEGAL! *)
  <:rfun< @ rarg1 rarg2 ... -> expr >>
which is useful to pass values down but is illegal as the <:rfun< >> quotation would occur inside another function. The concept of local arguments gets around this limitation. Actually, these arguments are sent together with the function reference to the task server where the registered function will be executed, and are instantiated just before running the function. (This is a bit like defining a closure across the network.)
The local arguments are marshalled using the Marshal module. Note that this restricts the possible values (no "live" values like file channels, no functions, no objects, no exceptions).
(As a side point you may wonder why we do not simply enable function marshalling, and avoid the whole topic of registered functions. The problem is that we cannot assume that the functions have the same addresses in task servers running on different machines, even if the servers execute the same executable. Nowadays executables are dynamically loaded, and this may lead to address shifts when done on different machines. Also, OS features like address randomization are in the way.)
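The mechanism described in this section can be imitated with the standard library alone. The following is only a rough sketch under simplifying assumptions: the table, the helpers register, make_request and run_request, and the fixed function type are all hypothetical and are not part of Mapred_rfun. It shows a name being sent together with a marshalled local argument, and the receiving side looking up the function by name.

```ocaml
(* Hypothetical stand-in for a table of registered functions; this is not
   the Mapred_rfun implementation. All functions share one fixed type here:
   local argument -> remote argument -> result. *)
let registry : (string, string -> string -> string) Hashtbl.t =
  Hashtbl.create 16

(* Register [f] under [name]. This runs while the toplevel modules are
   initialized, so every instance of the executable fills the table the
   same way. *)
let register name f =
  Hashtbl.replace registry name f;
  name

(* A registered function with local argument [prefix] and remote
   argument [s]. *)
let add_prefix = register "add_prefix" (fun prefix s -> prefix ^ s)

(* What would travel over the network: the name plus the marshalled
   local argument, never the code itself. *)
let make_request name (local_arg : string) : string =
  Marshal.to_string (name, local_arg) []

(* What the receiving side does: unmarshal, look the name up, and apply
   the function first to the local and then to the remote argument. *)
let run_request (request : string) (remote_arg : string) : string =
  let (name, local_arg) : string * string = Marshal.from_string request 0 in
  (Hashtbl.find registry name) local_arg remote_arg

let () =
  let req = make_request add_prefix "id-" in
  print_endline (run_request req "42")
```

In this sketch the request string is the only thing that would have to cross the network, which is why the local arguments must be marshallable.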
The filesystem
Map/reduce programs do not directly interact with the local filesystem or with PlasmaFS, but use a virtual layer Mapred_fs.filesystem (which you can get via me#filesystem). This layer is an extended version of Ocamlnet's stream_fs abstraction. It has a special feature, however, namely that one can merge several file trees into a single one.
Filenames use the syntax treename::/path to access the file tree known under treename, and to operate on (the absolute) /path in this tree. Note that paths are always absolute, and that there is no escaping of reserved characters like in URLs. There are normally two such trees:
file::/path is the local filesystem of the computer
plasma::/path is the PlasmaFS filesystem
It is also possible to give only /path, but it then depends on further circumstances which tree is actually accessed.
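To make the treename::/path syntax concrete, here is a small stdlib-only sketch that splits such a name into its two parts. The helper split_tree_path is hypothetical and is not a function of Mapred_fs; how the library itself resolves a name without a tree prefix is not modelled.

```ocaml
(* Hypothetical helper: split "treename::/path" into (Some treename, path).
   A name without "::" is returned unchanged as (None, name). *)
let split_tree_path (name : string) : string option * string =
  let n = String.length name in
  (* find the index of the first "::" in [name], if any *)
  let rec find i =
    if i + 1 >= n then None
    else if name.[i] = ':' && name.[i + 1] = ':' then Some i
    else find (i + 1)
  in
  match find 0 with
  | Some i -> (Some (String.sub name 0 i), String.sub name (i + 2) (n - i - 2))
  | None -> (None, name)

let () =
  match split_tree_path "plasma::/dir/file" with
  | Some tree, path -> Printf.printf "tree=%s path=%s\n" tree path
  | None, path -> Printf.printf "no tree, path=%s\n" path
```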
There are a lot of operations for filesystems, e.g.
fs # read flags path
fs # write flags path
fs # rename flags oldpath newpath
fs # remove flags path
For the complete list, see Mapred_fs.filesystem.
Example: Read a file line by line:
let read_file me =
  let ch = me # filesystem # read [] "plasma::/dir/file" in
  try
    while true do
      let line = ch # input_line() in
      print_endline line
    done
  with End_of_file ->
    (* ch is an object channel, so it is closed with its own method *)
    ch # close_in()
Note that there are also a number of useful filesystem functions in Mapred_io, e.g. Mapred_io.file_exists.
Files as sequences of records
Map/reduce accesses files strictly sequentially. Logically, the files are simply sequences of records, where the records are simply strings. The toolkit makes another assumption, however: Records can be divided into fields. This is highly configurable, as we'll see.
Plasma knows three types of files. Line-structured text files are one of them, and you can use standard tools like more, sed, awk or grep to process them.
There is the utility plasma_convert which can convert between the file types, see Cmd_plasma_convert for how to call it.
If you use PlasmaFS, you can use the plasma utility to copy files to PlasmaFS and from PlasmaFS (see Cmd_plasma, which also describes many more functions to interact with PlasmaFS):
$ plasma put localfile /plasmafs/path
$ plasma cat /plasmafs/path >localfile
As noted, records are primarily arbitrary strings. Normally, however, one wants to divide the records up into fields (just as a database row has columns). For text files, a popular method to represent fields is to use a special character as field separator, often the TAB character. The Mapred_fields module contains useful functions to split a TAB-separated record into fields, e.g.
let s = "f1\tf2\tf3" ;;
let f1 = Mapred_fields.text_field ~field:1 s ;; (* = "f1" *)
let f2_f3 = Mapred_fields.text_field ~field:2 ~num:2 s ;; (* = "f2\tf3" *)
For concatenating fields, you can simply use String.concat or Printf.sprintf.
There are companion functions for files with variable-size records, which do not have the restriction that fields can contain neither LF nor TAB. See Mapred_fields.var_field_range.
Places
The toolkit provides the abstraction of "places". These are collections of files, and one can read the files, or put more files into the collection. Normally, places are backed by filesystems, which means that a place is a directory. However, the toolkit also provides in-memory places, which is useful for temporary data and for testing.
Together with the collection, a place defines a number of properties:
open Mapred_toolkit
let codec =
  ( <:rfun< @ s -> String.copy s >>,
    <:rfun< @ s p l -> String.sub s p l >>
  )
let list_files me =
  let rc = Mapred_def.get_rc me (16 * 1024 * 1024) in
  let place =
    Place.from
      me#filesystem codec `Line_structured rc (`Deep_dir "plasma::/dir") in
  List.iter
    print_endline
    (Place.files place)
Let's have a more detailed look at this piece of code. The directory we inspect is "plasma::/dir". Because of the `Deep_dir tag, the directory is scanned recursively for files. (Note that files and directories whose names start with "." or "_" are omitted by the scan.)
The tag `Line_structured configures that the files have line-structured format. Of course, this is irrelevant for just listing the file names, but a place generally includes this specification.
The record configuration rc can be taken from me via Mapred_def.get_rc, with the only exception of the size of the bigblocks. Here this size is given as 16M (16 * 1024 * 1024). Bigblocks are used by some algorithms as an (approximate) upper bound of the record size. (Generally, when a large file is cut into pieces to be processed by tasks, the size of each of these pieces must be a multiple of a bigblock.)
The codec is a pair (coder,decoder). The coder transforms a number of fields into a record (string), and the decoder extracts the fields from the record. In this case, we use the simplest possible definitions for the codec, namely that the whole record is a single string field.
In general, the codec is a pair of registered functions:
type 'a codec =
  ('a -> string) rfun *
  (string -> int -> int -> 'a) rfun
The type parameter 'a is our choice - this is how we want to represent records internally. For example, if we analyze card games, we could have something like
type game_feature =
  { player_name : string;
    game_id : int;
    money : float
  }
meaning that the player with player_name won money in the game with ID game_id. If we represent this as lines
<player_name> TAB <game_id> TAB <money>
we can define the coder and decoder as
let game_codec =
  ( <:rfun< @ game ->
       Printf.sprintf "%s\t%d\t%f" game.player_name game.game_id game.money
    >>,
    <:rfun< @ s p l ->
       match Mapred_fields.text_fields ~pos:p ~len:l ~num:3 s with
         | [ pn; gid; m ] ->
             { player_name = pn;
               game_id = int_of_string gid;
               money = float_of_string m
             }
         | _ ->
             failwith "bad game_feature line"
    >>
  )
Note that the string s in the decoder can be more than the current record - in general it is the buffer from which the current record is extracted. The values p and l specify the range in the buffer where the current record can be found. If you need the whole record, use String.sub s p l.
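The role of the buffer range (s, p, l) can be tried out without the toolkit. The sketch below is a stdlib-only imitation of the codec above, written as plain functions instead of registered functions; encode and decode are hypothetical names, and String.split_on_char stands in for Mapred_fields.text_fields. It decodes the second record out of a buffer that holds two lines.

```ocaml
type game_feature =
  { player_name : string;
    game_id : int;
    money : float
  }

(* Coder: one record becomes one TAB-separated string. *)
let encode g =
  Printf.sprintf "%s\t%d\t%f" g.player_name g.game_id g.money

(* Decoder: the record occupies [l] bytes of the buffer [s], starting at
   position [p]. The rest of the buffer is ignored. *)
let decode s p l =
  match String.split_on_char '\t' (String.sub s p l) with
  | [ pn; gid; m ] ->
      { player_name = pn;
        game_id = int_of_string gid;
        money = float_of_string m }
  | _ -> failwith "bad game_feature line"

let () =
  let r1 = encode { player_name = "alice"; game_id = 7; money = 12.5 } in
  let r2 = encode { player_name = "bob"; game_id = 8; money = 3.0 } in
  (* a buffer containing two line-structured records *)
  let buf = r1 ^ "\n" ^ r2 ^ "\n" in
  let g = decode buf (String.length r1 + 1) (String.length r2) in
  Printf.printf "%s %d %.2f\n" g.player_name g.game_id g.money
```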
The type of a place using this codec would be
game_feature Place.t
The special place Place.notebook_place() can be used for keeping in-memory "files". Such files are called notebooks, and are explained in detail below. For notebooks, none of the above configurations are needed (codec, file format, etc).
Stores and sequences
The next abstractions we build on top of files are called stores and sequences. They are closely related to each other: A store means a physical location where sequential data can be stored - this is the file (or notebook). A sequence is a view on a store, and is the primary access channel to stores.
For stores, defined in Mapred_toolkit.Store, there are only a few functions. The most important ones connect stores with places:
Mapred_toolkit.Store.read_place: This function inspects a place, and returns the stores collected there.
Mapred_toolkit.Store.write_place: This function creates a new store at a place with an automatically generated name.
Let place be defined as in the list_files example above.
You can now get the stores at this place with
open Mapred_toolkit
let stores = Store.read_place place
Now, stores is a string Store.t list, i.e. a list of stores where the records are strings (as place was a string Place.t).
If you need a store for a single file only, the recommended way is to first define a place for only this file, and then to call read_place to get the store for it. A place for a single file is denoted by using `File path instead of `Deep_dir:
let place =
  Place.from
    me#filesystem codec `Line_structured rc (`File "plasma::/the/file")
let [store] =
  Store.read_place place
A file store is either open for reading or for writing: The stores returned by read_place can only be read, and the stores created by write_place can only be written. The exception is in-memory stores (notebooks), which are read/write. (Note that the separation between read and write mode is only an implementation restriction, and will probably go away at some point.)
A store must be closed after use with Mapred_toolkit.Store.close.
Sequences, defined in Mapred_toolkit.Seq, can now read stores record by record, and they can append new records to the end of the stores. The type 't Seq.seq is a lot like 't list: It means immutable sequences of values. However, sequences are extended at their end, whereas a list is extended at the beginning. Compare:
Operation            List          Sequence
----------------------------------------------------------------------
Empty container      []            Seq.notebook() or a sequence over
                                   an empty file
Head                 List.hd l     Seq.hd s
Tail                 List.tl l     Seq.tl s
Test whether empty   l = []        Seq.is_empty s
Extend               x :: l        Seq.add x s
This means that sequences behave like lists when they are decomposed into head and tail, but the inverse operation is not available. Instead there is only Seq.add which adds a new element to the end. Of course, this just reflects that sequences view files: With hd and tl one can iterate over the file from the beginning to the end, and files are only extensible at their end.
Another difference is that Seq.add modifies the underlying container. After
let s1 = Seq.add x s
the view s on the container is unchanged, and s1 is a new view including the just appended element x, in (not allowed) pseudo notation:
s = [ x1; x2; ...; xn ]
s1 = [ x1; x2; ...; xn; x ]
In order to keep consistency, a Seq.add is only permitted if the sequence s views the end of the store. In contrast, lists are extended at the head by prepending new values (x :: l). If two such extensions occur (l1 = x1 :: l and l2 = x2 :: l), two new lists l1 and l2 are constructed that share a common tail. This would not work for sequences: After s1 = Seq.add x s the sequence s no longer views the end of the store (it stops one element before the end), and a following s2 = Seq.add x s would run into a runtime error:
let n1 =
  Seq.add 30 (Seq.add 20 (Seq.add 10 (Seq.notebook())))
  (* The store contains now: [10; 20; 30]
     n1 views: [10; 20; 30]
   *)
let n2 =
  Seq.add 40 n1
  (* The store contains now: [10; 20; 30; 40]
     n1 views: [10; 20; 30]
     n2 views: [10; 20; 30; 40]
   *)
let n3 =
  Seq.add 50 n1
  (* Failure! n1 does not view the end of the store, and "inserting" the
     50 after the 30 would violate the conditions that were already
     granted to n2
   *)
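The view semantics can be reproduced with a toy model. The following sketch is not the Mapred_toolkit.Seq implementation; the types store and view and all functions are hypothetical, chosen only to mirror the behaviour described above: a view covers a range of an append-only store, and add is refused unless the view reaches the current end of the store.

```ocaml
(* Toy model: an append-only store and views on index ranges of it. *)
type 'a store = { mutable items : 'a list;  (* newest element first *)
                  mutable count : int }

(* The view covers the elements with index first .. last-1. *)
type 'a view = { store : 'a store; first : int; last : int }

let notebook () = { store = { items = []; count = 0 }; first = 0; last = 0 }

let is_empty v = v.first >= v.last

let hd v =
  if is_empty v then failwith "hd: empty";
  List.nth v.store.items (v.store.count - 1 - v.first)

let tl v =
  if is_empty v then failwith "tl: empty";
  { v with first = v.first + 1 }

(* Appending is only allowed through a view that ends where the store ends. *)
let add x v =
  if v.last <> v.store.count then
    failwith "add: view is not at the end of the store";
  v.store.items <- x :: v.store.items;
  v.store.count <- v.store.count + 1;
  { v with last = v.last + 1 }

let rec to_list v = if is_empty v then [] else hd v :: to_list (tl v)

let () =
  let n1 = add 30 (add 20 (add 10 (notebook ()))) in
  let n2 = add 40 n1 in
  assert (to_list n1 = [10; 20; 30]);
  assert (to_list n2 = [10; 20; 30; 40]);
  match add 50 n1 with
  | _ -> print_endline "unexpected success"
  | exception Failure msg -> print_endline msg
```

The last call fails for the same reason as n3 in the example above: n1 stops before the element that was appended through n2.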
Like stores, sequences can be read-only, write-only, or read/write. Unlike for stores, this is now reflected in the type:
('t, [`R]) seq
('t, [`W]) seq
('t, [`R|`W]) seq
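't is the type of the elements. One can create sequences with:
To read an existing store st: Seq.read st
To append to a store st: Seq.extend st
To create an in-memory notebook: Seq.notebook()
Example: We extract every tenth record from the files in place pl1, and put the extracted records into files in place pl2.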
let distill pl1 pl2 =
  let rec distill_seq s1 s2 n =
    if not (Seq.is_empty s1) then (
      if n = 10 then
        distill_seq (Seq.tl s1) (Seq.add (Seq.hd s1) s2) 1
      else
        distill_seq (Seq.tl s1) s2 (n+1)
    ) in
  let distill_store st1 st2 =
    let s1 = Seq.read st1 in
    let s2 = Seq.extend st2 in
    distill_seq s1 s2 1
  in
  List.iter
    (fun st1 ->
       let st2 = Store.write_place pl2 in
       distill_store st1 st2;
       Store.close st1;
       Store.close st2;
    )
    (Store.read_place pl1)
Full code: Look into examples/mapred-toolkit/tenth.ml of the Plasma tarball.
This function works in the following way: The contents of pl1 are retrieved as a list of stores, and for each of these stores st1 a new counterpart st2 is created by writing to pl2. For st1 a read-only sequence s1 is made which now sees the whole file, and by calling Seq.tl the function iterates over the elements of s1. In parallel, the write-only sequence s2 extends st2, and the selected elements are added to it.
Note that this function does not return any data directly; the result is written to pl2. If you want to further process it, the way to do it is to get the stores again with Store.read_place pl2. Alternatively, it would be possible to return the final version of s2 using
let rec distill_seq s1 s2 n =
  if not (Seq.is_empty s1) then (
    if n = 10 then
      distill_seq (Seq.tl s1) (Seq.add (Seq.hd s1) s2) 1
    else
      distill_seq (Seq.tl s1) s2 (n+1)
  )
  else
    s2
(note the final else clause) and modify the other parts of the distill function so that all s2 are returned. Remember that s2 is only a view to the container and not the container itself. Actually, the data is appended to the underlying file, and the in-memory representation of s2 is constant.
Now, what could you do with such an s2? The sequence is write-only, so one could only add more elements, but not read the existing ones. The solution is to call Mapred_toolkit.Seq.reread:
let s3 = Seq.reread s2
which creates another view for the same store, and reads the elements contained in it. Note that reread always reads all elements of the store, and not only those that are in s2 (in this example this makes no difference, though).
As you see, the sequence abstraction has some peculiarities, but is nevertheless useful to deal with external files as if they were in-memory containers. Note that we left a lot of representation details unspecified in distill.
The combinators iter and fold work like their list counterparts (where fold corresponds to fold_left):
val iter : ('a -> unit) -> ('a,[>`R]) seq -> unit
val fold : ('a -> 'b -> 'a) -> 'a -> ('b,[>`R]) seq -> 'a
A bit different is map:
val map : ('a -> 'b) -> ('a,[>`R]) seq -> ('b,[>`W] as 'm) seq -> ('b,'m) seq
As you see there is one extra argument. You call it like
let s3 = Seq.map f s1 s2
which means that s3 is s2 plus the mapped elements of s1. We need s2 to denote the store to which the mapped elements are added.
There is the variant mapl, which maps a single element to a list:
val mapl : ('a -> 'b list) ->
           ('a,[>`R]) seq ->
           ('b,[>`W] as 'm) seq ->
           ('b,'m) seq
All of the mentioned combinators do not buffer more elements in memory than necessary, and can be used to process arbitrarily large sequences.
Sorting in-memory: the non-distributed version of map/reduce
We'll now discuss Mapred_toolkit.Seq.sort and Mapred_toolkit.Seq.mapl_sort_fold. The former function is like List.sort but for sequences, whereas the latter function implements the whole map/reduce algorithm scheme locally (i.e. non-distributed and in-memory).
Seq.sort
The signature of the sort function looks a bit different from that of List.sort:
val sort :
  hash:('a -> int) ->
  cmp:('a -> 'a -> int) ->
  ('a,[>`R]) seq ->
  ('a,[>`W] as 'm) seq ->
  ('a,'m) seq
In particular, there is again the additional write-only sequence we already know from map. The call
let s3 = Seq.sort ~hash ~cmp s1 s2
appends the sorted elements of s1 to s2, and returns this concatenation as s3. The extra argument s2 is needed because we need a store where we can put the sorted data.
The other difference is that there is not only cmp for comparing values but also a hash function. This is an optimization, and if you do not need it or it is not applicable, just pass hash as a function that always returns 0.
The improvement is that this sorting algorithm performs an initial pass over all elements, and computes the hash values for them. The hash values must be non-negative 30 bit integers (even on 64 bit platforms). Now, before calling cmp for two elements x and y, the algorithm first looks at hx = hash x and hy = hash y, and compares hx and hy. Only if both values are equal is cmp called at all. If they are not equal, the elements are sorted by hx and hy.
Of course, this optimization assumes that cmp is somewhat costly. In map/reduce context, the data passed to cmp is often not directly available - usually the record must first be decoded into fields. This makes cmp more expensive than usual, and it is best to avoid it.
Now, how to define hash in a useful way? There are two approaches:
If you do not really need sorted elements, but only elements that are grouped by a key, you can set
let hash (key,value) = Hashtbl.hash key
(assuming the elements are pairs (key,value), and you want to group by key). In this case, the only guarantee is that the hash values are identical for the same keys, and thus the keys are grouped together. (Of course, you still need a meaningful cmp function, because it is possible that different keys are mapped to the same hash value.)
The other approach is to extract information from the keys so that hash x > hash y implies x > y, and hash x < hash y implies x < y. For strings, such a function is Mapred_sorters.String_asc.hash. Other functions in Mapred_sorters deal with decimal numbers, hex numbers, float numbers, and a few binary encodings.
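The hash-then-cmp ordering can be tried on ordinary lists. This is a sketch under assumptions, not the algorithm used by Seq.sort: compare_with_hash and prefix_hash are hypothetical helpers, and the hash is recomputed on every comparison here, whereas the text above says the toolkit computes it once per element. prefix_hash is one possible order-preserving hash for strings; it is not Mapred_sorters.String_asc.hash.

```ocaml
(* Compare the cheap hash values first; call the costly [cmp] only when
   the hash values are equal. *)
let compare_with_hash ~hash ~cmp x y =
  let hx = hash x and hy = hash y in
  if hx <> hy then compare hx hy else cmp x y

(* Hypothetical order-preserving hash: the first three bytes of the key
   (missing bytes count as 0), packed into 24 bits. The result is
   non-negative and fits into 30 bits. *)
let prefix_hash (s : string) : int =
  let byte i = if i < String.length s then Char.code s.[i] else 0 in
  (byte 0 lsl 16) lor (byte 1 lsl 8) lor byte 2

let () =
  let calls = ref 0 in
  let cmp a b = incr calls; String.compare a b in
  let sorted =
    List.sort (compare_with_hash ~hash:prefix_hash ~cmp)
      [ "delta"; "alpha"; "alphabet"; "charlie"; "bravo" ] in
  List.iter print_endline sorted;
  Printf.printf "cmp was called %d time(s)\n" !calls
```

Only "alpha" and "alphabet" share their first three bytes, so cmp is needed only when these two are compared with each other; all other pairs are decided by the hash values alone.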
Example: We want to sort a read-only sequence of type game_feature Seq.seq by player name, and append it to another sequence.
let sort_by_player s_in s_out =
  Seq.sort
    ~hash:(fun g ->
             let p = g.player_name in
             Mapred_sorters.String_asc.hash p 0 (String.length p))
    ~cmp:(fun gx gy ->
            String.compare gx.player_name gy.player_name)
    s_in
    s_out
Seq.mapl_sort_fold
The map/reduce scheme is essentially an enhanced sorting scheme. The version in Mapred_toolkit.Seq.mapl_sort_fold does not exploit that this scheme can be implemented in a distributed way. It is just a poor man's emulation that processes the data in-memory. Nevertheless, it may be useful for testing algorithms, and for learning what map/reduce means semantically.
The signature is quite complicated:
val mapl_sort_fold :
  mapl:('a -> 'b list) ->
  hash:('b -> int) ->
  cmp:('b -> 'b -> int) ->
  initfold:(int -> 'c) ->
  fold:('c -> 'b -> 'c * 'd list) ->
  ?finfold:('c -> 'd list) ->
  partition_of:('b -> int) ->
  partitions:int ->
  'a Place.t ->       (* p_in *)
  'd Place.t ->       (* p_out *)
  ('d,[`W]) seq list
The main arguments are two places: The first place p_in contains the input data, and the output is written into new files in the second place p_out. This means that this "sorter" always processes a bunch of files at once, and not only a single sequence.
So, why does this version of a sorting algorithm use all this extra functionality like mapping and folding? In order to understand this, one has to keep in mind that this algorithm is designed for the distributed case, where you want to split the overall effort into smaller pieces (tasks) so that at any time many of the tasks can be done in parallel. So we just assume this here, knowing that Seq.mapl_sort_fold does not take advantage of it (but the version below, DSeq.mapl_sort_fold, does).
If you want to sort a large file, a possible algorithm is to cut the file into many pieces, sort every piece separately (in RAM), and finally merge all the sorted file parts to a single big output file. This is already quite close to what map/reduce is about. It has the good property that the sorting tasks can be executed in parallel. At merge time, though, the problem arises that the iterative merge of the file parts to a single file cannot be fully parallelized. The solution is to modify the scheme slightly: We accept that we do not get one big file at the end, but a set of files, called partitions. The partitions are defined by the sorting key - this means that all records with the same key will go into the same partition. The exact criterion can be defined by the user. Now, if we have P partitions, the whole merging step can at least be parallelized so that P independent data flows process the data. Of course, we now have to include somehow into the scheme that the data is split into partitions. There is a lot of freedom how to do it - Plasma normally splits iteratively while merging, and the combined split+merge steps are called shuffling. (N.B. Hadoop splits immediately after sorting - just as a reminder that map/reduce is not exactly defined.)
Basically, what I'm trying to say is that the core of map/reduce creates P output files, and every file contains the sorted records of one partition. In the implementation of this scheme you get several opportunities for including user code: The map step is done at the very beginning before sorting (it can e.g. be implemented as a hook while reading data into the sort buffer), and one can use it to preprocess data. The fold (or reduce) step is done at the very end when one of the output files is written, and one can use it to postprocess data.
As a picture:
+---------+---------+----- INPUT PIECES ----+----------+--------+
|         |         |                       |          |        |
| input1  | input2  |          ...          |input(n-1)|input(n)|
|         |         |                       |          |        |
+---------+---------+-----------------------+----------+--------+
     |         |                                  |         |
     |         |                                  |         |
   mapl      mapl                               mapl      mapl
     |         |                                  |         |
     V         V                                  V         V
 +------+  +-------+                          +------+  +------+
 |  m1  |  |  m2   | ........................ |m(n-1)|  | m(n) |
 +------+  +-------+                          +------+  +------+
     |         |                                  |         |
     |         |                                  |         |
   sort      sort                               sort      sort
     |         |                                  |         |
     V         V                                  V         V
 +------+  +-------+                          +------+  +------+
 |  s1  |  |  s2   | ........................ |s(n-1)|  | s(n) |
 +------+  +-------+                          +------+  +------+
     |         |                                  |         |
      \         \                                /         /
       \         \                              /         /
        -------------->    shuffle    <---------------------
                     (merge + partition)
                        /           \
                       /             \
         +-------+    /               \    +------+
         |  p1   |.........................| p(P) |
         +-------+                         +------+
             |                                |
             |                                |
           fold                             fold
             |                                |
             V                                V
         +-------+                         +------+
         |  r1   |.........................| r(P) |
         +-------+                         +------+
                  P OUTPUT FILES (PARTITIONS)
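The data flow of the picture can be imitated in memory with plain lists. The following is only a sketch under assumptions: it uses nothing but the OCaml standard library (OCaml 4.10 or later for List.concat_map), the names pieces and map_sort_partition are made up for this illustration, and it splits after sorting rather than iteratively while merging. The fold step is left out here.

```ocaml
(* In-memory model of the scheme; none of these names come from Plasma. *)

(* Cut a list into pieces of at most [n] elements. *)
let rec pieces n l =
  if n <= 0 then invalid_arg "pieces: n must be positive";
  match l with
  | [] -> []
  | _ ->
    let rec take k acc rest =
      match rest with
      | x :: tl when k > 0 -> take (k - 1) (x :: acc) tl
      | _ -> (List.rev acc, rest)
    in
    let piece, rest = take n [] l in
    piece :: pieces n rest

let map_sort_partition ~mapl ~cmp ~partition_of ~partitions ~piece_size input =
  (* Map and sort every piece on its own; these steps are independent
     of each other and could run in parallel. *)
  let sorted_pieces =
    List.map
      (fun piece -> List.sort cmp (List.concat_map mapl piece))
      (pieces piece_size input)
  in
  (* Shuffle: for every partition, merge the matching records of all
     sorted pieces. The P merges are independent data flows. *)
  List.init partitions (fun p ->
    List.fold_left
      (fun merged piece ->
         List.merge cmp merged
           (List.filter (fun r -> partition_of r = p) piece))
      [] sorted_pieces)

(* Two partitions: even numbers go to partition 0, odd numbers to 1. *)
let example =
  map_sort_partition
    ~mapl:(fun x -> [x])
    ~cmp:compare
    ~partition_of:(fun x -> x mod 2)
    ~partitions:2
    ~piece_size:2
    [5; 3; 8; 1; 4; 2]
(* example = [[2; 4; 8]; [1; 3; 5]] *)
```

Each inner list plays the role of one output file: it is sorted, and it contains exactly the records of one partition.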
Let's look at the named arguments of mapl_sort_fold:
- mapl is the map preprocessing step. The raw input record as taken from the input file (type 'a) can be mapped to a representation that is easier to process in the following steps (type 'b). For example, this step could extract the sorting key from the raw input and make it easily accessible. The mapl function returns a list, which means one is not bound to a 1:1 mapping, but can also drop records entirely, or split a single record into many (e.g. because the raw input is a compound format).
- hash and cmp define the sorting criterion in the same way as in Seq.sort above. This is done for records of type 'b, i.e. after mapping.
- partitions is the number of partitions.
- partition_of: this function returns for a record the partition into which the record falls. The integer must be between 0 and partitions-1. Also, we have a consistency condition: If two records are considered equal by hash and cmp, the partition_of function must always return the same partition number for both.
- initfold, fold, and finfold define the fold step. This type of folding can do a bit more than List.fold_left, because every cycle does not only update the accumulator, but can also output a result (which then goes into the output file). The accumulator has type 'c, and the output records have type 'd. So, fold takes two arguments acc and x, the accumulator and the current element, and outputs a pair (acc',out), the updated accumulator and a list of output records. initfold is used to initialize the accumulator. The argument of initfold is the partition number. finfold can be used to output final records after the folding loop has cycled over all elements. It gets the final version of the accumulator as input, and may output a list of final records. (Defaults to the empty list if finfold remains unset.)
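If neither preprocessing nor postprocessing is needed, the map and fold steps can be set to identity functions:
~mapl:(fun x -> [x])
~initfold:(fun _ -> ())
~fold:(fun _ x -> ((),[x]))
This results in the "naked" version of a map/reduce job, which then only performs the sorting, and neither pre- nor postprocesses the data.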
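To make the semantics of initfold, fold, and finfold more concrete, here is a small driver that applies the three functions to an already sorted list of records. It is a sketch only: run_fold is a hypothetical helper written for this text, not a toolkit function, and it works on plain lists instead of files.

```ocaml
(* Hypothetical driver, not Plasma code: applies initfold/fold/finfold
   to the sorted records of partition [p] and collects the outputs. *)
let run_fold ~initfold ~fold ?(finfold = fun _ -> []) p records =
  let acc, rev_out =
    List.fold_left
      (fun (acc, rev_out) x ->
         (* every cycle updates the accumulator and may emit records *)
         let acc', out = fold acc x in
         (acc', List.rev_append out rev_out))
      (initfold p, []) records
  in
  (* finfold may emit final records from the last accumulator *)
  List.rev_append rev_out (finfold acc)

(* The identity fold passes all records through unchanged. *)
let naked =
  run_fold ~initfold:(fun _ -> ()) ~fold:(fun _ x -> ((), [x])) 0 [1; 2; 3]
(* naked = [1; 2; 3] *)

(* Averaging adjacent records with the same key; the accumulator is
   (key, count, sum) of the group currently being read. *)
let averages =
  run_fold
    ~initfold:(fun _ -> None)
    ~fold:(fun acc (name, win) ->
      match acc with
      | None -> (Some (name, 1, win), [])
      | Some (n, k, s) when n = name -> (Some (n, k + 1, s +. win), [])
      | Some (n, k, s) -> (Some (name, 1, win), [ (n, s /. float_of_int k) ]))
    ~finfold:(function
      | None -> []
      | Some (n, k, s) -> [ (n, s /. float_of_int k) ])
    0
    [ ("ann", 10.0); ("ann", 20.0); ("bob", 4.0) ]
(* averages = [("ann", 15.0); ("bob", 4.0)] *)
```

The second call shows why finfold is needed: the last group is still held in the accumulator when the input ends, and only finfold can emit it.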
The result of mapl_sort_fold is the list of sequences containing the
folded partitions.
Example: We want to compute the average amount of money a player wins in card games. The input is available as a number of XML files.
Here, the mapl
step would simply parse the XML files, and represent
the essential information (player name, win) in a more compact way,
e.g. in a tab-separated record. The sort
step sorts the records by
player name. The partitions are defined by player names - all records
for the same player are expected to end in the same partition (e.g.
by something like p = (hash name) mod P
). The shuffle phase will
then process the records so that each partition has all records for
a player, and so that these records are even adjacent in the files.
The fold
step just runs over the partitions, and whenever a new player
is detected, all the wins are summed up, and finally the average win
is output.
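The partitioning rule p = (hash name) mod P can be tried out in isolation. The sketch below is an illustration with standard-library functions only. The helper names partition_of and split are chosen for this text, and records are simplified to (player, win) pairs.

```ocaml
(* Partition number of a player name. Hashtbl.hash never returns a
   negative number, so the result is between 0 and partitions-1. *)
let partition_of ~partitions name = Hashtbl.hash name mod partitions

(* Distribute (player, win) records over the partitions, keeping the
   input order within every partition. *)
let split ~partitions records =
  let buckets = Array.make partitions [] in
  List.iter
    (fun ((name, _) as r) ->
       let p = partition_of ~partitions name in
       buckets.(p) <- r :: buckets.(p))
    records;
  Array.map List.rev buckets

let buckets = split ~partitions:3 [ ("ann", 1.0); ("bob", 2.0); ("ann", 3.0) ]
```

Because the partition number depends only on the player name, both records of "ann" end up in the same bucket, which is the consistency condition required for partition_of.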
Note that the user does not need to provide the sort and shuffle
algorithms - these are always the same, and are only configured by
hash and cmp.
This would translate into the following call of mapl_sort_fold
:
(* 'a = string: the input records are XML documents
'b = game_feature
'c = (string * int * float) option -- (player, n_games, sum)
'd = string * float -- (player, avg_win)
*)
let run_my_job me mj =
let pl_in = ... in
let pl_out = ... in
let partitions = ... in
let s_out =
Seq.mapl_sort_fold
~mapl:(fun xml -> [ parse_xml_game_feature xml ])
~hash:(fun g ->
let p = g.player_name in
Mapred_sorters.String_asc.hash p 0 (String.length p))
~cmp:(fun gx gy ->
String.compare gx.player_name gy.player_name)
~initfold:(fun p -> None)
~fold:(fun acc g ->
match acc with
| None ->
(Some(g.player_name,1,g.money), [])
| Some(n,k,s) when n = g.player_name ->
(Some(n,k+1,s+.g.money), [])
| Some(n,k,s) ->
(Some(g.player_name,1,g.money), [n, s /. float k])
)
~finfold:(fun acc ->
match acc with
| None -> []
| Some(n,k,s) -> [n, s /. float k]
)
~partition_of:(fun g -> (Hashtbl.hash g.player_name) mod partitions)
~partitions
pl_in
pl_out in
...
Note that the number of partitions
is arbitrary here.
Full code: Available in examples/mapred-toolkit/cardgames.ml
.
There is also a transcript Sess_mr_cardgames.
Distributed Algorithms
The schemes described in this section distribute the work over the
task servers. Because of this it is required that the task servers
are running (./my_prog start_task_servers
).
If your only task server is "localhost", it is possible to process data in the local filesystem, and PlasmaFS is not needed. In this mode the load is distributed only over the CPU cores of the local machine, and the job is decomposed into steps that do not consume more RAM than available, even for big input files. This is a useful mode if you have more data to process than fits into memory. If the dataset is not that big, it is usually faster to use the non-distributed variant described in the previous section.
Remember that you turn off PlasmaFS by including a setting
disabled = true
into the namenodes
section of the configuration
file (of your toolkit program).
If you have task servers on more nodes than just "localhost", you need
PlasmaFS, and the input and output files must be stored in it. The
deployment of PlasmaFS is described in detail here: Plasmafs_deployment
.
The configuration file must then include a namenodes
section like:
namenodes {
clustername = "name"; (* the name of the PlasmaFS instance *)
node { addr = "namenodehost1" }; (* a host running a name node *)
node { addr = "namenodehost2" }; (* a host running a name node *)
...
port = 2730; (* or whatever the port is *)
}
It is required that at least one live namenode is mentioned in the file. If you have a text file listing the name node hosts line by line, you can also point to this file via
namenodes {
clustername = "name"; (* the name of the PlasmaFS instance *)
node_list = "/path/to/hosts/file"; (* the file with the hosts *)
port = 2730; (* or whatever the port is *)
}
DSeq.mapl
The first distributed algorithm explained here is DSeq.mapl
, which
takes data from an input place, maps all records by applying a
function, and writes the result to an output place. The files at the
input place are split up into small pieces, and for each piece a task
is started that performs the mapping. The tasks can all run in
parallel. There will be as many output files as tasks.
The signature of this function is:
val mapl :
(mapred_info -> 'a -> 'b list) rfun ->
'a Place.t ->
'b Place.t ->
DSeq.config ->
('b,[`W]) Seq.seq list result
It is called as let r = mapl f pl_in pl_out conf
. In contrast to
the non-distributed version Seq.mapl
this function requires that
the mapper f
is a registered function. This is required because
f
is executed within the task servers. Also, f
has an additional
mapred_info
argument. This is just a means to make the environment
and the job configuration available to f
:
class type mapred_info =
object
method mapred_env : Mapred_def.mapred_env
method mapred_job_config : Mapred_def.mapred_job_config
end
Remember that f
cannot be a closure, and must be defined in the
scope of a toplevel module (like all registered functions). Because
of this, it is difficult to make values of the current environment
available to f
. The mapred_info
object solves this problem at
least for the overall configuration.
DSeq.mapl
also takes a DSeq.config
argument. It defines properties
of the job, including those that go later into mapred_job_config
.
In the simplest form, this config
argument can be obtained by
let config = DSeq.create_config me
where me
is the mapred_environment
coming from the main program
(see above). During development, it is recommended to enable the
job report, which is done by
let config = DSeq.create_config ~report:true me
The report outputs messages while the job is being executed. In particular you can see which tasks are started on which machines, and follow the progress of the job.
There are more options than report
, but most are only applicable to
full map/reduce and not for mapl
jobs.
Example: We want to map files where the records contain integers, and increase every number by 1.
let f_mapl =
<:rfun< @ mi x -> [x+1] >>
let run_my_job me mj =
let pl_in = ... in
let pl_out = ... in
let r_out =
DSeq.mapl
f_mapl
pl_in
pl_out
(DSeq.create_config ~report:true me) in
let s_out =
DSeq.get_result r_out in
...
(We've left the definition of pl_in
and pl_out
out. You would need
a codec here that uses int_of_string
and string_of_int
to represent
integers.)
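As a rough idea of what such a codec has to do, the following sketch models a codec as a pair of conversion functions between records and text lines. This is an assumption made for illustration: the record type line_codec and the helper map_lines are hypothetical and are not the toolkit's codec type.

```ocaml
(* Hypothetical model of a codec: one record per line of text. *)
type 'a line_codec = {
  encode : 'a -> string;   (* record -> line, without the newline *)
  decode : string -> 'a;   (* line -> record *)
}

(* Integers are written in decimal notation. *)
let int_codec = { encode = string_of_int; decode = int_of_string }

(* What a mapl task does with one piece of input: decode every line,
   apply the mapper, and encode all records it returns. *)
let map_lines codec f lines =
  List.concat_map
    (fun line -> List.map codec.encode (f (codec.decode line)))
    lines

let out_lines = map_lines int_codec (fun x -> [x + 1]) [ "1"; "41" ]
(* out_lines = ["2"; "42"] *)
```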
The function f_mapl
is here the registered mapper. mi
is the
mapred_info
argument, and x
the integer from the input. This
number is mapped to [x+1]
, i.e. for every input integer we output
exactly one output integer, increased by 1.
Note that DSeq.mapl
does not return its result directly, but
wrapped into a type DSeq.result
. One can get the final value with
Mapred_toolkit.DSeq.get_result
. Additionally, there are also functions
for getting statistics and the job ID.
Lazy evaluation: Note that the job is not started until you
call get_result. This makes it possible to retrieve the job ID beforehand.
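The effect of this lazy start can be modeled with the Lazy module of the standard library. The sketch below is only a model of the behavior described here: the type job_result and the functions submit and get_result are stand-ins defined for this example, not the DSeq functions.

```ocaml
(* Model of a result whose computation is deferred until requested. *)
type 'a job_result = { job_id : string; value : 'a Lazy.t }

(* Describe the job; nothing is executed yet. *)
let submit ~job_id work = { job_id; value = lazy (work ()) }

(* Forcing the lazy value runs the job (at most once). *)
let get_result r = Lazy.force r.value

let started = ref 0
let r = submit ~job_id:"job-1" (fun () -> incr started; [2; 3; 4])

(* The ID is available although the job has not run so far. *)
let id_before_start = r.job_id
let not_started_yet = (!started = 0)

(* Only this call actually runs the work. *)
let out = get_result r
```

Calling get_result a second time returns the stored value and does not run the work again.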
DSeq.mapl_sort_fold
Finally, we come to the full and distributed implementation of map/reduce.
It is provided by Mapred_toolkit.DSeq.mapl_sort_fold
. The signature
is similar to the non-distributed version Seq.mapl_sort_fold
:
val mapl_sort_fold :
mapl:(mapred_info -> 'a -> 'b list) rfun ->
hash:(mapred_info -> 'b -> int) rfun ->
cmp:(mapred_info -> 'b -> 'b -> int) rfun ->
initfold:(mapred_info -> int -> 'c) rfun ->
fold:(mapred_info -> 'c -> 'b -> 'c * 'd list) rfun ->
?finfold:(mapred_info -> 'c -> 'd list) rfun ->
partition_of:(mapred_info -> 'b -> int) rfun ->
?initcombine:(mapred_info -> 'e) rfun ->
?combine:(mapred_info -> 'e -> 'b -> 'e * 'b list) rfun ->
?fincombine:(mapred_info -> 'e -> 'b list) rfun ->
'a Place.t ->
'd Place.t ->
config ->
'b Place.codec ->
('d,[`W]) Seq.seq list result
There are a few systematic differences, and a number of additional arguments (all optional):
- All functions are registered functions and get a mapred_info object as first input
- There is no partitions parameter anymore. The partitions are now taken from config (see below)
- There is a config argument as for DSeq.mapl
- The result is wrapped into the DSeq.result type, as for DSeq.mapl
The mapred_info
argument exists for the same reasons as explained
above for DSeq.mapl
: Registered functions would otherwise not have
any way to get these configuration objects.
The partitions
parameter can now be set when creating config
, e.g.
let config = DSeq.create_config ~partitions me
If left out here, the number of partitions is taken from the
configuration file (as most other config
parameters).
Now, what about this additional codec? The problem is that the values
of type 'b
are stored in intermediate files while the job is executed,
and we need a way here to represent values of type 'b
in such files.
Example: Let's translate the above example for Seq.mapl_sort_fold
,
where we computed the average profit of players in card games.
(* 'a = string: the input records are XML documents
'b = game_feature
'c = acc = (string * int * float) option -- (player, n_games, sum)
'd = out = string * float -- (player, avg_win)
'e = unused
*)
type acc = (string * int * float) option
type out = string * float
let f_mapl =
<:rfun< @ mi xml -> [ parse_xml_game_feature xml ] >>
let f_hash =
<:rfun< @ mi g ->
let p = g.player_name in
Mapred_sorters.String_asc.hash p 0 (String.length p)
>>
let f_cmp =
<:rfun< @ mi gx gy ->
String.compare gx.player_name gy.player_name
>>
let f_initfold =
<:rfun< @ mi p -> (None : acc) >>
let f_fold =
<:rfun< @ mi acc g ->
match (acc : acc) with
| None ->
(Some(g.player_name,1,g.money), [])
| Some(n,k,s) when n = g.player_name ->
(Some(n,k+1,s+.g.money), [])
| Some(n,k,s) ->
(Some(g.player_name,1,g.money), [n, s /. float k])
>>
let f_finfold =
<:rfun< @ mi acc ->
match (acc : acc) with
| None -> []
| Some(n,k,s) -> [n, s /. float k]
>>
let f_partition_of =
<:rfun< partitions @ mi g ->
(Hashtbl.hash g.player_name) mod partitions
>>
let game_codec = ...
(* see above, the definition can be found in prev section *)
let run_my_job me mj =
let pl_in = ... in
let pl_out = ... in
let partitions = ... in
let config =
DSeq.create_config ~report:true ~partitions me in
let r_out =
DSeq.mapl_sort_fold
~mapl:f_mapl
~hash:f_hash
~cmp:f_cmp
~initfold:f_initfold
~fold:f_fold
~finfold:f_finfold
~partition_of:(f_partition_of partitions)
pl_in
pl_out
config
game_codec in
let s_out =
DSeq.get_result r_out in
...
As you can see, the changes are relatively small - essentially, we've
moved all the functions out of run_my_job, and made them registered
functions. At one point there is a small difficulty: for the function
partition_of we needed the number of partitions. In the non-distributed
case, this number was known from the environment (i.e. partition_of
was a real closure). As a workaround, we passed partitions as a local
argument to f_partition_of.
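The difference between a closure and a function with an explicit local argument can be shown with a toy model. Everything in this sketch is an assumption made for illustration: the registry table, register, and call_on_worker are hypothetical names, and the real registration mechanism of the toolkit is not shown here.

```ocaml
(* Toy registry: a worker process finds a function by its name. This
   only works for toplevel functions that exist in every process. *)
let registry : (string, int -> string -> int) Hashtbl.t = Hashtbl.create 7
let register name f = Hashtbl.replace registry name f

(* Toplevel function: the number of partitions is an explicit
   argument instead of a captured variable. *)
let partition_of partitions name = Hashtbl.hash name mod partitions
let () = register "partition_of" partition_of

(* What a worker would do: it receives the function name and the
   local argument as data, and applies the function to a record key. *)
let call_on_worker name ~local_arg key =
  (Hashtbl.find registry name) local_arg key

let p = call_on_worker "partition_of" ~local_arg:4 "ann"
```

A closure created inside run_my_job has no name under which another process could look it up, which is why the value has to travel as an argument.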
A possible version for the game_codec
was already presented in a
previous section of this text. This codec can encode and decode
values of type game_feature
, and this is exactly what we need here.
Caveat: If you run this job, don't be disappointed when only a few tasks are started. You need big files (at least 100M, better 1G) to see the effect that the work is split up into smaller units.
Caveat: If pl_in
or pl_out
are notebook places and not
file-backed places, DSeq.mapl_sort_fold
falls back to
Seq.mapl_sort_fold
, i.e. the job is run non-distributed.
Full code: Available in examples/mapred-toolkit/cardgames.ml
.
There is also a transcript Sess_mr_cardgames.
Combiners
DSeq.mapl_sort_fold
provides the additional hooks initcombine
,
combine
, and fincombine
. These can be used to implement an
optimization that reduces the amount of data to shuffle.
The idea is to perform folding runs over intermediate files which are not yet fully merged with the other files of the same partitions. This means that these files are incomplete, i.e. they do not contain all records, and they may cover several partitions. These intermediate files are already sorted, however.
For many applications of map/reduce it is meaningful to process the data in this intermediate step, and this is especially useful when it reduces the volume of data. Imagine you count the frequency of words in texts. Normally, you would just sort all occurring words, and then count the occurrences of each word (which are now adjacent). In this example it is possible to count the words already in the mentioned intermediate files (which are subsets of the result), and to sum these preliminary counts up later to get the final numbers.
If passed, the functions initcombine
,
combine
, and fincombine
perform combiner runs over all files
where this is possible. These functions work in the same way as
the folding functions.
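The word count idea can be sketched with plain lists. This is an illustration under assumptions: combine_counts is a hypothetical helper, records are (word, count) pairs, and two sorted lists stand in for two intermediate files.

```ocaml
(* Collapse adjacent records with the same word into one record and
   add up the counts. The input must be sorted by word. *)
let combine_counts sorted =
  let rec loop acc = function
    | [] -> List.rev acc
    | (w, n) :: rest ->
      (match acc with
       | (w', n') :: acc' when w' = w -> loop ((w, n + n') :: acc') rest
       | _ -> loop ((w, n) :: acc) rest)
  in
  loop [] sorted

(* Combiner runs over two sorted intermediate files *)
let run1 = combine_counts [ ("a", 1); ("a", 1); ("b", 1) ]
let run2 = combine_counts [ ("a", 1); ("c", 1); ("c", 1) ]

(* Later merge by word, followed by the final summation *)
let by_word (w1, _) (w2, _) = compare w1 w2
let total = combine_counts (List.merge by_word run1 run2)
(* total = [("a", 3); ("b", 1); ("c", 2)] *)
```

In this small case only four records instead of six have to be merged, and the same function serves as combiner and as final fold because adding up counts is associative.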
Practical advice
How to kill a job: You can kill a distributed job by simply pressing CTRL-C in the terminal where you started it. Killing takes a few seconds to execute (be patient).
Alternatively, there is the option of just stopping the task servers. Currently there is no other way of controlling the lifetime of a job.
Accessing common data: Sometimes it is necessary to access
an auxiliary dataset from registered functions like mapl
or
fold
. Remember that you have access to the filesystem via
mi # mapred_env # filesystem
and can load data from a PlasmaFS
file when needed. It is possible to store data in global variables
of the process. However, be careful here: These variables are the
same for all jobs you run via the same task server. You should use
the job ID to mark which data belongs to which job. You can also
override the methods pre_job_start
and post_job_finish
in the
job definition to manage your datasets.
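One way to keep such global data apart is a table keyed by the job ID. The following is a sketch only: the names cache, get_dataset, and drop_dataset are hypothetical, the job ID is represented as a plain string, and the loader is passed in as a function because loading the data is left open here.

```ocaml
(* Process-wide cache: job ID -> auxiliary dataset of that job *)
let cache : (string, string list) Hashtbl.t = Hashtbl.create 7

(* Return the dataset of the job, loading it on first access only. *)
let get_dataset ~job_id ~load =
  match Hashtbl.find_opt cache job_id with
  | Some data -> data
  | None ->
    let data = load () in
    Hashtbl.add cache job_id data;
    data

(* Forget the dataset, e.g. when the job has finished. *)
let drop_dataset ~job_id = Hashtbl.remove cache job_id

(* Two accesses for the same job trigger only one load. *)
let loads = ref 0
let load () = incr loads; [ "stopword1"; "stopword2" ]
let d1 = get_dataset ~job_id:"job-1" ~load
let d2 = get_dataset ~job_id:"job-1" ~load
```

A call like drop_dataset would fit into a hook such as post_job_finish, so that data of finished jobs does not pile up in a long-running task server.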