Some information about advanced techniques.
Contents
Netplex_advanced.timers
Netplex_advanced.contvars
Netplex_advanced.contsocks
Netplex_advanced.initonce
Netplex_advanced.sharedvars
Netplex_advanced.passdown
Netplex_advanced.levers
With Netplex_cenv.create_timer one can start a timer that runs directly
in the container event loop. This event loop is normally used for accepting
new connections, and for exchanging control messages with the master
process. If the processor supports it (like the RPC processor), the
event loop is also used by the processor itself for
protocol interpretation. Running a timer in this loop means that the
expiration of the timer is only detected when the control flow of the
container returns to the event loop. In the worst case, this happens only
when the current connection is finished and the container waits for the
next connection.
So, this is not the kind of high-precision timer one would use for the exact control of latencies. However, these timers are still useful for tasks that run only infrequently.
Timers can be cancelled by calling Netplex_cenv.cancel_timer. Timers
are automatically cancelled at container shutdown time.
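The limited precision described above can be illustrated with a toy model. The following sketch is not Netplex code: it assumes a single-threaded loop with a virtual clock, where each job (think: one connection) blocks the loop for its whole duration, and the function name run_loop is made up for this illustration.

```ocaml
(* Toy model, not Netplex code: a single-threaded loop with a virtual
   clock. The timer is only checked when control returns to the loop. *)

(* [run_loop ~deadline jobs] processes the jobs (given as durations) one
   after the other and returns the virtual time at which the expired
   timer is noticed. *)
let run_loop ~deadline jobs =
  let rec loop clock = function
    | _ when clock >= deadline -> clock   (* back in the loop: timer is due *)
    | [] -> deadline                      (* idle loop: timer fires on time *)
    | duration :: rest ->
        loop (clock + duration) rest      (* the job blocks the loop *)
  in
  loop 0 jobs

(* The timer is due at time 5, but the second job only finishes at 7. *)
let noticed_at = run_loop ~deadline:5 [3; 4; 2]
```

In this model the timer is noticed two time units late, because the loop was busy with a job when the deadline passed. An idle loop notices it on time.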
Example: Start a timer at container startup: We have to do this in the
post_start_hook of the processor. It depends on the kind of
processor how the hooks are set. For example, the processor factories
Rpc_netplex.rpc_factory and Nethttpd_plex.nethttpd_factory have
an argument hooks, and one can create it like:
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method post_start_hook cont =
          let timer =
            Netplex_cenv.create_timer
              (fun timer -> ...)
              tmo in
          ...
      end
    )
If multi-processing is used, one can simply store per-container values in global variables. This works because for every new container the whole program is forked, and thus a new instance of the variable is created.
For multi-threaded programs this is a lot more difficult. For this reason there is built-in support for per-container variables.
Example: We want to keep per-container statistics on how often the
functions foo and bar are called. We define a record
  type stats =
    { mutable foo_count : int;
      mutable bar_count : int
    }
Furthermore, we need an access module that looks for the current value
of the variable (get), or overwrites the value (set). We can simply
create this module by using the functor Netplex_cenv.Make_var_type:
  module Stats_var =
    Netplex_cenv.Make_var_type(struct type t = stats end)
Now, one can get the value of a stats-typed variable "count" by
calling
  let stats =
    Stats_var.get "count"
(which will raise Netplex_cenv.Container_variable_not_found if the
value of "count" has never been set before), and one can set the value
by
  Stats_var.set "count" stats
As mentioned, the variable "count" exists once per container. One
can access it only from the scope of a Netplex container (e.g. from a
callback function that is invoked by a Netplex processor). It is a
good idea to initialize "count" in the post_start_hook of the
processor (see the timer example above).
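To see how a functor can give typed get/set access to a table of named variables, here is a minimal stdlib-only sketch. It is not the Netplex implementation: the functor Make_var, the global table, and the two exceptions are hypothetical names for this illustration, and a single table stands in for the variables of one container.

```ocaml
(* Hypothetical stand-in for the variable table of one container. Values
   are stored as [exn], which serves as a universal type here. *)
exception Variable_not_found of string
exception Variable_type_mismatch of string

let table : (string, exn) Hashtbl.t = Hashtbl.create 7

module Make_var (T : sig type t end) : sig
  val get : string -> T.t
  val set : string -> T.t -> unit
end = struct
  (* Each functor application gets its own fresh [Box] constructor, so a
     value stored through one instance is not matched by another one. *)
  exception Box of T.t

  let set name v = Hashtbl.replace table name (Box v)

  let get name =
    match Hashtbl.find_opt table name with
    | Some (Box v) -> v
    | Some _ -> raise (Variable_type_mismatch name)
    | None -> raise (Variable_not_found name)
end

type stats =
  { mutable foo_count : int;
    mutable bar_count : int
  }

module Stats_var = Make_var (struct type t = stats end)

(* Initialize once (in Netplex this would go into post_start_hook) ... *)
let () = Stats_var.set "count" { foo_count = 0; bar_count = 0 }

(* ... and update the counters later, e.g. whenever foo is called. *)
let () =
  let s = Stats_var.get "count" in
  s.foo_count <- s.foo_count + 1
```

Because the record is mutable and stored by reference, the update is visible to the next get without calling set again.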
See also below on "Storing global state" for another kind of variable that can be accessed from all containers.
Sometimes it is useful when a container can directly communicate with another container, and the latter can be addressed by a unique name within the Netplex system. A normal Netplex socket is not useful here because Netplex determines which container will accept new connections on the socket, i.e. from the perspective of the message sender it is random which container receives the message.
In Ocamlnet 3, a special kind of socket, called "container socket" has been added to solve this problem. This type of socket is not created by the master process, but by the container process (hence the name). The socket is a Unix Domain socket for Unix, and a named pipe for Win32. It has a unique name, and if the message sender knows the name, it can send the message to a specific container.
One creates such sockets by adding an address section to the config
file that looks like
  address {
    type = "container"
  }
If this address section is simply added to an existing protocol
section, the network protocol of the container socket is the same as
that of the main socket of the container. If a different network protocol
is going to be used for the container socket, one can also add a second
protocol section. For example, here is a main HTTP service, and a
separate service control that is run over the container sockets:
  service {
    name = "sample"
    protocol {
      name = "http"
      address {
        type = "internet"
        bind = "0.0.0.0:80"
      }
    }
    protocol {
      name = "control"
      address {
        type = "container"
      }
    }
    processor {
      type = "myproc";
      http { ... webserver config ... }
      control { ... rpc config ... }
    }
  }
One can now employ Netplex_kit.protocol_switch_factory to route
incoming TCP connections arriving at "http" sockets to web server
code, and to route incoming connections arriving at "control"
sockets to, e.g., an RPC server:
  let compound_factory =
    new Netplex_kit.protocol_switch_factory
      "myproc"
      [ "http", Nethttpd_plex.nethttpd_factory ...;
        "control", Rpc_netplex.rpc_factory ...;
      ]
The implementation of "control" would be a normal RPC server.
The remaining question is now how to get the unique names of the
container sockets. The function Netplex_cenv.lookup_container_sockets
helps here. It is called with the service name and the protocol name
as arguments:
  let cs_paths =
    Netplex_cenv.lookup_container_sockets "sample" "control"
It returns an array of Unix Domain paths, each corresponding to the
container socket of one container. It is recommended to use
Netplex_sockserv.any_file_client_connector for creating RPC
clients:
  let clients =
    List.map
      (fun cs_path ->
         let connector = Netplex_sockserv.any_file_client_connector cs_path in
         create_client ... connector ...
      )
      (Array.to_list cs_paths)   (* cs_paths is an array, not a list *)
There is no way to get more information about the cs_paths, e.g.
in order to find a special container. (Of course, except by calling RPC
functions and asking the containers directly.)
A container can also find out the address of its own container socket.
Use the method owned_container_sockets to get a list of pairs
(protocol_name, path), e.g.
let cont = Netplex_cenv.self_cont() in
let path = List.assoc "control" cont#owned_container_sockets
It is sometimes necessary to run some initialization code only once for all containers of a certain service. Of course, there is always the option of doing this at program startup. However, this might be too early, e.g. because some information is not yet known.
Another option is to do such initialization in the pre_start_hook of
the container. The pre_start_hook is run before the container process
is forked off, and executes in the master process. Because of this it is
easy to have a global variable that records whether pre_start_hook is
being called for the first time:
  let first_time = ref true

  let pre_start_hook _ _ _ =
    if !first_time then (* do initialization *) ... ;
    first_time := false

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method pre_start_hook socksrv ctrl cid =
          pre_start_hook socksrv ctrl cid
      end
    )
Last but not least there is also the possibility to run such
initialization code in the post_start_hook. This is different as
this hook is called from the container, i.e. from the forked-off child
process. This might be convenient if the initialization routine is
written for container context.
There is some additional complexity, though. One can no longer simply
use a global variable to catch the first time post_start_hook is
called. Instead, one has to use a storage medium that is shared by all
containers, and that is accessible from all containers. There are
plenty of possibilities, e.g. a file. In this example, however, we use
a Netplex semaphore:
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method post_add_hook socksrv ctrl =
          ctrl # add_plugin Netplex_semaphore.plugin
        method post_start_hook cont =
          let first_time =
            Netplex_semaphore.create "myinit" 0L in
          if first_time then (* do initialization *) ... ;
      end
    )
The semaphore is visible in the whole Netplex system. Here we use the
fact that Netplex_semaphore.create returns true only for the call that
actually creates the semaphore, i.e. for the first call of create. The
semaphore is then never increased or decreased.
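The "first creator wins" idea can be shown in isolation with a toy model. This sketch is not Netplex code: the registry and the functions create and post_start are stand-ins made up for this illustration, and everything runs in one process, whereas a real Netplex semaphore is shared by all containers.

```ocaml
(* Toy model: a registry of named counters that all callers share. *)
let registry : (string, int64 ref) Hashtbl.t = Hashtbl.create 7

(* Returns [true] only for the call that actually creates the entry. *)
let create name initial =
  if Hashtbl.mem registry name then false
  else begin
    Hashtbl.add registry name (ref initial);
    true
  end

(* Counts how often the initialization code has run. *)
let init_runs = ref 0

(* Stand-in for the post_start_hook of one container. *)
let post_start () =
  if create "myinit" 0L then incr init_runs   (* do initialization *)

(* Three containers start, but the initialization runs only once. *)
let () =
  for _i = 1 to 3 do post_start () done
```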
Sometimes global state is unavoidable. We mean here state variables that are accessed by all processes of the Netplex system.
Since Ocamlnet 3 there is Netplex_sharedvar. This module provides
Netplex-global string variables that are identified by a user-chosen
name.
For example, to make a variable of type stats globally accessible
  type stats =
    { mutable foo_count : int;
      mutable bar_count : int
    }
(see also above, "Container variables"), we can accomplish this as follows.
  module Stats_var =
    Netplex_sharedvar.Make_var_type(struct type t = stats end)
Now, this defines functions Stats_var.get and Stats_var.set to
get and set the value, respectively. Note that this is type-safe
although Netplex_sharedvar.Make_var_type uses the Marshal module
internally. If a get/set function is applied to a variable of the
wrong type we will get the exception
Netplex_sharedvar.Sharedvar_type_mismatch.
Before one can get/set values, one has to create the variable with
  let ok =
    Netplex_sharedvar.create_var ~enc:true name
The parameter ~enc:true is required for variables accessed via
Netplex_sharedvar.Make_var_type.
In order to use Netplex_sharedvar we have to add this plugin:
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method post_add_hook socksrv ctrl =
          ctrl # add_plugin Netplex_sharedvar.plugin
      end
    )
Now, imagine that we want to increase the counters in a stats
variable. As we now have truly parallel accesses, we have to
ensure that these accesses do not overlap. We use a Netplex
mutex to ensure this, as in:
  let mutex = Netplex_mutex.access "mymutex" in
  Netplex_mutex.lock mutex;
  try
    let v = Stats_var.get "mystats" in
    v.foo_count <- v.foo_count + foo_delta;
    v.bar_count <- v.bar_count + bar_delta;
    Stats_var.set "mystats" v;
    Netplex_mutex.unlock mutex;
  with
    error -> Netplex_mutex.unlock mutex; raise error
As Netplex mutexes are also plugins, we have to add them in the
post_add_hook, too. Also see Netplex_mutex for more information.
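The lock/try/unlock pattern shown above is easy to get wrong when it is repeated by hand, so it can be wrapped in a small helper. The following is a sketch under assumptions: with_lock is a hypothetical helper (not part of Netplex) that takes the lock and unlock functions as arguments, and the demo uses a boolean flag as a stand-in mutex so that it runs without a Netplex system.

```ocaml
(* [with_lock ~lock ~unlock m f] runs [f ()] while holding [m] and
   releases [m] afterwards, whether [f] returns normally or raises. *)
let with_lock ~lock ~unlock m f =
  lock m;
  match f () with
  | result -> unlock m; result
  | exception e -> unlock m; raise e

(* Stand-in mutex for the demo: a flag that records whether it is held.
   The assertions catch double locking and double unlocking. *)
let held = ref false
let lock m = (assert (not !m); m := true)
let unlock m = (assert !m; m := false)

let total = ref 0

(* Normal case: the critical section runs and the lock is released. *)
let () = with_lock ~lock ~unlock held (fun () -> total := !total + 5)

(* Error case: the exception propagates, but the lock is released. *)
let failed =
  try with_lock ~lock ~unlock held (fun () -> failwith "boom")
  with Failure _ -> true
```

With Netplex, one would presumably pass Netplex_mutex.lock and Netplex_mutex.unlock as the two function arguments, and the value returned by Netplex_mutex.access as the mutex.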
Generally, shared variables should not be used to store large quantities of data. A few megabytes are probably ok. The reason is that these variables exist in the Netplex master process, and each time a child is forked off the variables are also copied although this is not necessary. (It is possible and likely that a future version of Ocamlnet improves this.)
For bigger amounts of data, it is advised to store them in an external
file, a shared memory segment (Netshm might help here), or even in
a database system. Shared variables should then only be used to
pass around the name of this file/segment/database.
Usually, the user configures processor factories by creating hook objects. We have shown this already several times in previous sections of this chapter. Sometimes the question arises how to pass values from one hook to another.
The hooks are called in a certain order. Unfortunately, there is no easy way to pass values from one hook to another. As a workaround, it is suggested to store the values in the hooks object.
For example, suppose we need to allocate a database ID for each
container. We do this in the pre_start_hook, so we know the ID
early. Of course, the code started from the post_start_hook also
needs the ID, and in the post_finish_hook we would like to delete
everything in the database referenced by this ID.
This could be done in a hook object like
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        val db_id_tbl = Hashtbl.create 11

        method pre_start_hook _ _ cid =
          let db_id = allocate_db_id() in       (* create db ID *)
          Hashtbl.add db_id_tbl cid db_id       (* remember it for later *)

        method post_start_hook cont =
          let cid = cont # container_id in      (* the container ID *)
          let db_id = Hashtbl.find db_id_tbl cid in   (* look up the db ID *)
          ...

        method post_finish_hook _ _ cid =
          let db_id = Hashtbl.find db_id_tbl cid in   (* look up the db ID *)
          delete_db_id db_id;                   (* clean up db *)
          Hashtbl.remove db_id_tbl cid
      end
    )
We use here the container ID to identify the container. This works in all used hooks - either the container ID is passed directly, or we can get it from the container object itself.
Normally there is only one controller per program. It is imaginable that a multi-threaded program has several controllers, though. In this case one has to be careful with this technique, because it should be avoided that values from the Netplex system driven by one controller are visible in the system driven by the other controller. Often, this can be easily achieved by creating separate hook objects, one per controller.
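The suggestion to create one hook object per controller can be sketched as follows. This is a stdlib-only illustration, not Netplex code: make_hooks and the methods remember and lookup are hypothetical, and the object only models the table part of a hooks object. The point is that every call of the function evaluates the object expression again and therefore allocates a fresh table.

```ocaml
(* Hypothetical stand-in for a hooks object: only the table matters here.
   Container IDs are modelled as ints, database IDs as strings. *)
let make_hooks () =
  object
    val db_id_tbl : (int, string) Hashtbl.t = Hashtbl.create 11
    method remember cid db_id = Hashtbl.replace db_id_tbl cid db_id
    method lookup cid = Hashtbl.find_opt db_id_tbl cid
  end

let hooks_a = make_hooks ()   (* for the system of the first controller *)
let hooks_b = make_hooks ()   (* for the system of the second controller *)

(* A value stored through hooks_a is not visible through hooks_b. *)
let () = hooks_a#remember 1 "db-17"
let seen_by_a = hooks_a#lookup 1
let seen_by_b = hooks_b#lookup 1
```

In contrast, a hooks object bound once at the top level and passed to both controllers would share a single table between the two systems.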
In a multi-process setup, the controller runs in the master process, and the containers run in child processes. Because of this, container code cannot directly invoke functions of the controller.
For multi-threaded programs, this is quite easy to solve. With the
function Netplex_cenv.run_in_controller_context one can
temporarily switch to the controller thread to run code there.
For example, to start a helper container one can do
  Netplex_cenv.run_in_controller_context ctrl
    (fun () ->
       Netplex_kit.add_helper_service ctrl "helper1" hooks
    )
which starts a new container with an empty processor that only consists
of the hooks object. The post_start_hook can be considered as the
"body" of the new thread. The advantage of this (compared to
Thread.start) is that this thread counts as a regular container, and
can e.g. use logging functions.
There is no such easy way in the multi-processing case. As a workaround, a special mechanism has been added to Netplex, the so-called levers. Levers are registered functions that are known to the controller and which can be invoked from container context. Levers have an argument and can deliver a result. The types of argument and result can be arbitrary (but must be monomorphic, and must not contain functions). (The name, lever, was chosen because it is reminiscent of additional operating handles, as we add such handles to the controller.)
Levers are usually registered in the post_add_hook of the processor.
For example, let us define a lever that can start a helper container.
As arguments we pass a tuple of a string and an int (s,i). The
arguments do not have any meaning here, we only do this to demonstrate
how to pass arguments. As result, we pass a boolean value back that
says whether the helper container was started successfully.
First we need to create a type module:
  module T = struct
    type s = string * int    (* argument type *)
    type r = bool            (* result type *)
  end
As a second step, we need to create the lever module. This only means
applying the functor Netplex_cenv.Make_lever:
  module L = Netplex_cenv.Make_lever(T)
What happens behind the scenes is that a function L.register is
created that can marshal the argument and result values from the
container process to the master process and back. This is invisible
to the user, and type-safe.
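How a functor can hide marshalling behind a typed function can be sketched with the standard Marshal module. This is a toy model, not the Netplex implementation: Make_toy_lever and LEVER_TYPE are hypothetical names, register takes no controller argument, and both sides run in the same process, with a string standing in for the message exchanged between container and controller.

```ocaml
module type LEVER_TYPE = sig
  type s   (* argument type *)
  type r   (* result type *)
end

module Make_toy_lever (T : LEVER_TYPE) : sig
  type t = T.s -> T.r
  val register : (T.s -> T.r) -> t
end = struct
  type t = T.s -> T.r

  let register handler =
    fun arg ->
      (* "container" side: serialize the argument *)
      let request = Marshal.to_string (arg : T.s) [] in
      (* "controller" side: decode, run the handler, encode the result *)
      let decoded : T.s = Marshal.from_string request 0 in
      let response = Marshal.to_string (handler decoded : T.r) [] in
      (* "container" side again: decode the result *)
      (Marshal.from_string response 0 : T.r)
end

module T = struct
  type s = string * int    (* argument type *)
  type r = bool            (* result type *)
end

module L = Make_toy_lever (T)

(* The handler is arbitrary here: it checks that the string is shorter
   than the given int. *)
let lever = L.register (fun (s, i) -> String.length s < i)

let ok = lever ("X", 42)
```

The type annotations on the unmarshalled values are what keeps the interface type-safe for the caller. The sketch also hints at why lever types must not contain functions: by default, Marshal refuses to serialize functional values.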
Now, we have to call L.register from the post_add_hook. The result
of L.register is another function that represents the lever. By
calling it, the lever is activated:
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method post_add_hook socksrv ctrl =
          let lever =
            L.register ctrl
              (fun (s,i) ->
                 try
                   Netplex_kit.add_helper_service ctrl "helper1" ...;
                   true   (* successful *)
                 with error ->
                   false  (* not successful *)
              ) in
          ...
      end
    )
So, when we call lever ("X",42) from the container, the lever
mechanism routes this call to the controller process, and calls there
the function (fun (s,i) -> ...) that is the argument of
L.register.
Finally, the question is how we can make the function lever known to
containers. The hackish way to do this is to store lever in a global
variable. The clean way is to store lever in a container variable,
e.g.
  module LV = Netplex_cenv.Make_var_type(L)
    (* This works because L.t is the type of the lever *)

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        val mutable helper1_lever = (fun _ -> assert false)

        method post_add_hook socksrv ctrl =
          let lever =
            L.register ctrl
              (fun (s,i) ->
                 try
                   Netplex_kit.add_helper_service ctrl "helper1" ...;
                   true   (* successful *)
                 with error ->
                   false  (* not successful *)
              ) in
          helper1_lever <- lever

        method post_start_hook cont =
          LV.set "helper1_lever" helper1_lever
      end
    )
and later in container code:
  let helper1_lever = LV.get "helper1_lever" in
  let success = helper1_lever ("X",42) in
  if success then
    print_endline "OK, started the new helper"
  else
    print_endline "There was an error"