Plasma GitLab Archive
Projects Blog Knowledge

(* Invoke the sorter directly *)

open Printf

(* [with_finally ~finally f] runs [f ()] and then [finally ()], returning
   the result of [f ()].  [finally] also runs when [f] raises.  In that
   case an exception raised by [finally] itself is ignored, so that the
   original exception is the one that reaches the caller.  (Hand-rolled
   equivalent of [Fun.protect], which requires OCaml >= 4.08.) *)
let with_finally ~finally f =
  let result =
    try f ()
    with error ->
      (try finally () with _ -> ());
      raise error in
  finally ();
  result

(* Sort the file [infile] line by line and write the result to [outfile].

   - [infile_eof]: end position of the data to read.  The number of
     blocks handed to the reader is ceil(infile_eof / blocksize), and 0
     when [infile_eof = 0L].  (Presumably the file size in bytes; the
     entry point below passes [fs # size].)
   - [conf]: supplies [mr_buffer_size] and [mr_buffer_size_tight] for the
     record reader and writer.
   - [me]: only [me#filesystem] is read here; [me] is also passed on to
     the sorter.

   Keys are extracted with [Mapred_split.tab_split_key_range].  The sorter
   is built with [Mapred_sorters.Hex_asc] as hash and comparison.
   [outfile] is created with replication factor 1.

   The sorter, the reader and the writer are released even if an
   exception occurs; the exception is then re-raised.  On such an error
   the writer is still closed, so [outfile] may contain partial output. *)
let sort infile infile_eof outfile conf me =
  let mjc =
    Mapred_job_config.mapred_job_config
      (Mapred_job_config.test_job_config()) in
  (* Record-I/O settings: block size from the test job config, buffer
     sizes from [conf]. *)
  let rc =
    ( object
        method bigblock_size = mjc#bigblock_size
        method mr_buffer_size = conf # mr_buffer_size
        method mr_buffer_size_tight = conf # mr_buffer_size_tight
      end
    ) in
  let sorter =
(*     Mapred_sorters.null_sorter *)
    Mapred_sorters.generic_sorter
      ~hash:(Mapred_sorters.Hex_asc.hash)
      ~cmp:(Mapred_sorters.Hex_asc.cmp)
      me mjc 1.0 in
  with_finally
    ~finally:(fun () -> sorter # close())
    (fun () ->
       sorter#set_key_extraction Mapred_split.tab_split_key_range;
       let fs = me#filesystem in
       let bsize = fs#blocksize infile in
       let bsizeL = Int64.of_int bsize in
       let shm_mng = new Plasma_shm.null_shm_manager() in

       (* Number of blocks covering [infile_eof], rounded up. *)
       let n_blocks =
         if infile_eof = 0L then
           0L
         else
           Int64.succ (Int64.div (Int64.pred infile_eof) bsizeL) in
       let reader =
         (Mapred_io.line_structured_format())
           # read_file [] fs shm_mng rc infile 0L n_blocks in
       printf "Reading...\n%!";
       with_finally
         ~finally:(fun () -> reader # close_in())
         (fun () ->
            let q = Queue.create() in
            (* Feed every batch of records to the sorter.  The loop ends
               when the reader raises End_of_file. *)
            try
              while true do
                reader # input_records q;
                sorter # put_records q
              done
            with End_of_file -> ());

       Mapred_io.create_file ~repl:1 fs outfile;
       let writer =
         (Mapred_io.line_structured_format())
           # write_file fs shm_mng rc outfile in

       printf "Sorting+writing...\n%!";
       with_finally
         ~finally:(fun () -> writer # close_out())
         (fun () ->
            sorter # sort writer;
            printf "Sort time: %.2f\n%!" sorter#sort_time))


(* Command-line entry point.

   Parses the options and prints the input/output names.  It then sets up
   logging (debug level with -debug) and creates the filesystem, with no
   PlasmaFS client configuration when -disable-cluster is given.  Finally
   it sorts [-input] into [-output] via [sort].  -namenode may be repeated;
   every occurrence is collected.

   Any exception raised after option parsing is reported on stderr, and
   the program then exits with status 1 so that callers can detect the
   failure.  Usage errors are handled by [Arg.parse] itself. *)
let () =
  let infile = ref "/sort_in" in
  let outfile = ref "/sort_out" in

  let clustername = ref None in
  let nn_nodes = ref [] in
  let disable_cluster = ref false in
  let debug = ref false in
  Arg.parse
    [ "-namenode", Arg.String (fun s -> nn_nodes := s :: !nn_nodes),
      "<host>:<port>    Access this namenode";

      "-cluster", Arg.String (fun s -> clustername := Some s),
      "<name>     name of the cluster";

      "-disable-cluster", Arg.Set disable_cluster,
      "     Disable PlasmaFS entirely";

      "-input", Arg.Set_string infile,
      "<file>   PlasmaFS input file";

      "-output", Arg.Set_string outfile,
      "<file>   PlasmaFS output file";

      "-debug", Arg.Set debug,
      "      run with debugging enabled"
    ]
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)))
    "usage: mr_io <options>";

  printf "Input: %s\n" !infile;
  printf "Output: %s\n%!" !outfile;

  Netlog.current_logger :=
    Netlog.channel_logger stderr (if !debug then `Debug else `Info);
  Plasma_util.debug := !debug;

  let client_config =
    if !disable_cluster then
      None
    else
      Some
        (Plasma_client_config.get_config
           ?clustername:!clustername
           ~nn_nodes:!nn_nodes ()
        ) in
  let fs =
    Mapred_fs.standard_fs_cc ?client_config () in
  let shm_mng = new Plasma_shm.null_shm_manager() in

  try
    let length = fs # size [] !infile in
    (* NOTE: Int64.to_int wraps silently if [length] exceeds max_int,
       which can only happen on 32-bit platforms. *)
    let sort_size = Int64.to_int length in

    let conf =
      Mapred_config.test_config
        ~clustername:"foo" ~nodes:[] ~sort_size () in

    (* Minimal stand-in for the task environment expected by [sort]. *)
    let me =
      ( object
          method filesystem = fs
          method shm_manager = shm_mng
          method config = conf
          (* Presumably never called on this code path; raises
             Assert_failure if it is. *)
          method config_file = assert false
          method auth_ticket = ""
          method command_name = "foo"
          (* Ignores its argument. *)
          method accumulate _ = ()
        end
      ) in

    sort !infile length !outfile conf me

  with
    | error ->
        eprintf "Exception %s\n%!" (Netexn.to_string error);
        (* Report the failure through the exit status instead of
           terminating normally with status 0. *)
        exit 1


This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml