Plasma GitLab Archive
Projects Blog Knowledge

Module Mapred_tasks


module Mapred_tasks: sig .. end
Representation of tasks

type file_fragment = string * int64 * int64 
Filename, start block pos, length in blocks. If the length is 0, this means "till end of file"
type file = file_fragment list 

type map_task = {
   map_input : file; (*the list of input block fragments to read, given as triples (filename,block,len)*)
   map_output_prefix : string; (*the preifx for output files to create and to write*)
   map_id : int; (*identifies the map task (numbered 0 to n-1)*)
   map_best_hosts : Unix.inet_addr list; (*best hosts for executing this task (might be empty)*)
}
type sort_task = {
   sort_input : file; (*The file to sort*)
   sort_input_del : bool; (*whether to delete the input after finishing*)
   sort_output : string; (*the output file to create and to write*)
   sort_id : int; (*the ID of the input map task*)
}
type shuffle_task = {
   shuffle_input : (file * int * int) list; (*The inputs: list of files (file, kmin, kmax). This means that file contains all records from the map tasks kmin to kmax which are also in shuffle_partitions.

It is required that the inputs are contiguous regarding the coverage of map tasks, i.e. for all tasks k given in shuffle_coverage there must be an input file covering it.

*)
   shuffle_input_del : bool; (*whether to delete the input after finishing*)
   shuffle_output : (string * int * int) list; (*the outputs: list of files (file, pmin, pmax). This means that file contains all records of the input that fall into the range of partitions pmin to pmax.*)
   shuffle_partitions : int * int; (*The files are in the range of partitions (pmin,pmax)*)
   shuffle_coverage : int * int; (*The files cover the map task range (kmin,kmax)*)
   shuffle_reduce : bool; (*Whether this shuffle task is also a reduce task*)
}
Shuffling means to rearrange the output of the map/sort tasks into a set of partitions. The map tasks are numbered 0 to n-1 (via map_id) and for each task there is a file that serves here as input. The partitions are numbered 0 to m-1, and finally there is for each partition exactly one output file.
type task = [ `Cleanup
| `Map of map_task
| `Shuffle of shuffle_task
| `Sort of sort_task ]
type task_result = [ `Error of string | `Ok of file list ] 
val encode_task : task -> string
val decode_task : string -> task
val encode_task_result : task_result -> string
val decode_task_result : string -> task_result
val string_of_task_id : task -> string
val print_task_id : task -> int -> unit
val print_task : task -> int -> unit
val print_file : ?tag:string -> file -> int -> unit
This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml