(*
Copyright 2010 Gerd Stolpmann
This file is part of Plasma, a distributed filesystem and a
map/reduce computation framework. Unless you have a written license
agreement with the copyright holder (Gerd Stolpmann), the following
terms apply:
Plasma is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Plasma is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Plasma. If not, see <http://www.gnu.org/licenses/>.
*)
(* $Id: mapred_sched1.ml 239 2010-06-23 16:49:03Z gerd $ *)
open Mapred_tasks
open Plasma_rpcapi_aux
open Printf
(** Static configuration of a plan. Values of this type are built by
[configure_plan]. *)
type plan_config =
{ keep_temp_files : bool;
(** When [false], [create_plan] sets the input deletion flag
([sort_input_del], [shuffle_input_del], [reduce_input_del]) of the
sort, shuffle and reduce tasks it creates. *)
task_servers : string list;
(** The task servers, as given to [configure_plan]. *)
task_servers_ip : Unix.inet_addr list;
(** The result of [Plasma_util.ip_of_host] for every element of
[task_servers], in the same order. *)
}
(** A task together with its scheduling state inside a plan. *)
type extended_task =
{ task : task;
(** The task itself. *)
deps : extended_task list;
(** The tasks that must all be complete before [executable_tasks]
reports this task. *)
mutable complete : bool;
(** Set to [true] by [mark_as_completed]. *)
mutable started : bool;
(** Set to [true] by [mark_as_started]. Started tasks are no longer
reported by [executable_tasks]. *)
}
(** [new_xtask t] wraps the task [t] into a fresh [extended_task] that has
    no dependencies and is neither started nor complete. *)
let new_xtask task =
  { task = task;
    deps = [];
    started = false;
    complete = false;
  }
(** Ordered-type wrapper around [task]; it is the key type of [TMap]. *)
module T = struct
  type t = task

  (* Tasks are ordered with the polymorphic structural comparison. The
     unqualified [compare] is used on purpose: the [Pervasives] module is
     deprecated since OCaml 4.08 and does not exist anymore in OCaml 5,
     whereas the unqualified name resolves in old and new compilers alike.
     This definition is not recursive, so the [compare] on the right-hand
     side is the one from the standard library. *)
  let compare (t1:task) (t2:task) =
    compare t1 t2
end

(** Maps whose keys are tasks. *)
module TMap = Map.Make(T)
(** An execution plan: the set of tasks with their dependencies and
scheduling state, plus the configuration and cluster it was created for. *)
type plan =
{ mutable tasks : extended_task TMap.t;
(** All tasks of the plan, keyed by the task itself. *)
config : plan_config;
(** The configuration passed to [create_plan]. *)
cluster : Plasma_client.plasma_cluster
(** The cluster passed to [create_plan]; returned by [cluster]. *)
}
(** [configure_plan ?keep_temp_files servers] builds the [plan_config] for
    the task servers listed in [servers]. Every element of [servers] is
    translated with [Plasma_util.ip_of_host]; the addresses are stored in
    the same order as the names. [keep_temp_files] defaults to [false]. *)
let configure_plan ?(keep_temp_files=false) servers =
  let addrs = List.map Plasma_util.ip_of_host servers in
  { task_servers_ip = addrs;
    task_servers = servers;
    keep_temp_files = keep_temp_files;
  }
(** [hosts plan] returns the configured task servers as a list of pairs
    [(name, ip)], in configuration order.
    @raise Invalid_argument (from [List.map2]) if the two lists of the
    configuration differ in length. *)
let hosts plan =
  let cfg = plan.config in
  let pair name addr = (name, addr) in
  List.map2 pair cfg.task_servers cfg.task_servers_ip
(** [mark_as_completed plan task] sets the [complete] flag of [task].
    @raise Failure if [task] is not part of [plan]. *)
let mark_as_completed plan task =
  let xt =
    try TMap.find task plan.tasks
    with Not_found ->
      failwith "Mapred_sched.mark_as_completed: not found" in
  xt.complete <- true
(** [mark_as_started plan task] sets the [started] flag of [task].
    @raise Failure if [task] is not part of [plan]. *)
let mark_as_started plan task =
  let xt =
    try TMap.find task plan.tasks
    with Not_found ->
      failwith "Mapred_sched.mark_as_started: not found" in
  xt.started <- true
(** [task_depends_on_list plan task] returns the tasks that [task] directly
    depends on, in the order in which they are recorded in the plan.
    @raise Failure if [task] is not part of [plan]. *)
let task_depends_on_list plan task =
  let xt =
    try TMap.find task plan.tasks
    with Not_found ->
      failwith "Mapred_sched.task_depends_on_list: not found" in
  List.map (fun dep -> dep.task) xt.deps
(** [kind_rank t] maps the kind of the task [t] to a number: map tasks get
    0, sort tasks 1, shuffle tasks 2 and reduce tasks 3. It is the primary
    sort key in [executable_tasks]. *)
let kind_rank t =
  match t with
    | `Map _     -> 0
    | `Sort _    -> 1
    | `Shuffle _ -> 2
    | `Reduce _  -> 3
(** [rank t] orders tasks of the same kind: the map id for map tasks, the
    sort id for sort tasks, and the first component of the coverage for
    shuffle tasks. Reduce tasks are independent from each other and all
    get rank 0. *)
let rank t =
  match t with
    | `Map m     -> m.map_id
    | `Sort s    -> s.sort_id
    | `Shuffle s -> fst s.shuffle_coverage
    | `Reduce _  -> 0
(** [executable_tasks plan] returns the tasks that have not been started
    yet and whose dependencies are all complete. The result is sorted in
    "natural order": first by [kind_rank], then by [rank]. *)
let executable_tasks plan =
  let deps_done xt =
    List.for_all (fun dep -> dep.complete) xt.deps in
  let ready =
    TMap.fold
      (fun t xt acc ->
         if xt.started || not (deps_done xt) then acc else t :: acc)
      plan.tasks
      [] in
  let natural_order t1 t2 =
    let by_kind = kind_rank t1 - kind_rank t2 in
    if by_kind <> 0 then by_kind else rank t1 - rank t2 in
  List.sort natural_order ready
(** [plan_finished plan] is [true] iff every task of [plan] is marked as
    complete (in particular for a plan without tasks). *)
let plan_finished plan =
  TMap.for_all (fun _ xt -> xt.complete) plan.tasks
(** [cluster plan] returns the cluster the plan was created for. *)
let cluster { cluster = cl; tasks = _; config = _ } =
  cl
(** [group_by n l] splits [l] into consecutive groups of [n] elements. For
    the result [l'] we have [l = List.flatten l'], and every element of
    [l'] has exactly [n] elements, except the last which may have fewer.
    The empty list yields the empty list of groups.

    Example:
    {[
      # group_by 3 [0;1;2;3;4;5;6;7;8;9;10];;
      - : int list list = [[0; 1; 2]; [3; 4; 5]; [6; 7; 8]; [9; 10]]
    ]}

    @raise Invalid_argument if [n <= 0]. A group size of 0 can never
    consume the input and would otherwise recurse forever. *)
let group_by n l =
  if n <= 0 then
    invalid_arg "Mapred_sched.group_by: n must be positive";
  (* [take acc m rest] moves up to [m] elements from [rest] to the group
     under construction and returns the finished group with the remainder *)
  let rec take acc m rest =
    match rest with
      | x :: rest' when m > 0 -> take (x :: acc) (m-1) rest'
      | _ -> (List.rev acc, rest) in
  (* Tail-recursive over the groups, so that long inputs cannot exhaust
     the stack. As [n >= 1] every round consumes at least one element. *)
  let rec loop groups rest =
    match rest with
      | [] -> List.rev groups
      | _ ->
          let (group, rest') = take [] n rest in
          loop (group :: groups) rest' in
  loop [] l
(** [split_range n pmin pmax] splits the range of numbers [pmin...pmax]
    (both inclusive) into at most [n] consecutive groups
    [(g1min, g1max), (g2min, g2max), ...] so that [g(k)max + 1 = g(k+1)min],
    the first group starts at [pmin], the last group ends at [pmax], and
    the groups have approximately the same size (the sizes differ by at
    most one, the bigger groups come first). If the range has fewer than
    [n] elements, one group per element is returned.

    Example:
    {[
      # split_range 4 0 10;;
      - : (int * int) list = [(0, 2); (3, 5); (6, 8); (9, 10)]
    ]}

    The empty list is returned when [n <= 0] or when the range is empty
    ([pmax < pmin]). *)
let split_range n pmin pmax =
  let r = pmax - pmin + 1 in       (* number of elements in total *)
  let n = min n r in               (* never more groups than elements *)
  if n <= 0 then
    (* No group requested, or nothing to split. Checking this first also
       avoids the division by zero below. *)
    []
  else (
    let gsize = r / n in
    let gmod = r mod n in
    (* The first [gmod] groups get one extra element *)
    let rec build k p acc =
      if k >= n then
        List.rev acc
      else (
        let size = gsize + (if k < gmod then 1 else 0) in
        build (k+1) (p+size) ((p, p+size-1) :: acc)
      ) in
    build 0 pmin []
  )
(** [ranked_hosts servertab blocks] returns the hosts storing [blocks] as
    list of pairs [(p,ip)] where [ip] is the IP address of the host and [p]
    is its share (range 0 to 1) of the counted blocks.

    Only blocks whose node is alive and whose address is a key of
    [servertab] are counted. The list is sorted by descending share, i.e.
    the host storing most of the blocks comes first, so that a caller
    picking the first suitable entry prefers the host with the best data
    locality. The list is empty when no block could be counted.

    @raise Failure if the host part of a node name is not an IP address. *)
let ranked_hosts servertab blocks =
  let counts = Hashtbl.create 5 in    (* ip -> number of counted blocks *)
  let total = ref 0 in                (* number of counted blocks overall *)
  List.iter
    (fun b ->
       if b.node_alive then (
         let (h,_) = Plasma_util.parse_host_port b.node in
         let ip =
           (* [Unix.inet_addr_of_string] signals a malformed address with
              [Failure]; only this exception is translated *)
           try Unix.inet_addr_of_string h
           with Failure _ ->
             failwith ("Mapred_sched.ranked_hosts: not an IP address: " ^ h) in
         if Hashtbl.mem servertab ip then (
           let k = try Hashtbl.find counts ip with Not_found -> 0 in
           Hashtbl.replace counts ip (k+1);
           incr total
         )
       )
    )
    blocks;
  (* [!total] is positive whenever [counts] has at least one entry, so the
     division is only done with a non-zero divisor *)
  let shares =
    Hashtbl.fold
      (fun ip k acc -> (float k /. float !total, ip) :: acc)
      counts
      [] in
  (* Best host first: compare the shares in reverse order *)
  List.sort
    (fun (p1,_) (p2,_) -> compare p2 p1)
    shares
(*
let best_hosts c plan task =
(* fake impl: only look at the first 128 blocks *)
XXX
*)
(** [create_plan c mj pc] builds the complete plan for the job [mj] on the
cluster [c], using the task servers configured in [pc].

The job object [mj] is queried for [map_tasks], [input_dir], [work_dir],
[output_dir] and [partitions]. The plan is built in three stages:

- Map tasks: the files of the input directory are listed within one
transaction, and every file is scanned in groups of 128 blocks. Each
group is assigned to one task server (see [ranked_hosts] and the
comments below). The groups collected for a server are turned into a
map task once they cover at least total blocks / [map_tasks] blocks;
whatever is left over at the end becomes one more map task per server.

- Sort tasks: one sort task per map task, depending on that map task.

- Shuffle and reduce tasks: shuffle tasks are created in rounds. In
every round the inputs of a partition range are grouped by 8, and one
shuffle task is created per group. If there is only one group, the
shuffle task writes one output file per partition, and a reduce task
depending on it is created for each partition. Otherwise the partition
range is split into up to 4 subranges, and the shuffle tasks producing
a subrange are the inputs of the next round for this subrange.

Raises [Failure] when an input file has more groups of 128 blocks than
[max_int], or when the blocklist of a group is not empty but
[ranked_hosts] finds no usable node for it.

NOTE(review): [mj#map_tasks / n_hosts] and the [Int64.div] by
[mj#map_tasks] raise [Division_by_zero] when [pc.task_servers] is empty
or [mj#map_tasks] is 0, respectively - confirm that callers exclude this.

NOTE(review): when an exception is raised between [Plasma_client.start]
and [Plasma_client.commit], nothing in this function finishes the
transaction - confirm whether this needs cleanup.
*)
let create_plan c mj pc =
let plan =
{ tasks = TMap.empty;
config = pc;
cluster = c;
} in
(** Create map tasks. For every host we determine the number of map
tasks it should get.
We iterate over the files and the blocklists. Look for sequences
that are on the same host. When we have identified such a sequence,
add it to the map task list of this host. If this list is already
full, take another host randomly.
*)
let n_hosts = List.length pc.task_servers in
let n_tasks_per_host = mj#map_tasks / n_hosts + 1 in
let servertab = Hashtbl.create 5 in (** all task servers *)
List.iter
(fun ip -> Hashtbl.replace servertab ip ())
pc.task_servers_ip;
let trans = Plasma_client.start c in
let files = Plasma_client.list trans mj#input_dir in
let files_ii =
List.map
(fun (file,inode) ->
let ii = Plasma_client.get_inodeinfo trans inode in
(file,inode,ii)
)
files in
(* Sum of the block limits of all input files *)
let total_blocksL =
List.fold_left
(fun acc (_,_,ii) ->
Int64.add acc ii.blocklimit
)
0L
files_ii in
(* A collected task is closed as soon as it covers this many blocks *)
let n_blocks_per_taskL =
Int64.div total_blocksL (Int64.of_int mj#map_tasks) in
let tasks_per_host = Hashtbl.create 5 in
(** these are "full" tasks only. Value is a list of map_tasks *)
let num_tasks_per_host = Hashtbl.create 5 in
(** length of the lists in tasks_per_host *)
let cur_task = Hashtbl.create 5 in
(** this is the current task: [(size, list)]. [size] in blocks.
[list] is a list of tuples [(filename,pos,len)].
*)
(* Counter for the map task ids; every call of get_map_task takes one *)
let c_map = ref 0 in
let get_map_task ip list =
let id = !c_map in
incr c_map;
{ map_input = (List.map
(fun (filename,pos,len) ->
sprintf "%s/%s" mj#input_dir filename, pos, len
)
list);
map_output = sprintf "%s/mapped_%06d" mj#work_dir id;
map_id = id;
map_best_hosts = [ip];
} in
List.iter
(fun (filename,inode,ii) ->
(** analyze in groups of 128 blocks *)
let n_groupsL =
Int64.succ (Int64.div (Int64.pred ii.blocklimit) 128L) in
if n_groupsL > Int64.of_int max_int then
failwith "Mapred_sched: input file has too many blocks";
(* well, that limit is really high, on 32 bit systems:
max_int = 1G - 1, so the max size is roughly 1G * blocksize,
e.g. 1P for blocksize=1M. Also, you run out of memory way
before that!
*)
let n_groups = Int64.to_int n_groupsL in
for g = 0 to n_groups - 1 do
(** Retrieve the blocklist (EXPENSIVE) *)
let pos0 = Int64.mul (Int64.of_int g) 128L in
let pos1 = min (Int64.add pos0 128L) ii.blocklimit in
let lenL = Int64.sub pos1 pos0 in
let len = Int64.to_int lenL in
let blocks = Plasma_client.get_blocklist trans inode pos0 len in
(** Get a ranking of the nodes: *)
let nodes0 = ranked_hosts servertab blocks in
(** nodes0 may be empty for two reasons:
- all nodes dead
- a hole in the file
We check here only for the first. Holes will also be rejected
by the record reader, and it is much simpler to find out there.
*)
(* NOTE(review): ranked_hosts also skips nodes that are not in
servertab, so nodes0 is presumably also empty when the live nodes
are no task servers - confirm whether the message below fits. *)
if nodes0 = [] then (
if blocks <> [] then
failwith ("File inaccessible (no live data nodes): " ^
mj#input_dir ^ "/" ^ filename)
)
else (
(* The head of nodes0 is appended once more with the flag set to true,
so that it takes the group when all nodes are already at their quota *)
let nodes =
List.map (fun (p,ip) -> (p,ip,false)) nodes0 @
[ let (p,ip) = List.hd nodes0 in (p,ip,true) ] in
(** Try to assign the task: *)
( try
(** iterate over nodes. Skip nodes when num_tasks_per_host is
>= n_tasks_per_hosts. Otherwise add the task to cur_task.
If cur_task is >= blocks_per_task, move it to tasks_per_host.
*)
List.iter
(fun (p,ip,is_last) ->
let n =
try Hashtbl.find num_tasks_per_host ip
with Not_found -> 0 in
if n < n_tasks_per_host || is_last then (
let ct_n, ct_l =
try Hashtbl.find cur_task ip
with Not_found -> (0L, []) in
let ct_n' = Int64.add ct_n lenL in
let ct_l' = (filename,pos0,lenL) :: ct_l in
if ct_n' >= n_blocks_per_taskL then (
Hashtbl.remove cur_task ip;
let task = get_map_task ip ct_l' in
let ext_task = new_xtask (`Map task) in
plan.tasks <- TMap.add (`Map task) ext_task plan.tasks;
let full_l =
try Hashtbl.find tasks_per_host ip
with Not_found -> [] in
Hashtbl.replace tasks_per_host ip (task :: full_l);
Hashtbl.replace num_tasks_per_host ip (n+1)
)
else
Hashtbl.replace cur_task ip (ct_n', ct_l');
(* The group is assigned: leave the iteration over the nodes *)
raise Exit
)
)
nodes
with
| Exit -> ()
)
);
done
)
files_ii;
Plasma_client.commit trans;
(* Move anything in cur_task to tasks_per_host *)
Hashtbl.iter
(fun ip (ct_n, ct_l) ->
let task = get_map_task ip ct_l in
let ext_task = new_xtask (`Map task) in
plan.tasks <- TMap.add (`Map task) ext_task plan.tasks;
let n =
try Hashtbl.find num_tasks_per_host ip
with Not_found -> 0 in
let full_l =
try Hashtbl.find tasks_per_host ip
with Not_found -> [] in
Hashtbl.replace tasks_per_host ip (task :: full_l);
Hashtbl.replace num_tasks_per_host ip (n + 1);
)
cur_task;
(* Finally create the map task array by iterating over tasks_per_host *)
(* The number of map tasks is read before the dummy is created, because
creating the dummy takes one more id from c_map. The dummy is only the
initial array element and is not added to the plan. *)
let n_map_tasks = !c_map in
let dummy = get_map_task Unix.inet_addr_any [] in
let (map_tasks : map_task array) =
Array.make n_map_tasks dummy in
Hashtbl.iter
(fun ip l ->
List.iter
(fun task ->
map_tasks.( task.map_id ) <- task;
)
l
)
tasks_per_host;
(* ----------------------- Create sort tasks ------------------------ *)
let sort_tasks =
Array.mapi
(fun i mt ->
{ sort_input = mt.map_output;
sort_input_del = not pc.keep_temp_files;
sort_output = sprintf "%s/sorted_%06d" mj#work_dir i;
sort_id = mt.map_id
}
)
map_tasks in
(* Every sort task depends on the map task with the same array index *)
Array.iteri
(fun i st ->
let mt = map_tasks.(i) in
let ext_task =
{ (new_xtask (`Sort st)) with
deps = [ TMap.find (`Map mt) plan.tasks ];
} in
plan.tasks <- TMap.add ext_task.task ext_task plan.tasks
)
sort_tasks;
let indexed_sorts =
Array.to_list (Array.mapi (fun i st -> (i,st)) sort_tasks) in
(* ------------------ Create shuffle and reduce tasks ---------------- *)
(* Counter for the names of the shuffle output files *)
let c_shuffle = ref 0 in
let mk_shuffle_out_name() =
let z = !c_shuffle in
incr c_shuffle;
sprintf "%s/shuffle_%06d" mj#work_dir z in
(* Inputs of the first round: triples (kmin, kmax, task) where kmin and
kmax are both the index of the sort task *)
let shuffle_input =
List.map
(fun (i,st) ->
(i, i, `Sort st)
)
indexed_sorts in
let pmax_total = mj # partitions-1 in
(* The work list of the current round: triples (pmin, pmax, inputs) *)
let matrix = ref [ (0, pmax_total, shuffle_input) ] in
(* rows: partitions
columns: map tasks
*)
(* The file a task provides for the partition range pmin..pmax: the
single output of a sort task, or the output file of a shuffle task
that is registered for exactly this range *)
let output_file_of_task t pmin pmax =
match t with
| `Map _ | `Reduce _ ->
assert false
| `Sort st ->
st.sort_output
| `Shuffle st ->
let l =
List.filter
(fun (f, qmin, qmax) -> qmin=pmin && qmax=pmax)
st.shuffle_output in
match l with
| [ f,_,_ ] -> f
| [] ->
assert false
| _ ->
assert false
in
(* One iteration per round, until no partition range is left over *)
while !matrix <> [] do
let next_matrix = ref [] in
List.iter
(fun (pmin,pmax,dep_tasks) ->
match dep_tasks with
| [] ->
assert false
| _ ->
let grouped_dep_tasks = group_by 8 dep_tasks in
(* With a single group this is the last round for this range, and
the output is split into single partitions *)
let full_output =
match grouped_dep_tasks with [_] -> true | _ -> false in
let output_parts =
if full_output then
split_range (pmax-pmin+1) pmin pmax
else
split_range 4 pmin pmax in
(* Now we want to shuffle as follows:
- for each group of grouped_dep_tasks we create
a shuffle task that reads the files of the group,
shuffles them, and outputs into the files designated
by output_parts
*)
let tasks =
List.map
(fun dep_task_group ->
let (kmin,_,_) = List.hd dep_task_group in
let (_,kmax,_) = List.hd (List.rev dep_task_group) in
let shuffle_input =
List.map
(fun (jmin,jmax,t) ->
(output_file_of_task t pmin pmax, jmin, jmax)
)
dep_task_group in
let shuffle_output =
List.map
(fun (qmin,qmax) ->
let file =
mk_shuffle_out_name() in
(file, qmin, qmax)
)
output_parts in
let t =
{ shuffle_input = shuffle_input;
shuffle_input_del = not pc.keep_temp_files;
shuffle_output = shuffle_output;
shuffle_partitions = (pmin,pmax);
shuffle_coverage = (kmin,kmax)
} in
(* Add this task to the plan: *)
let deps =
List.map (fun (_,_,dt) -> dt) dep_task_group in
let ext_task =
{ (new_xtask (`Shuffle t)) with
deps =
List.map (fun dt -> TMap.find dt plan.tasks) deps;
} in
plan.tasks <- TMap.add (`Shuffle t) ext_task plan.tasks;
t
)
grouped_dep_tasks in
(* What remains to do? Do this separately for every
output partition (qmin,qmax). If for such a partition
only one output file exists, we are done! Otherwise
look for all tasks generating output files for (qmin,qmax).
These are the inputs for the next round.
*)
(* ht maps qmin to the shuffle tasks writing the range starting at qmin *)
let ht = Hashtbl.create 5 in
List.iter
(fun task ->
List.iter
(fun (file,qmin,qmax) ->
let l =
try Hashtbl.find ht qmin with Not_found -> [] in
Hashtbl.replace ht qmin (task :: l)
)
task.shuffle_output
)
tasks;
List.iter
(fun (qmin,qmax) ->
let tasks_part =
try Hashtbl.find ht qmin with Not_found -> [] in
match tasks_part with
| [] ->
(* strange, but ignore *)
()
| [ task ] ->
(* case: can trigger reduce *)
assert(qmin=qmax);
let (shuffle_out,_,_) =
List.find
(fun (_,qmin',_) -> qmin=qmin')
task.shuffle_output in
let reduce_out =
sprintf "%s/reduce_%04d" mj#output_dir qmin in
let reduce_task =
{ reduce_input = shuffle_out;
reduce_input_del = not pc.keep_temp_files;
reduce_output = reduce_out;
reduce_partition = qmin
} in
(* Add this task to the plan: *)
let ext_task =
{ (new_xtask (`Reduce reduce_task)) with
deps = [TMap.find (`Shuffle task) plan.tasks];
} in
plan.tasks <-
TMap.add (`Reduce reduce_task) ext_task plan.tasks;
()
| _ ->
(* Several tasks write this range: they become the inputs of the
next round. The list was built by prepending, so it is reversed
to restore the order of creation. *)
let dep_tasks' =
List.map
(fun t ->
let (kmin,kmax) = t.shuffle_coverage in
(kmin,kmax,`Shuffle t)
)
(List.rev tasks_part)
in
next_matrix :=
(qmin,qmax,dep_tasks') :: !next_matrix
)
output_parts
)
!matrix;
matrix := List.rev !next_matrix
done;
plan
(** [print_plan plan] prints every task of [plan] with
    [Mapred_tasks.print_task], in the key order of the task map. For a
    task with dependencies, a "Deps:" line follows, and then the ids of
    the dependencies as printed by [Mapred_tasks.print_task_id]. *)
let print_plan plan =
  let print_dep dep =
    Mapred_tasks.print_task_id dep.task 8 in
  let print_entry _ xt =
    Mapred_tasks.print_task xt.task 0;
    ( match xt.deps with
        | [] ->
            ()
        | deps ->
            printf " Deps:\n";
            List.iter print_dep deps
    ) in
  TMap.iter print_entry plan.tasks