(* $Id: netplex_container.ml 1459 2010-06-02 13:26:10Z gerd $ *)

open Netplex_types
open Netplex_ctrl_aux
open Printf

(* Debug switch of this module. It is registered with [Netlog.Debug] below
   under the name "Netplex_container".
 *)
module Debug = struct
  let enable = ref false
end

let dlog = Netlog.Debug.mk_dlog "Netplex_container" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Netplex_container" Debug.enable

let () =
  Netlog.Debug.register_module "Netplex_container" Debug.enable

(* Printable name of a poll event; only used for debug logging. *)
let string_of_event =
  function
    | `event_none -> "NONE"
    | `event_accept -> "ACCEPT"
    | `event_noaccept -> "NOACCEPT"
    | `event_received_message msg ->
        "RECEIVED_MESSAGE(" ^ msg.msg_name ^ ")"
    | `event_received_admin_message msg ->
        "RECEIVED_ADMIN_MESSAGE(" ^ msg.msg_name ^ ")"
    | `event_shutdown -> "SHUTDOWN"
    | `event_system_shutdown -> "SYSTEM_SHUTDOWN"

(* Printable form of a system id (thread or process number). *)
let string_of_sys_id =
  function
    | `Thread id -> "thread " ^ string_of_int id
    | `Process id -> "process " ^ string_of_int id

(* The standard container.

   It talks to the controller through two RPC clients: the Control client
   [rpc], attached to [esys], and the System client [sys_rpc], attached to
   the private event system [sys_esys]. The Control client is used for
   polling and for announcing accepted connections; the System client is
   used for logging, lookups, messages, plugins and levers.

   [esys] defaults to a freshly created event system. [ptype] and
   [sockserv] are returned unchanged by the methods [ptype] and
   [socket_service].
 *)
class std_container ?(esys = Unixqueue.create_unix_event_system())
                    ptype sockserv =
  let ssn = sockserv # name in
object(self)
  val sys_esys = Unixqueue.create_unix_event_system()
  val mutable rpc = None
  val mutable sys_rpc = None
  val mutable nr_conns = 0          (* connections currently processed *)
  val mutable engines = []          (* active accept engines (main sockets) *)
  val mutable vars = Hashtbl.create 10

  method container_id = (self :> container_id)
  method socket_service_name = ssn
  method socket_service = sockserv
  method event_system = esys
  method ptype = ptype

  (* Runs the container: optionally drops privileges, creates both RPC
     clients on the passed descriptors, sets up the container sockets,
     calls the post_start_hook, polls the controller and runs the event
     system until it has nothing left to do, and finally calls the
     pre_finish_hook and closes the container sockets.

     Fails with [Failure] if the container has already been started.
   *)
  method start fd_clnt sys_fd_clnt =
    (* note that logging is first possible when sys_rpc is initialized! *)
    if rpc <> None then
      failwith "#start: already started";
    let sys_id = Netplex_cenv.current_sys_id() in
    ( match ptype with
        | `Multi_processing ->
            ( match sockserv # socket_service_config # change_user_to with
                | None -> ()
                | Some(uid,gid) ->
                    (* In Netplex_config it has already been checked whether
                     * the effective uid of the process is root. So the
                     * following drops all privileges:
                     *)
                    Unix.setgid gid;
                    Unix.setuid uid
            )
        | _ -> ()
    );
    (* Note: fd_clnt and sys_fd_clnt are closed by the caller *)
    let rpc_cl =
      Netplex_ctrl_clnt.Control.V1.create_client
        ~esys
        (Rpc_client.Descriptor fd_clnt)
        Rpc.Tcp in
    if not !Debug.enable then
      Rpc_client.Debug.disable_for_client rpc_cl;
    rpc <- Some rpc_cl;
    let sys_rpc_cl =
      Netplex_ctrl_clnt.System.V1.create_client
        ~esys:sys_esys
        (Rpc_client.Descriptor sys_fd_clnt)
        Rpc.Tcp in
    if not !Debug.enable then
      Rpc_client.Debug.disable_for_client sys_rpc_cl;
    sys_rpc <- Some sys_rpc_cl;
    self # setup_container_sockets();
    dlogr
      (fun () ->
         sprintf "Container %d: Starting (start) in %s"
           (Oo.id self) (string_of_sys_id sys_id));
    self # protect "post_start_hook"
      (sockserv # processor # post_start_hook)
      (self : #container :> container);
    self # setup_polling();
    self # protect_run();
    (* protect_run returns when all events are handled. This must include
       [rpc], so the client must now be down.
     *)
    assert(rpc = None);
    self # protect "pre_finish_hook"
      (sockserv # processor # pre_finish_hook)
      (self : #container :> container);
    self # close_container_sockets();
    rpc <- None;
    dlogr
      (fun () -> sprintf "Container %d: Finishing (finish)" (Oo.id self))

  (* [protect label f arg] calls [f arg] and catches every exception.
     A caught exception is logged at level Crit, prefixed with [label],
     but only while the Control client [rpc] is still set.
   *)
  method private protect : 's. string -> ('s -> unit) -> 's -> unit =
    fun label f arg ->
      ( try
          dlogr
            (fun () ->
               sprintf "Container %d: calling hook %s" (Oo.id self) label);
          f arg
        with
          | error ->
              ( match rpc with
                  | None -> ()  (* no way to report this error *)
                  | Some _ ->
                      self # log `Crit (label ^ ": Exception " ^
                                          Netexn.to_string error)
              )
      );
      dlogr
        (fun () ->
           sprintf "Container %d: back from hook %s" (Oo.id self) label)

  (* Runs [esys]. An exception escaping from the event loop is logged and
     passed to the global_exception_handler of the processor; the loop is
     restarted if and only if the handler returns [true].
   *)
  method private protect_run () =
    try
      Unixqueue.run esys
    with
      | error ->
          self # log `Crit ("run: Exception " ^ Netexn.to_string error);
          let b = sockserv # processor # global_exception_handler error in
          if b then self # protect_run()

  (* Sends an asynchronous poll call (carrying [nr_conns]) to the
     controller and dispatches on the event in the reply. Polling is
     re-armed after every event except [`event_none], [`event_shutdown]
     and a lost message.
   *)
  method private setup_polling() =
    match rpc with
      | None -> ()  (* Already shut down ! *)
      | Some r ->
          dlogr
            (fun () -> sprintf "Container %d: Polling" (Oo.id self));
          Netplex_ctrl_clnt.Control.V1.poll'async r nr_conns
            (fun getreply ->
               let continue =
                 ( try
                     let reply = getreply() in
                     dlogr
                       (fun () ->
                          sprintf "Container %d: Got event from polling: %s"
                            (Oo.id self) (string_of_event reply));
                     ( match reply with
                         | `event_none ->
                             (* When [setup_polling] is called twice, the
                                second poll loop throws out the first poll
                                loop. The first loop gets then
                                [`event_none], and has to terminate.

                                This is used after a connection is done to
                                gain attention from the server.
                              *)
                             false
                         | `event_accept ->
                             self # enable_accepting();
                             true
                         | `event_noaccept ->
                             self # disable_accepting();
                             true
                         | `event_received_message msg ->
                             self # protect
                               "receive_message"
                               (sockserv # processor # receive_message
                                  (self : #container :> container)
                                  msg.msg_name)
                               msg.msg_arguments;
                             true
                         | `event_received_admin_message msg ->
                             self # receive_admin_message msg;
                             true
                         | `event_shutdown ->
                             self # disable_accepting();
                             self # disable_container_sockets();
                             self # protect
                               "shutdown"
                               (sockserv # processor # shutdown)
                               ();
                             ( try
                                 Netplex_cenv.cancel_all_timers()
                               with
                                   (* can happen in the admin case: *)
                                 | Netplex_cenv.Not_in_container_thread -> ()
                             );
                             ( match rpc with
                                 | None -> ()
                                 | Some r ->
                                     rpc <- None;
                                     Rpc_client.trigger_shutdown
                                       r
                                       self # shutdown_extra;
                                     (* We ensure that shutdown_extra is
                                        called when the client is already
                                        taken down
                                      *)
                             );
                             false
                         | `event_system_shutdown ->
                             self # protect
                               "system_shutdown"
                               (sockserv # processor # system_shutdown)
                               ();
                             true
                     )
                   with
                     | Rpc_client.Message_lost ->
                         (* This can happen when the container is shut down
                          * and several [poll] calls are active at the same
                          * time.
                          *)
                         false
                     | error ->
                         self # log `Crit ("poll: Exception " ^
                                             Netexn.to_string error);
                         true
                 ) in
               if continue then
                 self # setup_polling()
            )

  (* Starts one accept engine per descriptor of [sockserv#sockets], unless
     accept engines are already active.
   *)
  method private enable_accepting() =
    if engines = [] then (
      List.iter
        (fun (proto, fd_array) ->
           Array.iter
             (fun fd ->
                dlogr
                  (fun () ->
                     sprintf "Container %d: Accepting on fd %Ld"
                       (Oo.id self)
                       (Netsys.int64_of_file_descr fd));
                let acc = new Uq_engines.direct_acceptor fd esys in
                let e = acc # accept() in
                Uq_engines.when_state
                  ~is_done:(fun (fd_slave,_) ->
                              (* The first engine accepted a connection. It
                               * is possible that other descriptors are
                               * ready for being accepted at this time. By
                               * disabling accepting, we ensure that these
                               * engines are aborted and the events are
                               * ignored. It is essential that the
                               * direct_endpoint_acceptor accepts in one
                               * step so intermediate states are
                               * impossible.
                               *)
                              dlogr
                                (fun () ->
                                   sprintf "Container %d: Accepted as fd %Ld"
                                     (Oo.id self)
                                     (Netsys.int64_of_file_descr fd_slave));
                              self # disable_accepting();
                              self # accepted fd_slave proto
                           )
                  ~is_error:(fun err ->
                               self # log `Crit
                                 ("accept: Exception " ^
                                    Netexn.to_string err)
                            )
                  e;
                engines <- e :: engines
             )
             fd_array
        )
        sockserv#sockets
    )

  (* Aborts all accept engines of the main sockets. *)
  method private disable_accepting() =
    dlogr
      (fun () ->
         sprintf "Container %d: No longer accepting" (Oo.id self));
    List.iter (fun e -> e # abort()) engines;
    engines <- []

  (* Tells the controller (batch call, no reply awaited) that a connection
     was accepted and then hands [fd_slave] over to the processor.
     Polling is resumed either by [when_done] or, if [when_done] was not
     invoked during [process], right after [process] returns.
   *)
  method private accepted fd_slave proto =
    match rpc with
      | None -> assert false
      | Some r ->
          Rpc_client.set_batch_call r;
          Rpc_client.unbound_async_call
            r Netplex_ctrl_aux.program_Control'V1 "accepted" Xdr.XV_void
            (fun _ ->
               nr_conns <- nr_conns + 1;
               let regid = self # reg_conn fd_slave in
               let when_done_called = ref false in
               dlogr
                 (fun () ->
                    sprintf
                      "Container %d: processing connection on fd %Ld \ (total conns: %d)"
                      (Oo.id self)
                      (Netsys.int64_of_file_descr fd_slave)
                      nr_conns
                 );
               self # protect
                 "process"
                 (sockserv # processor # process
                    ~when_done:(fun fd ->
                                  (* Note: It is up to the user to close
                                     the descriptor. So the descriptor can
                                     already be used for different purposes
                                     right now!
                                   *)
                                  if not !when_done_called then (
                                    nr_conns <- nr_conns - 1;
                                    self # unreg_conn fd_slave regid;
                                    when_done_called := true;
                                    self # setup_polling();
                                    dlogr
                                      (fun () ->
                                         sprintf
                                           "Container %d: \ Done with connection on fd %Ld \ (total conns %d)"
                                           (Oo.id self)
                                           (Netsys.int64_of_file_descr
                                              fd_slave)
                                           nr_conns);
                                  )
                               )
                    (self : #container :> container)
                    fd_slave
                 )
                 proto;
               if not !when_done_called then
                 self # setup_polling();
            )

  (* Registry of the connections currently processed. It maps the numeric
     descriptor to a pair (registration id, report line).
   *)
  val mutable reg_conns = Hashtbl.create 10
  val mutable reg_conns_cnt = 0

  (* Registers [fd] with an empty detail string and returns the fresh
     registration id.
   *)
  method private reg_conn fd =
    let ifd = Netsys.int64_of_file_descr fd in
    let line = Netplex_cenv.report_connection_string fd "" in
    let cnt = reg_conns_cnt in
    reg_conns_cnt <- cnt+1;
    Hashtbl.replace reg_conns ifd (cnt,line);
    cnt

  (* Removes the registration of [fd], but only if it still carries the id
     [cnt], i.e. the descriptor number has not been registered again in
     the meantime.
   *)
  method private unreg_conn fd cnt =
    let ifd = Netsys.int64_of_file_descr fd in
    try
      let cnt',_ = Hashtbl.find reg_conns ifd in
      if cnt' = cnt then (
        Hashtbl.remove reg_conns ifd
      )
    with
      | Not_found -> ()

  (* Replaces the report line of the registered descriptor [fd].
     Fails with [Failure] if [fd] is not registered.
   *)
  method update_detail fd detail =
    let ifd = Netsys.int64_of_file_descr fd in
    let line = Netplex_cenv.report_connection_string fd detail in
    try
      let (cnt,_) = Hashtbl.find reg_conns ifd in
      Hashtbl.replace reg_conns ifd (cnt,line)
    with
      | Not_found ->
          failwith "#update_detail: unknown descriptor"

  (* The System RPC client. Fails with [Failure] before [start]. *)
  method system =
    match sys_rpc with
      | None -> failwith "#system: No RPC client available"
      | Some r -> r

  (* Initiates the shutdown of the container: cancels the timers, stops
     accepting on all sockets and triggers the shutdown of the Control
     client. Unlike the handling of [`event_shutdown], the shutdown hook
     of the processor is not called here.
   *)
  method shutdown() =
    dlogr
      (fun () ->
         sprintf "Container %d: shutdown" (Oo.id self));
    ( try
        Netplex_cenv.cancel_all_timers()
      with
          (* can happen in the admin case: *)
        | Netplex_cenv.Not_in_container_thread -> ()
    );
    self # disable_accepting();
    self # disable_container_sockets();
    ( match rpc with
        | None -> ()
        | Some r ->
            rpc <- None;
            Rpc_client.trigger_shutdown
              r
              self # shutdown_extra;
            (* We ensure that shutdown_extra is called
               when the client is already taken down
             *)
    )

  method private shutdown_extra() =
    (* to be subclassed *)
    ()

  (* Sends a log message over the System client as a batch call. Nothing
     happens when no System client exists. If sending fails, the error
     and the message are written to stderr instead.
   *)
  method log_subch subchannel level message =
    match sys_rpc with
      | None -> ()
      | Some r ->
          ( try
              let lev =
                match level with
                  | `Emerg -> log_emerg
                  | `Alert -> log_alert
                  | `Crit -> log_crit
                  | `Err -> log_err
                  | `Warning -> log_warning
                  | `Notice -> log_notice
                  | `Info -> log_info
                  | `Debug -> log_debug in
              Rpc_client.set_batch_call r;
              Netplex_ctrl_clnt.System.V1.log r (lev,subchannel,message);
            with
              | error ->
                  prerr_endline("Netplex Catastrophic Error: Unable to send log message - exception " ^ Netexn.to_string error);
                  prerr_endline("Log message is: " ^ message)
          )

  (* Same as [log_subch] with the empty subchannel. *)
  method log level message =
    self # log_subch "" level message

  (* Forwards the lookup to the controller via the System client.
     Fails with [Failure] before [start].
   *)
  method lookup service protocol =
    match sys_rpc with
      | None -> failwith "#lookup: No RPC client available"
      | Some r ->
          Netplex_ctrl_clnt.System.V1.lookup r (service,protocol)

  (* Sends the message [msg_name] with [msg_arguments] for the
     destination pattern [pat] via the System client.
     Fails with [Failure] before [start].
   *)
  method send_message pat msg_name msg_arguments =
    dlogr
      (fun () ->
         sprintf "Container %d: send_message %s to %s"
           (Oo.id self) msg_name pat);
    match sys_rpc with
      | None -> failwith "#send_message: No RPC client available"
      | Some r ->
          let msg =
            { msg_name = msg_name;
              msg_arguments = msg_arguments
            } in
          Netplex_ctrl_clnt.System.V1.send_message r (pat, msg)

  (* Container variables. [var] raises [Not_found] for unknown names. *)
  method var name =
    Hashtbl.find vars name

  method set_var name value =
    Hashtbl.replace vars name value

  (* Calls the procedure [proc_name] of the plugin [p]: the argument is
     XDR-encoded according to the signature found in [p#program], sent
     via the System client, and the result is decoded again.
     Fails with [Failure] before [start] or for an unknown procedure.
   *)
  method call_plugin p proc_name arg =
    dlogr
      (fun () ->
         sprintf "Container %d: call_plugin id=%d procname=%s"
           (Oo.id self) (Oo.id p) proc_name);
    match sys_rpc with
      | None -> failwith "#call_plugin: No RPC client available"
      | Some r ->
          let (_, arg_ty,res_ty) =
            try
              Rpc_program.signature p#program proc_name
            with
              | Not_found -> failwith "call_plugin: procedure not found" in
          let arg_str = Xdr.pack_xdr_value_as_string arg arg_ty [] in
          let res_str =
            Netplex_ctrl_clnt.System.V1.call_plugin
              r ((Int64.of_int (Oo.id p)), proc_name, arg_str) in
          let res = Xdr.unpack_xdr_value ~fast:true res_str res_ty [] in
          res

  (* Handles the built-in admin messages and forwards all others (and
     also netplex.connections) to the processor.
   *)
  method private receive_admin_message msg =
    (* Applies [f] to the single message argument. A wrong argument count
       or a [Failure] raised by [f] is logged at level Err, prefixed with
       the message name.
     *)
    let with_module_arg f =
      try
        let m =
          match msg.msg_arguments with
            | [| m |] -> m
            | [| |] -> failwith "Missing argument"
            | _ -> failwith "Too many arguments" in
        f m
      with
        | Failure s ->
            self # log `Err (msg.msg_name ^ ": " ^ s) in
    match msg.msg_name with
      | "netplex.debug.enable" ->
          with_module_arg Netlog.Debug.enable_module
      | "netplex.debug.disable" ->
          with_module_arg Netlog.Debug.disable_module
      | "netplex.fd_table" ->
          ( List.iter
              (fun line ->
                 self # log `Debug
                   (sprintf "fd_table(%5d): %s"
                      ( match Netplex_cenv.current_sys_id() with
                          | `Thread n -> n
                          | `Process n -> n
                      )
                      line)
              )
              (Netlog.Debug.fd_table())
          )
      | "netplex.connections" ->
          (* First forward the message, so user code can call
             [update_detail]
           *)
          self # forward_admin_message msg;
          Hashtbl.iter
            (fun _ (_,line) ->
               (* There is a chance that this connection is already closed,
                  and even that fd is used for something else. So be very
                  careful here.
                *)
               self # log `Debug line
            )
            reg_conns
      | _ ->
          self # forward_admin_message msg

  (* Passes an admin message on to the processor, protected against
     exceptions.
   *)
  method private forward_admin_message msg =
    self # protect
      "receive_admin_message"
      (sockserv # processor # receive_admin_message
         (self : #container :> container)
         msg.msg_name)
      msg.msg_arguments

  (* Activates the lever [id] in the controller. Argument and result are
     transported as marshalled strings, so the result type is not checked.
     Fails with [Failure] before [start].
   *)
  method activate_lever id arg_enc =
    match sys_rpc with
      | None -> failwith "#activate_lever: No RPC client available"
      | Some r ->
          let arg_str = Marshal.to_string arg_enc [] in
          let res_str =
            Netplex_ctrl_clnt.System.V1.activate_lever r (id, arg_str) in
          let res = Marshal.from_string res_str 0 in
          res

  (* --- container sockets --- *)

  (* Note that the container sockets do not interfere with event polling.
     It is a completely separate execution thread.
   *)

  val mutable cs_engines = []   (* pending accept engines *)
  val mutable cs_sockets = []   (* triples (fd, protocol name, path) *)

  (* Creates, registers and starts accepting on a server socket for every
     [`Container] address of every configured protocol.
   *)
  method private setup_container_sockets() =
    let protos = sockserv # socket_service_config # protocols in
    List.iter
      (fun proto ->
         Array.iter
           (function
              | `Container(dir,sname,pname,_) ->
                  (* sname, pname: we use this only for registration
                     purposes. Otherwise sockserv#name and proto#name are
                     more reliable
                   *)
                  let (dir', path) =
                    Netplex_util.path_of_container_socket
                      dir sname pname (Netplex_cenv.current_sys_id()) in
                  let () = Netplex_util.try_mkdir dir in
                  let () = Netplex_util.try_mkdir dir' in
                  let pseudo_addr =
                    match Sys.os_type with
                      | "Win32" ->
                          `W32_pipe_file path
                      | _ ->
                          `Socket(Unix.ADDR_UNIX path) in
                  let fd =
                    Netplex_util.create_server_socket
                      ssn proto pseudo_addr in
                  self # register_container_socket sname pname path;
                  self # accept_on_container_socket proto fd;
                  cs_sockets <- (fd, pname, path) :: cs_sockets
              | _ -> ()
           )
           proto#addresses
      )
      protos

  (* Accepts one connection on the container socket [fd], lets the
     processor handle it, and then accepts again. Container connections
     are not counted in [nr_conns] and not announced to the controller.
   *)
  method private accept_on_container_socket proto fd =
    dlogr
      (fun () ->
         sprintf "Container %d: Accepting on fd %Ld"
           (Oo.id self)
           (Netsys.int64_of_file_descr fd));
    let acc = new Uq_engines.direct_acceptor fd esys in
    let e = acc # accept() in
    (* Every accepted connection creates a new engine. Finished engines
       must be dropped from [cs_engines]; otherwise the list would grow by
       one element per connection for the whole lifetime of the container.
     *)
    let forget_engine () =
      cs_engines <- List.filter (fun e' -> e' != e) cs_engines in
    Uq_engines.when_state
      ~is_done:(fun (fd_slave,_) ->
                  forget_engine();
                  dlogr
                    (fun () ->
                       sprintf "Container %d: Accepted as fd %Ld"
                         (Oo.id self)
                         (Netsys.int64_of_file_descr fd_slave));
                  let when_done_called = ref false in
                  dlogr
                    (fun () ->
                       sprintf
                         "Container %d: processing container connection \ on fd %Ld"
                         (Oo.id self)
                         (Netsys.int64_of_file_descr fd_slave)
                    );
                  self # protect
                    "process"
                    (sockserv # processor # process
                       ~when_done:(fun fd ->
                                     if not !when_done_called then (
                                       when_done_called := true;
                                       dlogr
                                         (fun () ->
                                            sprintf
                                              "Container %d: \ Done with container connection on fd %Ld"
                                              (Oo.id self)
                                              (Netsys.int64_of_file_descr
                                                 fd_slave))
                                     )
                                  )
                       (self : #container :> container)
                       fd_slave)
                    proto#name;
                  (* NOTE(review): accepting is re-armed unconditionally,
                     also when [process] has just disabled the container
                     sockets - confirm whether this case can occur.
                   *)
                  self # accept_on_container_socket proto fd
               )
      ~is_error:(fun err ->
                   forget_engine();
                   self # log `Crit
                     ("accept: Exception " ^ Netexn.to_string err)
                )
      e;
    cs_engines <- e :: cs_engines

  (* Aborts the pending accept engines of the container sockets. The
     sockets themselves remain open.
   *)
  method private disable_container_sockets() =
    dlogr
      (fun () ->
         sprintf "Container %d: No longer accepting cont conns"
           (Oo.id self));
    List.iter (fun e -> e # abort()) cs_engines;
    cs_engines <- []

  (* Closes all container server sockets. *)
  method private close_container_sockets() =
    List.iter
      (fun (fd, _, _) ->
         Netplex_util.close_server_socket fd
      )
      cs_sockets;
    cs_sockets <- []

  (* Announces a container socket to the controller.
     Fails with [Failure] before the System client exists.
   *)
  method private register_container_socket sname pname path =
    match sys_rpc with
      | None ->
          failwith "#register_container_socket: No RPC client available"
      | Some r ->
          Netplex_ctrl_clnt.System.V1.register_container_socket
            r (sname, pname, path)

  (* The container sockets of this container as (protocol name, path). *)
  method owned_container_sockets =
    List.map
      (fun (_, pname, path) -> (pname,path))
      cs_sockets

  (* Asks the controller for the container sockets of [service] and
     [protocol]. Fails with [Failure] before [start].
   *)
  method lookup_container_sockets service protocol =
    match sys_rpc with
      | None ->
          failwith "#lookup_container_sockets: No RPC client available"
      | Some r ->
          Netplex_ctrl_clnt.System.V1.lookup_container_sockets
            r (service,protocol)

end


(* The admin container is special because the esys is shared with the
   system-wide controller
 *)
class admin_container esys ptype sockserv =
object(self)
  inherit std_container ~esys ptype sockserv as super

  val mutable c_fd_clnt = None
  val mutable c_sys_fd_clnt = None

  (* Only creates the two RPC clients (debugging always disabled),
     remembers the descriptors and starts polling. In contrast to the
     inherited method, the event system is not run here and no hooks are
     called.
   *)
  method start fd_clnt sys_fd_clnt =
    if rpc <> None then
      failwith "#start: already started";
    let rpc_cl =
      Netplex_ctrl_clnt.Control.V1.create_client
        ~esys
        (Rpc_client.Descriptor fd_clnt)
        Rpc.Tcp in
    Rpc_client.Debug.disable_for_client rpc_cl;
    rpc <- Some rpc_cl;
    let sys_rpc_cl =
      Netplex_ctrl_clnt.System.V1.create_client
        ~esys:sys_esys
        (Rpc_client.Descriptor sys_fd_clnt)
        Rpc.Tcp in
    Rpc_client.Debug.disable_for_client sys_rpc_cl;
    sys_rpc <- Some sys_rpc_cl;
    c_fd_clnt <- Some fd_clnt;
    c_sys_fd_clnt <- Some sys_fd_clnt;
    self # setup_polling()

  method private shutdown_extra() =
    (* In the admin case, fd_clnt and sys_fd_clnt are never closed.
       Do this now. (In the non-admin case the caller does this after
       [start] returns.)
     *)
    super # shutdown_extra();
    dlogr
      (fun () ->
         sprintf "Container %d: Closing admin clients" (Oo.id self));
    (* Shuts down (best effort, errors are ignored on purpose), releases
       and closes one client descriptor.
     *)
    let close_client_fd fd =
      let fd_style = Netsys.get_fd_style fd in
      ( try
          Netsys.gshutdown fd_style fd Unix.SHUTDOWN_ALL
        with _ -> ()
      );
      Netlog.Debug.release_fd fd;
      Netsys.gclose fd_style fd in
    ( match c_fd_clnt with
        | None -> ()
        | Some fd ->
            close_client_fd fd;
            c_fd_clnt <- None
    );
    ( match c_sys_fd_clnt with
        | None -> ()
        | Some fd ->
            close_client_fd fd;
            c_sys_fd_clnt <- None
    )
end


(* Creates a standard container with its own, new event system. *)
let create_container ptype sockserv =
  new std_container ptype sockserv

(* Creates an admin container running on the passed event system. *)
let create_admin_container esys ptype sockserv =
  new admin_container esys ptype sockserv