Netplex_advanced



Advanced features of Netplex

Some information about advanced techniques.


Running timers in containers

With Netplex_cenv.create_timer one can start a timer that runs directly in the container event loop. This event loop is normally used for accepting new connections and for exchanging control messages with the master process. If the processor supports it (like the RPC processor), the event loop is also used by the processor itself for protocol interpretation. Running a timer in this loop means that the expiration of the timer is only detected when the control flow of the container returns to the event loop. In the worst case, this happens only after the current connection is finished and the container waits for the next connection.

So, this is not the kind of high-precision timer one would use for the exact control of latencies. However, these timers are still useful for things that run only infrequently, like

  • processing statistical information
  • checking whether configuration updates have arrived
  • checking whether resources have "timed out" and can be released (e.g. whether a connection to a database system can be closed)
Timers can be cancelled by calling Netplex_cenv.cancel_timer. Timers are automatically cancelled at container shutdown time.
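As an illustration of the third use case in the list above, here is a minimal sketch of the kind of work a timer callback might do: releasing resources that have been idle for too long. It uses only the standard library. The table layout (resource name mapped to the time of last use) and the function name sweep_idle are assumptions made for this example and are not part of Netplex.

```ocaml
(* Remove every entry of [tbl] that has been idle for more than
   [max_idle] seconds at time [now]. The table maps a resource name
   to the timestamp of its last use (hypothetical bookkeeping).
   Returns the sorted names of the released resources. *)
let sweep_idle ~now ~max_idle (tbl : (string, float) Hashtbl.t) =
  let expired =
    Hashtbl.fold
      (fun name last_used acc ->
         if now -. last_used > max_idle then name :: acc else acc)
      tbl [] in
  (* Remove after folding, so the table is not modified while iterating *)
  List.iter (Hashtbl.remove tbl) expired;
  List.sort compare expired

let () =
  let tbl = Hashtbl.create 7 in
  Hashtbl.replace tbl "db1" 0.0;     (* last used at t = 0 s *)
  Hashtbl.replace tbl "db2" 95.0;    (* last used at t = 95 s *)
  let released = sweep_idle ~now:100.0 ~max_idle:60.0 tbl in
  List.iter print_endline released   (* prints "db1" *)
```

In a real container, a function like this would be called from the timer callback with the current time, and the actual release (e.g. closing a database connection) would happen where the entry is removed.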

Example: Start a timer at container startup: We have to do this in the post_start_hook of the processor. It depends on the kind of processor how the hooks are set. For example, the processor factories Rpc_netplex.rpc_factory and Nethttpd_plex.nethttpd_factory have an argument hooks, and one can create it like:

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method post_start_hook cont =
          let timer =
            Netplex_cenv.create_timer
              (fun timer -> ...)
              tmo in
          ...
      end
    )

Container variables

If multi-processing is used, one can simply store per-container values in global variables. This works because for every new container the whole program is forked, and thus a new instance of the variable is created.

For multi-threaded programs this is a lot more difficult. For this reason there is built-in support for per-container variables.

Example: We want to keep statistics on how often the functions foo and bar are called, per container. We define a record

type stats =
  { mutable foo_count : int;
    mutable bar_count : int
  }

Furthermore, we need an access module that looks for the current value of the variable (get), or overwrites the value (set). We can simply create this module by using the functor Netplex_cenv.Make_var_type:

module Stats_var =
  Netplex_cenv.Make_var_type(struct type t = stats end)

Now, one can get the value of a stats-typed variable "count" by calling

let stats =
  Stats_var.get "count"

(which will raise Netplex_cenv.Container_variable_not_found if the value of "count" has never been set before), and one can set the value by

Stats_var.set "count" stats

As mentioned, the variable "count" exists once per container. One can access it only from the scope of a Netplex container (e.g. from a callback function that is invoked by a Netplex processor). It is a good idea to initialize "count" in the post_start_hook of the processor (see the timer example above).

See also below on "Storing global state" for another kind of variable that can be accessed from all containers.

Sending messages to individual containers

Sometimes it is useful when a container can directly communicate with another container, and the latter can be addressed by a unique name within the Netplex system. A normal Netplex socket is not useful here because Netplex determines which container will accept new connections on the socket, i.e. from the perspective of the message sender it is random which container receives the message.

In Ocamlnet 3, a special kind of socket, called a "container socket", has been added to solve this problem. This type of socket is not created by the master process, but by the container process (hence the name). The socket is a Unix Domain socket for Unix, and a named pipe for Win32. It has a unique name, and if the message sender knows the name, it can send the message to a specific container.

One creates such sockets by adding an address section to the config file that looks like

  address {
    type = "container"
  }

If this address section is simply added to an existing protocol section, the network protocol of the container socket is the same as that of the main socket of the container. If a different network protocol is going to be used for the container socket, one can also add a second protocol section. For example, here is a main HTTP service, and a separate service control that is run over the container sockets:

  service {
    name = "sample"
    protocol {
      name = "http"
      address {
        type = "internet"
        bind = "0.0.0.0:80"
      }
    }
    protocol {
      name = "control"
      address {
        type = "container"
      }
    }
    processor { 
      type = "myproc";
      http { ... webserver config ... }
      control { ... rpc config ... }
    }
  }

One can now employ Netplex_kit.protocol_switch_factory to route incoming TCP connections arriving at "http" sockets to web server code, and to route incoming connections arriving at "control" sockets to, e.g., an RPC server:

  let compound_factory =
    new Netplex_kit.protocol_switch_factory
      "myproc"
      [ "http", Nethttpd_plex.nethttpd_factory ...;
        "control", Rpc_netplex.rpc_factory ...;
      ]

The implementation of "control" would be a normal RPC server.

The remaining question is how to get the unique names of the container sockets. The function Netplex_cenv.lookup_container_sockets helps here. It is called with the service name and the protocol name as arguments:

  let cs_paths =
    Netplex_cenv.lookup_container_sockets "sample" "control"

It returns an array of Unix Domain paths, each corresponding to the container socket of one container. It is recommended to use Netplex_sockserv.any_file_client_connector for creating RPC clients:

  let clients =
    Array.map       (* cs_paths is an array, so we map with Array.map *)
      (fun cs_path ->
        let connector = Netplex_sockserv.any_file_client_connector cs_path in
        create_client ... connector ...
      )
      cs_paths

There is no way to get more information about the cs_paths, e.g. in order to find a special container. (Of course, except by calling RPC functions and asking the containers directly.)

A container can also find out the address of its own container socket. Use the method owned_container_sockets to get a list of pairs (protocol_name, path), e.g.

  let cont = Netplex_cenv.self_cont() in
  let path = List.assoc "control" cont#owned_container_sockets

One-time initialization code

It is sometimes necessary to run some initialization code only once for all containers of a certain service. Of course, there is always the option of doing this at program startup. However, this might be too early, e.g. because some information is not yet known.

Another option is to do such initialization in the pre_start_hook of the container. The pre_start_hook is run before the container process is forked off, and executes in the master process. Because of this it is easy to have a global variable that checks whether pre_start_hook is called the first time:

  let first_time = ref true

  let pre_start_hook _ _ _ =
    if !first_time then (* do initialization *) ... ;
    first_time := false

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method pre_start_hook socksrv ctrl cid =
          pre_start_hook socksrv ctrl cid
      end
    )

Last but not least there is also the possibility to run such initialization code in the post_start_hook. This is different as this hook is called from the container, i.e. from the forked-off child process. This might be convenient if the initialization routine is written for container context.

There is some additional complexity, though. One can no longer simply use a global variable to catch the first time post_start_hook is called. Instead, one has to use a storage medium that is shared by all containers, and that is accessible from all containers. There are plenty of possibilities, e.g. a file. In this example, however, we use a Netplex semaphore:

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        method post_add_hook socksrv ctrl =
          ctrl # add_plugin Netplex_semaphore.plugin

        method post_start_hook cont =
          let first_time =
            Netplex_semaphore.create "myinit" 0L in
          if first_time then (* do initialization *) ... ;
      end
    )

The semaphore is visible in the whole Netplex system. We use here the fact that Netplex_semaphore.create returns true when the semaphore is created at the first call of create. The semaphore is then never increased or decreased.
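To make the "first creator wins" logic explicit, here is a small in-process model using only the standard library. The registry table and the local create function are stand-ins invented for this sketch; they merely mimic the behaviour described above (create returns true only for the call that actually creates the semaphore) and are not Netplex_semaphore itself.

```ocaml
(* In-process stand-in for the system-wide set of named semaphores.
   This is a model for illustration only. *)
let registry : (string, int64) Hashtbl.t = Hashtbl.create 7

(* Mimics the described behaviour: returns true only for the call
   that actually creates the entry, false for all later calls. *)
let create name initial =
  if Hashtbl.mem registry name then false
  else (Hashtbl.add registry name initial; true)

(* Counts how often the initialization body was executed *)
let init_runs = ref 0

(* What each container would do in its post_start_hook *)
let container_start () =
  if create "myinit" 0L then incr init_runs

let () =
  (* Simulate three containers starting up one after the other *)
  for _ = 1 to 3 do container_start () done;
  Printf.printf "initialization ran %d time(s)\n" !init_runs
```

Only the first simulated container sees true and runs the initialization; the others find the name already taken and skip it.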

Storing global state

Sometimes global state is unavoidable. We mean here state variables that are accessed by all processes of the Netplex system.

Since Ocamlnet 3 there is Netplex_sharedvar. This module provides Netplex-global string variables that are identified by a user-chosen name.

For example, to make a variable of type stats globally accessible

type stats =
  { mutable foo_count : int;
    mutable bar_count : int
  }

(see also above, "Container variables"), we can accomplish this as follows.

module Stats_var =
  Netplex_sharedvar.Make_var_type(struct type t = stats end)

Now, this defines functions Stats_var.get and Stats_var.set to get and set the value, respectively. Note that this is type-safe although Netplex_sharedvar.Make_var_type uses the Marshal module internally. If a get/set function is applied to a variable of the wrong type we will get the exception Netplex_sharedvar.Sharedvar_type_mismatch.
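The following sketch illustrates the general idea of storing a typed value in a string variable via the Marshal module. It is a simplified model using only the standard library, not the actual implementation of Netplex_sharedvar.Make_var_type: the store table and the functions set_stats and get_stats are assumptions for this example, and the sketch omits the type check that produces Sharedvar_type_mismatch.

```ocaml
type stats =
  { mutable foo_count : int;
    mutable bar_count : int
  }

(* Stand-in for the global string variables (name -> encoded string) *)
let store : (string, string) Hashtbl.t = Hashtbl.create 7

(* Encode the value as a string and store it under [name] *)
let set_stats name (v : stats) =
  Hashtbl.replace store name (Marshal.to_string v [])

(* Fetch the string and decode it. The type annotation plays the role
   that the functor-generated accessor plays in the real module. *)
let get_stats name : stats =
  Marshal.from_string (Hashtbl.find store name) 0

let () =
  set_stats "mystats" { foo_count = 1; bar_count = 2 };
  let v = get_stats "mystats" in
  v.foo_count <- v.foo_count + 1;      (* changes only the local copy *)
  let stored = get_stats "mystats" in
  Printf.printf "%d %d\n" v.foo_count stored.foo_count   (* prints "2 1" *)
```

The point to take away is that get returns a decoded copy. Mutating the record has no effect on the stored value until it is written back with set.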

Before one can get/set values, one has to create the variable with

let ok =
  Netplex_sharedvar.create_var ~enc:true name

The parameter enc:true is required for variables accessed via Netplex_sharedvar.Make_var_type.

In order to use Netplex_sharedvar we have to add this plugin:

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        method post_add_hook socksrv ctrl =
          ctrl # add_plugin Netplex_sharedvar.plugin
      end
    )

Now, imagine that we want to increase the counters in a stats variable. As we have now truly parallel accesses, we have to ensure that these accesses do not overlap. We use a Netplex mutex to ensure this like in:

  let mutex = Netplex_mutex.access "mymutex" in
   Netplex_mutex.lock mutex;
   try 
     let v = Stats_var.get "mystats" in
     v.foo_count <- v.foo_count + foo_delta;
     v.bar_count <- v.bar_count + bar_delta;
     Stats_var.set "mystats" v;
     Netplex_mutex.unlock mutex;
   with
     error -> Netplex_mutex.unlock mutex; raise error

As Netplex mutexes are also plugins, we have to add them in the post_add_hook, too. Also see Netplex_mutex for more information.
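The lock/try/unlock pattern can be factored into a small helper so that the unlock call is written only once. The sketch below is generic and uses only the standard library (Fun.protect requires OCaml 4.08 or later); the name with_lock and its labelled arguments are assumptions made for this example, not a Netplex function.

```ocaml
(* [with_lock ~lock ~unlock f] calls [lock ()], runs [f ()], and then
   calls [unlock ()] exactly once, whether [f] returns or raises. *)
let with_lock ~lock ~unlock f =
  lock ();
  Fun.protect ~finally:unlock f

let () =
  (* A boolean flag stands in for a real mutex in this demonstration *)
  let held = ref false in
  let lock () = held := true in
  let unlock () = held := false in
  (match with_lock ~lock ~unlock (fun () -> failwith "boom") with
   | () -> ()
   | exception Failure _ -> ());
  Printf.printf "lock held after failure: %b\n" !held
```

With a Netplex mutex, one would presumably pass (fun () -> Netplex_mutex.lock mutex) and (fun () -> Netplex_mutex.unlock mutex) as the lock and unlock arguments, and put the get/modify/set sequence into the function body.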

Generally, shared variables should not be used to store large quantities of data. A few megabytes are probably ok. The reason is that these variables exist in the Netplex master process, and each time a child is forked off the variables are also copied although this is not necessary. (It is possible and likely that a future version of Ocamlnet improves this.)

For bigger amounts of data, it is advised to store them in an external file, a shared memory segment (Netshm might help here), or even in a database system. Shared variables should then only be used to pass around the name of this file/segment/database.

Hooks, and how to pass values down

Usually, the user configures processor factories by creating hook objects. We have shown this already several times in previous sections of this chapter. Sometimes the question arises how to pass values from one hook to another.

The hooks are called in a certain order. Unfortunately, there is no easy way to pass values from one hook to another. As a workaround, it is suggested to store the values in the hooks object.

For example, suppose we need to allocate a database ID for each container. We do this in the pre_start_hook, so we know the ID early. Of course, the code started from the post_start_hook also needs the ID, and in the post_finish_hook we would like to delete everything in the database referenced by this ID.

This could be done in a hook object like

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        val db_id_tbl = Hashtbl.create 11

        method pre_start_hook _ _ cid =
          let db_id = allocate_db_id() in       (* create db ID *)
          Hashtbl.add db_id_tbl cid db_id       (* remember it for later *)

        method post_start_hook cont =
          let cid = cont # container_id in           (* the container ID *)
          let db_id = Hashtbl.find db_id_tbl cid in  (* look up the db ID *)
          ...

        method post_finish_hook _ _ cid =
          let db_id = Hashtbl.find db_id_tbl cid in  (* look up the db ID *)
          delete_db_id db_id;                        (* clean up db *)
          Hashtbl.remove db_id_tbl cid
      end
    )

We use here the container ID to identify the container. This works in all used hooks - either the container ID is passed directly, or we can get it from the container object itself.

Normally there is only one controller per program. It is imaginable that a multi-threaded program has several controllers, though. In this case one has to be careful with this technique, because it should be avoided that values from the Netplex system driven by one controller are visible in the system driven by the other controller. Often, this can be easily achieved by creating separate hook objects, one per controller.

Levers - calling controller functions from containers

In a multi-process setup, the controller runs in the master process, and the containers run in child processes. Because of this, container code cannot directly invoke functions of the controller.

For multi-threaded programs, this is quite easy to solve. With the function Netplex_cenv.run_in_controller_context one can temporarily switch to the controller thread and run code there.

For example, to start a helper container one can do

  Netplex_cenv.run_in_controller_context ctrl
    (fun () ->
       Netplex_kit.add_helper_service ctrl "helper1" hooks
    )

which starts a new container with an empty processor that only consists of the hooks object. The post_start_hook can be considered as the "body" of the new thread. The advantage of this is (compared to Thread.start) that this thread counts as a regular container, and can e.g. use logging functions.

There is no such easy way in the multi-processing case. As a workaround, a special mechanism has been added to Netplex, the so-called levers. Levers are registered functions that are known to the controller and that can be invoked from container context. Levers have an argument and can deliver a result. The types of argument and result can be arbitrary (but must be monomorphic, and must not contain functions). (The name "lever" was chosen because it is reminiscent of additional operating handles, as we add such handles to the controller.)
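The restriction that lever arguments and results must not contain functions is typical for values that have to be marshalled between processes. The following sketch shows this with the standard Marshal module and its default flags. That levers rely on exactly this module is an assumption here, and the helper name marshallable is invented for the example.

```ocaml
(* Returns true if [v] can be marshalled with the default flags,
   and false if Marshal rejects it (as it does for closures). *)
let marshallable v =
  match Marshal.to_string v [] with
  | _ -> true
  | exception Invalid_argument _ -> false

let () =
  (* Plain data such as a (string * int) tuple is fine *)
  Printf.printf "tuple: %b\n" (marshallable ("X", 42));
  (* A functional value is rejected *)
  Printf.printf "closure: %b\n" (marshallable (fun x -> x + 1))
```

Plain data such as the tuple passes, whereas the closure is rejected.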

Levers are usually registered in the post_add_hook of the processor. For example, let us define a lever that can start a helper container. As argument we pass a tuple of a string and an int (s,i). The argument does not have any meaning here; we only do this to demonstrate how to pass arguments. As result, we pass back a boolean value that says whether the helper container was started successfully.

First we need to create a type module:

module T = struct
  type s = string * int    (* argument type *)
  type r = bool            (* result type *)
end

As second step, we need to create the lever module. This means only to apply the functor Netplex_cenv.Make_lever:

module L = Netplex_cenv.Make_lever(T)

What happens behind the scenes is that a function L.register is created that can marshal the argument and result values from the container process to the master process and back. This is invisible to the user, and type-safe.

Now, we have to call L.register from the post_add_hook. The result of L.register is another function that represents the lever. By calling it, the lever is activated:

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        method post_add_hook socksrv ctrl =
          let lever = 
            L.register ctrl
              (fun (s,i) ->
                 try
                   Netplex_kit.add_helper_service ctrl "helper1" ...;
                   true   (* successful *)
                 with error ->
                   false  (* not successful *)
              ) in
           ...
      end
    )

So, when we call lever ("X",42) from the container, the lever mechanism routes this call to the controller process, where it invokes the function (fun (s,i) -> ...) that was passed to L.register.

Finally, the question is how can we make the function lever known to containers. The hackish way to do this is to store lever in a global variable. The clean way is to store lever in a container variable, e.g.

  module LV = Netplex_cenv.Make_var_type(L)
    (* This works because L.t is the type of the lever *)

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        val mutable helper1_lever = (fun _ -> assert false)

        method post_add_hook socksrv ctrl =
          let lever = 
            L.register ctrl
              (fun (s,i) ->
                 try
                   Netplex_kit.add_helper_service ctrl "helper1" ...;
                   true   (* successful *)
                 with error ->
                   false  (* not successful *)
              ) in
           helper1_lever <- lever

        method post_start_hook cont =
          LV.set "helper1_lever" helper1_lever
      end
    )

and later in container code:

  let helper1_lever = LV.get "helper1_lever" in
  let success = helper1_lever ("X",42) in
  if success then
    print_endline "OK, started the new helper"
  else
    print_endline "There was an error"

This web site is published by Informatikbüro Gerd Stolpmann