(* $Id: netplex_container.ml 1867 2013-07-21 12:43:13Z gerd $ *)
open Netplex_types
open Netplex_ctrl_aux
open Printf
module Debug = struct
let enable = ref false
end
let dlog = Netlog.Debug.mk_dlog "Netplex_container" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Netplex_container" Debug.enable
let () =
Netlog.Debug.register_module "Netplex_container" Debug.enable
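(* Example (sketch): debug output of this module can be enabled
   programmatically before startup:

     Netlog.Debug.enable_module "Netplex_container"

   or at runtime by sending the admin message "netplex.debug.enable"
   with argument "Netplex_container" (see receive_admin_message below).
 *)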
let string_of_event =
function
| `event_none -> "NONE"
| `event_accept n -> "ACCEPT(" ^ string_of_int n ^ ")"
| `event_noaccept -> "NOACCEPT"
| `event_received_message msg ->
"RECEIVED_MESSAGE(" ^ msg.msg_name ^ ")"
| `event_received_admin_message msg ->
"RECEIVED_ADMIN_MESSAGE(" ^ msg.msg_name ^ ")"
| `event_shutdown -> "SHUTDOWN"
| `event_system_shutdown -> "SYSTEM_SHUTDOWN"
let string_of_sys_id =
function
| `Thread id -> "thread " ^ string_of_int id
| `Process id -> "process " ^ string_of_int id
let t_poll = 0.1
class std_container ?esys ptype sockserv =
let ssn = sockserv # name in
let esys =
match esys with
| Some esys -> esys
| None -> sockserv # processor # container_event_system() in
let sys_esys = Unixqueue.create_unix_event_system() in
let sys_mon = Uq_mt.create_monitor sys_esys in
let cont_thr_id = !Netsys_oothr.provider # self # id in
object(self)
val sys_esys = sys_esys (* for subclasses *)
val mutable rpc = None
val mutable sys_rpc = None
val mutable nr_conns = 0
val mutable nr_conns_total = 0
val mutable engines = []
val mutable vars = Hashtbl.create 10
val vars_mutex = !Netsys_oothr.provider # create_mutex()
val mutable polling = None
val mutable polling_timer = None
val mutable greedy = false
method container_id = (self :> container_id)
method socket_service_name = ssn
method socket_service = sockserv
method event_system = esys
method ptype = ptype
method start fd_clnt sys_fd_clnt =
(* note that logging only becomes possible once sys_rpc is initialized! *)
if rpc <> None then
failwith "#start: already started";
let sys_id = Netplex_cenv.current_sys_id() in
( match ptype with
| `Multi_processing ->
( match sockserv # socket_service_config # change_user_to with
| None -> ()
| Some(uid,gid) ->
(* Netplex_config has already checked that the
 * effective uid of the process is root. So the following
 * calls drop all privileges:
 *)
Unix.setgid gid;
Unix.setuid uid
)
| _ -> ()
);
(* Note: fd_clnt and sys_fd_clnt are closed by the caller *)
let rpc_cl =
Netplex_ctrl_clnt.Control.V1.create_client
~esys
(Rpc_client.Descriptor fd_clnt)
Rpc.Tcp in
if not !Debug.enable then
Rpc_client.Debug.disable_for_client rpc_cl;
rpc <- Some rpc_cl;
let sys_rpc_cl =
Netplex_ctrl_clnt.System.V1.create_client
~esys:sys_esys
(Rpc_client.Descriptor sys_fd_clnt)
Rpc.Tcp in
if not !Debug.enable then
Rpc_client.Debug.disable_for_client sys_rpc_cl;
sys_rpc <- Some sys_rpc_cl;
self # setup_container_sockets();
dlogr
(fun () ->
sprintf "Container %d: Starting (start) in %s"
(Oo.id self) (string_of_sys_id sys_id));
self # protect "post_start_hook"
(sockserv # processor # post_start_hook)
(self : #container :> container);
self # restart_polling();
self # protect_run();
(* protect_run returns when all events are handled. This must include
[rpc], so the client must now be down.
*)
assert(rpc = None);
self # protect "pre_finish_hook"
(sockserv # processor # pre_finish_hook)
(self : #container :> container);
self # close_container_sockets();
rpc <- None;
dlogr
(fun () -> sprintf "Container %d: Finishing (finish)" (Oo.id self))
method private protect : 's. string -> ('s -> unit) -> 's -> unit =
fun label f arg ->
( try
dlogr
(fun () ->
sprintf "Container %d: calling hook %s" (Oo.id self) label);
f arg
with
| error ->
( match rpc with
| None -> () (* no way to report this error *)
| Some _ ->
self # log `Crit (label ^ ": Exception " ^
Netexn.to_string error)
)
);
dlogr
(fun () ->
sprintf "Container %d: back from hook %s" (Oo.id self) label)
method private protect_run () =
try
sockserv # processor # container_run esys
with
| error ->
let bt = Printexc.get_backtrace() in
self # log `Crit ("run: Exception " ^ Netexn.to_string error);
if Printexc.backtrace_status() then
self # log `Crit ("run: Backtrace: " ^ bt);
let b = sockserv # processor # global_exception_handler error in
if b then self # protect_run()
(* The following logic means:
- For synchronous processors, setup_polling is always called after
the connection is processed, i.e. restart_polling will find
polling=None in this case. Because there is no concurrency, the
poll loop cannot have been started before.
- For async processors, setup_polling is re-established after each
received command (e.g. an ACCEPT), and after at most t_poll seconds.
It is NOT immediately called when the processor is done. This
means that the controller sees the decreased nr_conns and fully_busy
params only after a delay of at most t_poll seconds.
*)
method private restart_polling() =
match polling with
| None ->
self # setup_polling()
| Some _ ->
(* Greedy: we suppress this setup_polling, but ensure that there
is a timer.
*)
if not greedy || polling_timer = None then
self # set_polling_timer();
method private set_polling_timer() =
let g = Unixqueue.new_group esys in
Unixqueue.once esys g t_poll
(fun () ->
self # setup_polling();
);
polling_timer <- Some g;
method private reset_polling_timer() =
match polling_timer with
| None -> ()
| Some g ->
Unixqueue.clear esys g;
polling_timer <- None
method private setup_polling() =
self # reset_polling_timer();
match rpc with
| None -> () (* Already shut down ! *)
| Some r ->
let fully_busy = self#conn_limit_reached in
dlogr
(fun () ->
sprintf "Container %d: Polling nr_conns=%d fully_busy=%B"
(Oo.id self) nr_conns fully_busy);
polling <- Some(nr_conns,fully_busy,engines=[]);
Netplex_ctrl_clnt.Control.V1.poll'async r (nr_conns,fully_busy)
(fun getreply ->
polling <- None;
let continue =
( try
let reply = getreply() in
dlogr
(fun () ->
sprintf "Container %d: Got event from polling: %s"
(Oo.id self) (string_of_event reply));
( match reply with
| `event_none ->
(* When [setup_polling] is called twice, the
second poll loop throws out the first poll
loop. The first loop then gets [`event_none]
and has to terminate. This is used after
a connection is done to get the attention
of the controller.
*)
false
| `event_accept n_accept ->
self # enable_accepting n_accept;
true
| `event_noaccept ->
self # disable_accepting();
true
| `event_received_message msg ->
self # protect
"receive_message"
(sockserv # processor # receive_message
(self : #container :> container)
msg.msg_name)
msg.msg_arguments;
true
| `event_received_admin_message msg ->
self # receive_admin_message msg;
true
| `event_shutdown ->
self # disable_accepting();
self # disable_container_sockets();
self # protect
"shutdown"
(sockserv # processor # shutdown)
();
( try
Netplex_cenv.cancel_all_timers()
with
(* can happen in the admin case: *)
| Netplex_cenv.Not_in_container_thread -> ()
);
( match rpc with
| None -> ()
| Some r ->
rpc <- None;
Rpc_client.trigger_shutdown
r
self # shutdown_extra;
(* We ensure that shutdown_extra is called
only after the client has been taken down
*)
);
false
| `event_system_shutdown ->
self # protect
"system_shutdown"
(sockserv # processor # system_shutdown)
();
true
)
with
| Rpc_client.Message_lost ->
(* This can happen when the container is shut down
* and several [poll] calls are active at the same
* time.
*)
false
| error ->
self # log `Crit ("poll: Exception " ^
Netexn.to_string error);
true
) in
if continue then
self # setup_polling()
)
method private enable_accepting n_accept =
if engines = [] then (
List.iter
(fun (proto, fd_array) ->
Array.iter
(fun fd ->
dlogr
(fun () ->
sprintf "Container %d: Accepting on fd %Ld"
(Oo.id self)
(Netsys.int64_of_file_descr fd));
let acc = new Uq_engines.direct_acceptor fd esys in
let e = acc # accept() in
Uq_engines.when_state
~is_done:(fun (fd_slave,_) ->
(* The first engine accepted a connection. It is
* possible that other descriptors are ready
* to be accepted at this time. By disabling
* accepting, we ensure that these engines
* are aborted and their events are ignored.
* It is essential that direct_acceptor
* accepts in one step, so intermediate states
* are impossible.
*)
dlogr
(fun () ->
sprintf "Container %d: Accepted as fd %Ld"
(Oo.id self)
(Netsys.int64_of_file_descr fd_slave));
self # disable_accepting();
self # greedy_accepting
(n_accept - 1) [fd_slave, proto]
)
~is_error:(fun err ->
self # log `Crit
("accept: Exception " ^
Netexn.to_string err)
)
e;
engines <- e :: engines
)
fd_array
)
sockserv#sockets
)
method private disable_accepting() =
dlogr
(fun () ->
sprintf "Container %d: No longer accepting" (Oo.id self));
List.iter (fun e -> e # abort()) engines;
engines <- [];
method private greedy_accepting n_accept accept_list =
let n_accept = ref n_accept in
let accept_list = ref accept_list in
let sockets = sockserv#sockets in
let x_sockets =
List.map
(fun (proto, fd_array) ->
(proto, fd_array, Array.map (fun _ -> true) fd_array)
)
sockets in
( try
let cont = ref true in
if !n_accept > 0 then greedy <- true;
while !cont && !n_accept > 0 do
cont := false;
List.iter
(fun (proto, fd_array, fd_cont) ->
Array.iteri
(fun k fd ->
if fd_cont.(k) then (
dlogr
(fun () ->
sprintf "Container %d: Greedy accepting on fd %Ld"
(Oo.id self)
(Netsys.int64_of_file_descr fd));
try
let fd_slave, _ = Unix.accept fd in
cont := true; (* try for another round *)
accept_list := (fd_slave, proto) :: !accept_list;
Unix.set_nonblock fd_slave;
dlogr
(fun () ->
sprintf "Container %d: Accepted as fd %Ld"
(Oo.id self)
(Netsys.int64_of_file_descr fd_slave));
decr n_accept;
if !n_accept = 0 then raise Exit;
with
| Unix.Unix_error((Unix.EAGAIN|Unix.EWOULDBLOCK|
Unix.EINTR),
_,_) ->
fd_cont.(k) <- false
| Unix.Unix_error _ as error ->
self # log `Crit
("accept: Exception " ^ Netexn.to_string error);
raise Exit
)
)
fd_array
)
x_sockets
done
with
| Exit ->
()
);
match !accept_list with
| [] -> ()
| (fd_slave_last,proto_last) :: l ->
List.iter
(fun (fd_slave, proto) ->
self # accepted_greedy fd_slave proto
)
(List.rev l);
(* The last connection in this round is always processed in
non-greedy style, so the controller gets a notification.
*)
self # accepted_nongreedy fd_slave_last proto_last
method private accepted_nongreedy fd_slave proto =
(* We first respond with the "accepted" message to the controller.
This is especially important for synchronous processors, because
it is the last chance to notify the controller about the state
change in sync contexts.
*)
self # prep_socket fd_slave proto;
match rpc with
| None -> assert false
| Some r ->
(* Resetting polling: from this point on the container counts as
busy, and we want this state to be left ASAP
*)
polling <- None;
Rpc_client.set_batch_call r;
Rpc_client.unbound_async_call
r Netplex_ctrl_aux.program_Control'V1 "accepted" Xdr.XV_void
(fun _ ->
self # process_conn fd_slave proto
)
method private accepted_greedy fd_slave proto =
self # prep_socket fd_slave proto;
self # process_conn fd_slave proto
method private process_conn fd_slave proto =
nr_conns <- nr_conns + 1;
nr_conns_total <- nr_conns_total + 1;
let regid = self # reg_conn fd_slave in
let when_done_called = ref false in
dlogr
(fun () ->
sprintf
"Container %d: processing connection on fd %Ld (total conns: %d)"
(Oo.id self)
(Netsys.int64_of_file_descr fd_slave)
nr_conns
);
self # workload_hook true;
self # protect
"process"
(sockserv # processor # process
~when_done:(fun fd ->
(* Note: it is up to the user to close
the descriptor, so the descriptor may
already have been reused for other
purposes at this point!
*)
if not !when_done_called then (
nr_conns <- nr_conns - 1;
self # unreg_conn fd_slave regid;
when_done_called := true;
self # workload_hook false;
self # restart_polling();
dlogr
(fun () ->
sprintf "Container %d: \
Done with connection on fd %Ld \
(total conns %d)"
(Oo.id self)
(Netsys.int64_of_file_descr fd_slave)
nr_conns);
)
)
(self : #container :> container)
fd_slave
)
proto;
if not !when_done_called then
self # restart_polling();
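(* Example (sketch): a minimal synchronous processor as seen from this
   container. [process] must eventually call [when_done]; until then
   the connection is counted in nr_conns, and the controller learns
   about the freed capacity via the next poll cycle. This assumes the
   Netplex_kit helper classes; [my_processor] is made up:

     class my_processor =
     object
       inherit Netplex_kit.processor_base
                 (new Netplex_kit.empty_processor_hooks())
       method process ~when_done _container fd _proto =
         (* read the request from fd, write the response ... *)
         Unix.close fd;    (* user code must close the descriptor *)
         when_done()
       method supported_ptypes = [ `Multi_processing; `Multi_threading ]
     end
 *)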
method private prep_socket fd_slave proto =
try
let proto_obj =
List.find
(fun proto_obj ->
proto_obj#name = proto
)
sockserv#socket_service_config#protocols in
if proto_obj#tcp_nodelay then
Unix.setsockopt fd_slave Unix.TCP_NODELAY true
with
| Not_found -> ()
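(* Example (sketch): [tcp_nodelay] comes from the protocol section of
   the Netplex config file; service and protocol names are made up:

     service {
       name = "my_service";
       protocol {
         name = "http";
         tcp_nodelay = true;
         address {
           type = "internet";
           bind = "0.0.0.0:8080";
         };
       };
     }
 *)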
val mutable reg_conns = Hashtbl.create 10
val mutable reg_conns_cnt = 0
method private reg_conn fd =
let ifd = Netsys.int64_of_file_descr fd in
let line = Netplex_cenv.report_connection_string fd "" in
let cnt = reg_conns_cnt in
reg_conns_cnt <- cnt+1;
Hashtbl.replace reg_conns ifd (cnt,line);
cnt
method private unreg_conn fd cnt =
let ifd = Netsys.int64_of_file_descr fd in
try
let cnt',_ = Hashtbl.find reg_conns ifd in
if cnt' = cnt then (
Hashtbl.remove reg_conns ifd
)
with
| Not_found -> ()
method n_connections = nr_conns
method n_total = nr_conns_total
method private conn_limit_reached =
match sockserv#socket_service_config#conn_limit with
| None -> false
| Some lim -> nr_conns_total >= lim
val mutable gc_timer = None
method private workload_hook busier_flag =
if busier_flag then (
match gc_timer with
| None -> ()
| Some g -> Unixqueue.clear esys g
);
if self#conn_limit_reached && nr_conns = 0 then
self#shutdown()
else (
self # protect "workload_hook"
(sockserv # processor # workload_hook
(self : #container :> container)
busier_flag
)
nr_conns;
if nr_conns = 0 && sockserv#socket_service_config#gc_when_idle then (
match gc_timer with
| None ->
let g = Unixqueue.new_group esys in
Unixqueue.once esys g 1.0
(fun () -> Gc.full_major());
gc_timer <- Some g
| Some _ -> ()
)
)
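(* Example (sketch): [conn_limit] and [gc_when_idle] correspond to
   per-service parameters of the Netplex config file, e.g.:

     service {
       name = "my_service";
       conn_limit = 1000;
       gc_when_idle = true;
       ...
     }

   Once nr_conns_total reaches the limit the container reports itself
   as fully busy, and [workload_hook] shuts it down as soon as the last
   connection is finished; the workload manager can then start a fresh
   container.
 *)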
method update_detail fd detail =
let ifd = Netsys.int64_of_file_descr fd in
let line = Netplex_cenv.report_connection_string fd detail in
try
let (cnt,_) = Hashtbl.find reg_conns ifd in
Hashtbl.replace reg_conns ifd (cnt,line)
with
| Not_found ->
failwith "#update_detail: unknown descriptor"
method system =
match sys_rpc with
| None -> failwith "#system: No RPC client available"
| Some r -> r
method system_monitor = sys_mon
method shutdown() =
dlogr
(fun () ->
sprintf
"Container %d: shutdown" (Oo.id self));
(* This method can be called from a different thread. In this case,
we have to ensure that the shutdown code runs in the main thread
of the container, and then wait until the shutdown is
complete.
*)
let mt_case =
(cont_thr_id <> !Netsys_oothr.provider # self # id) in
if mt_case then (
let mutex = !Netsys_oothr.provider # create_mutex() in
let cond = !Netsys_oothr.provider # create_condition() in
let g = Unixqueue.new_group esys in
Unixqueue.once esys g 0.0
(fun () ->
self # shutdown_action();
cond # signal()
);
cond # wait mutex
)
else
self # shutdown_action()
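(* Example (sketch): shutting down the current container from processor
   code:

     let cont = Netplex_cenv.self_cont() in
     cont # shutdown()

   Because of the mt_case handling above, [shutdown] may also be invoked
   on a container object from a foreign thread; the call then blocks
   until the shutdown action has been executed in the container thread.
 *)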
method private shutdown_action() =
( try
Netplex_cenv.cancel_all_timers()
with
(* can happen in the admin case: *)
| Netplex_cenv.Not_in_container_thread -> ()
);
self # reset_polling_timer();
self # disable_accepting();
self # disable_container_sockets();
( match rpc with
| None -> ()
| Some r ->
rpc <- None;
Rpc_client.trigger_shutdown
r
self # shutdown_extra;
(* We ensure that shutdown_extra is called
only after the client has been taken down
*)
)
method private shutdown_extra() =
(* to be subclassed *)
()
method log_subch subchannel level message =
match sys_rpc with
| None -> ()
| Some r ->
( try
let lev =
match level with
| `Emerg -> log_emerg
| `Alert -> log_alert
| `Crit -> log_crit
| `Err -> log_err
| `Warning -> log_warning
| `Notice -> log_notice
| `Info -> log_info
| `Debug -> log_debug in
Uq_mt.monitor_async
sys_mon
(fun arg emit ->
Rpc_client.set_batch_call r;
Netplex_ctrl_clnt.System.V1.log'async r arg emit
)
(lev,subchannel,message);
with
| error ->
prerr_endline("Netplex Catastrophic Error: Unable to send log message - exception " ^ Netexn.to_string error);
prerr_endline("Log message is: " ^ message)
)
method log level message =
self # log_subch "" level message
method lookup service protocol =
match sys_rpc with
| None -> failwith "#lookup: No RPC client available"
| Some r ->
Uq_mt.monitor_async
sys_mon
(Netplex_ctrl_clnt.System.V1.lookup'async r)
(service,protocol)
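(* Example (sketch): looking up a socket of another Netplex service;
   the result is the socket path (or a host:port form), if registered:

     match cont # lookup "other_service" "rpc" with
       | Some path -> connect_to path   (* [connect_to] is made up *)
       | None -> failwith "service not found"
 *)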
method send_message pat msg_name msg_arguments =
dlogr
(fun () ->
sprintf
"Container %d: send_message %s to %s" (Oo.id self) msg_name pat);
match sys_rpc with
| None -> failwith "#send_message: No RPC client available"
| Some r ->
let msg =
{ msg_name = msg_name;
msg_arguments = msg_arguments
} in
Uq_mt.monitor_async
sys_mon
(Netplex_ctrl_clnt.System.V1.send_message'async r)
(pat, msg)
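(* Example (sketch): broadcasting a message to all containers whose
   service name matches a pattern; the receivers get it via the
   processor's [receive_message] hook:

     cont # send_message "my_service*" "reload_config" [| |]
 *)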
method var name =
Netsys_oothr.serialize
vars_mutex
(Hashtbl.find vars) name
method set_var name value =
Netsys_oothr.serialize
vars_mutex
(Hashtbl.replace vars name) value
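(* Example (sketch): [var] and [set_var] store values local to this
   container. The typed wrappers in Netplex_cenv are the recommended
   interface; "counter" is a made-up variable name:

     module Int_var =
       Netplex_cenv.Make_var_type(struct type t = int end)

     let bump() =
       let n =
         try Int_var.get "counter"
         with Netplex_cenv.Container_variable_not_found _ -> 0 in
       Int_var.set "counter" (n+1)
 *)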
method call_plugin p proc_name arg =
dlogr
(fun () ->
sprintf
"Container %d: call_plugin id=%d procname=%s"
(Oo.id self) (Oo.id p) proc_name);
match sys_rpc with
| None -> failwith "#call_plugin: No RPC client available"
| Some r ->
let (_, arg_ty,res_ty) =
try
Rpc_program.signature p#program proc_name
with
| Not_found -> failwith "call_plugin: procedure not found" in
let arg_str = Xdr.pack_xdr_value_as_string arg arg_ty [] in
let res_str =
Uq_mt.monitor_async
sys_mon
(Netplex_ctrl_clnt.System.V1.call_plugin'async r)
((Int64.of_int (Oo.id p)), proc_name, arg_str) in
let res = Xdr.unpack_xdr_value ~fast:true res_str res_ty [] in
res
method private receive_admin_message msg =
match msg.msg_name with
| "netplex.debug.enable" ->
( try
let m =
match msg.msg_arguments with
| [| m |] -> m
| [| |] -> failwith "Missing argument"
| _ -> failwith "Too many arguments" in
Netlog.Debug.enable_module m
with
| Failure s ->
self # log
`Err
("netplex.debug.enable: " ^ s)
)
| "netplex.debug.disable" ->
( try
let m =
match msg.msg_arguments with
| [| m |] -> m
| [| |] -> failwith "Missing argument"
| _ -> failwith "Too many arguments" in
Netlog.Debug.disable_module m
with
| Failure s ->
self # log
`Err
("netplex.debug.disable: " ^ s)
)
| "netplex.fd_table" ->
( List.iter
(fun line ->
self # log
`Debug
(sprintf "fd_table(%5d): %s"
( match Netplex_cenv.current_sys_id() with
| `Thread n -> n
| `Process n -> n
)
line)
)
(Netlog.Debug.fd_table())
)
| "netplex.connections" ->
(* First forward the message, so user code can call [update_detail] *)
self # forward_admin_message msg;
Hashtbl.iter
(fun _ (_,line) ->
(* There is a chance that this connection is already closed,
and even that the fd has been reused for something else. So be
very careful here.
*)
self # log `Debug line
)
reg_conns
| "netplex.mem.major" ->
(* FIXME: This style does not make sense for multi-threaded
programs *)
let t0 = Unix.gettimeofday() in
Gc.major();
let t1 = Unix.gettimeofday() in
self # log `Info
(sprintf "Gc.major: pid %d - %f seconds"
(Unix.getpid()) (t1 -. t0))
| "netplex.mem.compact" ->
(* FIXME: This style does not make sense for multi-threaded
programs *)
let t0 = Unix.gettimeofday() in
Gc.compact();
let t1 = Unix.gettimeofday() in
self # log `Info
(sprintf "Gc.compact: pid %d - %f seconds"
(Unix.getpid()) (t1 -. t0))
| "netplex.mem.pools" ->
(* FIXME: This style does not make sense for multi-threaded
programs *)
self # log `Info
(sprintf "Default pool: pid %d - %s"
(Unix.getpid())
(Netsys_mem.pool_report Netsys_mem.default_pool));
self # log `Info
(sprintf "Small pool: pid %d - %s"
(Unix.getpid())
(Netsys_mem.pool_report Netsys_mem.small_pool))
| "netplex.mem.stats" ->
let (name, inch, outch) = Netchannels.make_temporary_file() in
Gc.print_stat outch;
close_out outch;
let n = in_channel_length inch in
let s = String.create n in
really_input inch s 0 n;
close_in inch;
Sys.remove name;
self # log `Info
(sprintf "GC stats pid %d:\n%s"
(Unix.getpid()) s)
| _ ->
self # forward_admin_message msg
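(* Example (sketch): everything not understood above is forwarded to
   the processor, which can implement custom admin messages in its
   [receive_admin_message] hook. The message name "my.dump_state" and
   the helper [dump_state] are made up:

     method receive_admin_message container name args =
       match name with
         | "my.dump_state" -> container # log `Info (dump_state args)
         | _ -> ()
 *)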
method private forward_admin_message msg =
self # protect
"receive_admin_message"
(sockserv # processor # receive_admin_message
(self : #container :> container)
msg.msg_name)
msg.msg_arguments
method activate_lever id arg_enc =
match sys_rpc with
| None -> failwith "#activate_lever: No RPC client available"
| Some r ->
let arg_str = Marshal.to_string arg_enc [] in
let res_str =
Uq_mt.monitor_async
sys_mon
(Netplex_ctrl_clnt.System.V1.activate_lever'async r)
(id, arg_str) in
let res = Marshal.from_string res_str 0 in
res
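(* Example (sketch): levers let a container run a function that was
   registered in the controller's address space. The typed interface is
   the Netplex_cenv.Make_lever functor, e.g.:

     module My_lever =
       Netplex_cenv.Make_lever(struct type s = string type r = bool end)

   The lever is registered in the controller (e.g. from a late startup
   hook); calling the resulting function in a container goes through
   [activate_lever] with Marshal-encoded argument and result, as
   implemented above.
 *)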
(* --- container sockets --- *)
(* Note that the container sockets do not interfere with event polling.
They form a completely separate line of execution.
*)
val mutable cs_engines = []
val mutable cs_sockets = []
method private setup_container_sockets() =
let protos = sockserv # socket_service_config # protocols in
List.iter
(fun proto ->
Array.iter
(function
| `Container(dir,sname,pname,_) ->
(* sname, pname: we use these only for registration
purposes. Otherwise sockserv#name and proto#name are
more reliable.
*)
let (dir', path) =
Netplex_util.path_of_container_socket
dir sname pname (Netplex_cenv.current_sys_id()) in
let () = Netplex_util.try_mkdir dir in
let () = Netplex_util.try_mkdir dir' in
let pseudo_addr =
match Sys.os_type with
| "Win32" ->
`W32_pipe_file path
| _ ->
`Socket(Unix.ADDR_UNIX path) in
let fd =
Netplex_util.create_server_socket
ssn proto pseudo_addr in
self # register_container_socket sname pname path;
self # accept_on_container_socket proto fd;
cs_sockets <- (fd, pname, path) :: cs_sockets
| _ -> ()
)
proto#addresses
)
protos
method private accept_on_container_socket proto fd =
dlogr
(fun () ->
sprintf "Container %d: Accepting on fd %Ld"
(Oo.id self)
(Netsys.int64_of_file_descr fd));
let acc = new Uq_engines.direct_acceptor fd esys in
let e = acc # accept() in
Uq_engines.when_state
~is_done:(fun (fd_slave,_) ->
dlogr
(fun () ->
sprintf "Container %d: Accepted as fd %Ld"
(Oo.id self)
(Netsys.int64_of_file_descr fd_slave));
let when_done_called = ref false in
dlogr
(fun () ->
sprintf
"Container %d: processing container connection \
on fd %Ld"
(Oo.id self)
(Netsys.int64_of_file_descr fd_slave)
);
self # protect
"process"
(sockserv # processor # process
~when_done:(fun fd ->
if not !when_done_called then (
when_done_called := true;
dlogr
(fun () ->
sprintf "Container %d: \
Done with container connection on fd %Ld"
(Oo.id self)
(Netsys.int64_of_file_descr
fd_slave))
)
)
(self : #container :> container)
fd_slave)
proto#name;
self # accept_on_container_socket proto fd
)
~is_error:(fun err ->
self # log `Crit
("accept: Exception " ^
Netexn.to_string err)
)
e;
cs_engines <- e :: cs_engines
method private disable_container_sockets() =
dlogr
(fun () ->
sprintf "Container %d: No longer accepting cont conns" (Oo.id self));
List.iter (fun e -> e # abort()) cs_engines;
cs_engines <- []
method private close_container_sockets() =
List.iter
(fun (fd, _, _) ->
Netplex_util.close_server_socket fd
)
cs_sockets;
cs_sockets <- []
method private register_container_socket sname pname path =
match sys_rpc with
| None -> failwith "#register_container_socket: No RPC client available"
| Some r ->
Uq_mt.monitor_async
sys_mon
(Netplex_ctrl_clnt.System.V1.register_container_socket'async r)
(sname, pname, path)
method owned_container_sockets =
List.map
(fun (_, pname, path) -> (pname,path))
cs_sockets
method lookup_container_sockets service protocol =
match sys_rpc with
| None -> failwith "#lookup_container_sockets: No RPC client available"
| Some r ->
Uq_mt.monitor_async
sys_mon
(Netplex_ctrl_clnt.System.V1.lookup_container_sockets'async r)
(service,protocol)
method startup_directory =
sockserv # startup_directory
end
(* The admin container is special because the esys is shared with the
system-wide controller.
*)
class admin_container esys ptype sockserv =
object(self)
inherit std_container ~esys ptype sockserv as super
val mutable c_fd_clnt = None
val mutable c_sys_fd_clnt = None
method start fd_clnt sys_fd_clnt =
if rpc <> None then
failwith "#start: already started";
let rpc_cl =
Netplex_ctrl_clnt.Control.V1.create_client
~esys
(Rpc_client.Descriptor fd_clnt)
Rpc.Tcp in
Rpc_client.Debug.disable_for_client rpc_cl;
rpc <- Some rpc_cl;
let sys_rpc_cl =
Netplex_ctrl_clnt.System.V1.create_client
~esys:sys_esys
(Rpc_client.Descriptor sys_fd_clnt)
Rpc.Tcp in
Rpc_client.Debug.disable_for_client sys_rpc_cl;
sys_rpc <- Some sys_rpc_cl;
c_fd_clnt <- Some fd_clnt;
c_sys_fd_clnt <- Some sys_fd_clnt;
self # restart_polling();
method private shutdown_extra() =
(* In the admin case, fd_clnt and sys_fd_clnt are never closed.
Do this now. (In the non-admin case the caller does this after
[start] returns.)
*)
super # shutdown_extra();
dlogr
(fun () ->
sprintf "Container %d: Closing admin clients" (Oo.id self));
( match c_fd_clnt with
| None -> ()
| Some fd ->
let fd_style = Netsys.get_fd_style fd in
( try Netsys.gshutdown fd_style fd Unix.SHUTDOWN_ALL
with _ -> ()
);
Netlog.Debug.release_fd fd;
Netsys.gclose fd_style fd;
c_fd_clnt <- None
);
( match c_sys_fd_clnt with
| None -> ()
| Some fd ->
let fd_style = Netsys.get_fd_style fd in
( try Netsys.gshutdown fd_style fd Unix.SHUTDOWN_ALL
with _ -> ()
);
Netlog.Debug.release_fd fd;
Netsys.gclose fd_style fd;
c_sys_fd_clnt <- None
)
end
let create_container ptype sockserv =
new std_container ptype sockserv
let create_admin_container esys ptype sockserv =
new admin_container esys ptype sockserv
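(* Example (sketch): these factory functions are normally called by the
   Netplex framework itself when it forks/spawns a new container, not
   by user code. Roughly:

     let cont = Netplex_container.create_container ptype sockserv in
     cont # start fd_clnt sys_fd_clnt

   where fd_clnt and sys_fd_clnt are the descriptors of the control and
   system RPC connections to the controller.
 *)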