(* $Id: qclient.ml 286 2006-04-29 16:21:42Z gerd $ * ---------------------------------------------------------------------- * *) open Rtypes open Rpc open Rpc_client open Printf module A = Queues_aux ;; module C1 = Queues_clnt.QUEUESPROG.QUEUESVERS1 ;; type error = [ `not_found | `picked | `exists | `not_picked | `chunks_too_large | `timeout | `empty | `queue_deleted | `full | `bad_name | `bad_value | `inactive | `permission_denied | `sys_error ] exception Error of error let getenv ?(default="") n = try Sys.getenv n with Not_found -> default ;; let default_host = getenv ~default:"localhost" "QCLIENT_HOST" ;; let pdate seconds = let fsecs = Int64.to_float seconds in let t = Unix.localtime fsecs in sprintf "%4d-%02d-%02d %02d:%02d" (t.Unix.tm_year + 1900) (t.Unix.tm_mon + 1) (t.Unix.tm_mday) (t.Unix.tm_hour) (t.Unix.tm_min) ;; let getuser() = try let pw = Unix.getpwuid (Unix.getuid()) in pw.Unix.pw_name with Not_found -> "UNKNOWN" ;; let pluggable_auth_module = ref ( "<None>", (fun _ -> ( failwith "No auth module linked." : Rpc_client.t ) ) ) ;; let create_client host = let (name, create_client) = !pluggable_auth_module in create_client host ;; let parse_name_is_value s = try let l = String.length s in let k = String.index s '=' in (String.sub s 0 k), (String.sub s (k+1) (l-k-1)) with Not_found -> failwith ("Cannot parse: " ^ s) ;; exception Parse_error of string ;; let parse_shell_file filename = (* Parses a file with settings * NAME=VALUE * with shell syntax. The parser understands: * - comments * - backslashes * - single quotes * - double quotes * The parser does not understand: * - Any kind of expansion * The functions returns only settings for variables that begin with * "PROP_", but without this prefix. For example, * PROP_A=1 * B=2 * PROP_C=2 * is parsed as [ "A", "1"; "C"; "2" ]. *) let rec skip_space line k = if k >= String.length line then k else match line.[k] with ' ' | '\t' | '\r' -> skip_space line (k+1) | '#' -> String.length line | _ -> k in let rec scan_name b line k = if k >= String.length line then failwith "Syntax error" else let c = line.[k] in match c with 'A'..'Z'|'a'..'z'|'_' -> Buffer.add_char b c; scan_name b line (k+1) | '0'..'9' -> if Buffer.length b = 0 then failwith "Syntax error"; Buffer.add_char b c; scan_name b line (k+1) | '=' -> if Buffer.length b = 0 then failwith "Syntax error"; k+1 | _ -> failwith "Syntax error" in let rec scan_squote_value b line k = if k >= String.length line then failwith "Syntax error" else match line.[k] with '\'' -> scan_value b line (k+1) | c -> Buffer.add_char b c; scan_squote_value b line (k+1) and scan_dquote_value b line k = if k >= String.length line then failwith "Syntax error" else match line.[k] with '"' -> scan_value b line (k+1) | '\\' -> if k+1 >= String.length line then failwith "Syntax error"; Buffer.add_char b line.[k+1]; scan_dquote_value b line (k+2) | '$' -> failwith "$ expansion not supported"; | '`' -> failwith "backtick expansion not supported" | c -> Buffer.add_char b c; scan_dquote_value b line (k+1) and scan_value b line k = if k >= String.length line then k else match line.[k] with '\'' -> scan_squote_value b line (k+1) | '"' -> scan_dquote_value b line (k+1) | ('|' | '&' | ';' | '(' | ')' | '<' | '>' as c) -> failwith ("special character not supported: " ^ String.make 1 c) | ' ' | '\t' | '\r' -> k | c -> Buffer.add_char b c; scan_value b line (k+1) in let rec next_settings f linenumber = try let line = input_line f in let j = skip_space line 0 in if j = String.length line then next_settings f (linenumber+1) else begin (* Scan now: VARNAME= *) let varname = Buffer.create 20 in let k = scan_name varname line j in (* Scan now: VALUE *) let varval = Buffer.create 50 in let k' = scan_value varval line k in (* The rest is white space: *) let k'' = skip_space line k' in if k'' <> String.length line then failwith "Syntax error"; let vname = Buffer.contents varname in if String.length vname >= 6 && String.sub vname 0 5 = "PROP_" then (String.sub vname 5 (String.length vname - 5), Buffer.contents varval) :: next_settings f (linenumber+1) else next_settings f (linenumber+1) end with End_of_file -> [] | Failure s -> raise (Parse_error ("In file " ^ filename ^ ", line " ^ string_of_int linenumber ^ ": " ^ s)) in let f = open_in filename in try let l = next_settings f 1 in close_in f; l with err -> close_in f; raise err ;; let get_result r = match r with `successful x -> x | #error as e -> raise (Error e) ;; let check_result r = match r with `successful -> () | #error as e -> raise (Error e) ;; let cmd_queues() = let host = ref default_host in Arg.parse [ "-host", Arg.String (fun s -> host := s), "<name> Contact the queue server on this host"; ] (fun s -> raise (Arg.Bad "Unexpected argument")) "qclient queues [ options ]: list the installed queues. Options:"; let client = create_client !host in let l = get_result (C1.list_queues client ()) in printf "%-16s %2s %6s %6s %6s %-16s %-16s\n" "QUEUE" "ST" "LENGTH" "PICKED" "ADDING" "CREATED" "MODIFIED"; Array.iter (fun q -> let status = if q.A.qparams.A.qactive then begin (if q.A.qparams.A.qaccepting then "+" else " ") ^ (if q.A.qparams.A.qdelivering then "-" else " ") end else "**" in printf "%-16s %2s %6s %6s %6s %-16s %-16s\n" (q.A.qname) status (Int32.to_string q.A.qlength) (Int32.to_string q.A.qpicked) (Int32.to_string q.A.quploads) (pdate q.A.qcreation) (pdate q.A.qmodified); ) l; flush stdout; Rpc_client.shut_down client ;; let cmd_create() = let host = ref default_host in let qnames = ref [] in let activate = ref false in Arg.parse [ "-host", Arg.String (fun s -> host := s), "<name> Contact the queue server on this host"; "-activate", Arg.Set activate, " Activate the queues now"; ] (fun s -> qnames := !qnames @ [s]) "qclient create [ options ] qname ...: create these queues. Options:"; let client = create_client !host in List.iter (fun qname -> check_result (C1.create_queue client qname); printf "%s created.\n" qname; flush stdout; if !activate then begin let q = get_result(C1.get_queue client qname) in q.A.qparams.A.qactive <- true; check_result (C1.set_queue client (qname,q.A.qparams)); printf "%s activated.\n" qname; flush stdout; end; ) !qnames; Rpc_client.shut_down client ;; let cmd_destroy() = let host = ref default_host in let qnames = ref [] in Arg.parse [ "-host", Arg.String (fun s -> host := s), "<name> Contact the queue server on this host"; ] (fun s -> qnames := !qnames @ [s]) "qclient destroy [ options ] qname ...: delete these queues. Options:"; let client = create_client !host in List.iter (fun qname -> let q = get_result (C1.get_queue client qname) in check_result (C1.delete_queue client qname); ) !qnames; Rpc_client.shut_down client ;; let cmd_list() = let host = ref default_host in let list_properties = ref false in let qnames = ref [] in Arg.parse [ "-host", Arg.String (fun s -> host := s), "<name> Contact the queue server on this host"; "-properties", Arg.Set list_properties, " Include the property list of each entry"; ] (fun s -> qnames := !qnames @ [s]) "qclient list [ options ] qname ...: list these queues. Options:"; let client = create_client !host in let first_queue = ref true in List.iter (fun qname -> let q = get_result (C1.get_queue client qname) in let l = get_result (C1.list_queue_entries client q.A.qid) in if not !first_queue then print_newline(); first_queue := false; if l = [| |] then printf "The queue '%s' is empty.\n" qname else begin printf "%4s %-20s %9s %-16s\n" "RANK" "MEMBER" "SIZE" "ADDED"; Array.iteri (fun n e -> printf "%4d %-20s %9s %16s\n" n (qname ^ "-" ^ e.A.eid) (Int64.to_string e.A.esize) (pdate e.A.ecreation); if !list_properties then begin Array.iter (fun prop -> printf "%6s %s=%s\n" "" prop.A.pname prop.A.pvalue) e.A.eprops; end; ) l; end; flush stdout; ) !qnames; Rpc_client.shut_down client ;; let cmd_status() = let host = ref default_host in let inc_name = ref false in let inc_date_created = ref false in let inc_date_modified = ref false in let inc_length = ref false in let inc_picked = ref false in let inc_adding = ref false in let inc_maxlen = ref false in let inc_active = ref false in let inc_owner = ref false in let only_values = ref false in let all = ref false in let qnames = ref [] in Arg.parse [ "-host", Arg.String (fun s -> host := s), "<name> Contact the queue server on this host"; "-name", Arg.Set inc_name, " Output the name of the queue"; "-date-created", Arg.Set inc_date_created, " Output creation date"; "-date-modified", Arg.Set inc_date_modified, " Output date of last modification"; "-owner", Arg.Set inc_owner, " Output the owner of the queue (netname)"; "-length", Arg.Set inc_length, " Output queue length"; "-picked", Arg.Set inc_picked, " Output number of picked entries"; "-adding", Arg.Set inc_adding, " Output number of entries currently being added"; "-maxlen", Arg.Set inc_maxlen, " Output the maximum length of the queue"; "-active", Arg.Set inc_active, " Output whether the queue is active"; "-only-values", Arg.Set only_values, " Output only values (no labels)"; "-all", Arg.Set all, " Output all status variables"; ] (fun s -> qnames := !qnames @ [s]) "qclient status [ options ] qname ...: output queue status. Options:"; let client = create_client !host in List.iter (fun qname -> let q = get_result (C1.get_queue client qname) in let status = [ !inc_name, "Queue", q.A.qname; !inc_date_created, "Date created", (pdate q.A.qcreation); !inc_date_modified, "Date modified", (pdate q.A.qmodified); !inc_owner, "Owner", q.A.qowner; !inc_length, "Length", (Int32.to_string q.A.qlength); !inc_picked, "Picked", (Int32.to_string q.A.qpicked); !inc_adding, "Adding", (Int32.to_string q.A.quploads); !inc_maxlen, "Maximum length", Int32.to_string q.A.qparams.A.qmaxlen; !inc_active, "Active", (if q.A.qparams.A.qactive then begin "yes (" ^ (if q.A.qparams.A.qaccepting then "accepting" else "stopped") ^ "," ^ (if q.A.qparams.A.qdelivering then "delivering" else "stopped") ^ ")" end else "no"); ] in List.iter (fun (p,label,value) -> if !all || p then begin if not !only_values then printf "%s: " label; print_endline value; flush stdout end ) status ) !qnames; Rpc_client.shut_down client ;; let cmd_set() = let host = ref default_host in let qnames = ref [] in let active = ref None in let accepting = ref None in let delivering = ref None in let maxlen = ref None in let bool f = Arg.String (fun s -> match s with "true"|"yes"|"y"|"1" -> f true | "false"|"no"|"n"|"0" -> f false | _ -> raise(Arg.Bad "Option must be true or false") ) in Arg.parse [ "-host", Arg.String (fun s -> host := s), "<name> Contact the queue server on this host"; "-active", bool (fun b -> active := Some b), "(true|false) Make the queue active/inactive"; "-accepting", bool (fun b -> accepting := Some b), "(true|false) Whether the queue accepts new entries"; "-delivering", bool (fun b -> delivering := Some b), "(true|false) Whether the queue delivers entries"; "-maxlen", Arg.Int (fun n -> maxlen := Some n), "<n> Set the maximum length (-1 = infinite)"; ] (fun s -> qnames := !qnames @ [s]) "qclient set [ options ] queue ...: Set the params of the queues. Options:"; let client = create_client !host in List.iter (fun qname -> let q = get_result (C1.get_queue client qname) in let p = q.A.qparams in (match !active with None -> () | Some b -> p.A.qactive <- b ); (match !accepting with None -> () | Some b -> p.A.qaccepting <- b ); (match !delivering with None -> () | Some b -> p.A.qdelivering <- b ); (match !maxlen with None -> () | Some n -> p.A.qmaxlen <- Int32.of_int n ); check_result(C1.set_queue client (qname,p)); printf "Parameters of queue `%s' set.\n" qname; flush stdout ) !qnames; Rpc_client.shut_down client ;; let cmd_add() = let host = ref default_host in let queue = ref "" in let props = ref [] in let no_std_properties = ref false in let files = ref [] in let wait = ref (-1) in let add_property s = let n,v = parse_name_is_value s in props := List.remove_assoc n !props @ [ n,v ] in let load_properties file = let nv_list = parse_shell_file file in let names = List.map fst nv_list in let props' = List.filter (fun (n,v) -> not(List.mem n names)) !props in props := props' @ nv_list in let add_std_properties props file = let cprops = ref props in let set_prop n v = cprops := List.remove_assoc n !cprops @ [ n,v ] in let add_prop n v = if not (List.mem_assoc n !cprops) then cprops := !cprops @ [ n,v ] in set_prop "SP_FILENAME" (Filename.basename file); set_prop "SP_SYS_USER" (getuser()); set_prop "SP_SYS_HOST" (Unix.gethostname()); let hops = try int_of_string(List.assoc "SP_HOPS" !cprops) with _ -> 0 in set_prop "SP_HOPS" (string_of_int (hops+1)); add_prop "SP_FIRST_SYS_USER" (getuser()); add_prop "SP_FIRST_SYS_HOST" (Unix.gethostname()); add_prop "SP_FIRST_DATE" (string_of_float (Unix.time())); !cprops in Arg.parse [ "-host", Arg.String (fun s -> host := s), "<name> Contact the queue server on this host"; "-queue", Arg.String (fun s -> queue := s), "<qname> Add the entry to this queue"; "-property", Arg.String add_property, "<p>=<v> Set the property <p> to <v>"; "-property-file", Arg.String load_properties, "<file> Load the properties from this file"; "-no-std-properties", Arg.Set no_std_properties, " Do not set/update the standard properties"; "-wait", Arg.Int(fun n -> wait := n), "<seconds> Wait this number of seconds for the entry (-1 = endless)"; ] (fun s -> files := !files @ [s]) "qclient add [ options ] file ...: add these files to a queue. Options:"; if !queue = "" then failwith "The option -queue is mandatory!"; let client = create_client !host in List.iter (fun file -> let f = open_in file in let q = get_result (C1.get_queue client !queue) in let qid = q.A.qid in let plist = if !no_std_properties then !props else add_std_properties !props file in let parray = Array.of_list (List.map (fun (n,v) -> { A.pname = n; A.pvalue = v }) plist) in let w = Int32.of_int !wait in let handle = get_result (C1.upload_entry client (qid,parray,w)) in let len = 8192 in let buf = String.create len in let n = ref 1 in let s = ref Int64.zero in while !n > 0 do n := input f buf 0 len; let d = if !n = len then buf else String.sub buf 0 !n in let chunk = { A.serial = !s; A.last = !n=0; A.data = d } in check_result (C1.upload_chunk client (handle, chunk)); s := Int64.succ !s done; printf "File '%s' added.\n" file; flush stdout; close_in f; ) !files; Rpc_client.shut_down client ;; (* Standard properties: * SP_FILENAME * SP_SYS_USER * SP_SYS_HOST * SP_HOPS * SP_FIRST_SYS_USER * SP_FIRST_SYS_HOST * SP_FIRST_DATE *) let cmd_pop() = let host = ref default_host in let pop_file = ref "" in let pop_properties = ref "" in let wait = ref (-1) in let peek = ref false in let qname = ref "" in Arg.parse [ "-host", Arg.String (fun s -> host := s), "<name> Contact the queue server on this host"; "-get-file", Arg.String (fun s -> pop_file := s), "<name> Download the entry and store it into this file"; "-get-properties", Arg.String (fun s -> pop_properties := s), "<name> Download the properties and put them into this file"; "-wait", Arg.Int (fun k -> wait := k), "<seconds> Wait this number of seconds for the entry (-1 = endless)"; "-peek", Arg.Set peek, " Do not remove the entry from the queue"; ] (fun s -> if !qname <> "" then raise(Arg.Bad "Two many arguments"); qname := s ) "qclient pop [ options ] queue: Pop the next entry from the queue. Options:"; if !qname = "" then failwith "The queue argument is mandatory!"; let client = create_client !host in let q = get_result (C1.get_queue client !qname) in let qid = q.A.qid in let e = get_result (C1.pick_queue_entry client (qid, Int32.of_int !wait)) in if !pop_properties <> "" then begin let f = open_out !pop_properties in Array.iter (fun p -> let name = p.A.pname in let value = p.A.pvalue in output_string f ("PROP_" ^ name); output_string f "='"; for i = 0 to String.length value - 1 do match value.[i] with '\'' -> output_string f "'\\''"; | c -> output_char f c done; output_string f "'\n"; ) e.A.eprops; close_out f end; if !pop_file <> "" then begin let chunksize = Int32.of_string "65536" in let handle = get_result (C1.download_entry client (qid,e.A.eid,chunksize)) in let f = open_out !pop_file in let serial = ref Int64.zero in let last = ref false in while not !last do let chunk = get_result (C1.download_chunk client handle) in if chunk.A.serial <> !serial then failwith "Download error (bad serial number)"; output_string f chunk.A.data; serial := Int64.succ !serial; last := chunk.A.last done; close_out f end; if not !peek then begin check_result (C1.remove_picked_queue_entry client (qid,e.A.eid)); printf "%s-%s popped.\n" !qname e.A.eid; flush stdout; end else begin check_result (C1.return_picked_queue_entry client (qid,e.A.eid)); printf "%s-%s peeked.\n" !qname e.A.eid; flush stdout; end; Rpc_client.shut_down client ;; let cmd_cancel() = let host = ref default_host in let queue = ref "" in let entries = ref [] in let all = ref false in Arg.parse [ "-host", Arg.String (fun s -> host := s), "<name> Contact the queue server on this host"; "-queue", Arg.String (fun s -> queue := s), "<qname> Remove the entries from this queue"; "-all", Arg.Set all, " Remove all entries (empty the queue)"; ] (fun s -> entries := !entries @ [s]) "qclient cancel [ options ] entry ...: remove these entries. Options:"; if !queue = "" then failwith "The option -queue is mandatory!"; let client = create_client !host in let q = get_result (C1.get_queue client !queue) in let qid = q.A.qid in if !all then begin let l = get_result (C1.list_queue_entries client qid) in entries := List.map (fun e -> e.A.eid) (Array.to_list l); end; List.iter (fun entry -> check_result (C1.remove_queue_entry client (qid, entry)); printf "%s canceled.\n" entry; flush stdout ) !entries; Rpc_client.shut_down client ;; let start() = let (auth_name,_) = !pluggable_auth_module in let usage() = prerr_endline "qclient - access remote queues. Usage:"; prerr_endline "qclient queues [ -help | options ]"; prerr_endline "qclient create [ -help | options ]"; prerr_endline "qclient destroy [ -help | options ]"; prerr_endline "qclient status [ -help | options ]"; prerr_endline "qclient set [ -help | options ]"; prerr_endline "qclient list [ -help | options ]"; prerr_endline "qclient add [ -help | options ]"; prerr_endline "qclient pop [ -help | options ]"; prerr_endline "qclient cancel [ -help | options ]"; prerr_endline ""; prerr_endline ("This qclient is linked with the auth module: " ^ auth_name); exit 2; in incr Arg.current; if Array.length Sys.argv <= !Arg.current then usage(); let cmd = Sys.argv.(!Arg.current) in match cmd with "queues" -> cmd_queues() | "create" -> cmd_create() | "destroy" -> cmd_destroy() | "list" -> cmd_list() | "status" -> cmd_status() | "set" -> cmd_set() | "add" -> cmd_add() | "pop" -> cmd_pop() | "cancel" -> cmd_cancel() | _ -> usage() ;; let main() = try start(); exit 0 with Error e -> let code, msg = match e with `not_found -> 10, "not found" | `picked -> 11, "picked" | `exists -> 12, "exists" | `not_picked -> 13, "not picked" | `chunks_too_large -> 14, "chunks too large" | `timeout -> 15, "timeout" | `empty -> 16, "empty" | `queue_deleted -> 17, "queue deleted" | `full -> 18, "full" | `bad_name -> 19, "bad name" | `bad_value -> 20, "bad value" | `inactive -> 21, "inactive" | `permission_denied -> 22, "permission denied" | `sys_error -> 23, "system error" in prerr_endline ("Server response: " ^ msg); exit code | Rpc.Rpc_server x -> prerr_endline ("RPC error: " ^ match x with Unavailable_program -> "Unavailable program" | Unavailable_version(_,_) -> "Unavailable version" | Unavailable_procedure -> "Unavailable procedure" | Garbage -> "Garbage (unknown message format)" | System_err -> "System error" | Rpc_mismatch(_,_) -> "Unsupported RPC type" | Auth_bad_cred -> "Bad credentials" | Auth_rejected_cred -> "Rejected credentials" | Auth_bad_verf -> "Bad verifier" | Auth_rejected_verf -> "Rejected verifier" | Auth_too_weak -> "Authentication too weak" | Auth_invalid_resp -> "Invalid response" | Auth_failed -> "Authentication failed" ); exit 1 | Failure s -> prerr_endline ("Error: " ^ s); exit 1 ;;