(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Foobar. If not, see <http://www.gnu.org/licenses/>.
*)
(* $Id: plasma.ml 468 2011-10-13 08:17:04Z gerd $ *)

(* TODO:
   - ls: display filenames should be converted to local encoding
   - argument encoding should be converted to the tree encoding
   - file tree: the separation between absolute and relative names is
     nonsense. Support a unified tree. (E.g. cp /abs rel)
   - allow to set more parameters for accessing trees:
     * encoding
     * ssh options
     * replication
   - nicer error messages
   - cp -r
 *)

open Printf
open Plasma_rpcapi_aux
open Plasma_util.Operators

let dlogf = Plasma_util.dlogf

module StrMap = Plasma_util.StrMap

exception Error of string
  (* Error message w/o usage string. In contrast, Failure will print
     usage info *)

(* Abort the current command with a plain error message (no usage text). *)
let errwith s = raise(Error s)

(* A file argument is either subject to wildcard expansion (`Glob) or
   taken verbatim (`Verb). *)
type tagged_arg = [ `Glob of string | `Verb of string ]

(* A file argument together with its optional tree prefix, i.e. the part
   before the first colon of prefix:path. *)
type ext_arg =
    { tree_prefix : string option;
      tree_file : tagged_arg
    }

(* Render an argument the way the user typed it: prefix, colon, path. *)
let display_of_arg arg =
  ( match arg.tree_prefix with
      | None -> ""
      | Some p -> p ^ ":"
  ) ^
  ( match arg.tree_file with
      | `Glob s -> s
      | `Verb s -> s
  )

(* Print the list of available modes to stderr. *)
let usage() =
  prerr_endline "usage: plasma <mode> [-help|options]";
  prerr_endline "";
  prerr_endline " available modes:";
  prerr_endline " - list | ls";
  prerr_endline " - delete | rm";
  prerr_endline " - rename | mv";
  prerr_endline " - link | ln";
  prerr_endline " - copy | cp";
  prerr_endline " - mkdir";
  prerr_endline " - create";
  prerr_endline " - put";
  prerr_endline " - get";
  prerr_endline " - cat";
  prerr_endline " - chmod";
  prerr_endline " - chown";
  prerr_endline " - fsstat";
  prerr_endline " - params";
  prerr_endline " - blocks";
  prerr_endline " - auth_ticket";
  prerr_endline " - admin_table"

(* Return the password of user u. Lookup order: the cache ht, then the
   environment variable PLASMA_PASSWD_<u>, and finally an interactive
   prompt on the tty. Only a password read from the tty is stored in ht. *)
let get_pw ht u =
  try
    Hashtbl.find ht u
  with Not_found ->
    try
      Sys.getenv ("PLASMA_PASSWD_" ^ u)
    with Not_found ->
      let pw = ref "" in
      Netsys_posix.with_tty
        (fun tty ->
           pw := (Netsys_posix.tty_read_password
                    ~tty ("Plasma password of " ^ u ^ "? "))
        );
      Hashtbl.add ht u !pw;
      !pw

(* Memory budget used for sizing the client buffer in access_cluster. *)
let sugg_mem = 64 * 1024 * 1024 (* 64M *)

(* Build the object that carries the cluster-related command-line options
   (method args) and opens the cluster connection on demand.

   - open_cluster: creates the client on first call, configures
     authentication, default user/group, buffers and preferred nodes, and
     caches it; later calls return the cached client.
   - config: installs the stderr logger and the debug flag. It reads the
     parsed options, so it is called after Arg.parse by every command. *)
let access_cluster esys =
  let cluster = ref None in
  let namenodes = ref [] in
  let rep = ref 0 in
  let debug = ref false in
  let open_cluster = ref None in
  let auth = ref `None in
  let user = ref "" in
  let group = ref "" in
  let pref = ref [] in
  let local_pref = ref false in
  let args =
    [ "-cluster", Arg.String (fun s -> cluster := Some s),
      "<name> Set the cluster name";

      "-namenode", Arg.String (fun n -> namenodes := n :: !namenodes),
      "<host>:<port> Also use this namenode (can be given several times)";

      "-rep", Arg.Set_int rep,
      "<n> Set replication factor for new files";

      "-pref", Arg.String (fun s -> pref := s :: !pref),
      "<identity> Consider the data node with this identity as preferred place";

      "-local-pref", Arg.Set local_pref,
      " Consider the local node as preferred place";

      "-auth", Arg.String (fun s -> auth := `Direct s),
      "<name> Set the RPC user to authenticate as at the namenode";

      "-user", Arg.String (fun s -> user := s),
      "<name> Set the filesystem user for new files";

      "-group", Arg.String (fun s -> group := s),
      "<name> Set the filesystem group for new files";

      "-debug", Arg.Set debug,
      " Enable debug output";
    ] in
  ( object
      method args = args
      method cluster = !cluster
      method namenodes = !namenodes
      method rep = !rep

      method open_cluster() =
        match !open_cluster with
          | None ->
              let nn_nodes =
                if !namenodes = [] then None else Some !namenodes in
              let cfg =
                Plasma_client_config.get_config
                  ?clustername:!cluster
                  ?nn_nodes () in
              let c = Plasma_client.open_cluster_cc cfg esys in
              open_cluster := Some c;
              ( match !auth with
                  | `None ->
                      (* Do something reasonable... *)
                      ( try
                          let ticket = Sys.getenv "PLASMAFS_AUTH_TICKET" in
                          Plasma_client.configure_auth_ticket c ticket
                        with Not_found ->
                          Plasma_client.configure_auth_daemon c
                          (* fallback: no authentication if there is no
                             reachable daemon *)
                      )
                  | `Direct u ->
                      let ht = Hashtbl.create 5 in
                      Plasma_client.configure_auth c u "pnobody" (get_pw ht);
              );
              Plasma_client.configure_default_user_group c !user !group;
              let bsize = Plasma_client.blocksize c in
              (* as many block buffers as fit into sugg_mem, at least one *)
              let bufs = max 1 (sugg_mem / bsize) in
              Plasma_client.configure_buffer c bufs;
              if !local_pref then (
                let local_ids = Plasma_client.local_identities c in
                pref := local_ids @ !pref
              );
              Plasma_client.configure_pref_nodes c !pref;
              c
          | Some c ->
              c

      method config() =
        Netlog.current_logger :=
          Netlog.channel_logger stderr (if !debug then `Debug else `Info);
        Plasma_util.debug := !debug
    end
  )

let tree_prefix_re = Pcre.regexp "^([-a-zA-Z0-9_]+):(.*)$"

(* Split prefix:rest into (Some prefix, rest); a string that does not
   match tree_prefix_re is returned unchanged as (None, s). *)
let extract_tree_prefix s =
  try
    match Pcre.extract ~rex:tree_prefix_re ~full_match:false s with
      | [| prefix; s' |] -> Some prefix, s'
      | _ -> raise Not_found
  with
    | Not_found -> None, s

let tree_config_re = Pcre.regexp "^([-a-zA-Z0-9_]+)=(.*)$"

(* Open the file tree designated by url. ssh_opts are extra options for
   the ssh interpreter. Raises Arg.Bad for unsupported schemes and for
   URLs that cannot be parsed. *)
let get_tree ssh_opts url =
  (* Quite limited so far. We recognize:
     - file://any/path
     - http://host/path
     - plasma://name@host:port (host:port optional)
     - ssh://name@host:port (name@, :port optional)
   *)
  let syn = Hashtbl.copy Neturl.common_url_syntax in
  Hashtbl.add syn "plasma"
    { Neturl.null_url_syntax with
        Neturl.url_enable_scheme = Neturl.Url_part_required;
        Neturl.url_enable_user = Neturl.Url_part_required;
        Neturl.url_enable_host = Neturl.Url_part_allowed;
        Neturl.url_enable_port = Neturl.Url_part_allowed;
    };
  Hashtbl.add syn "ssh"
    { Neturl.null_url_syntax with
        Neturl.url_enable_scheme = Neturl.Url_part_required;
        Neturl.url_enable_user = Neturl.Url_part_allowed;
        Neturl.url_enable_host = Neturl.Url_part_required;
        Neturl.url_enable_port = Neturl.Url_part_allowed;
    };
  try
    let u = Neturl.parse_url ~schemes:syn (Neturl.fixup_url_string url) in
    match Neturl.url_scheme u with
      | "file" ->
          let p = try Neturl.url_path u with Not_found -> [] in
          Netfs.local_fs
            ?encoding:(Netconversion.user_encoding())
            ~root:(if p = [] then "/" else Neturl.join_path p)
            ()
      | "http" ->
          (Http_fs.http_fs url :> Netfs.stream_fs)
      | "plasma" ->
          let clustername = Neturl.url_user u in
          let nn_nodes =
            try
              let h = Neturl.url_host u in
              let p = Neturl.url_port u in
              Some [ sprintf "%s:%d" h p ]
            with Not_found -> None in
          let cfg =
            Plasma_client_config.get_config ~clustername ?nn_nodes () in
          let c =
            Plasma_client.open_cluster_cc
              cfg (Unixqueue.create_unix_event_system()) in
          (* FIXME: allow to set replication factor *)
          Plasma_netfs.netfs ~verbose:true c
      | "ssh" ->
          let user =
            try Some(Neturl.url_user u) with Not_found -> None in
          let host = Neturl.url_host u in
          let port_opts =
            try [ "-p"; string_of_int(Neturl.url_port u) ]
            with Not_found -> [] in
          let options = ssh_opts @ port_opts in
          let ci = Shell_fs.ssh_interpreter ~options ?user ~host () in
          let t =
            (Shell_fs.shell_fs
               ?encoding:(Netconversion.user_encoding())
               ci :> Netfs.stream_fs) in
          t
      | sch ->
          raise(Arg.Bad("This URL scheme is not supported: " ^ sch))
  with
    | Neturl.Malformed_URL ->
        raise(Arg.Bad("This URL cannot be parsed: " ^ url))

(* Build the object that collects the file arguments of a command and maps
   each of them to a file tree.

   - args: the options -glob, -no-glob and -tree
   - parse_anonarg / anonarg: tag an argument with its prefix and with the
     glob mode that is active at this point of the command line
   - plasma_tree: the PlasmaFS tree, opened on first use
   - same_tree: tree identity, where the two local trees (absolute and
     relative) count as the same tree
   - files: expand an argument to (tree, list of (path, display)) *)
let file_tagger ac =
  let enable_glob = ref true in
  let file_args = ref [] in
  let trees = Hashtbl.create 5 in
  let ssh_opts = [] in (* FIXME: allow to set this *)
  let parse_tree s =
    try
      match Pcre.extract ~rex:tree_config_re ~full_match:false s with
        | [| prefix; url |] ->
            let tree = get_tree ssh_opts url in
            Hashtbl.replace trees prefix tree
        | _ -> raise Not_found
    with
      | Not_found ->
          raise(Arg.Bad("Bad syntax for -tree arg: " ^ s)) in
  let args =
    [ "-glob", Arg.Set enable_glob,
      " Enable wildcards for the following arguments";

      "-no-glob", Arg.Clear enable_glob,
      " Disable wildcards for the following arguments";

      "-tree", Arg.String parse_tree,
      "<prefix>=<url> Access the file tree at this URL for this prefix";
    ] in
  let plasma_tree = ref None in
  let abs_file_tree =
    Netfs.local_fs ?encoding:(Netconversion.user_encoding()) () in
  let rel_file_tree =
    Netfs.local_fs ?encoding:(Netconversion.user_encoding()) ~root:"." () in
  ( object(self)
      method args = args

      method parse_anonarg s =
        let p_opt, s' = extract_tree_prefix s in
        { tree_prefix = p_opt;
          tree_file = if !enable_glob then `Glob s' else `Verb s'
        }

      method anonarg s =
        let file_arg = self # parse_anonarg s in
        file_args := file_arg :: !file_args

      method file_args = List.rev !file_args

      method trees = trees

      method plasma_tree =
        match !plasma_tree with
          | None ->
              let c = ac#open_cluster() in
              let repl = ac#rep in
              let t = Plasma_netfs.netfs ~repl ~verbose:true c in
              plasma_tree := Some t;
              t
          | Some t -> t

      method same_tree t1 t2 =
        if t1 = abs_file_tree || t1 = rel_file_tree then
          (t2 = abs_file_tree || t2 = rel_file_tree)
        else
          t1 = t2

      method files file_arg =
        (* Returns (t, files) where files is a list of (path, display) *)
        let glob t pat =
          match pat with
            | `Glob g ->
                let fsys = Netglob.of_stream_fs t in
                Netglob.glob
                  ?encoding:(Netconversion.user_encoding())
                  ~fsys
                  ~mode:`All_paths
                  (`String g)
            | `Verb v ->
                [v] in
        let pp ?(f = fun file -> (file,file)) files =
          List.map f files in
        match file_arg.tree_prefix with
          | Some p ->
              let pdisp file = (file, p ^ ":" ^ file) in
              ( try
                  (* a prefix declared with -tree wins *)
                  let t = Hashtbl.find trees p in
                  (t, pp ~f:pdisp (glob t file_arg.tree_file))
                with
                  | Not_found ->
                      if p = "file" then (
                        let is_abs s = s <> "" && s.[0] = '/' in
                        (* relative names are addressed in rel_file_tree
                           with a leading slash *)
                        let get_netfs s =
                          if is_abs s then
                            (abs_file_tree, s)
                          else
                            (rel_file_tree, "/" ^ s) in
                        (* for display, strip that leading slash again *)
                        let disp s =
                          if is_abs s then
                            (s, "file:" ^ String.sub s 1 (String.length s - 1))
                          else
                            (s, "file:" ^ s) in
                        match file_arg.tree_file with
                          | `Glob g ->
                              let (t, pat) = get_netfs g in
                              let f = if is_abs g then pdisp else disp in
                              (t, pp ~f (glob t (`Glob pat)))
                          | `Verb v ->
                              let (t, name) = get_netfs v in
                              (t, [name, "file:" ^ v])
                      )
                      else (
                        (* interpret p as host name *)
                        let ci = Shell_fs.ssh_interpreter ~host:p () in
                        let t =
                          (Shell_fs.shell_fs
                             ?encoding:(Netconversion.user_encoding())
                             ci :> Netfs.stream_fs) in
                        (t, pp ~f:pdisp (glob t file_arg.tree_file))
                      )
              )
          | None ->
              (* No prefix: access PlasmaFS *)
              let t = self # plasma_tree in
              (t, pp (glob t file_arg.tree_file))
    end
  )

(* Order two timestamps by seconds, then by nanoseconds. *)
let mtime_compare t1 t2 =
  let k = Int64.compare t1.tsecs t2.tsecs in
  if k = 0 then
    compare t1.tnsecs t2.tnsecs
  else
    k

(* plasma list: list files and directory contents. Arguments without tree
   prefix are listed via the PlasmaFS client (long format possible);
   arguments of other trees are listed by name only. Errors per argument
   are reported on stderr; the exit code is 1 if any occurred. *)
let list() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let one = ref false in
  let all = ref false in
  let dirs = ref false in
  let inode_flag = ref false in
  let reverse = ref false in
  let sort_by_size = ref false in
  let sort_by_mtime = ref false in
  if not(Unix.isatty Unix.stdout) then one := true;
  Arg.parse
    ( ac#args @ ft#args @
        [ "-1", Arg.Set one,
          " output one filename per line (default if stdout is not a tty)";

          "-l", Arg.Clear one,
          " output long listing (default if stdout is a tty)";

          "-a", Arg.Set all,
          " also include entries starting with .";

          "-d", Arg.Set dirs,
          " list directory entries instead of contents";

          "-i", Arg.Set inode_flag,
          " output inode numbers";

          "-r", Arg.Set reverse,
          " reverse sorting order";

          "-S", Arg.Set sort_by_size,
          " sort by file size";

          "-t", Arg.Set sort_by_mtime,
          " sort by modification time";
        ]
    )
    ft#anonarg
    "usage: plasma list <file> ...";
  ac#config();

  (* Print rows of (field, alignment) cells as a table; each column is as
     wide as its widest field. *)
  let show_rows rows =
    let widths =
      List.fold_left
        (fun acc row ->
           if acc = [] then
             List.map (fun (field,_) -> String.length field) row
           else
             List.map2
               (fun width (field, alignment) ->
                  max width (String.length field)
               )
               acc
               row
        )
        []
        rows in
    List.iter
      (fun row ->
         let first = ref true in
         List.iter2
           (fun width (field, alignment) ->
              if not !first then printf " ";
              first := false;
              match alignment with
                | `L -> printf "%-*s" width field
                | `R -> printf "%*s" width field
           )
           widths
           row;
         printf "\n";
      )
      rows in

  (* Build the table row of one file: only the name in -1 mode, otherwise
     [inode] mode-bits user group size mtime name. *)
  let file_row name inode ii =
    if !one then
      [ name, `L ]
    else (
      (* render one rwx triple; t/su/sg select which special bit of
         ii.mode is shown in the x position *)
      let rwx ?(t=false) ?(su=false) ?(sg=false) n =
        ( if n land 4 <> 0 then "r" else "-" ) ^
        ( if n land 2 <> 0 then "w" else "-" ) ^
        ( if n land 1 <> 0 then (
            if t && (ii.mode land 0o1000) <> 0 then "t"
            else if su && (ii.mode land 0o4000) <> 0 then "s"
            else if sg && (ii.mode land 0o2000) <> 0 then "s"
            else "x"
          ) else (
            if su && (ii.mode land 0o4000) <> 0 then "S"
            else if sg && (ii.mode land 0o2000) <> 0 then "S"
            else "-"
          )
        ) in
      let bits =
        ( match ii.filetype with
            | `ftype_regular -> "-"
            | `ftype_directory -> "d"
            | `ftype_symlink -> "s"
        ) ^
        rwx ~su:true (ii.mode lsr 6) ^
        rwx ~sg:true (ii.mode lsr 3) ^
        rwx ~t:true ii.mode in
      let maybe_inode =
        if !inode_flag then
          [ (Int64.to_string inode), `R ]
        else
          [] in
      let mtime =
        (Netdate.format
           ~fmt:"%Y-%m-%d %H:%M"
           (Netdate.create
              ~zone:Netdate.localzone
              (Int64.to_float ii.mtime.tsecs))) in
      let xname =
        if ii.filetype = `ftype_symlink then
          name ^ " -> " ^ ii.field1
        else
          name in
      let row =
        maybe_inode @
          [ bits, `L;
            ii.usergroup.user, `L;
            ii.usergroup.group, `L;
            Int64.to_string ii.eof, `R;
            mtime, `R;
            xname, `L
          ] in
      row
    ) in

  let need_lf = ref false in
  let first = ref true in
  let errors = ref false in
  let expanded =
    List.flatten
      (List.map
         (fun farg ->
            let (t, files) = ft#files farg in
            List.map
              (fun (file,display) -> (t,farg,file,display))
              files
         )
         ft#file_args
      ) in
  (* with a single item the directory header line is omitted *)
  let suppress_header = List.length expanded <= 1 in
  List.iter
    (fun (t, farg, file, display) ->
       if farg.tree_prefix = None then (
         (* PlasmaFS: use the client directly to get the inode details *)
         let c = ac#open_cluster() in
         let t = Plasma_client.start c in
         ( try
             let inode = Plasma_client.lookup t file false in
             let ii = Plasma_client.get_inodeinfo t inode in
             match ii.filetype with
               | `ftype_directory when not !dirs ->
                   if not suppress_header then (
                     if not !first then printf "\n";
                     printf "%s:\n%!" display;
                   );
                   (* entries that vanish between listing and lookup
                      (enoent) are skipped *)
                   let elements =
                     List.flatten
                       (List.map
                          (fun (elem, elem_inode) ->
                             try
                               let elem_ii =
                                 Plasma_client.get_inodeinfo t elem_inode in
                               [elem, elem_inode, elem_ii]
                             with
                               | Plasma_client.Plasma_error `enoent -> []
                          )
                          (Plasma_client.list_inode t inode)) in
                   let elements' =
                     List.filter
                       (fun (elem,_,_) ->
                          !all || (elem <> "" && elem.[0] <> '.')
                       )
                       elements in
                   let elements'' =
                     List.sort
                       (fun (elem1,_,elem_ii1) (elem2,_,elem_ii2) ->
                          let k =
                            if !sort_by_size then
                              Int64.compare elem_ii1.eof elem_ii2.eof
                            else if !sort_by_mtime then
                              mtime_compare elem_ii1.mtime elem_ii2.mtime
                            else
                              String.compare elem1 elem2 in
                          if !reverse then -k else k
                       )
                       elements' in
                   let rows =
                     List.map
                       (fun (elem,elem_inode,elem_ii) ->
                          file_row elem elem_inode elem_ii
                       )
                       elements'' in
                   show_rows rows;
                   if elements'' = [] && not suppress_header then
                     printf "<empty>\n";
                   need_lf := true
               | _ ->
                   if !need_lf then printf "\n";
                   need_lf := false;
                   show_rows [file_row display inode ii]
           with
             | error ->
                 if not !first then eprintf "\n";
                 eprintf "%s: %s\n%!" display (Netexn.to_string error);
                 errors := true;
                 need_lf := true;
         );
         Plasma_client.commit t;
       )
       else (
         (* other trees: only names are available *)
         try
           if t # test [] file `D then (
             if not suppress_header then (
               if not !first then printf "\n";
               printf "%s:\n%!" display;
             );
             let files = t # readdir [] file in
             let files' =
               List.filter
                 (fun name ->
                    !all || (name <> "" && name.[0] <> '.')
                 )
                 files in
             let files'' =
               List.sort
                 (fun name1 name2 ->
                    let k = String.compare name1 name2 in
                    if !reverse then -k else k
                 )
                 files' in
             show_rows (List.map (fun name -> [ name, `L ]) files'');
             if files'' = [] && not suppress_header then
               printf "<empty>\n";
             need_lf := true;
           )
           else (
             if t # test [] file `N then (
               if !need_lf then printf "\n";
               need_lf := false;
               show_rows [ [ display, `L ] ]
             )
             else
               raise(Unix.Unix_error(Unix.ENOENT, "", display))
           )
         with
           | error ->
               if not !first then eprintf "\n";
               eprintf "%s: %s\n%!" display (Netexn.to_string error);
               errors := true;
               need_lf := true;
       );
       first := false
    )
    expanded;
  if !errors then exit 1

(* plasma create: create each named file as a new empty file (exclusive
   creation). *)
let create() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse
    (ac#args @ ft#args)
    ft#anonarg
    "usage: plasma create <file> ...";
  ac#config();
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       List.iter
         (fun (path, display) ->
            let ch = t # write [`Create; `Exclusive] path in
            ch # close_out()
         )
         files
    )
    ft#file_args

(* plasma mkdir: create directories; with -p the `Path flag is passed to
   the tree so that parents are created as necessary. *)
let mkdir() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let p = ref false in
  Arg.parse
    (ac#args @ ft#args @
       [ "-p", Arg.Set p,
         " Create parent directories as necessary"
       ]
    )
    ft#anonarg
    "usage: plasma mkdir <file> ...";
  ac#config();
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       List.iter
         (fun (path, display) ->
            t # mkdir (if !p then [`Path] else []) path
         )
         files
    )
    ft#file_args

(* Parse s as an octal number. Raises Error if s is not octal. Only the
   Failure raised by int_of_string is caught. *)
let parse_mode s =
  try
    int_of_string("0o" ^ s)
  with
    | Failure _ -> errwith ("Cannot parse mode as octal number: " ^ s)

(* plasma chmod: the first anonymous argument is the octal mode, the
   remaining ones are PlasmaFS files. All files of one argument are
   changed in one transaction. *)
let chmod() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let mode = ref None in
  Arg.parse
    (ac#args @ ft#args)
    (fun s ->
       if !mode = None then
         mode := Some(parse_mode s)
       else
         ft#anonarg s
    )
    "usage: plasma chmod <octal_mode> <file> ...";
  let m =
    match !mode with
      | None -> errwith "Missing arguments"
      | Some m -> m in
  ac#config();
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       if t <> ft#plasma_tree then
         errwith "chmod is only supported for PlasmaFS";
       let c = ac#open_cluster() in
       Plasma_client.with_trans c
         (fun trans ->
            List.iter
              (fun (path, display) ->
                 let inode = Plasma_client.lookup trans path true in
                 let ii = Plasma_client.get_inodeinfo trans inode in
                 let ii' = { ii with mode = m } in
                 Plasma_client.set_inodeinfo trans inode ii';
                 eprintf "Modified: %s\n%!" display
              )
              files
         );
       eprintf "Committed.\n%!"
    )
    ft#file_args

(* Parse user[:group] into (user option, group option). An empty user
   yields None; without a colon the group is None; with a colon the group
   is Some of the text after it, even if that text is empty. *)
let parse_ug s =
  try
    let k = String.index s ':' in
    let u = String.sub s 0 k in
    let g = String.sub s (k+1) (String.length s - k - 1) in
    (if u = "" then None else Some u), (Some g)
  with
    | Not_found ->
        (if s = "" then None else Some s), None

(* plasma chown: the first anonymous argument is user[:group], the
   remaining ones are PlasmaFS files. Components not given keep their
   current value. All files of one argument are changed in one
   transaction. *)
let chown() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let user = ref None in
  let group = ref None in
  let ug_seen = ref false in
  Arg.parse
    (ac#args @ ft#args)
    (fun s ->
       if not !ug_seen then (
         let (u_opt, g_opt) = parse_ug s in
         user := u_opt;
         group := g_opt;
         ug_seen := true
       )
       else
         ft#anonarg s
    )
    "usage: plasma chown <user>[:<group>] <file> ...";
  ac#config();
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       if t <> ft#plasma_tree then
         errwith "chown is only supported for PlasmaFS";
       let c = ac#open_cluster() in
       Plasma_client.with_trans c
         (fun trans ->
            List.iter
              (fun (path, display) ->
                 let inode = Plasma_client.lookup trans path true in
                 let ii = Plasma_client.get_inodeinfo trans inode in
                 let usergroup =
                   { user = ( match !user with
                                | None -> ii.usergroup.user
                                | Some u -> u
                            );
                     group = ( match !group with
                                 | None -> ii.usergroup.group
                                 | Some g -> g
                             )
                   } in
                 let ii' = { ii with usergroup = usergroup } in
                 Plasma_client.set_inodeinfo trans inode ii';
                 eprintf "Modified: %s\n%!" display
              )
              files
         );
       eprintf "Committed.\n%!"
    )
    ft#file_args

(* Common driver of ln, mv and cp.

   t_dir is the target directory given with -t, if any. Without it, the
   last file argument becomes the target directory when it names an
   existing directory. If there is a target directory, do_operation is
   called for every remaining argument with the destination
   <dir>/<basename>; otherwise exactly two arguments are required and
   do_operation is called once with them.

   do_operation gets: source tree, source (path, display), destination
   tree, destination (path, display).

   Raises Failure on a wrong number of arguments and Error when an
   argument matches nothing, matches several files, or the -t argument is
   not a directory. *)
let multi_cmd t_dir ac ft do_operation =
  (* ln, mv, cp *)
  (* Start from the -t argument. It used to be ignored here, which made
     the -t option of ln, mv and cp ineffective. *)
  let dir = ref t_dir in
  let dir_expanded = ref None in
  let n = List.length ft#file_args in
  let targets = ref ft#file_args in

  (* expand an argument that must denote exactly one file *)
  let expand_uniq arg =
    let (t,files) = ft#files arg in
    let n = List.length files in
    if n = 0 then
      errwith("Nothing matches: " ^ display_of_arg arg)
    else if n > 1 then
      errwith("Multiple matches: " ^ display_of_arg arg);
    let file = List.hd files in
    (t, file) in

  if !dir = None && n = 1 then
    failwith "At least two arguments needed";

  if !dir = None && n > 1 then (
    (* no -t: check whether the last argument is a directory *)
    let last = List.hd (List.rev !targets) in
    let (t, last_file) = expand_uniq last in
    let (last_path, _) = last_file in
    if t # test [] last_path `D then (
      dir := Some last;
      dir_expanded := Some(t, last_file);
      targets := List.rev (List.tl (List.rev !targets));
    )
  );

  if !dir <> None && !dir_expanded = None then (
    (* -t was given: it must denote a directory *)
    match !dir with
      | None -> assert false
      | Some d ->
          let (t, d_file) = expand_uniq d in
          let (d_path,d_disp) = d_file in
          if not (t#test [] d_path `D) then
            errwith("Not a directory: " ^ d_disp);
          dir_expanded := Some(t, d_file)
  );

  ( match !dir_expanded with
      | None ->
          if List.length !targets <> 2 then
            failwith "Exactly two arguments expected when no directory is specified";
          ( match !targets with
              | [ target_arg; link_arg ] ->
                  let (target_t, target_file) = expand_uniq target_arg in
                  let (link_t, link_file) = expand_uniq link_arg in
                  do_operation target_t target_file link_t link_file
              | _ ->
                  assert false
          )
      | Some (t,dirfile) ->
          let (dirpath, dirdisp) = dirfile in
          List.iter
            (fun arg ->
               let (t1, files) = ft#files arg in
               List.iter
                 (fun ((path, disp) as file) ->
                    let basename = Filename.basename path in
                    do_operation
                      t1 file
                      t (dirpath ^ "/" ^ basename, dirdisp ^ "/" ^ basename)
                 )
                 files
            )
            !targets
  )

(* plasma ln: create hard links (PlasmaFS only) or, with -s, symlinks.
   Target and link must live in the same tree. *)
let link() =
  let esys = Unixqueue.create_unix_event_system() in
  let symlink_flag = ref false in
  let dir = ref None in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse
    (ac#args @ ft#args @
       [ "-s", Arg.Set symlink_flag,
         " create symlink instead of hard link";

         "-t", Arg.String (fun s -> dir := Some(ft#parse_anonarg s)),
         "<dir> create the links in this directory";
       ]
    )
    ft#anonarg
    "usage: plasma ln <opts> (<target> <link> | <target>... <dir>) ";
  ac#config();

  let do_link target_t target_file link_t link_file =
    if not(ft # same_tree target_t link_t) then
      errwith "Cross-filesystem links are not supported";
    let (target_path, target_disp) = target_file in
    let (link_path, link_disp) = link_file in
    if !symlink_flag then (
      link_t # symlink [] target_path link_path
    )
    else (
      if link_t <> ft#plasma_tree then
        errwith "Hard links are only supported for PlasmaFS";
      let c = ac#open_cluster() in
      Plasma_client.with_trans c
        (fun trans ->
           let inode = Plasma_client.lookup trans target_path true in
           Plasma_client.link trans link_path inode;
           eprintf "Linked: %s -> %s\n%!" link_disp target_disp
        );
      eprintf "Committed.\n%!"
    ) in

  multi_cmd !dir ac ft do_link

(* plasma mv: rename files within one tree. *)
let rename() =
  (* rename: almost the same as link *)
  let esys = Unixqueue.create_unix_event_system() in
  let dir = ref None in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse
    (ac#args @ ft#args @
       [ "-t", Arg.String (fun s -> dir := Some(ft#parse_anonarg s)),
         "<dir> move files to this directory";
       ]
    )
    ft#anonarg
    "usage: plasma mv <opts> (<src> <dest> | <src>... <destdir>) ";
  ac#config();

  let do_rename src_t src_file dest_t dest_file =
    if not(ft # same_tree src_t dest_t) then
      errwith "Cross-filesystem renames are not supported";
    let (src_path, src_disp) = src_file in
    let (dest_path, dest_disp) = dest_file in
    dest_t # rename [] src_path dest_path in

  multi_cmd !dir ac ft do_rename

(* plasma cp: copy files, possibly between different trees, using
   Netfs.copy in streaming mode. *)
let copy() =
  (* copy: almost the same as link *)
  let esys = Unixqueue.create_unix_event_system() in
  let dir = ref None in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse
    (ac#args @ ft#args @
       [ "-t", Arg.String (fun s -> dir := Some(ft#parse_anonarg s)),
         "<dir> copy files to this directory";
       ]
    )
    ft#anonarg
    "usage: plasma cp <opts> (<src> <dest> | <src>... <destdir>) ";
  ac#config();

  let do_copy src_t src_file dest_t dest_file =
    let (src_path, src_disp) = src_file in
    let (dest_path, dest_disp) = dest_file in
    Netfs.copy ~streaming:true src_t src_path dest_t dest_path in

  multi_cmd !dir ac ft do_copy

(* plasma rm: remove files; with -r the `Recursive flag is passed to the
   tree. *)
let delete() =
  let esys = Unixqueue.create_unix_event_system() in
  let recurse = ref false in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse
    (ac#args @ ft#args @
       [ "-r", Arg.Set recurse,
         " recursively delete directories"
       ]
    )
    ft#anonarg
    "usage: plasma rm <file> ...";
  ac#config();
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       List.iter
         (fun (path, display) ->
            t # remove (if !recurse then [`Recursive] else []) path
         )
         files
    )
    ft#file_args

(* plasma put: copy a local file (or stdin with -stdin) into a newly
   created PlasmaFS file. With -f an existing file of that name is
   unlinked first. Arguments with a tree prefix are rejected. *)
let put() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let plasma_filename = ref "" in
  let plasma_set = ref false in
  let local_filename = ref "" in
  let local_set = ref false in
  let topo = ref `Star in
  let from_stdin = ref false in
  let force = ref false in
  let sync_mode = ref `Late_datasync in
  Arg.parse
    ( ac#args @
        [ "-chain", Arg.Unit (fun () -> topo := `Chain),
          " Use chain topology instead of star topology";

          "-stdin", Arg.Set from_stdin,
          " Copy data from stdin. No <local_filename> in this case.";

          "-f", Arg.Set force,
          " Force: Replace existing files";

          "-nosync", Arg.Unit (fun () -> sync_mode := `No_datasync),
          " Do not wait until the data gets synced";
        ]
    )
    (fun s ->
       let (p_opt, _) = extract_tree_prefix s in
       if p_opt <> None then
         failwith "the put operation does not support non-PlasmaFS trees";
       if not !local_set && not !from_stdin then (
         local_filename := s;
         local_set := true
       )
       else if not !plasma_set then (
         plasma_filename := s;
         plasma_set := true
       )
       else
         raise(Arg.Bad("Unexpected arg: " ^ s))
    )
    "usage: plasma put (<local_path> <plasma_path> | -stdin <plasma_path>)";
  ac#config();

  if not !plasma_set then
    failwith "No plasma filename given";
  if not !local_set && not !from_stdin then
    failwith "No local filename given";
  if !local_set && !from_stdin then
    failwith "With -stdin, a localfile filename must not be given";
  if !from_stdin then
    local_filename := "<stdin>";

  (* for a regular file the length is determined by seeking to the end;
     for stdin a length of 0L is passed *)
  let fd, len =
    if !from_stdin then
      Unix.stdin, 0L
    else (
      let fd = Unix.openfile !local_filename [Unix.O_RDONLY] 0 in
      let len = Unix.LargeFile.lseek fd 0L Unix.SEEK_END in
      ignore(Unix.LargeFile.lseek fd 0L Unix.SEEK_SET);
      (fd, len)
    ) in

  let c = ac#open_cluster () in
  let ii0 = Plasma_client.regular_ii c 0o666 in
  let ii = { ii0 with replication = ac#rep } in
  let t = Plasma_client.start c in
  if !force then (
    try
      Plasma_client.unlink t !plasma_filename
    with Plasma_client.Plasma_error `enoent -> ()
  );
  let inode = Plasma_client.create_file t !plasma_filename ii in
  Plasma_client.commit t;
  ignore(Plasma_client.copy_in ~flags:[!sync_mode] c inode 0L fd len !topo);
  Unix.close fd;
  eprintf "Copied: %s -> %s\n%!" !local_filename !plasma_filename

(* Alternate download algorithm: read the range [pos, pos+len) of inode
   block by block and write each block to fd at the same offset, keeping
   up to par block reads in flight. Runs the event system esys until all
   reads are done and the transaction is committed. *)
let alt_copy_out esys c inode pos fd len par =
  (* par: number of parallel requests *)
  Plasma_client.configure_buffer c (2*par);
  let t = Plasma_client.start c in
  let blocksize = Plasma_client.blocksize c in
  let blocksizeL = Int64.of_int blocksize in
  let stream_e, signal = Plasma_util.signal_engine esys in
  let pool = Netsys_mem.create_pool blocksize in
  let n = ref 0 in        (* number of reads in flight *)
  let p = ref pos in      (* start of the next block to request *)
  let end_pos = Int64.add pos len in

  (* read until bsize bytes are in buf or EOF is reached; r is the number
     of bytes already read *)
  let rec read_block_e p0 buf r bsize =
    Plasma_client.read_e ~lazy_validation:true c inode p0 buf r (bsize-r)
    ++ (fun (l,eof,_) ->
          if not eof && (r+l) < bsize then
            read_block_e (Int64.add p0 (Int64.of_int l)) buf (r+l) bsize
          else
            eps_e (`Done(r+l,eof)) esys
       ) in

  (* start reads until par are in flight or the range is exhausted;
     signal completion when nothing is in flight anymore *)
  let rec fetch() =
    if !n < par then (
      if !p < end_pos then (
        let buf = Netsys_mem.pool_alloc_memory pool in
        let p0 = !p in
        let bsize = Int64.to_int (min blocksizeL (Int64.sub end_pos p0)) in
        let _e =
          read_block_e p0 (`Memory buf) 0 bsize
          ++ (fun (l,eof) ->
                ignore(Unix.LargeFile.lseek fd p0 Unix.SEEK_SET);
                ignore(Netsys_mem.mem_write fd buf 0 l);
                decr n;
                fetch();
                eps_e (`Done ()) esys
             ) in
        incr n;
        p := Int64.add !p blocksizeL;
        fetch()
      )
    );
    if !n = 0 then signal(`Done()) in

  fetch();
  let _e =
    stream_e
    ++ (fun () -> Plasma_client.commit_e t) in
  Unixqueue.run esys

(* plasma get: copy a PlasmaFS file to a local file. With -alt <n> the
   alternate algorithm alt_copy_out is used. Arguments with a tree prefix
   are rejected. *)
let get() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let plasma_filename = ref "" in
  let plasma_set = ref false in
  let local_filename = ref "" in
  let local_set = ref false in
  let alt = ref None in
  Arg.parse
    ( ac#args @
        [ "-alt", Arg.Int (fun n -> alt := Some n),
          "<n> use the alternate algorithm with a parallelization factor of n"
        ]
    )
    (fun s ->
       let (p_opt, _) = extract_tree_prefix s in
       if p_opt <> None then
         failwith "the get operation does not support non-PlasmaFS trees";
       if not !plasma_set then (
         plasma_filename := s;
         plasma_set := true
       )
       else if not !local_set then (
         local_filename := s;
         local_set := true
       )
       else
         raise(Arg.Bad("Unexpected arg: " ^ s))
    )
    "usage: plasma get <plasma_filename> <local_filename>";
  ac#config();

  if not !plasma_set then
    failwith "No plasma filename given";
  if not !local_set then
    failwith "No local filename given";

  (* Resolve the source first. The local file is opened with O_TRUNC and
     O_CREAT, so opening it before the lookup would destroy or create it
     even when the PlasmaFS file does not exist. *)
  let c = ac#open_cluster () in
  let t = Plasma_client.start c in
  let inode = Plasma_client.lookup t !plasma_filename false in
  let ii = Plasma_client.get_inodeinfo t inode in
  Plasma_client.commit t;
  let fd =
    Unix.openfile
      !local_filename
      [Unix.O_WRONLY; Unix.O_TRUNC; Unix.O_CREAT]
      0o666 in
  ( match !alt with
      | None ->
          ignore(Plasma_client.copy_out c inode 0L fd ii.eof)
      | Some par ->
          alt_copy_out esys c inode 0L fd ii.eof par
  );
  Unix.close fd;
  eprintf "Copied: %s -> %s\n%!" !plasma_filename !local_filename

(* plasma cat: write the contents of the files to stdout. PlasmaFS files
   must be regular files (otherwise Plasma_error `eisdir is raised) and
   are copied with copy_out; files of other trees are streamed through
   Netchannels. *)
let cat() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse
    (ac#args @ ft#args)
    ft#anonarg
    "usage: plasma cat <file> ...";
  ac#config();
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       List.iter
         (fun (path, display) ->
            if t = ft#plasma_tree then (
              let c = ac#open_cluster() in
              let inode, ii =
                Plasma_client.with_trans c
                  (fun trans ->
                     let inode = Plasma_client.lookup trans path false in
                     let ii = Plasma_client.get_inodeinfo trans inode in
                     if ii.Plasma_rpcapi_aux.filetype <> `ftype_regular then
                       raise(Plasma_client.Plasma_error `eisdir);
                     (inode, ii)
                  ) in
              ignore(Plasma_client.copy_out c inode 0L Unix.stdout ii.eof)
            )
            else (
              Netchannels.with_out_obj_channel
                (new Netchannels.output_channel stdout)
                (fun out_ch ->
                   Netchannels.with_in_obj_channel
                     (t # read [`Streaming; `Binary] path)
                     (fun in_ch ->
                        out_ch # output_channel in_ch
                     )
                )
            )
         )
         files
    )
    ft#file_args

(* plasma fsstat: print total, used, transitional and free block counts;
   free = total - used - transitional. *)
let fsstat() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  Arg.parse
    ac#args
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)) )
    "usage: plasma fsstat";
  ac#config();
  let c = ac#open_cluster () in
  let st = Plasma_client.fsstat c in
  printf "Total: %20Ld blocks\n" st.total_blocks;
  printf "Used: %20Ld blocks\n" st.used_blocks;
  printf "Transitional: %20Ld blocks\n" st.trans_blocks;
  printf "Free: %20Ld blocks\n"
    (Int64.sub
       (Int64.sub st.total_blocks st.used_blocks)
       st.trans_blocks);
  ()

(* plasma params: print the name = value pairs returned by
   Plasma_client.params. *)
let params() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  Arg.parse
    ac#args
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)) )
    "usage: plasma params";
  ac#config();
  let c = ac#open_cluster () in
  let l = Plasma_client.params c in
  List.iter
    (fun (n,v) -> printf "%s = %s\n" n v)
    l;
  ()

(* plasma auth_ticket: print an authentication ticket for the current
   user. *)
let auth_ticket() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  Arg.parse
    ac#args
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)) )
    "usage: plasma auth_ticket";
  ac#config();
  let c = ac#open_cluster () in
  let (u, _, _) = Plasma_client.current_user c in
  let t = Plasma_client.get_auth_ticket c u in
  print_endline t;
  ()

(* plasma admin_table: with -get print the named admin table to stdout,
   with -put replace it with the data read from stdin. If both options
   are given the last one wins. *)
let admin_table() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let mode = ref None in
  Arg.parse
    (ac#args @
       [ "-get", Arg.String (fun key -> mode := Some(`Get key)),
         "<name> Get the admin table <name> and print it to stdout";

         "-put", Arg.String (fun key -> mode := Some(`Put key)),
         "<name> Set the admin table <name> from stdin";
       ]
    )
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)) )
    "usage: plasma admin_table";
  ac#config();
  match !mode with
    | None ->
        failwith "Need -get or -put"
    | Some(`Get key) ->
        let c = ac#open_cluster () in
        print_string(Plasma_client.read_admin_table c key)
    | Some(`Put key) ->
        let c = ac#open_cluster () in
        let data =
          Netchannels.string_of_in_obj_channel
            (new Netchannels.input_channel stdin) in
        Plasma_client.write_admin_table c key data

(* plasma blocks: for each PlasmaFS file print the block ranges with the
   datanodes storing them, the number of blocks on live datanodes, the
   actual replication (minimum number of live datanodes over all ranges)
   and the requested replication. *)
let blocks() =
  let open Plasma_blocks in
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse
    ac#args
    (fun s ->
       let (p_opt, _) = extract_tree_prefix s in
       if p_opt <> None then
         failwith "the blocks operation does not support non-PlasmaFS trees";
       ft#anonarg s
    )
    "usage: plasma blocks <plasma_file> ...";
  ac#config();
  let c = ac#open_cluster () in
  let t = Plasma_client.start c in

  let analyze file =
    let inode = Plasma_client.lookup t file false in
    let ii = Plasma_client.get_inodeinfo t inode in
    (* FIXME: very large blocklists *)
    let list = Plasma_client.get_blocklist t inode 0L ii.blocklimit false in
    printf "%s:\n" file;
    (* per datanode identity: its node string and its alive flag *)
    let hostport = Hashtbl.create 5 in
    let alive = Hashtbl.create 5 in
    List.iter
      (fun b ->
         Hashtbl.replace hostport b.identity b.node;
         Hashtbl.replace alive b.identity b.node_alive;
      )
      list;
    let set = Plasma_blocks.bset_of_blocklist list in

    (* Get replication *)
    let replication = ref max_int in
    let blocks = ref 0L in
    Bset.iter
      (fun _ br ->
         let all_datanodes = br.br_datanodes in
         let live_datanodes =
           StrMap.filter
             (fun id _ ->
                try Hashtbl.find alive id with Not_found -> false)
             all_datanodes in
         let n = StrMap.cardinal live_datanodes in
         replication := min !replication n;
         let b = Int64.mul br.br_length (Int64.of_int n) in
         blocks := Int64.add !blocks b
      )
      set;

    let print_bi bi =
      let idxmax = Int64.pred (Int64.add bi.br_index bi.br_length) in
      printf "%9Ld - %9Ld:" bi.br_index idxmax;
      StrMap.iter
        (fun id b_s ->
           let hp =
             try Hashtbl.find hostport id with Not_found -> "n/a" in
           let alive =
             try Hashtbl.find alive id with Not_found -> false in
           let b_e = Int64.pred (Int64.add b_s bi.br_length) in
           if alive then
             printf " %s[%Ld-%Ld]" hp b_s b_e
           else
             printf " %s[DEAD]" hp
        )
        bi.br_datanodes;
      printf "\n";
    in

    (* Iterate over ranges and print *)
    Bset.iter
      (fun id bi ->
         print_bi bi
      )
      set;
    let r_s =
      if list = [] then "n/a" else string_of_int !replication in
    printf " blocks: %Ld\n" !blocks;
    printf " actual replication: %s\n" r_s;
    printf " requested replication: %d\n" ii.replication;
    printf "\n"
  in

  List.iter
    (fun farg ->
       let (_, files) = ft#files farg in
       List.iter
         (fun (path, _) ->
            analyze path
         )
         files
    )
    ft#file_args;
  Plasma_client.commit t

(* plasma help: print the usage text. *)
let help() =
  usage()

(* Select the command from the first command-line argument. Arg.current
   is advanced so that the Arg.parse call of the command skips the mode
   word. Raises Failure for a missing or unknown mode. *)
let dispatch() =
  if Array.length Sys.argv < 2 then
    failwith "Bad usage";
  let mode_str = Sys.argv.(1) in
  incr Arg.current;
  match mode_str with
    | "create" -> create()
    | "delete" | "rm" -> delete()
    | "mkdir" -> mkdir()
    | "put" -> put()
    | "get" -> get()
    | "cat" -> cat()
    | "list" | "ls" -> list()
    | "link" | "ln" -> link()
    | "copy" | "cp" -> copy()
    | "rename" | "mv" -> rename()
    | "chmod" -> chmod()
    | "chown" -> chown()
    | "fsstat" -> fsstat()
    | "params" -> params()
    | "blocks" -> blocks()
    | "auth_ticket" -> auth_ticket()
    | "admin_table" -> admin_table()
    | "help" | "-help" | "--help" -> help()
    | _ -> failwith ("Bad usage")

(* Run the selected command and translate exceptions into messages on
   stderr with exit code 2. Failure additionally prints the usage text;
   Error (raised by errwith) prints only the message. *)
let main () =
  try
    dispatch()
  with
    | Plasma_client.Plasma_error code ->
        prerr_endline ("plasma: " ^ Plasma_util.string_of_errno code);
        exit 2
    | Unix.Unix_error(code,fn,file) ->
        prerr_endline ("plasma: " ^
                         (if file <> "" then file ^ ": " else "") ^
                         Unix.error_message code ^
                         (if fn <> "" then " (" ^ fn ^ ")" else "")
                      );
        exit 2
    | Failure m ->
        prerr_endline ("plasma: " ^ m);
        usage();
        exit 2
    | Error m ->
        (* plain error message, deliberately without the usage text *)
        prerr_endline ("plasma: " ^ m);
        exit 2
    | error ->
        prerr_endline ("plasma exception: " ^ Netexn.to_string error);
        exit 2

(* Program entry point: tune the garbage collector, then run main. *)
let () =
  let ctrl = Gc.get() in
  let ctrl' =
    { ctrl with
        Gc.max_overhead = 1_000_000; (* Turn compactor off *)
        Gc.space_overhead = 300; (* allow more wasted mem *)
        Gc.minor_heap_size = 65536; (* bigger minor heap *)
    } in
  Gc.set ctrl';
  main()