(* Plasma GitLab Archive
   Projects Blog Knowledge *)

(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Plasma.  If not, see <http://www.gnu.org/licenses/>.

*)
(* $Id: plasma.ml 468 2011-10-13 08:17:04Z gerd $ *)

(* TODO:
   - ls: display filenames should be converted to local encoding
   - argument encoding should be converted to the tree encoding
   - file tree: the separation between absolute and relative names is
     nonsense. Support a unified tree. (E.g. cp /abs rel)
   - allow to set more parameters for accessing trees:
     * encoding
     * ssh options
     * replication
   - nicer error messages
   - cp -r
 *)

open Printf
open Plasma_rpcapi_aux
open Plasma_util.Operators

(* Local alias for Plasma_util.dlogf *)
let dlogf = Plasma_util.dlogf

(* Local alias for Plasma_util.StrMap *)
module StrMap = Plasma_util.StrMap

(* Raised for user errors whose message should be printed WITHOUT the
   usage string. A [Failure], in contrast, also prints usage info. *)
exception Error of string

(* [errwith msg] aborts the current command with [Error msg]. *)
let errwith msg =
  raise (Error msg)

(* A file argument as typed by the user: `Glob if wildcards are to be
   expanded, `Verb if it is to be taken verbatim. *)
type tagged_arg =
    [ `Glob of string
    | `Verb of string
    ]

(* A file argument split into its optional "prefix:" part and the rest *)
type ext_arg =
    { tree_prefix : string option;
      tree_file : tagged_arg
    }

(* Reconstruct the argument as the user wrote it ("prefix:file" or just
   "file"), for use in messages. *)
let display_of_arg arg =
  let file =
    match arg.tree_file with
      | `Glob name | `Verb name -> name in
  match arg.tree_prefix with
    | Some p -> p ^ ":" ^ file
    | None -> file

(* Print the top-level usage message, listing all modes, on stderr *)
let usage() =
  List.iter prerr_endline
    [ "usage: plasma <mode> [-help|options]";
      "";
      "  available modes:";
      "  - list | ls";
      "  - delete | rm";
      "  - rename | mv";
      "  - link | ln";
      "  - copy | cp";
      "  - mkdir";
      "  - create";
      "  - put";
      "  - get";
      "  - cat";
      "  - chmod";
      "  - chown";
      "  - fsstat";
      "  - params";
      "  - blocks";
      "  - auth_ticket";
      "  - admin_table"
    ]


(* Return the password of user [u]. Sources, in this order:
   1. the cache [ht];
   2. the environment variable PLASMA_PASSWD_<u>;
   3. an interactive prompt on the tty.
   Only a password read from the tty is stored in [ht]. *)
let get_pw ht u =
  let from_env () =
    try Some (Sys.getenv ("PLASMA_PASSWD_" ^ u))
    with Not_found -> None in
  let from_tty () =
    let answer = ref "" in
    Netsys_posix.with_tty
      (fun tty ->
         answer :=
           Netsys_posix.tty_read_password ~tty
             ("Plasma password of " ^ u ^ "? ")
      );
    Hashtbl.add ht u !answer;
    !answer in
  if Hashtbl.mem ht u then
    Hashtbl.find ht u
  else
    match from_env () with
      | Some pw -> pw
      | None -> from_tty ()


(* Suggested buffer memory per cluster connection: 64 MiB *)
let sugg_mem = 64 lsl 20


(* Command-line options and a lazily opened connection, shared by all
   modes that access a PlasmaFS cluster.

   Returns an object with:
   - [args]: Arg specs (-cluster, -namenode, -rep, -pref, -local-pref,
     -auth, -user, -group, -debug) to append to the mode's own options;
   - [cluster], [namenodes], [rep]: current values of those options;
   - [open_cluster()]: on the first call, opens the cluster with [esys]
     and configures it (authentication, default user/group, buffers,
     preferred nodes). Later calls return the cached handle;
   - [config()]: installs a stderr logger ([`Debug] level with -debug,
     [`Info] otherwise) and sets [Plasma_util.debug].
   The methods dereference the option refs when they are called, so call
   them only after Arg.parse has run.
 *)
let access_cluster esys =
  let cluster = ref None in
  let namenodes = ref [] in
  let rep = ref 0 in
  let debug = ref false in
  let open_cluster = ref None in
  let auth = ref `None in
  let user = ref "" in
  let group = ref "" in
  let pref = ref [] in
  let local_pref = ref false in
  let args =
    [ "-cluster", Arg.String (fun s -> cluster := Some s),
      "<name>    Set the cluster name";
      
      (* prepended: the list ends up in reverse command-line order *)
      "-namenode", Arg.String (fun n -> namenodes := n :: !namenodes),
      "<host>:<port>   Also use this namenode (can be given several times)";

      "-rep", Arg.Set_int rep,
      "<n>   Set replication factor for new files";

      "-pref", Arg.String (fun s -> pref := s :: !pref),
      "<identity>    Consider the data node with this identity as preferred place";

      "-local-pref", Arg.Set local_pref,
      "    Consider the local node as preferred place";

      "-auth", Arg.String (fun s -> auth := `Direct s),
      "<name>   Set the RPC user to authenticate as at the namenode";

      "-user", Arg.String (fun s -> user := s),
      "<name>   Set the filesystem user for new files";

      "-group", Arg.String (fun s -> group := s),
      "<name>   Set the filesystem group for new files";

      "-debug", Arg.Set debug,
      "    Enable debug output";
    ] in
  ( object
      method args = args
      method cluster = !cluster
      method namenodes = !namenodes
      method rep = !rep
      method open_cluster() =
	match !open_cluster with
	  | None ->
	      (* Pass -namenode values to get_config only if any were given *)
	      let nn_nodes =
		if !namenodes = [] then None else Some !namenodes in
	      let cfg = Plasma_client_config.get_config
	        ?clustername:!cluster
                ?nn_nodes () in
	      let c = Plasma_client.open_cluster_cc cfg esys in
	      (* NOTE(review): the handle is cached before the configuration
		 below runs; if one of these calls raises, a later call
		 returns the partially configured handle. *)
	      open_cluster := Some c;
	      ( match !auth with
		  | `None ->
		      (* Do something reasonable... Use the ticket from
			 PLASMAFS_AUTH_TICKET if set, else ask the auth
			 daemon. *)
		      ( try
			  let ticket = Sys.getenv "PLASMAFS_AUTH_TICKET" in
			  Plasma_client.configure_auth_ticket c ticket
			with Not_found ->
			  Plasma_client.configure_auth_daemon c
			    (* fallback: no authentication if there is no
			       reachable daemon
			     *)
		      )

		  | `Direct u ->
		      (* Passwords come from get_pw: cache, environment
			 variable PLASMA_PASSWD_<user>, then tty prompt *)
		      let ht = Hashtbl.create 5 in
		      Plasma_client.configure_auth c u "pnobody" (get_pw ht);
	      );
	      Plasma_client.configure_default_user_group c !user !group;
	      let bsize = Plasma_client.blocksize c in
	      (* Enough block buffers for about sugg_mem bytes, at least one *)
	      let bufs = max 1 (sugg_mem / bsize) in
	      Plasma_client.configure_buffer c bufs;
	      if !local_pref then (
		(* local identities are put in front of the -pref entries *)
		let local_ids = Plasma_client.local_identities c in
		pref := local_ids @ !pref
	      );
	      Plasma_client.configure_pref_nodes c !pref;
	      c
	  | Some c -> c

      method config() =
	Netlog.current_logger := 
	  Netlog.channel_logger stderr (if !debug then `Debug else `Info);
	Plasma_util.debug := !debug
    end
  )

(* Matches "<prefix>:<rest>", where the prefix consists of letters,
   digits, '-' and '_'. *)
let tree_prefix_re =
  Pcre.regexp "^([-a-zA-Z0-9_]+):(.*)$"

(* [extract_tree_prefix s] splits an argument of the form "prefix:rest"
   into [(Some prefix, rest)]. Arguments without such a prefix are
   returned unchanged as [(None, s)]. *)
let extract_tree_prefix s =
  let groups =
    try Some (Pcre.extract ~rex:tree_prefix_re ~full_match:false s)
    with Not_found -> None in
  match groups with
    | Some [| prefix; rest |] -> (Some prefix, rest)
    | Some _ | None -> (None, s)


(* Matches "<prefix>=<url>", the argument syntax of the -tree option *)
let tree_config_re =
  let pattern = "^([-a-zA-Z0-9_]+)=(.*)$" in
  Pcre.regexp pattern


(* Open the file tree denoted by [url] as a Netfs.stream_fs.
   [ssh_opts] are extra options for the ssh command, used only for
   ssh:// URLs. Raises Arg.Bad for unsupported schemes and for URLs that
   cannot be parsed.
 *)
let get_tree ssh_opts url =
  (* Quite limited so far. We recognize:
     - file://any/path
     - http://host/path
     - plasma://name@host:port   (host:port optional)
     - ssh://name@host:port      (name@, :port optional)
   *)
  (* Extend a copy of the standard URL syntaxes by "plasma" and "ssh" *)
  let syn = Hashtbl.copy Neturl.common_url_syntax in
  Hashtbl.add syn "plasma"
    { Neturl.null_url_syntax with
	Neturl.url_enable_scheme = Neturl.Url_part_required;
	Neturl.url_enable_user = Neturl.Url_part_required;
	Neturl.url_enable_host = Neturl.Url_part_allowed;
	Neturl.url_enable_port = Neturl.Url_part_allowed;
    };
  Hashtbl.add syn "ssh"
    { Neturl.null_url_syntax with
	Neturl.url_enable_scheme = Neturl.Url_part_required;
	Neturl.url_enable_user = Neturl.Url_part_allowed;
	Neturl.url_enable_host = Neturl.Url_part_required;
	Neturl.url_enable_port = Neturl.Url_part_allowed;
    };
 
  try
    let u = Neturl.parse_url ~schemes:syn (Neturl.fixup_url_string url) in
    match Neturl.url_scheme u with
      | "file" ->
	  (* local tree rooted at the URL path ("/" if the path is empty) *)
	  let p = try Neturl.url_path u with Not_found -> [] in
	  Netfs.local_fs
	    ?encoding:(Netconversion.user_encoding())
	    ~root:(if p = [] then "/" else Neturl.join_path p)
	    ()
      | "http" ->
	  (Http_fs.http_fs url :> Netfs.stream_fs)
      | "plasma" ->
	  (* the user part of the URL is the cluster name *)
	  let clustername = Neturl.url_user u in
	  (* NOTE(review): the try covers both url_host and url_port, so a
	     host given without a port is silently ignored (nn_nodes = None) *)
	  let nn_nodes =
	    try
	      let h = Neturl.url_host u in
	      let p = Neturl.url_port u in
	      Some [ sprintf "%s:%d" h p ]
	    with Not_found -> None in
	  let cfg = Plasma_client_config.get_config ~clustername ?nn_nodes () in
	  (* uses its own event system, not one shared with the caller *)
	  let c =
	    Plasma_client.open_cluster_cc
	      cfg (Unixqueue.create_unix_event_system()) in
	  (* FIXME: allow to set replication factor *)
	  Plasma_netfs.netfs ~verbose:true c
      | "ssh" ->
	  let user =
	    try Some(Neturl.url_user u) with Not_found -> None in
	  let host =
	    Neturl.url_host u in
	  let port_opts =
	    try [ "-p"; string_of_int(Neturl.url_port u) ]
	    with Not_found -> [] in
	  (* ssh_opts come before the -p option *)
	  let options =
	    ssh_opts @ port_opts in
	  let ci =
	    Shell_fs.ssh_interpreter ~options ?user ~host () in
	  let t =
	    (Shell_fs.shell_fs
	       ?encoding:(Netconversion.user_encoding())
	       ci
	     :> Netfs.stream_fs) in
	  t
      | sch ->
	  raise(Arg.Bad("This URL scheme is not supported: " ^ sch))
  with
    | Neturl.Malformed_URL ->
	raise(Arg.Bad("This URL cannot be parsed: " ^ url))



(* Parser and resolver for file arguments, which may carry a tree
   prefix ("prefix:path").

   The returned object provides:
   - [args]: the -glob / -no-glob / -tree options. Globbing is on by
     default, and the setting applies to the file arguments parsed after
     it;
   - [parse_anonarg s] tags an argument; [anonarg s] also records it
     (see [file_args]);
   - [trees]: the prefix -> tree table filled by -tree;
   - [plasma_tree]: the PlasmaFS tree, opened via [ac] on first use with
     replication [ac#rep];
   - [same_tree t1 t2]: tree identity, treating the two local "file"
     trees (absolute and relative) as one;
   - [files arg]: resolve an argument to (tree, [(path, display)]).
 *)
let file_tagger ac =
  let enable_glob = ref true in
  let file_args = ref [] in
  let trees = Hashtbl.create 5 in
  let ssh_opts = [] in  (* FIXME: allow to set this *)
  (* Handle "-tree prefix=url": open the tree and register it *)
  let parse_tree s =
    try
      match Pcre.extract ~rex:tree_config_re ~full_match:false s with
	| [| prefix; url |] ->
	    let tree = get_tree ssh_opts url in
	    Hashtbl.replace trees prefix tree
	| _ -> raise Not_found
    with
      (* NOTE(review): a Not_found escaping from get_tree is also
	 reported as a syntax error here *)
      | Not_found -> raise(Arg.Bad("Bad syntax for -tree arg: " ^ s)) in
  let args =
    [ "-glob", Arg.Set enable_glob,
      "    Enable wildcards for the following arguments";
      "-no-glob", Arg.Clear enable_glob,
      "    Disable wildcards for the following arguments";
      "-tree", Arg.String parse_tree,
      "<prefix>=<url>    Access the file tree at this URL for this prefix";
    ] in
  let plasma_tree = ref None in
  (* "file:" trees: absolute paths are resolved in abs_file_tree,
     relative ones in rel_file_tree (rooted at ".") *)
  let abs_file_tree =
    Netfs.local_fs
      ?encoding:(Netconversion.user_encoding()) () in
  let rel_file_tree =
    Netfs.local_fs
      ?encoding:(Netconversion.user_encoding()) 
      ~root:"." () in
  ( object(self)
      method args = args

      (* Split off the tree prefix and tag the rest according to the
	 current -glob / -no-glob setting *)
      method parse_anonarg s =
	let p_opt, s' = extract_tree_prefix s in
	{ tree_prefix = p_opt;
	  tree_file = if !enable_glob then `Glob s' else `Verb s'
	}

      method anonarg s =
	let file_arg = self # parse_anonarg s in
	file_args := file_arg :: !file_args

      (* The recorded arguments in command-line order *)
      method file_args = List.rev !file_args

      method trees = trees

      method plasma_tree =
	match !plasma_tree with
	  | None ->
	      let c = ac#open_cluster() in
	      let repl = ac#rep in
	      let t = Plasma_netfs.netfs ~repl ~verbose:true c in
	      plasma_tree := Some t;
	      t
	  | Some t ->
	      t

      (* Polymorphic (=) compares objects by identity *)
      method same_tree t1 t2 =
	if t1 = abs_file_tree || t1 = rel_file_tree then
	  (t2 = abs_file_tree || t2 = rel_file_tree)
	else
	  t1 = t2

      (* Resolution order for a prefix: the -tree table, then the
	 built-in "file" prefix, otherwise the prefix is taken as an ssh
	 host name. Without a prefix the argument refers to PlasmaFS. *)
      method files file_arg =
	(* Returns (t, files) where files is a list of (path, display) *)
	let glob t pat =
	  match pat with
	    | `Glob g ->
		let fsys = Netglob.of_stream_fs t in
		Netglob.glob
		  ?encoding:(Netconversion.user_encoding()) 
		  ~fsys
		  ~mode:`All_paths
		  (`String g)
	    | `Verb v ->
		[v] in

	let pp ?(f = fun file -> (file,file)) files =
	  List.map f files in

	match file_arg.tree_prefix with
	  | Some p ->
	      let pdisp file = (file, p ^ ":" ^ file) in
	      ( try
		  let t = Hashtbl.find trees p in
		  (t, pp ~f:pdisp (glob t file_arg.tree_file))
		with
		  | Not_found ->
		      if p = "file" then (
			let is_abs s = 
			  s <> "" && s.[0] = '/' in
			(* relative names get a leading "/" so that they
			   are resolved below rel_file_tree's root *)
			let get_netfs s =
			  if is_abs s then
			    (abs_file_tree, s)
			  else
			    (rel_file_tree, "/" ^ s) in
			(* for matches in rel_file_tree: strip that "/"
			   again in the display name *)
			let disp s =
			  if is_abs s then
			    (s, "file:" ^ String.sub s 1 (String.length s - 1))
			  else
			    (s, "file:" ^ s) in
			  
			match file_arg.tree_file with
			  | `Glob g ->
			      let (t, pat) = get_netfs g in
			      let f = if is_abs g then pdisp else disp in
			      (t, pp ~f (glob t (`Glob pat)))
			  | `Verb v ->
			      let (t, name) = get_netfs v in
			      (t, [name, "file:" ^ v])
		      )
		      else (
			(* interpret p as host name *)
			(* NOTE(review): ssh_opts are not applied here *)
			let ci =
			  Shell_fs.ssh_interpreter ~host:p () in
			let t =
			  (Shell_fs.shell_fs
			     ?encoding:(Netconversion.user_encoding())
			     ci
			   :> Netfs.stream_fs) in
			(t, pp ~f:pdisp (glob t file_arg.tree_file))
		      )
	      )
	  | None ->
	      (* No prefix: access PlasmaFS *)
	      let t = self # plasma_tree in
	      (t, pp (glob t file_arg.tree_file))
    end
  )


(* Order two timestamps by seconds, then by nanoseconds *)
let mtime_compare t1 t2 =
  match Int64.compare t1.tsecs t2.tsecs with
    | 0 -> compare t1.tnsecs t2.tnsecs
    | k -> k

(* Implements "plasma list" / "plasma ls".

   Each file argument is expanded (see file_tagger#files), and every
   resulting entry is printed:
   - PlasmaFS entries (no tree prefix) are read via Plasma_client. A
     directory shows its contents (unless -d); anything else shows one
     row. Rows are either just the name (-1, the default when stdout is
     not a tty) or an ls -l-like line (-l). -i adds inode numbers, -a
     includes dot files, and -S / -t / -r change the sort order.
   - Entries of other trees are read via the Netfs interface and listed
     by name only, sorted by name. -a and -r are honoured there; -d, -l,
     -i, -S and -t have no effect.
   Errors are reported per entry on stderr. The process exits with
   status 1 if any entry failed.
 *)
let list() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let one = ref false in
  let all = ref false in
  let dirs = ref false in
  let inode_flag = ref false in
  let reverse = ref false in
  let sort_by_size = ref false in
  let sort_by_mtime = ref false in

  (* Default to one name per line when output is not a terminal *)
  if not(Unix.isatty Unix.stdout) then
    one := true;

  Arg.parse
    ( ac#args @ ft#args @
	[ "-1", Arg.Set one,
	  "   output one filename per line (default if stdout is not a tty)";

	  "-l", Arg.Clear one,
	  "   output long listing (default if stdout is a tty)";

	  "-a", Arg.Set all,
	  "   also include entries starting with .";

	  "-d", Arg.Set dirs,
	  "   list directory entries instead of contents";

	  "-i", Arg.Set inode_flag,
	  "   output inode numbers";

	  "-r", Arg.Set reverse,
	  "   reverse sorting order";

	  "-S", Arg.Set sort_by_size,
	  "   sort by file size";

	  "-t", Arg.Set sort_by_mtime,
	  "   sort by modification time";
	] )
    ft#anonarg
    "usage: plasma list <file> ...";
  ac#config();

  (* Print [rows] as aligned columns. Each column is as wide as its
     widest field; fields tagged `L are left-aligned, `R right-aligned.
     All rows must have the same number of fields (List.map2 / iter2
     raise Invalid_argument otherwise). *)
  let show_rows rows =
    let widths =
      List.fold_left
	(fun acc row ->
	   if acc = [] then
	     List.map
	       (fun (field,_) -> String.length field)
	       row
	   else
	     List.map2
	       (fun width (field, alignment) ->
		  max width (String.length field)
	       )
	       acc row
	)
	[]
	rows in
    List.iter
      (fun row ->
	 let first = ref true in
	 List.iter2
	   (fun width (field, alignment) ->
	      if not !first then printf " ";
	      first := false;
	      match alignment with
		| `L -> printf "%-*s" width field
		| `R -> printf "%*s" width field
	   )
	   widths row;
	 printf "\n";
      )
      rows
  in    
	
  (* Build the output row for one PlasmaFS entry: with -1 only [name];
     otherwise [inode] (with -i), permission bits, user, group, eof
     (used as the size), mtime and name. *)
  let file_row name inode ii =
    if !one then
      [ name, `L ]
    else (
      (* Render one rwx triple [n]. ~t / ~su / ~sg select whether the
	 sticky (0o1000), setuid (0o4000) or setgid (0o2000) bit of
	 ii.mode is shown in the x position: lowercase if x is set,
	 uppercase S if not.
	 NOTE(review): a sticky bit without x is shown as "-" (ls: "T"). *)
      let rwx ?(t=false) ?(su=false) ?(sg=false) n =
	( if n land 4 <> 0 then "r" else "-" ) ^ 
	  ( if n land 2 <> 0 then "w" else "-" ) ^ 
	  ( if n land 1 <> 0 then (
	      if t && (ii.mode land 0o1000) <> 0 then "t" else
		if su && (ii.mode land 0o4000) <> 0 then "s" else
		  if sg && (ii.mode land 0o2000) <> 0 then "s" else
		    "x" 
	    )
	    else  (
	      if su && (ii.mode land 0o4000) <> 0 then "S" else
		if sg && (ii.mode land 0o2000) <> 0 then "S" else
		  "-"
	    )
	  ) in
      (* NOTE(review): symlinks are marked "s" here; ls uses "l" *)
      let bits =
	( match ii.filetype with
	    | `ftype_regular -> "-"
	    | `ftype_directory -> "d"
	    | `ftype_symlink -> "s" 
	) ^
	  rwx ~su:true (ii.mode lsr 6) ^ 
	  rwx ~sg:true (ii.mode lsr 3) ^ 
	  rwx ~t:true ii.mode in
      let maybe_inode =
	if !inode_flag then [ (Int64.to_string inode), `R ] else [] in
      let mtime =
	(Netdate.format
	   ~fmt:"%Y-%m-%d %H:%M"
	   (Netdate.create 
	      ~zone:Netdate.localzone
	      (Int64.to_float ii.mtime.tsecs))) in
      (* presumably field1 holds the symlink target for symlinks *)
      let xname =
	if ii.filetype = `ftype_symlink then
	  name ^ " -> " ^ ii.field1
	else
	  name in
      let row =
	maybe_inode @
	  [ bits, `L;
	    ii.usergroup.user, `L;
	    ii.usergroup.group, `L;
	    Int64.to_string ii.eof, `R;
	    mtime, `R;
	    xname, `L
	  ] in
      row
    )
  in

  (* need_lf: a blank line is due before the next single-file row;
     first: nothing has been printed yet *)
  let need_lf = ref false in
  let first = ref true in
  let errors = ref false in

  (* All entries to list, as (tree, arg, path, display) *)
  let expanded =
    List.flatten
      (List.map
	 (fun farg ->
	    let (t, files) = ft#files farg in
	    List.map (fun (file,display) -> (t,farg,file,display)) files
	 )
	 ft#file_args;
      ) in

  (* Directory headers and "<empty>" only appear for several entries *)
  let suppress_header =
    List.length expanded <= 1 in

  List.iter
    (fun (t, farg, file, display) ->
       if farg.tree_prefix = None then (
	 (* PlasmaFS: read inode info directly, one transaction per
	    entry. Note that [t] is shadowed by the transaction here. *)
	 let c = ac#open_cluster() in
	 let t = Plasma_client.start c in
	 ( try
	     let inode = Plasma_client.lookup t file false in
	     let ii = Plasma_client.get_inodeinfo t inode in
	     match ii.filetype with
	       | `ftype_directory when not !dirs ->
		   if not suppress_header then (
		     if not !first then printf "\n";
		     printf "%s:\n%!" display;
		   );
		   (* entries that vanish (`enoent) while listing are skipped *)
		   let elements = 
		     List.flatten
		       (List.map
			  (fun (elem, elem_inode) ->
			     try
			       let elem_ii =
				 Plasma_client.get_inodeinfo t elem_inode in
			       [elem, elem_inode, elem_ii]
			     with
			       | Plasma_client.Plasma_error `enoent -> []
			  )
			  (Plasma_client.list_inode t inode)) in
		   let elements' =
		     List.filter
		       (fun (elem,_,_) ->
			  !all || (elem <> "" && elem.[0] <> '.')
		       ) elements in
		   (* -S takes precedence over -t; default is by name *)
		   let elements'' =
		     List.sort
		       (fun (elem1,_,elem_ii1) (elem2,_,elem_ii2) ->
			  let k =
			    if !sort_by_size then
			      Int64.compare elem_ii1.eof elem_ii2.eof
			    else
			      if !sort_by_mtime then
				mtime_compare elem_ii1.mtime elem_ii2.mtime
			      else
				String.compare elem1 elem2 in
			  if !reverse then -k else k
		       )
		       elements' in
		   let rows =
		     List.map
		       (fun (elem,elem_inode,elem_ii) ->
			  file_row elem elem_inode elem_ii
		       )
		       elements'' in
		   show_rows rows;
		   if elements'' = [] && not suppress_header then
		     printf "<empty>\n";
		   need_lf := true
	       | _ ->
		   if !need_lf then printf "\n";
		   need_lf := false;
		   show_rows [file_row display inode ii]
	   with
	     | error ->
		 if not !first then eprintf "\n";
		 eprintf "%s: %s\n%!" 
		   display
		   (Netexn.to_string error);
		 errors := true;
		 need_lf := true;
	 );
	 (* committed also after an error above *)
	 Plasma_client.commit t;
       )
       else (
	 (* Other trees: names only, via the Netfs interface *)
	 try
	   if t # test [] file `D then (
	     if not suppress_header then (
	       if not !first then printf "\n";
	       printf "%s:\n%!" display;
	     );
	     let files = t # readdir [] file in
	     let files' =
	       List.filter
		 (fun name ->
		    !all || (name <> "" && name.[0] <> '.')
		 ) files in
	     let files'' =
	       List.sort
		 (fun name1 name2 ->
		    let k =
		      String.compare name1 name2 in
		    if !reverse then -k else k
		 )
		 files' in
	     show_rows (List.map (fun name -> [ name, `L ]) files'');
	     if files'' = [] && not suppress_header then
	       printf "<empty>\n";
	     need_lf := true;
	   )
	   else (
	     if t # test [] file `N then (
	       if !need_lf then printf "\n";
	       need_lf := false;
	       show_rows [ [ display, `L ] ]
	     )
	     else
	       (* reported by the handler below like any other failure *)
	       raise(Unix.Unix_error(Unix.ENOENT, "", display))
	   )
	 with
	   | error ->
	       if not !first then eprintf "\n";
	       eprintf "%s: %s\n%!" 
		 display
		 (Netexn.to_string error);
	       errors := true;
	       need_lf := true;
       );
       first := false
    )
    expanded;

  if !errors then exit 1


(* Implements "plasma create <file> ...": create each named file as an
   empty file. The write flags include `Exclusive, so existing files
   are not overwritten. *)
let create() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse (ac#args @ ft#args) ft#anonarg "usage: plasma create <file> ...";
  ac#config();
  List.iter
    (fun farg ->
       let (tree, matches) = ft#files farg in
       List.iter
         (fun (path, _) ->
            let out = tree # write [`Create; `Exclusive] path in
            out # close_out())
         matches)
    ft#file_args


(* Implements "plasma mkdir [-p] <file> ...": create the named
   directories. With -p, the `Path flag is passed to create parent
   directories as necessary. *)
let mkdir() =
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let p = ref false in
  let mkdir_args =
    [ "-p", Arg.Set p, "    Create parent directories as necessary" ] in
  Arg.parse (ac#args @ ft#args @ mkdir_args) ft#anonarg
    "usage: plasma mkdir <file> ...";
  ac#config();
  List.iter
    (fun farg ->
       let (tree, matches) = ft#files farg in
       List.iter
         (fun (path, _) ->
            tree # mkdir (if !p then [`Path] else []) path)
         matches)
    ft#file_args


let parse_mode s =
  (* Parse a chmod-style octal mode such as "755" or "4755".
     Only the digits 0-7 are accepted (no sign, no "0o" prefix, no
     underscores), and the value must fit into the permission bits
     0..0o7777 (rwx for user/group/other plus setuid, setgid and sticky).
     Raises [Error] otherwise. *)
  let fail () =
    errwith ("Cannot parse mode as octal number: " ^ s) in
  let rec all_octal i =
    i >= String.length s
    || (s.[i] >= '0' && s.[i] <= '7' && all_octal (i+1)) in
  if s = "" || not (all_octal 0) then fail ();
  let m =
    try int_of_string ("0o" ^ s)
    with Failure _ -> fail () in
  (* int_of_string may wrap very long octal literals to negative values *)
  if m < 0 || m > 0o7777 then fail ();
  m


(* Implements "plasma chmod <octal_mode> <file> ...". The first
   anonymous argument is the mode (see parse_mode); the remaining ones
   are files. Only PlasmaFS files are supported. The files of each
   argument are modified inside one Plasma_client.with_trans call. *)
let chmod() =
  let ac = access_cluster (Unixqueue.create_unix_event_system()) in
  let ft = file_tagger ac in
  let mode = ref None in
  let on_anon s =
    match !mode with
      | None -> mode := Some(parse_mode s)
      | Some _ -> ft#anonarg s in
  Arg.parse (ac#args @ ft#args) on_anon
    "usage: plasma chmod <octal_mode> <file> ...";
  let new_mode =
    match !mode with
      | Some m -> m
      | None -> errwith "Missing arguments" in
  ac#config();
  let apply farg =
    let (tree, matches) = ft#files farg in
    if tree <> ft#plasma_tree then
      errwith "chmod is only supported for PlasmaFS";
    let c = ac#open_cluster() in
    Plasma_client.with_trans c
      (fun trans ->
         List.iter
           (fun (path, display) ->
              let inode = Plasma_client.lookup trans path true in
              let ii = Plasma_client.get_inodeinfo trans inode in
              Plasma_client.set_inodeinfo trans inode { ii with mode = new_mode };
              eprintf "Modified: %s\n%!" display)
           matches);
    eprintf "Committed.\n%!" in
  List.iter apply ft#file_args


let parse_ug s =
  (* Parse a chown owner spec "user", "user:group", ":group" or
     "user:". Returns (user option, group option); an empty part means
     "leave unchanged". Previously "user:" yielded [Some ""] as group,
     which would have set the group to the empty string. Only the first
     ':' separates user and group. *)
  let opt x = if x = "" then None else Some x in
  try
    let k = String.index s ':' in
    let u = String.sub s 0 k in
    let g = String.sub s (k+1) (String.length s - k - 1) in
    (opt u, opt g)
  with
    | Not_found ->
        (opt s, None)


let chown() =
  (* Implements "plasma chown <user>[:<group>] <file> ...". The first
     anonymous argument is the owner spec (see parse_ug); the remaining
     ones are files. Only PlasmaFS files are supported. The files of each
     argument are modified inside one Plasma_client.with_trans call; an
     absent user or group is left unchanged. *)
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let user = ref None in
  let group = ref None in
  let ug_seen = ref false in
  Arg.parse
    (ac#args @ ft#args)
    (fun s ->
       if not !ug_seen then (
         let (u_opt, g_opt) = parse_ug s in
         user := u_opt;
         group := g_opt;
         ug_seen := true
       )
       else
         ft#anonarg s
    )
    "usage: plasma chown <user>[:<group>] <file> ...";
  (* Like chmod: refuse to run without the owner spec instead of
     silently doing nothing *)
  if not !ug_seen then
    errwith "Missing arguments";
  ac#config();

  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       if t <> ft#plasma_tree then
         errwith "chown is only supported for PlasmaFS";
       let c = ac#open_cluster() in
       Plasma_client.with_trans c
         (fun trans ->
            List.iter
              (fun (path, display) ->
                 let inode = Plasma_client.lookup trans path true in
                 let ii = Plasma_client.get_inodeinfo trans inode in
                 let usergroup =
                   { user = ( match !user with
                                | None -> ii.usergroup.user
                                | Some u -> u
                            );
                     group = ( match !group with
                                 | None -> ii.usergroup.group
                                 | Some g -> g
                             )
                   } in
                 let ii' = { ii with usergroup = usergroup } in
                 Plasma_client.set_inodeinfo trans inode ii';
                 eprintf "Modified: %s\n%!" display
              )
              files
         );
       eprintf "Committed.\n%!"
    )
    ft#file_args


let multi_cmd t_dir ac ft do_operation =
  (* Shared driver for ln, mv and cp.

     [t_dir] is the destination directory given with -t, if any.
     Without -t, the last argument is used as the destination directory
     when it is one; otherwise exactly two arguments (source and
     destination) are required. [do_operation src_tree src_file
     dest_tree dest_file] is called once per (source, destination)
     pair, where files are (path, display) pairs as returned by
     [ft#files]. [ac] is not used here.

     Raises [Failure] for wrong argument counts (usage is printed), and
     [Error] for arguments that do not match exactly one file or for a
     destination that is not a directory.
   *)
  (* Seed with the -t directory; it used to be always None here, which
     made the -t option of ln/mv/cp a no-op. *)
  let dir = ref t_dir in
  let dir_expanded = ref None in
  let n = List.length ft#file_args in
  let targets = ref ft#file_args in

  (* Expand [arg] and insist that it denotes exactly one file *)
  let expand_uniq arg =
    let (t,files) = ft#files arg in
    match files with
      | [] -> errwith("Nothing matches: " ^ display_of_arg arg)
      | [file] -> (t, file)
      | _ -> errwith("Multiple matches: " ^ display_of_arg arg) in

  if !dir = None && n = 1 then
    failwith "At least two arguments needed";

  (* Without -t: if the last argument is a directory, it becomes the
     destination directory for all other arguments *)
  if !dir = None && n > 1 then (
    let last = List.hd (List.rev !targets) in
    let (t, last_file) = expand_uniq last in
    let (last_path, _) = last_file in
    if t # test [] last_path `D then (
      dir := Some last;
      dir_expanded := Some(t, last_file);
      targets := List.rev (List.tl (List.rev !targets));
    )
  );

  (* With -t: expand the given directory and check that it is one *)
  if !dir <> None && !dir_expanded = None then (
    match !dir with
      | None -> assert false
      | Some d ->
          let (t, d_file) = expand_uniq d in
          let (d_path,d_disp) = d_file in
          if not (t#test [] d_path `D) then
            errwith("Not a directory: " ^ d_disp);
          dir_expanded := Some(t, d_file)
  );

  ( match !dir_expanded with
      | None ->
          if List.length !targets <> 2 then
            failwith
              "Exactly two arguments expected when no directory is specified";
          ( match !targets with
              | [ target_arg; link_arg ] ->
                  let (target_t, target_file) = expand_uniq target_arg in
                  let (link_t, link_file) = expand_uniq link_arg in
                  do_operation target_t target_file link_t link_file
              | _ ->
                  assert false
          )

      | Some (t,dirfile) ->
          (* Only possible with -t: there must be something to process *)
          if !targets = [] then
            failwith "No source arguments given";
          let (dirpath, dirdisp) = dirfile in
          List.iter
            (fun arg ->
               let (t1, files) = ft#files arg in
               List.iter
                 (fun ((path, _) as file) ->
                    let basename = Filename.basename path in
                    do_operation t1 file t
                      (dirpath ^ "/" ^ basename, dirdisp ^ "/" ^ basename)
                 )
                 files
            )
            !targets
  )




(* Implements "plasma ln" (see multi_cmd for the accepted argument
   forms). With -s, symlinks are created through the tree's symlink
   method. Hard links are only possible within PlasmaFS and are made
   inside Plasma_client.with_trans. Source and link must be in the same
   tree. *)
let link() =
  let esys = Unixqueue.create_unix_event_system() in
  let symlink_flag = ref false in
  let dir = ref None in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let ln_args =
    [ "-s", Arg.Set symlink_flag,
      "    create symlink instead of hard link";

      "-t", Arg.String (fun s -> dir := Some(ft#parse_anonarg s)),
      "<dir>   create the links in this directory";
    ] in
  Arg.parse (ac#args @ ft#args @ ln_args) ft#anonarg
    "usage: plasma ln <opts> (<target> <link> | <target>... <dir>) ";
  ac#config();

  let hard_link target_path target_disp link_t link_path link_disp =
    if link_t <> ft#plasma_tree then
      errwith "Hard links are only supported for PlasmaFS";
    let c = ac#open_cluster() in
    Plasma_client.with_trans c
      (fun trans ->
         let inode = Plasma_client.lookup trans target_path true in
         Plasma_client.link trans link_path inode;
         eprintf "Linked: %s -> %s\n%!" link_disp target_disp);
    eprintf "Committed.\n%!" in

  let do_link target_t (target_path, target_disp) link_t (link_path, link_disp) =
    if not (ft # same_tree target_t link_t) then
      errwith "Cross-filesystem links are not supported";
    if !symlink_flag then
      link_t # symlink [] target_path link_path
    else
      hard_link target_path target_disp link_t link_path link_disp in

  multi_cmd !dir ac ft do_link


(* Implements "plasma mv". It takes the same argument forms as ln (see
   multi_cmd); source and destination must be in the same tree. *)
let rename() =
  let esys = Unixqueue.create_unix_event_system() in
  let dir = ref None in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let mv_args =
    [ "-t", Arg.String (fun s -> dir := Some(ft#parse_anonarg s)),
      "<dir>   move files to this directory";
    ] in
  Arg.parse (ac#args @ ft#args @ mv_args) ft#anonarg
    "usage: plasma mv <opts> (<src> <dest> | <src>... <destdir>) ";
  ac#config();

  let do_rename src_t (src_path, _) dest_t (dest_path, _) =
    if not (ft # same_tree src_t dest_t) then
      errwith "Cross-filesystem renames are not supported";
    dest_t # rename [] src_path dest_path in

  multi_cmd !dir ac ft do_rename


(* Implements "plasma cp". It takes the same argument forms as ln (see
   multi_cmd). Copies go through Netfs.copy in streaming mode, so source
   and destination may be in different trees. *)
let copy() =
  let esys = Unixqueue.create_unix_event_system() in
  let dir = ref None in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let cp_args =
    [ "-t", Arg.String (fun s -> dir := Some(ft#parse_anonarg s)),
      "<dir>   copy files to this directory";
    ] in
  Arg.parse (ac#args @ ft#args @ cp_args) ft#anonarg
    "usage: plasma cp <opts> (<src> <dest> | <src>... <destdir>) ";
  ac#config();

  let do_copy src_t (src_path, _) dest_t (dest_path, _) =
    Netfs.copy ~streaming:true src_t src_path dest_t dest_path in

  multi_cmd !dir ac ft do_copy


(* Implements "plasma rm [-r] <file> ...": remove each matching file.
   With -r, the `Recursive flag is passed so directories are removed
   recursively. *)
let delete() =
  let esys = Unixqueue.create_unix_event_system() in
  let recurse = ref false in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  let rm_args =
    [ "-r", Arg.Set recurse,
      "   recursively delete directories"
    ] in
  Arg.parse (ac#args @ ft#args @ rm_args) ft#anonarg
    "usage: plasma rm <file> ...";
  ac#config();

  List.iter
    (fun farg ->
       let (tree, matches) = ft#files farg in
       List.iter
         (fun (path, _) ->
            tree # remove (if !recurse then [`Recursive] else []) path)
         matches)
    ft#file_args


let put() =
  (* Implements "plasma put (<local_path> <plasma_path> | -stdin
     <plasma_path>)". It copies a local file, or stdin with -stdin, into
     a new PlasmaFS file. Only PlasmaFS destinations are supported.
     The file is created in a transaction. With -f, an existing file is
     first unlinked in the same transaction. The transaction is committed,
     and then the data is copied with Plasma_client.copy_in using the
     chosen topology (-chain) and sync mode (-nosync). *)
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in

  let plasma_filename = ref "" in
  let plasma_set = ref false in
  let local_filename = ref "" in
  let local_set = ref false in
  let topo = ref `Star in
  let from_stdin = ref false in
  let force = ref false in
  let sync_mode = ref `Late_datasync in

  Arg.parse
    ( ac#args
      @ [ "-chain", Arg.Unit (fun () -> topo := `Chain),
          "   Use chain topology instead of star topology";

          "-stdin", Arg.Set from_stdin,
          "   Copy data from stdin. No <local_filename> in this case.";

          "-f", Arg.Set force,
          "   Force: Replace existing files";

          "-nosync", Arg.Unit (fun () -> sync_mode := `No_datasync),
          "   Do not wait until the data gets synced";
        ]
    )
    (fun s ->
       let (p_opt, _) = extract_tree_prefix s in
       if p_opt <> None then
         failwith "the put operation does not support non-PlasmaFS trees";
       if not !local_set && not !from_stdin then (
         local_filename := s;
         local_set := true
       )
       else
         if not !plasma_set then (
           plasma_filename := s;
           plasma_set := true
         )
         else
           raise(Arg.Bad("Unexpected arg: " ^ s))
    )
    "usage: plasma put (<local_path> <plasma_path> | -stdin <plasma_path>)";
  ac#config();

  if not !plasma_set then
    failwith "No plasma filename given";
  if not !local_set && not !from_stdin then
    failwith "No local filename given";
  (* reachable when -stdin follows the file arguments *)
  if !local_set && !from_stdin then
    failwith "With -stdin, a local filename must not be given";

  if !from_stdin then
    local_filename := "<stdin>";

  (* For stdin the length is passed as 0L *)
  let fd, len =
    if !from_stdin then
      Unix.stdin, 0L
    else (
      let fd = Unix.openfile !local_filename [Unix.O_RDONLY] 0 in
      let len = Unix.LargeFile.lseek fd 0L Unix.SEEK_END in
      ignore(Unix.LargeFile.lseek fd 0L Unix.SEEK_SET);
      (fd, len)
    ) in

  let c = ac#open_cluster () in
  let ii0 = Plasma_client.regular_ii c 0o666 in
  let ii = { ii0 with replication = ac#rep } in
  let t = Plasma_client.start c in
  if !force then (
    try
      Plasma_client.unlink t !plasma_filename
    with Plasma_client.Plasma_error `enoent -> ()
  );
  let inode = Plasma_client.create_file t !plasma_filename ii in
  Plasma_client.commit t;
  ignore(Plasma_client.copy_in ~flags:[!sync_mode] c inode 0L fd len !topo);
  Unix.close fd;

  eprintf "Copied: %s -> %s\n%!" !local_filename !plasma_filename


let alt_copy_out esys c inode pos fd len par =
  (* Alternate download. Copies [len] bytes of [inode], starting at [pos],
     into [fd] at the same offsets. Up to [par] block-sized read requests
     are kept in flight. Runs [esys] until no more events are pending.
     The transaction started here is committed after the end-of-stream
     signal.
     NOTE(review): if a read engine fails, the in-flight counter never
     drops to 0, the end is never signalled, and the copy is silently
     incomplete. *)

  (* A non-positive [par] would start no request at all and silently
     produce an empty copy, so use at least one request. *)
  let par = max 1 par in

  Plasma_client.configure_buffer c (2*par);

  let t = Plasma_client.start c in
  let blocksize = Plasma_client.blocksize c in
  let blocksizeL = Int64.of_int blocksize in

  let stream_e, signal =
    Plasma_util.signal_engine esys in

  let pool = Netsys_mem.create_pool blocksize in

  let n = ref 0 in            (* requests in flight *)
  let p = ref pos in          (* next position to request *)
  let end_pos = Int64.add pos len in

  (* Read up to [bsize] bytes at [p0] into [buf], which already holds [r]
     bytes. Short reads are repeated until the block is full or EOF. *)
  let rec read_block_e p0 buf r bsize =
    Plasma_client.read_e ~lazy_validation:true c inode p0 buf r (bsize-r)
    ++ (fun (l,eof,_) ->
          if not eof && (r+l) < bsize then
            read_block_e (Int64.add p0 (Int64.of_int l)) buf (r+l) bsize
          else
            eps_e (`Done(r+l,eof)) esys
       )
  in

  (* mem_write may write fewer bytes than requested; keep writing until
     [l] bytes starting at [off] of [buf] are out. *)
  let rec write_all buf off l =
    if l > 0 then (
      let k = Netsys_mem.mem_write fd buf off l in
      if k <= 0 then
        failwith "alt_copy_out: write made no progress";
      write_all buf (off+k) (l-k)
    ) in

  (* Start requests until [par] are in flight or the range is exhausted.
     Each completion writes its block and refills. When no request is in
     flight any more, the end of the stream is signalled. *)
  let rec fetch() =
    if !n < par then (
      if !p < end_pos then (
        let buf = Netsys_mem.pool_alloc_memory pool in
        let p0 = !p in
        let bsize = Int64.to_int (min blocksizeL (Int64.sub end_pos p0)) in
        let _e =
          read_block_e p0 (`Memory buf) 0 bsize
          ++ (fun (l,_eof) ->
                ignore(Unix.LargeFile.lseek fd p0 Unix.SEEK_SET);
                write_all buf 0 l;
                decr n;
                fetch();
                eps_e (`Done ()) esys
             ) in
        incr n;
        p := Int64.add !p blocksizeL;
        fetch()
      )
    );
    if !n=0 then signal(`Done())
  in

  fetch();
  let _e =
    stream_e ++ (fun () -> Plasma_client.commit_e t) in
  Unixqueue.run esys


let get() =
  (* "plasma get [-alt <n>] <plasma_filename> <local_filename>": copy a
     PlasmaFS file into a local file (created if missing, truncated
     otherwise). By default Plasma_client.copy_out does the transfer; with
     "-alt n" the block-parallel alt_copy_out is used with n reads in
     flight.

     The remote file is looked up before the local file is opened, so a
     failing lookup (e.g. a mistyped PlasmaFS name) leaves an existing
     local file untouched. The local descriptor is closed even when the
     copy raises.

     NOTE(review): the tree-prefix check in the anonymous-argument handler
     is applied to both positional arguments, including the local
     filename; presumably a local name that parses as "<tree>:<path>" is
     rejected - confirm against extract_tree_prefix. *)
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in

  let plasma_filename = ref "" in
  let plasma_set = ref false in
  let local_filename = ref "" in
  let local_set = ref false in
  let alt = ref None in

  let set_alt n =
    (* A factor < 1 would issue no reads at all and silently produce an
       empty local file *)
    if n < 1 then
      raise(Arg.Bad "-alt: the parallelization factor must be >= 1");
    alt := Some n in

  Arg.parse
    ( ac#args @
        [ "-alt", Arg.Int set_alt,
          "<n>   use the alternate algorithm with a parallelization factor of n"
        ]
    )
    (fun s ->
       let (p_opt, _) = extract_tree_prefix s in
       if p_opt <> None then
         failwith "the get operation does not support non-PlasmaFS trees";
       if not !plasma_set then (
         plasma_filename := s;
         plasma_set := true
       )
       else
         if not !local_set then (
           local_filename := s;
           local_set := true
         )
         else
           raise(Arg.Bad("Unexpected arg: " ^ s))
    )
    "usage: plasma get <plasma_filename> <local_filename>";
  ac#config();

  if not !plasma_set then
    failwith "No plasma filename given";
  if not !local_set then
    failwith "No local filename given";

  let c = ac#open_cluster () in

  (* Resolve the remote file first, before touching the local one *)
  let t = Plasma_client.start c in
  let inode = Plasma_client.lookup t !plasma_filename false in
  let ii = Plasma_client.get_inodeinfo t inode in
  Plasma_client.commit t;

  let fd =
    Unix.openfile
      !local_filename [Unix.O_WRONLY; Unix.O_TRUNC; Unix.O_CREAT] 0o666 in
  ( try
      ( match !alt with
          | None ->
              ignore(Plasma_client.copy_out c inode 0L fd ii.eof)
          | Some par ->
              alt_copy_out esys c inode 0L fd ii.eof par
      )
    with
      | error ->
          (* Release the descriptor, then report the original error *)
          (try Unix.close fd with Unix.Unix_error _ -> ());
          raise error
  );
  Unix.close fd;

  eprintf "Copied: %s -> %s\n%!" !plasma_filename !local_filename


let cat() =
  (* "plasma cat <file> ...": write the contents of every named file to
     stdout, in command-line order. Each argument is resolved by the file
     tagger into a tree and a list of (path, display) pairs.

     PlasmaFS files must be regular files (otherwise `eisdir is raised)
     and are streamed with Plasma_client.copy_out directly onto the
     stdout descriptor. Files of other trees are read through the tree's
     [read] method and copied into a Netchannels wrapper of stdout.

     The stdout wrapper is created once and never closed: closing a
     Netchannels.output_channel also closes the underlying out_channel,
     which previously broke every file after the first non-PlasmaFS one. *)
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in
  Arg.parse
    (ac#args @ ft#args)
    ft#anonarg
    "usage: plasma cat <file> ...";
  ac#config();

  (* Opened at most once, and only if a PlasmaFS file is actually printed *)
  let cluster = lazy (ac#open_cluster()) in
  let out_ch = new Netchannels.output_channel stdout in

  List.iter
    (fun farg ->
       let (t, files) = ft#files farg in
       List.iter
         (fun (path, _display) ->
            if t = ft#plasma_tree then (
              let c = Lazy.force cluster in
              let inode, ii =
                Plasma_client.with_trans c
                  (fun trans ->
                     let inode = Plasma_client.lookup trans path false in
                     let ii = Plasma_client.get_inodeinfo trans inode in
                     if ii.Plasma_rpcapi_aux.filetype <> `ftype_regular then
                       raise(Plasma_client.Plasma_error `eisdir);
                     (inode, ii)
                  ) in
              (* Writes to the raw descriptor, bypassing out_ch's buffer *)
              ignore(Plasma_client.copy_out c inode 0L Unix.stdout ii.eof)
            )
            else (
              Netchannels.with_in_obj_channel
                (t # read [`Streaming; `Binary] path)
                (fun in_ch ->
                   out_ch # output_channel in_ch
                );
              (* Flush now: a following PlasmaFS file goes to the raw
                 descriptor and must not overtake these buffered bytes *)
              out_ch # flush()
            )
         )
         files
    )
    ft#file_args


let fsstat() =
  (* "plasma fsstat": print the block counters returned by
     Plasma_client.fsstat (total, used, transitional) plus the derived
     free count, i.e. total - used - transitional. Takes no positional
     arguments. *)
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let reject_anon s = raise(Arg.Bad("Unexpected arg: " ^ s)) in

  Arg.parse ac#args reject_anon "usage: plasma fsstat";
  ac#config();

  let st = Plasma_client.fsstat (ac#open_cluster ()) in
  let free_blocks =
    Int64.sub st.total_blocks (Int64.add st.used_blocks st.trans_blocks) in
  printf "Total:        %20Ld blocks\n" st.total_blocks;
  printf "Used:         %20Ld blocks\n" st.used_blocks;
  printf "Transitional: %20Ld blocks\n" st.trans_blocks;
  printf "Free:         %20Ld blocks\n" free_blocks


let params() =
  (* "plasma params": print every (name, value) pair returned by
     Plasma_client.params as one "name = value" line. Takes no positional
     arguments. *)
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in

  Arg.parse
    ac#args
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)))
    "usage: plasma params";
  ac#config();

  let cluster = ac#open_cluster () in
  let print_param (name, value) = printf "%s = %s\n" name value in
  List.iter print_param (Plasma_client.params cluster)


let auth_ticket() =
  (* "plasma auth_ticket": request an authentication ticket for the user
     reported by Plasma_client.current_user (first component of its
     result) and print it on stdout. Takes no positional arguments. *)
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in

  Arg.parse
    ac#args
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)))
    "usage: plasma auth_ticket";
  ac#config();

  let cluster = ac#open_cluster () in
  let user =
    match Plasma_client.current_user cluster with
      | (u, _, _) -> u in
  print_endline (Plasma_client.get_auth_ticket cluster user)


let admin_table() =
  (* "plasma admin_table (-get | -put) <name>": with -get, print the admin
     table <name> to stdout. With -put, set it from the complete contents
     of stdin. If both options are given, the last one on the command line
     wins; if neither is given, Failure is raised. *)
  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let mode = ref None in

  let options =
    [ "-get", Arg.String (fun key -> mode := Some(`Get key)),
      "<name>   Get the admin table <name> and print it to stdout";

      "-put", Arg.String (fun key -> mode := Some(`Put key)),
      "<name>   Set the admin table <name> from stdin";
    ] in
  Arg.parse
    (ac#args @ options)
    (fun s -> raise(Arg.Bad("Unexpected arg: " ^ s)))
    "usage: plasma admin_table";
  ac#config();

  match !mode with
    | None ->
        failwith "Need -get or -put"
    | Some action ->
        let cluster = ac#open_cluster () in
        ( match action with
            | `Get key ->
                print_string(Plasma_client.read_admin_table cluster key)
            | `Put key ->
                let stdin_ch = new Netchannels.input_channel stdin in
                let data = Netchannels.string_of_in_obj_channel stdin_ch in
                Plasma_client.write_admin_table cluster key data
        )


let blocks() =
  (* "plasma blocks <plasma_file> ...": for each PlasmaFS file, print its
     block ranges together with the datanodes holding each range (shown by
     the [node] string of the blocklist entry, or "[DEAD]" for datanodes
     not marked alive), followed by a summary:
       - blocks: number of live block copies (range length times number
         of live datanodes, summed over all ranges);
       - actual replication: smallest number of live datanodes over all
         ranges ("n/a" for a file without blocks);
       - requested replication: the value stored in the inode.
     Non-PlasmaFS tree prefixes are rejected. All files are inspected in
     one transaction, committed after the last file. *)
  let open Plasma_blocks in

  let esys = Unixqueue.create_unix_event_system() in
  let ac = access_cluster esys in
  let ft = file_tagger ac in

  Arg.parse
    ac#args
    (fun s ->
       let (p_opt, _) = extract_tree_prefix s in
       if p_opt <> None then
         failwith "the blocks operation does not support non-PlasmaFS trees";
       ft#anonarg s
    )
    "usage: plasma blocks <plasma_file> ...";
  ac#config();

  let c = ac#open_cluster () in
  let t = Plasma_client.start c in

  let analyze file =
    let inode = Plasma_client.lookup t file false in
    let ii = Plasma_client.get_inodeinfo t inode in
    (* FIXME: very large blocklists *)
    let list =
      Plasma_client.get_blocklist t inode 0L ii.blocklimit false in
    printf "%s:\n" file;

    (* Per-datanode tables, keyed by datanode identity *)
    let hostport = Hashtbl.create 5 in
    let alive = Hashtbl.create 5 in
    List.iter
      (fun b ->
         Hashtbl.replace hostport b.identity b.node;
         Hashtbl.replace alive b.identity b.node_alive
      )
      list;
    (* Unknown identities count as dead / have no name *)
    let is_alive id =
      try Hashtbl.find alive id with Not_found -> false in
    let node_name id =
      try Hashtbl.find hostport id with Not_found -> "n/a" in

    let set = Plasma_blocks.bset_of_blocklist list in

    (* Live copies and minimum live replication over all ranges *)
    let replication = ref max_int in
    let blocks = ref 0L in
    Bset.iter
      (fun _ br ->
         let live_datanodes =
           StrMap.filter (fun id _ -> is_alive id) br.br_datanodes in
         let n = StrMap.cardinal live_datanodes in
         replication := min !replication n;
         blocks := Int64.add !blocks (Int64.mul br.br_length (Int64.of_int n))
      )
      set;

    (* One line per range: "first - last:" then each datanode with the
       block range it stores for this file range *)
    let print_bi bi =
      let idxmax = Int64.pred (Int64.add bi.br_index bi.br_length) in
      printf "%9Ld - %9Ld:" bi.br_index idxmax;
      StrMap.iter
        (fun id b_s ->
           let hp = node_name id in
           let b_e = Int64.pred (Int64.add b_s bi.br_length) in
           if is_alive id then
             printf " %s[%Ld-%Ld]" hp b_s b_e
           else
             printf " %s[DEAD]" hp
        )
        bi.br_datanodes;
      printf "\n"
    in

    Bset.iter (fun _ bi -> print_bi bi) set;

    let r_s =
      match list with
        | [] -> "n/a"
        | _ -> string_of_int !replication in

    printf "  blocks:                %Ld\n" !blocks;
    printf "  actual replication:    %s\n" r_s;
    printf "  requested replication: %d\n" ii.replication;
    printf "\n"
  in

  List.iter
    (fun farg ->
       let (_, files) = ft#files farg in
       List.iter (fun (path, _) -> analyze path) files
    )
    ft#file_args;

  Plasma_client.commit t



let help () =
  (* "plasma help": delegates to [usage]. *)
  usage ()


let dispatch() =
  (* Entry point for "plasma <mode> <args...>": pick the subcommand
     handler named by argv.(1) and run it. A missing or unknown mode
     raises Failure "Bad usage". *)
  let argv = Sys.argv in
  if Array.length argv < 2 then failwith "Bad usage";

  let mode_str = argv.(1) in
  (* Consume the mode word so that the subcommand's own Arg.parse starts
     at argv.(2) *)
  incr Arg.current;

  let handler =
    match mode_str with
      | "create" -> create
      | "delete" | "rm" -> delete
      | "mkdir" -> mkdir
      | "put" -> put
      | "get" -> get
      | "cat" -> cat
      | "list" | "ls" -> list
      | "link" | "ln" -> link
      | "copy" | "cp" -> copy
      | "rename" | "mv" -> rename
      | "chmod" -> chmod
      | "chown" -> chown
      | "fsstat" -> fsstat
      | "params" -> params
      | "blocks" -> blocks
      | "auth_ticket" -> auth_ticket
      | "admin_table" -> admin_table
      | "help" | "-help" | "--help" -> help
      | _ -> failwith ("Bad usage")
  in
  handler ()

let main () =
  (* Run the selected subcommand. Exceptions escaping from it are turned
     into a message on stderr followed by exit code 2:
       - Error (raised via errwith): plain "plasma: <msg>", no usage text
       - Plasma_client.Plasma_error: the errno as text
       - Unix.Unix_error: "<file>: <message> (<function>)"
       - Failure: the message followed by the usage text
       - anything else: printed with Netexn.to_string *)
  try
    dispatch()
  with
    | Error m ->
        prerr_endline ("plasma: " ^ m);
        exit 2

    | Plasma_client.Plasma_error code ->
        prerr_endline ("plasma: " ^ Plasma_util.string_of_errno code);
        exit 2

    | Unix.Unix_error(code,fn,file) ->
        prerr_endline ("plasma: " ^
                         (if file <> "" then file ^ ": " else "") ^
                         Unix.error_message code ^
                         (if fn <> "" then " (" ^ fn ^ ")" else "")
                      );
        exit 2

    | Failure m ->
        prerr_endline ("plasma: " ^ m);
        usage();
        exit 2

    | error ->
        prerr_endline ("plasma exception: " ^ Netexn.to_string error);
        exit 2
          
let () =
  (* Program entry: tune the GC for this short-lived client, then run the
     command. Per the Gc documentation, a max_overhead of 1_000_000 or more
     disables compaction. *)
  Gc.set
    { (Gc.get()) with
        Gc.max_overhead = 1_000_000;  (* Turn compactor off *)
        Gc.space_overhead = 300;      (* allow more wasted mem *)
        Gc.minor_heap_size = 65536;   (* bigger minor heap *)
    };
  main()

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml