(* Invoke the sorter directly *)
open Printf
(** [sort infile infile_eof outfile conf me] reads the line-structured file
    [infile] from [me#filesystem], where [infile_eof] is its length in bytes.
    All records go into a generic sorter, which uses the [Hex_asc] hash and
    comparison and tab-split key extraction. The sorted records are written
    to [outfile], which is created with replication factor 1.

    Progress messages and the sort time are printed to stdout. [conf]
    supplies the record buffer sizes.

    The reader, writer and sorter are closed even when an exception escapes
    from reading, file creation or sorting. The exception is then re-raised
    unchanged. *)
let sort infile infile_eof outfile conf me =
  (* Run [f ()] and afterwards [finally ()], whether or not [f] raised.
     On the failure path an exception from [finally] is dropped so that the
     original exception propagates. Hand-rolled rather than [Fun.protect]
     so it also builds with older compilers. *)
  let protect ~finally f =
    let result =
      try f ()
      with e ->
        (try finally () with _ -> ());
        raise e in
    finally ();
    result in
  let mjc =
    Mapred_job_config.mapred_job_config
      (Mapred_job_config.test_job_config()) in
  (* Record-I/O configuration: the big block size comes from the test job
     config, the buffer sizes from [conf]. *)
  let rc =
    ( object
        method bigblock_size = mjc#bigblock_size
        method mr_buffer_size = conf # mr_buffer_size
        method mr_buffer_size_tight = conf # mr_buffer_size_tight
      end
    ) in
  let sorter =
    (* Mapred_sorters.null_sorter *)
    Mapred_sorters.generic_sorter
      ~hash:(Mapred_sorters.Hex_asc.hash)
      ~cmp:(Mapred_sorters.Hex_asc.cmp)
      me mjc 1.0 in
  protect
    ~finally:(fun () -> sorter # close())
    (fun () ->
       sorter#set_key_extraction Mapred_split.tab_split_key_range;
       let fs = me#filesystem in
       let bsizeL = Int64.of_int (fs#blocksize infile) in
       let shm_mng = new Plasma_shm.null_shm_manager() in
       (* Number of blocks needed to cover [infile_eof] bytes, rounded up;
          an empty file has no blocks. *)
       let n_blocks =
         if infile_eof = 0L then
           0L
         else
           Int64.succ (Int64.div (Int64.pred infile_eof) bsizeL) in
       let reader =
         (Mapred_io.line_structured_format())
           # read_file [] fs shm_mng rc infile 0L n_blocks in
       printf "Reading...\n%!";
       protect
         ~finally:(fun () -> reader # close_in())
         (fun () ->
            let q = Queue.create() in
            (* The reader signals the end of input with End_of_file. *)
            try
              while true do
                reader # input_records q;
                sorter # put_records q
              done
            with
            | End_of_file -> ());
       Mapred_io.create_file ~repl:1 fs outfile;
       let writer =
         (Mapred_io.line_structured_format())
           # write_file fs shm_mng rc outfile in
       printf "Sorting+writing...\n%!";
       protect
         ~finally:(fun () -> writer # close_out())
         (fun () ->
            sorter # sort writer;
            printf "Sort time: %.2f\n%!" sorter#sort_time))
(* Program entry point.

   - Parses the command line.
   - Installs a stderr logger (debug level with -debug).
   - Builds the PlasmaFS client configuration; none with -disable-cluster.
   - Sorts [-input] into [-output].

   An exception raised while sizing the input, building the config objects
   or sorting is printed to stderr, and the program exits with status 1. *)
let () =
  let infile = ref "/sort_in" in
  let outfile = ref "/sort_out" in
  let clustername = ref None in
  let nn_nodes = ref [] in
  let disable_cluster = ref false in
  let debug = ref false in
  (* NOTE(review): the usage text says "mr_io"; confirm the actual binary
     name. *)
  Arg.parse
    [ "-namenode", Arg.String (fun s -> nn_nodes := s :: !nn_nodes),
      "<host>:<port> Access this namenode";
      "-cluster", Arg.String (fun s -> clustername := Some s),
      "<name> name of the cluster";
      "-disable-cluster", Arg.Set disable_cluster,
      " Disable PlasmaFS entirely";
      "-input", Arg.Set_string infile,
      "<file> PlasmaFS input file";
      "-output", Arg.Set_string outfile,
      "<file> PlasmaFS output file";
      "-debug", Arg.Set debug,
      " run with debugging enabled"
    ]
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)))
    "usage: mr_io <options>";
  printf "Input: %s\n" !infile;
  printf "Output: %s\n%!" !outfile;
  Netlog.current_logger :=
    Netlog.channel_logger stderr (if !debug then `Debug else `Info);
  Plasma_util.debug := !debug;
  let client_config =
    if !disable_cluster then
      None
    else
      Some
        (Plasma_client_config.get_config
           ?clustername:!clustername
           ~nn_nodes:!nn_nodes ()
        ) in
  let fs =
    Mapred_fs.standard_fs_cc ?client_config () in
  let shm_mng = new Plasma_shm.null_shm_manager() in
  try
    let length = fs # size [] !infile in
    (* NOTE(review): Int64.to_int truncates on 32-bit platforms. *)
    let sort_size = Int64.to_int length in
    let conf =
      Mapred_config.test_config
        ~clustername:"foo" ~nodes:[] ~sort_size () in
    (* Minimal stand-in for the execution environment passed to the sorter.
       [config_file] is not expected to be called here. *)
    let me =
      ( object
          method filesystem = fs
          method shm_manager = shm_mng
          method config = conf
          method config_file = assert false
          method auth_ticket = ""
          method command_name = "foo"
          method accumulate _ = ()
        end
      ) in
    sort !infile length !outfile conf me
  with
  | error ->
      eprintf "Exception %s\n%!" (Netexn.to_string error);
      (* Propagate the failure to the caller (shell, test harness) instead
         of terminating with a success status. *)
      exit 1