(* Copyright 2010 Gerd Stolpmann. This file is part of Plasma, a distributed filesystem and a map/reduce computation framework. Unless you have a written license agreement with the copyright holder (Gerd Stolpmann), the following terms apply: Plasma is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. Plasma is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Plasma. If not, see <http://www.gnu.org/licenses/>. *) (* $Id: mapred_task_sim.ml 580 2012-01-17 20:09:57Z gerd $ *) (* This module only _simulates_ task execution. It does not create any files. It does access input files, though (but only their sizes). Special job configs: sim_map_factor: it is assumed that "map" outputs this many blocks for each input block. This is a float.
Default: 1.0 *)

open Mapred_def
open Mapred_tasks
open Plasma_rpcapi_aux
open Printf
open Uq_engines.Operators

let dlogf = Plasma_util.dlogf

(** [run_stack f s] pops every element of [s], top first, and applies [f]
    to it. When it returns, [s] is empty. *)
let run_stack f s =
  while not (Stack.is_empty s) do
    f (Stack.pop s)
  done

(* Leftover snippet, not compiled (XXX marks an unfinished argument):
   let ii =
     Plasma_client.retry c "get_inodeinfo"
       (fun filename ->
          Plasma_client.with_trans c
            (fun trans -> Plasma_client.get_inodeinfo trans filename))
       XXX in
*)

(** [seq start max] is the list [[start; start+1; ...; max]], or [[]] when
    [start > max]. Tail-recursive, so long ranges do not grow the stack. *)
let seq start max =
  let rec build k acc =
    if k < start then acc else build (k-1) (k :: acc) in
  build max []

(** [ok files] is a successful task result listing [files], with empty
    counters. *)
let ok files =
  `Ok { ok_files = files; ok_counters = Mapred_stats.empty }

(* Reads the custom job parameter sim_map_factor, defaulting to 1.0 when it
   is not set. A value float_of_string cannot parse raises Failure; the
   task functions below turn that into an `Error result. *)
let sim_map_factor jc =
  try float_of_string (jc#custom "sim_map_factor")
  with Not_found -> 1.0

(* Number of output files the simulated map phase produces for [input], a
   list of (_, _, n) triples whose n values are summed as block counts.
   The simulated output size (blocks * sim_map_factor * blocksize) is
   divided by mr_sort_size, rounding up. Because Int64.div truncates toward
   zero, an empty simulated output still yields 1 file (assuming
   mr_sort_size > 0). *)
let sim_output_files me jc input =
  let fs = me # filesystem in
  let bsize = fs # blocksize jc#input_dir in
  let map_factor = sim_map_factor jc in
  let blocks =
    List.fold_left (fun acc (_,_,n) -> Int64.add acc n) 0L input in
  let sim_blocks = Int64.of_float (map_factor *. Int64.to_float blocks) in
  let sim_bytes = Int64.mul sim_blocks (Int64.of_int bsize) in
  Int64.to_int
    (Int64.div (Int64.pred sim_bytes) (Int64.of_int me#config#mr_sort_size))
  + 1

(* Describes [error] for an `Error result, appending the backtrace when
   backtrace recording is enabled. The backtrace is read first, before
   Netexn.to_string runs. This guards against the printer raising and
   handling exceptions internally, which could replace the recorded
   backtrace. It also avoids relying on OCaml's unspecified evaluation
   order of function arguments. *)
let error_message error =
  let bt =
    if Printexc.backtrace_status () then
      " - backtrace: " ^ Printexc.get_backtrace ()
    else
      "" in
  sprintf "Exception %s%s" (Netexn.to_string error) bt

(* Runs [f ()] and converts any exception it raises into `Error msg. *)
let protect_task f =
  try f ()
  with error -> `Error (error_message error)

(** Simulates a map task. Output files are named
    prefix ^ sprintf "%05d" k ^ suffix for k = 0 .. files-1, all with
    tag out; see [sim_output_files] for how many there are. No file is
    created. Exceptions become [`Error]. *)
let exec_map_task me jc mj (t:map_task) =
  protect_task
    (fun () ->
       let files = sim_output_files me jc t.map_input in
       let out_list =
         List.map
           (fun k ->
              let name =
                sprintf "%s%05d%s"
                  t.map_output_prefix k t.map_output_suffix in
              (`Tag "out", [name, 0L, 0L]))
           (seq 0 (files-1)) in
       ok out_list)

(** Simulates a sort task: reports its output file without creating it. *)
let exec_sort_task me jc mj (t:sort_task) =
  ok [`Tag "out", [t.sort_output,0L,0L]]

(** Simulates an emap task. For every simulated output file k, and every
    prepartition j (the j-th entry of [t.emap_output]), one file number
    q = k*n_prepart + j is generated. That file is tagged with the tag of
    its prepartition and named with the embedded map task's prefix and
    suffix. No file is created. Exceptions become [`Error]. *)
let exec_emap_task me jc mj (t:emap_task) =
  protect_task
    (fun () ->
       let m = t.emap_map in
       let files = sim_output_files me jc m.map_input in
       let n_prepart = List.length t.emap_output in
       (* Same for every k, so build it once. *)
       let parts = seq 0 (n_prepart-1) in
       let tagged_files =
         List.flatten
           (List.map
              (fun k ->
                 List.map2
                   (fun (tag,_,_,_) j ->
                      let q = k*n_prepart + j in
                      let name =
                        sprintf "%s%05d%s"
                          m.map_output_prefix q m.map_output_suffix in
                      (tag, [name, 0L, 0L]))
                   t.emap_output parts)
              (seq 0 (files-1))) in
       ok tagged_files)

(** Simulates a shuffle task: reports each of its output files, tagged
    out, without creating them. *)
let exec_shuffle_task me jc mj (t:shuffle_task) =
  ok
    (List.map
       (fun (file,_,_,_) -> (`Tag "out", [file,0L,0L]))
       t.shuffle_output)

(** Simulates task [t]. Logs a notice at start and end, and returns the
    simulated result of the matching exec_*_task function. [`Cleanup]
    returns an empty success here. *)
let execute (me:Mapred_def.mapred_env) (jc:Mapred_def.mapred_job_config)
            (mj:Mapred_def.mapred_job) t : task_result =
  Netlog.logf `Notice "Start task simulation %s"
    (Mapred_tasks.string_of_task_id t);
  let r =
    match t with
      | `Map t' -> exec_map_task me jc mj t'
      | `Emap t' -> exec_emap_task me jc mj t'
      | `Sort t' -> exec_sort_task me jc mj t'
      | `Shuffle t' -> exec_shuffle_task me jc mj t'
      | `Cleanup ->
          ok []  (* no cleanup here. This is done in mapred_task_server *)
  in
  Netlog.logf `Notice "End task simulation %s"
    (Mapred_tasks.string_of_task_id t);
  r