(* Plasma GitLab Archive *)
(* Projects Blog Knowledge *)

(* $Id: netplex_container.ml 1459 2010-06-02 13:26:10Z gerd $ *)

open Netplex_types
open Netplex_ctrl_aux
open Printf

(* Debug switch of this module. *)
module Debug = struct
  (* Handed to the Netlog debug helpers below; [std_container#start]
     also reads it and leaves RPC-client debugging switched on only
     when it is [true]. *)
  let enable = ref false
end

(* Debug loggers for this module, both created from [Debug.enable].
   [dlogr] is the one used throughout this file; it is always called
   with a closure that builds the message. *)
let dlog = Netlog.Debug.mk_dlog "Netplex_container" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Netplex_container" Debug.enable

(* Register the debug switch with Netlog under the module name
   "Netplex_container". NOTE(review): presumably this is what lets the
   "netplex.debug.enable"/"netplex.debug.disable" admin messages handled
   further down reach this switch by name -- confirm against Netlog. *)
let () =
  Netlog.Debug.register_module "Netplex_container" Debug.enable

(* Render a poll event as a short upper-case tag for debug output.
   The two message events additionally carry the message name in
   parentheses. *)
let string_of_event ev =
  match ev with
    | `event_none -> "NONE"
    | `event_accept -> "ACCEPT"
    | `event_noaccept -> "NOACCEPT"
    | `event_shutdown -> "SHUTDOWN"
    | `event_system_shutdown -> "SYSTEM_SHUTDOWN"
    | `event_received_message m ->
        sprintf "RECEIVED_MESSAGE(%s)" m.msg_name
    | `event_received_admin_message m ->
        sprintf "RECEIVED_ADMIN_MESSAGE(%s)" m.msg_name

(* Human-readable form of a system id: "thread <n>" or "process <n>". *)
let string_of_sys_id sys_id =
  match sys_id with
    | `Thread n -> sprintf "thread %d" n
    | `Process n -> sprintf "process %d" n


class std_container ?(esys = Unixqueue.create_unix_event_system()) 
                    ptype sockserv =
  let ssn = sockserv # name in
object(self)
  val sys_esys = Unixqueue.create_unix_event_system()
  val mutable rpc = None
  val mutable sys_rpc = None
  val mutable nr_conns = 0
  val mutable engines = []
  val mutable vars = Hashtbl.create 10 

  (* This object, coerced to the [container_id] object type. *)
  method container_id = (self :> container_id)

  (* Name of the socket service; read once from [sockserv # name] when
     the object is created. *)
  method socket_service_name = ssn
  (* The socket service object this container was created for. *)
  method socket_service = sockserv

  (* The event system given as [?esys] (a fresh one by default). The
     control RPC client and all accept engines are attached to it. *)
  method event_system = esys

  (* The parallelization type given at creation time. *)
  method ptype = ptype

  (* Run the container: create the two RPC clients on [fd_clnt]
     (control, on [esys]) and [sys_fd_clnt] (system, on [sys_esys]),
     open the container sockets, call the processor's [post_start_hook],
     start polling and run the event system until it has no more events.
     Afterwards the [pre_finish_hook] is called and the container
     sockets are closed.

     Raises [Failure "#start: already started"] if a control client
     already exists.
   *)
  method start fd_clnt sys_fd_clnt =
    (* note that logging is first possible when sys_rpc is initialized! *)
    if rpc <> None then
      failwith "#start: already started";
    let sys_id = Netplex_cenv.current_sys_id() in
    (* Privileges are only changed for multi-processing containers. *)
    ( match ptype with
	| `Multi_processing ->
	    ( match sockserv # socket_service_config # change_user_to with
		| None -> ()
		| Some(uid,gid) ->
		    (* In Netplex_config it has already been checked whether the
                     * effective uid of the process is root. So the following 
                     * drops all privileges:
		     *)
		    (* The group is changed first, then the user.
		       NOTE(review): only setgid/setuid are called here;
		       the supplementary group list is not touched by
		       this code -- confirm whether it has to be reset
		       as well.
		     *)
		    Unix.setgid gid;
		    Unix.setuid uid
	    )
	| _ -> ()
    );
    (* Note: fd_clnt and sys_fd_clnt are closed by the caller *)
    let rpc_cl =
      Netplex_ctrl_clnt.Control.V1.create_client
      ~esys
      (Rpc_client.Descriptor fd_clnt)
      Rpc.Tcp in
    if not !Debug.enable then
      Rpc_client.Debug.disable_for_client rpc_cl;
    rpc <- Some rpc_cl;
    (* The system client lives on its own event system [sys_esys]. *)
    let sys_rpc_cl =
      Netplex_ctrl_clnt.System.V1.create_client
	~esys:sys_esys
	(Rpc_client.Descriptor sys_fd_clnt)
	Rpc.Tcp in
    if not !Debug.enable then
      Rpc_client.Debug.disable_for_client sys_rpc_cl;
    sys_rpc <- Some sys_rpc_cl;
    self # setup_container_sockets();
    dlogr 
      (fun () -> 
	 sprintf "Container %d: Starting (start) in %s" 
	   (Oo.id self) (string_of_sys_id sys_id));
    self # protect "post_start_hook"
      (sockserv # processor # post_start_hook)
      (self : #container :> container);

    self # setup_polling();
    self # protect_run();
    (* protect_run returns when all events are handled. This must include
       [rpc], so the client must now be down.
     *)
    assert(rpc = None);

    self # protect "pre_finish_hook"
      (sockserv # processor # pre_finish_hook)
      (self : #container :> container);
    self # close_container_sockets();
    (* Redundant when the assertion above holds; presumably kept for
       builds where assertions are compiled out. *)
    rpc <- None;

    dlogr
      (fun () -> sprintf "Container %d: Finishing (finish)" (Oo.id self))


  (* Call the user hook [f arg], never letting an exception escape.
     A failure is logged at [`Crit] level, prefixed with [label], but
     only while the control client [rpc] still exists. Entering and
     leaving the hook is traced on the debug log.
   *)
  method private protect : 's. string -> ('s -> unit) -> 's -> unit =
    fun label f arg ->
      let report_failure exn =
        match rpc with
          | Some _ ->
              self # log `Crit (label ^ ": Exception " ^
                                  Netexn.to_string exn)
          | None ->
              (* no way to report this error *)
              () in
      begin
        try
          dlogr
            (fun () ->
               sprintf "Container %d: calling hook %s" (Oo.id self) label);
          f arg
        with
          | exn -> report_failure exn
      end;
      dlogr
        (fun () ->
           sprintf "Container %d: back from hook %s" (Oo.id self) label)


  (* Run the event system [esys] until it returns. If an exception
     escapes from it, log it and ask the processor's
     [global_exception_handler]; when that returns [true] the event
     system is run again, otherwise this method returns.
   *)
  method private protect_run () =
    let run_again =
      try
        Unixqueue.run esys;
        false
      with
        | exn ->
            self # log `Crit ("run: Exception " ^ Netexn.to_string exn);
            sockserv # processor # global_exception_handler exn in
    if run_again then self # protect_run()


  (* Issue one asynchronous [poll] call on the control client, passing
     the current connection count [nr_conns], and dispatch on the event
     found in the reply. The method re-arms itself after every event
     except [`event_none] and [`event_shutdown], and it also stops when
     the reply is [Rpc_client.Message_lost]. Does nothing when the
     control client is already gone.
   *)
  method private setup_polling() =
    match rpc with
      | None -> ()  (* Already shut down ! *)
      | Some r ->
	  dlogr
	    (fun () -> 
	       sprintf "Container %d: Polling" (Oo.id self));
	  Netplex_ctrl_clnt.Control.V1.poll'async r nr_conns
	    (fun getreply ->
	       (* [continue] tells whether another poll has to be issued *)
	       let continue =
		 ( try
		     let reply = getreply() in
		     dlogr
		       (fun () -> 
			  sprintf "Container %d: Got event from polling: %s"
			    (Oo.id self) (string_of_event reply));
		     ( match reply with
			 | `event_none ->
			     (* When [setup_polling] is called twice, the
                                second poll loop throws out the first poll
                                loop. The first loop gets then [`event_none],
                                and has to terminate. This is used after
                                a connection is done to gain attention from
                                the server.
			      *)
			     false
			 | `event_accept -> 
			     self # enable_accepting();
			     true
			 | `event_noaccept -> 
			     self # disable_accepting();
			     true
			 | `event_received_message msg ->
			     self # protect
			       "receive_message"
			       (sockserv # processor # receive_message
				  (self : #container :> container)
				  msg.msg_name)
			       msg.msg_arguments;
			     true
			 | `event_received_admin_message msg ->
			     self # receive_admin_message msg;
			     true
			 | `event_shutdown ->
			     (* Stop accepting first, then run the user's
			        shutdown hook, then take the client down. *)
			     self # disable_accepting();
			     self # disable_container_sockets();
			     self # protect
			       "shutdown"
			       (sockserv # processor # shutdown)
			       ();
			     ( try
				 Netplex_cenv.cancel_all_timers()
			       with
				   (* can happen in the admin case: *)
				 | Netplex_cenv.Not_in_container_thread -> ()
			     );
			     ( match rpc with
				 | None -> ()
				 | Some r -> 
				     rpc <- None;
				     Rpc_client.trigger_shutdown 
				       r
				       self # shutdown_extra;
				     (* We ensure that shutdown_extra is called
                                        when the client is already taken down
				      *)
			     );
			     false
			 | `event_system_shutdown ->
			     self # protect
			       "system_shutdown"
			       (sockserv # processor # system_shutdown)
			       ();
			     true
		     )
		   with
		     | Rpc_client.Message_lost ->
			 (* This can happen when the container is shut down
                          * and several [poll] calls are active at the same
                          * time.
                          *)
			 false
		     | error ->
			 (* Any other failure is logged and polling goes on *)
			 self # log `Crit ("poll: Exception " ^ 
					    Netexn.to_string error);
			 true
		 ) in
	       if continue then
		 self # setup_polling()
	    )
    
  (* Start one accept engine for every master socket of the service
     and remember the engines in [engines]. Nothing happens when
     engines are already registered.
   *)
  method private enable_accepting() =
    let start_acceptor proto fd =
      dlogr
        (fun () ->
           sprintf "Container %d: Accepting on fd %Ld"
             (Oo.id self)
             (Netsys.int64_of_file_descr fd));
      let acceptor = new Uq_engines.direct_acceptor fd esys in
      let eng = acceptor # accept() in
      let on_accept (fd_slave, _) =
        (* The first engine accepted a connection. It is possible that
         * other descriptors are ready for being accepted at this time.
         * By disabling accepting, we ensure that these engines are
         * aborted and the events are ignored. It is essential that the
         * direct_endpoint_acceptor accepts in one step so intermediate
         * states are impossible.
         *)
        dlogr
          (fun () ->
             sprintf "Container %d: Accepted as fd %Ld"
               (Oo.id self)
               (Netsys.int64_of_file_descr fd_slave));
        self # disable_accepting();
        self # accepted fd_slave proto in
      let on_error err =
        self # log `Crit ("accept: Exception " ^ Netexn.to_string err) in
      Uq_engines.when_state ~is_done:on_accept ~is_error:on_error eng;
      engines <- eng :: engines in
    if engines = [] then
      List.iter
        (fun (proto, fd_array) ->
           Array.iter (start_acceptor proto) fd_array)
        sockserv#sockets

  (* Abort every accept engine started by [enable_accepting] and
     forget them. *)
  method private disable_accepting() =
    dlogr
      (fun () ->
         sprintf "Container %d: No longer accepting" (Oo.id self));
    let abort_engine eng = eng # abort() in
    List.iter abort_engine engines;
    engines <- []

  (* Called for a freshly accepted connection [fd_slave] of protocol
     [proto]. Sends the batched "accepted" call over the control client;
     from its callback the connection is counted and registered, and the
     processor's [process] hook is invoked. Polling is restarted either
     from the [when_done] callback (first invocation only), or right
     after [process] returns if [when_done] has not been called by then.
     The control client must exist (asserted).
   *)
  method private accepted fd_slave proto =
    match rpc with
      | None -> assert false
      | Some client ->
          let serve _ =
            nr_conns <- nr_conns + 1;
            let regid = self # reg_conn fd_slave in
            let finished = ref false in
            dlogr
              (fun () ->
                 sprintf
                   "Container %d: processing connection on fd %Ld (total conns: %d)"
                   (Oo.id self)
                   (Netsys.int64_of_file_descr fd_slave)
                   nr_conns);
            let on_done _ =
              (* Note: It is up to the user to close the descriptor. So
                 the descriptor can already be used for different
                 purposes right now!
               *)
              if not !finished then begin
                nr_conns <- nr_conns - 1;
                self # unreg_conn fd_slave regid;
                finished := true;
                self # setup_polling();
                dlogr
                  (fun () ->
                     sprintf
                       "Container %d: Done with connection on fd %Ld (total conns %d)"
                       (Oo.id self)
                       (Netsys.int64_of_file_descr fd_slave)
                       nr_conns)
              end in
            self # protect
              "process"
              (sockserv # processor # process
                 ~when_done:on_done
                 (self : #container :> container)
                 fd_slave)
              proto;
            if not !finished then
              self # setup_polling() in
          Rpc_client.set_batch_call client;
          Rpc_client.unbound_async_call
            client Netplex_ctrl_aux.program_Control'V1 "accepted" Xdr.XV_void
            serve

  val mutable reg_conns = Hashtbl.create 10
  val mutable reg_conns_cnt = 0

  (* Enter the connection [fd] into [reg_conns] under its descriptor
     number, with an empty detail string, and return the registration
     id assigned to it (a running counter). *)
  method private reg_conn fd =
    let regid = reg_conns_cnt in
    let key = Netsys.int64_of_file_descr fd in
    let descr = Netplex_cenv.report_connection_string fd "" in
    reg_conns_cnt <- regid + 1;
    Hashtbl.replace reg_conns key (regid, descr);
    regid

  (* Remove the entry for [fd] from [reg_conns], but only if it still
     carries the registration id [cnt]. An entry with a different id,
     or no entry at all, is left alone. *)
  method private unreg_conn fd cnt =
    let key = Netsys.int64_of_file_descr fd in
    let registered =
      try Some (fst (Hashtbl.find reg_conns key))
      with Not_found -> None in
    match registered with
      | Some regid ->
          if regid = cnt then Hashtbl.remove reg_conns key
      | None -> ()

  (* Replace the report line of the registered connection [fd] by one
     built from [detail]; the registration id is kept.
     Raises [Failure "#update_detail: unknown descriptor"] if [fd] is
     not registered. *)
  method update_detail fd detail =
    let key = Netsys.int64_of_file_descr fd in
    let descr = Netplex_cenv.report_connection_string fd detail in
    let regid =
      try fst (Hashtbl.find reg_conns key)
      with Not_found -> failwith "#update_detail: unknown descriptor" in
    Hashtbl.replace reg_conns key (regid, descr)

  (* The system RPC client.
     Raises [Failure] when no such client exists. *)
  method system =
    match sys_rpc with
      | Some client -> client
      | None -> failwith "#system: No RPC client available"

  (* Initiate the shutdown of the container: cancel all timers, stop
     accepting on both the service sockets and the container sockets,
     and trigger the shutdown of the control client (if it still
     exists), passing [shutdown_extra] along with it. *)
  method shutdown() =
    dlogr
      (fun () -> sprintf "Container %d: shutdown" (Oo.id self));
    begin
      try
        Netplex_cenv.cancel_all_timers()
      with
          (* can happen in the admin case: *)
        | Netplex_cenv.Not_in_container_thread -> ()
    end;
    self # disable_accepting();
    self # disable_container_sockets();
    ( match rpc with
        | Some client ->
            rpc <- None;
            (* We ensure that shutdown_extra is called when the client
               is already taken down *)
            Rpc_client.trigger_shutdown client (self # shutdown_extra)
        | None -> ()
    )

  (* Hook handed to [Rpc_client.trigger_shutdown] when the control
     client is shut down. It does nothing in this class;
     [admin_container] overrides it. *)
  method private shutdown_extra() = 
    (* to be subclassed *)
    ()

  (* Send a log message with the given [level] to [subchannel] via the
     system RPC client, as a batch call. The message is silently
     dropped when there is no system client. If sending fails, the
     error and the message are written to stderr instead; no exception
     escapes. *)
  method log_subch subchannel level message =
    let rpc_level =
      match level with
        | `Emerg -> log_emerg
        | `Alert -> log_alert
        | `Crit -> log_crit
        | `Err -> log_err
        | `Warning -> log_warning
        | `Notice -> log_notice
        | `Info -> log_info
        | `Debug -> log_debug in
    match sys_rpc with
      | None -> ()
      | Some client ->
          ( try
              Rpc_client.set_batch_call client;
              Netplex_ctrl_clnt.System.V1.log
                client (rpc_level, subchannel, message)
            with
              | exn ->
                  prerr_endline("Netplex Catastrophic Error: Unable to send log message - exception " ^ Netexn.to_string exn);
                  prerr_endline("Log message is: " ^ message)
          )

  (* Log [message] at [level] on the subchannel named by the empty
     string. *)
  method log level message =
    self # log_subch "" level message

  (* Forward a [lookup] request for [(service, protocol)] to the system
     RPC client and return its answer.
     Raises [Failure] when no system client exists. *)
  method lookup service protocol =
    match sys_rpc with
      | Some client ->
          Netplex_ctrl_clnt.System.V1.lookup client (service, protocol)
      | None ->
          failwith "#lookup: No RPC client available"

  (* Send the message [msg_name] with [msg_arguments] to the
     destination pattern [pat] via the system RPC client.
     Raises [Failure] when no system client exists. *)
  method send_message pat msg_name msg_arguments =
    dlogr
      (fun () ->
         sprintf
           "Container %d: send_message %s to %s" (Oo.id self) msg_name pat);
    match sys_rpc with
      | Some client ->
          Netplex_ctrl_clnt.System.V1.send_message
            client
            (pat, { msg_name = msg_name; msg_arguments = msg_arguments })
      | None ->
          failwith "#send_message: No RPC client available"

  (* Value of the container variable [name].
     Raises [Not_found] (from [Hashtbl.find]) if it was never set. *)
  method var name =
    Hashtbl.find vars name

  (* Set the container variable [name] to [value], replacing any
     previous value. *)
  method set_var name value =
    Hashtbl.replace vars name value

  (* Call procedure [proc_name] of plugin [p] through the system RPC
     client. [arg] is XDR-packed with the argument type taken from the
     signature in [p#program]; the reply is unpacked with the result
     type of the same signature and returned.
     Raises [Failure] when no system client exists or when [proc_name]
     is not found in the plugin's program. *)
  method call_plugin p proc_name arg =
    dlogr
      (fun () ->
         sprintf
           "Container %d: call_plugin id=%d procname=%s"
           (Oo.id self) (Oo.id p) proc_name);
    match sys_rpc with
      | None -> failwith "#call_plugin: No RPC client available"
      | Some client ->
          let (_, arg_ty, res_ty) =
            try
              Rpc_program.signature p#program proc_name
            with
              | Not_found -> failwith "call_plugin: procedure not found" in
          let packed_arg = Xdr.pack_xdr_value_as_string arg arg_ty [] in
          let plugin_id = Int64.of_int (Oo.id p) in
          let packed_res =
            Netplex_ctrl_clnt.System.V1.call_plugin
              client
              (plugin_id, proc_name, packed_arg) in
          Xdr.unpack_xdr_value ~fast:true packed_res res_ty []
	    
  (* Handle an admin message. Four names are interpreted here:
     - "netplex.debug.enable" / "netplex.debug.disable": switch Netlog
       debugging for the module named by the single argument; a
       [Failure] (e.g. wrong number of arguments) is logged at [`Err]
     - "netplex.fd_table": log every line of the Netlog descriptor
       table at [`Debug] level
     - "netplex.connections": forward the message to the processor,
       then log the report line of every registered connection
     Every other message is just forwarded to the processor. *)
  method private receive_admin_message msg =
    let switch_debug switch =
      try
        let modname =
          match msg.msg_arguments with
            | [| m |] -> m
            | [| |] -> failwith "Missing argument"
            | _  -> failwith "Too many arguments" in
        switch modname
      with
        | Failure s ->
            self # log `Err (msg.msg_name ^ ": " ^ s) in
    match msg.msg_name with
      | "netplex.debug.enable" ->
          switch_debug Netlog.Debug.enable_module
      | "netplex.debug.disable" ->
          switch_debug Netlog.Debug.disable_module
      | "netplex.fd_table" ->
          let log_line line =
            let id =
              match Netplex_cenv.current_sys_id() with
                | `Thread n | `Process n -> n in
            self # log `Debug (sprintf "fd_table(%5d): %s" id line) in
          List.iter log_line (Netlog.Debug.fd_table())
      | "netplex.connections" ->
          (* First forward the message, so user code can call [update_detail] *)
          self # forward_admin_message msg;
          (* There is a chance that a connection is already closed, and
             even that fd is used for something else. So be very careful
             here: only the stored report line is used.
           *)
          Hashtbl.iter
            (fun _ (_, line) -> self # log `Debug line)
            reg_conns
      | _ ->
          self # forward_admin_message msg

  (* Pass an admin message on to the processor's
     [receive_admin_message] hook, guarded by [protect]. *)
  method private forward_admin_message msg =
    let handler =
      sockserv # processor # receive_admin_message
        (self : #container :> container)
        msg.msg_name in
    self # protect "receive_admin_message" handler msg.msg_arguments


  (* Activate the lever [id] through the system RPC client. The
     argument is sent marshalled, and the reply string is unmarshalled
     and returned.
     Raises [Failure] when no system client exists. *)
  method activate_lever id arg_enc =
    match sys_rpc with
      | Some client ->
          let request = Marshal.to_string arg_enc [] in
          let reply =
            Netplex_ctrl_clnt.System.V1.activate_lever client (id, request) in
          Marshal.from_string reply 0
      | None ->
          failwith "#activate_lever: No RPC client available"

  (* --- container sockets --- *)

  (* Note that the container sockets do not interfere with event polling.
     It is a completely separate execution thread.
   *)

  val mutable cs_engines = []
  val mutable cs_sockets = []

  (* For every [`Container] address of every protocol of the service:
     create the socket directories, create a server socket at the
     computed path, register it with the controller, start accepting on
     it and record it in [cs_sockets]. Addresses of any other kind are
     skipped. *)
  method private setup_container_sockets() =
    let setup_address proto addr =
      match addr with
        | `Container(dir,sname,pname,_) ->
            (* sname, pname: we use this only for registration purposes.
               Otherwise sockserv#name and proto#name are more reliable
             *)
            let (sock_dir, path) =
              Netplex_util.path_of_container_socket
                dir sname pname (Netplex_cenv.current_sys_id()) in
            let () = Netplex_util.try_mkdir dir in
            let () = Netplex_util.try_mkdir sock_dir in
            let pseudo_addr =
              match Sys.os_type with
                | "Win32" -> `W32_pipe_file path
                | _ -> `Socket(Unix.ADDR_UNIX path) in
            let fd =
              Netplex_util.create_server_socket ssn proto pseudo_addr in
            self # register_container_socket sname pname path;
            self # accept_on_container_socket proto fd;
            cs_sockets <- (fd, pname, path) :: cs_sockets
        | _ -> () in
    List.iter
      (fun proto -> Array.iter (setup_address proto) proto#addresses)
      (sockserv # socket_service_config # protocols)

  (* Accept one connection on the container socket [fd] and pass it to
     the processor's [process] hook under the protocol name
     [proto#name]. Once the hook has returned, the method calls itself
     again to wait for the next connection.

     Only the engine that is still waiting is kept in [cs_engines]
     (the list [disable_container_sockets] aborts). An engine that has
     reached its final state is removed again; otherwise the list would
     grow by one dead engine per accepted connection for the whole
     lifetime of the container.
   *)
  method private accept_on_container_socket proto fd =
    dlogr
      (fun () ->
         sprintf "Container %d: Accepting on fd %Ld"
           (Oo.id self)
           (Netsys.int64_of_file_descr fd));
    let acc = new Uq_engines.direct_acceptor fd esys in
    let e = acc # accept() in
    (* Drop [e] (compared by physical identity) from the list of
       abortable engines. *)
    let forget_engine () =
      cs_engines <- List.filter (fun e' -> e' != e) cs_engines in
    (* Register before the callbacks are attached, so that
       [forget_engine] always finds the engine. *)
    cs_engines <- e :: cs_engines;
    Uq_engines.when_state
      ~is_done:(fun (fd_slave,_) ->
                  forget_engine();
                  dlogr
                    (fun () ->
                       sprintf "Container %d: Accepted as fd %Ld"
                         (Oo.id self)
                         (Netsys.int64_of_file_descr fd_slave));
                  let when_done_called = ref false in
                  dlogr
                    (fun () ->
                       sprintf
                         "Container %d: processing container connection \
                          on fd %Ld"
                         (Oo.id self)
                         (Netsys.int64_of_file_descr fd_slave)
                    );
                  self # protect
                    "process"
                    (sockserv # processor # process
                       ~when_done:(fun _ ->
                                     (* only the first call is traced *)
                                     if not !when_done_called then (
                                       when_done_called := true;
                                       dlogr
                                         (fun () ->
                                            sprintf "Container %d: \
                                    Done with container connection on fd %Ld"
                                              (Oo.id self)
                                              (Netsys.int64_of_file_descr
                                                 fd_slave))
                                     )
                                  )
                       (self : #container :> container)
                       fd_slave)
                    proto#name;
                  (* wait for the next connection on the same socket *)
                  self # accept_on_container_socket proto fd
               )
      ~is_error:(fun err ->
                   forget_engine();
                   self # log `Crit
                     ("accept: Exception " ^
                        Netexn.to_string err)
                )
      e


  (* Abort every accept engine of the container sockets and forget
     them. The sockets themselves stay open. *)
  method private disable_container_sockets() =
    dlogr
      (fun () ->
         sprintf "Container %d: No longer accepting cont conns" (Oo.id self));
    let abort_engine eng = eng # abort() in
    List.iter abort_engine cs_engines;
    cs_engines <- []


  (* Close all server sockets recorded in [cs_sockets] and empty the
     list. *)
  method private close_container_sockets() =
    let close_entry (sock, _, _) =
      Netplex_util.close_server_socket sock in
    List.iter close_entry cs_sockets;
    cs_sockets <- []


  (* Announce the container socket at [path] for service [sname] and
     protocol [pname] via the system RPC client.
     Raises [Failure] when no system client exists. *)
  method private register_container_socket sname pname path =
    match sys_rpc with
      | Some client ->
          Netplex_ctrl_clnt.System.V1.register_container_socket
            client (sname, pname, path)
      | None ->
          failwith "#register_container_socket: No RPC client available"

  (* The container sockets opened by this container, as
     [(protocol name, path)] pairs in the order of [cs_sockets]. *)
  method owned_container_sockets =
    let name_and_path (_, pname, path) = (pname, path) in
    List.map name_and_path cs_sockets
    

  (* Forward a [lookup_container_sockets] request for
     [(service, protocol)] to the system RPC client and return its
     answer.
     Raises [Failure] when no system client exists. *)
  method lookup_container_sockets service protocol =
    match sys_rpc with
      | Some client ->
          Netplex_ctrl_clnt.System.V1.lookup_container_sockets
            client (service, protocol)
      | None ->
          failwith "#lookup_container_sockets: No RPC client available"

 
end


(* The admin container is special because the esys is shared with the
   system-wide controller
 *)

class admin_container esys ptype sockserv =
object(self)
  inherit std_container ~esys ptype sockserv as super

  (* The descriptors passed to [start]; they are remembered here so
     that [shutdown_extra] can close them. *)
  val mutable c_fd_clnt = None
  val mutable c_sys_fd_clnt = None

  (* Create both RPC clients (with RPC debugging always disabled),
     remember the two descriptors and start polling. Unlike
     [std_container#start], this method does not run the event system
     and does not call any processor hook.
     Raises [Failure "#start: already started"] if a control client
     already exists. *)
  method start fd_clnt sys_fd_clnt =
    if rpc <> None then
      failwith "#start: already started";
    let control_client =
      Netplex_ctrl_clnt.Control.V1.create_client
        ~esys
        (Rpc_client.Descriptor fd_clnt)
        Rpc.Tcp in
    Rpc_client.Debug.disable_for_client control_client;
    rpc <- Some control_client;
    let system_client =
      Netplex_ctrl_clnt.System.V1.create_client
        ~esys:sys_esys
        (Rpc_client.Descriptor sys_fd_clnt)
        Rpc.Tcp in
    Rpc_client.Debug.disable_for_client system_client;
    sys_rpc <- Some system_client;
    c_fd_clnt <- Some fd_clnt;
    c_sys_fd_clnt <- Some sys_fd_clnt;
    self # setup_polling()

  (* In the admin case, fd_clnt and sys_fd_clnt are never closed by a
     caller. Do this now. (In the non-admin case the caller does this
     after [start] returns.) Each descriptor is closed at most once.
   *)
  method private shutdown_extra() =
    super # shutdown_extra();
    dlogr
      (fun () ->
         sprintf "Container %d: Closing admin clients" (Oo.id self));
    (* Shut the descriptor down (errors of this step are ignored),
       release it in the Netlog descriptor table, and close it. *)
    let close_descriptor fd =
      let style = Netsys.get_fd_style fd in
      ( try Netsys.gshutdown style fd Unix.SHUTDOWN_ALL
        with _ -> ()
      );
      Netlog.Debug.release_fd fd;
      Netsys.gclose style fd in
    ( match c_fd_clnt with
        | Some fd ->
            close_descriptor fd;
            c_fd_clnt <- None
        | None -> ()
    );
    ( match c_sys_fd_clnt with
        | Some fd ->
            close_descriptor fd;
            c_sys_fd_clnt <- None
        | None -> ()
    )
end


(* Create a standard container for [sockserv]. No event system is
   passed, so the container creates its own. *)
let create_container ptype sockserv =
  new std_container ptype sockserv

(* Create an admin container for [sockserv] that runs on the given
   event system [esys]. *)
let create_admin_container esys ptype sockserv =
  new admin_container esys ptype sockserv

(* This web site is published by Informatikbüro Gerd Stolpmann *)
(* Powered by Caml *)