Plasma GitLab Archive
Projects Blog Knowledge

(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Foobar.  If not, see <http://www.gnu.org/licenses/>.

*)
(* $Id: dn_manager.ml 456 2011-10-11 13:26:23Z gerd $ *)

(** The manager accepts user connections, and performs user requests.
    There is only one process running the manager.
    It is fully programmed in an asynchronous style, so it is always
    responsive. All real I/O is done by helper processes that implement
    the [Datanode_io] program. Communication with this program is
    sped up by using an shm object (instead of including the data to
    read/write into the RPC calls). There are usually a number of these
    helper processes, and a TCP connection to each of these. This TCP
    connection is used for RPC calls that trigger I/O operations.
    As mentioned, the payload data is not sent as part of the normal
    RPC serialization of request or response data, but instead it is
    put into the shm object, and the RPC calls only point to the
    shm object.
 *)

open Printf

let dlogf = Plasma_util.dlogf
let dlogr = Plasma_util.dlogr

module Rpcapi_clnt = Pfs_rpcapi_clnt
module Rpcapi_srv = Pfs_rpcapi_srv
module Rpcapi_aux = Pfs_rpcapi_aux
module A = Pfs_rpcapi_aux

module Dio_proxy = Rpcapi_clnt.Make'Datanode_io(Rpc_proxy.ManagedClient)

type waiting_request =
    [ `R of int -> unit
    | `W of int -> unit
    ]
  (** The function is called when the event happened:
      - [`R f]: A slot is free for a read request. [f] is called with the
        slow index as arg.
      - [`W f]: A slot is free for a write request. [f] is called with the
        slow index as arg.
   *)

(* Statistics: *)
type stats =
    { mutable cycle : int;    (* statistics cycle number *)
      mutable r_count : int;  (* number of finished read requests *)
      mutable r_lat : float;  (* cumulated latencies of read requests *)
      mutable r_peak: float;  (* peak latency of read requests *)
      mutable w_count : int;  (* number of finished write requests *)
      mutable w_lat : float;  (* cumulated latencies of write requests *)
      mutable w_peak: float;  (* peak latency of write requests *)
      mutable c_count : int;  (* number of finished copy requests *)
      mutable c_lat : float;  (* cumulated latencies of copy requests *)
      mutable c_peak: float;  (* peak latency of copy requests *)
      mutable t_t0 : float;   (* time when throttling started *)
      mutable t_t0_r : float; (* time when throttling started (w/ reset) *)
      mutable t_count : int;  (* number of throttle events *)
      mutable t_lat : float;  (* cumulated latency of throttling *)
      mutable t_peak : float; (* peak latency of throttling *)
      mutable q_peak : int;   (* peak length of the wrequests queue *)
      mutable req_count : (Rpc_server.connection_id, int) Hashtbl.t;
                              (* number of pending requests per connection *)
      mutable req_t0 : float; (* time when req_count changed *)
      mutable bconn_acc : float;
        (* cumulated n * t where n is the number of busy connections (with
	   pending requests) and t is the period of time where n has been
	   valid
	 *)
    }

type manager =
    { shm : Dn_shm.dn_shm;
      (** The shm object *)

      shm_name : string;
      (** The name of the shm object *)

      slot_used : bool array;
      (** for each shm slot this array says whether it is used (true) or
          free (false)
       *)

      wrequests : waiting_request Queue.t;
      (** The queue of waiting requests *)

      drequests : (unit -> unit) Queue.t;
      (** Delayed requests because of slow flushing *)
      
      io_mset : Rpc_proxy.ManagedSet.mset;
      (** Clients to the I/O processes *)

      safetrans : (int64, int64*int64) Hashtbl.t;
      (** maps sf_id to (st_tmo,st_secret) *)

      mutable last_sync : int64;
      (** A number that is increased by 1 whenever a sync is completed *)

      mutable syncing : bool;
      (** Whether a sync is in progress *)

      mutable slow_syncing : bool;
      (** Whether sync cannot be done in the configured period, and write
	  requests are throttled
       *)

      wsync : (bool->unit) Queue.t;
      (** Functions waiting on the compeletion of a sync. The bool argument
          says whether the sync was successful.
       *)

      user_shm : (Rpc_server.connection_id, string list) Hashtbl.t;
      (** Manages user-requested shm objects. The table maps TCP connection
	  IDs to the path names of the shm objects.
       *)

      mutable udsocket : string option;
      (** name of a Unix Domain socket *)

      store : Dn_store.dn_store;
      (** The datastore (file with blocks). We keep this is here only for
	  calling {!Dn_store.will_read_block}
       *)

      mutable discover_server : Rpc_server.t option;

      stats : stats; (** statistics *)
    }

module Manager_var =
  Netplex_cenv.Make_var_type(struct type t = manager end)

module Int64_var =
  Netplex_cenv.Make_var_type(struct type t = int64 end)

module String_var =
  Netplex_cenv.Make_var_type(struct type t = string end)

let current_manager() =
  Manager_var.get "current_manager"

let verify m block rd_perm wr_perm ticket =
  (** Verify that [ticket] contains the right verifier for accesses to [block].
      Returns [true] if positive, [false] if negative.
   *)
  let open Pfs_rpcapi_aux in
  try
    dlogr
      (fun () ->
	 sprintf "ticket id=%Ld tmo=%Ld vfy=%Ld rd=%B wr=%B range=%Ld+%Ld"
	   ticket.safetrans_id ticket.safetrans_tmo ticket.safetrans_vfy
	   ticket.read_perm ticket.write_perm ticket.range_start
	   ticket.range_length);
    let (st_tmo, st_secret) = Hashtbl.find m.safetrans ticket.safetrans_id in
    let t = Unix.gettimeofday() in
    Int64.to_float st_tmo >= t && (
      block >= ticket.range_start && 
      Int64.sub block ticket.range_start < ticket.range_length &&
      (not rd_perm || ticket.read_perm) &&
      (not wr_perm || ticket.write_perm) &&
      let v = 
	Plasma_util.compute_verifier
	  ticket.safetrans_id st_secret ticket.range_start ticket.range_length
	  ticket.read_perm ticket.write_perm in
      ( dlogr
	  (fun () ->
	     sprintf
	       "compute_verifier id=%Ld sec=%Ld range=%Ld+%Ld rd=%B wr=%B v=%Ld"
	       ticket.safetrans_id st_secret ticket.range_start ticket.range_length
	       ticket.read_perm ticket.write_perm v);
	v = ticket.safetrans_vfy
      )
    )
  with
    | Not_found ->
	false

let check_length m =
  let n = Queue.length m.wrequests in
  m.stats.q_peak <- max m.stats.q_peak n

let find_slots_and_process m =
  (** Run over the [wrequests] queue, and find slots for the requests.
      If a slot [s] is found for request function [f], the function is
      called as [f s] (minimally delayed via event system). Processing
      stops when [wrequests] becomes empty, or when there are no more
      free slots.
   *)
  let cont = Netplex_cenv.self_cont() in
  let esys = cont#event_system in
  let delayed = Queue.create() in

  let is_write =
    function
      | `R _ -> false
      | `W _ -> true in

  let rec find() =
    let req = Queue.take m.wrequests in
    if is_write req && m.slow_syncing then (
      Queue.add req delayed;
      find()
    )
    else 
      match req with
	| `R on_slot -> on_slot
	| `W on_slot -> on_slot in

  ( try
      (* Flow control: if we are syncing slowly do not accept further 
	 write requests.
	 This would make everything worse. We have to ensure to call
	 this function again when the sync is done.
       *)
      for k = 0 to Array.length m.slot_used-1 do
	if not m.slot_used.(k) then (
	  let on_slot = find() in  (* or Queue.Empty *)
	  m.slot_used.(k) <- true;
	  (** Run [onslot k] by pushing this onto the event queue. We do not
	      run this immediately because the event queue has some reasonable
	      exception handling, so we do not have to do this here.
	   *)
	  let g = Unixqueue.new_group esys in
	  Unixqueue.once esys g 0.0 (fun () -> on_slot k)
	)
      done
    with
      | Queue.Empty -> ()
  );
  (* Also run all drequests if possible *)
  if not m.slow_syncing then (
    Queue.iter
      (fun f ->
	 let g = Unixqueue.new_group esys in
	 Unixqueue.once esys g 0.0 f
      )
      m.drequests;
    Queue.clear m.drequests
  );
  Queue.transfer delayed m.wrequests


let release_slot m slot =
  (** Release slot [slot], and run again over the [wrequests] queue to
      check whether there is an element in the queue that can be processed
      because of the free slot.
   *)
  m.slot_used.(slot) <- false;
  find_slots_and_process m


let sync_notify m ok =
  (** Run the registered functions [wsync] that wait on a sync. The functions
      are removed from the [wsync] queue. [ok] is [true] when the sync
      was successful.
   *)
  let cont = Netplex_cenv.self_cont() in
  let esys = cont#event_system in
  let q = Queue.create() in
  Queue.transfer m.wsync q;
  Queue.iter
    (fun f ->
       (** Run [f ok] by pushing this onto the event queue. We do not
           run this immediately because the event queue has some reasonable
           exception handling, so we do not have to do this here.
	*)
       let g = Unixqueue.new_group esys in
       Unixqueue.once esys g 0.0 (fun () -> f ok)
    )
    q


let sync_cycle m =
  (** This function is called every n seconds to start a sync. It performs
      the sync, increases [last_sync], and notifies the function waiting
      on sync completion.
   *) 
  if m.syncing then (
    if not m.slow_syncing then (
      let t0 = Unix.gettimeofday() in
      m.stats.t_t0 <- t0;
      m.stats.t_t0_r <- t0;
      m.slow_syncing <- true
    )
  )
  else
    if true (* not (Queue.is_empty m.wsync) *) then (
      try
	let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
	Dio_proxy.V1.sync'async
	  mc ()
	  (fun get_result ->
	     if m.slow_syncing then (
	       m.stats.t_count <- m.stats.t_count + 1;
	       let t1 = Unix.gettimeofday() in
	       let lat1 = t1 -. m.stats.t_t0_r in
	       m.stats.t_lat <- m.stats.t_lat +. lat1;
	       let lat2 = t1 -. m.stats.t_t0 in
	       m.stats.t_peak <- max m.stats.t_peak lat2
	     );
	     m.syncing <- false;
	     m.slow_syncing <- false;
	     let ok =
	       try get_result(); true
	       with error ->
		 Netlog.logf `Err "proc_sync exception (2): %s" 
		   (Netexn.to_string error);
		 false in
	     if ok then
	       m.last_sync <- Int64.succ m.last_sync;
	     find_slots_and_process m;
	     sync_notify m ok
	  );
	m.syncing <- true
      with error ->
	Netlog.logf `Err "proc_sync exception (1): %s" (Netexn.to_string error);
	m.syncing <- false
    )

let ignore_lost_connection emit x =
  (** Call the [emit] function and ignore [Connection_lost] exceptions.
      This is required when [emit] is invoked after some delay, and 
      not immediately after receiving the request
   *)
  try emit x 
  with Rpc_server.Connection_lost -> ()


let count_request m sess delta =
  let l_old = Hashtbl.length m.stats.req_count in
  let conn_id = Rpc_server.get_connection_id sess in
  let n_old =
    try Hashtbl.find m.stats.req_count conn_id
    with Not_found -> 0 in
  let n_new =
    n_old + delta in
  if n_new = 0 then
    Hashtbl.remove m.stats.req_count conn_id
  else
    Hashtbl.replace m.stats.req_count conn_id n_new;
  let l_new = Hashtbl.length m.stats.req_count in
  if l_old <> l_new then (
    let t0 = Unix.gettimeofday() in
    m.stats.bconn_acc <- 
      m.stats.bconn_acc +. (float l_old) *. (t0 -. m.stats.req_t0);
    m.stats.req_t0 <- t0
  )

  

let proc_read conf sess (req, block, pos, len, ticket) emit =
  (** Implementation of the [read] RPC. When we are done, we have to 
      call [emit].
   *)
  let m = current_manager() in
  (** First check whether [ticket] permits us to do the read: *)
  if not (verify m block true false ticket) then (
    Netlog.logf `Err
      "Got unauthorized read request for block %Ld (ticket %Ld)" 
      block ticket.Rpcapi_aux.safetrans_id;
    ignore_lost_connection
      (Rpc_server.reply_error sess) Rpc.System_err
  )
  else (
    (** Define a callback function that will be invoked when a slot in
	the shm object is free:
     *)
    match req with
      | `dnch_rpc ->
	  let on_slot slot =
	    let (mem,mpos) = Dn_shm.slot m.shm slot in
	    try
	      let t0 = Unix.gettimeofday() in
	      (** Fadvise that we will need the block. *)
	      Dn_store.will_read_block m.store block;
	      (** Pick now an RPC client and call the [Datanode_io] program to
		  start the read.
	       *)
	      let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
	      Dio_proxy.V1.read'async
		mc
		(slot,block,pos,len)
		(fun get_result ->
		   (** This function is called back when the read is done. The
		       data is now in the shm object and can be forwarded from 
		       there
		    *)
		   let t1 = Unix.gettimeofday() in
		   count_request m sess (-1);
		   let ok =
		     try get_result(); true
		     with error ->
		       Netlog.logf `Err "proc_read exception (2): %s" 
			 (Netexn.to_string error);
		       false in
		   if ok then (
		     (** Extract the data from the shm object and return to 
			 caller
		      *)
		     let data = String.create len in
		     Netsys_mem.blit_memory_to_string mem mpos data 0 len;
		     ignore_lost_connection
		       emit (`dnch_rpc data)
		   )
		   else
		     ignore_lost_connection
		       (Rpc_server.reply_error sess) Rpc.System_err;
		   release_slot m slot;
		   m.stats.r_count <- m.stats.r_count + 1;
		   let lat = t1 -. t0 in
		   m.stats.r_lat <- m.stats.r_lat +. lat;
		   m.stats.r_peak <- max m.stats.r_peak lat;
		);
	      count_request m sess 1;
	    with error ->
	      Netlog.logf `Err "proc_read exception (1): %s"
		(Netexn.to_string error);
	      ignore_lost_connection
		(Rpc_server.reply_error sess) Rpc.System_err;
	      release_slot m slot;
	  in
	  (** Push [on_slot] onto the queue of waiting requests, and check
	      immediately whether there is a free slot.
	   *)
	  Queue.push (`R on_slot) m.wrequests;
	  check_length m;
	  find_slots_and_process m
	    
      | `dnch_shm shm_obj ->
	  let t0 = Unix.gettimeofday() in
	  let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
	  Dio_proxy.V1.read_shm'async
	    mc
	    (shm_obj,block,pos,len)
	    (fun get_result ->
	       let t1 = Unix.gettimeofday() in
	       count_request m sess (-1);
	       let ok =
		 try get_result(); true
		 with error ->
		   Netlog.logf `Err "proc_read exception (3): %s" 
		     (Netexn.to_string error);
		   false in
	       if ok then (
		 ignore_lost_connection
		   emit `dnch_shm
	       )
	       else
		 ignore_lost_connection
		   (Rpc_server.reply_error sess) Rpc.System_err;
	       m.stats.r_count <- m.stats.r_count + 1;
	       let lat = t1 -. t0 in
	       m.stats.r_lat <- m.stats.r_lat +. lat;
	       m.stats.r_peak <- max m.stats.r_peak lat;
	    );
	  count_request m sess 1
  )



let dump_load m prefix =
  let la = Rpc_proxy.ManagedSet.mset_load m.io_mset in
  dlogf
    "%s: mset_load=%s"
    prefix
    (String.concat "," (List.map string_of_int (Array.to_list la)))

let call_write rpc_name conf sess (block,ticket) init_shm emit =
  (** Implementation of the [write] and [zero] RPCs. When we are done, we 
      have to call [emit].
   *)
  let m = current_manager() in
  (** First check whether [st_vfy] permits us to do the write: *)
  if not (verify m block false true ticket) then (
    Netlog.logf `Err
      "Got unauthorized write request for block %Ld (ticket %Ld)" 
      block ticket.Rpcapi_aux.safetrans_id;
    ignore_lost_connection
      (Rpc_server.reply_error sess) Rpc.System_err
  )
  else (
    (** Define a callback function that will be invoked when a slot in
        the shm object is free:
     *)
    let on_slot slot =
      (** Fill the slot with data, and the call [write] of the [Datanode_io]
          program to start writing
       *)
      let t0 = Unix.gettimeofday() in
      let (mem,mpos) = Dn_shm.slot m.shm slot in
      try
	init_shm mem mpos;
	(* dump_load m "before write"; *)
	let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
	Dio_proxy.V1.write'async
	  mc
	  (slot,block)
	  (fun get_result ->
	     (** This function is called back when the write is done. *)
	     let t1 = Unix.gettimeofday() in
	     count_request m sess (-1);
	     let ok =
	       try get_result(); true
	       with error ->
		 Netlog.logf `Err "%s exception (2): %s" 
		   rpc_name (Netexn.to_string error);
		 false in
	     if ok then
	       ignore_lost_connection
		 emit()
	     else
	       ignore_lost_connection
		 (Rpc_server.reply_error sess) Rpc.System_err;
	     release_slot m slot;
	     m.stats.w_count <- m.stats.w_count + 1;
	     let lat = t1 -. t0 in
	     m.stats.w_lat <- m.stats.w_lat +. lat;
	     m.stats.w_peak <- max m.stats.w_peak lat;
	  );
	count_request m sess 1
	(* dump_load m "after write"; *)
      with error ->
	Netlog.logf `Err
	  "%s exception (1): %s" rpc_name (Netexn.to_string error);
	ignore_lost_connection
	  (Rpc_server.reply_error sess) Rpc.System_err;
	release_slot m slot
    in
    (** Push [on_slot] onto the queue of waiting requests, and check immediately
        whether there is a free slot.
     *)
    Queue.push (`W on_slot) m.wrequests;
    check_length m;
    find_slots_and_process m
  )


let proc_write conf sess (block,wr_req,ticket) emit =
  match wr_req with
    | `dnch_rpc data ->
	let init_shm mem mpos =
	  data # blit_to_memory 0 mem mpos conf#dn_blocksize in
	call_write "proc_write" conf sess (block,ticket) init_shm emit 
    | `dnch_shm shm_obj ->
	let t0 = Unix.gettimeofday() in
	let m = current_manager() in
	let do_write() =
	  let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
	  Dio_proxy.V1.write_shm'async
	    mc
	    (shm_obj,block)
	    (fun get_result ->
	       let t1 = Unix.gettimeofday() in
	       count_request m sess (-1);
	       let ok =
		 try get_result(); true
		 with error ->
		   Netlog.logf `Err "%s exception (3): %s" 
		     "proc_write" (Netexn.to_string error);
		   false in
	       if ok then
		 ignore_lost_connection
		   emit()
	       else
		 ignore_lost_connection
		   (Rpc_server.reply_error sess) Rpc.System_err;
	       m.stats.w_count <- m.stats.w_count + 1;
	       let lat = t1 -. t0 in
	       m.stats.w_lat <- m.stats.w_lat +. lat;
	       m.stats.w_peak <- max m.stats.w_peak lat;
	    );
	  count_request m sess 1
	in
	if m.slow_syncing then
	  (* Do the write later *)
	  Queue.add do_write m.drequests
	else
	  do_write()


let proc_zero conf sess (block,ticket) emit =
  let init_shm mem mpos =
    Bigarray.Array1.fill 
      (Bigarray.Array1.sub mem mpos conf#dn_blocksize)
      '\000' in
  call_write "proc_zero" conf sess (block,ticket) init_shm emit 


let proc_copy conf sess (block,dest_node,dest_ident,dest_block,
			 ticket,dest_ticket)
              emit =
  (** Implementation of the [copy] RPC. This is more or less directly 
      forwarded to [Dn_io].

      The implicit [read] and [write] operations must be done by [Dn_io].
      There is, however, another implementation option for the network
      write in the remote copy case: The [read] could also put the data
      block into shm, and the network write is done here. That would
      allow us to better control the number of network connections to
      remote systems. (Of course, we could also simply _count_ the number
      of remote copy invocations here, and limit only that.)
   *)
  (* FIXME: local copies are not throttled! *)
  (* TODO: we ignore right now ticket. Do something. *)
  let m = current_manager() in
  let on_slot slot =
    (** Call [copy] of the [Datanode_io] program to start copying. The
	slot is used as buffer.
     *)
    try
      let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
      let t0 = Unix.gettimeofday() in
      Dio_proxy.V1.copy'async
	mc
	(block,dest_node,dest_ident,dest_block,
	 dest_ticket,slot)
	(fun get_result ->
	   let t1 = Unix.gettimeofday() in
	   count_request m sess (-1);
	   let ok =
	     try get_result(); true
	     with error ->
	       Netlog.logf `Err "proc_copy exception (2): %s" 
		 (Netexn.to_string error);
	       false in
	   if ok then
	     ignore_lost_connection
	       emit()
	   else
	     ignore_lost_connection
	       (Rpc_server.reply_error sess) Rpc.System_err;
	   release_slot m slot;
	   m.stats.c_count <- m.stats.c_count + 1;
	   let lat = t1 -. t0 in
	   m.stats.c_lat <- m.stats.c_lat +. lat;
	   m.stats.c_peak <- max m.stats.c_peak lat;
	);
      count_request m sess 1
    with error ->
      Netlog.logf `Err
	"proc_copy exception (1): %s" (Netexn.to_string error);
      ignore_lost_connection
	(Rpc_server.reply_error sess) Rpc.System_err
  in

  (** First check whether [ticket] permits us to do the read: *)
  if not (verify m block true false ticket) then (
    Netlog.logf `Err
      "Got unauthorized read request for block %Ld (ticket %Ld)" 
      block ticket.Rpcapi_aux.safetrans_id;
    ignore_lost_connection
      (Rpc_server.reply_error sess) Rpc.System_err
  )
  else (
    (** Check [dest_ticket] immediately in case of a local copy *)
    let identity = Dn_store.get_identity m.store in
    if identity = dest_ident && 
      not (verify m dest_block false true dest_ticket) 
    then (
      Netlog.logf `Err
	"Got unauthorized copy request for block %Ld to block %Ld (ticket %Ld)" 
	block dest_block dest_ticket.Rpcapi_aux.safetrans_id;
      ignore_lost_connection
	(Rpc_server.reply_error sess) Rpc.System_err
    )
    else (
      (** Push [on_slot] onto the queue of waiting requests, and check immediately
	  whether there is a free slot.
       *)
      Queue.push (`R on_slot) m.wrequests;
      check_length m;
      find_slots_and_process m
    )
  )

let proc_sync conf sess () emit =
  (** Implementation of the [sync] RPC. When we are done, we have to 
      call [emit].
   *)
  let m = current_manager() in
  (** Determine the current sync cycle number [cur_sync], and the cycle
      number that must have completed so we can return success. If a
      sync is currently being done, we have to await this cycle plus the
      following, otherwise just the following. This ensures that all data
      at the time of calling this RPC is synced to disk.
   *)
  let cur_sync = m.last_sync in
  let wait_sync = 
    if m.syncing then Int64.add cur_sync 2L else Int64.add cur_sync 1L in
  (** This is the callback function that is invoked when a sync cycle is 
      done
   *)
  let rec f is_ok =
    if is_ok then (
      if m.last_sync >= wait_sync then
	ignore_lost_connection
	  emit()    (** Return success *)
      else
	Queue.push f m.wsync    (** Another round of waiting *)
    )
    else
      (** Any error interrupts waiting, and we return the error: *)
      ignore_lost_connection
	(Rpc_server.reply_error sess) Rpc.System_err
  in
  Queue.push f m.wsync


module Cached(V:Netplex_cenv.VAR_TYPE) = struct
  let cached f name emit emit_syserr =
    (** Get a value from [Datanode_io] by calling [f]. The value is cached
        into [name]
     *)
    try
      emit(V.get name)
    with
      | Netplex_cenv.Container_variable_not_found _ ->
	  (** first time *)
	  let m = current_manager() in
	  let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
	  f
	    mc
	    ()
	    (fun get_result ->
	       (** This function is called back when the RPC is done. *)
	       try
		 let size = get_result() in
		 V.set name size;
		 emit size
	       with
		 | error ->
		     Netlog.logf `Err "proc_get_%s exception (1): %s" 
		       name (Netexn.to_string error);
		     emit_syserr()
	    )
end


let proc_identity conf sess exp_clustername emit =
  (** We forward this call to [Datanode_io] the first time the identity is
      requested. The identity does not change, so we can cache this value.
   *)
  if conf#dn_clustername <> exp_clustername then (
    Netlog.logf `Err "proc_identity: bad clustername";
    ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err;
  );
  let module C = Cached(String_var) in
  C.cached
    Dio_proxy.V1.identity'async
    "identity"
    (fun x -> ignore_lost_connection emit x)
    (fun () -> 
       ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err)


let proc_identity_discover conf sess exp_clustername emit =
  (** Similar, but we are quiet on errors *)
  if conf#dn_clustername = exp_clustername then (
    let module C = Cached(String_var) in
    C.cached
      Dio_proxy.V1.identity'async
      "identity"
      (fun x -> ignore_lost_connection emit x)
      (fun () -> ()) 
  )


let proc_size conf sess () emit =
  (** We forward this call to [Datanode_io] the first time the size is
      requested. The size does not change, so we can cache this value.
   *)
  let module C = Cached(Int64_var) in
  C.cached
    Dio_proxy.V1.size'async
    "size"
    (fun x -> ignore_lost_connection emit x)
    (fun () -> 
       ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err)
  

let proc_blocksize conf sess () emit =
  emit conf#dn_blocksize


let proc_clustername conf sess () emit =
  emit conf#dn_clustername


let remember_shm dir shm_names =
  (** Remember this shm_name in the filesystem *)
  let f = open_out (dir ^ "/shm_name.new") in
  try
    List.iter
      (fun name ->
	 output_string f (name ^ "\n");
      )
      shm_names;
    close_out f;
    Unix.rename (dir ^ "/shm_name.new") (dir ^ "/shm_name")
  with
    | error ->
	close_out_noerr f;
	raise error


let remember_all_shm() =
  let cont = Netplex_cenv.self_cont() in
  let dir =
    cont # socket_service # socket_service_config # controller_config #
      socket_directory in
  let m = current_manager() in
  let names =
    m.shm_name ::
      (Hashtbl.fold (fun _ n acc -> n @ acc) m.user_shm []) in
  remember_shm dir names


let drop_old_shm dir =
  (** Look for a remembered old shm object and drop it *)
  if Sys.file_exists (dir ^ "/shm_name") then (
    let f = open_in (dir ^ "/shm_name") in
    try
      while true do
	let n = input_line f in
	try
	  Dn_shm.unlink n
	with
	  | error ->
	      Netlog.logf `Warning "Cannot unlink old shm %s: %s" 
		n (Netexn.to_string error)
      done
    with
      | End_of_file ->
	  close_in f
  )


let sess_is_local sess =
  try
    match Rpc_server.get_socket_name sess with
      | Unix.ADDR_INET(ip1,_) ->
	  ( match Rpc_server.get_peer_name sess with
	      | Unix.ADDR_INET(ip2,_) ->
		  ip1=ip2
	      | _ ->
		  assert false
	  )
      | Unix.ADDR_UNIX _ ->
	  true
  with
    | _ -> false


let proc_alloc_shm_if_local sess () emit =
  let m = current_manager() in
  let is_local = sess_is_local sess in
  if is_local then (
    try
      let (fd, name) = Plasma_util.get_shm_fd "plasma" 0 in
      Unix.close fd;
      let conn = Rpc_server.get_connection_id sess in
      let l =
	try Hashtbl.find m.user_shm conn
	with Not_found -> [] in
      Hashtbl.replace m.user_shm conn (name :: l);
      remember_all_shm();
      ignore_lost_connection emit (Some name)
    with
      | error ->
	  Netlog.logf `Warning "Cannot allocate shm: %s"
	    (Netexn.to_string error);
	  ignore_lost_connection emit None
  )
  else
    ignore_lost_connection emit None


let proc_udsocket_if_local sess () emit =
  let is_local = sess_is_local sess in
  if is_local then (
    let m = current_manager() in
    ignore_lost_connection emit m.udsocket
  )
  else
    ignore_lost_connection emit None



let on_tcp_close conn_id =
  let m = current_manager() in
  let l =
    try Hashtbl.find m.user_shm conn_id with Not_found -> [] in
  List.iter
    (fun n ->
       try
	 Netsys_posix.shm_unlink n
       with
	 | error ->
	     Netlog.logf `Warning "Cannot unlink shm %s: %s" 
	       n (Netexn.to_string error)
    )
    l;
  Hashtbl.remove m.user_shm conn_id;
  remember_all_shm()


let create_io_mset conf =
  (** We use here the [Rpc_proxy] abstraction for RPC clients with
      automatically managed connections. As the [Datanode_io] processes
      are running on the same system, connections are reliable, and
      we need not to configure this part of [Rpc_proxy]. We simply
      configure to open several connections up to a certain number.
      The RPC calls will use any of these connections.
   *)
  let esys =  (** use the container event system for these calls *)
    (Netplex_cenv.self_cont()) # event_system in
  let connector =
    match Netplex_cenv.lookup "Dn_io" "RPC" with
      | None ->
	  failwith("Socket not found for service Dn_io, protocol RPC")
      | Some path ->
	  Rpc_client.Unix path in
  let n = conf#dn_io_processes in
  let mclient_config =
    Rpc_proxy.ManagedClient.create_mclient_config
      ~programs:[ Rpcapi_clnt.Datanode_io.V1._program ]
      () in
  let mset_config =
    Rpc_proxy.ManagedSet.create_mset_config
      ~mclient_config
      ~policy:`Balance_load
      () in
  let mset =
    Rpc_proxy.ManagedSet.create_mset mset_config [| connector, n |] esys in
  mset


let proc_reset_all_safetrans conf sess () emit =
  let m = current_manager() in
  Hashtbl.clear m.safetrans;
  emit()
    
let proc_cancel_safetrans conf sess st_id_a emit =
  let m = current_manager() in
  Array.iter
    (fun st_id ->
       Hashtbl.remove m.safetrans st_id;
    )
    st_id_a;
  emit()

let proc_safetrans conf sess ticket_a emit =
  let m = current_manager() in
  Array.iter
    (fun t ->
       Hashtbl.replace m.safetrans t.A.st_id (t.A.st_tmo,t.A.st_secret)
    )
    ticket_a;
  emit ()

let maybe_timeout_safetrans m =
  (** Cancel safetrans that have timed out *)
  let t = Unix.gettimeofday() in
  let l =
    Hashtbl.fold
      (fun st_id (st_tmo,_) acc ->
	 if t > Int64.to_float st_tmo then st_id :: acc else acc
      )
      m.safetrans
      [] in
  List.iter
    (fun st_id ->
       Hashtbl.remove m.safetrans st_id
    )
    l


let mfac = Hashtbl.create 5

let () =
  Hashtbl.add mfac "writedata" Xdr_mstring.string_based_mstrings


let setup srv (cf,cfaddr,conf,shm_name,access) =
  Rpcapi_srv.Datanode.V1.bind_async
    ~proc_null:(fun _ _ emit -> emit ())
    ~proc_identity:(proc_identity conf)
    ~proc_size:(proc_size conf)
    ~proc_blocksize:(proc_blocksize conf)
    ~proc_clustername:(proc_clustername conf)
    ~proc_read:(proc_read conf)
    ~proc_write:(proc_write conf)
    ~proc_copy:(proc_copy conf)
    ~proc_zero:(proc_zero conf)
    ~proc_sync:(proc_sync conf)
    ~proc_alloc_shm_if_local
    ~proc_udsocket_if_local
    srv;
  Rpcapi_srv.Datanode_ctrl.V1.bind_async
    ~proc_null:(fun _ _ emit -> emit ())
    ~proc_reset_all_safetrans:(proc_reset_all_safetrans conf)
    ~proc_cancel_safetrans:(proc_cancel_safetrans conf)
    ~proc_safetrans:(proc_safetrans conf)
    srv;
  Rpc_server.set_mstring_factories srv mfac;
  Rpc_server.set_onclose_action srv on_tcp_close;
  Pfs_auth.configure_rpc_server srv access


let new_stats() =
  { cycle = 0;
    r_count = 0;
    r_lat = 0.0;
    r_peak = 0.0;
    w_count = 0;
    w_lat = 0.0;
    w_peak = 0.0;
    c_count = 0;
    c_lat = 0.0;
    c_peak = 0.0;
    t_t0 = 0.0;
    t_t0_r = 0.0;
    t_count = 0;
    t_lat = 0.0;
    t_peak = 0.0;
    q_peak = 0;
    req_count = Hashtbl.create 7;
    req_t0 = Unix.gettimeofday();
    bconn_acc = 0.0;
  }

let next_stats m =
  let s = m.stats in
  let t0 = Unix.gettimeofday() in
  s.cycle <- s.cycle + 1;
  s.r_count <- 0;
  s.r_lat <- 0.0;
  s.r_peak <- 0.0;
  s.w_count <- 0;
  s.w_lat <- 0.0;
  s.w_peak <- 0.0;
  s.c_count <- 0;
  s.c_lat <- 0.0;
  s.c_peak <- 0.0;
  (* s.t_t0 <- 0.0; *)
  s.t_t0_r <- t0;
  s.t_count <- 0;
  s.t_lat <- 0.0;
  s.t_peak <- 0.0;
  s.q_peak <- Queue.length m.wrequests;
  (* s.req_count <- Hashtbl.create 7; *)
  s.req_t0 <- t0;
  s.bconn_acc <- 0.0


let stats_period = 60.0

let print_stats m =
  let s = m.stats in
  let t0 = Unix.gettimeofday() in
  let l = Hashtbl.length s.req_count in
  s.bconn_acc <- s.bconn_acc +. (float l) *. (t0 -. s.req_t0);
  let msg =
    sprintf "cycle=%d \
             [reads: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [writes: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [copies: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [throttling: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [queue: peak_length=%d] \
             [parallel requests: avg=%.1f]"
      s.cycle
      s.r_count s.r_lat (s.r_lat /. float s.r_count) s.r_peak
      s.w_count s.w_lat (s.w_lat /. float s.w_count) s.w_peak
      s.c_count s.c_lat (s.c_lat /. float s.c_count) s.c_peak
      s.t_count s.t_lat (s.t_lat /. float s.t_count) s.t_peak
      s.q_peak
      (s.bconn_acc /. stats_period) in
  let cont =
    Netplex_cenv.self_cont() in
  cont # log_subch "statistics" `Info msg;
  next_stats m


let create_dn_discover_server conf container =
  (* This is a UDP server, and hence special *)
  let udp_port_opt = ref None in
  List.iter
    (fun p ->
       Array.iter
         (fun a -> 
            match a with
              | `Socket(Unix.ADDR_INET(h,p)) ->
                  udp_port_opt := Some (h,p)
              | _ -> ()
         )
         p # addresses
    )
    container # socket_service # socket_service_config # protocols;
  match !udp_port_opt with
    | Some(h,p) ->
        let esys =  container # event_system in
        let socket_config =
          ( object(self)
              inherit Rpc_server.default_socket_config
              method listen_options =
                { Uq_engines.default_listen_options with
                    Uq_engines.lstn_reuseaddr = true
                }
            end
          ) in
        let rpc = 
          Rpc_server.create2 (`Socket(Rpc.Udp, 
                                      Rpc_server.Internet(h,p),
                                      socket_config)) esys in
	Rpcapi_srv.Datanode_discover.V1.bind_async
	  ~proc_null:(fun _ _ emit -> emit ())
	  ~proc_identity:(proc_identity_discover conf)
	  rpc;
	Some rpc
	  
    | None -> None



let factory() =
  Rpc_netplex.rpc_factory
    ~configure:(fun cf cfaddr -> 
		  let conf = Dn_config.extract_dn_config cf in
		  (** Create the shm object - optionally, drop the old
                      object. Shm objects have kernel persistence, so they
                      are not automatically dropped when the process
                      terminates. We drop explicitly at [pre_finish_hook]
                      time, but for handling abnormally terminated processes
                      we also remember the name of the shm object and
                      drop it at next startup time. This is done at
                      [pre_start_hook] time below.
		   *)
		  let shm_name = 
		    Dn_shm.create (conf :> Dn_shm.dn_shm_config) in
		  let access =
		    Pfs_auth.extract_access_config cf cf#root_addr in
		  (cf, cfaddr, conf, shm_name, access)
	       )
    ~hooks:(fun (cf, cfaddr, conf, shm_name, access) ->
	      ( object
		  inherit Netplex_kit.empty_processor_hooks()

		  method post_add_hook _ ctrl =
		    ctrl # add_plugin Netplex_sharedvar.plugin;
		    (** Also enable the [Dn_io] service: *)
		    Dn_io.add_service conf cf cfaddr ctrl

		  method pre_start_hook _ ctrl _ =
		    drop_old_shm 
		      ctrl#controller_config#socket_directory;
		    remember_shm 
		      ctrl#controller_config#socket_directory 
		      [shm_name]

		  method post_start_hook cont =
		    let store =
		      Dn_store.open_store (conf :> Dn_store.dn_store_config) in
		    let shm =
		      Dn_shm.openshm
			(conf :> Dn_shm.dn_shm_config) 
			shm_name in
		    let udsocket =
		      Netplex_cenv.lookup "Dn_manager" "RPC" in
		    (* Start the special server for [Dn_discover]: *)
		    let dsrv = create_dn_discover_server conf cont in

		    let m = 
		      { shm = shm;
			shm_name = shm_name;
			slot_used = Array.make conf#dn_shm_queue_length false;
			wrequests = Queue.create();
			drequests = Queue.create();
			io_mset = create_io_mset conf;
			safetrans = Hashtbl.create 17;
			last_sync = 0L;
			syncing = false;
			slow_syncing = false;
			wsync = Queue.create();
			user_shm = Hashtbl.create 5;
			udsocket = udsocket;
			store = store;
			discover_server = dsrv;
			stats = new_stats()
		      } in
		    Manager_var.set "current_manager" m;

		    (** Publish the name of the shm object: *)
		    ignore(Netplex_sharedvar.create_var
			     "plasma.datanode.shm.name");
		    ignore(Netplex_sharedvar.set_value
			     "plasma.datanode.shm.name" shm_name);

		    (** Establish the timer calling [sync] periodically: *)
		    let _tm =
		      Netplex_cenv.create_timer
			(fun tm -> sync_cycle m; true)
			conf#dn_sync_period in

		    (** Establish a timer for [maybe_timeout_safetrans] *)
		    let _tm =
		      Netplex_cenv.create_timer
			(fun tm -> maybe_timeout_safetrans m; true)
			60.0 in

		    (** Establish a timer for printing statistics *)
		    let _tm =
		      Netplex_cenv.create_timer
			(fun tm -> print_stats m; true)
			stats_period in


		    ();

		  method system_shutdown() =
		    let m = current_manager() in
		    ( match m.discover_server with
			| None -> ()
			| Some rpc -> 
			    Netlog.logf `Info "Stopping discovery server";
			    Rpc_server.stop_server rpc
		    )


		  method pre_finish_hook cont =
		    ( try
			Dn_shm.unlink shm_name
		      with
			| error ->
			    Netlog.logf `Err
			      "Cannot unlink shm object: %s"
			      (Netexn.to_string error)
		    );
		    let m = current_manager() in
		    Dn_store.close_store m.store;
		      
		end
	      )
	   )
    ~name:"dn_manager"
    ~setup
    ()

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml