Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: rpc_netplex.ml 1412 2010-02-15 16:20:27Z gerd $ *)

open Printf

(* Debug switches, both initially off.  Nothing in the code visible in
   this part of the file reads them; presumably they are toggled and
   consulted from elsewhere -- verify before removing.
 *)
let debug_rpc_internals : bool ref = ref false
let debug_rpc_service : bool ref = ref false


let rpc_factory 
      ~configure
      ?(socket_config = fun _ -> Rpc_server.default_socket_config)
      ?(hooks = fun _ -> new Netplex_kit.empty_processor_hooks())
      ?(supported_ptypes = [ `Multi_processing; `Multi_threading ])
      ~name
      ~setup
      () =

  (* [pmap_register sockserv progs_and_versions port] registers every
     (program number, version number) pair of [progs_and_versions] with
     the portmapper reached via "localhost", using protocol [Rpc.Tcp].

     The port to announce is taken from the Internet sockets found in
     [sockserv#sockets] and is remembered in the reference [port] (so
     that it can be unregistered later).  Non-Internet sockets are
     skipped.  If [port] stays [None] (no Internet socket at all),
     nothing is registered.

     A mapping that already exists for a program/version is removed
     before the new one is set.

     Raises [Failure] if the Internet sockets are bound to different
     ports.  Exceptions of the portmapper calls are passed through, but
     the portmapper client is shut down first so that it does not leak.
   *)
  let pmap_register sockserv progs_and_versions port =
    List.iter
      (fun (_, fds) ->
         Array.iter
           (fun fd ->
              match Unix.getsockname fd with
                | Unix.ADDR_INET(_,p) ->
                    ( match !port with
                        | None -> port := Some p
                        | Some p' ->
                            if p <> p' then
                              failwith "Cannot register RPC service in the portmapper when it listens to several ports"
                    )
                | Unix.ADDR_UNIX _ -> ()
           )
           fds
      )
      sockserv#sockets;
    match !port with
      | None -> ()
      | Some p ->
          let pmap = Rpc_portmapper.create_inet "localhost" in
          ( try
              List.iter
                (fun (prog_nr, vers_nr) ->
                   (* A result of 0 from getport is treated as "no
                      mapping exists"; any other port is a stale entry
                      that is dropped first.
                    *)
                   ( match Rpc_portmapper.getport pmap prog_nr vers_nr Rpc.Tcp with
                       | 0 -> ()
                       | p' ->
                           ignore
                             (Rpc_portmapper.unset
                                pmap prog_nr vers_nr Rpc.Tcp p')
                   );
                   (* NOTE(review): the result of [set] is discarded, as
                      in the original code -- presumably a success flag;
                      confirm whether a failure should be reported.
                    *)
                   ignore(Rpc_portmapper.set pmap prog_nr vers_nr Rpc.Tcp p)
                )
                progs_and_versions
            with err ->
              (* Release the client on the error path, too.  A failure
                 of the cleanup itself must not hide the original error.
               *)
              (try Rpc_portmapper.shut_down pmap with _ -> ());
              raise err
          );
          Rpc_portmapper.shut_down pmap
  in

  (* [pmap_unregister sockserv progs_and_versions port] removes the
     portmapper entries (protocol [Rpc.Tcp], portmapper reached via
     "localhost") of all (program number, version number) pairs in
     [progs_and_versions] for the port stored in [port].  Nothing is
     done if [port] is [None].

     [sockserv] is not used; the parameter only keeps the signature
     parallel to the registration function.

     Exceptions of the portmapper calls are passed through, but the
     portmapper client is shut down first so that it does not leak.
   *)
  let pmap_unregister sockserv progs_and_versions port =
    match !port with
      | None -> ()
      | Some p ->
          let pmap = Rpc_portmapper.create_inet "localhost" in
          ( try
              List.iter
                (fun (prog_nr, vers_nr) ->
                   ignore(Rpc_portmapper.unset pmap prog_nr vers_nr Rpc.Tcp p)
                )
                progs_and_versions
            with err ->
              (* Release the client on the error path, too.  A failure
                 of the cleanup itself must not hide the original error.
               *)
              (try Rpc_portmapper.shut_down pmap with _ -> ());
              raise err
          );
          Rpc_portmapper.shut_down pmap
  in

  (* The factory object: [name] returns the factory name, and
     [create_processor] builds one processor object from the
     configuration section at [addr].

     Configuration parameters read here:
     - "portmapper" (bool): when true, the add/remove hooks call
       pmap_register / pmap_unregister.  Missing parameter = false.
     - "timeout" (float): passed to [Rpc_server.set_timeout] for every
       connection.  Missing parameter = no timeout is set.
   *)
  ( object(self)
      method name = name

      method create_processor ctrl_cfg cf addr =
        let with_pmap =
          try
            let p = cf # resolve_parameter addr "portmapper" in
            cf # bool_param p
          with Not_found -> false in
        let timeout =
          try
            let p = cf # resolve_parameter addr "timeout" in
            Some (cf # float_param p)
          with Not_found -> None in
        let user_cfg = configure cf addr in
        let sock_cfg = socket_config user_cfg in

        (* Run [setup] once against a `Dummy server only to learn which
           programs it binds.  (Commented-out code in the earlier
           version used a socketpair endpoint for this purpose.)
         *)
        let bound_ids =
          let probe_esys = Unixqueue.create_unix_event_system () in
          let probe_srv = Rpc_server.create2 (`Dummy Rpc.Tcp) probe_esys in
          setup probe_srv user_cfg;
          let id_of_program prog =
            (Rpc_program.program_number prog,
             Rpc_program.version_number prog) in
          List.map id_of_program (Rpc_server.bound_programs probe_srv)
        in

        (* Port announced to the portmapper (filled by pmap_register) *)
        let registered_port = ref None in
        (* The (server, descriptor) pairs of the connections being served *)
        let live_servers = ref [] in

        ( object(self)
            inherit Netplex_kit.processor_base (hooks user_cfg) as super

            method post_add_hook sockserv =
              if with_pmap then
                pmap_register sockserv bound_ids registered_port;
              super # post_add_hook sockserv

            method post_rm_hook sockserv =
              if with_pmap then
                pmap_unregister sockserv bound_ids registered_port;
              super # post_rm_hook sockserv

            (* "netplex.connections" is answered here by reporting the
               last procedure info of every live server; all other
               messages go to the inherited handler.
             *)
            method receive_admin_message cnt name args =
              if name = "netplex.connections" then
                List.iter
                  (fun (srv, fd) ->
                     let info = Rpc_server.get_last_proc_info srv in
                     cnt # update_detail fd ("Last action: " ^ info)
                  )
                  !live_servers
              else
                super # receive_admin_message cnt name args

            (* Stop all live servers gracefully, forget them, and then
               run the inherited shutdown.
             *)
            method shutdown () =
              let stop (srv, _) =
                Rpc_server.stop_server ~graceful:true srv in
              List.iter stop !live_servers;
              live_servers := [];
              super # shutdown()

            (* Serve one accepted connection [fd]: create the
               multiplexer, and once it is available attach an RPC
               server to it and let [setup] bind the programs.
             *)
            method process ~when_done container fd proto =
              (* fd is only tracked here.  According to the original
                 author's note it is released and closed by the
                 multiplexer because of close_inactive_descr:true.
               *)
              Netlog.Debug.track_fd
                ~owner:"Rpc_netplex"
                ~descr:(sprintf "RPC connection %s" (Netsys.string_of_fd fd))
                fd;
              let esys = container # event_system in
              let mplex_eng =
                sock_cfg # multiplexing
                  ~close_inactive_descr:true Rpc.Tcp fd esys in
              let log_crit msg =
                container # log `Crit msg in

              let on_connected mplex =
                let srv =
                  Rpc_server.create2 (`Multiplexer_endpoint mplex) esys in
                live_servers := (srv, fd) :: !live_servers;
                Rpc_server.set_exception_handler srv
                  (fun err ->
                     log_crit
                       ("RPC server caught exception: " ^
                          Netexn.to_string err));
                (* On close: drop this server from the list (compared by
                   physical identity) and schedule [when_done] on the
                   event system with delay 0.0.
                 *)
                let on_close _ =
                  live_servers :=
                    List.filter
                      (fun (other, _) -> other != srv)
                      !live_servers;
                  let g = Unixqueue.new_group esys in
                  Unixqueue.once esys g 0.0 when_done in
                Rpc_server.set_onclose_action srv on_close;
                ( match timeout with
                    | None -> ()
                    | Some t -> Rpc_server.set_timeout srv t
                );
                setup srv user_cfg
              in

              (* NOTE(review): this path only writes a log entry;
                 [when_done] is not invoked here -- confirm that this
                 is intended.
               *)
              let on_failure err =
                log_crit
                  ("Cannot create RPC multiplexer: " ^
                     Netexn.to_string err) in

              Uq_engines.when_state
                ~is_done:on_connected
                ~is_error:on_failure
                mplex_eng

            method supported_ptypes =
              supported_ptypes

          end
        )
    end
  )
;;

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml