Plasma GitLab Archive
Projects Blog Knowledge

(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Plasma.  If not, see <http://www.gnu.org/licenses/>.

 *)
(* $Id: mapred_task_sim.ml 580 2012-01-17 20:09:57Z gerd $ *)

(* This module only _simulates_ task execution. It does not create
   any files.

   It accesses input files, though (but only their size)

   Special job configs:

   sim_map_factor: it is assumed that "map" outputs this many blocks
     for each input block. This is a float. Default: 1.0

 *)

open Mapred_def
open Mapred_tasks
open Plasma_rpcapi_aux
open Printf
open Uq_engines.Operators

(* Local shorthand for [Plasma_util.dlogf]. It is not referenced in the
   visible part of this module.
 *)
let dlogf = Plasma_util.dlogf

(* [run_stack f s] pops the elements of [s] one by one, most recently
   pushed first, and applies [f] to each of them until [s] is empty.
   Elements pushed onto [s] while [f] runs are processed as well. An
   exception raised by [f] propagates, leaving the remaining elements on
   the stack.
 *)
let run_stack f s =
  let rec drain () =
    if not (Stack.is_empty s) then begin
      f (Stack.pop s);
      drain ()
    end
  in
  drain ()

(*
    let ii = 
      Plasma_client.retry c "get_inodeinfo"
	(fun filename ->
	   Plasma_client.with_trans c
	     (fun trans ->
		Plasma_client.get_inodeinfo trans filename
	     )
	)
	XXX in
 *)

(* [seq start max] is the ascending list [start; start+1; ...; max]
   (both bounds inclusive), or [[]] when [start > max].

   The list is built from [max] downwards with an accumulator, so the
   function is tail-recursive and needs constant stack space. The loop
   stops exactly at [k = start], so no counter ever steps past [max] or
   below [start]; in particular [max = max_int] cannot wrap around.
 *)
let seq start max =
  if start > max then
    []
  else
    let rec build k acc =
      if k = start then
        k :: acc
      else
        build (k - 1) (k :: acc)
    in
    build max []


(* [ok files] builds the successful task result: an [`Ok] value whose
   record carries [files] as [ok_files] and [Mapred_stats.empty] as
   [ok_counters].
 *)
let ok files =
  let result =
    { ok_counters = Mapred_stats.empty;
      ok_files = files;
    } in
  `Ok result


(* Simulated map task.

   No data is read or written here. The function only computes the names
   of the output files, based on:
   - the sum of the third components of the [t.map_input] entries (used
     as a block count),
   - the blocksize the filesystem reports for [jc#input_dir],
   - the job parameter "sim_map_factor" (1.0 when [jc#custom] raises
     [Not_found]),
   - [me#config#mr_sort_size].

   Returns the [ok] result with one [`Tag "out"] entry per file name, or
   [`Error msg] if any exception is raised on the way. [mj] is not used.
 *)
let exec_map_task me jc mj (t:map_task) =
  let cleanup = Stack.create() in
  (* Runs the registered cleanup actions, ignoring their failures. Nothing
     is pushed onto [cleanup] in this function, so this is currently a
     no-op. *)
  let run_cleanup () =
    run_stack (fun action -> try action() with _ -> ()) cleanup in
  try
    let filesystem = me # filesystem in
    let block_size = filesystem # blocksize jc#input_dir in

    let factor =
      try float_of_string(jc#custom "sim_map_factor")
      with Not_found -> 1.0 in

    let input_blocks =
      List.fold_left
        (fun total (_, _, n_blocks) -> Int64.add total n_blocks)
        0L
        t.map_input in

    let output_blocks =
      Int64.of_float (factor *. Int64.to_float input_blocks) in

    (* Number of files: (output bytes - 1) / sort size + 1 *)
    let output_bytes = Int64.mul output_blocks (Int64.of_int block_size) in
    let sort_size = Int64.of_int me#config#mr_sort_size in
    let n_files =
      1 + Int64.to_int (Int64.div (Int64.pred output_bytes) sort_size) in

    let file_name k =
      sprintf "%s%05d%s" t.map_output_prefix k t.map_output_suffix in
    let out_list =
      List.map
        (fun k -> (`Tag "out", [file_name k, 0L, 0L]))
        (seq 0 (n_files - 1)) in

    run_cleanup ();
    ok out_list
  with
    | error ->
        (* The backtrace is fetched before anything else is called *)
        let backtrace =
          if Printexc.backtrace_status() then
            " - backtrace: " ^ (Printexc.get_backtrace())
          else
            "" in
        let msg =
          sprintf "Exception %s%s" (Netexn.to_string error) backtrace in
        run_cleanup ();
        `Error msg


(* Simulated sort task: nothing is sorted. The result is the [ok] value
   with a single [`Tag "out"] entry naming [t.sort_output], with both
   Int64 fields set to zero. [me], [jc] and [mj] are not used.
 *)
let exec_sort_task me jc mj (t:sort_task) =
  let sorted_file = (t.sort_output, 0L, 0L) in
  ok [ (`Tag "out", [sorted_file]) ]


(* Simulated emap task.

   The number of "simple" files is computed in the same way as for a
   simulated map task, from the block count of [t.emap_map.map_input],
   the blocksize for [jc#input_dir], the job parameter "sim_map_factor"
   (1.0 when [jc#custom] raises [Not_found]) and
   [me#config#mr_sort_size]. For every simple file index [k] and every
   entry number [j] of [t.emap_output], one output file is reported under
   the tag of that entry; its name carries the running number
   [k * n_prepart + j], where [n_prepart] is the length of
   [t.emap_output].

   Returns the [ok] result with these tagged files, or [`Error msg] if
   any exception is raised on the way. [mj] is not used.
 *)
let exec_emap_task me jc mj (t:emap_task) =
  let cleanup = Stack.create() in
  (* Runs the registered cleanup actions, ignoring their failures. Nothing
     is pushed onto [cleanup] in this function, so this is currently a
     no-op. *)
  let run_cleanup () =
    run_stack (fun action -> try action() with _ -> ()) cleanup in
  try
    let filesystem = me # filesystem in
    let block_size = filesystem # blocksize jc#input_dir in

    let factor =
      try float_of_string(jc#custom "sim_map_factor")
      with Not_found -> 1.0 in

    let input_blocks =
      List.fold_left
        (fun total (_, _, n_blocks) -> Int64.add total n_blocks)
        0L
        t.emap_map.map_input in

    let output_blocks =
      Int64.of_float (factor *. Int64.to_float input_blocks) in

    (* Number of files: (output bytes - 1) / sort size + 1 *)
    let output_bytes = Int64.mul output_blocks (Int64.of_int block_size) in
    let sort_size = Int64.of_int me#config#mr_sort_size in
    let n_files =
      1 + Int64.to_int (Int64.div (Int64.pred output_bytes) sort_size) in

    let file_indices = seq 0 (n_files - 1) in

    let n_prepart = List.length t.emap_output in
    let part_indices = seq 0 (n_prepart - 1) in

    let file_name q =
      sprintf "%s%05d%s"
        t.emap_map.map_output_prefix q
        t.emap_map.map_output_suffix in

    (* All tagged outputs belonging to the simple file number [k] *)
    let outputs_of_file k =
      List.map2
        (fun (tag, _, _, _) j ->
           (tag, [file_name (k * n_prepart + j), 0L, 0L])
        )
        t.emap_output
        part_indices in

    let tagged_files =
      List.concat (List.map outputs_of_file file_indices) in

    run_cleanup ();
    ok tagged_files
  with
    | error ->
        (* The backtrace is fetched before anything else is called *)
        let backtrace =
          if Printexc.backtrace_status() then
            " - backtrace: " ^ (Printexc.get_backtrace())
          else
            "" in
        let msg =
          sprintf "Exception %s%s" (Netexn.to_string error) backtrace in
        run_cleanup ();
        `Error msg

(* Simulated shuffle task: nothing is shuffled. Every entry of
   [t.shuffle_output] is reported as a [`Tag "out"] file under its file
   name (the first component of the entry), with both Int64 fields set to
   zero. [me], [jc] and [mj] are not used.
 *)
let exec_shuffle_task me jc mj (t:shuffle_task) =
  let as_output (file, _, _, _) =
    (`Tag "out", [file, 0L, 0L]) in
  ok (List.map as_output t.shuffle_output)


(* Entry point of the simulation: dispatches the task [t] to the matching
   simulator function and returns its result. A `Notice message is logged
   before and after the dispatch. [`Cleanup] tasks yield an empty [ok]
   result.
 *)
let execute (me:Mapred_def.mapred_env) 
            (jc:Mapred_def.mapred_job_config)
            (mj:Mapred_def.mapred_job) t
    : task_result =
  let task_label () = Mapred_tasks.string_of_task_id t in
  Netlog.logf `Notice "Start task simulation %s" (task_label ());
  let result =
    match t with
      | `Cleanup ->
          (* no cleanup here. This is done in mapred_task_server *)
          ok []
      | `Shuffle shuffle ->
          exec_shuffle_task me jc mj shuffle
      | `Sort sort ->
          exec_sort_task me jc mj sort
      | `Emap emap ->
          exec_emap_task me jc mj emap
      | `Map map ->
          exec_map_task me jc mj map
  in
  Netlog.logf `Notice "End task simulation %s" (task_label ());
  result


This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml