A few preliminary answers:
Map/Reduce is an algorithm scheme with two placeholder functions, map and reduce. The user normally only implements these placeholders, and the algorithm around them is implemented by a framework such as Plasma MapReduce.
If the size of the input data is n, the duration of the whole Map/Reduce computation grows like c * n * log(n), where c is a constant. This holds not only in theory but also in practice: Map/Reduce can process giant files in reasonable time, provided that a sufficient number of computers is available.
The effect of the scheme can be explained in a few paragraphs. This describes what the scheme does to the data, but not why it can be implemented efficiently (we leave that open here; read the published articles for this). In the following we use OCaml pseudo-syntax to give a reasonably exact description:
The input data must be given as a sequence of records:
let input =
[ r0; r1; r2; ...; r(n-1) ]
In a first step, the records are mapped. Each record is transformed to a sequence of map outputs by applying the map function. The elements of this sequence must be pairs of keys and values:
let map_output =
  (* map : record -> (key * value) list, i.e. each record yields a
     whole list of pairs; the per-record lists are concatenated *)
  List.flatten (List.map map input)
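To make the map step concrete, here is a small runnable sketch. It assumes a word-count style map function (a hypothetical example, not part of Plasma's API): each record is a line of text, and map emits one (word, 1) pair per word.

```ocaml
(* Hypothetical map function: split a line into words and emit (word, 1)
   for every non-empty word. *)
let map (r : string) : (string * int) list =
  String.split_on_char ' ' r
  |> List.filter (fun w -> w <> "")
  |> List.map (fun w -> (w, 1))

let input = [ "a b"; "b c b" ]

(* Apply map to every record and concatenate the per-record lists,
   which is the same as List.flatten (List.map map input). *)
let map_output = List.concat_map map input
```

Here map_output is [("a", 1); ("b", 1); ("b", 1); ("c", 1); ("b", 1)]: the records are expanded, but not yet rearranged.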
The sequence map_output is the one that is rearranged. This is best imagined as sorting the records. Actually, this is not the full truth, as the records are only grouped by key, so that all records with the same key are adjacent to each other. So we get
let group_output =
  (* Sorting by key is stronger than required: any order that keeps
     records with equal keys adjacent would be acceptable *)
  List.stable_sort (fun (k1, _) (k2, _) -> compare k1 k2) map_output
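The following sketch shows that sorting by key indeed produces the grouping, and makes the groups explicit with a helper. The helper group_adjacent is hypothetical and only serves to visualize which records end up next to each other.

```ocaml
let map_output = [ ("b", 1); ("a", 1); ("b", 1); ("c", 1); ("b", 1) ]

(* stable_sort keeps values with equal keys in their original order. *)
let sorted =
  List.stable_sort (fun (k1, _) (k2, _) -> compare k1 k2) map_output

(* Hypothetical helper: collapse runs of adjacent equal keys into
   (key, values) groups. *)
let group_adjacent l =
  let rec go acc = function
    | [] -> List.rev acc
    | (k, v) :: rest ->
      (match acc with
       | (k', vs) :: acc' when k' = k -> go ((k', v :: vs) :: acc') rest
       | _ -> go ((k, [ v ]) :: acc) rest)
  in
  List.map (fun (k, vs) -> (k, List.rev vs)) (go [] l)

let groups = group_adjacent sorted
```

After sorting, the three "b" records are adjacent, so groups is [("a", [1]); ("b", [1; 1; 1]); ("c", [1])].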
Next, we partition the sequence. The partitions are numbered from 0 to p-1. Which keys go into which partition is configurable by the user, but the framework partitions automatically if the user does not specify anything.
(* partition_of_key : key -> int, returning a number in 0 .. p-1 *)
let filter_partition q l =
  List.filter (fun (key, _value) -> partition_of_key key = q) l

let partitions =
  List.init p (fun q -> filter_partition q group_output)
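A runnable version of the partitioning step needs a concrete partition_of_key. The sketch below assumes a hash-based default (hashing the key modulo p); this is an illustrative choice, not necessarily what Plasma MapReduce does when the user gives no partitioning function.

```ocaml
let p = 3

(* Hypothetical default partitioning: Hashtbl.hash returns a non-negative
   integer, so the result always lies in 0 .. p-1. *)
let partition_of_key key = Hashtbl.hash key mod p

let group_output = [ ("a", 1); ("b", 1); ("b", 1); ("c", 1) ]

let filter_partition q l =
  List.filter (fun (key, _value) -> partition_of_key key = q) l

(* One filtered list per partition number 0 .. p-1. *)
let partitions = List.init p (fun q -> filter_partition q group_output)
```

Since every key maps to exactly one partition number, each record lands in exactly one partition. List.filter preserves order, so records with the same key stay adjacent within their partition.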
Finally, there is the reduction step. Each partition is passed through reduce, the other placeholder function. reduce works like a folding operator: it has an accumulator (i.e. local state) and gets all the keys and values of a single partition as input:
let reduce_partition partition =
List.fold_left
reduce
empty_state
partition
let output =
List.map
reduce_partition
partitions
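To see the folding behavior in action, here is a sketch with a hypothetical word-count reduce. The accumulator is the list of (word, count) results produced so far, newest first; because records with equal keys are adjacent in a partition, reduce only needs to look at the head of the accumulator.

```ocaml
(* Hypothetical reduce: add the value to the current count if the key
   continues the current run, otherwise start a new result entry. *)
let reduce (state : (string * int) list) ((key, value) : string * int) =
  match state with
  | (k, n) :: rest when k = key -> (k, n + value) :: rest
  | _ -> (key, value) :: state

let empty_state = []

(* The accumulator is built newest first, so reverse it at the end. *)
let reduce_partition partition =
  List.rev (List.fold_left reduce empty_state partition)

let partitions = [ [ ("a", 1); ("c", 2) ]; [ ("b", 1); ("b", 1); ("b", 1) ] ]

let output = List.map reduce_partition partitions
```

The result is [[("a", 1); ("c", 2)]; [("b", 3)]]: one output sequence per partition, as described in the text.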
The output consists of as many sequences of keys and values as there are partitions.
All the sequences in the algorithm scheme are actually data files in the implementation. In Plasma MapReduce, all files are stored in PlasmaFS, even intermediate files. The user needs to create three directories:
There is the restriction that lines must not be longer than the blocksize of the filesystem. The reason is that the framework needs to split the data into processable chunks (in the map phase), and the currently chosen scheme only works when the length of the records is limited in this way. (This requirement could be weakened, though: the chunks could also consist of several blocks, for some constant multiple of the blocksize.)
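The following sketch shows one common way to split line-oriented data into block-sized chunks, and why a bound on the line length helps. It is an assumption for illustration; the text does not spell out Plasma's exact scheme. A chunk owns every line that starts inside it. The last line of a chunk may run into the next block, but if no line is longer than one block, it always ends within that next block, so a chunk never needs more than one extra block.

```ocaml
(* Hypothetical chunk reader: return the lines of [data] whose start
   position lies in [i*b, (i+1)*b), where b is the blocksize. *)
let records_of_chunk (data : string) (b : int) (i : int) : string list =
  let len = String.length data in
  let chunk_start = i * b and chunk_end = (i + 1) * b in
  if chunk_start >= len then []
  else
    (* First line start at or after chunk_start: either the file start,
       or the position right after a newline at chunk_start - 1 or later. *)
    let first =
      if chunk_start = 0 then 0
      else
        match String.index_from_opt data (chunk_start - 1) '\n' with
        | Some j -> j + 1
        | None -> len
    in
    (* Read whole lines as long as they start inside this chunk; the last
       one may extend past chunk_end. *)
    let rec collect pos acc =
      if pos >= chunk_end || pos >= len then List.rev acc
      else
        let stop =
          match String.index_from_opt data pos '\n' with
          | Some j -> j
          | None -> len
        in
        collect (stop + 1) (String.sub data pos (stop - pos) :: acc)
    in
    collect first []
```

Every line is returned by exactly one chunk, so reading all chunks independently (possibly on different machines) yields each record exactly once.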
The framework uses three types of elementary steps, called tasks, to execute the algorithm scheme. Each task can be run on any machine of the cluster. However, some machines are better suited than others, because the data (or some part of it) is stored locally on them.
The task types are:
map
function for each record, and writes map output files. The input fragments need not come from a single input file; they may come from several. The framework tries to choose the fragments so that all data blocks are stored on the computer where the map task is executed. The output files are written so that they do not exceed a certain size (the sort limit). When the limit is hit, the map task starts writing to a second output file.
reduce
function is also called before the data is written into an output file.
reduce
over it.
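The rule that a map task starts a new output file when the sort limit is hit can be sketched as follows. The function split_by_limit is hypothetical, and record sizes are approximated by their string length; a record larger than the limit simply gets a file of its own.

```ocaml
(* Hypothetical sketch: distribute records over output "files" (lists) so
   that no file exceeds [limit] bytes, unless a single record already does. *)
let split_by_limit (limit : int) (records : string list) : string list list =
  let rec go cur cur_size acc = function
    | [] -> List.rev (if cur = [] then acc else List.rev cur :: acc)
    | r :: rest ->
      let n = String.length r in
      if cur <> [] && cur_size + n > limit then
        (* Limit hit: close the current file and start a new one. *)
        go [ r ] n (List.rev cur :: acc) rest
      else go (r :: cur) (cur_size + n) acc rest
  in
  go [] 0 [] records
```

For example, split_by_limit 5 ["ab"; "cd"; "ef"; "g"] gives [["ab"; "cd"]; ["ef"; "g"]]. Bounding the file size keeps each file small enough to be sorted in one go later.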
In some sense Map/Reduce is nothing but an enhanced disk-based merge sort, where inputs can be preprocessed (mapped) and outputs can be folded (reduced).
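The merge-sort view can be illustrated with in-memory lists standing in for sorted files. This is only a sketch of the analogy, not Plasma's code: each run is a list of (key, value) pairs already sorted by key, and runs are merged pairwise until a single sorted run remains, as a multi-pass external merge sort would do with files.

```ocaml
(* Merge two runs that are already sorted by key; on equal keys the
   element from the first run comes first, so the merge is stable. *)
let rec merge a b =
  match a, b with
  | [], l | l, [] -> l
  | ((ka, _) as x) :: a', ((kb, _) as y) :: b' ->
    if compare ka kb <= 0 then x :: merge a' b else y :: merge a b'

(* Merge neighbouring runs pass by pass until one run is left. *)
let rec merge_all = function
  | [] -> []
  | [ r ] -> r
  | runs ->
    let rec pairs = function
      | a :: b :: rest -> merge a b :: pairs rest
      | rest -> rest
    in
    merge_all (pairs runs)
```

In Map/Reduce terms, the runs would be produced by map (plus sorting), and the final merged sequence would be fed to reduce.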
The tasks are dynamically assigned to computers just before they are executed. It is important to know how resource limits affect this.
Most importantly, there is the illusion of infinite disk space. This means that disk space is limited not by the capacity of the local machine, but only by the sum of all disks in the cluster. This is typically so large that the developer of a Map/Reduce application never has to think about it.
Note that Plasma MapReduce keeps this illusion even for intermediate files. This is different from other implementations (e.g. Hadoop) where intermediate files can only be stored on local disks, and nodes become unusable when the disk fills up.
The CPU is also not a scarce resource, especially as CPUs have multiple cores nowadays. Plasma MapReduce can take advantage of multi-core CPUs because it is implemented via multi-processing: each task runs in its own process.
Now to the resources that may become problematic. First, the speed of sequential disk I/O is probably the worst bottleneck. Many Plasma MapReduce tasks access data on the local disk, and these tasks usually saturate the I/O capacity. It is advisable to avoid very small blocksizes, because small blocks increase the risk that random disk seeks slow down I/O. PlasmaFS includes many optimizations that try to avoid scattering the blocks of a file over the whole disk, but there are limits. For blocksizes of 1M and up this is probably no longer a big problem.
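A back-of-the-envelope model shows why small blocks hurt. It assumes the worst case, in which every block read costs one full seek, and uses hypothetical disk parameters (10 ms per seek, 100 MB/s sequential transfer rate); these numbers are illustrative and not measured on PlasmaFS.

```ocaml
(* Effective throughput in MB/s when each block of [block_kb] KB costs
   one seek of [seek_ms] milliseconds plus its transfer time. *)
let effective_mb_per_s ~seek_ms ~rate_mb_per_s ~block_kb =
  let block_mb = float_of_int block_kb /. 1024. in
  let transfer_s = block_mb /. rate_mb_per_s in
  block_mb /. ((seek_ms /. 1000.) +. transfer_s)
```

With these assumptions, 64 KB blocks give about 5.9 MB/s and 1 MB blocks about 50 MB/s. The fraction of the raw transfer rate grows with the block size, and the layout optimizations in PlasmaFS aim to make seeks rarer than this worst case.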
The network is the next bottleneck. As a rule of thumb, all input data have to travel a few times over the network. The faster the network the better.
The RAM of the machines may also become a real problem. Unlike the other resources, RAM has a hard limit. As Plasma MapReduce uses a lot of shared memory, it is likely that the limit for shared memory is hit first. When determining how many tasks should run at once on a machine, RAM consumption plays an important role. Plasma MapReduce already implements some logic to reduce RAM consumption when RAM is tight (effectively, the size of adjustable buffers is reduced), which makes the system more flexible in this respect.
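The idea of shrinking adjustable buffers when memory is tight can be sketched with a simple policy. The policy and the function buffer_size are hypothetical, not Plasma's actual logic: each concurrently running task gets an equal share of the shared-memory budget, capped at a preferred size, and if even a minimum buffer does not fit, fewer tasks should be run.

```ocaml
(* Hypothetical policy: per-task buffer size under a shared-memory limit.
   Returns None when the minimum buffer size cannot be granted. *)
let buffer_size ~shm_limit ~tasks ~preferred ~minimum =
  if tasks <= 0 then invalid_arg "buffer_size: tasks must be positive";
  let share = shm_limit / tasks in
  if share < minimum then None else Some (min preferred share)
```

With a budget of 1000 units and a preferred size of 200, four tasks get their full 200, eight tasks get 125 each, and forty tasks would fall below a minimum of 50, which signals that fewer tasks should run at once.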
As I/O and data copying dominate Map/Reduce, the available memory bandwidth may also be important. However, this has not been explored yet.
There are, of course, also "self-made" limits. Not much is known yet about where these lie for Plasma MapReduce, but one should keep an eye on: