Plasma GitLab Archive
Projects Blog Knowledge

(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Plasma.  If not, see <http://www.gnu.org/licenses/>.

*)
(* $Id: nfs3d_manager.ml 488 2011-10-25 21:29:23Z gerd $ *)

(* mount -t nfs -o soft,proto=tcp,port=2801,mountport=2800,mountproto=tcp,nfsvers=3,nolock,noacl,nordirplus,sec=none 127.0.0.1:/test /mnt *)

module PA = Plasma_rpcapi_aux
module NA = Nfs3_aux
module StrSet = Plasma_util.StrSet

open Plasma_util.Operators
open Printf

(* Static configuration of the NFS bridge. [get_rundata] accepts only one
   (physically identical) value of this type per process. *)
type conf =
    { clustername : string;
      (* name of the PlasmaFS cluster, passed to Plasma_client.open_cluster *)
      namenodes : (string * int) list;
      (* namenode endpoints - presumably (host, port) pairs; TODO confirm *)
      buffer_memory : int;
      (* memory for the client-side block buffer; [new_rundata] divides it
	 by the blocksize to get the number of buffered blocks (presumably
	 in bytes) *)
      cauth : Pfs_auth.client_auth;
      (* passwords used by [configure_auth] *)
    }

(* Directory listing cache of one client (see [rundata.dircache], which
   keeps one such table per client socket address). *)
type dircache =
    (int64 * int64, (string * int64) array * float) Hashtbl.t
    (* maps [(inode, cookieverf)] to the array of [(name,inode)] members, plus
       time of last access
     *)


(* Process-wide state of the NFS bridge, created by [new_rundata] and
   obtained via [get_rundata]. *)
type rundata =
    { conf : conf;
      (* the configuration this rundata was created for *)
      cluster : Plasma_client.plasma_cluster;
      (* long-lived cluster connection, running on [esys] *)
      clients : (Unix.sockaddr, string) Hashtbl.t;
      (** Maps socket address (IP addresses with port 0, see [peer]) to a
	  secret that proves the mount. The secret is the prefix of every
	  file handle handed out to this client (see [wrap_fh]). *)
      rootfh : int64;
      (* inode of the filesystem root "/" *)
      blocksize : int;
      (* blocksize of the cluster *)
      blockpool : Netsys_mem.memory_pool;
      (* memory pool with pages of [read_multiple * blocksize] bytes *)
      dircache : (Unix.sockaddr, dircache) Hashtbl.t;
      (* per-client directory listing caches *)
      writeverf : string;
      (* write verifier: the startup timestamp, encoded as 8 bytes *)
      mutable ug_admin : Plasma_ug.ug_admin;
      (* user/group database from /etc/passwd and /etc/group *)
      mutable ug_admin_file_t : float;
      (* newest mtime of these two files when [ug_admin] was loaded *)
      mutable ug_admin_check_t : float;
      (* time of the last check in [check_ug_admin] *)
      esys : Unixqueue.event_system;
      (* event system of the Netplex container *)
    }

(* The NFSv3 error codes (the [nfs3err_*] cases of the NFS status type,
   i.e. everything except [nfs3_ok]) as argument-less polymorphic variants.
   Use [add_arg] to attach the per-procedure failure payload. *)
type nfs_error =
    [ `nfs3err_perm
    | `nfs3err_noent
    | `nfs3err_io
    | `nfs3err_nxio
    | `nfs3err_acces
    | `nfs3err_exist
    | `nfs3err_xdev
    | `nfs3err_nodev
    | `nfs3err_notdir
    | `nfs3err_isdir
    | `nfs3err_inval
    | `nfs3err_fbig
    | `nfs3err_nospc
    | `nfs3err_rofs
    | `nfs3err_mlink
    | `nfs3err_nametoolong
    | `nfs3err_notempty
    | `nfs3err_dquot
    | `nfs3err_stale
    | `nfs3err_remote
    | `nfs3err_badhandle
    | `nfs3err_not_sync
    | `nfs3err_bad_cookie
    | `nfs3err_notsupp
    | `nfs3err_toosmall
    | `nfs3err_serverfault
    | `nfs3err_badtype
    | `nfs3err_jukebox
    ]


(* The same error codes as [nfs_error], each carrying a payload of type
   ['a] (the "fail" record of the respective NFS procedure result). *)
type 'a nfs_error_arg =
    [ `nfs3err_perm of 'a
    | `nfs3err_noent of 'a
    | `nfs3err_io of 'a
    | `nfs3err_nxio of 'a
    | `nfs3err_acces of 'a
    | `nfs3err_exist of 'a
    | `nfs3err_xdev of 'a
    | `nfs3err_nodev of 'a
    | `nfs3err_notdir of 'a
    | `nfs3err_isdir of 'a
    | `nfs3err_inval of 'a
    | `nfs3err_fbig of 'a
    | `nfs3err_nospc of 'a
    | `nfs3err_rofs of 'a
    | `nfs3err_mlink of 'a
    | `nfs3err_nametoolong of 'a
    | `nfs3err_notempty of 'a
    | `nfs3err_dquot of 'a
    | `nfs3err_stale of 'a
    | `nfs3err_remote of 'a
    | `nfs3err_badhandle of 'a
    | `nfs3err_not_sync of 'a
    | `nfs3err_bad_cookie of 'a
    | `nfs3err_notsupp of 'a
    | `nfs3err_toosmall of 'a
    | `nfs3err_serverfault of 'a
    | `nfs3err_badtype of 'a
    | `nfs3err_jukebox of 'a
    ]

(* Raised by the helpers in this module to abort an NFS procedure with the
   given error code; the procedures catch it and emit an error reply. *)
exception Nfs_error of nfs_error
      
(* Raised by [to_nfserr ~retry_on_conflict:true] on [`econflict]. The
   engines built by [transaction_e] and [no_transaction_e] react to it by
   repeating the whole operation after a pause (see [pause]). *)
exception Retry

(* [add_arg code x] turns the argument-less error [code] (see [nfs_error])
   into the corresponding constructor of [nfs_error_arg] carrying [x]. *)
let add_arg code x =
  match code with
    | `nfs3err_perm -> `nfs3err_perm x
    | `nfs3err_noent -> `nfs3err_noent x
    | `nfs3err_io -> `nfs3err_io x
    | `nfs3err_nxio -> `nfs3err_nxio x
    | `nfs3err_acces -> `nfs3err_acces x
    | `nfs3err_exist -> `nfs3err_exist x
    | `nfs3err_xdev -> `nfs3err_xdev x
    | `nfs3err_nodev -> `nfs3err_nodev x
    | `nfs3err_notdir -> `nfs3err_notdir x
    | `nfs3err_isdir -> `nfs3err_isdir x
    | `nfs3err_inval -> `nfs3err_inval x
    | `nfs3err_fbig -> `nfs3err_fbig x
    | `nfs3err_nospc -> `nfs3err_nospc x
    | `nfs3err_rofs -> `nfs3err_rofs x
    | `nfs3err_mlink -> `nfs3err_mlink x
    | `nfs3err_nametoolong -> `nfs3err_nametoolong x
    | `nfs3err_notempty -> `nfs3err_notempty x
    | `nfs3err_dquot -> `nfs3err_dquot x
    | `nfs3err_stale -> `nfs3err_stale x
    | `nfs3err_remote -> `nfs3err_remote x
    | `nfs3err_badhandle -> `nfs3err_badhandle x
    | `nfs3err_not_sync -> `nfs3err_not_sync x
    | `nfs3err_bad_cookie -> `nfs3err_bad_cookie x
    | `nfs3err_notsupp -> `nfs3err_notsupp x
    | `nfs3err_toosmall -> `nfs3err_toosmall x
    | `nfs3err_serverfault -> `nfs3err_serverfault x
    | `nfs3err_badtype -> `nfs3err_badtype x
    | `nfs3err_jukebox -> `nfs3err_jukebox x

      
let read_multiple = 1
  (* Number of filesystem blocks fetched per NFS READ. It also determines
     the advertised fsinfo rtmax (read_multiple * blocksize) and the page
     size of [rundata.blockpool].
     Reading more than one block has a bad effect: the NFS client usually
     requests several consecutive blocks by emitting several READs anyway.
     If each READ then fetched several filesystem blocks, the resulting
     block accesses would no longer be in ascending order.
   *)

(* Maximum number of filesystem blocks per NFS WRITE; advertised as fsinfo
   wtmax (write_multiple * blocksize). *)
let write_multiple = 16

(* Configures authentication for [cluster] with [Pfs_auth.privileged_user]
   and [Pfs_auth.bottom_user]. The password callback returns the root
   password from [conf.cauth] for the privileged user and the nobody
   password for any other user. Afterwards the default user and group
   are set to "root"/"root". *)
let configure_auth cluster conf =
  let password_of u =
    match u = Pfs_auth.privileged_user with
      | true -> conf.cauth.Pfs_auth.proot_pw
      | false -> conf.cauth.Pfs_auth.pnobody_pw in
  Plasma_client.configure_auth
    cluster Pfs_auth.privileged_user Pfs_auth.bottom_user password_of;
  Plasma_client.configure_default_user_group cluster "root" "root"


(* Runs [f scluster] on a temporary cluster connection [scluster] that has
   its own Unixqueue event system (as needed by the synchronous
   Plasma_client functions), and returns the result of [f].
   The connection is authenticated with [configure_auth]. It is closed
   again in every case. If setting up authentication or [f] raises, the
   connection is closed first and then the exception is re-raised. *)
let with_sync_cluster conf f =
  let sesys = Unixqueue.create_unix_event_system() in
  let scluster =
    Plasma_client.open_cluster conf.clustername conf.namenodes sesys in
  let r =
    try
      (* configure_auth is inside the [try] so that the connection is not
         leaked when it fails *)
      configure_auth scluster conf;
      f scluster
    with
      | error ->
          Plasma_client.close_cluster scluster;
          raise error in
  (* Outside the [try]: a failing close must not trigger a second close *)
  Plasma_client.close_cluster scluster;
  r


(* Like "mkdir -p": creates all directories along [path] within the
   transaction [trans]. [path] is a list of path components below the root,
   e.g. [[".plasma"; "var"]] for "/.plasma/var". Components that already
   exist ([`eexist]) are accepted. New directories are created with
   [Plasma_client.dir_ii cluster 0o777]. Other Plasma errors are passed
   through. *)
let mkdir_p cluster trans path =
  let rec loop prefix path =
    match path with
      | [] -> ()
      | name :: path' ->
          (* The root prefix is "", so the first component becomes "/name"
             and not "//name" *)
          let prefix' = prefix ^ "/" ^ name in
          ( try
              ignore(Plasma_client.mkdir trans prefix'
                       (Plasma_client.dir_ii cluster 0o777))
            with
              | Plasma_client.Plasma_error `eexist -> ()
          );
          loop prefix' path' in
  loop "" path


(* printf-style debug logging; alias for Plasma_util.dlogf *)
let dlogf = Plasma_util.dlogf

(* Location of the persistent mount table in PlasmaFS: the directory, both
   as a list of path components and as an absolute path, and the table file
   itself. The file name contains the local host name, so every host
   running the NFS bridge has its own table. *)
let mount_dir_path_l = [ ".plasma"; "var"; "lib"; "nfs3" ]

let mount_dir_path =
  List.fold_left (fun acc comp -> acc ^ "/" ^ comp) "" mount_dir_path_l

let mount_path =
  let host = Unix.gethostname () in
  Printf.sprintf "%s/mounts_%s" mount_dir_path host

(* Encodes [s] as lowercase hexadecimal, two digits per byte. *)
let to_hex s =
  let out = Buffer.create 50 in
  String.iter (fun c -> bprintf out "%02x" (Char.code c)) s;
  Buffer.contents out


(* Inverse of [to_hex]: decodes a string of hexadecimal digit pairs
   (upper- or lowercase) into bytes.
   Raises [Invalid_argument "Nfs3d_manager.from_hex"] if the length is odd
   or a character is not a hex digit. Digits are decoded directly instead
   of using Scanf, which was slow and also accepted '_' separators
   (e.g. "a_"). *)
let from_hex s =
  let fail () = invalid_arg "Nfs3d_manager.from_hex" in
  let digit c =
    match c with
      | '0'..'9' -> Char.code c - Char.code '0'
      | 'a'..'f' -> Char.code c - Char.code 'a' + 10
      | 'A'..'F' -> Char.code c - Char.code 'A' + 10
      | _ -> fail () in
  let n = String.length s in
  if n mod 2 <> 0 then fail ();
  let buf = Buffer.create (n / 2) in
  for k = 0 to n / 2 - 1 do
    let hi = digit s.[2*k] in
    let lo = digit s.[2*k + 1] in
    Buffer.add_char buf (Char.chr (hi * 16 + lo))
  done;
  Buffer.contents buf


(* Loads the persistent mount table from the PlasmaFS file [mount_path]
   and adds its entries to [rd.clients]. An entry with the same address
   replaces the existing one. A missing file counts as an empty table.
   The file consists of pairs of lines, as written by [write_mount_table]:
   the client IP address, then the client secret in hex. Entries are keyed
   by [ADDR_INET(ip,0)], i.e. with port 0 like the addresses returned by
   [peer].
   A malformed line makes this function raise, because
   [Unix.inet_addr_of_string] or [from_hex] fails.
   NOTE(review): the file is read with a single [Plasma_client.read] call
   whose byte count is ignored. This assumes the call never returns fewer
   bytes than requested - confirm. *)
let load_mount_table rd =
  let load scluster trans =
    try
      let inode = Plasma_client.lookup trans mount_path false in
      let ii = Plasma_client.get_inodeinfo trans inode in
      (* buffer for the whole file, sized by the recorded EOF *)
      let s = String.create (Int64.to_int ii.PA.eof) in
      let (_,_,_) =
	Plasma_client.read scluster inode 0L (`String s) 0 (String.length s) in
      s
    with Plasma_client.Plasma_error `enoent -> ""
  in
  let s =
    with_sync_cluster rd.conf
      (fun scluster ->
	 Plasma_client.retry scluster "get_mount_table"
	   (fun () -> Plasma_client.with_trans scluster (load scluster)) ()
      ) in
  let ch =
    new Netchannels.input_string s in
  (* parse line pairs until the input is exhausted *)
  try
    while true do
      let line1 = ch # input_line() in
      let ip = Unix.inet_addr_of_string line1 in
      let line2 = ch # input_line() in
      let secret = from_hex line2 in
      let saddr = Unix.ADDR_INET(ip,0) in
      Hashtbl.replace rd.clients saddr secret
    done
  with End_of_file -> ()


(* Writes the part of [s] from index [pos] to its end into file [inode],
   at the same offset in the file as in the string. Calls
   [Plasma_client.write] repeatedly until all bytes have been accepted. *)
let write cluster inode s pos =
  let len = String.length s in
  let mem = `String s in
  let cur = ref pos in
  while !cur < len do
    let n =
      Plasma_client.write
        cluster inode (Int64.of_int !cur) mem !cur (len - !cur) in
    cur := !cur + n
  done
  

(* Persists [rd.clients] as the PlasmaFS file [mount_path], in the format
   read by [load_mount_table]: for every client, one line with its IP
   address and one line with its secret in hex. The directory
   [mount_dir_path] is created if needed, and the file is truncated and
   rewritten from scratch.
   Clients with a non-IP socket address ([ADDR_UNIX]) cannot be stored in
   this format. They are skipped instead of aborting the whole write.
   NOTE(review): the truncation is committed in its own transaction before
   the new contents are written. A failure in between leaves an empty
   table. *)
let write_mount_table rd =
  let mkfile scluster trans =
    mkdir_p scluster trans mount_dir_path_l;
    let inode =
      try
        Plasma_client.create_file
          trans mount_path (Plasma_client.regular_ii scluster 0o666)
      with Plasma_client.Plasma_error `eexist ->
        Plasma_client.lookup trans mount_path false in
    Plasma_client.truncate trans inode 0L;
    inode
  in
  let buf = Buffer.create 500 in
  Hashtbl.iter
    (fun saddr secret ->
       match saddr with
         | Unix.ADDR_INET(ip,_) ->
             Buffer.add_string buf (Unix.string_of_inet_addr ip ^ "\n");
             Buffer.add_string buf (to_hex secret ^ "\n")
         | Unix.ADDR_UNIX _ ->
             (* not representable in the IP-based file format *)
             ()
    )
    rd.clients;
  let s = Buffer.contents buf in
  with_sync_cluster rd.conf
    (fun scluster ->
       let inode =
         Plasma_client.retry scluster "write_mount_table"
           (fun () ->
              Plasma_client.with_trans scluster
                (fun trans -> mkfile scluster trans)
           )
           () in
       write scluster inode s 0;
       Plasma_client.flush scluster inode 0L 0L
    )


(* Returns the complete contents of the local file [name], read in binary
   mode. The file is closed again, also when reading fails. Raises
   [Sys_error] if the file cannot be opened. *)
let read_local_file name =
  let ic = open_in_bin name in
  let contents = Buffer.create 512 in
  let rec pump () =
    match (try Some (input_char ic) with End_of_file -> None) with
      | Some c -> Buffer.add_char contents c; pump ()
      | None -> () in
  (try pump () with error -> close_in ic; raise error);
  close_in ic;
  Buffer.contents contents


(* Newest modification time of /etc/passwd and /etc/group (but at least
   0.0). Used to detect changes of the local user/group database. Raises
   [Unix.Unix_error] if a file cannot be stat'ed. *)
let get_ug_admin_file_t() =
  let mtime_of path = (Unix.stat path).Unix.st_mtime in
  List.fold_left max 0.0 (List.map mtime_of [ "/etc/passwd"; "/etc/group" ])


(* Builds the user/group database from the local files /etc/passwd and
   /etc/group, which are read in full and handed to
   [Plasma_ug.parse_ug_admin] under the keys "passwd" and "group". *)
let get_ug_admin() =
  let group = read_local_file "/etc/group" in
  let passwd = read_local_file "/etc/passwd" in
  Plasma_ug.parse_ug_admin [ ("passwd", passwd); ("group", group) ]
  



(* Creates the process-wide state for [conf]:
   - A temporary synchronous connection (see [with_sync_cluster])
     determines the inode of "/" and the blocksize. The local user/group
     database and the newest mtime of its files are loaded at the same
     time.
   - The long-lived connection [cluster] is opened on the event system of
     the current Netplex container. It gets authentication and a block
     buffer of [conf.buffer_memory / blocksize] blocks (at least one).
   - The write verifier is derived from the startup time.
   - Finally the persistent mount table is loaded into [clients].
   NOTE(review): if something raises between [Plasma_client.start] and
   [Plasma_client.commit], the transaction is not aborted explicitly.
   The temporary connection is still closed by [with_sync_cluster]. *)
let new_rundata conf =
  let rootfh, blocksize, ug_admin, ug_admin_file_t =
    with_sync_cluster conf
      (fun scluster ->
	 let st =
	   Plasma_client.start scluster in
	 let rootfh = 
	   Plasma_client.lookup st "/" false in
	 let blocksize =
	   Plasma_client.blocksize scluster in
	 let ug_admin_file_t = get_ug_admin_file_t() in
	 let ug_admin = get_ug_admin() in
	 Plasma_client.commit st;
	 (rootfh, blocksize, ug_admin, ug_admin_file_t)
    ) in

  let esys = (Netplex_cenv.self_cont()) # event_system in
  let cluster = 
    Plasma_client.open_cluster conf.clustername conf.namenodes esys in

  configure_auth cluster conf;
  Plasma_client.configure_buffer cluster 
    (max 1 (conf.buffer_memory / blocksize));

  (* writeverf is just the startup timestamp: the bit pattern of the float
     returned by gettimeofday, encoded as 8 bytes *)
  let writeverfL = Int64.bits_of_float (Unix.gettimeofday()) in
  let writeverf = Rtypes.int8_as_string (Rtypes.int8_of_int64 writeverfL) in

  let rd =
    { conf = conf;
      cluster = cluster;
      clients = Hashtbl.create 7;
      rootfh = rootfh;
      blocksize = blocksize;
      blockpool = Netsys_mem.create_pool (read_multiple*blocksize);
      dircache = Hashtbl.create 7;
      writeverf = writeverf;
      ug_admin;
      ug_admin_file_t;
      ug_admin_check_t = Unix.gettimeofday();
      esys = esys
    } in
  load_mount_table rd;
  rd


(* Reloads the user/group database of [rd] if /etc/passwd or /etc/group
   has been modified since it was loaded. To keep this cheap on every
   request, the files are stat'ed at most once per second. *)
let check_ug_admin rd =
  let now = Unix.gettimeofday() in
  if now -. rd.ug_admin_check_t >= 1.0 then (
    (* Record the check time unconditionally. Previously it was only
       updated after a change was found, so once a quiet second had passed,
       every call stat'ed both files. *)
    rd.ug_admin_check_t <- now;
    let file_t = get_ug_admin_file_t() in
    if file_t > rd.ug_admin_file_t then (
      rd.ug_admin <- get_ug_admin();
      rd.ug_admin_file_t <- file_t
    )
  )


(* The rundata of this process; created on first use by [get_rundata]. *)
let global_rundata = ref None

(* Returns the process-wide rundata, creating it from [conf] on the first
   call. Later calls must pass the physically same [conf] value (otherwise
   [Failure "More than one configuration"]), and they run
   [check_ug_admin]. *)
let get_rundata conf =
  match !global_rundata with
    | None ->
        let fresh = new_rundata conf in
        global_rundata := Some fresh;
        fresh
    | Some rd ->
        if rd.conf != conf then
          failwith "More than one configuration"
        else (
          check_ug_admin rd;
          rd
        )

(* Tries to create the rundata for [conf] (see [get_rundata]) and returns
   [true] on success. If the cluster is not reachable yet
   ([Plasma_client.Cluster_down]), it logs a warning, schedules another
   attempt in one second with a Netplex timer, and returns [false].
   Other exceptions are passed through. *)
let rec wait_for_rundata_failsafe conf =
  try
    ignore(get_rundata conf); true
  with
    | Plasma_client.Cluster_down _ as err ->
        Netlog.logf `Warning
          "Cluster not yet up: %s"
          (Netexn.to_string err);
        let _t =
          Netplex_cenv.create_timer
            (fun _ ->
               (* The callback's result tells whether the timer stays
                  active. A failed attempt schedules its own new timer, and
                  a successful attempt must not be repeated every second,
                  so this timer always stops. *)
               ignore(wait_for_rundata_failsafe conf);
               false)
            1.0 in
        false


(* Socket address of the caller of RPC session [sess]. For IP addresses
   the port is replaced by 0, so all connections from one host map to the
   same key in [rundata.clients]. Other addresses are returned as is. *)
let peer sess =
  match Rpc_server.get_peer_name sess with
    | Unix.ADDR_INET(ip, _) -> Unix.ADDR_INET(ip, 0)
    | other -> other


(* Builds an NFS file handle: the client [secret] followed by [inode]
   encoded as 8 bytes by [Rtypes.int8_as_string]. [unwrap_fh] expects
   16 bytes in total, so this assumes an 8-byte secret. *)
let wrap_fh secret inode =
  let inode_bytes = Rtypes.int8_as_string (Rtypes.int8_of_int64 inode) in
  secret ^ inode_bytes

(* Inverse of [wrap_fh]: splits a 16-byte handle into the 8-byte secret
   and the inode number. Fails with [Failure "unwrap_fh"] for any other
   length. *)
let unwrap_fh fh =
  match String.length fh with
    | 16 ->
        let inode = Rtypes.int64_of_int8 (Rtypes.read_int8 fh 8) in
        (String.sub fh 0 8, inode)
    | _ ->
        failwith "unwrap_fh"

(* File handle for [inode] as seen by the caller of [sess], using the
   secret registered for the caller's address. Raises
   [Nfs_error `nfs3err_serverfault] if the caller has no registered
   secret. *)
let wrap_sess_fh sess rd inode =
  let client = peer sess in
  let secret =
    try Hashtbl.find rd.clients client
    with Not_found -> raise (Nfs_error `nfs3err_serverfault) in
  wrap_fh secret inode
	
	  
(* Extracts the inode from file handle [fh] sent by the caller of [sess].
   Raises [Nfs_error `nfs3err_badhandle] if the handle does not have
   16 bytes or its secret does not match the caller's registered secret.
   Raises [Nfs_error `nfs3err_serverfault] if the caller has no registered
   secret at all. *)
let unwrap_sess_fh sess rd fh =
  let client = peer sess in
  if String.length fh <> 16 then raise (Nfs_error `nfs3err_badhandle);
  let expected =
    try Hashtbl.find rd.clients client
    with Not_found -> raise (Nfs_error `nfs3err_serverfault) in
  let (secret, inode) = unwrap_fh fh in
  if secret <> expected then raise (Nfs_error `nfs3err_badhandle);
  inode

(* Conversions between PlasmaFS user/group names and numeric NFS ids,
   based on the local user/group database [rd.ug_admin]. A failing lookup
   (for any reason) falls back to "nobody"/"nogroup" or the id 65534. *)
let to_uid rd user =
  try
    let entry = rd.ug_admin # getpwnam user in
    Int64.of_int entry.Unix.pw_uid
  with _ -> 65534L   (* "nobody" *)

let to_uname rd uid =
  try
    let entry = rd.ug_admin # getpwuid (Int64.to_int uid) in
    entry.Unix.pw_name
  with _ -> "nobody"

let to_gid rd group =
  try
    let entry = rd.ug_admin # getgrnam group in
    Int64.of_int entry.Unix.gr_gid
  with _ -> 65534L   (* "nogroup" *)

let to_gname rd gid =
  try
    let entry = rd.ug_admin # getgrgid (Int64.to_int gid) in
    entry.Unix.gr_name
  with _ -> "nogroup"
  
(* Converts a PlasmaFS timestamp to an NFS time. A negative seconds value
   means "set to server time" and cannot be represented; in that case the
   function fails with [Failure]. *)
let to_nfstime t =
  match t.PA.tsecs with
    | secs when secs < 0L ->
        failwith "to_nfstime: found negative value (=set to server time)"
    | secs ->
        { NA.seconds = secs; nseconds = Int64.of_int t.PA.tnsecs }


(* Converts an NFS time to a PlasmaFS timestamp. *)
let of_nfstime t =
  let secs = t.NA.seconds and nsecs = t.NA.nseconds in
  { PA.tsecs = secs; PA.tnsecs = Int64.to_int nsecs }


(* Converts the PlasmaFS inode info [ii] of [inode] into NFS attributes.

   Size and mtime take pending writes into account. Plasma_client.write
   does not update these metadata fields immediately, only at flush time.
   The designated new values can already be queried from [rd.cluster],
   and they take precedence over the values in [ii] when available.

   Approximations: nlink is always 1, used bytes are always 0, atime is
   reported as the mtime (there is no atime), and fsid/rdev are 0. Only the
   permission bits (0o777) of the mode are reported. Raises [Failure] (from
   [to_nfstime]) for negative timestamps. *)
let to_fattr3 rd inode ii =
  let eof =
    try Plasma_client.get_write_eof rd.cluster inode
    with Not_found -> ii.PA.eof in
  let mtime =
    try Plasma_client.get_write_mtime rd.cluster inode
    with Not_found -> ii.PA.mtime in
  dlogf "to_fattr3 eof=%Ld" eof;
  let ftype =
    match ii.PA.filetype with
      | `ftype_regular -> `nf3reg
      | `ftype_directory -> `nf3dir
      | `ftype_symlink -> `nf3lnk in
  let nfs_mtime = to_nfstime mtime in
  let owner = ii.PA.usergroup in
  { NA.fa_type = ftype;
    fa_mode = Int64.of_int (ii.PA.mode land 0o777);
    fa_nlink = 1L;    (* FIXME: link count is not tracked *)
    fa_uid = to_uid rd owner.PA.user;
    fa_gid = to_gid rd owner.PA.group;
    fa_size = eof;
    fa_used = 0L;
    (* FIXME: fa_used is the number of used bytes. For us, this would be
       the number of actually allocated blocks, times the blocksize.
       TODO: add this to inode_info. How to deal with failed nodes? *)
    fa_rdev = { NA.specdata1 = 0L; specdata2 = 0L };
    fa_fsid = 0L;
    fa_fileid = inode;
    fa_atime = nfs_mtime;   (* no atime *)
    fa_mtime = nfs_mtime;
    fa_ctime = to_nfstime ii.PA.ctime;
  }

(* Converts NFS attributes [attr] into a PlasmaFS inode info of file type
   [ftype]. Owner and group are mapped to names via [rd.ug_admin], and only
   the permission bits (0o777) of the mode are kept. Fields without an NFS
   counterpart get fixed values (replication 0 = server's default,
   blocklimit 0, empty field1, seqno -1, not committed, create_verifier 0,
   not anonymous). *)
let of_fattr3 rd ftype attr =
  let ug =
    { PA.user = to_uname rd attr.NA.fa_uid;
      PA.group = to_gname rd attr.NA.fa_gid;
    } in
  let perm_bits = Int64.to_int attr.NA.fa_mode land 0o777 in
  { PA.filetype = ftype;
    usergroup = ug;
    mode = perm_bits;
    eof = attr.NA.fa_size;
    mtime = of_nfstime attr.NA.fa_mtime;
    ctime = of_nfstime attr.NA.fa_ctime;
    replication = 0;   (* use server's default *)
    blocklimit = 0L;
    field1 = "";
    seqno = (-1L);
    committed = false;
    create_verifier = 0L;
    anonymous = false;
  }

(* Weak cache consistency attributes (size, mtime, ctime) from the inode
   info [ii]. Unlike [to_fattr3], pending writes are not considered. The
   inode argument is unused. *)
let to_wcc_attr _inode ii =
  let size = ii.PA.eof in
  dlogf "to_wcc_attr eof=%Ld" size;
  { NA.wcc_size = size;
    wcc_mtime = to_nfstime ii.PA.mtime;
    wcc_ctime = to_nfstime ii.PA.ctime;
  }

(* Maps a Plasma error code to the closest NFSv3 error. Codes without an
   NFS counterpart become [`nfs3err_serverfault]. For [`econflict] with
   [~retry_on_conflict:true], the exception [Retry] is raised instead, so
   that [transaction_e] repeats the whole transaction. *)
let to_nfserr
      ?(retry_on_conflict = false)
      (pe : Plasma_client.errno) : nfs_error =
  match pe with
    | `econflict ->
        if retry_on_conflict then raise Retry else `nfs3err_serverfault
    | `eperm | `efhier -> `nfs3err_perm
    | `enoent -> `nfs3err_noent
    | `eaccess -> `nfs3err_acces
    | `eexist -> `nfs3err_exist
    | `einval -> `nfs3err_inval
    | `efbig -> `nfs3err_fbig
    | `enospc -> `nfs3err_nospc
    | `erofs -> `nfs3err_rofs
    | `enametoolong -> `nfs3err_nametoolong
    | `estale -> `nfs3err_stale
    | `eio -> `nfs3err_io
    | `enotdir | `ebadpath -> `nfs3err_notdir
    | `eisdir -> `nfs3err_isdir
    | `enotempty -> `nfs3err_notempty
    | `enotrans | `efailedcommit | `elongtrans | `efailed
    | `ecoord | `enonode | `etbusy
    | `eloop (* should never get this *) ->
        `nfs3err_serverfault


(* Like [to_nfserr], but attaches the failure payload [arg] to the error. *)
let to_nfserr_arg ?retry_on_conflict pe arg : _ nfs_error_arg =
  let code = to_nfserr ?retry_on_conflict pe in
  add_arg code arg

(* Interprets the 8-byte NFS create verifier [s] as an int64 (decoded by
   [Rtypes.read_int8]). Asserts that [s] has exactly 8 bytes. *)
let of_create_verf s =
  assert (String.length s = 8);
  let raw = Rtypes.read_int8 s 0 in
  Rtypes.int64_of_int8 raw

(* Helpers for authorization *)

(* FIXME: get_rpc_identity just trusts the IDs it gets from the
   local machine. Regarding the user ID there is nothing wrong with
   that. The groups, though, should be verified with the namenode
   (i.e. only accept a group if the user is also in this group
   following the definition in the namenode).
 *)

(* Determines the caller of [sess] as [(user, group, supp_groups)]: user
   name, primary group name, and the set of supplementary group names.
   Only AUTH_SYS is accepted. Numeric ids are mapped to names via the
   local user/group database; unknown uid/primary gid map to
   "nobody"/"nogroup". Unknown supplementary gids are dropped.
   Raises [Nfs_error `nfs3err_perm] for other authentication methods or
   unparsable credentials. *)
let get_rpc_identity rd sess : (string * string * StrSet.t) =
  let am = Rpc_server.get_auth_method sess in
  if am#name <> "AUTH_SYS" then
    raise(Nfs_error `nfs3err_perm);
  let creds = Rpc_server.get_user sess in
  try
    let (uid, gid, supp_gids, _) = Rpc_auth_sys.parse_user_name creds in
    let uname = to_uname rd (Int64.of_int uid) in
    let gname = to_gname rd (Int64.of_int gid) in
    let supp_groups =
      Array.fold_left
        (fun acc g ->
           (* Look the gid up directly: [to_gname] never fails and would
              map every unknown gid to "nogroup", which would then grant
              the group permissions of files owned by "nogroup". *)
           try
             let n = (rd.ug_admin # getgrgid g).Unix.gr_name in
             StrSet.add n acc
           with
             | _ -> acc   (* just drop unknown gids *)
        )
        StrSet.empty
        supp_gids in
    dlogf "get_rpc_identity: user=%s group=%s" uname gname;
    (uname,gname,supp_groups)
  with
    | _ ->
        raise(Nfs_error `nfs3err_perm)

(* Unix-style permission test. [mask] is a permission bit (4 = read,
   2 = write, 1 = execute), and the caller is [(user, group, supp_groups)]
   as returned by [get_rpc_identity].
   - If the caller owns the file, only the owner bits are consulted.
   - Otherwise, if the file's group is the caller's primary group or one
     of the supplementary groups, the group bits are consulted.
   - Otherwise the "other" bits are consulted.
   The primary group used to be ignored, so group permissions only took
   effect through the supplementary groups. *)
let have_permission mask (user,group,supp_groups) ii =
  let owner = ii.PA.usergroup.PA.user in
  let file_group = ii.PA.usergroup.PA.group in
  if user = owner then
    (ii.PA.mode land (mask lsl 6)) <> 0
  else if group = file_group || StrSet.mem file_group supp_groups then
    (ii.PA.mode land (mask lsl 3)) <> 0
  else
    (ii.PA.mode land mask) <> 0

(* Tests for the individual permission bits (see [have_permission]). *)
let have_read_permission creds ii = have_permission 4 creds ii
let have_write_permission creds ii = have_permission 2 creds ii
let have_exec_permission creds ii = have_permission 1 creds ii

(* Whether the caller owns the file described by [ii]. Group memberships
   are irrelevant here. *)
let have_owner_permission (user, _, _) ii =
  String.compare user ii.PA.usergroup.PA.user = 0

(* Checks whether the caller [creds] (see [get_rpc_identity]) may apply
   the SETATTR changes [sattr] to the file described by [ii]. Returns unit
   if allowed, otherwise raises [Nfs_error `nfs3err_perm] or
   [Nfs_error `nfs3err_acces].
   Rules, checked in this order (the order determines the error code when
   several rules are violated):
   - changing the owner (uid) is never allowed;
   - changing mode or group requires ownership;
   - setting mtime to the server time requires ownership or write
     permission; setting an arbitrary client time requires ownership;
   - changing the size requires ownership or write permission.
   atime changes are not checked.
   NOTE(review): the mode comparison uses the unmasked [ii.PA.mode], and
   the owner may change the group to any group, also one they are not a
   member of - confirm this is intended. *)
let check_setattr_permission rd creds sattr ii =
  let have_user_change =
    match sattr.NA.sa_uid with
      | None -> false
      | Some uid ->
	  let uname = to_uname rd uid in
	  ii.PA.usergroup.PA.user <> uname in
  let have_group_change =
    match sattr.NA.sa_gid with
      | None -> false
      | Some id -> ii.PA.usergroup.PA.group <> to_gname rd id in
  let have_mode_change =
    match sattr.NA.sa_mode with
      | None -> false
      | Some m -> Int64.of_int(ii.PA.mode) <> m in
  let have_current_mtime_change =
    match sattr.NA.sa_mtime with
      | `dont_change -> false
      | `set_to_server_time -> true
      | `set_to_client_time _ -> false in
  let have_arbitrary_mtime_change =
    match sattr.NA.sa_mtime with
      | `dont_change -> false
      | `set_to_server_time -> false
      | `set_to_client_time _ -> true in
  let have_size_change =
    sattr.NA.sa_size <> None in
  (* perms, computed lazily only when a rule needs them. wr_perm_lz
     already includes ownership. *)
  let owner_perm_lz = lazy(have_owner_permission creds ii) in
  let wr_perm_lz = lazy(Lazy.force owner_perm_lz ||
			  have_write_permission creds ii) in
  (* changing uid: always disallowed *)
  if have_user_change then
    raise(Nfs_error `nfs3err_perm);
  (* Changing mode, gid: the owner can do this *)
  if (have_mode_change && not(Lazy.force owner_perm_lz))
     || (have_group_change && not(Lazy.force owner_perm_lz)) 
  then
    raise(Nfs_error `nfs3err_perm);
  (* mtime: the owner can do this, or anybody with write access, but
     only to the current time
   *)
  if have_current_mtime_change && not(Lazy.force wr_perm_lz) && not(Lazy.force owner_perm_lz) then
    raise(Nfs_error `nfs3err_acces);
  if have_arbitrary_mtime_change && not(Lazy.force owner_perm_lz) then
    raise(Nfs_error `nfs3err_perm);
  (* size: write access (or ownership) needed *)
  if have_size_change && not(Lazy.force wr_perm_lz) then
    raise(Nfs_error `nfs3err_acces);
  ()

(* NFS *)

(* Delays in seconds between the attempts of [transaction_e] and
   [no_transaction_e]. The k-th retry (counting from 0) waits [pause.(k)];
   once the table is exhausted, the last value is used for every further
   retry. *)
let pause =
  [| 0.01; 0.02; 0.04; 0.08; 0.16; 0.32; 0.64; 1.0;
     2.0; 4.0; 8.0; 16.0; 32.0; 60.0
  |]

(** [transaction_e rd name f]: creates a transaction [t] on [rd.cluster]
    and runs the engine [f t rd.esys].
    - If that engine ends in [`Done], [t] is committed; otherwise [t] is
      aborted.
    - If it fails with the exception [Retry] (see
      [to_nfserr ~retry_on_conflict]), the whole procedure is repeated in a
      new transaction after a delay from [pause]. There is no limit on the
      number of attempts.
    - The resulting engine finishes with [`Done st], where [st] is the
      final state of the last attempt of [f]'s engine ([`Done _],
      [`Error _] or [`Aborted]). Failures of start/commit/abort themselves
      make the resulting engine fail.
    - [name] is only used for debug logging. *)
let transaction_e rd name f =
  (* TODO: avoid serialization *)
  (* final state of the last attempt, passed out via the outer map *)
  let result = ref `Aborted in
  let rec next_attempt retry =
    dlogf "Starting transaction for %s" name;
    Plasma_client.start_e rd.cluster
    ++ (fun t ->
	  let e = f t rd.esys in
	  Plasma_util.failsafe_e e 
	  ++ (fun st ->
		let success, do_retry =
		  match st with
		    | `Done _ -> true, false
		    | `Error Retry -> false, true
		    | _ -> false, false in
		( if success then (
		    dlogf
		      "Committing transaction for %s" name;
		    Plasma_client.commit_e t
		  )
		  else (
		    dlogf "Aborting transaction for %s" name;
		    Plasma_client.abort_e t
		  )
		)
		++ (fun () ->
		      if do_retry then (
			(* clamp the index to the last table entry *)
			let d =
			  if retry < Array.length pause then
			    pause.(retry)
			  else
			    pause.(Array.length pause - 1) in
			dlogf
			  "Retrying transaction for %s after %f seconds"
			  name d;
			Plasma_util.delay_engine d
			  (fun _ -> next_attempt (retry+1))
			  rd.esys
		      )
		      else (
			result := st;
			eps_e (`Done ()) rd.esys
		      )
		   )
	     )
       ) in
  next_attempt 0
    >> (function
	  | `Done () -> `Done !result
	  | `Error e -> `Error e
	  | `Aborted -> `Aborted
       )


(* [no_transaction_e rd name f]: like [transaction_e], but without a
   transaction. It runs the engine [f rd.esys] and repeats it after a
   delay from [pause] as long as it fails with [Retry]. The resulting
   engine finishes with [`Done st], where [st] is the final state of the
   last attempt. [name] is only used for debug logging.
   NOTE: the first call of [f] happens synchronously when this function is
   called, so an exception raised by [f] itself (rather than by its
   engine) escapes to the caller. *)
let no_transaction_e rd name f =
  (* Same, but no transaction *)
  let result = ref `Aborted in
  let rec next_attempt retry =
    dlogf "Starting execution of %s" name;
    let e = f rd.esys in
    Plasma_util.failsafe_e e 
    ++ (fun st ->
	  let success, do_retry =
	    match st with
	      | `Done _ -> true, false
	      | `Error Retry -> false, true
	      | _ -> false, false in
	  ( if success then
	      dlogf
		"Successful execution of %s" name
	    else
	      dlogf "Non-successful execution of %s" name
	  );
	  if do_retry then (
	    (* clamp the index to the last table entry *)
	    let d =
	      if retry < Array.length pause then
		pause.(retry)
	      else
		pause.(Array.length pause - 1) in
	    dlogf
	      "Retrying execution of %s after %f seconds"
	      name d;
	    Plasma_util.delay_engine d
	      (fun _ -> next_attempt (retry+1))
	      rd.esys
	  )
	  else (
	    result := st;
	    eps_e (`Done ()) rd.esys
	  )
       ) in
  next_attempt 0
    >> (function
	  | `Done () -> `Done !result
	  | `Error e -> `Error e
	  | `Aborted -> `Aborted
       )

      

(* --- read calls  --- *)

(* NFS LOOKUP: resolves [name_in_dir.dirop_name] in the directory
   [name_in_dir.dirop_dir] and replies with the handle and attributes of
   the found object. Directory attributes are not returned.
   Any AUTH_SYS caller may look up names; the x bit is not checked.
   Plasma errors are mapped with [to_nfserr_arg] (a conflict retries the
   transaction). Errors from the identity or handle checks are emitted
   directly. *)
let proc_lookup conf sess name_in_dir emit =
  let rd = get_rundata conf in
  try
    let _ = get_rpc_identity rd sess in
    (* Any identity can look up - no further checks *)
    let dir_inode = unwrap_sess_fh sess rd name_in_dir.NA.dirop_dir in
    Plasma_util.rpc_srv_reply "lookup" sess emit
      (transaction_e rd "lookup"
	 (fun t esys ->
	    (* We don't check the x bit because we don't do this in the
	       nameserver neither
	     *)
	    Plasma_client.dir_lookup_e
	      t
	      dir_inode
	      name_in_dir.NA.dirop_name
	      true
	    ++ (fun file_inode ->
		  Plasma_client.get_inodeinfo_e 
		    t
		    file_inode
		  ++ (fun file_ii ->
			eps_e (`Done(file_inode,file_ii)) esys
		     )
	       )
	    (* map the engine's final state to the NFS reply *)
	    >> (function
		  | `Done (file_inode,file_ii) ->
		      let file_fh = wrap_sess_fh sess rd file_inode in
		      let file_attr = to_fattr3 rd file_inode file_ii in
		      let ok =
			{ NA.lookup_object = file_fh;
			  lookup_obj_attributes = Some file_attr;
			  lookup_dir_attributes = None
			} in
		      `Done(`nfs3_ok ok)
		  | `Error (Plasma_client.Plasma_error plasma_err) ->
		      `Done(to_nfserr_arg
			      ~retry_on_conflict:true
			      plasma_err
			      { NA.lookup_dir_attributes_fail = None }
			    :> NA.lookup3res)
		  | `Error err ->
		      `Error err
		  | `Aborted ->
		      `Aborted
	       )
	 )
      )
  with
    | Nfs_error e ->
	(* from get_rpc_identity or unwrap_sess_fh *)
	let r =
	  (add_arg e { NA.lookup_dir_attributes_fail = None } :> NA.lookup3res)
	in
	emit r


(* Common skeleton of the NFS procedures that operate on one file handle
   [fh]. It identifies the caller, unwraps [fh], fetches the inode info
   within a transaction (see [transaction_e]), checks the requested
   permissions, and emits the reply via [Plasma_util.rpc_srv_reply].
   Permission flags (each raises [Nfs_error `nfs3err_acces] if violated):
   - [~rd]: read permission needed
   - [~rd_or_owner]: read permission or ownership needed
   - [~wr]: write permission needed *)
let with_ii ?(rd=false) ?(rd_or_owner=false) ?(wr=false)
            conf sess fh proc_name f_ok f_err f_nfserr emit =
  (* f_ok: is called as   
               f_ok trans inode ii creds
           (creds as returned by get_rpc_identity) and must return an
           engine computing the reply value (e.g. `nfs3_ok ...).
           Exceptions raised by f_ok itself are handled like engine
           errors.
     f_err: is called for Plasma errors as
               f_err plasmacode inode_ii_opt
           and must return the emitted value
     f_nfserr: is called for Nfs_error exceptions as
               f_nfserr nfscode inode_ii_opt
           and must return the emitted value. nfscode is w/o argument.
           inode_ii_opt is Some(inode,ii) if the inode lookup was ok.
   *)
  let rund = get_rundata conf in
  try
    let creds = get_rpc_identity rund sess in
    let inode = unwrap_sess_fh sess rund fh in
    Plasma_util.rpc_srv_reply proc_name sess emit
      (transaction_e rund proc_name
	 (fun t esys ->
	    (* set once the inode info is known, for f_err/f_nfserr *)
	    let inode_ii_opt = ref None in
	    Plasma_client.get_inodeinfo_e
	      t
	      inode
	    ++ (fun ii ->
		  if rd && not (have_read_permission creds ii) then
		    raise(Nfs_error `nfs3err_acces);
		  if rd_or_owner &&
		     (not (have_read_permission creds ii)) &&
		     (not (have_owner_permission creds ii)) then
		       raise(Nfs_error `nfs3err_acces);
		  if wr && not (have_write_permission creds ii) then
		    raise(Nfs_error `nfs3err_acces);
		  inode_ii_opt := Some(inode,ii);
		  try
		    let e = f_ok t inode ii creds in
		    e ++ (fun ok -> eps_e (`Done ok) rund.esys)
		  with
		    | error -> 
			eps_e (`Error error) rund.esys
	       )
	    >> (function
		  | `Done r -> 
		      `Done r
		  | `Error (Plasma_client.Plasma_error plasma_err) ->
		      `Done(f_err plasma_err !inode_ii_opt)
		  | `Error (Nfs_error err) ->
		      `Done(f_nfserr err !inode_ii_opt)
		  | `Error err ->
		      `Error err
		  | `Aborted ->
		      `Aborted
	       )
	 )
      )
  with
    | Nfs_error e -> (* from get_rpc_identity or unwrap_sess_fh *)
	let r = f_nfserr e None in
	emit r


(* Variant of [with_ii] for procedures whose success result can be
   computed without waiting: [f_ok t inode ii creds] returns the OK
   payload directly, and it is wrapped as [`nfs3_ok] in an already
   finished engine. Permission flags, [f_err] and [f_nfserr] are as for
   [with_ii]. *)
let get_ii ?rd ?rd_or_owner ?wr
           conf sess fh proc_name f_ok f_err f_nfserr emit =
  let rund = get_rundata conf in
  let engine_of_result t inode ii creds =
    let payload = f_ok t inode ii creds in
    eps_e (`Done (`nfs3_ok payload)) rund.esys in
  with_ii ?rd ?rd_or_owner ?wr
    conf sess fh proc_name engine_of_result f_err f_nfserr emit



(* NFS GETATTR: returns the attributes of [fh]. The caller needs read
   permission on the file or must own it. *)
let proc_getattr conf sess fh emit =
  let rd = get_rundata conf in
  let ok_of _t inode ii _creds =
    { NA.getattr_obj_attributes = to_fattr3 rd inode ii } in
  let of_plasma_err plasma_err _ =
    (to_nfserr ~retry_on_conflict:true plasma_err :> NA.getattr3res) in
  let of_nfs_err (nfs_err : nfs_error) _ =
    (nfs_err :> NA.getattr3res) in
  get_ii ~rd_or_owner:true conf sess fh "getattr"
    ok_of of_plasma_err of_nfs_err emit

(* The ACCESS3 permission bits as plain ints, for [proc_access] *)
let a_read = Rtypes.int_of_uint4 NA.access3_read
let a_lookup = Rtypes.int_of_uint4 NA.access3_lookup
let a_modify = Rtypes.int_of_uint4 NA.access3_modify
let a_extend = Rtypes.int_of_uint4 NA.access3_extend
let a_delete = Rtypes.int_of_uint4 NA.access3_delete
let a_execute = Rtypes.int_of_uint4 NA.access3_execute

(* [grant bits cond] is [bits] if [cond] holds, and 0 (no bits) otherwise. *)
let grant bits cond =
  match cond with
    | true -> bits
    | false -> 0


(* NFS ACCESS: reports which of the requested rights (the low six bits of
   [perms]) the caller has on [fh], based on the file's mode bits and the
   caller's class (owner/group/other):
   - read, modify/extend, execute: r, w, x permission respectively;
   - lookup: always granted when requested (the x bit is ignored);
   - delete: never granted (see RFC 1813).
   The file attributes are returned as well. *)
let proc_access conf sess (fh, perms) emit =
  let rd = get_rundata conf in
  let requested = Int64.to_int (Int64.logand perms 0x3fL) in
  get_ii
    conf sess fh "access"
    (fun _t inode ii creds ->
       let is_requested bit = (requested land bit) <> 0 in
       let may_write = have_write_permission creds ii in
       let rules =
         [ a_read,    have_read_permission creds ii;
           a_lookup,  true;      (* we ignore the x bit! *)
           a_modify,  may_write;
           a_extend,  may_write;
           a_delete,  false;     (* never granted, see RFC 1813 *)
           a_execute, have_exec_permission creds ii ] in
       let granted =
         List.fold_left
           (fun acc (bit, allowed) ->
              acc lor grant bit (is_requested bit && allowed))
           0 rules in
       let attr = to_fattr3 rd inode ii in
       { NA.access_obj_attributes_ok = Some attr;
         access_rights_ok = Int64.of_int granted
       }
    )
    (fun plasma_err _ ->
       (to_nfserr_arg
          ~retry_on_conflict:true
          plasma_err
          { NA.access_obj_attributes_fail = None }
        :> NA.access3res)
    )
    (fun nfs_err _ ->
       (add_arg nfs_err { NA.access_obj_attributes_fail = None }
        :> NA.access3res))
    emit


(* NFS READLINK: returns the target of a symlink (stored in the inode's
   [field1]) together with its attributes. Other file types yield
   [nfs3err_inval]. No access check is done. *)
let proc_readlink conf sess fh emit =
  let rd = get_rundata conf in
  let attrs_of inode_ii_opt =
    match inode_ii_opt with
      | Some (inode, ii) -> Some (to_fattr3 rd inode ii)
      | None -> None in
  get_ii
    conf sess fh "readlink"
    (fun _t inode ii _creds ->
       let attr = to_fattr3 rd inode ii in
       if ii.PA.filetype <> `ftype_symlink then
         raise (Plasma_client.Plasma_error `einval);
       { NA.readlink_symlink_attributes_ok = Some attr;
         readlink_data = ii.PA.field1
       }
    )
    (fun plasma_err inode_ii_opt ->
       let fail =
         { NA.readlink_symlink_attributes_fail = attrs_of inode_ii_opt } in
       (to_nfserr_arg ~retry_on_conflict:true plasma_err fail
        :> NA.readlink3res)
    )
    (fun nfs_err _ ->
       (add_arg nfs_err { NA.readlink_symlink_attributes_fail = None }
        :> NA.readlink3res))
    emit


(* NFS FSSTAT (only for the root handle; other handles get
   [nfs3err_stale]). Capacities are the block counters of
   [Plasma_client.fsstat_e] times the blocksize, saturating at
   [Int64.max_int]. Free and available bytes are both computed as total
   minus used minus [trans_blocks] (presumably blocks reserved by running
   transactions), clamped at 0. File counts are not tracked and are
   reported as [Int64.max_int]. *)
let proc_fsstat conf sess fh emit =
  let rd = get_rundata conf in
  try
    let inode = unwrap_sess_fh sess rd fh in
    if inode <> rd.rootfh then
      raise(Nfs_error `nfs3err_stale);
    with_ii
      conf sess fh "fsstat"
      (fun _t inode ii _creds ->
         let attr = to_fattr3 rd inode ii in
         Plasma_client.fsstat_e rd.cluster
         ++ (fun st ->
               let free_blocks =
                 (* Never report a negative amount: the byte counts are
                    unsigned in the protocol, so a negative value would
                    show up as a huge number *)
                 max 0L
                   (Int64.sub
                      (Int64.sub st.PA.total_blocks st.PA.used_blocks)
                      st.PA.trans_blocks) in
               let bsizeL = Int64.of_int rd.blocksize in
               (* largest block count that can be multiplied without
                  overflow *)
               let m = Int64.div Int64.max_int bsizeL in
               let bytes n =
                 if n > m then Int64.max_int else Int64.mul n bsizeL in
               let r =
                 { NA.fsstat_obj_attributes_ok = Some attr;
                   fsstat_tbytes = bytes st.PA.total_blocks;
                   fsstat_fbytes = bytes free_blocks;
                   fsstat_abytes = bytes free_blocks;
                   fsstat_tfiles = Int64.max_int;  (* best guess *)
                   fsstat_ffiles = Int64.max_int;  (* best guess *)
                   fsstat_afiles = Int64.max_int;  (* best guess *)
                   fsstat_invarsec = 0L;
                 } in
               eps_e (`Done (`nfs3_ok r)) rd.esys
            )
      )
      (fun plasma_err _inode_ii_opt ->
         let fail =
           { NA.fsstat_obj_attributes_fail = None } in
         (to_nfserr_arg
            ~retry_on_conflict:true
            plasma_err
            fail
          :> NA.fsstat3res)
      )
      (fun nfs_err _inode_ii_opt ->
         let fail =
           { NA.fsstat_obj_attributes_fail = None } in
         (add_arg nfs_err fail :> NA.fsstat3res))
      emit
  with
    | Nfs_error e ->
        emit
          (add_arg e { NA.fsstat_obj_attributes_fail = None } :> NA.fsstat3res)


let proc_fsinfo conf sess fh emit =
  (* FSINFO. The answer is static. Transfer sizes derive from the Plasma
     block size: reads and writes are capped at [read_multiple] /
     [write_multiple] blocks, and one block is both the preferred size and
     the multiple. The caller's RPC identity must be obtainable, and only
     the root filehandle is accepted (otherwise NFS3ERR_STALE).
   *)
  let rd = get_rundata conf in
  try
    ignore (get_rpc_identity rd sess);
    if unwrap_sess_fh sess rd fh <> rd.rootfh then
      raise (Nfs_error `nfs3err_stale);
    let bsize = rd.blocksize in
    let bsizeL = Int64.of_int bsize in
    let properties =
      List.fold_left
        (fun acc flag -> Int64.logor acc (Rtypes.int64_of_uint4 flag))
        0L
        [ NA.fsf3_link; NA.fsf3_symlink; NA.fsf3_homogeneous;
          NA.fsf3_cansettime ] in
    let info =
      { NA.fsinfo_obj_attributes_ok = None;
        fsinfo_rtmax = Int64.of_int (read_multiple * bsize);
        fsinfo_rtpref = bsizeL;
        fsinfo_rtmult = bsizeL;
        fsinfo_wtmax = Int64.of_int (write_multiple * bsize);
        fsinfo_wtpref = bsizeL;
        fsinfo_wtmult = bsizeL;
        fsinfo_dtpref = 32768L;
        fsinfo_maxfilesize = Int64.max_int;
        fsinfo_time_delta = { NA.seconds = 0L; nseconds = 1L };
        fsinfo_properties = properties;
      } in
    emit (`nfs3_ok info)
  with
    | Nfs_error e ->
        emit
          (add_arg e { NA.fsinfo_obj_attributes_fail = None } :> NA.fsinfo3res)

(* about wtmult:

   "OTOH wtmult has nothing to do with RPC, and has more to do with the
    disk organization on the server.
    As I understand it, in many cases the significance of this value lies
    in the fact that hardware operations to the disk have a lower limit on
    the number of bytes that can be read/written. IOW if your write is not
    aligned to a 'wtmult' boundary, then the server may be forced to read
    in the remaining data from the disk before it writes the entire block
    back." (Trond Myklebust, 
    http://search.luky.org/linux-kernel.2003/msg02857.html)
 *)


let proc_pathconf conf sess fh emit =
  (* PATHCONF. Returns the object's attributes together with fixed limits.
     Plasma has no real limit on the link count or the name length, so
     32767 is reported for both. *)
  let rd = get_rundata conf in
  let no_attrs () = { NA.pathconf_obj_attributes_fail = None } in
  let answer _t inode ii _creds =
    { NA.pathconf_obj_attributes_ok = Some (to_fattr3 rd inode ii);
      pathconf_linkmax = 32767L;
      pathconf_name_max = 32767L;
      pathconf_no_trunc = true;
      pathconf_chown_restricted = false;
      pathconf_case_insensitive = false;
      pathconf_case_preserving = true;
    } in
  get_ii
    conf sess fh "pathconf"
    answer
    (fun plasma_err _ ->
       (to_nfserr_arg ~retry_on_conflict:true plasma_err (no_attrs ())
        :> NA.pathconf3res))
    (fun nfs_err _ ->
       (add_arg nfs_err (no_attrs ()) :> NA.pathconf3res))
    emit

(*
let memory_string rd size =
  let mlen = Netsys_mem.init_string_bytelen size in
  let mem = 
    if mlen <= Netsys_mem.pool_block_size rd.blockpool then
      Netsys_mem.pool_alloc_memory rd.blockpool
    else
      Bigarray.Array1.create Bigarray.char Bigarray.c_layout mlen in
  let voffs,_ = Netsys_mem.init_string mem 0 size in
  let s = (Netsys_mem.as_value mem voffs : string) in
  let m = Bigarray.Array1.sub mem voffs size in
  (* mem: the underlying bigarray
     m:   the bigarray which is physically identical with s
     s:   the string inside mem
   *)
  (mem, m, s)
 *)


let prefix_mstring (m:Xdr_mstring.mstring) p =
  (* [prefix_mstring m p] is an mstring exposing only the first [p] bytes of
     [m]. The buffer is shared, not copied: [as_string] / [as_memory] return
     [m]'s buffer and start position, and consumers presumably read only
     [length] bytes from there. The blit methods are bounds-checked against
     [p], so bytes beyond the prefix cannot be copied out through this view.
     Requires 0 <= p <= m#length.
   *)
  assert(p >= 0 && p <= m#length);
  let check_range meth mpos len =
    if mpos < 0 || len < 0 || mpos > p - len then
      invalid_arg ("prefix_mstring#" ^ meth) in
  ( object
      method length = p
      method blit_to_string mpos s spos len =
        check_range "blit_to_string" mpos len;
        m#blit_to_string mpos s spos len
      method blit_to_memory mpos mem mempos len =
        check_range "blit_to_memory" mpos len;
        m#blit_to_memory mpos mem mempos len
      method as_string = m#as_string
      method as_memory = m#as_memory
      method preferred = m#preferred
    end
  )


(* READ. Reads up to [count] bytes of the file [fh] starting at [offset].
   [count] is capped at [read_multiple * rd.blocksize]. The data is read into
   a buffer taken from [rd.blockpool] when [count] fits into one pool block;
   otherwise a fresh bigarray is allocated. The reply carries the data, the
   EOF flag and the post-read attributes. Read permission (r or x bit) is
   checked after the data has been read. Synchronously raised [Nfs_error]s
   are emitted without attributes. [Plasma_error]s from the engine are mapped
   through [to_nfserr_arg].
 *)
let proc_read conf sess (fh, offset, count) emit =
  let rd = get_rundata conf in
  try
    let creds = get_rpc_identity rd sess in
    let inode = unwrap_sess_fh sess rd fh in

    (* offset and count are specified as unsigned int64 in the NFS
       protocol. Here, they are passed as signed int64, i.e. negative
       values are large ones.

       If offset is negative, we immediately return 0 and set the
       EOF flag.

       If count is negative, we replace it with [max_count] (the
       largest read we accept, read_multiple * blocksize).
     *)
    if offset < 0L then
      emit
	(`nfs3_ok
	   { NA.read_file_attributes_ok = None;
	     read_count_ok = 0L;
	     read_eof = true;
	     read_data = (Xdr_mstring.memory_based_mstrings #
			    create_from_string "" 0 0 false
			 )
	   })
    else (
      let max_count = read_multiple * rd.blocksize in
      let max_countL = Int64.of_int max_count in
      let count =
	if count < 0L then 
	  max_count
	else
	  Int64.to_int (min count max_countL) in

      let full_name =
	sprintf "read(%Ld,%Ld,%d)" inode offset count in

      (* Target buffer: a pool block if the request fits, else a
	 dedicated bigarray of exactly [count] bytes *)
      let m = 
	if count <= Netsys_mem.pool_block_size rd.blockpool then
	  Netsys_mem.pool_alloc_memory rd.blockpool
	else
	  Bigarray.Array1.create Bigarray.char Bigarray.c_layout count in

      Plasma_util.rpc_srv_reply full_name sess emit
	(no_transaction_e rd full_name
	   (fun esys ->
	      ( Plasma_client.read_e
		  ~lazy_validation:true
		  rd.cluster
		  inode
		  offset
		  (`Memory m)
		  0
		  count
		++ (fun (n,eof,ii) ->
		      assert(n >= 0 && n <= count);
		      eps_e (`Done (n,eof,ii)) rd.esys
		   )
	      )
	      ++ (fun (n, eof, ii) ->
		    assert(n <= count);
		    (* We check here the permissions (AFTER reading the block,
		       but that's a minor issue). Access is granted when the
		       r or x bits allow it (see RFC 1813)
		     *)
		    (* NOTE(review): this exception is raised inside an engine
		       callback, so it surfaces as [`Error (Nfs_error _)] and
		       the [>>] mapping below passes it on unchanged. Presumably
		       [no_transaction_e] / [rpc_srv_reply] turn it into an
		       NFS3ERR_ACCES reply -- confirm. *)
		    if not (have_read_permission creds ii) && 
		       not (have_exec_permission creds ii)
		    then
		      raise (Nfs_error `nfs3err_acces);
		    let r =
		      { NA.read_file_attributes_ok = Some(to_fattr3 rd inode ii);
			read_count_ok = Int64.of_int n;
			read_eof = eof;
			(* only the first [n] bytes of [m] are valid data *)
			read_data = (Xdr_mstring.memory_based_mstrings #
				       create_from_memory m 0 n false
				    )
		      } in
		    eps_e (`Done(`nfs3_ok r)) rd.esys
		 )
	      (* Plasma errors become regular NFS error replies; other
		 errors and aborts propagate to the caller *)
	      >> (function
		    | `Done r -> `Done r
		    | `Error (Plasma_client.Plasma_error plasma_err) ->
			`Done(to_nfserr_arg
				~retry_on_conflict:true
				plasma_err
				{ NA.read_file_attributes_fail = None }
			      :> NA.read3res)
		    | `Error err ->
			`Error err
		    | `Aborted ->
			`Aborted
		 )
	   )
	)
    )
  with
    | Nfs_error e ->
	emit
	  (add_arg e { NA.read_file_attributes_fail = None } :> NA.read3res)


(* The all-zero cookie verifier. It never identifies a cached listing:
   [readdir_from_cache] ignores it and [readdir_e] never hands it out. *)
let null_verf = Int64.zero

let readdir_from_cache rd saddr inode verf =
  (* Looks up the member array cached for ([inode], [verf]) by client
     [saddr]. On a hit the entry's access time is refreshed and
     [Some members] is returned. A miss, or the null verifier, gives [None]. *)
  if verf = null_verf then
    None
  else
    try
      let per_client = Hashtbl.find rd.dircache saddr in
      let key = (inode, verf) in
      let (members, _last_access) = Hashtbl.find per_client key in
      Hashtbl.replace per_client key (members, Unix.gettimeofday ());
      Some members
    with Not_found -> None

let readdir_e rd t saddr inode =
  (* Cache miss path: lists [inode] through Plasma and stores the members
     in the client's dircache under a fresh, non-null verifier that is not
     yet used for this inode. The client's table is created on demand. The
     engine yields [(members, verifier)]. *)
  Plasma_client.list_inode_e t inode
  ++ (fun listing ->
        let members = Array.of_list listing in
        let per_client =
          try Hashtbl.find rd.dircache saddr
          with Not_found ->
            let fresh = Hashtbl.create 13 in
            Hashtbl.add rd.dircache saddr fresh;
            fresh in
        (* start at a random value and probe upwards until it is free *)
        let rec unused_verf v =
          if v = null_verf || Hashtbl.mem per_client (inode, v) then
            unused_verf (Int64.succ v)
          else
            v in
        let verf = unused_verf (Plasma_rng.random_int64 ()) in
        Hashtbl.add per_client (inode, verf) (members, Unix.gettimeofday ());
        eps_e (`Done (members, verf)) rd.esys
     )

let clean_dircache rd saddr =
  (* Limits the listings cached for client [saddr]. Entries not accessed
     for [max_age] seconds are dropped. Of the remaining ones, only the
     [n_keep] most recently accessed are kept. If the client's table ends up
     empty, it is removed from [rd.dircache] altogether. Otherwise a table
     would stay behind forever for every socket address that ever listed a
     directory. [readdir_e] recreates the table on demand.
     TODO: call this function periodically
   *)
  let n_keep = 4 in
  let max_age = 60.0 in
  try
    let dircache = Hashtbl.find rd.dircache saddr in
    let now = Unix.gettimeofday() in
    let live = ref [] in
    let del = ref [] in
    Hashtbl.iter
      (fun key (_, t) ->
         if now -. t < max_age then
           live := (key, t) :: !live
         else
           del := key :: !del)
      dircache;
    (* most recently accessed first; everything after the first n_keep goes *)
    let by_recency =
      List.sort (fun (_, t1) (_, t2) -> compare t2 t1) !live in
    let rec drop_surplus i = function
      | [] -> ()
      | (key, _) :: rest ->
          if i >= n_keep then del := key :: !del;
          drop_surplus (i+1) rest in
    drop_surplus 0 by_recency;
    List.iter (fun key -> Hashtbl.remove dircache key) !del;
    if Hashtbl.length dircache = 0 then
      Hashtbl.remove rd.dircache saddr
  with
    | Not_found -> ()
  

let readdir_response count entries k_start verf =
  (* Builds a READDIR3resok from [entries] (an array of (name, fileid)).
     It starts at index [k_start] and includes as many entries as fit into
     [count] bytes of XDR-encoded reply (a negative [count], i.e. a huge
     unsigned one, means unlimited). The cookie of entry k is k+1, the index
     where a continuation starts. [dl_eof] is set when the last entry is
     included.
   *)
  let count =
    if count < 0L then
      max_int
    else
      Int64.to_int (min (Int64.of_int max_int) count) in
  let n_entries = Array.length entries in
  (* XDR string: 4-byte length prefix + data padded to a multiple of 4 *)
  let xdr_string_size s = 4 + 4 * ((String.length s + 3) / 4) in
  (* entry3: fileid (8) + name + cookie (8) + nextentry discriminant (4) *)
  let entry_size (name, _) = xdr_string_size name + 20 in
  let dirlist3_size = 8 in
  let resok_size = 12 in
  (* First index (>= k) whose entry no longer fits; tail-recursive so
     huge directories with an unlimited count cannot overflow the stack *)
  let rec fit k xdr_size =
    if k >= n_entries then
      k
    else (
      let ne_size = entry_size entries.(k) in
      if count - ne_size >= xdr_size then
        fit (k+1) (xdr_size + ne_size)
      else
        k
    ) in
  let k_end = fit k_start (dirlist3_size + resok_size) in
  (* Link entries [k_start, k_end) back to front *)
  let l = ref None in
  for k = k_end - 1 downto k_start do
    let (name, inode) = entries.(k) in
    l := Some { NA.entry_fileid = inode;
                entry_name = name;
                entry_cookie = Int64.of_int (k+1);
                entry_nextentry = !l
              }
  done;
  { NA.readdir_dir_attributes_ok = None;
    readdir_cookieverf_ok = verf;
    readdir_reply =
      { NA.dl_entries = !l;
        dl_eof = k_end >= n_entries
      }
  }

let proc_readdir_common conf sess (fh, cookie, verf_s) 
                        f_ok emit =
  (* Shared READDIR logic. [f_ok entries start verf_s] builds the success
     result from the member array, the index to start at, and the 8-byte
     verifier string.
     - If ([inode], verifier) is in the client's dircache, [cookie] is the
       index in that array where the listing continues.
     - Otherwise the directory is listed afresh (from index 0) and cached
       under a new verifier. If not even one entry fits into the client's
       buffer, NFS3ERR_TOOSMALL is returned.
   *)
  let rd = get_rundata conf in
  let saddr = peer sess in
  let verf = Rtypes.int64_of_int8 (Rtypes.read_int8 verf_s 0) in
  let fail = { NA.readdir_dir_attributes_fail = None } in
  try
    with_ii
      ~rd:true conf sess fh "readdir"
      (fun t inode _ii _creds ->
         match readdir_from_cache rd saddr inode verf with
           | Some entries ->
               (* Found in the cache: [cookie] is the index in [entries]
                  where to continue *)
               if cookie < 0L || cookie >= Int64.of_int max_int then
                 raise (Nfs_error `nfs3err_bad_cookie);
               let cookie = Int64.to_int cookie in
               if cookie > Array.length entries then
                 raise (Nfs_error `nfs3err_bad_cookie);
               clean_dircache rd saddr;
               eps_e (`Done (`nfs3_ok (f_ok entries cookie verf_s))) rd.esys

           | None ->
               if cookie <> 0L && verf <> null_verf then
                 raise (Nfs_error `nfs3err_bad_cookie);
               (* Linux does not interpret BAD_COOKIE nicely (i.e. does not
                  try again with cookie=0 & verf=0), so we have to work
                  around this. One important case seems to be cookie=0 &
                  verf <> 0. We interpret this as wish to restart the read
                  from the beginning of the directory.
                *)
               (* A new listing is about to be cached, so trim the client's
                  cache first. A listing that fits into one reply is
                  normally never continued, so without this the cache would
                  only be trimmed on hits and could grow without bound. *)
               clean_dircache rd saddr;
               readdir_e rd t saddr inode
               ++ (fun (entries, new_verf) ->
                     let new_verf_s =
                       Rtypes.int8_as_string
                         (Rtypes.int8_of_int64 new_verf) in
                     let r = f_ok entries 0 new_verf_s in
                     let rr = r.NA.readdir_reply in
                     if rr.NA.dl_entries = None && not rr.NA.dl_eof then
                       (* not even one entry fits into the client's count *)
                       eps_e (`Done (`nfs3err_toosmall fail)) rd.esys
                     else
                       eps_e (`Done (`nfs3_ok r)) rd.esys
                  )
      )
      (fun plasma_err _ ->
         (to_nfserr_arg
            ~retry_on_conflict:true
            plasma_err
            fail
          :> NA.readdir3res)
      )
      (fun nfs_err _ ->
         (add_arg nfs_err fail :> NA.readdir3res)
      )
      emit
  with
    | Nfs_error e ->
        emit (add_arg e fail :> NA.readdir3res)


let proc_readdir conf sess (fh, cookie, verf_s, count) emit =
  (* READDIR: the shared directory-listing logic, with replies bounded by
     the client's byte budget [count] *)
  proc_readdir_common
    conf sess (fh, cookie, verf_s) (readdir_response count) emit


let proc_readdirplus _conf _sess (_fh, _cookie, _verf, _dircount, _maxcount) emit =
  (* READDIRPLUS is not implemented. The RFC leaves the meaning of
     [dircount] open (it apparently refers to a buffer size of the Solaris
     implementation). NFS3ERR_NOTSUPP makes clients fall back to READDIR. *)
  emit (`nfs3err_notsupp { NA.readdirp_dir_attributes_fail = None })



(* --- write calls  --- *)

let failed_wcc_data rd inode_ii_opt =
  (* wcc_data for a failed operation. The object is assumed unmodified, so
     the same [ii] serves as both the pre-operation and the post-operation
     attributes. Without an [ii], both are absent. *)
  match inode_ii_opt with
    | Some (inode, ii) ->
        { NA.before = Some (to_wcc_attr inode ii);
          NA.after = Some (to_fattr3 rd inode ii) }
    | None ->
        { NA.before = None; NA.after = None }


let apply_sattr rd ii sattr =
  (* Returns [ii] with the set-attributes of [sattr] applied:
     - owner and group, mapped through [to_uname] / [to_gname];
     - the mode bits. The sticky bit (0o1000) is kept: remove, rmdir and
       rename in this file test it on directories, and masking with 0o777
       would clear it on every chmod done over NFS. setuid/setgid are
       still discarded.
     - the size, which becomes the new [eof]. A negative size gives
       NFS3ERR_INVAL.
     - mtime. [`set_to_server_time] is encoded as tsecs = -1, presumably
       resolved to the current time by the namenode.
     atime cannot be changed.
   *)
  let new_ug =
    { PA.user = (match sattr.NA.sa_uid with
                   | None -> ii.PA.usergroup.PA.user
                   | Some id -> to_uname rd id
                );
      PA.group = (match sattr.NA.sa_gid with
                    | None -> ii.PA.usergroup.PA.group
                    | Some id -> to_gname rd id
                 );
    } in
  { ii with
      PA.mode = (match sattr.NA.sa_mode with
                   | None -> ii.PA.mode
                   | Some m -> (Int64.to_int m) land 0o1777
                );
      PA.usergroup = new_ug;
      PA.eof = (match sattr.NA.sa_size with
                  | None -> ii.PA.eof
                  | Some s ->
                      if s < 0L then raise (Nfs_error `nfs3err_inval);
                      s
               );
      (* No way to change atime *)
      PA.mtime = (match sattr.NA.sa_mtime with
                    | `dont_change ->
                        ii.PA.mtime
                    | `set_to_server_time ->
                        { PA.tsecs = (-1L); tnsecs = 0 }
                    | `set_to_client_time t ->
                        of_nfstime t
                 );
  }



(* setattr strangeness: if there is a pending write, the EOF value in
   ii is not yet updated. For this reason we fix all ii structs
   returned to the user by checking whether there is a pending write
   that might have changed EOF (in to_fattr3). If the user calls
   setattr, though, the ii struct is directly changed, but the pending
   write will still happen. This appears to the user as if the setattr
   had no effect (at least if the file is truncated).

   For this reason we flush here all pending writes before doing the
   setattr.
 *)

(* SETATTR. Checks permission for [sattr] and, if a ctime [guard] is given,
   that it equals the current ctime (otherwise NFS3ERR_NOT_SYNC). It then
   flushes pending writes, stores the modified inodeinfo, truncates when the
   new size is smaller, and replies with before/after attributes. The
   "after" attributes are re-read from Plasma.
 *)
let proc_setattr conf sess (fh, sattr, guard) emit =
  let rd = get_rundata conf in
  with_ii
    conf sess fh "setattr"
    (fun t inode ii creds ->
       check_setattr_permission rd creds sattr ii;
       ( match guard with
	   | None -> ()
	   (* NOTE: this [t] is the guard's ctime and shadows the
	      transaction [t] only within this arm *)
	   | Some t ->
	       let ii_ctime = to_nfstime ii.PA.ctime in
	       if t <> ii_ctime then
		 raise(Nfs_error `nfs3err_not_sync)
       );
       (* Flush pending writes first. Otherwise a pending write could
	  later overwrite the EOF set here (e.g. undo a truncation).
	  NOTE(review): 0L 0L presumably means "the whole file" -- confirm. *)
       Plasma_client.flush_e rd.cluster inode 0L 0L (* see comment above *)
       ++ (fun () ->
	     (* may raise NFS3ERR_INVAL for a negative size *)
	     let ii' = apply_sattr rd ii sattr in
	     Plasma_client.set_inodeinfo_e t inode ii'
	     ++ (fun () ->
		   (* only shrinking needs an explicit truncate *)
		   ( if ii'.PA.eof < ii.PA.eof then
		       Plasma_client.truncate_e t inode ii'.PA.eof
		     else
		       eps_e (`Done ()) rd.esys
		   ) 
		   ++ (fun () -> Plasma_client.get_inodeinfo_e t inode)
		   ++ (fun real_ii ->
			 let r =
			   `nfs3_ok
			     { NA.setattr_obj_wcc_ok = 
				 { NA.before = Some (to_wcc_attr inode ii); 
				   after = Some (to_fattr3 rd inode real_ii)
				 } 
			     } in
			 eps_e (`Done r) rd.esys
		      )
		)
	  )
    )
    (fun plasma_err inode_ii_opt ->
       let fail =
	 { NA.setattr_obj_wcc_fail = failed_wcc_data rd inode_ii_opt } in
       (to_nfserr_arg
	  ~retry_on_conflict:true
	  plasma_err
	  fail
	:> NA.setattr3res)
    )
    (fun nfs_err inode_ii_opt ->
       let fail =
	 { NA.setattr_obj_wcc_fail = failed_wcc_data rd inode_ii_opt } in
       (add_arg nfs_err fail :> NA.setattr3res))
    emit


let asserted_lookup rd t dir_inode file_name =
  (* Looks up the inode and ii at this path; the engine yields
     [(file_ii, file_inode)]. Any error is translated into [`Error Retry]
     (the transaction will restart).
     The final state of the lookup engine is mapped directly. Wrapping it
     in [meta_engine] first (as done before) nests the state inside
     [`Done], which let lookup errors pass through untranslated.
   *)
  ( Plasma_client.dir_lookup_e t dir_inode file_name true
    ++ (fun file_inode ->
          Plasma_client.get_inodeinfo_e t file_inode
          ++ (fun file_ii ->
                eps_e (`Done (file_ii, file_inode)) rd.esys
             )
       )
  )
  >> (function
        | `Done r -> `Done r
        | `Error _ -> `Error Retry
        | `Aborted -> `Aborted
     )


(* CREATE: creates a regular file [name_in_dir] owned by the caller.
   - [`unchecked sattr] / [`guarded sattr]: a new inode with [sattr]
     applied is created and linked. If the name already exists, [`guarded]
     reports EEXIST, while [`unchecked] returns the existing object
     (whatever its type).
   - [`exclusive verf]: [verf] is stored in the new inode. If the name
     already exists and carries the same verifier, the existing file is
     returned as the result; otherwise EEXIST is reported.
   The parent's post-operation attributes are not included in the reply.
   NOTE(review): for [`unchecked] on an existing file, [sattr] is not
   applied to it. Also, the inode made by [create_inode_e] before a failing
   link is not removed here -- presumably unlinked inodes are reclaimed
   elsewhere; confirm.
 *)
let proc_create conf sess (name_in_dir, how) emit =
  let rd = get_rundata conf in
  with_ii
    ~wr:true conf sess name_in_dir.NA.dirop_dir "create"
    (fun t dir_inode dir_ii creds ->
       let (user,group,_) = creds in
       let ug = { PA.user = user; group = group } in
       (* Check the validity of the new name *)
       let file_name = name_in_dir.NA.dirop_name in
       if (file_name = "" || String.contains file_name '/' || file_name = "."
	   || file_name = "..") then 
	 raise(Nfs_error `nfs3err_inval);
       let file_ii = 
	 match how with
	   | `unchecked sattr | `guarded sattr ->
	       let ii = Plasma_client.regular_ii rd.cluster 0 in
	       let ii' = { ii with PA.usergroup = ug } in
	       check_setattr_permission rd creds sattr ii';
	       apply_sattr rd ii' sattr
	   | `exclusive verf ->
	       (* remember the verifier so a retransmitted CREATE can be
		  recognized below *)
	       let ii = Plasma_client.regular_ii rd.cluster 0 in
	       let ii' = { ii with PA.usergroup = ug } in
	       { ii' with PA.create_verifier = of_create_verf verf } in
       Plasma_client.create_inode_e t file_ii
       ++ (fun file_inode ->
	     (* meta_engine: inspect the link result (incl. errors) ourselves *)
	     Uq_engines.meta_engine
	     (Plasma_client.link_at_e t dir_inode file_name file_inode)
	     ++ (fun st ->
		   (* We check here for EEXIST errors. *)
		   match st with
		     | `Done () ->
			 (* Get the ii struct *)
			 Plasma_client.get_inodeinfo_e t file_inode
			 ++ (fun real_file_ii ->
			       eps_e (`Done (real_file_ii, file_inode)) rd.esys
			    )
		     | `Error (Plasma_client.Plasma_error `eexist) as exerr ->
			 (* If [how=unchecked] we have to hide this error.
			    For [how=guarded] we can pass this error back.
			    For [how=exclusive] we have to check the identity
			    of the file
			  *)
			 ( match how with
			     | `unchecked attr ->
				 (* We hide the error even if the file is
				    a directory or symlink
				  *)
				 asserted_lookup rd t dir_inode file_name
			     | `guarded attr ->
				 eps_e exerr rd.esys
			     | `exclusive verf ->
				 asserted_lookup rd t dir_inode file_name
				 ++ (fun (ii,inode) ->
				       (* same verifier: this is a retransmission
					  of the CREATE that made the file *)
				       let ok =
					 ii.PA.create_verifier =
					 of_create_verf verf in
				       if ok then
					 eps_e (`Done(ii,inode)) rd.esys
				       else
					 eps_e exerr rd.esys
				    )
			 )
		     | (`Error _ | `Aborted) as e ->
			 eps_e (e :> _ Uq_engines.engine_state) rd.esys
		)
	     ++ (fun (real_file_ii,file_inode) ->
		   let r =
		     `nfs3_ok {
		       NA.create_obj = Some(wrap_sess_fh sess rd file_inode);
		       NA.create_obj_attributes = 
			 Some (to_fattr3 rd file_inode real_file_ii);
		       NA.create_dir_wcc_ok =
			 { NA.before = Some (to_wcc_attr dir_inode dir_ii); 
			   after = None; (* don't have this at hand *)
			 } 
		     } in
		   eps_e (`Done r) rd.esys
		)
	  )
    )
    (fun plasma_err inode_ii_opt ->
       let fail =
	 { NA.create_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
       (to_nfserr_arg
	  ~retry_on_conflict:true
	  plasma_err
	  fail
	:> NA.create3res)
    )
    (fun nfs_err inode_ii_opt ->
       let fail =
	 { NA.create_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
       (add_arg nfs_err fail :> NA.create3res))
    emit

let proc_mkdir conf sess (name_in_dir, sattr) emit =
  (* MKDIR: creates a directory inode owned by the caller (base mode 0o777,
     then modified by [sattr]) and links it as [name_in_dir] into the parent.
     The reply carries the new handle and its attributes. The parent's
     post-operation attributes are not at hand and are omitted. *)
  let rd = get_rundata conf in
  let fail_of inode_ii_opt =
    { NA.mkdir_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
  let do_mkdir t dir_inode dir_ii creds =
    let (user, group, _) = creds in
    let new_name = name_in_dir.NA.dirop_name in
    (* reject names that cannot denote a new directory entry *)
    if new_name = "" || new_name = "." || new_name = ".."
       || String.contains new_name '/'
    then raise (Nfs_error `nfs3err_inval);
    let owned_ii =
      { (Plasma_client.dir_ii rd.cluster 0o777) with
          PA.usergroup = { PA.user = user; group = group } } in
    check_setattr_permission rd creds sattr owned_ii;
    let new_ii = apply_sattr rd owned_ii sattr in
    Plasma_client.create_inode_e t new_ii
    ++ (fun new_inode ->
          Plasma_client.link_at_e t dir_inode new_name new_inode
          ++ (fun () ->
                Plasma_client.get_inodeinfo_e t new_inode
                ++ (fun stored_ii ->
                      eps_e (`Done (stored_ii, new_inode)) rd.esys))
          ++ (fun (stored_ii, new_inode) ->
                let reply =
                  { NA.mkdir_obj = Some (wrap_sess_fh sess rd new_inode);
                    NA.mkdir_obj_attributes =
                      Some (to_fattr3 rd new_inode stored_ii);
                    NA.mkdir_dir_wcc_ok =
                      { NA.before = Some (to_wcc_attr dir_inode dir_ii);
                        after = None;
                      }
                  } in
                eps_e (`Done (`nfs3_ok reply)) rd.esys))
  in
  with_ii
    ~wr:true
    conf sess name_in_dir.NA.dirop_dir "mkdir"
    do_mkdir
    (fun plasma_err inode_ii_opt ->
       (to_nfserr_arg ~retry_on_conflict:true plasma_err (fail_of inode_ii_opt)
        :> NA.mkdir3res))
    (fun nfs_err inode_ii_opt ->
       (add_arg nfs_err (fail_of inode_ii_opt) :> NA.mkdir3res))
    emit

let proc_symlink conf sess (name_in_dir, data) emit =
  (* SYMLINK: creates a symlink inode pointing to [data.symlink_path], owned
     by the caller and modified by [data.symlink_attributes], and links it as
     [name_in_dir]. The reply carries the new handle and its attributes. The
     parent's post-operation attributes are omitted. *)
  let rd = get_rundata conf in
  let fail_of inode_ii_opt =
    { NA.symlink_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
  let do_symlink t dir_inode dir_ii creds =
    let (user, group, _) = creds in
    let link_name = name_in_dir.NA.dirop_name in
    (* reject names that cannot denote a new directory entry *)
    if link_name = "" || link_name = "." || link_name = ".."
       || String.contains link_name '/'
    then raise (Nfs_error `nfs3err_inval);
    let sattr = data.NA.symlink_attributes in
    let owned_ii =
      { (Plasma_client.symlink_ii rd.cluster data.NA.symlink_path) with
          PA.usergroup = { PA.user = user; group = group } } in
    check_setattr_permission rd creds sattr owned_ii;
    let new_ii = apply_sattr rd owned_ii sattr in
    Plasma_client.create_inode_e t new_ii
    ++ (fun new_inode ->
          Plasma_client.link_at_e t dir_inode link_name new_inode
          ++ (fun () ->
                Plasma_client.get_inodeinfo_e t new_inode
                ++ (fun stored_ii ->
                      eps_e (`Done (stored_ii, new_inode)) rd.esys))
          ++ (fun (stored_ii, new_inode) ->
                let reply =
                  { NA.symlink_obj = Some (wrap_sess_fh sess rd new_inode);
                    NA.symlink_obj_attributes =
                      Some (to_fattr3 rd new_inode stored_ii);
                    NA.symlink_dir_wcc_ok =
                      { NA.before = Some (to_wcc_attr dir_inode dir_ii);
                        after = None;
                      }
                  } in
                eps_e (`Done (`nfs3_ok reply)) rd.esys))
  in
  with_ii
    ~wr:true conf sess name_in_dir.NA.dirop_dir "symlink"
    do_symlink
    (fun plasma_err inode_ii_opt ->
       (to_nfserr_arg ~retry_on_conflict:true plasma_err (fail_of inode_ii_opt)
        :> NA.symlink3res))
    (fun nfs_err inode_ii_opt ->
       (add_arg nfs_err (fail_of inode_ii_opt) :> NA.symlink3res))
    emit


let real_ownership_check rd sess ii =
  (* Raises [Nfs_error `nfs3err_perm] unless the RPC caller's user name is
     the owner recorded in [ii]. *)
  let (caller, _, _) = get_rpc_identity rd sess in
  let owner = ii.PA.usergroup.PA.user in
  if caller = owner then () else raise (Nfs_error `nfs3err_perm)
    

let check_ownership_e ?(namelock=true) sess rd t dir_inode file_name do_it =
  (* This is used if the "t" bit is set in the file mode of the directory.
     When [do_it] is true, the entry [file_name] in [dir_inode] must be
     owned by the caller; otherwise the engine fails with NFS3ERR_PERM.
     With [~namelock:true] (the default) the name is namelocked before the
     lookup. When [do_it] is false, nothing is checked.
   *)
  if not do_it then
    eps_e (`Done ()) rd.esys
  else
    let locked_e =
      if namelock then Plasma_client.namelock_e t dir_inode file_name
      else eps_e (`Done ()) rd.esys in
    locked_e
    ++ (fun () -> Plasma_client.dir_lookup_e t dir_inode file_name true)
    ++ (fun inode -> Plasma_client.get_inodeinfo_e t inode)
    ++ (fun ii ->
          real_ownership_check rd sess ii;
          eps_e (`Done ()) rd.esys)


let proc_remove conf sess name_in_dir emit =
  (* REMOVE: unlinks [name_in_dir]. We also allow the removal of
     directories (the NFS spec supports this). "", "." and names containing
     '/' give NFS3ERR_INVAL; ".." gives NFS3ERR_EXIST (mentioned in the
     spec). In a sticky directory (mode bit 0o1000), the caller must own the
     entry. *)
  let rd = get_rundata conf in
  let fail_of inode_ii_opt =
    { NA.remove_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
  let do_remove t dir_inode dir_ii _creds =
    let victim = name_in_dir.NA.dirop_name in
    if victim = "" || victim = "." || String.contains victim '/' then
      raise (Nfs_error `nfs3err_inval);
    if victim = ".." then
      raise (Nfs_error `nfs3err_exist);
    let sticky = (dir_ii.PA.mode land 0o1000) <> 0 in
    check_ownership_e sess rd t dir_inode victim sticky
    ++ (fun () -> Plasma_client.unlink_at_e t dir_inode victim)
    ++ (fun () ->
          let dir_wcc =
            { NA.before = Some (to_wcc_attr dir_inode dir_ii);
              after = None (* the parent's new attributes are not at hand *)
            } in
          eps_e (`Done (`nfs3_ok { NA.remove_dir_wcc_ok = dir_wcc })) rd.esys)
  in
  with_ii
    ~wr:true conf sess name_in_dir.NA.dirop_dir "remove"
    do_remove
    (fun plasma_err inode_ii_opt ->
       (to_nfserr_arg ~retry_on_conflict:true plasma_err (fail_of inode_ii_opt)
        :> NA.remove3res))
    (fun nfs_err inode_ii_opt ->
       (add_arg nfs_err (fail_of inode_ii_opt) :> NA.remove3res))
    emit
 
(* RMDIR: removes the entry [name_in_dir], which must refer to a directory
   (otherwise NFS3ERR_NOTDIR). "", "." and names containing '/' give
   NFS3ERR_INVAL; ".." gives NFS3ERR_EXIST. The name is namelocked before
   the lookup. If the parent has the sticky bit (0o1000), the caller must
   own the directory being removed.
   NOTE(review): emptiness is not checked here -- presumably [unlink_at_e]
   refuses non-empty directories; confirm.
 *)
let proc_rmdir conf sess name_in_dir emit =
  let rd = get_rundata conf in
  with_ii
    ~wr:true conf sess name_in_dir.NA.dirop_dir "rmdir"
    (fun t dir_inode dir_ii creds ->
       (* Check the validity of the name *)
       let file_name = name_in_dir.NA.dirop_name in
       if (file_name = "" || String.contains file_name '/' || file_name = ".")
         then raise(Nfs_error `nfs3err_inval);
       if (file_name = "..")
         then raise(Nfs_error `nfs3err_exist);  (* mentioned in the spec *)
       Plasma_client.namelock_e t dir_inode file_name
       ++ (fun () ->
	     Plasma_client.dir_lookup_e t dir_inode file_name true)
       ++ (fun file_inode ->
	     Plasma_client.get_inodeinfo_e t file_inode
	  )
       ++ (fun ii ->
	     if ii.PA.filetype = `ftype_directory then (
	       (* sticky parent: only the owner may remove *)
	       if (dir_ii.PA.mode land 0o1000) <> 0 then
		 real_ownership_check rd sess ii;
	       Plasma_client.unlink_at_e t dir_inode file_name
	       ++ (fun () ->
		     let r =
		       `nfs3_ok {
			 NA.rmdir_dir_wcc_ok =
			   { NA.before = 
			       Some (to_wcc_attr dir_inode dir_ii); 
			     after = None; (* don't have this at hand *)
			   } 
			     } in
		     eps_e (`Done r) rd.esys
		  )
	     )
	     else
	       eps_e (`Error (Nfs_error `nfs3err_notdir)) rd.esys
	  )
    )
    (fun plasma_err inode_ii_opt ->
       let fail =
	 { NA.rmdir_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
       (to_nfserr_arg
	  ~retry_on_conflict:true
	  plasma_err
	  fail
	:> NA.rmdir3res)
    )
    (fun nfs_err inode_ii_opt ->
       let fail =
	 { NA.rmdir_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
       (add_arg nfs_err fail :> NA.rmdir3res))
    emit


let rename_success () =
  (* Successful RENAME3 result. Neither directory's wcc data is reported;
     each field gets its own (empty) record. *)
  let no_wcc () = { NA.before = None; after = None } in
  `nfs3_ok
    { NA.rename_fromdir_wcc_ok = no_wcc ();
      NA.rename_todir_wcc_ok = no_wcc ();
    }


let do_unlink_then_rename_e rd t dir1_inode file_name1 dir2_inode file_name2 =
  (* Replaces an existing destination: unlinks [dir2_inode]/[file_name2],
     then renames the source onto that name. A non-empty destination
     directory (ENOTEMPTY) is reported as NFS3ERR_EXIST. Other errors and
     aborts are passed on unchanged. *)
  let after_unlink st =
    match st with
      | `Done () ->
          Plasma_client.rename_at_e
            t dir1_inode file_name1 dir2_inode file_name2
          ++ (fun () -> eps_e (`Done (rename_success ())) rd.esys)
      | `Error (Plasma_client.Plasma_error `enotempty) ->
          eps_e (`Error (Nfs_error `nfs3err_exist)) rd.esys
      | `Error e ->
          eps_e (`Error e) rd.esys
      | `Aborted ->
          eps_e `Aborted rd.esys in
  Uq_engines.meta_engine (Plasma_client.unlink_at_e t dir2_inode file_name2)
  ++ after_unlink



(* Renames [dir1_inode]/[file_name1] to [dir2_inode]/[file_name2] inside
   transaction [t], yielding [rename_success()].
   The rename is tried directly first. On EEXIST, the destination name is
   namelocked and both entries are looked up. The destination is replaced
   (unlink, then rename again) only when both are directories or both are
   non-directories; otherwise NFS3ERR_EXIST is returned. If locking the
   destination gives ENOENT, [`Error Retry] is returned (restart).
 *)
let do_rename_e rd t dir1_inode file_name1 dir2_inode file_name2 =
  (* Speculatively try the rename. This may run into EEXIST,
     though.
   *)
  Uq_engines.meta_engine
    (Plasma_client.rename_at_e
       t dir1_inode file_name1 dir2_inode file_name2)
  ++ (fun st ->
	(* It is now possible that we got EEXIST. NFS, however,
	   allows to rename into an existing file in some
	   circumstances.
	 *)
	match st with
	  | `Done () ->
	      (* Success *)
	      eps_e (`Done (rename_success())) rd.esys
	  | `Error (Plasma_client.Plasma_error `eexist) ->
	      (* the EEXIST case. This is very complicated. First get
		 a namelock on the destination which is supposed to exist
		 now.
	       *)
	      Uq_engines.meta_engine
		(Plasma_client.namelock_e t dir2_inode file_name2)
	      ++ (fun st ->
		    match st with
		      | `Done () ->
			  (* Now check whether it would be allowed to
			     do unlink the destination and then try the
			     rename again
			   *)
			  Plasma_client.dir_lookup_e 
			    t dir1_inode file_name1 true
			  ++ (fun inode1 ->
				Plasma_client.get_inodeinfo_e t inode1)
			  ++ (fun ii1 ->
				Plasma_client.dir_lookup_e 
				  t dir2_inode file_name2 true
				++ (fun inode2 ->
				      Plasma_client.get_inodeinfo_e t inode2)
				++ (fun ii2 ->
				      (* replacing is only allowed between
					 objects of the same kind (dir/non-dir) *)
				      let both_non_dir =
					ii1.PA.filetype <> `ftype_directory &&
					  ii2.PA.filetype <> `ftype_directory in
				      let both_dir =
					ii1.PA.filetype = `ftype_directory &&
					  ii2.PA.filetype = `ftype_directory in
				      if both_non_dir || both_dir then
					do_unlink_then_rename_e 
					  rd t dir1_inode file_name1
					  dir2_inode file_name2 
				      else
					eps_e (`Error (Nfs_error `nfs3err_exist))
					  rd.esys
				   )
			     )

		      | `Error (Plasma_client.Plasma_error `enoent) ->
			  (* The only explanation is that another transaction
			     just finished an unlink. We report this as
			     conflict.
			   *)
			  eps_e (`Error Retry) rd.esys
		      | `Error e -> eps_e (`Error e) rd.esys
		      | `Aborted -> eps_e `Aborted rd.esys
		 )
	      
	  | `Error e -> eps_e (`Error e) rd.esys
	  | `Aborted -> eps_e `Aborted rd.esys
     )


let proc_rename conf sess (name1_in_dir,name2_in_dir) emit =
  (* RENAME: moves [name1_in_dir] to [name2_in_dir] in one transaction.
     [with_ii] is called with [~wr:true] on the source directory. Write
     permission on the target directory is checked explicitly. If the
     source directory has the sticky bit (0o1000), the caller must own the
     source entry.
     Both names must be non-empty, must not contain '/', and must not be
     "." or "..". Such names do not denote ordinary entries, and this
     matches the validation in create/mkdir/symlink.
     NOTE(review): the sticky bit of the target directory is not checked
     when an existing target is replaced -- confirm this is intended.
   *)
  let rd = get_rundata conf in
  let invalid_name name =
    name = "" || String.contains name '/' || name = "." || name = ".." in
  let fail_of inode_ii_opt =
    { NA.rename_fromdir_wcc_fail = failed_wcc_data rd inode_ii_opt;
      NA.rename_todir_wcc_fail = { NA.before = None; after = None };
    } in
  with_ii
    ~wr:true conf sess name1_in_dir.NA.dirop_dir "rename"
    (fun t dir1_inode dir1_ii creds ->
       let file_name1 = name1_in_dir.NA.dirop_name in
       if invalid_name file_name1 then
         raise (Nfs_error `nfs3err_inval);
       let file_name2 = name2_in_dir.NA.dirop_name in
       if invalid_name file_name2 then
         raise (Nfs_error `nfs3err_inval);
       let dir2_inode = unwrap_sess_fh sess rd name2_in_dir.NA.dirop_dir in
       Plasma_client.get_inodeinfo_e t dir2_inode
       ++ (fun dir2_ii ->
             if not (have_write_permission creds dir2_ii) then
               raise (Nfs_error `nfs3err_acces);
             (* Get a lock for the source name. This is required because the
                following is non-atomic and we want to prevent that a
                competing writer replaces the source in the meantime
              *)
             Plasma_client.namelock_e t dir1_inode file_name1
             ++ (fun () ->
                   check_ownership_e ~namelock:false
                     sess rd t dir1_inode file_name1
                     ((dir1_ii.PA.mode land 0o1000) <> 0))
             ++ (fun () ->
                   do_rename_e rd t dir1_inode file_name1 dir2_inode file_name2
                )
          )
    )
    (fun plasma_err inode_ii_opt ->
       (to_nfserr_arg
          ~retry_on_conflict:true
          plasma_err
          (fail_of inode_ii_opt)
        :> NA.rename3res)
    )
    (fun nfs_err inode_ii_opt ->
       (add_arg nfs_err (fail_of inode_ii_opt) :> NA.rename3res))
    emit


(* NFS LINK: create a new entry [name_in_dir] that refers to the object
   designated by [fh].

   The new name must be non-empty and must not contain '/'; otherwise
   NFS3ERR_INVAL is returned. "." and ".." are refused with NFS3ERR_EXIST.
   On success only the "before" half of the directory wcc data is filled
   in. The post-operation directory attributes and the file attributes
   are not available here.
 *)
let proc_link conf sess (fh, name_in_dir) emit =
  let rd = get_rundata conf in
  (* Failure payload shared by both error paths *)
  let link_fail inode_ii_opt =
    { NA.link_file_attributes_fail = None;
      NA.linkdir_wcc_fail = failed_wcc_data rd inode_ii_opt
    } in
  let on_plasma_error plasma_err inode_ii_opt =
    (to_nfserr_arg ~retry_on_conflict:true plasma_err (link_fail inode_ii_opt)
     :> NA.link3res) in
  let on_nfs_error nfs_err inode_ii_opt =
    (add_arg nfs_err (link_fail inode_ii_opt) :> NA.link3res) in
  let do_link t dir_inode dir_ii _creds =
    let file_name = name_in_dir.NA.dirop_name in
    if file_name = "" || String.contains file_name '/' then
      raise(Nfs_error `nfs3err_inval);
    if file_name = "." || file_name = ".." then
      raise(Nfs_error `nfs3err_exist);
    let file_inode = unwrap_sess_fh sess rd fh in
    Plasma_client.link_at_e t dir_inode file_name file_inode
    ++ (fun () ->
          let dir_wcc =
            { NA.before = Some (to_wcc_attr dir_inode dir_ii);
              after = None; (* post-op attributes are not at hand *)
            } in
          let ok =
            { NA.link_file_attributes_ok = None;
              linkdir_wcc_ok = dir_wcc;
            } in
          eps_e (`Done (`nfs3_ok ok)) rd.esys
       ) in
  with_ii ~wr:true conf sess name_in_dir.NA.dirop_dir "link"
    do_link on_plasma_error on_nfs_error emit


(* NFS WRITE: write [count] bytes of [data] to the file designated by [fh],
   starting at position [offset].

   Validation happens before any Plasma call. Errors are reported with
   empty wcc data.
   - [offset] and [count] are unsigned in the protocol but arrive as
     signed int64, so large values show up as negative ones. A negative
     [offset] is rejected with NFS3ERR_FBIG.
   - A negative [count], or one exceeding the length of [data], is
     rejected with NFS3ERR_INVAL.
   - A write whose end position [offset+count] does not fit into a
     signed int64 is rejected with NFS3ERR_FBIG. Otherwise the per-chunk
     positions computed in the write loop would wrap around to negative
     values.

   Write permission is checked against the cached inode info. The data
   is then written in a loop that continues from the number of bytes
   [Plasma_client.write_e] reports, until [count] bytes are done.

   For [`data_sync] and [`file_sync], [Plasma_client.flush_e] is called
   for the written range and the reply reports [`file_sync]. For
   [`unstable] nothing is flushed and [`unstable] is reported.
 *)
let proc_write conf sess (fh, offset, count, stable, data) emit =
  let rd = get_rundata conf in
  try
    let creds = get_rpc_identity rd sess in
    let inode = unwrap_sess_fh sess rd fh in

    (* offset and count are specified as unsigned int64 in the NFS
       protocol. Here, they are passed as signed int64, i.e. negative
       values are large ones.

       If offset is negative, we return EFBIG.
     *)
    if offset < 0L then
      raise (Nfs_error `nfs3err_fbig);
    if count < 0L || count > Int64.of_int data#length then
      raise (Nfs_error `nfs3err_inval);
    (* Both values are non-negative here, so their sum can only overflow
       by wrapping into the negative range. Such a write would end beyond
       the largest representable file position. *)
    if Int64.add offset count < 0L then
      raise (Nfs_error `nfs3err_fbig);

    let count = Int64.to_int count in

    let full_name =
      sprintf "write(%Ld,%Ld,%d,%s%s)" inode offset count
        (match stable with
           | `unstable -> "unstable"
           | `data_sync -> "data_sync"
           | `file_sync -> "file_sync"
        )
        (if Int64.rem offset (Int64.of_int rd.blocksize) <> 0L then
           ",misaligned"
         else
           ""
        ) in

    (* Plasma_client.write does not set ii.eof and ii.mtime to the new
       values. We cope with that by querying for the designated new values
       and fixing the attributes before they are returned to the user
       (in to_fattr3).
     *)

    let return_e n committed =
      (* Get the cached ii value for the "after" wcc struct *)
      Plasma_client.get_cached_inodeinfo_e rd.cluster inode false
      ++ (fun ii ->
            dlogf "%s: after_get_ii" full_name;
            let r =
              `nfs3_ok
                { NA.write_file_wcc_ok = {
                    NA.before = None;
                    after = Some (to_fattr3 rd inode ii);
                  };
                  write_count_ok = Int64.of_int n;
                  write_committed = committed;
                  write_verf = rd.writeverf;
                } in
            eps_e (`Done r) rd.esys
         )
    in

    (* We need not to care about ECONFLICT - none of these Plasma_client
       functions will return it
     *)

    Plasma_util.rpc_srv_reply full_name sess emit
      (no_transaction_e rd full_name
         (fun esys ->
            dlogf "%s: before_write" full_name;
            let s, s_offs = data#as_string in
            (* Write the remaining bytes [k, count), continuing from the
               number of bytes each write_e call reports. *)
            let rec wr_loop_e k =
              if k < count then
                let kL = Int64.of_int k in
                Plasma_client.write_e
                  rd.cluster inode (Int64.add offset kL)
                  (`String s) (s_offs+k) (count-k)
                ++ (fun n ->
                      wr_loop_e (k+n)
                   )
              else (
                dlogf "%s: after_write" full_name;
                match stable with
                  | `unstable ->
                      return_e count `unstable
                  | `data_sync | `file_sync ->
                      Plasma_client.flush_e
                        rd.cluster inode offset (Int64.of_int count)
                      ++ (fun () ->
                            dlogf "%s: after_flush" full_name;
                            return_e count `file_sync)
              ) in

            Plasma_client.get_cached_inodeinfo_e rd.cluster inode false
            ++ (fun ii ->
                  if not(have_write_permission creds ii) then
                    raise(Nfs_error `nfs3err_acces);
                  (* Note that ii may be slightly out of date. We do not
                     consider this as a problem, because updates become
                     immediately visible that are done via NFS. Updates from
                     other clients can be invisible for a short period of
                     time, though.
                   *)
                  wr_loop_e 0
               )
         )
      )
  with
    | Nfs_error e ->
        emit
          (add_arg e { NA.write_file_wcc_fail = failed_wcc_data rd None }
           :> NA.write3res)

  
(* NFS COMMIT: flush buffered data of the file designated by [fh], then
   reply with the file's post-operation attributes (taken from the cached
   inode info) and the server's write verifier.

   The requested range ([offset], [count]) is ignored.
   [Plasma_client.flush_e] is called with 0L/0L; range-restricted
   flushing is, per the original note, not implemented by Plasma_client
   anyway. File permissions are not checked.
 *)
let proc_commit conf sess (fh, offset, count) emit =
  let rd = get_rundata conf in
  try
    let inode = unwrap_sess_fh sess rd fh in
    let _creds = get_rpc_identity rd sess in

    (* Success reply built from the cached inode info *)
    let reply_ok_e () =
      Plasma_client.get_cached_inodeinfo_e rd.cluster inode false
      ++ (fun ii ->
            let file_wcc =
              { NA.before = None;
                after = Some (to_fattr3 rd inode ii);
              } in
            let ok =
              { NA.commit_file_wcc_ok = file_wcc;
                commit_verf = rd.writeverf;
              } in
            eps_e (`Done (`nfs3_ok ok)) rd.esys
         ) in

    (* ECONFLICT cannot occur: neither flush_e nor
       get_cached_inodeinfo_e return it. *)
    let flush_and_reply_e _esys =
      Plasma_client.flush_e rd.cluster inode 0L 0L
      ++ reply_ok_e in

    Plasma_util.rpc_srv_reply "commit" sess emit
      (no_transaction_e rd "commit" flush_and_reply_e)
  with
    | Nfs_error e ->
        emit
          (add_arg e { NA.commit_file_wcc_fail = failed_wcc_data rd None }
           :> NA.commit3res)


(* NFS MKNOD: special files are not supported. Every request is answered
   with NFS3ERR_NOTSUPP and empty directory wcc data. *)
let proc_mknod _conf _sess (_name_in_dir, _data) emit =
  emit
    (`nfs3err_notsupp
       { NA.mknod_dir_wcc_fail = { NA.before = None; after = None } })

(* Install the NFSv3 procedure implementations on [srv]. Each procedure
   receives [conf] as its first argument, and NULL simply replies with
   unit.

   Accepted RPC authentication methods are AUTH_NONE and AUTH_SYS. The
   AUTH_SYS method is created with [~lookup_hostname:false] and
   [~require_privileged_port:true]. *)
let setup_nfs srv conf =
  Nfs3_srv.NFS.V3.bind_async
    ~proc_null:(fun _ () emit -> emit ())
    ~proc_lookup:(proc_lookup conf)
    ~proc_getattr:(proc_getattr conf)
    ~proc_access:(proc_access conf)
    ~proc_readlink:(proc_readlink conf)
    ~proc_read:(proc_read conf)
    ~proc_readdir:(proc_readdir conf)
    ~proc_readdirplus:(proc_readdirplus conf)
    ~proc_fsstat:(proc_fsstat conf)
    ~proc_fsinfo:(proc_fsinfo conf)
    ~proc_pathconf:(proc_pathconf conf)
    ~proc_setattr:(proc_setattr conf)
    ~proc_write:(proc_write conf)
    ~proc_create:(proc_create conf)
    ~proc_mkdir:(proc_mkdir conf)
    ~proc_symlink:(proc_symlink conf)
    ~proc_remove:(proc_remove conf)
    ~proc_rmdir:(proc_rmdir conf)
    ~proc_rename:(proc_rename conf)
    ~proc_link:(proc_link conf)
    ~proc_commit:(proc_commit conf)
    ~proc_mknod:(proc_mknod conf)
    srv;
  let auth_sys =
    Rpc_auth_sys.server_auth_method
      ~lookup_hostname:false ~require_privileged_port:true () in
  Rpc_server.set_auth_methods srv [ Rpc_server.auth_none; auth_sys ]


(* MOUNT *)

(* MOUNT MNT: hand out the root file handle of the cluster.

   Only the path "/<clustername>" is exported; any other path yields
   MNT3ERR_NOENT. On its first successful mount, a peer address gets a
   random 8-byte secret. The secret is stored in [rd.clients], the mount
   table is rewritten, and later mounts from the same address reuse it.
   The secret is wrapped into the returned root file handle. Only
   AUTH_SYS (flavor 1) is advertised.
 *)
let proc_mnt conf sess path emit =
  let rd = get_rundata conf in
  let saddr = peer sess in
  if path = "/" ^ conf.clustername then begin
    let secret =
      if Hashtbl.mem rd.clients saddr then
        Hashtbl.find rd.clients saddr
      else begin
        let fresh = Plasma_rng.random_bytes 8 in
        Hashtbl.add rd.clients saddr fresh;
        write_mount_table rd;
        fresh
      end in
    emit
      (`mnt3_ok
         { NA.fhandle = wrap_fh secret rd.rootfh;
           auth_flavors = [| 1 (* AUTH_SYS *) |];
         })
  end
  else
    emit `mnt3err_noent
      

(* Printable host name of a peer address, as used in the MOUNT DUMP
   reply: the numeric IP address for Internet sockets, and "localhost"
   for Unix domain sockets. The port is not included. *)
let name_of_saddr = function
  | Unix.ADDR_UNIX _ -> "localhost"
  | Unix.ADDR_INET (addr, _port) -> Unix.string_of_inet_addr addr

(* MOUNT DUMP: report every peer address in [rd.clients] as having
   mounted "/<clustername>". The result is a linked list of mount entries
   ([None] terminates it). *)
let proc_dump conf _sess () emit =
  let rd = get_rundata conf in
  let exported = "/" ^ conf.clustername in
  let prepend saddr _secret tail =
    Some { NA.ml_hostname = name_of_saddr saddr;
           ml_directory = exported;
           ml_next = tail;
         } in
  emit (Hashtbl.fold prepend rd.clients None)


(* MOUNT UMNT: if [path] is the exported path "/<clustername>", forget the
   calling peer address (its entries in [rd.clients] and [rd.dircache])
   and rewrite the mount table. Other paths are ignored. An empty reply
   is sent in every case. *)
let proc_umnt conf sess path emit =
  let rd = get_rundata conf in
  let forget saddr =
    Hashtbl.remove rd.clients saddr;
    Hashtbl.remove rd.dircache saddr;
    write_mount_table rd in
  if path = "/" ^ conf.clustername then
    forget (peer sess);
  emit ()


(* MOUNT UMNTALL: forget the calling peer address (its entries in
   [rd.clients] and [rd.dircache]), rewrite the mount table, and send an
   empty reply. *)
let proc_umntall conf sess () emit =
  let rd = get_rundata conf in
  let forget_peer saddr =
    Hashtbl.remove rd.clients saddr;
    Hashtbl.remove rd.dircache saddr in
  forget_peer (peer sess);
  write_mount_table rd;
  emit ()


(* MOUNT EXPORT: advertise the single export "/<clustername>" with no
   group restriction. *)
let proc_export conf _sess () emit =
  let entry =
    { NA.ex_dir = "/" ^ conf.clustername;
      ex_groups = None;
      ex_next = None;
    } in
  emit (Some entry)


(* Install the MOUNT v3 procedure implementations on [srv]. Each procedure
   receives [conf] as its first argument, and NULL simply replies with
   unit.

   Accepted RPC authentication methods are AUTH_NONE and AUTH_SYS. The
   AUTH_SYS method is created with [~lookup_hostname:false] and
   [~require_privileged_port:true]. *)
let setup_mount srv conf =
  Nfs3_srv.MOUNT.V3.bind_async
    ~proc_null:(fun _ _ emit -> emit ())
    ~proc_mnt:(proc_mnt conf)
    ~proc_dump:(proc_dump conf)
    ~proc_umnt:(proc_umnt conf)
    ~proc_umntall:(proc_umntall conf)
    ~proc_export:(proc_export conf)
    srv;
  let auth_sys =
    Rpc_auth_sys.server_auth_method
      ~lookup_hostname:false ~require_privileged_port:true () in
  Rpc_server.set_auth_methods srv [ Rpc_server.auth_none; auth_sys ]


(* Parse the global configuration of the NFS daemon from [cf].

   Exactly one "namenodes" section is required below the root. It must
   list at least one node address (parsed by
   [Plasma_util.parse_host_port]) and must carry a "clustername"
   parameter. "buffer_memory" is optional and defaults to 128 MiB. The
   client authentication config is extracted from the root section.

   Raises [Failure] with a descriptive message for every missing or
   malformed setting listed above.
 *)
let parse_global_conf cf =
  let nn =
    match cf#resolve_section cf#root_addr "namenodes" with
      | [nn] -> nn
      | [] -> failwith "missing 'namenodes' config"
      | _ -> failwith "more than one 'namenodes' config" in
  let addrs = Plasma_util.node_list cf nn in
  if addrs = [] then
    failwith "nfs config: no namenodes";
  let parse_addr addr =
    try Plasma_util.parse_host_port addr
    with _ ->
      failwith ("bad 'addr' parameter in 'namenodes.node': " ^ addr) in
  let nnlist = List.map parse_addr addrs in
  let clustername =
    try cf#string_param (cf#resolve_parameter nn "clustername")
    with Not_found ->
      failwith "missing 'namenodes.clustername' parameter" in
  let buffer_memory =
    try cf#int_param (cf#resolve_parameter nn "buffer_memory")
    with Not_found -> 128 * 1024 * 1024 (* 128 M *) in
  let cauth = Pfs_auth.extract_client_config cf cf#root_addr in
  { clustername;
    namenodes = nnlist;
    buffer_memory;
    cauth;
  }


(* Parsed global configuration, cached per process. Both the "nfs3" and
   the "mount3" factories obtain it via [get_global_conf]. *)
let global_conf = ref None

(* Return the global configuration. [cf] is parsed only on the first
   call; later calls return the cached value and ignore [cf]. *)
let get_global_conf cf =
  match !global_conf with
    | Some cached ->
        cached
    | None ->
        let parsed = parse_global_conf cf in
        global_conf := Some parsed;
        parsed


(* Netplex RPC factory for the "nfs3" protocol.
   - Configuration is taken from [get_global_conf].
   - Procedures are installed by [setup_nfs].
   - After a container has started, its hook calls
     [wait_for_rundata_failsafe] and ignores the result. *)
let nfs3_factory() =
  let hooks conf =
    object
      inherit Netplex_kit.empty_processor_hooks ()

      method post_start_hook _ =
        ignore (wait_for_rundata_failsafe conf)
    end in
  Rpc_netplex.rpc_factory
    ~configure:(fun cf _ -> get_global_conf cf)
    ~hooks
    ~setup:setup_nfs
    ~name:"nfs3"
    ()

(* Netplex RPC factory for the "mount3" protocol. Configuration is taken
   from [get_global_conf], procedures are installed by [setup_mount], and
   no processor hooks beyond the empty defaults are used. *)
let mount3_factory() =
  Rpc_netplex.rpc_factory
    ~configure:(fun cf _ -> get_global_conf cf)
    ~hooks:(fun _ -> new Netplex_kit.empty_processor_hooks ())
    ~setup:setup_mount
    ~name:"mount3"
    ()

(* Protocol-switching Netplex factory named "nfs" that serves both the
   "nfs3" and the "mount3" protocols. *)
let factory () =
  let protocols =
    [ ("nfs3", nfs3_factory ());
      ("mount3", mount3_factory ());
    ] in
  new Netplex_kit.protocol_switch_factory "nfs" protocols

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml