(* $Id: rpc_netplex.ml 2195 2015-01-01 12:23:39Z gerd $ *)
open Printf
(** Debug switch, initially off.
    NOTE(review): nothing in this part of the file reads this flag;
    presumably it is exported for callers or consulted elsewhere - confirm. *)
let debug_rpc_internals : bool ref = ref false

(** Debug switch, initially off.
    NOTE(review): nothing in this part of the file reads this flag;
    presumably it is exported for callers or consulted elsewhere - confirm. *)
let debug_rpc_service : bool ref = ref false
(** [rpc_factory ~configure ?socket_config ?hooks ?supported_ptypes ~name
    ~setup ()] builds a processor factory object for RPC servers.

    The returned object has two methods: [name], which returns [name], and
    [create_processor], which reads the service configuration and returns a
    processor that runs one [Rpc_server] per accepted connection.

    - [configure cf addr] maps the configuration section to a custom
      configuration value; that value is passed to [socket_config],
      [hooks] and [setup].
    - [socket_config] selects the socket configuration used to create the
      connection multiplexer (default: [Rpc_server.default_socket_config]).
    - [hooks] supplies the hooks the processor inherits from (default:
      [Netplex_kit.empty_processor_hooks]).
    - [supported_ptypes] is returned unchanged by the processor method of
      the same name.
    - [setup srv cfg] is called once on a dummy server when the processor
      is created (to find out which programs get bound) and then once for
      every connection server.

    Configuration parameters read by [create_processor], both optional:
    - ["portmapper"] (bool, default [false]): register the bound programs
      with the local portmapper when the socket service is added, and
      unregister them when it is removed.
    - ["timeout"] (float): passed to [Rpc_server.set_timeout] for every
      connection server.

    Raises [Failure] from the [post_add_hook] of the processor if
    portmapper registration is enabled and the service listens on more
    than one TCP port. *)
let rpc_factory
    ~configure
    ?(socket_config = fun _ -> Rpc_server.default_socket_config)
    ?(hooks = fun _ -> new Netplex_kit.empty_processor_hooks())
    ?(supported_ptypes = [ `Multi_processing; `Multi_threading ])
    ~name
    ~setup
    () =

  (* Runs [f] with a fresh local portmapper client and always shuts the
     client down afterwards, also when [f] raises. *)
  let with_local_portmapper f =
    let pmap = Rpc_portmapper.create_local() in
    ( try f pmap
      with err ->
        (* Propagate the original failure rather than a secondary one
           coming from the cleanup. *)
        ( try Rpc_portmapper.shut_down pmap with _ -> () );
        raise err
    );
    Rpc_portmapper.shut_down pmap
  in

  (* Removes the rpcbind entries of all given (program, version) pairs. *)
  let unset_all pmap progs_and_versions =
    List.iter
      (fun (prog_nr, vers_nr) ->
         ignore
           (Rpc_portmapper.unset_rpcbind pmap prog_nr vers_nr "" "" "")
      )
      progs_and_versions
  in

  (* Registers the programs with the local portmapper. The TCP port is
     taken from the Internet sockets of [sockserv] and remembered in
     [port]; sockets of other address families are ignored. Nothing is
     registered when no Internet socket is found. *)
  let pmap_register sockserv progs_and_versions port =
    let need_ipv6 = ref false in
    List.iter
      (fun (_, fds) ->
         Array.iter
           (fun fd ->
              match Unix.getsockname fd with
              | Unix.ADDR_INET(a,p) ->
                  if Netsys.is_ipv6_inet_addr a then need_ipv6 := true;
                  ( match !port with
                    | None ->
                        port := Some p
                    | Some p' ->
                        (* Only a single port is registered per program *)
                        if p <> p' then
                          failwith "Cannot register RPC service in the portmapper when it listens to several ports"
                  )
              | Unix.ADDR_UNIX _ ->
                  ()
           )
           fds
      )
      sockserv#sockets;
    match !port with
    | None -> ()
    | Some p ->
        with_local_portmapper
          (fun pmap ->
             (* Drop old entries before adding the new ones *)
             unset_all pmap progs_and_versions;
             (* The wildcard addresses are registered, not the addresses
                the sockets are actually bound to. The IPv6 wildcard is
                only added if some socket has an IPv6 address. *)
             let addrs =
               if !need_ipv6 then
                 [ Unix.inet6_addr_any; Unix.inet_addr_any ]
               else
                 [ Unix.inet_addr_any ] in
             let owner = string_of_int (Unix.getuid()) in
             List.iter
               (fun (prog_nr, vers_nr) ->
                  List.iter
                    (fun addr ->
                       let netid = Rpc.netid_of_inet_addr addr Rpc.Tcp in
                       let uaddr = Rpc.create_inet_uaddr addr p in
                       ignore
                         (Rpc_portmapper.set_rpcbind
                            pmap prog_nr vers_nr netid uaddr owner)
                    )
                    addrs
               )
               progs_and_versions
          )
  in

  (* Removes the programs from the local portmapper, but only if
     [pmap_register] has recorded a port before. *)
  let pmap_unregister _sockserv progs_and_versions port =
    match !port with
    | None -> ()
    | Some _ ->
        with_local_portmapper
          (fun pmap -> unset_all pmap progs_and_versions)
  in

  ( object
      method name = name

      method create_processor _ctrl_cfg cf addr =
        let use_portmapper =
          try
            cf # bool_param (cf # resolve_parameter addr "portmapper")
          with Not_found -> false in
        let timeout_opt =
          try
            Some(cf # float_param (cf # resolve_parameter addr "timeout"))
          with Not_found -> None in
        let custom_cfg = configure cf addr in
        let sconf = socket_config custom_cfg in

        (* Find out the bindings by running [setup] on a fake server that
           lives on its own event system: *)
        let progs_and_versions =
          let esys = Unixqueue.create_unix_event_system () in
          let srv = Rpc_server.create2 (`Dummy Rpc.Tcp) esys in
          setup srv custom_cfg;
          List.map
            (fun prog ->
               (Rpc_program.program_number prog,
                Rpc_program.version_number prog)
            )
            (Rpc_server.bound_programs srv)
        in

        (* Port registered with the portmapper, if any *)
        let port = ref None in
        (* Connection servers that are currently alive, with their fds *)
        let srv_list = ref [] in

        ( object
            inherit Netplex_kit.processor_base (hooks custom_cfg) as super

            method post_add_hook sockserv =
              if use_portmapper then
                pmap_register sockserv progs_and_versions port;
              super # post_add_hook sockserv

            method post_rm_hook sockserv =
              if use_portmapper then
                pmap_unregister sockserv progs_and_versions port;
              super # post_rm_hook sockserv

            method receive_admin_message cnt name args =
              match name with
              | "netplex.connections" ->
                  (* Intercept this one: attach the last procedure info of
                     every live server to its connection. The message is
                     not passed on to [super]. *)
                  List.iter
                    (fun (srv,fd) ->
                       cnt # update_detail fd
                         ("Last action: " ^
                            (Rpc_server.get_last_proc_info srv))
                    )
                    !srv_list
              | _ ->
                  super # receive_admin_message cnt name args

            method shutdown () =
              List.iter
                (fun (srv,_) ->
                   Rpc_server.stop_server ~graceful:true srv)
                !srv_list;
              srv_list := [];
              super # shutdown()

            method process ~when_done container fd _proto =
              (* We track here fd - it is released and closed by mplex_eng
                 because of close_inactive_descr:true
               *)
              Netlog.Debug.track_fd
                ~owner:"Rpc_netplex"
                ~descr:(sprintf "RPC connection %s"
                          (Netsys.string_of_fd fd))
                fd;
              let esys = container # event_system in
              (* Reports the end of the connection to the container. The
                 call is deferred with a zero-delay timer so that it runs
                 from the event loop, after the current callback. *)
              let finish () =
                let g = Unixqueue.new_group esys in
                Unixqueue.once esys g 0.0 when_done in
              let mplex_eng =
                sconf # multiplexing
                  ~close_inactive_descr:true Rpc.Tcp fd esys in
              Uq_engines.when_state
                ~is_done:(fun mplex ->
                            let srv =
                              Rpc_server.create2
                                (`Multiplexer_endpoint mplex) esys in
                            srv_list := (srv,fd) :: !srv_list;
                            Rpc_server.set_exception_handler srv
                              (fun err bt ->
                                 container # log
                                   `Crit
                                   ("RPC server caught exception: " ^
                                      Netexn.to_string err);
                                 container # log
                                   `Crit
                                   ("Backtrace: " ^ bt)
                              );
                            Rpc_server.set_onclose_action
                              srv
                              (fun _ ->
                                 (* Servers are compared by physical
                                    identity *)
                                 srv_list :=
                                   List.filter
                                     (fun (srv',_) -> srv' != srv)
                                     !srv_list;
                                 finish ()
                              );
                            ( match timeout_opt with
                              | Some t ->
                                  Rpc_server.set_timeout srv t
                              | None ->
                                  ()
                            );
                            setup srv custom_cfg
                         )
                ~is_error:(fun err ->
                             container # log `Crit
                               ("Cannot create RPC multiplexer: " ^
                                  Netexn.to_string err);
                             (* No server exists for this connection, so
                                the close action above will never run.
                                The container must still learn that the
                                connection is finished.
                                NOTE(review): fd is not closed here, as it
                                is not visible in this file whether the
                                failed engine already closed it - confirm. *)
                             finish ()
                          )
                mplex_eng

            method supported_ptypes =
              supported_ptypes
          end
        )
    end
  )
;;