Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: helloworld.ml 1652 2011-08-03 21:50:30Z gerd $ *)

(* This example demonstrates some very basic features of Netplex:
    - Starting an "empty service", i.e. one that does not provide
      any network functionality. Nevertheless a service process is
      forked, and all the hook functions are executed.
    - Echo service: Opens a network port, and echos all lines
      sent to it
    - Simple RPC server: Implements the RPC "operation" from operation.x

    All three services are started from a single program.

    Note that there is a fundamental difference between "echo" and "operation":
    As echo is written in synchronous (blocking) style, and we have only
    one process, there can be at most one TCP connection to this service.
    In contrast to this, "operation" can serve many connections in parallel.
    This is a functionality of the RPC layer.

    Test helloworld:
      $ ./helloworld -fg -conf helloworld.cfg

    Then connect to "echo":
      $ netcat localhost 4343

    Connect to "operation":
      $ ./test_client -port 4444 foo

    Use netplex-admin for administration:

      $ ../../src/netplex/netplex-admin -list
      operation: Enabled 1 containers
	  rpc/operation @ inet:0.0.0.0:4444
      echo: Enabled 1 containers
	  echo_proto @ inet:0.0.0.0:4343
      empty: Enabled 1 containers
	  dummy @ -
      netplex.controller: Enabled 1 containers
	  admin @ local:/tmp/.netplex/netplex.controller/admin

      $ ../../src/netplex/netplex-admin -containers
      operation: Enabled 1 containers
	  rpc/operation @ inet:0.0.0.0:4444
	  Process 29390: selected
      echo: Enabled 1 containers
	  echo_proto @ inet:0.0.0.0:4343
	  Process 29389: selected
      empty: Enabled 1 containers
	  dummy @ -
	  Process 29388: selected
      netplex.controller: Enabled 1 containers
	  admin @ local:/tmp/.netplex/netplex.controller/admin
	  AttachedToCtrlProcess 29387: selected

      $ ../../src/netplex/netplex-admin -restart empty
      (and watch the log messages from "empty")

      $ ../../src/netplex/netplex-admin foo bar baz
      (and watch the log messages from all services)

      $ ../../src/netplex/netplex-admin -shutdown
      (and watch the log messages from all services)
 *)

open Printf

(**********************************************************************)

(* hello_hooks:

   Define the processor hooks so that a message is logged
   for each. Normally,� one inherits from 
   Netplex_kit.empty_processor_hooks and defines only the
   hooks that are needed.
 *)

class hello_hooks service_name : Netplex_types.processor_hooks =
object(self)
  inherit Netplex_kit.empty_processor_hooks()

  method post_add_hook _ _ =
    Netlog.logf `Info "%s: post_add_hook" service_name
      
  method post_rm_hook _ _ = 
    Netlog.logf `Info "%s: post_rm_hook" service_name
      
  method pre_start_hook _ _ _ =
    Netlog.logf `Info "%s: pre_start_hook" service_name
      
  method post_start_hook _ =
    Netlog.logf `Info "%s: post_start_hook" service_name
      
  method pre_finish_hook _ =
    Netlog.logf `Info "%s: pre_finish_hook" service_name
      
  method post_finish_hook _ _ _ =
    Netlog.logf `Info "%s: post_finish_hook" service_name
      
  method receive_message _ msg args =
    Netlog.logf `Info "%s: receive_message(\"%s\", [%s])"
      service_name
      (String.escaped msg)
      (String.concat ","
	 (List.map
	    (fun arg -> "\"" ^ String.escaped arg ^ "\"")
	    (Array.to_list args)))
      
  method receive_admin_message _ msg args =
    Netlog.logf `Info "%s: receive_admin_message(\"%s\", [%s])"
      service_name
      (String.escaped msg)
      (String.concat ","
	 (List.map
	    (fun arg -> "\"" ^ String.escaped arg ^ "\"")
	    (Array.to_list args)))
      
  method system_shutdown () =
    Netlog.logf `Info "%s: system_shutdown" service_name
      
  method shutdown() =
    Netlog.logf `Info "%s: shutdown" service_name
      
  method global_exception_handler e =
    Netlog.logf `Info "%s: global_exception_handler(%s)"
      service_name
      (Netexn.to_string e);
    true
end


(**********************************************************************)
(* Empty service                                                      *)
(**********************************************************************)

let empty_service_factory() : Netplex_types.processor_factory =
  ( object
      method name = "empty_service"

      method create_processor ctrl_cfg cf addr =
	( object (self)
	    inherit hello_hooks "empty_service"

	    method supported_ptypes = [ `Multi_processing; `Multi_threading ]

	    (* We don't expect that process is called. *)

	    method process ~when_done cont fd proto =
	      Netlog.logf `Info "empty_service: process(%s)" proto;
	      Unix.close fd;
	      when_done()

	  end
	)
    end
  )

(**********************************************************************)
(* Echo service                                                       *)
(**********************************************************************)

let echo_service_factory() : Netplex_types.processor_factory =
  ( object
      method name = "echo_service"

      method create_processor ctrl_cfg cf addr =
	( object (self)
	    inherit hello_hooks "echo_service"

	    method supported_ptypes = [ `Multi_processing; `Multi_threading ]

	    method process ~when_done cont fd proto =
	      Netlog.logf `Info "echo_service: process(%s)" proto;
	      (* fd is non-blocking, but we want it again blocking: *)
	      Unix.clear_nonblock fd;
	      (* We have to call when_done under all circumstances, so 
                 catch exceptions here
	       *)
	      try
		(* We cannot use here in_channel/out_channel as we have
                   a bidirectional connection. Netchannels has something
                   for us:
		 *)
		let rch = new Netchannels.socket_descr fd in
		(* On top of this, create buffered channels *)
		let ich = 
		  Netchannels.lift_in
		    (`Raw (rch :> Netchannels.raw_in_channel)) in
		let och = 
		  Netchannels.lift_out
		    (`Raw (rch :> Netchannels.raw_out_channel)) in
		( try
		    while true do
		      (* Read a line from ich, echo to och: *)
		      let line = ich # input_line() in
		      och # output_string line;
		      och # output_char '\n';
		      och # flush();
		    done;
		    assert false  (* don't reach this point *)
		  with
		    | End_of_file ->
			(* We finally get End_of_file from input_line *)
			ich # close_in();
			och # close_out()
		    | error ->
			ich # close_in();
			och # close_out();
			raise error
		);
		(* Done with it: *)
		when_done()
	      with
		| error ->
		    (* We have to ensure that when_done is always called,
                       even on error.
		     *)
		    Netlog.logf `Err
		      "Exception while echoing: %s" (Netexn.to_string error);
		    when_done();
		    (* We could raise the exception here again... *)
	  end
	)
    end
  )

(**********************************************************************)
(* "Operation"                                                        *)
(**********************************************************************)

let operation_service_factory() : Netplex_types.processor_factory =
  let proc_operation s =
    let l = String.length s in
    let u = String.create l in
    for k = 0 to l-1 do
      u.[k] <- s.[l-1-k]
    done;
    u in
  let setup srv _ =
    Operation_srv.P.V.bind
    ~proc_null:(fun () -> ())
    ~proc_operation
    srv in
  Rpc_netplex.rpc_factory
    ~configure:(fun _ _ -> ())
    ~name:"operation_service"
    ~setup
    ~hooks:(fun _ -> new hello_hooks "operation_service")
    ()

(**********************************************************************)
(* Main                                                               *)
(**********************************************************************)

let main() =
  let (opt_list, cmdline_cfg) = Netplex_main.args() in

  let use_mt = ref false in
  
  let opt_list' =
    [ "-mt", Arg.Set use_mt,
      "  Use multi-threading instead of multi-processing";
      
      "-debug", Arg.String (fun s -> Netlog.Debug.enable_module s),
      "<module>  Enable debug messages for <module>";

      "-debug-all", Arg.Unit (fun () -> Netlog.Debug.enable_all()),
      "  Enable all debug messages";

      "-debug-list", Arg.Unit (fun () -> 
                                 List.iter print_endline (Netlog.Debug.names());
                                 exit 0),
      "  Show possible modules for -debug, then exit";
   ] @ opt_list in
  
  Arg.parse
    opt_list'
    (fun s -> raise (Arg.Bad ("Don't know what to do with: " ^ s)))
    (sprintf "usage: %s [options]" (Filename.basename Sys.argv.(0)));

  let parallelizer =
    if !use_mt then
      Netplex_mt.mt()     (* multi-threading *)
    else
      Netplex_mp.mp() in  (* multi-processing *)
  
  Netplex_main.startup
    parallelizer
    Netplex_log.logger_factories   (* allow all built-in logging styles *)
    Netplex_workload.workload_manager_factories (* ... all ways of workload management *)
    [ empty_service_factory();
      echo_service_factory();
      operation_service_factory()
    ]
    cmdline_cfg


let () =
  Netsys_signal.init();
  main()

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml