Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: mr_io.ml 274 2010-10-24 15:04:09Z gerd $ *)

(* Testing Mapred_io.read_file *)

(* We create test files:

   F1: empty file

   F2: fill the file with 26 lines. Each line has a certain length:
      F2.1: bigblock/2 - 2
      F2.2: bigblock/2 - 1
      F2.3: bigblock/2
      F2.4: bigblock - 2
      F2.5: bigblock - 1
      F2.6: bigblock

      (line lengths include the final LF)

   F3: fill the file with a first line of specific length, followed by
       26 lines with a different length:
      F3.1: 1 and bigblock/2 - 3
      F3.2: 2 and bigblock/2 - 3 
      F3.3: 3 and bigblock/2 - 3
      F3.4: 11 and bigblock/2 - 3
      F3.5: 12 and bigblock/2 - 3
      F3.6: 13 and bigblock/2 - 3

   Requested region:

   R1: request nothing
   R2: request complete file
   R3: request all whole bigblocks (but not the trailing partial bigblock)
   R4: request n-1 whole bigblocks, where n is the number of whole
       bigblocks in the file (nothing if n=0)

   We always use bigblock=2*block.

   Tests are repeated with various buffer sizes (which should not affect
   the outcome).

   Expected outcome: The test checks that a certain number of lines N are
   read, and that these lines contain the right contents.

   R1 + F1: N=0
   R1 + F2, F3: N=1

   R2 + F1: N=0
   R2 + F2: N=26
   R2 + F3: N=27

   R3 + F1: N=0  (actually nonsense - no whole bigblocks)
   R3 + F2.1, F2.2: N=25
   R3 + F2.3, F2.4, F2.5, F2.6: N=26
   R3 + F3: N=26

   R4 + F1: N=0  (actually nonsense - no whole bigblocks)
   R4 + F2.1, F2.2: N=23
   R4 + F2.3: N=25
   R4 + F2.4, F2.5: N=25
   R4 + F2.6: N=26
   R4 + F3: N=24
 *)

open Printf
open Plasma_rpcapi_aux


(* All file cases, in the order in which they are created and tested:
   the empty file F1, then the six F2 variants, then the six F3 variants. *)
let f_cases =
  let f2_cases = [ "F2.1"; "F2.2"; "F2.3"; "F2.4"; "F2.5"; "F2.6" ] in
  let f3_cases = [ "F3.1"; "F3.2"; "F3.3"; "F3.4"; "F3.5"; "F3.6" ] in
  "F1" :: (f2_cases @ f3_cases)

(* All request cases R1..R4, in test order. *)
let r_cases =
  List.map (fun i -> "R" ^ string_of_int i) [ 1; 2; 3; 4 ]
   


(* Replace the file [name] by a regular file (mode 0o666) containing [s].
   An existing file of that name is unlinked first; a missing file
   ([`enoent]) is not an error. Unlink and create happen in one
   transaction, which is committed before any data is written. If one of
   these metadata steps raises, the transaction is aborted before the
   exception is re-raised, so it is not left open.
   The data is then written in a loop that advances by the byte count
   [Plasma_client.write] returns, until all of [s] is written. Finally the
   inode is flushed. *)
let write_file cluster name s =
  let trans = Plasma_client.start cluster in
  let inode =
    try
      ( try Plasma_client.unlink trans name
        with Plasma_client.Plasma_error `enoent -> ()
      );
      let ii = Plasma_client.regular_ii cluster 0o666 in
      let inode = Plasma_client.create_file trans name ii in
      Plasma_client.commit trans;
      inode
    with
      | error ->
          (* Best-effort cleanup: the original error is the one that gets
             reported, so a failure of the abort itself is ignored. *)
          ( try Plasma_client.abort trans with _ -> () );
          raise error in
  let rec wrloop p =
    let l = String.length s - p in
    if l > 0 then (
      let n =
	Plasma_client.write cluster inode (Int64.of_int p) (`String s) p l in
      wrloop (p+n)
    ) in
  wrloop 0;
  (* NOTE(review): 0L 0L presumably selects the whole file - confirm
     against the Plasma_client.flush documentation. *)
  Plasma_client.flush cluster inode 0L 0L


(* Build line number [k] (k >= 0) of a test file. It consists of [l - 1]
   copies of the character with code 64+k ('@' for k=0, 'A' for k=1, ...)
   followed by LF, so the result is exactly [l] bytes long, including the
   LF. [l] must be positive. *)
let create_line k l =
  assert (l > 0);
  let fill = Char.chr (64 + k) in
  let body = String.make (pred l) fill in
  body ^ "\n"


(* Verify that every character of [line] is the fill character used by
   [create_line] for line number [k] (code 64+k). A trailing LF would not
   match, so [line] must be passed without it. An empty line is accepted.
   Raises [Failure "bad line"] at the first mismatching character. *)
let check_line k line =
  let expected = Char.chr (64 + k) in
  String.iter (fun ch -> if ch <> expected then failwith "bad line") line


(* Absolute Plasma path of the test file for case [fcase]. *)
let name_fcase fcase =
  String.concat "" [ "/mr_io_"; fcase ]

(* Create (or overwrite) the test file for [fcase]. [bigsize] is twice
   the cluster block size.
   - F1 is empty.
   - F2.x has 26 lines (numbered 0..25), all of one length.
   - F3.x has one line of a specific length (number 0), followed by 26
     lines (numbered 1..26) of length bigsize/2 - 3.
   Line lengths include the LF. The contents are built in memory and then
   stored with [write_file].
   Raises [Failure "unknown fcase"] for an unrecognised case name, before
   anything is written. *)
let create_fcase cluster fcase =
  let bigsize = 2 * Plasma_client.blocksize cluster in
  let contents = Buffer.create 500 in
  let add_line k len = Buffer.add_string contents (create_line k len) in
  (* 26 lines of length [len], numbered 0..25 *)
  let uniform len =
    for k = 0 to 25 do add_line k len done in
  (* one line of length [first], then 26 lines of length [rest]
     numbered 1..26 *)
  let headed first rest =
    add_line 0 first;
    for k = 1 to 26 do add_line k rest done in
  ( match fcase with
      | "F1" -> ()
      | "F2.1" -> uniform (bigsize/2 - 2)
      | "F2.2" -> uniform (bigsize/2 - 1)
      | "F2.3" -> uniform (bigsize/2)
      | "F2.4" -> uniform (bigsize - 2)
      | "F2.5" -> uniform (bigsize - 1)
      | "F2.6" -> uniform bigsize
      | "F3.1" -> headed 1 (bigsize/2 - 3)
      | "F3.2" -> headed 2 (bigsize/2 - 3)
      | "F3.3" -> headed 3 (bigsize/2 - 3)
      | "F3.4" -> headed 11 (bigsize/2 - 3)
      | "F3.5" -> headed 12 (bigsize/2 - 3)
      | "F3.6" -> headed 13 (bigsize/2 - 3)
      | _ -> failwith "unknown fcase"
  );
  write_file cluster (name_fcase fcase) (Buffer.contents contents)


(* Compute the length argument that [run_test] passes to
   [Mapred_io.read_file] for request case [rcase] on the file [name].
   With [bigsize] = 2 * block size:
     R1 -> 0
     R2 -> the file's [blocklimit]
     R3 -> 2 * (eof / bigsize), i.e. two per whole bigblock
     R4 -> the R3 value minus 2 (one bigblock less), but never negative
   NOTE(review): the result appears to be counted in blocks - confirm
   against the Mapred_io.read_file documentation.
   The inode info is read in a transaction that is always aborted,
   including when the lookup or the inode-info fetch raises, so no
   transaction is leaked.
   Raises [Failure "unknown rcase"] for an unrecognised case name. *)
let request_length cluster name rcase =
  let trans = Plasma_client.start cluster in
  let ii =
    try
      Plasma_client.get_inodeinfo trans
        (Plasma_client.lookup trans name false)
    with
      | error ->
          (* Release the transaction before propagating the original
             error; a failure of the abort itself is ignored here. *)
          ( try Plasma_client.abort trans with _ -> () );
          raise error in
  Plasma_client.abort trans;
  let bsize = Plasma_client.blocksize cluster in
  let bigsize = Int64.of_int (2*bsize) in
  let whole_bigblocks = Int64.div ii.eof bigsize in
  match rcase with
    | "R1" -> 0L
    | "R2" -> ii.blocklimit
    | "R3" -> Int64.mul 2L whole_bigblocks
    | "R4" -> max (Int64.mul 2L (Int64.pred whole_bigblocks)) 0L
    | _ -> failwith "unknown rcase"


(* Number of records [run_test] must read for file case [fcase] under
   request case [rcase].
   - The empty file F1 yields 0 for every known request.
   - R1 (zero-length request) yields 1 for any other file name; the file
     name is not validated in that case.
   - For R2..R4, an unknown file case raises [Failure "unknown fcase"].
   - An unknown request case raises [Failure "unknown rcase"], whatever
     the file case. *)
let expected_num_lines fcase rcase =
  match rcase, fcase with
    | ("R1" | "R2" | "R3" | "R4"), "F1" -> 0
    | "R1", _ -> 1
    | "R2", ("F2.1" | "F2.2" | "F2.3" | "F2.4" | "F2.5" | "F2.6") -> 26
    | "R2", ("F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6") -> 27
    | "R3", ("F2.1" | "F2.2") -> 25
    | "R3", ("F2.3" | "F2.4" | "F2.5" | "F2.6") -> 26
    | "R3", ("F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6") -> 26
    | "R4", ("F2.1" | "F2.2") -> 23
    | "R4", ("F2.3" | "F2.4" | "F2.5") -> 25
    | "R4", "F2.6" -> 26
    | "R4", ("F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6") -> 24
    | ("R2" | "R3" | "R4"), _ -> failwith "unknown fcase"
    | _ -> failwith "unknown rcase"



(* Run one test case. The file for [fcase] is read over the region that
   [request_length] derives from [rcase], using [mr_buf_size] as both the
   normal and the tight buffer size. Every record is checked with
   [check_line], and the number of records read is compared with
   [expected_num_lines].
   A [Failure] raised anywhere in the test is printed as "TEST FAILED"
   and does not escape; any other exception propagates to the caller. *)
let run_test cluster mr_buf_size fcase rcase =
  printf "Test: %s + %s for buf_size=%d\n%!" rcase fcase mr_buf_size;
  try
    let name = name_fcase fcase in
    let bsize = Plasma_client.blocksize cluster in
    let record_config =
      object
        method bigblock_size = 2*bsize
        method mr_buffer_size = mr_buf_size
        method mr_buffer_size_tight = mr_buf_size
      end in
    let reqlen = request_length cluster name rcase in
    printf "  reqlen=%Ld\n%!" reqlen;
    let reader = Mapred_io.read_file cluster record_config name 0L reqlen in
    (* Consume records until End_of_file. The line number of a record is
       the reader position just before it is read. *)
    let rec consume () =
      let k = reader#pos_in in
      let next =
        try Some (reader#input_record ()) with End_of_file -> None in
      match next with
        | None -> ()
        | Some line ->
            check_line k line;
            consume () in
    consume ();
    let expected = expected_num_lines fcase rcase in
    if reader#pos_in = expected then
      printf "  test passed\n%!"
    else
      failwith (sprintf "bad number of lines - expected: %d - actual: %d"
                  expected reader#pos_in)
  with
    | Failure msg ->
        printf "  TEST FAILED: %s\n%!" msg


(* Open the cluster described by [clustername] / [nn_nodes] and set its
   buffer configuration to 100. Then create every test file, and run every
   (request case, file case) combination for buffer sizes of 2, 3 and 8
   blocks. The buffer size is the outermost loop, request cases the middle
   loop, and file cases the innermost loop. *)
let run ?clustername ?nn_nodes () =
  let cconf = Plasma_client_config.get_config ?clustername ?nn_nodes () in
  let esys = Unixqueue.create_unix_event_system () in
  let cluster = Plasma_client.open_cluster_cc cconf esys in
  Plasma_client.configure_buffer cluster 100;

  printf "Creating files...\n%!";
  List.iter (create_fcase cluster) f_cases;

  let bsize = Plasma_client.blocksize cluster in
  let buffer_sizes = List.map (fun factor -> factor * bsize) [ 2; 3; 8 ] in
  let run_for_size bufsize =
    List.iter
      (fun rcase ->
         List.iter (fun fcase -> run_test cluster bufsize fcase rcase) f_cases)
      r_cases in
  List.iter run_for_size buffer_sizes;

  printf "End of test sequence.\n%!"


(* Command-line entry point. It parses -namenode (may be repeated),
   -cluster and -debug, and rejects anonymous arguments. It then installs
   a stderr logger at `Debug or `Info level and runs the test sequence.
   Namenodes are passed on only if at least one was given. *)
let main () =
  let clustername = ref None
  and nn_nodes = ref []
  and debug = ref false in
  let add_namenode s = nn_nodes := s :: !nn_nodes in
  let set_cluster s = clustername := Some s in
  let specs =
    [ ("-namenode", Arg.String add_namenode,
       "<host>:<port>    Access this namenode");
      ("-cluster", Arg.String set_cluster,
       "<name>     name of the cluster");
      ("-debug", Arg.Set debug,
       "      run with debugging enabled");
    ] in
  let reject_anon s = raise (Arg.Bad ("Unexpected arg: " ^ s)) in
  Arg.parse specs reject_anon "usage: mr_io <options>";

  let level = if !debug then `Debug else `Info in
  Netlog.current_logger := Netlog.channel_logger stderr level;

  let nodes = match !nn_nodes with [] -> None | l -> Some l in
  run ?clustername:!clustername ?nn_nodes:nodes ()


let () = main ()


This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml