(* Plasma GitLab Archive
   Projects Blog Knowledge *)

(* $Id: mr_io.ml 576 2012-01-15 02:02:18Z gerd $ *)

(* Testing Mapred_io.read_file *)

(* We create test files:

   F1: empty file

   F2: fill the file with 26 lines. Each line has a certain length:
      F2.1: bigblock/2 - 2
      F2.2: bigblock/2 - 1
      F2.3: bigblock/2
      F2.4: bigblock - 2
      F2.5: bigblock - 1
      F2.6: bigblock

      (line length includes here the final LF)

   For var-sized format: bigblock = bigblock size - chunk headers


   F3: fill the file with a first line of specific length, followed by
       26 lines with a different length:
      F3.1: 1 and bigblock/2 - 3
      F3.2: 2 and bigblock/2 - 3 
      F3.3: 3 and bigblock/2 - 3
      F3.4: 11 and bigblock/2 - 3
      F3.5: 12 and bigblock/2 - 3
      F3.6: 13 and bigblock/2 - 3

   F4: fill the files with short records only (81 bytes). The number of
      lines is (bigblock-1)/81+1

   Requested region:

   R1: request nothing
   R2: request complete file
   R3: request all whole bigblocks (but not the trailing partial bigblock)
   R4: request n-1 whole bigblocks
   R5: request bigblock #1 (i.e. the second bigblock)

   We always use bigblock=2*block.

   Tests are repeated with various buffer sizes (which should not affect
   the outcome).

   Expected outcome: The test checks that a certain number of lines N are
   read, and that these lines contain the right contents.

   R1 + all F: N=0

   R2 + F1: N=0
   R2 + F2: N=26
   R2 + F3: N=27

   R3 + F1: N=0  (actually nonsense - no whole bigblocks)
   R3 + F2.1, F2.2: N=25
   R3 + F2.3, F2.4, F2.5, F2.6: N=26
   R3 + F3: N=26

   R4 + F1: N=0  (actually nonsense - no whole bigblocks)
   R4 + F2.1, F2.2: N=23
   R4 + F2.3: N=25.
              However, for fixed_size and var_size: N=24
   R4 + F2.4, F2.5: N=25
   R4 + F2.6: N=26
              However, for fixed_size and var_size: N=25
   R4 + F3: N=24

   R5 + F1: invalid
   R5 + F2.1, F2.2, F2.3: N=2
   R5 + F2.4, F2.5, F2.6: N=1
   R5 + F3.1, F3.2, F3.3, F3.6: N=2
   R5 + F3.4: N=3
   R5 + F3.5: N=3
              However, for var_size: N=2
   R5 + F4: N=0
   
 *)

open Printf
open Plasma_rpcapi_aux
open Uq_engines.Operators

(* Number of failed tests; printed in the summary at the end of [run]. *)
let problems = ref 0
(* Directory holding the test files; set with the -testdir option. *)
let testdir = ref "/"

(* All file cases from the header comment, in order: F1, F2.1-F2.6,
   F3.1-F3.6, F4. *)
let f_cases =
  let six_variants group =
    List.map (fun i -> sprintf "%s.%d" group i) [ 1; 2; 3; 4; 5; 6 ] in
  ("F1" :: six_variants "F2") @ six_variants "F3" @ [ "F4" ]

(* All requested-region cases R1..R5 from the header comment. *)
let r_cases =
  List.map (fun i -> "R" ^ string_of_int i) [ 1; 2; 3; 4; 5 ]
   

(* Minimal record configuration for the Mapred_io readers/writers: the
   bigblock size is [bbsize], and [mr_buf_size] is used both as the normal
   and as the "tight" buffer size. *)
let create_rc bbsize mr_buf_size =
  let buffer = mr_buf_size in
  object
    method bigblock_size = bbsize
    method mr_buffer_size = buffer
    method mr_buffer_size_tight = buffer
  end


(* (Re)create [name] via [Mapred_io.create_file]. Any existing file is
   removed first; a missing file (ENOENT) is not an error. *)
let create_file fs name =
  let remove_if_present () =
    try fs # remove [] name
    with Unix.Unix_error (Unix.ENOENT, _, _) -> () in
  remove_if_present ();
  Mapred_io.create_file fs name


(* Replace the file [name] by a fresh file whose whole contents are [s]. *)
let write_file fs name s =
  create_file fs name;
  let store ch =
    ch # output_string s;
    ch # close_out () in
  store (fs # write [] name)


(* Replace the file [name] by a var-size record file containing [lines],
   one record per element. The chunk length is [chunklen_factor] blocks;
   the bigblock is two blocks and the buffer 64 blocks. *)
let vs_write_file chunklen_factor fs name lines =
  create_file fs name;
  let blocksize = fs # blocksize !testdir in
  let rc = create_rc (2 * blocksize) (64 * blocksize) in
  let shm_mng = new Plasma_shm.null_shm_manager () in
  let chunklen = chunklen_factor * blocksize in
  let writer = new Mapred_io.vs_write_file ~chunklen fs shm_mng rc name in
  List.iter writer#output_record lines;
  writer # close_out ()


(* Line number [k] (k >= 0) of a test file, [l] bytes long including the
   final LF. All bytes before the LF are the character with code
   64 + (k mod 62), which is what [check_line k] looks for. l = 0 yields
   the empty string. *)
let create_line k l =
  match l with
    | 0 -> ""
    | _ ->
        let fill = Char.chr (64 + (k mod 62)) in
        String.make (l - 1) fill ^ "\n"


(* Check that every byte of [line] is the fill character of line number [k]
   (code 64 + (k mod 62)). [line] is expected without its trailing LF; an
   LF byte counts as a mismatch.
   @raise Failure at the first mismatching byte, naming the expected and
   the found line number (both mod 62). *)
let check_line k line =
  let expected = Char.chr (64 + (k mod 62)) in
  String.iter
    (fun found ->
       if found <> expected then
         failwith (sprintf "bad line: expected #%d found #%d"
                     (k mod 62)
                     ((Char.code found - 64) mod 62)))
    line


(* Path of the test file for [fcase] inside !testdir. The var-size
   variants of the file append ".vs" / ".vs2" to this name. *)
let name_fcase fcase =
  String.concat "" [ !testdir; "/mr_io_"; fcase ]

(* Payload ("inner") size of a bigblock of two blocks of [bsize] bytes, i.e.
   how many record bytes fit into it:
   - `Vs: the chunk length is one block, so a 32-byte chunk header is
     subtracted for each of the two blocks;
   - `Vs2: the chunk length is two blocks, so there is only one 32-byte
     chunk header per bigblock;
   - any other format: the full two blocks. *)
let get_bigsize bsize format =
  let raw = 2 * bsize in
  match format with
    | `Vs -> raw - 2 * 32
    | `Vs2 -> raw - 32
    | _ -> raw
	
(* Create the test file for [fcase] (one of [f_cases]) in [format].

   Line lengths are derived from the payload bigblock size returned by
   [get_bigsize]. This makes the length relations of the header comment
   (F2.x, F3.x, F4) hold for the payload of each format. Line k is built by
   [create_line], i.e. filled with the character [check_line k] expects.

   [`Vs] writes var-size records with a chunk length of one block to
   [name_fcase fcase ^ ".vs"]. [`Vs2] uses a chunk length of two blocks and
   the suffix ".vs2". Any other format writes the lines as plain text to
   [name_fcase fcase].

   @raise Failure "unknown fcase" for an unknown [fcase].
   @raise Failure "cr_line: cannot create this length" if a var-size line
   length falls between 256 and 263.
 *)
let create_fcase fs fcase format =
  let name = name_fcase fcase in
  (* lines are accumulated in reverse order and reversed once at the end *)
  let b = ref [] in
  let bsize = fs # blocksize !testdir in
  let vs_flag = (format = `Vs || format = `Vs2) in
  let bigsize = get_bigsize bsize format in

  (* [cr_line k n]: line number k occupying n bytes in the file. For the
     var-size formats the record header is subtracted from n: 1 byte when
     n <= 255, 9 bytes when n >= 264. These are presumably the short and
     long header sizes of Mapred_io's var-size records - TODO confirm. *)
  let cr_line k n =
    if vs_flag then (
      if n <= 255 then
	create_line k (n-1)
      else
	if n >= 264 then
	  create_line k (n-9)
	else
	  failwith "cr_line: cannot create this length"
    )
    else create_line k n in


  (* F2: 26 lines (numbered 0..25) of n bytes each *)
  let f2 n =
    for k = 0 to 25 do
      let line = cr_line k n in
      b := line :: !b
    done in

  (* F3: line 0 has n1 bytes, followed by 26 lines (1..26) of n2 bytes *)
  let f3 n1 n2 =
    let line = cr_line 0 n1 in
    b := line :: !b;
    for k = 1 to 26 do
      let line = cr_line k n2 in
      b := line :: !b
    done in

  (* F4: (size-1)/81+1 lines of 81 bytes, i.e. at least [size] bytes *)
  let f4 size =
    let n = (size-1)/81 + 1 in
    for k = 0 to n-1 do
      let line = cr_line k 81 in
      b := line :: !b
    done in


  ( match fcase with
      | "F1" ->  ()
      | "F2.1" -> f2 (bigsize/2-2)
      | "F2.2" -> f2 (bigsize/2-1)
      | "F2.3" -> f2 (bigsize/2-0)
      | "F2.4" -> f2 (bigsize-2)
      | "F2.5" -> f2 (bigsize-1)
      | "F2.6" -> f2 (bigsize-0)
      | "F3.1" -> f3 1 (bigsize/2-3)
      | "F3.2" -> f3 2 (bigsize/2-3)
      | "F3.3" -> f3 3 (bigsize/2-3)
      | "F3.4" -> f3 11 (bigsize/2-3)
      | "F3.5" -> f3 12 (bigsize/2-3)
      | "F3.6" -> f3 13 (bigsize/2-3)
      | "F4"   -> f4 bigsize
      | _ -> failwith "unknown fcase"
  );

  let lines = List.rev !b in

  (* The plain-text file (no suffix) is also what the fixed-size tests read *)
  match format with
    | `Vs ->
	vs_write_file 1 fs (name ^ ".vs") lines
    | `Vs2 ->
	vs_write_file 2 fs (name ^ ".vs2") lines
    | _ ->
	let s = String.concat "" lines in
	write_file fs name s


(* Length, in blocks, of the region that [rcase] requests from file [name]
   (a bigblock is two blocks):
   - R1: nothing;
   - R2: every block of the file;
   - R3: all whole bigblocks;
   - R4: all whole bigblocks but one (never negative);
   - R5: two blocks, capped at (number of bigblocks - 2).
   @raise Failure "unknown rcase" for any other [rcase]. *)
let request_length fs name rcase =
  let eof = fs # size [] name in
  let bsize = fs # blocksize !testdir in
  (* number of [unit_size]-sized units needed for [eof] bytes, rounded up *)
  let units_of unit_size =
    if eof = 0L then 0L
    else Int64.succ (Int64.div (Int64.pred eof) unit_size) in
  let bigsize = Int64.of_int (2 * bsize) in
  let n_blocks = units_of (Int64.of_int bsize) in
  let n_bigblocks = units_of bigsize in
  let whole_bigblocks () = Int64.div eof bigsize in
  match rcase with
    | "R1" -> 0L
    | "R2" -> n_blocks
    | "R3" -> Int64.mul 2L (whole_bigblocks ())
    | "R4" -> max (Int64.mul 2L (Int64.pred (whole_bigblocks ()))) 0L
    | "R5" -> min 2L (Int64.sub n_bigblocks 2L)
    | _ -> failwith "unknown rcase"


(* First block of the region requested by [rcase]. Only R5 (bigblock #1)
   starts after the beginning, at block 2. [fs] and [name] are not used.
   @raise Failure "unknown rcase" for any other [rcase]. *)
let request_offset fs name rcase =
  if rcase = "R5" then 2L
  else if List.mem rcase [ "R1"; "R2"; "R3"; "R4" ] then 0L
  else failwith "unknown rcase"


(* Line number of the first line expected in the region [rcase] of file
   case [fcase]. Regions R1-R4 start at line 0. For R5 the value depends on
   the file case and, for F2.3 and F2.6, on whether [format] is `Ls.
   R5 + F1 is an invalid combination (assertion failure).
   @raise Failure "unknown fcase" / "unknown rcase" for unknown cases. *)
let line_offset fcase rcase format =
  let ls = (format = `Ls) in
  let first_line_of_r5 = function
    | "F1" -> assert false
    | "F2.1" | "F2.2" -> 3
    | "F2.3" -> if ls then 3 else 2
    | "F2.4" | "F2.5" -> 2
    | "F2.6" -> if ls then 2 else 1
    | "F3.1" | "F3.2" | "F3.3" -> 4
    | "F3.4" | "F3.5" | "F3.6" -> 3
    | "F4" -> 0
    | _ -> failwith "unknown fcase" in
  if List.mem rcase [ "R1"; "R2"; "R3"; "R4" ] then 0
  else if rcase = "R5" then first_line_of_r5 fcase
  else failwith "unknown rcase"



(* Number of lines the reader must return for region [rcase] of file case
   [fcase]. [bigsize] is the payload bigblock size (it determines the line
   count of F4). Some counts differ between `Ls and the other formats.
   R5 + F1 is an invalid combination (assertion failure).
   @raise Failure "unknown rcase" for an unknown region, and
   "unknown fcase" for an unknown file case under R2-R5. *)
let expected_num_lines fcase rcase format bigsize =
  let ls = (format = `Ls) in
  let all_f4 = (bigsize-1)/81+1 in
  match rcase, fcase with
    | "R1", _ -> 0
    | ("R2" | "R3" | "R4"), "F1" -> 0
    | "R5", "F1" -> assert false
    | ("R2" | "R3"), "F4" -> all_f4
    | ("R4" | "R5"), "F4" -> 0
    | "R2", ("F2.1" | "F2.2" | "F2.3" | "F2.4" | "F2.5" | "F2.6") -> 26
    | "R2", ("F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6") -> 27
    | "R3", ("F2.1" | "F2.2") -> 25
    | "R3", ("F2.3" | "F2.4" | "F2.5" | "F2.6") -> 26
    | "R3", ("F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6") -> 26
    | "R4", ("F2.1" | "F2.2") -> 23
    | "R4", "F2.3" -> if ls then 25 else 24
    | "R4", ("F2.4" | "F2.5") -> 25
    | "R4", "F2.6" -> if ls then 26 else 25
    | "R4", ("F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6") -> 24
    | "R5", ("F2.1" | "F2.2" | "F2.3") -> 2
    | "R5", ("F2.4" | "F2.5" | "F2.6") -> 1
    | "R5", "F3.4" -> 3
    | "R5", "F3.5" -> if ls then 3 else 2
    | "R5", ("F3.1" | "F3.2" | "F3.3" | "F3.6") -> 2
    | ("R2" | "R3" | "R4" | "R5"), _ -> failwith "unknown fcase"
    | _ -> failwith "unknown rcase"



(* Run one read test and report the result on stdout.

   The file [name_fcase fcase ^ suffix] is opened with the reader created by
   [rd_factory], restricted to the region selected by [rcase] (see
   [request_offset] / [request_length]). Every record is checked with
   [check_line]. When [strip_lf] is set, a non-empty record must end in LF,
   and the LF is removed before the check. Finally the number of records
   read is compared with [expected_num_lines].

   A [Failure] is printed as "TEST FAILED" and counted in [problems]. The
   reader is closed on that path as well, so a failing record check does
   not leak it. Other exceptions propagate to the caller. *)
let run_test rd_factory strip_lf format fs mr_buf_size fcase rcase suffix =
  printf "Test: %s + %s for buf_size=%d\n%!" rcase fcase mr_buf_size;
  (* The reader that still needs [close_in], if any. Reset to None before
     closing so it is never closed twice. *)
  let open_reader = ref None in
  try
    let name = name_fcase fcase ^ suffix in
    let bsize = fs # blocksize !testdir in
    let bigsize = get_bigsize bsize format in
    let rc = create_rc (2*bsize) mr_buf_size in
    let reqlen = request_length fs name rcase in
    let reqoff = request_offset fs name rcase in
    let lineoff = line_offset fcase rcase format in
    printf "  reqoff=%Ld reqlen=%Ld lineoff=%d\n%!" reqoff reqlen lineoff;
    let shm_mng = new Plasma_shm.null_shm_manager() in
    let rr =
      rd_factory # read_file [] fs shm_mng rc name reqoff reqlen in
    open_reader := Some rr;
    ( try
        while true do
          (* record index before reading = line number relative to lineoff *)
          let k = rr#pos_in in
          let line = rr#input_record () in
          if strip_lf then (
            let l = String.length line in
            if l > 0 then (
              if line.[l-1] <> '\n' then
                failwith "Line does not end with LF";
              check_line (lineoff+k) (String.sub line 0 (l-1))
            )
          )
          else
            check_line (lineoff+k) line
        done
      with
        | End_of_file -> ()
    );
    open_reader := None;
    rr # close_in();

    let n = expected_num_lines fcase rcase format bigsize in
    if n <> rr#pos_in then
      failwith (sprintf "bad number of lines - expected: %d - actual: %d"
                  n rr#pos_in);

    printf "  test passed\n%!"
  with
    | Failure msg ->
        printf "  TEST FAILED: %s\n%!" msg;
        incr problems;
        ( match !open_reader with
            | Some rr ->
                open_reader := None;
                (* best effort: the failure has already been reported *)
                ( try rr # close_in()
                  with Failure _ | Unix.Unix_error _ -> ()
                )
            | None -> ()
        )


(* Line-structured read test through a file descriptor.

   The region [rcase] of file [name] is read with the line-structured
   reader, and [rr#to_fd_e] copies the records into a non-blocking pipe.
   The lines coming out of the pipe are checked with [check_line], and
   their number is compared with [expected_num_lines]. Both engines run on
   the cluster's event system.

   A [Failure] (also one raised inside the engines and re-raised from the
   `Error state) is printed as "TEST FAILED" and counted in [problems], like
   in [run_test]. The pipe descriptors and the reader are released on that
   path, too. Other exceptions propagate. *)
let run_test_fd_1 fs cluster mr_buf_size fcase rcase name =
  let esys = Plasma_client.event_system cluster in
  (* Resources that must also be released when the test fails half-way.
     Each is reset to None right before being released, so nothing is
     released twice. *)
  let open_reader = ref None in
  let open_pipe = ref None in
  let fd_wr_closed = ref false in
  let release () =
    ( match !open_pipe with
        | Some (fd_rd, fd_wr) ->
            open_pipe := None;
            Unix.close fd_rd;
            if not !fd_wr_closed then Unix.close fd_wr
        | None -> ()
    );
    ( match !open_reader with
        | Some rr ->
            open_reader := None;
            rr # close_in()
        | None -> ()
    ) in
  try
    let bsize = fs # blocksize !testdir in
    let bigsize = get_bigsize bsize `Ls in
    let rc = create_rc (2*bsize) mr_buf_size in
    let reqlen = request_length fs name rcase in
    let reqoff = request_offset fs name rcase in
    let lineoff = line_offset fcase rcase `Ls in
    printf "  reqoff=%Ld reqlen=%Ld lineoff=%d\n%!" reqoff reqlen lineoff;
    let shm_mng = new Plasma_shm.null_shm_manager() in
    let rr =
      (Mapred_io.line_structured_format())
        # read_file [] fs shm_mng rc name reqoff reqlen in
    open_reader := Some rr;
    let (fd_rd, fd_wr) = Unix.pipe() in
    open_pipe := Some (fd_rd, fd_wr);
    Unix.set_nonblock fd_rd;
    Unix.set_nonblock fd_wr;
    let d1 = `Polldescr(`Read_write, fd_rd, esys) in
    let d2 = `Buffer_in(Uq_io.create_in_buffer d1) in

    (* Copy the records into the pipe. Closing the write side afterwards
       lets the reading side see EOF. *)
    let pump_e =
      rr # to_fd_e fd_wr esys
      ++ (fun () ->
            Unix.close fd_wr; fd_wr_closed := true; eps_e (`Done ()) esys) in

    (* Read and check lines until EOF; the result is the number of lines *)
    let rec read_e k =
      (Uq_io.input_line_e d2 >> Uq_io.eof_as_none)
      ++ (fun line_opt ->
            match line_opt with
              | None -> (* EOF *)
                  eps_e (`Done k) esys
              | Some line ->
                  check_line (lineoff + k) line;
                  read_e (k + 1)
         )
    in

    let e =
      Uq_engines.sync_engine
        pump_e
        (read_e 0)
      ++ (fun ((), k_end) ->
            let n = expected_num_lines fcase rcase `Ls bigsize in
            if n <> k_end then
              failwith (sprintf
                          "bad number of lines - expected: %d - actual: %d"
                          n k_end);
            eps_e (`Done ()) esys
         ) in
    Unixqueue.run esys;
    ( match e#state with
        | `Working _ -> assert false
        | `Aborted -> assert false
        | `Error err -> raise err
        | `Done () -> ()
    );
    release ();
    printf "  test passed\n%!"
  with
    | Failure msg ->
        printf "  TEST FAILED: %s\n%!" msg;
        incr problems;
        (* best effort: the failure has already been reported *)
        ( try release ()
          with Failure _ | Unix.Unix_error _ -> ()
        )


(* FD variant of the line-structured test for [fcase] + [rcase]. It only
   runs when [fs#open_cluster] yields a PlasmaFS cluster for the file;
   otherwise the test is reported as skipped.
   NOTE(review): the cluster returned by open_cluster is not closed here;
   confirm whether the caller owns it. *)
let run_test_fd fs mr_buf_size fcase rcase =
  printf "FD Test: %s + %s for buf_size=%d\n%!" rcase fcase mr_buf_size;
  let event_sys = Unixqueue.create_unix_event_system() in
  let path = name_fcase fcase in
  let cluster_opt = fs # open_cluster path event_sys in
  match cluster_opt with
    | Some cluster ->
        run_test_fd_1 fs cluster mr_buf_size fcase rcase path
    | None ->
        printf "   not PlasmaFS - skipping test\n%!"


(* Line-structured suite. For buffer sizes of 1, 2, 3 and 8 blocks, every
   region case and every file case is read twice: directly ([run_test]) and
   through a file descriptor ([run_test_fd]). The invalid R5 + F1
   combination (the empty file has no bigblock #1) is skipped. *)
let run_ls fs =
  printf "*** LINE BY LINE ***\n%!";
  let bsize = fs # blocksize !testdir in
  let factory = Mapred_io.line_structured_format() in
  let run_pair bufsize rcase fcase =
    if not (rcase = "R5" && fcase = "F1") then begin
      run_test factory false `Ls fs bufsize fcase rcase "";
      run_test_fd fs bufsize fcase rcase
    end in
  let buf_sizes = [ bsize; 2*bsize; 3*bsize; 8*bsize ] in
  List.iter
    (fun bufsize ->
       List.iter
         (fun rcase -> List.iter (run_pair bufsize rcase) f_cases)
         r_cases)
    buf_sizes


(* Fixed-size-record suite. The plain-text files of F1 and F2.1-F2.6 are
   re-read with [Mapred_io.fixed_size_format]. The record size is the line
   length of the respective case (80 for the empty file F1), and the LF is
   stripped before checking. F1 is skipped for R5. Every test runs for
   buffer sizes of 1, 2, 3 and 8 blocks. *)
let run_fs fs =
  printf "*** FIXED RECORD SIZE ***\n%!";
  let bsize = fs # blocksize !testdir in
  let bigsize = 2*bsize in
  let f2_sizes =
    [ "F2.1", bigsize/2 - 2;
      "F2.2", bigsize/2 - 1;
      "F2.3", bigsize/2;
      "F2.4", bigsize - 2;
      "F2.5", bigsize - 1;
      "F2.6", bigsize;
    ] in
  let fixed_test bufsize rcase (fcase, size) =
    run_test
      (Mapred_io.fixed_size_format size) true `Fs fs bufsize fcase rcase "" in
  let tests_for bufsize rcase =
    if rcase <> "R5" then fixed_test bufsize rcase ("F1", 80);
    List.iter (fixed_test bufsize rcase) f2_sizes in
  List.iter
    (fun bufsize -> List.iter (tests_for bufsize) r_cases)
    (List.map (fun m -> m * bsize) [ 1; 2; 3; 8 ])


(* Var-size-record suite on the ".vs" files (chunk length of one block),
   for buffer sizes of 1, 2, 3 and 8 blocks. R5 + F1 is skipped. *)
let run_vs fs =
  printf "*** VARIABLE RECORD SIZE ***\n%!";
  let bsize = fs # blocksize !testdir in
  let factory = Mapred_io.var_size_format () in
  let applicable rcase fcase = rcase <> "R5" || fcase <> "F1" in
  List.iter
    (fun bufsize ->
       List.iter
         (fun rcase ->
            let fcases = List.filter (applicable rcase) f_cases in
            List.iter
              (fun fcase ->
                 run_test factory true `Vs fs bufsize fcase rcase ".vs")
              fcases)
         r_cases)
    (List.map (( * ) bsize) [ 1; 2; 3; 8 ])


(* Var-size-record suite on the ".vs2" files (chunk length of two blocks),
   for buffer sizes of 1, 2, 3 and 8 blocks. R5 + F1 is skipped. *)
let run_vs2 fs =
  printf "*** VARIABLE RECORD SIZE (DOUBLE CHUNK LENGTH) ***\n%!";
  let bsize = fs # blocksize !testdir in
  let factory = Mapred_io.var_size_format () in
  let one_test bufsize rcase fcase =
    if rcase = "R5" && fcase = "F1" then ()
    else run_test factory true `Vs2 fs bufsize fcase rcase ".vs2" in
  List.iter
    (fun factor ->
       let bufsize = factor * bsize in
       List.iter
         (fun rcase -> List.iter (one_test bufsize rcase) f_cases)
         r_cases)
    [ 1; 2; 3; 8 ]


(* Create the test files for the enabled formats, run the enabled suites,
   and print a summary based on [problems].

   [clustername] and [nn_nodes] select the PlasmaFS cluster. With
   [disable_cluster], no client configuration is passed to
   [Mapred_fs.standard_fs_cc]. The plain-text files are created when either
   [en_ls] or [en_fs] is set, because the fixed-size suite reads the same
   files. *)
let run ?clustername ?nn_nodes ~disable_cluster ~en_ls ~en_fs ~en_vs ~en_vs2
        () =
  let client_config =
    if not disable_cluster then
      Some (Plasma_client_config.get_config ?clustername ?nn_nodes ())
    else
      None in
  let configure_cluster cluster =
    Plasma_client.configure_buffer cluster 100;
    Plasma_client.configure_auth_daemon cluster in
  let fs =
    Mapred_fs.standard_fs_cc ?client_config ~configure_cluster () in

  printf "Creating files...\n%!";
  let create_for fcase =
    if en_ls || en_fs then create_fcase fs fcase `Ls;
    if en_vs then create_fcase fs fcase `Vs;
    if en_vs2 then create_fcase fs fcase `Vs2 in
  List.iter create_for f_cases;

  let maybe enabled suite = if enabled then suite fs in
  maybe en_ls run_ls;
  maybe en_fs run_fs;
  maybe en_vs run_vs;
  maybe en_vs2 run_vs2;

  printf "End of test sequence.\n%!";
  match !problems with
    | 0 -> printf "No errors.\n%!"
    | count -> printf "%d errors!\n%!" count

(* Command-line entry point.

   Parses the options and configures logging. Then it runs the selected
   suites; all four run when none of -ls/-fs/-vs/-vs2 is given. An exception
   escaping [run] is reported on stderr, together with its backtrace and the
   number of failures counted so far, and is not re-raised.
   NOTE(review): neither test failures nor an escaping exception change the
   process exit status here. *)
let main () =
  let clustername = ref None in
  let nn_nodes = ref [] in
  let debug = ref false in
  let en_ls = ref false in
  let en_fs = ref false in
  let en_vs = ref false in
  let en_vs2 = ref false in
  let disable_cluster = ref false in
  Arg.parse
    [ "-namenode", Arg.String (fun s -> nn_nodes := s :: !nn_nodes),
      "<host>:<port>    Access this namenode";

      "-cluster", Arg.String (fun s -> clustername := Some s),
      "<name>     name of the cluster";

      "-disable-cluster", Arg.Set disable_cluster,
      "     Disable PlasmaFS entirely";

      "-testdir", Arg.Set_string testdir,
      "<dir>      in which directory to run the tests";

      "-ls", Arg.Set en_ls,
      "      run the tests with line-structured files";

      "-fs", Arg.Set en_fs,
      "      run the tests with fixed-size record";

      "-vs", Arg.Set en_vs,
      "      run the tests with variable-size record";

      "-vs2", Arg.Set en_vs2,
      "      run the tests with variable-size record (double chunk len)";

      "-debug", Arg.Set debug,
      "      run with debugging enabled";
    ]
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)))
    "usage: mr_io <options>";

  (* Log to stderr; -debug lowers the threshold from `Info to `Debug *)
  Netlog.current_logger :=
    Netlog.channel_logger stderr (if !debug then `Debug else `Info);
  Plasma_util.debug := !debug;

  (* No suite selected explicitly: run all of them *)
  if not !en_ls && not !en_fs && not !en_vs && not !en_vs2 then (
    en_ls := true;
    en_fs := true;
    en_vs := true;
    en_vs2 := true;
  );

  (* Required so that the handler below can print a backtrace *)
  Printexc.record_backtrace true;
  (* presumably enables additional log output in Mapred_io *)
  Mapred_io.more_logging := true;
  try
    run
      ?clustername:!clustername
      ?nn_nodes:(if !nn_nodes = [] then None else Some !nn_nodes)
      ~disable_cluster:!disable_cluster
      ~en_ls:!en_ls
      ~en_fs:!en_fs
      ~en_vs:!en_vs
      ~en_vs2:!en_vs2
      ()
  with
    | error ->
	let bt = Printexc.get_backtrace() in
	eprintf "Exception %s\n%!" (Netexn.to_string error);
	eprintf "Backtrace: %s\n%!" bt;
	eprintf "Before, there were %d errors.\n%!" !problems


(* Run the test program when the module is initialized *)
let () = main()


(* This web site is published by Informatikbüro Gerd Stolpmann
   Powered by Caml *)