(* $Id: mr_io.ml 576 2012-01-15 02:02:18Z gerd $ *)
(* Testing Mapred_io.read_file *)
(* We create test files:
F1: empty file
F2: fill the file with 26 lines. Each line has a certain length:
F2.1: bigblock/2 - 2
F2.2: bigblock/2 - 1
F2.3: bigblock/2
F2.4: bigblock - 2
F2.5: bigblock - 1
F2.6: bigblock
(line length includes here the final LF)
For var-sized format: bigblock = bigblock size - chunk headers
F3: fill the file with a first line of specific length, followed by
26 lines with a different length:
F3.1: 1 and bigblock/2 - 3
F3.2: 2 and bigblock/2 - 3
F3.3: 3 and bigblock/2 - 3
F3.4: 11 and bigblock/2 - 3
F3.5: 12 and bigblock/2 - 3
F3.6: 13 and bigblock/2 - 3
F4: fill the file with short records only (81 bytes each). The number of
lines is (bigblock-1)/81+1.
Requested region:
R1: request nothing
R2: request complete file
R3: request all whole bigblocks (but not the trailing partial bigblock)
R4: request n-1 whole bigblocks
R5: request bigblock #1 (i.e. the second bigblock)
We always use bigblock=2*block.
Tests are repeated with various buffer sizes (which should not affect
the outcome).
Expected outcome: The test checks that a certain number of lines N are
read, and that these lines contain the right contents.
R1 + all F: N=0
R2 + F1: N=0
R2 + F2: N=26
R2 + F3: N=27
R3 + F1: N=0 (actually nonsense - no whole bigblocks)
R3 + F2.1, F2.2: N=25
R3 + F2.3, F2.4, F2.5, F2.6: N=26
R3 + F3: N=26
R4 + F1: N=0 (actually nonsense - no whole bigblocks)
R4 + F2.1, F2.2: N=23
R4 + F2.3: N=25.
However, for fixed_size and var_size: N=24
R4 + F2.4, F2.5: N=25
R4 + F2.6: N=26
However, for fixed_size and var_size: N=25
R4 + F3: N=24
R5 + F1: invalid
R5 + F2.1, F2.2: N=2
R5 + F2.3: N=3
R5 + F2.4, 2.5, 2.6: N=1
R5 + F3: N=2
*)
open Printf
open Plasma_rpcapi_aux
open Uq_engines.Operators
(* Number of failed tests recorded so far; reported at the end of the run. *)
let problems : int ref = ref 0

(* Directory in which the test files are created (set by -testdir). *)
let testdir : string ref = ref "/"

(* All file cases (see the header comment for their layout). *)
let f_cases =
  let f2 = [ "F2.1"; "F2.2"; "F2.3"; "F2.4"; "F2.5"; "F2.6" ] in
  let f3 = [ "F3.1"; "F3.2"; "F3.3"; "F3.4"; "F3.5"; "F3.6" ] in
  "F1" :: f2 @ f3 @ [ "F4" ]

(* All requested-region cases (see the header comment). *)
let r_cases =
  [ "R1"; "R2"; "R3"; "R4"; "R5" ]
(* Minimal configuration object passed to the readers and writers:
   bigblock size [bbsize]; [mr_buf_size] serves both as the normal and as
   the "tight" buffer size. *)
let create_rc bbsize mr_buf_size =
  let tight = mr_buf_size in
  object
    method bigblock_size = bbsize
    method mr_buffer_size = mr_buf_size
    method mr_buffer_size_tight = tight
  end
(* (Re)create [name] through Mapred_io.create_file after removing any
   existing file of that name. A missing file (ENOENT) is ignored; any
   other removal error propagates. *)
let create_file fs name =
  begin
    try fs # remove [] name
    with Unix.Unix_error (Unix.ENOENT, _, _) -> ()
  end;
  Mapred_io.create_file fs name
(* (Re)create [name] and write the string [s] as its whole contents. *)
let write_file fs name s =
  create_file fs name;
  let out = fs # write [] name in
  out # output_string s;
  out # close_out ()
(* (Re)create [name] and write each element of [lines] as one var-size
   record. The chunk length is [chunklen_factor] filesystem blocks; the
   writer config uses a bigblock of 2 blocks and a buffer of 64 blocks. *)
let vs_write_file chunklen_factor fs name lines =
  create_file fs name;
  let bsize = fs # blocksize !testdir in
  let rc = create_rc (2*bsize) (64 * bsize) in
  let shm_mng = new Plasma_shm.null_shm_manager() in
  let chunklen = chunklen_factor * bsize in
  let writer =
    new Mapred_io.vs_write_file ~chunklen fs shm_mng rc name in
  List.iter (fun record -> writer # output_record record) lines;
  writer # close_out ()
(* Line number [k] (k >= 0) of a test file, of total length [l] including
   the final LF. The line is filled with the character 64 + (k mod 62), so
   check_line can identify it. [l = 0] yields the empty string. *)
let create_line k l =
  match l with
  | 0 -> ""
  | _ ->
      let fill = Char.chr (64 + k mod 62) in
      String.make (l - 1) fill ^ "\n"
(* Verify that every character of [line] is the fill character of line
   number [k] (see create_line).
   @raise Failure at the first mismatching character. *)
let check_line k line =
  let expected = Char.chr (64 + k mod 62) in
  String.iter
    (fun found ->
       if found <> expected then
         failwith (sprintf "bad line: expected #%d found #%d"
                     (k mod 62)
                     ((Char.code found - 64) mod 62)))
    line
(* Path of the base test file for [fcase] inside !testdir. *)
let name_fcase fcase =
  String.concat "" [ !testdir; "/mr_io_"; fcase ]
(* Return the _inner_ bigsize, i.e. how many payload bytes fit into one
   bigblock (= 2 blocks of [bsize] bytes) for [format]. The var-size
   formats lose 32 bytes of chunk header per chunk: `Vs files are written
   with a chunk length of one block (two headers per bigblock), `Vs2 files
   with a chunk length of two blocks (one header per bigblock). *)
let get_bigsize bsize format =
  let chunk_header = 32 in
  match format with
  | `Vs -> 2 * bsize - 2 * chunk_header
  | `Vs2 -> 2 * bsize - chunk_header
  | _ -> 2 * bsize
(* Create the test file for [fcase] (see the header comment) in [format].
   `Vs and `Vs2 files get the suffix ".vs" / ".vs2" and are written as
   var-size records with a chunk length of one / two blocks. Any other
   format produces a plain file of concatenated LF-terminated lines.
   Line lengths are derived from the payload bigblock size (get_bigsize). *)
let create_fcase fs fcase format =
  let name = name_fcase fcase in
  let bsize = fs # blocksize !testdir in
  let bigsize = get_bigsize bsize format in
  let is_vs = (match format with `Vs | `Vs2 -> true | _ -> false) in
  (* Line [k] occupying [n] bytes in the file. For var-size files the
     bytes presumably taken by the record-length prefix (1 byte up to 255,
     9 bytes from 264 on) are subtracted; 256..263 cannot be produced. *)
  let cr_line k n =
    if not is_vs then create_line k n
    else if n <= 255 then create_line k (n - 1)
    else if n >= 264 then create_line k (n - 9)
    else failwith "cr_line: cannot create this length" in
  (* Lines are accumulated in reverse order *)
  let acc = ref [] in
  let add k n = acc := cr_line k n :: !acc in
  let f2 n =
    for k = 0 to 25 do add k n done in
  let f3 n1 n2 =
    add 0 n1;
    for k = 1 to 26 do add k n2 done in
  let f4 size =
    for k = 0 to (size - 1) / 81 do add k 81 done in
  ( match fcase with
    | "F1" -> ()
    | "F2.1" -> f2 (bigsize/2 - 2)
    | "F2.2" -> f2 (bigsize/2 - 1)
    | "F2.3" -> f2 (bigsize/2)
    | "F2.4" -> f2 (bigsize - 2)
    | "F2.5" -> f2 (bigsize - 1)
    | "F2.6" -> f2 bigsize
    | "F3.1" -> f3 1 (bigsize/2 - 3)
    | "F3.2" -> f3 2 (bigsize/2 - 3)
    | "F3.3" -> f3 3 (bigsize/2 - 3)
    | "F3.4" -> f3 11 (bigsize/2 - 3)
    | "F3.5" -> f3 12 (bigsize/2 - 3)
    | "F3.6" -> f3 13 (bigsize/2 - 3)
    | "F4" -> f4 bigsize
    | _ -> failwith "unknown fcase"
  );
  let lines = List.rev !acc in
  match format with
  | `Vs -> vs_write_file 1 fs (name ^ ".vs") lines
  | `Vs2 -> vs_write_file 2 fs (name ^ ".vs2") lines
  | _ -> write_file fs name (String.concat "" lines)
(* Length, in blocks, of the region to request for [rcase] when reading
   [name]:
   - R1: nothing
   - R2: every block of the file
   - R3: every whole bigblock (2 blocks each)
   - R4: every whole bigblock except the last one (never negative)
   - R5: at most 2 blocks (one bigblock)
   NOTE(review): for R5 the result is negative when the file is at most
   one bigblock long (possibly the case for F4) -- confirm the readers
   accept this. *)
let request_length fs name rcase =
  let eof = fs # size [] name in
  let bsize = fs # blocksize !testdir in
  (* ceiling of eof / unit, i.e. number of started units *)
  let started unitL =
    if eof = 0L then 0L
    else Int64.succ (Int64.div (Int64.pred eof) unitL) in
  let bigsizeL = Int64.of_int (2 * bsize) in
  let n_blocks = started (Int64.of_int bsize) in
  let n_bigblocks = started bigsizeL in
  let whole_bigblocks () = Int64.div eof bigsizeL in
  match rcase with
  | "R1" -> 0L
  | "R2" -> n_blocks
  | "R3" -> Int64.mul 2L (whole_bigblocks ())
  | "R4" -> max (Int64.mul 2L (Int64.pred (whole_bigblocks ()))) 0L
  | "R5" -> min 2L (Int64.sub n_bigblocks 2L)
  | _ -> failwith "unknown rcase"
(* Block at which reading begins for [rcase]: block 2 (the start of the
   second bigblock) for R5, the file start otherwise. The filesystem and
   file name are not consulted. *)
let request_offset _fs _name rcase =
  if List.mem rcase [ "R1"; "R2"; "R3"; "R4" ] then 0L
  else if rcase = "R5" then 2L
  else failwith "unknown rcase"
(* Line number of the first record read for [fcase] and region [rcase].
   Only R5 starts in the middle of the file. There, the first line depends
   on the file layout and, for F2.3 and F2.6, on whether [format] is `Ls.
   R5 + F1 is invalid. *)
let line_offset fcase rcase format =
  let is_ls = (format = `Ls) in
  match rcase, fcase with
  | ("R1" | "R2" | "R3" | "R4"), _ -> 0
  | "R5", "F1" -> assert false
  | "R5", ("F2.1" | "F2.2") -> 3
  | "R5", "F2.3" -> if is_ls then 3 else 2
  | "R5", ("F2.4" | "F2.5") -> 2
  | "R5", "F2.6" -> if is_ls then 2 else 1
  | "R5", ("F3.1" | "F3.2" | "F3.3") -> 4
  | "R5", ("F3.4" | "F3.5" | "F3.6") -> 3
  | "R5", "F4" -> 0
  | "R5", _ -> failwith "unknown fcase"
  | _ -> failwith "unknown rcase"
(* Number of records that reading [fcase] with region [rcase] must yield.
   [bigsize] is the payload bigblock size and only matters for F4. Some
   counts differ between `Ls and the record formats. R5 + F1 is invalid
   (assertion failure). *)
let expected_num_lines fcase rcase format bigsize =
  let is_ls = (format = `Ls) in
  let f4_lines = (bigsize - 1) / 81 + 1 in
  match rcase, fcase with
  | "R1", _ -> 0
  | ("R2" | "R3" | "R4"), "F1" -> 0
  | "R2", ("F2.1" | "F2.2" | "F2.3" | "F2.4" | "F2.5" | "F2.6") -> 26
  | "R2", ("F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6") -> 27
  | ("R2" | "R3"), "F4" -> f4_lines
  | "R3", ("F2.1" | "F2.2") -> 25
  | "R3", ("F2.3" | "F2.4" | "F2.5" | "F2.6") -> 26
  | "R3", ("F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6") -> 26
  | "R4", ("F2.1" | "F2.2") -> 23
  | "R4", "F2.3" -> if is_ls then 25 else 24
  | "R4", ("F2.4" | "F2.5") -> 25
  | "R4", "F2.6" -> if is_ls then 26 else 25
  | "R4", ("F3.1" | "F3.2" | "F3.3" | "F3.4" | "F3.5" | "F3.6") -> 24
  | ("R4" | "R5"), "F4" -> 0
  | "R5", "F1" -> assert false
  | "R5", ("F2.1" | "F2.2" | "F2.3") -> 2
  | "R5", ("F2.4" | "F2.5" | "F2.6") -> 1
  | "R5", "F3.4" -> 3
  | "R5", "F3.5" -> if is_ls then 3 else 2
  | "R5", ("F3.1" | "F3.2" | "F3.3" | "F3.6") -> 2
  | ("R2" | "R3" | "R4" | "R5"), _ -> failwith "unknown fcase"
  | _ -> failwith "unknown rcase"
(* Run one read test.
   - Opens [name_fcase fcase ^ suffix] through [rd_factory], restricted to
     the block region given by [rcase].
   - Verifies every record with check_line.
   - Compares the number of records read with expected_num_lines.
   If [strip_lf], every non-empty record must end with LF, which is removed
   before the check.
   A [Failure] is reported as a failed test and counted in [problems];
   other exceptions propagate. The reader is closed even when a check
   fails, so that failing tests do not leak it. *)
let run_test rd_factory strip_lf format fs mr_buf_size fcase rcase suffix =
  printf "Test: %s + %s for buf_size=%d\n%!" rcase fcase mr_buf_size;
  try
    let name = name_fcase fcase ^ suffix in
    let bsize = fs # blocksize !testdir in
    let bigsize = get_bigsize bsize format in
    let rc = create_rc (2*bsize) mr_buf_size in
    let reqlen = request_length fs name rcase in
    let reqoff = request_offset fs name rcase in
    let lineoff = line_offset fcase rcase format in
    printf " reqoff=%Ld reqlen=%Ld lineoff=%d\n%!" reqoff reqlen lineoff;
    let shm_mng = new Plasma_shm.null_shm_manager() in
    let rr =
      rd_factory # read_file [] fs shm_mng rc name reqoff reqlen in
    (* Read and check records until End_of_file *)
    let read_all () =
      try
        while true do
          (* pos_in is used as the index of the record about to be read *)
          let k = rr#pos_in in
          let line = rr#input_record () in
          if strip_lf then (
            let l = String.length line in
            if l > 0 then (
              if line.[l-1] <> '\n' then
                failwith "Line does not end with LF";
              check_line (lineoff+k) (String.sub line 0 (l-1))
            )
          )
          else
            check_line (lineoff+k) line
        done
      with
        | End_of_file -> () in
    ( try read_all ()
      with error ->
        (* Release the reader before propagating. A secondary error from
           close_in must not mask the original one. *)
        ( try rr # close_in() with _ -> () );
        raise error
    );
    rr # close_in();
    let n = expected_num_lines fcase rcase format bigsize in
    if n <> rr#pos_in then
      failwith (sprintf "bad number of lines - expected: %d - actual: %d"
                  n rr#pos_in);
    printf " test passed\n%!"
  with
    | Failure msg ->
        printf " TEST FAILED: %s\n%!" msg;
        incr problems
(* FD variant of the line-structured test for [fcase] / [rcase] on the
   PlasmaFS file [name].
   - The reader's to_fd_e engine pumps the requested region into a pipe.
   - The lines are read back asynchronously from the other end of the pipe
     and verified with check_line.
   - The number of lines is compared with expected_num_lines.
   Like run_test, a [Failure] is reported as a failed test and counted in
   [problems]. Both pipe ends and the reader are released on success and
   on failure. *)
let run_test_fd_1 fs cluster mr_buf_size fcase rcase name =
  let esys = Plasma_client.event_system cluster in
  try
    let bsize = fs # blocksize !testdir in
    let bigsize = get_bigsize bsize `Ls in
    let rc = create_rc (2*bsize) mr_buf_size in
    let reqlen = request_length fs name rcase in
    let reqoff = request_offset fs name rcase in
    let lineoff = line_offset fcase rcase `Ls in
    printf " reqoff=%Ld reqlen=%Ld lineoff=%d\n%!" reqoff reqlen lineoff;
    let shm_mng = new Plasma_shm.null_shm_manager() in
    let rr =
      (Mapred_io.line_structured_format())
        # read_file [] fs shm_mng rc name reqoff reqlen in
    let (fd_rd, fd_wr) = Unix.pipe() in
    (* Set once the pump engine has closed the write end itself *)
    let fd_wr_closed = ref false in
    let cleanup () =
      Unix.close fd_rd;
      if not !fd_wr_closed then Unix.close fd_wr;
      rr # close_in() in
    ( try
        Unix.set_nonblock fd_rd;
        Unix.set_nonblock fd_wr;
        let d1 = `Polldescr(`Read_write, fd_rd, esys) in
        let d2 = `Buffer_in(Uq_io.create_in_buffer d1) in
        let pump_e =
          rr # to_fd_e fd_wr esys
          ++ (fun () ->
                Unix.close fd_wr; fd_wr_closed := true; eps_e (`Done ()) esys) in
        (* Read lines until EOF; yields the number of lines read *)
        let rec read_e k =
          (Uq_io.input_line_e d2 >> Uq_io.eof_as_none)
          ++ (fun line_opt ->
                match line_opt with
                  | None -> (* EOF *)
                      eps_e (`Done k) esys
                  | Some line ->
                      check_line (lineoff + k) line;
                      read_e (k + 1)
             )
        in
        let e =
          Uq_engines.sync_engine
            pump_e
            (read_e 0)
          ++ (fun ((), k_end) ->
                let n = expected_num_lines fcase rcase `Ls bigsize in
                if n <> k_end then
                  failwith (sprintf
                              "bad number of lines - expected: %d - actual: %d"
                              n k_end);
                eps_e (`Done ()) esys
             ) in
        Unixqueue.run esys;
        ( match e#state with
            | `Working _ -> assert false
            | `Aborted -> assert false
            | `Error err -> raise err
            | `Done () -> ()
        )
      with error ->
        (* Release fds and reader before propagating. A secondary error
           from the cleanup must not mask the original one. *)
        ( try cleanup() with _ -> () );
        raise error
    );
    cleanup();
    printf " test passed\n%!"
  with
    | Failure msg ->
        printf " TEST FAILED: %s\n%!" msg;
        incr problems
(* Run the FD test for [fcase] / [rcase]. It is skipped (with a message)
   when the file is not stored in PlasmaFS, i.e. when open_cluster yields
   no cluster. *)
let run_test_fd fs mr_buf_size fcase rcase =
  printf "FD Test: %s + %s for buf_size=%d\n%!" rcase fcase mr_buf_size;
  let esys = Unixqueue.create_unix_event_system () in
  let name = name_fcase fcase in
  match fs # open_cluster name esys with
  | Some cluster -> run_test_fd_1 fs cluster mr_buf_size fcase rcase name
  | None -> printf " not PlasmaFS - skipping test\n%!"
(* Line-structured tests: every file case with every region case, for four
   buffer sizes. Each combination is run once through the record reader
   and once through the FD pump. The invalid R5 + F1 combination is
   skipped. *)
let run_ls fs =
  printf "*** LINE BY LINE ***\n%!";
  let bsize = fs # blocksize !testdir in
  let factory = Mapred_io.line_structured_format() in
  let run_one bufsize rcase fcase =
    if not (rcase = "R5" && fcase = "F1") then begin
      run_test factory false `Ls fs bufsize fcase rcase "";
      run_test_fd fs bufsize fcase rcase
    end in
  List.iter
    (fun bufsize ->
       List.iter
         (fun rcase -> List.iter (run_one bufsize rcase) f_cases)
         r_cases)
    [ bsize; 2*bsize; 3*bsize; 8*bsize ]
(* Fixed-size record tests on the line-structured files. For the F2 cases
   the record size equals the line length (LF included; the LF is stripped
   before the check), so each record is exactly one line. Only F1 and the
   F2 cases are run. F1 is empty, so its record size of 80 is presumably
   arbitrary, and R5 is skipped for it. *)
let run_fs fs =
  printf "*** FIXED RECORD SIZE ***\n%!";
  let bsize = fs # blocksize !testdir in
  let bigsize = 2*bsize in
  let f2_cases =
    [ "F2.1", bigsize/2 - 2;
      "F2.2", bigsize/2 - 1;
      "F2.3", bigsize/2;
      "F2.4", bigsize - 2;
      "F2.5", bigsize - 1;
      "F2.6", bigsize ] in
  let run_rcase bufsize rcase =
    if rcase <> "R5" then begin
      let f1_factory = Mapred_io.fixed_size_format 80 in
      run_test f1_factory true `Fs fs bufsize "F1" rcase ""
    end;
    List.iter
      (fun (fcase, size) ->
         let factory = Mapred_io.fixed_size_format size in
         run_test factory true `Fs fs bufsize fcase rcase "")
      f2_cases in
  List.iter
    (fun bufsize -> List.iter (run_rcase bufsize) r_cases)
    [ bsize; 2*bsize; 3*bsize; 8*bsize ]
(* Variable-size record tests on the ".vs" files: every file case with
   every region case except the invalid R5 + F1 combination, for four
   buffer sizes. *)
let run_vs fs =
  printf "*** VARIABLE RECORD SIZE ***\n%!";
  let bsize = fs # blocksize !testdir in
  let factory = Mapred_io.var_size_format () in
  let bufsizes = [ bsize; 2*bsize; 3*bsize; 8*bsize ] in
  List.iter
    (fun bufsize ->
       List.iter
         (fun rcase ->
            let valid fcase = not (rcase = "R5" && fcase = "F1") in
            List.iter
              (fun fcase ->
                 run_test factory true `Vs fs bufsize fcase rcase ".vs")
              (List.filter valid f_cases))
         r_cases)
    bufsizes
(* Variable-size record tests on the ".vs2" files (written with a double
   chunk length): every file case with every region case except the
   invalid R5 + F1 combination, for four buffer sizes. *)
let run_vs2 fs =
  printf "*** VARIABLE RECORD SIZE (DOUBLE CHUNK LENGTH) ***\n%!";
  let bsize = fs # blocksize !testdir in
  let factory = Mapred_io.var_size_format () in
  let run_one bufsize rcase fcase =
    if rcase = "R5" && fcase = "F1" then ()
    else run_test factory true `Vs2 fs bufsize fcase rcase ".vs2" in
  List.iter
    (fun bufsize ->
       List.iter
         (fun rcase -> List.iter (run_one bufsize rcase) f_cases)
         r_cases)
    [ bsize; 2*bsize; 3*bsize; 8*bsize ]
(* Test sequence:
   - Build the filesystem object (without a PlasmaFS client config when
     [disable_cluster]).
   - Create the test files for every enabled format.
   - Run the enabled test groups.
   - Print a summary based on the global [problems] counter. *)
let run ?clustername ?nn_nodes ~disable_cluster ~en_ls ~en_fs ~en_vs ~en_vs2
        () =
  let client_config =
    if disable_cluster then None
    else Some (Plasma_client_config.get_config ?clustername ?nn_nodes ()) in
  let configure_cluster cluster =
    Plasma_client.configure_buffer cluster 100;
    Plasma_client.configure_auth_daemon cluster in
  let fs =
    Mapred_fs.standard_fs_cc ?client_config ~configure_cluster () in
  printf "Creating files...\n%!";
  (* The fixed-size tests reread the line-structured files, so `Ls files
     are also needed when only -fs is enabled. *)
  let formats =
    (if en_ls || en_fs then [ `Ls ] else [])
    @ (if en_vs then [ `Vs ] else [])
    @ (if en_vs2 then [ `Vs2 ] else []) in
  List.iter
    (fun fcase -> List.iter (create_fcase fs fcase) formats)
    f_cases;
  if en_ls then run_ls fs;
  if en_fs then run_fs fs;
  if en_vs then run_vs fs;
  if en_vs2 then run_vs2 fs;
  printf "End of test sequence.\n%!";
  match !problems with
  | 0 -> printf "No errors.\n%!"
  | count -> printf "%d errors!\n%!" count
(* Command-line driver. It parses the options, configures logging and
   enables every record format when none was selected explicitly, then
   runs the test sequence. An exception escaping [run] is printed together
   with its backtrace and the number of failures counted before it. *)
let main () =
  let clustername = ref None in
  let nn_nodes = ref [] in
  let debug = ref false in
  let disable_cluster = ref false in
  let en_ls = ref false in
  let en_fs = ref false in
  let en_vs = ref false in
  let en_vs2 = ref false in
  let options =
    [ "-namenode", Arg.String (fun s -> nn_nodes := s :: !nn_nodes),
      "<host>:<port>   Access this namenode";

      "-cluster", Arg.String (fun s -> clustername := Some s),
      "<name>   name of the cluster";

      "-disable-cluster", Arg.Set disable_cluster,
      "   Disable PlasmaFS entirely";

      "-testdir", Arg.Set_string testdir,
      "<dir>   in which directory to run the tests";

      "-ls", Arg.Set en_ls,
      "   run the tests with line-structured files";

      "-fs", Arg.Set en_fs,
      "   run the tests with fixed-size record";

      "-vs", Arg.Set en_vs,
      "   run the tests with variable-size record";

      "-vs2", Arg.Set en_vs2,
      "   run the tests with variable-size record (double chunk len)";

      "-debug", Arg.Set debug,
      "   run with debugging enabled";
    ] in
  let reject_anon s = raise (Arg.Bad ("Unexpected arg: " ^ s)) in
  Arg.parse options reject_anon "usage: mr_io <options>";
  let log_level = if !debug then `Debug else `Info in
  Netlog.current_logger := Netlog.channel_logger stderr log_level;
  Plasma_util.debug := !debug;
  let format_flags = [ en_ls; en_fs; en_vs; en_vs2 ] in
  (* No format selected on the command line: test all of them *)
  if List.for_all (fun flag -> not !flag) format_flags then
    List.iter (fun flag -> flag := true) format_flags;
  Printexc.record_backtrace true;
  Mapred_io.more_logging := true;
  try
    run
      ?clustername:!clustername
      ?nn_nodes:(if !nn_nodes = [] then None else Some !nn_nodes)
      ~disable_cluster:!disable_cluster
      ~en_ls:!en_ls
      ~en_fs:!en_fs
      ~en_vs:!en_vs
      ~en_vs2:!en_vs2
      ()
  with
    | error ->
        let bt = Printexc.get_backtrace() in
        eprintf "Exception %s\n%!" (Netexn.to_string error);
        eprintf "Backtrace: %s\n%!" bt;
        eprintf "Before, there were %d errors.\n%!" !problems

let () = main ()