(* $Id: rpc_client.ml 1864 2013-07-21 11:49:11Z gerd $
* ----------------------------------------------------------------------
*
*)
open Rtypes
open Xdr
open Rpc
open Rpc_common
open Rpc_packer
open Unixqueue
open Printf
exception Message_not_processable
exception Message_lost
exception Message_timeout
exception Response_dropped
exception Communication_error of exn
exception Client_is_down
exception Keep_call
exception Unbound_exception of exn
module type USE_CLIENT = sig
type t
val use : t -> Rpc_program.t -> unit
val unbound_sync_call :
t -> Rpc_program.t -> string -> xdr_value -> xdr_value
val unbound_async_call :
t -> Rpc_program.t -> string -> xdr_value ->
((unit -> xdr_value) -> unit) -> unit
end
let () =
Netexn.register_printer
(Communication_error Not_found)
(fun e ->
match e with
| Communication_error e' ->
"Rpc_client.Communication_error(" ^ Netexn.to_string e' ^ ")"
| _ -> assert false
);
Netexn.register_printer
(Unbound_exception Not_found)
(fun e ->
match e with
| Unbound_exception e' ->
"Rpc_client.Unbound_exception(" ^ Netexn.to_string e' ^ ")"
| _ -> assert false
)
module SessionUint4 = struct
type t = uint4
let compare = (Pervasives.compare : uint4 -> uint4 -> int)
end
module SessionMap =
Map.Make(SessionUint4)
type call_state =
| Delayed (* call waits for authentication protocol *)
| Waiting (* call has not yet been sent *)
| Pending (* call has been sent, still no answer *)
| Done (* got answer for the call *)
(* Normally, the state of the call is changed from Waiting to Pending to
* Done.
* In the case of a retransmission, the call is added to the waiting calls
* again but its state remains 'Pending' (because the call is still member
* of the set of pending calls).
*)
(* The following class types are only preliminary definitions. The type
* parameter 't is later instantiated with the type of the clients, t.
*)
type reject_code =
[ `Fail | `Retry | `Renew | `Next ]
class type ['t] pre_auth_session =
object
method next_credentials :
't -> Rpc_program.t -> string -> uint4 ->
(string * string * string * string * Xdr.encoder option *
Xdr.decoder option)
method server_rejects : 't -> uint4 -> server_error -> reject_code
method server_accepts : 't -> uint4 -> string -> string -> unit
(*
method drop_xid : 't -> uint4 -> unit
method drop_client : 't -> unit
*)
method auth_protocol : 't pre_auth_protocol
end
and ['t] pre_auth_protocol =
object
method state :
[ `Emit | `Receive of uint4 | `Done of 't pre_auth_session | `Error]
method emit : uint4 -> uint4 -> uint4 -> Rpc_packer.packed_value
method receive : Rpc_packer.packed_value -> unit
method auth_method : 't pre_auth_method
end
and ['t] pre_auth_method =
object
method name : string
method new_session : 't -> string option -> 't pre_auth_protocol
end
type regular_call =
{ mutable prog : Rpc_program.t;
mutable proc : string;
mutable param : xdr_value; (* the argument of the call *)
mutable get_result : (unit -> xdr_value) -> unit;
mutable decoder : Xdr.decoder option;
mutable call_user_name : string option;
}
type call_detail =
[ `Regular of regular_call
| `Auth_proto of Rpc_program.t
]
type call =
{ mutable detail : call_detail;
mutable state : call_state;
mutable retrans_count : int; (* retransmission counter *)
mutable xid : uint4;
mutable destination : Unix.sockaddr option;
mutable call_timeout : float;
mutable timeout_group : group option;
(* If a timeout handler has been set, this is the corresponding group *)
mutable call_auth_proto : t pre_auth_protocol;
(* calls store the authentication protocol *)
mutable batch_flag : bool;
mutable request : packed_value option;
(* The request while the call is waiting *)
}
and t =
{ mutable ready : bool;
mutable nolog : bool;
mutable trans : Rpc_transport.rpc_multiplex_controller option;
mutable progs : Rpc_program.t list;
mutable prot : protocol;
mutable esys : event_system;
mutable est_engine : Rpc_transport.rpc_multiplex_controller Uq_engines.engine option;
mutable shutdown_connector : t -> Rpc_transport.rpc_multiplex_controller -> (unit->unit) -> unit;
mutable delayed_calls : (t pre_auth_protocol,call Queue.t) Hashtbl.t;
(* delayed: the request cannot be sent - because the authentication
protocol is not yet done
*)
mutable waiting_calls : call Queue.t;
(* waiting: the request can be sent when the connections allows it *)
mutable pending_calls : call SessionMap.t;
(* pending: the request is sent; waiting for the reply *)
mutable next_xid : uint4;
mutable used_xids : unit SessionMap.t;
mutable last_replier : Unix.sockaddr option;
mutable last_xid : uint4 option;
(* configs: *)
mutable timeout : float;
mutable max_retransmissions : int;
mutable next_timeout : float;
mutable next_max_retransmissions : int;
mutable next_destination : Unix.sockaddr option;
mutable next_batch_flag : bool;
mutable max_resp_length : int option;
mutable user_name : string option;
mutable mstring_factories : Xdr_mstring.named_mstring_factories;
(* authentication: *)
mutable all_auth_methods : t pre_auth_method list;
mutable auth_methods : t pre_auth_method list;
(* remaining methods to try *)
mutable auth_current : (string option,t pre_auth_protocol) Hashtbl.t;
(* The protocol used for this user *)
mutable exception_handler : exn -> unit;
}
and connector =
Inet of (string * int) (* Hostname, port *)
| Internet of (Unix.inet_addr * int)
| Unix of string (* path to unix dom sock *)
| W32_pipe of string
| Descriptor of Unix.file_descr
| Dynamic_descriptor of (unit -> Unix.file_descr)
| Portmapped of string
class type auth_session = [t] pre_auth_session
class type auth_method = [t] pre_auth_method
class type auth_protocol = [t] pre_auth_protocol
let auth_none_session p : auth_session =
object
method next_credentials _ _ _ _ =
("AUTH_NONE", "", "AUTH_NONE", "", None, None)
method server_rejects _ _ _ = `Next
method server_accepts _ _ _ _ = ()
method auth_protocol = p
end
let auth_none_proto m : auth_protocol =
let session = ref None in
object(self)
initializer
session := Some(auth_none_session self)
method state =
match !session with
| None -> assert false
| Some s -> `Done s
method emit _ _ _ = assert false
method receive _ = assert false
method auth_method = m
end
let auth_none =
object(self)
method name = "AUTH_NONE"
method new_session _ _ =
auth_none_proto self
end
module Debug = struct
let enable = ref false
let enable_ptrace = ref false
let ptrace_verbosity = ref `Name_abbrev_args
let disable_for_client c = c.nolog <- true
end
let dlog0 = Netlog.Debug.mk_dlog "Rpc_client" Debug.enable
let dlogr0 = Netlog.Debug.mk_dlogr "Rpc_client" Debug.enable
let dlog cl msg =
if not cl.nolog then dlog0 msg
let dlogr cl getmsg =
if not cl.nolog then dlogr0 getmsg
let dlog0_ptrace = Netlog.Debug.mk_dlog "Rpc_client.Ptrace" Debug.enable_ptrace
let dlogr0_ptrace = Netlog.Debug.mk_dlogr "Rpc_client.Ptrace" Debug.enable_ptrace
let dlog_ptrace cl msg =
if not cl.nolog then dlog0_ptrace msg
let dlogr_ptrace cl getmsg =
if not cl.nolog then dlogr0_ptrace getmsg
let () =
Netlog.Debug.register_module "Rpc_client" Debug.enable;
Netlog.Debug.register_module "Rpc_client.Ptrace" Debug.enable_ptrace
let connector_of_sockaddr =
function
| Unix.ADDR_INET(ip,p) ->
Internet(ip,p)
| Unix.ADDR_UNIX s ->
Unix s
let connector_of_socksymbol =
function
| `Inet(ip,p) -> Internet(ip,p)
| `Inet_byname(n,p) -> Inet(n,p)
| `Unix p -> Unix p
(*****)
let set_auth_methods cl list =
match list with
| _ :: _ ->
Hashtbl.clear cl.auth_current;
cl.auth_methods <- list;
cl.all_auth_methods <- list;
| [] ->
invalid_arg "Rpc_client.set_auth_methods"
(*****)
let stop_retransmission_timer cl call =
match call.timeout_group with
| None -> ()
| Some g ->
Unixqueue.clear cl.esys g
(*****)
let pass_result cl call f =
(* for regular calls only! *)
(* Stop the timer, if any : *)
stop_retransmission_timer cl call;
(* Change the state of the call to 'Done': *)
call.state <- Done;
(* pass 'f' to the call back function: *)
try
dlog cl "Calling back";
( match call.detail with
| `Regular rc ->
rc.get_result f;
| _ ->
assert false
);
dlog cl "Returned from callback";
with
| Keep_call as x ->
dlog cl "Keep_call";
raise x
| Unbound_exception x ->
dlog cl "Unbound_exception";
raise x
| any ->
begin (* pass the exception to the exception handler: *)
dlogr cl (fun () ->
"Exception from callback: " ^ Netexn.to_string any);
cl.exception_handler any
end
let pass_exception ?(skip_auth=false) cl call x =
(* Caution! This function does not remove [call] from the set of pending
* calls.
*
* For authproto messages, the exception is passed to the connected calls
* instead. Option skip_auth: no notification for authproto messages.
*)
if call.state <> Done then ( (* Don't call back twice *)
try
dlogr cl
(fun () ->
let sx = Netexn.to_string x in
"Passing exception " ^ sx);
( match call.detail with
| `Regular _ ->
pass_result cl call (fun () -> raise x)
| `Auth_proto prog ->
stop_retransmission_timer cl call;
call.state <- Done;
if not skip_auth then (
let q =
try Hashtbl.find cl.delayed_calls call.call_auth_proto
with Not_found -> Queue.create() in
Hashtbl.remove cl.delayed_calls call.call_auth_proto;
Queue.iter
(fun d_call ->
pass_result cl d_call (fun () -> raise x)
)
q
)
)
with
| Keep_call -> () (* ignore *)
)
let pass_exception_to_all cl x =
(* Caution! This function does not erase the set of pending calls. *)
dlog cl "Passing exception to all";
let ht = Hashtbl.create 17 in
let fn_list = ref [] in
let add_fn xid call =
if not (Hashtbl.mem ht xid) then (
Hashtbl.add ht xid ();
fn_list := call :: !fn_list
)
in
SessionMap.iter (fun xid call -> add_fn xid call) cl.pending_calls;
Queue.iter (fun call -> add_fn call.xid call) cl.waiting_calls;
Hashtbl.iter
(fun auth_proto q ->
Queue.iter
(fun call -> add_fn call.xid call)
q
)
cl.delayed_calls;
(* We already included delayed_calls, hence set skip_auth here *)
List.iter (fun call -> pass_exception ~skip_auth:true cl call x) !fn_list
(*****)
let close ?error ?(ondown=fun()->()) cl =
if cl.ready then (
dlog cl "Closing";
cl.ready <- false;
( match error with
| None -> pass_exception_to_all cl Message_lost
| Some e -> pass_exception_to_all cl e
);
cl.pending_calls <- SessionMap.empty;
cl.used_xids <- SessionMap.empty;
Queue.clear cl.waiting_calls;
Hashtbl.clear cl.delayed_calls;
match cl.trans with
| None ->
ondown()
| Some trans ->
cl.trans <- None;
cl.shutdown_connector cl trans ondown
)
else
ondown()
;;
(*****)
let check_for_input = (* "forward declaration" *)
ref (fun _ -> ());;
let check_for_output = (* "forward declaration" *)
ref (fun _ -> ());;
(*****)
let find_or_make_auth_protocol cl user_opt =
try
Hashtbl.find cl.auth_current user_opt
with
| Not_found ->
let current_auth_method =
match cl.auth_methods with
| hd :: _ -> hd
| [] -> auth_none in
let p =
current_auth_method # new_session cl user_opt in
Hashtbl.add cl.auth_current user_opt p;
p
;;
(*****)
let rec next_xid cl =
let xid = cl.next_xid in
(* xid is uint4, so we increment as int64: *)
let xid64 = Rtypes.int64_of_uint4 xid in
let xid64' = Int64.logand (Int64.succ xid64) 0xffff_ffff_L in
cl.next_xid <- Rtypes.uint4_of_int64 xid64';
if SessionMap.mem xid cl.used_xids then
next_xid cl
else
xid
(*****)
let remove_pending_call cl call =
cl.pending_calls <- SessionMap.remove call.xid cl.pending_calls;
cl.used_xids <- SessionMap.remove call.xid cl.used_xids;
stop_retransmission_timer cl call
;;
let abandon_call cl xid =
try
let call = SessionMap.find xid cl.pending_calls in
remove_pending_call cl call
with
| Not_found ->
let q = Queue.create() in
Queue.transfer cl.waiting_calls q;
while not(Queue.is_empty q) do
let call = Queue.take q in
if call.xid <> xid then Queue.add call cl.waiting_calls
done;
cl.used_xids <- SessionMap.remove xid cl.used_xids
(*****)
let continue_call cl call =
(* Called when the authentication protocol finishes, and [call] can now
be added to the [waiting] queue.
Also called for retransmitted regular requests.
*)
let authsess =
match call.call_auth_proto # state with
| `Done authsess -> authsess
| _ -> assert false in
let rc =
match call.detail with
| `Regular rc -> rc
| _ -> assert false in
call.state <- Waiting;
let (cred_flav, cred_data, verf_flav, verf_data, enc_opt, dec_opt) =
authsess # next_credentials
cl rc.prog rc.proc call.xid in
rc.decoder <- dec_opt;
let request =
Rpc_packer.pack_call
?encoder:enc_opt
rc.prog
call.xid
rc.proc
cred_flav cred_data verf_flav verf_data
rc.param in
call.request <- Some request;
Queue.add call cl.waiting_calls
(* FIXME: Retransmissions are not yet perfect. We allow here that calls time
out while the request is still in the Waiting queue (especially TCP
connect timeouts). If we retransmit with a new encryption key, we will
send the same request twice ( and severs might not like this).
Better: do not modify [call] in place, but create a copy.
Also, it may happen that we get a response to attempt #1 while we already
dropped it and pushed attempt #2 to Waiting. Now, the response #1 may
turn out to use a different encryption key, and we cannot decode it.
Better: the pending table should map xid to a list of possible calls.
(Still to check how we find then the right one...)
*)
let retransmit cl call =
if call.state = Pending || call.state = Waiting then begin
if call.retrans_count > 0 then begin
dlog cl "Retransmitting";
let old_state = call.state in
(* Make the 'call' waiting again *)
( match call.detail with
| `Regular _ -> continue_call cl call
| `Auth_proto _ -> Queue.add call cl.waiting_calls
);
cl.used_xids <- SessionMap.add call.xid () cl.used_xids;
(* Decrease the retransmission counter *)
call.retrans_count <- call.retrans_count - 1;
(* Ensure the call keeps its state: *)
call.state <- old_state;
(* Check state of reources: *)
!check_for_output cl
(* Note: The [call] remains in state [Pending] (if it is already).
* This prevents the [call]
* from being added to [cl.pending_calls] again.
*
* If the [call] is encrypted, it is possible that the client only
* accepts the response to the second trial, and rejects a late
* response for the first request, because the encryption code
* may have changed in the meantime.
*)
end
else begin
(* still no answer after maximum number of retransmissions *)
dlog cl "Call timed out!";
remove_pending_call cl call;
(* Note that we do not remove the call from waiting_calls for
performance reasons. We simply skip it there if we find it.
pass_exception will set call.state to Done.
*)
pass_exception cl call Message_timeout;
(* FIXME: do something with the delayed calls if [call] is
an authproto message
*)
(* If we still try to connect the TCP socket, shut the client
completely down:
*)
( match cl.est_engine with
| None -> ()
| Some e ->
e#abort();
close cl;
);
(* Check state of reources: *)
!check_for_output cl
end
end
(*****)
(* Note: For asynchronous authentication, it would be sufficient that
* add_call (and add_call_again) are rewritten such that they first
* schedule the authentication request, and when the request is replied,
* the call is scheduled.
*)
let set_timeout cl call =
if call.call_timeout > 0.0 && call.timeout_group = None then (
(* Note: Case call_timeout = 0.0 is handled elsewhere *)
(* CHECK: What happens when the timeout comes before the message
* is fully written? (Low priority because for stream connections
* a timeout is usually not set.)
*)
let g = new_group cl.esys in
Unixqueue.once cl.esys g call.call_timeout
(fun () ->
call.timeout_group <- None;
dlog cl "Timeout handler";
retransmit cl call;
(* Maybe we have to cancel reading: *)
!check_for_input cl
);
call.timeout_group <- Some g
)
let auth_proto_emit cl prog authproto =
(* Emit an authproto message *)
let xid = next_xid cl in
let prog_nr = Rpc_program.program_number prog in
let vers_nr = Rpc_program.version_number prog in
let request = authproto # emit xid prog_nr vers_nr in
(* THINK: maybe we want to set the call_timeout and the retrans_count
separately for authproto messages (instead of just taking over the
values for the regular call)
*)
let call =
{ detail = `Auth_proto prog;
state = Waiting;
retrans_count = cl.next_max_retransmissions;
xid = xid;
destination = cl.next_destination;
call_timeout = cl.next_timeout;
timeout_group = None;
call_auth_proto = authproto;
batch_flag = false;
request = Some request;
} in
Queue.add call cl.waiting_calls;
cl.used_xids <- SessionMap.add call.xid () cl.used_xids;
(* For TCP and timeout > 0.0 set the timeout handler immediately, so the
timeout includes connecting
*)
if cl.prot = Rpc.Tcp && call.call_timeout > 0.0
then
set_timeout cl call
let unbound_async_call_r cl prog procname param receiver authsess_opt =
(* authsess_opt: if passed, this session is used *)
if not cl.ready then
raise Client_is_down;
if cl.progs <> [] then (
let prog_id = Rpc_program.id prog in
if not (List.exists (fun p -> Rpc_program.id p = prog_id) cl.progs) then
failwith "Rpc_client.unbound_async_call: \
This client is not bound to the requested program"
);
let (_, _, out_type) =
try Rpc_program.signature prog procname
with Not_found ->
failwith ("Rpc_client.unbound_async_call: No such procedure: " ^
procname) in
if cl.next_batch_flag && Xdr.xdr_type_term out_type <> X_void then
failwith ("Rpc_client.unbound_async_call: Cannot call in batch mode: " ^
procname);
let rc =
{ prog = prog;
proc = procname;
param = param;
get_result = receiver;
decoder = None;
call_user_name = cl.user_name;
} in
let eff_authsess_opt, eff_authproto =
match authsess_opt with
| None ->
let authproto = find_or_make_auth_protocol cl cl.user_name in
( match authproto#state with
| `Done authsess ->
(Some authsess, authproto)
| _ ->
(None, authproto)
)
| Some authsess ->
(Some authsess, authsess#auth_protocol) in
let new_call =
match eff_authsess_opt with
| Some authsess ->
let xid = next_xid cl in
let (cred_flav, cred_data, verf_flav, verf_data, enc_opt, dec_opt) =
authsess # next_credentials
cl prog procname xid in
rc.decoder <- dec_opt;
let request =
Rpc_packer.pack_call
?encoder:enc_opt
prog
xid
procname
cred_flav cred_data verf_flav verf_data
param in
let call =
{ detail = `Regular rc;
state = Waiting;
retrans_count = cl.next_max_retransmissions;
xid = xid;
destination = cl.next_destination;
call_timeout = cl.next_timeout;
timeout_group = None;
call_auth_proto = eff_authproto;
batch_flag = cl.next_batch_flag;
request = Some request;
} in
Queue.add call cl.waiting_calls;
call
| None ->
dlog cl "starting authentication protocol";
( match eff_authproto#state with
| `Done _ ->
assert false
| `Error ->
assert false
| ap_state ->
(* Authentication not yet ready. *)
if ap_state = `Emit then (
dlog cl "emitting new authentication token";
auth_proto_emit cl rc.prog eff_authproto;
);
let xid = next_xid cl in
let call =
{ detail = `Regular rc;
state = Delayed;
retrans_count = cl.next_max_retransmissions;
xid = xid;
destination = cl.next_destination;
call_timeout = cl.next_timeout;
timeout_group = None;
call_auth_proto = eff_authproto;
batch_flag = cl.next_batch_flag;
request = None;
} in
let q =
try Hashtbl.find cl.delayed_calls eff_authproto
with Not_found -> Queue.create() in
Queue.add call q;
Hashtbl.replace cl.delayed_calls eff_authproto q;
call
)
in
cl.last_xid <- Some new_call.xid;
cl.used_xids <- SessionMap.add new_call.xid () cl.used_xids;
cl.next_timeout <- cl.timeout;
cl.next_max_retransmissions <- cl.max_retransmissions;
cl.next_batch_flag <- false;
(* We keep next_destination, as required by the API. *)
(* For TCP and timeout > 0.0 set the timeout handler immediately, so the
timeout includes connecting
*)
if cl.prot = Rpc.Tcp && new_call.state = Waiting &&
new_call.call_timeout > 0.0
then
set_timeout cl new_call;
!check_for_output cl;
new_call
let unbound_async_call cl prog procname param receiver =
ignore(unbound_async_call_r cl prog procname param receiver None)
(*****)
type 'a threeway =
| Value of 'a
| Novalue
| Error of exn
let call_auth_session call =
match call.call_auth_proto # state with
| `Done s -> s
| _ -> assert false
let string_of_reject_code =
function
| `Fail -> "Fail"
| `Retry -> "Retry"
| `Renew -> "Renew"
| `Next -> "Next"
let process_regular_incoming_message cl message peer sock call rc =
(* Exceptions in the following block are forwarded to the callback
* function
*)
let auth_sess = call_auth_session call in
dlogr cl
(fun () ->
sprintf "process_regular_incoming_message auth_meth=%s have_decoder=%B"
auth_sess#auth_protocol#auth_method#name
(rc.decoder <> None)
);
let result_opt =
try
( match Rpc_packer.peek_auth_error message with
| None ->
let (xid,verf_flavour,verf_data,response) =
Rpc_packer.unpack_reply
~mstring_factories:cl.mstring_factories
?decoder:rc.decoder
rc.prog rc.proc message
(* may raise an exception *)
in
auth_sess # server_accepts cl call.xid verf_flavour verf_data;
Value response
| Some auth_problem ->
let code =
auth_sess # server_rejects cl call.xid auth_problem in
dlogr_ptrace cl
(fun () ->
sprintf
"RPC <-- (sock=%s,peer=%s,xid=%Ld) Auth error %s - reaction: %s"
(Rpc_transport.string_of_sockaddr sock)
(Rpc_transport.string_of_sockaddr peer)
(Rtypes.int64_of_uint4 call.xid)
(Rpc.string_of_server_error auth_problem)
(string_of_reject_code code)
);
remove_pending_call cl call;
( match code with
| `Fail ->
Error(Rpc.Rpc_server auth_problem)
| `Retry ->
ignore(
unbound_async_call_r
cl rc.prog rc.proc rc.param rc.get_result
(Some auth_sess));
Novalue
| `Renew ->
(* FIXME: When we send several requests in sequence,
we may get here several `Renew codes. That should
be merged to a single renewal.
*)
Hashtbl.remove cl.auth_current rc.call_user_name;
unbound_async_call
cl rc.prog rc.proc rc.param rc.get_result;
Novalue
| `Next ->
let m = call.call_auth_proto # auth_method in
( match cl.auth_methods with
| m0 :: _ :: _ when m0 = m ->
cl.auth_methods <- List.tl cl.auth_methods;
Hashtbl.remove
cl.auth_current rc.call_user_name;
(* FIXME: see `Renew *)
| _ ->
raise(Rpc.Rpc_server Auth_too_weak)
);
unbound_async_call
cl rc.prog rc.proc rc.param rc.get_result;
Novalue
)
)
with
error ->
(* The call_auth_session is simply dropped. *)
(* Forward the exception [error] to the caller: *)
remove_pending_call cl call;
Error error
in
match result_opt with
| Novalue ->
(* There is no result yet *)
()
| Value result ->
(* pass result to the user *)
( try
dlogr_ptrace cl
(fun () ->
sprintf
"RPC <-- (sock=%s,peer=%s,xid=%Ld) %s"
(Rpc_transport.string_of_sockaddr sock)
(Rpc_transport.string_of_sockaddr peer)
(Rtypes.int64_of_uint4 call.xid)
(Rpc_util.string_of_response
!Debug.ptrace_verbosity
rc.prog
rc.proc
result
)
);
let f = (fun () -> result) in
pass_result cl call f; (* may raise Keep_call *)
(* Side effect: Changes the state of [call] to [Done] *)
remove_pending_call cl call;
with
Keep_call ->
call.state <- Pending
)
| Error error ->
( try
dlogr_ptrace cl
(fun () ->
sprintf
"RPC <-- (sock=%s,peer=%s,xid=%Ld) Error %s"
(Rpc_transport.string_of_sockaddr sock)
(Rpc_transport.string_of_sockaddr peer)
(Rtypes.int64_of_uint4 call.xid)
(Netexn.to_string error)
);
let f = (fun () -> raise error) in
pass_result cl call f; (* may raise Keep_call *)
(* Side effect: Changes the state of [call] to [Done] *)
remove_pending_call cl call;
with
Keep_call ->
call.state <- Pending
)
let process_incoming_message cl message peer =
let sock =
match cl.trans with
| None -> `Implied
| Some t -> t#getsockname in
(* Got a 'message' for which the corresponding 'call' must be searched: *)
let xid = Rpc_packer.peek_xid message in
let call =
try
SessionMap.find xid cl.pending_calls
with
Not_found ->
(* Strange: Got a message with a session ID that is not pending.
* We assume that this is an answer of a very old message that
* has been completely timed out.
*)
raise Message_not_processable
in
assert(call.state = Pending);
match call.detail with
| `Auth_proto prog ->
dlog cl "continuing authentication protocol";
( match call.call_auth_proto#state with
| `Receive expected_xid when expected_xid = xid ->
remove_pending_call cl call;
let err_opt =
try
dlog cl "receiving authentication token";
call.call_auth_proto#receive message; None
with error -> Some error in
let err_opt' =
match call.call_auth_proto#state with
| `Emit ->
(* Emit the next message of the protocol: *)
assert(err_opt = None);
( try
auth_proto_emit cl prog call.call_auth_proto;
None
with
| error -> Some error
)
| `Done authsess ->
(* We are done - so activate all delayed messages *)
assert(err_opt = None);
dlog cl "authentication protocol is done";
let q =
try Hashtbl.find cl.delayed_calls call.call_auth_proto
with Not_found -> Queue.create() in
Hashtbl.remove cl.delayed_calls call.call_auth_proto;
Queue.iter
(fun d_call ->
continue_call cl d_call
)
q;
None
| `Receive _ ->
assert false
| `Error ->
assert(err_opt <> None);
err_opt in
( match err_opt' with
| None ->
!check_for_output cl
| Some err ->
(* Failed authentication! *)
let q =
try Hashtbl.find cl.delayed_calls call.call_auth_proto
with Not_found -> Queue.create() in
Hashtbl.remove cl.delayed_calls call.call_auth_proto;
Queue.iter
(fun d_call ->
pass_exception cl d_call err
)
q
)
| _ ->
(* Unexpected messages. We just drop these - assuming these
are duplicates
*)
raise Message_not_processable
)
| `Regular rc ->
process_regular_incoming_message cl message peer sock call rc
let drop_response cl message peer =
let sock =
match cl.trans with
| None -> `Implied
| Some t -> t#getsockname in
let xid = Rpc_packer.peek_xid message in
let call =
try
SessionMap.find xid cl.pending_calls
with
Not_found ->
raise Message_not_processable in
assert(call.state = Pending);
dlogr_ptrace cl
(fun () ->
sprintf
"RPC <-- (sock=%s,peer=%s) Dropping response"
(Rpc_transport.string_of_sockaddr sock)
(Rpc_transport.string_of_sockaddr peer)
);
try
let f = (fun () -> raise Response_dropped) in
pass_result cl call f; (* may raise Keep_call *)
(* Side effect: Changes the state of [call] to [Done] *)
remove_pending_call cl call;
with
Keep_call ->
call.state <- Pending
(*****)
let rec handle_incoming_message cl r =
(* Called when a complete message has been read by the transporter *)
match r with
| `Error e ->
close ~error:(Communication_error e) cl
| `Ok(msg,addr) ->
dlog cl "Message arrived";
( try
( match addr with
| `Implied -> ()
| `Sockaddr a ->
cl.last_replier <- Some a
);
( match msg with
| `Accept pv ->
process_incoming_message cl pv addr
| `Reject pv ->
drop_response cl pv addr
| _ ->
assert false
)
with
Message_not_processable ->
dlog cl "message not processable";
()
);
(next_incoming_message cl : unit)
| `End_of_file ->
dlog cl "End of file";
close cl
and next_incoming_message cl =
match cl.trans with
| None -> ()
| Some trans -> next_incoming_message' cl trans
and next_incoming_message' cl trans =
trans # cancel_rd_polling();
if cl.pending_calls <> SessionMap.empty && not trans#reading then (
trans # start_reading
~before_record:(fun n _ ->
match cl.max_resp_length with
| None -> `Accept
| Some m -> if n > m then `Reject else `Accept
)
~when_done:(fun r ->
handle_incoming_message cl r)
()
)
else
dlog cl "Stopping reading";
;;
check_for_input := next_incoming_message;;
let rec handle_outgoing_message cl call r =
(* Called after a complete message has been sent by the transporter *)
match r with
| `Error e ->
close ~error:(Communication_error e) cl
| `Ok () ->
dlog cl "message writing finished";
if call.batch_flag || call.call_timeout = 0.0 then (
try
if call.batch_flag then
pass_result cl call (fun () -> XV_void) (* may raise Keep_call *)
else
pass_exception cl call Message_timeout;
remove_pending_call cl call;
with Keep_call -> (* no removal *)
()
);
!check_for_input cl;
next_outgoing_message cl
and next_outgoing_message cl =
match cl.trans with
| None -> () (* Not yet initialized *)
| Some trans ->
if not trans#writing then
next_outgoing_message' cl trans
and next_outgoing_message' cl trans =
let call_opt =
try Some(Queue.take cl.waiting_calls) with Queue.Empty -> None in
match call_opt with
| Some call ->
(* If the call is already 'Done', skip it. *)
(* Change the state of the call. It is now 'pending': *)
if call.state = Done then (
(* That can happen for calls that timeout before they are sent *)
dlog cl "found call that has been done";
next_outgoing_message cl
)
else (
let dest =
match call.destination with
| Some d -> `Sockaddr d
| None -> trans#getpeername in
( match call.state with
| Done -> assert false
| Delayed -> assert false
| Waiting ->
cl.pending_calls <-
SessionMap.add call.xid call cl.pending_calls;
(* The xid is already in used_xids *)
call.state <- Pending;
dlogr_ptrace cl
(fun () ->
sprintf
"RPC --> (sock=%s,peer=%s,xid=%Ld) %s"
(Rpc_transport.string_of_sockaddr trans#getsockname)
(Rpc_transport.string_of_sockaddr dest)
(Rtypes.int64_of_uint4 call.xid)
(match call.detail with
| `Regular rc ->
Rpc_util.string_of_request
!Debug.ptrace_verbosity
rc.prog
rc.proc
rc.param
| `Auth_proto _ ->
"<authprot>"
)
)
| Pending ->
()
(* The call is already member of [pending_calls]
* (retransmitted)
*)
);
(* If there should be a timeout handler, add it: *)
set_timeout cl call;
(* Send the message: *)
let m =
match call.request with
| None -> assert false
| Some m -> m in
(* If this is a regular message, we can now drop the request buffer *)
( match call.detail with
| `Regular _ -> call.request <- None
| _ -> ()
);
dlog cl "start_writing";
trans # start_writing
~when_done:(fun r ->
handle_outgoing_message cl call r)
m
dest
);
| None ->
()
;;
check_for_output := next_outgoing_message ;;
(* Shutdown:
* We first try an orderly shutdown. If that does not work, just inactivate
* the transport.
*)
let shutdown_connector cl mplex ondown =
dlog cl "shutdown_connector";
mplex # abort_rw();
( try
mplex # start_shutting_down
~when_done:(fun exn_opt ->
( match exn_opt with
| `Ok _ -> ()
| `Error exn ->
dlogr cl
(fun () ->
sprintf "start_shutting_down: exception %s"
(Netexn.to_string exn))
);
mplex # inactivate();
ondown()
)
()
with
| _ -> mplex # inactivate(); ondown()
)
let mplex_of_fd ~close_inactive_descr prot fd esys =
let preclose() =
Netlog.Debug.release_fd fd in
match prot with
| Tcp ->
Rpc_transport.stream_rpc_multiplex_controller
~close_inactive_descr ~preclose fd esys
| Udp ->
Rpc_transport.datagram_rpc_multiplex_controller
~close_inactive_descr ~preclose fd esys
class type socket_config =
object
method non_blocking_connect : bool
method multiplexing :
close_inactive_descr:bool ->
protocol -> Unix.file_descr -> Unixqueue.event_system ->
Rpc_transport.rpc_multiplex_controller Uq_engines.engine
end
class default_socket_config : socket_config =
object
method non_blocking_connect = true
method multiplexing ~close_inactive_descr prot fd esys =
let close() =
if close_inactive_descr then (
Netlog.Debug.release_fd fd;
try
match Netsys_win32.lookup fd with
| Netsys_win32.W32_pipe ph ->
Netsys_win32.pipe_shutdown ph;
Unix.close fd
| _ ->
()
with Not_found ->
Unix.close fd
) in
let eng =
try
let mplex = mplex_of_fd ~close_inactive_descr prot fd esys in
new Uq_engines.epsilon_engine (`Done mplex) esys
with
| error ->
new Uq_engines.epsilon_engine (`Error error) esys in
Uq_engines.when_state
~is_aborted:(fun () -> close())
~is_error:(fun _ -> close())
eng;
eng
end
class blocking_socket_config : socket_config =
object
inherit default_socket_config
method non_blocking_connect = false
end
let default_socket_config = new default_socket_config
let blocking_socket_config = new blocking_socket_config
type mode2 =
[ `Socket_endpoint of protocol * Unix.file_descr
| `Multiplexer_endpoint of Rpc_transport.rpc_multiplex_controller
| `Socket of protocol * connector * socket_config
]
class unbound_async_call cl prog name v =
let emit = ref (fun _ -> assert false) in
let call = unbound_async_call_r cl prog name v (fun gr -> !emit gr) None in
object(self)
inherit [ Xdr.xdr_value ] Uq_engines.engine_mixin (`Working 0) cl.esys
initializer
emit := (fun get_result ->
try
let r = get_result() in
self # set_state (`Done r)
with
| err ->
self # set_state (`Error err)
)
method event_system = cl.esys
method abort() =
match self#state with
| `Working _ ->
if call.state <> Done then
remove_pending_call cl call;
call.state <- Done;
self # set_state `Aborted
| _ -> ()
end
let string_of_file_descr fd =
Int64.to_string (Netsys.int64_of_file_descr fd)
let rec internal_create initial_xid
shutdown
prog_opt
mode esys =
let id_s_0 =
match mode with
| `Socket_endpoint(_,fd) ->
"Socket_endpoint(_," ^ string_of_file_descr fd ^ ")"
| `Multiplexer_endpoint ep ->
"Multiplexer_endpoint(" ^
string_of_int(Oo.id ep) ^ ")"
| `Socket(_, conn, _) ->
let s_conn =
match conn with
| Inet(h,p) ->
"inet/" ^ h ^ ":" ^ string_of_int p
| Internet(ip,p) ->
"inet/" ^ Unix.string_of_inet_addr ip ^ ":" ^ string_of_int p
| Unix p ->
"unix/" ^ p
| W32_pipe p ->
"w32_pipe/" ^ p
| Descriptor fd ->
"fd/" ^ string_of_file_descr fd
| Dynamic_descriptor f ->
"dyn_fd"
| Portmapped h ->
"portmapped:" ^ h in
"Socket(_," ^ s_conn ^ ",_)" in
let id_s =
match prog_opt with
| None -> id_s_0
| Some prog ->
id_s_0 ^ " for program " ^
(Int32.to_string
(Rtypes.logical_int32_of_uint4
(Rpc_program.program_number prog))) in
let non_blocking_connect =
match mode with
| `Socket(_,_,conf) -> conf # non_blocking_connect
| _ -> true in
let cl =
(* preliminary version of the client *)
{ ready = true;
trans = None;
progs = ( match prog_opt with None -> [] | Some p -> [p] );
prot = Rpc.Udp;
esys = esys;
est_engine = None;
shutdown_connector = shutdown;
waiting_calls = Queue.create();
pending_calls = SessionMap.empty;
delayed_calls = Hashtbl.create 27;
used_xids = SessionMap.empty;
next_xid = initial_xid;
next_destination = None;
next_timeout = (-1.0);
next_max_retransmissions = 0;
next_batch_flag = false;
max_resp_length = None;
user_name = None;
mstring_factories = Hashtbl.create 1;
last_replier = None;
last_xid = None;
timeout = (-1.0);
max_retransmissions = 0;
exception_handler = (fun _ -> ());
all_auth_methods = [ ];
auth_methods = [ ];
auth_current = Hashtbl.create 3;
nolog = false;
}
in
Hashtbl.add cl.mstring_factories "*" Xdr_mstring.string_based_mstrings;
let portmapper_engine prot host prog esys =
(* Performs GETPORT for the program on [host]. We use
* Rpc_portmapper_aux but not Rpc_portmapper_clnt. The latter is
* impossible because of a dependency cycle.
*)
dlog cl "starting portmapper query";
let pm_port = Rtypes.int_of_uint4 Rpc_portmapper_aux.pmap_port in
let pm_prog = Rpc_portmapper_aux.program_PMAP'V2 in
let pm_client =
internal_create
(Rtypes.uint4_of_int 0)
shutdown_connector
(Some pm_prog)
(`Socket(Rpc.Udp, Inet(host, pm_port), default_socket_config))
esys
in
let v =
Rpc_portmapper_aux._of_PMAP'V2'pmapproc_getport'arg
{ Rpc_portmapper_aux.prog = Rpc_program.program_number prog;
vers = Rpc_program.version_number prog;
prot = ( match prot with
| Tcp -> Rpc_portmapper_aux.ipproto_tcp
| Udp -> Rpc_portmapper_aux.ipproto_udp
);
port = Rtypes.uint4_of_int 0;
} in
let close_deferred() =
Unixqueue.once esys (Unixqueue.new_group esys) 0.0
(fun() -> close pm_client) in
new Uq_engines.map_engine
~map_done:(fun r ->
dlog cl "Portmapper GETPORT done";
let addr =
match pm_client.trans with
| None -> assert false
| Some trans ->
( match trans # getpeername with
| `Implied -> assert false
| `Sockaddr a -> a
) in
let port =
Rpc_portmapper_aux._to_PMAP'V2'pmapproc_getport'res r in
let port = Rtypes.int_of_uint4 port in
close_deferred();
if port = 0 then
`Error (Failure "Program not bound in Portmapper")
else
`Done(addr, port)
)
~map_error:(fun err ->
dlog cl "Portmapper GETPORT error";
close_deferred();
`Error err)
(new unbound_async_call pm_client pm_prog "PMAPPROC_GETPORT" v)
in
let connect_engine addr esys =
match addr with
| `Portmapped(prot,host) ->
( match prog_opt with
| None ->
failwith
"Rpc_client.unbound_create: Portmapped not supported"
| Some prog ->
new Uq_engines.seq_engine
(portmapper_engine prot host prog esys)
(fun (sockaddr, port) ->
let inetaddr =
match sockaddr with
| Unix.ADDR_INET(inet, _) -> inet
| _ -> assert false in
let stype =
match prot with
| Tcp -> Unix.SOCK_STREAM
| Udp -> Unix.SOCK_DGRAM in
let addr = `Sock_inet(stype, inetaddr, port) in
let opts = Uq_engines.default_connect_options in
Uq_engines.connector (`Socket(addr,opts)) esys
)
)
| #Uq_engines.connect_address as addr ->
Uq_engines.connector addr esys in
let track fd =
Netlog.Debug.track_fd ~owner:"Rpc_client" ~descr:id_s fd in
let disable_nagle fd =
try
Unix.setsockopt fd Unix.TCP_NODELAY true
with _ -> () in
let open_socket_non_blocking addr prot conf =
new Uq_engines.seq_engine
(connect_engine addr esys)
(fun status ->
dlogr cl
(fun () ->
"Non-blocking socket connect successful for " ^ id_s);
let fd = Uq_engines.client_endpoint status in
disable_nagle fd;
track fd;
conf # multiplexing ~close_inactive_descr:true prot fd esys
) in
let open_socket_blocking addr prot conf =
let conn_esys = Unixqueue.create_unix_event_system() in
let c = connect_engine addr conn_esys in
Unixqueue.run conn_esys;
match c # state with
| `Done status ->
dlogr cl
(fun () ->
"Blocking socket connect successful for " ^ id_s);
let fd = Uq_engines.client_endpoint status in
disable_nagle fd;
track fd;
conf # multiplexing ~close_inactive_descr:true prot fd esys
| `Error err ->
raise err
| _ ->
assert false
in
let open_socket =
if non_blocking_connect then
open_socket_non_blocking
else
open_socket_blocking in
let (prot, establish_engine) =
match mode with
| `Socket_endpoint(prot,fd) ->
disable_nagle fd;
track fd;
let m = mplex_of_fd ~close_inactive_descr:true prot fd esys in
(prot, new Uq_engines.epsilon_engine (`Done m) esys)
| `Multiplexer_endpoint(mplex) ->
if mplex # event_system != esys then
failwith "Rpc_client.create2: \
Multiplexer is attached to the wrong event system";
(mplex # protocol,
new Uq_engines.epsilon_engine (`Done mplex) esys)
| `Socket(prot,conn,conf) ->
let stype =
match prot with Tcp -> Unix.SOCK_STREAM | Udp -> Unix.SOCK_DGRAM in
(match conn with
| Inet (host,port) ->
let saddr = `Sock_inet_byname(stype, host, port) in
let addr =
`Socket(saddr, Uq_engines.default_connect_options) in
(prot, open_socket addr prot conf)
| Internet (host,port) ->
let saddr = `Sock_inet(stype, host, port) in
let addr =
`Socket(saddr, Uq_engines.default_connect_options) in
(prot, open_socket addr prot conf)
| Unix path ->
let saddr = `Sock_unix(stype, path) in
let addr =
`Socket(saddr,
Uq_engines.default_connect_options) in
(prot, open_socket addr prot conf)
| W32_pipe path ->
if prot <> Rpc.Tcp then
failwith "Rpc_client.create2: \
Pipe only supported for Rpc.Tcp protocol type";
let addr = `W32_pipe(Netsys_win32.Pipe_duplex, path) in
(prot, open_socket addr prot conf)
| Descriptor fd ->
(* no track! *)
let m =
mplex_of_fd ~close_inactive_descr:false prot fd esys in
(prot, new Uq_engines.epsilon_engine (`Done m) esys)
| Dynamic_descriptor f ->
let fd = f() in
track fd;
let m =
mplex_of_fd ~close_inactive_descr:true prot fd esys in
(prot, new Uq_engines.epsilon_engine (`Done m) esys)
| Portmapped host ->
(prot, open_socket (`Portmapped(prot,host)) prot conf)
)
in
let timeout = if prot = Udp then 15.0 else (-.1.0) in
let max_retransmissions = if prot = Udp then 3 else 0 in
(* update cl: *)
cl.prot <- prot;
cl.est_engine <- Some establish_engine;
cl.next_timeout <- timeout;
cl.timeout <- timeout;
cl.next_max_retransmissions <- max_retransmissions;
cl.max_retransmissions <- max_retransmissions;
cl.exception_handler <- (fun exn ->
if not cl.nolog then
Netlog.logf `Crit
"Rpc_client: Uncaught exception %s"
(Netexn.to_string exn)
);
Uq_engines.when_state
~is_done:(fun mplex ->
dlogr cl
(fun () ->
sprintf
"Fully connected for %s: (sock=%s,peer=%s)"
id_s
(Rpc_transport.string_of_sockaddr mplex#getsockname)
(Rpc_transport.string_of_sockaddr mplex#getpeername));
cl.trans <- Some mplex;
cl.est_engine <- None;
(* Maybe we already have messages to send: *)
!check_for_output cl
)
~is_error:(fun err ->
cl.est_engine <- None;
close ~error:(Communication_error err) cl;
cl.exception_handler err
)
~is_aborted:(fun () ->
cl.est_engine <- None)
establish_engine;
cl
;;
let create2 ?program_number ?version_number ?(initial_xid=0)
?(shutdown = shutdown_connector)
mode prog0 esys =
let prog = Rpc_program.update ?program_number ?version_number prog0 in
let cl =
internal_create
(Rtypes.uint4_of_int initial_xid) shutdown (Some prog) mode esys in
cl
let unbound_create ?(initial_xid=0) ?(shutdown = shutdown_connector)
mode esys =
internal_create (Rtypes.uint4_of_int initial_xid) shutdown None mode esys
let bind cl prog =
cl.progs <- prog :: cl.progs
let use cl prog =
let prog_id = Rpc_program.id prog in
if not(List.exists (fun p -> Rpc_program.id p = prog_id) cl.progs) then
failwith "Rpc_client.use: This program is not bound by this client"
(*****)
let is_up cl =
cl.ready
let configure_next_call cl max_retransmission_trials timeout =
cl.next_max_retransmissions <- max_retransmission_trials;
cl.next_timeout <- timeout
let configure cl max_retransmission_trials timeout =
cl.max_retransmissions <- max_retransmission_trials;
cl.timeout <- timeout;
configure_next_call cl max_retransmission_trials timeout
let set_dgram_destination cl addr_opt =
cl.next_destination <- addr_opt
let set_exception_handler cl xh =
cl.exception_handler <- xh
let set_batch_call cl =
cl.next_batch_flag <- true
let set_max_response_length cl n =
cl.max_resp_length <- Some n
let set_mstring_factories cl fac =
cl.mstring_factories <- fac
let set_user_name cl n =
cl.user_name <- n
let gen_shutdown cl is_running run ondown =
if cl.ready then (
let b = is_running cl.esys in
( match cl.est_engine with
| None -> ()
| Some e -> e#abort()
);
close ~ondown cl;
if not b then run cl.esys
)
else
ondown()
let shut_down cl =
gen_shutdown
cl
Unixqueue.is_running
Unixqueue.run
(fun () -> ())
let sync_shutdown cl =
gen_shutdown
cl
(fun esys ->
if Unixqueue.is_running esys then
failwith "Rpc_client.sync_shutdown: called from event loop";
false)
Unixqueue.run
(fun () -> ())
let trigger_shutdown cl ondown =
gen_shutdown
cl
(fun _ -> true)
Unixqueue.run
(fun () ->
let g = Unixqueue.new_group cl.esys in
Unixqueue.once cl.esys g 0.0 ondown
)
let event_system cl =
cl.esys
let program cl =
List.hd cl.progs
let programs cl =
cl.progs
let get_socket_name cl =
match cl.trans with
| None -> failwith "Rpc_client.get_socket_name: not connected"
| Some trans ->
( match trans # getsockname with
| `Implied ->
failwith "Rpc_client.get_socket_name: not applicable"
| `Sockaddr a -> a
)
let get_peer_name cl =
match cl.trans with
| None -> failwith "Rpc_client.get_peer_name: not connected"
| Some trans ->
( match trans # getpeername with
| `Implied ->
failwith "Rpc_client.get_peer_name: not applicable"
| `Sockaddr a -> a
)
let get_sender_of_last_response cl =
match cl.last_replier with
| None -> failwith "Rpc_client.get_sender_of_last_response: nothing received yet or sender's address not available from transport layer"
| Some addr -> addr
let get_xid_of_last_call cl =
match cl.last_xid with
| None ->
failwith "Rpc_client.get_xid_of_last_call: nothing called"
| Some xid -> xid
let get_protocol cl =
cl.prot
let verbose b =
Debug.enable := b
(*****)
(* Now synchronous calls: *)
type 'a result =
No
| Reply of 'a
| Error of exn
exception Stop_call
let synchronize esys f_async arg =
let r = ref No in
let get_result transmitter =
try
r := Reply (transmitter())
with
x ->
r := Error x;
if x = Message_timeout then
raise (Unbound_exception Stop_call)
in
(* push the request onto the queue: *)
let () = f_async arg get_result in
(* run through the queue and process all elements: *)
( try Unixqueue.run esys with Stop_call -> ());
(* now a call back of 'get_result' should have happened. *)
match !r with
No -> failwith "Rpc_client.synchronize: internal error"
| Reply x -> x
| Error e -> raise e
let unbound_sync_call cl prog proc arg =
synchronize
cl.esys
(unbound_async_call cl prog proc)
arg
(*****)
(* DEPRECATED FUNCTIONS *)
let add_call cl procname param receiver =
if not cl.ready then
raise Client_is_down;
match cl.progs with
| [ prog ] ->
unbound_async_call cl prog procname param receiver
| _ ->
failwith "Rpc_client.add_call [deprecated function]: \
The client does not have exactly one bound program"
let sync_call cl procname param =
if not cl.ready then
raise Client_is_down;
match cl.progs with
| [ prog ] ->
unbound_sync_call cl prog procname param
| _ ->
failwith "Rpc_client.sync_call [deprecated function]: \
The client does not have exactly one bound program"
let create ?program_number ?version_number ?(initial_xid=0)
?(shutdown = shutdown_connector)
esys c prot prog0 =
create2
?program_number ?version_number ~initial_xid ~shutdown
(`Socket(prot, c, (new blocking_socket_config)))
prog0
esys