Plasma MapReduce: What is Map/Reduce?

A few preliminary answers:

  • Map/Reduce is a method to sort sequential data. The sorting criterion can be arbitrarily defined.
  • Map/Reduce is an algorithm scheme. This means it is an algorithm where some parts are left open, and one can instantiate the scheme by filling out these parts. Map/Reduce got its name because the placeholder functions are commonly named map and reduce. The user normally only implements these placeholders, and the algorithm around these is implemented by a framework such as Plasma MapReduce.
  • Map/Reduce runs on a cluster of computers and scales with the number of computers. When the number of records is increased by a factor of n, the duration of the whole Map/Reduce computation increases by a factor of c * n * log(n) (where c is a constant). This holds not only in theory but also in practice. Map/Reduce can process giant files in reasonable time, provided a sufficient number of computers is available.
  • The data Map/Reduce processes as input must be stored in a special distributed filesystem (like PlasmaFS), and the result will also be written into the filesystem. The filesystem is optimized for large files, and does not hide where which block of data is stored. The filesystem and the compute framework are typically run on the same cluster of computers.
  • Map/Reduce tries to use the resources well. When it is advantageous to run a part of the algorithm on the machine where input data is stored, the Map/Reduce framework will consider doing so in order to avoid network I/O ("move the algorithm, not the data").

The Algorithm Scheme

The effect of the scheme can be explained in a few paragraphs. This describes what the scheme transforms the data into, but not why this can be implemented efficiently (which we leave open here; read the published articles for this). In the following we use OCaml pseudo-syntax to get a relatively exact description:

The input data must be given as a sequence of records:

let input =
  [ r0; r1; r2; ...; r(n-1) ]

In a first step, the records are mapped. Each record is transformed to a sequence of map outputs by applying the map function. The output type of map must be pairs of keys and values:

let map_output =
  List.flatten
    (List.map
      (fun r ->
        (* map returns a list of (key, value) pairs for record r *)
        map r
      )
      input)

The mapping step is useful because the data is often not stored in the best format, and needs to be preprocessed before the next step, sorting, can be started.
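
As an illustration of such preprocessing, here is a minimal sketch of a map function for a word count. The function name map_words, the sample input, and the choice of "1" as the value are assumptions made for this example; they are not part of Plasma MapReduce.

```ocaml
(* Hypothetical map function for a word count: one input line yields
   one (key, value) pair per word. *)
let map_words (line : string) : (string * string) list =
  String.split_on_char ' ' line
  |> List.filter (fun w -> w <> "")
  |> List.map (fun w -> (String.lowercase_ascii w, "1"))

(* Two sample records, one line each. *)
let input = [ "the quick fox"; "The lazy dog" ]

(* Apply the map function to every record and concatenate the results. *)
let map_output = List.flatten (List.map map_words input)
```

Both occurrences of "the" end up with the same key, so the sorting step that follows can place them next to each other.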

The sequence map_output is the one that is sorted. So we get

let sort_output =
  List.sort
    (fun (key1, value1) (key2, value2) -> compare key1 key2)
    map_output

Next, we partition the sequence. The partitions are numbered from 0 to p-1. Which keys go into which partition is configurable by the user, but the framework will do some automatic partitioning if the user does not specify anything.

let filter_partition q l =
  List.filter (fun (key,value) -> partition_of_key key = q) l

let partitions =
  List.map
    (fun q -> filter_partition q sort_output)
    [0; 1; ...; p-1]

Partitioning the data means splitting the whole dataset into distinct parts. Why is this useful? First, it gives an opportunity to classify the data (i.e. all the records with a certain property are moved to a certain partition). Second, the implementation profits from this, because partitioning gives additional chances for parallelizing the algorithm.
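
One simple way to fill in partition_of_key is to hash the key. The following is only a sketch: the hash-based rule, the value of p, and the sample data are assumptions for illustration and do not describe the automatic partitioning that Plasma MapReduce actually performs.

```ocaml
(* Number of partitions; 4 is an arbitrary choice for this sketch. *)
let p = 4

(* Hypothetical partitioner: hash the key and reduce it modulo p.
   Hashtbl.hash returns a non-negative integer, so the result lies
   in the range 0 .. p-1. *)
let partition_of_key (key : string) : int =
  Hashtbl.hash key mod p

let filter_partition q l =
  List.filter (fun (key, _value) -> partition_of_key key = q) l

(* Sample sorted data. *)
let sort_output =
  [ ("apple", "1"); ("apple", "1"); ("kiwi", "1"); ("pear", "1") ]

(* List.init builds the partition numbers 0, 1, ..., p-1. *)
let partitions =
  List.init p (fun q -> filter_partition q sort_output)
```

Because the partition number depends only on the key, all records with the same key land in the same partition, and every record lands in exactly one partition.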

Finally, there is the reduction step. Each partition is passed through reduce, the other placeholder function. reduce works like a folding operator: It has an accumulator (i.e. local state), and gets all the keys and values of a single partition as input:

let reduce_partition partition =
  List.fold_left
    reduce
    empty_state
    partition

let output =
  List.map
    reduce_partition
    partitions  

The output consists of as many sequences of keys and values as there are partitions.
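
To make the folding behaviour concrete, here is a minimal sketch of a reduce function that counts how often each key occurs. The accumulator layout, the string-encoded counts, and the sample partition are assumptions for this example. The final List.rev is an addition of the sketch so that the result comes out in key order.

```ocaml
(* Hypothetical reduce for counting: the accumulator is the list of
   (key, count) pairs built so far, most recent key first. Because the
   partition is sorted, equal keys arrive next to each other, so only
   the head of the accumulator has to be inspected. *)
let reduce acc (key, value) =
  match acc with
  | (k, n) :: rest when k = key -> (k, n + int_of_string value) :: rest
  | _ -> (key, int_of_string value) :: acc

let empty_state = []

let reduce_partition partition =
  List.rev (List.fold_left reduce empty_state partition)

(* A sample partition, already sorted by key. *)
let partition0 = [ ("dog", "1"); ("the", "1"); ("the", "1") ]

let counts = reduce_partition partition0
(* counts = [("dog", 1); ("the", 2)] *)
```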

Representing the data in files

All the sequences in the algorithm scheme are actually data files in the implementation. In Plasma MapReduce, all files are stored in PlasmaFS, even intermediate files. The user needs to create four directories:

  • An input directory: This directory contains the files that are taken as input. The files are implicitly concatenated. Of course, the algorithm scheme would only require reading from one input file, but for symmetry with the output directory, Plasma MapReduce allows several files (one can then easily run several Map/Reduce computations in sequence).
  • A work directory: This directory contains intermediate files. These files are usually deleted after they are processed. PlasmaFS has the feature that files can be stored locally, i.e. on the computer writing the file, so usually no network I/O is required to create the intermediate files. Also, access to such local files is highly optimized (via shared memory).
  • An output directory: This directory will contain the output files. There is exactly one output file for each partition.
  • A log directory: This directory will contain the log files of all tasks that are already finished.

For simplicity, Plasma MapReduce uses simple text files to represent the sequences of records. This means every record is one line. If a record has to be interpreted as a key/value pair, the user can provide a function to split the line into a key and a value. Often, the part up to the first TAB character is taken as key, and the rest of the line as value. (Update: Since Plasma-0.6 there are two further file formats, the fixed-size and the variable-size format, see Plasmamr_file_formats.)
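
The TAB convention can be sketched as follows. The function name split_key_value is hypothetical, and treating a line without a TAB as a key with an empty value is an assumption of this sketch, not documented behaviour.

```ocaml
(* Split a line at the first TAB character: the part before it is the
   key, the rest of the line is the value. A line without any TAB is
   treated as a key with an empty value (an assumption of this sketch). *)
let split_key_value (line : string) : string * string =
  match String.index_opt line '\t' with
  | Some i ->
      ( String.sub line 0 i,
        String.sub line (i + 1) (String.length line - i - 1) )
  | None -> (line, "")
```

Only the first TAB separates key and value, so later TAB characters stay inside the value.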

There is the restriction that lines must not be longer than a certain maximum length. This length is also known as the bigblock size. Bigblocks are the units in which Map/Reduce processes data. Bigblocks can be larger than the blocks of the filesystem.

Elementary execution steps

The framework uses three types of elementary steps, called tasks, to execute the algorithm scheme. Each task can run on any machine of the cluster. However, some machines are better suited than others because the data (or some part of it) is stored locally on them.

The task types are:

  • Map: A map task takes a list of input file fragments as input, calls the map function for each record, and writes map output files. The fragments in the input may not only come from one input file, but from several. The framework tries to choose the fragments so that the data blocks are all stored on the computer where the map task is executed. The output files are written so they do not exceed a certain size (sort limit). When the limit is hit, the map task starts writing to a second output file.
  • Sort: A sort task takes one of the map output files, sorts it (in RAM) so that records with the same key are adjacent, and writes a sort output file (of the same size).
  • Shuffle: A shuffle task takes the output of sort tasks or previous shuffle tasks as input, and merges the input files, so that the "adjacent key" condition also holds for the merged files. The result of the merge operation is not written into a single output file, but typically into several output files, so that only records belonging to certain partitions are written into a file. This means a shuffle task also splits data by partition. If the shuffle task is a final shuffle task, the reduce function is also called before the data is written into an output file.

The framework now invokes the map tasks so that all input data is mapped. The mapped data is sorted. The sorted data is shuffled, so that finally every sort output is merged with all other sort outputs, and so that the merged data is split into the partitions. When the shuffle task sees that the output file is a final output file, it also runs reduce over it.

In some sense Map/Reduce is nothing but an enhanced disk-based merge sort, where inputs can be preprocessed (mapped) and outputs can be folded (reduced).
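
The merge-sort view can be sketched in memory with lists standing in for files. This is a simplification under stated assumptions: the limit counts records rather than bytes, the partition split and the reduce call of the shuffle step are left out, and the function names are made up for this example.

```ocaml
(* Split a list into runs of at most [limit] records (limit >= 1),
   mimicking a map task that starts a new output file when the sort
   limit is hit. *)
let split_runs limit l =
  let rec go current n acc = function
    | [] -> List.rev (if current = [] then acc else List.rev current :: acc)
    | x :: rest ->
        if n = limit then go [ x ] 1 (List.rev current :: acc) rest
        else go (x :: current) (n + 1) acc rest
  in
  go [] 0 [] l

(* Records are ordered by key only. *)
let compare_keys (k1, _) (k2, _) = compare k1 k2

(* Sort task: sort one run in memory. *)
let sort_run run = List.stable_sort compare_keys run

(* Shuffle task, reduced to its merge part: merge sorted runs into one
   sorted sequence. *)
let merge_runs runs = List.fold_left (List.merge compare_keys) [] runs

let records = [ ("d", "1"); ("a", "2"); ("c", "3"); ("a", "4"); ("b", "5") ]
let runs = List.map sort_run (split_runs 2 records)
let merged = merge_runs runs
(* merged = [("a","2"); ("a","4"); ("b","5"); ("c","3"); ("d","1")] *)
```

No run ever holds more than the limit, which is why the sort can happen in RAM, while the merge only needs to look at the heads of its inputs.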

Resource limits

The tasks are dynamically assigned to computers just before they are executed. It is important to know how resource limits affect this.

Most important here, there is the illusion of infinite disk space. This means, disk space is not limited by the capacity of the local machine, but only by the sum of all disks of the cluster. This is typically so large that the developer of a Map/Reduce application never has to think about it.

Note that Plasma MapReduce keeps this illusion even for intermediate files. This is different from other implementations (e.g. Hadoop) where intermediate files can only be stored on local disks, and nodes become unusable when the disk fills up.

The CPU is also a non-scarce resource, especially as CPUs have multiple cores nowadays. Plasma MapReduce can take advantage of multi-core CPUs because it is implemented via multi-processing. All tasks are run in their own process.

Now to the resources that may become problematic. First, the speed of sequential disk I/O is probably the worst bottleneck. Many Plasma MapReduce tasks access data on the local disk, and these tasks usually saturate the I/O capacity. It is advisable to stay away from very small blocksizes, because small blocks increase the risk that random disk seeks slow down I/O as well. PlasmaFS includes a lot of optimizations that try to avoid scattering the blocks of a file over the whole disk, but there are limits. For blocksizes of 1M and up this is probably no longer a big problem.

The network is the next bottleneck. As a rule of thumb, all input data have to travel a few times over the network. The faster the network the better.

The RAM of the machines may also become a real problem. Unlike for other resources, there is a hard limit. As Plasma MapReduce uses a lot of shared memory, it is likely that the limit for shared memory is hit first. When determining how many tasks should run at once on a machine, RAM consumption plays an important role. Plasma MapReduce already implements some logic to reduce RAM consumption when RAM is tight (effectively, the size of adjustable buffers is reduced) to make the system more flexible in this respect.

As I/O and data copying dominate Map/Reduce, the available memory bandwidth may also be important. However, this is not yet explored.

Implementation limits

There are, of course, also "self-made" limits. Not much is known yet about where these are for Plasma MapReduce, but one should keep an eye on:

  • Namenode RAM. Here, RAM is consumed for every open transaction, and on a big cluster, this may be many. Also, RAM is consumed for the blockmaps, i.e. the bitmaps that say which block is used and which is free. The blockmaps are completely kept in memory.
  • Other namenode resources. The namenode server is only single-threaded, so it cannot take advantage of multiple cores. Also, for a really busy cluster, the number of file descriptors may become an issue. This limit can be increased in the operating system, though.
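
To get a feeling for the blockmap part of the namenode RAM, one can do a rough estimate. The sketch below assumes exactly one bit per block and nothing else; the actual in-memory representation in PlasmaFS may need more. The function name and the example figures (1 TiB of disk, 1 MiB blocks) are chosen for illustration only.

```ocaml
(* Rough estimate of the bitmap size in bytes, assuming one bit per
   block (an assumption of this sketch). Requires 64-bit integers for
   the example figures below. *)
let blockmap_bytes ~disk_bytes ~block_bytes =
  let blocks = disk_bytes / block_bytes in
  (blocks + 7) / 8

(* Illustrative figures: 1 TiB of disk space with 1 MiB blocks gives
   2^20 blocks, i.e. 131072 bytes (128 KiB) of bitmap. *)
let example = blockmap_bytes ~disk_bytes:(1 lsl 40) ~block_bytes:(1 lsl 20)
```

Under this assumption the bitmap shrinks in proportion to the blocksize, so larger blocks also reduce the pressure on the namenode.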

This web site is published by Informatikbüro Gerd Stolpmann