(* $Id: mr_io.ml 576 2012-01-15 02:02:18Z gerd $ *)

(* Testing Mapred_io.read_file *)

(* We create test files:

   F1: empty file
   F2: fill the file with 26 lines. Each line has a certain length:
     F2.1: bigblock/2 - 2
     F2.2: bigblock/2 - 1
     F2.3: bigblock/2
     F2.4: bigblock - 2
     F2.5: bigblock - 1
     F2.6: bigblock
     (line length includes here the final LF)
     For var-sized format: bigblock = bigblock size - chunk headers
   F3: fill the file with a first line of specific length, followed by
     26 lines with a different length:
     F3.1: 1 and bigblock/2 - 3
     F3.2: 2 and bigblock/2 - 3
     F3.3: 3 and bigblock/2 - 3
     F3.4: 11 and bigblock/2 - 3
     F3.5: 12 and bigblock/2 - 3
     F3.6: 13 and bigblock/2 - 3
   F4: fill the files with short records only (81 bytes). The number
     of lines is (bigblock-1)/81+1

   Requested region:

   R1: request nothing
   R2: request complete file
   R3: request all whole bigblocks (but not the trailing partial bigblock)
   R4: request n-1 whole bigblocks
   R5: request bigblock #1 (i.e. the second bigblock)

   We always use bigblock=2*block.

   Tests are repeated with various buffer sizes (which should not affect
   the outcome).

   Expected outcome:

   The test checks that a certain number of lines N are read, and that
   these lines contain the right contents.

   R1 + all F: N=0
   R2 + F1: N=0
   R2 + F2: N=26
   R2 + F3: N=27
   R3 + F1: N=0  (actually nonsense - no whole bigblocks)
   R3 + F2.1, F2.2: N=25
   R3 + F2.3, F2.4, F2.5, F2.6: N=26
   R3 + F3: N=26
   R4 + F1: N=0  (actually nonsense - no whole bigblocks)
   R4 + F2.1, F2.2: N=23
   R4 + F2.3: N=25.
              However, for fixed_size and var_size: N=24
   R4 + F2.4, F2.5: N=25
   R4 + F2.6: N=26
              However, for fixed_size and var_size: N=25
   R4 + F3: N=24
   R5 + F1: invalid
   R5 + F2.1, F2.2: N=2
   R5 + F2.3: N=3
   R5 + F2.4, 2.5, 2.6: N=1
   R5 + F3: N=2
 *)

open Printf
open Plasma_rpcapi_aux
open Uq_engines.Operators

(* Number of failed tests so far. Incremented by the test runners and
   reported by [run] at the end of the sequence.
 *)
let problems = ref 0

(* Directory in which the test files are created (set with -testdir) *)
let testdir = ref "/"

(* Names of the file layouts (see the description at the top) *)
let f_cases =
  [ "F1";
    "F2.1"; "F2.2"; "F2.3"; "F2.4"; "F2.5"; "F2.6";
    "F3.1"; "F3.2"; "F3.3"; "F3.4"; "F3.5"; "F3.6";
    "F4"
  ]

(* Names of the requested regions (see the description at the top) *)
let r_cases =
  [ "R1"; "R2"; "R3"; "R4"; "R5" ]

(* The buffer sizes every test is repeated with, derived from the block
   size [bsize]
 *)
let buffer_sizes bsize =
  [ bsize; 2*bsize; 3*bsize; 8*bsize ]

(* Calls [f bufsize fcase rcase] for every buffer size, then every region,
   then every file layout (in this nesting order). The combination
   R5 + F1 is left out (it is marked as invalid in the table at the top).
 *)
let iter_test_cases bsize f =
  List.iter
    (fun bufsize ->
       List.iter
         (fun rcase ->
            List.iter
              (fun fcase ->
                 if rcase <> "R5" || fcase <> "F1" then
                   f bufsize fcase rcase
              )
              f_cases
         )
         r_cases
    )
    (buffer_sizes bsize)

(* Creates the configuration object handed to the readers and writers:
   [bbsize] is returned as bigblock size, and [mr_buf_size] is returned
   for both the normal and the tight buffer size.
 *)
let create_rc bbsize mr_buf_size =
  ( object
      method bigblock_size = bbsize
      method mr_buffer_size = mr_buf_size
      method mr_buffer_size_tight = mr_buf_size
    end
  )

(* (Re-)creates the file [name]: an already existing file is removed first
   (a missing file is not an error), then Mapred_io.create_file is called.
 *)
let create_file fs name =
  ( try
      fs # remove [] name
    with
      | Unix.Unix_error(Unix.ENOENT,_,_) -> ()
  );
  Mapred_io.create_file fs name

(* Creates the file [name] and writes the string [s] into it *)
let write_file fs name s =
  create_file fs name;
  let ch = fs # write [] name in
  ch # output_string s;
  ch # close_out()

(* Creates the file [name] and writes [lines] as records with
   Mapred_io.vs_write_file. The chunk length is [chunklen_factor] times
   the block size of the test directory.
 *)
let vs_write_file chunklen_factor fs name lines =
  create_file fs name;
  let bsize = fs # blocksize !testdir in
  let rc = create_rc (2*bsize) (64 * bsize) in
  let shm_mng = new Plasma_shm.null_shm_manager() in
  let wr =
    new Mapred_io.vs_write_file
      ~chunklen:(chunklen_factor * bsize)
      fs shm_mng rc name in
  List.iter
    (fun line -> wr # output_record line)
    lines;
  wr # close_out()

let create_line k l =
  (* line number k of the file (k>=0). l is the length including LF.
     The line consists of l-1 copies of the character with code
     64 + (k mod 62), followed by LF. For l=0 the empty string is returned.
   *)
  if l = 0 then
    ""
  else
    String.make (l-1) (Char.chr (64+(k mod 62))) ^ "\n"

(* Checks that every character of [line] is the fill character of line
   number [k] (as generated by create_line). The caller must have removed
   the trailing LF. Raises Failure on the first deviating character.
 *)
let check_line k line =
  let c = Char.chr (64+(k mod 62)) in
  for j = 0 to String.length line - 1 do
    if line.[j] <> c then
      failwith
        (sprintf "bad line: expected #%d found #%d"
           (k mod 62)
           ((Char.code line.[j] - 64) mod 62))
  done

(* The file name used for the file layout [fcase] (without format suffix) *)
let name_fcase fcase =
  !testdir ^ "/mr_io_" ^ fcase

let get_bigsize bsize format =
  (* Return the _inner_ bigsize, i.e. how many bytes can be stored (payload) *)
  match format with
    | `Vs -> 2*(bsize - 32)
    | `Vs2 -> 2 * bsize - 32
    | _ -> 2 * bsize
  (* vs: we subtract the length of the chunk header (32 bytes) per block.
     The chunk length is bsize (see below)
   *)

(* Creates the test file for the layout [fcase] in the given [format].
   For `Vs and `Vs2 the lines are written as records with vs_write_file
   (chunk length factor 1 and 2, file suffix .vs and .vs2, respectively).
   For all other formats the lines are concatenated and written as they are.
   Raises Failure for an unknown [fcase].
 *)
let create_fcase fs fcase format =
  let name = name_fcase fcase in
  let b = ref [] in   (* the generated lines in reverse order *)
  let bsize = fs # blocksize !testdir in
  let vs_flag = (format = `Vs || format = `Vs2) in
  let bigsize = get_bigsize bsize format in

  (* Line number k whose stored size is meant to be n bytes.
     NOTE(review): for the var-size formats the payload is shortened by 1
     (n <= 255) or by 9 bytes (n >= 264) - presumably this accounts for the
     length header of a record; confirm against Mapred_io. Lengths in
     between cannot be generated.
   *)
  let cr_line k n =
    if vs_flag then (
      if n <= 255 then
        create_line k (n-1)
      else
        if n >= 264 then
          create_line k (n-9)
        else
          failwith "cr_line: cannot create this length"
    )
    else
      create_line k n in

  (* F2: 26 lines of length n *)
  let f2 n =
    for k = 0 to 25 do
      let line = cr_line k n in
      b := line :: !b
    done in

  (* F3: one line of length n1, followed by 26 lines of length n2 *)
  let f3 n1 n2 =
    let line = cr_line 0 n1 in
    b := line :: !b;
    for k = 1 to 26 do
      let line = cr_line k n2 in
      b := line :: !b
    done in

  (* F4: (size-1)/81+1 lines of length 81 *)
  let f4 size =
    let n = (size-1)/81 + 1 in
    for k = 0 to n-1 do
      let line = cr_line k 81 in
      b := line :: !b
    done in

  ( match fcase with
      | "F1" -> ()
      | "F2.1" -> f2 (bigsize/2-2)
      | "F2.2" -> f2 (bigsize/2-1)
      | "F2.3" -> f2 (bigsize/2-0)
      | "F2.4" -> f2 (bigsize-2)
      | "F2.5" -> f2 (bigsize-1)
      | "F2.6" -> f2 (bigsize-0)
      | "F3.1" -> f3 1 (bigsize/2-3)
      | "F3.2" -> f3 2 (bigsize/2-3)
      | "F3.3" -> f3 3 (bigsize/2-3)
      | "F3.4" -> f3 11 (bigsize/2-3)
      | "F3.5" -> f3 12 (bigsize/2-3)
      | "F3.6" -> f3 13 (bigsize/2-3)
      | "F4" -> f4 bigsize
      | _ -> failwith "unknown fcase"
  );
  let lines = List.rev !b in
  match format with
    | `Vs ->
        vs_write_file 1 fs (name ^ ".vs") lines
    | `Vs2 ->
        vs_write_file 2 fs (name ^ ".vs2") lines
    | _ ->
        let s = String.concat "" lines in
        write_file fs name s

(* How many blocks are requested for the region [rcase] of file [name].
   The value is computed from the actual file size and the block size
   (a bigblock is two blocks). Raises Failure for an unknown [rcase].

   NOTE(review): for R5 the result is negative when the file has fewer
   than two (possibly partial) bigblocks - confirm that the readers
   tolerate this.
 *)
let request_length fs name rcase =
  let eof = fs # size [] name in
  let bsize = fs # blocksize !testdir in
  let bsizeL = Int64.of_int bsize in
  (* number of blocks, counting a trailing partial block *)
  let n_blocks =
    if eof=0L then 0L else
      Int64.succ (Int64.div (Int64.pred eof) bsizeL) in
  let bigsize = 2*bsize in
  let bigsizeL = Int64.of_int bigsize in
  (* number of bigblocks, counting a trailing partial bigblock *)
  let n_bigblocks =
    if eof=0L then 0L else
      Int64.succ (Int64.div (Int64.pred eof) bigsizeL) in
  match rcase with
    | "R1" ->
        0L
    | "R2" ->
        n_blocks
    | "R3" ->
        (* whole bigblocks only *)
        Int64.mul 2L (Int64.div eof bigsizeL)
    | "R4" ->
        (* one whole bigblock less than R3, but not less than 0 *)
        max (Int64.mul 2L (Int64.pred (Int64.div eof bigsizeL))) 0L
    | "R5" ->
        min 2L (Int64.sub n_bigblocks 2L)
    | _ ->
        failwith "unknown rcase"

let request_offset fs name rcase =
  (* at which block we begin to read *)
  match rcase with
    | "R1" | "R2" | "R3" | "R4" -> 0L
    | "R5" -> 2L
    | _ -> failwith "unknown rcase"

let line_offset fcase rcase format =
  (* the line number of the first read line *)
  match rcase with
    | "R1" | "R2" | "R3" | "R4" -> 0
    | "R5" ->
        ( match fcase with
            | "F1" -> assert false   (* R5 + F1 is never run *)
            | "F2.1" | "F2.2" -> 3
            | "F2.3" -> if format=`Ls then 3 else 2
            | "F2.4" | "F2.5" -> 2
            | "F2.6" -> if format=`Ls then 2 else 1
            | "F3.1" | "F3.2" | "F3.3" -> 4
            | "F3.4" | "F3.5" | "F3.6" -> 3
            | "F4" -> 0
            | _ -> failwith "unknown fcase"
        )
    | _ -> failwith "unknown rcase"

(* The number of lines (records) the reader is expected to return for the
   combination of [fcase], [rcase] and [format]. [bigsize] is the payload
   size as returned by get_bigsize (only needed for F4).

   NOTE(review): the table at the top of the file says N=3 for R5 + F2.3
   and N=2 for R5 + all F3, whereas this function returns 2 for R5 + F2.3
   and 3 for R5 + F3.4 (and for R5 + F3.5 in `Ls format). The table also
   does not mention F4. Confirm which of the two is authoritative.
 *)
let expected_num_lines fcase rcase format bigsize =
  match rcase with
    | "R1" -> 0
    | "R2" ->
        ( match fcase with
            | "F1" -> 0
            | "F2.1" | "F2.2" | "F2.3" | "F2.4" | "F2.5" | "F2.6" -> 26
            | "F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6" -> 27
            | "F4" -> (bigsize-1)/81+1
            | _ -> failwith "unknown fcase"
        )
    | "R3" ->
        ( match fcase with
            | "F1" -> 0
            | "F2.1" | "F2.2" -> 25
            | "F2.3" | "F2.4" | "F2.5" | "F2.6" -> 26
            | "F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6" -> 26
            | "F4" -> (bigsize-1)/81+1
            | _ -> failwith "unknown fcase"
        )
    | "R4" ->
        ( match fcase with
            | "F1" -> 0
            | "F2.1" | "F2.2" -> 23
            | "F2.3" -> if format=`Ls then 25 else 24
            | "F2.4" -> 25
            | "F2.5" -> 25
            | "F2.6" -> if format=`Ls then 26 else 25
            | "F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6" -> 24
            | "F4" -> 0
            | _ -> failwith "unknown fcase"
        )
    | "R5" ->
        ( match fcase with
            | "F1" -> assert false   (* R5 + F1 is never run *)
            | "F2.1" | "F2.2" | "F2.3" -> 2
            | "F2.4" | "F2.5" | "F2.6" -> 1
            | "F3.4" -> 3
            | "F3.5" -> if format=`Ls then 3 else 2
            | "F3.1" | "F3.2" | "F3.3" | "F3.6" -> 2
            | "F4" -> 0
            | _ -> failwith "unknown fcase"
        )
    | _ -> failwith "unknown rcase"

(* Runs one test: the region [rcase] of the file for [fcase] (file name
   plus [suffix]) is read record by record with the reader created by
   [rd_factory], using the buffer size [mr_buf_size]. Every record is
   checked with check_line, and the number of records is compared with
   expected_num_lines.

   If [strip_lf] is set, the records are expected to end with LF, which
   is removed before the contents are checked (empty records are not
   checked at all). Otherwise the records are checked as returned.

   A Failure is reported on stdout and counted in [problems]. Other
   exceptions are not caught here.
 *)
let run_test rd_factory strip_lf format fs mr_buf_size fcase rcase suffix =
  printf "Test: %s + %s for buf_size=%d\n%!" rcase fcase mr_buf_size;
  try
    let name = name_fcase fcase ^ suffix in
    let bsize = fs # blocksize !testdir in
    let bigsize = get_bigsize bsize format in
    let rc = create_rc (2*bsize) mr_buf_size in
    let reqlen = request_length fs name rcase in
    let reqoff = request_offset fs name rcase in
    let lineoff = line_offset fcase rcase format in
    printf " reqoff=%Ld reqlen=%Ld lineoff=%d\n%!" reqoff reqlen lineoff;
    let shm_mng = new Plasma_shm.null_shm_manager() in
    let rr =
      rd_factory # read_file [] fs shm_mng rc name reqoff reqlen in
    ( try
        while true do
          (* pos_in before reading is the index of the record we get next *)
          let k = rr#pos_in in
          let line = rr#input_record () in
          if strip_lf then (
            let l = String.length line in
            if l > 0 then (
              if line.[l-1] <> '\n' then
                failwith "Line does not end with LF";
              check_line (lineoff+k) (String.sub line 0 (l-1))
            )
          )
          else
            check_line (lineoff+k) line
        done
      with
        | End_of_file -> ()
    );
    rr # close_in();
    let n = expected_num_lines fcase rcase format bigsize in
    if n <> rr#pos_in then
      failwith
        (sprintf "bad number of lines - expected: %d - actual: %d"
           n rr#pos_in);
    printf " test passed\n%!"
  with
    | Failure msg ->
        printf " TEST FAILED: %s\n%!" msg;
        incr problems

(* Runs one test of the to_fd_e method of the line-structured reader:
   the requested region of file [name] is pumped into a pipe, and the
   lines are read back from the other end of the pipe with Uq_io and
   checked with check_line. Finally, the number of lines is compared with
   expected_num_lines.

   A Failure is reported on stdout and counted in [problems], like in
   run_test. Other exceptions are not caught here.
 *)
let run_test_fd_1 fs cluster mr_buf_size fcase rcase name =
  let esys = Plasma_client.event_system cluster in
  try
    let bsize = fs # blocksize !testdir in
    let bigsize = get_bigsize bsize `Ls in
    let rc = create_rc (2*bsize) mr_buf_size in
    let reqlen = request_length fs name rcase in
    let reqoff = request_offset fs name rcase in
    let lineoff = line_offset fcase rcase `Ls in
    printf " reqoff=%Ld reqlen=%Ld lineoff=%d\n%!" reqoff reqlen lineoff;
    let shm_mng = new Plasma_shm.null_shm_manager() in
    let rr =
      (Mapred_io.line_structured_format()) # read_file
        [] fs shm_mng rc name reqoff reqlen in
    let (fd_rd, fd_wr) = Unix.pipe() in
    Unix.set_nonblock fd_rd;
    Unix.set_nonblock fd_wr;
    let fd_wr_closed = ref false in
    let d1 = `Polldescr(`Read_write, fd_rd, esys) in
    let d2 = `Buffer_in(Uq_io.create_in_buffer d1) in
    (* Writer side: when all data is pumped, the write end is closed so
       that the reader side sees EOF
     *)
    let pump_e =
      rr # to_fd_e fd_wr esys
      ++ (fun () ->
            Unix.close fd_wr;
            fd_wr_closed := true;
            eps_e (`Done ()) esys) in
    (* Reader side: checks line after line; the result is the number of
       lines read until EOF
     *)
    let rec read_e k =
      (Uq_io.input_line_e d2
       >> Uq_io.eof_as_none
      )
      ++ (fun line_opt ->
            match line_opt with
              | None ->   (* EOF *)
                  eps_e (`Done k) esys
              | Some line ->
                  check_line (lineoff + k) line;
                  read_e (k + 1)
         ) in
    let e =
      Uq_engines.sync_engine pump_e (read_e 0)
      ++ (fun ((), k_end) ->
            let n = expected_num_lines fcase rcase `Ls bigsize in
            if n <> k_end then
              failwith
                (sprintf "bad number of lines - expected: %d - actual: %d"
                   n k_end);
            eps_e (`Done ()) esys
         ) in
    Unixqueue.run esys;
    ( match e#state with
        | `Working _ -> assert false
        | `Aborted -> assert false
        | `Error err -> raise err
        | `Done () -> ()
    );
    Unix.close fd_rd;
    if not !fd_wr_closed then Unix.close fd_wr;
    rr # close_in();
    printf " test passed\n%!"
  with
    | Failure msg ->
        printf " TEST FAILED: %s\n%!" msg;
        (* count the failure so that it shows up in the final summary *)
        incr problems

(* Runs the file descriptor test for [fcase] and [rcase] if the file is
   accessible via a cluster (fs # open_cluster returns a cluster).
   Otherwise the test is skipped.
 *)
let run_test_fd fs mr_buf_size fcase rcase =
  printf "FD Test: %s + %s for buf_size=%d\n%!" rcase fcase mr_buf_size;
  let esys = Unixqueue.create_unix_event_system() in
  let name = name_fcase fcase in
  match fs # open_cluster name esys with
    | None ->
        printf " not PlasmaFS - skipping test\n%!"
    | Some cluster ->
        run_test_fd_1 fs cluster mr_buf_size fcase rcase name

(* Runs all tests for the line-structured format, each one followed by
   the corresponding file descriptor test
 *)
let run_ls fs =
  printf "*** LINE BY LINE ***\n%!";
  let bsize = fs # blocksize !testdir in
  let factory = Mapred_io.line_structured_format() in
  iter_test_cases
    bsize
    (fun bufsize fcase rcase ->
       run_test factory false `Ls fs bufsize fcase rcase "";
       run_test_fd fs bufsize fcase rcase
    )

(* Runs the tests for the fixed-size format. Only F1 and F2 are tested;
   the record size is the line length of the respective file (80 for the
   empty file F1). The files of the line-structured tests are read.
 *)
let run_fs fs =
  printf "*** FIXED RECORD SIZE ***\n%!";
  let bsize = fs # blocksize !testdir in
  let bigsize = 2*bsize in
  let f2_cases =
    [ "F2.1", (bigsize/2-2);
      "F2.2", (bigsize/2-1);
      "F2.3", (bigsize/2);
      "F2.4", (bigsize-2);
      "F2.5", (bigsize-1);
      "F2.6", bigsize;
    ] in
  List.iter
    (fun bufsize ->
       List.iter
         (fun rcase ->
            (* F1 *)
            if rcase <> "R5" then (
              let f1_factory = Mapred_io.fixed_size_format 80 in
              run_test f1_factory true `Fs fs bufsize "F1" rcase ""
            );
            (* F2 *)
            List.iter
              (fun (fcase,size) ->
                 let factory = Mapred_io.fixed_size_format size in
                 run_test factory true `Fs fs bufsize fcase rcase ""
              )
              f2_cases
         )
         r_cases
    )
    (buffer_sizes bsize)

(* Runs all tests for the var-size format (files with suffix .vs) *)
let run_vs fs =
  printf "*** VARIABLE RECORD SIZE ***\n%!";
  let bsize = fs # blocksize !testdir in
  let factory = Mapred_io.var_size_format () in
  iter_test_cases
    bsize
    (fun bufsize fcase rcase ->
       run_test factory true `Vs fs bufsize fcase rcase ".vs"
    )

(* Runs all tests for the var-size format with the files written with
   double chunk length (files with suffix .vs2)
 *)
let run_vs2 fs =
  printf "*** VARIABLE RECORD SIZE (DOUBLE CHUNK LENGTH) ***\n%!";
  let bsize = fs # blocksize !testdir in
  let factory = Mapred_io.var_size_format () in
  iter_test_cases
    bsize
    (fun bufsize fcase rcase ->
       run_test factory true `Vs2 fs bufsize fcase rcase ".vs2"
    )

(* Creates the test files needed for the enabled formats, runs the
   enabled test groups, and prints the number of failed tests. If
   [disable_cluster] is set no client configuration is passed to
   Mapred_fs.standard_fs_cc; otherwise it is obtained from
   Plasma_client_config.get_config with [clustername] and [nn_nodes].
 *)
let run ?clustername ?nn_nodes ~disable_cluster ~en_ls ~en_fs ~en_vs ~en_vs2
        () =
  let client_config =
    if disable_cluster then
      None
    else
      Some (Plasma_client_config.get_config
              ?clustername
              ?nn_nodes
              ()
           ) in
  let fs =
    Mapred_fs.standard_fs_cc
      ?client_config
      ~configure_cluster:(fun cluster ->
                            Plasma_client.configure_buffer cluster 100;
                            Plasma_client.configure_auth_daemon cluster;
                         )
      () in
  printf "Creating files...\n%!";
  (* Create files. The fixed-size tests read the line-structured files. *)
  List.iter
    (fun fcase ->
       if en_ls || en_fs then create_fcase fs fcase `Ls;
       if en_vs then create_fcase fs fcase `Vs;
       if en_vs2 then create_fcase fs fcase `Vs2;
    )
    f_cases;
  if en_ls then run_ls fs;
  if en_fs then run_fs fs;
  if en_vs then run_vs fs;
  if en_vs2 then run_vs2 fs;
  printf "End of test sequence.\n%!";
  if !problems = 0 then
    printf "No errors.\n%!"
  else
    printf "%d errors!\n%!" !problems

(* Parses the command line and runs the tests. If none of -ls, -fs, -vs,
   -vs2 is given, all four test groups are run. An exception escaping
   from [run] is printed to stderr together with the backtrace and the
   number of failed tests so far.
 *)
let main () =
  let clustername = ref None in
  let nn_nodes = ref [] in
  let debug = ref false in
  let en_ls = ref false in
  let en_fs = ref false in
  let en_vs = ref false in
  let en_vs2 = ref false in
  let disable_cluster = ref false in
  Arg.parse
    [ "-namenode", Arg.String (fun s -> nn_nodes := s :: !nn_nodes),
      "<host>:<port> Access this namenode";

      "-cluster", Arg.String (fun s -> clustername := Some s),
      "<name> name of the cluster";

      "-disable-cluster", Arg.Set disable_cluster,
      " Disable PlasmaFS entirely";

      "-testdir", Arg.Set_string testdir,
      "<dir> in which directory to run the tests";

      "-ls", Arg.Set en_ls,
      " run the tests with line-structured files";

      "-fs", Arg.Set en_fs,
      " run the tests with fixed-size record";

      "-vs", Arg.Set en_vs,
      " run the tests with variable-size record";

      "-vs2", Arg.Set en_vs2,
      " run the tests with variable-size record (double chunk len)";

      "-debug", Arg.Set debug,
      " run with debugging enabled";
    ]
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)))
    "usage: mr_io <options>";

  Netlog.current_logger :=
    Netlog.channel_logger stderr (if !debug then `Debug else `Info);
  Plasma_util.debug := !debug;

  (* no group selected: run everything *)
  if not !en_ls && not !en_fs && not !en_vs && not !en_vs2 then (
    en_ls := true;
    en_fs := true;
    en_vs := true;
    en_vs2 := true;
  );

  Printexc.record_backtrace true;
  Mapred_io.more_logging := true;

  try
    run
      ?clustername:!clustername
      ?nn_nodes:(if !nn_nodes = [] then None else Some !nn_nodes)
      ~disable_cluster:!disable_cluster
      ~en_ls:!en_ls
      ~en_fs:!en_fs
      ~en_vs:!en_vs
      ~en_vs2:!en_vs2
      ()
  with
    | error ->
        let bt = Printexc.get_backtrace() in
        eprintf "Exception %s\n%!" (Netexn.to_string error);
        eprintf "Backtrace: %s\n%!" bt;
        eprintf "Before, there were %d errors.\n%!" !problems

let () = main()