(* $Id: mr_io.ml 274 2010-10-24 15:04:09Z gerd $ *)

(* Testing Mapred_io.read_file *)

(* We create test files:

   F1: empty file

   F2: fill the file with 26 lines. Each line has a certain length:
     F2.1: bigblock/2 - 2
     F2.2: bigblock/2 - 1
     F2.3: bigblock/2
     F2.4: bigblock - 2
     F2.5: bigblock - 1
     F2.6: bigblock
   (all line lengths include the final LF)

   F3: fill the file with a first line of a specific length, followed by
   26 lines of a different length:
     F3.1: 1 and bigblock/2 - 3
     F3.2: 2 and bigblock/2 - 3
     F3.3: 3 and bigblock/2 - 3
     F3.4: 11 and bigblock/2 - 3
     F3.5: 12 and bigblock/2 - 3
     F3.6: 13 and bigblock/2 - 3

   Requested regions:
     R1: request nothing
     R2: request the complete file
     R3: request all whole bigblocks (but not the trailing partial bigblock)
     R4: request n-1 whole bigblocks, where n is the number of whole bigblocks

   We always use bigblock = 2*block. Tests are repeated with various
   buffer sizes, which should not affect the outcome.

   Expected outcome: the test checks that a certain number of lines N
   are read, and that these lines contain the right contents.
R1 + all F: N=0 R2 + F1: N=0 R2 + F2: N=26 R2 + F3: N=27 R3 + F1: N=0 (actually nonsense - no whole bigblocks) R3 + F2.1, F2.2: N=25 R3 + F2.3, F2.4, F2.5, F2.6: N=26 R3 + F3: N=26 R4 + F1: N=0 (actually nonsense - no whole bigblocks) R4 + F2.1, F2.2: N=23 R4 + F2.3: N=25 R4 + F2.4, F2.5: N=25 R4 + F2.6: N=26 R4 + F3: N=24 *) open Printf open Plasma_rpcapi_aux let f_cases = [ "F1"; "F2.1"; "F2.2"; "F2.3"; "F2.4"; "F2.5"; "F2.6"; "F3.1"; "F3.2"; "F3.3"; "F3.4"; "F3.5"; "F3.6"; ] let r_cases = [ "R1"; "R2"; "R3"; "R4" ] let write_file cluster name s = let trans = Plasma_client.start cluster in ( try Plasma_client.unlink trans name with Plasma_client.Plasma_error `enoent -> () ); let ii = Plasma_client.regular_ii cluster 0o666 in let inode = Plasma_client.create_file trans name ii in let () = Plasma_client.commit trans in let rec wrloop p = let l = String.length s - p in if l > 0 then ( let n = Plasma_client.write cluster inode (Int64.of_int p) (`String s) p l in wrloop (p+n) ) in wrloop 0; Plasma_client.flush cluster inode 0L 0L let create_line k l = (* line number k of the file (k>=0). 
l is the length including LF *) assert(l > 0); String.make (l-1) (Char.chr (64+k)) ^ "\n" let check_line k line = let c = Char.chr (64+k) in for j = 0 to String.length line - 1 do if line.[j] <> c then failwith "bad line" done let name_fcase fcase = "/mr_io_" ^ fcase let create_fcase cluster fcase = let name = name_fcase fcase in let b = Buffer.create 500 in let bsize = Plasma_client.blocksize cluster in let bigsize = 2*bsize in let f2 n = for k = 0 to 25 do let line = create_line k n in Buffer.add_string b line done in let f3 n1 n2 = let line = create_line 0 n1 in Buffer.add_string b line; for k = 1 to 26 do let line = create_line k n2 in Buffer.add_string b line done in ( match fcase with | "F1" -> () | "F2.1" -> f2 (bigsize/2-2) | "F2.2" -> f2 (bigsize/2-1) | "F2.3" -> f2 (bigsize/2-0) | "F2.4" -> f2 (bigsize-2) | "F2.5" -> f2 (bigsize-1) | "F2.6" -> f2 (bigsize-0) | "F3.1" -> f3 1 (bigsize/2-3) | "F3.2" -> f3 2 (bigsize/2-3) | "F3.3" -> f3 3 (bigsize/2-3) | "F3.4" -> f3 11 (bigsize/2-3) | "F3.5" -> f3 12 (bigsize/2-3) | "F3.6" -> f3 13 (bigsize/2-3) | _ -> failwith "unknown fcase" ); let s = Buffer.contents b in write_file cluster name s let request_length cluster name rcase = let trans = Plasma_client.start cluster in let ii = Plasma_client.get_inodeinfo trans (Plasma_client.lookup trans name false) in let () = Plasma_client.abort trans in let bsize = Plasma_client.blocksize cluster in let bigsize = 2*bsize in match rcase with | "R1" -> 0L | "R2" -> ii.blocklimit | "R3" -> Int64.mul 2L (Int64.div ii.eof (Int64.of_int bigsize)) | "R4" -> max (Int64.mul 2L (Int64.pred (Int64.div ii.eof (Int64.of_int bigsize)))) 0L | _ -> failwith "unknown rcase" let expected_num_lines fcase rcase = match rcase with | "R1" -> ( match fcase with | "F1" -> 0 | _ -> 1 ) | "R2" -> ( match fcase with | "F1" -> 0 | "F2.1" | "F2.2" | "F2.3" | "F2.4" | "F2.5" | "F2.6" -> 26 | "F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6" -> 27 | _ -> failwith "unknown fcase" ) | "R3" -> ( match 
fcase with | "F1" -> 0 | "F2.1" | "F2.2" -> 25 | "F2.3" | "F2.4" | "F2.5" | "F2.6" -> 26 | "F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6" -> 26 | _ -> failwith "unknown fcase" ) | "R4" -> ( match fcase with | "F1" -> 0 | "F2.1" | "F2.2" -> 23 | "F2.3" -> 25 | "F2.4" | "F2.5" -> 25 | "F2.6" -> 26 | "F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6" -> 24 | _ -> failwith "unknown fcase" ) | _ -> failwith "unknown rcase" let run_test cluster mr_buf_size fcase rcase = printf "Test: %s + %s for buf_size=%d\n%!" rcase fcase mr_buf_size; try let name = name_fcase fcase in let bsize = Plasma_client.blocksize cluster in let rc = ( object method bigblock_size = 2*bsize method mr_buffer_size = mr_buf_size method mr_buffer_size_tight = mr_buf_size end ) in let reqlen = request_length cluster name rcase in printf " reqlen=%Ld\n%!" reqlen; let rr = Mapred_io.read_file cluster rc name 0L reqlen in ( try while true do let k = rr#pos_in in let line = rr#input_record () in check_line k line done with | End_of_file -> () ); let n = expected_num_lines fcase rcase in if n <> rr#pos_in then failwith (sprintf "bad number of lines - expected: %d - actual: %d" n rr#pos_in); printf " test passed\n%!" with | Failure msg -> printf " TEST FAILED: %s\n%!" msg let run ?clustername ?nn_nodes() = let cconf = Plasma_client_config.get_config ?clustername ?nn_nodes() in let esys = Unixqueue.create_unix_event_system() in let cluster = Plasma_client.open_cluster_cc cconf esys in Plasma_client.configure_buffer cluster 100; printf "Creating files...\n%!"; (* Create files *) List.iter (fun fcase -> create_fcase cluster fcase) f_cases; let bsize = Plasma_client.blocksize cluster in List.iter (fun bufsize -> List.iter (fun rcase -> List.iter (fun fcase -> run_test cluster bufsize fcase rcase ) f_cases ) r_cases ) [ 2*bsize; 3*bsize; 8*bsize ]; printf "End of test sequence.\n%!" 
(* Command-line entry point: parses the options, installs a logger on
   stderr (debug level with -debug, info level otherwise) and runs the
   test sequence. *)
let main () =
  let clustername = ref None in
  let nn_nodes = ref [] in
  let debug = ref false in
  Arg.parse
    (* The doc strings follow the Arg.align convention: the argument
       placeholder comes first, and flag options start with a space. *)
    (Arg.align
       [ "-namenode", Arg.String (fun s -> nn_nodes := s :: !nn_nodes),
         "<host>:<port> Access this namenode";

         "-cluster", Arg.String (fun s -> clustername := Some s),
         "<name> name of the cluster";

         "-debug", Arg.Set debug,
         " run with debugging enabled";
       ])
    (fun s -> raise (Arg.Bad ("Unexpected arg: " ^ s)))
    "usage: mr_io <options>";
  Netlog.current_logger :=
    Netlog.channel_logger stderr (if !debug then `Debug else `Info);
  (* -namenode values were prepended while parsing; reverse the list so
     the namenodes are passed on in command-line order. *)
  let nn_nodes =
    match List.rev !nn_nodes with
      | [] -> None
      | nodes -> Some nodes in
  run ?clustername:!clustername ?nn_nodes ()

let () = main ()