(*
Copyright 2010 Gerd Stolpmann
This file is part of Plasma, a distributed filesystem and a
map/reduce computation framework. Unless you have a written license
agreement with the copyright holder (Gerd Stolpmann), the following
terms apply:
Plasma is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Plasma is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Plasma. If not, see <http://www.gnu.org/licenses/>.
*)
(* $Id: plasma.ml 468 2011-10-13 08:17:04Z gerd $ *)
(* TODO:
- ls: display filenames should be converted to local encoding
- argument encoding should be converted to the tree encoding
- file tree: the separation between absolute and relative names is
nonsense. Support a unified tree. (E.g. cp /abs rel)
- allow to set more parameters for accessing trees:
* encoding
* ssh options
* replication
- nicer error messages
- cp -r
*)
open Printf
open Plasma_rpcapi_aux
open Plasma_util.Operators
(* Debug-logging helper re-exported from Plasma_util (not referenced in
   the part of this file shown for review). *)
let dlogf = Plasma_util.dlogf
(* Map with string keys; [blocks] uses it to walk the per-datanode
   entries of a block range ([br_datanodes]). *)
module StrMap = Plasma_util.StrMap
(* User-facing error that is reported WITHOUT the usage summary.
   Contrast with [Failure], which the entry point reports together with
   the usage text. *)
exception Error of string

(* [errwith msg] aborts the current command by raising [Error msg]. *)
let errwith msg =
  let e = Error msg in
  raise e
(* A file argument, remembering whether wildcard expansion was enabled
   (-glob / -no-glob) when it was parsed. *)
type tagged_arg =
  [ `Glob of string   (* expand wildcards *)
  | `Verb of string   (* use verbatim *)
  ]

(* A command-line file argument split into its optional tree prefix
   ("prefix:" part) and the file part. *)
type ext_arg =
  { tree_prefix : string option;
    tree_file : tagged_arg
  }

(* Reconstructs the argument as the user wrote it: "prefix:file", or
   just "file" when there is no prefix. *)
let display_of_arg { tree_prefix; tree_file } =
  let file =
    match tree_file with
    | `Glob s | `Verb s -> s in
  match tree_prefix with
  | Some p -> p ^ ":" ^ file
  | None -> file
(* Prints the top-level usage summary (the list of modes) on stderr. *)
let usage() =
  let lines =
    [ "usage: plasma <mode> [-help|options]";
      "";
      " available modes:";
      " - list | ls";
      " - delete | rm";
      " - rename | mv";
      " - link | ln";
      " - copy | cp";
      " - mkdir";
      " - create";
      " - put";
      " - get";
      " - cat";
      " - chmod";
      " - chown";
      " - fsstat";
      " - params";
      " - blocks";
      " - auth_ticket";
      " - admin_table";
    ] in
  List.iter prerr_endline lines
(* Password callback for direct authentication as user [u].
   The lookup order is:
   1. the cache [ht];
   2. the environment variable PLASMA_PASSWD_<u>;
   3. an interactive prompt on the terminal.
   Only prompted passwords are added to [ht]. *)
let get_pw ht u =
  let prompt () =
    let answer = ref "" in
    Netsys_posix.with_tty
      (fun tty ->
         answer :=
           Netsys_posix.tty_read_password ~tty
             ("Plasma password of " ^ u ^ "? ")
      );
    Hashtbl.add ht u !answer;
    !answer in
  let from_env () =
    try Sys.getenv ("PLASMA_PASSWD_" ^ u)
    with Not_found -> prompt () in
  try Hashtbl.find ht u
  with Not_found -> from_env ()
(* Memory budget (bytes) for the client's block buffer; [open_cluster]
   divides it by the cluster blocksize to get the number of buffers. *)
let sugg_mem = 64 * 1024 * 1024 (* 64M *)

(* Cluster-access options shared by all modes that talk to PlasmaFS.

   Returns an object with these methods:
   - [args]: the option specs (-cluster, -namenode, -rep, -pref,
     -local-pref, -auth, -user, -group, -debug) for Arg.parse.
   - [cluster], [namenodes], [rep]: the parsed values.
   - [open_cluster ()]: opens the cluster on [esys] on first call,
     configures it, and caches the handle. Later calls return the same
     handle.
   - [config ()]: installs a stderr logger and sets Plasma_util.debug.

   The option refs are only filled in by Arg.parse, so [open_cluster]
   and [config] are called after parsing (as every caller below does). *)
let access_cluster esys =
  let cluster = ref None in
  (* -namenode values are prepended, i.e. kept in reverse command-line order *)
  let namenodes = ref [] in
  (* replication factor for new files; stays 0 unless -rep is given *)
  let rep = ref 0 in
  let debug = ref false in
  (* cache for the opened cluster handle *)
  let open_cluster = ref None in
  let auth = ref `None in
  let user = ref "" in
  let group = ref "" in
  let pref = ref [] in
  let local_pref = ref false in
  let args =
    [ "-cluster", Arg.String (fun s -> cluster := Some s),
      "<name> Set the cluster name";
      "-namenode", Arg.String (fun n -> namenodes := n :: !namenodes),
      "<host>:<port> Also use this namenode (can be given several times)";
      "-rep", Arg.Set_int rep,
      "<n> Set replication factor for new files";
      "-pref", Arg.String (fun s -> pref := s :: !pref),
      "<identity> Consider the data node with this identity as preferred place";
      "-local-pref", Arg.Set local_pref,
      " Consider the local node as preferred place";
      "-auth", Arg.String (fun s -> auth := `Direct s),
      "<name> Set the RPC user to authenticate as at the namenode";
      "-user", Arg.String (fun s -> user := s),
      "<name> Set the filesystem user for new files";
      "-group", Arg.String (fun s -> group := s),
      "<name> Set the filesystem group for new files";
      "-debug", Arg.Set debug,
      " Enable debug output";
    ] in
  ( object
      method args = args
      method cluster = !cluster
      method namenodes = !namenodes
      method rep = !rep
      method open_cluster() =
        match !open_cluster with
        | None ->
          (* No -namenode: let the client config pick the namenodes. *)
          let nn_nodes =
            if !namenodes = [] then None else Some !namenodes in
          let cfg = Plasma_client_config.get_config
              ?clustername:!cluster
              ?nn_nodes () in
          let c = Plasma_client.open_cluster_cc cfg esys in
          (* Cached before the configuration steps below run. *)
          open_cluster := Some c;
          ( match !auth with
            | `None ->
              (* Do something reasonable... *)
              (* Use the ticket from PLASMAFS_AUTH_TICKET if set,
                 otherwise ask the authentication daemon. *)
              ( try
                  let ticket = Sys.getenv "PLASMAFS_AUTH_TICKET" in
                  Plasma_client.configure_auth_ticket c ticket
                with Not_found ->
                  Plasma_client.configure_auth_daemon c
                  (* fallback: no authentication if there is no
                     reachable daemon
                  *)
              )
            | `Direct u ->
              (* -auth <u>: authenticate directly as [u]; passwords come
                 from [get_pw] (env var or prompt). NOTE(review): the
                 meaning of "pnobody" is defined by
                 Plasma_client.configure_auth — confirm there. *)
              let ht = Hashtbl.create 5 in
              Plasma_client.configure_auth c u "pnobody" (get_pw ht);
          );
          Plasma_client.configure_default_user_group c !user !group;
          (* Size the buffer to about [sugg_mem] bytes, but at least one
             block. *)
          let bsize = Plasma_client.blocksize c in
          let bufs = max 1 (sugg_mem / bsize) in
          Plasma_client.configure_buffer c bufs;
          (* -local-pref: put the local node's identities in front of
             the -pref list. *)
          if !local_pref then (
            let local_ids = Plasma_client.local_identities c in
            pref := local_ids @ !pref
          );
          Plasma_client.configure_pref_nodes c !pref;
          c
        | Some c -> c
      method config() =
        (* Log to stderr; -debug lowers the threshold to `Debug. *)
        Netlog.current_logger :=
          Netlog.channel_logger stderr (if !debug then `Debug else `Info);
        Plasma_util.debug := !debug
    end
  )
(* Matches "<prefix>:<rest>". The prefix may contain only letters,
   digits, '-' and '_', so paths starting with '/' never match. *)
let tree_prefix_re =
  Pcre.regexp "^([-a-zA-Z0-9_]+):(.*)$"

(* [extract_tree_prefix s] splits a "prefix:path" argument into
   [(Some prefix, path)]. Without a recognizable prefix it returns
   [(None, s)] unchanged. *)
let extract_tree_prefix s =
  let groups =
    try Pcre.extract ~rex:tree_prefix_re ~full_match:false s
    with Not_found -> [| |] in
  if Array.length groups = 2 then
    (Some groups.(0), groups.(1))
  else
    (None, s)
(* Matches the argument of -tree: "<prefix>=<url>". *)
let tree_config_re =
  Pcre.regexp "^([-a-zA-Z0-9_]+)=(.*)$"

(* [get_tree ssh_opts url] opens the file tree named by [url] as a
   Netfs.stream_fs. [ssh_opts] is a list of extra ssh command-line
   options, used only for ssh:// URLs.
   Raises Arg.Bad for unsupported schemes or unparsable URLs. This
   function is called from an Arg.parse option handler (-tree). *)
let get_tree ssh_opts url =
  (* Quite limited so far. We recognize:
     - file://any/path
     - http://host/path
     - plasma://name@host:port (host:port optional)
     - ssh://name@host:port (name@, :port optional)
  *)
  (* Register URL syntaxes for the two non-standard schemes on top of
     Ocamlnet's common ones. *)
  let syn = Hashtbl.copy Neturl.common_url_syntax in
  Hashtbl.add syn "plasma"
    { Neturl.null_url_syntax with
      Neturl.url_enable_scheme = Neturl.Url_part_required;
      Neturl.url_enable_user = Neturl.Url_part_required;
      Neturl.url_enable_host = Neturl.Url_part_allowed;
      Neturl.url_enable_port = Neturl.Url_part_allowed;
    };
  Hashtbl.add syn "ssh"
    { Neturl.null_url_syntax with
      Neturl.url_enable_scheme = Neturl.Url_part_required;
      Neturl.url_enable_user = Neturl.Url_part_allowed;
      Neturl.url_enable_host = Neturl.Url_part_required;
      Neturl.url_enable_port = Neturl.Url_part_allowed;
    };
  try
    let u = Neturl.parse_url ~schemes:syn (Neturl.fixup_url_string url) in
    match Neturl.url_scheme u with
    | "file" ->
      (* local filesystem rooted at the URL path ("/" if empty) *)
      let p = try Neturl.url_path u with Not_found -> [] in
      Netfs.local_fs
        ?encoding:(Netconversion.user_encoding())
        ~root:(if p = [] then "/" else Neturl.join_path p)
        ()
    | "http" ->
      (* the original (not fixed-up) URL string is passed on *)
      (Http_fs.http_fs url :> Netfs.stream_fs)
    | "plasma" ->
      (* The user part is the cluster name. The namenode is taken from
         the URL only when both host and port are present (url_port
         raising Not_found yields None). *)
      let clustername = Neturl.url_user u in
      let nn_nodes =
        try
          let h = Neturl.url_host u in
          let p = Neturl.url_port u in
          Some [ sprintf "%s:%d" h p ]
        with Not_found -> None in
      let cfg = Plasma_client_config.get_config ~clustername ?nn_nodes () in
      (* Uses its own event system and, unlike [access_cluster], does
         not configure authentication, user/group or buffers here. *)
      let c =
        Plasma_client.open_cluster_cc
          cfg (Unixqueue.create_unix_event_system()) in
      (* FIXME: allow to set replication factor *)
      Plasma_netfs.netfs ~verbose:true c
    | "ssh" ->
      (* remote shell access; user and port are optional *)
      let user =
        try Some(Neturl.url_user u) with Not_found -> None in
      let host =
        Neturl.url_host u in
      let port_opts =
        try [ "-p"; string_of_int(Neturl.url_port u) ]
        with Not_found -> [] in
      let options =
        ssh_opts @ port_opts in
      let ci =
        Shell_fs.ssh_interpreter ~options ?user ~host () in
      let t =
        (Shell_fs.shell_fs
           ?encoding:(Netconversion.user_encoding())
           ci
         :> Netfs.stream_fs) in
      t
    | sch ->
      raise(Arg.Bad("This URL scheme is not supported: " ^ sch))
  with
  | Neturl.Malformed_URL ->
    raise(Arg.Bad("This URL cannot be parsed: " ^ url))
(* File-argument handling shared by most modes: the -glob/-no-glob/-tree
   options, collection of positional file arguments, and their
   resolution to a file tree plus (path, display) pairs.

   How an argument is resolved (see [files]):
   - "path"          : PlasmaFS, whose cluster is opened lazily via [ac]
   - "<prefix>:path" : the tree registered with -tree <prefix>=<url>
   - "file:path"     : the local filesystem, absolute or relative to the
                       current directory (unless "file" was registered
                       with -tree, which is looked up first)
   - "<other>:path"  : any other prefix is used as a host name and
                       accessed via ssh
   Whether wildcards are expanded is decided per argument by the
   -glob/-no-glob setting in effect when the argument is parsed. *)
let file_tagger ac =
  let enable_glob = ref true in
  (* positional args, most recent first *)
  let file_args = ref [] in
  (* prefix -> tree, filled by -tree *)
  let trees = Hashtbl.create 5 in
  let ssh_opts = [] in (* FIXME: allow to set this *)
  (* Handler for -tree "<prefix>=<url>". A later -tree replaces an
     earlier one with the same prefix. *)
  let parse_tree s =
    try
      match Pcre.extract ~rex:tree_config_re ~full_match:false s with
      | [| prefix; url |] ->
        let tree = get_tree ssh_opts url in
        Hashtbl.replace trees prefix tree
      | _ -> raise Not_found
    with
    | Not_found -> raise(Arg.Bad("Bad syntax for -tree arg: " ^ s)) in
  let args =
    [ "-glob", Arg.Set enable_glob,
      " Enable wildcards for the following arguments";
      "-no-glob", Arg.Clear enable_glob,
      " Disable wildcards for the following arguments";
      "-tree", Arg.String parse_tree,
      "<prefix>=<url> Access the file tree at this URL for this prefix";
    ] in
  (* lazily created PlasmaFS tree (see method plasma_tree) *)
  let plasma_tree = ref None in
  (* Two local trees: one rooted at "/" for absolute "file:" paths, one
     rooted at "." for relative ones. *)
  let abs_file_tree =
    Netfs.local_fs
      ?encoding:(Netconversion.user_encoding()) () in
  let rel_file_tree =
    Netfs.local_fs
      ?encoding:(Netconversion.user_encoding())
      ~root:"." () in
  ( object(self)
      method args = args
      (* Split a raw argument into prefix and file part, tagging the
         latter with the current glob setting. *)
      method parse_anonarg s =
        let p_opt, s' = extract_tree_prefix s in
        { tree_prefix = p_opt;
          tree_file = if !enable_glob then `Glob s' else `Verb s'
        }
      (* anon_fun for Arg.parse *)
      method anonarg s =
        let file_arg = self # parse_anonarg s in
        file_args := file_arg :: !file_args
      (* the positional args in command-line order *)
      method file_args = List.rev !file_args
      method trees = trees
      (* The PlasmaFS tree, created on first use from [ac]'s cluster with
         its replication factor; cached afterwards. *)
      method plasma_tree =
        match !plasma_tree with
        | None ->
          let c = ac#open_cluster() in
          let repl = ac#rep in
          let t = Plasma_netfs.netfs ~repl ~verbose:true c in
          plasma_tree := Some t;
          t
        | Some t ->
          t
      (* The absolute and relative local trees count as the same
         filesystem; other trees are compared with [=] (for objects
         this compares object identity). *)
      method same_tree t1 t2 =
        if t1 = abs_file_tree || t1 = rel_file_tree then
          (t2 = abs_file_tree || t2 = rel_file_tree)
        else
          t1 = t2
      method files file_arg =
        (* Returns (t, files) where files is a list of (path, display) *)
        (* Expand a `Glob pattern against tree [t] (all matching
           paths); a `Verb name is returned as is. *)
        let glob t pat =
          match pat with
          | `Glob g ->
            let fsys = Netglob.of_stream_fs t in
            Netglob.glob
              ?encoding:(Netconversion.user_encoding())
              ~fsys
              ~mode:`All_paths
              (`String g)
          | `Verb v ->
            [v] in
        (* Pair each path with a display string (default: the path). *)
        let pp ?(f = fun file -> (file,file)) files =
          List.map f files in
        match file_arg.tree_prefix with
        | Some p ->
          let pdisp file = (file, p ^ ":" ^ file) in
          ( try
              let t = Hashtbl.find trees p in
              (t, pp ~f:pdisp (glob t file_arg.tree_file))
            with
            | Not_found ->
              if p = "file" then (
                let is_abs s =
                  s <> "" && s.[0] = '/' in
                (* Relative names are made absolute within the "."
                   rooted tree by prepending "/". *)
                let get_netfs s =
                  if is_abs s then
                    (abs_file_tree, s)
                  else
                    (rel_file_tree, "/" ^ s) in
                (* For relative patterns the glob results carry the
                   added leading "/"; strip it again for display. *)
                let disp s =
                  if is_abs s then
                    (s, "file:" ^ String.sub s 1 (String.length s - 1))
                  else
                    (s, "file:" ^ s) in
                match file_arg.tree_file with
                | `Glob g ->
                  let (t, pat) = get_netfs g in
                  let f = if is_abs g then pdisp else disp in
                  (t, pp ~f (glob t (`Glob pat)))
                | `Verb v ->
                  let (t, name) = get_netfs v in
                  (t, [name, "file:" ^ v])
              )
              else (
                (* interpret p as host name *)
                let ci =
                  Shell_fs.ssh_interpreter ~host:p () in
                let t =
                  (Shell_fs.shell_fs
                     ?encoding:(Netconversion.user_encoding())
                     ci
                   :> Netfs.stream_fs) in
                (t, pp ~f:pdisp (glob t file_arg.tree_file))
              )
          )
        | None ->
          (* No prefix: access PlasmaFS *)
          let t = self # plasma_tree in
          (t, pp (glob t file_arg.tree_file))
    end
  )
(* Orders two PlasmaFS timestamps chronologically. Seconds are compared
   first; nanoseconds break ties. *)
let mtime_compare t1 t2 =
  match Int64.compare t1.tsecs t2.tsecs with
  | 0 -> compare t1.tnsecs t2.tnsecs
  | k -> k
(* [plasma list]: list files and directories.

   PlasmaFS arguments (no tree prefix) are shown in long format unless
   -1 is in effect. The long format shows mode string, owner, group,
   size, mtime and name, with -i adding the inode number. -1 is the
   default when stdout is not a tty.
   Arguments with a tree prefix are listed by name only, always sorted
   by name.
   Directory arguments are expanded unless -d is given.
   Errors are reported per argument on stderr; if any argument failed,
   the process exits with status 1. *)
let list() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let one = ref false in
  let all = ref false in
  let dirs = ref false in
  let inode_flag = ref false in
  let reverse = ref false in
  let sort_by_size = ref false in
  let sort_by_mtime = ref false in
  if not(Unix.isatty Unix.stdout) then
    one := true;
  Arg.parse
    ( ac#args @ ft#args @
      [ "-1", Arg.Set one,
        " output one filename per line (default if stdout is not a tty)";
        "-l", Arg.Clear one,
        " output long listing (default if stdout is a tty)";
        "-a", Arg.Set all,
        " also include entries starting with .";
        "-d", Arg.Set dirs,
        " list directory entries instead of contents";
        "-i", Arg.Set inode_flag,
        " output inode numbers";
        "-r", Arg.Set reverse,
        " reverse sorting order";
        "-S", Arg.Set sort_by_size,
        " sort by file size";
        "-t", Arg.Set sort_by_mtime,
        " sort by modification time";
      ] )
    ft#anonarg
    "usage: plasma list <file> ...";
  ac#config();
  (* Print [rows] as a table. Each row is a list of (text, alignment)
     cells, and all rows must have the same number of cells. Columns are
     padded to their widest cell. The exception is a left-aligned last
     column, which is not padded, so no line ends in blanks. This
     matters for -1 output read by scripts. *)
  let show_rows rows =
    let widths =
      List.fold_left
        (fun acc row ->
           if acc = [] then
             List.map
               (fun (field,_) -> String.length field)
               row
           else
             List.map2
               (fun width (field, _) ->
                  max width (String.length field)
               )
               acc row
        )
        []
        rows in
    List.iter
      (fun row ->
         let ncols = List.length row in
         let col = ref 0 in
         List.iter2
           (fun width (field, alignment) ->
              if !col > 0 then printf " ";
              let is_last = (!col = ncols - 1) in
              incr col;
              match alignment with
              | `L ->
                if is_last then printf "%s" field
                else printf "%-*s" width field
              | `R -> printf "%*s" width field
           )
           widths row;
         printf "\n";
      )
      rows
  in
  (* Table row for one PlasmaFS entry: just the name with -1, otherwise
     the long-format cells. *)
  let file_row name inode ii =
    if !one then
      [ name, `L ]
    else (
      (* One "rwx" triple for the low 3 bits of [n]. [su]/[sg]/[t] select
         which special bit (setuid/setgid/sticky) is shown in the execute
         position. As ls(1) does, it is lowercase when execute is set and
         uppercase when it is not. *)
      let rwx ?(t=false) ?(su=false) ?(sg=false) n =
        ( if n land 4 <> 0 then "r" else "-" ) ^
        ( if n land 2 <> 0 then "w" else "-" ) ^
        ( if n land 1 <> 0 then (
            if t && (ii.mode land 0o1000) <> 0 then "t" else
            if su && (ii.mode land 0o4000) <> 0 then "s" else
            if sg && (ii.mode land 0o2000) <> 0 then "s" else
            "x"
          )
          else (
            if t && (ii.mode land 0o1000) <> 0 then "T" else
            if su && (ii.mode land 0o4000) <> 0 then "S" else
            if sg && (ii.mode land 0o2000) <> 0 then "S" else
            "-"
          )
        ) in
      let bits =
        ( match ii.filetype with
          | `ftype_regular -> "-"
          | `ftype_directory -> "d"
          | `ftype_symlink -> "l"   (* same letter as ls(1) *)
        ) ^
        rwx ~su:true (ii.mode lsr 6) ^
        rwx ~sg:true (ii.mode lsr 3) ^
        rwx ~t:true ii.mode in
      let maybe_inode =
        if !inode_flag then [ (Int64.to_string inode), `R ] else [] in
      let mtime =
        (Netdate.format
           ~fmt:"%Y-%m-%d %H:%M"
           (Netdate.create
              ~zone:Netdate.localzone
              (Int64.to_float ii.mtime.tsecs))) in
      (* symlinks show their target, which is stored in field1 *)
      let xname =
        if ii.filetype = `ftype_symlink then
          name ^ " -> " ^ ii.field1
        else
          name in
      let row =
        maybe_inode @
        [ bits, `L;
          ii.usergroup.user, `L;
          ii.usergroup.group, `L;
          Int64.to_string ii.eof, `R;
          mtime, `R;
          xname, `L
        ] in
      row
    )
  in
  let need_lf = ref false in
  let first = ref true in
  let errors = ref false in
  (* all (tree, arg, path, display) combinations after glob expansion *)
  let expanded =
    List.flatten
      (List.map
         (fun farg ->
            let (t, files) = ft#files farg in
            List.map (fun (file,display) -> (t,farg,file,display)) files
         )
         ft#file_args
      ) in
  (* A single entry is listed without a "name:" header. *)
  let suppress_header =
    List.length expanded <= 1 in
  List.iter
    (fun (t, farg, file, display) ->
       if farg.tree_prefix = None then (
         (* PlasmaFS: one read transaction per argument *)
         let c = ac#open_cluster() in
         let t = Plasma_client.start c in
         ( try
             let inode = Plasma_client.lookup t file false in
             let ii = Plasma_client.get_inodeinfo t inode in
             match ii.filetype with
             | `ftype_directory when not !dirs ->
               if not suppress_header then (
                 if not !first then printf "\n";
                 printf "%s:\n%!" display;
               );
               (* entries that vanished meanwhile (ENOENT) are skipped *)
               let elements =
                 List.flatten
                   (List.map
                      (fun (elem, elem_inode) ->
                         try
                           let elem_ii =
                             Plasma_client.get_inodeinfo t elem_inode in
                           [elem, elem_inode, elem_ii]
                         with
                         | Plasma_client.Plasma_error `enoent -> []
                      )
                      (Plasma_client.list_inode t inode)) in
               let elements' =
                 List.filter
                   (fun (elem,_,_) ->
                      !all || (elem <> "" && elem.[0] <> '.')
                   ) elements in
               let elements'' =
                 List.sort
                   (fun (elem1,_,elem_ii1) (elem2,_,elem_ii2) ->
                      let k =
                        if !sort_by_size then
                          Int64.compare elem_ii1.eof elem_ii2.eof
                        else
                        if !sort_by_mtime then
                          mtime_compare elem_ii1.mtime elem_ii2.mtime
                        else
                          String.compare elem1 elem2 in
                      if !reverse then -k else k
                   )
                   elements' in
               let rows =
                 List.map
                   (fun (elem,elem_inode,elem_ii) ->
                      file_row elem elem_inode elem_ii
                   )
                   elements'' in
               show_rows rows;
               if elements'' = [] && not suppress_header then
                 printf "<empty>\n";
               need_lf := true
             | _ ->
               if !need_lf then printf "\n";
               need_lf := false;
               show_rows [file_row display inode ii]
           with
           | error ->
             if not !first then eprintf "\n";
             eprintf "%s: %s\n%!"
               display
               (Netexn.to_string error);
             errors := true;
             need_lf := true;
         );
         Plasma_client.commit t;
       )
       else (
         (* other trees: names only, sorted by name *)
         try
           if t # test [] file `D then (
             if not suppress_header then (
               if not !first then printf "\n";
               printf "%s:\n%!" display;
             );
             let files = t # readdir [] file in
             let files' =
               List.filter
                 (fun name ->
                    !all || (name <> "" && name.[0] <> '.')
                 ) files in
             let files'' =
               List.sort
                 (fun name1 name2 ->
                    let k =
                      String.compare name1 name2 in
                    if !reverse then -k else k
                 )
                 files' in
             show_rows (List.map (fun name -> [ name, `L ]) files'');
             if files'' = [] && not suppress_header then
               printf "<empty>\n";
             need_lf := true;
           )
           else (
             if t # test [] file `N then (
               if !need_lf then printf "\n";
               need_lf := false;
               show_rows [ [ display, `L ] ]
             )
             else
               raise(Unix.Unix_error(Unix.ENOENT, "", display))
           )
         with
         | error ->
           if not !first then eprintf "\n";
           eprintf "%s: %s\n%!"
             display
             (Netexn.to_string error);
           errors := true;
           need_lf := true;
       );
       first := false
    )
    expanded;
  if !errors then exit 1
(* [plasma create]: create each named file as a new, empty file.
   Fails if a file already exists (the write uses `Exclusive). *)
let create() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse (ac#args @ ft#args) ft#anonarg "usage: plasma create <file> ...";
  ac#config();
  let touch_new t (path, _display) =
    let out = t # write [`Create; `Exclusive] path in
    out # close_out() in
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       List.iter (touch_new t) files
    )
    ft#file_args
(* [plasma mkdir]: create the named directories. With -p, missing
   parent directories are created as well. *)
let mkdir() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let p = ref false in
  let mkdir_opts =
    [ "-p", Arg.Set p, " Create parent directories as necessary" ] in
  Arg.parse
    (ac#args @ ft#args @ mkdir_opts)
    ft#anonarg
    "usage: plasma mkdir <file> ...";
  ac#config();
  let flags = if !p then [`Path] else [] in
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       List.iter (fun (path, _) -> t # mkdir flags path) files
    )
    ft#file_args
(* Parses an octal mode string such as "755" or "0644".
   Only permission, setuid/setgid and sticky bits are allowed, i.e. the
   value must be in 0..0o7777. Larger values would spill into bits the
   mode field does not mean to carry. Very large octal literals can
   also wrap to negative ints in int_of_string.
   Raises [Error] (via [errwith]) on malformed or out-of-range input. *)
let parse_mode s =
  let m =
    try
      int_of_string("0o" ^ s)
    with
    | Failure _ ->
      errwith ("Cannot parse mode as octal number: " ^ s) in
  if m < 0 || m > 0o7777 then
    errwith ("Mode out of range (0-7777): " ^ s);
  m
(* [plasma chmod <octal_mode> <file> ...]: set the mode of PlasmaFS
   files. The first positional argument is the mode and the rest are
   files. Each file argument is processed in its own transaction. *)
let chmod() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let mode = ref None in
  let on_anon s =
    match !mode with
    | None -> mode := Some(parse_mode s)
    | Some _ -> ft#anonarg s in
  Arg.parse
    (ac#args @ ft#args)
    on_anon
    "usage: plasma chmod <octal_mode> <file> ...";
  let m =
    match !mode with
    | Some m -> m
    | None -> errwith "Missing arguments" in
  ac#config();
  let apply_mode c files =
    Plasma_client.with_trans c
      (fun trans ->
         List.iter
           (fun (path, display) ->
              let inode = Plasma_client.lookup trans path true in
              let ii = Plasma_client.get_inodeinfo trans inode in
              Plasma_client.set_inodeinfo trans inode { ii with mode = m };
              eprintf "Modified: %s\n%!" display
           )
           files
      );
    eprintf "Committed.\n%!" in
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       if t <> ft#plasma_tree then
         errwith "chmod is only supported for PlasmaFS";
       let c = ac#open_cluster() in
       apply_mode c files
    )
    ft#file_args
(* Parses a chown spec "user", "user:group", ":group" or "user:".
   Returns (user option, group option), where None means "leave
   unchanged". An empty user or group part is treated as absent.
   Previously "user:" yielded [Some ""] for the group, which would set
   the group to the empty string. Only the first ':' separates the two
   parts. *)
let parse_ug s =
  let non_empty x = if x = "" then None else Some x in
  try
    let k = String.index s ':' in
    let u = String.sub s 0 k in
    let g = String.sub s (k+1) (String.length s - k - 1) in
    (non_empty u, non_empty g)
  with
  | Not_found ->
    (non_empty s, None)
(* [plasma chown <user>[:<group>] <file> ...]: change owner and/or group
   of PlasmaFS files. The first positional argument is the user/group
   spec. Parts not given keep their current value. Each file argument is
   processed in its own transaction. *)
let chown() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let user = ref None in
  let group = ref None in
  let ug_seen = ref false in
  let on_anon s =
    if !ug_seen then
      ft#anonarg s
    else begin
      let (u_opt, g_opt) = parse_ug s in
      user := u_opt;
      group := g_opt;
      ug_seen := true
    end in
  Arg.parse
    (ac#args @ ft#args)
    on_anon
    "usage: plasma chown <user>[:<group>] <file> ...";
  ac#config();
  let choose override current =
    match override with
    | Some v -> v
    | None -> current in
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       if t <> ft#plasma_tree then
         errwith "chown is only supported for PlasmaFS";
       let c = ac#open_cluster() in
       Plasma_client.with_trans c
         (fun trans ->
            List.iter
              (fun (path, display) ->
                 let inode = Plasma_client.lookup trans path true in
                 let ii = Plasma_client.get_inodeinfo trans inode in
                 let usergroup =
                   { user = choose !user ii.usergroup.user;
                     group = choose !group ii.usergroup.group
                   } in
                 Plasma_client.set_inodeinfo trans inode
                   { ii with usergroup = usergroup };
                 eprintf "Modified: %s\n%!" display
              )
              files
         );
       eprintf "Committed.\n%!"
    )
    ft#file_args
(* Shared driver for ln, mv and cp.

   [t_dir] is the destination directory given with -t, if any. It used
   to be ignored, which made -t a no-op.
   Without -t, if there are at least two arguments and the last one
   expands to exactly one directory, that directory is the destination.
   Otherwise exactly two arguments (source and destination) are
   required.
   [do_operation src_tree (src_path, src_disp) dest_tree
   (dest_path, dest_disp)] performs one operation. In directory mode
   the destination is "<dir>/<basename of source>".
   [ac] is currently unused. Raises [Failure] for a wrong number of
   arguments and [Error] for ambiguous or non-directory destinations. *)
let multi_cmd t_dir ac ft do_operation =
  (* ln, mv, cp *)
  let dir = ref t_dir in
  let dir_expanded = ref None in
  let n = List.length ft#file_args in
  let targets = ref ft#file_args in
  (* Expand [arg]; it must match exactly one file. *)
  let expand_uniq arg =
    let (t,files) = ft#files arg in
    let n = List.length files in
    if n = 0 then
      errwith("Nothing matches: " ^ display_of_arg arg)
    else
    if n > 1 then
      errwith("Multiple matches: " ^ display_of_arg arg);
    let file = List.hd files in
    (t, file) in
  (* Append [base] to directory [d] without doubling a trailing '/'
     (e.g. for the root directory "/"). *)
  let join d base =
    if d <> "" && d.[String.length d - 1] = '/' then d ^ base
    else d ^ "/" ^ base in
  if !dir = None && n = 1 then
    failwith "At least two arguments needed";
  if !dir <> None && n = 0 then
    failwith "At least one file argument needed";
  (* No -t: use the last argument as destination directory if it is
     one. *)
  if !dir = None && n > 1 then (
    let last = List.hd (List.rev !targets) in
    let (t, last_file) = expand_uniq last in
    let (last_path, _) = last_file in
    if t # test [] last_path `D then (
      dir := Some last;
      dir_expanded := Some(t, last_file);
      targets := List.rev (List.tl (List.rev !targets));
    )
  );
  (* -t given: expand it and check that it is a directory. *)
  if !dir <> None && !dir_expanded = None then (
    match !dir with
    | None -> assert false
    | Some d ->
      let (t, d_file) = expand_uniq d in
      let (d_path,d_disp) = d_file in
      if not (t#test [] d_path `D) then
        errwith("Not a directory: " ^ d_disp);
      dir_expanded := Some(t, d_file)
  );
  ( match !dir_expanded with
    | None ->
      if List.length !targets <> 2 then
        failwith
          "Exactly two arguments expected when no directory is specified";
      ( match !targets with
        | [ target_arg; link_arg ] ->
          let (target_t, target_file) = expand_uniq target_arg in
          let (link_t, link_file) = expand_uniq link_arg in
          do_operation target_t target_file link_t link_file
        | _ ->
          assert false
      )
    | Some (t,dirfile) ->
      let (dirpath, dirdisp) = dirfile in
      List.iter
        (fun arg ->
           let (t1, files) = ft#files arg in
           List.iter
             (fun ((path, _) as file) ->
                let basename = Filename.basename path in
                do_operation t1 file t
                  (join dirpath basename, join dirdisp basename)
             )
             files
        )
        !targets
  )
(* [plasma ln]: create hard links (PlasmaFS only) or, with -s, symbolic
   links. Argument handling is done by [multi_cmd]. *)
let link() =
  let esys = Unixqueue.create_unix_event_system() in
  let symlink_flag = ref false in
  let dir = ref None in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let link_opts =
    [ "-s", Arg.Set symlink_flag,
      " create symlink instead of hard link";
      "-t", Arg.String (fun s -> dir := Some(ft#parse_anonarg s)),
      "<dir> create the links in this directory";
    ] in
  Arg.parse
    (ac#args @ ft#args @ link_opts)
    ft#anonarg
    "usage: plasma ln <opts> (<target> <link> | <target>... <dir>) ";
  ac#config();
  (* Hard link inside PlasmaFS: add a directory entry for the target's
     inode, in one transaction. *)
  let hard_link link_t (target_path, target_disp) (link_path, link_disp) =
    if link_t <> ft#plasma_tree then
      errwith "Hard links are only supported for PlasmaFS";
    Plasma_client.with_trans (ac#open_cluster())
      (fun trans ->
         let inode = Plasma_client.lookup trans target_path true in
         Plasma_client.link trans link_path inode;
         eprintf "Linked: %s -> %s\n%!" link_disp target_disp
      );
    eprintf "Committed.\n%!" in
  let do_link target_t target_file link_t link_file =
    if not (ft # same_tree target_t link_t) then
      errwith "Cross-filesystem links are not supported";
    if !symlink_flag then begin
      let (target_path, _) = target_file in
      let (link_path, _) = link_file in
      link_t # symlink [] target_path link_path
    end
    else
      hard_link link_t target_file link_file in
  multi_cmd !dir ac ft do_link
(* [plasma mv]: rename files within one file tree. Argument handling is
   the same as for ln (see [multi_cmd]). *)
let rename() =
  let esys = Unixqueue.create_unix_event_system() in
  let dir = ref None in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let set_dir s = dir := Some(ft#parse_anonarg s) in
  Arg.parse
    (ac#args @ ft#args @
     [ "-t", Arg.String set_dir, "<dir> move files to this directory" ])
    ft#anonarg
    "usage: plasma mv <opts> (<src> <dest> | <src>... <destdir>) ";
  ac#config();
  let move_one src_t (src_path, _) dest_t (dest_path, _) =
    if not (ft # same_tree src_t dest_t) then
      errwith "Cross-filesystem renames are not supported";
    dest_t # rename [] src_path dest_path in
  multi_cmd !dir ac ft move_one
(* [plasma cp]: copy files, possibly between different trees (via
   Netfs.copy in streaming mode). Argument handling is the same as for
   ln (see [multi_cmd]). *)
let copy() =
  let esys = Unixqueue.create_unix_event_system() in
  let dir = ref None in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let set_dir s = dir := Some(ft#parse_anonarg s) in
  Arg.parse
    (ac#args @ ft#args @
     [ "-t", Arg.String set_dir, "<dir> copy files to this directory" ])
    ft#anonarg
    "usage: plasma cp <opts> (<src> <dest> | <src>... <destdir>) ";
  ac#config();
  let copy_one src_t (src_path, _) dest_t (dest_path, _) =
    Netfs.copy ~streaming:true src_t src_path dest_t dest_path in
  multi_cmd !dir ac ft copy_one
(* [plasma rm]: remove the named files. With -r, directories are
   removed recursively. *)
let delete() =
  let esys = Unixqueue.create_unix_event_system() in
  let recurse = ref false in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let rm_opts =
    [ "-r", Arg.Set recurse,
      " recursively delete directories"
    ] in
  Arg.parse
    (ac#args @ ft#args @ rm_opts)
    ft#anonarg
    "usage: plasma rm <file> ...";
  ac#config();
  let flags = if !recurse then [`Recursive] else [] in
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       List.iter (fun (path, _) -> t # remove flags path) files
    )
    ft#file_args
(* [plasma put]: copy a local file, or stdin with -stdin, into a new
   PlasmaFS file.

   Arguments: <local_path> <plasma_path>, or -stdin <plasma_path>.
   Tree-prefixed arguments are rejected.
   The PlasmaFS file is created in one transaction. With -f, an existing
   file of that name is unlinked first in the same transaction. That
   transaction is committed before any data is copied, so a failed copy
   leaves the created file behind. *)
let put() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let plasma_filename = ref "" in
  let plasma_set = ref false in
  let local_filename = ref "" in
  let local_set = ref false in
  (* replication topology passed to copy_in *)
  let topo = ref `Star in
  let from_stdin = ref false in
  let force = ref false in
  (* flag passed to copy_in; -nosync switches to `No_datasync *)
  let sync_mode = ref `Late_datasync in
  Arg.parse
    ( ac#args
      @ [ "-chain", Arg.Unit (fun () -> topo := `Chain),
          " Use chain topology instead of star topology";
          "-stdin", Arg.Set from_stdin,
          " Copy data from stdin. No <local_filename> in this case.";
          "-f", Arg.Set force,
          " Force: Replace existing files";
          "-nosync", Arg.Unit (fun () -> sync_mode := `No_datasync),
          " Do not wait until the data gets synced";
        ]
    )
    (fun s ->
       let (p_opt, _) = extract_tree_prefix s in
       if p_opt <> None then
         failwith "the put operation does not support non-PlasmaFS trees";
       (* The first positional argument is the local file, unless -stdin
          appeared before it. The next one is the PlasmaFS file. *)
       if not !local_set && not !from_stdin then (
         local_filename := s;
         local_set := true
       )
       else
       if not !plasma_set then (
         plasma_filename := s;
         plasma_set := true
       )
       else
         raise(Arg.Bad("Unexpected arg: " ^ s))
    )
    "usage: plasma put (<local_path> <plasma_path> | -stdin <plasma_path>)";
  ac#config();
  if not !plasma_set then
    failwith "No plasma filename given";
  if not !local_set && not !from_stdin then
    failwith "No local filename given";
  (* reachable when -stdin follows the two filenames *)
  if !local_set && !from_stdin then
    failwith "With -stdin, a localfile filename must not be given";
  if !from_stdin then
    local_filename := "<stdin>";
  (* For a local file the length is found by seeking to the end.
     NOTE(review): for stdin the length is passed as 0L; presumably
     copy_in then reads until EOF for non-seekable descriptors. Confirm
     in Plasma_client.copy_in. *)
  let fd, len =
    if !from_stdin then
      Unix.stdin, 0L
    else (
      let fd = Unix.openfile !local_filename [Unix.O_RDONLY] 0 in
      let len = Unix.LargeFile.lseek fd 0L Unix.SEEK_END in
      ignore(Unix.LargeFile.lseek fd 0L Unix.SEEK_SET);
      (fd, len)
    ) in
  let c = ac#open_cluster () in
  (* regular-file inodeinfo with mode 0o666; replication from -rep
     (0 unless given, presumably meaning the cluster default) *)
  let ii0 = Plasma_client.regular_ii c 0o666 in
  let ii = { ii0 with replication = ac#rep } in
  let t = Plasma_client.start c in
  if !force then (
    try
      Plasma_client.unlink t !plasma_filename
    with Plasma_client.Plasma_error `enoent -> ()
  );
  let inode = Plasma_client.create_file t !plasma_filename ii in
  Plasma_client.commit t;
  ignore(Plasma_client.copy_in ~flags:[!sync_mode] c inode 0L fd len !topo);
  (* closes stdin too when -stdin was used *)
  Unix.close fd;
  eprintf "Copied: %s -> %s\n%!" !local_filename !plasma_filename
(* Alternative download path used by [get -alt <par>].

   Copies [len] bytes of PlasmaFS file [inode], starting at [pos], into
   [fd]. Up to [par] block-sized reads are in flight on [esys] at once.
   Each block is written to [fd] at the same offset it has in the
   PlasmaFS file.
   The client buffer is reconfigured to 2*[par] blocks. A transaction
   [t] is held open during the copy and committed after all blocks are
   written. Finally Unixqueue.run drives [esys]. *)
let alt_copy_out esys c inode pos fd len par =
  (* par: number of parallel requests *)
  Plasma_client.configure_buffer c (2*par);
  let t = Plasma_client.start c in
  let blocksize = Plasma_client.blocksize c in
  let blocksizeL = Int64.of_int blocksize in
  (* [stream_e] completes when [signal] is called (once all reads are
     done). *)
  let stream_e, signal =
    Plasma_util.signal_engine esys in
  (* one block-sized buffer per outstanding read *)
  let pool = Netsys_mem.create_pool blocksize in
  (* number of reads currently in flight *)
  let n = ref 0 in
  (* file position of the next block to request *)
  let p = ref pos in
  let end_pos = Int64.add pos len in
  (* Read from file position [p0] into [buf] starting at buffer offset
     [r], repeating until [bsize] bytes are in the buffer or EOF is
     reached. Yields (bytes_in_buffer, eof). *)
  let rec read_block_e p0 buf r bsize =
    Plasma_client.read_e ~lazy_validation:true c inode p0 buf r (bsize-r)
    ++ (fun (l,eof,_) ->
        if not eof && (r+l) < bsize then
          read_block_e (Int64.add p0 (Int64.of_int l)) buf (r+l) bsize
        else
          eps_e (`Done(r+l,eof)) esys
      )
  in
  (* Start reads while fewer than [par] are running and data remains.
     Each completed read writes its block and calls [fetch] again. When
     no read is outstanding any more, [signal] completes [stream_e]. *)
  let rec fetch() =
    if !n < par then (
      if !p < end_pos then (
        let buf = Netsys_mem.pool_alloc_memory pool in
        let p0 = !p in
        (* the last block may be shorter than blocksize *)
        let bsize = Int64.to_int (min blocksizeL (Int64.sub end_pos p0)) in
        (* NOTE(review): the engine is discarded, so a failed read does
           not appear to be reported. [n] would then never drop to 0 and
           [stream_e] would not be signalled. *)
        let _e =
          read_block_e p0 (`Memory buf) 0 bsize
          ++ (fun (l,eof) ->
              (* Positioned write at the block's offset. NOTE(review):
                 the result of mem_write is ignored, so a short write
                 would go unnoticed. *)
              ignore(Unix.LargeFile.lseek fd p0 Unix.SEEK_SET);
              ignore(Netsys_mem.mem_write fd buf 0 l);
              decr n;
              fetch();
              eps_e (`Done ()) esys
            ) in
        incr n;
        p := Int64.add !p blocksizeL;
        fetch()
      )
    );
    if !n=0 then signal(`Done())
  in
  fetch();
  let _e =
    stream_e ++ (fun () -> Plasma_client.commit_e t) in
  Unixqueue.run esys
(* [plasma get <plasma_filename> <local_filename>]: copy a PlasmaFS
   file to a local file. -alt <n> uses [alt_copy_out] with n parallel
   reads.

   The PlasmaFS file is looked up BEFORE the local file is opened. The
   local file is opened with O_TRUNC, so the previous order (open
   first) destroyed an existing local file even when the remote file
   did not exist. *)
let get() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let plasma_filename = ref "" in
  let plasma_set = ref false in
  let local_filename = ref "" in
  let local_set = ref false in
  let alt = ref None in
  Arg.parse
    ( ac#args @
      [ "-alt", Arg.Int (fun n -> alt := Some n),
        "<n> use the alternate algorithm with a parallelization factor of n"
      ]
    )
    (fun s ->
       let (p_opt, _) = extract_tree_prefix s in
       if p_opt <> None then
         failwith "the get operation does not support non-PlasmaFS trees";
       if not !plasma_set then (
         plasma_filename := s;
         plasma_set := true
       )
       else
       if not !local_set then (
         local_filename := s;
         local_set := true
       )
       else
         raise(Arg.Bad("Unexpected arg: " ^ s))
    )
    "usage: plasma get <plasma_filename> <local_filename>";
  ac#config();
  if not !plasma_set then
    failwith "No plasma filename given";
  if not !local_set then
    failwith "No local filename given";
  (* Resolve the remote file (and its size) first. *)
  let c = ac#open_cluster () in
  let t = Plasma_client.start c in
  let inode = Plasma_client.lookup t !plasma_filename false in
  let ii = Plasma_client.get_inodeinfo t inode in
  Plasma_client.commit t;
  let fd =
    Unix.openfile
      !local_filename [Unix.O_WRONLY; Unix.O_TRUNC; Unix.O_CREAT] 0o666 in
  ( match !alt with
    | None ->
      ignore(Plasma_client.copy_out c inode 0L fd ii.eof)
    | Some par ->
      alt_copy_out esys c inode 0L fd ii.eof par
  );
  Unix.close fd;
  eprintf "Copied: %s -> %s\n%!" !plasma_filename !local_filename
(* [plasma cat <file> ...]: write the contents of the files to stdout,
   in argument order.

   PlasmaFS files are copied with Plasma_client.copy_out straight to the
   stdout descriptor. Non-regular PlasmaFS files are rejected with
   EISDIR. Files from other trees are streamed through an Ocamlnet
   channel wrapping [stdout].

   That channel must only be flushed, never closed. The previous code
   used Netchannels.with_out_obj_channel, which closes the channel and
   thereby [stdout] itself, so any later argument failed. *)
let cat() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse
    (ac#args @ ft#args)
    ft#anonarg
    "usage: plasma cat <file> ...";
  ac#config();
  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       List.iter
         (fun (path, _) ->
            if t = ft#plasma_tree then (
              let c = ac#open_cluster() in
              let inode, ii =
                Plasma_client.with_trans c
                  (fun trans ->
                     let inode = Plasma_client.lookup trans path false in
                     let ii = Plasma_client.get_inodeinfo trans inode in
                     if ii.Plasma_rpcapi_aux.filetype <> `ftype_regular then
                       raise(Plasma_client.Plasma_error `eisdir);
                     (inode, ii)
                  ) in
              ignore(Plasma_client.copy_out c inode 0L Unix.stdout ii.eof)
            )
            else (
              let out_ch = new Netchannels.output_channel stdout in
              Netchannels.with_in_obj_channel
                (t # read [`Streaming; `Binary] path)
                (fun in_ch ->
                   out_ch # output_channel in_ch
                );
              (* Flush, so the data is on fd 1 before any following
                 PlasmaFS copy writes to the descriptor directly. *)
              out_ch # flush()
            )
         )
         files
    )
    ft#file_args
(* [plasma fsstat]: print the block statistics of the cluster. "Free" is
   the total minus used minus transitional blocks. *)
let fsstat() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let reject s = raise(Arg.Bad("Unexpected arg: " ^ s)) in
  Arg.parse ac#args reject "usage: plasma fsstat";
  ac#config();
  let st = Plasma_client.fsstat (ac#open_cluster ()) in
  let free_blocks =
    Int64.sub (Int64.sub st.total_blocks st.used_blocks) st.trans_blocks in
  printf "Total: %20Ld blocks\n" st.total_blocks;
  printf "Used: %20Ld blocks\n" st.used_blocks;
  printf "Transitional: %20Ld blocks\n" st.trans_blocks;
  printf "Free: %20Ld blocks\n" free_blocks
(* [plasma params]: print the cluster parameters as "name = value"
   lines. *)
let params() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let reject s = raise(Arg.Bad("Unexpected arg: " ^ s)) in
  Arg.parse ac#args reject "usage: plasma params";
  ac#config();
  let c = ac#open_cluster () in
  let print_param (name, value) = printf "%s = %s\n" name value in
  List.iter print_param (Plasma_client.params c)
(* [plasma auth_ticket]: print an authentication ticket for the current
   user on stdout. A ticket can be passed to later invocations via
   PLASMAFS_AUTH_TICKET (see [access_cluster]). *)
let auth_ticket() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let reject s = raise(Arg.Bad("Unexpected arg: " ^ s)) in
  Arg.parse ac#args reject "usage: plasma auth_ticket";
  ac#config();
  let c = ac#open_cluster () in
  let (current, _, _) = Plasma_client.current_user c in
  print_endline (Plasma_client.get_auth_ticket c current)
(* [plasma admin_table -get <name> | -put <name>]: print an admin table
   on stdout, or replace it with the contents of stdin. If both options
   are given, the last one wins. *)
let admin_table() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let mode = ref None in
  let table_opts =
    [ "-get", Arg.String (fun key -> mode := Some(`Get key)),
      "<name> Get the admin table <name> and print it to stdout";
      "-put", Arg.String (fun key -> mode := Some(`Put key)),
      "<name> Set the admin table <name> from stdin";
    ] in
  Arg.parse
    (ac#args @ table_opts)
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)))
    "usage: plasma admin_table";
  ac#config();
  match !mode with
  | None ->
    failwith "Need -get or -put"
  | Some action ->
    let c = ac#open_cluster () in
    ( match action with
      | `Get key ->
        print_string(Plasma_client.read_admin_table c key)
      | `Put key ->
        let data =
          Netchannels.string_of_in_obj_channel
            (new Netchannels.input_channel stdin) in
        Plasma_client.write_admin_table c key data
    )
(* [plasma blocks <plasma_file> ...]: for each PlasmaFS file, print its
   block ranges and the datanodes holding each range. Then print:
   - "blocks": the number of block replicas on live datanodes;
   - "actual replication": the smallest number of live replicas over
     all ranges ("n/a" for a file without blocks);
   - "requested replication": the replication stored in the inode.
   All files are inspected in a single transaction. Only the cluster
   options are accepted (no -glob/-tree), and tree-prefixed arguments
   are rejected. *)
let blocks() =
  let open Plasma_blocks in
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse
    ac#args
    (fun s ->
       let (p_opt, _) = extract_tree_prefix s in
       if p_opt <> None then
         failwith "the blocks operation does not support non-PlasmaFS trees";
       ft#anonarg s
    )
    "usage: plasma blocks <plasma_file> ...";
  ac#config();
  let c = ac#open_cluster () in
  let t = Plasma_client.start c in
  (* Print the report for one PlasmaFS path. *)
  let analyze file =
    let inode = Plasma_client.lookup t file false in
    let ii = Plasma_client.get_inodeinfo t inode in
    (* FIXME: very large blocklists *)
    (* The whole block list (0 .. blocklimit) is fetched at once. *)
    let list =
      Plasma_client.get_blocklist t inode 0L ii.blocklimit false in
    printf "%s:\n" file;
    (* Per datanode identity: host:port and liveness, taken from the
       block list entries. *)
    let hostport = Hashtbl.create 5 in
    let alive = Hashtbl.create 5 in
    List.iter
      (fun b ->
         Hashtbl.replace hostport b.identity b.node;
         Hashtbl.replace alive b.identity b.node_alive;
      )
      list;
    (* the block list as ranges: [br_index, br_index+br_length) with
       the datanodes storing them *)
    let set = Plasma_blocks.bset_of_blocklist list in
    (* Get replication *)
    (* Minimum live replica count over all ranges, and the total number
       of block replicas on live nodes. *)
    let replication = ref max_int in
    let blocks = ref 0L in
    Bset.iter
      (fun _ br ->
         let all_datanodes = br.br_datanodes in
         let live_datanodes =
           StrMap.filter
             (fun id _ -> try Hashtbl.find alive id with Not_found -> false)
             all_datanodes in
         let n = StrMap.cardinal live_datanodes in
         replication := min !replication n;
         let b = Int64.mul br.br_length (Int64.of_int n) in
         blocks := Int64.add !blocks b
      )
      set;
    (* One line per range: "<first> - <last>:" followed by one entry per
       datanode. Live nodes show host[first-last] of the datanode
       blocks; dead nodes show host[DEAD]. *)
    let print_bi bi =
      let idxmax = Int64.pred (Int64.add bi.br_index bi.br_length) in
      printf "%9Ld - %9Ld:" bi.br_index idxmax;
      StrMap.iter
        (fun id b_s ->
           let hp =
             try Hashtbl.find hostport id
             with Not_found -> "n/a" in
           let alive =
             try Hashtbl.find alive id
             with Not_found -> false in
           let b_e =
             Int64.pred (Int64.add b_s bi.br_length) in
           if alive then
             printf " %s[%Ld-%Ld]" hp b_s b_e
           else
             printf " %s[DEAD]" hp
        )
        bi.br_datanodes;
      printf "\n";
    in
    (* Iterate over ranges and print *)
    Bset.iter
      (fun id bi ->
         print_bi bi
      )
      set;
    (* [replication] keeps max_int when there are no blocks *)
    let r_s =
      if list = [] then
        "n/a"
      else
        string_of_int !replication in
    printf "  blocks: %Ld\n" !blocks;
    printf "  actual replication: %s\n" r_s;
    printf "  requested replication: %d\n" ii.replication;
    printf "\n"
  in
  List.iter
    (fun farg ->
       let (_, files) = ft#files farg in
       List.iter
         (fun (path, _) ->
            analyze path
         )
         files
    )
    ft#file_args;
  Plasma_client.commit t
(* The "help" mode only prints the mode summary. *)
let help = fun () -> usage ()
(* Runs the mode named by the first command-line argument. Arg.current
   is advanced so that the mode's own Arg.parse starts after the mode
   name. An unknown or missing mode raises Failure "Bad usage". *)
let dispatch() =
  if Array.length Sys.argv < 2 then
    failwith "Bad usage";
  let mode_str = Sys.argv.(1) in
  incr Arg.current;
  let commands =
    [ ["create"], create;
      ["delete"; "rm"], delete;
      ["mkdir"], mkdir;
      ["put"], put;
      ["get"], get;
      ["cat"], cat;
      ["list"; "ls"], list;
      ["link"; "ln"], link;
      ["copy"; "cp"], copy;
      ["rename"; "mv"], rename;
      ["chmod"], chmod;
      ["chown"], chown;
      ["fsstat"], fsstat;
      ["params"], params;
      ["blocks"], blocks;
      ["auth_ticket"], auth_ticket;
      ["admin_table"], admin_table;
      ["help"; "-help"; "--help"], help;
    ] in
  let rec find_command = function
    | [] -> failwith ("Bad usage")
    | (names, run) :: rest ->
      if List.mem mode_str names then run else find_command rest in
  let run = find_command commands in
  run ()
(* Entry point: run the selected mode, report any escaping exception on
   stderr and exit with status 2.
   - [Error], raised via [errwith], is reported as a plain message
     without usage text, as its definition intends. Before, it fell
     through to the catch-all and was printed as a raw
     "plasma exception: ..." dump.
   - PlasmaFS and Unix errors get a formatted message.
   - [Failure] additionally prints the usage summary. *)
let main () =
  try
    dispatch()
  with
  | Error m ->
    prerr_endline ("plasma: " ^ m);
    exit 2
  | Plasma_client.Plasma_error code ->
    prerr_endline ("plasma: " ^ Plasma_util.string_of_errno code);
    exit 2
  | Unix.Unix_error(code,fn,file) ->
    prerr_endline ("plasma: " ^
                   (if file <> "" then file ^ ": " else "") ^
                   Unix.error_message code ^
                   (if fn <> "" then " (" ^ fn ^ ")" else "")
                  );
    exit 2
  | Failure m ->
    prerr_endline ("plasma: " ^ m);
    usage();
    exit 2
  | error ->
    prerr_endline ("plasma exception: " ^ Netexn.to_string error);
    exit 2
(* Program start: adjust the GC settings, then run [main]. *)
let () =
  let settings = Gc.get() in
  Gc.set
    { settings with
      Gc.max_overhead = 1_000_000;   (* >= 1000000: compaction never runs *)
      Gc.space_overhead = 300;       (* tolerate more wasted memory *)
      Gc.minor_heap_size = 65536;    (* larger minor heap (in words) *)
    };
  main()