Plasma GitLab Archive
Projects Blog Knowledge

Module Rpc_proxy.ManagedClient


module ManagedClient: sig .. end


Managed clients are Rpc_client clients with the ability to reconnect in the case of errors.

Additional features:

  • they can also be disabled, either based on a time criterion or a customizable hook. This encodes the assumption that failing servers need some time to recover
  • unused connections are closed (driven by a timeout)
  • support for the initial ping after establishing the connection
Initial pings are useful to test whether the connection is really working. Servers normally accept new TCP connections without knowing whether there are resources for processing the connections (i.e. whether there is a process or thread waiting for serving it). Because of this, the client cannot assume that the TCP connection is really up only because the connect system call said the connection is there. The initial ping fixes this problem: The null procedure is once called after the TCP connection has been established. Only when this works the client believes the connection is really up. It is required that mclient_programs is configured with at least one program, and this program must have a procedure number 0 of type void -> void.

In multi-threaded programs, threads must not share managed clients.

Managed clients can be used together with ocamlrpcgen-generated modules. Provided the generated module M_clnt contains the client code for program P and version V, one can do

         module MC = M_clnt.Make'P(Rpc_proxy.ManagedClient)
      

and call RPCs f as in

         let res = MC.V.f mc arg
      

(if mc is the managed client, and arg the argument).

type mclient 
A managed client

type mclient_config = {
   mclient_rcache : Rpc_proxy.ReliabilityCache.rcache; (*The rcache*)
   mclient_socket_config : Rpc_client.socket_config; (*The socket configuration*)
   mclient_idle_timeout : float; (*After how many seconds unused connections are closed. A negative value means never. 0 means immediately. A positive value is a point in time in the future.*)
   mclient_programs : Rpc_program.t list; (*The programs to bind*)
   mclient_msg_timeout : float; (*After how many seconds the reply must have been arrived. A negative value means there is no timeout. 0 means immediately. A positive value is a point in time in the future.*)
   mclient_msg_timeout_is_fatal : bool; (*Whether a message timeout is to be considered as fatal error (the client is shut down, and the error counter for the endpoint is increased)*)
   mclient_exception_handler : (exn -> unit) option; (*Whether to call Rpc_client.set_exception_handler*)
   mclient_auth_methods : Rpc_client.auth_method list; (*Set these authentication methods in the client*)
   mclient_initial_ping : bool; (*Whether to call procedure 0 of the first program after connection establishment (see comments above)*)
   mclient_max_response_length : int option; (*The maximum response length. See Rpc_client.set_max_response_length.*)
   mclient_mstring_factories : Xdr_mstring.named_mstring_factories option; (*The factories to use for decoding managed strings*)
}
exception Service_unavailable
Procedure calls may end with this exception when the reliability cache disables the service
val create_mclient_config : ?rcache:Rpc_proxy.ReliabilityCache.rcache ->
?socket_config:Rpc_client.socket_config ->
?idle_timeout:float ->
?programs:Rpc_program.t list ->
?msg_timeout:float ->
?msg_timeout_is_fatal:bool ->
?exception_handler:(exn -> unit) ->
?auth_methods:Rpc_client.auth_method list ->
?initial_ping:bool ->
?max_response_length:int ->
?mstring_factories:Xdr_mstring.named_mstring_factories ->
unit -> mclient_config

Create a config record. The optional arguments set the config components with the same name. The defaults are:
  • rcache: Use the global reliability cache
  • socket_config: Rpc_client.default_socket_config
  • programs: The empty list. It is very advisable to fill this!
  • msg_timeout: (-1), i.e. none
  • msg_timeout_is_fatal: false
  • exception_handler: None
  • auth_methods: empty list
  • initial_ping: false
  • max_response_length: None
  • mstring_factories: None

val create_mclient : mclient_config ->
Rpc_client.connector ->
Unixqueue.event_system -> mclient
Create a managed client for this config connecting to this connector. Only Internet and Unix connectors are supported.
type state = [ `Connecting | `Down | `Up of Unix.sockaddr option ] 
The state:
  • `Down: The initial state, and also reached after a socket error, or after one of the shutdown functions is called. Although `Down, there might still some cleanup to do. When RPC functions are called, the client is automatically revived.
  • `Connecting: This state is used while the initial ping is done. It does not reflect whether the client is really TCP-connected. Without initial ping, this state cannot occur.
  • `Up s: The client is (so far known) up and can be used. s is the socket address of the local socket

val mclient_state : mclient -> state
Get the state
val mclient_serial : mclient -> int
Get the serial number of the connection. The serial number is increased when the client is reconnected. If the client is down the serial number of the next connection attempt is returned.
val pending_calls : mclient -> int
Returns the number of pending calls
val event_system : mclient -> Unixqueue.event_system
Return the event system
val shut_down : mclient -> unit
val sync_shutdown : mclient -> unit
val trigger_shutdown : mclient -> (unit -> unit) -> unit
Shut down the managed client. See the corresponding functions Rpc_client.shut_down, Rpc_client.sync_shutdown, and Rpc_client.trigger_shutdown
val record_unavailability : mclient -> unit
Increases the error counter in the reliability cache for this connection. The only effect can be that the error counter exceeds the rcache_threshold so that the server endpoint is disabled for some time. However, this only affects new connections, not existing ones.

For a stricter interpretation of errors see enforce_unavailability.

The error counter is increased anyway when a socket error happens, or an RPC call times out and msg_timeout_is_fatal is set. This function can be used to also interpret other misbehaviors as fatal errors.

val enforce_unavailability : mclient -> unit
Enforces that all pending procedure calls get the Service_unavailable exception, and that the client is shut down. The background is this: When the reliability cache discovers an unavailable port or host, only the new call is stopped with this exception, but older calls remain unaffected. This function can be used to change the policy, and to stop even pending calls.

The difference to trigger_shutdown is that the pending RPC calls get the exception Service_unavailable instead of Rpc_client.Message_lost, and that it is enforced that the shutdown is recorded as fatal error in the reliability cache.

val set_batch_call : mclient -> unit
The next call is a batch call. See Rpc_client.set_batch_call
val rpc_engine : mclient ->
(mclient -> 'a -> ((unit -> 'b) -> unit) -> unit) ->
'a -> 'b Uq_engines.engine
Call an RPC function in engine style:

 let e = rpc_engine mc f_rpc 

where f_rpc is one of the generated client functions (async signature). The engine reaches `Done r when the result r has arrived.

The engine is not abortable (shut the client down instead).

val compare : mclient -> mclient -> int
ManagedClient can be used with Set.Make and Map.Make
include Rpc_client.USE_CLIENT

We implement the USE_CLIENT interface for calling procedures
This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml