(* $Id: ps_random.ml 471 2011-10-13 12:14:19Z gerd $ *)
(* Random reads from a file using multi_read_e *)
open Printf
open Plasma_rpcapi_aux
open Uq_engines.Operators
(* [run c filename n] issues [n] read requests against the file [filename]
   using the open cluster client [c].  Each request targets one randomly
   chosen block that lies completely before EOF and reads [blocksize]
   bytes from the start of that block.  The callback attached to each
   request prints the number of bytes obtained and the block index;
   "Done" is printed after [Plasma_client.sync] returns.

   Only whole blocks are candidates: the block count is [eof / blocksize]
   (integer division), so a trailing partial block is never selected.

   Raises [Failure] if [n > 0] and the file is shorter than one block
   (there would be no block to choose from).

   NOTE(review): the transaction [trans] is started here but never
   explicitly finished in this function -- confirm whether that is
   intended for this example program. *)
let run c filename n =
  let esys = Plasma_client.event_system c in
  let trans = Plasma_client.start c in
  let inode = Plasma_client.lookup trans filename false in
  let ii = Plasma_client.get_inodeinfo trans inode in
  let blocksize = Plasma_client.blocksize c in
  let blocksize64 = Int64.of_int blocksize in
  let blocks = Int64.to_int (Int64.div ii.eof blocksize64) in
  (* [Random.int] rejects a bound of 0; fail early with a readable
     message instead of an Invalid_argument raised while the request
     stream is being consumed.  With [n <= 0] no block is ever picked,
     so an empty file is acceptable in that case. *)
  if n > 0 && blocks <= 0 then
    failwith
      (sprintf "%s: file is smaller than one block (%d bytes)"
         filename blocksize);
  (* A single buffer is shared by all requests; the data itself is not
     inspected, only the byte count is reported. *)
  let buf = String.create blocksize in
  (* Generator for [Stream.from]: produces the [k]-th request, or [None]
     once [n] requests have been handed out. *)
  let get_task k =
    if k < n then begin
      let i = Random.int blocks in
      (* Multiply in Int64 so that the byte offset cannot overflow the
         native int range (relevant on 32-bit platforms). *)
      let pos = Int64.mul (Int64.of_int i) blocksize64 in
      let request = (pos, `String buf, 0, blocksize) in
      let on_read (len, _, _) =
        printf "Got %d bytes from block %d\n%!" len i in
      Some (eps_e (`Done (Some (request, on_read))) esys)
    end
    else
      None in
  Plasma_client.sync
    (Plasma_client.multi_read_e c inode) (Stream.from get_task);
  printf "Done\n%!"
(* Program entry point.  Parses the command line, opens a client for the
   selected cluster and hands over to [run].

   Options:
     -cluster <name>          cluster name passed to the configuration
     -namenode <host>:<port>  extra namenode; may be repeated
     -n <n>                   number of random block reads (default 1024)

   The file to read is given as anonymous argument; if several are given,
   the last one is used.  Fails with "Missing filename" if there is none
   (this is checked only after the cluster has been opened). *)
let () =
  let cluster_name = ref None in
  let namenode_list = ref [] in
  let file_arg = ref None in
  let count = ref 1024 in
  let set_cluster s = cluster_name := Some s in
  let add_namenode node = namenode_list := node :: !namenode_list in
  let set_file s = file_arg := Some s in
  let spec =
    [ ( "-cluster",
        Arg.String set_cluster,
        "<name> Set the cluster name" );
      ( "-namenode",
        Arg.String add_namenode,
        "<host>:<port> Also use this namenode (can be given several times)" );
      ( "-n",
        Arg.Set_int count,
        "<n> Number of blocks to read randomly (default: 1024)" )
    ] in
  Arg.parse spec set_file "usage: ps_random filename";
  let esys = Unixqueue.create_unix_event_system () in
  (* An empty list means "no override": leave the optional argument out. *)
  let nn_nodes =
    match !namenode_list with
    | [] -> None
    | nodes -> Some nodes in
  let cfg =
    Plasma_client_config.get_config
      ?clustername:!cluster_name
      ?nn_nodes
      () in
  let c = Plasma_client.open_cluster_cc cfg esys in
  Plasma_client.configure_auth_daemon c;
  (* Intentionally we do not configure buffers *)
  match !file_arg with
  | Some fn -> run c fn !count
  | None -> failwith "Missing filename"