Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: rpc_client.ml 1864 2013-07-21 11:49:11Z gerd $
 * ----------------------------------------------------------------------
 *
 *)


open Rtypes
open Xdr
open Rpc
open Rpc_common
open Rpc_packer
open Unixqueue
open Printf

exception Message_not_processable


exception Message_lost
exception Message_timeout
exception Response_dropped
exception Communication_error of exn
exception Client_is_down
exception Keep_call
exception Unbound_exception of exn

module type USE_CLIENT = sig
  type t
  val use : t -> Rpc_program.t -> unit
  val unbound_sync_call : 
        t -> Rpc_program.t -> string -> xdr_value -> xdr_value
  val unbound_async_call :
        t -> Rpc_program.t -> string -> xdr_value -> 
        ((unit -> xdr_value) -> unit) -> unit
end

let () =
  Netexn.register_printer
    (Communication_error Not_found)
    (fun e ->
       match e with
	 | Communication_error e' ->
	     "Rpc_client.Communication_error(" ^ Netexn.to_string e' ^ ")"
	 | _ -> assert false
    );
  Netexn.register_printer
    (Unbound_exception Not_found)
    (fun e ->
       match e with
	 | Unbound_exception e' ->
	     "Rpc_client.Unbound_exception(" ^ Netexn.to_string e' ^ ")"
	 | _ -> assert false
    )



module SessionUint4 = struct
  type t = uint4
  let compare = (Pervasives.compare : uint4 -> uint4 -> int)
end

module SessionMap =
  Map.Make(SessionUint4)


type call_state =
  | Delayed    (* call waits for authentication protocol *)
  | Waiting    (* call has not yet been sent *)
  | Pending    (* call has been sent, still no answer *)
  | Done       (* got answer for the call *)

(* Normally, the state of the call is changed from Waiting to Pending to
 * Done.
 * In the case of a retransmission, the call is added to the waiting calls
 * again but its state remains 'Pending' (because the call is still member
 * of the set of pending calls).
 *)


(* The following class types are only preliminary definitions. The type
 * parameter 't is later instantiated with the type of the clients, t.
 *)

type reject_code =
    [ `Fail | `Retry | `Renew | `Next ]

class type ['t] pre_auth_session =
object
  method next_credentials :
     't -> Rpc_program.t -> string -> uint4 ->
     (string * string * string * string * Xdr.encoder option * 
	Xdr.decoder option)
  method server_rejects : 't -> uint4 -> server_error -> reject_code
  method server_accepts : 't -> uint4 -> string -> string -> unit
(*
  method drop_xid : 't -> uint4 -> unit
  method drop_client : 't -> unit
 *)
  method auth_protocol : 't pre_auth_protocol
end


and ['t] pre_auth_protocol =
object
  method state :
         [ `Emit | `Receive of uint4 | `Done of 't pre_auth_session | `Error]
  method emit : uint4 -> uint4 -> uint4 -> Rpc_packer.packed_value
  method receive : Rpc_packer.packed_value -> unit
  method auth_method : 't pre_auth_method
end


and ['t] pre_auth_method =
object
  method name : string
  method new_session : 't -> string option -> 't pre_auth_protocol
end


type regular_call =
    { mutable prog : Rpc_program.t;
      mutable proc : string;
      mutable param : xdr_value;       (* the argument of the call *)
      mutable get_result : (unit -> xdr_value) -> unit;
      mutable decoder : Xdr.decoder option;
      mutable call_user_name : string option;
    }

type call_detail =
    [ `Regular of regular_call
    | `Auth_proto of Rpc_program.t
    ]

type call =
      { mutable detail : call_detail;
	mutable state : call_state;
	mutable retrans_count : int;        (* retransmission counter *)
	mutable xid : uint4;
	mutable destination : Unix.sockaddr option;

	mutable call_timeout : float;
	mutable timeout_group : group option;
	  (* If a timeout handler has been set, this is the corresponding group *)

	mutable call_auth_proto : t pre_auth_protocol;
	  (* calls store the authentication protocol *)

	mutable batch_flag : bool;
	
	mutable request : packed_value option;
	(* The request while the call is waiting *)
      }

and t =
      { mutable ready : bool;
	mutable nolog : bool;

        mutable trans : Rpc_transport.rpc_multiplex_controller option;
	mutable progs :  Rpc_program.t list;
	mutable prot :  protocol;
        mutable esys :  event_system;
	
	mutable est_engine : Rpc_transport.rpc_multiplex_controller Uq_engines.engine option;
	mutable shutdown_connector : t -> Rpc_transport.rpc_multiplex_controller -> (unit->unit) -> unit;

	mutable delayed_calls : (t pre_auth_protocol,call Queue.t) Hashtbl.t;
	  (* delayed: the request cannot be sent - because the authentication
	     protocol is not yet done
	   *)
	mutable waiting_calls : call Queue.t;
	  (* waiting: the request can be sent when the connections allows it *)
	mutable pending_calls : call SessionMap.t;
	  (* pending: the request is sent; waiting for the reply *)

	mutable next_xid : uint4;
	mutable used_xids : unit SessionMap.t;
	mutable last_replier : Unix.sockaddr option;
	mutable last_xid : uint4 option;

	(* configs: *)
	mutable timeout : float;
        mutable max_retransmissions : int;
	mutable next_timeout : float;
	mutable next_max_retransmissions : int;
	mutable next_destination : Unix.sockaddr option;
	mutable next_batch_flag : bool;
	mutable max_resp_length : int option;
	mutable user_name : string option;
	mutable mstring_factories : Xdr_mstring.named_mstring_factories;

	(* authentication: *)
	mutable all_auth_methods : t pre_auth_method list;
	mutable auth_methods : t pre_auth_method list;     
	   (* remaining methods to try *)
	mutable auth_current : (string option,t pre_auth_protocol) Hashtbl.t;
	   (* The protocol used for this user *)

	mutable exception_handler : exn -> unit;
      }

and connector =
    Inet of (string * int)                        (* Hostname, port *)
  | Internet of (Unix.inet_addr * int)
  | Unix of string                                (* path to unix dom sock *)
  | W32_pipe of string
  | Descriptor of Unix.file_descr
  | Dynamic_descriptor of (unit -> Unix.file_descr)
  | Portmapped of string

class type auth_session = [t] pre_auth_session
class type auth_method = [t] pre_auth_method
class type auth_protocol = [t] pre_auth_protocol


let auth_none_session p : auth_session =
object
  method next_credentials _ _ _ _ =
    ("AUTH_NONE", "", "AUTH_NONE", "", None, None)
  method server_rejects _ _ _ = `Next
  method server_accepts _ _ _ _ = ()
  method auth_protocol = p
end


let auth_none_proto m : auth_protocol =
  let session = ref None in
object(self)
  initializer
    session := Some(auth_none_session self)
  method state =
    match !session with
      | None -> assert false
      | Some s -> `Done s
  method emit _ _ _ = assert false
  method receive _ = assert false
  method auth_method = m
end

let auth_none =
object(self)
  method name = "AUTH_NONE"
  method new_session _ _ =
    auth_none_proto self
end

module Debug = struct
  let enable = ref false
  let enable_ptrace = ref false
  let ptrace_verbosity = ref `Name_abbrev_args
  let disable_for_client c = c.nolog <- true
end

let dlog0 = Netlog.Debug.mk_dlog "Rpc_client" Debug.enable
let dlogr0 = Netlog.Debug.mk_dlogr "Rpc_client" Debug.enable

let dlog cl msg =
  if not cl.nolog then dlog0 msg

let dlogr cl getmsg =
  if not cl.nolog then dlogr0 getmsg

let dlog0_ptrace = Netlog.Debug.mk_dlog "Rpc_client.Ptrace" Debug.enable_ptrace
let dlogr0_ptrace = Netlog.Debug.mk_dlogr "Rpc_client.Ptrace" Debug.enable_ptrace

let dlog_ptrace cl msg =
  if not cl.nolog then dlog0_ptrace msg

let dlogr_ptrace cl getmsg =
  if not cl.nolog then dlogr0_ptrace getmsg


let () =
  Netlog.Debug.register_module "Rpc_client" Debug.enable;
  Netlog.Debug.register_module "Rpc_client.Ptrace" Debug.enable_ptrace


let connector_of_sockaddr =
  function
    | Unix.ADDR_INET(ip,p) ->
	Internet(ip,p)
    | Unix.ADDR_UNIX s ->
	Unix s


let connector_of_socksymbol =
  function
    | `Inet(ip,p) -> Internet(ip,p)
    | `Inet_byname(n,p) -> Inet(n,p)
    | `Unix p -> Unix p


  (*****)

let set_auth_methods cl list =
  match list with
    | _ :: _ ->
	Hashtbl.clear cl.auth_current;
	cl.auth_methods <- list;
	cl.all_auth_methods <- list;
    | [] ->
	invalid_arg "Rpc_client.set_auth_methods"

  (*****)

let stop_retransmission_timer cl call =
  match call.timeout_group with
    | None -> ()
    | Some g ->
	Unixqueue.clear cl.esys g

  (*****)

let pass_result cl call f =
  (* for regular calls only! *)

  (* Stop the timer, if any : *)
  stop_retransmission_timer cl call;

  (* Change the state of the call to 'Done': *)
  call.state <- Done;

  (* pass 'f' to the call back function: *)
  try
    dlog cl "Calling back";
    ( match call.detail with
	| `Regular rc ->
	    rc.get_result f;
	| _ ->
	    assert false
    );
    dlog cl "Returned from callback";
  with
    | Keep_call as x ->
	dlog cl "Keep_call";
	raise x
    | Unbound_exception x ->
	dlog cl "Unbound_exception";
	raise x
    | any ->
	begin  (* pass the exception to the exception handler: *)
	  dlogr cl (fun () -> 
		      "Exception from callback: " ^ Netexn.to_string any);
	  cl.exception_handler any
	end


let pass_exception ?(skip_auth=false) cl call x =
  (* Caution! This function does not remove [call] from the set of pending
   * calls.
   * 
   * For authproto messages, the exception is passed to the connected calls
   * instead. Option skip_auth: no notification for authproto messages.
   *)
  if call.state <> Done then (  (* Don't call back twice *)
    try
      dlogr cl
	(fun () ->
	   let sx = Netexn.to_string x in
	   "Passing exception " ^ sx);
      ( match call.detail with
	  | `Regular _ ->
	       pass_result cl call (fun () -> raise x)
	  | `Auth_proto prog ->
	       stop_retransmission_timer cl call;
	       call.state <- Done;
	       if not skip_auth then (
		 let q =
		   try Hashtbl.find cl.delayed_calls call.call_auth_proto
		   with Not_found -> Queue.create() in
		 Hashtbl.remove cl.delayed_calls call.call_auth_proto;
		 Queue.iter
		   (fun d_call ->
  		      pass_result cl d_call (fun () -> raise x)
		   )
		   q
	       )
      )
    with
      | Keep_call -> ()          (* ignore *)
  )

let pass_exception_to_all cl x =
  (* Caution! This function does not erase the set of pending calls.  *)
  dlog cl "Passing exception to all";

  let ht = Hashtbl.create 17 in
  let fn_list = ref [] in
  let add_fn xid call =
    if not (Hashtbl.mem ht xid) then (
      Hashtbl.add ht xid ();
      fn_list := call :: !fn_list
    )
  in

  SessionMap.iter (fun xid call -> add_fn xid call)  cl.pending_calls;
  Queue.iter      (fun call -> add_fn call.xid call) cl.waiting_calls;
  Hashtbl.iter
    (fun auth_proto q ->
       Queue.iter
	 (fun call -> add_fn call.xid call)
	 q
    )
    cl.delayed_calls;

  (* We already included delayed_calls, hence set skip_auth here *)
  List.iter (fun call -> pass_exception ~skip_auth:true cl call x) !fn_list

  (*****)

let close ?error ?(ondown=fun()->()) cl =
  if cl.ready then (
    dlog cl "Closing";
    cl.ready <- false;
    ( match error with
	| None -> pass_exception_to_all cl Message_lost
	| Some e -> pass_exception_to_all cl e
    );
    cl.pending_calls <- SessionMap.empty;
    cl.used_xids <- SessionMap.empty;
    Queue.clear cl.waiting_calls;
    Hashtbl.clear cl.delayed_calls;
    match cl.trans with
      | None -> 
	  ondown()
      | Some trans ->
	  cl.trans <- None;
	  cl.shutdown_connector cl trans ondown
  )
  else
    ondown()
;;

  (*****)

let check_for_input =  (* "forward declaration" *)
  ref (fun _ -> ());;

let check_for_output = (* "forward declaration" *)
  ref (fun _ -> ());;

  (*****)

let find_or_make_auth_protocol cl user_opt =
  try
    Hashtbl.find cl.auth_current user_opt
  with
    | Not_found ->
	let current_auth_method = 
	  match cl.auth_methods with
	    | hd :: _ -> hd
	    | [] -> auth_none in
	let p = 
	  current_auth_method # new_session cl user_opt in
	Hashtbl.add cl.auth_current user_opt p;
	p
;;

  (*****)

let rec next_xid cl =
  let xid = cl.next_xid in
  (* xid is uint4, so we increment as int64: *)
  let xid64 = Rtypes.int64_of_uint4 xid in
  let xid64' = Int64.logand (Int64.succ xid64) 0xffff_ffff_L in
  cl.next_xid <- Rtypes.uint4_of_int64 xid64';
  if SessionMap.mem xid cl.used_xids then
    next_xid cl
  else
    xid

  (*****)

let remove_pending_call cl call =
  cl.pending_calls <- SessionMap.remove call.xid cl.pending_calls;
  cl.used_xids <- SessionMap.remove call.xid cl.used_xids;
  stop_retransmission_timer cl call
;;


let abandon_call cl xid =
  try
    let call = SessionMap.find xid cl.pending_calls in
    remove_pending_call cl call
  with
    | Not_found ->
	let q = Queue.create() in
	Queue.transfer cl.waiting_calls q;
	while not(Queue.is_empty q) do
	  let call = Queue.take q in
	  if call.xid <> xid then Queue.add call cl.waiting_calls
	done;
	cl.used_xids <- SessionMap.remove xid cl.used_xids


  (*****)

let continue_call cl call =
  (* Called when the authentication protocol finishes, and [call] can now
     be added to the [waiting] queue.

     Also called for retransmitted regular requests.
   *)
  let authsess =
    match call.call_auth_proto # state with
      | `Done authsess -> authsess
      | _ -> assert false in
  let rc =
    match call.detail with
      | `Regular rc -> rc
      | _ -> assert false in
  
  call.state <- Waiting;

  let (cred_flav, cred_data, verf_flav, verf_data, enc_opt, dec_opt) =
    authsess # next_credentials 
      cl rc.prog rc.proc call.xid in
  rc.decoder <- dec_opt;
  let request =
    Rpc_packer.pack_call
      ?encoder:enc_opt
      rc.prog
      call.xid
      rc.proc
      cred_flav cred_data verf_flav verf_data
      rc.param in
  call.request <- Some request;

  Queue.add call cl.waiting_calls


(* FIXME: Retransmissions are not yet perfect. We allow here that calls time
   out while the request is still in the Waiting queue (especially TCP
   connect timeouts). If we retransmit with a new encryption key, we will
   send the same request twice ( and severs might not like this).

   Better: do not modify [call] in place, but create a copy.

   Also, it may happen that we get a response to attempt #1 while we already
   dropped it and pushed attempt #2 to Waiting. Now, the response #1 may
   turn out to use a different encryption key, and we cannot decode it.

   Better: the pending table should map xid to a list of possible calls.
   (Still to check how we find then the right one...)
 *)

let retransmit cl call =
  if call.state = Pending || call.state = Waiting then begin
    if call.retrans_count > 0 then begin
      dlog cl "Retransmitting";
      let old_state = call.state in
      (* Make the 'call' waiting again *)
      ( match call.detail with
	  | `Regular _ -> continue_call cl call
	  | `Auth_proto _ -> Queue.add call cl.waiting_calls
      );
      cl.used_xids <- SessionMap.add call.xid () cl.used_xids;
      (* Decrease the retransmission counter *)
      call.retrans_count <- call.retrans_count - 1;
      (* Ensure the call keeps its state: *)
      call.state <- old_state;
      (* Check state of reources: *)
      !check_for_output cl
      (* Note: The [call] remains in state [Pending] (if it is already). 
       * This prevents the [call]
       * from being added to [cl.pending_calls] again.
       *
       * If the [call] is encrypted, it is possible that the client only
       * accepts the response to the second trial, and rejects a late
       * response for the first request, because the encryption code
       * may have changed in the meantime.
       *)
    end
    else begin
      (* still no answer after maximum number of retransmissions *)
      dlog cl "Call timed out!";
      remove_pending_call cl call;
      (* Note that we do not remove the call from waiting_calls for
         performance reasons. We simply skip it there if we find it.
         pass_exception will set call.state to Done.
       *)
      pass_exception cl call Message_timeout;
      (* FIXME: do something with the delayed calls if [call] is 
	 an authproto message
       *)
      (* If we still try to connect the TCP socket, shut the client
         completely down:
       *)
      ( match cl.est_engine with
	  | None -> ()
	  | Some e -> 
	      e#abort();
	      close cl;
      );
      (* Check state of reources: *)
      !check_for_output cl
    end
  end


  (*****)

(* Note: For asynchronous authentication, it would be sufficient that
 * add_call (and add_call_again) are rewritten such that they first
 * schedule the authentication request, and when the request is replied,
 * the call is scheduled.
 *)

let set_timeout cl call =
  if call.call_timeout > 0.0 && call.timeout_group = None then (
    (* Note: Case call_timeout = 0.0 is handled elsewhere *)
    (* CHECK: What happens when the timeout comes before the message
     * is fully written? (Low priority because for stream connections
     * a timeout is usually not set.)
     *)
    let g = new_group cl.esys in
    Unixqueue.once cl.esys g call.call_timeout
      (fun () ->
	 call.timeout_group <- None;
	 dlog cl "Timeout handler";
	 retransmit cl call;
	 (* Maybe we have to cancel reading: *)
	 !check_for_input cl
      );
    call.timeout_group <- Some g
  )


let auth_proto_emit cl prog authproto =
  (* Emit an authproto message *)
  let xid = next_xid cl in
  let prog_nr = Rpc_program.program_number prog in
  let vers_nr = Rpc_program.version_number prog in

  let request = authproto # emit xid prog_nr vers_nr in

  (* THINK: maybe we want to set the call_timeout and the retrans_count
     separately for authproto messages (instead of just taking over the
     values for the regular call)
   *)
  let call =
    { detail = `Auth_proto prog;
      state = Waiting;
      retrans_count = cl.next_max_retransmissions;
      xid = xid;
      destination = cl.next_destination;
      call_timeout = cl.next_timeout;
      timeout_group = None;
      call_auth_proto = authproto;
      batch_flag = false;
      request = Some request;
    } in
  
  Queue.add call cl.waiting_calls;
  cl.used_xids <- SessionMap.add call.xid () cl.used_xids;
  
  (* For TCP and timeout > 0.0 set the timeout handler immediately, so the
     timeout includes connecting
   *)
  if cl.prot = Rpc.Tcp && call.call_timeout > 0.0
  then
    set_timeout cl call


let unbound_async_call_r cl prog procname param receiver authsess_opt =
  (* authsess_opt: if passed, this session is used *)
  if not cl.ready then
    raise Client_is_down;
  if cl.progs <> [] then (
    let prog_id = Rpc_program.id prog in
    if not (List.exists (fun p -> Rpc_program.id p = prog_id) cl.progs) then
      failwith "Rpc_client.unbound_async_call: \
                This client is not bound to the requested program"
  );

  let (_, _, out_type) =
    try Rpc_program.signature prog procname 
    with Not_found ->
      failwith ("Rpc_client.unbound_async_call: No such procedure: " ^
		  procname) in

  if cl.next_batch_flag && Xdr.xdr_type_term out_type <> X_void then
    failwith ("Rpc_client.unbound_async_call: Cannot call in batch mode: " ^
		procname);

  let rc =
    { prog = prog;
      proc = procname;
      param = param;
      get_result = receiver;
      decoder = None;
      call_user_name = cl.user_name;
    } in

  let eff_authsess_opt, eff_authproto =
    match authsess_opt with
      | None ->
	  let authproto = find_or_make_auth_protocol cl cl.user_name in
	  ( match authproto#state with
	      | `Done authsess ->
		  (Some authsess, authproto)
	      | _ ->
		  (None, authproto)
	  )
      | Some authsess ->
	  (Some authsess, authsess#auth_protocol) in

  let new_call =
    match eff_authsess_opt with
      | Some authsess ->
	  let xid = next_xid cl in
	  let (cred_flav, cred_data, verf_flav, verf_data, enc_opt, dec_opt) =
	    authsess # next_credentials 
	      cl prog procname xid in
	  rc.decoder <- dec_opt;
	  let request =
	    Rpc_packer.pack_call
	      ?encoder:enc_opt
	      prog
	      xid
	      procname
	      cred_flav cred_data verf_flav verf_data
	      param in
	  let call =
	    { detail = `Regular rc;
	      state = Waiting;
	      retrans_count = cl.next_max_retransmissions;
	      xid = xid;
	      destination = cl.next_destination;
	      call_timeout = cl.next_timeout;
	      timeout_group = None;
	      call_auth_proto = eff_authproto;
	      batch_flag = cl.next_batch_flag;
	      request = Some request;
	    } in
	  Queue.add call cl.waiting_calls;
	  call
	  
      | None ->
	  dlog cl "starting authentication protocol";
	  ( match eff_authproto#state with
	      | `Done _ -> 
		  assert false

	      | `Error ->
		  assert false

	      | ap_state ->
		  (* Authentication not yet ready. *)
		  if ap_state = `Emit then (
		    dlog cl "emitting new authentication token";
		    auth_proto_emit cl rc.prog eff_authproto;
		  );

		  let xid = next_xid cl in
		  let call =
		    { detail = `Regular rc;
		      state = Delayed;
		      retrans_count = cl.next_max_retransmissions;
		      xid = xid;
		      destination = cl.next_destination;
		      call_timeout = cl.next_timeout;
		      timeout_group = None;
		      call_auth_proto = eff_authproto;
		      batch_flag = cl.next_batch_flag;
		      request = None;
		    } in
		  let q =
		    try Hashtbl.find cl.delayed_calls eff_authproto
		    with Not_found -> Queue.create() in
		  Queue.add call q;
		  Hashtbl.replace cl.delayed_calls eff_authproto q;
		  call
	  )
  in

  cl.last_xid <- Some new_call.xid;
  cl.used_xids <- SessionMap.add new_call.xid () cl.used_xids;
  cl.next_timeout <- cl.timeout;
  cl.next_max_retransmissions <- cl.max_retransmissions;
  cl.next_batch_flag <- false;
  (* We keep next_destination, as required by the API. *)

  (* For TCP and timeout > 0.0 set the timeout handler immediately, so the
     timeout includes connecting
   *)
  if cl.prot = Rpc.Tcp && new_call.state = Waiting && 
     new_call.call_timeout > 0.0
  then
    set_timeout cl new_call;

  !check_for_output cl;

  new_call


let unbound_async_call cl prog procname param receiver =
  ignore(unbound_async_call_r cl prog procname param receiver None)


  (*****)


type 'a threeway =
  | Value of 'a
  | Novalue
  | Error of exn


let call_auth_session call =
  match call.call_auth_proto # state with
    | `Done s -> s
    | _ -> assert false  


let string_of_reject_code =
  function
    | `Fail -> "Fail"
    | `Retry -> "Retry"
    | `Renew -> "Renew"
    | `Next -> "Next"


let process_regular_incoming_message cl message peer sock call rc =
  (* Exceptions in the following block are forwarded to the callback
   * function
   *)
  let auth_sess = call_auth_session call in
  dlogr cl
    (fun () ->
       sprintf "process_regular_incoming_message auth_meth=%s have_decoder=%B"
	 auth_sess#auth_protocol#auth_method#name
	 (rc.decoder <> None)
    );
  let result_opt =
    try
      ( match Rpc_packer.peek_auth_error message with
	  | None ->
	      let (xid,verf_flavour,verf_data,response) =
		Rpc_packer.unpack_reply 
		  ~mstring_factories:cl.mstring_factories
		  ?decoder:rc.decoder
		  rc.prog rc.proc message
		  (* may raise an exception *)
              in
	      auth_sess # server_accepts cl call.xid verf_flavour verf_data;
	      Value response

	  | Some auth_problem ->
	      let code = 
		auth_sess # server_rejects cl call.xid auth_problem in
	      dlogr_ptrace cl
		(fun () ->
		   sprintf
		     "RPC <-- (sock=%s,peer=%s,xid=%Ld) Auth error %s - reaction: %s"
		     (Rpc_transport.string_of_sockaddr sock)
		     (Rpc_transport.string_of_sockaddr peer)
		     (Rtypes.int64_of_uint4 call.xid)
		     (Rpc.string_of_server_error auth_problem)
		     (string_of_reject_code code)
		);
	      remove_pending_call cl call;
	      ( match code with
		  | `Fail ->
		      Error(Rpc.Rpc_server auth_problem)
		  | `Retry ->
		      ignore(
			unbound_async_call_r 
			  cl rc.prog rc.proc rc.param rc.get_result
		          (Some auth_sess));
		      Novalue
		  | `Renew ->
		      (* FIXME: When we send several requests in sequence,
			 we may get here several `Renew codes. That should
			 be merged to a single renewal.
		       *)
		      Hashtbl.remove cl.auth_current rc.call_user_name;
		      unbound_async_call 
			cl rc.prog rc.proc rc.param rc.get_result;
		      Novalue
		  | `Next ->
		      let m = call.call_auth_proto # auth_method in
		      ( match cl.auth_methods with
			  | m0 :: _ :: _ when m0 = m ->
			      cl.auth_methods <- List.tl cl.auth_methods;
			      Hashtbl.remove 
				cl.auth_current rc.call_user_name;
			      (* FIXME: see `Renew *)
			  | _ ->
			      raise(Rpc.Rpc_server Auth_too_weak)
		      );
		      unbound_async_call 
			cl rc.prog rc.proc rc.param rc.get_result;
		      Novalue
	      )
      )
    with
	error ->
	  (* The call_auth_session is simply dropped. *)
	  (* Forward the exception [error] to the caller: *)
	  remove_pending_call cl call;
	  Error error
  in

  match result_opt with
    | Novalue ->
	(* There is no result yet *)
	()

    | Value result ->
	
        (* pass result to the user *)
	
	( try
	    dlogr_ptrace cl
	      (fun () ->
		 sprintf
		   "RPC <-- (sock=%s,peer=%s,xid=%Ld) %s"
		   (Rpc_transport.string_of_sockaddr sock)
		   (Rpc_transport.string_of_sockaddr peer)
		   (Rtypes.int64_of_uint4 call.xid)
		   (Rpc_util.string_of_response
		      !Debug.ptrace_verbosity
		      rc.prog
		      rc.proc
		      result
		   )
	      );
	    let f = (fun () -> result) in
	    pass_result cl call f;      (* may raise Keep_call *)
	    (* Side effect: Changes the state of [call] to [Done] *)
	    remove_pending_call cl call;
	  with
	      Keep_call ->
		call.state <- Pending
	)

    | Error error ->
	( try
	    dlogr_ptrace cl
	      (fun () ->
		 sprintf
		   "RPC <-- (sock=%s,peer=%s,xid=%Ld) Error %s"
		   (Rpc_transport.string_of_sockaddr sock)
		   (Rpc_transport.string_of_sockaddr peer)
		   (Rtypes.int64_of_uint4 call.xid)
		   (Netexn.to_string error)
	      );
	    let f = (fun () -> raise error) in
	    pass_result cl call f;      (* may raise Keep_call *)
	    (* Side effect: Changes the state of [call] to [Done] *)
	    remove_pending_call cl call;
	  with
	      Keep_call ->
		call.state <- Pending
	)


let process_incoming_message cl message peer =

  let sock = 
    match cl.trans with
      | None -> `Implied
      | Some t -> t#getsockname in

  (* Got a 'message' for which the corresponding 'call' must be searched: *)

  let xid = Rpc_packer.peek_xid message in

  let call =
    try
      SessionMap.find xid cl.pending_calls
    with
	Not_found ->
	  (* Strange: Got a message with a session ID that is not pending.
	   * We assume that this is an answer of a very old message that
	   * has been completely timed out.
	   *)
	  raise Message_not_processable
  in
  assert(call.state = Pending);
  
  match call.detail with
    | `Auth_proto prog ->
	dlog cl "continuing authentication protocol";
	( match call.call_auth_proto#state with
	    | `Receive expected_xid when expected_xid = xid ->
		remove_pending_call cl call;

		let err_opt =
		  try
		    dlog cl "receiving authentication token";
		    call.call_auth_proto#receive message; None
		  with error -> Some error in

		let err_opt' =
		  match call.call_auth_proto#state with
		    | `Emit ->
			(* Emit the next message of the protocol: *)
			assert(err_opt = None);
			( try
			    auth_proto_emit cl prog call.call_auth_proto;
			    None
			  with
			    | error -> Some error
			)
		    | `Done authsess ->
			(* We are done - so activate all delayed messages *)
			assert(err_opt = None);
			dlog cl "authentication protocol is done";
			let q =
			  try Hashtbl.find cl.delayed_calls call.call_auth_proto
			  with Not_found -> Queue.create() in
			Hashtbl.remove cl.delayed_calls call.call_auth_proto;
			Queue.iter
			  (fun d_call ->
			     continue_call cl d_call
			  )
			  q;
			None
		    | `Receive _ ->
			assert false
		    | `Error ->
			assert(err_opt <> None);
			err_opt  in

		( match err_opt' with
		    | None ->
			!check_for_output cl
		    | Some err ->
			(* Failed authentication! *)
			let q =
			  try Hashtbl.find cl.delayed_calls call.call_auth_proto
			  with Not_found -> Queue.create() in
			Hashtbl.remove cl.delayed_calls call.call_auth_proto;
			Queue.iter
			  (fun d_call ->
			     pass_exception cl d_call err
			  )
			  q
		)

	    | _ ->
		(* Unexpected messages. We just drop these - assuming these
		   are duplicates
		 *)
		raise Message_not_processable
	)

    | `Regular rc ->
	process_regular_incoming_message cl message peer sock call rc


let drop_response cl message peer =
  let sock = 
    match cl.trans with
      | None -> `Implied
      | Some t -> t#getsockname in
  let xid = Rpc_packer.peek_xid message in
  let call =
    try
      SessionMap.find xid cl.pending_calls
    with
	Not_found ->
	  raise Message_not_processable in
  assert(call.state = Pending);

  dlogr_ptrace cl
    (fun () ->
       sprintf
	 "RPC <-- (sock=%s,peer=%s) Dropping response"
	 (Rpc_transport.string_of_sockaddr sock)
	 (Rpc_transport.string_of_sockaddr peer)
    );
  
  try
    let f = (fun () -> raise Response_dropped) in
    pass_result cl call f;      (* may raise Keep_call *)
    (* Side effect: Changes the state of [call] to [Done] *)
    remove_pending_call cl call;
  with
      Keep_call ->
	call.state <- Pending

  (*****)

let rec handle_incoming_message cl r =
  (* Called when a complete message has been read by the transporter *)
  match r with
    | `Error e ->
	close ~error:(Communication_error e) cl

    | `Ok(msg,addr) ->
	dlog cl "Message arrived";
	( try
	    ( match addr with
		| `Implied -> ()
		| `Sockaddr a ->
		    cl.last_replier <- Some a
	    );
	    ( match msg with
		| `Accept pv ->
		    process_incoming_message cl pv addr
		| `Reject pv ->
		    drop_response cl pv addr
		| _ ->
		    assert false
	    )
	  with
	      Message_not_processable ->
		dlog cl "message not processable";
		()
	);
	(next_incoming_message cl : unit)

    | `End_of_file ->
	dlog cl "End of file";
	close cl

and next_incoming_message cl =
  match cl.trans with
    | None -> ()
    | Some trans -> next_incoming_message' cl trans

and next_incoming_message' cl trans =
  trans # cancel_rd_polling();
  if cl.pending_calls <> SessionMap.empty && not trans#reading then (
    trans # start_reading
      ~before_record:(fun n _ ->
			match cl.max_resp_length with
			  | None -> `Accept
			  | Some m -> if n > m then `Reject else `Accept
		     )
      ~when_done:(fun r ->
		    handle_incoming_message cl r)
      ()
  )
  else
    dlog cl "Stopping reading";
;;


check_for_input := next_incoming_message;;


let rec handle_outgoing_message cl call r =
  (* Called after a complete message has been sent by the transporter *)
  match r with
    | `Error e ->
	close ~error:(Communication_error e) cl

    | `Ok () ->
	dlog cl "message writing finished";
	if call.batch_flag || call.call_timeout = 0.0 then (
	  try
	    if call.batch_flag then
	      pass_result cl call (fun () -> XV_void) (* may raise Keep_call *)
	    else
	      pass_exception cl call Message_timeout;
	    remove_pending_call cl call;
	  with Keep_call ->  (* no removal *)
	    ()
	);
	!check_for_input cl;
	next_outgoing_message cl

and next_outgoing_message cl =
  match cl.trans with
    | None -> ()   (* Not yet initialized *)
    | Some trans -> 
	if not trans#writing then
	  next_outgoing_message' cl trans

and next_outgoing_message' cl trans =
  let call_opt = 
    try Some(Queue.take cl.waiting_calls) with Queue.Empty -> None in

  match call_opt with
    | Some call ->
	(* If the call is already 'Done', skip it. *)
  	(* Change the state of the call. It is now 'pending': *)

	if call.state = Done then (
	  (* That can happen for calls that timeout before they are sent *)
	  dlog cl "found call that has been done";
	  next_outgoing_message cl
	)
	else (
	  let dest =
	    match call.destination with
	      | Some d -> `Sockaddr d
	      | None -> trans#getpeername in
	  ( match call.state with
	      | Done -> assert false
	      | Delayed -> assert false
	      | Waiting ->
		  cl.pending_calls <-
		    SessionMap.add call.xid call cl.pending_calls;
		  (* The xid is already in used_xids *)
		  call.state <- Pending;
		  dlogr_ptrace cl
		    (fun () ->
		       sprintf
			 "RPC --> (sock=%s,peer=%s,xid=%Ld) %s"
			 (Rpc_transport.string_of_sockaddr trans#getsockname)
			 (Rpc_transport.string_of_sockaddr dest)
			 (Rtypes.int64_of_uint4 call.xid)
			 (match call.detail with
			    | `Regular rc ->
				Rpc_util.string_of_request
				  !Debug.ptrace_verbosity
				  rc.prog
				  rc.proc
				  rc.param
			    | `Auth_proto _ ->
				"<authprot>"
			 )
		    )
	      | Pending ->
		  ()
		    (* The call is already member of [pending_calls]
		     * (retransmitted)
		     *)
	  );

	  (* If there should be a timeout handler, add it: *)
	  set_timeout cl call;

	  (* Send the message: *)
	  let m =
	    match call.request with
	      | None -> assert false
	      | Some m -> m in

	  (* If this is a regular message, we can now drop the request buffer *)
	  ( match call.detail with
	      | `Regular _ -> call.request <- None
	      | _ -> ()
	  );

	  dlog cl "start_writing";
	  trans # start_writing
	    ~when_done:(fun r ->
			  handle_outgoing_message cl call r)
	    m
	    dest

	);

    | None ->
	()
;;


check_for_output := next_outgoing_message ;;


(* Shutdown:
 * We first try an orderly shutdown. If that does not work, just inactivate
 * the transport.
 *)


let shutdown_connector cl mplex ondown =
  dlog cl "shutdown_connector";
  mplex # abort_rw();
  ( try
      mplex # start_shutting_down
	~when_done:(fun exn_opt ->
		      ( match exn_opt with 
			  | `Ok _ -> ()
			  | `Error exn ->
			      dlogr cl
				(fun () -> 
				   sprintf "start_shutting_down: exception %s"
				     (Netexn.to_string exn))
		      );
		      mplex # inactivate();
		      ondown()
		   )
	()
    with
      | _ -> mplex # inactivate(); ondown()
  )


let mplex_of_fd ~close_inactive_descr prot fd esys =
  let preclose() =
    Netlog.Debug.release_fd fd in
  match prot with
    | Tcp ->
        Rpc_transport.stream_rpc_multiplex_controller
          ~close_inactive_descr ~preclose fd esys
    | Udp ->
        Rpc_transport.datagram_rpc_multiplex_controller
          ~close_inactive_descr ~preclose fd esys


class type socket_config =
object
  method non_blocking_connect : bool
  method multiplexing :
    close_inactive_descr:bool ->
    protocol -> Unix.file_descr -> Unixqueue.event_system ->
      Rpc_transport.rpc_multiplex_controller Uq_engines.engine
end


class default_socket_config : socket_config =
object
  method non_blocking_connect = true
  method multiplexing ~close_inactive_descr prot fd esys =
    let close() =
      if close_inactive_descr then (
	Netlog.Debug.release_fd fd;
	try
	  match Netsys_win32.lookup fd with
	    | Netsys_win32.W32_pipe ph ->
		Netsys_win32.pipe_shutdown ph;
		Unix.close fd
	    | _ -> 
		()
	with Not_found ->
	  Unix.close fd 
      ) in
    let eng = 
      try
	let mplex = mplex_of_fd ~close_inactive_descr prot fd esys in
	new Uq_engines.epsilon_engine (`Done mplex) esys 
      with
	| error -> 
	    new Uq_engines.epsilon_engine (`Error error) esys in
    Uq_engines.when_state
      ~is_aborted:(fun () -> close())
      ~is_error:(fun _ -> close())
      eng;
    eng
end


class blocking_socket_config : socket_config =
object
  inherit default_socket_config
  method non_blocking_connect = false
end

let default_socket_config = new default_socket_config
let blocking_socket_config = new blocking_socket_config

type mode2 =
    [ `Socket_endpoint of protocol * Unix.file_descr 
    | `Multiplexer_endpoint of Rpc_transport.rpc_multiplex_controller
    | `Socket of protocol * connector * socket_config
    ]


class unbound_async_call cl prog name v =
  let emit = ref (fun _ -> assert false) in
  let call = unbound_async_call_r cl prog name v (fun gr -> !emit gr) None in
object(self)
  inherit [ Xdr.xdr_value ] Uq_engines.engine_mixin (`Working 0) cl.esys

  initializer
    emit := (fun get_result -> 
	       try
		 let r = get_result() in
		 self # set_state (`Done r)
	       with
		 | err ->
		     self # set_state (`Error err)
	    )

  method event_system = cl.esys
  method abort() =
    match self#state with
      | `Working _ ->
	  if call.state <> Done then
	    remove_pending_call cl call;
	  call.state <- Done;
	  self # set_state `Aborted
      | _ -> ()
end


let string_of_file_descr fd =
  Int64.to_string (Netsys.int64_of_file_descr fd)

let rec internal_create initial_xid
                        shutdown
			prog_opt
                        mode esys =

  let id_s_0 =
    match mode with
      | `Socket_endpoint(_,fd) ->
	  "Socket_endpoint(_," ^ string_of_file_descr fd ^ ")"
      | `Multiplexer_endpoint ep ->
	  "Multiplexer_endpoint(" ^ 
	    string_of_int(Oo.id ep) ^ ")"
      | `Socket(_, conn, _) ->
	  let s_conn =
	    match conn with
	      | Inet(h,p) -> 
		  "inet/" ^ h ^ ":" ^ string_of_int p
	      | Internet(ip,p) -> 
		  "inet/" ^ Unix.string_of_inet_addr ip ^ ":" ^ string_of_int p
	      | Unix p ->
		  "unix/" ^ p
	      | W32_pipe p ->
		  "w32_pipe/" ^ p
	      | Descriptor fd ->
		  "fd/" ^ string_of_file_descr fd
	      | Dynamic_descriptor f ->
		  "dyn_fd"
	      | Portmapped h ->
		  "portmapped:" ^ h in
	  "Socket(_," ^ s_conn ^ ",_)" in
  let id_s =
    match prog_opt with
      | None -> id_s_0
      | Some prog ->
	  id_s_0 ^ " for program " ^ 
	    (Int32.to_string 
	       (Rtypes.logical_int32_of_uint4
		  (Rpc_program.program_number prog))) in

  let non_blocking_connect =
    match mode with
      | `Socket(_,_,conf) -> conf # non_blocking_connect
      | _ -> true in

  let cl =
    (* preliminary version of the client *)
    { ready = true;
      trans = None;
      progs = ( match prog_opt with None -> [] | Some p -> [p] );
      prot = Rpc.Udp;
      esys = esys;
      est_engine = None;
      shutdown_connector = shutdown;
      waiting_calls = Queue.create();
      pending_calls = SessionMap.empty;
      delayed_calls = Hashtbl.create 27;
      used_xids = SessionMap.empty;
      next_xid = initial_xid;
      next_destination = None;
      next_timeout = (-1.0);
      next_max_retransmissions = 0;
      next_batch_flag = false;
      max_resp_length = None;
      user_name = None;
      mstring_factories = Hashtbl.create 1;
      last_replier = None;
      last_xid = None;
      timeout = (-1.0);
      max_retransmissions = 0;
      exception_handler = (fun _ -> ());
      all_auth_methods = [ ];
      auth_methods = [ ];
      auth_current = Hashtbl.create 3;
      nolog = false;
    }
  in
  Hashtbl.add cl.mstring_factories "*" Xdr_mstring.string_based_mstrings;
  

  let portmapper_engine prot host prog esys = 
    (* Performs GETPORT for the program on [host]. We use 
     * Rpc_portmapper_aux but not Rpc_portmapper_clnt. The latter is
     * impossible because of a dependency cycle.
     *)
    dlog cl "starting portmapper query";
    let pm_port = Rtypes.int_of_uint4 Rpc_portmapper_aux.pmap_port in
    let pm_prog = Rpc_portmapper_aux.program_PMAP'V2 in
    let pm_client = 
      internal_create
	(Rtypes.uint4_of_int 0)
	shutdown_connector
	(Some pm_prog)
	(`Socket(Rpc.Udp, Inet(host, pm_port), default_socket_config))
	esys
    in
    let v =
      Rpc_portmapper_aux._of_PMAP'V2'pmapproc_getport'arg
	{ Rpc_portmapper_aux.prog = Rpc_program.program_number prog;
	  vers = Rpc_program.version_number prog;
	  prot = ( match prot with
		     | Tcp -> Rpc_portmapper_aux.ipproto_tcp
		     | Udp -> Rpc_portmapper_aux.ipproto_udp
		 );
	  port = Rtypes.uint4_of_int 0;
	} in
    let close_deferred() =
      Unixqueue.once esys (Unixqueue.new_group esys) 0.0 
	(fun() -> close pm_client) in
    new Uq_engines.map_engine
      ~map_done:(fun r ->
		   dlog cl "Portmapper GETPORT done";
		   let addr =
		     match pm_client.trans with
		       | None -> assert false
		       | Some trans ->
			   ( match trans # getpeername with
			       | `Implied -> assert false
			       | `Sockaddr a -> a
			   ) in
		   let port =
		     Rpc_portmapper_aux._to_PMAP'V2'pmapproc_getport'res r in
		   let port = Rtypes.int_of_uint4 port in
		   close_deferred();
		   if port = 0 then
		     `Error (Failure "Program not bound in Portmapper")
		   else
		     `Done(addr, port)
		)
      ~map_error:(fun err ->
		    dlog cl "Portmapper GETPORT error";
		    close_deferred();
		    `Error err)
      (new unbound_async_call pm_client pm_prog "PMAPPROC_GETPORT" v)
  in

  let connect_engine addr esys =
    match addr with
      | `Portmapped(prot,host) ->
	  ( match prog_opt with
	      | None ->
		  failwith 
		    "Rpc_client.unbound_create: Portmapped not supported"
	      | Some prog ->
		  new Uq_engines.seq_engine
		    (portmapper_engine prot host prog esys)
		    (fun (sockaddr, port) ->
		       let inetaddr =
			 match sockaddr with
			   | Unix.ADDR_INET(inet, _) -> inet
			   | _ -> assert false in
		       let stype = 
			 match prot with 
			   | Tcp -> Unix.SOCK_STREAM 
			   | Udp -> Unix.SOCK_DGRAM in
		       let addr = `Sock_inet(stype, inetaddr, port) in
		       let opts = Uq_engines.default_connect_options in
		       Uq_engines.connector (`Socket(addr,opts)) esys
		    )
	  )

      | #Uq_engines.connect_address as addr ->
	  Uq_engines.connector addr esys in

  let track fd =
    Netlog.Debug.track_fd ~owner:"Rpc_client" ~descr:id_s fd in

  let disable_nagle fd =
    try
      Unix.setsockopt fd Unix.TCP_NODELAY true
    with _ -> () in

  let open_socket_non_blocking addr prot conf =
    new Uq_engines.seq_engine
      (connect_engine addr esys)
      (fun status ->
	  dlogr  cl
	    (fun () -> 
	       "Non-blocking socket connect successful for " ^ id_s);
	 let fd = Uq_engines.client_endpoint status in
	 disable_nagle fd;
	 track fd;
	 conf # multiplexing ~close_inactive_descr:true prot fd esys
      ) in

  let open_socket_blocking addr prot conf =
    let conn_esys = Unixqueue.create_unix_event_system() in
    let c = connect_engine addr conn_esys in
    Unixqueue.run conn_esys;
    match c # state with
      | `Done status ->
	  dlogr cl
	    (fun () ->
	       "Blocking socket connect successful for " ^ id_s);
	  let fd = Uq_engines.client_endpoint status in
	  disable_nagle fd;
	  track fd;
	  conf # multiplexing ~close_inactive_descr:true prot fd esys
      | `Error err ->
	  raise err
      | _ ->
	  assert false
  in

  let open_socket =
    if non_blocking_connect then 
      open_socket_non_blocking
    else
      open_socket_blocking in

  let (prot, establish_engine) =
    match mode with
      | `Socket_endpoint(prot,fd) ->
	  disable_nagle fd;
	  track fd;
	  let m = mplex_of_fd ~close_inactive_descr:true prot fd esys in
	  (prot, new Uq_engines.epsilon_engine (`Done m) esys)
      | `Multiplexer_endpoint(mplex) ->
	  if mplex # event_system != esys then
            failwith "Rpc_client.create2: \
                      Multiplexer is attached to the wrong event system";
	  (mplex # protocol,
	   new Uq_engines.epsilon_engine (`Done mplex) esys)
       | `Socket(prot,conn,conf) ->
	   let stype = 
	     match prot with Tcp -> Unix.SOCK_STREAM | Udp -> Unix.SOCK_DGRAM in
	   (match conn with
	      | Inet (host,port) ->
		  let saddr = `Sock_inet_byname(stype, host, port) in
		  let addr = 
		    `Socket(saddr, Uq_engines.default_connect_options) in
		  (prot, open_socket addr prot conf)
	      | Internet (host,port) ->
		  let saddr = `Sock_inet(stype, host, port) in
		  let addr = 
		    `Socket(saddr, Uq_engines.default_connect_options) in
		  (prot, open_socket addr prot conf)
	      | Unix path ->
		  let saddr = `Sock_unix(stype, path) in
		  let addr = 
		    `Socket(saddr, 
			    Uq_engines.default_connect_options) in
		  (prot, open_socket addr prot conf)
	      | W32_pipe path ->
		  if prot <> Rpc.Tcp then
		    failwith "Rpc_client.create2: \
                              Pipe only supported for Rpc.Tcp protocol type";
		  let addr = `W32_pipe(Netsys_win32.Pipe_duplex, path) in
		  (prot, open_socket addr prot conf)
	      |	Descriptor fd -> 
		  (* no track! *)
		  let m = 
		    mplex_of_fd ~close_inactive_descr:false prot fd esys in
		  (prot, new Uq_engines.epsilon_engine (`Done m) esys)
	      |	Dynamic_descriptor f ->
		  let fd = f() in
		  track fd;
		  let m = 
		    mplex_of_fd ~close_inactive_descr:true prot fd esys in
		  (prot, new Uq_engines.epsilon_engine (`Done m) esys)
	      | Portmapped host ->
		  (prot, open_socket (`Portmapped(prot,host)) prot conf)
	   )
  in

  let timeout = if prot = Udp then 15.0 else (-.1.0) in
  let max_retransmissions = if prot = Udp then 3 else 0 in

  (* update cl: *)
  cl.prot <- prot;
  cl.est_engine <- Some establish_engine;
  cl.next_timeout <- timeout;
  cl.timeout <- timeout;
  cl.next_max_retransmissions <- max_retransmissions;
  cl.max_retransmissions <- max_retransmissions;
  cl.exception_handler <- (fun exn -> 
			     if not cl.nolog then
			       Netlog.logf `Crit
				 "Rpc_client: Uncaught exception %s"
				 (Netexn.to_string exn)
			  );

  Uq_engines.when_state
    ~is_done:(fun mplex ->
		dlogr cl
		  (fun () ->
		     sprintf 
		       "Fully connected for %s: (sock=%s,peer=%s)" 
		       id_s
		       (Rpc_transport.string_of_sockaddr mplex#getsockname)
		       (Rpc_transport.string_of_sockaddr mplex#getpeername));
		cl.trans <- Some mplex;
		cl.est_engine <- None;
		(* Maybe we already have messages to send: *)
		!check_for_output cl
	     )
    ~is_error:(fun err ->
		 cl.est_engine <- None;
		 close ~error:(Communication_error err) cl;
		 cl.exception_handler err
	      )
    ~is_aborted:(fun () ->
		   cl.est_engine <- None)
    establish_engine;

  cl
;;


let create2 ?program_number ?version_number ?(initial_xid=0)
            ?(shutdown = shutdown_connector)
            mode prog0 esys =
  
  let prog = Rpc_program.update ?program_number ?version_number prog0 in
  let cl = 
    internal_create
      (Rtypes.uint4_of_int initial_xid) shutdown (Some prog) mode esys in
  cl

let unbound_create ?(initial_xid=0) ?(shutdown = shutdown_connector) 
                   mode esys =
  internal_create (Rtypes.uint4_of_int initial_xid) shutdown None mode esys


let bind cl prog =
  cl.progs <- prog :: cl.progs

let use cl prog =
  let prog_id = Rpc_program.id prog in
  if not(List.exists (fun p -> Rpc_program.id p = prog_id) cl.progs) then
    failwith "Rpc_client.use: This program is not bound by this client"


  (*****)

let is_up cl =
  cl.ready

let configure_next_call cl max_retransmission_trials timeout =
  cl.next_max_retransmissions <- max_retransmission_trials;
  cl.next_timeout <- timeout

let configure cl max_retransmission_trials timeout =
  cl.max_retransmissions <- max_retransmission_trials;
  cl.timeout <- timeout;
  configure_next_call cl max_retransmission_trials timeout

let set_dgram_destination cl addr_opt =
  cl.next_destination <- addr_opt

let set_exception_handler cl xh =
  cl.exception_handler <- xh

let set_batch_call cl =
  cl.next_batch_flag <- true

let set_max_response_length cl n =
  cl.max_resp_length <- Some n

let set_mstring_factories cl fac =
  cl.mstring_factories <- fac

let set_user_name cl n =
  cl.user_name <- n

let gen_shutdown cl is_running run ondown =
  if cl.ready then (
    let b = is_running cl.esys in
    ( match cl.est_engine with
	| None -> ()
	| Some e -> e#abort()
    );
    close ~ondown cl;
    if not b then run cl.esys
  )
  else
    ondown()

let shut_down cl =
  gen_shutdown 
    cl 
    Unixqueue.is_running 
    Unixqueue.run 
    (fun () -> ())

let sync_shutdown cl =
  gen_shutdown 
    cl 
    (fun esys -> 
       if Unixqueue.is_running esys then
	 failwith "Rpc_client.sync_shutdown: called from event loop";
       false)
    Unixqueue.run 
    (fun () -> ())
  
let trigger_shutdown cl ondown =
  gen_shutdown
    cl
    (fun _ -> true)
    Unixqueue.run
    (fun () ->
       let g = Unixqueue.new_group cl.esys in
       Unixqueue.once cl.esys g 0.0 ondown
    )


let event_system cl =
  cl.esys

let program cl =
  List.hd cl.progs

let programs cl =
  cl.progs

let get_socket_name cl =
  match cl.trans with
    | None -> failwith "Rpc_client.get_socket_name: not connected"
    | Some trans ->
	( match trans # getsockname with
	    | `Implied ->
		failwith "Rpc_client.get_socket_name: not applicable"
	    | `Sockaddr a -> a
	)

let get_peer_name cl =
  match cl.trans with
    | None -> failwith "Rpc_client.get_peer_name: not connected"
    | Some trans ->
	( match trans # getpeername with
	    | `Implied ->
		failwith "Rpc_client.get_peer_name: not applicable"
	    | `Sockaddr a -> a
	)

let get_sender_of_last_response cl =
  match cl.last_replier with
    | None -> failwith "Rpc_client.get_sender_of_last_response: nothing received yet or sender's address not available from transport layer"
    | Some addr -> addr


let get_xid_of_last_call cl =
  match cl.last_xid with
    | None ->
	failwith "Rpc_client.get_xid_of_last_call: nothing called"
    | Some xid -> xid


let get_protocol cl =
  cl.prot

let verbose b =
  Debug.enable := b

  (*****)

(* Now synchronous calls: *)

type 'a result =
    No
  | Reply of 'a
  | Error of exn


exception Stop_call

let synchronize esys f_async arg =
  let r = ref No in
  let get_result transmitter =
    try
      r := Reply (transmitter())
    with
      x ->
	r := Error x;
	if x = Message_timeout then 
	  raise (Unbound_exception Stop_call)
  in
  (* push the request onto the queue: *)
  let () = f_async arg get_result in
  (* run through the queue and process all elements: *)
  ( try Unixqueue.run esys with Stop_call -> ());
  (* now a call back of 'get_result' should have happened. *)
  match !r with
    No -> failwith "Rpc_client.synchronize: internal error"
  | Reply x -> x
  | Error e -> raise e



let unbound_sync_call cl prog proc arg =
  synchronize
    cl.esys
    (unbound_async_call cl prog proc)
    arg


  (*****)

(* DEPRECATED FUNCTIONS *)


let add_call cl procname param receiver =
  if not cl.ready then
    raise Client_is_down;
  match cl.progs with
    | [ prog ] ->
	unbound_async_call cl prog procname param receiver
    | _ ->
	failwith "Rpc_client.add_call [deprecated function]: \
                  The client does not have exactly one bound program"


let sync_call cl procname param  =
  if not cl.ready then
    raise Client_is_down;
  match cl.progs with
    | [ prog ] ->
	unbound_sync_call cl prog procname param 
    | _ ->
	failwith "Rpc_client.sync_call [deprecated function]: \
                  The client does not have exactly one bound program"


let create ?program_number ?version_number ?(initial_xid=0) 
           ?(shutdown = shutdown_connector)
           esys c prot prog0 =
  create2 
    ?program_number ?version_number ~initial_xid ~shutdown
    (`Socket(prot, c, (new blocking_socket_config)))
    prog0
    esys


This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml