Plasma MapReduce: File formats

In this context files are sequences of records. Plasma knows three different representations:

  • Text: As in a normal text file, every record is terminated by an LF character. This implies that a record must not contain LF characters. Applications need to work around this limitation, e.g. by using escape mechanisms. Implemented by Mapred_io.line_structured_format.
  • Fixed-size records: Every record has the same size of n bytes, where n is externally given. In contrast to the text format, every byte can be used, including LF. This format is most useful for binary numeric data. Implemented by Mapred_io.fixed_size_format.
  • Variable-size records: The records are preceded by length headers which say how long the immediately following record is. Every byte is allowed as part of the record. This representation is most universal, but there are some complications, see below. Implemented by Mapred_io.var_size_format.
If you can live with the limitation that records must not contain LF bytes, the text format is the easiest choice. Otherwise, have a look at the other two formats.

Note that Plasma simply considers a record as a string (on this level of the abstraction), without assuming any particular format of this string. The user has to configure how to extract the key from the string. The key is then used for sorting, and for determining the partition.

All formats take the size of the bigblocks into account. The bigblocks are the smallest units into which the files are split for distributing the work among the available task nodes. Bigblocks need to be multiples of filesystem blocks. For example, a user may specify bigblocks of 64 MB for a filesystem that uses 1 MB blocks. The connection with file formats is that the formats need to support this splitting of the files into smaller pieces, where each piece consists of a whole number of bigblocks.

There is a command-line utility for converting between formats, see Cmd_plasma_convert.

By convention, the file formats use different file name suffixes for easier recognition: ".var" means the variable-size format, ".fixed<n>" the fixed-size format with n bytes, and everything else is considered text. There is also Mapred_io.auto_input_format, which determines the format of input files from the file name suffix.
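The suffix convention can be sketched with the standard library alone. The type format and the function format_of_name below are hypothetical names for this illustration. They are not part of Plasma, and the exact rules applied by Mapred_io.auto_input_format may differ.

```ocaml
(* Hypothetical sketch of the suffix convention; not the Plasma code. *)
type format =
  | Text            (* any other suffix *)
  | Fixed of int    (* ".fixed<n>": records of n bytes *)
  | Var             (* ".var" *)

let format_of_name name =
  let ext = Filename.extension name in   (* e.g. ".fixed16", or "" *)
  let prefix = ".fixed" in
  let pl = String.length prefix in
  if ext = ".var" then Var
  else if String.length ext > pl && String.sub ext 0 pl = prefix then
    (* the remainder of the suffix must be a positive integer *)
    match int_of_string_opt (String.sub ext pl (String.length ext - pl)) with
    | Some n when n > 0 -> Fixed n
    | _ -> Text
  else Text
```

With this sketch, "points.fixed16" is recognized as fixed-size with 16 bytes per record, "people.var" as variable-size, and "names.txt" as text.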

For toolkit users: The following discussions and examples are older than the toolkit, and they assume the classic job definition (Mapred_def.mapred_job). Of course, the various file formats are also available in the toolkit. There is a Mapred_toolkit.format parameter for places, and e.g. passing `Var_size_format here would select this format. The following is nevertheless interesting to know as background.

The text format

The text format is very simple: A record is a sequence of bytes followed by an LF byte. The length of the sequence is unlimited in principle, but for practical reasons the maximum is bigblock_size-1. The sequence may contain every byte except 10 (LF). Note that we do not assume any character encoding, and that even the null byte is allowed.

The big advantage of the text format is that it is compatible with lots of existing utilities like text editors, viewers, and command-line tools. The downside is that data that may include LF bytes needs special treatment.

The text format is implemented by Mapred_io.line_structured_format.

Example: We want to analyze the frequency of names in various countries. Every record has three fields: Name, country code, and frequency. The name is only composed of Unicode letters (lowercase if possible). The country code is the ISO code like "US", or "DE". The frequency is an integer. Because the LF byte cannot occur in the payload data, we need not escape LF. Also, we can use TAB bytes to separate the three fields. A line could look like (the bytes are separated by spaces for clarity):

g e r d TAB D E TAB 6 5 2 4 3

When summing up the names by countries, the sort key would be the name plus the country code, so we would extract the first and the second field (by searching for the second TAB byte). Note that it is a requirement in Plasma that the key is a contiguous part of the record (i.e. it would not be allowed to consider the first and third field as a key - the workaround is to introduce extra key fields).
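To make the idea of a contiguous key concrete, here is a minimal sketch that finds the byte range of the first num TAB-separated fields. The function key_range is a made-up helper for this illustration. In a real job you would use the functions in Mapred_fields instead.

```ocaml
(* Hypothetical helper, not part of Plasma: return (pos, len) of the
   byte range covering the first [num] TAB-separated fields. *)
let key_range ~num record =
  let n = String.length record in
  (* [seen] counts the TAB bytes passed so far *)
  let rec find_end i seen =
    if i >= n then n                       (* fewer than num TABs: take all *)
    else if record.[i] = '\t' then
      if seen + 1 = num then i             (* this TAB ends the key *)
      else find_end (i + 1) (seen + 1)
    else find_end (i + 1) seen
  in
  (0, find_end 0 0)

let record = "gerd\tDE\t65243"
let (p, l) = key_range ~num:2 record
let key = String.sub record p l            (* name, TAB, country code *)
```

The key ends just before the second TAB, so it covers the name, the separating TAB, and the country code as one contiguous range.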

In the code, the format is described as part of the Mapred_def.mapred_job object. Here, we could define the relevant methods as:

let job = 
   object
      method input_record_io me jc =
        Mapred_io.line_structured_format()

      method output_record_io me jc =
        Mapred_io.line_structured_format()

      method internal_record_io me jc =
        Mapred_io.line_structured_format()

      method extract_key me jc record =
        Mapred_fields.text_field_range ~sep:"\t" ~field:1 ~num:2 record

      method sorter =
        Mapred_sorters.generic_sorter
          ~hash:Mapred_sorters.String_asc.hash
          ~cmp:Mapred_sorters.String_asc.cmp

      ...
   end

Note that you need to implement some more methods for a full job definition (e.g. the map and reduce methods).

You may wonder why we have to refer to Mapred_io.line_structured_format three times. The answer is that we can specify the file format for three different occasions separately:

  • input_record_io describes the format that is expected as input of the map/red job.
  • output_record_io describes the format that is written as output of the map/red job.
  • internal_record_io describes the format that is used by the internal steps. This includes the format written after map, and the format used within reduce (except for the final reduce output).
The key is mainly the input for the sorter (but the partition is also derived from it). It is here extracted from the full record by taking the first and the second field. The Mapred_fields.text_field_range function returns the position of the key in the record as a pair (p,l) where p is the position where the key starts, and l is the length in bytes.

The sorter is here specified so that it sorts the key in ascending order. We use here Mapred_sorters.generic_sorter and configure this implementation with two argument functions hash and cmp. There are lots of definitions for hash and cmp in Mapred_sorters, and we pick here Mapred_sorters.String_asc - which sorts strings in an ascending way.

The function cmp works in the same way as in the Ocaml standard library (e.g. in Array.sort), but each of the two strings to compare is given as a triple buffer, pos, and len. This means the key to compare is taken from the larger string buffer in the byte range from pos to pos+len-1. The function hash returns a 30 bit integer for every key (given in the same style). Basically, generic_sorter sorts first by this integer, and the cmp function is only used for keys with the same hash. The definitions in Mapred_sorters are quite clever, though, and the hash values are mostly used as accelerators: hash is defined such that the resulting order is the same as the one cmp alone would produce.
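The idea of a hash that only accelerates the comparison can be illustrated with the standard library alone. This is not the implementation found in Mapred_sorters. The functions cmp, hash, and sort_compare below are made up for this sketch, and the hash uses only 24 of the 30 available bits.

```ocaml
(* Bytewise comparison of two keys given as (buffer, pos, len). *)
let cmp s1 pos1 len1 s2 pos2 len2 =
  let n = min len1 len2 in
  let rec loop i =
    if i = n then compare len1 len2        (* a prefix sorts first *)
    else
      let c = Char.compare s1.[pos1 + i] s2.[pos2 + i] in
      if c <> 0 then c else loop (i + 1)
  in
  loop 0

(* Order-preserving hash: the first three key bytes as a big-endian
   number, zero-padded for short keys. If cmp says a <= b, then
   hash a <= hash b. *)
let hash s pos len =
  let byte i = if i < len then Char.code s.[pos + i] else 0 in
  (byte 0 lsl 16) lor (byte 1 lsl 8) lor byte 2

(* Sort by hash first; fall back to cmp only for equal hashes. *)
let sort_compare (s1, p1, l1) (s2, p2, l2) =
  let h1 = hash s1 p1 l1 and h2 = hash s2 p2 l2 in
  if h1 <> h2 then compare h1 h2 else cmp s1 p1 l1 s2 p2 l2

(* Four keys stored back to back in one buffer. *)
let buf = "bananaappleapricotap"
let keys = [ (buf, 0, 6); (buf, 6, 5); (buf, 11, 7); (buf, 18, 2) ]
let sorted =
  List.map (fun (s, p, l) -> String.sub s p l) (List.sort sort_compare keys)
```

Most comparisons are decided by the cheap integer comparison. Only keys sharing their first three bytes (here "ap" against a hypothetical "ap\000") need the full cmp.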

Escaping

A simple escaping mechanism is provided by the Ocaml runtime. It is basically the same Ocaml uses for its own strings - LF can be written as \n, and TAB as \t. The mechanism is a bit hidden, and only available via the Printf and Scanf modules.

In order to escape a string, use the %S format specifier in printf, e.g.

let escaped = Printf.sprintf "%S" raw

Note that this also puts double quotes around the string.

The reverse is done with

let raw = Scanf.sscanf escaped "%S" (fun s -> s)

This mechanism has the drawback that also all non-ASCII characters are escaped with backslashes, which may increase the size of the records significantly if they really contain binary data.
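The round trip through %S can be tried out in isolation. The following sketch only uses Printf and Scanf from the standard library. The names escape, unescape, raw, and bin are chosen for this illustration.

```ocaml
let escape raw = Printf.sprintf "%S" raw
let unescape escaped = Scanf.sscanf escaped "%S" (fun s -> s)

(* A value with LF and TAB: after escaping it fits on one line. *)
let raw = "line1\nline2\tend"
let escaped = escape raw
let has_lf = String.contains escaped '\n'
let round_trip_ok = (unescape escaped = raw)

(* Binary data: every non-printable byte becomes a backslash
   sequence, so the escaped form is longer than the original. *)
let bin = "\000\255"
let growth = String.length (escape bin) - String.length bin
```

Here escaped contains no LF byte and can be stored as a text record, while growth shows the price paid for binary data.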

Alternatives:

  • Especially sscanf is slow. There are the faster alternatives Mapred_fields.escape and Mapred_fields.unescape. These functions also do not enclose the escaped string in double quotes.
  • BASE-64 is an encoding for binary data that only leads to an increase of 1/3. It is available via the Ocamlnet module Netencoding.Base64 (in the netstring package). Unfortunately, BASE-64 does not keep the sorting order (but see below).
  • JSON is a format for representing simple-structured data. In Ocaml the libraries json-wheel and yojson support JSON.
If you escape fields that are used as keys you may run into some problems:
  • The escaping mechanism may allow various representations of the same byte (e.g. "\n" and "\010" in Ocaml). This may especially be a problem when you mix data from several sources, and every source has chosen a different representation. If you now sort the records by the keys in the escaped form, it may happen that the logically same key is sorted differently.
  • Also, it may happen that the escaping mechanism changes the order of the strings anyway (e.g. normally LF < A, but in escaped form we have \n > A).
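Both problems are easy to reproduce with the standard library. This sketch only demonstrates the effects; the names are chosen for illustration.

```ocaml
let unescape escaped = Scanf.sscanf escaped "%S" (fun s -> s)

(* Problem 1: two different escaped forms of the same byte. *)
let form_a = "\"\\n\""      (* the four characters "\n" with quotes *)
let form_b = "\"\\010\""    (* the six characters "\010" with quotes *)
let same_key = (unescape form_a = unescape form_b)
let same_text = (form_a = form_b)

(* Problem 2: escaping changes the order. LF (byte 10) sorts before
   'A' (byte 65), but its escaped form starts with a backslash
   (byte 92), which sorts after 'A'. *)
let raw_order = compare "\n" "A"
let escaped_order = compare (String.escaped "\n") (String.escaped "A")
```

So same_key is true while same_text is false, and raw_order and escaped_order have opposite signs.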
If this is a problem there may be two solutions:
  • Introduce a special field that is only used for sorting, and apply an escaping method that does not change the sorting order. The functions Mapred_fields.encode_alt64 and Mapred_fields.decode_alt64 use a variant of BASE-64 called ALT-64 that keeps the sorting order, i.e. the encoded strings are sorted in the same order as the decoded strings. Like the standard BASE-64, ALT-64 increases the data volume by 1/3.
  • Decode the field just before passing it to the sorter. This is expensive, though.
The latter is not really obvious, so let's look at an example. Here we use the above mechanism via printf/scanf. The trick is to define new versions of the functions hash and cmp that are passed to Mapred_sorters.generic_sorter:

let unescape escaped =
  Scanf.sscanf escaped "%S" (fun s -> s)

let hash s pos len =
  let s' = unescape(String.sub s pos len) in
  Mapred_sorters.String_asc.hash s' 0 (String.length s')

let cmp s1 pos1 len1 s2 pos2 len2 =
  let s1' = unescape(String.sub s1 pos1 len1) in
  let s2' = unescape(String.sub s2 pos2 len2) in
  Mapred_sorters.String_asc.cmp
    s1' 0 (String.length s1')
    s2' 0 (String.length s2')

Now define the sorter as Mapred_sorters.generic_sorter ~hash ~cmp.

The fixed-size format

The fixed-size format uses a fixed number of bytes per record. Every byte can occur, and thus escaping mechanisms are not needed. This format is normally used with binary numeric data because the fields usually also have a fixed "width".

Example: We have many data points that are simply enumerated with an integer (point #0, #1, #2, etc.). For each point there is an associated float. We need to sort the data by value.

The representation is now that we use 16 bytes per record: the first eight bytes represent the ordinal number of the data point, and the second eight bytes are the float (double-precision IEEE float). We use big-endian byte order, which is recommended here because it is "compatible" with the lexicographic sorting of strings: for non-negative integers n1 < n2, the big-endian representations BE also fulfil BE(n1) < BE(n2) when they are compared as strings.
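The ordering property of the big-endian representation can be checked without any Plasma or Ocamlnet code. This is a minimal sketch assuming OCaml 4.08 or later (for Bytes.set_int64_be). The helpers be_of_int and int_of_be are hypothetical names, and the property is only shown for non-negative integers.

```ocaml
(* Encode a non-negative int as 8 bytes in big-endian order. *)
let be_of_int n =
  let b = Bytes.create 8 in
  Bytes.set_int64_be b 0 (Int64.of_int n);
  Bytes.to_string b

(* Decode 8 big-endian bytes back into an int. *)
let int_of_be s =
  Int64.to_int (Bytes.get_int64_be (Bytes.of_string s) 0)

let nums = [65243; 0; 256; 1; 255]

(* Sort the encoded strings bytewise, then decode again. *)
let sorted_via_strings =
  List.map int_of_be (List.sort String.compare (List.map be_of_int nums))

let sorted_numerically = List.sort compare nums
```

Both lists come out in the same order. With little-endian encoding this would not hold, because the least significant byte would be compared first.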

The Netnumber module of Ocamlnet (in the netstring package) can be used to convert the numbers to the external representation and back:

let encode (ord,value) =
  let s1 = Netnumber.BE.int8_as_string (Netnumber.int8_of_int ord) in
  let s2 = Netnumber.BE.fp8_as_string (Netnumber.fp8_of_float value) in
  s1 ^ s2

let decode s =
  assert(String.length s = 16);
  (* the read_* functions live in the byte-order submodule BE *)
  let ord = Netnumber.int_of_int8 (Netnumber.BE.read_int8 s 0) in
  let value = Netnumber.float_of_fp8 (Netnumber.BE.read_fp8 s 8) in
  (ord,value)

The job definition is now:

let job = 
   object
      method input_record_io me jc =
        Mapred_io.fixed_size_format 16

      method output_record_io me jc =
        Mapred_io.fixed_size_format 16

      method internal_record_io me jc =
        Mapred_io.fixed_size_format 16

      method extract_key me jc record =
        (8, 8)

      method sorter =
        Mapred_sorters.generic_sorter
          ~hash:Mapred_sorters.Float_xdr_asc.hash
          ~cmp:Mapred_sorters.Float_xdr_asc.cmp

      ...
   end

The extraction of the key is now trivial: The float value always starts at byte offset 8 and is eight bytes long.

For sorting we use here the special comparator module Mapred_sorters.Float_xdr_asc which sorts a float given in binary big-endian representation.

The variable-size format

Basically, the variable-size format precedes every record with a length field. However, the way a map/reduce job processes data requires another principle for structuring the file. Because a task must be able to decode the records even when it is only processing a fragment of a file, we need to include a bit of helper information that guides the task to find the beginning of a record when starting in the middle of the file.

This is done by organizing the file as a sequence of chunks:

   +----------------------------------------------------+
   | Chunk 0 header                                     |
   +----------------------------------------------------+
   | Chunk 0 payload                                    |
   |                                                    |
   ...                                                ...
   |                                                    |
   |                                                    |
   +----------------------------------------------------+
   | Chunk 1 header                                     |
   +----------------------------------------------------+
   | Chunk 1 payload                                    |
   |                                                    |
   ...                                                ...
   |                                                    |
   |                                                    |
   +----------------------------------------------------+
   ...

Normally, the chunks have a fixed size of 64K (header plus payload), where the header occupies the first 32 bytes. The header contains the following information:

  • How large a chunk is (well, normally 64K, but it is possible to deviate from this if needed)
  • How large the payload is - this is normally the "rest of the chunk", but it is possible to fill only a part of the chunk with data, and leave the remaining bytes zero (this is normally only the case for the last chunk of the file)
  • Where the first record starts in the chunk (if any)
  • Whether the payload is compressed (note that this is not yet implemented in Plasma-0.6)
  • A checksum to verify that some 32 bytes are really a chunk header
It is additionally required that a bigblock is a multiple of the chunk size (which is normally easy to guarantee). This allows the tasks to take any fragment of a file as input where the fragment consists of a whole number of bigblocks, and to find the start of the next record. This works because the fragment always consists of a whole number of chunks, and it is possible to jump from chunk header to chunk header until the first record is found.
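The arithmetic behind this can be sketched as follows, assuming that all chunks have the default size of 64K (remember that the format allows deviating chunk sizes, which this sketch ignores). The names chunks_per_bigblock and chunk_header_offsets are made up for this illustration and do not exist in Plasma.

```ocaml
let chunk_size = 64 * 1024                 (* default: header plus payload *)
let header_size = 32
let payload_capacity = chunk_size - header_size

(* A bigblock must consist of a whole number of chunks. *)
let chunks_per_bigblock ~bigblock_size =
  if bigblock_size <= 0 || bigblock_size mod chunk_size <> 0 then
    invalid_arg "bigblock size must be a positive multiple of the chunk size"
  else
    bigblock_size / chunk_size

(* Byte offsets of all chunk headers in a fragment that starts at
   bigblock number [first] and spans [count] bigblocks. A task can
   visit these headers one by one until it finds the first record. *)
let chunk_header_offsets ~bigblock_size ~first ~count =
  let per_block = chunks_per_bigblock ~bigblock_size in
  let start = first * bigblock_size in
  List.init (count * per_block) (fun i -> start + i * chunk_size)
```

For 64 MB bigblocks this gives 1024 chunks per bigblock, each with at most 65504 payload bytes.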

The real payload is now the sequence of the payload regions in the chunks. It is possible that a record starts in one chunk and is continued in the next chunk (and even continued beyond).

As mentioned, the records now have a length header. If the length is up to 254 bytes, this header consists of just a single byte. Otherwise, the header consists of 9 bytes.
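A length header of this kind could look as in the following sketch. Only the sizes (1 byte up to a length of 254, otherwise 9 bytes) are taken from the description above. The marker byte 255 and the big-endian 64 bit length are assumptions made for this illustration; the authoritative layout is documented for Mapred_io.var_size_format. The function length_header is a hypothetical name, and the sketch assumes OCaml 4.08 or later.

```ocaml
(* ASSUMPTION: long headers start with the byte 255, followed by the
   length as a big-endian 64 bit number. The real format may differ. *)
let length_header n =
  if n < 0 then invalid_arg "length_header: negative length"
  else if n <= 254 then
    String.make 1 (Char.chr n)             (* short form: a single byte *)
  else begin
    let b = Bytes.create 9 in
    Bytes.set b 0 '\255';                  (* assumed marker byte *)
    Bytes.set_int64_be b 1 (Int64.of_int n);
    Bytes.to_string b
  end

(* Total number of bytes a record occupies in the payload stream. *)
let encoded_size record =
  String.length (length_header (String.length record)) + String.length record
```

The short form keeps the overhead small for the common case of many small records.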

The details of the variable-size format are documented for the function Mapred_io.var_size_format.

How to create and read files in variable-size format

It is recommended to just use Mapred_io.var_size_format, e.g.

let write_something() =
  let fmt = Mapred_io.var_size_format() in
  let wr = fmt # write_file cluster rc "/filename" in
  wr # output_record "something";
  wr # output_record "next";
  ...
  wr # close_out()

let read_back() =
  let fmt = Mapred_io.var_size_format() in
  let n = Mapred_io.file_blocks cluster "/filename" in
  let rd = fmt # read_file [] cluster rc "/filename" 0L n in
  let r1 = rd # input_record() in
  let r2 = rd # input_record() in
  ... (* until End_of_file *)
  rd # close_in()

Here, cluster is an open PlasmaFS client of type Plasma_client.plasma_cluster. In rc one needs to pass the record configuration (bigblock size and buffer sizes, see Mapred_io.record_config).

Considerations for the layout of the records

There is now maximum freedom for the layout of the records. It is possible to handle records in the same way as for the text format, or in the same way as for the fixed-size format. Sophisticated layouts are also possible: The module Mapred_fields includes support for a special representation of fields within the record. Like the records as a whole, the fields now have length headers. For example,

Mapred_fields.var_concat ["one"; "two"; "three"]

creates a record with three fields. Note that here each field can have any value - LF and nulls included, of any length.

The function Mapred_fields.var_field_range can extract the byte positions of fields, and Mapred_fields.var_fields just returns the fields of an encoded record.
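The principle of length-prefixed fields can be shown with a small stand-alone sketch. It uses a simple 4-byte big-endian length in front of every field, which is an assumption for this illustration and not the layout that Mapred_fields really uses. The functions concat and field_range are hypothetical counterparts of var_concat and var_field_range.

```ocaml
(* Append n as a 4-byte big-endian number. *)
let put_len buf n =
  for shift = 3 downto 0 do
    Buffer.add_char buf (Char.chr ((n lsr (8 * shift)) land 0xff))
  done

(* Read a 4-byte big-endian number at position pos. *)
let get_len s pos =
  let b i = Char.code s.[pos + i] in
  (b 0 lsl 24) lor (b 1 lsl 16) lor (b 2 lsl 8) lor b 3

(* Build a record: every field is preceded by its length. *)
let concat fields =
  let buf = Buffer.create 64 in
  List.iter
    (fun f -> put_len buf (String.length f); Buffer.add_string buf f)
    fields;
  Buffer.contents buf

(* Return (pos, len) of the field with number [field], counting from
   1, by jumping from length header to length header. *)
let field_range ~field record =
  let rec skip pos k =
    let len = get_len record pos in
    if k = field then (pos + 4, len) else skip (pos + 4 + len) (k + 1)
  in
  skip 0 1

let record = concat ["one"; "t\nw\000o"; "three"]
let (p, l) = field_range ~field:2 record
let second = String.sub record p l
```

Because the field boundaries are derived from the length headers and not from separator bytes, the second field may contain LF and null bytes without any escaping. Finding a field only requires reading the headers before it, not scanning the field contents.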

Example: We want to analyze a social network, and have four columns with: Name of a person, person ID, number of friends, photo as JPEG. We want to sort the data by the number of friends.

We now use Mapred_fields to represent the records such that

let record =
  Mapred_fields.var_concat
    [ name; string_of_int id; string_of_int friends; photo ]

Because there are no restrictions on the contents of the fields, there are also no escaping problems.

This leads to this job definition:

let job = 
   object
      method input_record_io me jc =
        Mapred_io.var_size_format ()

      method output_record_io me jc =
        Mapred_io.var_size_format ()

      method internal_record_io me jc =
        Mapred_io.var_size_format ()

      method extract_key me jc record =
        Mapred_fields.var_field_range ~field:3 record

      method sorter =
        Mapred_sorters.generic_sorter
          ~hash:Mapred_sorters.Integer_asc.hash
          ~cmp:Mapred_sorters.Integer_asc.cmp

      ...
   end

Sorting by several keys

The methods extract_key, partition_of_key, and the sorter interact in a certain way. Basically, extract_key is a preprocessor for the latter two functions, and factors out common work. What we finally need are:

  • A function that returns us the partition of a record
  • A sorter that sorts the records
Often, the partition is based on the same part of the record that is also used for sorting the records, and we commonly call this part the "key". However, there is no strict connection between the partition and the sorting criterion - it's only useful very frequently.

So, extract_key normally identifies the interesting part of the record that needs to be analyzed by partition_of_key and the sorter, and returns this as a pair (pos,len). Sometimes, it is not possible to pinpoint any such part, and we can only return pos=0 and len=String.length record. In this case, partition_of_key and the sorter get the complete record as input, and can look into different parts of the records to distill the required information.

partition_of_key and the sorter functions hash and cmp all get their input as a triple (s,pos,len), where s is some string (often not just the record but a larger buffer containing it), and pos and len delimit the range selected by extract_key within s. If extract_key selected the complete record, we can reconstruct the record by running

 let record = String.sub s pos len 

Example: We have three fields. We want to sort by the first and third field, and we take the hash of the second field to get the partition. We assume here text records with TAB-separated fields.

The relevant part of the job definition is then:

let job = 
   object
      method extract_key me jc record =
        (0, String.length record)

      method sorter =
        let hash s pos len =
          (* Only use the first field for hashing *)
          let (p1,l1) = Mapred_fields.text_field_range ~pos ~len ~field:1 s in
          Mapred_sorters.String_asc.hash s p1 l1
          (* (p1,l1) is the range of the first field in s *)

        and cmp s1 pos1 len1 s2 pos2 len2 =
          let (p1,l1) = 
            Mapred_fields.text_field_range ~pos:pos1 ~len:len1 ~field:1 s1 in
          (* (p1,l1) is the range of the first field in s1 *)
          let (p2,l2) = 
            Mapred_fields.text_field_range ~pos:pos2 ~len:len2 ~field:1 s2 in
          (* (p2,l2) is the range of the first field in s2 *)
          let d = Mapred_sorters.String_asc.cmp s1 p1 l1 s2 p2 l2 in
          if d <> 0 then
            d
          else
            (* Compare only by the third field if the first fields are equal *)
            let (q1,m1) = 
              Mapred_fields.text_field_range ~pos:pos1 ~len:len1 ~field:3 s1 in
            (* (q1,m1) is the range of the third field in s1 *)
            let (q2,m2) = 
              Mapred_fields.text_field_range ~pos:pos2 ~len:len2 ~field:3 s2 in
            (* (q2,m2) is the range of the third field in s2 *)
            Mapred_sorters.String_asc.cmp s1 q1 m1 s2 q2 m2 in

        Mapred_sorters.generic_sorter
          ~hash
          ~cmp

      method partition_of_key me jc s pos len =
        let field2 = Mapred_fields.text_field ~pos ~len ~field:2 s in
        (Hashtbl.hash field2) mod jc#partitions

      ...
   end

This example works even better if one of the binary formats is chosen:

  • For the fixed-size format, there is normally no need to call a function like text_field_range, because the positions in the record are fixed (e.g. the first field is found in bytes 0-7, the second in bytes 8-15, and the third field in bytes 16-23).
  • For the variable-size format, there is the function Mapred_fields.var_field_range, which is the counterpart of text_field_range but is faster, especially when the records are long.

This web site is published by Informatikbüro Gerd Stolpmann