(* Copyright 2010 Gerd Stolpmann

   This file is part of Plasma, a distributed filesystem and a map/reduce
   computation framework. Unless you have a written license agreement with
   the copyright holder (Gerd Stolpmann), the following terms apply:

   Plasma is free software: you can redistribute it and/or modify it under
   the terms of the GNU General Public License as published by the Free
   Software Foundation, either version 3 of the License, or (at your
   option) any later version.

   Plasma is distributed in the hope that it will be useful, but WITHOUT
   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
   for more details.

   You should have received a copy of the GNU General Public License
   along with Plasma. If not, see <http://www.gnu.org/licenses/>.
*)

(* $Id: dn_manager.ml 456 2011-10-11 13:26:23Z gerd $ *)

(** The manager accepts user connections, and performs user requests.
    There is only one process running the manager. It is fully programmed
    in an asynchronous style, so it is always responsive.

    All real I/O is done by helper processes that implement the
    [Datanode_io] program. Communication with this program is sped up by
    using a shm object (instead of including the data to read/write into
    the RPC calls). There are usually a number of these helper processes,
    and a TCP connection to each of these. This TCP connection is used
    for RPC calls that trigger I/O operations. As mentioned, the payload
    data is not sent as part of the normal RPC serialization of request
    or response data, but instead it is put into the shm object, and the
    RPC calls only point to the shm object.
*)

open Printf

let dlogf = Plasma_util.dlogf
let dlogr = Plasma_util.dlogr

module Rpcapi_clnt = Pfs_rpcapi_clnt
module Rpcapi_srv = Pfs_rpcapi_srv
module Rpcapi_aux = Pfs_rpcapi_aux
module A = Pfs_rpcapi_aux

(* RPC client stub for the [Datanode_io] helper program, instantiated
   over managed (auto-reconnecting) clients: *)
module Dio_proxy = Rpcapi_clnt.Make'Datanode_io(Rpc_proxy.ManagedClient)

type waiting_request =
    [ `R of int -> unit
    | `W of int -> unit
    ]
  (** The function is called when the event happened:
      - [`R f]: A slot is free for a read request. [f] is called with the
        slot index as arg.
      - [`W f]: A slot is free for a write request. [f] is called with the
        slot index as arg.
   *)

(* Statistics: *)
type stats =
    { mutable cycle : int;      (* statistics cycle number *)
      mutable r_count : int;    (* number of finished read requests *)
      mutable r_lat : float;    (* cumulated latencies of read requests *)
      mutable r_peak: float;    (* peak latency of read requests *)
      mutable w_count : int;    (* number of finished write requests *)
      mutable w_lat : float;    (* cumulated latencies of write requests *)
      mutable w_peak: float;    (* peak latency of write requests *)
      mutable c_count : int;    (* number of finished copy requests *)
      mutable c_lat : float;    (* cumulated latencies of copy requests *)
      mutable c_peak: float;    (* peak latency of copy requests *)
      mutable t_t0 : float;     (* time when throttling started *)
      mutable t_t0_r : float;   (* time when throttling started (w/ reset) *)
      mutable t_count : int;    (* number of throttle events *)
      mutable t_lat : float;    (* cumulated latency of throttling *)
      mutable t_peak : float;   (* peak latency of throttling *)
      mutable q_peak : int;     (* peak length of the wrequests queue *)
      mutable req_count : (Rpc_server.connection_id, int) Hashtbl.t;
        (* number of pending requests per connection *)
      mutable req_t0 : float;   (* time when req_count changed *)
      mutable bconn_acc : float;
        (* cumulated n * t where n is the number of busy connections
           (with pending requests) and t is the period of time where n
           has been valid *)
    }

type manager =
    { shm : Dn_shm.dn_shm;
        (** The shm object *)
      shm_name : string;
        (** The name of the shm object *)
      slot_used : bool array;
        (** for each shm slot this array says whether it is used (true)
            or free (false) *)
      wrequests : waiting_request Queue.t;
        (** The queue of waiting requests *)
      drequests : (unit -> unit) Queue.t;
        (** Delayed requests because of slow flushing *)
      io_mset : Rpc_proxy.ManagedSet.mset;
        (** Clients to the I/O processes *)
      safetrans : (int64, int64*int64) Hashtbl.t;
        (** maps sf_id to (st_tmo,st_secret) *)
      mutable last_sync : int64;
        (** A number that is increased by 1 whenever a sync is
            completed *)
      mutable syncing : bool;
        (** Whether a sync is in progress *)
      mutable slow_syncing : bool;
        (** Whether sync cannot be done in the configured period, and
            write requests are throttled *)
      wsync : (bool->unit) Queue.t;
        (** Functions waiting on the completion of a sync. The bool
            argument says whether the sync was successful. *)
      user_shm : (Rpc_server.connection_id, string list) Hashtbl.t;
        (** Manages user-requested shm objects. The table maps TCP
            connection IDs to the path names of the shm objects. *)
      mutable udsocket : string option;
        (** name of a Unix Domain socket *)
      store : Dn_store.dn_store;
        (** The datastore (file with blocks). We keep this here only for
            calling {!Dn_store.will_read_block} *)
      mutable discover_server : Rpc_server.t option;
      stats : stats;
        (** statistics *)
    }

(* Per-container variables (the manager state is stored in the Netplex
   container environment): *)
module Manager_var =
  Netplex_cenv.Make_var_type(struct type t = manager end)
module Int64_var =
  Netplex_cenv.Make_var_type(struct type t = int64 end)
module String_var =
  Netplex_cenv.Make_var_type(struct type t = string end)

let current_manager() =
  Manager_var.get "current_manager"


let verify m block rd_perm wr_perm ticket =
  (** Verify that [ticket] contains the right verifier for accesses to
      [block]. Returns [true] if positive, [false] if negative.
   *)
  let open Pfs_rpcapi_aux in
  try
    dlogr
      (fun () ->
         sprintf "ticket id=%Ld tmo=%Ld vfy=%Ld rd=%B wr=%B range=%Ld+%Ld"
           ticket.safetrans_id ticket.safetrans_tmo ticket.safetrans_vfy
           ticket.read_perm ticket.write_perm
           ticket.range_start ticket.range_length);
    (* raises Not_found if the safetrans ID is unknown: *)
    let (st_tmo, st_secret) =
      Hashtbl.find m.safetrans ticket.safetrans_id in
    let t = Unix.gettimeofday() in
    (* Check: not timed out, block inside the granted range, requested
       permissions granted, and the cryptographic verifier matches: *)
    Int64.to_float st_tmo >= t && (
      block >= ticket.range_start &&
      Int64.sub block ticket.range_start < ticket.range_length &&
      (not rd_perm || ticket.read_perm) &&
      (not wr_perm || ticket.write_perm) &&
      let v =
        Plasma_util.compute_verifier
          ticket.safetrans_id st_secret
          ticket.range_start ticket.range_length
          ticket.read_perm ticket.write_perm in
      ( dlogr
          (fun () ->
             sprintf "compute_verifier id=%Ld sec=%Ld range=%Ld+%Ld rd=%B wr=%B v=%Ld"
               ticket.safetrans_id st_secret
               ticket.range_start ticket.range_length
               ticket.read_perm ticket.write_perm v);
        v = ticket.safetrans_vfy
      )
    )
  with
    | Not_found -> false


(* Record the current [wrequests] queue length in the peak statistics *)
let check_length m =
  let n = Queue.length m.wrequests in
  m.stats.q_peak <- max m.stats.q_peak n


let find_slots_and_process m =
  (** Run over the [wrequests] queue, and find slots for the requests.
      If a slot [s] is found for request function [f], the function is
      called as [f s] (minimally delayed via event system). Processing
      stops when [wrequests] becomes empty, or when there are no more
      free slots.
   *)
  let cont = Netplex_cenv.self_cont() in
  let esys = cont#event_system in
  let delayed = Queue.create() in
  let is_write =
    function
      | `R _ -> false
      | `W _ -> true in
  (* [find] returns the next runnable request function, skipping write
     requests while we throttle (they are parked in [delayed]); raises
     Queue.Empty when nothing is left: *)
  let rec find() =
    let req = Queue.take m.wrequests in
    if is_write req && m.slow_syncing then (
      Queue.add req delayed;
      find()
    )
    else
      match req with
        | `R on_slot -> on_slot
        | `W on_slot -> on_slot in
  ( try
      (* Flow control: if we are syncing slowly do not accept further
         write requests. This would make everything worse. We have to
         ensure to call this function again when the sync is done.
       *)
      for k = 0 to Array.length m.slot_used-1 do
        if not m.slot_used.(k) then (
          let on_slot = find() in (* or Queue.Empty *)
          m.slot_used.(k) <- true;
          (** Run [on_slot k] by pushing this onto the event queue. We do
              not run this immediately because the event queue has some
              reasonable exception handling, so we do not have to do this
              here.
           *)
          let g = Unixqueue.new_group esys in
          Unixqueue.once esys g 0.0 (fun () -> on_slot k)
        )
      done
    with
      | Queue.Empty -> ()
  );
  (* Also run all drequests if possible *)
  if not m.slow_syncing then (
    Queue.iter
      (fun f ->
         let g = Unixqueue.new_group esys in
         Unixqueue.once esys g 0.0 f
      )
      m.drequests;
    Queue.clear m.drequests
  );
  (* Put the throttled write requests back into the queue: *)
  Queue.transfer delayed m.wrequests


let release_slot m slot =
  (** Release slot [slot], and run again over the [wrequests] queue to
      check whether there is an element in the queue that can be
      processed because of the free slot.
   *)
  m.slot_used.(slot) <- false;
  find_slots_and_process m


let sync_notify m ok =
  (** Run the registered functions [wsync] that wait on a sync. The
      functions are removed from the [wsync] queue. [ok] is [true] when
      the sync was successful.
   *)
  let cont = Netplex_cenv.self_cont() in
  let esys = cont#event_system in
  let q = Queue.create() in
  Queue.transfer m.wsync q;
  Queue.iter
    (fun f ->
       (** Run [f ok] by pushing this onto the event queue. We do not run
           this immediately because the event queue has some reasonable
           exception handling, so we do not have to do this here.
        *)
       let g = Unixqueue.new_group esys in
       Unixqueue.once esys g 0.0 (fun () -> f ok)
    )
    q


let sync_cycle m =
  (** This function is called every n seconds to start a sync. It
      performs the sync, increases [last_sync], and notifies the
      functions waiting on sync completion.
   *)
  if m.syncing then (
    (* The previous sync has not finished yet: enter throttling mode
       (write requests are delayed until the sync completes): *)
    if not m.slow_syncing then (
      let t0 = Unix.gettimeofday() in
      m.stats.t_t0 <- t0;
      m.stats.t_t0_r <- t0;
      m.slow_syncing <- true
    )
  )
  else
    if true (* not (Queue.is_empty m.wsync) *) then (
      try
        let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
        Dio_proxy.V1.sync'async mc ()
          (fun get_result ->
             (* Callback: the sync RPC has returned. Update throttling
                statistics if we were throttled: *)
             if m.slow_syncing then (
               m.stats.t_count <- m.stats.t_count + 1;
               let t1 = Unix.gettimeofday() in
               let lat1 = t1 -. m.stats.t_t0_r in
               m.stats.t_lat <- m.stats.t_lat +. lat1;
               let lat2 = t1 -. m.stats.t_t0 in
               m.stats.t_peak <- max m.stats.t_peak lat2
             );
             m.syncing <- false;
             m.slow_syncing <- false;
             let ok =
               try get_result(); true
               with error ->
                 Netlog.logf `Err "proc_sync exception (2): %s"
                   (Netexn.to_string error);
                 false in
             if ok then
               m.last_sync <- Int64.succ m.last_sync;
             (* The throttle may have delayed requests - run them now: *)
             find_slots_and_process m;
             sync_notify m ok
          );
        m.syncing <- true
      with error ->
        Netlog.logf `Err "proc_sync exception (1): %s"
          (Netexn.to_string error);
        m.syncing <- false
    )


let ignore_lost_connection emit x =
  (** Call the [emit] function and ignore [Connection_lost] exceptions.
      This is required when [emit] is invoked after some delay, and not
      immediately after receiving the request *)
  try emit x with Rpc_server.Connection_lost -> ()


(* Account a change [delta] (+1/-1) of the number of pending requests of
   the connection of [sess], and update the busy-connection statistics *)
let count_request m sess delta =
  let l_old = Hashtbl.length m.stats.req_count in
  let conn_id = Rpc_server.get_connection_id sess in
  let n_old =
    try Hashtbl.find m.stats.req_count conn_id with Not_found -> 0 in
  let n_new = n_old + delta in
  if n_new = 0 then
    Hashtbl.remove m.stats.req_count conn_id
  else
    Hashtbl.replace m.stats.req_count conn_id n_new;
  let l_new = Hashtbl.length m.stats.req_count in
  if l_old <> l_new then (
    (* The number of busy connections changed: close the accounting
       interval for the old number *)
    let t0 = Unix.gettimeofday() in
    m.stats.bconn_acc <-
      m.stats.bconn_acc +. (float l_old) *. (t0 -. m.stats.req_t0);
    m.stats.req_t0 <- t0
  )


let proc_read conf sess (req, block, pos, len, ticket) emit =
  (** Implementation of the [read] RPC. When we are done, we have to call
      [emit].
   *)
  let m = current_manager() in
  (** First check whether [ticket] permits us to do the read: *)
  if not (verify m block true false ticket) then (
    Netlog.logf `Err
      "Got unauthorized read request for block %Ld (ticket %Ld)"
      block ticket.Rpcapi_aux.safetrans_id;
    ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err
  )
  else (
    (** Define a callback function that will be invoked when a slot in
        the shm object is free:
     *)
    match req with
      | `dnch_rpc ->
          (* Data channel: return the data in the RPC response *)
          let on_slot slot =
            let (mem,mpos) = Dn_shm.slot m.shm slot in
            try
              let t0 = Unix.gettimeofday() in
              (** Fadvise that we will need the block. *)
              Dn_store.will_read_block m.store block;
              (** Pick now an RPC client and call the [Datanode_io]
                  program to start the read. *)
              let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
              Dio_proxy.V1.read'async mc (slot,block,pos,len)
                (fun get_result ->
                   (** This function is called back when the read is
                       done. The data is now in the shm object and can be
                       forwarded from there *)
                   let t1 = Unix.gettimeofday() in
                   count_request m sess (-1);
                   let ok =
                     try get_result(); true
                     with error ->
                       Netlog.logf `Err "proc_read exception (2): %s"
                         (Netexn.to_string error);
                       false in
                   if ok then (
                     (** Extract the data from the shm object and return
                         to caller *)
                     let data = String.create len in
                     Netsys_mem.blit_memory_to_string mem mpos data 0 len;
                     ignore_lost_connection emit (`dnch_rpc data)
                   )
                   else
                     ignore_lost_connection
                       (Rpc_server.reply_error sess) Rpc.System_err;
                   release_slot m slot;
                   m.stats.r_count <- m.stats.r_count + 1;
                   let lat = t1 -. t0 in
                   m.stats.r_lat <- m.stats.r_lat +. lat;
                   m.stats.r_peak <- max m.stats.r_peak lat;
                );
              count_request m sess 1;
            with error ->
              Netlog.logf `Err "proc_read exception (1): %s"
                (Netexn.to_string error);
              ignore_lost_connection
                (Rpc_server.reply_error sess) Rpc.System_err;
              release_slot m slot;
          in
          (** Push [on_slot] onto the queue of waiting requests, and
              check immediately whether there is a free slot.
           *)
          Queue.push (`R on_slot) m.wrequests;
          check_length m;
          find_slots_and_process m
      | `dnch_shm shm_obj ->
          (* Data channel: put the data into a user-provided shm object.
             No manager-side slot is needed in this case. *)
          let t0 = Unix.gettimeofday() in
          let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
          Dio_proxy.V1.read_shm'async mc (shm_obj,block,pos,len)
            (fun get_result ->
               let t1 = Unix.gettimeofday() in
               count_request m sess (-1);
               let ok =
                 try get_result(); true
                 with error ->
                   Netlog.logf `Err "proc_read exception (3): %s"
                     (Netexn.to_string error);
                   false in
               if ok then (
                 ignore_lost_connection emit `dnch_shm
               )
               else
                 ignore_lost_connection
                   (Rpc_server.reply_error sess) Rpc.System_err;
               m.stats.r_count <- m.stats.r_count + 1;
               let lat = t1 -. t0 in
               m.stats.r_lat <- m.stats.r_lat +. lat;
               m.stats.r_peak <- max m.stats.r_peak lat;
            );
          count_request m sess 1
  )


(* Debug helper: log the load of the managed client set *)
let dump_load m prefix =
  let la = Rpc_proxy.ManagedSet.mset_load m.io_mset in
  dlogf "%s: mset_load=%s" prefix
    (String.concat "," (List.map string_of_int (Array.to_list la)))


let call_write rpc_name conf sess (block,ticket) init_shm emit =
  (** Implementation of the [write] and [zero] RPCs. When we are done, we
      have to call [emit]. *)
  let m = current_manager() in
  (** First check whether [st_vfy] permits us to do the write: *)
  if not (verify m block false true ticket) then (
    Netlog.logf `Err
      "Got unauthorized write request for block %Ld (ticket %Ld)"
      block ticket.Rpcapi_aux.safetrans_id;
    ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err
  )
  else (
    (** Define a callback function that will be invoked when a slot in
        the shm object is free:
     *)
    let on_slot slot =
      (** Fill the slot with data, and then call [write] of the
          [Datanode_io] program to start writing *)
      let t0 = Unix.gettimeofday() in
      let (mem,mpos) = Dn_shm.slot m.shm slot in
      try
        init_shm mem mpos;
        (* dump_load m "before write"; *)
        let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
        Dio_proxy.V1.write'async mc (slot,block)
          (fun get_result ->
             (** This function is called back when the write is done.
              *)
             let t1 = Unix.gettimeofday() in
             count_request m sess (-1);
             let ok =
               try get_result(); true
               with error ->
                 Netlog.logf `Err "%s exception (2): %s" rpc_name
                   (Netexn.to_string error);
                 false in
             if ok then
               ignore_lost_connection emit ()
             else
               ignore_lost_connection
                 (Rpc_server.reply_error sess) Rpc.System_err;
             release_slot m slot;
             m.stats.w_count <- m.stats.w_count + 1;
             let lat = t1 -. t0 in
             m.stats.w_lat <- m.stats.w_lat +. lat;
             m.stats.w_peak <- max m.stats.w_peak lat;
          );
        count_request m sess 1
        (* dump_load m "after write"; *)
      with error ->
        Netlog.logf `Err "%s exception (1): %s" rpc_name
          (Netexn.to_string error);
        ignore_lost_connection
          (Rpc_server.reply_error sess) Rpc.System_err;
        release_slot m slot
    in
    (** Push [on_slot] onto the queue of waiting requests, and check
        immediately whether there is a free slot.
     *)
    Queue.push (`W on_slot) m.wrequests;
    check_length m;
    find_slots_and_process m
  )


let proc_write conf sess (block,wr_req,ticket) emit =
  match wr_req with
    | `dnch_rpc data ->
        (* Data arrives in the RPC request: copy it into the shm slot *)
        let init_shm mem mpos =
          data # blit_to_memory 0 mem mpos conf#dn_blocksize in
        call_write "proc_write" conf sess (block,ticket) init_shm emit
    | `dnch_shm shm_obj ->
        (* Data is already in a user shm object: forward directly to
           [Datanode_io] (no manager-side slot needed). Note that [t0] is
           taken at enqueue time, so the measured latency includes any
           throttle delay. *)
        let t0 = Unix.gettimeofday() in
        let m = current_manager() in
        let do_write() =
          let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
          Dio_proxy.V1.write_shm'async mc (shm_obj,block)
            (fun get_result ->
               let t1 = Unix.gettimeofday() in
               count_request m sess (-1);
               let ok =
                 try get_result(); true
                 with error ->
                   Netlog.logf `Err "%s exception (3): %s" "proc_write"
                     (Netexn.to_string error);
                   false in
               if ok then
                 ignore_lost_connection emit ()
               else
                 ignore_lost_connection
                   (Rpc_server.reply_error sess) Rpc.System_err;
               m.stats.w_count <- m.stats.w_count + 1;
               let lat = t1 -. t0 in
               m.stats.w_lat <- m.stats.w_lat +. lat;
               m.stats.w_peak <- max m.stats.w_peak lat;
            );
          count_request m sess 1 in
        if m.slow_syncing then
          (* Do the write later *)
          Queue.add do_write m.drequests
        else
          do_write()


let proc_zero conf sess (block,ticket) emit =
  (* [zero] is a write of an all-zero block *)
  let init_shm mem mpos =
    Bigarray.Array1.fill
      (Bigarray.Array1.sub mem mpos conf#dn_blocksize)
      '\000' in
  call_write "proc_zero" conf sess (block,ticket) init_shm emit


let proc_copy conf sess (block,dest_node,dest_ident,dest_block,
                         ticket,dest_ticket) emit =
  (** Implementation of the [copy] RPC. This is more or less directly
      forwarded to [Dn_io]. The implicit [read] and [write] operations
      must be done by [Dn_io].

      There is, however, another implementation option for the network
      write in the remote copy case: The [read] could also put the data
      block into shm, and the network write is done here. That would
      allow us to better control the number of network connections to
      remote systems. (Of course, we could also simply _count_ the number
      of remote copy invocations here, and limit only that.)
   *)
  (* FIXME: local copies are not throttled! *)
  (* TODO: we ignore right now ticket. Do something. *)
  let m = current_manager() in
  let on_slot slot =
    (** Call [copy] of the [Datanode_io] program to start copying. The
        slot is used as buffer. *)
    try
      let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
      let t0 = Unix.gettimeofday() in
      Dio_proxy.V1.copy'async mc (block,dest_node,dest_ident,dest_block,
                                  dest_ticket,slot)
        (fun get_result ->
           let t1 = Unix.gettimeofday() in
           count_request m sess (-1);
           let ok =
             try get_result(); true
             with error ->
               Netlog.logf `Err "proc_copy exception (2): %s"
                 (Netexn.to_string error);
               false in
           if ok then
             ignore_lost_connection emit ()
           else
             ignore_lost_connection
               (Rpc_server.reply_error sess) Rpc.System_err;
           release_slot m slot;
           m.stats.c_count <- m.stats.c_count + 1;
           let lat = t1 -. t0 in
           m.stats.c_lat <- m.stats.c_lat +. lat;
           m.stats.c_peak <- max m.stats.c_peak lat;
        );
      count_request m sess 1
    with error ->
      (* NOTE(review): unlike proc_read/call_write, this error path does
         not call [release_slot] - confirm whether the slot leaks here *)
      Netlog.logf `Err "proc_copy exception (1): %s"
        (Netexn.to_string error);
      ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err
  in
  (** First check whether [ticket] permits us to do the read: *)
  if not (verify m block true false ticket) then (
    Netlog.logf `Err
      "Got unauthorized read request for block %Ld (ticket %Ld)"
      block ticket.Rpcapi_aux.safetrans_id;
    ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err
  )
  else (
    (** Check [dest_ticket] immediately in case of a local copy *)
    let identity = Dn_store.get_identity m.store in
    if identity = dest_ident &&
       not (verify m dest_block false true dest_ticket)
    then (
      Netlog.logf `Err
        "Got unauthorized copy request for block %Ld to block %Ld (ticket %Ld)"
        block dest_block dest_ticket.Rpcapi_aux.safetrans_id;
      ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err
    )
    else (
      (** Push [on_slot] onto the queue of waiting requests, and check
          immediately whether there is a free slot.
       *)
      Queue.push (`R on_slot) m.wrequests;
      check_length m;
      find_slots_and_process m
    )
  )


let proc_sync conf sess () emit =
  (** Implementation of the [sync] RPC. When we are done, we have to call
      [emit].
   *)
  let m = current_manager() in
  (** Determine the current sync cycle number [cur_sync], and the cycle
      number that must have completed so we can return success. If a sync
      is currently being done, we have to await this cycle plus the
      following, otherwise just the following. This ensures that all data
      at the time of calling this RPC is synced to disk.
   *)
  let cur_sync = m.last_sync in
  let wait_sync =
    if m.syncing then Int64.add cur_sync 2L else Int64.add cur_sync 1L in
  (** This is the callback function that is invoked when a sync cycle is
      done *)
  let rec f is_ok =
    if is_ok then (
      if m.last_sync >= wait_sync then
        ignore_lost_connection emit ()  (** Return success *)
      else
        Queue.push f m.wsync            (** Another round of waiting *)
    )
    else
      (** Any error interrupts waiting, and we return the error: *)
      ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err
  in
  Queue.push f m.wsync


module Cached(V:Netplex_cenv.VAR_TYPE) = struct
  let cached f name emit emit_syserr =
    (** Get a value from [Datanode_io] by calling [f]. The value is
        cached into the container variable [name] *)
    try
      emit(V.get name)
    with
      | Netplex_cenv.Container_variable_not_found _ ->
          (** first time *)
          let m = current_manager() in
          let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
          f mc ()
            (fun get_result ->
               (** This function is called back when the RPC is done. *)
               try
                 let size = get_result() in
                 V.set name size;
                 emit size
               with
                 | error ->
                     Netlog.logf `Err "proc_get_%s exception (1): %s"
                       name (Netexn.to_string error);
                     emit_syserr()
            )
end


let proc_identity conf sess exp_clustername emit =
  (** We forward this call to [Datanode_io] the first time the identity
      is requested. The identity does not change, so we can cache this
      value.
   *)
  if conf#dn_clustername <> exp_clustername then (
    Netlog.logf `Err "proc_identity: bad clustername";
    ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err;
  );
  (* NOTE(review): after replying with an error above, execution falls
     through and [C.cached] may emit a second response for the same
     session - confirm whether this is intended *)
  let module C = Cached(String_var) in
  C.cached
    Dio_proxy.V1.identity'async
    "identity"
    (fun x -> ignore_lost_connection emit x)
    (fun () ->
       ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err)


let proc_identity_discover conf sess exp_clustername emit =
  (** Similar, but we are quiet on errors *)
  if conf#dn_clustername = exp_clustername then (
    let module C = Cached(String_var) in
    C.cached
      Dio_proxy.V1.identity'async
      "identity"
      (fun x -> ignore_lost_connection emit x)
      (fun () -> ())
  )


let proc_size conf sess () emit =
  (** We forward this call to [Datanode_io] the first time the size is
      requested. The size does not change, so we can cache this value.
   *)
  let module C = Cached(Int64_var) in
  C.cached
    Dio_proxy.V1.size'async
    "size"
    (fun x -> ignore_lost_connection emit x)
    (fun () ->
       ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err)


let proc_blocksize conf sess () emit =
  emit conf#dn_blocksize

let proc_clustername conf sess () emit =
  emit conf#dn_clustername


let remember_shm dir shm_names =
  (** Remember these shm names in the filesystem (written to a temp file
      first, then atomically renamed) *)
  let f = open_out (dir ^ "/shm_name.new") in
  try
    List.iter
      (fun name ->
         output_string f (name ^ "\n");
      )
      shm_names;
    close_out f;
    Unix.rename (dir ^ "/shm_name.new") (dir ^ "/shm_name")
  with
    | error ->
        close_out_noerr f;
        raise error


let remember_all_shm() =
  (* Persist the manager shm name plus all user-requested shm names *)
  let cont = Netplex_cenv.self_cont() in
  let dir =
    cont # socket_service # socket_service_config
      # controller_config # socket_directory in
  let m = current_manager() in
  let names =
    m.shm_name ::
      (Hashtbl.fold (fun _ n acc -> n @ acc) m.user_shm []) in
  remember_shm dir names


let drop_old_shm dir =
  (** Look for remembered old shm objects and drop them *)
  if Sys.file_exists (dir ^ "/shm_name") then (
    let f = open_in (dir ^ "/shm_name") in
    try
      while true do
        let n = input_line f in
        try
          Dn_shm.unlink n
        with
          | error ->
              Netlog.logf `Warning "Cannot unlink old shm %s: %s"
                n (Netexn.to_string error)
      done
    with
      | End_of_file ->
          close_in f
  )


(* Whether the peer of [sess] is on the same host (Unix Domain socket, or
   TCP with identical local and remote addresses) *)
let sess_is_local sess =
  try
    match Rpc_server.get_socket_name sess with
      | Unix.ADDR_INET(ip1,_) ->
          ( match Rpc_server.get_peer_name sess with
              | Unix.ADDR_INET(ip2,_) -> ip1=ip2
              | _ -> assert false
          )
      | Unix.ADDR_UNIX _ -> true
  with
    | _ -> false


let proc_alloc_shm_if_local sess () emit =
  (* Allocate a user shm object, but only for local peers. Replies
     [None] for remote peers or on error. *)
  let m = current_manager() in
  let is_local = sess_is_local sess in
  if is_local then (
    try
      let (fd, name) = Plasma_util.get_shm_fd "plasma" 0 in
      Unix.close fd;
      let conn = Rpc_server.get_connection_id sess in
      let l = try Hashtbl.find m.user_shm conn with Not_found -> [] in
      Hashtbl.replace m.user_shm conn (name :: l);
      remember_all_shm();
      ignore_lost_connection emit (Some name)
    with
      | error ->
          Netlog.logf `Warning "Cannot allocate shm: %s"
            (Netexn.to_string error);
          ignore_lost_connection emit None
  )
  else
    ignore_lost_connection emit None


let proc_udsocket_if_local sess () emit =
  (* Return the Unix Domain socket name, but only for local peers *)
  let is_local = sess_is_local sess in
  if is_local then (
    let m = current_manager() in
    ignore_lost_connection emit m.udsocket
  )
  else
    ignore_lost_connection emit None


let on_tcp_close conn_id =
  (* Connection cleanup: drop all user shm objects of this connection *)
  let m = current_manager() in
  let l = try Hashtbl.find m.user_shm conn_id with Not_found -> [] in
  List.iter
    (fun n ->
       try
         Netsys_posix.shm_unlink n
       with
         | error ->
             Netlog.logf `Warning "Cannot unlink shm %s: %s"
               n (Netexn.to_string error)
    )
    l;
  Hashtbl.remove m.user_shm conn_id;
  remember_all_shm()


let create_io_mset conf =
  (** We use here the [Rpc_proxy] abstraction for RPC clients with
      automatically managed connections. As the [Datanode_io] processes
      are running on the same system, connections are reliable, and we
      need not to configure this part of [Rpc_proxy]. We simply configure
      to open several connections up to a certain number. The RPC calls
      will use any of these connections.
   *)
  let esys =
    (** use the container event system for these calls *)
    (Netplex_cenv.self_cont()) # event_system in
  let connector =
    match Netplex_cenv.lookup "Dn_io" "RPC" with
      | None ->
          failwith("Socket not found for service Dn_io, protocol RPC")
      | Some path ->
          Rpc_client.Unix path in
  let n = conf#dn_io_processes in
  let mclient_config =
    Rpc_proxy.ManagedClient.create_mclient_config
      ~programs:[ Rpcapi_clnt.Datanode_io.V1._program ]
      () in
  let mset_config =
    Rpc_proxy.ManagedSet.create_mset_config
      ~mclient_config
      ~policy:`Balance_load
      () in
  let mset =
    Rpc_proxy.ManagedSet.create_mset
      mset_config
      [| connector, n |]
      esys in
  mset


let proc_reset_all_safetrans conf sess () emit =
  let m = current_manager() in
  Hashtbl.clear m.safetrans;
  emit()


let proc_cancel_safetrans conf sess st_id_a emit =
  let m = current_manager() in
  Array.iter
    (fun st_id ->
       Hashtbl.remove m.safetrans st_id;
    )
    st_id_a;
  emit()


let proc_safetrans conf sess ticket_a emit =
  (* Register (or update) a batch of safetrans tickets *)
  let m = current_manager() in
  Array.iter
    (fun t ->
       Hashtbl.replace m.safetrans t.A.st_id (t.A.st_tmo,t.A.st_secret)
    )
    ticket_a;
  emit ()


let maybe_timeout_safetrans m =
  (** Cancel safetrans that have timed out *)
  let t = Unix.gettimeofday() in
  let l =
    Hashtbl.fold
      (fun st_id (st_tmo,_) acc ->
         if t > Int64.to_float st_tmo then st_id :: acc else acc
      )
      m.safetrans
      [] in
  List.iter
    (fun st_id -> Hashtbl.remove m.safetrans st_id)
    l


(* Mstring factories: "writedata" arguments are decoded as mstrings *)
let mfac = Hashtbl.create 5
let () =
  Hashtbl.add mfac "writedata" Xdr_mstring.string_based_mstrings


(* Bind all RPC procedures of the [Datanode] and [Datanode_ctrl] programs
   to the given server *)
let setup srv (cf,cfaddr,conf,shm_name,access) =
  Rpcapi_srv.Datanode.V1.bind_async
    ~proc_null:(fun _ _ emit -> emit ())
    ~proc_identity:(proc_identity conf)
    ~proc_size:(proc_size conf)
    ~proc_blocksize:(proc_blocksize conf)
    ~proc_clustername:(proc_clustername conf)
    ~proc_read:(proc_read conf)
    ~proc_write:(proc_write conf)
    ~proc_copy:(proc_copy conf)
    ~proc_zero:(proc_zero conf)
    ~proc_sync:(proc_sync conf)
    ~proc_alloc_shm_if_local
    ~proc_udsocket_if_local
    srv;
  Rpcapi_srv.Datanode_ctrl.V1.bind_async
    ~proc_null:(fun _ _ emit -> emit ())
    ~proc_reset_all_safetrans:(proc_reset_all_safetrans conf)
    ~proc_cancel_safetrans:(proc_cancel_safetrans conf)
    ~proc_safetrans:(proc_safetrans conf)
    srv;
  Rpc_server.set_mstring_factories srv mfac;
  Rpc_server.set_onclose_action srv on_tcp_close;
  Pfs_auth.configure_rpc_server srv access


let new_stats() =
  { cycle = 0;
    r_count = 0;
    r_lat = 0.0;
    r_peak = 0.0;
    w_count = 0;
    w_lat = 0.0;
    w_peak = 0.0;
    c_count = 0;
    c_lat = 0.0;
    c_peak = 0.0;
    t_t0 = 0.0;
    t_t0_r = 0.0;
    t_count = 0;
    t_lat = 0.0;
    t_peak = 0.0;
    q_peak = 0;
    req_count = Hashtbl.create 7;
    req_t0 = Unix.gettimeofday();
    bconn_acc = 0.0;
  }


(* Reset the statistics for the next cycle (req_count and t_t0 are
   deliberately kept across cycles) *)
let next_stats m =
  let s = m.stats in
  let t0 = Unix.gettimeofday() in
  s.cycle <- s.cycle + 1;
  s.r_count <- 0;
  s.r_lat <- 0.0;
  s.r_peak <- 0.0;
  s.w_count <- 0;
  s.w_lat <- 0.0;
  s.w_peak <- 0.0;
  s.c_count <- 0;
  s.c_lat <- 0.0;
  s.c_peak <- 0.0;
  (* s.t_t0 <- 0.0; *)
  s.t_t0_r <- t0;
  s.t_count <- 0;
  s.t_lat <- 0.0;
  s.t_peak <- 0.0;
  s.q_peak <- Queue.length m.wrequests;
  (* s.req_count <- Hashtbl.create 7; *)
  s.req_t0 <- t0;
  s.bconn_acc <- 0.0


let stats_period = 60.0


(* Log one line of statistics to the "statistics" subchannel, then start
   the next cycle *)
let print_stats m =
  let s = m.stats in
  let t0 = Unix.gettimeofday() in
  let l = Hashtbl.length s.req_count in
  s.bconn_acc <- s.bconn_acc +. (float l) *. (t0 -. s.req_t0);
  let msg =
    sprintf "cycle=%d \
             [reads: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [writes: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [copies: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [throttling: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [queue: peak_length=%d] \
             [parallel requests: avg=%.1f]"
      s.cycle
      s.r_count s.r_lat (s.r_lat /. float s.r_count) s.r_peak
      s.w_count s.w_lat (s.w_lat /. float s.w_count) s.w_peak
      s.c_count s.c_lat (s.c_lat /. float s.c_count) s.c_peak
      s.t_count s.t_lat (s.t_lat /. float s.t_count) s.t_peak
      s.q_peak
      (s.bconn_acc /. stats_period) in
  let cont = Netplex_cenv.self_cont() in
  cont # log_subch "statistics" `Info msg;
  next_stats m


let create_dn_discover_server conf container =
  (* This is a UDP server, and hence special *)
  let udp_port_opt = ref None in
  List.iter
    (fun p ->
       Array.iter
         (fun a ->
            match a with
              | `Socket(Unix.ADDR_INET(h,p)) ->
                  udp_port_opt := Some (h,p)
              | _ -> ()
         )
         p # addresses
    )
    container # socket_service # socket_service_config # protocols;
  match !udp_port_opt with
    | Some(h,p) ->
        let esys = container # event_system in
        let socket_config =
          ( object(self)
              inherit Rpc_server.default_socket_config
              method listen_options =
                { Uq_engines.default_listen_options with
                    Uq_engines.lstn_reuseaddr = true
                }
            end
          ) in
        let rpc =
          Rpc_server.create2
            (`Socket(Rpc.Udp,
                     Rpc_server.Internet(h,p),
                     socket_config))
            esys in
        Rpcapi_srv.Datanode_discover.V1.bind_async
          ~proc_null:(fun _ _ emit -> emit ())
          ~proc_identity:(proc_identity_discover conf)
          rpc;
        Some rpc
    | None ->
        None


let factory() =
  Rpc_netplex.rpc_factory
    ~configure:(fun cf cfaddr ->
                  let conf = Dn_config.extract_dn_config cf in
                  (** Create the shm object - optionally, drop the old
                      object. Shm objects have kernel persistence, so
                      they are not automatically dropped when the process
                      terminates. We drop explicitly at
                      [pre_finish_hook] time, but for handling abnormally
                      terminated processes we also remember the name of
                      the shm object and drop it at next startup time.
                      This is done at [pre_start_hook] time below.
                   *)
                  let shm_name =
                    Dn_shm.create (conf :> Dn_shm.dn_shm_config) in
                  let access =
                    Pfs_auth.extract_access_config cf cf#root_addr in
                  (cf, cfaddr, conf, shm_name, access)
               )
    ~hooks:(fun (cf, cfaddr, conf, shm_name, access) ->
              ( object
                  inherit Netplex_kit.empty_processor_hooks()

                  method post_add_hook _ ctrl =
                    ctrl # add_plugin Netplex_sharedvar.plugin;
                    (** Also enable the [Dn_io] service: *)
                    Dn_io.add_service conf cf cfaddr ctrl

                  method pre_start_hook _ ctrl _ =
                    drop_old_shm ctrl#controller_config#socket_directory;
                    remember_shm
                      ctrl#controller_config#socket_directory
                      [shm_name]

                  method post_start_hook cont =
                    let store =
                      Dn_store.open_store
                        (conf :> Dn_store.dn_store_config) in
                    let shm =
                      Dn_shm.openshm
                        (conf :> Dn_shm.dn_shm_config) shm_name in
                    let udsocket =
                      Netplex_cenv.lookup "Dn_manager" "RPC" in
                    (* Start the special server for [Dn_discover]: *)
                    let dsrv = create_dn_discover_server conf cont in
                    let m =
                      { shm = shm;
                        shm_name = shm_name;
                        slot_used =
                          Array.make conf#dn_shm_queue_length false;
                        wrequests = Queue.create();
                        drequests = Queue.create();
                        io_mset = create_io_mset conf;
                        safetrans = Hashtbl.create 17;
                        last_sync = 0L;
                        syncing = false;
                        slow_syncing = false;
                        wsync = Queue.create();
                        user_shm = Hashtbl.create 5;
                        udsocket = udsocket;
                        store = store;
                        discover_server = dsrv;
                        stats = new_stats()
                      } in
                    Manager_var.set "current_manager" m;
                    (** Publish the name of the shm object: *)
                    ignore(Netplex_sharedvar.create_var
                             "plasma.datanode.shm.name");
                    ignore(Netplex_sharedvar.set_value
                             "plasma.datanode.shm.name" shm_name);
                    (** Establish the timer calling [sync]
                        periodically: *)
                    let _tm =
                      Netplex_cenv.create_timer
                        (fun tm -> sync_cycle m; true)
                        conf#dn_sync_period in
                    (** Establish a timer for
                        [maybe_timeout_safetrans] *)
                    let _tm =
                      Netplex_cenv.create_timer
                        (fun tm -> maybe_timeout_safetrans m; true)
                        60.0 in
                    (** Establish a timer for printing statistics *)
                    let _tm =
                      Netplex_cenv.create_timer
                        (fun tm -> print_stats m; true)
                        stats_period in
                    ();

                  method system_shutdown() =
                    let m = current_manager() in
                    ( match m.discover_server with
                        | None -> ()
                        | Some rpc ->
                            Netlog.logf `Info "Stopping discovery server";
                            Rpc_server.stop_server rpc
                    )

                  method pre_finish_hook cont =
                    ( try
                        Dn_shm.unlink shm_name
                      with
                        | error ->
                            Netlog.logf `Err
                              "Cannot unlink shm object: %s"
                              (Netexn.to_string error)
                    );
                    let m = current_manager() in
                    Dn_store.close_store m.store;
                end
              )
           )
    ~name:"dn_manager"
    ~setup
    ()