Plasma GitLab Archive
Projects Blog Knowledge

(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Plasma.  If not, see <http://www.gnu.org/licenses/>.

*)
(* $Id: mapred_sched1.ml 239 2010-06-23 16:49:03Z gerd $ *)

open Mapred_tasks
open Plasma_rpcapi_aux
open Printf

(* Static configuration of a plan, as built by [configure_plan]. *)
type plan_config =
    { keep_temp_files : bool;
      (* when false, create_plan sets the *_input_del flags of sort,
         shuffle and reduce tasks, i.e. intermediate inputs are marked
         for deletion *)
      task_servers : string list;
      (* host names of the task servers, as passed to configure_plan *)
      task_servers_ip : Unix.inet_addr list;
      (* IP addresses of [task_servers], element by element in the same
         order (resolved with Plasma_util.ip_of_host) *)
    }

(* A task together with the scheduling state the plan keeps for it. *)
type extended_task =
    { task : task;                 (* the task description itself *)
      deps : extended_task list;   (* all must be [complete] before this
                                      task is returned by executable_tasks *)
      mutable complete : bool;     (* set by mark_as_completed *)
      mutable started : bool;      (* set by mark_as_started; started tasks
                                      are no longer executable *)
    }

(* [new_xtask t]: wrap [t] into an extended task that has no
   dependencies and is neither started nor complete. *)
let new_xtask t =
  let fresh = { task = t; deps = []; started = false; complete = false } in
  fresh

(* Ordering on tasks, used as the key ordering of [TMap]. *)
module T = struct
  type t = task

  (* Generic structural ordering on whole task values. Note that this
     walks the complete task record, so each map operation costs time
     proportional to the size of the task description. *)
  let compare (a : t) (b : t) : int =
    Pervasives.compare a b
end

(* Maps keyed by tasks. *)
module TMap = Map.Make(T)

(* A scheduling plan: every task of a job with its scheduling state. *)
type plan =
    { mutable tasks : extended_task TMap.t;  (* all tasks, keyed by task *)
      config : plan_config;                  (* as given to create_plan *)
      cluster : Plasma_client.plasma_cluster (* cluster given to create_plan *)
    }

(* [configure_plan ?keep_temp_files servers]: build a plan configuration
   for the given task server host names. Each name is resolved to an IP
   address immediately; the resulting list is parallel to [servers].
   [keep_temp_files] defaults to [false]. *)
let configure_plan ?(keep_temp_files=false) servers =
  let resolved = List.map (fun host -> Plasma_util.ip_of_host host) servers in
  { task_servers = servers;
    task_servers_ip = resolved;
    keep_temp_files = keep_temp_files;
  }

(* [hosts plan]: the configured task servers as [(name, ip)] pairs.
   Raises [Invalid_argument] if the name and IP lists differ in length. *)
let hosts plan =
  let cfg = plan.config in
  List.map2 (fun name ip -> (name, ip)) cfg.task_servers cfg.task_servers_ip

(* [mark_as_completed plan task]: set the [complete] flag of [task].
   Raises [Failure] if [task] is not part of [plan]. *)
let mark_as_completed plan task =
  let xt =
    try TMap.find task plan.tasks
    with Not_found -> failwith "Mapred_sched.mark_as_completed: not found" in
  xt.complete <- true

(* [mark_as_started plan task]: set the [started] flag of [task], so it
   is no longer reported by [executable_tasks].
   Raises [Failure] if [task] is not part of [plan]. *)
let mark_as_started plan task =
  let xt =
    try TMap.find task plan.tasks
    with Not_found -> failwith "Mapred_sched.mark_as_started: not found" in
  xt.started <- true

(* [task_depends_on_list plan task]: the tasks [task] directly depends on.
   Raises [Failure] if [task] is not part of [plan]. *)
let task_depends_on_list plan task =
  let xt =
    try TMap.find task plan.tasks
    with Not_found -> failwith "Mapred_sched.task_depends_on_list: not found" in
  List.map (fun dep -> dep.task) xt.deps

(* Position of a task kind in pipeline order:
   map (0) < sort (1) < shuffle (2) < reduce (3). *)
let kind_rank t =
  match t with
  | `Map _ -> 0
  | `Sort _ -> 1
  | `Shuffle _ -> 2
  | `Reduce _ -> 3

(* Secondary sort key within one task kind: the map/sort id, the first
   covered index of a shuffle, or 0 for every reduce task. *)
let rank t =
  match t with
  | `Reduce _ ->
      (* reduce tasks are independent from each other, so they all tie *)
      0
  | `Shuffle st ->
      let (first_covered, _) = st.shuffle_coverage in
      first_covered
  | `Sort st -> st.sort_id
  | `Map mt -> mt.map_id

(* [executable_tasks plan]: tasks that are not yet started and whose
   dependencies are all complete, sorted in "natural order": by kind
   (map, sort, shuffle, reduce), then by [rank]. *)
let executable_tasks plan =
  let ready xt =
    (not xt.started) && List.for_all (fun dep -> dep.complete) xt.deps in
  let candidates =
    TMap.fold
      (fun t xt acc -> if ready xt then t :: acc else acc)
      plan.tasks
      [] in
  let natural_order t1 t2 =
    let by_kind = kind_rank t1 - kind_rank t2 in
    if by_kind <> 0 then by_kind else rank t1 - rank t2 in
  List.sort natural_order candidates


(* [plan_finished plan]: whether every task of the plan is complete
   (true for an empty plan). *)
let plan_finished plan =
  let incomplete = ref 0 in
  TMap.iter (fun _ xt -> if not xt.complete then incr incomplete) plan.tasks;
  !incomplete = 0

(* [cluster p]: the Plasma cluster handle the plan was created with. *)
let cluster (p : plan) =
  p.cluster


(* [group_by n l]: split [l] into consecutive chunks of [n] elements, so
   that [List.flatten (group_by n l) = l]. Every chunk has exactly [n]
   elements, except the last, which may have fewer. [group_by n []] is [].
   e.g.
     # group_by 3 [0;1;2;3;4;5;6;7;8;9;10];;
     - : int list list = [[0; 1; 2]; [3; 4; 5]; [6; 7; 8]; [9; 10]]
   Tail-recursive, so it is safe for long lists.
   Raises [Invalid_argument] if [n <= 0]. (Previously [n = 0] recursed
   forever.)
 *)
let group_by n l =
  if n <= 0 then invalid_arg "Mapred_sched.group_by";
  (* [take acc m rest]: move up to [m] elements from [rest] onto [acc];
     returns the chunk in original order and the remaining list *)
  let rec take acc m rest =
    if m = 0 then (List.rev acc, rest)
    else
      match rest with
      | [] -> (List.rev acc, [])
      | x :: rest' -> take (x :: acc) (m - 1) rest' in
  let rec loop groups rest =
    match rest with
    | [] -> List.rev groups
    | _ :: _ ->
        let (chunk, rest') = take [] n rest in
        loop (chunk :: groups) rest' in
  loop [] l


(* [split_range n pmin pmax]: split the integers [pmin..pmax] into [n]
   consecutive groups [(g1min,g1max); ...; (gnmin,gnmax)] with
   [g1min = pmin], [gnmax = pmax] and [g(k)max + 1 = g(k+1)min].
   Group sizes differ by at most one, and the larger groups come first.
   If [n] exceeds the number of elements, there is one group per element.
   e.g.
     # split_range 4 0 10;;
     - : (int * int) list = [(0, 2); (3, 5); (6, 8); (9, 10)]
   An empty range ([pmax < pmin]) yields [].
   Raises [Invalid_argument] if the range is non-empty and [n <= 0].
   (Previously [n = 0] and [pmax = pmin - 1] raised [Division_by_zero].)
 *)
let split_range n pmin pmax =
  let r = pmax - pmin + 1 in   (* number of elements in total *)
  if r <= 0 then []
  else if n <= 0 then invalid_arg "Mapred_sched.split_range"
  else (
    let n = min n r in
    let gsize = r / n in
    let gmod = r mod n in      (* the first [gmod] groups get one extra *)
    let rec build k p acc =
      if k = n then List.rev acc
      else
        let size = gsize + (if k < gmod then 1 else 0) in
        build (k + 1) (p + size) ((p, p + size - 1) :: acc) in
    build 0 pmin []
  )


(* [ranked_hosts servertab blocks]: rank the task servers that store
   [blocks].

   Only live blocks whose node is a key of [servertab] are counted.
   Returns pairs [(p, ip)], where [p] is the host's share of the counted
   blocks (0 < p <= 1) and [ip] its address. The list is sorted by
   decreasing share, so the host holding most of the data comes first.
   Callers try hosts in list order, and fall back to the head.
   Returns [] if no block was counted.

   Raises [Failure] if a live block's node is not a literal IP address.
 *)
let ranked_hosts servertab blocks =
  let counts = Hashtbl.create 5 in
  let total = ref 0 in
  List.iter
    (fun b ->
       if b.node_alive then (
         let (h, _port) = Plasma_util.parse_host_port b.node in
         let ip =
           (* inet_addr_of_string signals malformed input with Failure *)
           try Unix.inet_addr_of_string h
           with Failure _ ->
             failwith ("Mapred_sched.ranked_hosts: not an IP address: " ^ h) in
         if Hashtbl.mem servertab ip then (
           let k = try Hashtbl.find counts ip with Not_found -> 0 in
           Hashtbl.replace counts ip (k + 1);
           incr total
         )
       )
    )
    blocks;
  (* [!total > 0] whenever [counts] is non-empty, so no division by 0 *)
  let shares =
    Hashtbl.fold
      (fun ip k acc -> (float k /. float !total, ip) :: acc)
      counts
      [] in
  (* best host first (the original sorted ascending, preferring the host
     with the least local data) *)
  List.sort (fun (p1, _) (p2, _) -> compare p2 p1) shares


(*
let best_hosts c plan task =
  (* fake impl: only look at the first 128 blocks *)
  XXX
 *)

(* [create_plan c mj pc]: build the complete task graph for job [mj] on
   cluster [c], using the configuration [pc].

   - Opens one Plasma transaction, lists [mj#input_dir], reads inode
     infos and block lists, and commits the transaction.
   - Creates map tasks, aiming at about [mj#map_tasks] of them. It
     prefers, per group of 128 blocks, the task servers (from [pc]) that
     store those blocks, as ranked by [ranked_hosts].
   - Creates one sort task per map task, depending on that map task.
   - Creates rounds of shuffle tasks that each merge at most 8 inputs.
     Each partition of [0 .. mj#partitions-1] that ends up produced by a
     single shuffle task gets one reduce task.
   Returns the new plan; no task is marked started or complete.

   NOTE(review):
   - Division_by_zero is raised if [pc.task_servers] is empty or if
     [mj#map_tasks] is 0.
   - An exception raised after [Plasma_client.start] leaves the
     transaction neither committed nor aborted by this function.
   - If the input yields no map tasks, the shuffle stage reaches
     [assert false] (a row with no dependencies).
   - [mj#partitions] is presumably >= 1 -- TODO confirm.
*)
let create_plan c mj pc =
  let plan =
    { tasks = TMap.empty;
      config = pc;
      cluster = c;
    } in

  (* Create map tasks. For every host we determine the number of map
     tasks it should get. 

     We iterate over the files and the blocklists. Look for sequences
     that are on the same host. When we have identified such a sequence,
     add it to the map task list of this host. If this list is already
     full, take another host randomly.
   *)
  let n_hosts = List.length pc.task_servers in
  (* "+ 1" rounds up; Division_by_zero if there are no task servers *)
  let n_tasks_per_host = mj#map_tasks / n_hosts + 1 in

  let servertab = Hashtbl.create 5 in   (* all task servers, by IP *)
  List.iter
    (fun ip -> Hashtbl.replace servertab ip ())
    pc.task_servers_ip;

  let trans = Plasma_client.start c in
  let files = Plasma_client.list trans mj#input_dir in
  let files_ii =
    List.map
      (fun (file,inode) -> 
	 let ii = Plasma_client.get_inodeinfo trans inode in
	 (file,inode,ii)
      )
      files in
  let total_blocksL =
    List.fold_left
      (fun acc (_,_,ii) ->
	 Int64.add acc ii.blocklimit
      )
      0L
      files_ii in
  (* target size of one map task in blocks (rounded down, may be 0);
     Int64.div raises Division_by_zero if mj#map_tasks = 0 *)
  let n_blocks_per_taskL = 
    Int64.div total_blocksL (Int64.of_int mj#map_tasks) in

  let tasks_per_host = Hashtbl.create 5 in
  (* these are "full" tasks only. Value is a list of map_tasks *)

  let num_tasks_per_host = Hashtbl.create 5 in
  (* length of the lists in tasks_per_host *)

  let cur_task = Hashtbl.create 5 in
  (* this is the current task: [(size, list)]. [size] in blocks.
      [list] is a list of tuples [(filename,pos,len)].
   *)

  let c_map = ref 0 in

  (* Allocate the next map id and build a map task for [list], preferring
     host [ip]. Input paths are made absolute by prefixing mj#input_dir. *)
  let get_map_task ip list =
    let id = !c_map in
    incr c_map;
    { map_input = (List.map
		     (fun (filename,pos,len) ->
			sprintf "%s/%s" mj#input_dir filename, pos, len
		     )
		     list);
      map_output = sprintf "%s/mapped_%06d" mj#work_dir id;
      map_id = id;
      map_best_hosts = [ip];
    } in

  List.iter
    (fun (filename,inode,ii) ->
       (* analyze in groups of 128 blocks *)
       (* ceil(blocklimit / 128); note that an empty file (blocklimit = 0)
	  still yields one group, of length 0 *)
       let n_groupsL = 
	 Int64.succ (Int64.div (Int64.pred ii.blocklimit) 128L) in
       if n_groupsL > Int64.of_int max_int then
	 failwith "Mapred_sched: input file has too many blocks";
       (* well, that limit is really high, on 32 bit systems:
	  max_int = 1G - 1, so the max size is roughly 1G * blocksize,
	  e.g. 1P for blocksize=1M. Also, you run out of memory way
	  before that!
	*)
       
       let n_groups = Int64.to_int n_groupsL in
       for g = 0 to n_groups - 1 do
	 (* Retrieve the blocklist (EXPENSIVE) *)
	 let pos0 = Int64.mul (Int64.of_int g) 128L in
	 let pos1 = min (Int64.add pos0 128L) ii.blocklimit in
	 let lenL = Int64.sub pos1 pos0 in
	 let len = Int64.to_int lenL in
	 let blocks = Plasma_client.get_blocklist trans inode pos0 len in
	 (* Get a ranking of the nodes: *)
	 let nodes0 = ranked_hosts servertab blocks in
	 (* nodes0 may be empty for two reasons:
	    - all nodes dead
	    - a hole in the file
	    We check here only for the first. Holes will also be rejected
	    by the record reader, and it is much simpler to find out there.
	  *)
	 if nodes0 = [] then (
	   if blocks <> [] then
	     failwith ("File inaccessible (no live data nodes): " ^
			 mj#input_dir ^ "/" ^ filename)
	 )
	 else (
	   (* the head of nodes0 is appended once more with is_last=true:
	      it takes the group even when its task quota is exhausted *)
	   let nodes = 
	     List.map (fun (p,ip) -> (p,ip,false)) nodes0 @
	       [ let (p,ip) = List.hd nodes0 in (p,ip,true) ] in
	   (* Try to assign the task: *)
	   ( try
	       (* iterate over nodes. Skip nodes when num_tasks_per_host is
		  >= n_tasks_per_hosts. Otherwise add the task to cur_task.
		  If cur_task is >= blocks_per_task, move it to tasks_per_host.
		*)
	       List.iter
		 (fun (p,ip,is_last) ->
		    let n = 
		      try Hashtbl.find num_tasks_per_host ip 
		      with Not_found -> 0 in
		    if n < n_tasks_per_host || is_last then (
		      let ct_n, ct_l =
			try Hashtbl.find cur_task ip 
			with Not_found -> (0L, []) in
		      let ct_n' = Int64.add ct_n lenL in
		      let ct_l' = (filename,pos0,lenL) :: ct_l in
		      if ct_n' >= n_blocks_per_taskL then (
			Hashtbl.remove cur_task ip;
			let task = get_map_task ip ct_l' in
			let ext_task = new_xtask (`Map task) in
			plan.tasks <- TMap.add (`Map task) ext_task plan.tasks;
			let full_l =
			  try Hashtbl.find tasks_per_host ip
			  with Not_found -> [] in
			Hashtbl.replace tasks_per_host ip (task :: full_l);
			Hashtbl.replace num_tasks_per_host ip (n+1)
		      )
		      else
			Hashtbl.replace cur_task ip (ct_n', ct_l');
		      (* the group is assigned: stop iterating over nodes *)
		      raise Exit
		    )
		 )
		 nodes
	     with
	       | Exit -> ()
	   )
	 );
       done
       
    )
    files_ii;

  Plasma_client.commit trans;

  (* Move anything in cur_task to tasks_per_host *)
  Hashtbl.iter
    (fun ip (ct_n, ct_l) ->
       let task = get_map_task ip ct_l in
       let ext_task = new_xtask (`Map task) in
       plan.tasks <- TMap.add (`Map task) ext_task plan.tasks;
       let n = 
	 try Hashtbl.find num_tasks_per_host ip 
	 with Not_found -> 0 in
       let full_l =
	 try Hashtbl.find tasks_per_host ip
	 with Not_found -> [] in
       Hashtbl.replace tasks_per_host ip (task :: full_l);
       Hashtbl.replace num_tasks_per_host ip (n + 1);
    )
    cur_task;

  (* Finally create the map task array by iterating over tasks_per_host *)
  let n_map_tasks = !c_map in
  (* filler for Array.make only. get_map_task also increments c_map here,
     but n_map_tasks was already read above. Every id below n_map_tasks
     was stored in tasks_per_host, so all slots get overwritten. *)
  let dummy = get_map_task Unix.inet_addr_any [] in
  let (map_tasks : map_task array) = 
    Array.make n_map_tasks dummy in

  Hashtbl.iter
    (fun ip l ->
       List.iter
	 (fun task ->
	    map_tasks.( task.map_id ) <- task;
	 )
	 l
    )
    tasks_per_host;

  (* ----------------------- Create sort tasks ------------------------ *)

  (* one sort task per map task; index i equals map_tasks.(i).map_id *)
  let sort_tasks =
    Array.mapi
      (fun i mt ->
	 { sort_input = mt.map_output;
	   sort_input_del = not pc.keep_temp_files;
	   sort_output = sprintf "%s/sorted_%06d" mj#work_dir i;
	   sort_id = mt.map_id
	 }
      )
      map_tasks in
  Array.iteri
    (fun i st ->
       let mt = map_tasks.(i) in
       let ext_task =
	 { (new_xtask (`Sort st)) with
	     deps = [ TMap.find (`Map mt) plan.tasks ];
	 } in
       plan.tasks <- TMap.add ext_task.task ext_task plan.tasks
    )
    sort_tasks;

  let indexed_sorts =
    Array.to_list (Array.mapi (fun i st -> (i,st)) sort_tasks) in

  (* ------------------ Create shuffle and reduce tasks ---------------- *)

  let c_shuffle = ref 0 in

  let mk_shuffle_out_name() =
    let z = !c_shuffle in
    incr c_shuffle;
    sprintf "%s/shuffle_%06d" mj#work_dir z in

  (* (kmin, kmax, task): the task covers sort outputs kmin..kmax *)
  let shuffle_input =
    List.map
      (fun (i,st) ->
	 (i, i, `Sort st)
      )
      indexed_sorts in

  let pmax_total = mj # partitions-1 in

  let matrix = ref [ (0, pmax_total, shuffle_input) ] in
  (* rows: partitions
     columns: map tasks
   *)

  (* File produced by [t] for the partition range (pmin,pmax). A sort
     task has a single output; a shuffle task must have exactly one
     output for exactly this range, otherwise assert false. *)
  let output_file_of_task t pmin pmax =
    match t with
      | `Map _ | `Reduce _ ->
	  assert false
      | `Sort st ->
	  st.sort_output
      | `Shuffle st ->
	  let l =
	    List.filter
	      (fun (f, qmin, qmax) -> qmin=pmin && qmax=pmax)
	      st.shuffle_output in
	  match l with
	    | [ f,_,_ ] -> f
	    | [] ->
		assert false
	    | _ -> 
		assert false
  in
  
  (* Each round processes every row of [matrix]. A row with at most 8
     dependencies is shuffled by one task directly into single
     partitions (then reduced). Larger rows are shuffled in groups of 8
     into up to 4 partition ranges, which form the next round's rows. *)
  while !matrix <> [] do
    let next_matrix = ref [] in
    List.iter
      (fun (pmin,pmax,dep_tasks) ->
	 match dep_tasks with
	   | [] ->
	       (* reached when there are no map tasks at all (e.g. empty
		  input directory) *)
	       assert false
	   | _ ->
	       let grouped_dep_tasks = group_by 8 dep_tasks in
	       let full_output = 
		 match grouped_dep_tasks with [_] -> true | _ -> false in
	       let output_parts =
		 if full_output then
		   split_range (pmax-pmin+1) pmin pmax
		 else
		   split_range 4 pmin pmax in
	       (* Now we want to shuffle as follows:
		  - for each group of grouped_dep_tasks we create
		    a shuffle task that reads the files of the group,
		    shuffles them, and outputs into the files designated
		    by output_parts
		*)
	       let tasks =
		 List.map
		   (fun dep_task_group ->
		      let (kmin,_,_) = List.hd dep_task_group in
		      let (_,kmax,_) = List.hd (List.rev dep_task_group) in
		      let shuffle_input =
			List.map
			  (fun (jmin,jmax,t) ->
			     (output_file_of_task t pmin pmax, jmin, jmax)
			  )
			  dep_task_group in
		      let shuffle_output =
			List.map
			  (fun (qmin,qmax) ->
			     let file =
			       mk_shuffle_out_name() in
			     (file, qmin, qmax)
			  )
			  output_parts in

		      let t =
			{ shuffle_input = shuffle_input;
			  shuffle_input_del = not pc.keep_temp_files;
			  shuffle_output = shuffle_output;
			  shuffle_partitions = (pmin,pmax);
			  shuffle_coverage = (kmin,kmax)
			} in
		      (* Add this task to the plan: *)
		      let deps = 
			List.map (fun (_,_,dt) -> dt) dep_task_group in
		      let ext_task =
			{ (new_xtask (`Shuffle t)) with
			    deps = 
			    List.map (fun dt -> TMap.find dt plan.tasks) deps;
			} in
		      plan.tasks <- TMap.add (`Shuffle t) ext_task plan.tasks;
		      t
		   )
		   grouped_dep_tasks in

	       (* What remains to do? Do this separately for every
		  output partition (qmin,qmax). If for such a partition
		  only one output file exists, we are done! Otherwise
		  look for all tasks generating output files for (qmin,qmax).
		  These are the inputs for the next round.
		*)
	       (* ht: qmin -> shuffle tasks writing an output for that range *)
	       let ht = Hashtbl.create 5 in
	       List.iter
		 (fun task ->
		    List.iter
		      (fun (file,qmin,qmax) ->
			 let l = 
			   try Hashtbl.find ht qmin with Not_found -> [] in
			 Hashtbl.replace ht qmin (task :: l)
		      )
		      task.shuffle_output
		 )
		 tasks;
	       List.iter
		 (fun (qmin,qmax) ->
		    let tasks_part =
		      try Hashtbl.find ht qmin with Not_found -> [] in
		    match tasks_part with
		      | [] ->
			  (* strange, but ignore *) 
			  ()
		      | [ task ] ->
			  (* case: can trigger reduce *)
			  assert(qmin=qmax);
			  let (shuffle_out,_,_) =
			    List.find 
			      (fun (_,qmin',_) -> qmin=qmin')
			      task.shuffle_output in
			  let reduce_out =
			    sprintf "%s/reduce_%04d" mj#output_dir qmin in
			  let reduce_task =
			    { reduce_input = shuffle_out;
			      reduce_input_del = not pc.keep_temp_files;
			      reduce_output = reduce_out;
			      reduce_partition = qmin
			    } in
			  (* Add this task to the plan: *)
			  let ext_task =
			    { (new_xtask (`Reduce reduce_task)) with
				deps = [TMap.find (`Shuffle task) plan.tasks];
			    } in
			  plan.tasks <- 
			    TMap.add (`Reduce reduce_task) ext_task plan.tasks;
			  ()
		      | _ ->
			  (* tasks_part was built by prepending; reverse it
			     to restore the creation order of the tasks *)
			  let dep_tasks' =
			    List.map
			      (fun t ->
				 let (kmin,kmax) = t.shuffle_coverage in
				 (kmin,kmax,`Shuffle t)
			      )
			      (List.rev tasks_part)
			  in
			  next_matrix := 
			    (qmin,qmax,dep_tasks') :: !next_matrix
		 )
		 output_parts
      )
      !matrix;
    matrix := List.rev !next_matrix
  done;
  
  plan


(* [print_plan plan]: print every task of the plan to stdout. Each task
   is followed by the ids of its direct dependencies, if it has any. *)
let print_plan plan =
  let print_deps deps =
    match deps with
    | [] -> ()
    | _ :: _ ->
        printf "    Deps:\n";
        List.iter (fun dep -> Mapred_tasks.print_task_id dep.task 8) deps in
  TMap.iter
    (fun _ xt ->
       Mapred_tasks.print_task xt.task 0;
       print_deps xt.deps)
    plan.tasks


This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml