(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Plasma.  If not, see <http://www.gnu.org/licenses/>.
*)
(* $Id: mapred_sched1.ml 239 2010-06-23 16:49:03Z gerd $ *)

open Mapred_tasks
open Plasma_rpcapi_aux
open Printf

(** Static configuration of a plan: whether temporary files are kept,
    and the task servers both by name and by IP address (parallel lists,
    see [configure_plan]). *)
type plan_config =
    { keep_temp_files : bool;
      task_servers : string list;
      task_servers_ip : Unix.inet_addr list;
    }

(** A task together with its scheduling state. [deps] are the tasks that
    must be [complete] before this one becomes executable. *)
type extended_task =
    { task : task;
      deps : extended_task list;
      mutable complete : bool;
      mutable started : bool;
    }

(** [new_xtask t] wraps [t] as a task without dependencies that is
    neither started nor complete. *)
let new_xtask t =
  { task = t;
    deps = [];
    complete = false;
    started = false
  }

(** Ordered-type argument for [Map.Make]; tasks are ordered by the
    polymorphic structural comparison. *)
module T = struct
  type t = task
  (* The unqualified [compare] is used on purpose: [Pervasives] is
     deprecated and absent from OCaml 5, whereas plain [compare] resolves
     to the standard polymorphic comparison on every compiler version.
     This [let] is not recursive, so the right-hand side refers to the
     outer [compare]. *)
  let compare (t1:task) (t2:task) = compare t1 t2
end

module TMap = Map.Make(T)

(** A plan: all tasks (keyed by the task itself), the configuration and
    the cluster handle the plan was created for. *)
type plan =
    { mutable tasks : extended_task TMap.t;
      config : plan_config;
      cluster : Plasma_client.plasma_cluster
    }

(** [configure_plan ?keep_temp_files servers] builds a configuration for
    the task servers [servers]. Every name is translated with
    [Plasma_util.ip_of_host]. [keep_temp_files] defaults to [false]. *)
let configure_plan ?(keep_temp_files=false) servers =
  let ips = List.map Plasma_util.ip_of_host servers in
  { keep_temp_files = keep_temp_files;
    task_servers = servers;
    task_servers_ip = ips
  }

(** [hosts plan] returns the configured task servers as pairs
    [(name, ip)]. *)
let hosts plan =
  List.map2
    (fun n ip -> (n,ip))
    plan.config.task_servers
    plan.config.task_servers_ip

(* [find_xtask fn_name plan task] looks up the extended task of [task].
   Fails with the message Mapred_sched.<fn_name>: not found when the task
   is not part of the plan. *)
let find_xtask fn_name plan task =
  try TMap.find task plan.tasks
  with Not_found -> failwith ("Mapred_sched." ^ fn_name ^ ": not found")

(* [hashtbl_find_default tbl key default] is [Hashtbl.find tbl key], or
   [default] when [key] is unbound. *)
let hashtbl_find_default tbl key default =
  try Hashtbl.find tbl key with Not_found -> default

(** Marks [task] as complete.
    @raise Failure if [task] is not part of [plan]. *)
let mark_as_completed plan task =
  let ext_task = find_xtask "mark_as_completed" plan task in
  ext_task.complete <- true

(** Marks [task] as started.
    @raise Failure if [task] is not part of [plan]. *)
let mark_as_started plan task =
  let ext_task = find_xtask "mark_as_started" plan task in
  ext_task.started <- true

(** Returns the tasks [task] directly depends on.
    @raise Failure if [task] is not part of [plan]. *)
let task_depends_on_list plan task =
  let ext_task = find_xtask "task_depends_on_list" plan task in
  List.map (fun xt -> xt.task) ext_task.deps

(** Position of the task kind in the processing pipeline
    (map, sort, shuffle, reduce). *)
let kind_rank =
  function
    | `Map _ -> 0
    | `Sort _ -> 1
    | `Shuffle _ -> 2
    | `Reduce _ -> 3

(** Secondary ordering key among tasks of the same kind. *)
let rank =
  function
    | `Map mt -> mt.map_id
    | `Sort st -> st.sort_id
    | `Shuffle st -> fst st.shuffle_coverage
    | `Reduce _ -> 0  (* reduce tasks are independent from each other *)

(** [executable_tasks plan] returns the tasks that are not yet started
    and whose dependencies are all complete, ordered by [kind_rank] and
    then by [rank]. *)
let executable_tasks plan =
  let ready xt =
    not xt.started && List.for_all (fun dep -> dep.complete) xt.deps in
  let candidates =
    TMap.fold
      (fun t xt acc -> if ready xt then t :: acc else acc)
      plan.tasks
      [] in
  (* Sort in natural order: first by kind, then by rank *)
  List.sort
    (fun t1 t2 ->
       match (kind_rank t1) - (kind_rank t2) with
         | 0 -> (rank t1) - (rank t2)
         | n -> n
    )
    candidates

(** [plan_finished plan] is [true] iff every task of the plan is
    complete. *)
let plan_finished plan =
  TMap.fold
    (fun _ xt acc -> acc && xt.complete)
    plan.tasks
    true

(** The cluster handle of the plan. *)
let cluster plan =
  plan.cluster

(** [let l' = group_by n l]: we have [l = List.flatten l'] so that every
    element of [l'] has [n] elements, except the last which may have
    fewer, e.g.

    # group_by 3 [0;1;2;3;4;5;6;7;8;9;10];;
    - : int list list = [[0; 1; 2]; [3; 4; 5]; [6; 7; 8]; [9; 10]]

    The empty list yields the empty list.

    @raise Invalid_argument if [n <= 0] (a group size of 0 used to make
    this function recurse forever). *)
let group_by n l =
  if n <= 0 then invalid_arg "Mapred_sched.group_by: n must be positive";
  (* [groups]: finished groups, newest first. [cur]: the group being
     filled, reversed. [room]: free slots left in [cur]. *)
  let rec loop groups cur room =
    function
      | [] ->
          let groups =
            match cur with
              | [] -> groups
              | _ -> List.rev cur :: groups in
          List.rev groups
      | x :: rest ->
          if room = 0 then
            loop (List.rev cur :: groups) [x] (n-1) rest
          else
            loop groups (x :: cur) (room-1) rest in
  loop [] [] n l

(** [split_range n pmin pmax] splits the range of numbers pmin...pmax
    into n groups

    (g1min, g1max), (g2min, g2max), ..., (gnmin,gnmax)

    so that g(k)max + 1 = g(k+1)min, and the groups have approx the same
    size, e.g.

    # split_range 4 0 10;;
    - : (int * int) list = [(0, 2); (3, 5); (6, 8); (9, 10)]

    If the range has fewer than [n] elements, one group per element is
    returned. If the range is empty or [n <= 0], the result is [[]]
    (the zero cases used to raise [Division_by_zero]). *)
let split_range n pmin pmax =
  let r = pmax-pmin+1 in   (* number of elements in total *)
  let n = min n r in
  if n <= 0 then
    []
  else begin
    let gsize = r/n in
    let gmod = r mod n in
    let l = ref [] in
    let p = ref pmin in
    for k = 0 to n-1 do
      (* the first [gmod] groups take one of the remaining elements *)
      let size = gsize + (if k < gmod then 1 else 0) in
      l := (!p, !p+size-1) :: !l;
      p := !p + size;
    done;
    List.rev !l
  end

(** Get the list of hosts storing [blocks] as list of pairs [(p,node)]
    where [p] is the share (range 0 to 1), and [node] is the IP address.
    Only blocks with [node_alive] set, and only nodes that are keys of
    [servertab], are counted. The list is sorted by decreasing share,
    i.e. the host storing most of the blocks comes first.

    @raise Failure if the host part of a block node is not an IP
    address. *)
let ranked_hosts servertab blocks =
  let ht = Hashtbl.create 5 in   (* ip -> number of counted blocks *)
  let n = ref 0 in               (* total number of counted blocks *)
  List.iter
    (fun b ->
       if b.node_alive then (
         let (h,_) = Plasma_util.parse_host_port b.node in
         let ip =
           (* [Unix.inet_addr_of_string] signals bad input with [Failure];
              other exceptions are no longer swallowed *)
           try Unix.inet_addr_of_string h
           with Failure _ ->
             failwith ("Mapred_sched.ranked_hosts: not an IP address: " ^ h) in
         if Hashtbl.mem servertab ip then (
           let k = hashtbl_find_default ht ip 0 in
           Hashtbl.replace ht ip (k+1);
           incr n
         )
       )
    )
    blocks;
  (* [!n] can only be 0 if [ht] is empty, so there is no division by 0 *)
  let shares =
    Hashtbl.fold
      (fun ip k acc -> (float k /. float !n, ip) :: acc)
      ht
      [] in
  (* Best host first: [create_plan] tries the hosts in list order and
     falls back to the head of the list. The previous ascending order
     made it prefer the host with the smallest share. *)
  List.sort
    (fun (p1,_) (p2,_) -> compare p2 p1)
    shares

(*
let best_hosts c plan task =
  (* fake impl: only look at the first 128 blocks *)
  XXX
 *)

(** [create_plan c mj pc] creates the complete plan (map, sort, shuffle
    and reduce tasks with their dependencies) for the job [mj] on the
    cluster [c], using the configuration [pc].

    The input files are listed and their block lists are retrieved in
    one transaction on [c].

    @raise Failure if [pc] has no task servers, if [mj#map_tasks] or
    [mj#partitions] is not positive, if an input file has too many
    blocks, or if a part of an input file has no live data node among
    the task servers. *)
let create_plan c mj pc =
  let plan =
    { tasks = TMap.empty;
      config = pc;
      cluster = c;
    } in

  (* Validate the parameters before the transaction is started. These
     cases used to end in Division_by_zero. *)
  let n_hosts = List.length pc.task_servers in
  if n_hosts = 0 then
    failwith "Mapred_sched.create_plan: no task servers configured";
  if mj#map_tasks <= 0 then
    failwith "Mapred_sched.create_plan: number of map tasks must be positive";
  if mj#partitions <= 0 then
    failwith "Mapred_sched.create_plan: number of partitions must be positive";

  (* Create map tasks. For every host we determine the number of map
     tasks it should get. We iterate over the files and the blocklists.
     Look for sequences that are on the same host. When we have
     identified such a sequence, add it to the map task list of this
     host. If this list is already full, take another host. *)

  let n_tasks_per_host = mj#map_tasks / n_hosts + 1 in
  let servertab = Hashtbl.create 5 in
  (* all task servers *)
  List.iter
    (fun ip -> Hashtbl.replace servertab ip ())
    pc.task_servers_ip;

  let trans = Plasma_client.start c in
  let files = Plasma_client.list trans mj#input_dir in
  let files_ii =
    List.map
      (fun (file,inode) ->
         let ii = Plasma_client.get_inodeinfo trans inode in
         (file,inode,ii)
      )
      files in
  let total_blocksL =
    List.fold_left
      (fun acc (_,_,ii) -> Int64.add acc ii.blocklimit)
      0L
      files_ii in
  let n_blocks_per_taskL =
    Int64.div total_blocksL (Int64.of_int mj#map_tasks) in

  let tasks_per_host = Hashtbl.create 5 in
  (* these are full tasks only. Value is a list of map_tasks *)
  let num_tasks_per_host = Hashtbl.create 5 in
  (* length of the lists in tasks_per_host *)
  let cur_task = Hashtbl.create 5 in
  (* this is the current task: [(size, list)]. [size] in blocks.
     [list] is a list of tuples [(filename,pos,len)]. *)

  let c_map = ref 0 in
  (* Creates the map task with the next free ID for the file ranges
     [list], to be run preferably on [ip] *)
  let get_map_task ip list =
    let id = !c_map in
    incr c_map;
    { map_input =
        List.map
          (fun (filename,pos,len) ->
             (sprintf "%s/%s" mj#input_dir filename, pos, len)
          )
          list;
      map_output = sprintf "%s/mapped_%06d" mj#work_dir id;
      map_id = id;
      map_best_hosts = [ip];
    } in

  (* Registers [task] as full task of host [ip], which had [n] full
     tasks so far *)
  let add_full_task ip n task =
    let ext_task = new_xtask (`Map task) in
    plan.tasks <- TMap.add (`Map task) ext_task plan.tasks;
    let full_l = hashtbl_find_default tasks_per_host ip [] in
    Hashtbl.replace tasks_per_host ip (task :: full_l);
    Hashtbl.replace num_tasks_per_host ip (n+1) in

  List.iter
    (fun (filename,inode,ii) ->
       (* analyze in groups of 128 blocks *)
       let n_groupsL =
         Int64.succ (Int64.div (Int64.pred ii.blocklimit) 128L) in
       if n_groupsL > Int64.of_int max_int then
         failwith "Mapred_sched: input file has too many blocks";
       (* well, that limit is really high, on 32 bit systems:
          max_int = 1G - 1, so the max size is roughly 1G * blocksize,
          e.g. 1P for blocksize=1M. Also, you run out of memory way
          before that! *)
       let n_groups = Int64.to_int n_groupsL in
       for g = 0 to n_groups - 1 do
         (* Retrieve the blocklist (EXPENSIVE) *)
         let pos0 = Int64.mul (Int64.of_int g) 128L in
         let pos1 = min (Int64.add pos0 128L) ii.blocklimit in
         let lenL = Int64.sub pos1 pos0 in
         let len = Int64.to_int lenL in
         let blocks = Plasma_client.get_blocklist trans inode pos0 len in

         (* Get a ranking of the nodes: *)
         let nodes0 = ranked_hosts servertab blocks in
         (* nodes0 may be empty for two reasons:
            - all nodes dead
            - a hole in the file
            We check here only for the first. Holes will also be
            rejected by the record reader, and it is much simpler to
            find out there. *)
         if nodes0 = [] then (
           if blocks <> [] then
             failwith ("File inaccessible (no live data nodes): " ^
                         mj#input_dir ^ "/" ^ filename)
         )
         else (
           (* All ranked nodes, followed by the head of the ranking once
              more as forced fallback (is_last = true) *)
           let nodes =
             List.map (fun (p,ip) -> (p,ip,false)) nodes0 @
               [ let (p,ip) = List.hd nodes0 in (p,ip,true) ] in
           (* Try to assign the task: *)
           ( try
               (* iterate over nodes. Skip nodes when num_tasks_per_host
                  is >= n_tasks_per_host. Otherwise add the group to
                  cur_task. If cur_task is >= n_blocks_per_taskL, move
                  it to tasks_per_host. *)
               List.iter
                 (fun (_,ip,is_last) ->
                    let n = hashtbl_find_default num_tasks_per_host ip 0 in
                    if n < n_tasks_per_host || is_last then (
                      let ct_n, ct_l =
                        hashtbl_find_default cur_task ip (0L, []) in
                      let ct_n' = Int64.add ct_n lenL in
                      let ct_l' = (filename,pos0,lenL) :: ct_l in
                      if ct_n' >= n_blocks_per_taskL then (
                        Hashtbl.remove cur_task ip;
                        add_full_task ip n (get_map_task ip ct_l')
                      )
                      else
                        Hashtbl.replace cur_task ip (ct_n', ct_l');
                      (* the group is assigned: leave the iteration *)
                      raise Exit
                    )
                 )
                 nodes
             with
               | Exit -> ()
           )
         );
       done
    )
    files_ii;

  Plasma_client.commit trans;

  (* Move anything in cur_task to tasks_per_host *)
  Hashtbl.iter
    (fun ip (_, ct_l) ->
       let task = get_map_task ip ct_l in
       let n = hashtbl_find_default num_tasks_per_host ip 0 in
       add_full_task ip n task
    )
    cur_task;

  (* Finally create the map task array by iterating over tasks_per_host *)
  let n_map_tasks = !c_map in
  let dummy = get_map_task Unix.inet_addr_any [] in
  let (map_tasks : map_task array) = Array.make n_map_tasks dummy in
  Hashtbl.iter
    (fun _ l ->
       List.iter
         (fun task -> map_tasks.( task.map_id ) <- task)
         l
    )
    tasks_per_host;

  (* ----------------------- Create sort tasks ------------------------ *)

  (* One sort task per map task, depending on exactly that map task *)
  let sort_tasks =
    Array.mapi
      (fun i mt ->
         { sort_input = mt.map_output;
           sort_input_del = not pc.keep_temp_files;
           sort_output = sprintf "%s/sorted_%06d" mj#work_dir i;
           sort_id = mt.map_id
         }
      )
      map_tasks in
  Array.iteri
    (fun i st ->
       let mt = map_tasks.(i) in
       let ext_task =
         { (new_xtask (`Sort st)) with
             deps = [ TMap.find (`Map mt) plan.tasks ];
         } in
       plan.tasks <- TMap.add ext_task.task ext_task plan.tasks
    )
    sort_tasks;
  let indexed_sorts =
    Array.to_list (Array.mapi (fun i st -> (i,st)) sort_tasks) in

  (* ------------------ Create shuffle and reduce tasks ---------------- *)

  let c_shuffle = ref 0 in
  (* Returns a fresh name for a shuffle output file *)
  let mk_shuffle_out_name() =
    let z = !c_shuffle in
    incr c_shuffle;
    sprintf "%s/shuffle_%06d" mj#work_dir z in

  let shuffle_input =
    List.map
      (fun (i,st) -> (i, i, `Sort st))
      indexed_sorts in
  let pmax_total = mj # partitions-1 in
  (* Elements of the matrix are (pmin, pmax, dep_tasks): the partition
     range still to be shuffled, and the tasks providing the input as
     triples (kmin, kmax, task) *)
  let matrix = ref [ (0, pmax_total, shuffle_input) ] in
  (* rows: partitions
     columns: map tasks
   *)

  (* The file that task [t] outputs for the partition range pmin..pmax *)
  let output_file_of_task t pmin pmax =
    match t with
      | `Map _ | `Reduce _ ->
          assert false
      | `Sort st ->
          st.sort_output
      | `Shuffle st ->
          let l =
            List.filter
              (fun (_, qmin, qmax) -> qmin=pmin && qmax=pmax)
              st.shuffle_output in
          ( match l with
              | [ f,_,_ ] -> f
              | [] -> assert false
              | _ -> assert false
          ) in

  while !matrix <> [] do
    let next_matrix = ref [] in
    List.iter
      (fun (pmin,pmax,dep_tasks) ->
         match dep_tasks with
           | [] ->
               (* In the first round this is reached when no map task
                  was created at all *)
               assert false
           | _ ->
               let grouped_dep_tasks = group_by 8 dep_tasks in
               let full_output =
                 match grouped_dep_tasks with
                   | [_] -> true
                   | _ -> false in
               (* A single group is split into one part per partition,
                  otherwise into at most 4 parts *)
               let output_parts =
                 if full_output then
                   split_range (pmax-pmin+1) pmin pmax
                 else
                   split_range 4 pmin pmax in

               (* Now we want to shuffle as follows:
                  - for each group of grouped_dep_tasks we create a
                    shuffle task that reads the files of the group,
                    shuffles them, and outputs into the files designated
                    by output_parts
                *)
               let tasks =
                 List.map
                   (fun dep_task_group ->
                      let (kmin,_,_) = List.hd dep_task_group in
                      let (_,kmax,_) = List.hd (List.rev dep_task_group) in
                      let shuffle_input =
                        List.map
                          (fun (jmin,jmax,t) ->
                             (output_file_of_task t pmin pmax, jmin, jmax)
                          )
                          dep_task_group in
                      let shuffle_output =
                        List.map
                          (fun (qmin,qmax) ->
                             let file = mk_shuffle_out_name() in
                             (file, qmin, qmax)
                          )
                          output_parts in
                      let t =
                        { shuffle_input = shuffle_input;
                          shuffle_input_del = not pc.keep_temp_files;
                          shuffle_output = shuffle_output;
                          shuffle_partitions = (pmin,pmax);
                          shuffle_coverage = (kmin,kmax)
                        } in
                      (* Add this task to the plan: *)
                      let deps =
                        List.map (fun (_,_,dt) -> dt) dep_task_group in
                      let ext_task =
                        { (new_xtask (`Shuffle t)) with
                            deps =
                              List.map
                                (fun dt -> TMap.find dt plan.tasks)
                                deps;
                        } in
                      plan.tasks <- TMap.add (`Shuffle t) ext_task plan.tasks;
                      t
                   )
                   grouped_dep_tasks in

               (* What remains to do? Do this separately for every output
                  partition (qmin,qmax). If for such a partition only one
                  output file exists, we are done! Otherwise look for all
                  tasks generating output files for (qmin,qmax). These
                  are the inputs for the next round.
                *)
               let ht = Hashtbl.create 5 in   (* qmin -> producing tasks *)
               List.iter
                 (fun task ->
                    List.iter
                      (fun (_,qmin,_) ->
                         let l = hashtbl_find_default ht qmin [] in
                         Hashtbl.replace ht qmin (task :: l)
                      )
                      task.shuffle_output
                 )
                 tasks;

               List.iter
                 (fun (qmin,qmax) ->
                    let tasks_part = hashtbl_find_default ht qmin [] in
                    match tasks_part with
                      | [] ->
                          (* strange, but ignore *)
                          ()
                      | [ task ] ->
                          (* case: can trigger reduce *)
                          assert(qmin=qmax);
                          let (shuffle_out,_,_) =
                            List.find
                              (fun (_,qmin',_) -> qmin=qmin')
                              task.shuffle_output in
                          let reduce_out =
                            sprintf "%s/reduce_%04d" mj#output_dir qmin in
                          let reduce_task =
                            { reduce_input = shuffle_out;
                              reduce_input_del = not pc.keep_temp_files;
                              reduce_output = reduce_out;
                              reduce_partition = qmin
                            } in
                          (* Add this task to the plan: *)
                          let ext_task =
                            { (new_xtask (`Reduce reduce_task)) with
                                deps =
                                  [ TMap.find (`Shuffle task) plan.tasks ];
                            } in
                          plan.tasks <-
                            TMap.add (`Reduce reduce_task) ext_task plan.tasks;
                          ()
                      | _ ->
                          let dep_tasks' =
                            List.map
                              (fun t ->
                                 let (kmin,kmax) = t.shuffle_coverage in
                                 (kmin,kmax,`Shuffle t)
                              )
                              (List.rev tasks_part) in
                          next_matrix :=
                            (qmin,qmax,dep_tasks') :: !next_matrix
                 )
                 output_parts
      )
      !matrix;
    matrix := List.rev !next_matrix
  done;

  plan

(** Prints every task of the plan with [Mapred_tasks.print_task],
    followed by the IDs of the tasks it depends on (if any). *)
let print_plan plan =
  TMap.iter
    (fun _ xt ->
       Mapred_tasks.print_task xt.task 0;
       if xt.deps <> [] then (
         printf " Deps:\n";
         List.iter
           (fun dt -> Mapred_tasks.print_task_id dt.task 8)
           xt.deps
       )
    )
    plan.tasks