{1 Advanced features of Netplex}

Some information about advanced techniques.

{b Contents}

- {!Netplex_advanced.timers}
- {!Netplex_advanced.contvars}
- {!Netplex_advanced.contsocks}
- {!Netplex_advanced.initonce}
- {!Netplex_advanced.sharedvars}
- {!Netplex_advanced.passdown}
- {!Netplex_advanced.levers}

{2:timers Running timers in containers}

With {!Netplex_cenv.create_timer} one can start a timer that runs directly
in the container event loop. This event loop is normally used for
accepting new connections, and for exchanging control messages with
the master process. If the processor supports it (like the RPC
processor), the event loop is also used by the processor itself for
protocol interpretation.

Running a timer in this loop means that the expiration of the timer
is only detected when the control flow of the container returns to
the event loop. In the worst case, this happens only after the current
connection is finished and the container waits for the next connection.

So, this is not the kind of high-precision timer one would use for the
exact control of latencies. However, these timers are still useful for
things that run only infrequently, like

- processing statistical information
- checking whether configuration updates have arrived
- checking whether resources have "timed out" and can be released
  (e.g. whether a connection to a database system can be closed)

Timers can be cancelled by calling {!Netplex_cenv.cancel_timer}. Timers
are automatically cancelled at container shutdown time.

Example: Start a timer at container startup. We have to do this in the
[post_start_hook] of the processor. It depends on the kind of
processor how the hooks are set. For example, the processor factories
{!Rpc_netplex.rpc_factory} and {!Nethttpd_plex.nethttpd_factory} have
an argument [hooks], and one can create the hooks object like:

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        method post_start_hook cont =
          let timer =
            Netplex_cenv.create_timer
              (fun timer -> ...)
              tmo in
          ...
      end
    )
]}

{2:contvars Container variables}

If multi-processing is used, one can simply store per-container values
in global variables. This works because for every new container the
whole program is forked, and thus a new instance of the variable is
created.

For multi-threaded programs this is a lot more difficult. For this
reason there is built-in support for per-container variables.

Example: We want to keep per-container statistics on how often the
functions [foo] and [bar] are called. We define a record

{[
  type stats =
    { mutable foo_count : int;
      mutable bar_count : int
    }
]}

Furthermore, we need an access module that looks up the current value
of the variable (get), or overwrites the value (set). We can simply
create this module by using the functor {!Netplex_cenv.Make_var_type}:

{[
  module Stats_var =
    Netplex_cenv.Make_var_type(struct type t = stats end)
]}

Now, one can get the value of a [stats]-typed variable "count" by
calling

{[
  let stats =
    Stats_var.get "count"
]}

(which will raise {!Netplex_cenv.Container_variable_not_found} if the
value of "count" has never been set before), and one can set the value
by

{[
  Stats_var.set "count" stats
]}

As mentioned, the variable "count" exists once per container. One can
access it only from the scope of a Netplex container (e.g. from a
callback function that is invoked by a Netplex processor). It is a
good idea to initialize "count" in the [post_start_hook] of the
processor (see the timer example above).

See also below on "Storing global state" for another kind of variable
that can be accessed from all containers.

{2:contsocks Sending messages to individual containers}

Sometimes it is useful when a container can directly communicate with
another container, and the latter can be addressed by a unique name
within the Netplex system. A normal Netplex socket is not useful here
because Netplex determines which container will accept new connections
on the socket, i.e.
from the perspective of the message sender it is random which
container receives the message.

In Ocamlnet 3, a special kind of socket, called "container socket",
has been added to solve this problem. This type of socket is not created
by the master process, but by the container process (hence the name).
The socket is a Unix Domain socket for Unix, and a named pipe for
Win32. It has a unique name, and if the message sender knows the name,
it can send the message to a specific container.

One creates such sockets by adding an [address] section to the config
file that looks like

{[
  address {
    type = "container"
  }
]}

If this [address] section is simply added to an existing [protocol]
section, the network protocol of the container socket is the same
as that of the main socket of the container. If a different network
protocol is going to be used for the container socket, one can also
add a second [protocol] section. For example, here is a main HTTP
service, and a separate service [control] that is run over the
container sockets:

{[
  service {
    name = "sample"
    protocol {
      name = "http"
      address {
        type = "internet"
        bind = "0.0.0.0:80"
      }
    }
    protocol {
      name = "control"
      address {
        type = "container"
      }
    }
    processor {
      type = "myproc";
      http { ... webserver config ... }
      control { ... rpc config ... }
    }
  }
]}

One can now employ {!Netplex_kit.protocol_switch_factory} to route
incoming TCP connections arriving at "http" sockets to web server
code, and to route incoming connections arriving at "control" sockets
to e.g. an RPC server:

{[
  let compound_factory =
    new Netplex_kit.protocol_switch_factory
      "myproc"
      [ "http", Nethttpd_plex.nethttpd_factory ...;
        "control", Rpc_netplex.rpc_factory ...;
      ]
]}

The implementation of "control" would be a normal RPC server.

The remaining question is how to get the unique names of the
container sockets. The function
{!Netplex_cenv.lookup_container_sockets} helps here.
The function is called with the service name and the protocol name as
arguments:

{[
  let cs_paths =
    Netplex_cenv.lookup_container_sockets "sample" "control"
]}

It returns an array of Unix Domain paths, each corresponding to the
container socket of one container. It is recommended to use
{!Netplex_sockserv.any_file_client_connector} for creating RPC
clients:

{[
  let clients =
    List.map
      (fun cs_path ->
         let connector =
           Netplex_sockserv.any_file_client_connector cs_path in
         create_client ... connector ...
      )
      (Array.to_list cs_paths)   (* cs_paths is an array *)
]}

There is no way to get more information about the [cs_paths], e.g. in
order to find a particular container, except by calling RPC functions
and asking the containers directly.

A container can also find out the address of its own container
socket. Use the method [owned_container_sockets] to get a list
of pairs [(protocol_name, path)], e.g.

{[
  let cont = Netplex_cenv.self_cont() in
  let path = List.assoc "control" cont#owned_container_sockets
]}

{2:initonce One-time initialization code}

It is sometimes necessary to run some initialization code only once
for all containers of a certain service. Of course, there is always
the option of doing this at program startup. However, this might be
too early, e.g. because some information is not yet known.

Another option is to do such initialization in the [pre_start_hook] of
the container. The [pre_start_hook] is run before the container
process is forked off, and executes in the master process. Because of
this it is easy to have a global variable that records whether
[pre_start_hook] is called for the first time:

{[
  let first_time = ref true

  let pre_start_hook _ _ _ =
    if !first_time then (
      (* do initialization *)
      ...
    );
    first_time := false

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()
        method pre_start_hook socksrv ctrl cid =
          pre_start_hook socksrv ctrl cid
      end
    )
]}

Last but not least there is also the possibility to run such
initialization code in the [post_start_hook].
This is different as this hook is called from the container, i.e. from
the forked-off child process. This might be convenient if the
initialization routine is written for container context.

There is some additional complexity, though. One can no longer simply
use a global variable to catch the first time [post_start_hook] is
called. Instead, one has to use a storage medium that is shared by
all containers, and that is accessible from all containers. There are
plenty of possibilities, e.g. a file. In this example, however, we use
a Netplex semaphore:

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        method post_add_hook socksrv ctrl =
          ctrl # add_plugin Netplex_semaphore.plugin

        method post_start_hook cont =
          let first_time =
            Netplex_semaphore.create "myinit" 0L in
          if first_time then
            (* do initialization *)
            ... ;
      end
    )
]}

The semaphore is visible in the whole Netplex system. We use here the
fact that {!Netplex_semaphore.create} returns [true] when the
semaphore is created at the first call of [create]. The semaphore is
then never increased or decreased.

{2:sharedvars Storing global state}

Sometimes global state is unavoidable. We mean here state variables
that are accessed by all processes of the Netplex system.

Since Ocamlnet 3 there is {!Netplex_sharedvar}. This module provides
Netplex-global string variables that are identified by a user-chosen
name.

For example, to make a variable of [type stats] globally accessible

{[
  type stats =
    { mutable foo_count : int;
      mutable bar_count : int
    }
]}

(see also above, "Container variables"), we can accomplish this as
follows.

{[
  module Stats_var =
    Netplex_sharedvar.Make_var_type(struct type t = stats end)
]}

Now, this defines functions [Stats_var.get] and [Stats_var.set] to get
and set the value, respectively. Note that this is type-safe although
{!Netplex_sharedvar.Make_var_type} uses the [Marshal] module internally.
If a get/set function is applied to a variable of the wrong type we
will get the exception {!Netplex_sharedvar.Sharedvar_type_mismatch}.

Before one can get/set values, one has to create the variable with

{[
  let ok =
    Netplex_sharedvar.create_var ~enc:true name
]}

The parameter [~enc:true] is required for variables accessed via
{!Netplex_sharedvar.Make_var_type}.

In order to use {!Netplex_sharedvar} we have to add this plugin:

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        method post_add_hook socksrv ctrl =
          ctrl # add_plugin Netplex_sharedvar.plugin
      end
    )
]}

Now, imagine that we want to increase the counters in a [stats]
variable. As we now have truly parallel accesses, we have to ensure
that these accesses do not overlap. We use a Netplex mutex to ensure
this, as in:

{[
  let mutex = Netplex_mutex.access "mymutex" in
  Netplex_mutex.lock mutex;
  try
    let v = Stats_var.get "mystats" in
    v.foo_count <- v.foo_count + foo_delta;
    v.bar_count <- v.bar_count + bar_delta;
    Stats_var.set "mystats" v;
    Netplex_mutex.unlock mutex;
  with
    error -> Netplex_mutex.unlock mutex; raise error
]}

As Netplex mutexes are also plugins, we have to add them in the
[post_add_hook], too. Also see {!Netplex_mutex} for more information.

Generally, shared variables should not be used to store large
quantities of data. A few megabytes are probably ok. The reason is
that these variables exist in the Netplex master process, and each
time a child is forked off the variables are also copied although this
is not necessary. (It is possible and likely that a future version of
Ocamlnet improves this.)

For bigger amounts of data, it is advised to store them in an external
file, a shared memory segment ({!Netshm} might help here), or even in
a database system. Shared variables should then only be used to pass
around the name of this file/segment/database.

{2:passdown Hooks, and how to pass values down}

Usually, the user configures processor factories by creating hook
objects.
We have shown this already several times in previous sections
of this chapter. Sometimes the question arises how to pass values from
one hook to another.

The hooks are called in a certain order, but unfortunately there is no
built-in way to hand a value over from one hook to the next. As a
workaround, it is suggested to store the values in the hooks object.

For example, consider that we need to allocate a database ID for each
container. We do this in the [pre_start_hook], so we know the ID
early. Of course, the code started from the [post_start_hook] also
needs the ID, and in the [post_finish_hook] we would like to delete
everything in the database referenced by this ID.

This could be done in a hook object like

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        val db_id_tbl = Hashtbl.create 11

        method pre_start_hook _ _ cid =
          let db_id = allocate_db_id() in        (* create db ID *)
          Hashtbl.add db_id_tbl cid db_id        (* remember it for later *)

        method post_start_hook cont =
          let cid = cont # container_id in       (* the container ID *)
          let db_id = Hashtbl.find db_id_tbl cid in   (* look up the db ID *)
          ...

        method post_finish_hook _ _ cid =
          let db_id = Hashtbl.find db_id_tbl cid in   (* look up the db ID *)
          delete_db_id db_id;                    (* clean up db *)
          Hashtbl.remove db_id_tbl cid
      end
    )
]}

We use the container ID here to identify the container. This works
in all hooks used above: either the container ID is passed directly, or
we can get it from the container object itself.

Normally there is only one controller per program. It is imaginable
that a multi-threaded program has several controllers, though. In this
case one has to be careful with this technique, because values from
the Netplex system driven by one controller should not be visible in
the system driven by the other controller. Often, this can be easily
achieved by creating separate hook objects, one per controller.
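
The last point can be illustrated with a small, library-free sketch.
It is only a model and makes several assumptions: container IDs are
plain [int]s, [make_hooks], [lookup] and [allocate_db_id] are
hypothetical names, and the object does not inherit from
[Netplex_kit.empty_processor_hooks] as a real hooks object would. The
point shown is that a function creating a fresh object per call gives
every controller its own table:

{[
  (* Hypothetical ID allocator, for illustration only *)
  let next_db_id = ref 0
  let allocate_db_id () = incr next_db_id; !next_db_id

  (* Every call returns a new object with its own private table.
     A real hooks object would inherit
     Netplex_kit.empty_processor_hooks() and use the Netplex
     hook signatures instead of these simplified methods. *)
  let make_hooks () =
    object
      val db_id_tbl : (int, int) Hashtbl.t = Hashtbl.create 11

      method pre_start_hook (cid : int) =
        Hashtbl.replace db_id_tbl cid (allocate_db_id ())

      method lookup (cid : int) =
        Hashtbl.find_opt db_id_tbl cid

      method post_finish_hook (cid : int) =
        Hashtbl.remove db_id_tbl cid
    end

  (* One hooks object per (imagined) controller *)
  let hooks_a = make_hooks ()
  let hooks_b = make_hooks ()

  let () =
    hooks_a#pre_start_hook 1;
    (* The entry is visible via hooks_a, but not via hooks_b *)
    assert (hooks_a#lookup 1 <> None);
    assert (hooks_b#lookup 1 = None);
    hooks_a#post_finish_hook 1;
    assert (hooks_a#lookup 1 = None)
]}

Had [db_id_tbl] been a global variable instead, both objects would
have shared the same entries.
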
{2:levers Levers - calling controller functions from containers}

In a multi-process setup, the controller runs in the master process,
and the containers run in child processes. Because of this, container
code cannot directly invoke functions of the controller.

For multi-threaded programs, this is quite easy to solve. With the
function {!Netplex_cenv.run_in_controller_context} one can temporarily
switch to the controller thread to run code there.

For example, to start a helper container one can do

{[
  Netplex_cenv.run_in_controller_context ctrl
    (fun () ->
       Netplex_kit.add_helper_service ctrl "helper1" hooks
    )
]}

which starts a new container with an empty processor that only
consists of the [hooks] object. The [post_start_hook] can be
considered as the "body" of the new thread. The advantage of this
(compared to [Thread.start]) is that this thread counts as a regular
container, and can e.g. use logging functions.

There is no such easy way in the multi-processing case. As a
workaround, a special mechanism has been added to Netplex, the
so-called {b levers}. Levers are registered functions that are known
to the controller and which can be invoked from container context.
Levers have an argument and can deliver a result. The types of
argument and result can be arbitrary (but must be monomorphic, and
must not contain functions). (The name "lever" was chosen because it
reminds one of additional operating handles, as we add such handles to
the controller.)

Levers are usually registered in the [post_add_hook] of the processor.
For example, let us define a lever that can start a helper container.
As arguments we pass a tuple of a string and an int [(s,i)]. The
arguments do not have any meaning here; we only do this to demonstrate
how to pass arguments. As the result, we pass back a boolean value that
says whether the helper container was started successfully.
First we need to create a type module:

{[
  module T = struct
    type s = string * int    (* argument type *)
    type r = bool            (* result type *)
  end
]}

As the second step, we need to create the lever module. This only
means applying the functor {!Netplex_cenv.Make_lever}:

{[
  module L = Netplex_cenv.Make_lever(T)
]}

What happens behind the scenes is that a function [L.register] is
created that can marshal the argument and result values from the
container process to the master process and back. This is invisible to
the user, and type-safe.

Now, we have to call [L.register] from the [post_add_hook]. The result
of [L.register] is another function that represents the lever. By
calling it, the lever is activated:

{[
  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        method post_add_hook socksrv ctrl =
          let lever =
            L.register ctrl
              (fun (s,i) ->
                 try
                   Netplex_kit.add_helper_service ctrl "helper1" ...;
                   true     (* successful *)
                 with error ->
                   false    (* not successful *)
              ) in
          ...
      end
    )
]}

So, when we call [lever ("X",42)] from the container, the lever
mechanism routes this call to the controller process, and calls there
the function [(fun (s,i) -> ...)] that is the argument of
[L.register].

Finally, the question is how we can make the function [lever] known to
containers. The hackish way to do this is to store [lever] in a global
variable. The clean way is to store [lever] in a container variable,
e.g.
{[
  module LV = Netplex_cenv.Make_var_type(L)
    (* This works because L.t is the type of the lever *)

  let hooks =
    ( object
        inherit Netplex_kit.empty_processor_hooks()

        val mutable helper1_lever = (fun _ -> assert false)

        method post_add_hook socksrv ctrl =
          let lever =
            L.register ctrl
              (fun (s,i) ->
                 try
                   Netplex_kit.add_helper_service ctrl "helper1" ...;
                   true     (* successful *)
                 with error ->
                   false    (* not successful *)
              ) in
          helper1_lever <- lever

        method post_start_hook cont =
          LV.set "helper1_lever" helper1_lever
      end
    )
]}

and later in container code:

{[
  let helper1_lever = LV.get "helper1_lever" in
  let success = helper1_lever ("X",42) in
  if success then
    print_endline "OK, started the new helper"
  else
    print_endline "There was an error"
]}
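
To make the marshalling step more tangible, here is a toy model of the
lever idea that uses only the standard library. It is an assumption-laden
sketch and not how Netplex implements levers: there is no second
process, the "controller side" is just a table of functions working on
marshalled strings, and [Make_toy_lever], [Toy_T] and [Toy_L] are
made-up names. It shows why argument and result must be plain
monomorphic data: both cross the boundary as [Marshal]-encoded strings.

{[
  module type LEVER_TYPE = sig
    type s   (* argument type *)
    type r   (* result type *)
  end

  (* Toy stand-in, NOT Netplex_cenv.Make_lever *)
  module Make_toy_lever (T : LEVER_TYPE) = struct
    (* "Controller side": registered functions on marshalled strings *)
    let registry : (int, string -> string) Hashtbl.t = Hashtbl.create 7
    let next_id = ref 0

    let register (f : T.s -> T.r) : T.s -> T.r =
      let id = !next_id in
      incr next_id;
      Hashtbl.add registry id
        (fun arg_str ->
           let arg : T.s = Marshal.from_string arg_str 0 in
           Marshal.to_string (f arg) []);
      (* "Container side": encode the argument, call the registered
         function by ID, decode the result *)
      fun arg ->
        let reply = (Hashtbl.find registry id) (Marshal.to_string arg []) in
        (Marshal.from_string reply 0 : T.r)
  end

  module Toy_T = struct
    type s = string * int
    type r = bool
  end

  module Toy_L = Make_toy_lever (Toy_T)

  (* The registered function is a placeholder for "start the helper" *)
  let lever = Toy_L.register (fun (s, i) -> String.length s = 1 && i = 42)

  let () =
    if lever ("X", 42) then
      print_endline "OK, started the new helper"
    else
      print_endline "There was an error"
]}

In this model a function value inside the argument would make
[Marshal.to_string] fail, because the [Marshal.Closures] flag is not
given. This mirrors the restriction that lever types must not contain
functions.
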