{1 Advanced features of Netplex}

Some information about advanced techniques.

{b Contents}

- {!Netplex_advanced.timers}
- {!Netplex_advanced.contvars}
- {!Netplex_advanced.contsocks}
- {!Netplex_advanced.initonce}
- {!Netplex_advanced.sharedvars}
- {!Netplex_advanced.passdown}
- {!Netplex_advanced.levers}

{2:timers Running timers in containers}

With {!Netplex_cenv.create_timer} one can start a timer that runs directly
in the container event loop. This event loop is normally used for accepting
new connections and for exchanging control messages with the master
process. If the processor supports it (like the RPC processor), the
event loop is also used by the processor itself for
protocol interpretation. Running a timer in this loop means that the
expiration of the timer is only detected when the control flow of the
container returns to the event loop. In the worst case, this happens only
when the current connection is finished and the container waits for the
next connection.

So this is not the kind of high-precision timer one would use for
precisely controlling latencies. However, these timers are still useful
for things that run only infrequently, like

- processing statistical information
- checking whether configuration updates have arrived
- checking whether resources have "timed out" and can be released
  (e.g. whether a connection to a database system can be closed)

Timers can be cancelled by calling {!Netplex_cenv.cancel_timer}. Timers
are automatically cancelled at container shutdown time.

Example: Start a timer at container startup. We have to do this in the
[post_start_hook] of the processor. How the hooks are set depends on the
kind of processor. For example, the processor factories
{!Rpc_netplex.rpc_factory} and {!Nethttpd_plex.nethttpd_factory} have
an argument [hooks], and one can create such an object like this:

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method post_start_hook cont =
          let timer =
            Netplex_cenv.create_timer
              (fun timer -> ...)  (* the callback run at expiration *)
              tmo in              (* the timeout in seconds *)
          ...
      end
    )
]}
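
For illustration, here is a filled-in variant of this sketch: a timer
that fires every 60 seconds and writes a log message (both the interval
and the message are only examples). The callback returns [true] so that
the timer keeps running; returning [false] would stop it:

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method post_start_hook cont =
          ignore
            (Netplex_cenv.create_timer
               (fun _ ->
                  Netplex_cenv.log `Info "periodic maintenance";
                  true   (* keep the timer running *)
               )
               60.0     (* expire after 60 seconds *)
            )
      end
    )
]}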


{2:contvars Container variables}

If multi-processing is used, one can simply store per-container values
in global variables. This works because for every new container the
whole program is forked, and thus a new instance of the variable is
created.

For multi-threaded programs this is a lot more difficult. For this
reason there is built-in support for per-container variables.

Example: We want to collect statistics on how often the functions
[foo] and [bar] are called, per container. We define a record

{[
type stats =
  { mutable foo_count : int;
    mutable bar_count : int
  }
]}

Furthermore, we need an access module that looks up the current value
of the variable (get), or overwrites the value (set). We can simply
create this module by using the functor {!Netplex_cenv.Make_var_type}:

{[
module Stats_var =
  Netplex_cenv.Make_var_type(struct type t = stats end)
]}

Now, one can get the value of a [stats]-typed variable "count" by
calling

{[
let stats =
  Stats_var.get "count"
]}

(which will raise {!Netplex_cenv.Container_variable_not_found} if the
value of "count" has never been set before), and one can set the value
by

{[
Stats_var.set "count" stats
]}

As mentioned, the variable "count" exists once per container. One
can access it only from the scope of a Netplex container (e.g. from a 
callback function that is invoked by a Netplex processor). It is a 
good idea to initialize "count" in the [post_start_hook] of the
processor (see the timer example above).
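
For example, such an initialization could look like this (a minimal
sketch, reusing the [Stats_var] module defined above):

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method post_start_hook cont =
          (* give "count" a defined value before any request is handled *)
          Stats_var.set "count" { foo_count = 0; bar_count = 0 }
      end
    )
]}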

See also below on "Storing global state" for another kind of variable that
can be accessed from all containers.


{2:contsocks Sending messages to individual containers}

Sometimes it is useful for a container to communicate directly with
another container, where the latter can be addressed by a unique name
within the Netplex system. A normal Netplex socket is not useful here
because Netplex determines which container will accept new connections
on the socket, i.e. from the perspective of the message sender it is
random which container receives the message.

In Ocamlnet 3, a special kind of socket, called "container socket",
has been added to solve this problem. This type of socket is not created
by the master process, but by the container process (hence the name).
The socket is a Unix Domain socket on Unix, and a named pipe on Win32.
It has a unique name, and if the message sender knows the name, it can
send the message to a specific container.

One creates such sockets by adding an [address] section to the config
file that looks like

{[
  address {
    type = "container"
  }
]}

If this [address] section is simply added to an existing [protocol]
section, the network protocol of the container socket is the same as
that of the main socket of the container. If a different network protocol
is going to be used for the container socket, one can also add a second
[protocol] section. For example, here is a main HTTP service, and a
separate service [control] that is run over the container sockets:

{[
  service {
    name = "sample"
    protocol {
      name = "http"
      address {
        type = "internet"
        bind = "0.0.0.0:80"
      }
    }
    protocol {
      name = "control"
      address {
        type = "container"
      }
    }
    processor { 
      type = "myproc";
      http { ... webserver config ... }
      control { ... rpc config ... }
    }
  }
]}

One can now employ {!Netplex_kit.protocol_switch_factory} to route
incoming TCP connections arriving at "http" sockets to the web server
code, and to route incoming TCP connections arriving at "control"
sockets to e.g. an RPC server:

{[
  let compound_factory =
    new Netplex_kit.protocol_switch_factory
      "myproc"
      [ "http", Nethttpd_plex.nethttpd_factory ...;
        "control", Rpc_netplex.rpc_factory ...;
      ]
]}

The implementation of "control" would be a normal RPC server.

The remaining question is how to get the unique names of the
container sockets. The function
{!Netplex_cenv.lookup_container_sockets} helps here. It
is called with the service name and the protocol name as arguments:

{[
  let cs_paths =
    Netplex_cenv.lookup_container_sockets "sample" "control"
]}

It returns an array of Unix Domain paths, each corresponding to the
container socket of one container. It is recommended to use
{!Netplex_sockserv.any_file_client_connector} for creating RPC
clients:

{[
  let clients =
    Array.map
      (fun cs_path ->
        let connector = Netplex_sockserv.any_file_client_connector cs_path in
        create_client ... connector ...
      )
      cs_paths
]}

There is no way to get more information about the [cs_paths], e.g.
in order to find a specific container. (Of course, except by calling RPC
functions and asking the containers directly.)

A container can also find out the address of its own container socket.
Use the method [owned_container_sockets] to get a list of pairs
[(protocol_name, path)], e.g.

{[
  let cont = Netplex_cenv.self_cont() in
  let path = List.assoc "control" cont#owned_container_sockets in
  ...
]}



{2:initonce One-time initialization code}

It is sometimes necessary to run some initialization code only once
for all containers of a certain service. Of course, there is always
the option of doing this at program startup. However, this might be
too early, e.g. because some information is not yet known.

Another option is to do such initialization in the [pre_start_hook] of
the container. The [pre_start_hook] is run before the container process
is forked off, and executes in the master process. Because of this it is
easy to use a global variable that records whether [pre_start_hook] has
been called before:

{[
  let first_time = ref true

  let pre_start_hook _ _ _ =
    if !first_time then (* do initialization *) ... ;
    first_time := false

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method pre_start_hook socksrv ctrl cid =
          pre_start_hook socksrv ctrl cid
      end
    )
]}

Last but not least, there is also the possibility to run such
initialization code in the [post_start_hook]. This is different because
this hook is called from the container, i.e. from the forked-off child
process. This might be convenient if the initialization routine is
written for container context.

There is some additional complexity, though. One can no longer simply
use a global variable to detect the first time [post_start_hook] is
called. Instead, one has to use a storage medium that is shared by,
and accessible from, all containers. There are
plenty of possibilities, e.g. a file. In this example, however, we use
a Netplex semaphore:

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        method post_add_hook socksrv ctrl =
          ctrl # add_plugin Netplex_semaphore.plugin

        method post_start_hook cont =
          let first_time =
            Netplex_semaphore.create "myinit" 0L in
          if first_time then (* do initialization *) ...
      end
    )
]}

The semaphore is visible in the whole Netplex system. We use here the
fact that {!Netplex_semaphore.create} returns [true] when the semaphore
is created at the first call of [create]. The semaphore is then never
increased or decreased.


{2:sharedvars Storing global state}

Sometimes global state is unavoidable. We mean here state variables
that are accessed by all processes of the Netplex system.

Since Ocamlnet 3 there is {!Netplex_sharedvar}. This module provides
Netplex-global string variables that are identified by a user-chosen
name.

For example, to make a variable of type [stats] globally accessible

{[
type stats =
  { mutable foo_count : int;
    mutable bar_count : int
  }
]}

(see also above, "Container variables"), we can accomplish this as
follows.

{[
module Stats_var =
  Netplex_sharedvar.Make_var_type(struct type t = stats end)
]}

Now, this defines functions [Stats_var.get] and [Stats_var.set] to 
get and set the value, respectively. Note that this is type-safe
although {!Netplex_sharedvar.Make_var_type} uses the [Marshal] module
internally. If a get/set function is applied to a variable of the
wrong type we will get the exception
{!Netplex_sharedvar.Sharedvar_type_mismatch}.

Before one can get/set values, one has to create the variable with

{[
let ok =
  Netplex_sharedvar.create_var ~enc:true name
]}

The argument [~enc:true] is required for variables accessed via
{!Netplex_sharedvar.Make_var_type}.
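
For example, the variable can be created and initialized in the
[post_start_hook] (a minimal sketch; the name "mystats" is chosen to
match the examples below, and we rely on the fact that only the call
actually creating the variable returns [true], so only one container
sets the initial value):

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method post_start_hook cont =
          let created =
            Netplex_sharedvar.create_var ~enc:true "mystats" in
          if created then
            Stats_var.set "mystats" { foo_count = 0; bar_count = 0 }
      end
    )
]}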

In order to use {!Netplex_sharedvar} we have to add this plugin:

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        method post_add_hook socksrv ctrl =
          ctrl # add_plugin Netplex_sharedvar.plugin
      end
    )
]}

Now, imagine that we want to increase the counters in a [stats]
variable. As we now have truly parallel accesses, we have to
ensure that these accesses do not overlap. We use a Netplex
mutex to ensure this, as in:

{[
  let mutex = Netplex_mutex.access "mymutex" in
  Netplex_mutex.lock mutex;
  try
    let v = Stats_var.get "mystats" in
    v.foo_count <- v.foo_count + foo_delta;
    v.bar_count <- v.bar_count + bar_delta;
    Stats_var.set "mystats" v;
    Netplex_mutex.unlock mutex
  with
    error -> Netplex_mutex.unlock mutex; raise error
]}

As Netplex mutexes are also plugins, we have to add them in the 
[post_add_hook], too. Also see {!Netplex_mutex} for more information.
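
For example, a [post_add_hook] that enables both plugins could look
like this:

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        method post_add_hook socksrv ctrl =
          ctrl # add_plugin Netplex_sharedvar.plugin;
          ctrl # add_plugin Netplex_mutex.plugin
      end
    )
]}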

Generally, shared variables should not be used to store large
quantities of data. A few megabytes are probably ok. The reason is
that these variables exist in the Netplex master process, and each
time a child is forked off the variables are also copied, although this
is not necessary. (It is possible and likely that a future version of
Ocamlnet improves this.)

For bigger amounts of data, it is advisable to store them in an external
file, a shared memory segment ({!Netshm} might help here), or even in
a database system. Shared variables should then only be used to
pass around the name of this file/segment/database.
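
For example, one might publish only the file name via the plain string
interface of {!Netplex_sharedvar} (a sketch; the variable name and the
path are made up):

{[
  (* Writer: create the variable, and store the path in it *)
  ignore (Netplex_sharedvar.create_var "data_file");
  ignore (Netplex_sharedvar.set_value "data_file" "/tmp/bigdata.bin");

  (* Reader: look the path up and use it *)
  match Netplex_sharedvar.get_value "data_file" with
    | Some path -> ... (* open the file at [path] *)
    | None -> failwith "data_file is not set"
]}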


{2:passdown Hooks, and how to pass values down}

Usually, the user configures processor factories by creating hook
objects.  We have shown this already several times in previous
sections of this chapter. Sometimes the question arises how to pass
values from one hook to another.

The hooks are called in a certain order. Unfortunately, there is
no built-in way to hand values over from one hook to the next. As a
workaround, it is suggested to store the values in the hook object.

For example, suppose we need to allocate a database ID for each
container. We do this in the [pre_start_hook], so we know the ID
early. Of course, the code started from the [post_start_hook] also
needs the ID, and in the [post_finish_hook] we would like to delete
everything in the database referenced by this ID.

This could be done in a hook object like this:

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        val db_id_tbl = Hashtbl.create 11

        method pre_start_hook _ _ cid =
          let db_id = allocate_db_id() in       (* create db ID *)
          Hashtbl.add db_id_tbl cid db_id       (* remember it for later *)

        method post_start_hook cont =
          let cid = cont # container_id in           (* the container ID *)
          let db_id = Hashtbl.find db_id_tbl cid in  (* look up the db ID *)
          ...

        method post_finish_hook _ _ cid =
          let db_id = Hashtbl.find db_id_tbl cid in  (* look up the db ID *)
          delete_db_id db_id;                        (* clean up db *)
          Hashtbl.remove db_id_tbl cid
      end
    )
]}

We use here the container ID to identify the container. This works in
all of the hooks used here: either the container ID is passed directly,
or we can get it from the container object itself.

Normally there is only one controller per program. It is imaginable that
a multi-threaded program has several controllers, though. In this case
one has to be careful with this technique: it should be avoided
that values from the Netplex system driven by one controller become
visible in the system driven by another controller. Often, this can
easily be achieved by creating separate hook objects, one per controller.


{2:levers Levers - calling controller functions from containers}

In a multi-process setup, the controller runs in the master process,
and the containers run in child processes. Because of this, container
code cannot directly invoke functions of the controller.

For multi-threaded programs, this is quite easy to solve. With the
function {!Netplex_cenv.run_in_controller_context} one can
temporarily switch to the controller thread and run code there.

For example, to start a helper container one can do

{[
  Netplex_cenv.run_in_controller_context ctrl
    (fun () ->
       Netplex_kit.add_helper_service ctrl "helper1" hooks
    )
]}

which starts a new container with an empty processor that only consists
of the [hooks] object. The [post_start_hook] can be considered as the
"body" of the new thread. The advantage of this (compared to
[Thread.create]) is that this thread counts as a regular container, and
can e.g. use logging functions.
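
A minimal sketch of such a [hooks] object, whose [post_start_hook]
plays the role of the thread body (the log message is made up):

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method post_start_hook cont =
          (* the "body" of the helper container *)
          Netplex_cenv.log `Info "helper1 is running"
      end
    )
]}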

There is no such easy way in the multi-processing case. As a
workaround, a special mechanism has been added to Netplex, the
so-called {b levers}. Levers are registered functions that are known
to the controller and which can be invoked from container context.
Levers have an argument and can deliver a result. The types of
argument and result can be arbitrary (but must be monomorphic, and
must not contain functions). (The name "lever" was chosen because
it is reminiscent of additional operating handles, as we add such
handles to the controller.)

Levers are usually registered in the [post_add_hook] of the processor.
For example, let us define a lever that can start a helper container.
As argument we pass a tuple of a string and an int [(s,i)]. The
arguments do not have any meaning here; we only use them to demonstrate
how to pass arguments. As result, we pass back a boolean value that
says whether the helper container was started successfully.

First we need to create a type module:

{[
module T = struct
  type s = string * int    (* argument type *)
  type r = bool            (* result type *)
end
]}

As the second step, we need to create the lever module. This is just an
application of the functor {!Netplex_cenv.Make_lever}:

{[
module L = Netplex_cenv.Make_lever(T)
]}

What happens behind the scenes is that a function [L.register] is
created that can marshal the argument and result values from the
container process to the master process and back. This is invisible
to the user, and type-safe.

Now, we have to call [L.register] from the [post_add_hook]. The result
of [L.register] is another function that represents the lever. By
calling it, the lever is activated:

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        method post_add_hook socksrv ctrl =
          let lever = 
            L.register ctrl
              (fun (s,i) ->
                 try
                   Netplex_kit.add_helper_service ctrl "helper1" ...;
                   true   (* successful *)
                 with error ->
                   false  (* not successful *)
              ) in
           ...
      end
    )
]}

So, when we call [lever ("X",42)] from the container, the lever
mechanism routes this call to the controller process, and invokes
there the function [(fun (s,i) -> ...)] that was passed to
[L.register].

Finally, the question is how we can make the function [lever] known to
containers. The hackish way to do this is to store [lever] in a global
variable. The clean way is to store [lever] in a container variable,
e.g.

{[
  module LV = Netplex_cenv.Make_var_type(L)
    (* This works because L.t is the type of the lever *)

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        val mutable helper1_lever = (fun _ -> assert false)

        method post_add_hook socksrv ctrl =
          let lever = 
            L.register ctrl
              (fun (s,i) ->
                 try
                   Netplex_kit.add_helper_service ctrl "helper1" ...;
                   true   (* successful *)
                 with error ->
                   false  (* not successful *)
              ) in
           helper1_lever <- lever

        method post_start_hook cont =
          LV.set "helper1_lever" helper1_lever
      end
    )
]}

and later in container code:

{[
  let helper1_lever = LV.get "helper1_lever" in
  let success = helper1_lever ("X",42) in
  if success then
    print_endline "OK, started the new helper"
  else
    print_endline "There was an error"
]}
