(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Plasma.  If not, see <http://www.gnu.org/licenses/>.

*)
(* $Id: nfs3d_manager.ml 488 2011-10-25 21:29:23Z gerd $ *)

(* mount -t nfs -o soft,proto=tcp,port=2801,mountport=2800,mountproto=tcp,nfsvers=3,nolock,noacl,nordirplus,sec=none 127.0.0.1:/test /mnt *)

module PA = Plasma_rpcapi_aux
module NA = Nfs3_aux
module StrSet = Plasma_util.StrSet

open Plasma_util.Operators
open Printf

(* Static configuration of the NFS bridge *)
type conf =
    { clustername : string;
      namenodes : (string * int) list;
      buffer_memory : int;
      cauth : Pfs_auth.client_auth;
    }

type dircache =
    (int64 * int64, (string * int64) array * float) Hashtbl.t
  (* maps [(inode, cookieverf)] to the array of [(name,inode)] members,
     plus time of last access
   *)

(* Per-process runtime state. The ug_admin* fields are mutable because
   the user/group database is reloaded when the local files change
   (see check_ug_admin).
 *)
type rundata =
    { conf : conf;
      cluster : Plasma_client.plasma_cluster;
      clients : (Unix.sockaddr, string) Hashtbl.t;
      (** Maps socket address to a secret that proves the mount *)
      rootfh : int64;
      blocksize : int;
      blockpool : Netsys_mem.memory_pool;
      dircache : (Unix.sockaddr, dircache) Hashtbl.t;
      writeverf : string;
      mutable ug_admin : Plasma_ug.ug_admin;
      mutable ug_admin_file_t : float;
      mutable ug_admin_check_t : float;
      esys : Unixqueue.event_system;
    }

(* NFSv3 error codes without result argument *)
type nfs_error =
    [ `nfs3err_perm
    | `nfs3err_noent
    | `nfs3err_io
    | `nfs3err_nxio
    | `nfs3err_acces
    | `nfs3err_exist
    | `nfs3err_xdev
    | `nfs3err_nodev
    | `nfs3err_notdir
    | `nfs3err_isdir
    | `nfs3err_inval
    | `nfs3err_fbig
    | `nfs3err_nospc
    | `nfs3err_rofs
    | `nfs3err_mlink
    | `nfs3err_nametoolong
    | `nfs3err_notempty
    | `nfs3err_dquot
    | `nfs3err_stale
    | `nfs3err_remote
    | `nfs3err_badhandle
    | `nfs3err_not_sync
    | `nfs3err_bad_cookie
    | `nfs3err_notsupp
    | `nfs3err_toosmall
    | `nfs3err_serverfault
    | `nfs3err_badtype
    | `nfs3err_jukebox
    ]

(* The same error codes, each carrying a failure record of type 'a *)
type 'a nfs_error_arg =
    [ `nfs3err_perm of 'a
    | `nfs3err_noent of 'a
    | `nfs3err_io of 'a
    | `nfs3err_nxio of 'a
    | `nfs3err_acces of 'a
    | `nfs3err_exist of 'a
    | `nfs3err_xdev of 'a
    | `nfs3err_nodev of 'a
    | `nfs3err_notdir of 'a
    | `nfs3err_isdir of 'a
    | `nfs3err_inval of 'a
    | `nfs3err_fbig of 'a
    | `nfs3err_nospc of 'a
    | `nfs3err_rofs of 'a
    | `nfs3err_mlink of 'a
    | `nfs3err_nametoolong of 'a
    | `nfs3err_notempty of 'a
    | `nfs3err_dquot of 'a
    | `nfs3err_stale of 'a
    | `nfs3err_remote of 'a
    | `nfs3err_badhandle of 'a
    | `nfs3err_not_sync of 'a
    | `nfs3err_bad_cookie of 'a
    | `nfs3err_notsupp of 'a
    | `nfs3err_toosmall of 'a
    | `nfs3err_serverfault of 'a
    | `nfs3err_badtype of 'a
    | `nfs3err_jukebox of 'a
    ]

(* Raised to abort a procedure with an NFS error code *)
exception Nfs_error of nfs_error

(* Raised by to_nfserr to request that the operation be repeated
   (handled by transaction_e / no_transaction_e)
 *)
exception Retry

(* [add_arg nfs_error arg] attaches [arg] to an argument-less error
   code, yielding the corresponding [nfs_error_arg] constructor.
 *)
let add_arg nfs_error arg =
  match nfs_error with
    | `nfs3err_perm -> `nfs3err_perm arg
    | `nfs3err_noent -> `nfs3err_noent arg
    | `nfs3err_io -> `nfs3err_io arg
    | `nfs3err_nxio -> `nfs3err_nxio arg
    | `nfs3err_acces -> `nfs3err_acces arg
    | `nfs3err_exist -> `nfs3err_exist arg
    | `nfs3err_xdev -> `nfs3err_xdev arg
    | `nfs3err_nodev -> `nfs3err_nodev arg
    | `nfs3err_notdir -> `nfs3err_notdir arg
    | `nfs3err_isdir -> `nfs3err_isdir arg
    | `nfs3err_inval -> `nfs3err_inval arg
    | `nfs3err_fbig -> `nfs3err_fbig arg
    | `nfs3err_nospc -> `nfs3err_nospc arg
    | `nfs3err_rofs -> `nfs3err_rofs arg
    | `nfs3err_mlink -> `nfs3err_mlink arg
    | `nfs3err_nametoolong -> `nfs3err_nametoolong arg
    | `nfs3err_notempty -> `nfs3err_notempty arg
    | `nfs3err_dquot -> `nfs3err_dquot arg
    | `nfs3err_stale -> `nfs3err_stale arg
    | `nfs3err_remote -> `nfs3err_remote arg
    | `nfs3err_badhandle -> `nfs3err_badhandle arg
    | `nfs3err_not_sync -> `nfs3err_not_sync arg
    | `nfs3err_bad_cookie -> `nfs3err_bad_cookie arg
    | `nfs3err_notsupp -> `nfs3err_notsupp arg
    | `nfs3err_toosmall -> `nfs3err_toosmall arg
    | `nfs3err_serverfault -> `nfs3err_serverfault arg
    | `nfs3err_badtype -> `nfs3err_badtype arg
    | `nfs3err_jukebox -> `nfs3err_jukebox arg

let read_multiple = 1
  (* Reading more than one block has a bad effect. The NFS client usually
     always requests several blocks by emitting several READs. When now
     each READ fetches several filesystem blocks, this results in
     a non-ascending order
   *)

(* Maximum WRITE size announced in FSINFO, in units of the blocksize *)
let write_multiple = 16

(* Installs the passwords from [conf.cauth] for the privileged and the
   bottom user, and makes root/root the default user and group.
 *)
let configure_auth cluster conf =
  Plasma_client.configure_auth
    cluster
    Pfs_auth.privileged_user
    Pfs_auth.bottom_user
    (fun u ->
       if u = Pfs_auth.privileged_user then
         conf.cauth.Pfs_auth.proot_pw
       else
         conf.cauth.Pfs_auth.pnobody_pw
    );
  Plasma_client.configure_default_user_group cluster "root" "root"

(* [with_sync_cluster conf f] opens a separate cluster client on a
   private event system, runs [f] on it, and closes the client again,
   whether [f] returns or raises. The exception of [f] is re-raised.

   The close call is made exactly once: when it is the close itself that
   fails after a successful [f], it is not attempted a second time.
 *)
let with_sync_cluster conf f =
  let sesys = Unixqueue.create_unix_event_system() in
  let scluster =
    Plasma_client.open_cluster conf.clustername conf.namenodes sesys in
  configure_auth scluster conf;
  let r =
    try f scluster
    with
      | error ->
          Plasma_client.close_cluster scluster;
          raise error in
  Plasma_client.close_cluster scluster;
  r

(* [mkdir_p cluster trans path] creates the directory given as list of
   path components, including all parent directories. Directories that
   already exist are silently accepted.

   The prefix starts out empty so that the generated paths have exactly
   one leading slash, in agreement with [mount_dir_path] below. (Seeding
   the loop with a slash would produce paths with a doubled leading
   slash.)
 *)
let mkdir_p cluster trans path =
  let rec loop prefix path =
    match path with
      | [] -> ()
      | name :: path' ->
          let prefix' = prefix ^ "/" ^ name in
          ( try
              ignore(Plasma_client.mkdir trans prefix' (Plasma_client.dir_ii cluster 0o777))
            with
              | Plasma_client.Plasma_error `eexist -> ()
          );
          loop prefix' path' in
  loop "" path

let dlogf = Plasma_util.dlogf

(* Location of the persistent mount table inside the filesystem. There is
   one table file per host running this daemon.
 *)
let mount_dir_path_l = [ ".plasma"; "var"; "lib"; "nfs3" ]

let mount_dir_path = "/" ^ String.concat "/" mount_dir_path_l

let mount_path = mount_dir_path ^ "/mounts_" ^ Unix.gethostname()

(* Encodes every byte of [s] as two lowercase hex digits *)
let to_hex s =
  let buf = Buffer.create 50 in
  for k = 0 to String.length s - 1 do
    bprintf buf "%02x" (Char.code s.[k])
  done;
  Buffer.contents buf

(* Inverse of [to_hex]. Raises [Invalid_argument] when [s] has odd length
   or cannot be parsed. Not_found is only used internally as a signal and
   is converted by the catch-all handler.
 *)
let from_hex s =
  (* poor impl (slow) *)
  try
    if (String.length s) mod 2 <> 0 then raise Not_found;
    let buf = Buffer.create 50 in
    for k = 0 to (String.length s) / 2 - 1 do
      let j = 2*k in
      let u = String.sub s j 2 in
      let code = Scanf.sscanf u "%x%!" (fun code -> code) in
      Buffer.add_char buf (Char.chr code)
    done;
    Buffer.contents buf
  with
    | _ -> invalid_arg "Nfs3d_manager.from_hex"

(* Reads the mount table file and enters all (address, secret) pairs into
   [rd.clients]. The file consists of pairs of lines: the IP address, and
   the hex-encoded secret. A missing file counts as an empty table.
   Addresses are stored with port 0 (cf. [peer]).
 *)
let load_mount_table rd =
  let load scluster trans =
    try
      let inode = Plasma_client.lookup trans mount_path false in
      let ii = Plasma_client.get_inodeinfo trans inode in
      let s = String.create (Int64.to_int ii.PA.eof) in
      let (_,_,_) =
        Plasma_client.read scluster inode 0L (`String s) 0 (String.length s) in
      s
    with Plasma_client.Plasma_error `enoent -> "" in
  let s =
    with_sync_cluster rd.conf
      (fun scluster ->
         Plasma_client.retry scluster "get_mount_table"
           (fun () -> Plasma_client.with_trans scluster (load scluster))
           ()
      ) in
  let ch = new Netchannels.input_string s in
  try
    while true do
      let line1 = ch # input_line() in
      let ip = Unix.inet_addr_of_string line1 in
      let line2 = ch # input_line() in
      let secret = from_hex line2 in
      let saddr = Unix.ADDR_INET(ip,0) in
      Hashtbl.replace rd.clients saddr secret
    done
  with End_of_file -> ()

(* [write cluster inode s pos] writes the suffix of [s] starting at [pos]
   to the same file position, repeating the call until all is written.
 *)
let rec write cluster inode s pos =
  let sm = `String s in
  if pos < String.length s then (
    let n =
      Plasma_client.write cluster inode (Int64.of_int pos) sm pos (String.length s - pos) in
    write cluster inode s (pos+n)
  )

(* Serializes [rd.clients] in the format understood by [load_mount_table]
   and stores it in the mount table file, which is created (together with
   its directory) or truncated first. Only ADDR_INET keys are expected.
 *)
let write_mount_table rd =
  let mkfile scluster trans =
    mkdir_p scluster trans mount_dir_path_l;
    let inode =
      try
        Plasma_client.create_file trans mount_path (Plasma_client.regular_ii scluster 0o666)
      with Plasma_client.Plasma_error `eexist ->
        Plasma_client.lookup trans mount_path false in
    Plasma_client.truncate trans inode 0L;
    inode in
  let buf = Buffer.create 500 in
  Hashtbl.iter
    (fun saddr secret ->
       let ip =
         match saddr with
           | Unix.ADDR_INET(ip,_) -> ip
           | _ -> assert false in
       Buffer.add_string buf (Unix.string_of_inet_addr ip ^ "\n");
       Buffer.add_string buf (to_hex secret ^ "\n")
    )
    rd.clients;
  let s = Buffer.contents buf in
  with_sync_cluster rd.conf
    (fun scluster ->
       let inode =
         Plasma_client.retry scluster "write_mount_table"
           (fun () ->
              Plasma_client.with_trans scluster
                (fun trans -> mkfile scluster trans)
           )
           () in
       write scluster inode s 0;
       Plasma_client.flush scluster inode 0L 0L;
    )

(* Returns the contents of the local file [name] as string *)
let read_local_file name =
  let b = Buffer.create 512 in
  let b_ch = new Netchannels.output_buffer b in
  Netchannels.with_in_obj_channel
    (new Netchannels.input_channel (open_in_bin name))
    (fun ch -> b_ch # output_channel ch);
  Buffer.contents b

(* The newer of the modification times of /etc/passwd and /etc/group *)
let get_ug_admin_file_t() =
  List.fold_left
    (fun acc file ->
       let m = (Unix.stat file).Unix.st_mtime in
       max acc m
    )
    0.0
    [ "/etc/passwd"; "/etc/group" ]

let get_ug_admin() =
  (* Just read /etc/passwd and /etc/group *)
  let data =
    [ "passwd", read_local_file "/etc/passwd";
      "group", read_local_file "/etc/group"
    ] in
  Plasma_ug.parse_ug_admin data

(* Creates the runtime state for [conf]: queries the root inode and the
   blocksize with a temporary synchronous client, loads the local
   user/group database, opens the asynchronous client on the event system
   of the current Netplex container, and loads the mount table.
 *)
let new_rundata conf =
  let rootfh, blocksize, ug_admin, ug_admin_file_t =
    with_sync_cluster conf
      (fun scluster ->
         let st = Plasma_client.start scluster in
         let rootfh = Plasma_client.lookup st "/" false in
         let blocksize = Plasma_client.blocksize scluster in
         let ug_admin_file_t = get_ug_admin_file_t() in
         let ug_admin = get_ug_admin() in
         Plasma_client.commit st;
         (rootfh, blocksize, ug_admin, ug_admin_file_t)
      ) in
  let esys = (Netplex_cenv.self_cont()) # event_system in
  let cluster =
    Plasma_client.open_cluster conf.clustername conf.namenodes esys in
  configure_auth cluster conf;
  Plasma_client.configure_buffer cluster (max 1 (conf.buffer_memory / blocksize));
  (* writeverf is just the startup timestamp: *)
  let writeverfL = Int64.bits_of_float (Unix.gettimeofday()) in
  let writeverf = Rtypes.int8_as_string (Rtypes.int8_of_int64 writeverfL) in
  let rd =
    { conf = conf;
      cluster = cluster;
      clients = Hashtbl.create 7;
      rootfh = rootfh;
      blocksize = blocksize;
      blockpool = Netsys_mem.create_pool (read_multiple*blocksize);
      dircache = Hashtbl.create 7;
      writeverf = writeverf;
      ug_admin;
      ug_admin_file_t;
      ug_admin_check_t = Unix.gettimeofday();
      esys = esys
    } in
  load_mount_table rd;
  rd

(* Reloads the user/group database when /etc/passwd or /etc/group have
   a newer modification time than at the last load. The files are
   examined at most once per second.

   The time of the last check is recorded for every check that was
   performed, not only for those finding a change. Otherwise the
   one-second limit would stop having any effect as soon as the first
   second had elapsed, and the files would be examined on every call.
 *)
let check_ug_admin rd =
  let now = Unix.gettimeofday() in
  if now -. rd.ug_admin_check_t >= 1.0 then (
    let file_t = get_ug_admin_file_t() in
    rd.ug_admin_check_t <- now;
    if file_t > rd.ug_admin_file_t then (
      rd.ug_admin <- get_ug_admin();
      rd.ug_admin_file_t <- file_t
    )
  )

let global_rundata = ref None

(* Returns the process-wide runtime state, creating it on first use.
   All calls must pass the physically same [conf] value, otherwise
   Failure is raised.
 *)
let get_rundata conf =
  match !global_rundata with
    | Some rd ->
        if rd.conf == conf then (
          check_ug_admin rd;
          rd
        )
        else
          failwith "More than one configuration"
    | None ->
        let rd = new_rundata conf in
        global_rundata := Some rd;
        rd

(* Tries to initialize the runtime state. Returns [true] on success.
   When the cluster is down, a warning is logged, a timer is created that
   repeats the attempt after one second, and [false] is returned.
 *)
let rec wait_for_rundata_failsafe conf =
  try
    ignore(get_rundata conf);
    true
  with
    | Plasma_client.Cluster_down msg as err ->
        Netlog.logf `Warning
          "Cluster not yet up: %s" (Netexn.to_string err);
        let _t =
          Netplex_cenv.create_timer
            (fun t -> wait_for_rundata_failsafe conf)
            1.0 in
        false

(* The peer address of the session. For internet sockets the port is
   replaced by 0, so that the address can be used as key of [rd.clients]
   independently of the source port.
 *)
let peer sess =
  let saddr = Rpc_server.get_peer_name sess in
  match saddr with
    | Unix.ADDR_INET(ip,_) -> Unix.ADDR_INET(ip,0)
    | _ -> saddr

(* A file handle is the secret followed by the 8 bytes of the inode *)
let wrap_fh secret inode =
  secret ^ Rtypes.int8_as_string (Rtypes.int8_of_int64 inode)

(* Splits a 16 byte file handle into (secret, inode). Raises Failure for
   any other length.
 *)
let unwrap_fh fh =
  if String.length fh <> 16 then failwith "unwrap_fh";
  let secret = String.sub fh 0 8 in
  (secret, Rtypes.int64_of_int8 (Rtypes.read_int8 fh 8))

(* Creates the file handle for [inode] using the secret of the peer of
   [sess]. Raises [Nfs_error `nfs3err_serverfault] if the peer has no
   entry in [rd.clients].
 *)
let wrap_sess_fh sess rd inode =
  let saddr = peer sess in
  try
    let secret = Hashtbl.find rd.clients saddr in
    wrap_fh secret inode
  with
    | Not_found ->
        raise (Nfs_error `nfs3err_serverfault)

(* Checks the file handle [fh] against the secret of the peer of [sess],
   and returns the inode. Raises [Nfs_error `nfs3err_badhandle] for
   handles of wrong length or with wrong secret, and
   [Nfs_error `nfs3err_serverfault] if the peer has no entry in
   [rd.clients].
 *)
let unwrap_sess_fh sess rd fh =
  let saddr = peer sess in
  if String.length fh <> 16 then
    raise (Nfs_error `nfs3err_badhandle);
  try
    let exp_secret = Hashtbl.find rd.clients saddr in
    let (secret, inode) = unwrap_fh fh in
    if secret <> exp_secret then
      raise (Nfs_error `nfs3err_badhandle);
    inode
  with
    | Not_found ->
        raise (Nfs_error `nfs3err_serverfault)

(* Translations between names and numeric IDs via [rd.ug_admin]. Any
   failure of the lookup yields the respective fallback value.
 *)
let to_uid rd user =
  try
    Int64.of_int ((rd.ug_admin # getpwnam user).Unix.pw_uid)
  with _ -> 65534L  (* "nobody" *)

let to_uname rd uid =
  try
    (rd.ug_admin # getpwuid (Int64.to_int uid)).Unix.pw_name
  with _ -> "nobody"

let to_gid rd group =
  try
    Int64.of_int((rd.ug_admin # getgrnam group).Unix.gr_gid)
  with _ -> 65534L  (* "nogroup" *)

let to_gname rd gid =
  try
    (rd.ug_admin # getgrgid (Int64.to_int gid)).Unix.gr_name
  with _ -> "nogroup"

(* Converts a Plasma timestamp to an NFS timestamp. Raises Failure if the
   seconds are negative.
 *)
let to_nfstime t =
  if t.PA.tsecs < 0L then
    failwith "to_nfstime: found negative value (=set to server time)";
  { NA.seconds = t.PA.tsecs;
    nseconds = Int64.of_int t.PA.tnsecs
  }

(* Converts an NFS timestamp to a Plasma timestamp *)
let of_nfstime t =
  { PA.tsecs = t.NA.seconds;
    PA.tnsecs = Int64.to_int t.NA.nseconds
  }

(* Builds the NFS attributes of [inode] from the inodeinfo [ii] *)
let to_fattr3 rd inode ii =
  (* Update eof and mtime if there are pending writes. The background is
     that Plasma_client.write does not immediately update these
     metadata field. This happens first at flush time. We can query
     the designated new values, though, and consider them before we
     return values back to the user.
   *)
  let eof =
    try Plasma_client.get_write_eof rd.cluster inode
    with Not_found -> ii.PA.eof in
  let mtime =
    try Plasma_client.get_write_mtime rd.cluster inode
    with Not_found -> ii.PA.mtime in
  dlogf "to_fattr3 eof=%Ld" eof;
  { NA.fa_type = (match ii.PA.filetype with
                    | `ftype_regular -> `nf3reg
                    | `ftype_directory -> `nf3dir
                    | `ftype_symlink -> `nf3lnk
                 );
    fa_mode = Int64.of_int (ii.PA.mode land 0o777);
    fa_nlink = 1L;  (* FIXME *)
    fa_uid = to_uid rd ii.PA.usergroup.PA.user;
    fa_gid = to_gid rd ii.PA.usergroup.PA.group;
    fa_size = eof;
    fa_used = 0L;   (* FIXME *)
    (* fa_used is the number of used bytes. For us, this would be the
       number of actually allocated blocks, times the blocksize

       TODO: add this to inode_info. How to deal with failed nodes?
     *)
    fa_rdev = { NA.specdata1 = 0L; specdata2 = 0L };
    fa_fsid = 0L;
    fa_fileid = inode;
    fa_atime = to_nfstime mtime;   (* no atime *)
    fa_mtime = to_nfstime mtime;
    fa_ctime = to_nfstime ii.PA.ctime;
  }

(* Builds an inodeinfo of type [ftype] from NFS attributes. Fields that
   have no NFS counterpart get fixed initial values.
 *)
let of_fattr3 rd ftype attr =
  { PA.filetype = ftype;
    usergroup = { PA.user = to_uname rd attr.NA.fa_uid;
                  PA.group = to_gname rd attr.NA.fa_gid;
                };
    mode = (Int64.to_int attr.NA.fa_mode) land 0o777;
    eof = attr.NA.fa_size;
    mtime = of_nfstime attr.NA.fa_mtime;
    ctime = of_nfstime attr.NA.fa_ctime;
    replication = 0;    (* use server's default *)
    blocklimit = 0L;
    field1 = "";
    seqno = (-1L);
    committed = false;
    create_verifier = 0L;
    anonymous = false;
  }

(* The subset of attributes used for weak cache consistency. [inode] is
   not used.
 *)
let to_wcc_attr inode ii =
  dlogf "to_wcc_attr eof=%Ld" ii.PA.eof;
  { NA.wcc_size = ii.PA.eof;
    wcc_mtime = to_nfstime ii.PA.mtime;
    wcc_ctime = to_nfstime ii.PA.ctime;
  }

(* Maps a Plasma error code to an NFS error code. If [retry_on_conflict]
   is set, [`econflict] is not mapped but raises [Retry].
 *)
let to_nfserr ?(retry_on_conflict = false) (pe : Plasma_client.errno) : nfs_error =
  match pe with
    | `enotrans -> `nfs3err_serverfault
    | `efailedcommit -> `nfs3err_serverfault
    | `elongtrans -> `nfs3err_serverfault
    | `efailed -> `nfs3err_serverfault
    | `eperm -> `nfs3err_perm
    | `enoent -> `nfs3err_noent
    | `eaccess -> `nfs3err_acces
    | `eexist -> `nfs3err_exist
    | `efhier -> `nfs3err_perm
    | `einval -> `nfs3err_inval
    | `efbig -> `nfs3err_fbig
    | `enospc -> `nfs3err_nospc
    | `erofs -> `nfs3err_rofs
    | `enametoolong -> `nfs3err_nametoolong
    | `econflict ->
        if retry_on_conflict then
          raise Retry
        else
          `nfs3err_serverfault
    | `ecoord -> `nfs3err_serverfault
    | `enonode -> `nfs3err_serverfault
    | `etbusy -> `nfs3err_serverfault
    | `estale -> `nfs3err_stale
    | `eio -> `nfs3err_io
    | `eloop -> `nfs3err_serverfault (* should never get this *)
    | `enotdir -> `nfs3err_notdir
    | `eisdir -> `nfs3err_isdir
    | `enotempty -> `nfs3err_notempty
    | `ebadpath -> `nfs3err_notdir

(* Same as [to_nfserr], with the failure record [arg] attached *)
let to_nfserr_arg ?retry_on_conflict pe arg : _ nfs_error_arg =
  add_arg (to_nfserr ?retry_on_conflict pe) arg

let of_create_verf s =
  (* Convert 8 bytes to an int64 *)
  assert(String.length s = 8);
  Rtypes.int64_of_int8 (Rtypes.read_int8 s 0)
(* Helpers for authorization *)

(* FIXME: get_rpc_identity just trusts the IDs it gets from the local
   machine. Regarding the user ID there is nothing wrong with that.
   The groups, though, should be verified with the namenode (i.e. only
   accept a group if the user is also in this group following the definition
   in the namenode).
 *)

(* Returns (user name, primary group name, set of supplementary group
   names) for the caller of [sess]. Only AUTH_SYS is accepted; any
   failure results in [Nfs_error `nfs3err_perm].
 *)
let get_rpc_identity rd sess : (string * string * StrSet.t) =
  (* may raise Nfs_error *)
  let am = Rpc_server.get_auth_method sess in
  if am#name <> "AUTH_SYS" then raise(Nfs_error `nfs3err_perm);
  let creds = Rpc_server.get_user sess in
  try
    let (uid, gid, supp_gids, _) = Rpc_auth_sys.parse_user_name creds in
    let uname = to_uname rd (Int64.of_int uid) in
    let gname = to_gname rd (Int64.of_int gid) in
    let supp_groups =
      Array.fold_left
        (fun acc g ->
           try
             let n = to_gname rd (Int64.of_int g) in
             StrSet.add n acc
           with
             | _ -> acc  (* just drop unknown gid's *)
        )
        StrSet.empty
        supp_gids in
    dlogf "get_rpc_identity: user=%s group=%s" uname gname;
    (uname,gname,supp_groups)
  with
    | _ -> raise(Nfs_error `nfs3err_perm)

(* [have_permission mask creds ii] checks one of the permission bits
   [mask] (4=read, 2=write, 1=execute) of the file mode in [ii] for the
   identity [creds] as returned by [get_rpc_identity]:
    - the owner bits apply if the user is the file owner,
    - otherwise the group bits apply if the file group is the primary
      group of the caller or one of the supplementary groups,
    - otherwise the bits for others apply.

   The primary group has to be tested explicitly: it is a separate
   component of the identity, and the set of supplementary groups is
   built from the supplementary gids only.
 *)
let have_permission mask (user,group,supp_groups) ii =
  let file_group = ii.PA.usergroup.PA.group in
  if user = ii.PA.usergroup.PA.user then (
    (ii.PA.mode land (mask lsl 6)) <> 0
  )
  else
    if group = file_group || StrSet.mem file_group supp_groups then (
      (ii.PA.mode land (mask lsl 3)) <> 0
    )
    else
      (ii.PA.mode land mask) <> 0

let have_read_permission = have_permission 4
let have_write_permission = have_permission 2
let have_exec_permission = have_permission 1

(* Whether the user of the identity is the file owner *)
let have_owner_permission (user,group,supp_groups) ii =
  user = ii.PA.usergroup.PA.user

(* Checks whether the identity [creds] may apply the attribute changes
   [sattr] to the file described by [ii]. Returns () if so, and raises
   [Nfs_error] otherwise.
 *)
let check_setattr_permission rd creds sattr ii =
  let have_user_change =
    match sattr.NA.sa_uid with
      | None -> false
      | Some uid ->
          let uname = to_uname rd uid in
          ii.PA.usergroup.PA.user <> uname in
  let have_group_change =
    match sattr.NA.sa_gid with
      | None -> false
      | Some id ->
          ii.PA.usergroup.PA.group <> to_gname rd id in
  let have_mode_change =
    match sattr.NA.sa_mode with
      | None -> false
      | Some m ->
          Int64.of_int(ii.PA.mode) <> m in
  let have_current_mtime_change =
    match sattr.NA.sa_mtime with
      | `dont_change -> false
      | `set_to_server_time -> true
      | `set_to_client_time _ -> false in
  let have_arbitrary_mtime_change =
    match sattr.NA.sa_mtime with
      | `dont_change -> false
      | `set_to_server_time -> false
      | `set_to_client_time _ -> true in
  let have_size_change =
    sattr.NA.sa_size <> None in
  (* perms: *)
  let owner_perm_lz = lazy(have_owner_permission creds ii) in
  let wr_perm_lz =
    lazy(Lazy.force owner_perm_lz || have_write_permission creds ii) in
  (* changing uid: always disallowed *)
  if have_user_change then
    raise(Nfs_error `nfs3err_perm);
  (* Changing mode, gid: the owner can do this *)
  if (have_mode_change && not(Lazy.force owner_perm_lz)) ||
     (have_group_change && not(Lazy.force owner_perm_lz))
  then
    raise(Nfs_error `nfs3err_perm);
  (* mtime: the owner can do this, or anybody with write access, but
     only to the current time
   *)
  if have_current_mtime_change && not(Lazy.force wr_perm_lz) &&
     not(Lazy.force owner_perm_lz)
  then
    raise(Nfs_error `nfs3err_acces);
  if have_arbitrary_mtime_change && not(Lazy.force owner_perm_lz) then
    raise(Nfs_error `nfs3err_perm);
  (* size: write access needed *)
  if have_size_change && not(Lazy.force wr_perm_lz) then
    raise(Nfs_error `nfs3err_acces);
  ()

(* NFS *)

(* Delays in seconds before the n-th repetition of an operation. After
   the end of the array the last value is used.
 *)
let pause =
  [| 0.01; 0.02; 0.04; 0.08; 0.16; 0.32; 0.64; 1.0; 2.0; 4.0; 8.0; 16.0;
     32.0; 60.0
  |]

let transaction_e rd name f =
  (** Creates a transaction [t] and runs [f t esys]. *)
  (* The transaction is committed if the engine returned by [f] finishes
     with `Done, and aborted otherwise. If it finishes with
     `Error Retry, the whole procedure is repeated in a new transaction
     after the delay taken from [pause]. The final state of the engine of
     [f] is the result value.
   *)
  (* TODO: avoid serialization *)
  let result = ref `Aborted in
  let rec next_attempt retry =
    dlogf "Starting transaction for %s" name;
    Plasma_client.start_e rd.cluster
    ++ (fun t ->
          let e = f t rd.esys in
          Plasma_util.failsafe_e e
          ++ (fun st ->
                let success, do_retry =
                  match st with
                    | `Done _ -> true, false
                    | `Error Retry -> false, true
                    | _ -> false, false in
                ( if success then (
                    dlogf "Committing transaction for %s" name;
                    Plasma_client.commit_e t
                  )
                  else (
                    dlogf "Aborting transaction for %s" name;
                    Plasma_client.abort_e t
                  )
                )
                ++ (fun () ->
                      if do_retry then (
                        let d =
                          if retry < Array.length pause then
                            pause.(retry)
                          else
                            pause.(Array.length pause - 1) in
                        dlogf "Retrying transaction for %s after %f seconds" name d;
                        Plasma_util.delay_engine
                          d
                          (fun _ -> next_attempt (retry+1))
                          rd.esys
                      )
                      else (
                        result := st;
                        eps_e (`Done ()) rd.esys
                      )
                   )
             )
       ) in
  next_attempt 0
  >> (function
        | `Done () -> `Done !result
        | `Error e -> `Error e
        | `Aborted -> `Aborted
     )

let no_transaction_e rd name f =
  (* Same, but no transaction *)
  (* Here [f] is called as [f esys]. The repetition on `Error Retry
     works as in [transaction_e].
   *)
  let result = ref `Aborted in
  let rec next_attempt retry =
    dlogf "Starting execution of %s" name;
    let e = f rd.esys in
    Plasma_util.failsafe_e e
    ++ (fun st ->
          let success, do_retry =
            match st with
              | `Done _ -> true, false
              | `Error Retry -> false, true
              | _ -> false, false in
          ( if success then
              dlogf "Successful execution of %s" name
            else
              dlogf "Non-successful execution of %s" name
          );
          if do_retry then (
            let d =
              if retry < Array.length pause then
                pause.(retry)
              else
                pause.(Array.length pause - 1) in
            dlogf "Retrying execution of %s after %f seconds" name d;
            Plasma_util.delay_engine
              d
              (fun _ -> next_attempt (retry+1))
              rd.esys
          )
          else (
            result := st;
            eps_e (`Done ()) rd.esys
          )
       ) in
  next_attempt 0
  >> (function
        | `Done () -> `Done !result
        | `Error e -> `Error e
        | `Aborted -> `Aborted
     )

(* --- read calls --- *)

(* LOOKUP: resolves [name_in_dir.dirop_name] in the directory denoted by
   the file handle [name_in_dir.dirop_dir], and replies with the file
   handle and the attributes of the found file. Errors detected before
   the transaction starts are emitted directly.
 *)
let proc_lookup conf sess name_in_dir emit =
  let rd = get_rundata conf in
  try
    let _ = get_rpc_identity rd sess in
    (* Any identity can look up - no further checks *)
    let dir_inode = unwrap_sess_fh sess rd name_in_dir.NA.dirop_dir in
    Plasma_util.rpc_srv_reply "lookup" sess emit
      (transaction_e rd "lookup"
         (fun t esys ->
            (* We don't check the x bit because we don't do this in the
               nameserver neither
             *)
            Plasma_client.dir_lookup_e t dir_inode name_in_dir.NA.dirop_name true
            ++ (fun file_inode ->
                  Plasma_client.get_inodeinfo_e t file_inode
                  ++ (fun file_ii ->
                        eps_e (`Done(file_inode,file_ii)) esys
                     )
               )
            >> (function
                  | `Done (file_inode,file_ii) ->
                      let file_fh = wrap_sess_fh sess rd file_inode in
                      let file_attr = to_fattr3 rd file_inode file_ii in
                      let ok =
                        { NA.lookup_object = file_fh;
                          lookup_obj_attributes = Some file_attr;
                          lookup_dir_attributes = None
                        } in
                      `Done(`nfs3_ok ok)
                  | `Error (Plasma_client.Plasma_error plasma_err) ->
                      `Done(to_nfserr_arg
                              ~retry_on_conflict:true
                              plasma_err
                              { NA.lookup_dir_attributes_fail = None }
                            :> NA.lookup3res)
                  | `Error err ->
                      `Error err
                  | `Aborted ->
                      `Aborted
               )
         )
      )
  with
    | Nfs_error e ->
        let r =
          (add_arg e { NA.lookup_dir_attributes_fail = None }
           :> NA.lookup3res) in
        emit r

(* Generic frame for procedures operating on the file denoted by the
   file handle [fh]: identifies the caller, unwraps the handle, gets the
   inodeinfo in a new transaction, and performs the permission checks
   selected by the flags. Each failed check results in
   [`nfs3err_acces]:
    - [rd]: read permission is required
    - [rd_or_owner]: read permission or ownership is required
    - [wr]: write permission is required

   Note that [rd] is a boolean flag here. The runtime state is called
   [rund] in this function.
 *)
let with_ii ?(rd=false) ?(rd_or_owner=false) ?(wr=false)
            conf sess fh proc_name f_ok f_err f_nfserr emit =
  (* f_ok: is called as
       f_ok trans inode ii
     and must return an engine computing the OK value

     f_err: is called for Plasma errors as
       f_err plasmacode inode_ii_opt
     and must return the emitted value

     f_nfserr: is called for Nfs_error exceptions as
       f_nfserr nfscode inode_ii_opt
     and must return the emitted value. nfscode is w/o argument.

     inode_ii_opt is Some(inode,ii) if the inode lookup was ok.

     In addition to the arguments listed above, f_ok gets the identity
     of the caller as fourth argument.
   *)
  let rund = get_rundata conf in
  try
    let creds = get_rpc_identity rund sess in
    let inode = unwrap_sess_fh sess rund fh in
    Plasma_util.rpc_srv_reply proc_name sess emit
      (transaction_e rund proc_name
         (fun t esys ->
            let inode_ii_opt = ref None in
            Plasma_client.get_inodeinfo_e t inode
            ++ (fun ii ->
                  if rd && not (have_read_permission creds ii) then
                    raise(Nfs_error `nfs3err_acces);
                  if rd_or_owner && (not (have_read_permission creds ii)) &&
                     (not (have_owner_permission creds ii))
                  then
                    raise(Nfs_error `nfs3err_acces);
                  if wr && not (have_write_permission creds ii) then
                    raise(Nfs_error `nfs3err_acces);
                  inode_ii_opt := Some(inode,ii);
                  try
                    let e = f_ok t inode ii creds in
                    e ++ (fun ok -> eps_e (`Done ok) rund.esys)
                  with
                    | error -> eps_e (`Error error) rund.esys
               )
            >> (function
                  | `Done r ->
                      `Done r
                  | `Error (Plasma_client.Plasma_error plasma_err) ->
                      `Done(f_err plasma_err !inode_ii_opt)
                  | `Error (Nfs_error err) ->
                      `Done(f_nfserr err !inode_ii_opt)
                  | `Error err ->
                      `Error err
                  | `Aborted ->
                      `Aborted
               )
         )
      )
  with
    | Nfs_error e ->
        (* from get_rpc_identity or unwrap_sess_fh *)
        let r = f_nfserr e None in
        emit r

let get_ii ?rd ?rd_or_owner ?wr
           conf sess fh proc_name f_ok f_err f_nfserr emit =
  (* Same but f_ok returns the result directly *)
  (* The result of f_ok is wrapped into `nfs3_ok *)
  let rund = get_rundata conf in
  with_ii
    ?rd ?rd_or_owner ?wr conf sess fh proc_name
    (fun t inode ii creds ->
       let r = f_ok t inode ii creds in
       eps_e (`Done (`nfs3_ok r)) rund.esys
    )
    f_err
    f_nfserr
    emit

(* GETATTR: replies with the attributes of the file. The caller needs
   read permission or must be the owner.
 *)
let proc_getattr conf sess fh emit =
  let rd = get_rundata conf in
  get_ii
    ~rd_or_owner:true
    conf sess fh "getattr"
    (fun t inode ii creds ->
       let attr = to_fattr3 rd inode ii in
       { NA.getattr_obj_attributes = attr }
    )
    (fun plasma_err _ ->
       (to_nfserr ~retry_on_conflict:true plasma_err :> NA.getattr3res)
    )
    (fun nfs_err _ ->
       (nfs_err :> NA.getattr3res))
    emit

(* The bits of the ACCESS procedure as OCaml ints *)
let a_read = Rtypes.int_of_uint4 NA.access3_read
let a_lookup = Rtypes.int_of_uint4 NA.access3_lookup
let a_modify = Rtypes.int_of_uint4 NA.access3_modify
let a_extend = Rtypes.int_of_uint4 NA.access3_extend
let a_delete = Rtypes.int_of_uint4 NA.access3_delete
let a_execute = Rtypes.int_of_uint4 NA.access3_execute

(* [grant bits cond] is [bits] if [cond] holds, and 0 otherwise *)
let grant bits cond =
  if cond then bits else 0

(* ACCESS: replies with the subset of the requested access bits [perms]
   that are granted to the caller, together with the file attributes.
   Only the lower six bits of [perms] are considered.
 *)
let proc_access conf sess (fh, perms) emit =
  (* Partially faked: read, modify, extend and execute are checked against
     the file mode; lookup is always granted; delete is never granted
   *)
  let rd = get_rundata conf in
  let pi = Int64.to_int (Int64.logand perms 0x3fL) in
  get_ii
    conf sess fh "access"
    (fun t inode ii creds ->
       let r1 =
         grant a_read
           ((pi land a_read) <> 0 && (have_read_permission creds ii)) in
       let r2 =
         grant a_lookup
           ((pi land a_lookup) <> 0) in   (* we ignore the x bit! *)
       let r3 =
         grant a_modify
           ((pi land a_modify) <> 0 && (have_write_permission creds ii)) in
       let r4 =
         grant a_extend
           ((pi land a_extend) <> 0 && (have_write_permission creds ii)) in
       let r5 =
         grant a_delete false in   (* never granted, see RFC 1813 *)
       let r6 =
         grant a_execute
           ((pi land a_execute) <> 0 && (have_exec_permission creds ii)) in
       let pi' = r1 lor r2 lor r3 lor r4 lor r5 lor r6 in
       let attr = to_fattr3 rd inode ii in
       { NA.access_obj_attributes_ok = Some attr;
         access_rights_ok = Int64.of_int pi'
       }
    )
    (fun plasma_err _ ->
       (to_nfserr_arg
          ~retry_on_conflict:true
          plasma_err
          { NA.access_obj_attributes_fail = None }
        :> NA.access3res)
    )
    (fun nfs_err _ ->
       (add_arg nfs_err { NA.access_obj_attributes_fail = None }
        :> NA.access3res))
    emit

(* READLINK: replies with the contents of [field1] of the inodeinfo if
   the file is a symlink. For other file types the Plasma error `einval
   is raised, which is then converted by the error callback below.
 *)
let proc_readlink conf sess fh emit =
  let rd = get_rundata conf in
  (* No access check here *)
  get_ii
    conf sess fh "readlink"
    (fun t inode ii creds ->
       let attr = to_fattr3 rd inode ii in
       match ii.PA.filetype with
         | `ftype_symlink ->
             { NA.readlink_symlink_attributes_ok = Some attr;
               readlink_data = ii.PA.field1
             }
         | _ ->
             raise(Plasma_client.Plasma_error `einval)
    )
    (fun plasma_err inode_ii_opt ->
       (* Return the attributes with the error if the inode is known *)
       let attr_opt =
         match inode_ii_opt with
           | None -> None
           | Some(inode,ii) -> Some(to_fattr3 rd inode ii) in
       (to_nfserr_arg
          ~retry_on_conflict:true
          plasma_err
          { NA.readlink_symlink_attributes_fail = attr_opt }
        :> NA.readlink3res)
    )
    (fun nfs_err _ ->
       (add_arg nfs_err { NA.readlink_symlink_attributes_fail = None }
        :> NA.readlink3res))
    emit

(* FSSTAT: replies with the filesystem statistics. Only the root file
   handle is accepted; other handles result in `nfs3err_stale.
 *)
let proc_fsstat conf sess fh emit =
  (* fake so far *)
  let rd = get_rundata conf in
  try
    let inode = unwrap_sess_fh sess rd fh in
    if inode <> rd.rootfh then
      raise(Nfs_error `nfs3err_stale);
    with_ii
      conf sess fh "fsstat"
      (fun t inode ii creds ->
         let attr = to_fattr3 rd inode ii in
         Plasma_client.fsstat_e rd.cluster
         ++ (fun st ->
               (* free = total - used - trans_blocks *)
               let free_blocks =
                 Int64.sub
                   (Int64.sub st.PA.total_blocks st.PA.used_blocks)
                   st.PA.trans_blocks in
               let bsizeL = Int64.of_int rd.blocksize in
               (* m is the largest block count that can be multiplied with
                  the blocksize without exceeding Int64.max_int. Larger
                  counts are reported as Int64.max_int bytes.
                *)
               let m = Int64.div Int64.max_int bsizeL in
               let bytes n =
                 if n > m then Int64.max_int else Int64.mul n bsizeL in
               let r =
                 { NA.fsstat_obj_attributes_ok = Some attr;
                   fsstat_tbytes = bytes st.PA.total_blocks;
                   fsstat_fbytes = bytes free_blocks;
                   fsstat_abytes = bytes free_blocks;
                   fsstat_tfiles = Int64.max_int;  (* best guess *)
                   fsstat_ffiles = Int64.max_int;  (* best guess *)
                   fsstat_afiles = Int64.max_int;  (* best guess *)
                   fsstat_invarsec = 0L;
                 } in
               eps_e (`Done (`nfs3_ok r)) rd.esys
            )
      )
      (fun plasma_err inode_ii_opt ->
         let fail = { NA.fsstat_obj_attributes_fail = None } in
         (to_nfserr_arg ~retry_on_conflict:true plasma_err fail
          :> NA.fsstat3res)
      )
      (fun nfs_err inode_ii_opt ->
         let fail = { NA.fsstat_obj_attributes_fail = None } in
         (add_arg nfs_err fail :> NA.fsstat3res))
      emit
  with
    | Nfs_error e ->
        emit (add_arg e { NA.fsstat_obj_attributes_fail = None }
              :> NA.fsstat3res)

(* FSINFO: replies with the static parameters of the filesystem, which
   are derived from the blocksize and from [read_multiple] and
   [write_multiple]. The cluster is not contacted. Only the root file
   handle is accepted; other handles result in `nfs3err_stale.
 *)
let proc_fsinfo conf sess fh emit =
  (* fake so far *)
  let rd = get_rundata conf in
  try
    let _ = get_rpc_identity rd sess in
    let inode = unwrap_sess_fh sess rd fh in
    if inode <> rd.rootfh then
      raise(Nfs_error `nfs3err_stale);
    let r =
      { NA.fsinfo_obj_attributes_ok = None;
        fsinfo_rtmax = Int64.of_int (read_multiple * rd.blocksize);
        fsinfo_rtpref = Int64.of_int rd.blocksize;
        fsinfo_rtmult = Int64.of_int rd.blocksize;
        fsinfo_wtmax = Int64.of_int (write_multiple * rd.blocksize);
        fsinfo_wtpref = Int64.of_int rd.blocksize;
        fsinfo_wtmult = Int64.of_int rd.blocksize;
        fsinfo_dtpref = 32768L;
        fsinfo_maxfilesize = Int64.max_int;
        fsinfo_time_delta = { NA.seconds = 0L; nseconds = 1L };
        fsinfo_properties =
          Int64.logor
            (Int64.logor
               (Int64.logor
                  (Rtypes.int64_of_uint4 NA.fsf3_link)
                  (Rtypes.int64_of_uint4 NA.fsf3_symlink))
               (Rtypes.int64_of_uint4 NA.fsf3_homogeneous))
            (Rtypes.int64_of_uint4 NA.fsf3_cansettime);
      } in
    emit (`nfs3_ok r)
  with
    | Nfs_error e ->
        emit (add_arg e { NA.fsinfo_obj_attributes_fail = None }
              :> NA.fsinfo3res)

(* about wtmult:

   "OTOH wtmult has nothing to do with RPC, and has more to do with the disk organization on the server. As I understand it, in many cases the significance of this value lies in the fact that hardware operations to the disk have a lower limit on the number of bytes that can be read/written. IOW if your write is not aligned to a 'wtmult' boundary, then the server may be forced to read in the remaining data from the disk before it writes the entire block back."

   (Trond Myklebust,
   http://search.luky.org/linux-kernel.2003/msg02857.html)
 *)

(* PATHCONF: replies with fixed values plus the file attributes *)
let proc_pathconf conf sess fh emit =
  let rd = get_rundata conf in
  get_ii
    conf sess fh "pathconf"
    (fun t inode ii creds ->
       let attr = to_fattr3 rd inode ii in
       { NA.pathconf_obj_attributes_ok = Some attr;
         pathconf_linkmax = 32767L;    (* actually unlimited *)
         pathconf_name_max = 32767L;   (* actually unlimited *)
         pathconf_no_trunc = true;
         pathconf_chown_restricted = false;
         pathconf_case_insensitive = false;
         pathconf_case_preserving = true;
       }
    )
    (fun plasma_err _ ->
       (to_nfserr_arg
          ~retry_on_conflict:true
          plasma_err
          { NA.pathconf_obj_attributes_fail = None }
        :> NA.pathconf3res)
    )
    (fun nfs_err _ ->
       (add_arg nfs_err { NA.pathconf_obj_attributes_fail = None }
        :> NA.pathconf3res))
    emit

(*
let memory_string rd size =
  let mlen = Netsys_mem.init_string_bytelen size in
  let mem =
    if mlen <= Netsys_mem.pool_block_size rd.blockpool then
      Netsys_mem.pool_alloc_memory rd.blockpool
    else
      Bigarray.Array1.create Bigarray.char Bigarray.c_layout mlen in
  let voffs,_ = Netsys_mem.init_string mem 0 size in
  let s = (Netsys_mem.as_value mem voffs : string) in
  let m = Bigarray.Array1.sub mem voffs size in
  (* mem: the underlying bigarray
     m: the bigarray which is physically identical with s
     s: the string inside mem
   *)
  (mem, m, s)
 *)

(* [prefix_mstring m p] is an object with the methods of [m] except that
   [length] returns [p]. All other methods are delegated to [m]
   unchanged. [p] must not exceed the length of [m].
 *)
let prefix_mstring (m:Xdr_mstring.mstring) p =
  assert(p <= m#length);
  ( object
      method length = p
      method blit_to_string = m#blit_to_string
      method blit_to_memory = m#blit_to_memory
      method as_string = m#as_string
      method as_memory = m#as_memory
      method preferred = m#preferred
    end
  )

(* READ: reads up to [count] bytes at [offset] from the file, and replies
   with the data, the EOF flag, and the file attributes. The request is
   limited to [read_multiple] blocks. The read is done without
   transaction (see no_transaction_e).
 *)
let proc_read conf sess (fh, offset, count) emit =
  let rd = get_rundata conf in
  try
    let creds = get_rpc_identity rd sess in
    let inode = unwrap_sess_fh sess rd fh in
    (* offset and count are specified as unsigned int64 in the NFS protocol.
       Here, they are passed as signed int64, i.e. negative values are
       large ones.

       If offset is negative, we immediately return 0 and set the EOF flag.
       If count is negative, we replace it with max_int.
     *)
    if offset < 0L then
      emit (`nfs3_ok { NA.read_file_attributes_ok = None;
                       read_count_ok = 0L;
                       read_eof = true;
                       read_data = (Xdr_mstring.memory_based_mstrings
                                      # create_from_string "" 0 0 false
                                   )
                     })
    else (
      let max_count = read_multiple * rd.blocksize in
      let max_countL = Int64.of_int max_count in
      let count =
        if count < 0L then
          max_count
        else
          Int64.to_int (min count max_countL) in
      let full_name = sprintf "read(%Ld,%Ld,%d)" inode offset count in
      (* The buffer is taken from the block pool if the request fits into
         a pool block. Otherwise a new bigarray is allocated.
       *)
      let m =
        if count <= Netsys_mem.pool_block_size rd.blockpool then
          Netsys_mem.pool_alloc_memory rd.blockpool
        else
          Bigarray.Array1.create Bigarray.char Bigarray.c_layout count in
      Plasma_util.rpc_srv_reply full_name sess emit
        (no_transaction_e rd full_name
           (fun esys ->
              ( Plasma_client.read_e
                  ~lazy_validation:true
                  rd.cluster inode offset (`Memory m) 0 count
                ++ (fun (n,eof,ii) ->
                      assert(n >= 0 && n <= count);
                      eps_e (`Done (n,eof,ii)) rd.esys
                   )
              )
              ++ (fun (n, eof, ii) ->
                    assert(n <= count);
                    (* We check here the permissions (AFTER reading the
                       block, but that's a minor issue). Access is granted
                       when the r or x bits allow it (see RFC 1813)
                     *)
                    if not (have_read_permission creds ii) &&
                       not (have_exec_permission creds ii)
                    then
                      raise (Nfs_error `nfs3err_acces);
                    let r =
                      { NA.read_file_attributes_ok = Some(to_fattr3 rd inode ii);
                        read_count_ok = Int64.of_int n;
                        read_eof = eof;
                        read_data = (Xdr_mstring.memory_based_mstrings
                                       # create_from_memory m 0 n false
                                    )
                      } in
                    eps_e (`Done(`nfs3_ok r)) rd.esys
                 )
              >> (function
                    | `Done r ->
                        `Done r
                    | `Error (Plasma_client.Plasma_error plasma_err) ->
                        `Done(to_nfserr_arg
                                ~retry_on_conflict:true
                                plasma_err
                                { NA.read_file_attributes_fail = None }
                              :> NA.read3res)
                    | `Error err ->
                        `Error err
                    | `Aborted ->
                        `Aborted
                 )
           )
        )
    )
  with
    | Nfs_error e ->
        emit (add_arg e { NA.read_file_attributes_fail = None }
              :> NA.read3res)

(* The cookie verifier 0 never denotes a cached directory listing *)
let null_verf = 0L

(* Returns [Some entries] if the listing of directory [inode] with
   verifier [verf] is in the directory cache of the client [saddr], and
   [None] otherwise. A hit updates the time of last access.
 *)
let readdir_from_cache rd saddr inode verf =
  try
    if verf = null_verf then raise Not_found;
    let dircache = Hashtbl.find rd.dircache saddr in
    let (entries, _) = Hashtbl.find dircache (inode, verf) in
    let now = Unix.gettimeofday() in
    Hashtbl.replace dircache (inode, verf) (entries, now);
    Some entries
  with
    | Not_found -> None

let readdir_e rd t saddr inode =
  (** not cached, so we have to query the list *)
  (* The listing is stored in the directory cache of the client [saddr]
     (which is created if missing) under a new verifier. The result of
     the engine is the pair (entries, verifier).
   *)
  Plasma_client.list_inode_e t inode
  ++ (fun entries ->
        let entries = Array.of_list entries in
        let dircache =
          try Hashtbl.find rd.dircache saddr
          with
            | Not_found ->
                let dircache = Hashtbl.create 13 in
                Hashtbl.add rd.dircache saddr dircache;
                dircache in
        (* Start with a random verifier, and count upwards until it is
           neither the null verifier nor already in use for this inode
         *)
        let new_verf = ref (Plasma_rng.random_int64()) in
        while !new_verf = null_verf ||
              Hashtbl.mem dircache (inode,!new_verf)
        do
          new_verf := Int64.succ !new_verf
        done;
        let now = Unix.gettimeofday() in
        Hashtbl.add dircache (inode,!new_verf) (entries, now);
        eps_e (`Done (entries, !new_verf)) rd.esys
     )

let clean_dircache rd saddr =
  (* limit the number of entries stored there *)
  (* TODO: call this function periodically *)
  let n_keep = 4 in
  try
    let dircache = Hashtbl.find rd.dircache saddr in
    let keep = ref [] in
    let k = ref 0 in
    let del = ref [] in
    let now
= Unix.gettimeofday() in
    Hashtbl.iter
      (fun (inode,verf) (_,t) ->
         if now -. t < 60.0 then (
           keep := (inode, verf, t) :: !keep;
           incr k;
           if !k > n_keep then (
             (** remove oldest entry from [keep] *)
             let (o_inode, o_verf, o_t) =
               List.fold_left
                 (fun (o_inode, o_verf, o_t) (inode, verf, t) ->
                    if t < o_t then
                      (inode, verf, t)
                    else
                      (o_inode, o_verf, o_t)
                 )
                 (List.hd !keep)
                 (List.tl !keep) in
             keep :=
               List.filter
                 (fun (inode, verf, _) ->
                    inode <> o_inode || verf <> o_verf)
                 !keep;
             del := (o_inode, o_verf) :: !del;
             decr k
           )
         )
         else
           del := (inode, verf) :: !del
      )
      dircache;
    List.iter
      (fun (inode, verf) ->
         Hashtbl.remove dircache (inode,verf)
      )
      !del
  with
    | Not_found -> ()

(** [readdir_response count entries k_start verf] builds the successful
    READDIR result.

    - [count]: the maximum size of the encoded result in bytes, as sent
      by the client (unsigned on the wire, so a negative value means
      "very large" and is treated as [max_int])
    - [entries]: the complete listing as [(name, inode)] pairs
    - [k_start]: index of the first entry to return. The cookie of the
      entry at index [k] is [k+1], i.e. the index to continue with
    - [verf]: the cookie verifier to put into the result

    As many entries as fit into [count] bytes are returned. The EOF flag
    is set iff the last entry of [entries] has been reached. If not even
    the first entry fits, the entry list is empty and EOF is false. *)
let readdir_response count entries k_start verf =
  let count =
    if count < 0L then
      max_int
    else
      Int64.to_int (min (Int64.of_int max_int) count) in
  let nfs_entry k (name,inode) =
    let ne =
      { NA.entry_fileid = inode;
        entry_name = name;
        entry_cookie = Int64.of_int (k+1);
        entry_nextentry = None
      } in
    (* XDR encodes a string as a 4 byte length word followed by the data
       padded to a multiple of 4 bytes. (The previous formula added the
       number of 4 byte words instead of the number of bytes, and hence
       underestimated the size, so that replies could exceed [count].)
     *)
    let padded_len = 4 * ((String.length name + 3) / 4) in
    let name_size = 4 + padded_len in
    (* 20: remaining stuff in entry3, i.e. fileid (8), cookie (8), and
       the discriminator of the nextentry pointer (4) *)
    let ne_size = name_size + 20 in
    (ne, ne_size) in
  let rec nfs_entries k xdr_size =
    if k >= Array.length entries then
      (None, true)
    else (
      let (ne, ne_size) = nfs_entry k entries.(k) in
      if count - ne_size >= xdr_size then (
        let (l', eof) = nfs_entries (k+1) (xdr_size + ne_size) in
        (Some { ne with NA.entry_nextentry = l' }, eof)
      )
      else
        (None, false)
    ) in
  (* Fixed overhead: dirlist3 has the entries discriminator (4) and the
     eof flag (4); the resok part has the attribute discriminator (4)
     and the cookie verifier (8) *)
  let dirlist3_size = 8 in
  let resok_size = 12 in
  let l, eof = nfs_entries k_start (dirlist3_size + resok_size) in
  { NA.readdir_dir_attributes_ok = None;
    readdir_cookieverf_ok = verf;
    readdir_reply = { NA.dl_entries = l; dl_eof = eof }
  }

let proc_readdir_common conf sess (fh, cookie, verf_s) f_ok emit =
  let rd = get_rundata conf in
  let saddr = peer sess in
  let verf = Rtypes.int64_of_int8 (Rtypes.read_int8 verf_s 0) in
  let fail = { NA.readdir_dir_attributes_fail = None } in
  try
    with_ii ~rd:true conf sess fh "readdir"
      (fun t inode ii creds ->
         (** First check whether we can respond from dircache: *)
let reply_e r =
           (* No entry and no EOF means that the count of the client is
              too small for even a single entry. Both the cached and the
              uncached path report this as TOOSMALL (formerly only the
              uncached path did, and a cache hit produced an ok reply
              with an empty list that was not marked as EOF). *)
           let rr = r.NA.readdir_reply in
           if rr.NA.dl_entries = None && not rr.NA.dl_eof then
             eps_e (`Done(`nfs3err_toosmall fail)) rd.esys
           else
             eps_e (`Done(`nfs3_ok r)) rd.esys in
         match readdir_from_cache rd saddr inode verf with
           | Some entries ->
               (** Found in the cache. The [cookie] is simply interpreted
                   as the index in [names] where to continue
                *)
               if cookie < 0L || cookie >= Int64.of_int max_int then
                 raise(Nfs_error `nfs3err_bad_cookie);
               let cookie = Int64.to_int cookie in
               if cookie > Array.length entries then
                 raise(Nfs_error `nfs3err_bad_cookie);
               clean_dircache rd saddr;
               reply_e (f_ok entries cookie verf_s)
           | None ->
               if cookie <> 0L && verf <> 0L then
                 raise (Nfs_error `nfs3err_bad_cookie);
               (* Linux does not interpret BAD_COOKIE nicely (i.e. does
                  not try again with cookie=0 & verf=0), so we have to
                  work around this. One important case seems to be
                  cookie=0 & verf <> 0. We interpret this as wish to
                  restart the read from the beginning of the directory.
                *)
               readdir_e rd t saddr inode
               ++ (fun (entries, new_verf) ->
                     let new_verf_s =
                       Rtypes.int8_as_string
                         (Rtypes.int8_of_int64 new_verf) in
                     reply_e (f_ok entries 0 new_verf_s)
                  )
      )
      (fun plasma_err ii_opt ->
         (to_nfserr_arg ~retry_on_conflict:true plasma_err fail
          :> NA.readdir3res)
      )
      (fun nfs_err ii_opt ->
         (add_arg nfs_err fail :> NA.readdir3res)
      )
      emit
  with
    | Nfs_error e ->
        emit (add_arg e { NA.readdir_dir_attributes_fail = None }
              :>NA.readdir3res)

(** READDIR: the reply is limited to [count] bytes by
    [readdir_response]. *)
let proc_readdir conf sess (fh, cookie, verf_s, count) emit =
  proc_readdir_common
    conf sess (fh, cookie, verf_s) (readdir_response count) emit

(** READDIRPLUS is not implemented; the reply is always NOTSUPP. *)
let proc_readdirplus conf sess (fh, cookie, verf, dircount, maxcount) emit =
  (** In the RFC [dircount] is unspecified. Obviously, it refers to
      some buffer size in the Solaris implementation.

      We simply return NOTSUPP so clients fall back to readdir.
   *)
  let fail = { NA.readdirp_dir_attributes_fail = None } in
  emit (`nfs3err_notsupp fail)

(* --- write calls --- *)

(** Builds the wcc data for a failed operation from the optional
    [(inode, ii)] pair of the object. *)
let failed_wcc_data rd inode_ii_opt =
  (* This assumes the operation did not perform any modification, so
     we can return ii as [after]
   *)
  match inode_ii_opt with
    | None ->
        { NA.before = None;
          NA.after = None;
        }
    | Some(inode,ii) ->
        { NA.before = Some (to_wcc_attr inode ii);
          NA.after = Some (to_fattr3 rd inode ii);
        }

(** [apply_sattr rd ii sattr] returns [ii] updated with the fields that
    are set in [sattr]: owner, group, mode (only the lower 9 bits),
    size, and mtime. Raises [Nfs_error `nfs3err_inval] if the requested
    size is negative. *)
let apply_sattr rd ii sattr =
  let new_ug =
    { PA.user =
        (match sattr.NA.sa_uid with
           | None -> ii.PA.usergroup.PA.user
           | Some id -> to_uname rd id
        );
      PA.group =
        (match sattr.NA.sa_gid with
           | None -> ii.PA.usergroup.PA.group
           | Some id -> to_gname rd id
        );
    } in
  { ii with
      PA.mode =
        (match sattr.NA.sa_mode with
           | None -> ii.PA.mode
           | Some m -> (Int64.to_int m) land 0o777
        );
      PA.usergroup = new_ug;
      PA.eof =
        (match sattr.NA.sa_size with
           | None -> ii.PA.eof
           | Some s ->
               if s < 0L then raise (Nfs_error `nfs3err_inval);
               s
        );
      (* No way to change atime *)
      PA.mtime =
        (match sattr.NA.sa_mtime with
           | `dont_change -> ii.PA.mtime
           | `set_to_server_time -> { PA.tsecs = (-1L); tnsecs = 0 }
           | `set_to_client_time t -> of_nfstime t
        );
  }

(* setattr strangeness: if there is a pending write, the EOF value in ii
   is not yet updated. For this reason we fix all ii structs returned to
   the user by checking whether there is a pending write that might have
   changed EOF (in to_fattr3). If the user calls setattr, though, the ii
   struct is directly changed, but the pending write will still happen.
   This appears to the user as if the setattr had no effect (at least if
   the file is truncated). For this reason we flush here all pending
   writes before doing the setattr.
*)

(** SETATTR: checks the permissions and the optional ctime guard,
    flushes pending writes, stores the modified inodeinfo, truncates the
    file when the new size is smaller than the old one, and finally
    replies with the re-read attributes. *)
let proc_setattr conf sess (fh, sattr, guard) emit =
  let rd = get_rundata conf in
  let wcc_fail inode_ii_opt =
    { NA.setattr_obj_wcc_fail = failed_wcc_data rd inode_ii_opt } in
  with_ii conf sess fh "setattr"
    (fun t inode ii creds ->
       check_setattr_permission rd creds sattr ii;
       (* The guard, if any, must be equal to the current ctime *)
       ( match guard with
           | None ->
               ()
           | Some guard_ctime ->
               if guard_ctime <> to_nfstime ii.PA.ctime then
                 raise (Nfs_error `nfs3err_not_sync)
       );
       Plasma_client.flush_e rd.cluster inode 0L 0L
         (* see comment above *)
       ++ (fun () ->
             let new_ii = apply_sattr rd ii sattr in
             Plasma_client.set_inodeinfo_e t inode new_ii
             ++ (fun () ->
                   ( if new_ii.PA.eof < ii.PA.eof then
                       Plasma_client.truncate_e t inode new_ii.PA.eof
                     else
                       eps_e (`Done ()) rd.esys
                   )
                   ++ (fun () ->
                         Plasma_client.get_inodeinfo_e t inode)
                   ++ (fun real_ii ->
                         let wcc =
                           { NA.before = Some (to_wcc_attr inode ii);
                             after = Some (to_fattr3 rd inode real_ii)
                           } in
                         let r =
                           `nfs3_ok { NA.setattr_obj_wcc_ok = wcc } in
                         eps_e (`Done r) rd.esys
                      )
                )
          )
    )
    (fun plasma_err inode_ii_opt ->
       (to_nfserr_arg
          ~retry_on_conflict:true plasma_err (wcc_fail inode_ii_opt)
        :> NA.setattr3res)
    )
    (fun nfs_err inode_ii_opt ->
       (add_arg nfs_err (wcc_fail inode_ii_opt) :> NA.setattr3res)
    )
    emit

let asserted_lookup rd t dir_inode file_name =
  (* Looks up the inode and ii at this path.
Any error is translated into [`Error Retry] (transaction will restart) *) Uq_engines.meta_engine (Plasma_client.dir_lookup_e t dir_inode file_name true ++ (fun file_inode -> Plasma_client.get_inodeinfo_e t file_inode ++ (fun file_ii -> eps_e (`Done (file_ii,file_inode)) rd.esys ) ) ) >> (fun st -> match st with | `Done r -> r | `Error _ -> `Error Retry | `Aborted -> `Aborted ) let proc_create conf sess (name_in_dir, how) emit = let rd = get_rundata conf in with_ii ~wr:true conf sess name_in_dir.NA.dirop_dir "create" (fun t dir_inode dir_ii creds -> let (user,group,_) = creds in let ug = { PA.user = user; group = group } in (* Check the validity of the new name *) let file_name = name_in_dir.NA.dirop_name in if (file_name = "" || String.contains file_name '/' || file_name = "." || file_name = "..") then raise(Nfs_error `nfs3err_inval); let file_ii = match how with | `unchecked sattr | `guarded sattr -> let ii = Plasma_client.regular_ii rd.cluster 0 in let ii' = { ii with PA.usergroup = ug } in check_setattr_permission rd creds sattr ii'; apply_sattr rd ii' sattr | `exclusive verf -> let ii = Plasma_client.regular_ii rd.cluster 0 in let ii' = { ii with PA.usergroup = ug } in { ii' with PA.create_verifier = of_create_verf verf } in Plasma_client.create_inode_e t file_ii ++ (fun file_inode -> Uq_engines.meta_engine (Plasma_client.link_at_e t dir_inode file_name file_inode) ++ (fun st -> (* We check here for EEXIST errors. *) match st with | `Done () -> (* Get the ii struct *) Plasma_client.get_inodeinfo_e t file_inode ++ (fun real_file_ii -> eps_e (`Done (real_file_ii, file_inode)) rd.esys ) | `Error (Plasma_client.Plasma_error `eexist) as exerr -> (* If [how=unchecked] we have to hide this error. For [how=guarded] we can pass this error back. 
For [how=exclusive] we have to check the identity of the file *) ( match how with | `unchecked attr -> (* We hide the error even if the file is a directory or symlink *) asserted_lookup rd t dir_inode file_name | `guarded attr -> eps_e exerr rd.esys | `exclusive verf -> asserted_lookup rd t dir_inode file_name ++ (fun (ii,inode) -> let ok = ii.PA.create_verifier = of_create_verf verf in if ok then eps_e (`Done(ii,inode)) rd.esys else eps_e exerr rd.esys ) ) | (`Error _ | `Aborted) as e -> eps_e (e :> _ Uq_engines.engine_state) rd.esys ) ++ (fun (real_file_ii,file_inode) -> let r = `nfs3_ok { NA.create_obj = Some(wrap_sess_fh sess rd file_inode); NA.create_obj_attributes = Some (to_fattr3 rd file_inode real_file_ii); NA.create_dir_wcc_ok = { NA.before = Some (to_wcc_attr dir_inode dir_ii); after = None; (* don't have this at hand *) } } in eps_e (`Done r) rd.esys ) ) ) (fun plasma_err inode_ii_opt -> let fail = { NA.create_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in (to_nfserr_arg ~retry_on_conflict:true plasma_err fail :> NA.create3res) ) (fun nfs_err inode_ii_opt -> let fail = { NA.create_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in (add_arg nfs_err fail :> NA.create3res)) emit let proc_mkdir conf sess (name_in_dir, sattr) emit = let rd = get_rundata conf in with_ii ~wr:true conf sess name_in_dir.NA.dirop_dir "mkdir" (fun t dir_inode dir_ii creds -> let (user,group,_) = creds in let ug = { PA.user = user; group = group } in (* Check the validity of the new name *) let file_name = name_in_dir.NA.dirop_name in if (file_name = "" || String.contains file_name '/' || file_name = "." 
|| file_name = "..") then raise(Nfs_error `nfs3err_inval); let file_ii = Plasma_client.dir_ii rd.cluster 0o777 in let file_ii' = { file_ii with PA.usergroup = ug } in check_setattr_permission rd creds sattr file_ii'; let file_ii'' = apply_sattr rd file_ii' sattr in Plasma_client.create_inode_e t file_ii'' ++ (fun file_inode -> Plasma_client.link_at_e t dir_inode file_name file_inode ++ (fun () -> (* Get the ii struct *) Plasma_client.get_inodeinfo_e t file_inode ++ (fun real_file_ii -> eps_e (`Done (real_file_ii, file_inode)) rd.esys ) ) ++ (fun (real_file_ii,file_inode) -> let r = `nfs3_ok { NA.mkdir_obj = Some(wrap_sess_fh sess rd file_inode); NA.mkdir_obj_attributes = Some (to_fattr3 rd file_inode real_file_ii); NA.mkdir_dir_wcc_ok = { NA.before = Some (to_wcc_attr dir_inode dir_ii); after = None; (* don't have this at hand *) } } in eps_e (`Done r) rd.esys ) ) ) (fun plasma_err inode_ii_opt -> let fail = { NA.mkdir_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in (to_nfserr_arg ~retry_on_conflict:true plasma_err fail :> NA.mkdir3res) ) (fun nfs_err inode_ii_opt -> let fail = { NA.mkdir_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in (add_arg nfs_err fail :> NA.mkdir3res)) emit let proc_symlink conf sess (name_in_dir, data) emit = let rd = get_rundata conf in with_ii ~wr:true conf sess name_in_dir.NA.dirop_dir "symlink" (fun t dir_inode dir_ii creds -> let (user,group,_) = creds in let ug = { PA.user = user; group = group } in (* Check the validity of the new name *) let file_name = name_in_dir.NA.dirop_name in if (file_name = "" || String.contains file_name '/' || file_name = "." 
|| file_name = "..")
       then
         raise(Nfs_error `nfs3err_inval);
       let target = data.NA.symlink_path in
       let sattr = data.NA.symlink_attributes in
       let file_ii = Plasma_client.symlink_ii rd.cluster target in
       let file_ii' = { file_ii with PA.usergroup = ug } in
       check_setattr_permission rd creds sattr file_ii';
       let file_ii'' = apply_sattr rd file_ii' sattr in
       Plasma_client.create_inode_e t file_ii''
       ++ (fun file_inode ->
             Plasma_client.link_at_e t dir_inode file_name file_inode
             ++ (fun () ->
                   (* Get the ii struct *)
                   Plasma_client.get_inodeinfo_e t file_inode
                   ++ (fun real_file_ii ->
                         eps_e (`Done (real_file_ii, file_inode)) rd.esys
                      )
                )
             ++ (fun (real_file_ii,file_inode) ->
                   let r =
                     `nfs3_ok
                       { NA.symlink_obj =
                           Some(wrap_sess_fh sess rd file_inode);
                         NA.symlink_obj_attributes =
                           Some (to_fattr3 rd file_inode real_file_ii);
                         NA.symlink_dir_wcc_ok =
                           { NA.before =
                               Some (to_wcc_attr dir_inode dir_ii);
                             after = None;
                               (* don't have this at hand *)
                           }
                       } in
                   eps_e (`Done r) rd.esys
                )
          )
    )
    (fun plasma_err inode_ii_opt ->
       let fail =
         { NA.symlink_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
       (to_nfserr_arg ~retry_on_conflict:true plasma_err fail
        :> NA.symlink3res)
    )
    (fun nfs_err inode_ii_opt ->
       let fail =
         { NA.symlink_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
       (add_arg nfs_err fail :> NA.symlink3res))
    emit

(** Raises [Nfs_error `nfs3err_perm] unless the user of the RPC session
    is the owner recorded in [ii]. *)
let real_ownership_check rd sess ii =
  let (caller, _, _) = get_rpc_identity rd sess in
  let owner = ii.PA.usergroup.PA.user in
  if caller <> owner then
    raise (Nfs_error `nfs3err_perm)

(** Used when the sticky bit is set in the file mode of the directory:
    the file must then be owned by the caller to continue. If [do_it]
    is false, nothing is checked. Otherwise the name is looked up in
    the directory (after acquiring a namelock unless [namelock] is
    false), and the owner of the found inode is checked. *)
let check_ownership_e ?(namelock=true) sess rd t dir_inode file_name do_it =
  if not do_it then
    eps_e (`Done ()) rd.esys
  else (
    let lock_e =
      if namelock then
        Plasma_client.namelock_e t dir_inode file_name
      else
        eps_e (`Done ()) rd.esys in
    lock_e
    ++ (fun () ->
          Plasma_client.dir_lookup_e t dir_inode file_name true
       )
    ++ (fun inode ->
          Plasma_client.get_inodeinfo_e t inode
       )
    ++ (fun ii ->
          real_ownership_check rd sess ii;
          eps_e (`Done ()) rd.esys
       )
  )

(** REMOVE: unlinks [name_in_dir]. If the directory has the sticky bit
    (0o1000) set, the entry must be owned by the caller. *)
let proc_remove conf sess name_in_dir emit =
  (* We also allow the removal of directories (the NFS spec supports
     this)
   *)
  let rd = get_rundata conf in
  let dir_wcc_fail inode_ii_opt =
    { NA.remove_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
  with_ii ~wr:true conf sess name_in_dir.NA.dirop_dir "remove"
    (fun t dir_inode dir_ii creds ->
       (* Check the validity of the name *)
       let file_name = name_in_dir.NA.dirop_name in
       if (file_name = "" || String.contains file_name '/' ||
             file_name = ".")
       then
         raise (Nfs_error `nfs3err_inval);
       if file_name = ".." then
         raise (Nfs_error `nfs3err_exist);   (* mentioned in the spec *)
       let sticky = (dir_ii.PA.mode land 0o1000) <> 0 in
       check_ownership_e sess rd t dir_inode file_name sticky
       ++ (fun () ->
             Plasma_client.unlink_at_e t dir_inode file_name)
       ++ (fun () ->
             let wcc =
               { NA.before = Some (to_wcc_attr dir_inode dir_ii);
                 after = None;   (* don't have this at hand *)
               } in
             eps_e (`Done (`nfs3_ok { NA.remove_dir_wcc_ok = wcc }))
               rd.esys
          )
    )
    (fun plasma_err inode_ii_opt ->
       (to_nfserr_arg
          ~retry_on_conflict:true plasma_err (dir_wcc_fail inode_ii_opt)
        :> NA.remove3res)
    )
    (fun nfs_err inode_ii_opt ->
       (add_arg nfs_err (dir_wcc_fail inode_ii_opt) :> NA.remove3res)
    )
    emit

let proc_rmdir conf sess name_in_dir emit =
  let rd = get_rundata conf in
  with_ii ~wr:true conf sess name_in_dir.NA.dirop_dir "rmdir"
    (fun t dir_inode dir_ii creds ->
       (* Check the validity of the name *)
       let file_name = name_in_dir.NA.dirop_name in
       if (file_name = "" || String.contains file_name '/' ||
             file_name = ".")
       then
         raise(Nfs_error `nfs3err_inval);
       if (file_name = "..") then
         raise(Nfs_error `nfs3err_exist);
       (* mentioned in the spec *)
Plasma_client.namelock_e t dir_inode file_name
       ++ (fun () ->
             Plasma_client.dir_lookup_e t dir_inode file_name true)
       ++ (fun file_inode ->
             Plasma_client.get_inodeinfo_e t file_inode
          )
       ++ (fun ii ->
             if ii.PA.filetype = `ftype_directory then (
               if (dir_ii.PA.mode land 0o1000) <> 0 then
                 real_ownership_check rd sess ii;
               Plasma_client.unlink_at_e t dir_inode file_name
               ++ (fun () ->
                     let r =
                       `nfs3_ok
                         { NA.rmdir_dir_wcc_ok =
                             { NA.before =
                                 Some (to_wcc_attr dir_inode dir_ii);
                               after = None;
                                 (* don't have this at hand *)
                             }
                         } in
                     eps_e (`Done r) rd.esys
                  )
             )
             else
               eps_e (`Error (Nfs_error `nfs3err_notdir)) rd.esys
          )
    )
    (fun plasma_err inode_ii_opt ->
       let fail =
         { NA.rmdir_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
       (to_nfserr_arg ~retry_on_conflict:true plasma_err fail
        :> NA.rmdir3res)
    )
    (fun nfs_err inode_ii_opt ->
       let fail =
         { NA.rmdir_dir_wcc_fail = failed_wcc_data rd inode_ii_opt } in
       (add_arg nfs_err fail :> NA.rmdir3res))
    emit

(** The successful RENAME result; no wcc data is reported. *)
let rename_success() =
  let no_wcc = { NA.before = None; after = None } in
  `nfs3_ok
    { NA.rename_fromdir_wcc_ok = no_wcc;
      NA.rename_todir_wcc_ok = { NA.before = None; after = None };
    }

(** Unlinks the destination name and then performs the rename. If the
    unlink fails with ENOTEMPTY, the error NFS3ERR_EXIST is reported;
    other errors are passed through. *)
let do_unlink_then_rename_e rd t dir1_inode file_name1 dir2_inode file_name2 =
  Uq_engines.meta_engine
    (Plasma_client.unlink_at_e t dir2_inode file_name2)
  ++ (function
       | `Done () ->
           Plasma_client.rename_at_e
             t dir1_inode file_name1 dir2_inode file_name2
           ++ (fun () ->
                 eps_e (`Done (rename_success())) rd.esys
              )
       | `Error (Plasma_client.Plasma_error `enotempty) ->
           eps_e (`Error (Nfs_error `nfs3err_exist)) rd.esys
       | `Error e ->
           eps_e (`Error e) rd.esys
       | `Aborted ->
           eps_e `Aborted rd.esys
     )

(** Renames [file_name1] in [dir1_inode] to [file_name2] in
    [dir2_inode]. The rename is first tried directly. If the
    destination exists, it is replaced provided that source and
    destination are either both directories or both non-directories. *)
let do_rename_e rd t dir1_inode file_name1 dir2_inode file_name2 =
  let is_dir ii = ii.PA.filetype = `ftype_directory in
  (* Speculatively try the rename. This may run into EEXIST, though. *)
  Uq_engines.meta_engine
    (Plasma_client.rename_at_e
       t dir1_inode file_name1 dir2_inode file_name2)
  ++ (function
       (* It is now possible that we got EEXIST. NFS, however, allows
          to rename into an existing file in some circumstances.
        *)
       | `Done () ->
           (* Success *)
           eps_e (`Done (rename_success())) rd.esys
       | `Error (Plasma_client.Plasma_error `eexist) ->
           (* the EEXIST case. This is very complicated. First get a
              namelock on the destination which is supposed to exist
              now.
            *)
           Uq_engines.meta_engine
             (Plasma_client.namelock_e t dir2_inode file_name2)
           ++ (function
                | `Done () ->
                    (* Now check whether it would be allowed to do
                       unlink the destination and then try the rename
                       again
                     *)
                    Plasma_client.dir_lookup_e
                      t dir1_inode file_name1 true
                    ++ (fun inode1 ->
                          Plasma_client.get_inodeinfo_e t inode1)
                    ++ (fun ii1 ->
                          Plasma_client.dir_lookup_e
                            t dir2_inode file_name2 true
                          ++ (fun inode2 ->
                                Plasma_client.get_inodeinfo_e t inode2)
                          ++ (fun ii2 ->
                                (* both directories, or both
                                   non-directories *)
                                if is_dir ii1 = is_dir ii2 then
                                  do_unlink_then_rename_e
                                    rd t
                                    dir1_inode file_name1
                                    dir2_inode file_name2
                                else
                                  eps_e
                                    (`Error (Nfs_error `nfs3err_exist))
                                    rd.esys
                             )
                       )
                | `Error (Plasma_client.Plasma_error `enoent) ->
                    (* The only explanation is that another transaction
                       just finished an unlink. We report this as
                       conflict.
                     *)
                    eps_e (`Error Retry) rd.esys
                | `Error e ->
                    eps_e (`Error e) rd.esys
                | `Aborted ->
                    eps_e `Aborted rd.esys
              )
       | `Error e ->
           eps_e (`Error e) rd.esys
       | `Aborted ->
           eps_e `Aborted rd.esys
     )

let proc_rename conf sess (name1_in_dir,name2_in_dir) emit =
  let rd = get_rundata conf in
  with_ii ~wr:true conf sess name1_in_dir.NA.dirop_dir "rename"
    (fun t dir1_inode dir1_ii creds ->
       (* Check the validity of the first name *)
       let file_name1 = name1_in_dir.NA.dirop_name in
       if (file_name1 = "" || String.contains file_name1 '/' ||
             file_name1 = ".")
       then
         raise(Nfs_error `nfs3err_inval);
       (* Check the validity of the second name *)
       let file_name2 = name2_in_dir.NA.dirop_name in
       if (file_name2 = "" || String.contains file_name2 '/') then
         raise(Nfs_error `nfs3err_inval);
       let dir2_inode =
         unwrap_sess_fh sess rd name2_in_dir.NA.dirop_dir in
       Plasma_client.get_inodeinfo_e t dir2_inode
       ++ (fun dir2_ii ->
             if not(have_write_permission creds dir2_ii) then
               raise(Nfs_error `nfs3err_acces);
             (* Get a lock for the source name.
This is required because the following is non-atomic and we want to prevent that a competing writer replaces the source in the meantime *) Plasma_client.namelock_e t dir1_inode file_name1 ++ (fun () -> check_ownership_e ~namelock:false sess rd t dir1_inode file_name1 ((dir1_ii.PA.mode land 0o1000) <> 0)) ++ (fun () -> do_rename_e rd t dir1_inode file_name1 dir2_inode file_name2 ) ) ) (fun plasma_err inode_ii_opt -> let fail = { NA.rename_fromdir_wcc_fail = failed_wcc_data rd inode_ii_opt; NA.rename_todir_wcc_fail = { NA.before = None; after = None }; } in (to_nfserr_arg ~retry_on_conflict:true plasma_err fail :> NA.rename3res) ) (fun nfs_err inode_ii_opt -> let fail = { NA.rename_fromdir_wcc_fail = failed_wcc_data rd inode_ii_opt; NA.rename_todir_wcc_fail = { NA.before = None; after = None }; } in (add_arg nfs_err fail :> NA.rename3res)) emit let proc_link conf sess (fh, name_in_dir) emit = let rd = get_rundata conf in with_ii ~wr:true conf sess name_in_dir.NA.dirop_dir "link" (fun t dir_inode dir_ii creds -> (* Check the validity of the name *) let file_name = name_in_dir.NA.dirop_name in if (file_name = "" || String.contains file_name '/') then raise(Nfs_error `nfs3err_inval); if (file_name = "." 
|| file_name = "..") then raise(Nfs_error `nfs3err_exist); let file_inode = unwrap_sess_fh sess rd fh in Plasma_client.link_at_e t dir_inode file_name file_inode ++ (fun () -> let r = `nfs3_ok { NA.link_file_attributes_ok = None; linkdir_wcc_ok = { NA.before = Some (to_wcc_attr dir_inode dir_ii); after = None; (* don't have this at hand *) } } in eps_e (`Done r) rd.esys ) ) (fun plasma_err inode_ii_opt -> let fail = { NA.link_file_attributes_fail = None; NA.linkdir_wcc_fail = failed_wcc_data rd inode_ii_opt } in (to_nfserr_arg ~retry_on_conflict:true plasma_err fail :> NA.link3res) ) (fun nfs_err inode_ii_opt -> let fail = { NA.link_file_attributes_fail = None; NA.linkdir_wcc_fail = failed_wcc_data rd inode_ii_opt } in (add_arg nfs_err fail :> NA.link3res)) emit let proc_write conf sess (fh, offset, count, stable, data) emit = let rd = get_rundata conf in try let creds = get_rpc_identity rd sess in let inode = unwrap_sess_fh sess rd fh in (* offset and count are specified as unsigned int64 in the NFS protocol. Here, they are passed as signed int64, i.e. negative values are large ones. If offset is negative, we return EFBIG. *) if offset < 0L then raise (Nfs_error `nfs3err_fbig); if count < 0L || count > Int64.of_int data#length then raise (Nfs_error `nfs3err_inval); let count = Int64.to_int count in let full_name = sprintf "write(%Ld,%Ld,%d,%s%s)" inode offset count (match stable with | `unstable -> "unstable" | `data_sync -> "data_sync" | `file_sync -> "file_sync" ) (if Int64.rem offset (Int64.of_int rd.blocksize) <> 0L then ",misaligned" else "" ) in (* Plasma_client.write does not set ii.eof and ii.mtime to the new values. We cope with that by querying for the designated new values and fixing the attributes before they are returned to the user (in to_fattr3). 
*) let return_e n committed = (* Get the cached ii value for the "after" wcc struct *) Plasma_client.get_cached_inodeinfo_e rd.cluster inode false ++ (fun ii -> dlogf "%s: after_get_ii" full_name; let r = `nfs3_ok { NA.write_file_wcc_ok = { NA.before = None; after = Some (to_fattr3 rd inode ii); }; write_count_ok = Int64.of_int n; write_committed = committed; write_verf = rd.writeverf; } in eps_e (`Done r) rd.esys ) in (* We need not to care about ECONFLICT - none of these Plasma_client functions will return it *) Plasma_util.rpc_srv_reply full_name sess emit (no_transaction_e rd full_name (fun esys -> dlogf "%s: before_write" full_name; let s, s_offs = data#as_string in let rec wr_loop_e k = if k < count then let kL = Int64.of_int k in Plasma_client.write_e rd.cluster inode (Int64.add offset kL) (`String s) (s_offs+k) (count-k) ++ (fun n -> wr_loop_e (k+n) ) else ( dlogf "%s: after_write" full_name; match stable with | `unstable -> return_e count `unstable | `data_sync | `file_sync -> Plasma_client.flush_e rd.cluster inode offset (Int64.of_int count) ++ (fun () -> dlogf "%s: after_flush" full_name; return_e count `file_sync) ) in Plasma_client.get_cached_inodeinfo_e rd.cluster inode false ++ (fun ii -> if not(have_write_permission creds ii) then raise(Nfs_error `nfs3err_acces); (* Note that ii may be slightly out of date. We do not consider this as a problem, because updates become immediately visible that are done via NFS. Updates from other clients can be invisible for a short period of time, though. *) wr_loop_e 0 ) ) ) with | Nfs_error e -> emit (add_arg e { NA.write_file_wcc_fail = failed_wcc_data rd None } :> NA.write3res) let proc_commit conf sess (fh, offset, count) emit = let rd = get_rundata conf in try let inode = unwrap_sess_fh sess rd fh in let _creds = get_rpc_identity rd sess in (* For simplicity we ignore offset and count (this is not yet implemented in Plasma_client anyway) Also, ignore file permissions. 
*) let return_e () = (* Get the cached ii value for the "after" wcc struct *) Plasma_client.get_cached_inodeinfo_e rd.cluster inode false ++ (fun ii -> let r = `nfs3_ok { NA.commit_file_wcc_ok = { NA.before = None; after = Some (to_fattr3 rd inode ii); }; commit_verf = rd.writeverf; } in eps_e (`Done r) rd.esys ) in (* We need not to care about ECONFLICT - none of these Plasma_client functions will return it *) Plasma_util.rpc_srv_reply "commit" sess emit (no_transaction_e rd "commit" (fun esys -> Plasma_client.flush_e rd.cluster inode 0L 0L ++ (fun () -> return_e () ) ) ) with | Nfs_error e -> emit (add_arg e { NA.commit_file_wcc_fail = failed_wcc_data rd None } :> NA.commit3res) let proc_mknod conf sess (name_in_dir, data) emit = let fail = { NA.mknod_dir_wcc_fail = { NA.before = None; after = None } } in emit (`nfs3err_notsupp fail) let setup_nfs srv conf = Nfs3_srv.NFS.V3.bind_async ~proc_null:(fun sess () emit -> emit()) ~proc_lookup:(proc_lookup conf) ~proc_getattr:(proc_getattr conf) ~proc_access:(proc_access conf) ~proc_readlink:(proc_readlink conf) ~proc_read:(proc_read conf) ~proc_readdir:(proc_readdir conf) ~proc_readdirplus:(proc_readdirplus conf) ~proc_fsstat:(proc_fsstat conf) ~proc_fsinfo:(proc_fsinfo conf) ~proc_pathconf:(proc_pathconf conf) ~proc_setattr:(proc_setattr conf) ~proc_write:(proc_write conf) ~proc_create:(proc_create conf) ~proc_mkdir:(proc_mkdir conf) ~proc_symlink:(proc_symlink conf) ~proc_remove:(proc_remove conf) ~proc_rmdir:(proc_rmdir conf) ~proc_rename:(proc_rename conf) ~proc_link:(proc_link conf) ~proc_commit:(proc_commit conf) ~proc_mknod:(proc_mknod conf) srv; let m_none = Rpc_server.auth_none in let m_sys = Rpc_auth_sys.server_auth_method ~lookup_hostname:false ~require_privileged_port:true () in Rpc_server.set_auth_methods srv [m_none;m_sys] (* MOUNT *) let proc_mnt conf sess path emit = let rd = get_rundata conf in let saddr = peer sess in let r = if path <> "/" ^ conf.clustername then `mnt3err_noent else ( let secret = 
try Hashtbl.find rd.clients saddr with | Not_found -> let secret = Plasma_rng.random_bytes 8 in Hashtbl.add rd.clients saddr secret; write_mount_table rd; secret in let fh = wrap_fh secret rd.rootfh in `mnt3_ok { NA.fhandle = fh; auth_flavors = [| 1; (* AUTH_SYS *) |] } ) in emit r let name_of_saddr saddr = match saddr with | Unix.ADDR_INET(ip,_) -> Unix.string_of_inet_addr ip | Unix.ADDR_UNIX _ -> "localhost" let proc_dump conf sess () emit = let rd = get_rundata conf in let l = Hashtbl.fold (fun saddr _ acc -> Some { NA.ml_hostname = name_of_saddr saddr; ml_directory = "/" ^ conf.clustername; ml_next = acc } ) rd.clients None in emit l let proc_umnt conf sess path emit = let rd = get_rundata conf in if path = "/" ^ conf.clustername then ( let saddr = peer sess in Hashtbl.remove rd.clients saddr; Hashtbl.remove rd.dircache saddr; write_mount_table rd; ); emit () let proc_umntall conf sess () emit = let rd = get_rundata conf in let saddr = peer sess in Hashtbl.remove rd.clients saddr; Hashtbl.remove rd.dircache saddr; write_mount_table rd; emit () let proc_export conf sess () emit = emit (Some { NA.ex_dir = "/" ^ conf.clustername; ex_groups = None; ex_next = None }) let setup_mount srv conf = Nfs3_srv.MOUNT.V3.bind_async ~proc_null:(fun _ _ emit -> emit()) ~proc_mnt:(proc_mnt conf) ~proc_dump:(proc_dump conf) ~proc_umnt:(proc_umnt conf) ~proc_umntall:(proc_umntall conf) ~proc_export:(proc_export conf) srv; let m_none = Rpc_server.auth_none in let m_sys = Rpc_auth_sys.server_auth_method ~lookup_hostname:false ~require_privileged_port:true () in Rpc_server.set_auth_methods srv [m_none;m_sys] let parse_global_conf cf = let namenodes = cf#resolve_section cf#root_addr "namenodes" in match namenodes with | [] -> failwith "missing 'namenodes' config" | _ :: _ :: _ -> failwith "more than one 'namenodes' config" | [nn] -> let addrs = Plasma_util.node_list cf nn in if addrs = [] then failwith "nfs config: no namenodes"; let nnlist = List.map (fun addr -> try 
Plasma_util.parse_host_port addr with _ -> failwith ("bad 'addr' parameter in 'namenodes.node': " ^ addr) ) addrs in let clustername = try let p = cf#resolve_parameter nn "clustername" in cf#string_param p with | Not_found -> failwith "missing 'namenodes.clustername' parameter" in let buffer_memory = try let p = cf#resolve_parameter nn "buffer_memory" in cf#int_param p with | Not_found -> 128 * 1024 * 1024 (* 128 M *) in let cauth = Pfs_auth.extract_client_config cf cf#root_addr in { clustername = clustername; namenodes = nnlist; buffer_memory = buffer_memory; cauth; } let global_conf = ref None let get_global_conf cf = match !global_conf with | None -> let c = parse_global_conf cf in global_conf := Some c; c | Some c -> c let nfs3_factory() = Rpc_netplex.rpc_factory ~configure:(fun cf cfaddr -> get_global_conf cf ) ~hooks:(fun conf -> ( object inherit Netplex_kit.empty_processor_hooks() method post_start_hook _ = ignore(wait_for_rundata_failsafe conf) end ) ) ~setup:(fun srv conf -> setup_nfs srv conf ) ~name:"nfs3" () let mount3_factory() = Rpc_netplex.rpc_factory ~configure:(fun cf cfaddr -> get_global_conf cf ) ~hooks:(fun conf -> ( object inherit Netplex_kit.empty_processor_hooks() end ) ) ~setup:(fun srv conf -> setup_mount srv conf ) ~name:"mount3" () let factory () = new Netplex_kit.protocol_switch_factory "nfs" [ "nfs3", nfs3_factory(); "mount3", mount3_factory() ]