(* Invoke the sorter directly: read a line-structured input file, feed all
   of its records to a Mapred_sorters.generic_sorter, and write the sorted
   records to an output file. *)

open Printf

(* [with_cleanup ~cleanup f] runs [f ()] and afterwards [cleanup ()].

   - If [f] returns normally, [cleanup] runs once and any exception it
     raises propagates to the caller.
   - If [f] raises, [cleanup] still runs; an exception raised by [cleanup]
     itself is deliberately dropped so that the original failure of [f] is
     the one that gets reported, and that original exception is re-raised.

   Hand-rolled (rather than relying on a newer standard-library helper) so
   the file keeps building with the compiler it was written for. *)
let with_cleanup ~cleanup f =
  let result =
    try f ()
    with error ->
      (* Best effort only: never let a failing cleanup mask [error]. *)
      (try cleanup () with _ -> ());
      raise error
  in
  cleanup ();
  result

(* [sort infile infile_eof outfile conf me] sorts the records of [infile]
   into [outfile].

   - [infile]      path of the input file on [me#filesystem]
   - [infile_eof]  length of [infile] as an int64 (the entry point below
                   passes the result of [fs # size]); used to derive how
                   many blocks to read
   - [outfile]     path of the output file; it is created here with
                   [~repl:1]
   - [conf]        object providing [mr_buffer_size] and
                   [mr_buffer_size_tight]
   - [me]          environment object handed to the sorter; must at least
                   provide [filesystem]

   Progress messages and the sort time are printed on stdout.

   The reader, the writer and the sorter are closed even when an exception
   interrupts the run; the exception is then re-raised to the caller. *)
let sort infile infile_eof outfile conf me =
  let mjc =
    Mapred_job_config.mapred_job_config
      (Mapred_job_config.test_job_config()) in
  (* Record-I/O configuration: block size from the test job config, buffer
     sizes from [conf]. *)
  let rc =
    ( object
        method bigblock_size = mjc#bigblock_size
        method mr_buffer_size = conf # mr_buffer_size
        method mr_buffer_size_tight = conf # mr_buffer_size_tight
      end
    ) in
  let sorter =
    (* Mapred_sorters.null_sorter *)
    Mapred_sorters.generic_sorter
      ~hash:(Mapred_sorters.Hex_asc.hash)
      ~cmp:(Mapred_sorters.Hex_asc.cmp)
      me mjc 1.0 (* NOTE(review): meaning of the 1.0 factor not visible here *)
  in
  with_cleanup
    ~cleanup:(fun () -> sorter # close())
    (fun () ->
       sorter#set_key_extraction Mapred_split.tab_split_key_range;
       let fs = me#filesystem in
       let bsize = fs#blocksize infile in
       let bsizeL = Int64.of_int bsize in
       let shm_mng = new Plasma_shm.null_shm_manager() in
       (* Number of blocks needed to hold [infile_eof] bytes, i.e. the
          length divided by the block size, rounded up (0 for an empty
          file). *)
       let n_blocks =
         if infile_eof = 0L then
           0L
         else
           Int64.succ (Int64.div (Int64.pred infile_eof) bsizeL) in
       (* Presumably "start at block 0, read n_blocks blocks" - confirm
          against Mapred_io. *)
       let reader =
         (Mapred_io.line_structured_format()) # read_file
           [] fs shm_mng rc infile 0L n_blocks in
       printf "Reading...\n%!";
       let q = Queue.create() in
       with_cleanup
         ~cleanup:(fun () -> reader # close_in())
         (fun () ->
            (* The loop only ends via an exception; End_of_file is the
               regular termination signal and is absorbed here. *)
            try
              while true do
                reader # input_records q;
                sorter # put_records q;
              done
            with
              | End_of_file -> ()
         );
       Mapred_io.create_file ~repl:1 fs outfile;
       let writer =
         (Mapred_io.line_structured_format()) # write_file
           fs shm_mng rc outfile in
       with_cleanup
         ~cleanup:(fun () -> writer # close_out())
         (fun () ->
            printf "Sorting+writing...\n%!";
            sorter # sort writer;
            printf "Sort time: %.2f\n%!" sorter#sort_time
         )
    )

(* Entry point: parse the command line, set up logging and the filesystem
   access object, then sort the input file into the output file.

   Any exception raised while determining the input size or while sorting
   is printed on stderr and the program terminates with exit status 1, so
   that callers (shell scripts, test harnesses) can detect the failure. *)
let () =
  let infile = ref "/sort_in" in
  let outfile = ref "/sort_out" in
  let clustername = ref None in
  let nn_nodes = ref [] in
  let disable_cluster = ref false in
  let debug = ref false in
  Arg.parse
    [ "-namenode", Arg.String (fun s -> nn_nodes := s :: !nn_nodes),
      "<host>:<port> Access this namenode";

      "-cluster", Arg.String (fun s -> clustername := Some s),
      "<name> name of the cluster";

      "-disable-cluster", Arg.Set disable_cluster,
      " Disable PlasmaFS entirely";

      "-input", Arg.Set_string infile,
      "<file> PlasmaFS input file";

      "-output", Arg.Set_string outfile,
      "<file> PlasmaFS output file";

      "-debug", Arg.Set debug,
      " run with debugging enabled"
    ]
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)))
    "usage: mr_io <options>";

  printf "Input: %s\n" !infile;
  printf "Output: %s\n%!" !outfile;

  Netlog.current_logger :=
    Netlog.channel_logger stderr (if !debug then `Debug else `Info);
  Plasma_util.debug := !debug;

  (* No client configuration at all when the cluster is disabled. *)
  let client_config =
    if !disable_cluster then
      None
    else
      Some
        (Plasma_client_config.get_config
           ?clustername:!clustername
           ~nn_nodes:!nn_nodes
           ()
        ) in
  let fs = Mapred_fs.standard_fs_cc ?client_config () in
  let shm_mng = new Plasma_shm.null_shm_manager() in

  try
    let length = fs # size [] !infile in
    (* The test configuration is sized after the whole input file. *)
    let sort_size = Int64.to_int length in
    let conf =
      Mapred_config.test_config ~clustername:"foo" ~nodes:[] ~sort_size () in
    (* Minimal stand-in environment for the sorter; [config_file] is not
       available in this setup and fails if it is ever requested. *)
    let me =
      ( object
          method filesystem = fs
          method shm_manager = shm_mng
          method config = conf
          method config_file = assert false
          method auth_ticket = ""
          method command_name = "foo"
          method accumulate _ = ()
        end
      ) in
    sort !infile length !outfile conf me
  with
    | error ->
        eprintf "Exception %s\n%!" (Netexn.to_string error);
        (* Report the failure through the exit status as well. *)
        exit 1