In this context, files are sequences of records. Plasma knows three different representations:

- Mapred_io.line_structured_format
- Mapred_io.fixed_size_format
- Mapred_io.var_size_format

Note that Plasma simply considers a record as a string (at this level of abstraction), without assuming any particular format of this string. The user has to configure how to extract the key from the string. The key is then used for sorting, and for determining the partition.
All formats take the size of the bigblocks into account. The bigblocks are the smallest units into which the files are split for distributing the work among the available task nodes. Bigblocks need to be multiples of filesystem blocks. For example, a user may specify bigblocks of 64 MB for a filesystem that uses 1 MB blocks. The connection with file formats is that the formats need to support this splitting of the files into smaller pieces, where each piece consists of a whole number of bigblocks.
There is a command-line utility for converting between formats, see Cmd_plasma_convert.
The file formats conventionally use different file name suffixes for
easier recognition: ".var" means the variable-size format, ".fixed<n>"
the fixed-size format with n bytes per record, and everything else is
considered text. There is also Mapred_io.auto_input_format,
which determines the format of input files by the file name suffix.
For toolkit users: The following discussions and examples are older
than the toolkit, and they assume the classic job definition
(Mapred_def.mapred_job
). Of course, the various file formats are
also available in the toolkit. There is a Mapred_toolkit.format
parameter for places, and e.g. passing `Var_size_format
here would
select this format. The following is nevertheless interesting to know
as background.
The text format is very simple: A record is a sequence of bytes
followed by an LF byte. The length of the sequence is basically
unlimited; for practical reasons, however, the maximum is
bigblock_size-1. The sequence may contain every byte except 10
(LF). Note that we do not assume any character encoding, and that even
the null byte is allowed.
The big advantage of the text format is that it is compatible with lots of existing utilities like text editors, viewers, and command-line tools. The downside is that data that may include LF bytes needs special treatment.
The text format is implemented by Mapred_io.line_structured_format.
Example: We want to analyze the frequency of names in various countries. Every record has three fields: name, country code, and frequency. The name is only composed of Unicode letters (lowercase if possible). The country code is the ISO code like "US" or "DE". The frequency is an integer. Because the LF byte cannot occur in the payload data, we do not need to escape LF. Also, we can use TAB bytes to separate the three fields. A line could look like this (the bytes are separated by spaces for clarity):
g e r d TAB D E TAB 6 5 2 4 3
When summing up the names by countries, the sort key would be the name plus the country code, so we would extract the first and the second field (by searching the second TAB byte). Note that it is a requirement in Plasma that the key is a contiguous part of the record (i.e. it would not be allowed to consider the first and third field as a key - the workaround is to introduce extra key fields).
In the code, the format is described as part of the Mapred_def.mapred_job
object. Here, we could define the relevant methods as:
let job =
  object
    method input_record_io me jc =
      Mapred_io.line_structured_format()
    method output_record_io me jc =
      Mapred_io.line_structured_format()
    method internal_record_io me jc =
      Mapred_io.line_structured_format()
    method extract_key me jc record =
      Mapred_fields.text_field_range ~sep:"\t" ~field:1 ~num:2 record
    method sorter =
      Mapred_sorters.generic_sorter
        ~hash:Mapred_sorters.String_asc.hash
        ~cmp:Mapred_sorters.String_asc.cmp
    ...
  end
Note that you need to implement some more methods for a full job
definition (e.g. the map
and reduce
methods).
You may wonder why we have to refer to Mapred_io.line_structured_format
three times. The answer is that we can specify the file format for three
different occasions separately:

- input_record_io: describes the format that is expected as input of the
  map/red job.
- output_record_io: describes the format that is written as output of the
  map/red job.
- internal_record_io: describes the format that is used by the internal
  steps. This includes the format written after map, and the format used
  within reduce (except for the final reduce output).

The function Mapred_fields.text_field_range returns the position of the
key in the record as a pair (p,l), where p is the position where the key
starts, and l is the length in bytes.
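For illustration, here is what the call from the job definition above would yield for a sample record. (The concrete values of p and l are our assumption, derived from the layout described above; the sample record is hypothetical.)

let record = "gerd\tDE\t65243"

(* Range covering fields 1-2 ("gerd\tDE"), i.e. the sort key; we would
   expect p = 0 and l = 7 here (4 name bytes + TAB + 2 country bytes) *)
let (p, l) =
  Mapred_fields.text_field_range ~sep:"\t" ~field:1 ~num:2 record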
The sorter is here specified so that it sorts the key in ascending order.
We use here Mapred_sorters.generic_sorter
and configure this
implementation with two argument functions hash
and cmp
. There are
lots of definitions for hash
and cmp
in Mapred_sorters
, and we
pick here Mapred_sorters.String_asc
- which sorts strings in an
ascending way.
The function cmp
works in the same way as in the Ocaml standard
library (e.g. in Array.sort
), but the two strings to compare are
given as triple buffer
, pos
, and len
. This means the key to
compare is to be taken from the larger string buffer
in the byte
range from pos
to pos+len-1
. The function hash
returns for every
key (given in the same style) a 30-bit integer. Basically,
generic_sorter
sorts first by this integer, and only for keys with
the same hash the cmp
function is used. The definitions in
Mapred_sorters
are quite clever, though, and the hash values are
mostly used as accelerators: hash
is then defined such that the
same sorting order is achieved as one would get if only cmp were
effective.
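To make this calling convention concrete, here is a minimal hand-written sketch of such a pair of functions. This is illustrative only, not the library code; in practice one would just use a predefined module like Mapred_sorters.String_asc:

(* hash: pack the first four key bytes into a 30-bit integer
   (8+8+8+6 bits). This is monotone w.r.t. the lexicographic byte
   order, i.e. the hash order never contradicts the cmp order -
   the "accelerator" property described above. *)
let hash s pos len =
  let byte k = if k < len then Char.code s.[pos + k] else 0 in
  (byte 0 lsl 22) lor (byte 1 lsl 14) lor (byte 2 lsl 6) lor (byte 3 lsr 2)

(* cmp: byte-wise lexicographic comparison of the two key ranges *)
let cmp s1 pos1 len1 s2 pos2 len2 =
  let n = min len1 len2 in
  let rec loop k =
    if k = n then compare len1 len2
    else
      let c = Char.compare s1.[pos1 + k] s2.[pos2 + k] in
      if c <> 0 then c else loop (k + 1)
  in
  loop 0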
A simple escaping mechanism is provided by the Ocaml runtime. It is
basically the same Ocaml uses for its own strings - LF can be written
as \n, and TAB as \t. The mechanism is a bit hidden, and only available
via the Printf
and Scanf
modules.
In order to escape a string, use the %S
format specifier in printf
,
e.g.
let escaped = Printf.sprintf "%S" raw
Note that this also puts double quotes around the string.
The reverse is done with
let raw = Scanf.sscanf escaped "%S" (fun s -> s)
This mechanism has the drawback that also all non-ASCII characters are escaped with backslashes, which may increase the size of the records significantly if they really contain binary data.
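A short round trip demonstrating both directions (note that the escaped form is free of LF bytes and thus safe for the text format):

let () =
  let raw = "line1\nline2\tbinary\000data" in
  let escaped = Printf.sprintf "%S" raw in
  (* escaped is quoted and contains no LF byte anymore *)
  assert (not (String.contains escaped '\n'));
  let back = Scanf.sscanf escaped "%S" (fun s -> s) in
  assert (back = raw)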
Alternatives:

- sscanf is slow. There are the faster alternatives Mapred_fields.escape
  and Mapred_fields.unescape. These functions also do not enclose the
  escaped string in double quotes.
- Netencoding.Base64 (in the netstring package). Unfortunately, BASE-64
  does not keep the sorting order (but see below).
- json-wheel and yojson support JSON.
- Mapred_fields.encode_alt64 and Mapred_fields.decode_alt64 use a variant
  of BASE-64 called ALT-64 that keeps the sorting order, i.e. the encoded
  strings are sorted in the same order as the decoded strings. Like the
  standard BASE-64, ALT-64 increases the data volume by 1/3.

Note that escaping generally changes the sorting order of the keys. If
the data must be sorted as if unescaped, a way out is to undo the
escaping in the functions hash and cmp that are passed to
Mapred_sorters.generic_sorter:
let unescape escaped =
  Scanf.sscanf escaped "%S" (fun s -> s)

let hash s pos len =
  let s' = unescape (String.sub s pos len) in
  Mapred_sorters.String_asc.hash s' 0 (String.length s')

let cmp s1 pos1 len1 s2 pos2 len2 =
  let s1' = unescape (String.sub s1 pos1 len1) in
  let s2' = unescape (String.sub s2 pos2 len2) in
  Mapred_sorters.String_asc.cmp
    s1' 0 (String.length s1')
    s2' 0 (String.length s2')
Now define the sorter as Mapred_sorters.generic_sorter ~hash ~cmp.
The fixed-size format uses a fixed number of bytes per record. Every byte can occur, and thus escaping mechanisms are not needed. This format is normally used with binary numeric data because the fields usually also have a fixed "width".
Example: We have many data points that are simply enumerated with an integer (point #0, #1, #2, etc.). For each point there is an associated float. We need to sort the data by value.
The representation is now that we use 16 bytes per record: the first
eight bytes represent the ordinal number of the data point, and the
following eight bytes are the float (double-precision IEEE float). We
use big-endian byte order, which is recommended here because it is
"compatible" with the lexicographic sorting of strings - if n1 < n2
(and the numbers are non-negative), we also have that the big-endian
representations BE fulfil BE(n1) < BE(n2) if we just sort
the strings.
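This order compatibility is easy to check with Netnumber (introduced just below); again, it only holds as long as the numbers are non-negative:

let () =
  let be n = Netnumber.BE.int8_as_string (Netnumber.int8_of_int n) in
  (* 5 < 1000 < 70000, and the BE strings compare in the same order *)
  assert (String.compare (be 5) (be 1000) < 0);
  assert (String.compare (be 1000) (be 70000) < 0)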
The Netnumber
module of Ocamlnet (in the netstring
package) can be
used to convert the numbers to the external representation and back:
let encode (ord, value) =
  let s1 = Netnumber.BE.int8_as_string (Netnumber.int8_of_int ord) in
  let s2 = Netnumber.BE.fp8_as_string (Netnumber.fp8_of_float value) in
  s1 ^ s2

let decode s =
  assert (String.length s = 16);
  (* read back in big-endian order, matching the encoder *)
  let ord = Netnumber.int_of_int8 (Netnumber.BE.read_int8 s 0) in
  let value = Netnumber.float_of_fp8 (Netnumber.BE.read_fp8 s 8) in
  (ord, value)
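A quick round-trip check of these helpers (the float survives exactly, since fp8 is the full double precision):

let () =
  let s = encode (42, 3.14) in
  assert (String.length s = 16);
  assert (decode s = (42, 3.14))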
The job definition is now:
let job =
  object
    method input_record_io me jc =
      Mapred_io.fixed_size_format 16
    method output_record_io me jc =
      Mapred_io.fixed_size_format 16
    method internal_record_io me jc =
      Mapred_io.fixed_size_format 16
    method extract_key me jc record =
      (8, 8)
    method sorter =
      Mapred_sorters.generic_sorter
        ~hash:Mapred_sorters.Float_xdr_asc.hash
        ~cmp:Mapred_sorters.Float_xdr_asc.cmp
    ...
  end
The extraction of the key is now trivial: the float value is always found at byte offset 8 and is eight bytes long.
For sorting we use here the special comparator module
Mapred_sorters.Float_xdr_asc
which sorts a float given in binary
big-endian representation.
Basically, the variable-size format precedes every record with a length field. However, the way a map/reduce job processes data requires another principle for structuring the file. Because a task must be able to decode the records even when it is only processing a fragment of a file, we need to include a bit of helper information that guides the task to find the beginning of a record when starting in the middle of the file.
This is done by organizing the file as a sequence of chunks:
+----------------------------------------------------+
| Chunk 0 header |
+----------------------------------------------------+
| Chunk 0 payload |
| |
... ...
| |
| |
+----------------------------------------------------+
| Chunk 1 header |
+----------------------------------------------------+
| Chunk 1 payload |
| |
... ...
| |
| |
+----------------------------------------------------+
...
Normally, the chunks have a fixed size of 64K (header plus payload), where the header occupies the first 32 bytes. The header contains the information that allows a reader to locate the beginning of the first record in the chunk.
The real payload is now the sequence of the payload regions in the chunks. It is possible that a record starts in one chunk and is continued in the next chunk (and even continued beyond).
As mentioned, the records now have a length header. If the length is up to 254 bytes, this header consists of just a single byte. Otherwise, the header is made up of 9 bytes.
The details of the variable-size format are documented for the function
Mapred_io.var_size_format
.
It is recommended to just use Mapred_io.var_size_format
, e.g.
let write_something() =
  let fmt = Mapred_io.var_size_format() in
  let wr = fmt # write_file cluster rc "/filename" in
  wr # output_record "something";
  wr # output_record "next";
  ...
  wr # close_out()

let read_back() =
  let fmt = Mapred_io.var_size_format() in
  let n = Mapred_io.file_blocks cluster "/filename" in
  let rd = fmt # read_file [] cluster rc "/filename" 0L n in
  let r1 = rd # input_record() in
  let r2 = rd # input_record() in
  ... (* until End_of_file *)
  rd # close_in()
Here, cluster
is an open Plasma_client.plasma_cluster
PlasmaFS client.
In rc one needs to pass the record configuration (bigblock size, and
buffer sizes, see Mapred_io.record_config).
There is now maximum freedom for the layout of the records: they can be
handled in the same way as for the text format, or as for the fixed-size
format. Sophisticated layouts are also possible: the module
Mapred_fields includes support for a special representation of fields
within the record. Like the records as a whole, the fields have length
headers. For example,
Mapred_fields.var_concat ["one"; "two"; "three"]
creates a record with three fields. Note that here each field can have any value - LF and null bytes included - and any length.
The function Mapred_fields.var_field_range
can extract the byte
positions of fields, and Mapred_fields.var_fields
just returns the
fields of an encoded record.
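For example, a round trip through the encoded representation (assuming var_fields returns the fields in order):

let record =
  Mapred_fields.var_concat [ "one"; "two\nwith LF"; "\000\255" ]

(* Fields may contain any byte, including LF and NUL;
   we expect: fields = [ "one"; "two\nwith LF"; "\000\255" ] *)
let fields = Mapred_fields.var_fields record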
Example: We want to analyze a social network, and have four columns with: Name of a person, person ID, number of friends, photo as JPEG. We want to sort the data by the number of friends.
We now use Mapred_fields
to represent the records such that
let record =
  Mapred_fields.var_concat
    [ name; string_of_int id; string_of_int friends; photo ]
Because there are no restrictions on the contents of the fields, there are also no escaping problems.
This leads to this job definition:
let job =
  object
    method input_record_io me jc =
      Mapred_io.var_size_format ()
    method output_record_io me jc =
      Mapred_io.var_size_format ()
    method internal_record_io me jc =
      Mapred_io.var_size_format ()
    method extract_key me jc record =
      Mapred_fields.var_field_range ~field:3 record
    method sorter =
      Mapred_sorters.generic_sorter
        ~hash:Mapred_sorters.Integer_asc.hash
        ~cmp:Mapred_sorters.Integer_asc.cmp
    ...
  end
The methods extract_key, partition_of_key, and the sorter interact in a
certain way. Basically, extract_key is a preprocessor for the latter two
functions, and factors common work out. What we finally need are:

- the partition to which a record is assigned (computed by
  partition_of_key), and
- the relative order of any two records (determined by the sorter
  functions hash and cmp).
So, extract_key
normally identifies the interesting part of the
record that needs to be analyzed by partition_of_key
and the
sorter
, and returns this as a pair (pos,len)
. Sometimes, it is not
possible to pinpoint any such part, and we can only return pos=0
and
len=String.length record
. In this case, partition_of_key
and the
sorter
get the complete record as input, and can look into different
parts of the records to distill the required information.
The method partition_of_key as well as the sorter functions hash and cmp
get their input as a triple (s,pos,len)
where s
is some string
(often not only the record but a buffer containing the record), and
pos
and len
are the output of extract_key
relative to s
. This
means we can reconstruct the record by running
let record = String.sub s pos len
Example: We have three fields. We want to sort by the first and third field, and we take the hash of the second key to get the partition. We assume here text records with TAB-separated fields.
The relevant part of the job definition is then:
let job =
  object
    method extract_key me jc record =
      (0, String.length record)
    method sorter =
      let hash s pos len =
        (* Only use the first field for hashing *)
        let (p1,l1) = Mapred_fields.text_field_range ~pos ~len ~field:1 s in
        (* (p1,l1) is the range of the first field in s *)
        Mapred_sorters.String_asc.hash s p1 l1
      and cmp s1 pos1 len1 s2 pos2 len2 =
        let (p1,l1) =
          Mapred_fields.text_field_range ~pos:pos1 ~len:len1 ~field:1 s1 in
        (* (p1,l1) is the range of the first field in s1 *)
        let (p2,l2) =
          Mapred_fields.text_field_range ~pos:pos2 ~len:len2 ~field:1 s2 in
        (* (p2,l2) is the range of the first field in s2 *)
        let d = Mapred_sorters.String_asc.cmp s1 p1 l1 s2 p2 l2 in
        if d <> 0 then
          d
        else
          (* Compare only by the third field if the first fields are equal *)
          let (q1,m1) =
            Mapred_fields.text_field_range ~pos:pos1 ~len:len1 ~field:3 s1 in
          (* (q1,m1) is the range of the third field in s1 *)
          let (q2,m2) =
            Mapred_fields.text_field_range ~pos:pos2 ~len:len2 ~field:3 s2 in
          (* (q2,m2) is the range of the third field in s2 *)
          Mapred_sorters.String_asc.cmp s1 q1 m1 s2 q2 m2 in
      Mapred_sorters.generic_sorter
        ~hash
        ~cmp
    method partition_of_key me jc s pos len =
      let field2 = Mapred_fields.text_field ~pos ~len ~field:2 s in
      (Hashtbl.hash field2) mod jc#partitions
    ...
  end
This example works even better if one of the binary formats is chosen:

- For the fixed-size format, the field ranges need not be computed at
  all, because the positions in the record are fixed (e.g. the first
  field is found in bytes 0-7, the second in bytes 8-15, and the third
  field in bytes 16-23); a sketch follows after this list.
- For the variable-size format, there is Mapred_fields.var_field_range,
  which corresponds to text_field_range, but is faster, especially when
  the records are long.
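As a sketch of the fixed-size variant (the 24-byte layout with three 8-byte fields is the assumption from the list above), the comparator can use constant field ranges and skip parsing entirely:

(* Field ranges are constants relative to the key position *)
let field1 pos = (pos, 8)        (* bytes 0-7 of the record *)
let field3 pos = (pos + 16, 8)   (* bytes 16-23 of the record *)

let cmp s1 pos1 _len1 s2 pos2 _len2 =
  let (p1, l1) = field1 pos1 and (p2, l2) = field1 pos2 in
  let d = Mapred_sorters.String_asc.cmp s1 p1 l1 s2 p2 l2 in
  if d <> 0 then d
  else
    let (q1, m1) = field3 pos1 and (q2, m2) = field3 pos2 in
    Mapred_sorters.String_asc.cmp s1 q1 m1 s2 q2 m2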